< Summary

Information
Class: Elsa.Workflows.Runtime.Services.IngressSourceRegistry
Assembly: Elsa.Workflows.Runtime
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/IngressSourceRegistry.cs
Line coverage
95%
Covered lines: 43
Uncovered lines: 2
Coverable lines: 45
Total lines: 115
Line coverage: 95.5%
Branch coverage
81%
Covered branches: 13
Total branches: 16
Branch coverage: 81.2%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
get_Sources()100%11100%
EnsureMaterialized()87.5%88100%
Snapshot()100%11100%
MarkPauseFailedAsync(...)100%44100%
RecordTransition(...)100%22100%
GetState(...)0%620%
.ctor(...)100%11100%
get_Source()100%11100%
get_State()100%11100%
get_LastError()100%11100%
get_LastTransitionAt()100%11100%

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/IngressSourceRegistry.cs

#LineLine coverage
 1using System.Collections.Concurrent;
 2using Elsa.Common;
 3
 4namespace Elsa.Workflows.Runtime.Services;
 5
 6/// <summary>
 7/// Default implementation of <see cref="IIngressSourceRegistry"/>. Backed by a <see cref="ConcurrentDictionary{TKey,TVa
 8/// keyed by source name, with atomic state transitions.
 9/// </summary>
 10public sealed class IngressSourceRegistry : IIngressSourceRegistry
 11{
 16412    private readonly ConcurrentDictionary<string, Entry> _entries = new(StringComparer.Ordinal);
 13    private readonly Lazy<IEnumerable<IIngressSource>> _sourcesFactory;
 14    private readonly ISystemClock _clock;
 16415    private readonly object _materializeSync = new();
 16    private volatile bool _materialized;
 17
 18    /// <summary>
 19    /// Creates the registry. The <paramref name="sourcesFactory"/> is materialized lazily so adapter
 20    /// implementations can take a direct <see cref="IQuiescenceSignal"/> dependency without creating a DI cycle
 21    /// (the signal depends on <see cref="IExecutionCycleRegistry"/>, which depends on this registry, which depends on
 22    /// <c>IEnumerable&lt;IIngressSource&gt;</c>). The first call that requires <see cref="Sources"/> materializes
 23    /// the enumerable; subsequent calls reuse the snapshot.
 24    /// </summary>
 16425    public IngressSourceRegistry(Lazy<IEnumerable<IIngressSource>> sourcesFactory, ISystemClock clock)
 26    {
 16427        _sourcesFactory = sourcesFactory;
 16428        _clock = clock;
 16429    }
 30
 31    /// <inheritdoc />
 32    public IReadOnlyCollection<IIngressSource> Sources
 33    {
 34        get
 35        {
 3136            EnsureMaterialized();
 10537            return _entries.Values.Select(e => e.Source).ToArray();
 38        }
 39    }
 40
 41    private void EnsureMaterialized()
 42    {
 43        // Double-checked lock: a `_entries.Count > 0` guard is not atomic with the population loop, so concurrent
 44        // first callers could both observe an empty dictionary, both iterate _sourcesFactory.Value, and both call
 45        // TryAdd — losing the duplicate-name guarantee and crashing the second caller's execution cycle start with
 46        // "Duplicate ingress source registration". A short lock around the one-time population is sufficient;
 47        // steady-state reads short-circuit on the volatile flag without taking the lock.
 35248        if (_materialized) return;
 1849        lock (_materializeSync)
 50        {
 1851            if (_materialized) return;
 10152            foreach (var source in _sourcesFactory.Value)
 53            {
 3354                if (!_entries.TryAdd(source.Name, new Entry(source)))
 155                    throw new InvalidOperationException($"Duplicate ingress source registration for name '{source.Name}'
 56            }
 1757            _materialized = true;
 1758        }
 1759    }
 60
 61    /// <inheritdoc />
 62    public IReadOnlyCollection<IngressSourceSnapshot> Snapshot()
 63    {
 1464        EnsureMaterialized();
 1465        return _entries.Values
 2466            .Select(e => new IngressSourceSnapshot(e.Source.Name, e.State, e.LastError, e.LastTransitionAt))
 1467            .ToArray();
 68    }
 69
 70    /// <inheritdoc />
 71    public ValueTask MarkPauseFailedAsync(string name, string reason, Exception? error = null)
 72    {
 10173        EnsureMaterialized();
 10174        if (_entries.TryGetValue(name, out var entry))
 75        {
 10176            lock (entry.Sync)
 77            {
 10178                entry.State = IngressSourceState.PauseFailed;
 10179                entry.LastError = error ?? new InvalidOperationException(reason);
 10180                entry.LastTransitionAt = _clock.UtcNow;
 10181            }
 82        }
 10183        return ValueTask.CompletedTask;
 84    }
 85
 86    /// <inheritdoc />
 87    public void RecordTransition(string name, IngressSourceState newState, Exception? error = null)
 88    {
 3989        EnsureMaterialized();
 3990        if (_entries.TryGetValue(name, out var entry))
 91        {
 3992            lock (entry.Sync)
 93            {
 3994                entry.State = newState;
 3995                entry.LastError = error;
 3996                entry.LastTransitionAt = _clock.UtcNow;
 3997            }
 98        }
 3999    }
 100
 101    internal IngressSourceState GetState(string name)
 102    {
 0103        EnsureMaterialized();
 0104        return _entries.TryGetValue(name, out var e) ? e.State : IngressSourceState.Running;
 105    }
 106
 33107    private sealed class Entry(IIngressSource source)
 108    {
 33109        public readonly object Sync = new();
 132110        public IIngressSource Source { get; } = source;
 164111        public IngressSourceState State { get; set; } = IngressSourceState.Running;
 164112        public Exception? LastError { get; set; }
 164113        public DateTimeOffset? LastTransitionAt { get; set; }
 114    }
 115}