From 8dd6568d429984775aeacfdf46da73d05386f6c2 Mon Sep 17 00:00:00 2001 From: Tania Mathern Date: Fri, 19 Sep 2025 20:35:59 -0700 Subject: [PATCH 01/11] fix: 1 --- src/c2pa/c2pa.py | 38 +++++++++++++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/src/c2pa/c2pa.py b/src/c2pa/c2pa.py index 7d920e61..64771fa8 100644 --- a/src/c2pa/c2pa.py +++ b/src/c2pa/c2pa.py @@ -336,6 +336,19 @@ def _setup_function(func, argtypes, restype=None): None ) +# Add dummy context functions +_setup_function( + _lib.c2pa_create_dummy_context, + [], + ctypes.POINTER(StreamContext) +) + +_setup_function( + _lib.c2pa_free_dummy_context, + [ctypes.POINTER(StreamContext)], + None +) + # Set up function prototypes not attached to an API object _setup_function(_lib.c2pa_version, [], ctypes.c_void_p) _setup_function(_lib.c2pa_error, [], ctypes.c_void_p) @@ -1123,9 +1136,12 @@ def flush_callback(ctx): self._write_cb = WriteCallback(write_callback) self._flush_cb = FlushCallback(flush_callback) + # Create a dummy context since Python callbacks don't use it + self._dummy_context = _lib.c2pa_create_dummy_context() + # Create the stream self._stream = _lib.c2pa_create_stream( - None, # context + self._dummy_context, # context self._read_cb, self._seek_cb, self._write_cb, @@ -1170,6 +1186,15 @@ def __del__(self): self._stream = None self._closed = True self._initialized = False + + # Clean up dummy context + if hasattr(self, '_dummy_context') and self._dummy_context: + try: + _lib.c2pa_free_dummy_context(self._dummy_context) + except Exception: + pass + finally: + self._dummy_context = None except Exception: # Destructors must not raise exceptions pass @@ -1200,6 +1225,17 @@ def close(self): finally: self._stream = None + # Clean up dummy context + if hasattr(self, '_dummy_context') and self._dummy_context: + try: + _lib.c2pa_free_dummy_context(self._dummy_context) + except Exception as e: + logger.error( + Stream._ERROR_MESSAGES['callback_error'].format( + 'dummy_context', str(e))) + finally: + self._dummy_context = None + # Clean up callbacks for attr in ['_read_cb', '_seek_cb', '_write_cb', '_flush_cb']: if hasattr(self, attr): From 4979476d43d32ecaa5ecd87c17e99a5964af74f0 Mon Sep 17 00:00:00 2001 From: Tania Mathern Date: Fri, 19 Sep 2025 20:59:19 -0700 Subject: [PATCH 02/11] fix: Update c2pa.py code --- src/c2pa/c2pa.py | 37 +++---------------------------------- 1 file changed, 3 insertions(+), 34 deletions(-) diff --git a/src/c2pa/c2pa.py b/src/c2pa/c2pa.py index 64771fa8..3882a103 100644 --- a/src/c2pa/c2pa.py +++ b/src/c2pa/c2pa.py @@ -336,19 +336,6 @@ def _setup_function(func, argtypes, restype=None): None ) -# Add dummy context functions -_setup_function( - _lib.c2pa_create_dummy_context, - [], - ctypes.POINTER(StreamContext) -) - -_setup_function( - _lib.c2pa_free_dummy_context, - [ctypes.POINTER(StreamContext)], - None -) - # Set up function prototypes not attached to an API object _setup_function(_lib.c2pa_version, [], ctypes.c_void_p) _setup_function(_lib.c2pa_error, [], ctypes.c_void_p) @@ -1137,11 +1124,12 @@ def flush_callback(ctx): self._flush_cb = FlushCallback(flush_callback) # Create a dummy context since Python callbacks don't use it - self._dummy_context = _lib.c2pa_create_dummy_context() + # We create a small buffer and cast it to a StreamContext pointer + self._dummy_context = ctypes.create_string_buffer(1) # Create a 1-byte buffer # Create the stream self._stream = _lib.c2pa_create_stream( - self._dummy_context, # context + ctypes.cast(self._dummy_context, ctypes.POINTER(StreamContext)), # context self._read_cb, self._seek_cb, self._write_cb, @@ -1186,15 +1174,6 @@ def __del__(self): self._stream = None self._closed = True self._initialized = False - - # Clean up dummy context - if hasattr(self, '_dummy_context') and self._dummy_context: - try: - _lib.c2pa_free_dummy_context(self._dummy_context) - except Exception: - pass - finally: - self._dummy_context = None except Exception: # Destructors must not raise exceptions pass @@ -1225,16 +1204,6 @@ def close(self): finally: self._stream = None - # Clean up dummy context - if hasattr(self, '_dummy_context') and self._dummy_context: - try: - _lib.c2pa_free_dummy_context(self._dummy_context) - except Exception as e: - logger.error( - Stream._ERROR_MESSAGES['callback_error'].format( - 'dummy_context', str(e))) - finally: - self._dummy_context = None # Clean up callbacks for attr in ['_read_cb', '_seek_cb', '_write_cb', '_flush_cb']: From 60c0bac166e8abcabab0fdea14b446f538763e07 Mon Sep 17 00:00:00 2001 From: Tania Mathern Date: Sat, 20 Sep 2025 20:44:55 -0700 Subject: [PATCH 03/11] fix: Renamings --- src/c2pa/c2pa.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/c2pa/c2pa.py b/src/c2pa/c2pa.py index 3882a103..b63afaa0 100644 --- a/src/c2pa/c2pa.py +++ b/src/c2pa/c2pa.py @@ -1123,13 +1123,14 @@ def flush_callback(ctx): self._write_cb = WriteCallback(write_callback) self._flush_cb = FlushCallback(flush_callback) - # Create a dummy context since Python callbacks don't use it - # We create a small buffer and cast it to a StreamContext pointer - self._dummy_context = ctypes.create_string_buffer(1) # Create a 1-byte buffer - + # Create a (placeholder) context we don't actually use the context, + # but we need having a valid context pointer. Therefore, we create + # a small buffer and cast it to a StreamContext pointer + self._placeholder_context = ctypes.create_string_buffer(1) + # Create the stream self._stream = _lib.c2pa_create_stream( - ctypes.cast(self._dummy_context, ctypes.POINTER(StreamContext)), # context + ctypes.cast(self._placeholder_context, ctypes.POINTER(StreamContext)), # context self._read_cb, self._seek_cb, self._write_cb, From 31f94b574e4acae0ecc8884b47fa3c6380cafa26 Mon Sep 17 00:00:00 2001 From: Tania Mathern Date: Sat, 20 Sep 2025 20:48:41 -0700 Subject: [PATCH 04/11] fix: Format --- src/c2pa/c2pa.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/c2pa/c2pa.py b/src/c2pa/c2pa.py index b63afaa0..c8524bc8 100644 --- a/src/c2pa/c2pa.py +++ b/src/c2pa/c2pa.py @@ -1123,14 +1123,14 @@ def flush_callback(ctx): self._write_cb = WriteCallback(write_callback) self._flush_cb = FlushCallback(flush_callback) - # Create a (placeholder) context we don't actually use the context, + # Create a placeholder context as we don't actually use the context, # but we need having a valid context pointer. Therefore, we create # a small buffer and cast it to a StreamContext pointer - self._placeholder_context = ctypes.create_string_buffer(1) + self._context = ctypes.create_string_buffer(1) # Create the stream self._stream = _lib.c2pa_create_stream( - ctypes.cast(self._placeholder_context, ctypes.POINTER(StreamContext)), # context + ctypes.cast(self._context, ctypes.POINTER(StreamContext)), self._read_cb, self._seek_cb, self._write_cb, @@ -1205,7 +1205,6 @@ def close(self): finally: self._stream = None - # Clean up callbacks for attr in ['_read_cb', '_seek_cb', '_write_cb', '_flush_cb']: if hasattr(self, attr): From 1d4ee359d4b56f31abd24da4dbc85fcc88e23f5f Mon Sep 17 00:00:00 2001 From: Tania Mathern Date: Sat, 20 Sep 2025 21:11:00 -0700 Subject: [PATCH 05/11] fix: UTF8 encoding --- src/c2pa/c2pa.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/c2pa/c2pa.py b/src/c2pa/c2pa.py index c8524bc8..f4d723fd 100644 --- a/src/c2pa/c2pa.py +++ b/src/c2pa/c2pa.py @@ -567,8 +567,8 @@ def _convert_to_py_string(value) -> str: # Only if we got a valid pointer with valid content if ptr and ptr.value is not None: try: - py_string = ptr.value.decode('utf-8', errors='replace') - except Exception: + py_string = ptr.value.decode('utf-8', errors='strict') + except UnicodeDecodeError: py_string = "" finally: # Only free if we have a valid pointer From 36ac9a84655f841cd164b3512700ea3d15b180ac Mon Sep 17 00:00:00 2001 From: Tania Mathern Date: Sat, 20 Sep 2025 21:18:31 -0700 Subject: [PATCH 06/11] fix: Stream Id --- src/c2pa/c2pa.py | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/src/c2pa/c2pa.py b/src/c2pa/c2pa.py index f4d723fd..79a2cf5b 100644 --- a/src/c2pa/c2pa.py +++ b/src/c2pa/c2pa.py @@ -23,6 +23,7 @@ import io from .lib import dynamically_load_library import mimetypes +from itertools import count # Create a module-specific logger logger = logging.getLogger("c2pa") @@ -926,16 +927,11 @@ def sign_file( class Stream: - # Class-level counter for generating unique stream IDs + # Class-level atomic counter for generating unique stream IDs # (useful for tracing streams usage in debug) - _next_stream_id = 0 + _stream_id_counter = count(start=0, step=1) + # Maximum value for a 32-bit signed integer (2^31 - 1) - # This prevents integer overflow which could cause: - # 1. Unexpected behavior in stream ID generation - # 2. Potential security issues if IDs wrap around - # 3. Memory issues if the number grows too large - # When this limit is reached, we reset to 0 since the timestamp component - # of the stream ID ensures uniqueness even after counter reset _MAX_STREAM_ID = 2**31 - 1 # Class-level error messages to avoid multiple creation @@ -972,11 +968,17 @@ def __init__(self, file_like_stream): self._initialized = False self._stream = None - # Generate unique stream ID using object ID and counter - if Stream._next_stream_id >= Stream._MAX_STREAM_ID: # pragma: no cover - Stream._next_stream_id = 0 - self._stream_id = f"{id(self)}-{Stream._next_stream_id}" - Stream._next_stream_id += 1 + # Generate unique stream ID using object ID and atomic counter + # Get next atomic counter value + stream_counter = next(Stream._stream_id_counter) + + # Handle counter overflow by resetting the counter + if stream_counter >= Stream._MAX_STREAM_ID: # pragma: no cover + # Reset the counter to 0 and get the next value + Stream._stream_id_counter = count(start=0, step=1) + stream_counter = next(Stream._stream_id_counter) + + self._stream_id = f"{id(self)}-{stream_counter}" # Rest of the existing initialization code... required_methods = ['read', 'write', 'seek', 'tell', 'flush'] From 6d096d1ce145a68ba4856ea62caf50e404ac710e Mon Sep 17 00:00:00 2001 From: Tania Mathern Date: Sat, 20 Sep 2025 21:24:45 -0700 Subject: [PATCH 07/11] fix: Clear key memory --- src/c2pa/c2pa.py | 57 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 40 insertions(+), 17 deletions(-) diff --git a/src/c2pa/c2pa.py b/src/c2pa/c2pa.py index 79a2cf5b..4674e2c2 100644 --- a/src/c2pa/c2pa.py +++ b/src/c2pa/c2pa.py @@ -2720,28 +2720,51 @@ def ed25519_sign(data: bytes, private_key: str) -> bytes: C2paError: If there was an error signing the data C2paError.Encoding: If the private key contains invalid UTF-8 chars """ - data_array = (ctypes.c_ubyte * len(data))(*data) - try: - key_str = private_key.encode('utf-8') - except UnicodeError as e: - raise C2paError.Encoding( - f"Invalid UTF-8 characters in private key: {str(e)}") + # Validate input data + if not data: + raise C2paError("Data to sign cannot be empty") - signature_ptr = _lib.c2pa_ed25519_sign(data_array, len(data), key_str) + # Validate private key format + if not private_key or not isinstance(private_key, str): + raise C2paError("Private key must be a non-empty string") - if not signature_ptr: - error = _parse_operation_result_for_error(_lib.c2pa_error()) - if error: - raise C2paError(error) - raise C2paError("Failed to sign data with Ed25519") + # Create secure memory buffer for data + data_array = None + key_bytes = None try: - # Ed25519 signatures are always 64 bytes - signature = bytes(signature_ptr[:64]) - finally: - _lib.c2pa_signature_free(signature_ptr) + # Create data array with size validation + data_size = len(data) + data_array = (ctypes.c_ubyte * data_size)(*data) + + # Encode private key to bytes + try: + key_bytes = private_key.encode('utf-8') + except UnicodeError as e: + raise C2paError.Encoding( + f"Invalid UTF-8 characters in private key: {str(e)}") + + # Perform the signing operation + signature_ptr = _lib.c2pa_ed25519_sign(data_array, data_size, key_bytes) - return signature + if not signature_ptr: + error = _parse_operation_result_for_error(_lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError("Failed to sign data with Ed25519") + + try: + # Ed25519 signatures are always 64 bytes + signature = bytes(signature_ptr[:64]) + finally: + _lib.c2pa_signature_free(signature_ptr) + + return signature + + finally: + if key_bytes: + ctypes.memset(key_bytes, 0, len(key_bytes)) + del key_bytes __all__ = [ From 88c348ba2601b54b5625104f8b97ff854b1c4b06 Mon Sep 17 00:00:00 2001 From: Tania Mathern Date: Sat, 20 Sep 2025 21:29:54 -0700 Subject: [PATCH 08/11] fix: Makefile changes --- Makefile | 1 + src/c2pa/c2pa.py | 127 ++++++++++++++++++++++++----------------------- 2 files changed, 66 insertions(+), 62 deletions(-) diff --git a/Makefile b/Makefile index 02105bff..bd4efbad 100644 --- a/Makefile +++ b/Makefile @@ -94,6 +94,7 @@ publish: release # Code analysis check-format: + python3 -m py_compile src/c2pa/c2pa.py flake8 src/c2pa/c2pa.py # Formats Python source code using autopep8 with aggressive settings diff --git a/src/c2pa/c2pa.py b/src/c2pa/c2pa.py index 4674e2c2..88c61124 100644 --- a/src/c2pa/c2pa.py +++ b/src/c2pa/c2pa.py @@ -1381,34 +1381,33 @@ def __init__(self, str(e))) try: - # Open the file and create a stream - file = open(path, 'rb') - self._own_stream = Stream(file) + # Use context manager for automatic file cleanup + with open(path, 'rb') as file: + self._own_stream = Stream(file) - self._reader = _lib.c2pa_reader_from_stream( - mime_type_str, - self._own_stream._stream - ) - - if not self._reader: - self._own_stream.close() - file.close() - error = _parse_operation_result_for_error( - _lib.c2pa_error()) - if error: - raise C2paError(error) - raise C2paError( - Reader._ERROR_MESSAGES['reader_error'].format( - "Unknown error" - ) + self._reader = _lib.c2pa_reader_from_stream( + mime_type_str, + self._own_stream._stream ) - # Store the file to close it later - self._backing_file = file + if not self._reader: + self._own_stream.close() + error = _parse_operation_result_for_error( + _lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError( + Reader._ERROR_MESSAGES['reader_error'].format( + "Unknown error" + ) + ) - self._initialized = True + # Store the file to close it later + self._backing_file = file + self._initialized = True except Exception as e: + # File automatically closed by context manager if self._own_stream: self._own_stream.close() if hasattr(self, '_backing_file') and self._backing_file: @@ -1427,50 +1426,50 @@ def __init__(self, f"Reader does not support {format_or_path}") try: - file = open(stream, 'rb') - self._own_stream = Stream(file) - - format_str = str(format_or_path) - format_bytes = format_str.encode('utf-8') - - if manifest_data is None: - self._reader = _lib.c2pa_reader_from_stream( - format_bytes, self._own_stream._stream) - else: - if not isinstance(manifest_data, bytes): - raise TypeError( - Reader._ERROR_MESSAGES['manifest_error']) - manifest_array = ( - ctypes.c_ubyte * - len(manifest_data))( - * - manifest_data) - self._reader = ( - _lib.c2pa_reader_from_manifest_data_and_stream( - format_bytes, - self._own_stream._stream, - manifest_array, - len(manifest_data), + # Use context manager for automatic file cleanup + with open(stream, 'rb') as file: + self._own_stream = Stream(file) + + format_str = str(format_or_path) + format_bytes = format_str.encode('utf-8') + + if manifest_data is None: + self._reader = _lib.c2pa_reader_from_stream( + format_bytes, self._own_stream._stream) + else: + if not isinstance(manifest_data, bytes): + raise TypeError( + Reader._ERROR_MESSAGES['manifest_error']) + manifest_array = ( + ctypes.c_ubyte * + len(manifest_data))( + * + manifest_data) + self._reader = ( + _lib.c2pa_reader_from_manifest_data_and_stream( + format_bytes, + self._own_stream._stream, + manifest_array, + len(manifest_data), + ) ) - ) - if not self._reader: - self._own_stream.close() - file.close() - error = _parse_operation_result_for_error( - _lib.c2pa_error()) - if error: - raise C2paError(error) - raise C2paError( - Reader._ERROR_MESSAGES['reader_error'].format( - "Unknown error" + if not self._reader: + self._own_stream.close() + error = _parse_operation_result_for_error( + _lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError( + Reader._ERROR_MESSAGES['reader_error'].format( + "Unknown error" + ) ) - ) - - self._backing_file = file - self._initialized = True + self._backing_file = file + self._initialized = True except Exception as e: + # File automatically closed by context manager if self._own_stream: self._own_stream.close() if hasattr(self, '_backing_file') and self._backing_file: @@ -2745,7 +2744,11 @@ def ed25519_sign(data: bytes, private_key: str) -> bytes: f"Invalid UTF-8 characters in private key: {str(e)}") # Perform the signing operation - signature_ptr = _lib.c2pa_ed25519_sign(data_array, data_size, key_bytes) + signature_ptr = _lib.c2pa_ed25519_sign( + data_array, + data_size, + key_bytes + ) if not signature_ptr: error = _parse_operation_result_for_error(_lib.c2pa_error()) From a5e78b2b6dfc377edfad34bb91947e2549a788e0 Mon Sep 17 00:00:00 2001 From: Tania Mathern Date: Sat, 20 Sep 2025 21:57:27 -0700 Subject: [PATCH 09/11] fix: Update examples too --- examples/sign.py | 43 ++++++++++------------ examples/sign_info.py | 41 +++++++++------------ examples/training.py | 84 ++++++++++++++++++++----------------------- 3 files changed, 72 insertions(+), 96 deletions(-) diff --git a/examples/sign.py b/examples/sign.py index 070572a1..7182f99a 100644 --- a/examples/sign.py +++ b/examples/sign.py @@ -38,8 +38,10 @@ # Load certificates and private key (here from the test fixtures). # This is OK for development, but in production you should use a # secure way to load the certificates and private key. -certs = open(fixtures_dir + "es256_certs.pem", "rb").read() -key = open(fixtures_dir + "es256_private.key", "rb").read() +with open(fixtures_dir + "es256_certs.pem", "rb") as cert_file: + certs = cert_file.read() +with open(fixtures_dir + "es256_private.key", "rb") as key_file: + key = key_file.read() # Define a callback signer function def callback_signer_es256(data: bytes) -> bytes: @@ -55,14 +57,6 @@ def callback_signer_es256(data: bytes) -> bytes: ) return signature -# Create a signer using the callback function we defined -signer = c2pa.Signer.from_callback( - callback=callback_signer_es256, - alg=c2pa.C2paSigningAlg.ES256, - certs=certs.decode('utf-8'), - tsa_url="http://timestamp.digicert.com" -) - # Create a manifest definition as a dictionary. # This manifest follows the V2 manifest format. manifest_definition = { @@ -92,29 +86,28 @@ def callback_signer_es256(data: bytes) -> bytes: ] } -# Create the builder with the manifest definition -builder = c2pa.Builder(manifest_definition) - # Sign the image with the signer created above, # which will use the callback signer print("\nSigning the image file...") -builder.sign_file( - source_path=fixtures_dir + "A.jpg", - dest_path=output_dir + "A_signed.jpg", - signer=signer -) - -# Clean up -signer.close() -builder.close() +with c2pa.Signer.from_callback( + callback=callback_signer_es256, + alg=c2pa.C2paSigningAlg.ES256, + certs=certs.decode('utf-8'), + tsa_url="http://timestamp.digicert.com" +) as signer: + with c2pa.Builder(manifest_definition) as builder: + builder.sign_file( + source_path=fixtures_dir + "A.jpg", + dest_path=output_dir + "A_signed.jpg", + signer=signer + ) # Re-Read the signed image to verify print("\nReading signed image metadata:") with open(output_dir + "A_signed.jpg", "rb") as file: - reader = c2pa.Reader("image/jpeg", file) - print(reader.json()) - reader.close() + with c2pa.Reader("image/jpeg", file) as reader: + print(reader.json()) print("\nExample completed successfully!") diff --git a/examples/sign_info.py b/examples/sign_info.py index 51f28b33..0efa68d8 100644 --- a/examples/sign_info.py +++ b/examples/sign_info.py @@ -26,8 +26,7 @@ output_dir = os.path.join(os.path.dirname(__file__), "../output/") # Note: Builder, Reader, and Signer support being used as context managers -# (with 'with' statements), but this example shows manual usage which requires -# explicitly calling the close() function to clean up resources. +# (with 'with' statements) for proper resource management. # Ensure the output directory exists if not os.path.exists(output_dir): @@ -40,13 +39,14 @@ # Read existing C2PA metadata from the file print("\nReading existing C2PA metadata:") with open(fixtures_dir + "C.jpg", "rb") as file: - reader = c2pa.Reader("image/jpeg", file) - print(reader.json()) - reader.close() + with c2pa.Reader("image/jpeg", file) as reader: + print(reader.json()) # Create a signer from certificate and key files -certs = open(fixtures_dir + "es256_certs.pem", "rb").read() -key = open(fixtures_dir + "es256_private.key", "rb").read() +with open(fixtures_dir + "es256_certs.pem", "rb") as cert_file: + certs = cert_file.read() +with open(fixtures_dir + "es256_private.key", "rb") as key_file: + key = key_file.read() # Define Signer information signer_info = c2pa.C2paSignerInfo( @@ -56,9 +56,6 @@ ta_url=b"http://timestamp.digicert.com" # Use bytes and add timestamp URL ) -# Create the Signer from the information -signer = c2pa.Signer.from_info(signer_info) - # Create a manifest definition as a dictionary # This examples signs using a V1 manifest # Note that this is a v1 spec manifest (legacy) @@ -89,27 +86,21 @@ ] } -# Create the builder with the manifest definition -builder = c2pa.Builder(manifest_definition) - # Sign the image print("\nSigning the image...") -with open(fixtures_dir + "C.jpg", "rb") as source: - # File needs to be opened in write+read mode to be signed - # and verified properly. - with open(output_dir + "C_signed.jpg", "w+b") as dest: - result = builder.sign(signer, "image/jpeg", source, dest) +with c2pa.Signer.from_info(signer_info) as signer: + with c2pa.Builder(manifest_definition) as builder: + with open(fixtures_dir + "C.jpg", "rb") as source: + # File needs to be opened in write+read mode to be signed + # and verified properly. + with open(output_dir + "C_signed.jpg", "w+b") as dest: + result = builder.sign(signer, "image/jpeg", source, dest) # Read the signed image to verify print("\nReading signed image metadata:") with open(output_dir + "C_signed.jpg", "rb") as file: - reader = c2pa.Reader("image/jpeg", file) - print(reader.json()) - reader.close() - -# Clean up resources manually, since we are not using with statements -signer.close() -builder.close() + with c2pa.Reader("image/jpeg", file) as reader: + print(reader.json()) print("\nExample completed successfully!") diff --git a/examples/training.py b/examples/training.py index 48f4b22e..b07d47ab 100644 --- a/examples/training.py +++ b/examples/training.py @@ -93,8 +93,10 @@ def getitem(d, key): # V2 signing API example try: # Read the private key and certificate files - key = open(keyFile,"rb").read() - certs = open(pemFile,"rb").read() + with open(keyFile, "rb") as key_file: + key = key_file.read() + with open(pemFile, "rb") as cert_file: + certs = cert_file.read() # Create a signer using the new API signer_info = c2pa.C2paSignerInfo( @@ -103,66 +105,56 @@ def getitem(d, key): private_key=key, ta_url=b"http://timestamp.digicert.com" ) - signer = c2pa.Signer.from_info(signer_info) - # Create the builder - builder = c2pa.Builder(manifest_json) + with c2pa.Signer.from_info(signer_info) as signer: + with c2pa.Builder(manifest_json) as builder: + # Add the thumbnail resource using a stream + with open(fixtures_dir + "A_thumbnail.jpg", "rb") as thumbnail_file: + builder.add_resource("thumbnail", thumbnail_file) - # Add the thumbnail resource using a stream - with open(fixtures_dir + "A_thumbnail.jpg", "rb") as thumbnail_file: - builder.add_resource("thumbnail", thumbnail_file) + # Add the ingredient using the correct method + with open(fixtures_dir + "A_thumbnail.jpg", "rb") as ingredient_file: + builder.add_ingredient(json.dumps(ingredient_json), "image/jpeg", ingredient_file) - # Add the ingredient using the correct method - with open(fixtures_dir + "A_thumbnail.jpg", "rb") as ingredient_file: - builder.add_ingredient(json.dumps(ingredient_json), "image/jpeg", ingredient_file) + if os.path.exists(testOutputFile): + os.remove(testOutputFile) - if os.path.exists(testOutputFile): - os.remove(testOutputFile) + # Sign the file using the stream-based sign method + with open(testFile, "rb") as source_file: + with open(testOutputFile, "w+b") as dest_file: + result = builder.sign(signer, "image/jpeg", source_file, dest_file) - # Sign the file using the stream-based sign method - with open(testFile, "rb") as source_file: - with open(testOutputFile, "w+b") as dest_file: - result = builder.sign(signer, "image/jpeg", source_file, dest_file) - - # As an alternative, you can also use file paths directly during signing: - # builder.sign_file(testFile, testOutputFile, signer) - - # Clean up native resources (using a with statement works too!) - signer.close() - builder.close() + # As an alternative, you can also use file paths directly during signing: + # builder.sign_file(testFile, testOutputFile, signer) except Exception as err: - print("Exception during signing: ", err) + print(f"Exception during signing: {err}") -print("\nSuccessfully added do not train manifest to file " + testOutputFile) +print(f"\nSuccessfully added do not train manifest to file {testOutputFile}") # now verify the asset and check the manifest for a do not train assertion... allowed = True # opt out model, assume training is ok if the assertion doesn't exist try: # Create reader using the Reader API - reader = c2pa.Reader(testOutputFile) - - # Retrieve the manifest store - manifest_store = json.loads(reader.json()) - - # Look at data in the active manifest - manifest = manifest_store["manifests"][manifest_store["active_manifest"]] - for assertion in manifest["assertions"]: - if assertion["label"] == "cawg.training-mining": - if getitem(assertion, ("data","entries","cawg.ai_generative_training","use")) == "notAllowed": - allowed = False - - # Get the ingredient thumbnail and save it to a file using resource_to_stream - uri = getitem(manifest,("ingredients", 0, "thumbnail", "identifier")) - with open(output_dir + "thumbnail_v2.jpg", "wb") as thumbnail_output: - reader.resource_to_stream(uri, thumbnail_output) - - # Clean up native resources (using a with statement works too!) - reader.close() + with c2pa.Reader(testOutputFile) as reader: + # Retrieve the manifest store + manifest_store = json.loads(reader.json()) + + # Look at data in the active manifest + manifest = manifest_store["manifests"][manifest_store["active_manifest"]] + for assertion in manifest["assertions"]: + if assertion["label"] == "cawg.training-mining": + if getitem(assertion, ("data","entries","cawg.ai_generative_training","use")) == "notAllowed": + allowed = False + + # Get the ingredient thumbnail and save it to a file using resource_to_stream + uri = getitem(manifest,("ingredients", 0, "thumbnail", "identifier")) + with open(output_dir + "thumbnail_v2.jpg", "wb") as thumbnail_output: + reader.resource_to_stream(uri, thumbnail_output) except Exception as err: - print("Exception during assertions reading: ", err) + print(f"Exception during assertions reading: {err}") if allowed: print("Training is allowed") From bcdb7212f625a269a185c18f41b35c9659b79eb6 Mon Sep 17 00:00:00 2001 From: Tania Mathern Date: Sat, 20 Sep 2025 21:59:37 -0700 Subject: [PATCH 10/11] fix: Docs --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 1fd37e03..d5259e9d 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,7 @@ import c2pa ## Examples See the [`examples` directory](https://github.com/contentauth/c2pa-python/tree/main/examples) for some helpful examples: + - `examples/sign.py` shows how to sign and verify an asset with a C2PA manifest. - `examples/training.py` demonstrates how to add a "Do Not Train" assertion to an asset and verify it. From a54a9d39e966418c5609d0a986a9115451b8cccc Mon Sep 17 00:00:00 2001 From: Tania Mathern Date: Sun, 21 Sep 2025 20:26:40 -0700 Subject: [PATCH 11/11] fix: COmment format --- src/c2pa/c2pa.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/src/c2pa/c2pa.py b/src/c2pa/c2pa.py index 88c61124..303bc499 100644 --- a/src/c2pa/c2pa.py +++ b/src/c2pa/c2pa.py @@ -569,7 +569,7 @@ def _convert_to_py_string(value) -> str: if ptr and ptr.value is not None: try: py_string = ptr.value.decode('utf-8', errors='strict') - except UnicodeDecodeError: + except Exception: py_string = "" finally: # Only free if we have a valid pointer @@ -927,8 +927,8 @@ def sign_file( class Stream: - # Class-level atomic counter for generating unique stream IDs - # (useful for tracing streams usage in debug) + # Class-level somewhat atomic counter for generating + # unique stream IDs (useful for tracing streams usage in debug) _stream_id_counter = count(start=0, step=1) # Maximum value for a 32-bit signed integer (2^31 - 1) @@ -968,8 +968,7 @@ def __init__(self, file_like_stream): self._initialized = False self._stream = None - # Generate unique stream ID using object ID and atomic counter - # Get next atomic counter value + # Generate unique stream ID using object ID and counter stream_counter = next(Stream._stream_id_counter) # Handle counter overflow by resetting the counter @@ -1381,7 +1380,6 @@ def __init__(self, str(e))) try: - # Use context manager for automatic file cleanup with open(path, 'rb') as file: self._own_stream = Stream(file) @@ -1426,7 +1424,6 @@ def __init__(self, f"Reader does not support {format_or_path}") try: - # Use context manager for automatic file cleanup with open(stream, 'rb') as file: self._own_stream = Stream(file) @@ -1469,7 +1466,7 @@ def __init__(self, self._backing_file = file self._initialized = True except Exception as e: - # File automatically closed by context manager + # File closed by context manager if self._own_stream: self._own_stream.close() if hasattr(self, '_backing_file') and self._backing_file: @@ -2719,11 +2716,9 @@ def ed25519_sign(data: bytes, private_key: str) -> bytes: C2paError: If there was an error signing the data C2paError.Encoding: If the private key contains invalid UTF-8 chars """ - # Validate input data if not data: raise C2paError("Data to sign cannot be empty") - # Validate private key format if not private_key or not isinstance(private_key, str): raise C2paError("Private key must be a non-empty string")