< Summary

Information
Class: Elsa.Workflows.WorkflowStateExtractor
Assembly: Elsa.Workflows.Core
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Core/Services/WorkflowStateExtractor.cs
Line coverage
88%
Covered lines: 145
Uncovered lines: 18
Coverable lines: 163
Total lines: 279
Line coverage: 88.9%
Branch coverage
65%
Covered branches: 38
Total branches: 58
Branch coverage: 65.5%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Core/Services/WorkflowStateExtractor.cs

#  Line  Line coverage
 1using Elsa.Extensions;
 2using Elsa.Workflows.Models;
 3using Elsa.Workflows.Services;
 4using Elsa.Workflows.State;
 5using Microsoft.Extensions.Logging;
 6
 7namespace Elsa.Workflows;
 8
 9/// <inheritdoc />
 55310public class WorkflowStateExtractor(ILogger<WorkflowStateExtractor> logger) : IWorkflowStateExtractor
 11{
 12    /// <inheritdoc />
 13    public WorkflowState Extract(WorkflowExecutionContext workflowExecutionContext)
 14    {
 51315        var state = new WorkflowState
 51316        {
 51317            Id = workflowExecutionContext.Id,
 51318            DefinitionId = workflowExecutionContext.Workflow.Identity.DefinitionId,
 51319            DefinitionVersionId = workflowExecutionContext.Workflow.Identity.Id,
 51320            DefinitionVersion = workflowExecutionContext.Workflow.Identity.Version,
 51321            CorrelationId = workflowExecutionContext.CorrelationId,
 51322            Name = workflowExecutionContext.Name,
 51323            ParentWorkflowInstanceId = workflowExecutionContext.ParentWorkflowInstanceId,
 51324            Status = workflowExecutionContext.Status,
 51325            SubStatus = workflowExecutionContext.SubStatus,
 51326            IsExecuting = workflowExecutionContext.IsExecuting,
 51327            Bookmarks = workflowExecutionContext.Bookmarks,
 51328            ExecutionLogSequence = workflowExecutionContext.ExecutionLogSequence,
 51329            Input = GetPersistableInput(workflowExecutionContext),
 51330            Output = workflowExecutionContext.Output,
 51331            Incidents = workflowExecutionContext.Incidents,
 51332            IsSystem = workflowExecutionContext.Workflow.IsSystem,
 51333            CreatedAt = workflowExecutionContext.CreatedAt,
 51334            UpdatedAt = workflowExecutionContext.UpdatedAt,
 51335            FinishedAt = workflowExecutionContext.FinishedAt,
 51336        };
 37
 51338        ExtractProperties(state, workflowExecutionContext);
 51339        ExtractActiveActivityExecutionContexts(state, workflowExecutionContext);
 51340        ExtractCompletionCallbacks(state, workflowExecutionContext);
 51341        ExtractScheduledActivities(state, workflowExecutionContext);
 42
 51343        return state;
 44    }
 45
 46    /// <inheritdoc />
 47    public async Task<WorkflowExecutionContext> ApplyAsync(WorkflowExecutionContext workflowExecutionContext, WorkflowSt
 48    {
 13849        workflowExecutionContext.Id = state.Id;
 13850        workflowExecutionContext.CorrelationId = state.CorrelationId;
 13851        workflowExecutionContext.Name = state.Name;
 13852        workflowExecutionContext.ParentWorkflowInstanceId = state.ParentWorkflowInstanceId;
 13853        workflowExecutionContext.SubStatus = state.SubStatus;
 13854        workflowExecutionContext.IsExecuting = state.IsExecuting;
 13855        workflowExecutionContext.Bookmarks = state.Bookmarks;
 13856        workflowExecutionContext.Output = state.Output;
 13857        workflowExecutionContext.ExecutionLogSequence = state.ExecutionLogSequence;
 13858        workflowExecutionContext.CreatedAt = state.CreatedAt;
 13859        workflowExecutionContext.UpdatedAt = state.UpdatedAt;
 13860        workflowExecutionContext.FinishedAt = state.FinishedAt;
 13861        ApplyInput(state, workflowExecutionContext);
 13862        ApplyProperties(state, workflowExecutionContext);
 13863        await ApplyActivityExecutionContextsAsync(state, workflowExecutionContext);
 13864        ApplyCompletionCallbacks(state, workflowExecutionContext);
 13865        ApplyScheduledActivities(state, workflowExecutionContext);
 13866        return workflowExecutionContext;
 13867    }
 68
 69    private void ApplyInput(WorkflowState state, WorkflowExecutionContext workflowExecutionContext)
 70    {
 71        // Only add input from state if the input doesn't already exist on the workflow execution context.
 35472        foreach (var inputItem in state.Input)
 3973            if (!workflowExecutionContext.Input.ContainsKey(inputItem.Key))
 074                workflowExecutionContext.Input.Add(inputItem.Key, inputItem.Value);
 13875    }
 76
 77    private IDictionary<string, object> GetPersistableInput(WorkflowExecutionContext workflowExecutionContext)
 78    {
 79        // TODO: This is a temporary solution. We need to find a better way to handle this.
 53080        var persistableInput = workflowExecutionContext.Workflow.Inputs.Where(x => x.StorageDriverType == typeof(Workflo
 51381        var input = workflowExecutionContext.Input;
 51382        var filteredInput = new Dictionary<string, object>();
 83
 102684        foreach (var inputDefinition in persistableInput)
 85        {
 086            if (input.TryGetValue(inputDefinition.Name, out var value))
 087                filteredInput.Add(inputDefinition.Name, value);
 88        }
 89
 51390        return filteredInput;
 91    }
 92
 93    private void ExtractProperties(WorkflowState state, WorkflowExecutionContext workflowExecutionContext)
 94    {
 51395        state.Properties = workflowExecutionContext.Properties;
 51396    }
 97
 98    private void ApplyProperties(WorkflowState state, WorkflowExecutionContext workflowExecutionContext)
 99    {
 100        // Merge properties.
 352101        foreach (var property in state.Properties)
 38102            workflowExecutionContext.Properties[property.Key] = property.Value;
 138103    }
 104
 105    private async Task ApplyActivityExecutionContextsAsync(WorkflowState state, WorkflowExecutionContext workflowExecuti
 106    {
 107        var activityExecutionContexts = (await Task.WhenAll(state.ActivityExecutionContexts.Select(async item => await C
 108
 109        var lookup = activityExecutionContexts.ToDictionary(x => x.Id);
 110
 111        // Reconstruct hierarchy.
 112        foreach (var contextState in state.ActivityExecutionContexts.Where(x => !string.IsNullOrWhiteSpace(x.ParentConte
 113        {
 105114            var parentContextId = contextState.ParentContextId;
 105115            if (parentContextId == null || !lookup.TryGetValue(parentContextId, out var parentContext))
 116            {
 0117                logger.LogWarning("Parent context with ID '{ParentContextId}' not found for context with ID '{ContextId}
 0118                continue; // Skip if parent context is not found.
 119            }
 120
 105121            var contextId = contextState.Id;
 122
 105123            if (lookup.TryGetValue(contextId, out var context))
 124            {
 105125                context.ExpressionExecutionContext.ParentContext = parentContext.ExpressionExecutionContext;
 105126                context.ParentActivityExecutionContext = parentContext;
 127            }
 128        }
 129
 130        // Assign root expression execution context.
 131        var rootActivityExecutionContexts = activityExecutionContexts.Where(x => x.ExpressionExecutionContext.ParentCont
 132
 276133        foreach (var rootActivityExecutionContext in rootActivityExecutionContexts)
 0134            rootActivityExecutionContext.ExpressionExecutionContext.ParentContext = workflowExecutionContext.ExpressionE
 135
 138136        workflowExecutionContext.ActivityExecutionContexts = activityExecutionContexts;
 138137        return;
 138
 139        async Task<ActivityExecutionContext?> CreateActivityExecutionContextAsync(ActivityExecutionContextState activity
 140        {
 149141            var activity = workflowExecutionContext.FindActivityByNodeId(activityExecutionContextState.ScheduledActivity
 142
 143            // Activity can be null in case the workflow instance was migrated to a newer version that no longer contain
 149144            if (activity == null)
 0145                return null;
 146
 149147            var properties = activityExecutionContextState.Properties;
 149148            var metadata = activityExecutionContextState.Metadata;
 149149            var activityExecutionContext = await workflowExecutionContext.CreateActivityExecutionContextAsync(activity);
 149150            activityExecutionContext.Id = activityExecutionContextState.Id;
 149151            activityExecutionContext.Properties.Merge(properties);
 149152            activityExecutionContext.Metadata.Merge(metadata);
 153
 149154            if(activityExecutionContextState.ActivityState != null)
 149155                activityExecutionContext.ActivityState.Merge(activityExecutionContextState.ActivityState);
 156
 149157            activityExecutionContext.TransitionTo(activityExecutionContextState.Status);
 149158            activityExecutionContext.IsExecuting = activityExecutionContextState.IsExecuting;
 149159            activityExecutionContext.AggregateFaultCount = activityExecutionContextState.FaultCount;
 149160            activityExecutionContext.StartedAt = activityExecutionContextState.StartedAt;
 149161            activityExecutionContext.CompletedAt = activityExecutionContextState.CompletedAt;
 149162            activityExecutionContext.Tag = activityExecutionContextState.Tag;
 149163            activityExecutionContext.DynamicVariables = activityExecutionContextState.DynamicVariables;
 164
 149165            return activityExecutionContext;
 149166        }
 138167    }
 168
 169    private static void ApplyCompletionCallbacks(WorkflowState state, WorkflowExecutionContext workflowExecutionContext)
 170    {
 486171        foreach (var completionCallbackEntry in state.CompletionCallbacks)
 172        {
 415173            var ownerActivityExecutionContext = workflowExecutionContext.ActivityExecutionContexts.FirstOrDefault(x => x
 105174            if (ownerActivityExecutionContext != null)
 175            {
 105176                var childNode = workflowExecutionContext.FindNodeById(completionCallbackEntry.ChildNodeId);
 177
 105178                if (childNode == null)
 179                    continue;
 180
 105181                var callbackName = completionCallbackEntry.MethodName;
 105182                var callbackDelegate = !string.IsNullOrEmpty(callbackName) ? ownerActivityExecutionContext.Activity.GetA
 105183                var tag = completionCallbackEntry.Tag;
 105184                workflowExecutionContext.AddCompletionCallback(ownerActivityExecutionContext, childNode, callbackDelegat
 185            }
 186        }
 138187    }
 188
 189    private void ApplyScheduledActivities(WorkflowState state, WorkflowExecutionContext workflowExecutionContext)
 190    {
 276191        foreach (var activityWorkItemState in state.ScheduledActivities)
 192        {
 0193            var activity = workflowExecutionContext.FindActivityByNodeId(activityWorkItemState.ActivityNodeId);
 194
 0195            if (activity == null)
 196                continue;
 197
 0198            var ownerContext = workflowExecutionContext.ActivityExecutionContexts.FirstOrDefault(x => x.Id == activityWo
 0199            var existingActivityExecutionContext = workflowExecutionContext.ActivityExecutionContexts.FirstOrDefault(x =
 0200            var variables = activityWorkItemState.Variables;
 0201            var input = activityWorkItemState.Input;
 0202            var tag = activityWorkItemState.Tag;
 0203            var workItem = new ActivityWorkItem(activity, ownerContext, tag, variables, existingActivityExecutionContext
 0204            workflowExecutionContext.Scheduler.Schedule(workItem);
 205        }
 138206    }
 207
 208    private static void ExtractCompletionCallbacks(WorkflowState state, WorkflowExecutionContext workflowExecutionContex
 209    {
 210        // Assert that all referenced owner contexts exist.
 513211        var activeContexts = workflowExecutionContext.GetActiveActivityExecutionContexts().ToList();
 1308212        foreach (var completionCallback in workflowExecutionContext.CompletionCallbacks)
 213        {
 408214            var ownerContext = activeContexts.FirstOrDefault(x => x == completionCallback.Owner);
 215
 141216            if (ownerContext == null)
 0217                throw new("Lost an owner context");
 218        }
 219
 654220        var completionCallbacks = workflowExecutionContext.CompletionCallbacks.Select(x => new CompletionCallbackState(x
 221
 513222        state.CompletionCallbacks = completionCallbacks.ToList();
 513223    }
 224
 225    private static void ExtractActiveActivityExecutionContexts(WorkflowState state, WorkflowExecutionContext workflowExe
 226    {
 227        ActivityExecutionContextState CreateActivityExecutionContextState(ActivityExecutionContext activityExecutionCont
 228        {
 670229            var parentId = activityExecutionContext.ParentActivityExecutionContext?.Id;
 230
 670231            if (parentId != null)
 232            {
 544233                var parentContext = activityExecutionContext.WorkflowExecutionContext.ActivityExecutionContexts.FirstOrD
 234
 157235                if (parentContext == null)
 0236                    throw new("We lost a context. This could indicate a bug in a parent activity that completed before (
 237            }
 238
 670239            var activityExecutionContextState = new ActivityExecutionContextState
 670240            {
 670241                Id = activityExecutionContext.Id,
 670242                ParentContextId = activityExecutionContext.ParentActivityExecutionContext?.Id,
 670243                ScheduledActivityNodeId = activityExecutionContext.NodeId,
 670244                OwnerActivityNodeId = activityExecutionContext.ParentActivityExecutionContext?.NodeId,
 670245                Properties = activityExecutionContext.Properties,
 670246                Metadata = activityExecutionContext.Metadata,
 670247                ActivityState = activityExecutionContext.ActivityState,
 670248                Status = activityExecutionContext.Status,
 670249                IsExecuting = activityExecutionContext.IsExecuting,
 670250                FaultCount = activityExecutionContext.AggregateFaultCount,
 670251                StartedAt = activityExecutionContext.StartedAt,
 670252                CompletedAt = activityExecutionContext.CompletedAt,
 670253                Tag = activityExecutionContext.Tag,
 670254                DynamicVariables = activityExecutionContext.DynamicVariables,
 670255            };
 670256            return activityExecutionContextState;
 257        }
 258
 259        // Only persist non-completed contexts.
 513260        state.ActivityExecutionContexts = workflowExecutionContext.GetActiveActivityExecutionContexts().Reverse().Select
 513261    }
 262
 263    private void ExtractScheduledActivities(WorkflowState state, WorkflowExecutionContext workflowExecutionContext)
 264    {
 513265        var scheduledActivities = workflowExecutionContext
 513266            .Scheduler.List()
 513267            .Select(x => new ActivityWorkItemState
 513268            {
 513269                ActivityNodeId = x.Activity.NodeId,
 513270                OwnerContextId = x.Owner?.Id,
 513271                Tag = x.Tag,
 513272                Variables = x.Variables?.ToList(),
 513273                ExistingActivityExecutionContextId = x.ExistingActivityExecutionContext?.Id,
 513274                Input = x.Input,
 513275            });
 276
 513277        state.ScheduledActivities = scheduledActivities.ToList();
 513278    }
 279}