| | | 1 | | using Elsa.Common.Models; |
| | | 2 | | using Elsa.Common.Services; |
| | | 3 | | using Elsa.Extensions; |
| | | 4 | | using Elsa.Workflows.Runtime.Entities; |
| | | 5 | | using Elsa.Workflows.Runtime.Filters; |
| | | 6 | | using Elsa.Workflows.Runtime.OrderDefinitions; |
| | | 7 | | using Elsa.Workflows.Runtime.Options; |
| | | 8 | | using JetBrains.Annotations; |
| | | 9 | | |
| | | 10 | | namespace Elsa.Workflows.Runtime.Stores; |
| | | 11 | | |
| | | 12 | | /// <inheritdoc /> |
| | | 13 | | [UsedImplicitly] |
| | 187 | 14 | | public class MemoryBookmarkQueueDeadLetterStore(MemoryStore<BookmarkQueueDeadLetterItem> store) : IBookmarkQueueDeadLett |
| | | 15 | | { |
| | 187 | 16 | | private readonly object _lock = new(); |
| | | 17 | | |
| | | 18 | | /// <inheritdoc /> |
| | | 19 | | public Task SaveAsync(BookmarkQueueDeadLetterItem record, CancellationToken cancellationToken = default) |
| | | 20 | | { |
| | 3 | 21 | | lock (_lock) |
| | | 22 | | { |
| | 7 | 23 | | var existing = store.Find(x => x.OriginalQueueItemId == record.OriginalQueueItemId && x.Id != record.Id); |
| | 3 | 24 | | if (existing != null) |
| | 0 | 25 | | throw new InvalidOperationException($"A bookmark queue dead-letter item for original queue item '{record |
| | | 26 | | |
| | 6 | 27 | | store.Save(Clone(record), x => x.Id); |
| | 3 | 28 | | } |
| | | 29 | | |
| | 3 | 30 | | return Task.CompletedTask; |
| | | 31 | | } |
| | | 32 | | |
| | | 33 | | /// <inheritdoc /> |
| | | 34 | | public async Task AddAsync(BookmarkQueueDeadLetterItem record, CancellationToken cancellationToken = default) |
| | | 35 | | { |
| | 25 | 36 | | await AddOrGetExistingAsync(record, cancellationToken); |
| | 25 | 37 | | } |
| | | 38 | | |
| | | 39 | | /// <inheritdoc /> |
| | | 40 | | public Task<BookmarkQueueDeadLetterItem> AddOrGetExistingAsync(BookmarkQueueDeadLetterItem record, CancellationToken |
| | | 41 | | { |
| | 26 | 42 | | lock (_lock) |
| | | 43 | | { |
| | 45 | 44 | | var existing = store.Find(x => x.OriginalQueueItemId == record.OriginalQueueItemId); |
| | 26 | 45 | | if (existing != null) |
| | 19 | 46 | | return Task.FromResult(Clone(existing)); |
| | | 47 | | |
| | 14 | 48 | | store.Add(Clone(record), x => x.Id); |
| | 7 | 49 | | return Task.FromResult(Clone(record)); |
| | | 50 | | } |
| | 26 | 51 | | } |
| | | 52 | | |
| | | 53 | | /// <inheritdoc /> |
| | | 54 | | public Task<IReadOnlyCollection<BookmarkQueueDeadLetterItem>> AddOrGetExistingManyAsync(IEnumerable<BookmarkQueueDea |
| | | 55 | | { |
| | 7 | 56 | | lock (_lock) |
| | | 57 | | { |
| | 7 | 58 | | var results = new List<BookmarkQueueDeadLetterItem>(); |
| | | 59 | | |
| | 32 | 60 | | foreach (var record in records) |
| | | 61 | | { |
| | 11 | 62 | | var existing = store.Find(x => x.OriginalQueueItemId == record.OriginalQueueItemId); |
| | 9 | 63 | | if (existing != null) |
| | | 64 | | { |
| | 0 | 65 | | results.Add(Clone(existing)); |
| | 0 | 66 | | continue; |
| | | 67 | | } |
| | | 68 | | |
| | 18 | 69 | | store.Add(Clone(record), x => x.Id); |
| | 9 | 70 | | results.Add(Clone(record)); |
| | | 71 | | } |
| | | 72 | | |
| | 7 | 73 | | return Task.FromResult<IReadOnlyCollection<BookmarkQueueDeadLetterItem>>(results); |
| | | 74 | | } |
| | 7 | 75 | | } |
| | | 76 | | |
| | | 77 | | /// <inheritdoc /> |
| | | 78 | | public Task<BookmarkQueueDeadLetterItem?> TryMarkReplayedAsync(string id, string queueItemId, DateTimeOffset replaye |
| | | 79 | | { |
| | 8 | 80 | | lock (_lock) |
| | | 81 | | { |
| | 16 | 82 | | var entity = store.Find(x => x.Id == id); |
| | 8 | 83 | | if (entity == null || !entity.CanReplay || entity.ReplayedAt != null) |
| | 3 | 84 | | return Task.FromResult<BookmarkQueueDeadLetterItem?>(null); |
| | | 85 | | |
| | 5 | 86 | | entity.ReplayedAt = replayedAt; |
| | 5 | 87 | | entity.ReplayedQueueItemId = queueItemId; |
| | 5 | 88 | | entity.CanReplay = false; |
| | 10 | 89 | | store.Save(entity, x => x.Id); |
| | 5 | 90 | | return Task.FromResult<BookmarkQueueDeadLetterItem?>(Clone(entity)); |
| | | 91 | | } |
| | 8 | 92 | | } |
| | | 93 | | |
| | | 94 | | /// <inheritdoc /> |
| | | 95 | | public Task<BookmarkQueueDeadLetterItem?> FindAsync(BookmarkQueueDeadLetterFilter filter, CancellationToken cancella |
| | | 96 | | { |
| | | 97 | | BookmarkQueueDeadLetterItem? entity; |
| | 5 | 98 | | lock (_lock) |
| | | 99 | | { |
| | 10 | 100 | | entity = store.Query(query => Filter(query, filter)).Select(Clone).FirstOrDefault(); |
| | 5 | 101 | | } |
| | | 102 | | |
| | 5 | 103 | | return Task.FromResult(entity); |
| | | 104 | | } |
| | | 105 | | |
| | | 106 | | /// <inheritdoc /> |
| | | 107 | | public Task<Page<BookmarkQueueDeadLetterItem>> PageAsync<TOrderBy>(PageArgs pageArgs, BookmarkQueueDeadLetterItemOrd |
| | | 108 | | { |
| | | 109 | | Page<BookmarkQueueDeadLetterItem> entities; |
| | 0 | 110 | | lock (_lock) |
| | | 111 | | { |
| | 0 | 112 | | var page = store.Query(query => query.OrderBy(orderBy)).Paginate(pageArgs); |
| | 0 | 113 | | entities = page with { Items = page.Items.Select(Clone).ToList() }; |
| | 0 | 114 | | } |
| | | 115 | | |
| | 0 | 116 | | return Task.FromResult(entities); |
| | | 117 | | } |
| | | 118 | | |
| | | 119 | | /// <inheritdoc /> |
| | | 120 | | public Task<Page<BookmarkQueueDeadLetterItem>> PageAsync<TOrderBy>(PageArgs pageArgs, BookmarkQueueDeadLetterFilter |
| | | 121 | | { |
| | | 122 | | Page<BookmarkQueueDeadLetterItem> entities; |
| | 12 | 123 | | lock (_lock) |
| | | 124 | | { |
| | 24 | 125 | | var page = store.Query(query => Filter(query, filter).OrderBy(orderBy)).Paginate(pageArgs); |
| | 12 | 126 | | entities = page with { Items = page.Items.Select(Clone).ToList() }; |
| | 12 | 127 | | } |
| | | 128 | | |
| | 12 | 129 | | return Task.FromResult(entities); |
| | | 130 | | } |
| | | 131 | | |
| | | 132 | | /// <inheritdoc /> |
| | | 133 | | public Task<IEnumerable<BookmarkQueueDeadLetterItem>> FindManyAsync(BookmarkQueueDeadLetterFilter filter, Cancellati |
| | | 134 | | { |
| | | 135 | | IEnumerable<BookmarkQueueDeadLetterItem> entities; |
| | 25 | 136 | | lock (_lock) |
| | | 137 | | { |
| | 50 | 138 | | entities = store.Query(query => Filter(query, filter)).Select(Clone).ToList(); |
| | 25 | 139 | | } |
| | | 140 | | |
| | 25 | 141 | | return Task.FromResult(entities); |
| | | 142 | | } |
| | | 143 | | |
| | | 144 | | /// <inheritdoc /> |
| | | 145 | | public Task<long> DeleteAsync(BookmarkQueueDeadLetterFilter filter, CancellationToken cancellationToken = default) |
| | | 146 | | { |
| | 1 | 147 | | lock (_lock) |
| | | 148 | | { |
| | 3 | 149 | | var ids = store.Query(query => Filter(query, filter)).Select(x => x.Id).ToList(); |
| | 1 | 150 | | return Task.FromResult(store.DeleteMany(ids)); |
| | | 151 | | } |
| | 1 | 152 | | } |
| | | 153 | | |
| | 43 | 154 | | private static IQueryable<BookmarkQueueDeadLetterItem> Filter(IQueryable<BookmarkQueueDeadLetterItem> query, Bookmar |
| | | 155 | | |
| | | 156 | | private static BookmarkQueueDeadLetterItem Clone(BookmarkQueueDeadLetterItem item) |
| | | 157 | | { |
| | 80 | 158 | | return new() |
| | 80 | 159 | | { |
| | 80 | 160 | | Id = item.Id, |
| | 80 | 161 | | TenantId = item.TenantId, |
| | 80 | 162 | | OriginalQueueItemId = item.OriginalQueueItemId, |
| | 80 | 163 | | WorkflowInstanceId = item.WorkflowInstanceId, |
| | 80 | 164 | | CorrelationId = item.CorrelationId, |
| | 80 | 165 | | BookmarkId = item.BookmarkId, |
| | 80 | 166 | | StimulusHash = item.StimulusHash, |
| | 80 | 167 | | ActivityInstanceId = item.ActivityInstanceId, |
| | 80 | 168 | | ActivityTypeName = item.ActivityTypeName, |
| | 80 | 169 | | Options = Clone(item.Options), |
| | 80 | 170 | | OriginalCreatedAt = item.OriginalCreatedAt, |
| | 80 | 171 | | DeadLetteredAt = item.DeadLetteredAt, |
| | 80 | 172 | | Reason = item.Reason, |
| | 80 | 173 | | DeliveryAttempts = item.DeliveryAttempts, |
| | 80 | 174 | | LastAttemptedAt = item.LastAttemptedAt, |
| | 80 | 175 | | LastErrorType = item.LastErrorType, |
| | 80 | 176 | | LastErrorMessage = item.LastErrorMessage, |
| | 80 | 177 | | CanReplay = item.CanReplay, |
| | 80 | 178 | | ReplayedAt = item.ReplayedAt, |
| | 80 | 179 | | ReplayedQueueItemId = item.ReplayedQueueItemId |
| | 80 | 180 | | }; |
| | | 181 | | } |
| | | 182 | | |
| | | 183 | | private static ResumeBookmarkOptions? Clone(ResumeBookmarkOptions? options) |
| | | 184 | | { |
| | 80 | 185 | | if (options == null) |
| | 80 | 186 | | return null; |
| | | 187 | | |
| | 0 | 188 | | return new() |
| | 0 | 189 | | { |
| | 0 | 190 | | Input = options.Input == null ? null : new Dictionary<string, object>(options.Input), |
| | 0 | 191 | | Properties = options.Properties == null ? null : new Dictionary<string, object>(options.Properties) |
| | 0 | 192 | | }; |
| | | 193 | | } |
| | | 194 | | } |