< Summary

Line coverage
82%
Covered lines: 284
Uncovered lines: 60
Coverable lines: 344
Total lines: 779
Line coverage: 82.5%
Branch coverage
78%
Covered branches: 150
Total branches: 190
Branch coverage: 78.9%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
File 1: OnChildCompletedCounterBasedLogicAsync()100%22100%
File 1: GetStartActivity(...)75%121287.5%
File 1: HasPendingWork(...)80%101094.44%
File 1: GetRootActivity()100%11100%
File 1: GetFlowGraph(...)100%11100%
File 1: GetFlowScope(...)100%11100%
File 1: ProcessChildCompletedAsync()75%8885.71%
File 1: MaybeScheduleOutboundActivitiesAsync()75%8885.71%
File 1: MaybeScheduleBackwardConnectionActivityAsync()0%4260%
File 1: GetMergeModeAsync()100%22100%
File 1: MaybeScheduleOutboundActivityAsync()75%4488.88%
File 1: MaybeScheduleWaitAllActivityAsync()75%4483.33%
File 1: MaybeScheduleWaitAllActiveActivityAsync()100%44100%
File 1: MaybeScheduleWaitAnyActivityAsync()50%11863.63%
File 1: ScheduleOutboundActivityAsync()50%22100%
File 1: SkipOutboundActivityAsync()100%11100%
File 1: CancelRemainingInboundActivitiesAsync()100%22100%
File 1: OnScheduleOutcomesAsync()100%210%
File 1: OnCounterFlowActivityCanceledAsync()100%22100%
File 2: .ctor(...)100%11100%
File 2: get_Start()100%11100%
File 2: get_Connections()100%11100%
File 2: ScheduleChildrenAsync()100%22100%
File 2: OnScheduleChildActivityAsync()0%620%
File 2: OnChildCompletedAsync(...)100%11100%
File 2: OnActivityCanceledAsync(...)100%1175%
File 2: CompleteIfNoPendingWorkAsync()100%44100%
File 2: ExecuteBasedOnMode(...)100%22100%
File 2: GetEffectiveExecutionMode(...)75%44100%
File 2: ParseExecutionMode(...)10%181057.14%
File 2: GetDefaultModeFromOptions(...)33.33%6680%
File 3: OnChildCompletedTokenBasedLogicAsync()97.5%404096.47%
File 3: OnTokenFlowActivityCanceledAsync()100%210%
File 3: GetTokenList(...)100%44100%

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Core/Activities/Flowchart/Activities/Flowchart.Counters.cs

#LineLine coverage
 1using Elsa.Extensions;
 2using Elsa.Workflows.Activities.Flowchart.Contracts;
 3using Elsa.Workflows.Activities.Flowchart.Extensions;
 4using Elsa.Workflows.Activities.Flowchart.Models;
 5using Elsa.Workflows.Options;
 6using Elsa.Workflows.Signals;
 7
 8namespace Elsa.Workflows.Activities.Flowchart.Activities;
 9
 10public partial class Flowchart
 11{
 12    private const string ScopeProperty = "FlowScope";
 13    private const string GraphTransientProperty = "FlowGraph";
 14    private const string BackwardConnectionActivityInput = "BackwardConnection";
 15
 16
 17    private async ValueTask OnChildCompletedCounterBasedLogicAsync(ActivityCompletedContext context)
 18    {
 14219        var flowchartContext = context.TargetContext;
 14220        var completedActivityContext = context.ChildContext;
 14221        var completedActivity = completedActivityContext.Activity;
 14222        var result = context.Result;
 23
 24        // Determine the outcomes from the completed activity
 14225        var outcomes = result is Outcomes o ? o : Outcomes.Default;
 26
 14227        await ProcessChildCompletedAsync(flowchartContext, completedActivity, completedActivityContext, outcomes);
 14228    }
 29
 30    private IActivity? GetStartActivity(ActivityExecutionContext context)
 31    {
 32        // If there's a trigger that triggered this workflow, use that.
 74033        var triggerActivityId = context.WorkflowExecutionContext.TriggerActivityId;
 135034        var triggerActivity = triggerActivityId != null ? Activities.FirstOrDefault(x => x.Id == triggerActivityId) : nu
 35
 74036        if (triggerActivity != null)
 20737            return triggerActivity;
 38
 39        // If an explicit Start activity was provided, use that.
 53340        if (Start != null)
 10641            return Start;
 42
 43        // If there is a Start activity on the flowchart, use that.
 87544        var startActivity = Activities.FirstOrDefault(x => x is Start);
 45
 42746        if (startActivity != null)
 247            return startActivity;
 48
 49        // If there's an activity marked as "Can Start Workflow", use that.
 87150        var canStartWorkflowActivity = Activities.FirstOrDefault(x => x.GetCanStartWorkflow());
 51
 42552        if (canStartWorkflowActivity != null)
 053            return canStartWorkflowActivity;
 54
 55        // If there is a single activity that has no inbound connections, use that.
 42556        var root = GetRootActivity();
 57
 42558        if (root != null)
 42259            return root;
 60
 61        // If no start activity found, return the first activity.
 362        return Activities.FirstOrDefault();
 63    }
 64
 65    /// <summary>
 66    /// Checks if there is any pending work for the flowchart.
 67    /// </summary>
 68    private bool HasPendingWork(ActivityExecutionContext context)
 69    {
 5470        var workflowExecutionContext = context.WorkflowExecutionContext;
 71
 72        // Use HashSet for O(1) lookups
 32873        var activityIds = new HashSet<string>(Activities.Select(x => x.Id));
 74
 75        // Short circuit evaluation - check running instances first before more expensive scheduler check
 26176        if (context.Children.Any(x => activityIds.Contains(x.Activity.Id) && x.Status == ActivityStatus.Running))
 177            return true;
 78
 79        // Scheduler check - optimize to avoid repeated LINQ evaluations
 5380        var scheduledItems = workflowExecutionContext.Scheduler.List().ToList();
 81
 5382        return scheduledItems.Any(workItem =>
 5383        {
 1884            var ownerInstanceId = workItem.Owner?.Id;
 5385
 1886            if (ownerInstanceId == null)
 087                return false;
 5388
 1889            if (ownerInstanceId == context.Id)
 1790                return true;
 5391
 392            var ownerContext = workflowExecutionContext.ActivityExecutionContexts.First(x => x.Id == ownerInstanceId);
 293            return ownerContext.GetAncestors().Any(x => x == context);
 5394        });
 95    }
 96
 97    private IActivity? GetRootActivity()
 98    {
 99        // Get the first activity that has no inbound connections.
 425100        var query =
 425101            from activity in Activities
 446102            let inboundConnections = Connections.Any(x => x.Target.Activity == activity)
 422103            where !inboundConnections
 847104            select activity;
 105
 425106        var rootActivity = query.FirstOrDefault();
 425107        return rootActivity;
 108    }
 109
 110    private FlowGraph GetFlowGraph(ActivityExecutionContext context)
 111    {
 112        // Store in TransientProperties so FlowChart is not persisted in WorkflowState
 185113        return context.TransientProperties.GetOrAdd(GraphTransientProperty, () => new FlowGraph(Connections, GetStartAct
 114    }
 115
 116    private FlowScope GetFlowScope(ActivityExecutionContext context)
 117    {
 181118        return context.GetProperty(ScopeProperty, () => new FlowScope());
 119    }
 120
 121    private async ValueTask ProcessChildCompletedAsync(ActivityExecutionContext flowchartContext, IActivity completedAct
 122    {
 142123        if (flowchartContext.Activity != this)
 124        {
 0125            throw new("Target context activity must be this flowchart");
 126        }
 127
 128        // If the completed activity's status is anything but "Completed", do not schedule its outbound activities.
 142129        if (completedActivityContext.Status != ActivityStatus.Completed)
 130        {
 0131            return;
 132        }
 133
 134        // If the complete activity is a terminal node, complete the flowchart immediately.
 142135        if (completedActivity is ITerminalNode)
 136        {
 1137            await flowchartContext.CompleteActivityAsync();
 1138            return;
 139        }
 140
 141        // Schedule the outbound activities
 141142        var flowGraph = GetFlowGraph(flowchartContext);
 141143        var flowScope = GetFlowScope(flowchartContext);
 141144        var completedActivityExcecutedByBackwardConnection = completedActivityContext.ActivityInput.GetValueOrDefault<bo
 141145        bool hasScheduledActivity = await MaybeScheduleOutboundActivitiesAsync(flowGraph, flowScope, flowchartContext, c
 146
 147        // If there are not any outbound connections, complete the flowchart activity if there is no other pending work
 141148        if (!hasScheduledActivity)
 149        {
 47150            await CompleteIfNoPendingWorkAsync(flowchartContext);
 151        }
 142152    }
 153
 154    /// <summary>
 155    /// Schedules outbound activities based on the flowchart's structure and execution state.
 156    /// This method determines whether an activity should be scheduled based on visited connections,
 157    /// forward traversal rules, and backward connections. If outcomes is Outcomes.Empty, it indicates
 158    /// that the activity should be skipped - all outbound connections will be visited and treated as
 159    /// not followed.
 160    /// </summary>
 161    /// <param name="flowGraph">The graph representation of the flowchart.</param>
 162    /// <param name="flowScope">Tracks activity and connection visits.</param>
 163    /// <param name="flowchartContext">The execution context of the flowchart.</param>
 164    /// <param name="activity">The current activity being processed.</param>
 165    /// <param name="completedActivityContext">The execution context of the completed activity (for call stack tracking)
 166    /// <param name="outcomes">The outcomes that determine which connections were followed.</param>
 167    /// <param name="completionCallback">The callback to invoke upon activity completion.</param>
 168    /// <param name="completedActivityExecutedByBackwardConnection">Indicates if the completed activity
 169    /// was executed due to a backward connection.</param>
 170    /// <returns>True if at least one activity was scheduled; otherwise, false.</returns>
 171    private static async ValueTask<bool> MaybeScheduleOutboundActivitiesAsync(FlowGraph flowGraph, FlowScope flowScope, 
 172    {
 163173        bool hasScheduledActivity = false;
 174
 175        // Check if the activity is dangling (i.e., it is not reachable from the flowchart graph)
 163176        if (flowGraph.IsDanglingActivity(activity))
 177        {
 0178            throw new($"Activity {activity.Id} is not reachable from the flowchart graph. Unable to schedule it's outbou
 179        }
 180
 181        // Register the activity as visited unless it was executed due to a backward connection
 163182        if (!completedActivityExecutedByBackwardConnection)
 183        {
 163184            flowScope.RegisterActivityVisit(activity);
 185        }
 186
 187        // Process each outbound connection from the current activity
 606188        foreach (var outboundConnection in flowGraph.GetOutboundConnections(activity))
 189        {
 140190            var connectionFollowed = outcomes.Names.Contains(outboundConnection.Source.Port);
 140191            flowScope.RegisterConnectionVisit(outboundConnection, connectionFollowed);
 140192            var outboundActivity = outboundConnection.Target.Activity;
 193
 194            // Determine the scheduling strategy based on connection-type.
 140195            if (flowGraph.IsBackwardConnection(outboundConnection, out var backwardConnectionIsValid))
 196                // Backward connections are scheduled differently
 0197                hasScheduledActivity |= await MaybeScheduleBackwardConnectionActivityAsync(flowGraph, flowchartContext, 
 198            else
 140199                hasScheduledActivity |= await MaybeScheduleOutboundActivityAsync(flowGraph, flowScope, flowchartContext,
 200        }
 201
 163202        return hasScheduledActivity;
 163203    }
 204
 205    /// <summary>
 206    /// Schedules an outbound activity that originates from a backward connection.
 207    /// </summary>
 208    private static async ValueTask<bool> MaybeScheduleBackwardConnectionActivityAsync(FlowGraph flowGraph, ActivityExecu
 209    {
 0210        if (!connectionFollowed)
 211        {
 0212            return false;
 213        }
 214
 0215        if (!backwardConnectionIsValid)
 216        {
 0217            throw new($"Invalid backward connection: Every path from the source ('{outboundConnection.Source.Activity.Id
 218        }
 219
 0220        var scheduleWorkOptions = new ScheduleWorkOptions
 0221        {
 0222            CompletionCallback = completionCallback,
 0223            Input = new Dictionary<string, object>() { { BackwardConnectionActivityInput, true } },
 0224            SchedulingActivityExecutionId = completedActivityContext?.Id
 0225        };
 226
 0227        await flowchartContext.ScheduleActivityAsync(outboundActivity, scheduleWorkOptions);
 0228        return true;
 0229    }
 230
 231    /// <summary>
 232    /// Determines the merge mode for a given outbound activity. If the outbound activity is a FlowJoin, it retrieves it
 233    /// mode. Otherwise, it defaults to FlowJoinMode.WaitAllActive for implicit joins.
 234    /// </summary>
 235    private static async ValueTask<FlowJoinMode> GetMergeModeAsync(ActivityExecutionContext flowchartContext, IActivity 
 236    {
 140237        if (outboundActivity is FlowJoin)
 238        {
 26239            var outboundActivityExecutionContext = await flowchartContext.WorkflowExecutionContext.CreateActivityExecuti
 26240            return await outboundActivityExecutionContext.EvaluateInputPropertyAsync<FlowJoin, FlowJoinMode>(x => x.Mode
 241        }
 242        else
 243        {
 244            // Implicit join case - treat as WaitAllActive
 114245            return FlowJoinMode.WaitAllActive;
 246        }
 140247    }
 248
 249    /// <summary>
 250    /// Schedules a join activity based on inbound connection statuses.
 251    /// </summary>
 252    private static async ValueTask<bool> MaybeScheduleOutboundActivityAsync(FlowGraph flowGraph, FlowScope flowScope, Ac
 253    {
 140254        FlowJoinMode mode = await GetMergeModeAsync(flowchartContext, outboundActivity);
 255
 140256        return mode switch
 140257        {
 15258            FlowJoinMode.WaitAll => await MaybeScheduleWaitAllActivityAsync(flowGraph, flowScope, flowchartContext, comp
 114259            FlowJoinMode.WaitAllActive => await MaybeScheduleWaitAllActiveActivityAsync(flowGraph, flowScope, flowchartC
 11260            FlowJoinMode.WaitAny => await MaybeScheduleWaitAnyActivityAsync(flowGraph, flowScope, flowchartContext, comp
 0261            _ => throw new($"Unsupported FlowJoinMode: {mode}"),
 140262        };
 140263    }
 264
 265    /// <summary>
 266    /// Determines whether to schedule an activity based on the FlowJoinMode.WaitAll behavior.
 267    /// If all inbound connections were visited, it checks if they were all followed to decide whether to schedule or sk
 268    /// </summary>
 269    private static async ValueTask<bool> MaybeScheduleWaitAllActivityAsync(FlowGraph flowGraph, FlowScope flowScope, Act
 270    {
 15271        if (!flowScope.AllInboundConnectionsVisited(flowGraph, outboundActivity))
 272            // Not all inbound connections have been visited yet; do not schedule anything yet.
 7273            return false;
 274
 8275        if (flowScope.AllInboundConnectionsFollowed(flowGraph, outboundActivity))
 276            // All inbound connections were followed; schedule the outbound activity.
 8277            return await ScheduleOutboundActivityAsync(flowchartContext, completedActivityContext, outboundActivity, com
 278        else
 279            // No inbound connections were followed; skip the outbound activity.
 0280            return await SkipOutboundActivityAsync(flowGraph, flowScope, flowchartContext, completedActivityContext, out
 15281    }
 282
 283    /// <summary>
 284    /// Determines whether to schedule an activity based on the FlowJoinMode.WaitAllActive behavior.
 285    /// If all inbound connections have been visited, it checks if any were followed to decide whether to schedule or sk
 286    /// </summary>
 287    private static async ValueTask<bool> MaybeScheduleWaitAllActiveActivityAsync(FlowGraph flowGraph, FlowScope flowScop
 288    {
 114289        if (!flowScope.AllInboundConnectionsVisited(flowGraph, outboundActivity))
 290            // Not all inbound connections have been visited yet; do not schedule anything yet.
 2291            return false;
 292
 112293        if (flowScope.AnyInboundConnectionsFollowed(flowGraph, outboundActivity))
 294            // At least one inbound connection was followed; schedule the outbound activity.
 93295            return await ScheduleOutboundActivityAsync(flowchartContext, completedActivityContext, outboundActivity, com
 296        else
 297            // No inbound connections were followed; skip the outbound activity.
 19298            return await SkipOutboundActivityAsync(flowGraph, flowScope, flowchartContext, completedActivityContext, out
 114299    }
 300
 301    /// <summary>
 302    /// Determines whether to schedule an activity based on the FlowJoinMode.WaitAny behavior.
 303    /// If any inbound connection has been followed, it schedules the activity and cancels remaining inbound activities.
 304    /// If a subsequent inbound connection is followed after the activity has been scheduled, it ignores it.
 305    /// </summary>
 306    private static async ValueTask<bool> MaybeScheduleWaitAnyActivityAsync(FlowGraph flowGraph, FlowScope flowScope, Act
 307    {
 11308        if (flowScope.ShouldIgnoreConnection(outboundConnection, outboundActivity))
 309            // Ignore the connection if the outbound activity has already completed (JoinAny scenario)
 0310            return false;
 311
 20312        if (flowchartContext.WorkflowExecutionContext.Scheduler.List().Any(workItem => workItem.Owner == flowchartContex
 313            // Ignore the connection if the outbound activity is already scheduled
 4314            return false;
 315
 7316        if (flowScope.AnyInboundConnectionsFollowed(flowGraph, outboundActivity))
 317        {
 318            // This is the first inbound connection followed; schedule the outbound activity
 7319            var scheduleResponse = await ScheduleOutboundActivityAsync(flowchartContext, completedActivityContext, outbo
 320            // An inbound connection has been followed; cancel remaining inbound activities
 7321            await CancelRemainingInboundActivitiesAsync(flowchartContext, outboundActivity);
 322
 7323            return scheduleResponse;
 324        }
 325
 0326        if (flowScope.AllInboundConnectionsVisited(flowGraph, outboundActivity))
 327            // All inbound connections have been visited without any being followed; skip the outbound activity
 0328            return await SkipOutboundActivityAsync(flowGraph, flowScope, flowchartContext, completedActivityContext, out
 329
 330        // No inbound connections have been followed yet; do not schedule anything yet.
 0331        return false;
 11332    }
 333
 334    /// <summary>
 335    /// Schedules the outbound activity.
 336    /// </summary>
 337    private static async ValueTask<bool> ScheduleOutboundActivityAsync(ActivityExecutionContext flowchartContext, Activi
 338    {
 108339        var options = new ScheduleWorkOptions
 108340        {
 108341            CompletionCallback = completionCallback,
 108342            SchedulingActivityExecutionId = completedActivityContext?.Id
 108343        };
 108344        await flowchartContext.ScheduleActivityAsync(outboundActivity, options);
 108345        return true;
 108346    }
 347
 348    /// <summary>
 349    /// Skips the outbound activity by propagating skipped connections.
 350    /// </summary>
 351    private static async ValueTask<bool> SkipOutboundActivityAsync(FlowGraph flowGraph, FlowScope flowScope, ActivityExe
 352    {
 19353        return await MaybeScheduleOutboundActivitiesAsync(flowGraph, flowScope, flowchartContext, outboundActivity, comp
 19354    }
 355
 356    private static async ValueTask CancelRemainingInboundActivitiesAsync(ActivityExecutionContext flowchartContext, IAct
 357    {
 7358        var flowchart = (Flowchart)flowchartContext.Activity;
 7359        var flowGraph = flowchart.GetFlowGraph(flowchartContext);
 7360        var ancestorActivities = flowGraph.GetAncestorActivities(outboundActivity);
 44361        var inboundActivityExecutionContexts = flowchartContext.WorkflowExecutionContext.ActivityExecutionContexts.Where
 362
 363        // Cancel each ancestor activity.
 58364        foreach (var activityExecutionContext in inboundActivityExecutionContexts)
 365        {
 22366            await activityExecutionContext.CancelActivityAsync();
 367        }
 7368    }
 369
 370    private async ValueTask OnScheduleOutcomesAsync(ScheduleActivityOutcomes signal, SignalContext context)
 371    {
 0372        var flowchartContext = context.ReceiverActivityExecutionContext;
 0373        var schedulingActivityContext = context.SenderActivityExecutionContext;
 0374        var schedulingActivity = schedulingActivityContext.Activity;
 0375        var outcomes = new Outcomes(signal.Outcomes);
 0376    }
 377
 378    private async ValueTask OnCounterFlowActivityCanceledAsync(CancelSignal signal, SignalContext context)
 379    {
 7380        var flowchartContext = context.ReceiverActivityExecutionContext;
 7381        await CompleteIfNoPendingWorkAsync(flowchartContext);
 7382        var flowchart = (Flowchart)flowchartContext.Activity;
 7383        var canceledActivity = context.SenderActivityExecutionContext.Activity;
 384
 7385        if (!flowchart.Activities.Contains(canceledActivity))
 4386            return;
 387
 3388        var flowGraph = flowchartContext.GetFlowGraph();
 3389        var flowScope = flowchart.GetFlowScope(flowchartContext);
 390
 391        // Propagate canceled connections visited count by scheduling with Outcomes.Empty
 3392        await MaybeScheduleOutboundActivitiesAsync(flowGraph, flowScope, flowchartContext, canceledActivity, context.Sen
 7393    }
 394}

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Core/Activities/Flowchart/Activities/Flowchart.cs

#LineLine coverage
 1using System.ComponentModel;
 2using System.Runtime.CompilerServices;
 3using Elsa.Workflows.Activities.Flowchart.Models;
 4using Elsa.Workflows.Activities.Flowchart.Options;
 5using Elsa.Workflows.Attributes;
 6using Elsa.Workflows.Signals;
 7using Microsoft.Extensions.DependencyInjection;
 8using Microsoft.Extensions.Options;
 9
 10namespace Elsa.Workflows.Activities.Flowchart.Activities;
 11
 12/// <summary>
 13/// A flowchart consists of a collection of activities and connections between them.
 14/// </summary>
 15[Activity("Elsa", "Flow", "A flowchart is a collection of activities and connections between them.")]
 16[Browsable(false)]
 17public partial class Flowchart : Container
 18{
 19    /// <summary>
 20    /// The property key used to store the flowchart execution mode in <see cref="WorkflowExecutionContext.Properties"/>
 21    /// </summary>
 22    public const string ExecutionModePropertyKey = "Flowchart:ExecutionMode";
 23
 24    /// <summary>
 25    /// Set this to <c>false</c> from your program file in case you wish to use the old counter based model.
 26    /// This static field is used as a final fallback when no execution mode is specified via options or workflow execut
 27    /// Note: Prefer using <see cref="FlowchartOptions"/> configured via DI for application-wide settings.
 28    /// </summary>
 29    // ReSharper disable once GrammarMistakeInComment
 30    public static bool UseTokenFlow = false; // Default to false to maintain the same behavior with 3.5.2 out of the box
 31
 32    /// <inheritdoc />
 170733    public Flowchart([CallerFilePath] string? source = null, [CallerLineNumber] int? line = null) : base(source, line)
 34    {
 170735        OnSignalReceived<ScheduleActivityOutcomes>(OnScheduleOutcomesAsync);
 170736        OnSignalReceived<ScheduleChildActivity>(OnScheduleChildActivityAsync);
 170737        OnSignalReceived<CancelSignal>(OnActivityCanceledAsync);
 170738    }
 39
 40    /// <summary>
 41    /// The activity to execute when the flowchart starts.
 42    /// </summary>
 357343    [Port] [Browsable(false)] public IActivity? Start { get; set; }
 44
 45    /// <summary>
 46    /// A list of connections between activities.
 47    /// </summary>
 528048    public ICollection<Connection> Connections { get; set; } = new List<Connection>();
 49
 50    /// <inheritdoc />
 51    protected override async ValueTask ScheduleChildrenAsync(ActivityExecutionContext context)
 52    {
 70353        var startActivity = GetStartActivity(context);
 54
 70355        if (startActivity == null)
 56        {
 57            // Nothing else to execute.
 358            await context.CompleteActivityAsync();
 359            return;
 60        }
 61
 70062        await context.ScheduleActivityAsync(startActivity, OnChildCompletedAsync);
 70363    }
 64
 65    private async ValueTask OnScheduleChildActivityAsync(ScheduleChildActivity signal, SignalContext context)
 66    {
 067        var flowchartContext = context.ReceiverActivityExecutionContext;
 068        var activity = signal.Activity;
 069        var activityExecutionContext = signal.ActivityExecutionContext;
 70
 071        if (activityExecutionContext != null)
 72        {
 073            await flowchartContext.ScheduleActivityAsync(activityExecutionContext.Activity, new()
 074            {
 075                ExistingActivityExecutionContext = activityExecutionContext,
 076                CompletionCallback = OnChildCompletedAsync,
 077                Input = signal.Input
 078            });
 79        }
 80        else
 81        {
 082            await flowchartContext.ScheduleActivityAsync(activity, new()
 083            {
 084                CompletionCallback = OnChildCompletedAsync,
 085                Input = signal.Input
 086            });
 87        }
 088    }
 89
 90    private ValueTask OnChildCompletedAsync(ActivityCompletedContext context)
 91    {
 79792        return ExecuteBasedOnMode(
 79793            context.TargetContext,
 65594            () => OnChildCompletedTokenBasedLogicAsync(context),
 93995            () => OnChildCompletedCounterBasedLogicAsync(context));
 96    }
 97
 98    private ValueTask OnActivityCanceledAsync(CancelSignal signal, SignalContext context)
 99    {
 7100        return ExecuteBasedOnMode(
 7101            context.ReceiverActivityExecutionContext,
 0102            () => OnTokenFlowActivityCanceledAsync(signal, context),
 14103            () => OnCounterFlowActivityCanceledAsync(signal, context));
 104    }
 105
 106    private async Task CompleteIfNoPendingWorkAsync(ActivityExecutionContext context)
 107    {
 54108        var hasPendingWork = HasPendingWork(context);
 109
 54110        if (!hasPendingWork)
 111        {
 180112            var hasFaultedActivities = context.Children.Any(x => x.Status == ActivityStatus.Faulted);
 113
 36114            if (!hasFaultedActivities)
 115            {
 36116                await context.CompleteActivityAsync();
 117            }
 118        }
 54119    }
 120
 121    private static ValueTask ExecuteBasedOnMode(ActivityExecutionContext context, Func<ValueTask> tokenBasedAction, Func
 122    {
 804123        var mode = GetEffectiveExecutionMode(context);
 124
 804125        return mode switch
 804126        {
 655127            FlowchartExecutionMode.TokenBased => tokenBasedAction(),
 149128            FlowchartExecutionMode.CounterBased or FlowchartExecutionMode.Default or _ => counterBasedAction()
 804129        };
 130    }
 131
 132    /// <summary>
 133    /// Gets the effective execution mode for this flowchart execution.
 134    /// Priority: WorkflowExecutionContext.Properties > FlowchartOptions (DI) > Static UseTokenFlow flag
 135    /// </summary>
 136    private static FlowchartExecutionMode GetEffectiveExecutionMode(ActivityExecutionContext context)
 137    {
 804138        var workflowExecutionContext = context.WorkflowExecutionContext;
 139
 804140        if (!workflowExecutionContext.Properties.TryGetValue(ExecutionModePropertyKey, out var modeValue))
 641141            return GetDefaultModeFromOptions(context);
 142
 163143        var mode = ParseExecutionMode(modeValue);
 163144        return mode != FlowchartExecutionMode.Default ? mode : GetDefaultModeFromOptions(context);
 145    }
 146
 147    private static FlowchartExecutionMode ParseExecutionMode(object modeValue)
 148    {
 163149        return modeValue switch
 163150        {
 163151            FlowchartExecutionMode executionMode => executionMode,
 0152            string str when Enum.TryParse<FlowchartExecutionMode>(str, true, out var parsed) => parsed,
 0153            int intValue when Enum.IsDefined(typeof(FlowchartExecutionMode), intValue) => (FlowchartExecutionMode)intVal
 0154            _ => FlowchartExecutionMode.Default
 163155        };
 156    }
 157
 158    private static FlowchartExecutionMode GetDefaultModeFromOptions(ActivityExecutionContext context)
 159    {
 641160        var options = context.WorkflowExecutionContext.ServiceProvider.GetService<IOptions<FlowchartOptions>>();
 641161        var mode = options?.Value.DefaultExecutionMode ?? FlowchartExecutionMode.Default;
 641162        if (mode == FlowchartExecutionMode.Default)
 0163            return UseTokenFlow ? FlowchartExecutionMode.TokenBased : FlowchartExecutionMode.CounterBased;
 641164        return mode;
 165    }
 166}

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Core/Activities/Flowchart/Activities/Flowchart.Tokens.cs

#LineLine coverage
 1using Elsa.Extensions;
 2using Elsa.Workflows.Activities.Flowchart.Extensions;
 3using Elsa.Workflows.Activities.Flowchart.Models;
 4using Elsa.Workflows.Options;
 5using Elsa.Workflows.Signals;
 6
 7namespace Elsa.Workflows.Activities.Flowchart.Activities;
 8
 9public partial class Flowchart
 10{
 11    private const string TokenStoreKey = "Flowchart.Tokens";
 12
 13    private async ValueTask OnChildCompletedTokenBasedLogicAsync(ActivityCompletedContext ctx)
 14    {
 97615        var flowContext = ctx.TargetContext;
 97616        var completedActivity = ctx.ChildContext.Activity;
 97617        var flowGraph = flowContext.GetFlowGraph();
 97618        var tokens = GetTokenList(flowContext);
 19
 20        // If the completed activity is a terminal node, complete the flowchart immediately.
 97621        if (completedActivity is ITerminalNode)
 22        {
 023            tokens.Clear();
 024            await flowContext.CompleteActivityAsync();
 025            return;
 26        }
 27
 28        // Emit tokens for active outcomes.
 97629        var outcomes = (ctx.Result as Outcomes ?? Outcomes.Default).Names;
 97630        var outboundConnections = flowGraph.GetOutboundConnections(completedActivity);
 132631        var activeOutboundConnections = outboundConnections.Where(x => outcomes.Contains(x.Source.Port)).Distinct().ToLi
 32
 262633        foreach (var connection in activeOutboundConnections)
 33734            tokens.Add(Token.Create(connection.Source.Activity, connection.Target.Activity, connection.Source.Port));
 35
 36        // Consume inbound tokens to the completed activity.
 169637        var inboundTokens = tokens.Where(t => t.ToActivityId == completedActivity.Id && t is { Consumed: false, Blocked:
 262638        foreach (var t in inboundTokens)
 33739            t.Consume();
 40
 41        // Schedule next activities based on merge modes.
 262642        foreach (var connection in activeOutboundConnections)
 43        {
 33744            var targetActivity = connection.Target.Activity;
 33745            var mergeMode = await targetActivity.GetMergeModeAsync(ctx.ChildContext);
 46
 47            switch (mergeMode)
 48            {
 49                case MergeMode.Cascade:
 50                case MergeMode.Race:
 951                    if (mergeMode == MergeMode.Race)
 952                        await flowContext.CancelInboundAncestorsAsync(targetActivity);
 53
 54                    // Check for existing blocked token on this specific connection.
 955                    var existingBlockedToken = tokens.FirstOrDefault(t =>
 3456                        t.ToActivityId == targetActivity.Id &&
 3457                        t.FromActivityId == connection.Source.Activity.Id &&
 3458                        t.Outcome == connection.Source.Port &&
 3459                        t.Blocked);
 60
 961                    if (existingBlockedToken == null)
 62                    {
 63                        // Schedule the target.
 564                        var options = new ScheduleWorkOptions
 565                        {
 566                            CompletionCallback = OnChildCompletedTokenBasedLogicAsync,
 567                            SchedulingActivityExecutionId = ctx.ChildContext.Id
 568                        };
 569                        await flowContext.ScheduleActivityAsync(targetActivity, options);
 70
 71                        // Block other inbound connections (adjust per mode if needed).
 572                        var otherInboundConnections = flowGraph.GetForwardInboundConnections(targetActivity)
 973                            .Where(x => x.Source.Activity != completedActivity)
 574                            .ToList();
 75
 1876                        foreach (var inboundConnection in otherInboundConnections)
 77                        {
 478                            var blockedToken = Token.Create(inboundConnection.Source.Activity, inboundConnection.Target.
 479                            tokens.Add(blockedToken);
 80                        }
 81                    }
 82                    else
 83                    {
 84                        // Consume the block without scheduling.
 485                        existingBlockedToken.Consume();
 86                    }
 87
 488                    break;
 89
 90                case MergeMode.Merge:
 91                    // Wait for tokens from all forward inbound connections.
 92                    // Unlike Converge, this ignores backward connections (loops).
 93                    // Schedule on arrival for <=1 forward inbound (e.g., loops, sequential).
 794                    var inboundConnectionsMerge = flowGraph.GetForwardInboundConnections(targetActivity);
 95
 796                    if (inboundConnectionsMerge.Count > 1)
 97                    {
 698                        var hasAllTokens = inboundConnectionsMerge.All(inbound =>
 1299                            tokens.Any(t =>
 33100                                t is { Consumed: false, Blocked: false } &&
 33101                                t.FromActivityId == inbound.Source.Activity.Id &&
 33102                                t.ToActivityId == targetActivity.Id &&
 33103                                t.Outcome == inbound.Source.Port
 12104                            )
 6105                        );
 106
 6107                        if (hasAllTokens)
 108                        {
 3109                            var options = new ScheduleWorkOptions
 3110                            {
 3111                                CompletionCallback = OnChildCompletedTokenBasedLogicAsync,
 3112                                SchedulingActivityExecutionId = ctx.ChildContext.Id
 3113                            };
 3114                            await flowContext.ScheduleActivityAsync(targetActivity, options);
 115                        }
 116                    }
 117                    else
 118                    {
 1119                        var options = new ScheduleWorkOptions
 1120                        {
 1121                            CompletionCallback = OnChildCompletedTokenBasedLogicAsync,
 1122                            SchedulingActivityExecutionId = ctx.ChildContext.Id
 1123                        };
 1124                        await flowContext.ScheduleActivityAsync(targetActivity, options);
 125                    }
 126
 1127                    break;
 128
 129                case MergeMode.Converge:
 130                    // Strictest mode: Wait for tokens from ALL inbound connections (forward + backward).
 131                    // Requires every possible inbound path to execute before proceeding.
 13132                    var allInboundConnectionsConverge = flowGraph.GetInboundConnections(targetActivity);
 133
 13134                    if (allInboundConnectionsConverge.Count > 1)
 135                    {
 12136                        var hasAllTokens = allInboundConnectionsConverge.All(inbound =>
 24137                            tokens.Any(t =>
 66138                                t is { Consumed: false, Blocked: false } &&
 66139                                t.FromActivityId == inbound.Source.Activity.Id &&
 66140                                t.ToActivityId == targetActivity.Id &&
 66141                                t.Outcome == inbound.Source.Port
 24142                            )
 12143                        );
 144
 12145                        if (hasAllTokens)
 146                        {
 6147                            var options = new ScheduleWorkOptions
 6148                            {
 6149                                CompletionCallback = OnChildCompletedTokenBasedLogicAsync,
 6150                                SchedulingActivityExecutionId = ctx.ChildContext.Id
 6151                            };
 6152                            await flowContext.ScheduleActivityAsync(targetActivity, options);
 153                        }
 154                    }
 155                    else
 156                    {
 1157                        var options = new ScheduleWorkOptions
 1158                        {
 1159                            CompletionCallback = OnChildCompletedTokenBasedLogicAsync,
 1160                            SchedulingActivityExecutionId = ctx.ChildContext.Id
 1161                        };
 1162                        await flowContext.ScheduleActivityAsync(targetActivity, options);
 163                    }
 164
 1165                    break;
 166
 167                case MergeMode.Stream:
 168                default:
 169                    // Flows freely - approximation that proceeds when upstream completes, ignoring dead paths.
 308170                    var inboundConnectionsStream = flowGraph.GetForwardInboundConnections(targetActivity);
 308171                    var hasUnconsumed = inboundConnectionsStream.Any(inbound =>
 772172                        tokens.Any(t => !t.Consumed && !t.Blocked && t.ToActivityId == inbound.Source.Activity.Id)
 308173                    );
 174
 308175                    if (!hasUnconsumed)
 176                    {
 305177                        var options = new ScheduleWorkOptions
 305178                        {
 305179                            CompletionCallback = OnChildCompletedTokenBasedLogicAsync,
 305180                            SchedulingActivityExecutionId = ctx.ChildContext.Id
 305181                        };
 305182                        await flowContext.ScheduleActivityAsync(targetActivity, options);
 183                    }
 184                    break;
 185            }
 337186        }
 187
 188        // Complete flowchart if no pending work.
 976189        if (!flowContext.HasPendingWork())
 190        {
 655191            tokens.Clear();
 655192            await flowContext.CompleteActivityAsync();
 193        }
 194
 195        // Purge consumed tokens for the completed activity.
 1459196        tokens.RemoveWhere(t => t.ToActivityId == completedActivity.Id && t.Consumed);
 976197    }
 198
 199    private async ValueTask OnTokenFlowActivityCanceledAsync(CancelSignal signal, SignalContext context)
 200    {
 0201        var flowchartContext = context.ReceiverActivityExecutionContext;
 0202        var cancelledActivityContext = context.SenderActivityExecutionContext;
 203
 204        // Remove all tokens from and to this activity.
 0205        var tokenList = GetTokenList(flowchartContext);
 0206        tokenList.RemoveWhere(x => x.FromActivityId == cancelledActivityContext.Activity.Id || x.ToActivityId == cancell
 0207        await CompleteIfNoPendingWorkAsync(flowchartContext);
 0208    }
 209
 210    internal List<Token> GetTokenList(ActivityExecutionContext context)
 211    {
 1631212        if (context.Properties.TryGetValue(TokenStoreKey, out var obj) && obj is List<Token> list)
 976213            return list;
 214
 655215        var newList = new List<Token>();
 655216        context.Properties[TokenStoreKey] = newList;
 655217        return newList;
 218    }
 219}

Methods/Properties

OnChildCompletedCounterBasedLogicAsync()
GetStartActivity(Elsa.Workflows.ActivityExecutionContext)
HasPendingWork(Elsa.Workflows.ActivityExecutionContext)
GetRootActivity()
GetFlowGraph(Elsa.Workflows.ActivityExecutionContext)
GetFlowScope(Elsa.Workflows.ActivityExecutionContext)
ProcessChildCompletedAsync()
MaybeScheduleOutboundActivitiesAsync()
MaybeScheduleBackwardConnectionActivityAsync()
GetMergeModeAsync()
MaybeScheduleOutboundActivityAsync()
MaybeScheduleWaitAllActivityAsync()
MaybeScheduleWaitAllActiveActivityAsync()
MaybeScheduleWaitAnyActivityAsync()
ScheduleOutboundActivityAsync()
SkipOutboundActivityAsync()
CancelRemainingInboundActivitiesAsync()
OnScheduleOutcomesAsync()
OnCounterFlowActivityCanceledAsync()
.ctor(System.String,System.Nullable`1<System.Int32>)
get_Start()
get_Connections()
ScheduleChildrenAsync()
OnScheduleChildActivityAsync()
OnChildCompletedAsync(Elsa.Workflows.ActivityCompletedContext)
OnActivityCanceledAsync(Elsa.Workflows.Signals.CancelSignal,Elsa.Workflows.SignalContext)
CompleteIfNoPendingWorkAsync()
ExecuteBasedOnMode(Elsa.Workflows.ActivityExecutionContext,System.Func`1<System.Threading.Tasks.ValueTask>,System.Func`1<System.Threading.Tasks.ValueTask>)
GetEffectiveExecutionMode(Elsa.Workflows.ActivityExecutionContext)
ParseExecutionMode(System.Object)
GetDefaultModeFromOptions(Elsa.Workflows.ActivityExecutionContext)
OnChildCompletedTokenBasedLogicAsync()
OnTokenFlowActivityCanceledAsync()
GetTokenList(Elsa.Workflows.ActivityExecutionContext)