| | | 1 | | using Elsa.Common; |
| | | 2 | | using Elsa.Extensions; |
| | | 3 | | using Elsa.Scheduling.Activities; |
| | | 4 | | using Elsa.Scheduling.Bookmarks; |
| | | 5 | | using Elsa.Workflows.Models; |
| | | 6 | | using Elsa.Workflows.Runtime.Entities; |
| | | 7 | | using Microsoft.Extensions.Logging; |
| | | 8 | | |
| | | 9 | | namespace Elsa.Scheduling.Services; |
| | | 10 | | |
| | | 11 | | /// <summary> |
| | | 12 | | /// A default implementation of <see cref="ITriggerScheduler"/> that schedules triggers using <see cref="IWorkflowSchedu |
| | | 13 | | /// </summary> |
| | 453 | 14 | | public class DefaultTriggerScheduler(IWorkflowScheduler workflowScheduler, ISystemClock systemClock, ILogger<DefaultTrig |
| | | 15 | | : ITriggerScheduler |
| | | 16 | | { |
| | | 17 | | /// <inheritdoc /> |
| | | 18 | | public async Task ScheduleAsync(IEnumerable<StoredTrigger> triggers, CancellationToken cancellationToken = default) |
| | | 19 | | { |
| | 635 | 20 | | var triggerList = triggers.ToList(); |
| | 635 | 21 | | var timerTriggers = triggerList.Filter<Activities.Timer>(); |
| | 635 | 22 | | var startAtTriggers = triggerList.Filter<StartAt>(); |
| | 635 | 23 | | var cronTriggers = triggerList.Filter<Cron>(); |
| | 635 | 24 | | var now = systemClock.UtcNow; |
| | | 25 | | |
| | | 26 | | // Schedule each Timer trigger. |
| | 1274 | 27 | | foreach (var trigger in timerTriggers) |
| | | 28 | | { |
| | 2 | 29 | | var (startAt, interval) = trigger.GetPayload<TimerTriggerPayload>(); |
| | 2 | 30 | | var input = new { StartAt = startAt, Interval = interval }.ToDictionary(); |
| | 2 | 31 | | var request = new ScheduleNewWorkflowInstanceRequest |
| | 2 | 32 | | { |
| | 2 | 33 | | WorkflowDefinitionHandle = WorkflowDefinitionHandle.ByDefinitionVersionId(trigger.WorkflowDefinitionVers |
| | 2 | 34 | | TriggerActivityId = trigger.ActivityId, |
| | 2 | 35 | | Input = input |
| | 2 | 36 | | }; |
| | 2 | 37 | | await workflowScheduler.ScheduleRecurringAsync(trigger.Id, request, startAt, interval, cancellationToken); |
| | | 38 | | } |
| | | 39 | | |
| | | 40 | | // Schedule each StartAt trigger. |
| | 1270 | 41 | | foreach (var trigger in startAtTriggers) |
| | | 42 | | { |
| | 0 | 43 | | var executeAt = trigger.GetPayload<StartAtPayload>().ExecuteAt; |
| | | 44 | | |
| | | 45 | | // If the trigger is in the past, log info and skip scheduling. |
| | 0 | 46 | | if (executeAt < now) |
| | | 47 | | { |
| | 0 | 48 | | logger.LogInformation("StartAt trigger is in the past. TriggerId: {TriggerId}. ExecuteAt: {ExecuteAt}. S |
| | 0 | 49 | | continue; |
| | | 50 | | } |
| | | 51 | | |
| | 0 | 52 | | var input = new { ExecuteAt = executeAt }.ToDictionary(); |
| | 0 | 53 | | var request = new ScheduleNewWorkflowInstanceRequest |
| | 0 | 54 | | { |
| | 0 | 55 | | WorkflowDefinitionHandle = WorkflowDefinitionHandle.ByDefinitionVersionId(trigger.WorkflowDefinitionVers |
| | 0 | 56 | | TriggerActivityId = trigger.ActivityId, |
| | 0 | 57 | | Input = input |
| | 0 | 58 | | }; |
| | | 59 | | |
| | 0 | 60 | | await workflowScheduler.ScheduleAtAsync(trigger.Id, request, executeAt, cancellationToken); |
| | | 61 | | } |
| | | 62 | | |
| | | 63 | | // Schedule each Cron trigger. |
| | 1272 | 64 | | foreach (var trigger in cronTriggers) |
| | | 65 | | { |
| | 1 | 66 | | var payload = trigger.GetPayload<CronTriggerPayload>(); |
| | 1 | 67 | | var cronExpression = payload.CronExpression; |
| | | 68 | | |
| | 1 | 69 | | if (string.IsNullOrWhiteSpace(cronExpression)) |
| | | 70 | | { |
| | 0 | 71 | | logger.LogWarning("Cron expression is empty. TriggerId: {TriggerId}. Skipping scheduling of this trigger |
| | 0 | 72 | | continue; |
| | | 73 | | } |
| | | 74 | | |
| | 1 | 75 | | var input = new { CronExpression = cronExpression }.ToDictionary(); |
| | 1 | 76 | | var request = new ScheduleNewWorkflowInstanceRequest |
| | 1 | 77 | | { |
| | 1 | 78 | | WorkflowDefinitionHandle = WorkflowDefinitionHandle.ByDefinitionVersionId(trigger.WorkflowDefinitionVers |
| | 1 | 79 | | TriggerActivityId = trigger.ActivityId, |
| | 1 | 80 | | Input = input |
| | 1 | 81 | | }; |
| | | 82 | | try |
| | | 83 | | { |
| | 1 | 84 | | await workflowScheduler.ScheduleCronAsync(trigger.Id, request, cronExpression, cancellationToken); |
| | 1 | 85 | | } |
| | 0 | 86 | | catch (FormatException ex) |
| | | 87 | | { |
| | 0 | 88 | | logger.LogWarning(ex, "Cron expression format error. CronExpression: {CronExpression}", cronExpression); |
| | 0 | 89 | | } |
| | 1 | 90 | | } |
| | 635 | 91 | | } |
| | | 92 | | |
| | | 93 | | /// <inheritdoc /> |
| | | 94 | | public async Task UnscheduleAsync(IEnumerable<StoredTrigger> triggers, CancellationToken cancellationToken = default |
| | | 95 | | { |
| | 637 | 96 | | var triggerList = triggers.ToList(); |
| | 637 | 97 | | var timerTriggers = triggerList.Filter<Activities.Timer>(); |
| | 637 | 98 | | var startAtTriggers = triggerList.Filter<StartAt>(); |
| | 637 | 99 | | var cronTriggers = triggerList.Filter<Cron>(); |
| | 637 | 100 | | var filteredTriggers = timerTriggers.Concat(startAtTriggers).Concat(cronTriggers); |
| | | 101 | | |
| | 1276 | 102 | | foreach (var trigger in filteredTriggers) |
| | 1 | 103 | | await workflowScheduler.UnscheduleAsync(trigger.Id, cancellationToken); |
| | 637 | 104 | | } |
| | | 105 | | } |