< Summary

Information
Class: Elsa.Diagnostics.StructuredLogs.Providers.InMemory.InMemoryStructuredLogProvider
Assembly: Elsa.Diagnostics.StructuredLogs
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Diagnostics.StructuredLogs/Providers/InMemory/InMemoryStructuredLogProvider.cs
Line coverage
97%
Covered lines: 73
Uncovered lines: 2
Coverable lines: 75
Total lines: 153
Line coverage: 97.3%
Branch coverage
90%
Covered branches: 18
Total branches: 20
Branch coverage: 90%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
PublishAsync(...)100%22100%
GetRecentAsync(...)100%22100%
SubscribeAsync()75%44100%
SubscribeWithDroppedEventsAsync()50%2290.9%
ListSourcesAsync(...)100%210%
.ctor(...)100%11100%
get_Channel()100%11100%
TryWrite(...)100%44100%
MarkConsumed(...)100%22100%
QueueDroppedSummaryIfNeeded()100%44100%

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Diagnostics.StructuredLogs/Providers/InMemory/InMemoryStructuredLogProvider.cs

#LineLine coverage
 1using System.Runtime.CompilerServices;
 2using System.Threading.Channels;
 3using Elsa.Diagnostics.StructuredLogs.Contracts;
 4using Elsa.Diagnostics.StructuredLogs.Models;
 5using Elsa.Diagnostics.StructuredLogs.Options;
 6using Elsa.Diagnostics.StructuredLogs.Services;
 7using Microsoft.Extensions.Options;
 8
 9namespace Elsa.Diagnostics.StructuredLogs.Providers.InMemory;
 10
 11public class InMemoryStructuredLogProvider : IStructuredLogStreamProvider
 12{
 13    private readonly RingBuffer<StructuredLogEvent> _recentLogs;
 14    private readonly IStructuredLogSourceRegistry _sourceRegistry;
 15    private readonly StructuredLogsOptions _options;
 616    private readonly object _subscribersLock = new();
 617    private readonly Dictionary<Guid, StructuredLogSubscriber> _subscribers = new();
 18
 619    public InMemoryStructuredLogProvider(IOptions<StructuredLogsOptions> options, IStructuredLogSourceRegistry sourceReg
 20    {
 621        _options = options.Value;
 622        _sourceRegistry = sourceRegistry;
 623        _recentLogs = new(_options.RecentLogCapacity);
 624    }
 25
 26    public ValueTask PublishAsync(StructuredLogEvent logEvent, CancellationToken cancellationToken = default)
 27    {
 1528        _recentLogs.Add(logEvent);
 1529        _sourceRegistry.MarkSeen(logEvent.SourceId, logEvent.ReceivedAt);
 30
 31        List<StructuredLogSubscriber> subscribers;
 1532        lock (_subscribersLock)
 1533            subscribers = _subscribers.Values.ToList();
 34
 3835        foreach (var subscriber in subscribers)
 436            subscriber.TryWrite(logEvent, _options.SubscriberChannelCapacity);
 37
 1538        return ValueTask.CompletedTask;
 39    }
 40
 41    public ValueTask<RecentStructuredLogsResult> GetRecentAsync(StructuredLogFilter filter, CancellationToken cancellati
 42    {
 443        var take = Math.Clamp(filter.Take ?? _options.MaxRecentLogQuerySize, 0, _options.MaxRecentLogQuerySize);
 444        var items = _recentLogs
 445            .Snapshot()
 1046            .Where(x => StructuredLogFilterEvaluator.Matches(x, filter))
 847            .OrderBy(x => x.Timestamp)
 848            .ThenBy(x => x.ReceivedAt)
 849            .ThenBy(x => x.SourceId, StringComparer.OrdinalIgnoreCase)
 850            .ThenBy(x => x.Sequence)
 851            .ThenBy(x => x.Id, StringComparer.Ordinal)
 452            .TakeLast(take)
 453            .ToList();
 54
 455        return ValueTask.FromResult(new RecentStructuredLogsResult(items, _recentLogs.DroppedCount));
 56    }
 57
 58    public async IAsyncEnumerable<StructuredLogEvent> SubscribeAsync(StructuredLogFilter filter, [EnumeratorCancellation
 59    {
 360        await foreach (var item in SubscribeWithDroppedEventsAsync(filter, cancellationToken))
 61        {
 162            if (item.LogEvent != null)
 163                yield return item.LogEvent;
 64        }
 165    }
 66
 67    public async IAsyncEnumerable<StructuredLogStreamItem> SubscribeWithDroppedEventsAsync(StructuredLogFilter filter, [
 68    {
 269        var subscriberId = Guid.NewGuid();
 270        var subscriber = new StructuredLogSubscriber(filter);
 71
 272        lock (_subscribersLock)
 273            _subscribers[subscriberId] = subscriber;
 74
 75        try
 76        {
 877            await foreach (var item in subscriber.Channel.Reader.ReadAllAsync(cancellationToken))
 78            {
 379                subscriber.MarkConsumed(item);
 380                yield return item;
 81            }
 082        }
 83        finally
 84        {
 285            lock (_subscribersLock)
 286                _subscribers.Remove(subscriberId);
 87        }
 288    }
 89
 90    public ValueTask<IReadOnlyCollection<StructuredLogSource>> ListSourcesAsync(CancellationToken cancellationToken = de
 91    {
 092        return ValueTask.FromResult(_sourceRegistry.List());
 93    }
 94
 295    private sealed class StructuredLogSubscriber(StructuredLogFilter filter)
 96    {
 297        private readonly object _lock = new();
 98        private int _pendingItemCount;
 99        private long _droppedSinceLastSummary;
 100        private bool _summaryQueued;
 101
 7102        public Channel<StructuredLogStreamItem> Channel { get; } = System.Threading.Channels.Channel.CreateUnbounded<Str
 2103        {
 2104            SingleReader = true,
 2105            SingleWriter = false
 2106        });
 107
 108        public void TryWrite(StructuredLogEvent logEvent, int capacity)
 109        {
 4110            if (!StructuredLogFilterEvaluator.Matches(logEvent, filter))
 1111                return;
 112
 3113            lock (_lock)
 114            {
 3115                if (_pendingItemCount >= capacity)
 116                {
 1117                    _droppedSinceLastSummary++;
 1118                    QueueDroppedSummaryIfNeeded();
 1119                    return;
 120                }
 121
 2122                Channel.Writer.TryWrite(StructuredLogStreamItem.FromLogEvent(logEvent));
 2123                _pendingItemCount++;
 2124            }
 3125        }
 126
 127        public void MarkConsumed(StructuredLogStreamItem item)
 128        {
 3129            lock (_lock)
 130            {
 3131                _pendingItemCount = Math.Max(0, _pendingItemCount - 1);
 132
 3133                if (item.DroppedEvents != null)
 134                {
 1135                    _summaryQueued = false;
 1136                    QueueDroppedSummaryIfNeeded();
 137                }
 3138            }
 3139        }
 140
 141        private void QueueDroppedSummaryIfNeeded()
 142        {
 2143            if (_summaryQueued || _droppedSinceLastSummary == 0)
 1144                return;
 145
 1146            var summary = new StructuredLogDroppedEventSummary(null, _droppedSinceLastSummary, "SubscriberChannelFull");
 1147            _droppedSinceLastSummary = 0;
 1148            _summaryQueued = true;
 1149            _pendingItemCount++;
 1150            Channel.Writer.TryWrite(StructuredLogStreamItem.FromDroppedEvents(summary));
 1151        }
 152    }
 153}