Summary

Information
Class: Elsa.Workflows.Runtime.Services.DrainOrchestrator
Assembly: Elsa.Workflows.Runtime
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/DrainOrchestrator.cs
Line coverage
Covered lines: 193
Uncovered lines: 47
Coverable lines: 240
Total lines: 462
Line coverage: 80.4%
Branch coverage
Covered branches: 48
Total branches: 58
Branch coverage: 82.7%

Metrics

Method                           Branch coverage   Crap Score   Cyclomatic complexity   Line coverage
.cctor()                         100%              1            1                       100%
.ctor(...)                       100%              1            1                       100%
DrainAsync()                     65%               20           20                      93.75%
ComputeEffectiveDeadline(...)    83.33%            6            6                       100%
PauseAllSourcesAsync()           100%              2            2                       100%
PauseOneSourceAsync()            62.5%             8            8                       100%
TryForceStopAsync()              75%               5            4                       64.28%
WaitForCyclesAsync()             100%              4            4                       83.33%
ForceCancelActiveCyclesAsync()   80%               11           10                      82.85%
<ForceCancelActiveCyclesAsync()  100%              1            1                       71.42%
PersistInterruptedAsync()        50%               3            2                       40%
BuildSourceFinalStates()         50%               2            2                       100%
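
The Crap Score column is consistent with the standard CRAP formula, CRAP(m) = comp(m)^2 * (1 - cov(m))^3 + comp(m), where comp is cyclomatic complexity and cov is line coverage as a fraction (that this report's tooling uses exactly this formula is an assumption, but the rows check out). A minimal sketch reproducing two rows from the table:

    // Hedged sketch: recompute the Crap Score column from the table's own numbers.
    static double Crap(int complexity, double lineCoverage) =>
        complexity * complexity * Math.Pow(1 - lineCoverage, 3) + complexity;

    // Crap(4, 0.6428)  ≈ 4.73  -> 5   (TryForceStopAsync)
    // Crap(10, 0.8285) ≈ 10.50 -> 11  (ForceCancelActiveCyclesAsync)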

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/DrainOrchestrator.cs

using System.Diagnostics;
using Elsa.Common;
using Elsa.Workflows.Management;
using Elsa.Workflows.Management.Filters;
using Elsa.Workflows.Runtime.Options;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace Elsa.Workflows.Runtime.Services;

/// <summary>
/// Default implementation of <see cref="IDrainOrchestrator"/>.
/// </summary>
/// <remarks>
/// <para>
/// Protocol per <c>contracts/drain-orchestrator.md</c>:
/// 1. Enter drain via <see cref="IQuiescenceSignal.BeginDrainAsync"/>.
/// 2. Pause every registered ingress source in parallel, each bounded by its own per-source timeout. Sources that
///    fail to pause are recorded as <see cref="IngressSourceState.PauseFailed"/> and, if they implement
///    <see cref="IForceStoppable"/>, are escalated.
/// 3. Wait for <see cref="IExecutionCycleRegistry.ActiveCount"/> to reach zero, polling on a short interval.
/// 4. On deadline breach (or on operator force, where the deadline is zero), iterate live cycles, cancel each,
///    persist the corresponding instance in <see cref="WorkflowSubStatus.Interrupted"/>, and write a
///    <c>WorkflowInterrupted</c> entry in the per-instance execution log.
/// 5. Return a <see cref="DrainOutcome"/>.
/// </para>
/// <para>
/// All exceptions inside the protocol are captured into the outcome — only an <see cref="InvalidOperationException"/>
/// thrown by a second non-force invocation propagates.
/// </para>
/// </remarks>
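/// <example>
/// A minimal consumer sketch added for illustration (not present in the measured source; the names
/// <c>orchestrator</c>, <c>logger</c>, <c>stoppingToken</c> and the <c>HostShutdown</c> trigger value are assumptions):
/// <code>
/// var outcome = await orchestrator.DrainAsync(DrainTrigger.HostShutdown, stoppingToken);
/// if (outcome.OverallResult != DrainResult.CompletedWithinDeadline)
///     logger.LogWarning("Drain ended as {Result}; {Count} execution cycle(s) force-cancelled.",
///         outcome.OverallResult, outcome.ExecutionCyclesForceCancelledCount);
/// </code>
/// </example>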
public sealed class DrainOrchestrator : IDrainOrchestrator
{
    private static readonly TimeSpan SafetyEpsilon = TimeSpan.FromMilliseconds(500);
    private static readonly TimeSpan PollInterval = TimeSpan.FromMilliseconds(10);

    private readonly IQuiescenceSignal _signal;
    private readonly IIngressSourceRegistry _registry;
    private readonly IExecutionCycleRegistry _cycles;
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly IOptions<GracefulShutdownOptions> _options;
    private readonly IOptions<HostOptions> _hostOptions;
    private readonly ISystemClock _clock;
    private readonly IIdentityGenerator _identityGenerator;
    private readonly ILogger<DrainOrchestrator> _logger;

    private readonly object _sync = new();
    private DrainOutcome? _previousOutcome;
    private bool _drainInProgress;

    public DrainOrchestrator(
        IQuiescenceSignal signal,
        IIngressSourceRegistry registry,
        IExecutionCycleRegistry cycles,
        IServiceScopeFactory scopeFactory,
        IOptions<GracefulShutdownOptions> options,
        IOptions<HostOptions> hostOptions,
        ISystemClock clock,
        IIdentityGenerator identityGenerator,
        ILogger<DrainOrchestrator> logger)
    {
        _signal = signal;
        _registry = registry;
        _cycles = cycles;
        _scopeFactory = scopeFactory;
        _options = options;
        _hostOptions = hostOptions;
        _clock = clock;
        _identityGenerator = identityGenerator;
        _logger = logger;
    }

    /// <inheritdoc />
    public async ValueTask<DrainOutcome> DrainAsync(DrainTrigger trigger, CancellationToken cancellationToken = default)
    {
        lock (_sync)
        {
            if (_drainInProgress)
            {
                if (trigger == DrainTrigger.OperatorForce && _previousOutcome is not null)
                    return _previousOutcome with { WasCached = true };
                throw new InvalidOperationException($"Drain already in progress; second invocation rejected (trigger={trigger}).");
            }
            if (_previousOutcome is not null)
            {
                if (trigger == DrainTrigger.OperatorForce) return _previousOutcome with { WasCached = true };
                throw new InvalidOperationException("Drain already completed in this generation; subsequent non-force invocations are rejected.");
            }
            _drainInProgress = true;
        }

        var startedAt = _clock.UtcNow;
        var deadline = ComputeEffectiveDeadline(trigger);
        var sw = Stopwatch.StartNew();
        TimeSpan pausePhase = TimeSpan.Zero;
        TimeSpan waitPhase = TimeSpan.Zero;

        try
        {
            await _signal.BeginDrainAsync(cancellationToken);
            _logger.LogInformation("Drain initiated (trigger={Trigger}, deadline={Deadline}).", trigger, deadline);

            var deadlineAt = startedAt + deadline;

            // Phase 1: parallel pause. Each source has its own timeout independent of the overall deadline,
            // but force-stop fallbacks are clamped to the *remaining* overall budget (deadlineAt - now), not
            // the full TimeSpan, so a slow per-source pause can't extend the total shutdown window.
            await PauseAllSourcesAsync(deadlineAt, cancellationToken);
            pausePhase = sw.Elapsed;
            _logger.LogInformation("Ingress pause phase complete in {Elapsed}.", pausePhase);

            // Phase 2: wait for active execution cycles to drain.
            var waitStart = sw.Elapsed;
            var deadlineBreach = !await WaitForCyclesAsync(deadlineAt, cancellationToken);
            waitPhase = sw.Elapsed - waitStart;

            // Phase 3: outcome assembly.
            int forceCancelled = 0;
            IReadOnlyList<string> forceCancelledIds = Array.Empty<string>();

            if (deadlineBreach || trigger == DrainTrigger.OperatorForce)
            {
                _logger.LogWarning("Drain {What}; force-cancelling {Count} active execution cycle(s).",
                    deadlineBreach ? "deadline exceeded" : "operator-forced", _cycles.ActiveCount);
                (forceCancelled, forceCancelledIds) = await ForceCancelActiveCyclesAsync(trigger, _signal.CurrentState.GenerationId, cancellationToken);
            }

            // OperatorForce always reports Forced regardless of zero-deadline breach mechanics.
            var result = trigger switch
            {
                DrainTrigger.OperatorForce => DrainResult.Forced,
                _ when deadlineBreach => DrainResult.DeadlineExceeded,
                _ => DrainResult.CompletedWithinDeadline,
            };

            var outcome = new DrainOutcome(
                OverallResult: result,
                StartedAt: startedAt,
                CompletedAt: _clock.UtcNow,
                PausePhaseDuration: pausePhase,
                WaitPhaseDuration: waitPhase,
                Sources: BuildSourceFinalStates(),
                ExecutionCyclesForceCancelledCount: forceCancelled,
                ForceCancelledInstanceIds: forceCancelledIds);

            lock (_sync) _previousOutcome = outcome;
            _logger.LogInformation("Drain completed: {Result} (paused={Paused}, waited={Waited}, forceCancelled={ForceCancelled}).",
                outcome.OverallResult, outcome.PausePhaseDuration, outcome.WaitPhaseDuration, outcome.ExecutionCyclesForceCancelledCount);
            return outcome;
        }
        catch (Exception ex) when (!ex.IsFatal())
        {
            // The "drain already in progress / completed" InvalidOperationExceptions are thrown OUTSIDE this
            // try block (during the lock-protected setup), so they bubble out without triggering this handler.
            // Any IOE that lands here is incidental — from a store/dispatcher inside the protocol — and should
            // be captured into the outcome rather than escaping the whole drain.
            _logger.LogError(ex, "Drain aborted by unhandled exception.");
            var outcome = new DrainOutcome(
                OverallResult: DrainResult.AbortedByUnhandledException,
                StartedAt: startedAt,
                CompletedAt: _clock.UtcNow,
                PausePhaseDuration: pausePhase,
                WaitPhaseDuration: waitPhase,
                Sources: BuildSourceFinalStates(),
                ExecutionCyclesForceCancelledCount: 0,
                ForceCancelledInstanceIds: Array.Empty<string>());
            lock (_sync) _previousOutcome = outcome;
            return outcome;
        }
        finally
        {
            lock (_sync) _drainInProgress = false;
        }
    }

    private TimeSpan ComputeEffectiveDeadline(DrainTrigger trigger)
    {
        if (trigger == DrainTrigger.OperatorForce) return TimeSpan.Zero;

        var configured = _options.Value.DrainDeadline;
        var hostShutdown = _hostOptions.Value.ShutdownTimeout;

        // Clamp to the host's own shutdown budget minus a safety epsilon, so persistence can finish before the process is killed.
        if (hostShutdown > SafetyEpsilon)
        {
            var hostBudget = hostShutdown - SafetyEpsilon;
            if (hostBudget < configured) return hostBudget;
        }

        return configured;
    }

    private async Task PauseAllSourcesAsync(DateTimeOffset deadlineAt, CancellationToken cancellationToken)
    {
        var sources = _registry.Sources;
        if (sources.Count == 0) return;

        var pauseTasks = sources.Select(source => PauseOneSourceAsync(source, deadlineAt, cancellationToken)).ToArray();
        await Task.WhenAll(pauseTasks);
    }

    private async Task PauseOneSourceAsync(IIngressSource source, DateTimeOffset deadlineAt, CancellationToken cancellationToken)
    {
        // Precedence: a positive per-source PauseTimeout wins; Zero (or negative) defers to the
        // configured GracefulShutdownOptions.IngressPauseTimeout. The resolved value is then
        // clamped to the overall remaining budget, with a 1 ms floor so a misconfigured zero
        // configured-default still produces a non-zero CancelAfter.
        var configured = _options.Value.IngressPauseTimeout;
        var sourceDeadline = source.PauseTimeout > TimeSpan.Zero ? source.PauseTimeout : configured;
        var overallRemaining = deadlineAt - _clock.UtcNow;
        if (sourceDeadline > overallRemaining) sourceDeadline = overallRemaining;
        if (sourceDeadline <= TimeSpan.Zero) sourceDeadline = TimeSpan.FromMilliseconds(1);

        _registry.RecordTransition(source.Name, IngressSourceState.Pausing);
        using var perSourceCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        perSourceCts.CancelAfter(sourceDeadline);

        try
        {
            await source.PauseAsync(perSourceCts.Token).AsTask().WaitAsync(perSourceCts.Token);
            _registry.RecordTransition(source.Name, IngressSourceState.Paused);
        }
        catch (OperationCanceledException) when (perSourceCts.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
        {
            await _registry.MarkPauseFailedAsync(source.Name, "timeout", new TimeoutException($"Source '{source.Name}' did not pause within {sourceDeadline}."));
            await TryForceStopAsync(source, deadlineAt, cancellationToken);
        }
        catch (Exception ex) when (!ex.IsFatal())
        {
            await _registry.MarkPauseFailedAsync(source.Name, "exception", ex);
            await TryForceStopAsync(source, deadlineAt, cancellationToken);
        }
    }
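
    // Illustration only (editor's sketch, not part of the measured source): the timeout precedence from
    // PauseOneSourceAsync as a pure function. Examples: (perSource=5s, configured=10s, remaining=30s) -> 5s;
    // (perSource=Zero, configured=10s, remaining=3s) -> 3s; (perSource=Zero, configured=Zero, remaining=30s) -> 1ms.
    private static TimeSpan ResolvePauseTimeoutSketch(TimeSpan perSource, TimeSpan configured, TimeSpan overallRemaining)
    {
        var resolved = perSource > TimeSpan.Zero ? perSource : configured;           // positive per-source wins
        if (resolved > overallRemaining) resolved = overallRemaining;                // clamp to overall budget
        return resolved <= TimeSpan.Zero ? TimeSpan.FromMilliseconds(1) : resolved;  // 1 ms floor
    }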

    private async Task TryForceStopAsync(IIngressSource source, DateTimeOffset deadlineAt, CancellationToken cancellationToken)
    {
        if (source is not IForceStoppable forceStoppable) return;

        // Use the *remaining* budget — not the full overall deadline — so a per-source pause that already
        // burned most of the window can't get another full deadline's worth of force-stop runway.
        var remaining = deadlineAt - _clock.UtcNow;
        if (remaining <= TimeSpan.Zero)
        {
            _logger.LogWarning("Skipping force-stop of ingress source '{Name}': overall drain deadline already exceeded.", source.Name);
            return;
        }

        try
        {
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            cts.CancelAfter(remaining);
            await forceStoppable.ForceStopAsync(cts.Token);
            _logger.LogInformation("Force-stopped ingress source '{Name}'.", source.Name);
        }
        catch (Exception ex) when (!ex.IsFatal())
        {
            _logger.LogWarning(ex, "Force-stop of ingress source '{Name}' failed; drain continues.", source.Name);
        }
    }

    private async Task<bool> WaitForCyclesAsync(DateTimeOffset deadlineAt, CancellationToken cancellationToken)
    {
        while (_cycles.ActiveCount > 0)
        {
            if (_clock.UtcNow >= deadlineAt) return false;
            try { await Task.Delay(PollInterval, cancellationToken); }
            catch (OperationCanceledException) { return _cycles.ActiveCount == 0; }
        }
        return true;
    }

    /// <summary>Bound on how long the orchestrator waits for a runner to settle after invoking execution cycle cancellation.</summary>
    private static readonly TimeSpan ForceCancelSettleTimeout = TimeSpan.FromSeconds(2);

    /// <summary>
    /// Per-handle bound on the forensic Interrupted persist write in Phase C. Decoupled from the drain
    /// cancellation token: when the host's drain CT has already fired, the persist still has up to this long
    /// to land the row + log entry. Keeps "proceed to persist anyway" honest while preventing a stuck DB
    /// from hanging shutdown indefinitely.
    /// </summary>
    private static readonly TimeSpan PersistInterruptedTimeout = TimeSpan.FromSeconds(5);

    private async Task<(int Count, IReadOnlyList<string> Ids)> ForceCancelActiveCyclesAsync(DrainTrigger trigger, string generationId, CancellationToken cancellationToken)
    {
        var cap = _options.Value.MaxForceCancelledInstanceIdsReported;
        var reportedIds = new List<string>(capacity: Math.Min(cap, 16));
        var totalCancelled = 0;

        var live = _cycles.ListActiveCycles();
        if (live.Count == 0) return (0, Array.Empty<string>());

        using var scope = _scopeFactory.CreateScope();
        var instanceStore = scope.ServiceProvider.GetRequiredService<IWorkflowInstanceStore>();
        var logStore = scope.ServiceProvider.GetRequiredService<IWorkflowExecutionLogStore>();

        var reason = trigger == DrainTrigger.OperatorForce
            ? WorkflowInterruptedPayload.ReasonOperatorForce
            : WorkflowInterruptedPayload.ReasonDeadlineBreach;

        // Force-cancel proceeds in three phases. The split exists because cancelling and
        // awaiting in the same loop made every execution cycle after the first run at full speed
        // through the prior execution cycle's settle window — total wall time was O(N × settle
        // timeout), defeating the intent of force-cancel under concurrency.
        //
        // Phase A — cancel every handle synchronously. CancellationTokenSource.Cancel is
        // cheap and we want every runner to observe cancellation simultaneously rather
        // than serialized behind preceding settle waits.
        foreach (var handle in live)
        {
            try
            {
                handle.Cancel();
                totalCancelled++;
                if (reportedIds.Count < cap) reportedIds.Add(handle.WorkflowInstanceId);
            }
            catch (Exception ex) when (!ex.IsFatal())
            {
                _logger.LogError(ex, "Failed to cancel execution cycle {ExecutionCycleId} (instance={InstanceId}).", handle.ExecutionCycleId, handle.WorkflowInstanceId);
            }
        }

        // Phase B — wait for every runner to settle in parallel under a shared deadline.
        // The handle disposes when ExecutionCycleTrackingMiddleware exits its `using` block, which
        // is after the workflow runner has finished its commit. We want runners' terminal
        // commits to land before we overwrite the sub-status with Interrupted — but we
        // bound the wait so a non-cancellable activity cannot block drain, accepting the
        // runner-clobber race for that one instance (the recovery scan picks it up).
        // Total wall time for this phase is at most ForceCancelSettleTimeout regardless
        // of N.
        var settleTasks = live.Select(async handle =>
        {
            try
            {
                await handle.Disposed.WaitAsync(ForceCancelSettleTimeout, cancellationToken).ConfigureAwait(false);
            }
            catch (TimeoutException)
            {
                _logger.LogWarning("Execution cycle {ExecutionCycleId} (instance={InstanceId}) did not settle within {Timeout}.", handle.ExecutionCycleId, handle.WorkflowInstanceId, ForceCancelSettleTimeout);
            }
            catch (OperationCanceledException) { /* drain CT fired — proceed to persist anyway */ }
        });
        await Task.WhenAll(settleTasks).ConfigureAwait(false);

        // Phase C — persist Interrupted for every handle. Sequential to keep DbContext
        // usage single-threaded; per-handle persistence is small.
        //
        // Each persist runs under its own bounded token that is NOT linked to the drain CT.
        // Phase B's catch on OperationCanceledException explicitly comments "drain CT fired —
        // proceed to persist anyway"; reusing the cancelled token here would have made that
        // a lie — the very first await inside PersistInterruptedAsync would observe the
        // cancelled token and throw. Result: every execution cycle would be left in an unrecovered
        // executing state on host shutdown. The bounded non-drain token preserves the
        // forensic write while preventing a stuck DB from hanging shutdown indefinitely.
        foreach (var handle in live)
        {
            try
            {
                using var persistCts = new CancellationTokenSource(PersistInterruptedTimeout);
                await PersistInterruptedAsync(instanceStore, logStore, handle, generationId, reason, persistCts.Token);
            }
            catch (Exception ex) when (!ex.IsFatal())
            {
                _logger.LogError(ex, "Failed to persist Interrupted for execution cycle {ExecutionCycleId} (instance={InstanceId}).", handle.ExecutionCycleId, handle.WorkflowInstanceId);
            }
        }

        return (totalCancelled, reportedIds);
    }
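
    // Editor's note (illustration only, not part of the measured source): the rejected single-loop
    // shape that the three-phase split above replaces. Cancelling, settling, and persisting per handle
    // serializes the settle windows, so N stuck runners cost up to N × ForceCancelSettleTimeout of wall
    // time instead of one shared window:
    //
    //     foreach (var handle in live)
    //     {
    //         handle.Cancel();
    //         await handle.Disposed.WaitAsync(ForceCancelSettleTimeout, cancellationToken); // serial settle
    //         await PersistInterruptedAsync(...);                                           // O(N × settle) total
    //     }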

    private async Task PersistInterruptedAsync(
        IWorkflowInstanceStore instanceStore,
        IWorkflowExecutionLogStore logStore,
        ExecutionCycleHandle handle,
        string generationId,
        string reason,
        CancellationToken cancellationToken)
    {
        var instance = await instanceStore.FindAsync(new WorkflowInstanceFilter { Id = handle.WorkflowInstanceId }, cancellationToken);

        if (instance is null)
        {
            // The instance row was never persisted (e.g., an execution cycle whose runner never reached commitStateHandler).
            // We cannot update a row that doesn't exist, but we MUST still write the forensic trail so postmortem
            // recovery can audit what happened. Write a synthetic WorkflowInterrupted log entry with the
            // PersistenceFailure discriminator — the workflow-definition fields are unknown so they remain empty.
            var orphanPayload = new WorkflowInterruptedPayload(
                InterruptedAt: _clock.UtcNow,
                Reason: WorkflowInterruptedPayload.ReasonPersistenceFailure,
                GenerationId: generationId,
                LastActivityId: null,
                LastActivityNodeId: null,
                IngressSourceName: handle.IngressSourceName,
                ExecutionCycleDuration: _clock.UtcNow - handle.StartedAt);
            try
            {
                await logStore.AddAsync(new Entities.WorkflowExecutionLogRecord
                {
                    Id = _identityGenerator.GenerateId(),
                    WorkflowInstanceId = handle.WorkflowInstanceId,
                    WorkflowDefinitionId = string.Empty,
                    WorkflowDefinitionVersionId = string.Empty,
                    WorkflowVersion = 0,
                    ActivityInstanceId = string.Empty,
                    ActivityId = string.Empty,
                    ActivityType = string.Empty,
                    ActivityNodeId = string.Empty,
                    Timestamp = orphanPayload.InterruptedAt,
                    EventName = WorkflowInterruptedPayload.WorkflowInterruptedEventName,
                    Source = "Elsa.Workflows.Runtime.GracefulShutdown",
                    Message = $"Workflow execution cycle force-cancelled by the runtime ({orphanPayload.Reason}); no instance row existed to update.",
                    Payload = orphanPayload,
                }, cancellationToken);
            }
            catch (Exception ex) when (!ex.IsFatal())
            {
                _logger.LogWarning(ex, "Failed to write WorkflowInterrupted log entry for orphan execution cycle {ExecutionCycleId}.", handle.ExecutionCycleId);
            }
            return;
        }

        var actualReason = reason;

        try
        {
            instance.SubStatus = WorkflowSubStatus.Interrupted;
            instance.IsExecuting = false;
            await instanceStore.SaveAsync(instance, cancellationToken);
        }
        catch (Exception ex) when (!ex.IsFatal())
        {
            _logger.LogWarning(ex, "Failed to persist Interrupted sub-status for instance {InstanceId}; falling back to the PersistenceFailure reason.", instance.Id);
            actualReason = WorkflowInterruptedPayload.ReasonPersistenceFailure;
        }

        var payload = new WorkflowInterruptedPayload(
            InterruptedAt: _clock.UtcNow,
            Reason: actualReason,
            GenerationId: generationId,
            LastActivityId: null,
            LastActivityNodeId: null,
            IngressSourceName: handle.IngressSourceName,
            ExecutionCycleDuration: _clock.UtcNow - handle.StartedAt);

        try
        {
            await logStore.LogWorkflowInterruptedAsync(_identityGenerator, instance, payload, cancellationToken);
        }
        catch (Exception ex) when (!ex.IsFatal())
        {
            _logger.LogWarning(ex, "Failed to write WorkflowInterrupted log entry for instance {InstanceId}.", instance.Id);
        }
    }

    private IReadOnlyList<IngressSourceFinalState> BuildSourceFinalStates()
    {
        return _registry.Snapshot()
            .Select(s => new IngressSourceFinalState(s.Name, s.State, s.LastError, WasForceStopped: s.State == IngressSourceState.ForceStopped))
            .ToArray();
    }
}