< Summary

Information
Class: Elsa.Workflows.Runtime.Middleware.Activities.BackgroundActivityInvokerMiddleware
Assembly: Elsa.Workflows.Runtime
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Middleware/Activities/BackgroundActivityInvokerMiddleware.cs
Line coverage
87%
Covered lines: 114
Uncovered lines: 16
Coverable lines: 130
Total lines: 254
Line coverage: 87.6%
Branch coverage
62%
Covered branches: 41
Total branches: 66
Branch coverage: 62.1%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Middleware/Activities/BackgroundActivityInvokerMiddleware.cs

# | Line | Line coverage
 1using System.Text.Json;
 2using Elsa.Extensions;
 3using Elsa.Mediator.Contracts;
 4using Elsa.Workflows.Attributes;
 5using Elsa.Workflows.CommitStates;
 6using Elsa.Workflows.Middleware.Activities;
 7using Elsa.Workflows.Models;
 8using Elsa.Workflows.Options;
 9using Elsa.Workflows.Pipelines.ActivityExecution;
 10using Elsa.Workflows.Runtime.Notifications;
 11using Elsa.Workflows.Runtime.Stimuli;
 12using JetBrains.Annotations;
 13using Microsoft.Extensions.Logging;
 14using Microsoft.Extensions.Options;
 15
 16namespace Elsa.Workflows.Runtime.Middleware.Activities;
 17
 18/// <summary>
 19/// Collects the current activity for scheduling for execution from a background job if the activity is of kind <see cre
 20/// </summary>
 21[UsedImplicitly]
 22public class BackgroundActivityInvokerMiddleware(
 23    ActivityMiddlewareDelegate next,
 24    ILogger<BackgroundActivityInvokerMiddleware> logger,
 25    IIdentityGenerator identityGenerator,
 26    IBackgroundActivityScheduler backgroundActivityScheduler,
 27    ICommitStrategyRegistry commitStrategyRegistry,
 28    IMediator mediator,
 29    IOptions<CommitStateOptions> commitStateOptions)
 52530    : DefaultActivityInvokerMiddleware(next, commitStrategyRegistry, commitStateOptions, logger)
 31{
 3432    internal static string GetBackgroundActivityOutputKey(string activityNodeId) => $"__BackgroundActivityOutput:{activi
 3433    internal static string GetBackgroundActivityOutcomesKey(string activityNodeId) => $"__BackgroundActivityOutcomes:{ac
 3434    internal static string GetBackgroundActivityCompletedKey(string activityNodeId) => $"__BackgroundActivityCompleted:{
 3435    internal static string GetBackgroundActivityJournalDataKey(string activityNodeId) => $"__BackgroundActivityJournalDa
 3436    internal static string GetBackgroundActivityScheduledActivitiesKey(string activityNodeId) => $"__BackgroundActivityS
 3437    internal static string GetBackgroundActivityBookmarksKey(string activityNodeId) => $"__BackgroundActivityBookmarks:{
 3438    internal static string GetBackgroundActivityPropertiesKey(string activityNodeId) => $"__BackgroundActivityProperties
 39    internal const string BackgroundActivityBookmarkName = "BackgroundActivity";
 40
 41    /// <inheritdoc />
 42    protected override async ValueTask ExecuteActivityAsync(ActivityExecutionContext context)
 43    {
 406344        var shouldRunInBackground = GetShouldRunInBackground(context);
 45
 406346        if (shouldRunInBackground)
 347            await ScheduleBackgroundActivityAsync(context);
 48        else
 49        {
 406050            await base.ExecuteActivityAsync(context);
 51
 52            // This part is either executed from the background or in the foreground when the activity is resumed.
 405053            var isResuming = !GetIsBackgroundExecution(context) && context.ActivityDescriptor.Kind is ActivityKind.Task 
 405054            if (isResuming)
 55            {
 3156                CaptureOutputIfAny(context);
 3157                CaptureJournalData(context);
 3158                CaptureBookmarkData(context);
 3159                CapturePropertiesIfAny(context);
 3160                await CompleteBackgroundActivityOutcomesAsync(context);
 3161                await CompleteBackgroundActivityAsync(context);
 3162                await CompleteBackgroundActivityScheduledActivitiesAsync(context);
 3163                await mediator.SendAsync(new BackgroundActivityExecutionCompleted(context), context.CancellationToken);
 64            }
 65        }
 405366    }
 67
 68    /// <summary>
 69    /// Schedules the current activity for execution in the background.
 70    /// </summary>
 71    private async Task ScheduleBackgroundActivityAsync(ActivityExecutionContext context)
 72    {
 373        var cancellationToken = context.CancellationToken;
 374        var workflowInstanceId = context.WorkflowExecutionContext.Id;
 375        var activityNodeId = context.NodeId;
 376        var bookmarkId = identityGenerator.GenerateId();
 377        var scheduledBackgroundActivity = new ScheduledBackgroundActivity(workflowInstanceId, activityNodeId, bookmarkId
 378        var jobId = await backgroundActivityScheduler.CreateAsync(scheduledBackgroundActivity, cancellationToken);
 379        var stimulus = new BackgroundActivityStimulus
 380        {
 381            JobId = jobId
 382        };
 383        var bookmarkOptions = new CreateBookmarkArgs
 384        {
 385            BookmarkId = bookmarkId,
 386            BookmarkName = BackgroundActivityBookmarkName,
 387            Stimulus = stimulus,
 388            AutoComplete = false
 389        };
 390        context.CreateBookmark(bookmarkOptions);
 91
 392        context.DeferTask(async () =>
 393        {
 394            await backgroundActivityScheduler.ScheduleAsync(jobId, cancellationToken);
 695        });
 396    }
 97
 98    /// <summary>
 99    /// Determines whether the current activity should be executed in the background.
 100    /// </summary>
 101    private static bool GetShouldRunInBackground(ActivityExecutionContext context)
 102    {
 4063103        var activity = context.Activity;
 4063104        var activityDescriptor = context.ActivityDescriptor;
 4063105        var kind = activityDescriptor.Kind;
 106
 4063107        return !GetIsBackgroundExecution(context)
 4063108               && context.WorkflowExecutionContext.ExecuteDelegate == null
 4063109               && (kind is ActivityKind.Job || GetTaskRunAsynchronously(context));
 110    }
 111
 112    private static bool GetTaskRunAsynchronously(ActivityExecutionContext context)
 113    {
 4007114        var activity = context.Activity;
 4007115        var activityDescriptor = context.ActivityDescriptor;
 4007116        var kind = activityDescriptor.Kind;
 117
 4007118        if (kind is not ActivityKind.Task)
 3987119            return false;
 120
 20121        var runAsynchronously = activity.GetRunAsynchronously();
 20122        return runAsynchronously ?? activityDescriptor.RunAsynchronously;
 123    }
 124
 8113125    private static bool GetIsBackgroundExecution(ActivityExecutionContext context) => context.TransientProperties.Contai
 126
 127    /// <summary>
 128    /// If the input contains captured output from the background activity invoker, apply that to the execution context.
 129    /// </summary>
 130    private static void CaptureOutputIfAny(ActivityExecutionContext context)
 131    {
 31132        var activity = context.Activity;
 31133        var inputKey = GetBackgroundActivityOutputKey(activity.NodeId);
 31134        var capturedOutput = context.WorkflowExecutionContext.GetProperty<IDictionary<string, object>>(inputKey);
 135
 31136        context.WorkflowExecutionContext.Properties.Remove(inputKey);
 137
 31138        if (capturedOutput == null)
 28139            return;
 140
 18141        foreach (var outputEntry in capturedOutput)
 142        {
 15143            var outputDescriptor = context.ActivityDescriptor.Outputs.FirstOrDefault(x => x.Name == outputEntry.Key);
 144
 6145            if (outputDescriptor == null)
 146                continue;
 147
 6148            var output = (Output?)outputDescriptor.ValueGetter(activity);
 6149            context.Set(output, outputEntry.Value, outputDescriptor.Name);
 150        }
 3151    }
 152
 153    private void CaptureJournalData(ActivityExecutionContext context)
 154    {
 31155        var activity = context.Activity;
 31156        var journalDataKey = GetBackgroundActivityJournalDataKey(activity.NodeId);
 31157        var journalData = context.WorkflowExecutionContext.GetProperty<IDictionary<string, object>>(journalDataKey);
 158
 31159        context.WorkflowExecutionContext.Properties.Remove(journalDataKey);
 160
 31161        if (journalData == null)
 28162            return;
 163
 6164        foreach (var journalEntry in journalData)
 0165            context.JournalData[journalEntry.Key] = journalEntry.Value;
 3166    }
 167
 168    private void CaptureBookmarkData(ActivityExecutionContext context)
 169    {
 31170        var activity = context.Activity;
 31171        var bookmarksKey = GetBackgroundActivityBookmarksKey(activity.NodeId);
 31172        var bookmarks = context.WorkflowExecutionContext.GetProperty<ICollection<Bookmark>>(bookmarksKey);
 31173        if (bookmarks != null)
 174        {
 3175            context.AddBookmarks(bookmarks);
 176        }
 177
 31178        context.WorkflowExecutionContext.Properties.Remove(bookmarksKey);
 31179    }
 180
 181    private void CapturePropertiesIfAny(ActivityExecutionContext context)
 182    {
 31183        var activity = context.Activity;
 31184        var propertiesKey = GetBackgroundActivityPropertiesKey(activity.NodeId);
 31185        var capturedProperties = context.WorkflowExecutionContext.GetProperty<IDictionary<string, object>>(propertiesKey
 186
 31187        context.WorkflowExecutionContext.Properties.Remove(propertiesKey);
 188
 31189        if (capturedProperties == null)
 28190            return;
 191
 12192        foreach (var property in capturedProperties)
 3193            context.Properties[property.Key] = property.Value;
 3194    }
 195
 196    private async Task CompleteBackgroundActivityOutcomesAsync(ActivityExecutionContext context)
 197    {
 31198        var outcomesKey = GetBackgroundActivityOutcomesKey(context.NodeId);
 31199        var outcomes = context.WorkflowExecutionContext.GetProperty<ICollection<string>>(outcomesKey);
 200
 31201        if (outcomes != null)
 202        {
 0203            await context.CompleteActivityWithOutcomesAsync(outcomes.ToArray());
 204        }
 205
 206        // Remove the outcomes from the workflow execution context.
 31207        context.WorkflowExecutionContext.Properties.Remove(outcomesKey);
 31208    }
 209
 210    private async Task CompleteBackgroundActivityAsync(ActivityExecutionContext context)
 211    {
 31212        var completedKey = GetBackgroundActivityCompletedKey(context.NodeId);
 31213        var completed = context.WorkflowExecutionContext.GetProperty<bool?>(completedKey);
 214
 31215        if (completed is true)
 216        {
 3217            await context.CompleteActivityAsync();
 218        }
 219
 220        // Remove the completed flag from the workflow execution context.
 31221        context.WorkflowExecutionContext.Properties.Remove(completedKey);
 31222    }
 223
 224    private async Task CompleteBackgroundActivityScheduledActivitiesAsync(ActivityExecutionContext context)
 225    {
 31226        var scheduledActivitiesKey = GetBackgroundActivityScheduledActivitiesKey(context.NodeId);
 31227        var scheduledActivitiesJson = context.WorkflowExecutionContext.GetProperty<string>(scheduledActivitiesKey);
 31228        var scheduledActivities = scheduledActivitiesJson != null ? JsonSerializer.Deserialize<ICollection<ScheduledActi
 229
 31230        if (scheduledActivities != null)
 231        {
 6232            foreach (var scheduledActivity in scheduledActivities)
 233            {
 0234                var activityNode = scheduledActivity.ActivityNodeId != null ? context.WorkflowExecutionContext.FindActiv
 0235                var owner = scheduledActivity.OwnerActivityInstanceId != null ? context.WorkflowExecutionContext.Activit
 0236                var options = scheduledActivity.Options != null
 0237                    ? new ScheduleWorkOptions
 0238                    {
 0239                        ExistingActivityExecutionContext = scheduledActivity.Options.ExistingActivityInstanceId != null 
 0240                        Variables = scheduledActivity.Options?.Variables,
 0241                        CompletionCallback = !string.IsNullOrEmpty(scheduledActivity.Options?.CompletionCallback) && own
 0242                        PreventDuplicateScheduling = scheduledActivity.Options?.PreventDuplicateScheduling ?? false,
 0243                        Input = scheduledActivity.Options?.Input,
 0244                        Tag = scheduledActivity.Options?.Tag
 0245                    }
 0246                    : null;
 0247                await context.ScheduleActivityAsync(activityNode, owner, options);
 248            }
 249        }
 250
 251        // Remove the scheduled activities from the workflow execution context.
 31252        context.WorkflowExecutionContext.Properties.Remove(scheduledActivitiesKey);
 31253    }
 254}