< Summary

Information
Class: Elsa.Mediator.Services.BackgroundNotificationProcessor
Assembly: Elsa.Mediator
File(s): /home/runner/work/elsa-core/elsa-core/src/common/Elsa.Mediator/Services/BackgroundNotificationProcessor.cs
Line coverage
81%
Covered lines: 35
Uncovered lines: 8
Coverable lines: 43
Total lines: 98
Line coverage: 81.3%
Branch coverage
75%
Covered branches: 6
Total branches: 8
Branch coverage: 75%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
.ctor(...) | 100% | 1 | 1 | 100%
ExecuteAsync() | 83.33% | 6 | 6 | 94.73%
ReadOutputAsync() | 50% | 2 | 2 | 61.11%

File(s)

/home/runner/work/elsa-core/elsa-core/src/common/Elsa.Mediator/Services/BackgroundNotificationProcessor.cs

# | Line | Line coverage
 1using System.Threading.Channels;
 2using Elsa.Mediator.Contracts;
 3using Elsa.Mediator.Middleware.Notification;
 4using Elsa.Mediator.Options;
 5using Microsoft.Extensions.DependencyInjection;
 6using Microsoft.Extensions.Logging;
 7using Microsoft.Extensions.Options;
 8
 9namespace Elsa.Mediator.Services;
 10
 11/// <summary>
 12/// Continuously reads from a channel to which notifications can be sent, publishing each received notification.
 13/// </summary>
 14public class BackgroundNotificationProcessor
 15{
 16    private readonly int _workerCount;
 17    private readonly INotificationsChannel _notificationsChannel;
 18    private readonly IServiceScopeFactory _scopeFactory;
 19    private readonly ILogger _logger;
 20
 21    /// <summary>
 22    /// Initializes a new instance of the <see cref="BackgroundNotificationProcessor"/> class.
 23    /// </summary>
 524    public BackgroundNotificationProcessor(IOptions<MediatorOptions> options, INotificationsChannel notificationsChannel
 25    {
 526        _workerCount = options.Value.NotificationWorkerCount;
 527        _notificationsChannel = notificationsChannel;
 528        _scopeFactory = scopeFactory;
 529        _logger = logger;
 530    }
 31
 32    /// <summary>
 33    /// Runs the processor until cancellation is requested.
 34    /// </summary>
 35    public async Task ExecuteAsync(CancellationToken cancellationToken)
 36    {
 537        var index = 0;
 538        var outputs = new List<Channel<NotificationContext>>(_workerCount);
 539        var workers = new List<Task>(_workerCount);
 40
 4041        for (var i = 0; i < _workerCount; i++)
 42        {
 1543            var output = Channel.CreateUnbounded<NotificationContext>();
 1544            outputs.Add(output);
 1545            workers.Add(ReadOutputAsync(output, cancellationToken));
 46        }
 47
 48        try
 49        {
 1650            await foreach (var notification in _notificationsChannel.Reader.ReadAllAsync(cancellationToken))
 51            {
 352                var output = outputs[index];
 353                await output.Writer.WriteAsync(notification, cancellationToken);
 354                index = (index + 1) % _workerCount;
 55            }
 056        }
 557        catch (OperationCanceledException ex) when (cancellationToken.IsCancellationRequested)
 58        {
 559            _logger.LogDebug(ex, "An operation was cancelled while processing the notification queue");
 560        }
 61
 4062        foreach (var output in outputs)
 1563            output.Writer.Complete();
 64
 565        await Task.WhenAll(workers);
 566    }
 67
 68    private async Task ReadOutputAsync(Channel<NotificationContext> output, CancellationToken cancellationToken)
 69    {
 70        try
 71        {
 1572            using var scope = _scopeFactory.CreateScope();
 1573            var notificationSender = scope.ServiceProvider.GetRequiredService<INotificationSender>();
 74
 3675            await foreach (var notificationContext in output.Reader.ReadAllAsync(cancellationToken))
 76            {
 77                try
 78                {
 379                    var notification = notificationContext.Notification;
 380                    using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, not
 381                    await notificationSender.SendAsync(notification, notificationContext.NotificationStrategy, linkedTok
 382                }
 083                catch (OperationCanceledException e)
 84                {
 085                    _logger.LogDebug(e, "An operation was cancelled while processing the notification queue");
 086                }
 087                catch (Exception e)
 88                {
 089                    _logger.LogError(e, "An unhandled exception occurred while processing the notification queue");
 090                }
 91            }
 092        }
 1593        catch (OperationCanceledException ex) when (cancellationToken.IsCancellationRequested)
 94        {
 1595            _logger.LogDebug(ex, "An operation was cancelled while processing the notification queue");
 1596        }
 1597    }
 98}