| | | 1 | | using Elsa.Common; |
| | | 2 | | using Elsa.Common.Multitenancy; |
| | | 3 | | using Elsa.Workflows.Management; |
| | | 4 | | using Elsa.Workflows.Management.Filters; |
| | | 5 | | using Elsa.Workflows.Runtime.Options; |
| | | 6 | | using Microsoft.Extensions.Logging; |
| | | 7 | | using Microsoft.Extensions.Options; |
| | | 8 | | |
| | | 9 | | namespace Elsa.Workflows.Runtime.Services; |
| | | 10 | | |
// NOTE(review): this listing is C# source wrapped in a pipe-delimited table; the numeric columns look like
// coverage hit counts and source line numbers (TODO confirm against the tool that produced it).
// Source rows 55, 57 and 71 are cut off mid-statement, so the notes below describe only what is visible.
| | | 11 | | /// <summary> |
| | | 12 | | /// Default implementation of <see cref="IInterruptedRecoveryScanner"/>. |
| | | 13 | | /// </summary> |
| | | 14 | | public sealed class InterruptedRecoveryScanner : IInterruptedRecoveryScanner |
| | | 15 | | { |
// Collaborators supplied through the constructor. The two tenant services are nullable and default to null.
| | | 16 | | private readonly IWorkflowRestarter _restarter; |
| | | 17 | | private readonly IWorkflowInstanceStore _instanceStore; |
| | | 18 | | private readonly IOptions<RuntimeOptions> _runtimeOptions; |
| | | 19 | | private readonly ILogger<InterruptedRecoveryScanner> _logger; |
| | | 20 | | private readonly ITenantService? _tenantService; |
| | | 21 | | private readonly ITenantAccessor? _tenantAccessor; |
| | | 22 | | |
// Stores each constructor argument in the matching field; tenantService and tenantAccessor are optional.
| | 92 | 23 | | public InterruptedRecoveryScanner( |
| | 92 | 24 | | IWorkflowRestarter restarter, |
| | 92 | 25 | | IWorkflowInstanceStore instanceStore, |
| | 92 | 26 | | IOptions<RuntimeOptions> runtimeOptions, |
| | 92 | 27 | | ILogger<InterruptedRecoveryScanner> logger, |
| | 92 | 28 | | ITenantService? tenantService = null, |
| | 92 | 29 | | ITenantAccessor? tenantAccessor = null) |
| | | 30 | | { |
| | 92 | 31 | | _restarter = restarter; |
| | 92 | 32 | | _instanceStore = instanceStore; |
| | 92 | 33 | | _runtimeOptions = runtimeOptions; |
| | 92 | 34 | | _logger = logger; |
| | 92 | 35 | | _tenantService = tenantService; |
| | 92 | 36 | | _tenantAccessor = tenantAccessor; |
| | 92 | 37 | | } |
| | | 38 | | |
// Enumerates workflow instance summaries whose sub-status is Interrupted and calls RestartWorkflowAsync
// for each one. Returns the number of instances whose restart call completed without throwing.
// The cancellation token is passed to the store enumeration, the tenant lookup and every restart call.
| | | 39 | | /// <inheritdoc /> |
| | | 40 | | public async ValueTask<int> ScanAndRequeueAsync(CancellationToken cancellationToken) |
| | | 41 | | { |
// Filter on the Interrupted sub-status; the batch size handed to the store comes from RuntimeOptions.
| | 92 | 42 | | var filter = new WorkflowInstanceFilter { WorkflowSubStatus = WorkflowSubStatus.Interrupted }; |
| | 92 | 43 | | var batchSize = _runtimeOptions.Value.RestartInterruptedWorkflowsBatchSize; |
| | 92 | 44 | | var instances = _instanceStore.EnumerateSummariesAsync(filter, batchSize, cancellationToken); |
| | 92 | 45 | | var requeued = 0; |
| | | 46 | | |
| | 92 | 47 | | _logger.LogInformation("Scanning for workflows interrupted by graceful drain."); |
| | | 48 | | |
| | 438 | 49 | | await foreach (var summary in instances.WithCancellation(cancellationToken)) |
| | | 50 | | { |
// The try/catch sits inside the loop, so a failure on one instance does not end the scan.
| | | 51 | | try |
| | | 52 | | { |
// A null TenantId is treated as the empty string.
| | 127 | 53 | | var tenantId = summary.TenantId ?? string.Empty; |
| | | 54 | | |
// Tenant-scoped path: needs both tenant services and a non-blank tenant id.
// NOTE(review): the remainder of this condition is cut off in this listing.
| | 127 | 55 | | if (_tenantService is not null && _tenantAccessor is not null && !string.IsNullOrWhiteSpace(tenantId) &&
| | | 56 | | { |
// Falls back to a newly constructed Tenant when FindAsync returns null.
// NOTE(review): the object initializer is cut off in this listing.
| | 1 | 57 | | var tenant = await _tenantService.FindAsync(tenantId, cancellationToken) ?? new Tenant { Id = tenant
| | | 58 | | |
// The restart runs inside the using statement, i.e. before the object returned by PushContext is disposed.
| | 1 | 59 | | using (_tenantAccessor.PushContext(tenant)) |
| | 1 | 60 | | await _restarter.RestartWorkflowAsync(summary.Id, cancellationToken); |
| | | 61 | | } |
| | | 62 | | else |
| | | 63 | | { |
// No tenant context is pushed on this path.
| | 126 | 64 | | await _restarter.RestartWorkflowAsync(summary.Id, cancellationToken); |
| | | 65 | | } |
| | | 66 | | |
// Only reached when the restart call above did not throw.
| | 126 | 67 | | requeued++; |
| | 126 | 68 | | } |
// Exceptions for which IsFatal() returns false are logged and the loop moves on to the next summary;
// exceptions reported as fatal are not caught here.
| | 1 | 69 | | catch (Exception ex) when (!ex.IsFatal()) |
| | | 70 | | { |
// NOTE(review): the log message and its arguments are cut off in this listing.
| | 1 | 71 | | _logger.LogError(ex, "Failed to requeue interrupted workflow {WorkflowInstanceId}; will be retried by th
| | 1 | 72 | | } |
| | 127 | 73 | | } |
| | | 74 | | |
| | 92 | 75 | | _logger.LogInformation("Interrupted-workflow scan complete; requeued {Count} instance(s).", requeued); |
| | 92 | 76 | | return requeued; |
| | 92 | 77 | | } |
| | | 78 | | } |