| | | 1 | | using Elsa.Extensions; |
| | | 2 | | using Elsa.Workflows.Activities.Flowchart.Contracts; |
| | | 3 | | using Elsa.Workflows.Activities.Flowchart.Extensions; |
| | | 4 | | using Elsa.Workflows.Activities.Flowchart.Models; |
| | | 5 | | using Elsa.Workflows.Options; |
| | | 6 | | using Elsa.Workflows.Signals; |
| | | 7 | | |
| | | 8 | | namespace Elsa.Workflows.Activities.Flowchart.Activities; |
| | | 9 | | |
| | | 10 | | public partial class Flowchart |
| | | 11 | | { |
| | | 12 | | private const string ScopeProperty = "FlowScope"; |
| | | 13 | | private const string GraphTransientProperty = "FlowGraph"; |
| | | 14 | | private const string BackwardConnectionActivityInput = "BackwardConnection"; |
| | | 15 | | |
| | | 16 | | |
| | | 17 | | private async ValueTask OnChildCompletedCounterBasedLogicAsync(ActivityCompletedContext context) |
| | | 18 | | { |
| | 130 | 19 | | var flowchartContext = context.TargetContext; |
| | 130 | 20 | | var completedActivityContext = context.ChildContext; |
| | 130 | 21 | | var completedActivity = completedActivityContext.Activity; |
| | 130 | 22 | | var result = context.Result; |
| | | 23 | | |
| | | 24 | | // Determine the outcomes from the completed activity |
| | 130 | 25 | | var outcomes = result is Outcomes o ? o : Outcomes.Default; |
| | | 26 | | |
| | 130 | 27 | | await ProcessChildCompletedAsync(flowchartContext, completedActivity, completedActivityContext, outcomes); |
| | 130 | 28 | | } |
| | | 29 | | |
| | | 30 | | private IActivity? GetStartActivity(ActivityExecutionContext context) |
| | | 31 | | { |
| | | 32 | | // If there's a trigger that triggered this workflow, use that. |
| | 732 | 33 | | var triggerActivityId = context.WorkflowExecutionContext.TriggerActivityId; |
| | 1342 | 34 | | var triggerActivity = triggerActivityId != null ? Activities.FirstOrDefault(x => x.Id == triggerActivityId) : nu |
| | | 35 | | |
| | 732 | 36 | | if (triggerActivity != null) |
| | 207 | 37 | | return triggerActivity; |
| | | 38 | | |
| | | 39 | | // If an explicit Start activity was provided, use that. |
| | 525 | 40 | | if (Start != null) |
| | 104 | 41 | | return Start; |
| | | 42 | | |
| | | 43 | | // If there is a Start activity on the flowchart, use that. |
| | 849 | 44 | | var startActivity = Activities.FirstOrDefault(x => x is Start); |
| | | 45 | | |
| | 421 | 46 | | if (startActivity != null) |
| | 2 | 47 | | return startActivity; |
| | | 48 | | |
| | | 49 | | // If there's an activity marked as "Can Start Workflow", use that. |
| | 845 | 50 | | var canStartWorkflowActivity = Activities.FirstOrDefault(x => x.GetCanStartWorkflow()); |
| | | 51 | | |
| | 419 | 52 | | if (canStartWorkflowActivity != null) |
| | 0 | 53 | | return canStartWorkflowActivity; |
| | | 54 | | |
| | | 55 | | // If there is a single activity that has no inbound connections, use that. |
| | 419 | 56 | | var root = GetRootActivity(); |
| | | 57 | | |
| | 419 | 58 | | if (root != null) |
| | 416 | 59 | | return root; |
| | | 60 | | |
| | | 61 | | // If no start activity found, return the first activity. |
| | 3 | 62 | | return Activities.FirstOrDefault(); |
| | | 63 | | } |
| | | 64 | | |
| | | 65 | | /// <summary> |
| | | 66 | | /// Checks if there is any pending work for the flowchart. |
| | | 67 | | /// </summary> |
| | | 68 | | private bool HasPendingWork(ActivityExecutionContext context) |
| | | 69 | | { |
| | 44 | 70 | | var workflowExecutionContext = context.WorkflowExecutionContext; |
| | | 71 | | |
| | | 72 | | // Use HashSet for O(1) lookups |
| | 283 | 73 | | var activityIds = new HashSet<string>(Activities.Select(x => x.Id)); |
| | | 74 | | |
| | | 75 | | // Short circuit evaluation - check running instances first before more expensive scheduler check |
| | 220 | 76 | | if (context.Children.Any(x => activityIds.Contains(x.Activity.Id) && x.Status == ActivityStatus.Running)) |
| | 0 | 77 | | return true; |
| | | 78 | | |
| | | 79 | | // Scheduler check - optimize to avoid repeated LINQ evaluations |
| | 44 | 80 | | var scheduledItems = workflowExecutionContext.Scheduler.List().ToList(); |
| | | 81 | | |
| | 44 | 82 | | return scheduledItems.Any(workItem => |
| | 44 | 83 | | { |
| | 12 | 84 | | var ownerInstanceId = workItem.Owner?.Id; |
| | 44 | 85 | | |
| | 12 | 86 | | if (ownerInstanceId == null) |
| | 0 | 87 | | return false; |
| | 44 | 88 | | |
| | 12 | 89 | | if (ownerInstanceId == context.Id) |
| | 12 | 90 | | return true; |
| | 44 | 91 | | |
| | 0 | 92 | | var ownerContext = workflowExecutionContext.ActivityExecutionContexts.First(x => x.Id == ownerInstanceId); |
| | 0 | 93 | | return ownerContext.GetAncestors().Any(x => x == context); |
| | 44 | 94 | | }); |
| | | 95 | | } |
| | | 96 | | |
| | | 97 | | private IActivity? GetRootActivity() |
| | | 98 | | { |
| | | 99 | | // Get the first activity that has no inbound connections. |
| | 419 | 100 | | var query = |
| | 419 | 101 | | from activity in Activities |
| | 426 | 102 | | let inboundConnections = Connections.Any(x => x.Target.Activity == activity) |
| | 416 | 103 | | where !inboundConnections |
| | 835 | 104 | | select activity; |
| | | 105 | | |
| | 419 | 106 | | var rootActivity = query.FirstOrDefault(); |
| | 419 | 107 | | return rootActivity; |
| | | 108 | | } |
| | | 109 | | |
| | | 110 | | private FlowGraph GetFlowGraph(ActivityExecutionContext context) |
| | | 111 | | { |
| | | 112 | | // Store in TransientProperties so FlowChart is not persisted in WorkflowState |
| | 167 | 113 | | return context.TransientProperties.GetOrAdd(GraphTransientProperty, () => new FlowGraph(Connections, GetStartAct |
| | | 114 | | } |
| | | 115 | | |
| | | 116 | | private FlowScope GetFlowScope(ActivityExecutionContext context) |
| | | 117 | | { |
| | 162 | 118 | | return context.GetProperty(ScopeProperty, () => new FlowScope()); |
| | | 119 | | } |
| | | 120 | | |
| | | 121 | | private async ValueTask ProcessChildCompletedAsync(ActivityExecutionContext flowchartContext, IActivity completedAct |
| | | 122 | | { |
| | 130 | 123 | | if (flowchartContext.Activity != this) |
| | | 124 | | { |
| | 0 | 125 | | throw new("Target context activity must be this flowchart"); |
| | | 126 | | } |
| | | 127 | | |
| | | 128 | | // If the completed activity's status is anything but "Completed", do not schedule its outbound activities. |
| | 130 | 129 | | if (completedActivityContext.Status != ActivityStatus.Completed) |
| | | 130 | | { |
| | 0 | 131 | | return; |
| | | 132 | | } |
| | | 133 | | |
| | | 134 | | // If the complete activity is a terminal node, complete the flowchart immediately. |
| | 130 | 135 | | if (completedActivity is ITerminalNode) |
| | | 136 | | { |
| | 1 | 137 | | await flowchartContext.CompleteActivityAsync(); |
| | 1 | 138 | | return; |
| | | 139 | | } |
| | | 140 | | |
| | | 141 | | // Schedule the outbound activities |
| | 129 | 142 | | var flowGraph = GetFlowGraph(flowchartContext); |
| | 129 | 143 | | var flowScope = GetFlowScope(flowchartContext); |
| | 129 | 144 | | var completedActivityExcecutedByBackwardConnection = completedActivityContext.ActivityInput.GetValueOrDefault<bo |
| | 129 | 145 | | bool hasScheduledActivity = await MaybeScheduleOutboundActivitiesAsync(flowGraph, flowScope, flowchartContext, c |
| | | 146 | | |
| | | 147 | | // If there are not any outbound connections, complete the flowchart activity if there is no other pending work |
| | 129 | 148 | | if (!hasScheduledActivity) |
| | | 149 | | { |
| | 44 | 150 | | await CompleteIfNoPendingWorkAsync(flowchartContext); |
| | | 151 | | } |
| | 130 | 152 | | } |
| | | 153 | | |
| | | 154 | | /// <summary> |
| | | 155 | | /// Schedules outbound activities based on the flowchart's structure and execution state. |
| | | 156 | | /// This method determines whether an activity should be scheduled based on visited connections, |
| | | 157 | | /// forward traversal rules, and backward connections. If outcomes is Outcomes.Empty, it indicates |
| | | 158 | | /// that the activity should be skipped - all outbound connections will be visited and treated as |
| | | 159 | | /// not followed. |
| | | 160 | | /// </summary> |
| | | 161 | | /// <param name="flowGraph">The graph representation of the flowchart.</param> |
| | | 162 | | /// <param name="flowScope">Tracks activity and connection visits.</param> |
| | | 163 | | /// <param name="flowchartContext">The execution context of the flowchart.</param> |
| | | 164 | | /// <param name="activity">The current activity being processed.</param> |
| | | 165 | | /// <param name="outcomes">The outcomes that determine which connections were followed.</param> |
| | | 166 | | /// <param name="completionCallback">The callback to invoke upon activity completion.</param> |
| | | 167 | | /// <param name="completedActivityExecutedByBackwardConnection">Indicates if the completed activity |
| | | 168 | | /// was executed due to a backward connection.</param> |
| | | 169 | | /// <returns>True if at least one activity was scheduled; otherwise, false.</returns> |
| | | 170 | | private static async ValueTask<bool> MaybeScheduleOutboundActivitiesAsync(FlowGraph flowGraph, FlowScope flowScope, |
| | | 171 | | { |
| | 145 | 172 | | bool hasScheduledActivity = false; |
| | | 173 | | |
| | | 174 | | // Check if the activity is dangling (i.e., it is not reachable from the flowchart graph) |
| | 145 | 175 | | if (flowGraph.IsDanglingActivity(activity)) |
| | | 176 | | { |
| | 0 | 177 | | throw new($"Activity {activity.Id} is not reachable from the flowchart graph. Unable to schedule it's outbou |
| | | 178 | | } |
| | | 179 | | |
| | | 180 | | // Register the activity as visited unless it was executed due to a backward connection |
| | 145 | 181 | | if (!completedActivityExecutedByBackwardConnection) |
| | | 182 | | { |
| | 145 | 183 | | flowScope.RegisterActivityVisit(activity); |
| | | 184 | | } |
| | | 185 | | |
| | | 186 | | // Process each outbound connection from the current activity |
| | 542 | 187 | | foreach (var outboundConnection in flowGraph.GetOutboundConnections(activity)) |
| | | 188 | | { |
| | 126 | 189 | | var connectionFollowed = outcomes.Names.Contains(outboundConnection.Source.Port); |
| | 126 | 190 | | flowScope.RegisterConnectionVisit(outboundConnection, connectionFollowed); |
| | 126 | 191 | | var outboundActivity = outboundConnection.Target.Activity; |
| | | 192 | | |
| | | 193 | | // Determine the scheduling strategy based on connection-type. |
| | 126 | 194 | | if (flowGraph.IsBackwardConnection(outboundConnection, out var backwardConnectionIsValid)) |
| | | 195 | | // Backward connections are scheduled differently |
| | 0 | 196 | | hasScheduledActivity |= await MaybeScheduleBackwardConnectionActivityAsync(flowGraph, flowchartContext, |
| | | 197 | | else |
| | 126 | 198 | | hasScheduledActivity |= await MaybeScheduleOutboundActivityAsync(flowGraph, flowScope, flowchartContext, |
| | | 199 | | } |
| | | 200 | | |
| | 145 | 201 | | return hasScheduledActivity; |
| | 145 | 202 | | } |
| | | 203 | | |
| | | 204 | | /// <summary> |
| | | 205 | | /// Schedules an outbound activity that originates from a backward connection. |
| | | 206 | | /// </summary> |
| | | 207 | | private static async ValueTask<bool> MaybeScheduleBackwardConnectionActivityAsync(FlowGraph flowGraph, ActivityExecu |
| | | 208 | | { |
| | 0 | 209 | | if (!connectionFollowed) |
| | | 210 | | { |
| | 0 | 211 | | return false; |
| | | 212 | | } |
| | | 213 | | |
| | 0 | 214 | | if (!backwardConnectionIsValid) |
| | | 215 | | { |
| | 0 | 216 | | throw new($"Invalid backward connection: Every path from the source ('{outboundConnection.Source.Activity.Id |
| | | 217 | | } |
| | | 218 | | |
| | 0 | 219 | | var scheduleWorkOptions = new ScheduleWorkOptions |
| | 0 | 220 | | { |
| | 0 | 221 | | CompletionCallback = completionCallback, |
| | 0 | 222 | | Input = new Dictionary<string, object>() { { BackwardConnectionActivityInput, true } } |
| | 0 | 223 | | }; |
| | | 224 | | |
| | 0 | 225 | | await flowchartContext.ScheduleActivityAsync(outboundActivity, scheduleWorkOptions); |
| | 0 | 226 | | return true; |
| | 0 | 227 | | } |
| | | 228 | | |
| | | 229 | | /// <summary> |
| | | 230 | | /// Determines the merge mode for a given outbound activity. If the outbound activity is a FlowJoin, it retrieves it |
| | | 231 | | /// mode. Otherwise, it defaults to FlowJoinMode.WaitAllActive for implicit joins. |
| | | 232 | | /// </summary> |
| | | 233 | | private static async ValueTask<FlowJoinMode> GetMergeModeAsync(ActivityExecutionContext flowchartContext, IActivity |
| | | 234 | | { |
| | 126 | 235 | | if (outboundActivity is FlowJoin) |
| | | 236 | | { |
| | 24 | 237 | | var outboundActivityExecutionContext = await flowchartContext.WorkflowExecutionContext.CreateActivityExecuti |
| | 24 | 238 | | return await outboundActivityExecutionContext.EvaluateInputPropertyAsync<FlowJoin, FlowJoinMode>(x => x.Mode |
| | | 239 | | } |
| | | 240 | | else |
| | | 241 | | { |
| | | 242 | | // Implicit join case - treat as WaitAllActive |
| | 102 | 243 | | return FlowJoinMode.WaitAllActive; |
| | | 244 | | } |
| | 126 | 245 | | } |
| | | 246 | | |
| | | 247 | | /// <summary> |
| | | 248 | | /// Schedules a join activity based on inbound connection statuses. |
| | | 249 | | /// </summary> |
| | | 250 | | private static async ValueTask<bool> MaybeScheduleOutboundActivityAsync(FlowGraph flowGraph, FlowScope flowScope, Ac |
| | | 251 | | { |
| | 126 | 252 | | FlowJoinMode mode = await GetMergeModeAsync(flowchartContext, outboundActivity); |
| | | 253 | | |
| | 126 | 254 | | return mode switch |
| | 126 | 255 | | { |
| | 15 | 256 | | FlowJoinMode.WaitAll => await MaybeScheduleWaitAllActivityAsync(flowGraph, flowScope, flowchartContext, outb |
| | 102 | 257 | | FlowJoinMode.WaitAllActive => await MaybeScheduleWaitAllActiveActivityAsync(flowGraph, flowScope, flowchartC |
| | 9 | 258 | | FlowJoinMode.WaitAny => await MaybeScheduleWaitAnyActivityAsync(flowGraph, flowScope, flowchartContext, outb |
| | 0 | 259 | | _ => throw new($"Unsupported FlowJoinMode: {mode}"), |
| | 126 | 260 | | }; |
| | 126 | 261 | | } |
| | | 262 | | |
| | | 263 | | /// <summary> |
| | | 264 | | /// Determines whether to schedule an activity based on the FlowJoinMode.WaitAll behavior. |
| | | 265 | | /// If all inbound connections were visited, it checks if they were all followed to decide whether to schedule or sk |
| | | 266 | | /// </summary> |
| | | 267 | | private static async ValueTask<bool> MaybeScheduleWaitAllActivityAsync(FlowGraph flowGraph, FlowScope flowScope, Act |
| | | 268 | | { |
| | 15 | 269 | | if (!flowScope.AllInboundConnectionsVisited(flowGraph, outboundActivity)) |
| | | 270 | | // Not all inbound connections have been visited yet; do not schedule anything yet. |
| | 7 | 271 | | return false; |
| | | 272 | | |
| | 8 | 273 | | if (flowScope.AllInboundConnectionsFollowed(flowGraph, outboundActivity)) |
| | | 274 | | // All inbound connections were followed; schedule the outbound activity. |
| | 8 | 275 | | return await ScheduleOutboundActivityAsync(flowchartContext, outboundActivity, completionCallback); |
| | | 276 | | else |
| | | 277 | | // No inbound connections were followed; skip the outbound activity. |
| | 0 | 278 | | return await SkipOutboundActivityAsync(flowGraph, flowScope, flowchartContext, outboundActivity, completionC |
| | 15 | 279 | | } |
| | | 280 | | |
| | | 281 | | /// <summary> |
| | | 282 | | /// Determines whether to schedule an activity based on the FlowJoinMode.WaitAllActive behavior. |
| | | 283 | | /// If all inbound connections have been visited, it checks if any were followed to decide whether to schedule or sk |
| | | 284 | | /// </summary> |
| | | 285 | | private static async ValueTask<bool> MaybeScheduleWaitAllActiveActivityAsync(FlowGraph flowGraph, FlowScope flowScop |
| | | 286 | | { |
| | 102 | 287 | | if (!flowScope.AllInboundConnectionsVisited(flowGraph, outboundActivity)) |
| | | 288 | | // Not all inbound connections have been visited yet; do not schedule anything yet. |
| | 2 | 289 | | return false; |
| | | 290 | | |
| | 100 | 291 | | if (flowScope.AnyInboundConnectionsFollowed(flowGraph, outboundActivity)) |
| | | 292 | | // At least one inbound connection was followed; schedule the outbound activity. |
| | 84 | 293 | | return await ScheduleOutboundActivityAsync(flowchartContext, outboundActivity, completionCallback); |
| | | 294 | | else |
| | | 295 | | // No inbound connections were followed; skip the outbound activity. |
| | 16 | 296 | | return await SkipOutboundActivityAsync(flowGraph, flowScope, flowchartContext, outboundActivity, completionC |
| | 102 | 297 | | } |
| | | 298 | | |
| | | 299 | | /// <summary> |
| | | 300 | | /// Determines whether to schedule an activity based on the FlowJoinMode.WaitAny behavior. |
| | | 301 | | /// If any inbound connection has been followed, it schedules the activity and cancels remaining inbound activities. |
| | | 302 | | /// If a subsequent inbound connection is followed after the activity has been scheduled, it ignores it. |
| | | 303 | | /// </summary> |
| | | 304 | | private static async ValueTask<bool> MaybeScheduleWaitAnyActivityAsync(FlowGraph flowGraph, FlowScope flowScope, Act |
| | | 305 | | { |
| | 9 | 306 | | if (flowScope.ShouldIgnoreConnection(outboundConnection, outboundActivity)) |
| | | 307 | | // Ignore the connection if the outbound activity has already completed (JoinAny scenario) |
| | 0 | 308 | | return false; |
| | | 309 | | |
| | 17 | 310 | | if (flowchartContext.WorkflowExecutionContext.Scheduler.List().Any(workItem => workItem.Owner == flowchartContex |
| | | 311 | | // Ignore the connection if the outbound activity is already scheduled |
| | 4 | 312 | | return false; |
| | | 313 | | |
| | 5 | 314 | | if (flowScope.AnyInboundConnectionsFollowed(flowGraph, outboundActivity)) |
| | | 315 | | { |
| | | 316 | | // An inbound connection has been followed; cancel remaining inbound activities |
| | 5 | 317 | | await CancelRemainingInboundActivitiesAsync(flowchartContext, outboundActivity); |
| | | 318 | | |
| | | 319 | | // This is the first inbound connection followed; schedule the outbound activity |
| | 5 | 320 | | return await ScheduleOutboundActivityAsync(flowchartContext, outboundActivity, completionCallback); |
| | | 321 | | } |
| | | 322 | | |
| | 0 | 323 | | if (flowScope.AllInboundConnectionsVisited(flowGraph, outboundActivity)) |
| | | 324 | | // All inbound connections have been visited without any being followed; skip the outbound activity |
| | 0 | 325 | | return await SkipOutboundActivityAsync(flowGraph, flowScope, flowchartContext, outboundActivity, completionC |
| | | 326 | | |
| | | 327 | | // No inbound connections have been followed yet; do not schedule anything yet. |
| | 0 | 328 | | return false; |
| | 9 | 329 | | } |
| | | 330 | | |
| | | 331 | | /// <summary> |
| | | 332 | | /// Schedules the outbound activity. |
| | | 333 | | /// </summary> |
| | | 334 | | private static async ValueTask<bool> ScheduleOutboundActivityAsync(ActivityExecutionContext flowchartContext, IActiv |
| | | 335 | | { |
| | 97 | 336 | | await flowchartContext.ScheduleActivityAsync(outboundActivity, completionCallback); |
| | 97 | 337 | | return true; |
| | 97 | 338 | | } |
| | | 339 | | |
| | | 340 | | /// <summary> |
| | | 341 | | /// Skips the outbound activity by propagating skipped connections. |
| | | 342 | | /// </summary> |
| | | 343 | | private static async ValueTask<bool> SkipOutboundActivityAsync(FlowGraph flowGraph, FlowScope flowScope, ActivityExe |
| | | 344 | | { |
| | 16 | 345 | | return await MaybeScheduleOutboundActivitiesAsync(flowGraph, flowScope, flowchartContext, outboundActivity, Outc |
| | 16 | 346 | | } |
| | | 347 | | |
| | | 348 | | private static async ValueTask CancelRemainingInboundActivitiesAsync(ActivityExecutionContext flowchartContext, IAct |
| | | 349 | | { |
| | 5 | 350 | | var flowchart = (Flowchart)flowchartContext.Activity; |
| | 5 | 351 | | var flowGraph = flowchart.GetFlowGraph(flowchartContext); |
| | 5 | 352 | | var ancestorActivities = flowGraph.GetAncestorActivities(outboundActivity); |
| | 32 | 353 | | var inboundActivityExecutionContexts = flowchartContext.WorkflowExecutionContext.ActivityExecutionContexts.Where |
| | | 354 | | |
| | | 355 | | // Cancel each ancestor activity. |
| | 44 | 356 | | foreach (var activityExecutionContext in inboundActivityExecutionContexts) |
| | | 357 | | { |
| | 17 | 358 | | await activityExecutionContext.CancelActivityAsync(); |
| | | 359 | | } |
| | 5 | 360 | | } |
| | | 361 | | |
| | | 362 | | private async ValueTask OnScheduleOutcomesAsync(ScheduleActivityOutcomes signal, SignalContext context) |
| | | 363 | | { |
| | 0 | 364 | | var flowchartContext = context.ReceiverActivityExecutionContext; |
| | 0 | 365 | | var schedulingActivityContext = context.SenderActivityExecutionContext; |
| | 0 | 366 | | var schedulingActivity = schedulingActivityContext.Activity; |
| | 0 | 367 | | var outcomes = new Outcomes(signal.Outcomes); |
| | 0 | 368 | | } |
| | | 369 | | |
| | | 370 | | private async ValueTask OnCounterFlowActivityCanceledAsync(CancelSignal signal, SignalContext context) |
| | | 371 | | { |
| | 0 | 372 | | var flowchartContext = context.ReceiverActivityExecutionContext; |
| | 0 | 373 | | await CompleteIfNoPendingWorkAsync(flowchartContext); |
| | 0 | 374 | | var flowchart = (Flowchart)flowchartContext.Activity; |
| | 0 | 375 | | var flowGraph = flowchartContext.GetFlowGraph(); |
| | 0 | 376 | | var flowScope = flowchart.GetFlowScope(flowchartContext); |
| | | 377 | | |
| | | 378 | | // Propagate canceled connections visited count by scheduling with Outcomes.Empty |
| | 0 | 379 | | await MaybeScheduleOutboundActivitiesAsync(flowGraph, flowScope, flowchartContext, context.SenderActivityExecuti |
| | 0 | 380 | | } |
| | | 381 | | } |