< Summary

Information
Class: Elsa.Workflows.Runtime.Middleware.Activities.BackgroundActivityInvokerMiddleware
Assembly: Elsa.Workflows.Runtime
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Middleware/Activities/BackgroundActivityInvokerMiddleware.cs
Line coverage
87%
Covered lines: 114
Uncovered lines: 16
Coverable lines: 130
Total lines: 254
Line coverage: 87.6%
Branch coverage
63%
Covered branches: 42
Total branches: 66
Branch coverage: 63.6%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Middleware/Activities/BackgroundActivityInvokerMiddleware.cs

#LineLine coverage
 1using System.Text.Json;
 2using Elsa.Extensions;
 3using Elsa.Mediator.Contracts;
 4using Elsa.Workflows.Attributes;
 5using Elsa.Workflows.CommitStates;
 6using Elsa.Workflows.Middleware.Activities;
 7using Elsa.Workflows.Models;
 8using Elsa.Workflows.Options;
 9using Elsa.Workflows.Pipelines.ActivityExecution;
 10using Elsa.Workflows.Runtime.Notifications;
 11using Elsa.Workflows.Runtime.Stimuli;
 12using JetBrains.Annotations;
 13using Microsoft.Extensions.Logging;
 14using Microsoft.Extensions.Options;
 15
 16namespace Elsa.Workflows.Runtime.Middleware.Activities;
 17
 18/// <summary>
 19/// Collects the current activity for scheduling for execution from a background job if the activity is of kind <see cre
 20/// </summary>
 21[UsedImplicitly]
 22public class BackgroundActivityInvokerMiddleware(
 23    ActivityMiddlewareDelegate next,
 24    ILogger<BackgroundActivityInvokerMiddleware> logger,
 25    IIdentityGenerator identityGenerator,
 26    IBackgroundActivityScheduler backgroundActivityScheduler,
 27    ICommitStrategyRegistry commitStrategyRegistry,
 28    IMediator mediator,
 29    IOptions<CommitStateOptions> commitStateOptions)
 52430    : DefaultActivityInvokerMiddleware(next, commitStrategyRegistry, commitStateOptions, logger)
 31{
 3532    internal static string GetBackgroundActivityOutputKey(string activityNodeId) => $"__BackgroundActivityOutput:{activi
 3533    internal static string GetBackgroundActivityOutcomesKey(string activityNodeId) => $"__BackgroundActivityOutcomes:{ac
 3534    internal static string GetBackgroundActivityCompletedKey(string activityNodeId) => $"__BackgroundActivityCompleted:{
 3535    internal static string GetBackgroundActivityJournalDataKey(string activityNodeId) => $"__BackgroundActivityJournalDa
 3536    internal static string GetBackgroundActivityScheduledActivitiesKey(string activityNodeId) => $"__BackgroundActivityS
 3537    internal static string GetBackgroundActivityBookmarksKey(string activityNodeId) => $"__BackgroundActivityBookmarks:{
 3538    internal static string GetBackgroundActivityPropertiesKey(string activityNodeId) => $"__BackgroundActivityProperties
 39    internal const string BackgroundActivityBookmarkName = "BackgroundActivity";
 40
 41    /// <inheritdoc />
 42    protected override async ValueTask ExecuteActivityAsync(ActivityExecutionContext context)
 43    {
 406944        var shouldRunInBackground = GetShouldRunInBackground(context);
 45
 406946        if (shouldRunInBackground)
 347            await ScheduleBackgroundActivityAsync(context);
 48        else
 49        {
 406650            await base.ExecuteActivityAsync(context);
 51
 52            // This part is either executed from the background or in the foreground when the activity is resumed.
 405753            var isResuming = !GetIsBackgroundExecution(context) && context.ActivityDescriptor.Kind is ActivityKind.Task 
 405754            if (isResuming)
 55            {
 3256                CaptureOutputIfAny(context);
 3257                CaptureJournalData(context);
 3258                CaptureBookmarkData(context);
 3259                CapturePropertiesIfAny(context);
 3260                await CompleteBackgroundActivityOutcomesAsync(context);
 3261                await CompleteBackgroundActivityAsync(context);
 3262                await CompleteBackgroundActivityScheduledActivitiesAsync(context);
 3263                await mediator.SendAsync(new BackgroundActivityExecutionCompleted(context), context.CancellationToken);
 64            }
 65        }
 406066    }
 67
 68    /// <summary>
 69    /// Schedules the current activity for execution in the background.
 70    /// </summary>
 71    private async Task ScheduleBackgroundActivityAsync(ActivityExecutionContext context)
 72    {
 373        var cancellationToken = context.CancellationToken;
 374        var workflowInstanceId = context.WorkflowExecutionContext.Id;
 375        var activityNodeId = context.NodeId;
 376        var bookmarkId = identityGenerator.GenerateId();
 377        var scheduledBackgroundActivity = new ScheduledBackgroundActivity(workflowInstanceId, activityNodeId, bookmarkId
 378        var jobId = await backgroundActivityScheduler.CreateAsync(scheduledBackgroundActivity, cancellationToken);
 379        var stimulus = new BackgroundActivityStimulus
 380        {
 381            JobId = jobId
 382        };
 383        var bookmarkOptions = new CreateBookmarkArgs
 384        {
 385            BookmarkId = bookmarkId,
 386            BookmarkName = BackgroundActivityBookmarkName,
 387            Stimulus = stimulus,
 388            AutoComplete = false
 389        };
 390        context.CreateBookmark(bookmarkOptions);
 91
 392        context.DeferTask(async () =>
 393        {
 394            await backgroundActivityScheduler.ScheduleAsync(jobId, cancellationToken);
 695        });
 396    }
 97
 98    /// <summary>
 99    /// Determines whether the current activity should be executed in the background.
 100    /// </summary>
 101    private static bool GetShouldRunInBackground(ActivityExecutionContext context)
 102    {
 4069103        var activity = context.Activity;
 4069104        var activityDescriptor = context.ActivityDescriptor;
 4069105        var kind = activityDescriptor.Kind;
 106
 4069107        return !GetIsBackgroundExecution(context)
 4069108               && context.WorkflowExecutionContext.ExecuteDelegate == null
 4069109               && (kind is ActivityKind.Job || GetTaskRunAsynchronously(context));
 110    }
 111
 112    private static bool GetTaskRunAsynchronously(ActivityExecutionContext context)
 113    {
 4013114        var activity = context.Activity;
 4013115        var activityDescriptor = context.ActivityDescriptor;
 4013116        var kind = activityDescriptor.Kind;
 117
 4013118        if (kind is not ActivityKind.Task)
 3992119            return false;
 120
 21121        var runAsynchronously = activity.GetRunAsynchronously();
 21122        return runAsynchronously ?? activityDescriptor.RunAsynchronously;
 123    }
 124
 8126125    private static bool GetIsBackgroundExecution(ActivityExecutionContext context) => context.TransientProperties.Contai
 126
 127    /// <summary>
 128    /// If the input contains captured output from the background activity invoker, apply that to the execution context.
 129    /// </summary>
 130    private static void CaptureOutputIfAny(ActivityExecutionContext context)
 131    {
 32132        var activity = context.Activity;
 32133        var inputKey = GetBackgroundActivityOutputKey(activity.NodeId);
 32134        var capturedOutput = context.WorkflowExecutionContext.GetProperty<IDictionary<string, object>>(inputKey);
 135
 32136        context.WorkflowExecutionContext.Properties.Remove(inputKey);
 137
 32138        if (capturedOutput == null)
 29139            return;
 140
 18141        foreach (var outputEntry in capturedOutput)
 142        {
 15143            var outputDescriptor = context.ActivityDescriptor.Outputs.FirstOrDefault(x => x.Name == outputEntry.Key);
 144
 6145            if (outputDescriptor == null)
 146                continue;
 147
 6148            var output = (Output?)outputDescriptor.ValueGetter(activity);
 6149            context.Set(output, outputEntry.Value, outputDescriptor.Name);
 150        }
 3151    }
 152
 153    private void CaptureJournalData(ActivityExecutionContext context)
 154    {
 32155        var activity = context.Activity;
 32156        var journalDataKey = GetBackgroundActivityJournalDataKey(activity.NodeId);
 32157        var journalData = context.WorkflowExecutionContext.GetProperty<IDictionary<string, object>>(journalDataKey);
 158
 32159        context.WorkflowExecutionContext.Properties.Remove(journalDataKey);
 160
 32161        if (journalData == null)
 29162            return;
 163
 6164        foreach (var journalEntry in journalData)
 0165            context.JournalData[journalEntry.Key] = journalEntry.Value;
 3166    }
 167
 168    private void CaptureBookmarkData(ActivityExecutionContext context)
 169    {
 32170        var activity = context.Activity;
 32171        var bookmarksKey = GetBackgroundActivityBookmarksKey(activity.NodeId);
 32172        var bookmarks = context.WorkflowExecutionContext.GetProperty<ICollection<Bookmark>>(bookmarksKey);
 32173        if (bookmarks != null)
 174        {
 3175            context.AddBookmarks(bookmarks);
 176        }
 177
 32178        context.WorkflowExecutionContext.Properties.Remove(bookmarksKey);
 32179    }
 180
 181    private void CapturePropertiesIfAny(ActivityExecutionContext context)
 182    {
 32183        var activity = context.Activity;
 32184        var propertiesKey = GetBackgroundActivityPropertiesKey(activity.NodeId);
 32185        var capturedProperties = context.WorkflowExecutionContext.GetProperty<IDictionary<string, object>>(propertiesKey
 186
 32187        context.WorkflowExecutionContext.Properties.Remove(propertiesKey);
 188
 32189        if (capturedProperties == null)
 29190            return;
 191
 12192        foreach (var property in capturedProperties)
 3193            context.Properties[property.Key] = property.Value;
 3194    }
 195
 196    private async Task CompleteBackgroundActivityOutcomesAsync(ActivityExecutionContext context)
 197    {
 32198        var outcomesKey = GetBackgroundActivityOutcomesKey(context.NodeId);
 32199        var outcomes = context.WorkflowExecutionContext.GetProperty<ICollection<string>>(outcomesKey);
 200
 32201        if (outcomes != null)
 202        {
 0203            await context.CompleteActivityWithOutcomesAsync(outcomes.ToArray());
 204        }
 205
 206        // Remove the outcomes from the workflow execution context.
 32207        context.WorkflowExecutionContext.Properties.Remove(outcomesKey);
 32208    }
 209
 210    private async Task CompleteBackgroundActivityAsync(ActivityExecutionContext context)
 211    {
 32212        var completedKey = GetBackgroundActivityCompletedKey(context.NodeId);
 32213        var completed = context.WorkflowExecutionContext.GetProperty<bool?>(completedKey);
 214
 32215        if (completed is true)
 216        {
 3217            await context.CompleteActivityAsync();
 218        }
 219
 220        // Remove the outcomes from the workflow execution context.
 32221        context.WorkflowExecutionContext.Properties.Remove(completedKey);
 32222    }
 223
 224    private async Task CompleteBackgroundActivityScheduledActivitiesAsync(ActivityExecutionContext context)
 225    {
 32226        var scheduledActivitiesKey = GetBackgroundActivityScheduledActivitiesKey(context.NodeId);
 32227        var scheduledActivitiesJson = context.WorkflowExecutionContext.GetProperty<string>(scheduledActivitiesKey);
 32228        var scheduledActivities = scheduledActivitiesJson != null ? JsonSerializer.Deserialize<ICollection<ScheduledActi
 229
 32230        if (scheduledActivities != null)
 231        {
 6232            foreach (var scheduledActivity in scheduledActivities)
 233            {
 0234                var activityNode = scheduledActivity.ActivityNodeId != null ? context.WorkflowExecutionContext.FindActiv
 0235                var owner = scheduledActivity.OwnerActivityInstanceId != null ? context.WorkflowExecutionContext.Activit
 0236                var options = scheduledActivity.Options != null
 0237                    ? new ScheduleWorkOptions
 0238                    {
 0239                        ExistingActivityExecutionContext = scheduledActivity.Options.ExistingActivityInstanceId != null 
 0240                        Variables = scheduledActivity.Options?.Variables,
 0241                        CompletionCallback = !string.IsNullOrEmpty(scheduledActivity.Options?.CompletionCallback) && own
 0242                        PreventDuplicateScheduling = scheduledActivity.Options?.PreventDuplicateScheduling ?? false,
 0243                        Input = scheduledActivity.Options?.Input,
 0244                        Tag = scheduledActivity.Options?.Tag
 0245                    }
 0246                    : null;
 0247                await context.ScheduleActivityAsync(activityNode, owner, options);
 248            }
 249        }
 250
 251        // Remove the scheduled activities from the workflow execution context.
 32252        context.WorkflowExecutionContext.Properties.Remove(scheduledActivitiesKey);
 32253    }
 254}