< Summary

Information
Class: Elsa.Workflows.Runtime.Services.QuiescenceSignal
Assembly: Elsa.Workflows.Runtime
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/QuiescenceSignal.cs
Line coverage
97%
Covered lines: 114
Uncovered lines: 3
Coverable lines: 117
Total lines: 273
Line coverage: 97.4%
Branch coverage
88%
Covered branches: 37
Total branches: 42
Branch coverage: 88%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%44100%
.ctor(...)100%11100%
.ctor(...)100%11100%
Create(...)100%11100%
get_CurrentState()100%11100%
get_IsAcceptingNewWork()100%11100%
get_ActiveExecutionCycleCount()100%11100%
InitializePersistedStateAsync()83.33%66100%
BeginDrainAsync(...)100%22100%
PauseAsync()100%44100%
ResumeAsync()100%66100%
PersistAsync()100%66100%
UseKeyValueStoreAsync()50%8662.5%
UseKeyValueStoreAsync()100%66100%

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/QuiescenceSignal.cs

#LineLine coverage
 1using Elsa.Common;
 2using Elsa.KeyValues.Contracts;
 3using Elsa.KeyValues.Entities;
 4using Elsa.KeyValues.Models;
 5using Elsa.Workflows.Runtime.Options;
 6using Microsoft.Extensions.DependencyInjection;
 7using Microsoft.Extensions.Options;
 8
 9namespace Elsa.Workflows.Runtime.Services;
 10
 11/// <summary>
 12/// Default thread-safe implementation of <see cref="IQuiescenceSignal"/>. Uses a single lock for transitions
 13/// and a volatile reference read for lock-free state queries. See FR-001..FR-005 and research R8 for
 14/// pause-persistence semantics.
 15/// </summary>
 16public sealed class QuiescenceSignal : IQuiescenceSignal
 17{
 18    private const string PersistenceKeyPrefix = "elsa.quiescence.pause.";
 19
 9820    private readonly object _sync = new();
 21    // Serializes persistence I/O so racing Pause/Resume can't reorder writes in the store. Held only across
 22    // the I/O — the in-memory transition still uses the fast _sync lock, and pause/resume aren't hot paths.
 9823    private readonly SemaphoreSlim _persistenceMutex = new(1, 1);
 24    private readonly IOptions<GracefulShutdownOptions> _options;
 25    private readonly ISystemClock _clock;
 26    private readonly IKeyValueStore? _keyValueStore;
 27    private readonly IServiceScopeFactory? _serviceScopeFactory;
 28    private readonly IExecutionCycleRegistry _cycleRegistry;
 29    private readonly string _persistenceKey;
 30
 31    private QuiescenceState _state;
 32
 33    /// <summary>
 34    /// Creates the signal. The generation id defaults to a new GUID per construction — when the container is torn
 35    /// down and rebuilt (shell reactivation or host restart), a fresh id is minted, which is what scopes recovery
 36    /// in <c>RecoverInterruptedWorkflowsStartupTask</c>.
 37    /// </summary>
 38    [ActivatorUtilitiesConstructor]
 39    public QuiescenceSignal(
 40        IOptions<GracefulShutdownOptions> options,
 41        ISystemClock clock,
 42        IExecutionCycleRegistry cycleRegistry,
 43        IServiceScopeFactory serviceScopeFactory,
 44        string? shellName = null,
 8145        string? generationId = null) : this(options, clock, cycleRegistry, keyValueStore: null, serviceScopeFactory, she
 46    {
 8147    }
 48
 49    public QuiescenceSignal(
 50        IOptions<GracefulShutdownOptions> options,
 51        ISystemClock clock,
 52        IExecutionCycleRegistry cycleRegistry,
 53        string? shellName = null,
 954        string? generationId = null) : this(options, clock, cycleRegistry, keyValueStore: null, serviceScopeFactory: nul
 55    {
 956    }
 57
 58    /// <summary>
 59    /// Creates the signal with a fixed key-value store. Intended for tests and non-container usage.
 60    /// </summary>
 61    public static QuiescenceSignal Create(
 62        IOptions<GracefulShutdownOptions> options,
 63        ISystemClock clock,
 64        IExecutionCycleRegistry cycleRegistry,
 65        IKeyValueStore? keyValueStore = null,
 66        string? shellName = null,
 867        string? generationId = null) => new(options, clock, cycleRegistry, keyValueStore, serviceScopeFactory: null, she
 68
 9869    private QuiescenceSignal(
 9870        IOptions<GracefulShutdownOptions> options,
 9871        ISystemClock clock,
 9872        IExecutionCycleRegistry cycleRegistry,
 9873        IKeyValueStore? keyValueStore,
 9874        IServiceScopeFactory? serviceScopeFactory,
 9875        string? shellName,
 9876        string? generationId)
 77    {
 9878        _options = options;
 9879        _clock = clock;
 9880        _cycleRegistry = cycleRegistry;
 9881        _keyValueStore = keyValueStore;
 9882        _serviceScopeFactory = serviceScopeFactory;
 9883        _persistenceKey = PersistenceKeyPrefix + (shellName ?? "default");
 9884        _state = QuiescenceState.Initial(generationId ?? Guid.NewGuid().ToString("N"));
 9885    }
 86
 87    /// <inheritdoc />
 88    public QuiescenceState CurrentState
 89    {
 90        get
 91        {
 92            // Volatile read — the reference is always overwritten atomically under the lock.
 28593            return Volatile.Read(ref _state);
 94        }
 95    }
 96
 97    /// <inheritdoc />
 27498    public bool IsAcceptingNewWork => CurrentState.IsAcceptingNewWork;
 99
 100    /// <inheritdoc />
 1101    public int ActiveExecutionCycleCount => _cycleRegistry.ActiveCount;
 102
 103    /// <summary>
 104    /// Loads any persisted administrative pause state. Called once per runtime generation by a startup task when
 105    /// <see cref="GracefulShutdownOptions.PausePersistence"/> is <see cref="PausePersistencePolicy.AcrossReactivations"
 106    /// No-op otherwise, or when the key-value store is not registered.
 107    /// </summary>
 108    public async ValueTask InitializePersistedStateAsync(CancellationToken cancellationToken)
 109    {
 162110        if (_options.Value.PausePersistence != PausePersistencePolicy.AcrossReactivations) return;
 111
 3112        var pair = await UseKeyValueStoreAsync(store => store.FindAsync(new KeyValueFilter { Key = _persistenceKey }, ca
 3113        if (pair is null) return;
 114
 1115        lock (_sync)
 116        {
 1117            if ((_state.Reason & QuiescenceReason.AdministrativePause) != 0) return; // someone already paused us
 1118            var next = _state with
 1119            {
 1120                Reason = _state.Reason | QuiescenceReason.AdministrativePause,
 1121                PausedAt = _clock.UtcNow,
 1122                PauseReasonText = pair.SerializedValue,
 1123                PauseRequestedBy = "persisted",
 1124            };
 1125            Volatile.Write(ref _state, next);
 1126        }
 82127    }
 128
 129    /// <inheritdoc />
 130    public ValueTask<QuiescenceState> BeginDrainAsync(CancellationToken cancellationToken = default)
 131    {
 132        QuiescenceState next;
 15133        lock (_sync)
 134        {
 15135            if ((_state.Reason & QuiescenceReason.Drain) != 0)
 136            {
 1137                return new ValueTask<QuiescenceState>(_state);
 138            }
 139
 14140            next = _state with
 14141            {
 14142                Reason = _state.Reason | QuiescenceReason.Drain,
 14143                DrainStartedAt = _clock.UtcNow,
 14144            };
 14145            Volatile.Write(ref _state, next);
 14146        }
 147
 14148        return new ValueTask<QuiescenceState>(next);
 1149    }
 150
 151    /// <inheritdoc />
 152    public async ValueTask<QuiescenceState> PauseAsync(string? reasonText, string? requestedBy, CancellationToken cancel
 153    {
 154        QuiescenceState next;
 18155        bool transitioned = false;
 18156        lock (_sync)
 157        {
 18158            if ((_state.Reason & QuiescenceReason.AdministrativePause) != 0)
 159            {
 2160                next = _state;
 161            }
 162            else
 163            {
 16164                next = _state with
 16165                {
 16166                    Reason = _state.Reason | QuiescenceReason.AdministrativePause,
 16167                    PausedAt = _clock.UtcNow,
 16168                    PauseReasonText = reasonText,
 16169                    PauseRequestedBy = requestedBy,
 16170                };
 16171                Volatile.Write(ref _state, next);
 16172                transitioned = true;
 173            }
 18174        }
 175
 18176        if (transitioned)
 16177            await PersistAsync();
 178
 18179        return next;
 18180    }
 181
 182    /// <inheritdoc />
 183    public async ValueTask<QuiescenceState> ResumeAsync(string? requestedBy, CancellationToken cancellationToken)
 184    {
 185        QuiescenceState next;
 9186        bool transitioned = false;
 9187        lock (_sync)
 188        {
 189            // Resume is a no-op while drain is active — the runtime cannot return to normal operation within the same g
 11190            if ((_state.Reason & QuiescenceReason.Drain) != 0) { return _state; }
 9191            if ((_state.Reason & QuiescenceReason.AdministrativePause) == 0) { return _state; }
 192
 5193            next = _state with
 5194            {
 5195                Reason = _state.Reason & ~QuiescenceReason.AdministrativePause,
 5196                PausedAt = null,
 5197                PauseReasonText = null,
 5198                PauseRequestedBy = requestedBy,
 5199            };
 5200            Volatile.Write(ref _state, next);
 5201            transitioned = true;
 5202        }
 203
 5204        if (transitioned)
 5205            await PersistAsync();
 206
 5207        return next;
 9208    }
 209
 210    /// <summary>
 211    /// Persists the current administrative-pause state. Serialized via <see cref="_persistenceMutex"/> so racing
 212    /// Pause/Resume can't reorder writes in the store. The live state is re-read inside the semaphore — each I/O
 213    /// writes whatever the latest in-memory transition was, so N racing transitions produce N serialized writes
 214    /// and the final persisted state always matches the final in-memory state.
 215    /// </summary>
 216    /// <remarks>
 217    /// Uses <see cref="CancellationToken.None"/> deliberately for both the semaphore wait and the store I/O.
 218    /// By the time this runs the in-memory transition has already committed; if a cancelled HTTP request token
 219    /// caused the persistence to skip, in-memory state would diverge from the store — and the idempotent
 220    /// fast-path in <see cref="PauseAsync"/>/<see cref="ResumeAsync"/> (transitioned == false) means a later call
 221    /// would not retry the write. So persistence must complete regardless of caller cancellation.
 222    /// </remarks>
 223    private async ValueTask PersistAsync()
 224    {
 21225        if (_options.Value.PausePersistence != PausePersistencePolicy.AcrossReactivations)
 10226            return;
 227
 11228        await _persistenceMutex.WaitAsync(CancellationToken.None);
 229        try
 230        {
 11231            var live = Volatile.Read(ref _state);
 11232            if ((live.Reason & QuiescenceReason.AdministrativePause) != 0)
 15233                await UseKeyValueStoreAsync(store => store.SaveAsync(new SerializedKeyValuePair { Key = _persistenceKey,
 234            else
 5235                await UseKeyValueStoreAsync(store => store.DeleteAsync(_persistenceKey, CancellationToken.None));
 11236        }
 237        finally
 238        {
 11239            _persistenceMutex.Release();
 240        }
 21241    }
 242
 243    private async ValueTask<TResult> UseKeyValueStoreAsync<TResult>(Func<IKeyValueStore, Task<TResult>> action, TResult 
 244    {
 2245        if (_keyValueStore is not null)
 1246            return await action(_keyValueStore);
 247
 1248        if (_serviceScopeFactory is null)
 1249            return defaultValue;
 250
 0251        using var scope = _serviceScopeFactory.CreateScope();
 0252        var store = scope.ServiceProvider.GetService<IKeyValueStore>();
 253
 0254        return store is null ? defaultValue : await action(store);
 2255    }
 256
 257    private async ValueTask UseKeyValueStoreAsync(Func<IKeyValueStore, Task> action)
 258    {
 11259        if (_keyValueStore is not null)
 260        {
 8261            await action(_keyValueStore);
 8262            return;
 263        }
 264
 3265        if (_serviceScopeFactory is null)
 2266            return;
 267
 1268        using var scope = _serviceScopeFactory.CreateScope();
 1269        var store = scope.ServiceProvider.GetService<IKeyValueStore>();
 1270        if (store is not null)
 1271            await action(store);
 11272    }
 273}