| | | 1 | | using System.Buffers; |
| | | 2 | | using Elsa.Diagnostics.OpenTelemetry.Options; |
| | | 3 | | using Elsa.Diagnostics.OpenTelemetry.Contracts; |
| | | 4 | | using Elsa.Diagnostics.OpenTelemetry.Ingestion; |
| | | 5 | | using Elsa.Diagnostics.OpenTelemetry.Ingestion.HttpProtobuf; |
| | | 6 | | using Elsa.Diagnostics.OpenTelemetry.Models; |
| | | 7 | | using Elsa.Diagnostics.OpenTelemetry.RealTime; |
| | | 8 | | using Microsoft.AspNetCore.Http; |
| | | 9 | | using Microsoft.AspNetCore.Builder; |
| | | 10 | | using Microsoft.AspNetCore.Routing; |
| | | 11 | | using Microsoft.Extensions.DependencyInjection; |
| | | 12 | | using Microsoft.Extensions.Options; |
| | | 13 | | |
| | | 14 | | namespace Elsa.Diagnostics.OpenTelemetry.Extensions; |
| | | 15 | | |
| | | 16 | | public static class EndpointRouteBuilderExtensions |
| | | 17 | | { |
| | | 18 | | public const string DefaultHubRoute = "/elsa/hubs/diagnostics/opentelemetry"; |
| | | 19 | | |
| | | 20 | | public static void MapOpenTelemetryHub(this IEndpointRouteBuilder endpoints) |
| | | 21 | | { |
| | 0 | 22 | | var options = endpoints.ServiceProvider.GetRequiredService<IOptions<OpenTelemetryDiagnosticsOptions>>().Value; |
| | 0 | 23 | | endpoints.MapHub<OpenTelemetryHub>(string.IsNullOrWhiteSpace(options.HubRoute) ? DefaultHubRoute : options.HubRo |
| | 0 | 24 | | } |
| | | 25 | | |
| | | 26 | | public static void MapOpenTelemetryHttpProtobufCollector(this IEndpointRouteBuilder endpoints) |
| | | 27 | | { |
| | 6 | 28 | | var options = endpoints.ServiceProvider.GetRequiredService<IOptions<OpenTelemetryDiagnosticsOptions>>().Value; |
| | 6 | 29 | | var basePath = (string.IsNullOrWhiteSpace(options.HttpEndpointPath) ? "/elsa/otlp/v1" : options.HttpEndpointPath |
| | | 30 | | |
| | 6 | 31 | | endpoints.MapPost($"{basePath}/traces", static async (HttpContext httpContext, IOpenTelemetryIngestor ingestor, |
| | 6 | 32 | | { |
| | 10 | 33 | | return await IngestAsync(httpContext, ingestor, options, payload => OtlpHttpProtobufParser.ParseTraces(paylo |
| | 12 | 34 | | }); |
| | | 35 | | |
| | 6 | 36 | | endpoints.MapPost($"{basePath}/metrics", static async (HttpContext httpContext, IOpenTelemetryIngestor ingestor, |
| | 6 | 37 | | { |
| | 0 | 38 | | return await IngestAsync(httpContext, ingestor, options, payload => OtlpHttpProtobufParser.ParseMetrics(payl |
| | 6 | 39 | | }); |
| | | 40 | | |
| | 6 | 41 | | endpoints.MapPost($"{basePath}/logs", static async (HttpContext httpContext, IOpenTelemetryIngestor ingestor, IO |
| | 6 | 42 | | { |
| | 0 | 43 | | return await IngestAsync(httpContext, ingestor, options, payload => OtlpHttpProtobufParser.ParseLogs(payload |
| | 6 | 44 | | }); |
| | 6 | 45 | | } |
| | | 46 | | |
| | | 47 | | public static void MapOpenTelemetryGrpcCollector(this IEndpointRouteBuilder endpoints) |
| | | 48 | | { |
| | 0 | 49 | | var options = endpoints.ServiceProvider.GetRequiredService<IOptions<OpenTelemetryDiagnosticsOptions>>().Value; |
| | | 50 | | |
| | 0 | 51 | | if (!options.EnableGrpc) |
| | 0 | 52 | | return; |
| | | 53 | | |
| | 0 | 54 | | if (string.IsNullOrWhiteSpace(options.GrpcEndpointPath)) |
| | 0 | 55 | | throw new InvalidOperationException("OpenTelemetry gRPC ingestion is enabled, but no gRPC endpoint path was |
| | | 56 | | |
| | | 57 | | // The actual gRPC service binding is host-specific. This module exposes shared ingestion |
| | | 58 | | // contracts and accurate collector metadata without forcing every host to reference gRPC. |
| | 0 | 59 | | } |
| | | 60 | | |
| | | 61 | | private static async Task<IResult> IngestAsync( |
| | | 62 | | HttpContext httpContext, |
| | | 63 | | IOpenTelemetryIngestor ingestor, |
| | | 64 | | IOptions<OpenTelemetryDiagnosticsOptions> options, |
| | | 65 | | Func<ReadOnlyMemory<byte>, OpenTelemetryBatch> parse, |
| | | 66 | | CancellationToken cancellationToken) |
| | | 67 | | { |
| | 6 | 68 | | if (!OtlpIngestionSecurity.IsAuthorized(httpContext, options.Value)) |
| | 1 | 69 | | return Results.Unauthorized(); |
| | | 70 | | |
| | | 71 | | try |
| | | 72 | | { |
| | 5 | 73 | | var payload = await ReadBodyAsync(httpContext, options.Value.MaxHttpRequestBodySize, cancellationToken); |
| | 4 | 74 | | await ingestor.IngestAsync(parse(payload), cancellationToken); |
| | 3 | 75 | | return Results.Ok(); |
| | | 76 | | } |
| | 1 | 77 | | catch (RequestBodyTooLargeException) |
| | | 78 | | { |
| | 1 | 79 | | return Results.StatusCode(StatusCodes.Status413PayloadTooLarge); |
| | | 80 | | } |
| | 1 | 81 | | catch (InvalidDataException) |
| | | 82 | | { |
| | 1 | 83 | | return Results.BadRequest(); |
| | | 84 | | } |
| | 6 | 85 | | } |
| | | 86 | | |
| | | 87 | | private static async Task<ReadOnlyMemory<byte>> ReadBodyAsync(HttpContext httpContext, long maxBodySize, Cancellatio |
| | | 88 | | { |
| | 5 | 89 | | using var stream = new MemoryStream(); |
| | 5 | 90 | | var buffer = ArrayPool<byte>.Shared.Rent(81920); |
| | 5 | 91 | | var totalBytes = 0L; |
| | | 92 | | |
| | | 93 | | try |
| | | 94 | | { |
| | | 95 | | int read; |
| | 7 | 96 | | while ((read = await httpContext.Request.Body.ReadAsync(buffer.AsMemory(0, buffer.Length), cancellationToken |
| | | 97 | | { |
| | 3 | 98 | | totalBytes += read; |
| | 3 | 99 | | if (totalBytes > maxBodySize) |
| | 1 | 100 | | throw new RequestBodyTooLargeException(); |
| | | 101 | | |
| | 2 | 102 | | stream.Write(buffer, 0, read); |
| | | 103 | | } |
| | 4 | 104 | | } |
| | | 105 | | finally |
| | | 106 | | { |
| | 5 | 107 | | ArrayPool<byte>.Shared.Return(buffer); |
| | | 108 | | } |
| | | 109 | | |
| | 4 | 110 | | return stream.ToArray(); |
| | 4 | 111 | | } |
| | | 112 | | |
| | | 113 | | private sealed class RequestBodyTooLargeException : Exception; |
| | | 114 | | } |