Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Data;
using System.Data.Common;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using BBT.Aether.Clock;
Expand Down Expand Up @@ -189,7 +190,7 @@ LIMIT @batchSize
FOR UPDATE SKIP LOCKED
)
RETURNING "Id", "Status", "EventName", "EventData", "CreatedAt",
"HandledTime", "LockedBy", "LockedUntil", "RetryCount", "NextRetryTime";
"HandledTime", "LockedBy", "LockedUntil", "RetryCount", "NextRetryTime", "ExtraProperties";
""";

AddParameter(command, "@processing", (int)IncomingEventStatus.Processing);
Expand Down Expand Up @@ -224,7 +225,7 @@ FOR UPDATE SKIP LOCKED
NextRetryTime = reader.IsDBNull(reader.GetOrdinal("NextRetryTime"))
? null
: reader.GetDateTime(reader.GetOrdinal("NextRetryTime")),
ExtraProperties = new Dictionary<string, object>()
ExtraProperties = DeserializeExtraProperties(reader)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: The inbox and outbox stores now duplicate identical JSON deserialization logic that could be centralized.

EfCoreInboxStore and EfCoreOutboxStore both implement DeserializeExtraProperties identically. Please extract this into a shared helper (for example, an internal static utility in the same namespace) to remove duplication and keep JSON handling consistent across both stores.

Suggested implementation:

                ExtraProperties = EventStoreDataReaderHelper.DeserializeExtraProperties(reader)
        parameter.Value = value ?? DBNull.Value;
    }

    internal static class EventStoreDataReaderHelper
    {
        public static Dictionary<string, object> DeserializeExtraProperties(DbDataReader reader)
        {
            var ordinal = reader.GetOrdinal("ExtraProperties");

            if (reader.IsDBNull(ordinal))
            {
                return new Dictionary<string, object>();
            }

            var json = reader.GetString(ordinal);

            if (string.IsNullOrWhiteSpace(json))
            {
                return new Dictionary<string, object>();
            }

            Dictionary<string, object>? result;

            try
            {
                result = JsonSerializer.Deserialize<Dictionary<string, object>>(json);
            }
            catch (JsonException)
            {
                // In case of malformed JSON, fallback to empty dictionary to avoid breaking processing.
                result = null;
            }

            return result ?? new Dictionary<string, object>();
        }
  1. Ensure the following using directives exist at the top of this file (or are otherwise available in the project):
    • using System.Collections.Generic;
    • using System.Data.Common;
    • using System.Text.Json;
  2. In EfCoreOutboxStore (or the corresponding outbox store file), replace its local DeserializeExtraProperties implementation with calls to EventStoreDataReaderHelper.DeserializeExtraProperties(reader) so that both stores share the same deserialization logic.
  3. Remove any now-unused instance methods named DeserializeExtraProperties from both EfCoreInboxStore and EfCoreOutboxStore once they are fully migrated to the shared helper.
  4. If the project uses a custom JsonSerializerOptions (e.g., a shared options instance for polymorphic support or naming policies), adapt the JsonSerializer.Deserialize call to use that options object instead of the default.

};

messages.Add(message);
Expand All @@ -240,4 +241,18 @@ protected virtual void AddParameter(DbCommand command, string name, object value
parameter.Value = value ?? DBNull.Value;
command.Parameters.Add(parameter);
}

private static Dictionary<string, object> DeserializeExtraProperties(DbDataReader reader)
{
var ordinal = reader.GetOrdinal("ExtraProperties");
if (reader.IsDBNull(ordinal))
return new Dictionary<string, object>();

var json = reader.GetString(ordinal);
if (string.IsNullOrWhiteSpace(json) || json == "{}")
return new Dictionary<string, object>();

return JsonSerializer.Deserialize<Dictionary<string, object>>(json)
?? new Dictionary<string, object>();
}
Comment on lines +245 to +257

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This method introduces a couple of concerns:

  1. Code Duplication: This logic is identical to the new method in EfCoreOutboxStore.cs. To follow the DRY (Don't Repeat Yourself) principle, this should be extracted to a shared internal static helper class. The existing AddParameter method is also duplicated across both classes and could be moved there as well.

  2. Error Handling: The JsonSerializer.Deserialize call can throw a JsonException if the JSON string from the database is malformed. This would cause the entire batch processing to fail. It's safer to wrap this call in a try-catch block and return an empty dictionary on failure, similar to the pattern in ExtraPropertiesValueConverter.

    private static Dictionary<string, object> DeserializeExtraProperties(DbDataReader reader)
    {
        var ordinal = reader.GetOrdinal("ExtraProperties");
        if (reader.IsDBNull(ordinal))
            return new Dictionary<string, object>();

        var json = reader.GetString(ordinal);
        if (string.IsNullOrWhiteSpace(json) || json == "{}")
            return new Dictionary<string, object>();

        try
        {
            return JsonSerializer.Deserialize<Dictionary<string, object>>(json)
                   ?? new Dictionary<string, object>();
        }
        catch (JsonException)
        {
            // Consider logging this exception to aid in debugging malformed data.
            return new Dictionary<string, object>();
        }
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using BBT.Aether.Clock;
Expand Down Expand Up @@ -102,7 +103,7 @@ LIMIT @batchSize
FOR UPDATE SKIP LOCKED
)
RETURNING "Id", "Status", "EventName", "EventData", "CreatedAt",
"ProcessedAt", "LockedBy", "LockedUntil", "LastError", "RetryCount", "NextRetryAt";
"ProcessedAt", "LockedBy", "LockedUntil", "LastError", "RetryCount", "NextRetryAt", "ExtraProperties";
""";

AddParameter(command, "@processing", (int)OutboxMessageStatus.Processing);
Expand Down Expand Up @@ -139,7 +140,8 @@ FOR UPDATE SKIP LOCKED
RetryCount = reader.GetInt32(reader.GetOrdinal("RetryCount")),
NextRetryAt = reader.IsDBNull(reader.GetOrdinal("NextRetryAt"))
? null
: reader.GetDateTime(reader.GetOrdinal("NextRetryAt"))
: reader.GetDateTime(reader.GetOrdinal("NextRetryAt")),
ExtraProperties = DeserializeExtraProperties(reader)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): JSON deserialization from the DB can throw and currently isn’t guarded, which may break outbox processing for a single bad record.

If ExtraProperties contains malformed JSON, JsonSerializer.Deserialize will throw and stop the whole batch. Consider catching JsonException in DeserializeExtraProperties, logging it, and returning an empty dictionary so a single corrupt row doesn’t block processing.

Suggested implementation:

        private Dictionary<string, object> DeserializeExtraProperties(DbDataReader reader)
        {
            var extraPropertiesOrdinal = reader.GetOrdinal("ExtraProperties");

            if (reader.IsDBNull(extraPropertiesOrdinal))
            {
                return new Dictionary<string, object>();
            }

            var json = reader.GetString(extraPropertiesOrdinal);

            if (string.IsNullOrWhiteSpace(json))
            {
                return new Dictionary<string, object>();
            }

            try
            {
                var extraProperties =
                    JsonSerializer.Deserialize<Dictionary<string, object>>(json);

                return extraProperties ?? new Dictionary<string, object>();
            }
            catch (JsonException ex)
            {
                _logger?.LogWarning(
                    ex,
                    "Failed to deserialize ExtraProperties for outbox message. Raw value: {ExtraPropertiesJson}",
                    json);

                return new Dictionary<string, object>();
            }
        }

The above edit assumes:

  1. There is already a DeserializeExtraProperties(DbDataReader reader) method matching the SEARCH block.
  2. The class has an _logger field of type ILogger<EfCoreOutboxStore> (or compatible).

If these assumptions do not match your file:

  • Remove static from the existing DeserializeExtraProperties definition (if it is static) so _logger is accessible, or adapt logging to your existing logging mechanism.
  • Ensure using System.Text.Json; and using Microsoft.Extensions.Logging; are present at the top of the file.
  • If _logger does not exist, inject an ILogger<EfCoreOutboxStore> into the class (constructor) and assign it to a private readonly _logger field, or replace the _logger?.LogWarning(...) call with your project's standard logging API.

};

messages.Add(message);
Expand All @@ -155,5 +157,19 @@ protected virtual void AddParameter(DbCommand command, string name, object value
parameter.Value = value ?? DBNull.Value;
command.Parameters.Add(parameter);
}

private static Dictionary<string, object> DeserializeExtraProperties(DbDataReader reader)
{
var ordinal = reader.GetOrdinal("ExtraProperties");
if (reader.IsDBNull(ordinal))
return new Dictionary<string, object>();

var json = reader.GetString(ordinal);
if (string.IsNullOrWhiteSpace(json) || json == "{}")
return new Dictionary<string, object>();

return JsonSerializer.Deserialize<Dictionary<string, object>>(json)
?? new Dictionary<string, object>();
}
Comment on lines +161 to +173

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This method is a duplicate of the one added to EfCoreInboxStore.cs. Please see my comment on that file for suggestions regarding error handling and refactoring to avoid code duplication.

}

Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ protected virtual async Task ProcessOutboxMessagesAsync(CancellationToken cancel
domainMessage.LockedUntil = null;
}

logger.LogDebug("Successfully published outbox message {MessageId}", message.Id);
logger.LogInformation("Successfully published outbox message {MessageId}", message.Id);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Changing this log level from Debug to Information might lead to excessive logging in a production environment, especially with a large batch size. This log is inside a loop that iterates over all leased messages. A LogInformation entry for every successfully published message could create a lot of noise. Consider if LogDebug is more appropriate for this fine-grained success message, or perhaps log a summary after the loop completes.

}
catch (Exception ex)
{
Expand Down