| | | 1 | | using System.Collections.Concurrent; |
| | | 2 | | using Elsa.Common; |
| | | 3 | | |
| | | 4 | | namespace Elsa.Workflows.Runtime.Services; |
| | | 5 | | |
| | | 6 | | /// <summary> |
| | | 7 | | /// Default implementation of <see cref="IExecutionCycleRegistry"/>. Tracks active execution cycles with a |
| | | 8 | | /// thread-safe dictionary and detects the FR-018 invariant violation (a source that reports |
| | | 9 | | /// <see cref="IngressSourceState.Paused"/> but initiates an execution cycle). |
| | | 10 | | /// </summary> |
| | | 11 | | public sealed class ExecutionCycleRegistry : IExecutionCycleRegistry |
| | | 12 | | { |
| | 167 | 13 | | private readonly ConcurrentDictionary<Guid, ExecutionCycleHandle> _active = new(); |
| | | 14 | | private readonly IIngressSourceRegistry _sources; |
| | | 15 | | private readonly ISystemClock _clock; |
| | | 16 | | |
| | 167 | 17 | | public ExecutionCycleRegistry(IIngressSourceRegistry sources, ISystemClock clock) |
| | | 18 | | { |
| | 167 | 19 | | _sources = sources; |
| | 167 | 20 | | _clock = clock; |
| | 167 | 21 | | } |
| | | 22 | | |
| | | 23 | | /// <inheritdoc /> |
| | 2925 | 24 | | public int ActiveCount => _active.Count; |
| | | 25 | | |
| | | 26 | | /// <inheritdoc /> |
| | | 27 | | public ExecutionCycleHandle BeginCycle(string workflowInstanceId, string? ingressSourceName, CancellationToken linke |
| | | 28 | | { |
| | | 29 | | // FR-018: a source that reports Paused but starts an execution cycle is inconsistent — flip it to PauseFailed. |
| | 568 | 30 | | if (ingressSourceName is not null) |
| | | 31 | | { |
| | 4 | 32 | | var snapshot = _sources.Snapshot().FirstOrDefault(s => s.Name == ingressSourceName); |
| | 2 | 33 | | if (snapshot is not null && snapshot.State == IngressSourceState.Paused) |
| | | 34 | | { |
| | | 35 | | // Fire-and-forget: flipping the recorded state is a local operation that does not await I/O. |
| | 1 | 36 | | _ = _sources.MarkPauseFailedAsync(ingressSourceName, reason: "delivered-while-paused"); |
| | | 37 | | } |
| | | 38 | | } |
| | | 39 | | |
| | 568 | 40 | | var handle = new ExecutionCycleHandle( |
| | 568 | 41 | | id: Guid.NewGuid(), |
| | 568 | 42 | | workflowInstanceId: workflowInstanceId, |
| | 568 | 43 | | ingressSourceName: ingressSourceName, |
| | 568 | 44 | | startedAt: _clock.UtcNow, |
| | 568 | 45 | | linkedToken: linkedToken, |
| | 556 | 46 | | onDisposed: h => _active.TryRemove(h.Id, out _), |
| | 568 | 47 | | cancelCallback: cancelCallback); |
| | | 48 | | |
| | 568 | 49 | | _active[handle.Id] = handle; |
| | 568 | 50 | | return handle; |
| | | 51 | | } |
| | | 52 | | |
| | | 53 | | /// <inheritdoc /> |
| | 3 | 54 | | public IReadOnlyCollection<ExecutionCycleHandle> ListActiveCycles() => _active.Values.ToArray(); |
| | | 55 | | } |