| | | 1 | | using System.Text.Json; |
| | | 2 | | using Elsa.Expressions.Helpers; |
| | | 3 | | using Elsa.Extensions; |
| | | 4 | | using Elsa.Resilience.Entities; |
| | | 5 | | using Elsa.Resilience.Extensions; |
| | | 6 | | using Elsa.Resilience.Models; |
| | | 7 | | using Elsa.Resilience.Serialization; |
| | | 8 | | using Elsa.Workflows; |
| | | 9 | | using Elsa.Workflows.State; |
| | | 10 | | using Polly; |
| | | 11 | | using Polly.Telemetry; |
| | | 12 | | |
| | | 13 | | namespace Elsa.Resilience; |
| | | 14 | | |
| | 0 | 15 | | public class ResilientActivityInvoker( |
| | 0 | 16 | | IResilienceStrategyConfigEvaluator resilienceStrategyConfigEvaluator, |
| | 0 | 17 | | IRetryAttemptRecorder retryAttemptRecorder, |
| | 0 | 18 | | IIdentityGenerator identityGenerator, |
| | 0 | 19 | | ResilienceStrategySerializer resilienceStrategySerializer) : IResilientActivityInvoker |
| | | 20 | | { |
| | | 21 | | private const string ResilienceStrategyIdPropKey = "resilienceStrategy"; |
| | | 22 | | private const string RetryAttemptsCountKey = "RetryAttemptsCount"; |
| | | 23 | | |
| | | 24 | | public async Task<T> InvokeAsync<T>(IResilientActivity activity, ActivityExecutionContext context, Func<Task<T>> act |
| | | 25 | | { |
| | | 26 | | // Get the resilience strategy. |
| | 0 | 27 | | var strategyConfig = GetStrategyConfig(activity); |
| | 0 | 28 | | var resilienceStrategy = await resilienceStrategyConfigEvaluator.EvaluateAsync(strategyConfig, context.Expressio |
| | | 29 | | |
| | | 30 | | // If no resilience strategy is configured, execute the action as-is. |
| | 0 | 31 | | if (resilienceStrategy == null) |
| | 0 | 32 | | return await action(); |
| | | 33 | | |
| | | 34 | | // Record the applied strategy as part of the activity execution context for diagnostics. |
| | 0 | 35 | | var resilienceStrategyModel = JsonSerializer.SerializeToNode(resilienceStrategy, resilienceStrategySerializer.Se |
| | 0 | 36 | | context.SetResilienceStrategy(resilienceStrategyModel); |
| | | 37 | | |
| | | 38 | | // Create a resilience pipeline builder. |
| | 0 | 39 | | var builder = CreateResiliencePipelineBuilder<T>(); |
| | 0 | 40 | | var retries = new List<RetryAttempt>(); |
| | 0 | 41 | | context.TransientProperties[RetryAttempt.RetriesKey] = retries; |
| | | 42 | | |
| | | 43 | | // Create a resilience context. |
| | 0 | 44 | | var resilienceContext = ResilienceContextPool.Shared.Get(cancellationToken); |
| | 0 | 45 | | resilienceContext.Properties.Set(new(nameof(ActivityExecutionContext)), context); |
| | | 46 | | |
| | | 47 | | try |
| | | 48 | | { |
| | | 49 | | // Configure the resilience pipeline. |
| | 0 | 50 | | await resilienceStrategy.ConfigurePipeline(builder, resilienceContext); |
| | 0 | 51 | | var pipeline = builder.Build(); |
| | | 52 | | |
| | | 53 | | // Execute the action within the resilience pipeline. |
| | 0 | 54 | | var result = await pipeline.ExecuteAsync<T>(async _ => await action(), resilienceContext); |
| | | 55 | | |
| | | 56 | | // Record the retry attempts. |
| | 0 | 57 | | await RecordRetryAttempts(activity, context, retries, cancellationToken); |
| | | 58 | | |
| | 0 | 59 | | return result; |
| | | 60 | | } |
| | | 61 | | finally |
| | | 62 | | { |
| | 0 | 63 | | ResilienceContextPool.Shared.Return(resilienceContext); |
| | | 64 | | } |
| | | 65 | | |
| | 0 | 66 | | } |
| | | 67 | | |
| | | 68 | | private async Task RecordRetryAttempts(IResilientActivity activity, ActivityExecutionContext context, ICollection<Re |
| | | 69 | | { |
| | 0 | 70 | | if (attempts.Count > 0) |
| | | 71 | | { |
| | 0 | 72 | | var records = Map(context, activity, attempts); |
| | 0 | 73 | | var recordContext = new RecordRetryAttemptsContext(context, records, cancellationToken); |
| | 0 | 74 | | await retryAttemptRecorder.RecordAsync(recordContext); |
| | | 75 | | |
| | | 76 | | // Propagate a flag that retries have occurred. This information can then be used to show the retry attempts |
| | 0 | 77 | | context.SetRetriesAttemptedFlag(); |
| | | 78 | | |
| | 0 | 79 | | context.SetExtensionsMetadata(RetryAttemptsCountKey, attempts.Count); |
| | | 80 | | } |
| | 0 | 81 | | } |
| | | 82 | | |
/// <summary>
/// Creates a new <see cref="ResiliencePipelineBuilder{T}"/> whose telemetry options
/// include a <see cref="RetryTelemetryListener"/>.
/// </summary>
/// <typeparam name="T">The result type produced by the pipeline.</typeparam>
/// <returns>The builder, with telemetry configured.</returns>
private ResiliencePipelineBuilder<T> CreateResiliencePipelineBuilder<T>()
{
    // NOTE(review): the listener is presumably what fills the per-execution retry list
    // recorded after the pipeline runs - confirm in RetryTelemetryListener.
    var options = new TelemetryOptions
    {
        TelemetryListeners = { new RetryTelemetryListener() }
    };

    var pipelineBuilder = new ResiliencePipelineBuilder<T>();
    return pipelineBuilder.ConfigureTelemetry(options);
}
| | | 89 | | |
/// <summary>
/// Looks up the resilience strategy configuration stored in the activity's custom properties
/// under the <c>resilienceStrategy</c> key.
/// </summary>
/// <param name="resilientActivity">The activity whose custom properties are inspected.</param>
/// <returns>
/// The stored value converted to <see cref="ResilienceStrategyConfig"/>, or <c>null</c> when
/// the key is not present.
/// </returns>
private ResilienceStrategyConfig? GetStrategyConfig(IResilientActivity resilientActivity)
{
    if (resilientActivity.CustomProperties.TryGetValue(ResilienceStrategyIdPropKey, out var rawConfig))
    {
        return rawConfig.ConvertTo<ResilienceStrategyConfig>();
    }

    // Key is absent: no strategy has been configured for this activity.
    return null;
}
| | | 96 | | |
| | | 97 | | private ICollection<RetryAttemptRecord> Map(ActivityExecutionContext activityExecutionContext, IResilientActivity re |
| | | 98 | | { |
| | 0 | 99 | | return attempts.Select(x => Map(activityExecutionContext, resilientActivity, x)).ToList(); |
| | | 100 | | } |
| | | 101 | | |
| | | 102 | | private RetryAttemptRecord Map(ActivityExecutionContext activityExecutionContext, IResilientActivity resilientActivi |
| | | 103 | | { |
| | 0 | 104 | | var details = resilientActivity.CollectRetryDetails(activityExecutionContext, attempt).Where(x => x.Value != nul |
| | 0 | 105 | | return new() |
| | 0 | 106 | | { |
| | 0 | 107 | | Id = identityGenerator.GenerateId(), |
| | 0 | 108 | | ActivityInstanceId = activityExecutionContext.Id, |
| | 0 | 109 | | ActivityId = activityExecutionContext.Activity.Id, |
| | 0 | 110 | | WorkflowInstanceId = activityExecutionContext.WorkflowExecutionContext.Id, |
| | 0 | 111 | | AttemptNumber = attempt.AttemptNumber, |
| | 0 | 112 | | RetryDelay = attempt.RetryDelay, |
| | 0 | 113 | | Details = details |
| | 0 | 114 | | }; |
| | | 115 | | } |
| | | 116 | | } |