< Summary

Information
Class: Elsa.Persistence.EFCore.Modules.Management.EFCoreWorkflowInstanceStore
Assembly: Elsa.Persistence.EFCore
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Persistence.EFCore/Modules/Management/WorkflowInstanceStore.cs
Line coverage
53%
Covered lines: 62
Uncovered lines: 54
Coverable lines: 116
Total lines: 263
Line coverage: 53.4%
Branch coverage
35%
Covered branches: 7
Total branches: 20
Branch coverage: 35%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
.ctor(...) | 100% | 1 | 1 | 100%
FindAsync() | 100% | 1 | 1 | 100%
FindManyAsync() | 100% | 2 | 1 | 0%
FindManyAsync() | 100% | 2 | 1 | 0%
FindManyAsync() | 100% | 1 | 1 | 100%
FindManyAsync() | 100% | 1 | 1 | 100%
CountAsync() | 100% | 2 | 1 | 0%
SummarizeManyAsync() | 100% | 1 | 1 | 100%
SummarizeManyAsync() | 100% | 2 | 2 | 100%
SummarizeManyAsync() | 100% | 1 | 1 | 100%
SummarizeManyAsync() | 100% | 1 | 1 | 100%
FindManyIdsAsync() | 100% | 1 | 1 | 100%
FindManyIdsAsync() | 100% | 2 | 1 | 0%
FindManyIdsAsync() | 0% | 20 | 4 | 0%
DeleteAsync() | 100% | 1 | 1 | 100%
UpdateUpdatedTimestampAsync() | 0% | 72 | 8 | 0%
SaveAsync() | 100% | 1 | 1 | 100%
AddAsync() | 100% | 2 | 1 | 0%
UpdateAsync() | 100% | 2 | 1 | 0%
SaveManyAsync() | 100% | 2 | 1 | 0%
OnSaveAsync() | 50% | 2 | 2 | 100%
OnLoadAsync() | 66.66% | 7 | 6 | 73.33%
Filter(...) | 100% | 1 | 1 | 100%

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.Persistence.EFCore/Modules/Management/WorkflowInstanceStore.cs

# | Line | Line coverage
 1using System.Diagnostics.CodeAnalysis;
 2using Elsa.Common;
 3using Elsa.Common.Codecs;
 4using Elsa.Common.Entities;
 5using Elsa.Common.Models;
 6using Elsa.Extensions;
 7using Elsa.Workflows;
 8using Elsa.Workflows.Management;
 9using Elsa.Workflows.Management.Entities;
 10using Elsa.Workflows.Management.Filters;
 11using Elsa.Workflows.Management.Models;
 12using Elsa.Workflows.Management.Options;
 13using JetBrains.Annotations;
 14using Microsoft.EntityFrameworkCore;
 15using Microsoft.Extensions.Logging;
 16using Microsoft.Extensions.Options;
 17using Open.Linq.AsyncExtensions;
 18
 19namespace Elsa.Persistence.EFCore.Modules.Management;
 20
 21/// <summary>
 22/// An EF Core implementation of <see cref="IWorkflowInstanceStore"/>.
 23/// </summary>
 24[UsedImplicitly]
 25public class EFCoreWorkflowInstanceStore : IWorkflowInstanceStore
 26{
 27    private readonly EntityStore<ManagementElsaDbContext, WorkflowInstance> _store;
 28    private readonly IWorkflowStateSerializer _workflowStateSerializer;
 29    private readonly ICompressionCodecResolver _compressionCodecResolver;
 30    private readonly IOptions<ManagementOptions> _options;
 31    private readonly ILogger<EFCoreWorkflowInstanceStore> _logger;
 32
 33    /// <summary>
 34    /// Initializes a new instance of the EF Core workflow instance store with its dependencies.
 35    /// </summary>
       /// <param name="store">The entity store used for the queries, saves and deletes in this class; it also creates the DbContext for the direct queries.</param>
       /// <param name="workflowStateSerializer">Serializes the workflow state on save and deserializes it on load.</param>
       /// <param name="compressionCodecResolver">Resolves the codec that compresses and decompresses the serialized state.</param>
       /// <param name="options">Management options; the configured compression algorithm is read from here on save.</param>
       /// <param name="logger">Receives a warning when a stored workflow state cannot be restored on load.</param>
 32236    public EFCoreWorkflowInstanceStore(
 32237        EntityStore<ManagementElsaDbContext, WorkflowInstance> store,
 32238        IWorkflowStateSerializer workflowStateSerializer,
 32239        ICompressionCodecResolver compressionCodecResolver,
 32240        IOptions<ManagementOptions> options,
 32241        ILogger<EFCoreWorkflowInstanceStore> logger)
 42    {
 32243        _store = store;
 32244        _workflowStateSerializer = workflowStateSerializer;
 32245        _compressionCodecResolver = compressionCodecResolver;
 32246        _options = options;
 32247        _logger = logger;
 32248    }
 49
 50    /// <inheritdoc />
 51    public async ValueTask<WorkflowInstance?> FindAsync(WorkflowInstanceFilter filter, CancellationToken cancellationTok
 52    {
           // Applies the filter and yields the first match, or the default (null) when nothing matches.
           // OnLoadAsync is handed to the store as the load callback.
 21253        return await _store.QueryAsync(query => Filter(query, filter), OnLoadAsync, cancellationToken).FirstOrDefault();
 10654    }
 55
 56    /// <inheritdoc />
 57    public async ValueTask<Page<WorkflowInstance>> FindManyAsync(WorkflowInstanceFilter filter, PageArgs pageArgs, Cance
 58    {
           // No order was supplied: fall back to CreatedAt ascending and delegate to the ordered, paged overload.
 059        var orderBy = new WorkflowInstanceOrder<DateTimeOffset>(x => x.CreatedAt, OrderDirection.Ascending);
 060        return await FindManyAsync(filter, pageArgs, orderBy, cancellationToken);
 061    }
 62
 63    /// <inheritdoc />
 64    public async ValueTask<Page<WorkflowInstance>> FindManyAsync<TOrderBy>(WorkflowInstanceFilter filter, PageArgs pageA
 65    {
           // Two separate store queries: the total is computed over the whole filtered set (projecting only Id),
           // then the ordered, paginated entities are fetched with OnLoadAsync as the load callback.
           // NOTE(review): the total appears to be counted from the projected IDs after QueryAsync returns them,
           // whereas SummarizeManyAsync calls LongCountAsync on the queryable itself - confirm the cost is acceptable.
 066        var count = await _store.QueryAsync(query => Filter(query, filter), x => x.Id, cancellationToken).LongCount();
 067        var entities = await _store.QueryAsync(query => Filter(query, filter).OrderBy(order).Paginate(pageArgs), OnLoadA
 068        return Page.Of(entities, count);
 069    }
 70
 71    /// <inheritdoc />
 72    public async ValueTask<IEnumerable<WorkflowInstance>> FindManyAsync(WorkflowInstanceFilter filter, CancellationToken
 73    {
           // No order was supplied: fall back to CreatedAt ascending and delegate to the ordered overload.
 474        var orderBy = new WorkflowInstanceOrder<DateTimeOffset>(x => x.CreatedAt, OrderDirection.Ascending);
 475        return await FindManyAsync(filter, orderBy, cancellationToken);
 476    }
 77
 78    /// <inheritdoc />
 79    public async ValueTask<IEnumerable<WorkflowInstance>> FindManyAsync<TOrderBy>(WorkflowInstanceFilter filter, Workflo
 80    {
           // The filter is applied first, then the requested ordering; OnLoadAsync is handed to the store
           // as the load callback. No pagination is applied in this overload.
 881        return await _store.QueryAsync(query => Filter(query, filter).OrderBy(order), OnLoadAsync, cancellationToken).To
 482    }
 83
 84    /// <inheritdoc />
 85    [RequiresUnreferencedCode("Calls Elsa.Workflows.Contracts.IWorkflowStateSerializer.SerializeAsync(WorkflowState, Can
 86    public async ValueTask<long> CountAsync(WorkflowInstanceFilter filter, CancellationToken cancellationToken = default
 87    {
           // Counts the instances matching the filter. filter.Apply is passed to the store directly here;
           // the other methods go through the Filter helper, which forwards to the same filter.Apply.
           // NOTE(review): the attribute message (cut off in this listing) names SerializeAsync, which this
           // body does not call - confirm the attribute is still required on this method.
 088        return await _store.CountAsync(filter.Apply, cancellationToken);
 089    }
 90
 91    /// <inheritdoc />
 92    public async ValueTask<Page<WorkflowInstanceSummary>> SummarizeManyAsync(WorkflowInstanceFilter filter, PageArgs pag
 93    {
           // No order was supplied: fall back to CreatedAt ascending and delegate to the ordered, paged overload.
 294        var orderBy = new WorkflowInstanceOrder<DateTimeOffset>(x => x.CreatedAt, OrderDirection.Ascending);
 295        return await SummarizeManyAsync(filter, pageArgs, orderBy, cancellationToken);
 296    }
 97
 98    /// <inheritdoc />
 99    public async ValueTask<Page<WorkflowInstanceSummary>> SummarizeManyAsync<TOrderBy>(WorkflowInstanceFilter filter, Pa
 100    {
            // Works on a DbContext created for this call (disposed when the method exits) instead of going
            // through the QueryAsync helper of the store.
 2101        await using var dbContext = await _store.CreateDbContextAsync(cancellationToken);
 2102        var set = dbContext.WorkflowInstances;
 2103        var queryable = Filter(set.AsQueryable(), filter).OrderBy(order);
            // The total is taken before pagination so it reflects the whole filtered set, not just this page.
 2104        var count = await queryable.LongCountAsync(cancellationToken);
 2105        queryable = queryable.Paginate(pageArgs);
            // Entities are projected to summaries inside the query; OnLoadAsync is not involved on this path.
 2106        var entities = await queryable.Select(WorkflowInstanceSummary.FromInstanceExpression()).ToListAsync(cancellation
 107
 2108        return Page.Of(entities, count);
 2109    }
 110
 111    /// <inheritdoc />
 112    public async ValueTask<IEnumerable<WorkflowInstanceSummary>> SummarizeManyAsync(WorkflowInstanceFilter filter, Cance
 113    {
            // No order was supplied: fall back to CreatedAt ascending and delegate to the ordered overload.
 6114        var orderBy = new WorkflowInstanceOrder<DateTimeOffset>(x => x.CreatedAt, OrderDirection.Ascending);
 6115        return await SummarizeManyAsync(filter, orderBy, cancellationToken);
 6116    }
 117
 118    /// <inheritdoc />
 119    public async ValueTask<IEnumerable<WorkflowInstanceSummary>> SummarizeManyAsync<TOrderBy>(WorkflowInstanceFilter fil
 120    {
            // The filter is applied first, then the requested ordering. The second argument to QueryAsync is a
            // member of WorkflowInstanceSummary (name cut off in this listing), presumably the summary projection - confirm.
 18121        return await _store.QueryAsync(query => Filter(query, filter).OrderBy(order), WorkflowInstanceSummary.FromInstan
 6122    }
 123
 124    /// <inheritdoc />
 125    public async ValueTask<IEnumerable<string>> FindManyIdsAsync(WorkflowInstanceFilter filter, CancellationToken cancel
 126    {
            // Matching instances are ordered by CreatedAt; the remainder of the QueryAsync call is cut off in this listing.
 6127        var entities = await _store.QueryAsync(query => Filter(query, filter).OrderBy(x => x.CreatedAt), WorkflowInstanc
            // Only the Id of each returned item is kept, materialized as a list.
 4128        return entities.Select(x => x.Id).ToList();
 2129    }
 130
 131    /// <inheritdoc />
 132    public async ValueTask<Page<string>> FindManyIdsAsync(WorkflowInstanceFilter filter, PageArgs pageArgs, Cancellation
 133    {
            // No order was supplied: fall back to CreatedAt ascending and delegate to the ordered, paged overload.
 0134        var orderBy = new WorkflowInstanceOrder<DateTimeOffset>(x => x.CreatedAt, OrderDirection.Ascending);
 0135        return await FindManyIdsAsync(filter, pageArgs, orderBy, cancellationToken);
 0136    }
 137
 138    /// <inheritdoc />
 139    public async ValueTask<Page<string>> FindManyIdsAsync<TOrderBy>(WorkflowInstanceFilter filter, PageArgs pageArgs, Wo
 140    {
            // Works on a DbContext created for this call (disposed when the method exits).
 0141        await using var dbContext = await _store.CreateDbContextAsync(cancellationToken);
 0142        var set = dbContext.WorkflowInstances;
 0143        var queryable = Filter(set.AsQueryable(), filter).OrderBy(order);
            // The total is taken before pagination so it reflects the whole filtered set, not just this page.
 0144        var count = await queryable.LongCountAsync(cancellationToken);
 0145        queryable = queryable.Paginate(pageArgs);
            // Each instance is projected to a WorkflowInstanceId inside the query; the Id values are extracted afterwards.
 0146        var entities = await queryable.Select(WorkflowInstanceId.FromInstanceExpression()).ToListAsync(cancellationToken
 0147        var ids = entities.Select(x => x.Id).ToList();
 148
 0149        return Page.Of(ids, count);
 0150    }
 151
 152    /// <inheritdoc />
 153    public async ValueTask<long> DeleteAsync(WorkflowInstanceFilter filter, CancellationToken cancellationToken = defaul
 154    {
            // Deletes every instance matching the filter and returns whatever DeleteWhereAsync reports
            // (presumably the number of deleted rows - confirm against EntityStore).
 26155        return await _store.DeleteWhereAsync(query => Filter(query, filter), cancellationToken);
 13156    }
 157
 158    public async Task UpdateUpdatedTimestampAsync(string workflowInstanceId, DateTimeOffset value, CancellationToken can
 159    {
            // Writes only UpdatedAt for the given instance ID: a stub entity carrying just the ID and the new
            // timestamp is attached, and only that one property is flagged as modified, so the row is not loaded first.
 0160        var entity = new WorkflowInstance
 0161        {
 0162            Id = workflowInstanceId,
 0163            UpdatedAt = value
 0164        };
 165
 0166        await using var dbContext = await _store.CreateDbContextAsync(cancellationToken);
 0167        dbContext.Attach(entity);
 0168        dbContext.Entry(entity).Property(x => x.UpdatedAt).IsModified = true;
 169
 170        try
 171        {
 0172            await dbContext.SaveChangesAsync(cancellationToken);
 0173        }
 0174        catch (DbUpdateConcurrencyException e)
 175        {
                // The exception is swallowed: callers never see a concurrency failure from this method.
                // NOTE(review): SaveChangesAsync is not called again after this handler, so the values reconciled
                // below only change the tracked entries and nothing further is written - confirm whether a retry was intended.
 0176            foreach (var entry in e.Entries)
 177            {
 0178                var proposedValues = entry.CurrentValues;
 0179                var databaseValues = await entry.GetDatabaseValuesAsync(cancellationToken);
 180
                    // No database values were returned for this entry (presumably the row no longer exists); skip it.
 0181                if(databaseValues == null)
 182                    continue;
 183
 0184                var updatedAtProperty = entry.Metadata.GetProperty(nameof(WorkflowInstance.UpdatedAt));
 0185                var proposedValue = (DateTimeOffset)proposedValues[updatedAtProperty]!;
 0186                var databaseValue = (DateTimeOffset)databaseValues[updatedAtProperty]!;
 187
                    // NOTE(review): this assigns the value just read from proposedValues back to the same slot,
                    // so the branch changes nothing; the stored timestamp is never chosen when it is the newer one either.
 0188                if (proposedValue > databaseValue)
 0189                    proposedValues[updatedAtProperty] = proposedValue;
 190
                    // Original values are refreshed from the database snapshot for this entry.
 0191                entry.OriginalValues.SetValues(databaseValues);
 0192            }
 193        }
 0194    }
 195
 196    /// <inheritdoc />
 197    [RequiresUnreferencedCode("Calls Elsa.Workflows.Contracts.IWorkflowStateSerializer.SerializeAsync(WorkflowState, Can
 198    public async ValueTask SaveAsync(WorkflowInstance instance, CancellationToken cancellationToken = default)
 199    {
            // Delegates to the store; OnSaveAsync is handed over as the save callback that serializes and
            // compresses the workflow state onto the entity entry.
 349200        await _store.SaveAsync(instance, OnSaveAsync, cancellationToken);
 349201    }
 202
 203    /// <inheritdoc />
 204    public async ValueTask AddAsync(WorkflowInstance instance, CancellationToken cancellationToken = default)
 205    {
            // Delegates to the store; OnSaveAsync is handed over as the save callback that serializes and
            // compresses the workflow state onto the entity entry.
 0206        await _store.AddAsync(instance, OnSaveAsync, cancellationToken);
 0207    }
 208
 209    /// <inheritdoc />
 210    public async ValueTask UpdateAsync(WorkflowInstance instance, CancellationToken cancellationToken = default)
 211    {
            // Delegates to the store; OnSaveAsync is handed over as the save callback that serializes and
            // compresses the workflow state onto the entity entry.
 0212        await _store.UpdateAsync(instance, OnSaveAsync, cancellationToken);
 0213    }
 214
 215    /// <inheritdoc />
 216    public async ValueTask SaveManyAsync(IEnumerable<WorkflowInstance> instances, CancellationToken cancellationToken = 
 217    {
            // Delegates the whole batch to the store; OnSaveAsync is handed over as the save callback
            // (presumably invoked once per instance - confirm against EntityStore).
 0218        await _store.SaveManyAsync(instances, OnSaveAsync, cancellationToken);
 0219    }
 220
 221    [RequiresUnreferencedCode("Calls Elsa.Workflows.Contracts.IWorkflowStateSerializer.SerializeAsync(WorkflowState, Can
 222    private async ValueTask OnSaveAsync(ManagementElsaDbContext managementElsaDbContext, WorkflowInstance entity, Cancel
 223    {
            // Save callback: turns the workflow state of the entity into a compressed JSON payload.
 349224        var data = entity.WorkflowState;
 349225        var json = _workflowStateSerializer.Serialize(data);
            // Falls back to the name of None when no compression algorithm is configured in the options.
 349226        var compressionAlgorithm = _options.Value.CompressionAlgorithm ?? nameof(None);
 349227        var compressionCodec = _compressionCodecResolver.Resolve(compressionAlgorithm);
 349228        var compressedJson = await compressionCodec.CompressAsync(json, cancellationToken);
 229
            // The payload and the algorithm name are written to properties addressed by name on the entry
            // (presumably shadow properties - confirm in the model configuration). The algorithm name is stored
            // alongside the payload so that OnLoadAsync can resolve the matching codec.
 349230        managementElsaDbContext.Entry(entity).Property("Data").CurrentValue = compressedJson;
 349231        managementElsaDbContext.Entry(entity).Property("DataCompressionAlgorithm").CurrentValue = compressionAlgorithm;
 349232    }
 233
 234    private async ValueTask OnLoadAsync(ManagementElsaDbContext managementElsaDbContext, WorkflowInstance? entity, Cance
 235    {
            // Load callback: restores the workflow state of a loaded entity from the stored payload.
 95236        if (entity == null)
 0237            return;
 238
            // Starts from the state already on the entity; it is kept as-is when the payload is blank or cannot be restored.
 95239        var data = entity.WorkflowState;
 95240        var json = (string?)managementElsaDbContext.Entry(entity).Property("Data").CurrentValue;
 95241        var compressionAlgorithm = (string?)managementElsaDbContext.Entry(entity).Property("DataCompressionAlgorithm").C
            // NOTE(review): the algorithm name is nullable here - confirm that Resolve accepts a null name.
 95242        var compressionStrategy = _compressionCodecResolver.Resolve(compressionAlgorithm);
 243
 244        try
 245        {
 95246            if (!string.IsNullOrWhiteSpace(json))
 247            {
 95248                json = await compressionStrategy.DecompressAsync(json, cancellationToken);
 95249                data = _workflowStateSerializer.Deserialize(json);
 250            }
 95251        }
            // Best effort: any failure while decompressing or deserializing is logged as a warning, not rethrown.
            // NOTE(review): catching Exception also covers a cancellation raised by DecompressAsync, so a cancelled
            // load is reported as a deserialization warning - confirm this is intended.
 0252        catch (Exception exp)
 253        {
 0254            _logger.LogWarning(exp, "Exception while deserializing workflow instance state: {InstanceId}. Reverting to d
 0255        }
 95256        entity.WorkflowState = data;
 95257    }
 258
 259    private static IQueryable<WorkflowInstance> Filter(IQueryable<WorkflowInstance> query, WorkflowInstanceFilter filter
 260    {
            // Shared helper used by the query lambdas in this class; it simply forwards to filter.Apply.
 141261        return filter.Apply(query);
 262    }
 263}