Summary

Information
Class: Elsa.Workflows.Runtime.TriggerIndexer
Assembly: Elsa.Workflows.Runtime
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/TriggerIndexer.cs
Line coverage
93%
Covered lines: 107
Uncovered lines: 7
Coverable lines: 114
Total lines: 229
Line coverage: 93.8%
Branch coverage
83%
Covered branches: 20
Total branches: 24
Branch coverage: 83.3%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
|---|---|---|---|---|
| .ctor(...) | 100% | 1 | 1 | 100% |
| DeleteTriggersAsync() | 87.5% | 8 | 8 | 91.66% |
| IndexTriggersAsync() | 100% | 1 | 1 | 100% |
| IndexTriggersAsync() | 100% | 1 | 1 | 100% |
| IndexTriggersInternalAsync() | 50% | 2 | 2 | 100% |
| GetTriggersAsync() | 100% | 1 | 1 | 100% |
| DeleteTriggersAsync() | 100% | 1 | 1 | 100% |
| GetCurrentTriggersAsync() | 100% | 1 | 1 | 100% |
| GetTriggersInternalAsync() | 100% | 8 | 8 | 100% |
| CreateWorkflowTriggersAsync() | 50% | 4 | 4 | 85.71% |
| TryGetTriggerDataAsync() | 100% | 1 | 1 | 33.33% |

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/TriggerIndexer.cs

# (hits) | Line | Line coverage
 1using System.Runtime.CompilerServices;
 2using Elsa.Common.DistributedHosting;
 3using Elsa.Expressions.Contracts;
 4using Elsa.Extensions;
 5using Elsa.Mediator.Contracts;
 6using Elsa.Workflows.Activities;
 7using Elsa.Workflows.Helpers;
 8using Elsa.Workflows.Management;
 9using Elsa.Workflows.Management.Entities;
 10using Elsa.Workflows.Runtime.Comparers;
 11using Elsa.Workflows.Runtime.Entities;
 12using Elsa.Workflows.Runtime.Filters;
 13using Elsa.Workflows.Runtime.Notifications;
 14using Medallion.Threading;
 15using Microsoft.Extensions.Logging;
 16using Microsoft.Extensions.Options;
 17using Open.Linq.AsyncExtensions;
 18
 19namespace Elsa.Workflows.Runtime;
 20
 21/// <inheritdoc />
 22public class TriggerIndexer : ITriggerIndexer
 23{
 24    private readonly IActivityVisitor _activityVisitor;
 25    private readonly IWorkflowDefinitionService _workflowDefinitionService;
 26    private readonly IExpressionEvaluator _expressionEvaluator;
 27    private readonly IIdentityGenerator _identityGenerator;
 28    private readonly ITriggerStore _triggerStore;
 29    private readonly IActivityRegistry _activityRegistry;
 30    private readonly INotificationSender _notificationSender;
 31    private readonly IServiceProvider _serviceProvider;
 32    private readonly IStimulusHasher _hasher;
 33    private readonly IDistributedLockProvider _distributedLockProvider;
 34    private readonly DistributedLockingOptions _lockingOptions;
 35    private readonly ILogger _logger;
 36
 37    /// <summary>
 38    /// Initializes a new <see cref="TriggerIndexer"/> by storing each injected dependency in its matching private field.
       /// The <paramref name="lockingOptions"/> wrapper is unwrapped once here; only its <c>Value</c> is retained.
 39    /// </summary>
 64540    public TriggerIndexer(
 64541        IActivityVisitor activityVisitor,
 64542        IWorkflowDefinitionService workflowDefinitionService,
 64543        IExpressionEvaluator expressionEvaluator,
 64544        IIdentityGenerator identityGenerator,
 64545        ITriggerStore triggerStore,
 64546        IActivityRegistry activityRegistry,
 64547        INotificationSender notificationSender,
 64548        IServiceProvider serviceProvider,
 64549        IStimulusHasher hasher,
 64550        IDistributedLockProvider distributedLockProvider,
 64551        IOptions<DistributedLockingOptions> lockingOptions,
 64552        ILogger<TriggerIndexer> logger)
 53    {
 64554        _activityVisitor = activityVisitor;
 64555        _expressionEvaluator = expressionEvaluator;
 64556        _identityGenerator = identityGenerator;
 64557        _triggerStore = triggerStore;
 64558        _activityRegistry = activityRegistry;
 64559        _notificationSender = notificationSender;
 64560        _serviceProvider = serviceProvider;
 64561        _hasher = hasher;
 64562        _distributedLockProvider = distributedLockProvider;
 64563        _lockingOptions = lockingOptions.Value;
 64564        _logger = logger;
 64565        _workflowDefinitionService = workflowDefinitionService;
 64566    }
 67
 68    /// <inheritdoc />
       /// <remarks>
       /// Loads the stored triggers matching <paramref name="filter"/>, collects their distinct
       /// <c>WorkflowDefinitionVersionId</c> values, and for each one looks up the workflow graph and passes its
       /// workflow to the private <c>DeleteTriggersAsync(Workflow, ...)</c> overload. Version IDs whose graph lookup
       /// returns null are skipped. An exception thrown inside one iteration is logged as a warning and does not
       /// stop the remaining iterations.
       /// NOTE(review): the private overload selects triggers by workflow definition ID only, so it appears to remove
       /// every stored trigger of that definition rather than only those matched by the filter - confirm intended.
       /// NOTE(review): unlike IndexTriggersAsync, no distributed lock is acquired on this path - confirm intended.
       /// </remarks>
 69    public async Task DeleteTriggersAsync(TriggerFilter filter, CancellationToken cancellationToken = default)
 70    {
 671        var triggers = (await _triggerStore.FindManyAsync(filter, cancellationToken)).ToList();
 1372        var workflowDefinitionVersionIds = triggers.Select(x => x.WorkflowDefinitionVersionId).Distinct().ToList();
 73
 2674        foreach (var workflowDefinitionVersionId in workflowDefinitionVersionIds)
 75        {
               // NOTE(review): this try block covers the delete call as well as the graph lookup, so a failure while
               // deleting is reported through the same warning whose message only mentions loading the workflow graph.
 76            try
 77            {
 778                var workflowGraph = await _workflowDefinitionService.FindWorkflowGraphAsync(workflowDefinitionVersionId,
 79
                   // No graph for this version: nothing to delete through, move on to the next version ID.
 380                if (workflowGraph == null)
 081                    continue;
 82
 383                await DeleteTriggersAsync(workflowGraph.Workflow, cancellationToken);
 384            }
 485            catch (Exception ex)
 86            {
 487                _logger.LogWarning(ex, "Failed to load workflow graph for workflow definition version {WorkflowDefinitio
 488            }
 789        }
 690    }
 91
 92    /// <inheritdoc />
       /// <remarks>
       /// Materializes <paramref name="definition"/> into a workflow graph via the workflow definition service and
       /// forwards that graph's <c>Workflow</c> to the <c>IndexTriggersAsync(Workflow, ...)</c> overload, returning its result.
       /// </remarks>
 93    public async Task<IndexedWorkflowTriggers> IndexTriggersAsync(WorkflowDefinition definition, CancellationToken cance
 94    {
 148295        var workflowGraph = await _workflowDefinitionService.MaterializeWorkflowAsync(definition, cancellationToken);
 148296        return await IndexTriggersAsync(workflowGraph.Workflow, cancellationToken);
 148297    }
 98
 99    /// <inheritdoc />
       /// <remarks>
       /// Runs <c>IndexTriggersInternalAsync</c> while holding a distributed lock whose resource name is
       /// <c>trigger-indexer:</c> followed by the workflow's definition ID, so the lock is scoped per workflow definition.
       /// The lock handle is disposed when the <c>await using</c> block exits, including when the inner call throws.
       /// NOTE(review): the acquisition call is cut off in this listing; the timeout appears to come from
       /// <c>_lockingOptions</c> - confirm the remaining arguments against the source file.
       /// </remarks>
 100    public async Task<IndexedWorkflowTriggers> IndexTriggersAsync(Workflow workflow, CancellationToken cancellationToken
 101    {
 102        // Use distributed lock to prevent concurrent trigger indexing race conditions
 1482103        var lockResource = $"trigger-indexer:{workflow.Identity.DefinitionId}";
 1482104        await using (await _distributedLockProvider.AcquireLockAsync(lockResource, _lockingOptions.LockAcquisitionTimeou
 105        {
 1482106            return await IndexTriggersInternalAsync(workflow, cancellationToken);
 107        }
 1482108    }
 109
 110    private async Task<IndexedWorkflowTriggers> IndexTriggersInternalAsync(Workflow workflow, CancellationToken cancella
 111    {
            // Re-indexes the triggers of one workflow: loads what is stored for its definition ID, computes what the
            // workflow should have now, diffs the two sets, writes the difference to the store and publishes a
            // WorkflowTriggersIndexed notification. This method takes no lock itself; the public
            // IndexTriggersAsync(Workflow, ...) overload wraps it in a distributed lock.
 112        // Get current triggers
            // Stored triggers are looked up by workflow definition ID only (see GetCurrentTriggersAsync).
            // NOTE(review): ToList() is applied to the returned task before the await - presumably the task
            // extension from Open.Linq.AsyncExtensions; confirm.
 1482113        var currentTriggers = await GetCurrentTriggersAsync(workflow.Identity.DefinitionId, cancellationToken).ToList();
 114
 115        // Collect new triggers **if the workflow is published**.
            // For an unpublished workflow the new set is an empty list, so the diff below is taken against nothing.
 1482116        var newTriggers = workflow.Publication.IsPublished
 1482117            ? await GetTriggersInternalAsync(workflow, cancellationToken).ToListAsync(cancellationToken)
 1482118            : new(0);
 119
 120        // Diff triggers.
 1482121        var diff = Diff.For(currentTriggers, newTriggers, new WorkflowTriggerEqualityComparer());
 122
 123        // Replace triggers for the specified workflow.
 1482124        await _triggerStore.ReplaceAsync(diff.Removed, diff.Added, cancellationToken);
 125
 1482126        var indexedWorkflow = new IndexedWorkflowTriggers(workflow, diff.Added, diff.Removed, diff.Unchanged);
 127
 128        // Publish event.
            // The notification is sent unconditionally, i.e. also when the diff contains no additions or removals.
 1482129        await _notificationSender.SendAsync(new WorkflowTriggersIndexed(indexedWorkflow), cancellationToken);
 1482130        return indexedWorkflow;
 1482131    }
 132
 133    /// <inheritdoc />
        /// <remarks>
        /// Computes the triggers for <paramref name="workflow"/> by draining <c>GetTriggersInternalAsync</c> into a list.
        /// This method itself neither reads from nor writes to the trigger store, and it does not check whether the
        /// workflow is published.
        /// </remarks>
 134    public async Task<IEnumerable<StoredTrigger>> GetTriggersAsync(Workflow workflow, CancellationToken cancellationToke
 135    {
 54136        return await GetTriggersInternalAsync(workflow, cancellationToken).ToListAsync(cancellationToken);
 54137    }
 138
 139    private async Task DeleteTriggersAsync(Workflow workflow, CancellationToken cancellationToken = default)
 140    {
            // Removes the stored triggers of the given workflow's definition: the current triggers (looked up by
            // definition ID) are diffed against an empty list, the resulting removals/additions are written to the
            // store, and a WorkflowTriggersIndexed notification is published.
            // The notification payload is built from the workflow, the empty list, the current triggers and the empty
            // list again; judging by the constructor call in IndexTriggersInternalAsync these positions are
            // added / removed / unchanged - TODO confirm against IndexedWorkflowTriggers.
 3141        var emptyTriggerList = new List<StoredTrigger>(0);
 3142        var currentTriggers = await GetCurrentTriggersAsync(workflow.Identity.DefinitionId, cancellationToken).ToList();
 3143        var diff = Diff.For(currentTriggers, emptyTriggerList, new WorkflowTriggerEqualityComparer());
 3144        await _triggerStore.ReplaceAsync(diff.Removed, diff.Added, cancellationToken);
 3145        var indexedWorkflow = new IndexedWorkflowTriggers(workflow, emptyTriggerList, currentTriggers, emptyTriggerList)
 3146        await _notificationSender.SendAsync(new WorkflowTriggersIndexed(indexedWorkflow), cancellationToken);
 3147    }
 148
 149    private async Task<IEnumerable<StoredTrigger>> GetCurrentTriggersAsync(string workflowDefinitionId, CancellationToke
 150    {
            // Returns the triggers currently held by the trigger store for the given workflow definition ID.
            // The filter sets WorkflowDefinitionId and nothing else, so no version-specific restriction is applied here.
 1485151        var filter = new TriggerFilter
 1485152        {
 1485153            WorkflowDefinitionId = workflowDefinitionId
 1485154        };
 1485155        return await _triggerStore.FindManyAsync(filter, cancellationToken);
 1485156    }
 157
 158    private async IAsyncEnumerable<StoredTrigger> GetTriggersInternalAsync(Workflow workflow, [EnumeratorCancellation] C
 159    {
            // Streams the StoredTrigger records derived from the workflow: the activity tree under workflow.Root is
            // visited and flattened, the startable trigger activities are selected, and the records produced for each
            // of them by CreateWorkflowTriggersAsync are yielded one at a time.
            // The same WorkflowIndexingContext instance is shared by all trigger activities of this enumeration.
 1536160        var context = new WorkflowIndexingContext(workflow, cancellationToken);
 1536161        var nodes = await _activityVisitor.VisitAsync(workflow.Root, cancellationToken);
 162
 163        // Get a list of trigger activities that are configured as "startable".
            // An activity qualifies only if GetCanStartWorkflow() is true AND it implements ITrigger.
 1536164        var triggerActivities = nodes
 1536165            .Flatten()
 5227166            .Where(x => x.Activity.GetCanStartWorkflow() && x.Activity is ITrigger)
 308167            .Select(x => x.Activity)
 1536168            .Cast<ITrigger>()
 1536169            .ToList();
 170
 171        // For each trigger activity, create a trigger.
            // One trigger activity can produce several StoredTrigger records (one per payload).
 3688172        foreach (var triggerActivity in triggerActivities)
 173        {
 308174            var triggers = await CreateWorkflowTriggersAsync(context, triggerActivity);
 175
 1340176            foreach (var trigger in triggers)
 362177                yield return trigger;
 178        }
 1536179    }
 180
 181    private async Task<ICollection<StoredTrigger>> CreateWorkflowTriggersAsync(WorkflowIndexingContext context, ITrigger
 182    {
            // Builds the StoredTrigger records for a single trigger activity: one record per payload returned by the
            // trigger, or a single record with a null payload when the trigger returns none (or payload retrieval fails,
            // since TryGetTriggerDataAsync then yields an empty list).
 308183        var workflow = context.Workflow;
 308184        var cancellationToken = context.CancellationToken;
 308185        var activityTypeName = trigger.Type;
 308186        var triggerDescriptor = _activityRegistry.Find(activityTypeName, trigger.Version);
 187
            // Without a registered descriptor for this type/version no records are produced; only a warning is logged.
 308188        if (triggerDescriptor == null)
 189        {
 0190            _logger.LogWarning("Could not find activity descriptor for activity type {ActivityType}", activityTypeName);
 0191            return new List<StoredTrigger>(0);
 192        }
 193
 308194        var expressionExecutionContext = await trigger.CreateExpressionExecutionContextAsync(triggerDescriptor, _service
 308195        var triggerIndexingContext = new TriggerIndexingContext(context, expressionExecutionContext, trigger, cancellati
 308196        var triggerData = await TryGetTriggerDataAsync(trigger, triggerIndexingContext);
            // NOTE(review): TriggerName is read only after the payloads were requested - presumably because the trigger
            // may set it on the indexing context while producing payloads; confirm before reordering.
 308197        var triggerName = triggerIndexingContext.TriggerName;
 198
 199        // If no trigger payloads were returned, create a null payload.
 308200        if (!triggerData.Any()) triggerData.Add(null!);
 201
            // Each record gets a freshly generated Id and a hash computed from the trigger name and its payload.
            // The projection is lazy; it is evaluated exactly once by the ToList() call in the return statement.
 670202        var triggers = triggerData.Select(payload => new StoredTrigger
 670203        {
 670204            Id = _identityGenerator.GenerateId(),
 670205            WorkflowDefinitionId = workflow.Identity.DefinitionId,
 670206            WorkflowDefinitionVersionId = workflow.Identity.Id,
 670207            Name = triggerName,
 670208            ActivityId = trigger.Id,
 670209            Hash = _hasher.Hash(triggerName, payload),
 670210            Payload = payload
 670211        });
 212
 308213        return triggers.ToList();
 308214    }
 215
 216    private async Task<List<object>> TryGetTriggerDataAsync(ITrigger trigger, TriggerIndexingContext context)
 217    {
            // Best-effort retrieval of the trigger's payloads: returns them as a mutable list, or an empty list when
            // GetTriggerPayloadsAsync throws. The failure is logged as a warning with the trigger's activity ID and is
            // not rethrown.
            // NOTE(review): the catch clause is for Exception, so cancellation exceptions raised by the trigger are
            // swallowed here as well - confirm this is intended.
 218        try
 219        {
 308220            return (await trigger.GetTriggerPayloadsAsync(context)).ToList();
 221        }
 0222        catch (Exception e)
 223        {
 0224            _logger.LogWarning(e, "Failed to get trigger data for activity {ActivityId}", trigger.Id);
 0225        }
 226
            // Reached only after an exception was caught above.
 0227        return new(0);
 308228    }
 229}