< Summary

Information
Class: Elsa.Diagnostics.OpenTelemetry.Providers.InMemory.InMemoryOpenTelemetryLiveFeed
Assembly: Elsa.Diagnostics.OpenTelemetry
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Diagnostics.OpenTelemetry/Providers/InMemory/InMemoryOpenTelemetryLiveFeed.cs
Line coverage
76%
Covered lines: 121
Uncovered lines: 38
Coverable lines: 159
Total lines: 293
Line coverage: 76.1%
Branch coverage
55%
Covered branches: 106
Total branches: 192
Branch coverage: 55.2%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
.ctor(...) | 100% | 1 | 1 | 100%
PublishAsync(...) | 100% | 2 | 2 | 100%
SubscribeAsync() | 50% | 2 | 2 | 90%
.ctor(...) | 100% | 1 | 1 | 100%
get_Channel() | 100% | 1 | 1 | 100%
TryWrite(...) | 100% | 14 | 14 | 100%
TryWrite(...) | 50% | 2 | 2 | 87.5%
MarkRead(...) | 100% | 2 | 2 | 100%
EnsureCapacityForWrite() | 75% | 4 | 4 | 85.71%
TrackDrop(...) | 50% | 4 | 4 | 83.33%
TryWriteDroppedSummary() | 100% | 6 | 6 | 100%
GetSignalType(...) | 12.5% | 31 | 8 | 28.57%
MatchesResource(...) | 50% | 104 | 32 | 58.82%
MatchesTrace(...) | 55.26% | 102 | 38 | 64.7%
MatchesLog(...) | 50% | 115 | 34 | 58.82%
MatchesMetricPoint(...) | 50% | 127 | 36 | 58.82%
ResolveServiceResourceIds(...) | 100% | 2 | 2 | 100%
AttributeMatches(...) | 0% | 6 | 2 | 0%
EqualsIgnoreCase(...) | 100% | 1 | 1 | 100%
Matches(...) | 0% | 20 | 4 | 0%

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Diagnostics.OpenTelemetry/Providers/InMemory/InMemoryOpenTelemetryLiveFeed.cs

# | Line | Line coverage
 1using System.Runtime.CompilerServices;
 2using System.Threading.Channels;
 3using Elsa.Diagnostics.OpenTelemetry.Contracts;
 4using Elsa.Diagnostics.OpenTelemetry.Models;
 5using Elsa.Diagnostics.OpenTelemetry.Options;
 6using Microsoft.Extensions.Options;
 7
 8namespace Elsa.Diagnostics.OpenTelemetry.Providers.InMemory;
 9
 1010public class InMemoryOpenTelemetryLiveFeed(IOptions<OpenTelemetryDiagnosticsOptions> options) : IOpenTelemetryLiveFeed
 11{
 1012    private readonly OpenTelemetryDiagnosticsOptions _options = options.Value;
 1013    private readonly object _subscribersLock = new();
 1014    private readonly List<OpenTelemetrySubscriber> _subscribers = [];
 15
 16    public ValueTask PublishAsync(OpenTelemetryBatch batch, CancellationToken cancellationToken = default)
 17    {
 18        List<OpenTelemetrySubscriber> subscribers;
 819        lock (_subscribersLock)
 820            subscribers = _subscribers.ToList();
 21
 2622        foreach (var subscriber in subscribers)
 523            subscriber.TryWrite(batch);
 24
 825        return ValueTask.CompletedTask;
 26    }
 27
 28    public async IAsyncEnumerable<OpenTelemetryStreamItem> SubscribeAsync(OpenTelemetryTraceFilter filter, [EnumeratorCa
 29    {
 430        var subscriber = new OpenTelemetrySubscriber(filter, _options.SubscriberChannelCapacity);
 31
 432        lock (_subscribersLock)
 433            _subscribers.Add(subscriber);
 34
 35        try
 36        {
 1837            await foreach (var item in subscriber.Channel.Reader.ReadAllAsync(cancellationToken))
 38            {
 739                subscriber.MarkRead(item);
 740                yield return item;
 41            }
 042        }
 43        finally
 44        {
 445            lock (_subscribersLock)
 446                _subscribers.Remove(subscriber);
 47        }
 448    }
 49
 450    private sealed class OpenTelemetrySubscriber(OpenTelemetryTraceFilter filter, int channelCapacity)
 51    {
 452        private readonly object _lock = new();
 453        private readonly int _capacity = Math.Max(1, channelCapacity);
 54        private int _pendingItemCount;
 455        private readonly Dictionary<OpenTelemetrySignalType, long> _droppedSinceLastSummary = [];
 56
 1957        public Channel<OpenTelemetryStreamItem> Channel { get; } = System.Threading.Channels.Channel.CreateBounded<OpenT
 458        {
 459            SingleReader = true,
 460            SingleWriter = false,
 461            FullMode = BoundedChannelFullMode.Wait
 462        });
 63
 64        public void TryWrite(OpenTelemetryBatch batch)
 65        {
 566            var serviceResourceIds = ResolveServiceResourceIds(batch.Resources);
 67
 1268            foreach (var resource in batch.Resources.Where(MatchesResource))
 169                TryWrite(new OpenTelemetryStreamItem { Resource = resource });
 70
 2771            foreach (var trace in batch.Traces.Where(trace => MatchesTrace(trace, serviceResourceIds)))
 572                TryWrite(new OpenTelemetryStreamItem { Trace = trace });
 73
 1474            foreach (var log in batch.Logs.Where(log => MatchesLog(log, serviceResourceIds)))
 175                TryWrite(new OpenTelemetryStreamItem { Log = log });
 76
 1477            foreach (var point in batch.MetricPoints.Where(point => MatchesMetricPoint(point, serviceResourceIds)))
 178                TryWrite(new OpenTelemetryStreamItem { MetricPoint = point });
 579        }
 80
 81        private void TryWrite(OpenTelemetryStreamItem item)
 82        {
 883            lock (_lock)
 84            {
 885                EnsureCapacityForWrite();
 886                if (Channel.Writer.TryWrite(item))
 887                    _pendingItemCount++;
 88                else
 089                    TrackDrop(item);
 90
 891                TryWriteDroppedSummary();
 892            }
 893        }
 94
 95        public void MarkRead(OpenTelemetryStreamItem item)
 96        {
 797            lock (_lock)
 98            {
 799                _pendingItemCount = Math.Max(0, _pendingItemCount - 1);
 100
 7101                if (item.DroppedItems != null)
 1102                    _droppedSinceLastSummary.Remove(item.DroppedItems.SignalType);
 103
 7104                TryWriteDroppedSummary();
 7105            }
 7106        }
 107
 108        private void EnsureCapacityForWrite()
 109        {
 8110            if (_pendingItemCount < _capacity)
 6111                return;
 112
 2113            if (!Channel.Reader.TryRead(out var dropped))
 0114                return;
 115
 2116            _pendingItemCount = Math.Max(0, _pendingItemCount - 1);
 2117            TrackDrop(dropped);
 2118        }
 119
 120        private void TrackDrop(OpenTelemetryStreamItem item)
 121        {
 2122            var signalType = GetSignalType(item);
 2123            if (signalType == null)
 0124                return;
 125
 2126            _droppedSinceLastSummary.TryGetValue(signalType.Value, out var count);
 2127            _droppedSinceLastSummary[signalType.Value] = count + (item.DroppedItems?.Count ?? 1);
 2128        }
 129
 130        private void TryWriteDroppedSummary()
 131        {
 15132            if (_droppedSinceLastSummary.Count == 0)
 12133                return;
 134
 3135            if (_pendingItemCount >= _capacity)
 2136                return;
 137
 2138            var dropped = _droppedSinceLastSummary.OrderBy(x => x.Key).First();
 1139            if (Channel.Writer.TryWrite(new OpenTelemetryStreamItem { DroppedItems = new(dropped.Key, dropped.Value, "Su
 1140                _pendingItemCount++;
 1141        }
 142
 143        private static OpenTelemetrySignalType? GetSignalType(OpenTelemetryStreamItem item)
 144        {
 2145            if (item.Trace != null)
 2146                return OpenTelemetrySignalType.Trace;
 147
 0148            if (item.MetricPoint != null)
 0149                return OpenTelemetrySignalType.Metric;
 150
 0151            if (item.Log != null)
 0152                return OpenTelemetrySignalType.Log;
 153
 0154            return item.DroppedItems?.SignalType;
 155        }
 156
 157        private bool MatchesResource(TelemetryResource resource)
 158        {
 2159            if (!string.IsNullOrWhiteSpace(filter.TraceId))
 0160                return false;
 161
 2162            if (!string.IsNullOrWhiteSpace(filter.WorkflowInstanceId))
 0163                return false;
 164
 2165            if (filter.Status != null)
 0166                return false;
 167
 2168            if (!string.IsNullOrWhiteSpace(filter.ResourceId) && !EqualsIgnoreCase(resource.Id, filter.ResourceId))
 0169                return false;
 170
 2171            if (!string.IsNullOrWhiteSpace(filter.ServiceName) && !EqualsIgnoreCase(resource.ServiceName, filter.Service
 1172                return false;
 173
 1174            if (filter.From != null && resource.LastSeen < filter.From)
 0175                return false;
 176
 1177            if (filter.To != null && resource.LastSeen > filter.To)
 0178                return false;
 179
 1180            if (!string.IsNullOrWhiteSpace(filter.Search) && !Matches(resource.Id, filter.Search) && !Matches(resource.S
 0181                return false;
 182
 1183            return true;
 184        }
 185
 186        private bool MatchesTrace(TelemetryTrace trace, HashSet<string>? serviceResourceIds)
 187        {
 7188            if (!string.IsNullOrWhiteSpace(filter.TraceId) && !EqualsIgnoreCase(trace.TraceId, filter.TraceId))
 1189                return false;
 190
 6191            if (filter.Status != null && trace.Status != filter.Status)
 0192                return false;
 193
 6194            if (filter.From != null && trace.StartTime < filter.From)
 0195                return false;
 196
 6197            if (filter.To != null && trace.StartTime > filter.To)
 0198                return false;
 199
 6200            if (!string.IsNullOrWhiteSpace(filter.WorkflowInstanceId) && !trace.WorkflowInstanceIds.Any(id => Matches(id
 0201                return false;
 202
 6203            if (!string.IsNullOrWhiteSpace(filter.ResourceId) && !trace.ResourceIds.Contains(filter.ResourceId, StringCo
 0204                return false;
 205
 6206            if (serviceResourceIds != null && !trace.ResourceIds.Any(serviceResourceIds.Contains))
 1207                return false;
 208
 5209            if (!string.IsNullOrWhiteSpace(filter.Search) && !Matches(trace.TraceId, filter.Search) && !Matches(trace.Na
 0210                return false;
 211
 5212            return true;
 213        }
 214
 215        private bool MatchesLog(OtlpLogRecord log, HashSet<string>? serviceResourceIds)
 216        {
 2217            if (filter.Status != null)
 0218                return false;
 219
 2220            if (!string.IsNullOrWhiteSpace(filter.ResourceId) && !EqualsIgnoreCase(log.ResourceId, filter.ResourceId))
 1221                return false;
 222
 1223            if (serviceResourceIds != null && !serviceResourceIds.Contains(log.ResourceId))
 0224                return false;
 225
 1226            if (!string.IsNullOrWhiteSpace(filter.TraceId) && !EqualsIgnoreCase(log.TraceId, filter.TraceId))
 0227                return false;
 228
 1229            if (!string.IsNullOrWhiteSpace(filter.WorkflowInstanceId) && !AttributeMatches(log.Attributes, "workflow.ins
 0230                return false;
 231
 1232            if (filter.From != null && log.Timestamp < filter.From)
 0233                return false;
 234
 1235            if (filter.To != null && log.Timestamp > filter.To)
 0236                return false;
 237
 1238            if (!string.IsNullOrWhiteSpace(filter.Search) && !Matches(log.Body, filter.Search))
 0239                return false;
 240
 1241            return true;
 242        }
 243
 244        private bool MatchesMetricPoint(MetricPoint point, HashSet<string>? serviceResourceIds)
 245        {
 2246            if (filter.Status != null)
 0247                return false;
 248
 2249            if (!string.IsNullOrWhiteSpace(filter.ResourceId) && !EqualsIgnoreCase(point.ResourceId, filter.ResourceId))
 1250                return false;
 251
 1252            if (serviceResourceIds != null && !serviceResourceIds.Contains(point.ResourceId))
 0253                return false;
 254
 1255            if (!string.IsNullOrWhiteSpace(filter.TraceId) && !EqualsIgnoreCase(point.TraceId, filter.TraceId))
 0256                return false;
 257
 1258            if (!string.IsNullOrWhiteSpace(filter.WorkflowInstanceId) && !AttributeMatches(point.Attributes, "workflow.i
 0259                return false;
 260
 1261            if (filter.From != null && point.Timestamp < filter.From)
 0262                return false;
 263
 1264            if (filter.To != null && point.Timestamp > filter.To)
 0265                return false;
 266
 1267            if (!string.IsNullOrWhiteSpace(filter.Search) && !Matches(point.InstrumentName, filter.Search) && !Matches(p
 0268                return false;
 269
 1270            return true;
 271        }
 272
 273        private HashSet<string>? ResolveServiceResourceIds(IEnumerable<TelemetryResource> resources)
 274        {
 5275            if (string.IsNullOrWhiteSpace(filter.ServiceName))
 3276                return null;
 277
 2278            return resources
 2279                .Where(x => EqualsIgnoreCase(x.ServiceName, filter.ServiceName))
 1280                .Select(x => x.Id)
 2281                .ToHashSet(StringComparer.OrdinalIgnoreCase);
 282        }
 283
 284        private static bool AttributeMatches(IDictionary<string, string?> attributes, string key, string? search)
 285        {
 0286            return attributes.TryGetValue(key, out var value) && Matches(value, search);
 287        }
 288
 10289        private static bool EqualsIgnoreCase(string? candidate, string? search) => string.Equals(candidate, search, Stri
 290
 0291        private static bool Matches(string? candidate, string? search) => !string.IsNullOrEmpty(candidate) && !string.Is
 292    }
 293}

Methods/Properties

.ctor(Microsoft.Extensions.Options.IOptions`1<Elsa.Diagnostics.OpenTelemetry.Options.OpenTelemetryDiagnosticsOptions>)
PublishAsync(Elsa.Diagnostics.OpenTelemetry.Models.OpenTelemetryBatch,System.Threading.CancellationToken)
SubscribeAsync()
.ctor(Elsa.Diagnostics.OpenTelemetry.Models.OpenTelemetryTraceFilter,System.Int32)
get_Channel()
TryWrite(Elsa.Diagnostics.OpenTelemetry.Models.OpenTelemetryBatch)
TryWrite(Elsa.Diagnostics.OpenTelemetry.Models.OpenTelemetryStreamItem)
MarkRead(Elsa.Diagnostics.OpenTelemetry.Models.OpenTelemetryStreamItem)
EnsureCapacityForWrite()
TrackDrop(Elsa.Diagnostics.OpenTelemetry.Models.OpenTelemetryStreamItem)
TryWriteDroppedSummary()
GetSignalType(Elsa.Diagnostics.OpenTelemetry.Models.OpenTelemetryStreamItem)
MatchesResource(Elsa.Diagnostics.OpenTelemetry.Models.TelemetryResource)
MatchesTrace(Elsa.Diagnostics.OpenTelemetry.Models.TelemetryTrace,System.Collections.Generic.HashSet`1<System.String>)
MatchesLog(Elsa.Diagnostics.OpenTelemetry.Models.OtlpLogRecord,System.Collections.Generic.HashSet`1<System.String>)
MatchesMetricPoint(Elsa.Diagnostics.OpenTelemetry.Models.MetricPoint,System.Collections.Generic.HashSet`1<System.String>)
ResolveServiceResourceIds(System.Collections.Generic.IEnumerable`1<Elsa.Diagnostics.OpenTelemetry.Models.TelemetryResource>)
AttributeMatches(System.Collections.Generic.IDictionary`2<System.String,System.String>,System.String,System.String)
EqualsIgnoreCase(System.String,System.String)
Matches(System.String,System.String)