| | | 1 | | using System.Diagnostics; |
| | | 2 | | using Elsa.Common; |
| | | 3 | | using Elsa.Workflows.Management; |
| | | 4 | | using Elsa.Workflows.Management.Filters; |
| | | 5 | | using Elsa.Workflows.Runtime.Options; |
| | | 6 | | using Microsoft.Extensions.DependencyInjection; |
| | | 7 | | using Microsoft.Extensions.Hosting; |
| | | 8 | | using Microsoft.Extensions.Logging; |
| | | 9 | | using Microsoft.Extensions.Options; |
| | | 10 | | |
| | | 11 | | namespace Elsa.Workflows.Runtime.Services; |
| | | 12 | | |
| | | 13 | | /// <summary> |
| | | 14 | | /// Default implementation of <see cref="IDrainOrchestrator"/>. |
| | | 15 | | /// </summary> |
| | | 16 | | /// <remarks> |
| | | 17 | | /// <para> |
| | | 18 | | /// Protocol per <c>contracts/drain-orchestrator.md</c>: |
| | | 19 | | /// 1. Enter drain via <see cref="IQuiescenceSignal.BeginDrainAsync"/>. |
| | | 20 | | /// 2. Pause every registered ingress source in parallel, each bounded by its own per-source timeout. Sources that |
| | | 21 | | /// fail to pause are recorded as <see cref="IngressSourceState.PauseFailed"/> and, if they implement |
| | | 22 | | /// <see cref="IForceStoppable"/>, are escalated. |
| | | 23 | | /// 3. Wait for <see cref="IExecutionCycleRegistry.ActiveCount"/> to reach zero, polling on a short interval. |
| | | 24 | | /// 4. On deadline breach (or on operator force, where deadline is zero), iterate live cycles, cancel each, |
| | | 25 | | /// persist the corresponding instance in <see cref="WorkflowSubStatus.Interrupted"/>, and write a |
| | | 26 | | /// <c>WorkflowInterrupted</c> entry in the per-instance execution log. |
| | | 27 | | /// 5. Return a <see cref="DrainOutcome"/>. |
| | | 28 | | /// </para> |
| | | 29 | | /// <para> |
| | | 30 | | /// All exceptions inside the protocol are captured into the outcome — only an <see cref="InvalidOperationException"/> |
| | | 31 | | /// thrown by a second non-force invocation propagates. |
| | | 32 | | /// </para> |
| | | 33 | | /// </remarks> |
| | | 34 | | public sealed class DrainOrchestrator : IDrainOrchestrator |
| | | 35 | | { |
    // Margin subtracted from the host's ShutdownTimeout when clamping the drain deadline, so final
    // persistence can land before the process is torn down (see ComputeEffectiveDeadline).
    private static readonly TimeSpan SafetyEpsilon = TimeSpan.FromMilliseconds(500);
    // Polling interval used by WaitForCyclesAsync while waiting for ActiveCount to reach zero.
    private static readonly TimeSpan PollInterval = TimeSpan.FromMilliseconds(10);

    private readonly IQuiescenceSignal _signal;
    private readonly IIngressSourceRegistry _registry;
    private readonly IExecutionCycleRegistry _cycles;
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly IOptions<GracefulShutdownOptions> _options;
    private readonly IOptions<HostOptions> _hostOptions;
    private readonly ISystemClock _clock;
    private readonly IIdentityGenerator _identityGenerator;
    private readonly ILogger<DrainOrchestrator> _logger;

    // Guards the single-drain-per-generation state below.
    private readonly object _sync = new();
    private DrainOutcome? _previousOutcome; // Cached outcome of a completed drain; returned to force re-invocations.
    private bool _drainInProgress;          // True while DrainAsync is executing its protocol.
| | | 52 | | |
    /// <summary>
    /// Initializes a new instance of the <see cref="DrainOrchestrator"/> class.
    /// Dependencies are captured as-is; no work is performed at construction time.
    /// </summary>
    public DrainOrchestrator(
        IQuiescenceSignal signal,
        IIngressSourceRegistry registry,
        IExecutionCycleRegistry cycles,
        IServiceScopeFactory scopeFactory,
        IOptions<GracefulShutdownOptions> options,
        IOptions<HostOptions> hostOptions,
        ISystemClock clock,
        IIdentityGenerator identityGenerator,
        ILogger<DrainOrchestrator> logger)
    {
        _signal = signal;
        _registry = registry;
        _cycles = cycles;
        _scopeFactory = scopeFactory;
        _options = options;
        _hostOptions = hostOptions;
        _clock = clock;
        _identityGenerator = identityGenerator;
        _logger = logger;
    }
| | | 74 | | |
| | | 75 | | /// <inheritdoc /> |
| | | 76 | | public async ValueTask<DrainOutcome> DrainAsync(DrainTrigger trigger, CancellationToken cancellationToken = default) |
| | | 77 | | { |
| | 26 | 78 | | lock (_sync) |
| | | 79 | | { |
| | 26 | 80 | | if (_drainInProgress) |
| | | 81 | | { |
| | 3 | 82 | | if (trigger == DrainTrigger.OperatorForce && _previousOutcome is not null) |
| | 0 | 83 | | return _previousOutcome with { WasCached = true }; |
| | 3 | 84 | | throw new InvalidOperationException($"Drain already in progress; second invocation rejected (trigger={tr |
| | | 85 | | } |
| | 23 | 86 | | if (_previousOutcome is not null) |
| | | 87 | | { |
| | 3 | 88 | | if (trigger == DrainTrigger.OperatorForce) return _previousOutcome with { WasCached = true }; |
| | 1 | 89 | | throw new InvalidOperationException("Drain already completed in this generation; subsequent non-force in |
| | | 90 | | } |
| | 21 | 91 | | _drainInProgress = true; |
| | 21 | 92 | | } |
| | | 93 | | |
| | 21 | 94 | | var startedAt = _clock.UtcNow; |
| | 21 | 95 | | var deadline = ComputeEffectiveDeadline(trigger); |
| | 21 | 96 | | var sw = Stopwatch.StartNew(); |
| | 21 | 97 | | TimeSpan pausePhase = TimeSpan.Zero; |
| | 21 | 98 | | TimeSpan waitPhase = TimeSpan.Zero; |
| | | 99 | | |
| | | 100 | | try |
| | | 101 | | { |
| | 21 | 102 | | await _signal.BeginDrainAsync(cancellationToken); |
| | 21 | 103 | | _logger.LogInformation("Drain initiated (trigger={Trigger}, deadline={Deadline}).", trigger, deadline); |
| | | 104 | | |
| | 21 | 105 | | var deadlineAt = startedAt + deadline; |
| | | 106 | | |
| | | 107 | | // Phase 1: parallel pause. Each source has its own timeout independent of the overall deadline, |
| | | 108 | | // but force-stop fallbacks are clamped to the *remaining* overall budget (deadlineAt - now), not |
| | | 109 | | // the full TimeSpan, so a slow per-source pause can't extend the total shutdown window. |
| | 21 | 110 | | await PauseAllSourcesAsync(deadlineAt, cancellationToken); |
| | 21 | 111 | | pausePhase = sw.Elapsed; |
| | 21 | 112 | | _logger.LogInformation("Ingress pause phase complete in {Elapsed}.", pausePhase); |
| | | 113 | | |
| | | 114 | | // Phase 2: wait for active execution cycles to drain. |
| | 21 | 115 | | var waitStart = sw.Elapsed; |
| | 21 | 116 | | var deadlineBreach = !await WaitForCyclesAsync(deadlineAt, cancellationToken); |
| | 21 | 117 | | waitPhase = sw.Elapsed - waitStart; |
| | | 118 | | |
| | | 119 | | // Phase 3: outcome assembly. |
| | 21 | 120 | | int forceCancelled = 0; |
| | 21 | 121 | | IReadOnlyList<string> forceCancelledIds = Array.Empty<string>(); |
| | | 122 | | |
| | 21 | 123 | | if (deadlineBreach || trigger == DrainTrigger.OperatorForce) |
| | | 124 | | { |
| | 4 | 125 | | _logger.LogWarning("Drain {What}; force-cancelling {Count} active execution cycle(s).", |
| | 4 | 126 | | deadlineBreach ? "deadline exceeded" : "operator-forced", _cycles.ActiveCount); |
| | 4 | 127 | | (forceCancelled, forceCancelledIds) = await ForceCancelActiveCyclesAsync(trigger, _signal.CurrentState.G |
| | | 128 | | } |
| | | 129 | | |
| | | 130 | | // OperatorForce always reports Forced regardless of zero-deadline breach mechanics. |
| | 20 | 131 | | var result = trigger switch |
| | 20 | 132 | | { |
| | 2 | 133 | | DrainTrigger.OperatorForce => DrainResult.Forced, |
| | 19 | 134 | | _ when deadlineBreach => DrainResult.DeadlineExceeded, |
| | 17 | 135 | | _ => DrainResult.CompletedWithinDeadline, |
| | 20 | 136 | | }; |
| | | 137 | | |
| | 20 | 138 | | var outcome = new DrainOutcome( |
| | 20 | 139 | | OverallResult: result, |
| | 20 | 140 | | StartedAt: startedAt, |
| | 20 | 141 | | CompletedAt: _clock.UtcNow, |
| | 20 | 142 | | PausePhaseDuration: pausePhase, |
| | 20 | 143 | | WaitPhaseDuration: waitPhase, |
| | 20 | 144 | | Sources: BuildSourceFinalStates(), |
| | 20 | 145 | | ExecutionCyclesForceCancelledCount: forceCancelled, |
| | 20 | 146 | | ForceCancelledInstanceIds: forceCancelledIds); |
| | | 147 | | |
| | 40 | 148 | | lock (_sync) _previousOutcome = outcome; |
| | 20 | 149 | | _logger.LogInformation("Drain completed: {Result} (paused={Paused}, waited={Waited}, forceCancelled={ForceCa |
| | 20 | 150 | | outcome.OverallResult, outcome.PausePhaseDuration, outcome.WaitPhaseDuration, outcome.ExecutionCyclesFor |
| | 20 | 151 | | return outcome; |
| | | 152 | | } |
| | 1 | 153 | | catch (Exception ex) when (!ex.IsFatal()) |
| | | 154 | | { |
| | | 155 | | // The "drain already in progress / completed" InvalidOperationExceptions are thrown OUTSIDE this |
| | | 156 | | // try block (during the lock-protected setup), so they bubble out without triggering this handler. |
| | | 157 | | // Any IOE that lands here is incidental — from a store/dispatcher inside the protocol — and should |
| | | 158 | | // be captured into the outcome rather than escaping the whole drain. |
| | 1 | 159 | | _logger.LogError(ex, "Drain aborted by unhandled exception."); |
| | 1 | 160 | | var outcome = new DrainOutcome( |
| | 1 | 161 | | OverallResult: DrainResult.AbortedByUnhandledException, |
| | 1 | 162 | | StartedAt: startedAt, |
| | 1 | 163 | | CompletedAt: _clock.UtcNow, |
| | 1 | 164 | | PausePhaseDuration: pausePhase, |
| | 1 | 165 | | WaitPhaseDuration: waitPhase, |
| | 1 | 166 | | Sources: BuildSourceFinalStates(), |
| | 1 | 167 | | ExecutionCyclesForceCancelledCount: 0, |
| | 1 | 168 | | ForceCancelledInstanceIds: Array.Empty<string>()); |
| | 2 | 169 | | lock (_sync) _previousOutcome = outcome; |
| | 1 | 170 | | return outcome; |
| | | 171 | | } |
| | | 172 | | finally |
| | | 173 | | { |
| | 42 | 174 | | lock (_sync) _drainInProgress = false; |
| | | 175 | | } |
| | 22 | 176 | | } |
| | | 177 | | |
| | | 178 | | private TimeSpan ComputeEffectiveDeadline(DrainTrigger trigger) |
| | | 179 | | { |
| | 23 | 180 | | if (trigger == DrainTrigger.OperatorForce) return TimeSpan.Zero; |
| | | 181 | | |
| | 19 | 182 | | var configured = _options.Value.DrainDeadline; |
| | 19 | 183 | | var hostShutdown = _hostOptions.Value.ShutdownTimeout; |
| | | 184 | | |
| | | 185 | | // Clamp to host's own shutdown budget minus a safety epsilon, so persistence can finish before the process is k |
| | 19 | 186 | | if (hostShutdown > SafetyEpsilon) |
| | | 187 | | { |
| | 19 | 188 | | var hostBudget = hostShutdown - SafetyEpsilon; |
| | 26 | 189 | | if (hostBudget < configured) return hostBudget; |
| | | 190 | | } |
| | | 191 | | |
| | 12 | 192 | | return configured; |
| | | 193 | | } |
| | | 194 | | |
| | | 195 | | private async Task PauseAllSourcesAsync(DateTimeOffset deadlineAt, CancellationToken cancellationToken) |
| | | 196 | | { |
| | 21 | 197 | | var sources = _registry.Sources; |
| | 27 | 198 | | if (sources.Count == 0) return; |
| | | 199 | | |
| | 41 | 200 | | var pauseTasks = sources.Select(source => PauseOneSourceAsync(source, deadlineAt, cancellationToken)).ToArray(); |
| | 15 | 201 | | await Task.WhenAll(pauseTasks); |
| | 21 | 202 | | } |
| | | 203 | | |
| | | 204 | | private async Task PauseOneSourceAsync(IIngressSource source, DateTimeOffset deadlineAt, CancellationToken cancellat |
| | | 205 | | { |
| | | 206 | | // Precedence: a positive per-source PauseTimeout wins; Zero (or negative) defers to the |
| | | 207 | | // configured GracefulShutdownOptions.IngressPauseTimeout. The resolved value is then |
| | | 208 | | // clamped to the overall remaining budget, with a 1 ms floor so a misconfigured zero |
| | | 209 | | // configured-default still produces a non-zero CancelAfter. |
| | 26 | 210 | | var configured = _options.Value.IngressPauseTimeout; |
| | 26 | 211 | | var sourceDeadline = source.PauseTimeout > TimeSpan.Zero ? source.PauseTimeout : configured; |
| | 26 | 212 | | var overallRemaining = deadlineAt - _clock.UtcNow; |
| | 30 | 213 | | if (sourceDeadline > overallRemaining) sourceDeadline = overallRemaining; |
| | 26 | 214 | | if (sourceDeadline <= TimeSpan.Zero) sourceDeadline = TimeSpan.FromMilliseconds(1); |
| | | 215 | | |
| | 26 | 216 | | _registry.RecordTransition(source.Name, IngressSourceState.Pausing); |
| | 26 | 217 | | using var perSourceCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); |
| | 26 | 218 | | perSourceCts.CancelAfter(sourceDeadline); |
| | | 219 | | |
| | | 220 | | try |
| | | 221 | | { |
| | 26 | 222 | | await source.PauseAsync(perSourceCts.Token).AsTask().WaitAsync(perSourceCts.Token); |
| | 22 | 223 | | _registry.RecordTransition(source.Name, IngressSourceState.Paused); |
| | 22 | 224 | | } |
| | 2 | 225 | | catch (OperationCanceledException) when (perSourceCts.IsCancellationRequested && !cancellationToken.IsCancellati |
| | | 226 | | { |
| | 2 | 227 | | await _registry.MarkPauseFailedAsync(source.Name, "timeout", new TimeoutException($"Source '{source.Name}' d |
| | 2 | 228 | | await TryForceStopAsync(source, deadlineAt, cancellationToken); |
| | 2 | 229 | | } |
| | 2 | 230 | | catch (Exception ex) when (!ex.IsFatal()) |
| | | 231 | | { |
| | 2 | 232 | | await _registry.MarkPauseFailedAsync(source.Name, "exception", ex); |
| | 2 | 233 | | await TryForceStopAsync(source, deadlineAt, cancellationToken); |
| | | 234 | | } |
| | 26 | 235 | | } |
| | | 236 | | |
| | | 237 | | private async Task TryForceStopAsync(IIngressSource source, DateTimeOffset deadlineAt, CancellationToken cancellatio |
| | | 238 | | { |
| | 7 | 239 | | if (source is not IForceStoppable forceStoppable) return; |
| | | 240 | | |
| | | 241 | | // Use the *remaining* budget — not the full overall deadline — so a per-source pause that already |
| | | 242 | | // burned most of the window can't get another full deadline's worth of force-stop runway. |
| | 1 | 243 | | var remaining = deadlineAt - _clock.UtcNow; |
| | 1 | 244 | | if (remaining <= TimeSpan.Zero) |
| | | 245 | | { |
| | 0 | 246 | | _logger.LogWarning("Skipping force-stop of ingress source '{Name}': overall drain deadline already exceeded. |
| | 0 | 247 | | return; |
| | | 248 | | } |
| | | 249 | | |
| | | 250 | | try |
| | | 251 | | { |
| | 1 | 252 | | using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); |
| | 1 | 253 | | cts.CancelAfter(remaining); |
| | 1 | 254 | | await forceStoppable.ForceStopAsync(cts.Token); |
| | 1 | 255 | | _logger.LogInformation("Force-stopped ingress source '{Name}'.", source.Name); |
| | 1 | 256 | | } |
| | 0 | 257 | | catch (Exception ex) when (!ex.IsFatal()) |
| | | 258 | | { |
| | 0 | 259 | | _logger.LogWarning(ex, "Force-stop of ingress source '{Name}' failed; drain continues.", source.Name); |
| | 0 | 260 | | } |
| | 4 | 261 | | } |
| | | 262 | | |
| | | 263 | | private async Task<bool> WaitForCyclesAsync(DateTimeOffset deadlineAt, CancellationToken cancellationToken) |
| | | 264 | | { |
| | 2926 | 265 | | while (_cycles.ActiveCount > 0) |
| | | 266 | | { |
| | 2913 | 267 | | if (_clock.UtcNow >= deadlineAt) return false; |
| | 5810 | 268 | | try { await Task.Delay(PollInterval, cancellationToken); } |
| | 0 | 269 | | catch (OperationCanceledException) { return _cycles.ActiveCount == 0; } |
| | | 270 | | } |
| | 17 | 271 | | return true; |
| | 21 | 272 | | } |
| | | 273 | | |
    /// <summary>
    /// Bound on how long the orchestrator waits for a runner to settle after invoking execution
    /// cycle cancellation (Phase B of force-cancel).
    /// </summary>
    private static readonly TimeSpan ForceCancelSettleTimeout = TimeSpan.FromSeconds(2);

    /// <summary>
    /// Per-handle bound on the forensic Interrupted persist write in Phase C. Decoupled from the drain
    /// cancellation token: when the host's drain CT has already fired, the persist still has up to this long
    /// to land the row + log entry. Keeps "proceed to persist anyway" honest while preventing a stuck DB
    /// from hanging shutdown indefinitely.
    /// </summary>
    private static readonly TimeSpan PersistInterruptedTimeout = TimeSpan.FromSeconds(5);
| | | 284 | | |
| | | 285 | | private async Task<(int Count, IReadOnlyList<string> Ids)> ForceCancelActiveCyclesAsync(DrainTrigger trigger, string |
| | | 286 | | { |
| | 4 | 287 | | var cap = _options.Value.MaxForceCancelledInstanceIdsReported; |
| | 4 | 288 | | var reportedIds = new List<string>(capacity: Math.Min(cap, 16)); |
| | 4 | 289 | | var totalCancelled = 0; |
| | | 290 | | |
| | 4 | 291 | | var live = _cycles.ListActiveCycles(); |
| | 4 | 292 | | if (live.Count == 0) return (0, Array.Empty<string>()); |
| | | 293 | | |
| | 4 | 294 | | using var scope = _scopeFactory.CreateScope(); |
| | 3 | 295 | | var instanceStore = scope.ServiceProvider.GetRequiredService<IWorkflowInstanceStore>(); |
| | 3 | 296 | | var logStore = scope.ServiceProvider.GetRequiredService<IWorkflowExecutionLogStore>(); |
| | | 297 | | |
| | 3 | 298 | | var reason = trigger == DrainTrigger.OperatorForce |
| | 3 | 299 | | ? WorkflowInterruptedPayload.ReasonOperatorForce |
| | 3 | 300 | | : WorkflowInterruptedPayload.ReasonDeadlineBreach; |
| | | 301 | | |
| | | 302 | | // Force-cancel proceeds in three phases. The split exists because cancelling and |
| | | 303 | | // awaiting in the same loop made every execution cycle after the first run at full speed |
| | | 304 | | // through the prior execution cycle's settle window — total wall time was O(N × settle |
| | | 305 | | // timeout), defeating the intent of force-cancel under concurrency. |
| | | 306 | | // |
| | | 307 | | // Phase A — cancel every handle synchronously. CancellationTokenSource.Cancel is |
| | | 308 | | // cheap and we want every runner to observe cancellation simultaneously rather |
| | | 309 | | // than serialized behind preceding settle waits. |
| | 12 | 310 | | foreach (var handle in live) |
| | | 311 | | { |
| | | 312 | | try |
| | | 313 | | { |
| | 3 | 314 | | handle.Cancel(); |
| | 3 | 315 | | totalCancelled++; |
| | 6 | 316 | | if (reportedIds.Count < cap) reportedIds.Add(handle.WorkflowInstanceId); |
| | 3 | 317 | | } |
| | 0 | 318 | | catch (Exception ex) when (!ex.IsFatal()) |
| | | 319 | | { |
| | 0 | 320 | | _logger.LogError(ex, "Failed to cancel execution cycle {ExecutionCycleId} (instance={InstanceId}).", han |
| | 0 | 321 | | } |
| | | 322 | | } |
| | | 323 | | |
| | | 324 | | // Phase B — wait for every runner to settle in parallel under a shared deadline. |
| | | 325 | | // The handle disposes when ExecutionCycleTrackingMiddleware exits its `using` block, which |
| | | 326 | | // is after the workflow runner has finished its commit. We want runners' terminal |
| | | 327 | | // commits to land before we overwrite the sub-status with Interrupted — but we |
| | | 328 | | // bound the wait so a non-cancellable activity cannot block drain, accepting the |
| | | 329 | | // runner-clobber race for that one instance (the recovery scan picks it up). |
| | | 330 | | // Total wall time for this phase is at most ForceCancelSettleTimeout regardless |
| | | 331 | | // of N. |
| | 3 | 332 | | var settleTasks = live.Select(async handle => |
| | 3 | 333 | | { |
| | 3 | 334 | | try |
| | 3 | 335 | | { |
| | 3 | 336 | | await handle.Disposed.WaitAsync(ForceCancelSettleTimeout, cancellationToken).ConfigureAwait(false); |
| | 1 | 337 | | } |
| | 2 | 338 | | catch (TimeoutException) |
| | 3 | 339 | | { |
| | 2 | 340 | | _logger.LogWarning("Execution cycle {ExecutionCycleId} (instance={InstanceId}) did not settle within {Ti |
| | 2 | 341 | | } |
| | 0 | 342 | | catch (OperationCanceledException) { /* drain CT fired — proceed to persist anyway */ } |
| | 6 | 343 | | }); |
| | 3 | 344 | | await Task.WhenAll(settleTasks).ConfigureAwait(false); |
| | | 345 | | |
| | | 346 | | // Phase C — persist Interrupted for every handle. Sequential to keep DbContext |
| | | 347 | | // usage single-threaded; per-handle persistence is small. |
| | | 348 | | // |
| | | 349 | | // Each persist runs under its own bounded token that is NOT linked to the drain CT. |
| | | 350 | | // Phase B's catch on OperationCanceledException explicitly comments "drain CT fired — |
| | | 351 | | // proceed to persist anyway"; reusing the cancelled token here would have made that |
| | | 352 | | // a lie — the very first await inside PersistInterruptedAsync would observe the |
| | | 353 | | // cancelled token and throw. Result: every execution cycle would be left in an unrecovered |
| | | 354 | | // executing state on host shutdown. The bounded non-drain token preserves the |
| | | 355 | | // forensic write while preventing a stuck DB from hanging shutdown indefinitely. |
| | 12 | 356 | | foreach (var handle in live) |
| | | 357 | | { |
| | | 358 | | try |
| | | 359 | | { |
| | 3 | 360 | | using var persistCts = new CancellationTokenSource(PersistInterruptedTimeout); |
| | 3 | 361 | | await PersistInterruptedAsync(instanceStore, logStore, handle, generationId, reason, persistCts.Token); |
| | 3 | 362 | | } |
| | 0 | 363 | | catch (Exception ex) when (!ex.IsFatal()) |
| | | 364 | | { |
| | 0 | 365 | | _logger.LogError(ex, "Failed to persist Interrupted for execution cycle {ExecutionCycleId} (instance={In |
| | 0 | 366 | | } |
| | 3 | 367 | | } |
| | | 368 | | |
| | 3 | 369 | | return (totalCancelled, reportedIds); |
| | 3 | 370 | | } |
| | | 371 | | |
| | | 372 | | private async Task PersistInterruptedAsync( |
| | | 373 | | IWorkflowInstanceStore instanceStore, |
| | | 374 | | IWorkflowExecutionLogStore logStore, |
| | | 375 | | ExecutionCycleHandle handle, |
| | | 376 | | string generationId, |
| | | 377 | | string reason, |
| | | 378 | | CancellationToken cancellationToken) |
| | | 379 | | { |
| | 3 | 380 | | var instance = await instanceStore.FindAsync(new WorkflowInstanceFilter { Id = handle.WorkflowInstanceId }, canc |
| | | 381 | | |
| | 3 | 382 | | if (instance is null) |
| | | 383 | | { |
| | | 384 | | // The instance row was never persisted (e.g., a execution cycle whose runner never reached commitStateHandl |
| | | 385 | | // We cannot update a row that doesn't exist, but we MUST still write the forensic trail so postmortem |
| | | 386 | | // recovery can audit what happened. Write a synthetic WorkflowInterrupted log entry with the |
| | | 387 | | // PersistenceFailure discriminator — the workflow-definition fields are unknown so they remain empty. |
| | 0 | 388 | | var orphanPayload = new WorkflowInterruptedPayload( |
| | 0 | 389 | | InterruptedAt: _clock.UtcNow, |
| | 0 | 390 | | Reason: WorkflowInterruptedPayload.ReasonPersistenceFailure, |
| | 0 | 391 | | GenerationId: generationId, |
| | 0 | 392 | | LastActivityId: null, |
| | 0 | 393 | | LastActivityNodeId: null, |
| | 0 | 394 | | IngressSourceName: handle.IngressSourceName, |
| | 0 | 395 | | ExecutionCycleDuration: _clock.UtcNow - handle.StartedAt); |
| | | 396 | | try |
| | | 397 | | { |
| | 0 | 398 | | await logStore.AddAsync(new Entities.WorkflowExecutionLogRecord |
| | 0 | 399 | | { |
| | 0 | 400 | | Id = _identityGenerator.GenerateId(), |
| | 0 | 401 | | WorkflowInstanceId = handle.WorkflowInstanceId, |
| | 0 | 402 | | WorkflowDefinitionId = string.Empty, |
| | 0 | 403 | | WorkflowDefinitionVersionId = string.Empty, |
| | 0 | 404 | | WorkflowVersion = 0, |
| | 0 | 405 | | ActivityInstanceId = string.Empty, |
| | 0 | 406 | | ActivityId = string.Empty, |
| | 0 | 407 | | ActivityType = string.Empty, |
| | 0 | 408 | | ActivityNodeId = string.Empty, |
| | 0 | 409 | | Timestamp = orphanPayload.InterruptedAt, |
| | 0 | 410 | | EventName = WorkflowInterruptedPayload.WorkflowInterruptedEventName, |
| | 0 | 411 | | Source = "Elsa.Workflows.Runtime.GracefulShutdown", |
| | 0 | 412 | | Message = $"Workflow execution cycle force-cancelled by the runtime ({orphanPayload.Reason}); no ins |
| | 0 | 413 | | Payload = orphanPayload, |
| | 0 | 414 | | }, cancellationToken); |
| | 0 | 415 | | } |
| | 0 | 416 | | catch (Exception ex) when (!ex.IsFatal()) |
| | | 417 | | { |
| | 0 | 418 | | _logger.LogWarning(ex, "Failed to write WorkflowInterrupted log entry for orphan execution cycle {Execut |
| | 0 | 419 | | } |
| | 0 | 420 | | return; |
| | | 421 | | } |
| | | 422 | | |
| | 3 | 423 | | var actualReason = reason; |
| | | 424 | | |
| | | 425 | | try |
| | | 426 | | { |
| | 3 | 427 | | instance.SubStatus = WorkflowSubStatus.Interrupted; |
| | 3 | 428 | | instance.IsExecuting = false; |
| | 3 | 429 | | await instanceStore.SaveAsync(instance, cancellationToken); |
| | 2 | 430 | | } |
| | 1 | 431 | | catch (Exception ex) when (!ex.IsFatal()) |
| | | 432 | | { |
| | 1 | 433 | | _logger.LogWarning(ex, "Failed to persist Interrupted sub-status for instance {InstanceId}; falling back to |
| | 1 | 434 | | actualReason = WorkflowInterruptedPayload.ReasonPersistenceFailure; |
| | 1 | 435 | | } |
| | | 436 | | |
| | 3 | 437 | | var payload = new WorkflowInterruptedPayload( |
| | 3 | 438 | | InterruptedAt: _clock.UtcNow, |
| | 3 | 439 | | Reason: actualReason, |
| | 3 | 440 | | GenerationId: generationId, |
| | 3 | 441 | | LastActivityId: null, |
| | 3 | 442 | | LastActivityNodeId: null, |
| | 3 | 443 | | IngressSourceName: handle.IngressSourceName, |
| | 3 | 444 | | ExecutionCycleDuration: _clock.UtcNow - handle.StartedAt); |
| | | 445 | | |
| | | 446 | | try |
| | | 447 | | { |
| | 3 | 448 | | await logStore.LogWorkflowInterruptedAsync(_identityGenerator, instance, payload, cancellationToken); |
| | 3 | 449 | | } |
| | 0 | 450 | | catch (Exception ex) when (!ex.IsFatal()) |
| | | 451 | | { |
| | 0 | 452 | | _logger.LogWarning(ex, "Failed to write WorkflowInterrupted log entry for instance {InstanceId}.", instance. |
| | 0 | 453 | | } |
| | 3 | 454 | | } |
| | | 455 | | |
| | | 456 | | private IReadOnlyList<IngressSourceFinalState> BuildSourceFinalStates() |
| | | 457 | | { |
| | 21 | 458 | | return _registry.Snapshot() |
| | 19 | 459 | | .Select(s => new IngressSourceFinalState(s.Name, s.State, s.LastError, WasForceStopped: s.State == IngressSo |
| | 21 | 460 | | .ToArray(); |
| | | 461 | | } |
| | | 462 | | } |