< Summary

Information
Class: Elsa.Common.Multitenancy.EventHandlers.TenantTaskManager
Assembly: Elsa.Common
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Common/Multitenancy/EventHandlers/TenantTaskManager.cs
Line coverage
71%
Covered lines: 111
Uncovered lines: 45
Coverable lines: 156
Total lines: 297
Line coverage: 71.1%
Branch coverage
85%
Covered branches: 36
Total branches: 42
Branch coverage: 85.7%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

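| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| `.ctor(...)` | 100% | 1 | 1 | 100% |
| `TenantActivatedAsync()` | 75% | 9 | 8 | 76.19% |
| `TenantDeactivatedAsync()` | 50% | 2 | 2 | 88.88% |
| `RunStartupTasksAsync()` | 100% | 2 | 2 | 100% |
| `RunBackgroundTasksAsync(...)` | 83.33% | 6 | 6 | 100% |
| `StartRecurringTasksAsync()` | 75% | 4 | 4 | 100% |
| `<StartRecurringTasksAsync()` | 50% | 3 | 2 | 36.36% |
| `DisposeAsync()` | 83.33% | 7 | 6 | 70.58% |
| `StopTenantCoreAsync()` | 100% | 27 | 10 | 45.09% |
| `GetTenantId(...)` | 100% | 1 | 1 | 100% |
| `get_Gate()` | 100% | 1 | 1 | 100% |
| `get_RunningBackgroundTasks()` | 100% | 1 | 1 | 100% |
| `get_ScheduledTimers()` | 100% | 1 | 1 | 100% |
| `get_RecurringTasks()` | 100% | 1 | 1 | 100% |
| `get_CancellationTokenSource()` | 100% | 1 | 1 | 100% |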

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Common/Multitenancy/EventHandlers/TenantTaskManager.cs

# | Line | Line coverage
 1using System.Collections.Concurrent;
 2using System.Reflection;
 3using Elsa.Common.Helpers;
 4using Elsa.Common.RecurringTasks;
 5using Microsoft.Extensions.DependencyInjection;
 6using Microsoft.Extensions.Logging;
 7
 8namespace Elsa.Common.Multitenancy.EventHandlers;
 9
 10/// <summary>
 11/// Manages the lifecycle of startup, background, and recurring tasks for tenants.
 12/// Executes tasks in the proper sequence: startup tasks first, then background tasks, then recurring tasks.
 13/// </summary>
 614public class TenantTaskManager(RecurringTaskScheduleManager scheduleManager, ILogger<TenantTaskManager> logger) : ITenan
 15{
 616    private readonly ConcurrentDictionary<string, TenantRuntimeState> _tenantStates = new();
 617    private readonly CancellationTokenSource _shutdownCancellationTokenSource = new();
 18    private int _disposeRequested;
 19
 20    public async Task TenantActivatedAsync(TenantActivatedEventArgs args)
 21    {
 1522        if (Volatile.Read(ref _disposeRequested) == 1)
 023            return;
 24
 1525        using var activationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(args.CancellationToken, _shutd
 1526        var cancellationToken = activationTokenSource.Token;
 1527        var tenantScope = args.TenantScope;
 1528        var taskExecutor = tenantScope.ServiceProvider.GetRequiredService<ITaskExecutor>();
 1529        var tenantId = GetTenantId(args.Tenant);
 3030        var state = _tenantStates.GetOrAdd(tenantId, static _ => new TenantRuntimeState());
 31
 1532        await state.Gate.WaitAsync(cancellationToken);
 33
 34        try
 35        {
 1536            if (Volatile.Read(ref _disposeRequested) == 1)
 037                return;
 38
 1539            await StopTenantCoreAsync(state, cancellationToken);
 1540            state.CancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
 41
 42            // Step 1: Run startup tasks (with dependency ordering)
 1543            await RunStartupTasksAsync(tenantScope, taskExecutor, cancellationToken);
 44
 45            // Step 2: Run background tasks
 1546            await RunBackgroundTasksAsync(tenantScope, taskExecutor, state, cancellationToken);
 47
 48            // Step 3: Start recurring tasks
 1549            await StartRecurringTasksAsync(tenantScope, taskExecutor, state, cancellationToken);
 1550        }
 051        catch
 52        {
 053            await StopTenantCoreAsync(state, cancellationToken);
 054            throw;
 55        }
 56        finally
 57        {
 1558            state.Gate.Release();
 59        }
 1560    }
 61
 62    public async Task TenantDeactivatedAsync(TenantDeactivatedEventArgs args)
 63    {
 1464        var tenantId = GetTenantId(args.Tenant);
 65
 1466        if (!_tenantStates.TryGetValue(tenantId, out var state))
 067            return;
 68
 1469        await state.Gate.WaitAsync(args.CancellationToken);
 70
 71        try
 72        {
 1373            _tenantStates.TryRemove(tenantId, out _);
 1374            await StopTenantCoreAsync(state, args.CancellationToken);
 1375        }
 76        finally
 77        {
 1378            state.Gate.Release();
 79        }
 1380    }
 81
 82    private async Task RunStartupTasksAsync(ITenantScope tenantScope, ITaskExecutor taskExecutor, CancellationToken canc
 83    {
 1584        var startupTasks = tenantScope.ServiceProvider.GetServices<IStartupTask>()
 10885            .OrderBy(x => x.GetType().GetCustomAttribute<OrderAttribute>()?.Order ?? 0f)
 1586            .ToList();
 87
 88        // First apply OrderAttribute to determine a base order, then perform topological sorting.
 89        // The topological sort is the final ordering step to ensure dependency constraints are respected.
 1590        var sortedTasks = TopologicalTaskSorter.Sort(startupTasks).ToList();
 24691        foreach (var task in sortedTasks)
 10892            await taskExecutor.ExecuteTaskAsync(task, cancellationToken);
 1593    }
 94
 95    private Task RunBackgroundTasksAsync(ITenantScope tenantScope, ITaskExecutor taskExecutor, TenantRuntimeState state,
 96    {
 1597        var backgroundTasks = tenantScope.ServiceProvider.GetServices<IBackgroundTask>();
 1598        var backgroundTaskStarter = tenantScope.ServiceProvider.GetRequiredService<IBackgroundTaskStarter>();
 1599        var tenantCancellationToken = state.CancellationTokenSource?.Token ?? cancellationToken;
 100
 54101        foreach (var backgroundTask in backgroundTasks)
 102        {
 12103            var task = backgroundTaskStarter
 12104                .StartAsync(backgroundTask, tenantCancellationToken)
 12105                .ContinueWith(_ => taskExecutor.ExecuteTaskAsync(backgroundTask, tenantCancellationToken),
 12106                    cancellationToken,
 12107                    TaskContinuationOptions.RunContinuationsAsynchronously,
 12108                    TaskScheduler.Default)
 12109                .Unwrap();
 110
 12111            if (!task.IsCompleted)
 12112                state.RunningBackgroundTasks.Add(task);
 113        }
 114
 15115        return Task.CompletedTask;
 116    }
 117
 118    private async Task StartRecurringTasksAsync(ITenantScope tenantScope, ITaskExecutor taskExecutor, TenantRuntimeState
 119    {
 15120        var recurringTasks = tenantScope.ServiceProvider.GetServices<IRecurringTask>().ToList();
 15121        var tenantCancellationToken = state.CancellationTokenSource?.Token ?? cancellationToken;
 122
 108123        foreach (var task in recurringTasks)
 124        {
 39125            var schedule = scheduleManager.GetScheduleFor(task.GetType());
 39126            var timer = schedule.CreateTimer(async () =>
 39127            {
 72128                if (tenantCancellationToken.IsCancellationRequested)
 0129                    return;
 39130
 39131                try
 39132                {
 72133                    await taskExecutor.ExecuteTaskAsync(task, tenantCancellationToken);
 72134                }
 0135                catch (OperationCanceledException e)
 39136                {
 0137                    logger.LogInformation(e, "Recurring task {TaskType} was cancelled", task.GetType().Name);
 0138                }
 0139                catch (Exception e) when (!e.IsFatal())
 39140                {
 39141                    // Log but don't rethrow - recurring tasks should not crash the host
 0142                    logger.LogError(e, "Recurring task {TaskType} failed with an error", task.GetType().Name);
 0143                }
 111144            }, logger);
 145
 39146            state.ScheduledTimers.Add(timer);
 39147            state.RecurringTasks.Add(task);
 39148            await task.StartAsync(cancellationToken);
 149        }
 15150    }
 151
 152    public async ValueTask DisposeAsync()
 153    {
 11154        if (Interlocked.Exchange(ref _disposeRequested, 1) == 1)
 6155            return;
 156
 157        try
 158        {
 5159            await _shutdownCancellationTokenSource.CancelAsync();
 5160        }
 0161        catch (ObjectDisposedException)
 162        {
 163            // Already disposed by a concurrent path.
 0164        }
 165
 14166        foreach (var state in _tenantStates.Values)
 167        {
 2168            await state.Gate.WaitAsync();
 169
 170            try
 171            {
 2172                await StopTenantCoreAsync(state, CancellationToken.None);
 2173            }
 174            finally
 175            {
 2176                state.Gate.Release();
 177            }
 2178        }
 179
 180        try
 181        {
 5182            _shutdownCancellationTokenSource.Dispose();
 5183        }
 0184        catch (ObjectDisposedException)
 185        {
 186            // Already disposed by a concurrent path.
 0187        }
 11188    }
 189
 190    private async Task StopTenantCoreAsync(TenantRuntimeState state, CancellationToken cancellationToken)
 191    {
 30192        var cancellationTokenSource = state.CancellationTokenSource;
 193
 194        // Cancel first so callbacks and long-running tasks can drain while resources are being disposed.
 30195        if (cancellationTokenSource != null)
 196        {
 197            try
 198            {
 15199                await cancellationTokenSource.CancelAsync();
 15200            }
 0201            catch (ObjectDisposedException)
 202            {
 203                // Already disposed by a concurrent path.
 0204            }
 0205            catch (Exception e) when (!e.IsFatal())
 206            {
 0207                logger.LogWarning(e, "Failed to cancel tenant task cancellation token source while stopping tenant tasks
 0208            }
 209        }
 210
 138211        foreach (var timer in state.ScheduledTimers)
 212        {
 213            try
 214            {
 39215                await timer.DisposeAsync();
 39216            }
 0217            catch (ObjectDisposedException)
 218            {
 219                // Timer is already disposed; this can happen during concurrent shutdown paths.
 0220            }
 0221            catch (Exception e) when (!e.IsFatal())
 222            {
 0223                logger.LogWarning(e, "Failed to dispose a recurring timer while stopping tenant tasks");
 0224            }
 225        }
 30226        state.ScheduledTimers.Clear();
 227
 30228        if (state.RunningBackgroundTasks.Count > 0)
 229        {
 230            try
 231            {
 12232                await Task.WhenAll(state.RunningBackgroundTasks);
 12233            }
 0234            catch (OperationCanceledException)
 235            {
 236                // Expected when cancellation is requested.
 0237            }
 0238            catch (AggregateException e)
 239            {
 0240                logger.LogError(e, "One or more background tasks failed while stopping tenant tasks");
 0241            }
 0242            catch (InvalidOperationException e)
 243            {
 0244                logger.LogError(e, "Background task collection was in an invalid state while stopping tenant tasks");
 0245            }
 12246            state.RunningBackgroundTasks.Clear();
 247        }
 248
 138249        foreach (var recurringTask in state.RecurringTasks)
 250        {
 251            try
 252            {
 39253                await recurringTask.StopAsync(cancellationToken);
 39254            }
 0255            catch (OperationCanceledException)
 256            {
 257                // Expected if caller requested cancellation during tenant deactivation.
 0258            }
 0259            catch (Exception e) when (!e.IsFatal())
 260            {
 0261                logger.LogError(e, "Failed to stop recurring task {TaskType}", recurringTask.GetType().Name);
 0262            }
 39263        }
 30264        state.RecurringTasks.Clear();
 265
 30266        if (cancellationTokenSource == null)
 15267            return;
 268
 269        try
 270        {
 15271            cancellationTokenSource.Dispose();
 15272        }
 0273        catch (ObjectDisposedException)
 274        {
 275            // Already disposed by a concurrent path.
 0276        }
 0277        catch (Exception e) when (!e.IsFatal())
 278        {
 0279            logger.LogWarning(e, "Failed to dispose tenant task cancellation token source");
 0280        }
 281
 15282        state.CancellationTokenSource = null;
 30283    }
 284
 29285    private static string GetTenantId(Tenant tenant) => tenant.Id;
 286
 287    private class TenantRuntimeState
 288    {
 289        // SemaphoreSlim only allocates a kernel handle when AvailableWaitHandle is accessed.
 290        // Since we exclusively use WaitAsync(), no kernel handle is ever created and disposal is a no-op.
 76291        public SemaphoreSlim Gate { get; } = new(1, 1);
 81292        public List<Task> RunningBackgroundTasks { get; } = [];
 114293        public List<ScheduledTimer> ScheduledTimers { get; } = [];
 114294        public List<IRecurringTask> RecurringTasks { get; } = [];
 90295        public CancellationTokenSource? CancellationTokenSource { get; set; }
 296    }
 297}