< Summary

Information
Class: Elsa.Mediator.HostedServices.MessageProcessorHostedService<T>
Assembly: Elsa.Mediator
File(s): /home/runner/work/elsa-core/elsa-core/src/common/Elsa.Mediator/HostedServices/MessageProcessorHostedService.cs
Line coverage
0%
Covered lines: 0
Uncovered lines: 20
Coverable lines: 20
Total lines: 119
Line coverage: 0%
Branch coverage
0%
Covered branches: 0
Total branches: 6
Branch coverage: 0%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
.ctor(...) | 100% | 2 | 1 | 0%
ExecuteAsync() | 0% | 42 | 6 | 0%

File(s)

/home/runner/work/elsa-core/elsa-core/src/common/Elsa.Mediator/HostedServices/MessageProcessorHostedService.cs

# | Line | Line coverage
 1using System.Threading.Channels;
 2using Elsa.Mediator.Contracts;
 3using Microsoft.Extensions.Hosting;
 4using Microsoft.Extensions.Logging;
 5
 6namespace Elsa.Mediator.HostedServices;
 7
 8/// <summary>
 9/// Continuously reads from a channel to which commands can be sent, executing each received command.
 10/// </summary>
 11public class MessageProcessorHostedService<T> : BackgroundService where T : notnull
 12{
 13    private readonly int _workerCount;
 14    private readonly Channel<T> _channel;
 15    private readonly IEnumerable<IConsumer<T>> _consumers;
 16    private readonly ILogger _logger;
 17    private readonly List<MessageWorker<T>> _workers;
 18
 19    /// <inheritdoc />
 20    // ReSharper disable once ContextualLoggerProblem
 021    public MessageProcessorHostedService(int workerCount, Channel<T> channel, IEnumerable<IConsumer<T>> consumers, ILogg
 22    {
 023        _workerCount = workerCount;
 024        _channel = channel;
 025        _consumers = consumers;
 026        _logger = logger;
 027        _workers = new List<MessageWorker<T>>(workerCount);
 028    }
 29
 30    /// <inheritdoc />
 31    protected override async Task ExecuteAsync(CancellationToken cancellationToken)
 32    {
 033        var index = 0;
 34
 035        for (var i = 0; i < _workerCount; i++)
 36        {
 037            var worker = new MessageWorker<T>(Channel.CreateUnbounded<T>(), _consumers, _logger);
 038            _workers.Add(worker);
 039            _ = worker.StartAsync(cancellationToken);
 40        }
 41
 042        await foreach (var message in _channel.Reader.ReadAllAsync(cancellationToken))
 43        {
 044            var worker = _workers[index];
 045            await worker.DeliverMessageAsync(message, cancellationToken);
 046            index = (index + 1) % _workerCount;
 47        }
 48
 049        foreach (var worker in _workers)
 050            worker.Complete();
 51
 052        _workers.Clear();
 053    }
 54}
 55
 56/// <summary>
 57/// Represents a worker that continuously reads from a channel and processes each received message.
 58/// </summary>
 59/// <typeparam name="T">The type of message to process.</typeparam>
 60public class MessageWorker<T> where T : notnull
 61{
 62    private readonly Channel<T> _channel;
 63    private readonly IEnumerable<IConsumer<T>> _consumers;
 64    private readonly ILogger _logger;
 65
 66    /// <summary>
 67    /// Initializes a new instance of the <see cref="MessageWorker{T}"/> class.
 68    /// </summary>
 69    /// <param name="channel">The channel to read from.</param>
 70    /// <param name="consumers">The consumers that will process each received message.</param>
 71    /// <param name="logger">The logger.</param>
 72    public MessageWorker(Channel<T> channel, IEnumerable<IConsumer<T>> consumers, ILogger logger)
 73    {
 74        _channel = channel;
 75        _consumers = consumers;
 76        _logger = logger;
 77    }
 78
 79    /// <summary>
 80    /// Continuously reads from the channel and processes each received message.
 81    /// </summary>
 82    /// <param name="cancellationToken">The cancellation token.</param>
 83    public async Task StartAsync(CancellationToken cancellationToken)
 84    {
 85        await foreach (var message in _channel.Reader.ReadAllAsync(cancellationToken))
 86        foreach (var consumer in _consumers)
 87            await InvokeConsumerAsync(consumer, message, cancellationToken);
 88    }
 89
 90    /// <summary>
 91    /// Delivers a message to the channel.
 92    /// </summary>
 93    /// <param name="message">The message to deliver.</param>
 94    /// <param name="cancellationToken">The cancellation token.</param>
 95    public async Task DeliverMessageAsync(T message, CancellationToken cancellationToken)
 96    {
 97        await _channel.Writer.WriteAsync(message, cancellationToken);
 98    }
 99
 100    /// <summary>
 101    /// Completes the channel.
 102    /// </summary>
 103    public void Complete()
 104    {
 105        _channel.Writer.Complete();
 106    }
 107
 108    private async Task InvokeConsumerAsync(IConsumer<T> consumer, T message, CancellationToken cancellationToken)
 109    {
 110        try
 111        {
 112            await consumer.ConsumeAsync(message, cancellationToken);
 113        }
 114        catch (Exception ex)
 115        {
 116            _logger.LogError(ex, "An error occurred while invoking consumer {ConsumerType} for message {MessageType}", c
 117        }
 118    }
 119}