| | | 1 | | using Elsa.Common; |
| | | 2 | | using Elsa.Common.Models; |
| | | 3 | | using Elsa.Common.Multitenancy; |
| | | 4 | | using Elsa.Workflows.Activities; |
| | | 5 | | using Elsa.Workflows.Management; |
| | | 6 | | using Elsa.Workflows.Management.Entities; |
| | | 7 | | using Elsa.Workflows.Management.Filters; |
| | | 8 | | using Microsoft.Extensions.Logging; |
| | | 9 | | using Open.Linq.AsyncExtensions; |
| | | 10 | | |
| | | 11 | | namespace Elsa.Workflows.Runtime; |
| | | 12 | | |
| | | 13 | | /// <inheritdoc /> <remarks>Reads workflows from the providers returned by the injected factory, keeps those that belong to the current tenant or to <c>Tenant.AgnosticTenantId</c>, and adds or updates the matching <c>WorkflowDefinition</c> records through <c>IWorkflowDefinitionStore</c>. Each add-or-update runs while holding a <c>SemaphoreSlim</c> owned by this instance.</remarks> |
| | | 14 | | public class DefaultWorkflowDefinitionStorePopulator : IWorkflowDefinitionStorePopulator |
| | | 15 | | { |
| | | 16 | | private readonly Func<IEnumerable<IWorkflowsProvider>> _workflowDefinitionProviders; |
| | | 17 | | private readonly ITriggerIndexer _triggerIndexer; |
| | | 18 | | private readonly IWorkflowDefinitionStore _workflowDefinitionStore; |
| | | 19 | | private readonly IActivitySerializer _activitySerializer; |
| | | 20 | | private readonly IPayloadSerializer _payloadSerializer; |
| | | 21 | | private readonly ISystemClock _systemClock; |
| | | 22 | | private readonly IIdentityGraphService _identityGraphService; |
| | | 23 | | private readonly ITenantAccessor _tenantAccessor; |
| | | 24 | | private readonly ILogger<DefaultWorkflowDefinitionStorePopulator> _logger; |
| | 211 | 25 | | private readonly SemaphoreSlim _semaphore = new(1, 1); |
| | | 26 | | |
| | | 27 | | /// <summary> |
| | | 28 | | /// Initializes a new instance of the <see cref="DefaultWorkflowDefinitionStorePopulator"/> class. Each dependency is only stored in its field; the provider factory is not invoked here but on every populate call. |
| | | 29 | | /// </summary> |
| | 211 | 30 | | public DefaultWorkflowDefinitionStorePopulator( |
| | 211 | 31 | | Func<IEnumerable<IWorkflowsProvider>> workflowDefinitionProviders, |
| | 211 | 32 | | ITriggerIndexer triggerIndexer, |
| | 211 | 33 | | IWorkflowDefinitionStore workflowDefinitionStore, |
| | 211 | 34 | | IActivitySerializer activitySerializer, |
| | 211 | 35 | | IPayloadSerializer payloadSerializer, |
| | 211 | 36 | | ISystemClock systemClock, |
| | 211 | 37 | | IIdentityGraphService identityGraphService, |
| | 211 | 38 | | ITenantAccessor tenantAccessor, |
| | 211 | 39 | | ILogger<DefaultWorkflowDefinitionStorePopulator> logger) |
| | | 40 | | { |
| | 211 | 41 | | _workflowDefinitionProviders = workflowDefinitionProviders; |
| | 211 | 42 | | _triggerIndexer = triggerIndexer; |
| | 211 | 43 | | _workflowDefinitionStore = workflowDefinitionStore; |
| | 211 | 44 | | _activitySerializer = activitySerializer; |
| | 211 | 45 | | _payloadSerializer = payloadSerializer; |
| | 211 | 46 | | _systemClock = systemClock; |
| | 211 | 47 | | _identityGraphService = identityGraphService; |
| | 211 | 48 | | _tenantAccessor = tenantAccessor; |
| | 211 | 49 | | _logger = logger; |
| | 211 | 50 | | } |
| | | 51 | | |
| | | 52 | | /// <inheritdoc /> <remarks>Forwards to the overload that takes <c>indexTriggers</c>, passing <c>true</c>.</remarks> |
| | | 53 | | public Task<IEnumerable<WorkflowDefinition>> PopulateStoreAsync(CancellationToken cancellationToken = default) |
| | | 54 | | { |
| | 7 | 55 | | return PopulateStoreAsync(true, cancellationToken); |
| | | 56 | | } |
| | | 57 | | |
| | | 58 | | /// <inheritdoc /> <remarks>Invokes the provider factory once, then reads the providers one after another. A workflow is skipped, with a debug log entry, when its normalized tenant ID is neither the normalized current tenant ID nor <c>Tenant.AgnosticTenantId</c>; every other workflow is passed to <c>AddAsync</c> together with <c>indexTriggers</c>. The returned sequence holds the definitions returned by <c>AddAsync</c>, in processing order.</remarks> |
| | | 59 | | public async Task<IEnumerable<WorkflowDefinition>> PopulateStoreAsync(bool indexTriggers, CancellationToken cancella |
| | | 60 | | { |
| | 369 | 61 | | var providers = _workflowDefinitionProviders(); |
| | 369 | 62 | | var workflowDefinitions = new List<WorkflowDefinition>(); |
| | 369 | 63 | | var currentTenantId = (_tenantAccessor.Tenant?.Id).NormalizeTenantId(); |
| | | 64 | | |
| | 1784 | 65 | | foreach (var provider in providers) |
| | | 66 | | { |
| | 523 | 67 | | var results = await provider.GetWorkflowsAsync(cancellationToken).AsTask().ToList(); |
| | | 68 | | |
| | 12024 | 69 | | foreach (var result in results) |
| | | 70 | | { |
| | | 71 | | // Normalize tenant IDs for comparison (null becomes empty string) |
| | 5489 | 72 | | var definitionTenantId = result.Workflow.Identity.TenantId.NormalizeTenantId(); |
| | | 73 | | |
| | | 74 | | // Only import workflows belonging to the current tenant or tenant-agnostic workflows (TenantId = "*"). |
| | 5489 | 75 | | if (definitionTenantId != currentTenantId && definitionTenantId != Tenant.AgnosticTenantId) |
| | | 76 | | { |
| | 3585 | 77 | | _logger.LogDebug( |
| | 3585 | 78 | | "Skipping adding workflow {WorkflowId} from provider {Provider} because it belongs to tenant '{W |
| | 3585 | 79 | | result.Workflow.Identity.DefinitionId, |
| | 3585 | 80 | | provider.Name, |
| | 3585 | 81 | | result.Workflow.Identity.TenantId, |
| | 3585 | 82 | | currentTenantId); |
| | 3585 | 83 | | continue; |
| | | 84 | | } |
| | | 85 | | |
| | 1904 | 86 | | var workflowDefinition = await AddAsync(result, indexTriggers, cancellationToken); |
| | 1904 | 87 | | workflowDefinitions.Add(workflowDefinition); |
| | | 88 | | } |
| | 523 | 89 | | } |
| | | 90 | | |
| | 369 | 91 | | return workflowDefinitions; |
| | 369 | 92 | | } |
| | | 93 | | |
| | | 94 | | /// <inheritdoc /> <remarks>Forwards to the overload that takes <c>indexTriggers</c>, passing <c>true</c>.</remarks> |
| | | 95 | | public Task<WorkflowDefinition> AddAsync(MaterializedWorkflow materializedWorkflow, CancellationToken cancellationTo |
| | | 96 | | { |
| | 13 | 97 | | return AddAsync(materializedWorkflow, true, cancellationToken); |
| | | 98 | | } |
| | | 99 | | |
| | | 100 | | /// <inheritdoc /> <remarks>Assigns identities to the workflow first, then adds or updates its definition in the store. Triggers are indexed afterwards only when <c>indexTriggers</c> is <c>true</c> and the resulting definition has <c>IsPublished</c> set.</remarks> |
| | | 101 | | public async Task<WorkflowDefinition> AddAsync(MaterializedWorkflow materializedWorkflow, bool indexTriggers, Cancel |
| | | 102 | | { |
| | 1917 | 103 | | await AssignIdentities(materializedWorkflow.Workflow, cancellationToken); |
| | 1917 | 104 | | var workflowDefinition = await AddOrUpdateAsync(materializedWorkflow, cancellationToken); |
| | | 105 | | |
| | 1917 | 106 | | if (indexTriggers && workflowDefinition.IsPublished) |
| | 1182 | 107 | | await _triggerIndexer.IndexTriggersAsync(workflowDefinition, cancellationToken); |
| | | 108 | | |
| | 1917 | 109 | | return workflowDefinition; |
| | 1917 | 110 | | } |
| | | 111 | | |
// Delegates to _identityGraphService.AssignIdentitiesAsync for the given workflow. AddAsync awaits this before the definition is added or updated.
| | | 112 | | private async Task AssignIdentities(Workflow workflow, CancellationToken cancellationToken) |
| | | 113 | | { |
| | 1917 | 114 | | await _identityGraphService.AssignIdentitiesAsync(workflow, cancellationToken); |
| | 1917 | 115 | | } |
| | | 116 | | |
// Runs AddOrUpdateCoreAsync while holding _semaphore (initial and maximum count 1), so that at most one add-or-update
// executes at a time on this instance. The semaphore is released in the finally block, also when the core call throws.
// If waiting for the semaphore throws (for example on cancellation), the try block is never entered and nothing is released.
| | | 117 | | private async Task<WorkflowDefinition> AddOrUpdateAsync(MaterializedWorkflow materializedWorkflow, CancellationToken |
| | | 118 | | { |
| | 1917 | 119 | | await _semaphore.WaitAsync(cancellationToken); |
| | | 120 | | |
| | | 121 | | try |
| | | 122 | | { |
| | 1917 | 123 | | return await AddOrUpdateCoreAsync(materializedWorkflow, cancellationToken); |
| | | 124 | | } |
| | | 125 | | finally |
| | | 126 | | { |
| | 1917 | 127 | | _semaphore.Release(); |
| | | 128 | | } |
| | 1917 | 129 | | } |
| | | 130 | | |
// Creates or updates the stored WorkflowDefinition for the given materialized workflow and saves it in one SaveManyAsync
// call together with every other definition whose IsLatest or IsPublished flag was cleared along the way.
// Invoked by AddOrUpdateAsync while _semaphore is held.
// Returns without saving when no stored definition matches this definition ID and version but a definition with the same Id
// is already queued for saving. Throws when two distinct queued definitions share an Id.
| | | 131 | | private async Task<WorkflowDefinition> AddOrUpdateCoreAsync(MaterializedWorkflow materializedWorkflow, CancellationT |
| | | 132 | | { |
| | 1917 | 133 | | var workflow = materializedWorkflow.Workflow; |
| | 1917 | 134 | | var definitionId = workflow.Identity.DefinitionId; |
| | | 135 | | |
| | 1917 | 136 | | var existingWorkflowLatest = false; |
| | 1917 | 137 | | var existingWorkflowPublished = false; |
| | | 138 | | |
| | | 139 | | // Serialize materializer context. |
| | 1917 | 140 | | var materializerContext = materializedWorkflow.MaterializerContext; |
| | 1917 | 141 | | var materializerContextJson = materializerContext != null ? _payloadSerializer.Serialize(materializerContext) : |
| | | 142 | | |
| | | 143 | | // Determine StringData and OriginalSource based on what's provided |
| | | 144 | | string? stringData; |
| | 1917 | 145 | | var originalSource = materializedWorkflow.OriginalSource; |
| | | 146 | | |
| | 1917 | 147 | | if (originalSource != null) |
| | | 148 | | { |
| | | 149 | | // NEW WAY: OriginalSource is provided |
| | | 150 | | // For JSON workflows, we still need to populate StringData with the serialized root for backwards compatibility. |
| | 604 | 151 | | stringData = materializedWorkflow.MaterializerName == "Json" |
| | 604 | 152 | | ? _activitySerializer.Serialize(workflow.Root) |
| | 604 | 153 | | : |
| | 604 | 154 | | // For new formats (ElsaScript, YAML, etc.), only OriginalSource is needed |
| | 604 | 155 | | // StringData can be null as these materializers only use OriginalSource |
| | 604 | 156 | | null; |
| | | 157 | | } |
| | | 158 | | else |
| | | 159 | | { |
| | | 160 | | // OLD WAY: No OriginalSource provided (backwards compatibility for existing workflows) |
| | | 161 | | // Serialize the workflow root as before |
| | 1313 | 162 | | stringData = _activitySerializer.Serialize(workflow.Root); |
| | | 163 | | } |
| | | 164 | | |
| | | 165 | | // Check if there's already a workflow definition stored with this definition ID and version. |
| | 1917 | 166 | | var specificVersionFilter = new WorkflowDefinitionFilter |
| | 1917 | 167 | | { |
| | 1917 | 168 | | DefinitionId = definitionId, |
| | 1917 | 169 | | VersionOptions = VersionOptions.SpecificVersion(workflow.Identity.Version) |
| | 1917 | 170 | | }; |
| | | 171 | | |
| | 1917 | 172 | | var existingDefinitionVersion = await _workflowDefinitionStore.FindAsync(specificVersionFilter, cancellationToke |
| | | 173 | | |
| | | 174 | | // Set up a set to collect all workflow definitions to be persisted; the HashSet ignores an instance that is added a second time. |
| | 1917 | 175 | | var workflowDefinitionsToSave = new HashSet<WorkflowDefinition>(); |
| | | 176 | | |
| | 1917 | 177 | | if (existingDefinitionVersion != null) |
| | | 178 | | { |
| | 1706 | 179 | | workflowDefinitionsToSave.Add(existingDefinitionVersion); |
| | | 180 | | |
| | 1706 | 181 | | if (existingDefinitionVersion.Id != workflow.Identity.Id) |
| | | 182 | | { |
| | | 183 | | // It's possible that the imported workflow definition has a different ID than the existing one in the store. Only a warning is logged here; the stored definition keeps its own Id. |
| | | 184 | | // In a future update, we might store this discrepancy in a "troubleshooting" table and provide tooling |
| | | 185 | | // See https://github.com/elsa-workflows/elsa-core/issues/5540 |
| | 2 | 186 | | _logger.LogWarning("Workflow with ID {WorkflowId} already exists with a different ID {ExistingWorkflowId |
| | | 187 | | } |
| | | 188 | | } |
| | | 189 | | |
| | 1917 | 190 | | await UpdateIsLatest(); |
| | 1917 | 191 | | await UpdateIsPublished(); |
| | | 192 | | |
| | | 193 | | // Determine the tenant ID for the workflow definition |
| | | 194 | | // If the workflow has no tenant ID, use the current tenant (normalized to handle null -> "") |
| | 1917 | 195 | | var workflowTenantId = workflow.Identity.TenantId ?? (_tenantAccessor.Tenant?.Id).NormalizeTenantId(); |
| | | 196 | | |
| | 1917 | 197 | | var workflowDefinition = existingDefinitionVersion ?? new WorkflowDefinition |
| | 1917 | 198 | | { |
| | 1917 | 199 | | DefinitionId = workflow.Identity.DefinitionId, |
| | 1917 | 200 | | Id = workflow.Identity.Id, |
| | 1917 | 201 | | Version = workflow.Identity.Version, |
| | 1917 | 202 | | TenantId = workflowTenantId, |
| | 1917 | 203 | | }; |
| | | 204 | | |
| | 1917 | 205 | | workflowDefinition.Description = workflow.WorkflowMetadata.Description; |
| | 1917 | 206 | | workflowDefinition.Name = workflow.WorkflowMetadata.Name; |
| | 1917 | 207 | | workflowDefinition.ToolVersion = workflow.WorkflowMetadata.ToolVersion; |
| | 1917 | 208 | | workflowDefinition.IsLatest = !existingWorkflowLatest; |
| | 1917 | 209 | | workflowDefinition.IsPublished = !existingWorkflowPublished && workflow.Publication.IsPublished; |
| | 1917 | 210 | | workflowDefinition.IsReadonly = workflow.IsReadonly; |
| | 1917 | 211 | | workflowDefinition.IsSystem = workflow.IsSystem; |
| | 1917 | 212 | | workflowDefinition.CustomProperties = workflow.CustomProperties; |
| | 1917 | 213 | | workflowDefinition.Variables = workflow.Variables; |
| | 1917 | 214 | | workflowDefinition.Inputs = workflow.Inputs; |
| | 1917 | 215 | | workflowDefinition.Outputs = workflow.Outputs; |
| | 1917 | 216 | | workflowDefinition.Outcomes = workflow.Outcomes; |
| | 1917 | 217 | | workflowDefinition.StringData = stringData; |
| | 1917 | 218 | | workflowDefinition.OriginalSource = originalSource; |
| | 1917 | 219 | | workflowDefinition.CreatedAt = workflow.WorkflowMetadata.CreatedAt == default ? _systemClock.UtcNow : workflow.W |
| | 1917 | 220 | | workflowDefinition.Options = workflow.Options; |
| | 1917 | 221 | | workflowDefinition.ProviderName = materializedWorkflow.ProviderName; |
| | 1917 | 222 | | workflowDefinition.MaterializerContext = materializerContextJson; |
| | 1917 | 223 | | workflowDefinition.MaterializerName = materializedWorkflow.MaterializerName; |
| | | 224 | | |
| | 1926 | 225 | | if (existingDefinitionVersion is null && workflowDefinitionsToSave.Any(w => w.Id == workflowDefinition.Id)) |
| | | 226 | | { |
| | 2 | 227 | | _logger.LogInformation("Workflow with ID {WorkflowId} already exists", workflowDefinition.Id); |
| | 2 | 228 | | return workflowDefinition; |
| | | 229 | | } |
| | | 230 | | |
| | 1915 | 231 | | workflowDefinitionsToSave.Add(workflowDefinition); |
| | | 232 | | |
| | 3837 | 233 | | var duplicates = workflowDefinitionsToSave.GroupBy(wd => wd.Id) |
| | 1922 | 234 | | .Where(g => g.Count() > 1) |
| | 0 | 235 | | .Select(g => g.Key) |
| | 1915 | 236 | | .ToList(); |
| | | 237 | | |
| | 1915 | 238 | | if (duplicates.Any()) |
| | | 239 | | { |
| | 0 | 240 | | throw new Exception($"Unable to update WorkflowDefinition with ids {string.Join(',', duplicates)} multiple t |
| | | 241 | | } |
| | | 242 | | |
| | 1915 | 243 | | await _workflowDefinitionStore.SaveManyAsync(workflowDefinitionsToSave, cancellationToken); |
| | 1915 | 244 | | return workflowDefinition; |
| | | 245 | | |
| | | 246 | | async Task UpdateIsLatest() |
| | | 247 | | { |
| | | 248 | | // Always try to update the IsLatest property based on the VersionNumber |
| | | 249 | | |
| | | 250 | | // Reset current latest definitions. |
| | 1917 | 251 | | var filter = new WorkflowDefinitionFilter |
| | 1917 | 252 | | { |
| | 1917 | 253 | | DefinitionId = definitionId, |
| | 1917 | 254 | | VersionOptions = VersionOptions.Latest |
| | 1917 | 255 | | }; |
| | 1917 | 256 | | var latestWorkflowDefinitions = (await _workflowDefinitionStore.FindManyAsync(filter, cancellationToken)).To |
| | | 257 | | |
| | | 258 | | // Merge in the instances already queued in workflowDefinitionsToSave, dropping any fetched copies that share an ID with them, so that each such ID is represented by the queued instance. |
| | 1917 | 259 | | SyncExistingCopies(latestWorkflowDefinitions, workflowDefinitionsToSave); |
| | | 260 | | |
| | 7320 | 261 | | foreach (var latestWorkflowDefinition in latestWorkflowDefinitions) |
| | | 262 | | { |
| | 1743 | 263 | | if (latestWorkflowDefinition.Version > workflow.Identity.Version) |
| | | 264 | | { |
| | 28 | 265 | | _logger.LogWarning("A more recent version of the workflow has been found, overwriting the IsLatest p |
| | 28 | 266 | | existingWorkflowLatest = true; |
| | 28 | 267 | | continue; |
| | | 268 | | } |
| | | 269 | | |
| | 1715 | 270 | | latestWorkflowDefinition.IsLatest = false; |
| | 1715 | 271 | | workflowDefinitionsToSave.Add(latestWorkflowDefinition); |
| | | 272 | | } |
| | 1917 | 273 | | } |
| | | 274 | | |
| | | 275 | | async Task UpdateIsPublished() |
| | | 276 | | { |
| | | 277 | | // If the workflow being added is configured to be the published version, then we need to reset the current |
| | 1917 | 278 | | if (workflow.Publication.IsPublished) |
| | | 279 | | { |
| | | 280 | | // Reset current published definitions. |
| | 1874 | 281 | | var filter = new WorkflowDefinitionFilter |
| | 1874 | 282 | | { |
| | 1874 | 283 | | DefinitionId = definitionId, |
| | 1874 | 284 | | VersionOptions = VersionOptions.Published |
| | 1874 | 285 | | }; |
| | 1874 | 286 | | var publishedWorkflowDefinitions = (await _workflowDefinitionStore.FindManyAsync(filter, cancellationTok |
| | | 287 | | |
| | | 288 | | // If the published workflow definitions contains definitions with the same ID as definitions in the lat |
| | 1874 | 289 | | SyncExistingCopies(publishedWorkflowDefinitions, workflowDefinitionsToSave); |
| | | 290 | | |
| | 7116 | 291 | | foreach (var publishedWorkflowDefinition in publishedWorkflowDefinitions) |
| | | 292 | | { |
| | 1684 | 293 | | if (publishedWorkflowDefinition.Version > workflow.Identity.Version) |
| | | 294 | | { |
| | 10 | 295 | | _logger.LogWarning("A more recent version of the workflow has been found to be published, overwr |
| | 10 | 296 | | existingWorkflowPublished = true; |
| | 10 | 297 | | continue; |
| | | 298 | | } |
| | | 299 | | |
| | 1674 | 300 | | publishedWorkflowDefinition.IsPublished = false; |
| | 1674 | 301 | | workflowDefinitionsToSave.Add(publishedWorkflowDefinition); |
| | | 302 | | } |
| | | 303 | | } |
| | 1917 | 304 | | } |
| | 1917 | 305 | | } |
| | | 306 | | |
| | | 307 | | /// <summary> |
| | | 308 | | /// Removes from the primary list every item whose Id also occurs in the secondary set, then appends all items of the secondary set, so that for a shared Id the instance kept is the one from the secondary set even when the object instances are different. Items of the secondary set are appended even when the primary list held no item with that Id. The primary list is modified in place; the secondary set is only read. |
| | | 309 | | /// </summary> |
| | | 310 | | private void SyncExistingCopies(List<WorkflowDefinition> primary, HashSet<WorkflowDefinition> secondary) |
| | | 311 | | { |
| | 7171 | 312 | | var ids = secondary.Select(x => x.Id).Distinct().ToList(); |
| | 7183 | 313 | | var latestWorkflowDefinitions = primary.Where(x => ids.Contains(x.Id)).ToList(); |
| | 7183 | 314 | | primary.RemoveAll(x => latestWorkflowDefinitions.Contains(x)); |
| | 7171 | 315 | | primary.AddRange(secondary.Where(x => ids.Contains(x.Id))); |
| | 3791 | 316 | | } |
| | | 317 | | } |