| | | 1 | | using System.Runtime.CompilerServices; |
| | | 2 | | using Elsa.Common.Models; |
| | | 3 | | using Elsa.Extensions; |
| | | 4 | | using Elsa.Workflows.Attributes; |
| | | 5 | | using Elsa.Workflows.Management; |
| | | 6 | | using Elsa.Workflows.Models; |
| | | 7 | | using Elsa.Workflows.Runtime.Requests; |
| | | 8 | | using Elsa.Workflows.Runtime.Stimuli; |
| | | 9 | | using Elsa.Workflows.Runtime.UIHints; |
| | | 10 | | using Elsa.Workflows.UIHints; |
| | | 11 | | using JetBrains.Annotations; |
| | | 12 | | |
| | | 13 | | namespace Elsa.Workflows.Runtime.Activities; |
| | | 14 | | |
| | | 15 | | /// <summary> |
| | | 16 | | /// Creates a new workflow instance of the specified workflow and dispatches it for execution. |
| | | 17 | | /// </summary> |
| | | 18 | | [Activity("Elsa", "Composition", "Create a new workflow instance of the specified workflow and dispatch it for execution |
| | | 19 | | [UsedImplicitly] |
| | | 20 | | public class DispatchWorkflow : Activity<object> |
| | | 21 | | { |
| | | 22 | | /// <inheritdoc /> |
| | 90 | 23 | | public DispatchWorkflow([CallerFilePath] string? source = null, [CallerLineNumber] int? line = null) : base(source, |
| | | 24 | | { |
| | 90 | 25 | | } |
| | | 26 | | |
| | | 27 | | /// <summary> |
| | | 28 | | /// The definition ID of the workflow to dispatch. |
| | | 29 | | /// </summary> |
| | | 30 | | [Input( |
| | | 31 | | DisplayName = "Workflow Definition", |
| | | 32 | | Description = "The definition ID of the workflow to dispatch.", |
| | | 33 | | UIHint = InputUIHints.WorkflowDefinitionPicker |
| | | 34 | | )] |
| | 323 | 35 | | public Input<string> WorkflowDefinitionId { get; set; } = null!; |
| | | 36 | | |
| | | 37 | | /// <summary> |
| | | 38 | | /// The correlation ID to associate the workflow with. |
| | | 39 | | /// </summary> |
| | | 40 | | [Input( |
| | | 41 | | DisplayName = "Correlation ID", |
| | | 42 | | Description = "The correlation ID to associate the workflow with." |
| | | 43 | | )] |
| | 250 | 44 | | public Input<string?> CorrelationId { get; set; } = null!; |
| | | 45 | | |
| | | 46 | | /// <summary> |
| | | 47 | | /// The input to send to the workflow. |
| | | 48 | | /// </summary> |
| | | 49 | | [Input(Description = "The input to send to the workflow.")] |
| | 250 | 50 | | public Input<IDictionary<string, object>?> Input { get; set; } = null!; |
| | | 51 | | |
| | | 52 | | /// <summary> |
| | | 53 | | /// True to wait for the child workflow to complete before completing this activity, false to "fire and forget". |
| | | 54 | | /// </summary> |
| | | 55 | | [Input(Description = "Wait for the child workflow to complete before completing this activity.")] |
| | 323 | 56 | | public Input<bool> WaitForCompletion { get; set; } = null!; |
| | | 57 | | |
| | | 58 | | /// <summary> |
| | | 59 | | /// Indicates whether a new trace context should be started for the workflow execution. |
| | | 60 | | /// </summary> |
| | | 61 | | [Input(Description = "Start a new trace context when using Open Telemetry.", Category = "Open Telemetry")] |
| | 232 | 62 | | public Input<bool> StartNewTrace { get; set; } = null!; |
| | | 63 | | |
| | | 64 | | /// <summary> |
| | | 65 | | /// The channel to dispatch the workflow to. |
| | | 66 | | /// </summary> |
| | | 67 | | [Input( |
| | | 68 | | DisplayName = "Channel", |
| | | 69 | | Description = "The channel to dispatch the workflow to.", |
| | | 70 | | UIHint = InputUIHints.DropDown, |
| | | 71 | | UIHandler = typeof(DispatcherChannelOptionsProvider) |
| | | 72 | | )] |
| | 232 | 73 | | public Input<string?> ChannelName { get; set; } = null!; |
| | | 74 | | |
| | | 75 | | /// <inheritdoc /> |
| | | 76 | | protected override async ValueTask ExecuteAsync(ActivityExecutionContext context) |
| | | 77 | | { |
| | 5 | 78 | | var waitForCompletion = WaitForCompletion.GetOrDefault(context); |
| | | 79 | | |
| | | 80 | | // Dispatch the child workflow. |
| | 5 | 81 | | var instanceId = await DispatchChildWorkflowAsync(context, waitForCompletion); |
| | | 82 | | |
| | | 83 | | // If we need to wait for the child workflow to complete, create a bookmark. |
| | 4 | 84 | | if (waitForCompletion) |
| | | 85 | | { |
| | 3 | 86 | | var bookmarkOptions = new CreateBookmarkArgs |
| | 3 | 87 | | { |
| | 3 | 88 | | Callback = OnChildWorkflowCompletedAsync, |
| | 3 | 89 | | Stimulus = new DispatchWorkflowStimulus(instanceId), |
| | 3 | 90 | | IncludeActivityInstanceId = false |
| | 3 | 91 | | }; |
| | 3 | 92 | | context.CreateBookmark(bookmarkOptions); |
| | | 93 | | } |
| | | 94 | | else |
| | | 95 | | { |
| | | 96 | | // Otherwise, we can complete immediately. |
| | 1 | 97 | | await context.CompleteActivityAsync(); |
| | | 98 | | } |
| | 4 | 99 | | } |
| | | 100 | | |
| | | 101 | | private async ValueTask<string> DispatchChildWorkflowAsync(ActivityExecutionContext context, bool waitForCompletion) |
| | | 102 | | { |
| | 5 | 103 | | var workflowDefinitionId = WorkflowDefinitionId.Get(context); |
| | 5 | 104 | | var workflowDefinitionService = context.GetRequiredService<IWorkflowDefinitionService>(); |
| | 5 | 105 | | var workflowGraph = await workflowDefinitionService.FindWorkflowGraphAsync(workflowDefinitionId, VersionOptions. |
| | | 106 | | |
| | 5 | 107 | | if (workflowGraph == null) |
| | 1 | 108 | | throw new($"No published version of workflow definition with ID {workflowDefinitionId} found."); |
| | | 109 | | |
| | 4 | 110 | | var input = Input.GetOrDefault(context) ?? new Dictionary<string, object>(); |
| | 4 | 111 | | var channelName = ChannelName.GetOrDefault(context); |
| | 4 | 112 | | var startNewTrace = StartNewTrace.GetOrDefault(context); |
| | 4 | 113 | | var parentInstanceId = context.WorkflowExecutionContext.Id; |
| | 4 | 114 | | var properties = new Dictionary<string, object> |
| | 4 | 115 | | { |
| | 4 | 116 | | ["ParentInstanceId"] = parentInstanceId, |
| | 4 | 117 | | }; |
| | | 118 | | |
| | | 119 | | // If we need to wait for the child workflow to complete, set the property. This will be used by the ResumeDispa |
| | 7 | 120 | | if (waitForCompletion) properties["WaitForCompletion"] = true; |
| | 4 | 121 | | if (startNewTrace) properties["StartNewTrace"] = true; |
| | | 122 | | |
| | 4 | 123 | | input["ParentInstanceId"] = parentInstanceId; |
| | | 124 | | |
| | 4 | 125 | | var correlationId = CorrelationId.GetOrDefault(context); |
| | 4 | 126 | | var workflowDispatcher = context.GetRequiredService<IWorkflowDispatcher>(); |
| | 4 | 127 | | var identityGenerator = context.GetRequiredService<IIdentityGenerator>(); |
| | 4 | 128 | | var instanceId = identityGenerator.GenerateId(); |
| | 4 | 129 | | var request = new DispatchWorkflowDefinitionRequest(workflowGraph.Workflow.Identity.Id) |
| | 4 | 130 | | { |
| | 4 | 131 | | ParentWorkflowInstanceId = parentInstanceId, |
| | 4 | 132 | | Input = input, |
| | 4 | 133 | | Properties = properties, |
| | 4 | 134 | | CorrelationId = correlationId, |
| | 4 | 135 | | InstanceId = instanceId, |
| | 4 | 136 | | }; |
| | 4 | 137 | | var options = new DispatchWorkflowOptions |
| | 4 | 138 | | { |
| | 4 | 139 | | Channel = channelName |
| | 4 | 140 | | }; |
| | | 141 | | |
| | | 142 | | // Dispatch the child workflow. |
| | 4 | 143 | | var dispatchResponse = await workflowDispatcher.DispatchAsync(request, options, context.CancellationToken); |
| | 4 | 144 | | dispatchResponse.ThrowIfFailed(); |
| | | 145 | | |
| | 4 | 146 | | return instanceId; |
| | 4 | 147 | | } |
| | | 148 | | |
| | | 149 | | private async ValueTask OnChildWorkflowCompletedAsync(ActivityExecutionContext context) |
| | | 150 | | { |
| | 3 | 151 | | var input = context.WorkflowInput; |
| | 3 | 152 | | context.Set(Result, input); |
| | 3 | 153 | | await context.CompleteActivityAsync(); |
| | 3 | 154 | | } |
| | | 155 | | } |