| | | 1 | | using Elsa.Extensions; |
| | | 2 | | using Elsa.Workflows.Models; |
| | | 3 | | using Elsa.Workflows.Services; |
| | | 4 | | using Elsa.Workflows.State; |
| | | 5 | | using Microsoft.Extensions.Logging; |
| | | 6 | | |
| | | 7 | | namespace Elsa.Workflows; |
| | | 8 | | |
| | | 9 | | /// <inheritdoc /> |
| | 434 | 10 | | public class WorkflowStateExtractor(ILogger<WorkflowStateExtractor> logger) : IWorkflowStateExtractor |
| | | 11 | | { |
| | | 12 | | /// <inheritdoc /> |
| | | 13 | | public WorkflowState Extract(WorkflowExecutionContext workflowExecutionContext) |
| | | 14 | | { |
| | 440 | 15 | | var state = new WorkflowState |
| | 440 | 16 | | { |
| | 440 | 17 | | Id = workflowExecutionContext.Id, |
| | 440 | 18 | | DefinitionId = workflowExecutionContext.Workflow.Identity.DefinitionId, |
| | 440 | 19 | | DefinitionVersionId = workflowExecutionContext.Workflow.Identity.Id, |
| | 440 | 20 | | DefinitionVersion = workflowExecutionContext.Workflow.Identity.Version, |
| | 440 | 21 | | CorrelationId = workflowExecutionContext.CorrelationId, |
| | 440 | 22 | | Name = workflowExecutionContext.Name, |
| | 440 | 23 | | ParentWorkflowInstanceId = workflowExecutionContext.ParentWorkflowInstanceId, |
| | 440 | 24 | | Status = workflowExecutionContext.Status, |
| | 440 | 25 | | SubStatus = workflowExecutionContext.SubStatus, |
| | 440 | 26 | | IsExecuting = workflowExecutionContext.IsExecuting, |
| | 440 | 27 | | Bookmarks = workflowExecutionContext.Bookmarks, |
| | 440 | 28 | | ExecutionLogSequence = workflowExecutionContext.ExecutionLogSequence, |
| | 440 | 29 | | Input = GetPersistableInput(workflowExecutionContext), |
| | 440 | 30 | | Output = workflowExecutionContext.Output, |
| | 440 | 31 | | Incidents = workflowExecutionContext.Incidents, |
| | 440 | 32 | | IsSystem = workflowExecutionContext.Workflow.IsSystem, |
| | 440 | 33 | | CreatedAt = workflowExecutionContext.CreatedAt, |
| | 440 | 34 | | UpdatedAt = workflowExecutionContext.UpdatedAt, |
| | 440 | 35 | | FinishedAt = workflowExecutionContext.FinishedAt, |
| | 440 | 36 | | }; |
| | | 37 | | |
| | 440 | 38 | | ExtractProperties(state, workflowExecutionContext); |
| | 440 | 39 | | ExtractActiveActivityExecutionContexts(state, workflowExecutionContext); |
| | 440 | 40 | | ExtractCompletionCallbacks(state, workflowExecutionContext); |
| | 440 | 41 | | ExtractScheduledActivities(state, workflowExecutionContext); |
| | | 42 | | |
| | 440 | 43 | | return state; |
| | | 44 | | } |
| | | 45 | | |
| | | 46 | | /// <inheritdoc /> |
| | | 47 | | public async Task<WorkflowExecutionContext> ApplyAsync(WorkflowExecutionContext workflowExecutionContext, WorkflowSt |
| | | 48 | | { |
| | 126 | 49 | | workflowExecutionContext.Id = state.Id; |
| | 126 | 50 | | workflowExecutionContext.CorrelationId = state.CorrelationId; |
| | 126 | 51 | | workflowExecutionContext.Name = state.Name; |
| | 126 | 52 | | workflowExecutionContext.ParentWorkflowInstanceId = state.ParentWorkflowInstanceId; |
| | 126 | 53 | | workflowExecutionContext.SubStatus = state.SubStatus; |
| | 126 | 54 | | workflowExecutionContext.IsExecuting = state.IsExecuting; |
| | 126 | 55 | | workflowExecutionContext.Bookmarks = state.Bookmarks; |
| | 126 | 56 | | workflowExecutionContext.Output = state.Output; |
| | 126 | 57 | | workflowExecutionContext.ExecutionLogSequence = state.ExecutionLogSequence; |
| | 126 | 58 | | workflowExecutionContext.CreatedAt = state.CreatedAt; |
| | 126 | 59 | | workflowExecutionContext.UpdatedAt = state.UpdatedAt; |
| | 126 | 60 | | workflowExecutionContext.FinishedAt = state.FinishedAt; |
| | 126 | 61 | | ApplyInput(state, workflowExecutionContext); |
| | 126 | 62 | | ApplyProperties(state, workflowExecutionContext); |
| | 126 | 63 | | await ApplyActivityExecutionContextsAsync(state, workflowExecutionContext); |
| | 126 | 64 | | ApplyCompletionCallbacks(state, workflowExecutionContext); |
| | 126 | 65 | | ApplyScheduledActivities(state, workflowExecutionContext); |
| | 126 | 66 | | return workflowExecutionContext; |
| | 126 | 67 | | } |
| | | 68 | | |
| | | 69 | | private void ApplyInput(WorkflowState state, WorkflowExecutionContext workflowExecutionContext) |
| | | 70 | | { |
| | | 71 | | // Only add input from state if the input doesn't already exist on the workflow execution context. |
| | 330 | 72 | | foreach (var inputItem in state.Input) |
| | 39 | 73 | | if (!workflowExecutionContext.Input.ContainsKey(inputItem.Key)) |
| | 0 | 74 | | workflowExecutionContext.Input.Add(inputItem.Key, inputItem.Value); |
| | 126 | 75 | | } |
| | | 76 | | |
| | | 77 | | private IDictionary<string, object> GetPersistableInput(WorkflowExecutionContext workflowExecutionContext) |
| | | 78 | | { |
| | | 79 | | // TODO: This is a temporary solution. We need to find a better way to handle this. |
| | 457 | 80 | | var persistableInput = workflowExecutionContext.Workflow.Inputs.Where(x => x.StorageDriverType == typeof(Workflo |
| | 440 | 81 | | var input = workflowExecutionContext.Input; |
| | 440 | 82 | | var filteredInput = new Dictionary<string, object>(); |
| | | 83 | | |
| | 880 | 84 | | foreach (var inputDefinition in persistableInput) |
| | | 85 | | { |
| | 0 | 86 | | if (input.TryGetValue(inputDefinition.Name, out var value)) |
| | 0 | 87 | | filteredInput.Add(inputDefinition.Name, value); |
| | | 88 | | } |
| | | 89 | | |
| | 440 | 90 | | return filteredInput; |
| | | 91 | | } |
| | | 92 | | |
| | | 93 | | private void ExtractProperties(WorkflowState state, WorkflowExecutionContext workflowExecutionContext) |
| | | 94 | | { |
| | 440 | 95 | | state.Properties = workflowExecutionContext.Properties; |
| | 440 | 96 | | } |
| | | 97 | | |
| | | 98 | | private void ApplyProperties(WorkflowState state, WorkflowExecutionContext workflowExecutionContext) |
| | | 99 | | { |
| | | 100 | | // Merge properties. |
| | 328 | 101 | | foreach (var property in state.Properties) |
| | 38 | 102 | | workflowExecutionContext.Properties[property.Key] = property.Value; |
| | 126 | 103 | | } |
| | | 104 | | |
| | | 105 | | private async Task ApplyActivityExecutionContextsAsync(WorkflowState state, WorkflowExecutionContext workflowExecuti |
| | | 106 | | { |
| | | 107 | | var activityExecutionContexts = (await Task.WhenAll(state.ActivityExecutionContexts.Select(async item => await C |
| | | 108 | | |
| | | 109 | | var lookup = activityExecutionContexts.ToDictionary(x => x.Id); |
| | | 110 | | |
| | | 111 | | // Reconstruct hierarchy. |
| | | 112 | | foreach (var contextState in state.ActivityExecutionContexts.Where(x => !string.IsNullOrWhiteSpace(x.ParentConte |
| | | 113 | | { |
| | 92 | 114 | | var parentContextId = contextState.ParentContextId; |
| | 92 | 115 | | if (parentContextId == null || !lookup.TryGetValue(parentContextId, out var parentContext)) |
| | | 116 | | { |
| | 0 | 117 | | logger.LogWarning("Parent context with ID '{ParentContextId}' not found for context with ID '{ContextId} |
| | 0 | 118 | | continue; // Skip if parent context is not found. |
| | | 119 | | } |
| | | 120 | | |
| | 92 | 121 | | var contextId = contextState.Id; |
| | | 122 | | |
| | 92 | 123 | | if (lookup.TryGetValue(contextId, out var context)) |
| | | 124 | | { |
| | 92 | 125 | | context.ExpressionExecutionContext.ParentContext = parentContext.ExpressionExecutionContext; |
| | 92 | 126 | | context.ParentActivityExecutionContext = parentContext; |
| | | 127 | | } |
| | | 128 | | } |
| | | 129 | | |
| | | 130 | | // Assign root expression execution context. |
| | | 131 | | var rootActivityExecutionContexts = activityExecutionContexts.Where(x => x.ExpressionExecutionContext.ParentCont |
| | | 132 | | |
| | 252 | 133 | | foreach (var rootActivityExecutionContext in rootActivityExecutionContexts) |
| | 0 | 134 | | rootActivityExecutionContext.ExpressionExecutionContext.ParentContext = workflowExecutionContext.ExpressionE |
| | | 135 | | |
| | 126 | 136 | | workflowExecutionContext.ActivityExecutionContexts = activityExecutionContexts; |
| | 126 | 137 | | return; |
| | | 138 | | |
| | | 139 | | async Task<ActivityExecutionContext?> CreateActivityExecutionContextAsync(ActivityExecutionContextState activity |
| | | 140 | | { |
| | 133 | 141 | | var activity = workflowExecutionContext.FindActivityByNodeId(activityExecutionContextState.ScheduledActivity |
| | | 142 | | |
| | | 143 | | // Activity can be null in case the workflow instance was migrated to a newer version that no longer contain |
| | 133 | 144 | | if (activity == null) |
| | 0 | 145 | | return null; |
| | | 146 | | |
| | 133 | 147 | | var properties = activityExecutionContextState.Properties; |
| | 133 | 148 | | var metadata = activityExecutionContextState.Metadata; |
| | 133 | 149 | | var activityExecutionContext = await workflowExecutionContext.CreateActivityExecutionContextAsync(activity); |
| | 133 | 150 | | activityExecutionContext.Id = activityExecutionContextState.Id; |
| | 133 | 151 | | activityExecutionContext.Properties.Merge(properties); |
| | 133 | 152 | | activityExecutionContext.Metadata.Merge(metadata); |
| | | 153 | | |
| | 133 | 154 | | if(activityExecutionContextState.ActivityState != null) |
| | 133 | 155 | | activityExecutionContext.ActivityState.Merge(activityExecutionContextState.ActivityState); |
| | | 156 | | |
| | 133 | 157 | | activityExecutionContext.TransitionTo(activityExecutionContextState.Status); |
| | 133 | 158 | | activityExecutionContext.IsExecuting = activityExecutionContextState.IsExecuting; |
| | 133 | 159 | | activityExecutionContext.AggregateFaultCount = activityExecutionContextState.FaultCount; |
| | 133 | 160 | | activityExecutionContext.StartedAt = activityExecutionContextState.StartedAt; |
| | 133 | 161 | | activityExecutionContext.CompletedAt = activityExecutionContextState.CompletedAt; |
| | 133 | 162 | | activityExecutionContext.Tag = activityExecutionContextState.Tag; |
| | 133 | 163 | | activityExecutionContext.DynamicVariables = activityExecutionContextState.DynamicVariables; |
| | | 164 | | |
| | 133 | 165 | | return activityExecutionContext; |
| | 133 | 166 | | } |
| | 126 | 167 | | } |
| | | 168 | | |
| | | 169 | | private static void ApplyCompletionCallbacks(WorkflowState state, WorkflowExecutionContext workflowExecutionContext) |
| | | 170 | | { |
| | 436 | 171 | | foreach (var completionCallbackEntry in state.CompletionCallbacks) |
| | | 172 | | { |
| | 349 | 173 | | var ownerActivityExecutionContext = workflowExecutionContext.ActivityExecutionContexts.FirstOrDefault(x => x |
| | 92 | 174 | | if (ownerActivityExecutionContext != null) |
| | | 175 | | { |
| | 92 | 176 | | var childNode = workflowExecutionContext.FindNodeById(completionCallbackEntry.ChildNodeId); |
| | | 177 | | |
| | 92 | 178 | | if (childNode == null) |
| | | 179 | | continue; |
| | | 180 | | |
| | 92 | 181 | | var callbackName = completionCallbackEntry.MethodName; |
| | 92 | 182 | | var callbackDelegate = !string.IsNullOrEmpty(callbackName) ? ownerActivityExecutionContext.Activity.GetA |
| | 92 | 183 | | var tag = completionCallbackEntry.Tag; |
| | 92 | 184 | | workflowExecutionContext.AddCompletionCallback(ownerActivityExecutionContext, childNode, callbackDelegat |
| | | 185 | | } |
| | | 186 | | } |
| | 126 | 187 | | } |
| | | 188 | | |
| | | 189 | | private void ApplyScheduledActivities(WorkflowState state, WorkflowExecutionContext workflowExecutionContext) |
| | | 190 | | { |
| | 252 | 191 | | foreach (var activityWorkItemState in state.ScheduledActivities) |
| | | 192 | | { |
| | 0 | 193 | | var activity = workflowExecutionContext.FindActivityByNodeId(activityWorkItemState.ActivityNodeId); |
| | | 194 | | |
| | 0 | 195 | | if (activity == null) |
| | | 196 | | continue; |
| | | 197 | | |
| | 0 | 198 | | var ownerContext = workflowExecutionContext.ActivityExecutionContexts.FirstOrDefault(x => x.Id == activityWo |
| | 0 | 199 | | var existingActivityExecutionContext = workflowExecutionContext.ActivityExecutionContexts.FirstOrDefault(x = |
| | 0 | 200 | | var variables = activityWorkItemState.Variables; |
| | 0 | 201 | | var input = activityWorkItemState.Input; |
| | 0 | 202 | | var tag = activityWorkItemState.Tag; |
| | 0 | 203 | | var workItem = new ActivityWorkItem(activity, ownerContext, tag, variables, existingActivityExecutionContext |
| | 0 | 204 | | workflowExecutionContext.Scheduler.Schedule(workItem); |
| | | 205 | | } |
| | 126 | 206 | | } |
| | | 207 | | |
| | | 208 | | private static void ExtractCompletionCallbacks(WorkflowState state, WorkflowExecutionContext workflowExecutionContex |
| | | 209 | | { |
| | | 210 | | // Assert that all referenced owner contexts exist. |
| | 440 | 211 | | var activeContexts = workflowExecutionContext.GetActiveActivityExecutionContexts().ToList(); |
| | 1132 | 212 | | foreach (var completionCallback in workflowExecutionContext.CompletionCallbacks) |
| | | 213 | | { |
| | 360 | 214 | | var ownerContext = activeContexts.FirstOrDefault(x => x == completionCallback.Owner); |
| | | 215 | | |
| | 126 | 216 | | if (ownerContext == null) |
| | 0 | 217 | | throw new("Lost an owner context"); |
| | | 218 | | } |
| | | 219 | | |
| | 566 | 220 | | var completionCallbacks = workflowExecutionContext.CompletionCallbacks.Select(x => new CompletionCallbackState(x |
| | | 221 | | |
| | 440 | 222 | | state.CompletionCallbacks = completionCallbacks.ToList(); |
| | 440 | 223 | | } |
| | | 224 | | |
| | | 225 | | private static void ExtractActiveActivityExecutionContexts(WorkflowState state, WorkflowExecutionContext workflowExe |
| | | 226 | | { |
| | | 227 | | ActivityExecutionContextState CreateActivityExecutionContextState(ActivityExecutionContext activityExecutionCont |
| | | 228 | | { |
| | 574 | 229 | | var parentId = activityExecutionContext.ParentActivityExecutionContext?.Id; |
| | | 230 | | |
| | 574 | 231 | | if (parentId != null) |
| | | 232 | | { |
| | 452 | 233 | | var parentContext = activityExecutionContext.WorkflowExecutionContext.ActivityExecutionContexts.FirstOrD |
| | | 234 | | |
| | 134 | 235 | | if (parentContext == null) |
| | 0 | 236 | | throw new("We lost a context. This could indicate a bug in a parent activity that completed before ( |
| | | 237 | | } |
| | | 238 | | |
| | 574 | 239 | | var activityExecutionContextState = new ActivityExecutionContextState |
| | 574 | 240 | | { |
| | 574 | 241 | | Id = activityExecutionContext.Id, |
| | 574 | 242 | | ParentContextId = activityExecutionContext.ParentActivityExecutionContext?.Id, |
| | 574 | 243 | | ScheduledActivityNodeId = activityExecutionContext.NodeId, |
| | 574 | 244 | | OwnerActivityNodeId = activityExecutionContext.ParentActivityExecutionContext?.NodeId, |
| | 574 | 245 | | Properties = activityExecutionContext.Properties, |
| | 574 | 246 | | Metadata = activityExecutionContext.Metadata, |
| | 574 | 247 | | ActivityState = activityExecutionContext.ActivityState, |
| | 574 | 248 | | Status = activityExecutionContext.Status, |
| | 574 | 249 | | IsExecuting = activityExecutionContext.IsExecuting, |
| | 574 | 250 | | FaultCount = activityExecutionContext.AggregateFaultCount, |
| | 574 | 251 | | StartedAt = activityExecutionContext.StartedAt, |
| | 574 | 252 | | CompletedAt = activityExecutionContext.CompletedAt, |
| | 574 | 253 | | Tag = activityExecutionContext.Tag, |
| | 574 | 254 | | DynamicVariables = activityExecutionContext.DynamicVariables, |
| | 574 | 255 | | }; |
| | 574 | 256 | | return activityExecutionContextState; |
| | | 257 | | } |
| | | 258 | | |
| | | 259 | | // Only persist non-completed contexts. |
| | 440 | 260 | | state.ActivityExecutionContexts = workflowExecutionContext.GetActiveActivityExecutionContexts().Reverse().Select |
| | 440 | 261 | | } |
| | | 262 | | |
| | | 263 | | private void ExtractScheduledActivities(WorkflowState state, WorkflowExecutionContext workflowExecutionContext) |
| | | 264 | | { |
| | 440 | 265 | | var scheduledActivities = workflowExecutionContext |
| | 440 | 266 | | .Scheduler.List() |
| | 440 | 267 | | .Select(x => new ActivityWorkItemState |
| | 440 | 268 | | { |
| | 440 | 269 | | ActivityNodeId = x.Activity.NodeId, |
| | 440 | 270 | | OwnerContextId = x.Owner?.Id, |
| | 440 | 271 | | Tag = x.Tag, |
| | 440 | 272 | | Variables = x.Variables?.ToList(), |
| | 440 | 273 | | ExistingActivityExecutionContextId = x.ExistingActivityExecutionContext?.Id, |
| | 440 | 274 | | Input = x.Input, |
| | 440 | 275 | | }); |
| | | 276 | | |
| | 440 | 277 | | state.ScheduledActivities = scheduledActivities.ToList(); |
| | 440 | 278 | | } |
| | | 279 | | } |