Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 24 additions & 12 deletions cassandra/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -1085,20 +1085,10 @@ def encode_message(cls, msg, stream_id, protocol_version, compressor, allow_beta
:param compressor: optional compression function to be used on the body
"""
flags = 0
body = io.BytesIO()
if msg.custom_payload:
if protocol_version < 4:
raise UnsupportedOperation("Custom key/value payloads can only be used with protocol version 4 or higher")
flags |= CUSTOM_PAYLOAD_FLAG
write_bytesmap(body, msg.custom_payload)
msg.send_body(body, protocol_version)
body = body.getvalue()

# With checksumming, the compression is done at the segment frame encoding
if (not ProtocolVersion.has_checksumming_support(protocol_version)
and compressor and len(body) > 0):
body = compressor(body)
flags |= COMPRESSED_FLAG

if msg.tracing:
flags |= TRACING_FLAG
Expand All @@ -1107,9 +1097,31 @@ def encode_message(cls, msg, stream_id, protocol_version, compressor, allow_beta
flags |= USE_BETA_FLAG

buff = io.BytesIO()
cls._write_header(buff, protocol_version, flags, stream_id, msg.opcode, len(body))
buff.write(body)
buff.seek(9)

# With checksumming, the compression is done at the segment frame encoding
if (compressor and not ProtocolVersion.has_checksumming_support(protocol_version)):
body = io.BytesIO()
if msg.custom_payload:
write_bytesmap(body, msg.custom_payload)
msg.send_body(body, protocol_version)
body = body.getvalue()

if len(body) > 0:
body = compressor(body)
flags |= COMPRESSED_FLAG

buff.write(body)
length = len(body)
else:
if msg.custom_payload:
write_bytesmap(buff, msg.custom_payload)
msg.send_body(buff, protocol_version)

length = buff.tell() - 9

buff.seek(0)
cls._write_header(buff, protocol_version, flags, stream_id, msg.opcode, length)
return buff.getvalue()

@staticmethod
Expand Down
Loading