diff --git a/pyproject.toml b/pyproject.toml index 5712f14..3f55cd8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,7 +27,7 @@ dependencies = [ "requests==2.32.4", "dnspython==2.7.0", "pydantic==2.11.7", - "aiohttp==3.12.14", + "aiohttp==3.13.3", "black==25.1.0", "cryptography==45.0.4", ] @@ -57,7 +57,7 @@ build.targets.wheel.packages = ["src/open_mpic_core"] "./tests/unit/test_util" = "open_mpic_core_test/test_util" # include tests in the wheel to facilitate integration testing in wrapper projects [tool.api] -spec_version = "3.7.0" +spec_version = "3.8.0" spec_repository = "https://github.com/open-mpic/open-mpic-specification" [tool.hatch.envs.default] @@ -99,8 +99,6 @@ markers = [ addopts = [ "--import-mode=prepend", # explicit default, as the tests rely on it for proper import resolution ] -spec_header_format = "Spec for {test_case} ({path}):" -spec_test_format = "{result} {docstring_summary}" # defaults to {name} if docstring is not present in test asyncio_mode = "auto" # defaults to "strict" asyncio_default_fixture_loop_scope = "function" diff --git a/src/open_mpic_core/__about__.py b/src/open_mpic_core/__about__.py index 0a895f3..833eacf 100644 --- a/src/open_mpic_core/__about__.py +++ b/src/open_mpic_core/__about__.py @@ -1 +1 @@ -__version__ = "6.2.0" +__version__ = "6.3.0" diff --git a/src/open_mpic_core/__init__.py b/src/open_mpic_core/__init__.py index ca3c1a0..2d55948 100644 --- a/src/open_mpic_core/__init__.py +++ b/src/open_mpic_core/__init__.py @@ -13,6 +13,7 @@ DcvAcmeHttp01ValidationParameters, DcvWebsiteChangeValidationParameters, DcvDnsChangeValidationParameters, + DcvDnsPersistentValidationParameters, DcvAcmeDns01ValidationParameters, DcvContactPhoneTxtValidationParameters, DcvContactEmailCaaValidationParameters, diff --git a/src/open_mpic_core/common_domain/check_parameters.py b/src/open_mpic_core/common_domain/check_parameters.py index 103952f..c23e419 100644 --- a/src/open_mpic_core/common_domain/check_parameters.py +++ b/src/open_mpic_core/common_domain/check_parameters.py @@ -54,6 +54,14 @@ def validate_record_type(cls, v: DnsRecordType) -> DnsRecordType: return v +class DcvDnsPersistentValidationParameters(DcvValidationParameters): + validation_method: Literal[DcvValidationMethod.DNS_PERSISTENT] = DcvValidationMethod.DNS_PERSISTENT + dns_record_type: Literal[DnsRecordType.TXT] = DnsRecordType.TXT + dns_name_prefix: Literal["_validation-persist"] = "_validation-persist" + issuer_domain_names: list[str] # Disclosed issuer domain names from CA's CP/CPS + expected_account_uri: str # The specific account URI to validate + + class DcvContactEmailTxtValidationParameters(DcvGeneralDnsValidationParameters): validation_method: Literal[DcvValidationMethod.CONTACT_EMAIL_TXT] = DcvValidationMethod.CONTACT_EMAIL_TXT dns_record_type: Literal[DnsRecordType.TXT] = DnsRecordType.TXT @@ -117,6 +125,7 @@ class DcvAcmeTlsAlpn01ValidationParameters(DcvValidationParameters): Union[ DcvWebsiteChangeValidationParameters, DcvDnsChangeValidationParameters, + DcvDnsPersistentValidationParameters, DcvAcmeHttp01ValidationParameters, DcvAcmeDns01ValidationParameters, DcvAcmeTlsAlpn01ValidationParameters, diff --git a/src/open_mpic_core/common_domain/check_response_details.py b/src/open_mpic_core/common_domain/check_response_details.py index a09b753..8ced953 100644 --- a/src/open_mpic_core/common_domain/check_response_details.py +++ b/src/open_mpic_core/common_domain/check_response_details.py @@ -27,12 +27,14 @@ class DcvHttpCheckResponseDetails(BaseModel): class DcvDnsCheckResponseDetails(BaseModel): validation_method: Literal[ DcvValidationMethod.DNS_CHANGE, + DcvValidationMethod.DNS_PERSISTENT, DcvValidationMethod.IP_ADDRESS, DcvValidationMethod.CONTACT_EMAIL_CAA, DcvValidationMethod.CONTACT_EMAIL_TXT, DcvValidationMethod.CONTACT_PHONE_CAA, DcvValidationMethod.CONTACT_PHONE_TXT, DcvValidationMethod.ACME_DNS_01, + DcvValidationMethod.DNS_ACCOUNT_01, DcvValidationMethod.REVERSE_ADDRESS_LOOKUP, ] records_seen: list[str] | None = None # list of records found in DNS query; not base64 encoded @@ -41,6 +43,7 @@ class DcvDnsCheckResponseDetails(BaseModel): found_at: str | None = None # domain where DNS record was found cname_chain: list[str] | None = None # List of CNAMEs followed to obtain the final result. + class DcvTlsAlpnCheckResponseDetails(BaseModel): validation_method: Literal[DcvValidationMethod.ACME_TLS_ALPN_01] common_name: str | None = None # common name seen in certificate. @@ -56,8 +59,10 @@ def build_response_details(validation_method: DcvValidationMethod) -> DcvCheckRe types = { DcvValidationMethod.WEBSITE_CHANGE: DcvHttpCheckResponseDetails, DcvValidationMethod.DNS_CHANGE: DcvDnsCheckResponseDetails, + DcvValidationMethod.DNS_PERSISTENT: DcvDnsCheckResponseDetails, DcvValidationMethod.ACME_HTTP_01: DcvHttpCheckResponseDetails, DcvValidationMethod.ACME_DNS_01: DcvDnsCheckResponseDetails, + DcvValidationMethod.DNS_ACCOUNT_01: DcvDnsCheckResponseDetails, DcvValidationMethod.ACME_TLS_ALPN_01: DcvTlsAlpnCheckResponseDetails, DcvValidationMethod.CONTACT_PHONE_TXT: DcvDnsCheckResponseDetails, DcvValidationMethod.CONTACT_PHONE_CAA: DcvDnsCheckResponseDetails, diff --git a/src/open_mpic_core/common_domain/enum/dcv_validation_method.py b/src/open_mpic_core/common_domain/enum/dcv_validation_method.py index b0cdcc2..781d889 100644 --- a/src/open_mpic_core/common_domain/enum/dcv_validation_method.py +++ b/src/open_mpic_core/common_domain/enum/dcv_validation_method.py @@ -2,11 +2,13 @@ class DcvValidationMethod(StrEnum): - WEBSITE_CHANGE = 'website-change' + WEBSITE_CHANGE = 'website-change' # CABF BRs 3.2.2.4.18 Agreed-Upon Change to Website v2 DNS_CHANGE = 'dns-change' # CNAME, TXT, or CAA record - ACME_HTTP_01 = 'acme-http-01' + DNS_PERSISTENT = 'dns-persistent' # CABF BRs 3.2.2.4.22 DNS TXT Record with Persistent Value + ACME_HTTP_01 = 'acme-http-01' # CABF BRs 3.2.2.4.19 Agreed-Upon Change to Website - ACME ACME_DNS_01 = 'acme-dns-01' # TXT record - ACME_TLS_ALPN_01 = 'acme-tls-alpn-01' + ACME_TLS_ALPN_01 = 'acme-tls-alpn-01' # CABF BRs 3.2.2.4.20 TLS Using ALPN + DNS_ACCOUNT_01 = 'dns-account-01' # CABF BRs 3.2.2.4.21 DNS Labeled with Account ID - ACME TODO not yet implemented CONTACT_EMAIL_CAA = 'contact-email-caa' CONTACT_EMAIL_TXT = 'contact-email-txt' CONTACT_PHONE_CAA = 'contact-phone-caa' diff --git a/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py b/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py index eac0e50..d18ffee 100644 --- a/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py +++ b/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py @@ -25,6 +25,15 @@ logger = get_logger(__name__) +class ExpectedDnsRecordContent: + def __init__(self, expected_value=None, possible_values=None, expected_parameters=None): + self.expected_value = expected_value + self.possible_values = None + if self.expected_value is None: + self.possible_values = possible_values + self.expected_parameters = expected_parameters + + # noinspection PyUnusedLocal class MpicDcvChecker: WELL_KNOWN_PKI_PATH = ".well-known/pki-validation" @@ -92,11 +101,10 @@ async def check_dcv(self, dcv_request: DcvCheckRequest) -> DcvCheckResponse: result = await self.perform_http_based_validation(dcv_request) case DcvValidationMethod.ACME_TLS_ALPN_01: result = await self.acme_tls_alpn_validator.perform_tls_alpn_validation(dcv_request) - case _: # ACME_DNS_01 | DNS_CHANGE | IP_LOOKUP | CONTACT_EMAIL | CONTACT_PHONE | REVERSE_ADDRESS_LOOKUP + case _: # all DNS based methods result = await self.perform_general_dns_validation(dcv_request) # noinspection PyUnresolvedReferences - self.logger.trace( "Completed DCV for %s with method %s. Trace ID: %s", dcv_request.domain_or_ip_target, @@ -117,12 +125,11 @@ async def perform_general_dns_validation(self, request: DcvCheckRequest) -> DcvC else: name_to_resolve = request.domain_or_ip_target - if validation_method == DcvValidationMethod.ACME_DNS_01: - expected_dns_record_content = check_parameters.key_authorization_hash - else: - expected_dns_record_content = check_parameters.challenge_value + expected_dns_record_content = MpicDcvChecker.build_expected_dns_record_content( + validation_method, check_parameters + ) - if validation_method == DcvValidationMethod.DNS_CHANGE: + if validation_method == DcvValidationMethod.DNS_CHANGE: # DNS_CHANGE may allow for non-exact match exact_match = check_parameters.require_exact_match dcv_check_response = DcvUtils.create_empty_check_response(validation_method) @@ -170,7 +177,8 @@ async def perform_dns_resolution(self, name_to_resolve, validation_method, dns_r except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN): domain = domain.parent() else: - lookup = await self.resolver.resolve(qname=name_to_resolve, rdtype=dns_rdata_type) + domain = dns.name.from_text(name_to_resolve) # to ensure trailing dot is added + lookup = await self.resolver.resolve(qname=domain, rdtype=dns_rdata_type) return lookup @staticmethod @@ -193,14 +201,14 @@ async def perform_http_based_validation(self, request: DcvCheckRequest) -> DcvCh expected_response_content = request.dcv_check_parameters.challenge_value url_scheme = request.dcv_check_parameters.url_scheme token_path = request.dcv_check_parameters.http_token_path - token_url = f"{url_scheme}://{formatted_host}/{MpicDcvChecker.WELL_KNOWN_PKI_PATH}/{token_path}" # noqa E501 (http) + token_url = ( + f"{url_scheme}://{formatted_host}/{MpicDcvChecker.WELL_KNOWN_PKI_PATH}/{token_path}" # noqa E501 (http) + ) dcv_check_response = DcvUtils.create_empty_check_response(DcvValidationMethod.WEBSITE_CHANGE) else: expected_response_content = request.dcv_check_parameters.key_authorization token = request.dcv_check_parameters.token - token_url = ( - f"http://{formatted_host}/{MpicDcvChecker.WELL_KNOWN_ACME_PATH}/{token}" # noqa E501 (http) - ) + token_url = f"http://{formatted_host}/{MpicDcvChecker.WELL_KNOWN_ACME_PATH}/{token}" # noqa E501 (http) dcv_check_response = DcvUtils.create_empty_check_response(DcvValidationMethod.ACME_HTTP_01) try: async with self.get_async_http_client() as async_http_client: @@ -314,9 +322,9 @@ def evaluate_dns_lookup_response( dns_response: dns.resolver.Answer, validation_method: DcvValidationMethod, dns_record_type: DnsRecordType, - expected_dns_record_content: str, + expected_dns_record_content: ExpectedDnsRecordContent | None, exact_match: bool = True, - ): + ) -> None: if dns_response is None: dcv_check_response.check_passed = False dcv_check_response.check_completed = True @@ -356,24 +364,31 @@ def evaluate_dns_lookup_response( dcv_check_response.details.cname_chain = cname_chain_str dcv_check_response.details.found_at = dns_response.qname.to_text(omit_final_dot=True) - # Case-insensitive comparison for all validation methods except ACME and IP Address - if validation_method not in (DcvValidationMethod.ACME_DNS_01, DcvValidationMethod.IP_ADDRESS): - expected_dns_record_content = expected_dns_record_content.lower() - records_as_strings = [record.lower() for record in records_as_strings] - - # exact_match=True requires at least one record matches and will fail even if whitespace is different. - # exact_match=False simply runs a contains check. - if exact_match: - if validation_method == DcvValidationMethod.IP_ADDRESS: - dcv_check_response.check_passed = MpicDcvChecker.is_expected_ip_address_in_response( - expected_dns_record_content, records_as_strings - ) - else: - dcv_check_response.check_passed = expected_dns_record_content in records_as_strings - else: - dcv_check_response.check_passed = any( - expected_dns_record_content in record for record in records_as_strings + # handle "special logic" validation methods first + if validation_method == DcvValidationMethod.IP_ADDRESS: + dcv_check_response.check_passed = MpicDcvChecker.is_expected_ip_address_in_response( + expected_dns_record_content.expected_value, records_as_strings + ) + elif validation_method == DcvValidationMethod.DNS_PERSISTENT: + dcv_check_response.check_passed = MpicDcvChecker.evaluate_persistent_dns_response( + expected_dns_record_content, records_as_strings ) + else: + if validation_method == DcvValidationMethod.ACME_DNS_01: + expected_dns_value = expected_dns_record_content.expected_value # case-sensitive per ACME spec + else: + expected_dns_value = expected_dns_record_content.expected_value.lower() # all others case-insensitive + records_as_strings = [record.lower() for record in records_as_strings] + + # exact_match=True requires at least one record matches and will fail even if whitespace is different. + # exact_match=False simply runs a contains check. + if exact_match: + dcv_check_response.check_passed = expected_dns_value in records_as_strings + else: + dcv_check_response.check_passed = any( + expected_dns_value in record for record in records_as_strings + ) + dcv_check_response.check_completed = True @staticmethod @@ -397,6 +412,86 @@ def is_expected_ip_address_in_response(ip_address_as_string: str, records_as_str continue return ip_address_found + @staticmethod + def evaluate_persistent_dns_response( + expected_dns_record_content: ExpectedDnsRecordContent, + records_as_strings: list[str], + ) -> bool: + """ + Evaluate DNS TXT records for persistent validation per CA/Browser Forum requirements. + Expected format follows RFC 8659 CAA issue-value syntax: + "issuer-domain-name; accounturi=; persistUntil=" + The persistUntil parameter is optional. + """ + found_valid_record = False + accepted_domain_names = [domain.lower() for domain in expected_dns_record_content.possible_values] + expected_account_uri = expected_dns_record_content.expected_parameters['accounturi'].lower() + + for txt_record in records_as_strings: + # Split on semicolon (parameter delimiter) and strip whitespace from each part + parts = [part.strip() for part in txt_record.rstrip().split(";")] + if len(parts) < 2: + continue # Need at least issuer-domain-name and one parameter + + issuer_domain_name = parts[0].lower() + param_list = parts[1:] + + # First check issuer-domain-name matches one of the expected values + if issuer_domain_name not in accepted_domain_names: + continue + + # Look for required accounturi parameter + valid_account_uri = False + within_allowed_time = True # Assume valid unless proven otherwise + + if not (len(param_list) == 1 and param_list[0].strip() == ""): # if actual parameters follow the semicolon + for parameter in param_list: + name_and_value = parameter.split("=", 1) + if len(name_and_value) != 2: + break # malformed parameter; skip to next record + param_name = name_and_value[0].strip().lower() + param_value = name_and_value[1].strip() + + if param_name == "accounturi": + if param_value.lower() == expected_account_uri: + valid_account_uri = True + else: + break # accounturi does not match; skip to next record + elif param_name == "persistuntil": + try: + persist_until_in_seconds = int(param_value) # seconds since epoch + current_seconds = int(time.time()) + if persist_until_in_seconds < current_seconds: + within_allowed_time = False + break + except (ValueError, OSError): + within_allowed_time = False + break # Invalid timestamp format + # Additional parameters are ignored per CA/Browser Forum spec + + # Record is valid if issuer matches, account URI matches, and not expired + if valid_account_uri and within_allowed_time: + found_valid_record = True + + return found_valid_record + + @staticmethod + def build_expected_dns_record_content( + validation_method: DcvValidationMethod, + check_parameters, + ) -> ExpectedDnsRecordContent: + if validation_method == DcvValidationMethod.ACME_DNS_01: + expected_content = ExpectedDnsRecordContent(expected_value=check_parameters.key_authorization_hash) + elif validation_method == DcvValidationMethod.DNS_PERSISTENT: + expected_content = ExpectedDnsRecordContent( + expected_value=None, # validated via issuer_domains and account_uri + possible_values=check_parameters.issuer_domain_names, + expected_parameters={"accounturi": check_parameters.expected_account_uri}, + ) + else: + expected_content = ExpectedDnsRecordContent(expected_value=check_parameters.challenge_value) + return expected_content + # noinspection PyUnresolvedReferences @staticmethod def extract_value_from_record(record: dns.rdata.Rdata) -> str: diff --git a/tests/unit/open_mpic_core/test_check_request_parameters.py b/tests/unit/open_mpic_core/test_check_request_parameters.py index 0e7cf8a..5f61b30 100644 --- a/tests/unit/open_mpic_core/test_check_request_parameters.py +++ b/tests/unit/open_mpic_core/test_check_request_parameters.py @@ -5,6 +5,7 @@ DcvAcmeHttp01ValidationParameters, DcvWebsiteChangeValidationParameters, DcvDnsChangeValidationParameters, + DcvDnsPersistentValidationParameters, DcvAcmeDns01ValidationParameters, DcvContactPhoneTxtValidationParameters, DcvContactEmailCaaValidationParameters, @@ -24,6 +25,8 @@ class TestCheckRequestDetails: DcvDnsChangeValidationParameters), ('{"validation_method": "dns-change", "dns_record_type": "CNAME", "challenge_value": "test-cv"}', DcvDnsChangeValidationParameters), + ('{"validation_method": "dns-persistent", "issuer_domain_names": ["authority.example"], "expected_account_uri": "https://authority.example/acct/123"}', + DcvDnsPersistentValidationParameters), ('{"validation_method": "acme-http-01", "token": "test-t", "key_authorization": "test-ka"}', DcvAcmeHttp01ValidationParameters), ('{"validation_method": "acme-dns-01", "key_authorization_hash": "test-ka"}', @@ -57,6 +60,10 @@ def check_request_parameters__should_automatically_deserialize_into_correct_obje "should fail validation when DNS record type is invalid for Contact Phone"), ('{"validation_method": "ip-address", "dns_record_type": "TXT", "challenge_value": "test-cv"}', "should fail validation when DNS record type is invalid like TXT for IP Address"), + ('{"validation_method": "dns-persistent", "expected_account_uri": "https://authority.example/acct/123"}', + "should fail validation when required issuer_domain_names is missing for DNS Persistent"), + ('{"validation_method": "dns-persistent", "issuer_domain_names": ["authority.example"]}', + "should fail validation when required expected_account_uri is missing for DNS Persistent"), ]) # fmt: on def check_request_parameters__should_fail_validation_when_serialized_object_is_malformed( diff --git a/tests/unit/open_mpic_core/test_mpic_caa_request.py b/tests/unit/open_mpic_core/test_mpic_caa_request.py index 2918228..35cfc91 100644 --- a/tests/unit/open_mpic_core/test_mpic_caa_request.py +++ b/tests/unit/open_mpic_core/test_mpic_caa_request.py @@ -15,7 +15,7 @@ class TestMpicCaaRequest: def model_validate_json__should_return_caa_mpic_request_given_valid_caa_json(self): request = ValidMpicRequestCreator.create_valid_caa_mpic_request() - mpic_request = MpicCaaRequest.model_validate_json(json.dumps(request.model_dump())) + mpic_request = MpicCaaRequest.model_validate_json(json.dumps(request.model_dump(warnings=False))) assert mpic_request.domain_or_ip_target == request.domain_or_ip_target def mpic_caa_request__should_require_domain_or_ip_target(self): @@ -23,7 +23,7 @@ def mpic_caa_request__should_require_domain_or_ip_target(self): # noinspection PyTypeChecker request.domain_or_ip_target = None with pytest.raises(pydantic.ValidationError) as validation_error: - MpicCaaRequest.model_validate_json(json.dumps(request.model_dump())) + MpicCaaRequest.model_validate_json(json.dumps(request.model_dump(warnings=False))) assert "domain_or_ip_target" in str(validation_error.value) @pytest.mark.parametrize("certificate_type", ["invalid"]) diff --git a/tests/unit/open_mpic_core/test_mpic_coordinator.py b/tests/unit/open_mpic_core/test_mpic_coordinator.py index 2925593..be37e79 100644 --- a/tests/unit/open_mpic_core/test_mpic_coordinator.py +++ b/tests/unit/open_mpic_core/test_mpic_coordinator.py @@ -569,7 +569,7 @@ async def coordinate_mpic__should_be_able_to_trace_timing_of_remote_perspective_ log_contents = self.log_output.getvalue() assert all(text in log_contents for text in ["seconds", "TRACE", mpic_coordinator.logger.name]) - async def coordinate_mpi__should_not_log_trace_timings_if_trace_level_logging_is_not_enabled(self): + async def coordinate_mpic__should_not_log_trace_timings_if_trace_level_logging_is_not_enabled(self): mpic_request = ValidMpicRequestCreator.create_valid_caa_mpic_request() mpic_coordinator_config = self.create_mpic_coordinator_configuration() mocked_call_perspective_function = AsyncMock() @@ -629,6 +629,18 @@ def create_passing_caa_check_response( details=CaaCheckResponseDetails(caa_record_present=False), ) + # noinspection PyUnusedLocal + def create_passing_dcv_check_response( + self, perspective: RemotePerspective, check_type: CheckType, check_request + ): + from open_mpic_core import DcvCheckResponse, DcvCheckResponseDetailsBuilder + validation_method = check_request.dcv_check_parameters.validation_method + return DcvCheckResponse( + check_completed=True, + check_passed=True, + details=DcvCheckResponseDetailsBuilder.build_response_details(validation_method), + ) + # noinspection PyUnusedLocal def create_failing_remote_caa_check_response( self, perspective: RemotePerspective, check_type: CheckType, check_request_serialized: str diff --git a/tests/unit/open_mpic_core/test_mpic_dcv_checker.py b/tests/unit/open_mpic_core/test_mpic_dcv_checker.py index 97b2258..244a3aa 100644 --- a/tests/unit/open_mpic_core/test_mpic_dcv_checker.py +++ b/tests/unit/open_mpic_core/test_mpic_dcv_checker.py @@ -1,6 +1,8 @@ import asyncio import base64 import logging +import time + import dns import random import dns.rdatatype @@ -26,6 +28,7 @@ from open_mpic_core import DcvTlsAlpnValidator, DcvCheckResponseDetailsBuilder from open_mpic_core import DcvValidationMethod, DnsRecordType from open_mpic_core import MpicValidationError, ErrorMessages, TRACE_LEVEL +from open_mpic_core.mpic_dcv_checker.mpic_dcv_checker import ExpectedDnsRecordContent from unit.test_util.mock_dns_object_creator import MockDnsObjectCreator from unit.test_util.valid_check_creator import ValidCheckCreator @@ -88,6 +91,7 @@ def mpic_dcv_checker__should_be_able_to_log_at_trace_level(self): (DcvValidationMethod.DNS_CHANGE, DnsRecordType.TXT), (DcvValidationMethod.DNS_CHANGE, DnsRecordType.CNAME), (DcvValidationMethod.DNS_CHANGE, DnsRecordType.CAA), + (DcvValidationMethod.DNS_PERSISTENT, None), (DcvValidationMethod.CONTACT_EMAIL_TXT, None), (DcvValidationMethod.CONTACT_EMAIL_CAA, None), (DcvValidationMethod.CONTACT_PHONE_TXT, None), @@ -120,6 +124,7 @@ async def check_dcv__should_perform_appropriate_check_and_allow_issuance_given_t (DcvValidationMethod.DNS_CHANGE, DnsRecordType.TXT, True), (DcvValidationMethod.DNS_CHANGE, DnsRecordType.CNAME, True), (DcvValidationMethod.DNS_CHANGE, DnsRecordType.CAA, True), + # (DcvValidationMethod.DNS_PERSISTENT, None, True), # Skipped: no challenge_value (DcvValidationMethod.CONTACT_EMAIL_TXT, None, True), (DcvValidationMethod.CONTACT_EMAIL_CAA, None, True), (DcvValidationMethod.CONTACT_PHONE_TXT, None, True), @@ -134,6 +139,9 @@ async def check_dcv__should_perform_appropriate_check_and_allow_issuance_given_t async def check_dcv__should_be_case_insensitive_for_challenge_values_for_all_validation_methods_except_acme( self, dcv_method, record_type, is_case_insensitive, mocker ): + if dcv_method == DcvValidationMethod.DNS_PERSISTENT: + pytest.skip("DNS_PERSISTENT does not use challenge_value for case sensitivity test") + dcv_request = ValidCheckCreator.create_valid_dcv_check_request(dcv_method, record_type) if dcv_method in (DcvValidationMethod.CONTACT_PHONE_TXT, DcvValidationMethod.CONTACT_PHONE_CAA): # technically this should be case-insensitive, but also it would usually have digits... @@ -205,7 +213,7 @@ async def check_dcv__should_handle_domains_with_non_ascii_characters( dcv_request.domain_or_ip_target = encoded_domain # do this first for mocking self._mock_request_specific_dns_resolve_call(dcv_request, mocker) - dcv_request.domain_or_ip_target = domain # set to original to see if the mock triggers as expected + dcv_request.domain_or_ip_target = domain # set to original (mock expects punycode; testing if that happens) dcv_response = await self.dcv_checker.check_dcv(dcv_request) assert dcv_response.check_passed is True @@ -469,6 +477,56 @@ async def http_based_dcv_checks__should_not_pass_on_invalid_redirect_code_or_por dcv_response = await self.dcv_checker.check_dcv(dcv_request) assert dcv_response.check_passed is False + @pytest.mark.parametrize("dcv_method", [DcvValidationMethod.WEBSITE_CHANGE, DcvValidationMethod.ACME_HTTP_01]) + async def http_based_dcv_checks__should_format_ipv6_addresses_with_square_brackets_in_url(self, dcv_method, mocker): + dcv_request = ValidCheckCreator.create_valid_dcv_check_request(dcv_method) + ipv6_address = "2001:db8::1" + dcv_request.domain_or_ip_target = ipv6_address + + if dcv_method == DcvValidationMethod.WEBSITE_CHANGE: + expected_challenge = dcv_request.dcv_check_parameters.challenge_value + url_scheme = dcv_request.dcv_check_parameters.url_scheme + http_token_path = dcv_request.dcv_check_parameters.http_token_path + expected_url = f"{url_scheme}://[{ipv6_address}]/{MpicDcvChecker.WELL_KNOWN_PKI_PATH}/{http_token_path}" + else: + expected_challenge = dcv_request.dcv_check_parameters.key_authorization + token = dcv_request.dcv_check_parameters.token + expected_url = f"http://[{ipv6_address}]/{MpicDcvChecker.WELL_KNOWN_ACME_PATH}/{token}" + + success_response = TestMpicDcvChecker._create_mock_http_response(200, expected_challenge) + mock_get = mocker.patch( + "aiohttp.ClientSession.get", + side_effect=lambda *args, **kwargs: AsyncMock(__aenter__=AsyncMock(return_value=success_response)), + ) + + dcv_response = await self.dcv_checker.check_dcv(dcv_request) + + # Verify the URL used in the request contains properly formatted IPv6 + assert mock_get.call_args.kwargs["url"] == expected_url + assert dcv_response.details.response_url == expected_url + + async def http_based_dcv_checks__should_not_modify_ipv4_addresses_in_url(self, mocker): + dcv_request = ValidCheckCreator.create_valid_dcv_check_request(DcvValidationMethod.WEBSITE_CHANGE) + ipv4_address = "192.168.1.1" + dcv_request.domain_or_ip_target = ipv4_address + + expected_challenge = dcv_request.dcv_check_parameters.challenge_value + url_scheme = dcv_request.dcv_check_parameters.url_scheme + http_token_path = dcv_request.dcv_check_parameters.http_token_path + expected_url = f"{url_scheme}://{ipv4_address}/{MpicDcvChecker.WELL_KNOWN_PKI_PATH}/{http_token_path}" + + success_response = TestMpicDcvChecker._create_mock_http_response(200, expected_challenge) + mock_get = mocker.patch( + "aiohttp.ClientSession.get", + side_effect=lambda *args, **kwargs: AsyncMock(__aenter__=AsyncMock(return_value=success_response)), + ) + + dcv_response = await self.dcv_checker.check_dcv(dcv_request) + + # Verify the URL used in the request contains IPv4 without modification + assert mock_get.call_args.kwargs["url"] == expected_url + assert dcv_response.details.response_url == expected_url + @pytest.mark.parametrize("url_scheme", ["http", "https"]) async def website_change_validation__should_use_specified_url_scheme(self, url_scheme, mocker): dcv_request = ValidCheckCreator.create_valid_http_check_request() @@ -583,40 +641,34 @@ async def dns_validation__should_use_dns_name_prefix_if_provided(self, dns_name_ dcv_response = await self.dcv_checker.perform_general_dns_validation(dcv_request) assert dcv_response.check_passed is True if dns_name_prefix is not None and len(dns_name_prefix) > 0: - mock_dns_resolver_resolve.assert_called_once_with( - qname=f"{dns_name_prefix}.{dcv_request.domain_or_ip_target}", rdtype=dns.rdatatype.TXT - ) + expected_domain = dns.name.from_text(f"{dns_name_prefix}.{dcv_request.domain_or_ip_target}") else: - mock_dns_resolver_resolve.assert_called_once_with( - qname=dcv_request.domain_or_ip_target, rdtype=dns.rdatatype.TXT - ) + expected_domain = dns.name.from_text(dcv_request.domain_or_ip_target) + mock_dns_resolver_resolve.assert_called_once_with(qname=expected_domain, rdtype=dns.rdatatype.TXT) async def acme_dns_validation__should_auto_insert_acme_challenge_prefix(self, mocker): dcv_request = ValidCheckCreator.create_valid_acme_dns_01_check_request() mock_dns_resolver_resolve = self._mock_request_specific_dns_resolve_call(dcv_request, mocker) dcv_response = await self.dcv_checker.perform_general_dns_validation(dcv_request) assert dcv_response.check_passed is True - mock_dns_resolver_resolve.assert_called_once_with( - qname=f"_acme-challenge.{dcv_request.domain_or_ip_target}", rdtype=dns.rdatatype.TXT - ) + expected_domain = dns.name.from_text(f"_acme-challenge.{dcv_request.domain_or_ip_target}") + mock_dns_resolver_resolve.assert_called_once_with(qname=expected_domain, rdtype=dns.rdatatype.TXT) async def contact_email_txt_lookup__should_auto_insert_validation_prefix(self, mocker): dcv_request = ValidCheckCreator.create_valid_contact_check_request(DcvValidationMethod.CONTACT_EMAIL_TXT) mock_dns_resolver_resolve = self._mock_request_specific_dns_resolve_call(dcv_request, mocker) dcv_response = await self.dcv_checker.perform_general_dns_validation(dcv_request) assert dcv_response.check_passed is True - mock_dns_resolver_resolve.assert_called_once_with( - qname=f"_validation-contactemail.{dcv_request.domain_or_ip_target}", rdtype=dns.rdatatype.TXT - ) + expected_domain = dns.name.from_text(f"_validation-contactemail.{dcv_request.domain_or_ip_target}") + mock_dns_resolver_resolve.assert_called_once_with(qname=expected_domain, rdtype=dns.rdatatype.TXT) async def contact_phone_txt_lookup__should_auto_insert_validation_prefix(self, mocker): dcv_request = ValidCheckCreator.create_valid_contact_check_request(DcvValidationMethod.CONTACT_PHONE_TXT) mock_dns_resolver_resolve = self._mock_request_specific_dns_resolve_call(dcv_request, mocker) dcv_response = await self.dcv_checker.perform_general_dns_validation(dcv_request) assert dcv_response.check_passed is True - mock_dns_resolver_resolve.assert_called_once_with( - qname=f"_validation-contactphone.{dcv_request.domain_or_ip_target}", rdtype=dns.rdatatype.TXT - ) + expected_domain = dns.name.from_text(f"_validation-contactphone.{dcv_request.domain_or_ip_target}") + mock_dns_resolver_resolve.assert_called_once_with(qname=expected_domain, rdtype=dns.rdatatype.TXT) # fmt: off @pytest.mark.parametrize("dcv_method, tag, expected_result", [ @@ -673,6 +725,171 @@ async def dns_based_dcv_checks__should_not_pass_given_non_matching_dns_record(se dcv_response = await self.dcv_checker.check_dcv(dcv_request) assert dcv_response.check_passed is False + @pytest.mark.parametrize("set_persist_until_parameter", [True, False]) + def evaluate_persistent_dns_response__should_return_true_given_valid_record( + self, set_persist_until_parameter + ): + issuer_domain_name = "ca.example.com" + expected_account_uri = "https://ca.example.com/acct/123" + + if set_persist_until_parameter: + future_timestamp = int(time.time()) + 3600 # 1 hour in the future + records = [f"{issuer_domain_name}; accounturi={expected_account_uri}; persistUntil={future_timestamp}"] + else: + records = [f"{issuer_domain_name}; accounturi={expected_account_uri}"] + + expected_dns_record_content = ExpectedDnsRecordContent( + possible_values=[issuer_domain_name], + expected_parameters={"accounturi": expected_account_uri}, + ) + + result = MpicDcvChecker.evaluate_persistent_dns_response(expected_dns_record_content, records) + assert result is True + + def evaluate_persistent_dns_response__should_be_case_insensitive(self): + issuer_domain_name = "cA.EXaMPle.com" + expected_account_uri = "https://cA.EXaMPle.com/acct/123" + records = [f"{issuer_domain_name}; accounturi={expected_account_uri}"] + + expected_dns_record_content = ExpectedDnsRecordContent( + possible_values=[issuer_domain_name.lower()], + expected_parameters={"accounturi": expected_account_uri.lower()}, + ) + + result = MpicDcvChecker.evaluate_persistent_dns_response(expected_dns_record_content, records) + assert result is True + + def evaluate_persistent_dns_response__should_ignore_additional_unknown_parameters(self): + issuer_domain_name = "ca.example.com" + expected_account_uri = "https://ca.example.com/acct/123" + records = [f"{issuer_domain_name}; accounturi={expected_account_uri}; customParam=foo; anotherParam=123"] + + expected_dns_record_content = ExpectedDnsRecordContent( + possible_values=[issuer_domain_name], + expected_parameters={"accounturi": expected_account_uri}, + ) + + result = MpicDcvChecker.evaluate_persistent_dns_response(expected_dns_record_content, records) + assert result is True + + def evaluate_persistent_dns_response__should_return_false_given_no_matching_issuer_domain(self): + issuer_domain_name = "nonmatching.example.com" + expected_account_uri = "https://ca.example.com/acct/123" + records = [f"{issuer_domain_name}; accounturi={expected_account_uri};"] + + expected_dns_record_content = ExpectedDnsRecordContent( + possible_values=["ca.example.com"], + expected_parameters={"accounturi": expected_account_uri}, + ) + + result = MpicDcvChecker.evaluate_persistent_dns_response(expected_dns_record_content, records) + assert result is False + + def evaluate_persistent_dns_response__should_return_false_given_no_matching_account_uri(self): + issuer_domain_name = "ca.example.com" + expected_account_uri = "https://ca.example.com/acct/foo123" + records = [f"{issuer_domain_name}; accounturi=https://ca.example.com/acct/bar456;"] + + expected_dns_record_content = ExpectedDnsRecordContent( + possible_values=[issuer_domain_name], + expected_parameters={"accounturi": expected_account_uri}, + ) + + result = MpicDcvChecker.evaluate_persistent_dns_response(expected_dns_record_content, records) + assert result is False + + def evaluate_persistent_dns_response__should_return_false_given_expired_persist_until(self): + issuer_domain_name = "ca.example.com" + expected_account_uri = "https://ca.example.com/acct/123" + past_timestamp = int(time.time()) - 3600 # 1 hour in the past + records = [f"{issuer_domain_name}; accounturi={expected_account_uri}; persistUntil={past_timestamp}"] + + expected_dns_record_content = ExpectedDnsRecordContent( + possible_values=[issuer_domain_name], + expected_parameters={"accounturi": expected_account_uri}, + ) + + result = MpicDcvChecker.evaluate_persistent_dns_response(expected_dns_record_content, records) + assert result is False + + def evaluate_persistent_dns_response__should_return_false_given_missing_account_uri_parameter(self): + issuer_domain_name = "ca.example.com" + records = [f"{issuer_domain_name}; persistUntil={int(time.time())+3600}"] + + expected_dns_record_content = ExpectedDnsRecordContent( + possible_values=[issuer_domain_name], + expected_parameters={"accounturi": "https://ca.example.com/acct/123"}, + ) + + result = MpicDcvChecker.evaluate_persistent_dns_response(expected_dns_record_content, records) + assert result is False + + def evaluate_persistent_dns_response__should_return_false_given_malformed_persist_until_parameter(self): + issuer_domain_name = "ca.example.com" + expected_account_uri = "https://ca.example.com/acct/123" + records = [f"{issuer_domain_name}; accounturi={expected_account_uri}; persistUntil={int(time.time())+3600}foo"] + + expected_dns_record_content = ExpectedDnsRecordContent( + possible_values=[issuer_domain_name], + expected_parameters={"accounturi": expected_account_uri}, + ) + + result = MpicDcvChecker.evaluate_persistent_dns_response(expected_dns_record_content, records) + assert result is False + + def evaluate_persistent_dns_response__should_return_true_given_any_record_in_the_provided_list_is_valid(self): + issuer_domain_names = ["ca.example.com", "ca1.example.com"] + expected_account_uri = "https://ca.example.com/acct/123" + time_now = int(time.time()) + records = [ + f"bad.example.com; accounturi=https://ca.example.com/acct/123; persistUntil={time_now + 3600}", + f"ca.example.com; accounturi=https://bad.example.com/acct/456; persistUntil={time_now + 3600}", + f"ca1.example.com; accounturi=https://ca.example.com/acct/123; persistUntil={time_now - 10}", + f"ca.example.com; accounturi=https://ca.example.com/acct/123; persistUntil={time_now + 3600}", # Valid + ] + + expected_dns_record_content = ExpectedDnsRecordContent( + possible_values=issuer_domain_names, + expected_parameters={"accounturi": expected_account_uri}, + ) + + result = MpicDcvChecker.evaluate_persistent_dns_response(expected_dns_record_content, records) + assert result is True, "Should pass if any record is valid" + + def evaluate_persistent_dns_response__should_accept_match_for_any_issuer_in_the_provided_list(self): + issuer_domain_names = ["ca.example.com", "alt.example.com"] + expected_account_uri = "https://ca.example.com/acct/123" + time_now = int(time.time()) + records = [f"alt.example.com; accounturi=https://ca.example.com/acct/123; persistUntil={time_now + 3600}"] + + expected_dns_record_content = ExpectedDnsRecordContent( + possible_values=issuer_domain_names, + expected_parameters={"accounturi": expected_account_uri}, + ) + + result = MpicDcvChecker.evaluate_persistent_dns_response(expected_dns_record_content, records) + assert result is True, "Should pass with second allowed issuer domain" + + def evaluate_persistent_dns_response__should_return_false_given_malformed_record(self): + issuer_domain_names = ["ca.example"] + expected_account_uri = "https://ca.example/acct/123" + malformed_records = [ + ";;;", # Only separators + "ca.example", # Missing parameters + "ca.example;", # Parameter separator but no parameters + "ca.example; =value", # Missing parameter name + "ca.example; accounturi", # Missing value + ] + + expected_dns_record_content = ExpectedDnsRecordContent( + possible_values=issuer_domain_names, + expected_parameters={"accounturi": expected_account_uri}, + ) + + for record in malformed_records: + result = MpicDcvChecker.evaluate_persistent_dns_response(expected_dns_record_content, [record]) + assert result is False, f"Should fail with malformed record: {record}" + @pytest.mark.parametrize("dcv_method", [DcvValidationMethod.DNS_CHANGE, DcvValidationMethod.ACME_DNS_01]) async def dns_based_dcv_checks__should_return_timestamp_and_list_of_records_seen(self, dcv_method, mocker): dcv_request = ValidCheckCreator.create_valid_dcv_check_request(dcv_method) @@ -747,7 +964,7 @@ async def dns_based_dcv_checks__should_not_pass_with_errors_given_exception_rais async def is_expected_ip_address_in_response__should_return_true_if_valid_record_exists_alongside_malformed_records( self, record_type ): - records_as_strings = ["foo", "bar"] + records_as_strings = ["foo", "bar", "1.1.1.1"] if record_type is DnsRecordType.A: expected_record = "1.2.3.4" records_as_strings.append(expected_record) @@ -897,8 +1114,11 @@ def _mock_request_specific_dns_resolve_call(self, dcv_request: DcvCheckRequest, expected_domain = f"_validation-contactphone.{dcv_request.domain_or_ip_target}" case DcvValidationMethod.CONTACT_EMAIL_TXT: expected_domain = f"_validation-contactemail.{dcv_request.domain_or_ip_target}" - case DcvValidationMethod.CONTACT_PHONE_CAA | DcvValidationMethod.CONTACT_EMAIL_CAA: - expected_domain = dns.name.from_text(expected_domain) # CAA -- using dns names instead of strings + case DcvValidationMethod.DNS_PERSISTENT: + expected_domain = f"_validation-persist.{dcv_request.domain_or_ip_target}" + + # expecting a dns name instead of string from the DCV checker (avoiding use of search directive in resolv.conf) + expected_domain = dns.name.from_text(expected_domain) test_dns_query_answer = self._create_basic_dns_response_for_mock(dcv_request, mocker) # noinspection PyUnusedLocal @@ -971,6 +1191,12 @@ def _create_basic_dns_response_for_mock(self, dcv_request: DcvCheckRequest, mock record_data = {"flags": "", "tag": "issue", "value": check_parameters.challenge_value} else: record_data = {"value": check_parameters.challenge_value} + case DcvValidationMethod.DNS_PERSISTENT: + issuer_domain = check_parameters.issuer_domain_names[0] + account_uri = check_parameters.expected_account_uri + persist_until = int(time.time()) + 365*24*60*60 # 1 year from now + persistent_value = f"{issuer_domain}; accounturi={account_uri}; persistUntil={persist_until}" + record_data = {"value": persistent_value} case DcvValidationMethod.CONTACT_EMAIL_CAA: record_data = {"flags": "", "tag": "contactemail", "value": check_parameters.challenge_value} case DcvValidationMethod.CONTACT_PHONE_CAA: @@ -1026,58 +1252,6 @@ def format_host_for_url__should_wrap_ipv6_addresses_in_square_brackets(self, inp result = MpicDcvChecker.format_host_for_url(input_target) assert result == expected_output - @pytest.mark.parametrize("dcv_method", [DcvValidationMethod.WEBSITE_CHANGE, DcvValidationMethod.ACME_HTTP_01]) - async def http_based_dcv_checks__should_format_ipv6_addresses_with_square_brackets_in_url( - self, dcv_method, mocker - ): - dcv_request = ValidCheckCreator.create_valid_dcv_check_request(dcv_method) - ipv6_address = "2001:db8::1" - dcv_request.domain_or_ip_target = ipv6_address - - if dcv_method == DcvValidationMethod.WEBSITE_CHANGE: - expected_challenge = dcv_request.dcv_check_parameters.challenge_value - url_scheme = dcv_request.dcv_check_parameters.url_scheme - http_token_path = dcv_request.dcv_check_parameters.http_token_path - expected_url = f"{url_scheme}://[{ipv6_address}]/{MpicDcvChecker.WELL_KNOWN_PKI_PATH}/{http_token_path}" - else: - expected_challenge = dcv_request.dcv_check_parameters.key_authorization - token = dcv_request.dcv_check_parameters.token - expected_url = f"http://[{ipv6_address}]/{MpicDcvChecker.WELL_KNOWN_ACME_PATH}/{token}" - - success_response = TestMpicDcvChecker._create_mock_http_response(200, expected_challenge) - mock_get = mocker.patch( - "aiohttp.ClientSession.get", - side_effect=lambda *args, **kwargs: AsyncMock(__aenter__=AsyncMock(return_value=success_response)), - ) - - dcv_response = await self.dcv_checker.check_dcv(dcv_request) - - # Verify the URL used in the request contains properly formatted IPv6 - assert mock_get.call_args.kwargs["url"] == expected_url - assert dcv_response.details.response_url == expected_url - - async def http_based_dcv_checks__should_not_modify_ipv4_addresses_in_url(self, mocker): - dcv_request = ValidCheckCreator.create_valid_dcv_check_request(DcvValidationMethod.WEBSITE_CHANGE) - ipv4_address = "192.168.1.1" - dcv_request.domain_or_ip_target = ipv4_address - - expected_challenge = dcv_request.dcv_check_parameters.challenge_value - url_scheme = dcv_request.dcv_check_parameters.url_scheme - http_token_path = dcv_request.dcv_check_parameters.http_token_path - expected_url = f"{url_scheme}://{ipv4_address}/{MpicDcvChecker.WELL_KNOWN_PKI_PATH}/{http_token_path}" - - success_response = TestMpicDcvChecker._create_mock_http_response(200, expected_challenge) - mock_get = mocker.patch( - "aiohttp.ClientSession.get", - side_effect=lambda *args, **kwargs: AsyncMock(__aenter__=AsyncMock(return_value=success_response)), - ) - - dcv_response = await self.dcv_checker.check_dcv(dcv_request) - - # Verify the URL used in the request contains IPv4 without modification - assert mock_get.call_args.kwargs["url"] == expected_url - assert dcv_response.details.response_url == expected_url - @staticmethod def shuffle_case(string_to_shuffle: str) -> str: result = "".join( diff --git a/tests/unit/open_mpic_core/test_mpic_dcv_request.py b/tests/unit/open_mpic_core/test_mpic_dcv_request.py index 4a50b70..34cf81b 100644 --- a/tests/unit/open_mpic_core/test_mpic_dcv_request.py +++ b/tests/unit/open_mpic_core/test_mpic_dcv_request.py @@ -15,7 +15,7 @@ class TestMpicDcvRequest: def model_validate_json__should_return_dcv_mpic_request_given_valid_dcv_json(self): request = ValidMpicRequestCreator.create_valid_dcv_mpic_request() - mpic_request = MpicDcvRequest.model_validate_json(json.dumps(request.model_dump())) + mpic_request = MpicDcvRequest.model_validate_json(json.dumps(request.model_dump(warnings=False))) assert mpic_request.domain_or_ip_target == request.domain_or_ip_target def mpic_dcv_request__should_require_dcv_check_parameters(self): @@ -23,14 +23,14 @@ def mpic_dcv_request__should_require_dcv_check_parameters(self): # noinspection PyTypeChecker request.dcv_check_parameters = None with pytest.raises(pydantic.ValidationError) as validation_error: - MpicDcvRequest.model_validate_json(json.dumps(request.model_dump())) + MpicDcvRequest.model_validate_json(json.dumps(request.model_dump(warnings=False))) assert "dcv_check_parameters" in str(validation_error.value) def mpic_dcv_request__should_require_validation_method_in_check_parameters(self): request = ValidMpicRequestCreator.create_valid_dcv_mpic_request() request.dcv_check_parameters.validation_method = None with pytest.raises(pydantic.ValidationError) as validation_error: - MpicDcvRequest.model_validate_json(json.dumps(request.model_dump())) + MpicDcvRequest.model_validate_json(json.dumps(request.model_dump(warnings=False))) assert "validation_method" in str(validation_error.value) def mpic_dcv_request__should_require_valid_validation_method_in_check_parameters(self): @@ -45,7 +45,7 @@ def mpic_dcv_request__should_require_challenge_value_in_check_parameters(self): # noinspection PyTypeChecker request.dcv_check_parameters.challenge_value = None with pytest.raises(pydantic.ValidationError) as validation_error: - MpicDcvRequest.model_validate_json(json.dumps(request.model_dump())) + MpicDcvRequest.model_validate_json(json.dumps(request.model_dump(warnings=False))) assert "challenge_value" in str(validation_error.value) def mpic_dcv_request__should_require_dns_record_type_for_dns_change_validation(self): @@ -53,7 +53,7 @@ def mpic_dcv_request__should_require_dns_record_type_for_dns_change_validation(s # noinspection PyTypeChecker request.dcv_check_parameters.dns_record_type = None with pytest.raises(pydantic.ValidationError) as validation_error: - MpicDcvRequest.model_validate_json(json.dumps(request.model_dump())) + MpicDcvRequest.model_validate_json(json.dumps(request.model_dump(warnings=False))) assert "dns_record_type" in str(validation_error.value) def mpic_dcv_request__should_require_valid_dns_record_type_for_dns_change_validation(self): @@ -70,7 +70,7 @@ def mpic_dcv_request__should_require_http_token_path_for_website_change_validati # noinspection PyTypeChecker request.dcv_check_parameters.http_token_path = None with pytest.raises(pydantic.ValidationError) as validation_error: - MpicDcvRequest.model_validate_json(json.dumps(request.model_dump())) + MpicDcvRequest.model_validate_json(json.dumps(request.model_dump(warnings=False))) assert "http_token_path" in str(validation_error.value) def mpic_dcv_request__should_require_token_for_acme_http_01_validation(self): @@ -78,7 +78,7 @@ def mpic_dcv_request__should_require_token_for_acme_http_01_validation(self): # noinspection PyTypeChecker request.dcv_check_parameters.token = None with pytest.raises(pydantic.ValidationError) as validation_error: - MpicDcvRequest.model_validate_json(json.dumps(request.model_dump())) + MpicDcvRequest.model_validate_json(json.dumps(request.model_dump(warnings=False))) assert "token" in str(validation_error.value) @pytest.mark.parametrize("validation_method", [DcvValidationMethod.ACME_HTTP_01, DcvValidationMethod.ACME_DNS_01]) @@ -90,12 +90,12 @@ def mpic_dcv_request__should_require_key_authorization_for_acme_validations(self else: request.dcv_check_parameters.key_authorization_hash = None with pytest.raises(pydantic.ValidationError) as validation_error: - MpicDcvRequest.model_validate_json(json.dumps(request.model_dump())) + MpicDcvRequest.model_validate_json(json.dumps(request.model_dump(warnings=False))) assert "key_authorization" in str(validation_error.value) def mpic_dcv_request__should_have_check_type_set_to_dcv(self): request = ValidMpicRequestCreator.create_valid_dcv_mpic_request() - mpic_request = MpicDcvRequest.model_validate_json(json.dumps(request.model_dump())) + mpic_request = MpicDcvRequest.model_validate_json(json.dumps(request.model_dump(warnings=False))) assert mpic_request.check_type == CheckType.DCV def mpic_dcv_request__should_default_to_http_scheme_for_website_change_validation_given_no_scheme_specified(self): @@ -105,7 +105,7 @@ def mpic_dcv_request__should_default_to_http_scheme_for_website_change_validatio challenge_value="test", http_token_path="example-path", ) - mpic_request = MpicDcvRequest.model_validate_json(json.dumps(request.model_dump())) + mpic_request = MpicDcvRequest.model_validate_json(json.dumps(request.model_dump(warnings=False))) assert mpic_request.dcv_check_parameters.url_scheme == UrlScheme.HTTP @pytest.mark.parametrize( @@ -121,7 +121,7 @@ def mpic_dcv_request__should_enforce_domain_prefix_for_contact_lookup_for_txt_re request = ValidMpicRequestCreator.create_valid_dcv_mpic_request(validation_method) request.dcv_check_parameters.dns_name_prefix = "moo" with pytest.raises(pydantic.ValidationError) as validation_error: - MpicDcvRequest.model_validate_json(json.dumps(request.model_dump())) + MpicDcvRequest.model_validate_json(json.dumps(request.model_dump(warnings=False))) assert expected_prefix in str(validation_error.value) diff --git a/tests/unit/test_util/valid_check_creator.py b/tests/unit/test_util/valid_check_creator.py index 7f1dee5..0ed154c 100644 --- a/tests/unit/test_util/valid_check_creator.py +++ b/tests/unit/test_util/valid_check_creator.py @@ -3,6 +3,7 @@ from open_mpic_core import ( DcvWebsiteChangeValidationParameters, DcvDnsChangeValidationParameters, + DcvDnsPersistentValidationParameters, CaaCheckParameters, DcvAcmeHttp01ValidationParameters, DcvAcmeDns01ValidationParameters, @@ -72,6 +73,16 @@ def create_valid_contact_check_request(validation_method: DcvValidationMethod) - # check_request.dcv_check_parameters.require_exact_match = True return check_request + @staticmethod + def create_valid_dns_persistent_check_request() -> DcvCheckRequest: + return DcvCheckRequest( + domain_or_ip_target="example.com", + dcv_check_parameters=DcvDnsPersistentValidationParameters( + issuer_domain_names=["authority.example"], + expected_account_uri="https://authority.example/acct/123" + ), + ) + @staticmethod def create_valid_ip_lookup_check_request(record_type=DnsRecordType.A) -> DcvCheckRequest: check_request = DcvCheckRequest( @@ -127,6 +138,8 @@ def create_valid_dcv_check_request(validation_method: DcvValidationMethod, recor if record_type is None: record_type = DnsRecordType.TXT return ValidCheckCreator.create_valid_dns_check_request(record_type) + case DcvValidationMethod.DNS_PERSISTENT: + return ValidCheckCreator.create_valid_dns_persistent_check_request() case DcvValidationMethod.ACME_HTTP_01: return ValidCheckCreator.create_valid_acme_http_01_check_request() case DcvValidationMethod.ACME_DNS_01: diff --git a/tests/unit/test_util/valid_mpic_request_creator.py b/tests/unit/test_util/valid_mpic_request_creator.py index 328542e..e5f637c 100644 --- a/tests/unit/test_util/valid_mpic_request_creator.py +++ b/tests/unit/test_util/valid_mpic_request_creator.py @@ -1,6 +1,7 @@ from open_mpic_core import ( CaaCheckParameters, DcvDnsChangeValidationParameters, + DcvDnsPersistentValidationParameters, DcvWebsiteChangeValidationParameters, DcvAcmeDns01ValidationParameters, DcvAcmeHttp01ValidationParameters, @@ -66,4 +67,9 @@ def create_check_parameters( check_parameters = DcvContactEmailCaaValidationParameters(challenge_value="test") case DcvValidationMethod.CONTACT_EMAIL_TXT: check_parameters = DcvContactEmailTxtValidationParameters(challenge_value="test") + case DcvValidationMethod.DNS_PERSISTENT: + check_parameters = DcvDnsPersistentValidationParameters( + issuer_domain_names=["authority.example"], + expected_account_uri="https://authority.example/acct/123" + ) return check_parameters