| | | 1 | | using System.Reflection; |
| | | 2 | | using Elsa.Extensions; |
| | | 3 | | using Elsa.Mediator.Contracts; |
| | | 4 | | using Elsa.Workflows.Activities; |
| | | 5 | | using Elsa.Workflows.CommitStates; |
| | | 6 | | using Elsa.Workflows.Pipelines.ActivityExecution; |
| | | 7 | | using Microsoft.Extensions.Logging; |
| | | 8 | | using Microsoft.Extensions.Options; |
| | | 9 | | |
| | | 10 | | namespace Elsa.Workflows.Middleware.Activities; |
| | | 11 | | |
| | | 12 | | /// <summary> |
| | | 13 | | /// Provides extension methods to <see cref="IActivityExecutionPipelineBuilder"/>. |
| | | 14 | | /// </summary> |
| | | 15 | | public static class ActivityInvokerMiddlewareExtensions |
| | | 16 | | { |
| | | 17 | | /// <summary> |
| | | 18 | | /// Adds the <see cref="DefaultActivityInvokerMiddleware"/> component to the pipeline. |
| | | 19 | | /// </summary> |
| | | 20 | | public static IActivityExecutionPipelineBuilder UseDefaultActivityInvoker(this IActivityExecutionPipelineBuilder pip |
| | | 21 | | } |
| | | 22 | | |
| | | 23 | | /// <summary> |
| | | 24 | | /// A default activity execution middleware component that evaluates the current activity's properties, executes the act |
| | | 25 | | /// </summary> |
| | 501 | 26 | | public class DefaultActivityInvokerMiddleware(ActivityMiddlewareDelegate next, ICommitStrategyRegistry commitStrategyReg |
| | | 27 | | : IActivityExecutionMiddleware |
| | | 28 | | { |
| | 3 | 29 | | private static readonly MethodInfo ExecuteAsyncMethodInfo = typeof(IActivity).GetMethod(nameof(IActivity.ExecuteAsyn |
| | | 30 | | |
| | | 31 | | /// <inheritdoc /> |
| | | 32 | | public async ValueTask InvokeAsync(ActivityExecutionContext context) |
| | | 33 | | { |
| | 3316 | 34 | | context.CancellationToken.ThrowIfCancellationRequested(); |
| | | 35 | | |
| | 3316 | 36 | | var workflowExecutionContext = context.WorkflowExecutionContext; |
| | | 37 | | |
| | | 38 | | // Evaluate input properties. |
| | 3316 | 39 | | await EvaluateInputPropertiesAsync(context); |
| | | 40 | | |
| | | 41 | | // Prevent the activity from being started if cancellation is requested. |
| | 3316 | 42 | | if (context.CancellationToken.IsCancellationRequested) |
| | | 43 | | { |
| | 0 | 44 | | context.TransitionTo(ActivityStatus.Canceled); |
| | 0 | 45 | | context.AddExecutionLogEntry("Activity cancelled"); |
| | 0 | 46 | | return; |
| | | 47 | | } |
| | | 48 | | |
| | | 49 | | // Check if the activity can be executed. |
| | 3316 | 50 | | if (!await context.Activity.CanExecuteAsync(context)) |
| | | 51 | | { |
| | 0 | 52 | | context.TransitionTo(ActivityStatus.Pending); |
| | 0 | 53 | | context.AddExecutionLogEntry("Precondition Failed", "Cannot execute at this time"); |
| | 0 | 54 | | return; |
| | | 55 | | } |
| | | 56 | | |
| | | 57 | | // Mark workflow and activity as executing. |
| | 3316 | 58 | | using var executionState = context.EnterExecution(); |
| | | 59 | | |
| | | 60 | | // Conditionally commit the workflow state. |
| | 3316 | 61 | | if (ShouldCommit(context, ActivityLifetimeEvent.ActivityExecuting)) |
| | 0 | 62 | | await context.WorkflowExecutionContext.CommitAsync(); |
| | | 63 | | |
| | 3316 | 64 | | var previousActivityStatus = context.Status; |
| | 3316 | 65 | | context.TransitionTo(ActivityStatus.Running); |
| | | 66 | | |
| | | 67 | | // Execute activity. |
| | 3316 | 68 | | await ExecuteActivityAsync(context); |
| | | 69 | | |
| | 3302 | 70 | | var currentActivityStatus = context.Status; |
| | 3302 | 71 | | var activityDidComplete = previousActivityStatus != ActivityStatus.Completed && currentActivityStatus == Activit |
| | | 72 | | |
| | | 73 | | // Reset execute delegate. |
| | 3302 | 74 | | workflowExecutionContext.ExecuteDelegate = null; |
| | | 75 | | |
| | | 76 | | // If a bookmark was used to resume, burn it if not burnt already by the activity. |
| | 3302 | 77 | | var resumedBookmark = workflowExecutionContext.ResumedBookmarkContext?.Bookmark; |
| | | 78 | | |
| | 3302 | 79 | | if (resumedBookmark is { AutoBurn: true }) |
| | | 80 | | { |
| | 61 | 81 | | logger.LogDebug("Auto-burning bookmark {BookmarkId}", resumedBookmark.Id); |
| | 61 | 82 | | workflowExecutionContext.Bookmarks.Remove(resumedBookmark); |
| | | 83 | | } |
| | | 84 | | |
| | | 85 | | // Update execution count. |
| | 3302 | 86 | | context.IncrementExecutionCount(); |
| | | 87 | | |
| | | 88 | | // Invoke next middleware. |
| | 3302 | 89 | | await next(context); |
| | | 90 | | |
| | | 91 | | // If the activity completed, send a notification. |
| | 3302 | 92 | | if (activityDidComplete) |
| | | 93 | | { |
| | 1083 | 94 | | var mediator = context.GetRequiredService<INotificationSender>(); |
| | 1083 | 95 | | await mediator.SendAsync(new Notifications.ActivityCompleted(context), context.CancellationToken); |
| | | 96 | | } |
| | | 97 | | |
| | | 98 | | // Conditionally commit the workflow state. |
| | 3302 | 99 | | if (ShouldCommit(context, ActivityLifetimeEvent.ActivityExecuted)) |
| | 0 | 100 | | await context.WorkflowExecutionContext.CommitAsync(); |
| | 3302 | 101 | | } |
| | | 102 | | |
| | | 103 | | /// <summary> |
| | | 104 | | /// Executes the activity using the specified context. |
| | | 105 | | /// This method is virtual so that modules might override this implementation to do things like e.g. asynchronous pr |
| | | 106 | | /// </summary> |
| | | 107 | | protected virtual async ValueTask ExecuteActivityAsync(ActivityExecutionContext context) |
| | | 108 | | { |
| | 3316 | 109 | | var executeDelegate = context.WorkflowExecutionContext.ExecuteDelegate |
| | 3316 | 110 | | ?? (ExecuteActivityDelegate)Delegate.CreateDelegate(typeof(ExecuteActivityDelegate), conte |
| | | 111 | | |
| | 3316 | 112 | | await executeDelegate(context); |
| | 3302 | 113 | | } |
| | | 114 | | |
| | | 115 | | private async Task EvaluateInputPropertiesAsync(ActivityExecutionContext context) |
| | | 116 | | { |
| | | 117 | | // Evaluate containing composite input properties, if any. |
| | 12914 | 118 | | var compositeContainerContexts = context.GetAncestors().Where(x => x.Activity is Composite).ToList(); |
| | | 119 | | |
| | 18790 | 120 | | foreach (var activityExecutionContext in compositeContainerContexts) |
| | | 121 | | { |
| | 6079 | 122 | | if (!activityExecutionContext.GetHasEvaluatedProperties()) |
| | 39 | 123 | | await activityExecutionContext.EvaluateInputPropertiesAsync(); |
| | | 124 | | } |
| | | 125 | | |
| | | 126 | | // Evaluate input properties. |
| | 3316 | 127 | | await context.EvaluateInputPropertiesAsync(); |
| | 3316 | 128 | | } |
| | | 129 | | |
| | | 130 | | private bool ShouldCommit(ActivityExecutionContext context, ActivityLifetimeEvent lifetimeEvent) |
| | | 131 | | { |
| | 6618 | 132 | | var strategyName = context.Activity.GetCommitStrategy(); |
| | | 133 | | |
| | 6618 | 134 | | IActivityCommitStrategy? strategy = !string.IsNullOrWhiteSpace(strategyName) |
| | 6618 | 135 | | ? commitStrategyRegistry.FindActivityStrategy(strategyName) |
| | 6618 | 136 | | : commitStateOptions.Value.DefaultActivityCommitStrategy; |
| | | 137 | | |
| | 6618 | 138 | | var commitAction = CommitAction.Default; |
| | | 139 | | |
| | 6618 | 140 | | if (strategy != null) |
| | | 141 | | { |
| | 0 | 142 | | var strategyContext = new ActivityCommitStateStrategyContext(context, lifetimeEvent); |
| | 0 | 143 | | commitAction = strategy.ShouldCommit(strategyContext); |
| | | 144 | | } |
| | | 145 | | |
| | | 146 | | switch (commitAction) |
| | | 147 | | { |
| | | 148 | | case CommitAction.Skip: |
| | 0 | 149 | | return false; |
| | | 150 | | case CommitAction.Commit: |
| | 0 | 151 | | return true; |
| | | 152 | | case CommitAction.Default: |
| | | 153 | | { |
| | 6618 | 154 | | var workflowStrategyName = context.WorkflowExecutionContext.Workflow.Options.CommitStrategyName; |
| | | 155 | | |
| | 6618 | 156 | | IWorkflowCommitStrategy? workflowStrategy = !string.IsNullOrWhiteSpace(workflowStrategyName) |
| | 6618 | 157 | | ? commitStrategyRegistry.FindWorkflowStrategy(workflowStrategyName) |
| | 6618 | 158 | | : commitStateOptions.Value.DefaultWorkflowCommitStrategy; |
| | | 159 | | |
| | 6618 | 160 | | if (workflowStrategy == null) |
| | 6618 | 161 | | return false; |
| | | 162 | | |
| | 0 | 163 | | var workflowLifetimeEvent = lifetimeEvent == ActivityLifetimeEvent.ActivityExecuting ? WorkflowLifet |
| | 0 | 164 | | var workflowCommitStateStrategyContext = new WorkflowCommitStateStrategyContext(context.WorkflowExec |
| | 0 | 165 | | commitAction = workflowStrategy.ShouldCommit(workflowCommitStateStrategyContext); |
| | | 166 | | |
| | 0 | 167 | | return commitAction == CommitAction.Commit; |
| | | 168 | | } |
| | | 169 | | default: |
| | 0 | 170 | | throw new ArgumentOutOfRangeException(nameof(commitAction), commitAction, "Unknown commit action"); |
| | | 171 | | } |
| | | 172 | | } |
| | | 173 | | } |