< Summary

Information
Class: Elsa.Workflows.Runtime.TriggerIndexer
Assembly: Elsa.Workflows.Runtime
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/TriggerIndexer.cs
Line coverage
93%
Covered lines: 109
Uncovered lines: 7
Coverable lines: 116
Total lines: 233
Line coverage: 93.9%
Branch coverage
80%
Covered branches: 16
Total branches: 20
Branch coverage: 80%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
DeleteTriggersAsync()87.5%8891.66%
IndexTriggersAsync()100%11100%
IndexTriggersAsync()100%11100%
IndexTriggersInternalAsync()50%22100%
GetTriggersAsync()100%11100%
DeleteTriggersAsync()100%11100%
GetCurrentTriggersAsync()100%11100%
GetTriggersInternalAsync()100%44100%
CreateWorkflowTriggersAsync()50%4485.71%
TryGetTriggerDataAsync()100%1133.33%

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/TriggerIndexer.cs

#LineLine coverage
 1using System.Runtime.CompilerServices;
 2using Elsa.Common.DistributedHosting;
 3using Elsa.Expressions.Contracts;
 4using Elsa.Extensions;
 5using Elsa.Mediator.Contracts;
 6using Elsa.Workflows.Activities;
 7using Elsa.Workflows.Helpers;
 8using Elsa.Workflows.Management;
 9using Elsa.Workflows.Management.Entities;
 10using Elsa.Workflows.Runtime.Comparers;
 11using Elsa.Workflows.Runtime.Entities;
 12using Elsa.Workflows.Runtime.Filters;
 13using Elsa.Workflows.Runtime.Notifications;
 14using Medallion.Threading;
 15using Microsoft.Extensions.Logging;
 16using Microsoft.Extensions.Options;
 17using Open.Linq.AsyncExtensions;
 18using Elsa.Common.Serialization;
 19
 20namespace Elsa.Workflows.Runtime;
 21
 22/// <inheritdoc />
 23public class TriggerIndexer : ITriggerIndexer
 24{
 25    private readonly IActivityVisitor _activityVisitor;
 26    private readonly IWorkflowDefinitionService _workflowDefinitionService;
 27    private readonly IExpressionEvaluator _expressionEvaluator;
 28    private readonly IIdentityGenerator _identityGenerator;
 29    private readonly ITriggerStore _triggerStore;
 30    private readonly IActivityRegistry _activityRegistry;
 31    private readonly INotificationSender _notificationSender;
 32    private readonly IServiceProvider _serviceProvider;
 33    private readonly IStimulusHasher _hasher;
 34    private readonly IDistributedLockProvider _distributedLockProvider;
 35    private readonly WorkflowTriggerEqualityComparer _triggerEqualityComparer;
 36    private readonly DistributedLockingOptions _lockingOptions;
 37    private readonly ILogger _logger;
 38
 39    /// <summary>
 40    /// Constructor.
 41    /// </summary>
 64742    public TriggerIndexer(
 64743        IActivityVisitor activityVisitor,
 64744        IWorkflowDefinitionService workflowDefinitionService,
 64745        IExpressionEvaluator expressionEvaluator,
 64746        IIdentityGenerator identityGenerator,
 64747        ITriggerStore triggerStore,
 64748        IActivityRegistry activityRegistry,
 64749        INotificationSender notificationSender,
 64750        IServiceProvider serviceProvider,
 64751        IStimulusHasher hasher,
 64752        IDistributedLockProvider distributedLockProvider,
 64753        ISerializationTypeRegistry workflowJsonTypeRegistry,
 64754        IOptions<DistributedLockingOptions> lockingOptions,
 64755        ILogger<TriggerIndexer> logger)
 56    {
 64757        _activityVisitor = activityVisitor;
 64758        _expressionEvaluator = expressionEvaluator;
 64759        _identityGenerator = identityGenerator;
 64760        _triggerStore = triggerStore;
 64761        _activityRegistry = activityRegistry;
 64762        _notificationSender = notificationSender;
 64763        _serviceProvider = serviceProvider;
 64764        _hasher = hasher;
 64765        _distributedLockProvider = distributedLockProvider;
 64766        _triggerEqualityComparer = new WorkflowTriggerEqualityComparer(workflowJsonTypeRegistry);
 64767        _lockingOptions = lockingOptions.Value;
 64768        _logger = logger;
 64769        _workflowDefinitionService = workflowDefinitionService;
 64770    }
 71
 72    /// <inheritdoc />
 73    public async Task DeleteTriggersAsync(TriggerFilter filter, CancellationToken cancellationToken = default)
 74    {
 675        var triggers = (await _triggerStore.FindManyAsync(filter, cancellationToken)).ToList();
 1376        var workflowDefinitionVersionIds = triggers.Select(x => x.WorkflowDefinitionVersionId).Distinct().ToList();
 77
 2678        foreach (var workflowDefinitionVersionId in workflowDefinitionVersionIds)
 79        {
 80            try
 81            {
 782                var workflowGraph = await _workflowDefinitionService.FindWorkflowGraphAsync(workflowDefinitionVersionId,
 83
 384                if (workflowGraph == null)
 085                    continue;
 86
 387                await DeleteTriggersAsync(workflowGraph.Workflow, cancellationToken);
 388            }
 489            catch (Exception ex)
 90            {
 491                _logger.LogWarning(ex, "Failed to load workflow graph for workflow definition version {WorkflowDefinitio
 492            }
 793        }
 694    }
 95
 96    /// <inheritdoc />
 97    public async Task<IndexedWorkflowTriggers> IndexTriggersAsync(WorkflowDefinition definition, CancellationToken cance
 98    {
 150299        var workflowGraph = await _workflowDefinitionService.MaterializeWorkflowAsync(definition, cancellationToken);
 1502100        return await IndexTriggersAsync(workflowGraph.Workflow, cancellationToken);
 1502101    }
 102
 103    /// <inheritdoc />
 104    public async Task<IndexedWorkflowTriggers> IndexTriggersAsync(Workflow workflow, CancellationToken cancellationToken
 105    {
 106        // Use distributed lock to prevent concurrent trigger indexing race conditions
 1502107        var lockResource = $"trigger-indexer:{workflow.Identity.DefinitionId}";
 1502108        await using (await _distributedLockProvider.AcquireLockAsync(lockResource, _lockingOptions.LockAcquisitionTimeou
 109        {
 1502110            return await IndexTriggersInternalAsync(workflow, cancellationToken);
 111        }
 1502112    }
 113
 114    private async Task<IndexedWorkflowTriggers> IndexTriggersInternalAsync(Workflow workflow, CancellationToken cancella
 115    {
 116        // Get current triggers
 1502117        var currentTriggers = await GetCurrentTriggersAsync(workflow.Identity.DefinitionId, cancellationToken).ToList();
 118
 119        // Collect new triggers **if the workflow is published**.
 1502120        var newTriggers = workflow.Publication.IsPublished
 1502121            ? await GetTriggersInternalAsync(workflow, cancellationToken).ToListAsync(cancellationToken)
 1502122            : new(0);
 123
 124        // Diff triggers.
 1502125        var diff = Diff.For(currentTriggers, newTriggers, _triggerEqualityComparer);
 126
 127        // Replace triggers for the specified workflow.
 1502128        await _triggerStore.ReplaceAsync(diff.Removed, diff.Added, cancellationToken);
 129
 1502130        var indexedWorkflow = new IndexedWorkflowTriggers(workflow, diff.Added, diff.Removed, diff.Unchanged);
 131
 132        // Publish event.
 1502133        await _notificationSender.SendAsync(new WorkflowTriggersIndexed(indexedWorkflow), cancellationToken);
 1502134        return indexedWorkflow;
 1502135    }
 136
 137    /// <inheritdoc />
 138    public async Task<IEnumerable<StoredTrigger>> GetTriggersAsync(Workflow workflow, CancellationToken cancellationToke
 139    {
 54140        return await GetTriggersInternalAsync(workflow, cancellationToken).ToListAsync(cancellationToken);
 54141    }
 142
 143    private async Task DeleteTriggersAsync(Workflow workflow, CancellationToken cancellationToken = default)
 144    {
 3145        var emptyTriggerList = new List<StoredTrigger>(0);
 3146        var currentTriggers = await GetCurrentTriggersAsync(workflow.Identity.DefinitionId, cancellationToken).ToList();
 3147        var diff = Diff.For(currentTriggers, emptyTriggerList, _triggerEqualityComparer);
 3148        await _triggerStore.ReplaceAsync(diff.Removed, diff.Added, cancellationToken);
 3149        var indexedWorkflow = new IndexedWorkflowTriggers(workflow, emptyTriggerList, currentTriggers, emptyTriggerList)
 3150        await _notificationSender.SendAsync(new WorkflowTriggersIndexed(indexedWorkflow), cancellationToken);
 3151    }
 152
 153    private async Task<IEnumerable<StoredTrigger>> GetCurrentTriggersAsync(string workflowDefinitionId, CancellationToke
 154    {
 1505155        var filter = new TriggerFilter
 1505156        {
 1505157            WorkflowDefinitionId = workflowDefinitionId
 1505158        };
 1505159        return await _triggerStore.FindManyAsync(filter, cancellationToken);
 1505160    }
 161
 162    private async IAsyncEnumerable<StoredTrigger> GetTriggersInternalAsync(Workflow workflow, [EnumeratorCancellation] C
 163    {
 1556164        var context = new WorkflowIndexingContext(workflow, cancellationToken);
 1556165        var nodes = await _activityVisitor.VisitAsync(workflow.Root, cancellationToken);
 166
 167        // Get a list of trigger activities that are configured as "startable".
 1556168        var triggerActivities = nodes
 1556169            .Flatten()
 5283170            .Where(x => x.Activity.GetCanStartWorkflow() && x.Activity is ITrigger)
 326171            .Select(x => x.Activity)
 1556172            .Cast<ITrigger>()
 1556173            .ToList();
 174
 175        // For each trigger activity, create a trigger.
 3764176        foreach (var triggerActivity in triggerActivities)
 177        {
 326178            var triggers = await CreateWorkflowTriggersAsync(context, triggerActivity);
 179
 1412180            foreach (var trigger in triggers)
 380181                yield return trigger;
 182        }
 1556183    }
 184
 185    private async Task<ICollection<StoredTrigger>> CreateWorkflowTriggersAsync(WorkflowIndexingContext context, ITrigger
 186    {
 326187        var workflow = context.Workflow;
 326188        var cancellationToken = context.CancellationToken;
 326189        var activityTypeName = trigger.Type;
 326190        var triggerDescriptor = _activityRegistry.Find(activityTypeName, trigger.Version);
 191
 326192        if (triggerDescriptor == null)
 193        {
 0194            _logger.LogWarning("Could not find activity descriptor for activity type {ActivityType}", activityTypeName);
 0195            return new List<StoredTrigger>(0);
 196        }
 197
 326198        var expressionExecutionContext = await trigger.CreateExpressionExecutionContextAsync(triggerDescriptor, _service
 326199        var triggerIndexingContext = new TriggerIndexingContext(context, expressionExecutionContext, trigger, cancellati
 326200        var triggerData = await TryGetTriggerDataAsync(trigger, triggerIndexingContext);
 326201        var triggerName = triggerIndexingContext.TriggerName;
 202
 203        // If no trigger payloads were returned, create a null payload.
 326204        if (!triggerData.Any()) triggerData.Add(null!);
 205
 706206        var triggers = triggerData.Select(payload => new StoredTrigger
 706207        {
 706208            Id = _identityGenerator.GenerateId(),
 706209            WorkflowDefinitionId = workflow.Identity.DefinitionId,
 706210            WorkflowDefinitionVersionId = workflow.Identity.Id,
 706211            Name = triggerName,
 706212            ActivityId = trigger.Id,
 706213            Hash = _hasher.Hash(triggerName, payload),
 706214            Payload = payload
 706215        });
 216
 326217        return triggers.ToList();
 326218    }
 219
 220    private async Task<List<object>> TryGetTriggerDataAsync(ITrigger trigger, TriggerIndexingContext context)
 221    {
 222        try
 223        {
 326224            return (await trigger.GetTriggerPayloadsAsync(context)).ToList();
 225        }
 0226        catch (Exception e)
 227        {
 0228            _logger.LogWarning(e, "Failed to get trigger data for activity {ActivityId}", trigger.Id);
 0229        }
 230
 0231        return new(0);
 326232    }
 233}