diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/EfCoreInboxStore.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/EfCoreInboxStore.cs index db3b044..d28365a 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/EfCoreInboxStore.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/EfCoreInboxStore.cs @@ -3,6 +3,7 @@ using System.Data; using System.Data.Common; using System.Linq; +using System.Text.Json; using System.Threading; using System.Threading.Tasks; using BBT.Aether.Clock; @@ -189,7 +190,7 @@ LIMIT @batchSize FOR UPDATE SKIP LOCKED ) RETURNING "Id", "Status", "EventName", "EventData", "CreatedAt", - "HandledTime", "LockedBy", "LockedUntil", "RetryCount", "NextRetryTime"; + "HandledTime", "LockedBy", "LockedUntil", "RetryCount", "NextRetryTime", "ExtraProperties"; """; AddParameter(command, "@processing", (int)IncomingEventStatus.Processing); @@ -224,7 +225,7 @@ FOR UPDATE SKIP LOCKED NextRetryTime = reader.IsDBNull(reader.GetOrdinal("NextRetryTime")) ? null : reader.GetDateTime(reader.GetOrdinal("NextRetryTime")), - ExtraProperties = new Dictionary() + ExtraProperties = DeserializeExtraProperties(reader) }; messages.Add(message); @@ -240,4 +241,18 @@ protected virtual void AddParameter(DbCommand command, string name, object value parameter.Value = value ?? DBNull.Value; command.Parameters.Add(parameter); } + + private static Dictionary DeserializeExtraProperties(DbDataReader reader) + { + var ordinal = reader.GetOrdinal("ExtraProperties"); + if (reader.IsDBNull(ordinal)) + return new Dictionary(); + + var json = reader.GetString(ordinal); + if (string.IsNullOrWhiteSpace(json) || json == "{}") + return new Dictionary(); + + return JsonSerializer.Deserialize>(json) + ?? new Dictionary(); + } } \ No newline at end of file diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/EfCoreOutboxStore.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/EfCoreOutboxStore.cs index 6fccb74..ac8fa39 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/EfCoreOutboxStore.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/EfCoreOutboxStore.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Data; using System.Data.Common; +using System.Text.Json; using System.Threading; using System.Threading.Tasks; using BBT.Aether.Clock; @@ -102,7 +103,7 @@ LIMIT @batchSize FOR UPDATE SKIP LOCKED ) RETURNING "Id", "Status", "EventName", "EventData", "CreatedAt", - "ProcessedAt", "LockedBy", "LockedUntil", "LastError", "RetryCount", "NextRetryAt"; + "ProcessedAt", "LockedBy", "LockedUntil", "LastError", "RetryCount", "NextRetryAt", "ExtraProperties"; """; AddParameter(command, "@processing", (int)OutboxMessageStatus.Processing); @@ -139,7 +140,8 @@ FOR UPDATE SKIP LOCKED RetryCount = reader.GetInt32(reader.GetOrdinal("RetryCount")), NextRetryAt = reader.IsDBNull(reader.GetOrdinal("NextRetryAt")) ? null - : reader.GetDateTime(reader.GetOrdinal("NextRetryAt")) + : reader.GetDateTime(reader.GetOrdinal("NextRetryAt")), + ExtraProperties = DeserializeExtraProperties(reader) }; messages.Add(message); @@ -155,5 +157,19 @@ protected virtual void AddParameter(DbCommand command, string name, object value parameter.Value = value ?? DBNull.Value; command.Parameters.Add(parameter); } + + private static Dictionary DeserializeExtraProperties(DbDataReader reader) + { + var ordinal = reader.GetOrdinal("ExtraProperties"); + if (reader.IsDBNull(ordinal)) + return new Dictionary(); + + var json = reader.GetString(ordinal); + if (string.IsNullOrWhiteSpace(json) || json == "{}") + return new Dictionary(); + + return JsonSerializer.Deserialize>(json) + ?? new Dictionary(); + } } diff --git a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/OutboxProcessor.cs b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/OutboxProcessor.cs index aa66622..d660913 100644 --- a/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/OutboxProcessor.cs +++ b/framework/src/BBT.Aether.Infrastructure/BBT/Aether/Events/Processing/OutboxProcessor.cs @@ -97,7 +97,7 @@ protected virtual async Task ProcessOutboxMessagesAsync(CancellationToken cancel domainMessage.LockedUntil = null; } - logger.LogDebug("Successfully published outbox message {MessageId}", message.Id); + logger.LogInformation("Successfully published outbox message {MessageId}", message.Id); } catch (Exception ex) {