< Summary

Information
Class: Elsa.Workflows.Management.Services.UpdatedWorkflowDefinition
Assembly: Elsa.Workflows.Management
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Management/Services/WorkflowReferenceUpdater.cs
Line coverage
100%
Covered lines: 1
Uncovered lines: 0
Coverable lines: 1
Total lines: 204
Line coverage: 100%
Branch coverage
N/A
Covered branches: 0
Total branches: 0
Branch coverage: N/A
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
get_Definition() | 100% | 1 | 1 | 100%

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Management/Services/WorkflowReferenceUpdater.cs

# | Line | Line coverage
 1using Elsa.Common.Models;
 2using Elsa.Extensions;
 3using Elsa.Workflows.Activities;
 4using Elsa.Workflows.Management.Activities.WorkflowDefinitionActivity;
 5using Elsa.Workflows.Management.Entities;
 6using Elsa.Workflows.Management.Models;
 7using Elsa.Workflows.Models;
 8
 9namespace Elsa.Workflows.Management.Services;
 10
/// <summary>
/// Result of rewriting a consuming workflow: the modified definition together with the workflow graph
/// that was materialized from it.
/// </summary>
/// <param name="Definition">The workflow definition whose serialized content was updated.</param>
/// <param name="NewGraph">The workflow graph materialized from <paramref name="Definition"/>.</param>
internal record UpdatedWorkflowDefinition(
    WorkflowDefinition Definition,
    WorkflowGraph NewGraph);
 12
 13public class WorkflowReferenceUpdater(
 14    IWorkflowDefinitionPublisher publisher,
 15    IWorkflowDefinitionService workflowDefinitionService,
 16    IWorkflowDefinitionStore workflowDefinitionStore,
 17    IWorkflowReferenceGraphBuilder workflowReferenceGraphBuilder,
 18    WorkflowDefinitionActivityDescriptorFactory workflowDefinitionActivityDescriptorFactory,
 19    IActivityRegistry activityRegistry,
 20    IApiSerializer serializer)
 21    : IWorkflowReferenceUpdater
 22{
 23    private bool _isUpdating;
 24
 25    public async Task<UpdateWorkflowReferencesResult> UpdateWorkflowReferencesAsync(
 26        WorkflowDefinition referencedDefinition,
 27        CancellationToken cancellationToken = default)
 28    {
 29        if (_isUpdating ||
 30            referencedDefinition.Options is not { UsableAsActivity: true, AutoUpdateConsumingWorkflows: true })
 31            return new([]);
 32
 33        var referenceGraph = await workflowReferenceGraphBuilder.BuildGraphAsync(referencedDefinition.DefinitionId, canc
 34
 35        // Get all consumer (source) and referenced (target) IDs from the edges
 36        var referencingIds = referenceGraph.Edges.Select(e => e.Source).Distinct().ToList();
 37        var referencedIds = referenceGraph.Edges.Select(e => e.Target).Distinct().ToList();
 38
 39        if (referencingIds.Count == 0)
 40            return new([]);
 41
 42        var referencingWorkflowGraphs = (await workflowDefinitionService.FindWorkflowGraphsAsync(new()
 43            {
 44                DefinitionIds = referencingIds,
 45                VersionOptions = VersionOptions.Latest,
 46                IsReadonly = false
 47            }, cancellationToken))
 48            .ToDictionary(g => g.Workflow.Identity.DefinitionId);
 49
 50        var referencedWorkflowDefinitionList = (await workflowDefinitionStore.FindManyAsync(new()
 51        {
 52            DefinitionIds = referencedIds,
 53            VersionOptions = VersionOptions.Published,
 54            IsReadonly = false
 55        }, cancellationToken)).ToList();
 56
 57        var referencedWorkflowDefinitionsPublished = referencedWorkflowDefinitionList
 58            .GroupBy(x => x.DefinitionId)
 59            .Select(group =>
 60            {
 61                var publishedVersion = group.FirstOrDefault(x => x.IsPublished);
 62                return publishedVersion ?? group.First();
 63            })
 64            .ToDictionary(d => d.DefinitionId);
 65
 66        var initialPublicationState = new Dictionary<string, bool>();
 67
 68        foreach (var workflowGraph in referencingWorkflowGraphs)
 69            initialPublicationState[workflowGraph.Key] = workflowGraph.Value.Workflow.Publication.IsPublished;
 70
 71        // Add the initially referenced definition
 72        referencedWorkflowDefinitionsPublished[referencedDefinition.DefinitionId] = referencedDefinition;
 73
 74        // Use the OutboundEdges lookup from the graph (Source → what it depends on)
 75        var dependencyMap = referenceGraph.OutboundEdges;
 76
 77        // Perform topological sort to ensure dependent workflows are processed in the right order
 78        var sortedWorkflowIds = referencingIds
 79            .TSort(id => dependencyMap[id], true)
 80            // Only process workflows that exist in our referencing workflows dictionary
 81            .Where(id => referencingWorkflowGraphs.ContainsKey(id))
 82            .ToList();
 83
 84        var updatedWorkflows = new Dictionary<string, UpdatedWorkflowDefinition>();
 85
 86        // Create a cache for drafts that we've already created during this operation
 87        var draftCache = new Dictionary<string, WorkflowDefinition>();
 88
 89        foreach (var id in sortedWorkflowIds)
 90        {
 91            if (!referencingWorkflowGraphs.TryGetValue(id, out var graph) || !dependencyMap[id].Any())
 92                continue;
 93
 94            foreach (var refId in dependencyMap[id])
 95            {
 96                var target = referencedWorkflowDefinitionsPublished.GetValueOrDefault(refId);
 97                if (target == null) continue;
 98
 99                var updated = await UpdateWorkflowAsync(graph, target, draftCache, initialPublicationState, cancellation
 100                if (updated == null) continue;
 101
 102                graph = updated.NewGraph;
 103                updatedWorkflows[updated.Definition.DefinitionId] = updated;
 104                referencedWorkflowDefinitionsPublished[id] = updated.Definition;
 105                draftCache[id] = updated.Definition;
 106                referencingWorkflowGraphs[id] = updated.NewGraph;
 107            }
 108        }
 109
 110        _isUpdating = true;
 111        foreach (var updatedWorkflow in updatedWorkflows.Values)
 112        {
 113            var requiresPublication = initialPublicationState.GetValueOrDefault(updatedWorkflow.Definition.DefinitionId)
 114            if (requiresPublication)
 115                await publisher.PublishAsync(updatedWorkflow.Definition, cancellationToken);
 116            else
 117                await publisher.SaveDraftAsync(updatedWorkflow.Definition, cancellationToken);
 118        }
 119
 120        _isUpdating = false;
 121
 122        return new(updatedWorkflows.Select(u => u.Value.Definition));
 123    }
 124
 125
 126    private async Task<UpdatedWorkflowDefinition?> UpdateWorkflowAsync(
 127        WorkflowGraph graph,
 128        WorkflowDefinition target,
 129        Dictionary<string, WorkflowDefinition> draftCache,
 130        Dictionary<string, bool> initialPublicationState,
 131        CancellationToken cancellationToken)
 132    {
 133        var willTargetBePublished = initialPublicationState.GetValueOrDefault(target.DefinitionId, target.IsPublished);
 134        if (!willTargetBePublished)
 135            return null;
 136
 137        var id = graph.Workflow.Identity.DefinitionId;
 138        var draft = await GetOrCreateDraftAsync(id, draftCache, cancellationToken);
 139        if (draft == null) return null;
 140
 141        var newGraph = await workflowDefinitionService.MaterializeWorkflowAsync(draft, cancellationToken);
 142        var outdated = FindActivities(newGraph.Root, target.DefinitionId)
 143            .Where(a => a.WorkflowDefinitionVersionId != target.Id)
 144            .ToList();
 145
 146        if (!outdated.Any()) return null;
 147
 148        foreach (var act in outdated)
 149        {
 150            act.WorkflowDefinitionVersionId = target.Id;
 151            act.Version = target.Version;
 152            act.LatestAvailablePublishedVersionId = target.Id;
 153            act.LatestAvailablePublishedVersion = target.Version;
 154        }
 155
 156        if (newGraph.Root.Activity is Workflow wf)
 157            draft.StringData = serializer.Serialize(wf.Root);
 158
 159        return new(draft, newGraph);
 160    }
 161
 162    private async Task<WorkflowDefinition?> GetOrCreateDraftAsync(
 163        string definitionId,
 164        Dictionary<string, WorkflowDefinition> draftCache,
 165        CancellationToken cancellationToken)
 166    {
 167        // Check if we already have a draft for this workflow
 168        if (draftCache.TryGetValue(definitionId, out var cachedDraft))
 169            return cachedDraft;
 170
 171        // Create or get a draft for this workflow
 172        var draft = await publisher.GetDraftAsync(definitionId, VersionOptions.Latest, cancellationToken);
 173        if (draft == null) return null;
 174
 175        // Store the draft in the cache for potential future use
 176        draftCache[definitionId] = draft;
 177
 178        // Get the current published version of the workflow definition.
 179        var publishedVersion = await workflowDefinitionStore.FindAsync(
 180            WorkflowDefinitionHandle.ByDefinitionId(definitionId, VersionOptions.Published).ToFilter(),
 181            cancellationToken);
 182
 183        // Update the activity registry to be able to materialize the workflow.
 184        var activityDescriptor = workflowDefinitionActivityDescriptorFactory.CreateDescriptor(draft, publishedVersion);
 185        activityRegistry.Add(typeof(WorkflowDefinitionActivityProvider), activityDescriptor);
 186
 187        return draft;
 188    }
 189
 190    private static IEnumerable<WorkflowDefinitionActivity> FindActivities(ActivityNode node, string definitionId)
 191    {
 192        // Do not drill into activities that are WorkflowDefinitionActivity
 193        if (node.Activity is WorkflowDefinitionActivity)
 194            yield break;
 195
 196        foreach (var child in node.Children)
 197        {
 198            if (child.Activity is WorkflowDefinitionActivity activity && activity.WorkflowDefinitionId == definitionId)
 199                yield return activity;
 200            foreach (var grandChildActivity in FindActivities(child, definitionId))
 201                yield return grandChildActivity;
 202        }
 203    }
 204}

Methods/Properties

get_Definition()