< Summary

Information
Class: Elsa.Mediator.Services.BackgroundCommandProcessor
Assembly: Elsa.Mediator
File(s): /home/runner/work/elsa-core/elsa-core/src/common/Elsa.Mediator/Services/BackgroundCommandProcessor.cs
Line coverage
82%
Covered lines: 38
Uncovered lines: 8
Coverable lines: 46
Total lines: 101
Line coverage: 82.6%
Branch coverage
75%
Covered branches: 6
Total branches: 8
Branch coverage: 75%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| .ctor(...) | 100% | 1 | 1 | 100% |
| ExecuteAsync() | 83.33% | 6 | 6 | 94.73% |
| ReadOutputAsync() | 50% | 2 | 2 | 66.66% |

File(s)

/home/runner/work/elsa-core/elsa-core/src/common/Elsa.Mediator/Services/BackgroundCommandProcessor.cs

# | Line | Line coverage
 1using System.Threading.Channels;
 2using Elsa.Mediator.Contracts;
 3using Elsa.Mediator.Middleware.Command;
 4using Elsa.Mediator.Options;
 5using Microsoft.Extensions.DependencyInjection;
 6using Microsoft.Extensions.Logging;
 7using Microsoft.Extensions.Options;
 8
 9namespace Elsa.Mediator.Services;
 10
 11/// <summary>
 12/// Continuously reads from a channel to which commands can be sent, executing each received command.
 13/// </summary>
   /// <remarks>
   /// <see cref="ExecuteAsync"/> distributes incoming commands round-robin over a fixed number of
   /// per-worker channels. Each worker processes its commands one at a time, resolving an
   /// <c>ICommandSender</c> from a newly created service scope for every command.
   /// </remarks>
 14public class BackgroundCommandProcessor
 15{
       // Number of worker channels/tasks started by ExecuteAsync (read from MediatorOptions.CommandWorkerCount).
 16    private readonly int _workerCount;
       // Source channel whose reader ExecuteAsync drains.
 17    private readonly ICommandsChannel _commandsChannel;
       // Used to create one service scope per processed command.
 18    private readonly IServiceScopeFactory _scopeFactory;
       // Receives debug entries for cancellations and error entries for failed commands.
 19    private readonly ILogger _logger;
 20
 21    /// <summary>
 22    /// Initializes a new instance of the <see cref="BackgroundCommandProcessor"/> class.
 23    /// </summary>
       /// <param name="options">Mediator options; only <c>CommandWorkerCount</c> is read here.</param>
       /// <param name="commandsChannel">The channel from which commands are read.</param>
       // NOTE(review): the signature below is cut off in this listing. The body assigns parameters
       // named scopeFactory and logger, so they presumably follow commandsChannel - confirm against the source file.
 424    public BackgroundCommandProcessor(IOptions<MediatorOptions> options, ICommandsChannel commandsChannel, IServiceScope
 25    {
 426        _workerCount = options.Value.CommandWorkerCount;
 427        _commandsChannel = commandsChannel;
 428        _scopeFactory = scopeFactory;
 429        _logger = logger;
 430    }
 31
 32    /// <summary>
 33    /// Runs the processor until cancellation is requested.
 34    /// </summary>
       /// <param name="cancellationToken">Token that stops reading from the commands channel and is passed on to every worker.</param>
       /// <returns>A task that completes after the commands channel has been drained or cancelled and all workers have finished.</returns>
 35    public async Task ExecuteAsync(CancellationToken cancellationToken)
 36    {
            // Round-robin cursor: position in outputs of the channel that receives the next command.
 437        var index = 0;
 438        var outputs = new List<Channel<CommandContext>>(_workerCount);
 439        var workers = new List<Task>(_workerCount);
 40
            // Create one unbounded channel per worker and start its reader loop right away;
            // the returned tasks are collected so they can be awaited at the end.
 3441        for (var i = 0; i < _workerCount; i++)
 42        {
 1343            var output = Channel.CreateUnbounded<CommandContext>();
 1344            outputs.Add(output);
 1345            workers.Add(ReadOutputAsync(output, cancellationToken));
 46        }
 47
 48        try
 49        {
                // Forward each incoming command to the current worker channel, then advance the cursor.
                // NOTE(review): with _workerCount == 0 the list is empty, so outputs[index] is out of range
                // on the first command - confirm the option is validated to be positive elsewhere.
 6050            await foreach (var commandContext in _commandsChannel.Reader.ReadAllAsync(cancellationToken))
 51            {
 2652                var output = outputs[index];
 2653                await output.Writer.WriteAsync(commandContext, cancellationToken);
                    // Wrap around after the last worker.
 2654                index = (index + 1) % _workerCount;
 55            }
 056        }
            // Only swallowed when the token passed to this method was cancelled; any other exception
            // propagates. NOTE(review): in that case the Complete()/WhenAll steps below are skipped.
 457        catch (OperationCanceledException ex) when (cancellationToken.IsCancellationRequested)
 58        {
 459            _logger.LogDebug(ex, "An operation was cancelled while processing the command queue");
 460        }
 61
            // Mark every worker channel as complete so the workers' read loops can end once drained.
 3462        foreach (var output in outputs)
 1363            output.Writer.Complete();
 64
            // Wait for all worker loops to finish.
 465        await Task.WhenAll(workers);
 466    }
 67
       /// <summary>
       /// Worker loop: reads commands from <paramref name="output"/> and sends each one through an
       /// <c>ICommandSender</c> resolved from a new service scope. A failure of one command is logged
       /// and does not stop the loop.
       /// </summary>
       /// <param name="output">The worker's own channel, written to by <see cref="ExecuteAsync"/>.</param>
       /// <param name="cancellationToken">Token used for reading the channel and linked into each send.</param>
 68    private async Task ReadOutputAsync(Channel<CommandContext> output, CancellationToken cancellationToken)
 69    {
 70        try
 71        {
 7872            await foreach (var commandContext in output.Reader.ReadAllAsync(cancellationToken))
 73            {
 74                try
 75                {
                        // A scope per command; the using declarations dispose it (and the linked
                        // token source) when this try block is left.
 2676                    using var scope = _scopeFactory.CreateScope();
 2677                    var commandSender = scope.ServiceProvider.GetRequiredService<ICommandSender>();
                        // NOTE(review): this line is cut off in the listing. The second linked token
                        // presumably comes from commandContext - confirm against the source file.
 2678                    using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, com
 79
 2680                    await commandSender.SendAsync(
 2681                        commandContext.Command,
 2682                        commandContext.CommandStrategy,
 2683                        commandContext.Headers,
 2684                        linkedTokenSource.Token);
 2685                }
                    // Cancellation while handling a single command: logged at debug level, loop continues.
 086                catch (OperationCanceledException e)
 87                {
 088                    _logger.LogDebug(e, "An operation was cancelled while processing the command queue");
 089                }
                    // Any other failure of a single command: logged as an error, loop continues.
 090                catch (Exception e)
 91                {
 092                    _logger.LogError(e, "An unhandled exception occurred while processing the command queue");
 093                }
 94            }
 095        }
            // Cancellation of the channel read itself; only swallowed when the worker's token was cancelled.
 1396        catch (OperationCanceledException ex) when (cancellationToken.IsCancellationRequested)
 97        {
 1398            _logger.LogDebug(ex, "An operation was cancelled while processing the command queue");
 1399        }
 13100    }
 101}