| | | 1 | | using System.ComponentModel; |
| | | 2 | | using Elsa.Common.Models; |
| | | 3 | | using Elsa.Extensions; |
| | | 4 | | using Elsa.Workflows.Activities; |
| | | 5 | | using Elsa.Workflows.Management.Entities; |
| | | 6 | | using Elsa.Workflows.Management.Filters; |
| | | 7 | | using Elsa.Workflows.Memory; |
| | | 8 | | using Elsa.Workflows.Models; |
| | | 9 | | using Elsa.Workflows.Signals; |
| | | 10 | | using Microsoft.Extensions.DependencyInjection; |
| | | 11 | | using Microsoft.Extensions.Logging; |
| | | 12 | | |
| | | 13 | | namespace Elsa.Workflows.Management.Activities.WorkflowDefinitionActivity; |
| | | 14 | | |
| | | 15 | | /// <summary> |
| | | 16 | | /// Loads and executes an <see cref="WorkflowDefinition"/>. |
| | | 17 | | /// </summary> |
| | | 18 | | [Browsable(false)] |
| | | 19 | | public class WorkflowDefinitionActivity : Composite, IInitializable |
| | | 20 | | { |
| | 785 | 21 | | private bool IsInitialized => Root.Id != null!; |
| | | 22 | | |
| | | 23 | | /// <summary> |
| | | 24 | | /// The definition ID of the workflow to schedule for execution. |
| | | 25 | | /// </summary> |
| | 1635 | 26 | | public string WorkflowDefinitionId { get; set; } = default!; |
| | | 27 | | |
| | | 28 | | /// <summary> |
| | | 29 | | /// The specific version ID of the workflow to schedule for execution. If not set, the <see cref="Version"/> number |
| | | 30 | | /// </summary> |
| | 2135 | 31 | | public string? WorkflowDefinitionVersionId { get; set; } |
| | | 32 | | |
| | | 33 | | /// <summary> |
| | | 34 | | /// The latest published version number set by the provider. Tooling uses this to let the user know that a newer ver |
| | | 35 | | /// </summary> |
| | 1094 | 36 | | public int LatestAvailablePublishedVersion { get; set; } |
| | | 37 | | |
| | | 38 | | /// <summary> |
| | | 39 | | /// The latest published version ID set by the provider. Tooling uses this to let the user know that a newer version |
| | | 40 | | /// </summary> |
| | 1094 | 41 | | public string? LatestAvailablePublishedVersionId { get; set; } |
| | | 42 | | |
| | | 43 | | async ValueTask IInitializable.InitializeAsync(InitializationContext context) |
| | | 44 | | { |
| | | 45 | | // This is not just for efficiency but also a necessity to avoid potential race conditions. |
| | | 46 | | // Such conditions can occur when multiple threads are simultaneously creating consuming workflows, |
| | | 47 | | // especially when cached workflows are being updated during the graph construction process. |
| | 785 | 48 | | if (IsInitialized) |
| | 265 | 49 | | return; |
| | | 50 | | |
| | 520 | 51 | | var serviceProvider = context.ServiceProvider; |
| | 520 | 52 | | var cancellationToken = context.CancellationToken; |
| | | 53 | | |
| | | 54 | | // Find the workflow definition and not the graph; the graph must be computed at runtime, since NodeIds will var |
| | 520 | 55 | | var workflowDefinition = await GetWorkflowDefinitionAsync(serviceProvider, cancellationToken); |
| | | 56 | | |
| | 520 | 57 | | if (workflowDefinition == null) |
| | 0 | 58 | | throw new Exception($"Could not find workflow definition with ID {WorkflowDefinitionId}."); |
| | | 59 | | |
| | 520 | 60 | | var activityDescriptor = await FindActivityDescriptorAsync(serviceProvider); |
| | | 61 | | |
| | 520 | 62 | | if (activityDescriptor == null) |
| | | 63 | | { |
| | 0 | 64 | | var logger = serviceProvider.GetRequiredService<ILogger<WorkflowDefinitionActivity>>(); |
| | 0 | 65 | | logger.LogWarning("Could not find activity descriptor for activity type {ActivityType}", Type); |
| | | 66 | | } |
| | | 67 | | else |
| | | 68 | | { |
| | | 69 | | // Declare input and output variables. |
| | 535 | 70 | | DeclareInputAsVariables(activityDescriptor, (_, variable) => Variables.Declare(variable)); |
| | 569 | 71 | | DeclareOutputAsVariables(activityDescriptor, (_, variable) => Variables.Declare(variable)); |
| | | 72 | | } |
| | | 73 | | |
| | 520 | 74 | | var workflowDefinitionService = serviceProvider.GetRequiredService<IWorkflowDefinitionService>(); |
| | 520 | 75 | | var workflowGraph = await workflowDefinitionService.MaterializeWorkflowAsync(workflowDefinition, cancellationTok |
| | | 76 | | |
| | | 77 | | // Set the root activity. |
| | 520 | 78 | | Root = workflowGraph.Workflow; |
| | 785 | 79 | | } |
| | | 80 | | |
| | | 81 | | /// <inheritdoc /> |
| | | 82 | | protected override async ValueTask ExecuteAsync(ActivityExecutionContext context) |
| | | 83 | | { |
| | 403 | 84 | | await CopyInputOutputToVariablesAsync(context); |
| | 403 | 85 | | await context.ScheduleActivityAsync(Root, OnChildCompletedAsync); |
| | 403 | 86 | | } |
| | | 87 | | |
| | | 88 | | private async ValueTask OnChildCompletedAsync(ActivityCompletedContext context) |
| | | 89 | | { |
| | 403 | 90 | | var activityExecutionContext = context.TargetContext; |
| | | 91 | | |
| | | 92 | | // Do we have a "complete composite" signal that triggered the completion? |
| | 403 | 93 | | var completeCompositeSignal = context.WorkflowExecutionContext.TransientProperties.TryGetValue(nameof(CompleteCo |
| | | 94 | | |
| | | 95 | | // If we do, make sure to remove it from the transient properties. |
| | 403 | 96 | | if (completeCompositeSignal != null) |
| | | 97 | | { |
| | 0 | 98 | | var logger = context.GetRequiredService<ILogger<WorkflowDefinitionActivity>>(); |
| | 0 | 99 | | logger.LogDebug("Received a complete composite signal and removing it from the transient properties"); |
| | 0 | 100 | | context.WorkflowExecutionContext.TransientProperties.Remove(nameof(CompleteCompositeSignal)); |
| | | 101 | | } |
| | | 102 | | |
| | | 103 | | // Copy any collected outputs into the synthetic properties. |
| | 812 | 104 | | foreach (var outputDescriptor in activityExecutionContext.ActivityDescriptor.Outputs) |
| | | 105 | | { |
| | 3 | 106 | | var output = (Output?)outputDescriptor.ValueGetter(activityExecutionContext.Activity); |
| | | 107 | | // If direct output mapping is used, we can read the output value directly from the memory. |
| | 3 | 108 | | var value = activityExecutionContext.Get(output) ?? activityExecutionContext.Get(outputDescriptor.Name); |
| | | 109 | | |
| | | 110 | | // Make sure to select a parent scope to avoid naming collisions between outputs defined on the current scop |
| | 3 | 111 | | var parentActivityExecutionContext = activityExecutionContext.ParentActivityExecutionContext?.GetAncestors() |
| | 6 | 112 | | .Any(x => x.ActivityDescriptor.Outputs.Any(y => y.Name == outputDescriptor.Name)) == true |
| | 3 | 113 | | ? activityExecutionContext.ParentActivityExecutionContext ?? activityExecutionContext |
| | 3 | 114 | | : activityExecutionContext; |
| | | 115 | | |
| | 3 | 116 | | parentActivityExecutionContext.Set(output, value, outputDescriptor.Name); |
| | | 117 | | } |
| | | 118 | | |
| | | 119 | | // Complete this activity with the signal value. |
| | 403 | 120 | | await activityExecutionContext.CompleteActivityAsync(completeCompositeSignal?.Value); |
| | 403 | 121 | | } |
| | | 122 | | |
| | | 123 | | private async Task CopyInputOutputToVariablesAsync(ActivityExecutionContext context) |
| | | 124 | | { |
| | 403 | 125 | | var serviceProvider = context.GetRequiredService<IServiceProvider>(); |
| | 403 | 126 | | var activityDescriptor = await FindActivityDescriptorAsync(serviceProvider); |
| | | 127 | | |
| | 403 | 128 | | if (activityDescriptor == null) |
| | | 129 | | { |
| | 0 | 130 | | var logger = serviceProvider.GetRequiredService<ILogger<WorkflowDefinitionActivity>>(); |
| | 0 | 131 | | logger.LogWarning("Could not find activity descriptor for activity type {ActivityType}", Type); |
| | 0 | 132 | | return; |
| | | 133 | | } |
| | | 134 | | |
| | 403 | 135 | | DeclareInputAsVariables(activityDescriptor, (descriptor, variable) => |
| | 403 | 136 | | { |
| | 1 | 137 | | var inputName = descriptor.Name; |
| | 1 | 138 | | var input = SyntheticProperties.TryGetValue(inputName, out var inputValue) ? (Input?)inputValue : default; |
| | 1 | 139 | | var evaluatedExpression = input != null ? context.Get(input.MemoryBlockReference()) : default; |
| | 403 | 140 | | |
| | 1 | 141 | | context.ExpressionExecutionContext.Memory.Declare(variable); |
| | 1 | 142 | | variable.Set(context, evaluatedExpression); |
| | 404 | 143 | | }); |
| | | 144 | | |
| | 406 | 145 | | DeclareOutputAsVariables(activityDescriptor, (_, variable) => context.ExpressionExecutionContext.Memory.Declare( |
| | 403 | 146 | | } |
| | | 147 | | |
| | | 148 | | private void DeclareInputAsVariables(ActivityDescriptor activityDescriptor, Action<InputDescriptor, Variable> config |
| | | 149 | | { |
| | 1878 | 150 | | foreach (var inputDescriptor in activityDescriptor.Inputs) |
| | | 151 | | { |
| | 16 | 152 | | var inputName = inputDescriptor.Name; |
| | 16 | 153 | | var unsafeInputName = PropertyNameHelper.GetUnsafePropertyName(typeof(WorkflowDefinitionActivity), inputName |
| | 16 | 154 | | var variableType = typeof(Variable<>).MakeGenericType(inputDescriptor.Type); |
| | 16 | 155 | | var variable = (Variable)Activator.CreateInstance(variableType)!; |
| | | 156 | | |
| | 16 | 157 | | variable.Id = unsafeInputName; |
| | 16 | 158 | | variable.Name = unsafeInputName; |
| | 16 | 159 | | variable.StorageDriverType = inputDescriptor.StorageDriverType; |
| | | 160 | | |
| | 16 | 161 | | configureVariable(inputDescriptor, variable); |
| | | 162 | | } |
| | 923 | 163 | | } |
| | | 164 | | |
| | | 165 | | private void DeclareOutputAsVariables(ActivityDescriptor activityDescriptor, Action<OutputDescriptor, Variable> conf |
| | | 166 | | { |
| | 1950 | 167 | | foreach (var outputDescriptor in activityDescriptor.Outputs) |
| | | 168 | | { |
| | 52 | 169 | | var outputName = outputDescriptor.Name; |
| | 52 | 170 | | var unsafeOutputName = PropertyNameHelper.GetUnsafePropertyName(typeof(WorkflowDefinitionActivity), outputNa |
| | | 171 | | |
| | 52 | 172 | | var variable = new Variable |
| | 52 | 173 | | { |
| | 52 | 174 | | Id = unsafeOutputName, |
| | 52 | 175 | | Name = unsafeOutputName |
| | 52 | 176 | | }; |
| | | 177 | | |
| | 52 | 178 | | configureVariable(outputDescriptor, variable); |
| | | 179 | | } |
| | 923 | 180 | | } |
| | | 181 | | |
| | | 182 | | private async Task<WorkflowDefinition?> GetWorkflowDefinitionAsync(IServiceProvider serviceProvider, CancellationTok |
| | | 183 | | { |
| | 520 | 184 | | var workflowDefinitionService = serviceProvider.GetRequiredService<IWorkflowDefinitionService>(); |
| | 520 | 185 | | var filter = new WorkflowDefinitionFilter |
| | 520 | 186 | | { |
| | 520 | 187 | | DefinitionId = WorkflowDefinitionId |
| | 520 | 188 | | }; |
| | | 189 | | |
| | 520 | 190 | | if (!string.IsNullOrWhiteSpace(WorkflowDefinitionVersionId)) |
| | 520 | 191 | | filter.Id = WorkflowDefinitionVersionId; |
| | | 192 | | else |
| | 0 | 193 | | filter.VersionOptions = VersionOptions.SpecificVersion(Version); |
| | | 194 | | |
| | 520 | 195 | | var workflowDefinition = |
| | 520 | 196 | | await workflowDefinitionService.FindWorkflowDefinitionAsync(filter, cancellationToken) |
| | 520 | 197 | | ?? (await workflowDefinitionService.FindWorkflowDefinitionAsync(new WorkflowDefinitionFilter |
| | 520 | 198 | | { |
| | 520 | 199 | | DefinitionId = WorkflowDefinitionId, |
| | 520 | 200 | | VersionOptions = VersionOptions.Published |
| | 520 | 201 | | }, cancellationToken) |
| | 520 | 202 | | ?? await workflowDefinitionService.FindWorkflowDefinitionAsync(new WorkflowDefinitionFilter |
| | 520 | 203 | | { |
| | 520 | 204 | | DefinitionId = WorkflowDefinitionId, |
| | 520 | 205 | | VersionOptions = VersionOptions.Latest |
| | 520 | 206 | | }, cancellationToken)); |
| | | 207 | | |
| | 520 | 208 | | return workflowDefinition; |
| | 520 | 209 | | } |
| | | 210 | | |
| | | 211 | | private async Task<ActivityDescriptor?> FindActivityDescriptorAsync(IServiceProvider serviceProvider) |
| | | 212 | | { |
| | 923 | 213 | | var activityRegistryLookup = serviceProvider.GetRequiredService<IActivityRegistryLookupService>(); |
| | 923 | 214 | | return await activityRegistryLookup.FindAsync(Type, Version) ?? await activityRegistryLookup.FindAsync(Type); |
| | 923 | 215 | | } |
| | | 216 | | } |