< Summary

Information
Class: Elsa.Mediator.Services.BackgroundJobProcessor
Assembly: Elsa.Mediator
File(s): /home/runner/work/elsa-core/elsa-core/src/common/Elsa.Mediator/Services/BackgroundJobProcessor.cs
Line coverage
53%
Covered lines: 15
Uncovered lines: 13
Coverable lines: 28
Total lines: 71
Line coverage: 53.5%
Branch coverage
50%
Covered branches: 2
Total branches: 4
Branch coverage: 50%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| .ctor(...) | 100% | 1 | 1 | 100% |
| ExecuteAsync() | 100% | 2 | 2 | 100% |
| ProcessJobsAsync() | 0% | 4 | 2 | 27.77% |

File(s)

/home/runner/work/elsa-core/elsa-core/src/common/Elsa.Mediator/Services/BackgroundJobProcessor.cs

# | Line | Line coverage
 1using Elsa.Mediator.Contracts;
 2using Elsa.Mediator.Options;
 3using Microsoft.Extensions.Logging;
 4using Microsoft.Extensions.Options;
 5
 6namespace Elsa.Mediator.Services;
 7
 8/// <summary>
 9/// Continuously reads from a channel to which jobs can be sent, executing each received job.
 10/// </summary>
 11public class BackgroundJobProcessor
 12{
 13    private readonly int _workerCount;
 14    private readonly IJobsChannel _jobsChannel;
 15    private readonly ILogger<BackgroundJobProcessor> _logger;
 16
 17    /// <summary>
 18    /// Initializes a new instance of the <see cref="BackgroundJobProcessor"/> class.
 19    /// </summary>
 320    public BackgroundJobProcessor(IOptions<MediatorOptions> options, IJobsChannel jobsChannel, ILogger<BackgroundJobProc
 21    {
 322        _workerCount = options.Value.JobWorkerCount;
 323        _jobsChannel = jobsChannel;
 324        _logger = logger;
 325    }
 26
 27    /// <summary>
 28    /// Runs the processor until cancellation is requested.
 29    /// </summary>
 30    public async Task ExecuteAsync(CancellationToken cancellationToken)
 31    {
 332        var workers = new Task[_workerCount];
 33
 3034        for (var i = 0; i < _workerCount; i++)
 1235            workers[i] = ProcessJobsAsync(cancellationToken);
 36
 337        await Task.WhenAll(workers);
 338    }
 39
 40    private async Task ProcessJobsAsync(CancellationToken cancellationToken)
 41    {
 42        try
 43        {
 2444            await foreach (var jobItem in _jobsChannel.Reader.ReadAllAsync(cancellationToken))
 45            {
 46                try
 47                {
 048                    using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, job
 049                    await jobItem.Action(linkedTokenSource.Token);
 050                    _logger.LogInformation("Worker {CurrentTaskId} processed job {JobId}", Task.CurrentId, jobItem.JobId
 051                }
 052                catch (OperationCanceledException)
 53                {
 054                    _logger.LogInformation("Job {JobId} was canceled", jobItem.JobId);
 055                }
 056                catch (Exception ex)
 57                {
 058                    _logger.LogError(ex, "Job {JobId} failed", jobItem.JobId);
 059                }
 60                finally
 61                {
 062                    jobItem.OnJobCompleted(jobItem.JobId);
 63                }
 064            }
 065        }
 1266        catch (OperationCanceledException ex) when (cancellationToken.IsCancellationRequested)
 67        {
 1268            _logger.LogDebug(ex, "An operation was cancelled while processing the job queue");
 1269        }
 1270    }
 71}