< Summary

Information
Class: Elsa.Diagnostics.StructuredLogs.Persistence.Relational.Services.StructuredLogWriteBuffer
Assembly: Elsa.Diagnostics.StructuredLogs.Persistence.Relational
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Diagnostics.StructuredLogs.Persistence.Relational/Services/StructuredLogWriteBuffer.cs
Line coverage
25%
Covered lines: 31
Uncovered lines: 92
Coverable lines: 123
Total lines: 250
Line coverage: 25.2%
Branch coverage
21%
Covered branches: 8
Total branches: 38
Branch coverage: 21%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
get_DroppedWriteCount()100%210%
WriteAsync(...)0%2040%
WriteManyAsync()0%620%
QueryAsync(...)100%210%
ListSourcesAsync(...)100%210%
StartAsync(...)0%4260%
StopAsync()0%4260%
FlushAsync()50%3236.36%
DisposeAsync()83.33%8663.63%
ProcessQueueAsync()0%4260%
DequeueBatch()50%4485.71%
CountPendingWritesAsDropped()0%620%

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Diagnostics.StructuredLogs.Persistence.Relational/Services/StructuredLogWriteBuffer.cs

#LineLine coverage
 1using System.Diagnostics;
 2using Elsa.Diagnostics.StructuredLogs.Contracts;
 3using Elsa.Diagnostics.StructuredLogs.Models;
 4using Elsa.Diagnostics.StructuredLogs.Persistence.Relational.Contracts;
 5using Elsa.Diagnostics.StructuredLogs.Persistence.Relational.Options;
 6using Elsa.Diagnostics.StructuredLogs.Persistence.Relational.Stores;
 7using Microsoft.Extensions.Hosting;
 8using Microsoft.Extensions.Options;
 9
 10namespace Elsa.Diagnostics.StructuredLogs.Persistence.Relational.Services;
 11
 112public class StructuredLogWriteBuffer(
 113    RelationalStructuredLogStore store,
 114    IOptions<RelationalStructuredLogOptions> options) : IStructuredLogStore, IStructuredLogWriteBuffer, IHostedService, 
 15{
 116    private readonly object _lifecycleLock = new();
 117    private readonly Queue<StructuredLogEvent> _queue = new();
 118    private readonly SemaphoreSlim _signal = new(0);
 119    private CancellationTokenSource _stopTokenSource = new();
 20    private Task? _backgroundTask;
 21    private long _droppedWriteCount;
 22    private int _activeStartCount;
 23    private int _disposed;
 24
 025    public long DroppedWriteCount => Interlocked.Read(ref _droppedWriteCount);
 26
 27    public ValueTask WriteAsync(StructuredLogEvent logEvent, CancellationToken cancellationToken = default)
 28    {
 029        var shouldSignal = false;
 30
 031        lock (_queue)
 32        {
 033            if (_queue.Count >= Math.Max(1, options.Value.WriteQueue.Capacity))
 34            {
 035                Interlocked.Increment(ref _droppedWriteCount);
 036                return ValueTask.CompletedTask;
 37            }
 38
 039            shouldSignal = _queue.Count == 0;
 040            _queue.Enqueue(logEvent);
 041        }
 42
 043        if (shouldSignal)
 044            _signal.Release();
 45
 046        return ValueTask.CompletedTask;
 047    }
 48
 49    public async ValueTask WriteManyAsync(IReadOnlyCollection<StructuredLogEvent> logEvents, CancellationToken cancellat
 50    {
 051        foreach (var logEvent in logEvents)
 052            await WriteAsync(logEvent, cancellationToken);
 053    }
 54
 55    public ValueTask<RecentStructuredLogsResult> QueryAsync(StructuredLogFilter filter, CancellationToken cancellationTo
 56    {
 057        return store.QueryAsync(filter, cancellationToken);
 58    }
 59
 60    public ValueTask<IReadOnlyCollection<StructuredLogSource>> ListSourcesAsync(CancellationToken cancellationToken = de
 61    {
 062        return store.ListSourcesAsync(cancellationToken);
 63    }
 64
 65    public Task StartAsync(CancellationToken cancellationToken)
 66    {
 067        lock (_lifecycleLock)
 68        {
 069            _activeStartCount++;
 70
 071            if (_backgroundTask is { IsCompleted: false })
 072                return Task.CompletedTask;
 73
 074            if (_stopTokenSource.IsCancellationRequested)
 75            {
 076                _stopTokenSource.Dispose();
 077                _stopTokenSource = new();
 78            }
 79
 080            var stopToken = _stopTokenSource.Token;
 081            _backgroundTask = Task.Run(() => ProcessQueueAsync(stopToken), CancellationToken.None);
 082        }
 83
 084        return Task.CompletedTask;
 085    }
 86
 87    public async Task StopAsync(CancellationToken cancellationToken)
 88    {
 89        Task? backgroundTask;
 90        CancellationTokenSource stopTokenSource;
 91
 092        lock (_lifecycleLock)
 93        {
 094            if (_activeStartCount == 0)
 095                return;
 96
 097            _activeStartCount--;
 98
 099            if (_activeStartCount > 0)
 0100                return;
 101
 0102            backgroundTask = _backgroundTask;
 0103            stopTokenSource = _stopTokenSource;
 0104        }
 105
 0106        await stopTokenSource.CancelAsync();
 107
 0108        if (backgroundTask != null)
 109        {
 110            try
 111            {
 0112                await backgroundTask.WaitAsync(cancellationToken);
 0113            }
 0114            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested || stopTokenSource.IsCanc
 115            {
 116                // Expected during shutdown; remaining queued writes are flushed below.
 0117            }
 118        }
 119
 0120        using var timeoutTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
 0121        timeoutTokenSource.CancelAfter(options.Value.WriteQueue.ShutdownFlushTimeout);
 122        try
 123        {
 0124            await FlushAsync(timeoutTokenSource.Token);
 0125        }
 0126        catch (OperationCanceledException) when (timeoutTokenSource.IsCancellationRequested)
 127        {
 0128            CountPendingWritesAsDropped();
 0129        }
 0130    }
 131
 132    public async ValueTask FlushAsync(CancellationToken cancellationToken = default)
 133    {
 0134        while (true)
 135        {
 1136            var batch = DequeueBatch();
 1137            if (batch.Count == 0)
 1138                return;
 139
 140            try
 141            {
 0142                await store.WriteManyAsync(batch, cancellationToken);
 0143            }
 0144            catch
 145            {
 0146                Interlocked.Add(ref _droppedWriteCount, batch.Count);
 0147                throw;
 148            }
 0149        }
 1150    }
 151
 152    public async ValueTask DisposeAsync()
 153    {
 2154        if (Interlocked.Exchange(ref _disposed, 1) == 1)
 1155            return;
 156
 157        Task? backgroundTask;
 158        CancellationTokenSource stopTokenSource;
 159
 1160        lock (_lifecycleLock)
 161        {
 1162            backgroundTask = _backgroundTask;
 1163            stopTokenSource = _stopTokenSource;
 1164        }
 165
 1166        await stopTokenSource.CancelAsync();
 1167        using var timeoutTokenSource = new CancellationTokenSource(options.Value.WriteQueue.ShutdownFlushTimeout);
 168
 1169        if (backgroundTask != null)
 170        {
 171            try
 172            {
 0173                await backgroundTask.WaitAsync(timeoutTokenSource.Token);
 0174            }
 0175            catch (OperationCanceledException) when (timeoutTokenSource.IsCancellationRequested || stopTokenSource.IsCan
 176            {
 0177                CountPendingWritesAsDropped();
 0178            }
 179        }
 180
 181        try
 182        {
 1183            await FlushAsync(timeoutTokenSource.Token);
 1184        }
 0185        catch (OperationCanceledException) when (timeoutTokenSource.IsCancellationRequested)
 186        {
 0187            CountPendingWritesAsDropped();
 0188        }
 189
 1190        _signal.Dispose();
 1191        stopTokenSource.Dispose();
 2192    }
 193
 194    private async Task ProcessQueueAsync(CancellationToken cancellationToken)
 195    {
 0196        while (!cancellationToken.IsCancellationRequested)
 197        {
 198            try
 199            {
 0200                await _signal.WaitAsync(options.Value.WriteQueue.FlushInterval, cancellationToken);
 201
 0202                if (cancellationToken.IsCancellationRequested)
 0203                    return;
 204
 0205                await FlushAsync();
 0206            }
 0207            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
 208            {
 0209                return;
 210            }
 0211            catch (ObjectDisposedException) when (cancellationToken.IsCancellationRequested)
 212            {
 0213                return;
 214            }
 0215            catch (Exception e)
 216            {
 0217                Trace.TraceError("Failed to flush structured log writes: {0}", e);
 218
 0219                if (cancellationToken.IsCancellationRequested)
 0220                    return;
 0221            }
 222        }
 0223    }
 224
 225    private IReadOnlyCollection<StructuredLogEvent> DequeueBatch()
 226    {
 1227        var batchSize = Math.Max(1, options.Value.WriteQueue.BatchSize);
 1228        var batch = new List<StructuredLogEvent>(batchSize);
 229
 1230        lock (_queue)
 231        {
 1232            while (_queue.Count > 0 && batch.Count < batchSize)
 0233                batch.Add(_queue.Dequeue());
 1234        }
 235
 1236        return batch;
 237    }
 238
 239    private void CountPendingWritesAsDropped()
 240    {
 0241        lock (_queue)
 242        {
 0243            if (_queue.Count == 0)
 0244                return;
 245
 0246            Interlocked.Add(ref _droppedWriteCount, _queue.Count);
 0247            _queue.Clear();
 0248        }
 0249    }
 250}