< Summary

Information
Class: Elsa.Workflows.WorkflowStateExtractor
Assembly: Elsa.Workflows.Core
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Core/Services/WorkflowStateExtractor.cs
Line coverage
89%
Covered lines: 147
Uncovered lines: 18
Coverable lines: 165
Total lines: 281
Line coverage: 89%
Branch coverage
65%
Covered branches: 38
Total branches: 58
Branch coverage: 65.5%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Core/Services/WorkflowStateExtractor.cs

#LineLine coverage
 1using Elsa.Extensions;
 2using Elsa.Workflows.Models;
 3using Elsa.Workflows.Services;
 4using Elsa.Workflows.State;
 5using Microsoft.Extensions.Logging;
 6
 7namespace Elsa.Workflows;
 8
 9/// <inheritdoc />
 60010public class WorkflowStateExtractor(ILogger<WorkflowStateExtractor> logger) : IWorkflowStateExtractor
 11{
 12    /// <inheritdoc />
 13    public WorkflowState Extract(WorkflowExecutionContext workflowExecutionContext)
 14    {
 50615        var state = new WorkflowState
 50616        {
 50617            Id = workflowExecutionContext.Id,
 50618            DefinitionId = workflowExecutionContext.Workflow.Identity.DefinitionId,
 50619            DefinitionVersionId = workflowExecutionContext.Workflow.Identity.Id,
 50620            DefinitionVersion = workflowExecutionContext.Workflow.Identity.Version,
 50621            CorrelationId = workflowExecutionContext.CorrelationId,
 50622            Name = workflowExecutionContext.Name,
 50623            ParentWorkflowInstanceId = workflowExecutionContext.ParentWorkflowInstanceId,
 50624            Status = workflowExecutionContext.Status,
 50625            SubStatus = workflowExecutionContext.SubStatus,
 50626            IsExecuting = workflowExecutionContext.IsExecuting,
 50627            Bookmarks = workflowExecutionContext.Bookmarks,
 50628            ExecutionLogSequence = workflowExecutionContext.ExecutionLogSequence,
 50629            Input = GetPersistableInput(workflowExecutionContext),
 50630            Output = workflowExecutionContext.Output,
 50631            Incidents = workflowExecutionContext.Incidents,
 50632            IsSystem = workflowExecutionContext.Workflow.IsSystem,
 50633            CreatedAt = workflowExecutionContext.CreatedAt,
 50634            UpdatedAt = workflowExecutionContext.UpdatedAt,
 50635            FinishedAt = workflowExecutionContext.FinishedAt,
 50636        };
 37
 50638        ExtractProperties(state, workflowExecutionContext);
 50639        ExtractActiveActivityExecutionContexts(state, workflowExecutionContext);
 50640        ExtractCompletionCallbacks(state, workflowExecutionContext);
 50641        ExtractScheduledActivities(state, workflowExecutionContext);
 42
 50643        return state;
 44    }
 45
 46    /// <inheritdoc />
 47    public async Task<WorkflowExecutionContext> ApplyAsync(WorkflowExecutionContext workflowExecutionContext, WorkflowSt
 48    {
 13549        workflowExecutionContext.Id = state.Id;
 13550        workflowExecutionContext.CorrelationId = state.CorrelationId;
 13551        workflowExecutionContext.Name = state.Name;
 13552        workflowExecutionContext.ParentWorkflowInstanceId = state.ParentWorkflowInstanceId;
 13553        workflowExecutionContext.SubStatus = state.SubStatus;
 13554        workflowExecutionContext.IsExecuting = state.IsExecuting;
 13555        workflowExecutionContext.Bookmarks = state.Bookmarks;
 13556        workflowExecutionContext.Output = state.Output;
 13557        workflowExecutionContext.ExecutionLogSequence = state.ExecutionLogSequence;
 13558        workflowExecutionContext.CreatedAt = state.CreatedAt;
 13559        workflowExecutionContext.UpdatedAt = state.UpdatedAt;
 13560        workflowExecutionContext.FinishedAt = state.FinishedAt;
 13561        ApplyInput(state, workflowExecutionContext);
 13562        ApplyProperties(state, workflowExecutionContext);
 13563        await ApplyActivityExecutionContextsAsync(state, workflowExecutionContext);
 13564        ApplyCompletionCallbacks(state, workflowExecutionContext);
 13565        ApplyScheduledActivities(state, workflowExecutionContext);
 13566        return workflowExecutionContext;
 13567    }
 68
 69    private void ApplyInput(WorkflowState state, WorkflowExecutionContext workflowExecutionContext)
 70    {
 71        // Only add input from state if the input doesn't already exist on the workflow execution context.
 34872        foreach (var inputItem in state.Input)
 3973            if (!workflowExecutionContext.Input.ContainsKey(inputItem.Key))
 074                workflowExecutionContext.Input.Add(inputItem.Key, inputItem.Value);
 13575    }
 76
 77    private IDictionary<string, object> GetPersistableInput(WorkflowExecutionContext workflowExecutionContext)
 78    {
 79        // TODO: This is a temporary solution. We need to find a better way to handle this.
 52080        var persistableInput = workflowExecutionContext.Workflow.Inputs.Where(x => x.StorageDriverType == typeof(Workflo
 50681        var input = workflowExecutionContext.Input;
 50682        var filteredInput = new Dictionary<string, object>();
 83
 101284        foreach (var inputDefinition in persistableInput)
 85        {
 086            if (input.TryGetValue(inputDefinition.Name, out var value))
 087                filteredInput.Add(inputDefinition.Name, value);
 88        }
 89
 50690        return filteredInput;
 91    }
 92
 93    private void ExtractProperties(WorkflowState state, WorkflowExecutionContext workflowExecutionContext)
 94    {
 50695        state.Properties = workflowExecutionContext.Properties;
 50696    }
 97
 98    private void ApplyProperties(WorkflowState state, WorkflowExecutionContext workflowExecutionContext)
 99    {
 100        // Merge properties.
 338101        foreach (var property in state.Properties)
 34102            workflowExecutionContext.Properties[property.Key] = property.Value;
 135103    }
 104
 105    private async Task ApplyActivityExecutionContextsAsync(WorkflowState state, WorkflowExecutionContext workflowExecuti
 106    {
 107        var activityExecutionContexts = (await Task.WhenAll(state.ActivityExecutionContexts.Select(async item => await C
 108
 109        var lookup = activityExecutionContexts.ToDictionary(x => x.Id);
 110
 111        // Reconstruct hierarchy.
 112        foreach (var contextState in state.ActivityExecutionContexts.Where(x => !string.IsNullOrWhiteSpace(x.ParentConte
 113        {
 97114            var parentContextId = contextState.ParentContextId;
 97115            if (parentContextId == null || !lookup.TryGetValue(parentContextId, out var parentContext))
 116            {
 0117                logger.LogWarning("Parent context with ID '{ParentContextId}' not found for context with ID '{ContextId}
 0118                continue; // Skip if parent context is not found.
 119            }
 120
 97121            var contextId = contextState.Id;
 122
 97123            if (lookup.TryGetValue(contextId, out var context))
 124            {
 97125                context.ExpressionExecutionContext.ParentContext = parentContext.ExpressionExecutionContext;
 97126                context.ParentActivityExecutionContext = parentContext;
 127            }
 128        }
 129
 130        // Assign root expression execution context.
 131        var rootActivityExecutionContexts = activityExecutionContexts.Where(x => x.ExpressionExecutionContext.ParentCont
 132
 270133        foreach (var rootActivityExecutionContext in rootActivityExecutionContexts)
 0134            rootActivityExecutionContext.ExpressionExecutionContext.ParentContext = workflowExecutionContext.ExpressionE
 135
 135136        workflowExecutionContext.ActivityExecutionContexts = activityExecutionContexts;
 135137        return;
 138
 139        async Task<ActivityExecutionContext?> CreateActivityExecutionContextAsync(ActivityExecutionContextState activity
 140        {
 138141            var activity = workflowExecutionContext.FindActivityByNodeId(activityExecutionContextState.ScheduledActivity
 142
 143            // Activity can be null in case the workflow instance was migrated to a newer version that no longer contain
 138144            if (activity == null)
 0145                return null;
 146
 138147            var properties = activityExecutionContextState.Properties;
 138148            var metadata = activityExecutionContextState.Metadata;
 138149            var activityExecutionContext = await workflowExecutionContext.CreateActivityExecutionContextAsync(activity);
 138150            activityExecutionContext.Id = activityExecutionContextState.Id;
 138151            activityExecutionContext.CallStackDepth = activityExecutionContextState.CallStackDepth;
 138152            activityExecutionContext.Properties.Merge(properties);
 138153            activityExecutionContext.Metadata.Merge(metadata);
 154
 138155            if(activityExecutionContextState.ActivityState != null)
 138156                activityExecutionContext.ActivityState.Merge(activityExecutionContextState.ActivityState);
 157
 138158            activityExecutionContext.TransitionTo(activityExecutionContextState.Status);
 138159            activityExecutionContext.IsExecuting = activityExecutionContextState.IsExecuting;
 138160            activityExecutionContext.AggregateFaultCount = activityExecutionContextState.FaultCount;
 138161            activityExecutionContext.StartedAt = activityExecutionContextState.StartedAt;
 138162            activityExecutionContext.CompletedAt = activityExecutionContextState.CompletedAt;
 138163            activityExecutionContext.Tag = activityExecutionContextState.Tag;
 138164            activityExecutionContext.DynamicVariables = activityExecutionContextState.DynamicVariables;
 165
 138166            return activityExecutionContext;
 138167        }
 135168    }
 169
 170    private static void ApplyCompletionCallbacks(WorkflowState state, WorkflowExecutionContext workflowExecutionContext)
 171    {
 464172        foreach (var completionCallbackEntry in state.CompletionCallbacks)
 173        {
 387174            var ownerActivityExecutionContext = workflowExecutionContext.ActivityExecutionContexts.FirstOrDefault(x => x
 97175            if (ownerActivityExecutionContext != null)
 176            {
 97177                var childNode = workflowExecutionContext.FindNodeById(completionCallbackEntry.ChildNodeId);
 178
 97179                if (childNode == null)
 180                    continue;
 181
 97182                var callbackName = completionCallbackEntry.MethodName;
 97183                var callbackDelegate = !string.IsNullOrEmpty(callbackName) ? ownerActivityExecutionContext.Activity.GetA
 97184                var tag = completionCallbackEntry.Tag;
 97185                workflowExecutionContext.AddCompletionCallback(ownerActivityExecutionContext, childNode, callbackDelegat
 186            }
 187        }
 135188    }
 189
 190    private void ApplyScheduledActivities(WorkflowState state, WorkflowExecutionContext workflowExecutionContext)
 191    {
 270192        foreach (var activityWorkItemState in state.ScheduledActivities)
 193        {
 0194            var activity = workflowExecutionContext.FindActivityByNodeId(activityWorkItemState.ActivityNodeId);
 195
 0196            if (activity == null)
 197                continue;
 198
 0199            var ownerContext = workflowExecutionContext.ActivityExecutionContexts.FirstOrDefault(x => x.Id == activityWo
 0200            var existingActivityExecutionContext = workflowExecutionContext.ActivityExecutionContexts.FirstOrDefault(x =
 0201            var variables = activityWorkItemState.Variables;
 0202            var input = activityWorkItemState.Input;
 0203            var tag = activityWorkItemState.Tag;
 0204            var workItem = new ActivityWorkItem(activity, ownerContext, tag, variables, existingActivityExecutionContext
 0205            workflowExecutionContext.Scheduler.Schedule(workItem);
 206        }
 135207    }
 208
 209    private static void ExtractCompletionCallbacks(WorkflowState state, WorkflowExecutionContext workflowExecutionContex
 210    {
 211        // Assert that all referenced owner contexts exist.
 506212        var activeContexts = workflowExecutionContext.GetActiveActivityExecutionContexts().ToList();
 1282213        foreach (var completionCallback in workflowExecutionContext.CompletionCallbacks)
 214        {
 393215            var ownerContext = activeContexts.FirstOrDefault(x => x == completionCallback.Owner);
 216
 135217            if (ownerContext == null)
 0218                throw new("Lost an owner context");
 219        }
 220
 641221        var completionCallbacks = workflowExecutionContext.CompletionCallbacks.Select(x => new CompletionCallbackState(x
 222
 506223        state.CompletionCallbacks = completionCallbacks.ToList();
 506224    }
 225
 226    private static void ExtractActiveActivityExecutionContexts(WorkflowState state, WorkflowExecutionContext workflowExe
 227    {
 228        ActivityExecutionContextState CreateActivityExecutionContextState(ActivityExecutionContext activityExecutionCont
 229        {
 657230            var parentId = activityExecutionContext.ParentActivityExecutionContext?.Id;
 231
 657232            if (parentId != null)
 233            {
 529234                var parentContext = activityExecutionContext.WorkflowExecutionContext.ActivityExecutionContexts.FirstOrD
 235
 151236                if (parentContext == null)
 0237                    throw new("We lost a context. This could indicate a bug in a parent activity that completed before (
 238            }
 239
 657240            var activityExecutionContextState = new ActivityExecutionContextState
 657241            {
 657242                Id = activityExecutionContext.Id,
 657243                CallStackDepth = activityExecutionContext.CallStackDepth,
 657244                ParentContextId = activityExecutionContext.ParentActivityExecutionContext?.Id,
 657245                ScheduledActivityNodeId = activityExecutionContext.NodeId,
 657246                OwnerActivityNodeId = activityExecutionContext.ParentActivityExecutionContext?.NodeId,
 657247                Properties = activityExecutionContext.Properties,
 657248                Metadata = activityExecutionContext.Metadata,
 657249                ActivityState = activityExecutionContext.ActivityState,
 657250                Status = activityExecutionContext.Status,
 657251                IsExecuting = activityExecutionContext.IsExecuting,
 657252                FaultCount = activityExecutionContext.AggregateFaultCount,
 657253                StartedAt = activityExecutionContext.StartedAt,
 657254                CompletedAt = activityExecutionContext.CompletedAt,
 657255                Tag = activityExecutionContext.Tag,
 657256                DynamicVariables = activityExecutionContext.DynamicVariables,
 657257            };
 657258            return activityExecutionContextState;
 259        }
 260
 261        // Only persist non-completed contexts.
 506262        state.ActivityExecutionContexts = workflowExecutionContext.GetActiveActivityExecutionContexts().Reverse().Select
 506263    }
 264
 265    private void ExtractScheduledActivities(WorkflowState state, WorkflowExecutionContext workflowExecutionContext)
 266    {
 506267        var scheduledActivities = workflowExecutionContext
 506268            .Scheduler.List()
 506269            .Select(x => new ActivityWorkItemState
 506270            {
 506271                ActivityNodeId = x.Activity.NodeId,
 506272                OwnerContextId = x.Owner?.Id,
 506273                Tag = x.Tag,
 506274                Variables = x.Variables?.ToList(),
 506275                ExistingActivityExecutionContextId = x.ExistingActivityExecutionContext?.Id,
 506276                Input = x.Input,
 506277            });
 278
 506279        state.ScheduledActivities = scheduledActivities.ToList();
 506280    }
 281}