< Summary

Information
Class: Elsa.Diagnostics.StructuredLogs.Providers.InMemory.InMemoryStructuredLogLiveFeed
Assembly: Elsa.Diagnostics.StructuredLogs
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Diagnostics.StructuredLogs/Providers/InMemory/InMemoryStructuredLogLiveFeed.cs
Line coverage
98%
Covered lines: 52
Uncovered lines: 1
Coverable lines: 53
Total lines: 110
Line coverage: 98.1%
Branch coverage
92%
Covered branches: 13
Total branches: 14
Branch coverage: 92.8%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

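| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| .ctor(...) | 100% | 1 | 1 | 100% |
| PublishAsync(...) | 100% | 2 | 2 | 100% |
| SubscribeAsync() | 50% | 2 | 2 | 90.9% |
| .ctor(...) | 100% | 1 | 1 | 100% |
| get_Channel() | 100% | 1 | 1 | 100% |
| TryWrite(...) | 100% | 4 | 4 | 100% |
| MarkConsumed(...) | 100% | 2 | 2 | 100% |
| QueueDroppedSummaryIfNeeded() | 100% | 4 | 4 | 100% |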

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Diagnostics.StructuredLogs/Providers/InMemory/InMemoryStructuredLogLiveFeed.cs

# | Line | Line coverage
 1using System.Runtime.CompilerServices;
 2using System.Threading.Channels;
 3using Elsa.Diagnostics.StructuredLogs.Contracts;
 4using Elsa.Diagnostics.StructuredLogs.Models;
 5using Elsa.Diagnostics.StructuredLogs.Options;
 6using Elsa.Diagnostics.StructuredLogs.Services;
 7using Microsoft.Extensions.Options;
 8
 9namespace Elsa.Diagnostics.StructuredLogs.Providers.InMemory;
 10
 811public class InMemoryStructuredLogLiveFeed(IOptions<StructuredLogsOptions> options) : IStructuredLogLiveFeed
 12{
 813    private readonly StructuredLogsOptions _options = options.Value;
 814    private readonly object _subscribersLock = new();
 815    private readonly Dictionary<Guid, StructuredLogSubscriber> _subscribers = new();
 16
 17    public ValueTask PublishAsync(StructuredLogEvent logEvent, CancellationToken cancellationToken = default)
 18    {
 19        List<StructuredLogSubscriber> subscribers;
 1520        lock (_subscribersLock)
 1521            subscribers = _subscribers.Values.ToList();
 22
 3823        foreach (var subscriber in subscribers)
 424            subscriber.TryWrite(logEvent, _options.SubscriberChannelCapacity);
 25
 1526        return ValueTask.CompletedTask;
 27    }
 28
 29    public async IAsyncEnumerable<StructuredLogStreamItem> SubscribeAsync(StructuredLogFilter filter, [EnumeratorCancell
 30    {
 231        var subscriberId = Guid.NewGuid();
 232        var subscriber = new StructuredLogSubscriber(filter);
 33
 234        lock (_subscribersLock)
 235            _subscribers[subscriberId] = subscriber;
 36
 37        try
 38        {
 839            await foreach (var item in subscriber.Channel.Reader.ReadAllAsync(cancellationToken))
 40            {
 341                subscriber.MarkConsumed(item);
 342                yield return item;
 43            }
 044        }
 45        finally
 46        {
 247            lock (_subscribersLock)
 248                _subscribers.Remove(subscriberId);
 49        }
 250    }
 51
 252    private sealed class StructuredLogSubscriber(StructuredLogFilter filter)
 53    {
 254        private readonly object _lock = new();
 55        private int _pendingItemCount;
 56        private long _droppedSinceLastSummary;
 57        private bool _summaryQueued;
 58
 759        public Channel<StructuredLogStreamItem> Channel { get; } = System.Threading.Channels.Channel.CreateUnbounded<Str
 260        {
 261            SingleReader = true,
 262            SingleWriter = false
 263        });
 64
 65        public void TryWrite(StructuredLogEvent logEvent, int capacity)
 66        {
 467            if (!StructuredLogFilterEvaluator.Matches(logEvent, filter))
 168                return;
 69
 370            lock (_lock)
 71            {
 372                if (_pendingItemCount >= capacity)
 73                {
 174                    _droppedSinceLastSummary++;
 175                    QueueDroppedSummaryIfNeeded();
 176                    return;
 77                }
 78
 279                Channel.Writer.TryWrite(StructuredLogStreamItem.FromLogEvent(logEvent));
 280                _pendingItemCount++;
 281            }
 382        }
 83
 84        public void MarkConsumed(StructuredLogStreamItem item)
 85        {
 386            lock (_lock)
 87            {
 388                _pendingItemCount = Math.Max(0, _pendingItemCount - 1);
 89
 390                if (item.DroppedEvents != null)
 91                {
 192                    _summaryQueued = false;
 193                    QueueDroppedSummaryIfNeeded();
 94                }
 395            }
 396        }
 97
 98        private void QueueDroppedSummaryIfNeeded()
 99        {
 2100            if (_summaryQueued || _droppedSinceLastSummary == 0)
 1101                return;
 102
 1103            var summary = new StructuredLogDroppedEventSummary(null, _droppedSinceLastSummary, "SubscriberChannelFull");
 1104            _droppedSinceLastSummary = 0;
 1105            _summaryQueued = true;
 1106            _pendingItemCount++;
 1107            Channel.Writer.TryWrite(StructuredLogStreamItem.FromDroppedEvents(summary));
 1108        }
 109    }
 110}