| | | 1 | | using System.Reflection; |
| | | 2 | | using Elsa.Extensions; |
| | | 3 | | using Elsa.Mediator.Contracts; |
| | | 4 | | using Elsa.Workflows.Activities; |
| | | 5 | | using Elsa.Workflows.CommitStates; |
| | | 6 | | using Elsa.Workflows.Pipelines.ActivityExecution; |
| | | 7 | | using Microsoft.Extensions.Logging; |
| | | 8 | | |
| | | 9 | | namespace Elsa.Workflows.Middleware.Activities; |
| | | 10 | | |
| | | 11 | | /// <summary> |
| | | 12 | | /// Provides extension methods to <see cref="IActivityExecutionPipelineBuilder"/>. |
| | | 13 | | /// </summary> |
| | | 14 | | public static class ActivityInvokerMiddlewareExtensions |
| | | 15 | | { |
| | | 16 | | /// <summary> |
| | | 17 | | /// Adds the <see cref="DefaultActivityInvokerMiddleware"/> component to the pipeline. |
| | | 18 | | /// </summary> |
| | 0 | 19 | | public static IActivityExecutionPipelineBuilder UseDefaultActivityInvoker(this IActivityExecutionPipelineBuilder pip |
| | | 20 | | } |
| | | 21 | | |
| | | 22 | | /// <summary> |
| | | 23 | | /// A default activity execution middleware component that evaluates the current activity's properties, executes the act |
| | | 24 | | /// </summary> |
| | | 25 | | public class DefaultActivityInvokerMiddleware(ActivityMiddlewareDelegate next, ICommitStrategyRegistry commitStrategyReg |
| | | 26 | | : IActivityExecutionMiddleware |
| | | 27 | | { |
| | | 28 | | private static readonly MethodInfo ExecuteAsyncMethodInfo = typeof(IActivity).GetMethod(nameof(IActivity.ExecuteAsyn |
| | | 29 | | |
| | | 30 | | /// <inheritdoc /> |
| | | 31 | | public async ValueTask InvokeAsync(ActivityExecutionContext context) |
| | | 32 | | { |
| | | 33 | | context.CancellationToken.ThrowIfCancellationRequested(); |
| | | 34 | | |
| | | 35 | | var workflowExecutionContext = context.WorkflowExecutionContext; |
| | | 36 | | |
| | | 37 | | // Evaluate input properties. |
| | | 38 | | await EvaluateInputPropertiesAsync(context); |
| | | 39 | | |
| | | 40 | | // Prevent the activity from being started if cancellation is requested. |
| | | 41 | | if (context.CancellationToken.IsCancellationRequested) |
| | | 42 | | { |
| | | 43 | | context.TransitionTo(ActivityStatus.Canceled); |
| | | 44 | | context.AddExecutionLogEntry("Activity cancelled"); |
| | | 45 | | return; |
| | | 46 | | } |
| | | 47 | | |
| | | 48 | | // Check if the activity can be executed. |
| | | 49 | | if (!await context.Activity.CanExecuteAsync(context)) |
| | | 50 | | { |
| | | 51 | | context.TransitionTo(ActivityStatus.Pending); |
| | | 52 | | context.AddExecutionLogEntry("Precondition Failed", "Cannot execute at this time"); |
| | | 53 | | return; |
| | | 54 | | } |
| | | 55 | | |
| | | 56 | | // Mark workflow and activity as executing. |
| | | 57 | | using var executionState = context.EnterExecution(); |
| | | 58 | | |
| | | 59 | | // Conditionally commit the workflow state. |
| | | 60 | | if (ShouldCommit(context, ActivityLifetimeEvent.ActivityExecuting)) |
| | | 61 | | await context.WorkflowExecutionContext.CommitAsync(); |
| | | 62 | | |
| | | 63 | | var previousActivityStatus = context.Status; |
| | | 64 | | context.TransitionTo(ActivityStatus.Running); |
| | | 65 | | |
| | | 66 | | // Execute activity. |
| | | 67 | | await ExecuteActivityAsync(context); |
| | | 68 | | |
| | | 69 | | var currentActivityStatus = context.Status; |
| | | 70 | | var activityDidComplete = previousActivityStatus != ActivityStatus.Completed && currentActivityStatus == Activit |
| | | 71 | | |
| | | 72 | | // Reset execute delegate. |
| | | 73 | | workflowExecutionContext.ExecuteDelegate = null; |
| | | 74 | | |
| | | 75 | | // If a bookmark was used to resume, burn it if not burnt already by the activity. |
| | | 76 | | var resumedBookmark = workflowExecutionContext.ResumedBookmarkContext?.Bookmark; |
| | | 77 | | |
| | | 78 | | if (resumedBookmark is { AutoBurn: true }) |
| | | 79 | | { |
| | | 80 | | logger.LogDebug("Auto-burning bookmark {BookmarkId}", resumedBookmark.Id); |
| | | 81 | | workflowExecutionContext.Bookmarks.Remove(resumedBookmark); |
| | | 82 | | } |
| | | 83 | | |
| | | 84 | | // Update execution count. |
| | | 85 | | context.IncrementExecutionCount(); |
| | | 86 | | |
| | | 87 | | // Invoke next middleware. |
| | | 88 | | await next(context); |
| | | 89 | | |
| | | 90 | | // If the activity completed, send a notification. |
| | | 91 | | if (activityDidComplete) |
| | | 92 | | { |
| | | 93 | | var mediator = context.GetRequiredService<INotificationSender>(); |
| | | 94 | | await mediator.SendAsync(new Notifications.ActivityCompleted(context), context.CancellationToken); |
| | | 95 | | } |
| | | 96 | | |
| | | 97 | | // Conditionally commit the workflow state. |
| | | 98 | | if (ShouldCommit(context, ActivityLifetimeEvent.ActivityExecuted)) |
| | | 99 | | await context.WorkflowExecutionContext.CommitAsync(); |
| | | 100 | | } |
| | | 101 | | |
| | | 102 | | /// <summary> |
| | | 103 | | /// Executes the activity using the specified context. |
| | | 104 | | /// This method is virtual so that modules might override this implementation to do things like e.g. asynchronous pr |
| | | 105 | | /// </summary> |
| | | 106 | | protected virtual async ValueTask ExecuteActivityAsync(ActivityExecutionContext context) |
| | | 107 | | { |
| | | 108 | | var executeDelegate = context.WorkflowExecutionContext.ExecuteDelegate |
| | | 109 | | ?? (ExecuteActivityDelegate)Delegate.CreateDelegate(typeof(ExecuteActivityDelegate), conte |
| | | 110 | | |
| | | 111 | | await executeDelegate(context); |
| | | 112 | | } |
| | | 113 | | |
| | | 114 | | private async Task EvaluateInputPropertiesAsync(ActivityExecutionContext context) |
| | | 115 | | { |
| | | 116 | | // Evaluate containing composite input properties, if any. |
| | | 117 | | var compositeContainerContexts = context.GetAncestors().Where(x => x.Activity is Composite).ToList(); |
| | | 118 | | |
| | | 119 | | foreach (var activityExecutionContext in compositeContainerContexts) |
| | | 120 | | { |
| | | 121 | | if (!activityExecutionContext.GetHasEvaluatedProperties()) |
| | | 122 | | await activityExecutionContext.EvaluateInputPropertiesAsync(); |
| | | 123 | | } |
| | | 124 | | |
| | | 125 | | // Evaluate input properties. |
| | | 126 | | await context.EvaluateInputPropertiesAsync(); |
| | | 127 | | } |
| | | 128 | | |
| | | 129 | | private bool ShouldCommit(ActivityExecutionContext context, ActivityLifetimeEvent lifetimeEvent) |
| | | 130 | | { |
| | | 131 | | var strategyName = context.Activity.GetCommitStrategy(); |
| | | 132 | | var strategy = string.IsNullOrWhiteSpace(strategyName) ? null : commitStrategyRegistry.FindActivityStrategy(stra |
| | | 133 | | var commitAction = CommitAction.Default; |
| | | 134 | | |
| | | 135 | | if (strategy != null) |
| | | 136 | | { |
| | | 137 | | var strategyContext = new ActivityCommitStateStrategyContext(context, lifetimeEvent); |
| | | 138 | | commitAction = strategy.ShouldCommit(strategyContext); |
| | | 139 | | } |
| | | 140 | | |
| | | 141 | | switch (commitAction) |
| | | 142 | | { |
| | | 143 | | case CommitAction.Skip: |
| | | 144 | | return false; |
| | | 145 | | case CommitAction.Commit: |
| | | 146 | | return true; |
| | | 147 | | case CommitAction.Default: |
| | | 148 | | { |
| | | 149 | | var workflowStrategyName = context.WorkflowExecutionContext.Workflow.Options.CommitStrategyName; |
| | | 150 | | var workflowStrategy = string.IsNullOrWhiteSpace(workflowStrategyName) ? null : commitStrategyRegist |
| | | 151 | | |
| | | 152 | | if (workflowStrategy == null) |
| | | 153 | | return false; |
| | | 154 | | |
| | | 155 | | var workflowLifetimeEvent = lifetimeEvent == ActivityLifetimeEvent.ActivityExecuting ? WorkflowLifet |
| | | 156 | | var workflowCommitStateStrategyContext = new WorkflowCommitStateStrategyContext(context.WorkflowExec |
| | | 157 | | commitAction = workflowStrategy.ShouldCommit(workflowCommitStateStrategyContext); |
| | | 158 | | |
| | | 159 | | return commitAction == CommitAction.Commit; |
| | | 160 | | } |
| | | 161 | | default: |
| | | 162 | | throw new ArgumentOutOfRangeException(nameof(commitAction), commitAction, "Unknown commit action"); |
| | | 163 | | } |
| | | 164 | | } |
| | | 165 | | } |