< Summary

Information
Class: Elsa.Workflows.Management.Activities.WorkflowDefinitionActivity.WorkflowDefinitionActivity
Assembly: Elsa.Workflows.Management
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Management/Activities/WorkflowDefinitionActivity/WorkflowDefinitionActivity.cs
Line coverage
90%
Covered lines: 93
Uncovered lines: 10
Coverable lines: 103
Total lines: 216
Line coverage: 90.2%
Branch coverage
61%
Covered branches: 26
Total branches: 42
Branch coverage: 61.9%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Management/Activities/WorkflowDefinitionActivity/WorkflowDefinitionActivity.cs

# Line Line coverage
 1using System.ComponentModel;
 2using Elsa.Common.Models;
 3using Elsa.Extensions;
 4using Elsa.Workflows.Activities;
 5using Elsa.Workflows.Management.Entities;
 6using Elsa.Workflows.Management.Filters;
 7using Elsa.Workflows.Memory;
 8using Elsa.Workflows.Models;
 9using Elsa.Workflows.Signals;
 10using Microsoft.Extensions.DependencyInjection;
 11using Microsoft.Extensions.Logging;
 12
 13namespace Elsa.Workflows.Management.Activities.WorkflowDefinitionActivity;
 14
 15/// <summary>
 16/// Loads and executes a <see cref="WorkflowDefinition"/>.
 17/// </summary>
 18[Browsable(false)]
 19public class WorkflowDefinitionActivity : Composite, IInitializable
 20{
 82421    private bool IsInitialized => Root.Id != null!;
 22
 23    /// <summary>
 24    /// The definition ID of the workflow to schedule for execution.
 25    /// </summary>
 180926    public string WorkflowDefinitionId { get; set; } = default!;
 27
 28    /// <summary>
 29    /// The specific version ID of the workflow to schedule for execution. If not set, the <see cref="Version"/> number 
 30    /// </summary>
 233931    public string? WorkflowDefinitionVersionId { get; set; }
 32
 33    /// <summary>
 34    /// The latest published version number set by the provider. Tooling uses this to let the user know that a newer ver
 35    /// </summary>
 123836    public int LatestAvailablePublishedVersion { get; set; }
 37
 38    /// <summary>
 39    /// The latest published version ID set by the provider. Tooling uses this to let the user know that a newer version
 40    /// </summary>
 123841    public string? LatestAvailablePublishedVersionId { get; set; }
 42
 43    async ValueTask IInitializable.InitializeAsync(InitializationContext context)
 44    {
 45        // Returning early when already initialized is not just for efficiency but also a necessity to avoid potential race conditions.
 46        // Such conditions can occur when multiple threads are simultaneously creating consuming workflows,
 47        // especially when cached workflows are being updated during the graph construction process.
 82448        if (IsInitialized)
 27449            return;
 50
 55051        var serviceProvider = context.ServiceProvider;
 55052        var cancellationToken = context.CancellationToken;
 53
 54        // Find the workflow definition and not the graph; the graph must be computed at runtime, since NodeIds will var
 55055        var workflowDefinition = await GetWorkflowDefinitionAsync(serviceProvider, cancellationToken);
 56
 55057        if (workflowDefinition == null)
 058            throw new Exception($"Could not find workflow definition with ID {WorkflowDefinitionId}.");
 59
 55060        var activityDescriptor = await FindActivityDescriptorAsync(serviceProvider);
 61
 55062        if (activityDescriptor == null)
 63        {
 064            var logger = serviceProvider.GetRequiredService<ILogger<WorkflowDefinitionActivity>>();
 065            logger.LogWarning("Could not find activity descriptor for activity type {ActivityType}", Type);
 66        }
 67        else
 68        {
 69            // Declare input and output variables.
 58370            DeclareInputAsVariables(activityDescriptor, (_, variable) => Variables.Declare(variable));
 65371            DeclareOutputAsVariables(activityDescriptor, (_, variable) => Variables.Declare(variable));
 72        }
 73
 55074        var workflowDefinitionService = serviceProvider.GetRequiredService<IWorkflowDefinitionService>();
 55075        var workflowGraph = await workflowDefinitionService.MaterializeWorkflowAsync(workflowDefinition, cancellationTok
 76
 77        // Set the root activity.
 55078        Root = workflowGraph.Workflow;
 82479    }
 80
 81    /// <inheritdoc />
 82    protected override async ValueTask ExecuteAsync(ActivityExecutionContext context)
 83    {
 40384        await CopyInputOutputToVariablesAsync(context);
 40385        await context.ScheduleActivityAsync(Root, OnChildCompletedAsync);
 40386    }
 87
 88    private async ValueTask OnChildCompletedAsync(ActivityCompletedContext context)
 89    {
 40390        var activityExecutionContext = context.TargetContext;
 91
 92        // Do we have a "complete composite" signal that triggered the completion?
 40393        var completeCompositeSignal = context.WorkflowExecutionContext.TransientProperties.TryGetValue(nameof(CompleteCo
 94
 95        // If we do, make sure to remove it from the transient properties.
 40396        if (completeCompositeSignal != null)
 97        {
 098            var logger = context.GetRequiredService<ILogger<WorkflowDefinitionActivity>>();
 099            logger.LogDebug("Received a complete composite signal and removing it from the transient properties");
 0100            context.WorkflowExecutionContext.TransientProperties.Remove(nameof(CompleteCompositeSignal));
 101        }
 102
 103        // Copy any collected outputs into the synthetic properties.
 812104        foreach (var outputDescriptor in activityExecutionContext.ActivityDescriptor.Outputs)
 105        {
 3106            var output = (Output?)outputDescriptor.ValueGetter(activityExecutionContext.Activity);
 107            // If direct output mapping is used, we can read the output value directly from the memory.
 3108            var value = activityExecutionContext.Get(output) ?? activityExecutionContext.Get(outputDescriptor.Name);
 109
 110            // Make sure to select a parent scope to avoid naming collisions between outputs defined on the current scop
 3111            var parentActivityExecutionContext = activityExecutionContext.ParentActivityExecutionContext?.GetAncestors()
 6112                .Any(x => x.ActivityDescriptor.Outputs.Any(y => y.Name == outputDescriptor.Name)) == true
 3113                ? activityExecutionContext.ParentActivityExecutionContext ?? activityExecutionContext
 3114                : activityExecutionContext;
 115
 3116            parentActivityExecutionContext.Set(output, value, outputDescriptor.Name);
 117        }
 118
 119        // Complete this activity with the signal value.
 403120        await activityExecutionContext.CompleteActivityAsync(completeCompositeSignal?.Value);
 403121    }
 122
 123    private async Task CopyInputOutputToVariablesAsync(ActivityExecutionContext context)
 124    {
 403125        var serviceProvider = context.GetRequiredService<IServiceProvider>();
 403126        var activityDescriptor = await FindActivityDescriptorAsync(serviceProvider);
 127
 403128        if (activityDescriptor == null)
 129        {
 0130            var logger = serviceProvider.GetRequiredService<ILogger<WorkflowDefinitionActivity>>();
 0131            logger.LogWarning("Could not find activity descriptor for activity type {ActivityType}", Type);
 0132            return;
 133        }
 134
 403135        DeclareInputAsVariables(activityDescriptor, (descriptor, variable) =>
 403136        {
 1137            var inputName = descriptor.Name;
 1138            var input = SyntheticProperties.TryGetValue(inputName, out var inputValue) ? (Input?)inputValue : default;
 1139            var evaluatedExpression = input != null ? context.Get(input.MemoryBlockReference()) : default;
 403140
 1141            context.ExpressionExecutionContext.Memory.Declare(variable);
 1142            variable.Set(context, evaluatedExpression);
 404143        });
 144
 406145        DeclareOutputAsVariables(activityDescriptor, (_, variable) => context.ExpressionExecutionContext.Memory.Declare(
 403146    }
 147
 148    private void DeclareInputAsVariables(ActivityDescriptor activityDescriptor, Action<InputDescriptor, Variable> config
 149    {
 1974150        foreach (var inputDescriptor in activityDescriptor.Inputs)
 151        {
 34152            var inputName = inputDescriptor.Name;
 34153            var unsafeInputName = PropertyNameHelper.GetUnsafePropertyName(typeof(WorkflowDefinitionActivity), inputName
 34154            var variableType = typeof(Variable<>).MakeGenericType(inputDescriptor.Type);
 34155            var variable = (Variable)Activator.CreateInstance(variableType)!;
 156
 34157            variable.Id = unsafeInputName;
 34158            variable.Name = unsafeInputName;
 34159            variable.StorageDriverType = inputDescriptor.StorageDriverType;
 160
 34161            configureVariable(inputDescriptor, variable);
 162        }
 953163    }
 164
 165    private void DeclareOutputAsVariables(ActivityDescriptor activityDescriptor, Action<OutputDescriptor, Variable> conf
 166    {
 2118167        foreach (var outputDescriptor in activityDescriptor.Outputs)
 168        {
 106169            var outputName = outputDescriptor.Name;
 106170            var unsafeOutputName = PropertyNameHelper.GetUnsafePropertyName(typeof(WorkflowDefinitionActivity), outputNa
 171
 106172            var variable = new Variable
 106173            {
 106174                Id = unsafeOutputName,
 106175                Name = unsafeOutputName
 106176            };
 177
 106178            configureVariable(outputDescriptor, variable);
 179        }
 953180    }
 181
 182    private async Task<WorkflowDefinition?> GetWorkflowDefinitionAsync(IServiceProvider serviceProvider, CancellationTok
 183    {
 550184        var workflowDefinitionService = serviceProvider.GetRequiredService<IWorkflowDefinitionService>();
 550185        var filter = new WorkflowDefinitionFilter
 550186        {
 550187            DefinitionId = WorkflowDefinitionId
 550188        };
 189
 550190        if (!string.IsNullOrWhiteSpace(WorkflowDefinitionVersionId))
 550191            filter.Id = WorkflowDefinitionVersionId;
 192        else
 0193            filter.VersionOptions = VersionOptions.SpecificVersion(Version);
 194
 550195        var workflowDefinition =
 550196            await workflowDefinitionService.FindWorkflowDefinitionAsync(filter, cancellationToken)
 550197            ?? (await workflowDefinitionService.FindWorkflowDefinitionAsync(new WorkflowDefinitionFilter
 550198                {
 550199                    DefinitionId = WorkflowDefinitionId,
 550200                    VersionOptions = VersionOptions.Published
 550201                }, cancellationToken)
 550202                ?? await workflowDefinitionService.FindWorkflowDefinitionAsync(new WorkflowDefinitionFilter
 550203                {
 550204                    DefinitionId = WorkflowDefinitionId,
 550205                    VersionOptions = VersionOptions.Latest
 550206                }, cancellationToken));
 207
 550208        return workflowDefinition;
 550209    }
 210
 211    private async Task<ActivityDescriptor?> FindActivityDescriptorAsync(IServiceProvider serviceProvider)
 212    {
 953213        var activityRegistryLookup = serviceProvider.GetRequiredService<IActivityRegistryLookupService>();
 953214        return await activityRegistryLookup.FindAsync(Type, Version) ?? await activityRegistryLookup.FindAsync(Type);
 953215    }
 216}