| | | 1 | | using Elsa.Extensions; |
| | | 2 | | using Elsa.Workflows.Activities.Flowchart.Contracts; |
| | | 3 | | using Elsa.Workflows.Activities.Flowchart.Extensions; |
| | | 4 | | using Elsa.Workflows.Activities.Flowchart.Models; |
| | | 5 | | using Elsa.Workflows.Options; |
| | | 6 | | using Elsa.Workflows.Signals; |
| | | 7 | | |
| | | 8 | | namespace Elsa.Workflows.Activities.Flowchart.Activities; |
| | | 9 | | |
| | | 10 | | public partial class Flowchart |
| | | 11 | | { |
| | | 12 | | private const string ScopeProperty = "FlowScope"; |
| | | 13 | | private const string BackwardConnectionActivityInput = "BackwardConnection"; |
| | | 14 | | |
| | | 15 | | private async ValueTask OnChildCompletedCounterBasedLogicAsync(ActivityCompletedContext context) |
| | | 16 | | { |
| | 117 | 17 | | var flowchartContext = context.TargetContext; |
| | 117 | 18 | | var completedActivityContext = context.ChildContext; |
| | 117 | 19 | | var completedActivity = completedActivityContext.Activity; |
| | 117 | 20 | | var result = context.Result; |
| | | 21 | | |
| | 117 | 22 | | if (flowchartContext.Activity != this) |
| | | 23 | | { |
| | 0 | 24 | | throw new Exception("Target context activity must be this flowchart"); |
| | | 25 | | } |
| | | 26 | | |
| | | 27 | | // If the completed activity's status is anything but "Completed", do not schedule its outbound activities. |
| | 117 | 28 | | if (completedActivityContext.Status != ActivityStatus.Completed) |
| | | 29 | | { |
| | 0 | 30 | | return; |
| | | 31 | | } |
| | | 32 | | |
| | | 33 | | // If the completed activity is a terminal node, complete the flowchart immediately. |
| | 117 | 34 | | if (completedActivity is ITerminalNode) |
| | | 35 | | { |
| | 0 | 36 | | await flowchartContext.CompleteActivityAsync(); |
| | 0 | 37 | | return; |
| | | 38 | | } |
| | | 39 | | |
| | | 40 | | // Determine the outcomes from the completed activity |
| | 117 | 41 | | var outcomes = result is Outcomes o ? o : Outcomes.Default; |
| | | 42 | | |
| | | 43 | | // Schedule the outbound activities |
| | 117 | 44 | | var flowGraph = flowchartContext.GetFlowGraph(); |
| | 117 | 45 | | var flowScope = GetFlowScope(flowchartContext); |
| | 117 | 46 | | var completedActivityExecutedByBackwardConnection = completedActivityContext.ActivityInput.GetValueOrDefault<boo |
| | 117 | 47 | | bool hasScheduledActivity = await ScheduleOutboundActivitiesAsync(flowGraph, flowScope, flowchartContext, comple |
| | | 48 | | |
| | | 49 | | // If there are not any outbound connections, complete the flowchart activity if there is no other pending work |
| | 117 | 50 | | if (!hasScheduledActivity) |
| | | 51 | | { |
| | 29 | 52 | | await CompleteIfNoPendingWorkAsync(flowchartContext); |
| | | 53 | | } |
| | 117 | 54 | | } |
| | | 55 | | |
| | | 56 | | private FlowScope GetFlowScope(ActivityExecutionContext context) |
| | | 57 | | { |
| | 153 | 58 | | return context.GetProperty(ScopeProperty, () => new FlowScope()); |
| | | 59 | | } |
| | | 60 | | |
| | | 61 | | /// <summary> |
| | | 62 | | /// Schedules outbound activities based on the flowchart's structure and execution state. |
| | | 63 | | /// This method determines whether an activity should be scheduled based on visited connections, |
| | | 64 | | /// forward traversal rules, and backward connections. |
| | | 65 | | /// </summary> |
| | | 66 | | /// <param name="flowGraph">The graph representation of the flowchart.</param> |
| | | 67 | | /// <param name="flowScope">Tracks activity and connection visits.</param> |
| | | 68 | | /// <param name="flowchartContext">The execution context of the flowchart.</param> |
| | | 69 | | /// <param name="activity">The current activity being processed.</param> |
| | | 70 | | /// <param name="outcomes">The outcomes that determine which connections were followed.</param> |
| | | 71 | | /// <param name="completedActivityExecutedByBackwardConnection">Indicates if the completed activity was executed due |
| | | 72 | | /// <returns>True if at least one activity was scheduled; otherwise, false.</returns> |
| | | 73 | | private async ValueTask<bool> ScheduleOutboundActivitiesAsync(FlowGraph flowGraph, FlowScope flowScope, ActivityExec |
| | | 74 | | { |
| | 133 | 75 | | var hasScheduledActivity = false; |
| | | 76 | | |
| | | 77 | | // Check if the activity is dangling (i.e., it is not reachable from the flowchart graph) |
| | 133 | 78 | | if (flowGraph.IsDanglingActivity(activity)) |
| | | 79 | | { |
| | 0 | 80 | | throw new Exception($"Activity {activity.Id} is not reachable from the flowchart graph. Unable to schedule i |
| | | 81 | | } |
| | | 82 | | |
| | | 83 | | // Register the activity as visited unless it was executed due to a backward connection |
| | 133 | 84 | | if (!completedActivityExecutedByBackwardConnection) |
| | | 85 | | { |
| | 133 | 86 | | flowScope.RegisterActivityVisit(activity); |
| | | 87 | | } |
| | | 88 | | |
| | | 89 | | // Process each outbound connection from the current activity |
| | 502 | 90 | | foreach (var outboundConnection in flowGraph.GetOutboundConnections(activity)) |
| | | 91 | | { |
| | 118 | 92 | | var connectionFollowed = outcomes.Names.Contains(outboundConnection.Source.Port); |
| | 118 | 93 | | flowScope.RegisterConnectionVisit(outboundConnection, connectionFollowed); |
| | 118 | 94 | | var outboundActivity = outboundConnection.Target.Activity; |
| | | 95 | | |
| | | 96 | | // Determine scheduling strategy based on connection type |
| | 118 | 97 | | if (flowGraph.IsBackwardConnection(outboundConnection, out var backwardConnectionIsValid)) |
| | | 98 | | { |
| | 0 | 99 | | hasScheduledActivity |= await ScheduleBackwardConnectionActivityAsync(flowGraph, flowchartContext, outbo |
| | | 100 | | } |
| | 118 | 101 | | else if (outboundActivity is not IJoinNode) |
| | | 102 | | { |
| | 94 | 103 | | hasScheduledActivity |= await ScheduleNonJoinActivityAsync(flowGraph, flowScope, flowchartContext, outbo |
| | | 104 | | } |
| | | 105 | | else |
| | | 106 | | { |
| | 24 | 107 | | hasScheduledActivity |= await ScheduleJoinActivityAsync(flowGraph, flowScope, flowchartContext, outbound |
| | | 108 | | } |
| | | 109 | | } |
| | | 110 | | |
| | 133 | 111 | | return hasScheduledActivity; |
| | 133 | 112 | | } |
| | | 113 | | |
| | | 114 | | /// <summary> |
| | | 115 | | /// Schedules an outbound activity that originates from a backward connection. |
| | | 116 | | /// </summary> |
| | | 117 | | private async ValueTask<bool> ScheduleBackwardConnectionActivityAsync(FlowGraph flowGraph, ActivityExecutionContext |
| | | 118 | | { |
| | 0 | 119 | | if (!connectionFollowed) |
| | | 120 | | { |
| | 0 | 121 | | return false; |
| | | 122 | | } |
| | | 123 | | |
| | 0 | 124 | | if (!backwardConnectionIsValid) |
| | | 125 | | { |
| | 0 | 126 | | throw new Exception($"Invalid backward connection: Every path from the source ('{outboundConnection.Source.A |
| | | 127 | | } |
| | | 128 | | |
| | 0 | 129 | | var scheduleWorkOptions = new ScheduleWorkOptions |
| | 0 | 130 | | { |
| | 0 | 131 | | CompletionCallback = OnChildCompletedCounterBasedLogicAsync, |
| | 0 | 132 | | Input = new Dictionary<string, object>() |
| | 0 | 133 | | { |
| | 0 | 134 | | { |
| | 0 | 135 | | BackwardConnectionActivityInput, true |
| | 0 | 136 | | } |
| | 0 | 137 | | } |
| | 0 | 138 | | }; |
| | | 139 | | |
| | 0 | 140 | | await flowchartContext.ScheduleActivityAsync(outboundActivity, scheduleWorkOptions); |
| | 0 | 141 | | return true; |
| | 0 | 142 | | } |
| | | 143 | | |
| | | 144 | | /// <summary> |
| | | 145 | | /// Schedules a non-join activity if all its forward inbound connections have been visited. |
| | | 146 | | /// </summary> |
| | | 147 | | private async ValueTask<bool> ScheduleNonJoinActivityAsync(FlowGraph flowGraph, FlowScope flowScope, ActivityExecuti |
| | | 148 | | { |
| | 94 | 149 | | if (!flowScope.AllInboundConnectionsVisited(flowGraph, outboundActivity)) |
| | | 150 | | { |
| | 2 | 151 | | return false; |
| | | 152 | | } |
| | | 153 | | |
| | 92 | 154 | | if (flowScope.HasFollowedInboundConnection(flowGraph, outboundActivity)) |
| | | 155 | | { |
| | 76 | 156 | | await flowchartContext.ScheduleActivityAsync(outboundActivity, OnChildCompletedCounterBasedLogicAsync); |
| | 76 | 157 | | return true; |
| | | 158 | | } |
| | | 159 | | else |
| | | 160 | | { |
| | | 161 | | // Propagate skipped connections by scheduling with Outcomes.Empty |
| | 16 | 162 | | return await ScheduleOutboundActivitiesAsync(flowGraph, flowScope, flowchartContext, outboundActivity, Outco |
| | | 163 | | } |
| | 94 | 164 | | } |
| | | 165 | | |
| | | 166 | | /// <summary> |
| | | 167 | | /// Schedules a join activity based on inbound connection statuses. |
| | | 168 | | /// </summary> |
| | | 169 | | private async ValueTask<bool> ScheduleJoinActivityAsync(FlowGraph flowGraph, FlowScope flowScope, ActivityExecutionC |
| | | 170 | | { |
| | | 171 | | // Ignore the connection if the join activity has already completed (JoinAny scenario) |
| | 24 | 172 | | if (flowScope.ShouldIgnoreConnection(outboundConnection, outboundActivity)) |
| | | 173 | | { |
| | 0 | 174 | | return false; |
| | | 175 | | } |
| | | 176 | | |
| | | 177 | | // Schedule the join activity only if at least one inbound connection was followed |
| | 24 | 178 | | if (!flowScope.HasFollowedInboundConnection(flowGraph, outboundActivity)) |
| | | 179 | | { |
| | 0 | 180 | | if (flowScope.AllInboundConnectionsVisited(flowGraph, outboundActivity)) |
| | | 181 | | { |
| | | 182 | | // Propagate skipped connections by scheduling with Outcomes.Empty |
| | 0 | 183 | | return await ScheduleOutboundActivitiesAsync(flowGraph, flowScope, flowchartContext, outboundActivity, O |
| | | 184 | | } |
| | | 185 | | |
| | 0 | 186 | | return false; |
| | | 187 | | } |
| | | 188 | | |
| | | 189 | | // Check for an existing execution context for the join activity |
| | 24 | 190 | | var joinContext = flowchartContext.WorkflowExecutionContext.ActivityExecutionContexts.LastOrDefault(x => |
| | 171 | 191 | | x.ParentActivityExecutionContext == flowchartContext && |
| | 171 | 192 | | x.Activity == outboundActivity && |
| | 171 | 193 | | x.Status is ActivityStatus.Pending or ActivityStatus.Running); |
| | | 194 | | |
| | | 195 | | // If the join activity was already scheduled, do not schedule it again |
| | 24 | 196 | | if (joinContext == null) |
| | | 197 | | { |
| | 46 | 198 | | var activityScheduled = flowchartContext.WorkflowExecutionContext.Scheduler.List().Any(workItem => workItem. |
| | 24 | 199 | | if (activityScheduled) |
| | | 200 | | { |
| | 11 | 201 | | return true; |
| | | 202 | | } |
| | | 203 | | } |
| | | 204 | | |
| | 13 | 205 | | if (joinContext is not { Status: ActivityStatus.Running }) |
| | | 206 | | { |
| | 13 | 207 | | var scheduleWorkOptions = new ScheduleWorkOptions |
| | 13 | 208 | | { |
| | 13 | 209 | | CompletionCallback = OnChildCompletedCounterBasedLogicAsync, |
| | 13 | 210 | | ExistingActivityExecutionContext = joinContext |
| | 13 | 211 | | }; |
| | 13 | 212 | | await flowchartContext.ScheduleActivityAsync(outboundActivity, scheduleWorkOptions); |
| | 13 | 213 | | return true; |
| | | 214 | | } |
| | | 215 | | else |
| | | 216 | | { |
| | 0 | 217 | | return false; |
| | | 218 | | } |
| | 24 | 219 | | } |
| | | 220 | | |
| | | 221 | | public static bool CanWaitAllProceed(ActivityExecutionContext context) |
| | | 222 | | { |
| | 8 | 223 | | var flowchartContext = context.ParentActivityExecutionContext!; |
| | 8 | 224 | | var flowchart = (Flowchart)flowchartContext.Activity; |
| | 8 | 225 | | var flowGraph = flowchartContext.GetFlowGraph(); |
| | 8 | 226 | | var flowScope = flowchart.GetFlowScope(flowchartContext); |
| | 8 | 227 | | var activity = context.Activity; |
| | | 228 | | |
| | 8 | 229 | | return flowScope.AllInboundConnectionsVisited(flowGraph, activity); |
| | | 230 | | } |
| | | 231 | | |
| | | 232 | | private async ValueTask OnScheduleOutcomesAsync(ScheduleActivityOutcomes signal, SignalContext context) |
| | | 233 | | { |
| | 0 | 234 | | var flowchartContext = context.ReceiverActivityExecutionContext; |
| | 0 | 235 | | var schedulingActivityContext = context.SenderActivityExecutionContext; |
| | 0 | 236 | | var schedulingActivity = schedulingActivityContext.Activity; |
| | 0 | 237 | | var outcomes = signal.Outcomes; |
| | 0 | 238 | | var outboundConnections = Connections.Where(connection => connection.Source.Activity == schedulingActivity && ou |
| | 0 | 239 | | var outboundActivities = outboundConnections.Select(x => x.Target.Activity).ToList(); |
| | | 240 | | |
| | 0 | 241 | | if (outboundActivities.Any()) |
| | | 242 | | { |
| | 0 | 243 | | foreach (var activity in outboundActivities) |
| | 0 | 244 | | await flowchartContext.ScheduleActivityAsync(activity, OnChildCompletedCounterBasedLogicAsync); |
| | | 245 | | } |
| | 0 | 246 | | } |
| | | 247 | | |
| | | 248 | | private async ValueTask OnCounterFlowActivityCanceledAsync(CancelSignal signal, SignalContext context) |
| | | 249 | | { |
| | 0 | 250 | | var flowchartContext = context.ReceiverActivityExecutionContext; |
| | 0 | 251 | | await CompleteIfNoPendingWorkAsync(flowchartContext); |
| | 0 | 252 | | var flowchart = (Flowchart)flowchartContext.Activity; |
| | 0 | 253 | | var flowGraph = flowchartContext.GetFlowGraph(); |
| | 0 | 254 | | var flowScope = flowchart.GetFlowScope(flowchartContext); |
| | | 255 | | |
| | | 256 | | // Propagate the visit counts of the canceled activity's connections by scheduling with Outcomes.Empty |
| | 0 | 257 | | await flowchart.ScheduleOutboundActivitiesAsync(flowGraph, flowScope, flowchartContext, context.SenderActivityEx |
| | 0 | 258 | | } |
| | | 259 | | } |