< Summary

Information
Class: Elsa.Workflows.WorkflowStateExtractor
Assembly: Elsa.Workflows.Core
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Core/Services/WorkflowStateExtractor.cs
Line coverage
88%
Covered lines: 145
Uncovered lines: 18
Coverable lines: 163
Total lines: 279
Line coverage: 88.9%
Branch coverage
65%
Covered branches: 38
Total branches: 58
Branch coverage: 65.5%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Core/Services/WorkflowStateExtractor.cs

#LineLine coverage
 1using Elsa.Extensions;
 2using Elsa.Workflows.Models;
 3using Elsa.Workflows.Services;
 4using Elsa.Workflows.State;
 5using Microsoft.Extensions.Logging;
 6
 7namespace Elsa.Workflows;
 8
 9/// <inheritdoc />
 43410public class WorkflowStateExtractor(ILogger<WorkflowStateExtractor> logger) : IWorkflowStateExtractor
 11{
 12    /// <inheritdoc />
 13    public WorkflowState Extract(WorkflowExecutionContext workflowExecutionContext)
 14    {
 44015        var state = new WorkflowState
 44016        {
 44017            Id = workflowExecutionContext.Id,
 44018            DefinitionId = workflowExecutionContext.Workflow.Identity.DefinitionId,
 44019            DefinitionVersionId = workflowExecutionContext.Workflow.Identity.Id,
 44020            DefinitionVersion = workflowExecutionContext.Workflow.Identity.Version,
 44021            CorrelationId = workflowExecutionContext.CorrelationId,
 44022            Name = workflowExecutionContext.Name,
 44023            ParentWorkflowInstanceId = workflowExecutionContext.ParentWorkflowInstanceId,
 44024            Status = workflowExecutionContext.Status,
 44025            SubStatus = workflowExecutionContext.SubStatus,
 44026            IsExecuting = workflowExecutionContext.IsExecuting,
 44027            Bookmarks = workflowExecutionContext.Bookmarks,
 44028            ExecutionLogSequence = workflowExecutionContext.ExecutionLogSequence,
 44029            Input = GetPersistableInput(workflowExecutionContext),
 44030            Output = workflowExecutionContext.Output,
 44031            Incidents = workflowExecutionContext.Incidents,
 44032            IsSystem = workflowExecutionContext.Workflow.IsSystem,
 44033            CreatedAt = workflowExecutionContext.CreatedAt,
 44034            UpdatedAt = workflowExecutionContext.UpdatedAt,
 44035            FinishedAt = workflowExecutionContext.FinishedAt,
 44036        };
 37
 44038        ExtractProperties(state, workflowExecutionContext);
 44039        ExtractActiveActivityExecutionContexts(state, workflowExecutionContext);
 44040        ExtractCompletionCallbacks(state, workflowExecutionContext);
 44041        ExtractScheduledActivities(state, workflowExecutionContext);
 42
 44043        return state;
 44    }
 45
 46    /// <inheritdoc />
 47    public async Task<WorkflowExecutionContext> ApplyAsync(WorkflowExecutionContext workflowExecutionContext, WorkflowSt
 48    {
 12649        workflowExecutionContext.Id = state.Id;
 12650        workflowExecutionContext.CorrelationId = state.CorrelationId;
 12651        workflowExecutionContext.Name = state.Name;
 12652        workflowExecutionContext.ParentWorkflowInstanceId = state.ParentWorkflowInstanceId;
 12653        workflowExecutionContext.SubStatus = state.SubStatus;
 12654        workflowExecutionContext.IsExecuting = state.IsExecuting;
 12655        workflowExecutionContext.Bookmarks = state.Bookmarks;
 12656        workflowExecutionContext.Output = state.Output;
 12657        workflowExecutionContext.ExecutionLogSequence = state.ExecutionLogSequence;
 12658        workflowExecutionContext.CreatedAt = state.CreatedAt;
 12659        workflowExecutionContext.UpdatedAt = state.UpdatedAt;
 12660        workflowExecutionContext.FinishedAt = state.FinishedAt;
 12661        ApplyInput(state, workflowExecutionContext);
 12662        ApplyProperties(state, workflowExecutionContext);
 12663        await ApplyActivityExecutionContextsAsync(state, workflowExecutionContext);
 12664        ApplyCompletionCallbacks(state, workflowExecutionContext);
 12665        ApplyScheduledActivities(state, workflowExecutionContext);
 12666        return workflowExecutionContext;
 12667    }
 68
 69    private void ApplyInput(WorkflowState state, WorkflowExecutionContext workflowExecutionContext)
 70    {
 71        // Only add input from state if the input doesn't already exist on the workflow execution context.
 33072        foreach (var inputItem in state.Input)
 3973            if (!workflowExecutionContext.Input.ContainsKey(inputItem.Key))
 074                workflowExecutionContext.Input.Add(inputItem.Key, inputItem.Value);
 12675    }
 76
 77    private IDictionary<string, object> GetPersistableInput(WorkflowExecutionContext workflowExecutionContext)
 78    {
 79        // TODO: This is a temporary solution. We need to find a better way to handle this.
 45780        var persistableInput = workflowExecutionContext.Workflow.Inputs.Where(x => x.StorageDriverType == typeof(Workflo
 44081        var input = workflowExecutionContext.Input;
 44082        var filteredInput = new Dictionary<string, object>();
 83
 88084        foreach (var inputDefinition in persistableInput)
 85        {
 086            if (input.TryGetValue(inputDefinition.Name, out var value))
 087                filteredInput.Add(inputDefinition.Name, value);
 88        }
 89
 44090        return filteredInput;
 91    }
 92
 93    private void ExtractProperties(WorkflowState state, WorkflowExecutionContext workflowExecutionContext)
 94    {
 44095        state.Properties = workflowExecutionContext.Properties;
 44096    }
 97
 98    private void ApplyProperties(WorkflowState state, WorkflowExecutionContext workflowExecutionContext)
 99    {
 100        // Merge properties.
 328101        foreach (var property in state.Properties)
 38102            workflowExecutionContext.Properties[property.Key] = property.Value;
 126103    }
 104
 105    private async Task ApplyActivityExecutionContextsAsync(WorkflowState state, WorkflowExecutionContext workflowExecuti
 106    {
 107        var activityExecutionContexts = (await Task.WhenAll(state.ActivityExecutionContexts.Select(async item => await C
 108
 109        var lookup = activityExecutionContexts.ToDictionary(x => x.Id);
 110
 111        // Reconstruct hierarchy.
 112        foreach (var contextState in state.ActivityExecutionContexts.Where(x => !string.IsNullOrWhiteSpace(x.ParentConte
 113        {
 92114            var parentContextId = contextState.ParentContextId;
 92115            if (parentContextId == null || !lookup.TryGetValue(parentContextId, out var parentContext))
 116            {
 0117                logger.LogWarning("Parent context with ID '{ParentContextId}' not found for context with ID '{ContextId}
 0118                continue; // Skip if parent context is not found.
 119            }
 120
 92121            var contextId = contextState.Id;
 122
 92123            if (lookup.TryGetValue(contextId, out var context))
 124            {
 92125                context.ExpressionExecutionContext.ParentContext = parentContext.ExpressionExecutionContext;
 92126                context.ParentActivityExecutionContext = parentContext;
 127            }
 128        }
 129
 130        // Assign root expression execution context.
 131        var rootActivityExecutionContexts = activityExecutionContexts.Where(x => x.ExpressionExecutionContext.ParentCont
 132
 252133        foreach (var rootActivityExecutionContext in rootActivityExecutionContexts)
 0134            rootActivityExecutionContext.ExpressionExecutionContext.ParentContext = workflowExecutionContext.ExpressionE
 135
 126136        workflowExecutionContext.ActivityExecutionContexts = activityExecutionContexts;
 126137        return;
 138
 139        async Task<ActivityExecutionContext?> CreateActivityExecutionContextAsync(ActivityExecutionContextState activity
 140        {
 133141            var activity = workflowExecutionContext.FindActivityByNodeId(activityExecutionContextState.ScheduledActivity
 142
 143            // Activity can be null in case the workflow instance was migrated to a newer version that no longer contain
 133144            if (activity == null)
 0145                return null;
 146
 133147            var properties = activityExecutionContextState.Properties;
 133148            var metadata = activityExecutionContextState.Metadata;
 133149            var activityExecutionContext = await workflowExecutionContext.CreateActivityExecutionContextAsync(activity);
 133150            activityExecutionContext.Id = activityExecutionContextState.Id;
 133151            activityExecutionContext.Properties.Merge(properties);
 133152            activityExecutionContext.Metadata.Merge(metadata);
 153
 133154            if(activityExecutionContextState.ActivityState != null)
 133155                activityExecutionContext.ActivityState.Merge(activityExecutionContextState.ActivityState);
 156
 133157            activityExecutionContext.TransitionTo(activityExecutionContextState.Status);
 133158            activityExecutionContext.IsExecuting = activityExecutionContextState.IsExecuting;
 133159            activityExecutionContext.AggregateFaultCount = activityExecutionContextState.FaultCount;
 133160            activityExecutionContext.StartedAt = activityExecutionContextState.StartedAt;
 133161            activityExecutionContext.CompletedAt = activityExecutionContextState.CompletedAt;
 133162            activityExecutionContext.Tag = activityExecutionContextState.Tag;
 133163            activityExecutionContext.DynamicVariables = activityExecutionContextState.DynamicVariables;
 164
 133165            return activityExecutionContext;
 133166        }
 126167    }
 168
 169    private static void ApplyCompletionCallbacks(WorkflowState state, WorkflowExecutionContext workflowExecutionContext)
 170    {
 436171        foreach (var completionCallbackEntry in state.CompletionCallbacks)
 172        {
 349173            var ownerActivityExecutionContext = workflowExecutionContext.ActivityExecutionContexts.FirstOrDefault(x => x
 92174            if (ownerActivityExecutionContext != null)
 175            {
 92176                var childNode = workflowExecutionContext.FindNodeById(completionCallbackEntry.ChildNodeId);
 177
 92178                if (childNode == null)
 179                    continue;
 180
 92181                var callbackName = completionCallbackEntry.MethodName;
 92182                var callbackDelegate = !string.IsNullOrEmpty(callbackName) ? ownerActivityExecutionContext.Activity.GetA
 92183                var tag = completionCallbackEntry.Tag;
 92184                workflowExecutionContext.AddCompletionCallback(ownerActivityExecutionContext, childNode, callbackDelegat
 185            }
 186        }
 126187    }
 188
 189    private void ApplyScheduledActivities(WorkflowState state, WorkflowExecutionContext workflowExecutionContext)
 190    {
 252191        foreach (var activityWorkItemState in state.ScheduledActivities)
 192        {
 0193            var activity = workflowExecutionContext.FindActivityByNodeId(activityWorkItemState.ActivityNodeId);
 194
 0195            if (activity == null)
 196                continue;
 197
 0198            var ownerContext = workflowExecutionContext.ActivityExecutionContexts.FirstOrDefault(x => x.Id == activityWo
 0199            var existingActivityExecutionContext = workflowExecutionContext.ActivityExecutionContexts.FirstOrDefault(x =
 0200            var variables = activityWorkItemState.Variables;
 0201            var input = activityWorkItemState.Input;
 0202            var tag = activityWorkItemState.Tag;
 0203            var workItem = new ActivityWorkItem(activity, ownerContext, tag, variables, existingActivityExecutionContext
 0204            workflowExecutionContext.Scheduler.Schedule(workItem);
 205        }
 126206    }
 207
 208    private static void ExtractCompletionCallbacks(WorkflowState state, WorkflowExecutionContext workflowExecutionContex
 209    {
 210        // Assert that all referenced owner contexts exist.
 440211        var activeContexts = workflowExecutionContext.GetActiveActivityExecutionContexts().ToList();
 1132212        foreach (var completionCallback in workflowExecutionContext.CompletionCallbacks)
 213        {
 360214            var ownerContext = activeContexts.FirstOrDefault(x => x == completionCallback.Owner);
 215
 126216            if (ownerContext == null)
 0217                throw new("Lost an owner context");
 218        }
 219
 566220        var completionCallbacks = workflowExecutionContext.CompletionCallbacks.Select(x => new CompletionCallbackState(x
 221
 440222        state.CompletionCallbacks = completionCallbacks.ToList();
 440223    }
 224
 225    private static void ExtractActiveActivityExecutionContexts(WorkflowState state, WorkflowExecutionContext workflowExe
 226    {
 227        ActivityExecutionContextState CreateActivityExecutionContextState(ActivityExecutionContext activityExecutionCont
 228        {
 574229            var parentId = activityExecutionContext.ParentActivityExecutionContext?.Id;
 230
 574231            if (parentId != null)
 232            {
 452233                var parentContext = activityExecutionContext.WorkflowExecutionContext.ActivityExecutionContexts.FirstOrD
 234
 134235                if (parentContext == null)
 0236                    throw new("We lost a context. This could indicate a bug in a parent activity that completed before (
 237            }
 238
 574239            var activityExecutionContextState = new ActivityExecutionContextState
 574240            {
 574241                Id = activityExecutionContext.Id,
 574242                ParentContextId = activityExecutionContext.ParentActivityExecutionContext?.Id,
 574243                ScheduledActivityNodeId = activityExecutionContext.NodeId,
 574244                OwnerActivityNodeId = activityExecutionContext.ParentActivityExecutionContext?.NodeId,
 574245                Properties = activityExecutionContext.Properties,
 574246                Metadata = activityExecutionContext.Metadata,
 574247                ActivityState = activityExecutionContext.ActivityState,
 574248                Status = activityExecutionContext.Status,
 574249                IsExecuting = activityExecutionContext.IsExecuting,
 574250                FaultCount = activityExecutionContext.AggregateFaultCount,
 574251                StartedAt = activityExecutionContext.StartedAt,
 574252                CompletedAt = activityExecutionContext.CompletedAt,
 574253                Tag = activityExecutionContext.Tag,
 574254                DynamicVariables = activityExecutionContext.DynamicVariables,
 574255            };
 574256            return activityExecutionContextState;
 257        }
 258
 259        // Only persist non-completed contexts.
 440260        state.ActivityExecutionContexts = workflowExecutionContext.GetActiveActivityExecutionContexts().Reverse().Select
 440261    }
 262
 263    private void ExtractScheduledActivities(WorkflowState state, WorkflowExecutionContext workflowExecutionContext)
 264    {
 440265        var scheduledActivities = workflowExecutionContext
 440266            .Scheduler.List()
 440267            .Select(x => new ActivityWorkItemState
 440268            {
 440269                ActivityNodeId = x.Activity.NodeId,
 440270                OwnerContextId = x.Owner?.Id,
 440271                Tag = x.Tag,
 440272                Variables = x.Variables?.ToList(),
 440273                ExistingActivityExecutionContextId = x.ExistingActivityExecutionContext?.Id,
 440274                Input = x.Input,
 440275            });
 276
 440277        state.ScheduledActivities = scheduledActivities.ToList();
 440278    }
 279}