-
Notifications
You must be signed in to change notification settings - Fork 0
Handle ExtraProperties in inbox and outbox stores #39
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,6 +3,7 @@ | |
| using System.Data; | ||
| using System.Data.Common; | ||
| using System.Linq; | ||
| using System.Text.Json; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
| using BBT.Aether.Clock; | ||
|
|
@@ -189,7 +190,7 @@ LIMIT @batchSize | |
| FOR UPDATE SKIP LOCKED | ||
| ) | ||
| RETURNING "Id", "Status", "EventName", "EventData", "CreatedAt", | ||
| "HandledTime", "LockedBy", "LockedUntil", "RetryCount", "NextRetryTime"; | ||
| "HandledTime", "LockedBy", "LockedUntil", "RetryCount", "NextRetryTime", "ExtraProperties"; | ||
| """; | ||
|
|
||
| AddParameter(command, "@processing", (int)IncomingEventStatus.Processing); | ||
|
|
@@ -224,7 +225,7 @@ FOR UPDATE SKIP LOCKED | |
| NextRetryTime = reader.IsDBNull(reader.GetOrdinal("NextRetryTime")) | ||
| ? null | ||
| : reader.GetDateTime(reader.GetOrdinal("NextRetryTime")), | ||
| ExtraProperties = new Dictionary<string, object>() | ||
| ExtraProperties = DeserializeExtraProperties(reader) | ||
| }; | ||
|
|
||
| messages.Add(message); | ||
|
|
@@ -240,4 +241,18 @@ protected virtual void AddParameter(DbCommand command, string name, object value | |
| parameter.Value = value ?? DBNull.Value; | ||
| command.Parameters.Add(parameter); | ||
| } | ||
|
|
||
| private static Dictionary<string, object> DeserializeExtraProperties(DbDataReader reader) | ||
| { | ||
| var ordinal = reader.GetOrdinal("ExtraProperties"); | ||
| if (reader.IsDBNull(ordinal)) | ||
| return new Dictionary<string, object>(); | ||
|
|
||
| var json = reader.GetString(ordinal); | ||
| if (string.IsNullOrWhiteSpace(json) || json == "{}") | ||
| return new Dictionary<string, object>(); | ||
|
|
||
| return JsonSerializer.Deserialize<Dictionary<string, object>>(json) | ||
| ?? new Dictionary<string, object>(); | ||
| } | ||
|
Comment on lines
+245
to
+257
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This method introduces a couple of concerns:
private static Dictionary<string, object> DeserializeExtraProperties(DbDataReader reader)
{
var ordinal = reader.GetOrdinal("ExtraProperties");
if (reader.IsDBNull(ordinal))
return new Dictionary<string, object>();
var json = reader.GetString(ordinal);
if (string.IsNullOrWhiteSpace(json) || json == "{}")
return new Dictionary<string, object>();
try
{
return JsonSerializer.Deserialize<Dictionary<string, object>>(json)
?? new Dictionary<string, object>();
}
catch (JsonException)
{
// Consider logging this exception to aid in debugging malformed data.
return new Dictionary<string, object>();
}
} |
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,6 +2,7 @@ | |
| using System.Collections.Generic; | ||
| using System.Data; | ||
| using System.Data.Common; | ||
| using System.Text.Json; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
| using BBT.Aether.Clock; | ||
|
|
@@ -102,7 +103,7 @@ LIMIT @batchSize | |
| FOR UPDATE SKIP LOCKED | ||
| ) | ||
| RETURNING "Id", "Status", "EventName", "EventData", "CreatedAt", | ||
| "ProcessedAt", "LockedBy", "LockedUntil", "LastError", "RetryCount", "NextRetryAt"; | ||
| "ProcessedAt", "LockedBy", "LockedUntil", "LastError", "RetryCount", "NextRetryAt", "ExtraProperties"; | ||
| """; | ||
|
|
||
| AddParameter(command, "@processing", (int)OutboxMessageStatus.Processing); | ||
|
|
@@ -139,7 +140,8 @@ FOR UPDATE SKIP LOCKED | |
| RetryCount = reader.GetInt32(reader.GetOrdinal("RetryCount")), | ||
| NextRetryAt = reader.IsDBNull(reader.GetOrdinal("NextRetryAt")) | ||
| ? null | ||
| : reader.GetDateTime(reader.GetOrdinal("NextRetryAt")) | ||
| : reader.GetDateTime(reader.GetOrdinal("NextRetryAt")), | ||
| ExtraProperties = DeserializeExtraProperties(reader) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. suggestion (bug_risk): JSON deserialization from the DB can throw and currently isn’t guarded, which may break outbox processing for a single bad record. If Suggested implementation: private Dictionary<string, object> DeserializeExtraProperties(DbDataReader reader)
{
var extraPropertiesOrdinal = reader.GetOrdinal("ExtraProperties");
if (reader.IsDBNull(extraPropertiesOrdinal))
{
return new Dictionary<string, object>();
}
var json = reader.GetString(extraPropertiesOrdinal);
if (string.IsNullOrWhiteSpace(json))
{
return new Dictionary<string, object>();
}
try
{
var extraProperties =
JsonSerializer.Deserialize<Dictionary<string, object>>(json);
return extraProperties ?? new Dictionary<string, object>();
}
catch (JsonException ex)
{
_logger?.LogWarning(
ex,
"Failed to deserialize ExtraProperties for outbox message. Raw value: {ExtraPropertiesJson}",
json);
return new Dictionary<string, object>();
}
}The above edit assumes:
If these assumptions do not match your file:
|
||
| }; | ||
|
|
||
| messages.Add(message); | ||
|
|
@@ -155,5 +157,19 @@ protected virtual void AddParameter(DbCommand command, string name, object value | |
| parameter.Value = value ?? DBNull.Value; | ||
| command.Parameters.Add(parameter); | ||
| } | ||
|
|
||
| private static Dictionary<string, object> DeserializeExtraProperties(DbDataReader reader) | ||
| { | ||
| var ordinal = reader.GetOrdinal("ExtraProperties"); | ||
| if (reader.IsDBNull(ordinal)) | ||
| return new Dictionary<string, object>(); | ||
|
|
||
| var json = reader.GetString(ordinal); | ||
| if (string.IsNullOrWhiteSpace(json) || json == "{}") | ||
| return new Dictionary<string, object>(); | ||
|
|
||
| return JsonSerializer.Deserialize<Dictionary<string, object>>(json) | ||
| ?? new Dictionary<string, object>(); | ||
| } | ||
|
Comment on lines
+161
to
+173
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -97,7 +97,7 @@ protected virtual async Task ProcessOutboxMessagesAsync(CancellationToken cancel | |
| domainMessage.LockedUntil = null; | ||
| } | ||
|
|
||
| logger.LogDebug("Successfully published outbox message {MessageId}", message.Id); | ||
| logger.LogInformation("Successfully published outbox message {MessageId}", message.Id); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changing this log level from |
||
| } | ||
| catch (Exception ex) | ||
| { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: The inbox and outbox stores now duplicate identical JSON deserialization logic that could be centralized.
EfCoreInboxStoreandEfCoreOutboxStoreboth implementDeserializeExtraPropertiesidentically. Please extract this into a shared helper (for example, an internal static utility in the same namespace) to remove duplication and keep JSON handling consistent across both stores.Suggested implementation:
usingdirectives exist at the top of this file (or are otherwise available in the project):using System.Collections.Generic;using System.Data.Common;using System.Text.Json;EfCoreOutboxStore(or the corresponding outbox store file), replace its localDeserializeExtraPropertiesimplementation with calls toEventStoreDataReaderHelper.DeserializeExtraProperties(reader)so that both stores share the same deserialization logic.DeserializeExtraPropertiesfrom bothEfCoreInboxStoreandEfCoreOutboxStoreonce they are fully migrated to the shared helper.JsonSerializerOptions(e.g., a shared options instance for polymorphic support or naming policies), adapt theJsonSerializer.Deserializecall to use that options object instead of the default.