| | | 1 | | using Elsa.Common.DistributedHosting; |
| | | 2 | | using Elsa.Workflows.Runtime.Messages; |
| | | 3 | | using Elsa.Workflows.State; |
| | | 4 | | using Medallion.Threading; |
| | | 5 | | using Microsoft.Extensions.DependencyInjection; |
| | | 6 | | using Microsoft.Extensions.Options; |
| | | 7 | | |
| | | 8 | | namespace Elsa.Workflows.Runtime.Distributed; |
| | | 9 | | |
| | 122 | 10 | | public class DistributedWorkflowClient( |
| | 122 | 11 | | string workflowInstanceId, |
| | 122 | 12 | | IDistributedLockProvider distributedLockProvider, |
| | 122 | 13 | | IOptions<DistributedLockingOptions> distributedLockingOptions, |
| | 122 | 14 | | IServiceProvider serviceProvider) |
| | | 15 | | : IWorkflowClient |
| | | 16 | | { |
| | 122 | 17 | | private readonly LocalWorkflowClient _localWorkflowClient = ActivatorUtilities.CreateInstance<LocalWorkflowClient>(s |
| | | 18 | | |
| | 118 | 19 | | public string WorkflowInstanceId => workflowInstanceId; |
| | | 20 | | |
| | | 21 | | public async Task<CreateWorkflowInstanceResponse> CreateInstanceAsync(CreateWorkflowInstanceRequest request, Cancell |
| | | 22 | | { |
| | 22 | 23 | | return await _localWorkflowClient.CreateInstanceAsync(request, cancellationToken); |
| | 22 | 24 | | } |
| | | 25 | | |
| | | 26 | | public async Task<RunWorkflowInstanceResponse> RunInstanceAsync(RunWorkflowInstanceRequest request, CancellationToke |
| | | 27 | | { |
| | 116 | 28 | | var result = await WithLockAsync(async () => await _localWorkflowClient.RunInstanceAsync(request, cancellationTo |
| | 58 | 29 | | return result; |
| | 58 | 30 | | } |
| | | 31 | | |
| | | 32 | | public async Task<RunWorkflowInstanceResponse> CreateAndRunInstanceAsync(CreateAndRunWorkflowInstanceRequest request |
| | | 33 | | { |
| | 52 | 34 | | var createRequest = new CreateWorkflowInstanceRequest |
| | 52 | 35 | | { |
| | 52 | 36 | | Properties = request.Properties, |
| | 52 | 37 | | CorrelationId = request.CorrelationId, |
| | 52 | 38 | | Name = request.Name, |
| | 52 | 39 | | Input = request.Input, |
| | 52 | 40 | | WorkflowDefinitionHandle = request.WorkflowDefinitionHandle, |
| | 52 | 41 | | ParentId = request.ParentId |
| | 52 | 42 | | }; |
| | 52 | 43 | | var workflowInstance = await _localWorkflowClient.CreateInstanceInternalAsync(createRequest, cancellationToken); |
| | | 44 | | |
| | | 45 | | // We need to lock newly created workflow instances too, because it might dispatch child workflows that attempt |
| | | 46 | | // For example, when using a DispatchWorkflow activity configured to wait for the dispatched workflow to complet |
| | 104 | 47 | | return await WithLockAsync(async () => await _localWorkflowClient.RunInstanceAsync(workflowInstance, new() |
| | 104 | 48 | | { |
| | 104 | 49 | | Input = request.Input, |
| | 104 | 50 | | Variables = request.Variables, |
| | 104 | 51 | | Properties = request.Properties, |
| | 104 | 52 | | TriggerActivityId = request.TriggerActivityId, |
| | 104 | 53 | | ActivityHandle = request.ActivityHandle, |
| | 104 | 54 | | IncludeWorkflowOutput = request.IncludeWorkflowOutput |
| | 104 | 55 | | }, cancellationToken)); |
| | 52 | 56 | | } |
| | | 57 | | |
| | | 58 | | public async Task CancelAsync(CancellationToken cancellationToken = default) |
| | | 59 | | { |
| | 0 | 60 | | await _localWorkflowClient.CancelAsync(cancellationToken); |
| | 0 | 61 | | } |
| | | 62 | | |
| | | 63 | | public async Task<WorkflowState> ExportStateAsync(CancellationToken cancellationToken = default) |
| | | 64 | | { |
| | 11 | 65 | | return await _localWorkflowClient.ExportStateAsync(cancellationToken); |
| | 11 | 66 | | } |
| | | 67 | | |
| | | 68 | | public async Task ImportStateAsync(WorkflowState workflowState, CancellationToken cancellationToken = default) |
| | | 69 | | { |
| | 0 | 70 | | await _localWorkflowClient.ImportStateAsync(workflowState, cancellationToken); |
| | 0 | 71 | | } |
| | | 72 | | |
| | | 73 | | public async Task<bool> InstanceExistsAsync(CancellationToken cancellationToken = default) |
| | | 74 | | { |
| | 0 | 75 | | return await _localWorkflowClient.InstanceExistsAsync(cancellationToken); |
| | 0 | 76 | | } |
| | | 77 | | |
| | | 78 | | public async Task<bool> DeleteAsync(CancellationToken cancellationToken = default) |
| | | 79 | | { |
| | | 80 | | // Use the same distributed lock as for execution to prevent concurrent DB writes |
| | 16 | 81 | | return await WithLockAsync(async () => await _localWorkflowClient.DeleteAsync(cancellationToken)); |
| | 8 | 82 | | } |
| | | 83 | | |
| | | 84 | | private async Task<R> WithLockAsync<R>(Func<Task<R>> func) |
| | | 85 | | { |
| | 118 | 86 | | var lockKey = $"workflow-instance:{WorkflowInstanceId}"; |
| | 118 | 87 | | var lockTimeout = distributedLockingOptions.Value.LockAcquisitionTimeout; |
| | 118 | 88 | | await using var @lock = await distributedLockProvider.AcquireLockAsync(lockKey, lockTimeout); |
| | 118 | 89 | | var result = await func(); |
| | 118 | 90 | | return result; |
| | 118 | 91 | | } |
| | | 92 | | } |