| | | 1 | | using System.Collections.Concurrent; |
| | | 2 | | using Elsa.Common; |
| | | 3 | | |
| | | 4 | | namespace Elsa.Workflows.Runtime.Services; |
| | | 5 | | |
| | | 6 | | /// <summary> |
| | | 7 | | /// Default implementation of <see cref="IIngressSourceRegistry"/>. Backed by a <see cref="ConcurrentDictionary{TKey,TValue}"/> |
| | | 8 | | /// keyed by source name, with atomic state transitions. |
| | | 9 | | /// </summary> |
| | | 10 | | public sealed class IngressSourceRegistry : IIngressSourceRegistry |
| | | 11 | | { |
// Per-source bookkeeping, keyed by source name using ordinal string comparison.
private readonly ConcurrentDictionary<string, Entry> _entries =
    new ConcurrentDictionary<string, Entry>(StringComparer.Ordinal);

// Deferred handle to the registered sources; its value is only read during one-time materialization.
private readonly Lazy<IEnumerable<IIngressSource>> _sourcesFactory;

// Clock used to timestamp state transitions.
private readonly ISystemClock _clock;

// Guards the one-time population of _entries.
private readonly object _materializeSync = new object();

// Set to true once _entries has been populated from _sourcesFactory.
private volatile bool _materialized;

/// <summary>
/// Initializes the registry without touching the ingress sources yet.
/// </summary>
/// <remarks>
/// The <paramref name="sourcesFactory"/> is deliberately lazy: adapter implementations may depend directly on
/// <see cref="IQuiescenceSignal"/>, which depends on <see cref="IExecutionCycleRegistry"/>, which depends on this
/// registry, which in turn depends on <c>IEnumerable&lt;IIngressSource&gt;</c>. Deferring enumeration breaks that
/// DI cycle. The sources are materialized by the first member that needs them (for example
/// <see cref="Sources"/>); later calls reuse the populated entries.
/// </remarks>
/// <param name="sourcesFactory">Lazily resolved collection of ingress sources to register.</param>
/// <param name="clock">Clock used to stamp the time of each recorded transition.</param>
public IngressSourceRegistry(Lazy<IEnumerable<IIngressSource>> sourcesFactory, ISystemClock clock)
{
    (_sourcesFactory, _clock) = (sourcesFactory, clock);
}
| | | 30 | | |
/// <inheritdoc />
public IReadOnlyCollection<IIngressSource> Sources
{
    get
    {
        EnsureMaterialized();

        // Copy the registered sources into a fresh array so callers never observe the live dictionary.
        var registered = _entries.Values;
        return registered.Select(entry => entry.Source).ToArray();
    }
}
| | | 40 | | |
| | | 41 | | private void EnsureMaterialized() |
| | | 42 | | { |
| | | 43 | | // Double-checked lock: a `_entries.Count > 0` guard is not atomic with the population loop, so concurrent |
| | | 44 | | // first callers could both observe an empty dictionary, both iterate _sourcesFactory.Value, and both call |
| | | 45 | | // TryAdd — losing the duplicate-name guarantee and crashing the second caller's execution cycle start with |
| | | 46 | | // "Duplicate ingress source registration". A short lock around the one-time population is sufficient; |
| | | 47 | | // steady-state reads short-circuit on the volatile flag without taking the lock. |
| | 352 | 48 | | if (_materialized) return; |
| | 18 | 49 | | lock (_materializeSync) |
| | | 50 | | { |
| | 18 | 51 | | if (_materialized) return; |
| | 101 | 52 | | foreach (var source in _sourcesFactory.Value) |
| | | 53 | | { |
| | 33 | 54 | | if (!_entries.TryAdd(source.Name, new Entry(source))) |
| | 1 | 55 | | throw new InvalidOperationException($"Duplicate ingress source registration for name '{source.Name}' |
| | | 56 | | } |
| | 17 | 57 | | _materialized = true; |
| | 17 | 58 | | } |
| | 17 | 59 | | } |
| | | 60 | | |
/// <inheritdoc />
/// <remarks>
/// Each entry's mutable fields are read under that entry's lock, the same lock the transition writers hold,
/// so every returned snapshot reflects one complete transition rather than a mix of two.
/// </remarks>
public IReadOnlyCollection<IngressSourceSnapshot> Snapshot()
{
    EnsureMaterialized();

    var entries = _entries.Values;
    var snapshots = new IngressSourceSnapshot[entries.Count];
    var index = 0;

    foreach (var entry in entries)
    {
        IngressSourceState state;
        Exception? lastError;
        DateTimeOffset? lastTransitionAt;

        // State, LastError and LastTransitionAt are written together under entry.Sync; reading them without
        // the lock could pair a new state with a stale error, or tear the multi-word DateTimeOffset? value.
        lock (entry.Sync)
        {
            state = entry.State;
            lastError = entry.LastError;
            lastTransitionAt = entry.LastTransitionAt;
        }

        // Build the snapshot outside the lock to keep the critical section minimal.
        snapshots[index++] = new IngressSourceSnapshot(entry.Source.Name, state, lastError, lastTransitionAt);
    }

    return snapshots;
}
| | | 69 | | |
/// <inheritdoc />
public ValueTask MarkPauseFailedAsync(string name, string reason, Exception? error = null)
{
    EnsureMaterialized();

    // An unregistered name is ignored; the call still completes successfully.
    if (!_entries.TryGetValue(name, out var target))
    {
        return ValueTask.CompletedTask;
    }

    // Apply the whole transition under the entry's own lock so it is observed as a unit by other writers.
    lock (target.Sync)
    {
        target.State = IngressSourceState.PauseFailed;
        // When no exception is supplied, the textual reason is wrapped so LastError is always populated.
        target.LastError = error ?? new InvalidOperationException(reason);
        target.LastTransitionAt = _clock.UtcNow;
    }

    return ValueTask.CompletedTask;
}
| | | 85 | | |
/// <inheritdoc />
public void RecordTransition(string name, IngressSourceState newState, Exception? error = null)
{
    EnsureMaterialized();

    // An unregistered name is ignored.
    if (!_entries.TryGetValue(name, out var target))
    {
        return;
    }

    // State, error and timestamp are replaced together under the entry's own lock.
    lock (target.Sync)
    {
        target.State = newState;
        // The previous error is overwritten, so passing null clears it.
        target.LastError = error;
        target.LastTransitionAt = _clock.UtcNow;
    }
}
| | | 100 | | |
/// <summary>
/// Looks up the current state recorded for the named source.
/// </summary>
/// <param name="name">Name of the ingress source to look up.</param>
/// <returns>
/// The recorded state, or <see cref="IngressSourceState.Running"/> when no source with that name is registered.
/// </returns>
internal IngressSourceState GetState(string name)
{
    EnsureMaterialized();

    if (_entries.TryGetValue(name, out var entry))
    {
        return entry.State;
    }

    // Unknown names are reported as Running rather than treated as an error.
    return IngressSourceState.Running;
}
| | | 106 | | |
/// <summary>
/// Mutable bookkeeping record for a single registered ingress source.
/// </summary>
private sealed class Entry
{
    /// <summary>Lock object held by the registry while it updates this entry's state fields.</summary>
    public readonly object Sync = new object();

    /// <summary>Creates an entry for <paramref name="source"/>, initially in the Running state.</summary>
    public Entry(IIngressSource source)
    {
        Source = source;
    }

    /// <summary>The ingress source this entry tracks.</summary>
    public IIngressSource Source { get; }

    /// <summary>Most recently recorded state; starts as <see cref="IngressSourceState.Running"/>.</summary>
    public IngressSourceState State { get; set; } = IngressSourceState.Running;

    /// <summary>Error associated with the most recent transition, if any.</summary>
    public Exception? LastError { get; set; }

    /// <summary>Timestamp of the most recent transition; null until one has been recorded.</summary>
    public DateTimeOffset? LastTransitionAt { get; set; }
}
| | | 115 | | } |