< Summary

Information
Class: Elsa.Mediator.HostedServices.BackgroundCommandSenderHostedService
Assembly: Elsa.Mediator
File(s): /home/runner/work/elsa-core/elsa-core/src/common/Elsa.Mediator/HostedServices/BackgroundCommandSenderHostedService.cs
Line coverage
80%
Covered lines: 28
Uncovered lines: 7
Coverable lines: 35
Total lines: 91
Line coverage: 80%
Branch coverage
50%
Covered branches: 4
Total branches: 8
Branch coverage: 50%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
.ctor(...) | 100% | 1 | 1 | 100%
ExecuteAsync() | 50% | 7 | 6 | 75%
ReadOutputAsync() | 50% | 2 | 2 | 75%

File(s)

/home/runner/work/elsa-core/elsa-core/src/common/Elsa.Mediator/HostedServices/BackgroundCommandSenderHostedService.cs

 # | Line | Line coverage
   |  1 | using System.Threading.Channels;
   |  2 | using Elsa.Mediator.Contracts;
   |  3 | using Elsa.Mediator.Middleware.Command;
   |  4 | using Elsa.Mediator.Options;
   |  5 | using Microsoft.Extensions.DependencyInjection;
   |  6 | using Microsoft.Extensions.Hosting;
   |  7 | using Microsoft.Extensions.Logging;
   |  8 | using Microsoft.Extensions.Options;
   |  9 |
   | 10 | namespace Elsa.Mediator.HostedServices;
   | 11 |
   | 12 | /// <summary>
   | 13 | /// Continuously reads from a channel to which commands can be sent, executing each received command.
   | 14 | /// </summary>
   | 15 | public class BackgroundCommandSenderHostedService : BackgroundService
   | 16 | {
   | 17 |     private readonly int _workerCount;
   | 18 |     private readonly ICommandsChannel _commandsChannel;
   | 19 |     private readonly IServiceScopeFactory _scopeFactory;
   | 20 |     private readonly List<Channel<CommandContext>> _outputs;
   | 21 |     private readonly ILogger _logger;
   | 22 |
   | 23 |     /// <inheritdoc />
 1 | 24 |     public BackgroundCommandSenderHostedService(IOptions<MediatorOptions> options, ICommandsChannel commandsChannel, ISe
   | 25 |     {
 1 | 26 |         _workerCount = options.Value.CommandWorkerCount;
 1 | 27 |         _commandsChannel = commandsChannel; // The shared input channel for all commands
 1 | 28 |         _scopeFactory = scopeFactory;
 1 | 29 |         _logger = logger;
 1 | 30 |         _outputs = new(_workerCount); // Prepare a list to hold worker-specific channels
 1 | 31 |     }
   | 32 |
   | 33 |     /// <inheritdoc />
   | 34 |     protected override async Task ExecuteAsync(CancellationToken cancellationToken)
   | 35 |     {
 1 | 36 |         var index = 0; // Used for round-robin distribution of work
   | 37 |
   | 38 |         // Set up worker channels and start background tasks for each worker
10 | 39 |         for (var i = 0; i < _workerCount; i++)
   | 40 |         {
 4 | 41 |             var output = Channel.CreateUnbounded<CommandContext>();
 4 | 42 |             _outputs.Add(output);
   | 43 |             // Start a background task that processes commands from this worker's channel
 4 | 44 |             _ = ReadOutputAsync(output, cancellationToken);
   | 45 |         }
   | 46 |
   | 47 |         // Main dispatcher loop: read from the input channel and distribute to worker channels
46 | 48 |         await foreach (var commandContext in _commandsChannel.Reader.ReadAllAsync(cancellationToken))
   | 49 |         {
22 | 50 |             var output = _outputs[index];
22 | 51 |             await output.Writer.WriteAsync(commandContext, cancellationToken);
   | 52 |             // Round-robin distribution - move to next worker
22 | 53 |             index = (index + 1) % _workerCount;
   | 54 |         }
   | 55 |
   | 56 |         // If the input channel is completed, complete all worker channels
 0 | 57 |         foreach (var output in _outputs)
 0 | 58 |             output.Writer.Complete();
 0 | 59 |     }
   | 60 |
   | 61 |     private async Task ReadOutputAsync(Channel<CommandContext> output, CancellationToken cancellationToken)
   | 62 |     {
   | 63 |         // Worker task: process commands from the worker's channel
52 | 64 |         await foreach (var commandContext in output.Reader.ReadAllAsync(cancellationToken))
   | 65 |         {
   | 66 |             try
   | 67 |             {
   | 68 |                 // Create a fresh scope for each command to ensure proper service lifetime
22 | 69 |                 using var scope = _scopeFactory.CreateScope();
22 | 70 |                 var commandSender = scope.ServiceProvider.GetRequiredService<ICommandSender>();
   | 71 |
   | 72 |                 // Link the service cancellation token with the command's token to ensure proper cancellation
22 | 73 |                 using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
22 | 74 |                     cancellationToken,
22 | 75 |                     commandContext.CancellationToken);
   | 76 |
   | 77 |                 // Process the command using the command sender service with the linked token
22 | 78 |                 await commandSender.SendAsync(
22 | 79 |                     commandContext.Command,
22 | 80 |                     CommandStrategy.Default,
22 | 81 |                     commandContext.Headers,
22 | 82 |                     linkedTokenSource.Token);
22 | 83 |             }
 0 | 84 |             catch (Exception e)
   | 85 |             {
   | 86 |                 // Log errors but continue processing other commands
 0 | 87 |                 _logger.LogError(e, "An unhandled exception occurred while processing the queue");
 0 | 88 |             }
   | 89 |         }
 0 | 90 |     }
   | 91 | }