< Summary

Information
Class: Elsa.Workflows.Runtime.TriggerIndexer
Assembly: Elsa.Workflows.Runtime
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/TriggerIndexer.cs
Line coverage
93%
Covered lines: 107
Uncovered lines: 7
Coverable lines: 114
Total lines: 229
Line coverage: 93.8%
Branch coverage
80%
Covered branches: 16
Total branches: 20
Branch coverage: 80%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

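| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| .ctor(...) | 100% | 1 | 1 | 100% |
| DeleteTriggersAsync() | 87.5% | 8 | 8 | 91.66% |
| IndexTriggersAsync() | 100% | 1 | 1 | 100% |
| IndexTriggersAsync() | 100% | 1 | 1 | 100% |
| IndexTriggersInternalAsync() | 50% | 2 | 2 | 100% |
| GetTriggersAsync() | 100% | 1 | 1 | 100% |
| DeleteTriggersAsync() | 100% | 1 | 1 | 100% |
| GetCurrentTriggersAsync() | 100% | 1 | 1 | 100% |
| GetTriggersInternalAsync() | 100% | 4 | 4 | 100% |
| CreateWorkflowTriggersAsync() | 50% | 4 | 4 | 85.71% |
| TryGetTriggerDataAsync() | 100% | 1 | 1 | 33.33% |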

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/TriggerIndexer.cs

# | Line | Line coverage
 1using System.Runtime.CompilerServices;
 2using Elsa.Common.DistributedHosting;
 3using Elsa.Expressions.Contracts;
 4using Elsa.Extensions;
 5using Elsa.Mediator.Contracts;
 6using Elsa.Workflows.Activities;
 7using Elsa.Workflows.Helpers;
 8using Elsa.Workflows.Management;
 9using Elsa.Workflows.Management.Entities;
 10using Elsa.Workflows.Runtime.Comparers;
 11using Elsa.Workflows.Runtime.Entities;
 12using Elsa.Workflows.Runtime.Filters;
 13using Elsa.Workflows.Runtime.Notifications;
 14using Medallion.Threading;
 15using Microsoft.Extensions.Logging;
 16using Microsoft.Extensions.Options;
 17using Open.Linq.AsyncExtensions;
 18
 19namespace Elsa.Workflows.Runtime;
 20
 21/// <inheritdoc />
 22public class TriggerIndexer : ITriggerIndexer
 23{
 24    private readonly IActivityVisitor _activityVisitor;
 25    private readonly IWorkflowDefinitionService _workflowDefinitionService;
 26    private readonly IExpressionEvaluator _expressionEvaluator;
 27    private readonly IIdentityGenerator _identityGenerator;
 28    private readonly ITriggerStore _triggerStore;
 29    private readonly IActivityRegistry _activityRegistry;
 30    private readonly INotificationSender _notificationSender;
 31    private readonly IServiceProvider _serviceProvider;
 32    private readonly IStimulusHasher _hasher;
 33    private readonly IDistributedLockProvider _distributedLockProvider;
 34    private readonly DistributedLockingOptions _lockingOptions;
 35    private readonly ILogger _logger;
 36
 37    /// <summary>
 38    /// Initializes a new <see cref="TriggerIndexer"/>, storing each injected dependency in its corresponding readonly field.
 39    /// </summary>
 65240    public TriggerIndexer(
 65241        IActivityVisitor activityVisitor,
 65242        IWorkflowDefinitionService workflowDefinitionService,
 65243        IExpressionEvaluator expressionEvaluator,
 65244        IIdentityGenerator identityGenerator,
 65245        ITriggerStore triggerStore,
 65246        IActivityRegistry activityRegistry,
 65247        INotificationSender notificationSender,
 65248        IServiceProvider serviceProvider,
 65249        IStimulusHasher hasher,
 65250        IDistributedLockProvider distributedLockProvider,
 65251        IOptions<DistributedLockingOptions> lockingOptions,
 65252        ILogger<TriggerIndexer> logger)
 53    {
 65254        _activityVisitor = activityVisitor;
 65255        _expressionEvaluator = expressionEvaluator;
 65256        _identityGenerator = identityGenerator;
 65257        _triggerStore = triggerStore;
 65258        _activityRegistry = activityRegistry;
 65259        _notificationSender = notificationSender;
 65260        _serviceProvider = serviceProvider;
 65261        _hasher = hasher;
 65262        _distributedLockProvider = distributedLockProvider;
 65263        _lockingOptions = lockingOptions.Value; // Unwrap the options wrapper once; only the DistributedLockingOptions value is kept.
 65264        _logger = logger;
 65265        _workflowDefinitionService = workflowDefinitionService;
 65266    }
 67
 68    /// <inheritdoc />
 69    public async Task DeleteTriggersAsync(TriggerFilter filter, CancellationToken cancellationToken = default)
 70    {
 671        var triggers = (await _triggerStore.FindManyAsync(filter, cancellationToken)).ToList();
 1372        var workflowDefinitionVersionIds = triggers.Select(x => x.WorkflowDefinitionVersionId).Distinct().ToList();
 73
 2674        foreach (var workflowDefinitionVersionId in workflowDefinitionVersionIds)
 75        {
 76            try
 77            {
 778                var workflowGraph = await _workflowDefinitionService.FindWorkflowGraphAsync(workflowDefinitionVersionId,
 79
 380                if (workflowGraph == null)
 081                    continue;
 82
 383                await DeleteTriggersAsync(workflowGraph.Workflow, cancellationToken);
 384            }
 485            catch (Exception ex)
 86            {
 487                _logger.LogWarning(ex, "Failed to load workflow graph for workflow definition version {WorkflowDefinitio
 488            }
 789        }
 690    }
 91
 92    /// <inheritdoc />
 93    public async Task<IndexedWorkflowTriggers> IndexTriggersAsync(WorkflowDefinition definition, CancellationToken cance
 94    {
 148495        var workflowGraph = await _workflowDefinitionService.MaterializeWorkflowAsync(definition, cancellationToken);
 148496        return await IndexTriggersAsync(workflowGraph.Workflow, cancellationToken);
 148497    }
 98
 99    /// <inheritdoc />
 100    public async Task<IndexedWorkflowTriggers> IndexTriggersAsync(Workflow workflow, CancellationToken cancellationToken
 101    {
 102        // Use distributed lock to prevent concurrent trigger indexing race conditions
 1484103        var lockResource = $"trigger-indexer:{workflow.Identity.DefinitionId}";
 1484104        await using (await _distributedLockProvider.AcquireLockAsync(lockResource, _lockingOptions.LockAcquisitionTimeou
 105        {
 1484106            return await IndexTriggersInternalAsync(workflow, cancellationToken);
 107        }
 1484108    }
 109
 110    private async Task<IndexedWorkflowTriggers> IndexTriggersInternalAsync(Workflow workflow, CancellationToken cancella
 111    {
 112        // Get current triggers
 1484113        var currentTriggers = await GetCurrentTriggersAsync(workflow.Identity.DefinitionId, cancellationToken).ToList();
 114
 115        // Collect new triggers **if the workflow is published**.
 1484116        var newTriggers = workflow.Publication.IsPublished
 1484117            ? await GetTriggersInternalAsync(workflow, cancellationToken).ToListAsync(cancellationToken)
 1484118            : new(0);
 119
 120        // Diff triggers.
 1484121        var diff = Diff.For(currentTriggers, newTriggers, new WorkflowTriggerEqualityComparer());
 122
 123        // Replace triggers for the specified workflow.
 1484124        await _triggerStore.ReplaceAsync(diff.Removed, diff.Added, cancellationToken);
 125
 1484126        var indexedWorkflow = new IndexedWorkflowTriggers(workflow, diff.Added, diff.Removed, diff.Unchanged);
 127
 128        // Publish event.
 1484129        await _notificationSender.SendAsync(new WorkflowTriggersIndexed(indexedWorkflow), cancellationToken);
 1484130        return indexedWorkflow;
 1484131    }
 132
 133    /// <inheritdoc />
 134    public async Task<IEnumerable<StoredTrigger>> GetTriggersAsync(Workflow workflow, CancellationToken cancellationToke
 135    {
 54136        return await GetTriggersInternalAsync(workflow, cancellationToken).ToListAsync(cancellationToken);
 54137    }
 138
 139    private async Task DeleteTriggersAsync(Workflow workflow, CancellationToken cancellationToken = default)
 140    {
 3141        var emptyTriggerList = new List<StoredTrigger>(0);
 3142        var currentTriggers = await GetCurrentTriggersAsync(workflow.Identity.DefinitionId, cancellationToken).ToList();
 3143        var diff = Diff.For(currentTriggers, emptyTriggerList, new WorkflowTriggerEqualityComparer());
 3144        await _triggerStore.ReplaceAsync(diff.Removed, diff.Added, cancellationToken);
 3145        var indexedWorkflow = new IndexedWorkflowTriggers(workflow, emptyTriggerList, currentTriggers, emptyTriggerList)
 3146        await _notificationSender.SendAsync(new WorkflowTriggersIndexed(indexedWorkflow), cancellationToken);
 3147    }
 148
 149    private async Task<IEnumerable<StoredTrigger>> GetCurrentTriggersAsync(string workflowDefinitionId, CancellationToke
 150    {
 1487151        var filter = new TriggerFilter
 1487152        {
 1487153            WorkflowDefinitionId = workflowDefinitionId
 1487154        };
 1487155        return await _triggerStore.FindManyAsync(filter, cancellationToken);
 1487156    }
 157
 158    private async IAsyncEnumerable<StoredTrigger> GetTriggersInternalAsync(Workflow workflow, [EnumeratorCancellation] C
 159    {
 1538160        var context = new WorkflowIndexingContext(workflow, cancellationToken);
 1538161        var nodes = await _activityVisitor.VisitAsync(workflow.Root, cancellationToken);
 162
 163        // Get a list of trigger activities that are configured as "startable".
 1538164        var triggerActivities = nodes
 1538165            .Flatten()
 5229166            .Where(x => x.Activity.GetCanStartWorkflow() && x.Activity is ITrigger)
 308167            .Select(x => x.Activity)
 1538168            .Cast<ITrigger>()
 1538169            .ToList();
 170
 171        // For each trigger activity, create a trigger.
 3692172        foreach (var triggerActivity in triggerActivities)
 173        {
 308174            var triggers = await CreateWorkflowTriggersAsync(context, triggerActivity);
 175
 1340176            foreach (var trigger in triggers)
 362177                yield return trigger;
 178        }
 1538179    }
 180
 181    private async Task<ICollection<StoredTrigger>> CreateWorkflowTriggersAsync(WorkflowIndexingContext context, ITrigger
 182    {
 308183        var workflow = context.Workflow;
 308184        var cancellationToken = context.CancellationToken;
 308185        var activityTypeName = trigger.Type;
 308186        var triggerDescriptor = _activityRegistry.Find(activityTypeName, trigger.Version);
 187
 308188        if (triggerDescriptor == null)
 189        {
 0190            _logger.LogWarning("Could not find activity descriptor for activity type {ActivityType}", activityTypeName);
 0191            return new List<StoredTrigger>(0);
 192        }
 193
 308194        var expressionExecutionContext = await trigger.CreateExpressionExecutionContextAsync(triggerDescriptor, _service
 308195        var triggerIndexingContext = new TriggerIndexingContext(context, expressionExecutionContext, trigger, cancellati
 308196        var triggerData = await TryGetTriggerDataAsync(trigger, triggerIndexingContext);
 308197        var triggerName = triggerIndexingContext.TriggerName;
 198
 199        // If no trigger payloads were returned, create a null payload.
 308200        if (!triggerData.Any()) triggerData.Add(null!);
 201
 670202        var triggers = triggerData.Select(payload => new StoredTrigger
 670203        {
 670204            Id = _identityGenerator.GenerateId(),
 670205            WorkflowDefinitionId = workflow.Identity.DefinitionId,
 670206            WorkflowDefinitionVersionId = workflow.Identity.Id,
 670207            Name = triggerName,
 670208            ActivityId = trigger.Id,
 670209            Hash = _hasher.Hash(triggerName, payload),
 670210            Payload = payload
 670211        });
 212
 308213        return triggers.ToList();
 308214    }
 215
 216    private async Task<List<object>> TryGetTriggerDataAsync(ITrigger trigger, TriggerIndexingContext context) // Best-effort: returns the trigger's payloads, or an empty list if retrieving them throws.
 217    {
 218        try
 219        {
 308220            return (await trigger.GetTriggerPayloadsAsync(context)).ToList(); // Materialized into a List so the caller can append to it (CreateWorkflowTriggersAsync adds a null payload when it is empty).
 221        }
 0222        catch (Exception e) // Catch-all. NOTE(review): this also catches cancellation exceptions - confirm that swallowing them is intended.
 223        {
 0224            _logger.LogWarning(e, "Failed to get trigger data for activity {ActivityId}", trigger.Id); // Logged as a warning and not rethrown.
 0225        }
 226
 0227        return new(0); // Only reached after a caught exception: an empty list created with capacity 0.
 308228    }
 229}