diff --git a/src/main/java/io/mapsmessaging/logging/ServerLogMessages.java b/src/main/java/io/mapsmessaging/logging/ServerLogMessages.java
index 4231631b4..3d89bec23 100644
--- a/src/main/java/io/mapsmessaging/logging/ServerLogMessages.java
+++ b/src/main/java/io/mapsmessaging/logging/ServerLogMessages.java
@@ -244,6 +244,17 @@ public enum ServerLogMessages implements LogMessage {
//
+ //
+ TAK_EXTENSION_INITIALISED(LEVEL.INFO, SERVER_CATEGORY.PROTOCOL, "TAK extension initialised for {}"),
+ TAK_EXTENSION_CLOSED(LEVEL.INFO, SERVER_CATEGORY.PROTOCOL, "TAK extension closed for {}"),
+ TAK_EXTENSION_OUTBOUND_FAILED(LEVEL.WARN, SERVER_CATEGORY.PROTOCOL, "TAK outbound failed for destination {}"),
+ TAK_EXTENSION_DECODE_FAILED(LEVEL.WARN, SERVER_CATEGORY.PROTOCOL, "TAK decode failed for transport {}"),
+ TAK_EXTENSION_RECONNECT_ATTEMPT(LEVEL.INFO, SERVER_CATEGORY.NETWORK, "TAK reconnect attempt in {}ms for {}"),
+ TAK_EXTENSION_RECONNECT_SUCCESS(LEVEL.INFO, SERVER_CATEGORY.NETWORK, "TAK reconnect succeeded for {}"),
+ TAK_EXTENSION_RECONNECT_FAILED(LEVEL.WARN, SERVER_CATEGORY.NETWORK, "TAK reconnect failed for {}"),
+ TAK_MULTICAST_IO_FAILED(LEVEL.WARN, SERVER_CATEGORY.NETWORK, "TAK multicast IO failed for {}:{}"),
+ //
+
//
PACKET_SECURITY_VERIFICATION_FAILED(LEVEL.WARN, SERVER_CATEGORY.NETWORK, "Packet integrity verification failed: ip={}, algorithm={}, reason={}, packetLength={}, signatureSize={}"),
PACKET_SECURITY_NOT_INITIALISED(LEVEL.ERROR, SERVER_CATEGORY.NETWORK, "Packet integrity not initialised: algorithm={}"),
diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/extension/Extension.java b/src/main/java/io/mapsmessaging/network/protocol/impl/extension/Extension.java
index c339a94fb..92ad6e903 100644
--- a/src/main/java/io/mapsmessaging/network/protocol/impl/extension/Extension.java
+++ b/src/main/java/io/mapsmessaging/network/protocol/impl/extension/Extension.java
@@ -20,6 +20,7 @@
package io.mapsmessaging.network.protocol.impl.extension;
import io.mapsmessaging.api.message.Message;
+import io.mapsmessaging.network.io.Packet;
import jakarta.validation.constraints.NotNull;
import lombok.Getter;
import lombok.NonNull;
@@ -74,6 +75,10 @@ public String getSessionId() {
public abstract void registerLocalLink(@NonNull @NotNull String destination) throws IOException;
+ public boolean processPacket(@NonNull @NotNull Packet packet) throws IOException {
+ return false;
+ }
+
protected void inbound(@NonNull @NotNull String destinationName, @NonNull @NotNull Message message) throws IOException {
if (extensionProtocol == null) {
diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/extension/ExtensionProtocol.java b/src/main/java/io/mapsmessaging/network/protocol/impl/extension/ExtensionProtocol.java
index 2ae596900..947f2f538 100644
--- a/src/main/java/io/mapsmessaging/network/protocol/impl/extension/ExtensionProtocol.java
+++ b/src/main/java/io/mapsmessaging/network/protocol/impl/extension/ExtensionProtocol.java
@@ -155,7 +155,7 @@ public ProtocolInformationDTO getInformation() {
@Override
public boolean processPacket(@NonNull @NotNull Packet packet) throws IOException {
- return false;
+ return extension.processPacket(packet);
}
@Override
diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakEndPointConnectionFactory.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakEndPointConnectionFactory.java
new file mode 100644
index 000000000..914f8f092
--- /dev/null
+++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakEndPointConnectionFactory.java
@@ -0,0 +1,52 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak;
+
+import io.mapsmessaging.network.EndPointURL;
+import io.mapsmessaging.network.io.EndPoint;
+import io.mapsmessaging.network.io.EndPointConnectedCallback;
+import io.mapsmessaging.network.io.EndPointConnectionFactory;
+import io.mapsmessaging.network.io.EndPointServerStatus;
+import io.mapsmessaging.network.io.impl.SelectorLoadManager;
+import io.mapsmessaging.network.protocol.impl.extension.ExtensionEndPointConnectionFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+public class TakEndPointConnectionFactory implements EndPointConnectionFactory {
+
+ private final ExtensionEndPointConnectionFactory delegate = new ExtensionEndPointConnectionFactory();
+
+ @Override
+ public EndPoint connect(EndPointURL url, SelectorLoadManager selector, EndPointConnectedCallback connectedCallback,
+ EndPointServerStatus endPointServerStatus, List jmxPath) throws IOException {
+ return delegate.connect(url, selector, connectedCallback, endPointServerStatus, jmxPath);
+ }
+
+ @Override
+ public String getName() {
+ return "tak";
+ }
+
+ @Override
+ public String getDescription() {
+ return "TAK extension endpoint factory";
+ }
+}
diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java
new file mode 100644
index 000000000..442800d77
--- /dev/null
+++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java
@@ -0,0 +1,241 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak;
+
+import io.mapsmessaging.api.message.Message;
+import io.mapsmessaging.dto.rest.config.protocol.impl.ExtensionConfigDTO;
+import io.mapsmessaging.logging.Logger;
+import io.mapsmessaging.logging.LoggerFactory;
+import io.mapsmessaging.logging.ServerLogMessages;
+import io.mapsmessaging.network.EndPointURL;
+import io.mapsmessaging.network.io.EndPoint;
+import io.mapsmessaging.network.io.Packet;
+import io.mapsmessaging.network.protocol.impl.extension.Extension;
+import io.mapsmessaging.network.protocol.impl.tak.codec.CotXmlCodec;
+import io.mapsmessaging.network.protocol.impl.tak.codec.TakPayloadCodec;
+import io.mapsmessaging.network.protocol.impl.tak.codec.TakProtobufCodec;
+import io.mapsmessaging.network.protocol.impl.tak.framing.TakFrameReader;
+import io.mapsmessaging.network.protocol.impl.tak.framing.TakStreamFramer;
+import io.mapsmessaging.network.protocol.impl.tak.transport.TakMulticastTransport;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class TakExtension extends Extension {
+
+ private static final String PROTOCOL_NAME = "TAK";
+ private static final String PROTOCOL_VERSION = "1.0";
+ private static final String DEFAULT_EVENT_TYPE = "unknown";
+ private static final String DEFAULT_DESTINATION_PREFIX = "tak/cot/";
+ private static final String WILDCARD_MARKER = "#";
+
+ private final EndPointURL url;
+ private final Logger logger;
+ private final EndPoint endPoint;
+ private final TakExtensionConfig config;
+ private final TakPayloadCodec payloadCodec;
+ private final TakStreamFramer streamFramer;
+ private final TakMulticastTransport multicastTransport;
+ private final TakFrameReader frameReader;
+ private final Set remoteLinks;
+ private final AtomicBoolean running;
+ private volatile Thread multicastReaderThread;
+
+ public TakExtension(EndPoint endPoint, @Nullable ExtensionConfigDTO extensionConfig) {
+ this.endPoint = endPoint;
+ this.url = new EndPointURL(endPoint.getConfig().getUrl());
+ this.logger = LoggerFactory.getLogger(TakExtension.class);
+ this.config = TakExtensionConfig.from(extensionConfig);
+ CotXmlCodec cotXmlCodec = CotXmlCodec.withSchemaFormatter(
+ config.isXmlNamespaceAware(),
+ config.isXmlCoalescing(),
+ config.isXmlValidating(),
+ config.getXmlRootEntry(),
+ config.getXmlSchemaId());
+ this.payloadCodec = TakExtensionConfig.PAYLOAD_TAK_PROTO_V1.equalsIgnoreCase(config.getPayload())
+ ? TakProtobufCodec.withSchemaFormatter(
+ cotXmlCodec,
+ config.getMaxPayloadBytes(),
+ config.getProtobufDescriptorBase64(),
+ config.getProtobufMessageName(),
+ config.getProtobufSchemaId())
+ : cotXmlCodec;
+ this.streamFramer = new TakStreamFramer(config.getFramingMode(), config.getMaxPayloadBytes());
+ this.multicastTransport = config.isMulticastEnabled() ? new TakMulticastTransport(config) : null;
+ this.frameReader = new TakFrameReader(streamFramer, config.getReadBufferBytes());
+ this.remoteLinks = ConcurrentHashMap.newKeySet();
+ this.running = new AtomicBoolean(false);
+ }
+
+ @Override
+ public void initialise() throws IOException {
+ if (multicastTransport != null) {
+ multicastTransport.start();
+ }
+ logger.log(ServerLogMessages.TAK_EXTENSION_INITIALISED, url.toString());
+ running.set(true);
+ if (multicastTransport != null && config.isMulticastIngressEnabled()) {
+ multicastReaderThread = new Thread(this::multicastReaderLoop, "tak-mcast-reader-" + config.getMulticastGroup() + "-" + config.getMulticastPort());
+ multicastReaderThread.setDaemon(true);
+ multicastReaderThread.start();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ running.set(false);
+ Thread multicastThread = multicastReaderThread;
+ if (multicastThread != null) {
+ multicastThread.interrupt();
+ try {
+ multicastThread.join(1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ if (multicastTransport != null) {
+ multicastTransport.close();
+ }
+ logger.log(ServerLogMessages.TAK_EXTENSION_CLOSED, url.toString());
+ }
+
+ @Override
+ public String getName() {
+ return PROTOCOL_NAME;
+ }
+
+ @Override
+ public String getVersion() {
+ return PROTOCOL_VERSION;
+ }
+
+ @Override
+ public boolean supportsRemoteFiltering() {
+ return false;
+ }
+
+ @Override
+ public void outbound(String destinationName, Message message) {
+ if (!running.get()) {
+ return;
+ }
+ try {
+ byte[] payload = payloadCodec.encode(message);
+ try {
+ byte[] framed = streamFramer.frame(payload);
+ endPoint.sendPacket(new Packet(ByteBuffer.wrap(framed)));
+ } catch (IOException ignored) {
+ // Stream path is best-effort; multicast egress may still succeed.
+ }
+ if (multicastTransport != null && config.isMulticastEgressEnabled()) {
+ try {
+ multicastTransport.send(payload);
+ endPoint.updateWriteBytes(payload.length);
+ } catch (IOException ignored) {
+ logger.log(ServerLogMessages.TAK_MULTICAST_IO_FAILED, config.getMulticastGroup(), config.getMulticastPort());
+ }
+ }
+ } catch (IOException ignored) {
+ logger.log(ServerLogMessages.TAK_EXTENSION_OUTBOUND_FAILED, destinationName);
+ }
+ }
+
+ @Override
+ public void registerRemoteLink(String destination, @Nullable String filter) {
+ remoteLinks.add(destination);
+ }
+
+ @Override
+ public void registerLocalLink(String destination) {
+ // Egress mapping is already handled by ExtensionProtocol/Protocol.parseOutboundMessage.
+ }
+
+ @Override
+ public boolean processPacket(@NotNull Packet packet) throws IOException {
+ if (!running.get()) {
+ return false;
+ }
+ ByteBuffer raw = packet.getRawBuffer();
+ if (raw == null || raw.remaining() <= 0) {
+ return false;
+ }
+ for (byte[] frame : frameReader.read(raw)) {
+ processInboundFrame(frame, "stream");
+ }
+ packet.position(packet.limit());
+ return true;
+ }
+
+ private void multicastReaderLoop() {
+ while (running.get()) {
+ try {
+ Optional frame = multicastTransport.read();
+ if (frame.isEmpty()) {
+ continue;
+ }
+ endPoint.updateReadBytes(frame.get().length);
+ processInboundFrame(frame.get(), "multicast");
+ } catch (IOException ex) {
+ if (!running.get()) {
+ break;
+ }
+ logger.log(ServerLogMessages.TAK_MULTICAST_IO_FAILED, config.getMulticastGroup(), config.getMulticastPort());
+ }
+ }
+ }
+
+ private String resolveInboundDestination(Message message) {
+ String eventType = null;
+ if (message.getMeta() != null) {
+ eventType = message.getMeta().get("tak.type");
+ if (eventType == null || eventType.isBlank()) {
+ eventType = message.getMeta().get("tak_type");
+ }
+ }
+ if (eventType == null || eventType.isBlank()) {
+ eventType = DEFAULT_EVENT_TYPE;
+ }
+ if (!remoteLinks.isEmpty()) {
+ String remoteDestination = remoteLinks.iterator().next();
+ if (remoteDestination.contains(WILDCARD_MARKER)) {
+ return remoteDestination.replace(WILDCARD_MARKER, eventType);
+ }
+ return remoteDestination;
+ }
+ return DEFAULT_DESTINATION_PREFIX + eventType;
+ }
+
+ private void processInboundFrame(byte[] frame, String source) {
+ try {
+ Message message = payloadCodec.decode(frame);
+ String destination = resolveInboundDestination(message);
+ inbound(destination, message);
+ } catch (IOException decodeFailure) {
+ logger.log(ServerLogMessages.TAK_EXTENSION_DECODE_FAILED, source);
+ }
+ }
+}
diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java
new file mode 100644
index 000000000..737a62a8e
--- /dev/null
+++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java
@@ -0,0 +1,221 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak;
+
+import io.mapsmessaging.dto.rest.config.protocol.impl.ExtensionConfigDTO;
+import io.mapsmessaging.network.protocol.impl.tak.framing.TakStreamFramer;
+import lombok.Getter;
+
+import java.util.Locale;
+import java.util.Map;
+
+@Getter
+public class TakExtensionConfig {
+
+ public static final String PAYLOAD_COT_XML = "cot_xml";
+ public static final String PAYLOAD_TAK_PROTO_V1 = "tak_proto_v1";
+ private static final int DEFAULT_MAX_PAYLOAD = 1024 * 1024;
+ private static final int DEFAULT_RECONNECT_MS = 2000;
+ private static final int DEFAULT_RECONNECT_MAX_MS = 30000;
+ private static final double DEFAULT_RECONNECT_MULTIPLIER = 2.0d;
+ private static final int DEFAULT_RECONNECT_JITTER_MS = 250;
+ private static final int DEFAULT_READ_BUFFER = 8192;
+ private static final String DEFAULT_MULTICAST_GROUP = "239.2.3.1";
+ private static final int DEFAULT_MULTICAST_PORT = 6969;
+ private static final int DEFAULT_MULTICAST_TTL = 1;
+ private static final String DEFAULT_PROTOBUF_DESCRIPTOR_BASE64 = "";
+ private static final String DEFAULT_PROTOBUF_MESSAGE_NAME = "";
+ private static final String DEFAULT_XML_SCHEMA_ID = "";
+ private static final String DEFAULT_PROTOBUF_SCHEMA_ID = "";
+ private static final boolean DEFAULT_XML_NAMESPACE_AWARE = true;
+ private static final boolean DEFAULT_XML_COALESCING = true;
+ private static final boolean DEFAULT_XML_VALIDATING = false;
+ private static final String DEFAULT_XML_ROOT_ENTRY = "event";
+
+ private final String payload;
+ private final TakStreamFramer.Mode framingMode;
+ private final int maxPayloadBytes;
+ private final int reconnectDelayMs;
+ private final int reconnectMaxDelayMs;
+ private final double reconnectBackoffMultiplier;
+ private final int reconnectJitterMs;
+ private final int readBufferBytes;
+ private final boolean multicastEnabled;
+ private final boolean multicastIngressEnabled;
+ private final boolean multicastEgressEnabled;
+ private final String multicastGroup;
+ private final int multicastPort;
+ private final String multicastInterface;
+ private final int multicastTtl;
+ private final int multicastReadBufferBytes;
+ private final String protobufDescriptorBase64;
+ private final String protobufMessageName;
+ private final String xmlSchemaId;
+ private final String protobufSchemaId;
+ private final boolean xmlNamespaceAware;
+ private final boolean xmlCoalescing;
+ private final boolean xmlValidating;
+ private final String xmlRootEntry;
+
+ private TakExtensionConfig(String payload, TakStreamFramer.Mode framingMode, int maxPayloadBytes, int reconnectDelayMs,
+ int reconnectMaxDelayMs, double reconnectBackoffMultiplier, int reconnectJitterMs, int readBufferBytes,
+ boolean multicastEnabled, boolean multicastIngressEnabled, boolean multicastEgressEnabled,
+ String multicastGroup, int multicastPort, String multicastInterface, int multicastTtl, int multicastReadBufferBytes,
+ String protobufDescriptorBase64, String protobufMessageName,
+ String xmlSchemaId, String protobufSchemaId,
+ boolean xmlNamespaceAware, boolean xmlCoalescing, boolean xmlValidating, String xmlRootEntry) {
+ this.payload = payload;
+ this.framingMode = framingMode;
+ this.maxPayloadBytes = maxPayloadBytes;
+ this.reconnectDelayMs = reconnectDelayMs;
+ this.reconnectMaxDelayMs = reconnectMaxDelayMs;
+ this.reconnectBackoffMultiplier = reconnectBackoffMultiplier;
+ this.reconnectJitterMs = reconnectJitterMs;
+ this.readBufferBytes = readBufferBytes;
+ this.multicastEnabled = multicastEnabled;
+ this.multicastIngressEnabled = multicastIngressEnabled;
+ this.multicastEgressEnabled = multicastEgressEnabled;
+ this.multicastGroup = multicastGroup;
+ this.multicastPort = multicastPort;
+ this.multicastInterface = multicastInterface;
+ this.multicastTtl = multicastTtl;
+ this.multicastReadBufferBytes = multicastReadBufferBytes;
+ this.protobufDescriptorBase64 = protobufDescriptorBase64;
+ this.protobufMessageName = protobufMessageName;
+ this.xmlSchemaId = xmlSchemaId;
+ this.protobufSchemaId = protobufSchemaId;
+ this.xmlNamespaceAware = xmlNamespaceAware;
+ this.xmlCoalescing = xmlCoalescing;
+ this.xmlValidating = xmlValidating;
+ this.xmlRootEntry = xmlRootEntry;
+ }
+
+ public static TakExtensionConfig from(ExtensionConfigDTO extensionConfig) {
+ Map config = extensionConfig != null ? extensionConfig.getConfig() : null;
+ String payload = asString(config, "payload", PAYLOAD_COT_XML).toLowerCase(Locale.ROOT);
+ String framing = asString(config, "framing", "xml_stream").toLowerCase(Locale.ROOT);
+ TakStreamFramer.Mode mode = "proto_stream".equals(framing) ? TakStreamFramer.Mode.PROTO_STREAM : TakStreamFramer.Mode.XML_STREAM;
+ int maxPayload = asInt(config, "max_payload_bytes", DEFAULT_MAX_PAYLOAD);
+ int reconnectMs = asInt(config, "reconnect_delay_ms", DEFAULT_RECONNECT_MS);
+ int reconnectMaxMs = asInt(config, "reconnect_max_delay_ms", DEFAULT_RECONNECT_MAX_MS);
+ double reconnectMultiplier = asDouble(config, "reconnect_backoff_multiplier", DEFAULT_RECONNECT_MULTIPLIER);
+ int reconnectJitter = asInt(config, "reconnect_jitter_ms", DEFAULT_RECONNECT_JITTER_MS);
+ int readBuffer = asInt(config, "read_buffer_bytes", DEFAULT_READ_BUFFER);
+ boolean multicastEnabled = asBoolean(config, "multicast_enabled", false);
+ boolean multicastIngressEnabled = asBoolean(config, "multicast_ingress_enabled", multicastEnabled);
+ boolean multicastEgressEnabled = asBoolean(config, "multicast_egress_enabled", multicastEnabled);
+ String multicastGroup = asString(config, "multicast_group", DEFAULT_MULTICAST_GROUP).trim();
+ int multicastPort = asInt(config, "multicast_port", DEFAULT_MULTICAST_PORT);
+ String multicastInterface = asString(config, "multicast_interface", "").trim();
+ int multicastTtl = asInt(config, "multicast_ttl", DEFAULT_MULTICAST_TTL);
+ int multicastReadBuffer = asInt(config, "multicast_read_buffer_bytes", DEFAULT_READ_BUFFER);
+ String protobufDescriptorBase64 = asString(config, "protobuf_descriptor_base64", DEFAULT_PROTOBUF_DESCRIPTOR_BASE64).trim();
+ String protobufMessageName = asString(config, "protobuf_message_name", DEFAULT_PROTOBUF_MESSAGE_NAME).trim();
+ String xmlSchemaId = asString(config, "xml_schema_id", DEFAULT_XML_SCHEMA_ID).trim();
+ String protobufSchemaId = asString(config, "protobuf_schema_id", DEFAULT_PROTOBUF_SCHEMA_ID).trim();
+ boolean xmlNamespaceAware = asBoolean(config, "xml_namespace_aware", DEFAULT_XML_NAMESPACE_AWARE);
+ boolean xmlCoalescing = asBoolean(config, "xml_coalescing", DEFAULT_XML_COALESCING);
+ boolean xmlValidating = asBoolean(config, "xml_validating", DEFAULT_XML_VALIDATING);
+ String xmlRootEntry = asString(config, "xml_root_entry", DEFAULT_XML_ROOT_ENTRY).trim();
+ int reconnectBase = Math.max(100, reconnectMs);
+ int reconnectMax = Math.max(reconnectBase, reconnectMaxMs);
+ return new TakExtensionConfig(payload, mode, Math.max(1024, maxPayload), reconnectBase,
+ reconnectMax, clampMultiplier(reconnectMultiplier), Math.max(0, reconnectJitter), Math.max(512, readBuffer),
+ multicastEnabled, multicastIngressEnabled, multicastEgressEnabled,
+ multicastGroup.isEmpty() ? DEFAULT_MULTICAST_GROUP : multicastGroup,
+ Math.max(1, multicastPort),
+ multicastInterface,
+ Math.max(1, Math.min(255, multicastTtl)),
+ Math.max(512, multicastReadBuffer),
+ protobufDescriptorBase64,
+ protobufMessageName,
+ xmlSchemaId,
+ protobufSchemaId,
+ xmlNamespaceAware,
+ xmlCoalescing,
+ xmlValidating,
+ xmlRootEntry.isEmpty() ? DEFAULT_XML_ROOT_ENTRY : xmlRootEntry);
+ }
+
+ private static String asString(Map config, String key, String def) {
+ if (config == null) {
+ return def;
+ }
+ Object val = config.get(key);
+ return val == null ? def : val.toString();
+ }
+
+ private static int asInt(Map config, String key, int def) {
+ if (config == null) {
+ return def;
+ }
+ Object val = config.get(key);
+ if (val == null) {
+ return def;
+ }
+ if (val instanceof Number number) {
+ return number.intValue();
+ }
+ try {
+ return Integer.parseInt(val.toString());
+ } catch (NumberFormatException ignored) {
+ return def;
+ }
+ }
+
+ private static double asDouble(Map config, String key, double def) {
+ if (config == null) {
+ return def;
+ }
+ Object val = config.get(key);
+ if (val == null) {
+ return def;
+ }
+ if (val instanceof Number number) {
+ return number.doubleValue();
+ }
+ try {
+ return Double.parseDouble(val.toString());
+ } catch (NumberFormatException ignored) {
+ return def;
+ }
+ }
+
+ private static double clampMultiplier(double multiplier) {
+ if (Double.isNaN(multiplier) || Double.isInfinite(multiplier)) {
+ return DEFAULT_RECONNECT_MULTIPLIER;
+ }
+ return Math.max(1.0d, Math.min(10.0d, multiplier));
+ }
+
+ private static boolean asBoolean(Map config, String key, boolean def) {
+ if (config == null) {
+ return def;
+ }
+ Object val = config.get(key);
+ if (val == null) {
+ return def;
+ }
+ if (val instanceof Boolean bool) {
+ return bool;
+ }
+ return Boolean.parseBoolean(val.toString());
+ }
+}
diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakProtocolFactory.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakProtocolFactory.java
new file mode 100644
index 000000000..db353b2f6
--- /dev/null
+++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakProtocolFactory.java
@@ -0,0 +1,74 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak;
+
+import io.mapsmessaging.dto.rest.config.protocol.ProtocolConfigDTO;
+import io.mapsmessaging.dto.rest.config.protocol.impl.ExtensionConfigDTO;
+import io.mapsmessaging.network.io.EndPoint;
+import io.mapsmessaging.network.io.Packet;
+import io.mapsmessaging.network.protocol.Protocol;
+import io.mapsmessaging.network.protocol.ProtocolImplFactory;
+import io.mapsmessaging.network.protocol.detection.NoOpDetection;
+import io.mapsmessaging.network.protocol.impl.extension.ExtensionEndPoint;
+import io.mapsmessaging.network.protocol.impl.extension.ExtensionProtocol;
+
+import java.io.IOException;
+
+public class TakProtocolFactory extends ProtocolImplFactory {
+
+ public TakProtocolFactory() {
+ super("tak", "TAK Cursor-on-Target extension protocol", new NoOpDetection());
+ }
+
+ @Override
+ public Protocol connect(EndPoint endPoint, String sessionId, String username, String password) throws IOException {
+ ExtensionConfigDTO extensionConfig;
+ if (endPoint instanceof ExtensionEndPoint extensionEndPoint && extensionEndPoint.config() instanceof ExtensionConfigDTO config) {
+ extensionConfig = config;
+ } else {
+ extensionConfig = null;
+ if (endPoint.getConfig() != null && endPoint.getConfig().getProtocolConfigs() != null) {
+ for (ProtocolConfigDTO protocolConfig : endPoint.getConfig().getProtocolConfigs()) {
+ if (protocolConfig instanceof ExtensionConfigDTO config && getName().equalsIgnoreCase(config.getProtocol())) {
+ extensionConfig = config;
+ break;
+ }
+ }
+ }
+ if (extensionConfig == null) {
+ throw new IOException("TAK extension configuration not found");
+ }
+ }
+
+ Protocol protocol = new ExtensionProtocol(endPoint, new TakExtension(endPoint, extensionConfig));
+ protocol.connect(sessionId, username, password);
+ return protocol;
+ }
+
+ @Override
+ public void create(EndPoint endPoint, Packet packet) {
+ // TAK extension does not accept inbound client sockets.
+ }
+
+ @Override
+ public String getTransportType() {
+ return "tak";
+ }
+}
diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodec.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodec.java
new file mode 100644
index 000000000..fde8a5e62
--- /dev/null
+++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodec.java
@@ -0,0 +1,322 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak.codec;
+
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.WireFormat;
+import io.mapsmessaging.api.MessageBuilder;
+import io.mapsmessaging.api.features.QualityOfService;
+import io.mapsmessaging.api.message.Message;
+import io.mapsmessaging.engine.schema.SchemaManager;
+import io.mapsmessaging.schemas.config.impl.XmlSchemaConfig;
+import io.mapsmessaging.schemas.formatters.MessageFormatter;
+import io.mapsmessaging.schemas.formatters.MessageFormatterFactory;
+import io.mapsmessaging.selector.IdentifierResolver;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+
+import javax.xml.XMLConstants;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import java.io.IOException;
+import java.io.StringReader;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class CotXmlCodec implements TakPayloadCodec {
+
+ private final MessageFormatter xmlFormatter;
+ private final String schemaId;
+
+ public CotXmlCodec() {
+ this(null, SchemaManager.DEFAULT_XML_SCHEMA.toString());
+ }
+
+ public CotXmlCodec(MessageFormatter xmlFormatter) {
+ this(xmlFormatter, SchemaManager.DEFAULT_XML_SCHEMA.toString());
+ }
+
+ public CotXmlCodec(MessageFormatter xmlFormatter, String schemaId) {
+ this.xmlFormatter = xmlFormatter;
+ this.schemaId = schemaId;
+ }
+
+ public static CotXmlCodec withSchemaFormatter(boolean namespaceAware, boolean coalescing, boolean validating, String rootEntry) {
+ return withSchemaFormatter(namespaceAware, coalescing, validating, rootEntry, SchemaManager.DEFAULT_XML_SCHEMA.toString());
+ }
+
+ public static CotXmlCodec withSchemaFormatter(boolean namespaceAware, boolean coalescing, boolean validating, String rootEntry, String schemaId) {
+ return new CotXmlCodec(createFormatter(namespaceAware, coalescing, validating, rootEntry), schemaId);
+ }
+
+ @Override
+ public Message decode(byte[] payload) throws IOException {
+ String xml = new String(payload, StandardCharsets.UTF_8);
+ Element event = parseRootEvent(xml);
+ Element point = extractPoint(event);
+
+ String uid = required(event, "uid");
+ String type = required(event, "type");
+ String time = required(event, "time");
+ String start = required(event, "start");
+ String stale = required(event, "stale");
+ String how = required(event, "how");
+ String lat = required(point, "lat");
+ String lon = required(point, "lon");
+
+ Map meta = new LinkedHashMap<>();
+ meta.put("tak.uid", uid);
+ meta.put("tak.type", type);
+ meta.put("tak.time", time);
+ meta.put("tak.start", start);
+ meta.put("tak.stale", stale);
+ meta.put("tak.how", how);
+ meta.put("tak.lat", lat);
+ meta.put("tak.lon", lon);
+ putIfPresent(meta, "tak.hae", point.getAttribute("hae"));
+ putIfPresent(meta, "tak.ce", point.getAttribute("ce"));
+ putIfPresent(meta, "tak.le", point.getAttribute("le"));
+ meta.put("tak.format", "xml");
+ meta.put("tak.transport", "stream");
+ mergeSchemaParsedMeta(meta, payload);
+ addSelectorAliases(meta);
+
+ MessageBuilder builder = new MessageBuilder()
+ .setOpaqueData(payload)
+ .setMeta(meta)
+ .setContentType("application/cot+xml")
+ .setQoS(QualityOfService.AT_MOST_ONCE);
+ if (schemaId != null && !schemaId.isBlank()) {
+ builder.setSchemaId(schemaId);
+ }
+ return builder.build();
+ }
+
+ @Override
+ public byte[] encode(Message message) throws IOException {
+ byte[] opaque = message.getOpaqueData();
+ byte[] cloudEventPayload = TakCloudEventPayloadExtractor.tryExtractPayload(opaque);
+ if (cloudEventPayload != null) {
+ opaque = cloudEventPayload;
+ }
+ if (opaque != null && opaque.length > 0) {
+ String candidate = new String(opaque, StandardCharsets.UTF_8).trim();
+ if (candidate.startsWith(" meta = message.getMeta() != null ? message.getMeta() : new LinkedHashMap<>();
+ String now = Instant.now().truncatedTo(ChronoUnit.SECONDS).toString();
+ String stale = Instant.now().plus(5, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.SECONDS).toString();
+
+ String uid = getOrDefault(meta, "tak.uid", "maps-generated");
+ String type = getOrDefault(meta, "tak.type", "a-f-G-U-C");
+ String time = getOrDefault(meta, "tak.time", now);
+ String start = getOrDefault(meta, "tak.start", now);
+ String staleTime = getOrDefault(meta, "tak.stale", stale);
+ String how = getOrDefault(meta, "tak.how", "m-g");
+ String lat = getOrDefault(meta, "tak.lat", "0");
+ String lon = getOrDefault(meta, "tak.lon", "0");
+ String hae = getOrDefault(meta, "tak.hae", "0");
+ String ce = getOrDefault(meta, "tak.ce", "9999999");
+ String le = getOrDefault(meta, "tak.le", "9999999");
+
+ String xml = ""
+ + ""
+ + "";
+ return xml.getBytes(StandardCharsets.UTF_8);
+ }
+
+ private static byte[] tryExtractEmbeddedCotFromProtobuf(byte[] payload) {
+ try {
+ CodedInputStream input = CodedInputStream.newInstance(payload);
+ while (!input.isAtEnd()) {
+ int tag = input.readTag();
+ if (tag == 0) {
+ break;
+ }
+ int wireType = WireFormat.getTagWireType(tag);
+ if (wireType == WireFormat.WIRETYPE_LENGTH_DELIMITED) {
+ byte[] candidate = input.readByteArray();
+ if (candidate.length > 0) {
+ String xmlCandidate = new String(candidate, StandardCharsets.UTF_8).trim();
+ if (xmlCandidate.startsWith(" meta, String key, String def) {
+ String value = meta.get(key);
+ return value == null || value.isBlank() ? def : value;
+ }
+
+ private static void putIfPresent(Map meta, String key, String value) {
+ if (value != null && !value.isBlank()) {
+ meta.put(key, value);
+ }
+ }
+
+ private static Element parseRootEvent(String xml) throws IOException {
+ try {
+ DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+ dbf.setNamespaceAware(true);
+ dbf.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
+ dbf.setFeature("http://xml.org/sax/features/external-general-entities", false);
+ dbf.setFeature("http://xml.org/sax/features/external-parameter-entities", false);
+ dbf.setFeature(XMLConstants.FEATURE_SECURE_PROCESSING, true);
+ dbf.setExpandEntityReferences(false);
+ dbf.setXIncludeAware(false);
+ dbf.setAttribute(XMLConstants.ACCESS_EXTERNAL_DTD, "");
+ dbf.setAttribute(XMLConstants.ACCESS_EXTERNAL_SCHEMA, "");
+ DocumentBuilder builder = dbf.newDocumentBuilder();
+ Document document = builder.parse(new InputSource(new StringReader(xml)));
+ Element root = document.getDocumentElement();
+ if (root == null || !"event".equalsIgnoreCase(root.getLocalName() == null ? root.getNodeName() : root.getLocalName())) {
+ throw new IOException("Invalid CoT XML payload: root element must be event");
+ }
+ return root;
+ } catch (ParserConfigurationException | SAXException ex) {
+ throw new IOException("Invalid CoT XML payload", ex);
+ }
+ }
+
+ private static Element extractPoint(Element root) throws IOException {
+ NodeList byNs = root.getElementsByTagNameNS("*", "point");
+ if (byNs.getLength() > 0 && byNs.item(0) instanceof Element element) {
+ return element;
+ }
+ NodeList byName = root.getElementsByTagName("point");
+ if (byName.getLength() > 0 && byName.item(0) instanceof Element element) {
+ return element;
+ }
+ throw new IOException("Invalid CoT XML payload: missing point element");
+ }
+
+ private static String required(Element element, String name) throws IOException {
+ String value = element.getAttribute(name);
+ if (value == null || value.isBlank()) {
+ throw new IOException("Invalid CoT XML payload: missing " + name);
+ }
+ return value;
+ }
+
+ private static String esc(String value) {
+ return value
+ .replace("&", "&")
+ .replace("\"", """)
+ .replace("<", "<")
+ .replace(">", ">");
+ }
+
+ private void mergeSchemaParsedMeta(Map meta, byte[] payload) {
+ if (xmlFormatter == null) {
+ return;
+ }
+ try {
+ IdentifierResolver parsed = xmlFormatter.parse(payload);
+ if (parsed == null) {
+ return;
+ }
+ promoteAny(parsed, meta, "tak.uid", "uid", "event.uid");
+ promoteAny(parsed, meta, "tak.type", "type", "event.type");
+ promoteAny(parsed, meta, "tak.time", "time", "event.time");
+ promoteAny(parsed, meta, "tak.start", "start", "event.start");
+ promoteAny(parsed, meta, "tak.stale", "stale", "event.stale");
+ promoteAny(parsed, meta, "tak.how", "how", "event.how");
+ promoteAny(parsed, meta, "tak.lat", "point.lat", "event.point.lat");
+ promoteAny(parsed, meta, "tak.lon", "point.lon", "event.point.lon");
+ promoteAny(parsed, meta, "tak.hae", "point.hae", "event.point.hae");
+ promoteAny(parsed, meta, "tak.ce", "point.ce", "event.point.ce");
+ promoteAny(parsed, meta, "tak.le", "point.le", "event.point.le");
+ meta.put("tak.xml_schema_parsed", "true");
+ } catch (Exception ignored) {
+ // Preserve default CoT metadata extraction if formatter parsing fails.
+ }
+ }
+
+ private static void promoteAny(IdentifierResolver parsed, Map meta, String toKey, String... fromKeys) {
+ for (String fromKey : fromKeys) {
+ Object val = parsed.get(fromKey);
+ if (val != null) {
+ meta.put(toKey, val.toString());
+ return;
+ }
+ }
+ }
+
+ private static void addSelectorAliases(Map meta) {
+ alias(meta, "tak.uid", "tak_uid");
+ alias(meta, "tak.type", "tak_type");
+ alias(meta, "tak.time", "tak_time");
+ alias(meta, "tak.start", "tak_start");
+ alias(meta, "tak.stale", "tak_stale");
+ alias(meta, "tak.how", "tak_how");
+ alias(meta, "tak.lat", "tak_lat");
+ alias(meta, "tak.lon", "tak_lon");
+ alias(meta, "tak.hae", "tak_hae");
+ alias(meta, "tak.ce", "tak_ce");
+ alias(meta, "tak.le", "tak_le");
+ alias(meta, "tak.format", "tak_format");
+ alias(meta, "tak.transport", "tak_transport");
+ }
+
+ private static void alias(Map meta, String sourceKey, String aliasKey) {
+ String value = meta.get(sourceKey);
+ if (value != null && !value.isBlank()) {
+ meta.put(aliasKey, value);
+ }
+ }
+
+ private static MessageFormatter createFormatter(boolean namespaceAware, boolean coalescing, boolean validating, String rootEntry) {
+ try {
+ XmlSchemaConfig schemaConfig = new XmlSchemaConfig();
+ XmlSchemaConfig.XmlConfig xmlConfig = new XmlSchemaConfig.XmlConfig();
+ xmlConfig.setNamespaceAware(namespaceAware);
+ xmlConfig.setCoalescing(coalescing);
+ xmlConfig.setValidating(validating);
+ xmlConfig.setRootEntry(rootEntry == null || rootEntry.isBlank() ? "event" : rootEntry);
+ schemaConfig.setConfig(xmlConfig);
+ return MessageFormatterFactory.getInstance().getFormatter(schemaConfig);
+ } catch (Exception ignored) {
+ return null;
+ }
+ }
+}
diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakCloudEventPayloadExtractor.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakCloudEventPayloadExtractor.java
new file mode 100644
index 000000000..5d482358d
--- /dev/null
+++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakCloudEventPayloadExtractor.java
@@ -0,0 +1,72 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak.codec;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+
+final class TakCloudEventPayloadExtractor {
+
+ private TakCloudEventPayloadExtractor() {
+ }
+
+ static byte[] tryExtractPayload(byte[] opaqueData) {
+ if (opaqueData == null || opaqueData.length == 0) {
+ return null;
+ }
+ String jsonCandidate = new String(opaqueData, StandardCharsets.UTF_8).trim();
+ if (!jsonCandidate.startsWith("{")) {
+ return null;
+ }
+ try {
+ JsonElement rootElement = JsonParser.parseString(jsonCandidate);
+ if (!rootElement.isJsonObject()) {
+ return null;
+ }
+ JsonObject root = rootElement.getAsJsonObject();
+ if (!root.has("specversion")) {
+ return null;
+ }
+
+ JsonElement base64Element = root.get("data_base64");
+ if (base64Element != null && base64Element.isJsonPrimitive()) {
+ return Base64.getDecoder().decode(base64Element.getAsString());
+ }
+
+ JsonElement dataElement = root.get("data");
+ if (dataElement == null || !dataElement.isJsonObject()) {
+ return null;
+ }
+ JsonObject data = dataElement.getAsJsonObject();
+ JsonElement payloadBase64 = data.get("payload_base64");
+ if (payloadBase64 != null && payloadBase64.isJsonPrimitive()) {
+ return Base64.getDecoder().decode(payloadBase64.getAsString());
+ }
+ return null;
+ } catch (Exception ignored) {
+ return null;
+ }
+ }
+}
+
diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakPayloadCodec.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakPayloadCodec.java
new file mode 100644
index 000000000..0e24b9ca7
--- /dev/null
+++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakPayloadCodec.java
@@ -0,0 +1,31 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak.codec;
+
+import io.mapsmessaging.api.message.Message;
+
+import java.io.IOException;
+
+public interface TakPayloadCodec {
+
+ Message decode(byte[] payload) throws IOException;
+
+ byte[] encode(Message message) throws IOException;
+}
diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodec.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodec.java
new file mode 100644
index 000000000..d28665214
--- /dev/null
+++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodec.java
@@ -0,0 +1,244 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak.codec;
+
+import com.google.protobuf.CodedInputStream;
+import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.WireFormat;
+import io.mapsmessaging.api.MessageBuilder;
+import io.mapsmessaging.api.message.Message;
+import io.mapsmessaging.schemas.config.impl.ProtoBufSchemaConfig;
+import io.mapsmessaging.schemas.formatters.MessageFormatter;
+import io.mapsmessaging.schemas.formatters.MessageFormatterFactory;
+import io.mapsmessaging.selector.IdentifierResolver;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class TakProtobufCodec implements TakPayloadCodec {
+
+ private static final int DEFAULT_MAX_PAYLOAD_BYTES = 1024 * 1024;
+ private final CotXmlCodec cotXmlCodec;
+ private final int maxPayloadBytes;
+ private final MessageFormatter protobufFormatter;
+ private final String protobufSchemaId;
+
+ public TakProtobufCodec() {
+ this(new CotXmlCodec(), DEFAULT_MAX_PAYLOAD_BYTES, null, null);
+ }
+
+ public TakProtobufCodec(CotXmlCodec cotXmlCodec, int maxPayloadBytes) {
+ this(cotXmlCodec, maxPayloadBytes, null, null);
+ }
+
+ public TakProtobufCodec(CotXmlCodec cotXmlCodec, int maxPayloadBytes, MessageFormatter protobufFormatter) {
+ this(cotXmlCodec, maxPayloadBytes, protobufFormatter, null);
+ }
+
+ public TakProtobufCodec(CotXmlCodec cotXmlCodec, int maxPayloadBytes, MessageFormatter protobufFormatter, String protobufSchemaId) {
+ this.cotXmlCodec = cotXmlCodec;
+ this.maxPayloadBytes = Math.max(1024, maxPayloadBytes);
+ this.protobufFormatter = protobufFormatter;
+ this.protobufSchemaId = protobufSchemaId;
+ }
+
+ public static TakProtobufCodec withSchemaFormatter(CotXmlCodec cotXmlCodec, int maxPayloadBytes,
+ String descriptorBase64, String messageName) {
+ return withSchemaFormatter(cotXmlCodec, maxPayloadBytes, descriptorBase64, messageName, null);
+ }
+
+ public static TakProtobufCodec withSchemaFormatter(CotXmlCodec cotXmlCodec, int maxPayloadBytes,
+ String descriptorBase64, String messageName, String protobufSchemaId) {
+ return new TakProtobufCodec(cotXmlCodec, maxPayloadBytes, createFormatter(descriptorBase64, messageName), protobufSchemaId);
+ }
+
+ @Override
+ public Message decode(byte[] payload) throws IOException {
+ if (payload == null || payload.length == 0) {
+ throw new IOException("Invalid TAK protobuf payload: empty");
+ }
+ if (payload.length > maxPayloadBytes) {
+ throw new IOException("Invalid TAK protobuf payload: exceeds max size");
+ }
+
+ Map meta = new LinkedHashMap<>();
+ meta.put("tak.format", "protobuf");
+ meta.put("tak.transport", "stream");
+
+ byte[] embeddedCot = extractEmbeddedCotXml(payload);
+ String schemaId = protobufSchemaId;
+ if (embeddedCot != null) {
+ try {
+ Message cotMessage = cotXmlCodec.decode(embeddedCot);
+ if (cotMessage.getMeta() != null) {
+ meta.putAll(cotMessage.getMeta());
+ }
+ if (cotMessage.getSchemaId() != null && !cotMessage.getSchemaId().isBlank()) {
+ schemaId = cotMessage.getSchemaId();
+ }
+ meta.put("tak.format", "protobuf");
+ meta.put("tak.transport", "stream");
+ meta.put("tak.protobuf_embedded_xml", "true");
+ } catch (IOException ignored) {
+ // Keep protobuf metadata only when embedded CoT is malformed.
+ }
+ } else if (protobufFormatter != null) {
+ mergeSchemaParsedMeta(meta, payload);
+ }
+ addSelectorAliases(meta);
+
+ MessageBuilder builder = new MessageBuilder()
+ .setOpaqueData(payload)
+ .setContentType("application/x-protobuf")
+ .setMeta(meta);
+ if (schemaId != null && !schemaId.isBlank()) {
+ builder.setSchemaId(schemaId);
+ }
+ return builder.build();
+ }
+
+ @Override
+ public byte[] encode(Message message) throws IOException {
+ byte[] opaque = message.getOpaqueData();
+ byte[] cloudEventPayload = TakCloudEventPayloadExtractor.tryExtractPayload(opaque);
+ if (cloudEventPayload != null) {
+ opaque = cloudEventPayload;
+ }
+ if (opaque != null && opaque.length > 0 && !looksLikeCotXml(opaque)) {
+ if (opaque.length > maxPayloadBytes) {
+ throw new IOException("TAK protobuf payload exceeds max size");
+ }
+ return opaque;
+ }
+
+ // Fallback mode: wrap CoT XML as protobuf field #1 (length-delimited).
+ // This keeps the transport protobuf-framed for phase-2 interoperability work.
+ byte[] cotXml = (opaque != null && opaque.length > 0) ? opaque : cotXmlCodec.encode(message);
+ if (cotXml.length > maxPayloadBytes) {
+ throw new IOException("TAK protobuf payload exceeds max size");
+ }
+ return wrapCotInLengthDelimitedField(cotXml);
+ }
+
+ private static byte[] wrapCotInLengthDelimitedField(byte[] cotXml) throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream(cotXml.length + 8);
+ CodedOutputStream codedOutputStream = CodedOutputStream.newInstance(out);
+ codedOutputStream.writeByteArray(1, cotXml);
+ codedOutputStream.flush();
+ return out.toByteArray();
+ }
+
+ private static boolean looksLikeCotXml(byte[] payload) {
+ String candidate = new String(payload, StandardCharsets.UTF_8).trim();
+ return candidate.startsWith(" 0 && looksLikeCotXml(candidate)) {
+ return candidate;
+ }
+ } else if (!input.skipField(tag)) {
+ break;
+ }
+ }
+ return null;
+ }
+
+ private void mergeSchemaParsedMeta(Map meta, byte[] payload) {
+ try {
+ IdentifierResolver parsed = protobufFormatter.parse(payload);
+ if (parsed == null) {
+ return;
+ }
+ promoteAny(parsed, meta, "tak.uid", "uid", "event.uid");
+ promoteAny(parsed, meta, "tak.type", "type", "event.type");
+ promoteAny(parsed, meta, "tak.time", "time", "event.time");
+ promoteAny(parsed, meta, "tak.start", "start", "event.start");
+ promoteAny(parsed, meta, "tak.stale", "stale", "event.stale");
+ promoteAny(parsed, meta, "tak.how", "how", "event.how");
+ meta.put("tak.protobuf_parsed", "true");
+ } catch (Exception ignored) {
+ // keep base protobuf metadata when schema parsing fails
+ }
+ }
+
+ private static void promoteAny(IdentifierResolver parsed, Map meta, String toKey, String... fromKeys) {
+ for (String fromKey : fromKeys) {
+ Object val = parsed.get(fromKey);
+ if (val != null) {
+ meta.put(toKey, val.toString());
+ return;
+ }
+ }
+ }
+
+ private static void addSelectorAliases(Map meta) {
+ alias(meta, "tak.uid", "tak_uid");
+ alias(meta, "tak.type", "tak_type");
+ alias(meta, "tak.time", "tak_time");
+ alias(meta, "tak.start", "tak_start");
+ alias(meta, "tak.stale", "tak_stale");
+ alias(meta, "tak.how", "tak_how");
+ alias(meta, "tak.lat", "tak_lat");
+ alias(meta, "tak.lon", "tak_lon");
+ alias(meta, "tak.hae", "tak_hae");
+ alias(meta, "tak.ce", "tak_ce");
+ alias(meta, "tak.le", "tak_le");
+ alias(meta, "tak.format", "tak_format");
+ alias(meta, "tak.transport", "tak_transport");
+ }
+
+ private static void alias(Map meta, String sourceKey, String aliasKey) {
+ String value = meta.get(sourceKey);
+ if (value != null && !value.isBlank()) {
+ meta.put(aliasKey, value);
+ }
+ }
+
+ private static MessageFormatter createFormatter(String descriptorBase64, String messageName) {
+ if (descriptorBase64 == null || descriptorBase64.isBlank() || messageName == null || messageName.isBlank()) {
+ return null;
+ }
+ try {
+ ProtoBufSchemaConfig schemaConfig = new ProtoBufSchemaConfig();
+ ProtoBufSchemaConfig.ProtobufConfig protobufConfig = new ProtoBufSchemaConfig.ProtobufConfig();
+ protobufConfig.setDescriptorValue(Base64.getDecoder().decode(descriptorBase64));
+ protobufConfig.setMessageName(messageName);
+ schemaConfig.setProtobufConfig(protobufConfig);
+ return MessageFormatterFactory.getInstance().getFormatter(schemaConfig);
+ } catch (Exception ignored) {
+ return null;
+ }
+ }
+}
diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakFrameReader.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakFrameReader.java
new file mode 100644
index 000000000..1fc0cdf8e
--- /dev/null
+++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakFrameReader.java
@@ -0,0 +1,55 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak.framing;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class TakFrameReader {
+
+ private final TakStreamFramer framer;
+ private final byte[] readBuffer;
+
+ public TakFrameReader(TakStreamFramer framer, int bufferSize) {
+ this.framer = framer;
+ this.readBuffer = new byte[Math.max(bufferSize, 256)];
+ }
+
+ public List read(InputStream inputStream) throws IOException {
+ int read = inputStream.read(readBuffer);
+ if (read < 0) {
+ throw new EOFException("TAK connection closed");
+ }
+ return framer.onBytes(readBuffer, read);
+ }
+
+ public List read(ByteBuffer byteBuffer) throws IOException {
+ if (byteBuffer == null || !byteBuffer.hasRemaining()) {
+ return List.of();
+ }
+ int len = byteBuffer.remaining();
+ byte[] data = new byte[len];
+ byteBuffer.get(data);
+ return framer.onBytes(data, len);
+ }
+}
diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramer.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramer.java
new file mode 100644
index 000000000..d509eecfe
--- /dev/null
+++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramer.java
@@ -0,0 +1,186 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak.framing;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class TakStreamFramer {
+
+ public enum Mode {
+ XML_STREAM,
+ PROTO_STREAM
+ }
+
+ private static final byte TAK_PROTO_MAGIC = (byte) 0xBF;
+ private static final byte[] XML_END_TAG = "".getBytes(StandardCharsets.UTF_8);
+
+ private final Mode mode;
+ private final int maxPayloadBytes;
+ private byte[] buffer;
+
+ public TakStreamFramer(Mode mode, int maxPayloadBytes) {
+ this.mode = mode;
+ this.maxPayloadBytes = maxPayloadBytes;
+ this.buffer = new byte[0];
+ }
+
+ public synchronized List onBytes(byte[] incoming, int length) throws IOException {
+ if (incoming == null || length <= 0) {
+ return List.of();
+ }
+ append(incoming, length);
+ if (buffer.length > maxPayloadBytes * 2) {
+ throw new IOException("Buffered TAK stream exceeds safe bound");
+ }
+ return switch (mode) {
+ case XML_STREAM -> parseXml();
+ case PROTO_STREAM -> parseProto();
+ };
+ }
+
+ public synchronized byte[] frame(byte[] payload) throws IOException {
+ if (payload == null) {
+ return new byte[0];
+ }
+ if (payload.length > maxPayloadBytes) {
+ throw new IOException("Payload exceeds max frame size");
+ }
+ if (mode == Mode.XML_STREAM) {
+ return payload;
+ }
+ byte[] varint = encodeVarint(payload.length);
+ byte[] out = new byte[1 + varint.length + payload.length];
+ out[0] = TAK_PROTO_MAGIC;
+ System.arraycopy(varint, 0, out, 1, varint.length);
+ System.arraycopy(payload, 0, out, 1 + varint.length, payload.length);
+ return out;
+ }
+
+ private List parseXml() throws IOException {
+ List frames = new ArrayList<>();
+ while (true) {
+ int endTag = indexOf(buffer, XML_END_TAG, 0);
+ if (endTag < 0) {
+ break;
+ }
+ int frameEnd = endTag + XML_END_TAG.length;
+ while (frameEnd < buffer.length && Character.isWhitespace((char) buffer[frameEnd])) {
+ frameEnd++;
+ }
+ if (frameEnd > maxPayloadBytes) {
+ throw new IOException("TAK XML frame exceeds max frame size");
+ }
+ frames.add(Arrays.copyOfRange(buffer, 0, frameEnd));
+ buffer = Arrays.copyOfRange(buffer, frameEnd, buffer.length);
+ }
+ return frames;
+ }
+
+ private List parseProto() throws IOException {
+ List frames = new ArrayList<>();
+ while (buffer.length > 0) {
+ if (buffer[0] != TAK_PROTO_MAGIC) {
+ throw new IOException("Invalid TAK protobuf stream header");
+ }
+ if (buffer.length < 2) {
+ break;
+ }
+ VarintResult varintResult = decodeVarint(buffer, 1);
+ if (varintResult == null) {
+ break;
+ }
+ int length = varintResult.value;
+ if (length < 0 || length > maxPayloadBytes) {
+ throw new IOException("Invalid TAK protobuf payload length: " + length);
+ }
+ int frameLength = 1 + varintResult.bytesRead + length;
+ if (buffer.length < frameLength) {
+ break;
+ }
+ int payloadOffset = 1 + varintResult.bytesRead;
+ frames.add(Arrays.copyOfRange(buffer, payloadOffset, payloadOffset + length));
+ buffer = Arrays.copyOfRange(buffer, frameLength, buffer.length);
+ }
+ return frames;
+ }
+
+ private void append(byte[] data, int length) {
+ int copyLength = Math.min(length, data.length);
+ byte[] next = Arrays.copyOf(buffer, buffer.length + copyLength);
+ System.arraycopy(data, 0, next, buffer.length, copyLength);
+ buffer = next;
+ }
+
+ private static byte[] encodeVarint(int value) {
+ byte[] tmp = new byte[5];
+ int count = 0;
+ int v = value;
+ while ((v & ~0x7F) != 0) {
+ tmp[count++] = (byte) ((v & 0x7F) | 0x80);
+ v >>>= 7;
+ }
+ tmp[count++] = (byte) (v & 0x7F);
+ return Arrays.copyOf(tmp, count);
+ }
+
+ private static VarintResult decodeVarint(byte[] data, int offset) throws IOException {
+ int value = 0;
+ int shift = 0;
+ int bytes = 0;
+ for (int i = offset; i < data.length && bytes < 5; i++) {
+ int b = data[i] & 0xFF;
+ value |= (b & 0x7F) << shift;
+ bytes++;
+ if ((b & 0x80) == 0) {
+ return new VarintResult(value, bytes);
+ }
+ shift += 7;
+ }
+ if (bytes >= 5 && (data[offset + 4] & 0x80) != 0) {
+ throw new IOException("Malformed varint length");
+ }
+ return null;
+ }
+
+ private static int indexOf(byte[] haystack, byte[] needle, int from) {
+ if (needle.length == 0 || haystack.length < needle.length) {
+ return -1;
+ }
+ for (int i = from; i <= haystack.length - needle.length; i++) {
+ boolean match = true;
+ for (int j = 0; j < needle.length; j++) {
+ if (haystack[i + j] != needle[j]) {
+ match = false;
+ break;
+ }
+ }
+ if (match) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ private record VarintResult(int value, int bytesRead) {}
+}
diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakMulticastTransport.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakMulticastTransport.java
new file mode 100644
index 000000000..31cd4d76d
--- /dev/null
+++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakMulticastTransport.java
@@ -0,0 +1,134 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak.transport;
+
+import io.mapsmessaging.network.protocol.impl.tak.TakExtensionConfig;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.MulticastSocket;
+import java.net.NetworkInterface;
+import java.net.SocketTimeoutException;
+import java.util.Optional;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class TakMulticastTransport {
+
+ private final TakExtensionConfig config;
+ private final ReentrantLock sendLock;
+
+ private MulticastSocket socket;
+ private InetAddress groupAddress;
+
+ public TakMulticastTransport(TakExtensionConfig config) {
+ this.config = config;
+ this.sendLock = new ReentrantLock();
+ }
+
+ public synchronized void start() throws IOException {
+ close();
+ groupAddress = InetAddress.getByName(config.getMulticastGroup());
+ socket = new MulticastSocket(config.getMulticastPort());
+ socket.setReuseAddress(true);
+ socket.setSoTimeout(1000);
+ socket.setTimeToLive(config.getMulticastTtl());
+
+ NetworkInterface networkInterface = resolveInterface();
+ if (networkInterface != null) {
+ socket.setNetworkInterface(networkInterface);
+ socket.joinGroup(new InetSocketAddress(groupAddress, config.getMulticastPort()), networkInterface);
+ } else {
+ socket.joinGroup(groupAddress);
+ }
+ }
+
+ public Optional read() throws IOException {
+ MulticastSocket current = getSocket();
+ byte[] buffer = new byte[config.getMulticastReadBufferBytes()];
+ DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
+ try {
+ current.receive(packet);
+ if (packet.getLength() > config.getMaxPayloadBytes()) {
+ return Optional.empty();
+ }
+ byte[] copy = new byte[packet.getLength()];
+ System.arraycopy(packet.getData(), packet.getOffset(), copy, 0, packet.getLength());
+ return Optional.of(copy);
+ } catch (SocketTimeoutException ignored) {
+ return Optional.empty();
+ }
+ }
+
+ public void send(byte[] payload) throws IOException {
+ MulticastSocket current = getSocket();
+ DatagramPacket packet = new DatagramPacket(payload, payload.length, groupAddress, config.getMulticastPort());
+ sendLock.lock();
+ try {
+ current.send(packet);
+ } finally {
+ sendLock.unlock();
+ }
+ }
+
+ public synchronized void close() throws IOException {
+ if (socket == null) {
+ return;
+ }
+ IOException deferred = null;
+ try {
+ NetworkInterface networkInterface = resolveInterface();
+ if (networkInterface != null && groupAddress != null) {
+ socket.leaveGroup(new InetSocketAddress(groupAddress, config.getMulticastPort()), networkInterface);
+ } else if (groupAddress != null) {
+ socket.leaveGroup(groupAddress);
+ }
+ } catch (IOException ex) {
+ deferred = ex;
+ }
+ socket.close();
+ socket = null;
+ groupAddress = null;
+ if (deferred != null) {
+ throw deferred;
+ }
+ }
+
+ private synchronized MulticastSocket getSocket() throws IOException {
+ if (socket == null || socket.isClosed()) {
+ throw new IOException("TAK multicast transport is not started");
+ }
+ return socket;
+ }
+
+ private NetworkInterface resolveInterface() throws IOException {
+ String iface = config.getMulticastInterface();
+ if (iface == null || iface.isBlank()) {
+ return null;
+ }
+ NetworkInterface named = NetworkInterface.getByName(iface);
+ if (named != null) {
+ return named;
+ }
+ InetAddress address = InetAddress.getByName(iface);
+ return NetworkInterface.getByInetAddress(address);
+ }
+}
diff --git a/src/main/resources/META-INF/services/io.mapsmessaging.network.io.EndPointConnectionFactory b/src/main/resources/META-INF/services/io.mapsmessaging.network.io.EndPointConnectionFactory
index 6d4a40add..2cf75a557 100644
--- a/src/main/resources/META-INF/services/io.mapsmessaging.network.io.EndPointConnectionFactory
+++ b/src/main/resources/META-INF/services/io.mapsmessaging.network.io.EndPointConnectionFactory
@@ -21,4 +21,5 @@ io.mapsmessaging.network.io.impl.tcp.TCPEndPointConnectionFactory
io.mapsmessaging.network.io.impl.ssl.SSLEndPointConnectionFactory
io.mapsmessaging.network.io.impl.noop.NoOpEndPointConnectionFactory
io.mapsmessaging.network.io.impl.udp.UDPEndPointConnectionFactory
-io.mapsmessaging.network.io.impl.serial.SerialEndPointConnectionFactory
\ No newline at end of file
+io.mapsmessaging.network.io.impl.serial.SerialEndPointConnectionFactory
+io.mapsmessaging.network.protocol.impl.tak.TakEndPointConnectionFactory
diff --git a/src/main/resources/META-INF/services/io.mapsmessaging.network.protocol.ProtocolImplFactory b/src/main/resources/META-INF/services/io.mapsmessaging.network.protocol.ProtocolImplFactory
index 7af47b746..46d7b5a9f 100644
--- a/src/main/resources/META-INF/services/io.mapsmessaging.network.protocol.ProtocolImplFactory
+++ b/src/main/resources/META-INF/services/io.mapsmessaging.network.protocol.ProtocolImplFactory
@@ -75,4 +75,5 @@ io.mapsmessaging.network.protocol.impl.apache_pulsar.PulsarProtocolFactory
io.mapsmessaging.network.protocol.impl.ibm_mq.MqProtocolFactory
io.mapsmessaging.network.protocol.impl.aws_sns.SnsProtocolFactory
io.mapsmessaging.network.protocol.impl.v2x_step.V2xStepProtocolFactory
-io.mapsmessaging.network.protocol.impl.extension.impl.example.ExampleProtocolFactory
\ No newline at end of file
+io.mapsmessaging.network.protocol.impl.extension.impl.example.ExampleProtocolFactory
+io.mapsmessaging.network.protocol.impl.tak.TakProtocolFactory
diff --git a/src/main/resources/NetworkConnectionManager.yaml b/src/main/resources/NetworkConnectionManager.yaml
index 942151398..1b2ce0964 100644
--- a/src/main/resources/NetworkConnectionManager.yaml
+++ b/src/main/resources/NetworkConnectionManager.yaml
@@ -66,4 +66,41 @@ NetworkConnectionManager:
-
direction: pull
local_namespace: "/"
- remote_namespace: "/#"
\ No newline at end of file
+ remote_namespace: "/#"
+
+ # Example TAK bridge (CoT XML stream over TLS)
+ # - name: "TAK Server Connection"
+ # url: ssl://tak.example.local:8089/
+ # protocol: tak
+ # plugin: true
+ #
+ # config:
+ # payload: cot_xml
+ # framing: xml_stream
+ # max_payload_bytes: 1048576
+ # reconnect_delay_ms: 2000
+ # reconnect_max_delay_ms: 30000
+ # reconnect_backoff_multiplier: 2.0
+ # reconnect_jitter_ms: 250
+ # read_buffer_bytes: 8192
+ # xml_schema_id: "10000000-0000-1000-a000-100000000004" # default XML schema
+ #
+ # # Optional protobuf mode:
+ # # payload: tak_proto_v1
+ # # framing: proto_stream
+ # # protobuf_descriptor_base64: ""
+ # # protobuf_message_name: "TakMessage"
+ # # protobuf_schema_id: ""
+ #
+ # links:
+ # - direction: pull
+ # remote_namespace: "tak/cot/#"
+ # local_namespace: "tak/cot/"
+ # selector: ""
+ # qos: 0
+ #
+ # - direction: push
+ # remote_namespace: "tak/cot/out/#"
+ # local_namespace: "ops/cot/out/"
+ # selector: "tak_type LIKE 'a-%'"
+ # qos: 0
diff --git a/src/test/java/io/mapsmessaging/api/transformers/TakToCloudEventTransformationTest.java b/src/test/java/io/mapsmessaging/api/transformers/TakToCloudEventTransformationTest.java
new file mode 100644
index 000000000..9bb10ea6b
--- /dev/null
+++ b/src/test/java/io/mapsmessaging/api/transformers/TakToCloudEventTransformationTest.java
@@ -0,0 +1,183 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.api.transformers;
+
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import io.mapsmessaging.api.message.Message;
+import io.mapsmessaging.engine.schema.SchemaManager;
+import io.mapsmessaging.network.protocol.impl.tak.codec.CotXmlCodec;
+import io.mapsmessaging.network.protocol.impl.tak.codec.TakProtobufCodec;
+import io.mapsmessaging.network.protocol.transformation.cloudevent.CloudEventEnvelopeTransformation;
+import io.mapsmessaging.network.protocol.transformation.cloudevent.CloudEventJsonTransformation;
+import io.mapsmessaging.network.protocol.transformation.cloudevent.CloudEventNativeTransformation;
+import io.mapsmessaging.schemas.config.impl.ProtoBufSchemaConfig;
+import io.mapsmessaging.test.BaseTestConfig;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class TakToCloudEventTransformationTest extends BaseTestConfig {
+
+ private final SchemaManager schemaManager = SchemaManager.getInstance();
+ private UUID protobufSchemaId;
+
+ @AfterEach
+ void cleanUpSchema() {
+ if (protobufSchemaId != null) {
+ schemaManager.removeSchema(protobufSchemaId.toString());
+ protobufSchemaId = null;
+ }
+ }
+
+ @Test
+ void cotXmlDecodedMessageTransformsToCloudEventJsonData() throws Exception {
+ CotXmlCodec codec = new CotXmlCodec();
+ byte[] cotPayload = """
+
+
+
+ """.getBytes(StandardCharsets.UTF_8);
+ Message takMessage = codec.decode(cotPayload);
+
+ Message cloudEventMessage = new CloudEventJsonTransformation().outgoing(takMessage, "/tak/cot/in");
+ assertNotNull(cloudEventMessage);
+ JsonObject cloudEvent = JsonParser.parseString(new String(cloudEventMessage.getOpaqueData(), StandardCharsets.UTF_8)).getAsJsonObject();
+
+ assertEquals("1.0", cloudEvent.get("specversion").getAsString());
+ assertEquals("application/json", cloudEvent.get("datacontenttype").getAsString());
+ assertTrue(cloudEvent.has("data"));
+ assertFalse(cloudEvent.getAsJsonObject("data").has("payload_base64"));
+ assertEquals("u-xml-1", cloudEvent.get("mapsMeta_tak_uid").getAsString());
+ assertEquals("xml", cloudEvent.get("mapsMeta_tak_format").getAsString());
+ }
+
+ @Test
+ void takProtobufDecodedMessageTransformsToCloudEventJsonData() throws Exception {
+ protobufSchemaId = UUID.randomUUID();
+ ProtoBufSchemaConfig schema = (ProtoBufSchemaConfig) TestProtobufSchemas.protobufSchemaConfig(protobufSchemaId);
+ schemaManager.addSchema("/tak/proto", schema);
+ String descriptorBase64 = Base64.getEncoder().encodeToString(schema.getProtobufConfig().getDescriptorValue());
+ String messageName = schema.getProtobufConfig().getMessageName();
+
+ TakProtobufCodec codec = TakProtobufCodec.withSchemaFormatter(
+ new CotXmlCodec(),
+ 1024 * 1024,
+ descriptorBase64,
+ messageName,
+ protobufSchemaId.toString());
+ Message takMessage = codec.decode(TestProtobufSchemas.encodeProtobufPayload());
+
+ Message cloudEventMessage = new CloudEventJsonTransformation().outgoing(takMessage, "/tak/proto/in");
+ assertNotNull(cloudEventMessage);
+ JsonObject cloudEvent = JsonParser.parseString(new String(cloudEventMessage.getOpaqueData(), StandardCharsets.UTF_8)).getAsJsonObject();
+ JsonObject data = cloudEvent.getAsJsonObject("data");
+
+ assertEquals("1.0", cloudEvent.get("specversion").getAsString());
+ assertEquals("application/json", cloudEvent.get("datacontenttype").getAsString());
+ assertTrue(cloudEvent.has("mapsSchemaId"));
+ assertTrue(data.has("id"));
+ assertEquals("abc", data.get("id").getAsString());
+ assertEquals(123, data.get("count").getAsInt());
+ assertTrue(data.get("active").getAsBoolean());
+ assertEquals("protobuf", cloudEvent.get("mapsMeta_tak_format").getAsString());
+ }
+
+ @Test
+ void cotXmlRoundTripsTakCloudEventTakViaEnvelope() throws Exception {
+ CotXmlCodec codec = new CotXmlCodec();
+ byte[] cotPayload = ""
+ .getBytes(StandardCharsets.UTF_8);
+ Message takMessage = codec.decode(cotPayload);
+ Message cloudEvent = new CloudEventEnvelopeTransformation().outgoing(takMessage, "/tak/cot/in");
+
+ byte[] encodedBackToTak = codec.encode(cloudEvent);
+ assertArrayEquals(cotPayload, encodedBackToTak);
+
+ Message decodedBack = codec.decode(encodedBackToTak);
+ assertEquals("u-xml-2", decodedBack.getMeta().get("tak.uid"));
+ assertEquals("xml", decodedBack.getMeta().get("tak.format"));
+ }
+
+ @Test
+ void protobufRoundTripsTakCloudEventTakViaNative() throws Exception {
+ protobufSchemaId = UUID.randomUUID();
+ ProtoBufSchemaConfig schema = (ProtoBufSchemaConfig) TestProtobufSchemas.protobufSchemaConfig(protobufSchemaId);
+ schemaManager.addSchema("/tak/proto", schema);
+ String descriptorBase64 = Base64.getEncoder().encodeToString(schema.getProtobufConfig().getDescriptorValue());
+ String messageName = schema.getProtobufConfig().getMessageName();
+
+ TakProtobufCodec codec = TakProtobufCodec.withSchemaFormatter(
+ new CotXmlCodec(),
+ 1024 * 1024,
+ descriptorBase64,
+ messageName,
+ protobufSchemaId.toString());
+ byte[] protobufPayload = TestProtobufSchemas.encodeProtobufPayload();
+ Message takMessage = codec.decode(protobufPayload);
+ Message cloudEvent = new CloudEventNativeTransformation().outgoing(takMessage, "/tak/proto/in");
+
+ byte[] encodedBackToTak = codec.encode(cloudEvent);
+ assertArrayEquals(protobufPayload, encodedBackToTak);
+
+ Message decodedBack = codec.decode(encodedBackToTak);
+ assertEquals("protobuf", decodedBack.getMeta().get("tak.format"));
+ }
+
+ @Test
+ void cotXmlToCloudEventToTakProtobufToCloudEventToCotXml() throws Exception {
+ protobufSchemaId = UUID.randomUUID();
+ ProtoBufSchemaConfig schema = (ProtoBufSchemaConfig) TestProtobufSchemas.protobufSchemaConfig(protobufSchemaId);
+ schemaManager.addSchema("/tak/proto", schema);
+ String descriptorBase64 = Base64.getEncoder().encodeToString(schema.getProtobufConfig().getDescriptorValue());
+ String messageName = schema.getProtobufConfig().getMessageName();
+
+ CotXmlCodec xmlCodec = new CotXmlCodec();
+ TakProtobufCodec protobufCodec = TakProtobufCodec.withSchemaFormatter(
+ new CotXmlCodec(),
+ 1024 * 1024,
+ descriptorBase64,
+ messageName,
+ protobufSchemaId.toString());
+
+ byte[] startXml = ""
+ .getBytes(StandardCharsets.UTF_8);
+ Message takXmlMessage = xmlCodec.decode(startXml);
+
+ Message cloudEventFromXml = new CloudEventEnvelopeTransformation().outgoing(takXmlMessage, "/tak/cot/in");
+ byte[] takProtobufBytes = protobufCodec.encode(cloudEventFromXml);
+ Message takProtobufMessage = protobufCodec.decode(takProtobufBytes);
+
+ Message cloudEventFromProtobuf = new CloudEventNativeTransformation().outgoing(takProtobufMessage, "/tak/proto/in");
+ byte[] endXml = xmlCodec.encode(cloudEventFromProtobuf);
+ Message decodedEndXml = xmlCodec.decode(endXml);
+
+ assertEquals("u-chain-1", decodedEndXml.getMeta().get("tak.uid"));
+ assertEquals("a-f-G-U-C", decodedEndXml.getMeta().get("tak.type"));
+ assertEquals("10.1", decodedEndXml.getMeta().get("tak.lat"));
+ assertEquals("11.2", decodedEndXml.getMeta().get("tak.lon"));
+ assertEquals("xml", decodedEndXml.getMeta().get("tak.format"));
+ }
+}
diff --git a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java
new file mode 100644
index 000000000..204a1c0c3
--- /dev/null
+++ b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java
@@ -0,0 +1,140 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak;
+
+import io.mapsmessaging.dto.rest.config.protocol.impl.ExtensionConfigDTO;
+import io.mapsmessaging.network.protocol.impl.tak.framing.TakStreamFramer;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TakExtensionTest {
+
+ @Test
+ void parsesExtensionConfigWithDefaultsAndOverrides() throws Exception {
+ ExtensionConfigDTO dto = new ExtensionConfigDTO();
+ Map config = new LinkedHashMap<>();
+ config.put("payload", "tak_proto_v1");
+ config.put("framing", "proto_stream");
+ config.put("max_payload_bytes", 2048);
+ config.put("reconnect_delay_ms", 1500);
+ config.put("reconnect_max_delay_ms", 6000);
+ config.put("reconnect_backoff_multiplier", 1.5);
+ config.put("reconnect_jitter_ms", 100);
+ config.put("read_buffer_bytes", 4096);
+ config.put("multicast_enabled", true);
+ config.put("multicast_ingress_enabled", true);
+ config.put("multicast_egress_enabled", false);
+ config.put("multicast_group", "239.88.77.66");
+ config.put("multicast_port", 7171);
+ config.put("multicast_interface", "lo0");
+ config.put("multicast_ttl", 3);
+ config.put("multicast_read_buffer_bytes", 2048);
+ config.put("protobuf_descriptor_base64", "AQID");
+ config.put("protobuf_message_name", "TakMessage");
+ config.put("xml_schema_id", "10000000-0000-1000-a000-100000000004");
+ config.put("protobuf_schema_id", "f4f6f837-89ce-4cca-b74d-aa14513c501f");
+ config.put("xml_namespace_aware", false);
+ config.put("xml_coalescing", false);
+ config.put("xml_validating", true);
+ config.put("xml_root_entry", "cotEvent");
+ setField(dto, "config", config);
+
+ TakExtensionConfig parsed = TakExtensionConfig.from(dto);
+
+ assertEquals("tak_proto_v1", parsed.getPayload());
+ assertEquals(TakStreamFramer.Mode.PROTO_STREAM, parsed.getFramingMode());
+ assertEquals(2048, parsed.getMaxPayloadBytes());
+ assertEquals(1500, parsed.getReconnectDelayMs());
+ assertEquals(6000, parsed.getReconnectMaxDelayMs());
+ assertEquals(1.5d, parsed.getReconnectBackoffMultiplier());
+ assertEquals(100, parsed.getReconnectJitterMs());
+ assertEquals(4096, parsed.getReadBufferBytes());
+ assertTrue(parsed.isMulticastEnabled());
+ assertTrue(parsed.isMulticastIngressEnabled());
+ assertFalse(parsed.isMulticastEgressEnabled());
+ assertEquals("239.88.77.66", parsed.getMulticastGroup());
+ assertEquals(7171, parsed.getMulticastPort());
+ assertEquals("lo0", parsed.getMulticastInterface());
+ assertEquals(3, parsed.getMulticastTtl());
+ assertEquals(2048, parsed.getMulticastReadBufferBytes());
+ assertEquals("AQID", parsed.getProtobufDescriptorBase64());
+ assertEquals("TakMessage", parsed.getProtobufMessageName());
+ assertEquals("10000000-0000-1000-a000-100000000004", parsed.getXmlSchemaId());
+ assertEquals("f4f6f837-89ce-4cca-b74d-aa14513c501f", parsed.getProtobufSchemaId());
+ assertFalse(parsed.isXmlNamespaceAware());
+ assertFalse(parsed.isXmlCoalescing());
+ assertTrue(parsed.isXmlValidating());
+ assertEquals("cotEvent", parsed.getXmlRootEntry());
+ }
+
+ @Test
+ void appliesSafeDefaultsWhenMissing() {
+ TakExtensionConfig parsed = TakExtensionConfig.from(null);
+ assertEquals("cot_xml", parsed.getPayload());
+ assertEquals(TakStreamFramer.Mode.XML_STREAM, parsed.getFramingMode());
+ assertEquals(1024 * 1024, parsed.getMaxPayloadBytes());
+ assertEquals(2000, parsed.getReconnectDelayMs());
+ assertEquals(30000, parsed.getReconnectMaxDelayMs());
+ assertEquals(2.0d, parsed.getReconnectBackoffMultiplier());
+ assertEquals(250, parsed.getReconnectJitterMs());
+ assertFalse(parsed.isMulticastEnabled());
+ assertEquals("239.2.3.1", parsed.getMulticastGroup());
+ assertEquals(6969, parsed.getMulticastPort());
+ assertEquals("", parsed.getProtobufDescriptorBase64());
+ assertEquals("", parsed.getProtobufMessageName());
+ assertEquals("", parsed.getXmlSchemaId());
+ assertEquals("", parsed.getProtobufSchemaId());
+ assertTrue(parsed.isXmlNamespaceAware());
+ assertTrue(parsed.isXmlCoalescing());
+ assertFalse(parsed.isXmlValidating());
+ assertEquals("event", parsed.getXmlRootEntry());
+ }
+
+ @Test
+ void clampsReconnectHardeningValues() throws Exception {
+ ExtensionConfigDTO dto = new ExtensionConfigDTO();
+ Map config = new LinkedHashMap<>();
+ config.put("reconnect_delay_ms", 50);
+ config.put("reconnect_max_delay_ms", 10);
+ config.put("reconnect_backoff_multiplier", 0.1d);
+ config.put("reconnect_jitter_ms", -1);
+ setField(dto, "config", config);
+
+ TakExtensionConfig parsed = TakExtensionConfig.from(dto);
+
+ assertEquals(100, parsed.getReconnectDelayMs());
+ assertEquals(100, parsed.getReconnectMaxDelayMs());
+ assertEquals(1.0d, parsed.getReconnectBackoffMultiplier());
+ assertEquals(0, parsed.getReconnectJitterMs());
+ }
+
+ private static void setField(Object target, String fieldName, Object value) throws Exception {
+ Field field = target.getClass().getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(target, value);
+ }
+}
diff --git a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodecTest.java b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodecTest.java
new file mode 100644
index 000000000..5da3631b6
--- /dev/null
+++ b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodecTest.java
@@ -0,0 +1,115 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak.codec;
+
+import io.mapsmessaging.api.MessageBuilder;
+import io.mapsmessaging.api.message.Message;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class CotXmlCodecTest {
+
+ private static final String VALID_COT = """
+
+
+
+ """;
+
+ @Test
+ void decodeValidCotExtractsCoreFields() throws IOException {
+ CotXmlCodec codec = new CotXmlCodec();
+
+ Message message = codec.decode(VALID_COT.getBytes(StandardCharsets.UTF_8));
+
+ assertNotNull(message.getMeta());
+ assertEquals("u-1", message.getMeta().get("tak.uid"));
+ assertEquals("a-f-G-U-C", message.getMeta().get("tak.type"));
+ assertEquals("51.5007", message.getMeta().get("tak.lat"));
+ assertEquals("-0.1246", message.getMeta().get("tak.lon"));
+ assertEquals("application/cot+xml", message.getContentType());
+ assertNotNull(message.getOpaqueData());
+ }
+
+ @Test
+ void decodeMissingRequiredFieldFails() {
+ CotXmlCodec codec = new CotXmlCodec();
+ String invalid = """
+
+
+
+ """;
+
+ assertThrows(IOException.class, () -> codec.decode(invalid.getBytes(StandardCharsets.UTF_8)));
+ }
+
+ @Test
+ void decodeMalformedXmlFails() {
+ CotXmlCodec codec = new CotXmlCodec();
+ String invalid = " codec.decode(invalid.getBytes(StandardCharsets.UTF_8)));
+ }
+
+ @Test
+ void decodeRejectsXxe() {
+ CotXmlCodec codec = new CotXmlCodec();
+ String xxe = """
+ ]>
+
+
+ &xxe;
+
+ """;
+ assertThrows(IOException.class, () -> codec.decode(xxe.getBytes(StandardCharsets.UTF_8)));
+ }
+
+ @Test
+ void encodeFromMetaBuildsCotXml() throws IOException {
+ CotXmlCodec codec = new CotXmlCodec();
+ Map meta = new LinkedHashMap<>();
+ meta.put("tak.uid", "u-2");
+ meta.put("tak.type", "a-f-G-U-C");
+ meta.put("tak.time", "2026-02-19T09:10:00Z");
+ meta.put("tak.start", "2026-02-19T09:10:00Z");
+ meta.put("tak.stale", "2026-02-19T09:15:00Z");
+ meta.put("tak.how", "m-g");
+ meta.put("tak.lat", "1");
+ meta.put("tak.lon", "2");
+ meta.put("tak.hae", "3");
+ meta.put("tak.ce", "4");
+ meta.put("tak.le", "5");
+
+ Message message = new MessageBuilder().setMeta(meta).build();
+ byte[] xml = codec.encode(message);
+ String text = new String(xml, StandardCharsets.UTF_8);
+
+ assertTrue(text.startsWith("
+
+
+ """;
+ Message message = new MessageBuilder().setOpaqueData(cot.getBytes(StandardCharsets.UTF_8)).build();
+
+ byte[] encoded = codec.encode(message);
+ Message decoded = codec.decode(encoded);
+
+ assertEquals("application/x-protobuf", decoded.getContentType());
+ assertEquals("protobuf", decoded.getMeta().get("tak.format"));
+ assertEquals("u-100", decoded.getMeta().get("tak.uid"));
+ assertEquals("a-f-G-U-C", decoded.getMeta().get("tak.type"));
+ assertNotNull(decoded.getOpaqueData());
+ assertTrue(decoded.getOpaqueData().length > 0);
+ }
+
+ @Test
+ void passesThroughBinaryProtobufPayload() throws IOException {
+ TakProtobufCodec codec = new TakProtobufCodec();
+ byte[] raw = new byte[]{0x08, 0x7F, 0x10, 0x01};
+ Message message = new MessageBuilder().setOpaqueData(raw).build();
+
+ byte[] encoded = codec.encode(message);
+ Message decoded = codec.decode(encoded);
+
+ assertArrayEquals(raw, encoded);
+ assertArrayEquals(raw, decoded.getOpaqueData());
+ assertEquals("protobuf", decoded.getMeta().get("tak.format"));
+ }
+
+ @Test
+ void buildsCotFromMetaWhenPayloadMissing() throws IOException {
+ TakProtobufCodec codec = new TakProtobufCodec();
+ Map meta = new LinkedHashMap<>();
+ meta.put("tak.uid", "u-200");
+ meta.put("tak.type", "a-f-G-U-C");
+ meta.put("tak.time", "2026-02-19T09:10:00Z");
+ meta.put("tak.start", "2026-02-19T09:10:00Z");
+ meta.put("tak.stale", "2026-02-19T09:15:00Z");
+ meta.put("tak.how", "m-g");
+ meta.put("tak.lat", "1");
+ meta.put("tak.lon", "2");
+ Message message = new MessageBuilder().setMeta(meta).build();
+
+ byte[] encoded = codec.encode(message);
+ Message decoded = codec.decode(encoded);
+
+ assertEquals("u-200", decoded.getMeta().get("tak.uid"));
+ assertEquals("a-f-G-U-C", decoded.getMeta().get("tak.type"));
+ }
+
+ @Test
+ void decodesUsingMapsProtobufFormatterWhenConfigured() throws IOException {
+ ProtoBufSchemaConfig schema = (ProtoBufSchemaConfig) TestProtobufSchemas.protobufSchemaConfig(UUID.randomUUID());
+ String descriptorBase64 = Base64.getEncoder().encodeToString(schema.getProtobufConfig().getDescriptorValue());
+ String messageName = schema.getProtobufConfig().getMessageName();
+ TakProtobufCodec codec = TakProtobufCodec.withSchemaFormatter(new CotXmlCodec(), 1024 * 1024, descriptorBase64, messageName);
+
+ byte[] payload = TestProtobufSchemas.encodeProtobufPayload();
+ Message decoded = codec.decode(payload);
+
+ assertEquals("true", decoded.getMeta().get("tak.protobuf_parsed"));
+ }
+}
diff --git a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramerTest.java b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramerTest.java
new file mode 100644
index 000000000..17bd80ec1
--- /dev/null
+++ b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramerTest.java
@@ -0,0 +1,80 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak.framing;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TakStreamFramerTest {
+
+ @Test
+ void protoFrameRoundTrip() throws IOException {
+ TakStreamFramer framer = new TakStreamFramer(TakStreamFramer.Mode.PROTO_STREAM, 1024 * 1024);
+ byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
+
+ byte[] framed = framer.frame(payload);
+ List decoded = framer.onBytes(framed, framed.length);
+
+ assertEquals(1, decoded.size());
+ assertArrayEquals(payload, decoded.getFirst());
+ }
+
+ @Test
+ void protoPartialReadReassemblesFrames() throws IOException {
+ TakStreamFramer framer = new TakStreamFramer(TakStreamFramer.Mode.PROTO_STREAM, 1024 * 1024);
+ byte[] payload = "abc123".getBytes(StandardCharsets.UTF_8);
+ byte[] framed = framer.frame(payload);
+
+ List first = framer.onBytes(framed, 2);
+ assertTrue(first.isEmpty());
+ List second = framer.onBytes(slice(framed, 2), framed.length - 2);
+
+ assertEquals(1, second.size());
+ assertArrayEquals(payload, second.getFirst());
+ }
+
+ @Test
+ void protoRejectsOversizedFrame() {
+ TakStreamFramer framer = new TakStreamFramer(TakStreamFramer.Mode.PROTO_STREAM, 8);
+ byte[] fake = new byte[]{(byte) 0xBF, 0x09, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
+ assertThrows(IOException.class, () -> framer.onBytes(fake, fake.length));
+ }
+
+ @Test
+ void protoRejectsInvalidMagic() {
+ TakStreamFramer framer = new TakStreamFramer(TakStreamFramer.Mode.PROTO_STREAM, 1024);
+ byte[] invalid = new byte[]{0x01, 0x01, 0x7F};
+ assertThrows(IOException.class, () -> framer.onBytes(invalid, invalid.length));
+ }
+
+ private static byte[] slice(byte[] source, int from) {
+ byte[] out = new byte[source.length - from];
+ System.arraycopy(source, from, out, 0, out.length);
+ return out;
+ }
+}
diff --git a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramerXmlTest.java b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramerXmlTest.java
new file mode 100644
index 000000000..f221e518c
--- /dev/null
+++ b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramerXmlTest.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Copyright [ 2020 - 2024 ] Matthew Buckton
+ * Copyright [ 2024 - 2026 ] MapsMessaging B.V.
+ *
+ * Licensed under the Apache License, Version 2.0 with the Commons Clause
+ * (the "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * https://commonsclause.com/
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.mapsmessaging.network.protocol.impl.tak.framing;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TakStreamFramerXmlTest {
+
+ @Test
+ void xmlDelimitsMultipleEvents() throws IOException {
+ TakStreamFramer framer = new TakStreamFramer(TakStreamFramer.Mode.XML_STREAM, 1024 * 1024);
+ String payload = "\n\n";
+
+ List frames = framer.onBytes(payload.getBytes(StandardCharsets.UTF_8), payload.length());
+
+ assertEquals(2, frames.size());
+ assertTrue(new String(frames.get(0), StandardCharsets.UTF_8).contains("uid=\"1\""));
+ assertTrue(new String(frames.get(1), StandardCharsets.UTF_8).contains("uid=\"2\""));
+ }
+
+ @Test
+ void xmlPartialEventBuffersUntilComplete() throws IOException {
+ TakStreamFramer framer = new TakStreamFramer(TakStreamFramer.Mode.XML_STREAM, 1024 * 1024);
+ byte[] bytes = "".getBytes(StandardCharsets.UTF_8);
+
+ List first = framer.onBytes(bytes, 10);
+ assertTrue(first.isEmpty());
+
+ byte[] rest = new byte[bytes.length - 10];
+ System.arraycopy(bytes, 10, rest, 0, rest.length);
+ List second = framer.onBytes(rest, rest.length);
+ assertEquals(1, second.size());
+ }
+}