< Summary

Information
Class: Elsa.Resilience.ResilientActivityInvoker
Assembly: Elsa.Resilience.Core
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Resilience.Core/Services/ResilientActivityInvoker.cs
Line coverage
0%
Covered lines: 0
Uncovered lines: 48
Coverable lines: 48
Total lines: 116
Line coverage: 0%
Branch coverage
0%
Covered branches: 0
Total branches: 6
Branch coverage: 0%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%210%
InvokeAsync()0%620%
<InvokeAsync()100%210%
RecordRetryAttempts()0%620%
CreateResiliencePipelineBuilder()100%210%
GetStrategyConfig(...)0%620%
Map(...)100%210%
Map(...)100%210%

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Resilience.Core/Services/ResilientActivityInvoker.cs

#LineLine coverage
 1using System.Text.Json;
 2using Elsa.Expressions.Helpers;
 3using Elsa.Extensions;
 4using Elsa.Resilience.Entities;
 5using Elsa.Resilience.Extensions;
 6using Elsa.Resilience.Models;
 7using Elsa.Resilience.Serialization;
 8using Elsa.Workflows;
 9using Elsa.Workflows.State;
 10using Polly;
 11using Polly.Telemetry;
 12
 13namespace Elsa.Resilience;
 14
 015public class ResilientActivityInvoker(
 016    IResilienceStrategyConfigEvaluator resilienceStrategyConfigEvaluator,
 017    IRetryAttemptRecorder retryAttemptRecorder,
 018    IIdentityGenerator identityGenerator,
 019    ResilienceStrategySerializer resilienceStrategySerializer) : IResilientActivityInvoker
 20{
 21    private const string ResilienceStrategyIdPropKey = "resilienceStrategy";
 22    private const string RetryAttemptsCountKey = "RetryAttemptsCount";
 23
 24    public async Task<T> InvokeAsync<T>(IResilientActivity activity, ActivityExecutionContext context, Func<Task<T>> act
 25    {
 26        // Get the resilience strategy.
 027        var strategyConfig = GetStrategyConfig(activity);
 028        var resilienceStrategy = await resilienceStrategyConfigEvaluator.EvaluateAsync(strategyConfig, context.Expressio
 29
 30        // If no resilience strategy is configured, execute the action as-is.
 031        if (resilienceStrategy == null)
 032            return await action();
 33
 34        // Record the applied strategy as part of the activity execution context for diagnostics.
 035        var resilienceStrategyModel = JsonSerializer.SerializeToNode(resilienceStrategy, resilienceStrategySerializer.Se
 036        context.SetResilienceStrategy(resilienceStrategyModel);
 37
 38        // Create a resilience pipeline builder.
 039        var builder = CreateResiliencePipelineBuilder<T>();
 040        var retries = new List<RetryAttempt>();
 041        context.TransientProperties[RetryAttempt.RetriesKey] = retries;
 42
 43        // Create a resilience context.
 044        var resilienceContext = ResilienceContextPool.Shared.Get(cancellationToken);
 045        resilienceContext.Properties.Set(new(nameof(ActivityExecutionContext)), context);
 46
 47        try
 48        {
 49            // Configure the resilience pipeline.
 050            await resilienceStrategy.ConfigurePipeline(builder, resilienceContext);
 051            var pipeline = builder.Build();
 52
 53            // Execute the action within the resilience pipeline.
 054            var result = await pipeline.ExecuteAsync<T>(async _ => await action(), resilienceContext);
 55
 56            // Record the retry attempts.
 057            await RecordRetryAttempts(activity, context, retries, cancellationToken);
 58
 059            return result;
 60        }
 61        finally
 62        {
 063            ResilienceContextPool.Shared.Return(resilienceContext);
 64        }
 65
 066    }
 67
 68    private async Task RecordRetryAttempts(IResilientActivity activity, ActivityExecutionContext context, ICollection<Re
 69    {
 070        if (attempts.Count > 0)
 71        {
 072            var records = Map(context, activity, attempts);
 073            var recordContext = new RecordRetryAttemptsContext(context, records, cancellationToken);
 074            await retryAttemptRecorder.RecordAsync(recordContext);
 75
 76            // Propagate a flag that retries have occurred. This information can then be used to show the retry attempts
 077            context.SetRetriesAttemptedFlag();
 78
 079            context.SetExtensionsMetadata(RetryAttemptsCountKey, attempts.Count);
 80        }
 081    }
 82
 83    private ResiliencePipelineBuilder<T> CreateResiliencePipelineBuilder<T>()
 84    {
 085        var telemetryOptions = new TelemetryOptions();
 086        telemetryOptions.TelemetryListeners.Add(new RetryTelemetryListener());
 087        return new ResiliencePipelineBuilder<T>().ConfigureTelemetry(telemetryOptions);
 88    }
 89
 90    private ResilienceStrategyConfig? GetStrategyConfig(IResilientActivity resilientActivity)
 91    {
 092        return !resilientActivity.CustomProperties.TryGetValue(ResilienceStrategyIdPropKey, out var value)
 093            ? null
 094            : value.ConvertTo<ResilienceStrategyConfig>();
 95    }
 96
 97    private ICollection<RetryAttemptRecord> Map(ActivityExecutionContext activityExecutionContext, IResilientActivity re
 98    {
 099        return attempts.Select(x => Map(activityExecutionContext, resilientActivity, x)).ToList();
 100    }
 101
 102    private RetryAttemptRecord Map(ActivityExecutionContext activityExecutionContext, IResilientActivity resilientActivi
 103    {
 0104        var details = resilientActivity.CollectRetryDetails(activityExecutionContext, attempt).Where(x => x.Value != nul
 0105        return new()
 0106        {
 0107            Id = identityGenerator.GenerateId(),
 0108            ActivityInstanceId = activityExecutionContext.Id,
 0109            ActivityId = activityExecutionContext.Activity.Id,
 0110            WorkflowInstanceId = activityExecutionContext.WorkflowExecutionContext.Id,
 0111            AttemptNumber = attempt.AttemptNumber,
 0112            RetryDelay = attempt.RetryDelay,
 0113            Details = details
 0114        };
 115    }
 116}