| | | 1 | | using Microsoft.Extensions.DependencyInjection; |
| | | 2 | | using Microsoft.Extensions.Logging; |
| | | 3 | | using ThrottleDebounce; |
| | | 4 | | |
| | | 5 | | namespace Elsa.Workflows.Runtime; |
| | | 6 | | |
| | | 7 | | public class BookmarkQueueWorker : IBookmarkQueueWorker |
| | | 8 | | { |
| | | 9 | | private readonly RateLimitedFunc<CancellationToken, Task> _rateLimitedProcessAsync; |
| | | 10 | | private CancellationTokenSource _cts = null!; |
| | | 11 | | private bool _running; |
| | | 12 | | private readonly IBookmarkQueueSignaler _signaler; |
| | | 13 | | private readonly IServiceScopeFactory _scopeFactory; |
| | | 14 | | private readonly ILogger<BookmarkQueueWorker> _logger; |
| | | 15 | | |
| | 5 | 16 | | public BookmarkQueueWorker(IBookmarkQueueSignaler signaler, IServiceScopeFactory scopeFactory, ILogger<BookmarkQueue |
| | | 17 | | { |
| | 5 | 18 | | _signaler = signaler; |
| | 5 | 19 | | _scopeFactory = scopeFactory; |
| | 5 | 20 | | _logger = logger; |
| | 5 | 21 | | _rateLimitedProcessAsync = Throttler.Throttle<CancellationToken, Task>(ProcessAsync, TimeSpan.FromMilliseconds(5 |
| | 5 | 22 | | } |
| | | 23 | | |
/// <summary>
/// Starts the background loop that waits for queue signals. Does nothing when the worker is already running.
/// </summary>
public void Start()
{
    if (!_running)
    {
        // The cancellation source must be in place before the loop is scheduled, because the loop reads it.
        _cts = new CancellationTokenSource();
        _running = true;

        // Fire-and-forget: the returned task is intentionally discarded.
        _ = Task.Run(() => AwaitSignalAsync());
    }
}
| | | 34 | | |
/// <summary>
/// Stops the background loop by cancelling and disposing the current cancellation source.
/// Safe to call when the worker was never started or has already been stopped; in both cases it is a no-op.
/// </summary>
public void Stop()
{
    // Not running means either Start() was never called (the source is still null) or a previous
    // Stop() already cancelled and disposed it. There is nothing to release in either case.
    if (!_running)
        return;

    _running = false;
    _cts.Cancel();
    _cts.Dispose();
}
| | | 45 | | |
/// <summary>
/// Background loop: waits for the signaler, then invokes the rate-limited processing delegate,
/// and repeats until the cancellation source captured on entry has been cancelled.
/// </summary>
/// <returns>A task that completes when the loop exits.</returns>
private async Task AwaitSignalAsync()
{
    // Bind this loop to the source that was current when it started. The field is replaced by
    // every Start() call, so re-reading it could attach an old loop to a newer source.
    var cts = _cts;

    while (!cts.IsCancellationRequested)
    {
        try
        {
            await _signaler.AwaitAsync(cts.Token);
            await _rateLimitedProcessAsync.InvokeAsync(cts.Token);
        }
        catch (OperationCanceledException) when (cts.IsCancellationRequested)
        {
            break; // Stop() was called
        }
        catch (Exception ex)
        {
            // Also reached for cancellation-type exceptions that did not originate from this
            // worker's own source (for example a timeout raised during processing): those must
            // not terminate the loop.
            _logger.LogError(ex, "BookmarkQueueWorker error – continuing loop");
        }
    }
}
| | | 65 | | |
/// <summary>
/// Runs one processing pass: creates a service scope, resolves an <c>IBookmarkQueueProcessor</c> from it
/// and awaits its <c>ProcessAsync</c> call. Debug messages are logged before and after the pass.
/// </summary>
/// <param name="cancellationToken">Token forwarded to the processor.</param>
/// <returns>A task that completes when the processor has finished and the scope has been disposed.</returns>
protected virtual async Task ProcessAsync(CancellationToken cancellationToken)
{
    _logger.LogDebug("Processing bookmark queue...");

    using (var serviceScope = _scopeFactory.CreateScope())
    {
        await serviceScope.ServiceProvider
            .GetRequiredService<IBookmarkQueueProcessor>()
            .ProcessAsync(cancellationToken);

        // Logged while the scope is still alive, i.e. before it is disposed.
        _logger.LogDebug("Processed bookmark queue.");
    }
}
| | | 74 | | } |