diff --git a/src/main/java/io/mapsmessaging/logging/ServerLogMessages.java b/src/main/java/io/mapsmessaging/logging/ServerLogMessages.java index 4231631b4..3d89bec23 100644 --- a/src/main/java/io/mapsmessaging/logging/ServerLogMessages.java +++ b/src/main/java/io/mapsmessaging/logging/ServerLogMessages.java @@ -244,6 +244,17 @@ public enum ServerLogMessages implements LogMessage { // + // + TAK_EXTENSION_INITIALISED(LEVEL.INFO, SERVER_CATEGORY.PROTOCOL, "TAK extension initialised for {}"), + TAK_EXTENSION_CLOSED(LEVEL.INFO, SERVER_CATEGORY.PROTOCOL, "TAK extension closed for {}"), + TAK_EXTENSION_OUTBOUND_FAILED(LEVEL.WARN, SERVER_CATEGORY.PROTOCOL, "TAK outbound failed for destination {}"), + TAK_EXTENSION_DECODE_FAILED(LEVEL.WARN, SERVER_CATEGORY.PROTOCOL, "TAK decode failed for transport {}"), + TAK_EXTENSION_RECONNECT_ATTEMPT(LEVEL.INFO, SERVER_CATEGORY.NETWORK, "TAK reconnect attempt in {}ms for {}"), + TAK_EXTENSION_RECONNECT_SUCCESS(LEVEL.INFO, SERVER_CATEGORY.NETWORK, "TAK reconnect succeeded for {}"), + TAK_EXTENSION_RECONNECT_FAILED(LEVEL.WARN, SERVER_CATEGORY.NETWORK, "TAK reconnect failed for {}"), + TAK_MULTICAST_IO_FAILED(LEVEL.WARN, SERVER_CATEGORY.NETWORK, "TAK multicast IO failed for {}:{}"), + // + // PACKET_SECURITY_VERIFICATION_FAILED(LEVEL.WARN, SERVER_CATEGORY.NETWORK, "Packet integrity verification failed: ip={}, algorithm={}, reason={}, packetLength={}, signatureSize={}"), PACKET_SECURITY_NOT_INITIALISED(LEVEL.ERROR, SERVER_CATEGORY.NETWORK, "Packet integrity not initialised: algorithm={}"), diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/extension/Extension.java b/src/main/java/io/mapsmessaging/network/protocol/impl/extension/Extension.java index c339a94fb..92ad6e903 100644 --- a/src/main/java/io/mapsmessaging/network/protocol/impl/extension/Extension.java +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/extension/Extension.java @@ -20,6 +20,7 @@ package io.mapsmessaging.network.protocol.impl.extension; import io.mapsmessaging.api.message.Message; +import io.mapsmessaging.network.io.Packet; import jakarta.validation.constraints.NotNull; import lombok.Getter; import lombok.NonNull; @@ -74,6 +75,10 @@ public String getSessionId() { public abstract void registerLocalLink(@NonNull @NotNull String destination) throws IOException; + public boolean processPacket(@NonNull @NotNull Packet packet) throws IOException { + return false; + } + protected void inbound(@NonNull @NotNull String destinationName, @NonNull @NotNull Message message) throws IOException { if (extensionProtocol == null) { diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/extension/ExtensionProtocol.java b/src/main/java/io/mapsmessaging/network/protocol/impl/extension/ExtensionProtocol.java index 2ae596900..947f2f538 100644 --- a/src/main/java/io/mapsmessaging/network/protocol/impl/extension/ExtensionProtocol.java +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/extension/ExtensionProtocol.java @@ -155,7 +155,7 @@ public ProtocolInformationDTO getInformation() { @Override public boolean processPacket(@NonNull @NotNull Packet packet) throws IOException { - return false; + return extension.processPacket(packet); } @Override diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakEndPointConnectionFactory.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakEndPointConnectionFactory.java new file mode 100644 index 000000000..914f8f092 --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakEndPointConnectionFactory.java @@ -0,0 +1,52 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak; + +import io.mapsmessaging.network.EndPointURL; +import io.mapsmessaging.network.io.EndPoint; +import io.mapsmessaging.network.io.EndPointConnectedCallback; +import io.mapsmessaging.network.io.EndPointConnectionFactory; +import io.mapsmessaging.network.io.EndPointServerStatus; +import io.mapsmessaging.network.io.impl.SelectorLoadManager; +import io.mapsmessaging.network.protocol.impl.extension.ExtensionEndPointConnectionFactory; + +import java.io.IOException; +import java.util.List; + +public class TakEndPointConnectionFactory implements EndPointConnectionFactory { + + private final ExtensionEndPointConnectionFactory delegate = new ExtensionEndPointConnectionFactory(); + + @Override + public EndPoint connect(EndPointURL url, SelectorLoadManager selector, EndPointConnectedCallback connectedCallback, + EndPointServerStatus endPointServerStatus, List jmxPath) throws IOException { + return delegate.connect(url, selector, connectedCallback, endPointServerStatus, jmxPath); + } + + @Override + public String getName() { + return "tak"; + } + + @Override + public String getDescription() { + return "TAK extension endpoint factory"; + } +} diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java new file mode 100644 index 000000000..442800d77 --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtension.java @@ -0,0 +1,241 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak; + +import io.mapsmessaging.api.message.Message; +import io.mapsmessaging.dto.rest.config.protocol.impl.ExtensionConfigDTO; +import io.mapsmessaging.logging.Logger; +import io.mapsmessaging.logging.LoggerFactory; +import io.mapsmessaging.logging.ServerLogMessages; +import io.mapsmessaging.network.EndPointURL; +import io.mapsmessaging.network.io.EndPoint; +import io.mapsmessaging.network.io.Packet; +import io.mapsmessaging.network.protocol.impl.extension.Extension; +import io.mapsmessaging.network.protocol.impl.tak.codec.CotXmlCodec; +import io.mapsmessaging.network.protocol.impl.tak.codec.TakPayloadCodec; +import io.mapsmessaging.network.protocol.impl.tak.codec.TakProtobufCodec; +import io.mapsmessaging.network.protocol.impl.tak.framing.TakFrameReader; +import io.mapsmessaging.network.protocol.impl.tak.framing.TakStreamFramer; +import io.mapsmessaging.network.protocol.impl.tak.transport.TakMulticastTransport; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +public class TakExtension extends Extension { + + private static final String PROTOCOL_NAME = "TAK"; + private static final String PROTOCOL_VERSION = "1.0"; + private static final String DEFAULT_EVENT_TYPE = "unknown"; + private static final String DEFAULT_DESTINATION_PREFIX = "tak/cot/"; + private static final String WILDCARD_MARKER = "#"; + + private final EndPointURL url; + private final Logger logger; + private final EndPoint endPoint; + private final TakExtensionConfig config; + private final TakPayloadCodec payloadCodec; + private final TakStreamFramer streamFramer; + private final TakMulticastTransport multicastTransport; + private final TakFrameReader frameReader; + private final Set remoteLinks; + private final AtomicBoolean running; + private volatile Thread multicastReaderThread; + + public TakExtension(EndPoint endPoint, @Nullable ExtensionConfigDTO extensionConfig) { + this.endPoint = endPoint; + this.url = new EndPointURL(endPoint.getConfig().getUrl()); + this.logger = LoggerFactory.getLogger(TakExtension.class); + this.config = TakExtensionConfig.from(extensionConfig); + CotXmlCodec cotXmlCodec = CotXmlCodec.withSchemaFormatter( + config.isXmlNamespaceAware(), + config.isXmlCoalescing(), + config.isXmlValidating(), + config.getXmlRootEntry(), + config.getXmlSchemaId()); + this.payloadCodec = TakExtensionConfig.PAYLOAD_TAK_PROTO_V1.equalsIgnoreCase(config.getPayload()) + ? TakProtobufCodec.withSchemaFormatter( + cotXmlCodec, + config.getMaxPayloadBytes(), + config.getProtobufDescriptorBase64(), + config.getProtobufMessageName(), + config.getProtobufSchemaId()) + : cotXmlCodec; + this.streamFramer = new TakStreamFramer(config.getFramingMode(), config.getMaxPayloadBytes()); + this.multicastTransport = config.isMulticastEnabled() ? new TakMulticastTransport(config) : null; + this.frameReader = new TakFrameReader(streamFramer, config.getReadBufferBytes()); + this.remoteLinks = ConcurrentHashMap.newKeySet(); + this.running = new AtomicBoolean(false); + } + + @Override + public void initialise() throws IOException { + if (multicastTransport != null) { + multicastTransport.start(); + } + logger.log(ServerLogMessages.TAK_EXTENSION_INITIALISED, url.toString()); + running.set(true); + if (multicastTransport != null && config.isMulticastIngressEnabled()) { + multicastReaderThread = new Thread(this::multicastReaderLoop, "tak-mcast-reader-" + config.getMulticastGroup() + "-" + config.getMulticastPort()); + multicastReaderThread.setDaemon(true); + multicastReaderThread.start(); + } + } + + @Override + public void close() throws IOException { + running.set(false); + Thread multicastThread = multicastReaderThread; + if (multicastThread != null) { + multicastThread.interrupt(); + try { + multicastThread.join(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + if (multicastTransport != null) { + multicastTransport.close(); + } + logger.log(ServerLogMessages.TAK_EXTENSION_CLOSED, url.toString()); + } + + @Override + public String getName() { + return PROTOCOL_NAME; + } + + @Override + public String getVersion() { + return PROTOCOL_VERSION; + } + + @Override + public boolean supportsRemoteFiltering() { + return false; + } + + @Override + public void outbound(String destinationName, Message message) { + if (!running.get()) { + return; + } + try { + byte[] payload = payloadCodec.encode(message); + try { + byte[] framed = streamFramer.frame(payload); + endPoint.sendPacket(new Packet(ByteBuffer.wrap(framed))); + } catch (IOException ignored) { + // Stream path is best-effort; multicast egress may still succeed. + } + if (multicastTransport != null && config.isMulticastEgressEnabled()) { + try { + multicastTransport.send(payload); + endPoint.updateWriteBytes(payload.length); + } catch (IOException ignored) { + logger.log(ServerLogMessages.TAK_MULTICAST_IO_FAILED, config.getMulticastGroup(), config.getMulticastPort()); + } + } + } catch (IOException ignored) { + logger.log(ServerLogMessages.TAK_EXTENSION_OUTBOUND_FAILED, destinationName); + } + } + + @Override + public void registerRemoteLink(String destination, @Nullable String filter) { + remoteLinks.add(destination); + } + + @Override + public void registerLocalLink(String destination) { + // Egress mapping is already handled by ExtensionProtocol/Protocol.parseOutboundMessage. + } + + @Override + public boolean processPacket(@NotNull Packet packet) throws IOException { + if (!running.get()) { + return false; + } + ByteBuffer raw = packet.getRawBuffer(); + if (raw == null || raw.remaining() <= 0) { + return false; + } + for (byte[] frame : frameReader.read(raw)) { + processInboundFrame(frame, "stream"); + } + packet.position(packet.limit()); + return true; + } + + private void multicastReaderLoop() { + while (running.get()) { + try { + Optional frame = multicastTransport.read(); + if (frame.isEmpty()) { + continue; + } + endPoint.updateReadBytes(frame.get().length); + processInboundFrame(frame.get(), "multicast"); + } catch (IOException ex) { + if (!running.get()) { + break; + } + logger.log(ServerLogMessages.TAK_MULTICAST_IO_FAILED, config.getMulticastGroup(), config.getMulticastPort()); + } + } + } + + private String resolveInboundDestination(Message message) { + String eventType = null; + if (message.getMeta() != null) { + eventType = message.getMeta().get("tak.type"); + if (eventType == null || eventType.isBlank()) { + eventType = message.getMeta().get("tak_type"); + } + } + if (eventType == null || eventType.isBlank()) { + eventType = DEFAULT_EVENT_TYPE; + } + if (!remoteLinks.isEmpty()) { + String remoteDestination = remoteLinks.iterator().next(); + if (remoteDestination.contains(WILDCARD_MARKER)) { + return remoteDestination.replace(WILDCARD_MARKER, eventType); + } + return remoteDestination; + } + return DEFAULT_DESTINATION_PREFIX + eventType; + } + + private void processInboundFrame(byte[] frame, String source) { + try { + Message message = payloadCodec.decode(frame); + String destination = resolveInboundDestination(message); + inbound(destination, message); + } catch (IOException decodeFailure) { + logger.log(ServerLogMessages.TAK_EXTENSION_DECODE_FAILED, source); + } + } +} diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java new file mode 100644 index 000000000..737a62a8e --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionConfig.java @@ -0,0 +1,221 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak; + +import io.mapsmessaging.dto.rest.config.protocol.impl.ExtensionConfigDTO; +import io.mapsmessaging.network.protocol.impl.tak.framing.TakStreamFramer; +import lombok.Getter; + +import java.util.Locale; +import java.util.Map; + +@Getter +public class TakExtensionConfig { + + public static final String PAYLOAD_COT_XML = "cot_xml"; + public static final String PAYLOAD_TAK_PROTO_V1 = "tak_proto_v1"; + private static final int DEFAULT_MAX_PAYLOAD = 1024 * 1024; + private static final int DEFAULT_RECONNECT_MS = 2000; + private static final int DEFAULT_RECONNECT_MAX_MS = 30000; + private static final double DEFAULT_RECONNECT_MULTIPLIER = 2.0d; + private static final int DEFAULT_RECONNECT_JITTER_MS = 250; + private static final int DEFAULT_READ_BUFFER = 8192; + private static final String DEFAULT_MULTICAST_GROUP = "239.2.3.1"; + private static final int DEFAULT_MULTICAST_PORT = 6969; + private static final int DEFAULT_MULTICAST_TTL = 1; + private static final String DEFAULT_PROTOBUF_DESCRIPTOR_BASE64 = ""; + private static final String DEFAULT_PROTOBUF_MESSAGE_NAME = ""; + private static final String DEFAULT_XML_SCHEMA_ID = ""; + private static final String DEFAULT_PROTOBUF_SCHEMA_ID = ""; + private static final boolean DEFAULT_XML_NAMESPACE_AWARE = true; + private static final boolean DEFAULT_XML_COALESCING = true; + private static final boolean DEFAULT_XML_VALIDATING = false; + private static final String DEFAULT_XML_ROOT_ENTRY = "event"; + + private final String payload; + private final TakStreamFramer.Mode framingMode; + private final int maxPayloadBytes; + private final int reconnectDelayMs; + private final int reconnectMaxDelayMs; + private final double reconnectBackoffMultiplier; + private final int reconnectJitterMs; + private final int readBufferBytes; + private final boolean multicastEnabled; + private final boolean multicastIngressEnabled; + private final boolean multicastEgressEnabled; + private final String multicastGroup; + private final int multicastPort; + private final String multicastInterface; + private final int multicastTtl; + private final int multicastReadBufferBytes; + private final String protobufDescriptorBase64; + private final String protobufMessageName; + private final String xmlSchemaId; + private final String protobufSchemaId; + private final boolean xmlNamespaceAware; + private final boolean xmlCoalescing; + private final boolean xmlValidating; + private final String xmlRootEntry; + + private TakExtensionConfig(String payload, TakStreamFramer.Mode framingMode, int maxPayloadBytes, int reconnectDelayMs, + int reconnectMaxDelayMs, double reconnectBackoffMultiplier, int reconnectJitterMs, int readBufferBytes, + boolean multicastEnabled, boolean multicastIngressEnabled, boolean multicastEgressEnabled, + String multicastGroup, int multicastPort, String multicastInterface, int multicastTtl, int multicastReadBufferBytes, + String protobufDescriptorBase64, String protobufMessageName, + String xmlSchemaId, String protobufSchemaId, + boolean xmlNamespaceAware, boolean xmlCoalescing, boolean xmlValidating, String xmlRootEntry) { + this.payload = payload; + this.framingMode = framingMode; + this.maxPayloadBytes = maxPayloadBytes; + this.reconnectDelayMs = reconnectDelayMs; + this.reconnectMaxDelayMs = reconnectMaxDelayMs; + this.reconnectBackoffMultiplier = reconnectBackoffMultiplier; + this.reconnectJitterMs = reconnectJitterMs; + this.readBufferBytes = readBufferBytes; + this.multicastEnabled = multicastEnabled; + this.multicastIngressEnabled = multicastIngressEnabled; + this.multicastEgressEnabled = multicastEgressEnabled; + this.multicastGroup = multicastGroup; + this.multicastPort = multicastPort; + this.multicastInterface = multicastInterface; + this.multicastTtl = multicastTtl; + this.multicastReadBufferBytes = multicastReadBufferBytes; + this.protobufDescriptorBase64 = protobufDescriptorBase64; + this.protobufMessageName = protobufMessageName; + this.xmlSchemaId = xmlSchemaId; + this.protobufSchemaId = protobufSchemaId; + this.xmlNamespaceAware = xmlNamespaceAware; + this.xmlCoalescing = xmlCoalescing; + this.xmlValidating = xmlValidating; + this.xmlRootEntry = xmlRootEntry; + } + + public static TakExtensionConfig from(ExtensionConfigDTO extensionConfig) { + Map config = extensionConfig != null ? extensionConfig.getConfig() : null; + String payload = asString(config, "payload", PAYLOAD_COT_XML).toLowerCase(Locale.ROOT); + String framing = asString(config, "framing", "xml_stream").toLowerCase(Locale.ROOT); + TakStreamFramer.Mode mode = "proto_stream".equals(framing) ? TakStreamFramer.Mode.PROTO_STREAM : TakStreamFramer.Mode.XML_STREAM; + int maxPayload = asInt(config, "max_payload_bytes", DEFAULT_MAX_PAYLOAD); + int reconnectMs = asInt(config, "reconnect_delay_ms", DEFAULT_RECONNECT_MS); + int reconnectMaxMs = asInt(config, "reconnect_max_delay_ms", DEFAULT_RECONNECT_MAX_MS); + double reconnectMultiplier = asDouble(config, "reconnect_backoff_multiplier", DEFAULT_RECONNECT_MULTIPLIER); + int reconnectJitter = asInt(config, "reconnect_jitter_ms", DEFAULT_RECONNECT_JITTER_MS); + int readBuffer = asInt(config, "read_buffer_bytes", DEFAULT_READ_BUFFER); + boolean multicastEnabled = asBoolean(config, "multicast_enabled", false); + boolean multicastIngressEnabled = asBoolean(config, "multicast_ingress_enabled", multicastEnabled); + boolean multicastEgressEnabled = asBoolean(config, "multicast_egress_enabled", multicastEnabled); + String multicastGroup = asString(config, "multicast_group", DEFAULT_MULTICAST_GROUP).trim(); + int multicastPort = asInt(config, "multicast_port", DEFAULT_MULTICAST_PORT); + String multicastInterface = asString(config, "multicast_interface", "").trim(); + int multicastTtl = asInt(config, "multicast_ttl", DEFAULT_MULTICAST_TTL); + int multicastReadBuffer = asInt(config, "multicast_read_buffer_bytes", DEFAULT_READ_BUFFER); + String protobufDescriptorBase64 = asString(config, "protobuf_descriptor_base64", DEFAULT_PROTOBUF_DESCRIPTOR_BASE64).trim(); + String protobufMessageName = asString(config, "protobuf_message_name", DEFAULT_PROTOBUF_MESSAGE_NAME).trim(); + String xmlSchemaId = asString(config, "xml_schema_id", DEFAULT_XML_SCHEMA_ID).trim(); + String protobufSchemaId = asString(config, "protobuf_schema_id", DEFAULT_PROTOBUF_SCHEMA_ID).trim(); + boolean xmlNamespaceAware = asBoolean(config, "xml_namespace_aware", DEFAULT_XML_NAMESPACE_AWARE); + boolean xmlCoalescing = asBoolean(config, "xml_coalescing", DEFAULT_XML_COALESCING); + boolean xmlValidating = asBoolean(config, "xml_validating", DEFAULT_XML_VALIDATING); + String xmlRootEntry = asString(config, "xml_root_entry", DEFAULT_XML_ROOT_ENTRY).trim(); + int reconnectBase = Math.max(100, reconnectMs); + int reconnectMax = Math.max(reconnectBase, reconnectMaxMs); + return new TakExtensionConfig(payload, mode, Math.max(1024, maxPayload), reconnectBase, + reconnectMax, clampMultiplier(reconnectMultiplier), Math.max(0, reconnectJitter), Math.max(512, readBuffer), + multicastEnabled, multicastIngressEnabled, multicastEgressEnabled, + multicastGroup.isEmpty() ? DEFAULT_MULTICAST_GROUP : multicastGroup, + Math.max(1, multicastPort), + multicastInterface, + Math.max(1, Math.min(255, multicastTtl)), + Math.max(512, multicastReadBuffer), + protobufDescriptorBase64, + protobufMessageName, + xmlSchemaId, + protobufSchemaId, + xmlNamespaceAware, + xmlCoalescing, + xmlValidating, + xmlRootEntry.isEmpty() ? DEFAULT_XML_ROOT_ENTRY : xmlRootEntry); + } + + private static String asString(Map config, String key, String def) { + if (config == null) { + return def; + } + Object val = config.get(key); + return val == null ? def : val.toString(); + } + + private static int asInt(Map config, String key, int def) { + if (config == null) { + return def; + } + Object val = config.get(key); + if (val == null) { + return def; + } + if (val instanceof Number number) { + return number.intValue(); + } + try { + return Integer.parseInt(val.toString()); + } catch (NumberFormatException ignored) { + return def; + } + } + + private static double asDouble(Map config, String key, double def) { + if (config == null) { + return def; + } + Object val = config.get(key); + if (val == null) { + return def; + } + if (val instanceof Number number) { + return number.doubleValue(); + } + try { + return Double.parseDouble(val.toString()); + } catch (NumberFormatException ignored) { + return def; + } + } + + private static double clampMultiplier(double multiplier) { + if (Double.isNaN(multiplier) || Double.isInfinite(multiplier)) { + return DEFAULT_RECONNECT_MULTIPLIER; + } + return Math.max(1.0d, Math.min(10.0d, multiplier)); + } + + private static boolean asBoolean(Map config, String key, boolean def) { + if (config == null) { + return def; + } + Object val = config.get(key); + if (val == null) { + return def; + } + if (val instanceof Boolean bool) { + return bool; + } + return Boolean.parseBoolean(val.toString()); + } +} diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakProtocolFactory.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakProtocolFactory.java new file mode 100644 index 000000000..db353b2f6 --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/TakProtocolFactory.java @@ -0,0 +1,74 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak; + +import io.mapsmessaging.dto.rest.config.protocol.ProtocolConfigDTO; +import io.mapsmessaging.dto.rest.config.protocol.impl.ExtensionConfigDTO; +import io.mapsmessaging.network.io.EndPoint; +import io.mapsmessaging.network.io.Packet; +import io.mapsmessaging.network.protocol.Protocol; +import io.mapsmessaging.network.protocol.ProtocolImplFactory; +import io.mapsmessaging.network.protocol.detection.NoOpDetection; +import io.mapsmessaging.network.protocol.impl.extension.ExtensionEndPoint; +import io.mapsmessaging.network.protocol.impl.extension.ExtensionProtocol; + +import java.io.IOException; + +public class TakProtocolFactory extends ProtocolImplFactory { + + public TakProtocolFactory() { + super("tak", "TAK Cursor-on-Target extension protocol", new NoOpDetection()); + } + + @Override + public Protocol connect(EndPoint endPoint, String sessionId, String username, String password) throws IOException { + ExtensionConfigDTO extensionConfig; + if (endPoint instanceof ExtensionEndPoint extensionEndPoint && extensionEndPoint.config() instanceof ExtensionConfigDTO config) { + extensionConfig = config; + } else { + extensionConfig = null; + if (endPoint.getConfig() != null && endPoint.getConfig().getProtocolConfigs() != null) { + for (ProtocolConfigDTO protocolConfig : endPoint.getConfig().getProtocolConfigs()) { + if (protocolConfig instanceof ExtensionConfigDTO config && getName().equalsIgnoreCase(config.getProtocol())) { + extensionConfig = config; + break; + } + } + } + if (extensionConfig == null) { + throw new IOException("TAK extension configuration not found"); + } + } + + Protocol protocol = new ExtensionProtocol(endPoint, new TakExtension(endPoint, extensionConfig)); + protocol.connect(sessionId, username, password); + return protocol; + } + + @Override + public void create(EndPoint endPoint, Packet packet) { + // TAK extension does not accept inbound client sockets. + } + + @Override + public String getTransportType() { + return "tak"; + } +} diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodec.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodec.java new file mode 100644 index 000000000..fde8a5e62 --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodec.java @@ -0,0 +1,322 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.codec; + +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.WireFormat; +import io.mapsmessaging.api.MessageBuilder; +import io.mapsmessaging.api.features.QualityOfService; +import io.mapsmessaging.api.message.Message; +import io.mapsmessaging.engine.schema.SchemaManager; +import io.mapsmessaging.schemas.config.impl.XmlSchemaConfig; +import io.mapsmessaging.schemas.formatters.MessageFormatter; +import io.mapsmessaging.schemas.formatters.MessageFormatterFactory; +import io.mapsmessaging.selector.IdentifierResolver; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.NodeList; +import org.xml.sax.InputSource; +import org.xml.sax.SAXException; + +import javax.xml.XMLConstants; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; +import java.io.IOException; +import java.io.StringReader; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.LinkedHashMap; +import java.util.Map; + +public class CotXmlCodec implements TakPayloadCodec { + + private final MessageFormatter xmlFormatter; + private final String schemaId; + + public CotXmlCodec() { + this(null, SchemaManager.DEFAULT_XML_SCHEMA.toString()); + } + + public CotXmlCodec(MessageFormatter xmlFormatter) { + this(xmlFormatter, SchemaManager.DEFAULT_XML_SCHEMA.toString()); + } + + public CotXmlCodec(MessageFormatter xmlFormatter, String schemaId) { + this.xmlFormatter = xmlFormatter; + this.schemaId = schemaId; + } + + public static CotXmlCodec withSchemaFormatter(boolean namespaceAware, boolean coalescing, boolean validating, String rootEntry) { + return withSchemaFormatter(namespaceAware, coalescing, validating, rootEntry, SchemaManager.DEFAULT_XML_SCHEMA.toString()); + } + + public static CotXmlCodec withSchemaFormatter(boolean namespaceAware, boolean coalescing, boolean validating, String rootEntry, String schemaId) { + return new CotXmlCodec(createFormatter(namespaceAware, coalescing, validating, rootEntry), schemaId); + } + + @Override + public Message decode(byte[] payload) throws IOException { + String xml = new String(payload, StandardCharsets.UTF_8); + Element event = parseRootEvent(xml); + Element point = extractPoint(event); + + String uid = required(event, "uid"); + String type = required(event, "type"); + String time = required(event, "time"); + String start = required(event, "start"); + String stale = required(event, "stale"); + String how = required(event, "how"); + String lat = required(point, "lat"); + String lon = required(point, "lon"); + + Map meta = new LinkedHashMap<>(); + meta.put("tak.uid", uid); + meta.put("tak.type", type); + meta.put("tak.time", time); + meta.put("tak.start", start); + meta.put("tak.stale", stale); + meta.put("tak.how", how); + meta.put("tak.lat", lat); + meta.put("tak.lon", lon); + putIfPresent(meta, "tak.hae", point.getAttribute("hae")); + putIfPresent(meta, "tak.ce", point.getAttribute("ce")); + putIfPresent(meta, "tak.le", point.getAttribute("le")); + meta.put("tak.format", "xml"); + meta.put("tak.transport", "stream"); + mergeSchemaParsedMeta(meta, payload); + addSelectorAliases(meta); + + MessageBuilder builder = new MessageBuilder() + .setOpaqueData(payload) + .setMeta(meta) + .setContentType("application/cot+xml") + .setQoS(QualityOfService.AT_MOST_ONCE); + if (schemaId != null && !schemaId.isBlank()) { + builder.setSchemaId(schemaId); + } + return builder.build(); + } + + @Override + public byte[] encode(Message message) throws IOException { + byte[] opaque = message.getOpaqueData(); + byte[] cloudEventPayload = TakCloudEventPayloadExtractor.tryExtractPayload(opaque); + if (cloudEventPayload != null) { + opaque = cloudEventPayload; + } + if (opaque != null && opaque.length > 0) { + String candidate = new String(opaque, StandardCharsets.UTF_8).trim(); + if (candidate.startsWith(" meta = message.getMeta() != null ? message.getMeta() : new LinkedHashMap<>(); + String now = Instant.now().truncatedTo(ChronoUnit.SECONDS).toString(); + String stale = Instant.now().plus(5, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.SECONDS).toString(); + + String uid = getOrDefault(meta, "tak.uid", "maps-generated"); + String type = getOrDefault(meta, "tak.type", "a-f-G-U-C"); + String time = getOrDefault(meta, "tak.time", now); + String start = getOrDefault(meta, "tak.start", now); + String staleTime = getOrDefault(meta, "tak.stale", stale); + String how = getOrDefault(meta, "tak.how", "m-g"); + String lat = getOrDefault(meta, "tak.lat", "0"); + String lon = getOrDefault(meta, "tak.lon", "0"); + String hae = getOrDefault(meta, "tak.hae", "0"); + String ce = getOrDefault(meta, "tak.ce", "9999999"); + String le = getOrDefault(meta, "tak.le", "9999999"); + + String xml = "" + + "" + + ""; + return xml.getBytes(StandardCharsets.UTF_8); + } + + private static byte[] tryExtractEmbeddedCotFromProtobuf(byte[] payload) { + try { + CodedInputStream input = CodedInputStream.newInstance(payload); + while (!input.isAtEnd()) { + int tag = input.readTag(); + if (tag == 0) { + break; + } + int wireType = WireFormat.getTagWireType(tag); + if (wireType == WireFormat.WIRETYPE_LENGTH_DELIMITED) { + byte[] candidate = input.readByteArray(); + if (candidate.length > 0) { + String xmlCandidate = new String(candidate, StandardCharsets.UTF_8).trim(); + if (xmlCandidate.startsWith(" meta, String key, String def) { + String value = meta.get(key); + return value == null || value.isBlank() ? def : value; + } + + private static void putIfPresent(Map meta, String key, String value) { + if (value != null && !value.isBlank()) { + meta.put(key, value); + } + } + + private static Element parseRootEvent(String xml) throws IOException { + try { + DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); + dbf.setNamespaceAware(true); + dbf.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true); + dbf.setFeature("http://xml.org/sax/features/external-general-entities", false); + dbf.setFeature("http://xml.org/sax/features/external-parameter-entities", false); + dbf.setFeature(XMLConstants.FEATURE_SECURE_PROCESSING, true); + dbf.setExpandEntityReferences(false); + dbf.setXIncludeAware(false); + dbf.setAttribute(XMLConstants.ACCESS_EXTERNAL_DTD, ""); + dbf.setAttribute(XMLConstants.ACCESS_EXTERNAL_SCHEMA, ""); + DocumentBuilder builder = dbf.newDocumentBuilder(); + Document document = builder.parse(new InputSource(new StringReader(xml))); + Element root = document.getDocumentElement(); + if (root == null || !"event".equalsIgnoreCase(root.getLocalName() == null ? root.getNodeName() : root.getLocalName())) { + throw new IOException("Invalid CoT XML payload: root element must be event"); + } + return root; + } catch (ParserConfigurationException | SAXException ex) { + throw new IOException("Invalid CoT XML payload", ex); + } + } + + private static Element extractPoint(Element root) throws IOException { + NodeList byNs = root.getElementsByTagNameNS("*", "point"); + if (byNs.getLength() > 0 && byNs.item(0) instanceof Element element) { + return element; + } + NodeList byName = root.getElementsByTagName("point"); + if (byName.getLength() > 0 && byName.item(0) instanceof Element element) { + return element; + } + throw new IOException("Invalid CoT XML payload: missing point element"); + } + + private static String required(Element element, String name) throws IOException { + String value = element.getAttribute(name); + if (value == null || value.isBlank()) { + throw new IOException("Invalid CoT XML payload: missing " + name); + } + return value; + } + + private static String esc(String value) { + return value + .replace("&", "&") + .replace("\"", """) + .replace("<", "<") + .replace(">", ">"); + } + + private void mergeSchemaParsedMeta(Map meta, byte[] payload) { + if (xmlFormatter == null) { + return; + } + try { + IdentifierResolver parsed = xmlFormatter.parse(payload); + if (parsed == null) { + return; + } + promoteAny(parsed, meta, "tak.uid", "uid", "event.uid"); + promoteAny(parsed, meta, "tak.type", "type", "event.type"); + promoteAny(parsed, meta, "tak.time", "time", "event.time"); + promoteAny(parsed, meta, "tak.start", "start", "event.start"); + promoteAny(parsed, meta, "tak.stale", "stale", "event.stale"); + promoteAny(parsed, meta, "tak.how", "how", "event.how"); + promoteAny(parsed, meta, "tak.lat", "point.lat", "event.point.lat"); + promoteAny(parsed, meta, "tak.lon", "point.lon", "event.point.lon"); + promoteAny(parsed, meta, "tak.hae", "point.hae", "event.point.hae"); + promoteAny(parsed, meta, "tak.ce", "point.ce", "event.point.ce"); + promoteAny(parsed, meta, "tak.le", "point.le", "event.point.le"); + meta.put("tak.xml_schema_parsed", "true"); + } catch (Exception ignored) { + // Preserve default CoT metadata extraction if formatter parsing fails. + } + } + + private static void promoteAny(IdentifierResolver parsed, Map meta, String toKey, String... fromKeys) { + for (String fromKey : fromKeys) { + Object val = parsed.get(fromKey); + if (val != null) { + meta.put(toKey, val.toString()); + return; + } + } + } + + private static void addSelectorAliases(Map meta) { + alias(meta, "tak.uid", "tak_uid"); + alias(meta, "tak.type", "tak_type"); + alias(meta, "tak.time", "tak_time"); + alias(meta, "tak.start", "tak_start"); + alias(meta, "tak.stale", "tak_stale"); + alias(meta, "tak.how", "tak_how"); + alias(meta, "tak.lat", "tak_lat"); + alias(meta, "tak.lon", "tak_lon"); + alias(meta, "tak.hae", "tak_hae"); + alias(meta, "tak.ce", "tak_ce"); + alias(meta, "tak.le", "tak_le"); + alias(meta, "tak.format", "tak_format"); + alias(meta, "tak.transport", "tak_transport"); + } + + private static void alias(Map meta, String sourceKey, String aliasKey) { + String value = meta.get(sourceKey); + if (value != null && !value.isBlank()) { + meta.put(aliasKey, value); + } + } + + private static MessageFormatter createFormatter(boolean namespaceAware, boolean coalescing, boolean validating, String rootEntry) { + try { + XmlSchemaConfig schemaConfig = new XmlSchemaConfig(); + XmlSchemaConfig.XmlConfig xmlConfig = new XmlSchemaConfig.XmlConfig(); + xmlConfig.setNamespaceAware(namespaceAware); + xmlConfig.setCoalescing(coalescing); + xmlConfig.setValidating(validating); + xmlConfig.setRootEntry(rootEntry == null || rootEntry.isBlank() ? "event" : rootEntry); + schemaConfig.setConfig(xmlConfig); + return MessageFormatterFactory.getInstance().getFormatter(schemaConfig); + } catch (Exception ignored) { + return null; + } + } +} diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakCloudEventPayloadExtractor.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakCloudEventPayloadExtractor.java new file mode 100644 index 000000000..5d482358d --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakCloudEventPayloadExtractor.java @@ -0,0 +1,72 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.codec; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; + +import java.nio.charset.StandardCharsets; +import java.util.Base64; + +final class TakCloudEventPayloadExtractor { + + private TakCloudEventPayloadExtractor() { + } + + static byte[] tryExtractPayload(byte[] opaqueData) { + if (opaqueData == null || opaqueData.length == 0) { + return null; + } + String jsonCandidate = new String(opaqueData, StandardCharsets.UTF_8).trim(); + if (!jsonCandidate.startsWith("{")) { + return null; + } + try { + JsonElement rootElement = JsonParser.parseString(jsonCandidate); + if (!rootElement.isJsonObject()) { + return null; + } + JsonObject root = rootElement.getAsJsonObject(); + if (!root.has("specversion")) { + return null; + } + + JsonElement base64Element = root.get("data_base64"); + if (base64Element != null && base64Element.isJsonPrimitive()) { + return Base64.getDecoder().decode(base64Element.getAsString()); + } + + JsonElement dataElement = root.get("data"); + if (dataElement == null || !dataElement.isJsonObject()) { + return null; + } + JsonObject data = dataElement.getAsJsonObject(); + JsonElement payloadBase64 = data.get("payload_base64"); + if (payloadBase64 != null && payloadBase64.isJsonPrimitive()) { + return Base64.getDecoder().decode(payloadBase64.getAsString()); + } + return null; + } catch (Exception ignored) { + return null; + } + } +} + diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakPayloadCodec.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakPayloadCodec.java new file mode 100644 index 000000000..0e24b9ca7 --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakPayloadCodec.java @@ -0,0 +1,31 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.codec; + +import io.mapsmessaging.api.message.Message; + +import java.io.IOException; + +public interface TakPayloadCodec { + + Message decode(byte[] payload) throws IOException; + + byte[] encode(Message message) throws IOException; +} diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodec.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodec.java new file mode 100644 index 000000000..d28665214 --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/codec/TakProtobufCodec.java @@ -0,0 +1,244 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.codec; + +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.CodedOutputStream; +import com.google.protobuf.WireFormat; +import io.mapsmessaging.api.MessageBuilder; +import io.mapsmessaging.api.message.Message; +import io.mapsmessaging.schemas.config.impl.ProtoBufSchemaConfig; +import io.mapsmessaging.schemas.formatters.MessageFormatter; +import io.mapsmessaging.schemas.formatters.MessageFormatterFactory; +import io.mapsmessaging.selector.IdentifierResolver; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.LinkedHashMap; +import java.util.Map; + +public class TakProtobufCodec implements TakPayloadCodec { + + private static final int DEFAULT_MAX_PAYLOAD_BYTES = 1024 * 1024; + private final CotXmlCodec cotXmlCodec; + private final int maxPayloadBytes; + private final MessageFormatter protobufFormatter; + private final String protobufSchemaId; + + public TakProtobufCodec() { + this(new CotXmlCodec(), DEFAULT_MAX_PAYLOAD_BYTES, null, null); + } + + public TakProtobufCodec(CotXmlCodec cotXmlCodec, int maxPayloadBytes) { + this(cotXmlCodec, maxPayloadBytes, null, null); + } + + public TakProtobufCodec(CotXmlCodec cotXmlCodec, int maxPayloadBytes, MessageFormatter protobufFormatter) { + this(cotXmlCodec, maxPayloadBytes, protobufFormatter, null); + } + + public TakProtobufCodec(CotXmlCodec cotXmlCodec, int maxPayloadBytes, MessageFormatter protobufFormatter, String protobufSchemaId) { + this.cotXmlCodec = cotXmlCodec; + this.maxPayloadBytes = Math.max(1024, maxPayloadBytes); + this.protobufFormatter = protobufFormatter; + this.protobufSchemaId = protobufSchemaId; + } + + public static TakProtobufCodec withSchemaFormatter(CotXmlCodec cotXmlCodec, int maxPayloadBytes, + String descriptorBase64, String messageName) { + return withSchemaFormatter(cotXmlCodec, maxPayloadBytes, descriptorBase64, messageName, null); + } + + public static TakProtobufCodec withSchemaFormatter(CotXmlCodec cotXmlCodec, int maxPayloadBytes, + String descriptorBase64, String messageName, String protobufSchemaId) { + return new TakProtobufCodec(cotXmlCodec, maxPayloadBytes, createFormatter(descriptorBase64, messageName), protobufSchemaId); + } + + @Override + public Message decode(byte[] payload) throws IOException { + if (payload == null || payload.length == 0) { + throw new IOException("Invalid TAK protobuf payload: empty"); + } + if (payload.length > maxPayloadBytes) { + throw new IOException("Invalid TAK protobuf payload: exceeds max size"); + } + + Map meta = new LinkedHashMap<>(); + meta.put("tak.format", "protobuf"); + meta.put("tak.transport", "stream"); + + byte[] embeddedCot = extractEmbeddedCotXml(payload); + String schemaId = protobufSchemaId; + if (embeddedCot != null) { + try { + Message cotMessage = cotXmlCodec.decode(embeddedCot); + if (cotMessage.getMeta() != null) { + meta.putAll(cotMessage.getMeta()); + } + if (cotMessage.getSchemaId() != null && !cotMessage.getSchemaId().isBlank()) { + schemaId = cotMessage.getSchemaId(); + } + meta.put("tak.format", "protobuf"); + meta.put("tak.transport", "stream"); + meta.put("tak.protobuf_embedded_xml", "true"); + } catch (IOException ignored) { + // Keep protobuf metadata only when embedded CoT is malformed. + } + } else if (protobufFormatter != null) { + mergeSchemaParsedMeta(meta, payload); + } + addSelectorAliases(meta); + + MessageBuilder builder = new MessageBuilder() + .setOpaqueData(payload) + .setContentType("application/x-protobuf") + .setMeta(meta); + if (schemaId != null && !schemaId.isBlank()) { + builder.setSchemaId(schemaId); + } + return builder.build(); + } + + @Override + public byte[] encode(Message message) throws IOException { + byte[] opaque = message.getOpaqueData(); + byte[] cloudEventPayload = TakCloudEventPayloadExtractor.tryExtractPayload(opaque); + if (cloudEventPayload != null) { + opaque = cloudEventPayload; + } + if (opaque != null && opaque.length > 0 && !looksLikeCotXml(opaque)) { + if (opaque.length > maxPayloadBytes) { + throw new IOException("TAK protobuf payload exceeds max size"); + } + return opaque; + } + + // Fallback mode: wrap CoT XML as protobuf field #1 (length-delimited). + // This keeps the transport protobuf-framed for phase-2 interoperability work. + byte[] cotXml = (opaque != null && opaque.length > 0) ? opaque : cotXmlCodec.encode(message); + if (cotXml.length > maxPayloadBytes) { + throw new IOException("TAK protobuf payload exceeds max size"); + } + return wrapCotInLengthDelimitedField(cotXml); + } + + private static byte[] wrapCotInLengthDelimitedField(byte[] cotXml) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(cotXml.length + 8); + CodedOutputStream codedOutputStream = CodedOutputStream.newInstance(out); + codedOutputStream.writeByteArray(1, cotXml); + codedOutputStream.flush(); + return out.toByteArray(); + } + + private static boolean looksLikeCotXml(byte[] payload) { + String candidate = new String(payload, StandardCharsets.UTF_8).trim(); + return candidate.startsWith(" 0 && looksLikeCotXml(candidate)) { + return candidate; + } + } else if (!input.skipField(tag)) { + break; + } + } + return null; + } + + private void mergeSchemaParsedMeta(Map meta, byte[] payload) { + try { + IdentifierResolver parsed = protobufFormatter.parse(payload); + if (parsed == null) { + return; + } + promoteAny(parsed, meta, "tak.uid", "uid", "event.uid"); + promoteAny(parsed, meta, "tak.type", "type", "event.type"); + promoteAny(parsed, meta, "tak.time", "time", "event.time"); + promoteAny(parsed, meta, "tak.start", "start", "event.start"); + promoteAny(parsed, meta, "tak.stale", "stale", "event.stale"); + promoteAny(parsed, meta, "tak.how", "how", "event.how"); + meta.put("tak.protobuf_parsed", "true"); + } catch (Exception ignored) { + // keep base protobuf metadata when schema parsing fails + } + } + + private static void promoteAny(IdentifierResolver parsed, Map meta, String toKey, String... fromKeys) { + for (String fromKey : fromKeys) { + Object val = parsed.get(fromKey); + if (val != null) { + meta.put(toKey, val.toString()); + return; + } + } + } + + private static void addSelectorAliases(Map meta) { + alias(meta, "tak.uid", "tak_uid"); + alias(meta, "tak.type", "tak_type"); + alias(meta, "tak.time", "tak_time"); + alias(meta, "tak.start", "tak_start"); + alias(meta, "tak.stale", "tak_stale"); + alias(meta, "tak.how", "tak_how"); + alias(meta, "tak.lat", "tak_lat"); + alias(meta, "tak.lon", "tak_lon"); + alias(meta, "tak.hae", "tak_hae"); + alias(meta, "tak.ce", "tak_ce"); + alias(meta, "tak.le", "tak_le"); + alias(meta, "tak.format", "tak_format"); + alias(meta, "tak.transport", "tak_transport"); + } + + private static void alias(Map meta, String sourceKey, String aliasKey) { + String value = meta.get(sourceKey); + if (value != null && !value.isBlank()) { + meta.put(aliasKey, value); + } + } + + private static MessageFormatter createFormatter(String descriptorBase64, String messageName) { + if (descriptorBase64 == null || descriptorBase64.isBlank() || messageName == null || messageName.isBlank()) { + return null; + } + try { + ProtoBufSchemaConfig schemaConfig = new ProtoBufSchemaConfig(); + ProtoBufSchemaConfig.ProtobufConfig protobufConfig = new ProtoBufSchemaConfig.ProtobufConfig(); + protobufConfig.setDescriptorValue(Base64.getDecoder().decode(descriptorBase64)); + protobufConfig.setMessageName(messageName); + schemaConfig.setProtobufConfig(protobufConfig); + return MessageFormatterFactory.getInstance().getFormatter(schemaConfig); + } catch (Exception ignored) { + return null; + } + } +} diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakFrameReader.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakFrameReader.java new file mode 100644 index 000000000..1fc0cdf8e --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakFrameReader.java @@ -0,0 +1,55 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.framing; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.List; + +public class TakFrameReader { + + private final TakStreamFramer framer; + private final byte[] readBuffer; + + public TakFrameReader(TakStreamFramer framer, int bufferSize) { + this.framer = framer; + this.readBuffer = new byte[Math.max(bufferSize, 256)]; + } + + public List read(InputStream inputStream) throws IOException { + int read = inputStream.read(readBuffer); + if (read < 0) { + throw new EOFException("TAK connection closed"); + } + return framer.onBytes(readBuffer, read); + } + + public List read(ByteBuffer byteBuffer) throws IOException { + if (byteBuffer == null || !byteBuffer.hasRemaining()) { + return List.of(); + } + int len = byteBuffer.remaining(); + byte[] data = new byte[len]; + byteBuffer.get(data); + return framer.onBytes(data, len); + } +} diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramer.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramer.java new file mode 100644 index 000000000..d509eecfe --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramer.java @@ -0,0 +1,186 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.framing; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class TakStreamFramer { + + public enum Mode { + XML_STREAM, + PROTO_STREAM + } + + private static final byte TAK_PROTO_MAGIC = (byte) 0xBF; + private static final byte[] XML_END_TAG = "".getBytes(StandardCharsets.UTF_8); + + private final Mode mode; + private final int maxPayloadBytes; + private byte[] buffer; + + public TakStreamFramer(Mode mode, int maxPayloadBytes) { + this.mode = mode; + this.maxPayloadBytes = maxPayloadBytes; + this.buffer = new byte[0]; + } + + public synchronized List onBytes(byte[] incoming, int length) throws IOException { + if (incoming == null || length <= 0) { + return List.of(); + } + append(incoming, length); + if (buffer.length > maxPayloadBytes * 2) { + throw new IOException("Buffered TAK stream exceeds safe bound"); + } + return switch (mode) { + case XML_STREAM -> parseXml(); + case PROTO_STREAM -> parseProto(); + }; + } + + public synchronized byte[] frame(byte[] payload) throws IOException { + if (payload == null) { + return new byte[0]; + } + if (payload.length > maxPayloadBytes) { + throw new IOException("Payload exceeds max frame size"); + } + if (mode == Mode.XML_STREAM) { + return payload; + } + byte[] varint = encodeVarint(payload.length); + byte[] out = new byte[1 + varint.length + payload.length]; + out[0] = TAK_PROTO_MAGIC; + System.arraycopy(varint, 0, out, 1, varint.length); + System.arraycopy(payload, 0, out, 1 + varint.length, payload.length); + return out; + } + + private List parseXml() throws IOException { + List frames = new ArrayList<>(); + while (true) { + int endTag = indexOf(buffer, XML_END_TAG, 0); + if (endTag < 0) { + break; + } + int frameEnd = endTag + XML_END_TAG.length; + while (frameEnd < buffer.length && Character.isWhitespace((char) buffer[frameEnd])) { + frameEnd++; + } + if (frameEnd > maxPayloadBytes) { + throw new IOException("TAK XML frame exceeds max frame size"); + } + frames.add(Arrays.copyOfRange(buffer, 0, frameEnd)); + buffer = Arrays.copyOfRange(buffer, frameEnd, buffer.length); + } + return frames; + } + + private List parseProto() throws IOException { + List frames = new ArrayList<>(); + while (buffer.length > 0) { + if (buffer[0] != TAK_PROTO_MAGIC) { + throw new IOException("Invalid TAK protobuf stream header"); + } + if (buffer.length < 2) { + break; + } + VarintResult varintResult = decodeVarint(buffer, 1); + if (varintResult == null) { + break; + } + int length = varintResult.value; + if (length < 0 || length > maxPayloadBytes) { + throw new IOException("Invalid TAK protobuf payload length: " + length); + } + int frameLength = 1 + varintResult.bytesRead + length; + if (buffer.length < frameLength) { + break; + } + int payloadOffset = 1 + varintResult.bytesRead; + frames.add(Arrays.copyOfRange(buffer, payloadOffset, payloadOffset + length)); + buffer = Arrays.copyOfRange(buffer, frameLength, buffer.length); + } + return frames; + } + + private void append(byte[] data, int length) { + int copyLength = Math.min(length, data.length); + byte[] next = Arrays.copyOf(buffer, buffer.length + copyLength); + System.arraycopy(data, 0, next, buffer.length, copyLength); + buffer = next; + } + + private static byte[] encodeVarint(int value) { + byte[] tmp = new byte[5]; + int count = 0; + int v = value; + while ((v & ~0x7F) != 0) { + tmp[count++] = (byte) ((v & 0x7F) | 0x80); + v >>>= 7; + } + tmp[count++] = (byte) (v & 0x7F); + return Arrays.copyOf(tmp, count); + } + + private static VarintResult decodeVarint(byte[] data, int offset) throws IOException { + int value = 0; + int shift = 0; + int bytes = 0; + for (int i = offset; i < data.length && bytes < 5; i++) { + int b = data[i] & 0xFF; + value |= (b & 0x7F) << shift; + bytes++; + if ((b & 0x80) == 0) { + return new VarintResult(value, bytes); + } + shift += 7; + } + if (bytes >= 5 && (data[offset + 4] & 0x80) != 0) { + throw new IOException("Malformed varint length"); + } + return null; + } + + private static int indexOf(byte[] haystack, byte[] needle, int from) { + if (needle.length == 0 || haystack.length < needle.length) { + return -1; + } + for (int i = from; i <= haystack.length - needle.length; i++) { + boolean match = true; + for (int j = 0; j < needle.length; j++) { + if (haystack[i + j] != needle[j]) { + match = false; + break; + } + } + if (match) { + return i; + } + } + return -1; + } + + private record VarintResult(int value, int bytesRead) {} +} diff --git a/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakMulticastTransport.java b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakMulticastTransport.java new file mode 100644 index 000000000..31cd4d76d --- /dev/null +++ b/src/main/java/io/mapsmessaging/network/protocol/impl/tak/transport/TakMulticastTransport.java @@ -0,0 +1,134 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.transport; + +import io.mapsmessaging.network.protocol.impl.tak.TakExtensionConfig; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.MulticastSocket; +import java.net.NetworkInterface; +import java.net.SocketTimeoutException; +import java.util.Optional; +import java.util.concurrent.locks.ReentrantLock; + +public class TakMulticastTransport { + + private final TakExtensionConfig config; + private final ReentrantLock sendLock; + + private MulticastSocket socket; + private InetAddress groupAddress; + + public TakMulticastTransport(TakExtensionConfig config) { + this.config = config; + this.sendLock = new ReentrantLock(); + } + + public synchronized void start() throws IOException { + close(); + groupAddress = InetAddress.getByName(config.getMulticastGroup()); + socket = new MulticastSocket(config.getMulticastPort()); + socket.setReuseAddress(true); + socket.setSoTimeout(1000); + socket.setTimeToLive(config.getMulticastTtl()); + + NetworkInterface networkInterface = resolveInterface(); + if (networkInterface != null) { + socket.setNetworkInterface(networkInterface); + socket.joinGroup(new InetSocketAddress(groupAddress, config.getMulticastPort()), networkInterface); + } else { + socket.joinGroup(groupAddress); + } + } + + public Optional read() throws IOException { + MulticastSocket current = getSocket(); + byte[] buffer = new byte[config.getMulticastReadBufferBytes()]; + DatagramPacket packet = new DatagramPacket(buffer, buffer.length); + try { + current.receive(packet); + if (packet.getLength() > config.getMaxPayloadBytes()) { + return Optional.empty(); + } + byte[] copy = new byte[packet.getLength()]; + System.arraycopy(packet.getData(), packet.getOffset(), copy, 0, packet.getLength()); + return Optional.of(copy); + } catch (SocketTimeoutException ignored) { + return Optional.empty(); + } + } + + public void send(byte[] payload) throws IOException { + MulticastSocket current = getSocket(); + DatagramPacket packet = new DatagramPacket(payload, payload.length, groupAddress, config.getMulticastPort()); + sendLock.lock(); + try { + current.send(packet); + } finally { + sendLock.unlock(); + } + } + + public synchronized void close() throws IOException { + if (socket == null) { + return; + } + IOException deferred = null; + try { + NetworkInterface networkInterface = resolveInterface(); + if (networkInterface != null && groupAddress != null) { + socket.leaveGroup(new InetSocketAddress(groupAddress, config.getMulticastPort()), networkInterface); + } else if (groupAddress != null) { + socket.leaveGroup(groupAddress); + } + } catch (IOException ex) { + deferred = ex; + } + socket.close(); + socket = null; + groupAddress = null; + if (deferred != null) { + throw deferred; + } + } + + private synchronized MulticastSocket getSocket() throws IOException { + if (socket == null || socket.isClosed()) { + throw new IOException("TAK multicast transport is not started"); + } + return socket; + } + + private NetworkInterface resolveInterface() throws IOException { + String iface = config.getMulticastInterface(); + if (iface == null || iface.isBlank()) { + return null; + } + NetworkInterface named = NetworkInterface.getByName(iface); + if (named != null) { + return named; + } + InetAddress address = InetAddress.getByName(iface); + return NetworkInterface.getByInetAddress(address); + } +} diff --git a/src/main/resources/META-INF/services/io.mapsmessaging.network.io.EndPointConnectionFactory b/src/main/resources/META-INF/services/io.mapsmessaging.network.io.EndPointConnectionFactory index 6d4a40add..2cf75a557 100644 --- a/src/main/resources/META-INF/services/io.mapsmessaging.network.io.EndPointConnectionFactory +++ b/src/main/resources/META-INF/services/io.mapsmessaging.network.io.EndPointConnectionFactory @@ -21,4 +21,5 @@ io.mapsmessaging.network.io.impl.tcp.TCPEndPointConnectionFactory io.mapsmessaging.network.io.impl.ssl.SSLEndPointConnectionFactory io.mapsmessaging.network.io.impl.noop.NoOpEndPointConnectionFactory io.mapsmessaging.network.io.impl.udp.UDPEndPointConnectionFactory -io.mapsmessaging.network.io.impl.serial.SerialEndPointConnectionFactory \ No newline at end of file +io.mapsmessaging.network.io.impl.serial.SerialEndPointConnectionFactory +io.mapsmessaging.network.protocol.impl.tak.TakEndPointConnectionFactory diff --git a/src/main/resources/META-INF/services/io.mapsmessaging.network.protocol.ProtocolImplFactory b/src/main/resources/META-INF/services/io.mapsmessaging.network.protocol.ProtocolImplFactory index 7af47b746..46d7b5a9f 100644 --- a/src/main/resources/META-INF/services/io.mapsmessaging.network.protocol.ProtocolImplFactory +++ b/src/main/resources/META-INF/services/io.mapsmessaging.network.protocol.ProtocolImplFactory @@ -75,4 +75,5 @@ io.mapsmessaging.network.protocol.impl.apache_pulsar.PulsarProtocolFactory io.mapsmessaging.network.protocol.impl.ibm_mq.MqProtocolFactory io.mapsmessaging.network.protocol.impl.aws_sns.SnsProtocolFactory io.mapsmessaging.network.protocol.impl.v2x_step.V2xStepProtocolFactory -io.mapsmessaging.network.protocol.impl.extension.impl.example.ExampleProtocolFactory \ No newline at end of file +io.mapsmessaging.network.protocol.impl.extension.impl.example.ExampleProtocolFactory +io.mapsmessaging.network.protocol.impl.tak.TakProtocolFactory diff --git a/src/main/resources/NetworkConnectionManager.yaml b/src/main/resources/NetworkConnectionManager.yaml index 942151398..1b2ce0964 100644 --- a/src/main/resources/NetworkConnectionManager.yaml +++ b/src/main/resources/NetworkConnectionManager.yaml @@ -66,4 +66,41 @@ NetworkConnectionManager: - direction: pull local_namespace: "/" - remote_namespace: "/#" \ No newline at end of file + remote_namespace: "/#" + + # Example TAK bridge (CoT XML stream over TLS) + # - name: "TAK Server Connection" + # url: ssl://tak.example.local:8089/ + # protocol: tak + # plugin: true + # + # config: + # payload: cot_xml + # framing: xml_stream + # max_payload_bytes: 1048576 + # reconnect_delay_ms: 2000 + # reconnect_max_delay_ms: 30000 + # reconnect_backoff_multiplier: 2.0 + # reconnect_jitter_ms: 250 + # read_buffer_bytes: 8192 + # xml_schema_id: "10000000-0000-1000-a000-100000000004" # default XML schema + # + # # Optional protobuf mode: + # # payload: tak_proto_v1 + # # framing: proto_stream + # # protobuf_descriptor_base64: "" + # # protobuf_message_name: "TakMessage" + # # protobuf_schema_id: "" + # + # links: + # - direction: pull + # remote_namespace: "tak/cot/#" + # local_namespace: "tak/cot/" + # selector: "" + # qos: 0 + # + # - direction: push + # remote_namespace: "tak/cot/out/#" + # local_namespace: "ops/cot/out/" + # selector: "tak_type LIKE 'a-%'" + # qos: 0 diff --git a/src/test/java/io/mapsmessaging/api/transformers/TakToCloudEventTransformationTest.java b/src/test/java/io/mapsmessaging/api/transformers/TakToCloudEventTransformationTest.java new file mode 100644 index 000000000..9bb10ea6b --- /dev/null +++ b/src/test/java/io/mapsmessaging/api/transformers/TakToCloudEventTransformationTest.java @@ -0,0 +1,183 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.api.transformers; + +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import io.mapsmessaging.api.message.Message; +import io.mapsmessaging.engine.schema.SchemaManager; +import io.mapsmessaging.network.protocol.impl.tak.codec.CotXmlCodec; +import io.mapsmessaging.network.protocol.impl.tak.codec.TakProtobufCodec; +import io.mapsmessaging.network.protocol.transformation.cloudevent.CloudEventEnvelopeTransformation; +import io.mapsmessaging.network.protocol.transformation.cloudevent.CloudEventJsonTransformation; +import io.mapsmessaging.network.protocol.transformation.cloudevent.CloudEventNativeTransformation; +import io.mapsmessaging.schemas.config.impl.ProtoBufSchemaConfig; +import io.mapsmessaging.test.BaseTestConfig; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.*; + +class TakToCloudEventTransformationTest extends BaseTestConfig { + + private final SchemaManager schemaManager = SchemaManager.getInstance(); + private UUID protobufSchemaId; + + @AfterEach + void cleanUpSchema() { + if (protobufSchemaId != null) { + schemaManager.removeSchema(protobufSchemaId.toString()); + protobufSchemaId = null; + } + } + + @Test + void cotXmlDecodedMessageTransformsToCloudEventJsonData() throws Exception { + CotXmlCodec codec = new CotXmlCodec(); + byte[] cotPayload = """ + + + + """.getBytes(StandardCharsets.UTF_8); + Message takMessage = codec.decode(cotPayload); + + Message cloudEventMessage = new CloudEventJsonTransformation().outgoing(takMessage, "/tak/cot/in"); + assertNotNull(cloudEventMessage); + JsonObject cloudEvent = JsonParser.parseString(new String(cloudEventMessage.getOpaqueData(), StandardCharsets.UTF_8)).getAsJsonObject(); + + assertEquals("1.0", cloudEvent.get("specversion").getAsString()); + assertEquals("application/json", cloudEvent.get("datacontenttype").getAsString()); + assertTrue(cloudEvent.has("data")); + assertFalse(cloudEvent.getAsJsonObject("data").has("payload_base64")); + assertEquals("u-xml-1", cloudEvent.get("mapsMeta_tak_uid").getAsString()); + assertEquals("xml", cloudEvent.get("mapsMeta_tak_format").getAsString()); + } + + @Test + void takProtobufDecodedMessageTransformsToCloudEventJsonData() throws Exception { + protobufSchemaId = UUID.randomUUID(); + ProtoBufSchemaConfig schema = (ProtoBufSchemaConfig) TestProtobufSchemas.protobufSchemaConfig(protobufSchemaId); + schemaManager.addSchema("/tak/proto", schema); + String descriptorBase64 = Base64.getEncoder().encodeToString(schema.getProtobufConfig().getDescriptorValue()); + String messageName = schema.getProtobufConfig().getMessageName(); + + TakProtobufCodec codec = TakProtobufCodec.withSchemaFormatter( + new CotXmlCodec(), + 1024 * 1024, + descriptorBase64, + messageName, + protobufSchemaId.toString()); + Message takMessage = codec.decode(TestProtobufSchemas.encodeProtobufPayload()); + + Message cloudEventMessage = new CloudEventJsonTransformation().outgoing(takMessage, "/tak/proto/in"); + assertNotNull(cloudEventMessage); + JsonObject cloudEvent = JsonParser.parseString(new String(cloudEventMessage.getOpaqueData(), StandardCharsets.UTF_8)).getAsJsonObject(); + JsonObject data = cloudEvent.getAsJsonObject("data"); + + assertEquals("1.0", cloudEvent.get("specversion").getAsString()); + assertEquals("application/json", cloudEvent.get("datacontenttype").getAsString()); + assertTrue(cloudEvent.has("mapsSchemaId")); + assertTrue(data.has("id")); + assertEquals("abc", data.get("id").getAsString()); + assertEquals(123, data.get("count").getAsInt()); + assertTrue(data.get("active").getAsBoolean()); + assertEquals("protobuf", cloudEvent.get("mapsMeta_tak_format").getAsString()); + } + + @Test + void cotXmlRoundTripsTakCloudEventTakViaEnvelope() throws Exception { + CotXmlCodec codec = new CotXmlCodec(); + byte[] cotPayload = "" + .getBytes(StandardCharsets.UTF_8); + Message takMessage = codec.decode(cotPayload); + Message cloudEvent = new CloudEventEnvelopeTransformation().outgoing(takMessage, "/tak/cot/in"); + + byte[] encodedBackToTak = codec.encode(cloudEvent); + assertArrayEquals(cotPayload, encodedBackToTak); + + Message decodedBack = codec.decode(encodedBackToTak); + assertEquals("u-xml-2", decodedBack.getMeta().get("tak.uid")); + assertEquals("xml", decodedBack.getMeta().get("tak.format")); + } + + @Test + void protobufRoundTripsTakCloudEventTakViaNative() throws Exception { + protobufSchemaId = UUID.randomUUID(); + ProtoBufSchemaConfig schema = (ProtoBufSchemaConfig) TestProtobufSchemas.protobufSchemaConfig(protobufSchemaId); + schemaManager.addSchema("/tak/proto", schema); + String descriptorBase64 = Base64.getEncoder().encodeToString(schema.getProtobufConfig().getDescriptorValue()); + String messageName = schema.getProtobufConfig().getMessageName(); + + TakProtobufCodec codec = TakProtobufCodec.withSchemaFormatter( + new CotXmlCodec(), + 1024 * 1024, + descriptorBase64, + messageName, + protobufSchemaId.toString()); + byte[] protobufPayload = TestProtobufSchemas.encodeProtobufPayload(); + Message takMessage = codec.decode(protobufPayload); + Message cloudEvent = new CloudEventNativeTransformation().outgoing(takMessage, "/tak/proto/in"); + + byte[] encodedBackToTak = codec.encode(cloudEvent); + assertArrayEquals(protobufPayload, encodedBackToTak); + + Message decodedBack = codec.decode(encodedBackToTak); + assertEquals("protobuf", decodedBack.getMeta().get("tak.format")); + } + + @Test + void cotXmlToCloudEventToTakProtobufToCloudEventToCotXml() throws Exception { + protobufSchemaId = UUID.randomUUID(); + ProtoBufSchemaConfig schema = (ProtoBufSchemaConfig) TestProtobufSchemas.protobufSchemaConfig(protobufSchemaId); + schemaManager.addSchema("/tak/proto", schema); + String descriptorBase64 = Base64.getEncoder().encodeToString(schema.getProtobufConfig().getDescriptorValue()); + String messageName = schema.getProtobufConfig().getMessageName(); + + CotXmlCodec xmlCodec = new CotXmlCodec(); + TakProtobufCodec protobufCodec = TakProtobufCodec.withSchemaFormatter( + new CotXmlCodec(), + 1024 * 1024, + descriptorBase64, + messageName, + protobufSchemaId.toString()); + + byte[] startXml = "" + .getBytes(StandardCharsets.UTF_8); + Message takXmlMessage = xmlCodec.decode(startXml); + + Message cloudEventFromXml = new CloudEventEnvelopeTransformation().outgoing(takXmlMessage, "/tak/cot/in"); + byte[] takProtobufBytes = protobufCodec.encode(cloudEventFromXml); + Message takProtobufMessage = protobufCodec.decode(takProtobufBytes); + + Message cloudEventFromProtobuf = new CloudEventNativeTransformation().outgoing(takProtobufMessage, "/tak/proto/in"); + byte[] endXml = xmlCodec.encode(cloudEventFromProtobuf); + Message decodedEndXml = xmlCodec.decode(endXml); + + assertEquals("u-chain-1", decodedEndXml.getMeta().get("tak.uid")); + assertEquals("a-f-G-U-C", decodedEndXml.getMeta().get("tak.type")); + assertEquals("10.1", decodedEndXml.getMeta().get("tak.lat")); + assertEquals("11.2", decodedEndXml.getMeta().get("tak.lon")); + assertEquals("xml", decodedEndXml.getMeta().get("tak.format")); + } +} diff --git a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java new file mode 100644 index 000000000..204a1c0c3 --- /dev/null +++ b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/TakExtensionTest.java @@ -0,0 +1,140 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak; + +import io.mapsmessaging.dto.rest.config.protocol.impl.ExtensionConfigDTO; +import io.mapsmessaging.network.protocol.impl.tak.framing.TakStreamFramer; +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Field; +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class TakExtensionTest { + + @Test + void parsesExtensionConfigWithDefaultsAndOverrides() throws Exception { + ExtensionConfigDTO dto = new ExtensionConfigDTO(); + Map config = new LinkedHashMap<>(); + config.put("payload", "tak_proto_v1"); + config.put("framing", "proto_stream"); + config.put("max_payload_bytes", 2048); + config.put("reconnect_delay_ms", 1500); + config.put("reconnect_max_delay_ms", 6000); + config.put("reconnect_backoff_multiplier", 1.5); + config.put("reconnect_jitter_ms", 100); + config.put("read_buffer_bytes", 4096); + config.put("multicast_enabled", true); + config.put("multicast_ingress_enabled", true); + config.put("multicast_egress_enabled", false); + config.put("multicast_group", "239.88.77.66"); + config.put("multicast_port", 7171); + config.put("multicast_interface", "lo0"); + config.put("multicast_ttl", 3); + config.put("multicast_read_buffer_bytes", 2048); + config.put("protobuf_descriptor_base64", "AQID"); + config.put("protobuf_message_name", "TakMessage"); + config.put("xml_schema_id", "10000000-0000-1000-a000-100000000004"); + config.put("protobuf_schema_id", "f4f6f837-89ce-4cca-b74d-aa14513c501f"); + config.put("xml_namespace_aware", false); + config.put("xml_coalescing", false); + config.put("xml_validating", true); + config.put("xml_root_entry", "cotEvent"); + setField(dto, "config", config); + + TakExtensionConfig parsed = TakExtensionConfig.from(dto); + + assertEquals("tak_proto_v1", parsed.getPayload()); + assertEquals(TakStreamFramer.Mode.PROTO_STREAM, parsed.getFramingMode()); + assertEquals(2048, parsed.getMaxPayloadBytes()); + assertEquals(1500, parsed.getReconnectDelayMs()); + assertEquals(6000, parsed.getReconnectMaxDelayMs()); + assertEquals(1.5d, parsed.getReconnectBackoffMultiplier()); + assertEquals(100, parsed.getReconnectJitterMs()); + assertEquals(4096, parsed.getReadBufferBytes()); + assertTrue(parsed.isMulticastEnabled()); + assertTrue(parsed.isMulticastIngressEnabled()); + assertFalse(parsed.isMulticastEgressEnabled()); + assertEquals("239.88.77.66", parsed.getMulticastGroup()); + assertEquals(7171, parsed.getMulticastPort()); + assertEquals("lo0", parsed.getMulticastInterface()); + assertEquals(3, parsed.getMulticastTtl()); + assertEquals(2048, parsed.getMulticastReadBufferBytes()); + assertEquals("AQID", parsed.getProtobufDescriptorBase64()); + assertEquals("TakMessage", parsed.getProtobufMessageName()); + assertEquals("10000000-0000-1000-a000-100000000004", parsed.getXmlSchemaId()); + assertEquals("f4f6f837-89ce-4cca-b74d-aa14513c501f", parsed.getProtobufSchemaId()); + assertFalse(parsed.isXmlNamespaceAware()); + assertFalse(parsed.isXmlCoalescing()); + assertTrue(parsed.isXmlValidating()); + assertEquals("cotEvent", parsed.getXmlRootEntry()); + } + + @Test + void appliesSafeDefaultsWhenMissing() { + TakExtensionConfig parsed = TakExtensionConfig.from(null); + assertEquals("cot_xml", parsed.getPayload()); + assertEquals(TakStreamFramer.Mode.XML_STREAM, parsed.getFramingMode()); + assertEquals(1024 * 1024, parsed.getMaxPayloadBytes()); + assertEquals(2000, parsed.getReconnectDelayMs()); + assertEquals(30000, parsed.getReconnectMaxDelayMs()); + assertEquals(2.0d, parsed.getReconnectBackoffMultiplier()); + assertEquals(250, parsed.getReconnectJitterMs()); + assertFalse(parsed.isMulticastEnabled()); + assertEquals("239.2.3.1", parsed.getMulticastGroup()); + assertEquals(6969, parsed.getMulticastPort()); + assertEquals("", parsed.getProtobufDescriptorBase64()); + assertEquals("", parsed.getProtobufMessageName()); + assertEquals("", parsed.getXmlSchemaId()); + assertEquals("", parsed.getProtobufSchemaId()); + assertTrue(parsed.isXmlNamespaceAware()); + assertTrue(parsed.isXmlCoalescing()); + assertFalse(parsed.isXmlValidating()); + assertEquals("event", parsed.getXmlRootEntry()); + } + + @Test + void clampsReconnectHardeningValues() throws Exception { + ExtensionConfigDTO dto = new ExtensionConfigDTO(); + Map config = new LinkedHashMap<>(); + config.put("reconnect_delay_ms", 50); + config.put("reconnect_max_delay_ms", 10); + config.put("reconnect_backoff_multiplier", 0.1d); + config.put("reconnect_jitter_ms", -1); + setField(dto, "config", config); + + TakExtensionConfig parsed = TakExtensionConfig.from(dto); + + assertEquals(100, parsed.getReconnectDelayMs()); + assertEquals(100, parsed.getReconnectMaxDelayMs()); + assertEquals(1.0d, parsed.getReconnectBackoffMultiplier()); + assertEquals(0, parsed.getReconnectJitterMs()); + } + + private static void setField(Object target, String fieldName, Object value) throws Exception { + Field field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); + } +} diff --git a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodecTest.java b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodecTest.java new file mode 100644 index 000000000..5da3631b6 --- /dev/null +++ b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/codec/CotXmlCodecTest.java @@ -0,0 +1,115 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.codec; + +import io.mapsmessaging.api.MessageBuilder; +import io.mapsmessaging.api.message.Message; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class CotXmlCodecTest { + + private static final String VALID_COT = """ + + + + """; + + @Test + void decodeValidCotExtractsCoreFields() throws IOException { + CotXmlCodec codec = new CotXmlCodec(); + + Message message = codec.decode(VALID_COT.getBytes(StandardCharsets.UTF_8)); + + assertNotNull(message.getMeta()); + assertEquals("u-1", message.getMeta().get("tak.uid")); + assertEquals("a-f-G-U-C", message.getMeta().get("tak.type")); + assertEquals("51.5007", message.getMeta().get("tak.lat")); + assertEquals("-0.1246", message.getMeta().get("tak.lon")); + assertEquals("application/cot+xml", message.getContentType()); + assertNotNull(message.getOpaqueData()); + } + + @Test + void decodeMissingRequiredFieldFails() { + CotXmlCodec codec = new CotXmlCodec(); + String invalid = """ + + + + """; + + assertThrows(IOException.class, () -> codec.decode(invalid.getBytes(StandardCharsets.UTF_8))); + } + + @Test + void decodeMalformedXmlFails() { + CotXmlCodec codec = new CotXmlCodec(); + String invalid = " codec.decode(invalid.getBytes(StandardCharsets.UTF_8))); + } + + @Test + void decodeRejectsXxe() { + CotXmlCodec codec = new CotXmlCodec(); + String xxe = """ + ]> + + + &xxe; + + """; + assertThrows(IOException.class, () -> codec.decode(xxe.getBytes(StandardCharsets.UTF_8))); + } + + @Test + void encodeFromMetaBuildsCotXml() throws IOException { + CotXmlCodec codec = new CotXmlCodec(); + Map meta = new LinkedHashMap<>(); + meta.put("tak.uid", "u-2"); + meta.put("tak.type", "a-f-G-U-C"); + meta.put("tak.time", "2026-02-19T09:10:00Z"); + meta.put("tak.start", "2026-02-19T09:10:00Z"); + meta.put("tak.stale", "2026-02-19T09:15:00Z"); + meta.put("tak.how", "m-g"); + meta.put("tak.lat", "1"); + meta.put("tak.lon", "2"); + meta.put("tak.hae", "3"); + meta.put("tak.ce", "4"); + meta.put("tak.le", "5"); + + Message message = new MessageBuilder().setMeta(meta).build(); + byte[] xml = codec.encode(message); + String text = new String(xml, StandardCharsets.UTF_8); + + assertTrue(text.startsWith(" + + + """; + Message message = new MessageBuilder().setOpaqueData(cot.getBytes(StandardCharsets.UTF_8)).build(); + + byte[] encoded = codec.encode(message); + Message decoded = codec.decode(encoded); + + assertEquals("application/x-protobuf", decoded.getContentType()); + assertEquals("protobuf", decoded.getMeta().get("tak.format")); + assertEquals("u-100", decoded.getMeta().get("tak.uid")); + assertEquals("a-f-G-U-C", decoded.getMeta().get("tak.type")); + assertNotNull(decoded.getOpaqueData()); + assertTrue(decoded.getOpaqueData().length > 0); + } + + @Test + void passesThroughBinaryProtobufPayload() throws IOException { + TakProtobufCodec codec = new TakProtobufCodec(); + byte[] raw = new byte[]{0x08, 0x7F, 0x10, 0x01}; + Message message = new MessageBuilder().setOpaqueData(raw).build(); + + byte[] encoded = codec.encode(message); + Message decoded = codec.decode(encoded); + + assertArrayEquals(raw, encoded); + assertArrayEquals(raw, decoded.getOpaqueData()); + assertEquals("protobuf", decoded.getMeta().get("tak.format")); + } + + @Test + void buildsCotFromMetaWhenPayloadMissing() throws IOException { + TakProtobufCodec codec = new TakProtobufCodec(); + Map meta = new LinkedHashMap<>(); + meta.put("tak.uid", "u-200"); + meta.put("tak.type", "a-f-G-U-C"); + meta.put("tak.time", "2026-02-19T09:10:00Z"); + meta.put("tak.start", "2026-02-19T09:10:00Z"); + meta.put("tak.stale", "2026-02-19T09:15:00Z"); + meta.put("tak.how", "m-g"); + meta.put("tak.lat", "1"); + meta.put("tak.lon", "2"); + Message message = new MessageBuilder().setMeta(meta).build(); + + byte[] encoded = codec.encode(message); + Message decoded = codec.decode(encoded); + + assertEquals("u-200", decoded.getMeta().get("tak.uid")); + assertEquals("a-f-G-U-C", decoded.getMeta().get("tak.type")); + } + + @Test + void decodesUsingMapsProtobufFormatterWhenConfigured() throws IOException { + ProtoBufSchemaConfig schema = (ProtoBufSchemaConfig) TestProtobufSchemas.protobufSchemaConfig(UUID.randomUUID()); + String descriptorBase64 = Base64.getEncoder().encodeToString(schema.getProtobufConfig().getDescriptorValue()); + String messageName = schema.getProtobufConfig().getMessageName(); + TakProtobufCodec codec = TakProtobufCodec.withSchemaFormatter(new CotXmlCodec(), 1024 * 1024, descriptorBase64, messageName); + + byte[] payload = TestProtobufSchemas.encodeProtobufPayload(); + Message decoded = codec.decode(payload); + + assertEquals("true", decoded.getMeta().get("tak.protobuf_parsed")); + } +} diff --git a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramerTest.java b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramerTest.java new file mode 100644 index 000000000..17bd80ec1 --- /dev/null +++ b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramerTest.java @@ -0,0 +1,80 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.framing; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class TakStreamFramerTest { + + @Test + void protoFrameRoundTrip() throws IOException { + TakStreamFramer framer = new TakStreamFramer(TakStreamFramer.Mode.PROTO_STREAM, 1024 * 1024); + byte[] payload = "hello".getBytes(StandardCharsets.UTF_8); + + byte[] framed = framer.frame(payload); + List decoded = framer.onBytes(framed, framed.length); + + assertEquals(1, decoded.size()); + assertArrayEquals(payload, decoded.getFirst()); + } + + @Test + void protoPartialReadReassemblesFrames() throws IOException { + TakStreamFramer framer = new TakStreamFramer(TakStreamFramer.Mode.PROTO_STREAM, 1024 * 1024); + byte[] payload = "abc123".getBytes(StandardCharsets.UTF_8); + byte[] framed = framer.frame(payload); + + List first = framer.onBytes(framed, 2); + assertTrue(first.isEmpty()); + List second = framer.onBytes(slice(framed, 2), framed.length - 2); + + assertEquals(1, second.size()); + assertArrayEquals(payload, second.getFirst()); + } + + @Test + void protoRejectsOversizedFrame() { + TakStreamFramer framer = new TakStreamFramer(TakStreamFramer.Mode.PROTO_STREAM, 8); + byte[] fake = new byte[]{(byte) 0xBF, 0x09, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}; + assertThrows(IOException.class, () -> framer.onBytes(fake, fake.length)); + } + + @Test + void protoRejectsInvalidMagic() { + TakStreamFramer framer = new TakStreamFramer(TakStreamFramer.Mode.PROTO_STREAM, 1024); + byte[] invalid = new byte[]{0x01, 0x01, 0x7F}; + assertThrows(IOException.class, () -> framer.onBytes(invalid, invalid.length)); + } + + private static byte[] slice(byte[] source, int from) { + byte[] out = new byte[source.length - from]; + System.arraycopy(source, from, out, 0, out.length); + return out; + } +} diff --git a/src/test/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramerXmlTest.java b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramerXmlTest.java new file mode 100644 index 000000000..f221e518c --- /dev/null +++ b/src/test/java/io/mapsmessaging/network/protocol/impl/tak/framing/TakStreamFramerXmlTest.java @@ -0,0 +1,58 @@ +/* + * + * Copyright [ 2020 - 2024 ] Matthew Buckton + * Copyright [ 2024 - 2026 ] MapsMessaging B.V. + * + * Licensed under the Apache License, Version 2.0 with the Commons Clause + * (the "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * https://commonsclause.com/ + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.mapsmessaging.network.protocol.impl.tak.framing; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class TakStreamFramerXmlTest { + + @Test + void xmlDelimitsMultipleEvents() throws IOException { + TakStreamFramer framer = new TakStreamFramer(TakStreamFramer.Mode.XML_STREAM, 1024 * 1024); + String payload = "\n\n"; + + List frames = framer.onBytes(payload.getBytes(StandardCharsets.UTF_8), payload.length()); + + assertEquals(2, frames.size()); + assertTrue(new String(frames.get(0), StandardCharsets.UTF_8).contains("uid=\"1\"")); + assertTrue(new String(frames.get(1), StandardCharsets.UTF_8).contains("uid=\"2\"")); + } + + @Test + void xmlPartialEventBuffersUntilComplete() throws IOException { + TakStreamFramer framer = new TakStreamFramer(TakStreamFramer.Mode.XML_STREAM, 1024 * 1024); + byte[] bytes = "".getBytes(StandardCharsets.UTF_8); + + List first = framer.onBytes(bytes, 10); + assertTrue(first.isEmpty()); + + byte[] rest = new byte[bytes.length - 10]; + System.arraycopy(bytes, 10, rest, 0, rest.length); + List second = framer.onBytes(rest, rest.length); + assertEquals(1, second.size()); + } +}