| | | 1 | | using System.Runtime.CompilerServices; |
| | | 2 | | using Elsa.Common.Models; |
| | | 3 | | using Elsa.Expressions.Contracts; |
| | | 4 | | using Elsa.Expressions.Helpers; |
| | | 5 | | using Elsa.Expressions.Models; |
| | | 6 | | using Elsa.Extensions; |
| | | 7 | | using Elsa.Workflows.Activities.Flowchart.Attributes; |
| | | 8 | | using Elsa.Workflows.Attributes; |
| | | 9 | | using Elsa.Workflows.Management; |
| | | 10 | | using Elsa.Workflows.Memory; |
| | | 11 | | using Elsa.Workflows.Models; |
| | | 12 | | using Elsa.Workflows.Options; |
| | | 13 | | using Elsa.Workflows.Runtime.Requests; |
| | | 14 | | using Elsa.Workflows.Runtime.Stimuli; |
| | | 15 | | using Elsa.Workflows.Runtime.UIHints; |
| | | 16 | | using Elsa.Workflows.UIHints; |
| | | 17 | | using JetBrains.Annotations; |
| | | 18 | | |
| | | 19 | | namespace Elsa.Workflows.Runtime.Activities; |
| | | 20 | | |
| | | 21 | | /// <summary> |
| | | 22 | | /// Creates new workflow instances of the specified workflow for each item in the data source and dispatches them for ex |
| | | 23 | | /// </summary> |
| | | 24 | | [Activity("Elsa", "Composition", "Create new workflow instances for each item in the data source and dispatch them for e |
| | | 25 | | [FlowNode("Completed", "Canceled", "Done")] |
| | | 26 | | [UsedImplicitly] |
| | | 27 | | public class BulkDispatchWorkflows : Activity |
| | | 28 | | { |
| | | 29 | | private const string DispatchedInstancesCountKey = nameof(DispatchedInstancesCountKey); |
| | | 30 | | private const string CompletedInstancesCountKey = nameof(CompletedInstancesCountKey); |
| | | 31 | | |
| | | 32 | | /// <inheritdoc /> |
| | 145 | 33 | | public BulkDispatchWorkflows([CallerFilePath] string? source = null, [CallerLineNumber] int? line = null) : base(sou |
| | | 34 | | { |
| | 145 | 35 | | } |
| | | 36 | | |
| | | 37 | | /// <summary> |
| | | 38 | | /// The definition ID of the workflows to dispatch. |
| | | 39 | | /// </summary> |
| | | 40 | | [Input( |
| | | 41 | | DisplayName = "Workflow Definition", |
| | | 42 | | Description = "The definition ID of the workflows to dispatch.", |
| | | 43 | | UIHint = InputUIHints.WorkflowDefinitionPicker |
| | | 44 | | )] |
| | 536 | 45 | | public Input<string> WorkflowDefinitionId { get; set; } = null!; |
| | | 46 | | |
| | | 47 | | /// <summary> |
| | | 48 | | /// The data source to use for dispatching the workflows. |
| | | 49 | | /// </summary> |
| | | 50 | | [Input(Description = "The data source to use for dispatching the workflows.")] |
| | 527 | 51 | | public Input<object> Items { get; set; } = null!; |
| | | 52 | | |
| | | 53 | | /// <summary> |
| | | 54 | | /// The default key to use for the item input. Will not be used if the Items contain a list of dictionaries. |
| | | 55 | | /// </summary> |
| | | 56 | | [Input(Description = "The default key to use for the input name when sending the current item to the dispatched work |
| | 535 | 57 | | public Input<string> DefaultItemInputKey { get; set; } = new("Item"); |
| | | 58 | | |
| | | 59 | | /// <summary> |
| | | 60 | | /// The correlation ID to associate the workflow with. |
| | | 61 | | /// </summary> |
| | | 62 | | [Input( |
| | | 63 | | DisplayName = "Correlation ID Function", |
| | | 64 | | Description = "A function to compute the correlation ID to associate a dispatched workflow with.", |
| | | 65 | | AutoEvaluate = false)] |
| | 392 | 66 | | public Input<string?>? CorrelationIdFunction { get; set; } |
| | | 67 | | |
| | | 68 | | /// <summary> |
| | | 69 | | /// The input to send to the workflows. |
| | | 70 | | /// </summary> |
| | | 71 | | [Input(Description = "Additional input to send to the workflows being dispatched.")] |
| | 390 | 72 | | public Input<IDictionary<string, object>?> Input { get; set; } = null!; |
| | | 73 | | |
| | | 74 | | /// <summary> |
| | | 75 | | /// True to wait for the child workflow to complete before completing this activity, false to "fire and forget". |
| | | 76 | | /// </summary> |
| | | 77 | | [Input( |
| | | 78 | | Description = "Wait for the dispatched workflows to complete before completing this activity.", |
| | | 79 | | DefaultValue = true)] |
| | 669 | 80 | | public Input<bool> WaitForCompletion { get; set; } = new(true); |
| | | 81 | | |
| | | 82 | | /// <summary> |
| | | 83 | | /// Indicates whether a new trace context should be started for the workflow execution. |
| | | 84 | | /// </summary> |
| | | 85 | | [Input(Description = "Start a new trace context when using Open Telemetry.", Category = "Open Telemetry")] |
| | 527 | 86 | | public Input<bool> StartNewTrace { get; set; } = new(false); |
| | | 87 | | |
| | | 88 | | /// <summary> |
| | | 89 | | /// The channel to dispatch the workflow to. |
| | | 90 | | /// </summary> |
| | | 91 | | [Input( |
| | | 92 | | DisplayName = "Channel", |
| | | 93 | | Description = "The channel to dispatch the workflow to.", |
| | | 94 | | UIHint = InputUIHints.DropDown, |
| | | 95 | | UIHandler = typeof(DispatcherChannelOptionsProvider) |
| | | 96 | | )] |
| | 390 | 97 | | public Input<string?> ChannelName { get; set; } = null!; |
| | | 98 | | |
| | | 99 | | /// <summary> |
| | | 100 | | /// An activity to execute when the child workflow finishes. |
| | | 101 | | /// </summary> |
| | 440 | 102 | | [Port] public IActivity? ChildCompleted { get; set; } |
| | | 103 | | |
| | | 104 | | /// <summary> |
| | | 105 | | /// An activity to execute when the child workflow faults. |
| | | 106 | | /// </summary> |
| | 437 | 107 | | [Port] public IActivity? ChildFaulted { get; set; } |
| | | 108 | | |
| | | 109 | | /// <inheritdoc /> |
| | | 110 | | protected override async ValueTask ExecuteAsync(ActivityExecutionContext context) |
| | | 111 | | { |
| | 7 | 112 | | var waitForCompletion = WaitForCompletion.GetOrDefault(context); |
| | 7 | 113 | | var startNewTrace = StartNewTrace.GetOrDefault(context); |
| | 7 | 114 | | var items = await context.GetItemSource<object>(Items).ToListAsync(context.CancellationToken); |
| | 7 | 115 | | var count = items.Count; |
| | | 116 | | |
| | | 117 | | // Dispatch the child workflows. |
| | 45 | 118 | | foreach (var item in items) |
| | 16 | 119 | | await DispatchChildWorkflowAsync(context, item, waitForCompletion, startNewTrace); |
| | | 120 | | |
| | | 121 | | // Store the number of dispatched instances for tracking. |
| | 6 | 122 | | context.SetProperty(DispatchedInstancesCountKey, count); |
| | | 123 | | |
| | | 124 | | // If we need to wait for the child workflows to complete (if any), create a bookmark. |
| | 6 | 125 | | if (waitForCompletion && count > 0) |
| | | 126 | | { |
| | 4 | 127 | | var workflowInstanceId = context.WorkflowExecutionContext.Id; |
| | 4 | 128 | | var bookmarkOptions = new CreateBookmarkArgs |
| | 4 | 129 | | { |
| | 4 | 130 | | Callback = OnChildWorkflowCompletedAsync, |
| | 4 | 131 | | Stimulus = new BulkDispatchWorkflowsStimulus(workflowInstanceId) |
| | 4 | 132 | | { |
| | 4 | 133 | | ParentInstanceId = context.WorkflowExecutionContext.Id, |
| | 4 | 134 | | ScheduledInstanceIdsCount = count |
| | 4 | 135 | | }, |
| | 4 | 136 | | IncludeActivityInstanceId = false, |
| | 4 | 137 | | AutoBurn = false, |
| | 4 | 138 | | }; |
| | | 139 | | |
| | 4 | 140 | | context.CreateBookmark(bookmarkOptions); |
| | | 141 | | } |
| | | 142 | | else |
| | | 143 | | { |
| | | 144 | | // Otherwise, we can complete immediately. |
| | 2 | 145 | | await context.CompleteActivityWithOutcomesAsync("Done"); |
| | | 146 | | } |
| | 6 | 147 | | } |
| | | 148 | | |
| | | 149 | | private async ValueTask<string> DispatchChildWorkflowAsync(ActivityExecutionContext context, object item, bool waitF |
| | | 150 | | { |
| | 16 | 151 | | var workflowDefinitionId = WorkflowDefinitionId.Get(context); |
| | 16 | 152 | | var workflowDefinitionService = context.GetRequiredService<IWorkflowDefinitionService>(); |
| | 16 | 153 | | var workflowGraph = await workflowDefinitionService.FindWorkflowGraphAsync(workflowDefinitionId, VersionOptions. |
| | | 154 | | |
| | 16 | 155 | | if (workflowGraph == null) |
| | 1 | 156 | | throw new($"No published version of workflow definition with ID {workflowDefinitionId} found."); |
| | | 157 | | |
| | 15 | 158 | | var parentInstanceId = context.WorkflowExecutionContext.Id; |
| | 15 | 159 | | var input = Input.GetOrDefault(context) ?? new Dictionary<string, object>(); |
| | 15 | 160 | | var channelName = ChannelName.GetOrDefault(context); |
| | 15 | 161 | | var defaultInputItemKey = DefaultItemInputKey.GetOrDefault(context, () => "Item")!; |
| | 15 | 162 | | var properties = new Dictionary<string, object> |
| | 15 | 163 | | { |
| | 15 | 164 | | ["ParentInstanceId"] = parentInstanceId |
| | 15 | 165 | | }; |
| | | 166 | | |
| | 27 | 167 | | if (waitForCompletion) properties["WaitForCompletion"] = true; |
| | 15 | 168 | | if (startNewTrace) properties["StartNewTrace"] = true; |
| | | 169 | | |
| | 15 | 170 | | var itemDictionary = new Dictionary<string, object> |
| | 15 | 171 | | { |
| | 15 | 172 | | [defaultInputItemKey] = item |
| | 15 | 173 | | }; |
| | | 174 | | |
| | 15 | 175 | | var evaluatorOptions = new ExpressionEvaluatorOptions |
| | 15 | 176 | | { |
| | 15 | 177 | | Arguments = itemDictionary |
| | 15 | 178 | | }; |
| | | 179 | | |
| | 15 | 180 | | var inputDictionary = item as IDictionary<string, object> ?? itemDictionary; |
| | 15 | 181 | | input["ParentInstanceId"] = parentInstanceId; |
| | 15 | 182 | | input.Merge(inputDictionary); |
| | | 183 | | |
| | 15 | 184 | | var workflowDispatcher = context.GetRequiredService<IWorkflowDispatcher>(); |
| | 15 | 185 | | var identityGenerator = context.GetRequiredService<IIdentityGenerator>(); |
| | 15 | 186 | | var evaluator = context.GetRequiredService<IExpressionEvaluator>(); |
| | 15 | 187 | | var correlationId = CorrelationIdFunction != null ? await evaluator.EvaluateAsync<string>(CorrelationIdFunction! |
| | 15 | 188 | | var instanceId = identityGenerator.GenerateId(); |
| | 15 | 189 | | var request = new DispatchWorkflowDefinitionRequest(workflowGraph.Workflow.Identity.Id) |
| | 15 | 190 | | { |
| | 15 | 191 | | ParentWorkflowInstanceId = parentInstanceId, |
| | 15 | 192 | | Input = input, |
| | 15 | 193 | | Properties = properties, |
| | 15 | 194 | | CorrelationId = correlationId, |
| | 15 | 195 | | InstanceId = instanceId |
| | 15 | 196 | | }; |
| | 15 | 197 | | var options = new DispatchWorkflowOptions |
| | 15 | 198 | | { |
| | 15 | 199 | | Channel = channelName |
| | 15 | 200 | | }; |
| | | 201 | | |
| | 15 | 202 | | await workflowDispatcher.DispatchAsync(request, options, context.CancellationToken); |
| | 15 | 203 | | return instanceId; |
| | 15 | 204 | | } |
| | | 205 | | |
| | | 206 | | private async ValueTask OnChildWorkflowCompletedAsync(ActivityExecutionContext context) |
| | | 207 | | { |
| | 12 | 208 | | var input = context.WorkflowInput; |
| | 12 | 209 | | var workflowInstanceId = input["WorkflowInstanceId"].ConvertTo<string>()!; |
| | 12 | 210 | | var workflowSubStatus = input["WorkflowSubStatus"].ConvertTo<WorkflowSubStatus>(); |
| | 12 | 211 | | var finishedInstancesCount = context.GetProperty<long>(CompletedInstancesCountKey) + 1; |
| | | 212 | | |
| | 12 | 213 | | context.SetProperty(CompletedInstancesCountKey, finishedInstancesCount); |
| | | 214 | | |
| | 12 | 215 | | var childInstanceId = new Variable<string>("ChildInstanceId", workflowInstanceId) |
| | 12 | 216 | | { |
| | 12 | 217 | | StorageDriverType = typeof(WorkflowInstanceStorageDriver) |
| | 12 | 218 | | }; |
| | | 219 | | |
| | 12 | 220 | | var variables = new List<Variable> |
| | 12 | 221 | | { |
| | 12 | 222 | | childInstanceId |
| | 12 | 223 | | }; |
| | | 224 | | |
| | 12 | 225 | | var options = new ScheduleWorkOptions |
| | 12 | 226 | | { |
| | 12 | 227 | | Input = input, |
| | 12 | 228 | | Variables = variables, |
| | 12 | 229 | | CompletionCallback = OnChildFinishedCompletedAsync |
| | 12 | 230 | | }; |
| | | 231 | | |
| | 12 | 232 | | switch (workflowSubStatus) |
| | | 233 | | { |
| | 3 | 234 | | case WorkflowSubStatus.Faulted when ChildFaulted is not null: |
| | 3 | 235 | | await context.ScheduleActivityAsync(ChildFaulted, options); |
| | 3 | 236 | | return; |
| | 9 | 237 | | case WorkflowSubStatus.Finished when ChildCompleted is not null: |
| | 0 | 238 | | await context.ScheduleActivityAsync(ChildCompleted, options); |
| | 0 | 239 | | return; |
| | | 240 | | default: |
| | 9 | 241 | | await AttemptToCompleteAsync(context); |
| | | 242 | | break; |
| | | 243 | | } |
| | 12 | 244 | | } |
| | | 245 | | |
| | | 246 | | private async ValueTask OnChildFinishedCompletedAsync(ActivityCompletedContext context) |
| | | 247 | | { |
| | 3 | 248 | | await AttemptToCompleteAsync(context.TargetContext); |
| | 3 | 249 | | } |
| | | 250 | | |
| | | 251 | | private async ValueTask AttemptToCompleteAsync(ActivityExecutionContext context) |
| | | 252 | | { |
| | 12 | 253 | | var dispatchedInstancesCount = context.GetProperty<long>(DispatchedInstancesCountKey); |
| | 12 | 254 | | var finishedInstancesCount = context.GetProperty<long>(CompletedInstancesCountKey); |
| | | 255 | | |
| | 12 | 256 | | if (finishedInstancesCount >= dispatchedInstancesCount) |
| | 4 | 257 | | await context.CompleteActivityWithOutcomesAsync("Completed", "Done"); |
| | 12 | 258 | | } |
| | | 259 | | } |