< Summary

Line coverage
76%
Covered lines: 232
Uncovered lines: 72
Coverable lines: 304
Total lines: 729
Line coverage: 76.3%
Branch coverage
77%
Covered branches: 143
Total branches: 184
Branch coverage: 77.7%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
File 1: OnChildCompletedCounterBasedLogicAsync()100%22100%
File 1: GetStartActivity(...)66.66%131281.25%
File 1: HasPendingWork(...)50%111077.77%
File 1: GetRootActivity()100%11100%
File 1: GetFlowGraph(...)100%11100%
File 1: GetFlowScope(...)100%11100%
File 1: ProcessChildCompletedAsync()75%8885.71%
File 1: MaybeScheduleOutboundActivitiesAsync()75%8885.71%
File 1: MaybeScheduleBackwardConnectionActivityAsync()0%2040%
File 1: GetMergeModeAsync()100%22100%
File 1: MaybeScheduleOutboundActivityAsync()75%4488.88%
File 1: MaybeScheduleWaitAllActivityAsync()75%4483.33%
File 1: MaybeScheduleWaitAllActiveActivityAsync()100%44100%
File 1: MaybeScheduleWaitAnyActivityAsync()50%12860%
File 1: ScheduleOutboundActivityAsync()100%11100%
File 1: SkipOutboundActivityAsync()100%11100%
File 1: CancelRemainingInboundActivitiesAsync()100%22100%
File 1: OnScheduleOutcomesAsync()100%210%
File 1: OnCounterFlowActivityCanceledAsync()100%210%
File 2: .ctor(...)100%11100%
File 2: get_Start()100%11100%
File 2: get_Connections()100%11100%
File 2: ScheduleChildrenAsync()100%22100%
File 2: OnScheduleChildActivityAsync()0%620%
File 2: OnChildCompletedAsync(...)100%11100%
File 2: OnActivityCanceledAsync(...)100%210%
File 2: CompleteIfNoPendingWorkAsync()100%44100%
File 2: ExecuteBasedOnMode(...)100%22100%
File 2: GetEffectiveExecutionMode(...)75%44100%
File 2: ParseExecutionMode(...)10%181057.14%
File 2: GetDefaultModeFromOptions(...)33.33%6680%
File 3: OnChildCompletedTokenBasedLogicAsync()97.5%404094.54%
File 3: OnTokenFlowActivityCanceledAsync()100%210%
File 3: GetTokenList(...)100%44100%

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Core/Activities/Flowchart/Activities/Flowchart.Counters.cs

#LineLine coverage
 1using Elsa.Extensions;
 2using Elsa.Workflows.Activities.Flowchart.Contracts;
 3using Elsa.Workflows.Activities.Flowchart.Extensions;
 4using Elsa.Workflows.Activities.Flowchart.Models;
 5using Elsa.Workflows.Options;
 6using Elsa.Workflows.Signals;
 7
 8namespace Elsa.Workflows.Activities.Flowchart.Activities;
 9
 10public partial class Flowchart
 11{
 12    private const string ScopeProperty = "FlowScope";
 13    private const string GraphTransientProperty = "FlowGraph";
 14    private const string BackwardConnectionActivityInput = "BackwardConnection";
 15
 16
 17    private async ValueTask OnChildCompletedCounterBasedLogicAsync(ActivityCompletedContext context)
 18    {
 13019        var flowchartContext = context.TargetContext;
 13020        var completedActivityContext = context.ChildContext;
 13021        var completedActivity = completedActivityContext.Activity;
 13022        var result = context.Result;
 23
 24        // Determine the outcomes from the completed activity
 13025        var outcomes = result is Outcomes o ? o : Outcomes.Default;
 26
 13027        await ProcessChildCompletedAsync(flowchartContext, completedActivity, completedActivityContext, outcomes);
 13028    }
 29
 30    private IActivity? GetStartActivity(ActivityExecutionContext context)
 31    {
 32        // If there's a trigger that triggered this workflow, use that.
 73233        var triggerActivityId = context.WorkflowExecutionContext.TriggerActivityId;
 134234        var triggerActivity = triggerActivityId != null ? Activities.FirstOrDefault(x => x.Id == triggerActivityId) : nu
 35
 73236        if (triggerActivity != null)
 20737            return triggerActivity;
 38
 39        // If an explicit Start activity was provided, use that.
 52540        if (Start != null)
 10441            return Start;
 42
 43        // If there is a Start activity on the flowchart, use that.
 84944        var startActivity = Activities.FirstOrDefault(x => x is Start);
 45
 42146        if (startActivity != null)
 247            return startActivity;
 48
 49        // If there's an activity marked as "Can Start Workflow", use that.
 84550        var canStartWorkflowActivity = Activities.FirstOrDefault(x => x.GetCanStartWorkflow());
 51
 41952        if (canStartWorkflowActivity != null)
 053            return canStartWorkflowActivity;
 54
 55        // If there is a single activity that has no inbound connections, use that.
 41956        var root = GetRootActivity();
 57
 41958        if (root != null)
 41659            return root;
 60
 61        // If no start activity found, return the first activity.
 362        return Activities.FirstOrDefault();
 63    }
 64
 65    /// <summary>
 66    /// Checks if there is any pending work for the flowchart.
 67    /// </summary>
 68    private bool HasPendingWork(ActivityExecutionContext context)
 69    {
 4470        var workflowExecutionContext = context.WorkflowExecutionContext;
 71
 72        // Use HashSet for O(1) lookups
 28373        var activityIds = new HashSet<string>(Activities.Select(x => x.Id));
 74
 75        // Short circuit evaluation - check running instances first before more expensive scheduler check
 22076        if (context.Children.Any(x => activityIds.Contains(x.Activity.Id) && x.Status == ActivityStatus.Running))
 077            return true;
 78
 79        // Scheduler check - optimize to avoid repeated LINQ evaluations
 4480        var scheduledItems = workflowExecutionContext.Scheduler.List().ToList();
 81
 4482        return scheduledItems.Any(workItem =>
 4483        {
 1284            var ownerInstanceId = workItem.Owner?.Id;
 4485
 1286            if (ownerInstanceId == null)
 087                return false;
 4488
 1289            if (ownerInstanceId == context.Id)
 1290                return true;
 4491
 092            var ownerContext = workflowExecutionContext.ActivityExecutionContexts.First(x => x.Id == ownerInstanceId);
 093            return ownerContext.GetAncestors().Any(x => x == context);
 4494        });
 95    }
 96
 97    private IActivity? GetRootActivity()
 98    {
 99        // Get the first activity that has no inbound connections.
 419100        var query =
 419101            from activity in Activities
 426102            let inboundConnections = Connections.Any(x => x.Target.Activity == activity)
 416103            where !inboundConnections
 835104            select activity;
 105
 419106        var rootActivity = query.FirstOrDefault();
 419107        return rootActivity;
 108    }
 109
 110    private FlowGraph GetFlowGraph(ActivityExecutionContext context)
 111    {
 112        // Store in TransientProperties so FlowChart is not persisted in WorkflowState
 167113        return context.TransientProperties.GetOrAdd(GraphTransientProperty, () => new FlowGraph(Connections, GetStartAct
 114    }
 115
 116    private FlowScope GetFlowScope(ActivityExecutionContext context)
 117    {
 162118        return context.GetProperty(ScopeProperty, () => new FlowScope());
 119    }
 120
 121    private async ValueTask ProcessChildCompletedAsync(ActivityExecutionContext flowchartContext, IActivity completedAct
 122    {
 130123        if (flowchartContext.Activity != this)
 124        {
 0125            throw new("Target context activity must be this flowchart");
 126        }
 127
 128        // If the completed activity's status is anything but "Completed", do not schedule its outbound activities.
 130129        if (completedActivityContext.Status != ActivityStatus.Completed)
 130        {
 0131            return;
 132        }
 133
 134        // If the complete activity is a terminal node, complete the flowchart immediately.
 130135        if (completedActivity is ITerminalNode)
 136        {
 1137            await flowchartContext.CompleteActivityAsync();
 1138            return;
 139        }
 140
 141        // Schedule the outbound activities
 129142        var flowGraph = GetFlowGraph(flowchartContext);
 129143        var flowScope = GetFlowScope(flowchartContext);
 129144        var completedActivityExcecutedByBackwardConnection = completedActivityContext.ActivityInput.GetValueOrDefault<bo
 129145        bool hasScheduledActivity = await MaybeScheduleOutboundActivitiesAsync(flowGraph, flowScope, flowchartContext, c
 146
 147        // If there are not any outbound connections, complete the flowchart activity if there is no other pending work
 129148        if (!hasScheduledActivity)
 149        {
 44150            await CompleteIfNoPendingWorkAsync(flowchartContext);
 151        }
 130152    }
 153
 154    /// <summary>
 155    /// Schedules outbound activities based on the flowchart's structure and execution state.
 156    /// This method determines whether an activity should be scheduled based on visited connections,
 157    /// forward traversal rules, and backward connections. If outcomes is Outcomes.Empty, it indicates
 158    /// that the activity should be skipped - all outbound connections will be visited and treated as
 159    /// not followed.
 160    /// </summary>
 161    /// <param name="flowGraph">The graph representation of the flowchart.</param>
 162    /// <param name="flowScope">Tracks activity and connection visits.</param>
 163    /// <param name="flowchartContext">The execution context of the flowchart.</param>
 164    /// <param name="activity">The current activity being processed.</param>
 165    /// <param name="outcomes">The outcomes that determine which connections were followed.</param>
 166    /// <param name="completionCallback">The callback to invoke upon activity completion.</param>
 167    /// <param name="completedActivityExecutedByBackwardConnection">Indicates if the completed activity
 168    /// was executed due to a backward connection.</param>
 169    /// <returns>True if at least one activity was scheduled; otherwise, false.</returns>
 170    private static async ValueTask<bool> MaybeScheduleOutboundActivitiesAsync(FlowGraph flowGraph, FlowScope flowScope, 
 171    {
 145172        bool hasScheduledActivity = false;
 173
 174        // Check if the activity is dangling (i.e., it is not reachable from the flowchart graph)
 145175        if (flowGraph.IsDanglingActivity(activity))
 176        {
 0177            throw new($"Activity {activity.Id} is not reachable from the flowchart graph. Unable to schedule it's outbou
 178        }
 179
 180        // Register the activity as visited unless it was executed due to a backward connection
 145181        if (!completedActivityExecutedByBackwardConnection)
 182        {
 145183            flowScope.RegisterActivityVisit(activity);
 184        }
 185
 186        // Process each outbound connection from the current activity
 542187        foreach (var outboundConnection in flowGraph.GetOutboundConnections(activity))
 188        {
 126189            var connectionFollowed = outcomes.Names.Contains(outboundConnection.Source.Port);
 126190            flowScope.RegisterConnectionVisit(outboundConnection, connectionFollowed);
 126191            var outboundActivity = outboundConnection.Target.Activity;
 192
 193            // Determine the scheduling strategy based on connection-type.
 126194            if (flowGraph.IsBackwardConnection(outboundConnection, out var backwardConnectionIsValid))
 195                // Backward connections are scheduled differently
 0196                hasScheduledActivity |= await MaybeScheduleBackwardConnectionActivityAsync(flowGraph, flowchartContext, 
 197            else
 126198                hasScheduledActivity |= await MaybeScheduleOutboundActivityAsync(flowGraph, flowScope, flowchartContext,
 199        }
 200
 145201        return hasScheduledActivity;
 145202    }
 203
 204    /// <summary>
 205    /// Schedules an outbound activity that originates from a backward connection.
 206    /// </summary>
 207    private static async ValueTask<bool> MaybeScheduleBackwardConnectionActivityAsync(FlowGraph flowGraph, ActivityExecu
 208    {
 0209        if (!connectionFollowed)
 210        {
 0211            return false;
 212        }
 213
 0214        if (!backwardConnectionIsValid)
 215        {
 0216            throw new($"Invalid backward connection: Every path from the source ('{outboundConnection.Source.Activity.Id
 217        }
 218
 0219        var scheduleWorkOptions = new ScheduleWorkOptions
 0220        {
 0221            CompletionCallback = completionCallback,
 0222            Input = new Dictionary<string, object>() { { BackwardConnectionActivityInput, true } }
 0223        };
 224
 0225        await flowchartContext.ScheduleActivityAsync(outboundActivity, scheduleWorkOptions);
 0226        return true;
 0227    }
 228
 229    /// <summary>
 230    /// Determines the merge mode for a given outbound activity. If the outbound activity is a FlowJoin, it retrieves it
 231    /// mode. Otherwise, it defaults to FlowJoinMode.WaitAllActive for implicit joins.
 232    /// </summary>
 233    private static async ValueTask<FlowJoinMode> GetMergeModeAsync(ActivityExecutionContext flowchartContext, IActivity 
 234    {
 126235        if (outboundActivity is FlowJoin)
 236        {
 24237            var outboundActivityExecutionContext = await flowchartContext.WorkflowExecutionContext.CreateActivityExecuti
 24238            return await outboundActivityExecutionContext.EvaluateInputPropertyAsync<FlowJoin, FlowJoinMode>(x => x.Mode
 239        }
 240        else
 241        {
 242            // Implicit join case - treat as WaitAllActive
 102243            return FlowJoinMode.WaitAllActive;
 244        }
 126245    }
 246
 247    /// <summary>
 248    /// Schedules a join activity based on inbound connection statuses.
 249    /// </summary>
 250    private static async ValueTask<bool> MaybeScheduleOutboundActivityAsync(FlowGraph flowGraph, FlowScope flowScope, Ac
 251    {
 126252        FlowJoinMode mode = await GetMergeModeAsync(flowchartContext, outboundActivity);
 253
 126254        return mode switch
 126255        {
 15256            FlowJoinMode.WaitAll => await MaybeScheduleWaitAllActivityAsync(flowGraph, flowScope, flowchartContext, outb
 102257            FlowJoinMode.WaitAllActive => await MaybeScheduleWaitAllActiveActivityAsync(flowGraph, flowScope, flowchartC
 9258            FlowJoinMode.WaitAny => await MaybeScheduleWaitAnyActivityAsync(flowGraph, flowScope, flowchartContext, outb
 0259            _ => throw new($"Unsupported FlowJoinMode: {mode}"),
 126260        };
 126261    }
 262
 263    /// <summary>
 264    /// Determines whether to schedule an activity based on the FlowJoinMode.WaitAll behavior.
 265    /// If all inbound connections were visited, it checks if they were all followed to decide whether to schedule or sk
 266    /// </summary>
 267    private static async ValueTask<bool> MaybeScheduleWaitAllActivityAsync(FlowGraph flowGraph, FlowScope flowScope, Act
 268    {
 15269        if (!flowScope.AllInboundConnectionsVisited(flowGraph, outboundActivity))
 270            // Not all inbound connections have been visited yet; do not schedule anything yet.
 7271            return false;
 272
 8273        if (flowScope.AllInboundConnectionsFollowed(flowGraph, outboundActivity))
 274            // All inbound connections were followed; schedule the outbound activity.
 8275            return await ScheduleOutboundActivityAsync(flowchartContext, outboundActivity, completionCallback);
 276        else
 277            // No inbound connections were followed; skip the outbound activity.
 0278            return await SkipOutboundActivityAsync(flowGraph, flowScope, flowchartContext, outboundActivity, completionC
 15279    }
 280
 281    /// <summary>
 282    /// Determines whether to schedule an activity based on the FlowJoinMode.WaitAllActive behavior.
 283    /// If all inbound connections have been visited, it checks if any were followed to decide whether to schedule or sk
 284    /// </summary>
 285    private static async ValueTask<bool> MaybeScheduleWaitAllActiveActivityAsync(FlowGraph flowGraph, FlowScope flowScop
 286    {
 102287        if (!flowScope.AllInboundConnectionsVisited(flowGraph, outboundActivity))
 288            // Not all inbound connections have been visited yet; do not schedule anything yet.
 2289            return false;
 290
 100291        if (flowScope.AnyInboundConnectionsFollowed(flowGraph, outboundActivity))
 292            // At least one inbound connection was followed; schedule the outbound activity.
 84293            return await ScheduleOutboundActivityAsync(flowchartContext, outboundActivity, completionCallback);
 294        else
 295            // No inbound connections were followed; skip the outbound activity.
 16296            return await SkipOutboundActivityAsync(flowGraph, flowScope, flowchartContext, outboundActivity, completionC
 102297    }
 298
 299    /// <summary>
 300    /// Determines whether to schedule an activity based on the FlowJoinMode.WaitAny behavior.
 301    /// If any inbound connection has been followed, it schedules the activity and cancels remaining inbound activities.
 302    /// If a subsequent inbound connection is followed after the activity has been scheduled, it ignores it.
 303    /// </summary>
 304    private static async ValueTask<bool> MaybeScheduleWaitAnyActivityAsync(FlowGraph flowGraph, FlowScope flowScope, Act
 305    {
 9306        if (flowScope.ShouldIgnoreConnection(outboundConnection, outboundActivity))
 307            // Ignore the connection if the outbound activity has already completed (JoinAny scenario)
 0308            return false;
 309
 17310        if (flowchartContext.WorkflowExecutionContext.Scheduler.List().Any(workItem => workItem.Owner == flowchartContex
 311            // Ignore the connection if the outbound activity is already scheduled
 4312            return false;
 313
 5314        if (flowScope.AnyInboundConnectionsFollowed(flowGraph, outboundActivity))
 315        {
 316            // An inbound connection has been followed; cancel remaining inbound activities
 5317            await CancelRemainingInboundActivitiesAsync(flowchartContext, outboundActivity);
 318
 319            // This is the first inbound connection followed; schedule the outbound activity
 5320            return await ScheduleOutboundActivityAsync(flowchartContext, outboundActivity, completionCallback);
 321        }
 322
 0323        if (flowScope.AllInboundConnectionsVisited(flowGraph, outboundActivity))
 324            // All inbound connections have been visited without any being followed; skip the outbound activity
 0325            return await SkipOutboundActivityAsync(flowGraph, flowScope, flowchartContext, outboundActivity, completionC
 326
 327        // No inbound connections have been followed yet; do not schedule anything yet.
 0328        return false;
 9329    }
 330
 331    /// <summary>
 332    /// Schedules the outbound activity.
 333    /// </summary>
 334    private static async ValueTask<bool> ScheduleOutboundActivityAsync(ActivityExecutionContext flowchartContext, IActiv
 335    {
 97336        await flowchartContext.ScheduleActivityAsync(outboundActivity, completionCallback);
 97337        return true;
 97338    }
 339
 340    /// <summary>
 341    /// Skips the outbound activity by propagating skipped connections.
 342    /// </summary>
 343    private static async ValueTask<bool> SkipOutboundActivityAsync(FlowGraph flowGraph, FlowScope flowScope, ActivityExe
 344    {
 16345        return await MaybeScheduleOutboundActivitiesAsync(flowGraph, flowScope, flowchartContext, outboundActivity, Outc
 16346    }
 347
 348    private static async ValueTask CancelRemainingInboundActivitiesAsync(ActivityExecutionContext flowchartContext, IAct
 349    {
 5350        var flowchart = (Flowchart)flowchartContext.Activity;
 5351        var flowGraph = flowchart.GetFlowGraph(flowchartContext);
 5352        var ancestorActivities = flowGraph.GetAncestorActivities(outboundActivity);
 32353        var inboundActivityExecutionContexts = flowchartContext.WorkflowExecutionContext.ActivityExecutionContexts.Where
 354
 355        // Cancel each ancestor activity.
 44356        foreach (var activityExecutionContext in inboundActivityExecutionContexts)
 357        {
 17358            await activityExecutionContext.CancelActivityAsync();
 359        }
 5360    }
 361
 362    private async ValueTask OnScheduleOutcomesAsync(ScheduleActivityOutcomes signal, SignalContext context)
 363    {
 0364        var flowchartContext = context.ReceiverActivityExecutionContext;
 0365        var schedulingActivityContext = context.SenderActivityExecutionContext;
 0366        var schedulingActivity = schedulingActivityContext.Activity;
 0367        var outcomes = new Outcomes(signal.Outcomes);
 0368    }
 369
 370    private async ValueTask OnCounterFlowActivityCanceledAsync(CancelSignal signal, SignalContext context)
 371    {
 0372        var flowchartContext = context.ReceiverActivityExecutionContext;
 0373        await CompleteIfNoPendingWorkAsync(flowchartContext);
 0374        var flowchart = (Flowchart)flowchartContext.Activity;
 0375        var flowGraph = flowchartContext.GetFlowGraph();
 0376        var flowScope = flowchart.GetFlowScope(flowchartContext);
 377
 378        // Propagate canceled connections visited count by scheduling with Outcomes.Empty
 0379        await MaybeScheduleOutboundActivitiesAsync(flowGraph, flowScope, flowchartContext, context.SenderActivityExecuti
 0380    }
 381}

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Core/Activities/Flowchart/Activities/Flowchart.cs

#LineLine coverage
 1using System.ComponentModel;
 2using System.Runtime.CompilerServices;
 3using Elsa.Workflows.Activities.Flowchart.Models;
 4using Elsa.Workflows.Activities.Flowchart.Options;
 5using Elsa.Workflows.Attributes;
 6using Elsa.Workflows.Signals;
 7using Microsoft.Extensions.DependencyInjection;
 8using Microsoft.Extensions.Options;
 9
 10namespace Elsa.Workflows.Activities.Flowchart.Activities;
 11
 12/// <summary>
 13/// A flowchart consists of a collection of activities and connections between them.
 14/// </summary>
 15[Activity("Elsa", "Flow", "A flowchart is a collection of activities and connections between them.")]
 16[Browsable(false)]
 17public partial class Flowchart : Container
 18{
 19    /// <summary>
 20    /// The property key used to store the flowchart execution mode in <see cref="WorkflowExecutionContext.Properties"/>
 21    /// </summary>
 22    public const string ExecutionModePropertyKey = "Flowchart:ExecutionMode";
 23
 24    /// <summary>
 25    /// Set this to <c>false</c> from your program file in case you wish to use the old counter based model.
 26    /// This static field is used as a final fallback when no execution mode is specified via options or workflow execut
 27    /// Note: Prefer using <see cref="FlowchartOptions"/> configured via DI for application-wide settings.
 28    /// </summary>
 29    // ReSharper disable once GrammarMistakeInComment
 30    public static bool UseTokenFlow = false; // Default to false to maintain the same behavior with 3.5.2 out of the box
 31
 32    /// <inheritdoc />
 157633    public Flowchart([CallerFilePath] string? source = null, [CallerLineNumber] int? line = null) : base(source, line)
 34    {
 157635        OnSignalReceived<ScheduleActivityOutcomes>(OnScheduleOutcomesAsync);
 157636        OnSignalReceived<ScheduleChildActivity>(OnScheduleChildActivityAsync);
 157637        OnSignalReceived<CancelSignal>(OnActivityCanceledAsync);
 157638    }
 39
 40    /// <summary>
 41    /// The activity to execute when the flowchart starts.
 42    /// </summary>
 413643    [Port] [Browsable(false)] public IActivity? Start { get; set; }
 44
 45    /// <summary>
 46    /// A list of connections between activities.
 47    /// </summary>
 496948    public ICollection<Connection> Connections { get; set; } = new List<Connection>();
 49
 50    /// <inheritdoc />
 51    protected override async ValueTask ScheduleChildrenAsync(ActivityExecutionContext context)
 52    {
 69953        var startActivity = GetStartActivity(context);
 54
 69955        if (startActivity == null)
 56        {
 57            // Nothing else to execute.
 358            await context.CompleteActivityAsync();
 359            return;
 60        }
 61
 69662        await context.ScheduleActivityAsync(startActivity, OnChildCompletedAsync);
 69963    }
 64
 65    private async ValueTask OnScheduleChildActivityAsync(ScheduleChildActivity signal, SignalContext context)
 66    {
 067        var flowchartContext = context.ReceiverActivityExecutionContext;
 068        var activity = signal.Activity;
 069        var activityExecutionContext = signal.ActivityExecutionContext;
 70
 071        if (activityExecutionContext != null)
 72        {
 073            await flowchartContext.ScheduleActivityAsync(activityExecutionContext.Activity, new()
 074            {
 075                ExistingActivityExecutionContext = activityExecutionContext,
 076                CompletionCallback = OnChildCompletedAsync,
 077                Input = signal.Input
 078            });
 79        }
 80        else
 81        {
 082            await flowchartContext.ScheduleActivityAsync(activity, new()
 083            {
 084                CompletionCallback = OnChildCompletedAsync,
 085                Input = signal.Input
 086            });
 87        }
 088    }
 89
 90    private ValueTask OnChildCompletedAsync(ActivityCompletedContext context)
 91    {
 78592        return ExecuteBasedOnMode(
 78593            context.TargetContext,
 65594            () => OnChildCompletedTokenBasedLogicAsync(context),
 91595            () => OnChildCompletedCounterBasedLogicAsync(context));
 96    }
 97
 98    private ValueTask OnActivityCanceledAsync(CancelSignal signal, SignalContext context)
 99    {
 0100        return ExecuteBasedOnMode(
 0101            context.ReceiverActivityExecutionContext,
 0102            () => OnTokenFlowActivityCanceledAsync(signal, context),
 0103            () => OnCounterFlowActivityCanceledAsync(signal, context));
 104    }
 105
 106    private async Task CompleteIfNoPendingWorkAsync(ActivityExecutionContext context)
 107    {
 44108        var hasPendingWork = HasPendingWork(context);
 109
 44110        if (!hasPendingWork)
 111        {
 159112            var hasFaultedActivities = context.Children.Any(x => x.Status == ActivityStatus.Faulted);
 113
 32114            if (!hasFaultedActivities)
 115            {
 32116                await context.CompleteActivityAsync();
 117            }
 118        }
 44119    }
 120
 121    private static ValueTask ExecuteBasedOnMode(ActivityExecutionContext context, Func<ValueTask> tokenBasedAction, Func
 122    {
 785123        var mode = GetEffectiveExecutionMode(context);
 124
 785125        return mode switch
 785126        {
 655127            FlowchartExecutionMode.TokenBased => tokenBasedAction(),
 130128            FlowchartExecutionMode.CounterBased or FlowchartExecutionMode.Default or _ => counterBasedAction()
 785129        };
 130    }
 131
 132    /// <summary>
 133    /// Gets the effective execution mode for this flowchart execution.
 134    /// Priority: WorkflowExecutionContext.Properties > FlowchartOptions (DI) > Static UseTokenFlow flag
 135    /// </summary>
 136    private static FlowchartExecutionMode GetEffectiveExecutionMode(ActivityExecutionContext context)
 137    {
 785138        var workflowExecutionContext = context.WorkflowExecutionContext;
 139
 785140        if (!workflowExecutionContext.Properties.TryGetValue(ExecutionModePropertyKey, out var modeValue))
 641141            return GetDefaultModeFromOptions(context);
 142
 144143        var mode = ParseExecutionMode(modeValue);
 144144        return mode != FlowchartExecutionMode.Default ? mode : GetDefaultModeFromOptions(context);
 145    }
 146
 147    private static FlowchartExecutionMode ParseExecutionMode(object modeValue)
 148    {
 144149        return modeValue switch
 144150        {
 144151            FlowchartExecutionMode executionMode => executionMode,
 0152            string str when Enum.TryParse<FlowchartExecutionMode>(str, true, out var parsed) => parsed,
 0153            int intValue when Enum.IsDefined(typeof(FlowchartExecutionMode), intValue) => (FlowchartExecutionMode)intVal
 0154            _ => FlowchartExecutionMode.Default
 144155        };
 156    }
 157
 158    private static FlowchartExecutionMode GetDefaultModeFromOptions(ActivityExecutionContext context)
 159    {
 641160        var options = context.WorkflowExecutionContext.ServiceProvider.GetService<IOptions<FlowchartOptions>>();
 641161        var mode = options?.Value.DefaultExecutionMode ?? FlowchartExecutionMode.Default;
 641162        if (mode == FlowchartExecutionMode.Default)
 0163            return UseTokenFlow ? FlowchartExecutionMode.TokenBased : FlowchartExecutionMode.CounterBased;
 641164        return mode;
 165    }
 166}

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Core/Activities/Flowchart/Activities/Flowchart.Tokens.cs

#LineLine coverage
 1using Elsa.Extensions;
 2using Elsa.Workflows.Activities.Flowchart.Extensions;
 3using Elsa.Workflows.Activities.Flowchart.Models;
 4using Elsa.Workflows.Signals;
 5
 6namespace Elsa.Workflows.Activities.Flowchart.Activities;
 7
 8public partial class Flowchart
 9{
 10    private const string TokenStoreKey = "Flowchart.Tokens";
 11
 12    private async ValueTask OnChildCompletedTokenBasedLogicAsync(ActivityCompletedContext ctx)
 13    {
 97614        var flowContext = ctx.TargetContext;
 97615        var completedActivity = ctx.ChildContext.Activity;
 97616        var flowGraph = flowContext.GetFlowGraph();
 97617        var tokens = GetTokenList(flowContext);
 18
 19        // If the completed activity is a terminal node, complete the flowchart immediately.
 97620        if (completedActivity is ITerminalNode)
 21        {
 022            tokens.Clear();
 023            await flowContext.CompleteActivityAsync();
 024            return;
 25        }
 26
 27        // Emit tokens for active outcomes.
 97628        var outcomes = (ctx.Result as Outcomes ?? Outcomes.Default).Names;
 97629        var outboundConnections = flowGraph.GetOutboundConnections(completedActivity);
 132630        var activeOutboundConnections = outboundConnections.Where(x => outcomes.Contains(x.Source.Port)).Distinct().ToLi
 31
 262632        foreach (var connection in activeOutboundConnections)
 33733            tokens.Add(Token.Create(connection.Source.Activity, connection.Target.Activity, connection.Source.Port));
 34
 35        // Consume inbound tokens to the completed activity.
 169636        var inboundTokens = tokens.Where(t => t.ToActivityId == completedActivity.Id && t is { Consumed: false, Blocked:
 262637        foreach (var t in inboundTokens)
 33738            t.Consume();
 39
 40        // Schedule next activities based on merge modes.
 262641        foreach (var connection in activeOutboundConnections)
 42        {
 33743            var targetActivity = connection.Target.Activity;
 33744            var mergeMode = await targetActivity.GetMergeModeAsync(ctx.ChildContext);
 45
 46            switch (mergeMode)
 47            {
 48                case MergeMode.Cascade:
 49                case MergeMode.Race:
 950                    if (mergeMode == MergeMode.Race)
 951                        await flowContext.CancelInboundAncestorsAsync(targetActivity);
 52
 53                    // Check for existing blocked token on this specific connection.
 954                    var existingBlockedToken = tokens.FirstOrDefault(t =>
 3455                        t.ToActivityId == targetActivity.Id &&
 3456                        t.FromActivityId == connection.Source.Activity.Id &&
 3457                        t.Outcome == connection.Source.Port &&
 3458                        t.Blocked);
 59
 960                    if (existingBlockedToken == null)
 61                    {
 62                        // Schedule the target.
 563                        await flowContext.ScheduleActivityAsync(targetActivity, OnChildCompletedTokenBasedLogicAsync);
 64
 65                        // Block other inbound connections (adjust per mode if needed).
 566                        var otherInboundConnections = flowGraph.GetForwardInboundConnections(targetActivity)
 967                            .Where(x => x.Source.Activity != completedActivity)
 568                            .ToList();
 69
 1870                        foreach (var inboundConnection in otherInboundConnections)
 71                        {
 472                            var blockedToken = Token.Create(inboundConnection.Source.Activity, inboundConnection.Target.
 473                            tokens.Add(blockedToken);
 74                        }
 75                    }
 76                    else
 77                    {
 78                        // Consume the block without scheduling.
 479                        existingBlockedToken.Consume();
 80                    }
 81
 482                    break;
 83
 84                case MergeMode.Merge:
 85                    // Wait for tokens from all forward inbound connections.
 86                    // Unlike Converge, this ignores backward connections (loops).
 87                    // Schedule on arrival for <=1 forward inbound (e.g., loops, sequential).
 788                    var inboundConnectionsMerge = flowGraph.GetForwardInboundConnections(targetActivity);
 89
 790                    if (inboundConnectionsMerge.Count > 1)
 91                    {
 692                        var hasAllTokens = inboundConnectionsMerge.All(inbound =>
 1293                            tokens.Any(t =>
 3394                                t is { Consumed: false, Blocked: false } &&
 3395                                t.FromActivityId == inbound.Source.Activity.Id &&
 3396                                t.ToActivityId == targetActivity.Id &&
 3397                                t.Outcome == inbound.Source.Port
 1298                            )
 699                        );
 100
 6101                        if (hasAllTokens)
 3102                            await flowContext.ScheduleActivityAsync(targetActivity, OnChildCompletedTokenBasedLogicAsync
 103                    }
 104                    else
 105                    {
 1106                        await flowContext.ScheduleActivityAsync(targetActivity, OnChildCompletedTokenBasedLogicAsync);
 107                    }
 108
 1109                    break;
 110
 111                case MergeMode.Converge:
 112                    // Strictest mode: Wait for tokens from ALL inbound connections (forward + backward).
 113                    // Requires every possible inbound path to execute before proceeding.
 13114                    var allInboundConnectionsConverge = flowGraph.GetInboundConnections(targetActivity);
 115
 13116                    if (allInboundConnectionsConverge.Count > 1)
 117                    {
 12118                        var hasAllTokens = allInboundConnectionsConverge.All(inbound =>
 24119                            tokens.Any(t =>
 66120                                t is { Consumed: false, Blocked: false } &&
 66121                                t.FromActivityId == inbound.Source.Activity.Id &&
 66122                                t.ToActivityId == targetActivity.Id &&
 66123                                t.Outcome == inbound.Source.Port
 24124                            )
 12125                        );
 126
 12127                        if (hasAllTokens)
 6128                            await flowContext.ScheduleActivityAsync(targetActivity, OnChildCompletedTokenBasedLogicAsync
 129                    }
 130                    else
 131                    {
 1132                        await flowContext.ScheduleActivityAsync(targetActivity, OnChildCompletedTokenBasedLogicAsync);
 133                    }
 134
 1135                    break;
 136
 137                case MergeMode.Stream:
 138                default:
 139                    // Flows freely - approximation that proceeds when upstream completes, ignoring dead paths.
 308140                    var inboundConnectionsStream = flowGraph.GetForwardInboundConnections(targetActivity);
 308141                    var hasUnconsumed = inboundConnectionsStream.Any(inbound =>
 772142                        tokens.Any(t => !t.Consumed && !t.Blocked && t.ToActivityId == inbound.Source.Activity.Id)
 308143                    );
 144
 308145                    if (!hasUnconsumed)
 305146                        await flowContext.ScheduleActivityAsync(targetActivity, OnChildCompletedTokenBasedLogicAsync);
 147                    break;
 148            }
 337149        }
 150
 151        // Complete flowchart if no pending work.
 976152        if (!flowContext.HasPendingWork())
 153        {
 655154            tokens.Clear();
 655155            await flowContext.CompleteActivityAsync();
 156        }
 157
 158        // Purge consumed tokens for the completed activity.
 1459159        tokens.RemoveWhere(t => t.ToActivityId == completedActivity.Id && t.Consumed);
 976160    }
 161
 162    private async ValueTask OnTokenFlowActivityCanceledAsync(CancelSignal signal, SignalContext context)
 163    {
 0164        var flowchartContext = context.ReceiverActivityExecutionContext;
 0165        var cancelledActivityContext = context.SenderActivityExecutionContext;
 166
 167        // Remove all tokens from and to this activity.
 0168        var tokenList = GetTokenList(flowchartContext);
 0169        tokenList.RemoveWhere(x => x.FromActivityId == cancelledActivityContext.Activity.Id || x.ToActivityId == cancell
 0170        await CompleteIfNoPendingWorkAsync(flowchartContext);
 0171    }
 172
 173    internal List<Token> GetTokenList(ActivityExecutionContext context)
 174    {
 1631175        if (context.Properties.TryGetValue(TokenStoreKey, out var obj) && obj is List<Token> list)
 976176            return list;
 177
 655178        var newList = new List<Token>();
 655179        context.Properties[TokenStoreKey] = newList;
 655180        return newList;
 181    }
 182}

Methods/Properties

OnChildCompletedCounterBasedLogicAsync()
GetStartActivity(Elsa.Workflows.ActivityExecutionContext)
HasPendingWork(Elsa.Workflows.ActivityExecutionContext)
GetRootActivity()
GetFlowGraph(Elsa.Workflows.ActivityExecutionContext)
GetFlowScope(Elsa.Workflows.ActivityExecutionContext)
ProcessChildCompletedAsync()
MaybeScheduleOutboundActivitiesAsync()
MaybeScheduleBackwardConnectionActivityAsync()
GetMergeModeAsync()
MaybeScheduleOutboundActivityAsync()
MaybeScheduleWaitAllActivityAsync()
MaybeScheduleWaitAllActiveActivityAsync()
MaybeScheduleWaitAnyActivityAsync()
ScheduleOutboundActivityAsync()
SkipOutboundActivityAsync()
CancelRemainingInboundActivitiesAsync()
OnScheduleOutcomesAsync()
OnCounterFlowActivityCanceledAsync()
.ctor(System.String,System.Nullable`1<System.Int32>)
get_Start()
get_Connections()
ScheduleChildrenAsync()
OnScheduleChildActivityAsync()
OnChildCompletedAsync(Elsa.Workflows.ActivityCompletedContext)
OnActivityCanceledAsync(Elsa.Workflows.Signals.CancelSignal,Elsa.Workflows.SignalContext)
CompleteIfNoPendingWorkAsync()
ExecuteBasedOnMode(Elsa.Workflows.ActivityExecutionContext,System.Func`1<System.Threading.Tasks.ValueTask>,System.Func`1<System.Threading.Tasks.ValueTask>)
GetEffectiveExecutionMode(Elsa.Workflows.ActivityExecutionContext)
ParseExecutionMode(System.Object)
GetDefaultModeFromOptions(Elsa.Workflows.ActivityExecutionContext)
OnChildCompletedTokenBasedLogicAsync()
OnTokenFlowActivityCanceledAsync()
GetTokenList(Elsa.Workflows.ActivityExecutionContext)