diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakEndPointConnectionFactory.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakEndPointConnectionFactory.java new file mode 100644 index 000000000..914f8f092 --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakEndPointConnectionFactory.java @@ -0,0 +1,52 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak; + +import io.mapsmessaging.network.EndPointURL; +import io.mapsmessaging.network.io.EndPoint; +import io.mapsmessaging.network.io.EndPointConnectedCallback; +import io.mapsmessaging.network.io.EndPointConnectionFactory; +import io.mapsmessaging.network.io.EndPointServerStatus; +import io.mapsmessaging.network.io.impl.SelectorLoadManager; +import io.mapsmessaging.network.protocol.impl.extension.ExtensionEndPointConnectionFactory; + +import java.io.IOException; +import java.util.List; + +public class TakEndPointConnectionFactory implements EndPointConnectionFactory { + + private final ExtensionEndPointConnectionFactory delegate = new ExtensionEndPointConnectionFactory(); + + @Override + public EndPoint connect(EndPointURL url, SelectorLoadManager selector, EndPointConnectedCallback connectedCallback, + EndPointServerStatus endPointServerStatus, List jmxPath) throws IOException { + return delegate.connect(url, selector, connectedCallback, endPointServerStatus, jmxPath); + } + + @Override + public String getName() { + return "tak"; + } + + @Override + public String getDescription() { + return "TAK extension endpoint factory"; + } +} diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java new file mode 100644 index 000000000..a4db2d1e0 --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java @@ -0,0 +1,252 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak; + +import io.mapsmessaging.api.message.Message; +import io.mapsmessaging.dto.rest.config.protocol.impl.ExtensionConfigDTO; +import io.mapsmessaging.network.EndPointURL; +import io.mapsmessaging.network.io.EndPoint; +import io.mapsmessaging.network.protocol.impl.extension.Extension; +import io.mapsmessaging.network.protocol.impl.tak.codec.CotXmlCodec; +import io.mapsmessaging.network.protocol.impl.tak.codec.TakPayloadCodec; +import io.mapsmessaging.network.protocol.impl.tak.codec.TakProtobufCodec; +import io.mapsmessaging.network.protocol.impl.tak.framing.TakFrameReader; +import io.mapsmessaging.network.protocol.impl.tak.framing.TakStreamFramer; +import io.mapsmessaging.network.protocol.impl.tak.transport.TakConnectionManager; +import io.mapsmessaging.network.protocol.impl.tak.transport.TakMulticastTransport; +import io.mapsmessaging.network.protocol.impl.tak.transport.TakServerConnection; +import org.jetbrains.annotations.Nullable; + +import java.io.IOException; +import java.time.Duration; +import java.util.concurrent.ThreadLocalRandom; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +public class TakExtension extends Extension { + + private final EndPointURL url; + private final TakExtensionConfig config; + private final TakPayloadCodec payloadCodec; + private final TakStreamFramer streamFramer; + private final TakConnectionManager connectionManager; + private final TakMulticastTransport multicastTransport; + private final TakFrameReader frameReader; + private final Set remoteLinks; + private final Set localLinks; + private final AtomicBoolean running; + private volatile Thread readerThread; + private volatile Thread multicastReaderThread; + + public TakExtension(EndPoint endPoint, @Nullable ExtensionConfigDTO extensionConfig) { + this.url = new EndPointURL(endPoint.getConfig().getUrl()); + this.config = TakExtensionConfig.from(extensionConfig); + this.payloadCodec = TakExtensionConfig.PAYLOAD_TAK_PROTO_V1.equalsIgnoreCase(config.getPayload()) + ? new TakProtobufCodec(new CotXmlCodec(), config.getMaxPayloadBytes()) + : new CotXmlCodec(); + this.streamFramer = new TakStreamFramer(config.getFramingMode(), config.getMaxPayloadBytes()); + this.connectionManager = new TakConnectionManager(new TakServerConnection(url, Duration.ofSeconds(30))); + this.multicastTransport = config.isMulticastEnabled() ? new TakMulticastTransport(config) : null; + this.frameReader = new TakFrameReader(streamFramer, config.getReadBufferBytes()); + this.remoteLinks = ConcurrentHashMap.newKeySet(); + this.localLinks = ConcurrentHashMap.newKeySet(); + this.running = new AtomicBoolean(false); + } + + @Override + public void initialise() throws IOException { + connectionManager.connect(); + if (multicastTransport != null) { + multicastTransport.start(); + } + running.set(true); + readerThread = new Thread(this::readerLoop, "tak-reader-" + url.getHost() + "-" + url.getPort()); + readerThread.setDaemon(true); + readerThread.start(); + if (multicastTransport != null && config.isMulticastIngressEnabled()) { + multicastReaderThread = new Thread(this::multicastReaderLoop, "tak-mcast-reader-" + config.getMulticastGroup() + "-" + config.getMulticastPort()); + multicastReaderThread.setDaemon(true); + multicastReaderThread.start(); + } + } + + @Override + public void close() throws IOException { + running.set(false); + Thread thread = readerThread; + if (thread != null) { + thread.interrupt(); + try { + thread.join(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + Thread multicastThread = multicastReaderThread; + if (multicastThread != null) { + multicastThread.interrupt(); + try { + multicastThread.join(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + if (multicastTransport != null) { + multicastTransport.close(); + } + connectionManager.close(); + } + + @Override + public String getName() { + return "TAK"; + } + + @Override + public String getVersion() { + return "1.0"; + } + + @Override + public boolean supportsRemoteFiltering() { + return false; + } + + @Override + public void outbound(String destinationName, Message message) { + if (!running.get()) { + return; + } + try { + byte[] payload = payloadCodec.encode(message); + try { + byte[] framed = streamFramer.frame(payload); + connectionManager.write(framed); + } catch (IOException ignored) { + // Stream path is best-effort; multicast egress may still succeed. + } + if (multicastTransport != null && config.isMulticastEgressEnabled()) { + try { + multicastTransport.send(payload); + } catch (IOException ignored) { + // Best-effort multicast path. + } + } + } catch (IOException ignored) { + // Connection failures are handled by reader loop reconnect logic. + } + } + + @Override + public void registerRemoteLink(String destination, @Nullable String filter) { + remoteLinks.add(destination); + } + + @Override + public void registerLocalLink(String destination) { + localLinks.add(destination); + } + + private void readerLoop() { + while (running.get()) { + try { + List frames = frameReader.read(connectionManager.input()); + for (byte[] frame : frames) { + Message message = payloadCodec.decode(frame); + String destination = resolveInboundDestination(message); + inbound(destination, message); + } + } catch (IOException ex) { + if (!running.get()) { + break; + } + reconnectWithDelay(); + } + } + } + + private void reconnectWithDelay() { + long delayMs = config.getReconnectDelayMs(); + while (running.get()) { + try { + Thread.sleep(applyJitter(delayMs, config.getReconnectJitterMs())); + connectionManager.reconnect(); + return; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } catch (IOException ignored) { + delayMs = nextDelay(delayMs); + } + } + } + + private long nextDelay(long currentDelayMs) { + long multiplied = (long) Math.ceil(currentDelayMs * config.getReconnectBackoffMultiplier()); + return Math.min(config.getReconnectMaxDelayMs(), Math.max(config.getReconnectDelayMs(), multiplied)); + } + + private long applyJitter(long delayMs, int jitterMs) { + if (jitterMs <= 0) { + return delayMs; + } + int jitter = ThreadLocalRandom.current().nextInt(jitterMs + 1); + return delayMs + jitter; + } + + private void multicastReaderLoop() { + while (running.get()) { + try { + Optional frame = multicastTransport.read(); + if (frame.isEmpty()) { + continue; + } + Message message = payloadCodec.decode(frame.get()); + String destination = resolveInboundDestination(message); + inbound(destination, message); + } catch (IOException ex) { + if (!running.get()) { + break; + } + } + } + } + + private String resolveInboundDestination(Message message) { + String eventType = null; + if (message.getMeta() != null) { + eventType = message.getMeta().get("tak.type"); + } + if (eventType == null || eventType.isBlank()) { + eventType = "unknown"; + } + if (!remoteLinks.isEmpty()) { + String remote = remoteLinks.iterator().next(); + if (remote.contains("#")) { + return remote.replace("#", eventType); + } + return remote; + } + return "tak/cot/" + eventType; + } +} diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java new file mode 100644 index 000000000..a953a5aac --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java @@ -0,0 +1,240 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak; + +import io.mapsmessaging.dto.rest.config.protocol.impl.ExtensionConfigDTO; +import io.mapsmessaging.network.protocol.impl.tak.framing.TakStreamFramer; + +import java.util.Locale; +import java.util.Map; + +public class TakExtensionConfig { + + public static final String PAYLOAD_COT_XML = "cot_xml"; + public static final String PAYLOAD_TAK_PROTO_V1 = "tak_proto_v1"; + private static final int DEFAULT_MAX_PAYLOAD = 1024 * 1024; + private static final int DEFAULT_RECONNECT_MS = 2000; + private static final int DEFAULT_RECONNECT_MAX_MS = 30000; + private static final double DEFAULT_RECONNECT_MULTIPLIER = 2.0d; + private static final int DEFAULT_RECONNECT_JITTER_MS = 250; + private static final int DEFAULT_READ_BUFFER = 8192; + private static final String DEFAULT_MULTICAST_GROUP = "239.2.3.1"; + private static final int DEFAULT_MULTICAST_PORT = 6969; + private static final int DEFAULT_MULTICAST_TTL = 1; + + private final String payload; + private final TakStreamFramer.Mode framingMode; + private final int maxPayloadBytes; + private final int reconnectDelayMs; + private final int reconnectMaxDelayMs; + private final double reconnectBackoffMultiplier; + private final int reconnectJitterMs; + private final int readBufferBytes; + private final boolean multicastEnabled; + private final boolean multicastIngressEnabled; + private final boolean multicastEgressEnabled; + private final String multicastGroup; + private final int multicastPort; + private final String multicastInterface; + private final int multicastTtl; + private final int multicastReadBufferBytes; + + private TakExtensionConfig(String payload, TakStreamFramer.Mode framingMode, int maxPayloadBytes, int reconnectDelayMs, + int reconnectMaxDelayMs, double reconnectBackoffMultiplier, int reconnectJitterMs, int readBufferBytes, + boolean multicastEnabled, boolean multicastIngressEnabled, boolean multicastEgressEnabled, + String multicastGroup, int multicastPort, String multicastInterface, int multicastTtl, int multicastReadBufferBytes) { + this.payload = payload; + this.framingMode = framingMode; + this.maxPayloadBytes = maxPayloadBytes; + this.reconnectDelayMs = reconnectDelayMs; + this.reconnectMaxDelayMs = reconnectMaxDelayMs; + this.reconnectBackoffMultiplier = reconnectBackoffMultiplier; + this.reconnectJitterMs = reconnectJitterMs; + this.readBufferBytes = readBufferBytes; + this.multicastEnabled = multicastEnabled; + this.multicastIngressEnabled = multicastIngressEnabled; + this.multicastEgressEnabled = multicastEgressEnabled; + this.multicastGroup = multicastGroup; + this.multicastPort = multicastPort; + this.multicastInterface = multicastInterface; + this.multicastTtl = multicastTtl; + this.multicastReadBufferBytes = multicastReadBufferBytes; + } + + public static TakExtensionConfig from(ExtensionConfigDTO extensionConfig) { + Map config = extensionConfig != null ? extensionConfig.getConfig() : null; + String payload = asString(config, "payload", PAYLOAD_COT_XML).toLowerCase(Locale.ROOT); + String framing = asString(config, "framing", "xml_stream").toLowerCase(Locale.ROOT); + TakStreamFramer.Mode mode = "proto_stream".equals(framing) ? TakStreamFramer.Mode.PROTO_STREAM : TakStreamFramer.Mode.XML_STREAM; + int maxPayload = asInt(config, "max_payload_bytes", DEFAULT_MAX_PAYLOAD); + int reconnectMs = asInt(config, "reconnect_delay_ms", DEFAULT_RECONNECT_MS); + int reconnectMaxMs = asInt(config, "reconnect_max_delay_ms", DEFAULT_RECONNECT_MAX_MS); + double reconnectMultiplier = asDouble(config, "reconnect_backoff_multiplier", DEFAULT_RECONNECT_MULTIPLIER); + int reconnectJitter = asInt(config, "reconnect_jitter_ms", DEFAULT_RECONNECT_JITTER_MS); + int readBuffer = asInt(config, "read_buffer_bytes", DEFAULT_READ_BUFFER); + boolean multicastEnabled = asBoolean(config, "multicast_enabled", false); + boolean multicastIngressEnabled = asBoolean(config, "multicast_ingress_enabled", multicastEnabled); + boolean multicastEgressEnabled = asBoolean(config, "multicast_egress_enabled", multicastEnabled); + String multicastGroup = asString(config, "multicast_group", DEFAULT_MULTICAST_GROUP).trim(); + int multicastPort = asInt(config, "multicast_port", DEFAULT_MULTICAST_PORT); + String multicastInterface = asString(config, "multicast_interface", "").trim(); + int multicastTtl = asInt(config, "multicast_ttl", DEFAULT_MULTICAST_TTL); + int multicastReadBuffer = asInt(config, "multicast_read_buffer_bytes", DEFAULT_READ_BUFFER); + int reconnectBase = Math.max(100, reconnectMs); + int reconnectMax = Math.max(reconnectBase, reconnectMaxMs); + return new TakExtensionConfig(payload, mode, Math.max(1024, maxPayload), reconnectBase, + reconnectMax, clampMultiplier(reconnectMultiplier), Math.max(0, reconnectJitter), Math.max(512, readBuffer), + multicastEnabled, multicastIngressEnabled, multicastEgressEnabled, + multicastGroup.isEmpty() ? DEFAULT_MULTICAST_GROUP : multicastGroup, + Math.max(1, multicastPort), + multicastInterface, + Math.max(1, Math.min(255, multicastTtl)), + Math.max(512, multicastReadBuffer)); + } + + public String getPayload() { + return payload; + } + + public TakStreamFramer.Mode getFramingMode() { + return framingMode; + } + + public int getMaxPayloadBytes() { + return maxPayloadBytes; + } + + public int getReconnectDelayMs() { + return reconnectDelayMs; + } + + public int getReconnectMaxDelayMs() { + return reconnectMaxDelayMs; + } + + public double getReconnectBackoffMultiplier() { + return reconnectBackoffMultiplier; + } + + public int getReconnectJitterMs() { + return reconnectJitterMs; + } + + public int getReadBufferBytes() { + return readBufferBytes; + } + + public boolean isMulticastEnabled() { + return multicastEnabled; + } + + public boolean isMulticastIngressEnabled() { + return multicastIngressEnabled; + } + + public boolean isMulticastEgressEnabled() { + return multicastEgressEnabled; + } + + public String getMulticastGroup() { + return multicastGroup; + } + + public int getMulticastPort() { + return multicastPort; + } + + public String getMulticastInterface() { + return multicastInterface; + } + + public int getMulticastTtl() { + return multicastTtl; + } + + public int getMulticastReadBufferBytes() { + return multicastReadBufferBytes; + } + + private static String asString(Map config, String key, String def) { + if (config == null) { + return def; + } + Object val = config.get(key); + return val == null ? def : val.toString(); + } + + private static int asInt(Map config, String key, int def) { + if (config == null) { + return def; + } + Object val = config.get(key); + if (val == null) { + return def; + } + if (val instanceof Number number) { + return number.intValue(); + } + try { + return Integer.parseInt(val.toString()); + } catch (NumberFormatException ignored) { + return def; + } + } + + private static double asDouble(Map config, String key, double def) { + if (config == null) { + return def; + } + Object val = config.get(key); + if (val == null) { + return def; + } + if (val instanceof Number number) { + return number.doubleValue(); + } + try { + return Double.parseDouble(val.toString()); + } catch (NumberFormatException ignored) { + return def; + } + } + + private static double clampMultiplier(double multiplier) { + if (Double.isNaN(multiplier) || Double.isInfinite(multiplier)) { + return DEFAULT_RECONNECT_MULTIPLIER; + } + return Math.max(1.0d, Math.min(10.0d, multiplier)); + } + + private static boolean asBoolean(Map config, String key, boolean def) { + if (config == null) { + return def; + } + Object val = config.get(key); + if (val == null) { + return def; + } + if (val instanceof Boolean bool) { + return bool; + } + return Boolean.parseBoolean(val.toString()); + } +} diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakProtocolFactory.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakProtocolFactory.java new file mode 100644 index 000000000..02d419ae7 --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakProtocolFactory.java @@ -0,0 +1,68 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak; + +import io.mapsmessaging.dto.rest.config.protocol.ProtocolConfigDTO; +import io.mapsmessaging.dto.rest.config.protocol.impl.ExtensionConfigDTO; +import io.mapsmessaging.network.io.EndPoint; +import io.mapsmessaging.network.io.Packet; +import io.mapsmessaging.network.protocol.Protocol; +import io.mapsmessaging.network.protocol.ProtocolImplFactory; +import io.mapsmessaging.network.protocol.detection.NoOpDetection; +import io.mapsmessaging.network.protocol.impl.extension.ExtensionProtocol; + +import java.io.IOException; + +public class TakProtocolFactory extends ProtocolImplFactory { + + public TakProtocolFactory() { + super("tak", "TAK Cursor-on-Target extension protocol", new NoOpDetection()); + } + + @Override + public Protocol connect(EndPoint endPoint, String sessionId, String username, String password) throws IOException { + ExtensionConfigDTO extensionConfig = null; + if (endPoint.getConfig() != null && endPoint.getConfig().getProtocolConfigs() != null) { + for (ProtocolConfigDTO protocolConfig : endPoint.getConfig().getProtocolConfigs()) { + if (protocolConfig instanceof ExtensionConfigDTO config && getName().equalsIgnoreCase(config.getProtocol())) { + extensionConfig = config; + break; + } + } + } + if (extensionConfig == null) { + throw new IOException("TAK extension configuration not found"); + } + + Protocol protocol = new ExtensionProtocol(endPoint, new TakExtension(endPoint, extensionConfig)); + protocol.connect(sessionId, username, password); + return protocol; + } + + @Override + public void create(EndPoint endPoint, Packet packet) { + // TAK extension does not accept inbound client sockets. + } + + @Override + public String getTransportType() { + return "tak"; + } +} diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodec.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodec.java new file mode 100644 index 000000000..63803d3e2 --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodec.java @@ -0,0 +1,177 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.codec; + +import io.mapsmessaging.api.MessageBuilder; +import io.mapsmessaging.api.features.QualityOfService; +import io.mapsmessaging.api.message.Message; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.NodeList; +import org.xml.sax.InputSource; +import org.xml.sax.SAXException; + +import javax.xml.XMLConstants; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; +import java.io.IOException; +import java.io.StringReader; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.LinkedHashMap; +import java.util.Map; + +public class CotXmlCodec implements TakPayloadCodec { + + @Override + public Message decode(byte[] payload) throws IOException { + String xml = new String(payload, StandardCharsets.UTF_8); + Element event = parseRootEvent(xml); + Element point = extractPoint(event); + + String uid = required(event, "uid"); + String type = required(event, "type"); + String time = required(event, "time"); + String start = required(event, "start"); + String stale = required(event, "stale"); + String how = required(event, "how"); + String lat = required(point, "lat"); + String lon = required(point, "lon"); + + Map meta = new LinkedHashMap<>(); + meta.put("tak.uid", uid); + meta.put("tak.type", type); + meta.put("tak.time", time); + meta.put("tak.start", start); + meta.put("tak.stale", stale); + meta.put("tak.how", how); + meta.put("tak.lat", lat); + meta.put("tak.lon", lon); + putIfPresent(meta, "tak.hae", point.getAttribute("hae")); + putIfPresent(meta, "tak.ce", point.getAttribute("ce")); + putIfPresent(meta, "tak.le", point.getAttribute("le")); + meta.put("tak.format", "xml"); + meta.put("tak.transport", "stream"); + + return new MessageBuilder() + .setOpaqueData(payload) + .setMeta(meta) + .setContentType("application/cot+xml") + .setQoS(QualityOfService.AT_MOST_ONCE) + .build(); + } + + @Override + public byte[] encode(Message message) throws IOException { + byte[] opaque = message.getOpaqueData(); + if (opaque != null && opaque.length > 0) { + String candidate = new String(opaque, StandardCharsets.UTF_8).trim(); + if (candidate.startsWith(" meta = message.getMeta() != null ? message.getMeta() : new LinkedHashMap<>(); + String now = Instant.now().truncatedTo(ChronoUnit.SECONDS).toString(); + String stale = Instant.now().plus(5, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.SECONDS).toString(); + + String uid = getOrDefault(meta, "tak.uid", "maps-generated"); + String type = getOrDefault(meta, "tak.type", "a-f-G-U-C"); + String time = getOrDefault(meta, "tak.time", now); + String start = getOrDefault(meta, "tak.start", now); + String staleTime = getOrDefault(meta, "tak.stale", stale); + String how = getOrDefault(meta, "tak.how", "m-g"); + String lat = getOrDefault(meta, "tak.lat", "0"); + String lon = getOrDefault(meta, "tak.lon", "0"); + String hae = getOrDefault(meta, "tak.hae", "0"); + String ce = getOrDefault(meta, "tak.ce", "9999999"); + String le = getOrDefault(meta, "tak.le", "9999999"); + + String xml = "" + + "" + + ""; + return xml.getBytes(StandardCharsets.UTF_8); + } + + private static String getOrDefault(Map meta, String key, String def) { + String value = meta.get(key); + return value == null || value.isBlank() ? def : value; + } + + private static void putIfPresent(Map meta, String key, String value) { + if (value != null && !value.isBlank()) { + meta.put(key, value); + } + } + + private static Element parseRootEvent(String xml) throws IOException { + try { + DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); + dbf.setNamespaceAware(true); + dbf.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true); + dbf.setFeature("http://xml.org/sax/features/external-general-entities", false); + dbf.setFeature("http://xml.org/sax/features/external-parameter-entities", false); + dbf.setFeature(XMLConstants.FEATURE_SECURE_PROCESSING, true); + dbf.setExpandEntityReferences(false); + dbf.setXIncludeAware(false); + dbf.setAttribute(XMLConstants.ACCESS_EXTERNAL_DTD, ""); + dbf.setAttribute(XMLConstants.ACCESS_EXTERNAL_SCHEMA, ""); + DocumentBuilder builder = dbf.newDocumentBuilder(); + Document document = builder.parse(new InputSource(new StringReader(xml))); + Element root = document.getDocumentElement(); + if (root == null || !"event".equalsIgnoreCase(root.getLocalName() == null ? root.getNodeName() : root.getLocalName())) { + throw new IOException("Invalid CoT XML payload: root element must be event"); + } + return root; + } catch (ParserConfigurationException | SAXException ex) { + throw new IOException("Invalid CoT XML payload", ex); + } + } + + private static Element extractPoint(Element root) throws IOException { + NodeList byNs = root.getElementsByTagNameNS("*", "point"); + if (byNs.getLength() > 0 && byNs.item(0) instanceof Element element) { + return element; + } + NodeList byName = root.getElementsByTagName("point"); + if (byName.getLength() > 0 && byName.item(0) instanceof Element element) { + return element; + } + throw new IOException("Invalid CoT XML payload: missing point element"); + } + + private static String required(Element element, String name) throws IOException { + String value = element.getAttribute(name); + if (value == null || value.isBlank()) { + throw new IOException("Invalid CoT XML payload: missing " + name); + } + return value; + } + + private static String esc(String value) { + return value + .replace("&", "&") + .replace("\"", """) + .replace("<", "<") + .replace(">", ">"); + } +} diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakPayloadCodec.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakPayloadCodec.java new file mode 100644 index 000000000..0e24b9ca7 --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakPayloadCodec.java @@ -0,0 +1,31 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.codec; + +import io.mapsmessaging.api.message.Message; + +import java.io.IOException; + +public interface TakPayloadCodec { + + Message decode(byte[] payload) throws IOException; + + byte[] encode(Message message) throws IOException; +} diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodec.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodec.java new file mode 100644 index 000000000..ef31f61eb --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodec.java @@ -0,0 +1,136 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.codec; + +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; +import com.google.protobuf.WireFormat; +import io.mapsmessaging.api.MessageBuilder; +import io.mapsmessaging.api.message.Message; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.LinkedHashMap; +import java.util.Map; + +public class TakProtobufCodec implements TakPayloadCodec { + + private static final int DEFAULT_MAX_PAYLOAD_BYTES = 1024 * 1024; + private final CotXmlCodec cotXmlCodec; + private final int maxPayloadBytes; + + public TakProtobufCodec() { + this(new CotXmlCodec(), DEFAULT_MAX_PAYLOAD_BYTES); + } + + public TakProtobufCodec(CotXmlCodec cotXmlCodec, int maxPayloadBytes) { + this.cotXmlCodec = cotXmlCodec; + this.maxPayloadBytes = Math.max(1024, maxPayloadBytes); + } + + @Override + public Message decode(byte[] payload) throws IOException { + if (payload == null || payload.length == 0) { + throw new IOException("Invalid TAK protobuf payload: empty"); + } + if (payload.length > maxPayloadBytes) { + throw new IOException("Invalid TAK protobuf payload: exceeds max size"); + } + + Map meta = new LinkedHashMap<>(); + meta.put("tak.format", "protobuf"); + meta.put("tak.transport", "stream"); + + byte[] embeddedCot = extractEmbeddedCotXml(payload); + if (embeddedCot != null) { + try { + Message cotMessage = cotXmlCodec.decode(embeddedCot); + if (cotMessage.getMeta() != null) { + meta.putAll(cotMessage.getMeta()); + } + meta.put("tak.format", "protobuf"); + meta.put("tak.transport", "stream"); + meta.put("tak.protobuf_embedded_xml", "true"); + } catch (IOException ignored) { + // Keep protobuf metadata only when embedded CoT is malformed. + } + } + + return new MessageBuilder() + .setOpaqueData(payload) + .setContentType("application/x-protobuf") + .setMeta(meta) + .build(); + } + + @Override + public byte[] encode(Message message) throws IOException { + byte[] opaque = message.getOpaqueData(); + if (opaque != null && opaque.length > 0 && !looksLikeCotXml(opaque)) { + if (opaque.length > maxPayloadBytes) { + throw new IOException("TAK protobuf payload exceeds max size"); + } + return opaque; + } + + // Fallback mode: wrap CoT XML as protobuf field #1 (length-delimited). + // This keeps the transport protobuf-framed for phase-2 interoperability work. + byte[] cotXml = (opaque != null && opaque.length > 0) ? opaque : cotXmlCodec.encode(message); + if (cotXml.length > maxPayloadBytes) { + throw new IOException("TAK protobuf payload exceeds max size"); + } + return wrapCotInLengthDelimitedField(cotXml); + } + + private static byte[] wrapCotInLengthDelimitedField(byte[] cotXml) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(cotXml.length + 8); + CodedOutputStream codedOutputStream = CodedOutputStream.newInstance(out); + codedOutputStream.writeByteArray(1, cotXml); + codedOutputStream.flush(); + return out.toByteArray(); + } + + private static boolean looksLikeCotXml(byte[] payload) { + String candidate = new String(payload, StandardCharsets.UTF_8).trim(); + return candidate.startsWith(" 0 && looksLikeCotXml(candidate)) { + return candidate; + } + } else if (!input.skipField(tag)) { + break; + } + } + return null; + } +} diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakFrameReader.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakFrameReader.java new file mode 100644 index 000000000..1fc0cdf8e --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakFrameReader.java @@ -0,0 +1,55 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.framing; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.List; + +public class TakFrameReader { + + private final TakStreamFramer framer; + private final byte[] readBuffer; + + public TakFrameReader(TakStreamFramer framer, int bufferSize) { + this.framer = framer; + this.readBuffer = new byte[Math.max(bufferSize, 256)]; + } + + public List read(InputStream inputStream) throws IOException { + int read = inputStream.read(readBuffer); + if (read < 0) { + throw new EOFException("TAK connection closed"); + } + return framer.onBytes(readBuffer, read); + } + + public List read(ByteBuffer byteBuffer) throws IOException { + if (byteBuffer == null || !byteBuffer.hasRemaining()) { + return List.of(); + } + int len = byteBuffer.remaining(); + byte[] data = new byte[len]; + byteBuffer.get(data); + return framer.onBytes(data, len); + } +} diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramer.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramer.java new file mode 100644 index 000000000..d509eecfe --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramer.java @@ -0,0 +1,186 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.framing; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class TakStreamFramer { + + public enum Mode { + XML_STREAM, + PROTO_STREAM + } + + private static final byte TAK_PROTO_MAGIC = (byte) 0xBF; + private static final byte[] XML_END_TAG = "".getBytes(StandardCharsets.UTF_8); + + private final Mode mode; + private final int maxPayloadBytes; + private byte[] buffer; + + public TakStreamFramer(Mode mode, int maxPayloadBytes) { + this.mode = mode; + this.maxPayloadBytes = maxPayloadBytes; + this.buffer = new byte[0]; + } + + public synchronized List onBytes(byte[] incoming, int length) throws IOException { + if (incoming == null || length <= 0) { + return List.of(); + } + append(incoming, length); + if (buffer.length > maxPayloadBytes * 2) { + throw new IOException("Buffered TAK stream exceeds safe bound"); + } + return switch (mode) { + case XML_STREAM -> parseXml(); + case PROTO_STREAM -> parseProto(); + }; + } + + public synchronized byte[] frame(byte[] payload) throws IOException { + if (payload == null) { + return new byte[0]; + } + if (payload.length > maxPayloadBytes) { + throw new IOException("Payload exceeds max frame size"); + } + if (mode == Mode.XML_STREAM) { + return payload; + } + byte[] varint = encodeVarint(payload.length); + byte[] out = new byte[1 + varint.length + payload.length]; + out[0] = TAK_PROTO_MAGIC; + System.arraycopy(varint, 0, out, 1, varint.length); + System.arraycopy(payload, 0, out, 1 + varint.length, payload.length); + return out; + } + + private List parseXml() throws IOException { + List frames = new ArrayList<>(); + while (true) { + int endTag = indexOf(buffer, XML_END_TAG, 0); + if (endTag < 0) { + break; + } + int frameEnd = endTag + XML_END_TAG.length; + while (frameEnd < buffer.length && Character.isWhitespace((char) buffer[frameEnd])) { + frameEnd++; + } + if (frameEnd > maxPayloadBytes) { + throw new IOException("TAK XML frame exceeds max frame size"); + } + frames.add(Arrays.copyOfRange(buffer, 0, frameEnd)); + buffer = Arrays.copyOfRange(buffer, frameEnd, buffer.length); + } + return frames; + } + + private List parseProto() throws IOException { + List frames = new ArrayList<>(); + while (buffer.length > 0) { + if (buffer[0] != TAK_PROTO_MAGIC) { + throw new IOException("Invalid TAK protobuf stream header"); + } + if (buffer.length < 2) { + break; + } + VarintResult varintResult = decodeVarint(buffer, 1); + if (varintResult == null) { + break; + } + int length = varintResult.value; + if (length < 0 || length > maxPayloadBytes) { + throw new IOException("Invalid TAK protobuf payload length: " + length); + } + int frameLength = 1 + varintResult.bytesRead + length; + if (buffer.length < frameLength) { + break; + } + int payloadOffset = 1 + varintResult.bytesRead; + frames.add(Arrays.copyOfRange(buffer, payloadOffset, payloadOffset + length)); + buffer = Arrays.copyOfRange(buffer, frameLength, buffer.length); + } + return frames; + } + + private void append(byte[] data, int length) { + int copyLength = Math.min(length, data.length); + byte[] next = Arrays.copyOf(buffer, buffer.length + copyLength); + System.arraycopy(data, 0, next, buffer.length, copyLength); + buffer = next; + } + + private static byte[] encodeVarint(int value) { + byte[] tmp = new byte[5]; + int count = 0; + int v = value; + while ((v & ~0x7F) != 0) { + tmp[count++] = (byte) ((v & 0x7F) | 0x80); + v >>>= 7; + } + tmp[count++] = (byte) (v & 0x7F); + return Arrays.copyOf(tmp, count); + } + + private static VarintResult decodeVarint(byte[] data, int offset) throws IOException { + int value = 0; + int shift = 0; + int bytes = 0; + for (int i = offset; i < data.length && bytes < 5; i++) { + int b = data[i] & 0xFF; + value |= (b & 0x7F) << shift; + bytes++; + if ((b & 0x80) == 0) { + return new VarintResult(value, bytes); + } + shift += 7; + } + if (bytes >= 5 && (data[offset + 4] & 0x80) != 0) { + throw new IOException("Malformed varint length"); + } + return null; + } + + private static int indexOf(byte[] haystack, byte[] needle, int from) { + if (needle.length == 0 || haystack.length < needle.length) { + return -1; + } + for (int i = from; i <= haystack.length - needle.length; i++) { + boolean match = true; + for (int j = 0; j < needle.length; j++) { + if (haystack[i + j] != needle[j]) { + match = false; + break; + } + } + if (match) { + return i; + } + } + return -1; + } + + private record VarintResult(int value, int bytesRead) {} +} diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakConnectionManager.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakConnectionManager.java new file mode 100644 index 000000000..bc131c96e --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakConnectionManager.java @@ -0,0 +1,65 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.transport; + +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.locks.ReentrantLock; + +public class TakConnectionManager { + + private final TakServerConnection connection; + private final ReentrantLock writeLock; + + public TakConnectionManager(TakServerConnection connection) { + this.connection = connection; + this.writeLock = new ReentrantLock(); + } + + public void connect() throws IOException { + connection.connect(); + } + + public void reconnect() throws IOException { + connection.close(); + connection.connect(); + } + + public boolean isConnected() { + return connection.isConnected(); + } + + public InputStream input() throws IOException { + return connection.getInputStream(); + } + + public void write(byte[] data) throws IOException { + writeLock.lock(); + try { + connection.write(data); + } finally { + writeLock.unlock(); + } + } + + public void close() throws IOException { + connection.close(); + } +} diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakMulticastTransport.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakMulticastTransport.java new file mode 100644 index 000000000..31cd4d76d --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakMulticastTransport.java @@ -0,0 +1,134 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.transport; + +import io.mapsmessaging.network.protocol.impl.tak.TakExtensionConfig; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.MulticastSocket; +import java.net.NetworkInterface; +import java.net.SocketTimeoutException; +import java.util.Optional; +import java.util.concurrent.locks.ReentrantLock; + +public class TakMulticastTransport { + + private final TakExtensionConfig config; + private final ReentrantLock sendLock; + + private MulticastSocket socket; + private InetAddress groupAddress; + + public TakMulticastTransport(TakExtensionConfig config) { + this.config = config; + this.sendLock = new ReentrantLock(); + } + + public synchronized void start() throws IOException { + close(); + groupAddress = InetAddress.getByName(config.getMulticastGroup()); + socket = new MulticastSocket(config.getMulticastPort()); + socket.setReuseAddress(true); + socket.setSoTimeout(1000); + socket.setTimeToLive(config.getMulticastTtl()); + + NetworkInterface networkInterface = resolveInterface(); + if (networkInterface != null) { + socket.setNetworkInterface(networkInterface); + socket.joinGroup(new InetSocketAddress(groupAddress, config.getMulticastPort()), networkInterface); + } else { + socket.joinGroup(groupAddress); + } + } + + public Optional read() throws IOException { + MulticastSocket current = getSocket(); + byte[] buffer = new byte[config.getMulticastReadBufferBytes()]; + DatagramPacket packet = new DatagramPacket(buffer, buffer.length); + try { + current.receive(packet); + if (packet.getLength() > config.getMaxPayloadBytes()) { + return Optional.empty(); + } + byte[] copy = new byte[packet.getLength()]; + System.arraycopy(packet.getData(), packet.getOffset(), copy, 0, packet.getLength()); + return Optional.of(copy); + } catch (SocketTimeoutException ignored) { + return Optional.empty(); + } + } + + public void send(byte[] payload) throws IOException { + MulticastSocket current = getSocket(); + DatagramPacket packet = new DatagramPacket(payload, payload.length, groupAddress, config.getMulticastPort()); + sendLock.lock(); + try { + current.send(packet); + } finally { + sendLock.unlock(); + } + } + + public synchronized void close() throws IOException { + if (socket == null) { + return; + } + IOException deferred = null; + try { + NetworkInterface networkInterface = resolveInterface(); + if (networkInterface != null && groupAddress != null) { + socket.leaveGroup(new InetSocketAddress(groupAddress, config.getMulticastPort()), networkInterface); + } else if (groupAddress != null) { + socket.leaveGroup(groupAddress); + } + } catch (IOException ex) { + deferred = ex; + } + socket.close(); + socket = null; + groupAddress = null; + if (deferred != null) { + throw deferred; + } + } + + private synchronized MulticastSocket getSocket() throws IOException { + if (socket == null || socket.isClosed()) { + throw new IOException("TAK multicast transport is not started"); + } + return socket; + } + + private NetworkInterface resolveInterface() throws IOException { + String iface = config.getMulticastInterface(); + if (iface == null || iface.isBlank()) { + return null; + } + NetworkInterface named = NetworkInterface.getByName(iface); + if (named != null) { + return named; + } + InetAddress address = InetAddress.getByName(iface); + return NetworkInterface.getByInetAddress(address); + } +} diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakServerConnection.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakServerConnection.java new file mode 100644 index 000000000..4860087e2 --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakServerConnection.java @@ -0,0 +1,122 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.transport; + +import io.mapsmessaging.network.EndPointURL; + +import javax.net.SocketFactory; +import javax.net.ssl.SSLException; +import javax.net.ssl.SSLSocket; +import javax.net.ssl.SSLSocketFactory; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.time.Duration; + +public class TakServerConnection { + + private final EndPointURL url; + private final int timeoutMs; + + private Socket socket; + private InputStream inputStream; + private OutputStream outputStream; + + public TakServerConnection(EndPointURL url, Duration timeout) { + this.url = url; + this.timeoutMs = (int) Math.max(1000, timeout.toMillis()); + } + + public synchronized void connect() throws IOException { + close(); + SocketFactory socketFactory = isSecure() ? SSLSocketFactory.getDefault() : SocketFactory.getDefault(); + socket = socketFactory.createSocket(); + socket.connect(new InetSocketAddress(url.getHost(), url.getPort()), timeoutMs); + socket.setKeepAlive(true); + socket.setTcpNoDelay(true); + socket.setSoTimeout(timeoutMs); + if (isSecure() && socket instanceof SSLSocket sslSocket) { + try { + sslSocket.startHandshake(); + } catch (SSLException sslException) { + throw new IOException("TAK TLS handshake failed. Verify trustStore/keyStore and server certificate validity", sslException); + } + } + inputStream = socket.getInputStream(); + outputStream = socket.getOutputStream(); + } + + public synchronized InputStream getInputStream() throws IOException { + if (!isConnected()) { + throw new IOException("TAK connection is not connected"); + } + return inputStream; + } + + public synchronized void write(byte[] data) throws IOException { + if (!isConnected()) { + throw new IOException("TAK connection is not connected"); + } + outputStream.write(data); + outputStream.flush(); + } + + public synchronized boolean isConnected() { + return socket != null && socket.isConnected() && !socket.isClosed(); + } + + public synchronized void close() throws IOException { + IOException deferred = null; + if (inputStream != null) { + try { + inputStream.close(); + } catch (IOException ex) { + deferred = ex; + } + inputStream = null; + } + if (outputStream != null) { + try { + outputStream.close(); + } catch (IOException ex) { + deferred = ex; + } + outputStream = null; + } + if (socket != null) { + try { + socket.close(); + } catch (IOException ex) { + deferred = ex; + } + socket = null; + } + if (deferred != null) { + throw deferred; + } + } + + private boolean isSecure() { + String protocol = url.getProtocol(); + return "ssl".equalsIgnoreCase(protocol) || "wss".equalsIgnoreCase(protocol); + } +} diff --git a/src/main/resources/META-INF/services/io.mapsmessaging.network.io.EndPointConnectionFactory b/src/main/resources/META-INF/services/io.mapsmessaging.network.io.EndPointConnectionFactory index 6d4a40add..2cf75a557 100644 --- a/src/main/resources/META-INF/services/io.mapsmessaging.network.io.EndPointConnectionFactory +++ b/src/main/resources/META-INF/services/io.mapsmessaging.network.io.EndPointConnectionFactory @@ -21,4 +21,5 @@ io.mapsmessaging.network.io.impl.tcp.TCPEndPointConnectionFactory io.mapsmessaging.network.io.impl.ssl.SSLEndPointConnectionFactory io.mapsmessaging.network.io.impl.noop.NoOpEndPointConnectionFactory io.mapsmessaging.network.io.impl.udp.UDPEndPointConnectionFactory -io.mapsmessaging.network.io.impl.serial.SerialEndPointConnectionFactory \ No newline at end of file +io.mapsmessaging.network.io.impl.serial.SerialEndPointConnectionFactory +io.mapsmessaging.network.protocol.impl.tak.TakEndPointConnectionFactory diff --git a/src/main/resources/META-INF/services/io.mapsmessaging.network.protocol.ProtocolImplFactory b/src/main/resources/META-INF/services/io.mapsmessaging.network.protocol.ProtocolImplFactory index 7af47b746..46d7b5a9f 100644 --- a/src/main/resources/META-INF/services/io.mapsmessaging.network.protocol.ProtocolImplFactory +++ b/src/main/resources/META-INF/services/io.mapsmessaging.network.protocol.ProtocolImplFactory @@ -75,4 +75,5 @@ io.mapsmessaging.network.protocol.impl.apache_pulsar.PulsarProtocolFactory io.mapsmessaging.network.protocol.impl.ibm_mq.MqProtocolFactory io.mapsmessaging.network.protocol.impl.aws_sns.SnsProtocolFactory io.mapsmessaging.network.protocol.impl.v2x_step.V2xStepProtocolFactory -io.mapsmessaging.network.protocol.impl.extension.impl.example.ExampleProtocolFactory \ No newline at end of file +io.mapsmessaging.network.protocol.impl.extension.impl.example.ExampleProtocolFactory +io.mapsmessaging.network.protocol.impl.tak.TakProtocolFactory diff --git a/src/main/resources/NetworkConnectionManager.yaml b/src/main/resources/NetworkConnectionManager.yaml index 942151398..61397a68b 100644 --- a/src/main/resources/NetworkConnectionManager.yaml +++ b/src/main/resources/NetworkConnectionManager.yaml @@ -66,4 +66,29 @@ NetworkConnectionManager: - direction: pull local_namespace: "/" - remote_namespace: "/#" \ No newline at end of file + remote_namespace: "/#" + + # Example TAK bridge (CoT XML stream over TLS) + # - name: "TAK Server Connection" + # url: ssl://tak.example.local:8089/ + # protocol: tak + # plugin: true + # + # config: + # payload: cot_xml + # framing: xml_stream + # max_payload_bytes: 1048576 + # reconnect_delay_ms: 2000 + # + # links: + # - direction: pull + # remote_namespace: "tak/cot/#" + # local_namespace: "tak/cot/" + # selector: "" + # qos: 0 + # + # - direction: push + # remote_namespace: "tak/cot/out/#" + # local_namespace: "ops/cot/out/" + # selector: "tak.type LIKE 'a-%'" + # qos: 0 diff --git a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java new file mode 100644 index 000000000..c15046b0f --- /dev/null +++ b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java @@ -0,0 +1,116 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak; + +import io.mapsmessaging.dto.rest.config.protocol.impl.ExtensionConfigDTO; +import io.mapsmessaging.network.protocol.impl.tak.framing.TakStreamFramer; +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Field; +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class TakExtensionTest { + + @Test + void parsesExtensionConfigWithDefaultsAndOverrides() throws Exception { + ExtensionConfigDTO dto = new ExtensionConfigDTO(); + Map config = new LinkedHashMap<>(); + config.put("payload", "tak_proto_v1"); + config.put("framing", "proto_stream"); + config.put("max_payload_bytes", 2048); + config.put("reconnect_delay_ms", 1500); + config.put("reconnect_max_delay_ms", 6000); + config.put("reconnect_backoff_multiplier", 1.5); + config.put("reconnect_jitter_ms", 100); + config.put("read_buffer_bytes", 4096); + config.put("multicast_enabled", true); + config.put("multicast_ingress_enabled", true); + config.put("multicast_egress_enabled", false); + config.put("multicast_group", "239.88.77.66"); + config.put("multicast_port", 7171); + config.put("multicast_interface", "lo0"); + config.put("multicast_ttl", 3); + config.put("multicast_read_buffer_bytes", 2048); + setField(dto, "config", config); + + TakExtensionConfig parsed = TakExtensionConfig.from(dto); + + assertEquals("tak_proto_v1", parsed.getPayload()); + assertEquals(TakStreamFramer.Mode.PROTO_STREAM, parsed.getFramingMode()); + assertEquals(2048, parsed.getMaxPayloadBytes()); + assertEquals(1500, parsed.getReconnectDelayMs()); + assertEquals(6000, parsed.getReconnectMaxDelayMs()); + assertEquals(1.5d, parsed.getReconnectBackoffMultiplier()); + assertEquals(100, parsed.getReconnectJitterMs()); + assertEquals(4096, parsed.getReadBufferBytes()); + assertTrue(parsed.isMulticastEnabled()); + assertTrue(parsed.isMulticastIngressEnabled()); + assertFalse(parsed.isMulticastEgressEnabled()); + assertEquals("239.88.77.66", parsed.getMulticastGroup()); + assertEquals(7171, parsed.getMulticastPort()); + assertEquals("lo0", parsed.getMulticastInterface()); + assertEquals(3, parsed.getMulticastTtl()); + assertEquals(2048, parsed.getMulticastReadBufferBytes()); + } + + @Test + void appliesSafeDefaultsWhenMissing() { + TakExtensionConfig parsed = TakExtensionConfig.from(null); + assertEquals("cot_xml", parsed.getPayload()); + assertEquals(TakStreamFramer.Mode.XML_STREAM, parsed.getFramingMode()); + assertEquals(1024 * 1024, parsed.getMaxPayloadBytes()); + assertEquals(2000, parsed.getReconnectDelayMs()); + assertEquals(30000, parsed.getReconnectMaxDelayMs()); + assertEquals(2.0d, parsed.getReconnectBackoffMultiplier()); + assertEquals(250, parsed.getReconnectJitterMs()); + assertFalse(parsed.isMulticastEnabled()); + assertEquals("239.2.3.1", parsed.getMulticastGroup()); + assertEquals(6969, parsed.getMulticastPort()); + } + + @Test + void clampsReconnectHardeningValues() throws Exception { + ExtensionConfigDTO dto = new ExtensionConfigDTO(); + Map config = new LinkedHashMap<>(); + config.put("reconnect_delay_ms", 50); + config.put("reconnect_max_delay_ms", 10); + config.put("reconnect_backoff_multiplier", 0.1d); + config.put("reconnect_jitter_ms", -1); + setField(dto, "config", config); + + TakExtensionConfig parsed = TakExtensionConfig.from(dto); + + assertEquals(100, parsed.getReconnectDelayMs()); + assertEquals(100, parsed.getReconnectMaxDelayMs()); + assertEquals(1.0d, parsed.getReconnectBackoffMultiplier()); + assertEquals(0, parsed.getReconnectJitterMs()); + } + + private static void setField(Object target, String fieldName, Object value) throws Exception { + Field field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); + } +} diff --git a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodecTest.java b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodecTest.java new file mode 100644 index 000000000..5da3631b6 --- /dev/null +++ b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodecTest.java @@ -0,0 +1,115 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.codec; + +import io.mapsmessaging.api.MessageBuilder; +import io.mapsmessaging.api.message.Message; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class CotXmlCodecTest { + + private static final String VALID_COT = """ + + + + """; + + @Test + void decodeValidCotExtractsCoreFields() throws IOException { + CotXmlCodec codec = new CotXmlCodec(); + + Message message = codec.decode(VALID_COT.getBytes(StandardCharsets.UTF_8)); + + assertNotNull(message.getMeta()); + assertEquals("u-1", message.getMeta().get("tak.uid")); + assertEquals("a-f-G-U-C", message.getMeta().get("tak.type")); + assertEquals("51.5007", message.getMeta().get("tak.lat")); + assertEquals("-0.1246", message.getMeta().get("tak.lon")); + assertEquals("application/cot+xml", message.getContentType()); + assertNotNull(message.getOpaqueData()); + } + + @Test + void decodeMissingRequiredFieldFails() { + CotXmlCodec codec = new CotXmlCodec(); + String invalid = """ + + + + """; + + assertThrows(IOException.class, () -> codec.decode(invalid.getBytes(StandardCharsets.UTF_8))); + } + + @Test + void decodeMalformedXmlFails() { + CotXmlCodec codec = new CotXmlCodec(); + String invalid = " codec.decode(invalid.getBytes(StandardCharsets.UTF_8))); + } + + @Test + void decodeRejectsXxe() { + CotXmlCodec codec = new CotXmlCodec(); + String xxe = """ + ]> + + + &xxe; + + """; + assertThrows(IOException.class, () -> codec.decode(xxe.getBytes(StandardCharsets.UTF_8))); + } + + @Test + void encodeFromMetaBuildsCotXml() throws IOException { + CotXmlCodec codec = new CotXmlCodec(); + Map meta = new LinkedHashMap<>(); + meta.put("tak.uid", "u-2"); + meta.put("tak.type", "a-f-G-U-C"); + meta.put("tak.time", "2026-02-19T09:10:00Z"); + meta.put("tak.start", "2026-02-19T09:10:00Z"); + meta.put("tak.stale", "2026-02-19T09:15:00Z"); + meta.put("tak.how", "m-g"); + meta.put("tak.lat", "1"); + meta.put("tak.lon", "2"); + meta.put("tak.hae", "3"); + meta.put("tak.ce", "4"); + meta.put("tak.le", "5"); + + Message message = new MessageBuilder().setMeta(meta).build(); + byte[] xml = codec.encode(message); + String text = new String(xml, StandardCharsets.UTF_8); + + assertTrue(text.startsWith(" + + + """; + Message message = new MessageBuilder().setOpaqueData(cot.getBytes(StandardCharsets.UTF_8)).build(); + + byte[] encoded = codec.encode(message); + Message decoded = codec.decode(encoded); + + assertEquals("application/x-protobuf", decoded.getContentType()); + assertEquals("protobuf", decoded.getMeta().get("tak.format")); + assertEquals("u-100", decoded.getMeta().get("tak.uid")); + assertEquals("a-f-G-U-C", decoded.getMeta().get("tak.type")); + assertNotNull(decoded.getOpaqueData()); + assertTrue(decoded.getOpaqueData().length > 0); + } + + @Test + void passesThroughBinaryProtobufPayload() throws IOException { + TakProtobufCodec codec = new TakProtobufCodec(); + byte[] raw = new byte[]{0x08, 0x7F, 0x10, 0x01}; + Message message = new MessageBuilder().setOpaqueData(raw).build(); + + byte[] encoded = codec.encode(message); + Message decoded = codec.decode(encoded); + + assertArrayEquals(raw, encoded); + assertArrayEquals(raw, decoded.getOpaqueData()); + assertEquals("protobuf", decoded.getMeta().get("tak.format")); + } + + @Test + void buildsCotFromMetaWhenPayloadMissing() throws IOException { + TakProtobufCodec codec = new TakProtobufCodec(); + Map meta = new LinkedHashMap<>(); + meta.put("tak.uid", "u-200"); + meta.put("tak.type", "a-f-G-U-C"); + meta.put("tak.time", "2026-02-19T09:10:00Z"); + meta.put("tak.start", "2026-02-19T09:10:00Z"); + meta.put("tak.stale", "2026-02-19T09:15:00Z"); + meta.put("tak.how", "m-g"); + meta.put("tak.lat", "1"); + meta.put("tak.lon", "2"); + Message message = new MessageBuilder().setMeta(meta).build(); + + byte[] encoded = codec.encode(message); + Message decoded = codec.decode(encoded); + + assertEquals("u-200", decoded.getMeta().get("tak.uid")); + assertEquals("a-f-G-U-C", decoded.getMeta().get("tak.type")); + } +} diff --git a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramerTest.java b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramerTest.java new file mode 100644 index 000000000..17bd80ec1 --- /dev/null +++ b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramerTest.java @@ -0,0 +1,80 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.framing; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class TakStreamFramerTest { + + @Test + void protoFrameRoundTrip() throws IOException { + TakStreamFramer framer = new TakStreamFramer(TakStreamFramer.Mode.PROTO_STREAM, 1024 * 1024); + byte[] payload = "hello".getBytes(StandardCharsets.UTF_8); + + byte[] framed = framer.frame(payload); + List decoded = framer.onBytes(framed, framed.length); + + assertEquals(1, decoded.size()); + assertArrayEquals(payload, decoded.getFirst()); + } + + @Test + void protoPartialReadReassemblesFrames() throws IOException { + TakStreamFramer framer = new TakStreamFramer(TakStreamFramer.Mode.PROTO_STREAM, 1024 * 1024); + byte[] payload = "abc123".getBytes(StandardCharsets.UTF_8); + byte[] framed = framer.frame(payload); + + List first = framer.onBytes(framed, 2); + assertTrue(first.isEmpty()); + List second = framer.onBytes(slice(framed, 2), framed.length - 2); + + assertEquals(1, second.size()); + assertArrayEquals(payload, second.getFirst()); + } + + @Test + void protoRejectsOversizedFrame() { + TakStreamFramer framer = new TakStreamFramer(TakStreamFramer.Mode.PROTO_STREAM, 8); + byte[] fake = new byte[]{(byte) 0xBF, 0x09, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; + assertThrows(IOException.class, () -> framer.onBytes(fake, fake.length)); + } + + @Test + void protoRejectsInvalidMagic() { + TakStreamFramer framer = new TakStreamFramer(TakStreamFramer.Mode.PROTO_STREAM, 1024); + byte[] invalid = new byte[]{0x01, 0x01, 0x7F}; + assertThrows(IOException.class, () -> framer.onBytes(invalid, invalid.length)); + } + + private static byte[] slice(byte[] source, int from) { + byte[] out = new byte[source.length - from]; + System.arraycopy(source, from, out, 0, out.length); + return out; + } +} diff --git a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramerXmlTest.java b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramerXmlTest.java new file mode 100644 index 000000000..f221e518c --- /dev/null +++ b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramerXmlTest.java @@ -0,0 +1,58 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.framing; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class TakStreamFramerXmlTest { + + @Test + void xmlDelimitsMultipleEvents() throws IOException { + TakStreamFramer framer = new TakStreamFramer(TakStreamFramer.Mode.XML_STREAM, 1024 * 1024); + String payload = "\n\n"; + + List frames = framer.onBytes(payload.getBytes(StandardCharsets.UTF_8), payload.length()); + + assertEquals(2, frames.size()); + assertTrue(new String(frames.get(0), StandardCharsets.UTF_8).contains("uid=\"1\"")); + assertTrue(new String(frames.get(1), StandardCharsets.UTF_8).contains("uid=\"2\"")); + } + + @Test + void xmlPartialEventBuffersUntilComplete() throws IOException { + TakStreamFramer framer = new TakStreamFramer(TakStreamFramer.Mode.XML_STREAM, 1024 * 1024); + byte[] bytes = "".getBytes(StandardCharsets.UTF_8); + + List first = framer.onBytes(bytes, 10); + assertTrue(first.isEmpty()); + + byte[] rest = new byte[bytes.length - 10]; + System.arraycopy(bytes, 10, rest, 0, rest.length); + List second = framer.onBytes(rest, rest.length); + assertEquals(1, second.size()); + } +}