| | | 1 | | using System.Collections.Concurrent; |
| | | 2 | | using Elsa.Diagnostics.StructuredLogs.Contracts; |
| | | 3 | | using Elsa.Diagnostics.StructuredLogs.Models; |
| | | 4 | | using Microsoft.AspNetCore.SignalR; |
| | | 5 | | using Microsoft.Extensions.Logging; |
| | | 6 | | |
| | | 7 | | namespace Elsa.Diagnostics.StructuredLogs.RealTime; |
| | | 8 | | |
| | | 9 | | public class StructuredLogSubscriptionManager : IDisposable |
| | | 10 | | { |
| | 0 | 11 | | private readonly ConcurrentDictionary<string, StructuredLogSubscription> _subscriptions = new(StringComparer.Ordinal |
| | | 12 | | private readonly IStructuredLogProvider _logProvider; |
| | | 13 | | private readonly IStructuredLogSourceRegistry _sourceRegistry; |
| | | 14 | | private readonly IHubContext<StructuredLogsHub, IStructuredLogsClient> _hubContext; |
| | | 15 | | private readonly ILogger<StructuredLogSubscriptionManager> _logger; |
| | | 16 | | |
| | 0 | 17 | | public StructuredLogSubscriptionManager( |
| | 0 | 18 | | IStructuredLogProvider logProvider, |
| | 0 | 19 | | IStructuredLogSourceRegistry sourceRegistry, |
| | 0 | 20 | | IHubContext<StructuredLogsHub, IStructuredLogsClient> hubContext, |
| | 0 | 21 | | ILogger<StructuredLogSubscriptionManager> logger) |
| | | 22 | | { |
| | 0 | 23 | | _logProvider = logProvider; |
| | 0 | 24 | | _sourceRegistry = sourceRegistry; |
| | 0 | 25 | | _hubContext = hubContext; |
| | 0 | 26 | | _logger = logger; |
| | 0 | 27 | | _sourceRegistry.SourceChanged += OnSourceChanged; |
| | 0 | 28 | | } |
| | | 29 | | |
| | | 30 | | public Task SubscribeAsync(string connectionId, StructuredLogFilter filter, CancellationToken cancellationToken) |
| | | 31 | | { |
| | 0 | 32 | | Unsubscribe(connectionId); |
| | 0 | 33 | | var subscriptionCancellation = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); |
| | 0 | 34 | | var subscription = new StructuredLogSubscription(subscriptionCancellation); |
| | 0 | 35 | | _subscriptions[connectionId] = subscription; |
| | | 36 | | |
| | 0 | 37 | | _ = StreamAsync(connectionId, filter, subscription, subscriptionCancellation.Token); |
| | 0 | 38 | | return Task.CompletedTask; |
| | | 39 | | } |
| | | 40 | | |
| | | 41 | | public Task UpdateFilterAsync(string connectionId, StructuredLogFilter filter, CancellationToken cancellationToken) |
| | | 42 | | { |
| | 0 | 43 | | return SubscribeAsync(connectionId, filter, cancellationToken); |
| | | 44 | | } |
| | | 45 | | |
| | | 46 | | public Task UnsubscribeAsync(string connectionId) |
| | | 47 | | { |
| | 0 | 48 | | Unsubscribe(connectionId); |
| | 0 | 49 | | return Task.CompletedTask; |
| | | 50 | | } |
| | | 51 | | |
| | | 52 | | public void Dispose() |
| | | 53 | | { |
| | 0 | 54 | | _sourceRegistry.SourceChanged -= OnSourceChanged; |
| | | 55 | | |
| | 0 | 56 | | foreach (var subscription in _subscriptions.Values) |
| | | 57 | | { |
| | 0 | 58 | | subscription.CancellationTokenSource.Cancel(); |
| | 0 | 59 | | subscription.CancellationTokenSource.Dispose(); |
| | | 60 | | } |
| | | 61 | | |
| | 0 | 62 | | _subscriptions.Clear(); |
| | 0 | 63 | | } |
| | | 64 | | |
| | | 65 | | private async Task StreamAsync(string connectionId, StructuredLogFilter filter, StructuredLogSubscription subscripti |
| | | 66 | | { |
| | | 67 | | try |
| | | 68 | | { |
| | 0 | 69 | | if (_logProvider is IStructuredLogStreamProvider streamProvider) |
| | 0 | 70 | | await StreamWithDroppedEventsAsync(connectionId, filter, streamProvider, cancellationToken); |
| | | 71 | | else |
| | 0 | 72 | | await StreamLogEventsAsync(connectionId, filter, cancellationToken); |
| | 0 | 73 | | } |
| | 0 | 74 | | catch (OperationCanceledException e) |
| | | 75 | | { |
| | 0 | 76 | | _logger.LogDebug(e, "Structured log subscription for connection {ConnectionId} was canceled", connectionId); |
| | 0 | 77 | | } |
| | 0 | 78 | | catch (HubException e) |
| | | 79 | | { |
| | 0 | 80 | | _logger.LogWarning(e, "Structured log subscription for connection {ConnectionId} stopped unexpectedly", conn |
| | 0 | 81 | | } |
| | 0 | 82 | | catch (InvalidOperationException e) |
| | | 83 | | { |
| | 0 | 84 | | _logger.LogWarning(e, "Structured log subscription for connection {ConnectionId} stopped unexpectedly", conn |
| | 0 | 85 | | } |
| | | 86 | | finally |
| | | 87 | | { |
| | 0 | 88 | | Remove(connectionId, subscription); |
| | | 89 | | } |
| | 0 | 90 | | } |
| | | 91 | | |
| | | 92 | | private void Unsubscribe(string connectionId) |
| | | 93 | | { |
| | 0 | 94 | | if (!_subscriptions.TryRemove(connectionId, out var subscription)) |
| | 0 | 95 | | return; |
| | | 96 | | |
| | 0 | 97 | | subscription.CancellationTokenSource.Cancel(); |
| | 0 | 98 | | subscription.CancellationTokenSource.Dispose(); |
| | 0 | 99 | | } |
| | | 100 | | |
| | | 101 | | private void Remove(string connectionId, StructuredLogSubscription subscription) |
| | | 102 | | { |
| | 0 | 103 | | var entry = new KeyValuePair<string, StructuredLogSubscription>(connectionId, subscription); |
| | 0 | 104 | | ((ICollection<KeyValuePair<string, StructuredLogSubscription>>)_subscriptions).Remove(entry); |
| | 0 | 105 | | } |
| | | 106 | | |
| | | 107 | | private void OnSourceChanged(StructuredLogSource source) |
| | | 108 | | { |
| | 0 | 109 | | _ = BroadcastSourceChangedAsync(source); |
| | 0 | 110 | | } |
| | | 111 | | |
| | | 112 | | private async Task StreamLogEventsAsync(string connectionId, StructuredLogFilter filter, CancellationToken cancellat |
| | | 113 | | { |
| | 0 | 114 | | await foreach (var logEvent in _logProvider.SubscribeAsync(filter, cancellationToken)) |
| | 0 | 115 | | await _hubContext.Clients.Client(connectionId).ReceiveLogEventAsync(logEvent, cancellationToken); |
| | 0 | 116 | | } |
| | | 117 | | |
| | | 118 | | private async Task StreamWithDroppedEventsAsync(string connectionId, StructuredLogFilter filter, IStructuredLogStrea |
| | | 119 | | { |
| | 0 | 120 | | await foreach (var item in streamProvider.SubscribeWithDroppedEventsAsync(filter, cancellationToken)) |
| | | 121 | | { |
| | 0 | 122 | | if (item.LogEvent != null) |
| | 0 | 123 | | await _hubContext.Clients.Client(connectionId).ReceiveLogEventAsync(item.LogEvent, cancellationToken); |
| | | 124 | | |
| | 0 | 125 | | if (item.DroppedEvents != null) |
| | 0 | 126 | | await _hubContext.Clients.Client(connectionId).ReceiveDroppedEventsAsync(item.DroppedEvents, cancellatio |
| | 0 | 127 | | } |
| | 0 | 128 | | } |
| | | 129 | | |
| | | 130 | | private async Task BroadcastSourceChangedAsync(StructuredLogSource source) |
| | | 131 | | { |
| | | 132 | | try |
| | | 133 | | { |
| | 0 | 134 | | await _hubContext.Clients.All.ReceiveSourceChangedAsync(source); |
| | 0 | 135 | | } |
| | 0 | 136 | | catch (OperationCanceledException e) |
| | | 137 | | { |
| | 0 | 138 | | _logger.LogDebug(e, "Structured log source change broadcast for source {SourceId} was canceled", source.Id); |
| | 0 | 139 | | } |
| | 0 | 140 | | catch (Exception e) when (e is not OperationCanceledException) |
| | | 141 | | { |
| | 0 | 142 | | _logger.LogDebug(e, "Failed to broadcast structured log source change for source {SourceId}", source.Id); |
| | 0 | 143 | | } |
| | 0 | 144 | | } |
| | | 145 | | |
| | 0 | 146 | | private sealed record StructuredLogSubscription(CancellationTokenSource CancellationTokenSource); |
| | | 147 | | } |