< Summary

Information
Class: Elsa.Diagnostics.StructuredLogs.RealTime.StructuredLogSubscriptionManager
Assembly: Elsa.Diagnostics.StructuredLogs
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Diagnostics.StructuredLogs/RealTime/StructuredLogSubscriptionManager.cs
Line coverage
0%
Covered lines: 0
Uncovered lines: 72
Coverable lines: 72
Total lines: 147
Line coverage: 0%
Branch coverage
0%
Covered branches: 0
Total branches: 14
Branch coverage: 0%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%210%
SubscribeAsync(...)100%210%
UpdateFilterAsync(...)100%210%
UnsubscribeAsync(...)100%210%
Dispose()0%620%
StreamAsync()0%620%
Unsubscribe(...)0%620%
Remove(...)100%210%
OnSourceChanged(...)100%210%
StreamLogEventsAsync()0%620%
StreamWithDroppedEventsAsync()0%4260%
BroadcastSourceChangedAsync()100%210%
get_CancellationTokenSource()100%210%

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Diagnostics.StructuredLogs/RealTime/StructuredLogSubscriptionManager.cs

#LineLine coverage
 1using System.Collections.Concurrent;
 2using Elsa.Diagnostics.StructuredLogs.Contracts;
 3using Elsa.Diagnostics.StructuredLogs.Models;
 4using Microsoft.AspNetCore.SignalR;
 5using Microsoft.Extensions.Logging;
 6
 7namespace Elsa.Diagnostics.StructuredLogs.RealTime;
 8
 9public class StructuredLogSubscriptionManager : IDisposable
 10{
 011    private readonly ConcurrentDictionary<string, StructuredLogSubscription> _subscriptions = new(StringComparer.Ordinal
 12    private readonly IStructuredLogProvider _logProvider;
 13    private readonly IStructuredLogSourceRegistry _sourceRegistry;
 14    private readonly IHubContext<StructuredLogsHub, IStructuredLogsClient> _hubContext;
 15    private readonly ILogger<StructuredLogSubscriptionManager> _logger;
 16
 017    public StructuredLogSubscriptionManager(
 018        IStructuredLogProvider logProvider,
 019        IStructuredLogSourceRegistry sourceRegistry,
 020        IHubContext<StructuredLogsHub, IStructuredLogsClient> hubContext,
 021        ILogger<StructuredLogSubscriptionManager> logger)
 22    {
 023        _logProvider = logProvider;
 024        _sourceRegistry = sourceRegistry;
 025        _hubContext = hubContext;
 026        _logger = logger;
 027        _sourceRegistry.SourceChanged += OnSourceChanged;
 028    }
 29
 30    public Task SubscribeAsync(string connectionId, StructuredLogFilter filter, CancellationToken cancellationToken)
 31    {
 032        Unsubscribe(connectionId);
 033        var subscriptionCancellation = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
 034        var subscription = new StructuredLogSubscription(subscriptionCancellation);
 035        _subscriptions[connectionId] = subscription;
 36
 037        _ = StreamAsync(connectionId, filter, subscription, subscriptionCancellation.Token);
 038        return Task.CompletedTask;
 39    }
 40
 41    public Task UpdateFilterAsync(string connectionId, StructuredLogFilter filter, CancellationToken cancellationToken)
 42    {
 043        return SubscribeAsync(connectionId, filter, cancellationToken);
 44    }
 45
 46    public Task UnsubscribeAsync(string connectionId)
 47    {
 048        Unsubscribe(connectionId);
 049        return Task.CompletedTask;
 50    }
 51
 52    public void Dispose()
 53    {
 054        _sourceRegistry.SourceChanged -= OnSourceChanged;
 55
 056        foreach (var subscription in _subscriptions.Values)
 57        {
 058            subscription.CancellationTokenSource.Cancel();
 059            subscription.CancellationTokenSource.Dispose();
 60        }
 61
 062        _subscriptions.Clear();
 063    }
 64
 65    private async Task StreamAsync(string connectionId, StructuredLogFilter filter, StructuredLogSubscription subscripti
 66    {
 67        try
 68        {
 069            if (_logProvider is IStructuredLogStreamProvider streamProvider)
 070                await StreamWithDroppedEventsAsync(connectionId, filter, streamProvider, cancellationToken);
 71            else
 072                await StreamLogEventsAsync(connectionId, filter, cancellationToken);
 073        }
 074        catch (OperationCanceledException e)
 75        {
 076            _logger.LogDebug(e, "Structured log subscription for connection {ConnectionId} was canceled", connectionId);
 077        }
 078        catch (HubException e)
 79        {
 080            _logger.LogWarning(e, "Structured log subscription for connection {ConnectionId} stopped unexpectedly", conn
 081        }
 082        catch (InvalidOperationException e)
 83        {
 084            _logger.LogWarning(e, "Structured log subscription for connection {ConnectionId} stopped unexpectedly", conn
 085        }
 86        finally
 87        {
 088            Remove(connectionId, subscription);
 89        }
 090    }
 91
 92    private void Unsubscribe(string connectionId)
 93    {
 094        if (!_subscriptions.TryRemove(connectionId, out var subscription))
 095            return;
 96
 097        subscription.CancellationTokenSource.Cancel();
 098        subscription.CancellationTokenSource.Dispose();
 099    }
 100
 101    private void Remove(string connectionId, StructuredLogSubscription subscription)
 102    {
 0103        var entry = new KeyValuePair<string, StructuredLogSubscription>(connectionId, subscription);
 0104        ((ICollection<KeyValuePair<string, StructuredLogSubscription>>)_subscriptions).Remove(entry);
 0105    }
 106
 107    private void OnSourceChanged(StructuredLogSource source)
 108    {
 0109        _ = BroadcastSourceChangedAsync(source);
 0110    }
 111
 112    private async Task StreamLogEventsAsync(string connectionId, StructuredLogFilter filter, CancellationToken cancellat
 113    {
 0114        await foreach (var logEvent in _logProvider.SubscribeAsync(filter, cancellationToken))
 0115            await _hubContext.Clients.Client(connectionId).ReceiveLogEventAsync(logEvent, cancellationToken);
 0116    }
 117
 118    private async Task StreamWithDroppedEventsAsync(string connectionId, StructuredLogFilter filter, IStructuredLogStrea
 119    {
 0120        await foreach (var item in streamProvider.SubscribeWithDroppedEventsAsync(filter, cancellationToken))
 121        {
 0122            if (item.LogEvent != null)
 0123                await _hubContext.Clients.Client(connectionId).ReceiveLogEventAsync(item.LogEvent, cancellationToken);
 124
 0125            if (item.DroppedEvents != null)
 0126                await _hubContext.Clients.Client(connectionId).ReceiveDroppedEventsAsync(item.DroppedEvents, cancellatio
 0127        }
 0128    }
 129
 130    private async Task BroadcastSourceChangedAsync(StructuredLogSource source)
 131    {
 132        try
 133        {
 0134            await _hubContext.Clients.All.ReceiveSourceChangedAsync(source);
 0135        }
 0136        catch (OperationCanceledException e)
 137        {
 0138            _logger.LogDebug(e, "Structured log source change broadcast for source {SourceId} was canceled", source.Id);
 0139        }
 0140        catch (Exception e) when (e is not OperationCanceledException)
 141        {
 0142            _logger.LogDebug(e, "Failed to broadcast structured log source change for source {SourceId}", source.Id);
 0143        }
 0144    }
 145
 0146    private sealed record StructuredLogSubscription(CancellationTokenSource CancellationTokenSource);
 147}