| | | 1 | | using Elsa.Common.Entities; |
| | | 2 | | using Elsa.Common.Models; |
| | | 3 | | using Elsa.Extensions; |
| | | 4 | | using Elsa.Workflows.Runtime.Entities; |
| | | 5 | | using Elsa.Workflows.Runtime.OrderDefinitions; |
| | | 6 | | using Microsoft.Extensions.Logging; |
| | | 7 | | |
| | | 8 | | namespace Elsa.Workflows.Runtime; |
| | | 9 | | |
| | 48 | 10 | | public class BookmarkQueueProcessor(IBookmarkQueueStore store, IWorkflowResumer workflowResumer, ILogger<BookmarkQueuePr |
| | | 11 | | { |
| | | 12 | | public async Task ProcessAsync(CancellationToken cancellationToken = default) |
| | | 13 | | { |
| | 48 | 14 | | var batchSize = 50; |
| | 48 | 15 | | var offset = 0; |
| | | 16 | | |
| | 48 | 17 | | while (!cancellationToken.IsCancellationRequested) |
| | | 18 | | { |
| | 44 | 19 | | var pageArgs = PageArgs.FromRange(offset, batchSize); |
| | 44 | 20 | | var page = await store.PageAsync(pageArgs, new BookmarkQueueItemOrder<DateTimeOffset>(x => x.CreatedAt, Orde |
| | | 21 | | |
| | 44 | 22 | | await ProcessPageAsync(page, cancellationToken); |
| | | 23 | | |
| | 44 | 24 | | if (page.Items.Count < batchSize) |
| | | 25 | | break; |
| | | 26 | | |
| | 0 | 27 | | offset += batchSize; |
| | 0 | 28 | | } |
| | 48 | 29 | | } |
| | | 30 | | |
| | | 31 | | private async Task ProcessPageAsync(Page<BookmarkQueueItem> page, CancellationToken cancellationToken = default) |
| | | 32 | | { |
| | 2288 | 33 | | foreach (var bookmarkQueueItem in page.Items) |
| | 1100 | 34 | | await ProcessItemAsync(bookmarkQueueItem, cancellationToken); |
| | 44 | 35 | | } |
| | | 36 | | |
| | | 37 | | private async Task ProcessItemAsync(BookmarkQueueItem item, CancellationToken cancellationToken = default) |
| | | 38 | | { |
| | 1100 | 39 | | var filter = item.CreateBookmarkFilter(); |
| | 1100 | 40 | | var options = item.Options; |
| | | 41 | | |
| | 1100 | 42 | | logger.LogDebug("Processing bookmark queue item {BookmarkQueueItemId} for workflow instance {WorkflowInstanceId} |
| | | 43 | | |
| | 1100 | 44 | | var responses = (await workflowResumer.ResumeAsync(filter, options, cancellationToken)).ToList(); |
| | | 45 | | |
| | 1100 | 46 | | if (responses.Count > 0) |
| | | 47 | | { |
| | 18 | 48 | | logger.LogDebug("Successfully resumed {WorkflowCount} workflow instances using stimulus {StimulusHash} for a |
| | 18 | 49 | | await store.DeleteAsync(item.Id, cancellationToken); |
| | | 50 | | } |
| | | 51 | | else |
| | | 52 | | { |
| | 1082 | 53 | | logger.LogDebug("No matching bookmarks found for bookmark queue item {BookmarkQueueItemId} for workflow inst |
| | | 54 | | } |
| | 1100 | 55 | | } |
| | | 56 | | } |