< Summary

Information
Class: Elsa.Workflows.Runtime.TriggerIndexer
Assembly: Elsa.Workflows.Runtime
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/TriggerIndexer.cs
Line coverage
93%
Covered lines: 107
Uncovered lines: 7
Coverable lines: 114
Total lines: 229
Line coverage: 93.8%
Branch coverage
83%
Covered branches: 20
Total branches: 24
Branch coverage: 83.3%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
|---|---|---|---|---|
| .ctor(...) | 100% | 1 | 1 | 100% |
| DeleteTriggersAsync() | 87.5% | 8 | 8 | 91.66% |
| IndexTriggersAsync() | 100% | 1 | 1 | 100% |
| IndexTriggersAsync() | 100% | 1 | 1 | 100% |
| IndexTriggersInternalAsync() | 50% | 2 | 2 | 100% |
| GetTriggersAsync() | 100% | 1 | 1 | 100% |
| DeleteTriggersAsync() | 100% | 1 | 1 | 100% |
| GetCurrentTriggersAsync() | 100% | 1 | 1 | 100% |
| GetTriggersInternalAsync() | 100% | 8 | 8 | 100% |
| CreateWorkflowTriggersAsync() | 50% | 4 | 4 | 85.71% |
| TryGetTriggerDataAsync() | 100% | 1 | 1 | 33.33% |

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/TriggerIndexer.cs

# | Line | Line coverage
 1using System.Runtime.CompilerServices;
 2using Elsa.Common.DistributedHosting;
 3using Elsa.Expressions.Contracts;
 4using Elsa.Extensions;
 5using Elsa.Mediator.Contracts;
 6using Elsa.Workflows.Activities;
 7using Elsa.Workflows.Helpers;
 8using Elsa.Workflows.Management;
 9using Elsa.Workflows.Management.Entities;
 10using Elsa.Workflows.Runtime.Comparers;
 11using Elsa.Workflows.Runtime.Entities;
 12using Elsa.Workflows.Runtime.Filters;
 13using Elsa.Workflows.Runtime.Notifications;
 14using Medallion.Threading;
 15using Microsoft.Extensions.Logging;
 16using Microsoft.Extensions.Options;
 17using Open.Linq.AsyncExtensions;
 18
 19namespace Elsa.Workflows.Runtime;
 20
 21/// <inheritdoc />
 22public class TriggerIndexer : ITriggerIndexer
 23{
 24    private readonly IActivityVisitor _activityVisitor;
 25    private readonly IWorkflowDefinitionService _workflowDefinitionService;
 26    private readonly IExpressionEvaluator _expressionEvaluator;
 27    private readonly IIdentityGenerator _identityGenerator;
 28    private readonly ITriggerStore _triggerStore;
 29    private readonly IActivityRegistry _activityRegistry;
 30    private readonly INotificationSender _notificationSender;
 31    private readonly IServiceProvider _serviceProvider;
 32    private readonly IStimulusHasher _hasher;
 33    private readonly IDistributedLockProvider _distributedLockProvider;
 34    private readonly DistributedLockingOptions _lockingOptions;
 35    private readonly ILogger _logger;
 36
 37    /// <summary>
 38    /// Initializes a new <see cref="TriggerIndexer"/>, storing each injected dependency in its matching private readonly field.
 39    /// </summary>
 63140    public TriggerIndexer(
 63141        IActivityVisitor activityVisitor,
 63142        IWorkflowDefinitionService workflowDefinitionService,
 63143        IExpressionEvaluator expressionEvaluator,
 63144        IIdentityGenerator identityGenerator,
 63145        ITriggerStore triggerStore,
 63146        IActivityRegistry activityRegistry,
 63147        INotificationSender notificationSender,
 63148        IServiceProvider serviceProvider,
 63149        IStimulusHasher hasher,
 63150        IDistributedLockProvider distributedLockProvider,
 63151        IOptions<DistributedLockingOptions> lockingOptions,
 63152        ILogger<TriggerIndexer> logger)
 53    {
              // Plain field assignments only; no argument validation or other work happens here.
 63154        _activityVisitor = activityVisitor;
 63155        _expressionEvaluator = expressionEvaluator;
 63156        _identityGenerator = identityGenerator;
 63157        _triggerStore = triggerStore;
 63158        _activityRegistry = activityRegistry;
 63159        _notificationSender = notificationSender;
 63160        _serviceProvider = serviceProvider;
 63161        _hasher = hasher;
 63162        _distributedLockProvider = distributedLockProvider;
              // Unwrap the options wrapper once so the field holds the DistributedLockingOptions instance itself.
 63163        _lockingOptions = lockingOptions.Value;
 63164        _logger = logger;
 63165        _workflowDefinitionService = workflowDefinitionService;
 63166    }
 67
 68    /// <inheritdoc />
 69    public async Task DeleteTriggersAsync(TriggerFilter filter, CancellationToken cancellationToken = default)
 70    {
 671        var triggers = (await _triggerStore.FindManyAsync(filter, cancellationToken)).ToList();
 1372        var workflowDefinitionVersionIds = triggers.Select(x => x.WorkflowDefinitionVersionId).Distinct().ToList();
 73
 2674        foreach (var workflowDefinitionVersionId in workflowDefinitionVersionIds)
 75        {
 76            try
 77            {
 778                var workflowGraph = await _workflowDefinitionService.FindWorkflowGraphAsync(workflowDefinitionVersionId,
 79
 380                if (workflowGraph == null)
 081                    continue;
 82
 383                await DeleteTriggersAsync(workflowGraph.Workflow, cancellationToken);
 384            }
 485            catch (Exception ex)
 86            {
 487                _logger.LogWarning(ex, "Failed to load workflow graph for workflow definition version {WorkflowDefinitio
 488            }
 789        }
 690    }
 91
 92    /// <inheritdoc />
 93    public async Task<IndexedWorkflowTriggers> IndexTriggersAsync(WorkflowDefinition definition, CancellationToken cance
 94    {
 125495        var workflowGraph = await _workflowDefinitionService.MaterializeWorkflowAsync(definition, cancellationToken);
 125496        return await IndexTriggersAsync(workflowGraph.Workflow, cancellationToken);
 125497    }
 98
 99    /// <inheritdoc />
 100    public async Task<IndexedWorkflowTriggers> IndexTriggersAsync(Workflow workflow, CancellationToken cancellationToken
 101    {
 102        // Use distributed lock to prevent concurrent trigger indexing race conditions
 1254103        var lockResource = $"trigger-indexer:{workflow.Identity.DefinitionId}";
 1254104        await using (await _distributedLockProvider.AcquireLockAsync(lockResource, _lockingOptions.LockAcquisitionTimeou
 105        {
 1254106            return await IndexTriggersInternalAsync(workflow, cancellationToken);
 107        }
 1254108    }
 109
 110    private async Task<IndexedWorkflowTriggers> IndexTriggersInternalAsync(Workflow workflow, CancellationToken cancella
 111    {
 112        // Get current triggers
 1254113        var currentTriggers = await GetCurrentTriggersAsync(workflow.Identity.DefinitionId, cancellationToken).ToList();
 114
 115        // Collect new triggers **if the workflow is published**.
 1254116        var newTriggers = workflow.Publication.IsPublished
 1254117            ? await GetTriggersInternalAsync(workflow, cancellationToken).ToListAsync(cancellationToken)
 1254118            : new(0);
 119
 120        // Diff triggers.
 1254121        var diff = Diff.For(currentTriggers, newTriggers, new WorkflowTriggerEqualityComparer());
 122
 123        // Replace triggers for the specified workflow.
 1254124        await _triggerStore.ReplaceAsync(diff.Removed, diff.Added, cancellationToken);
 125
 1254126        var indexedWorkflow = new IndexedWorkflowTriggers(workflow, diff.Added, diff.Removed, diff.Unchanged);
 127
 128        // Publish event.
 1254129        await _notificationSender.SendAsync(new WorkflowTriggersIndexed(indexedWorkflow), cancellationToken);
 1254130        return indexedWorkflow;
 1254131    }
 132
 133    /// <inheritdoc />
 134    public async Task<IEnumerable<StoredTrigger>> GetTriggersAsync(Workflow workflow, CancellationToken cancellationToke
 135    {
 54136        return await GetTriggersInternalAsync(workflow, cancellationToken).ToListAsync(cancellationToken);
 54137    }
 138
 139    private async Task DeleteTriggersAsync(Workflow workflow, CancellationToken cancellationToken = default)
 140    {
 3141        var emptyTriggerList = new List<StoredTrigger>(0);
 3142        var currentTriggers = await GetCurrentTriggersAsync(workflow.Identity.DefinitionId, cancellationToken).ToList();
 3143        var diff = Diff.For(currentTriggers, emptyTriggerList, new WorkflowTriggerEqualityComparer());
 3144        await _triggerStore.ReplaceAsync(diff.Removed, diff.Added, cancellationToken);
 3145        var indexedWorkflow = new IndexedWorkflowTriggers(workflow, emptyTriggerList, currentTriggers, emptyTriggerList)
 3146        await _notificationSender.SendAsync(new WorkflowTriggersIndexed(indexedWorkflow), cancellationToken);
 3147    }
 148
 149    private async Task<IEnumerable<StoredTrigger>> GetCurrentTriggersAsync(string workflowDefinitionId, CancellationToke
 150    {
 1257151        var filter = new TriggerFilter
 1257152        {
 1257153            WorkflowDefinitionId = workflowDefinitionId
 1257154        };
 1257155        return await _triggerStore.FindManyAsync(filter, cancellationToken);
 1257156    }
 157
 158    private async IAsyncEnumerable<StoredTrigger> GetTriggersInternalAsync(Workflow workflow, [EnumeratorCancellation] C
 159    {
 1308160        var context = new WorkflowIndexingContext(workflow, cancellationToken);
 1308161        var nodes = await _activityVisitor.VisitAsync(workflow.Root, cancellationToken);
 162
 163        // Get a list of trigger activities that are configured as "startable".
 1308164        var triggerActivities = nodes
 1308165            .Flatten()
 4490166            .Where(x => x.Activity.GetCanStartWorkflow() && x.Activity is ITrigger)
 287167            .Select(x => x.Activity)
 1308168            .Cast<ITrigger>()
 1308169            .ToList();
 170
 171        // For each trigger activity, create a trigger.
 3190172        foreach (var triggerActivity in triggerActivities)
 173        {
 287174            var triggers = await CreateWorkflowTriggersAsync(context, triggerActivity);
 175
 1226176            foreach (var trigger in triggers)
 326177                yield return trigger;
 178        }
 1308179    }
 180
 181    private async Task<ICollection<StoredTrigger>> CreateWorkflowTriggersAsync(WorkflowIndexingContext context, ITrigger
 182    {
 287183        var workflow = context.Workflow;
 287184        var cancellationToken = context.CancellationToken;
 287185        var activityTypeName = trigger.Type;
 287186        var triggerDescriptor = _activityRegistry.Find(activityTypeName, trigger.Version);
 187
 287188        if (triggerDescriptor == null)
 189        {
 0190            _logger.LogWarning("Could not find activity descriptor for activity type {ActivityType}", activityTypeName);
 0191            return new List<StoredTrigger>(0);
 192        }
 193
 287194        var expressionExecutionContext = await trigger.CreateExpressionExecutionContextAsync(triggerDescriptor, _service
 287195        var triggerIndexingContext = new TriggerIndexingContext(context, expressionExecutionContext, trigger, cancellati
 287196        var triggerData = await TryGetTriggerDataAsync(trigger, triggerIndexingContext);
 287197        var triggerName = triggerIndexingContext.TriggerName;
 198
 199        // If no trigger payloads were returned, create a null payload.
 287200        if (!triggerData.Any()) triggerData.Add(null!);
 201
 613202        var triggers = triggerData.Select(payload => new StoredTrigger
 613203        {
 613204            Id = _identityGenerator.GenerateId(),
 613205            WorkflowDefinitionId = workflow.Identity.DefinitionId,
 613206            WorkflowDefinitionVersionId = workflow.Identity.Id,
 613207            Name = triggerName,
 613208            ActivityId = trigger.Id,
 613209            Hash = _hasher.Hash(triggerName, payload),
 613210            Payload = payload
 613211        });
 212
 287213        return triggers.ToList();
 287214    }
 215
 216    private async Task<List<object>> TryGetTriggerDataAsync(ITrigger trigger, TriggerIndexingContext context)
 217    {
            // Best-effort lookup of the payloads the given trigger exposes for indexing.
            // Returns the payloads as a new list, or an empty list if the trigger throws.
 218        try
 219        {
                // Copy the payloads into a List so the caller receives a concrete, mutable collection.
 287220            return (await trigger.GetTriggerPayloadsAsync(context)).ToList();
 221        }
 0222        catch (Exception e)
 223        {
                // Every exception type is caught here and only logged as a warning; nothing is rethrown.
                // NOTE(review): this also covers cancellation exceptions - confirm that swallowing them is intended.
 0224            _logger.LogWarning(e, "Failed to get trigger data for activity {ActivityId}", trigger.Id);
 0225        }
 226
            // Reached only after the catch block ran: fall back to an empty list (capacity 0).
 0227        return new(0);
 287228    }
 229}