< Summary

Information
Class: Elsa.Workflows.Runtime.WorkflowDispatchOutboxProcessor
Assembly: Elsa.Workflows.Runtime
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/WorkflowDispatchOutboxProcessor.cs
Line coverage
78%
Covered lines: 123
Uncovered lines: 34
Coverable lines: 157
Total lines: 324
Line coverage: 78.3%
Branch coverage
66%
Covered branches: 30
Total branches: 45
Branch coverage: 66.6%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
ProcessAsync()100%22100%
TryProcessAsync()50%2283.33%
ProcessPendingItemsAsync()100%2258.33%
GetLockResource()83.33%66100%
ProcessAsync()100%4487.5%
SendAsync()15.38%471341.17%
HandleMissingOwnerAsync()100%44100%
HandleUncommittedOwnerAsync()100%44100%
TryDeleteRetainedItemAsync()100%1175%
HandleDeliveryFailureAsync()100%4486.66%
TryDeleteAbandonedItemAsync()100%1156.25%
CleanupDeliveredItemAsync()100%1177.77%
RemoveCommittedMarkerAsync()50%4484.61%

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/WorkflowDispatchOutboxProcessor.cs

#LineLine coverage
 1using Elsa.Common;
 2using Elsa.Common.DistributedHosting;
 3using Elsa.Common.Models;
 4using Elsa.Common.Multitenancy;
 5using Elsa.Mediator;
 6using Elsa.Mediator.Contracts;
 7using Elsa.Tenants.Mediator;
 8using Elsa.Workflows.Management;
 9using Elsa.Workflows.Management.Entities;
 10using Elsa.Workflows.Management.Filters;
 11using Elsa.Workflows.Runtime.Models;
 12using Elsa.Workflows.Runtime.Options;
 13using Medallion.Threading;
 14using Microsoft.Extensions.Logging;
 15using Microsoft.Extensions.Options;
 16
 17namespace Elsa.Workflows.Runtime;
 18
 19/// <inheritdoc />
 2620public class WorkflowDispatchOutboxProcessor(
 2621    IWorkflowDispatchOutboxStore store,
 2622    IWorkflowInstanceStore workflowInstanceStore,
 2623    ICommandSender commandSender,
 2624    IDistributedLockProvider distributedLockProvider,
 2625    ISystemClock systemClock,
 2626    IOptions<DistributedLockingOptions> distributedLockingOptions,
 2627    IOptions<WorkflowDispatcherOptions> dispatcherOptions,
 2628    ILogger<WorkflowDispatchOutboxProcessor> logger,
 2629    ITenantAccessor? tenantAccessor = null) : IWorkflowDispatchOutboxProcessor
 30{
 31    private const string LockResource = "Elsa:WorkflowDispatchOutbox:Processor";
 32
 33    /// <inheritdoc />
 34    public async Task ProcessAsync(CancellationToken cancellationToken = default)
 35    {
 36        IDistributedSynchronizationHandle? handle;
 37
 38        try
 39        {
 1840            handle = await distributedLockProvider.TryAcquireLockAsync(GetLockResource(), distributedLockingOptions.Valu
 1741        }
 142        catch (TimeoutException e) when (!cancellationToken.IsCancellationRequested)
 43        {
 144            logger.LogDebug(e, "Skipping workflow dispatch outbox processing because the processor lock could not be acq
 145            return;
 46        }
 47
 1748        await using (handle)
 49        {
 1750            if (handle == null)
 51            {
 152                logger.LogDebug("Skipping workflow dispatch outbox processing because another processor owns the lock.")
 153                return;
 54            }
 55
 1656            await ProcessPendingItemsAsync(cancellationToken);
 57        }
 1858    }
 59
 60    /// <inheritdoc />
 61    public async Task<bool> TryProcessAsync(CancellationToken cancellationToken = default)
 62    {
 63        IDistributedSynchronizationHandle? handle;
 64
 65        try
 66        {
 267            handle = await distributedLockProvider.TryAcquireLockAsync(GetLockResource(), TimeSpan.Zero, cancellationTok
 168        }
 169        catch (TimeoutException e) when (!cancellationToken.IsCancellationRequested)
 70        {
 171            logger.LogDebug(e, "Skipping eager workflow dispatch outbox processing because the processor lock could not 
 172            return false;
 73        }
 74
 175        await using (handle)
 76        {
 177            if (handle == null)
 78            {
 079                logger.LogDebug("Skipping eager workflow dispatch outbox processing because another processor owns the l
 080                return false;
 81            }
 82
 183            await ProcessPendingItemsAsync(cancellationToken);
 184            return true;
 85        }
 286    }
 87
 88    private async Task ProcessPendingItemsAsync(CancellationToken cancellationToken)
 89    {
 1790        var batchSize = Math.Max(1, dispatcherOptions.Value.OutboxProcessorBatchSize);
 1791        var items = (await store.FindManyAsync(batchSize, cancellationToken)).ToList();
 92
 6893        foreach (var item in items)
 94        {
 95            try
 96            {
 1797                await ProcessAsync(item, cancellationToken);
 1798            }
 099            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
 100            {
 0101                throw;
 102            }
 0103            catch (Exception e) when (e is not OperationCanceledException)
 104            {
 0105                logger.LogError(e, "Failed to process workflow dispatch outbox item {OutboxItemId}; processing will cont
 0106            }
 17107        }
 17108    }
 109
 110    private string GetLockResource()
 111    {
 20112        var tenantId = tenantAccessor?.Tenant?.Id;
 20113        return string.IsNullOrWhiteSpace(tenantId) ? LockResource : $"{LockResource}:{tenantId}";
 114    }
 115
 116    private async Task ProcessAsync(WorkflowDispatchOutboxItem item, CancellationToken cancellationToken)
 117    {
 17118        var owner = await workflowInstanceStore.FindAsync(new WorkflowInstanceFilter { Id = item.OwnerWorkflowInstanceId
 119
 17120        if (owner == null)
 121        {
 3122            await HandleMissingOwnerAsync(item, cancellationToken);
 3123            return;
 124        }
 125
 14126        if (!owner.WorkflowState.HasWorkflowDispatchOutboxItem(item.Id))
 127        {
 3128            await HandleUncommittedOwnerAsync(item, cancellationToken);
 3129            return;
 130        }
 131
 132        try
 133        {
 11134            await SendAsync(item, cancellationToken);
 7135        }
 0136        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
 137        {
 0138            throw;
 139        }
 4140        catch (Exception e) when (e is not OperationCanceledException)
 141        {
 4142            await HandleDeliveryFailureAsync(item, e, cancellationToken);
 4143            return;
 144        }
 145
 7146        await CleanupDeliveredItemAsync(item, cancellationToken);
 17147    }
 148
 149    private async Task SendAsync(WorkflowDispatchOutboxItem item, CancellationToken cancellationToken)
 150    {
 11151        var headers = TenantHeaders.CreateHeaders(item.TenantId);
 152
 11153        switch (item.Kind)
 154        {
 11155            case WorkflowDispatchOutboxItemKind.WorkflowDefinition when item.WorkflowDefinitionCommand != null:
 11156                await commandSender.SendAsync(item.WorkflowDefinitionCommand, CommandStrategy.Background, headers, cance
 7157                break;
 0158            case WorkflowDispatchOutboxItemKind.WorkflowInstance when item.WorkflowInstanceCommand != null:
 0159                await commandSender.SendAsync(item.WorkflowInstanceCommand, CommandStrategy.Background, headers, cancell
 0160                break;
 0161            case WorkflowDispatchOutboxItemKind.TriggerWorkflows when item.TriggerWorkflowsCommand != null:
 0162                await commandSender.SendAsync(item.TriggerWorkflowsCommand, CommandStrategy.Background, headers, cancell
 0163                break;
 0164            case WorkflowDispatchOutboxItemKind.ResumeWorkflows when item.ResumeWorkflowsCommand != null:
 0165                await commandSender.SendAsync(item.ResumeWorkflowsCommand, CommandStrategy.Background, headers, cancella
 0166                break;
 167            default:
 0168                throw new InvalidOperationException($"Outbox item {item.Id} does not contain a dispatch command for kind
 169        }
 170
 7171        logger.LogDebug("Delivered workflow dispatch outbox item {OutboxItemId} for owner workflow {WorkflowInstanceId}"
 7172    }
 173
 174    private async Task HandleMissingOwnerAsync(WorkflowDispatchOutboxItem item, CancellationToken cancellationToken)
 175    {
 3176        var retention = dispatcherOptions.Value.OrphanedOutboxItemRetention;
 3177        var expiresAt = item.CreatedAt.Add(retention);
 178
 3179        if (retention <= TimeSpan.Zero || systemClock.UtcNow >= expiresAt)
 180        {
 2181            logger.LogWarning("Deleting workflow dispatch outbox item {OutboxItemId} because owner workflow {WorkflowIns
 2182            await TryDeleteRetainedItemAsync(item, cancellationToken);
 2183            return;
 184        }
 185
 1186        logger.LogDebug("Skipping workflow dispatch outbox item {OutboxItemId} because owner workflow {WorkflowInstanceI
 3187    }
 188
 189    private async Task HandleUncommittedOwnerAsync(WorkflowDispatchOutboxItem item, CancellationToken cancellationToken)
 190    {
 3191        var retention = dispatcherOptions.Value.OrphanedOutboxItemRetention;
 3192        var expiresAt = item.CreatedAt.Add(retention);
 193
 3194        if (retention <= TimeSpan.Zero || systemClock.UtcNow >= expiresAt)
 195        {
 2196            logger.LogWarning("Deleting workflow dispatch outbox item {OutboxItemId} because owner workflow {WorkflowIns
 2197            await TryDeleteRetainedItemAsync(item, cancellationToken);
 2198            return;
 199        }
 200
 1201        logger.LogDebug("Skipping workflow dispatch outbox item {OutboxItemId} because owner workflow {WorkflowInstanceI
 3202    }
 203
 204    private async Task TryDeleteRetainedItemAsync(WorkflowDispatchOutboxItem item, CancellationToken cancellationToken)
 205    {
 206        try
 207        {
 4208            await store.DeleteAsync(item.Id, cancellationToken);
 2209        }
 0210        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
 211        {
 0212            throw;
 213        }
 2214        catch (Exception e) when (e is not OperationCanceledException)
 215        {
 2216            logger.LogWarning(e, "Failed to delete retained workflow dispatch outbox item {OutboxItemId}; it will be ret
 2217        }
 4218    }
 219
 220    private async Task HandleDeliveryFailureAsync(WorkflowDispatchOutboxItem item, Exception exception, CancellationToke
 221    {
 4222        item.DeliveryAttempts++;
 223
 224        try
 225        {
 4226            if (item.DeliveryAttempts >= dispatcherOptions.Value.MaxOutboxDeliveryAttempts)
 227            {
 2228                logger.LogWarning(exception, "Abandoning workflow dispatch outbox item {OutboxItemId} after {DeliveryAtt
 2229                if (await TryDeleteAbandonedItemAsync(item, cancellationToken))
 1230                    await RemoveCommittedMarkerAsync(item, cancellationToken);
 2231                return;
 232            }
 233
 2234            await store.SaveAsync(item, cancellationToken);
 1235            logger.LogError(exception, "Failed to deliver workflow dispatch outbox item {OutboxItemId}; attempt {Deliver
 1236        }
 0237        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
 238        {
 0239            throw;
 240        }
 1241        catch (Exception e) when (e is not OperationCanceledException)
 242        {
 1243            logger.LogError(e, "Failed to persist delivery failure state for workflow dispatch outbox item {OutboxItemId
 1244        }
 4245    }
 246
 247    private async Task<bool> TryDeleteAbandonedItemAsync(WorkflowDispatchOutboxItem item, CancellationToken cancellation
 248    {
 249        try
 250        {
 2251            await store.DeleteAsync(item.Id, cancellationToken);
 1252            return true;
 253        }
 0254        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
 255        {
 0256            throw;
 257        }
 1258        catch (Exception e) when (e is not OperationCanceledException)
 259        {
 1260            logger.LogError(e, "Failed to delete abandoned workflow dispatch outbox item {OutboxItemId}; preserving its 
 1261        }
 262
 263        try
 264        {
 1265            await store.SaveAsync(item, cancellationToken);
 1266        }
 0267        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
 268        {
 0269            throw;
 270        }
 0271        catch (Exception e) when (e is not OperationCanceledException)
 272        {
 0273            logger.LogError(e, "Failed to preserve delivery attempt count for abandoned workflow dispatch outbox item {O
 0274        }
 275
 1276        return false;
 2277    }
 278
 279    private async Task CleanupDeliveredItemAsync(WorkflowDispatchOutboxItem item, CancellationToken cancellationToken)
 280    {
 281        try
 282        {
 7283            await store.DeleteAsync(item.Id, cancellationToken);
 6284        }
 0285        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
 286        {
 0287            throw;
 288        }
 1289        catch (Exception e) when (e is not OperationCanceledException)
 290        {
 1291            logger.LogWarning(e, "Delivered workflow dispatch outbox item {OutboxItemId}, but failed to delete it from t
 1292            return;
 293        }
 294
 6295        await RemoveCommittedMarkerAsync(item, cancellationToken);
 7296    }
 297
 298    private async Task RemoveCommittedMarkerAsync(WorkflowDispatchOutboxItem item, CancellationToken cancellationToken)
 299    {
 7300        var lockResource = $"workflow-instance:{item.OwnerWorkflowInstanceId}";
 301
 302        try
 303        {
 7304            await using var handle = await distributedLockProvider.AcquireLockAsync(lockResource, distributedLockingOpti
 7305            var owner = await workflowInstanceStore.FindAsync(new WorkflowInstanceFilter { Id = item.OwnerWorkflowInstan
 306
 7307            if (owner == null)
 308                return;
 309
 7310            if (!owner.WorkflowState.RemoveWorkflowDispatchOutboxItem(item.Id))
 311                return;
 312
 7313            await workflowInstanceStore.SaveAsync(owner, cancellationToken);
 6314        }
 0315        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
 316        {
 0317            throw;
 318        }
 1319        catch (Exception e) when (e is not OperationCanceledException)
 320        {
 1321            logger.LogWarning(e, "Failed to remove delivered workflow dispatch outbox item {OutboxItemId} from owner wor
 1322        }
 7323    }
 324}