< Summary

Information
Class: Elsa.Workflows.Runtime.Activities.BulkDispatchWorkflows
Assembly: Elsa.Workflows.Runtime
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Activities/BulkDispatchWorkflows.cs
Line coverage
98%
Covered lines: 114
Uncovered lines: 2
Coverable lines: 116
Total lines: 259
Line coverage: 98.2%
Branch coverage
92%
Covered branches: 26
Total branches: 28
Branch coverage: 92.8%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
.ctor(...) | 100% | 1 | 1 | 100%
get_WorkflowDefinitionId() | 100% | 1 | 1 | 100%
get_Items() | 100% | 1 | 1 | 100%
get_DefaultItemInputKey() | 100% | 1 | 1 | 100%
get_CorrelationIdFunction() | 100% | 1 | 1 | 100%
get_Input() | 100% | 1 | 1 | 100%
get_WaitForCompletion() | 100% | 1 | 1 | 100%
get_StartNewTrace() | 100% | 1 | 1 | 100%
get_ChannelName() | 100% | 1 | 1 | 100%
get_ChildCompleted() | 100% | 1 | 1 | 100%
get_ChildFaulted() | 100% | 1 | 1 | 100%
ExecuteAsync() | 100% | 6 | 6 | 100%
DispatchChildWorkflowAsync() | 91.66% | 12 | 12 | 100%
OnChildWorkflowCompletedAsync() | 87.5% | 8 | 8 | 92.85%
OnChildFinishedCompletedAsync() | 100% | 1 | 1 | 100%
AttemptToCompleteAsync() | 100% | 2 | 2 | 100%

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Activities/BulkDispatchWorkflows.cs

# | Line | Line coverage
 1using System.Runtime.CompilerServices;
 2using Elsa.Common.Models;
 3using Elsa.Expressions.Contracts;
 4using Elsa.Expressions.Helpers;
 5using Elsa.Expressions.Models;
 6using Elsa.Extensions;
 7using Elsa.Workflows.Activities.Flowchart.Attributes;
 8using Elsa.Workflows.Attributes;
 9using Elsa.Workflows.Management;
 10using Elsa.Workflows.Memory;
 11using Elsa.Workflows.Models;
 12using Elsa.Workflows.Options;
 13using Elsa.Workflows.Runtime.Requests;
 14using Elsa.Workflows.Runtime.Stimuli;
 15using Elsa.Workflows.Runtime.UIHints;
 16using Elsa.Workflows.UIHints;
 17using JetBrains.Annotations;
 18
 19namespace Elsa.Workflows.Runtime.Activities;
 20
 21/// <summary>
 22/// Creates new workflow instances of the specified workflow for each item in the data source and dispatches them for ex
 23/// </summary>
 24[Activity("Elsa", "Composition", "Create new workflow instances for each item in the data source and dispatch them for e
 25[FlowNode("Completed", "Canceled", "Done")]
 26[UsedImplicitly]
 27public class BulkDispatchWorkflows : Activity
 28{
 29    private const string DispatchedInstancesCountKey = nameof(DispatchedInstancesCountKey);
 30    private const string CompletedInstancesCountKey = nameof(CompletedInstancesCountKey);
 31
 32    /// <inheritdoc />
 14533    public BulkDispatchWorkflows([CallerFilePath] string? source = null, [CallerLineNumber] int? line = null) : base(sou
 34    {
 14535    }
 36
 37    /// <summary>
 38    /// The definition ID of the workflows to dispatch.
 39    /// </summary>
 40    [Input(
 41        DisplayName = "Workflow Definition",
 42        Description = "The definition ID of the workflows to dispatch.",
 43        UIHint = InputUIHints.WorkflowDefinitionPicker
 44    )]
 53645    public Input<string> WorkflowDefinitionId { get; set; } = null!;
 46
 47    /// <summary>
 48    /// The data source to use for dispatching the workflows.
 49    /// </summary>
 50    [Input(Description = "The data source to use for dispatching the workflows.")]
 52751    public Input<object> Items { get; set; } = null!;
 52
 53    /// <summary>
 54    /// The default key to use for the item input. Will not be used if the Items contain a list of dictionaries.
 55    /// </summary>
 56    [Input(Description = "The default key to use for the input name when sending the current item to the dispatched work
 53557    public Input<string> DefaultItemInputKey { get; set; } = new("Item");
 58
 59    /// <summary>
 60    /// The correlation ID to associate the workflow with.
 61    /// </summary>
 62    [Input(
 63        DisplayName = "Correlation ID Function",
 64        Description = "A function to compute the correlation ID to associate a dispatched workflow with.",
 65        AutoEvaluate = false)]
 39266    public Input<string?>? CorrelationIdFunction { get; set; }
 67
 68    /// <summary>
 69    /// The input to send to the workflows.
 70    /// </summary>
 71    [Input(Description = "Additional input to send to the workflows being dispatched.")]
 39072    public Input<IDictionary<string, object>?> Input { get; set; } = null!;
 73
 74    /// <summary>
 75    /// True to wait for the child workflow to complete before completing this activity, false to "fire and forget".
 76    /// </summary>
 77    [Input(
 78        Description = "Wait for the dispatched workflows to complete before completing this activity.",
 79        DefaultValue = true)]
 66980    public Input<bool> WaitForCompletion { get; set; } = new(true);
 81
 82    /// <summary>
 83    /// Indicates whether a new trace context should be started for the workflow execution.
 84    /// </summary>
 85    [Input(Description = "Start a new trace context when using Open Telemetry.", Category = "Open Telemetry")]
 52786    public Input<bool> StartNewTrace { get; set; } = new(false);
 87
 88    /// <summary>
 89    /// The channel to dispatch the workflow to.
 90    /// </summary>
 91    [Input(
 92        DisplayName = "Channel",
 93        Description = "The channel to dispatch the workflow to.",
 94        UIHint = InputUIHints.DropDown,
 95        UIHandler = typeof(DispatcherChannelOptionsProvider)
 96    )]
 39097    public Input<string?> ChannelName { get; set; } = null!;
 98
 99    /// <summary>
 100    /// An activity to execute when the child workflow finishes.
 101    /// </summary>
 440102    [Port] public IActivity? ChildCompleted { get; set; }
 103
 104    /// <summary>
 105    /// An activity to execute when the child workflow faults.
 106    /// </summary>
 437107    [Port] public IActivity? ChildFaulted { get; set; }
 108
 109    /// <inheritdoc />
 110    protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
 111    {
 7112        var waitForCompletion = WaitForCompletion.GetOrDefault(context);
 7113        var startNewTrace = StartNewTrace.GetOrDefault(context);
 7114        var items = await context.GetItemSource<object>(Items).ToListAsync(context.CancellationToken);
 7115        var count = items.Count;
 116
 117        // Dispatch the child workflows.
 45118        foreach (var item in items)
 16119            await DispatchChildWorkflowAsync(context, item, waitForCompletion, startNewTrace);
 120
 121        // Store the number of dispatched instances for tracking.
 6122        context.SetProperty(DispatchedInstancesCountKey, count);
 123
 124        // If we need to wait for the child workflows to complete (if any), create a bookmark.
 6125        if (waitForCompletion && count > 0)
 126        {
 4127            var workflowInstanceId = context.WorkflowExecutionContext.Id;
 4128            var bookmarkOptions = new CreateBookmarkArgs
 4129            {
 4130                Callback = OnChildWorkflowCompletedAsync,
 4131                Stimulus = new BulkDispatchWorkflowsStimulus(workflowInstanceId)
 4132                {
 4133                    ParentInstanceId = context.WorkflowExecutionContext.Id,
 4134                    ScheduledInstanceIdsCount = count
 4135                },
 4136                IncludeActivityInstanceId = false,
 4137                AutoBurn = false,
 4138            };
 139
 4140            context.CreateBookmark(bookmarkOptions);
 141        }
 142        else
 143        {
 144            // Otherwise, we can complete immediately.
 2145            await context.CompleteActivityWithOutcomesAsync("Done");
 146        }
 6147    }
 148
 149    private async ValueTask<string> DispatchChildWorkflowAsync(ActivityExecutionContext context, object item, bool waitF
 150    {
 16151        var workflowDefinitionId = WorkflowDefinitionId.Get(context);
 16152        var workflowDefinitionService = context.GetRequiredService<IWorkflowDefinitionService>();
 16153        var workflowGraph = await workflowDefinitionService.FindWorkflowGraphAsync(workflowDefinitionId, VersionOptions.
 154
 16155        if (workflowGraph == null)
 1156            throw new($"No published version of workflow definition with ID {workflowDefinitionId} found.");
 157
 15158        var parentInstanceId = context.WorkflowExecutionContext.Id;
 15159        var input = Input.GetOrDefault(context) ?? new Dictionary<string, object>();
 15160        var channelName = ChannelName.GetOrDefault(context);
 15161        var defaultInputItemKey = DefaultItemInputKey.GetOrDefault(context, () => "Item")!;
 15162        var properties = new Dictionary<string, object>
 15163        {
 15164            ["ParentInstanceId"] = parentInstanceId
 15165        };
 166
 27167        if (waitForCompletion) properties["WaitForCompletion"] = true;
 15168        if (startNewTrace) properties["StartNewTrace"] = true;
 169
 15170        var itemDictionary = new Dictionary<string, object>
 15171        {
 15172            [defaultInputItemKey] = item
 15173        };
 174
 15175        var evaluatorOptions = new ExpressionEvaluatorOptions
 15176        {
 15177            Arguments = itemDictionary
 15178        };
 179
 15180        var inputDictionary = item as IDictionary<string, object> ?? itemDictionary;
 15181        input["ParentInstanceId"] = parentInstanceId;
 15182        input.Merge(inputDictionary);
 183
 15184        var workflowDispatcher = context.GetRequiredService<IWorkflowDispatcher>();
 15185        var identityGenerator = context.GetRequiredService<IIdentityGenerator>();
 15186        var evaluator = context.GetRequiredService<IExpressionEvaluator>();
 15187        var correlationId = CorrelationIdFunction != null ? await evaluator.EvaluateAsync<string>(CorrelationIdFunction!
 15188        var instanceId = identityGenerator.GenerateId();
 15189        var request = new DispatchWorkflowDefinitionRequest(workflowGraph.Workflow.Identity.Id)
 15190        {
 15191            ParentWorkflowInstanceId = parentInstanceId,
 15192            Input = input,
 15193            Properties = properties,
 15194            CorrelationId = correlationId,
 15195            InstanceId = instanceId
 15196        };
 15197        var options = new DispatchWorkflowOptions
 15198        {
 15199            Channel = channelName
 15200        };
 201
 15202        await workflowDispatcher.DispatchAsync(request, options, context.CancellationToken);
 15203        return instanceId;
 15204    }
 205
 206    private async ValueTask OnChildWorkflowCompletedAsync(ActivityExecutionContext context)
 207    {
 12208        var input = context.WorkflowInput;
 12209        var workflowInstanceId = input["WorkflowInstanceId"].ConvertTo<string>()!;
 12210        var workflowSubStatus = input["WorkflowSubStatus"].ConvertTo<WorkflowSubStatus>();
 12211        var finishedInstancesCount = context.GetProperty<long>(CompletedInstancesCountKey) + 1;
 212
 12213        context.SetProperty(CompletedInstancesCountKey, finishedInstancesCount);
 214
 12215        var childInstanceId = new Variable<string>("ChildInstanceId", workflowInstanceId)
 12216        {
 12217            StorageDriverType = typeof(WorkflowInstanceStorageDriver)
 12218        };
 219
 12220        var variables = new List<Variable>
 12221        {
 12222            childInstanceId
 12223        };
 224
 12225        var options = new ScheduleWorkOptions
 12226        {
 12227            Input = input,
 12228            Variables = variables,
 12229            CompletionCallback = OnChildFinishedCompletedAsync
 12230        };
 231
 12232        switch (workflowSubStatus)
 233        {
 3234            case WorkflowSubStatus.Faulted when ChildFaulted is not null:
 3235                await context.ScheduleActivityAsync(ChildFaulted, options);
 3236                return;
 9237            case WorkflowSubStatus.Finished when ChildCompleted is not null:
 0238                await context.ScheduleActivityAsync(ChildCompleted, options);
 0239                return;
 240            default:
 9241                await AttemptToCompleteAsync(context);
 242                break;
 243        }
 12244    }
 245
 246    private async ValueTask OnChildFinishedCompletedAsync(ActivityCompletedContext context)
 247    {
 3248        await AttemptToCompleteAsync(context.TargetContext);
 3249    }
 250
 251    private async ValueTask AttemptToCompleteAsync(ActivityExecutionContext context)
 252    {
 12253        var dispatchedInstancesCount = context.GetProperty<long>(DispatchedInstancesCountKey);
 12254        var finishedInstancesCount = context.GetProperty<long>(CompletedInstancesCountKey);
 255
 12256        if (finishedInstancesCount >= dispatchedInstancesCount)
 4257            await context.CompleteActivityWithOutcomesAsync("Completed", "Done");
 12258    }
 259}