< Summary

Information
Class: Elsa.Workflows.Runtime.DefaultWorkflowDefinitionStorePopulator
Assembly: Elsa.Workflows.Runtime
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/DefaultWorkflowDefinitionStorePopulator.cs
Line coverage
98%
Covered lines: 157
Uncovered lines: 2
Coverable lines: 159
Total lines: 317
Line coverage: 98.7%
Branch coverage
93%
Covered branches: 43
Total branches: 46
Branch coverage: 93.4%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
.ctor(...) | 100% | 1 | 1 | 100%
PopulateStoreAsync(...) | 100% | 1 | 1 | 100%
PopulateStoreAsync() | 100% | 10 | 10 | 100%
AddAsync(...) | 100% | 1 | 1 | 100%
AddAsync() | 100% | 4 | 4 | 100%
AssignIdentities() | 100% | 1 | 1 | 100%
AddOrUpdateAsync() | 100% | 1 | 1 | 100%
AddOrUpdateCoreAsync() | 80.76% | 26 | 26 | 95.31%
UpdateIsLatest() | 100% | 4 | 4 | 100%
UpdateIsPublished() | 100% | 6 | 6 | 100%
SyncExistingCopies(...) | 100% | 1 | 1 | 100%

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/DefaultWorkflowDefinitionStorePopulator.cs

# | Line | Line coverage
 1using Elsa.Common;
 2using Elsa.Common.Models;
 3using Elsa.Common.Multitenancy;
 4using Elsa.Workflows.Activities;
 5using Elsa.Workflows.Management;
 6using Elsa.Workflows.Management.Entities;
 7using Elsa.Workflows.Management.Filters;
 8using Microsoft.Extensions.Logging;
 9using Open.Linq.AsyncExtensions;
 10
 11namespace Elsa.Workflows.Runtime;
 12
 13/// <inheritdoc />
 14public class DefaultWorkflowDefinitionStorePopulator : IWorkflowDefinitionStorePopulator
 15{
 16    private readonly Func<IEnumerable<IWorkflowsProvider>> _workflowDefinitionProviders;
 17    private readonly ITriggerIndexer _triggerIndexer;
 18    private readonly IWorkflowDefinitionStore _workflowDefinitionStore;
 19    private readonly IActivitySerializer _activitySerializer;
 20    private readonly IPayloadSerializer _payloadSerializer;
 21    private readonly ISystemClock _systemClock;
 22    private readonly IIdentityGraphService _identityGraphService;
 23    private readonly ITenantAccessor _tenantAccessor;
 24    private readonly ILogger<DefaultWorkflowDefinitionStorePopulator> _logger;
 21125    private readonly SemaphoreSlim _semaphore = new(1, 1);
 26
 27    /// <summary>
 28    /// Initializes a new instance of the <see cref="DefaultWorkflowDefinitionStorePopulator"/> class.
 29    /// </summary>
    /// <param name="workflowDefinitionProviders">Factory invoked at the start of each population run to obtain the workflow providers to enumerate.</param>
    /// <param name="triggerIndexer">Indexes the triggers of a stored definition when trigger indexing is requested and the definition is published.</param>
    /// <param name="workflowDefinitionStore">Store that is queried for existing versions and that receives the definitions to save.</param>
    /// <param name="activitySerializer">Serializes the workflow root into the definition's <c>StringData</c>.</param>
    /// <param name="payloadSerializer">Serializes the materializer context when one is present.</param>
    /// <param name="systemClock">Supplies the <c>CreatedAt</c> timestamp when the workflow metadata carries the default value.</param>
    /// <param name="identityGraphService">Assigns identities to a workflow before it is added or updated.</param>
    /// <param name="tenantAccessor">Provides the current tenant, used to filter provider results and as the fallback tenant ID.</param>
    /// <param name="logger">Logger used for skipped, conflicting and superseded definitions.</param>
 21130    public DefaultWorkflowDefinitionStorePopulator(
 21131        Func<IEnumerable<IWorkflowsProvider>> workflowDefinitionProviders,
 21132        ITriggerIndexer triggerIndexer,
 21133        IWorkflowDefinitionStore workflowDefinitionStore,
 21134        IActivitySerializer activitySerializer,
 21135        IPayloadSerializer payloadSerializer,
 21136        ISystemClock systemClock,
 21137        IIdentityGraphService identityGraphService,
 21138        ITenantAccessor tenantAccessor,
 21139        ILogger<DefaultWorkflowDefinitionStorePopulator> logger)
 40    {
 21141        _workflowDefinitionProviders = workflowDefinitionProviders;
 21142        _triggerIndexer = triggerIndexer;
 21143        _workflowDefinitionStore = workflowDefinitionStore;
 21144        _activitySerializer = activitySerializer;
 21145        _payloadSerializer = payloadSerializer;
 21146        _systemClock = systemClock;
 21147        _identityGraphService = identityGraphService;
 21148        _tenantAccessor = tenantAccessor;
 21149        _logger = logger;
 21150    }
 51
 52    /// <inheritdoc />
 53    public Task<IEnumerable<WorkflowDefinition>> PopulateStoreAsync(CancellationToken cancellationToken = default)
 54    {
        // Convenience overload: forwards to the (bool indexTriggers, CancellationToken) overload
        // with trigger indexing switched on.
 755        return PopulateStoreAsync(true, cancellationToken);
 56    }
 57
 58    /// <inheritdoc />
 59    public async Task<IEnumerable<WorkflowDefinition>> PopulateStoreAsync(bool indexTriggers, CancellationToken cancella
 60    {
 36961        var providers = _workflowDefinitionProviders();
 36962        var workflowDefinitions = new List<WorkflowDefinition>();
 36963        var currentTenantId = (_tenantAccessor.Tenant?.Id).NormalizeTenantId();
 64
 178465        foreach (var provider in providers)
 66        {
 52367            var results = await provider.GetWorkflowsAsync(cancellationToken).AsTask().ToList();
 68
 1202469            foreach (var result in results)
 70            {
 71                // Normalize tenant IDs for comparison (null becomes empty string)
 548972                var definitionTenantId = result.Workflow.Identity.TenantId.NormalizeTenantId();
 73
 74                // Only import workflows belonging to the current tenant or tenant-agnostic workflows (TenantId = "*").
 548975                if (definitionTenantId != currentTenantId && definitionTenantId != Tenant.AgnosticTenantId)
 76                {
 358577                    _logger.LogDebug(
 358578                        "Skipping adding workflow {WorkflowId} from provider {Provider} because it belongs to tenant '{W
 358579                        result.Workflow.Identity.DefinitionId,
 358580                        provider.Name,
 358581                        result.Workflow.Identity.TenantId,
 358582                        currentTenantId);
 358583                    continue;
 84                }
 85
 190486                var workflowDefinition = await AddAsync(result, indexTriggers, cancellationToken);
 190487                workflowDefinitions.Add(workflowDefinition);
 88            }
 52389        }
 90
 36991        return workflowDefinitions;
 36992    }
 93
 94    /// <inheritdoc />
 95    public Task<WorkflowDefinition> AddAsync(MaterializedWorkflow materializedWorkflow, CancellationToken cancellationTo
 96    {
 1397        return AddAsync(materializedWorkflow, true, cancellationToken);
 98    }
 99
 100    /// <inheritdoc />
 101    public async Task<WorkflowDefinition> AddAsync(MaterializedWorkflow materializedWorkflow, bool indexTriggers, Cancel
 102    {
 1917103        await AssignIdentities(materializedWorkflow.Workflow, cancellationToken);
 1917104        var workflowDefinition = await AddOrUpdateAsync(materializedWorkflow, cancellationToken);
 105
 1917106        if (indexTriggers && workflowDefinition.IsPublished)
 1182107            await _triggerIndexer.IndexTriggersAsync(workflowDefinition, cancellationToken);
 108
 1917109        return workflowDefinition;
 1917110    }
 111
 112    private async Task AssignIdentities(Workflow workflow, CancellationToken cancellationToken)
 113    {
        // Thin wrapper over the identity graph service. AddAsync awaits this before the
        // workflow is added to or updated in the store.
 1917114        await _identityGraphService.AssignIdentitiesAsync(workflow, cancellationToken);
 1917115    }
 116
 117    private async Task<WorkflowDefinition> AddOrUpdateAsync(MaterializedWorkflow materializedWorkflow, CancellationToken
 118    {
 1917119        await _semaphore.WaitAsync(cancellationToken);
 120
 121        try
 122        {
 1917123            return await AddOrUpdateCoreAsync(materializedWorkflow, cancellationToken);
 124        }
 125        finally
 126        {
 1917127            _semaphore.Release();
 128        }
 1917129    }
 130
 131    private async Task<WorkflowDefinition> AddOrUpdateCoreAsync(MaterializedWorkflow materializedWorkflow, CancellationT
 132    {
 1917133        var workflow = materializedWorkflow.Workflow;
 1917134        var definitionId = workflow.Identity.DefinitionId;
 135
 1917136        var existingWorkflowLatest = false;
 1917137        var existingWorkflowPublished = false;
 138
 139        // Serialize materializer context.
 1917140        var materializerContext = materializedWorkflow.MaterializerContext;
 1917141        var materializerContextJson = materializerContext != null ? _payloadSerializer.Serialize(materializerContext) : 
 142
 143        // Determine StringData and OriginalSource based on what's provided
 144        string? stringData;
 1917145        var originalSource = materializedWorkflow.OriginalSource;
 146
 1917147        if (originalSource != null)
 148        {
 149            // NEW WAY: OriginalSource is provided
 150            // For JSON workflows, we still need to populate StringData with the serialized root for backwards compatibi
 604151            stringData = materializedWorkflow.MaterializerName == "Json"
 604152                ? _activitySerializer.Serialize(workflow.Root)
 604153                :
 604154                // For new formats (ElsaScript, YAML, etc.), only OriginalSource is needed
 604155                // StringData can be null as these materializers only use OriginalSource
 604156                null;
 157        }
 158        else
 159        {
 160            // OLD WAY: No OriginalSource provided (backwards compatibility for existing workflows)
 161            // Serialize the workflow root as before
 1313162            stringData = _activitySerializer.Serialize(workflow.Root);
 163        }
 164
 165        // Check if there's already a workflow definition stored with this definition ID and version.
 1917166        var specificVersionFilter = new WorkflowDefinitionFilter
 1917167        {
 1917168            DefinitionId = definitionId,
 1917169            VersionOptions = VersionOptions.SpecificVersion(workflow.Identity.Version)
 1917170        };
 171
 1917172        var existingDefinitionVersion = await _workflowDefinitionStore.FindAsync(specificVersionFilter, cancellationToke
 173
 174        // Set up a list to collect all workflow definitions to be persisted.
 1917175        var workflowDefinitionsToSave = new HashSet<WorkflowDefinition>();
 176
 1917177        if (existingDefinitionVersion != null)
 178        {
 1706179            workflowDefinitionsToSave.Add(existingDefinitionVersion);
 180
 1706181            if (existingDefinitionVersion.Id != workflow.Identity.Id)
 182            {
 183                // It's possible that the imported workflow definition has a different ID than the existing one in the s
 184                // In a future update, we might store this discrepancy in a "troubleshooting" table and provide tooling 
 185                // See https://github.com/elsa-workflows/elsa-core/issues/5540
 2186                _logger.LogWarning("Workflow with ID {WorkflowId} already exists with a different ID {ExistingWorkflowId
 187            }
 188        }
 189
 1917190        await UpdateIsLatest();
 1917191        await UpdateIsPublished();
 192
 193        // Determine the tenant ID for the workflow definition
 194        // If the workflow has no tenant ID, use the current tenant (normalized to handle null -> "")
 1917195        var workflowTenantId = workflow.Identity.TenantId ?? (_tenantAccessor.Tenant?.Id).NormalizeTenantId();
 196
 1917197        var workflowDefinition = existingDefinitionVersion ?? new WorkflowDefinition
 1917198        {
 1917199            DefinitionId = workflow.Identity.DefinitionId,
 1917200            Id = workflow.Identity.Id,
 1917201            Version = workflow.Identity.Version,
 1917202            TenantId = workflowTenantId,
 1917203        };
 204
 1917205        workflowDefinition.Description = workflow.WorkflowMetadata.Description;
 1917206        workflowDefinition.Name = workflow.WorkflowMetadata.Name;
 1917207        workflowDefinition.ToolVersion = workflow.WorkflowMetadata.ToolVersion;
 1917208        workflowDefinition.IsLatest = !existingWorkflowLatest;
 1917209        workflowDefinition.IsPublished = !existingWorkflowPublished && workflow.Publication.IsPublished;
 1917210        workflowDefinition.IsReadonly = workflow.IsReadonly;
 1917211        workflowDefinition.IsSystem = workflow.IsSystem;
 1917212        workflowDefinition.CustomProperties = workflow.CustomProperties;
 1917213        workflowDefinition.Variables = workflow.Variables;
 1917214        workflowDefinition.Inputs = workflow.Inputs;
 1917215        workflowDefinition.Outputs = workflow.Outputs;
 1917216        workflowDefinition.Outcomes = workflow.Outcomes;
 1917217        workflowDefinition.StringData = stringData;
 1917218        workflowDefinition.OriginalSource = originalSource;
 1917219        workflowDefinition.CreatedAt = workflow.WorkflowMetadata.CreatedAt == default ? _systemClock.UtcNow : workflow.W
 1917220        workflowDefinition.Options = workflow.Options;
 1917221        workflowDefinition.ProviderName = materializedWorkflow.ProviderName;
 1917222        workflowDefinition.MaterializerContext = materializerContextJson;
 1917223        workflowDefinition.MaterializerName = materializedWorkflow.MaterializerName;
 224
 1926225        if (existingDefinitionVersion is null && workflowDefinitionsToSave.Any(w => w.Id == workflowDefinition.Id))
 226        {
 2227            _logger.LogInformation("Workflow with ID {WorkflowId} already exists", workflowDefinition.Id);
 2228            return workflowDefinition;
 229        }
 230
 1915231        workflowDefinitionsToSave.Add(workflowDefinition);
 232
 3837233        var duplicates = workflowDefinitionsToSave.GroupBy(wd => wd.Id)
 1922234            .Where(g => g.Count() > 1)
 0235            .Select(g => g.Key)
 1915236            .ToList();
 237
 1915238        if (duplicates.Any())
 239        {
 0240            throw new Exception($"Unable to update WorkflowDefinition with ids {string.Join(',', duplicates)} multiple t
 241        }
 242
 1915243        await _workflowDefinitionStore.SaveManyAsync(workflowDefinitionsToSave, cancellationToken);
 1915244        return workflowDefinition;
 245
 246        async Task UpdateIsLatest()
 247        {
 248            // Always try to update the IsLatest property based on the VersionNumber
 249
 250            // Reset current latest definitions.
 1917251            var filter = new WorkflowDefinitionFilter
 1917252            {
 1917253                DefinitionId = definitionId,
 1917254                VersionOptions = VersionOptions.Latest
 1917255            };
 1917256            var latestWorkflowDefinitions = (await _workflowDefinitionStore.FindManyAsync(filter, cancellationToken)).To
 257
 258            // If the latest definitions contains definitions with the same ID then we need to replace them with the lat
 1917259            SyncExistingCopies(latestWorkflowDefinitions, workflowDefinitionsToSave);
 260
 7320261            foreach (var latestWorkflowDefinition in latestWorkflowDefinitions)
 262            {
 1743263                if (latestWorkflowDefinition.Version > workflow.Identity.Version)
 264                {
 28265                    _logger.LogWarning("A more recent version of the workflow has been found, overwriting the IsLatest p
 28266                    existingWorkflowLatest = true;
 28267                    continue;
 268                }
 269
 1715270                latestWorkflowDefinition.IsLatest = false;
 1715271                workflowDefinitionsToSave.Add(latestWorkflowDefinition);
 272            }
 1917273        }
 274
 275        async Task UpdateIsPublished()
 276        {
 277            // If the workflow being added is configured to be the published version, then we need to reset the current 
 1917278            if (workflow.Publication.IsPublished)
 279            {
 280                // Reset current published definitions.
 1874281                var filter = new WorkflowDefinitionFilter
 1874282                {
 1874283                    DefinitionId = definitionId,
 1874284                    VersionOptions = VersionOptions.Published
 1874285                };
 1874286                var publishedWorkflowDefinitions = (await _workflowDefinitionStore.FindManyAsync(filter, cancellationTok
 287
 288                // If the published workflow definitions contains definitions with the same ID as definitions in the lat
 1874289                SyncExistingCopies(publishedWorkflowDefinitions, workflowDefinitionsToSave);
 290
 7116291                foreach (var publishedWorkflowDefinition in publishedWorkflowDefinitions)
 292                {
 1684293                    if (publishedWorkflowDefinition.Version > workflow.Identity.Version)
 294                    {
 10295                        _logger.LogWarning("A more recent version of the workflow has been found to be published, overwr
 10296                        existingWorkflowPublished = true;
 10297                        continue;
 298                    }
 299
 1674300                    publishedWorkflowDefinition.IsPublished = false;
 1674301                    workflowDefinitionsToSave.Add(publishedWorkflowDefinition);
 302                }
 303            }
 1917304        }
 1917305    }
 306
 307    /// <summary>
 308    /// Removes from <paramref name="primary"/> every item whose Id also occurs in <paramref name="secondary"/>, then appends the items of <paramref name="secondary"/>, so matching is done by Id rather than by list membership of the same instance.
 309    /// </summary>
    /// <param name="primary">List that is modified in place; the callers in this file pass definitions freshly loaded from the store.</param>
    /// <param name="secondary">Set of definitions already collected for saving; it is only read here.</param>
 310    private void SyncExistingCopies(List<WorkflowDefinition> primary, HashSet<WorkflowDefinition> secondary)
 311    {
        // Collect the IDs of everything already queued in the secondary set.
 7171312        var ids = secondary.Select(x => x.Id).Distinct().ToList();
        // Items of the primary list that share an ID with a queued definition.
        // NOTE(review): the local name says "latest", but the published-definitions caller uses this method too.
 7183313        var latestWorkflowDefinitions = primary.Where(x => ids.Contains(x.Id)).ToList();
        // Drop those items so the queued instances take their place; callers mutate the
        // items of primary afterwards and add them back to the same set.
 7183314        primary.RemoveAll(x => latestWorkflowDefinitions.Contains(x));
        // NOTE(review): ids was built from secondary, so this filter keeps every element of
        // secondary, including ones that had no counterpart in primary. ids is also a List,
        // so each Contains call above is a linear scan - confirm whether a set lookup was intended.
 7171315        primary.AddRange(secondary.Where(x => ids.Contains(x.Id)));
 3791316    }
 317}