| | | 1 | | using Elsa.Common; |
| | | 2 | | using Elsa.Common.Codecs; |
| | | 3 | | using Elsa.Workflows.LogPersistence; |
| | | 4 | | using Elsa.Workflows.Management.Options; |
| | | 5 | | using Elsa.Workflows.Runtime.Entities; |
| | | 6 | | using Elsa.Workflows.Runtime.Extensions; |
| | | 7 | | using Elsa.Workflows.State; |
| | | 8 | | using Microsoft.Extensions.Options; |
| | | 9 | | |
| | | 10 | | namespace Elsa.Workflows.Runtime; |
| | | 11 | | |
| | | 12 | | /// <inheritdoc /> |
| | 427 | 13 | | public class DefaultActivityExecutionMapper( |
| | 427 | 14 | | ISafeSerializer safeSerializer, |
| | 427 | 15 | | IPayloadSerializer payloadSerializer, |
| | 427 | 16 | | ICompressionCodecResolver compressionCodecResolver, |
| | 427 | 17 | | IOptions<ManagementOptions> options) : IActivityExecutionMapper |
| | | 18 | | { |
| | | 19 | | /// <inheritdoc /> |
| | | 20 | | public async Task<ActivityExecutionRecord> MapAsync(ActivityExecutionContext source) |
| | | 21 | | { |
| | 3832 | 22 | | var outputs = source.GetOutputs(); |
| | 3832 | 23 | | var inputs = source.GetInputs(); |
| | 3832 | 24 | | var persistenceMap = source.GetLogPersistenceModeMap(); |
| | 3832 | 25 | | var persistableInputs = GetPersistableInputOutput(inputs, persistenceMap.Inputs); |
| | 3832 | 26 | | var persistableOutputs = GetPersistableInputOutput(outputs, persistenceMap.Outputs); |
| | 3832 | 27 | | var persistableProperties = GetPersistableDictionary(source.Properties!, persistenceMap.InternalState); |
| | 3832 | 28 | | var persistableJournalData = GetPersistableDictionary(source.JournalData!, persistenceMap.InternalState); |
| | 3832 | 29 | | var cancellationToken = source.CancellationToken; |
| | | 30 | | |
| | 3832 | 31 | | var record = new ActivityExecutionRecord |
| | 3832 | 32 | | { |
| | 3832 | 33 | | Id = source.Id, |
| | 3832 | 34 | | ActivityId = source.Activity.Id, |
| | 3832 | 35 | | ActivityNodeId = source.NodeId, |
| | 3832 | 36 | | WorkflowInstanceId = source.WorkflowExecutionContext.Id, |
| | 3832 | 37 | | ActivityType = source.Activity.Type, |
| | 3832 | 38 | | ActivityName = source.Activity.Name, |
| | 3832 | 39 | | ActivityState = persistableInputs, |
| | 3832 | 40 | | Outputs = persistableOutputs, |
| | 3832 | 41 | | Properties = persistableProperties!, |
| | 3832 | 42 | | Metadata = new Dictionary<string, object>(source.Metadata), |
| | 3832 | 43 | | Payload = persistableJournalData!, |
| | 3832 | 44 | | Exception = ExceptionState.FromException(source.Exception), |
| | 3832 | 45 | | ActivityTypeVersion = source.Activity.Version, |
| | 3832 | 46 | | StartedAt = source.StartedAt, |
| | 3832 | 47 | | HasBookmarks = source.Bookmarks.Any(), |
| | 3832 | 48 | | Status = source.Status, |
| | 3832 | 49 | | AggregateFaultCount = source.AggregateFaultCount, |
| | 3832 | 50 | | CompletedAt = source.CompletedAt |
| | 3832 | 51 | | }; |
| | | 52 | | |
| | 3832 | 53 | | record = record.SanitizeLogMessage(); |
| | 3832 | 54 | | var compressionAlgorithm = options.Value.CompressionAlgorithm ?? nameof(None); |
| | 3832 | 55 | | var serializedActivityState = record.ActivityState?.Count > 0 ? safeSerializer.Serialize(record.ActivityState) : |
| | 3832 | 56 | | var compressedSerializedActivityState = serializedActivityState != null ? await compressionCodecResolver.Resolve |
| | 3832 | 57 | | var serializedProperties = record.Properties != null ? payloadSerializer.Serialize(record.Properties) : null; |
| | 3832 | 58 | | var serializedMetadata = record.Metadata != null ? payloadSerializer.Serialize(record.Metadata) : null; |
| | 3832 | 59 | | record.SerializedSnapshot = new() |
| | 3832 | 60 | | { |
| | 3832 | 61 | | Id = record.Id, |
| | 3832 | 62 | | TenantId = record.TenantId, |
| | 3832 | 63 | | WorkflowInstanceId = record.WorkflowInstanceId, |
| | 3832 | 64 | | ActivityId = record.ActivityId, |
| | 3832 | 65 | | ActivityNodeId = record.ActivityNodeId, |
| | 3832 | 66 | | ActivityType = record.ActivityType, |
| | 3832 | 67 | | ActivityTypeVersion = record.ActivityTypeVersion, |
| | 3832 | 68 | | ActivityName = record.ActivityName, |
| | 3832 | 69 | | StartedAt = record.StartedAt, |
| | 3832 | 70 | | HasBookmarks = record.HasBookmarks, |
| | 3832 | 71 | | Status = record.Status, |
| | 3832 | 72 | | AggregateFaultCount = record.AggregateFaultCount, |
| | 3832 | 73 | | CompletedAt = record.CompletedAt, |
| | 3832 | 74 | | SerializedActivityState = compressedSerializedActivityState, |
| | 3832 | 75 | | SerializedActivityStateCompressionAlgorithm = compressionAlgorithm, |
| | 3832 | 76 | | SerializedOutputs = record.Outputs?.Any() == true ? safeSerializer.Serialize(record.Outputs) : null, |
| | 3832 | 77 | | SerializedProperties = serializedProperties, |
| | 3832 | 78 | | SerializedMetadata = serializedMetadata, |
| | 3832 | 79 | | SerializedException = record.Exception != null ? payloadSerializer.Serialize(record.Exception) : null, |
| | 3832 | 80 | | SerializedPayload = record.Payload?.Any() == true ? payloadSerializer.Serialize(record.Payload) : null |
| | 3832 | 81 | | }; |
| | | 82 | | |
| | 3832 | 83 | | return record; |
| | 3832 | 84 | | } |
| | | 85 | | |
| | | 86 | | private IDictionary<string, object?> GetPersistableInputOutput(IDictionary<string, object> state, IDictionary<string |
| | | 87 | | { |
| | 7664 | 88 | | var result = new Dictionary<string, object?>(); |
| | 30136 | 89 | | foreach (var stateEntry in state) |
| | | 90 | | { |
| | 7404 | 91 | | var mode = map.TryGetValue(stateEntry.Key, out var value) ? value : LogPersistenceMode.Include; |
| | 7404 | 92 | | if (mode == LogPersistenceMode.Include) |
| | 7396 | 93 | | result.Add(stateEntry.Key, stateEntry.Value); |
| | | 94 | | } |
| | | 95 | | |
| | 7664 | 96 | | return result; |
| | | 97 | | } |
| | | 98 | | |
| | | 99 | | private IDictionary<string, object?>? GetPersistableDictionary(IDictionary<string, object?> dictionary, LogPersisten |
| | | 100 | | { |
| | 7664 | 101 | | return mode == LogPersistenceMode.Include ? new Dictionary<string, object?>(dictionary) : null; |
| | | 102 | | } |
| | | 103 | | } |