| | | 1 | | using System.Diagnostics.CodeAnalysis; |
| | | 2 | | using System.Linq.Expressions; |
| | | 3 | | using System.Text.Json; |
| | | 4 | | using Elsa.Common; |
| | | 5 | | using Elsa.Common.Codecs; |
| | | 6 | | using Elsa.Common.Entities; |
| | | 7 | | using Elsa.Extensions; |
| | | 8 | | using Elsa.Workflows; |
| | | 9 | | using Elsa.Workflows.Runtime; |
| | | 10 | | using Elsa.Workflows.Runtime.Entities; |
| | | 11 | | using Elsa.Workflows.Runtime.Extensions; |
| | | 12 | | using Elsa.Workflows.Runtime.Filters; |
| | | 13 | | using Elsa.Workflows.Runtime.OrderDefinitions; |
| | | 14 | | using Elsa.Workflows.State; |
| | | 15 | | using JetBrains.Annotations; |
| | | 16 | | using Microsoft.EntityFrameworkCore; |
| | | 17 | | using Open.Linq.AsyncExtensions; |
| | | 18 | | |
| | | 19 | | namespace Elsa.Persistence.EFCore.Modules.Runtime; |
| | | 20 | | |
| | | 21 | | /// <summary> |
| | | 22 | | /// An EF Core implementation of <see cref="IActivityExecutionStore"/>. |
| | | 23 | | /// </summary> |
| | | 24 | | [UsedImplicitly] |
| | 324 | 25 | | public class EFCoreActivityExecutionStore( |
| | 324 | 26 | | EntityStore<RuntimeElsaDbContext, ActivityExecutionRecord> store, |
| | 324 | 27 | | IPayloadSerializer payloadSerializer, |
| | 324 | 28 | | ICompressionCodecResolver compressionCodecResolver) : IActivityExecutionStore |
| | | 29 | | { |
| | | 30 | | /// <inheritdoc /> |
| | 0 | 31 | | public async Task SaveAsync(ActivityExecutionRecord record, CancellationToken cancellationToken = default) => await |
| | | 32 | | |
| | | 33 | | /// <inheritdoc /> |
| | 323 | 34 | | public async Task SaveManyAsync(IEnumerable<ActivityExecutionRecord> records, CancellationToken cancellationToken = |
| | | 35 | | |
| | | 36 | | /// <inheritdoc /> |
| | 0 | 37 | | public async Task AddManyAsync(IEnumerable<ActivityExecutionRecord> records, CancellationToken cancellationToken = d |
| | | 38 | | |
| | | 39 | | /// <inheritdoc /> |
| | | 40 | | [RequiresUnreferencedCode("Calls Elsa.Persistence.EFCore.Modules.Runtime.EFCoreActivityExecutionStore.DeserializeAct |
| | | 41 | | public async Task<ActivityExecutionRecord?> FindAsync(ActivityExecutionRecordFilter filter, CancellationToken cancel |
| | | 42 | | { |
| | 2 | 43 | | return await store.QueryAsync(queryable => Filter(queryable, filter), OnLoadAsync, cancellationToken).FirstOrDef |
| | 1 | 44 | | } |
| | | 45 | | |
| | | 46 | | /// <inheritdoc /> |
| | | 47 | | public async Task<IEnumerable<ActivityExecutionRecord>> FindManyAsync<TOrderBy>(ActivityExecutionRecordFilter filter |
| | | 48 | | { |
| | 4 | 49 | | return await store.QueryAsync(queryable => Filter(queryable, filter).OrderBy(order), OnLoadAsync, cancellationTo |
| | 2 | 50 | | } |
| | | 51 | | |
| | | 52 | | /// <inheritdoc /> |
| | | 53 | | [RequiresUnreferencedCode("Calls Elsa.Persistence.EFCore.Modules.Runtime.EFCoreActivityExecutionStore.DeserializeAct |
| | | 54 | | public async Task<IEnumerable<ActivityExecutionRecord>> FindManyAsync(ActivityExecutionRecordFilter filter, Cancella |
| | | 55 | | { |
| | 16 | 56 | | return await store.QueryAsync(queryable => Filter(queryable, filter), OnLoadAsync, cancellationToken).ToList(); |
| | 8 | 57 | | } |
| | | 58 | | |
| | | 59 | | /// <inheritdoc /> |
| | | 60 | | public async Task<IEnumerable<ActivityExecutionRecordSummary>> FindManySummariesAsync<TOrderBy>(ActivityExecutionRec |
| | | 61 | | { |
| | 0 | 62 | | var shadowRecords = await store.QueryAsync(query => Filter(query, filter), FromRecordExpression(), cancellationT |
| | 0 | 63 | | return Map(shadowRecords); |
| | 0 | 64 | | } |
| | | 65 | | |
| | | 66 | | /// <inheritdoc /> |
| | | 67 | | public async Task<IEnumerable<ActivityExecutionRecordSummary>> FindManySummariesAsync(ActivityExecutionRecordFilter |
| | | 68 | | { |
| | 0 | 69 | | var shadowRecords = await store.QueryAsync(query => Filter(query, filter), FromRecordExpression(), cancellationT |
| | 0 | 70 | | return Map(shadowRecords); |
| | 0 | 71 | | } |
| | | 72 | | |
| | | 73 | | /// <inheritdoc /> |
| | | 74 | | public async Task<long> CountAsync(ActivityExecutionRecordFilter filter, CancellationToken cancellationToken = defau |
| | | 75 | | { |
| | 0 | 76 | | return await store.CountAsync(queryable => Filter(queryable, filter), cancellationToken); |
| | 0 | 77 | | } |
| | | 78 | | |
| | | 79 | | /// <inheritdoc /> |
| | | 80 | | public async Task<long> DeleteManyAsync(ActivityExecutionRecordFilter filter, CancellationToken cancellationToken = |
| | | 81 | | { |
| | 26 | 82 | | return await store.DeleteWhereAsync(queryable => Filter(queryable, filter), cancellationToken); |
| | 13 | 83 | | } |
| | | 84 | | |
| | | 85 | | private ValueTask OnSaveAsync(RuntimeElsaDbContext dbContext, ActivityExecutionRecord entity, CancellationToken canc |
| | | 86 | | { |
| | 2481 | 87 | | var snapshot = entity.SerializedSnapshot; |
| | | 88 | | |
| | 2481 | 89 | | if (snapshot is null) |
| | 0 | 90 | | return ValueTask.CompletedTask; |
| | | 91 | | |
| | 2481 | 92 | | dbContext.Entry(entity).Property("SerializedActivityState").CurrentValue = snapshot.SerializedActivityState; |
| | 2481 | 93 | | dbContext.Entry(entity).Property("SerializedActivityStateCompressionAlgorithm").CurrentValue = snapshot.Serializ |
| | 2481 | 94 | | dbContext.Entry(entity).Property("SerializedOutputs").CurrentValue = snapshot.SerializedOutputs; |
| | 2481 | 95 | | dbContext.Entry(entity).Property("SerializedProperties").CurrentValue = snapshot.SerializedProperties; |
| | 2481 | 96 | | dbContext.Entry(entity).Property("SerializedMetadata").CurrentValue = snapshot.SerializedMetadata; |
| | 2481 | 97 | | dbContext.Entry(entity).Property("SerializedException").CurrentValue = snapshot.SerializedException; |
| | 2481 | 98 | | dbContext.Entry(entity).Property("SerializedPayload").CurrentValue = snapshot.SerializedPayload; |
| | 2481 | 99 | | return ValueTask.CompletedTask; |
| | | 100 | | } |
| | | 101 | | |
| | | 102 | | [RequiresUnreferencedCode("Calls Elsa.Persistence.EFCore.Modules.Runtime.EFCoreActivityExecutionStore.DeserializeAct |
| | | 103 | | private async ValueTask OnLoadAsync(RuntimeElsaDbContext dbContext, ActivityExecutionRecord? entity, CancellationTok |
| | | 104 | | { |
| | 33 | 105 | | if (entity is null) |
| | 0 | 106 | | return; |
| | | 107 | | |
| | 33 | 108 | | entity.ActivityState = await DeserializeActivityState(dbContext, entity, cancellationToken); |
| | 33 | 109 | | entity.Outputs = Deserialize<IDictionary<string, object?>>(dbContext, entity, "SerializedOutputs"); |
| | 33 | 110 | | entity.Properties = DeserializePayload<IDictionary<string, object>?>(dbContext, entity, "SerializedProperties"); |
| | 33 | 111 | | entity.Metadata = DeserializePayload<IDictionary<string, object>?>(dbContext, entity, "SerializedMetadata"); |
| | 33 | 112 | | entity.Exception = DeserializePayload<ExceptionState>(dbContext, entity, "SerializedException"); |
| | 33 | 113 | | entity.Payload = DeserializePayload<IDictionary<string, object>>(dbContext, entity, "SerializedPayload"); |
| | 33 | 114 | | } |
| | | 115 | | |
| | | 116 | | [RequiresUnreferencedCode("Calls System.Text.Json.JsonSerializer.Deserialize<TValue>(String, JsonSerializerOptions)" |
| | | 117 | | private async Task<IDictionary<string, object?>?> DeserializeActivityState(RuntimeElsaDbContext dbContext, ActivityE |
| | | 118 | | { |
| | 33 | 119 | | var json = dbContext.Entry(entity).Property<string>("SerializedActivityState").CurrentValue; |
| | | 120 | | |
| | 33 | 121 | | if (!string.IsNullOrWhiteSpace(json)) |
| | | 122 | | { |
| | 16 | 123 | | var compressionAlgorithm = (string?)dbContext.Entry(entity).Property("SerializedActivityStateCompressionAlgo |
| | 16 | 124 | | var compressionStrategy = compressionCodecResolver.Resolve(compressionAlgorithm); |
| | 16 | 125 | | json = await compressionStrategy.DecompressAsync(json, cancellationToken); |
| | 16 | 126 | | var dictionary = JsonSerializer.Deserialize<IDictionary<string, object?>>(json); |
| | 48 | 127 | | return dictionary?.ToDictionary(x => x.Key, x => x.Value); |
| | | 128 | | } |
| | | 129 | | |
| | 17 | 130 | | return null; |
| | 33 | 131 | | } |
| | | 132 | | |
| | | 133 | | [RequiresUnreferencedCode("Calls System.Text.Json.JsonSerializer.Deserialize<TValue>(String, JsonSerializerOptions)" |
// Reads the string stored under the given property name on the entity's EF Core entry and
// deserializes it with System.Text.Json.
//
// dbContext:    the context whose entry for the entity holds the serialized text.
// entity:       the record whose property is read.
// propertyName: the name of the property that contains the JSON text.
//
// Returns the deserialized value, or default(T) when the stored string is null or empty.
private T Deserialize<T>(RuntimeElsaDbContext dbContext, ActivityExecutionRecord entity, string propertyName)
{
    var serialized = dbContext.Entry(entity).Property<string>(propertyName).CurrentValue;

    // Nothing stored for this property: hand back the default rather than invoking the serializer.
    if (string.IsNullOrEmpty(serialized))
        return default!;

    return JsonSerializer.Deserialize<T>(serialized)!;
}
| | | 140 | | |
/// <summary>
/// Reads the string stored under <paramref name="propertyName"/> on the entity's EF Core entry and
/// deserializes it with the injected <c>payloadSerializer</c>.
/// </summary>
/// <typeparam name="T">The type to deserialize the stored text into.</typeparam>
/// <param name="dbContext">The context whose entry for <paramref name="entity"/> holds the serialized text.</param>
/// <param name="entity">The record whose property is read.</param>
/// <param name="propertyName">The name of the property that contains the serialized text.</param>
/// <returns>The deserialized value, or <c>default</c> when the stored string is null or empty.</returns>
private T DeserializePayload<T>(RuntimeElsaDbContext dbContext, ActivityExecutionRecord entity, string propertyName)
{
    var serialized = dbContext.Entry(entity).Property<string>(propertyName).CurrentValue;

    // Nothing stored for this property: hand back the default rather than invoking the serializer.
    if (string.IsNullOrEmpty(serialized))
        return default!;

    return payloadSerializer.Deserialize<T>(serialized)!;
}
| | | 147 | | |
| | 24 | 148 | | private static IQueryable<ActivityExecutionRecord> Filter(IQueryable<ActivityExecutionRecord> queryable, ActivityExe |
| | | 149 | | |
| | | 150 | | private static Expression<Func<ActivityExecutionRecord, ShadowActivityExecutionRecordSummary>> FromRecordExpression( |
| | | 151 | | { |
| | 0 | 152 | | return record => new() |
| | 0 | 153 | | { |
| | 0 | 154 | | Id = record.Id, |
| | 0 | 155 | | WorkflowInstanceId = record.WorkflowInstanceId, |
| | 0 | 156 | | ActivityId = record.ActivityId, |
| | 0 | 157 | | ActivityNodeId = record.ActivityNodeId, |
| | 0 | 158 | | ActivityType = record.ActivityType, |
| | 0 | 159 | | ActivityTypeVersion = record.ActivityTypeVersion, |
| | 0 | 160 | | ActivityName = record.ActivityName, |
| | 0 | 161 | | StartedAt = record.StartedAt, |
| | 0 | 162 | | HasBookmarks = record.HasBookmarks, |
| | 0 | 163 | | Status = record.Status, |
| | 0 | 164 | | AggregateFaultCount = record.AggregateFaultCount, |
| | 0 | 165 | | SerializedProperties = EF.Property<string>(record, "SerializedProperties"), |
| | 0 | 166 | | SerializedMetadata = EF.Property<string>(record, "SerializedMetadata"), |
| | 0 | 167 | | CompletedAt = record.CompletedAt |
| | 0 | 168 | | }; |
| | | 169 | | } |
| | | 170 | | |
| | 0 | 171 | | private IEnumerable<ActivityExecutionRecordSummary> Map(IEnumerable<ShadowActivityExecutionRecordSummary> source) => |
| | | 172 | | |
| | | 173 | | private ActivityExecutionRecordSummary Map(ShadowActivityExecutionRecordSummary source) |
| | | 174 | | { |
| | 0 | 175 | | return new() |
| | 0 | 176 | | { |
| | 0 | 177 | | Id = source.Id, |
| | 0 | 178 | | WorkflowInstanceId = source.WorkflowInstanceId, |
| | 0 | 179 | | ActivityId = source.ActivityId, |
| | 0 | 180 | | ActivityNodeId = source.ActivityNodeId, |
| | 0 | 181 | | ActivityType = source.ActivityType, |
| | 0 | 182 | | ActivityTypeVersion = source.ActivityTypeVersion, |
| | 0 | 183 | | ActivityName = source.ActivityName, |
| | 0 | 184 | | StartedAt = source.StartedAt, |
| | 0 | 185 | | HasBookmarks = source.HasBookmarks, |
| | 0 | 186 | | Status = source.Status, |
| | 0 | 187 | | AggregateFaultCount = source.AggregateFaultCount, |
| | 0 | 188 | | CompletedAt = source.CompletedAt, |
| | 0 | 189 | | Metadata = source.SerializedMetadata is null ? null : payloadSerializer.Deserialize<IDictionary<string, obje |
| | 0 | 190 | | }; |
| | | 191 | | } |
| | | 192 | | |
/// <summary>
/// Intermediate projection type used by the summary queries: <c>FromRecordExpression</c> populates it
/// from an <see cref="ActivityExecutionRecord"/>, and <c>Map</c> converts it into an
/// <see cref="ActivityExecutionRecordSummary"/>.
/// </summary>
/// <remarks>
/// Sealed because it is a private implementation detail that nothing in this file derives from.
/// </remarks>
private sealed class ShadowActivityExecutionRecordSummary : Entity
{
    /// <summary>Copied from the record's <c>WorkflowInstanceId</c>.</summary>
    public string WorkflowInstanceId { get; set; } = null!;

    /// <summary>Copied from the record's <c>ActivityId</c>.</summary>
    public string ActivityId { get; set; } = null!;

    /// <summary>Copied from the record's <c>ActivityNodeId</c>.</summary>
    public string ActivityNodeId { get; set; } = null!;

    /// <summary>Copied from the record's <c>ActivityType</c>.</summary>
    public string ActivityType { get; set; } = null!;

    /// <summary>Copied from the record's <c>ActivityTypeVersion</c>.</summary>
    public int ActivityTypeVersion { get; set; }

    /// <summary>Copied from the record's <c>ActivityName</c>; may be null.</summary>
    public string? ActivityName { get; set; }

    /// <summary>Copied from the record's <c>StartedAt</c>.</summary>
    public DateTimeOffset StartedAt { get; set; }

    /// <summary>Copied from the record's <c>HasBookmarks</c>.</summary>
    public bool HasBookmarks { get; set; }

    /// <summary>Copied from the record's <c>Status</c>.</summary>
    public ActivityStatus Status { get; set; }

    /// <summary>
    /// Raw serialized text read from the record's <c>"SerializedProperties"</c> property via <c>EF.Property</c>.
    /// </summary>
    public string? SerializedProperties { get; set; }

    /// <summary>
    /// Raw serialized text read from the record's <c>"SerializedMetadata"</c> property via <c>EF.Property</c>;
    /// deserialized into the summary's <c>Metadata</c> when not null.
    /// </summary>
    public string? SerializedMetadata { get; set; }

    /// <summary>Copied from the record's <c>AggregateFaultCount</c>.</summary>
    public int AggregateFaultCount { get; set; }

    /// <summary>Copied from the record's <c>CompletedAt</c>; may be null.</summary>
    public DateTimeOffset? CompletedAt { get; set; }
}
| | | 209 | | } |