| | | 1 | | using System.Collections.Concurrent; |
| | | 2 | | using System.Collections.Generic; |
| | | 3 | | using Microsoft.Extensions.Logging; |
| | | 4 | | using Elsa.Extensions; |
| | | 5 | | |
| | | 6 | | namespace Elsa.Scheduling.Services; |
| | | 7 | | |
| | | 8 | | /// <summary> |
| | | 9 | | /// Represents a local, in-memory scheduler that schedules tasks in-process. |
| | | 10 | | /// </summary> |
| | | 11 | | public class LocalScheduler : IScheduler |
| | | 12 | | { |
| | | 13 | | private readonly IServiceProvider _serviceProvider; |
| | | 14 | | private readonly ILogger<LocalScheduler> _logger; |
| | 135 | 15 | | private readonly ConcurrentDictionary<string, IScheduledTask> _scheduledTasks = new(); |
| | 135 | 16 | | private readonly ConcurrentDictionary<IScheduledTask, ICollection<string>> _scheduledTaskKeys = new(); |
| | | 17 | | |
| | | 18 | | // Note: Using lock instead of SemaphoreSlim because: |
| | | 19 | | // 1. All critical sections are synchronous (dictionary operations only) |
| | | 20 | | // 2. Methods return ValueTask.CompletedTask (not truly async) |
| | | 21 | | // 3. No await inside critical sections |
| | | 22 | | // 4. lock has zero allocation overhead, perfect for fast synchronous operations |
| | 135 | 23 | | private readonly object _lock = new(); |
| | | 24 | | |
/// <summary>
/// Creates a <see cref="LocalScheduler"/> whose task registries start out empty.
/// </summary>
/// <param name="serviceProvider">Service provider kept for building schedule contexts when tasks are scheduled.</param>
/// <param name="logger">Logger kept for reporting warnings about the task registries.</param>
public LocalScheduler(IServiceProvider serviceProvider, ILogger<LocalScheduler> logger) =>
    (_serviceProvider, _logger) = (serviceProvider, logger);
| | | 33 | | |
| | | 34 | | /// <inheritdoc /> |
| | | 35 | | public ValueTask ScheduleAsync(string name, ITask task, ISchedule schedule, CancellationToken cancellationToken = de |
| | | 36 | | { |
| | 21 | 37 | | return ScheduleAsync(name, task, schedule, null, cancellationToken); |
| | | 38 | | } |
| | | 39 | | |
| | | 40 | | /// <inheritdoc /> |
| | | 41 | | public ValueTask ScheduleAsync(string name, ITask task, ISchedule schedule, IEnumerable<string>? keys = null, Cancel |
| | | 42 | | { |
| | 21 | 43 | | var scheduleContext = new ScheduleContext(_serviceProvider, task); |
| | 21 | 44 | | var scheduledTask = schedule.Schedule(scheduleContext); |
| | | 45 | | |
| | 21 | 46 | | lock (_lock) |
| | | 47 | | { |
| | 21 | 48 | | RegisterScheduledTask(name, scheduledTask, keys); |
| | 21 | 49 | | } |
| | | 50 | | |
| | 21 | 51 | | return ValueTask.CompletedTask; |
| | | 52 | | } |
| | | 53 | | |
/// <inheritdoc />
public ValueTask ClearScheduleAsync(string name, CancellationToken cancellationToken = default)
{
    // The removal is fully synchronous: the token is never observed and the
    // ValueTask handed back is already completed.
    lock (_lock)
    {
        RemoveScheduledTask(name);
        return ValueTask.CompletedTask;
    }
}
| | | 64 | | |
/// <inheritdoc />
/// <exception cref="ArgumentNullException"><paramref name="keys"/> is <c>null</c>.</exception>
public ValueTask ClearScheduleAsync(IEnumerable<string> keys, CancellationToken cancellationToken = default)
{
    // Validate at the public boundary: without this, a null sequence only fails
    // later as a NullReferenceException from the foreach in RemoveScheduledTasks,
    // raised while the lock is held.
    ArgumentNullException.ThrowIfNull(keys);

    lock (_lock)
    {
        RemoveScheduledTasks(keys);
    }

    // All work above is synchronous, so an already-completed ValueTask is returned.
    return ValueTask.CompletedTask;
}
| | | 75 | | |
| | | 76 | | |
| | | 77 | | private void RegisterScheduledTask(string name, IScheduledTask scheduledTask, IEnumerable<string>? keys = null) |
| | | 78 | | { |
| | 21 | 79 | | _scheduledTasks.AddOrUpdate( |
| | 21 | 80 | | name, |
| | 21 | 81 | | addValueFactory: _ => scheduledTask, |
| | 21 | 82 | | updateValueFactory: (_, existingScheduledTask) => |
| | 21 | 83 | | { |
| | 0 | 84 | | existingScheduledTask.Cancel(); |
| | 0 | 85 | | var removed = _scheduledTaskKeys.TryRemove(existingScheduledTask, out var _); |
| | 0 | 86 | | if (!removed) |
| | 0 | 87 | | _logger.LogWarning("Tried to remove scheduled task keys for an existing scheduled task, but it was n |
| | 0 | 88 | | return scheduledTask; |
| | 21 | 89 | | }); |
| | | 90 | | |
| | 21 | 91 | | if (keys != null) |
| | 0 | 92 | | _scheduledTaskKeys[scheduledTask] = keys.ToList(); |
| | 21 | 93 | | } |
| | | 94 | | |
| | | 95 | | |
/// <summary>
/// Unregisters the task stored under <paramref name="name"/>, drops its key
/// associations and cancels it. Does nothing when no task is registered under that name.
/// </summary>
/// <param name="name">The name the task was registered under.</param>
private void RemoveScheduledTask(string name)
{
    // Nothing registered under this name: leave both registries untouched.
    if (!_scheduledTasks.TryRemove(name, out var task))
        return;

    // Forget any keys recorded for the task before cancelling it.
    _scheduledTaskKeys.TryRemove(task, out _);
    task.Cancel();
}
| | | 105 | | |
| | | 106 | | private void RemoveScheduledTasks(IEnumerable<string> keys) |
| | | 107 | | { |
| | 0 | 108 | | foreach (var key in keys) |
| | | 109 | | { |
| | 0 | 110 | | var scheduledTasks = _scheduledTaskKeys.Where(x => x.Value.Contains(key)).Select(x => x.Key).Distinct().ToLi |
| | | 111 | | |
| | 0 | 112 | | foreach (var scheduledTask in scheduledTasks) |
| | | 113 | | { |
| | | 114 | | // Collect all keys in _scheduledTasks that map to this scheduledTask |
| | 0 | 115 | | var matchingTaskKeys = _scheduledTasks.Where(x => x.Value == scheduledTask).Select(x => x.Key).ToList(); |
| | 0 | 116 | | foreach (var taskKey in matchingTaskKeys) |
| | | 117 | | { |
| | 0 | 118 | | var removed = _scheduledTasks.TryRemove(taskKey, out _); |
| | 0 | 119 | | if (!removed) |
| | 0 | 120 | | _logger.LogWarning("Failed to remove scheduled task with key '{TaskKey}' for '{Key}' from _sched |
| | | 121 | | } |
| | | 122 | | |
| | 0 | 123 | | _scheduledTaskKeys.Remove(scheduledTask, out _); |
| | 0 | 124 | | scheduledTask.Cancel(); |
| | | 125 | | } |
| | | 126 | | } |
| | 0 | 127 | | } |
| | | 128 | | } |