< Summary

Information
Class: Elsa.Workflows.Management.Services.WorkflowReferenceUpdater
Assembly: Elsa.Workflows.Management
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Management/Services/WorkflowReferenceUpdater.cs
Line coverage
96%
Covered lines: 121
Uncovered lines: 4
Coverable lines: 125
Total lines: 229
Line coverage: 96.8%
Branch coverage
87%
Covered branches: 56
Total branches: 64
Branch coverage: 87.5%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| .ctor(...) | 100% | 1 | 1 | 100% |
| UpdateWorkflowReferencesAsync() | 96.42% | 28 | 28 | 98.11% |
| GetReferencingWorkflowDefinitionIdsAsync() | 90% | 10 | 10 | 88.88% |
| UpdateWorkflowAsync() | 70% | 10 | 10 | 94.73% |
| GetOrCreateDraftAsync() | 50% | 4 | 4 | 91.66% |
| FindActivities() | 100% | 10 | 10 | 100% |

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Management/Services/WorkflowReferenceUpdater.cs

# | Line | Line coverage
 1using System.Runtime.CompilerServices;
 2using Elsa.Common.Models;
 3using Elsa.Extensions;
 4using Elsa.Workflows.Activities;
 5using Elsa.Workflows.Management.Activities.WorkflowDefinitionActivity;
 6using Elsa.Workflows.Management.Entities;
 7using Elsa.Workflows.Management.Models;
 8using Elsa.Workflows.Models;
 9
 10namespace Elsa.Workflows.Management.Services;
 11
/// <summary>
/// Associates a workflow definition ID with the IDs of the workflow definitions that reference it.
/// </summary>
/// <param name="ReferencedDefinitionId">The definition ID that was queried for references.</param>
/// <param name="ReferencingDefinitionIds">The definition IDs the reference query returned for <paramref name="ReferencedDefinitionId"/>.</param>
internal record WorkflowReferences(
    string ReferencedDefinitionId,
    ICollection<string> ReferencingDefinitionIds);

/// <summary>
/// Carries a modified workflow definition together with the workflow graph that was materialized from it.
/// </summary>
/// <param name="Definition">The draft definition whose serialized content was rewritten.</param>
/// <param name="NewGraph">The graph materialized from <paramref name="Definition"/>.</param>
internal record UpdatedWorkflowDefinition(
    WorkflowDefinition Definition,
    WorkflowGraph NewGraph);
 15
/// <summary>
/// Updates workflows that consume another workflow definition as an activity so that their
/// <see cref="WorkflowDefinitionActivity"/> instances point at the supplied version of that definition.
/// </summary>
public class WorkflowReferenceUpdater(
    IWorkflowDefinitionPublisher publisher,
    IWorkflowDefinitionService workflowDefinitionService,
    IWorkflowDefinitionStore workflowDefinitionStore,
    IWorkflowReferenceQuery workflowReferenceQuery,
    WorkflowDefinitionActivityDescriptorFactory workflowDefinitionActivityDescriptorFactory,
    IActivityRegistry activityRegistry,
    IApiSerializer serializer)
    : IWorkflowReferenceUpdater
{
    // True while this instance is publishing or saving the updated consumers.
    // While set, UpdateWorkflowReferencesAsync returns an empty result immediately.
    private bool _isUpdating;

    /// <summary>
    /// Finds the workflows that directly or transitively reference <paramref name="referencedDefinition"/>,
    /// rewrites their outdated <see cref="WorkflowDefinitionActivity"/> instances, and then publishes
    /// (or saves as draft) every workflow that was changed.
    /// </summary>
    /// <param name="referencedDefinition">The workflow definition whose consumers should be updated.</param>
    /// <param name="cancellationToken">Token passed through to all store, service and publisher calls.</param>
    /// <returns>
    /// A result containing the updated definitions. The result is empty when an update is already in progress
    /// on this instance, or when the definition's options do not have both <c>UsableAsActivity</c> and
    /// <c>AutoUpdateConsumingWorkflows</c> set to <c>true</c>.
    /// </returns>
    public async Task<UpdateWorkflowReferencesResult> UpdateWorkflowReferencesAsync(
        WorkflowDefinition referencedDefinition,
        CancellationToken cancellationToken = default)
    {
        if (_isUpdating ||
            referencedDefinition.Options is not { UsableAsActivity: true, AutoUpdateConsumingWorkflows: true })
            return new([]);

        // Materialize the reference stream up front; it is queried several times below.
        var allWorkflowReferences = new List<WorkflowReferences>();
        await foreach (var reference in GetReferencingWorkflowDefinitionIdsAsync(referencedDefinition.DefinitionId, cancellationToken))
            allWorkflowReferences.Add(reference);

        var filteredWorkflowReferences = allWorkflowReferences
            .Where(r => r.ReferencingDefinitionIds.Any())
            .DistinctBy(r => r.ReferencedDefinitionId)
            .ToList();

        var referencingIds = filteredWorkflowReferences.SelectMany(r => r.ReferencingDefinitionIds).Distinct().ToList();
        var referencedIds = filteredWorkflowReferences.Select(r => r.ReferencedDefinitionId).Distinct().ToList();

        // Latest graphs of every consumer, keyed by definition ID.
        var referencingWorkflowGraphs = (await workflowDefinitionService.FindWorkflowGraphsAsync(new()
            {
                DefinitionIds = referencingIds,
                VersionOptions = VersionOptions.Latest,
                IsReadonly = false
            }, cancellationToken))
            .ToDictionary(g => g.Workflow.Identity.DefinitionId);

        var referencedWorkflowDefinitionList = (await workflowDefinitionStore.FindManyAsync(new()
        {
            DefinitionIds = referencedIds,
            VersionOptions = VersionOptions.Published,
            IsReadonly = false
        }, cancellationToken)).ToList();

        // One definition per definition ID, preferring the one flagged as published.
        var referencedWorkflowDefinitionsPublished = referencedWorkflowDefinitionList
            .GroupBy(x => x.DefinitionId)
            .Select(group =>
            {
                var publishedVersion = group.FirstOrDefault(x => x.IsPublished);
                return publishedVersion ?? group.First();
            })
            .ToDictionary(d => d.DefinitionId);

        // Remember whether each consumer was published before any change is made, so that the same
        // state can be restored when the updated definitions are persisted.
        var initialPublicationState = new Dictionary<string, bool>();

        foreach (var workflowGraph in referencingWorkflowGraphs)
            initialPublicationState[workflowGraph.Key] = workflowGraph.Value.Workflow.Publication.IsPublished;

        // Add the initially referenced definition
        referencedWorkflowDefinitionsPublished[referencedDefinition.DefinitionId] = referencedDefinition;

        // Build dependency map for topological sorting
        var dependencyMap = filteredWorkflowReferences
            .SelectMany(r => r.ReferencingDefinitionIds.Select(id => (id, r.ReferencedDefinitionId)))
            .ToLookup(x => x.id, x => x.ReferencedDefinitionId);

        // Perform topological sort to ensure dependent workflows are processed in the right order
        var sortedWorkflowIds = referencingIds
            .TSort(id => dependencyMap[id], true)
            // Only process workflows that exist in our referencing workflows dictionary
            .Where(id => referencingWorkflowGraphs.ContainsKey(id))
            .ToList();

        var updatedWorkflows = new Dictionary<string, UpdatedWorkflowDefinition>();

        // Create a cache for drafts that we've already created during this operation
        var draftCache = new Dictionary<string, WorkflowDefinition>();

        foreach (var id in sortedWorkflowIds)
        {
            if (!referencingWorkflowGraphs.TryGetValue(id, out var graph) || !dependencyMap[id].Any())
                continue;

            foreach (var refId in dependencyMap[id])
            {
                var target = referencedWorkflowDefinitionsPublished.GetValueOrDefault(refId);
                if (target == null) continue;

                var updated = await UpdateWorkflowAsync(graph, target, draftCache, initialPublicationState, cancellationToken);
                if (updated == null) continue;

                // Record the updated draft so that workflows consuming this one are pointed at it.
                graph = updated.NewGraph;
                updatedWorkflows[updated.Definition.DefinitionId] = updated;
                referencedWorkflowDefinitionsPublished[id] = updated.Definition;
                draftCache[id] = updated.Definition;
                referencingWorkflowGraphs[id] = updated.NewGraph;
            }
        }

        _isUpdating = true;
        try
        {
            foreach (var updatedWorkflow in updatedWorkflows.Values)
            {
                var requiresPublication = initialPublicationState.GetValueOrDefault(updatedWorkflow.Definition.DefinitionId);
                if (requiresPublication)
                    await publisher.PublishAsync(updatedWorkflow.Definition, cancellationToken);
                else
                    await publisher.SaveDraftAsync(updatedWorkflow.Definition, cancellationToken);
            }
        }
        finally
        {
            // Always clear the flag: if publishing throws or is cancelled, leaving it set would make
            // every later call on this instance return an empty result.
            _isUpdating = false;
        }

        return new(updatedWorkflows.Select(u => u.Value.Definition));
    }

    /// <summary>
    /// Streams the references of <paramref name="definitionId"/> and, recursively, the references of each
    /// workflow that references it.
    /// </summary>
    /// <param name="definitionId">The definition ID to query references for.</param>
    /// <param name="cancellationToken">Token passed to the reference query and to the recursive calls.</param>
    /// <param name="visitedIds">Definition IDs already processed in this traversal; created on the first call.</param>
    /// <returns>One <see cref="WorkflowReferences"/> item per definition ID visited.</returns>
    private async IAsyncEnumerable<WorkflowReferences> GetReferencingWorkflowDefinitionIdsAsync(
        string definitionId,
        [EnumeratorCancellation] CancellationToken cancellationToken,
        HashSet<string>? visitedIds = null)
    {
        visitedIds ??= new();

        // If we've already processed this definition ID, skip it to prevent infinite recursion.
        if (!visitedIds.Add(definitionId))
            yield break;

        var refs = (await workflowReferenceQuery.ExecuteAsync(definitionId, cancellationToken)).ToList();
        yield return new(definitionId, refs);

        foreach (var id in refs)
        {
            await foreach (var child in GetReferencingWorkflowDefinitionIdsAsync(id, cancellationToken, visitedIds))
                yield return child;
        }
    }

    /// <summary>
    /// Points the outdated <see cref="WorkflowDefinitionActivity"/> instances in the draft of the given workflow
    /// at <paramref name="target"/> and re-serializes the draft.
    /// </summary>
    /// <param name="graph">The graph of the consuming workflow; only its definition ID is read.</param>
    /// <param name="target">The referenced definition the activities should point at.</param>
    /// <param name="draftCache">Drafts already obtained during the current operation, keyed by definition ID.</param>
    /// <param name="initialPublicationState">Publication state of each consumer before the operation started.</param>
    /// <param name="cancellationToken">Token passed to the publisher, store and definition service.</param>
    /// <returns>
    /// The modified draft and its graph, or <c>null</c> when the target will not be published, no draft could be
    /// obtained, or no activity referencing the target is outdated.
    /// </returns>
    private async Task<UpdatedWorkflowDefinition?> UpdateWorkflowAsync(
        WorkflowGraph graph,
        WorkflowDefinition target,
        Dictionary<string, WorkflowDefinition> draftCache,
        Dictionary<string, bool> initialPublicationState,
        CancellationToken cancellationToken)
    {
        // If the target is itself a tracked consumer, use its publication state from before the operation;
        // otherwise use the target's own IsPublished flag.
        var willTargetBePublished = initialPublicationState.GetValueOrDefault(target.DefinitionId, target.IsPublished);
        if (!willTargetBePublished)
            return null;

        var id = graph.Workflow.Identity.DefinitionId;
        var draft = await GetOrCreateDraftAsync(id, draftCache, cancellationToken);
        if (draft == null) return null;

        var newGraph = await workflowDefinitionService.MaterializeWorkflowAsync(draft, cancellationToken);
        var outdated = FindActivities(newGraph.Root, target.DefinitionId)
            .Where(a => a.WorkflowDefinitionVersionId != target.Id)
            .ToList();

        if (!outdated.Any()) return null;

        foreach (var act in outdated)
        {
            act.WorkflowDefinitionVersionId = target.Id;
            act.Version = target.Version;
            act.LatestAvailablePublishedVersionId = target.Id;
            act.LatestAvailablePublishedVersion = target.Version;
        }

        // Persist the modified activities back into the draft's serialized form.
        if (newGraph.Root.Activity is Workflow wf)
            draft.StringData = serializer.Serialize(wf.Root);

        return new(draft, newGraph);
    }

    /// <summary>
    /// Returns the cached draft for <paramref name="definitionId"/>, or obtains one from the publisher, caches it,
    /// and registers an activity descriptor for it in the activity registry.
    /// </summary>
    /// <param name="definitionId">The definition ID of the workflow to get a draft for.</param>
    /// <param name="draftCache">Drafts already obtained during the current operation, keyed by definition ID.</param>
    /// <param name="cancellationToken">Token passed to the publisher and the store.</param>
    /// <returns>The draft, or <c>null</c> when the publisher returns none.</returns>
    private async Task<WorkflowDefinition?> GetOrCreateDraftAsync(
        string definitionId,
        Dictionary<string, WorkflowDefinition> draftCache,
        CancellationToken cancellationToken)
    {
        // Check if we already have a draft for this workflow
        if (draftCache.TryGetValue(definitionId, out var cachedDraft))
            return cachedDraft;

        // Create or get a draft for this workflow
        var draft = await publisher.GetDraftAsync(definitionId, VersionOptions.Latest, cancellationToken);
        if (draft == null) return null;

        // Store the draft in the cache for potential future use
        draftCache[definitionId] = draft;

        // Get the current published version of the workflow definition.
        var publishedVersion = await workflowDefinitionStore.FindAsync(
            WorkflowDefinitionHandle.ByDefinitionId(definitionId, VersionOptions.Published).ToFilter(),
            cancellationToken);

        // Update the activity registry to be able to materialize the workflow.
        var activityDescriptor = workflowDefinitionActivityDescriptorFactory.CreateDescriptor(draft, publishedVersion);
        activityRegistry.Add(typeof(WorkflowDefinitionActivityProvider), activityDescriptor);

        return draft;
    }

    /// <summary>
    /// Walks the activity tree below <paramref name="node"/> and yields every
    /// <see cref="WorkflowDefinitionActivity"/> whose <c>WorkflowDefinitionId</c> equals
    /// <paramref name="definitionId"/>.
    /// </summary>
    /// <param name="node">The node whose descendants are searched; the node itself is never yielded.</param>
    /// <param name="definitionId">The workflow definition ID to match.</param>
    /// <returns>The matching activities, in depth-first order.</returns>
    private static IEnumerable<WorkflowDefinitionActivity> FindActivities(ActivityNode node, string definitionId)
    {
        // Do not drill into activities that are WorkflowDefinitionActivity
        if (node.Activity is WorkflowDefinitionActivity)
            yield break;

        foreach (var child in node.Children)
        {
            if (child.Activity is WorkflowDefinitionActivity activity && activity.WorkflowDefinitionId == definitionId)
                yield return activity;

            // For a WorkflowDefinitionActivity child this recursion ends immediately via the guard above.
            foreach (var grandChildActivity in FindActivities(child, definitionId))
                yield return grandChildActivity;
        }
    }
}