< Summary

Information
Class: Elsa.Workflows.Runtime.Middleware.Activities.BackgroundActivityInvokerMiddleware
Assembly: Elsa.Workflows.Runtime
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Middleware/Activities/BackgroundActivityInvokerMiddleware.cs
Line coverage
87%
Covered lines: 117
Uncovered lines: 16
Coverable lines: 133
Total lines: 260
Line coverage: 87.9%
Branch coverage
61%
Covered branches: 42
Total branches: 68
Branch coverage: 61.7%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Middleware/Activities/BackgroundActivityInvokerMiddleware.cs

#  Line  Line coverage
 1using System.Text.Json;
 2using Elsa.Extensions;
 3using Elsa.Mediator.Contracts;
 4using Elsa.Workflows.Attributes;
 5using Elsa.Workflows.CommitStates;
 6using Elsa.Workflows.Middleware.Activities;
 7using Elsa.Workflows.Models;
 8using Elsa.Workflows.Options;
 9using Elsa.Workflows.Pipelines.ActivityExecution;
 10using Elsa.Workflows.Runtime.Notifications;
 11using Elsa.Workflows.Runtime.Stimuli;
 12using JetBrains.Annotations;
 13using Microsoft.Extensions.Logging;
 14
 15namespace Elsa.Workflows.Runtime.Middleware.Activities;
 16
 17/// <summary>
 18/// Collects the current activity for scheduling for execution from a background job if the activity is of kind <see cre
 19/// </summary>
 20[UsedImplicitly]
 21public class BackgroundActivityInvokerMiddleware(
 22    ActivityMiddlewareDelegate next,
 23    ILogger<BackgroundActivityInvokerMiddleware> logger,
 24    IIdentityGenerator identityGenerator,
 25    IBackgroundActivityScheduler backgroundActivityScheduler,
 26    ICommitStrategyRegistry commitStrategyRegistry,
 27    IMediator mediator)
 43628    : DefaultActivityInvokerMiddleware(next, commitStrategyRegistry, logger)
 29{
 3430    internal static string GetBackgroundActivityOutputKey(string activityNodeId) => $"__BackgroundActivityOutput:{activi
 3431    internal static string GetBackgroundActivityOutcomesKey(string activityNodeId) => $"__BackgroundActivityOutcomes:{ac
 3432    internal static string GetBackgroundActivityCompletedKey(string activityNodeId) => $"__BackgroundActivityCompleted:{
 3433    internal static string GetBackgroundActivityJournalDataKey(string activityNodeId) => $"__BackgroundActivityJournalDa
 3434    internal static string GetBackgroundActivityScheduledActivitiesKey(string activityNodeId) => $"__BackgroundActivityS
 3435    internal static string GetBackgroundActivityBookmarksKey(string activityNodeId) => $"__BackgroundActivityBookmarks:{
 3436    internal static string GetBackgroundActivityPropertiesKey(string activityNodeId) => $"__BackgroundActivityProperties
 37    internal const string BackgroundActivityBookmarkName = "BackgroundActivity";
 38
 39    /// <inheritdoc />
 40    protected override async ValueTask ExecuteActivityAsync(ActivityExecutionContext context)
 41    {
 365342        var shouldRunInBackground = GetShouldRunInBackground(context);
 43
 365344        if (shouldRunInBackground)
 345            await ScheduleBackgroundActivityAsync(context);
 46        else
 47        {
 365048            await base.ExecuteActivityAsync(context);
 49
 50            // This part is either executed from the background or in the foreground when the activity is resumed.
 364251            var isResuming = !GetIsBackgroundExecution(context) && context.ActivityDescriptor.Kind is ActivityKind.Task 
 364252            if (isResuming)
 53            {
 3154                CaptureOutputIfAny(context);
 3155                CaptureJournalData(context);
 3156                CaptureBookmarkData(context);
 3157                CapturePropertiesIfAny(context);
 3158                await CompleteBackgroundActivityOutcomesAsync(context);
 3159                await CompleteBackgroundActivityAsync(context);
 3160                await CompleteBackgroundActivityScheduledActivitiesAsync(context);
 3161                await mediator.SendAsync(new BackgroundActivityExecutionCompleted(context), context.CancellationToken);
 62            }
 63        }
 364564    }
 65
 66    /// <summary>
 67    /// Schedules the current activity for execution in the background.
 68    /// </summary>
 69    private async Task ScheduleBackgroundActivityAsync(ActivityExecutionContext context)
 70    {
 371        var cancellationToken = context.CancellationToken;
 372        var workflowInstanceId = context.WorkflowExecutionContext.Id;
 373        var activityNodeId = context.NodeId;
 374        var bookmarkId = identityGenerator.GenerateId();
 375        var scheduledBackgroundActivity = new ScheduledBackgroundActivity(workflowInstanceId, activityNodeId, bookmarkId
 376        var jobId = await backgroundActivityScheduler.CreateAsync(scheduledBackgroundActivity, cancellationToken);
 377        var stimulus = new BackgroundActivityStimulus
 378        {
 379            JobId = jobId
 380        };
 381        var bookmarkOptions = new CreateBookmarkArgs
 382        {
 383            BookmarkId = bookmarkId,
 384            BookmarkName = BackgroundActivityBookmarkName,
 385            Stimulus = stimulus,
 386            AutoComplete = false
 387        };
 388        context.CreateBookmark(bookmarkOptions);
 89
 390        context.DeferTask(async () =>
 391        {
 392            await backgroundActivityScheduler.ScheduleAsync(jobId, cancellationToken);
 693        });
 394    }
 95
 96    /// <summary>
 97    /// Determines whether the current activity should be executed in the background.
 98    /// </summary>
 99    private static bool GetShouldRunInBackground(ActivityExecutionContext context)
 100    {
 3653101        var activity = context.Activity;
 3653102        var activityDescriptor = context.ActivityDescriptor;
 3653103        var kind = activityDescriptor.Kind;
 104
 3653105        return !GetIsBackgroundExecution(context)
 3653106               && context.WorkflowExecutionContext.ExecuteDelegate == null
 3653107               && (kind is ActivityKind.Job || GetTaskRunAsynchronously(context));
 108    }
 109
 110    private static bool GetTaskRunAsynchronously(ActivityExecutionContext context)
 111    {
 3600112        var activity = context.Activity;
 3600113        var activityDescriptor = context.ActivityDescriptor;
 3600114        var kind = activityDescriptor.Kind;
 115
 3600116        if (kind is not ActivityKind.Task)
 3580117            return false;
 118
 20119        var runAsynchronously = activity.GetRunAsynchronously();
 120
 20121        if (runAsynchronously is null)
 122        {
 8123            var taskActivityAttribute = activityDescriptor.Attributes.OfType<TaskActivityAttribute>().FirstOrDefault();
 124
 8125            return taskActivityAttribute is { RunAsynchronously: true };
 126        }
 127
 12128        return (bool)runAsynchronously;
 129    }
 130
 7295131    private static bool GetIsBackgroundExecution(ActivityExecutionContext context) => context.TransientProperties.Contai
 132
 133    /// <summary>
 134    /// If the workflow execution context holds captured output from the background activity invoker, apply it to the activity execution context.
 135    /// </summary>
 136    private static void CaptureOutputIfAny(ActivityExecutionContext context)
 137    {
 31138        var activity = context.Activity;
 31139        var inputKey = GetBackgroundActivityOutputKey(activity.NodeId);
 31140        var capturedOutput = context.WorkflowExecutionContext.GetProperty<IDictionary<string, object>>(inputKey);
 141
 31142        context.WorkflowExecutionContext.Properties.Remove(inputKey);
 143
 31144        if (capturedOutput == null)
 28145            return;
 146
 18147        foreach (var outputEntry in capturedOutput)
 148        {
 15149            var outputDescriptor = context.ActivityDescriptor.Outputs.FirstOrDefault(x => x.Name == outputEntry.Key);
 150
 6151            if (outputDescriptor == null)
 152                continue;
 153
 6154            var output = (Output?)outputDescriptor.ValueGetter(activity);
 6155            context.Set(output, outputEntry.Value, outputDescriptor.Name);
 156        }
 3157    }
 158
 159    private void CaptureJournalData(ActivityExecutionContext context)
 160    {
 31161        var activity = context.Activity;
 31162        var journalDataKey = GetBackgroundActivityJournalDataKey(activity.NodeId);
 31163        var journalData = context.WorkflowExecutionContext.GetProperty<IDictionary<string, object>>(journalDataKey);
 164
 31165        context.WorkflowExecutionContext.Properties.Remove(journalDataKey);
 166
 31167        if (journalData == null)
 28168            return;
 169
 6170        foreach (var journalEntry in journalData)
 0171            context.JournalData[journalEntry.Key] = journalEntry.Value;
 3172    }
 173
 174    private void CaptureBookmarkData(ActivityExecutionContext context)
 175    {
 31176        var activity = context.Activity;
 31177        var bookmarksKey = GetBackgroundActivityBookmarksKey(activity.NodeId);
 31178        var bookmarks = context.WorkflowExecutionContext.GetProperty<ICollection<Bookmark>>(bookmarksKey);
 31179        if (bookmarks != null)
 180        {
 3181            context.AddBookmarks(bookmarks);
 182        }
 183
 31184        context.WorkflowExecutionContext.Properties.Remove(bookmarksKey);
 31185    }
 186
 187    private void CapturePropertiesIfAny(ActivityExecutionContext context)
 188    {
 31189        var activity = context.Activity;
 31190        var propertiesKey = GetBackgroundActivityPropertiesKey(activity.NodeId);
 31191        var capturedProperties = context.WorkflowExecutionContext.GetProperty<IDictionary<string, object>>(propertiesKey
 192
 31193        context.WorkflowExecutionContext.Properties.Remove(propertiesKey);
 194
 31195        if (capturedProperties == null)
 28196            return;
 197
 12198        foreach (var property in capturedProperties)
 3199            context.Properties[property.Key] = property.Value;
 3200    }
 201
 202    private async Task CompleteBackgroundActivityOutcomesAsync(ActivityExecutionContext context)
 203    {
 31204        var outcomesKey = GetBackgroundActivityOutcomesKey(context.NodeId);
 31205        var outcomes = context.WorkflowExecutionContext.GetProperty<ICollection<string>>(outcomesKey);
 206
 31207        if (outcomes != null)
 208        {
 0209            await context.CompleteActivityWithOutcomesAsync(outcomes.ToArray());
 210        }
 211
 212        // Remove the outcomes from the workflow execution context.
 31213        context.WorkflowExecutionContext.Properties.Remove(outcomesKey);
 31214    }
 215
 216    private async Task CompleteBackgroundActivityAsync(ActivityExecutionContext context)
 217    {
 31218        var completedKey = GetBackgroundActivityCompletedKey(context.NodeId);
 31219        var completed = context.WorkflowExecutionContext.GetProperty<bool?>(completedKey);
 220
 31221        if (completed is true)
 222        {
 3223            await context.CompleteActivityAsync();
 224        }
 225
 226        // Remove the completed flag from the workflow execution context.
 31227        context.WorkflowExecutionContext.Properties.Remove(completedKey);
 31228    }
 229
 230    private async Task CompleteBackgroundActivityScheduledActivitiesAsync(ActivityExecutionContext context)
 231    {
 31232        var scheduledActivitiesKey = GetBackgroundActivityScheduledActivitiesKey(context.NodeId);
 31233        var scheduledActivitiesJson = context.WorkflowExecutionContext.GetProperty<string>(scheduledActivitiesKey);
 31234        var scheduledActivities = scheduledActivitiesJson != null ? JsonSerializer.Deserialize<ICollection<ScheduledActi
 235
 31236        if (scheduledActivities != null)
 237        {
 6238            foreach (var scheduledActivity in scheduledActivities)
 239            {
 0240                var activityNode = scheduledActivity.ActivityNodeId != null ? context.WorkflowExecutionContext.FindActiv
 0241                var owner = scheduledActivity.OwnerActivityInstanceId != null ? context.WorkflowExecutionContext.Activit
 0242                var options = scheduledActivity.Options != null
 0243                    ? new ScheduleWorkOptions
 0244                    {
 0245                        ExistingActivityExecutionContext = scheduledActivity.Options.ExistingActivityInstanceId != null 
 0246                        Variables = scheduledActivity.Options?.Variables,
 0247                        CompletionCallback = !string.IsNullOrEmpty(scheduledActivity.Options?.CompletionCallback) && own
 0248                        PreventDuplicateScheduling = scheduledActivity.Options?.PreventDuplicateScheduling ?? false,
 0249                        Input = scheduledActivity.Options?.Input,
 0250                        Tag = scheduledActivity.Options?.Tag
 0251                    }
 0252                    : default;
 0253                await context.ScheduleActivityAsync(activityNode, owner, options);
 254            }
 255        }
 256
 257        // Remove the scheduled activities from the workflow execution context.
 31258        context.WorkflowExecutionContext.Properties.Remove(scheduledActivitiesKey);
 31259    }
 260}