< Summary

Information
Class: Elsa.AI.Persistence.EFCore.Stores.EFCoreAIConversationStore
Assembly: Elsa.AI.Persistence.EFCore
File(s): /home/runner/work/elsa-core/elsa-core/src/modules/Elsa.AI.Persistence.EFCore/Stores/EFCoreAIConversationStore.cs
Line coverage
75%
Covered lines: 97
Uncovered lines: 31
Coverable lines: 128
Total lines: 236
Line coverage: 75.7%
Branch coverage
72%
Covered branches: 45
Total branches: 62
Branch coverage: 72.5%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
FindAsync()75%4487.5%
SaveAsync()100%4487.5%
RetryAsUpdateAsync()0%2040%
Map(...)50%22100%
Map(...)100%22100%
SerializeMessages(...)90%101092.3%
SerializeSingleTruncatedMessage(...)100%44100%
CreateTruncatedMessage(...)100%22100%
NormalizeSliceLength(...)75%5466.66%
ShrinkMessagesToByteLimit(...)0%2040%
IsExpired(...)70%101087.5%
ParseEnum(...)50%22100%
BelongsToTenant(...)100%11100%
NormalizeTenantId(...)100%22100%
ValidateUserOwnership(...)100%44100%
Validate(...)75%4480%

File(s)

/home/runner/work/elsa-core/elsa-core/src/modules/Elsa.AI.Persistence.EFCore/Stores/EFCoreAIConversationStore.cs

#LineLine coverage
 1using System.Text;
 2using System.Text.Json;
 3using System.Text.Json.Nodes;
 4using Elsa.AI.Abstractions.Contracts;
 5using Elsa.AI.Abstractions.Models;
 6using Elsa.AI.Persistence.EFCore.Entities;
 7using Microsoft.EntityFrameworkCore;
 8
 9namespace Elsa.AI.Persistence.EFCore.Stores;
 10
 1911public class EFCoreAIConversationStore(AIDbContext dbContext) : IAIConversationStore
 12{
 13    private const int MaxStoredMessages = 256;
 14    private const int MaxMessagesJsonBytes = 1024 * 1024;
 15
 16    public async ValueTask<AIConversation?> FindAsync(string id, CancellationToken cancellationToken = default)
 17    {
 1318        var record = await dbContext.Conversations.AsNoTracking().FirstOrDefaultAsync(x => x.Id == id, cancellationToken
 1319        if (record == null)
 020            return null;
 21
 1322        var conversation = Map(record);
 1323        if (!IsExpired(conversation))
 1124            return conversation;
 25
 226        return null;
 1327    }
 28
 29    public async ValueTask SaveAsync(AIConversation conversation, CancellationToken cancellationToken = default)
 30    {
 2231        Validate(conversation);
 2132        var isNew = false;
 2133        var record = await dbContext.Conversations.FindAsync([conversation.Id], cancellationToken);
 2134        if (record == null)
 35        {
 1636            record = new AIConversationRecord { Id = conversation.Id };
 1637            dbContext.Conversations.Add(record);
 1638            isNew = true;
 39        }
 540        else if (!BelongsToTenant(record.TenantId, conversation.TenantId))
 41        {
 142            throw new InvalidOperationException("Cannot overwrite an AI conversation that belongs to another tenant.");
 43        }
 44        else
 45        {
 446            ValidateUserOwnership(record, conversation);
 47        }
 48
 1949        Map(conversation, record);
 50
 51        try
 52        {
 1953            await dbContext.SaveChangesAsync(cancellationToken);
 1954        }
 055        catch (DbUpdateException e) when (isNew)
 56        {
 057            await RetryAsUpdateAsync(conversation, e, cancellationToken);
 58        }
 1959    }
 60
 61    private async ValueTask RetryAsUpdateAsync(AIConversation conversation, DbUpdateException originalException, Cancell
 62    {
 063        dbContext.ChangeTracker.Clear();
 064        var record = await dbContext.Conversations.FindAsync([conversation.Id], cancellationToken);
 065        if (record == null)
 066            throw new DbUpdateException($"Failed to insert AI conversation {conversation.Id}, and no existing record was
 67
 068        if (!BelongsToTenant(record.TenantId, conversation.TenantId))
 069            throw new InvalidOperationException("Cannot overwrite an AI conversation that belongs to another tenant.");
 70
 071        ValidateUserOwnership(record, conversation);
 72
 073        Map(conversation, record);
 074        await dbContext.SaveChangesAsync(cancellationToken);
 075    }
 76
 77    private static AIConversation Map(AIConversationRecord record) =>
 1378        new()
 1379        {
 1380            Id = record.Id,
 1381            TenantId = record.TenantId,
 1382            UserId = record.UserId,
 1383            Title = record.Title,
 1384            Status = ParseEnum(record.Status, AIConversationStatus.Active),
 1385            CreatedAt = record.CreatedAt,
 1386            UpdatedAt = record.UpdatedAt,
 1387            ProviderSessionId = record.ProviderSessionId,
 1388            RetentionMode = ParseEnum(record.RetentionMode, AIRetentionMode.Configured),
 1389            RetentionExpiresAt = record.RetentionExpiresAt,
 1390            Messages = JsonSerializer.Deserialize<IReadOnlyCollection<AIMessage>>(record.Messages) ?? []
 1391        };
 92
 93    private static void Map(AIConversation conversation, AIConversationRecord record)
 94    {
 1995        record.TenantId = NormalizeTenantId(conversation.TenantId);
 1996        record.UserId = conversation.UserId;
 1997        record.Title = conversation.Title;
 1998        record.Status = conversation.Status.ToString();
 1999        if (record.CreatedAt == default)
 16100            record.CreatedAt = conversation.CreatedAt;
 101
 19102        record.UpdatedAt = conversation.UpdatedAt;
 19103        record.ProviderSessionId = conversation.ProviderSessionId;
 19104        record.RetentionMode = conversation.RetentionMode.ToString();
 19105        record.RetentionExpiresAt = conversation.RetentionExpiresAt;
 106
 19107        record.Messages = SerializeMessages(conversation.Messages);
 19108    }
 109
 110    private static string SerializeMessages(IReadOnlyCollection<AIMessage> messages)
 111    {
 19112        var orderedMessages = messages
 600113            .OrderBy(x => x.CreatedAt)
 600114            .ThenBy(x => x.StreamSequence)
 19115            .ToList();
 19116        var boundedMessages = orderedMessages.Count > MaxStoredMessages
 19117            ? orderedMessages.Skip(orderedMessages.Count - MaxStoredMessages).ToList()
 19118            : orderedMessages;
 19119        var json = JsonSerializer.Serialize(boundedMessages);
 120
 19121        if (boundedMessages.Count > 1 && Encoding.UTF8.GetByteCount(json) > MaxMessagesJsonBytes)
 0122            (boundedMessages, json) = ShrinkMessagesToByteLimit(boundedMessages);
 123
 19124        if (boundedMessages.Count == 1 && Encoding.UTF8.GetByteCount(json) > MaxMessagesJsonBytes)
 2125            json = SerializeSingleTruncatedMessage(boundedMessages[0]);
 126
 19127        return json;
 128    }
 129
 130    private static string SerializeSingleTruncatedMessage(AIMessage message)
 131    {
 2132        var candidateLength = Math.Min(message.Content.Length, MaxMessagesJsonBytes / 4);
 133
 40134        while (true)
 135        {
 42136            candidateLength = NormalizeSliceLength(message.Content, candidateLength);
 42137            var candidateContent = message.Content[..candidateLength];
 42138            var candidateJson = JsonSerializer.Serialize(new[] { CreateTruncatedMessage(message, candidateContent) });
 139
 42140            if (Encoding.UTF8.GetByteCount(candidateJson) <= MaxMessagesJsonBytes)
 1141                return candidateJson;
 142
 41143            if (candidateLength == 0)
 1144                return JsonSerializer.Serialize(new[] { CreateTruncatedMessage(message, "", preserveMetadata: false) });
 145
 40146            candidateLength -= Math.Max(1, candidateLength / 10);
 147        }
 148    }
 149
 150    private static AIMessage CreateTruncatedMessage(AIMessage message, string content, bool preserveMetadata = true)
 151    {
 43152        var metadata = preserveMetadata ? message.Metadata.DeepClone().AsObject() : [];
 43153        metadata["truncated"] = true;
 43154        metadata["maxBytes"] = MaxMessagesJsonBytes;
 155
 43156        return message with
 43157        {
 43158            Content = content,
 43159            Metadata = metadata
 43160        };
 161    }
 162
 163    private static int NormalizeSliceLength(string value, int length)
 164    {
 42165        if (length > 0 && char.IsHighSurrogate(value[length - 1]))
 0166            length--;
 167
 42168        return length;
 169    }
 170
 171    private static (List<AIMessage> Messages, string Json) ShrinkMessagesToByteLimit(List<AIMessage> messages)
 172    {
 0173        var low = 1;
 0174        var high = messages.Count;
 0175        var bestMessages = messages.Skip(messages.Count - 1).ToList();
 0176        var bestJson = JsonSerializer.Serialize(bestMessages);
 177
 0178        while (low < high)
 179        {
 0180            var candidateCount = (low + high + 1) / 2;
 0181            var candidateMessages = messages.Skip(messages.Count - candidateCount).ToList();
 0182            var candidateJson = JsonSerializer.Serialize(candidateMessages);
 183
 0184            if (Encoding.UTF8.GetByteCount(candidateJson) <= MaxMessagesJsonBytes)
 185            {
 0186                low = candidateCount;
 0187                bestMessages = candidateMessages;
 0188                bestJson = candidateJson;
 189            }
 190            else
 191            {
 0192                high = candidateCount - 1;
 193            }
 194        }
 195
 0196        return (bestMessages, bestJson);
 197    }
 198
 199    private static bool IsExpired(AIConversation conversation)
 200    {
 13201        if (conversation.RetentionMode == AIRetentionMode.Ephemeral)
 1202            return conversation.Status is AIConversationStatus.Completed or AIConversationStatus.Failed;
 203
 12204        if (conversation.RetentionMode == AIRetentionMode.Durable)
 0205            return false;
 206
 12207        var expiresAt = conversation.RetentionExpiresAt;
 12208        if (expiresAt == null)
 8209            return false;
 210
 4211        return expiresAt <= DateTimeOffset.UtcNow;
 212    }
 213
 214    private static TEnum ParseEnum<TEnum>(string value, TEnum defaultValue) where TEnum : struct =>
 26215        Enum.TryParse<TEnum>(value, ignoreCase: true, out var result) ? result : defaultValue;
 216
 217    private static bool BelongsToTenant(string? storedTenantId, string? requestedTenantId) =>
 5218        string.Equals(NormalizeTenantId(storedTenantId), NormalizeTenantId(requestedTenantId), StringComparison.Ordinal)
 219
 29220    private static string NormalizeTenantId(string? tenantId) => tenantId ?? "";
 221
 222    private static void ValidateUserOwnership(AIConversationRecord record, AIConversation conversation)
 223    {
 4224        if (!string.IsNullOrWhiteSpace(record.UserId) && !string.Equals(record.UserId, conversation.UserId, StringCompar
 1225            throw new InvalidOperationException("Cannot overwrite an AI conversation that belongs to another user.");
 3226    }
 227
 228    private static void Validate(AIConversation conversation)
 229    {
 22230        if (string.IsNullOrWhiteSpace(conversation.Id))
 0231            throw new ArgumentException("A conversation ID is required.", nameof(conversation));
 232
 22233        if (string.IsNullOrWhiteSpace(conversation.UserId))
 1234            throw new ArgumentException("A conversation user ID is required.", nameof(conversation));
 21235    }
 236}