< Summary

Information
Class: Elsa.Workflows.Runtime.KeyValueWorkflowDispatchOutboxStore
Assembly: Elsa.Workflows.Runtime
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/KeyValueWorkflowDispatchOutboxStore.cs
Line coverage
96%
Covered lines: 199
Uncovered lines: 8
Coverable lines: 207
Total lines: 358
Line coverage: 96.1%
Branch coverage
90%
Covered branches: 65
Total branches: 72
Branch coverage: 90.2%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
SaveAsync()100%11100%
SaveIndexAsync()100%11100%
FindManyAsync()100%11100%
FindManyAsync()100%22100%
DeleteAsync()100%44100%
FindIndexedItemsAsync()90%101089.65%
FindRecoveryItemsAsync()90%101088.46%
FindLegacyItemsAsync()100%1010100%
DeleteIndexesForMissingItemAsync()100%88100%
DeleteUnrecoverableItemAsync()100%44100%
GetItemKey(...)100%11100%
GetLegacyKey(...)100%11100%
GetIndexKey(...)100%11100%
GetIndexByIdKey(...)100%11100%
GetRecoveryKey(...)100%11100%
GetLegacyScanTake(...)75%44100%
IsLegacyScanComplete(...)100%22100%
IsIndexKeyForId(...)71.42%151481.81%
IsRecoverableItemRecord(...)100%66100%

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Workflows.Runtime/Services/KeyValueWorkflowDispatchOutboxStore.cs

#LineLine coverage
 1using Elsa.KeyValues.Contracts;
 2using Elsa.KeyValues.Entities;
 3using Elsa.KeyValues.Models;
 4using Elsa.Workflows.Runtime.Models;
 5
 6namespace Elsa.Workflows.Runtime;
 7
 8/// <summary>
 9/// Stores workflow dispatch outbox items in the existing key-value store.
 10/// </summary>
 1711public class KeyValueWorkflowDispatchOutboxStore(IKeyValueStore keyValueStore, IPayloadSerializer payloadSerializer) : I
 12{
 13    private const string LegacyKeyPrefix = "Elsa:WorkflowDispatchOutbox:";
 14    private const string ItemKeyPrefix = "Elsa:WorkflowDispatchOutbox:Items:";
 15    private const string IndexKeyPrefix = "Elsa:WorkflowDispatchOutbox:Index:";
 16    private const string IndexByIdKeyPrefix = "Elsa:WorkflowDispatchOutbox:IndexById:";
 17    private const string RecoveryKeyPrefix = "Elsa:WorkflowDispatchOutbox:Recovery:";
 18    private const string StateKeyPrefix = "Elsa:WorkflowDispatchOutbox:State:";
 19    private const string LegacyScanCompletedKey = $"{StateKeyPrefix}LegacyScanCompleted";
 20
 21    /// <inheritdoc />
 22    public async Task SaveAsync(WorkflowDispatchOutboxItem item, CancellationToken cancellationToken = default)
 23    {
 924        await keyValueStore.SaveAsync(new SerializedKeyValuePair
 925        {
 926            Key = GetRecoveryKey(item.Id),
 927            SerializedValue = item.Id,
 928            TenantId = item.TenantId
 929        }, cancellationToken);
 30
 931        await keyValueStore.SaveAsync(new SerializedKeyValuePair
 932        {
 933            Key = GetItemKey(item.Id),
 934            SerializedValue = payloadSerializer.Serialize(item),
 935            TenantId = item.TenantId
 936        }, cancellationToken);
 37
 938        await SaveIndexAsync(item, cancellationToken);
 939        await keyValueStore.DeleteAsync(GetRecoveryKey(item.Id), cancellationToken);
 940    }
 41
 42    private async Task SaveIndexAsync(WorkflowDispatchOutboxItem item, CancellationToken cancellationToken)
 43    {
 1244        var indexKey = GetIndexKey(item);
 45
 1246        await keyValueStore.SaveAsync(new SerializedKeyValuePair
 1247        {
 1248            Key = indexKey,
 1249            SerializedValue = item.Id,
 1250            TenantId = item.TenantId
 1251        }, cancellationToken);
 52
 1253        await keyValueStore.SaveAsync(new SerializedKeyValuePair
 1254        {
 1255            Key = GetIndexByIdKey(item.Id),
 1256            SerializedValue = indexKey,
 1257            TenantId = item.TenantId
 1258        }, cancellationToken);
 1259    }
 60
 61    /// <inheritdoc />
 62    public async Task<IEnumerable<WorkflowDispatchOutboxItem>> FindManyAsync(CancellationToken cancellationToken = defau
 63    {
 364        return await FindManyAsync(maxCount: 0, cancellationToken);
 365    }
 66
 67    /// <inheritdoc />
 68    public async Task<IEnumerable<WorkflowDispatchOutboxItem>> FindManyAsync(int maxCount, CancellationToken cancellatio
 69    {
 1070        var indexedItems = await FindIndexedItemsAsync(maxCount, cancellationToken);
 1071        var recoverableItems = await FindRecoveryItemsAsync(maxCount, cancellationToken);
 1072        var legacyItems = await FindLegacyItemsAsync(maxCount, cancellationToken);
 1073        var items = indexedItems
 1074            .Concat(recoverableItems)
 1075            .Concat(legacyItems)
 1776            .GroupBy(x => x.Id)
 1677            .Select(x => x.First())
 2578            .OrderBy(x => x.CreatedAt);
 79
 1080        if (maxCount > 0)
 781            return items.Take(maxCount).ToList();
 82
 383        return items.ToList();
 1084    }
 85
 86    /// <inheritdoc />
 87    public async Task DeleteAsync(string id, CancellationToken cancellationToken = default)
 88    {
 689        var itemKey = GetItemKey(id);
 690        var record = await keyValueStore.FindAsync(new KeyValueFilter { Key = itemKey }, cancellationToken);
 91
 692        if (record == null)
 93        {
 494            await DeleteIndexesForMissingItemAsync(id, cancellationToken);
 495            await keyValueStore.DeleteAsync(GetRecoveryKey(id), cancellationToken);
 496            await keyValueStore.DeleteAsync(GetLegacyKey(id), cancellationToken);
 497            await keyValueStore.DeleteAsync(itemKey, cancellationToken);
 498            return;
 99        }
 100
 2101        var item = payloadSerializer.Deserialize<WorkflowDispatchOutboxItem>(record.SerializedValue);
 102
 2103        if (item != null)
 104        {
 1105            await keyValueStore.DeleteAsync(GetIndexKey(item), cancellationToken);
 1106            await keyValueStore.DeleteAsync(GetIndexByIdKey(id), cancellationToken);
 107        }
 108        else
 109        {
 1110            await DeleteIndexesForMissingItemAsync(id, cancellationToken);
 111        }
 112
 2113        await keyValueStore.DeleteAsync(GetRecoveryKey(id), cancellationToken);
 2114        await keyValueStore.DeleteAsync(GetLegacyKey(id), cancellationToken);
 2115        await keyValueStore.DeleteAsync(itemKey, cancellationToken);
 6116    }
 117
 118    private async Task<IEnumerable<WorkflowDispatchOutboxItem>> FindIndexedItemsAsync(int maxCount, CancellationToken ca
 119    {
 10120        var indexRecords = (await keyValueStore.FindManyAsync(new KeyValueFilter
 10121        {
 10122            Key = IndexKeyPrefix,
 10123            StartsWith = true,
 10124            OrderByKey = true,
 10125            Take = maxCount > 0 ? maxCount : null
 10126        }, cancellationToken)).ToList();
 17127        var itemKeys = indexRecords.Select(x => GetItemKey(x.SerializedValue)).ToList();
 128
 10129        var itemRecords = itemKeys.Count == 0
 10130            ? Array.Empty<SerializedKeyValuePair>()
 10131            : await keyValueStore.FindManyAsync(new KeyValueFilter
 10132            {
 10133                Keys = itemKeys
 10134            }, cancellationToken);
 17135        var itemRecordLookup = itemRecords.ToDictionary(x => x.Key);
 10136        var items = new List<WorkflowDispatchOutboxItem>();
 137
 34138        foreach (var indexRecord in indexRecords)
 139        {
 7140            var id = indexRecord.SerializedValue;
 7141            var itemKey = GetItemKey(id);
 142
 7143            if (!itemRecordLookup.TryGetValue(itemKey, out var itemRecord))
 144            {
 0145                await keyValueStore.DeleteAsync(indexRecord.Key, cancellationToken);
 0146                await keyValueStore.DeleteAsync(GetIndexByIdKey(id), cancellationToken);
 0147                continue;
 148            }
 149
 7150            var item = payloadSerializer.Deserialize<WorkflowDispatchOutboxItem>(itemRecord.SerializedValue);
 151
 7152            if (item == null)
 153            {
 1154                await DeleteUnrecoverableItemAsync(id, itemKey, indexRecord.Key, recoveryKey: null, cancellationToken: c
 1155                continue;
 156            }
 157
 6158            items.Add(item);
 6159        }
 160
 10161        return items;
 10162    }
 163
 164    private async Task<IEnumerable<WorkflowDispatchOutboxItem>> FindRecoveryItemsAsync(int maxCount, CancellationToken c
 165    {
 10166        var recoveryRecords = (await keyValueStore.FindManyAsync(new KeyValueFilter
 10167        {
 10168            Key = RecoveryKeyPrefix,
 10169            StartsWith = true,
 10170            OrderByKey = true,
 10171            Take = maxCount > 0 ? maxCount : null
 10172        }, cancellationToken)).ToList();
 173
 10174        if (recoveryRecords.Count == 0)
 7175            return [];
 176
 7177        var itemKeys = recoveryRecords.Select(x => GetItemKey(x.SerializedValue)).ToList();
 7178        var itemRecords = (await keyValueStore.FindManyAsync(new KeyValueFilter { Keys = itemKeys }, cancellationToken))
 3179        var items = new List<WorkflowDispatchOutboxItem>();
 180
 14181        foreach (var recoveryRecord in recoveryRecords)
 182        {
 4183            var id = recoveryRecord.SerializedValue;
 184
 4185            if (!itemRecords.TryGetValue(GetItemKey(id), out var itemRecord))
 186            {
 0187                await DeleteIndexesForMissingItemAsync(id, cancellationToken);
 0188                await keyValueStore.DeleteAsync(recoveryRecord.Key, cancellationToken);
 0189                continue;
 190            }
 191
 4192            var item = payloadSerializer.Deserialize<WorkflowDispatchOutboxItem>(itemRecord.SerializedValue);
 193
 4194            if (item == null)
 195            {
 1196                await DeleteUnrecoverableItemAsync(id, itemRecord.Key, indexKey: null, recoveryKey: recoveryRecord.Key, 
 1197                continue;
 198            }
 199
 3200            await SaveIndexAsync(item, cancellationToken);
 3201            await keyValueStore.DeleteAsync(recoveryRecord.Key, cancellationToken);
 3202            items.Add(item);
 3203        }
 204
 3205        return items;
 10206    }
 207
 208    private async Task<IEnumerable<WorkflowDispatchOutboxItem>> FindLegacyItemsAsync(int maxCount, CancellationToken can
 209    {
 10210        var legacyScanCompleted = await keyValueStore.FindAsync(new KeyValueFilter { Key = LegacyScanCompletedKey }, can
 211
 10212        if (legacyScanCompleted != null)
 4213            return [];
 214
 6215        var records = await keyValueStore.FindManyAsync(new KeyValueFilter
 6216        {
 6217            Key = LegacyKeyPrefix,
 6218            StartsWith = true,
 6219            OrderByKey = true,
 6220            Take = GetLegacyScanTake(maxCount)
 6221        }, cancellationToken);
 6222        var recordList = records.ToList();
 223
 6224        var recoverableItems = recordList
 6225            .Where(IsRecoverableItemRecord)
 9226            .Select(x => new
 9227            {
 9228                Record = x,
 9229                Item = payloadSerializer.Deserialize<WorkflowDispatchOutboxItem>(x.SerializedValue)
 9230            })
 9231            .Where(x => x.Item != null)
 6232            .ToList();
 6233        var recoverableItemsToMigrate = maxCount > 0 ? recoverableItems.Take(maxCount).ToList() : recoverableItems;
 6234        var items = new List<WorkflowDispatchOutboxItem>();
 235
 28236        foreach (var recoverableItem in recoverableItemsToMigrate)
 237        {
 8238            var record = recoverableItem.Record;
 8239            var item = recoverableItem.Item!;
 8240            await SaveAsync(item, cancellationToken);
 241
 8242            if (!record.Key.StartsWith(ItemKeyPrefix, StringComparison.Ordinal))
 5243                await keyValueStore.DeleteAsync(record.Key, cancellationToken);
 244
 8245            items.Add(item);
 8246        }
 247
 6248        if (IsLegacyScanComplete(recordList.Count, maxCount))
 249        {
 5250            await keyValueStore.SaveAsync(new SerializedKeyValuePair
 5251            {
 5252                Key = LegacyScanCompletedKey,
 5253                SerializedValue = "true"
 5254            }, cancellationToken);
 255        }
 256
 6257        return items;
 10258    }
 259
 260    private async Task DeleteIndexesForMissingItemAsync(string id, CancellationToken cancellationToken)
 261    {
 6262        var lookupRecord = await keyValueStore.FindAsync(new KeyValueFilter { Key = GetIndexByIdKey(id) }, cancellationT
 263
 6264        if (lookupRecord != null)
 265        {
 5266            if (IsIndexKeyForId(lookupRecord.SerializedValue, id))
 267            {
 3268                await keyValueStore.DeleteAsync(lookupRecord.SerializedValue, cancellationToken);
 3269                await keyValueStore.DeleteAsync(lookupRecord.Key, cancellationToken);
 3270                return;
 271            }
 272
 2273            await keyValueStore.DeleteAsync(lookupRecord.Key, cancellationToken);
 274        }
 275
 3276        var matchingIndexRecords = await keyValueStore.FindManyAsync(new KeyValueFilter
 3277        {
 3278            Key = IndexKeyPrefix,
 3279            StartsWith = true
 3280        }, cancellationToken);
 281
 16282        foreach (var indexRecord in matchingIndexRecords.Where(x => x.SerializedValue == id || x.Key.EndsWith($":{id}", 
 283        {
 3284            await keyValueStore.DeleteAsync(indexRecord.Key, cancellationToken);
 285        }
 6286    }
 287
 288    private async Task DeleteUnrecoverableItemAsync(string id, string itemKey, string? indexKey, string? recoveryKey, Ca
 289    {
 2290        if (indexKey != null)
 291        {
 1292            await keyValueStore.DeleteAsync(indexKey, cancellationToken);
 1293            await keyValueStore.DeleteAsync(GetIndexByIdKey(id), cancellationToken);
 294        }
 295        else
 296        {
 1297            await DeleteIndexesForMissingItemAsync(id, cancellationToken);
 298        }
 299
 2300        await keyValueStore.DeleteAsync(recoveryKey ?? GetRecoveryKey(id), cancellationToken);
 2301        await keyValueStore.DeleteAsync(GetLegacyKey(id), cancellationToken);
 2302        await keyValueStore.DeleteAsync(itemKey, cancellationToken);
 2303    }
 304
 37305    private static string GetItemKey(string id) => $"{ItemKeyPrefix}{id}";
 306
 8307    private static string GetLegacyKey(string id) => $"{LegacyKeyPrefix}{id}";
 308
 13309    private static string GetIndexKey(WorkflowDispatchOutboxItem item) => $"{IndexKeyPrefix}{item.CreatedAt.UtcTicks:D20
 310
 20311    private static string GetIndexByIdKey(string id) => $"{IndexByIdKeyPrefix}{id}";
 312
 25313    private static string GetRecoveryKey(string id) => $"{RecoveryKeyPrefix}{id}";
 314
 315    private static int? GetLegacyScanTake(int maxCount)
 316    {
 12317        if (maxCount <= 0)
 4318            return null;
 319
 8320        return maxCount == int.MaxValue ? int.MaxValue : maxCount + 1;
 321    }
 322
 323    private static bool IsLegacyScanComplete(int recordCount, int maxCount)
 324    {
 6325        var scanTake = GetLegacyScanTake(maxCount);
 6326        return scanTake == null || recordCount < scanTake.Value;
 327    }
 328
 329    private static bool IsIndexKeyForId(string indexKey, string id)
 330    {
 5331        var expectedSuffix = $":{id}";
 332
 5333        if (!indexKey.StartsWith(IndexKeyPrefix, StringComparison.Ordinal) || !indexKey.EndsWith(expectedSuffix, StringC
 2334            return false;
 335
 3336        var ticksStart = IndexKeyPrefix.Length;
 3337        var ticksLength = indexKey.Length - ticksStart - expectedSuffix.Length;
 338
 3339        if (ticksLength != 20)
 0340            return false;
 341
 126342        foreach (var character in indexKey.AsSpan(ticksStart, ticksLength))
 343        {
 60344            if (character is < '0' or > '9')
 0345                return false;
 346        }
 347
 3348        return true;
 349    }
 350
 351    private static bool IsRecoverableItemRecord(SerializedKeyValuePair record)
 352    {
 10353        return !record.Key.StartsWith(IndexKeyPrefix, StringComparison.Ordinal)
 10354               && !record.Key.StartsWith(IndexByIdKeyPrefix, StringComparison.Ordinal)
 10355               && !record.Key.StartsWith(RecoveryKeyPrefix, StringComparison.Ordinal)
 10356               && !record.Key.StartsWith(StateKeyPrefix, StringComparison.Ordinal);
 357    }
 358}