| | | 1 | | using Elsa.Common.DistributedHosting; |
| | | 2 | | using Elsa.Resilience; |
| | | 3 | | using Elsa.Workflows.Runtime.Messages; |
| | | 4 | | using Elsa.Workflows.State; |
| | | 5 | | using Medallion.Threading; |
| | | 6 | | using Microsoft.Extensions.DependencyInjection; |
| | | 7 | | using Microsoft.Extensions.Logging; |
| | | 8 | | using Microsoft.Extensions.Options; |
| | | 9 | | using Polly; |
| | | 10 | | |
| | | 11 | | namespace Elsa.Workflows.Runtime.Distributed; |
| | | 12 | | |
/// <summary>
/// An <see cref="IWorkflowClient"/> for a single workflow instance that forwards every operation to a
/// <see cref="LocalWorkflowClient"/> and wraps the run and delete operations in a distributed lock
/// keyed by the workflow instance ID. Create, cancel, export, import and exists calls are forwarded without a lock.
/// </summary>
/// <remarks>
/// NOTE(review): this listing is a coverage report; several long lines are cut off at the right margin,
/// so the ends of some signatures, lambdas and log messages are not visible here.
/// </remarks>
/// <param name="workflowInstanceId">ID of the workflow instance; exposed via <see cref="WorkflowInstanceId"/> and used to build the lock key.</param>
/// <param name="distributedLockProvider">Provider used to acquire the per-instance distributed lock.</param>
/// <param name="transientExceptionDetector">Decides which exceptions thrown during lock acquisition are retried.</param>
/// <param name="distributedLockingOptions">Supplies <c>LockAcquisitionTimeout</c> for lock acquisition.</param>
/// <param name="serviceProvider">NOTE(review): usage is cut off in this listing; presumably the provider handed to <c>ActivatorUtilities.CreateInstance</c> - confirm against the source file.</param>
/// <param name="logger">Receives warnings about retried lock acquisitions and failed lock releases.</param>
| | 136 | 13 | | public class DistributedWorkflowClient( |
| | 136 | 14 | | string workflowInstanceId, |
| | 136 | 15 | | IDistributedLockProvider distributedLockProvider, |
| | 136 | 16 | | ITransientExceptionDetector transientExceptionDetector, |
| | 136 | 17 | | IOptions<DistributedLockingOptions> distributedLockingOptions, |
| | 136 | 18 | | IServiceProvider serviceProvider, |
| | 136 | 19 | | ILogger<DistributedWorkflowClient> logger) |
| | | 20 | | : IWorkflowClient |
| | | 21 | | { |
// The local client that performs the actual work; created through ActivatorUtilities (argument list truncated in this listing).
| | 136 | 22 | | private readonly LocalWorkflowClient _localWorkflowClient = ActivatorUtilities.CreateInstance<LocalWorkflowClient>(s |
// Retry pipeline for lock acquisition; built by CreateRetryPipeline the first time .Value is read.
| | 261 | 23 | | private readonly Lazy<ResiliencePipeline> _retryPipeline = new(() => CreateRetryPipeline(transientExceptionDetector, |
/// <summary>The ID of the workflow instance this client operates on (the constructor argument, returned as-is).</summary>
| | 136 | 24 | | public string WorkflowInstanceId => workflowInstanceId; |
| | | 25 | | |
/// <summary>Creates the workflow instance by forwarding the request to the local client. No distributed lock is taken.</summary>
/// <param name="request">The creation request, passed through unchanged.</param>
/// <returns>The response produced by the local client.</returns>
| | | 26 | | public async Task<CreateWorkflowInstanceResponse> CreateInstanceAsync(CreateWorkflowInstanceRequest request, Cancell |
| | | 27 | | { |
| | 32 | 28 | | return await _localWorkflowClient.CreateInstanceAsync(request, cancellationToken); |
| | 32 | 29 | | } |
| | | 30 | | |
/// <summary>Runs the workflow instance through the local client while holding the distributed lock for this instance.</summary>
/// <param name="request">The run request, passed through unchanged.</param>
/// <returns>The response produced by the local client.</returns>
| | | 31 | | public async Task<RunWorkflowInstanceResponse> RunInstanceAsync(RunWorkflowInstanceRequest request, CancellationToke |
| | | 32 | | { |
| | 141 | 33 | | var result = await WithLockAsync(async () => await _localWorkflowClient.RunInstanceAsync(request, cancellationTo |
| | 70 | 34 | | return result; |
| | 70 | 35 | | } |
| | | 36 | | |
/// <summary>
/// Creates a workflow instance and then runs it. Creation goes through the local client's
/// <c>CreateInstanceInternalAsync</c> without a lock; the run step executes under the distributed lock.
/// </summary>
/// <param name="request">Source of both the creation fields and the run options copied below.</param>
/// <returns>The response produced by running the newly created instance.</returns>
| | | 37 | | public async Task<RunWorkflowInstanceResponse> CreateAndRunInstanceAsync(CreateAndRunWorkflowInstanceRequest request |
| | | 38 | | { |
// Copy the creation-related fields of the combined request into a dedicated create request.
| | 53 | 39 | | var createRequest = new CreateWorkflowInstanceRequest |
| | 53 | 40 | | { |
| | 53 | 41 | | Properties = request.Properties, |
| | 53 | 42 | | CorrelationId = request.CorrelationId, |
| | 53 | 43 | | Name = request.Name, |
| | 53 | 44 | | Input = request.Input, |
| | 53 | 45 | | WorkflowDefinitionHandle = request.WorkflowDefinitionHandle, |
| | 53 | 46 | | ParentId = request.ParentId |
| | 53 | 47 | | }; |
| | 53 | 48 | | var workflowInstance = await _localWorkflowClient.CreateInstanceInternalAsync(createRequest, cancellationToken); |
| | | 49 | | |
| | | 50 | | // We need to lock newly created workflow instances too, because it might dispatch child workflows that attempt |
| | | 51 | | // For example, when using a DispatchWorkflow activity configured to wait for the dispatched workflow to complet |
// Run the created instance under the lock, forwarding the run-related fields of the request.
| | 106 | 52 | | return await WithLockAsync(async () => await _localWorkflowClient.RunInstanceAsync(workflowInstance, new() |
| | 106 | 53 | | { |
| | 106 | 54 | | Input = request.Input, |
| | 106 | 55 | | Variables = request.Variables, |
| | 106 | 56 | | Properties = request.Properties, |
| | 106 | 57 | | TriggerActivityId = request.TriggerActivityId, |
| | 106 | 58 | | ActivityHandle = request.ActivityHandle, |
| | 106 | 59 | | IncludeWorkflowOutput = request.IncludeWorkflowOutput |
| | 106 | 60 | | }, cancellationToken), cancellationToken); |
| | 53 | 61 | | } |
| | | 62 | | |
/// <summary>Cancels the workflow instance by forwarding to the local client. No distributed lock is taken.</summary>
| | | 63 | | public async Task CancelAsync(CancellationToken cancellationToken = default) |
| | | 64 | | { |
| | 0 | 65 | | await _localWorkflowClient.CancelAsync(cancellationToken); |
| | 0 | 66 | | } |
| | | 67 | | |
/// <summary>Exports the workflow state by forwarding to the local client. No distributed lock is taken.</summary>
/// <returns>The <see cref="WorkflowState"/> returned by the local client.</returns>
| | | 68 | | public async Task<WorkflowState> ExportStateAsync(CancellationToken cancellationToken = default) |
| | | 69 | | { |
| | 11 | 70 | | return await _localWorkflowClient.ExportStateAsync(cancellationToken); |
| | 11 | 71 | | } |
| | | 72 | | |
/// <summary>Imports the given workflow state by forwarding to the local client. No distributed lock is taken.</summary>
/// <param name="workflowState">The state to import, passed through unchanged.</param>
| | | 73 | | public async Task ImportStateAsync(WorkflowState workflowState, CancellationToken cancellationToken = default) |
| | | 74 | | { |
| | 0 | 75 | | await _localWorkflowClient.ImportStateAsync(workflowState, cancellationToken); |
| | 0 | 76 | | } |
| | | 77 | | |
/// <summary>Asks the local client whether the workflow instance exists. No distributed lock is taken.</summary>
/// <returns>The boolean returned by the local client.</returns>
| | | 78 | | public async Task<bool> InstanceExistsAsync(CancellationToken cancellationToken = default) |
| | | 79 | | { |
| | 0 | 80 | | return await _localWorkflowClient.InstanceExistsAsync(cancellationToken); |
| | 0 | 81 | | } |
| | | 82 | | |
/// <summary>Deletes the workflow instance through the local client while holding the distributed lock for this instance.</summary>
/// <returns>The boolean returned by the local client's delete call.</returns>
| | | 83 | | public async Task<bool> DeleteAsync(CancellationToken cancellationToken = default) |
| | | 84 | | { |
| | | 85 | | // Use the same distributed lock as for execution to prevent concurrent DB writes |
| | 16 | 86 | | return await WithLockAsync(async () => await _localWorkflowClient.DeleteAsync(cancellationToken), cancellationTo |
| | 8 | 87 | | } |
| | | 88 | | |
/// <summary>
/// Acquires the distributed lock named <c>workflow-instance:{WorkflowInstanceId}</c>, invokes
/// <paramref name="func"/>, and releases the lock in a <c>finally</c> block.
/// </summary>
/// <remarks>
/// The lock is acquired before the <c>try</c> block, so if acquisition throws, <paramref name="func"/>
/// is never invoked and no release is attempted.
/// </remarks>
/// <param name="func">The work to execute while the lock is held.</param>
/// <param name="cancellationToken">Passed to lock acquisition only; it is not passed to <paramref name="func"/>.</param>
/// <returns>The value returned by <paramref name="func"/>.</returns>
| | | 89 | | private async Task<TReturn> WithLockAsync<TReturn>(Func<Task<TReturn>> func, CancellationToken cancellationToken = d |
| | | 90 | | { |
| | 132 | 91 | | var lockKey = $"workflow-instance:{WorkflowInstanceId}"; |
| | 132 | 92 | | var lockHandle = await AcquireLockWithRetryAsync(lockKey, cancellationToken); |
| | | 93 | | |
| | | 94 | | try |
| | | 95 | | { |
| | 131 | 96 | | return await func(); |
| | | 97 | | } |
| | | 98 | | finally |
| | | 99 | | { |
// Release on both success and failure of func; ReleaseLockAsync logs release errors instead of throwing.
| | 131 | 100 | | await ReleaseLockAsync(lockHandle); |
| | | 101 | | } |
| | 131 | 102 | | } |
| | | 103 | | |
/// <summary>
/// Acquires the lock named <paramref name="lockKey"/> through the retry pipeline, passing the configured
/// <c>LockAcquisitionTimeout</c> to <c>AcquireLockAsync</c>.
/// </summary>
/// <param name="lockKey">Name of the distributed lock to acquire.</param>
/// <returns>The handle returned by the lock provider.</returns>
| | | 104 | | private async Task<IDistributedSynchronizationHandle?> AcquireLockWithRetryAsync(string lockKey, CancellationToken c |
| | | 105 | | { |
| | 132 | 106 | | var lockTimeout = distributedLockingOptions.Value.LockAcquisitionTimeout; |
| | | 107 | | |
// Each attempt made by the pipeline calls AcquireLockAsync again with the pipeline-supplied token.
| | 132 | 108 | | return await _retryPipeline.Value.ExecuteAsync(async ct => |
| | 138 | 109 | | await distributedLockProvider.AcquireLockAsync(lockKey, lockTimeout, ct), |
| | 132 | 110 | | cancellationToken); |
| | 131 | 111 | | } |
| | | 112 | | |
/// <summary>
/// Disposes the given lock handle. A <c>null</c> handle is a no-op. Any exception thrown by
/// <c>DisposeAsync</c> is logged as a warning and not rethrown.
/// </summary>
/// <param name="lockHandle">The handle to release, or <c>null</c>.</param>
| | | 113 | | private async Task ReleaseLockAsync(IDistributedSynchronizationHandle? lockHandle) |
| | | 114 | | { |
| | 131 | 115 | | if (lockHandle == null) |
| | 0 | 116 | | return; |
| | | 117 | | |
| | | 118 | | try |
| | | 119 | | { |
| | 131 | 120 | | await lockHandle.DisposeAsync(); |
| | 130 | 121 | | } |
| | 1 | 122 | | catch (Exception ex) |
| | | 123 | | { |
| | | 124 | | // Log but don't throw - the work is already done, and the lock |
| | | 125 | | // will be automatically released when the connection dies |
| | 1 | 126 | | logger.LogWarning(ex, "Failed to release distributed lock for workflow instance {WorkflowInstanceId}. The lo |
| | 1 | 127 | | } |
| | 131 | 128 | | } |
| | | 129 | | |
/// <summary>
/// Builds the retry pipeline used for lock acquisition: at most 3 retry attempts, a 500 ms delay setting with
/// exponential backoff and jitter, retrying only exceptions for which
/// <c>transientExceptionDetector.IsTransient</c> returns <c>true</c>. Each retry is logged as a warning.
/// </summary>
/// <param name="transientExceptionDetector">Predicate source deciding whether an exception is retried.</param>
/// <param name="logger">Logger that receives the per-retry warning.</param>
/// <param name="workflowInstanceId">Workflow instance ID; NOTE(review): its use in the log call is cut off in this listing - confirm.</param>
/// <returns>The built <see cref="ResiliencePipeline"/>.</returns>
| | | 130 | | private static ResiliencePipeline CreateRetryPipeline( |
| | | 131 | | ITransientExceptionDetector transientExceptionDetector, |
| | | 132 | | ILogger<DistributedWorkflowClient> logger, |
| | | 133 | | string workflowInstanceId) |
| | | 134 | | { |
| | | 135 | | const int maxRetryAttempts = 3; |
| | | 136 | | |
| | 125 | 137 | | return new ResiliencePipelineBuilder() |
| | 125 | 138 | | .AddRetry(new() |
| | 125 | 139 | | { |
| | 125 | 140 | | MaxRetryAttempts = maxRetryAttempts, |
| | 125 | 141 | | Delay = TimeSpan.FromMilliseconds(500), |
| | 125 | 142 | | BackoffType = DelayBackoffType.Exponential, |
| | 125 | 143 | | UseJitter = true, |
// Only exceptions the detector classifies as transient trigger a retry; others are not handled by this strategy.
| | 125 | 144 | | ShouldHandle = new PredicateBuilder().Handle<Exception>(transientExceptionDetector.IsTransient), |
| | 125 | 145 | | OnRetry = args => |
| | 125 | 146 | | { |
| | 6 | 147 | | logger.LogWarning(args.Outcome.Exception, "Transient error acquiring lock for workflow instance {Wor |
| | 6 | 148 | | return ValueTask.CompletedTask; |
| | 125 | 149 | | } |
| | 125 | 150 | | }) |
| | 125 | 151 | | .Build(); |
| | | 152 | | } |
| | | 153 | | } |