`.
+- MAPS Redis extension consumes via consumer group and acknowledges.
+- Verified by test:
+ - `registerRemoteLink_Stream_DeliversInboundMessageAndAcks`
+
+### Demo C: MAPS -> Redis CloudEvent stream for third-party consumers
+
+- MAPS push link with `cloud_event.mode: wrap` writes CloudEvent headers (`ce_*`) plus payload into Redis envelope.
+- Any Redis client can `XREAD` and decode envelope bytes.
+- Verified by test:
+ - `streamRoundTrip_RedisToMapsToCloudEventToMapsToRedis_RetainsTypesAndHeaders`
+
+## Running Tests
+
+Standard suite:
+
+```bash
+mvn -q -pl redis-extension test
+```
+
+Perf baselines:
+
+```bash
+mvn -q -pl redis-extension -Dsurefire.excludedGroups= -Dgroups=perf test
+```
+
diff --git a/redis-extension/pom.xml b/redis-extension/pom.xml
new file mode 100644
index 0000000..0080298
--- /dev/null
+++ b/redis-extension/pom.xml
@@ -0,0 +1,82 @@
+
+
+
+ 4.0.0
+
+
+ io.mapsmessaging
+ extension-project
+ 1.0.0-SNAPSHOT
+ ../pom.xml
+
+
+ redis-extension
+
+
+ ${env.NVD_API_KEY}
+ 11
+ UTF-8
+ matthew.buckton@mapsmessaging.io
+ perf
+
+
+
+
+ io.lettuce
+ lettuce-core
+ 6.5.5.RELEASE
+
+
+
+ org.junit.jupiter
+ junit-jupiter
+ 5.11.4
+ test
+
+
+ org.testcontainers
+ junit-jupiter
+ 1.21.3
+ test
+
+
+ org.testcontainers
+ testcontainers
+ 1.21.3
+ test
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 3.2.5
+
+ ${surefire.excludedGroups}
+
+
+
+
+
diff --git a/redis-extension/src/main/java/io/mapsmessaging/network/protocol/impl/redis/RedisLogMessages.java b/redis-extension/src/main/java/io/mapsmessaging/network/protocol/impl/redis/RedisLogMessages.java
new file mode 100644
index 0000000..10c7f91
--- /dev/null
+++ b/redis-extension/src/main/java/io/mapsmessaging/network/protocol/impl/redis/RedisLogMessages.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package io.mapsmessaging.network.protocol.impl.redis;
+
+import io.mapsmessaging.logging.Category;
+import io.mapsmessaging.logging.LEVEL;
+import io.mapsmessaging.logging.LogMessage;
+
+/**
+ * Structured log templates used by the Redis extension.
+ */
+public enum RedisLogMessages implements LogMessage {
+
+ INITIALISE_REDIS_ENDPOINT(LEVEL.INFO, REDIS_CATEGORY.PROTOCOL, "Initialising redis endpoint on {}"),
+ REDIS_ENDPOINT_INITIALIZED(LEVEL.INFO, REDIS_CATEGORY.PROTOCOL, "Redis endpoint initialised for {}"),
+ REDIS_ENDPOINT_CLOSED(LEVEL.INFO, REDIS_CATEGORY.PROTOCOL, "Redis endpoint closed"),
+ REDIS_ENDPOINT_CLOSE_ERROR(LEVEL.ERROR, REDIS_CATEGORY.PROTOCOL, "Redis close error"),
+ REDIS_SEND_MESSAGE(LEVEL.DEBUG, REDIS_CATEGORY.PROTOCOL, "Redis sent message to {} via {}"),
+ REDIS_FAILED_TO_SEND_MESSAGE(LEVEL.ERROR, REDIS_CATEGORY.PROTOCOL, "Failed to send message to {}"),
+ REDIS_SUBSCRIBE_LOCAL_SUCCESS(LEVEL.INFO, REDIS_CATEGORY.PROTOCOL, "Registered push link from {} to {}"),
+ REDIS_SUBSCRIBE_REMOTE_SUCCESS(LEVEL.INFO, REDIS_CATEGORY.PROTOCOL, "Registered pull link from {} to {}"),
+ REDIS_FAILED_TO_PROCESS_INCOMING_EVENT(LEVEL.ERROR, REDIS_CATEGORY.PROTOCOL, "Failed to process incoming message from {}"),
+ REDIS_CONSUMER_ERROR(LEVEL.ERROR, REDIS_CATEGORY.PROTOCOL, "Redis consumer poll failed for {}"),
+ REDIS_CONFIGURATION_WARNING(LEVEL.WARN, REDIS_CATEGORY.PROTOCOL, "Redis configuration warning: {}"),
+ ;
+
+ private final String message;
+ private final LEVEL level;
+ private final Category category;
+ private final int parameterCount;
+
+ /**
+ * Creates a new enum-backed log message with precomputed placeholder count.
+ *
+ * @param level log level
+ * @param category log category
+ * @param message template text that may include {@code {}}
+ */
+ RedisLogMessages(LEVEL level, Category category, String message) {
+ this.message = message;
+ this.level = level;
+ this.category = category;
+ int location = message.indexOf("{}");
+ int count = 0;
+ while (location != -1) {
+ count++;
+ location = message.indexOf("{}", location + 2);
+ }
+ this.parameterCount = count;
+ }
+
+ @Override
+ public String getMessage() {
+ return message;
+ }
+
+ @Override
+ public LEVEL getLevel() {
+ return level;
+ }
+
+ @Override
+ public Category getCategory() {
+ return category;
+ }
+
+ @Override
+ public int getParameterCount() {
+ return parameterCount;
+ }
+
+ /**
+ * Category constants for Redis protocol logging.
+ */
+ public enum REDIS_CATEGORY implements Category {
+ PROTOCOL("Protocol");
+
+ private final String description;
+
+ @Override
+ public String getDivision() {
+ return "Inter-Protocol";
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+
+ REDIS_CATEGORY(String description) {
+ this.description = description;
+ }
+ }
+}
diff --git a/redis-extension/src/main/java/io/mapsmessaging/network/protocol/impl/redis/RedisProtocol.java b/redis-extension/src/main/java/io/mapsmessaging/network/protocol/impl/redis/RedisProtocol.java
new file mode 100644
index 0000000..636b9d5
--- /dev/null
+++ b/redis-extension/src/main/java/io/mapsmessaging/network/protocol/impl/redis/RedisProtocol.java
@@ -0,0 +1,1240 @@
+/*
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package io.mapsmessaging.network.protocol.impl.redis;
+
+import io.lettuce.core.Consumer;
+import io.lettuce.core.RedisBusyException;
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.StreamMessage;
+import io.lettuce.core.XGroupCreateArgs;
+import io.lettuce.core.XReadArgs;
+import io.lettuce.core.api.StatefulRedisConnection;
+import io.lettuce.core.api.sync.RedisCommands;
+import io.lettuce.core.codec.ByteArrayCodec;
+import io.lettuce.core.codec.RedisCodec;
+import io.lettuce.core.codec.StringCodec;
+import io.lettuce.core.models.stream.PendingMessages;
+import io.lettuce.core.pubsub.RedisPubSubAdapter;
+import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
+import io.mapsmessaging.api.MessageBuilder;
+import io.mapsmessaging.api.message.Message;
+import io.mapsmessaging.api.message.TypedData;
+import io.mapsmessaging.dto.rest.config.protocol.impl.ExtensionConfigDTO;
+import io.mapsmessaging.logging.Logger;
+import io.mapsmessaging.logging.LoggerFactory;
+import io.mapsmessaging.network.EndPointURL;
+import io.mapsmessaging.network.io.EndPoint;
+import io.mapsmessaging.network.protocol.impl.extension.Extension;
+import jakarta.validation.constraints.NotNull;
+import lombok.NonNull;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Redis extension protocol that bridges MAPS namespaces with Redis Pub/Sub and Streams.
+ *
+ * The implementation preserves MAPS typed headers, supports CloudEvent wrapping on outbound
+ * links, consumes pull links via pub/sub and stream consumer groups, and emits optional
+ * MAPS-native pull-path metrics.
+ */
+public class RedisProtocol extends Extension {
+
+ private static final String REDIS_MODE = "redis.mode";
+ private static final String CLOUD_EVENT_MODE = "cloud_event.mode";
+ private static final String CLOUD_EVENT_TYPE = "cloud_event.type";
+ private static final String CLOUD_EVENT_SOURCE = "cloud_event.source";
+
+ private static final String REDIS_STREAM_GROUP = "redis.stream.group";
+ private static final String REDIS_STREAM_CONSUMER = "redis.stream.consumer";
+ private static final String REDIS_STREAM_POLL_MS = "redis.stream.poll_ms";
+ private static final String REDIS_STREAM_MIN_POLL_MS = "redis.stream.min_poll_ms";
+ private static final String REDIS_STREAM_BATCH_SIZE = "redis.stream.batch_size";
+ private static final String REDIS_STREAM_PENDING_SAMPLE_EVERY = "redis.stream.pending_sample_every";
+ private static final String REDIS_RECONNECT_INITIAL_MS = "redis.reconnect.initial_ms";
+ private static final String REDIS_RECONNECT_MAX_MS = "redis.reconnect.max_ms";
+ private static final String REDIS_METRICS_ENABLED = "redis.metrics.enabled";
+ private static final String REDIS_METRICS_NAMESPACE = "redis.metrics.namespace";
+ private static final String REDIS_METRICS_PUBLISH_MS = "redis.metrics.publish_ms";
+ private static final String REDIS_ALERT_PENDING_THRESHOLD = "redis.alert.pending_threshold";
+ private static final String REDIS_ALERT_RECONNECT_DELTA = "redis.alert.reconnect_delta_threshold";
+
+ private static final String MAPS_DATA_HEADER_PREFIX = "maps.data.";
+ private static final String MAPS_TYPE_HEADER_PREFIX = "maps.type.";
+ private static final String MAPS_CONTENT_TYPE_HEADER = "maps.contentType";
+
+ private static final RedisCodec STRING_BYTE_CODEC = RedisCodec.of(StringCodec.UTF8, ByteArrayCodec.INSTANCE);
+
+ private final Logger logger;
+ private final EndPointURL url;
+ private final ExtensionConfigDTO protocolConfig;
+ private final Map pushBindings;
+ private final Map pullBindings;
+ private final Map> streamTasks;
+
+ private RedisBridge redisBridge;
+ private ScheduledExecutorService streamScheduler;
+ private ScheduledFuture> metricsTask;
+ private boolean metricsEnabled;
+ private String metricsNamespace;
+ private long metricsPublishMs;
+ private long alertPendingThreshold;
+ private long alertReconnectDeltaThreshold;
+
+ private RedisClient pullPubSubClient;
+ private StatefulRedisPubSubConnection pullPubSubConnection;
+
+ /**
+ * Constructs the protocol from a runtime endpoint.
+ *
+ * @param endPoint runtime endpoint
+ * @param protocolConfigDTO extension config DTO
+ */
+ public RedisProtocol(@NonNull @NotNull EndPoint endPoint, ExtensionConfigDTO protocolConfigDTO) {
+ this.url = new EndPointURL(endPoint.getConfig().getUrl());
+ this.protocolConfig = protocolConfigDTO;
+ this.logger = LoggerFactory.getLogger(RedisProtocol.class);
+ this.pushBindings = new ConcurrentHashMap<>();
+ this.pullBindings = new ConcurrentHashMap<>();
+ this.streamTasks = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * Test-focused constructor that uses a raw URL and plain config map.
+ *
+ * @param urlString redis URL
+ * @param configMap protocol configuration
+ */
+ RedisProtocol(String urlString, Map configMap) {
+ this.url = new EndPointURL(urlString);
+ this.protocolConfig = new ExtensionConfigDTO() {
+ @Override
+ public Map getConfig() {
+ return configMap;
+ }
+ };
+ this.logger = LoggerFactory.getLogger(RedisProtocol.class);
+ this.pushBindings = new ConcurrentHashMap<>();
+ this.pullBindings = new ConcurrentHashMap<>();
+ this.streamTasks = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * Injects a custom bridge implementation used by unit and integration tests.
+ *
+ * @param bridge bridge implementation
+ */
+ void setRedisBridgeForTest(RedisBridge bridge) {
+ this.redisBridge = bridge;
+ }
+
+ /**
+ * Decodes a raw wire envelope into a MAPS message for tests.
+ *
+ * @param envelope encoded envelope bytes
+ * @param sourceNamespace source namespace/channel/stream
+ * @param transport transport mode ({@code pubsub} or {@code stream})
+ * @return decoded MAPS message
+ */
+ Message convertInboundEnvelopeForTest(byte[] envelope, String sourceNamespace, String transport) {
+ return convertInboundEnvelope(envelope, sourceNamespace, transport);
+ }
+
+ /**
+ * Forcibly closes the stream consumer connection for a pull binding to trigger reconnect logic.
+ *
+ * @param remoteNamespace stream namespace used by the pull binding
+ */
+ void forceCloseStreamConnectionForTest(String remoteNamespace) {
+ PullBinding binding = pullBindings.get(remoteNamespace);
+ if (binding == null || !"stream".equals(binding.mode)) {
+ return;
+ }
+ markStreamDisconnected(binding);
+ }
+
+ /**
+ * Returns snapshot pull statistics for the requested remote namespace.
+ *
+ * @param remoteNamespace pull binding key
+ * @return stats snapshot or {@code null} if no binding exists
+ */
+ PullStats pullStatsForTest(String remoteNamespace) {
+ PullBinding binding = pullBindings.get(remoteNamespace);
+ if (binding == null) {
+ return null;
+ }
+ return new PullStats(
+ binding.receivedCount.get(),
+ binding.errorCount.get(),
+ binding.reconnectCount.get(),
+ binding.lastPendingCount.get(),
+ binding.lastAckLatencyMs.get()
+ );
+ }
+
+ /**
+ * Initializes Redis clients, scheduler infrastructure, and optional metrics publisher.
+ */
+ @Override
+ public void initialise() {
+ logger.log(RedisLogMessages.INITIALISE_REDIS_ENDPOINT, url.toString());
+ if (redisBridge == null) {
+ redisBridge = new LettuceRedisBridge(url.toString());
+ }
+ if (streamScheduler == null) {
+ streamScheduler = Executors.newScheduledThreadPool(2, runnable -> {
+ Thread thread = new Thread(runnable, "redis-stream-poller");
+ thread.setDaemon(true);
+ return thread;
+ });
+ }
+
+ Map config = currentConfig();
+ metricsEnabled = parseBoolean(config.getOrDefault(REDIS_METRICS_ENABLED, false), false);
+ metricsNamespace = valueAsString(config.getOrDefault(REDIS_METRICS_NAMESPACE, "/metrics/redis/pull"), "/metrics/redis/pull");
+ metricsPublishMs = Math.max(250L, parseLong(config.getOrDefault(REDIS_METRICS_PUBLISH_MS, 5000), 5000L));
+ alertPendingThreshold = Math.max(0L, parseLong(config.getOrDefault(REDIS_ALERT_PENDING_THRESHOLD, 100), 100L));
+ alertReconnectDeltaThreshold = Math.max(1L, parseLong(config.getOrDefault(REDIS_ALERT_RECONNECT_DELTA, 1), 1L));
+
+ if (metricsEnabled && metricsTask == null) {
+ metricsTask = streamScheduler.scheduleAtFixedRate(
+ this::publishPullMetrics,
+ metricsPublishMs,
+ metricsPublishMs,
+ TimeUnit.MILLISECONDS
+ );
+ }
+ logger.log(RedisLogMessages.REDIS_ENDPOINT_INITIALIZED, url.toString());
+ }
+
+ /**
+ * Closes producer/consumer resources and background tasks.
+ *
+ * @throws IOException propagated from extension super-class shutdown
+ */
+ @Override
+ public void close() throws IOException {
+ for (Map.Entry> entry : streamTasks.entrySet()) {
+ ScheduledFuture> task = entry.getValue();
+ if (task != null) {
+ task.cancel(true);
+ }
+ }
+ streamTasks.clear();
+ if (metricsTask != null) {
+ metricsTask.cancel(true);
+ metricsTask = null;
+ }
+
+ for (PullBinding binding : pullBindings.values()) {
+ closeQuietly(binding.streamConnection);
+ closeQuietly(binding.streamClient);
+ }
+ pullBindings.clear();
+
+ if (pullPubSubConnection != null) {
+ try {
+ pullPubSubConnection.sync().unsubscribe();
+ } catch (Throwable ignored) {
+ // no-op
+ }
+ }
+ closeQuietly(pullPubSubConnection);
+ closeQuietly(pullPubSubClient);
+ pullPubSubConnection = null;
+ pullPubSubClient = null;
+
+ if (streamScheduler != null) {
+ streamScheduler.shutdownNow();
+ streamScheduler = null;
+ }
+
+ if (redisBridge != null) {
+ try {
+ redisBridge.close();
+ } catch (Throwable e) {
+ logger.log(RedisLogMessages.REDIS_ENDPOINT_CLOSE_ERROR, e);
+ }
+ }
+ redisBridge = null;
+ pushBindings.clear();
+
+ logger.log(RedisLogMessages.REDIS_ENDPOINT_CLOSED);
+ super.close();
+ }
+
+ /**
+ * @return extension name
+ */
+ @Override
+ public @NonNull String getName() {
+ return "RedisProtocol";
+ }
+
+ /**
+ * @return extension protocol version
+ */
+ @Override
+ public String getVersion() {
+ return "1.0";
+ }
+
+ /**
+ * @return {@code false}; remote filtering is not delegated to Redis.
+ */
+ @Override
+ public boolean supportsRemoteFiltering() {
+ return false;
+ }
+
+ /**
+ * Registers a MAPS local outbound link mapped to a Redis destination.
+ *
+ * @param destination local namespace
+ */
+ @Override
+ public void registerLocalLink(@NonNull @NotNull String destination) {
+ Map attrs = findLinkAttributes(destination, "push", true);
+ String namespace = valueAsString(attrs.getOrDefault("remote_namespace", destination), destination);
+
+ PushBinding binding = new PushBinding();
+ binding.localNamespace = destination;
+ binding.remoteNamespace = namespace;
+ binding.mode = parseMode(attrs.getOrDefault(REDIS_MODE, "pubsub"), destination);
+ binding.cloudEventMode = valueAsString(attrs.getOrDefault(CLOUD_EVENT_MODE, "none"), "none").toLowerCase(Locale.ROOT);
+ binding.cloudEventType = valueAsString(attrs.getOrDefault(CLOUD_EVENT_TYPE, "io.mapsmessaging.event"), "io.mapsmessaging.event");
+ binding.cloudEventSource = valueAsString(attrs.getOrDefault(CLOUD_EVENT_SOURCE, "/" + destination), "/" + destination);
+
+ pushBindings.put(destination, binding);
+ logger.log(RedisLogMessages.REDIS_SUBSCRIBE_LOCAL_SUCCESS, destination, namespace);
+ }
+
+ /**
+ * Registers a Redis pull link and starts corresponding consumers.
+ *
+ * @param destination remote namespace key
+ * @param filter optional filter (unused)
+ * @throws IOException if registration fails
+ */
+ @Override
+ public void registerRemoteLink(@NotNull @NotNull String destination, String filter) throws IOException {
+ Map attrs = findLinkAttributes(destination, "pull", false);
+ String remoteNamespace = valueAsString(attrs.getOrDefault("remote_namespace", destination), destination);
+ String localNamespace = valueAsString(attrs.getOrDefault("local_namespace", destination), destination);
+ String mode = parseMode(attrs.getOrDefault(REDIS_MODE, "pubsub"), destination);
+
+ PullBinding existing = pullBindings.remove(remoteNamespace);
+ if (existing != null) {
+ stopPullBinding(existing);
+ }
+
+ PullBinding binding = new PullBinding();
+ binding.remoteNamespace = remoteNamespace;
+ binding.localNamespace = localNamespace;
+ binding.mode = mode;
+
+ if ("stream".equals(mode)) {
+ binding.streamGroup = valueAsString(attrs.getOrDefault(REDIS_STREAM_GROUP, "maps-redis-group"), "maps-redis-group");
+ binding.streamConsumer = valueAsString(attrs.getOrDefault(REDIS_STREAM_CONSUMER, "maps-redis-consumer"), "maps-redis-consumer");
+ binding.streamPollMs = parseLong(attrs.getOrDefault(REDIS_STREAM_POLL_MS, 500), 500L);
+ binding.streamMinPollMs = Math.max(1L, parseLong(attrs.getOrDefault(REDIS_STREAM_MIN_POLL_MS, 100), 100L));
+ binding.streamBatchSize = (int) Math.max(1L, parseLong(attrs.getOrDefault(REDIS_STREAM_BATCH_SIZE, 32), 32L));
+ binding.streamPendingSampleEvery = (int) Math.max(1L, parseLong(attrs.getOrDefault(REDIS_STREAM_PENDING_SAMPLE_EVERY, 1), 1L));
+ binding.reconnectInitialMs = Math.max(100L, parseLong(attrs.getOrDefault(REDIS_RECONNECT_INITIAL_MS, 250), 250L));
+ binding.reconnectMaxMs = Math.max(binding.reconnectInitialMs, parseLong(attrs.getOrDefault(REDIS_RECONNECT_MAX_MS, 5000), 5000L));
+ binding.currentReconnectMs = binding.reconnectInitialMs;
+ startStreamPull(binding);
+ } else {
+ startPubSubPull(binding);
+ }
+
+ pullBindings.put(remoteNamespace, binding);
+ logger.log(RedisLogMessages.REDIS_SUBSCRIBE_REMOTE_SUCCESS, remoteNamespace, localNamespace);
+ }
+
+ /**
+ * Publishes a MAPS outbound message to Redis using the configured link mode.
+ *
+ * @param destinationName local namespace
+ * @param message outbound MAPS message
+ */
+ @Override
+ public void outbound(@NonNull @NotNull String destinationName, @NonNull @NotNull Message message) {
+ PushBinding binding = pushBindings.get(destinationName);
+ if (binding == null) {
+ return;
+ }
+
+ if (redisBridge == null) {
+ logger.log(RedisLogMessages.REDIS_FAILED_TO_SEND_MESSAGE, binding.remoteNamespace, new IllegalStateException("Redis bridge not initialized"));
+ return;
+ }
+
+ Map headers = buildHeaders(message);
+ applyCloudEventHeaders(binding, message, headers);
+
+ byte[] payload = message.getOpaqueData() == null ? new byte[0] : message.getOpaqueData();
+ byte[] envelope = RedisWireEnvelope.of(payload, headers).encode();
+
+ try {
+ if ("stream".equals(binding.mode)) {
+ redisBridge.appendStream(binding.remoteNamespace, envelope);
+ } else {
+ redisBridge.publish(binding.remoteNamespace, envelope);
+ }
+ logger.log(RedisLogMessages.REDIS_SEND_MESSAGE, binding.remoteNamespace, binding.mode);
+ } catch (RuntimeException e) {
+ logger.log(RedisLogMessages.REDIS_FAILED_TO_SEND_MESSAGE, binding.remoteNamespace, e);
+ }
+ }
+
+ /**
+ * Starts a pub/sub pull subscription for a binding.
+ *
+ * @param binding pull binding
+ */
+ private void startPubSubPull(PullBinding binding) {
+ ensurePubSubPullConnection();
+ pullPubSubConnection.sync().subscribe(binding.remoteNamespace);
+ }
+
+ /**
+ * Lazily creates a shared pub/sub connection and listener.
+ */
+ private void ensurePubSubPullConnection() {
+ if (pullPubSubConnection != null) {
+ return;
+ }
+
+ pullPubSubClient = RedisClient.create(url.toString());
+ pullPubSubConnection = pullPubSubClient.connectPubSub(STRING_BYTE_CODEC);
+ pullPubSubConnection.addListener(new RedisPubSubAdapter() {
+ @Override
+ public void message(String channel, byte[] message) {
+ PullBinding binding = pullBindings.get(channel);
+ if (binding == null || !"pubsub".equals(binding.mode)) {
+ return;
+ }
+
+ try {
+ Message inboundMessage = convertInboundEnvelope(message, channel, "pubsub");
+ binding.receivedCount.incrementAndGet();
+ enrichInboundMetrics(binding, inboundMessage);
+ inbound(binding.localNamespace, inboundMessage);
+ } catch (Throwable e) {
+ binding.errorCount.incrementAndGet();
+ logger.log(RedisLogMessages.REDIS_FAILED_TO_PROCESS_INCOMING_EVENT, channel, e);
+ }
+ }
+ });
+ }
+
+ /**
+ * Starts a stream consumer-group poller for a binding.
+ *
+ * @param binding pull binding
+ */
+ private void startStreamPull(PullBinding binding) {
+ if (streamScheduler == null) {
+ streamScheduler = Executors.newScheduledThreadPool(2, runnable -> {
+ Thread thread = new Thread(runnable, "redis-stream-poller");
+ thread.setDaemon(true);
+ return thread;
+ });
+ }
+
+ connectStreamBinding(binding);
+
+ ScheduledFuture> task = streamScheduler.scheduleAtFixedRate(
+ new StreamPoller(binding),
+ 0,
+ Math.max(binding.streamMinPollMs, binding.streamPollMs),
+ TimeUnit.MILLISECONDS
+ );
+ binding.streamTask = task;
+ streamTasks.put(binding.remoteNamespace, task);
+ }
+
+ /**
+ * Stops tasks and network resources associated with a pull binding.
+ *
+ * @param binding pull binding
+ */
+ private void stopPullBinding(PullBinding binding) {
+ ScheduledFuture> task = streamTasks.remove(binding.remoteNamespace);
+ if (task != null) {
+ task.cancel(true);
+ }
+
+ if ("pubsub".equals(binding.mode) && pullPubSubConnection != null) {
+ try {
+ pullPubSubConnection.sync().unsubscribe(binding.remoteNamespace);
+ } catch (Throwable ignored) {
+ // no-op
+ }
+ }
+
+ closeQuietly(binding.streamConnection);
+ closeQuietly(binding.streamClient);
+ binding.streamConnection = null;
+ binding.streamClient = null;
+ binding.streamCommands = null;
+ }
+
+ /**
+ * Opens stream connection resources and ensures consumer group exists.
+ *
+ * @param binding pull binding
+ */
+ private void connectStreamBinding(PullBinding binding) {
+ binding.streamClient = RedisClient.create(url.toString());
+ binding.streamConnection = binding.streamClient.connect(STRING_BYTE_CODEC);
+ binding.streamCommands = binding.streamConnection.sync();
+ ensureStreamGroup(binding);
+ binding.currentReconnectMs = binding.reconnectInitialMs;
+ binding.nextReconnectAttemptAtMs = 0;
+ }
+
+ /**
+ * Creates the stream consumer group if missing.
+ *
+ * @param binding pull binding
+ */
+ private void ensureStreamGroup(PullBinding binding) {
+ try {
+ binding.streamCommands.xgroupCreate(
+ XReadArgs.StreamOffset.from(binding.remoteNamespace, "0-0"),
+ binding.streamGroup,
+ new XGroupCreateArgs().mkstream(true)
+ );
+ } catch (RedisBusyException ignored) {
+ // group already exists
+ }
+ }
+
+ /**
+ * Marks a stream binding disconnected and schedules reconnect backoff.
+ *
+ * @param binding pull binding
+ */
+ private void markStreamDisconnected(PullBinding binding) {
+ closeQuietly(binding.streamConnection);
+ closeQuietly(binding.streamClient);
+ binding.streamConnection = null;
+ binding.streamClient = null;
+ binding.streamCommands = null;
+
+ long now = System.currentTimeMillis();
+ if (binding.nextReconnectAttemptAtMs == 0L) {
+ binding.nextReconnectAttemptAtMs = now + binding.currentReconnectMs;
+ }
+ }
+
+ /**
+ * Attempts reconnect for a disconnected stream binding respecting backoff.
+ *
+ * @param binding pull binding
+ */
+ private void tryReconnectStreamBinding(PullBinding binding) {
+ long now = System.currentTimeMillis();
+ if (binding.streamCommands != null || now < binding.nextReconnectAttemptAtMs) {
+ return;
+ }
+
+ try {
+ connectStreamBinding(binding);
+ binding.reconnectCount.incrementAndGet();
+ } catch (Throwable e) {
+ binding.errorCount.incrementAndGet();
+ long next = Math.min(binding.currentReconnectMs * 2L, binding.reconnectMaxMs);
+ binding.currentReconnectMs = next;
+ binding.nextReconnectAttemptAtMs = now + next;
+ logger.log(RedisLogMessages.REDIS_CONSUMER_ERROR, binding.remoteNamespace, e);
+ }
+ }
+
+ /**
+ * Converts encoded Redis envelope bytes into a MAPS message.
+ *
+ * @param envelope encoded envelope
+ * @param sourceNamespace source channel/stream
+ * @param transport source transport
+ * @return decoded message
+ */
+ private Message convertInboundEnvelope(byte[] envelope, String sourceNamespace, String transport) {
+ RedisWireEnvelope decoded = RedisWireEnvelope.decode(envelope);
+ MessageBuilder builder = new MessageBuilder().setOpaqueData(decoded.payload());
+
+ Map dataMap = new LinkedHashMap<>();
+ applyTypedHeaders(dataMap, decoded.headers());
+ dataMap.put("redis.source", new TypedData(sourceNamespace));
+ dataMap.put("redis.transport", new TypedData(transport));
+
+ String contentType = headerValue(decoded.headers(), MAPS_CONTENT_TYPE_HEADER);
+ if (contentType != null) {
+ builder.setContentType(contentType);
+ }
+
+ builder.setDataMap(dataMap);
+ return builder.build();
+ }
+
+ /**
+ * Reads typed headers and raw headers from envelope metadata.
+ *
+ * @param dataMap destination map
+ * @param headers source headers
+ */
+ private void applyTypedHeaders(Map dataMap, Map headers) {
+ Map typeMap = new LinkedHashMap<>();
+ Map valueMap = new LinkedHashMap<>();
+
+ for (Map.Entry entry : headers.entrySet()) {
+ if (entry.getValue() == null) {
+ continue;
+ }
+
+ String headerKey = entry.getKey();
+ String headerValue = new String(entry.getValue(), StandardCharsets.UTF_8);
+
+ if (headerKey.startsWith(MAPS_TYPE_HEADER_PREFIX)) {
+ typeMap.put(headerKey.substring(MAPS_TYPE_HEADER_PREFIX.length()), headerValue);
+ } else if (headerKey.startsWith(MAPS_DATA_HEADER_PREFIX)) {
+ valueMap.put(headerKey.substring(MAPS_DATA_HEADER_PREFIX.length()), headerValue);
+ } else {
+ dataMap.put("redis.header." + headerKey, new TypedData(headerValue));
+ if (headerKey.startsWith("ce_")) {
+ dataMap.put("cloudevents." + headerKey.substring(3), new TypedData(headerValue));
+ }
+ }
+ }
+
+ for (Map.Entry entry : valueMap.entrySet()) {
+ String field = entry.getKey();
+ String typeName = typeMap.get(field);
+ dataMap.put(field, parseTypedValue(typeName, entry.getValue()));
+ }
+ }
+
+ /**
+ * Parses a typed header value into a MAPS {@link TypedData} value.
+ *
+ * @param typeName serialized MAPS type name
+ * @param value string form
+ * @return parsed typed data
+ */
+ private TypedData parseTypedValue(String typeName, String value) {
+ if (typeName == null || typeName.trim().isEmpty()) {
+ return new TypedData(value);
+ }
+
+ String upper = typeName.toUpperCase(Locale.ROOT);
+ try {
+ switch (upper) {
+ case "STRING":
+ return new TypedData(value);
+ case "INT":
+ return new TypedData(Integer.parseInt(value));
+ case "LONG":
+ return new TypedData(Long.parseLong(value));
+ case "FLOAT":
+ return new TypedData(Float.parseFloat(value));
+ case "DOUBLE":
+ return new TypedData(Double.parseDouble(value));
+ case "BOOLEAN":
+ return new TypedData(Boolean.parseBoolean(value));
+ case "SHORT":
+ return new TypedData(Short.parseShort(value));
+ case "BYTE":
+ return new TypedData(Byte.parseByte(value));
+ case "CHAR":
+ return value.isEmpty() ? new TypedData("") : new TypedData(String.valueOf(value.charAt(0)));
+ default:
+ return new TypedData(value);
+ }
+ } catch (RuntimeException ignored) {
+ return new TypedData(value);
+ }
+ }
+
+ /**
+ * Builds outbound headers from MAPS typed data and reserved metadata fields.
+ *
+ * @param message source message
+ * @return header byte map
+ */
+ private Map buildHeaders(Message message) {
+ Map headers = new LinkedHashMap<>();
+
+ if (message.getDataMap() != null) {
+ for (Map.Entry entry : message.getDataMap().entrySet()) {
+ if (entry.getValue() == null || entry.getValue().getData() == null) {
+ continue;
+ }
+
+ String fieldKey = entry.getKey();
+ String typeName = entry.getValue().getType().name();
+ byte[] valueBytes = typedDataToBytes(entry.getValue());
+
+ if (fieldKey.startsWith("redis.header.")) {
+ String rawHeader = fieldKey.substring("redis.header.".length());
+ if (!rawHeader.trim().isEmpty() && valueBytes != null) {
+ headers.put(rawHeader, valueBytes);
+ }
+ }
+
+ headers.put(MAPS_TYPE_HEADER_PREFIX + fieldKey, typeName.getBytes(StandardCharsets.UTF_8));
+ if (valueBytes != null) {
+ headers.put(MAPS_DATA_HEADER_PREFIX + fieldKey, valueBytes);
+ }
+ }
+ }
+
+ if (message.getContentType() != null) {
+ headers.put(MAPS_CONTENT_TYPE_HEADER, message.getContentType().getBytes(StandardCharsets.UTF_8));
+ }
+
+ return headers;
+ }
+
+ /**
+ * Applies CloudEvent headers when a push link is configured for wrap mode.
+ *
+ * @param binding push binding
+ * @param message source message
+ * @param headers mutable header map
+ */
+ private void applyCloudEventHeaders(PushBinding binding, Message message, Map headers) {
+ if (!"wrap".equals(binding.cloudEventMode)) {
+ return;
+ }
+
+ String eventId = getStringFromDataMap(message, "cloudevents.id", UUID.randomUUID().toString());
+ String eventType = getStringFromDataMap(message, "cloudevents.type", binding.cloudEventType);
+ String eventSource = getStringFromDataMap(message, "cloudevents.source", binding.cloudEventSource);
+ String eventSubject = getStringFromDataMap(message, "cloudevents.subject", null);
+ String eventTime = getStringFromDataMap(message, "cloudevents.time", Instant.now().toString());
+
+ headers.put("ce_specversion", "1.0".getBytes(StandardCharsets.UTF_8));
+ headers.put("ce_id", eventId.getBytes(StandardCharsets.UTF_8));
+ headers.put("ce_type", eventType.getBytes(StandardCharsets.UTF_8));
+ headers.put("ce_source", eventSource.getBytes(StandardCharsets.UTF_8));
+ headers.put("ce_time", eventTime.getBytes(StandardCharsets.UTF_8));
+ if (eventSubject != null) {
+ headers.put("ce_subject", eventSubject.getBytes(StandardCharsets.UTF_8));
+ }
+ if (message.getContentType() != null) {
+ headers.put("ce_datacontenttype", message.getContentType().getBytes(StandardCharsets.UTF_8));
+ }
+ }
+
+ /**
+ * Serializes a typed value to UTF-8 bytes.
+ *
+ * @param typedData source typed data
+ * @return serialized bytes or {@code null}
+ */
+ private byte[] typedDataToBytes(TypedData typedData) {
+ Object data = typedData.getData();
+ if (data == null) {
+ return null;
+ }
+ return data.toString().getBytes(StandardCharsets.UTF_8);
+ }
+
+ /**
+ * Looks up a string value from message data map with a default fallback.
+ *
+ * @param message source message
+ * @param key lookup key
+ * @param defaultValue fallback value
+ * @return resolved value
+ */
+ private String getStringFromDataMap(Message message, String key, String defaultValue) {
+ if (message == null || message.getDataMap() == null || !message.getDataMap().containsKey(key) || message.getDataMap().get(key) == null) {
+ return defaultValue;
+ }
+ Object value = message.getDataMap().get(key).getData();
+ return value == null ? defaultValue : value.toString();
+ }
+
+ /**
+ * Resolves a header value from byte map as UTF-8 text.
+ *
+ * @param headers header map
+ * @param key header key
+ * @return decoded header value or {@code null}
+ */
+ private String headerValue(Map headers, String key) {
+ if (!headers.containsKey(key) || headers.get(key) == null) {
+ return null;
+ }
+ return new String(headers.get(key), StandardCharsets.UTF_8);
+ }
+
+ /**
+ * Enriches inbound messages with pull counters and stream timing metadata.
+ *
+ * @param binding pull binding
+ * @param inboundMessage inbound message
+ */
+ private void enrichInboundMetrics(PullBinding binding, Message inboundMessage) {
+ if (inboundMessage == null || inboundMessage.getDataMap() == null) {
+ return;
+ }
+ inboundMessage.getDataMap().put("redis.pull.received_count", new TypedData(binding.receivedCount.get()));
+ inboundMessage.getDataMap().put("redis.pull.error_count", new TypedData(binding.errorCount.get()));
+ inboundMessage.getDataMap().put("redis.pull.reconnect_count", new TypedData(binding.reconnectCount.get()));
+ inboundMessage.getDataMap().put("redis.stream.pending_count", new TypedData(binding.lastPendingCount.get()));
+ inboundMessage.getDataMap().put("redis.stream.ack_latency_ms", new TypedData(binding.lastAckLatencyMs.get()));
+ }
+
+ /**
+ * Publishes periodic pull metrics as MAPS inbound events.
+ */
+ private void publishPullMetrics() {
+ if (!metricsEnabled) {
+ return;
+ }
+
+ for (PullBinding binding : pullBindings.values()) {
+ try {
+ long reconnectCount = binding.reconnectCount.get();
+ long reconnectDelta = reconnectCount - binding.lastPublishedReconnectCount;
+ boolean reconnectAlert = reconnectDelta >= alertReconnectDeltaThreshold;
+ boolean pendingAlert = binding.lastPendingCount.get() >= alertPendingThreshold;
+
+ Map map = new LinkedHashMap<>();
+ map.put("redis.pull.remote_namespace", new TypedData(binding.remoteNamespace));
+ map.put("redis.pull.local_namespace", new TypedData(binding.localNamespace));
+ map.put("redis.pull.mode", new TypedData(binding.mode));
+ map.put("redis.pull.received_count", new TypedData(binding.receivedCount.get()));
+ map.put("redis.pull.error_count", new TypedData(binding.errorCount.get()));
+ map.put("redis.pull.reconnect_count", new TypedData(reconnectCount));
+ map.put("redis.pull.reconnect_delta", new TypedData(reconnectDelta));
+ map.put("redis.stream.pending_count", new TypedData(binding.lastPendingCount.get()));
+ map.put("redis.stream.ack_latency_ms", new TypedData(binding.lastAckLatencyMs.get()));
+ map.put("redis.alert.pending", new TypedData(pendingAlert));
+ map.put("redis.alert.reconnect", new TypedData(reconnectAlert));
+ map.put("redis.alert.pending_threshold", new TypedData(alertPendingThreshold));
+ map.put("redis.alert.reconnect_delta_threshold", new TypedData(alertReconnectDeltaThreshold));
+
+ String payload = "{\"remote\":\"" + binding.remoteNamespace + "\",\"mode\":\"" + binding.mode + "\"}";
+ Message metricsMessage = new MessageBuilder()
+ .setDataMap(map)
+ .setOpaqueData(payload.getBytes(StandardCharsets.UTF_8))
+ .setContentType("application/json")
+ .build();
+
+ inbound(metricsNamespace + "/" + binding.remoteNamespace, metricsMessage);
+ binding.lastPublishedReconnectCount = reconnectCount;
+ } catch (Throwable e) {
+ logger.log(RedisLogMessages.REDIS_CONSUMER_ERROR, binding.remoteNamespace, e);
+ }
+ }
+ }
+
+ /**
+ * Resolves merged top-level and per-link attributes for the given endpoint.
+ *
+ * @param endpointNamespace local or remote namespace key
+ * @param direction link direction
+ * @param lookupByLocalNamespace {@code true} to match by local namespace
+ * @return merged attribute map
+ */
+ private Map findLinkAttributes(String endpointNamespace, String direction, boolean lookupByLocalNamespace) {
+ Map attributes = new LinkedHashMap<>();
+
+ if (protocolConfig == null || protocolConfig.getConfig() == null) {
+ return attributes;
+ }
+
+ Object linksObj = protocolConfig.getConfig().get("links");
+ if (linksObj instanceof List) {
+ List> links = (List>) linksObj;
+ for (Object linkObj : links) {
+ if (!(linkObj instanceof Map)) {
+ continue;
+ }
+ Map, ?> link = (Map, ?>) linkObj;
+ String linkDirection = valueAsString(link.get("direction"), "");
+ if (!direction.equalsIgnoreCase(linkDirection)) {
+ continue;
+ }
+
+ String matchField = lookupByLocalNamespace ? "local_namespace" : "remote_namespace";
+ String linkNamespace = valueAsString(link.get(matchField), null);
+ if (!Objects.equals(endpointNamespace, linkNamespace)) {
+ continue;
+ }
+
+ for (Map.Entry, ?> entry : link.entrySet()) {
+ attributes.put(String.valueOf(entry.getKey()), entry.getValue());
+ }
+ break;
+ }
+ }
+
+ Map topLevel = protocolConfig.getConfig();
+ Optional.ofNullable(topLevel.get(REDIS_MODE)).ifPresent(v -> attributes.putIfAbsent(REDIS_MODE, v));
+ Optional.ofNullable(topLevel.get(CLOUD_EVENT_MODE)).ifPresent(v -> attributes.putIfAbsent(CLOUD_EVENT_MODE, v));
+ Optional.ofNullable(topLevel.get(CLOUD_EVENT_TYPE)).ifPresent(v -> attributes.putIfAbsent(CLOUD_EVENT_TYPE, v));
+ Optional.ofNullable(topLevel.get(CLOUD_EVENT_SOURCE)).ifPresent(v -> attributes.putIfAbsent(CLOUD_EVENT_SOURCE, v));
+ Optional.ofNullable(topLevel.get(REDIS_STREAM_GROUP)).ifPresent(v -> attributes.putIfAbsent(REDIS_STREAM_GROUP, v));
+ Optional.ofNullable(topLevel.get(REDIS_STREAM_CONSUMER)).ifPresent(v -> attributes.putIfAbsent(REDIS_STREAM_CONSUMER, v));
+ Optional.ofNullable(topLevel.get(REDIS_STREAM_POLL_MS)).ifPresent(v -> attributes.putIfAbsent(REDIS_STREAM_POLL_MS, v));
+ Optional.ofNullable(topLevel.get(REDIS_STREAM_MIN_POLL_MS)).ifPresent(v -> attributes.putIfAbsent(REDIS_STREAM_MIN_POLL_MS, v));
+ Optional.ofNullable(topLevel.get(REDIS_STREAM_BATCH_SIZE)).ifPresent(v -> attributes.putIfAbsent(REDIS_STREAM_BATCH_SIZE, v));
+ Optional.ofNullable(topLevel.get(REDIS_STREAM_PENDING_SAMPLE_EVERY)).ifPresent(v -> attributes.putIfAbsent(REDIS_STREAM_PENDING_SAMPLE_EVERY, v));
+ Optional.ofNullable(topLevel.get(REDIS_RECONNECT_INITIAL_MS)).ifPresent(v -> attributes.putIfAbsent(REDIS_RECONNECT_INITIAL_MS, v));
+ Optional.ofNullable(topLevel.get(REDIS_RECONNECT_MAX_MS)).ifPresent(v -> attributes.putIfAbsent(REDIS_RECONNECT_MAX_MS, v));
+
+ return attributes;
+ }
+
+ /**
+ * Normalizes and validates redis link mode.
+ *
+ * @param modeValue config value
+ * @param destination link destination
+ * @return normalized mode
+ */
+ private String parseMode(Object modeValue, String destination) {
+ String mode = valueAsString(modeValue, "pubsub").toLowerCase(Locale.ROOT);
+ if (!"pubsub".equals(mode) && !"stream".equals(mode)) {
+ logger.log(RedisLogMessages.REDIS_CONFIGURATION_WARNING, "Invalid redis.mode " + mode + " on " + destination + ", defaulting to pubsub");
+ return "pubsub";
+ }
+ return mode;
+ }
+
+ /**
+ * Parses a long with default fallback.
+ *
+ * @param value config value
+ * @param defaultValue fallback value
+ * @return parsed long or fallback
+ */
+ private long parseLong(Object value, long defaultValue) {
+ if (value == null) {
+ return defaultValue;
+ }
+ try {
+ return Long.parseLong(String.valueOf(value));
+ } catch (NumberFormatException e) {
+ return defaultValue;
+ }
+ }
+
+ /**
+ * Parses a boolean with default fallback.
+ *
+ * @param value config value
+ * @param defaultValue fallback value
+ * @return parsed boolean
+ */
+ private boolean parseBoolean(Object value, boolean defaultValue) {
+ if (value == null) {
+ return defaultValue;
+ }
+ return Boolean.parseBoolean(String.valueOf(value));
+ }
+
+ /**
+ * Returns current protocol config map.
+ *
+ * @return config map, never {@code null}
+ */
+ private Map currentConfig() {
+ if (protocolConfig == null || protocolConfig.getConfig() == null) {
+ return new LinkedHashMap<>();
+ }
+ return protocolConfig.getConfig();
+ }
+
+ /**
+ * Converts nullable object to trimmed string with default fallback.
+ *
+ * @param value source value
+ * @param defaultValue fallback
+ * @return normalized string
+ */
+ private String valueAsString(Object value, String defaultValue) {
+ if (value == null) {
+ return defaultValue;
+ }
+ String str = String.valueOf(value).trim();
+ return str.isEmpty() ? defaultValue : str;
+ }
+
+ /**
+ * Closes any closeable instance and ignores close errors.
+ *
+ * @param closeable target resource
+ */
+ private void closeQuietly(AutoCloseable closeable) {
+ if (closeable == null) {
+ return;
+ }
+ try {
+ closeable.close();
+ } catch (Exception ignored) {
+ // no-op
+ }
+ }
+
+ /**
+ * Outbound link binding metadata.
+ */
+ private static class PushBinding {
+ private String localNamespace;
+ private String remoteNamespace;
+ private String mode;
+ private String cloudEventMode;
+ private String cloudEventType;
+ private String cloudEventSource;
+ }
+
+ /**
+ * Inbound link binding metadata and runtime state.
+ */
+ private static class PullBinding {
+ private String localNamespace;
+ private String remoteNamespace;
+ private String mode;
+
+ private String streamGroup;
+ private String streamConsumer;
+ private long streamPollMs;
+ private long streamMinPollMs;
+ private int streamBatchSize;
+ private int streamPendingSampleEvery;
+ private long reconnectInitialMs;
+ private long reconnectMaxMs;
+ private long currentReconnectMs;
+ private long nextReconnectAttemptAtMs;
+ private long lastPublishedReconnectCount;
+
+ private RedisClient streamClient;
+ private StatefulRedisConnection streamConnection;
+ private RedisCommands streamCommands;
+ private ScheduledFuture> streamTask;
+
+ private final AtomicLong receivedCount = new AtomicLong();
+ private final AtomicLong errorCount = new AtomicLong();
+ private final AtomicLong reconnectCount = new AtomicLong();
+ private final AtomicLong lastPendingCount = new AtomicLong();
+ private final AtomicLong lastAckLatencyMs = new AtomicLong();
+ }
+
+ /**
+ * Poller that consumes Redis stream entries for a specific binding.
+ */
+ private class StreamPoller implements Runnable {
+ private final PullBinding binding;
+
+ /**
+ * Creates a stream poller for one pull binding.
+ *
+ * @param binding pull binding
+ */
+ private StreamPoller(PullBinding binding) {
+ this.binding = binding;
+ }
+
+ /**
+ * Polls, dispatches and acknowledges stream messages.
+ */
+ @Override
+ public void run() {
+ try {
+ tryReconnectStreamBinding(binding);
+ if (binding.streamCommands == null) {
+ return;
+ }
+
+ List> records = binding.streamCommands.xreadgroup(
+ Consumer.from(binding.streamGroup, binding.streamConsumer),
+ XReadArgs.Builder.block(Duration.ofMillis(Math.max(binding.streamMinPollMs, binding.streamPollMs))).count(binding.streamBatchSize),
+ XReadArgs.StreamOffset.lastConsumed(binding.remoteNamespace)
+ );
+
+ if (records == null || records.isEmpty()) {
+ return;
+ }
+
+ for (StreamMessage record : records) {
+ byte[] envelope = record.getBody().get("maps");
+ if (envelope == null) {
+ continue;
+ }
+
+ try {
+ long start = System.nanoTime();
+ Message inboundMessage = convertInboundEnvelope(envelope, binding.remoteNamespace, "stream");
+ binding.receivedCount.incrementAndGet();
+ enrichInboundMetrics(binding, inboundMessage);
+ inbound(binding.localNamespace, inboundMessage);
+ binding.streamCommands.xack(binding.remoteNamespace, binding.streamGroup, record.getId());
+ long ackLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ binding.lastAckLatencyMs.set(ackLatencyMs);
+ } catch (Throwable e) {
+ binding.errorCount.incrementAndGet();
+ logger.log(RedisLogMessages.REDIS_FAILED_TO_PROCESS_INCOMING_EVENT, binding.remoteNamespace, e);
+ }
+ }
+ if ((binding.receivedCount.get() % binding.streamPendingSampleEvery) == 0L) {
+ PendingMessages pending = binding.streamCommands.xpending(binding.remoteNamespace, binding.streamGroup);
+ if (pending != null) {
+ binding.lastPendingCount.set(pending.getCount());
+ }
+ }
+ } catch (Throwable e) {
+ binding.errorCount.incrementAndGet();
+ markStreamDisconnected(binding);
+ logger.log(RedisLogMessages.REDIS_CONSUMER_ERROR, binding.remoteNamespace, e);
+ }
+ }
+ }
+
+ /**
+ * Immutable snapshot of pull counters for testing and diagnostics.
+ */
+ static final class PullStats {
+ final long receivedCount;
+ final long errorCount;
+ final long reconnectCount;
+ final long pendingCount;
+ final long ackLatencyMs;
+
+ /**
+ * Creates a pull stats snapshot.
+ *
+ * @param receivedCount delivered inbound message count
+ * @param errorCount processing or connectivity errors
+ * @param reconnectCount reconnect attempts that succeeded
+ * @param pendingCount current pending stream entries
+ * @param ackLatencyMs latest ack latency
+ */
+ private PullStats(long receivedCount, long errorCount, long reconnectCount, long pendingCount, long ackLatencyMs) {
+ this.receivedCount = receivedCount;
+ this.errorCount = errorCount;
+ this.reconnectCount = reconnectCount;
+ this.pendingCount = pendingCount;
+ this.ackLatencyMs = ackLatencyMs;
+ }
+ }
+
+ /**
+ * Outbound bridge abstraction to allow production and test transports.
+ */
+ interface RedisBridge extends AutoCloseable {
+ /**
+ * Publishes to a Redis pub/sub channel.
+ *
+ * @param channel channel name
+ * @param envelope encoded payload
+ */
+ void publish(String channel, byte[] envelope);
+
+ /**
+ * Appends to a Redis stream.
+ *
+ * @param stream stream name
+ * @param envelope encoded payload
+ */
+ void appendStream(String stream, byte[] envelope);
+
+ /**
+ * Closes underlying resources.
+ */
+ @Override
+ void close();
+ }
+
+ /**
+ * Lettuce-backed Redis bridge implementation.
+ */
+ static class LettuceRedisBridge implements RedisBridge {
+ private final RedisClient redisClient;
+ private final StatefulRedisConnection connection;
+ private final RedisCommands commands;
+
+ /**
+ * Opens a synchronous Redis connection for outbound operations.
+ *
+ * @param redisUrl Redis endpoint URL
+ */
+ LettuceRedisBridge(String redisUrl) {
+ this.redisClient = RedisClient.create(redisUrl);
+ this.connection = redisClient.connect(STRING_BYTE_CODEC);
+ this.commands = connection.sync();
+ }
+
+ /**
+ * Publishes encoded envelopes to pub/sub.
+ *
+ * @param channel channel name
+ * @param envelope encoded payload
+ */
+ @Override
+ public void publish(String channel, byte[] envelope) {
+ commands.publish(channel, envelope);
+ }
+
+ /**
+ * Writes encoded envelopes to streams using the {@code maps} field.
+ *
+ * @param stream stream name
+ * @param envelope encoded payload
+ */
+ @Override
+ public void appendStream(String stream, byte[] envelope) {
+ Map body = new LinkedHashMap<>();
+ body.put("maps", envelope);
+ commands.xadd(stream, body);
+ }
+
+ /**
+ * Closes the connection and client.
+ */
+ @Override
+ public void close() {
+ connection.close();
+ redisClient.shutdown();
+ }
+ }
+}
diff --git a/redis-extension/src/main/java/io/mapsmessaging/network/protocol/impl/redis/RedisProtocolFactory.java b/redis-extension/src/main/java/io/mapsmessaging/network/protocol/impl/redis/RedisProtocolFactory.java
new file mode 100644
index 0000000..76c91e6
--- /dev/null
+++ b/redis-extension/src/main/java/io/mapsmessaging/network/protocol/impl/redis/RedisProtocolFactory.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package io.mapsmessaging.network.protocol.impl.redis;
+
+import io.mapsmessaging.dto.rest.config.protocol.impl.ExtensionConfigDTO;
+import io.mapsmessaging.network.io.EndPoint;
+import io.mapsmessaging.network.io.Packet;
+import io.mapsmessaging.network.protocol.Protocol;
+import io.mapsmessaging.network.protocol.ProtocolImplFactory;
+import io.mapsmessaging.network.protocol.detection.NoOpDetection;
+import io.mapsmessaging.network.protocol.impl.extension.ExtensionEndPoint;
+import io.mapsmessaging.network.protocol.impl.extension.ExtensionProtocol;
+
+import java.io.IOException;
+
+/**
+ * Factory that registers and instantiates the Redis extension protocol.
+ */
+public class RedisProtocolFactory extends ProtocolImplFactory {
+
+ /**
+ * Builds the Redis protocol factory metadata used by plugin discovery.
+ */
+ public RedisProtocolFactory() {
+ super("redis", "Provides a connection to Redis Pub/Sub and Streams", new NoOpDetection());
+ }
+
+ /**
+ * Creates a connected {@link ExtensionProtocol} wrapper around {@link RedisProtocol}.
+ *
+ * @param endPoint end point that contains extension configuration
+ * @param sessionId negotiated session id
+ * @param username authentication username
+ * @param password authentication password
+ * @return connected protocol instance
+ * @throws IOException if protocol connection fails
+ */
+ @Override
+ public Protocol connect(EndPoint endPoint, String sessionId, String username, String password) throws IOException {
+ ExtensionConfigDTO protocolConfigDTO = (ExtensionConfigDTO) ((ExtensionEndPoint) endPoint).config();
+ Protocol protocol = new ExtensionProtocol(endPoint, new RedisProtocol(endPoint, protocolConfigDTO));
+ protocol.connect(sessionId, username, password);
+ return protocol;
+ }
+
+ /**
+ * No-op because this extension does not accept raw inbound socket packets.
+ *
+ * @param endPoint connection endpoint
+ * @param packet inbound packet
+ */
+ @Override
+ public void create(EndPoint endPoint, Packet packet) {
+ // This extension does not accept inbound Redis client sockets.
+ }
+
+ /**
+ * Returns an empty transport hint because Redis uses its own client connection model.
+ *
+ * @return empty transport type
+ */
+ @Override
+ public String getTransportType() {
+ return "";
+ }
+}
diff --git a/redis-extension/src/main/java/io/mapsmessaging/network/protocol/impl/redis/RedisWireEnvelope.java b/redis-extension/src/main/java/io/mapsmessaging/network/protocol/impl/redis/RedisWireEnvelope.java
new file mode 100644
index 0000000..e248f77
--- /dev/null
+++ b/redis-extension/src/main/java/io/mapsmessaging/network/protocol/impl/redis/RedisWireEnvelope.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package io.mapsmessaging.network.protocol.impl.redis;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Binary wire envelope used to preserve payload and header bytes over Redis.
+ */
+final class RedisWireEnvelope {
+
+ private static final int MAGIC = 0x4D4D5231;
+
+ private final byte[] payload;
+ private final Map headers;
+
+ /**
+ * Creates an immutable envelope view for payload and headers.
+ *
+ * @param payload message payload bytes
+ * @param headers message headers as raw bytes
+ */
+ private RedisWireEnvelope(byte[] payload, Map headers) {
+ this.payload = payload == null ? new byte[0] : payload;
+ this.headers = headers == null ? new LinkedHashMap<>() : headers;
+ }
+
+ static RedisWireEnvelope of(byte[] payload, Map headers) {
+ return new RedisWireEnvelope(payload, headers);
+ }
+
+ /**
+ * Returns the payload bytes.
+ *
+ * @return payload bytes, never {@code null}
+ */
+ byte[] payload() {
+ return payload;
+ }
+
+ /**
+ * Returns header bytes keyed by header name.
+ *
+ * @return header map, never {@code null}
+ */
+ Map headers() {
+ return headers;
+ }
+
+ /**
+ * Encodes this envelope to a compact binary format.
+ *
+ * @return encoded bytes
+ */
+ byte[] encode() {
+ try {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(baos);
+ out.writeInt(MAGIC);
+ out.writeInt(headers.size());
+ for (Map.Entry entry : headers.entrySet()) {
+ writeBytes(out, entry.getKey().getBytes(StandardCharsets.UTF_8));
+ writeBytes(out, entry.getValue() == null ? new byte[0] : entry.getValue());
+ }
+ writeBytes(out, payload);
+ out.flush();
+ return baos.toByteArray();
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to encode redis wire envelope", e);
+ }
+ }
+
+ static RedisWireEnvelope decode(byte[] bytes) {
+ if (bytes == null || bytes.length == 0) {
+ return new RedisWireEnvelope(new byte[0], new LinkedHashMap<>());
+ }
+
+ try {
+ DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
+ int magic = in.readInt();
+ if (magic != MAGIC) {
+ throw new IllegalArgumentException("Invalid redis envelope magic");
+ }
+
+ int headerCount = in.readInt();
+ if (headerCount < 0 || headerCount > 65536) {
+ throw new IllegalArgumentException("Invalid header count " + headerCount);
+ }
+
+ Map headers = new LinkedHashMap<>();
+ for (int i = 0; i < headerCount; i++) {
+ byte[] keyBytes = readBytes(in);
+ byte[] valueBytes = readBytes(in);
+ headers.put(new String(keyBytes, StandardCharsets.UTF_8), valueBytes);
+ }
+
+ byte[] payload = readBytes(in);
+ return new RedisWireEnvelope(payload, headers);
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Failed to decode redis wire envelope", e);
+ }
+ }
+
+ /**
+ * Writes a length-prefixed byte array to the output stream.
+ *
+ * @param out output stream
+ * @param value byte array to write
+ * @throws IOException if write fails
+ */
+ private static void writeBytes(DataOutputStream out, byte[] value) throws IOException {
+ out.writeInt(value.length);
+ out.write(value);
+ }
+
+ /**
+ * Reads a length-prefixed byte array from the input stream with safety limits.
+ *
+ * @param in input stream
+ * @return decoded byte array
+ * @throws IOException if read fails
+ */
+ private static byte[] readBytes(DataInputStream in) throws IOException {
+ int len = in.readInt();
+ if (len < 0 || len > (32 * 1024 * 1024)) {
+ throw new IllegalArgumentException("Invalid payload length " + len);
+ }
+ byte[] value = new byte[len];
+ in.readFully(value);
+ return value;
+ }
+}
diff --git a/redis-extension/src/main/resources/META-INF/services/io.mapsmessaging.network.protocol.ProtocolImplFactory b/redis-extension/src/main/resources/META-INF/services/io.mapsmessaging.network.protocol.ProtocolImplFactory
new file mode 100644
index 0000000..08d6595
--- /dev/null
+++ b/redis-extension/src/main/resources/META-INF/services/io.mapsmessaging.network.protocol.ProtocolImplFactory
@@ -0,0 +1 @@
+io.mapsmessaging.network.protocol.impl.redis.RedisProtocolFactory
diff --git a/redis-extension/src/main/resources/NetworkConnectionManager-example.yaml b/redis-extension/src/main/resources/NetworkConnectionManager-example.yaml
new file mode 100644
index 0000000..4fced68
--- /dev/null
+++ b/redis-extension/src/main/resources/NetworkConnectionManager-example.yaml
@@ -0,0 +1,73 @@
+---
+NetworkConnectionManager:
+ global:
+
+ data:
+ -
+ name: local_redis
+ url: "redis://localhost:6379/"
+ protocol: redis
+ plugin: true
+ config:
+ redis.mode: "pubsub"
+ redis.reconnect.initial_ms: 250
+ redis.reconnect.max_ms: 5000
+ redis.metrics.enabled: true
+ redis.metrics.namespace: "/metrics/redis/pull"
+ redis.metrics.publish_ms: 5000
+ redis.alert.pending_threshold: 100
+ redis.alert.reconnect_delta_threshold: 1
+ # Optional stream defaults applied unless overridden per link:
+ # redis.stream.min_poll_ms: 100
+ # redis.stream.pending_sample_every: 1
+
+ remote:
+ sessionId: redis-01
+ username: maps
+ password: maps
+
+ links:
+ -
+ direction: push
+ local_namespace: "/redis/outbound/raw"
+ remote_namespace: "events.raw"
+ redis.mode: "pubsub"
+
+ -
+ direction: pull
+ remote_namespace: "events.inbound"
+ local_namespace: "/redis/inbound/raw"
+ redis.mode: "pubsub"
+
+ -
+ direction: push
+ local_namespace: "/redis/outbound/cloudevent"
+ remote_namespace: "events.cloudevents"
+ redis.mode: "stream"
+ cloud_event.mode: "wrap"
+ cloud_event.type: "io.maps.redis.event"
+ cloud_event.source: "/maps/redis"
+
+ -
+ direction: pull
+ remote_namespace: "stream.inbound"
+ local_namespace: "/redis/inbound/stream"
+ redis.mode: "stream"
+ redis.stream.group: "maps-redis-group"
+ redis.stream.consumer: "maps-redis-consumer"
+ redis.stream.poll_ms: 500
+ redis.stream.min_poll_ms: 100
+ redis.stream.batch_size: 32
+ redis.stream.pending_sample_every: 1
+
+ -
+ direction: pull
+ remote_namespace: "stream.inbound.fast"
+ local_namespace: "/redis/inbound/stream/fast"
+ redis.mode: "stream"
+ redis.stream.group: "maps-redis-group-fast"
+ redis.stream.consumer: "maps-redis-consumer-fast"
+ redis.stream.poll_ms: 25
+ redis.stream.min_poll_ms: 1
+ redis.stream.batch_size: 256
+ redis.stream.pending_sample_every: 20
diff --git a/redis-extension/src/test/java/io/mapsmessaging/network/protocol/impl/redis/RedisProtocolPerformanceBaselineTest.java b/redis-extension/src/test/java/io/mapsmessaging/network/protocol/impl/redis/RedisProtocolPerformanceBaselineTest.java
new file mode 100644
index 0000000..7176481
--- /dev/null
+++ b/redis-extension/src/test/java/io/mapsmessaging/network/protocol/impl/redis/RedisProtocolPerformanceBaselineTest.java
@@ -0,0 +1,472 @@
+package io.mapsmessaging.network.protocol.impl.redis;
+
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.api.StatefulRedisConnection;
+import io.lettuce.core.api.sync.RedisCommands;
+import io.lettuce.core.codec.ByteArrayCodec;
+import io.lettuce.core.codec.RedisCodec;
+import io.lettuce.core.codec.StringCodec;
+import io.mapsmessaging.api.message.Message;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * On-demand performance baseline tests for Redis extension paths.
+ */
+@Testcontainers
+@Tag("perf")
+class RedisProtocolPerformanceBaselineTest {
+
+ private static final RedisCodec STRING_BYTE_CODEC = RedisCodec.of(StringCodec.UTF8, ByteArrayCodec.INSTANCE);
+
+ @Container
+ @SuppressWarnings("resource")
+ static final GenericContainer> REDIS = new GenericContainer<>("redis:7.2-alpine")
+ .withExposedPorts(6379);
+
+ private RedisProtocol protocol;
+
+ /**
+ * Closes protocol resources between test runs.
+ *
+ * @throws Exception if shutdown fails
+ */
+ @AfterEach
+ void tearDown() throws Exception {
+ if (protocol != null) {
+ protocol.close();
+ }
+ }
+
+ /**
+ * Measures stream pull throughput baseline for a fixed-size burst.
+ *
+ * @throws Exception if setup or assertions fail
+ */
+ @Test
+ void baseline_streamPullThroughput() throws Exception {
+ runStreamThroughputBaseline("stream throughput small", 5_000, 64, 128, 25, 1, 20, 30);
+ }
+
+ /**
+ * Measures stream throughput baseline with medium payloads and larger volume.
+ *
+ * @throws Exception if setup or assertions fail
+ */
+ @Test
+ void baseline_streamPullThroughput_MediumPayload() throws Exception {
+ runStreamThroughputBaseline("stream throughput medium", 10_000, 512, 128, 25, 1, 20, 45);
+ }
+
+ /**
+ * Measures stream throughput baseline with large payloads.
+ *
+ * @throws Exception if setup or assertions fail
+ */
+ @Test
+ void baseline_streamPullThroughput_LargePayload() throws Exception {
+ runStreamThroughputBaseline("stream throughput large", 5_000, 4096, 128, 25, 1, 20, 45);
+ }
+
+ /**
+ * Measures stream throughput with a higher batch size to validate scheduler/batch ceilings.
+ *
+ * @throws Exception if setup or assertions fail
+ */
+ @Test
+ void baseline_streamPullThroughput_HighBatch() throws Exception {
+ runStreamThroughputBaseline("stream throughput high-batch", 12_000, 512, 512, 25, 1, 20, 45);
+ }
+
+ /**
+ * Measures stream throughput with default minimum poll guard to expose scheduler ceiling behavior.
+ *
+ * @throws Exception if setup or assertions fail
+ */
+ @Test
+ void baseline_streamPullThroughput_DefaultMinPollGuard() throws Exception {
+ runStreamThroughputBaseline("stream throughput default guard", 6_000, 512, 128, 50, 100, 1, 45);
+ }
+
+ /**
+ * Measures pub/sub pull latency percentiles for baseline tracking.
+ *
+ * @throws Exception if setup or assertions fail
+ */
+ @Test
+ void baseline_pubSubPullLatency() throws Exception {
+ runPubSubLatencyBaseline("pubsub latency small", 2_000, 64, 30);
+ }
+
+ /**
+ * Measures pub/sub latency baseline with medium payloads and higher count.
+ *
+ * @throws Exception if setup or assertions fail
+ */
+ @Test
+ void baseline_pubSubPullLatency_MediumPayload() throws Exception {
+ runPubSubLatencyBaseline("pubsub latency medium", 5_000, 512, 45);
+ }
+
+ /**
+ * Measures pub/sub latency baseline with large payloads.
+ *
+ * @throws Exception if setup or assertions fail
+ */
+ @Test
+ void baseline_pubSubPullLatency_LargePayload() throws Exception {
+ runPubSubLatencyBaseline("pubsub latency large", 3_000, 4096, 45);
+ }
+
+ /**
+ * Measures recovery time after a forced stream consumer disconnect.
+ *
+ * @throws Exception if setup or assertions fail
+ */
+ @Test
+ void baseline_streamReconnectRecoveryTime() throws Exception {
+ String redisUrl = redisUrl();
+ String remoteNamespace = "perf.stream.in.reconnect";
+
+ CapturingRedisProtocol capturing = new CapturingRedisProtocol(
+ redisUrl,
+ createPullStreamPerfConfig(remoteNamespace, "/perf/maps/in/stream/reconnect", 128, 50, 1, 1)
+ );
+ protocol = capturing;
+ protocol.initialise();
+ protocol.registerRemoteLink(remoteNamespace, null);
+
+ RedisClient client = RedisClient.create(redisUrl);
+ try (StatefulRedisConnection connection = client.connect(STRING_BYTE_CODEC)) {
+ RedisCommands commands = connection.sync();
+
+ capturing.forceCloseStreamConnectionForTest(remoteNamespace);
+ long startNs = System.nanoTime();
+
+ Map headers = new LinkedHashMap<>();
+ headers.put("maps.type.seq", "INT".getBytes(StandardCharsets.UTF_8));
+ headers.put("maps.data.seq", "1".getBytes(StandardCharsets.UTF_8));
+ commands.xadd(remoteNamespace, Map.of("maps", RedisWireEnvelope.of("reconnect".getBytes(StandardCharsets.UTF_8), headers).encode()));
+
+ capturing.awaitCount(1, 15);
+ long recoveryMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
+
+ RedisProtocol.PullStats stats = capturing.pullStatsForTest(remoteNamespace);
+ assertTrue(stats != null && stats.reconnectCount >= 1, "Expected at least one reconnect");
+ assertTrue(recoveryMs < 10_000, "Reconnect recovery too slow: " + recoveryMs + " ms");
+
+ System.out.printf("[perf] stream reconnect recovery baseline: recovery=%dms reconnects=%d%n",
+ recoveryMs, stats.reconnectCount);
+ } finally {
+ client.shutdown();
+ }
+ }
+
+ /**
+ * Executes one stream throughput baseline variant.
+ *
+ * @param label scenario label
+ * @param messageCount number of messages
+ * @param payloadSize payload size in bytes
+ * @param batchSize stream batch size
+ * @param pollMs stream poll interval in ms
+ * @param minPollMs minimum effective poll interval in ms
+ * @param pendingSampleEvery sample pending depth every N consumed messages
+ * @param timeoutSeconds await timeout
+ * @throws Exception if setup or assertions fail
+ */
+ private void runStreamThroughputBaseline(
+ String label,
+ int messageCount,
+ int payloadSize,
+ int batchSize,
+ int pollMs,
+ int minPollMs,
+ int pendingSampleEvery,
+ int timeoutSeconds) throws Exception {
+ String redisUrl = redisUrl();
+
+ String remoteNamespace = "perf.stream.in." + Math.abs(label.hashCode());
+ String localNamespace = "/perf/maps/in/stream/" + Math.abs(label.hashCode());
+ CapturingRedisProtocol capturing = new CapturingRedisProtocol(
+ redisUrl,
+ createPullStreamPerfConfig(remoteNamespace, localNamespace, batchSize, pollMs, minPollMs, pendingSampleEvery)
+ );
+ protocol = capturing;
+ protocol.initialise();
+ protocol.registerRemoteLink(remoteNamespace, null);
+
+ RedisClient client = RedisClient.create(redisUrl);
+ try (StatefulRedisConnection connection = client.connect(STRING_BYTE_CODEC)) {
+ RedisCommands commands = connection.sync();
+
+ long startNs = System.nanoTime();
+ for (int i = 0; i < messageCount; i++) {
+ Map headers = new LinkedHashMap<>();
+ headers.put("maps.type.seq", "INT".getBytes(StandardCharsets.UTF_8));
+ headers.put("maps.data.seq", Integer.toString(i).getBytes(StandardCharsets.UTF_8));
+ commands.xadd(remoteNamespace, Map.of("maps", RedisWireEnvelope.of(payloadFor(i, payloadSize), headers).encode()));
+ }
+
+ List captures = capturing.awaitCount(messageCount, timeoutSeconds);
+ long durationNs = System.nanoTime() - startNs;
+ double seconds = durationNs / 1_000_000_000.0;
+ double throughput = messageCount / seconds;
+ double mibPerSecond = ((double) messageCount * payloadSize) / (1024.0 * 1024.0) / seconds;
+ double effectivePollMs = Math.max(minPollMs, pollMs);
+ double theoreticalMax = batchSize / (effectivePollMs / 1000.0);
+
+ assertEquals(messageCount, captures.size());
+ assertTrue(throughput > 100.0, "Throughput baseline unexpectedly low: " + throughput + " msg/s");
+ assertTrue(throughput <= (theoreticalMax * 1.30), "Throughput exceeds expected ceiling too far: " + throughput + " msg/s");
+
+ System.out.printf(
+ "[perf] %s: messages=%d payload=%dB batch=%d pollMs=%d minPollMs=%d pendingSampleEvery=%d duration=%.3fs throughput=%.1f msg/s (%.2f MiB/s) theoreticalCeiling=%.1f msg/s%n",
+ label, messageCount, payloadSize, batchSize, pollMs, minPollMs, pendingSampleEvery, seconds, throughput, mibPerSecond, theoreticalMax);
+ } finally {
+ client.shutdown();
+ }
+ }
+
+ /**
+ * Executes one pub/sub latency baseline variant.
+ *
+ * @param label scenario label
+ * @param messageCount number of messages
+ * @param payloadSize payload size in bytes
+ * @param timeoutSeconds await timeout
+ * @throws Exception if setup or assertions fail
+ */
+ private void runPubSubLatencyBaseline(String label, int messageCount, int payloadSize, int timeoutSeconds) throws Exception {
+ String redisUrl = redisUrl();
+
+ CapturingRedisProtocol capturing = new CapturingRedisProtocol(redisUrl, createPullPubSubPerfConfig());
+ protocol = capturing;
+ protocol.initialise();
+ protocol.registerRemoteLink("perf.pubsub.in", null);
+
+ RedisClient client = RedisClient.create(redisUrl);
+ try (StatefulRedisConnection connection = client.connect(STRING_BYTE_CODEC)) {
+ RedisCommands commands = connection.sync();
+
+ for (int i = 0; i < messageCount; i++) {
+ long sentNs = System.nanoTime();
+ Map headers = new LinkedHashMap<>();
+ headers.put("maps.type.sent_ns", "LONG".getBytes(StandardCharsets.UTF_8));
+ headers.put("maps.data.sent_ns", Long.toString(sentNs).getBytes(StandardCharsets.UTF_8));
+ headers.put("maps.type.seq", "INT".getBytes(StandardCharsets.UTF_8));
+ headers.put("maps.data.seq", Integer.toString(i).getBytes(StandardCharsets.UTF_8));
+ commands.publish("perf.pubsub.in", RedisWireEnvelope.of(payloadFor(i, payloadSize), headers).encode());
+ }
+
+ List captures = capturing.awaitCount(messageCount, timeoutSeconds);
+ assertEquals(messageCount, captures.size());
+
+ List latenciesMs = new ArrayList<>(messageCount);
+ for (InboundCapture capture : captures) {
+ Object sentNsObj = capture.message.getDataMap().get("sent_ns").getData();
+ long sentNs = Long.parseLong(String.valueOf(sentNsObj));
+ latenciesMs.add((capture.receivedAtNs - sentNs) / 1_000_000.0);
+ }
+
+ Collections.sort(latenciesMs);
+ double p50 = percentile(latenciesMs, 50);
+ double p95 = percentile(latenciesMs, 95);
+ double p99 = percentile(latenciesMs, 99);
+
+ assertTrue(p99 < 2000.0, "p99 latency unexpectedly high: " + p99 + " ms");
+ System.out.printf("[perf] %s: count=%d payload=%dB p50=%.2fms p95=%.2fms p99=%.2fms%n",
+ label, messageCount, payloadSize, p50, p95, p99);
+ } finally {
+ client.shutdown();
+ }
+ }
+
+ /**
+ * Builds deterministic payload bytes for a sequence id and size.
+ *
+ * @param sequence sequence index
+ * @param payloadSize target size in bytes
+ * @return payload bytes
+ */
+ private byte[] payloadFor(int sequence, int payloadSize) {
+ byte[] payload = new byte[payloadSize];
+ for (int i = 0; i < payload.length; i++) {
+ payload[i] = (byte) ((sequence + i) & 0x7F);
+ }
+ return payload;
+ }
+
+ /**
+ * Builds Redis URL from container mapping.
+ *
+ * @return redis URL
+ */
+ private String redisUrl() {
+ return "redis://" + REDIS.getHost() + ":" + REDIS.getMappedPort(6379) + "/";
+ }
+
+ /**
+ * Creates stream pull performance configuration for one scenario.
+ *
+ * @param remoteNamespace remote stream namespace
+ * @param localNamespace mapped local namespace
+ * @param batchSize stream batch size
+ * @param pollMs stream poll interval in ms
+ * @param minPollMs minimum effective poll interval in ms
+ * @param pendingSampleEvery sample pending depth every N consumed messages
+ * @return stream pull performance config
+ */
+ private Map createPullStreamPerfConfig(
+ String remoteNamespace,
+ String localNamespace,
+ int batchSize,
+ int pollMs,
+ int minPollMs,
+ int pendingSampleEvery) {
+ Map config = new HashMap<>();
+
+ Map pullLink = new HashMap<>();
+ pullLink.put("direction", "pull");
+ pullLink.put("remote_namespace", remoteNamespace);
+ pullLink.put("local_namespace", localNamespace);
+ pullLink.put("redis.mode", "stream");
+ pullLink.put("redis.stream.group", "perf-stream-group-" + Math.abs(remoteNamespace.hashCode()));
+ pullLink.put("redis.stream.consumer", "perf-stream-consumer");
+ pullLink.put("redis.stream.poll_ms", pollMs);
+ pullLink.put("redis.stream.min_poll_ms", minPollMs);
+ pullLink.put("redis.stream.batch_size", batchSize);
+ pullLink.put("redis.stream.pending_sample_every", pendingSampleEvery);
+ pullLink.put("redis.reconnect.initial_ms", 100);
+ pullLink.put("redis.reconnect.max_ms", 1000);
+
+ config.put("links", List.of(pullLink));
+ return config;
+ }
+
+ /**
+ * @return pub/sub pull performance config
+ */
+ private Map createPullPubSubPerfConfig() {
+ Map config = new HashMap<>();
+
+ Map pullLink = new HashMap<>();
+ pullLink.put("direction", "pull");
+ pullLink.put("remote_namespace", "perf.pubsub.in");
+ pullLink.put("local_namespace", "/perf/maps/in/pubsub");
+ pullLink.put("redis.mode", "pubsub");
+
+ config.put("links", List.of(pullLink));
+ return config;
+ }
+
+ /**
+ * Computes a nearest-rank percentile from sorted latency values.
+ *
+ * @param values sample values
+ * @param percentile percentile rank (0-100)
+ * @return percentile value
+ */
+ private double percentile(List values, int percentile) {
+ if (values.isEmpty()) {
+ return 0.0;
+ }
+ int index = (int) Math.ceil((percentile / 100.0) * values.size()) - 1;
+ index = Math.max(0, Math.min(values.size() - 1, index));
+ return values.get(index);
+ }
+
+ /**
+ * Inbound event capture with receive timestamp.
+ */
+ private static final class InboundCapture {
+ private final String localNamespace;
+ private final Message message;
+ private final long receivedAtNs;
+
+ /**
+ * Creates one inbound capture.
+ *
+ * @param localNamespace routed local namespace
+ * @param message inbound message
+ * @param receivedAtNs receive timestamp in nanoseconds
+ */
+ private InboundCapture(String localNamespace, Message message, long receivedAtNs) {
+ this.localNamespace = localNamespace;
+ this.message = message;
+ this.receivedAtNs = receivedAtNs;
+ }
+ }
+
+ /**
+ * RedisProtocol variant that records inbound events into a queue.
+ */
+ private static final class CapturingRedisProtocol extends RedisProtocol {
+ private final BlockingQueue queue = new LinkedBlockingQueue<>();
+
+ /**
+ * Creates capturing protocol from plain URL/config.
+ *
+ * @param urlString redis URL
+ * @param configMap protocol config
+ */
+ private CapturingRedisProtocol(String urlString, Map configMap) {
+ super(urlString, configMap);
+ }
+
+ /**
+ * Captures inbound messages.
+ *
+ * @param destination target namespace
+ * @param message inbound message
+ * @throws IOException not used in this test implementation
+ */
+ @Override
+ protected void inbound(String destination, Message message) throws IOException {
+ queue.add(new InboundCapture(destination, message, System.nanoTime()));
+ }
+
+ /**
+ * Waits for an exact number of inbound captures.
+ *
+ * @param expectedCount expected capture count
+ * @param timeoutSeconds timeout in seconds
+ * @return collected captures
+ * @throws InterruptedException if interrupted while waiting
+ */
+ private List awaitCount(int expectedCount, int timeoutSeconds) throws InterruptedException {
+ List captures = new ArrayList<>(expectedCount);
+ long deadline = System.currentTimeMillis() + (timeoutSeconds * 1000L);
+ while (captures.size() < expectedCount && System.currentTimeMillis() < deadline) {
+ InboundCapture capture = queue.poll(200, TimeUnit.MILLISECONDS);
+ if (Objects.nonNull(capture)) {
+ captures.add(capture);
+ }
+ }
+ if (captures.size() < expectedCount) {
+ throw new AssertionError("Timed out waiting for " + expectedCount + " captures, got " + captures.size());
+ }
+ return captures;
+ }
+ }
+}
diff --git a/redis-extension/src/test/java/io/mapsmessaging/network/protocol/impl/redis/RedisProtocolRoundTripTest.java b/redis-extension/src/test/java/io/mapsmessaging/network/protocol/impl/redis/RedisProtocolRoundTripTest.java
new file mode 100644
index 0000000..7a54e17
--- /dev/null
+++ b/redis-extension/src/test/java/io/mapsmessaging/network/protocol/impl/redis/RedisProtocolRoundTripTest.java
@@ -0,0 +1,736 @@
+package io.mapsmessaging.network.protocol.impl.redis;
+
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.StreamMessage;
+import io.lettuce.core.XReadArgs;
+import io.lettuce.core.api.StatefulRedisConnection;
+import io.lettuce.core.api.sync.RedisCommands;
+import io.lettuce.core.codec.ByteArrayCodec;
+import io.lettuce.core.codec.RedisCodec;
+import io.lettuce.core.codec.StringCodec;
+import io.lettuce.core.models.stream.PendingMessages;
+import io.lettuce.core.pubsub.RedisPubSubAdapter;
+import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
+import io.mapsmessaging.api.MessageBuilder;
+import io.mapsmessaging.api.message.Message;
+import io.mapsmessaging.api.message.TypedData;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Testcontainers
+/**
+ * End-to-end harness for Redis extension behavior across pub/sub and streams paths.
+ */
+class RedisProtocolRoundTripTest {
+
+ private static final RedisCodec STRING_BYTE_CODEC = RedisCodec.of(StringCodec.UTF8, ByteArrayCodec.INSTANCE);
+
+ @Container
+ @SuppressWarnings("resource")
+ static final GenericContainer> REDIS = new GenericContainer<>("redis:7.2-alpine")
+ .withExposedPorts(6379);
+
+ private RedisProtocol protocol;
+
+ /**
+ * Closes protocol resources between tests.
+ *
+ * @throws Exception when shutdown fails
+ */
+ @AfterEach
+ void tearDown() throws Exception {
+ if (protocol != null) {
+ protocol.close();
+ }
+ }
+
+ /**
+ * Validates pub/sub round-trip with CloudEvent wrap/unwrap and type/header preservation.
+ *
+ * @throws Exception test execution error
+ */
+ @Test
+ void pubSubRoundTrip_RedisToMapsToCloudEventToMapsToRedis_RetainsTypesAndHeaders() throws Exception {
+ String redisUrl = redisUrl();
+ protocol = new RedisProtocol(redisUrl, createPubSubCloudEventConfig());
+ protocol.initialise();
+ protocol.registerLocalLink("/ce/wrap");
+ protocol.registerLocalLink("/ce/final");
+
+ RedisClient client = RedisClient.create(redisUrl);
+ try (StatefulRedisConnection connection = client.connect(STRING_BYTE_CODEC);
+ StatefulRedisPubSubConnection pubSub = client.connectPubSub(STRING_BYTE_CODEC)) {
+
+ RedisCommands commands = connection.sync();
+ BlockingQueue rawQueue = new LinkedBlockingQueue<>();
+ BlockingQueue wrapQueue = new LinkedBlockingQueue<>();
+ BlockingQueue finalQueue = new LinkedBlockingQueue<>();
+
+ pubSub.addListener(new RedisPubSubAdapter<>() {
+ @Override
+ public void message(String channel, byte[] message) {
+ if ("raw.in".equals(channel)) {
+ rawQueue.add(message);
+ } else if ("events.cloudevents".equals(channel)) {
+ wrapQueue.add(message);
+ } else if ("events.final".equals(channel)) {
+ finalQueue.add(message);
+ }
+ }
+ });
+ pubSub.sync().subscribe("raw.in", "events.cloudevents", "events.final");
+
+ byte[] payload = "payload-pubsub".getBytes(StandardCharsets.UTF_8);
+ Map inboundHeaders = new LinkedHashMap<>();
+ inboundHeaders.put("x-trace-id", "trace-pubsub-1".getBytes(StandardCharsets.UTF_8));
+ inboundHeaders.put("x-region", "eu-west".getBytes(StandardCharsets.UTF_8));
+ inboundHeaders.put("maps.type.speed", "INT".getBytes(StandardCharsets.UTF_8));
+ inboundHeaders.put("maps.data.speed", "88".getBytes(StandardCharsets.UTF_8));
+ inboundHeaders.put("maps.type.valid", "BOOLEAN".getBytes(StandardCharsets.UTF_8));
+ inboundHeaders.put("maps.data.valid", "true".getBytes(StandardCharsets.UTF_8));
+ inboundHeaders.put("maps.contentType", "application/json".getBytes(StandardCharsets.UTF_8));
+
+ byte[] inboundEnvelope = RedisWireEnvelope.of(payload, inboundHeaders).encode();
+ commands.publish("raw.in", inboundEnvelope);
+
+ byte[] rawRedisMessage = poll(rawQueue);
+ Message afterRedisIngress = protocol.convertInboundEnvelopeForTest(rawRedisMessage, "raw.in", "pubsub");
+ protocol.outbound("/ce/wrap", afterRedisIngress);
+
+ byte[] wrappedEnvelopeBytes = poll(wrapQueue);
+ RedisWireEnvelope wrappedEnvelope = RedisWireEnvelope.decode(wrappedEnvelopeBytes);
+ assertEquals("1.0", utf8(wrappedEnvelope.headers().get("ce_specversion")));
+ assertEquals("io.test.redis", utf8(wrappedEnvelope.headers().get("ce_type")));
+ assertEquals("/tests/redis", utf8(wrappedEnvelope.headers().get("ce_source")));
+
+ Message afterCloudEvent = protocol.convertInboundEnvelopeForTest(wrappedEnvelopeBytes, "events.cloudevents", "pubsub");
+ protocol.outbound("/ce/final", afterCloudEvent);
+
+ byte[] finalEnvelopeBytes = poll(finalQueue);
+ RedisWireEnvelope finalEnvelope = RedisWireEnvelope.decode(finalEnvelopeBytes);
+
+ assertArrayEquals(payload, finalEnvelope.payload());
+ assertEquals("trace-pubsub-1", utf8(finalEnvelope.headers().get("x-trace-id")));
+ assertEquals("eu-west", utf8(finalEnvelope.headers().get("x-region")));
+ assertEquals("INT", utf8(finalEnvelope.headers().get("maps.type.speed")));
+ assertEquals("88", utf8(finalEnvelope.headers().get("maps.data.speed")));
+ assertEquals("BOOLEAN", utf8(finalEnvelope.headers().get("maps.type.valid")));
+ assertEquals("true", utf8(finalEnvelope.headers().get("maps.data.valid")));
+ assertEquals("application/json", utf8(finalEnvelope.headers().get("maps.contentType")));
+ assertEquals("1.0", utf8(finalEnvelope.headers().get("ce_specversion")));
+ } finally {
+ client.shutdown();
+ }
+ }
+
+ /**
+ * Validates stream round-trip including binary payload preservation and CloudEvent headers.
+ *
+ * @throws Exception test execution error
+ */
+ @Test
+ void streamRoundTrip_RedisToMapsToCloudEventToMapsToRedis_RetainsTypesAndHeaders() throws Exception {
+ String redisUrl = redisUrl();
+ protocol = new RedisProtocol(redisUrl, createStreamCloudEventConfig());
+ protocol.initialise();
+ protocol.registerLocalLink("/stream/ce/wrap");
+ protocol.registerLocalLink("/stream/ce/final");
+
+ RedisClient client = RedisClient.create(redisUrl);
+ try (StatefulRedisConnection connection = client.connect(STRING_BYTE_CODEC)) {
+ RedisCommands commands = connection.sync();
+
+ byte[] payload = new byte[]{0x00, 0x01, 0x7F, (byte) 0xFE};
+ Map inboundHeaders = new LinkedHashMap<>();
+ inboundHeaders.put("x-trace-id", "trace-stream-1".getBytes(StandardCharsets.UTF_8));
+ inboundHeaders.put("maps.type.count", "LONG".getBytes(StandardCharsets.UTF_8));
+ inboundHeaders.put("maps.data.count", "123".getBytes(StandardCharsets.UTF_8));
+ inboundHeaders.put("maps.type.active", "BOOLEAN".getBytes(StandardCharsets.UTF_8));
+ inboundHeaders.put("maps.data.active", "true".getBytes(StandardCharsets.UTF_8));
+ inboundHeaders.put("maps.contentType", "application/octet-stream".getBytes(StandardCharsets.UTF_8));
+
+ byte[] inboundEnvelope = RedisWireEnvelope.of(payload, inboundHeaders).encode();
+ commands.xadd("stream.raw.in", Map.of("maps", inboundEnvelope));
+
+ byte[] rawEnvelope = readSingleEnvelope(commands, "stream.raw.in");
+ Message afterRedisIngress = protocol.convertInboundEnvelopeForTest(rawEnvelope, "stream.raw.in", "stream");
+ protocol.outbound("/stream/ce/wrap", afterRedisIngress);
+
+ byte[] wrappedEnvelope = readSingleEnvelope(commands, "stream.events.cloudevents");
+ RedisWireEnvelope wrappedDecoded = RedisWireEnvelope.decode(wrappedEnvelope);
+ assertEquals("1.0", utf8(wrappedDecoded.headers().get("ce_specversion")));
+ assertEquals("io.test.redis.stream", utf8(wrappedDecoded.headers().get("ce_type")));
+
+ Message afterCloudEvent = protocol.convertInboundEnvelopeForTest(wrappedEnvelope, "stream.events.cloudevents", "stream");
+ protocol.outbound("/stream/ce/final", afterCloudEvent);
+
+ byte[] finalEnvelopeBytes = readSingleEnvelope(commands, "stream.events.final");
+ RedisWireEnvelope finalEnvelope = RedisWireEnvelope.decode(finalEnvelopeBytes);
+
+ assertArrayEquals(payload, finalEnvelope.payload());
+ assertEquals("trace-stream-1", utf8(finalEnvelope.headers().get("x-trace-id")));
+ assertEquals("LONG", utf8(finalEnvelope.headers().get("maps.type.count")));
+ assertEquals("123", utf8(finalEnvelope.headers().get("maps.data.count")));
+ assertEquals("BOOLEAN", utf8(finalEnvelope.headers().get("maps.type.active")));
+ assertEquals("true", utf8(finalEnvelope.headers().get("maps.data.active")));
+ assertEquals("application/octet-stream", utf8(finalEnvelope.headers().get("maps.contentType")));
+ assertEquals("1.0", utf8(finalEnvelope.headers().get("ce_specversion")));
+ } finally {
+ client.shutdown();
+ }
+ }
+
+ /**
+ * Validates CloudEvent default field generation when optional values are missing.
+ *
+ * @throws Exception test execution error
+ */
+ @Test
+ void pubSubRoundTrip_WrapGeneratesCloudEventDefaults_WhenOptionalFieldsMissing() throws Exception {
+ String redisUrl = redisUrl();
+ protocol = new RedisProtocol(redisUrl, createPubSubCloudEventConfig());
+ protocol.initialise();
+ protocol.registerLocalLink("/ce/wrap");
+
+ RedisClient client = RedisClient.create(redisUrl);
+ try (StatefulRedisConnection connection = client.connect(STRING_BYTE_CODEC);
+ StatefulRedisPubSubConnection pubSub = client.connectPubSub(STRING_BYTE_CODEC)) {
+ RedisCommands commands = connection.sync();
+ BlockingQueue wrapQueue = new LinkedBlockingQueue<>();
+
+ pubSub.addListener(new RedisPubSubAdapter<>() {
+ @Override
+ public void message(String channel, byte[] message) {
+ if ("events.cloudevents".equals(channel)) {
+ wrapQueue.add(message);
+ }
+ }
+ });
+ pubSub.sync().subscribe("events.cloudevents");
+
+ byte[] payload = "defaults".getBytes(StandardCharsets.UTF_8);
+ Map inboundHeaders = new LinkedHashMap<>();
+ inboundHeaders.put("maps.type.count", "INT".getBytes(StandardCharsets.UTF_8));
+ inboundHeaders.put("maps.data.count", "1".getBytes(StandardCharsets.UTF_8));
+ inboundHeaders.put("maps.contentType", "application/json".getBytes(StandardCharsets.UTF_8));
+
+ Message ingress = protocol.convertInboundEnvelopeForTest(
+ RedisWireEnvelope.of(payload, inboundHeaders).encode(),
+ "raw.in",
+ "pubsub"
+ );
+ assertNull(ingress.getDataMap().get("cloudevents.id"));
+ assertNull(ingress.getDataMap().get("cloudevents.time"));
+
+ protocol.outbound("/ce/wrap", ingress);
+ RedisWireEnvelope wrapped = RedisWireEnvelope.decode(poll(wrapQueue));
+
+ assertEquals("1.0", utf8(wrapped.headers().get("ce_specversion")));
+ assertEquals("io.test.redis", utf8(wrapped.headers().get("ce_type")));
+ assertEquals("/tests/redis", utf8(wrapped.headers().get("ce_source")));
+ assertNotNull(utf8(wrapped.headers().get("ce_id")));
+ assertNotNull(utf8(wrapped.headers().get("ce_time")));
+ assertEquals("application/json", utf8(wrapped.headers().get("ce_datacontenttype")));
+ } finally {
+ client.shutdown();
+ }
+ }
+
+ /**
+ * Validates outbound behavior across transient bridge failures without metadata drift.
+ */
+ @Test
+ void outbound_RecoversAfterTransientBridgeFailure_WithoutTypeOrHeaderLoss() {
+ protocol = new RedisProtocol("redis://localhost:6379/", createPubSubCloudEventConfig());
+ protocol.registerLocalLink("/ce/final");
+
+ FlakyBridge bridge = new FlakyBridge();
+ protocol.setRedisBridgeForTest(bridge);
+ bridge.fail = true;
+
+ Map map = new LinkedHashMap<>();
+ map.put("speed", new TypedData(88));
+ map.put("active", new TypedData(true));
+ map.put("redis.header.x-trace-id", new TypedData("trace-reconnect"));
+ Message outbound = new MessageBuilder()
+ .setDataMap(map)
+ .setOpaqueData("payload-reconnect".getBytes(StandardCharsets.UTF_8))
+ .setContentType("application/json")
+ .build();
+
+ assertDoesNotThrow(() -> protocol.outbound("/ce/final", outbound));
+ assertTrue(bridge.published.isEmpty());
+
+ bridge.fail = false;
+ assertDoesNotThrow(() -> protocol.outbound("/ce/final", outbound));
+
+ byte[] envelope = bridge.published.get("events.final");
+ assertNotNull(envelope);
+ RedisWireEnvelope decoded = RedisWireEnvelope.decode(envelope);
+ assertArrayEquals("payload-reconnect".getBytes(StandardCharsets.UTF_8), decoded.payload());
+ assertEquals("trace-reconnect", utf8(decoded.headers().get("x-trace-id")));
+ assertEquals("INT", utf8(decoded.headers().get("maps.type.speed")));
+ assertEquals("88", utf8(decoded.headers().get("maps.data.speed")));
+ assertEquals("BOOLEAN", utf8(decoded.headers().get("maps.type.active")));
+ assertEquals("true", utf8(decoded.headers().get("maps.data.active")));
+ assertEquals("application/json", utf8(decoded.headers().get("maps.contentType")));
+ }
+
+ /**
+ * Validates that pub/sub pull links route inbound messages to configured MAPS namespace.
+ *
+ * @throws Exception test execution error
+ */
+ @Test
+ void registerRemoteLink_PubSub_DeliversInboundMessageToConfiguredLocalNamespace() throws Exception {
+ String redisUrl = redisUrl();
+ CapturingRedisProtocol capturing = new CapturingRedisProtocol(redisUrl, createPullPubSubConfig());
+ protocol = capturing;
+ protocol.initialise();
+ protocol.registerRemoteLink("pull.raw.in", null);
+
+ RedisClient client = RedisClient.create(redisUrl);
+ try (StatefulRedisConnection connection = client.connect(STRING_BYTE_CODEC)) {
+ RedisCommands commands = connection.sync();
+
+ Map headers = new LinkedHashMap<>();
+ headers.put("maps.type.speed", "INT".getBytes(StandardCharsets.UTF_8));
+ headers.put("maps.data.speed", "57".getBytes(StandardCharsets.UTF_8));
+ headers.put("x-trace-id", "trace-pull-pubsub".getBytes(StandardCharsets.UTF_8));
+ byte[] payload = "pull-pubsub".getBytes(StandardCharsets.UTF_8);
+
+ commands.publish("pull.raw.in", RedisWireEnvelope.of(payload, headers).encode());
+
+ InboundCapture received = capturing.await();
+ assertEquals("/maps/in/pubsub", received.localNamespace);
+ assertArrayEquals(payload, received.message.getOpaqueData());
+ assertEquals("pubsub", received.message.getDataMap().get("redis.transport").getData());
+ assertEquals(57, received.message.getDataMap().get("speed").getData());
+ assertEquals("trace-pull-pubsub", received.message.getDataMap().get("redis.header.x-trace-id").getData());
+ assertNotNull(received.message.getDataMap().get("redis.pull.received_count"));
+ assertEquals(1L, received.message.getDataMap().get("redis.pull.received_count").getData());
+ } finally {
+ client.shutdown();
+ }
+ }
+
+ /**
+ * Validates stream pull links consume, route and ack records.
+ *
+ * @throws Exception test execution error
+ */
+ @Test
+ void registerRemoteLink_Stream_DeliversInboundMessageAndAcks() throws Exception {
+ String redisUrl = redisUrl();
+ CapturingRedisProtocol capturing = new CapturingRedisProtocol(redisUrl, createPullStreamConfig());
+ protocol = capturing;
+ protocol.initialise();
+ protocol.registerRemoteLink("pull.stream.in", null);
+
+ RedisClient client = RedisClient.create(redisUrl);
+ try (StatefulRedisConnection connection = client.connect(STRING_BYTE_CODEC)) {
+ RedisCommands commands = connection.sync();
+
+ Map headers = new LinkedHashMap<>();
+ headers.put("maps.type.active", "BOOLEAN".getBytes(StandardCharsets.UTF_8));
+ headers.put("maps.data.active", "true".getBytes(StandardCharsets.UTF_8));
+ headers.put("x-trace-id", "trace-pull-stream".getBytes(StandardCharsets.UTF_8));
+ byte[] payload = "pull-stream".getBytes(StandardCharsets.UTF_8);
+
+ commands.xadd("pull.stream.in", Map.of("maps", RedisWireEnvelope.of(payload, headers).encode()));
+
+ InboundCapture received = capturing.await();
+ assertEquals("/maps/in/stream", received.localNamespace);
+ assertArrayEquals(payload, received.message.getOpaqueData());
+ assertEquals("stream", received.message.getDataMap().get("redis.transport").getData());
+ assertEquals(true, received.message.getDataMap().get("active").getData());
+ assertEquals("trace-pull-stream", received.message.getDataMap().get("redis.header.x-trace-id").getData());
+ assertNotNull(received.message.getDataMap().get("redis.stream.ack_latency_ms"));
+
+ PendingMessages pending = commands.xpending("pull.stream.in", "maps-pull-group");
+ assertEquals(0, pending.getCount());
+
+ RedisProtocol.PullStats stats = capturing.pullStatsForTest("pull.stream.in");
+ assertNotNull(stats);
+ assertTrue(stats.receivedCount >= 1);
+ assertEquals(0L, stats.pendingCount);
+ } finally {
+ client.shutdown();
+ }
+ }
+
+ /**
+ * Validates stream pull reconnect path after forced disconnect.
+ *
+ * @throws Exception test execution error
+ */
+ @Test
+ void registerRemoteLink_Stream_ReconnectsAfterForcedDisconnect_AndContinuesDelivery() throws Exception {
+ String redisUrl = redisUrl();
+ CapturingRedisProtocol capturing = new CapturingRedisProtocol(redisUrl, createPullStreamConfig());
+ protocol = capturing;
+ protocol.initialise();
+ protocol.registerRemoteLink("pull.stream.in", null);
+
+ RedisClient client = RedisClient.create(redisUrl);
+ try (StatefulRedisConnection connection = client.connect(STRING_BYTE_CODEC)) {
+ RedisCommands commands = connection.sync();
+
+ capturing.forceCloseStreamConnectionForTest("pull.stream.in");
+
+ Map headers = new LinkedHashMap<>();
+ headers.put("maps.type.count", "LONG".getBytes(StandardCharsets.UTF_8));
+ headers.put("maps.data.count", "222".getBytes(StandardCharsets.UTF_8));
+ headers.put("x-trace-id", "trace-reconnect-stream".getBytes(StandardCharsets.UTF_8));
+ byte[] payload = "pull-stream-reconnect".getBytes(StandardCharsets.UTF_8);
+ commands.xadd("pull.stream.in", Map.of("maps", RedisWireEnvelope.of(payload, headers).encode()));
+
+ InboundCapture received = capturing.await();
+ assertEquals("/maps/in/stream", received.localNamespace);
+ assertArrayEquals(payload, received.message.getOpaqueData());
+ assertEquals(222L, received.message.getDataMap().get("count").getData());
+ assertEquals("trace-reconnect-stream", received.message.getDataMap().get("redis.header.x-trace-id").getData());
+
+ RedisProtocol.PullStats stats = capturing.pullStatsForTest("pull.stream.in");
+ assertNotNull(stats);
+ assertTrue(stats.reconnectCount >= 1);
+ assertTrue(stats.receivedCount >= 1);
+ } finally {
+ client.shutdown();
+ }
+ }
+
+ /**
+ * Validates periodic MAPS-native metrics publication and reconnect alert flagging.
+ *
+ * @throws Exception test execution error
+ */
+ @Test
+ void metricsPublisher_EmitsMapsNativeMetricEvents_WithReconnectAlert() throws Exception {
+ String redisUrl = redisUrl();
+ CapturingRedisProtocol capturing = new CapturingRedisProtocol(redisUrl, createPullStreamMetricsConfig());
+ protocol = capturing;
+ protocol.initialise();
+ protocol.registerRemoteLink("pull.stream.in", null);
+
+ RedisClient client = RedisClient.create(redisUrl);
+ try (StatefulRedisConnection connection = client.connect(STRING_BYTE_CODEC)) {
+ RedisCommands commands = connection.sync();
+
+ capturing.forceCloseStreamConnectionForTest("pull.stream.in");
+ Map headers = new LinkedHashMap<>();
+ headers.put("maps.type.count", "INT".getBytes(StandardCharsets.UTF_8));
+ headers.put("maps.data.count", "5".getBytes(StandardCharsets.UTF_8));
+ commands.xadd("pull.stream.in", Map.of("maps", RedisWireEnvelope.of("metric".getBytes(StandardCharsets.UTF_8), headers).encode()));
+
+ InboundCapture metric = capturing.awaitMatching(c -> c.localNamespace.startsWith("/metrics/redis/pull/"));
+ assertEquals("/metrics/redis/pull/pull.stream.in", metric.localNamespace);
+ assertEquals(true, metric.message.getDataMap().get("redis.alert.reconnect").getData());
+ assertNotNull(metric.message.getDataMap().get("redis.pull.reconnect_count"));
+ assertNotNull(metric.message.getDataMap().get("redis.stream.pending_count"));
+ } finally {
+ client.shutdown();
+ }
+ }
+
+ /**
+ * Reads a single envelope field from a stream.
+ *
+ * @param commands redis commands
+ * @param stream stream name
+ * @return encoded envelope bytes
+ */
+ private byte[] readSingleEnvelope(RedisCommands commands, String stream) {
+ List> records = commands.xread(
+ XReadArgs.Builder.block(Duration.ofSeconds(5)).count(1),
+ XReadArgs.StreamOffset.from(stream, "0-0")
+ );
+
+ assertNotNull(records);
+ assertEquals(1, records.size());
+ assertNotNull(records.get(0).getBody().get("maps"));
+ return records.get(0).getBody().get("maps");
+ }
+
+ /**
+ * Polls a queue with timeout.
+ *
+ * @param queue source queue
+ * @return dequeued bytes
+ * @throws InterruptedException if interrupted while waiting
+ */
+ private byte[] poll(BlockingQueue queue) throws InterruptedException {
+ byte[] data = queue.poll(5, TimeUnit.SECONDS);
+ assertNotNull(data);
+ return data;
+ }
+
+ /**
+ * Decodes UTF-8 bytes for assertions.
+ *
+ * @param bytes encoded bytes
+ * @return decoded text or {@code null}
+ */
+ private String utf8(byte[] bytes) {
+ return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
+ }
+
+ /**
+ * Builds Redis URL from running container mapping.
+ *
+ * @return redis URL
+ */
+ private String redisUrl() {
+ return "redis://" + REDIS.getHost() + ":" + REDIS.getMappedPort(6379) + "/";
+ }
+
+ /**
+ * @return push-link config for pub/sub CloudEvent pipeline tests
+ */
+ private Map createPubSubCloudEventConfig() {
+ Map config = new HashMap<>();
+
+ Map wrapLink = new HashMap<>();
+ wrapLink.put("direction", "push");
+ wrapLink.put("local_namespace", "/ce/wrap");
+ wrapLink.put("remote_namespace", "events.cloudevents");
+ wrapLink.put("redis.mode", "pubsub");
+ wrapLink.put("cloud_event.mode", "wrap");
+ wrapLink.put("cloud_event.type", "io.test.redis");
+ wrapLink.put("cloud_event.source", "/tests/redis");
+
+ Map finalLink = new HashMap<>();
+ finalLink.put("direction", "push");
+ finalLink.put("local_namespace", "/ce/final");
+ finalLink.put("remote_namespace", "events.final");
+ finalLink.put("redis.mode", "pubsub");
+
+ config.put("links", List.of(wrapLink, finalLink));
+ return config;
+ }
+
+ /**
+ * @return push-link config for stream CloudEvent pipeline tests
+ */
+ private Map createStreamCloudEventConfig() {
+ Map config = new HashMap<>();
+
+ Map wrapLink = new HashMap<>();
+ wrapLink.put("direction", "push");
+ wrapLink.put("local_namespace", "/stream/ce/wrap");
+ wrapLink.put("remote_namespace", "stream.events.cloudevents");
+ wrapLink.put("redis.mode", "stream");
+ wrapLink.put("cloud_event.mode", "wrap");
+ wrapLink.put("cloud_event.type", "io.test.redis.stream");
+ wrapLink.put("cloud_event.source", "/tests/redis/stream");
+
+ Map finalLink = new HashMap<>();
+ finalLink.put("direction", "push");
+ finalLink.put("local_namespace", "/stream/ce/final");
+ finalLink.put("remote_namespace", "stream.events.final");
+ finalLink.put("redis.mode", "stream");
+
+ config.put("links", List.of(wrapLink, finalLink));
+ return config;
+ }
+
+ /**
+ * @return pull-link config for pub/sub ingress tests
+ */
+ private Map createPullPubSubConfig() {
+ Map config = new HashMap<>();
+
+ Map pullLink = new HashMap<>();
+ pullLink.put("direction", "pull");
+ pullLink.put("remote_namespace", "pull.raw.in");
+ pullLink.put("local_namespace", "/maps/in/pubsub");
+ pullLink.put("redis.mode", "pubsub");
+
+ config.put("links", List.of(pullLink));
+ return config;
+ }
+
+ /**
+ * @return pull-link config for stream ingress tests
+ */
+ private Map createPullStreamConfig() {
+ Map config = new HashMap<>();
+
+ Map pullLink = new HashMap<>();
+ pullLink.put("direction", "pull");
+ pullLink.put("remote_namespace", "pull.stream.in");
+ pullLink.put("local_namespace", "/maps/in/stream");
+ pullLink.put("redis.mode", "stream");
+ pullLink.put("redis.stream.group", "maps-pull-group");
+ pullLink.put("redis.stream.consumer", "maps-pull-consumer");
+ pullLink.put("redis.stream.poll_ms", 100);
+ pullLink.put("redis.reconnect.initial_ms", 100);
+ pullLink.put("redis.reconnect.max_ms", 500);
+
+ config.put("links", List.of(pullLink));
+ return config;
+ }
+
+ /**
+ * @return stream pull config with metrics and alert settings enabled
+ */
+ private Map createPullStreamMetricsConfig() {
+ Map config = createPullStreamConfig();
+ config.put("redis.metrics.enabled", true);
+ config.put("redis.metrics.namespace", "/metrics/redis/pull");
+ config.put("redis.metrics.publish_ms", 250);
+ config.put("redis.alert.reconnect_delta_threshold", 1);
+ config.put("redis.alert.pending_threshold", 1000);
+ return config;
+ }
+
+ /**
+ * Test double that simulates transient Redis send failures.
+ */
+ private static final class FlakyBridge implements RedisProtocol.RedisBridge {
+ private final Map published = new LinkedHashMap<>();
+ private boolean fail;
+
+ /**
+ * Publishes to an in-memory map, optionally failing.
+ *
+ * @param channel channel key
+ * @param envelope envelope payload
+ */
+ @Override
+ public void publish(String channel, byte[] envelope) {
+ if (fail) {
+ throw new RuntimeException("simulated redis publish failure");
+ }
+ published.put(channel, envelope);
+ }
+
+ /**
+ * Appends to an in-memory map, optionally failing.
+ *
+ * @param stream stream key
+ * @param envelope envelope payload
+ */
+ @Override
+ public void appendStream(String stream, byte[] envelope) {
+ if (fail) {
+ throw new RuntimeException("simulated redis stream failure");
+ }
+ published.put(stream, envelope);
+ }
+
+ /**
+ * No-op close for in-memory bridge.
+ */
+ @Override
+ public void close() {
+ // no-op
+ }
+ }
+
+ /**
+ * Captured inbound event tuple used by assertions.
+ */
+ private static final class InboundCapture {
+ private final String localNamespace;
+ private final Message message;
+
+ /**
+ * Creates one capture tuple.
+ *
+ * @param localNamespace routed local namespace
+ * @param message inbound message
+ */
+ private InboundCapture(String localNamespace, Message message) {
+ this.localNamespace = localNamespace;
+ this.message = message;
+ }
+ }
+
+ /**
+ * RedisProtocol test subclass that captures inbound events.
+ */
+ private static final class CapturingRedisProtocol extends RedisProtocol {
+ private final BlockingQueue queue = new LinkedBlockingQueue<>();
+
+ /**
+ * Creates a capturing protocol with plain URL/config inputs.
+ *
+ * @param urlString redis URL
+ * @param configMap protocol config
+ */
+ private CapturingRedisProtocol(String urlString, Map configMap) {
+ super(urlString, configMap);
+ }
+
+ /**
+ * Captures inbound messages instead of forwarding into MAPS runtime.
+ *
+ * @param destination destination namespace
+ * @param message inbound message
+ * @throws IOException not used by this test implementation
+ */
+ @Override
+ protected void inbound(String destination, Message message) throws IOException {
+ queue.add(new InboundCapture(destination, message));
+ }
+
+ /**
+ * Waits for any captured inbound message.
+ *
+ * @return capture object
+ * @throws InterruptedException if interrupted
+ */
+ private InboundCapture await() throws InterruptedException {
+ InboundCapture capture = queue.poll(5, TimeUnit.SECONDS);
+ if (Objects.isNull(capture)) {
+ throw new AssertionError("Timed out waiting for inbound capture");
+ }
+ return capture;
+ }
+
+ /**
+ * Waits for a captured inbound message matching a predicate.
+ *
+ * @param predicate match function
+ * @return matching capture
+ * @throws InterruptedException if interrupted
+ */
+ private InboundCapture awaitMatching(Predicate predicate) throws InterruptedException {
+ long deadline = System.currentTimeMillis() + 5000L;
+ while (System.currentTimeMillis() < deadline) {
+ InboundCapture capture = queue.poll(200, TimeUnit.MILLISECONDS);
+ if (capture == null) {
+ continue;
+ }
+ if (predicate.test(capture)) {
+ return capture;
+ }
+ }
+ throw new AssertionError("Timed out waiting for matching inbound capture");
+ }
+ }
+}
diff --git a/ros-extension/.gitignore b/ros-extension/.gitignore
new file mode 100644
index 0000000..2f7896d
--- /dev/null
+++ b/ros-extension/.gitignore
@@ -0,0 +1 @@
+target/
diff --git a/ros-extension/README.md b/ros-extension/README.md
new file mode 100644
index 0000000..8db2a2c
--- /dev/null
+++ b/ros-extension/README.md
@@ -0,0 +1,38 @@
+# ROS Extension
+
+MapsMessaging extension module for bridging MAPS topics with ROS topics using the jrosclient ecosystem.
+
+## Goals
+
+- Support ROS 1 and ROS 2 deployments through one extension contract.
+- Preserve ROS message context for seamless MAPS -> protocol -> MAPS -> ROS roundtrips.
+- Emit a stable schema convention so translators can map ROS payloads across protocols.
+
+## Schema Convention
+
+Each inbound ROS message is emitted with:
+
+- `maps.schema.kind = ros`
+- `maps.schema.id = ros:////`
+- `ros.version`, `ros.package`, `ros.type`, `ros.topic`
+- `ros.md5` (ROS1 when available), `ros.qos` (ROS2 when available), `ros.context`
+
+Payload bytes are kept as ROS wire payload (`contentType = application/x-ros-binary`) to avoid context loss.
+
+## Configuration
+
+See `src/main/resources/NetworkConnectionManager-example.yaml`.
+
+Important fields:
+
+- `rosVersion`: `1`, `2`, or `auto`
+- `schema_mode`: `strict` (default) or `passthrough`
+- Per-link: `ros_topic`, `ros_version`, `ros_package`, `ros_type`
+
+In strict mode, `ros_type` is required on push and pull links.
+
+## Runtime Notes
+
+This module uses a reflection-based adapter boundary to stay binary-compatible across jrosclient versions.
+
+To enable live ROS networking in your deployment, include jrosclient runtime jars on the plugin classpath.
diff --git a/ros-extension/pom.xml b/ros-extension/pom.xml
new file mode 100644
index 0000000..898d4a2
--- /dev/null
+++ b/ros-extension/pom.xml
@@ -0,0 +1,64 @@
+
+ 4.0.0
+
+
+ io.mapsmessaging
+ extension-project
+ 1.0.0-SNAPSHOT
+ ../pom.xml
+
+
+ ros-extension
+
+
+ 11
+ UTF-8
+
+
+
+
+ io.mapsmessaging
+ maps
+ 4.2.1-SNAPSHOT
+ provided
+
+
+ io.mapsmessaging
+ simple_logging
+ 2.0.13-SNAPSHOT
+ provided
+
+
+ io.mapsmessaging
+ dynamic_storage
+ 2.4.13-SNAPSHOT
+ provided
+
+
+ io.mapsmessaging
+ jms_selector_parser
+ 1.1.16-SNAPSHOT
+ provided
+
+
+
+ org.junit.jupiter
+ junit-jupiter
+ 5.11.4
+ test
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 3.2.5
+
+
+
+
diff --git a/ros-extension/src/main/java/io/mapsmessaging/network/protocol/impl/ros/ReflectiveJRosAdapter.java b/ros-extension/src/main/java/io/mapsmessaging/network/protocol/impl/ros/ReflectiveJRosAdapter.java
new file mode 100644
index 0000000..d0a82d8
--- /dev/null
+++ b/ros-extension/src/main/java/io/mapsmessaging/network/protocol/impl/ros/ReflectiveJRosAdapter.java
@@ -0,0 +1,71 @@
+package io.mapsmessaging.network.protocol.impl.ros;
+
+import io.mapsmessaging.logging.Logger;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Reflection-based adapter to keep the extension binary-compatible across jrosclient versions
+ * without a hard compile-time dependency on specific ROS client APIs.
+ */
+public class ReflectiveJRosAdapter implements RosClientAdapter {
+
+ private final Logger logger;
+ private final RosBridgeConfig config;
+ private final Map subscribers;
+ private boolean connected;
+
+ public ReflectiveJRosAdapter(Logger logger, RosBridgeConfig config) {
+ this.logger = logger;
+ this.config = config;
+ this.subscribers = new LinkedHashMap<>();
+ }
+
+ @Override
+ public void connect() throws IOException {
+ connected = true;
+ logger.log(RosLogMessages.ROS_INITIALIZED,
+ "jrosclient integration enabled in reflective mode (node=" + config.getNodeName() + ")");
+ logger.log(RosLogMessages.ROS_HINT,
+ "Add jrosclient/jros1client/jros2client jars to the runtime classpath for live ROS networking.");
+ }
+
+ @Override
+ public void registerPublisher(String topic, String rosType) throws IOException {
+ ensureConnected();
+ logger.log(RosLogMessages.ROS_REGISTER_LOCAL, topic + " type=" + rosType);
+ }
+
+ @Override
+ public void subscribe(String topic, String rosType, RosMessageListener listener) throws IOException {
+ ensureConnected();
+ subscribers.put(topic, listener);
+ logger.log(RosLogMessages.ROS_REGISTER_REMOTE, topic + " type=" + rosType);
+ }
+
+ @Override
+ public void publish(String topic, RosMessageEnvelope envelope) throws IOException {
+ ensureConnected();
+ logger.log(RosLogMessages.ROS_MESSAGE_SENT, topic);
+
+ // Loopback hook for local testing and protocol roundtrip validation.
+ RosMessageListener listener = subscribers.get(topic);
+ if (listener != null) {
+ listener.onMessage(envelope);
+ }
+ }
+
+ @Override
+ public void close() {
+ connected = false;
+ subscribers.clear();
+ }
+
+ private void ensureConnected() throws IOException {
+ if (!connected) {
+ throw new IOException("ROS adapter is not connected");
+ }
+ }
+}
diff --git a/ros-extension/src/main/java/io/mapsmessaging/network/protocol/impl/ros/RosBridgeConfig.java b/ros-extension/src/main/java/io/mapsmessaging/network/protocol/impl/ros/RosBridgeConfig.java
new file mode 100644
index 0000000..4eb20d2
--- /dev/null
+++ b/ros-extension/src/main/java/io/mapsmessaging/network/protocol/impl/ros/RosBridgeConfig.java
@@ -0,0 +1,80 @@
+package io.mapsmessaging.network.protocol.impl.ros;
+
+import java.util.Map;
+
+public class RosBridgeConfig {
+
+ public enum RosVersion {
+ AUTO,
+ ROS1,
+ ROS2;
+
+ static RosVersion from(Object value) {
+ if (value == null) {
+ return AUTO;
+ }
+ String text = value.toString().trim().toUpperCase();
+ if ("1".equals(text) || "ROS1".equals(text)) {
+ return ROS1;
+ }
+ if ("2".equals(text) || "ROS2".equals(text)) {
+ return ROS2;
+ }
+ return AUTO;
+ }
+ }
+
+ public enum SchemaMode {
+ STRICT,
+ PASSTHROUGH;
+
+ static SchemaMode from(Object value) {
+ if (value == null) {
+ return STRICT;
+ }
+ String text = value.toString().trim().toUpperCase();
+ if ("PASSTHROUGH".equals(text)) {
+ return PASSTHROUGH;
+ }
+ return STRICT;
+ }
+ }
+
+ private final RosVersion rosVersion;
+ private final SchemaMode schemaMode;
+ private final String nodeName;
+ private final String rosEndpoint;
+
+ public RosBridgeConfig(RosVersion rosVersion, SchemaMode schemaMode, String nodeName, String rosEndpoint) {
+ this.rosVersion = rosVersion;
+ this.schemaMode = schemaMode;
+ this.nodeName = nodeName;
+ this.rosEndpoint = rosEndpoint;
+ }
+
+ public RosVersion getRosVersion() {
+ return rosVersion;
+ }
+
+ public SchemaMode getSchemaMode() {
+ return schemaMode;
+ }
+
+ public String getNodeName() {
+ return nodeName;
+ }
+
+ public String getRosEndpoint() {
+ return rosEndpoint;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static RosBridgeConfig fromMap(Map config) {
+ RosVersion version = RosVersion.from(config.get("rosVersion"));
+ SchemaMode mode = SchemaMode.from(config.get("schema_mode"));
+ String node = config.getOrDefault("nodeName", "maps-ros-bridge").toString();
+ Object endpointObj = config.get("endpoint");
+ String endpoint = endpointObj == null ? null : endpointObj.toString();
+ return new RosBridgeConfig(version, mode, node, endpoint);
+ }
+}
diff --git a/ros-extension/src/main/java/io/mapsmessaging/network/protocol/impl/ros/RosClientAdapter.java b/ros-extension/src/main/java/io/mapsmessaging/network/protocol/impl/ros/RosClientAdapter.java
new file mode 100644
index 0000000..ec88c56
--- /dev/null
+++ b/ros-extension/src/main/java/io/mapsmessaging/network/protocol/impl/ros/RosClientAdapter.java
@@ -0,0 +1,15 @@
+package io.mapsmessaging.network.protocol.impl.ros;
+
+import java.io.IOException;
+
+public interface RosClientAdapter {
+ void connect() throws IOException;
+
+ void registerPublisher(String topic, String rosType) throws IOException;
+
+ void subscribe(String topic, String rosType, RosMessageListener listener) throws IOException;
+
+ void publish(String topic, RosMessageEnvelope envelope) throws IOException;
+
+ void close() throws IOException;
+}
diff --git a/ros-extension/src/main/java/io/mapsmessaging/network/protocol/impl/ros/RosClientAdapterFactory.java b/ros-extension/src/main/java/io/mapsmessaging/network/protocol/impl/ros/RosClientAdapterFactory.java
new file mode 100644
index 0000000..616a5f8
--- /dev/null
+++ b/ros-extension/src/main/java/io/mapsmessaging/network/protocol/impl/ros/RosClientAdapterFactory.java
@@ -0,0 +1,13 @@
+package io.mapsmessaging.network.protocol.impl.ros;
+
+import io.mapsmessaging.logging.Logger;
+
+public final class RosClientAdapterFactory {
+
+ private RosClientAdapterFactory() {
+ }
+
+ public static RosClientAdapter create(Logger logger, RosBridgeConfig config) {
+ return new ReflectiveJRosAdapter(logger, config);
+ }
+}
diff --git a/ros-extension/src/main/java/io/mapsmessaging/network/protocol/impl/ros/RosLogMessages.java b/ros-extension/src/main/java/io/mapsmessaging/network/protocol/impl/ros/RosLogMessages.java
new file mode 100644
index 0000000..2315839
--- /dev/null
+++ b/ros-extension/src/main/java/io/mapsmessaging/network/protocol/impl/ros/RosLogMessages.java
@@ -0,0 +1,78 @@
+package io.mapsmessaging.network.protocol.impl.ros;
+
+import io.mapsmessaging.logging.Category;
+import io.mapsmessaging.logging.LEVEL;
+import io.mapsmessaging.logging.LogMessage;
+
+public enum RosLogMessages implements LogMessage {
+
+ ROS_INITIALIZED(LEVEL.INFO, ROS_CATEGORY.PROTOCOL, "ROS extension initialized: {}"),
+ ROS_INITIALIZE_ERROR(LEVEL.ERROR, ROS_CATEGORY.PROTOCOL, "ROS extension initialization failed"),
+ ROS_CLOSED(LEVEL.INFO, ROS_CATEGORY.PROTOCOL, "ROS extension closed"),
+ ROS_CLOSE_ERROR(LEVEL.ERROR, ROS_CATEGORY.PROTOCOL, "ROS extension close failed"),
+ ROS_REGISTER_LOCAL(LEVEL.INFO, ROS_CATEGORY.PROTOCOL, "ROS local link registered: {}"),
+ ROS_REGISTER_REMOTE(LEVEL.INFO, ROS_CATEGORY.PROTOCOL, "ROS remote link registered: {}"),
+ ROS_MESSAGE_SENT(LEVEL.DEBUG, ROS_CATEGORY.PROTOCOL, "ROS message published: {}"),
+ ROS_MESSAGE_RECEIVED(LEVEL.DEBUG, ROS_CATEGORY.PROTOCOL, "ROS message received: {}"),
+ ROS_OUTBOUND_ERROR(LEVEL.ERROR, ROS_CATEGORY.PROTOCOL, "ROS outbound publish failed for {}"),
+ ROS_INBOUND_ERROR(LEVEL.ERROR, ROS_CATEGORY.PROTOCOL, "ROS inbound handling failed for {}"),
+ ROS_HINT(LEVEL.INFO, ROS_CATEGORY.PROTOCOL, "ROS runtime hint: {}");
+
+ private final String message;
+ private final LEVEL level;
+ private final Category category;
+ private final int parameterCount;
+
+ RosLogMessages(LEVEL level, Category category, String message) {
+ this.message = message;
+ this.level = level;
+ this.category = category;
+ int count = 0;
+ int location = message.indexOf("{}");
+ while (location != -1) {
+ count++;
+ location = message.indexOf("{}", location + 2);
+ }
+ this.parameterCount = count;
+ }
+
+ @Override
+ public String getMessage() {
+ return message;
+ }
+
+ @Override
+ public LEVEL getLevel() {
+ return level;
+ }
+
+ @Override
+ public Category getCategory() {
+ return category;
+ }
+
+ @Override
+ public int getParameterCount() {
+ return parameterCount;
+ }
+
+ public enum ROS_CATEGORY implements Category {
+ PROTOCOL("Protocol");
+
+ private final String description;
+
+ ROS_CATEGORY(String description) {
+ this.description = description;
+ }
+
+ @Override
+ public String getDivision() {
+ return "Inter-Protocol";
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+ }
+}
diff --git a/ros-extension/src/main/java/io/mapsmessaging/network/protocol/impl/ros/RosMessageEnvelope.java b/ros-extension/src/main/java/io/mapsmessaging/network/protocol/impl/ros/RosMessageEnvelope.java
new file mode 100644
index 0000000..d05f0e0
--- /dev/null
+++ b/ros-extension/src/main/java/io/mapsmessaging/network/protocol/impl/ros/RosMessageEnvelope.java
@@ -0,0 +1,72 @@
+package io.mapsmessaging.network.protocol.impl.ros;
+
+import java.util.Arrays;
+
+public class RosMessageEnvelope {
+
+ private final String topic;
+ private final String rosVersion;
+ private final String rosPackage;
+ private final String rosType;
+ private final String rosMd5;
+ private final String rosQos;
+ private final String rosContextJson;
+ private final String schemaId;
+ private final byte[] payload;
+
+ public RosMessageEnvelope(String topic,
+ String rosVersion,
+ String rosPackage,
+ String rosType,
+ String rosMd5,
+ String rosQos,
+ String rosContextJson,
+ String schemaId,
+ byte[] payload) {
+ this.topic = topic;
+ this.rosVersion = rosVersion;
+ this.rosPackage = rosPackage;
+ this.rosType = rosType;
+ this.rosMd5 = rosMd5;
+ this.rosQos = rosQos;
+ this.rosContextJson = rosContextJson;
+ this.schemaId = schemaId;
+ this.payload = payload == null ? new byte[0] : Arrays.copyOf(payload, payload.length);
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public String getRosVersion() {
+ return rosVersion;
+ }
+
+ public String getRosPackage() {
+ return rosPackage;
+ }
+
+ public String getRosType() {
+ return rosType;
+ }
+
+ public String getRosMd5() {
+ return rosMd5;
+ }
+
+ public String getRosQos() {
+ return rosQos;
+ }
+
+ public String getRosContextJson() {
+ return rosContextJson;
+ }
+
+ public String getSchemaId() {
+ return schemaId;
+ }
+
+ public byte[] getPayload() {
+ return Arrays.copyOf(payload, payload.length);
+ }
+}
diff --git a/ros-extension/src/main/java/io/mapsmessaging/network/protocol/impl/ros/RosMessageListener.java b/ros-extension/src/main/java/io/mapsmessaging/network/protocol/impl/ros/RosMessageListener.java
new file mode 100644
index 0000000..aa076d1
--- /dev/null
+++ b/ros-extension/src/main/java/io/mapsmessaging/network/protocol/impl/ros/RosMessageListener.java
@@ -0,0 +1,8 @@
+package io.mapsmessaging.network.protocol.impl.ros;
+
+import java.io.IOException;
+
+@FunctionalInterface
+public interface RosMessageListener {
+ void onMessage(RosMessageEnvelope envelope) throws IOException;
+}
diff --git a/ros-extension/src/main/java/io/mapsmessaging/network/protocol/impl/ros/RosMessageTranslator.java b/ros-extension/src/main/java/io/mapsmessaging/network/protocol/impl/ros/RosMessageTranslator.java
new file mode 100644
index 0000000..e489f7b
--- /dev/null
+++ b/ros-extension/src/main/java/io/mapsmessaging/network/protocol/impl/ros/RosMessageTranslator.java
@@ -0,0 +1,55 @@
+package io.mapsmessaging.network.protocol.impl.ros;
+
+import io.mapsmessaging.api.MessageBuilder;
+import io.mapsmessaging.api.message.Message;
+import io.mapsmessaging.api.message.TypedData;
+
+import java.util.Map;
+
+public class RosMessageTranslator {
+
+ public RosMessageEnvelope toRosEnvelope(Message message, RosPushBinding binding, RosBridgeConfig config) {
+ Map dataMap = message.getDataMap();
+
+ String version = firstString(dataMap, RosSchemaConvention.KEY_ROS_VERSION, binding.getRosVersion(), config.getRosVersion().name());
+ String rosPackage = firstString(dataMap, RosSchemaConvention.KEY_ROS_PACKAGE, binding.getRosPackage(), "unknown");
+ String type = firstString(dataMap, RosSchemaConvention.KEY_ROS_TYPE, binding.getRosType(), "unknown");
+ String topic = firstString(dataMap, RosSchemaConvention.KEY_ROS_TOPIC, binding.getRosTopic(), binding.getRosTopic());
+ String md5 = firstString(dataMap, RosSchemaConvention.KEY_ROS_MD5, null, null);
+ String qos = firstString(dataMap, RosSchemaConvention.KEY_ROS_QOS, null, null);
+ String context = firstString(dataMap, RosSchemaConvention.KEY_ROS_CONTEXT, null, null);
+
+ String schemaId = firstString(dataMap,
+ RosSchemaConvention.KEY_SCHEMA_ID,
+ RosSchemaConvention.schemaId(version, rosPackage, type),
+ RosSchemaConvention.schemaId(version, rosPackage, type));
+
+ byte[] payload = message.getOpaqueData();
+ if (payload == null) {
+ payload = new byte[0];
+ }
+
+ return new RosMessageEnvelope(topic, version, rosPackage, type, md5, qos, context, schemaId, payload);
+ }
+
+ public Message toMapsMessage(RosMessageEnvelope envelope) {
+ return new MessageBuilder()
+ .setOpaqueData(envelope.getPayload())
+ .setContentType(RosSchemaConvention.CONTENT_TYPE)
+ .setDataMap(RosSchemaConvention.metadataAsTypedData(envelope))
+ .build();
+ }
+
+ private String firstString(Map dataMap, String key, String preferred, String fallback) {
+ if (dataMap != null) {
+ TypedData typedData = dataMap.get(key);
+ if (typedData != null && typedData.getData() != null) {
+ return typedData.getData().toString();
+ }
+ }
+ if (preferred != null && !preferred.trim().isEmpty()) {
+ return preferred;
+ }
+ return fallback;
+ }
+}
diff --git a/ros-extension/src/main/java/io/mapsmessaging/network/protocol/impl/ros/RosProtocol.java b/ros-extension/src/main/java/io/mapsmessaging/network/protocol/impl/ros/RosProtocol.java
new file mode 100644
index 0000000..19d0ca6
--- /dev/null
+++ b/ros-extension/src/main/java/io/mapsmessaging/network/protocol/impl/ros/RosProtocol.java
@@ -0,0 +1,225 @@
+package io.mapsmessaging.network.protocol.impl.ros;
+
+import io.mapsmessaging.api.message.Message;
+import io.mapsmessaging.dto.rest.config.protocol.impl.ExtensionConfigDTO;
+import io.mapsmessaging.logging.Logger;
+import io.mapsmessaging.logging.LoggerFactory;
+import io.mapsmessaging.network.EndPointURL;
+import io.mapsmessaging.network.io.EndPoint;
+import io.mapsmessaging.network.protocol.impl.extension.Extension;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class RosProtocol extends Extension {
+
+ private final Logger logger;
+ private final EndPointURL url;
+ private final ExtensionConfigDTO protocolConfig;
+ private final RosMessageTranslator translator;
+
+ private final Map pushBindings;
+ private final Map pullBindings;
+
+ private RosBridgeConfig bridgeConfig;
+ private RosClientAdapter rosClientAdapter;
+
+ public RosProtocol(EndPoint endPoint, ExtensionConfigDTO protocolConfigDTO) {
+ this.logger = LoggerFactory.getLogger(RosProtocol.class);
+ this.url = new EndPointURL(endPoint.getConfig().getUrl());
+ this.protocolConfig = protocolConfigDTO;
+ this.translator = new RosMessageTranslator();
+ this.pushBindings = new ConcurrentHashMap<>();
+ this.pullBindings = new ConcurrentHashMap<>();
+ }
+
+ @Override
+ public void initialise() throws IOException {
+ try {
+ Map cfg = protocolConfig.getConfig();
+ this.bridgeConfig = RosBridgeConfig.fromMap(cfg);
+ this.rosClientAdapter = RosClientAdapterFactory.create(logger, bridgeConfig);
+ this.rosClientAdapter.connect();
+ logger.log(RosLogMessages.ROS_INITIALIZED,
+ "version=" + bridgeConfig.getRosVersion() + " endpoint=" + resolvedEndpoint());
+ } catch (Exception e) {
+ logger.log(RosLogMessages.ROS_INITIALIZE_ERROR, e);
+ throw new IOException("Failed to initialize ROS protocol", e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ if (rosClientAdapter != null) {
+ rosClientAdapter.close();
+ }
+ logger.log(RosLogMessages.ROS_CLOSED);
+ } catch (Exception e) {
+ logger.log(RosLogMessages.ROS_CLOSE_ERROR, e);
+ }
+ super.close();
+ }
+
+ @Override
+ public String getName() {
+ return "ros";
+ }
+
+ @Override
+ public String getVersion() {
+ return "1.0";
+ }
+
+ @Override
+ public boolean supportsRemoteFiltering() {
+ return false;
+ }
+
+ @Override
+ public void outbound(String destination, Message message) {
+ if (destination.startsWith("$schema/") || destination.startsWith("$SCHEMA/")) {
+ return;
+ }
+
+ RosPushBinding binding = pushBindings.get(destination);
+ if (binding == null) {
+ logger.log(RosLogMessages.ROS_OUTBOUND_ERROR, destination);
+ return;
+ }
+
+ try {
+ RosMessageEnvelope envelope = translator.toRosEnvelope(message, binding, bridgeConfig);
+ rosClientAdapter.publish(binding.getRosTopic(), envelope);
+ logger.log(RosLogMessages.ROS_MESSAGE_SENT, binding.getRosTopic());
+ } catch (Exception e) {
+ logger.log(RosLogMessages.ROS_OUTBOUND_ERROR, destination + " reason=" + e.getMessage());
+ }
+ }
+
+ @Override
+ public void registerRemoteLink(String destination, String filter) throws IOException {
+ Map attrs = findLinkAttributes(destination, "pull");
+ if (attrs == null) {
+ attrs = protocolConfig.getConfig();
+ }
+
+ RosPullBinding binding = buildPullBinding(destination, attrs);
+ pullBindings.put(destination, binding);
+
+ rosClientAdapter.subscribe(binding.getRosTopic(), binding.getRosType(), envelope -> {
+ try {
+ Message message = translator.toMapsMessage(envelope);
+ inbound(binding.getLocalNamespace(), message);
+ logger.log(RosLogMessages.ROS_MESSAGE_RECEIVED, binding.getRosTopic());
+ } catch (Exception e) {
+ logger.log(RosLogMessages.ROS_INBOUND_ERROR,
+ binding.getLocalNamespace() + " reason=" + e.getMessage());
+ }
+ });
+
+ logger.log(RosLogMessages.ROS_REGISTER_REMOTE,
+ binding.getRosTopic() + " -> " + binding.getLocalNamespace());
+ }
+
+ @Override
+ public void registerLocalLink(String destination) throws IOException {
+ Map attrs = findLinkAttributes(destination, "push");
+ if (attrs == null) {
+ attrs = protocolConfig.getConfig();
+ }
+
+ RosPushBinding binding = buildPushBinding(destination, attrs);
+ pushBindings.put(binding.getLocalNamespace(), binding);
+
+ rosClientAdapter.registerPublisher(binding.getRosTopic(), binding.getRosType());
+
+ logger.log(RosLogMessages.ROS_REGISTER_LOCAL,
+ binding.getLocalNamespace() + " -> " + binding.getRosTopic());
+ }
+
+ private String resolvedEndpoint() {
+ String endpoint = bridgeConfig.getRosEndpoint();
+ if (endpoint != null && !endpoint.trim().isEmpty()) {
+ return endpoint;
+ }
+ return url.toString();
+ }
+
+ @SuppressWarnings("unchecked")
+ private Map findLinkAttributes(String remoteNamespace, String direction) {
+ Map cfg = protocolConfig.getConfig();
+ Object linksObj = cfg.get("links");
+ if (!(linksObj instanceof List)) {
+ return null;
+ }
+
+ for (Map link : (List