From 57838d5b97baaf6742dbf897fbbc9981b5a6578f Mon Sep 17 00:00:00 2001 From: dtfiedler Date: Wed, 14 Jan 2026 08:44:13 -0600 Subject: [PATCH] feat(bundle): add stream signing support for large files Add streaming hash and signing functions that process data incrementally without loading the entire payload into memory. This enables signing large files with constant memory usage and progress callbacks. New functions: - deep_hash_blob_stream: Stream-hash a blob - get_signature_data_stream: Compute signature hash with streaming data - sign_stream: Sign streaming data end-to-end Features: - 256 KiB default chunk size (matches Arweave) - Progress callbacks: on_progress(processed_bytes, total_bytes) - Produces identical signatures to in-memory sign() PE-8860 Co-Authored-By: Claude Opus 4.5 --- tests/test_stream_signing.py | 323 +++++++++++++++++++++++++++++++++++ turbo_sdk/bundle/__init__.py | 14 +- turbo_sdk/bundle/sign.py | 179 +++++++++++++++++++ 3 files changed, 515 insertions(+), 1 deletion(-) create mode 100644 tests/test_stream_signing.py diff --git a/tests/test_stream_signing.py b/tests/test_stream_signing.py new file mode 100644 index 0000000..f89d99b --- /dev/null +++ b/tests/test_stream_signing.py @@ -0,0 +1,323 @@ +""" +Unit tests for stream signing functionality. + +Tests verify that stream signing produces identical results to in-memory signing +and that progress callbacks work correctly. +""" + +import io +import pytest +from turbo_sdk.bundle import create_data, sign, encode_tags +from turbo_sdk.bundle.sign import ( + deep_hash, + deep_hash_blob_stream, + get_signature_data, + get_signature_data_stream, + sign_stream, + DEFAULT_STREAM_CHUNK_SIZE, +) +from turbo_sdk.signers import EthereumSigner + + +# Test private key (not a real key, just for testing) +TEST_PRIVATE_KEY = "0x" + "ab" * 32 + + +class TestDeepHashBlobStream: + """Tests for deep_hash_blob_stream function.""" + + def test_matches_in_memory_deep_hash(self): + """Stream hash should match in-memory hash for same data.""" + test_data = b"Hello, this is test data for streaming hash verification!" * 100 + stream = io.BytesIO(test_data) + + in_memory_hash = deep_hash(test_data) + stream_hash = deep_hash_blob_stream(stream, len(test_data)) + + assert in_memory_hash == stream_hash + + def test_empty_data(self): + """Should handle empty data correctly.""" + test_data = b"" + stream = io.BytesIO(test_data) + + in_memory_hash = deep_hash(test_data) + stream_hash = deep_hash_blob_stream(stream, len(test_data)) + + assert in_memory_hash == stream_hash + + def test_single_byte(self): + """Should handle single byte data.""" + test_data = b"x" + stream = io.BytesIO(test_data) + + in_memory_hash = deep_hash(test_data) + stream_hash = deep_hash_blob_stream(stream, len(test_data)) + + assert in_memory_hash == stream_hash + + def test_large_data(self): + """Should handle data larger than chunk size.""" + # Create data larger than default chunk size (256 KiB) + test_data = b"x" * (DEFAULT_STREAM_CHUNK_SIZE * 3 + 1000) + stream = io.BytesIO(test_data) + + in_memory_hash = deep_hash(test_data) + stream_hash = deep_hash_blob_stream(stream, len(test_data)) + + assert in_memory_hash == stream_hash + + def test_custom_chunk_size(self): + """Should work with custom chunk sizes.""" + test_data = b"Test data for custom chunk size" * 100 + stream = io.BytesIO(test_data) + + in_memory_hash = deep_hash(test_data) + stream_hash = deep_hash_blob_stream(stream, len(test_data), chunk_size=64) + + assert in_memory_hash == stream_hash + + def test_progress_callback_called(self): + """Progress callback should be called during hashing.""" + test_data = b"x" * 5000 + stream = io.BytesIO(test_data) + progress_calls = [] + + def on_progress(processed, total): + progress_calls.append((processed, total)) + + deep_hash_blob_stream(stream, len(test_data), chunk_size=1024, on_progress=on_progress) + + assert len(progress_calls) > 0 + assert progress_calls[-1] == (len(test_data), len(test_data)) + + def test_progress_callback_increments(self): + """Progress should increment correctly.""" + test_data = b"x" * 3000 + stream = io.BytesIO(test_data) + progress_calls = [] + + def on_progress(processed, total): + progress_calls.append((processed, total)) + + deep_hash_blob_stream(stream, len(test_data), chunk_size=1000, on_progress=on_progress) + + # Should have 3 calls for 3000 bytes with 1000 byte chunks + assert len(progress_calls) == 3 + assert progress_calls[0] == (1000, 3000) + assert progress_calls[1] == (2000, 3000) + assert progress_calls[2] == (3000, 3000) + + def test_raises_on_premature_stream_end(self): + """Should raise error if stream ends before expected size.""" + test_data = b"short" + stream = io.BytesIO(test_data) + + with pytest.raises(ValueError, match="Stream ended prematurely"): + deep_hash_blob_stream(stream, len(test_data) + 100) + + +class TestGetSignatureDataStream: + """Tests for get_signature_data_stream function.""" + + def test_matches_in_memory_signature_data(self): + """Stream signature data should match in-memory version.""" + sig_type = 3 # Ethereum + raw_owner = b"x" * 65 + raw_target = b"" + raw_anchor = b"a" * 32 + raw_tags = b"" + data = b"Test payload data" * 500 + + # Create mock dataitem for in-memory comparison + class MockDataItem: + signature_type = sig_type + raw_owner = b"x" * 65 + raw_target = b"" + raw_anchor = b"a" * 32 + raw_tags = b"" + raw_data = data + + mock_item = MockDataItem() + in_memory_hash = get_signature_data(mock_item) + + data_stream = io.BytesIO(data) + stream_hash = get_signature_data_stream( + signature_type=sig_type, + raw_owner=raw_owner, + raw_target=raw_target, + raw_anchor=raw_anchor, + raw_tags=raw_tags, + data_stream=data_stream, + data_size=len(data), + ) + + assert in_memory_hash == stream_hash + + def test_with_tags(self): + """Should work correctly with encoded tags.""" + sig_type = 3 + raw_owner = b"x" * 65 + raw_target = b"" + raw_anchor = b"a" * 32 + raw_tags = encode_tags([{"name": "Content-Type", "value": "text/plain"}]) + data = b"Data with tags" + + class MockDataItem: + signature_type = sig_type + raw_owner = b"x" * 65 + raw_target = b"" + raw_anchor = b"a" * 32 + raw_tags = encode_tags([{"name": "Content-Type", "value": "text/plain"}]) + raw_data = data + + mock_item = MockDataItem() + in_memory_hash = get_signature_data(mock_item) + + data_stream = io.BytesIO(data) + stream_hash = get_signature_data_stream( + signature_type=sig_type, + raw_owner=raw_owner, + raw_target=raw_target, + raw_anchor=raw_anchor, + raw_tags=raw_tags, + data_stream=data_stream, + data_size=len(data), + ) + + assert in_memory_hash == stream_hash + + def test_with_target(self): + """Should work correctly with target address.""" + sig_type = 3 + raw_owner = b"x" * 65 + raw_target = b"t" * 32 # 32-byte target + raw_anchor = b"a" * 32 + raw_tags = b"" + data = b"Data with target" + + class MockDataItem: + signature_type = sig_type + raw_owner = b"x" * 65 + raw_target = b"t" * 32 + raw_anchor = b"a" * 32 + raw_tags = b"" + raw_data = data + + mock_item = MockDataItem() + in_memory_hash = get_signature_data(mock_item) + + data_stream = io.BytesIO(data) + stream_hash = get_signature_data_stream( + signature_type=sig_type, + raw_owner=raw_owner, + raw_target=raw_target, + raw_anchor=raw_anchor, + raw_tags=raw_tags, + data_stream=data_stream, + data_size=len(data), + ) + + assert in_memory_hash == stream_hash + + +class TestSignStream: + """Tests for sign_stream function.""" + + @pytest.fixture + def signer(self): + """Create a real Ethereum signer for testing.""" + return EthereumSigner(TEST_PRIVATE_KEY) + + def test_matches_in_memory_sign(self, signer): + """Stream signing should produce identical signature to in-memory signing.""" + test_data = b"Hello Arweave! " * 1000 + tags = [{"name": "Content-Type", "value": "text/plain"}] + + # In-memory signing via DataItem + data_item = create_data(bytearray(test_data), signer, tags) + sign(data_item, signer) + in_memory_signature = data_item.raw_signature + + # Stream signing + encoded_tags = encode_tags(tags) + stream = io.BytesIO(test_data) + + stream_signature = sign_stream( + signature_type=signer.signature_type, + raw_owner=signer.public_key, + raw_target=b"", + raw_anchor=data_item.raw_anchor, # Use same anchor + raw_tags=encoded_tags, + data_stream=stream, + data_size=len(test_data), + signer=signer, + ) + + assert in_memory_signature == stream_signature + + def test_with_progress_callback(self, signer): + """Progress callback should work during stream signing.""" + test_data = b"x" * 10000 + progress_calls = [] + + def on_progress(processed, total): + progress_calls.append((processed, total)) + + stream = io.BytesIO(test_data) + + sign_stream( + signature_type=signer.signature_type, + raw_owner=signer.public_key, + raw_target=b"", + raw_anchor=b"a" * 32, + raw_tags=b"", + data_stream=stream, + data_size=len(test_data), + signer=signer, + chunk_size=1000, + on_progress=on_progress, + ) + + assert len(progress_calls) == 10 + assert progress_calls[-1] == (10000, 10000) + + def test_different_data_produces_different_signature(self, signer): + """Different data should produce different signatures.""" + data1 = b"First data payload" + data2 = b"Second data payload" + anchor = b"a" * 32 + + stream1 = io.BytesIO(data1) + sig1 = sign_stream( + signature_type=signer.signature_type, + raw_owner=signer.public_key, + raw_target=b"", + raw_anchor=anchor, + raw_tags=b"", + data_stream=stream1, + data_size=len(data1), + signer=signer, + ) + + stream2 = io.BytesIO(data2) + sig2 = sign_stream( + signature_type=signer.signature_type, + raw_owner=signer.public_key, + raw_target=b"", + raw_anchor=anchor, + raw_tags=b"", + data_stream=stream2, + data_size=len(data2), + signer=signer, + ) + + assert sig1 != sig2 + + +class TestDefaultChunkSize: + """Tests for default chunk size constant.""" + + def test_default_chunk_size_is_256_kib(self): + """Default chunk size should be 256 KiB to match Arweave.""" + assert DEFAULT_STREAM_CHUNK_SIZE == 256 * 1024 diff --git a/turbo_sdk/bundle/__init__.py b/turbo_sdk/bundle/__init__.py index d4e22fb..2e87e29 100644 --- a/turbo_sdk/bundle/__init__.py +++ b/turbo_sdk/bundle/__init__.py @@ -1,7 +1,15 @@ from .constants import SIG_CONFIG, MAX_TAG_BYTES, MIN_BINARY_SIZE from .dataitem import DataItem from .create import create_data -from .sign import sign, deep_hash, get_signature_data +from .sign import ( + sign, + deep_hash, + get_signature_data, + sign_stream, + get_signature_data_stream, + deep_hash_blob_stream, + DEFAULT_STREAM_CHUNK_SIZE, +) from .tags import encode_tags, decode_tags from .utils import set_bytes, byte_array_to_long @@ -14,6 +22,10 @@ "sign", "deep_hash", "get_signature_data", + "sign_stream", + "get_signature_data_stream", + "deep_hash_blob_stream", + "DEFAULT_STREAM_CHUNK_SIZE", "encode_tags", "decode_tags", "set_bytes", diff --git a/turbo_sdk/bundle/sign.py b/turbo_sdk/bundle/sign.py index 4a18de6..3b770c3 100644 --- a/turbo_sdk/bundle/sign.py +++ b/turbo_sdk/bundle/sign.py @@ -1,4 +1,5 @@ import hashlib +from typing import BinaryIO, Callable, Optional def deep_hash(data) -> bytearray: @@ -54,3 +55,181 @@ def sign(dataitem, signer): signature = signer.sign(signature_data) dataitem.set_signature(signature) return hashlib.sha256(signature).digest() + + +# Default chunk size for streaming: 256 KiB (matches Arweave chunk size) +DEFAULT_STREAM_CHUNK_SIZE = 256 * 1024 + + +def deep_hash_blob_stream( + stream: BinaryIO, + data_size: int, + chunk_size: int = DEFAULT_STREAM_CHUNK_SIZE, + on_progress: Optional[Callable[[int, int], None]] = None, +) -> bytes: + """ + Compute deep hash of a blob by streaming data without loading it all into memory. + + Args: + stream: A file-like object supporting read() + data_size: Total size of the data in bytes (must be known upfront) + chunk_size: Size of chunks to read at a time (default 64KB) + on_progress: Optional callback(processed_bytes, total_bytes) called after each chunk + + Returns: + The deep hash as bytes + """ + # Compute tag hash - requires knowing size upfront + tag = b"blob" + str(data_size).encode() + tag_hash = hashlib.sha384(tag).digest() + + # Stream data through SHA-384 + data_hasher = hashlib.sha384() + bytes_processed = 0 + + while bytes_processed < data_size: + remaining = data_size - bytes_processed + to_read = min(chunk_size, remaining) + chunk = stream.read(to_read) + + if not chunk: + raise ValueError( + f"Stream ended prematurely: expected {data_size} bytes, got {bytes_processed}" + ) + + data_hasher.update(chunk) + bytes_processed += len(chunk) + + if on_progress: + on_progress(bytes_processed, data_size) + + # Combine tag hash and data hash, then hash again + tagged_hash = tag_hash + data_hasher.digest() + return hashlib.sha384(tagged_hash).digest() + + +def deep_hash_chunks_with_final_stream( + chunks: list, + acc: bytes, + final_stream: BinaryIO, + final_size: int, + chunk_size: int = DEFAULT_STREAM_CHUNK_SIZE, + on_progress: Optional[Callable[[int, int], None]] = None, +) -> bytes: + """ + Process deep hash chunks where the final element is a stream. + + This processes all chunks except the last one normally, then streams the final chunk. + """ + if len(chunks) == 0: + # No more regular chunks, process the stream as final element + final_hash = deep_hash_blob_stream(final_stream, final_size, chunk_size, on_progress) + hash_pair = acc + final_hash + return hashlib.sha384(hash_pair).digest() + + # Process current chunk normally + hash_pair = acc + deep_hash(chunks[0]) + new_acc = hashlib.sha384(hash_pair).digest() + return deep_hash_chunks_with_final_stream( + chunks[1:], new_acc, final_stream, final_size, chunk_size, on_progress + ) + + +def get_signature_data_stream( + signature_type: int, + raw_owner: bytes, + raw_target: bytes, + raw_anchor: bytes, + raw_tags: bytes, + data_stream: BinaryIO, + data_size: int, + chunk_size: int = DEFAULT_STREAM_CHUNK_SIZE, + on_progress: Optional[Callable[[int, int], None]] = None, +) -> bytes: + """ + Compute signature data hash with streaming support for the data portion. + + This is equivalent to get_signature_data() but streams the data instead of + loading it all into memory. + + Args: + signature_type: The signature type (1 for Arweave, 3 for Ethereum) + raw_owner: The raw owner/public key bytes + raw_target: The raw target bytes (empty if none) + raw_anchor: The raw anchor bytes (empty if none) + raw_tags: The raw encoded tags bytes + data_stream: A file-like object for reading the data + data_size: Total size of the data in bytes + chunk_size: Size of chunks to read (default 64KB) + on_progress: Optional callback(processed_bytes, total_bytes) + + Returns: + The signature data hash as bytes + """ + # All elements except raw_data - these are processed normally + prefix_elements = [ + "dataitem", + "1", + str(signature_type), + raw_owner, + raw_target, + raw_anchor, + raw_tags, + ] + + # Start the list deep hash + tag = b"list" + str(len(prefix_elements) + 1).encode() # +1 for the streamed data + acc = hashlib.sha384(tag).digest() + + # Process prefix elements, then stream the final data element + return deep_hash_chunks_with_final_stream( + prefix_elements, acc, data_stream, data_size, chunk_size, on_progress + ) + + +def sign_stream( + signature_type: int, + raw_owner: bytes, + raw_target: bytes, + raw_anchor: bytes, + raw_tags: bytes, + data_stream: BinaryIO, + data_size: int, + signer, + chunk_size: int = DEFAULT_STREAM_CHUNK_SIZE, + on_progress: Optional[Callable[[int, int], None]] = None, +) -> bytes: + """ + Sign streaming data without loading the entire payload into memory. + + This computes the signature by streaming the data through the deep hash algorithm, + then signs the result. + + Args: + signature_type: The signature type (1 for Arweave, 3 for Ethereum) + raw_owner: The raw owner/public key bytes + raw_target: The raw target bytes (empty if none) + raw_anchor: The raw anchor bytes (empty if none) + raw_tags: The raw encoded tags bytes + data_stream: A file-like object for reading the data + data_size: Total size of the data in bytes + signer: The signer instance with a sign() method + chunk_size: Size of chunks to read (default 64KB) + on_progress: Optional callback(processed_bytes, total_bytes) for signing progress + + Returns: + The signature bytes + """ + signature_data = get_signature_data_stream( + signature_type=signature_type, + raw_owner=raw_owner, + raw_target=raw_target, + raw_anchor=raw_anchor, + raw_tags=raw_tags, + data_stream=data_stream, + data_size=data_size, + chunk_size=chunk_size, + on_progress=on_progress, + ) + + return signer.sign(signature_data)