diff --git a/src/main/java/io/mapsmessaging/logging/ServerLogMessages.java b/src/main/java/io/mapsmessaging/logging/ServerLogMessages.java
index 4231631b4..3d89bec23 100644
--- a/src/main/java/io/mapsmessaging/logging/ServerLogMessages.java
+++ b/src/main/java/io/mapsmessaging/logging/ServerLogMessages.java
@@ -244,6 +244,17 @@ public enum ServerLogMessages implements LogMessage {
//
+ //
+ TAK_EXTENSION_INITIALISED(LEVEL.INFO, SERVER_CATEGORY.PROTOCOL, "TAK extension initialised for {}"),
+ TAK_EXTENSION_CLOSED(LEVEL.INFO, SERVER_CATEGORY.PROTOCOL, "TAK extension closed for {}"),
+ TAK_EXTENSION_OUTBOUND_FAILED(LEVEL.WARN, SERVER_CATEGORY.PROTOCOL, "TAK outbound failed for destination {}"),
+ TAK_EXTENSION_DECODE_FAILED(LEVEL.WARN, SERVER_CATEGORY.PROTOCOL, "TAK decode failed for transport {}"),
+ TAK_EXTENSION_RECONNECT_ATTEMPT(LEVEL.INFO, SERVER_CATEGORY.NETWORK, "TAK reconnect attempt in {}ms for {}"),
+ TAK_EXTENSION_RECONNECT_SUCCESS(LEVEL.INFO, SERVER_CATEGORY.NETWORK, "TAK reconnect succeeded for {}"),
+ TAK_EXTENSION_RECONNECT_FAILED(LEVEL.WARN, SERVER_CATEGORY.NETWORK, "TAK reconnect failed for {}"),
+ TAK_MULTICAST_IO_FAILED(LEVEL.WARN, SERVER_CATEGORY.NETWORK, "TAK multicast IO failed for {}:{}"),
+ //
+
//
PACKET_SECURITY_VERIFICATION_FAILED(LEVEL.WARN, SERVER_CATEGORY.NETWORK, "Packet integrity verification failed: ip={}, algorithm={}, reason={}, packetLength={}, signatureSize={}"),
PACKET_SECURITY_NOT_INITIALISED(LEVEL.ERROR, SERVER_CATEGORY.NETWORK, "Packet integrity not initialised: algorithm={}"),
diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/extension/Extension.java b/src/main/java/io/mapsmessaging/network/protocol/impl/extension/Extension.java
index c339a94fb..92ad6e903 100644
--- a/src/main/java/io/mapsmessaging/network/protocol/impl/extension/Extension.java
+++ b/src/main/java/io/mapsmessaging/network/protocol/impl/extension/Extension.java
@@ -20,6 +20,7 @@
package io.mapsmessaging.network.protocol.impl.extension;
import io.mapsmessaging.api.message.Message;
+import io.mapsmessaging.network.io.Packet;
import jakarta.validation.constraints.NotNull;
import lombok.Getter;
import lombok.NonNull;
@@ -74,6 +75,10 @@ public String getSessionId() {
public abstract void registerLocalLink(@NonNull @NotNull String destination) throws IOException;
+ public boolean processPacket(@NonNull @NotNull Packet packet) throws IOException {
+ return false;
+ }
+
protected void inbound(@NonNull @NotNull String destinationName, @NonNull @NotNull Message message) throws IOException {
if (extensionProtocol == null) {
diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/extension/ExtensionProtocol.java b/src/main/java/io/mapsmessaging/network/protocol/impl/extension/ExtensionProtocol.java
index 2ae596900..947f2f538 100644
--- a/src/main/java/io/mapsmessaging/network/protocol/impl/extension/ExtensionProtocol.java
+++ b/src/main/java/io/mapsmessaging/network/protocol/impl/extension/ExtensionProtocol.java
@@ -155,7 +155,7 @@ public ProtocolInformationDTO getInformation() {
@Override
public boolean processPacket(@NonNull @NotNull Packet packet) throws IOException {
- return false;
+ return extension.processPacket(packet);
}
@Override
diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakEndPointConnectionFactory.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakEndPointConnectionFactory.java
new file mode 100644
index 000000000..914f8f092
--- /dev/null
+++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakEndPointConnectionFactory.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak;
+
+import io.mapsmessaging.network.EndPointURL;
+import io.mapsmessaging.network.io.EndPoint;
+import io.mapsmessaging.network.io.EndPointConnectedCallback;
+import io.mapsmessaging.network.io.EndPointConnectionFactory;
+import io.mapsmessaging.network.io.EndPointServerStatus;
+import io.mapsmessaging.network.io.impl.SelectorLoadManager;
+import io.mapsmessaging.network.protocol.impl.extension.ExtensionEndPointConnectionFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+public class TakEndPointConnectionFactory implements EndPointConnectionFactory {
+
+ private final ExtensionEndPointConnectionFactory delegate = new ExtensionEndPointConnectionFactory();
+
+ @Override
+ public EndPoint connect(EndPointURL url, SelectorLoadManager selector, EndPointConnectedCallback connectedCallback,
+ EndPointServerStatus endPointServerStatus, List jmxPath) throws IOException {
+ return delegate.connect(url, selector, connectedCallback, endPointServerStatus, jmxPath);
+ }
+
+ @Override
+ public String getName() {
+ return "tak";
+ }
+
+ @Override
+ public String getDescription() {
+ return "TAK extension endpoint factory";
+ }
+}
diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java
new file mode 100644
index 000000000..378c7c498
--- /dev/null
+++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java
@@ -0,0 +1,316 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak;
+
+import io.mapsmessaging.api.message.Message;
+import io.mapsmessaging.dto.rest.config.protocol.impl.ExtensionConfigDTO;
+import io.mapsmessaging.logging.Logger;
+import io.mapsmessaging.logging.LoggerFactory;
+import io.mapsmessaging.logging.ServerLogMessages;
+import io.mapsmessaging.network.EndPointURL;
+import io.mapsmessaging.network.io.EndPoint;
+import io.mapsmessaging.network.io.Packet;
+import io.mapsmessaging.network.protocol.impl.extension.Extension;
+import io.mapsmessaging.network.protocol.impl.extension.ExtensionEndPoint;
+import io.mapsmessaging.network.protocol.impl.tak.codec.CotXmlCodec;
+import io.mapsmessaging.network.protocol.impl.tak.codec.TakPayloadCodec;
+import io.mapsmessaging.network.protocol.impl.tak.codec.TakProtobufCodec;
+import io.mapsmessaging.network.protocol.impl.tak.framing.TakFrameReader;
+import io.mapsmessaging.network.protocol.impl.tak.framing.TakStreamFramer;
+import io.mapsmessaging.network.protocol.impl.tak.transport.TakConnectionManager;
+import io.mapsmessaging.network.protocol.impl.tak.transport.TakMulticastTransport;
+import io.mapsmessaging.network.protocol.impl.tak.transport.TakServerConnection;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class TakExtension extends Extension {
+
+ private final EndPointURL url;
+ private final Logger logger;
+ private final EndPoint endPoint;
+ private final TakExtensionConfig config;
+ private final boolean mapsManagedTransport;
+ private final TakPayloadCodec payloadCodec;
+ private final TakStreamFramer streamFramer;
+ private final TakConnectionManager connectionManager;
+ private final TakMulticastTransport multicastTransport;
+ private final TakFrameReader frameReader;
+ private final Set remoteLinks;
+ private final Set localLinks;
+ private final AtomicBoolean running;
+ private volatile Thread readerThread;
+ private volatile Thread multicastReaderThread;
+
+ public TakExtension(EndPoint endPoint, @Nullable ExtensionConfigDTO extensionConfig) {
+ this.endPoint = endPoint;
+ this.url = new EndPointURL(endPoint.getConfig().getUrl());
+ this.logger = LoggerFactory.getLogger(TakExtension.class);
+ this.config = TakExtensionConfig.from(extensionConfig);
+ this.mapsManagedTransport = config.isUseMapsTransport() && !(endPoint instanceof ExtensionEndPoint);
+ this.payloadCodec = TakExtensionConfig.PAYLOAD_TAK_PROTO_V1.equalsIgnoreCase(config.getPayload())
+ ? TakProtobufCodec.withSchemaFormatter(
+ new CotXmlCodec(),
+ config.getMaxPayloadBytes(),
+ config.getProtobufDescriptorBase64(),
+ config.getProtobufMessageName())
+ : new CotXmlCodec();
+ this.streamFramer = new TakStreamFramer(config.getFramingMode(), config.getMaxPayloadBytes());
+ this.connectionManager = new TakConnectionManager(new TakServerConnection(url, Duration.ofSeconds(30)));
+ this.multicastTransport = config.isMulticastEnabled() ? new TakMulticastTransport(config) : null;
+ this.frameReader = new TakFrameReader(streamFramer, config.getReadBufferBytes());
+ this.remoteLinks = ConcurrentHashMap.newKeySet();
+ this.localLinks = ConcurrentHashMap.newKeySet();
+ this.running = new AtomicBoolean(false);
+ }
+
+ @Override
+ public void initialise() throws IOException {
+ if (!mapsManagedTransport) {
+ connectionManager.connect();
+ }
+ if (multicastTransport != null) {
+ multicastTransport.start();
+ }
+ logger.log(ServerLogMessages.TAK_EXTENSION_INITIALISED, url.toString());
+ running.set(true);
+ if (!mapsManagedTransport) {
+ readerThread = new Thread(this::readerLoop, "tak-reader-" + url.getHost() + "-" + url.getPort());
+ readerThread.setDaemon(true);
+ readerThread.start();
+ }
+ if (multicastTransport != null && config.isMulticastIngressEnabled()) {
+ multicastReaderThread = new Thread(this::multicastReaderLoop, "tak-mcast-reader-" + config.getMulticastGroup() + "-" + config.getMulticastPort());
+ multicastReaderThread.setDaemon(true);
+ multicastReaderThread.start();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ running.set(false);
+ Thread thread = readerThread;
+ if (thread != null) {
+ thread.interrupt();
+ try {
+ thread.join(1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ Thread multicastThread = multicastReaderThread;
+ if (multicastThread != null) {
+ multicastThread.interrupt();
+ try {
+ multicastThread.join(1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ if (multicastTransport != null) {
+ multicastTransport.close();
+ }
+ if (!mapsManagedTransport) {
+ connectionManager.close();
+ }
+ logger.log(ServerLogMessages.TAK_EXTENSION_CLOSED, url.toString());
+ }
+
+ @Override
+ public String getName() {
+ return "TAK";
+ }
+
+ @Override
+ public String getVersion() {
+ return "1.0";
+ }
+
+ @Override
+ public boolean supportsRemoteFiltering() {
+ return false;
+ }
+
+ @Override
+ public void outbound(String destinationName, Message message) {
+ if (!running.get()) {
+ return;
+ }
+ try {
+ byte[] payload = payloadCodec.encode(message);
+ try {
+ byte[] framed = streamFramer.frame(payload);
+ if (mapsManagedTransport) {
+ endPoint.sendPacket(new Packet(ByteBuffer.wrap(framed)));
+ } else {
+ connectionManager.write(framed);
+ }
+ } catch (IOException ignored) {
+ // Stream path is best-effort; multicast egress may still succeed.
+ }
+ if (multicastTransport != null && config.isMulticastEgressEnabled()) {
+ try {
+ multicastTransport.send(payload);
+ } catch (IOException ignored) {
+ logger.log(ServerLogMessages.TAK_MULTICAST_IO_FAILED, config.getMulticastGroup(), config.getMulticastPort());
+ }
+ }
+ } catch (IOException ignored) {
+ logger.log(ServerLogMessages.TAK_EXTENSION_OUTBOUND_FAILED, destinationName);
+ }
+ }
+
+ @Override
+ public void registerRemoteLink(String destination, @Nullable String filter) {
+ remoteLinks.add(destination);
+ }
+
+ @Override
+ public void registerLocalLink(String destination) {
+ localLinks.add(destination);
+ }
+
+ private void readerLoop() {
+ while (running.get()) {
+ try {
+ List frames = frameReader.read(connectionManager.input());
+ for (byte[] frame : frames) {
+ try {
+ Message message = payloadCodec.decode(frame);
+ String destination = resolveInboundDestination(message);
+ inbound(destination, message);
+ } catch (IOException decodeFailure) {
+ logger.log(ServerLogMessages.TAK_EXTENSION_DECODE_FAILED, "stream");
+ }
+ }
+ } catch (IOException ex) {
+ if (!running.get()) {
+ break;
+ }
+ reconnectWithDelay();
+ }
+ }
+ }
+
+ private void reconnectWithDelay() {
+ long delayMs = config.getReconnectDelayMs();
+ while (running.get()) {
+ try {
+ long sleepFor = applyJitter(delayMs, config.getReconnectJitterMs());
+ logger.log(ServerLogMessages.TAK_EXTENSION_RECONNECT_ATTEMPT, sleepFor, url.toString());
+ Thread.sleep(sleepFor);
+ connectionManager.reconnect();
+ logger.log(ServerLogMessages.TAK_EXTENSION_RECONNECT_SUCCESS, url.toString());
+ return;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ } catch (IOException ignored) {
+ logger.log(ServerLogMessages.TAK_EXTENSION_RECONNECT_FAILED, url.toString());
+ delayMs = nextDelay(delayMs);
+ }
+ }
+ }
+
+ @Override
+ public boolean processPacket(@org.jetbrains.annotations.NotNull Packet packet) throws IOException {
+ if (!mapsManagedTransport || !running.get()) {
+ return false;
+ }
+ ByteBuffer raw = packet.getRawBuffer();
+ if (raw == null || raw.remaining() <= 0) {
+ return false;
+ }
+ List frames = frameReader.read(raw);
+ for (byte[] frame : frames) {
+ try {
+ Message message = payloadCodec.decode(frame);
+ String destination = resolveInboundDestination(message);
+ inbound(destination, message);
+ } catch (IOException decodeFailure) {
+ logger.log(ServerLogMessages.TAK_EXTENSION_DECODE_FAILED, "stream");
+ }
+ }
+ packet.position(packet.limit());
+ return true;
+ }
+
+ private long nextDelay(long currentDelayMs) {
+ long multiplied = (long) Math.ceil(currentDelayMs * config.getReconnectBackoffMultiplier());
+ return Math.min(config.getReconnectMaxDelayMs(), Math.max(config.getReconnectDelayMs(), multiplied));
+ }
+
+ private long applyJitter(long delayMs, int jitterMs) {
+ if (jitterMs <= 0) {
+ return delayMs;
+ }
+ int jitter = ThreadLocalRandom.current().nextInt(jitterMs + 1);
+ return delayMs + jitter;
+ }
+
+ private void multicastReaderLoop() {
+ while (running.get()) {
+ try {
+ Optional frame = multicastTransport.read();
+ if (frame.isEmpty()) {
+ continue;
+ }
+ try {
+ Message message = payloadCodec.decode(frame.get());
+ String destination = resolveInboundDestination(message);
+ inbound(destination, message);
+ } catch (IOException decodeFailure) {
+ logger.log(ServerLogMessages.TAK_EXTENSION_DECODE_FAILED, "multicast");
+ }
+ } catch (IOException ex) {
+ if (!running.get()) {
+ break;
+ }
+ logger.log(ServerLogMessages.TAK_MULTICAST_IO_FAILED, config.getMulticastGroup(), config.getMulticastPort());
+ }
+ }
+ }
+
+ private String resolveInboundDestination(Message message) {
+ String eventType = null;
+ if (message.getMeta() != null) {
+ eventType = message.getMeta().get("tak.type");
+ }
+ if (eventType == null || eventType.isBlank()) {
+ eventType = "unknown";
+ }
+ if (!remoteLinks.isEmpty()) {
+ String remote = remoteLinks.iterator().next();
+ if (remote.contains("#")) {
+ return remote.replace("#", eventType);
+ }
+ return remote;
+ }
+ return "tak/cot/" + eventType;
+ }
+}
diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java
new file mode 100644
index 000000000..733adc478
--- /dev/null
+++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java
@@ -0,0 +1,268 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak;
+
+import io.mapsmessaging.dto.rest.config.protocol.impl.ExtensionConfigDTO;
+import io.mapsmessaging.network.protocol.impl.tak.framing.TakStreamFramer;
+
+import java.util.Locale;
+import java.util.Map;
+
+public class TakExtensionConfig {
+
+ public static final String PAYLOAD_COT_XML = "cot_xml";
+ public static final String PAYLOAD_TAK_PROTO_V1 = "tak_proto_v1";
+ private static final int DEFAULT_MAX_PAYLOAD = 1024 * 1024;
+ private static final int DEFAULT_RECONNECT_MS = 2000;
+ private static final int DEFAULT_RECONNECT_MAX_MS = 30000;
+ private static final double DEFAULT_RECONNECT_MULTIPLIER = 2.0d;
+ private static final int DEFAULT_RECONNECT_JITTER_MS = 250;
+ private static final int DEFAULT_READ_BUFFER = 8192;
+ private static final String DEFAULT_MULTICAST_GROUP = "239.2.3.1";
+ private static final int DEFAULT_MULTICAST_PORT = 6969;
+ private static final int DEFAULT_MULTICAST_TTL = 1;
+ private static final String DEFAULT_PROTOBUF_DESCRIPTOR_BASE64 = "";
+ private static final String DEFAULT_PROTOBUF_MESSAGE_NAME = "";
+ private static final boolean DEFAULT_USE_MAPS_TRANSPORT = true;
+
+ private final String payload;
+ private final TakStreamFramer.Mode framingMode;
+ private final int maxPayloadBytes;
+ private final int reconnectDelayMs;
+ private final int reconnectMaxDelayMs;
+ private final double reconnectBackoffMultiplier;
+ private final int reconnectJitterMs;
+ private final int readBufferBytes;
+ private final boolean multicastEnabled;
+ private final boolean multicastIngressEnabled;
+ private final boolean multicastEgressEnabled;
+ private final String multicastGroup;
+ private final int multicastPort;
+ private final String multicastInterface;
+ private final int multicastTtl;
+ private final int multicastReadBufferBytes;
+ private final String protobufDescriptorBase64;
+ private final String protobufMessageName;
+ private final boolean useMapsTransport;
+
+ private TakExtensionConfig(String payload, TakStreamFramer.Mode framingMode, int maxPayloadBytes, int reconnectDelayMs,
+ int reconnectMaxDelayMs, double reconnectBackoffMultiplier, int reconnectJitterMs, int readBufferBytes,
+ boolean multicastEnabled, boolean multicastIngressEnabled, boolean multicastEgressEnabled,
+ String multicastGroup, int multicastPort, String multicastInterface, int multicastTtl, int multicastReadBufferBytes,
+ String protobufDescriptorBase64, String protobufMessageName, boolean useMapsTransport) {
+ this.payload = payload;
+ this.framingMode = framingMode;
+ this.maxPayloadBytes = maxPayloadBytes;
+ this.reconnectDelayMs = reconnectDelayMs;
+ this.reconnectMaxDelayMs = reconnectMaxDelayMs;
+ this.reconnectBackoffMultiplier = reconnectBackoffMultiplier;
+ this.reconnectJitterMs = reconnectJitterMs;
+ this.readBufferBytes = readBufferBytes;
+ this.multicastEnabled = multicastEnabled;
+ this.multicastIngressEnabled = multicastIngressEnabled;
+ this.multicastEgressEnabled = multicastEgressEnabled;
+ this.multicastGroup = multicastGroup;
+ this.multicastPort = multicastPort;
+ this.multicastInterface = multicastInterface;
+ this.multicastTtl = multicastTtl;
+ this.multicastReadBufferBytes = multicastReadBufferBytes;
+ this.protobufDescriptorBase64 = protobufDescriptorBase64;
+ this.protobufMessageName = protobufMessageName;
+ this.useMapsTransport = useMapsTransport;
+ }
+
+ public static TakExtensionConfig from(ExtensionConfigDTO extensionConfig) {
+ Map config = extensionConfig != null ? extensionConfig.getConfig() : null;
+ String payload = asString(config, "payload", PAYLOAD_COT_XML).toLowerCase(Locale.ROOT);
+ String framing = asString(config, "framing", "xml_stream").toLowerCase(Locale.ROOT);
+ TakStreamFramer.Mode mode = "proto_stream".equals(framing) ? TakStreamFramer.Mode.PROTO_STREAM : TakStreamFramer.Mode.XML_STREAM;
+ int maxPayload = asInt(config, "max_payload_bytes", DEFAULT_MAX_PAYLOAD);
+ int reconnectMs = asInt(config, "reconnect_delay_ms", DEFAULT_RECONNECT_MS);
+ int reconnectMaxMs = asInt(config, "reconnect_max_delay_ms", DEFAULT_RECONNECT_MAX_MS);
+ double reconnectMultiplier = asDouble(config, "reconnect_backoff_multiplier", DEFAULT_RECONNECT_MULTIPLIER);
+ int reconnectJitter = asInt(config, "reconnect_jitter_ms", DEFAULT_RECONNECT_JITTER_MS);
+ int readBuffer = asInt(config, "read_buffer_bytes", DEFAULT_READ_BUFFER);
+ boolean multicastEnabled = asBoolean(config, "multicast_enabled", false);
+ boolean multicastIngressEnabled = asBoolean(config, "multicast_ingress_enabled", multicastEnabled);
+ boolean multicastEgressEnabled = asBoolean(config, "multicast_egress_enabled", multicastEnabled);
+ String multicastGroup = asString(config, "multicast_group", DEFAULT_MULTICAST_GROUP).trim();
+ int multicastPort = asInt(config, "multicast_port", DEFAULT_MULTICAST_PORT);
+ String multicastInterface = asString(config, "multicast_interface", "").trim();
+ int multicastTtl = asInt(config, "multicast_ttl", DEFAULT_MULTICAST_TTL);
+ int multicastReadBuffer = asInt(config, "multicast_read_buffer_bytes", DEFAULT_READ_BUFFER);
+ String protobufDescriptorBase64 = asString(config, "protobuf_descriptor_base64", DEFAULT_PROTOBUF_DESCRIPTOR_BASE64).trim();
+ String protobufMessageName = asString(config, "protobuf_message_name", DEFAULT_PROTOBUF_MESSAGE_NAME).trim();
+ boolean useMapsTransport = asBoolean(config, "use_maps_transport", DEFAULT_USE_MAPS_TRANSPORT);
+ int reconnectBase = Math.max(100, reconnectMs);
+ int reconnectMax = Math.max(reconnectBase, reconnectMaxMs);
+ return new TakExtensionConfig(payload, mode, Math.max(1024, maxPayload), reconnectBase,
+ reconnectMax, clampMultiplier(reconnectMultiplier), Math.max(0, reconnectJitter), Math.max(512, readBuffer),
+ multicastEnabled, multicastIngressEnabled, multicastEgressEnabled,
+ multicastGroup.isEmpty() ? DEFAULT_MULTICAST_GROUP : multicastGroup,
+ Math.max(1, multicastPort),
+ multicastInterface,
+ Math.max(1, Math.min(255, multicastTtl)),
+ Math.max(512, multicastReadBuffer),
+ protobufDescriptorBase64,
+ protobufMessageName,
+ useMapsTransport);
+ }
+
+ public String getPayload() {
+ return payload;
+ }
+
+ public TakStreamFramer.Mode getFramingMode() {
+ return framingMode;
+ }
+
+ public int getMaxPayloadBytes() {
+ return maxPayloadBytes;
+ }
+
+ public int getReconnectDelayMs() {
+ return reconnectDelayMs;
+ }
+
+ public int getReconnectMaxDelayMs() {
+ return reconnectMaxDelayMs;
+ }
+
+ public double getReconnectBackoffMultiplier() {
+ return reconnectBackoffMultiplier;
+ }
+
+ public int getReconnectJitterMs() {
+ return reconnectJitterMs;
+ }
+
+ public int getReadBufferBytes() {
+ return readBufferBytes;
+ }
+
+ public boolean isMulticastEnabled() {
+ return multicastEnabled;
+ }
+
+ public boolean isMulticastIngressEnabled() {
+ return multicastIngressEnabled;
+ }
+
+ public boolean isMulticastEgressEnabled() {
+ return multicastEgressEnabled;
+ }
+
+ public String getMulticastGroup() {
+ return multicastGroup;
+ }
+
+ public int getMulticastPort() {
+ return multicastPort;
+ }
+
+ public String getMulticastInterface() {
+ return multicastInterface;
+ }
+
+ public int getMulticastTtl() {
+ return multicastTtl;
+ }
+
+ public int getMulticastReadBufferBytes() {
+ return multicastReadBufferBytes;
+ }
+
+ public String getProtobufDescriptorBase64() {
+ return protobufDescriptorBase64;
+ }
+
+ public String getProtobufMessageName() {
+ return protobufMessageName;
+ }
+
+ public boolean isUseMapsTransport() {
+ return useMapsTransport;
+ }
+
+ private static String asString(Map config, String key, String def) {
+ if (config == null) {
+ return def;
+ }
+ Object val = config.get(key);
+ return val == null ? def : val.toString();
+ }
+
+ private static int asInt(Map config, String key, int def) {
+ if (config == null) {
+ return def;
+ }
+ Object val = config.get(key);
+ if (val == null) {
+ return def;
+ }
+ if (val instanceof Number number) {
+ return number.intValue();
+ }
+ try {
+ return Integer.parseInt(val.toString());
+ } catch (NumberFormatException ignored) {
+ return def;
+ }
+ }
+
+ private static double asDouble(Map config, String key, double def) {
+ if (config == null) {
+ return def;
+ }
+ Object val = config.get(key);
+ if (val == null) {
+ return def;
+ }
+ if (val instanceof Number number) {
+ return number.doubleValue();
+ }
+ try {
+ return Double.parseDouble(val.toString());
+ } catch (NumberFormatException ignored) {
+ return def;
+ }
+ }
+
+ private static double clampMultiplier(double multiplier) {
+ if (Double.isNaN(multiplier) || Double.isInfinite(multiplier)) {
+ return DEFAULT_RECONNECT_MULTIPLIER;
+ }
+ return Math.max(1.0d, Math.min(10.0d, multiplier));
+ }
+
+ private static boolean asBoolean(Map config, String key, boolean def) {
+ if (config == null) {
+ return def;
+ }
+ Object val = config.get(key);
+ if (val == null) {
+ return def;
+ }
+ if (val instanceof Boolean bool) {
+ return bool;
+ }
+ return Boolean.parseBoolean(val.toString());
+ }
+}
diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakProtocolFactory.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakProtocolFactory.java
new file mode 100644
index 000000000..02d419ae7
--- /dev/null
+++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakProtocolFactory.java
@@ -0,0 +1,68 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak;
+
+import io.mapsmessaging.dto.rest.config.protocol.ProtocolConfigDTO;
+import io.mapsmessaging.dto.rest.config.protocol.impl.ExtensionConfigDTO;
+import io.mapsmessaging.network.io.EndPoint;
+import io.mapsmessaging.network.io.Packet;
+import io.mapsmessaging.network.protocol.Protocol;
+import io.mapsmessaging.network.protocol.ProtocolImplFactory;
+import io.mapsmessaging.network.protocol.detection.NoOpDetection;
+import io.mapsmessaging.network.protocol.impl.extension.ExtensionProtocol;
+
+import java.io.IOException;
+
+public class TakProtocolFactory extends ProtocolImplFactory {
+
+ public TakProtocolFactory() {
+ super("tak", "TAK Cursor-on-Target extension protocol", new NoOpDetection());
+ }
+
+ @Override
+ public Protocol connect(EndPoint endPoint, String sessionId, String username, String password) throws IOException {
+ ExtensionConfigDTO extensionConfig = null;
+ if (endPoint.getConfig() != null && endPoint.getConfig().getProtocolConfigs() != null) {
+ for (ProtocolConfigDTO protocolConfig : endPoint.getConfig().getProtocolConfigs()) {
+ if (protocolConfig instanceof ExtensionConfigDTO config && getName().equalsIgnoreCase(config.getProtocol())) {
+ extensionConfig = config;
+ break;
+ }
+ }
+ }
+ if (extensionConfig == null) {
+ throw new IOException("TAK extension configuration not found");
+ }
+
+ Protocol protocol = new ExtensionProtocol(endPoint, new TakExtension(endPoint, extensionConfig));
+ protocol.connect(sessionId, username, password);
+ return protocol;
+ }
+
+ @Override
+ public void create(EndPoint endPoint, Packet packet) {
+ // TAK extension does not accept inbound client sockets.
+ }
+
+ @Override
+ public String getTransportType() {
+ return "tak";
+ }
+}
diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodec.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodec.java
new file mode 100644
index 000000000..63803d3e2
--- /dev/null
+++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodec.java
@@ -0,0 +1,177 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak.codec;
+
+import io.mapsmessaging.api.MessageBuilder;
+import io.mapsmessaging.api.features.QualityOfService;
+import io.mapsmessaging.api.message.Message;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+
+import javax.xml.XMLConstants;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import java.io.IOException;
+import java.io.StringReader;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class CotXmlCodec implements TakPayloadCodec {
+
+ @Override
+ public Message decode(byte[] payload) throws IOException {
+ String xml = new String(payload, StandardCharsets.UTF_8);
+ Element event = parseRootEvent(xml);
+ Element point = extractPoint(event);
+
+ String uid = required(event, "uid");
+ String type = required(event, "type");
+ String time = required(event, "time");
+ String start = required(event, "start");
+ String stale = required(event, "stale");
+ String how = required(event, "how");
+ String lat = required(point, "lat");
+ String lon = required(point, "lon");
+
+ Map meta = new LinkedHashMap<>();
+ meta.put("tak.uid", uid);
+ meta.put("tak.type", type);
+ meta.put("tak.time", time);
+ meta.put("tak.start", start);
+ meta.put("tak.stale", stale);
+ meta.put("tak.how", how);
+ meta.put("tak.lat", lat);
+ meta.put("tak.lon", lon);
+ putIfPresent(meta, "tak.hae", point.getAttribute("hae"));
+ putIfPresent(meta, "tak.ce", point.getAttribute("ce"));
+ putIfPresent(meta, "tak.le", point.getAttribute("le"));
+ meta.put("tak.format", "xml");
+ meta.put("tak.transport", "stream");
+
+ return new MessageBuilder()
+ .setOpaqueData(payload)
+ .setMeta(meta)
+ .setContentType("application/cot+xml")
+ .setQoS(QualityOfService.AT_MOST_ONCE)
+ .build();
+ }
+
+ @Override
+ public byte[] encode(Message message) throws IOException {
+ byte[] opaque = message.getOpaqueData();
+ if (opaque != null && opaque.length > 0) {
+ String candidate = new String(opaque, StandardCharsets.UTF_8).trim();
+ if (candidate.startsWith(" meta = message.getMeta() != null ? message.getMeta() : new LinkedHashMap<>();
+ String now = Instant.now().truncatedTo(ChronoUnit.SECONDS).toString();
+ String stale = Instant.now().plus(5, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.SECONDS).toString();
+
+ String uid = getOrDefault(meta, "tak.uid", "maps-generated");
+ String type = getOrDefault(meta, "tak.type", "a-f-G-U-C");
+ String time = getOrDefault(meta, "tak.time", now);
+ String start = getOrDefault(meta, "tak.start", now);
+ String staleTime = getOrDefault(meta, "tak.stale", stale);
+ String how = getOrDefault(meta, "tak.how", "m-g");
+ String lat = getOrDefault(meta, "tak.lat", "0");
+ String lon = getOrDefault(meta, "tak.lon", "0");
+ String hae = getOrDefault(meta, "tak.hae", "0");
+ String ce = getOrDefault(meta, "tak.ce", "9999999");
+ String le = getOrDefault(meta, "tak.le", "9999999");
+
+ String xml = ""
+ + ""
+ + "";
+ return xml.getBytes(StandardCharsets.UTF_8);
+ }
+
+ private static String getOrDefault(Map meta, String key, String def) {
+ String value = meta.get(key);
+ return value == null || value.isBlank() ? def : value;
+ }
+
+ private static void putIfPresent(Map meta, String key, String value) {
+ if (value != null && !value.isBlank()) {
+ meta.put(key, value);
+ }
+ }
+
+ private static Element parseRootEvent(String xml) throws IOException {
+ try {
+ DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+ dbf.setNamespaceAware(true);
+ dbf.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
+ dbf.setFeature("http://xml.org/sax/features/external-general-entities", false);
+ dbf.setFeature("http://xml.org/sax/features/external-parameter-entities", false);
+ dbf.setFeature(XMLConstants.FEATURE_SECURE_PROCESSING, true);
+ dbf.setExpandEntityReferences(false);
+ dbf.setXIncludeAware(false);
+ dbf.setAttribute(XMLConstants.ACCESS_EXTERNAL_DTD, "");
+ dbf.setAttribute(XMLConstants.ACCESS_EXTERNAL_SCHEMA, "");
+ DocumentBuilder builder = dbf.newDocumentBuilder();
+ Document document = builder.parse(new InputSource(new StringReader(xml)));
+ Element root = document.getDocumentElement();
+ if (root == null || !"event".equalsIgnoreCase(root.getLocalName() == null ? root.getNodeName() : root.getLocalName())) {
+ throw new IOException("Invalid CoT XML payload: root element must be event");
+ }
+ return root;
+ } catch (ParserConfigurationException | SAXException ex) {
+ throw new IOException("Invalid CoT XML payload", ex);
+ }
+ }
+
+ private static Element extractPoint(Element root) throws IOException {
+ NodeList byNs = root.getElementsByTagNameNS("*", "point");
+ if (byNs.getLength() > 0 && byNs.item(0) instanceof Element element) {
+ return element;
+ }
+ NodeList byName = root.getElementsByTagName("point");
+ if (byName.getLength() > 0 && byName.item(0) instanceof Element element) {
+ return element;
+ }
+ throw new IOException("Invalid CoT XML payload: missing point element");
+ }
+
+ private static String required(Element element, String name) throws IOException {
+ String value = element.getAttribute(name);
+ if (value == null || value.isBlank()) {
+ throw new IOException("Invalid CoT XML payload: missing " + name);
+ }
+ return value;
+ }
+
+ private static String esc(String value) {
+ return value
+ .replace("&", "&")
+ .replace("\"", """)
+ .replace("<", "<")
+ .replace(">", ">");
+ }
+}
diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakPayloadCodec.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakPayloadCodec.java
new file mode 100644
index 000000000..0e24b9ca7
--- /dev/null
+++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakPayloadCodec.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak.codec;
+
+import io.mapsmessaging.api.message.Message;
+
+import java.io.IOException;
+
+public interface TakPayloadCodec {
+
+ Message decode(byte[] payload) throws IOException;
+
+ byte[] encode(Message message) throws IOException;
+}
diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodec.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodec.java
new file mode 100644
index 000000000..683ea49e7
--- /dev/null
+++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodec.java
@@ -0,0 +1,195 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak.codec;
+
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.WireFormat;
+import io.mapsmessaging.api.MessageBuilder;
+import io.mapsmessaging.api.message.Message;
+import io.mapsmessaging.schemas.config.impl.ProtoBufSchemaConfig;
+import io.mapsmessaging.schemas.formatters.MessageFormatter;
+import io.mapsmessaging.schemas.formatters.MessageFormatterFactory;
+import io.mapsmessaging.selector.IdentifierResolver;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class TakProtobufCodec implements TakPayloadCodec {
+
+ private static final int DEFAULT_MAX_PAYLOAD_BYTES = 1024 * 1024;
+ private final CotXmlCodec cotXmlCodec;
+ private final int maxPayloadBytes;
+ private final MessageFormatter protobufFormatter;
+
+ public TakProtobufCodec() {
+ this(new CotXmlCodec(), DEFAULT_MAX_PAYLOAD_BYTES, null);
+ }
+
+ public TakProtobufCodec(CotXmlCodec cotXmlCodec, int maxPayloadBytes) {
+ this(cotXmlCodec, maxPayloadBytes, null);
+ }
+
+ public TakProtobufCodec(CotXmlCodec cotXmlCodec, int maxPayloadBytes, MessageFormatter protobufFormatter) {
+ this.cotXmlCodec = cotXmlCodec;
+ this.maxPayloadBytes = Math.max(1024, maxPayloadBytes);
+ this.protobufFormatter = protobufFormatter;
+ }
+
+ public static TakProtobufCodec withSchemaFormatter(CotXmlCodec cotXmlCodec, int maxPayloadBytes,
+ String descriptorBase64, String messageName) {
+ return new TakProtobufCodec(cotXmlCodec, maxPayloadBytes, createFormatter(descriptorBase64, messageName));
+ }
+
+ @Override
+ public Message decode(byte[] payload) throws IOException {
+ if (payload == null || payload.length == 0) {
+ throw new IOException("Invalid TAK protobuf payload: empty");
+ }
+ if (payload.length > maxPayloadBytes) {
+ throw new IOException("Invalid TAK protobuf payload: exceeds max size");
+ }
+
+ Map meta = new LinkedHashMap<>();
+ meta.put("tak.format", "protobuf");
+ meta.put("tak.transport", "stream");
+
+ byte[] embeddedCot = extractEmbeddedCotXml(payload);
+ if (embeddedCot != null) {
+ try {
+ Message cotMessage = cotXmlCodec.decode(embeddedCot);
+ if (cotMessage.getMeta() != null) {
+ meta.putAll(cotMessage.getMeta());
+ }
+ meta.put("tak.format", "protobuf");
+ meta.put("tak.transport", "stream");
+ meta.put("tak.protobuf_embedded_xml", "true");
+ } catch (IOException ignored) {
+ // Keep protobuf metadata only when embedded CoT is malformed.
+ }
+ } else if (protobufFormatter != null) {
+ mergeSchemaParsedMeta(meta, payload);
+ }
+
+ return new MessageBuilder()
+ .setOpaqueData(payload)
+ .setContentType("application/x-protobuf")
+ .setMeta(meta)
+ .build();
+ }
+
+ @Override
+ public byte[] encode(Message message) throws IOException {
+ byte[] opaque = message.getOpaqueData();
+ if (opaque != null && opaque.length > 0 && !looksLikeCotXml(opaque)) {
+ if (opaque.length > maxPayloadBytes) {
+ throw new IOException("TAK protobuf payload exceeds max size");
+ }
+ return opaque;
+ }
+
+ // Fallback mode: wrap CoT XML as protobuf field #1 (length-delimited).
+ // This keeps the transport protobuf-framed for phase-2 interoperability work.
+ byte[] cotXml = (opaque != null && opaque.length > 0) ? opaque : cotXmlCodec.encode(message);
+ if (cotXml.length > maxPayloadBytes) {
+ throw new IOException("TAK protobuf payload exceeds max size");
+ }
+ return wrapCotInLengthDelimitedField(cotXml);
+ }
+
+ private static byte[] wrapCotInLengthDelimitedField(byte[] cotXml) throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream(cotXml.length + 8);
+ CodedOutputStream codedOutputStream = CodedOutputStream.newInstance(out);
+ codedOutputStream.writeByteArray(1, cotXml);
+ codedOutputStream.flush();
+ return out.toByteArray();
+ }
+
+ private static boolean looksLikeCotXml(byte[] payload) {
+ String candidate = new String(payload, StandardCharsets.UTF_8).trim();
+ return candidate.startsWith(" 0 && looksLikeCotXml(candidate)) {
+ return candidate;
+ }
+ } else if (!input.skipField(tag)) {
+ break;
+ }
+ }
+ return null;
+ }
+
+ private void mergeSchemaParsedMeta(Map meta, byte[] payload) {
+ try {
+ IdentifierResolver parsed = protobufFormatter.parse(payload);
+ if (parsed == null) {
+ return;
+ }
+ promote(parsed, meta, "uid", "tak.uid");
+ promote(parsed, meta, "type", "tak.type");
+ promote(parsed, meta, "time", "tak.time");
+ promote(parsed, meta, "start", "tak.start");
+ promote(parsed, meta, "stale", "tak.stale");
+ promote(parsed, meta, "how", "tak.how");
+ meta.put("tak.protobuf_parsed", "true");
+ } catch (Exception ignored) {
+ // keep base protobuf metadata when schema parsing fails
+ }
+ }
+
+ private static void promote(IdentifierResolver parsed, Map meta, String fromKey, String toKey) {
+ Object val = parsed.get(fromKey);
+ if (val != null) {
+ meta.put(toKey, val.toString());
+ }
+ }
+
+ private static MessageFormatter createFormatter(String descriptorBase64, String messageName) {
+ if (descriptorBase64 == null || descriptorBase64.isBlank() || messageName == null || messageName.isBlank()) {
+ return null;
+ }
+ try {
+ ProtoBufSchemaConfig schemaConfig = new ProtoBufSchemaConfig();
+ ProtoBufSchemaConfig.ProtobufConfig protobufConfig = new ProtoBufSchemaConfig.ProtobufConfig();
+ protobufConfig.setDescriptorValue(Base64.getDecoder().decode(descriptorBase64));
+ protobufConfig.setMessageName(messageName);
+ schemaConfig.setProtobufConfig(protobufConfig);
+ return MessageFormatterFactory.getInstance().getFormatter(schemaConfig);
+ } catch (Exception ignored) {
+ return null;
+ }
+ }
+}
diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakFrameReader.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakFrameReader.java
new file mode 100644
index 000000000..1fc0cdf8e
--- /dev/null
+++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakFrameReader.java
@@ -0,0 +1,55 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak.framing;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class TakFrameReader {
+
+ private final TakStreamFramer framer;
+ private final byte[] readBuffer;
+
+ public TakFrameReader(TakStreamFramer framer, int bufferSize) {
+ this.framer = framer;
+ this.readBuffer = new byte[Math.max(bufferSize, 256)];
+ }
+
+ public List read(InputStream inputStream) throws IOException {
+ int read = inputStream.read(readBuffer);
+ if (read < 0) {
+ throw new EOFException("TAK connection closed");
+ }
+ return framer.onBytes(readBuffer, read);
+ }
+
+ public List read(ByteBuffer byteBuffer) throws IOException {
+ if (byteBuffer == null || !byteBuffer.hasRemaining()) {
+ return List.of();
+ }
+ int len = byteBuffer.remaining();
+ byte[] data = new byte[len];
+ byteBuffer.get(data);
+ return framer.onBytes(data, len);
+ }
+}
diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramer.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramer.java
new file mode 100644
index 000000000..d509eecfe
--- /dev/null
+++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramer.java
@@ -0,0 +1,186 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak.framing;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class TakStreamFramer {
+
+ public enum Mode {
+ XML_STREAM,
+ PROTO_STREAM
+ }
+
+ private static final byte TAK_PROTO_MAGIC = (byte) 0xBF;
+ private static final byte[] XML_END_TAG = "".getBytes(StandardCharsets.UTF_8);
+
+ private final Mode mode;
+ private final int maxPayloadBytes;
+ private byte[] buffer;
+
+ public TakStreamFramer(Mode mode, int maxPayloadBytes) {
+ this.mode = mode;
+ this.maxPayloadBytes = maxPayloadBytes;
+ this.buffer = new byte[0];
+ }
+
+ public synchronized List onBytes(byte[] incoming, int length) throws IOException {
+ if (incoming == null || length <= 0) {
+ return List.of();
+ }
+ append(incoming, length);
+ if (buffer.length > maxPayloadBytes * 2) {
+ throw new IOException("Buffered TAK stream exceeds safe bound");
+ }
+ return switch (mode) {
+ case XML_STREAM -> parseXml();
+ case PROTO_STREAM -> parseProto();
+ };
+ }
+
+ public synchronized byte[] frame(byte[] payload) throws IOException {
+ if (payload == null) {
+ return new byte[0];
+ }
+ if (payload.length > maxPayloadBytes) {
+ throw new IOException("Payload exceeds max frame size");
+ }
+ if (mode == Mode.XML_STREAM) {
+ return payload;
+ }
+ byte[] varint = encodeVarint(payload.length);
+ byte[] out = new byte[1 + varint.length + payload.length];
+ out[0] = TAK_PROTO_MAGIC;
+ System.arraycopy(varint, 0, out, 1, varint.length);
+ System.arraycopy(payload, 0, out, 1 + varint.length, payload.length);
+ return out;
+ }
+
+ private List parseXml() throws IOException {
+ List frames = new ArrayList<>();
+ while (true) {
+ int endTag = indexOf(buffer, XML_END_TAG, 0);
+ if (endTag < 0) {
+ break;
+ }
+ int frameEnd = endTag + XML_END_TAG.length;
+ while (frameEnd < buffer.length && Character.isWhitespace((char) buffer[frameEnd])) {
+ frameEnd++;
+ }
+ if (frameEnd > maxPayloadBytes) {
+ throw new IOException("TAK XML frame exceeds max frame size");
+ }
+ frames.add(Arrays.copyOfRange(buffer, 0, frameEnd));
+ buffer = Arrays.copyOfRange(buffer, frameEnd, buffer.length);
+ }
+ return frames;
+ }
+
+ private List parseProto() throws IOException {
+ List frames = new ArrayList<>();
+ while (buffer.length > 0) {
+ if (buffer[0] != TAK_PROTO_MAGIC) {
+ throw new IOException("Invalid TAK protobuf stream header");
+ }
+ if (buffer.length < 2) {
+ break;
+ }
+ VarintResult varintResult = decodeVarint(buffer, 1);
+ if (varintResult == null) {
+ break;
+ }
+ int length = varintResult.value;
+ if (length < 0 || length > maxPayloadBytes) {
+ throw new IOException("Invalid TAK protobuf payload length: " + length);
+ }
+ int frameLength = 1 + varintResult.bytesRead + length;
+ if (buffer.length < frameLength) {
+ break;
+ }
+ int payloadOffset = 1 + varintResult.bytesRead;
+ frames.add(Arrays.copyOfRange(buffer, payloadOffset, payloadOffset + length));
+ buffer = Arrays.copyOfRange(buffer, frameLength, buffer.length);
+ }
+ return frames;
+ }
+
+ private void append(byte[] data, int length) {
+ int copyLength = Math.min(length, data.length);
+ byte[] next = Arrays.copyOf(buffer, buffer.length + copyLength);
+ System.arraycopy(data, 0, next, buffer.length, copyLength);
+ buffer = next;
+ }
+
+ private static byte[] encodeVarint(int value) {
+ byte[] tmp = new byte[5];
+ int count = 0;
+ int v = value;
+ while ((v & ~0x7F) != 0) {
+ tmp[count++] = (byte) ((v & 0x7F) | 0x80);
+ v >>>= 7;
+ }
+ tmp[count++] = (byte) (v & 0x7F);
+ return Arrays.copyOf(tmp, count);
+ }
+
+ private static VarintResult decodeVarint(byte[] data, int offset) throws IOException {
+ int value = 0;
+ int shift = 0;
+ int bytes = 0;
+ for (int i = offset; i < data.length && bytes < 5; i++) {
+ int b = data[i] & 0xFF;
+ value |= (b & 0x7F) << shift;
+ bytes++;
+ if ((b & 0x80) == 0) {
+ return new VarintResult(value, bytes);
+ }
+ shift += 7;
+ }
+ if (bytes >= 5 && (data[offset + 4] & 0x80) != 0) {
+ throw new IOException("Malformed varint length");
+ }
+ return null;
+ }
+
+ private static int indexOf(byte[] haystack, byte[] needle, int from) {
+ if (needle.length == 0 || haystack.length < needle.length) {
+ return -1;
+ }
+ for (int i = from; i <= haystack.length - needle.length; i++) {
+ boolean match = true;
+ for (int j = 0; j < needle.length; j++) {
+ if (haystack[i + j] != needle[j]) {
+ match = false;
+ break;
+ }
+ }
+ if (match) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ private record VarintResult(int value, int bytesRead) {}
+}
diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakConnectionManager.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakConnectionManager.java
new file mode 100644
index 000000000..bc131c96e
--- /dev/null
+++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakConnectionManager.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak.transport;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class TakConnectionManager {
+
+ private final TakServerConnection connection;
+ private final ReentrantLock writeLock;
+
+ public TakConnectionManager(TakServerConnection connection) {
+ this.connection = connection;
+ this.writeLock = new ReentrantLock();
+ }
+
+ public void connect() throws IOException {
+ connection.connect();
+ }
+
+ public void reconnect() throws IOException {
+ connection.close();
+ connection.connect();
+ }
+
+ public boolean isConnected() {
+ return connection.isConnected();
+ }
+
+ public InputStream input() throws IOException {
+ return connection.getInputStream();
+ }
+
+ public void write(byte[] data) throws IOException {
+ writeLock.lock();
+ try {
+ connection.write(data);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public void close() throws IOException {
+ connection.close();
+ }
+}
diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakMulticastTransport.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakMulticastTransport.java
new file mode 100644
index 000000000..31cd4d76d
--- /dev/null
+++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakMulticastTransport.java
@@ -0,0 +1,134 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak.transport;
+
+import io.mapsmessaging.network.protocol.impl.tak.TakExtensionConfig;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import java.net.NetworkInterface;
+import java.net.SocketTimeoutException;
+import java.util.Optional;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class TakMulticastTransport {
+
+ private final TakExtensionConfig config;
+ private final ReentrantLock sendLock;
+
+ private MulticastSocket socket;
+ private InetAddress groupAddress;
+
+ public TakMulticastTransport(TakExtensionConfig config) {
+ this.config = config;
+ this.sendLock = new ReentrantLock();
+ }
+
+ public synchronized void start() throws IOException {
+ close();
+ groupAddress = InetAddress.getByName(config.getMulticastGroup());
+ socket = new MulticastSocket(config.getMulticastPort());
+ socket.setReuseAddress(true);
+ socket.setSoTimeout(1000);
+ socket.setTimeToLive(config.getMulticastTtl());
+
+ NetworkInterface networkInterface = resolveInterface();
+ if (networkInterface != null) {
+ socket.setNetworkInterface(networkInterface);
+ socket.joinGroup(new InetSocketAddress(groupAddress, config.getMulticastPort()), networkInterface);
+ } else {
+ socket.joinGroup(groupAddress);
+ }
+ }
+
+ public Optional read() throws IOException {
+ MulticastSocket current = getSocket();
+ byte[] buffer = new byte[config.getMulticastReadBufferBytes()];
+ DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
+ try {
+ current.receive(packet);
+ if (packet.getLength() > config.getMaxPayloadBytes()) {
+ return Optional.empty();
+ }
+ byte[] copy = new byte[packet.getLength()];
+ System.arraycopy(packet.getData(), packet.getOffset(), copy, 0, packet.getLength());
+ return Optional.of(copy);
+ } catch (SocketTimeoutException ignored) {
+ return Optional.empty();
+ }
+ }
+
+ public void send(byte[] payload) throws IOException {
+ MulticastSocket current = getSocket();
+ DatagramPacket packet = new DatagramPacket(payload, payload.length, groupAddress, config.getMulticastPort());
+ sendLock.lock();
+ try {
+ current.send(packet);
+ } finally {
+ sendLock.unlock();
+ }
+ }
+
+ public synchronized void close() throws IOException {
+ if (socket == null) {
+ return;
+ }
+ IOException deferred = null;
+ try {
+ NetworkInterface networkInterface = resolveInterface();
+ if (networkInterface != null && groupAddress != null) {
+ socket.leaveGroup(new InetSocketAddress(groupAddress, config.getMulticastPort()), networkInterface);
+ } else if (groupAddress != null) {
+ socket.leaveGroup(groupAddress);
+ }
+ } catch (IOException ex) {
+ deferred = ex;
+ }
+ socket.close();
+ socket = null;
+ groupAddress = null;
+ if (deferred != null) {
+ throw deferred;
+ }
+ }
+
+ private synchronized MulticastSocket getSocket() throws IOException {
+ if (socket == null || socket.isClosed()) {
+ throw new IOException("TAK multicast transport is not started");
+ }
+ return socket;
+ }
+
+ private NetworkInterface resolveInterface() throws IOException {
+ String iface = config.getMulticastInterface();
+ if (iface == null || iface.isBlank()) {
+ return null;
+ }
+ NetworkInterface named = NetworkInterface.getByName(iface);
+ if (named != null) {
+ return named;
+ }
+ InetAddress address = InetAddress.getByName(iface);
+ return NetworkInterface.getByInetAddress(address);
+ }
+}
diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakServerConnection.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakServerConnection.java
new file mode 100644
index 000000000..4860087e2
--- /dev/null
+++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakServerConnection.java
@@ -0,0 +1,122 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak.transport;
+
+import io.mapsmessaging.network.EndPointURL;
+
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.SSLSocketFactory;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.time.Duration;
+
+public class TakServerConnection {
+
+ private final EndPointURL url;
+ private final int timeoutMs;
+
+ private Socket socket;
+ private InputStream inputStream;
+ private OutputStream outputStream;
+
+ public TakServerConnection(EndPointURL url, Duration timeout) {
+ this.url = url;
+ this.timeoutMs = (int) Math.max(1000, timeout.toMillis());
+ }
+
+ public synchronized void connect() throws IOException {
+ close();
+ SocketFactory socketFactory = isSecure() ? SSLSocketFactory.getDefault() : SocketFactory.getDefault();
+ socket = socketFactory.createSocket();
+ socket.connect(new InetSocketAddress(url.getHost(), url.getPort()), timeoutMs);
+ socket.setKeepAlive(true);
+ socket.setTcpNoDelay(true);
+ socket.setSoTimeout(timeoutMs);
+ if (isSecure() && socket instanceof SSLSocket sslSocket) {
+ try {
+ sslSocket.startHandshake();
+ } catch (SSLException sslException) {
+ throw new IOException("TAK TLS handshake failed. Verify trustStore/keyStore and server certificate validity", sslException);
+ }
+ }
+ inputStream = socket.getInputStream();
+ outputStream = socket.getOutputStream();
+ }
+
+ public synchronized InputStream getInputStream() throws IOException {
+ if (!isConnected()) {
+ throw new IOException("TAK connection is not connected");
+ }
+ return inputStream;
+ }
+
+ public synchronized void write(byte[] data) throws IOException {
+ if (!isConnected()) {
+ throw new IOException("TAK connection is not connected");
+ }
+ outputStream.write(data);
+ outputStream.flush();
+ }
+
+ public synchronized boolean isConnected() {
+ return socket != null && socket.isConnected() && !socket.isClosed();
+ }
+
+ public synchronized void close() throws IOException {
+ IOException deferred = null;
+ if (inputStream != null) {
+ try {
+ inputStream.close();
+ } catch (IOException ex) {
+ deferred = ex;
+ }
+ inputStream = null;
+ }
+ if (outputStream != null) {
+ try {
+ outputStream.close();
+ } catch (IOException ex) {
+ deferred = ex;
+ }
+ outputStream = null;
+ }
+ if (socket != null) {
+ try {
+ socket.close();
+ } catch (IOException ex) {
+ deferred = ex;
+ }
+ socket = null;
+ }
+ if (deferred != null) {
+ throw deferred;
+ }
+ }
+
+ private boolean isSecure() {
+ String protocol = url.getProtocol();
+ return "ssl".equalsIgnoreCase(protocol) || "wss".equalsIgnoreCase(protocol);
+ }
+}
diff --git a/src/main/resources/META-INF/services/io.mapsmessaging.network.io.EndPointConnectionFactory b/src/main/resources/META-INF/services/io.mapsmessaging.network.io.EndPointConnectionFactory
index 6d4a40add..2cf75a557 100644
--- a/src/main/resources/META-INF/services/io.mapsmessaging.network.io.EndPointConnectionFactory
+++ b/src/main/resources/META-INF/services/io.mapsmessaging.network.io.EndPointConnectionFactory
@@ -21,4 +21,5 @@ io.mapsmessaging.network.io.impl.tcp.TCPEndPointConnectionFactory
io.mapsmessaging.network.io.impl.ssl.SSLEndPointConnectionFactory
io.mapsmessaging.network.io.impl.noop.NoOpEndPointConnectionFactory
io.mapsmessaging.network.io.impl.udp.UDPEndPointConnectionFactory
-io.mapsmessaging.network.io.impl.serial.SerialEndPointConnectionFactory
\ No newline at end of file
+io.mapsmessaging.network.io.impl.serial.SerialEndPointConnectionFactory
+io.mapsmessaging.network.protocol.impl.tak.TakEndPointConnectionFactory
diff --git a/src/main/resources/META-INF/services/io.mapsmessaging.network.protocol.ProtocolImplFactory b/src/main/resources/META-INF/services/io.mapsmessaging.network.protocol.ProtocolImplFactory
index 7af47b746..46d7b5a9f 100644
--- a/src/main/resources/META-INF/services/io.mapsmessaging.network.protocol.ProtocolImplFactory
+++ b/src/main/resources/META-INF/services/io.mapsmessaging.network.protocol.ProtocolImplFactory
@@ -75,4 +75,5 @@ io.mapsmessaging.network.protocol.impl.apache_pulsar.PulsarProtocolFactory
io.mapsmessaging.network.protocol.impl.ibm_mq.MqProtocolFactory
io.mapsmessaging.network.protocol.impl.aws_sns.SnsProtocolFactory
io.mapsmessaging.network.protocol.impl.v2x_step.V2xStepProtocolFactory
-io.mapsmessaging.network.protocol.impl.extension.impl.example.ExampleProtocolFactory
\ No newline at end of file
+io.mapsmessaging.network.protocol.impl.extension.impl.example.ExampleProtocolFactory
+io.mapsmessaging.network.protocol.impl.tak.TakProtocolFactory
diff --git a/src/main/resources/NetworkConnectionManager.yaml b/src/main/resources/NetworkConnectionManager.yaml
index 942151398..61397a68b 100644
--- a/src/main/resources/NetworkConnectionManager.yaml
+++ b/src/main/resources/NetworkConnectionManager.yaml
@@ -66,4 +66,29 @@ NetworkConnectionManager:
-
direction: pull
local_namespace: "/"
- remote_namespace: "/#"
\ No newline at end of file
+ remote_namespace: "/#"
+
+ # Example TAK bridge (CoT XML stream over TLS)
+ # - name: "TAK Server Connection"
+ # url: ssl://tak.example.local:8089/
+ # protocol: tak
+ # plugin: true
+ #
+ # config:
+ # payload: cot_xml
+ # framing: xml_stream
+ # max_payload_bytes: 1048576
+ # reconnect_delay_ms: 2000
+ #
+ # links:
+ # - direction: pull
+ # remote_namespace: "tak/cot/#"
+ # local_namespace: "tak/cot/"
+ # selector: ""
+ # qos: 0
+ #
+ # - direction: push
+ # remote_namespace: "tak/cot/out/#"
+ # local_namespace: "ops/cot/out/"
+ # selector: "tak.type LIKE 'a-%'"
+ # qos: 0
diff --git a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionReconnectIntegrationTest.java b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionReconnectIntegrationTest.java
new file mode 100644
index 000000000..d2d2b6094
--- /dev/null
+++ b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionReconnectIntegrationTest.java
@@ -0,0 +1,142 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak;
+
+import io.mapsmessaging.api.MessageBuilder;
+import io.mapsmessaging.api.message.Message;
+import io.mapsmessaging.dto.rest.config.network.EndPointServerConfigDTO;
+import io.mapsmessaging.dto.rest.config.protocol.impl.ExtensionConfigDTO;
+import io.mapsmessaging.network.io.EndPoint;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class TakExtensionReconnectIntegrationTest {
+
+ @Test
+ void reconnectsAfterRemoteDropAndResumesOutbound() throws Exception {
+ try (ServerSocket serverSocket = new ServerSocket(0)) {
+ int port = serverSocket.getLocalPort();
+ CountDownLatch firstMessageSeen = new CountDownLatch(1);
+ CountDownLatch secondMessageSeen = new CountDownLatch(1);
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ Future> server = executor.submit(() -> {
+ try {
+ try (Socket first = serverSocket.accept()) {
+ first.setSoTimeout(2000);
+ String firstPayload = readPayload(first);
+ assertTrue(firstPayload.contains(" config = new LinkedHashMap<>();
+ config.put("payload", "cot_xml");
+ config.put("framing", "xml_stream");
+ config.put("reconnect_delay_ms", 100);
+ config.put("reconnect_max_delay_ms", 400);
+ config.put("reconnect_backoff_multiplier", 1.5d);
+ config.put("reconnect_jitter_ms", 0);
+ config.put("use_maps_transport", false);
+ setField(dto, "config", config);
+ return dto;
+ }
+
+ private static Message cotMessage(String uid) {
+ String cot = """
+
+
+
+ """.formatted(uid);
+ return new MessageBuilder().setOpaqueData(cot.getBytes(StandardCharsets.UTF_8)).build();
+ }
+
+ private static String readPayload(Socket socket) throws IOException {
+ byte[] buffer = new byte[4096];
+ int read = socket.getInputStream().read(buffer);
+ if (read <= 0) {
+ throw new IOException("No payload received");
+ }
+ return new String(buffer, 0, read, StandardCharsets.UTF_8);
+ }
+
+ private static void setField(Object target, String fieldName, Object value) throws Exception {
+ var field = target.getClass().getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(target, value);
+ }
+}
diff --git a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java
new file mode 100644
index 000000000..c854f2004
--- /dev/null
+++ b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java
@@ -0,0 +1,125 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak;
+
+import io.mapsmessaging.dto.rest.config.protocol.impl.ExtensionConfigDTO;
+import io.mapsmessaging.network.protocol.impl.tak.framing.TakStreamFramer;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TakExtensionTest {
+
+ @Test
+ void parsesExtensionConfigWithDefaultsAndOverrides() throws Exception {
+ ExtensionConfigDTO dto = new ExtensionConfigDTO();
+ Map config = new LinkedHashMap<>();
+ config.put("payload", "tak_proto_v1");
+ config.put("framing", "proto_stream");
+ config.put("max_payload_bytes", 2048);
+ config.put("reconnect_delay_ms", 1500);
+ config.put("reconnect_max_delay_ms", 6000);
+ config.put("reconnect_backoff_multiplier", 1.5);
+ config.put("reconnect_jitter_ms", 100);
+ config.put("read_buffer_bytes", 4096);
+ config.put("multicast_enabled", true);
+ config.put("multicast_ingress_enabled", true);
+ config.put("multicast_egress_enabled", false);
+ config.put("multicast_group", "239.88.77.66");
+ config.put("multicast_port", 7171);
+ config.put("multicast_interface", "lo0");
+ config.put("multicast_ttl", 3);
+ config.put("multicast_read_buffer_bytes", 2048);
+ config.put("protobuf_descriptor_base64", "AQID");
+ config.put("protobuf_message_name", "TakMessage");
+ config.put("use_maps_transport", false);
+ setField(dto, "config", config);
+
+ TakExtensionConfig parsed = TakExtensionConfig.from(dto);
+
+ assertEquals("tak_proto_v1", parsed.getPayload());
+ assertEquals(TakStreamFramer.Mode.PROTO_STREAM, parsed.getFramingMode());
+ assertEquals(2048, parsed.getMaxPayloadBytes());
+ assertEquals(1500, parsed.getReconnectDelayMs());
+ assertEquals(6000, parsed.getReconnectMaxDelayMs());
+ assertEquals(1.5d, parsed.getReconnectBackoffMultiplier());
+ assertEquals(100, parsed.getReconnectJitterMs());
+ assertEquals(4096, parsed.getReadBufferBytes());
+ assertTrue(parsed.isMulticastEnabled());
+ assertTrue(parsed.isMulticastIngressEnabled());
+ assertFalse(parsed.isMulticastEgressEnabled());
+ assertEquals("239.88.77.66", parsed.getMulticastGroup());
+ assertEquals(7171, parsed.getMulticastPort());
+ assertEquals("lo0", parsed.getMulticastInterface());
+ assertEquals(3, parsed.getMulticastTtl());
+ assertEquals(2048, parsed.getMulticastReadBufferBytes());
+ assertEquals("AQID", parsed.getProtobufDescriptorBase64());
+ assertEquals("TakMessage", parsed.getProtobufMessageName());
+ assertFalse(parsed.isUseMapsTransport());
+ }
+
+ @Test
+ void appliesSafeDefaultsWhenMissing() {
+ TakExtensionConfig parsed = TakExtensionConfig.from(null);
+ assertEquals("cot_xml", parsed.getPayload());
+ assertEquals(TakStreamFramer.Mode.XML_STREAM, parsed.getFramingMode());
+ assertEquals(1024 * 1024, parsed.getMaxPayloadBytes());
+ assertEquals(2000, parsed.getReconnectDelayMs());
+ assertEquals(30000, parsed.getReconnectMaxDelayMs());
+ assertEquals(2.0d, parsed.getReconnectBackoffMultiplier());
+ assertEquals(250, parsed.getReconnectJitterMs());
+ assertFalse(parsed.isMulticastEnabled());
+ assertEquals("239.2.3.1", parsed.getMulticastGroup());
+ assertEquals(6969, parsed.getMulticastPort());
+ assertEquals("", parsed.getProtobufDescriptorBase64());
+ assertEquals("", parsed.getProtobufMessageName());
+ assertTrue(parsed.isUseMapsTransport());
+ }
+
+ @Test
+ void clampsReconnectHardeningValues() throws Exception {
+ ExtensionConfigDTO dto = new ExtensionConfigDTO();
+ Map config = new LinkedHashMap<>();
+ config.put("reconnect_delay_ms", 50);
+ config.put("reconnect_max_delay_ms", 10);
+ config.put("reconnect_backoff_multiplier", 0.1d);
+ config.put("reconnect_jitter_ms", -1);
+ setField(dto, "config", config);
+
+ TakExtensionConfig parsed = TakExtensionConfig.from(dto);
+
+ assertEquals(100, parsed.getReconnectDelayMs());
+ assertEquals(100, parsed.getReconnectMaxDelayMs());
+ assertEquals(1.0d, parsed.getReconnectBackoffMultiplier());
+ assertEquals(0, parsed.getReconnectJitterMs());
+ }
+
+ private static void setField(Object target, String fieldName, Object value) throws Exception {
+ Field field = target.getClass().getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(target, value);
+ }
+}
diff --git a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodecTest.java b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodecTest.java
new file mode 100644
index 000000000..5da3631b6
--- /dev/null
+++ b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodecTest.java
@@ -0,0 +1,115 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak.codec;
+
+import io.mapsmessaging.api.MessageBuilder;
+import io.mapsmessaging.api.message.Message;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class CotXmlCodecTest {
+
+ private static final String VALID_COT = """
+
+
+
+ """;
+
+ @Test
+ void decodeValidCotExtractsCoreFields() throws IOException {
+ CotXmlCodec codec = new CotXmlCodec();
+
+ Message message = codec.decode(VALID_COT.getBytes(StandardCharsets.UTF_8));
+
+ assertNotNull(message.getMeta());
+ assertEquals("u-1", message.getMeta().get("tak.uid"));
+ assertEquals("a-f-G-U-C", message.getMeta().get("tak.type"));
+ assertEquals("51.5007", message.getMeta().get("tak.lat"));
+ assertEquals("-0.1246", message.getMeta().get("tak.lon"));
+ assertEquals("application/cot+xml", message.getContentType());
+ assertNotNull(message.getOpaqueData());
+ }
+
+ @Test
+ void decodeMissingRequiredFieldFails() {
+ CotXmlCodec codec = new CotXmlCodec();
+ String invalid = """
+
+
+
+ """;
+
+ assertThrows(IOException.class, () -> codec.decode(invalid.getBytes(StandardCharsets.UTF_8)));
+ }
+
+ @Test
+ void decodeMalformedXmlFails() {
+ CotXmlCodec codec = new CotXmlCodec();
+ String invalid = " codec.decode(invalid.getBytes(StandardCharsets.UTF_8)));
+ }
+
+ @Test
+ void decodeRejectsXxe() {
+ CotXmlCodec codec = new CotXmlCodec();
+ String xxe = """
+ ]>
+
+
+ &xxe;
+
+ """;
+ assertThrows(IOException.class, () -> codec.decode(xxe.getBytes(StandardCharsets.UTF_8)));
+ }
+
+ @Test
+ void encodeFromMetaBuildsCotXml() throws IOException {
+ CotXmlCodec codec = new CotXmlCodec();
+ Map meta = new LinkedHashMap<>();
+ meta.put("tak.uid", "u-2");
+ meta.put("tak.type", "a-f-G-U-C");
+ meta.put("tak.time", "2026-02-19T09:10:00Z");
+ meta.put("tak.start", "2026-02-19T09:10:00Z");
+ meta.put("tak.stale", "2026-02-19T09:15:00Z");
+ meta.put("tak.how", "m-g");
+ meta.put("tak.lat", "1");
+ meta.put("tak.lon", "2");
+ meta.put("tak.hae", "3");
+ meta.put("tak.ce", "4");
+ meta.put("tak.le", "5");
+
+ Message message = new MessageBuilder().setMeta(meta).build();
+ byte[] xml = codec.encode(message);
+ String text = new String(xml, StandardCharsets.UTF_8);
+
+ assertTrue(text.startsWith("
+
+
+ """;
+ Message message = new MessageBuilder().setOpaqueData(cot.getBytes(StandardCharsets.UTF_8)).build();
+
+ byte[] encoded = codec.encode(message);
+ Message decoded = codec.decode(encoded);
+
+ assertEquals("application/x-protobuf", decoded.getContentType());
+ assertEquals("protobuf", decoded.getMeta().get("tak.format"));
+ assertEquals("u-100", decoded.getMeta().get("tak.uid"));
+ assertEquals("a-f-G-U-C", decoded.getMeta().get("tak.type"));
+ assertNotNull(decoded.getOpaqueData());
+ assertTrue(decoded.getOpaqueData().length > 0);
+ }
+
+ @Test
+ void passesThroughBinaryProtobufPayload() throws IOException {
+ TakProtobufCodec codec = new TakProtobufCodec();
+ byte[] raw = new byte[]{0x08, 0x7F, 0x10, 0x01};
+ Message message = new MessageBuilder().setOpaqueData(raw).build();
+
+ byte[] encoded = codec.encode(message);
+ Message decoded = codec.decode(encoded);
+
+ assertArrayEquals(raw, encoded);
+ assertArrayEquals(raw, decoded.getOpaqueData());
+ assertEquals("protobuf", decoded.getMeta().get("tak.format"));
+ }
+
+ @Test
+ void buildsCotFromMetaWhenPayloadMissing() throws IOException {
+ TakProtobufCodec codec = new TakProtobufCodec();
+ Map meta = new LinkedHashMap<>();
+ meta.put("tak.uid", "u-200");
+ meta.put("tak.type", "a-f-G-U-C");
+ meta.put("tak.time", "2026-02-19T09:10:00Z");
+ meta.put("tak.start", "2026-02-19T09:10:00Z");
+ meta.put("tak.stale", "2026-02-19T09:15:00Z");
+ meta.put("tak.how", "m-g");
+ meta.put("tak.lat", "1");
+ meta.put("tak.lon", "2");
+ Message message = new MessageBuilder().setMeta(meta).build();
+
+ byte[] encoded = codec.encode(message);
+ Message decoded = codec.decode(encoded);
+
+ assertEquals("u-200", decoded.getMeta().get("tak.uid"));
+ assertEquals("a-f-G-U-C", decoded.getMeta().get("tak.type"));
+ }
+
+ @Test
+ void decodesUsingMapsProtobufFormatterWhenConfigured() throws IOException {
+ ProtoBufSchemaConfig schema = (ProtoBufSchemaConfig) TestProtobufSchemas.protobufSchemaConfig(UUID.randomUUID());
+ String descriptorBase64 = Base64.getEncoder().encodeToString(schema.getProtobufConfig().getDescriptorValue());
+ String messageName = schema.getProtobufConfig().getMessageName();
+ TakProtobufCodec codec = TakProtobufCodec.withSchemaFormatter(new CotXmlCodec(), 1024 * 1024, descriptorBase64, messageName);
+
+ byte[] payload = TestProtobufSchemas.encodeProtobufPayload();
+ Message decoded = codec.decode(payload);
+
+ assertEquals("true", decoded.getMeta().get("tak.protobuf_parsed"));
+ }
+}
diff --git a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramerTest.java b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramerTest.java
new file mode 100644
index 000000000..17bd80ec1
--- /dev/null
+++ b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramerTest.java
@@ -0,0 +1,80 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak.framing;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TakStreamFramerTest {
+
+ @Test
+ void protoFrameRoundTrip() throws IOException {
+ TakStreamFramer framer = new TakStreamFramer(TakStreamFramer.Mode.PROTO_STREAM, 1024 * 1024);
+ byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
+
+ byte[] framed = framer.frame(payload);
+ List decoded = framer.onBytes(framed, framed.length);
+
+ assertEquals(1, decoded.size());
+ assertArrayEquals(payload, decoded.getFirst());
+ }
+
+ @Test
+ void protoPartialReadReassemblesFrames() throws IOException {
+ TakStreamFramer framer = new TakStreamFramer(TakStreamFramer.Mode.PROTO_STREAM, 1024 * 1024);
+ byte[] payload = "abc123".getBytes(StandardCharsets.UTF_8);
+ byte[] framed = framer.frame(payload);
+
+ List first = framer.onBytes(framed, 2);
+ assertTrue(first.isEmpty());
+ List second = framer.onBytes(slice(framed, 2), framed.length - 2);
+
+ assertEquals(1, second.size());
+ assertArrayEquals(payload, second.getFirst());
+ }
+
+ @Test
+ void protoRejectsOversizedFrame() {
+ TakStreamFramer framer = new TakStreamFramer(TakStreamFramer.Mode.PROTO_STREAM, 8);
+ byte[] fake = new byte[]{(byte) 0xBF, 0x09, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
+ assertThrows(IOException.class, () -> framer.onBytes(fake, fake.length));
+ }
+
+ @Test
+ void protoRejectsInvalidMagic() {
+ TakStreamFramer framer = new TakStreamFramer(TakStreamFramer.Mode.PROTO_STREAM, 1024);
+ byte[] invalid = new byte[]{0x01, 0x01, 0x7F};
+ assertThrows(IOException.class, () -> framer.onBytes(invalid, invalid.length));
+ }
+
+ private static byte[] slice(byte[] source, int from) {
+ byte[] out = new byte[source.length - from];
+ System.arraycopy(source, from, out, 0, out.length);
+ return out;
+ }
+}
diff --git a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramerXmlTest.java b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramerXmlTest.java
new file mode 100644
index 000000000..f221e518c
--- /dev/null
+++ b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramerXmlTest.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak.framing;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TakStreamFramerXmlTest {
+
+ @Test
+ void xmlDelimitsMultipleEvents() throws IOException {
+ TakStreamFramer framer = new TakStreamFramer(TakStreamFramer.Mode.XML_STREAM, 1024 * 1024);
+ String payload = "\n\n";
+
+ List frames = framer.onBytes(payload.getBytes(StandardCharsets.UTF_8), payload.length());
+
+ assertEquals(2, frames.size());
+ assertTrue(new String(frames.get(0), StandardCharsets.UTF_8).contains("uid=\"1\""));
+ assertTrue(new String(frames.get(1), StandardCharsets.UTF_8).contains("uid=\"2\""));
+ }
+
+ @Test
+ void xmlPartialEventBuffersUntilComplete() throws IOException {
+ TakStreamFramer framer = new TakStreamFramer(TakStreamFramer.Mode.XML_STREAM, 1024 * 1024);
+ byte[] bytes = "".getBytes(StandardCharsets.UTF_8);
+
+ List first = framer.onBytes(bytes, 10);
+ assertTrue(first.isEmpty());
+
+ byte[] rest = new byte[bytes.length - 10];
+ System.arraycopy(bytes, 10, rest, 0, rest.length);
+ List second = framer.onBytes(rest, rest.length);
+ assertEquals(1, second.size());
+ }
+}
diff --git a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakServerConnectionIntegrationTest.java b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakServerConnectionIntegrationTest.java
new file mode 100644
index 000000000..c8f0188a3
--- /dev/null
+++ b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakServerConnectionIntegrationTest.java
@@ -0,0 +1,164 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak.transport;
+
+import io.mapsmessaging.network.EndPointURL;
+import org.junit.jupiter.api.Test;
+
+import javax.net.ssl.SSLServerSocketFactory;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TakServerConnectionIntegrationTest {
+
+ @Test
+ void tcpConnectionRoundTrip() throws Exception {
+ try (ServerSocket serverSocket = new ServerSocket(0)) {
+ int port = serverSocket.getLocalPort();
+ byte[] inbound = "ping".getBytes();
+ byte[] outbound = "pong".getBytes();
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ Future> server = executor.submit(() -> {
+ try (Socket accepted = serverSocket.accept()) {
+ byte[] buffer = accepted.getInputStream().readNBytes(inbound.length);
+ assertArrayEquals(inbound, buffer);
+ accepted.getOutputStream().write(outbound);
+ accepted.getOutputStream().flush();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ TakServerConnection connection = new TakServerConnection(new EndPointURL("tcp://127.0.0.1:" + port), Duration.ofSeconds(2));
+ connection.connect();
+ connection.write(inbound);
+ byte[] response = connection.getInputStream().readNBytes(outbound.length);
+ connection.close();
+
+ assertArrayEquals(outbound, response);
+ server.get(3, TimeUnit.SECONDS);
+ executor.shutdownNow();
+ }
+ }
+
+ @Test
+ void reconnectsAfterDisconnect() throws Exception {
+ try (ServerSocket serverSocket = new ServerSocket(0)) {
+ int port = serverSocket.getLocalPort();
+ CountDownLatch firstAccepted = new CountDownLatch(1);
+ CountDownLatch secondAccepted = new CountDownLatch(1);
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ Future> server = executor.submit(() -> {
+ try {
+ try (Socket first = serverSocket.accept()) {
+ firstAccepted.countDown();
+ byte[] firstPayload = first.getInputStream().readNBytes(3);
+ assertArrayEquals("one".getBytes(), firstPayload);
+ }
+ try (Socket second = serverSocket.accept()) {
+ secondAccepted.countDown();
+ byte[] secondPayload = second.getInputStream().readNBytes(3);
+ assertArrayEquals("two".getBytes(), secondPayload);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ TakConnectionManager manager = new TakConnectionManager(new TakServerConnection(new EndPointURL("tcp://127.0.0.1:" + port), Duration.ofSeconds(2)));
+ manager.connect();
+ manager.write("one".getBytes());
+ assertTrue(firstAccepted.await(2, TimeUnit.SECONDS));
+
+ manager.reconnect();
+ manager.write("two".getBytes());
+ assertTrue(secondAccepted.await(2, TimeUnit.SECONDS));
+
+ manager.close();
+ server.get(3, TimeUnit.SECONDS);
+ executor.shutdownNow();
+ }
+ }
+
+ @Test
+ void tlsHandshakeFailureProvidesActionableMessage() throws Exception {
+ try (ServerSocket plainServer = new ServerSocket(0)) {
+ int port = plainServer.getLocalPort();
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ Future> server = executor.submit(() -> {
+ try (Socket ignored = plainServer.accept()) {
+ // Non-TLS endpoint used intentionally to force TLS handshake failure.
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ TakServerConnection connection = new TakServerConnection(new EndPointURL("ssl://127.0.0.1:" + port), Duration.ofSeconds(2));
+ IOException exception = assertThrows(IOException.class, connection::connect);
+ assertTrue(exception.getMessage().contains("TLS handshake failed"));
+ server.get(3, TimeUnit.SECONDS);
+ executor.shutdownNow();
+ }
+ }
+
+ @Test
+ void tlsRoundTripWithDefaultTestKeystore() throws Exception {
+ try (var serverSocket = SSLServerSocketFactory.getDefault().createServerSocket(0)) {
+ int port = serverSocket.getLocalPort();
+ byte[] inbound = "hello".getBytes();
+ byte[] outbound = "world".getBytes();
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ Future> server = executor.submit(() -> {
+ try (Socket accepted = serverSocket.accept()) {
+ byte[] buffer = accepted.getInputStream().readNBytes(inbound.length);
+ assertArrayEquals(inbound, buffer);
+ accepted.getOutputStream().write(outbound);
+ accepted.getOutputStream().flush();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ TakServerConnection connection = new TakServerConnection(new EndPointURL("ssl://127.0.0.1:" + port), Duration.ofSeconds(2));
+ connection.connect();
+ connection.write(inbound);
+ byte[] response = connection.getInputStream().readNBytes(outbound.length);
+ connection.close();
+
+ assertArrayEquals(outbound, response);
+ server.get(3, TimeUnit.SECONDS);
+ executor.shutdownNow();
+ }
+ }
+}