From cd72bb81d6a8ad47e547a940db78a105e4686e56 Mon Sep 17 00:00:00 2001 From: Yaniv Michael Kaul Date: Sat, 10 Jan 2026 00:05:31 +0200 Subject: [PATCH] Optimize write path in protocol.py to reduce copies Refactored `_ProtocolHandler.encode_message` to reduce memory copies and allocations. - Implemented 'Reserve and Seek' strategy for the write path. - In uncompressed scenarios (including Protocol V5+), we now write directly to the final buffer instead of an intermediate one, avoiding `bytes` creation and buffer copying. - Reserved space for the frame header, wrote the body, and then back-filled the header with the correct length. - Unified buffer initialization and header writing logic for cleaner code. - Optimized conditional checks for compression support. Signed-off-by: Yaniv Kaul --- cassandra/protocol.py | 36 ++++++++++++++++++++++++------------ 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/cassandra/protocol.py b/cassandra/protocol.py index e574965de8..f37633a756 100644 --- a/cassandra/protocol.py +++ b/cassandra/protocol.py @@ -1085,20 +1085,10 @@ def encode_message(cls, msg, stream_id, protocol_version, compressor, allow_beta :param compressor: optional compression function to be used on the body """ flags = 0 - body = io.BytesIO() if msg.custom_payload: if protocol_version < 4: raise UnsupportedOperation("Custom key/value payloads can only be used with protocol version 4 or higher") flags |= CUSTOM_PAYLOAD_FLAG - write_bytesmap(body, msg.custom_payload) - msg.send_body(body, protocol_version) - body = body.getvalue() - - # With checksumming, the compression is done at the segment frame encoding - if (not ProtocolVersion.has_checksumming_support(protocol_version) - and compressor and len(body) > 0): - body = compressor(body) - flags |= COMPRESSED_FLAG if msg.tracing: flags |= TRACING_FLAG @@ -1107,9 +1097,31 @@ def encode_message(cls, msg, stream_id, protocol_version, compressor, allow_beta flags |= USE_BETA_FLAG buff = io.BytesIO() - cls._write_header(buff, protocol_version, flags, stream_id, msg.opcode, len(body)) - buff.write(body) + buff.seek(9) + + # With checksumming, the compression is done at the segment frame encoding + if (compressor and not ProtocolVersion.has_checksumming_support(protocol_version)): + body = io.BytesIO() + if msg.custom_payload: + write_bytesmap(body, msg.custom_payload) + msg.send_body(body, protocol_version) + body = body.getvalue() + + if len(body) > 0: + body = compressor(body) + flags |= COMPRESSED_FLAG + + buff.write(body) + length = len(body) + else: + if msg.custom_payload: + write_bytesmap(buff, msg.custom_payload) + msg.send_body(buff, protocol_version) + + length = buff.tell() - 9 + buff.seek(0) + cls._write_header(buff, protocol_version, flags, stream_id, msg.opcode, length) return buff.getvalue() @staticmethod