| | | 1 | | using Elsa.Common; |
| | | 2 | | using Elsa.Common.DistributedHosting; |
| | | 3 | | using Elsa.Common.Models; |
| | | 4 | | using Elsa.Common.Multitenancy; |
| | | 5 | | using Elsa.Mediator; |
| | | 6 | | using Elsa.Mediator.Contracts; |
| | | 7 | | using Elsa.Tenants.Mediator; |
| | | 8 | | using Elsa.Workflows.Management; |
| | | 9 | | using Elsa.Workflows.Management.Entities; |
| | | 10 | | using Elsa.Workflows.Management.Filters; |
| | | 11 | | using Elsa.Workflows.Runtime.Models; |
| | | 12 | | using Elsa.Workflows.Runtime.Options; |
| | | 13 | | using Medallion.Threading; |
| | | 14 | | using Microsoft.Extensions.Logging; |
| | | 15 | | using Microsoft.Extensions.Options; |
| | | 16 | | |
| | | 17 | | namespace Elsa.Workflows.Runtime; |
| | | 18 | | |
| | | 19 | | /// <inheritdoc /> |
| | 26 | 20 | | public class WorkflowDispatchOutboxProcessor( |
| | 26 | 21 | | IWorkflowDispatchOutboxStore store, |
| | 26 | 22 | | IWorkflowInstanceStore workflowInstanceStore, |
| | 26 | 23 | | ICommandSender commandSender, |
| | 26 | 24 | | IDistributedLockProvider distributedLockProvider, |
| | 26 | 25 | | ISystemClock systemClock, |
| | 26 | 26 | | IOptions<DistributedLockingOptions> distributedLockingOptions, |
| | 26 | 27 | | IOptions<WorkflowDispatcherOptions> dispatcherOptions, |
| | 26 | 28 | | ILogger<WorkflowDispatchOutboxProcessor> logger, |
| | 26 | 29 | | ITenantAccessor? tenantAccessor = null) : IWorkflowDispatchOutboxProcessor |
| | | 30 | | { |
| | | 31 | | private const string LockResource = "Elsa:WorkflowDispatchOutbox:Processor"; |
| | | 32 | | |
| | | 33 | | /// <inheritdoc /> |
| | | 34 | | public async Task ProcessAsync(CancellationToken cancellationToken = default) |
| | | 35 | | { |
| | | 36 | | IDistributedSynchronizationHandle? handle; |
| | | 37 | | |
| | | 38 | | try |
| | | 39 | | { |
| | 18 | 40 | | handle = await distributedLockProvider.TryAcquireLockAsync(GetLockResource(), distributedLockingOptions.Valu |
| | 17 | 41 | | } |
| | 1 | 42 | | catch (TimeoutException e) when (!cancellationToken.IsCancellationRequested) |
| | | 43 | | { |
| | 1 | 44 | | logger.LogDebug(e, "Skipping workflow dispatch outbox processing because the processor lock could not be acq |
| | 1 | 45 | | return; |
| | | 46 | | } |
| | | 47 | | |
| | 17 | 48 | | await using (handle) |
| | | 49 | | { |
| | 17 | 50 | | if (handle == null) |
| | | 51 | | { |
| | 1 | 52 | | logger.LogDebug("Skipping workflow dispatch outbox processing because another processor owns the lock.") |
| | 1 | 53 | | return; |
| | | 54 | | } |
| | | 55 | | |
| | 16 | 56 | | await ProcessPendingItemsAsync(cancellationToken); |
| | | 57 | | } |
| | 18 | 58 | | } |
| | | 59 | | |
| | | 60 | | /// <inheritdoc /> |
| | | 61 | | public async Task<bool> TryProcessAsync(CancellationToken cancellationToken = default) |
| | | 62 | | { |
| | | 63 | | IDistributedSynchronizationHandle? handle; |
| | | 64 | | |
| | | 65 | | try |
| | | 66 | | { |
| | 2 | 67 | | handle = await distributedLockProvider.TryAcquireLockAsync(GetLockResource(), TimeSpan.Zero, cancellationTok |
| | 1 | 68 | | } |
| | 1 | 69 | | catch (TimeoutException e) when (!cancellationToken.IsCancellationRequested) |
| | | 70 | | { |
| | 1 | 71 | | logger.LogDebug(e, "Skipping eager workflow dispatch outbox processing because the processor lock could not |
| | 1 | 72 | | return false; |
| | | 73 | | } |
| | | 74 | | |
| | 1 | 75 | | await using (handle) |
| | | 76 | | { |
| | 1 | 77 | | if (handle == null) |
| | | 78 | | { |
| | 0 | 79 | | logger.LogDebug("Skipping eager workflow dispatch outbox processing because another processor owns the l |
| | 0 | 80 | | return false; |
| | | 81 | | } |
| | | 82 | | |
| | 1 | 83 | | await ProcessPendingItemsAsync(cancellationToken); |
| | 1 | 84 | | return true; |
| | | 85 | | } |
| | 2 | 86 | | } |
| | | 87 | | |
| | | 88 | | private async Task ProcessPendingItemsAsync(CancellationToken cancellationToken) |
| | | 89 | | { |
| | 17 | 90 | | var batchSize = Math.Max(1, dispatcherOptions.Value.OutboxProcessorBatchSize); |
| | 17 | 91 | | var items = (await store.FindManyAsync(batchSize, cancellationToken)).ToList(); |
| | | 92 | | |
| | 68 | 93 | | foreach (var item in items) |
| | | 94 | | { |
| | | 95 | | try |
| | | 96 | | { |
| | 17 | 97 | | await ProcessAsync(item, cancellationToken); |
| | 17 | 98 | | } |
| | 0 | 99 | | catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) |
| | | 100 | | { |
| | 0 | 101 | | throw; |
| | | 102 | | } |
| | 0 | 103 | | catch (Exception e) when (e is not OperationCanceledException) |
| | | 104 | | { |
| | 0 | 105 | | logger.LogError(e, "Failed to process workflow dispatch outbox item {OutboxItemId}; processing will cont |
| | 0 | 106 | | } |
| | 17 | 107 | | } |
| | 17 | 108 | | } |
| | | 109 | | |
| | | 110 | | private string GetLockResource() |
| | | 111 | | { |
| | 20 | 112 | | var tenantId = tenantAccessor?.Tenant?.Id; |
| | 20 | 113 | | return string.IsNullOrWhiteSpace(tenantId) ? LockResource : $"{LockResource}:{tenantId}"; |
| | | 114 | | } |
| | | 115 | | |
| | | 116 | | private async Task ProcessAsync(WorkflowDispatchOutboxItem item, CancellationToken cancellationToken) |
| | | 117 | | { |
| | 17 | 118 | | var owner = await workflowInstanceStore.FindAsync(new WorkflowInstanceFilter { Id = item.OwnerWorkflowInstanceId |
| | | 119 | | |
| | 17 | 120 | | if (owner == null) |
| | | 121 | | { |
| | 3 | 122 | | await HandleMissingOwnerAsync(item, cancellationToken); |
| | 3 | 123 | | return; |
| | | 124 | | } |
| | | 125 | | |
| | 14 | 126 | | if (!owner.WorkflowState.HasWorkflowDispatchOutboxItem(item.Id)) |
| | | 127 | | { |
| | 3 | 128 | | await HandleUncommittedOwnerAsync(item, cancellationToken); |
| | 3 | 129 | | return; |
| | | 130 | | } |
| | | 131 | | |
| | | 132 | | try |
| | | 133 | | { |
| | 11 | 134 | | await SendAsync(item, cancellationToken); |
| | 7 | 135 | | } |
| | 0 | 136 | | catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) |
| | | 137 | | { |
| | 0 | 138 | | throw; |
| | | 139 | | } |
| | 4 | 140 | | catch (Exception e) when (e is not OperationCanceledException) |
| | | 141 | | { |
| | 4 | 142 | | await HandleDeliveryFailureAsync(item, e, cancellationToken); |
| | 4 | 143 | | return; |
| | | 144 | | } |
| | | 145 | | |
| | 7 | 146 | | await CleanupDeliveredItemAsync(item, cancellationToken); |
| | 17 | 147 | | } |
| | | 148 | | |
| | | 149 | | private async Task SendAsync(WorkflowDispatchOutboxItem item, CancellationToken cancellationToken) |
| | | 150 | | { |
| | 11 | 151 | | var headers = TenantHeaders.CreateHeaders(item.TenantId); |
| | | 152 | | |
| | 11 | 153 | | switch (item.Kind) |
| | | 154 | | { |
| | 11 | 155 | | case WorkflowDispatchOutboxItemKind.WorkflowDefinition when item.WorkflowDefinitionCommand != null: |
| | 11 | 156 | | await commandSender.SendAsync(item.WorkflowDefinitionCommand, CommandStrategy.Background, headers, cance |
| | 7 | 157 | | break; |
| | 0 | 158 | | case WorkflowDispatchOutboxItemKind.WorkflowInstance when item.WorkflowInstanceCommand != null: |
| | 0 | 159 | | await commandSender.SendAsync(item.WorkflowInstanceCommand, CommandStrategy.Background, headers, cancell |
| | 0 | 160 | | break; |
| | 0 | 161 | | case WorkflowDispatchOutboxItemKind.TriggerWorkflows when item.TriggerWorkflowsCommand != null: |
| | 0 | 162 | | await commandSender.SendAsync(item.TriggerWorkflowsCommand, CommandStrategy.Background, headers, cancell |
| | 0 | 163 | | break; |
| | 0 | 164 | | case WorkflowDispatchOutboxItemKind.ResumeWorkflows when item.ResumeWorkflowsCommand != null: |
| | 0 | 165 | | await commandSender.SendAsync(item.ResumeWorkflowsCommand, CommandStrategy.Background, headers, cancella |
| | 0 | 166 | | break; |
| | | 167 | | default: |
| | 0 | 168 | | throw new InvalidOperationException($"Outbox item {item.Id} does not contain a dispatch command for kind |
| | | 169 | | } |
| | | 170 | | |
| | 7 | 171 | | logger.LogDebug("Delivered workflow dispatch outbox item {OutboxItemId} for owner workflow {WorkflowInstanceId}" |
| | 7 | 172 | | } |
| | | 173 | | |
| | | 174 | | private async Task HandleMissingOwnerAsync(WorkflowDispatchOutboxItem item, CancellationToken cancellationToken) |
| | | 175 | | { |
| | 3 | 176 | | var retention = dispatcherOptions.Value.OrphanedOutboxItemRetention; |
| | 3 | 177 | | var expiresAt = item.CreatedAt.Add(retention); |
| | | 178 | | |
| | 3 | 179 | | if (retention <= TimeSpan.Zero || systemClock.UtcNow >= expiresAt) |
| | | 180 | | { |
| | 2 | 181 | | logger.LogWarning("Deleting workflow dispatch outbox item {OutboxItemId} because owner workflow {WorkflowIns |
| | 2 | 182 | | await TryDeleteRetainedItemAsync(item, cancellationToken); |
| | 2 | 183 | | return; |
| | | 184 | | } |
| | | 185 | | |
| | 1 | 186 | | logger.LogDebug("Skipping workflow dispatch outbox item {OutboxItemId} because owner workflow {WorkflowInstanceI |
| | 3 | 187 | | } |
| | | 188 | | |
| | | 189 | | private async Task HandleUncommittedOwnerAsync(WorkflowDispatchOutboxItem item, CancellationToken cancellationToken) |
| | | 190 | | { |
| | 3 | 191 | | var retention = dispatcherOptions.Value.OrphanedOutboxItemRetention; |
| | 3 | 192 | | var expiresAt = item.CreatedAt.Add(retention); |
| | | 193 | | |
| | 3 | 194 | | if (retention <= TimeSpan.Zero || systemClock.UtcNow >= expiresAt) |
| | | 195 | | { |
| | 2 | 196 | | logger.LogWarning("Deleting workflow dispatch outbox item {OutboxItemId} because owner workflow {WorkflowIns |
| | 2 | 197 | | await TryDeleteRetainedItemAsync(item, cancellationToken); |
| | 2 | 198 | | return; |
| | | 199 | | } |
| | | 200 | | |
| | 1 | 201 | | logger.LogDebug("Skipping workflow dispatch outbox item {OutboxItemId} because owner workflow {WorkflowInstanceI |
| | 3 | 202 | | } |
| | | 203 | | |
| | | 204 | | private async Task TryDeleteRetainedItemAsync(WorkflowDispatchOutboxItem item, CancellationToken cancellationToken) |
| | | 205 | | { |
| | | 206 | | try |
| | | 207 | | { |
| | 4 | 208 | | await store.DeleteAsync(item.Id, cancellationToken); |
| | 2 | 209 | | } |
| | 0 | 210 | | catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) |
| | | 211 | | { |
| | 0 | 212 | | throw; |
| | | 213 | | } |
| | 2 | 214 | | catch (Exception e) when (e is not OperationCanceledException) |
| | | 215 | | { |
| | 2 | 216 | | logger.LogWarning(e, "Failed to delete retained workflow dispatch outbox item {OutboxItemId}; it will be ret |
| | 2 | 217 | | } |
| | 4 | 218 | | } |
| | | 219 | | |
| | | 220 | | private async Task HandleDeliveryFailureAsync(WorkflowDispatchOutboxItem item, Exception exception, CancellationToke |
| | | 221 | | { |
| | 4 | 222 | | item.DeliveryAttempts++; |
| | | 223 | | |
| | | 224 | | try |
| | | 225 | | { |
| | 4 | 226 | | if (item.DeliveryAttempts >= dispatcherOptions.Value.MaxOutboxDeliveryAttempts) |
| | | 227 | | { |
| | 2 | 228 | | logger.LogWarning(exception, "Abandoning workflow dispatch outbox item {OutboxItemId} after {DeliveryAtt |
| | 2 | 229 | | if (await TryDeleteAbandonedItemAsync(item, cancellationToken)) |
| | 1 | 230 | | await RemoveCommittedMarkerAsync(item, cancellationToken); |
| | 2 | 231 | | return; |
| | | 232 | | } |
| | | 233 | | |
| | 2 | 234 | | await store.SaveAsync(item, cancellationToken); |
| | 1 | 235 | | logger.LogError(exception, "Failed to deliver workflow dispatch outbox item {OutboxItemId}; attempt {Deliver |
| | 1 | 236 | | } |
| | 0 | 237 | | catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) |
| | | 238 | | { |
| | 0 | 239 | | throw; |
| | | 240 | | } |
| | 1 | 241 | | catch (Exception e) when (e is not OperationCanceledException) |
| | | 242 | | { |
| | 1 | 243 | | logger.LogError(e, "Failed to persist delivery failure state for workflow dispatch outbox item {OutboxItemId |
| | 1 | 244 | | } |
| | 4 | 245 | | } |
| | | 246 | | |
| | | 247 | | private async Task<bool> TryDeleteAbandonedItemAsync(WorkflowDispatchOutboxItem item, CancellationToken cancellation |
| | | 248 | | { |
| | | 249 | | try |
| | | 250 | | { |
| | 2 | 251 | | await store.DeleteAsync(item.Id, cancellationToken); |
| | 1 | 252 | | return true; |
| | | 253 | | } |
| | 0 | 254 | | catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) |
| | | 255 | | { |
| | 0 | 256 | | throw; |
| | | 257 | | } |
| | 1 | 258 | | catch (Exception e) when (e is not OperationCanceledException) |
| | | 259 | | { |
| | 1 | 260 | | logger.LogError(e, "Failed to delete abandoned workflow dispatch outbox item {OutboxItemId}; preserving its |
| | 1 | 261 | | } |
| | | 262 | | |
| | | 263 | | try |
| | | 264 | | { |
| | 1 | 265 | | await store.SaveAsync(item, cancellationToken); |
| | 1 | 266 | | } |
| | 0 | 267 | | catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) |
| | | 268 | | { |
| | 0 | 269 | | throw; |
| | | 270 | | } |
| | 0 | 271 | | catch (Exception e) when (e is not OperationCanceledException) |
| | | 272 | | { |
| | 0 | 273 | | logger.LogError(e, "Failed to preserve delivery attempt count for abandoned workflow dispatch outbox item {O |
| | 0 | 274 | | } |
| | | 275 | | |
| | 1 | 276 | | return false; |
| | 2 | 277 | | } |
| | | 278 | | |
| | | 279 | | private async Task CleanupDeliveredItemAsync(WorkflowDispatchOutboxItem item, CancellationToken cancellationToken) |
| | | 280 | | { |
| | | 281 | | try |
| | | 282 | | { |
| | 7 | 283 | | await store.DeleteAsync(item.Id, cancellationToken); |
| | 6 | 284 | | } |
| | 0 | 285 | | catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) |
| | | 286 | | { |
| | 0 | 287 | | throw; |
| | | 288 | | } |
| | 1 | 289 | | catch (Exception e) when (e is not OperationCanceledException) |
| | | 290 | | { |
| | 1 | 291 | | logger.LogWarning(e, "Delivered workflow dispatch outbox item {OutboxItemId}, but failed to delete it from t |
| | 1 | 292 | | return; |
| | | 293 | | } |
| | | 294 | | |
| | 6 | 295 | | await RemoveCommittedMarkerAsync(item, cancellationToken); |
| | 7 | 296 | | } |
| | | 297 | | |
| | | 298 | | private async Task RemoveCommittedMarkerAsync(WorkflowDispatchOutboxItem item, CancellationToken cancellationToken) |
| | | 299 | | { |
| | 7 | 300 | | var lockResource = $"workflow-instance:{item.OwnerWorkflowInstanceId}"; |
| | | 301 | | |
| | | 302 | | try |
| | | 303 | | { |
| | 7 | 304 | | await using var handle = await distributedLockProvider.AcquireLockAsync(lockResource, distributedLockingOpti |
| | 7 | 305 | | var owner = await workflowInstanceStore.FindAsync(new WorkflowInstanceFilter { Id = item.OwnerWorkflowInstan |
| | | 306 | | |
| | 7 | 307 | | if (owner == null) |
| | | 308 | | return; |
| | | 309 | | |
| | 7 | 310 | | if (!owner.WorkflowState.RemoveWorkflowDispatchOutboxItem(item.Id)) |
| | | 311 | | return; |
| | | 312 | | |
| | 7 | 313 | | await workflowInstanceStore.SaveAsync(owner, cancellationToken); |
| | 6 | 314 | | } |
| | 0 | 315 | | catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) |
| | | 316 | | { |
| | 0 | 317 | | throw; |
| | | 318 | | } |
| | 1 | 319 | | catch (Exception e) when (e is not OperationCanceledException) |
| | | 320 | | { |
| | 1 | 321 | | logger.LogWarning(e, "Failed to remove delivered workflow dispatch outbox item {OutboxItemId} from owner wor |
| | 1 | 322 | | } |
| | 7 | 323 | | } |
| | | 324 | | } |