| | | 1 | | using Elsa.Common; |
| | | 2 | | using Elsa.Workflows.Runtime.Entities; |
| | | 3 | | using Elsa.Workflows.Runtime.Filters; |
| | | 4 | | using Microsoft.Extensions.Logging; |
| | | 5 | | |
| | | 6 | | namespace Elsa.Workflows.Runtime; |
| | | 7 | | |
| | 450 | 8 | | public class BookmarkQueueDeadLetterManager( |
| | 450 | 9 | | IBookmarkQueueDeadLetterStore deadLetterStore, |
| | 450 | 10 | | IBookmarkQueueStore bookmarkQueueStore, |
| | 450 | 11 | | IBookmarkQueueSignaler bookmarkQueueSignaler, |
| | 450 | 12 | | ISystemClock systemClock, |
| | 450 | 13 | | IIdentityGenerator identityGenerator, |
| | 450 | 14 | | ILogger<BookmarkQueueDeadLetterManager> logger) : IBookmarkQueueDeadLetterManager |
| | | 15 | | { |
| | | 16 | | public async Task<BookmarkQueueDeadLetterItem> DeadLetterAsync(BookmarkQueueItem item, string reason, Exception? exc |
| | | 17 | | { |
| | 6 | 18 | | var results = await DeadLetterManyAsync([item], reason, exception, cancellationToken); |
| | 6 | 19 | | return results.Single(); |
| | 6 | 20 | | } |
| | | 21 | | |
| | | 22 | | public async Task<IReadOnlyCollection<BookmarkQueueDeadLetterItem>> DeadLetterManyAsync(IEnumerable<BookmarkQueueIte |
| | | 23 | | { |
| | 12 | 24 | | var itemList = items.ToList(); |
| | | 25 | | |
| | 12 | 26 | | if (itemList.Count == 0) |
| | 0 | 27 | | return Array.Empty<BookmarkQueueDeadLetterItem>(); |
| | | 28 | | |
| | 26 | 29 | | var originalQueueItemIds = itemList.Select(x => x.Id).ToList(); |
| | 12 | 30 | | var existingDeadLetters = (await deadLetterStore.FindManyAsync(new BookmarkQueueDeadLetterFilter { OriginalQueue |
| | 15 | 31 | | var existingByOriginalQueueItemId = existingDeadLetters.ToDictionary(x => x.OriginalQueueItemId); |
| | 12 | 32 | | var now = systemClock.UtcNow; |
| | 12 | 33 | | var deadLetterItems = itemList |
| | 14 | 34 | | .Where(item => !existingByOriginalQueueItemId.ContainsKey(item.Id)) |
| | 11 | 35 | | .Select(item => CreateDeadLetterItem(item, reason, exception, now)) |
| | 12 | 36 | | .ToList(); |
| | | 37 | | |
| | 12 | 38 | | if (deadLetterItems.Count == 0) |
| | 6 | 39 | | return itemList.Select(x => existingByOriginalQueueItemId[x.Id]).ToList(); |
| | | 40 | | |
| | 9 | 41 | | var savedDeadLetterItems = await deadLetterStore.AddOrGetExistingManyAsync(deadLetterItems, cancellationToken); |
| | 18 | 42 | | var savedByOriginalQueueItemId = savedDeadLetterItems.ToDictionary(x => x.OriginalQueueItemId); |
| | 18 | 43 | | var newDeadLetterIds = deadLetterItems.Select(x => x.Id).ToHashSet(); |
| | | 44 | | |
| | 46 | 45 | | foreach (var savedDeadLetterItem in savedDeadLetterItems.Where(x => newDeadLetterIds.Contains(x.Id))) |
| | | 46 | | { |
| | 10 | 47 | | logger.LogInformation( |
| | 10 | 48 | | "Moved bookmark queue item {BookmarkQueueItemId} to dead letter {BookmarkQueueDeadLetterItemId} because |
| | 10 | 49 | | savedDeadLetterItem.OriginalQueueItemId, |
| | 10 | 50 | | savedDeadLetterItem.Id, |
| | 10 | 51 | | reason); |
| | | 52 | | } |
| | | 53 | | |
| | 18 | 54 | | return itemList.Select(x => existingByOriginalQueueItemId.GetValueOrDefault(x.Id) ?? savedByOriginalQueueItemId[ |
| | 11 | 55 | | } |
| | | 56 | | |
| | | 57 | | private BookmarkQueueDeadLetterItem CreateDeadLetterItem(BookmarkQueueItem item, string reason, Exception? exception |
| | | 58 | | { |
| | 11 | 59 | | var deadLetterItem = new BookmarkQueueDeadLetterItem |
| | 11 | 60 | | { |
| | 11 | 61 | | Id = identityGenerator.GenerateId(), |
| | 11 | 62 | | TenantId = item.TenantId, |
| | 11 | 63 | | OriginalQueueItemId = item.Id, |
| | 11 | 64 | | WorkflowInstanceId = item.WorkflowInstanceId, |
| | 11 | 65 | | CorrelationId = item.CorrelationId, |
| | 11 | 66 | | BookmarkId = item.BookmarkId, |
| | 11 | 67 | | StimulusHash = item.StimulusHash, |
| | 11 | 68 | | ActivityInstanceId = item.ActivityInstanceId, |
| | 11 | 69 | | ActivityTypeName = item.ActivityTypeName, |
| | 11 | 70 | | Options = item.Options, |
| | 11 | 71 | | OriginalCreatedAt = item.CreatedAt, |
| | 11 | 72 | | DeadLetteredAt = now, |
| | 11 | 73 | | Reason = reason, |
| | 11 | 74 | | DeliveryAttempts = item.DeliveryAttempts, |
| | 11 | 75 | | LastAttemptedAt = item.LastAttemptedAt, |
| | 11 | 76 | | LastErrorType = exception?.GetType().FullName ?? item.LastErrorType, |
| | 11 | 77 | | LastErrorMessage = exception?.Message ?? item.LastErrorMessage, |
| | 11 | 78 | | CanReplay = true |
| | 11 | 79 | | }; |
| | | 80 | | |
| | 11 | 81 | | return deadLetterItem; |
| | | 82 | | } |
| | | 83 | | |
| | | 84 | | public async Task<ReplayBookmarkQueueDeadLetterResult> ReplayAsync(string id, CancellationToken cancellationToken = |
| | | 85 | | { |
| | 8 | 86 | | var queueItemId = identityGenerator.GenerateId(); |
| | 8 | 87 | | var replayedAt = systemClock.UtcNow; |
| | 8 | 88 | | var item = await deadLetterStore.TryMarkReplayedAsync(id, queueItemId, replayedAt, cancellationToken); |
| | | 89 | | |
| | 8 | 90 | | if (item == null) |
| | | 91 | | { |
| | 3 | 92 | | var existing = await deadLetterStore.FindAsync(new BookmarkQueueDeadLetterFilter { Id = id }, cancellationTo |
| | 3 | 93 | | return existing == null |
| | 3 | 94 | | ? new(false, null, ReplayBookmarkQueueDeadLetterResult.ReasonNotFound) |
| | 3 | 95 | | : new(false, existing.ReplayedQueueItemId, ReplayBookmarkQueueDeadLetterResult.ReasonNotReplayable); |
| | | 96 | | } |
| | | 97 | | |
| | 5 | 98 | | var queueItem = new BookmarkQueueItem |
| | 5 | 99 | | { |
| | 5 | 100 | | Id = queueItemId, |
| | 5 | 101 | | TenantId = item.TenantId, |
| | 5 | 102 | | WorkflowInstanceId = item.WorkflowInstanceId, |
| | 5 | 103 | | CorrelationId = item.CorrelationId, |
| | 5 | 104 | | BookmarkId = item.BookmarkId, |
| | 5 | 105 | | StimulusHash = item.StimulusHash, |
| | 5 | 106 | | ActivityInstanceId = item.ActivityInstanceId, |
| | 5 | 107 | | ActivityTypeName = item.ActivityTypeName, |
| | 5 | 108 | | Options = item.Options, |
| | 5 | 109 | | CreatedAt = systemClock.UtcNow |
| | 5 | 110 | | }; |
| | | 111 | | |
| | | 112 | | try |
| | | 113 | | { |
| | 5 | 114 | | await bookmarkQueueStore.AddAsync(queueItem, cancellationToken); |
| | 4 | 115 | | } |
| | 1 | 116 | | catch (Exception ex) when (ex is not OutOfMemoryException and not StackOverflowException) |
| | | 117 | | { |
| | 1 | 118 | | item.CanReplay = true; |
| | 1 | 119 | | item.ReplayedAt = null; |
| | 1 | 120 | | item.ReplayedQueueItemId = null; |
| | 1 | 121 | | await deadLetterStore.SaveAsync(item, CancellationToken.None); |
| | | 122 | | |
| | 1 | 123 | | logger.LogWarning( |
| | 1 | 124 | | ex, |
| | 1 | 125 | | "Failed to enqueue replayed bookmark queue item {BookmarkQueueItemId}; restored dead-letter item {Bookma |
| | 1 | 126 | | queueItem.Id, |
| | 1 | 127 | | item.Id); |
| | | 128 | | |
| | 1 | 129 | | throw; |
| | | 130 | | } |
| | | 131 | | |
| | 4 | 132 | | await bookmarkQueueSignaler.TriggerAsync(cancellationToken); |
| | | 133 | | |
| | 4 | 134 | | logger.LogInformation( |
| | 4 | 135 | | "Replayed bookmark queue dead-letter item {BookmarkQueueDeadLetterItemId} as queue item {BookmarkQueueItemId |
| | 4 | 136 | | item.Id, |
| | 4 | 137 | | queueItem.Id); |
| | | 138 | | |
| | 4 | 139 | | return new(true, queueItem.Id, null); |
| | 7 | 140 | | } |
| | | 141 | | } |