| | | 1 | | using Elsa.Common.Entities; |
| | | 2 | | using Elsa.Common.Models; |
| | | 3 | | using Elsa.Extensions; |
| | | 4 | | using Elsa.Common; |
| | | 5 | | using Elsa.Workflows.Runtime.Entities; |
| | | 6 | | using Elsa.Workflows.Runtime.Messages; |
| | | 7 | | using Elsa.Workflows.Runtime.Options; |
| | | 8 | | using Elsa.Workflows.Runtime.OrderDefinitions; |
| | | 9 | | using Microsoft.Extensions.Logging; |
| | | 10 | | using Microsoft.Extensions.Options; |
| | | 11 | | |
| | | 12 | | namespace Elsa.Workflows.Runtime; |
| | | 13 | | |
| | 353 | 14 | | public class BookmarkQueueProcessor( |
| | 353 | 15 | | IBookmarkQueueStore store, |
| | 353 | 16 | | IBookmarkQueueDeadLetterManager deadLetterManager, |
| | 353 | 17 | | IWorkflowResumer workflowResumer, |
| | 353 | 18 | | ISystemClock systemClock, |
| | 353 | 19 | | IOptions<BookmarkQueuePurgeOptions> options, |
| | 353 | 20 | | ILogger<BookmarkQueueProcessor> logger) : IBookmarkQueueProcessor |
| | | 21 | | { |
| | | 22 | | public async Task ProcessAsync(CancellationToken cancellationToken = default) |
| | | 23 | | { |
| | 354 | 24 | | var batchSize = 50; |
| | 354 | 25 | | var offset = 0; |
| | | 26 | | |
| | 354 | 27 | | while (!cancellationToken.IsCancellationRequested) |
| | | 28 | | { |
| | 352 | 29 | | var pageArgs = PageArgs.FromRange(offset, batchSize); |
| | 352 | 30 | | var page = await store.PageAsync(pageArgs, new BookmarkQueueItemOrder<DateTimeOffset>(x => x.CreatedAt, Orde |
| | | 31 | | |
| | 352 | 32 | | await ProcessPageAsync(page, cancellationToken); |
| | | 33 | | |
| | 351 | 34 | | if (page.Items.Count < batchSize) |
| | | 35 | | break; |
| | | 36 | | |
| | 0 | 37 | | offset += batchSize; |
| | 0 | 38 | | } |
| | 353 | 39 | | } |
| | | 40 | | |
| | | 41 | | private async Task ProcessPageAsync(Page<BookmarkQueueItem> page, CancellationToken cancellationToken = default) |
| | | 42 | | { |
| | 15611 | 43 | | foreach (var bookmarkQueueItem in page.Items) |
| | 7454 | 44 | | await ProcessItemAsync(bookmarkQueueItem, cancellationToken); |
| | 351 | 45 | | } |
| | | 46 | | |
| | | 47 | | private async Task ProcessItemAsync(BookmarkQueueItem item, CancellationToken cancellationToken = default) |
| | | 48 | | { |
| | 7454 | 49 | | var filter = item.CreateBookmarkFilter(); |
| | 7454 | 50 | | var resumeOptions = item.Options; |
| | | 51 | | |
| | 7454 | 52 | | logger.LogDebug("Processing bookmark queue item {BookmarkQueueItemId} for workflow instance {WorkflowInstanceId} |
| | | 53 | | |
| | | 54 | | List<RunWorkflowInstanceResponse> responses; |
| | | 55 | | |
| | | 56 | | try |
| | | 57 | | { |
| | 7454 | 58 | | responses = (await workflowResumer.ResumeAsync(filter, resumeOptions, cancellationToken)).ToList(); |
| | 7450 | 59 | | } |
| | 1 | 60 | | catch (OperationCanceledException) |
| | | 61 | | { |
| | 1 | 62 | | throw; |
| | | 63 | | } |
| | 3 | 64 | | catch (Exception ex) when (ex is not OutOfMemoryException and not StackOverflowException) |
| | | 65 | | { |
| | 3 | 66 | | await HandleFailureAsync(item, ex, cancellationToken); |
| | 3 | 67 | | return; |
| | | 68 | | } |
| | | 69 | | |
| | 7450 | 70 | | if (responses.Count > 0) |
| | | 71 | | { |
| | 21 | 72 | | logger.LogDebug("Successfully resumed {WorkflowCount} workflow instances using stimulus {StimulusHash} for a |
| | 21 | 73 | | await store.DeleteAsync(item.Id, cancellationToken); |
| | | 74 | | } |
| | | 75 | | else |
| | | 76 | | { |
| | 7429 | 77 | | logger.LogDebug("No matching bookmarks found for bookmark queue item {BookmarkQueueItemId} for workflow inst |
| | | 78 | | } |
| | 7453 | 79 | | } |
| | | 80 | | |
| | | 81 | | private async Task HandleFailureAsync(BookmarkQueueItem item, Exception exception, CancellationToken cancellationTok |
| | | 82 | | { |
| | 3 | 83 | | item.DeliveryAttempts++; |
| | 3 | 84 | | item.LastAttemptedAt = systemClock.UtcNow; |
| | 3 | 85 | | item.LastErrorType = exception.GetType().FullName; |
| | 3 | 86 | | item.LastErrorMessage = exception.Message; |
| | | 87 | | |
| | 3 | 88 | | if (item.DeliveryAttempts < options.Value.MaxDeliveryAttempts) |
| | | 89 | | { |
| | 1 | 90 | | logger.LogWarning( |
| | 1 | 91 | | exception, |
| | 1 | 92 | | "Failed to process bookmark queue item {BookmarkQueueItemId}. Attempt {DeliveryAttempt} of {MaxDeliveryA |
| | 1 | 93 | | item.Id, |
| | 1 | 94 | | item.DeliveryAttempts, |
| | 1 | 95 | | options.Value.MaxDeliveryAttempts); |
| | | 96 | | |
| | 1 | 97 | | await store.SaveAsync(item, cancellationToken); |
| | 1 | 98 | | return; |
| | | 99 | | } |
| | | 100 | | |
| | 2 | 101 | | logger.LogError( |
| | 2 | 102 | | exception, |
| | 2 | 103 | | "Moving bookmark queue item {BookmarkQueueItemId} to dead letter after {DeliveryAttempt} failed delivery att |
| | 2 | 104 | | item.Id, |
| | 2 | 105 | | item.DeliveryAttempts); |
| | | 106 | | |
| | 2 | 107 | | await deadLetterManager.DeadLetterAsync(item, "Failed", exception, cancellationToken); |
| | 2 | 108 | | await store.DeleteAsync(item.Id, cancellationToken); |
| | 3 | 109 | | } |
| | | 110 | | } |