< Summary

Information
Class: Elsa.Workflows.Runtime.Distributed.DistributedWorkflowClient
Assembly: Elsa.Workflows.Runtime.Distributed
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime.Distributed/Services/DistributedWorkflowClient.cs
Line coverage
91%
Covered lines: 71
Uncovered lines: 7
Coverable lines: 78
Total lines: 153
Line coverage: 91%
Branch coverage
50%
Covered branches: 1
Total branches: 2
Branch coverage: 50%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
get_WorkflowInstanceId()100%11100%
CreateInstanceAsync()100%11100%
<RunInstanceAsync()100%11100%
RunInstanceAsync()100%11100%
CreateAndRunInstanceAsync()100%11100%
<CreateAndRunInstanceAsync()100%11100%
CancelAsync()100%210%
ExportStateAsync()100%11100%
ImportStateAsync()100%210%
InstanceExistsAsync()100%210%
<DeleteAsync()100%11100%
DeleteAsync()100%11100%
WithLockAsync()100%11100%
AcquireLockWithRetryAsync()100%11100%
<AcquireLockWithRetryAsync()100%11100%
ReleaseLockAsync()50%2287.5%
CreateRetryPipeline(...)100%11100%

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime.Distributed/Services/DistributedWorkflowClient.cs

#LineLine coverage
 1using Elsa.Common.DistributedHosting;
 2using Elsa.Resilience;
 3using Elsa.Workflows.Runtime.Messages;
 4using Elsa.Workflows.State;
 5using Medallion.Threading;
 6using Microsoft.Extensions.DependencyInjection;
 7using Microsoft.Extensions.Logging;
 8using Microsoft.Extensions.Options;
 9using Polly;
 10
 11namespace Elsa.Workflows.Runtime.Distributed;
 12
 13613public class DistributedWorkflowClient(
 13614    string workflowInstanceId,
 13615    IDistributedLockProvider distributedLockProvider,
 13616    ITransientExceptionDetector transientExceptionDetector,
 13617    IOptions<DistributedLockingOptions> distributedLockingOptions,
 13618    IServiceProvider serviceProvider,
 13619    ILogger<DistributedWorkflowClient> logger)
 20    : IWorkflowClient
 21{
 13622    private readonly LocalWorkflowClient _localWorkflowClient = ActivatorUtilities.CreateInstance<LocalWorkflowClient>(s
 26123    private readonly Lazy<ResiliencePipeline> _retryPipeline = new(() => CreateRetryPipeline(transientExceptionDetector,
 13624    public string WorkflowInstanceId => workflowInstanceId;
 25
 26    public async Task<CreateWorkflowInstanceResponse> CreateInstanceAsync(CreateWorkflowInstanceRequest request, Cancell
 27    {
 3228        return await _localWorkflowClient.CreateInstanceAsync(request, cancellationToken);
 3229    }
 30
 31    public async Task<RunWorkflowInstanceResponse> RunInstanceAsync(RunWorkflowInstanceRequest request, CancellationToke
 32    {
 14133        var result = await WithLockAsync(async () => await _localWorkflowClient.RunInstanceAsync(request, cancellationTo
 7034        return result;
 7035    }
 36
 37    public async Task<RunWorkflowInstanceResponse> CreateAndRunInstanceAsync(CreateAndRunWorkflowInstanceRequest request
 38    {
 5339        var createRequest = new CreateWorkflowInstanceRequest
 5340        {
 5341            Properties = request.Properties,
 5342            CorrelationId = request.CorrelationId,
 5343            Name = request.Name,
 5344            Input = request.Input,
 5345            WorkflowDefinitionHandle = request.WorkflowDefinitionHandle,
 5346            ParentId = request.ParentId
 5347        };
 5348        var workflowInstance = await _localWorkflowClient.CreateInstanceInternalAsync(createRequest, cancellationToken);
 49
 50        // We need to lock newly created workflow instances too, because it might dispatch child workflows that attempt 
 51        // For example, when using a DispatchWorkflow activity configured to wait for the dispatched workflow to complet
 10652        return await WithLockAsync(async () => await _localWorkflowClient.RunInstanceAsync(workflowInstance, new()
 10653        {
 10654            Input = request.Input,
 10655            Variables = request.Variables,
 10656            Properties = request.Properties,
 10657            TriggerActivityId = request.TriggerActivityId,
 10658            ActivityHandle = request.ActivityHandle,
 10659            IncludeWorkflowOutput = request.IncludeWorkflowOutput
 10660        }, cancellationToken), cancellationToken);
 5361    }
 62
 63    public async Task CancelAsync(CancellationToken cancellationToken = default)
 64    {
 065        await _localWorkflowClient.CancelAsync(cancellationToken);
 066    }
 67
 68    public async Task<WorkflowState> ExportStateAsync(CancellationToken cancellationToken = default)
 69    {
 1170        return await _localWorkflowClient.ExportStateAsync(cancellationToken);
 1171    }
 72
 73    public async Task ImportStateAsync(WorkflowState workflowState, CancellationToken cancellationToken = default)
 74    {
 075        await _localWorkflowClient.ImportStateAsync(workflowState, cancellationToken);
 076    }
 77
 78    public async Task<bool> InstanceExistsAsync(CancellationToken cancellationToken = default)
 79    {
 080        return await _localWorkflowClient.InstanceExistsAsync(cancellationToken);
 081    }
 82
 83    public async Task<bool> DeleteAsync(CancellationToken cancellationToken = default)
 84    {
 85        // Use the same distributed lock as for execution to prevent concurrent DB writes
 1686        return await WithLockAsync(async () => await _localWorkflowClient.DeleteAsync(cancellationToken), cancellationTo
 887    }
 88
 89    private async Task<TReturn> WithLockAsync<TReturn>(Func<Task<TReturn>> func, CancellationToken cancellationToken = d
 90    {
 13291        var lockKey = $"workflow-instance:{WorkflowInstanceId}";
 13292        var lockHandle = await AcquireLockWithRetryAsync(lockKey, cancellationToken);
 93
 94        try
 95        {
 13196            return await func();
 97        }
 98        finally
 99        {
 131100            await ReleaseLockAsync(lockHandle);
 101        }
 131102    }
 103
 104    private async Task<IDistributedSynchronizationHandle?> AcquireLockWithRetryAsync(string lockKey, CancellationToken c
 105    {
 132106        var lockTimeout = distributedLockingOptions.Value.LockAcquisitionTimeout;
 107
 132108        return await _retryPipeline.Value.ExecuteAsync(async ct =>
 138109            await distributedLockProvider.AcquireLockAsync(lockKey, lockTimeout, ct),
 132110            cancellationToken);
 131111    }
 112
 113    private async Task ReleaseLockAsync(IDistributedSynchronizationHandle? lockHandle)
 114    {
 131115        if (lockHandle == null)
 0116            return;
 117
 118        try
 119        {
 131120            await lockHandle.DisposeAsync();
 130121        }
 1122        catch (Exception ex)
 123        {
 124            // Log but don't throw - the work is already done, and the lock
 125            // will be automatically released when the connection dies
 1126            logger.LogWarning(ex, "Failed to release distributed lock for workflow instance {WorkflowInstanceId}. The lo
 1127        }
 131128    }
 129
 130    private static ResiliencePipeline CreateRetryPipeline(
 131        ITransientExceptionDetector transientExceptionDetector,
 132        ILogger<DistributedWorkflowClient> logger,
 133        string workflowInstanceId)
 134    {
 135        const int maxRetryAttempts = 3;
 136
 125137        return new ResiliencePipelineBuilder()
 125138            .AddRetry(new()
 125139            {
 125140                MaxRetryAttempts = maxRetryAttempts,
 125141                Delay = TimeSpan.FromMilliseconds(500),
 125142                BackoffType = DelayBackoffType.Exponential,
 125143                UseJitter = true,
 125144                ShouldHandle = new PredicateBuilder().Handle<Exception>(transientExceptionDetector.IsTransient),
 125145                OnRetry = args =>
 125146                {
 6147                    logger.LogWarning(args.Outcome.Exception, "Transient error acquiring lock for workflow instance {Wor
 6148                    return ValueTask.CompletedTask;
 125149                }
 125150            })
 125151            .Build();
 152    }
 153}