< Summary

Information
Class: Elsa.Mediator.HostedServices.BackgroundEventPublisherHostedService
Assembly: Elsa.Mediator
File(s): /home/runner/work/elsa-core/elsa-core/src/common/Elsa.Mediator/HostedServices/BackgroundEventPublisherHostedService.cs
Line coverage
64%
Covered lines: 27
Uncovered lines: 15
Coverable lines: 42
Total lines: 111
Line coverage: 64.2%
Branch coverage
50%
Covered branches: 4
Total branches: 8
Branch coverage: 50%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
.ctor(...) | 100% | 1 | 1 | 100%
ExecuteAsync() | 66.66% | 6 | 6 | 78.94%
ReadOutputAsync() | 0% | 3 | 2 | 31.25%

File(s)

/home/runner/work/elsa-core/elsa-core/src/common/Elsa.Mediator/HostedServices/BackgroundEventPublisherHostedService.cs

# | Line | Line coverage
 1using System.Threading.Channels;
 2using Elsa.Mediator.Contracts;
 3using Elsa.Mediator.Middleware.Notification;
 4using Elsa.Mediator.Options;
 5using Microsoft.Extensions.DependencyInjection;
 6using Microsoft.Extensions.Hosting;
 7using Microsoft.Extensions.Logging;
 8using Microsoft.Extensions.Options;
 9
 10namespace Elsa.Mediator.HostedServices;
 11
 12/// <summary>
 13/// Continuously reads from a channel to which notifications can be sent, publishing each received notification.
 14/// </summary>
 15public class BackgroundEventPublisherHostedService : BackgroundService
 16{
 17    private readonly int _workerCount;
 18    private readonly INotificationsChannel _notificationsChannel;
 19    private readonly IServiceScopeFactory _scopeFactory;
 20    private readonly List<Channel<NotificationContext>> _outputs;
 21    private readonly ILogger _logger;
 22
 23    /// <inheritdoc />
 724    public BackgroundEventPublisherHostedService(IOptions<MediatorOptions> options, INotificationsChannel notificationsC
 25    {
 726        _workerCount = options.Value.NotificationWorkerCount;
 727        _notificationsChannel = notificationsChannel;
 728        _scopeFactory = scopeFactory;
 729        _logger = logger;
 730        _outputs = new(_workerCount);
 731    }
 32
 33    /// <inheritdoc />
 34    protected override async Task ExecuteAsync(CancellationToken cancellationToken)
 35    {
 36        // Index to round-robin distribute notifications across worker channels
 737        var index = 0;
 38
 739        using var scope = _scopeFactory.CreateScope();
 740        var notificationSender = scope.ServiceProvider.GetRequiredService<INotificationSender>();
 41
 42        // Create multiple output channels and start worker tasks for parallel processing
 7043        for (var i = 0; i < _workerCount; i++)
 44        {
 2845            var output = Channel.CreateUnbounded<NotificationContext>();
 2846            _outputs.Add(output);
 47            // Start a background task to process notifications from this output channel
 2848            _ = ReadOutputAsync(output, notificationSender, cancellationToken);
 49        }
 50
 751        var channelReader = _notificationsChannel.Reader;
 52
 53        // Continuously read notifications from the input channel and distribute them to worker channels
 54        // using round-robin distribution for load balancing
 55        try
 56        {
 1457            await foreach (var notification in channelReader.ReadAllAsync(cancellationToken))
 58            {
 059                var output = _outputs[index];
 060                await output.Writer.WriteAsync(notification, cancellationToken);
 61                // Move to the next worker in a circular fashion
 062                index = (index + 1) % _workerCount;
 63            }
 064        }
 365        catch (OperationCanceledException ex)
 66        {
 367            _logger.LogDebug(ex, "An operation was cancelled while processing the queue");
 368        }
 69
 70        // When the input channel is completed, complete all output channels
 3071        foreach (var output in _outputs)
 72        {
 1273            output.Writer.Complete();
 74        }
 375    }
 76
 77    /// <summary>
 78    /// Processes notifications from an output channel asynchronously.
 79    /// </summary>
 80    /// <param name="output">The channel to read notifications from</param>
 81    /// <param name="notificationSender">The service used to send notifications</param>
 82    /// <param name="cancellationToken">Cancellation token from the hosted service</param>
 83    private async Task ReadOutputAsync(Channel<NotificationContext> output, INotificationSender notificationSender, Canc
 84    {
 85        try
 86        {
 5687            await foreach (var notificationContext in output.Reader.ReadAllAsync(cancellationToken))
 88            {
 89                try
 90                {
 091                    var notification = notificationContext.Notification;
 92                    // Link the cancellation tokens so that cancellation can happen from either source
 093                    using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, not
 094                    await notificationSender.SendAsync(notification, NotificationStrategy.Sequential, linkedTokenSource.
 095                }
 096                catch (OperationCanceledException e)
 97                {
 098                    _logger.LogDebug(e, "An operation was cancelled while processing the queue");
 099                }
 0100                catch (Exception e)
 101                {
 0102                    _logger.LogError(e, "An unhandled exception occurred while processing the queue");
 0103                }
 104            }
 0105        }
 12106        catch (OperationCanceledException ex)
 107        {
 12108            _logger.LogDebug(ex, "An operation was cancelled while processing the queue");
 12109        }
 12110    }
 111}