< Summary

Information
Class: Elsa.Workflows.Runtime.DefaultWorkflowDefinitionStorePopulator
Assembly: Elsa.Workflows.Runtime
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/DefaultWorkflowDefinitionStorePopulator.cs
Line coverage
97%
Covered lines: 140
Uncovered lines: 3
Coverable lines: 143
Total lines: 291
Line coverage: 97.9%
Branch coverage
94%
Covered branches: 34
Total branches: 36
Branch coverage: 94.4%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

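| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| .ctor(...) | 100% | 1 | 1 | 100% |
| PopulateStoreAsync(...) | 100% | 2 | 1 | 0% |
| PopulateStoreAsync() | 100% | 4 | 4 | 100% |
| AddAsync(...) | 100% | 1 | 1 | 100% |
| AddAsync() | 100% | 4 | 4 | 100% |
| AssignIdentities() | 100% | 1 | 1 | 100% |
| AddOrUpdateAsync() | 100% | 1 | 1 | 100% |
| AddOrUpdateCoreAsync() | 81.81% | 22 | 22 | 95.08% |
| UpdateIsLatest() | 100% | 4 | 4 | 100% |
| UpdateIsPublished() | 100% | 6 | 6 | 100% |
| SyncExistingCopies(...) | 100% | 1 | 1 | 100% |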

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/DefaultWorkflowDefinitionStorePopulator.cs

# | Line | Line coverage
 1using Elsa.Common;
 2using Elsa.Common.Models;
 3using Elsa.Workflows.Activities;
 4using Elsa.Workflows.Management;
 5using Elsa.Workflows.Management.Entities;
 6using Elsa.Workflows.Management.Filters;
 7using Microsoft.Extensions.Logging;
 8using Open.Linq.AsyncExtensions;
 9
 10namespace Elsa.Workflows.Runtime;
 11
 12/// <inheritdoc />
 13public class DefaultWorkflowDefinitionStorePopulator : IWorkflowDefinitionStorePopulator
 14{
 15    private readonly Func<IEnumerable<IWorkflowsProvider>> _workflowDefinitionProviders;
 16    private readonly ITriggerIndexer _triggerIndexer;
 17    private readonly IWorkflowDefinitionStore _workflowDefinitionStore;
 18    private readonly IActivitySerializer _activitySerializer;
 19    private readonly IPayloadSerializer _payloadSerializer;
 20    private readonly ISystemClock _systemClock;
 21    private readonly IIdentityGraphService _identityGraphService;
 22    private readonly ILogger<DefaultWorkflowDefinitionStorePopulator> _logger;
 17623    private readonly SemaphoreSlim _semaphore = new(1, 1);
 24
 25    /// <summary>
 26    /// Initializes a new instance of the <see cref="DefaultWorkflowDefinitionStorePopulator"/> class.
 27    /// </summary>
 17628    public DefaultWorkflowDefinitionStorePopulator(
 17629        Func<IEnumerable<IWorkflowsProvider>> workflowDefinitionProviders,
 17630        ITriggerIndexer triggerIndexer,
 17631        IWorkflowDefinitionStore workflowDefinitionStore,
 17632        IActivitySerializer activitySerializer,
 17633        IPayloadSerializer payloadSerializer,
 17634        ISystemClock systemClock,
 17635        IIdentityGraphService identityGraphService,
 17636        ILogger<DefaultWorkflowDefinitionStorePopulator> logger)
 37    {
 17638        _workflowDefinitionProviders = workflowDefinitionProviders;
 17639        _triggerIndexer = triggerIndexer;
 17640        _workflowDefinitionStore = workflowDefinitionStore;
 17641        _activitySerializer = activitySerializer;
 17642        _payloadSerializer = payloadSerializer;
 17643        _systemClock = systemClock;
 17644        _identityGraphService = identityGraphService;
 17645        _logger = logger;
 17646    }
 47
 48    /// <inheritdoc />
 49    public Task<IEnumerable<WorkflowDefinition>> PopulateStoreAsync(CancellationToken cancellationToken = default)
 50    {
 051        return PopulateStoreAsync(true, cancellationToken);
 52    }
 53
 54    /// <inheritdoc />
 55    public async Task<IEnumerable<WorkflowDefinition>> PopulateStoreAsync(bool indexTriggers, CancellationToken cancella
 56    {
 32057        var providers = _workflowDefinitionProviders();
 32058        var workflowDefinitions = new List<WorkflowDefinition>();
 59
 142060        foreach (var provider in providers)
 61        {
 39062            var results = await provider.GetWorkflowsAsync(cancellationToken).AsTask().ToList();
 63
 460064            foreach (var result in results)
 65            {
 191066                var workflowDefinition = await AddAsync(result, indexTriggers, cancellationToken);
 191067                workflowDefinitions.Add(workflowDefinition);
 68            }
 69        }
 70
 32071        return workflowDefinitions;
 32072    }
 73
 74    /// <inheritdoc />
 75    public Task<WorkflowDefinition> AddAsync(MaterializedWorkflow materializedWorkflow, CancellationToken cancellationTo
 76    {
 1377        return AddAsync(materializedWorkflow, true, cancellationToken);
 78    }
 79
 80    /// <inheritdoc />
 81    public async Task<WorkflowDefinition> AddAsync(MaterializedWorkflow materializedWorkflow, bool indexTriggers, Cancel
 82    {
 192383        await AssignIdentities(materializedWorkflow.Workflow, cancellationToken);
 192384        var workflowDefinition = await AddOrUpdateAsync(materializedWorkflow, cancellationToken);
 85
 192386        if (indexTriggers && workflowDefinition.IsPublished)
 118887            await _triggerIndexer.IndexTriggersAsync(workflowDefinition, cancellationToken);
 88
 192389        return workflowDefinition;
 192390    }
 91
 92    private async Task AssignIdentities(Workflow workflow, CancellationToken cancellationToken)
 93    {
 192394        await _identityGraphService.AssignIdentitiesAsync(workflow, cancellationToken);
 192395    }
 96
 97    private async Task<WorkflowDefinition> AddOrUpdateAsync(MaterializedWorkflow materializedWorkflow, CancellationToken
 98    {
 192399        await _semaphore.WaitAsync(cancellationToken);
 100
 101        try
 102        {
 1923103            return await AddOrUpdateCoreAsync(materializedWorkflow, cancellationToken);
 104        }
 105        finally
 106        {
 1923107            _semaphore.Release();
 108        }
 1923109    }
 110
 111    private async Task<WorkflowDefinition> AddOrUpdateCoreAsync(MaterializedWorkflow materializedWorkflow, CancellationT
 112    {
 1923113        var workflow = materializedWorkflow.Workflow;
 1923114        var definitionId = workflow.Identity.DefinitionId;
 115
 1923116        var existingWorkflowLatest = false;
 1923117        var existingWorkflowPublished = false;
 118
 119        // Serialize materializer context.
 1923120        var materializerContext = materializedWorkflow.MaterializerContext;
 1923121        var materializerContextJson = materializerContext != null ? _payloadSerializer.Serialize(materializerContext) : 
 122
 123        // Determine StringData and OriginalSource based on what's provided
 124        string? stringData;
 1923125        var originalSource = materializedWorkflow.OriginalSource;
 126
 1923127        if (originalSource != null)
 128        {
 129            // NEW WAY: OriginalSource is provided
 130            // For JSON workflows, we still need to populate StringData with the serialized root for backwards compatibi
 604131            stringData = materializedWorkflow.MaterializerName == "Json" ? _activitySerializer.Serialize(workflow.Root) 
 604132                // For new formats (ElsaScript, YAML, etc.), only OriginalSource is needed
 604133                // StringData can be null as these materializers only use OriginalSource
 604134                null;
 135        }
 136        else
 137        {
 138            // OLD WAY: No OriginalSource provided (backwards compatibility for existing workflows)
 139            // Serialize the workflow root as before
 1319140            stringData = _activitySerializer.Serialize(workflow.Root);
 141        }
 142
 143        // Check if there's already a workflow definition stored with this definition ID and version.
 1923144        var specificVersionFilter = new WorkflowDefinitionFilter
 1923145        {
 1923146            DefinitionId = definitionId,
 1923147            VersionOptions = VersionOptions.SpecificVersion(workflow.Identity.Version)
 1923148        };
 149
 1923150        var existingDefinitionVersion = await _workflowDefinitionStore.FindAsync(specificVersionFilter, cancellationToke
 151
 152        // Set up a list to collect all workflow definitions to be persisted.
 1923153        var workflowDefinitionsToSave = new HashSet<WorkflowDefinition>();
 154
 1923155        if (existingDefinitionVersion != null)
 156        {
 1718157            workflowDefinitionsToSave.Add(existingDefinitionVersion);
 158
 1718159            if (existingDefinitionVersion.Id != workflow.Identity.Id)
 160            {
 161                // It's possible that the imported workflow definition has a different ID than the existing one in the s
 162                // In a future update, we might store this discrepancy in a "troubleshooting" table and provide tooling 
 163                // See https://github.com/elsa-workflows/elsa-core/issues/5540
 2164                _logger.LogWarning("Workflow with ID {WorkflowId} already exists with a different ID {ExistingWorkflowId
 165            }
 166        }
 167
 1923168        await UpdateIsLatest();
 1923169        await UpdateIsPublished();
 170
 1923171        var workflowDefinition = existingDefinitionVersion ?? new WorkflowDefinition
 1923172        {
 1923173            DefinitionId = workflow.Identity.DefinitionId,
 1923174            Id = workflow.Identity.Id,
 1923175            Version = workflow.Identity.Version,
 1923176            TenantId = workflow.Identity.TenantId,
 1923177        };
 178
 1923179        workflowDefinition.Description = workflow.WorkflowMetadata.Description;
 1923180        workflowDefinition.Name = workflow.WorkflowMetadata.Name;
 1923181        workflowDefinition.ToolVersion = workflow.WorkflowMetadata.ToolVersion;
 1923182        workflowDefinition.IsLatest = !existingWorkflowLatest;
 1923183        workflowDefinition.IsPublished = !existingWorkflowPublished && workflow.Publication.IsPublished;
 1923184        workflowDefinition.IsReadonly = workflow.IsReadonly;
 1923185        workflowDefinition.IsSystem = workflow.IsSystem;
 1923186        workflowDefinition.CustomProperties = workflow.CustomProperties;
 1923187        workflowDefinition.Variables = workflow.Variables;
 1923188        workflowDefinition.Inputs = workflow.Inputs;
 1923189        workflowDefinition.Outputs = workflow.Outputs;
 1923190        workflowDefinition.Outcomes = workflow.Outcomes;
 1923191        workflowDefinition.StringData = stringData;
 1923192        workflowDefinition.OriginalSource = originalSource;
 1923193        workflowDefinition.CreatedAt = workflow.WorkflowMetadata.CreatedAt == default ? _systemClock.UtcNow : workflow.W
 1923194        workflowDefinition.Options = workflow.Options;
 1923195        workflowDefinition.ProviderName = materializedWorkflow.ProviderName;
 1923196        workflowDefinition.MaterializerContext = materializerContextJson;
 1923197        workflowDefinition.MaterializerName = materializedWorkflow.MaterializerName;
 198
 1932199        if (existingDefinitionVersion is null && workflowDefinitionsToSave.Any(w => w.Id == workflowDefinition.Id))
 200        {
 2201            _logger.LogInformation("Workflow with ID {WorkflowId} already exists", workflowDefinition.Id);
 2202            return workflowDefinition;
 203        }
 204
 1921205        workflowDefinitionsToSave.Add(workflowDefinition);
 206
 3849207        var duplicates = workflowDefinitionsToSave.GroupBy(wd => wd.Id)
 1928208            .Where(g => g.Count() > 1)
 0209            .Select(g => g.Key)
 1921210            .ToList();
 211
 1921212        if (duplicates.Any())
 213        {
 0214            throw new Exception($"Unable to update WorkflowDefinition with ids {string.Join(',', duplicates)} multiple t
 215        }
 216
 1921217        await _workflowDefinitionStore.SaveManyAsync(workflowDefinitionsToSave, cancellationToken);
 1921218        return workflowDefinition;
 219
 220        async Task UpdateIsLatest()
 221        {
 222            // Always try to update the IsLatest property based on the VersionNumber
 223
 224            // Reset current latest definitions.
 1923225            var filter = new WorkflowDefinitionFilter
 1923226            {
 1923227                DefinitionId = definitionId,
 1923228                VersionOptions = VersionOptions.Latest
 1923229            };
 1923230            var latestWorkflowDefinitions = (await _workflowDefinitionStore.FindManyAsync(filter, cancellationToken)).To
 231
 232            // If the latest definitions contains definitions with the same ID then we need to replace them with the lat
 1923233            SyncExistingCopies(latestWorkflowDefinitions, workflowDefinitionsToSave);
 234
 7356235            foreach (var latestWorkflowDefinition in latestWorkflowDefinitions)
 236            {
 1755237                if (latestWorkflowDefinition.Version > workflow.Identity.Version)
 238                {
 28239                    _logger.LogWarning("A more recent version of the workflow has been found, overwriting the IsLatest p
 28240                    existingWorkflowLatest = true;
 28241                    continue;
 242                }
 243
 1727244                latestWorkflowDefinition.IsLatest = false;
 1727245                workflowDefinitionsToSave.Add(latestWorkflowDefinition);
 246            }
 1923247        }
 248
 249        async Task UpdateIsPublished()
 250        {
 251            // If the workflow being added is configured to be the published version, then we need to reset the current 
 1923252            if (workflow.Publication.IsPublished)
 253            {
 254                // Reset current published definitions.
 1880255                var filter = new WorkflowDefinitionFilter
 1880256                {
 1880257                    DefinitionId = definitionId,
 1880258                    VersionOptions = VersionOptions.Published
 1880259                };
 1880260                var publishedWorkflowDefinitions = (await _workflowDefinitionStore.FindManyAsync(filter, cancellationTok
 261
 262                // If the published workflow definitions contains definitions with the same ID as definitions in the lat
 1880263                SyncExistingCopies(publishedWorkflowDefinitions, workflowDefinitionsToSave);
 264
 7152265                foreach (var publishedWorkflowDefinition in publishedWorkflowDefinitions)
 266                {
 1696267                    if (publishedWorkflowDefinition.Version > workflow.Identity.Version)
 268                    {
 10269                        _logger.LogWarning("A more recent version of the workflow has been found to be published, overwr
 10270                        existingWorkflowPublished = true;
 10271                        continue;
 272                    }
 273
 1686274                    publishedWorkflowDefinition.IsPublished = false;
 1686275                    workflowDefinitionsToSave.Add(publishedWorkflowDefinition);
 276                }
 277            }
 1923278        }
 1923279    }
 280
 281    /// <summary>
 282    /// Syncs the items in the primary list with existing items in the secondary list, even when the object instances ar
 283    /// </summary>
 284    private void SyncExistingCopies(List<WorkflowDefinition> primary, HashSet<WorkflowDefinition> secondary)
 285    {
 7207286        var ids = secondary.Select(x => x.Id).Distinct().ToList();
 7219287        var latestWorkflowDefinitions = primary.Where(x => ids.Contains(x.Id)).ToList();
 7219288        primary.RemoveAll(x => latestWorkflowDefinitions.Contains(x));
 7207289        primary.AddRange(secondary.Where(x => ids.Contains(x.Id)));
 3803290    }
 291}