< Summary

Information
Class: Elsa.Mediator.HostedServices.BackgroundEventPublisherHostedService
Assembly: Elsa.Mediator
File(s): /home/runner/work/elsa-core/elsa-core/src/common/Elsa.Mediator/HostedServices/BackgroundEventPublisherHostedService.cs
Line coverage
50%
Covered lines: 17
Uncovered lines: 17
Coverable lines: 34
Total lines: 97
Line coverage: 50%
Branch coverage
25%
Covered branches: 2
Total branches: 8
Branch coverage: 25%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
.ctor(...) | 100% | 1 | 1 | 100%
ExecuteAsync() | 33.33% | 8 | 6 | 60%
ReadOutputAsync() | 0% | 5 | 2 | 8.33%

File(s)

/home/runner/work/elsa-core/elsa-core/src/common/Elsa.Mediator/HostedServices/BackgroundEventPublisherHostedService.cs

# | Line | Line coverage
 1using System.Threading.Channels;
 2using Elsa.Mediator.Contracts;
 3using Elsa.Mediator.Middleware.Notification;
 4using Elsa.Mediator.Options;
 5using Microsoft.Extensions.DependencyInjection;
 6using Microsoft.Extensions.Hosting;
 7using Microsoft.Extensions.Logging;
 8using Microsoft.Extensions.Options;
 9
 10namespace Elsa.Mediator.HostedServices;
 11
 12/// <summary>
 13/// Continuously reads from a channel to which notifications can be sent, publishing each received notification.
 14/// </summary>
 15public class BackgroundEventPublisherHostedService : BackgroundService
 16{
 17    private readonly int _workerCount;
 18    private readonly INotificationsChannel _notificationsChannel;
 19    private readonly IServiceScopeFactory _scopeFactory;
 20    private readonly List<Channel<NotificationContext>> _outputs;
 21    private readonly ILogger _logger;
 22
 23    /// <inheritdoc />
 124    public BackgroundEventPublisherHostedService(IOptions<MediatorOptions> options, INotificationsChannel notificationsC
 25    {
 126        _workerCount = options.Value.NotificationWorkerCount;
 127        _notificationsChannel = notificationsChannel;
 128        _scopeFactory = scopeFactory;
 129        _logger = logger;
 130        _outputs = new(_workerCount);
 131    }
 32
 33    /// <inheritdoc />
 34    protected override async Task ExecuteAsync(CancellationToken cancellationToken)
 35    {
 36        // Index to round-robin distribute notifications across worker channels
 137        var index = 0;
 38
 139        using var scope = _scopeFactory.CreateScope();
 140        var notificationSender = scope.ServiceProvider.GetRequiredService<INotificationSender>();
 41
 42        // Create multiple output channels and start worker tasks for parallel processing
 1043        for (var i = 0; i < _workerCount; i++)
 44        {
 445            var output = Channel.CreateUnbounded<NotificationContext>();
 446            _outputs.Add(output);
 47            // Start a background task to process notifications from this output channel
 448            _ = ReadOutputAsync(output, notificationSender, cancellationToken);
 49        }
 50
 151        var channelReader = _notificationsChannel.Reader;
 52
 53        // Continuously read notifications from the input channel and distribute them to worker channels
 54        // using round-robin distribution for load balancing
 255        await foreach (var notification in channelReader.ReadAllAsync(cancellationToken))
 56        {
 057            var output = _outputs[index];
 058            await output.Writer.WriteAsync(notification, cancellationToken);
 59            // Move to the next worker in a circular fashion
 060            index = (index + 1) % _workerCount;
 61        }
 62
 63        // When the input channel is completed, complete all output channels
 064        foreach (var output in _outputs)
 65        {
 066            output.Writer.Complete();
 67        }
 068    }
 69
 70    /// <summary>
 71    /// Processes notifications from an output channel asynchronously.
 72    /// </summary>
 73    /// <param name="output">The channel to read notifications from</param>
 74    /// <param name="notificationSender">The service used to send notifications</param>
 75    /// <param name="cancellationToken">Cancellation token from the hosted service</param>
 76    private async Task ReadOutputAsync(Channel<NotificationContext> output, INotificationSender notificationSender, Canc
 77    {
 878        await foreach (var notificationContext in output.Reader.ReadAllAsync(cancellationToken))
 79        {
 80            try
 81            {
 082                var notification = notificationContext.Notification;
 83                // Link the cancellation tokens so that cancellation can happen from either source
 084                using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, notific
 085                await notificationSender.SendAsync(notification, NotificationStrategy.Sequential, linkedTokenSource.Toke
 086            }
 087            catch (OperationCanceledException e)
 88            {
 089                _logger.LogDebug(e, "An operation was cancelled while processing the queue");
 090            }
 091            catch (Exception e)
 92            {
 093                _logger.LogError(e, "An unhandled exception occurred while processing the queue");
 094            }
 95        }
 096    }
 97}