< Summary

Information
Class: Elsa.Mediator.HostedServices.BackgroundCommandSenderHostedService
Assembly: Elsa.Mediator
File(s): /home/runner/work/elsa-core/elsa-core/src/common/Elsa.Mediator/HostedServices/BackgroundCommandSenderHostedService.cs
Line coverage
88%
Covered lines: 38
Uncovered lines: 5
Coverable lines: 43
Total lines: 105
Line coverage: 88.3%
Branch coverage
75%
Covered branches: 6
Total branches: 8
Branch coverage: 75%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
.ctor(...) | 100% | 1 | 1 | 100%
ExecuteAsync() | 83.33% | 6 | 6 | 93.75%
ReadOutputAsync() | 50% | 2 | 2 | 80%

File(s)

/home/runner/work/elsa-core/elsa-core/src/common/Elsa.Mediator/HostedServices/BackgroundCommandSenderHostedService.cs

# | Line | Line coverage
 1using System.Threading.Channels;
 2using Elsa.Mediator.Contracts;
 3using Elsa.Mediator.Middleware.Command;
 4using Elsa.Mediator.Options;
 5using Microsoft.Extensions.DependencyInjection;
 6using Microsoft.Extensions.Hosting;
 7using Microsoft.Extensions.Logging;
 8using Microsoft.Extensions.Options;
 9
 10namespace Elsa.Mediator.HostedServices;
 11
 12/// <summary>
 13/// Continuously reads from a channel to which commands can be sent, executing each received command.
 14/// </summary>
 15public class BackgroundCommandSenderHostedService : BackgroundService
 16{
 17    private readonly int _workerCount;
 18    private readonly ICommandsChannel _commandsChannel;
 19    private readonly IServiceScopeFactory _scopeFactory;
 20    private readonly List<Channel<CommandContext>> _outputs;
 21    private readonly ILogger _logger;
 22
 23    /// <inheritdoc />
 724    public BackgroundCommandSenderHostedService(IOptions<MediatorOptions> options, ICommandsChannel commandsChannel, ISe
 25    {
 726        _workerCount = options.Value.CommandWorkerCount;
 727        _commandsChannel = commandsChannel; // The shared input channel for all commands
 728        _scopeFactory = scopeFactory;
 729        _logger = logger;
 730        _outputs = new(_workerCount); // Prepare a list to hold worker-specific channels
 731    }
 32
 33    /// <inheritdoc />
 34    protected override async Task ExecuteAsync(CancellationToken cancellationToken)
 35    {
 736        var index = 0; // Used for round-robin distribution of work
 37
 38        // Set up worker channels and start background tasks for each worker
 7039        for (var i = 0; i < _workerCount; i++)
 40        {
 2841            var output = Channel.CreateUnbounded<CommandContext>();
 2842            _outputs.Add(output);
 43            // Start a background task that processes commands from this worker's channel
 2844            _ = ReadOutputAsync(output, cancellationToken);
 45        }
 46
 47        // Main dispatcher loop: read from the input channel and distribute to worker channels
 48        try
 49        {
 6450            await foreach (var commandContext in _commandsChannel.Reader.ReadAllAsync(cancellationToken))
 51            {
 2552                var output = _outputs[index];
 2553                await output.Writer.WriteAsync(commandContext, cancellationToken);
 54                // Round-robin distribution - move to next worker
 2555                index = (index + 1) % _workerCount;
 56            }
 057        }
 358        catch (OperationCanceledException ex)
 59        {
 360            _logger.LogDebug(ex, "An operation was cancelled while processing the queue");
 361        }
 62
 63        // If the input channel is completed, complete all worker channels
 3064        foreach (var output in _outputs)
 1265            output.Writer.Complete();
 366    }
 67
 68    private async Task ReadOutputAsync(Channel<CommandContext> output, CancellationToken cancellationToken)
 69    {
 70        // Worker task: process commands from the worker's channel
 71        try
 72        {
 10673            await foreach (var commandContext in output.Reader.ReadAllAsync(cancellationToken))
 74            {
 75                try
 76                {
 77                    // Create a fresh scope for each command to ensure proper service lifetime
 2578                    using var scope = _scopeFactory.CreateScope();
 2579                    var commandSender = scope.ServiceProvider.GetRequiredService<ICommandSender>();
 80
 81                    // Link the service cancellation token with the command's token to ensure proper cancellation
 2582                    using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
 2583                        cancellationToken,
 2584                        commandContext.CancellationToken);
 85
 86                    // Process the command using the command sender service with the linked token
 2587                    await commandSender.SendAsync(
 2588                        commandContext.Command,
 2589                        CommandStrategy.Default,
 2590                        commandContext.Headers,
 2591                        linkedTokenSource.Token);
 2592                }
 093                catch (Exception e)
 94                {
 95                    // Log errors but continue processing other commands
 096                    _logger.LogError(e, "An unhandled exception occurred while processing the queue");
 097                }
 98            }
 099        }
 12100        catch (OperationCanceledException ex)
 101        {
 12102            _logger.LogDebug(ex, "An operation was cancelled while processing the queue");
 12103        }
 12104    }
 105}