< Summary

Line coverage
81%
Covered lines: 279
Uncovered lines: 62
Coverable lines: 341
Total lines: 774
Line coverage: 81.8%
Branch coverage
76%
Covered branches: 144
Total branches: 188
Branch coverage: 76.5%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| File 1: OnChildCompletedCounterBasedLogicAsync() | 100% | 2 | 2 | 100% |
| File 1: GetStartActivity(...) | 66.66% | 13 | 12 | 81.25% |
| File 1: HasPendingWork(...) | 50% | 10 | 10 | 83.33% |
| File 1: GetRootActivity() | 100% | 1 | 1 | 100% |
| File 1: GetFlowGraph(...) | 100% | 1 | 1 | 100% |
| File 1: GetFlowScope(...) | 100% | 1 | 1 | 100% |
| File 1: ProcessChildCompletedAsync() | 75% | 8 | 8 | 85.71% |
| File 1: MaybeScheduleOutboundActivitiesAsync() | 75% | 8 | 8 | 85.71% |
| File 1: MaybeScheduleBackwardConnectionActivityAsync() | 0% | 42 | 6 | 0% |
| File 1: GetMergeModeAsync() | 100% | 2 | 2 | 100% |
| File 1: MaybeScheduleOutboundActivityAsync() | 75% | 4 | 4 | 88.88% |
| File 1: MaybeScheduleWaitAllActivityAsync() | 75% | 4 | 4 | 83.33% |
| File 1: MaybeScheduleWaitAllActiveActivityAsync() | 100% | 4 | 4 | 100% |
| File 1: MaybeScheduleWaitAnyActivityAsync() | 50% | 11 | 8 | 63.63% |
| File 1: ScheduleOutboundActivityAsync() | 50% | 2 | 2 | 100% |
| File 1: SkipOutboundActivityAsync() | 100% | 1 | 1 | 100% |
| File 1: CancelRemainingInboundActivitiesAsync() | 100% | 2 | 2 | 100% |
| File 1: OnScheduleOutcomesAsync() | 100% | 2 | 1 | 0% |
| File 1: OnCounterFlowActivityCanceledAsync() | 100% | 1 | 1 | 100% |
| File 2: .ctor(...) | 100% | 1 | 1 | 100% |
| File 2: get_Start() | 100% | 1 | 1 | 100% |
| File 2: get_Connections() | 100% | 1 | 1 | 100% |
| File 2: ScheduleChildrenAsync() | 100% | 2 | 2 | 100% |
| File 2: OnScheduleChildActivityAsync() | 0% | 6 | 2 | 0% |
| File 2: OnChildCompletedAsync(...) | 100% | 1 | 1 | 100% |
| File 2: OnActivityCanceledAsync(...) | 100% | 1 | 1 | 75% |
| File 2: CompleteIfNoPendingWorkAsync() | 100% | 4 | 4 | 100% |
| File 2: ExecuteBasedOnMode(...) | 100% | 2 | 2 | 100% |
| File 2: GetEffectiveExecutionMode(...) | 75% | 4 | 4 | 100% |
| File 2: ParseExecutionMode(...) | 10% | 18 | 10 | 57.14% |
| File 2: GetDefaultModeFromOptions(...) | 33.33% | 6 | 6 | 80% |
| File 3: OnChildCompletedTokenBasedLogicAsync() | 97.5% | 40 | 40 | 96.47% |
| File 3: OnTokenFlowActivityCanceledAsync() | 100% | 2 | 1 | 0% |
| File 3: GetTokenList(...) | 100% | 4 | 4 | 100% |

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Core/Activities/Flowchart/Activities/Flowchart.Counters.cs

#LineLine coverage
 1using Elsa.Extensions;
 2using Elsa.Workflows.Activities.Flowchart.Contracts;
 3using Elsa.Workflows.Activities.Flowchart.Extensions;
 4using Elsa.Workflows.Activities.Flowchart.Models;
 5using Elsa.Workflows.Options;
 6using Elsa.Workflows.Signals;
 7
 8namespace Elsa.Workflows.Activities.Flowchart.Activities;
 9
 10public partial class Flowchart
 11{
 12    private const string ScopeProperty = "FlowScope";
 13    private const string GraphTransientProperty = "FlowGraph";
 14    private const string BackwardConnectionActivityInput = "BackwardConnection";
 15
 16
 17    private async ValueTask OnChildCompletedCounterBasedLogicAsync(ActivityCompletedContext context)
 18    {
 13619        var flowchartContext = context.TargetContext;
 13620        var completedActivityContext = context.ChildContext;
 13621        var completedActivity = completedActivityContext.Activity;
 13622        var result = context.Result;
 23
 24        // Determine the outcomes from the completed activity
 13625        var outcomes = result is Outcomes o ? o : Outcomes.Default;
 26
 13627        await ProcessChildCompletedAsync(flowchartContext, completedActivity, completedActivityContext, outcomes);
 13628    }
 29
 30    private IActivity? GetStartActivity(ActivityExecutionContext context)
 31    {
 32        // If there's a trigger that triggered this workflow, use that.
 73433        var triggerActivityId = context.WorkflowExecutionContext.TriggerActivityId;
 134434        var triggerActivity = triggerActivityId != null ? Activities.FirstOrDefault(x => x.Id == triggerActivityId) : nu
 35
 73436        if (triggerActivity != null)
 20737            return triggerActivity;
 38
 39        // If an explicit Start activity was provided, use that.
 52740        if (Start != null)
 10641            return Start;
 42
 43        // If there is a Start activity on the flowchart, use that.
 84944        var startActivity = Activities.FirstOrDefault(x => x is Start);
 45
 42146        if (startActivity != null)
 247            return startActivity;
 48
 49        // If there's an activity marked as "Can Start Workflow", use that.
 84550        var canStartWorkflowActivity = Activities.FirstOrDefault(x => x.GetCanStartWorkflow());
 51
 41952        if (canStartWorkflowActivity != null)
 053            return canStartWorkflowActivity;
 54
 55        // If there is a single activity that has no inbound connections, use that.
 41956        var root = GetRootActivity();
 57
 41958        if (root != null)
 41659            return root;
 60
 61        // If no start activity found, return the first activity.
 362        return Activities.FirstOrDefault();
 63    }
 64
 65    /// <summary>
 66    /// Checks if there is any pending work for the flowchart.
 67    /// </summary>
 68    private bool HasPendingWork(ActivityExecutionContext context)
 69    {
 4770        var workflowExecutionContext = context.WorkflowExecutionContext;
 71
 72        // Use HashSet for O(1) lookups
 30173        var activityIds = new HashSet<string>(Activities.Select(x => x.Id));
 74
 75        // Short circuit evaluation - check running instances first before more expensive scheduler check
 24076        if (context.Children.Any(x => activityIds.Contains(x.Activity.Id) && x.Status == ActivityStatus.Running))
 177            return true;
 78
 79        // Scheduler check - optimize to avoid repeated LINQ evaluations
 4680        var scheduledItems = workflowExecutionContext.Scheduler.List().ToList();
 81
 4682        return scheduledItems.Any(workItem =>
 4683        {
 1384            var ownerInstanceId = workItem.Owner?.Id;
 4685
 1386            if (ownerInstanceId == null)
 087                return false;
 4688
 1389            if (ownerInstanceId == context.Id)
 1390                return true;
 4691
 092            var ownerContext = workflowExecutionContext.ActivityExecutionContexts.First(x => x.Id == ownerInstanceId);
 093            return ownerContext.GetAncestors().Any(x => x == context);
 4694        });
 95    }
 96
 97    private IActivity? GetRootActivity()
 98    {
 99        // Get the first activity that has no inbound connections.
 419100        var query =
 419101            from activity in Activities
 426102            let inboundConnections = Connections.Any(x => x.Target.Activity == activity)
 416103            where !inboundConnections
 835104            select activity;
 105
 419106        var rootActivity = query.FirstOrDefault();
 419107        return rootActivity;
 108    }
 109
 110    private FlowGraph GetFlowGraph(ActivityExecutionContext context)
 111    {
 112        // Store in TransientProperties so FlowChart is not persisted in WorkflowState
 176113        return context.TransientProperties.GetOrAdd(GraphTransientProperty, () => new FlowGraph(Connections, GetStartAct
 114    }
 115
 116    private FlowScope GetFlowScope(ActivityExecutionContext context)
 117    {
 170118        return context.GetProperty(ScopeProperty, () => new FlowScope());
 119    }
 120
 121    private async ValueTask ProcessChildCompletedAsync(ActivityExecutionContext flowchartContext, IActivity completedAct
 122    {
 136123        if (flowchartContext.Activity != this)
 124        {
 0125            throw new("Target context activity must be this flowchart");
 126        }
 127
 128        // If the completed activity's status is anything but "Completed", do not schedule its outbound activities.
 136129        if (completedActivityContext.Status != ActivityStatus.Completed)
 130        {
 0131            return;
 132        }
 133
 134        // If the completed activity is a terminal node, complete the flowchart immediately.
 136135        if (completedActivity is ITerminalNode)
 136        {
 1137            await flowchartContext.CompleteActivityAsync();
 1138            return;
 139        }
 140
 141        // Schedule the outbound activities
 135142        var flowGraph = GetFlowGraph(flowchartContext);
 135143        var flowScope = GetFlowScope(flowchartContext);
 135144        var completedActivityExcecutedByBackwardConnection = completedActivityContext.ActivityInput.GetValueOrDefault<bo
 135145        bool hasScheduledActivity = await MaybeScheduleOutboundActivitiesAsync(flowGraph, flowScope, flowchartContext, c
 146
 147        // If no outbound activity was scheduled, complete the flowchart activity if there is no other pending work.
 135148        if (!hasScheduledActivity)
 149        {
 46150            await CompleteIfNoPendingWorkAsync(flowchartContext);
 151        }
 136152    }
 153
 154    /// <summary>
 155    /// Schedules outbound activities based on the flowchart's structure and execution state.
 156    /// This method determines whether an activity should be scheduled based on visited connections,
 157    /// forward traversal rules, and backward connections. If outcomes is Outcomes.Empty, it indicates
 158    /// that the activity should be skipped - all outbound connections will be visited and treated as
 159    /// not followed.
 160    /// </summary>
 161    /// <param name="flowGraph">The graph representation of the flowchart.</param>
 162    /// <param name="flowScope">Tracks activity and connection visits.</param>
 163    /// <param name="flowchartContext">The execution context of the flowchart.</param>
 164    /// <param name="activity">The current activity being processed.</param>
 165    /// <param name="completedActivityContext">The execution context of the completed activity (for call stack tracking)
 166    /// <param name="outcomes">The outcomes that determine which connections were followed.</param>
 167    /// <param name="completionCallback">The callback to invoke upon activity completion.</param>
 168    /// <param name="completedActivityExecutedByBackwardConnection">Indicates if the completed activity
 169    /// was executed due to a backward connection.</param>
 170    /// <returns>True if at least one activity was scheduled; otherwise, false.</returns>
 171    private static async ValueTask<bool> MaybeScheduleOutboundActivitiesAsync(FlowGraph flowGraph, FlowScope flowScope, 
 172    {
 152173        bool hasScheduledActivity = false;
 174
 175        // Check if the activity is dangling (i.e., it is not reachable from the flowchart graph)
 152176        if (flowGraph.IsDanglingActivity(activity))
 177        {
 0178            throw new($"Activity {activity.Id} is not reachable from the flowchart graph. Unable to schedule it's outbou
 179        }
 180
 181        // Register the activity as visited unless it was executed due to a backward connection
 152182        if (!completedActivityExecutedByBackwardConnection)
 183        {
 152184            flowScope.RegisterActivityVisit(activity);
 185        }
 186
 187        // Process each outbound connection from the current activity
 568188        foreach (var outboundConnection in flowGraph.GetOutboundConnections(activity))
 189        {
 132190            var connectionFollowed = outcomes.Names.Contains(outboundConnection.Source.Port);
 132191            flowScope.RegisterConnectionVisit(outboundConnection, connectionFollowed);
 132192            var outboundActivity = outboundConnection.Target.Activity;
 193
 194            // Determine the scheduling strategy based on connection-type.
 132195            if (flowGraph.IsBackwardConnection(outboundConnection, out var backwardConnectionIsValid))
 196                // Backward connections are scheduled differently
 0197                hasScheduledActivity |= await MaybeScheduleBackwardConnectionActivityAsync(flowGraph, flowchartContext, 
 198            else
 132199                hasScheduledActivity |= await MaybeScheduleOutboundActivityAsync(flowGraph, flowScope, flowchartContext,
 200        }
 201
 152202        return hasScheduledActivity;
 152203    }
 204
 205    /// <summary>
 206    /// Schedules an outbound activity that originates from a backward connection.
 207    /// </summary>
 208    private static async ValueTask<bool> MaybeScheduleBackwardConnectionActivityAsync(FlowGraph flowGraph, ActivityExecu
 209    {
 0210        if (!connectionFollowed)
 211        {
 0212            return false;
 213        }
 214
 0215        if (!backwardConnectionIsValid)
 216        {
 0217            throw new($"Invalid backward connection: Every path from the source ('{outboundConnection.Source.Activity.Id
 218        }
 219
 0220        var scheduleWorkOptions = new ScheduleWorkOptions
 0221        {
 0222            CompletionCallback = completionCallback,
 0223            Input = new Dictionary<string, object>() { { BackwardConnectionActivityInput, true } },
 0224            SchedulingActivityExecutionId = completedActivityContext?.Id
 0225        };
 226
 0227        await flowchartContext.ScheduleActivityAsync(outboundActivity, scheduleWorkOptions);
 0228        return true;
 0229    }
 230
 231    /// <summary>
 232    /// Determines the merge mode for a given outbound activity. If the outbound activity is a FlowJoin, it retrieves it
 233    /// mode. Otherwise, it defaults to FlowJoinMode.WaitAllActive for implicit joins.
 234    /// </summary>
 235    private static async ValueTask<FlowJoinMode> GetMergeModeAsync(ActivityExecutionContext flowchartContext, IActivity 
 236    {
 132237        if (outboundActivity is FlowJoin)
 238        {
 26239            var outboundActivityExecutionContext = await flowchartContext.WorkflowExecutionContext.CreateActivityExecuti
 26240            return await outboundActivityExecutionContext.EvaluateInputPropertyAsync<FlowJoin, FlowJoinMode>(x => x.Mode
 241        }
 242        else
 243        {
 244            // Implicit join case - treat as WaitAllActive
 106245            return FlowJoinMode.WaitAllActive;
 246        }
 132247    }
 248
 249    /// <summary>
 250    /// Schedules a join activity based on inbound connection statuses.
 251    /// </summary>
 252    private static async ValueTask<bool> MaybeScheduleOutboundActivityAsync(FlowGraph flowGraph, FlowScope flowScope, Ac
 253    {
 132254        FlowJoinMode mode = await GetMergeModeAsync(flowchartContext, outboundActivity);
 255
 132256        return mode switch
 132257        {
 15258            FlowJoinMode.WaitAll => await MaybeScheduleWaitAllActivityAsync(flowGraph, flowScope, flowchartContext, comp
 106259            FlowJoinMode.WaitAllActive => await MaybeScheduleWaitAllActiveActivityAsync(flowGraph, flowScope, flowchartC
 11260            FlowJoinMode.WaitAny => await MaybeScheduleWaitAnyActivityAsync(flowGraph, flowScope, flowchartContext, comp
 0261            _ => throw new($"Unsupported FlowJoinMode: {mode}"),
 132262        };
 132263    }
 264
 265    /// <summary>
 266    /// Determines whether to schedule an activity based on the FlowJoinMode.WaitAll behavior.
 267    /// If all inbound connections were visited, it checks if they were all followed to decide whether to schedule or sk
 268    /// </summary>
 269    private static async ValueTask<bool> MaybeScheduleWaitAllActivityAsync(FlowGraph flowGraph, FlowScope flowScope, Act
 270    {
 15271        if (!flowScope.AllInboundConnectionsVisited(flowGraph, outboundActivity))
 272            // Not all inbound connections have been visited yet; do not schedule anything yet.
 7273            return false;
 274
 8275        if (flowScope.AllInboundConnectionsFollowed(flowGraph, outboundActivity))
 276            // All inbound connections were followed; schedule the outbound activity.
 8277            return await ScheduleOutboundActivityAsync(flowchartContext, completedActivityContext, outboundActivity, com
 278        else
 279            // Not all inbound connections were followed; skip the outbound activity.
 0280            return await SkipOutboundActivityAsync(flowGraph, flowScope, flowchartContext, completedActivityContext, out
 15281    }
 282
 283    /// <summary>
 284    /// Determines whether to schedule an activity based on the FlowJoinMode.WaitAllActive behavior.
 285    /// If all inbound connections have been visited, it checks if any were followed to decide whether to schedule or sk
 286    /// </summary>
 287    private static async ValueTask<bool> MaybeScheduleWaitAllActiveActivityAsync(FlowGraph flowGraph, FlowScope flowScop
 288    {
 106289        if (!flowScope.AllInboundConnectionsVisited(flowGraph, outboundActivity))
 290            // Not all inbound connections have been visited yet; do not schedule anything yet.
 2291            return false;
 292
 104293        if (flowScope.AnyInboundConnectionsFollowed(flowGraph, outboundActivity))
 294            // At least one inbound connection was followed; schedule the outbound activity.
 88295            return await ScheduleOutboundActivityAsync(flowchartContext, completedActivityContext, outboundActivity, com
 296        else
 297            // No inbound connections were followed; skip the outbound activity.
 16298            return await SkipOutboundActivityAsync(flowGraph, flowScope, flowchartContext, completedActivityContext, out
 106299    }
 300
 301    /// <summary>
 302    /// Determines whether to schedule an activity based on the FlowJoinMode.WaitAny behavior.
 303    /// If any inbound connection has been followed, it schedules the activity and cancels remaining inbound activities.
 304    /// If a subsequent inbound connection is followed after the activity has been scheduled, it ignores it.
 305    /// </summary>
 306    private static async ValueTask<bool> MaybeScheduleWaitAnyActivityAsync(FlowGraph flowGraph, FlowScope flowScope, Act
 307    {
 11308        if (flowScope.ShouldIgnoreConnection(outboundConnection, outboundActivity))
 309            // Ignore the connection if the outbound activity has already completed (JoinAny scenario)
 0310            return false;
 311
 20312        if (flowchartContext.WorkflowExecutionContext.Scheduler.List().Any(workItem => workItem.Owner == flowchartContex
 313            // Ignore the connection if the outbound activity is already scheduled
 4314            return false;
 315
 7316        if (flowScope.AnyInboundConnectionsFollowed(flowGraph, outboundActivity))
 317        {
 318            // This is the first inbound connection followed; schedule the outbound activity
 7319            var scheduleResponse = await ScheduleOutboundActivityAsync(flowchartContext, completedActivityContext, outbo
 320            // An inbound connection has been followed; cancel remaining inbound activities
 7321            await CancelRemainingInboundActivitiesAsync(flowchartContext, outboundActivity);
 322
 7323            return scheduleResponse;
 324        }
 325
 0326        if (flowScope.AllInboundConnectionsVisited(flowGraph, outboundActivity))
 327            // All inbound connections have been visited without any being followed; skip the outbound activity
 0328            return await SkipOutboundActivityAsync(flowGraph, flowScope, flowchartContext, completedActivityContext, out
 329
 330        // No inbound connections have been followed yet; do not schedule anything yet.
 0331        return false;
 11332    }
 333
 334    /// <summary>
 335    /// Schedules the outbound activity.
 336    /// </summary>
 337    private static async ValueTask<bool> ScheduleOutboundActivityAsync(ActivityExecutionContext flowchartContext, Activi
 338    {
 103339        var options = new ScheduleWorkOptions
 103340        {
 103341            CompletionCallback = completionCallback,
 103342            SchedulingActivityExecutionId = completedActivityContext?.Id
 103343        };
 103344        await flowchartContext.ScheduleActivityAsync(outboundActivity, options);
 103345        return true;
 103346    }
 347
 348    /// <summary>
 349    /// Skips the outbound activity by propagating skipped connections.
 350    /// </summary>
 351    private static async ValueTask<bool> SkipOutboundActivityAsync(FlowGraph flowGraph, FlowScope flowScope, ActivityExe
 352    {
 16353        return await MaybeScheduleOutboundActivitiesAsync(flowGraph, flowScope, flowchartContext, outboundActivity, comp
 16354    }
 355
 356    private static async ValueTask CancelRemainingInboundActivitiesAsync(ActivityExecutionContext flowchartContext, IAct
 357    {
 7358        var flowchart = (Flowchart)flowchartContext.Activity;
 7359        var flowGraph = flowchart.GetFlowGraph(flowchartContext);
 7360        var ancestorActivities = flowGraph.GetAncestorActivities(outboundActivity);
 44361        var inboundActivityExecutionContexts = flowchartContext.WorkflowExecutionContext.ActivityExecutionContexts.Where
 362
 363        // Cancel each ancestor activity.
 58364        foreach (var activityExecutionContext in inboundActivityExecutionContexts)
 365        {
 22366            await activityExecutionContext.CancelActivityAsync();
 367        }
 7368    }
 369
 370    private async ValueTask OnScheduleOutcomesAsync(ScheduleActivityOutcomes signal, SignalContext context)
 371    {
 0372        var flowchartContext = context.ReceiverActivityExecutionContext;
 0373        var schedulingActivityContext = context.SenderActivityExecutionContext;
 0374        var schedulingActivity = schedulingActivityContext.Activity;
 0375        var outcomes = new Outcomes(signal.Outcomes);
 0376    }
 377
 378    private async ValueTask OnCounterFlowActivityCanceledAsync(CancelSignal signal, SignalContext context)
 379    {
 1380        var flowchartContext = context.ReceiverActivityExecutionContext;
 1381        await CompleteIfNoPendingWorkAsync(flowchartContext);
 1382        var flowchart = (Flowchart)flowchartContext.Activity;
 1383        var flowGraph = flowchartContext.GetFlowGraph();
 1384        var flowScope = flowchart.GetFlowScope(flowchartContext);
 385
 386        // Propagate canceled connections visited count by scheduling with Outcomes.Empty
 1387        await MaybeScheduleOutboundActivitiesAsync(flowGraph, flowScope, flowchartContext, context.SenderActivityExecuti
 1388    }
 389}

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Core/Activities/Flowchart/Activities/Flowchart.cs

#LineLine coverage
 1using System.ComponentModel;
 2using System.Runtime.CompilerServices;
 3using Elsa.Workflows.Activities.Flowchart.Models;
 4using Elsa.Workflows.Activities.Flowchart.Options;
 5using Elsa.Workflows.Attributes;
 6using Elsa.Workflows.Signals;
 7using Microsoft.Extensions.DependencyInjection;
 8using Microsoft.Extensions.Options;
 9
 10namespace Elsa.Workflows.Activities.Flowchart.Activities;
 11
 12/// <summary>
 13/// A flowchart consists of a collection of activities and connections between them.
 14/// </summary>
 15[Activity("Elsa", "Flow", "A flowchart is a collection of activities and connections between them.")]
 16[Browsable(false)]
 17public partial class Flowchart : Container
 18{
 19    /// <summary>
 20    /// The property key used to store the flowchart execution mode in <see cref="WorkflowExecutionContext.Properties"/>
 21    /// </summary>
 22    public const string ExecutionModePropertyKey = "Flowchart:ExecutionMode";
 23
 24    /// <summary>
 25    /// Set this to <c>false</c> from your program file in case you wish to use the old counter based model.
 26    /// This static field is used as a final fallback when no execution mode is specified via options or workflow execut
 27    /// Note: Prefer using <see cref="FlowchartOptions"/> configured via DI for application-wide settings.
 28    /// </summary>
 29    // ReSharper disable once GrammarMistakeInComment
 30    public static bool UseTokenFlow = false; // Default to false to maintain the same behavior with 3.5.2 out of the box
 31
 32    /// <inheritdoc />
 220033    public Flowchart([CallerFilePath] string? source = null, [CallerLineNumber] int? line = null) : base(source, line)
 34    {
 220035        OnSignalReceived<ScheduleActivityOutcomes>(OnScheduleOutcomesAsync);
 220036        OnSignalReceived<ScheduleChildActivity>(OnScheduleChildActivityAsync);
 220037        OnSignalReceived<CancelSignal>(OnActivityCanceledAsync);
 220038    }
 39
 40    /// <summary>
 41    /// The activity to execute when the flowchart starts.
 42    /// </summary>
 454143    [Port] [Browsable(false)] public IActivity? Start { get; set; }
 44
 45    /// <summary>
 46    /// A list of connections between activities.
 47    /// </summary>
 625348    public ICollection<Connection> Connections { get; set; } = new List<Connection>();
 49
 50    /// <inheritdoc />
 51    protected override async ValueTask ScheduleChildrenAsync(ActivityExecutionContext context)
 52    {
 70053        var startActivity = GetStartActivity(context);
 54
 70055        if (startActivity == null)
 56        {
 57            // Nothing else to execute.
 358            await context.CompleteActivityAsync();
 359            return;
 60        }
 61
 69762        await context.ScheduleActivityAsync(startActivity, OnChildCompletedAsync);
 70063    }
 64
 65    private async ValueTask OnScheduleChildActivityAsync(ScheduleChildActivity signal, SignalContext context)
 66    {
 067        var flowchartContext = context.ReceiverActivityExecutionContext;
 068        var activity = signal.Activity;
 069        var activityExecutionContext = signal.ActivityExecutionContext;
 70
 071        if (activityExecutionContext != null)
 72        {
 073            await flowchartContext.ScheduleActivityAsync(activityExecutionContext.Activity, new()
 074            {
 075                ExistingActivityExecutionContext = activityExecutionContext,
 076                CompletionCallback = OnChildCompletedAsync,
 077                Input = signal.Input
 078            });
 79        }
 80        else
 81        {
 082            await flowchartContext.ScheduleActivityAsync(activity, new()
 083            {
 084                CompletionCallback = OnChildCompletedAsync,
 085                Input = signal.Input
 086            });
 87        }
 088    }
 89
 90    private ValueTask OnChildCompletedAsync(ActivityCompletedContext context)
 91    {
 79192        return ExecuteBasedOnMode(
 79193            context.TargetContext,
 65594            () => OnChildCompletedTokenBasedLogicAsync(context),
 92795            () => OnChildCompletedCounterBasedLogicAsync(context));
 96    }
 97
 98    private ValueTask OnActivityCanceledAsync(CancelSignal signal, SignalContext context)
 99    {
 1100        return ExecuteBasedOnMode(
 1101            context.ReceiverActivityExecutionContext,
 0102            () => OnTokenFlowActivityCanceledAsync(signal, context),
 2103            () => OnCounterFlowActivityCanceledAsync(signal, context));
 104    }
 105
 106    private async Task CompleteIfNoPendingWorkAsync(ActivityExecutionContext context)
 107    {
 47108        var hasPendingWork = HasPendingWork(context);
 109
 47110        if (!hasPendingWork)
 111        {
 167112            var hasFaultedActivities = context.Children.Any(x => x.Status == ActivityStatus.Faulted);
 113
 33114            if (!hasFaultedActivities)
 115            {
 33116                await context.CompleteActivityAsync();
 117            }
 118        }
 47119    }
 120
 121    private static ValueTask ExecuteBasedOnMode(ActivityExecutionContext context, Func<ValueTask> tokenBasedAction, Func
 122    {
 792123        var mode = GetEffectiveExecutionMode(context);
 124
 792125        return mode switch
 792126        {
 655127            FlowchartExecutionMode.TokenBased => tokenBasedAction(),
 137128            FlowchartExecutionMode.CounterBased or FlowchartExecutionMode.Default or _ => counterBasedAction()
 792129        };
 130    }
 131
 132    /// <summary>
 133    /// Gets the effective execution mode for this flowchart execution.
 134    /// Priority: WorkflowExecutionContext.Properties > FlowchartOptions (DI) > Static UseTokenFlow flag
 135    /// </summary>
 136    private static FlowchartExecutionMode GetEffectiveExecutionMode(ActivityExecutionContext context)
 137    {
 792138        var workflowExecutionContext = context.WorkflowExecutionContext;
 139
 792140        if (!workflowExecutionContext.Properties.TryGetValue(ExecutionModePropertyKey, out var modeValue))
 641141            return GetDefaultModeFromOptions(context);
 142
 151143        var mode = ParseExecutionMode(modeValue);
 151144        return mode != FlowchartExecutionMode.Default ? mode : GetDefaultModeFromOptions(context);
 145    }
 146
 147    private static FlowchartExecutionMode ParseExecutionMode(object modeValue)
 148    {
 151149        return modeValue switch
 151150        {
 151151            FlowchartExecutionMode executionMode => executionMode,
 0152            string str when Enum.TryParse<FlowchartExecutionMode>(str, true, out var parsed) => parsed,
 0153            int intValue when Enum.IsDefined(typeof(FlowchartExecutionMode), intValue) => (FlowchartExecutionMode)intVal
 0154            _ => FlowchartExecutionMode.Default
 151155        };
 156    }
 157
 158    private static FlowchartExecutionMode GetDefaultModeFromOptions(ActivityExecutionContext context)
 159    {
 641160        var options = context.WorkflowExecutionContext.ServiceProvider.GetService<IOptions<FlowchartOptions>>();
 641161        var mode = options?.Value.DefaultExecutionMode ?? FlowchartExecutionMode.Default;
 641162        if (mode == FlowchartExecutionMode.Default)
 0163            return UseTokenFlow ? FlowchartExecutionMode.TokenBased : FlowchartExecutionMode.CounterBased;
 641164        return mode;
 165    }
 166}

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Core/Activities/Flowchart/Activities/Flowchart.Tokens.cs

#LineLine coverage
 1using Elsa.Extensions;
 2using Elsa.Workflows.Activities.Flowchart.Extensions;
 3using Elsa.Workflows.Activities.Flowchart.Models;
 4using Elsa.Workflows.Options;
 5using Elsa.Workflows.Signals;
 6
 7namespace Elsa.Workflows.Activities.Flowchart.Activities;
 8
 9public partial class Flowchart
 10{
 11    private const string TokenStoreKey = "Flowchart.Tokens";
 12
 13    private async ValueTask OnChildCompletedTokenBasedLogicAsync(ActivityCompletedContext ctx)
 14    {
 97615        var flowContext = ctx.TargetContext;
 97616        var completedActivity = ctx.ChildContext.Activity;
 97617        var flowGraph = flowContext.GetFlowGraph();
 97618        var tokens = GetTokenList(flowContext);
 19
 20        // If the completed activity is a terminal node, complete the flowchart immediately.
 97621        if (completedActivity is ITerminalNode)
 22        {
 023            tokens.Clear();
 024            await flowContext.CompleteActivityAsync();
 025            return;
 26        }
 27
 28        // Emit tokens for active outcomes.
 97629        var outcomes = (ctx.Result as Outcomes ?? Outcomes.Default).Names;
 97630        var outboundConnections = flowGraph.GetOutboundConnections(completedActivity);
 132631        var activeOutboundConnections = outboundConnections.Where(x => outcomes.Contains(x.Source.Port)).Distinct().ToLi
 32
 262633        foreach (var connection in activeOutboundConnections)
 33734            tokens.Add(Token.Create(connection.Source.Activity, connection.Target.Activity, connection.Source.Port));
 35
 36        // Consume inbound tokens to the completed activity.
 169637        var inboundTokens = tokens.Where(t => t.ToActivityId == completedActivity.Id && t is { Consumed: false, Blocked:
 262638        foreach (var t in inboundTokens)
 33739            t.Consume();
 40
 41        // Schedule next activities based on merge modes.
 262642        foreach (var connection in activeOutboundConnections)
 43        {
 33744            var targetActivity = connection.Target.Activity;
 33745            var mergeMode = await targetActivity.GetMergeModeAsync(ctx.ChildContext);
 46
 47            switch (mergeMode)
 48            {
 49                case MergeMode.Cascade:
 50                case MergeMode.Race:
 951                    if (mergeMode == MergeMode.Race)
 952                        await flowContext.CancelInboundAncestorsAsync(targetActivity);
 53
 54                    // Check for existing blocked token on this specific connection.
 955                    var existingBlockedToken = tokens.FirstOrDefault(t =>
 3456                        t.ToActivityId == targetActivity.Id &&
 3457                        t.FromActivityId == connection.Source.Activity.Id &&
 3458                        t.Outcome == connection.Source.Port &&
 3459                        t.Blocked);
 60
 961                    if (existingBlockedToken == null)
 62                    {
 63                        // Schedule the target.
 564                        var options = new ScheduleWorkOptions
 565                        {
 566                            CompletionCallback = OnChildCompletedTokenBasedLogicAsync,
 567                            SchedulingActivityExecutionId = ctx.ChildContext.Id
 568                        };
 569                        await flowContext.ScheduleActivityAsync(targetActivity, options);
 70
 71                        // Block other inbound connections (adjust per mode if needed).
 572                        var otherInboundConnections = flowGraph.GetForwardInboundConnections(targetActivity)
 973                            .Where(x => x.Source.Activity != completedActivity)
 574                            .ToList();
 75
 1876                        foreach (var inboundConnection in otherInboundConnections)
 77                        {
 478                            var blockedToken = Token.Create(inboundConnection.Source.Activity, inboundConnection.Target.
 479                            tokens.Add(blockedToken);
 80                        }
 81                    }
 82                    else
 83                    {
 84                        // Consume the block without scheduling.
 485                        existingBlockedToken.Consume();
 86                    }
 87
 488                    break;
 89
 90                case MergeMode.Merge:
 91                    // Wait for tokens from all forward inbound connections.
 92                    // Unlike Converge, this ignores backward connections (loops).
 93                    // Schedule on arrival for <=1 forward inbound (e.g., loops, sequential).
 794                    var inboundConnectionsMerge = flowGraph.GetForwardInboundConnections(targetActivity);
 95
 796                    if (inboundConnectionsMerge.Count > 1)
 97                    {
 698                        var hasAllTokens = inboundConnectionsMerge.All(inbound =>
 1299                            tokens.Any(t =>
 33100                                t is { Consumed: false, Blocked: false } &&
 33101                                t.FromActivityId == inbound.Source.Activity.Id &&
 33102                                t.ToActivityId == targetActivity.Id &&
 33103                                t.Outcome == inbound.Source.Port
 12104                            )
 6105                        );
 106
 6107                        if (hasAllTokens)
 108                        {
 3109                            var options = new ScheduleWorkOptions
 3110                            {
 3111                                CompletionCallback = OnChildCompletedTokenBasedLogicAsync,
 3112                                SchedulingActivityExecutionId = ctx.ChildContext.Id
 3113                            };
 3114                            await flowContext.ScheduleActivityAsync(targetActivity, options);
 115                        }
 116                    }
 117                    else
 118                    {
 1119                        var options = new ScheduleWorkOptions
 1120                        {
 1121                            CompletionCallback = OnChildCompletedTokenBasedLogicAsync,
 1122                            SchedulingActivityExecutionId = ctx.ChildContext.Id
 1123                        };
 1124                        await flowContext.ScheduleActivityAsync(targetActivity, options);
 125                    }
 126
 1127                    break;
 128
 129                case MergeMode.Converge:
 130                    // Strictest mode: Wait for tokens from ALL inbound connections (forward + backward).
 131                    // Requires every possible inbound path to execute before proceeding.
 13132                    var allInboundConnectionsConverge = flowGraph.GetInboundConnections(targetActivity);
 133
 13134                    if (allInboundConnectionsConverge.Count > 1)
 135                    {
 12136                        var hasAllTokens = allInboundConnectionsConverge.All(inbound =>
 24137                            tokens.Any(t =>
 66138                                t is { Consumed: false, Blocked: false } &&
 66139                                t.FromActivityId == inbound.Source.Activity.Id &&
 66140                                t.ToActivityId == targetActivity.Id &&
 66141                                t.Outcome == inbound.Source.Port
 24142                            )
 12143                        );
 144
 12145                        if (hasAllTokens)
 146                        {
 6147                            var options = new ScheduleWorkOptions
 6148                            {
 6149                                CompletionCallback = OnChildCompletedTokenBasedLogicAsync,
 6150                                SchedulingActivityExecutionId = ctx.ChildContext.Id
 6151                            };
 6152                            await flowContext.ScheduleActivityAsync(targetActivity, options);
 153                        }
 154                    }
 155                    else
 156                    {
 1157                        var options = new ScheduleWorkOptions
 1158                        {
 1159                            CompletionCallback = OnChildCompletedTokenBasedLogicAsync,
 1160                            SchedulingActivityExecutionId = ctx.ChildContext.Id
 1161                        };
 1162                        await flowContext.ScheduleActivityAsync(targetActivity, options);
 163                    }
 164
 1165                    break;
 166
 167                case MergeMode.Stream:
 168                default:
 169                    // Flows freely - approximation that proceeds when upstream completes, ignoring dead paths.
 308170                    var inboundConnectionsStream = flowGraph.GetForwardInboundConnections(targetActivity);
 308171                    var hasUnconsumed = inboundConnectionsStream.Any(inbound =>
 772172                        tokens.Any(t => !t.Consumed && !t.Blocked && t.ToActivityId == inbound.Source.Activity.Id)
 308173                    );
 174
 308175                    if (!hasUnconsumed)
 176                    {
 305177                        var options = new ScheduleWorkOptions
 305178                        {
 305179                            CompletionCallback = OnChildCompletedTokenBasedLogicAsync,
 305180                            SchedulingActivityExecutionId = ctx.ChildContext.Id
 305181                        };
 305182                        await flowContext.ScheduleActivityAsync(targetActivity, options);
 183                    }
 184                    break;
 185            }
 337186        }
 187
 188        // Complete flowchart if no pending work.
 976189        if (!flowContext.HasPendingWork())
 190        {
 655191            tokens.Clear();
 655192            await flowContext.CompleteActivityAsync();
 193        }
 194
 195        // Purge consumed tokens for the completed activity.
 1459196        tokens.RemoveWhere(t => t.ToActivityId == completedActivity.Id && t.Consumed);
 976197    }
 198
 199    private async ValueTask OnTokenFlowActivityCanceledAsync(CancelSignal signal, SignalContext context)
 200    {
 0201        var flowchartContext = context.ReceiverActivityExecutionContext;
 0202        var cancelledActivityContext = context.SenderActivityExecutionContext;
 203
 204        // Remove all tokens from and to this activity.
 0205        var tokenList = GetTokenList(flowchartContext);
 0206        tokenList.RemoveWhere(x => x.FromActivityId == cancelledActivityContext.Activity.Id || x.ToActivityId == cancell
 0207        await CompleteIfNoPendingWorkAsync(flowchartContext);
 0208    }
 209
 210    internal List<Token> GetTokenList(ActivityExecutionContext context)
 211    {
 1631212        if (context.Properties.TryGetValue(TokenStoreKey, out var obj) && obj is List<Token> list)
 976213            return list;
 214
 655215        var newList = new List<Token>();
 655216        context.Properties[TokenStoreKey] = newList;
 655217        return newList;
 218    }
 219}

Methods/Properties

OnChildCompletedCounterBasedLogicAsync()
GetStartActivity(Elsa.Workflows.ActivityExecutionContext)
HasPendingWork(Elsa.Workflows.ActivityExecutionContext)
GetRootActivity()
GetFlowGraph(Elsa.Workflows.ActivityExecutionContext)
GetFlowScope(Elsa.Workflows.ActivityExecutionContext)
ProcessChildCompletedAsync()
MaybeScheduleOutboundActivitiesAsync()
MaybeScheduleBackwardConnectionActivityAsync()
GetMergeModeAsync()
MaybeScheduleOutboundActivityAsync()
MaybeScheduleWaitAllActivityAsync()
MaybeScheduleWaitAllActiveActivityAsync()
MaybeScheduleWaitAnyActivityAsync()
ScheduleOutboundActivityAsync()
SkipOutboundActivityAsync()
CancelRemainingInboundActivitiesAsync()
OnScheduleOutcomesAsync()
OnCounterFlowActivityCanceledAsync()
.ctor(System.String,System.Nullable`1<System.Int32>)
get_Start()
get_Connections()
ScheduleChildrenAsync()
OnScheduleChildActivityAsync()
OnChildCompletedAsync(Elsa.Workflows.ActivityCompletedContext)
OnActivityCanceledAsync(Elsa.Workflows.Signals.CancelSignal,Elsa.Workflows.SignalContext)
CompleteIfNoPendingWorkAsync()
ExecuteBasedOnMode(Elsa.Workflows.ActivityExecutionContext,System.Func`1<System.Threading.Tasks.ValueTask>,System.Func`1<System.Threading.Tasks.ValueTask>)
GetEffectiveExecutionMode(Elsa.Workflows.ActivityExecutionContext)
ParseExecutionMode(System.Object)
GetDefaultModeFromOptions(Elsa.Workflows.ActivityExecutionContext)
OnChildCompletedTokenBasedLogicAsync()
OnTokenFlowActivityCanceledAsync()
GetTokenList(Elsa.Workflows.ActivityExecutionContext)