< Summary

Information
Class: Elsa.Workflows.Runtime.DefaultWorkflowDefinitionStorePopulator
Assembly: Elsa.Workflows.Runtime
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/DefaultWorkflowDefinitionStorePopulator.cs
Line coverage
95%
Covered lines: 160
Uncovered lines: 8
Coverable lines: 168
Total lines: 328
Line coverage: 95.2%
Branch coverage
95%
Covered branches: 40
Total branches: 42
Branch coverage: 95.2%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
PopulateStoreAsync(...)100%11100%
PopulateStoreAsync()100%1010100%
AddAsync(...)100%11100%
AddAsync()100%11100%
<AddAsync()100%44100%
AssignIdentities()100%11100%
AddOrUpdateAsync()100%1145.45%
AddOrUpdateCoreAsync()83.33%242495.31%
UpdateIsLatest()100%44100%
UpdateIsPublished()100%66100%
SyncExistingCopies(...)100%11100%

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/DefaultWorkflowDefinitionStorePopulator.cs

#LineLine coverage
 1using Elsa.Common;
 2using Elsa.Common.Models;
 3using Elsa.Common.Multitenancy;
 4using Elsa.Workflows.Activities;
 5using Elsa.Workflows.Management;
 6using Elsa.Workflows.Management.Entities;
 7using Elsa.Workflows.Management.Filters;
 8using Microsoft.Extensions.Logging;
 9using Open.Linq.AsyncExtensions;
 10
 11namespace Elsa.Workflows.Runtime;
 12
 13/// <inheritdoc />
 14public class DefaultWorkflowDefinitionStorePopulator : IWorkflowDefinitionStorePopulator
 15{
 16    private readonly Func<IEnumerable<IWorkflowsProvider>> _workflowDefinitionProviders;
 17    private readonly ITriggerIndexer _triggerIndexer;
 18    private readonly IWorkflowDefinitionStore _workflowDefinitionStore;
 19    private readonly IActivitySerializer _activitySerializer;
 20    private readonly IPayloadSerializer _payloadSerializer;
 21    private readonly ISystemClock _systemClock;
 22    private readonly IIdentityGraphService _identityGraphService;
 23    private readonly ITenantAccessor _tenantAccessor;
 24    private readonly ILogger<DefaultWorkflowDefinitionStorePopulator> _logger;
 19225    private readonly SemaphoreSlim _semaphore = new(1, 1);
 26
 27    /// <summary>
 28    /// Initializes a new instance of the <see cref="DefaultWorkflowDefinitionStorePopulator"/> class.
 29    /// </summary>
 19230    public DefaultWorkflowDefinitionStorePopulator(
 19231        Func<IEnumerable<IWorkflowsProvider>> workflowDefinitionProviders,
 19232        ITriggerIndexer triggerIndexer,
 19233        IWorkflowDefinitionStore workflowDefinitionStore,
 19234        IActivitySerializer activitySerializer,
 19235        IPayloadSerializer payloadSerializer,
 19236        ISystemClock systemClock,
 19237        IIdentityGraphService identityGraphService,
 19238        ITenantAccessor tenantAccessor,
 19239        ILogger<DefaultWorkflowDefinitionStorePopulator> logger)
 40    {
 19241        _workflowDefinitionProviders = workflowDefinitionProviders;
 19242        _triggerIndexer = triggerIndexer;
 19243        _workflowDefinitionStore = workflowDefinitionStore;
 19244        _activitySerializer = activitySerializer;
 19245        _payloadSerializer = payloadSerializer;
 19246        _systemClock = systemClock;
 19247        _identityGraphService = identityGraphService;
 19248        _tenantAccessor = tenantAccessor;
 19249        _logger = logger;
 19250    }
 51
 52    /// <inheritdoc />
 53    public Task<IEnumerable<WorkflowDefinition>> PopulateStoreAsync(CancellationToken cancellationToken = default)
 54    {
 755        return PopulateStoreAsync(true, cancellationToken);
 56    }
 57
 58    /// <inheritdoc />
 59    public async Task<IEnumerable<WorkflowDefinition>> PopulateStoreAsync(bool indexTriggers, CancellationToken cancella
 60    {
 34561        var providers = _workflowDefinitionProviders();
 34562        var workflowDefinitions = new List<WorkflowDefinition>();
 34563        var currentTenantId = _tenantAccessor.TenantId;
 64
 158465        foreach (var provider in providers)
 66        {
 44767            var results = await provider.GetWorkflowsAsync(cancellationToken).AsTask().ToList();
 68
 768069            foreach (var result in results)
 70            {
 71                // Normalize tenant IDs for comparison (null becomes empty string)
 339372                var definitionTenantId = result.Workflow.Identity.TenantId.NormalizeTenantId();
 73
 74                // Only import workflows belonging to the current tenant or tenant-agnostic workflows (TenantId = "*").
 339375                if (definitionTenantId != currentTenantId && definitionTenantId != Tenant.AgnosticTenantId)
 76                {
 49577                    _logger.LogDebug(
 49578                        "Skipping adding workflow {WorkflowId} from provider {Provider} because it belongs to tenant '{W
 49579                        result.Workflow.Identity.DefinitionId,
 49580                        provider.Name,
 49581                        result.Workflow.Identity.TenantId,
 49582                        currentTenantId);
 49583                    continue;
 84                }
 85
 289886                var addResult = await AddAsync(result, indexTriggers, cancellationToken);
 579687                addResult.OnSuccess(workflowDefinition => workflowDefinitions.Add(workflowDefinition));
 88            }
 44789        }
 90
 34591        return workflowDefinitions;
 34592    }
 93
 94    /// <inheritdoc />
 95    public Task<Result<WorkflowDefinition>> AddAsync(MaterializedWorkflow materializedWorkflow, CancellationToken cancel
 96    {
 1397        return AddAsync(materializedWorkflow, true, cancellationToken);
 98    }
 99
 100    /// <inheritdoc />
 101    public async Task<Result<WorkflowDefinition>> AddAsync(MaterializedWorkflow materializedWorkflow, bool indexTriggers
 102    {
 2911103        await AssignIdentities(materializedWorkflow.Workflow, cancellationToken);
 2911104        var result = await AddOrUpdateAsync(materializedWorkflow, cancellationToken);
 105
 2911106        return await result.OnSuccessAsync(async workflowDefinition =>
 2911107        {
 2911108            if (indexTriggers && workflowDefinition.IsPublished)
 1434109                await _triggerIndexer.IndexTriggersAsync(workflowDefinition, cancellationToken);
 5822110        });
 2911111    }
 112
 113    private async Task AssignIdentities(Workflow workflow, CancellationToken cancellationToken)
 114    {
 2911115        await _identityGraphService.AssignIdentitiesAsync(workflow, cancellationToken);
 2911116    }
 117
 118    private async Task<Result<WorkflowDefinition>> AddOrUpdateAsync(MaterializedWorkflow materializedWorkflow, Cancellat
 119    {
 2911120        await _semaphore.WaitAsync(cancellationToken);
 121
 122        try
 123        {
 2911124            var workflowDefinition = await AddOrUpdateCoreAsync(materializedWorkflow, cancellationToken);
 2911125            return Result.Success(workflowDefinition);
 126        }
 0127        catch (Exception e)
 128        {
 0129            _logger.LogError(e, "An error occurred while populating the workflow definition store for workflow {Workflow
 0130                materializedWorkflow.Workflow.Identity.DefinitionId,
 0131                materializedWorkflow.Workflow.WorkflowMetadata.Name,
 0132                materializedWorkflow.Workflow.Identity.Version);
 133
 0134            return Result.Failure<WorkflowDefinition>(e);
 135        }
 136        finally
 137        {
 2911138            _semaphore.Release();
 139        }
 2911140    }
 141
 142    private async Task<WorkflowDefinition> AddOrUpdateCoreAsync(MaterializedWorkflow materializedWorkflow, CancellationT
 143    {
 2911144        var workflow = materializedWorkflow.Workflow;
 2911145        var definitionId = workflow.Identity.DefinitionId;
 146
 2911147        var existingWorkflowLatest = false;
 2911148        var existingWorkflowPublished = false;
 149
 150        // Serialize materializer context.
 2911151        var materializerContext = materializedWorkflow.MaterializerContext;
 2911152        var materializerContextJson = materializerContext != null ? _payloadSerializer.Serialize(materializerContext) : 
 153
 154        // Determine StringData and OriginalSource based on what's provided
 155        string? stringData;
 2911156        var originalSource = materializedWorkflow.OriginalSource;
 157
 2911158        if (originalSource != null)
 159        {
 160            // NEW WAY: OriginalSource is provided
 161            // For JSON workflows, we still need to populate StringData with the serialized root for backwards compatibi
 616162            stringData = materializedWorkflow.MaterializerName == "Json"
 616163                ? _activitySerializer.Serialize(workflow.Root)
 616164                :
 616165                // For new formats (ElsaScript, YAML, etc.), only OriginalSource is needed
 616166                // StringData can be null as these materializers only use OriginalSource
 616167                null;
 168        }
 169        else
 170        {
 171            // OLD WAY: No OriginalSource provided (backwards compatibility for existing workflows)
 172            // Serialize the workflow root as before
 2295173            stringData = _activitySerializer.Serialize(workflow.Root);
 174        }
 175
 176        // Check if there's already a workflow definition stored with this definition ID and version.
 2911177        var specificVersionFilter = new WorkflowDefinitionFilter
 2911178        {
 2911179            DefinitionId = definitionId,
 2911180            VersionOptions = VersionOptions.SpecificVersion(workflow.Identity.Version)
 2911181        };
 182
 2911183        var existingDefinitionVersion = await _workflowDefinitionStore.FindAsync(specificVersionFilter, cancellationToke
 184
 185        // Set up a list to collect all workflow definitions to be persisted.
 2911186        var workflowDefinitionsToSave = new HashSet<WorkflowDefinition>();
 187
 2911188        if (existingDefinitionVersion != null)
 189        {
 2697190            workflowDefinitionsToSave.Add(existingDefinitionVersion);
 191
 2697192            if (existingDefinitionVersion.Id != workflow.Identity.Id)
 193            {
 194                // It's possible that the imported workflow definition has a different ID than the existing one in the s
 195                // In a future update, we might store this discrepancy in a "troubleshooting" table and provide tooling 
 196                // See https://github.com/elsa-workflows/elsa-core/issues/5540
 2197                _logger.LogWarning("Workflow with ID {WorkflowId} already exists with a different ID {ExistingWorkflowId
 198            }
 199        }
 200
 2911201        await UpdateIsLatest();
 2911202        await UpdateIsPublished();
 203
 204        // Determine the tenant ID for the workflow definition
 205        // If the workflow has no tenant ID, use the current tenant (normalized to handle null -> "*")
 2911206        var workflowTenantId = workflow.Identity.TenantId ?? _tenantAccessor.TenantId;
 207
 2911208        var workflowDefinition = existingDefinitionVersion ?? new WorkflowDefinition
 2911209        {
 2911210            DefinitionId = workflow.Identity.DefinitionId,
 2911211            Id = workflow.Identity.Id,
 2911212            Version = workflow.Identity.Version,
 2911213            TenantId = workflowTenantId,
 2911214        };
 215
 2911216        workflowDefinition.Description = workflow.WorkflowMetadata.Description;
 2911217        workflowDefinition.Name = workflow.WorkflowMetadata.Name;
 2911218        workflowDefinition.ToolVersion = workflow.WorkflowMetadata.ToolVersion;
 2911219        workflowDefinition.IsLatest = !existingWorkflowLatest;
 2911220        workflowDefinition.IsPublished = !existingWorkflowPublished && workflow.Publication.IsPublished;
 2911221        workflowDefinition.IsReadonly = workflow.IsReadonly;
 2911222        workflowDefinition.IsSystem = workflow.IsSystem;
 2911223        workflowDefinition.CustomProperties = workflow.CustomProperties;
 2911224        workflowDefinition.Variables = workflow.Variables;
 2911225        workflowDefinition.Inputs = workflow.Inputs;
 2911226        workflowDefinition.Outputs = workflow.Outputs;
 2911227        workflowDefinition.Outcomes = workflow.Outcomes;
 2911228        workflowDefinition.StringData = stringData;
 2911229        workflowDefinition.OriginalSource = originalSource;
 2911230        workflowDefinition.CreatedAt = workflow.WorkflowMetadata.CreatedAt == default ? _systemClock.UtcNow : workflow.W
 2911231        workflowDefinition.Options = workflow.Options;
 2911232        workflowDefinition.ProviderName = materializedWorkflow.ProviderName;
 2911233        workflowDefinition.MaterializerContext = materializerContextJson;
 2911234        workflowDefinition.MaterializerName = materializedWorkflow.MaterializerName;
 235
 2919236        if (existingDefinitionVersion is null && workflowDefinitionsToSave.Any(w => w.Id == workflowDefinition.Id))
 237        {
 2238            _logger.LogInformation("Workflow with ID {WorkflowId} already exists", workflowDefinition.Id);
 2239            return workflowDefinition;
 240        }
 241
 2909242        workflowDefinitionsToSave.Add(workflowDefinition);
 243
 5824244        var duplicates = workflowDefinitionsToSave.GroupBy(wd => wd.Id)
 2915245            .Where(g => g.Count() > 1)
 0246            .Select(g => g.Key)
 2909247            .ToList();
 248
 2909249        if (duplicates.Any())
 250        {
 0251            throw new Exception($"Unable to update WorkflowDefinition with ids {string.Join(',', duplicates)} multiple t
 252        }
 253
 2909254        await _workflowDefinitionStore.SaveManyAsync(workflowDefinitionsToSave, cancellationToken);
 2909255        return workflowDefinition;
 256
 257        async Task UpdateIsLatest()
 258        {
 259            // Always try to update the IsLatest property based on the VersionNumber
 260
 261            // Reset current latest definitions.
 2911262            var filter = new WorkflowDefinitionFilter
 2911263            {
 2911264                DefinitionId = definitionId,
 2911265                VersionOptions = VersionOptions.Latest
 2911266            };
 2911267            var latestWorkflowDefinitions = (await _workflowDefinitionStore.FindManyAsync(filter, cancellationToken)).To
 268
 269            // If the latest definitions contains definitions with the same ID then we need to replace them with the lat
 2911270            SyncExistingCopies(latestWorkflowDefinitions, workflowDefinitionsToSave);
 271
 11304272            foreach (var latestWorkflowDefinition in latestWorkflowDefinitions)
 273            {
 2741274                if (latestWorkflowDefinition.Version > workflow.Identity.Version)
 275                {
 36276                    _logger.LogWarning("A more recent version of the workflow has been found, overwriting the IsLatest p
 36277                    existingWorkflowLatest = true;
 36278                    continue;
 279                }
 280
 2705281                latestWorkflowDefinition.IsLatest = false;
 2705282                workflowDefinitionsToSave.Add(latestWorkflowDefinition);
 283            }
 2911284        }
 285
 286        async Task UpdateIsPublished()
 287        {
 288            // If the workflow being added is configured to be the published version, then we need to reset the current 
 2911289            if (workflow.Publication.IsPublished)
 290            {
 291                // Reset current published definitions.
 2872292                var filter = new WorkflowDefinitionFilter
 2872293                {
 2872294                    DefinitionId = definitionId,
 2872295                    VersionOptions = VersionOptions.Published
 2872296                };
 2872297                var publishedWorkflowDefinitions = (await _workflowDefinitionStore.FindManyAsync(filter, cancellationTok
 298
 299                // If the published workflow definitions contains definitions with the same ID as definitions in the lat
 2872300                SyncExistingCopies(publishedWorkflowDefinitions, workflowDefinitionsToSave);
 301
 11118302                foreach (var publishedWorkflowDefinition in publishedWorkflowDefinitions)
 303                {
 2687304                    if (publishedWorkflowDefinition.Version > workflow.Identity.Version)
 305                    {
 18306                        _logger.LogWarning("A more recent version of the workflow has been found to be published, overwr
 18307                        existingWorkflowPublished = true;
 18308                        continue;
 309                    }
 310
 2669311                    publishedWorkflowDefinition.IsPublished = false;
 2669312                    workflowDefinitionsToSave.Add(publishedWorkflowDefinition);
 313                }
 314            }
 2911315        }
 2911316    }
 317
 318    /// <summary>
 319    /// Syncs the items in the primary list with existing items in the secondary list, even when the object instances ar
 320    /// </summary>
 321    private void SyncExistingCopies(List<WorkflowDefinition> primary, HashSet<WorkflowDefinition> secondary)
 322    {
 11149323        var ids = secondary.Select(x => x.Id).Distinct().ToList();
 11161324        var latestWorkflowDefinitions = primary.Where(x => ids.Contains(x.Id)).ToList();
 11161325        primary.RemoveAll(x => latestWorkflowDefinitions.Contains(x));
 11149326        primary.AddRange(secondary.Where(x => ids.Contains(x.Id)));
 5783327    }
 328}