diff --git a/Makefile b/Makefile index 7802a319..a7d2fb95 100644 --- a/Makefile +++ b/Makefile @@ -94,6 +94,7 @@ publish: release # Code analysis check-format: + python3 -m py_compile src/c2pa/c2pa.py flake8 src/c2pa/c2pa.py # Formats Python source code using autopep8 with aggressive settings diff --git a/README.md b/README.md index 98c640de..d33472ee 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,7 @@ import c2pa ## Examples See the [`examples` directory](https://github.com/contentauth/c2pa-python/tree/main/examples) for some helpful examples: + - `examples/sign.py` shows how to sign and verify an asset with a C2PA manifest. - `examples/training.py` demonstrates how to add a "Do Not Train" assertion to an asset and verify it. diff --git a/examples/sign.py b/examples/sign.py index 070572a1..7182f99a 100644 --- a/examples/sign.py +++ b/examples/sign.py @@ -38,8 +38,10 @@ # Load certificates and private key (here from the test fixtures). # This is OK for development, but in production you should use a # secure way to load the certificates and private key. -certs = open(fixtures_dir + "es256_certs.pem", "rb").read() -key = open(fixtures_dir + "es256_private.key", "rb").read() +with open(fixtures_dir + "es256_certs.pem", "rb") as cert_file: + certs = cert_file.read() +with open(fixtures_dir + "es256_private.key", "rb") as key_file: + key = key_file.read() # Define a callback signer function def callback_signer_es256(data: bytes) -> bytes: @@ -55,14 +57,6 @@ def callback_signer_es256(data: bytes) -> bytes: ) return signature -# Create a signer using the callback function we defined -signer = c2pa.Signer.from_callback( - callback=callback_signer_es256, - alg=c2pa.C2paSigningAlg.ES256, - certs=certs.decode('utf-8'), - tsa_url="http://timestamp.digicert.com" -) - # Create a manifest definition as a dictionary. # This manifest follows the V2 manifest format. manifest_definition = { @@ -92,29 +86,28 @@ def callback_signer_es256(data: bytes) -> bytes: ] } -# Create the builder with the manifest definition -builder = c2pa.Builder(manifest_definition) - # Sign the image with the signer created above, # which will use the callback signer print("\nSigning the image file...") -builder.sign_file( - source_path=fixtures_dir + "A.jpg", - dest_path=output_dir + "A_signed.jpg", - signer=signer -) - -# Clean up -signer.close() -builder.close() +with c2pa.Signer.from_callback( + callback=callback_signer_es256, + alg=c2pa.C2paSigningAlg.ES256, + certs=certs.decode('utf-8'), + tsa_url="http://timestamp.digicert.com" +) as signer: + with c2pa.Builder(manifest_definition) as builder: + builder.sign_file( + source_path=fixtures_dir + "A.jpg", + dest_path=output_dir + "A_signed.jpg", + signer=signer + ) # Re-Read the signed image to verify print("\nReading signed image metadata:") with open(output_dir + "A_signed.jpg", "rb") as file: - reader = c2pa.Reader("image/jpeg", file) - print(reader.json()) - reader.close() + with c2pa.Reader("image/jpeg", file) as reader: + print(reader.json()) print("\nExample completed successfully!") diff --git a/examples/sign_info.py b/examples/sign_info.py index 51f28b33..0efa68d8 100644 --- a/examples/sign_info.py +++ b/examples/sign_info.py @@ -26,8 +26,7 @@ output_dir = os.path.join(os.path.dirname(__file__), "../output/") # Note: Builder, Reader, and Signer support being used as context managers -# (with 'with' statements), but this example shows manual usage which requires -# explicitly calling the close() function to clean up resources. +# (with 'with' statements) for proper resource management. # Ensure the output directory exists if not os.path.exists(output_dir): @@ -40,13 +39,14 @@ # Read existing C2PA metadata from the file print("\nReading existing C2PA metadata:") with open(fixtures_dir + "C.jpg", "rb") as file: - reader = c2pa.Reader("image/jpeg", file) - print(reader.json()) - reader.close() + with c2pa.Reader("image/jpeg", file) as reader: + print(reader.json()) # Create a signer from certificate and key files -certs = open(fixtures_dir + "es256_certs.pem", "rb").read() -key = open(fixtures_dir + "es256_private.key", "rb").read() +with open(fixtures_dir + "es256_certs.pem", "rb") as cert_file: + certs = cert_file.read() +with open(fixtures_dir + "es256_private.key", "rb") as key_file: + key = key_file.read() # Define Signer information signer_info = c2pa.C2paSignerInfo( @@ -56,9 +56,6 @@ ta_url=b"http://timestamp.digicert.com" # Use bytes and add timestamp URL ) -# Create the Signer from the information -signer = c2pa.Signer.from_info(signer_info) - # Create a manifest definition as a dictionary # This examples signs using a V1 manifest # Note that this is a v1 spec manifest (legacy) @@ -89,27 +86,21 @@ ] } -# Create the builder with the manifest definition -builder = c2pa.Builder(manifest_definition) - # Sign the image print("\nSigning the image...") -with open(fixtures_dir + "C.jpg", "rb") as source: - # File needs to be opened in write+read mode to be signed - # and verified properly. - with open(output_dir + "C_signed.jpg", "w+b") as dest: - result = builder.sign(signer, "image/jpeg", source, dest) +with c2pa.Signer.from_info(signer_info) as signer: + with c2pa.Builder(manifest_definition) as builder: + with open(fixtures_dir + "C.jpg", "rb") as source: + # File needs to be opened in write+read mode to be signed + # and verified properly. + with open(output_dir + "C_signed.jpg", "w+b") as dest: + result = builder.sign(signer, "image/jpeg", source, dest) # Read the signed image to verify print("\nReading signed image metadata:") with open(output_dir + "C_signed.jpg", "rb") as file: - reader = c2pa.Reader("image/jpeg", file) - print(reader.json()) - reader.close() - -# Clean up resources manually, since we are not using with statements -signer.close() -builder.close() + with c2pa.Reader("image/jpeg", file) as reader: + print(reader.json()) print("\nExample completed successfully!") diff --git a/examples/training.py b/examples/training.py index 48f4b22e..b07d47ab 100644 --- a/examples/training.py +++ b/examples/training.py @@ -93,8 +93,10 @@ def getitem(d, key): # V2 signing API example try: # Read the private key and certificate files - key = open(keyFile,"rb").read() - certs = open(pemFile,"rb").read() + with open(keyFile, "rb") as key_file: + key = key_file.read() + with open(pemFile, "rb") as cert_file: + certs = cert_file.read() # Create a signer using the new API signer_info = c2pa.C2paSignerInfo( @@ -103,66 +105,56 @@ def getitem(d, key): private_key=key, ta_url=b"http://timestamp.digicert.com" ) - signer = c2pa.Signer.from_info(signer_info) - # Create the builder - builder = c2pa.Builder(manifest_json) + with c2pa.Signer.from_info(signer_info) as signer: + with c2pa.Builder(manifest_json) as builder: + # Add the thumbnail resource using a stream + with open(fixtures_dir + "A_thumbnail.jpg", "rb") as thumbnail_file: + builder.add_resource("thumbnail", thumbnail_file) - # Add the thumbnail resource using a stream - with open(fixtures_dir + "A_thumbnail.jpg", "rb") as thumbnail_file: - builder.add_resource("thumbnail", thumbnail_file) + # Add the ingredient using the correct method + with open(fixtures_dir + "A_thumbnail.jpg", "rb") as ingredient_file: + builder.add_ingredient(json.dumps(ingredient_json), "image/jpeg", ingredient_file) - # Add the ingredient using the correct method - with open(fixtures_dir + "A_thumbnail.jpg", "rb") as ingredient_file: - builder.add_ingredient(json.dumps(ingredient_json), "image/jpeg", ingredient_file) + if os.path.exists(testOutputFile): + os.remove(testOutputFile) - if os.path.exists(testOutputFile): - os.remove(testOutputFile) + # Sign the file using the stream-based sign method + with open(testFile, "rb") as source_file: + with open(testOutputFile, "w+b") as dest_file: + result = builder.sign(signer, "image/jpeg", source_file, dest_file) - # Sign the file using the stream-based sign method - with open(testFile, "rb") as source_file: - with open(testOutputFile, "w+b") as dest_file: - result = builder.sign(signer, "image/jpeg", source_file, dest_file) - - # As an alternative, you can also use file paths directly during signing: - # builder.sign_file(testFile, testOutputFile, signer) - - # Clean up native resources (using a with statement works too!) - signer.close() - builder.close() + # As an alternative, you can also use file paths directly during signing: + # builder.sign_file(testFile, testOutputFile, signer) except Exception as err: - print("Exception during signing: ", err) + print(f"Exception during signing: {err}") -print("\nSuccessfully added do not train manifest to file " + testOutputFile) +print(f"\nSuccessfully added do not train manifest to file {testOutputFile}") # now verify the asset and check the manifest for a do not train assertion... allowed = True # opt out model, assume training is ok if the assertion doesn't exist try: # Create reader using the Reader API - reader = c2pa.Reader(testOutputFile) - - # Retrieve the manifest store - manifest_store = json.loads(reader.json()) - - # Look at data in the active manifest - manifest = manifest_store["manifests"][manifest_store["active_manifest"]] - for assertion in manifest["assertions"]: - if assertion["label"] == "cawg.training-mining": - if getitem(assertion, ("data","entries","cawg.ai_generative_training","use")) == "notAllowed": - allowed = False - - # Get the ingredient thumbnail and save it to a file using resource_to_stream - uri = getitem(manifest,("ingredients", 0, "thumbnail", "identifier")) - with open(output_dir + "thumbnail_v2.jpg", "wb") as thumbnail_output: - reader.resource_to_stream(uri, thumbnail_output) - - # Clean up native resources (using a with statement works too!) - reader.close() + with c2pa.Reader(testOutputFile) as reader: + # Retrieve the manifest store + manifest_store = json.loads(reader.json()) + + # Look at data in the active manifest + manifest = manifest_store["manifests"][manifest_store["active_manifest"]] + for assertion in manifest["assertions"]: + if assertion["label"] == "cawg.training-mining": + if getitem(assertion, ("data","entries","cawg.ai_generative_training","use")) == "notAllowed": + allowed = False + + # Get the ingredient thumbnail and save it to a file using resource_to_stream + uri = getitem(manifest,("ingredients", 0, "thumbnail", "identifier")) + with open(output_dir + "thumbnail_v2.jpg", "wb") as thumbnail_output: + reader.resource_to_stream(uri, thumbnail_output) except Exception as err: - print("Exception during assertions reading: ", err) + print(f"Exception during assertions reading: {err}") if allowed: print("Training is allowed") diff --git a/src/c2pa/c2pa.py b/src/c2pa/c2pa.py index 2e367988..fe6e52ef 100644 --- a/src/c2pa/c2pa.py +++ b/src/c2pa/c2pa.py @@ -23,6 +23,7 @@ import io from .lib import dynamically_load_library import mimetypes +from itertools import count # Create a module-specific logger logger = logging.getLogger("c2pa") @@ -567,7 +568,7 @@ def _convert_to_py_string(value) -> str: # Only if we got a valid pointer with valid content if ptr and ptr.value is not None: try: - py_string = ptr.value.decode('utf-8', errors='replace') + py_string = ptr.value.decode('utf-8', errors='strict') except Exception: py_string = "" finally: @@ -911,16 +912,11 @@ def sign_file( class Stream: - # Class-level counter for generating unique stream IDs - # (useful for tracing streams usage in debug) - _next_stream_id = 0 + # Class-level somewhat atomic counter for generating + # unique stream IDs (useful for tracing streams usage in debug) + _stream_id_counter = count(start=0, step=1) + # Maximum value for a 32-bit signed integer (2^31 - 1) - # This prevents integer overflow which could cause: - # 1. Unexpected behavior in stream ID generation - # 2. Potential security issues if IDs wrap around - # 3. Memory issues if the number grows too large - # When this limit is reached, we reset to 0 since the timestamp component - # of the stream ID ensures uniqueness even after counter reset _MAX_STREAM_ID = 2**31 - 1 # Class-level error messages to avoid multiple creation @@ -958,10 +954,15 @@ def __init__(self, file_like_stream): self._stream = None # Generate unique stream ID using object ID and counter - if Stream._next_stream_id >= Stream._MAX_STREAM_ID: # pragma: no cover - Stream._next_stream_id = 0 - self._stream_id = f"{id(self)}-{Stream._next_stream_id}" - Stream._next_stream_id += 1 + stream_counter = next(Stream._stream_id_counter) + + # Handle counter overflow by resetting the counter + if stream_counter >= Stream._MAX_STREAM_ID: # pragma: no cover + # Reset the counter to 0 and get the next value + Stream._stream_id_counter = count(start=0, step=1) + stream_counter = next(Stream._stream_id_counter) + + self._stream_id = f"{id(self)}-{stream_counter}" # Rest of the existing initialization code... required_methods = ['read', 'write', 'seek', 'tell', 'flush'] @@ -1372,34 +1373,32 @@ def __init__(self, str(e))) try: - # Open the file and create a stream - file = open(path, 'rb') - self._own_stream = Stream(file) + with open(path, 'rb') as file: + self._own_stream = Stream(file) - self._reader = _lib.c2pa_reader_from_stream( - mime_type_str, - self._own_stream._stream - ) + self._reader = _lib.c2pa_reader_from_stream( + mime_type_str, + self._own_stream._stream + ) - if not self._reader: - self._own_stream.close() - file.close() - error = _parse_operation_result_for_error( - _lib.c2pa_error()) - if error: - raise C2paError(error) - raise C2paError( - Reader._ERROR_MESSAGES['reader_error'].format( - "Unknown error" + if not self._reader: + self._own_stream.close() + error = _parse_operation_result_for_error( + _lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError( + Reader._ERROR_MESSAGES['reader_error'].format( + "Unknown error" + ) ) - ) - # Store the file to close it later - self._backing_file = file - - self._initialized = True + # Store the file to close it later + self._backing_file = file + self._initialized = True except Exception as e: + # File automatically closed by context manager if self._own_stream: self._own_stream.close() if hasattr(self, '_backing_file') and self._backing_file: @@ -1418,50 +1417,49 @@ def __init__(self, f"Reader does not support {format_or_path}") try: - file = open(stream, 'rb') - self._own_stream = Stream(file) - - format_str = str(format_or_path) - format_bytes = format_str.encode('utf-8') - - if manifest_data is None: - self._reader = _lib.c2pa_reader_from_stream( - format_bytes, self._own_stream._stream) - else: - if not isinstance(manifest_data, bytes): - raise TypeError( - Reader._ERROR_MESSAGES['manifest_error']) - manifest_array = ( - ctypes.c_ubyte * - len(manifest_data))( - * - manifest_data) - self._reader = ( - _lib.c2pa_reader_from_manifest_data_and_stream( - format_bytes, - self._own_stream._stream, - manifest_array, - len(manifest_data), + with open(stream, 'rb') as file: + self._own_stream = Stream(file) + + format_str = str(format_or_path) + format_bytes = format_str.encode('utf-8') + + if manifest_data is None: + self._reader = _lib.c2pa_reader_from_stream( + format_bytes, self._own_stream._stream) + else: + if not isinstance(manifest_data, bytes): + raise TypeError( + Reader._ERROR_MESSAGES['manifest_error']) + manifest_array = ( + ctypes.c_ubyte * + len(manifest_data))( + * + manifest_data) + self._reader = ( + _lib.c2pa_reader_from_manifest_data_and_stream( + format_bytes, + self._own_stream._stream, + manifest_array, + len(manifest_data), + ) ) - ) - if not self._reader: - self._own_stream.close() - file.close() - error = _parse_operation_result_for_error( - _lib.c2pa_error()) - if error: - raise C2paError(error) - raise C2paError( - Reader._ERROR_MESSAGES['reader_error'].format( - "Unknown error" + if not self._reader: + self._own_stream.close() + error = _parse_operation_result_for_error( + _lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError( + Reader._ERROR_MESSAGES['reader_error'].format( + "Unknown error" + ) ) - ) - - self._backing_file = file - self._initialized = True + self._backing_file = file + self._initialized = True except Exception as e: + # File closed by context manager if self._own_stream: self._own_stream.close() if hasattr(self, '_backing_file') and self._backing_file: @@ -2717,28 +2715,53 @@ def ed25519_sign(data: bytes, private_key: str) -> bytes: C2paError: If there was an error signing the data C2paError.Encoding: If the private key contains invalid UTF-8 chars """ - data_array = (ctypes.c_ubyte * len(data))(*data) - try: - key_str = private_key.encode('utf-8') - except UnicodeError as e: - raise C2paError.Encoding( - f"Invalid UTF-8 characters in private key: {str(e)}") + if not data: + raise C2paError("Data to sign cannot be empty") - signature_ptr = _lib.c2pa_ed25519_sign(data_array, len(data), key_str) + if not private_key or not isinstance(private_key, str): + raise C2paError("Private key must be a non-empty string") - if not signature_ptr: - error = _parse_operation_result_for_error(_lib.c2pa_error()) - if error: - raise C2paError(error) - raise C2paError("Failed to sign data with Ed25519") + # Create secure memory buffer for data + data_array = None + key_bytes = None try: - # Ed25519 signatures are always 64 bytes - signature = bytes(signature_ptr[:64]) - finally: - _lib.c2pa_signature_free(signature_ptr) + # Create data array with size validation + data_size = len(data) + data_array = (ctypes.c_ubyte * data_size)(*data) - return signature + # Encode private key to bytes + try: + key_bytes = private_key.encode('utf-8') + except UnicodeError as e: + raise C2paError.Encoding( + f"Invalid UTF-8 characters in private key: {str(e)}") + + # Perform the signing operation + signature_ptr = _lib.c2pa_ed25519_sign( + data_array, + data_size, + key_bytes + ) + + if not signature_ptr: + error = _parse_operation_result_for_error(_lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError("Failed to sign data with Ed25519") + + try: + # Ed25519 signatures are always 64 bytes + signature = bytes(signature_ptr[:64]) + finally: + _lib.c2pa_signature_free(signature_ptr) + + return signature + + finally: + if key_bytes: + ctypes.memset(key_bytes, 0, len(key_bytes)) + del key_bytes __all__ = [