diff --git a/tests/test_stream_signing.py b/tests/test_stream_signing.py
new file mode 100644
index 0000000..f89d99b
--- /dev/null
+++ b/tests/test_stream_signing.py
@@ -0,0 +1,323 @@
+"""
+Unit tests for stream signing functionality.
+
+Tests verify that stream signing produces identical results to in-memory signing
+and that progress callbacks work correctly.
+"""
+
+import io
+import pytest
+from turbo_sdk.bundle import create_data, sign, encode_tags
+from turbo_sdk.bundle.sign import (
+    deep_hash,
+    deep_hash_blob_stream,
+    get_signature_data,
+    get_signature_data_stream,
+    sign_stream,
+    DEFAULT_STREAM_CHUNK_SIZE,
+)
+from turbo_sdk.signers import EthereumSigner
+
+
+# Test private key (not a real key, just for testing)
+TEST_PRIVATE_KEY = "0x" + "ab" * 32
+
+
+class TestDeepHashBlobStream:
+    """Tests for deep_hash_blob_stream function."""
+
+    def test_matches_in_memory_deep_hash(self):
+        """Stream hash should match in-memory hash for same data."""
+        test_data = b"Hello, this is test data for streaming hash verification!" * 100
+        stream = io.BytesIO(test_data)
+
+        in_memory_hash = deep_hash(test_data)
+        stream_hash = deep_hash_blob_stream(stream, len(test_data))
+
+        assert in_memory_hash == stream_hash
+
+    def test_empty_data(self):
+        """Should handle empty data correctly."""
+        test_data = b""
+        stream = io.BytesIO(test_data)
+
+        in_memory_hash = deep_hash(test_data)
+        stream_hash = deep_hash_blob_stream(stream, len(test_data))
+
+        assert in_memory_hash == stream_hash
+
+    def test_single_byte(self):
+        """Should handle single byte data."""
+        test_data = b"x"
+        stream = io.BytesIO(test_data)
+
+        in_memory_hash = deep_hash(test_data)
+        stream_hash = deep_hash_blob_stream(stream, len(test_data))
+
+        assert in_memory_hash == stream_hash
+
+    def test_large_data(self):
+        """Should handle data larger than chunk size."""
+        # Create data larger than default chunk size (256 KiB)
+        test_data = b"x" * (DEFAULT_STREAM_CHUNK_SIZE * 3 + 1000)
+        stream = io.BytesIO(test_data)
+
+        in_memory_hash = deep_hash(test_data)
+        stream_hash = deep_hash_blob_stream(stream, len(test_data))
+
+        assert in_memory_hash == stream_hash
+
+    def test_custom_chunk_size(self):
+        """Should work with custom chunk sizes."""
+        test_data = b"Test data for custom chunk size" * 100
+        stream = io.BytesIO(test_data)
+
+        in_memory_hash = deep_hash(test_data)
+        stream_hash = deep_hash_blob_stream(stream, len(test_data), chunk_size=64)
+
+        assert in_memory_hash == stream_hash
+
+    def test_progress_callback_called(self):
+        """Progress callback should be called during hashing."""
+        test_data = b"x" * 5000
+        stream = io.BytesIO(test_data)
+        progress_calls = []
+
+        def on_progress(processed, total):
+            progress_calls.append((processed, total))
+
+        deep_hash_blob_stream(stream, len(test_data), chunk_size=1024, on_progress=on_progress)
+
+        assert len(progress_calls) > 0
+        assert progress_calls[-1] == (len(test_data), len(test_data))
+
+    def test_progress_callback_increments(self):
+        """Progress should increment correctly."""
+        test_data = b"x" * 3000
+        stream = io.BytesIO(test_data)
+        progress_calls = []
+
+        def on_progress(processed, total):
+            progress_calls.append((processed, total))
+
+        deep_hash_blob_stream(stream, len(test_data), chunk_size=1000, on_progress=on_progress)
+
+        # Should have 3 calls for 3000 bytes with 1000 byte chunks
+        assert len(progress_calls) == 3
+        assert progress_calls[0] == (1000, 3000)
+        assert progress_calls[1] == (2000, 3000)
+        assert progress_calls[2] == (3000, 3000)
+
+    def test_raises_on_premature_stream_end(self):
+        """Should raise error if stream ends before expected size."""
+        test_data = b"short"
+        stream = io.BytesIO(test_data)
+
+        with pytest.raises(ValueError, match="Stream ended prematurely"):
+            deep_hash_blob_stream(stream, len(test_data) + 100)
+
+
+class TestGetSignatureDataStream:
+    """Tests for get_signature_data_stream function."""
+
+    def test_matches_in_memory_signature_data(self):
+        """Stream signature data should match in-memory version."""
+        sig_type = 3  # Ethereum
+        raw_owner = b"x" * 65
+        raw_target = b""
+        raw_anchor = b"a" * 32
+        raw_tags = b""
+        data = b"Test payload data" * 500
+
+        # Create mock dataitem for in-memory comparison
+        class MockDataItem:
+            signature_type = sig_type
+            raw_owner = b"x" * 65
+            raw_target = b""
+            raw_anchor = b"a" * 32
+            raw_tags = b""
+            raw_data = data
+
+        mock_item = MockDataItem()
+        in_memory_hash = get_signature_data(mock_item)
+
+        data_stream = io.BytesIO(data)
+        stream_hash = get_signature_data_stream(
+            signature_type=sig_type,
+            raw_owner=raw_owner,
+            raw_target=raw_target,
+            raw_anchor=raw_anchor,
+            raw_tags=raw_tags,
+            data_stream=data_stream,
+            data_size=len(data),
+        )
+
+        assert in_memory_hash == stream_hash
+
+    def test_with_tags(self):
+        """Should work correctly with encoded tags."""
+        sig_type = 3
+        raw_owner = b"x" * 65
+        raw_target = b""
+        raw_anchor = b"a" * 32
+        raw_tags = encode_tags([{"name": "Content-Type", "value": "text/plain"}])
+        data = b"Data with tags"
+
+        class MockDataItem:
+            signature_type = sig_type
+            raw_owner = b"x" * 65
+            raw_target = b""
+            raw_anchor = b"a" * 32
+            raw_tags = encode_tags([{"name": "Content-Type", "value": "text/plain"}])
+            raw_data = data
+
+        mock_item = MockDataItem()
+        in_memory_hash = get_signature_data(mock_item)
+
+        data_stream = io.BytesIO(data)
+        stream_hash = get_signature_data_stream(
+            signature_type=sig_type,
+            raw_owner=raw_owner,
+            raw_target=raw_target,
+            raw_anchor=raw_anchor,
+            raw_tags=raw_tags,
+            data_stream=data_stream,
+            data_size=len(data),
+        )
+
+        assert in_memory_hash == stream_hash
+
+    def test_with_target(self):
+        """Should work correctly with target address."""
+        sig_type = 3
+        raw_owner = b"x" * 65
+        raw_target = b"t" * 32  # 32-byte target
+        raw_anchor = b"a" * 32
+        raw_tags = b""
+        data = b"Data with target"
+
+        class MockDataItem:
+            signature_type = sig_type
+            raw_owner = b"x" * 65
+            raw_target = b"t" * 32
+            raw_anchor = b"a" * 32
+            raw_tags = b""
+            raw_data = data
+
+        mock_item = MockDataItem()
+        in_memory_hash = get_signature_data(mock_item)
+
+        data_stream = io.BytesIO(data)
+        stream_hash = get_signature_data_stream(
+            signature_type=sig_type,
+            raw_owner=raw_owner,
+            raw_target=raw_target,
+            raw_anchor=raw_anchor,
+            raw_tags=raw_tags,
+            data_stream=data_stream,
+            data_size=len(data),
+        )
+
+        assert in_memory_hash == stream_hash
+
+
+class TestSignStream:
+    """Tests for sign_stream function."""
+
+    @pytest.fixture
+    def signer(self):
+        """Create a real Ethereum signer for testing."""
+        return EthereumSigner(TEST_PRIVATE_KEY)
+
+    def test_matches_in_memory_sign(self, signer):
+        """Stream signing should produce identical signature to in-memory signing."""
+        test_data = b"Hello Arweave! " * 1000
+        tags = [{"name": "Content-Type", "value": "text/plain"}]
+
+        # In-memory signing via DataItem
+        data_item = create_data(bytearray(test_data), signer, tags)
+        sign(data_item, signer)
+        in_memory_signature = data_item.raw_signature
+
+        # Stream signing
+        encoded_tags = encode_tags(tags)
+        stream = io.BytesIO(test_data)
+
+        stream_signature = sign_stream(
+            signature_type=signer.signature_type,
+            raw_owner=signer.public_key,
+            raw_target=b"",
+            raw_anchor=data_item.raw_anchor,  # Use same anchor
+            raw_tags=encoded_tags,
+            data_stream=stream,
+            data_size=len(test_data),
+            signer=signer,
+        )
+
+        assert in_memory_signature == stream_signature
+
+    def test_with_progress_callback(self, signer):
+        """Progress callback should work during stream signing."""
+        test_data = b"x" * 10000
+        progress_calls = []
+
+        def on_progress(processed, total):
+            progress_calls.append((processed, total))
+
+        stream = io.BytesIO(test_data)
+
+        sign_stream(
+            signature_type=signer.signature_type,
+            raw_owner=signer.public_key,
+            raw_target=b"",
+            raw_anchor=b"a" * 32,
+            raw_tags=b"",
+            data_stream=stream,
+            data_size=len(test_data),
+            signer=signer,
+            chunk_size=1000,
+            on_progress=on_progress,
+        )
+
+        assert len(progress_calls) == 10
+        assert progress_calls[-1] == (10000, 10000)
+
+    def test_different_data_produces_different_signature(self, signer):
+        """Different data should produce different signatures."""
+        data1 = b"First data payload"
+        data2 = b"Second data payload"
+        anchor = b"a" * 32
+
+        stream1 = io.BytesIO(data1)
+        sig1 = sign_stream(
+            signature_type=signer.signature_type,
+            raw_owner=signer.public_key,
+            raw_target=b"",
+            raw_anchor=anchor,
+            raw_tags=b"",
+            data_stream=stream1,
+            data_size=len(data1),
+            signer=signer,
+        )
+
+        stream2 = io.BytesIO(data2)
+        sig2 = sign_stream(
+            signature_type=signer.signature_type,
+            raw_owner=signer.public_key,
+            raw_target=b"",
+            raw_anchor=anchor,
+            raw_tags=b"",
+            data_stream=stream2,
+            data_size=len(data2),
+            signer=signer,
+        )
+
+        assert sig1 != sig2
+
+
+class TestDefaultChunkSize:
+    """Tests for default chunk size constant."""
+
+    def test_default_chunk_size_is_256_kib(self):
+        """Default chunk size should be 256 KiB to match Arweave."""
+        assert DEFAULT_STREAM_CHUNK_SIZE == 256 * 1024
diff --git a/turbo_sdk/bundle/__init__.py b/turbo_sdk/bundle/__init__.py
index d4e22fb..2e87e29 100644
--- a/turbo_sdk/bundle/__init__.py
+++ b/turbo_sdk/bundle/__init__.py
@@ -1,7 +1,15 @@
 from .constants import SIG_CONFIG, MAX_TAG_BYTES, MIN_BINARY_SIZE
 from .dataitem import DataItem
 from .create import create_data
-from .sign import sign, deep_hash, get_signature_data
+from .sign import (
+    sign,
+    deep_hash,
+    get_signature_data,
+    sign_stream,
+    get_signature_data_stream,
+    deep_hash_blob_stream,
+    DEFAULT_STREAM_CHUNK_SIZE,
+)
 from .tags import encode_tags, decode_tags
 from .utils import set_bytes, byte_array_to_long
 
@@ -14,6 +22,10 @@
     "sign",
     "deep_hash",
     "get_signature_data",
+    "sign_stream",
+    "get_signature_data_stream",
+    "deep_hash_blob_stream",
+    "DEFAULT_STREAM_CHUNK_SIZE",
     "encode_tags",
     "decode_tags",
     "set_bytes",
diff --git a/turbo_sdk/bundle/sign.py b/turbo_sdk/bundle/sign.py
index 4a18de6..3b770c3 100644
--- a/turbo_sdk/bundle/sign.py
+++ b/turbo_sdk/bundle/sign.py
@@ -1,4 +1,5 @@
 import hashlib
+from typing import BinaryIO, Callable, Optional
 
 
 def deep_hash(data) -> bytearray:
@@ -54,3 +55,181 @@ def sign(dataitem, signer):
     signature = signer.sign(signature_data)
     dataitem.set_signature(signature)
     return hashlib.sha256(signature).digest()
+
+
+# Default chunk size for streaming: 256 KiB (matches Arweave chunk size)
+DEFAULT_STREAM_CHUNK_SIZE = 256 * 1024
+
+
+def deep_hash_blob_stream(
+    stream: BinaryIO,
+    data_size: int,
+    chunk_size: int = DEFAULT_STREAM_CHUNK_SIZE,
+    on_progress: Optional[Callable[[int, int], None]] = None,
+) -> bytes:
+    """
+    Compute deep hash of a blob by streaming data without loading it all into memory.
+
+    Args:
+        stream: A file-like object supporting read()
+        data_size: Total size of the data in bytes (must be known upfront)
+        chunk_size: Size of chunks to read at a time (default 256 KiB)
+        on_progress: Optional callback(processed_bytes, total_bytes) called after each chunk
+
+    Returns:
+        The deep hash as bytes
+    """
+    # Compute tag hash - requires knowing size upfront
+    tag = b"blob" + str(data_size).encode()
+    tag_hash = hashlib.sha384(tag).digest()
+
+    # Stream data through SHA-384
+    data_hasher = hashlib.sha384()
+    bytes_processed = 0
+
+    while bytes_processed < data_size:
+        remaining = data_size - bytes_processed
+        to_read = min(chunk_size, remaining)
+        chunk = stream.read(to_read)
+
+        if not chunk:
+            raise ValueError(
+                f"Stream ended prematurely: expected {data_size} bytes, got {bytes_processed}"
+            )
+
+        data_hasher.update(chunk)
+        bytes_processed += len(chunk)
+
+        if on_progress:
+            on_progress(bytes_processed, data_size)
+
+    # Combine tag hash and data hash, then hash again
+    tagged_hash = tag_hash + data_hasher.digest()
+    return hashlib.sha384(tagged_hash).digest()
+
+
+def deep_hash_chunks_with_final_stream(
+    chunks: list,
+    acc: bytes,
+    final_stream: BinaryIO,
+    final_size: int,
+    chunk_size: int = DEFAULT_STREAM_CHUNK_SIZE,
+    on_progress: Optional[Callable[[int, int], None]] = None,
+) -> bytes:
+    """
+    Process deep hash chunks where the final element is a stream.
+
+    This processes all chunks except the last one normally, then streams the final chunk.
+    """
+    if len(chunks) == 0:
+        # No more regular chunks, process the stream as final element
+        final_hash = deep_hash_blob_stream(final_stream, final_size, chunk_size, on_progress)
+        hash_pair = acc + final_hash
+        return hashlib.sha384(hash_pair).digest()
+
+    # Process current chunk normally
+    hash_pair = acc + deep_hash(chunks[0])
+    new_acc = hashlib.sha384(hash_pair).digest()
+    return deep_hash_chunks_with_final_stream(
+        chunks[1:], new_acc, final_stream, final_size, chunk_size, on_progress
+    )
+
+
+def get_signature_data_stream(
+    signature_type: int,
+    raw_owner: bytes,
+    raw_target: bytes,
+    raw_anchor: bytes,
+    raw_tags: bytes,
+    data_stream: BinaryIO,
+    data_size: int,
+    chunk_size: int = DEFAULT_STREAM_CHUNK_SIZE,
+    on_progress: Optional[Callable[[int, int], None]] = None,
+) -> bytes:
+    """
+    Compute signature data hash with streaming support for the data portion.
+
+    This is equivalent to get_signature_data() but streams the data instead of
+    loading it all into memory.
+
+    Args:
+        signature_type: The signature type (1 for Arweave, 3 for Ethereum)
+        raw_owner: The raw owner/public key bytes
+        raw_target: The raw target bytes (empty if none)
+        raw_anchor: The raw anchor bytes (empty if none)
+        raw_tags: The raw encoded tags bytes
+        data_stream: A file-like object for reading the data
+        data_size: Total size of the data in bytes
+        chunk_size: Size of chunks to read (default 256 KiB)
+        on_progress: Optional callback(processed_bytes, total_bytes)
+
+    Returns:
+        The signature data hash as bytes
+    """
+    # All elements except raw_data - these are processed normally
+    prefix_elements = [
+        "dataitem",
+        "1",
+        str(signature_type),
+        raw_owner,
+        raw_target,
+        raw_anchor,
+        raw_tags,
+    ]
+
+    # Start the list deep hash
+    tag = b"list" + str(len(prefix_elements) + 1).encode()  # +1 for the streamed data
+    acc = hashlib.sha384(tag).digest()
+
+    # Process prefix elements, then stream the final data element
+    return deep_hash_chunks_with_final_stream(
+        prefix_elements, acc, data_stream, data_size, chunk_size, on_progress
+    )
+
+
+def sign_stream(
+    signature_type: int,
+    raw_owner: bytes,
+    raw_target: bytes,
+    raw_anchor: bytes,
+    raw_tags: bytes,
+    data_stream: BinaryIO,
+    data_size: int,
+    signer,
+    chunk_size: int = DEFAULT_STREAM_CHUNK_SIZE,
+    on_progress: Optional[Callable[[int, int], None]] = None,
+) -> bytes:
+    """
+    Sign streaming data without loading the entire payload into memory.
+
+    This computes the signature by streaming the data through the deep hash algorithm,
+    then signs the result.
+
+    Args:
+        signature_type: The signature type (1 for Arweave, 3 for Ethereum)
+        raw_owner: The raw owner/public key bytes
+        raw_target: The raw target bytes (empty if none)
+        raw_anchor: The raw anchor bytes (empty if none)
+        raw_tags: The raw encoded tags bytes
+        data_stream: A file-like object for reading the data
+        data_size: Total size of the data in bytes
+        signer: The signer instance with a sign() method
+        chunk_size: Size of chunks to read (default 256 KiB)
+        on_progress: Optional callback(processed_bytes, total_bytes) for signing progress
+
+    Returns:
+        The signature bytes
+    """
+    signature_data = get_signature_data_stream(
+        signature_type=signature_type,
+        raw_owner=raw_owner,
+        raw_target=raw_target,
+        raw_anchor=raw_anchor,
+        raw_tags=raw_tags,
+        data_stream=data_stream,
+        data_size=data_size,
+        chunk_size=chunk_size,
+        on_progress=on_progress,
+    )
+
+    return signer.sign(signature_data)