< Summary

Information
Class: Elsa.Http.Middleware.HttpWorkflowsMiddleware
Assembly: Elsa.Http
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Http/Middleware/HttpWorkflowsMiddleware.cs
Line coverage
67%
Covered lines: 135
Uncovered lines: 66
Coverable lines: 201
Total lines: 389
Line coverage: 67.1%
Branch coverage
67%
Covered branches: 35
Total branches: 52
Branch coverage: 67.3%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
InvokeAsync()70.83%252487.23%
FindWorkflowGraphAsync()100%210%
FindTriggersAsync()100%210%
FindBookmarksAsync()100%11100%
StartWorkflowAsync()100%11100%
ResumeWorkflowAsync()50%6682.6%
ExecuteWorkflowAsync()50%2281.81%
<ExecuteWorkflowAsync()100%22100%
ExecuteWithinTimeoutAsync()50%4227.27%
GetMatchingRoute(...)100%44100%
GetCorrelationIdAsync()100%44100%
GetWorkflowInstanceIdAsync()100%44100%
WriteResponseAsync()0%620%
HandleMultipleWorkflowsFoundAsync()100%210%
HandleWorkflowFaultAsync()50%8437.5%
AuthorizeAsync()50%2280%
ComputeBookmarkHash(...)100%11100%

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Http/Middleware/HttpWorkflowsMiddleware.cs

#LineLine coverage
 1using Elsa.Extensions;
 2using Elsa.Http.Bookmarks;
 3using Elsa.Http.Options;
 4using Elsa.Workflows.Runtime.Filters;
 5using JetBrains.Annotations;
 6using Microsoft.AspNetCore.Http;
 7using Microsoft.Extensions.DependencyInjection;
 8using Microsoft.Extensions.Options;
 9using System.Net;
 10using System.Net.Mime;
 11using System.Text.Json;
 12using Elsa.Workflows.Activities;
 13using Elsa.Workflows.Runtime.Entities;
 14using FastEndpoints;
 15using System.Diagnostics.CodeAnalysis;
 16using Elsa.Workflows;
 17using Elsa.Workflows.Management;
 18using Elsa.Workflows.Management.Entities;
 19using Elsa.Workflows.Models;
 20using Elsa.Workflows.Options;
 21using Elsa.Workflows.Runtime;
 22using Open.Linq.AsyncExtensions;
 23
 24namespace Elsa.Http.Middleware;
 25
 26/// <summary>
 27/// An ASP.NET middleware component that tries to match the inbound request path to an associated workflow and then run 
 28/// </summary>
 29[PublicAPI]
 330public class HttpWorkflowsMiddleware(RequestDelegate next)
 31{
 32    /// <summary>
 33    /// Attempts to match the inbound request path to an associated workflow and then run that workflow.
 34    /// </summary>
 35    [RequiresUnreferencedCode("Calls System.Text.Json.JsonSerializer.Serialize<TValue>(TValue, JsonSerializerOptions)")]
 36    public async Task InvokeAsync(
 37        HttpContext httpContext,
 38        IServiceProvider serviceProvider,
 39        IOptions<HttpActivityOptions> options,
 40        IHttpWorkflowLookupService httpWorkflowLookupService)
 41    {
 29842        var path = httpContext.Request.Path.Value!.NormalizeRoute();
 29843        var matchingPath = GetMatchingRoute(serviceProvider, path).Route;
 29844        var basePath = options.Value.BasePath?.ToString().NormalizeRoute();
 45
 46        // If the request path does not match the configured base path to handle workflows, then skip.
 29847        if (!string.IsNullOrWhiteSpace(basePath))
 48        {
 29849            if (!path.StartsWith(basePath, StringComparison.OrdinalIgnoreCase))
 50            {
 2851                await next(httpContext);
 2852                return;
 53            }
 54
 55            // Strip the base path.
 27056            matchingPath = matchingPath[basePath.Length..];
 57        }
 58
 59        // Graceful-shutdown gate: when the runtime is paused or draining, we don't accept new HTTP-triggered work.
 60        // The ingress source registry visibility is provided by HttpTriggerIngressSource — this is the actual mechanism
 27061        var quiescenceSignal = serviceProvider.GetService<IQuiescenceSignal>();
 27062        if (quiescenceSignal is not null && !quiescenceSignal.IsAcceptingNewWork)
 63        {
 064            httpContext.Response.StatusCode = (int)HttpStatusCode.ServiceUnavailable;
 65            // Retry-After is reason-aware: drain is short (host is exiting and will be replaced shortly), but an
 66            // administrative pause is indefinite, so a longer back-off avoids a tight retry loop while operators
 67            // perform maintenance.
 068            httpContext.Response.Headers.RetryAfter = quiescenceSignal.CurrentState.Reason.HasFlag(QuiescenceReason.Drai
 069            return;
 70        }
 71
 27072        matchingPath = matchingPath.NormalizeRoute();
 73
 27074        var input = new Dictionary<string, object>
 27075        {
 27076            [HttpEndpoint.HttpContextInputKey] = true,
 27077            [HttpEndpoint.PathInputKey] = path
 27078        };
 79
 27080        var cancellationToken = httpContext.RequestAborted;
 27081        var request = httpContext.Request;
 27082        var method = request.Method.ToLowerInvariant();
 27083        var workflowInstanceId = await GetWorkflowInstanceIdAsync(serviceProvider, httpContext, cancellationToken);
 27084        var correlationId = await GetCorrelationIdAsync(serviceProvider, httpContext, cancellationToken);
 27085        var bookmarkHash = ComputeBookmarkHash(serviceProvider, matchingPath, method);
 27086        var lookupResult = await httpWorkflowLookupService.FindWorkflowAsync(bookmarkHash, cancellationToken);
 87
 27088        if (lookupResult != null)
 89        {
 26390            var triggers = lookupResult.Triggers;
 91
 26392            if (triggers.Count > 1)
 93            {
 094                await HandleMultipleWorkflowsFoundAsync(httpContext, () => triggers.Select(x => new
 095                {
 096                    x.WorkflowDefinitionId
 097                }), cancellationToken);
 098                return;
 99            }
 100
 263101            var trigger = triggers.FirstOrDefault();
 263102            if (trigger != null)
 103            {
 263104                var workflowGraph = lookupResult.WorkflowGraph!;
 263105                await StartWorkflowAsync(httpContext, trigger, workflowGraph, input, workflowInstanceId, correlationId);
 263106                return;
 107            }
 108        }
 109
 7110        var bookmarks = await FindBookmarksAsync(serviceProvider, bookmarkHash, workflowInstanceId, correlationId, cance
 111
 7112        if (bookmarks.Count > 1)
 113        {
 0114            await HandleMultipleWorkflowsFoundAsync(httpContext, () => bookmarks.Select(x => new
 0115            {
 0116                x.WorkflowInstanceId
 0117            }), cancellationToken);
 0118            return;
 119        }
 120
 7121        var bookmark = bookmarks.SingleOrDefault();
 122
 7123        if (bookmark != null)
 124        {
 2125            await ResumeWorkflowAsync(httpContext, bookmark, input, correlationId);
 2126            return;
 127        }
 128
 129        // If a base path was configured, the requester tried to execute a workflow that doesn't exist.
 5130        if (basePath != null)
 131        {
 5132            await httpContext.Response.SendNotFoundAsync(cancellation: cancellationToken);
 5133            return;
 134        }
 135
 136        // If no base path was configured, the request should be handled by subsequent middlewares.
 0137        await next(httpContext);
 298138    }
 139
 140    private async Task<WorkflowGraph?> FindWorkflowGraphAsync(IServiceProvider serviceProvider, StoredTrigger trigger, C
 141    {
 0142        var workflowDefinitionService = serviceProvider.GetRequiredService<IWorkflowDefinitionService>();
 0143        var workflowDefinitionId = trigger.WorkflowDefinitionVersionId;
 0144        return await workflowDefinitionService.FindWorkflowGraphAsync(workflowDefinitionId, cancellationToken);
 0145    }
 146
 147    private async Task<IEnumerable<StoredTrigger>> FindTriggersAsync(IServiceProvider serviceProvider, string bookmarkHa
 148    {
 0149        var triggerStore = serviceProvider.GetRequiredService<ITriggerStore>();
 0150        var triggerFilter = new TriggerFilter
 0151        {
 0152            Hash = bookmarkHash
 0153        };
 0154        return await triggerStore.FindManyAsync(triggerFilter, cancellationToken);
 0155    }
 156
 157    private async Task<IEnumerable<StoredBookmark>> FindBookmarksAsync(IServiceProvider serviceProvider, string bookmark
 158    {
 7159        var bookmarkStore = serviceProvider.GetRequiredService<IBookmarkStore>();
 7160        var bookmarkFilter = new BookmarkFilter
 7161        {
 7162            Hash = bookmarkHash,
 7163            WorkflowInstanceId = workflowInstanceId,
 7164            CorrelationId = correlationId,
 7165            TenantAgnostic = true
 7166        };
 7167        return await bookmarkStore.FindManyAsync(bookmarkFilter, cancellationToken);
 7168    }
 169
 170    private async Task StartWorkflowAsync(HttpContext httpContext, StoredTrigger trigger, WorkflowGraph workflowGraph, I
 171    {
 263172        var bookmarkPayload = trigger.GetPayload<HttpEndpointBookmarkPayload>();
 263173        var workflowOptions = new RunWorkflowOptions
 263174        {
 263175            Input = input,
 263176            CorrelationId = correlationId,
 263177            TriggerActivityId = trigger.ActivityId,
 263178            WorkflowInstanceId = workflowInstanceId
 263179        };
 180
 263181        await ExecuteWorkflowAsync(httpContext, workflowGraph, workflowOptions, bookmarkPayload, null, input);
 263182    }
 183
 184    private async Task ResumeWorkflowAsync(HttpContext httpContext, StoredBookmark bookmark, IDictionary<string, object>
 185    {
 2186        var serviceProvider = httpContext.RequestServices;
 2187        var cancellationToken = httpContext.RequestAborted;
 2188        var bookmarkPayload = bookmark.GetPayload<HttpEndpointBookmarkPayload>();
 2189        var workflowInstanceStore = serviceProvider.GetRequiredService<IWorkflowInstanceStore>();
 2190        var workflowInstance = await workflowInstanceStore.FindAsync(bookmark.WorkflowInstanceId, cancellationToken);
 191
 2192        if (workflowInstance == null)
 193        {
 0194            await httpContext.Response.SendNotFoundAsync(cancellation: cancellationToken);
 0195            return;
 196        }
 197
 2198        var workflowDefinitionService = serviceProvider.GetRequiredService<IWorkflowDefinitionService>();
 2199        var workflowGraph = await workflowDefinitionService.FindWorkflowGraphAsync(workflowInstance.DefinitionVersionId,
 200
 2201        if (workflowGraph == null)
 202        {
 0203            await httpContext.Response.SendNotFoundAsync(cancellation: cancellationToken);
 0204            return;
 205        }
 206
 2207        var runWorkflowParams = new RunWorkflowOptions
 2208        {
 2209            WorkflowInstanceId = workflowInstance.Id,
 2210            Input = input,
 2211            CorrelationId = correlationId,
 2212            ActivityHandle = bookmark.ActivityInstanceId != null ? ActivityHandle.FromActivityInstanceId(bookmark.Activi
 2213            BookmarkId = bookmark.Id
 2214        };
 215
 2216        await ExecuteWorkflowAsync(httpContext, workflowGraph, runWorkflowParams, bookmarkPayload, workflowInstance, inp
 2217    }
 218
 219    private async Task ExecuteWorkflowAsync(HttpContext httpContext, WorkflowGraph workflowGraph, RunWorkflowOptions wor
 220    {
 265221        var serviceProvider = httpContext.RequestServices;
 265222        var cancellationToken = httpContext.RequestAborted;
 265223        var workflow = workflowGraph.Workflow;
 224
 265225        if (!await AuthorizeAsync(serviceProvider, httpContext, workflow, bookmarkPayload, cancellationToken))
 226        {
 0227            httpContext.Response.StatusCode = (int)HttpStatusCode.Unauthorized;
 0228            return;
 229        }
 230
 265231        var workflowRunner = serviceProvider.GetRequiredService<IWorkflowRunner>();
 265232        var result = await ExecuteWithinTimeoutAsync(async ct =>
 265233        {
 265234            if (workflowInstance == null)
 263235                return await workflowRunner.RunAsync(workflowGraph, workflowOptions, ct);
 2236            return await workflowRunner.RunAsync(workflow, workflowInstance.WorkflowState, workflowOptions, ct);
 530237        }, bookmarkPayload.RequestTimeout, httpContext);
 265238        await HandleWorkflowFaultAsync(serviceProvider, httpContext, result, cancellationToken);
 265239    }
 240
 241    private async Task<T> ExecuteWithinTimeoutAsync<T>(Func<CancellationToken, Task<T>> action, TimeSpan? requestTimeout
 242    {
 243        // If no request timeout is specified, execute the action without any timeout.
 265244        if (requestTimeout == null)
 265245            return await action(httpContext.RequestAborted);
 246
 247        // Create a combined cancellation token that cancels when the request is aborted or when the request timeout is 
 0248        using var requestTimeoutCancellationTokenSource = new CancellationTokenSource();
 0249        requestTimeoutCancellationTokenSource.CancelAfter(requestTimeout.Value);
 0250        using var combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(httpContext.RequestAborted, requ
 0251        var originalCancellationToken = httpContext.RequestAborted;
 252
 253        // Replace the original cancellation token with the combined one.
 0254        httpContext.RequestAborted = combinedTokenSource.Token;
 255
 256        // Execute the action.
 0257        var result = await action(httpContext.RequestAborted);
 258
 259        // Restore the original cancellation token.
 0260        httpContext.RequestAborted = originalCancellationToken;
 261
 0262        return result;
 265263    }
 264
 265    private HttpRouteData GetMatchingRoute(IServiceProvider serviceProvider, string path)
 266    {
 298267        var routeMatcher = serviceProvider.GetRequiredService<IRouteMatcher>();
 298268        var routeTable = serviceProvider.GetRequiredService<IRouteTable>();
 269
 298270        var matchingRouteQuery =
 298271            from routeData in routeTable
 2829272            let routeValues = routeMatcher.Match(routeData.Route, path)
 2829273            where routeValues != null
 566274            select new
 566275            {
 566276                route = routeData,
 566277                routeValues
 566278            };
 279
 298280        var matchingRoute = matchingRouteQuery.FirstOrDefault();
 298281        var routeTemplate = matchingRoute?.route ?? new HttpRouteData(path);
 282
 298283        return routeTemplate;
 284    }
 285
 286    private async Task<string?> GetCorrelationIdAsync(IServiceProvider serviceProvider, HttpContext httpContext, Cancell
 287    {
 270288        var correlationIdSelectors = serviceProvider.GetServices<IHttpCorrelationIdSelector>();
 289
 270290        var correlationId = default(string);
 291
 2156292        foreach (var selector in correlationIdSelectors.OrderByDescending(x => x.Priority))
 293        {
 540294            correlationId = await selector.GetCorrelationIdAsync(httpContext, cancellationToken);
 295
 540296            if (correlationId != null)
 297                break;
 298        }
 299
 270300        return correlationId;
 270301    }
 302
 303    private async Task<string?> GetWorkflowInstanceIdAsync(IServiceProvider serviceProvider, HttpContext httpContext, Ca
 304    {
 270305        var workflowInstanceIdSelectors = serviceProvider.GetServices<IHttpWorkflowInstanceIdSelector>();
 306
 270307        var workflowInstanceId = default(string);
 308
 2156309        foreach (var selector in workflowInstanceIdSelectors.OrderByDescending(x => x.Priority))
 310        {
 540311            workflowInstanceId = await selector.GetWorkflowInstanceIdAsync(httpContext, cancellationToken);
 312
 540313            if (workflowInstanceId != null)
 314                break;
 315        }
 316
 270317        return workflowInstanceId;
 270318    }
 319
 320    [RequiresUnreferencedCode("Calls System.Text.Json.JsonSerializer.Serialize<TValue>(TValue, JsonSerializerOptions)")]
 321    private static async Task WriteResponseAsync(HttpContext httpContext, CancellationToken cancellationToken)
 322    {
 0323        var response = httpContext.Response;
 324
 0325        if (!response.HasStarted)
 326        {
 0327            response.ContentType = MediaTypeNames.Application.Json;
 0328            response.StatusCode = StatusCodes.Status200OK;
 329
 0330            var model = new
 0331            {
 0332                workflowInstanceIds = Array.Empty<string>(),
 0333            };
 334
 0335            var json = JsonSerializer.Serialize(model);
 0336            await response.WriteAsync(json, cancellationToken);
 337        }
 0338    }
 339
 340    [RequiresUnreferencedCode("Calls System.Text.Json.JsonSerializer.Serialize<TValue>(TValue, JsonSerializerOptions)")]
 341    private async Task<bool> HandleMultipleWorkflowsFoundAsync(HttpContext httpContext, Func<IEnumerable<object>> workfl
 342    {
 0343        httpContext.Response.ContentType = "application/json";
 0344        httpContext.Response.StatusCode = (int)HttpStatusCode.InternalServerError;
 345
 0346        var responseContent = JsonSerializer.Serialize(new
 0347        {
 0348            errorMessage = "The call is ambiguous and matches multiple workflows.",
 0349            workflows = workflowMatches().ToArray()
 0350        });
 351
 0352        await httpContext.Response.WriteAsync(responseContent, cancellationToken);
 0353        return true;
 0354    }
 355
 356    private async Task<bool> HandleWorkflowFaultAsync(IServiceProvider serviceProvider, HttpContext httpContext, RunWork
 357    {
 265358        if (!workflowExecutionResult.WorkflowState.Incidents.Any() || httpContext.Response.HasStarted)
 265359            return false;
 360
 0361        var httpEndpointFaultHandler = serviceProvider.GetRequiredService<IHttpEndpointFaultHandler>();
 0362        var workflowInstanceManager = serviceProvider.GetRequiredService<IWorkflowInstanceManager>();
 0363        var workflowState = (await workflowInstanceManager.FindByIdAsync(workflowExecutionResult.WorkflowState.Id, cance
 0364        await httpEndpointFaultHandler.HandleAsync(new(httpContext, workflowState.WorkflowState, cancellationToken));
 0365        return true;
 265366    }
 367
 368    private async Task<bool> AuthorizeAsync(
 369        IServiceProvider serviceProvider,
 370        HttpContext httpContext,
 371        Workflow workflow,
 372        HttpEndpointBookmarkPayload bookmarkPayload,
 373        CancellationToken cancellationToken)
 374    {
 265375        var httpEndpointAuthorizationHandler = serviceProvider.GetRequiredService<IHttpEndpointAuthorizationHandler>();
 376
 265377        if (bookmarkPayload.Authorize == false)
 265378            return true;
 379
 0380        return await httpEndpointAuthorizationHandler.AuthorizeAsync(new(httpContext, workflow, bookmarkPayload.Policy))
 265381    }
 382
 383    private string ComputeBookmarkHash(IServiceProvider serviceProvider, string path, string method)
 384    {
 270385        var bookmarkPayload = new HttpEndpointBookmarkPayload(path, method);
 270386        var bookmarkHasher = serviceProvider.GetRequiredService<IStimulusHasher>();
 270387        return bookmarkHasher.Hash(HttpStimulusNames.HttpEndpoint, bookmarkPayload);
 388    }
 389}