Skip to content
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ publish: release

# Code analysis
check-format:
python3 -m py_compile src/c2pa/c2pa.py
flake8 src/c2pa/c2pa.py

# Formats Python source code using autopep8 with aggressive settings
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import c2pa
## Examples

See the [`examples` directory](https://github.com/contentauth/c2pa-python/tree/main/examples) for some helpful examples:

- `examples/sign.py` shows how to sign and verify an asset with a C2PA manifest.
- `examples/training.py` demonstrates how to add a "Do Not Train" assertion to an asset and verify it.

Expand Down
43 changes: 18 additions & 25 deletions examples/sign.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@
# Load certificates and private key (here from the test fixtures).
# This is OK for development, but in production you should use a
# secure way to load the certificates and private key.
certs = open(fixtures_dir + "es256_certs.pem", "rb").read()
key = open(fixtures_dir + "es256_private.key", "rb").read()
with open(fixtures_dir + "es256_certs.pem", "rb") as cert_file:
certs = cert_file.read()
with open(fixtures_dir + "es256_private.key", "rb") as key_file:
key = key_file.read()

# Define a callback signer function
def callback_signer_es256(data: bytes) -> bytes:
Expand All @@ -55,14 +57,6 @@ def callback_signer_es256(data: bytes) -> bytes:
)
return signature

# Create a signer using the callback function we defined
signer = c2pa.Signer.from_callback(
callback=callback_signer_es256,
alg=c2pa.C2paSigningAlg.ES256,
certs=certs.decode('utf-8'),
tsa_url="http://timestamp.digicert.com"
)

# Create a manifest definition as a dictionary.
# This manifest follows the V2 manifest format.
manifest_definition = {
Expand Down Expand Up @@ -92,29 +86,28 @@ def callback_signer_es256(data: bytes) -> bytes:
]
}

# Create the builder with the manifest definition
builder = c2pa.Builder(manifest_definition)

# Sign the image with the signer created above,
# which will use the callback signer
print("\nSigning the image file...")

builder.sign_file(
source_path=fixtures_dir + "A.jpg",
dest_path=output_dir + "A_signed.jpg",
signer=signer
)

# Clean up
signer.close()
builder.close()
with c2pa.Signer.from_callback(
callback=callback_signer_es256,
alg=c2pa.C2paSigningAlg.ES256,
certs=certs.decode('utf-8'),
tsa_url="http://timestamp.digicert.com"
) as signer:
with c2pa.Builder(manifest_definition) as builder:
builder.sign_file(
source_path=fixtures_dir + "A.jpg",
dest_path=output_dir + "A_signed.jpg",
signer=signer
)

# Re-Read the signed image to verify
print("\nReading signed image metadata:")
with open(output_dir + "A_signed.jpg", "rb") as file:
reader = c2pa.Reader("image/jpeg", file)
print(reader.json())
reader.close()
with c2pa.Reader("image/jpeg", file) as reader:
print(reader.json())

print("\nExample completed successfully!")

41 changes: 16 additions & 25 deletions examples/sign_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@
output_dir = os.path.join(os.path.dirname(__file__), "../output/")

# Note: Builder, Reader, and Signer support being used as context managers
# (with 'with' statements), but this example shows manual usage which requires
# explicitly calling the close() function to clean up resources.
# (with 'with' statements) for proper resource management.

# Ensure the output directory exists
if not os.path.exists(output_dir):
Expand All @@ -40,13 +39,14 @@
# Read existing C2PA metadata from the file
print("\nReading existing C2PA metadata:")
with open(fixtures_dir + "C.jpg", "rb") as file:
reader = c2pa.Reader("image/jpeg", file)
print(reader.json())
reader.close()
with c2pa.Reader("image/jpeg", file) as reader:
print(reader.json())

# Create a signer from certificate and key files
certs = open(fixtures_dir + "es256_certs.pem", "rb").read()
key = open(fixtures_dir + "es256_private.key", "rb").read()
with open(fixtures_dir + "es256_certs.pem", "rb") as cert_file:
certs = cert_file.read()
with open(fixtures_dir + "es256_private.key", "rb") as key_file:
key = key_file.read()

# Define Signer information
signer_info = c2pa.C2paSignerInfo(
Expand All @@ -56,9 +56,6 @@
ta_url=b"http://timestamp.digicert.com" # Use bytes and add timestamp URL
)

# Create the Signer from the information
signer = c2pa.Signer.from_info(signer_info)

# Create a manifest definition as a dictionary
# This examples signs using a V1 manifest
# Note that this is a v1 spec manifest (legacy)
Expand Down Expand Up @@ -89,27 +86,21 @@
]
}

# Create the builder with the manifest definition
builder = c2pa.Builder(manifest_definition)

# Sign the image
print("\nSigning the image...")
with open(fixtures_dir + "C.jpg", "rb") as source:
# File needs to be opened in write+read mode to be signed
# and verified properly.
with open(output_dir + "C_signed.jpg", "w+b") as dest:
result = builder.sign(signer, "image/jpeg", source, dest)
with c2pa.Signer.from_info(signer_info) as signer:
with c2pa.Builder(manifest_definition) as builder:
with open(fixtures_dir + "C.jpg", "rb") as source:
# File needs to be opened in write+read mode to be signed
# and verified properly.
with open(output_dir + "C_signed.jpg", "w+b") as dest:
result = builder.sign(signer, "image/jpeg", source, dest)

# Read the signed image to verify
print("\nReading signed image metadata:")
with open(output_dir + "C_signed.jpg", "rb") as file:
reader = c2pa.Reader("image/jpeg", file)
print(reader.json())
reader.close()

# Clean up resources manually, since we are not using with statements
signer.close()
builder.close()
with c2pa.Reader("image/jpeg", file) as reader:
print(reader.json())

print("\nExample completed successfully!")

84 changes: 38 additions & 46 deletions examples/training.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,10 @@ def getitem(d, key):
# V2 signing API example
try:
# Read the private key and certificate files
key = open(keyFile,"rb").read()
certs = open(pemFile,"rb").read()
with open(keyFile, "rb") as key_file:
key = key_file.read()
with open(pemFile, "rb") as cert_file:
certs = cert_file.read()

# Create a signer using the new API
signer_info = c2pa.C2paSignerInfo(
Expand All @@ -103,66 +105,56 @@ def getitem(d, key):
private_key=key,
ta_url=b"http://timestamp.digicert.com"
)
signer = c2pa.Signer.from_info(signer_info)

# Create the builder
builder = c2pa.Builder(manifest_json)
with c2pa.Signer.from_info(signer_info) as signer:
with c2pa.Builder(manifest_json) as builder:
# Add the thumbnail resource using a stream
with open(fixtures_dir + "A_thumbnail.jpg", "rb") as thumbnail_file:
builder.add_resource("thumbnail", thumbnail_file)

# Add the thumbnail resource using a stream
with open(fixtures_dir + "A_thumbnail.jpg", "rb") as thumbnail_file:
builder.add_resource("thumbnail", thumbnail_file)
# Add the ingredient using the correct method
with open(fixtures_dir + "A_thumbnail.jpg", "rb") as ingredient_file:
builder.add_ingredient(json.dumps(ingredient_json), "image/jpeg", ingredient_file)

# Add the ingredient using the correct method
with open(fixtures_dir + "A_thumbnail.jpg", "rb") as ingredient_file:
builder.add_ingredient(json.dumps(ingredient_json), "image/jpeg", ingredient_file)
if os.path.exists(testOutputFile):
os.remove(testOutputFile)

if os.path.exists(testOutputFile):
os.remove(testOutputFile)
# Sign the file using the stream-based sign method
with open(testFile, "rb") as source_file:
with open(testOutputFile, "w+b") as dest_file:
result = builder.sign(signer, "image/jpeg", source_file, dest_file)

# Sign the file using the stream-based sign method
with open(testFile, "rb") as source_file:
with open(testOutputFile, "w+b") as dest_file:
result = builder.sign(signer, "image/jpeg", source_file, dest_file)

# As an alternative, you can also use file paths directly during signing:
# builder.sign_file(testFile, testOutputFile, signer)

# Clean up native resources (using a with statement works too!)
signer.close()
builder.close()
# As an alternative, you can also use file paths directly during signing:
# builder.sign_file(testFile, testOutputFile, signer)

except Exception as err:
print("Exception during signing: ", err)
print(f"Exception during signing: {err}")

print("\nSuccessfully added do not train manifest to file " + testOutputFile)
print(f"\nSuccessfully added do not train manifest to file {testOutputFile}")

# now verify the asset and check the manifest for a do not train assertion...

allowed = True # opt out model, assume training is ok if the assertion doesn't exist
try:
# Create reader using the Reader API
reader = c2pa.Reader(testOutputFile)

# Retrieve the manifest store
manifest_store = json.loads(reader.json())

# Look at data in the active manifest
manifest = manifest_store["manifests"][manifest_store["active_manifest"]]
for assertion in manifest["assertions"]:
if assertion["label"] == "cawg.training-mining":
if getitem(assertion, ("data","entries","cawg.ai_generative_training","use")) == "notAllowed":
allowed = False

# Get the ingredient thumbnail and save it to a file using resource_to_stream
uri = getitem(manifest,("ingredients", 0, "thumbnail", "identifier"))
with open(output_dir + "thumbnail_v2.jpg", "wb") as thumbnail_output:
reader.resource_to_stream(uri, thumbnail_output)

# Clean up native resources (using a with statement works too!)
reader.close()
with c2pa.Reader(testOutputFile) as reader:
# Retrieve the manifest store
manifest_store = json.loads(reader.json())

# Look at data in the active manifest
manifest = manifest_store["manifests"][manifest_store["active_manifest"]]
for assertion in manifest["assertions"]:
if assertion["label"] == "cawg.training-mining":
if getitem(assertion, ("data","entries","cawg.ai_generative_training","use")) == "notAllowed":
allowed = False

# Get the ingredient thumbnail and save it to a file using resource_to_stream
uri = getitem(manifest,("ingredients", 0, "thumbnail", "identifier"))
with open(output_dir + "thumbnail_v2.jpg", "wb") as thumbnail_output:
reader.resource_to_stream(uri, thumbnail_output)

except Exception as err:
print("Exception during assertions reading: ", err)
print(f"Exception during assertions reading: {err}")

if allowed:
print("Training is allowed")
Expand Down
Loading
Loading