Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions verifiers-reference-implementation/python-server/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
'''
Copyright 2025 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''

# --- Configuration ---

# TODO: Replace with your actual application's package name (for Android)
# Used in constructing the SessionTranscript for the 'preview' protocol.
APP_PACKAGE_NAME = "<your_app_package_name>"

# TODO: Replace with the actual SHA-256 hash of your Android app's signing certificate
# Used in constructing the SessionTranscript for the 'openid4vp' protocol when
# the origin is an Android app. Find using:
# keytool -printcert -jarfile your_app.apk | grep SHA256 | awk '{print $2}' | xxd -r -p | sha256sum | awk '{print $1}'
ANDROID_APP_SIGNATURE_HASH = "<your_app_signature_here_without_':'>"

# URL for the external Zero-Knowledge Verifier service's verification endpoint.
ZK_VERIFIER_URL = "<path_to_ZKverifier>/zkverify"
# URL for the external Zero-Knowledge Verifier service's specifications endpoint.
SPECS_URL = "<path_to_ZKverifier>/specs"

25 changes: 25 additions & 0 deletions verifiers-reference-implementation/python-server/keys.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
'''
Copyright 2025 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''

PRIVATE_KEY = """-----BEGIN PRIVATE KEY-----
<Your private key / Should get it directly from your
Key Management service>
-----END PRIVATE KEY-----"""

CERTIFICATE = """-----BEGIN CERTIFICATE-----
<Your Pulic cert / Can be added here, can come from
your Key management Service.>
-----END CERTIFICATE-----"""
133 changes: 102 additions & 31 deletions verifiers-reference-implementation/python-server/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,19 @@
import cbor2 # For CBOR encoding/decoding
from flask import Flask, request, jsonify, render_template # For the web application framework
from isomdoc import verify_device_response # For mdoc/mDL verification (ISO 18013-5)
from jwcrypto import jwe, jwk # For JSON Web Encryption (JWE) handling in OpenID4VP

from jwcrypto import jwe, jwk, jws # For JSON Web Encryption/Signature handling in OpenID4VP
from jwcrypto.common import json_encode
from keys import CERTIFICATE, PRIVATE_KEY
# --- Configuration ---
import config

# TODO: Replace with your actual application's package name (for Android)
# Used in constructing the SessionTranscript for the 'preview' protocol.
APP_PACKAGE_NAME = "<your_app_package_name>"

# URL for the external Zero-Knowledge Verifier service's verification endpoint.
ZK_VERIFIER_URL = "<path_to_ZKverifier>/zkverify"
# URL for the external Zero-Knowledge Verifier service's specifications endpoint.
SPECS_URL = "<path_to_ZKverifier>/specs"
try:
from cryptography import x509
from cryptography.hazmat.backends import default_backend
except ImportError:
print("Error: cryptography module not found. Please install it.")
from cryptography.hazmat.primitives import serialization

# TODO: Replace with the actual SHA-256 hash of your Android app's signing certificate
# Used in constructing the SessionTranscript for the 'openid4vp' protocol when
# the origin is an Android app. Find using:
# keytool -printcert -jarfile your_app.apk | grep SHA256 | awk '{print $2}' | xxd -r -p | sha256sum | awk '{print $1}'
ANDROID_APP_SIGNATURE_HASH = "<your_app_signature_here_without_':'>"

# JWE Configuration for OpenID4VP (Constants for clarity)
JWE_ALG = "ECDH-ES" # Key Agreement Algorithm
Expand Down Expand Up @@ -128,7 +123,7 @@ def generate_request_state() -> dict:
return state


def construct_openid4vp_request(doctypes: list[str], requested_fields: list[dict], nonce_base64: str, jwe_encryption_public_jwk: jwk.JWK, is_zkp_request: bool, is_signed_request: bool) -> dict:
def construct_openid4vp_request(doctypes: list[str], requested_fields: list[dict], nonce_base64: str, jwe_encryption_public_jwk: jwk.JWK, is_zkp_request: bool, is_signed_request: bool, state: dict, origin: str) -> dict:
"""
Constructs the request dictionary for the OpenID4VP protocol.

Expand All @@ -140,9 +135,11 @@ def construct_openid4vp_request(doctypes: list[str], requested_fields: list[dict
jwe_encryption_public_jwk: The reader's public JWK for response encryption.
is_zkp_request : Boolean for getting a ZKP.
is_signed_request: Flag to indicate if the request is signed.
state: The state dictionary containing the client_id used for signing.
origin: The origin string (URL or Android package info) of the request.

Returns:
A dictionary representing the OpenID4VP request structure.
A dictionary representing the OpenID4VP request structure, or a signed JWT string.
"""

credentials_list = []
Expand Down Expand Up @@ -212,7 +209,7 @@ def construct_openid4vp_request(doctypes: list[str], requested_fields: list[dict
# algorithm for signing and authentication in this context.
mdoc_crypto_capabilities = {
"mso_mdoc":{
"isserauth_alg_values":[-7],
"issuerauth_alg_values":[-7],
"deviceauth_alg_values":[-7]
}
}
Expand All @@ -226,12 +223,71 @@ def construct_openid4vp_request(doctypes: list[str], requested_fields: list[dict
"dcql_query": dcql_query, # The credential query
"client_metadata": client_metadata # How the client wants the response encrypted
}
if is_signed_request:
# --- Request Signing (JAR / OpenID4VP) ---
try:
# 1. Load the Verifier's Certificate
# We must load the PEM string into a cryptography x509 object
verifier_cert_obj = x509.load_pem_x509_certificate(CERTIFICATE.encode('utf-8'), backend=default_backend())

# 2. Calculate Client ID (x509_hash)
# We calculate the SHA-256 hash of the DER-encoded certificate.
# This binds the request to the certificate.
cert_der = verifier_cert_obj.public_bytes(serialization.Encoding.DER)

verifier_fingerprint_bytes = hashlib.sha256(cert_der).digest()
verifier_fingerprint_b64 = base64.urlsafe_b64encode(verifier_fingerprint_bytes).decode('utf-8').rstrip("=")

client_id = f'x509_hash:{verifier_fingerprint_b64}'

# 3. Update Request Payload
request_payload["client_id"] = client_id
if origin:
request_payload["expected_origins"] = [origin]
else:
request_payload["expected_origins"] = ['']

# 4. Update State
# Store the client_id so we can verify it later if needed or for debugging.
if state:
state["sign_request_client_id"] = client_id

# 5. Create Signed JWT (JWS)
# Load the signing private key
signing_key = jwk.JWK.from_pem(PRIVATE_KEY.encode('utf-8'))

# Create the JWS payload
jws_token = jws.JWS(json.dumps(request_payload).encode('utf-8'))

# Construct the JOSE Header
# x5c (X.509 Certificate Chain) must contain the base64-encoded *DER* certificate.
x5c_value = base64.b64encode(cert_der).decode('utf-8')

protected_header = {
"alg": "ES256",
"typ": "oauth-authz-req+jwt",
"kid": "1",
"x5c": [x5c_value]
}

jws_token.add_signature(
key=signing_key,
alg=None,
protected=json_encode(protected_header)
)

# 6. Serialize
return {"request": jws_token.serialize(compact=True)}

except Exception as e:
print(f"Error signing OpenID4VP request: {e}")
return None # Or raise, depending on desired error handling
return request_payload


def fetch_and_process_specs(num_attributes):
"""
Fetches specs from the SPECS_URL, filters them by the number of attributes,
Fetches specs from the config.SPECS_URL, filters them by the number of attributes,
identifies the top two latest versions from the filtered list,
and returns the specs for those versions.

Expand All @@ -248,7 +304,7 @@ def fetch_and_process_specs(num_attributes):
try:
# Make a GET request to the external specs endpoint
# NOTE: 'requests' library needs to be imported for this to work.
response = requests.get(SPECS_URL)
response = requests.get(config.SPECS_URL)

# Raise an exception for bad status codes (4xx or 5xx)
response.raise_for_status()
Expand Down Expand Up @@ -478,23 +534,29 @@ def process_openid4vp_response(encrypted_jwe_string: str, request_state: dict, o

# 4. Construct SessionTranscript based on origin
if origin.startswith("https://") or origin.startswith("http://"): # Web Origin
client_id = f"web-origin:{origin}"
if "sign_request_client_id" in request_state:
client_id = request_state["sign_request_client_id"]
else:
client_id = f"web-origin:{origin}"
origin_info = origin
session_transcript_list = generate_openid4vp_session_transcript(
client_id, nonce_base64_unpadded, origin_info, encryption_public_jwk_thumbprint
)
else: # Assume Android Origin
client_id = f"android-origin:{APP_PACKAGE_NAME}"
if "sign_request_client_id" in request_state:
client_id = request_state["sign_request_client_id"]
else:
client_id = f"android-origin:{config.APP_PACKAGE_NAME}"
# Calculate the base64 encoded SHA256 hash of the app signing cert
try:
app_signature_hash_bytes = bytes.fromhex(ANDROID_APP_SIGNATURE_HASH)
app_signature_hash_bytes = bytes.fromhex(config.ANDROID_APP_SIGNATURE_HASH)
app_signature_hash_base64 = base64.b64encode(app_signature_hash_bytes).decode("utf-8").rstrip("=")
origin_info = f"android:apk-key-hash:{app_signature_hash_base64}"
session_transcript_list = generate_openid4vp_session_transcript(
client_id, nonce_base64_unpadded, origin_info, encryption_public_jwk_thumbprint
)
except ValueError as e:
print(f"Error processing Android signature hash: {e}. Ensure ANDROID_APP_SIGNATURE_HASH is correct hex.")
print(f"Error processing Android signature hash: {e}. Ensure config.ANDROID_APP_SIGNATURE_HASH is correct hex.")
return None

# print(f"Using Session Transcript (List) for Verification: {session_transcript_list}") # Debugging
Expand Down Expand Up @@ -604,12 +666,18 @@ def process_openid4vp_zk_response(encrypted_jwe_string: str, request_state: dict

# 4. Construct the Session Transcript required for verification
if origin.startswith("https://") or origin.startswith("http://"):
client_id = f"web-origin:{origin}"
if "sign_request_client_id" in request_state:
client_id = request_state["sign_request_client_id"]
else:
client_id = f"web-origin:{origin}"
origin_info = origin
else: # Assume Android Origin
client_id = f"android-origin:{APP_PACKAGE_NAME}"
if "sign_request_client_id" in request_state:
client_id = request_state["sign_request_client_id"]
else:
client_id = f"android-origin:{config.APP_PACKAGE_NAME}"
try:
app_signature_hash_bytes = bytes.fromhex(ANDROID_APP_SIGNATURE_HASH)
app_signature_hash_bytes = bytes.fromhex(config.ANDROID_APP_SIGNATURE_HASH)
app_signature_hash_base64 = base64.b64encode(app_signature_hash_bytes).decode("utf-8")
origin_info = f"android:apk-key-hash:{app_signature_hash_base64}"
except ValueError as e:
Expand All @@ -633,15 +701,15 @@ def process_openid4vp_zk_response(encrypted_jwe_string: str, request_state: dict

# 6. Send data to the ZK verification server and process the response
try:
print(f"Sending request to ZK Verifier at: {ZK_VERIFIER_URL}")
print(f"Sending request to ZK Verifier at: {config.ZK_VERIFIER_URL}")
headers = {
"Content-Type": "application/json",
# 'Authorization': f'Bearer {id_token}',
}

# NOTE: 'requests' library needs to be imported for this to work.
response = requests.post(
ZK_VERIFIER_URL,
config.ZK_VERIFIER_URL,
headers=headers,
json=zk_verification_payload,
timeout=80 # Add a timeout for robustness
Expand Down Expand Up @@ -730,7 +798,7 @@ def handle_request_initiation():
requested_attributes = request_data.get("attributes", [])

is_zkp_request = False
if request_data.get("requestZkp") is True:
if "requestZkp" in request_data and request_data["requestZkp"] is True:
is_zkp_request = True

# --- Input Validation ---
Expand Down Expand Up @@ -766,7 +834,9 @@ def handle_request_initiation():
nonce_base64,
jwe_encryption_key_pair, # Pass public JWK
is_zkp_request,
is_signed_request
is_signed_request,
state,
origin
)

# Ensure payload generation was successful (should be if inputs are valid)
Expand Down Expand Up @@ -835,6 +905,7 @@ def handle_zk_verification():
is_signed_request = protocol == "openid4vp-v1-signed"

response_data = request_data.get("response")
extracted_data = None
if not isinstance(response_data, str):
return jsonify({'success': False, 'error': 'Invalid "data" format for openid4vp: expected a encrypted string'}), 400

Expand Down