< Summary

Information
Class: Elsa.Mediator.Services.JobQueue
Assembly: Elsa.Mediator
File(s): /home/runner/work/elsa-core/elsa-core/src/common/Elsa.Mediator/Services/JobQueue.cs
Line coverage
10%
Covered lines: 3
Uncovered lines: 26
Coverable lines: 29
Total lines: 72
Line coverage: 10.3%
Branch coverage
0%
Covered branches: 0
Total branches: 10
Branch coverage: 0%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
Create(...)100%210%
Enqueue(...)0%620%
Enqueue(...)100%210%
Dequeue(...)0%2040%
Cancel(...)0%2040%
CreateJob(...)100%210%
OnJobCompleted(...)100%210%

File(s)

/home/runner/work/elsa-core/elsa-core/src/common/Elsa.Mediator/Services/JobQueue.cs

#LineLine coverage
 1using System.Collections.Concurrent;
 2using Elsa.Mediator.Contracts;
 3using Elsa.Mediator.Models;
 4using Microsoft.Extensions.Logging;
 5
 6namespace Elsa.Mediator.Services;
 7
 8/// <inheritdoc />
 19public class JobQueue(IJobsChannel jobsChannel, ILogger<JobQueue> logger) : IJobQueue
 10{
 111    private readonly ConcurrentDictionary<string, EnqueuedJob> _scheduledItems = new();
 112    private readonly ConcurrentDictionary<string, EnqueuedJob> _pendingItems = new();
 13
 14    /// <inheritdoc />
 15    public string Create(Func<CancellationToken, Task> job)
 16    {
 017        var jobItem = CreateJob(job);
 018        _pendingItems.TryAdd(jobItem.JobId, jobItem);
 019        return jobItem.JobId;
 20    }
 21
 22    public void Enqueue(string jobId)
 23    {
 024        if (!_pendingItems.TryRemove(jobId, out var jobItem))
 25        {
 026            logger.LogWarning($"Job {jobId} was not found");
 027            return;
 28        }
 29
 030        _scheduledItems.TryAdd(jobItem.JobId, jobItem);
 031        jobsChannel.Writer.TryWrite(jobItem);
 032    }
 33
 34    /// <inheritdoc />
 35    public string Enqueue(Func<CancellationToken, Task> job)
 36    {
 037        var jobItem = CreateJob(job);
 038        _scheduledItems.TryAdd(jobItem.JobId, jobItem);
 039        jobsChannel.Writer.TryWrite(jobItem);
 040        return jobItem.JobId;
 41    }
 42
 43    /// <inheritdoc />
 44    public bool Dequeue(string jobId)
 45    {
 046        if (!_pendingItems.TryRemove(jobId, out _))
 047            if (!_scheduledItems.TryRemove(jobId, out _))
 048                return false;
 49
 050        return true;
 51    }
 52
 53    /// <inheritdoc />
 54    public bool Cancel(string jobId)
 55    {
 056        if (!_pendingItems.TryGetValue(jobId, out var jobItem))
 057            if (!_scheduledItems.TryGetValue(jobId, out jobItem))
 058                return false;
 59
 060        jobItem.CancellationTokenSource.Cancel();
 061        return true;
 62    }
 63
 64    private EnqueuedJob CreateJob(Func<CancellationToken, Task> job)
 65    {
 066        var jobId = Guid.NewGuid().ToString();
 067        var cts = new CancellationTokenSource();
 068        return new EnqueuedJob(jobId, job, cts, OnJobCompleted);
 69    }
 70
 071    void OnJobCompleted(string completedJobId) => _scheduledItems.TryRemove(completedJobId, out _);
 72}