< Summary

Information
Class: Elsa.Workflows.Runtime.DefaultWorkflowDefinitionStorePopulator
Assembly: Elsa.Workflows.Runtime
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/DefaultWorkflowDefinitionStorePopulator.cs
Line coverage
95%
Covered lines: 159
Uncovered lines: 8
Coverable lines: 167
Total lines: 327
Line coverage: 95.2%
Branch coverage
95%
Covered branches: 40
Total branches: 42
Branch coverage: 95.2%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

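| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
|---|---|---|---|---|
| `.ctor(...)` | 100% | 1 | 1 | 100% |
| `PopulateStoreAsync(...)` | 100% | 1 | 1 | 100% |
| `PopulateStoreAsync()` | 100% | 10 | 10 | 100% |
| `AddAsync(...)` | 100% | 1 | 1 | 100% |
| `AddAsync()` | 100% | 1 | 1 | 100% |
| `<AddAsync()` | 100% | 4 | 4 | 100% |
| `AssignIdentities()` | 100% | 1 | 1 | 100% |
| `AddOrUpdateAsync()` | 100% | 1 | 1 | 45.45% |
| `AddOrUpdateCoreAsync()` | 83.33% | 24 | 24 | 95.31% |
| `UpdateIsLatest()` | 100% | 4 | 4 | 100% |
| `UpdateIsPublished()` | 100% | 6 | 6 | 100% |
| `SyncExistingCopies(...)` | 100% | 1 | 1 | 100% |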

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/DefaultWorkflowDefinitionStorePopulator.cs

# | Line | Line coverage
 1using Elsa.Common;
 2using Elsa.Common.Models;
 3using Elsa.Common.Multitenancy;
 4using Elsa.Workflows.Activities;
 5using Elsa.Workflows.Management;
 6using Elsa.Workflows.Management.Entities;
 7using Elsa.Workflows.Management.Filters;
 8using Microsoft.Extensions.Logging;
 9using Open.Linq.AsyncExtensions;
 10
 11namespace Elsa.Workflows.Runtime;
 12
 13/// <inheritdoc />
 14public class DefaultWorkflowDefinitionStorePopulator : IWorkflowDefinitionStorePopulator
 15{
 16    private readonly Func<IEnumerable<IWorkflowsProvider>> _workflowDefinitionProviders;
 17    private readonly ITriggerIndexer _triggerIndexer;
 18    private readonly IWorkflowDefinitionStore _workflowDefinitionStore;
 19    private readonly IActivitySerializer _activitySerializer;
 20    private readonly IPayloadSerializer _payloadSerializer;
 21    private readonly ISystemClock _systemClock;
 22    private readonly IIdentityGraphService _identityGraphService;
 23    private readonly ITenantAccessor _tenantAccessor;
 24    private readonly ILogger<DefaultWorkflowDefinitionStorePopulator> _logger;
 19725    private readonly SemaphoreSlim _semaphore = new(1, 1);
 26
 27    /// <summary>
 28    /// Initializes a new instance of the <see cref="DefaultWorkflowDefinitionStorePopulator"/> class.
 29    /// </summary>
 19730    public DefaultWorkflowDefinitionStorePopulator(
 19731        Func<IEnumerable<IWorkflowsProvider>> workflowDefinitionProviders,
 19732        ITriggerIndexer triggerIndexer,
 19733        IWorkflowDefinitionStore workflowDefinitionStore,
 19734        IActivitySerializer activitySerializer,
 19735        IPayloadSerializer payloadSerializer,
 19736        ISystemClock systemClock,
 19737        IIdentityGraphService identityGraphService,
 19738        ITenantAccessor tenantAccessor,
 19739        ILogger<DefaultWorkflowDefinitionStorePopulator> logger)
 40    {
 19741        _workflowDefinitionProviders = workflowDefinitionProviders;
 19742        _triggerIndexer = triggerIndexer;
 19743        _workflowDefinitionStore = workflowDefinitionStore;
 19744        _activitySerializer = activitySerializer;
 19745        _payloadSerializer = payloadSerializer;
 19746        _systemClock = systemClock;
 19747        _identityGraphService = identityGraphService;
 19748        _tenantAccessor = tenantAccessor;
 19749        _logger = logger;
 19750    }
 51
 52    /// <inheritdoc />
 53    public Task<IEnumerable<WorkflowDefinition>> PopulateStoreAsync(CancellationToken cancellationToken = default)
 54    {
 755        return PopulateStoreAsync(true, cancellationToken);
 56    }
 57
 58    /// <inheritdoc />
 59    public async Task<IEnumerable<WorkflowDefinition>> PopulateStoreAsync(bool indexTriggers, CancellationToken cancella
 60    {
 35561        var providers = _workflowDefinitionProviders();
 35562        var workflowDefinitions = new List<WorkflowDefinition>();
 35563        var currentTenantId = _tenantAccessor.TenantId;
 64
 162465        foreach (var provider in providers)
 66        {
 45767            var results = await provider.GetWorkflowsAsync(cancellationToken).AsTask().ToList();
 68
 778069            foreach (var result in results)
 70            {
 71                // Normalize tenant IDs for comparison (null becomes empty string)
 343372                var definitionTenantId = result.Workflow.Identity.TenantId.NormalizeTenantId();
 73
 74                // Only import workflows belonging to the current tenant or tenant-agnostic workflows (TenantId = "*").
 343375                if (definitionTenantId != currentTenantId && definitionTenantId != Tenant.AgnosticTenantId)
 76                {
 49577                    _logger.LogDebug(
 49578                        "Skipping adding workflow {WorkflowId} from provider {Provider} because it belongs to tenant '{W
 49579                        result.Workflow.Identity.DefinitionId,
 49580                        provider.Name,
 49581                        result.Workflow.Identity.TenantId,
 49582                        currentTenantId);
 49583                    continue;
 84                }
 85
 293886                var addResult = await AddAsync(result, indexTriggers, cancellationToken);
 587687                addResult.OnSuccess(workflowDefinition => workflowDefinitions.Add(workflowDefinition));
 88            }
 45789        }
 90
 35591        return workflowDefinitions;
 35592    }
 93
 94    /// <inheritdoc />
 95    public Task<Result<WorkflowDefinition>> AddAsync(MaterializedWorkflow materializedWorkflow, CancellationToken cancel
 96    {
 1397        return AddAsync(materializedWorkflow, true, cancellationToken);
 98    }
 99
 100    /// <inheritdoc />
 101    public async Task<Result<WorkflowDefinition>> AddAsync(MaterializedWorkflow materializedWorkflow, bool indexTriggers
 102    {
 2951103        await AssignIdentities(materializedWorkflow.Workflow, cancellationToken);
 2951104        var result = await AddOrUpdateAsync(materializedWorkflow, cancellationToken);
 105
 2951106        return await result.OnSuccessAsync(async workflowDefinition =>
 2951107        {
 2951108            if (indexTriggers && workflowDefinition.IsPublished)
 1454109                await _triggerIndexer.IndexTriggersAsync(workflowDefinition, cancellationToken);
 5902110        });
 2951111    }
 112
 113    private async Task AssignIdentities(Workflow workflow, CancellationToken cancellationToken)
 114    {
 2951115        await _identityGraphService.AssignIdentitiesAsync(workflow, cancellationToken);
 2951116    }
 117
 118    private async Task<Result<WorkflowDefinition>> AddOrUpdateAsync(MaterializedWorkflow materializedWorkflow, Cancellat
 119    {
 2951120        await _semaphore.WaitAsync(cancellationToken);
 121
 122        try
 123        {
 2951124            var workflowDefinition = await AddOrUpdateCoreAsync(materializedWorkflow, cancellationToken);
 2951125            return Result.Success(workflowDefinition);
 126        }
 0127        catch (Exception e)
 128        {
 0129            _logger.LogError(e, "An error occurred while populating the workflow definition store for workflow {Workflow
 0130                materializedWorkflow.Workflow.Identity.DefinitionId,
 0131                materializedWorkflow.Workflow.WorkflowMetadata.Name,
 0132                materializedWorkflow.Workflow.Identity.Version);
 133
 0134            return Result.Failure<WorkflowDefinition>(e);
 135        }
 136        finally
 137        {
 2951138            _semaphore.Release();
 139        }
 2951140    }
 141
 142    private async Task<WorkflowDefinition> AddOrUpdateCoreAsync(MaterializedWorkflow materializedWorkflow, CancellationT
 143    {
 2951144        var workflow = materializedWorkflow.Workflow;
 2951145        var definitionId = workflow.Identity.DefinitionId;
 146
 2951147        var existingWorkflowLatest = false;
 2951148        var existingWorkflowPublished = false;
 149
 150        // Serialize materializer context.
 2951151        var materializerContext = materializedWorkflow.MaterializerContext;
 2951152        var materializerContextJson = materializerContext != null ? _payloadSerializer.Serialize(materializerContext) : 
 153
 154        // Determine StringData and OriginalSource based on what's provided
 155        string? stringData;
 2951156        var originalSource = materializedWorkflow.OriginalSource;
 157
 2951158        if (originalSource != null)
 159        {
 160            // NEW WAY: OriginalSource is provided
 161            // For JSON workflows, we still need to populate StringData with the serialized root for backwards compatibi
 616162            stringData = materializedWorkflow.MaterializerName == "Json"
 616163                ? _activitySerializer.Serialize(workflow.Root)
 616164                :
 616165                // For new formats (ElsaScript, YAML, etc.), only OriginalSource is needed
 616166                // StringData can be null as these materializers only use OriginalSource
 616167                null;
 168        }
 169        else
 170        {
 171            // OLD WAY: No OriginalSource provided (backwards compatibility for existing workflows)
 172            // Serialize the workflow root as before
 2335173            stringData = _activitySerializer.Serialize(workflow.Root);
 174        }
 175
 176        // Check if there's already a workflow definition stored with this definition ID and version.
 2951177        var specificVersionFilter = new WorkflowDefinitionFilter
 2951178        {
 2951179            DefinitionId = definitionId,
 2951180            VersionOptions = VersionOptions.SpecificVersion(workflow.Identity.Version)
 2951181        };
 182
 2951183        var existingDefinitionVersion = await _workflowDefinitionStore.FindAsync(specificVersionFilter, cancellationToke
 184
 185        // Set up a list to collect all workflow definitions to be persisted.
 2951186        var workflowDefinitionsToSave = new HashSet<WorkflowDefinition>();
 187
 2951188        if (existingDefinitionVersion != null)
 189        {
 2734190            workflowDefinitionsToSave.Add(existingDefinitionVersion);
 191
 2734192            if (existingDefinitionVersion.Id != workflow.Identity.Id)
 193            {
 194                // It's possible that the imported workflow definition has a different ID than the existing one in the s
 195                // In a future update, we might store this discrepancy in a "troubleshooting" table and provide tooling 
 196                // See https://github.com/elsa-workflows/elsa-core/issues/5540
 2197                _logger.LogWarning("Workflow with ID {WorkflowId} already exists with a different ID {ExistingWorkflowId
 198            }
 199        }
 200
 2951201        await UpdateIsLatest();
 2951202        await UpdateIsPublished();
 203
 204        // Determine the tenant ID for the workflow definition
 205        // If the workflow has no tenant ID, use the current tenant (normalized to handle null -> "*")
 2951206        var workflowTenantId = workflow.Identity.TenantId ?? _tenantAccessor.TenantId;
 207
 2951208        var workflowDefinition = existingDefinitionVersion ?? new WorkflowDefinition
 2951209        {
 2951210            DefinitionId = workflow.Identity.DefinitionId,
 2951211            Id = workflow.Identity.Id,
 2951212            Version = workflow.Identity.Version,
 2951213            TenantId = workflowTenantId,
 2951214        };
 215
 2951216        workflowDefinition.Description = workflow.WorkflowMetadata.Description;
 2951217        workflowDefinition.Name = workflow.WorkflowMetadata.Name;
 2951218        workflowDefinition.ToolVersion = workflow.WorkflowMetadata.ToolVersion;
 2951219        workflowDefinition.IsLatest = !existingWorkflowLatest;
 2951220        workflowDefinition.IsPublished = !existingWorkflowPublished && workflow.Publication.IsPublished;
 2951221        workflowDefinition.IsReadonly = workflow.IsReadonly;
 2951222        workflowDefinition.IsSystem = workflow.IsSystem;
 2951223        workflowDefinition.CustomProperties = workflow.CustomProperties;
 2951224        workflowDefinition.Variables = workflow.Variables;
 2951225        workflowDefinition.Inputs = workflow.Inputs;
 2951226        workflowDefinition.Outputs = workflow.Outputs;
 2951227        workflowDefinition.Outcomes = workflow.Outcomes;
 2951228        workflowDefinition.StringData = stringData;
 2951229        workflowDefinition.OriginalSource = originalSource;
 2951230        workflowDefinition.CreatedAt = workflow.WorkflowMetadata.CreatedAt == default ? _systemClock.UtcNow : workflow.W
 2951231        workflowDefinition.Options = workflow.Options;
 2951232        workflowDefinition.ProviderName = materializedWorkflow.ProviderName;
 2951233        workflowDefinition.MaterializerContext = materializerContextJson;
 2951234        workflowDefinition.MaterializerName = materializedWorkflow.MaterializerName;
 235
 2960236        if (existingDefinitionVersion is null && workflowDefinitionsToSave.Any(w => w.Id == workflowDefinition.Id))
 237        {
 2238            _logger.LogInformation("Workflow with ID {WorkflowId} already exists", workflowDefinition.Id);
 2239            return workflowDefinition;
 240        }
 241
 2949242        workflowDefinitionsToSave.Add(workflowDefinition);
 243
 5905244        var duplicates = workflowDefinitionsToSave.GroupBy(wd => wd.Id)
 2956245            .Where(g => g.Count() > 1)
 0246            .Select(g => g.Key)
 2949247            .ToList();
 248
 2949249        if (duplicates.Any())
 250        {
 0251            throw new Exception($"Unable to update WorkflowDefinition with ids {string.Join(',', duplicates)} multiple t
 252        }
 253
 2949254        await _workflowDefinitionStore.SaveManyAsync(workflowDefinitionsToSave, cancellationToken);
 2949255        return workflowDefinition;
 256
 257        async Task UpdateIsLatest()
 258        {
 259            // Always try to update the IsLatest property based on the VersionNumber
 260
 261            // Reset current latest definitions.
 2951262            var filter = new WorkflowDefinitionFilter
 2951263            {
 2951264                DefinitionId = definitionId,
 2951265                VersionOptions = VersionOptions.Latest
 2951266            };
 2951267            var latestWorkflowDefinitions = (await _workflowDefinitionStore.FindManyAsync(filter, cancellationToken)).To
 268
 269            // If the latest definitions contains definitions with the same ID then we need to replace them with the lat
 2951270            SyncExistingCopies(latestWorkflowDefinitions, workflowDefinitionsToSave);
 271
 11458272            foreach (var latestWorkflowDefinition in latestWorkflowDefinitions)
 273            {
 2778274                if (latestWorkflowDefinition.Version > workflow.Identity.Version)
 275                {
 35276                    _logger.LogWarning("A more recent version of the workflow has been found, overwriting the IsLatest p
 35277                    existingWorkflowLatest = true;
 35278                    continue;
 279                }
 280
 2743281                latestWorkflowDefinition.IsLatest = false;
 2743282                workflowDefinitionsToSave.Add(latestWorkflowDefinition);
 283            }
 2951284        }
 285
 286        async Task UpdateIsPublished()
 287        {
 288            // If the workflow being added is configured to be the published version, then we need to reset the current 
 2951289            if (workflow.Publication.IsPublished)
 290            {
 291                // Reset current published definitions.
 2912292                var filter = new WorkflowDefinitionFilter
 2912293                {
 2912294                    DefinitionId = definitionId,
 2912295                    VersionOptions = VersionOptions.Published
 2912296                };
 2912297                var publishedWorkflowDefinitions = (await _workflowDefinitionStore.FindManyAsync(filter, cancellationTok
 298
 299                // If the published workflow definitions contains definitions with the same ID as definitions in the lat
 2912300                SyncExistingCopies(publishedWorkflowDefinitions, workflowDefinitionsToSave);
 301
 11274302                foreach (var publishedWorkflowDefinition in publishedWorkflowDefinitions)
 303                {
 2725304                    if (publishedWorkflowDefinition.Version > workflow.Identity.Version)
 305                    {
 19306                        _logger.LogWarning("A more recent version of the workflow has been found to be published, overwr
 19307                        existingWorkflowPublished = true;
 19308                        continue;
 309                    }
 310
 2706311                    publishedWorkflowDefinition.IsPublished = false;
 2706312                    workflowDefinitionsToSave.Add(publishedWorkflowDefinition);
 313                }
 314            }
 2951315        }
 2951316    }
 317
 318    /// <summary>
 319    /// Syncs the items in the primary list with existing items in the secondary list, even when the object instances ar
 320    /// </summary>
 321    private void SyncExistingCopies(List<WorkflowDefinition> primary, HashSet<WorkflowDefinition> secondary)
 322    {
 11303323        var ids = secondary.Select(x => x.Id).ToHashSet();
 11315324        primary.RemoveAll(x => ids.Contains(x.Id));
 5863325        primary.AddRange(secondary);
 5863326    }
 327}