< Summary

Information
Class: Elsa.Workflows.Runtime.TriggerIndexer
Assembly: Elsa.Workflows.Runtime
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/TriggerIndexer.cs
Line coverage
93%
Covered lines: 107
Uncovered lines: 7
Coverable lines: 114
Total lines: 229
Line coverage: 93.8%
Branch coverage
83%
Covered branches: 20
Total branches: 24
Branch coverage: 83.3%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
DeleteTriggersAsync()87.5%8891.66%
IndexTriggersAsync()100%11100%
IndexTriggersAsync()100%11100%
IndexTriggersInternalAsync()50%22100%
GetTriggersAsync()100%11100%
DeleteTriggersAsync()100%11100%
GetCurrentTriggersAsync()100%11100%
GetTriggersInternalAsync()100%88100%
CreateWorkflowTriggersAsync()50%4485.71%
TryGetTriggerDataAsync()100%1133.33%

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/TriggerIndexer.cs

#LineLine coverage
 1using System.Runtime.CompilerServices;
 2using Elsa.Common.DistributedHosting;
 3using Elsa.Expressions.Contracts;
 4using Elsa.Extensions;
 5using Elsa.Mediator.Contracts;
 6using Elsa.Workflows.Activities;
 7using Elsa.Workflows.Helpers;
 8using Elsa.Workflows.Management;
 9using Elsa.Workflows.Management.Entities;
 10using Elsa.Workflows.Runtime.Comparers;
 11using Elsa.Workflows.Runtime.Entities;
 12using Elsa.Workflows.Runtime.Filters;
 13using Elsa.Workflows.Runtime.Notifications;
 14using Medallion.Threading;
 15using Microsoft.Extensions.Logging;
 16using Microsoft.Extensions.Options;
 17using Open.Linq.AsyncExtensions;
 18
 19namespace Elsa.Workflows.Runtime;
 20
 21/// <inheritdoc />
 22public class TriggerIndexer : ITriggerIndexer
 23{
 24    private readonly IActivityVisitor _activityVisitor;
 25    private readonly IWorkflowDefinitionService _workflowDefinitionService;
 26    private readonly IExpressionEvaluator _expressionEvaluator;
 27    private readonly IIdentityGenerator _identityGenerator;
 28    private readonly ITriggerStore _triggerStore;
 29    private readonly IActivityRegistry _activityRegistry;
 30    private readonly INotificationSender _notificationSender;
 31    private readonly IServiceProvider _serviceProvider;
 32    private readonly IStimulusHasher _hasher;
 33    private readonly IDistributedLockProvider _distributedLockProvider;
 34    private readonly DistributedLockingOptions _lockingOptions;
 35    private readonly ILogger _logger;
 36
 37    /// <summary>
 38    /// Constructor.
 39    /// </summary>
 60940    public TriggerIndexer(
 60941        IActivityVisitor activityVisitor,
 60942        IWorkflowDefinitionService workflowDefinitionService,
 60943        IExpressionEvaluator expressionEvaluator,
 60944        IIdentityGenerator identityGenerator,
 60945        ITriggerStore triggerStore,
 60946        IActivityRegistry activityRegistry,
 60947        INotificationSender notificationSender,
 60948        IServiceProvider serviceProvider,
 60949        IStimulusHasher hasher,
 60950        IDistributedLockProvider distributedLockProvider,
 60951        IOptions<DistributedLockingOptions> lockingOptions,
 60952        ILogger<TriggerIndexer> logger)
 53    {
 60954        _activityVisitor = activityVisitor;
 60955        _expressionEvaluator = expressionEvaluator;
 60956        _identityGenerator = identityGenerator;
 60957        _triggerStore = triggerStore;
 60958        _activityRegistry = activityRegistry;
 60959        _notificationSender = notificationSender;
 60960        _serviceProvider = serviceProvider;
 60961        _hasher = hasher;
 60962        _distributedLockProvider = distributedLockProvider;
 60963        _lockingOptions = lockingOptions.Value;
 60964        _logger = logger;
 60965        _workflowDefinitionService = workflowDefinitionService;
 60966    }
 67
 68    /// <inheritdoc />
 69    public async Task DeleteTriggersAsync(TriggerFilter filter, CancellationToken cancellationToken = default)
 70    {
 671        var triggers = (await _triggerStore.FindManyAsync(filter, cancellationToken)).ToList();
 1372        var workflowDefinitionVersionIds = triggers.Select(x => x.WorkflowDefinitionVersionId).Distinct().ToList();
 73
 2674        foreach (var workflowDefinitionVersionId in workflowDefinitionVersionIds)
 75        {
 76            try
 77            {
 778                var workflowGraph = await _workflowDefinitionService.FindWorkflowGraphAsync(workflowDefinitionVersionId,
 79
 380                if (workflowGraph == null)
 081                    continue;
 82
 383                await DeleteTriggersAsync(workflowGraph.Workflow, cancellationToken);
 384            }
 485            catch (Exception ex)
 86            {
 487                _logger.LogWarning(ex, "Failed to load workflow graph for workflow definition version {WorkflowDefinitio
 488            }
 789        }
 690    }
 91
 92    /// <inheritdoc />
 93    public async Task<IndexedWorkflowTriggers> IndexTriggersAsync(WorkflowDefinition definition, CancellationToken cance
 94    {
 126695        var workflowGraph = await _workflowDefinitionService.MaterializeWorkflowAsync(definition, cancellationToken);
 126696        return await IndexTriggersAsync(workflowGraph.Workflow, cancellationToken);
 126697    }
 98
 99    /// <inheritdoc />
 100    public async Task<IndexedWorkflowTriggers> IndexTriggersAsync(Workflow workflow, CancellationToken cancellationToken
 101    {
 102        // Use distributed lock to prevent concurrent trigger indexing race conditions
 1266103        var lockResource = $"trigger-indexer:{workflow.Identity.DefinitionId}";
 1266104        await using (await _distributedLockProvider.AcquireLockAsync(lockResource, _lockingOptions.LockAcquisitionTimeou
 105        {
 1266106            return await IndexTriggersInternalAsync(workflow, cancellationToken);
 107        }
 1266108    }
 109
 110    private async Task<IndexedWorkflowTriggers> IndexTriggersInternalAsync(Workflow workflow, CancellationToken cancella
 111    {
 112        // Get current triggers
 1266113        var currentTriggers = await GetCurrentTriggersAsync(workflow.Identity.DefinitionId, cancellationToken).ToList();
 114
 115        // Collect new triggers **if the workflow is published**.
 1266116        var newTriggers = workflow.Publication.IsPublished
 1266117            ? await GetTriggersInternalAsync(workflow, cancellationToken).ToListAsync(cancellationToken)
 1266118            : new(0);
 119
 120        // Diff triggers.
 1266121        var diff = Diff.For(currentTriggers, newTriggers, new WorkflowTriggerEqualityComparer());
 122
 123        // Replace triggers for the specified workflow.
 1266124        await _triggerStore.ReplaceAsync(diff.Removed, diff.Added, cancellationToken);
 125
 1266126        var indexedWorkflow = new IndexedWorkflowTriggers(workflow, diff.Added, diff.Removed, diff.Unchanged);
 127
 128        // Publish event.
 1266129        await _notificationSender.SendAsync(new WorkflowTriggersIndexed(indexedWorkflow), cancellationToken);
 1266130        return indexedWorkflow;
 1266131    }
 132
 133    /// <inheritdoc />
 134    public async Task<IEnumerable<StoredTrigger>> GetTriggersAsync(Workflow workflow, CancellationToken cancellationToke
 135    {
 54136        return await GetTriggersInternalAsync(workflow, cancellationToken).ToListAsync(cancellationToken);
 54137    }
 138
 139    private async Task DeleteTriggersAsync(Workflow workflow, CancellationToken cancellationToken = default)
 140    {
 3141        var emptyTriggerList = new List<StoredTrigger>(0);
 3142        var currentTriggers = await GetCurrentTriggersAsync(workflow.Identity.DefinitionId, cancellationToken).ToList();
 3143        var diff = Diff.For(currentTriggers, emptyTriggerList, new WorkflowTriggerEqualityComparer());
 3144        await _triggerStore.ReplaceAsync(diff.Removed, diff.Added, cancellationToken);
 3145        var indexedWorkflow = new IndexedWorkflowTriggers(workflow, emptyTriggerList, currentTriggers, emptyTriggerList)
 3146        await _notificationSender.SendAsync(new WorkflowTriggersIndexed(indexedWorkflow), cancellationToken);
 3147    }
 148
 149    private async Task<IEnumerable<StoredTrigger>> GetCurrentTriggersAsync(string workflowDefinitionId, CancellationToke
 150    {
 1269151        var filter = new TriggerFilter
 1269152        {
 1269153            WorkflowDefinitionId = workflowDefinitionId
 1269154        };
 1269155        return await _triggerStore.FindManyAsync(filter, cancellationToken);
 1269156    }
 157
 158    private async IAsyncEnumerable<StoredTrigger> GetTriggersInternalAsync(Workflow workflow, [EnumeratorCancellation] C
 159    {
 1320160        var context = new WorkflowIndexingContext(workflow, cancellationToken);
 1320161        var nodes = await _activityVisitor.VisitAsync(workflow.Root, cancellationToken);
 162
 163        // Get a list of trigger activities that are configured as "startable".
 1320164        var triggerActivities = nodes
 1320165            .Flatten()
 4502166            .Where(x => x.Activity.GetCanStartWorkflow() && x.Activity is ITrigger)
 287167            .Select(x => x.Activity)
 1320168            .Cast<ITrigger>()
 1320169            .ToList();
 170
 171        // For each trigger activity, create a trigger.
 3214172        foreach (var triggerActivity in triggerActivities)
 173        {
 287174            var triggers = await CreateWorkflowTriggersAsync(context, triggerActivity);
 175
 1226176            foreach (var trigger in triggers)
 326177                yield return trigger;
 178        }
 1320179    }
 180
 181    private async Task<ICollection<StoredTrigger>> CreateWorkflowTriggersAsync(WorkflowIndexingContext context, ITrigger
 182    {
 287183        var workflow = context.Workflow;
 287184        var cancellationToken = context.CancellationToken;
 287185        var activityTypeName = trigger.Type;
 287186        var triggerDescriptor = _activityRegistry.Find(activityTypeName, trigger.Version);
 187
 287188        if (triggerDescriptor == null)
 189        {
 0190            _logger.LogWarning("Could not find activity descriptor for activity type {ActivityType}", activityTypeName);
 0191            return new List<StoredTrigger>(0);
 192        }
 193
 287194        var expressionExecutionContext = await trigger.CreateExpressionExecutionContextAsync(triggerDescriptor, _service
 287195        var triggerIndexingContext = new TriggerIndexingContext(context, expressionExecutionContext, trigger, cancellati
 287196        var triggerData = await TryGetTriggerDataAsync(trigger, triggerIndexingContext);
 287197        var triggerName = triggerIndexingContext.TriggerName;
 198
 199        // If no trigger payloads were returned, create a null payload.
 287200        if (!triggerData.Any()) triggerData.Add(null!);
 201
 613202        var triggers = triggerData.Select(payload => new StoredTrigger
 613203        {
 613204            Id = _identityGenerator.GenerateId(),
 613205            WorkflowDefinitionId = workflow.Identity.DefinitionId,
 613206            WorkflowDefinitionVersionId = workflow.Identity.Id,
 613207            Name = triggerName,
 613208            ActivityId = trigger.Id,
 613209            Hash = _hasher.Hash(triggerName, payload),
 613210            Payload = payload
 613211        });
 212
 287213        return triggers.ToList();
 287214    }
 215
 216    private async Task<List<object>> TryGetTriggerDataAsync(ITrigger trigger, TriggerIndexingContext context)
 217    {
 218        try
 219        {
 287220            return (await trigger.GetTriggerPayloadsAsync(context)).ToList();
 221        }
 0222        catch (Exception e)
 223        {
 0224            _logger.LogWarning(e, "Failed to get trigger data for activity {ActivityId}", trigger.Id);
 0225        }
 226
 0227        return new(0);
 287228    }
 229}