| | | 1 | | using Elsa.Common; |
| | | 2 | | |
| | | 3 | | namespace Elsa.Workflows.Runtime; |
| | | 4 | | |
/// <summary>
/// Tracks a single in-flight execution cycle of the workflow runtime. Created when a cycle starts, disposed when
/// it completes or is force-cancelled during drain. Active-cycle accounting is done through
/// <see cref="IExecutionCycleRegistry"/>.
/// </summary>
public sealed class ExecutionCycleHandle : IDisposable
{
    private readonly CancellationTokenSource _cycleCts;
    private readonly Action<ExecutionCycleHandle>? _onDisposed;
    private readonly Action? _cancelCallback;
    private readonly TaskCompletionSource _disposedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);

    // Both flags are 0/1 latches: written atomically via Interlocked.Exchange, never reset.
    private int _cancelled;
    private int _disposed;

    /// <summary>
    /// Creates a new handle. The owning <see cref="IExecutionCycleRegistry"/> supplies <paramref name="onDisposed"/>
    /// so it can decrement its active count. The optional <paramref name="cancelCallback"/> is invoked by
    /// <see cref="Cancel"/> to propagate cancellation into the workflow execution itself (e.g.,
    /// <c>WorkflowExecutionContext.Cancel()</c>) so that the running cycle stops scheduling new activities. Without it,
    /// <see cref="Cancel"/> only signals the cycle's own <see cref="CancellationToken"/>, which is consumed by the
    /// execution-cycle registry but does NOT propagate to the workflow runner's pipeline (the runner reads from
    /// <c>WorkflowExecutionContext.CancellationToken</c>, which is captured at context construction and is not part
    /// of this linked chain).
    /// </summary>
    /// <param name="id">Unique identifier of the cycle.</param>
    /// <param name="workflowInstanceId">Identifier of the workflow instance this cycle executes.</param>
    /// <param name="ingressSourceName">Name of the initiating ingress source, or <c>null</c> when unknown.</param>
    /// <param name="startedAt">Timestamp at which the cycle started.</param>
    /// <param name="linkedToken">Token the cycle's own cancellation source is linked to.</param>
    /// <param name="onDisposed">Optional callback invoked once, with this handle, on first disposal.</param>
    /// <param name="cancelCallback">Optional callback invoked at most once by <see cref="Cancel"/>.</param>
    public ExecutionCycleHandle(
        Guid id,
        string workflowInstanceId,
        string? ingressSourceName,
        DateTimeOffset startedAt,
        CancellationToken linkedToken,
        Action<ExecutionCycleHandle>? onDisposed = null,
        Action? cancelCallback = null)
    {
        Id = id;
        WorkflowInstanceId = workflowInstanceId;
        IngressSourceName = ingressSourceName;
        StartedAt = startedAt;
        _cycleCts = CancellationTokenSource.CreateLinkedTokenSource(linkedToken);
        _onDisposed = onDisposed;
        _cancelCallback = cancelCallback;
    }

    /// <summary>Unique identifier for this execution cycle within the current runtime generation.</summary>
    public Guid Id { get; }

    /// <summary>Workflow instance whose execution this cycle is driving.</summary>
    public string WorkflowInstanceId { get; }

    /// <summary>
    /// Name of the ingress source that initiated this cycle, when attribution is available (null for direct API
    /// invocations).
    /// </summary>
    public string? IngressSourceName { get; }

    /// <summary>When the cycle started.</summary>
    public DateTimeOffset StartedAt { get; }

    /// <summary>Cancellation token passed into the workflow execution pipeline for this cycle.</summary>
    public CancellationToken CancellationToken => _cycleCts.Token;

    /// <summary>
    /// Completes when <see cref="Dispose"/> runs — i.e., when the workflow runner finishes the cycle (cleanly or
    /// via cancellation) and the middleware exits its <c>using</c> block. The drain orchestrator awaits this with
    /// a timeout before persisting <see cref="WorkflowSubStatus.Interrupted"/>, ensuring its write happens AFTER
    /// any commit the runner emits in response to <see cref="Cancel"/>.
    /// </summary>
    public Task Disposed => _disposedTcs.Task;

    /// <summary>
    /// Cancels the cycle — used by the drain orchestrator on deadline breach or operator force.
    /// Invokes the cancel callback (when supplied at construction) to propagate cancellation into the workflow
    /// execution, then cancels the cycle's own linked CTS. Safe to call multiple times; idempotent.
    /// Does nothing once the handle has been disposed.
    /// </summary>
    public void Cancel()
    {
        // Volatile read pairs with the Interlocked.Exchange in Dispose so a completed disposal is observed here.
        if (Volatile.Read(ref _disposed) != 0) return;

        // Idempotent guard: ensures the cancel callback and CTS cancellation run AT MOST once even if Cancel() is
        // called repeatedly before disposal. Without this the drain orchestrator (or any other future caller) could
        // accidentally trigger a non-idempotent cancellation side effect multiple times.
        if (Interlocked.Exchange(ref _cancelled, 1) != 0) return;

        // Propagate to the workflow execution first (this typically marks the workflow as Cancelled and clears its
        // schedule, so the runner stops scheduling new activities). The orchestrator's subsequent Interrupted
        // persistence then overrides the Cancelled sub-status — see Disposed-await sequencing in DrainOrchestrator.
        try
        {
            _cancelCallback?.Invoke();
        }
        catch (Exception ex) when (!ex.IsFatal())
        {
            // Cancellation is best-effort: a non-fatal callback failure must not prevent the CTS cancellation below.
        }

        try
        {
            _cycleCts.Cancel();
        }
        catch (ObjectDisposedException)
        {
            // Race with Dispose — acceptable.
        }
    }

    /// <summary>
    /// Releases the linked CTS, notifies the registry, and signals <see cref="Disposed"/>. Only the first call does
    /// any work; later calls return immediately.
    /// </summary>
    /// <remarks>
    /// If the <c>onDisposed</c> callback throws, the exception still propagates to the caller, but the linked CTS is
    /// disposed and <see cref="Disposed"/> is completed regardless, so awaiters are never left hanging.
    /// </remarks>
    public void Dispose()
    {
        if (Interlocked.Exchange(ref _disposed, 1) != 0) return;

        try
        {
            _onDisposed?.Invoke(this);
        }
        finally
        {
            // Must run even when the callback fails: _disposed is already latched, so no later Dispose() call
            // would get another chance to release the CTS or complete the Disposed task.
            _cycleCts.Dispose();
            _disposedTcs.TrySetResult();
        }
    }
}