From b077ee8b8eb528c5ee98971a0e5202daf4c7e4fe Mon Sep 17 00:00:00 2001 From: krital Date: Thu, 19 Feb 2026 18:12:38 +0200 Subject: [PATCH 1/7] feat(tak): add phase 1 TAK extension integration --- .../tak/TakEndPointConnectionFactory.java | 52 +++++ .../protocol/impl/tak/TakExtension.java | 183 +++++++++++++++++ .../protocol/impl/tak/TakExtensionConfig.java | 106 ++++++++++ .../protocol/impl/tak/TakProtocolFactory.java | 68 +++++++ .../protocol/impl/tak/codec/CotXmlCodec.java | 177 +++++++++++++++++ .../impl/tak/codec/TakPayloadCodec.java | 31 +++ .../impl/tak/codec/TakProtobufCodec.java | 37 ++++ .../impl/tak/framing/TakFrameReader.java | 55 ++++++ .../impl/tak/framing/TakStreamFramer.java | 186 ++++++++++++++++++ .../tak/transport/TakConnectionManager.java | 65 ++++++ .../tak/transport/TakServerConnection.java | 109 ++++++++++ ...aging.network.io.EndPointConnectionFactory | 3 +- ...aging.network.protocol.ProtocolImplFactory | 3 +- .../resources/NetworkConnectionManager.yaml | 27 ++- .../protocol/impl/tak/TakExtensionTest.java | 67 +++++++ .../impl/tak/codec/CotXmlCodecTest.java | 115 +++++++++++ .../impl/tak/framing/TakStreamFramerTest.java | 80 ++++++++ .../tak/framing/TakStreamFramerXmlTest.java | 58 ++++++ 18 files changed, 1419 insertions(+), 3 deletions(-) create mode 100644 src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakEndPointConnectionFactory.java create mode 100644 src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java create mode 100644 src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java create mode 100644 src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakProtocolFactory.java create mode 100644 src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodec.java create mode 100644 src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakPayloadCodec.java create mode 100644 src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodec.java create mode 100644 src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakFrameReader.java create mode 100644 src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramer.java create mode 100644 src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakConnectionManager.java create mode 100644 src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakServerConnection.java create mode 100644 src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java create mode 100644 src/test/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodecTest.java create mode 100644 src/test/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramerTest.java create mode 100644 src/test/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramerXmlTest.java diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakEndPointConnectionFactory.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakEndPointConnectionFactory.java new file mode 100644 index 000000000..914f8f092 --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakEndPointConnectionFactory.java @@ -0,0 +1,52 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak; + +import io.mapsmessaging.network.EndPointURL; +import io.mapsmessaging.network.io.EndPoint; +import io.mapsmessaging.network.io.EndPointConnectedCallback; +import io.mapsmessaging.network.io.EndPointConnectionFactory; +import io.mapsmessaging.network.io.EndPointServerStatus; +import io.mapsmessaging.network.io.impl.SelectorLoadManager; +import io.mapsmessaging.network.protocol.impl.extension.ExtensionEndPointConnectionFactory; + +import java.io.IOException; +import java.util.List; + +public class TakEndPointConnectionFactory implements EndPointConnectionFactory { + + private final ExtensionEndPointConnectionFactory delegate = new ExtensionEndPointConnectionFactory(); + + @Override + public EndPoint connect(EndPointURL url, SelectorLoadManager selector, EndPointConnectedCallback connectedCallback, + EndPointServerStatus endPointServerStatus, List jmxPath) throws IOException { + return delegate.connect(url, selector, connectedCallback, endPointServerStatus, jmxPath); + } + + @Override + public String getName() { + return "tak"; + } + + @Override + public String getDescription() { + return "TAK extension endpoint factory"; + } +} diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java new file mode 100644 index 000000000..7afb50138 --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java @@ -0,0 +1,183 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak; + +import io.mapsmessaging.api.message.Message; +import io.mapsmessaging.dto.rest.config.protocol.impl.ExtensionConfigDTO; +import io.mapsmessaging.network.EndPointURL; +import io.mapsmessaging.network.io.EndPoint; +import io.mapsmessaging.network.protocol.impl.extension.Extension; +import io.mapsmessaging.network.protocol.impl.tak.codec.CotXmlCodec; +import io.mapsmessaging.network.protocol.impl.tak.codec.TakPayloadCodec; +import io.mapsmessaging.network.protocol.impl.tak.codec.TakProtobufCodec; +import io.mapsmessaging.network.protocol.impl.tak.framing.TakFrameReader; +import io.mapsmessaging.network.protocol.impl.tak.framing.TakStreamFramer; +import io.mapsmessaging.network.protocol.impl.tak.transport.TakConnectionManager; +import io.mapsmessaging.network.protocol.impl.tak.transport.TakServerConnection; +import org.jetbrains.annotations.Nullable; + +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +public class TakExtension extends Extension { + + private final EndPointURL url; + private final TakExtensionConfig config; + private final TakPayloadCodec payloadCodec; + private final TakStreamFramer streamFramer; + private final TakConnectionManager connectionManager; + private final TakFrameReader frameReader; + private final Set remoteLinks; + private final Set localLinks; + private final AtomicBoolean running; + private volatile Thread readerThread; + + public TakExtension(EndPoint endPoint, @Nullable ExtensionConfigDTO extensionConfig) { + this.url = new EndPointURL(endPoint.getConfig().getUrl()); + this.config = TakExtensionConfig.from(extensionConfig); + this.payloadCodec = TakExtensionConfig.PAYLOAD_TAK_PROTO_V1.equalsIgnoreCase(config.getPayload()) + ? new TakProtobufCodec() + : new CotXmlCodec(); + this.streamFramer = new TakStreamFramer(config.getFramingMode(), config.getMaxPayloadBytes()); + this.connectionManager = new TakConnectionManager(new TakServerConnection(url, Duration.ofSeconds(30))); + this.frameReader = new TakFrameReader(streamFramer, config.getReadBufferBytes()); + this.remoteLinks = ConcurrentHashMap.newKeySet(); + this.localLinks = ConcurrentHashMap.newKeySet(); + this.running = new AtomicBoolean(false); + } + + @Override + public void initialise() throws IOException { + connectionManager.connect(); + running.set(true); + readerThread = new Thread(this::readerLoop, "tak-reader-" + url.getHost() + "-" + url.getPort()); + readerThread.setDaemon(true); + readerThread.start(); + } + + @Override + public void close() throws IOException { + running.set(false); + Thread thread = readerThread; + if (thread != null) { + thread.interrupt(); + try { + thread.join(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + connectionManager.close(); + } + + @Override + public String getName() { + return "TAK"; + } + + @Override + public String getVersion() { + return "1.0"; + } + + @Override + public boolean supportsRemoteFiltering() { + return false; + } + + @Override + public void outbound(String destinationName, Message message) { + if (!running.get()) { + return; + } + try { + byte[] payload = payloadCodec.encode(message); + byte[] framed = streamFramer.frame(payload); + connectionManager.write(framed); + } catch (IOException ignored) { + // Connection failures are handled by reader loop reconnect logic. + } + } + + @Override + public void registerRemoteLink(String destination, @Nullable String filter) { + remoteLinks.add(destination); + } + + @Override + public void registerLocalLink(String destination) { + localLinks.add(destination); + } + + private void readerLoop() { + while (running.get()) { + try { + List frames = frameReader.read(connectionManager.input()); + for (byte[] frame : frames) { + Message message = payloadCodec.decode(frame); + String destination = resolveInboundDestination(message); + inbound(destination, message); + } + } catch (IOException ex) { + if (!running.get()) { + break; + } + reconnectWithDelay(); + } + } + } + + private void reconnectWithDelay() { + while (running.get()) { + try { + Thread.sleep(config.getReconnectDelayMs()); + connectionManager.reconnect(); + return; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } catch (IOException ignored) { + // Keep trying while running. + } + } + } + + private String resolveInboundDestination(Message message) { + String eventType = null; + if (message.getMeta() != null) { + eventType = message.getMeta().get("tak.type"); + } + if (eventType == null || eventType.isBlank()) { + eventType = "unknown"; + } + if (!remoteLinks.isEmpty()) { + String remote = remoteLinks.iterator().next(); + if (remote.contains("#")) { + return remote.replace("#", eventType); + } + return remote; + } + return "tak/cot/" + eventType; + } +} diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java new file mode 100644 index 000000000..9e992ac0d --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java @@ -0,0 +1,106 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak; + +import io.mapsmessaging.dto.rest.config.protocol.impl.ExtensionConfigDTO; +import io.mapsmessaging.network.protocol.impl.tak.framing.TakStreamFramer; + +import java.util.Locale; +import java.util.Map; + +public class TakExtensionConfig { + + public static final String PAYLOAD_COT_XML = "cot_xml"; + public static final String PAYLOAD_TAK_PROTO_V1 = "tak_proto_v1"; + private static final int DEFAULT_MAX_PAYLOAD = 1024 * 1024; + private static final int DEFAULT_RECONNECT_MS = 2000; + private static final int DEFAULT_READ_BUFFER = 8192; + + private final String payload; + private final TakStreamFramer.Mode framingMode; + private final int maxPayloadBytes; + private final int reconnectDelayMs; + private final int readBufferBytes; + + private TakExtensionConfig(String payload, TakStreamFramer.Mode framingMode, int maxPayloadBytes, int reconnectDelayMs, int readBufferBytes) { + this.payload = payload; + this.framingMode = framingMode; + this.maxPayloadBytes = maxPayloadBytes; + this.reconnectDelayMs = reconnectDelayMs; + this.readBufferBytes = readBufferBytes; + } + + public static TakExtensionConfig from(ExtensionConfigDTO extensionConfig) { + Map config = extensionConfig != null ? extensionConfig.getConfig() : null; + String payload = asString(config, "payload", PAYLOAD_COT_XML).toLowerCase(Locale.ROOT); + String framing = asString(config, "framing", "xml_stream").toLowerCase(Locale.ROOT); + TakStreamFramer.Mode mode = "proto_stream".equals(framing) ? TakStreamFramer.Mode.PROTO_STREAM : TakStreamFramer.Mode.XML_STREAM; + int maxPayload = asInt(config, "max_payload_bytes", DEFAULT_MAX_PAYLOAD); + int reconnectMs = asInt(config, "reconnect_delay_ms", DEFAULT_RECONNECT_MS); + int readBuffer = asInt(config, "read_buffer_bytes", DEFAULT_READ_BUFFER); + return new TakExtensionConfig(payload, mode, Math.max(1024, maxPayload), Math.max(100, reconnectMs), Math.max(512, readBuffer)); + } + + public String getPayload() { + return payload; + } + + public TakStreamFramer.Mode getFramingMode() { + return framingMode; + } + + public int getMaxPayloadBytes() { + return maxPayloadBytes; + } + + public int getReconnectDelayMs() { + return reconnectDelayMs; + } + + public int getReadBufferBytes() { + return readBufferBytes; + } + + private static String asString(Map config, String key, String def) { + if (config == null) { + return def; + } + Object val = config.get(key); + return val == null ? def : val.toString(); + } + + private static int asInt(Map config, String key, int def) { + if (config == null) { + return def; + } + Object val = config.get(key); + if (val == null) { + return def; + } + if (val instanceof Number number) { + return number.intValue(); + } + try { + return Integer.parseInt(val.toString()); + } catch (NumberFormatException ignored) { + return def; + } + } +} diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakProtocolFactory.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakProtocolFactory.java new file mode 100644 index 000000000..02d419ae7 --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakProtocolFactory.java @@ -0,0 +1,68 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak; + +import io.mapsmessaging.dto.rest.config.protocol.ProtocolConfigDTO; +import io.mapsmessaging.dto.rest.config.protocol.impl.ExtensionConfigDTO; +import io.mapsmessaging.network.io.EndPoint; +import io.mapsmessaging.network.io.Packet; +import io.mapsmessaging.network.protocol.Protocol; +import io.mapsmessaging.network.protocol.ProtocolImplFactory; +import io.mapsmessaging.network.protocol.detection.NoOpDetection; +import io.mapsmessaging.network.protocol.impl.extension.ExtensionProtocol; + +import java.io.IOException; + +public class TakProtocolFactory extends ProtocolImplFactory { + + public TakProtocolFactory() { + super("tak", "TAK Cursor-on-Target extension protocol", new NoOpDetection()); + } + + @Override + public Protocol connect(EndPoint endPoint, String sessionId, String username, String password) throws IOException { + ExtensionConfigDTO extensionConfig = null; + if (endPoint.getConfig() != null && endPoint.getConfig().getProtocolConfigs() != null) { + for (ProtocolConfigDTO protocolConfig : endPoint.getConfig().getProtocolConfigs()) { + if (protocolConfig instanceof ExtensionConfigDTO config && getName().equalsIgnoreCase(config.getProtocol())) { + extensionConfig = config; + break; + } + } + } + if (extensionConfig == null) { + throw new IOException("TAK extension configuration not found"); + } + + Protocol protocol = new ExtensionProtocol(endPoint, new TakExtension(endPoint, extensionConfig)); + protocol.connect(sessionId, username, password); + return protocol; + } + + @Override + public void create(EndPoint endPoint, Packet packet) { + // TAK extension does not accept inbound client sockets. + } + + @Override + public String getTransportType() { + return "tak"; + } +} diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodec.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodec.java new file mode 100644 index 000000000..63803d3e2 --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodec.java @@ -0,0 +1,177 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.codec; + +import io.mapsmessaging.api.MessageBuilder; +import io.mapsmessaging.api.features.QualityOfService; +import io.mapsmessaging.api.message.Message; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.NodeList; +import org.xml.sax.InputSource; +import org.xml.sax.SAXException; + +import javax.xml.XMLConstants; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; +import java.io.IOException; +import java.io.StringReader; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.LinkedHashMap; +import java.util.Map; + +public class CotXmlCodec implements TakPayloadCodec { + + @Override + public Message decode(byte[] payload) throws IOException { + String xml = new String(payload, StandardCharsets.UTF_8); + Element event = parseRootEvent(xml); + Element point = extractPoint(event); + + String uid = required(event, "uid"); + String type = required(event, "type"); + String time = required(event, "time"); + String start = required(event, "start"); + String stale = required(event, "stale"); + String how = required(event, "how"); + String lat = required(point, "lat"); + String lon = required(point, "lon"); + + Map meta = new LinkedHashMap<>(); + meta.put("tak.uid", uid); + meta.put("tak.type", type); + meta.put("tak.time", time); + meta.put("tak.start", start); + meta.put("tak.stale", stale); + meta.put("tak.how", how); + meta.put("tak.lat", lat); + meta.put("tak.lon", lon); + putIfPresent(meta, "tak.hae", point.getAttribute("hae")); + putIfPresent(meta, "tak.ce", point.getAttribute("ce")); + putIfPresent(meta, "tak.le", point.getAttribute("le")); + meta.put("tak.format", "xml"); + meta.put("tak.transport", "stream"); + + return new MessageBuilder() + .setOpaqueData(payload) + .setMeta(meta) + .setContentType("application/cot+xml") + .setQoS(QualityOfService.AT_MOST_ONCE) + .build(); + } + + @Override + public byte[] encode(Message message) throws IOException { + byte[] opaque = message.getOpaqueData(); + if (opaque != null && opaque.length > 0) { + String candidate = new String(opaque, StandardCharsets.UTF_8).trim(); + if (candidate.startsWith(" meta = message.getMeta() != null ? message.getMeta() : new LinkedHashMap<>(); + String now = Instant.now().truncatedTo(ChronoUnit.SECONDS).toString(); + String stale = Instant.now().plus(5, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.SECONDS).toString(); + + String uid = getOrDefault(meta, "tak.uid", "maps-generated"); + String type = getOrDefault(meta, "tak.type", "a-f-G-U-C"); + String time = getOrDefault(meta, "tak.time", now); + String start = getOrDefault(meta, "tak.start", now); + String staleTime = getOrDefault(meta, "tak.stale", stale); + String how = getOrDefault(meta, "tak.how", "m-g"); + String lat = getOrDefault(meta, "tak.lat", "0"); + String lon = getOrDefault(meta, "tak.lon", "0"); + String hae = getOrDefault(meta, "tak.hae", "0"); + String ce = getOrDefault(meta, "tak.ce", "9999999"); + String le = getOrDefault(meta, "tak.le", "9999999"); + + String xml = "" + + "" + + ""; + return xml.getBytes(StandardCharsets.UTF_8); + } + + private static String getOrDefault(Map meta, String key, String def) { + String value = meta.get(key); + return value == null || value.isBlank() ? def : value; + } + + private static void putIfPresent(Map meta, String key, String value) { + if (value != null && !value.isBlank()) { + meta.put(key, value); + } + } + + private static Element parseRootEvent(String xml) throws IOException { + try { + DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); + dbf.setNamespaceAware(true); + dbf.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true); + dbf.setFeature("http://xml.org/sax/features/external-general-entities", false); + dbf.setFeature("http://xml.org/sax/features/external-parameter-entities", false); + dbf.setFeature(XMLConstants.FEATURE_SECURE_PROCESSING, true); + dbf.setExpandEntityReferences(false); + dbf.setXIncludeAware(false); + dbf.setAttribute(XMLConstants.ACCESS_EXTERNAL_DTD, ""); + dbf.setAttribute(XMLConstants.ACCESS_EXTERNAL_SCHEMA, ""); + DocumentBuilder builder = dbf.newDocumentBuilder(); + Document document = builder.parse(new InputSource(new StringReader(xml))); + Element root = document.getDocumentElement(); + if (root == null || !"event".equalsIgnoreCase(root.getLocalName() == null ? root.getNodeName() : root.getLocalName())) { + throw new IOException("Invalid CoT XML payload: root element must be event"); + } + return root; + } catch (ParserConfigurationException | SAXException ex) { + throw new IOException("Invalid CoT XML payload", ex); + } + } + + private static Element extractPoint(Element root) throws IOException { + NodeList byNs = root.getElementsByTagNameNS("*", "point"); + if (byNs.getLength() > 0 && byNs.item(0) instanceof Element element) { + return element; + } + NodeList byName = root.getElementsByTagName("point"); + if (byName.getLength() > 0 && byName.item(0) instanceof Element element) { + return element; + } + throw new IOException("Invalid CoT XML payload: missing point element"); + } + + private static String required(Element element, String name) throws IOException { + String value = element.getAttribute(name); + if (value == null || value.isBlank()) { + throw new IOException("Invalid CoT XML payload: missing " + name); + } + return value; + } + + private static String esc(String value) { + return value + .replace("&", "&") + .replace("\"", """) + .replace("<", "<") + .replace(">", ">"); + } +} diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakPayloadCodec.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakPayloadCodec.java new file mode 100644 index 000000000..0e24b9ca7 --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakPayloadCodec.java @@ -0,0 +1,31 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.codec; + +import io.mapsmessaging.api.message.Message; + +import java.io.IOException; + +public interface TakPayloadCodec { + + Message decode(byte[] payload) throws IOException; + + byte[] encode(Message message) throws IOException; +} diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodec.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodec.java new file mode 100644 index 000000000..0323865be --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodec.java @@ -0,0 +1,37 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.codec; + +import io.mapsmessaging.api.message.Message; + +import java.io.IOException; + +public class TakProtobufCodec implements TakPayloadCodec { + + @Override + public Message decode(byte[] payload) throws IOException { + throw new IOException("TAK protobuf codec is not implemented"); + } + + @Override + public byte[] encode(Message message) throws IOException { + throw new IOException("TAK protobuf codec is not implemented"); + } +} diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakFrameReader.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakFrameReader.java new file mode 100644 index 000000000..1fc0cdf8e --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakFrameReader.java @@ -0,0 +1,55 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.framing; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.List; + +public class TakFrameReader { + + private final TakStreamFramer framer; + private final byte[] readBuffer; + + public TakFrameReader(TakStreamFramer framer, int bufferSize) { + this.framer = framer; + this.readBuffer = new byte[Math.max(bufferSize, 256)]; + } + + public List read(InputStream inputStream) throws IOException { + int read = inputStream.read(readBuffer); + if (read < 0) { + throw new EOFException("TAK connection closed"); + } + return framer.onBytes(readBuffer, read); + } + + public List read(ByteBuffer byteBuffer) throws IOException { + if (byteBuffer == null || !byteBuffer.hasRemaining()) { + return List.of(); + } + int len = byteBuffer.remaining(); + byte[] data = new byte[len]; + byteBuffer.get(data); + return framer.onBytes(data, len); + } +} diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramer.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramer.java new file mode 100644 index 000000000..d509eecfe --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramer.java @@ -0,0 +1,186 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.framing; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class TakStreamFramer { + + public enum Mode { + XML_STREAM, + PROTO_STREAM + } + + private static final byte TAK_PROTO_MAGIC = (byte) 0xBF; + private static final byte[] XML_END_TAG = "".getBytes(StandardCharsets.UTF_8); + + private final Mode mode; + private final int maxPayloadBytes; + private byte[] buffer; + + public TakStreamFramer(Mode mode, int maxPayloadBytes) { + this.mode = mode; + this.maxPayloadBytes = maxPayloadBytes; + this.buffer = new byte[0]; + } + + public synchronized List onBytes(byte[] incoming, int length) throws IOException { + if (incoming == null || length <= 0) { + return List.of(); + } + append(incoming, length); + if (buffer.length > maxPayloadBytes * 2) { + throw new IOException("Buffered TAK stream exceeds safe bound"); + } + return switch (mode) { + case XML_STREAM -> parseXml(); + case PROTO_STREAM -> parseProto(); + }; + } + + public synchronized byte[] frame(byte[] payload) throws IOException { + if (payload == null) { + return new byte[0]; + } + if (payload.length > maxPayloadBytes) { + throw new IOException("Payload exceeds max frame size"); + } + if (mode == Mode.XML_STREAM) { + return payload; + } + byte[] varint = encodeVarint(payload.length); + byte[] out = new byte[1 + varint.length + payload.length]; + out[0] = TAK_PROTO_MAGIC; + System.arraycopy(varint, 0, out, 1, varint.length); + System.arraycopy(payload, 0, out, 1 + varint.length, payload.length); + return out; + } + + private List parseXml() throws IOException { + List frames = new ArrayList<>(); + while (true) { + int endTag = indexOf(buffer, XML_END_TAG, 0); + if (endTag < 0) { + break; + } + int frameEnd = endTag + XML_END_TAG.length; + while (frameEnd < buffer.length && Character.isWhitespace((char) buffer[frameEnd])) { + frameEnd++; + } + if (frameEnd > maxPayloadBytes) { + throw new IOException("TAK XML frame exceeds max frame size"); + } + frames.add(Arrays.copyOfRange(buffer, 0, frameEnd)); + buffer = Arrays.copyOfRange(buffer, frameEnd, buffer.length); + } + return frames; + } + + private List parseProto() throws IOException { + List frames = new ArrayList<>(); + while (buffer.length > 0) { + if (buffer[0] != TAK_PROTO_MAGIC) { + throw new IOException("Invalid TAK protobuf stream header"); + } + if (buffer.length < 2) { + break; + } + VarintResult varintResult = decodeVarint(buffer, 1); + if (varintResult == null) { + break; + } + int length = varintResult.value; + if (length < 0 || length > maxPayloadBytes) { + throw new IOException("Invalid TAK protobuf payload length: " + length); + } + int frameLength = 1 + varintResult.bytesRead + length; + if (buffer.length < frameLength) { + break; + } + int payloadOffset = 1 + varintResult.bytesRead; + frames.add(Arrays.copyOfRange(buffer, payloadOffset, payloadOffset + length)); + buffer = Arrays.copyOfRange(buffer, frameLength, buffer.length); + } + return frames; + } + + private void append(byte[] data, int length) { + int copyLength = Math.min(length, data.length); + byte[] next = Arrays.copyOf(buffer, buffer.length + copyLength); + System.arraycopy(data, 0, next, buffer.length, copyLength); + buffer = next; + } + + private static byte[] encodeVarint(int value) { + byte[] tmp = new byte[5]; + int count = 0; + int v = value; + while ((v & ~0x7F) != 0) { + tmp[count++] = (byte) ((v & 0x7F) | 0x80); + v >>>= 7; + } + tmp[count++] = (byte) (v & 0x7F); + return Arrays.copyOf(tmp, count); + } + + private static VarintResult decodeVarint(byte[] data, int offset) throws IOException { + int value = 0; + int shift = 0; + int bytes = 0; + for (int i = offset; i < data.length && bytes < 5; i++) { + int b = data[i] & 0xFF; + value |= (b & 0x7F) << shift; + bytes++; + if ((b & 0x80) == 0) { + return new VarintResult(value, bytes); + } + shift += 7; + } + if (bytes >= 5 && (data[offset + 4] & 0x80) != 0) { + throw new IOException("Malformed varint length"); + } + return null; + } + + private static int indexOf(byte[] haystack, byte[] needle, int from) { + if (needle.length == 0 || haystack.length < needle.length) { + return -1; + } + for (int i = from; i <= haystack.length - needle.length; i++) { + boolean match = true; + for (int j = 0; j < needle.length; j++) { + if (haystack[i + j] != needle[j]) { + match = false; + break; + } + } + if (match) { + return i; + } + } + return -1; + } + + private record VarintResult(int value, int bytesRead) {} +} diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakConnectionManager.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakConnectionManager.java new file mode 100644 index 000000000..bc131c96e --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakConnectionManager.java @@ -0,0 +1,65 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.transport; + +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.locks.ReentrantLock; + +public class TakConnectionManager { + + private final TakServerConnection connection; + private final ReentrantLock writeLock; + + public TakConnectionManager(TakServerConnection connection) { + this.connection = connection; + this.writeLock = new ReentrantLock(); + } + + public void connect() throws IOException { + connection.connect(); + } + + public void reconnect() throws IOException { + connection.close(); + connection.connect(); + } + + public boolean isConnected() { + return connection.isConnected(); + } + + public InputStream input() throws IOException { + return connection.getInputStream(); + } + + public void write(byte[] data) throws IOException { + writeLock.lock(); + try { + connection.write(data); + } finally { + writeLock.unlock(); + } + } + + public void close() throws IOException { + connection.close(); + } +} diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakServerConnection.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakServerConnection.java new file mode 100644 index 000000000..99c4f1dfb --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakServerConnection.java @@ -0,0 +1,109 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.transport; + +import io.mapsmessaging.network.EndPointURL; + +import javax.net.SocketFactory; +import javax.net.ssl.SSLSocketFactory; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.time.Duration; + +public class TakServerConnection { + + private final EndPointURL url; + private final int timeoutMs; + + private Socket socket; + private InputStream inputStream; + private OutputStream outputStream; + + public TakServerConnection(EndPointURL url, Duration timeout) { + this.url = url; + this.timeoutMs = (int) Math.max(1000, timeout.toMillis()); + } + + public synchronized void connect() throws IOException { + close(); + SocketFactory socketFactory = isSecure() ? SSLSocketFactory.getDefault() : SocketFactory.getDefault(); + socket = socketFactory.createSocket(url.getHost(), url.getPort()); + socket.setSoTimeout(timeoutMs); + inputStream = socket.getInputStream(); + outputStream = socket.getOutputStream(); + } + + public synchronized InputStream getInputStream() throws IOException { + if (!isConnected()) { + throw new IOException("TAK connection is not connected"); + } + return inputStream; + } + + public synchronized void write(byte[] data) throws IOException { + if (!isConnected()) { + throw new IOException("TAK connection is not connected"); + } + outputStream.write(data); + outputStream.flush(); + } + + public synchronized boolean isConnected() { + return socket != null && socket.isConnected() && !socket.isClosed(); + } + + public synchronized void close() throws IOException { + IOException deferred = null; + if (inputStream != null) { + try { + inputStream.close(); + } catch (IOException ex) { + deferred = ex; + } + inputStream = null; + } + if (outputStream != null) { + try { + outputStream.close(); + } catch (IOException ex) { + deferred = ex; + } + outputStream = null; + } + if (socket != null) { + try { + socket.close(); + } catch (IOException ex) { + deferred = ex; + } + socket = null; + } + if (deferred != null) { + throw deferred; + } + } + + private boolean isSecure() { + String protocol = url.getProtocol(); + return "ssl".equalsIgnoreCase(protocol) || "wss".equalsIgnoreCase(protocol); + } +} diff --git a/src/main/resources/META-INF/services/io.mapsmessaging.network.io.EndPointConnectionFactory b/src/main/resources/META-INF/services/io.mapsmessaging.network.io.EndPointConnectionFactory index 6d4a40add..2cf75a557 100644 --- a/src/main/resources/META-INF/services/io.mapsmessaging.network.io.EndPointConnectionFactory +++ b/src/main/resources/META-INF/services/io.mapsmessaging.network.io.EndPointConnectionFactory @@ -21,4 +21,5 @@ io.mapsmessaging.network.io.impl.tcp.TCPEndPointConnectionFactory io.mapsmessaging.network.io.impl.ssl.SSLEndPointConnectionFactory io.mapsmessaging.network.io.impl.noop.NoOpEndPointConnectionFactory io.mapsmessaging.network.io.impl.udp.UDPEndPointConnectionFactory -io.mapsmessaging.network.io.impl.serial.SerialEndPointConnectionFactory \ No newline at end of file +io.mapsmessaging.network.io.impl.serial.SerialEndPointConnectionFactory +io.mapsmessaging.network.protocol.impl.tak.TakEndPointConnectionFactory diff --git a/src/main/resources/META-INF/services/io.mapsmessaging.network.protocol.ProtocolImplFactory b/src/main/resources/META-INF/services/io.mapsmessaging.network.protocol.ProtocolImplFactory index 7af47b746..46d7b5a9f 100644 --- a/src/main/resources/META-INF/services/io.mapsmessaging.network.protocol.ProtocolImplFactory +++ b/src/main/resources/META-INF/services/io.mapsmessaging.network.protocol.ProtocolImplFactory @@ -75,4 +75,5 @@ io.mapsmessaging.network.protocol.impl.apache_pulsar.PulsarProtocolFactory io.mapsmessaging.network.protocol.impl.ibm_mq.MqProtocolFactory io.mapsmessaging.network.protocol.impl.aws_sns.SnsProtocolFactory io.mapsmessaging.network.protocol.impl.v2x_step.V2xStepProtocolFactory -io.mapsmessaging.network.protocol.impl.extension.impl.example.ExampleProtocolFactory \ No newline at end of file +io.mapsmessaging.network.protocol.impl.extension.impl.example.ExampleProtocolFactory +io.mapsmessaging.network.protocol.impl.tak.TakProtocolFactory diff --git a/src/main/resources/NetworkConnectionManager.yaml b/src/main/resources/NetworkConnectionManager.yaml index 942151398..61397a68b 100644 --- a/src/main/resources/NetworkConnectionManager.yaml +++ b/src/main/resources/NetworkConnectionManager.yaml @@ -66,4 +66,29 @@ NetworkConnectionManager: - direction: pull local_namespace: "/" - remote_namespace: "/#" \ No newline at end of file + remote_namespace: "/#" + + # Example TAK bridge (CoT XML stream over TLS) + # - name: "TAK Server Connection" + # url: ssl://tak.example.local:8089/ + # protocol: tak + # plugin: true + # + # config: + # payload: cot_xml + # framing: xml_stream + # max_payload_bytes: 1048576 + # reconnect_delay_ms: 2000 + # + # links: + # - direction: pull + # remote_namespace: "tak/cot/#" + # local_namespace: "tak/cot/" + # selector: "" + # qos: 0 + # + # - direction: push + # remote_namespace: "tak/cot/out/#" + # local_namespace: "ops/cot/out/" + # selector: "tak.type LIKE 'a-%'" + # qos: 0 diff --git a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java new file mode 100644 index 000000000..234e4e534 --- /dev/null +++ b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java @@ -0,0 +1,67 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak; + +import io.mapsmessaging.dto.rest.config.protocol.impl.ExtensionConfigDTO; +import io.mapsmessaging.network.protocol.impl.tak.framing.TakStreamFramer; +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Field; +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class TakExtensionTest { + + @Test + void parsesExtensionConfigWithDefaultsAndOverrides() throws Exception { + ExtensionConfigDTO dto = new ExtensionConfigDTO(); + Map config = new LinkedHashMap<>(); + config.put("payload", "tak_proto_v1"); + config.put("framing", "proto_stream"); + config.put("max_payload_bytes", 2048); + config.put("reconnect_delay_ms", 1500); + config.put("read_buffer_bytes", 4096); + setField(dto, "config", config); + + TakExtensionConfig parsed = TakExtensionConfig.from(dto); + + assertEquals("tak_proto_v1", parsed.getPayload()); + assertEquals(TakStreamFramer.Mode.PROTO_STREAM, parsed.getFramingMode()); + assertEquals(2048, parsed.getMaxPayloadBytes()); + assertEquals(1500, parsed.getReconnectDelayMs()); + assertEquals(4096, parsed.getReadBufferBytes()); + } + + @Test + void appliesSafeDefaultsWhenMissing() { + TakExtensionConfig parsed = TakExtensionConfig.from(null); + assertEquals("cot_xml", parsed.getPayload()); + assertEquals(TakStreamFramer.Mode.XML_STREAM, parsed.getFramingMode()); + assertEquals(1024 * 1024, parsed.getMaxPayloadBytes()); + } + + private static void setField(Object target, String fieldName, Object value) throws Exception { + Field field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); + } +} diff --git a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodecTest.java b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodecTest.java new file mode 100644 index 000000000..5da3631b6 --- /dev/null +++ b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodecTest.java @@ -0,0 +1,115 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.codec; + +import io.mapsmessaging.api.MessageBuilder; +import io.mapsmessaging.api.message.Message; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class CotXmlCodecTest { + + private static final String VALID_COT = """ + + + + """; + + @Test + void decodeValidCotExtractsCoreFields() throws IOException { + CotXmlCodec codec = new CotXmlCodec(); + + Message message = codec.decode(VALID_COT.getBytes(StandardCharsets.UTF_8)); + + assertNotNull(message.getMeta()); + assertEquals("u-1", message.getMeta().get("tak.uid")); + assertEquals("a-f-G-U-C", message.getMeta().get("tak.type")); + assertEquals("51.5007", message.getMeta().get("tak.lat")); + assertEquals("-0.1246", message.getMeta().get("tak.lon")); + assertEquals("application/cot+xml", message.getContentType()); + assertNotNull(message.getOpaqueData()); + } + + @Test + void decodeMissingRequiredFieldFails() { + CotXmlCodec codec = new CotXmlCodec(); + String invalid = """ + + + + """; + + assertThrows(IOException.class, () -> codec.decode(invalid.getBytes(StandardCharsets.UTF_8))); + } + + @Test + void decodeMalformedXmlFails() { + CotXmlCodec codec = new CotXmlCodec(); + String invalid = " codec.decode(invalid.getBytes(StandardCharsets.UTF_8))); + } + + @Test + void decodeRejectsXxe() { + CotXmlCodec codec = new CotXmlCodec(); + String xxe = """ + ]> + + + &xxe; + + """; + assertThrows(IOException.class, () -> codec.decode(xxe.getBytes(StandardCharsets.UTF_8))); + } + + @Test + void encodeFromMetaBuildsCotXml() throws IOException { + CotXmlCodec codec = new CotXmlCodec(); + Map meta = new LinkedHashMap<>(); + meta.put("tak.uid", "u-2"); + meta.put("tak.type", "a-f-G-U-C"); + meta.put("tak.time", "2026-02-19T09:10:00Z"); + meta.put("tak.start", "2026-02-19T09:10:00Z"); + meta.put("tak.stale", "2026-02-19T09:15:00Z"); + meta.put("tak.how", "m-g"); + meta.put("tak.lat", "1"); + meta.put("tak.lon", "2"); + meta.put("tak.hae", "3"); + meta.put("tak.ce", "4"); + meta.put("tak.le", "5"); + + Message message = new MessageBuilder().setMeta(meta).build(); + byte[] xml = codec.encode(message); + String text = new String(xml, StandardCharsets.UTF_8); + + assertTrue(text.startsWith(" decoded = framer.onBytes(framed, framed.length); + + assertEquals(1, decoded.size()); + assertArrayEquals(payload, decoded.getFirst()); + } + + @Test + void protoPartialReadReassemblesFrames() throws IOException { + TakStreamFramer framer = new TakStreamFramer(TakStreamFramer.Mode.PROTO_STREAM, 1024 * 1024); + byte[] payload = "abc123".getBytes(StandardCharsets.UTF_8); + byte[] framed = framer.frame(payload); + + List first = framer.onBytes(framed, 2); + assertTrue(first.isEmpty()); + List second = framer.onBytes(slice(framed, 2), framed.length - 2); + + assertEquals(1, second.size()); + assertArrayEquals(payload, second.getFirst()); + } + + @Test + void protoRejectsOversizedFrame() { + TakStreamFramer framer = new TakStreamFramer(TakStreamFramer.Mode.PROTO_STREAM, 8); + byte[] fake = new byte[]{(byte) 0xBF, 0x09, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; + assertThrows(IOException.class, () -> framer.onBytes(fake, fake.length)); + } + + @Test + void protoRejectsInvalidMagic() { + TakStreamFramer framer = new TakStreamFramer(TakStreamFramer.Mode.PROTO_STREAM, 1024); + byte[] invalid = new byte[]{0x01, 0x01, 0x7F}; + assertThrows(IOException.class, () -> framer.onBytes(invalid, invalid.length)); + } + + private static byte[] slice(byte[] source, int from) { + byte[] out = new byte[source.length - from]; + System.arraycopy(source, from, out, 0, out.length); + return out; + } +} diff --git a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramerXmlTest.java b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramerXmlTest.java new file mode 100644 index 000000000..f221e518c --- /dev/null +++ b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramerXmlTest.java @@ -0,0 +1,58 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.framing; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class TakStreamFramerXmlTest { + + @Test + void xmlDelimitsMultipleEvents() throws IOException { + TakStreamFramer framer = new TakStreamFramer(TakStreamFramer.Mode.XML_STREAM, 1024 * 1024); + String payload = "\n\n"; + + List frames = framer.onBytes(payload.getBytes(StandardCharsets.UTF_8), payload.length()); + + assertEquals(2, frames.size()); + assertTrue(new String(frames.get(0), StandardCharsets.UTF_8).contains("uid=\"1\"")); + assertTrue(new String(frames.get(1), StandardCharsets.UTF_8).contains("uid=\"2\"")); + } + + @Test + void xmlPartialEventBuffersUntilComplete() throws IOException { + TakStreamFramer framer = new TakStreamFramer(TakStreamFramer.Mode.XML_STREAM, 1024 * 1024); + byte[] bytes = "".getBytes(StandardCharsets.UTF_8); + + List first = framer.onBytes(bytes, 10); + assertTrue(first.isEmpty()); + + byte[] rest = new byte[bytes.length - 10]; + System.arraycopy(bytes, 10, rest, 0, rest.length); + List second = framer.onBytes(rest, rest.length); + assertEquals(1, second.size()); + } +} From 07d3e70745471b4d8139581e84a62fe8ee34bac2 Mon Sep 17 00:00:00 2001 From: krital Date: Thu, 19 Feb 2026 20:09:36 +0200 Subject: [PATCH 2/7] feat(tak): add phase 2 protobuf codec support --- .../protocol/impl/tak/TakExtension.java | 2 +- .../impl/tak/codec/TakProtobufCodec.java | 103 +++++++++++++++++- .../impl/tak/codec/TakProtobufCodecTest.java | 93 ++++++++++++++++ 3 files changed, 195 insertions(+), 3 deletions(-) create mode 100644 src/test/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodecTest.java diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java index 7afb50138..bf406af74 100644 --- a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java @@ -57,7 +57,7 @@ public TakExtension(EndPoint endPoint, @Nullable ExtensionConfigDTO extensionCon this.url = new EndPointURL(endPoint.getConfig().getUrl()); this.config = TakExtensionConfig.from(extensionConfig); this.payloadCodec = TakExtensionConfig.PAYLOAD_TAK_PROTO_V1.equalsIgnoreCase(config.getPayload()) - ? new TakProtobufCodec() + ? new TakProtobufCodec(new CotXmlCodec(), config.getMaxPayloadBytes()) : new CotXmlCodec(); this.streamFramer = new TakStreamFramer(config.getFramingMode(), config.getMaxPayloadBytes()); this.connectionManager = new TakConnectionManager(new TakServerConnection(url, Duration.ofSeconds(30))); diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodec.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodec.java index 0323865be..ef31f61eb 100644 --- a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodec.java +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodec.java @@ -19,19 +19,118 @@ package io.mapsmessaging.network.protocol.impl.tak.codec; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; +import com.google.protobuf.WireFormat; +import io.mapsmessaging.api.MessageBuilder; import io.mapsmessaging.api.message.Message; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.LinkedHashMap; +import java.util.Map; public class TakProtobufCodec implements TakPayloadCodec { + private static final int DEFAULT_MAX_PAYLOAD_BYTES = 1024 * 1024; + private final CotXmlCodec cotXmlCodec; + private final int maxPayloadBytes; + + public TakProtobufCodec() { + this(new CotXmlCodec(), DEFAULT_MAX_PAYLOAD_BYTES); + } + + public TakProtobufCodec(CotXmlCodec cotXmlCodec, int maxPayloadBytes) { + this.cotXmlCodec = cotXmlCodec; + this.maxPayloadBytes = Math.max(1024, maxPayloadBytes); + } + @Override public Message decode(byte[] payload) throws IOException { - throw new IOException("TAK protobuf codec is not implemented"); + if (payload == null || payload.length == 0) { + throw new IOException("Invalid TAK protobuf payload: empty"); + } + if (payload.length > maxPayloadBytes) { + throw new IOException("Invalid TAK protobuf payload: exceeds max size"); + } + + Map meta = new LinkedHashMap<>(); + meta.put("tak.format", "protobuf"); + meta.put("tak.transport", "stream"); + + byte[] embeddedCot = extractEmbeddedCotXml(payload); + if (embeddedCot != null) { + try { + Message cotMessage = cotXmlCodec.decode(embeddedCot); + if (cotMessage.getMeta() != null) { + meta.putAll(cotMessage.getMeta()); + } + meta.put("tak.format", "protobuf"); + meta.put("tak.transport", "stream"); + meta.put("tak.protobuf_embedded_xml", "true"); + } catch (IOException ignored) { + // Keep protobuf metadata only when embedded CoT is malformed. + } + } + + return new MessageBuilder() + .setOpaqueData(payload) + .setContentType("application/x-protobuf") + .setMeta(meta) + .build(); } @Override public byte[] encode(Message message) throws IOException { - throw new IOException("TAK protobuf codec is not implemented"); + byte[] opaque = message.getOpaqueData(); + if (opaque != null && opaque.length > 0 && !looksLikeCotXml(opaque)) { + if (opaque.length > maxPayloadBytes) { + throw new IOException("TAK protobuf payload exceeds max size"); + } + return opaque; + } + + // Fallback mode: wrap CoT XML as protobuf field #1 (length-delimited). + // This keeps the transport protobuf-framed for phase-2 interoperability work. + byte[] cotXml = (opaque != null && opaque.length > 0) ? opaque : cotXmlCodec.encode(message); + if (cotXml.length > maxPayloadBytes) { + throw new IOException("TAK protobuf payload exceeds max size"); + } + return wrapCotInLengthDelimitedField(cotXml); + } + + private static byte[] wrapCotInLengthDelimitedField(byte[] cotXml) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(cotXml.length + 8); + CodedOutputStream codedOutputStream = CodedOutputStream.newInstance(out); + codedOutputStream.writeByteArray(1, cotXml); + codedOutputStream.flush(); + return out.toByteArray(); + } + + private static boolean looksLikeCotXml(byte[] payload) { + String candidate = new String(payload, StandardCharsets.UTF_8).trim(); + return candidate.startsWith(" 0 && looksLikeCotXml(candidate)) { + return candidate; + } + } else if (!input.skipField(tag)) { + break; + } + } + return null; } } diff --git a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodecTest.java b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodecTest.java new file mode 100644 index 000000000..a94c0f410 --- /dev/null +++ b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodecTest.java @@ -0,0 +1,93 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.codec; + +import io.mapsmessaging.api.MessageBuilder; +import io.mapsmessaging.api.message.Message; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class TakProtobufCodecTest { + + @Test + void encodesCotXmlIntoProtobufPayload() throws IOException { + TakProtobufCodec codec = new TakProtobufCodec(); + String cot = """ + + + + """; + Message message = new MessageBuilder().setOpaqueData(cot.getBytes(StandardCharsets.UTF_8)).build(); + + byte[] encoded = codec.encode(message); + Message decoded = codec.decode(encoded); + + assertEquals("application/x-protobuf", decoded.getContentType()); + assertEquals("protobuf", decoded.getMeta().get("tak.format")); + assertEquals("u-100", decoded.getMeta().get("tak.uid")); + assertEquals("a-f-G-U-C", decoded.getMeta().get("tak.type")); + assertNotNull(decoded.getOpaqueData()); + assertTrue(decoded.getOpaqueData().length > 0); + } + + @Test + void passesThroughBinaryProtobufPayload() throws IOException { + TakProtobufCodec codec = new TakProtobufCodec(); + byte[] raw = new byte[]{0x08, 0x7F, 0x10, 0x01}; + Message message = new MessageBuilder().setOpaqueData(raw).build(); + + byte[] encoded = codec.encode(message); + Message decoded = codec.decode(encoded); + + assertArrayEquals(raw, encoded); + assertArrayEquals(raw, decoded.getOpaqueData()); + assertEquals("protobuf", decoded.getMeta().get("tak.format")); + } + + @Test + void buildsCotFromMetaWhenPayloadMissing() throws IOException { + TakProtobufCodec codec = new TakProtobufCodec(); + Map meta = new LinkedHashMap<>(); + meta.put("tak.uid", "u-200"); + meta.put("tak.type", "a-f-G-U-C"); + meta.put("tak.time", "2026-02-19T09:10:00Z"); + meta.put("tak.start", "2026-02-19T09:10:00Z"); + meta.put("tak.stale", "2026-02-19T09:15:00Z"); + meta.put("tak.how", "m-g"); + meta.put("tak.lat", "1"); + meta.put("tak.lon", "2"); + Message message = new MessageBuilder().setMeta(meta).build(); + + byte[] encoded = codec.encode(message); + Message decoded = codec.decode(encoded); + + assertEquals("u-200", decoded.getMeta().get("tak.uid")); + assertEquals("a-f-G-U-C", decoded.getMeta().get("tak.type")); + } +} From ad83d85098b6968eb0d61267a57036e01d0fe3be Mon Sep 17 00:00:00 2001 From: krital Date: Thu, 19 Feb 2026 21:33:07 +0200 Subject: [PATCH 3/7] feat(tak): add phase 3 multicast transport support --- .../protocol/impl/tak/TakExtension.java | 46 ++++++ .../protocol/impl/tak/TakExtensionConfig.java | 85 +++++++++++- .../tak/transport/TakMulticastTransport.java | 131 ++++++++++++++++++ .../protocol/impl/tak/TakExtensionTest.java | 21 +++ 4 files changed, 281 insertions(+), 2 deletions(-) create mode 100644 src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakMulticastTransport.java diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java index bf406af74..ac8bca046 100644 --- a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java @@ -30,12 +30,14 @@ import io.mapsmessaging.network.protocol.impl.tak.framing.TakFrameReader; import io.mapsmessaging.network.protocol.impl.tak.framing.TakStreamFramer; import io.mapsmessaging.network.protocol.impl.tak.transport.TakConnectionManager; +import io.mapsmessaging.network.protocol.impl.tak.transport.TakMulticastTransport; import io.mapsmessaging.network.protocol.impl.tak.transport.TakServerConnection; import org.jetbrains.annotations.Nullable; import java.io.IOException; import java.time.Duration; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -47,11 +49,13 @@ public class TakExtension extends Extension { private final TakPayloadCodec payloadCodec; private final TakStreamFramer streamFramer; private final TakConnectionManager connectionManager; + private final TakMulticastTransport multicastTransport; private final TakFrameReader frameReader; private final Set remoteLinks; private final Set localLinks; private final AtomicBoolean running; private volatile Thread readerThread; + private volatile Thread multicastReaderThread; public TakExtension(EndPoint endPoint, @Nullable ExtensionConfigDTO extensionConfig) { this.url = new EndPointURL(endPoint.getConfig().getUrl()); @@ -61,6 +65,7 @@ public TakExtension(EndPoint endPoint, @Nullable ExtensionConfigDTO extensionCon : new CotXmlCodec(); this.streamFramer = new TakStreamFramer(config.getFramingMode(), config.getMaxPayloadBytes()); this.connectionManager = new TakConnectionManager(new TakServerConnection(url, Duration.ofSeconds(30))); + this.multicastTransport = config.isMulticastEnabled() ? new TakMulticastTransport(config) : null; this.frameReader = new TakFrameReader(streamFramer, config.getReadBufferBytes()); this.remoteLinks = ConcurrentHashMap.newKeySet(); this.localLinks = ConcurrentHashMap.newKeySet(); @@ -70,10 +75,18 @@ public TakExtension(EndPoint endPoint, @Nullable ExtensionConfigDTO extensionCon @Override public void initialise() throws IOException { connectionManager.connect(); + if (multicastTransport != null) { + multicastTransport.start(); + } running.set(true); readerThread = new Thread(this::readerLoop, "tak-reader-" + url.getHost() + "-" + url.getPort()); readerThread.setDaemon(true); readerThread.start(); + if (multicastTransport != null && config.isMulticastIngressEnabled()) { + multicastReaderThread = new Thread(this::multicastReaderLoop, "tak-mcast-reader-" + config.getMulticastGroup() + "-" + config.getMulticastPort()); + multicastReaderThread.setDaemon(true); + multicastReaderThread.start(); + } } @Override @@ -88,6 +101,18 @@ public void close() throws IOException { Thread.currentThread().interrupt(); } } + Thread multicastThread = multicastReaderThread; + if (multicastThread != null) { + multicastThread.interrupt(); + try { + multicastThread.join(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + if (multicastTransport != null) { + multicastTransport.close(); + } connectionManager.close(); } @@ -115,6 +140,9 @@ public void outbound(String destinationName, Message message) { byte[] payload = payloadCodec.encode(message); byte[] framed = streamFramer.frame(payload); connectionManager.write(framed); + if (multicastTransport != null && config.isMulticastEgressEnabled()) { + multicastTransport.send(payload); + } } catch (IOException ignored) { // Connection failures are handled by reader loop reconnect logic. } @@ -163,6 +191,24 @@ private void reconnectWithDelay() { } } + private void multicastReaderLoop() { + while (running.get()) { + try { + Optional frame = multicastTransport.read(); + if (frame.isEmpty()) { + continue; + } + Message message = payloadCodec.decode(frame.get()); + String destination = resolveInboundDestination(message); + inbound(destination, message); + } catch (IOException ex) { + if (!running.get()) { + break; + } + } + } + } + private String resolveInboundDestination(Message message) { String eventType = null; if (message.getMeta() != null) { diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java index 9e992ac0d..9cb5951c4 100644 --- a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java @@ -32,19 +32,40 @@ public class TakExtensionConfig { private static final int DEFAULT_MAX_PAYLOAD = 1024 * 1024; private static final int DEFAULT_RECONNECT_MS = 2000; private static final int DEFAULT_READ_BUFFER = 8192; + private static final String DEFAULT_MULTICAST_GROUP = "239.2.3.1"; + private static final int DEFAULT_MULTICAST_PORT = 6969; + private static final int DEFAULT_MULTICAST_TTL = 1; private final String payload; private final TakStreamFramer.Mode framingMode; private final int maxPayloadBytes; private final int reconnectDelayMs; private final int readBufferBytes; + private final boolean multicastEnabled; + private final boolean multicastIngressEnabled; + private final boolean multicastEgressEnabled; + private final String multicastGroup; + private final int multicastPort; + private final String multicastInterface; + private final int multicastTtl; + private final int multicastReadBufferBytes; - private TakExtensionConfig(String payload, TakStreamFramer.Mode framingMode, int maxPayloadBytes, int reconnectDelayMs, int readBufferBytes) { + private TakExtensionConfig(String payload, TakStreamFramer.Mode framingMode, int maxPayloadBytes, int reconnectDelayMs, int readBufferBytes, + boolean multicastEnabled, boolean multicastIngressEnabled, boolean multicastEgressEnabled, + String multicastGroup, int multicastPort, String multicastInterface, int multicastTtl, int multicastReadBufferBytes) { this.payload = payload; this.framingMode = framingMode; this.maxPayloadBytes = maxPayloadBytes; this.reconnectDelayMs = reconnectDelayMs; this.readBufferBytes = readBufferBytes; + this.multicastEnabled = multicastEnabled; + this.multicastIngressEnabled = multicastIngressEnabled; + this.multicastEgressEnabled = multicastEgressEnabled; + this.multicastGroup = multicastGroup; + this.multicastPort = multicastPort; + this.multicastInterface = multicastInterface; + this.multicastTtl = multicastTtl; + this.multicastReadBufferBytes = multicastReadBufferBytes; } public static TakExtensionConfig from(ExtensionConfigDTO extensionConfig) { @@ -55,7 +76,21 @@ public static TakExtensionConfig from(ExtensionConfigDTO extensionConfig) { int maxPayload = asInt(config, "max_payload_bytes", DEFAULT_MAX_PAYLOAD); int reconnectMs = asInt(config, "reconnect_delay_ms", DEFAULT_RECONNECT_MS); int readBuffer = asInt(config, "read_buffer_bytes", DEFAULT_READ_BUFFER); - return new TakExtensionConfig(payload, mode, Math.max(1024, maxPayload), Math.max(100, reconnectMs), Math.max(512, readBuffer)); + boolean multicastEnabled = asBoolean(config, "multicast_enabled", false); + boolean multicastIngressEnabled = asBoolean(config, "multicast_ingress_enabled", multicastEnabled); + boolean multicastEgressEnabled = asBoolean(config, "multicast_egress_enabled", multicastEnabled); + String multicastGroup = asString(config, "multicast_group", DEFAULT_MULTICAST_GROUP).trim(); + int multicastPort = asInt(config, "multicast_port", DEFAULT_MULTICAST_PORT); + String multicastInterface = asString(config, "multicast_interface", "").trim(); + int multicastTtl = asInt(config, "multicast_ttl", DEFAULT_MULTICAST_TTL); + int multicastReadBuffer = asInt(config, "multicast_read_buffer_bytes", DEFAULT_READ_BUFFER); + return new TakExtensionConfig(payload, mode, Math.max(1024, maxPayload), Math.max(100, reconnectMs), Math.max(512, readBuffer), + multicastEnabled, multicastIngressEnabled, multicastEgressEnabled, + multicastGroup.isEmpty() ? DEFAULT_MULTICAST_GROUP : multicastGroup, + Math.max(1, multicastPort), + multicastInterface, + Math.max(1, Math.min(255, multicastTtl)), + Math.max(512, multicastReadBuffer)); } public String getPayload() { @@ -78,6 +113,38 @@ public int getReadBufferBytes() { return readBufferBytes; } + public boolean isMulticastEnabled() { + return multicastEnabled; + } + + public boolean isMulticastIngressEnabled() { + return multicastIngressEnabled; + } + + public boolean isMulticastEgressEnabled() { + return multicastEgressEnabled; + } + + public String getMulticastGroup() { + return multicastGroup; + } + + public int getMulticastPort() { + return multicastPort; + } + + public String getMulticastInterface() { + return multicastInterface; + } + + public int getMulticastTtl() { + return multicastTtl; + } + + public int getMulticastReadBufferBytes() { + return multicastReadBufferBytes; + } + private static String asString(Map config, String key, String def) { if (config == null) { return def; @@ -103,4 +170,18 @@ private static int asInt(Map config, String key, int def) { return def; } } + + private static boolean asBoolean(Map config, String key, boolean def) { + if (config == null) { + return def; + } + Object val = config.get(key); + if (val == null) { + return def; + } + if (val instanceof Boolean bool) { + return bool; + } + return Boolean.parseBoolean(val.toString()); + } } diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakMulticastTransport.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakMulticastTransport.java new file mode 100644 index 000000000..320849cc5 --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakMulticastTransport.java @@ -0,0 +1,131 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.transport; + +import io.mapsmessaging.network.protocol.impl.tak.TakExtensionConfig; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.MulticastSocket; +import java.net.NetworkInterface; +import java.net.SocketTimeoutException; +import java.util.Optional; +import java.util.concurrent.locks.ReentrantLock; + +public class TakMulticastTransport { + + private final TakExtensionConfig config; + private final ReentrantLock sendLock; + + private MulticastSocket socket; + private InetAddress groupAddress; + + public TakMulticastTransport(TakExtensionConfig config) { + this.config = config; + this.sendLock = new ReentrantLock(); + } + + public synchronized void start() throws IOException { + close(); + groupAddress = InetAddress.getByName(config.getMulticastGroup()); + socket = new MulticastSocket(config.getMulticastPort()); + socket.setReuseAddress(true); + socket.setSoTimeout(1000); + socket.setTimeToLive(config.getMulticastTtl()); + + NetworkInterface networkInterface = resolveInterface(); + if (networkInterface != null) { + socket.setNetworkInterface(networkInterface); + socket.joinGroup(new InetSocketAddress(groupAddress, config.getMulticastPort()), networkInterface); + } else { + socket.joinGroup(groupAddress); + } + } + + public Optional read() throws IOException { + MulticastSocket current = getSocket(); + byte[] buffer = new byte[config.getMulticastReadBufferBytes()]; + DatagramPacket packet = new DatagramPacket(buffer, buffer.length); + try { + current.receive(packet); + byte[] copy = new byte[packet.getLength()]; + System.arraycopy(packet.getData(), packet.getOffset(), copy, 0, packet.getLength()); + return Optional.of(copy); + } catch (SocketTimeoutException ignored) { + return Optional.empty(); + } + } + + public void send(byte[] payload) throws IOException { + MulticastSocket current = getSocket(); + DatagramPacket packet = new DatagramPacket(payload, payload.length, groupAddress, config.getMulticastPort()); + sendLock.lock(); + try { + current.send(packet); + } finally { + sendLock.unlock(); + } + } + + public synchronized void close() throws IOException { + if (socket == null) { + return; + } + IOException deferred = null; + try { + NetworkInterface networkInterface = resolveInterface(); + if (networkInterface != null && groupAddress != null) { + socket.leaveGroup(new InetSocketAddress(groupAddress, config.getMulticastPort()), networkInterface); + } else if (groupAddress != null) { + socket.leaveGroup(groupAddress); + } + } catch (IOException ex) { + deferred = ex; + } + socket.close(); + socket = null; + groupAddress = null; + if (deferred != null) { + throw deferred; + } + } + + private synchronized MulticastSocket getSocket() throws IOException { + if (socket == null || socket.isClosed()) { + throw new IOException("TAK multicast transport is not started"); + } + return socket; + } + + private NetworkInterface resolveInterface() throws IOException { + String iface = config.getMulticastInterface(); + if (iface == null || iface.isBlank()) { + return null; + } + NetworkInterface named = NetworkInterface.getByName(iface); + if (named != null) { + return named; + } + InetAddress address = InetAddress.getByName(iface); + return NetworkInterface.getByInetAddress(address); + } +} diff --git a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java index 234e4e534..a340be201 100644 --- a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java +++ b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java @@ -28,6 +28,8 @@ import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; class TakExtensionTest { @@ -40,6 +42,14 @@ void parsesExtensionConfigWithDefaultsAndOverrides() throws Exception { config.put("max_payload_bytes", 2048); config.put("reconnect_delay_ms", 1500); config.put("read_buffer_bytes", 4096); + config.put("multicast_enabled", true); + config.put("multicast_ingress_enabled", true); + config.put("multicast_egress_enabled", false); + config.put("multicast_group", "239.88.77.66"); + config.put("multicast_port", 7171); + config.put("multicast_interface", "lo0"); + config.put("multicast_ttl", 3); + config.put("multicast_read_buffer_bytes", 2048); setField(dto, "config", config); TakExtensionConfig parsed = TakExtensionConfig.from(dto); @@ -49,6 +59,14 @@ void parsesExtensionConfigWithDefaultsAndOverrides() throws Exception { assertEquals(2048, parsed.getMaxPayloadBytes()); assertEquals(1500, parsed.getReconnectDelayMs()); assertEquals(4096, parsed.getReadBufferBytes()); + assertTrue(parsed.isMulticastEnabled()); + assertTrue(parsed.isMulticastIngressEnabled()); + assertFalse(parsed.isMulticastEgressEnabled()); + assertEquals("239.88.77.66", parsed.getMulticastGroup()); + assertEquals(7171, parsed.getMulticastPort()); + assertEquals("lo0", parsed.getMulticastInterface()); + assertEquals(3, parsed.getMulticastTtl()); + assertEquals(2048, parsed.getMulticastReadBufferBytes()); } @Test @@ -57,6 +75,9 @@ void appliesSafeDefaultsWhenMissing() { assertEquals("cot_xml", parsed.getPayload()); assertEquals(TakStreamFramer.Mode.XML_STREAM, parsed.getFramingMode()); assertEquals(1024 * 1024, parsed.getMaxPayloadBytes()); + assertFalse(parsed.isMulticastEnabled()); + assertEquals("239.2.3.1", parsed.getMulticastGroup()); + assertEquals(6969, parsed.getMulticastPort()); } private static void setField(Object target, String fieldName, Object value) throws Exception { From fabd3c2942660231efe87060150c63dcbcc29dca Mon Sep 17 00:00:00 2001 From: krital Date: Thu, 19 Feb 2026 21:41:17 +0200 Subject: [PATCH 4/7] feat(tak): phase 4 hardening for reconnect and transport --- .../protocol/impl/tak/TakExtension.java | 33 +++++++++-- .../protocol/impl/tak/TakExtensionConfig.java | 57 ++++++++++++++++++- .../tak/transport/TakMulticastTransport.java | 3 + .../tak/transport/TakServerConnection.java | 15 ++++- .../protocol/impl/tak/TakExtensionTest.java | 28 +++++++++ 5 files changed, 128 insertions(+), 8 deletions(-) diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java index ac8bca046..a4db2d1e0 100644 --- a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java @@ -36,6 +36,7 @@ import java.io.IOException; import java.time.Duration; +import java.util.concurrent.ThreadLocalRandom; import java.util.List; import java.util.Optional; import java.util.Set; @@ -138,10 +139,18 @@ public void outbound(String destinationName, Message message) { } try { byte[] payload = payloadCodec.encode(message); - byte[] framed = streamFramer.frame(payload); - connectionManager.write(framed); + try { + byte[] framed = streamFramer.frame(payload); + connectionManager.write(framed); + } catch (IOException ignored) { + // Stream path is best-effort; multicast egress may still succeed. + } if (multicastTransport != null && config.isMulticastEgressEnabled()) { - multicastTransport.send(payload); + try { + multicastTransport.send(payload); + } catch (IOException ignored) { + // Best-effort multicast path. + } } } catch (IOException ignored) { // Connection failures are handled by reader loop reconnect logic. @@ -177,20 +186,34 @@ private void readerLoop() { } private void reconnectWithDelay() { + long delayMs = config.getReconnectDelayMs(); while (running.get()) { try { - Thread.sleep(config.getReconnectDelayMs()); + Thread.sleep(applyJitter(delayMs, config.getReconnectJitterMs())); connectionManager.reconnect(); return; } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; } catch (IOException ignored) { - // Keep trying while running. + delayMs = nextDelay(delayMs); } } } + private long nextDelay(long currentDelayMs) { + long multiplied = (long) Math.ceil(currentDelayMs * config.getReconnectBackoffMultiplier()); + return Math.min(config.getReconnectMaxDelayMs(), Math.max(config.getReconnectDelayMs(), multiplied)); + } + + private long applyJitter(long delayMs, int jitterMs) { + if (jitterMs <= 0) { + return delayMs; + } + int jitter = ThreadLocalRandom.current().nextInt(jitterMs + 1); + return delayMs + jitter; + } + private void multicastReaderLoop() { while (running.get()) { try { diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java index 9cb5951c4..a953a5aac 100644 --- a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java @@ -31,6 +31,9 @@ public class TakExtensionConfig { public static final String PAYLOAD_TAK_PROTO_V1 = "tak_proto_v1"; private static final int DEFAULT_MAX_PAYLOAD = 1024 * 1024; private static final int DEFAULT_RECONNECT_MS = 2000; + private static final int DEFAULT_RECONNECT_MAX_MS = 30000; + private static final double DEFAULT_RECONNECT_MULTIPLIER = 2.0d; + private static final int DEFAULT_RECONNECT_JITTER_MS = 250; private static final int DEFAULT_READ_BUFFER = 8192; private static final String DEFAULT_MULTICAST_GROUP = "239.2.3.1"; private static final int DEFAULT_MULTICAST_PORT = 6969; @@ -40,6 +43,9 @@ public class TakExtensionConfig { private final TakStreamFramer.Mode framingMode; private final int maxPayloadBytes; private final int reconnectDelayMs; + private final int reconnectMaxDelayMs; + private final double reconnectBackoffMultiplier; + private final int reconnectJitterMs; private final int readBufferBytes; private final boolean multicastEnabled; private final boolean multicastIngressEnabled; @@ -50,13 +56,17 @@ public class TakExtensionConfig { private final int multicastTtl; private final int multicastReadBufferBytes; - private TakExtensionConfig(String payload, TakStreamFramer.Mode framingMode, int maxPayloadBytes, int reconnectDelayMs, int readBufferBytes, + private TakExtensionConfig(String payload, TakStreamFramer.Mode framingMode, int maxPayloadBytes, int reconnectDelayMs, + int reconnectMaxDelayMs, double reconnectBackoffMultiplier, int reconnectJitterMs, int readBufferBytes, boolean multicastEnabled, boolean multicastIngressEnabled, boolean multicastEgressEnabled, String multicastGroup, int multicastPort, String multicastInterface, int multicastTtl, int multicastReadBufferBytes) { this.payload = payload; this.framingMode = framingMode; this.maxPayloadBytes = maxPayloadBytes; this.reconnectDelayMs = reconnectDelayMs; + this.reconnectMaxDelayMs = reconnectMaxDelayMs; + this.reconnectBackoffMultiplier = reconnectBackoffMultiplier; + this.reconnectJitterMs = reconnectJitterMs; this.readBufferBytes = readBufferBytes; this.multicastEnabled = multicastEnabled; this.multicastIngressEnabled = multicastIngressEnabled; @@ -75,6 +85,9 @@ public static TakExtensionConfig from(ExtensionConfigDTO extensionConfig) { TakStreamFramer.Mode mode = "proto_stream".equals(framing) ? TakStreamFramer.Mode.PROTO_STREAM : TakStreamFramer.Mode.XML_STREAM; int maxPayload = asInt(config, "max_payload_bytes", DEFAULT_MAX_PAYLOAD); int reconnectMs = asInt(config, "reconnect_delay_ms", DEFAULT_RECONNECT_MS); + int reconnectMaxMs = asInt(config, "reconnect_max_delay_ms", DEFAULT_RECONNECT_MAX_MS); + double reconnectMultiplier = asDouble(config, "reconnect_backoff_multiplier", DEFAULT_RECONNECT_MULTIPLIER); + int reconnectJitter = asInt(config, "reconnect_jitter_ms", DEFAULT_RECONNECT_JITTER_MS); int readBuffer = asInt(config, "read_buffer_bytes", DEFAULT_READ_BUFFER); boolean multicastEnabled = asBoolean(config, "multicast_enabled", false); boolean multicastIngressEnabled = asBoolean(config, "multicast_ingress_enabled", multicastEnabled); @@ -84,7 +97,10 @@ public static TakExtensionConfig from(ExtensionConfigDTO extensionConfig) { String multicastInterface = asString(config, "multicast_interface", "").trim(); int multicastTtl = asInt(config, "multicast_ttl", DEFAULT_MULTICAST_TTL); int multicastReadBuffer = asInt(config, "multicast_read_buffer_bytes", DEFAULT_READ_BUFFER); - return new TakExtensionConfig(payload, mode, Math.max(1024, maxPayload), Math.max(100, reconnectMs), Math.max(512, readBuffer), + int reconnectBase = Math.max(100, reconnectMs); + int reconnectMax = Math.max(reconnectBase, reconnectMaxMs); + return new TakExtensionConfig(payload, mode, Math.max(1024, maxPayload), reconnectBase, + reconnectMax, clampMultiplier(reconnectMultiplier), Math.max(0, reconnectJitter), Math.max(512, readBuffer), multicastEnabled, multicastIngressEnabled, multicastEgressEnabled, multicastGroup.isEmpty() ? DEFAULT_MULTICAST_GROUP : multicastGroup, Math.max(1, multicastPort), @@ -109,6 +125,18 @@ public int getReconnectDelayMs() { return reconnectDelayMs; } + public int getReconnectMaxDelayMs() { + return reconnectMaxDelayMs; + } + + public double getReconnectBackoffMultiplier() { + return reconnectBackoffMultiplier; + } + + public int getReconnectJitterMs() { + return reconnectJitterMs; + } + public int getReadBufferBytes() { return readBufferBytes; } @@ -171,6 +199,31 @@ private static int asInt(Map config, String key, int def) { } } + private static double asDouble(Map config, String key, double def) { + if (config == null) { + return def; + } + Object val = config.get(key); + if (val == null) { + return def; + } + if (val instanceof Number number) { + return number.doubleValue(); + } + try { + return Double.parseDouble(val.toString()); + } catch (NumberFormatException ignored) { + return def; + } + } + + private static double clampMultiplier(double multiplier) { + if (Double.isNaN(multiplier) || Double.isInfinite(multiplier)) { + return DEFAULT_RECONNECT_MULTIPLIER; + } + return Math.max(1.0d, Math.min(10.0d, multiplier)); + } + private static boolean asBoolean(Map config, String key, boolean def) { if (config == null) { return def; diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakMulticastTransport.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakMulticastTransport.java index 320849cc5..31cd4d76d 100644 --- a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakMulticastTransport.java +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakMulticastTransport.java @@ -67,6 +67,9 @@ public Optional read() throws IOException { DatagramPacket packet = new DatagramPacket(buffer, buffer.length); try { current.receive(packet); + if (packet.getLength() > config.getMaxPayloadBytes()) { + return Optional.empty(); + } byte[] copy = new byte[packet.getLength()]; System.arraycopy(packet.getData(), packet.getOffset(), copy, 0, packet.getLength()); return Optional.of(copy); diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakServerConnection.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakServerConnection.java index 99c4f1dfb..4860087e2 100644 --- a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakServerConnection.java +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakServerConnection.java @@ -22,10 +22,13 @@ import io.mapsmessaging.network.EndPointURL; import javax.net.SocketFactory; +import javax.net.ssl.SSLException; +import javax.net.ssl.SSLSocket; import javax.net.ssl.SSLSocketFactory; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.InetSocketAddress; import java.net.Socket; import java.time.Duration; @@ -46,8 +49,18 @@ public TakServerConnection(EndPointURL url, Duration timeout) { public synchronized void connect() throws IOException { close(); SocketFactory socketFactory = isSecure() ? SSLSocketFactory.getDefault() : SocketFactory.getDefault(); - socket = socketFactory.createSocket(url.getHost(), url.getPort()); + socket = socketFactory.createSocket(); + socket.connect(new InetSocketAddress(url.getHost(), url.getPort()), timeoutMs); + socket.setKeepAlive(true); + socket.setTcpNoDelay(true); socket.setSoTimeout(timeoutMs); + if (isSecure() && socket instanceof SSLSocket sslSocket) { + try { + sslSocket.startHandshake(); + } catch (SSLException sslException) { + throw new IOException("TAK TLS handshake failed. Verify trustStore/keyStore and server certificate validity", sslException); + } + } inputStream = socket.getInputStream(); outputStream = socket.getOutputStream(); } diff --git a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java index a340be201..c15046b0f 100644 --- a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java +++ b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java @@ -41,6 +41,9 @@ void parsesExtensionConfigWithDefaultsAndOverrides() throws Exception { config.put("framing", "proto_stream"); config.put("max_payload_bytes", 2048); config.put("reconnect_delay_ms", 1500); + config.put("reconnect_max_delay_ms", 6000); + config.put("reconnect_backoff_multiplier", 1.5); + config.put("reconnect_jitter_ms", 100); config.put("read_buffer_bytes", 4096); config.put("multicast_enabled", true); config.put("multicast_ingress_enabled", true); @@ -58,6 +61,9 @@ void parsesExtensionConfigWithDefaultsAndOverrides() throws Exception { assertEquals(TakStreamFramer.Mode.PROTO_STREAM, parsed.getFramingMode()); assertEquals(2048, parsed.getMaxPayloadBytes()); assertEquals(1500, parsed.getReconnectDelayMs()); + assertEquals(6000, parsed.getReconnectMaxDelayMs()); + assertEquals(1.5d, parsed.getReconnectBackoffMultiplier()); + assertEquals(100, parsed.getReconnectJitterMs()); assertEquals(4096, parsed.getReadBufferBytes()); assertTrue(parsed.isMulticastEnabled()); assertTrue(parsed.isMulticastIngressEnabled()); @@ -75,11 +81,33 @@ void appliesSafeDefaultsWhenMissing() { assertEquals("cot_xml", parsed.getPayload()); assertEquals(TakStreamFramer.Mode.XML_STREAM, parsed.getFramingMode()); assertEquals(1024 * 1024, parsed.getMaxPayloadBytes()); + assertEquals(2000, parsed.getReconnectDelayMs()); + assertEquals(30000, parsed.getReconnectMaxDelayMs()); + assertEquals(2.0d, parsed.getReconnectBackoffMultiplier()); + assertEquals(250, parsed.getReconnectJitterMs()); assertFalse(parsed.isMulticastEnabled()); assertEquals("239.2.3.1", parsed.getMulticastGroup()); assertEquals(6969, parsed.getMulticastPort()); } + @Test + void clampsReconnectHardeningValues() throws Exception { + ExtensionConfigDTO dto = new ExtensionConfigDTO(); + Map config = new LinkedHashMap<>(); + config.put("reconnect_delay_ms", 50); + config.put("reconnect_max_delay_ms", 10); + config.put("reconnect_backoff_multiplier", 0.1d); + config.put("reconnect_jitter_ms", -1); + setField(dto, "config", config); + + TakExtensionConfig parsed = TakExtensionConfig.from(dto); + + assertEquals(100, parsed.getReconnectDelayMs()); + assertEquals(100, parsed.getReconnectMaxDelayMs()); + assertEquals(1.0d, parsed.getReconnectBackoffMultiplier()); + assertEquals(0, parsed.getReconnectJitterMs()); + } + private static void setField(Object target, String fieldName, Object value) throws Exception { Field field = target.getClass().getDeclaredField(fieldName); field.setAccessible(true); From 7ae56a21bc6c66509f63cf6e07dfab540638dd3a Mon Sep 17 00:00:00 2001 From: krital Date: Fri, 20 Feb 2026 00:25:10 +0200 Subject: [PATCH 5/7] feat(tak): complete protobuf integration and observability --- .../logging/ServerLogMessages.java | 11 ++ .../protocol/impl/tak/TakExtension.java | 44 +++-- .../protocol/impl/tak/TakExtensionConfig.java | 23 ++- .../impl/tak/codec/TakProtobufCodec.java | 61 ++++++- .../protocol/impl/tak/TakExtensionTest.java | 6 + .../impl/tak/codec/TakProtobufCodecTest.java | 17 ++ .../TakServerConnectionIntegrationTest.java | 164 ++++++++++++++++++ 7 files changed, 313 insertions(+), 13 deletions(-) create mode 100644 src/test/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakServerConnectionIntegrationTest.java diff --git a/src/main/java/io/mapsmessaging/logging/ServerLogMessages.java b/src/main/java/io/mapsmessaging/logging/ServerLogMessages.java index 4231631b4..3d89bec23 100644 --- a/src/main/java/io/mapsmessaging/logging/ServerLogMessages.java +++ b/src/main/java/io/mapsmessaging/logging/ServerLogMessages.java @@ -244,6 +244,17 @@ public enum ServerLogMessages implements LogMessage { // + // + TAK_EXTENSION_INITIALISED(LEVEL.INFO, SERVER_CATEGORY.PROTOCOL, "TAK extension initialised for {}"), + TAK_EXTENSION_CLOSED(LEVEL.INFO, SERVER_CATEGORY.PROTOCOL, "TAK extension closed for {}"), + TAK_EXTENSION_OUTBOUND_FAILED(LEVEL.WARN, SERVER_CATEGORY.PROTOCOL, "TAK outbound failed for destination {}"), + TAK_EXTENSION_DECODE_FAILED(LEVEL.WARN, SERVER_CATEGORY.PROTOCOL, "TAK decode failed for transport {}"), + TAK_EXTENSION_RECONNECT_ATTEMPT(LEVEL.INFO, SERVER_CATEGORY.NETWORK, "TAK reconnect attempt in {}ms for {}"), + TAK_EXTENSION_RECONNECT_SUCCESS(LEVEL.INFO, SERVER_CATEGORY.NETWORK, "TAK reconnect succeeded for {}"), + TAK_EXTENSION_RECONNECT_FAILED(LEVEL.WARN, SERVER_CATEGORY.NETWORK, "TAK reconnect failed for {}"), + TAK_MULTICAST_IO_FAILED(LEVEL.WARN, SERVER_CATEGORY.NETWORK, "TAK multicast IO failed for {}:{}"), + // + // PACKET_SECURITY_VERIFICATION_FAILED(LEVEL.WARN, SERVER_CATEGORY.NETWORK, "Packet integrity verification failed: ip={}, algorithm={}, reason={}, packetLength={}, signatureSize={}"), PACKET_SECURITY_NOT_INITIALISED(LEVEL.ERROR, SERVER_CATEGORY.NETWORK, "Packet integrity not initialised: algorithm={}"), diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java index a4db2d1e0..944545f9c 100644 --- a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java @@ -21,6 +21,9 @@ import io.mapsmessaging.api.message.Message; import io.mapsmessaging.dto.rest.config.protocol.impl.ExtensionConfigDTO; +import io.mapsmessaging.logging.Logger; +import io.mapsmessaging.logging.LoggerFactory; +import io.mapsmessaging.logging.ServerLogMessages; import io.mapsmessaging.network.EndPointURL; import io.mapsmessaging.network.io.EndPoint; import io.mapsmessaging.network.protocol.impl.extension.Extension; @@ -46,6 +49,7 @@ public class TakExtension extends Extension { private final EndPointURL url; + private final Logger logger; private final TakExtensionConfig config; private final TakPayloadCodec payloadCodec; private final TakStreamFramer streamFramer; @@ -60,9 +64,14 @@ public class TakExtension extends Extension { public TakExtension(EndPoint endPoint, @Nullable ExtensionConfigDTO extensionConfig) { this.url = new EndPointURL(endPoint.getConfig().getUrl()); + this.logger = LoggerFactory.getLogger(TakExtension.class); this.config = TakExtensionConfig.from(extensionConfig); this.payloadCodec = TakExtensionConfig.PAYLOAD_TAK_PROTO_V1.equalsIgnoreCase(config.getPayload()) - ? new TakProtobufCodec(new CotXmlCodec(), config.getMaxPayloadBytes()) + ? TakProtobufCodec.withSchemaFormatter( + new CotXmlCodec(), + config.getMaxPayloadBytes(), + config.getProtobufDescriptorBase64(), + config.getProtobufMessageName()) : new CotXmlCodec(); this.streamFramer = new TakStreamFramer(config.getFramingMode(), config.getMaxPayloadBytes()); this.connectionManager = new TakConnectionManager(new TakServerConnection(url, Duration.ofSeconds(30))); @@ -79,6 +88,7 @@ public void initialise() throws IOException { if (multicastTransport != null) { multicastTransport.start(); } + logger.log(ServerLogMessages.TAK_EXTENSION_INITIALISED, url.toString()); running.set(true); readerThread = new Thread(this::readerLoop, "tak-reader-" + url.getHost() + "-" + url.getPort()); readerThread.setDaemon(true); @@ -115,6 +125,7 @@ public void close() throws IOException { multicastTransport.close(); } connectionManager.close(); + logger.log(ServerLogMessages.TAK_EXTENSION_CLOSED, url.toString()); } @Override @@ -149,11 +160,11 @@ public void outbound(String destinationName, Message message) { try { multicastTransport.send(payload); } catch (IOException ignored) { - // Best-effort multicast path. + logger.log(ServerLogMessages.TAK_MULTICAST_IO_FAILED, config.getMulticastGroup(), config.getMulticastPort()); } } } catch (IOException ignored) { - // Connection failures are handled by reader loop reconnect logic. + logger.log(ServerLogMessages.TAK_EXTENSION_OUTBOUND_FAILED, destinationName); } } @@ -172,9 +183,13 @@ private void readerLoop() { try { List frames = frameReader.read(connectionManager.input()); for (byte[] frame : frames) { - Message message = payloadCodec.decode(frame); - String destination = resolveInboundDestination(message); - inbound(destination, message); + try { + Message message = payloadCodec.decode(frame); + String destination = resolveInboundDestination(message); + inbound(destination, message); + } catch (IOException decodeFailure) { + logger.log(ServerLogMessages.TAK_EXTENSION_DECODE_FAILED, "stream"); + } } } catch (IOException ex) { if (!running.get()) { @@ -189,13 +204,17 @@ private void reconnectWithDelay() { long delayMs = config.getReconnectDelayMs(); while (running.get()) { try { - Thread.sleep(applyJitter(delayMs, config.getReconnectJitterMs())); + long sleepFor = applyJitter(delayMs, config.getReconnectJitterMs()); + logger.log(ServerLogMessages.TAK_EXTENSION_RECONNECT_ATTEMPT, sleepFor, url.toString()); + Thread.sleep(sleepFor); connectionManager.reconnect(); + logger.log(ServerLogMessages.TAK_EXTENSION_RECONNECT_SUCCESS, url.toString()); return; } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; } catch (IOException ignored) { + logger.log(ServerLogMessages.TAK_EXTENSION_RECONNECT_FAILED, url.toString()); delayMs = nextDelay(delayMs); } } @@ -221,13 +240,18 @@ private void multicastReaderLoop() { if (frame.isEmpty()) { continue; } - Message message = payloadCodec.decode(frame.get()); - String destination = resolveInboundDestination(message); - inbound(destination, message); + try { + Message message = payloadCodec.decode(frame.get()); + String destination = resolveInboundDestination(message); + inbound(destination, message); + } catch (IOException decodeFailure) { + logger.log(ServerLogMessages.TAK_EXTENSION_DECODE_FAILED, "multicast"); + } } catch (IOException ex) { if (!running.get()) { break; } + logger.log(ServerLogMessages.TAK_MULTICAST_IO_FAILED, config.getMulticastGroup(), config.getMulticastPort()); } } } diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java index a953a5aac..788c5d0d4 100644 --- a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java @@ -38,6 +38,8 @@ public class TakExtensionConfig { private static final String DEFAULT_MULTICAST_GROUP = "239.2.3.1"; private static final int DEFAULT_MULTICAST_PORT = 6969; private static final int DEFAULT_MULTICAST_TTL = 1; + private static final String DEFAULT_PROTOBUF_DESCRIPTOR_BASE64 = ""; + private static final String DEFAULT_PROTOBUF_MESSAGE_NAME = ""; private final String payload; private final TakStreamFramer.Mode framingMode; @@ -55,11 +57,14 @@ public class TakExtensionConfig { private final String multicastInterface; private final int multicastTtl; private final int multicastReadBufferBytes; + private final String protobufDescriptorBase64; + private final String protobufMessageName; private TakExtensionConfig(String payload, TakStreamFramer.Mode framingMode, int maxPayloadBytes, int reconnectDelayMs, int reconnectMaxDelayMs, double reconnectBackoffMultiplier, int reconnectJitterMs, int readBufferBytes, boolean multicastEnabled, boolean multicastIngressEnabled, boolean multicastEgressEnabled, - String multicastGroup, int multicastPort, String multicastInterface, int multicastTtl, int multicastReadBufferBytes) { + String multicastGroup, int multicastPort, String multicastInterface, int multicastTtl, int multicastReadBufferBytes, + String protobufDescriptorBase64, String protobufMessageName) { this.payload = payload; this.framingMode = framingMode; this.maxPayloadBytes = maxPayloadBytes; @@ -76,6 +81,8 @@ private TakExtensionConfig(String payload, TakStreamFramer.Mode framingMode, int this.multicastInterface = multicastInterface; this.multicastTtl = multicastTtl; this.multicastReadBufferBytes = multicastReadBufferBytes; + this.protobufDescriptorBase64 = protobufDescriptorBase64; + this.protobufMessageName = protobufMessageName; } public static TakExtensionConfig from(ExtensionConfigDTO extensionConfig) { @@ -97,6 +104,8 @@ public static TakExtensionConfig from(ExtensionConfigDTO extensionConfig) { String multicastInterface = asString(config, "multicast_interface", "").trim(); int multicastTtl = asInt(config, "multicast_ttl", DEFAULT_MULTICAST_TTL); int multicastReadBuffer = asInt(config, "multicast_read_buffer_bytes", DEFAULT_READ_BUFFER); + String protobufDescriptorBase64 = asString(config, "protobuf_descriptor_base64", DEFAULT_PROTOBUF_DESCRIPTOR_BASE64).trim(); + String protobufMessageName = asString(config, "protobuf_message_name", DEFAULT_PROTOBUF_MESSAGE_NAME).trim(); int reconnectBase = Math.max(100, reconnectMs); int reconnectMax = Math.max(reconnectBase, reconnectMaxMs); return new TakExtensionConfig(payload, mode, Math.max(1024, maxPayload), reconnectBase, @@ -106,7 +115,9 @@ reconnectMax, clampMultiplier(reconnectMultiplier), Math.max(0, reconnectJitter) Math.max(1, multicastPort), multicastInterface, Math.max(1, Math.min(255, multicastTtl)), - Math.max(512, multicastReadBuffer)); + Math.max(512, multicastReadBuffer), + protobufDescriptorBase64, + protobufMessageName); } public String getPayload() { @@ -173,6 +184,14 @@ public int getMulticastReadBufferBytes() { return multicastReadBufferBytes; } + public String getProtobufDescriptorBase64() { + return protobufDescriptorBase64; + } + + public String getProtobufMessageName() { + return protobufMessageName; + } + private static String asString(Map config, String key, String def) { if (config == null) { return def; diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodec.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodec.java index ef31f61eb..683ea49e7 100644 --- a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodec.java +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodec.java @@ -24,10 +24,15 @@ import com.google.protobuf.WireFormat; import io.mapsmessaging.api.MessageBuilder; import io.mapsmessaging.api.message.Message; +import io.mapsmessaging.schemas.config.impl.ProtoBufSchemaConfig; +import io.mapsmessaging.schemas.formatters.MessageFormatter; +import io.mapsmessaging.schemas.formatters.MessageFormatterFactory; +import io.mapsmessaging.selector.IdentifierResolver; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.Base64; import java.util.LinkedHashMap; import java.util.Map; @@ -36,14 +41,25 @@ public class TakProtobufCodec implements TakPayloadCodec { private static final int DEFAULT_MAX_PAYLOAD_BYTES = 1024 * 1024; private final CotXmlCodec cotXmlCodec; private final int maxPayloadBytes; + private final MessageFormatter protobufFormatter; public TakProtobufCodec() { - this(new CotXmlCodec(), DEFAULT_MAX_PAYLOAD_BYTES); + this(new CotXmlCodec(), DEFAULT_MAX_PAYLOAD_BYTES, null); } public TakProtobufCodec(CotXmlCodec cotXmlCodec, int maxPayloadBytes) { + this(cotXmlCodec, maxPayloadBytes, null); + } + + public TakProtobufCodec(CotXmlCodec cotXmlCodec, int maxPayloadBytes, MessageFormatter protobufFormatter) { this.cotXmlCodec = cotXmlCodec; this.maxPayloadBytes = Math.max(1024, maxPayloadBytes); + this.protobufFormatter = protobufFormatter; + } + + public static TakProtobufCodec withSchemaFormatter(CotXmlCodec cotXmlCodec, int maxPayloadBytes, + String descriptorBase64, String messageName) { + return new TakProtobufCodec(cotXmlCodec, maxPayloadBytes, createFormatter(descriptorBase64, messageName)); } @Override @@ -72,6 +88,8 @@ public Message decode(byte[] payload) throws IOException { } catch (IOException ignored) { // Keep protobuf metadata only when embedded CoT is malformed. } + } else if (protobufFormatter != null) { + mergeSchemaParsedMeta(meta, payload); } return new MessageBuilder() @@ -133,4 +151,45 @@ private static byte[] extractEmbeddedCotXml(byte[] payload) throws IOException { } return null; } + + private void mergeSchemaParsedMeta(Map meta, byte[] payload) { + try { + IdentifierResolver parsed = protobufFormatter.parse(payload); + if (parsed == null) { + return; + } + promote(parsed, meta, "uid", "tak.uid"); + promote(parsed, meta, "type", "tak.type"); + promote(parsed, meta, "time", "tak.time"); + promote(parsed, meta, "start", "tak.start"); + promote(parsed, meta, "stale", "tak.stale"); + promote(parsed, meta, "how", "tak.how"); + meta.put("tak.protobuf_parsed", "true"); + } catch (Exception ignored) { + // keep base protobuf metadata when schema parsing fails + } + } + + private static void promote(IdentifierResolver parsed, Map meta, String fromKey, String toKey) { + Object val = parsed.get(fromKey); + if (val != null) { + meta.put(toKey, val.toString()); + } + } + + private static MessageFormatter createFormatter(String descriptorBase64, String messageName) { + if (descriptorBase64 == null || descriptorBase64.isBlank() || messageName == null || messageName.isBlank()) { + return null; + } + try { + ProtoBufSchemaConfig schemaConfig = new ProtoBufSchemaConfig(); + ProtoBufSchemaConfig.ProtobufConfig protobufConfig = new ProtoBufSchemaConfig.ProtobufConfig(); + protobufConfig.setDescriptorValue(Base64.getDecoder().decode(descriptorBase64)); + protobufConfig.setMessageName(messageName); + schemaConfig.setProtobufConfig(protobufConfig); + return MessageFormatterFactory.getInstance().getFormatter(schemaConfig); + } catch (Exception ignored) { + return null; + } + } } diff --git a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java index c15046b0f..cc1d9231a 100644 --- a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java +++ b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java @@ -53,6 +53,8 @@ void parsesExtensionConfigWithDefaultsAndOverrides() throws Exception { config.put("multicast_interface", "lo0"); config.put("multicast_ttl", 3); config.put("multicast_read_buffer_bytes", 2048); + config.put("protobuf_descriptor_base64", "AQID"); + config.put("protobuf_message_name", "TakMessage"); setField(dto, "config", config); TakExtensionConfig parsed = TakExtensionConfig.from(dto); @@ -73,6 +75,8 @@ void parsesExtensionConfigWithDefaultsAndOverrides() throws Exception { assertEquals("lo0", parsed.getMulticastInterface()); assertEquals(3, parsed.getMulticastTtl()); assertEquals(2048, parsed.getMulticastReadBufferBytes()); + assertEquals("AQID", parsed.getProtobufDescriptorBase64()); + assertEquals("TakMessage", parsed.getProtobufMessageName()); } @Test @@ -88,6 +92,8 @@ void appliesSafeDefaultsWhenMissing() { assertFalse(parsed.isMulticastEnabled()); assertEquals("239.2.3.1", parsed.getMulticastGroup()); assertEquals(6969, parsed.getMulticastPort()); + assertEquals("", parsed.getProtobufDescriptorBase64()); + assertEquals("", parsed.getProtobufMessageName()); } @Test diff --git a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodecTest.java b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodecTest.java index a94c0f410..13a21d49f 100644 --- a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodecTest.java +++ b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodecTest.java @@ -19,14 +19,18 @@ package io.mapsmessaging.network.protocol.impl.tak.codec; +import io.mapsmessaging.api.transformers.TestProtobufSchemas; import io.mapsmessaging.api.MessageBuilder; import io.mapsmessaging.api.message.Message; +import io.mapsmessaging.schemas.config.impl.ProtoBufSchemaConfig; import org.junit.jupiter.api.Test; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.Base64; import java.util.LinkedHashMap; import java.util.Map; +import java.util.UUID; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -90,4 +94,17 @@ void buildsCotFromMetaWhenPayloadMissing() throws IOException { assertEquals("u-200", decoded.getMeta().get("tak.uid")); assertEquals("a-f-G-U-C", decoded.getMeta().get("tak.type")); } + + @Test + void decodesUsingMapsProtobufFormatterWhenConfigured() throws IOException { + ProtoBufSchemaConfig schema = (ProtoBufSchemaConfig) TestProtobufSchemas.protobufSchemaConfig(UUID.randomUUID()); + String descriptorBase64 = Base64.getEncoder().encodeToString(schema.getProtobufConfig().getDescriptorValue()); + String messageName = schema.getProtobufConfig().getMessageName(); + TakProtobufCodec codec = TakProtobufCodec.withSchemaFormatter(new CotXmlCodec(), 1024 * 1024, descriptorBase64, messageName); + + byte[] payload = TestProtobufSchemas.encodeProtobufPayload(); + Message decoded = codec.decode(payload); + + assertEquals("true", decoded.getMeta().get("tak.protobuf_parsed")); + } } diff --git a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakServerConnectionIntegrationTest.java b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakServerConnectionIntegrationTest.java new file mode 100644 index 000000000..c8f0188a3 --- /dev/null +++ b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakServerConnectionIntegrationTest.java @@ -0,0 +1,164 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.transport; + +import io.mapsmessaging.network.EndPointURL; +import org.junit.jupiter.api.Test; + +import javax.net.ssl.SSLServerSocketFactory; +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; +import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class TakServerConnectionIntegrationTest { + + @Test + void tcpConnectionRoundTrip() throws Exception { + try (ServerSocket serverSocket = new ServerSocket(0)) { + int port = serverSocket.getLocalPort(); + byte[] inbound = "ping".getBytes(); + byte[] outbound = "pong".getBytes(); + ExecutorService executor = Executors.newSingleThreadExecutor(); + + Future server = executor.submit(() -> { + try (Socket accepted = serverSocket.accept()) { + byte[] buffer = accepted.getInputStream().readNBytes(inbound.length); + assertArrayEquals(inbound, buffer); + accepted.getOutputStream().write(outbound); + accepted.getOutputStream().flush(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + TakServerConnection connection = new TakServerConnection(new EndPointURL("tcp://127.0.0.1:" + port), Duration.ofSeconds(2)); + connection.connect(); + connection.write(inbound); + byte[] response = connection.getInputStream().readNBytes(outbound.length); + connection.close(); + + assertArrayEquals(outbound, response); + server.get(3, TimeUnit.SECONDS); + executor.shutdownNow(); + } + } + + @Test + void reconnectsAfterDisconnect() throws Exception { + try (ServerSocket serverSocket = new ServerSocket(0)) { + int port = serverSocket.getLocalPort(); + CountDownLatch firstAccepted = new CountDownLatch(1); + CountDownLatch secondAccepted = new CountDownLatch(1); + ExecutorService executor = Executors.newSingleThreadExecutor(); + + Future server = executor.submit(() -> { + try { + try (Socket first = serverSocket.accept()) { + firstAccepted.countDown(); + byte[] firstPayload = first.getInputStream().readNBytes(3); + assertArrayEquals("one".getBytes(), firstPayload); + } + try (Socket second = serverSocket.accept()) { + secondAccepted.countDown(); + byte[] secondPayload = second.getInputStream().readNBytes(3); + assertArrayEquals("two".getBytes(), secondPayload); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + TakConnectionManager manager = new TakConnectionManager(new TakServerConnection(new EndPointURL("tcp://127.0.0.1:" + port), Duration.ofSeconds(2))); + manager.connect(); + manager.write("one".getBytes()); + assertTrue(firstAccepted.await(2, TimeUnit.SECONDS)); + + manager.reconnect(); + manager.write("two".getBytes()); + assertTrue(secondAccepted.await(2, TimeUnit.SECONDS)); + + manager.close(); + server.get(3, TimeUnit.SECONDS); + executor.shutdownNow(); + } + } + + @Test + void tlsHandshakeFailureProvidesActionableMessage() throws Exception { + try (ServerSocket plainServer = new ServerSocket(0)) { + int port = plainServer.getLocalPort(); + ExecutorService executor = Executors.newSingleThreadExecutor(); + Future server = executor.submit(() -> { + try (Socket ignored = plainServer.accept()) { + // Non-TLS endpoint used intentionally to force TLS handshake failure. + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + TakServerConnection connection = new TakServerConnection(new EndPointURL("ssl://127.0.0.1:" + port), Duration.ofSeconds(2)); + IOException exception = assertThrows(IOException.class, connection::connect); + assertTrue(exception.getMessage().contains("TLS handshake failed")); + server.get(3, TimeUnit.SECONDS); + executor.shutdownNow(); + } + } + + @Test + void tlsRoundTripWithDefaultTestKeystore() throws Exception { + try (var serverSocket = SSLServerSocketFactory.getDefault().createServerSocket(0)) { + int port = serverSocket.getLocalPort(); + byte[] inbound = "hello".getBytes(); + byte[] outbound = "world".getBytes(); + ExecutorService executor = Executors.newSingleThreadExecutor(); + + Future server = executor.submit(() -> { + try (Socket accepted = serverSocket.accept()) { + byte[] buffer = accepted.getInputStream().readNBytes(inbound.length); + assertArrayEquals(inbound, buffer); + accepted.getOutputStream().write(outbound); + accepted.getOutputStream().flush(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + + TakServerConnection connection = new TakServerConnection(new EndPointURL("ssl://127.0.0.1:" + port), Duration.ofSeconds(2)); + connection.connect(); + connection.write(inbound); + byte[] response = connection.getInputStream().readNBytes(outbound.length); + connection.close(); + + assertArrayEquals(outbound, response); + server.get(3, TimeUnit.SECONDS); + executor.shutdownNow(); + } + } +} From 9cb7a188e86736598d727202cd3d564b9e9c0bae Mon Sep 17 00:00:00 2001 From: krital Date: Fri, 20 Feb 2026 01:21:26 +0200 Subject: [PATCH 6/7] test(tak): add extension reconnect integration coverage --- .../TakExtensionReconnectIntegrationTest.java | 141 ++++++++++++++++++ 1 file changed, 141 insertions(+) create mode 100644 src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionReconnectIntegrationTest.java diff --git a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionReconnectIntegrationTest.java b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionReconnectIntegrationTest.java new file mode 100644 index 000000000..c5196d243 --- /dev/null +++ b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionReconnectIntegrationTest.java @@ -0,0 +1,141 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak; + +import io.mapsmessaging.api.MessageBuilder; +import io.mapsmessaging.api.message.Message; +import io.mapsmessaging.dto.rest.config.network.EndPointServerConfigDTO; +import io.mapsmessaging.dto.rest.config.protocol.impl.ExtensionConfigDTO; +import io.mapsmessaging.network.io.EndPoint; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class TakExtensionReconnectIntegrationTest { + + @Test + void reconnectsAfterRemoteDropAndResumesOutbound() throws Exception { + try (ServerSocket serverSocket = new ServerSocket(0)) { + int port = serverSocket.getLocalPort(); + CountDownLatch firstMessageSeen = new CountDownLatch(1); + CountDownLatch secondMessageSeen = new CountDownLatch(1); + ExecutorService executor = Executors.newSingleThreadExecutor(); + + Future server = executor.submit(() -> { + try { + try (Socket first = serverSocket.accept()) { + first.setSoTimeout(2000); + String firstPayload = readPayload(first); + assertTrue(firstPayload.contains(" config = new LinkedHashMap<>(); + config.put("payload", "cot_xml"); + config.put("framing", "xml_stream"); + config.put("reconnect_delay_ms", 100); + config.put("reconnect_max_delay_ms", 400); + config.put("reconnect_backoff_multiplier", 1.5d); + config.put("reconnect_jitter_ms", 0); + setField(dto, "config", config); + return dto; + } + + private static Message cotMessage(String uid) { + String cot = """ + + + + """.formatted(uid); + return new MessageBuilder().setOpaqueData(cot.getBytes(StandardCharsets.UTF_8)).build(); + } + + private static String readPayload(Socket socket) throws IOException { + byte[] buffer = new byte[4096]; + int read = socket.getInputStream().read(buffer); + if (read <= 0) { + throw new IOException("No payload received"); + } + return new String(buffer, 0, read, StandardCharsets.UTF_8); + } + + private static void setField(Object target, String fieldName, Object value) throws Exception { + var field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); + } +} From 446caf7828f4cac6180db30063372492d264c05e Mon Sep 17 00:00:00 2001 From: krital Date: Fri, 20 Feb 2026 01:39:31 +0200 Subject: [PATCH 7/7] feat(tak): use MAPS endpoint transport by default --- .../protocol/impl/extension/Extension.java | 5 ++ .../impl/extension/ExtensionProtocol.java | 2 +- .../protocol/impl/tak/TakExtension.java | 52 ++++++++++++++++--- .../protocol/impl/tak/TakExtensionConfig.java | 13 ++++- .../TakExtensionReconnectIntegrationTest.java | 1 + .../protocol/impl/tak/TakExtensionTest.java | 3 ++ 6 files changed, 67 insertions(+), 9 deletions(-) diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/extension/Extension.java b/src/main/java/io/mapsmessaging/network/protocol/impl/extension/Extension.java index c339a94fb..92ad6e903 100644 --- a/src/main/java/io/mapsmessaging/network/protocol/impl/extension/Extension.java +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/extension/Extension.java @@ -20,6 +20,7 @@ package io.mapsmessaging.network.protocol.impl.extension; import io.mapsmessaging.api.message.Message; +import io.mapsmessaging.network.io.Packet; import jakarta.validation.constraints.NotNull; import lombok.Getter; import lombok.NonNull; @@ -74,6 +75,10 @@ public String getSessionId() { public abstract void registerLocalLink(@NonNull @NotNull String destination) throws IOException; + public boolean processPacket(@NonNull @NotNull Packet packet) throws IOException { + return false; + } + protected void inbound(@NonNull @NotNull String destinationName, @NonNull @NotNull Message message) throws IOException { if (extensionProtocol == null) { diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/extension/ExtensionProtocol.java b/src/main/java/io/mapsmessaging/network/protocol/impl/extension/ExtensionProtocol.java index 2ae596900..947f2f538 100644 --- a/src/main/java/io/mapsmessaging/network/protocol/impl/extension/ExtensionProtocol.java +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/extension/ExtensionProtocol.java @@ -155,7 +155,7 @@ public ProtocolInformationDTO getInformation() { @Override public boolean processPacket(@NonNull @NotNull Packet packet) throws IOException { - return false; + return extension.processPacket(packet); } @Override diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java index 944545f9c..378c7c498 100644 --- a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java @@ -26,7 +26,9 @@ import io.mapsmessaging.logging.ServerLogMessages; import io.mapsmessaging.network.EndPointURL; import io.mapsmessaging.network.io.EndPoint; +import io.mapsmessaging.network.io.Packet; import io.mapsmessaging.network.protocol.impl.extension.Extension; +import io.mapsmessaging.network.protocol.impl.extension.ExtensionEndPoint; import io.mapsmessaging.network.protocol.impl.tak.codec.CotXmlCodec; import io.mapsmessaging.network.protocol.impl.tak.codec.TakPayloadCodec; import io.mapsmessaging.network.protocol.impl.tak.codec.TakProtobufCodec; @@ -38,6 +40,7 @@ import org.jetbrains.annotations.Nullable; import java.io.IOException; +import java.nio.ByteBuffer; import java.time.Duration; import java.util.concurrent.ThreadLocalRandom; import java.util.List; @@ -50,7 +53,9 @@ public class TakExtension extends Extension { private final EndPointURL url; private final Logger logger; + private final EndPoint endPoint; private final TakExtensionConfig config; + private final boolean mapsManagedTransport; private final TakPayloadCodec payloadCodec; private final TakStreamFramer streamFramer; private final TakConnectionManager connectionManager; @@ -63,9 +68,11 @@ public class TakExtension extends Extension { private volatile Thread multicastReaderThread; public TakExtension(EndPoint endPoint, @Nullable ExtensionConfigDTO extensionConfig) { + this.endPoint = endPoint; this.url = new EndPointURL(endPoint.getConfig().getUrl()); this.logger = LoggerFactory.getLogger(TakExtension.class); this.config = TakExtensionConfig.from(extensionConfig); + this.mapsManagedTransport = config.isUseMapsTransport() && !(endPoint instanceof ExtensionEndPoint); this.payloadCodec = TakExtensionConfig.PAYLOAD_TAK_PROTO_V1.equalsIgnoreCase(config.getPayload()) ? TakProtobufCodec.withSchemaFormatter( new CotXmlCodec(), @@ -84,15 +91,19 @@ public TakExtension(EndPoint endPoint, @Nullable ExtensionConfigDTO extensionCon @Override public void initialise() throws IOException { - connectionManager.connect(); + if (!mapsManagedTransport) { + connectionManager.connect(); + } if (multicastTransport != null) { multicastTransport.start(); } logger.log(ServerLogMessages.TAK_EXTENSION_INITIALISED, url.toString()); running.set(true); - readerThread = new Thread(this::readerLoop, "tak-reader-" + url.getHost() + "-" + url.getPort()); - readerThread.setDaemon(true); - readerThread.start(); + if (!mapsManagedTransport) { + readerThread = new Thread(this::readerLoop, "tak-reader-" + url.getHost() + "-" + url.getPort()); + readerThread.setDaemon(true); + readerThread.start(); + } if (multicastTransport != null && config.isMulticastIngressEnabled()) { multicastReaderThread = new Thread(this::multicastReaderLoop, "tak-mcast-reader-" + config.getMulticastGroup() + "-" + config.getMulticastPort()); multicastReaderThread.setDaemon(true); @@ -124,7 +135,9 @@ public void close() throws IOException { if (multicastTransport != null) { multicastTransport.close(); } - connectionManager.close(); + if (!mapsManagedTransport) { + connectionManager.close(); + } logger.log(ServerLogMessages.TAK_EXTENSION_CLOSED, url.toString()); } @@ -152,7 +165,11 @@ public void outbound(String destinationName, Message message) { byte[] payload = payloadCodec.encode(message); try { byte[] framed = streamFramer.frame(payload); - connectionManager.write(framed); + if (mapsManagedTransport) { + endPoint.sendPacket(new Packet(ByteBuffer.wrap(framed))); + } else { + connectionManager.write(framed); + } } catch (IOException ignored) { // Stream path is best-effort; multicast egress may still succeed. } @@ -220,6 +237,29 @@ private void reconnectWithDelay() { } } + @Override + public boolean processPacket(@org.jetbrains.annotations.NotNull Packet packet) throws IOException { + if (!mapsManagedTransport || !running.get()) { + return false; + } + ByteBuffer raw = packet.getRawBuffer(); + if (raw == null || raw.remaining() <= 0) { + return false; + } + List frames = frameReader.read(raw); + for (byte[] frame : frames) { + try { + Message message = payloadCodec.decode(frame); + String destination = resolveInboundDestination(message); + inbound(destination, message); + } catch (IOException decodeFailure) { + logger.log(ServerLogMessages.TAK_EXTENSION_DECODE_FAILED, "stream"); + } + } + packet.position(packet.limit()); + return true; + } + private long nextDelay(long currentDelayMs) { long multiplied = (long) Math.ceil(currentDelayMs * config.getReconnectBackoffMultiplier()); return Math.min(config.getReconnectMaxDelayMs(), Math.max(config.getReconnectDelayMs(), multiplied)); diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java index 788c5d0d4..733adc478 100644 --- a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java @@ -40,6 +40,7 @@ public class TakExtensionConfig { private static final int DEFAULT_MULTICAST_TTL = 1; private static final String DEFAULT_PROTOBUF_DESCRIPTOR_BASE64 = ""; private static final String DEFAULT_PROTOBUF_MESSAGE_NAME = ""; + private static final boolean DEFAULT_USE_MAPS_TRANSPORT = true; private final String payload; private final TakStreamFramer.Mode framingMode; @@ -59,12 +60,13 @@ public class TakExtensionConfig { private final int multicastReadBufferBytes; private final String protobufDescriptorBase64; private final String protobufMessageName; + private final boolean useMapsTransport; private TakExtensionConfig(String payload, TakStreamFramer.Mode framingMode, int maxPayloadBytes, int reconnectDelayMs, int reconnectMaxDelayMs, double reconnectBackoffMultiplier, int reconnectJitterMs, int readBufferBytes, boolean multicastEnabled, boolean multicastIngressEnabled, boolean multicastEgressEnabled, String multicastGroup, int multicastPort, String multicastInterface, int multicastTtl, int multicastReadBufferBytes, - String protobufDescriptorBase64, String protobufMessageName) { + String protobufDescriptorBase64, String protobufMessageName, boolean useMapsTransport) { this.payload = payload; this.framingMode = framingMode; this.maxPayloadBytes = maxPayloadBytes; @@ -83,6 +85,7 @@ private TakExtensionConfig(String payload, TakStreamFramer.Mode framingMode, int this.multicastReadBufferBytes = multicastReadBufferBytes; this.protobufDescriptorBase64 = protobufDescriptorBase64; this.protobufMessageName = protobufMessageName; + this.useMapsTransport = useMapsTransport; } public static TakExtensionConfig from(ExtensionConfigDTO extensionConfig) { @@ -106,6 +109,7 @@ public static TakExtensionConfig from(ExtensionConfigDTO extensionConfig) { int multicastReadBuffer = asInt(config, "multicast_read_buffer_bytes", DEFAULT_READ_BUFFER); String protobufDescriptorBase64 = asString(config, "protobuf_descriptor_base64", DEFAULT_PROTOBUF_DESCRIPTOR_BASE64).trim(); String protobufMessageName = asString(config, "protobuf_message_name", DEFAULT_PROTOBUF_MESSAGE_NAME).trim(); + boolean useMapsTransport = asBoolean(config, "use_maps_transport", DEFAULT_USE_MAPS_TRANSPORT); int reconnectBase = Math.max(100, reconnectMs); int reconnectMax = Math.max(reconnectBase, reconnectMaxMs); return new TakExtensionConfig(payload, mode, Math.max(1024, maxPayload), reconnectBase, @@ -117,7 +121,8 @@ reconnectMax, clampMultiplier(reconnectMultiplier), Math.max(0, reconnectJitter) Math.max(1, Math.min(255, multicastTtl)), Math.max(512, multicastReadBuffer), protobufDescriptorBase64, - protobufMessageName); + protobufMessageName, + useMapsTransport); } public String getPayload() { @@ -192,6 +197,10 @@ public String getProtobufMessageName() { return protobufMessageName; } + public boolean isUseMapsTransport() { + return useMapsTransport; + } + private static String asString(Map config, String key, String def) { if (config == null) { return def; diff --git a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionReconnectIntegrationTest.java b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionReconnectIntegrationTest.java index c5196d243..d2d2b6094 100644 --- a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionReconnectIntegrationTest.java +++ b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionReconnectIntegrationTest.java @@ -111,6 +111,7 @@ private static ExtensionConfigDTO buildConfig() throws Exception { config.put("reconnect_max_delay_ms", 400); config.put("reconnect_backoff_multiplier", 1.5d); config.put("reconnect_jitter_ms", 0); + config.put("use_maps_transport", false); setField(dto, "config", config); return dto; } diff --git a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java index cc1d9231a..c854f2004 100644 --- a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java +++ b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java @@ -55,6 +55,7 @@ void parsesExtensionConfigWithDefaultsAndOverrides() throws Exception { config.put("multicast_read_buffer_bytes", 2048); config.put("protobuf_descriptor_base64", "AQID"); config.put("protobuf_message_name", "TakMessage"); + config.put("use_maps_transport", false); setField(dto, "config", config); TakExtensionConfig parsed = TakExtensionConfig.from(dto); @@ -77,6 +78,7 @@ void parsesExtensionConfigWithDefaultsAndOverrides() throws Exception { assertEquals(2048, parsed.getMulticastReadBufferBytes()); assertEquals("AQID", parsed.getProtobufDescriptorBase64()); assertEquals("TakMessage", parsed.getProtobufMessageName()); + assertFalse(parsed.isUseMapsTransport()); } @Test @@ -94,6 +96,7 @@ void appliesSafeDefaultsWhenMissing() { assertEquals(6969, parsed.getMulticastPort()); assertEquals("", parsed.getProtobufDescriptorBase64()); assertEquals("", parsed.getProtobufMessageName()); + assertTrue(parsed.isUseMapsTransport()); } @Test