< Summary

Line coverage
70%
Covered lines: 164
Uncovered lines: 70
Coverable lines: 234
Total lines: 542
Line coverage: 70%
Branch coverage
79%
Covered branches: 119
Total branches: 150
Branch coverage: 79.3%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Core/Activities/Flowchart/Activities/Flowchart.Counters.cs

#LineLine coverage
 1using Elsa.Extensions;
 2using Elsa.Workflows.Activities.Flowchart.Contracts;
 3using Elsa.Workflows.Activities.Flowchart.Extensions;
 4using Elsa.Workflows.Activities.Flowchart.Models;
 5using Elsa.Workflows.Options;
 6using Elsa.Workflows.Signals;
 7
 8namespace Elsa.Workflows.Activities.Flowchart.Activities;
 9
 10public partial class Flowchart
 11{
 12    private const string ScopeProperty = "FlowScope";
 13    private const string BackwardConnectionActivityInput = "BackwardConnection";
 14
 15    private async ValueTask OnChildCompletedCounterBasedLogicAsync(ActivityCompletedContext context)
 16    {
 11717        var flowchartContext = context.TargetContext;
 11718        var completedActivityContext = context.ChildContext;
 11719        var completedActivity = completedActivityContext.Activity;
 11720        var result = context.Result;
 21
 11722        if (flowchartContext.Activity != this)
 23        {
 024            throw new Exception("Target context activity must be this flowchart");
 25        }
 26
 27        // If the completed activity's status is anything but "Completed", do not schedule its outbound activities.
 11728        if (completedActivityContext.Status != ActivityStatus.Completed)
 29        {
 030            return;
 31        }
 32
 33        // If the complete activity is a terminal node, complete the flowchart immediately.
 11734        if (completedActivity is ITerminalNode)
 35        {
 036            await flowchartContext.CompleteActivityAsync();
 037            return;
 38        }
 39
 40        // Determine the outcomes from the completed activity
 11741        var outcomes = result is Outcomes o ? o : Outcomes.Default;
 42
 43        // Schedule the outbound activities
 11744        var flowGraph = flowchartContext.GetFlowGraph();
 11745        var flowScope = GetFlowScope(flowchartContext);
 11746        var completedActivityExecutedByBackwardConnection = completedActivityContext.ActivityInput.GetValueOrDefault<boo
 11747        bool hasScheduledActivity = await ScheduleOutboundActivitiesAsync(flowGraph, flowScope, flowchartContext, comple
 48
 49        // If there are not any outbound connections, complete the flowchart activity if there is no other pending work
 11750        if (!hasScheduledActivity)
 51        {
 2952            await CompleteIfNoPendingWorkAsync(flowchartContext);
 53        }
 11754    }
 55
 56    private FlowScope GetFlowScope(ActivityExecutionContext context)
 57    {
 15358        return context.GetProperty(ScopeProperty, () => new FlowScope());
 59    }
 60
 61    /// <summary>
 62    /// Schedules outbound activities based on the flowchart's structure and execution state.
 63    /// This method determines whether an activity should be scheduled based on visited connections,
 64    /// forward traversal rules, and backward connections.
 65    /// </summary>
 66    /// <param name="flowGraph">The graph representation of the flowchart.</param>
 67    /// <param name="flowScope">Tracks activity and connection visits.</param>
 68    /// <param name="flowchartContext">The execution context of the flowchart.</param>
 69    /// <param name="activity">The current activity being processed.</param>
 70    /// <param name="outcomes">The outcomes that determine which connections were followed.</param>
 71    /// <param name="completedActivityExecutedByBackwardConnection">Indicates if the completed activity was executed due
 72    /// <returns>True if at least one activity was scheduled; otherwise, false.</returns>
 73    private async ValueTask<bool> ScheduleOutboundActivitiesAsync(FlowGraph flowGraph, FlowScope flowScope, ActivityExec
 74    {
 13375        var hasScheduledActivity = false;
 76
 77        // Check if the activity is dangling (i.e., it is not reachable from the flowchart graph)
 13378        if (flowGraph.IsDanglingActivity(activity))
 79        {
 080            throw new Exception($"Activity {activity.Id} is not reachable from the flowchart graph. Unable to schedule i
 81        }
 82
 83        // Register the activity as visited unless it was executed due to a backward connection
 13384        if (!completedActivityExecutedByBackwardConnection)
 85        {
 13386            flowScope.RegisterActivityVisit(activity);
 87        }
 88
 89        // Process each outbound connection from the current activity
 50290        foreach (var outboundConnection in flowGraph.GetOutboundConnections(activity))
 91        {
 11892            var connectionFollowed = outcomes.Names.Contains(outboundConnection.Source.Port);
 11893            flowScope.RegisterConnectionVisit(outboundConnection, connectionFollowed);
 11894            var outboundActivity = outboundConnection.Target.Activity;
 95
 96            // Determine scheduling strategy based on connection type
 11897            if (flowGraph.IsBackwardConnection(outboundConnection, out var backwardConnectionIsValid))
 98            {
 099                hasScheduledActivity |= await ScheduleBackwardConnectionActivityAsync(flowGraph, flowchartContext, outbo
 100            }
 118101            else if (outboundActivity is not IJoinNode)
 102            {
 94103                hasScheduledActivity |= await ScheduleNonJoinActivityAsync(flowGraph, flowScope, flowchartContext, outbo
 104            }
 105            else
 106            {
 24107                hasScheduledActivity |= await ScheduleJoinActivityAsync(flowGraph, flowScope, flowchartContext, outbound
 108            }
 109        }
 110
 133111        return hasScheduledActivity;
 133112    }
 113
 114    /// <summary>
 115    /// Schedules an outbound activity that originates from a backward connection.
 116    /// </summary>
 117    private async ValueTask<bool> ScheduleBackwardConnectionActivityAsync(FlowGraph flowGraph, ActivityExecutionContext 
 118    {
 0119        if (!connectionFollowed)
 120        {
 0121            return false;
 122        }
 123
 0124        if (!backwardConnectionIsValid)
 125        {
 0126            throw new Exception($"Invalid backward connection: Every path from the source ('{outboundConnection.Source.A
 127        }
 128
 0129        var scheduleWorkOptions = new ScheduleWorkOptions
 0130        {
 0131            CompletionCallback = OnChildCompletedCounterBasedLogicAsync,
 0132            Input = new Dictionary<string, object>()
 0133            {
 0134                {
 0135                    BackwardConnectionActivityInput, true
 0136                }
 0137            }
 0138        };
 139
 0140        await flowchartContext.ScheduleActivityAsync(outboundActivity, scheduleWorkOptions);
 0141        return true;
 0142    }
 143
 144    /// <summary>
 145    /// Schedules a non-join activity if all its forward inbound connections have been visited.
 146    /// </summary>
 147    private async ValueTask<bool> ScheduleNonJoinActivityAsync(FlowGraph flowGraph, FlowScope flowScope, ActivityExecuti
 148    {
 94149        if (!flowScope.AllInboundConnectionsVisited(flowGraph, outboundActivity))
 150        {
 2151            return false;
 152        }
 153
 92154        if (flowScope.HasFollowedInboundConnection(flowGraph, outboundActivity))
 155        {
 76156            await flowchartContext.ScheduleActivityAsync(outboundActivity, OnChildCompletedCounterBasedLogicAsync);
 76157            return true;
 158        }
 159        else
 160        {
 161            // Propagate skipped connections by scheduling with Outcomes.Empty
 16162            return await ScheduleOutboundActivitiesAsync(flowGraph, flowScope, flowchartContext, outboundActivity, Outco
 163        }
 94164    }
 165
 166    /// <summary>
 167    /// Schedules a join activity based on inbound connection statuses.
 168    /// </summary>
 169    private async ValueTask<bool> ScheduleJoinActivityAsync(FlowGraph flowGraph, FlowScope flowScope, ActivityExecutionC
 170    {
 171        // Ignore the connection if the join activity has already completed (JoinAny scenario)
 24172        if (flowScope.ShouldIgnoreConnection(outboundConnection, outboundActivity))
 173        {
 0174            return false;
 175        }
 176
 177        // Schedule the join activity only if at least one inbound connection was followed
 24178        if (!flowScope.HasFollowedInboundConnection(flowGraph, outboundActivity))
 179        {
 0180            if (flowScope.AllInboundConnectionsVisited(flowGraph, outboundActivity))
 181            {
 182                // Propagate skipped connections by scheduling with Outcomes.Empty
 0183                return await ScheduleOutboundActivitiesAsync(flowGraph, flowScope, flowchartContext, outboundActivity, O
 184            }
 185
 0186            return false;
 187        }
 188
 189        // Check for an existing execution context for the join activity
 24190        var joinContext = flowchartContext.WorkflowExecutionContext.ActivityExecutionContexts.LastOrDefault(x =>
 171191            x.ParentActivityExecutionContext == flowchartContext &&
 171192            x.Activity == outboundActivity &&
 171193            x.Status is ActivityStatus.Pending or ActivityStatus.Running);
 194
 195        // If the join activity was already scheduled, do not schedule it again
 24196        if (joinContext == null)
 197        {
 46198            var activityScheduled = flowchartContext.WorkflowExecutionContext.Scheduler.List().Any(workItem => workItem.
 24199            if (activityScheduled)
 200            {
 11201                return true;
 202            }
 203        }
 204
 13205        if (joinContext is not { Status: ActivityStatus.Running })
 206        {
 13207            var scheduleWorkOptions = new ScheduleWorkOptions
 13208            {
 13209                CompletionCallback = OnChildCompletedCounterBasedLogicAsync,
 13210                ExistingActivityExecutionContext = joinContext
 13211            };
 13212            await flowchartContext.ScheduleActivityAsync(outboundActivity, scheduleWorkOptions);
 13213            return true;
 214        }
 215        else
 216        {
 0217            return false;
 218        }
 24219    }
 220
 221    public static bool CanWaitAllProceed(ActivityExecutionContext context)
 222    {
 8223        var flowchartContext = context.ParentActivityExecutionContext!;
 8224        var flowchart = (Flowchart)flowchartContext.Activity;
 8225        var flowGraph = flowchartContext.GetFlowGraph();
 8226        var flowScope = flowchart.GetFlowScope(flowchartContext);
 8227        var activity = context.Activity;
 228
 8229        return flowScope.AllInboundConnectionsVisited(flowGraph, activity);
 230    }
 231
 232    private async ValueTask OnScheduleOutcomesAsync(ScheduleActivityOutcomes signal, SignalContext context)
 233    {
 0234        var flowchartContext = context.ReceiverActivityExecutionContext;
 0235        var schedulingActivityContext = context.SenderActivityExecutionContext;
 0236        var schedulingActivity = schedulingActivityContext.Activity;
 0237        var outcomes = signal.Outcomes;
 0238        var outboundConnections = Connections.Where(connection => connection.Source.Activity == schedulingActivity && ou
 0239        var outboundActivities = outboundConnections.Select(x => x.Target.Activity).ToList();
 240
 0241        if (outboundActivities.Any())
 242        {
 0243            foreach (var activity in outboundActivities)
 0244                await flowchartContext.ScheduleActivityAsync(activity, OnChildCompletedCounterBasedLogicAsync);
 245        }
 0246    }
 247
 248    private async ValueTask OnCounterFlowActivityCanceledAsync(CancelSignal signal, SignalContext context)
 249    {
 0250        var flowchartContext = context.ReceiverActivityExecutionContext;
 0251        await CompleteIfNoPendingWorkAsync(flowchartContext);
 0252        var flowchart = (Flowchart)flowchartContext.Activity;
 0253        var flowGraph = flowchartContext.GetFlowGraph();
 0254        var flowScope = flowchart.GetFlowScope(flowchartContext);
 255
 256        // Propagate canceled connections visited count by scheduling with Outcomes.Empty
 0257        await flowchart.ScheduleOutboundActivitiesAsync(flowGraph, flowScope, flowchartContext, context.SenderActivityEx
 0258    }
 259}

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Core/Activities/Flowchart/Activities/Flowchart.cs

#LineLine coverage
 1using System.ComponentModel;
 2using System.Runtime.CompilerServices;
 3using Elsa.Workflows.Activities.Flowchart.Extensions;
 4using Elsa.Workflows.Activities.Flowchart.Models;
 5using Elsa.Workflows.Attributes;
 6using Elsa.Workflows.Signals;
 7
 8namespace Elsa.Workflows.Activities.Flowchart.Activities;
 9
 10/// <summary>
 11/// A flowchart consists of a collection of activities and connections between them.
 12/// </summary>
 13[Activity("Elsa", "Flow", "A flowchart is a collection of activities and connections between them.")]
 14[Browsable(false)]
 15public partial class Flowchart : Container
 16{
 17    /// <summary>
 18    /// Set this to <c>false</c> from your program file in case you wish to use the old counter based model.
 19    /// </summary>
 320    public static bool UseTokenFlow = true;
 21
 22    /// <inheritdoc />
 118423    public Flowchart([CallerFilePath] string? source = null, [CallerLineNumber] int? line = null) : base(source, line)
 24    {
 118425        OnSignalReceived<ScheduleActivityOutcomes>(OnScheduleOutcomesAsync);
 118426        OnSignalReceived<ScheduleChildActivity>(OnScheduleChildActivityAsync);
 118427        OnSignalReceived<CancelSignal>(OnActivityCanceledAsync);
 118428    }
 29
 30    /// <summary>
 31    /// The activity to execute when the flowchart starts.
 32    /// </summary>
 347633    [Port][Browsable(false)] public IActivity? Start { get; set; }
 34
 35    /// <summary>
 36    /// A list of connections between activities.
 37    /// </summary>
 413338    public ICollection<Connection> Connections { get; set; } = new List<Connection>();
 39
 40    /// <inheritdoc />
 41    protected override async ValueTask ScheduleChildrenAsync(ActivityExecutionContext context)
 42    {
 69743        var startActivity = this.GetStartActivity(context.WorkflowExecutionContext.TriggerActivityId);
 44
 69745        if (startActivity == null)
 46        {
 47            // Nothing else to execute.
 348            await context.CompleteActivityAsync();
 349            return;
 50        }
 51
 69452        await context.ScheduleActivityAsync(startActivity, OnChildCompletedAsync);
 69753    }
 54
 55    private async ValueTask OnScheduleChildActivityAsync(ScheduleChildActivity signal, SignalContext context)
 56    {
 057        var flowchartContext = context.ReceiverActivityExecutionContext;
 058        var activity = signal.Activity;
 059        var activityExecutionContext = signal.ActivityExecutionContext;
 60
 061        if (activityExecutionContext != null)
 62        {
 063            await flowchartContext.ScheduleActivityAsync(activityExecutionContext.Activity, new()
 064            {
 065                ExistingActivityExecutionContext = activityExecutionContext,
 066                CompletionCallback = OnChildCompletedAsync,
 067                Input = signal.Input
 068            });
 69        }
 70        else
 71        {
 072            await flowchartContext.ScheduleActivityAsync(activity, new()
 073            {
 074                CompletionCallback = OnChildCompletedAsync,
 075                Input = signal.Input
 076            });
 77        }
 078    }
 79
 80    private ValueTask OnChildCompletedAsync(ActivityCompletedContext context)
 81    {
 68682        return UseTokenFlow
 68683            ? OnChildCompletedTokenBasedLogicAsync(context)
 68684            : OnChildCompletedCounterBasedLogicAsync(context);
 85    }
 86
 87    private ValueTask OnActivityCanceledAsync(CancelSignal signal, SignalContext context)
 88    {
 089        return UseTokenFlow
 090            ? OnTokenFlowActivityCanceledAsync(signal, context)
 091            : OnCounterFlowActivityCanceledAsync(signal, context);
 92    }
 93
 94    private async Task CompleteIfNoPendingWorkAsync(ActivityExecutionContext context)
 95    {
 2996        var hasPendingWork = context.HasPendingWork();
 97
 2998        if (!hasPendingWork)
 2899            await context.CompleteActivityAsync();
 29100    }
 101}

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Core/Activities/Flowchart/Activities/Flowchart.Tokens.cs

#LineLine coverage
 1using Elsa.Extensions;
 2using Elsa.Workflows.Activities.Flowchart.Extensions;
 3using Elsa.Workflows.Activities.Flowchart.Models;
 4using Elsa.Workflows.Signals;
 5
 6namespace Elsa.Workflows.Activities.Flowchart.Activities;
 7
 8public partial class Flowchart
 9{
 10    private const string TokenStoreKey = "Flowchart.Tokens";
 11
 12    private async ValueTask OnChildCompletedTokenBasedLogicAsync(ActivityCompletedContext ctx)
 13    {
 98514        var flowContext = ctx.TargetContext;
 98515        var completedActivity = ctx.ChildContext.Activity;
 98516        var flowGraph = flowContext.GetFlowGraph();
 98517        var tokens = GetTokenList(flowContext);
 18
 19        // If the completed activity is a terminal node, complete the flowchart immediately.
 98520        if (completedActivity is ITerminalNode)
 21        {
 122            tokens.Clear();
 123            await flowContext.CompleteActivityAsync();
 124            return;
 25        }
 26
 27        // Emit tokens for active outcomes.
 98428        var outcomes = (ctx.Result as Outcomes ?? Outcomes.Default).Names;
 98429        var outboundConnections = flowGraph.GetOutboundConnections(completedActivity);
 133830        var activeOutboundConnections = outboundConnections.Where(x => outcomes.Contains(x.Source.Port)).Distinct().ToLi
 31
 265432        foreach (var connection in activeOutboundConnections)
 34333            tokens.Add(Token.Create(connection.Source.Activity, connection.Target.Activity, connection.Source.Port));
 34
 35        // Consume inbound tokens to the completed activity.
 171536        var inboundTokens = tokens.Where(t => t.ToActivityId == completedActivity.Id && t is { Consumed: false, Blocked:
 265237        foreach (var t in inboundTokens)
 34238            t.Consume();
 39
 40        // Schedule next activities based on merge modes.
 265441        foreach (var connection in activeOutboundConnections)
 42        {
 34343            var targetActivity = connection.Target.Activity;
 34344            var mergeMode = await targetActivity.GetMergeModeAsync(ctx.ChildContext);
 45
 46            switch (mergeMode)
 47            {
 48                case MergeMode.Cascade:
 49                case MergeMode.Race:
 950                    if (mergeMode == MergeMode.Race)
 951                        await flowContext.CancelInboundAncestorsAsync(targetActivity);
 52
 53                    // Check for existing blocked token on this specific connection.
 954                    var existingBlockedToken = tokens.FirstOrDefault(t =>
 3455                        t.ToActivityId == targetActivity.Id &&
 3456                        t.FromActivityId == connection.Source.Activity.Id &&
 3457                        t.Outcome == connection.Source.Port &&
 3458                        t.Blocked);
 59
 960                    if (existingBlockedToken == null)
 61                    {
 62                        // Schedule the target.
 563                        await flowContext.ScheduleActivityAsync(targetActivity, OnChildCompletedTokenBasedLogicAsync);
 64
 65                        // Block other inbound connections (adjust per mode if needed).
 566                        var otherInboundConnections = flowGraph.GetForwardInboundConnections(targetActivity)
 967                            .Where(x => x.Source.Activity != completedActivity)
 568                            .ToList();
 69
 1870                        foreach (var inboundConnection in otherInboundConnections)
 71                        {
 472                            var blockedToken = Token.Create(inboundConnection.Source.Activity, inboundConnection.Target.
 473                            tokens.Add(blockedToken);
 74                        }
 75                    }
 76                    else
 77                    {
 78                        // Consume the block without scheduling.
 479                        existingBlockedToken.Consume();
 80                    }
 81
 482                    break;
 83
 84                case MergeMode.Merge:
 85                    // Wait for tokens from all forward inbound connections.
 86                    // Unlike Converge, this ignores backward connections (loops).
 87                    // Schedule on arrival for <=1 forward inbound (e.g., loops, sequential).
 788                    var inboundConnectionsMerge = flowGraph.GetForwardInboundConnections(targetActivity);
 89
 790                    if (inboundConnectionsMerge.Count > 1)
 91                    {
 692                        var hasAllTokens = inboundConnectionsMerge.All(inbound =>
 1293                            tokens.Any(t =>
 3394                                t is { Consumed: false, Blocked: false } &&
 3395                                t.FromActivityId == inbound.Source.Activity.Id &&
 3396                                t.ToActivityId == targetActivity.Id &&
 3397                                t.Outcome == inbound.Source.Port
 1298                            )
 699                        );
 100
 6101                        if (hasAllTokens)
 3102                            await flowContext.ScheduleActivityAsync(targetActivity, OnChildCompletedTokenBasedLogicAsync
 103                    }
 104                    else
 105                    {
 1106                        await flowContext.ScheduleActivityAsync(targetActivity, OnChildCompletedTokenBasedLogicAsync);
 107                    }
 108
 1109                    break;
 110
 111                case MergeMode.Converge:
 112                    // Strictest mode: Wait for tokens from ALL inbound connections (forward + backward).
 113                    // Requires every possible inbound path to execute before proceeding.
 13114                    var allInboundConnectionsConverge = flowGraph.GetInboundConnections(targetActivity);
 115
 13116                    if (allInboundConnectionsConverge.Count > 1)
 117                    {
 12118                        var hasAllTokens = allInboundConnectionsConverge.All(inbound =>
 24119                            tokens.Any(t =>
 66120                                t is { Consumed: false, Blocked: false } &&
 66121                                t.FromActivityId == inbound.Source.Activity.Id &&
 66122                                t.ToActivityId == targetActivity.Id &&
 66123                                t.Outcome == inbound.Source.Port
 24124                            )
 12125                        );
 126
 12127                        if (hasAllTokens)
 6128                            await flowContext.ScheduleActivityAsync(targetActivity, OnChildCompletedTokenBasedLogicAsync
 129                    }
 130                    else
 131                    {
 1132                        await flowContext.ScheduleActivityAsync(targetActivity, OnChildCompletedTokenBasedLogicAsync);
 133                    }
 134
 1135                    break;
 136
 137                case MergeMode.Stream:
 138                default:
 139                    // Flows freely - approximation that proceeds when upstream completes, ignoring dead paths.
 314140                    var inboundConnectionsStream = flowGraph.GetForwardInboundConnections(targetActivity);
 314141                    var hasUnconsumed = inboundConnectionsStream.Any(inbound =>
 787142                        tokens.Any(t => !t.Consumed && !t.Blocked && t.ToActivityId == inbound.Source.Activity.Id)
 314143                    );
 144
 314145                    if (!hasUnconsumed)
 311146                        await flowContext.ScheduleActivityAsync(targetActivity, OnChildCompletedTokenBasedLogicAsync);
 147                    break;
 148            }
 343149        }
 150
 151        // Complete flowchart if no pending work.
 984152        if (!flowContext.HasPendingWork())
 153        {
 657154            tokens.Clear();
 657155            await flowContext.CompleteActivityAsync();
 156        }
 157
 158        // Purge consumed tokens for the completed activity.
 1476159        tokens.RemoveWhere(t => t.ToActivityId == completedActivity.Id && t.Consumed);
 985160    }
 161
 162    private async ValueTask OnTokenFlowActivityCanceledAsync(CancelSignal signal, SignalContext context)
 163    {
 0164        var flowchartContext = context.ReceiverActivityExecutionContext;
 0165        var cancelledActivityContext = context.SenderActivityExecutionContext;
 166
 167        // Remove all tokens from and to this activity.
 0168        var tokenList = GetTokenList(flowchartContext);
 0169        tokenList.RemoveWhere(x => x.FromActivityId == cancelledActivityContext.Activity.Id || x.ToActivityId == cancell
 0170        await CompleteIfNoPendingWorkAsync(flowchartContext);
 0171    }
 172
 173    internal List<Token> GetTokenList(ActivityExecutionContext context)
 174    {
 1670175        if (context.Properties.TryGetValue(TokenStoreKey, out var obj) && obj is List<Token> list)
 984176            return list;
 177
 686178        var newList = new List<Token>();
 686179        context.Properties[TokenStoreKey] = newList;
 686180        return newList;
 181    }
 182}