< Summary

Information
Class: Elsa.Diagnostics.ConsoleLogs.RealTime.ElsaConsoleLogSubscriptionManager
Assembly: Elsa.Diagnostics.ConsoleLogs
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Diagnostics.ConsoleLogs/RealTime/ElsaConsoleLogSubscriptionManager.cs
Line coverage
80%
Covered lines: 67
Uncovered lines: 16
Coverable lines: 83
Total lines: 184
Line coverage: 80.7%
Branch coverage
81%
Covered branches: 18
Total branches: 22
Branch coverage: 81.8%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

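| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
|---|---|---|---|---|
| .ctor(...) | 100% | 1 | 1 | 100% |
| SubscribeAsync(...) | 100% | 1 | 1 | 100% |
| UpdateFilterAsync(...) | 100% | 2 | 1 | 0% |
| UnsubscribeAsync(...) | 100% | 1 | 1 | 100% |
| Dispose() | 100% | 2 | 2 | 100% |
| StreamAsync() | 87.5% | 9 | 8 | 80% |
| Unsubscribe(...) | 50% | 3 | 2 | 40% |
| Remove(...) | 100% | 2 | 2 | 100% |
| OnSourceChanged(...) | 100% | 1 | 1 | 100% |
| BroadcastSourceChangedAsync() | 100% | 7 | 4 | 45.45% |
| MatchesSource(...) | 50% | 2 | 2 | 100% |
| GetSourceSignature(...) | 50% | 2 | 2 | 83.33% |
| get_Filter() | 100% | 1 | 1 | 100% |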

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Diagnostics.ConsoleLogs/RealTime/ElsaConsoleLogSubscriptionManager.cs

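# | Line | Line coverage (in the listing below, the hit count "#" is printed directly in front of the line number with no separator, e.g. "1213" = 12 hits on line 13; lines with no hit count are not coverable)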
 1using System.Collections.Concurrent;
 2using ConsoleLogStreaming.Core;
 3using Elsa.Diagnostics.ConsoleLogs.Services;
 4using Microsoft.AspNetCore.SignalR;
 5using Microsoft.Extensions.Logging;
 6namespace Elsa.Diagnostics.ConsoleLogs.RealTime;
 7
 8/// <summary>
 9/// Manages pushed SignalR console log subscriptions.
 10/// </summary>
 11public sealed class ElsaConsoleLogSubscriptionManager : IDisposable
 12{
 1213    private readonly ConcurrentDictionary<string, ConsoleLogSubscription> _subscriptions = new(StringComparer.Ordinal);
 14    private readonly IConsoleLogProvider _provider;
 15    private readonly IConsoleLogSourceRegistry _sourceRegistry;
 16    private readonly IHubContext<ElsaConsoleLogsHub, IElsaConsoleLogsClient> _hubContext;
 17    private readonly ILogger<ElsaConsoleLogSubscriptionManager> _logger;
 18
 19    /// <summary>
 20    /// Initializes a new instance of the subscription manager.
 21    /// </summary>
 1222    public ElsaConsoleLogSubscriptionManager(
 1223        IConsoleLogProvider provider,
 1224        IConsoleLogSourceRegistry sourceRegistry,
 1225        IHubContext<ElsaConsoleLogsHub, IElsaConsoleLogsClient> hubContext,
 1226        ILogger<ElsaConsoleLogSubscriptionManager> logger)
 27    {
 1228        _provider = provider;
 1229        _sourceRegistry = sourceRegistry;
 1230        _hubContext = hubContext;
 1231        _logger = logger;
 1232        _sourceRegistry.SourceChanged += OnSourceChanged;
 1233    }
 34
 35    /// <summary>
 36    /// Starts a pushed subscription for a connection.
 37    /// </summary>
 38    public Task SubscribeAsync(string connectionId, ElsaConsoleLogFilter filter, CancellationToken cancellationToken)
 39    {
 740        Unsubscribe(connectionId);
 741        var subscriptionCancellation = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
 742        var subscription = new ConsoleLogSubscription(filter, subscriptionCancellation);
 743        _subscriptions[connectionId] = subscription;
 44
 745        _ = StreamAsync(connectionId, filter, subscription, subscriptionCancellation.Token);
 746        return Task.CompletedTask;
 47    }
 48
 49    /// <summary>
 50    /// Replaces the active pushed subscription filter.
 51    /// </summary>
 52    public Task UpdateFilterAsync(string connectionId, ElsaConsoleLogFilter filter, CancellationToken cancellationToken)
 53    {
 054        return SubscribeAsync(connectionId, filter, cancellationToken);
 55    }
 56
 57    /// <summary>
 58    /// Removes a pushed subscription.
 59    /// </summary>
 60    public Task UnsubscribeAsync(string connectionId)
 61    {
 162        Unsubscribe(connectionId);
 163        return Task.CompletedTask;
 64    }
 65
 66    /// <inheritdoc />
 67    public void Dispose()
 68    {
 469        _sourceRegistry.SourceChanged -= OnSourceChanged;
 70
 1271        foreach (var subscription in _subscriptions.Values)
 72        {
 273            subscription.CancellationTokenSource.Cancel();
 274            subscription.CancellationTokenSource.Dispose();
 75        }
 76
 477        _subscriptions.Clear();
 478    }
 79
 80    private async Task StreamAsync(string connectionId, ElsaConsoleLogFilter filter, ConsoleLogSubscription subscription
 81    {
 82        try
 83        {
 784            string? lastSourceSignature = null;
 85
 2086            await foreach (var item in _provider.SubscribeAsync(ConsoleLogFilterMapper.ToStreamingFilter(filter), cancel
 87            {
 388                if (item.Line != null)
 89                {
 390                    await _hubContext.Clients.Client(connectionId).ReceiveConsoleLogLineAsync(item.Line, cancellationTok
 91
 392                    var sourceSignature = GetSourceSignature(item.Line.Source);
 393                    if (!string.Equals(sourceSignature, lastSourceSignature, StringComparison.Ordinal))
 94                    {
 295                        lastSourceSignature = sourceSignature;
 296                        await _hubContext.Clients.Client(connectionId).ReceiveSourceChangedAsync(item.Line.Source, cance
 97                    }
 98                }
 99
 3100                if (item.Dropped != null)
 0101                    await _hubContext.Clients.Client(connectionId).ReceiveDroppedLinesAsync(item.Dropped, cancellationTo
 3102            }
 6103        }
 1104        catch (OperationCanceledException e)
 105        {
 1106            _logger.LogDebug(e, "Console log subscription for connection {ConnectionId} was canceled", connectionId);
 1107        }
 0108        catch (Exception e) when (e is not OperationCanceledException)
 109        {
 0110            _logger.LogWarning(e, "Console log subscription for connection {ConnectionId} stopped unexpectedly", connect
 0111        }
 112        finally
 113        {
 7114            Remove(connectionId, subscription);
 115        }
 7116    }
 117
 118    private void Unsubscribe(string connectionId)
 119    {
 8120        if (!_subscriptions.TryRemove(connectionId, out var subscription))
 8121            return;
 122
 0123        subscription.CancellationTokenSource.Cancel();
 0124        subscription.CancellationTokenSource.Dispose();
 0125    }
 126
 127    private void Remove(string connectionId, ConsoleLogSubscription subscription)
 128    {
 7129        var entry = new KeyValuePair<string, ConsoleLogSubscription>(connectionId, subscription);
 7130        if (((ICollection<KeyValuePair<string, ConsoleLogSubscription>>)_subscriptions).Remove(entry))
 5131            subscription.CancellationTokenSource.Dispose();
 7132    }
 133
 134    private void OnSourceChanged(ConsoleLogSource source)
 135    {
 1136        _ = BroadcastSourceChangedAsync(source, _subscriptions.ToArray());
 1137    }
 138
 139    private async Task BroadcastSourceChangedAsync(ConsoleLogSource source, IReadOnlyCollection<KeyValuePair<string, Con
 140    {
 141        try
 142        {
 4143            foreach (var (connectionId, subscription) in subscriptions)
 144            {
 1145                if (!MatchesSource(source, subscription.Filter))
 146                    continue;
 147
 1148                await _hubContext.Clients.Client(connectionId).ReceiveSourceChangedAsync(source, subscription.Cancellati
 149            }
 1150        }
 0151        catch (OperationCanceledException e)
 152        {
 0153            _logger.LogDebug(e, "Console log source change broadcast for source {SourceId} was canceled", source.Id);
 0154        }
 0155        catch (Exception e) when (e is not OperationCanceledException)
 156        {
 0157            _logger.LogDebug(e, "Failed to broadcast console log source change for source {SourceId}", source.Id);
 0158        }
 1159    }
 160
 161    private static bool MatchesSource(ConsoleLogSource source, ElsaConsoleLogFilter filter)
 162    {
 1163        return string.IsNullOrWhiteSpace(filter.SourceId) || string.Equals(source.Id, filter.SourceId, StringComparison.
 164    }
 165
 166    private static string GetSourceSignature(ConsoleLogSource source)
 167    {
 3168        var metadata = source.Metadata
 0169            .OrderBy(x => x.Key, StringComparer.Ordinal)
 0170            .ThenBy(x => x.Value, StringComparer.Ordinal)
 3171            .Select(x => $"{x.Key}={x.Value}");
 172
 3173        return string.Join('\u001f',
 3174            source.Id,
 3175            source.DisplayName,
 3176            source.ServiceName,
 3177            source.ProcessId?.ToString(),
 3178            source.MachineName,
 3179            source.Health.ToString(),
 3180            string.Join('\u001e', metadata));
 181    }
 182
 18183    private sealed record ConsoleLogSubscription(ElsaConsoleLogFilter Filter, CancellationTokenSource CancellationTokenS
 184}