< Summary

Information
Class: Elsa.Workflows.Runtime.WorkflowResumer
Assembly: Elsa.Workflows.Runtime
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/WorkflowResumer.cs
Line coverage
37%
Covered lines: 30
Uncovered lines: 49
Coverable lines: 79
Total lines: 135
Line coverage: 37.9%
Branch coverage
57%
Covered branches: 8
Total branches: 14
Branch coverage: 57.1%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
ResumeAsync(...)100%210%
ResumeAsync()100%210%
ResumeAsync()100%210%
ResumeAsync()100%210%
ResumeAsync()0%2040%
ResumeAsync()80%111079.31%

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/WorkflowResumer.cs

#LineLine coverage
 1using Elsa.Common.DistributedHosting;
 2using Elsa.Workflows.Helpers;
 3using Elsa.Workflows.Runtime.Exceptions;
 4using Elsa.Workflows.Runtime.Filters;
 5using Elsa.Workflows.Runtime.Messages;
 6using Elsa.Workflows.Runtime.Options;
 7using Medallion.Threading;
 8using Microsoft.Extensions.Logging;
 9using Microsoft.Extensions.Options;
 10
 11namespace Elsa.Workflows.Runtime;
 12
 13/// <inheritdoc />
 10514public class WorkflowResumer(
 10515    IWorkflowRuntime workflowRuntime,
 10516    IBookmarkStore bookmarkStore,
 10517    IStimulusHasher stimulusHasher,
 10518    IDistributedLockProvider distributedLockProvider,
 10519    IOptions<DistributedLockingOptions> distributedLockingOptions,
 10520    ILogger<WorkflowResumer> logger) : IWorkflowResumer
 21{
 22    /// <inheritdoc />
 23    public Task<IEnumerable<RunWorkflowInstanceResponse>> ResumeAsync<TActivity>(object stimulus, ResumeBookmarkOptions?
 24    {
 025        return ResumeAsync<TActivity>(stimulus, null, options, cancellationToken);
 26    }
 27
 28    /// <inheritdoc />
 29    public async Task<IEnumerable<RunWorkflowInstanceResponse>> ResumeAsync<TActivity>(object stimulus, string? workflow
 30    {
 031        var activityTypeName = ActivityTypeNameHelper.GenerateTypeName<TActivity>();
 032        var stimulusHash = stimulusHasher.Hash(activityTypeName, stimulus);
 033        var bookmarkFilter = new BookmarkFilter
 034        {
 035            Name = activityTypeName,
 036            WorkflowInstanceId = workflowInstanceId,
 037            Hash = stimulusHash,
 038        };
 039        return await ResumeAsync(bookmarkFilter, options, cancellationToken);
 040    }
 41
 42    /// <inheritdoc />
 43    public async Task<RunWorkflowInstanceResponse?> ResumeAsync(string bookmarkId, IDictionary<string, object> input, Ca
 44    {
 045        var bookmarkFilter = new BookmarkFilter
 046        {
 047            BookmarkId = bookmarkId
 048        };
 049        var options = new ResumeBookmarkOptions
 050        {
 051            Input = input
 052        };
 053        var responses = await ResumeAsync(bookmarkFilter, options, cancellationToken);
 054        return responses.FirstOrDefault();
 055    }
 56
 57    /// <inheritdoc />
 58    public async Task<RunWorkflowInstanceResponse?> ResumeAsync<TActivity>(string bookmarkId, ResumeBookmarkOptions? opt
 59    {
 060        var activityTypeName = ActivityTypeNameHelper.GenerateTypeName<TActivity>();
 061        var bookmarkFilter = new BookmarkFilter
 062        {
 063            Name = activityTypeName,
 064            BookmarkId = bookmarkId
 065        };
 066        var response = await ResumeAsync(bookmarkFilter, options, cancellationToken);
 067        return response.FirstOrDefault();
 068    }
 69
 70    public async Task<IEnumerable<RunWorkflowInstanceResponse>> ResumeAsync(ResumeBookmarkRequest request, CancellationT
 71    {
 072        var filter = new BookmarkFilter
 073        {
 074            BookmarkId = request.BookmarkId,
 075            ActivityInstanceId = request.ActivityInstanceId ?? request.ActivityHandle?.ActivityInstanceId,
 076        };
 77
 078        var resumeOptions = new ResumeBookmarkOptions()
 079        {
 080            Input = request.Input,
 081            Properties = request.Properties,
 082        };
 083        return await ResumeAsync(filter, resumeOptions, cancellationToken);
 084    }
 85
 86    /// <inheritdoc />
 87    public async Task<IEnumerable<RunWorkflowInstanceResponse>> ResumeAsync(BookmarkFilter filter, ResumeBookmarkOptions
 88    {
 110689        var hashableFilterString = filter.GetHashableString();
 110690        var lockKey = $"workflow-resumer:{hashableFilterString}";
 91
 92        try
 93        {
 110694            await using var filterLock = await distributedLockProvider.AcquireLockAsync(lockKey, distributedLockingOptio
 110695            var bookmarks = (await bookmarkStore.FindManyAsync(filter, cancellationToken)).ToList();
 96
 110697            if (bookmarks.Count == 0)
 98            {
 108699                logger.LogDebug("No bookmarks found in store for filter {@Filter}", filter);
 1086100                return [];
 101            }
 102
 20103            var responses = new List<RunWorkflowInstanceResponse>();
 80104            foreach (var bookmark in bookmarks)
 105            {
 20106                var workflowClient = await workflowRuntime.CreateClientAsync(bookmark.WorkflowInstanceId, cancellationTo
 20107                var runRequest = new RunWorkflowInstanceRequest
 20108                {
 20109                    Input = options?.Input,
 20110                    Properties = options?.Properties,
 20111                    BookmarkId = bookmark.Id
 20112                };
 113
 114                try
 115                {
 20116                    var response = await workflowClient.RunInstanceAsync(runRequest, cancellationToken);
 20117                    logger.LogDebug("Resumed workflow instance {WorkflowInstanceId} with bookmark {BookmarkId}", bookmar
 20118                    responses.Add(response);
 20119                }
 0120                catch (WorkflowInstanceNotFoundException)
 121                {
 122                    // The workflow instance does not (yet) exist in the DB.
 0123                    logger.LogDebug("No workflow instance with ID {WorkflowInstanceId} found for bookmark {BookmarkId} a
 0124                }
 20125            }
 126
 20127            return responses;
 0128        }
 0129        catch (TimeoutException e)
 130        {
 131            // Rethrow but with a more specific message.
 0132            throw new TimeoutException($"Could not acquire distributed lock with key '{lockKey}' within the configured t
 133        }
 1106134    }
 135}