From 4aec5ef5a0693fca80dd299247ac0975fd4ff2c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tayfun=20Y=C4=B1lmaz?= Date: Tue, 13 Jan 2026 02:02:25 +0300 Subject: [PATCH] Handle ExtraProperties in inbox and outbox stores Added deserialization logic for the ExtraProperties field in both EfCoreInboxStore and EfCoreOutboxStore to properly load additional event metadata from the database. Also updated logging in OutboxProcessor to use LogInformation for successful message publication. --- .../BBT/Aether/Events/EfCoreInboxStore.cs | 19 ++++++++++++++++-- .../BBT/Aether/Events/EfCoreOutboxStore.cs | 20 +++++++++++++++++-- .../Events/Processing/OutboxProcessor.cs | 2 +- 3 files changed, 36 insertions(+), 5 deletions(-) diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/EfCoreInboxStore.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/EfCoreInboxStore.cs index db3b044..d28365a 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/EfCoreInboxStore.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/EfCoreInboxStore.cs @@ -3,6 +3,7 @@ using System.Data; using System.Data.Common; using System.Linq; +using System.Text.Json; using System.Threading; using System.Threading.Tasks; using BBT.Aether.Clock; @@ -189,7 +190,7 @@ LIMIT @batchSize FOR UPDATE SKIP LOCKED ) RETURNING "Id", "Status", "EventName", "EventData", "CreatedAt", - "HandledTime", "LockedBy", "LockedUntil", "RetryCount", "NextRetryTime"; + "HandledTime", "LockedBy", "LockedUntil", "RetryCount", "NextRetryTime", "ExtraProperties"; """; AddParameter(command, "@processing", (int)IncomingEventStatus.Processing); @@ -224,7 +225,7 @@ FOR UPDATE SKIP LOCKED NextRetryTime = reader.IsDBNull(reader.GetOrdinal("NextRetryTime")) ? null : reader.GetDateTime(reader.GetOrdinal("NextRetryTime")), - ExtraProperties = new Dictionary() + ExtraProperties = DeserializeExtraProperties(reader) }; messages.Add(message); @@ -240,4 +241,18 @@ protected virtual void AddParameter(DbCommand command, string name, object value parameter.Value = value ?? DBNull.Value; command.Parameters.Add(parameter); } + + private static Dictionary DeserializeExtraProperties(DbDataReader reader) + { + var ordinal = reader.GetOrdinal("ExtraProperties"); + if (reader.IsDBNull(ordinal)) + return new Dictionary(); + + var json = reader.GetString(ordinal); + if (string.IsNullOrWhiteSpace(json) || json == "{}") + return new Dictionary(); + + return JsonSerializer.Deserialize>(json) + ?? new Dictionary(); + } } \ No newline at end of file diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/EfCoreOutboxStore.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/EfCoreOutboxStore.cs index 6fccb74..ac8fa39 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/EfCoreOutboxStore.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/EfCoreOutboxStore.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Data; using System.Data.Common; +using System.Text.Json; using System.Threading; using System.Threading.Tasks; using BBT.Aether.Clock; @@ -102,7 +103,7 @@ LIMIT @batchSize FOR UPDATE SKIP LOCKED ) RETURNING "Id", "Status", "EventName", "EventData", "CreatedAt", - "ProcessedAt", "LockedBy", "LockedUntil", "LastError", "RetryCount", "NextRetryAt"; + "ProcessedAt", "LockedBy", "LockedUntil", "LastError", "RetryCount", "NextRetryAt", "ExtraProperties"; """; AddParameter(command, "@processing", (int)OutboxMessageStatus.Processing); @@ -139,7 +140,8 @@ FOR UPDATE SKIP LOCKED RetryCount = reader.GetInt32(reader.GetOrdinal("RetryCount")), NextRetryAt = reader.IsDBNull(reader.GetOrdinal("NextRetryAt")) ? null - : reader.GetDateTime(reader.GetOrdinal("NextRetryAt")) + : reader.GetDateTime(reader.GetOrdinal("NextRetryAt")), + ExtraProperties = DeserializeExtraProperties(reader) }; messages.Add(message); @@ -155,5 +157,19 @@ protected virtual void AddParameter(DbCommand command, string name, object value parameter.Value = value ?? DBNull.Value; command.Parameters.Add(parameter); } + + private static Dictionary DeserializeExtraProperties(DbDataReader reader) + { + var ordinal = reader.GetOrdinal("ExtraProperties"); + if (reader.IsDBNull(ordinal)) + return new Dictionary(); + + var json = reader.GetString(ordinal); + if (string.IsNullOrWhiteSpace(json) || json == "{}") + return new Dictionary(); + + return JsonSerializer.Deserialize>(json) + ?? new Dictionary(); + } } diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/OutboxProcessor.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/OutboxProcessor.cs index aa66622..d660913 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/OutboxProcessor.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/OutboxProcessor.cs @@ -97,7 +97,7 @@ protected virtual async Task ProcessOutboxMessagesAsync(CancellationToken cancel domainMessage.LockedUntil = null; } - logger.LogDebug("Successfully published outbox message {MessageId}", message.Id); + logger.LogInformation("Successfully published outbox message {MessageId}", message.Id); } catch (Exception ex) {