< Summary

Information
Class: Elsa.Workflows.Runtime.DefaultWorkflowDefinitionStorePopulator
Assembly: Elsa.Workflows.Runtime
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/DefaultWorkflowDefinitionStorePopulator.cs
Line coverage
97%
Covered lines: 140
Uncovered lines: 3
Coverable lines: 143
Total lines: 291
Line coverage: 97.9%
Branch coverage
94%
Covered branches: 34
Total branches: 36
Branch coverage: 94.4%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
PopulateStoreAsync(...)100%210%
PopulateStoreAsync()100%44100%
AddAsync(...)100%11100%
AddAsync()100%44100%
AssignIdentities()100%11100%
AddOrUpdateAsync()100%11100%
AddOrUpdateCoreAsync()81.81%222295.08%
UpdateIsLatest()100%44100%
UpdateIsPublished()100%66100%
SyncExistingCopies(...)100%11100%

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/DefaultWorkflowDefinitionStorePopulator.cs

#LineLine coverage
 1using Elsa.Common;
 2using Elsa.Common.Models;
 3using Elsa.Workflows.Activities;
 4using Elsa.Workflows.Management;
 5using Elsa.Workflows.Management.Entities;
 6using Elsa.Workflows.Management.Filters;
 7using Microsoft.Extensions.Logging;
 8using Open.Linq.AsyncExtensions;
 9
 10namespace Elsa.Workflows.Runtime;
 11
 12/// <inheritdoc />
 13public class DefaultWorkflowDefinitionStorePopulator : IWorkflowDefinitionStorePopulator
 14{
 15    private readonly Func<IEnumerable<IWorkflowsProvider>> _workflowDefinitionProviders;
 16    private readonly ITriggerIndexer _triggerIndexer;
 17    private readonly IWorkflowDefinitionStore _workflowDefinitionStore;
 18    private readonly IActivitySerializer _activitySerializer;
 19    private readonly IPayloadSerializer _payloadSerializer;
 20    private readonly ISystemClock _systemClock;
 21    private readonly IIdentityGraphService _identityGraphService;
 22    private readonly ILogger<DefaultWorkflowDefinitionStorePopulator> _logger;
 15223    private readonly SemaphoreSlim _semaphore = new(1, 1);
 24
 25    /// <summary>
 26    /// Initializes a new instance of the <see cref="DefaultWorkflowDefinitionStorePopulator"/> class.
 27    /// </summary>
 15228    public DefaultWorkflowDefinitionStorePopulator(
 15229        Func<IEnumerable<IWorkflowsProvider>> workflowDefinitionProviders,
 15230        ITriggerIndexer triggerIndexer,
 15231        IWorkflowDefinitionStore workflowDefinitionStore,
 15232        IActivitySerializer activitySerializer,
 15233        IPayloadSerializer payloadSerializer,
 15234        ISystemClock systemClock,
 15235        IIdentityGraphService identityGraphService,
 15236        ILogger<DefaultWorkflowDefinitionStorePopulator> logger)
 37    {
 15238        _workflowDefinitionProviders = workflowDefinitionProviders;
 15239        _triggerIndexer = triggerIndexer;
 15240        _workflowDefinitionStore = workflowDefinitionStore;
 15241        _activitySerializer = activitySerializer;
 15242        _payloadSerializer = payloadSerializer;
 15243        _systemClock = systemClock;
 15244        _identityGraphService = identityGraphService;
 15245        _logger = logger;
 15246    }
 47
 48    /// <inheritdoc />
 49    public Task<IEnumerable<WorkflowDefinition>> PopulateStoreAsync(CancellationToken cancellationToken = default)
 50    {
 051        return PopulateStoreAsync(true, cancellationToken);
 52    }
 53
 54    /// <inheritdoc />
 55    public async Task<IEnumerable<WorkflowDefinition>> PopulateStoreAsync(bool indexTriggers, CancellationToken cancella
 56    {
 28457        var providers = _workflowDefinitionProviders();
 28458        var workflowDefinitions = new List<WorkflowDefinition>();
 59
 122860        foreach (var provider in providers)
 61        {
 33062            var results = await provider.GetWorkflowsAsync(cancellationToken).AsTask().ToList();
 63
 221664            foreach (var result in results)
 65            {
 77866                var workflowDefinition = await AddAsync(result, indexTriggers, cancellationToken);
 77867                workflowDefinitions.Add(workflowDefinition);
 68            }
 69        }
 70
 28471        return workflowDefinitions;
 28472    }
 73
 74    /// <inheritdoc />
 75    public Task<WorkflowDefinition> AddAsync(MaterializedWorkflow materializedWorkflow, CancellationToken cancellationTo
 76    {
 1377        return AddAsync(materializedWorkflow, true, cancellationToken);
 78    }
 79
 80    /// <inheritdoc />
 81    public async Task<WorkflowDefinition> AddAsync(MaterializedWorkflow materializedWorkflow, bool indexTriggers, Cancel
 82    {
 79183        await AssignIdentities(materializedWorkflow.Workflow, cancellationToken);
 79184        var workflowDefinition = await AddOrUpdateAsync(materializedWorkflow, cancellationToken);
 85
 79186        if (indexTriggers && workflowDefinition.IsPublished)
 59287            await _triggerIndexer.IndexTriggersAsync(workflowDefinition, cancellationToken);
 88
 79189        return workflowDefinition;
 79190    }
 91
 92    private async Task AssignIdentities(Workflow workflow, CancellationToken cancellationToken)
 93    {
 79194        await _identityGraphService.AssignIdentitiesAsync(workflow, cancellationToken);
 79195    }
 96
 97    private async Task<WorkflowDefinition> AddOrUpdateAsync(MaterializedWorkflow materializedWorkflow, CancellationToken
 98    {
 79199        await _semaphore.WaitAsync(cancellationToken);
 100
 101        try
 102        {
 791103            return await AddOrUpdateCoreAsync(materializedWorkflow, cancellationToken);
 104        }
 105        finally
 106        {
 791107            _semaphore.Release();
 108        }
 791109    }
 110
 111    private async Task<WorkflowDefinition> AddOrUpdateCoreAsync(MaterializedWorkflow materializedWorkflow, CancellationT
 112    {
 791113        var workflow = materializedWorkflow.Workflow;
 791114        var definitionId = workflow.Identity.DefinitionId;
 115
 791116        var existingWorkflowLatest = false;
 791117        var existingWorkflowPublished = false;
 118
 119        // Serialize materializer context.
 791120        var materializerContext = materializedWorkflow.MaterializerContext;
 791121        var materializerContextJson = materializerContext != null ? _payloadSerializer.Serialize(materializerContext) : 
 122
 123        // Determine StringData and OriginalSource based on what's provided
 124        string? stringData;
 791125        var originalSource = materializedWorkflow.OriginalSource;
 126
 791127        if (originalSource != null)
 128        {
 129            // NEW WAY: OriginalSource is provided
 130            // For JSON workflows, we still need to populate StringData with the serialized root for backwards compatibi
 352131            stringData = materializedWorkflow.MaterializerName == "Json" ? _activitySerializer.Serialize(workflow.Root) 
 352132                // For new formats (ElsaScript, YAML, etc.), only OriginalSource is needed
 352133                // StringData can be null as these materializers only use OriginalSource
 352134                null;
 135        }
 136        else
 137        {
 138            // OLD WAY: No OriginalSource provided (backwards compatibility for existing workflows)
 139            // Serialize the workflow root as before
 439140            stringData = _activitySerializer.Serialize(workflow.Root);
 141        }
 142
 143        // Check if there's already a workflow definition stored with this definition ID and version.
 791144        var specificVersionFilter = new WorkflowDefinitionFilter
 791145        {
 791146            DefinitionId = definitionId,
 791147            VersionOptions = VersionOptions.SpecificVersion(workflow.Identity.Version)
 791148        };
 149
 791150        var existingDefinitionVersion = await _workflowDefinitionStore.FindAsync(specificVersionFilter, cancellationToke
 151
 152        // Set up a list to collect all workflow definitions to be persisted.
 791153        var workflowDefinitionsToSave = new HashSet<WorkflowDefinition>();
 154
 791155        if (existingDefinitionVersion != null)
 156        {
 600157            workflowDefinitionsToSave.Add(existingDefinitionVersion);
 158
 600159            if (existingDefinitionVersion.Id != workflow.Identity.Id)
 160            {
 161                // It's possible that the imported workflow definition has a different ID than the existing one in the s
 162                // In a future update, we might store this discrepancy in a "troubleshooting" table and provide tooling 
 163                // See https://github.com/elsa-workflows/elsa-core/issues/5540
 2164                _logger.LogWarning("Workflow with ID {WorkflowId} already exists with a different ID {ExistingWorkflowId
 165            }
 166        }
 167
 791168        await UpdateIsLatest();
 791169        await UpdateIsPublished();
 170
 791171        var workflowDefinition = existingDefinitionVersion ?? new WorkflowDefinition
 791172        {
 791173            DefinitionId = workflow.Identity.DefinitionId,
 791174            Id = workflow.Identity.Id,
 791175            Version = workflow.Identity.Version,
 791176            TenantId = workflow.Identity.TenantId,
 791177        };
 178
 791179        workflowDefinition.Description = workflow.WorkflowMetadata.Description;
 791180        workflowDefinition.Name = workflow.WorkflowMetadata.Name;
 791181        workflowDefinition.ToolVersion = workflow.WorkflowMetadata.ToolVersion;
 791182        workflowDefinition.IsLatest = !existingWorkflowLatest;
 791183        workflowDefinition.IsPublished = !existingWorkflowPublished && workflow.Publication.IsPublished;
 791184        workflowDefinition.IsReadonly = workflow.IsReadonly;
 791185        workflowDefinition.IsSystem = workflow.IsSystem;
 791186        workflowDefinition.CustomProperties = workflow.CustomProperties;
 791187        workflowDefinition.Variables = workflow.Variables;
 791188        workflowDefinition.Inputs = workflow.Inputs;
 791189        workflowDefinition.Outputs = workflow.Outputs;
 791190        workflowDefinition.Outcomes = workflow.Outcomes;
 791191        workflowDefinition.StringData = stringData;
 791192        workflowDefinition.OriginalSource = originalSource;
 791193        workflowDefinition.CreatedAt = workflow.WorkflowMetadata.CreatedAt == default ? _systemClock.UtcNow : workflow.W
 791194        workflowDefinition.Options = workflow.Options;
 791195        workflowDefinition.ProviderName = materializedWorkflow.ProviderName;
 791196        workflowDefinition.MaterializerContext = materializerContextJson;
 791197        workflowDefinition.MaterializerName = materializedWorkflow.MaterializerName;
 198
 800199        if (existingDefinitionVersion is null && workflowDefinitionsToSave.Any(w => w.Id == workflowDefinition.Id))
 200        {
 2201            _logger.LogInformation("Workflow with ID {WorkflowId} already exists", workflowDefinition.Id);
 2202            return workflowDefinition;
 203        }
 204
 789205        workflowDefinitionsToSave.Add(workflowDefinition);
 206
 1585207        var duplicates = workflowDefinitionsToSave.GroupBy(wd => wd.Id)
 796208            .Where(g => g.Count() > 1)
 0209            .Select(g => g.Key)
 789210            .ToList();
 211
 789212        if (duplicates.Any())
 213        {
 0214            throw new Exception($"Unable to update WorkflowDefinition with ids {string.Join(',', duplicates)} multiple t
 215        }
 216
 789217        await _workflowDefinitionStore.SaveManyAsync(workflowDefinitionsToSave, cancellationToken);
 789218        return workflowDefinition;
 219
 220        async Task UpdateIsLatest()
 221        {
 222            // Always try to update the IsLatest property based on the VersionNumber
 223
 224            // Reset current latest definitions.
 791225            var filter = new WorkflowDefinitionFilter
 791226            {
 791227                DefinitionId = definitionId,
 791228                VersionOptions = VersionOptions.Latest
 791229            };
 791230            var latestWorkflowDefinitions = (await _workflowDefinitionStore.FindManyAsync(filter, cancellationToken)).To
 231
 232            // If the latest definitions contains definitions with the same ID then we need to replace them with the lat
 791233            SyncExistingCopies(latestWorkflowDefinitions, workflowDefinitionsToSave);
 234
 2832235            foreach (var latestWorkflowDefinition in latestWorkflowDefinitions)
 236            {
 625237                if (latestWorkflowDefinition.Version > workflow.Identity.Version)
 238                {
 16239                    _logger.LogWarning("A more recent version of the workflow has been found, overwriting the IsLatest p
 16240                    existingWorkflowLatest = true;
 16241                    continue;
 242                }
 243
 609244                latestWorkflowDefinition.IsLatest = false;
 609245                workflowDefinitionsToSave.Add(latestWorkflowDefinition);
 246            }
 791247        }
 248
 249        async Task UpdateIsPublished()
 250        {
 251            // If the workflow being added is configured to be the published version, then we need to reset the current 
 791252            if (workflow.Publication.IsPublished)
 253            {
 254                // Reset current published definitions.
 772255                var filter = new WorkflowDefinitionFilter
 772256                {
 772257                    DefinitionId = definitionId,
 772258                    VersionOptions = VersionOptions.Published
 772259                };
 772260                var publishedWorkflowDefinitions = (await _workflowDefinitionStore.FindManyAsync(filter, cancellationTok
 261
 262                // If the published workflow definitions contains definitions with the same ID as definitions in the lat
 772263                SyncExistingCopies(publishedWorkflowDefinitions, workflowDefinitionsToSave);
 264
 2748265                foreach (var publishedWorkflowDefinition in publishedWorkflowDefinitions)
 266                {
 602267                    if (publishedWorkflowDefinition.Version > workflow.Identity.Version)
 268                    {
 10269                        _logger.LogWarning("A more recent version of the workflow has been found to be published, overwr
 10270                        existingWorkflowPublished = true;
 10271                        continue;
 272                    }
 273
 592274                    publishedWorkflowDefinition.IsPublished = false;
 592275                    workflowDefinitionsToSave.Add(publishedWorkflowDefinition);
 276                }
 277            }
 791278        }
 791279    }
 280
 281    /// <summary>
 282    /// Syncs the items in the primary list with existing items in the secondary list, even when the object instances ar
 283    /// </summary>
 284    private void SyncExistingCopies(List<WorkflowDefinition> primary, HashSet<WorkflowDefinition> secondary)
 285    {
 2755286        var ids = secondary.Select(x => x.Id).Distinct().ToList();
 2767287        var latestWorkflowDefinitions = primary.Where(x => ids.Contains(x.Id)).ToList();
 2767288        primary.RemoveAll(x => latestWorkflowDefinitions.Contains(x));
 2755289        primary.AddRange(secondary.Where(x => ids.Contains(x.Id)));
 1563290    }
 291}