< Summary

Information
Class: Elsa.Diagnostics.OpenTelemetry.RealTime.OpenTelemetrySubscriptionManager
Assembly: Elsa.Diagnostics.OpenTelemetry
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Diagnostics.OpenTelemetry/RealTime/OpenTelemetrySubscriptionManager.cs
Line coverage
58%
Covered lines: 23
Uncovered lines: 16
Coverable lines: 39
Total lines: 82
Line coverage: 58.9%
Branch coverage
62%
Covered branches: 5
Total branches: 8
Branch coverage: 62.5%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
.ctor(...) | 100% | 1 | 1 | 100%
SubscribeAsync(...) | 100% | 1 | 1 | 100%
UnsubscribeAsync(...) | 100% | 2 | 1 | 0%
Dispose() | 0% | 6 | 2 | 0%
StreamAsync() | 100% | 3 | 2 | 45.45%
Unsubscribe(...) | 50% | 3 | 2 | 40%
Remove(...) | 100% | 2 | 2 | 100%
get_Filter() | 100% | 1 | 1 | 100%

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Diagnostics.OpenTelemetry/RealTime/OpenTelemetrySubscriptionManager.cs

# | Line | Line coverage
 1using System.Collections.Concurrent;
 2using Elsa.Diagnostics.OpenTelemetry.Contracts;
 3using Elsa.Diagnostics.OpenTelemetry.Models;
 4using Microsoft.AspNetCore.SignalR;
 5using Microsoft.Extensions.Logging;
 6
 7namespace Elsa.Diagnostics.OpenTelemetry.RealTime;
 8
 59public sealed class OpenTelemetrySubscriptionManager(
 510    IOpenTelemetryLiveFeed liveFeed,
 511    IHubContext<OpenTelemetryHub, IOpenTelemetryClient> hubContext,
 512    ILogger<OpenTelemetrySubscriptionManager> logger) : IDisposable
 13{
 514    private readonly ConcurrentDictionary<string, OpenTelemetrySubscription> _subscriptions = new(StringComparer.Ordinal
 15
 16    public Task SubscribeAsync(string connectionId, OpenTelemetryTraceFilter filter, CancellationToken cancellationToken
 17    {
 318        Unsubscribe(connectionId);
 319        var subscriptionCancellation = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
 320        var subscription = new OpenTelemetrySubscription(filter, subscriptionCancellation);
 321        _subscriptions[connectionId] = subscription;
 22
 323        _ = StreamAsync(connectionId, filter, subscription, subscriptionCancellation.Token);
 324        return Task.CompletedTask;
 25    }
 26
 27    public Task UnsubscribeAsync(string connectionId)
 28    {
 029        Unsubscribe(connectionId);
 030        return Task.CompletedTask;
 31    }
 32
 33    public void Dispose()
 34    {
 035        foreach (var subscription in _subscriptions.Values)
 36        {
 037            subscription.CancellationTokenSource.Cancel();
 038            subscription.CancellationTokenSource.Dispose();
 39        }
 40
 041        _subscriptions.Clear();
 042    }
 43
 44    private async Task StreamAsync(string connectionId, OpenTelemetryTraceFilter filter, OpenTelemetrySubscription subsc
 45    {
 46        try
 47        {
 1248            await foreach (var item in liveFeed.SubscribeAsync(filter, cancellationToken).ConfigureAwait(false))
 349                await hubContext.Clients.Client(connectionId).ReceiveAsync(item).ConfigureAwait(false);
 350        }
 051        catch (OperationCanceledException e)
 52        {
 053            logger.LogDebug(e, "OpenTelemetry subscription for connection {ConnectionId} was canceled", connectionId);
 054        }
 055        catch (Exception e) when (e is not OperationCanceledException)
 56        {
 057            logger.LogWarning(e, "OpenTelemetry subscription for connection {ConnectionId} stopped unexpectedly", connec
 058        }
 59        finally
 60        {
 361            Remove(connectionId, subscription);
 62        }
 363    }
 64
 65    private void Unsubscribe(string connectionId)
 66    {
 367        if (!_subscriptions.TryRemove(connectionId, out var subscription))
 368            return;
 69
 070        subscription.CancellationTokenSource.Cancel();
 071        subscription.CancellationTokenSource.Dispose();
 072    }
 73
 74    private void Remove(string connectionId, OpenTelemetrySubscription subscription)
 75    {
 376        var entry = new KeyValuePair<string, OpenTelemetrySubscription>(connectionId, subscription);
 377        if (((ICollection<KeyValuePair<string, OpenTelemetrySubscription>>)_subscriptions).Remove(entry))
 378            subscription.CancellationTokenSource.Dispose();
 379    }
 80
 681    private sealed record OpenTelemetrySubscription(OpenTelemetryTraceFilter Filter, CancellationTokenSource Cancellatio
 82}