< Summary

Information
Class: Elsa.Workflows.Activities.ParallelForEach<T>
Assembly: Elsa.Workflows.Core
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Core/Activities/ParallelForEachT.cs
Line coverage
91%
Covered lines: 43
Uncovered lines: 4
Coverable lines: 47
Total lines: 125
Line coverage: 91.4%
Branch coverage
100%
Covered branches: 8
Total branches: 8
Branch coverage: 100%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%210%
.ctor(...)100%210%
.ctor(...)100%11100%
.ctor(...)100%11100%
.ctor(...)100%11100%
get_Items()100%11100%
get_Body()100%11100%
ExecuteAsync()100%44100%
OnChildCompleted()100%22100%
GetTagList(...)100%22100%
SetTagList(...)100%11100%

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Core/Activities/ParallelForEachT.cs

#LineLine coverage
 1using System.Runtime.CompilerServices;
 2using System.Text.Json;
 3using System.Text.Json.Nodes;
 4using Elsa.Expressions.Helpers;
 5using Elsa.Expressions.Models;
 6using Elsa.Extensions;
 7using Elsa.Workflows.Attributes;
 8using Elsa.Workflows.Memory;
 9using Elsa.Workflows.Models;
 10
 11namespace Elsa.Workflows.Activities;
 12
 13/// <summary>
 14/// Schedule an activity for each item in parallel.
 15/// </summary>
 16/// <typeparam name="T"></typeparam>
 17[Activity("Elsa", "Looping", "Schedule an activity for each item in parallel.")]
 18public class ParallelForEach<T> : Activity
 19{
 20    private const string ScheduledTagsProperty = nameof(ScheduledTagsProperty);
 21    private const string CompletedTagsProperty = nameof(CompletedTagsProperty);
 22
 23    /// <inheritdoc />
 024    public ParallelForEach(Func<ExpressionExecutionContext, ICollection<T>> @delegate, [CallerFilePath] string? source =
 25    {
 026    }
 27
 28    /// <inheritdoc />
 029    public ParallelForEach(Func<ICollection<T>> @delegate, [CallerFilePath] string? source = null, [CallerLineNumber] in
 30    {
 031    }
 32
 33    /// <inheritdoc />
 1434    public ParallelForEach(ICollection<T> items, [CallerFilePath] string? source = null, [CallerLineNumber] int? line = 
 35    {
 1436    }
 37
 38    /// <inheritdoc />
 1439    public ParallelForEach(Input<object> items, [CallerFilePath] string? source = null, [CallerLineNumber] int? line = n
 40    {
 1441        Items = items;
 1442    }
 43
 44    /// <inheritdoc />
 1645    public ParallelForEach([CallerFilePath] string? source = null, [CallerLineNumber] int? line = null) : base(source, l
 46    {
 1647    }
 48
 49    /// <summary>
 50    /// The items to iterate.
 51    /// </summary>
 52    [Input(Description = "The items to iterate through.")]
 6553    public Input<object> Items { get; set; } = new(Array.Empty<T>());
 54
 55    /// <summary>
 56    /// The <see cref="IActivity"/> to execute each iteration.
 57    /// </summary>
 58    [Port]
 5659    public IActivity Body { get; set; } = null!;
 60
 61    /// <inheritdoc />
 62    protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
 63    {
 1464        var items = context.GetItemSource<T>(Items);
 1465        var tags = new List<Guid>();
 1466        var currentIndex = 0;
 67
 8468        await foreach (var item in items)
 69        {
 70            // For each item, declare a new variable for the work to be scheduled.
 2871            var currentValueVariable = new Variable<T>("CurrentValue", item)
 2872            {
 2873                // TODO: This should be configurable, because this won't work for e.g. file streams and other non-serial
 2874                StorageDriverType = typeof(WorkflowInstanceStorageDriver)
 2875            };
 76
 2877            var currentIndexVariable = new Variable<int>("CurrentIndex", currentIndex++) { StorageDriverType = typeof(Wo
 2878            var variables = new List<Variable> { currentValueVariable, currentIndexVariable };
 79
 80            // Schedule a body of work for each item.
 2881            var tag = Guid.NewGuid();
 2882            tags.Add(tag);
 2883            await context.ScheduleActivityAsync(Body, OnChildCompleted, tag, variables);
 84        }
 85
 1486        SetTagList(context, ScheduledTagsProperty, tags);
 1487        SetTagList(context, CompletedTagsProperty, new List<Guid>());
 88
 89        // If there were no items, we're done.
 1490        if (tags.Count == 0)
 391            await context.CompleteActivityAsync();
 1492    }
 93
 94    private async ValueTask OnChildCompleted(ActivityCompletedContext context)
 95    {
 1696        var targetContext = context.TargetContext;
 1697        var scheduledTags = GetTagList(targetContext, ScheduledTagsProperty);
 1698        var completedTag = targetContext.Tag.ConvertTo<Guid>();
 1699        var completedTags = GetTagList(targetContext, CompletedTagsProperty);
 100
 16101        completedTags.Add(completedTag);
 16102        SetTagList(targetContext, CompletedTagsProperty, completedTags);
 103
 104        // If not all scheduled activities have completed yet, we're not done yet.
 16105        if (!scheduledTags.IsEqualTo(completedTags))
 11106            return;
 107
 108        // We're done, so complete the activity.
 5109        await targetContext.CompleteActivityAsync();
 16110    }
 111
 112    private ICollection<Guid> GetTagList(ActivityExecutionContext context, string propertyName)
 113    {
 114        // Read the list of tags from the context using the specified property name. The value is stored as JsonArray, s
 32115        var jsonArray = context.GetProperty<JsonArray>(propertyName)!;
 98116        return jsonArray.Select(x => x.ConvertTo<Guid>()).ToList();
 117    }
 118
 119    private void SetTagList(ActivityExecutionContext context, string propertyName, ICollection<Guid> tags)
 120    {
 121        // Serialize the list of tags to a JsonArray and store it in the context using the specified property name.
 44122        var jsonArray = JsonSerializer.SerializeToNode(tags) as JsonArray;
 44123        context.SetProperty(propertyName, jsonArray);
 44124    }
 125}