< Summary

Information
Class: Elsa.Workflows.Runtime.Distributed.DistributedWorkflowClient
Assembly: Elsa.Workflows.Runtime.Distributed
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime.Distributed/Services/DistributedWorkflowClient.cs
Line coverage
87%
Covered lines: 42
Uncovered lines: 6
Coverable lines: 48
Total lines: 92
Line coverage: 87.5%
Branch coverage
N/A
Covered branches: 0
Total branches: 0
Branch coverage: N/A
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
get_WorkflowInstanceId()100%11100%
CreateInstanceAsync()100%11100%
<RunInstanceAsync()100%11100%
RunInstanceAsync()100%11100%
CreateAndRunInstanceAsync()100%11100%
<CreateAndRunInstanceAsync()100%11100%
CancelAsync()100%210%
ExportStateAsync()100%11100%
ImportStateAsync()100%210%
InstanceExistsAsync()100%210%
<DeleteAsync()100%11100%
DeleteAsync()100%11100%
WithLockAsync()100%11100%

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime.Distributed/Services/DistributedWorkflowClient.cs

#LineLine coverage
 1using Elsa.Common.DistributedHosting;
 2using Elsa.Workflows.Runtime.Messages;
 3using Elsa.Workflows.State;
 4using Medallion.Threading;
 5using Microsoft.Extensions.DependencyInjection;
 6using Microsoft.Extensions.Options;
 7
 8namespace Elsa.Workflows.Runtime.Distributed;
 9
 12210public class DistributedWorkflowClient(
 12211    string workflowInstanceId,
 12212    IDistributedLockProvider distributedLockProvider,
 12213    IOptions<DistributedLockingOptions> distributedLockingOptions,
 12214    IServiceProvider serviceProvider)
 15    : IWorkflowClient
 16{
 12217    private readonly LocalWorkflowClient _localWorkflowClient = ActivatorUtilities.CreateInstance<LocalWorkflowClient>(s
 18
 11819    public string WorkflowInstanceId => workflowInstanceId;
 20
 21    public async Task<CreateWorkflowInstanceResponse> CreateInstanceAsync(CreateWorkflowInstanceRequest request, Cancell
 22    {
 2223        return await _localWorkflowClient.CreateInstanceAsync(request, cancellationToken);
 2224    }
 25
 26    public async Task<RunWorkflowInstanceResponse> RunInstanceAsync(RunWorkflowInstanceRequest request, CancellationToke
 27    {
 11628        var result = await WithLockAsync(async () => await _localWorkflowClient.RunInstanceAsync(request, cancellationTo
 5829        return result;
 5830    }
 31
 32    public async Task<RunWorkflowInstanceResponse> CreateAndRunInstanceAsync(CreateAndRunWorkflowInstanceRequest request
 33    {
 5234        var createRequest = new CreateWorkflowInstanceRequest
 5235        {
 5236            Properties = request.Properties,
 5237            CorrelationId = request.CorrelationId,
 5238            Name = request.Name,
 5239            Input = request.Input,
 5240            WorkflowDefinitionHandle = request.WorkflowDefinitionHandle,
 5241            ParentId = request.ParentId
 5242        };
 5243        var workflowInstance = await _localWorkflowClient.CreateInstanceInternalAsync(createRequest, cancellationToken);
 44
 45        // We need to lock newly created workflow instances too, because it might dispatch child workflows that attempt 
 46        // For example, when using a DispatchWorkflow activity configured to wait for the dispatched workflow to complet
 10447        return await WithLockAsync(async () => await _localWorkflowClient.RunInstanceAsync(workflowInstance, new()
 10448        {
 10449            Input = request.Input,
 10450            Variables = request.Variables,
 10451            Properties = request.Properties,
 10452            TriggerActivityId = request.TriggerActivityId,
 10453            ActivityHandle = request.ActivityHandle,
 10454            IncludeWorkflowOutput = request.IncludeWorkflowOutput
 10455        }, cancellationToken));
 5256    }
 57
 58    public async Task CancelAsync(CancellationToken cancellationToken = default)
 59    {
 060        await _localWorkflowClient.CancelAsync(cancellationToken);
 061    }
 62
 63    public async Task<WorkflowState> ExportStateAsync(CancellationToken cancellationToken = default)
 64    {
 1165        return await _localWorkflowClient.ExportStateAsync(cancellationToken);
 1166    }
 67
 68    public async Task ImportStateAsync(WorkflowState workflowState, CancellationToken cancellationToken = default)
 69    {
 070        await _localWorkflowClient.ImportStateAsync(workflowState, cancellationToken);
 071    }
 72
 73    public async Task<bool> InstanceExistsAsync(CancellationToken cancellationToken = default)
 74    {
 075        return await _localWorkflowClient.InstanceExistsAsync(cancellationToken);
 076    }
 77
 78    public async Task<bool> DeleteAsync(CancellationToken cancellationToken = default)
 79    {
 80        // Use the same distributed lock as for execution to prevent concurrent DB writes
 1681        return await WithLockAsync(async () => await _localWorkflowClient.DeleteAsync(cancellationToken));
 882    }
 83
 84    private async Task<R> WithLockAsync<R>(Func<Task<R>> func)
 85    {
 11886        var lockKey = $"workflow-instance:{WorkflowInstanceId}";
 11887        var lockTimeout = distributedLockingOptions.Value.LockAcquisitionTimeout;
 11888        await using var @lock = await distributedLockProvider.AcquireLockAsync(lockKey, lockTimeout);
 11889        var result = await func();
 11890        return result;
 11891    }
 92}