| | | 1 | | using Elsa.Common; |
| | | 2 | | using Elsa.Common.Models; |
| | | 3 | | using Elsa.Workflows.Activities; |
| | | 4 | | using Elsa.Workflows.Management; |
| | | 5 | | using Elsa.Workflows.Management.Entities; |
| | | 6 | | using Elsa.Workflows.Management.Filters; |
| | | 7 | | using Microsoft.Extensions.Logging; |
| | | 8 | | using Open.Linq.AsyncExtensions; |
| | | 9 | | |
| | | 10 | | namespace Elsa.Workflows.Runtime; |
| | | 11 | | |
| | | 12 | | /// <inheritdoc /> |
| | | 13 | | public class DefaultWorkflowDefinitionStorePopulator : IWorkflowDefinitionStorePopulator |
| | | 14 | | { |
| | | 15 | | private readonly Func<IEnumerable<IWorkflowsProvider>> _workflowDefinitionProviders; |
| | | 16 | | private readonly ITriggerIndexer _triggerIndexer; |
| | | 17 | | private readonly IWorkflowDefinitionStore _workflowDefinitionStore; |
| | | 18 | | private readonly IActivitySerializer _activitySerializer; |
| | | 19 | | private readonly IPayloadSerializer _payloadSerializer; |
| | | 20 | | private readonly ISystemClock _systemClock; |
| | | 21 | | private readonly IIdentityGraphService _identityGraphService; |
| | | 22 | | private readonly ILogger<DefaultWorkflowDefinitionStorePopulator> _logger; |
| | 152 | 23 | | private readonly SemaphoreSlim _semaphore = new(1, 1); |
| | | 24 | | |
/// <summary>
/// Creates a new <see cref="DefaultWorkflowDefinitionStorePopulator"/> and stores the supplied collaborators.
/// </summary>
/// <param name="workflowDefinitionProviders">Factory that is invoked to obtain the workflow providers whenever the store is populated.</param>
/// <param name="triggerIndexer">Indexes the triggers of published workflow definitions after they are added.</param>
/// <param name="workflowDefinitionStore">Store that is queried for existing definitions and receives the definitions to persist.</param>
/// <param name="activitySerializer">Serializes the workflow root activity.</param>
/// <param name="payloadSerializer">Serializes the materializer context.</param>
/// <param name="systemClock">Supplies the current UTC time for definitions that carry no creation timestamp.</param>
/// <param name="identityGraphService">Assigns identities to a workflow before it is stored.</param>
/// <param name="logger">Logger used for warnings and informational messages.</param>
public DefaultWorkflowDefinitionStorePopulator(
    Func<IEnumerable<IWorkflowsProvider>> workflowDefinitionProviders,
    ITriggerIndexer triggerIndexer,
    IWorkflowDefinitionStore workflowDefinitionStore,
    IActivitySerializer activitySerializer,
    IPayloadSerializer payloadSerializer,
    ISystemClock systemClock,
    IIdentityGraphService identityGraphService,
    ILogger<DefaultWorkflowDefinitionStorePopulator> logger)
{
    // Workflow sources and persistence.
    _workflowDefinitionProviders = workflowDefinitionProviders;
    _workflowDefinitionStore = workflowDefinitionStore;

    // Serialization.
    _activitySerializer = activitySerializer;
    _payloadSerializer = payloadSerializer;

    // Supporting services.
    _triggerIndexer = triggerIndexer;
    _identityGraphService = identityGraphService;
    _systemClock = systemClock;
    _logger = logger;
}
| | | 47 | | |
| | | 48 | | /// <inheritdoc /> |
public Task<IEnumerable<WorkflowDefinition>> PopulateStoreAsync(CancellationToken cancellationToken = default) =>
    // Forward to the overload that takes the indexing flag, with trigger indexing switched on.
    PopulateStoreAsync(indexTriggers: true, cancellationToken);
| | | 53 | | |
| | | 54 | | /// <inheritdoc /> |
| | | 55 | | public async Task<IEnumerable<WorkflowDefinition>> PopulateStoreAsync(bool indexTriggers, CancellationToken cancella |
| | | 56 | | { |
| | 284 | 57 | | var providers = _workflowDefinitionProviders(); |
| | 284 | 58 | | var workflowDefinitions = new List<WorkflowDefinition>(); |
| | | 59 | | |
| | 1228 | 60 | | foreach (var provider in providers) |
| | | 61 | | { |
| | 330 | 62 | | var results = await provider.GetWorkflowsAsync(cancellationToken).AsTask().ToList(); |
| | | 63 | | |
| | 2216 | 64 | | foreach (var result in results) |
| | | 65 | | { |
| | 778 | 66 | | var workflowDefinition = await AddAsync(result, indexTriggers, cancellationToken); |
| | 778 | 67 | | workflowDefinitions.Add(workflowDefinition); |
| | | 68 | | } |
| | | 69 | | } |
| | | 70 | | |
| | 284 | 71 | | return workflowDefinitions; |
| | 284 | 72 | | } |
| | | 73 | | |
| | | 74 | | /// <inheritdoc /> |
| | | 75 | | public Task<WorkflowDefinition> AddAsync(MaterializedWorkflow materializedWorkflow, CancellationToken cancellationTo |
| | | 76 | | { |
| | 13 | 77 | | return AddAsync(materializedWorkflow, true, cancellationToken); |
| | | 78 | | } |
| | | 79 | | |
| | | 80 | | /// <inheritdoc /> |
| | | 81 | | public async Task<WorkflowDefinition> AddAsync(MaterializedWorkflow materializedWorkflow, bool indexTriggers, Cancel |
| | | 82 | | { |
| | 791 | 83 | | await AssignIdentities(materializedWorkflow.Workflow, cancellationToken); |
| | 791 | 84 | | var workflowDefinition = await AddOrUpdateAsync(materializedWorkflow, cancellationToken); |
| | | 85 | | |
| | 791 | 86 | | if (indexTriggers && workflowDefinition.IsPublished) |
| | 592 | 87 | | await _triggerIndexer.IndexTriggersAsync(workflowDefinition, cancellationToken); |
| | | 88 | | |
| | 791 | 89 | | return workflowDefinition; |
| | 791 | 90 | | } |
| | | 91 | | |
// Passes the workflow to the identity graph service and awaits its identity assignment.
private async Task AssignIdentities(Workflow workflow, CancellationToken cancellationToken) =>
    await _identityGraphService.AssignIdentitiesAsync(workflow, cancellationToken);
| | | 96 | | |
| | | 97 | | private async Task<WorkflowDefinition> AddOrUpdateAsync(MaterializedWorkflow materializedWorkflow, CancellationToken |
| | | 98 | | { |
| | 791 | 99 | | await _semaphore.WaitAsync(cancellationToken); |
| | | 100 | | |
| | | 101 | | try |
| | | 102 | | { |
| | 791 | 103 | | return await AddOrUpdateCoreAsync(materializedWorkflow, cancellationToken); |
| | | 104 | | } |
| | | 105 | | finally |
| | | 106 | | { |
| | 791 | 107 | | _semaphore.Release(); |
| | | 108 | | } |
| | 791 | 109 | | } |
| | | 110 | | |
| | | 111 | | private async Task<WorkflowDefinition> AddOrUpdateCoreAsync(MaterializedWorkflow materializedWorkflow, CancellationT |
| | | 112 | | { |
| | 791 | 113 | | var workflow = materializedWorkflow.Workflow; |
| | 791 | 114 | | var definitionId = workflow.Identity.DefinitionId; |
| | | 115 | | |
| | 791 | 116 | | var existingWorkflowLatest = false; |
| | 791 | 117 | | var existingWorkflowPublished = false; |
| | | 118 | | |
| | | 119 | | // Serialize materializer context. |
| | 791 | 120 | | var materializerContext = materializedWorkflow.MaterializerContext; |
| | 791 | 121 | | var materializerContextJson = materializerContext != null ? _payloadSerializer.Serialize(materializerContext) : |
| | | 122 | | |
| | | 123 | | // Determine StringData and OriginalSource based on what's provided |
| | | 124 | | string? stringData; |
| | 791 | 125 | | var originalSource = materializedWorkflow.OriginalSource; |
| | | 126 | | |
| | 791 | 127 | | if (originalSource != null) |
| | | 128 | | { |
| | | 129 | | // NEW WAY: OriginalSource is provided |
| | | 130 | | // For JSON workflows, we still need to populate StringData with the serialized root for backwards compatibi |
| | 352 | 131 | | stringData = materializedWorkflow.MaterializerName == "Json" ? _activitySerializer.Serialize(workflow.Root) |
| | 352 | 132 | | // For new formats (ElsaScript, YAML, etc.), only OriginalSource is needed |
| | 352 | 133 | | // StringData can be null as these materializers only use OriginalSource |
| | 352 | 134 | | null; |
| | | 135 | | } |
| | | 136 | | else |
| | | 137 | | { |
| | | 138 | | // OLD WAY: No OriginalSource provided (backwards compatibility for existing workflows) |
| | | 139 | | // Serialize the workflow root as before |
| | 439 | 140 | | stringData = _activitySerializer.Serialize(workflow.Root); |
| | | 141 | | } |
| | | 142 | | |
| | | 143 | | // Check if there's already a workflow definition stored with this definition ID and version. |
| | 791 | 144 | | var specificVersionFilter = new WorkflowDefinitionFilter |
| | 791 | 145 | | { |
| | 791 | 146 | | DefinitionId = definitionId, |
| | 791 | 147 | | VersionOptions = VersionOptions.SpecificVersion(workflow.Identity.Version) |
| | 791 | 148 | | }; |
| | | 149 | | |
| | 791 | 150 | | var existingDefinitionVersion = await _workflowDefinitionStore.FindAsync(specificVersionFilter, cancellationToke |
| | | 151 | | |
| | | 152 | | // Set up a list to collect all workflow definitions to be persisted. |
| | 791 | 153 | | var workflowDefinitionsToSave = new HashSet<WorkflowDefinition>(); |
| | | 154 | | |
| | 791 | 155 | | if (existingDefinitionVersion != null) |
| | | 156 | | { |
| | 600 | 157 | | workflowDefinitionsToSave.Add(existingDefinitionVersion); |
| | | 158 | | |
| | 600 | 159 | | if (existingDefinitionVersion.Id != workflow.Identity.Id) |
| | | 160 | | { |
| | | 161 | | // It's possible that the imported workflow definition has a different ID than the existing one in the s |
| | | 162 | | // In a future update, we might store this discrepancy in a "troubleshooting" table and provide tooling |
| | | 163 | | // See https://github.com/elsa-workflows/elsa-core/issues/5540 |
| | 2 | 164 | | _logger.LogWarning("Workflow with ID {WorkflowId} already exists with a different ID {ExistingWorkflowId |
| | | 165 | | } |
| | | 166 | | } |
| | | 167 | | |
| | 791 | 168 | | await UpdateIsLatest(); |
| | 791 | 169 | | await UpdateIsPublished(); |
| | | 170 | | |
| | 791 | 171 | | var workflowDefinition = existingDefinitionVersion ?? new WorkflowDefinition |
| | 791 | 172 | | { |
| | 791 | 173 | | DefinitionId = workflow.Identity.DefinitionId, |
| | 791 | 174 | | Id = workflow.Identity.Id, |
| | 791 | 175 | | Version = workflow.Identity.Version, |
| | 791 | 176 | | TenantId = workflow.Identity.TenantId, |
| | 791 | 177 | | }; |
| | | 178 | | |
| | 791 | 179 | | workflowDefinition.Description = workflow.WorkflowMetadata.Description; |
| | 791 | 180 | | workflowDefinition.Name = workflow.WorkflowMetadata.Name; |
| | 791 | 181 | | workflowDefinition.ToolVersion = workflow.WorkflowMetadata.ToolVersion; |
| | 791 | 182 | | workflowDefinition.IsLatest = !existingWorkflowLatest; |
| | 791 | 183 | | workflowDefinition.IsPublished = !existingWorkflowPublished && workflow.Publication.IsPublished; |
| | 791 | 184 | | workflowDefinition.IsReadonly = workflow.IsReadonly; |
| | 791 | 185 | | workflowDefinition.IsSystem = workflow.IsSystem; |
| | 791 | 186 | | workflowDefinition.CustomProperties = workflow.CustomProperties; |
| | 791 | 187 | | workflowDefinition.Variables = workflow.Variables; |
| | 791 | 188 | | workflowDefinition.Inputs = workflow.Inputs; |
| | 791 | 189 | | workflowDefinition.Outputs = workflow.Outputs; |
| | 791 | 190 | | workflowDefinition.Outcomes = workflow.Outcomes; |
| | 791 | 191 | | workflowDefinition.StringData = stringData; |
| | 791 | 192 | | workflowDefinition.OriginalSource = originalSource; |
| | 791 | 193 | | workflowDefinition.CreatedAt = workflow.WorkflowMetadata.CreatedAt == default ? _systemClock.UtcNow : workflow.W |
| | 791 | 194 | | workflowDefinition.Options = workflow.Options; |
| | 791 | 195 | | workflowDefinition.ProviderName = materializedWorkflow.ProviderName; |
| | 791 | 196 | | workflowDefinition.MaterializerContext = materializerContextJson; |
| | 791 | 197 | | workflowDefinition.MaterializerName = materializedWorkflow.MaterializerName; |
| | | 198 | | |
| | 800 | 199 | | if (existingDefinitionVersion is null && workflowDefinitionsToSave.Any(w => w.Id == workflowDefinition.Id)) |
| | | 200 | | { |
| | 2 | 201 | | _logger.LogInformation("Workflow with ID {WorkflowId} already exists", workflowDefinition.Id); |
| | 2 | 202 | | return workflowDefinition; |
| | | 203 | | } |
| | | 204 | | |
| | 789 | 205 | | workflowDefinitionsToSave.Add(workflowDefinition); |
| | | 206 | | |
| | 1585 | 207 | | var duplicates = workflowDefinitionsToSave.GroupBy(wd => wd.Id) |
| | 796 | 208 | | .Where(g => g.Count() > 1) |
| | 0 | 209 | | .Select(g => g.Key) |
| | 789 | 210 | | .ToList(); |
| | | 211 | | |
| | 789 | 212 | | if (duplicates.Any()) |
| | | 213 | | { |
| | 0 | 214 | | throw new Exception($"Unable to update WorkflowDefinition with ids {string.Join(',', duplicates)} multiple t |
| | | 215 | | } |
| | | 216 | | |
| | 789 | 217 | | await _workflowDefinitionStore.SaveManyAsync(workflowDefinitionsToSave, cancellationToken); |
| | 789 | 218 | | return workflowDefinition; |
| | | 219 | | |
| | | 220 | | async Task UpdateIsLatest() |
| | | 221 | | { |
| | | 222 | | // Always try to update the IsLatest property based on the VersionNumber |
| | | 223 | | |
| | | 224 | | // Reset current latest definitions. |
| | 791 | 225 | | var filter = new WorkflowDefinitionFilter |
| | 791 | 226 | | { |
| | 791 | 227 | | DefinitionId = definitionId, |
| | 791 | 228 | | VersionOptions = VersionOptions.Latest |
| | 791 | 229 | | }; |
| | 791 | 230 | | var latestWorkflowDefinitions = (await _workflowDefinitionStore.FindManyAsync(filter, cancellationToken)).To |
| | | 231 | | |
| | | 232 | | // If the latest definitions contains definitions with the same ID then we need to replace them with the lat |
| | 791 | 233 | | SyncExistingCopies(latestWorkflowDefinitions, workflowDefinitionsToSave); |
| | | 234 | | |
| | 2832 | 235 | | foreach (var latestWorkflowDefinition in latestWorkflowDefinitions) |
| | | 236 | | { |
| | 625 | 237 | | if (latestWorkflowDefinition.Version > workflow.Identity.Version) |
| | | 238 | | { |
| | 16 | 239 | | _logger.LogWarning("A more recent version of the workflow has been found, overwriting the IsLatest p |
| | 16 | 240 | | existingWorkflowLatest = true; |
| | 16 | 241 | | continue; |
| | | 242 | | } |
| | | 243 | | |
| | 609 | 244 | | latestWorkflowDefinition.IsLatest = false; |
| | 609 | 245 | | workflowDefinitionsToSave.Add(latestWorkflowDefinition); |
| | | 246 | | } |
| | 791 | 247 | | } |
| | | 248 | | |
| | | 249 | | async Task UpdateIsPublished() |
| | | 250 | | { |
| | | 251 | | // If the workflow being added is configured to be the published version, then we need to reset the current |
| | 791 | 252 | | if (workflow.Publication.IsPublished) |
| | | 253 | | { |
| | | 254 | | // Reset current published definitions. |
| | 772 | 255 | | var filter = new WorkflowDefinitionFilter |
| | 772 | 256 | | { |
| | 772 | 257 | | DefinitionId = definitionId, |
| | 772 | 258 | | VersionOptions = VersionOptions.Published |
| | 772 | 259 | | }; |
| | 772 | 260 | | var publishedWorkflowDefinitions = (await _workflowDefinitionStore.FindManyAsync(filter, cancellationTok |
| | | 261 | | |
| | | 262 | | // If the published workflow definitions contains definitions with the same ID as definitions in the lat |
| | 772 | 263 | | SyncExistingCopies(publishedWorkflowDefinitions, workflowDefinitionsToSave); |
| | | 264 | | |
| | 2748 | 265 | | foreach (var publishedWorkflowDefinition in publishedWorkflowDefinitions) |
| | | 266 | | { |
| | 602 | 267 | | if (publishedWorkflowDefinition.Version > workflow.Identity.Version) |
| | | 268 | | { |
| | 10 | 269 | | _logger.LogWarning("A more recent version of the workflow has been found to be published, overwr |
| | 10 | 270 | | existingWorkflowPublished = true; |
| | 10 | 271 | | continue; |
| | | 272 | | } |
| | | 273 | | |
| | 592 | 274 | | publishedWorkflowDefinition.IsPublished = false; |
| | 592 | 275 | | workflowDefinitionsToSave.Add(publishedWorkflowDefinition); |
| | | 276 | | } |
| | | 277 | | } |
| | 791 | 278 | | } |
| | 791 | 279 | | } |
| | | 280 | | |
| | | 281 | | /// <summary> |
| | | 282 | | /// Syncs the items in the primary list with existing items in the secondary list, even when the object instances ar |
| | | 283 | | /// </summary> |
private void SyncExistingCopies(List<WorkflowDefinition> primary, HashSet<WorkflowDefinition> secondary)
{
    // IDs of every definition already queued in the secondary set.
    var queuedIds = secondary.Select(definition => definition.Id).ToHashSet();

    // Drop the primary entries that share an ID with a queued definition; they may be
    // different object instances representing the same record.
    primary.RemoveAll(definition => queuedIds.Contains(definition.Id));

    // Append the queued instances so that later changes are applied to the objects
    // that will actually be saved. Every secondary item is appended, not only the replaced ones.
    primary.AddRange(secondary);
}
| | | 291 | | } |