< Summary

Information
Class: Elsa.Workflows.Runtime.Activities.BulkDispatchWorkflows
Assembly: Elsa.Workflows.Runtime
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Activities/BulkDispatchWorkflows.cs
Line coverage
98%
Covered lines: 115
Uncovered lines: 2
Coverable lines: 117
Total lines: 260
Line coverage: 98.2%
Branch coverage
89%
Covered branches: 25
Total branches: 28
Branch coverage: 89.2%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
get_WorkflowDefinitionId()100%11100%
get_Items()100%11100%
get_DefaultItemInputKey()100%11100%
get_CorrelationIdFunction()100%11100%
get_Input()100%11100%
get_WaitForCompletion()100%11100%
get_StartNewTrace()100%11100%
get_ChannelName()100%11100%
get_ChildCompleted()100%11100%
get_ChildFaulted()100%11100%
ExecuteAsync()100%66100%
DispatchChildWorkflowAsync()83.33%1212100%
OnChildWorkflowCompletedAsync()87.5%8892.85%
OnChildFinishedCompletedAsync()100%11100%
AttemptToCompleteAsync()100%22100%

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Activities/BulkDispatchWorkflows.cs

#LineLine coverage
 1using System.Runtime.CompilerServices;
 2using Elsa.Common.Models;
 3using Elsa.Expressions.Contracts;
 4using Elsa.Expressions.Helpers;
 5using Elsa.Expressions.Models;
 6using Elsa.Extensions;
 7using Elsa.Workflows.Activities.Flowchart.Attributes;
 8using Elsa.Workflows.Attributes;
 9using Elsa.Workflows.Management;
 10using Elsa.Workflows.Memory;
 11using Elsa.Workflows.Models;
 12using Elsa.Workflows.Options;
 13using Elsa.Workflows.Runtime.Requests;
 14using Elsa.Workflows.Runtime.Stimuli;
 15using Elsa.Workflows.Runtime.UIHints;
 16using Elsa.Workflows.UIHints;
 17using JetBrains.Annotations;
 18
 19namespace Elsa.Workflows.Runtime.Activities;
 20
 21/// <summary>
 22/// Creates new workflow instances of the specified workflow for each item in the data source and dispatches them for ex
 23/// </summary>
 24[Activity("Elsa", "Composition", "Create new workflow instances for each item in the data source and dispatch them for e
 25[FlowNode("Completed", "Canceled", "Done")]
 26[UsedImplicitly]
 27public class BulkDispatchWorkflows : Activity
 28{
 29    private const string DispatchedInstancesCountKey = nameof(DispatchedInstancesCountKey);
 30    private const string CompletedInstancesCountKey = nameof(CompletedInstancesCountKey);
 31
 32    /// <inheritdoc />
 45733    public BulkDispatchWorkflows([CallerFilePath] string? source = null, [CallerLineNumber] int? line = null) : base(sou
 34    {
 45735    }
 36
 37    /// <summary>
 38    /// The definition ID of the workflows to dispatch.
 39    /// </summary>
 40    [Input(
 41        DisplayName = "Workflow Definition",
 42        Description = "The definition ID of the workflows to dispatch.",
 43        UIHint = InputUIHints.WorkflowDefinitionPicker
 44    )]
 170245    public Input<string> WorkflowDefinitionId { get; set; } = null!;
 46
 47    /// <summary>
 48    /// The data source to use for dispatching the workflows.
 49    /// </summary>
 50    [Input(Description = "The data source to use for dispatching the workflows.")]
 169151    public Input<object> Items { get; set; } = null!;
 52
 53    /// <summary>
 54    /// The default key to use for the item input. Will not be used if the Items contain a list of dictionaries.
 55    /// </summary>
 56    [Input(Description = "The default key to use for the input name when sending the current item to the dispatched work
 170157    public Input<string> DefaultItemInputKey { get; set; } = new("Item");
 58
 59    /// <summary>
 60    /// The correlation ID to associate the workflow with.
 61    /// </summary>
 62    [Input(
 63        DisplayName = "Correlation ID Function",
 64        Description = "A function to compute the correlation ID to associate a dispatched workflow with.",
 65        AutoEvaluate = false)]
 128366    public Input<string?>? CorrelationIdFunction { get; set; }
 67
 68    /// <summary>
 69    /// The input to send to the workflows.
 70    /// </summary>
 71    [Input(Description = "Additional input to send to the workflows being dispatched.")]
 125072    public Input<IDictionary<string, object>?> Input { get; set; } = null!;
 73
 74    /// <summary>
 75    /// True to wait for the child workflow to complete before completing this activity, false to "fire and forget".
 76    /// </summary>
 77    [Input(
 78        Description = "Wait for the dispatched workflows to complete before completing this activity.",
 79        DefaultValue = true)]
 214580    public Input<bool> WaitForCompletion { get; set; } = new(true);
 81
 82    /// <summary>
 83    /// Indicates whether a new trace context should be started for the workflow execution.
 84    /// </summary>
 85    [Input(Description = "Start a new trace context when using Open Telemetry.", Category = "Open Telemetry")]
 169186    public Input<bool> StartNewTrace { get; set; } = new(false);
 87
 88    /// <summary>
 89    /// The channel to dispatch the workflow to.
 90    /// </summary>
 91    [Input(
 92        DisplayName = "Channel",
 93        Description = "The channel to dispatch the workflow to.",
 94        UIHint = InputUIHints.DropDown,
 95        UIHandler = typeof(DispatcherChannelOptionsProvider)
 96    )]
 124497    public Input<string?> ChannelName { get; set; } = null!;
 98
 99    /// <summary>
 100    /// An activity to execute when the child workflow finishes.
 101    /// </summary>
 1417102    [Port] public IActivity? ChildCompleted { get; set; }
 103
 104    /// <summary>
 105    /// An activity to execute when the child workflow faults.
 106    /// </summary>
 1414107    [Port] public IActivity? ChildFaulted { get; set; }
 108
 109    /// <inheritdoc />
 110    protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
 111    {
 8112        var waitForCompletion = WaitForCompletion.GetOrDefault(context);
 8113        var startNewTrace = StartNewTrace.GetOrDefault(context);
 8114        var items = await context.GetItemSource<object>(Items).ToListAsync(context.CancellationToken);
 8115        var count = items.Count;
 116
 117        // Dispatch the child workflows.
 53118        foreach (var item in items)
 19119            await DispatchChildWorkflowAsync(context, item, waitForCompletion, startNewTrace);
 120
 121        // Store the number of dispatched instances for tracking.
 7122        context.SetProperty(DispatchedInstancesCountKey, count);
 123
 124        // If we need to wait for the child workflows to complete (if any), create a bookmark.
 7125        if (waitForCompletion && count > 0)
 126        {
 4127            var workflowInstanceId = context.WorkflowExecutionContext.Id;
 4128            var bookmarkOptions = new CreateBookmarkArgs
 4129            {
 4130                Callback = OnChildWorkflowCompletedAsync,
 4131                Stimulus = new BulkDispatchWorkflowsStimulus(workflowInstanceId)
 4132                {
 4133                    ParentInstanceId = context.WorkflowExecutionContext.Id,
 4134                    ScheduledInstanceIdsCount = count
 4135                },
 4136                IncludeActivityInstanceId = false,
 4137                AutoBurn = false,
 4138            };
 139
 4140            context.CreateBookmark(bookmarkOptions);
 141        }
 142        else
 143        {
 144            // Otherwise, we can complete immediately.
 3145            await context.CompleteActivityWithOutcomesAsync("Done");
 146        }
 7147    }
 148
 149    private async ValueTask<string> DispatchChildWorkflowAsync(ActivityExecutionContext context, object item, bool waitF
 150    {
 19151        var workflowDefinitionId = WorkflowDefinitionId.Get(context);
 19152        var workflowDefinitionService = context.GetRequiredService<IWorkflowDefinitionService>();
 19153        var workflowGraph = await workflowDefinitionService.FindWorkflowGraphAsync(workflowDefinitionId, VersionOptions.
 154
 19155        if (workflowGraph == null)
 1156            throw new($"No published version of workflow definition with ID {workflowDefinitionId} found.");
 157
 18158        var parentInstanceId = context.WorkflowExecutionContext.Id;
 18159        var baseInput = Input.GetOrDefault(context);
 18160        var input = baseInput != null ? new Dictionary<string, object>(baseInput) : new Dictionary<string, object>();
 18161        var channelName = ChannelName.GetOrDefault(context);
 18162        var defaultInputItemKey = DefaultItemInputKey.GetOrDefault(context, () => "Item")!;
 18163        var properties = new Dictionary<string, object>
 18164        {
 18165            ["ParentInstanceId"] = parentInstanceId
 18166        };
 167
 30168        if (waitForCompletion) properties["WaitForCompletion"] = true;
 18169        if (startNewTrace) properties["StartNewTrace"] = true;
 170
 18171        var itemDictionary = new Dictionary<string, object>
 18172        {
 18173            [defaultInputItemKey] = item
 18174        };
 175
 18176        var evaluatorOptions = new ExpressionEvaluatorOptions
 18177        {
 18178            Arguments = itemDictionary
 18179        };
 180
 18181        var inputDictionary = item as IDictionary<string, object> ?? itemDictionary;
 18182        input["ParentInstanceId"] = parentInstanceId;
 18183        input.Merge(inputDictionary);
 184
 18185        var workflowDispatcher = context.GetRequiredService<IWorkflowDispatcher>();
 18186        var identityGenerator = context.GetRequiredService<IIdentityGenerator>();
 18187        var evaluator = context.GetRequiredService<IExpressionEvaluator>();
 18188        var correlationId = CorrelationIdFunction != null ? await evaluator.EvaluateAsync<string>(CorrelationIdFunction!
 18189        var instanceId = identityGenerator.GenerateId();
 18190        var request = new DispatchWorkflowDefinitionRequest(workflowGraph.Workflow.Identity.Id)
 18191        {
 18192            ParentWorkflowInstanceId = parentInstanceId,
 18193            Input = input,
 18194            Properties = properties,
 18195            CorrelationId = correlationId,
 18196            InstanceId = instanceId
 18197        };
 18198        var options = new DispatchWorkflowOptions
 18199        {
 18200            Channel = channelName
 18201        };
 202
 18203        await workflowDispatcher.DispatchAsync(request, options, context.CancellationToken);
 18204        return instanceId;
 18205    }
 206
 207    private async ValueTask OnChildWorkflowCompletedAsync(ActivityExecutionContext context)
 208    {
 12209        var input = context.WorkflowInput;
 12210        var workflowInstanceId = input["WorkflowInstanceId"].ConvertTo<string>()!;
 12211        var workflowSubStatus = input["WorkflowSubStatus"].ConvertTo<WorkflowSubStatus>();
 12212        var finishedInstancesCount = context.GetProperty<long>(CompletedInstancesCountKey) + 1;
 213
 12214        context.SetProperty(CompletedInstancesCountKey, finishedInstancesCount);
 215
 12216        var childInstanceId = new Variable<string>("ChildInstanceId", workflowInstanceId)
 12217        {
 12218            StorageDriverType = typeof(WorkflowInstanceStorageDriver)
 12219        };
 220
 12221        var variables = new List<Variable>
 12222        {
 12223            childInstanceId
 12224        };
 225
 12226        var options = new ScheduleWorkOptions
 12227        {
 12228            Input = input,
 12229            Variables = variables,
 12230            CompletionCallback = OnChildFinishedCompletedAsync
 12231        };
 232
 12233        switch (workflowSubStatus)
 234        {
 3235            case WorkflowSubStatus.Faulted when ChildFaulted is not null:
 3236                await context.ScheduleActivityAsync(ChildFaulted, options);
 3237                return;
 9238            case WorkflowSubStatus.Finished when ChildCompleted is not null:
 0239                await context.ScheduleActivityAsync(ChildCompleted, options);
 0240                return;
 241            default:
 9242                await AttemptToCompleteAsync(context);
 243                break;
 244        }
 12245    }
 246
 247    private async ValueTask OnChildFinishedCompletedAsync(ActivityCompletedContext context)
 248    {
 3249        await AttemptToCompleteAsync(context.TargetContext);
 3250    }
 251
 252    private async ValueTask AttemptToCompleteAsync(ActivityExecutionContext context)
 253    {
 12254        var dispatchedInstancesCount = context.GetProperty<long>(DispatchedInstancesCountKey);
 12255        var finishedInstancesCount = context.GetProperty<long>(CompletedInstancesCountKey);
 256
 12257        if (finishedInstancesCount >= dispatchedInstancesCount)
 4258            await context.CompleteActivityWithOutcomesAsync("Completed", "Done");
 12259    }
 260}