< Summary

Information
Class: Elsa.Mediator.HostedServices.JobRunnerHostedService
Assembly: Elsa.Mediator
File(s): /home/runner/work/elsa-core/elsa-core/src/common/Elsa.Mediator/HostedServices/JobRunnerHostedService.cs
Line coverage
41%
Covered lines: 10
Uncovered lines: 14
Coverable lines: 24
Total lines: 71
Line coverage: 41.6%
Branch coverage
50%
Covered branches: 2
Total branches: 4
Branch coverage: 50%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
.ctor(...) | 100% | 1 | 1 | 100%
ExecuteAsync() | 100% | 2 | 2 | 80%
ProcessJobsAsync() | 0% | 5 | 2 | 7.14%

File(s)

/home/runner/work/elsa-core/elsa-core/src/common/Elsa.Mediator/HostedServices/JobRunnerHostedService.cs

# | Line | Line coverage
 1using Elsa.Mediator.Contracts;
 2using Elsa.Mediator.Options;
 3using Microsoft.Extensions.Hosting;
 4using Microsoft.Extensions.Logging;
 5using Microsoft.Extensions.Options;
 6
 7namespace Elsa.Mediator.HostedServices;
 8
 9/// <summary>
 10/// A hosted service that runs jobs.
 11/// </summary>
 12public class JobRunnerHostedService : BackgroundService
 13{
 14    private readonly int _workerCount;
 15    private readonly IJobsChannel _jobsChannel;
 16    private readonly ILogger<JobRunnerHostedService> _logger;
 17
 18    /// <inheritdoc />
 119    public JobRunnerHostedService(IOptions<MediatorOptions> options, IJobsChannel jobsChannel, ILogger<JobRunnerHostedSe
 20    {
 121        _workerCount = options.Value.JobWorkerCount;
 122        _jobsChannel = jobsChannel;
 123        _logger = logger;
 124    }
 25
 26    /// <inheritdoc />
 27    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
 28    {
 29        // Create an array of worker tasks to process jobs in parallel
 130        var workers = new Task[_workerCount];
 31
 32        // Start multiple workers (tasks) to process jobs concurrently
 1033        for (var i = 0; i < _workerCount; i++)
 434            workers[i] = ProcessJobsAsync(stoppingToken);
 35
 36        // Wait for all worker tasks to complete (typically when the application is shutting down)
 137        await Task.WhenAll(workers);
 038    }
 39
 40    /// <summary>
 41    /// Continuously processes jobs from the job channel until cancellation is requested.
 42    /// </summary>
 43    /// <param name="stoppingToken">Cancellation token from the hosted service</param>
 44    private async Task ProcessJobsAsync(CancellationToken stoppingToken)
 45    {
 46        // Process all jobs from the channel until it's completed or cancellation is requested
 847        await foreach (var jobItem in _jobsChannel.Reader.ReadAllAsync(stoppingToken))
 48        {
 49            try
 50            {
 51                // Link the cancellation tokens so that cancellation can happen from either source
 052                using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, jobItem.Can
 053                await jobItem.Action(linkedTokenSource.Token);
 054                _logger.LogInformation("Worker {CurrentTaskId} processed job {JobId}", Task.CurrentId, jobItem.JobId);
 055            }
 056            catch (OperationCanceledException)
 57            {
 058                _logger.LogInformation("Job {JobId} was canceled", jobItem.JobId);
 059            }
 060            catch (Exception ex)
 61            {
 062                _logger.LogError(ex, "Job {JobId} failed", jobItem.JobId);
 063            }
 64            finally
 65            {
 66                // Notify that the job has completed (whether successfully or not)
 067                jobItem.OnJobCompleted(jobItem.JobId);
 68            }
 069        }
 070    }
 71}