From 89a137b63b55ff8f68e4dc92307f89b06e025dbf Mon Sep 17 00:00:00 2001 From: Dmitry Sharkov Date: Mon, 22 Dec 2025 09:51:34 -0500 Subject: [PATCH 1/9] added a patch with dns persistent DCV method, still refactoring logic --- src/open_mpic_core/__init__.py | 1 + .../common_domain/check_parameters.py | 9 + .../common_domain/check_response_details.py | 5 + .../enum/dcv_validation_method.py | 8 +- .../mpic_dcv_checker/mpic_dcv_checker.py | 162 ++++++++++++--- .../test_check_request_parameters.py | 7 + .../open_mpic_core/test_mpic_caa_request.py | 4 +- .../open_mpic_core/test_mpic_coordinator.py | 36 +++- .../open_mpic_core/test_mpic_dcv_checker.py | 187 ++++++++++++++++-- .../open_mpic_core/test_mpic_dcv_request.py | 22 +-- tests/unit/test_util/valid_check_creator.py | 13 ++ .../test_util/valid_mpic_request_creator.py | 6 + 12 files changed, 403 insertions(+), 57 deletions(-) diff --git a/src/open_mpic_core/__init__.py b/src/open_mpic_core/__init__.py index ca3c1a0..2d55948 100644 --- a/src/open_mpic_core/__init__.py +++ b/src/open_mpic_core/__init__.py @@ -13,6 +13,7 @@ DcvAcmeHttp01ValidationParameters, DcvWebsiteChangeValidationParameters, DcvDnsChangeValidationParameters, + DcvDnsPersistentValidationParameters, DcvAcmeDns01ValidationParameters, DcvContactPhoneTxtValidationParameters, DcvContactEmailCaaValidationParameters, diff --git a/src/open_mpic_core/common_domain/check_parameters.py b/src/open_mpic_core/common_domain/check_parameters.py index 103952f..c23e419 100644 --- a/src/open_mpic_core/common_domain/check_parameters.py +++ b/src/open_mpic_core/common_domain/check_parameters.py @@ -54,6 +54,14 @@ def validate_record_type(cls, v: DnsRecordType) -> DnsRecordType: return v +class DcvDnsPersistentValidationParameters(DcvValidationParameters): + validation_method: Literal[DcvValidationMethod.DNS_PERSISTENT] = DcvValidationMethod.DNS_PERSISTENT + dns_record_type: Literal[DnsRecordType.TXT] = DnsRecordType.TXT + dns_name_prefix: Literal["_validation-persist"] = "_validation-persist" + issuer_domain_names: list[str] # Disclosed issuer domain names from CA's CP/CPS + expected_account_uri: str # The specific account URI to validate + + class DcvContactEmailTxtValidationParameters(DcvGeneralDnsValidationParameters): validation_method: Literal[DcvValidationMethod.CONTACT_EMAIL_TXT] = DcvValidationMethod.CONTACT_EMAIL_TXT dns_record_type: Literal[DnsRecordType.TXT] = DnsRecordType.TXT @@ -117,6 +125,7 @@ class DcvAcmeTlsAlpn01ValidationParameters(DcvValidationParameters): Union[ DcvWebsiteChangeValidationParameters, DcvDnsChangeValidationParameters, + DcvDnsPersistentValidationParameters, DcvAcmeHttp01ValidationParameters, DcvAcmeDns01ValidationParameters, DcvAcmeTlsAlpn01ValidationParameters, diff --git a/src/open_mpic_core/common_domain/check_response_details.py b/src/open_mpic_core/common_domain/check_response_details.py index a09b753..8ced953 100644 --- a/src/open_mpic_core/common_domain/check_response_details.py +++ b/src/open_mpic_core/common_domain/check_response_details.py @@ -27,12 +27,14 @@ class DcvHttpCheckResponseDetails(BaseModel): class DcvDnsCheckResponseDetails(BaseModel): validation_method: Literal[ DcvValidationMethod.DNS_CHANGE, + DcvValidationMethod.DNS_PERSISTENT, DcvValidationMethod.IP_ADDRESS, DcvValidationMethod.CONTACT_EMAIL_CAA, DcvValidationMethod.CONTACT_EMAIL_TXT, DcvValidationMethod.CONTACT_PHONE_CAA, DcvValidationMethod.CONTACT_PHONE_TXT, DcvValidationMethod.ACME_DNS_01, + DcvValidationMethod.DNS_ACCOUNT_01, DcvValidationMethod.REVERSE_ADDRESS_LOOKUP, ] records_seen: list[str] | None = None # list of records found in DNS query; not base64 encoded @@ -41,6 +43,7 @@ class DcvDnsCheckResponseDetails(BaseModel): found_at: str | None = None # domain where DNS record was found cname_chain: list[str] | None = None # List of CNAMEs followed to obtain the final result. + class DcvTlsAlpnCheckResponseDetails(BaseModel): validation_method: Literal[DcvValidationMethod.ACME_TLS_ALPN_01] common_name: str | None = None # common name seen in certificate. @@ -56,8 +59,10 @@ def build_response_details(validation_method: DcvValidationMethod) -> DcvCheckRe types = { DcvValidationMethod.WEBSITE_CHANGE: DcvHttpCheckResponseDetails, DcvValidationMethod.DNS_CHANGE: DcvDnsCheckResponseDetails, + DcvValidationMethod.DNS_PERSISTENT: DcvDnsCheckResponseDetails, DcvValidationMethod.ACME_HTTP_01: DcvHttpCheckResponseDetails, DcvValidationMethod.ACME_DNS_01: DcvDnsCheckResponseDetails, + DcvValidationMethod.DNS_ACCOUNT_01: DcvDnsCheckResponseDetails, DcvValidationMethod.ACME_TLS_ALPN_01: DcvTlsAlpnCheckResponseDetails, DcvValidationMethod.CONTACT_PHONE_TXT: DcvDnsCheckResponseDetails, DcvValidationMethod.CONTACT_PHONE_CAA: DcvDnsCheckResponseDetails, diff --git a/src/open_mpic_core/common_domain/enum/dcv_validation_method.py b/src/open_mpic_core/common_domain/enum/dcv_validation_method.py index b0cdcc2..781d889 100644 --- a/src/open_mpic_core/common_domain/enum/dcv_validation_method.py +++ b/src/open_mpic_core/common_domain/enum/dcv_validation_method.py @@ -2,11 +2,13 @@ class DcvValidationMethod(StrEnum): - WEBSITE_CHANGE = 'website-change' + WEBSITE_CHANGE = 'website-change' # CABF BRs 3.2.2.4.18 Agreed-Upon Change to Website v2 DNS_CHANGE = 'dns-change' # CNAME, TXT, or CAA record - ACME_HTTP_01 = 'acme-http-01' + DNS_PERSISTENT = 'dns-persistent' # CABF BRs 3.2.2.4.22 DNS TXT Record with Persistent Value + ACME_HTTP_01 = 'acme-http-01' # CABF BRs 3.2.2.4.19 Agreed-Upon Change to Website - ACME ACME_DNS_01 = 'acme-dns-01' # TXT record - ACME_TLS_ALPN_01 = 'acme-tls-alpn-01' + ACME_TLS_ALPN_01 = 'acme-tls-alpn-01' # CABF BRs 3.2.2.4.20 TLS Using ALPN + DNS_ACCOUNT_01 = 'dns-account-01' # CABF BRs 3.2.2.4.21 DNS Labeled with Account ID - ACME TODO not yet implemented CONTACT_EMAIL_CAA = 'contact-email-caa' CONTACT_EMAIL_TXT = 'contact-email-txt' CONTACT_PHONE_CAA = 'contact-phone-caa' diff --git a/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py b/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py index eac0e50..0289307 100644 --- a/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py +++ b/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py @@ -1,6 +1,7 @@ import asyncio import time from contextlib import asynccontextmanager +from datetime import datetime import dns.asyncresolver import requests @@ -14,7 +15,7 @@ from aiohttp import ClientError from aiohttp.web import HTTPException -from open_mpic_core import DcvCheckRequest, DcvCheckResponse +from open_mpic_core import DcvCheckRequest, DcvCheckResponse, DcvCheckParameters from open_mpic_core import RedirectResponse, DcvUtils from open_mpic_core import DcvValidationMethod, DnsRecordType from open_mpic_core import MpicValidationError, ErrorMessages @@ -25,6 +26,12 @@ logger = get_logger(__name__) +class ExpectedDnsRecordContent: + expected_value: str | None = None, + possible_values: list[str] | None = None, + expected_parameters: dict[str, str] | None = None + + # noinspection PyUnusedLocal class MpicDcvChecker: WELL_KNOWN_PKI_PATH = ".well-known/pki-validation" @@ -92,11 +99,10 @@ async def check_dcv(self, dcv_request: DcvCheckRequest) -> DcvCheckResponse: result = await self.perform_http_based_validation(dcv_request) case DcvValidationMethod.ACME_TLS_ALPN_01: result = await self.acme_tls_alpn_validator.perform_tls_alpn_validation(dcv_request) - case _: # ACME_DNS_01 | DNS_CHANGE | IP_LOOKUP | CONTACT_EMAIL | CONTACT_PHONE | REVERSE_ADDRESS_LOOKUP + case _: # all DNS based methods result = await self.perform_general_dns_validation(dcv_request) # noinspection PyUnresolvedReferences - self.logger.trace( "Completed DCV for %s with method %s. Trace ID: %s", dcv_request.domain_or_ip_target, @@ -119,12 +125,23 @@ async def perform_general_dns_validation(self, request: DcvCheckRequest) -> DcvC if validation_method == DcvValidationMethod.ACME_DNS_01: expected_dns_record_content = check_parameters.key_authorization_hash + elif validation_method == DcvValidationMethod.DNS_PERSISTENT: + expected_dns_record_content = None # Validated via issuer_domains and account_uri else: expected_dns_record_content = check_parameters.challenge_value - if validation_method == DcvValidationMethod.DNS_CHANGE: + new_expected_dns_record_content = MpicDcvChecker.build_expected_dns_record_content( + validation_method, check_parameters + ) + + if validation_method == DcvValidationMethod.DNS_CHANGE: # DNS_CHANGE may allow for non-exact match exact_match = check_parameters.require_exact_match + # Extract persistent validation parameters if needed + persistent_params = None + if validation_method == DcvValidationMethod.DNS_PERSISTENT: + persistent_params = (check_parameters.issuer_domain_names, check_parameters.expected_account_uri) + dcv_check_response = DcvUtils.create_empty_check_response(validation_method) try: @@ -134,7 +151,7 @@ async def perform_general_dns_validation(self, request: DcvCheckRequest) -> DcvC ): lookup = await self.perform_dns_resolution(name_to_resolve, validation_method, dns_record_type) MpicDcvChecker.evaluate_dns_lookup_response( - dcv_check_response, lookup, validation_method, dns_record_type, expected_dns_record_content, exact_match + dcv_check_response, lookup, validation_method, dns_record_type, expected_dns_record_content, exact_match, persistent_params, new_expected_dns_record_content ) except dns.exception.DNSException as e: log_msg = f"DNS lookup error for {name_to_resolve}: {str(e)}. Trace ID: {request.trace_identifier}" @@ -170,7 +187,8 @@ async def perform_dns_resolution(self, name_to_resolve, validation_method, dns_r except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN): domain = domain.parent() else: - lookup = await self.resolver.resolve(qname=name_to_resolve, rdtype=dns_rdata_type) + domain = dns.name.from_text(name_to_resolve) # to ensure trailing dot is added + lookup = await self.resolver.resolve(qname=domain, rdtype=dns_rdata_type) return lookup @staticmethod @@ -314,9 +332,11 @@ def evaluate_dns_lookup_response( dns_response: dns.resolver.Answer, validation_method: DcvValidationMethod, dns_record_type: DnsRecordType, - expected_dns_record_content: str, + expected_dns_record_content: str | None, exact_match: bool = True, - ): + persistent_params: tuple[list[str], str] | None = None, + new_expected_dns_record_content: ExpectedDnsRecordContent | None = None, + ) -> None: if dns_response is None: dcv_check_response.check_passed = False dcv_check_response.check_completed = True @@ -356,24 +376,38 @@ def evaluate_dns_lookup_response( dcv_check_response.details.cname_chain = cname_chain_str dcv_check_response.details.found_at = dns_response.qname.to_text(omit_final_dot=True) - # Case-insensitive comparison for all validation methods except ACME and IP Address - if validation_method not in (DcvValidationMethod.ACME_DNS_01, DcvValidationMethod.IP_ADDRESS): + # Convert expected and actual to lowercase for case-insensitive comparison for all applicable validation methods + if validation_method not in ( + DcvValidationMethod.ACME_DNS_01, + DcvValidationMethod.IP_ADDRESS, + DcvValidationMethod.DNS_PERSISTENT + ): expected_dns_record_content = expected_dns_record_content.lower() records_as_strings = [record.lower() for record in records_as_strings] - # exact_match=True requires at least one record matches and will fail even if whitespace is different. - # exact_match=False simply runs a contains check. - if exact_match: - if validation_method == DcvValidationMethod.IP_ADDRESS: - dcv_check_response.check_passed = MpicDcvChecker.is_expected_ip_address_in_response( - expected_dns_record_content, records_as_strings - ) - else: - dcv_check_response.check_passed = expected_dns_record_content in records_as_strings - else: - dcv_check_response.check_passed = any( - expected_dns_record_content in record for record in records_as_strings + # Some validation methods have special evaluation logic + if validation_method == DcvValidationMethod.IP_ADDRESS: + dcv_check_response.check_passed = MpicDcvChecker.is_expected_ip_address_in_response( + expected_dns_record_content, records_as_strings + ) + elif validation_method == DcvValidationMethod.DNS_PERSISTENT: + # FIXME refactor to abstract more of this into its own method + if persistent_params is None: + dcv_check_response.check_passed = False + return + issuer_domains, account_uri = persistent_params + dcv_check_response.check_passed = MpicDcvChecker.evaluate_persistent_dns_response( + records_as_strings, issuer_domains, account_uri ) + else: + # exact_match=True requires at least one record matches and will fail even if whitespace is different. + # exact_match=False simply runs a contains check. + if exact_match: + dcv_check_response.check_passed = expected_dns_record_content in records_as_strings + else: + dcv_check_response.check_passed = any( + expected_dns_record_content in record for record in records_as_strings + ) dcv_check_response.check_completed = True @staticmethod @@ -397,6 +431,90 @@ def is_expected_ip_address_in_response(ip_address_as_string: str, records_as_str continue return ip_address_found + @staticmethod + def evaluate_persistent_dns_response( + records_as_strings: list[str], + issuer_domain_names: list[str], + expected_account_uri: str + ) -> bool: + """ + Evaluate DNS TXT records for persistent validation per CA/Browser Forum requirements. + Expected format follows RFC 8659 CAA issue-value syntax: + "issuer-domain-name; accounturi=; persistUntil=" + """ + if not issuer_domain_names or not expected_account_uri or not records_as_strings: + return False + + issuer_domain_names = [domain.lower() for domain in issuer_domain_names] + expected_account_uri = expected_account_uri.lower() + + for txt_record in records_as_strings: + # Split on semicolon and strip whitespace from each part + parts = [part.strip() for part in txt_record.split(';')] + + if len(parts) < 2: # Need at least issuer-domain-name and accounturi + continue + + # First part: issuer-domain-name validation + issuer_domain = parts[0].lower() + # Validate domain format (each label must match CAA domain label regex) + domain_labels = issuer_domain.split(".") + domain_label_match_regex = r"^[a-zA-Z0-9]+(-*[a-zA-Z0-9]+)*$" + is_valid_domain = all(re.match(domain_label_match_regex, label) for label in domain_labels) + if not is_valid_domain: + continue + if issuer_domain not in issuer_domain_names: + continue + + # Look for required accounturi parameter + account_uri_found = False + persist_until_valid = True # Assume valid unless proven otherwise + + for part in parts[1:]: + if '=' in part: + param_name, param_value = part.split('=', 1) + param_name = param_name.lower().strip() + param_value = param_value.strip() + + if param_name == 'accounturi': + if param_value.lower() == expected_account_uri: + account_uri_found = True + elif param_name == 'persistuntil': # TODO does MPIC need to evaluate this? + try: + persist_until_timestamp = int(param_value) + persist_until_datetime = datetime.fromtimestamp(persist_until_timestamp) + current_time = datetime.now() + + if current_time >= persist_until_datetime: + persist_until_valid = False + break # Record has expired + except (ValueError, OSError): + persist_until_valid = False + break # Invalid timestamp format + # Additional parameters are ignored per CA/Browser Forum spec + + # Record is valid if issuer matches, account URI matches, and not expired + if account_uri_found and persist_until_valid: + return True + + return False + + @staticmethod + def build_expected_dns_record_content( + validation_method: DcvValidationMethod, + check_parameters, + ) -> ExpectedDnsRecordContent: + expected_dns_record_content: ExpectedDnsRecordContent = ExpectedDnsRecordContent() + if validation_method == DcvValidationMethod.ACME_DNS_01: + expected_dns_record_content.expected_value = check_parameters.key_authorization_hash + elif validation_method == DcvValidationMethod.DNS_PERSISTENT: + expected_dns_record_content.expected_value = None # Validated via issuer_domains and account_uri + expected_dns_record_content.possible_values = check_parameters.issuer_domain_names + expected_dns_record_content.expected_parameters = {"accounturi": check_parameters.expected_account_uri} + else: + expected_dns_record_content.expected_value = check_parameters.challenge_value + return expected_dns_record_content + # noinspection PyUnresolvedReferences @staticmethod def extract_value_from_record(record: dns.rdata.Rdata) -> str: diff --git a/tests/unit/open_mpic_core/test_check_request_parameters.py b/tests/unit/open_mpic_core/test_check_request_parameters.py index 0e7cf8a..5f61b30 100644 --- a/tests/unit/open_mpic_core/test_check_request_parameters.py +++ b/tests/unit/open_mpic_core/test_check_request_parameters.py @@ -5,6 +5,7 @@ DcvAcmeHttp01ValidationParameters, DcvWebsiteChangeValidationParameters, DcvDnsChangeValidationParameters, + DcvDnsPersistentValidationParameters, DcvAcmeDns01ValidationParameters, DcvContactPhoneTxtValidationParameters, DcvContactEmailCaaValidationParameters, @@ -24,6 +25,8 @@ class TestCheckRequestDetails: DcvDnsChangeValidationParameters), ('{"validation_method": "dns-change", "dns_record_type": "CNAME", "challenge_value": "test-cv"}', DcvDnsChangeValidationParameters), + ('{"validation_method": "dns-persistent", "issuer_domain_names": ["authority.example"], "expected_account_uri": "https://authority.example/acct/123"}', + DcvDnsPersistentValidationParameters), ('{"validation_method": "acme-http-01", "token": "test-t", "key_authorization": "test-ka"}', DcvAcmeHttp01ValidationParameters), ('{"validation_method": "acme-dns-01", "key_authorization_hash": "test-ka"}', @@ -57,6 +60,10 @@ def check_request_parameters__should_automatically_deserialize_into_correct_obje "should fail validation when DNS record type is invalid for Contact Phone"), ('{"validation_method": "ip-address", "dns_record_type": "TXT", "challenge_value": "test-cv"}', "should fail validation when DNS record type is invalid like TXT for IP Address"), + ('{"validation_method": "dns-persistent", "expected_account_uri": "https://authority.example/acct/123"}', + "should fail validation when required issuer_domain_names is missing for DNS Persistent"), + ('{"validation_method": "dns-persistent", "issuer_domain_names": ["authority.example"]}', + "should fail validation when required expected_account_uri is missing for DNS Persistent"), ]) # fmt: on def check_request_parameters__should_fail_validation_when_serialized_object_is_malformed( diff --git a/tests/unit/open_mpic_core/test_mpic_caa_request.py b/tests/unit/open_mpic_core/test_mpic_caa_request.py index 2918228..35cfc91 100644 --- a/tests/unit/open_mpic_core/test_mpic_caa_request.py +++ b/tests/unit/open_mpic_core/test_mpic_caa_request.py @@ -15,7 +15,7 @@ class TestMpicCaaRequest: def model_validate_json__should_return_caa_mpic_request_given_valid_caa_json(self): request = ValidMpicRequestCreator.create_valid_caa_mpic_request() - mpic_request = MpicCaaRequest.model_validate_json(json.dumps(request.model_dump())) + mpic_request = MpicCaaRequest.model_validate_json(json.dumps(request.model_dump(warnings=False))) assert mpic_request.domain_or_ip_target == request.domain_or_ip_target def mpic_caa_request__should_require_domain_or_ip_target(self): @@ -23,7 +23,7 @@ def mpic_caa_request__should_require_domain_or_ip_target(self): # noinspection PyTypeChecker request.domain_or_ip_target = None with pytest.raises(pydantic.ValidationError) as validation_error: - MpicCaaRequest.model_validate_json(json.dumps(request.model_dump())) + MpicCaaRequest.model_validate_json(json.dumps(request.model_dump(warnings=False))) assert "domain_or_ip_target" in str(validation_error.value) @pytest.mark.parametrize("certificate_type", ["invalid"]) diff --git a/tests/unit/open_mpic_core/test_mpic_coordinator.py b/tests/unit/open_mpic_core/test_mpic_coordinator.py index 2925593..e6488c7 100644 --- a/tests/unit/open_mpic_core/test_mpic_coordinator.py +++ b/tests/unit/open_mpic_core/test_mpic_coordinator.py @@ -569,7 +569,7 @@ async def coordinate_mpic__should_be_able_to_trace_timing_of_remote_perspective_ log_contents = self.log_output.getvalue() assert all(text in log_contents for text in ["seconds", "TRACE", mpic_coordinator.logger.name]) - async def coordinate_mpi__should_not_log_trace_timings_if_trace_level_logging_is_not_enabled(self): + async def coordinate_mpic__should_not_log_trace_timings_if_trace_level_logging_is_not_enabled(self): mpic_request = ValidMpicRequestCreator.create_valid_caa_mpic_request() mpic_coordinator_config = self.create_mpic_coordinator_configuration() mocked_call_perspective_function = AsyncMock() @@ -600,6 +600,28 @@ async def coordinate_mpic__should_set_mpic_completed_true_if_enough_perspectives assert mpic_response.is_valid is should_complete_mpic assert mpic_response.mpic_completed is should_complete_mpic + # TODO do we need this here? or just in dcv checker tests? this probably doesn't test anything yet untested + async def coordinate_mpic__should_successfully_handle_dns_persistent_dcv_method(self): + mpic_request = ValidMpicRequestCreator.create_valid_dcv_mpic_request( + validation_method=DcvValidationMethod.DNS_PERSISTENT + ) + mpic_coordinator_config = self.create_mpic_coordinator_configuration() + mocked_call_remote_perspective_function = AsyncMock() + mocked_call_remote_perspective_function.side_effect = TestMpicCoordinator.SideEffectForMockedPayloads( + self.create_passing_dcv_check_response + ) + mpic_coordinator = MpicCoordinator(mocked_call_remote_perspective_function, mpic_coordinator_config) + mpic_response = await mpic_coordinator.coordinate_mpic(mpic_request) + + # Verify the response is valid + assert mpic_response.is_valid is True + # Verify DNS_PERSISTENT method was used + assert mpic_request.dcv_check_parameters.validation_method == DcvValidationMethod.DNS_PERSISTENT + # Verify required DNS_PERSISTENT parameters are present + assert hasattr(mpic_request.dcv_check_parameters, "issuer_domain_names") + assert hasattr(mpic_request.dcv_check_parameters, "expected_account_uri") + assert len(mpic_request.dcv_check_parameters.issuer_domain_names) > 0 + @staticmethod def create_mpic_coordinator_configuration() -> MpicCoordinatorConfiguration: target_perspectives = [ @@ -629,6 +651,18 @@ def create_passing_caa_check_response( details=CaaCheckResponseDetails(caa_record_present=False), ) + # noinspection PyUnusedLocal + def create_passing_dcv_check_response( + self, perspective: RemotePerspective, check_type: CheckType, check_request + ): + from open_mpic_core import DcvCheckResponse, DcvCheckResponseDetailsBuilder + validation_method = check_request.dcv_check_parameters.validation_method + return DcvCheckResponse( + check_completed=True, + check_passed=True, + details=DcvCheckResponseDetailsBuilder.build_response_details(validation_method), + ) + # noinspection PyUnusedLocal def create_failing_remote_caa_check_response( self, perspective: RemotePerspective, check_type: CheckType, check_request_serialized: str diff --git a/tests/unit/open_mpic_core/test_mpic_dcv_checker.py b/tests/unit/open_mpic_core/test_mpic_dcv_checker.py index 97b2258..62d2b03 100644 --- a/tests/unit/open_mpic_core/test_mpic_dcv_checker.py +++ b/tests/unit/open_mpic_core/test_mpic_dcv_checker.py @@ -1,6 +1,8 @@ import asyncio import base64 import logging +import time + import dns import random import dns.rdatatype @@ -88,6 +90,7 @@ def mpic_dcv_checker__should_be_able_to_log_at_trace_level(self): (DcvValidationMethod.DNS_CHANGE, DnsRecordType.TXT), (DcvValidationMethod.DNS_CHANGE, DnsRecordType.CNAME), (DcvValidationMethod.DNS_CHANGE, DnsRecordType.CAA), + (DcvValidationMethod.DNS_PERSISTENT, None), (DcvValidationMethod.CONTACT_EMAIL_TXT, None), (DcvValidationMethod.CONTACT_EMAIL_CAA, None), (DcvValidationMethod.CONTACT_PHONE_TXT, None), @@ -120,6 +123,7 @@ async def check_dcv__should_perform_appropriate_check_and_allow_issuance_given_t (DcvValidationMethod.DNS_CHANGE, DnsRecordType.TXT, True), (DcvValidationMethod.DNS_CHANGE, DnsRecordType.CNAME, True), (DcvValidationMethod.DNS_CHANGE, DnsRecordType.CAA, True), + # (DcvValidationMethod.DNS_PERSISTENT, None, True), # Skipped: no challenge_value (DcvValidationMethod.CONTACT_EMAIL_TXT, None, True), (DcvValidationMethod.CONTACT_EMAIL_CAA, None, True), (DcvValidationMethod.CONTACT_PHONE_TXT, None, True), @@ -134,6 +138,9 @@ async def check_dcv__should_perform_appropriate_check_and_allow_issuance_given_t async def check_dcv__should_be_case_insensitive_for_challenge_values_for_all_validation_methods_except_acme( self, dcv_method, record_type, is_case_insensitive, mocker ): + if dcv_method == DcvValidationMethod.DNS_PERSISTENT: + pytest.skip("DNS_PERSISTENT does not use challenge_value for case sensitivity test") + dcv_request = ValidCheckCreator.create_valid_dcv_check_request(dcv_method, record_type) if dcv_method in (DcvValidationMethod.CONTACT_PHONE_TXT, DcvValidationMethod.CONTACT_PHONE_CAA): # technically this should be case-insensitive, but also it would usually have digits... @@ -205,7 +212,7 @@ async def check_dcv__should_handle_domains_with_non_ascii_characters( dcv_request.domain_or_ip_target = encoded_domain # do this first for mocking self._mock_request_specific_dns_resolve_call(dcv_request, mocker) - dcv_request.domain_or_ip_target = domain # set to original to see if the mock triggers as expected + dcv_request.domain_or_ip_target = domain # set to original (mock expects punycode; testing if that happens) dcv_response = await self.dcv_checker.check_dcv(dcv_request) assert dcv_response.check_passed is True @@ -583,40 +590,34 @@ async def dns_validation__should_use_dns_name_prefix_if_provided(self, dns_name_ dcv_response = await self.dcv_checker.perform_general_dns_validation(dcv_request) assert dcv_response.check_passed is True if dns_name_prefix is not None and len(dns_name_prefix) > 0: - mock_dns_resolver_resolve.assert_called_once_with( - qname=f"{dns_name_prefix}.{dcv_request.domain_or_ip_target}", rdtype=dns.rdatatype.TXT - ) + expected_domain = dns.name.from_text(f"{dns_name_prefix}.{dcv_request.domain_or_ip_target}") else: - mock_dns_resolver_resolve.assert_called_once_with( - qname=dcv_request.domain_or_ip_target, rdtype=dns.rdatatype.TXT - ) + expected_domain = dns.name.from_text(dcv_request.domain_or_ip_target) + mock_dns_resolver_resolve.assert_called_once_with(qname=expected_domain, rdtype=dns.rdatatype.TXT) async def acme_dns_validation__should_auto_insert_acme_challenge_prefix(self, mocker): dcv_request = ValidCheckCreator.create_valid_acme_dns_01_check_request() mock_dns_resolver_resolve = self._mock_request_specific_dns_resolve_call(dcv_request, mocker) dcv_response = await self.dcv_checker.perform_general_dns_validation(dcv_request) assert dcv_response.check_passed is True - mock_dns_resolver_resolve.assert_called_once_with( - qname=f"_acme-challenge.{dcv_request.domain_or_ip_target}", rdtype=dns.rdatatype.TXT - ) + expected_domain = dns.name.from_text(f"_acme-challenge.{dcv_request.domain_or_ip_target}") + mock_dns_resolver_resolve.assert_called_once_with(qname=expected_domain, rdtype=dns.rdatatype.TXT) async def contact_email_txt_lookup__should_auto_insert_validation_prefix(self, mocker): dcv_request = ValidCheckCreator.create_valid_contact_check_request(DcvValidationMethod.CONTACT_EMAIL_TXT) mock_dns_resolver_resolve = self._mock_request_specific_dns_resolve_call(dcv_request, mocker) dcv_response = await self.dcv_checker.perform_general_dns_validation(dcv_request) assert dcv_response.check_passed is True - mock_dns_resolver_resolve.assert_called_once_with( - qname=f"_validation-contactemail.{dcv_request.domain_or_ip_target}", rdtype=dns.rdatatype.TXT - ) + expected_domain = dns.name.from_text(f"_validation-contactemail.{dcv_request.domain_or_ip_target}") + mock_dns_resolver_resolve.assert_called_once_with(qname=expected_domain, rdtype=dns.rdatatype.TXT) async def contact_phone_txt_lookup__should_auto_insert_validation_prefix(self, mocker): dcv_request = ValidCheckCreator.create_valid_contact_check_request(DcvValidationMethod.CONTACT_PHONE_TXT) mock_dns_resolver_resolve = self._mock_request_specific_dns_resolve_call(dcv_request, mocker) dcv_response = await self.dcv_checker.perform_general_dns_validation(dcv_request) assert dcv_response.check_passed is True - mock_dns_resolver_resolve.assert_called_once_with( - qname=f"_validation-contactphone.{dcv_request.domain_or_ip_target}", rdtype=dns.rdatatype.TXT - ) + expected_domain = dns.name.from_text(f"_validation-contactphone.{dcv_request.domain_or_ip_target}") + mock_dns_resolver_resolve.assert_called_once_with(qname=expected_domain, rdtype=dns.rdatatype.TXT) # fmt: off @pytest.mark.parametrize("dcv_method, tag, expected_result", [ @@ -897,8 +898,11 @@ def _mock_request_specific_dns_resolve_call(self, dcv_request: DcvCheckRequest, expected_domain = f"_validation-contactphone.{dcv_request.domain_or_ip_target}" case DcvValidationMethod.CONTACT_EMAIL_TXT: expected_domain = f"_validation-contactemail.{dcv_request.domain_or_ip_target}" - case DcvValidationMethod.CONTACT_PHONE_CAA | DcvValidationMethod.CONTACT_EMAIL_CAA: - expected_domain = dns.name.from_text(expected_domain) # CAA -- using dns names instead of strings + case DcvValidationMethod.DNS_PERSISTENT: + expected_domain = f"_validation-persist.{dcv_request.domain_or_ip_target}" + + # expecting a dns name instead of string from the DCV checker (avoiding use of search directive in resolv.conf) + expected_domain = dns.name.from_text(expected_domain) test_dns_query_answer = self._create_basic_dns_response_for_mock(dcv_request, mocker) # noinspection PyUnusedLocal @@ -971,6 +975,12 @@ def _create_basic_dns_response_for_mock(self, dcv_request: DcvCheckRequest, mock record_data = {"flags": "", "tag": "issue", "value": check_parameters.challenge_value} else: record_data = {"value": check_parameters.challenge_value} + case DcvValidationMethod.DNS_PERSISTENT: + issuer_domain = check_parameters.issuer_domain_names[0] + account_uri = check_parameters.expected_account_uri + persist_until = int(time.time()) + 365*24*60*60 # 1 year from now + persistent_value = f"{issuer_domain}; accounturi={account_uri}; persistUntil={persist_until}" + record_data = {"value": persistent_value} case DcvValidationMethod.CONTACT_EMAIL_CAA: record_data = {"flags": "", "tag": "contactemail", "value": check_parameters.challenge_value} case DcvValidationMethod.CONTACT_PHONE_CAA: @@ -1093,5 +1103,146 @@ def shuffle_case(string_to_shuffle: str) -> str: return result +class TestDnsPersistentValidation: + """Tests specifically for DNS_PERSISTENT validation method""" + + @pytest.fixture(autouse=True) + def setup_dcv_checker(self): + self.dcv_checker = MpicDcvChecker() + yield self.dcv_checker + + @pytest.mark.parametrize("record_content, expected_result, description", [ + ("authority.example; accounturi=https://authority.example/acct/123; persistUntil=1893456000", True, + "Valid record with future persistUntil"), + ("authority.example; accounturi=https://authority.example/acct/123", True, + "Valid record without persistUntil (optional parameter)"), + ("AUTHORITY.EXAMPLE; ACCOUNTURI=HTTPS://AUTHORITY.EXAMPLE/ACCT/123", True, + "Valid record with uppercase (case insensitive)"), + ("authority.example; accounturi=https://authority.example/acct/123; customParam=value; persistUntil=1893456000", True, + "Valid record with additional unknown parameters (should be ignored)"), + ("wrong.issuer; accounturi=https://authority.example/acct/123; persistUntil=1893456000", False, + "Wrong issuer domain"), + ("authority.example; accounturi=https://wrong.example/acct/456; persistUntil=1893456000", False, + "Wrong account URI"), + ("authority.example; accounturi=https://authority.example/acct/123; persistUntil=1735689600", False, + "Expired persistUntil (past timestamp)"), + ("authority.example; accounturi=https://authority.example/acct/123; persistUntil=invalid", False, + "Invalid timestamp format"), + ("authority.example; wrongparam=value", False, + "Missing required accounturi parameter"), + ("invalid-domain; accounturi=https://authority.example/acct/123; persistUntil=1893456000", False, + "Invalid issuer domain format"), + ]) + def test_evaluate_persistent_dns_response(self, record_content, expected_result, description): + """Test the persistent DNS validation logic with various record formats""" + from open_mpic_core.mpic_dcv_checker.mpic_dcv_checker import MpicDcvChecker + + issuer_domain_names = ["authority.example"] + expected_account_uri = "https://authority.example/acct/123" + records = [record_content] if record_content else [] + + result = MpicDcvChecker.evaluate_persistent_dns_response(records, issuer_domain_names, expected_account_uri) + assert result == expected_result, f"Test failed: {description}" + + def test_evaluate_persistent_dns_response_multiple_records(self): + """Test that validation passes if any record in the list is valid""" + from open_mpic_core.mpic_dcv_checker.mpic_dcv_checker import MpicDcvChecker + + issuer_domain_names = ["authority.example"] + expected_account_uri = "https://authority.example/acct/123" + records = [ + "wrong.issuer; accounturi=https://authority.example/acct/123; persistUntil=1893456000", # Invalid + "authority.example; accounturi=https://wrong.example/acct/456; persistUntil=1893456000", # Invalid + "authority.example; accounturi=https://authority.example/acct/123; persistUntil=1893456000", # Valid + ] + + result = MpicDcvChecker.evaluate_persistent_dns_response(records, issuer_domain_names, expected_account_uri) + assert result is True, "Should pass if any record is valid" + + def test_evaluate_persistent_dns_response_multiple_issuers(self): + """Test validation with multiple allowed issuer domains""" + from open_mpic_core.mpic_dcv_checker.mpic_dcv_checker import MpicDcvChecker + + issuer_domain_names = ["authority1.example", "authority2.example"] + expected_account_uri = "https://authority2.example/acct/456" + record = "authority2.example; accounturi=https://authority2.example/acct/456; persistUntil=1893456000" + + result = MpicDcvChecker.evaluate_persistent_dns_response([record], issuer_domain_names, expected_account_uri) + assert result is True, "Should pass with second allowed issuer domain" + + def test_evaluate_persistent_dns_response_edge_cases(self): + """Test edge cases for persistent DNS validation""" + from open_mpic_core.mpic_dcv_checker.mpic_dcv_checker import MpicDcvChecker + + issuer_domain_names = ["authority.example"] + expected_account_uri = "https://authority.example/acct/123" + + # Test empty issuer_domain_names + result = MpicDcvChecker.evaluate_persistent_dns_response(["valid.record"], [], expected_account_uri) + assert result is False, "Should fail with empty issuer_domain_names" + + # Test empty account URI + result = MpicDcvChecker.evaluate_persistent_dns_response(["valid.record"], issuer_domain_names, "") + assert result is False, "Should fail with empty account_uri" + + # Test empty records + result = MpicDcvChecker.evaluate_persistent_dns_response([], issuer_domain_names, expected_account_uri) + assert result is False, "Should fail with empty records list" + + # Test malformed records + malformed_records = [ + ";;;", # Only separators + "authority.example;", # Incomplete + "authority.example; =value", # Missing parameter name + "authority.example; accounturi", # Missing value + ] + + for record in malformed_records: + result = MpicDcvChecker.evaluate_persistent_dns_response([record], issuer_domain_names, expected_account_uri) + assert result is False, f"Should fail with malformed record: {record}" + + async def test_dns_persistent_integration(self, mocker): + """Integration test for DNS_PERSISTENT validation through the main check_dcv method""" + dcv_request = ValidCheckCreator.create_valid_dns_persistent_check_request() + + # Mock DNS resolution to return a valid persistent record + issuer_domain = dcv_request.dcv_check_parameters.issuer_domain_names[0] + account_uri = dcv_request.dcv_check_parameters.expected_account_uri + persistent_value = f"{issuer_domain}; accounturi={account_uri}; persistUntil=1893456000" + + # Mock DNS response + from unit.test_util.mock_dns_object_creator import MockDnsObjectCreator + + record_data = {"value": persistent_value} + test_dns_query_answer = MockDnsObjectCreator.create_dns_query_answer( + dcv_request.domain_or_ip_target, + "_validation-persist", + dcv_request.dcv_check_parameters.dns_record_type, + record_data, + mocker + ) + + # Mock the DNS resolver + async def side_effect(qname, rdtype): + if qname == f"_validation-persist.{dcv_request.domain_or_ip_target}": + return test_dns_query_answer + raise dns.resolver.NoAnswer + + mocker.patch.object( + self.dcv_checker.resolver, + 'resolve', + side_effect=side_effect + ) + + # Execute the check + dcv_response = await self.dcv_checker.check_dcv(dcv_request) + + # Verify the response + assert dcv_response.check_passed is True, "DNS persistent validation should pass" + assert dcv_response.check_completed is True, "Check should be completed" + assert dcv_response.errors is None or len(dcv_response.errors) == 0, "Should have no errors" + assert dcv_response.details.records_seen == [persistent_value], "Should see the persistent record" + + if __name__ == "__main__": pytest.main() diff --git a/tests/unit/open_mpic_core/test_mpic_dcv_request.py b/tests/unit/open_mpic_core/test_mpic_dcv_request.py index 4a50b70..34cf81b 100644 --- a/tests/unit/open_mpic_core/test_mpic_dcv_request.py +++ b/tests/unit/open_mpic_core/test_mpic_dcv_request.py @@ -15,7 +15,7 @@ class TestMpicDcvRequest: def model_validate_json__should_return_dcv_mpic_request_given_valid_dcv_json(self): request = ValidMpicRequestCreator.create_valid_dcv_mpic_request() - mpic_request = MpicDcvRequest.model_validate_json(json.dumps(request.model_dump())) + mpic_request = MpicDcvRequest.model_validate_json(json.dumps(request.model_dump(warnings=False))) assert mpic_request.domain_or_ip_target == request.domain_or_ip_target def mpic_dcv_request__should_require_dcv_check_parameters(self): @@ -23,14 +23,14 @@ def mpic_dcv_request__should_require_dcv_check_parameters(self): # noinspection PyTypeChecker request.dcv_check_parameters = None with pytest.raises(pydantic.ValidationError) as validation_error: - MpicDcvRequest.model_validate_json(json.dumps(request.model_dump())) + MpicDcvRequest.model_validate_json(json.dumps(request.model_dump(warnings=False))) assert "dcv_check_parameters" in str(validation_error.value) def mpic_dcv_request__should_require_validation_method_in_check_parameters(self): request = ValidMpicRequestCreator.create_valid_dcv_mpic_request() request.dcv_check_parameters.validation_method = None with pytest.raises(pydantic.ValidationError) as validation_error: - MpicDcvRequest.model_validate_json(json.dumps(request.model_dump())) + MpicDcvRequest.model_validate_json(json.dumps(request.model_dump(warnings=False))) assert "validation_method" in str(validation_error.value) def mpic_dcv_request__should_require_valid_validation_method_in_check_parameters(self): @@ -45,7 +45,7 @@ def mpic_dcv_request__should_require_challenge_value_in_check_parameters(self): # noinspection PyTypeChecker request.dcv_check_parameters.challenge_value = None with pytest.raises(pydantic.ValidationError) as validation_error: - MpicDcvRequest.model_validate_json(json.dumps(request.model_dump())) + MpicDcvRequest.model_validate_json(json.dumps(request.model_dump(warnings=False))) assert "challenge_value" in str(validation_error.value) def mpic_dcv_request__should_require_dns_record_type_for_dns_change_validation(self): @@ -53,7 +53,7 @@ def mpic_dcv_request__should_require_dns_record_type_for_dns_change_validation(s # noinspection PyTypeChecker request.dcv_check_parameters.dns_record_type = None with pytest.raises(pydantic.ValidationError) as validation_error: - MpicDcvRequest.model_validate_json(json.dumps(request.model_dump())) + MpicDcvRequest.model_validate_json(json.dumps(request.model_dump(warnings=False))) assert "dns_record_type" in str(validation_error.value) def mpic_dcv_request__should_require_valid_dns_record_type_for_dns_change_validation(self): @@ -70,7 +70,7 @@ def mpic_dcv_request__should_require_http_token_path_for_website_change_validati # noinspection PyTypeChecker request.dcv_check_parameters.http_token_path = None with pytest.raises(pydantic.ValidationError) as validation_error: - MpicDcvRequest.model_validate_json(json.dumps(request.model_dump())) + MpicDcvRequest.model_validate_json(json.dumps(request.model_dump(warnings=False))) assert "http_token_path" in str(validation_error.value) def mpic_dcv_request__should_require_token_for_acme_http_01_validation(self): @@ -78,7 +78,7 @@ def mpic_dcv_request__should_require_token_for_acme_http_01_validation(self): # noinspection PyTypeChecker request.dcv_check_parameters.token = None with pytest.raises(pydantic.ValidationError) as validation_error: - MpicDcvRequest.model_validate_json(json.dumps(request.model_dump())) + MpicDcvRequest.model_validate_json(json.dumps(request.model_dump(warnings=False))) assert "token" in str(validation_error.value) @pytest.mark.parametrize("validation_method", [DcvValidationMethod.ACME_HTTP_01, DcvValidationMethod.ACME_DNS_01]) @@ -90,12 +90,12 @@ def mpic_dcv_request__should_require_key_authorization_for_acme_validations(self else: request.dcv_check_parameters.key_authorization_hash = None with pytest.raises(pydantic.ValidationError) as validation_error: - MpicDcvRequest.model_validate_json(json.dumps(request.model_dump())) + MpicDcvRequest.model_validate_json(json.dumps(request.model_dump(warnings=False))) assert "key_authorization" in str(validation_error.value) def mpic_dcv_request__should_have_check_type_set_to_dcv(self): request = ValidMpicRequestCreator.create_valid_dcv_mpic_request() - mpic_request = MpicDcvRequest.model_validate_json(json.dumps(request.model_dump())) + mpic_request = MpicDcvRequest.model_validate_json(json.dumps(request.model_dump(warnings=False))) assert mpic_request.check_type == CheckType.DCV def mpic_dcv_request__should_default_to_http_scheme_for_website_change_validation_given_no_scheme_specified(self): @@ -105,7 +105,7 @@ def mpic_dcv_request__should_default_to_http_scheme_for_website_change_validatio challenge_value="test", http_token_path="example-path", ) - mpic_request = MpicDcvRequest.model_validate_json(json.dumps(request.model_dump())) + mpic_request = MpicDcvRequest.model_validate_json(json.dumps(request.model_dump(warnings=False))) assert mpic_request.dcv_check_parameters.url_scheme == UrlScheme.HTTP @pytest.mark.parametrize( @@ -121,7 +121,7 @@ def mpic_dcv_request__should_enforce_domain_prefix_for_contact_lookup_for_txt_re request = ValidMpicRequestCreator.create_valid_dcv_mpic_request(validation_method) request.dcv_check_parameters.dns_name_prefix = "moo" with pytest.raises(pydantic.ValidationError) as validation_error: - MpicDcvRequest.model_validate_json(json.dumps(request.model_dump())) + MpicDcvRequest.model_validate_json(json.dumps(request.model_dump(warnings=False))) assert expected_prefix in str(validation_error.value) diff --git a/tests/unit/test_util/valid_check_creator.py b/tests/unit/test_util/valid_check_creator.py index 7f1dee5..0ed154c 100644 --- a/tests/unit/test_util/valid_check_creator.py +++ b/tests/unit/test_util/valid_check_creator.py @@ -3,6 +3,7 @@ from open_mpic_core import ( DcvWebsiteChangeValidationParameters, DcvDnsChangeValidationParameters, + DcvDnsPersistentValidationParameters, CaaCheckParameters, DcvAcmeHttp01ValidationParameters, DcvAcmeDns01ValidationParameters, @@ -72,6 +73,16 @@ def create_valid_contact_check_request(validation_method: DcvValidationMethod) - # check_request.dcv_check_parameters.require_exact_match = True return check_request + @staticmethod + def create_valid_dns_persistent_check_request() -> DcvCheckRequest: + return DcvCheckRequest( + domain_or_ip_target="example.com", + dcv_check_parameters=DcvDnsPersistentValidationParameters( + issuer_domain_names=["authority.example"], + expected_account_uri="https://authority.example/acct/123" + ), + ) + @staticmethod def create_valid_ip_lookup_check_request(record_type=DnsRecordType.A) -> DcvCheckRequest: check_request = DcvCheckRequest( @@ -127,6 +138,8 @@ def create_valid_dcv_check_request(validation_method: DcvValidationMethod, recor if record_type is None: record_type = DnsRecordType.TXT return ValidCheckCreator.create_valid_dns_check_request(record_type) + case DcvValidationMethod.DNS_PERSISTENT: + return ValidCheckCreator.create_valid_dns_persistent_check_request() case DcvValidationMethod.ACME_HTTP_01: return ValidCheckCreator.create_valid_acme_http_01_check_request() case DcvValidationMethod.ACME_DNS_01: diff --git a/tests/unit/test_util/valid_mpic_request_creator.py b/tests/unit/test_util/valid_mpic_request_creator.py index 328542e..e5f637c 100644 --- a/tests/unit/test_util/valid_mpic_request_creator.py +++ b/tests/unit/test_util/valid_mpic_request_creator.py @@ -1,6 +1,7 @@ from open_mpic_core import ( CaaCheckParameters, DcvDnsChangeValidationParameters, + DcvDnsPersistentValidationParameters, DcvWebsiteChangeValidationParameters, DcvAcmeDns01ValidationParameters, DcvAcmeHttp01ValidationParameters, @@ -66,4 +67,9 @@ def create_check_parameters( check_parameters = DcvContactEmailCaaValidationParameters(challenge_value="test") case DcvValidationMethod.CONTACT_EMAIL_TXT: check_parameters = DcvContactEmailTxtValidationParameters(challenge_value="test") + case DcvValidationMethod.DNS_PERSISTENT: + check_parameters = DcvDnsPersistentValidationParameters( + issuer_domain_names=["authority.example"], + expected_account_uri="https://authority.example/acct/123" + ) return check_parameters From c532bc7f91cec691f3a13687696a09a26a5be086 Mon Sep 17 00:00:00 2001 From: Dmitry Sharkov Date: Mon, 22 Dec 2025 15:04:29 -0500 Subject: [PATCH 2/9] refactored tests and got them actually firing for DCV dns persist method --- pyproject.toml | 2 - .../mpic_dcv_checker/mpic_dcv_checker.py | 162 ++++--- .../open_mpic_core/test_mpic_dcv_checker.py | 410 +++++++++--------- 3 files changed, 290 insertions(+), 284 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 5712f14..517eefc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -99,8 +99,6 @@ markers = [ addopts = [ "--import-mode=prepend", # explicit default, as the tests rely on it for proper import resolution ] -spec_header_format = "Spec for {test_case} ({path}):" -spec_test_format = "{result} {docstring_summary}" # defaults to {name} if docstring is not present in test asyncio_mode = "auto" # defaults to "strict" asyncio_default_fixture_loop_scope = "function" diff --git a/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py b/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py index 0289307..d5a04a0 100644 --- a/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py +++ b/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py @@ -1,7 +1,6 @@ import asyncio import time from contextlib import asynccontextmanager -from datetime import datetime import dns.asyncresolver import requests @@ -27,6 +26,13 @@ class ExpectedDnsRecordContent: + def __init__(self, expected_value=None, possible_values=None, expected_parameters=None): + self.expected_value = expected_value + self.possible_values = None + if self.expected_value is None: + self.possible_values = possible_values + self.expected_parameters = expected_parameters + expected_value: str | None = None, possible_values: list[str] | None = None, expected_parameters: dict[str, str] | None = None @@ -123,14 +129,7 @@ async def perform_general_dns_validation(self, request: DcvCheckRequest) -> DcvC else: name_to_resolve = request.domain_or_ip_target - if validation_method == DcvValidationMethod.ACME_DNS_01: - expected_dns_record_content = check_parameters.key_authorization_hash - elif validation_method == DcvValidationMethod.DNS_PERSISTENT: - expected_dns_record_content = None # Validated via issuer_domains and account_uri - else: - expected_dns_record_content = check_parameters.challenge_value - - new_expected_dns_record_content = MpicDcvChecker.build_expected_dns_record_content( + expected_dns_record_content = MpicDcvChecker.build_expected_dns_record_content( validation_method, check_parameters ) @@ -151,7 +150,7 @@ async def perform_general_dns_validation(self, request: DcvCheckRequest) -> DcvC ): lookup = await self.perform_dns_resolution(name_to_resolve, validation_method, dns_record_type) MpicDcvChecker.evaluate_dns_lookup_response( - dcv_check_response, lookup, validation_method, dns_record_type, expected_dns_record_content, exact_match, persistent_params, new_expected_dns_record_content + dcv_check_response, lookup, validation_method, dns_record_type, expected_dns_record_content, exact_match ) except dns.exception.DNSException as e: log_msg = f"DNS lookup error for {name_to_resolve}: {str(e)}. Trace ID: {request.trace_identifier}" @@ -211,14 +210,14 @@ async def perform_http_based_validation(self, request: DcvCheckRequest) -> DcvCh expected_response_content = request.dcv_check_parameters.challenge_value url_scheme = request.dcv_check_parameters.url_scheme token_path = request.dcv_check_parameters.http_token_path - token_url = f"{url_scheme}://{formatted_host}/{MpicDcvChecker.WELL_KNOWN_PKI_PATH}/{token_path}" # noqa E501 (http) + token_url = ( + f"{url_scheme}://{formatted_host}/{MpicDcvChecker.WELL_KNOWN_PKI_PATH}/{token_path}" # noqa E501 (http) + ) dcv_check_response = DcvUtils.create_empty_check_response(DcvValidationMethod.WEBSITE_CHANGE) else: expected_response_content = request.dcv_check_parameters.key_authorization token = request.dcv_check_parameters.token - token_url = ( - f"http://{formatted_host}/{MpicDcvChecker.WELL_KNOWN_ACME_PATH}/{token}" # noqa E501 (http) - ) + token_url = f"http://{formatted_host}/{MpicDcvChecker.WELL_KNOWN_ACME_PATH}/{token}" # noqa E501 (http) dcv_check_response = DcvUtils.create_empty_check_response(DcvValidationMethod.ACME_HTTP_01) try: async with self.get_async_http_client() as async_http_client: @@ -332,10 +331,8 @@ def evaluate_dns_lookup_response( dns_response: dns.resolver.Answer, validation_method: DcvValidationMethod, dns_record_type: DnsRecordType, - expected_dns_record_content: str | None, + expected_dns_record_content: ExpectedDnsRecordContent | None, exact_match: bool = True, - persistent_params: tuple[list[str], str] | None = None, - new_expected_dns_record_content: ExpectedDnsRecordContent | None = None, ) -> None: if dns_response is None: dcv_check_response.check_passed = False @@ -376,38 +373,31 @@ def evaluate_dns_lookup_response( dcv_check_response.details.cname_chain = cname_chain_str dcv_check_response.details.found_at = dns_response.qname.to_text(omit_final_dot=True) - # Convert expected and actual to lowercase for case-insensitive comparison for all applicable validation methods - if validation_method not in ( - DcvValidationMethod.ACME_DNS_01, - DcvValidationMethod.IP_ADDRESS, - DcvValidationMethod.DNS_PERSISTENT - ): - expected_dns_record_content = expected_dns_record_content.lower() - records_as_strings = [record.lower() for record in records_as_strings] - - # Some validation methods have special evaluation logic + # handle "special logic" validation methods first if validation_method == DcvValidationMethod.IP_ADDRESS: dcv_check_response.check_passed = MpicDcvChecker.is_expected_ip_address_in_response( - expected_dns_record_content, records_as_strings + expected_dns_record_content.expected_value, records_as_strings ) elif validation_method == DcvValidationMethod.DNS_PERSISTENT: - # FIXME refactor to abstract more of this into its own method - if persistent_params is None: - dcv_check_response.check_passed = False - return - issuer_domains, account_uri = persistent_params dcv_check_response.check_passed = MpicDcvChecker.evaluate_persistent_dns_response( - records_as_strings, issuer_domains, account_uri + expected_dns_record_content, records_as_strings ) else: + if validation_method == DcvValidationMethod.ACME_DNS_01: + expected_dns_value = expected_dns_record_content.expected_value # case-sensitive per ACME spec + else: + expected_dns_value = expected_dns_record_content.expected_value.lower() # all others case-insensitive + records_as_strings = [record.lower() for record in records_as_strings] + # exact_match=True requires at least one record matches and will fail even if whitespace is different. # exact_match=False simply runs a contains check. if exact_match: - dcv_check_response.check_passed = expected_dns_record_content in records_as_strings + dcv_check_response.check_passed = expected_dns_value in records_as_strings else: dcv_check_response.check_passed = any( - expected_dns_record_content in record for record in records_as_strings + expected_dns_value in record for record in records_as_strings ) + dcv_check_response.check_completed = True @staticmethod @@ -433,87 +423,83 @@ def is_expected_ip_address_in_response(ip_address_as_string: str, records_as_str @staticmethod def evaluate_persistent_dns_response( - records_as_strings: list[str], - issuer_domain_names: list[str], - expected_account_uri: str + expected_dns_record_content: ExpectedDnsRecordContent, + records_as_strings: list[str], ) -> bool: """ Evaluate DNS TXT records for persistent validation per CA/Browser Forum requirements. Expected format follows RFC 8659 CAA issue-value syntax: "issuer-domain-name; accounturi=; persistUntil=" + The persistUntil parameter is optional. """ - if not issuer_domain_names or not expected_account_uri or not records_as_strings: - return False - - issuer_domain_names = [domain.lower() for domain in issuer_domain_names] - expected_account_uri = expected_account_uri.lower() + found_valid_record = False + accepted_domain_names = [domain.lower() for domain in expected_dns_record_content.possible_values] + expected_account_uri = expected_dns_record_content.expected_parameters['accounturi'].lower() for txt_record in records_as_strings: - # Split on semicolon and strip whitespace from each part - parts = [part.strip() for part in txt_record.split(';')] + # Split on semicolon (parameter delimiter) and strip whitespace from each part + parts = [part.strip() for part in txt_record.rstrip().split(";")] + if len(parts) < 2: + continue # Need at least issuer-domain-name and one parameter - if len(parts) < 2: # Need at least issuer-domain-name and accounturi - continue + issuer_domain_name = parts[0].lower() + param_list = parts[1:] - # First part: issuer-domain-name validation - issuer_domain = parts[0].lower() - # Validate domain format (each label must match CAA domain label regex) - domain_labels = issuer_domain.split(".") - domain_label_match_regex = r"^[a-zA-Z0-9]+(-*[a-zA-Z0-9]+)*$" - is_valid_domain = all(re.match(domain_label_match_regex, label) for label in domain_labels) - if not is_valid_domain: - continue - if issuer_domain not in issuer_domain_names: + # First check issuer-domain-nsame matches one of the expected values + if issuer_domain_name not in accepted_domain_names: continue # Look for required accounturi parameter - account_uri_found = False - persist_until_valid = True # Assume valid unless proven otherwise - - for part in parts[1:]: - if '=' in part: - param_name, param_value = part.split('=', 1) - param_name = param_name.lower().strip() - param_value = param_value.strip() - - if param_name == 'accounturi': + valid_account_uri = False + within_allowed_time = True # Assume valid unless proven otherwise + + if not (len(param_list) == 1 and param_list[0].strip() == ""): # if actual parameters follow the semicolon + for parameter in param_list: + name_and_value = parameter.split("=", 1) + if len(name_and_value) != 2: + break # malformed parameter; skip to next record + param_name = name_and_value[0].strip().lower() + param_value = name_and_value[1].strip() + + if param_name == "accounturi": if param_value.lower() == expected_account_uri: - account_uri_found = True - elif param_name == 'persistuntil': # TODO does MPIC need to evaluate this? + valid_account_uri = True + else: + break # accounturi does not match; skip to next record + elif param_name == "persistuntil": try: - persist_until_timestamp = int(param_value) - persist_until_datetime = datetime.fromtimestamp(persist_until_timestamp) - current_time = datetime.now() - - if current_time >= persist_until_datetime: - persist_until_valid = False - break # Record has expired + persist_until_in_seconds = int(param_value) # seconds since epoch + current_seconds = int(time.time()) + if persist_until_in_seconds < current_seconds: + within_allowed_time = False + break except (ValueError, OSError): - persist_until_valid = False + within_allowed_time = False break # Invalid timestamp format - # Additional parameters are ignored per CA/Browser Forum spec + # Additional parameters are ignored per CA/Browser Forum spec # Record is valid if issuer matches, account URI matches, and not expired - if account_uri_found and persist_until_valid: - return True + if valid_account_uri and within_allowed_time: + found_valid_record = True - return False + return found_valid_record @staticmethod def build_expected_dns_record_content( validation_method: DcvValidationMethod, check_parameters, ) -> ExpectedDnsRecordContent: - expected_dns_record_content: ExpectedDnsRecordContent = ExpectedDnsRecordContent() if validation_method == DcvValidationMethod.ACME_DNS_01: - expected_dns_record_content.expected_value = check_parameters.key_authorization_hash + expected_content = ExpectedDnsRecordContent(expected_value=check_parameters.key_authorization_hash) elif validation_method == DcvValidationMethod.DNS_PERSISTENT: - expected_dns_record_content.expected_value = None # Validated via issuer_domains and account_uri - expected_dns_record_content.possible_values = check_parameters.issuer_domain_names - expected_dns_record_content.expected_parameters = {"accounturi": check_parameters.expected_account_uri} + expected_content = ExpectedDnsRecordContent( + expected_value=None, # validated via issuer_domains and account_uri + possible_values=check_parameters.issuer_domain_names, + expected_parameters={"accounturi": check_parameters.expected_account_uri}, + ) else: - expected_dns_record_content.expected_value = check_parameters.challenge_value - return expected_dns_record_content + expected_content = ExpectedDnsRecordContent(expected_value=check_parameters.challenge_value) + return expected_content # noinspection PyUnresolvedReferences @staticmethod diff --git a/tests/unit/open_mpic_core/test_mpic_dcv_checker.py b/tests/unit/open_mpic_core/test_mpic_dcv_checker.py index 62d2b03..42ae7f2 100644 --- a/tests/unit/open_mpic_core/test_mpic_dcv_checker.py +++ b/tests/unit/open_mpic_core/test_mpic_dcv_checker.py @@ -28,6 +28,7 @@ from open_mpic_core import DcvTlsAlpnValidator, DcvCheckResponseDetailsBuilder from open_mpic_core import DcvValidationMethod, DnsRecordType from open_mpic_core import MpicValidationError, ErrorMessages, TRACE_LEVEL +from open_mpic_core.mpic_dcv_checker.mpic_dcv_checker import ExpectedDnsRecordContent from unit.test_util.mock_dns_object_creator import MockDnsObjectCreator from unit.test_util.valid_check_creator import ValidCheckCreator @@ -476,6 +477,56 @@ async def http_based_dcv_checks__should_not_pass_on_invalid_redirect_code_or_por dcv_response = await self.dcv_checker.check_dcv(dcv_request) assert dcv_response.check_passed is False + @pytest.mark.parametrize("dcv_method", [DcvValidationMethod.WEBSITE_CHANGE, DcvValidationMethod.ACME_HTTP_01]) + async def http_based_dcv_checks__should_format_ipv6_addresses_with_square_brackets_in_url(self, dcv_method, mocker): + dcv_request = ValidCheckCreator.create_valid_dcv_check_request(dcv_method) + ipv6_address = "2001:db8::1" + dcv_request.domain_or_ip_target = ipv6_address + + if dcv_method == DcvValidationMethod.WEBSITE_CHANGE: + expected_challenge = dcv_request.dcv_check_parameters.challenge_value + url_scheme = dcv_request.dcv_check_parameters.url_scheme + http_token_path = dcv_request.dcv_check_parameters.http_token_path + expected_url = f"{url_scheme}://[{ipv6_address}]/{MpicDcvChecker.WELL_KNOWN_PKI_PATH}/{http_token_path}" + else: + expected_challenge = dcv_request.dcv_check_parameters.key_authorization + token = dcv_request.dcv_check_parameters.token + expected_url = f"http://[{ipv6_address}]/{MpicDcvChecker.WELL_KNOWN_ACME_PATH}/{token}" + + success_response = TestMpicDcvChecker._create_mock_http_response(200, expected_challenge) + mock_get = mocker.patch( + "aiohttp.ClientSession.get", + side_effect=lambda *args, **kwargs: AsyncMock(__aenter__=AsyncMock(return_value=success_response)), + ) + + dcv_response = await self.dcv_checker.check_dcv(dcv_request) + + # Verify the URL used in the request contains properly formatted IPv6 + assert mock_get.call_args.kwargs["url"] == expected_url + assert dcv_response.details.response_url == expected_url + + async def http_based_dcv_checks__should_not_modify_ipv4_addresses_in_url(self, mocker): + dcv_request = ValidCheckCreator.create_valid_dcv_check_request(DcvValidationMethod.WEBSITE_CHANGE) + ipv4_address = "192.168.1.1" + dcv_request.domain_or_ip_target = ipv4_address + + expected_challenge = dcv_request.dcv_check_parameters.challenge_value + url_scheme = dcv_request.dcv_check_parameters.url_scheme + http_token_path = dcv_request.dcv_check_parameters.http_token_path + expected_url = f"{url_scheme}://{ipv4_address}/{MpicDcvChecker.WELL_KNOWN_PKI_PATH}/{http_token_path}" + + success_response = TestMpicDcvChecker._create_mock_http_response(200, expected_challenge) + mock_get = mocker.patch( + "aiohttp.ClientSession.get", + side_effect=lambda *args, **kwargs: AsyncMock(__aenter__=AsyncMock(return_value=success_response)), + ) + + dcv_response = await self.dcv_checker.check_dcv(dcv_request) + + # Verify the URL used in the request contains IPv4 without modification + assert mock_get.call_args.kwargs["url"] == expected_url + assert dcv_response.details.response_url == expected_url + @pytest.mark.parametrize("url_scheme", ["http", "https"]) async def website_change_validation__should_use_specified_url_scheme(self, url_scheme, mocker): dcv_request = ValidCheckCreator.create_valid_http_check_request() @@ -674,6 +725,170 @@ async def dns_based_dcv_checks__should_not_pass_given_non_matching_dns_record(se dcv_response = await self.dcv_checker.check_dcv(dcv_request) assert dcv_response.check_passed is False + @pytest.mark.parametrize("set_persist_until_parameter", [True, False]) + def evaluate_persistent_dns_response__should_return_true_given_valid_record( + self, set_persist_until_parameter + ): + issuer_domain_name = "ca.example.com" + expected_account_uri = "https://ca.example.com/acct/123" + + if set_persist_until_parameter: + future_timestamp = int(time.time()) + 3600 # 1 hour in the future + records = [f"{issuer_domain_name}; accounturi={expected_account_uri}; persistUntil={future_timestamp}"] + else: + records = [f"{issuer_domain_name}; accounturi={expected_account_uri}"] + + expected_dns_record_content = ExpectedDnsRecordContent( + possible_values=[issuer_domain_name], + expected_parameters={"accounturi": expected_account_uri}, + ) + + result = MpicDcvChecker.evaluate_persistent_dns_response(expected_dns_record_content, records) + assert result is True + + def evaluate_persistent_dns_response__should_be_case_insensitive(self): + issuer_domain_name = "cA.EXaMPle.com" + expected_account_uri = "https://cA.EXaMPle.com/acct/123" + records = [f"{issuer_domain_name}; accounturi={expected_account_uri}"] + + expected_dns_record_content = ExpectedDnsRecordContent( + possible_values=[issuer_domain_name.lower()], + expected_parameters={"accounturi": expected_account_uri.lower()}, + ) + + result = MpicDcvChecker.evaluate_persistent_dns_response(expected_dns_record_content, records) + assert result is True + + def evaluate_persistent_dns_response__should_ignore_additional_unknown_parameters(self): + issuer_domain_name = "ca.example.com" + expected_account_uri = "https://ca.example.com/acct/123" + records = [f"{issuer_domain_name}; accounturi={expected_account_uri}; customParam=foo; anotherParam=123"] + + expected_dns_record_content = ExpectedDnsRecordContent( + possible_values=[issuer_domain_name], + expected_parameters={"accounturi": expected_account_uri}, + ) + + result = MpicDcvChecker.evaluate_persistent_dns_response(expected_dns_record_content, records) + assert result is True + + def evaluate_persistent_dns_response__should_return_false_given_no_matching_issuer_domain(self): + issuer_domain_name = "nonmatching.example.com" + expected_account_uri = "https://ca.example.com/acct/123" + records = [f"{issuer_domain_name}; accounturi={expected_account_uri};"] + + expected_dns_record_content = ExpectedDnsRecordContent( + possible_values=["ca.example.com"], + expected_parameters={"accounturi": expected_account_uri}, + ) + + result = MpicDcvChecker.evaluate_persistent_dns_response(expected_dns_record_content, records) + assert result is False + + def evaluate_persistent_dns_response__should_return_false_given_no_matching_account_uri(self): + issuer_domain_name = "ca.example.com" + expected_account_uri = "https://ca.example.com/acct/foo123" + records = [f"{issuer_domain_name}; accounturi=https://ca.example.com/acct/bar456;"] + + expected_dns_record_content = ExpectedDnsRecordContent( + possible_values=[issuer_domain_name], + expected_parameters={"accounturi": expected_account_uri}, + ) + + result = MpicDcvChecker.evaluate_persistent_dns_response(expected_dns_record_content, records) + assert result is False + + def evaluate_persistent_dns_response__should_return_false_given_expired_persist_until(self): + issuer_domain_name = "ca.example.com" + expected_account_uri = "https://ca.example.com/acct/123" + past_timestamp = int(time.time()) - 3600 # 1 hour in the past + records = [f"{issuer_domain_name}; accounturi={expected_account_uri}; persistUntil={past_timestamp}"] + + expected_dns_record_content = ExpectedDnsRecordContent( + possible_values=[issuer_domain_name], + expected_parameters={"accounturi": expected_account_uri}, + ) + + result = MpicDcvChecker.evaluate_persistent_dns_response(expected_dns_record_content, records) + assert result is False + + def evaluate_persistent_dns_response__should_return_false_given_missing_account_uri_parameter(self): + issuer_domain_name = "ca.example.com" + records = [f"{issuer_domain_name}; persistUntil={int(time.time())+3600}"] + + expected_dns_record_content = ExpectedDnsRecordContent( + possible_values=[issuer_domain_name], + expected_parameters={"accounturi": "https://ca.example.com/acct/123"}, + ) + + result = MpicDcvChecker.evaluate_persistent_dns_response(expected_dns_record_content, records) + assert result is False + + def evaluate_persistent_dns_response__should_return_false_given_malformed_persist_until_parameter(self): + expected_account_uri = "https://ca.example.com/acct/123" + records = [f"accounturi={expected_account_uri}; persistUntil={int(time.time())+3600}garbage"] + + expected_dns_record_content = ExpectedDnsRecordContent( + possible_values=["ca.example.com"], + expected_parameters={"accounturi": expected_account_uri}, + ) + + result = MpicDcvChecker.evaluate_persistent_dns_response(expected_dns_record_content, records) + assert result is False + + def evaluate_persistent_dns_response__should_return_true_given_any_record_in_the_provided_list_is_valid(self): + issuer_domain_names = ["ca.example.com", "ca1.example.com"] + expected_account_uri = "https://ca.example.com/acct/123" + time_now = int(time.time()) + records = [ + f"bad.example.com; accounturi=https://ca.example.com/acct/123; persistUntil={time_now + 3600}", + f"ca.example.com; accounturi=https://bad.example.com/acct/456; persistUntil={time_now + 3600}", + f"ca1.example.com; accounturi=https://ca.example.com/acct/123; persistUntil={time_now - 10}", + f"ca.example.com; accounturi=https://ca.example.com/acct/123; persistUntil={time_now + 3600}", # Valid + ] + + expected_dns_record_content = ExpectedDnsRecordContent( + possible_values=issuer_domain_names, + expected_parameters={"accounturi": expected_account_uri}, + ) + + result = MpicDcvChecker.evaluate_persistent_dns_response(expected_dns_record_content, records) + assert result is True, "Should pass if any record is valid" + + def evaluate_persistent_dns_response__should_accept_match_for_any_issuer_in_the_provided_list(self): + issuer_domain_names = ["ca.example.com", "alt.example.com"] + expected_account_uri = "https://ca.example.com/acct/123" + time_now = int(time.time()) + records = [f"alt.example.com; accounturi=https://ca.example.com/acct/123; persistUntil={time_now + 3600}"] + + expected_dns_record_content = ExpectedDnsRecordContent( + possible_values=issuer_domain_names, + expected_parameters={"accounturi": expected_account_uri}, + ) + + result = MpicDcvChecker.evaluate_persistent_dns_response(expected_dns_record_content, records) + assert result is True, "Should pass with second allowed issuer domain" + + def evaluate_persistent_dns_response__should_return_false_given_malformed_record(self): + issuer_domain_names = ["ca.example"] + expected_account_uri = "https://ca.example/acct/123" + malformed_records = [ + ";;;", # Only separators + "ca.example", # Missing parameters + "ca.example;", # Parameter separator but no parameters + "ca.example; =value", # Missing parameter name + "ca.example; accounturi", # Missing value + ] + + expected_dns_record_content = ExpectedDnsRecordContent( + possible_values=issuer_domain_names, + expected_parameters={"accounturi": expected_account_uri}, + ) + + for record in malformed_records: + result = MpicDcvChecker.evaluate_persistent_dns_response(expected_dns_record_content, [record]) + assert result is False, f"Should fail with malformed record: {record}" + @pytest.mark.parametrize("dcv_method", [DcvValidationMethod.DNS_CHANGE, DcvValidationMethod.ACME_DNS_01]) async def dns_based_dcv_checks__should_return_timestamp_and_list_of_records_seen(self, dcv_method, mocker): dcv_request = ValidCheckCreator.create_valid_dcv_check_request(dcv_method) @@ -748,7 +963,7 @@ async def dns_based_dcv_checks__should_not_pass_with_errors_given_exception_rais async def is_expected_ip_address_in_response__should_return_true_if_valid_record_exists_alongside_malformed_records( self, record_type ): - records_as_strings = ["foo", "bar"] + records_as_strings = ["foo", "bar", "1.1.1.1"] if record_type is DnsRecordType.A: expected_record = "1.2.3.4" records_as_strings.append(expected_record) @@ -1036,58 +1251,6 @@ def format_host_for_url__should_wrap_ipv6_addresses_in_square_brackets(self, inp result = MpicDcvChecker.format_host_for_url(input_target) assert result == expected_output - @pytest.mark.parametrize("dcv_method", [DcvValidationMethod.WEBSITE_CHANGE, DcvValidationMethod.ACME_HTTP_01]) - async def http_based_dcv_checks__should_format_ipv6_addresses_with_square_brackets_in_url( - self, dcv_method, mocker - ): - dcv_request = ValidCheckCreator.create_valid_dcv_check_request(dcv_method) - ipv6_address = "2001:db8::1" - dcv_request.domain_or_ip_target = ipv6_address - - if dcv_method == DcvValidationMethod.WEBSITE_CHANGE: - expected_challenge = dcv_request.dcv_check_parameters.challenge_value - url_scheme = dcv_request.dcv_check_parameters.url_scheme - http_token_path = dcv_request.dcv_check_parameters.http_token_path - expected_url = f"{url_scheme}://[{ipv6_address}]/{MpicDcvChecker.WELL_KNOWN_PKI_PATH}/{http_token_path}" - else: - expected_challenge = dcv_request.dcv_check_parameters.key_authorization - token = dcv_request.dcv_check_parameters.token - expected_url = f"http://[{ipv6_address}]/{MpicDcvChecker.WELL_KNOWN_ACME_PATH}/{token}" - - success_response = TestMpicDcvChecker._create_mock_http_response(200, expected_challenge) - mock_get = mocker.patch( - "aiohttp.ClientSession.get", - side_effect=lambda *args, **kwargs: AsyncMock(__aenter__=AsyncMock(return_value=success_response)), - ) - - dcv_response = await self.dcv_checker.check_dcv(dcv_request) - - # Verify the URL used in the request contains properly formatted IPv6 - assert mock_get.call_args.kwargs["url"] == expected_url - assert dcv_response.details.response_url == expected_url - - async def http_based_dcv_checks__should_not_modify_ipv4_addresses_in_url(self, mocker): - dcv_request = ValidCheckCreator.create_valid_dcv_check_request(DcvValidationMethod.WEBSITE_CHANGE) - ipv4_address = "192.168.1.1" - dcv_request.domain_or_ip_target = ipv4_address - - expected_challenge = dcv_request.dcv_check_parameters.challenge_value - url_scheme = dcv_request.dcv_check_parameters.url_scheme - http_token_path = dcv_request.dcv_check_parameters.http_token_path - expected_url = f"{url_scheme}://{ipv4_address}/{MpicDcvChecker.WELL_KNOWN_PKI_PATH}/{http_token_path}" - - success_response = TestMpicDcvChecker._create_mock_http_response(200, expected_challenge) - mock_get = mocker.patch( - "aiohttp.ClientSession.get", - side_effect=lambda *args, **kwargs: AsyncMock(__aenter__=AsyncMock(return_value=success_response)), - ) - - dcv_response = await self.dcv_checker.check_dcv(dcv_request) - - # Verify the URL used in the request contains IPv4 without modification - assert mock_get.call_args.kwargs["url"] == expected_url - assert dcv_response.details.response_url == expected_url - @staticmethod def shuffle_case(string_to_shuffle: str) -> str: result = "".join( @@ -1103,146 +1266,5 @@ def shuffle_case(string_to_shuffle: str) -> str: return result -class TestDnsPersistentValidation: - """Tests specifically for DNS_PERSISTENT validation method""" - - @pytest.fixture(autouse=True) - def setup_dcv_checker(self): - self.dcv_checker = MpicDcvChecker() - yield self.dcv_checker - - @pytest.mark.parametrize("record_content, expected_result, description", [ - ("authority.example; accounturi=https://authority.example/acct/123; persistUntil=1893456000", True, - "Valid record with future persistUntil"), - ("authority.example; accounturi=https://authority.example/acct/123", True, - "Valid record without persistUntil (optional parameter)"), - ("AUTHORITY.EXAMPLE; ACCOUNTURI=HTTPS://AUTHORITY.EXAMPLE/ACCT/123", True, - "Valid record with uppercase (case insensitive)"), - ("authority.example; accounturi=https://authority.example/acct/123; customParam=value; persistUntil=1893456000", True, - "Valid record with additional unknown parameters (should be ignored)"), - ("wrong.issuer; accounturi=https://authority.example/acct/123; persistUntil=1893456000", False, - "Wrong issuer domain"), - ("authority.example; accounturi=https://wrong.example/acct/456; persistUntil=1893456000", False, - "Wrong account URI"), - ("authority.example; accounturi=https://authority.example/acct/123; persistUntil=1735689600", False, - "Expired persistUntil (past timestamp)"), - ("authority.example; accounturi=https://authority.example/acct/123; persistUntil=invalid", False, - "Invalid timestamp format"), - ("authority.example; wrongparam=value", False, - "Missing required accounturi parameter"), - ("invalid-domain; accounturi=https://authority.example/acct/123; persistUntil=1893456000", False, - "Invalid issuer domain format"), - ]) - def test_evaluate_persistent_dns_response(self, record_content, expected_result, description): - """Test the persistent DNS validation logic with various record formats""" - from open_mpic_core.mpic_dcv_checker.mpic_dcv_checker import MpicDcvChecker - - issuer_domain_names = ["authority.example"] - expected_account_uri = "https://authority.example/acct/123" - records = [record_content] if record_content else [] - - result = MpicDcvChecker.evaluate_persistent_dns_response(records, issuer_domain_names, expected_account_uri) - assert result == expected_result, f"Test failed: {description}" - - def test_evaluate_persistent_dns_response_multiple_records(self): - """Test that validation passes if any record in the list is valid""" - from open_mpic_core.mpic_dcv_checker.mpic_dcv_checker import MpicDcvChecker - - issuer_domain_names = ["authority.example"] - expected_account_uri = "https://authority.example/acct/123" - records = [ - "wrong.issuer; accounturi=https://authority.example/acct/123; persistUntil=1893456000", # Invalid - "authority.example; accounturi=https://wrong.example/acct/456; persistUntil=1893456000", # Invalid - "authority.example; accounturi=https://authority.example/acct/123; persistUntil=1893456000", # Valid - ] - - result = MpicDcvChecker.evaluate_persistent_dns_response(records, issuer_domain_names, expected_account_uri) - assert result is True, "Should pass if any record is valid" - - def test_evaluate_persistent_dns_response_multiple_issuers(self): - """Test validation with multiple allowed issuer domains""" - from open_mpic_core.mpic_dcv_checker.mpic_dcv_checker import MpicDcvChecker - - issuer_domain_names = ["authority1.example", "authority2.example"] - expected_account_uri = "https://authority2.example/acct/456" - record = "authority2.example; accounturi=https://authority2.example/acct/456; persistUntil=1893456000" - - result = MpicDcvChecker.evaluate_persistent_dns_response([record], issuer_domain_names, expected_account_uri) - assert result is True, "Should pass with second allowed issuer domain" - - def test_evaluate_persistent_dns_response_edge_cases(self): - """Test edge cases for persistent DNS validation""" - from open_mpic_core.mpic_dcv_checker.mpic_dcv_checker import MpicDcvChecker - - issuer_domain_names = ["authority.example"] - expected_account_uri = "https://authority.example/acct/123" - - # Test empty issuer_domain_names - result = MpicDcvChecker.evaluate_persistent_dns_response(["valid.record"], [], expected_account_uri) - assert result is False, "Should fail with empty issuer_domain_names" - - # Test empty account URI - result = MpicDcvChecker.evaluate_persistent_dns_response(["valid.record"], issuer_domain_names, "") - assert result is False, "Should fail with empty account_uri" - - # Test empty records - result = MpicDcvChecker.evaluate_persistent_dns_response([], issuer_domain_names, expected_account_uri) - assert result is False, "Should fail with empty records list" - - # Test malformed records - malformed_records = [ - ";;;", # Only separators - "authority.example;", # Incomplete - "authority.example; =value", # Missing parameter name - "authority.example; accounturi", # Missing value - ] - - for record in malformed_records: - result = MpicDcvChecker.evaluate_persistent_dns_response([record], issuer_domain_names, expected_account_uri) - assert result is False, f"Should fail with malformed record: {record}" - - async def test_dns_persistent_integration(self, mocker): - """Integration test for DNS_PERSISTENT validation through the main check_dcv method""" - dcv_request = ValidCheckCreator.create_valid_dns_persistent_check_request() - - # Mock DNS resolution to return a valid persistent record - issuer_domain = dcv_request.dcv_check_parameters.issuer_domain_names[0] - account_uri = dcv_request.dcv_check_parameters.expected_account_uri - persistent_value = f"{issuer_domain}; accounturi={account_uri}; persistUntil=1893456000" - - # Mock DNS response - from unit.test_util.mock_dns_object_creator import MockDnsObjectCreator - - record_data = {"value": persistent_value} - test_dns_query_answer = MockDnsObjectCreator.create_dns_query_answer( - dcv_request.domain_or_ip_target, - "_validation-persist", - dcv_request.dcv_check_parameters.dns_record_type, - record_data, - mocker - ) - - # Mock the DNS resolver - async def side_effect(qname, rdtype): - if qname == f"_validation-persist.{dcv_request.domain_or_ip_target}": - return test_dns_query_answer - raise dns.resolver.NoAnswer - - mocker.patch.object( - self.dcv_checker.resolver, - 'resolve', - side_effect=side_effect - ) - - # Execute the check - dcv_response = await self.dcv_checker.check_dcv(dcv_request) - - # Verify the response - assert dcv_response.check_passed is True, "DNS persistent validation should pass" - assert dcv_response.check_completed is True, "Check should be completed" - assert dcv_response.errors is None or len(dcv_response.errors) == 0, "Should have no errors" - assert dcv_response.details.records_seen == [persistent_value], "Should see the persistent record" - - if __name__ == "__main__": pytest.main() From 1c4dfdcd9aea5e1cb758e580eca0d4b49448fb0e Mon Sep 17 00:00:00 2001 From: Dmitry Sharkov Date: Mon, 22 Dec 2025 15:31:39 -0500 Subject: [PATCH 3/9] bumped versions of implemented API spec and of this library --- pyproject.toml | 2 +- src/open_mpic_core/__about__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 517eefc..51ff95b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -57,7 +57,7 @@ build.targets.wheel.packages = ["src/open_mpic_core"] "./tests/unit/test_util" = "open_mpic_core_test/test_util" # include tests in the wheel to facilitate integration testing in wrapper projects [tool.api] -spec_version = "3.7.0" +spec_version = "3.8.0" spec_repository = "https://github.com/open-mpic/open-mpic-specification" [tool.hatch.envs.default] diff --git a/src/open_mpic_core/__about__.py b/src/open_mpic_core/__about__.py index 0a895f3..833eacf 100644 --- a/src/open_mpic_core/__about__.py +++ b/src/open_mpic_core/__about__.py @@ -1 +1 @@ -__version__ = "6.2.0" +__version__ = "6.3.0" From 696b6bb523e7933d8c38427b58cff974366b431b Mon Sep 17 00:00:00 2001 From: Dmitry Sharkov Date: Mon, 22 Dec 2025 17:27:07 -0500 Subject: [PATCH 4/9] removed unnecessary field declaration --- src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py b/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py index d5a04a0..3c49013 100644 --- a/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py +++ b/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py @@ -33,10 +33,6 @@ def __init__(self, expected_value=None, possible_values=None, expected_parameter self.possible_values = possible_values self.expected_parameters = expected_parameters - expected_value: str | None = None, - possible_values: list[str] | None = None, - expected_parameters: dict[str, str] | None = None - # noinspection PyUnusedLocal class MpicDcvChecker: From 1bb47704cfebd51b932e220914c745dde7df3ae7 Mon Sep 17 00:00:00 2001 From: Dmitry Sharkov Date: Mon, 22 Dec 2025 17:34:03 -0500 Subject: [PATCH 5/9] removed unnecessary test. fixed typo. --- .../mpic_dcv_checker/mpic_dcv_checker.py | 2 +- .../open_mpic_core/test_mpic_coordinator.py | 22 ------------------- 2 files changed, 1 insertion(+), 23 deletions(-) diff --git a/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py b/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py index 3c49013..74ebd40 100644 --- a/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py +++ b/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py @@ -441,7 +441,7 @@ def evaluate_persistent_dns_response( issuer_domain_name = parts[0].lower() param_list = parts[1:] - # First check issuer-domain-nsame matches one of the expected values + # First check issuer-domain-name matches one of the expected values if issuer_domain_name not in accepted_domain_names: continue diff --git a/tests/unit/open_mpic_core/test_mpic_coordinator.py b/tests/unit/open_mpic_core/test_mpic_coordinator.py index e6488c7..be37e79 100644 --- a/tests/unit/open_mpic_core/test_mpic_coordinator.py +++ b/tests/unit/open_mpic_core/test_mpic_coordinator.py @@ -600,28 +600,6 @@ async def coordinate_mpic__should_set_mpic_completed_true_if_enough_perspectives assert mpic_response.is_valid is should_complete_mpic assert mpic_response.mpic_completed is should_complete_mpic - # TODO do we need this here? or just in dcv checker tests? this probably doesn't test anything yet untested - async def coordinate_mpic__should_successfully_handle_dns_persistent_dcv_method(self): - mpic_request = ValidMpicRequestCreator.create_valid_dcv_mpic_request( - validation_method=DcvValidationMethod.DNS_PERSISTENT - ) - mpic_coordinator_config = self.create_mpic_coordinator_configuration() - mocked_call_remote_perspective_function = AsyncMock() - mocked_call_remote_perspective_function.side_effect = TestMpicCoordinator.SideEffectForMockedPayloads( - self.create_passing_dcv_check_response - ) - mpic_coordinator = MpicCoordinator(mocked_call_remote_perspective_function, mpic_coordinator_config) - mpic_response = await mpic_coordinator.coordinate_mpic(mpic_request) - - # Verify the response is valid - assert mpic_response.is_valid is True - # Verify DNS_PERSISTENT method was used - assert mpic_request.dcv_check_parameters.validation_method == DcvValidationMethod.DNS_PERSISTENT - # Verify required DNS_PERSISTENT parameters are present - assert hasattr(mpic_request.dcv_check_parameters, "issuer_domain_names") - assert hasattr(mpic_request.dcv_check_parameters, "expected_account_uri") - assert len(mpic_request.dcv_check_parameters.issuer_domain_names) > 0 - @staticmethod def create_mpic_coordinator_configuration() -> MpicCoordinatorConfiguration: target_perspectives = [ From a5d32c2f601ad8a3bae9f39735f37dd140884b30 Mon Sep 17 00:00:00 2001 From: Dmitry Sharkov Date: Mon, 22 Dec 2025 17:36:30 -0500 Subject: [PATCH 6/9] cleaned up a test --- tests/unit/open_mpic_core/test_mpic_dcv_checker.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/unit/open_mpic_core/test_mpic_dcv_checker.py b/tests/unit/open_mpic_core/test_mpic_dcv_checker.py index 42ae7f2..244a3aa 100644 --- a/tests/unit/open_mpic_core/test_mpic_dcv_checker.py +++ b/tests/unit/open_mpic_core/test_mpic_dcv_checker.py @@ -825,11 +825,12 @@ def evaluate_persistent_dns_response__should_return_false_given_missing_account_ assert result is False def evaluate_persistent_dns_response__should_return_false_given_malformed_persist_until_parameter(self): + issuer_domain_name = "ca.example.com" expected_account_uri = "https://ca.example.com/acct/123" - records = [f"accounturi={expected_account_uri}; persistUntil={int(time.time())+3600}garbage"] + records = [f"{issuer_domain_name}; accounturi={expected_account_uri}; persistUntil={int(time.time())+3600}foo"] expected_dns_record_content = ExpectedDnsRecordContent( - possible_values=["ca.example.com"], + possible_values=[issuer_domain_name], expected_parameters={"accounturi": expected_account_uri}, ) From b28a28782b1ca6de47987514072eb2c10ecfc3d6 Mon Sep 17 00:00:00 2001 From: Dmitry Sharkov Date: Mon, 22 Dec 2025 17:37:39 -0500 Subject: [PATCH 7/9] removed unused variable --- src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py b/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py index 74ebd40..2ec4934 100644 --- a/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py +++ b/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py @@ -132,11 +132,6 @@ async def perform_general_dns_validation(self, request: DcvCheckRequest) -> DcvC if validation_method == DcvValidationMethod.DNS_CHANGE: # DNS_CHANGE may allow for non-exact match exact_match = check_parameters.require_exact_match - # Extract persistent validation parameters if needed - persistent_params = None - if validation_method == DcvValidationMethod.DNS_PERSISTENT: - persistent_params = (check_parameters.issuer_domain_names, check_parameters.expected_account_uri) - dcv_check_response = DcvUtils.create_empty_check_response(validation_method) try: From 28b84870a1e67eec7163b14d7a06b341a1881cec Mon Sep 17 00:00:00 2001 From: Dmitry Sharkov Date: Mon, 22 Dec 2025 17:38:17 -0500 Subject: [PATCH 8/9] removed unused import from mpic_dcv_checker --- src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py b/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py index 2ec4934..d18ffee 100644 --- a/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py +++ b/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py @@ -14,7 +14,7 @@ from aiohttp import ClientError from aiohttp.web import HTTPException -from open_mpic_core import DcvCheckRequest, DcvCheckResponse, DcvCheckParameters +from open_mpic_core import DcvCheckRequest, DcvCheckResponse from open_mpic_core import RedirectResponse, DcvUtils from open_mpic_core import DcvValidationMethod, DnsRecordType from open_mpic_core import MpicValidationError, ErrorMessages @@ -484,7 +484,7 @@ def build_expected_dns_record_content( expected_content = ExpectedDnsRecordContent(expected_value=check_parameters.key_authorization_hash) elif validation_method == DcvValidationMethod.DNS_PERSISTENT: expected_content = ExpectedDnsRecordContent( - expected_value=None, # validated via issuer_domains and account_uri + expected_value=None, # validated via issuer_domains and account_uri possible_values=check_parameters.issuer_domain_names, expected_parameters={"accounturi": check_parameters.expected_account_uri}, ) From 73d23abad6113f7a49a7087f9651677b09a905f0 Mon Sep 17 00:00:00 2001 From: Dmitry Sharkov Date: Mon, 12 Jan 2026 02:23:40 -0500 Subject: [PATCH 9/9] upgraded aiohttp to 3.13.3 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 51ff95b..3f55cd8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,7 +27,7 @@ dependencies = [ "requests==2.32.4", "dnspython==2.7.0", "pydantic==2.11.7", - "aiohttp==3.12.14", + "aiohttp==3.13.3", "black==25.1.0", "cryptography==45.0.4", ]