< Summary

Information
Class: Elsa.Http.Middleware.HttpWorkflowsMiddleware
Assembly: Elsa.Http
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Http/Middleware/HttpWorkflowsMiddleware.cs
Line coverage
73%
Covered lines: 147
Uncovered lines: 52
Coverable lines: 199
Total lines: 390
Line coverage: 73.8%
Branch coverage
69%
Covered branches: 39
Total branches: 56
Branch coverage: 69.6%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Http/Middleware/HttpWorkflowsMiddleware.cs

#LineLine coverage
 1using Elsa.Extensions;
 2using Elsa.Http.Bookmarks;
 3using Elsa.Http.Options;
 4using Elsa.Workflows.Runtime.Filters;
 5using JetBrains.Annotations;
 6using Microsoft.AspNetCore.Http;
 7using Microsoft.Extensions.DependencyInjection;
 8using Microsoft.Extensions.Options;
 9using System.Net;
 10using System.Net.Mime;
 11using System.Text.Json;
 12using Elsa.Workflows.Activities;
 13using Elsa.Workflows.Runtime.Entities;
 14using FastEndpoints;
 15using System.Diagnostics.CodeAnalysis;
 16using Elsa.Workflows;
 17using Elsa.Workflows.Management;
 18using Elsa.Workflows.Management.Entities;
 19using Elsa.Workflows.Models;
 20using Elsa.Workflows.Options;
 21using Elsa.Workflows.Runtime;
 22using Open.Linq.AsyncExtensions;
 23
 24namespace Elsa.Http.Middleware;
 25
 26/// <summary>
 27/// An ASP.NET middleware component that tries to match the inbound request path to an associated workflow and then run 
 28/// </summary>
 29[PublicAPI]
 930public class HttpWorkflowsMiddleware(RequestDelegate next)
 31{
 32    /// <summary>
 33    /// Attempts to match the inbound request path to an associated workflow and then run that workflow.
 34    /// </summary>
 35    [RequiresUnreferencedCode("Calls System.Text.Json.JsonSerializer.Serialize<TValue>(TValue, JsonSerializerOptions)")]
 36    public async Task InvokeAsync(
 37        HttpContext httpContext,
 38        IServiceProvider serviceProvider,
 39        IOptions<HttpActivityOptions> options,
 40        IHttpWorkflowLookupService httpWorkflowLookupService)
 41    {
 29642        var path = httpContext.Request.Path.Value!.NormalizeRoute();
 29643        var matchingPath = GetMatchingRoute(serviceProvider, path).Route;
 29644        var basePath = options.Value.BasePath?.ToString().NormalizeRoute();
 45
 46        // If the request path does not match the configured base path to handle workflows, then skip.
 29647        if (!string.IsNullOrWhiteSpace(basePath))
 48        {
 29549            if (!path.StartsWith(basePath, StringComparison.OrdinalIgnoreCase))
 50            {
 3051                await next(httpContext);
 3052                return;
 53            }
 54
 55            // Strip the base path.
 26556            matchingPath = matchingPath[basePath.Length..];
 57        }
 58
 59        // Graceful-shutdown gate: when the runtime is paused or draining, we don't accept new HTTP-triggered work.
 60        // The ingress source registry visibility is provided by HttpTriggerIngressSource — this is the actual mechanism
 26661        var quiescenceSignal = serviceProvider.GetService<IQuiescenceSignal>();
 26662        if (quiescenceSignal is not null && !quiescenceSignal.IsAcceptingNewWork)
 63        {
 064            httpContext.Response.StatusCode = (int)HttpStatusCode.ServiceUnavailable;
 65            // Retry-After is reason-aware: drain is short (host is exiting and will be replaced shortly), but an
 66            // administrative pause is indefinite, so a longer back-off avoids a tight retry loop while operators
 67            // perform maintenance.
 068            httpContext.Response.Headers.RetryAfter = quiescenceSignal.CurrentState.Reason.HasFlag(QuiescenceReason.Drai
 069            return;
 70        }
 71
 26672        matchingPath = matchingPath.NormalizeRoute();
 73
 26674        var input = new Dictionary<string, object>
 26675        {
 26676            [HttpEndpoint.HttpContextInputKey] = true,
 26677            [HttpEndpoint.PathInputKey] = path
 26678        };
 79
 26680        var cancellationToken = httpContext.RequestAborted;
 26681        var request = httpContext.Request;
 26682        var method = request.Method.ToLowerInvariant();
 26683        var workflowInstanceId = await GetWorkflowInstanceIdAsync(serviceProvider, httpContext, cancellationToken);
 26684        var correlationId = await GetCorrelationIdAsync(serviceProvider, httpContext, cancellationToken);
 26685        var bookmarkHash = ComputeBookmarkHash(serviceProvider, matchingPath, method);
 26686        var lookupResult = await httpWorkflowLookupService.FindWorkflowAsync(bookmarkHash, cancellationToken);
 87
 26688        if (lookupResult != null)
 89        {
 25890            var triggers = lookupResult.Triggers;
 91
 25892            if (triggers.Count > 1)
 93            {
 094                await HandleMultipleWorkflowsFoundAsync(httpContext, () => triggers.Select(x => new
 095                {
 096                    x.WorkflowDefinitionId
 097                }), cancellationToken);
 098                return;
 99            }
 100
 258101            var trigger = triggers.FirstOrDefault();
 258102            if (trigger != null)
 103            {
 258104                var workflowGraph = lookupResult.WorkflowGraph!;
 258105                await StartWorkflowAsync(httpContext, trigger, workflowGraph, input, workflowInstanceId, correlationId);
 258106                return;
 107            }
 108        }
 109
 8110        var bookmarks = await FindBookmarksAsync(serviceProvider, bookmarkHash, workflowInstanceId, correlationId, cance
 111
 8112        if (bookmarks.Count > 1)
 113        {
 0114            await HandleMultipleWorkflowsFoundAsync(httpContext, () => bookmarks.Select(x => new
 0115            {
 0116                x.WorkflowInstanceId
 0117            }), cancellationToken);
 0118            return;
 119        }
 120
 8121        var bookmark = bookmarks.SingleOrDefault();
 122
 8123        if (bookmark != null)
 124        {
 2125            await ResumeWorkflowAsync(httpContext, bookmark, input, correlationId);
 2126            return;
 127        }
 128
 129        // If a base path was configured, the requester tried to execute a workflow that doesn't exist.
 6130        if (basePath != null)
 131        {
 5132            await httpContext.Response.SendNotFoundAsync(cancellation: cancellationToken);
 5133            return;
 134        }
 135
 136        // If no base path was configured, the request should be handled by subsequent middlewares.
 1137        await next(httpContext);
 296138    }
 139
 140    private async Task<WorkflowGraph?> FindWorkflowGraphAsync(IServiceProvider serviceProvider, StoredTrigger trigger, C
 141    {
 0142        var workflowDefinitionService = serviceProvider.GetRequiredService<IWorkflowDefinitionService>();
 0143        var workflowDefinitionId = trigger.WorkflowDefinitionVersionId;
 0144        return await workflowDefinitionService.FindWorkflowGraphAsync(workflowDefinitionId, cancellationToken);
 0145    }
 146
 147    private async Task<IEnumerable<StoredTrigger>> FindTriggersAsync(IServiceProvider serviceProvider, string bookmarkHa
 148    {
 0149        var triggerStore = serviceProvider.GetRequiredService<ITriggerStore>();
 0150        var triggerFilter = new TriggerFilter
 0151        {
 0152            Hash = bookmarkHash
 0153        };
 0154        return await triggerStore.FindManyAsync(triggerFilter, cancellationToken);
 0155    }
 156
 157    private async Task<IEnumerable<StoredBookmark>> FindBookmarksAsync(IServiceProvider serviceProvider, string bookmark
 158    {
 8159        var bookmarkStore = serviceProvider.GetRequiredService<IBookmarkStore>();
 8160        var bookmarkFilter = new BookmarkFilter
 8161        {
 8162            Hash = bookmarkHash,
 8163            WorkflowInstanceId = workflowInstanceId,
 8164            CorrelationId = correlationId
 8165        };
 8166        return await bookmarkStore.FindManyAsync(bookmarkFilter, cancellationToken);
 8167    }
 168
 169    private async Task StartWorkflowAsync(HttpContext httpContext, StoredTrigger trigger, WorkflowGraph workflowGraph, I
 170    {
 258171        var bookmarkPayload = trigger.GetPayload<HttpEndpointBookmarkPayload>();
 258172        var workflowOptions = new RunWorkflowOptions
 258173        {
 258174            Input = input,
 258175            CorrelationId = correlationId,
 258176            TriggerActivityId = trigger.ActivityId,
 258177            WorkflowInstanceId = workflowInstanceId
 258178        };
 179
 258180        await ExecuteWorkflowAsync(httpContext, workflowGraph, workflowOptions, bookmarkPayload, null, input);
 258181    }
 182
 183    private async Task ResumeWorkflowAsync(HttpContext httpContext, StoredBookmark bookmark, IDictionary<string, object>
 184    {
 2185        var serviceProvider = httpContext.RequestServices;
 2186        var cancellationToken = httpContext.RequestAborted;
 2187        var bookmarkPayload = bookmark.GetPayload<HttpEndpointBookmarkPayload>();
 2188        var workflowInstanceStore = serviceProvider.GetRequiredService<IWorkflowInstanceStore>();
 2189        var workflowInstance = await workflowInstanceStore.FindAsync(bookmark.WorkflowInstanceId, cancellationToken);
 190
 2191        if (workflowInstance == null)
 192        {
 0193            await httpContext.Response.SendNotFoundAsync(cancellation: cancellationToken);
 0194            return;
 195        }
 196
 2197        var workflowDefinitionService = serviceProvider.GetRequiredService<IWorkflowDefinitionService>();
 2198        var workflowGraph = await workflowDefinitionService.FindWorkflowGraphAsync(workflowInstance.DefinitionVersionId,
 199
 2200        if (workflowGraph == null)
 201        {
 0202            await httpContext.Response.SendNotFoundAsync(cancellation: cancellationToken);
 0203            return;
 204        }
 205
 2206        var runWorkflowParams = new RunWorkflowOptions
 2207        {
 2208            WorkflowInstanceId = workflowInstance.Id,
 2209            Input = input,
 2210            CorrelationId = correlationId,
 2211            ActivityHandle = bookmark.ActivityInstanceId != null ? ActivityHandle.FromActivityInstanceId(bookmark.Activi
 2212            BookmarkId = bookmark.Id
 2213        };
 214
 2215        await ExecuteWorkflowAsync(httpContext, workflowGraph, runWorkflowParams, bookmarkPayload, workflowInstance, inp
 2216    }
 217
 218    private async Task ExecuteWorkflowAsync(HttpContext httpContext, WorkflowGraph workflowGraph, RunWorkflowOptions wor
 219    {
 260220        var serviceProvider = httpContext.RequestServices;
 260221        var cancellationToken = httpContext.RequestAborted;
 260222        var workflow = workflowGraph.Workflow;
 223
 260224        if (!await AuthorizeAsync(serviceProvider, httpContext, workflow, bookmarkPayload, cancellationToken))
 225        {
 0226            httpContext.Response.StatusCode = (int)HttpStatusCode.Unauthorized;
 0227            return;
 228        }
 229
 260230        var workflowRunner = serviceProvider.GetRequiredService<IWorkflowRunner>();
 260231        var result = await ExecuteWithinTimeoutAsync(async ct =>
 260232        {
 260233            if (workflowInstance == null)
 258234                return await workflowRunner.RunAsync(workflowGraph, workflowOptions, ct);
 2235            return await workflowRunner.RunAsync(workflow, workflowInstance.WorkflowState, workflowOptions, ct);
 520236        }, bookmarkPayload.RequestTimeout, httpContext);
 260237        await HandleWorkflowFaultAsync(serviceProvider, httpContext, result, cancellationToken);
 260238    }
 239
 240    private async Task<T> ExecuteWithinTimeoutAsync<T>(Func<CancellationToken, Task<T>> action, TimeSpan? requestTimeout
 241    {
 242        // If no request timeout is specified, execute the action without any timeout.
 263243        if (requestTimeout == null)
 260244            return await action(httpContext.RequestAborted);
 245
 246        // Create a combined cancellation token that cancels when the request is aborted or when the request timeout is 
 3247        using var requestTimeoutCancellationTokenSource = new CancellationTokenSource();
 3248        requestTimeoutCancellationTokenSource.CancelAfter(requestTimeout.Value);
 3249        using var combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(httpContext.RequestAborted, requ
 3250        var originalCancellationToken = httpContext.RequestAborted;
 251
 252        // Replace the original cancellation token with the combined one.
 3253        httpContext.RequestAborted = combinedTokenSource.Token;
 254
 255        try
 256        {
 3257            return await action(httpContext.RequestAborted);
 258        }
 259        finally
 260        {
 261            // Restore the original cancellation token even when execution faults or is canceled.
 3262            httpContext.RequestAborted = originalCancellationToken;
 263        }
 261264    }
 265
 266    private HttpRouteData GetMatchingRoute(IServiceProvider serviceProvider, string path)
 267    {
 296268        var routeMatcher = serviceProvider.GetRequiredService<IRouteMatcher>();
 296269        var routeTable = serviceProvider.GetRequiredService<IRouteTable>();
 270
 296271        var matchingRouteQuery =
 296272            from routeData in routeTable
 3499273            let routeValues = routeMatcher.Match(routeData.Route, path)
 3499274            where routeValues != null
 560275            select new
 560276            {
 560277                route = routeData,
 560278                routeValues
 560279            };
 280
 296281        var matchingRoute = matchingRouteQuery.FirstOrDefault();
 296282        var routeTemplate = matchingRoute?.route ?? new HttpRouteData(path);
 283
 296284        return routeTemplate;
 285    }
 286
 287    private async Task<string?> GetCorrelationIdAsync(IServiceProvider serviceProvider, HttpContext httpContext, Cancell
 288    {
 266289        var correlationIdSelectors = serviceProvider.GetServices<IHttpCorrelationIdSelector>();
 290
 266291        var correlationId = default(string);
 292
 2118293        foreach (var selector in correlationIdSelectors.OrderByDescending(x => x.Priority))
 294        {
 530295            correlationId = await selector.GetCorrelationIdAsync(httpContext, cancellationToken);
 296
 530297            if (correlationId != null)
 298                break;
 299        }
 300
 266301        return correlationId;
 266302    }
 303
 304    private async Task<string?> GetWorkflowInstanceIdAsync(IServiceProvider serviceProvider, HttpContext httpContext, Ca
 305    {
 266306        var workflowInstanceIdSelectors = serviceProvider.GetServices<IHttpWorkflowInstanceIdSelector>();
 307
 266308        var workflowInstanceId = default(string);
 309
 2118310        foreach (var selector in workflowInstanceIdSelectors.OrderByDescending(x => x.Priority))
 311        {
 530312            workflowInstanceId = await selector.GetWorkflowInstanceIdAsync(httpContext, cancellationToken);
 313
 530314            if (workflowInstanceId != null)
 315                break;
 316        }
 317
 266318        return workflowInstanceId;
 266319    }
 320
 321    [RequiresUnreferencedCode("Calls System.Text.Json.JsonSerializer.Serialize<TValue>(TValue, JsonSerializerOptions)")]
 322    private static async Task WriteResponseAsync(HttpContext httpContext, CancellationToken cancellationToken)
 323    {
 0324        var response = httpContext.Response;
 325
 0326        if (!response.HasStarted)
 327        {
 0328            response.ContentType = MediaTypeNames.Application.Json;
 0329            response.StatusCode = StatusCodes.Status200OK;
 330
 0331            var model = new
 0332            {
 0333                workflowInstanceIds = Array.Empty<string>(),
 0334            };
 335
 0336            var json = JsonSerializer.Serialize(model);
 0337            await response.WriteAsync(json, cancellationToken);
 338        }
 0339    }
 340
 341    [RequiresUnreferencedCode("Calls System.Text.Json.JsonSerializer.Serialize<TValue>(TValue, JsonSerializerOptions)")]
 342    private async Task<bool> HandleMultipleWorkflowsFoundAsync(HttpContext httpContext, Func<IEnumerable<object>> workfl
 343    {
 0344        httpContext.Response.ContentType = "application/json";
 0345        httpContext.Response.StatusCode = (int)HttpStatusCode.InternalServerError;
 346
 0347        var responseContent = JsonSerializer.Serialize(new
 0348        {
 0349            errorMessage = "The call is ambiguous and matches multiple workflows.",
 0350            workflows = workflowMatches().ToArray()
 0351        });
 352
 0353        await httpContext.Response.WriteAsync(responseContent, cancellationToken);
 0354        return true;
 0355    }
 356
 357    private async Task<bool> HandleWorkflowFaultAsync(IServiceProvider serviceProvider, HttpContext httpContext, RunWork
 358    {
 262359        if (!workflowExecutionResult.WorkflowState.Incidents.Any() || httpContext.Response.HasStarted)
 260360            return false;
 361
 2362        var httpEndpointFaultHandler = serviceProvider.GetRequiredService<IHttpEndpointFaultHandler>();
 2363        var workflowInstanceManager = serviceProvider.GetRequiredService<IWorkflowInstanceManager>();
 2364        var workflowState = (await workflowInstanceManager.FindByIdAsync(workflowExecutionResult.WorkflowState.Id, cance
 2365        await httpEndpointFaultHandler.HandleAsync(new(httpContext, workflowState, cancellationToken));
 2366        return true;
 262367    }
 368
 369    private async Task<bool> AuthorizeAsync(
 370        IServiceProvider serviceProvider,
 371        HttpContext httpContext,
 372        Workflow workflow,
 373        HttpEndpointBookmarkPayload bookmarkPayload,
 374        CancellationToken cancellationToken)
 375    {
 260376        var httpEndpointAuthorizationHandler = serviceProvider.GetRequiredService<IHttpEndpointAuthorizationHandler>();
 377
 260378        if (bookmarkPayload.Authorize == false)
 260379            return true;
 380
 0381        return await httpEndpointAuthorizationHandler.AuthorizeAsync(new(httpContext, workflow, bookmarkPayload.Policy))
 260382    }
 383
 384    private string ComputeBookmarkHash(IServiceProvider serviceProvider, string path, string method)
 385    {
 266386        var bookmarkPayload = new HttpEndpointBookmarkPayload(path, method);
 266387        var bookmarkHasher = serviceProvider.GetRequiredService<IStimulusHasher>();
 266388        return bookmarkHasher.Hash(HttpStimulusNames.HttpEndpoint, bookmarkPayload);
 389    }
 390}