< Summary

Information
Class: Elsa.Common.Multitenancy.TenantTaskLifecycleCoordinator
Assembly: Elsa.Common
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Common/Multitenancy/Implementations/TenantTaskLifecycleCoordinator.cs
Line coverage
67%
Covered lines: 117
Uncovered lines: 57
Coverable lines: 174
Total lines: 328
Line coverage: 67.2%
Branch coverage
84%
Covered branches: 37
Total branches: 44
Branch coverage: 84%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
ActivateTenantAsync()75%9876.19%
DeactivateTenantAsync()50%2288.88%
RunStartupTasksAsync()100%22100%
RunBackgroundTasksAsync()83.33%66100%
RunBackgroundTaskAsync()100%1133.33%
StartRecurringTasksAsync()75%44100%
<StartRecurringTasksAsync()50%3236.36%
DisposeAsync()83.33%7670.58%
StopTenantCoreAsync()100%351245.9%
GetTenantId(...)100%11100%
get_Gate()100%11100%
get_RunningBackgroundTasks()100%11100%
get_BackgroundTasks()100%11100%
get_ScheduledTimers()100%11100%
get_RecurringTasks()100%11100%
get_CancellationTokenSource()100%11100%
get_Task()100%11100%

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Common/Multitenancy/Implementations/TenantTaskLifecycleCoordinator.cs

#LineLine coverage
 1using System.Collections.Concurrent;
 2using System.Reflection;
 3using Elsa.Common.Helpers;
 4using Elsa.Common.RecurringTasks;
 5using Microsoft.Extensions.DependencyInjection;
 6using Microsoft.Extensions.Logging;
 7
 8namespace Elsa.Common.Multitenancy;
 9
 10/// <summary>
 11/// Coordinates the lifecycle of startup, background, and recurring tasks for tenants.
 12/// Executes tasks in the proper sequence: startup tasks first, then background tasks, then recurring tasks.
 13/// </summary>
 914public class TenantTaskLifecycleCoordinator(RecurringTaskScheduleManager scheduleManager, ILogger<TenantTaskLifecycleCoo
 15{
 916    private readonly ConcurrentDictionary<string, TenantRuntimeState> _tenantStates = new();
 917    private readonly CancellationTokenSource _shutdownCancellationTokenSource = new();
 18    private int _disposeRequested;
 19
 20    public async Task ActivateTenantAsync(TenantActivatedEventArgs args)
 21    {
 1822        if (Volatile.Read(ref _disposeRequested) == 1)
 023            return;
 24
 1825        using var activationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(args.CancellationToken, _shutd
 1826        var cancellationToken = activationTokenSource.Token;
 1827        var tenantScope = args.TenantScope;
 1828        var taskExecutor = tenantScope.ServiceProvider.GetRequiredService<ITaskExecutor>();
 1829        var tenantId = GetTenantId(args.Tenant);
 3630        var state = _tenantStates.GetOrAdd(tenantId, static _ => new TenantRuntimeState());
 31
 1832        await state.Gate.WaitAsync(cancellationToken);
 33
 34        try
 35        {
 1836            if (Volatile.Read(ref _disposeRequested) == 1)
 037                return;
 38
 1839            await StopTenantCoreAsync(state, cancellationToken);
 1840            state.CancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
 41
 42            // Step 1: Run startup tasks (with dependency ordering)
 1843            await RunStartupTasksAsync(tenantScope, taskExecutor, cancellationToken);
 44
 45            // Step 2: Run background tasks
 1846            await RunBackgroundTasksAsync(tenantScope, taskExecutor, state, cancellationToken);
 47
 48            // Step 3: Start recurring tasks
 1849            await StartRecurringTasksAsync(tenantScope, taskExecutor, state, cancellationToken);
 1850        }
 051        catch
 52        {
 053            await StopTenantCoreAsync(state, cancellationToken);
 054            throw;
 55        }
 56        finally
 57        {
 1858            state.Gate.Release();
 59        }
 1860    }
 61
 62    public async Task DeactivateTenantAsync(TenantDeactivatedEventArgs args)
 63    {
 1864        var tenantId = GetTenantId(args.Tenant);
 65
 1866        if (!_tenantStates.TryGetValue(tenantId, out var state))
 067            return;
 68
 1869        await state.Gate.WaitAsync(args.CancellationToken);
 70
 71        try
 72        {
 1773            _tenantStates.TryRemove(tenantId, out _);
 1774            await StopTenantCoreAsync(state, args.CancellationToken);
 1775        }
 76        finally
 77        {
 1778            state.Gate.Release();
 79        }
 1780    }
 81
 82    private async Task RunStartupTasksAsync(ITenantScope tenantScope, ITaskExecutor taskExecutor, CancellationToken canc
 83    {
 1884        var startupTasks = tenantScope.ServiceProvider.GetServices<IStartupTask>()
 15685            .OrderBy(x => x.GetType().GetCustomAttribute<OrderAttribute>()?.Order ?? 0f)
 1886            .ToList();
 87
 88        // First apply OrderAttribute to determine a base order, then perform topological sorting.
 89        // The topological sort is the final ordering step to ensure dependency constraints are respected.
 1890        var sortedTasks = TopologicalTaskSorter.Sort(startupTasks).ToList();
 35091        foreach (var task in sortedTasks)
 15792            await taskExecutor.ExecuteTaskAsync(task, cancellationToken);
 1893    }
 94
 95    private async Task RunBackgroundTasksAsync(ITenantScope tenantScope, ITaskExecutor taskExecutor, TenantRuntimeState 
 96    {
 1897        var backgroundTasks = tenantScope.ServiceProvider.GetServices<IBackgroundTask>().ToList();
 1898        var backgroundTaskStarter = tenantScope.ServiceProvider.GetRequiredService<IBackgroundTaskStarter>();
 1899        var tenantCancellationToken = state.CancellationTokenSource?.Token ?? cancellationToken;
 100
 64101        foreach (var backgroundTask in backgroundTasks)
 102        {
 14103            await backgroundTaskStarter.StartAsync(backgroundTask, tenantCancellationToken);
 14104            state.BackgroundTasks.Add(new(backgroundTask, backgroundTaskStarter));
 105
 14106            var task = RunBackgroundTaskAsync(backgroundTask, taskExecutor, tenantCancellationToken);
 107
 14108            if (!task.IsCompleted)
 14109                state.RunningBackgroundTasks.Add(task);
 14110        }
 18111    }
 112
 113    private async Task RunBackgroundTaskAsync(IBackgroundTask backgroundTask, ITaskExecutor taskExecutor, CancellationTo
 114    {
 115        try
 116        {
 14117            await taskExecutor.ExecuteTaskAsync(backgroundTask, cancellationToken);
 13118        }
 0119        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
 120        {
 0121            logger.LogInformation("Background task {TaskType} was cancelled", backgroundTask.GetType().Name);
 0122        }
 0123        catch (Exception e) when (!e.IsFatal())
 124        {
 0125            logger.LogError(e, "Background task {TaskType} failed with an error", backgroundTask.GetType().Name);
 0126        }
 13127    }
 128
 129    private async Task StartRecurringTasksAsync(ITenantScope tenantScope, ITaskExecutor taskExecutor, TenantRuntimeState
 130    {
 18131        var recurringTasks = tenantScope.ServiceProvider.GetServices<IRecurringTask>().ToList();
 18132        var tenantCancellationToken = state.CancellationTokenSource?.Token ?? cancellationToken;
 133
 140134        foreach (var task in recurringTasks)
 135        {
 52136            var schedule = scheduleManager.GetScheduleFor(task.GetType());
 52137            var timer = schedule.CreateTimer(async () =>
 52138            {
 191139                if (tenantCancellationToken.IsCancellationRequested)
 0140                    return;
 52141
 52142                try
 52143                {
 191144                    await taskExecutor.ExecuteTaskAsync(task, tenantCancellationToken);
 191145                }
 0146                catch (OperationCanceledException e)
 52147                {
 0148                    logger.LogInformation(e, "Recurring task {TaskType} was cancelled", task.GetType().Name);
 0149                }
 0150                catch (Exception e) when (!e.IsFatal())
 52151                {
 52152                    // Log but don't rethrow - recurring tasks should not crash the host
 0153                    logger.LogError(e, "Recurring task {TaskType} failed with an error", task.GetType().Name);
 0154                }
 243155            }, logger);
 156
 52157            state.ScheduledTimers.Add(timer);
 52158            state.RecurringTasks.Add(task);
 52159            await task.StartAsync(cancellationToken);
 160        }
 18161    }
 162
 163    public async ValueTask DisposeAsync()
 164    {
 5165        if (Interlocked.Exchange(ref _disposeRequested, 1) == 1)
 0166            return;
 167
 168        try
 169        {
 5170            await _shutdownCancellationTokenSource.CancelAsync();
 5171        }
 0172        catch (ObjectDisposedException)
 173        {
 174            // Already disposed by a concurrent path.
 0175        }
 176
 14177        foreach (var state in _tenantStates.Values)
 178        {
 2179            await state.Gate.WaitAsync();
 180
 181            try
 182            {
 2183                await StopTenantCoreAsync(state, CancellationToken.None);
 2184            }
 185            finally
 186            {
 2187                state.Gate.Release();
 188            }
 2189        }
 190
 191        try
 192        {
 5193            _shutdownCancellationTokenSource.Dispose();
 5194        }
 0195        catch (ObjectDisposedException)
 196        {
 197            // Already disposed by a concurrent path.
 0198        }
 5199    }
 200
 201    private async Task StopTenantCoreAsync(TenantRuntimeState state, CancellationToken cancellationToken)
 202    {
 37203        var cancellationTokenSource = state.CancellationTokenSource;
 204
 205        // Cancel first so callbacks and long-running tasks can drain while resources are being disposed.
 37206        if (cancellationTokenSource != null)
 207        {
 208            try
 209            {
 17210                await cancellationTokenSource.CancelAsync();
 17211            }
 0212            catch (ObjectDisposedException)
 213            {
 214                // Already disposed by a concurrent path.
 0215            }
 0216            catch (Exception e) when (!e.IsFatal())
 217            {
 0218                logger.LogWarning(e, "Failed to cancel tenant task cancellation token source while stopping tenant tasks
 0219            }
 220        }
 221
 178222        foreach (var timer in state.ScheduledTimers)
 223        {
 224            try
 225            {
 52226                await timer.DisposeAsync();
 52227            }
 0228            catch (ObjectDisposedException)
 229            {
 230                // Timer is already disposed; this can happen during concurrent shutdown paths.
 0231            }
 0232            catch (Exception e) when (!e.IsFatal())
 233            {
 0234                logger.LogWarning(e, "Failed to dispose a recurring timer while stopping tenant tasks");
 0235            }
 236        }
 37237        state.ScheduledTimers.Clear();
 238
 100239        foreach (var backgroundTask in state.BackgroundTasks)
 240        {
 241            try
 242            {
 13243                await backgroundTask.Starter.StopAsync(backgroundTask.Task, cancellationToken);
 13244            }
 0245            catch (OperationCanceledException)
 246            {
 247                // Expected if caller requested cancellation during tenant deactivation.
 0248            }
 0249            catch (Exception e) when (!e.IsFatal())
 250            {
 0251                logger.LogError(e, "Failed to stop background task {TaskType}", backgroundTask.Task.GetType().Name);
 0252            }
 13253        }
 37254        state.BackgroundTasks.Clear();
 255
 37256        if (state.RunningBackgroundTasks.Count > 0)
 257        {
 258            try
 259            {
 13260                await Task.WhenAll(state.RunningBackgroundTasks);
 13261            }
 0262            catch (OperationCanceledException)
 263            {
 264                // Expected when cancellation is requested.
 0265            }
 0266            catch (AggregateException e)
 267            {
 0268                logger.LogError(e, "One or more background tasks failed while stopping tenant tasks");
 0269            }
 0270            catch (InvalidOperationException e)
 271            {
 0272                logger.LogError(e, "Background task collection was in an invalid state while stopping tenant tasks");
 0273            }
 13274            state.RunningBackgroundTasks.Clear();
 275        }
 276
 178277        foreach (var recurringTask in state.RecurringTasks)
 278        {
 279            try
 280            {
 52281                await recurringTask.StopAsync(cancellationToken);
 52282            }
 0283            catch (OperationCanceledException)
 284            {
 285                // Expected if caller requested cancellation during tenant deactivation.
 0286            }
 0287            catch (Exception e) when (!e.IsFatal())
 288            {
 0289                logger.LogError(e, "Failed to stop recurring task {TaskType}", recurringTask.GetType().Name);
 0290            }
 52291        }
 37292        state.RecurringTasks.Clear();
 293
 37294        if (cancellationTokenSource == null)
 20295            return;
 296
 297        try
 298        {
 17299            cancellationTokenSource.Dispose();
 17300        }
 0301        catch (ObjectDisposedException)
 302        {
 303            // Already disposed by a concurrent path.
 0304        }
 0305        catch (Exception e) when (!e.IsFatal())
 306        {
 0307            logger.LogWarning(e, "Failed to dispose tenant task cancellation token source");
 0308        }
 309
 17310        state.CancellationTokenSource = null;
 37311    }
 312
 36313    private static string GetTenantId(Tenant tenant) => tenant.Id.NormalizeTenantId();
 314
 315    private class TenantRuntimeState
 316    {
 317        // SemaphoreSlim only allocates a kernel handle when AvailableWaitHandle is accessed.
 318        // Since we exclusively use WaitAsync(), no kernel handle is ever created and disposal is a no-op.
 93319        public SemaphoreSlim Gate { get; } = new(1, 1);
 95320        public List<Task> RunningBackgroundTasks { get; } = [];
 106321        public List<BackgroundTaskRegistration> BackgroundTasks { get; } = [];
 144322        public List<ScheduledTimer> ScheduledTimers { get; } = [];
 144323        public List<IRecurringTask> RecurringTasks { get; } = [];
 108324        public CancellationTokenSource? CancellationTokenSource { get; set; }
 325    }
 326
 40327    private record BackgroundTaskRegistration(IBackgroundTask Task, IBackgroundTaskStarter Starter);
 328}