From 8e6d1009eb035a404f25d1d24b392168cabc47ee Mon Sep 17 00:00:00 2001 From: Henry Birge-Lee Date: Fri, 31 Jan 2025 20:24:35 -0500 Subject: [PATCH 1/4] new integration tests for acme http-01 and website change --- tests/integration/test_deployed_mpic_api.py | 65 ++++++++++++++++++++- 1 file changed, 64 insertions(+), 1 deletion(-) diff --git a/tests/integration/test_deployed_mpic_api.py b/tests/integration/test_deployed_mpic_api.py index a14ce96..9c3c1eb 100644 --- a/tests/integration/test_deployed_mpic_api.py +++ b/tests/integration/test_deployed_mpic_api.py @@ -5,7 +5,7 @@ from pydantic import TypeAdapter -from open_mpic_core.common_domain.check_parameters import CaaCheckParameters, DcvWebsiteChangeValidationDetails, DcvAcmeDns01ValidationDetails, DcvDnsChangeValidationDetails +from open_mpic_core.common_domain.check_parameters import CaaCheckParameters, DcvWebsiteChangeValidationDetails, DcvAcmeDns01ValidationDetails, DcvAcmeHttp01ValidationDetails, DcvDnsChangeValidationDetails from open_mpic_core.common_domain.check_parameters import DcvCheckParameters from open_mpic_core.common_domain.enum.certificate_type import CertificateType from open_mpic_core.common_domain.enum.check_type import CheckType @@ -191,6 +191,69 @@ def api_should_return_200_is_valid_false_given_invalid_dns_01_validation(self, a assert mpic_response.is_valid is False + + @pytest.mark.parametrize('domain_or_ip_target, purpose_of_test, token, key_authorization', [ + ('integration-testing.open-mpic.org', 'Standard http-01 test', "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA", "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA.NzbLsXh8uDCcd-6MNwXF4W_7noWXFZAfHkxZsRGC9Xs"), + ('integration-testing.open-mpic.org', 'Redirect 302 http-01 test', "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oB", "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA.NzbLsXh8uDCcd-6MNwXF4W_7noWXFZAfHkxZsRGC9Xs") + ]) + def api_should_return_200_given_valid_http_01_validation(self, api_client, domain_or_ip_target, purpose_of_test, token, key_authorization): + print(f"Running test for {domain_or_ip_target} ({purpose_of_test})") + request = MpicDcvRequest( + domain_or_ip_target=domain_or_ip_target, + orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), + dcv_check_parameters=DcvCheckParameters( + validation_details=DcvAcmeHttp01ValidationDetails(key_authorization=key_authorization, token=token) + ) + ) + print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body + response = api_client.post(MPIC_REQUEST_PATH, json.dumps(request.model_dump())) + assert response.status_code == 200 + mpic_response = self.mpic_response_adapter.validate_json(response.text) + + assert mpic_response.is_valid is True + + + @pytest.mark.parametrize('domain_or_ip_target, purpose_of_test, token, key_authorization', [ + ('integration-testing.open-mpic.org', 'Failed http-01 test', "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA", "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA.NzbLsXh8uDCcd-6MNwXF4W_7noWXFZAfHkxZsRGC9Xa"), + ('integration-testing.open-mpic.org', 'Failed 302 http-01 test', "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oB", "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA.NzbLsXh8uDCcd-6MNwXF4W_7noWXFZAfHkxZsRGC9Xa") + ]) + def api_should_return_200_given_invalid_http_01_validation(self, api_client, domain_or_ip_target, purpose_of_test, token, key_authorization): + print(f"Running test for {domain_or_ip_target} ({purpose_of_test})") + request = MpicDcvRequest( + domain_or_ip_target=domain_or_ip_target, + orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), + dcv_check_parameters=DcvCheckParameters( + validation_details=DcvAcmeHttp01ValidationDetails(key_authorization=key_authorization, token=token) + ) + ) + print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body + response = api_client.post(MPIC_REQUEST_PATH, json.dumps(request.model_dump())) + assert response.status_code == 200 + mpic_response = self.mpic_response_adapter.validate_json(response.text) + + assert mpic_response.is_valid is False + + @pytest.mark.parametrize('domain_or_ip_target, purpose_of_test, http_token_path, challenge_value', [ + ('integration-testing.open-mpic.org', 'Valid website change v2 challenge', 'validation-doc.txt', 'test-validation'), + ('integration-testing.open-mpic.org', 'Valid 302 website change v2 challenge', 'validation-doc-redirect.txt', "test-validation-redirect") + ]) + def api_should_return_200_given_valid_website_change_validation(self, api_client, domain_or_ip_target, purpose_of_test, http_token_path, challenge_value): + print(f"Running test for {domain_or_ip_target} ({purpose_of_test})") + request = MpicDcvRequest( + domain_or_ip_target=domain_or_ip_target, + orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), + dcv_check_parameters=DcvCheckParameters( + validation_details=DcvWebsiteChangeValidationDetails(http_token_path=http_token_path, + challenge_value=challenge_value) + ) + ) + print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body + response = api_client.post(MPIC_REQUEST_PATH, json.dumps(request.model_dump())) + assert response.status_code == 200 + mpic_response = self.mpic_response_adapter.validate_json(response.text) + + assert mpic_response.is_valid is True + @pytest.mark.parametrize('domain_or_ip_target, dns_record_type, challenge_value, purpose_of_test', [ ('dns-change-txt.integration-testing.open-mpic.org', DnsRecordType.TXT, "1234567890abcdefg.", 'standard TXT dns change'), ('dns-change-cname.integration-testing.open-mpic.org', DnsRecordType.CNAME, "1234567890abcdefg.", 'standard CNAME dns change'), From 1c91cbe82ac13b25fef2c75996c4ad79630f7e90 Mon Sep 17 00:00:00 2001 From: Henry Birge-Lee Date: Sat, 1 Feb 2025 13:58:43 -0500 Subject: [PATCH 2/4] black format --- tests/integration/test_deployed_mpic_api.py | 534 +++++++++++++++----- 1 file changed, 408 insertions(+), 126 deletions(-) diff --git a/tests/integration/test_deployed_mpic_api.py b/tests/integration/test_deployed_mpic_api.py index 9c3c1eb..9eaf89e 100644 --- a/tests/integration/test_deployed_mpic_api.py +++ b/tests/integration/test_deployed_mpic_api.py @@ -5,18 +5,28 @@ from pydantic import TypeAdapter -from open_mpic_core.common_domain.check_parameters import CaaCheckParameters, DcvWebsiteChangeValidationDetails, DcvAcmeDns01ValidationDetails, DcvAcmeHttp01ValidationDetails, DcvDnsChangeValidationDetails +from open_mpic_core.common_domain.check_parameters import ( + CaaCheckParameters, + DcvWebsiteChangeValidationDetails, + DcvAcmeDns01ValidationDetails, + DcvAcmeHttp01ValidationDetails, + DcvDnsChangeValidationDetails, +) from open_mpic_core.common_domain.check_parameters import DcvCheckParameters from open_mpic_core.common_domain.enum.certificate_type import CertificateType from open_mpic_core.common_domain.enum.check_type import CheckType from open_mpic_core.common_domain.enum.dns_record_type import DnsRecordType from open_mpic_core.mpic_coordinator.domain.mpic_request import MpicCaaRequest from open_mpic_core.mpic_coordinator.domain.mpic_request import MpicDcvRequest -from open_mpic_core.mpic_coordinator.domain.mpic_orchestration_parameters import MpicRequestOrchestrationParameters +from open_mpic_core.mpic_coordinator.domain.mpic_orchestration_parameters import ( + MpicRequestOrchestrationParameters, +) import testing_api_client from open_mpic_core.mpic_coordinator.domain.mpic_response import MpicResponse -from open_mpic_core.mpic_coordinator.messages.mpic_request_validation_messages import MpicRequestValidationMessages +from open_mpic_core.mpic_coordinator.messages.mpic_request_validation_messages import ( + MpicRequestValidationMessages, +) MPIC_REQUEST_PATH = "/mpic" @@ -28,20 +38,22 @@ class TestDeployedMpicApi: def setup_class(cls): cls.mpic_response_adapter = TypeAdapter(MpicResponse) - @pytest.fixture(scope='class') + @pytest.fixture(scope="class") def api_client(self): with pytest.MonkeyPatch.context() as class_scoped_monkeypatch: # blank out argv except first param; arg parser doesn't expect pytest args - class_scoped_monkeypatch.setattr(sys, 'argv', sys.argv[:1]) + class_scoped_monkeypatch.setattr(sys, "argv", sys.argv[:1]) api_client = testing_api_client.TestingApiClient() yield api_client api_client.close() def api_should_return_200_and_passed_corroboration_given_successful_caa_check(self, api_client): request = MpicCaaRequest( - domain_or_ip_target='example.com', + domain_or_ip_target="example.com", orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), - caa_check_parameters=CaaCheckParameters(certificate_type=CertificateType.TLS_SERVER, caa_domains=['mozilla.com']) + caa_check_parameters=CaaCheckParameters( + certificate_type=CertificateType.TLS_SERVER, caa_domains=["mozilla.com"] + ), ) print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body @@ -54,35 +66,124 @@ def api_should_return_200_and_passed_corroboration_given_successful_caa_check(se assert mpic_response.is_valid is True perspectives_list = mpic_response.perspectives assert len(perspectives_list) == request.orchestration_parameters.perspective_count - assert (len(list(filter(lambda perspective: perspective.check_type == CheckType.CAA, perspectives_list))) - == request.orchestration_parameters.perspective_count) - - @pytest.mark.parametrize('domain_or_ip_target, purpose_of_test, is_wildcard_domain', [ - ('empty.basic.caatestsuite.com', 'Tests handling of 0 issue ";"', False), - ('deny.basic.caatestsuite.com', 'Tests handling of 0 issue "caatestsuite.com"', False), - ('uppercase-deny.basic.caatestsuite.com', 'Tests handling of uppercase issue tag (0 ISSUE "caatestsuite.com")', False), - ('mixedcase-deny.basic.caatestsuite.com', 'Tests handling of mixed case issue tag (0 IsSuE "caatestsuite.com")', False), - ('big.basic.caatestsuite.com', 'Tests handling of gigantic (1001) CAA record set (0 issue "caatestsuite.com")', False), - ('critical1.basic.caatestsuite.com', 'Tests handling of unknown critical property (128 caatestsuitedummyproperty "test")', False), - ('critical2.basic.caatestsuite.com', 'Tests handling of unknown critical property with another flag (130)', False), - ('sub1.deny.basic.caatestsuite.com', 'Tests basic tree climbing when CAA record is at parent domain', False), - ('sub2.sub1.deny.basic.caatestsuite.com', 'Tests tree climbing when CAA record is at grandparent domain', False), - ('deny.basic.caatestsuite.com', 'Tests handling of issue property for a wildcard domain', True), - ('deny-wild.basic.caatestsuite.com', 'Tests handling of issuewild for a wildcard domain', True), - ('cname-deny.basic.caatestsuite.com', 'Tests handling of CNAME, where CAA record is at CNAME target', False), - ('cname-cname-deny.basic.caatestsuite.com', 'Tests handling of CNAME chain, where CAA record is at ultimate target', False), - ('sub1.cname-deny.basic.caatestsuite.com', 'Tests handling of CNAME, where parent is CNAME and CAA record is at target', False), - ('deny.permit.basic.caatestsuite.com', 'Tests rejection when parent name contains a permissible CAA record set', False), - ('ipv6only.caatestsuite.com', 'Tests handling of record at IPv6-only authoritative name server', False), - #('expired.caatestsuite-dnssec.com', 'Tests rejection when expired DNSSEC signatures', False), # DNSSEC SHOULD be enabled in production but is not a current requirement for MPIC - #('missing.caatestsuite-dnssec.com', 'Tests rejection when missing DNSSEC signatures', False), # DNSSEC SHOULD be enabled in production but is not a current requirement for MPIC - ('blackhole.caatestsuite-dnssec.com', 'Tests rejection when DNSSEC chain goes to non-responsive server', False), - ('servfail.caatestsuite-dnssec.com', 'Tests rejection when DNSSEC chain goes to server returning SERVFAIL', False), - ('refused.caatestsuite-dnssec.com', 'Tests rejection when DNSSEC chain goes to server returning REFUSED', False), - ('xss.caatestsuite.com', 'Tests rejection when issue property has HTML and JS', False), - ]) - def api_should_return_is_valid_false_for_all_tests_in_do_not_issue_caa_test_suite(self, api_client, domain_or_ip_target, - purpose_of_test, is_wildcard_domain): + assert ( + len( + list( + filter( + lambda perspective: perspective.check_type == CheckType.CAA, + perspectives_list, + ) + ) + ) + == request.orchestration_parameters.perspective_count + ) + + @pytest.mark.parametrize( + "domain_or_ip_target, purpose_of_test, is_wildcard_domain", + [ + ("empty.basic.caatestsuite.com", 'Tests handling of 0 issue ";"', False), + ( + "deny.basic.caatestsuite.com", + 'Tests handling of 0 issue "caatestsuite.com"', + False, + ), + ( + "uppercase-deny.basic.caatestsuite.com", + 'Tests handling of uppercase issue tag (0 ISSUE "caatestsuite.com")', + False, + ), + ( + "mixedcase-deny.basic.caatestsuite.com", + 'Tests handling of mixed case issue tag (0 IsSuE "caatestsuite.com")', + False, + ), + ( + "big.basic.caatestsuite.com", + 'Tests handling of gigantic (1001) CAA record set (0 issue "caatestsuite.com")', + False, + ), + ( + "critical1.basic.caatestsuite.com", + 'Tests handling of unknown critical property (128 caatestsuitedummyproperty "test")', + False, + ), + ( + "critical2.basic.caatestsuite.com", + "Tests handling of unknown critical property with another flag (130)", + False, + ), + ( + "sub1.deny.basic.caatestsuite.com", + "Tests basic tree climbing when CAA record is at parent domain", + False, + ), + ( + "sub2.sub1.deny.basic.caatestsuite.com", + "Tests tree climbing when CAA record is at grandparent domain", + False, + ), + ( + "deny.basic.caatestsuite.com", + "Tests handling of issue property for a wildcard domain", + True, + ), + ( + "deny-wild.basic.caatestsuite.com", + "Tests handling of issuewild for a wildcard domain", + True, + ), + ( + "cname-deny.basic.caatestsuite.com", + "Tests handling of CNAME, where CAA record is at CNAME target", + False, + ), + ( + "cname-cname-deny.basic.caatestsuite.com", + "Tests handling of CNAME chain, where CAA record is at ultimate target", + False, + ), + ( + "sub1.cname-deny.basic.caatestsuite.com", + "Tests handling of CNAME, where parent is CNAME and CAA record is at target", + False, + ), + ( + "deny.permit.basic.caatestsuite.com", + "Tests rejection when parent name contains a permissible CAA record set", + False, + ), + ( + "ipv6only.caatestsuite.com", + "Tests handling of record at IPv6-only authoritative name server", + False, + ), + # ('expired.caatestsuite-dnssec.com', 'Tests rejection when expired DNSSEC signatures', False), # DNSSEC SHOULD be enabled in production but is not a current requirement for MPIC + # ('missing.caatestsuite-dnssec.com', 'Tests rejection when missing DNSSEC signatures', False), # DNSSEC SHOULD be enabled in production but is not a current requirement for MPIC + ( + "blackhole.caatestsuite-dnssec.com", + "Tests rejection when DNSSEC chain goes to non-responsive server", + False, + ), + ( + "servfail.caatestsuite-dnssec.com", + "Tests rejection when DNSSEC chain goes to server returning SERVFAIL", + False, + ), + ( + "refused.caatestsuite-dnssec.com", + "Tests rejection when DNSSEC chain goes to server returning REFUSED", + False, + ), + ( + "xss.caatestsuite.com", + "Tests rejection when issue property has HTML and JS", + False, + ), + ], + ) + def api_should_return_is_valid_false_for_all_tests_in_do_not_issue_caa_test_suite( + self, api_client, domain_or_ip_target, purpose_of_test, is_wildcard_domain + ): print(f"Running test for {domain_or_ip_target} ({purpose_of_test})") if is_wildcard_domain: domain_or_ip_target = "*." + domain_or_ip_target @@ -90,8 +191,8 @@ def api_should_return_is_valid_false_for_all_tests_in_do_not_issue_caa_test_suit domain_or_ip_target=domain_or_ip_target, orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), caa_check_parameters=CaaCheckParameters( - certificate_type=CertificateType.TLS_SERVER, caa_domains=['example.com'] - ) + certificate_type=CertificateType.TLS_SERVER, caa_domains=["example.com"] + ), ) response = api_client.post(MPIC_REQUEST_PATH, json.dumps(request.model_dump())) mpic_response = self.mpic_response_adapter.validate_json(response.text) @@ -101,23 +202,79 @@ def api_should_return_is_valid_false_for_all_tests_in_do_not_issue_caa_test_suit # This case is handled in a compliant manner as it is treated as a lookup failure. # The test for proper communication with an IPv6 nameserver can be enabled with the following additional parameter to the list below. # ('ipv6only.caatestsuite.com', 'Tests handling of record at IPv6-only authoritative name server', False), - @pytest.mark.parametrize('domain_or_ip_target, purpose_of_test, is_wildcard_domain', [ - ('deny.basic.caatestsuite.com', 'Tests handling of 0 issue "caatestsuite.com"', False), - ('uppercase-deny.basic.caatestsuite.com', 'Tests handling of uppercase issue tag (0 ISSUE "caatestsuite.com")', False), - ('mixedcase-deny.basic.caatestsuite.com', 'Tests handling of mixed case issue tag (0 IsSuE "caatestsuite.com")', False), - ('big.basic.caatestsuite.com', 'Tests handling of gigantic (1001) CAA record set (0 issue "caatestsuite.com")', False), - ('sub1.deny.basic.caatestsuite.com', 'Tests basic tree climbing when CAA record is at parent domain', False), - ('sub2.sub1.deny.basic.caatestsuite.com', 'Tests tree climbing when CAA record is at grandparent domain', False), - ('deny.basic.caatestsuite.com', 'Tests handling of issue property for a wildcard domain', True), - ('deny-wild.basic.caatestsuite.com', 'Tests handling of issuewild for a wildcard domain', True), - ('cname-deny.basic.caatestsuite.com', 'Tests handling of CNAME, where CAA record is at CNAME target', False), - ('cname-cname-deny.basic.caatestsuite.com', 'Tests handling of CNAME chain, where CAA record is at ultimate target', False), - ('sub1.cname-deny.basic.caatestsuite.com', 'Tests handling of CNAME, where parent is CNAME and CAA record is at target', False), - ('permit.basic.caatestsuite.com', 'Tests acceptance when name contains a permissible CAA record set', False), - ('deny.permit.basic.caatestsuite.com', 'Tests acceptance on a CAA record set', False), - ]) - def api_should_return_is_valid_true_for_valid_tests_in_caa_test_suite_when_caa_domain_is_caatestsuite_com(self, api_client, domain_or_ip_target, - purpose_of_test, is_wildcard_domain): + @pytest.mark.parametrize( + "domain_or_ip_target, purpose_of_test, is_wildcard_domain", + [ + ( + "deny.basic.caatestsuite.com", + 'Tests handling of 0 issue "caatestsuite.com"', + False, + ), + ( + "uppercase-deny.basic.caatestsuite.com", + 'Tests handling of uppercase issue tag (0 ISSUE "caatestsuite.com")', + False, + ), + ( + "mixedcase-deny.basic.caatestsuite.com", + 'Tests handling of mixed case issue tag (0 IsSuE "caatestsuite.com")', + False, + ), + ( + "big.basic.caatestsuite.com", + 'Tests handling of gigantic (1001) CAA record set (0 issue "caatestsuite.com")', + False, + ), + ( + "sub1.deny.basic.caatestsuite.com", + "Tests basic tree climbing when CAA record is at parent domain", + False, + ), + ( + "sub2.sub1.deny.basic.caatestsuite.com", + "Tests tree climbing when CAA record is at grandparent domain", + False, + ), + ( + "deny.basic.caatestsuite.com", + "Tests handling of issue property for a wildcard domain", + True, + ), + ( + "deny-wild.basic.caatestsuite.com", + "Tests handling of issuewild for a wildcard domain", + True, + ), + ( + "cname-deny.basic.caatestsuite.com", + "Tests handling of CNAME, where CAA record is at CNAME target", + False, + ), + ( + "cname-cname-deny.basic.caatestsuite.com", + "Tests handling of CNAME chain, where CAA record is at ultimate target", + False, + ), + ( + "sub1.cname-deny.basic.caatestsuite.com", + "Tests handling of CNAME, where parent is CNAME and CAA record is at target", + False, + ), + ( + "permit.basic.caatestsuite.com", + "Tests acceptance when name contains a permissible CAA record set", + False, + ), + ( + "deny.permit.basic.caatestsuite.com", + "Tests acceptance on a CAA record set", + False, + ), + ], + ) + def api_should_return_is_valid_true_for_valid_tests_in_caa_test_suite_when_caa_domain_is_caatestsuite_com( + self, api_client, domain_or_ip_target, purpose_of_test, is_wildcard_domain + ): print(f"Running test for {domain_or_ip_target} ({purpose_of_test})") if is_wildcard_domain: domain_or_ip_target = "*." + domain_or_ip_target @@ -125,148 +282,265 @@ def api_should_return_is_valid_true_for_valid_tests_in_caa_test_suite_when_caa_d domain_or_ip_target=domain_or_ip_target, orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), caa_check_parameters=CaaCheckParameters( - certificate_type=CertificateType.TLS_SERVER, caa_domains=['caatestsuite.com', 'example.com']) + certificate_type=CertificateType.TLS_SERVER, + caa_domains=["caatestsuite.com", "example.com"], + ), ) response = api_client.post(MPIC_REQUEST_PATH, json.dumps(request.model_dump())) mpic_response = self.mpic_response_adapter.validate_json(response.text) assert mpic_response.is_valid is True - @pytest.mark.skip(reason='Behavior not required in RFC 8659') - @pytest.mark.parametrize('domain_or_ip_target, purpose_of_test', [ - ('dname-deny.basic.caatestsuite.com', 'Tests handling of a DNAME when CAA record exists at DNAME target'), - ('cname-deny-sub.basic.caatestsuite.com', 'Tests handling of a CNAME when CAA record exists at parent of CNAME target') - ]) - def api_should_return_is_valid_false_for_do_not_issue_caa_test_suite_for_rfc_6844(self, api_client, domain_or_ip_target, purpose_of_test): + @pytest.mark.skip(reason="Behavior not required in RFC 8659") + @pytest.mark.parametrize( + "domain_or_ip_target, purpose_of_test", + [ + ( + "dname-deny.basic.caatestsuite.com", + "Tests handling of a DNAME when CAA record exists at DNAME target", + ), + ( + "cname-deny-sub.basic.caatestsuite.com", + "Tests handling of a CNAME when CAA record exists at parent of CNAME target", + ), + ], + ) + def api_should_return_is_valid_false_for_do_not_issue_caa_test_suite_for_rfc_6844( + self, api_client, domain_or_ip_target, purpose_of_test + ): print(f"Running test for {domain_or_ip_target} ({purpose_of_test})") request = MpicCaaRequest( domain_or_ip_target=domain_or_ip_target, orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), - caa_check_parameters=CaaCheckParameters(certificate_type=CertificateType.TLS_SERVER, caa_domains=['example.com']) + caa_check_parameters=CaaCheckParameters( + certificate_type=CertificateType.TLS_SERVER, caa_domains=["example.com"] + ), ) response = api_client.post(MPIC_REQUEST_PATH, json.dumps(request.model_dump())) mpic_response = self.mpic_response_adapter.validate_json(response.text) assert mpic_response.is_valid is False - @pytest.mark.parametrize('domain_or_ip_target, purpose_of_test', [ - ('dns-01.integration-testing.open-mpic.org', 'Standard proper dns-01 test'), - ('dns-01-multi.integration-testing.open-mpic.org', 'Proper dns-01 test with multiple TXT records'), - ('dns-01-cname.integration-testing.open-mpic.org', 'Proper dns-01 test with CNAME') - ]) + @pytest.mark.parametrize( + "domain_or_ip_target, purpose_of_test", + [ + ("dns-01.integration-testing.open-mpic.org", "Standard proper dns-01 test"), + ( + "dns-01-multi.integration-testing.open-mpic.org", + "Proper dns-01 test with multiple TXT records", + ), + ( + "dns-01-cname.integration-testing.open-mpic.org", + "Proper dns-01 test with CNAME", + ), + ], + ) def api_should_return_200_given_valid_dns_01_validation(self, api_client, domain_or_ip_target, purpose_of_test): print(f"Running test for {domain_or_ip_target} ({purpose_of_test})") request = MpicDcvRequest( domain_or_ip_target=domain_or_ip_target, orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), dcv_check_parameters=DcvCheckParameters( - validation_details=DcvAcmeDns01ValidationDetails(key_authorization="7FwkJPsKf-TH54wu4eiIFA3nhzYaevsL7953ihy-tpo") - ) + validation_details=DcvAcmeDns01ValidationDetails( + key_authorization="7FwkJPsKf-TH54wu4eiIFA3nhzYaevsL7953ihy-tpo" + ) + ), ) print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body response = api_client.post(MPIC_REQUEST_PATH, json.dumps(request.model_dump())) assert response.status_code == 200 mpic_response = self.mpic_response_adapter.validate_json(response.text) - + assert mpic_response.is_valid is True - @pytest.mark.parametrize('domain_or_ip_target, purpose_of_test', [ - ('dns-01-leading-whitespace.integration-testing.open-mpic.org', 'leading whitespace'), - ('dns-01-trailing-whitespace.integration-testing.open-mpic.org', 'trailing'), - ('dns-01-nxdomain.integration-testing.open-mpic.org', 'NXDOMAIN') - ]) - def api_should_return_200_is_valid_false_given_invalid_dns_01_validation(self, api_client, domain_or_ip_target, purpose_of_test): + @pytest.mark.parametrize( + "domain_or_ip_target, purpose_of_test", + [ + ( + "dns-01-leading-whitespace.integration-testing.open-mpic.org", + "leading whitespace", + ), + ( + "dns-01-trailing-whitespace.integration-testing.open-mpic.org", + "trailing", + ), + ("dns-01-nxdomain.integration-testing.open-mpic.org", "NXDOMAIN"), + ], + ) + def api_should_return_200_is_valid_false_given_invalid_dns_01_validation( + self, api_client, domain_or_ip_target, purpose_of_test + ): print(f"Running test for {domain_or_ip_target} ({purpose_of_test})") request = MpicDcvRequest( domain_or_ip_target=domain_or_ip_target, orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), dcv_check_parameters=DcvCheckParameters( - validation_details=DcvAcmeDns01ValidationDetails(key_authorization="7FwkJPsKf-TH54wu4eiIFA3nhzYaevsL7953ihy-tpo") - ) + validation_details=DcvAcmeDns01ValidationDetails( + key_authorization="7FwkJPsKf-TH54wu4eiIFA3nhzYaevsL7953ihy-tpo" + ) + ), ) print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body response = api_client.post(MPIC_REQUEST_PATH, json.dumps(request.model_dump())) assert response.status_code == 200 mpic_response = self.mpic_response_adapter.validate_json(response.text) - - assert mpic_response.is_valid is False + assert mpic_response.is_valid is False - @pytest.mark.parametrize('domain_or_ip_target, purpose_of_test, token, key_authorization', [ - ('integration-testing.open-mpic.org', 'Standard http-01 test', "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA", "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA.NzbLsXh8uDCcd-6MNwXF4W_7noWXFZAfHkxZsRGC9Xs"), - ('integration-testing.open-mpic.org', 'Redirect 302 http-01 test', "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oB", "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA.NzbLsXh8uDCcd-6MNwXF4W_7noWXFZAfHkxZsRGC9Xs") - ]) - def api_should_return_200_given_valid_http_01_validation(self, api_client, domain_or_ip_target, purpose_of_test, token, key_authorization): + @pytest.mark.parametrize( + "domain_or_ip_target, purpose_of_test, token, key_authorization", + [ + ( + "integration-testing.open-mpic.org", + "Standard http-01 test", + "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA", + "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA.NzbLsXh8uDCcd-6MNwXF4W_7noWXFZAfHkxZsRGC9Xs", + ), + ( + "integration-testing.open-mpic.org", + "Redirect 302 http-01 test", + "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oB", + "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA.NzbLsXh8uDCcd-6MNwXF4W_7noWXFZAfHkxZsRGC9Xs", + ), + ], + ) + def api_should_return_200_given_valid_http_01_validation( + self, api_client, domain_or_ip_target, purpose_of_test, token, key_authorization + ): print(f"Running test for {domain_or_ip_target} ({purpose_of_test})") request = MpicDcvRequest( domain_or_ip_target=domain_or_ip_target, orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), dcv_check_parameters=DcvCheckParameters( validation_details=DcvAcmeHttp01ValidationDetails(key_authorization=key_authorization, token=token) - ) + ), ) print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body response = api_client.post(MPIC_REQUEST_PATH, json.dumps(request.model_dump())) assert response.status_code == 200 mpic_response = self.mpic_response_adapter.validate_json(response.text) - - assert mpic_response.is_valid is True + assert mpic_response.is_valid is True - @pytest.mark.parametrize('domain_or_ip_target, purpose_of_test, token, key_authorization', [ - ('integration-testing.open-mpic.org', 'Failed http-01 test', "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA", "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA.NzbLsXh8uDCcd-6MNwXF4W_7noWXFZAfHkxZsRGC9Xa"), - ('integration-testing.open-mpic.org', 'Failed 302 http-01 test', "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oB", "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA.NzbLsXh8uDCcd-6MNwXF4W_7noWXFZAfHkxZsRGC9Xa") - ]) - def api_should_return_200_given_invalid_http_01_validation(self, api_client, domain_or_ip_target, purpose_of_test, token, key_authorization): + @pytest.mark.parametrize( + "domain_or_ip_target, purpose_of_test, token, key_authorization", + [ + ( + "integration-testing.open-mpic.org", + "Failed http-01 test", + "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA", + "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA.NzbLsXh8uDCcd-6MNwXF4W_7noWXFZAfHkxZsRGC9Xa", + ), + ( + "integration-testing.open-mpic.org", + "Failed 302 http-01 test", + "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oB", + "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA.NzbLsXh8uDCcd-6MNwXF4W_7noWXFZAfHkxZsRGC9Xa", + ), + ], + ) + def api_should_return_200_given_invalid_http_01_validation( + self, api_client, domain_or_ip_target, purpose_of_test, token, key_authorization + ): print(f"Running test for {domain_or_ip_target} ({purpose_of_test})") request = MpicDcvRequest( domain_or_ip_target=domain_or_ip_target, orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), dcv_check_parameters=DcvCheckParameters( validation_details=DcvAcmeHttp01ValidationDetails(key_authorization=key_authorization, token=token) - ) + ), ) print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body response = api_client.post(MPIC_REQUEST_PATH, json.dumps(request.model_dump())) assert response.status_code == 200 mpic_response = self.mpic_response_adapter.validate_json(response.text) - + assert mpic_response.is_valid is False - @pytest.mark.parametrize('domain_or_ip_target, purpose_of_test, http_token_path, challenge_value', [ - ('integration-testing.open-mpic.org', 'Valid website change v2 challenge', 'validation-doc.txt', 'test-validation'), - ('integration-testing.open-mpic.org', 'Valid 302 website change v2 challenge', 'validation-doc-redirect.txt', "test-validation-redirect") - ]) - def api_should_return_200_given_valid_website_change_validation(self, api_client, domain_or_ip_target, purpose_of_test, http_token_path, challenge_value): + @pytest.mark.parametrize( + "domain_or_ip_target, purpose_of_test, http_token_path, challenge_value", + [ + ( + "integration-testing.open-mpic.org", + "Valid website change v2 challenge", + "validation-doc.txt", + "test-validation", + ), + ( + "integration-testing.open-mpic.org", + "Valid 302 website change v2 challenge", + "validation-doc-redirect.txt", + "test-validation-redirect", + ), + ], + ) + def api_should_return_200_given_valid_website_change_validation( + self, + api_client, + domain_or_ip_target, + purpose_of_test, + http_token_path, + challenge_value, + ): print(f"Running test for {domain_or_ip_target} ({purpose_of_test})") request = MpicDcvRequest( domain_or_ip_target=domain_or_ip_target, orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), dcv_check_parameters=DcvCheckParameters( - validation_details=DcvWebsiteChangeValidationDetails(http_token_path=http_token_path, - challenge_value=challenge_value) - ) + validation_details=DcvWebsiteChangeValidationDetails( + http_token_path=http_token_path, challenge_value=challenge_value + ) + ), ) print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body response = api_client.post(MPIC_REQUEST_PATH, json.dumps(request.model_dump())) assert response.status_code == 200 mpic_response = self.mpic_response_adapter.validate_json(response.text) - + assert mpic_response.is_valid is True - @pytest.mark.parametrize('domain_or_ip_target, dns_record_type, challenge_value, purpose_of_test', [ - ('dns-change-txt.integration-testing.open-mpic.org', DnsRecordType.TXT, "1234567890abcdefg.", 'standard TXT dns change'), - ('dns-change-cname.integration-testing.open-mpic.org', DnsRecordType.CNAME, "1234567890abcdefg.", 'standard CNAME dns change'), - ('dns-change-caa.integration-testing.open-mpic.org', DnsRecordType.CAA, '0 dnschange "1234567890abcdefg."', 'standard CAA dns change'), - ]) - def api_should_return_200_is_valid_true_given_valid_dns_change_validation(self, api_client, domain_or_ip_target, dns_record_type, challenge_value, purpose_of_test): + @pytest.mark.parametrize( + "domain_or_ip_target, dns_record_type, challenge_value, purpose_of_test", + [ + ( + "dns-change-txt.integration-testing.open-mpic.org", + DnsRecordType.TXT, + "1234567890abcdefg.", + "standard TXT dns change", + ), + ( + "dns-change-cname.integration-testing.open-mpic.org", + DnsRecordType.CNAME, + "1234567890abcdefg.", + "standard CNAME dns change", + ), + ( + "dns-change-caa.integration-testing.open-mpic.org", + DnsRecordType.CAA, + '0 dnschange "1234567890abcdefg."', + "standard CAA dns change", + ), + ], + ) + def api_should_return_200_is_valid_true_given_valid_dns_change_validation( + self, + api_client, + domain_or_ip_target, + dns_record_type, + challenge_value, + purpose_of_test, + ): print(f"Running test for {domain_or_ip_target} ({purpose_of_test})") request = MpicDcvRequest( domain_or_ip_target=domain_or_ip_target, orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), dcv_check_parameters=DcvCheckParameters( - validation_details=DcvDnsChangeValidationDetails(challenge_value=challenge_value, dns_record_type=dns_record_type, dns_name_prefix="") - ) + validation_details=DcvDnsChangeValidationDetails( + challenge_value=challenge_value, + dns_record_type=dns_record_type, + dns_name_prefix="", + ) + ), ) print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body @@ -278,11 +552,10 @@ def api_should_return_200_is_valid_true_given_valid_dns_change_validation(self, def api_should_return_200_and_failed_corroboration_given_failed_dcv_check(self, api_client): request = MpicDcvRequest( - domain_or_ip_target='ifconfig.me', + domain_or_ip_target="ifconfig.me", dcv_check_parameters=DcvCheckParameters( - validation_details=DcvWebsiteChangeValidationDetails(http_token_path='/', - challenge_value='test') - ) + validation_details=DcvWebsiteChangeValidationDetails(http_token_path="/", challenge_value="test") + ), ) print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body @@ -293,9 +566,13 @@ def api_should_return_200_and_failed_corroboration_given_failed_dcv_check(self, def api_should_return_400_given_invalid_orchestration_parameters_in_request(self, api_client): request = MpicCaaRequest( - domain_or_ip_target='example.com', - orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=5), # invalid quorum count - caa_check_parameters=CaaCheckParameters(certificate_type=CertificateType.TLS_SERVER, caa_domains=['mozilla.com']) + domain_or_ip_target="example.com", + orchestration_parameters=MpicRequestOrchestrationParameters( + perspective_count=3, quorum_count=5 + ), # invalid quorum count + caa_check_parameters=CaaCheckParameters( + certificate_type=CertificateType.TLS_SERVER, caa_domains=["mozilla.com"] + ), ) print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body @@ -303,20 +580,25 @@ def api_should_return_400_given_invalid_orchestration_parameters_in_request(self assert response.status_code == 400 response_body = json.loads(response.text) print("\nResponse:\n", json.dumps(response_body, indent=4)) # pretty print response body - assert response_body['error'] == MpicRequestValidationMessages.REQUEST_VALIDATION_FAILED.key - assert any(issue['issue_type'] == MpicRequestValidationMessages.INVALID_QUORUM_COUNT.key for issue in response_body['validation_issues']) + assert response_body["error"] == MpicRequestValidationMessages.REQUEST_VALIDATION_FAILED.key + assert any( + issue["issue_type"] == MpicRequestValidationMessages.INVALID_QUORUM_COUNT.key + for issue in response_body["validation_issues"] + ) def api_should_return_400_given_invalid_check_type_in_request(self, api_client): request = MpicCaaRequest( - domain_or_ip_target='example.com', + domain_or_ip_target="example.com", orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), - caa_check_parameters=CaaCheckParameters(certificate_type=CertificateType.TLS_SERVER, caa_domains=['mozilla.com']) + caa_check_parameters=CaaCheckParameters( + certificate_type=CertificateType.TLS_SERVER, caa_domains=["mozilla.com"] + ), ) - request.check_type = 'invalid_check_type' + request.check_type = "invalid_check_type" print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body response = api_client.post(MPIC_REQUEST_PATH, json.dumps(request.model_dump())) assert response.status_code == 400 response_body = json.loads(response.text) print("\nResponse:\n", json.dumps(response_body, indent=4)) - assert response_body['error'] == MpicRequestValidationMessages.REQUEST_VALIDATION_FAILED.key + assert response_body["error"] == MpicRequestValidationMessages.REQUEST_VALIDATION_FAILED.key From 0f299a24059fb7ce4257522013a5e9b4e077ef31 Mon Sep 17 00:00:00 2001 From: Henry Birge-Lee Date: Sat, 1 Feb 2025 18:01:06 -0500 Subject: [PATCH 3/4] undid black formatting --- tests/integration/test_deployed_mpic_api.py | 534 +++++--------------- 1 file changed, 126 insertions(+), 408 deletions(-) diff --git a/tests/integration/test_deployed_mpic_api.py b/tests/integration/test_deployed_mpic_api.py index 9eaf89e..9c3c1eb 100644 --- a/tests/integration/test_deployed_mpic_api.py +++ b/tests/integration/test_deployed_mpic_api.py @@ -5,28 +5,18 @@ from pydantic import TypeAdapter -from open_mpic_core.common_domain.check_parameters import ( - CaaCheckParameters, - DcvWebsiteChangeValidationDetails, - DcvAcmeDns01ValidationDetails, - DcvAcmeHttp01ValidationDetails, - DcvDnsChangeValidationDetails, -) +from open_mpic_core.common_domain.check_parameters import CaaCheckParameters, DcvWebsiteChangeValidationDetails, DcvAcmeDns01ValidationDetails, DcvAcmeHttp01ValidationDetails, DcvDnsChangeValidationDetails from open_mpic_core.common_domain.check_parameters import DcvCheckParameters from open_mpic_core.common_domain.enum.certificate_type import CertificateType from open_mpic_core.common_domain.enum.check_type import CheckType from open_mpic_core.common_domain.enum.dns_record_type import DnsRecordType from open_mpic_core.mpic_coordinator.domain.mpic_request import MpicCaaRequest from open_mpic_core.mpic_coordinator.domain.mpic_request import MpicDcvRequest -from open_mpic_core.mpic_coordinator.domain.mpic_orchestration_parameters import ( - MpicRequestOrchestrationParameters, -) +from open_mpic_core.mpic_coordinator.domain.mpic_orchestration_parameters import MpicRequestOrchestrationParameters import testing_api_client from open_mpic_core.mpic_coordinator.domain.mpic_response import MpicResponse -from open_mpic_core.mpic_coordinator.messages.mpic_request_validation_messages import ( - MpicRequestValidationMessages, -) +from open_mpic_core.mpic_coordinator.messages.mpic_request_validation_messages import MpicRequestValidationMessages MPIC_REQUEST_PATH = "/mpic" @@ -38,22 +28,20 @@ class TestDeployedMpicApi: def setup_class(cls): cls.mpic_response_adapter = TypeAdapter(MpicResponse) - @pytest.fixture(scope="class") + @pytest.fixture(scope='class') def api_client(self): with pytest.MonkeyPatch.context() as class_scoped_monkeypatch: # blank out argv except first param; arg parser doesn't expect pytest args - class_scoped_monkeypatch.setattr(sys, "argv", sys.argv[:1]) + class_scoped_monkeypatch.setattr(sys, 'argv', sys.argv[:1]) api_client = testing_api_client.TestingApiClient() yield api_client api_client.close() def api_should_return_200_and_passed_corroboration_given_successful_caa_check(self, api_client): request = MpicCaaRequest( - domain_or_ip_target="example.com", + domain_or_ip_target='example.com', orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), - caa_check_parameters=CaaCheckParameters( - certificate_type=CertificateType.TLS_SERVER, caa_domains=["mozilla.com"] - ), + caa_check_parameters=CaaCheckParameters(certificate_type=CertificateType.TLS_SERVER, caa_domains=['mozilla.com']) ) print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body @@ -66,124 +54,35 @@ def api_should_return_200_and_passed_corroboration_given_successful_caa_check(se assert mpic_response.is_valid is True perspectives_list = mpic_response.perspectives assert len(perspectives_list) == request.orchestration_parameters.perspective_count - assert ( - len( - list( - filter( - lambda perspective: perspective.check_type == CheckType.CAA, - perspectives_list, - ) - ) - ) - == request.orchestration_parameters.perspective_count - ) - - @pytest.mark.parametrize( - "domain_or_ip_target, purpose_of_test, is_wildcard_domain", - [ - ("empty.basic.caatestsuite.com", 'Tests handling of 0 issue ";"', False), - ( - "deny.basic.caatestsuite.com", - 'Tests handling of 0 issue "caatestsuite.com"', - False, - ), - ( - "uppercase-deny.basic.caatestsuite.com", - 'Tests handling of uppercase issue tag (0 ISSUE "caatestsuite.com")', - False, - ), - ( - "mixedcase-deny.basic.caatestsuite.com", - 'Tests handling of mixed case issue tag (0 IsSuE "caatestsuite.com")', - False, - ), - ( - "big.basic.caatestsuite.com", - 'Tests handling of gigantic (1001) CAA record set (0 issue "caatestsuite.com")', - False, - ), - ( - "critical1.basic.caatestsuite.com", - 'Tests handling of unknown critical property (128 caatestsuitedummyproperty "test")', - False, - ), - ( - "critical2.basic.caatestsuite.com", - "Tests handling of unknown critical property with another flag (130)", - False, - ), - ( - "sub1.deny.basic.caatestsuite.com", - "Tests basic tree climbing when CAA record is at parent domain", - False, - ), - ( - "sub2.sub1.deny.basic.caatestsuite.com", - "Tests tree climbing when CAA record is at grandparent domain", - False, - ), - ( - "deny.basic.caatestsuite.com", - "Tests handling of issue property for a wildcard domain", - True, - ), - ( - "deny-wild.basic.caatestsuite.com", - "Tests handling of issuewild for a wildcard domain", - True, - ), - ( - "cname-deny.basic.caatestsuite.com", - "Tests handling of CNAME, where CAA record is at CNAME target", - False, - ), - ( - "cname-cname-deny.basic.caatestsuite.com", - "Tests handling of CNAME chain, where CAA record is at ultimate target", - False, - ), - ( - "sub1.cname-deny.basic.caatestsuite.com", - "Tests handling of CNAME, where parent is CNAME and CAA record is at target", - False, - ), - ( - "deny.permit.basic.caatestsuite.com", - "Tests rejection when parent name contains a permissible CAA record set", - False, - ), - ( - "ipv6only.caatestsuite.com", - "Tests handling of record at IPv6-only authoritative name server", - False, - ), - # ('expired.caatestsuite-dnssec.com', 'Tests rejection when expired DNSSEC signatures', False), # DNSSEC SHOULD be enabled in production but is not a current requirement for MPIC - # ('missing.caatestsuite-dnssec.com', 'Tests rejection when missing DNSSEC signatures', False), # DNSSEC SHOULD be enabled in production but is not a current requirement for MPIC - ( - "blackhole.caatestsuite-dnssec.com", - "Tests rejection when DNSSEC chain goes to non-responsive server", - False, - ), - ( - "servfail.caatestsuite-dnssec.com", - "Tests rejection when DNSSEC chain goes to server returning SERVFAIL", - False, - ), - ( - "refused.caatestsuite-dnssec.com", - "Tests rejection when DNSSEC chain goes to server returning REFUSED", - False, - ), - ( - "xss.caatestsuite.com", - "Tests rejection when issue property has HTML and JS", - False, - ), - ], - ) - def api_should_return_is_valid_false_for_all_tests_in_do_not_issue_caa_test_suite( - self, api_client, domain_or_ip_target, purpose_of_test, is_wildcard_domain - ): + assert (len(list(filter(lambda perspective: perspective.check_type == CheckType.CAA, perspectives_list))) + == request.orchestration_parameters.perspective_count) + + @pytest.mark.parametrize('domain_or_ip_target, purpose_of_test, is_wildcard_domain', [ + ('empty.basic.caatestsuite.com', 'Tests handling of 0 issue ";"', False), + ('deny.basic.caatestsuite.com', 'Tests handling of 0 issue "caatestsuite.com"', False), + ('uppercase-deny.basic.caatestsuite.com', 'Tests handling of uppercase issue tag (0 ISSUE "caatestsuite.com")', False), + ('mixedcase-deny.basic.caatestsuite.com', 'Tests handling of mixed case issue tag (0 IsSuE "caatestsuite.com")', False), + ('big.basic.caatestsuite.com', 'Tests handling of gigantic (1001) CAA record set (0 issue "caatestsuite.com")', False), + ('critical1.basic.caatestsuite.com', 'Tests handling of unknown critical property (128 caatestsuitedummyproperty "test")', False), + ('critical2.basic.caatestsuite.com', 'Tests handling of unknown critical property with another flag (130)', False), + ('sub1.deny.basic.caatestsuite.com', 'Tests basic tree climbing when CAA record is at parent domain', False), + ('sub2.sub1.deny.basic.caatestsuite.com', 'Tests tree climbing when CAA record is at grandparent domain', False), + ('deny.basic.caatestsuite.com', 'Tests handling of issue property for a wildcard domain', True), + ('deny-wild.basic.caatestsuite.com', 'Tests handling of issuewild for a wildcard domain', True), + ('cname-deny.basic.caatestsuite.com', 'Tests handling of CNAME, where CAA record is at CNAME target', False), + ('cname-cname-deny.basic.caatestsuite.com', 'Tests handling of CNAME chain, where CAA record is at ultimate target', False), + ('sub1.cname-deny.basic.caatestsuite.com', 'Tests handling of CNAME, where parent is CNAME and CAA record is at target', False), + ('deny.permit.basic.caatestsuite.com', 'Tests rejection when parent name contains a permissible CAA record set', False), + ('ipv6only.caatestsuite.com', 'Tests handling of record at IPv6-only authoritative name server', False), + #('expired.caatestsuite-dnssec.com', 'Tests rejection when expired DNSSEC signatures', False), # DNSSEC SHOULD be enabled in production but is not a current requirement for MPIC + #('missing.caatestsuite-dnssec.com', 'Tests rejection when missing DNSSEC signatures', False), # DNSSEC SHOULD be enabled in production but is not a current requirement for MPIC + ('blackhole.caatestsuite-dnssec.com', 'Tests rejection when DNSSEC chain goes to non-responsive server', False), + ('servfail.caatestsuite-dnssec.com', 'Tests rejection when DNSSEC chain goes to server returning SERVFAIL', False), + ('refused.caatestsuite-dnssec.com', 'Tests rejection when DNSSEC chain goes to server returning REFUSED', False), + ('xss.caatestsuite.com', 'Tests rejection when issue property has HTML and JS', False), + ]) + def api_should_return_is_valid_false_for_all_tests_in_do_not_issue_caa_test_suite(self, api_client, domain_or_ip_target, + purpose_of_test, is_wildcard_domain): print(f"Running test for {domain_or_ip_target} ({purpose_of_test})") if is_wildcard_domain: domain_or_ip_target = "*." + domain_or_ip_target @@ -191,8 +90,8 @@ def api_should_return_is_valid_false_for_all_tests_in_do_not_issue_caa_test_suit domain_or_ip_target=domain_or_ip_target, orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), caa_check_parameters=CaaCheckParameters( - certificate_type=CertificateType.TLS_SERVER, caa_domains=["example.com"] - ), + certificate_type=CertificateType.TLS_SERVER, caa_domains=['example.com'] + ) ) response = api_client.post(MPIC_REQUEST_PATH, json.dumps(request.model_dump())) mpic_response = self.mpic_response_adapter.validate_json(response.text) @@ -202,79 +101,23 @@ def api_should_return_is_valid_false_for_all_tests_in_do_not_issue_caa_test_suit # This case is handled in a compliant manner as it is treated as a lookup failure. # The test for proper communication with an IPv6 nameserver can be enabled with the following additional parameter to the list below. # ('ipv6only.caatestsuite.com', 'Tests handling of record at IPv6-only authoritative name server', False), - @pytest.mark.parametrize( - "domain_or_ip_target, purpose_of_test, is_wildcard_domain", - [ - ( - "deny.basic.caatestsuite.com", - 'Tests handling of 0 issue "caatestsuite.com"', - False, - ), - ( - "uppercase-deny.basic.caatestsuite.com", - 'Tests handling of uppercase issue tag (0 ISSUE "caatestsuite.com")', - False, - ), - ( - "mixedcase-deny.basic.caatestsuite.com", - 'Tests handling of mixed case issue tag (0 IsSuE "caatestsuite.com")', - False, - ), - ( - "big.basic.caatestsuite.com", - 'Tests handling of gigantic (1001) CAA record set (0 issue "caatestsuite.com")', - False, - ), - ( - "sub1.deny.basic.caatestsuite.com", - "Tests basic tree climbing when CAA record is at parent domain", - False, - ), - ( - "sub2.sub1.deny.basic.caatestsuite.com", - "Tests tree climbing when CAA record is at grandparent domain", - False, - ), - ( - "deny.basic.caatestsuite.com", - "Tests handling of issue property for a wildcard domain", - True, - ), - ( - "deny-wild.basic.caatestsuite.com", - "Tests handling of issuewild for a wildcard domain", - True, - ), - ( - "cname-deny.basic.caatestsuite.com", - "Tests handling of CNAME, where CAA record is at CNAME target", - False, - ), - ( - "cname-cname-deny.basic.caatestsuite.com", - "Tests handling of CNAME chain, where CAA record is at ultimate target", - False, - ), - ( - "sub1.cname-deny.basic.caatestsuite.com", - "Tests handling of CNAME, where parent is CNAME and CAA record is at target", - False, - ), - ( - "permit.basic.caatestsuite.com", - "Tests acceptance when name contains a permissible CAA record set", - False, - ), - ( - "deny.permit.basic.caatestsuite.com", - "Tests acceptance on a CAA record set", - False, - ), - ], - ) - def api_should_return_is_valid_true_for_valid_tests_in_caa_test_suite_when_caa_domain_is_caatestsuite_com( - self, api_client, domain_or_ip_target, purpose_of_test, is_wildcard_domain - ): + @pytest.mark.parametrize('domain_or_ip_target, purpose_of_test, is_wildcard_domain', [ + ('deny.basic.caatestsuite.com', 'Tests handling of 0 issue "caatestsuite.com"', False), + ('uppercase-deny.basic.caatestsuite.com', 'Tests handling of uppercase issue tag (0 ISSUE "caatestsuite.com")', False), + ('mixedcase-deny.basic.caatestsuite.com', 'Tests handling of mixed case issue tag (0 IsSuE "caatestsuite.com")', False), + ('big.basic.caatestsuite.com', 'Tests handling of gigantic (1001) CAA record set (0 issue "caatestsuite.com")', False), + ('sub1.deny.basic.caatestsuite.com', 'Tests basic tree climbing when CAA record is at parent domain', False), + ('sub2.sub1.deny.basic.caatestsuite.com', 'Tests tree climbing when CAA record is at grandparent domain', False), + ('deny.basic.caatestsuite.com', 'Tests handling of issue property for a wildcard domain', True), + ('deny-wild.basic.caatestsuite.com', 'Tests handling of issuewild for a wildcard domain', True), + ('cname-deny.basic.caatestsuite.com', 'Tests handling of CNAME, where CAA record is at CNAME target', False), + ('cname-cname-deny.basic.caatestsuite.com', 'Tests handling of CNAME chain, where CAA record is at ultimate target', False), + ('sub1.cname-deny.basic.caatestsuite.com', 'Tests handling of CNAME, where parent is CNAME and CAA record is at target', False), + ('permit.basic.caatestsuite.com', 'Tests acceptance when name contains a permissible CAA record set', False), + ('deny.permit.basic.caatestsuite.com', 'Tests acceptance on a CAA record set', False), + ]) + def api_should_return_is_valid_true_for_valid_tests_in_caa_test_suite_when_caa_domain_is_caatestsuite_com(self, api_client, domain_or_ip_target, + purpose_of_test, is_wildcard_domain): print(f"Running test for {domain_or_ip_target} ({purpose_of_test})") if is_wildcard_domain: domain_or_ip_target = "*." + domain_or_ip_target @@ -282,265 +125,148 @@ def api_should_return_is_valid_true_for_valid_tests_in_caa_test_suite_when_caa_d domain_or_ip_target=domain_or_ip_target, orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), caa_check_parameters=CaaCheckParameters( - certificate_type=CertificateType.TLS_SERVER, - caa_domains=["caatestsuite.com", "example.com"], - ), + certificate_type=CertificateType.TLS_SERVER, caa_domains=['caatestsuite.com', 'example.com']) ) response = api_client.post(MPIC_REQUEST_PATH, json.dumps(request.model_dump())) mpic_response = self.mpic_response_adapter.validate_json(response.text) assert mpic_response.is_valid is True - @pytest.mark.skip(reason="Behavior not required in RFC 8659") - @pytest.mark.parametrize( - "domain_or_ip_target, purpose_of_test", - [ - ( - "dname-deny.basic.caatestsuite.com", - "Tests handling of a DNAME when CAA record exists at DNAME target", - ), - ( - "cname-deny-sub.basic.caatestsuite.com", - "Tests handling of a CNAME when CAA record exists at parent of CNAME target", - ), - ], - ) - def api_should_return_is_valid_false_for_do_not_issue_caa_test_suite_for_rfc_6844( - self, api_client, domain_or_ip_target, purpose_of_test - ): + @pytest.mark.skip(reason='Behavior not required in RFC 8659') + @pytest.mark.parametrize('domain_or_ip_target, purpose_of_test', [ + ('dname-deny.basic.caatestsuite.com', 'Tests handling of a DNAME when CAA record exists at DNAME target'), + ('cname-deny-sub.basic.caatestsuite.com', 'Tests handling of a CNAME when CAA record exists at parent of CNAME target') + ]) + def api_should_return_is_valid_false_for_do_not_issue_caa_test_suite_for_rfc_6844(self, api_client, domain_or_ip_target, purpose_of_test): print(f"Running test for {domain_or_ip_target} ({purpose_of_test})") request = MpicCaaRequest( domain_or_ip_target=domain_or_ip_target, orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), - caa_check_parameters=CaaCheckParameters( - certificate_type=CertificateType.TLS_SERVER, caa_domains=["example.com"] - ), + caa_check_parameters=CaaCheckParameters(certificate_type=CertificateType.TLS_SERVER, caa_domains=['example.com']) ) response = api_client.post(MPIC_REQUEST_PATH, json.dumps(request.model_dump())) mpic_response = self.mpic_response_adapter.validate_json(response.text) assert mpic_response.is_valid is False - @pytest.mark.parametrize( - "domain_or_ip_target, purpose_of_test", - [ - ("dns-01.integration-testing.open-mpic.org", "Standard proper dns-01 test"), - ( - "dns-01-multi.integration-testing.open-mpic.org", - "Proper dns-01 test with multiple TXT records", - ), - ( - "dns-01-cname.integration-testing.open-mpic.org", - "Proper dns-01 test with CNAME", - ), - ], - ) + @pytest.mark.parametrize('domain_or_ip_target, purpose_of_test', [ + ('dns-01.integration-testing.open-mpic.org', 'Standard proper dns-01 test'), + ('dns-01-multi.integration-testing.open-mpic.org', 'Proper dns-01 test with multiple TXT records'), + ('dns-01-cname.integration-testing.open-mpic.org', 'Proper dns-01 test with CNAME') + ]) def api_should_return_200_given_valid_dns_01_validation(self, api_client, domain_or_ip_target, purpose_of_test): print(f"Running test for {domain_or_ip_target} ({purpose_of_test})") request = MpicDcvRequest( domain_or_ip_target=domain_or_ip_target, orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), dcv_check_parameters=DcvCheckParameters( - validation_details=DcvAcmeDns01ValidationDetails( - key_authorization="7FwkJPsKf-TH54wu4eiIFA3nhzYaevsL7953ihy-tpo" - ) - ), + validation_details=DcvAcmeDns01ValidationDetails(key_authorization="7FwkJPsKf-TH54wu4eiIFA3nhzYaevsL7953ihy-tpo") + ) ) print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body response = api_client.post(MPIC_REQUEST_PATH, json.dumps(request.model_dump())) assert response.status_code == 200 mpic_response = self.mpic_response_adapter.validate_json(response.text) - + assert mpic_response.is_valid is True - @pytest.mark.parametrize( - "domain_or_ip_target, purpose_of_test", - [ - ( - "dns-01-leading-whitespace.integration-testing.open-mpic.org", - "leading whitespace", - ), - ( - "dns-01-trailing-whitespace.integration-testing.open-mpic.org", - "trailing", - ), - ("dns-01-nxdomain.integration-testing.open-mpic.org", "NXDOMAIN"), - ], - ) - def api_should_return_200_is_valid_false_given_invalid_dns_01_validation( - self, api_client, domain_or_ip_target, purpose_of_test - ): + @pytest.mark.parametrize('domain_or_ip_target, purpose_of_test', [ + ('dns-01-leading-whitespace.integration-testing.open-mpic.org', 'leading whitespace'), + ('dns-01-trailing-whitespace.integration-testing.open-mpic.org', 'trailing'), + ('dns-01-nxdomain.integration-testing.open-mpic.org', 'NXDOMAIN') + ]) + def api_should_return_200_is_valid_false_given_invalid_dns_01_validation(self, api_client, domain_or_ip_target, purpose_of_test): print(f"Running test for {domain_or_ip_target} ({purpose_of_test})") request = MpicDcvRequest( domain_or_ip_target=domain_or_ip_target, orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), dcv_check_parameters=DcvCheckParameters( - validation_details=DcvAcmeDns01ValidationDetails( - key_authorization="7FwkJPsKf-TH54wu4eiIFA3nhzYaevsL7953ihy-tpo" - ) - ), + validation_details=DcvAcmeDns01ValidationDetails(key_authorization="7FwkJPsKf-TH54wu4eiIFA3nhzYaevsL7953ihy-tpo") + ) ) print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body response = api_client.post(MPIC_REQUEST_PATH, json.dumps(request.model_dump())) assert response.status_code == 200 mpic_response = self.mpic_response_adapter.validate_json(response.text) - + assert mpic_response.is_valid is False - @pytest.mark.parametrize( - "domain_or_ip_target, purpose_of_test, token, key_authorization", - [ - ( - "integration-testing.open-mpic.org", - "Standard http-01 test", - "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA", - "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA.NzbLsXh8uDCcd-6MNwXF4W_7noWXFZAfHkxZsRGC9Xs", - ), - ( - "integration-testing.open-mpic.org", - "Redirect 302 http-01 test", - "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oB", - "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA.NzbLsXh8uDCcd-6MNwXF4W_7noWXFZAfHkxZsRGC9Xs", - ), - ], - ) - def api_should_return_200_given_valid_http_01_validation( - self, api_client, domain_or_ip_target, purpose_of_test, token, key_authorization - ): + + @pytest.mark.parametrize('domain_or_ip_target, purpose_of_test, token, key_authorization', [ + ('integration-testing.open-mpic.org', 'Standard http-01 test', "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA", "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA.NzbLsXh8uDCcd-6MNwXF4W_7noWXFZAfHkxZsRGC9Xs"), + ('integration-testing.open-mpic.org', 'Redirect 302 http-01 test', "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oB", "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA.NzbLsXh8uDCcd-6MNwXF4W_7noWXFZAfHkxZsRGC9Xs") + ]) + def api_should_return_200_given_valid_http_01_validation(self, api_client, domain_or_ip_target, purpose_of_test, token, key_authorization): print(f"Running test for {domain_or_ip_target} ({purpose_of_test})") request = MpicDcvRequest( domain_or_ip_target=domain_or_ip_target, orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), dcv_check_parameters=DcvCheckParameters( validation_details=DcvAcmeHttp01ValidationDetails(key_authorization=key_authorization, token=token) - ), + ) ) print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body response = api_client.post(MPIC_REQUEST_PATH, json.dumps(request.model_dump())) assert response.status_code == 200 mpic_response = self.mpic_response_adapter.validate_json(response.text) - + assert mpic_response.is_valid is True - @pytest.mark.parametrize( - "domain_or_ip_target, purpose_of_test, token, key_authorization", - [ - ( - "integration-testing.open-mpic.org", - "Failed http-01 test", - "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA", - "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA.NzbLsXh8uDCcd-6MNwXF4W_7noWXFZAfHkxZsRGC9Xa", - ), - ( - "integration-testing.open-mpic.org", - "Failed 302 http-01 test", - "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oB", - "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA.NzbLsXh8uDCcd-6MNwXF4W_7noWXFZAfHkxZsRGC9Xa", - ), - ], - ) - def api_should_return_200_given_invalid_http_01_validation( - self, api_client, domain_or_ip_target, purpose_of_test, token, key_authorization - ): + + @pytest.mark.parametrize('domain_or_ip_target, purpose_of_test, token, key_authorization', [ + ('integration-testing.open-mpic.org', 'Failed http-01 test', "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA", "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA.NzbLsXh8uDCcd-6MNwXF4W_7noWXFZAfHkxZsRGC9Xa"), + ('integration-testing.open-mpic.org', 'Failed 302 http-01 test', "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oB", "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA.NzbLsXh8uDCcd-6MNwXF4W_7noWXFZAfHkxZsRGC9Xa") + ]) + def api_should_return_200_given_invalid_http_01_validation(self, api_client, domain_or_ip_target, purpose_of_test, token, key_authorization): print(f"Running test for {domain_or_ip_target} ({purpose_of_test})") request = MpicDcvRequest( domain_or_ip_target=domain_or_ip_target, orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), dcv_check_parameters=DcvCheckParameters( validation_details=DcvAcmeHttp01ValidationDetails(key_authorization=key_authorization, token=token) - ), + ) ) print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body response = api_client.post(MPIC_REQUEST_PATH, json.dumps(request.model_dump())) assert response.status_code == 200 mpic_response = self.mpic_response_adapter.validate_json(response.text) - + assert mpic_response.is_valid is False - @pytest.mark.parametrize( - "domain_or_ip_target, purpose_of_test, http_token_path, challenge_value", - [ - ( - "integration-testing.open-mpic.org", - "Valid website change v2 challenge", - "validation-doc.txt", - "test-validation", - ), - ( - "integration-testing.open-mpic.org", - "Valid 302 website change v2 challenge", - "validation-doc-redirect.txt", - "test-validation-redirect", - ), - ], - ) - def api_should_return_200_given_valid_website_change_validation( - self, - api_client, - domain_or_ip_target, - purpose_of_test, - http_token_path, - challenge_value, - ): + @pytest.mark.parametrize('domain_or_ip_target, purpose_of_test, http_token_path, challenge_value', [ + ('integration-testing.open-mpic.org', 'Valid website change v2 challenge', 'validation-doc.txt', 'test-validation'), + ('integration-testing.open-mpic.org', 'Valid 302 website change v2 challenge', 'validation-doc-redirect.txt', "test-validation-redirect") + ]) + def api_should_return_200_given_valid_website_change_validation(self, api_client, domain_or_ip_target, purpose_of_test, http_token_path, challenge_value): print(f"Running test for {domain_or_ip_target} ({purpose_of_test})") request = MpicDcvRequest( domain_or_ip_target=domain_or_ip_target, orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), dcv_check_parameters=DcvCheckParameters( - validation_details=DcvWebsiteChangeValidationDetails( - http_token_path=http_token_path, challenge_value=challenge_value - ) - ), + validation_details=DcvWebsiteChangeValidationDetails(http_token_path=http_token_path, + challenge_value=challenge_value) + ) ) print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body response = api_client.post(MPIC_REQUEST_PATH, json.dumps(request.model_dump())) assert response.status_code == 200 mpic_response = self.mpic_response_adapter.validate_json(response.text) - + assert mpic_response.is_valid is True - @pytest.mark.parametrize( - "domain_or_ip_target, dns_record_type, challenge_value, purpose_of_test", - [ - ( - "dns-change-txt.integration-testing.open-mpic.org", - DnsRecordType.TXT, - "1234567890abcdefg.", - "standard TXT dns change", - ), - ( - "dns-change-cname.integration-testing.open-mpic.org", - DnsRecordType.CNAME, - "1234567890abcdefg.", - "standard CNAME dns change", - ), - ( - "dns-change-caa.integration-testing.open-mpic.org", - DnsRecordType.CAA, - '0 dnschange "1234567890abcdefg."', - "standard CAA dns change", - ), - ], - ) - def api_should_return_200_is_valid_true_given_valid_dns_change_validation( - self, - api_client, - domain_or_ip_target, - dns_record_type, - challenge_value, - purpose_of_test, - ): + @pytest.mark.parametrize('domain_or_ip_target, dns_record_type, challenge_value, purpose_of_test', [ + ('dns-change-txt.integration-testing.open-mpic.org', DnsRecordType.TXT, "1234567890abcdefg.", 'standard TXT dns change'), + ('dns-change-cname.integration-testing.open-mpic.org', DnsRecordType.CNAME, "1234567890abcdefg.", 'standard CNAME dns change'), + ('dns-change-caa.integration-testing.open-mpic.org', DnsRecordType.CAA, '0 dnschange "1234567890abcdefg."', 'standard CAA dns change'), + ]) + def api_should_return_200_is_valid_true_given_valid_dns_change_validation(self, api_client, domain_or_ip_target, dns_record_type, challenge_value, purpose_of_test): print(f"Running test for {domain_or_ip_target} ({purpose_of_test})") request = MpicDcvRequest( domain_or_ip_target=domain_or_ip_target, orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), dcv_check_parameters=DcvCheckParameters( - validation_details=DcvDnsChangeValidationDetails( - challenge_value=challenge_value, - dns_record_type=dns_record_type, - dns_name_prefix="", - ) - ), + validation_details=DcvDnsChangeValidationDetails(challenge_value=challenge_value, dns_record_type=dns_record_type, dns_name_prefix="") + ) ) print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body @@ -552,10 +278,11 @@ def api_should_return_200_is_valid_true_given_valid_dns_change_validation( def api_should_return_200_and_failed_corroboration_given_failed_dcv_check(self, api_client): request = MpicDcvRequest( - domain_or_ip_target="ifconfig.me", + domain_or_ip_target='ifconfig.me', dcv_check_parameters=DcvCheckParameters( - validation_details=DcvWebsiteChangeValidationDetails(http_token_path="/", challenge_value="test") - ), + validation_details=DcvWebsiteChangeValidationDetails(http_token_path='/', + challenge_value='test') + ) ) print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body @@ -566,13 +293,9 @@ def api_should_return_200_and_failed_corroboration_given_failed_dcv_check(self, def api_should_return_400_given_invalid_orchestration_parameters_in_request(self, api_client): request = MpicCaaRequest( - domain_or_ip_target="example.com", - orchestration_parameters=MpicRequestOrchestrationParameters( - perspective_count=3, quorum_count=5 - ), # invalid quorum count - caa_check_parameters=CaaCheckParameters( - certificate_type=CertificateType.TLS_SERVER, caa_domains=["mozilla.com"] - ), + domain_or_ip_target='example.com', + orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=5), # invalid quorum count + caa_check_parameters=CaaCheckParameters(certificate_type=CertificateType.TLS_SERVER, caa_domains=['mozilla.com']) ) print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body @@ -580,25 +303,20 @@ def api_should_return_400_given_invalid_orchestration_parameters_in_request(self assert response.status_code == 400 response_body = json.loads(response.text) print("\nResponse:\n", json.dumps(response_body, indent=4)) # pretty print response body - assert response_body["error"] == MpicRequestValidationMessages.REQUEST_VALIDATION_FAILED.key - assert any( - issue["issue_type"] == MpicRequestValidationMessages.INVALID_QUORUM_COUNT.key - for issue in response_body["validation_issues"] - ) + assert response_body['error'] == MpicRequestValidationMessages.REQUEST_VALIDATION_FAILED.key + assert any(issue['issue_type'] == MpicRequestValidationMessages.INVALID_QUORUM_COUNT.key for issue in response_body['validation_issues']) def api_should_return_400_given_invalid_check_type_in_request(self, api_client): request = MpicCaaRequest( - domain_or_ip_target="example.com", + domain_or_ip_target='example.com', orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), - caa_check_parameters=CaaCheckParameters( - certificate_type=CertificateType.TLS_SERVER, caa_domains=["mozilla.com"] - ), + caa_check_parameters=CaaCheckParameters(certificate_type=CertificateType.TLS_SERVER, caa_domains=['mozilla.com']) ) - request.check_type = "invalid_check_type" + request.check_type = 'invalid_check_type' print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body response = api_client.post(MPIC_REQUEST_PATH, json.dumps(request.model_dump())) assert response.status_code == 400 response_body = json.loads(response.text) print("\nResponse:\n", json.dumps(response_body, indent=4)) - assert response_body["error"] == MpicRequestValidationMessages.REQUEST_VALIDATION_FAILED.key + assert response_body['error'] == MpicRequestValidationMessages.REQUEST_VALIDATION_FAILED.key From 8868981ccbf9f1437e09d963f8edd582d2eacac3 Mon Sep 17 00:00:00 2001 From: Henry Birge-Lee Date: Sat, 1 Feb 2025 18:03:59 -0500 Subject: [PATCH 4/4] added fmt on fmt off statements and ran black formatter --- tests/integration/test_deployed_mpic_api.py | 164 +++++++++++++------- 1 file changed, 111 insertions(+), 53 deletions(-) diff --git a/tests/integration/test_deployed_mpic_api.py b/tests/integration/test_deployed_mpic_api.py index 9c3c1eb..0bfbde4 100644 --- a/tests/integration/test_deployed_mpic_api.py +++ b/tests/integration/test_deployed_mpic_api.py @@ -5,7 +5,13 @@ from pydantic import TypeAdapter -from open_mpic_core.common_domain.check_parameters import CaaCheckParameters, DcvWebsiteChangeValidationDetails, DcvAcmeDns01ValidationDetails, DcvAcmeHttp01ValidationDetails, DcvDnsChangeValidationDetails +from open_mpic_core.common_domain.check_parameters import ( + CaaCheckParameters, + DcvWebsiteChangeValidationDetails, + DcvAcmeDns01ValidationDetails, + DcvAcmeHttp01ValidationDetails, + DcvDnsChangeValidationDetails, +) from open_mpic_core.common_domain.check_parameters import DcvCheckParameters from open_mpic_core.common_domain.enum.certificate_type import CertificateType from open_mpic_core.common_domain.enum.check_type import CheckType @@ -28,20 +34,22 @@ class TestDeployedMpicApi: def setup_class(cls): cls.mpic_response_adapter = TypeAdapter(MpicResponse) - @pytest.fixture(scope='class') + @pytest.fixture(scope="class") def api_client(self): with pytest.MonkeyPatch.context() as class_scoped_monkeypatch: # blank out argv except first param; arg parser doesn't expect pytest args - class_scoped_monkeypatch.setattr(sys, 'argv', sys.argv[:1]) + class_scoped_monkeypatch.setattr(sys, "argv", sys.argv[:1]) api_client = testing_api_client.TestingApiClient() yield api_client api_client.close() def api_should_return_200_and_passed_corroboration_given_successful_caa_check(self, api_client): request = MpicCaaRequest( - domain_or_ip_target='example.com', + domain_or_ip_target="example.com", orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), - caa_check_parameters=CaaCheckParameters(certificate_type=CertificateType.TLS_SERVER, caa_domains=['mozilla.com']) + caa_check_parameters=CaaCheckParameters( + certificate_type=CertificateType.TLS_SERVER, caa_domains=["mozilla.com"] + ), ) print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body @@ -54,9 +62,12 @@ def api_should_return_200_and_passed_corroboration_given_successful_caa_check(se assert mpic_response.is_valid is True perspectives_list = mpic_response.perspectives assert len(perspectives_list) == request.orchestration_parameters.perspective_count - assert (len(list(filter(lambda perspective: perspective.check_type == CheckType.CAA, perspectives_list))) - == request.orchestration_parameters.perspective_count) + assert ( + len(list(filter(lambda perspective: perspective.check_type == CheckType.CAA, perspectives_list))) + == request.orchestration_parameters.perspective_count + ) + # fmt: off @pytest.mark.parametrize('domain_or_ip_target, purpose_of_test, is_wildcard_domain', [ ('empty.basic.caatestsuite.com', 'Tests handling of 0 issue ";"', False), ('deny.basic.caatestsuite.com', 'Tests handling of 0 issue "caatestsuite.com"', False), @@ -81,8 +92,10 @@ def api_should_return_200_and_passed_corroboration_given_successful_caa_check(se ('refused.caatestsuite-dnssec.com', 'Tests rejection when DNSSEC chain goes to server returning REFUSED', False), ('xss.caatestsuite.com', 'Tests rejection when issue property has HTML and JS', False), ]) - def api_should_return_is_valid_false_for_all_tests_in_do_not_issue_caa_test_suite(self, api_client, domain_or_ip_target, - purpose_of_test, is_wildcard_domain): + # fmt: on + def api_should_return_is_valid_false_for_all_tests_in_do_not_issue_caa_test_suite( + self, api_client, domain_or_ip_target, purpose_of_test, is_wildcard_domain + ): print(f"Running test for {domain_or_ip_target} ({purpose_of_test})") if is_wildcard_domain: domain_or_ip_target = "*." + domain_or_ip_target @@ -90,8 +103,8 @@ def api_should_return_is_valid_false_for_all_tests_in_do_not_issue_caa_test_suit domain_or_ip_target=domain_or_ip_target, orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), caa_check_parameters=CaaCheckParameters( - certificate_type=CertificateType.TLS_SERVER, caa_domains=['example.com'] - ) + certificate_type=CertificateType.TLS_SERVER, caa_domains=["example.com"] + ), ) response = api_client.post(MPIC_REQUEST_PATH, json.dumps(request.model_dump())) mpic_response = self.mpic_response_adapter.validate_json(response.text) @@ -101,6 +114,7 @@ def api_should_return_is_valid_false_for_all_tests_in_do_not_issue_caa_test_suit # This case is handled in a compliant manner as it is treated as a lookup failure. # The test for proper communication with an IPv6 nameserver can be enabled with the following additional parameter to the list below. # ('ipv6only.caatestsuite.com', 'Tests handling of record at IPv6-only authoritative name server', False), + # fmt: off @pytest.mark.parametrize('domain_or_ip_target, purpose_of_test, is_wildcard_domain', [ ('deny.basic.caatestsuite.com', 'Tests handling of 0 issue "caatestsuite.com"', False), ('uppercase-deny.basic.caatestsuite.com', 'Tests handling of uppercase issue tag (0 ISSUE "caatestsuite.com")', False), @@ -116,8 +130,10 @@ def api_should_return_is_valid_false_for_all_tests_in_do_not_issue_caa_test_suit ('permit.basic.caatestsuite.com', 'Tests acceptance when name contains a permissible CAA record set', False), ('deny.permit.basic.caatestsuite.com', 'Tests acceptance on a CAA record set', False), ]) - def api_should_return_is_valid_true_for_valid_tests_in_caa_test_suite_when_caa_domain_is_caatestsuite_com(self, api_client, domain_or_ip_target, - purpose_of_test, is_wildcard_domain): + # fmt: on + def api_should_return_is_valid_true_for_valid_tests_in_caa_test_suite_when_caa_domain_is_caatestsuite_com( + self, api_client, domain_or_ip_target, purpose_of_test, is_wildcard_domain + ): print(f"Running test for {domain_or_ip_target} ({purpose_of_test})") if is_wildcard_domain: domain_or_ip_target = "*." + domain_or_ip_target @@ -125,148 +141,182 @@ def api_should_return_is_valid_true_for_valid_tests_in_caa_test_suite_when_caa_d domain_or_ip_target=domain_or_ip_target, orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), caa_check_parameters=CaaCheckParameters( - certificate_type=CertificateType.TLS_SERVER, caa_domains=['caatestsuite.com', 'example.com']) + certificate_type=CertificateType.TLS_SERVER, caa_domains=["caatestsuite.com", "example.com"] + ), ) response = api_client.post(MPIC_REQUEST_PATH, json.dumps(request.model_dump())) mpic_response = self.mpic_response_adapter.validate_json(response.text) assert mpic_response.is_valid is True - @pytest.mark.skip(reason='Behavior not required in RFC 8659') + @pytest.mark.skip(reason="Behavior not required in RFC 8659") + # fmt: off @pytest.mark.parametrize('domain_or_ip_target, purpose_of_test', [ ('dname-deny.basic.caatestsuite.com', 'Tests handling of a DNAME when CAA record exists at DNAME target'), ('cname-deny-sub.basic.caatestsuite.com', 'Tests handling of a CNAME when CAA record exists at parent of CNAME target') ]) - def api_should_return_is_valid_false_for_do_not_issue_caa_test_suite_for_rfc_6844(self, api_client, domain_or_ip_target, purpose_of_test): + # fmt: on + def api_should_return_is_valid_false_for_do_not_issue_caa_test_suite_for_rfc_6844( + self, api_client, domain_or_ip_target, purpose_of_test + ): print(f"Running test for {domain_or_ip_target} ({purpose_of_test})") request = MpicCaaRequest( domain_or_ip_target=domain_or_ip_target, orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), - caa_check_parameters=CaaCheckParameters(certificate_type=CertificateType.TLS_SERVER, caa_domains=['example.com']) + caa_check_parameters=CaaCheckParameters( + certificate_type=CertificateType.TLS_SERVER, caa_domains=["example.com"] + ), ) response = api_client.post(MPIC_REQUEST_PATH, json.dumps(request.model_dump())) mpic_response = self.mpic_response_adapter.validate_json(response.text) assert mpic_response.is_valid is False + # fmt: off @pytest.mark.parametrize('domain_or_ip_target, purpose_of_test', [ ('dns-01.integration-testing.open-mpic.org', 'Standard proper dns-01 test'), ('dns-01-multi.integration-testing.open-mpic.org', 'Proper dns-01 test with multiple TXT records'), ('dns-01-cname.integration-testing.open-mpic.org', 'Proper dns-01 test with CNAME') ]) + # fmt: on def api_should_return_200_given_valid_dns_01_validation(self, api_client, domain_or_ip_target, purpose_of_test): print(f"Running test for {domain_or_ip_target} ({purpose_of_test})") request = MpicDcvRequest( domain_or_ip_target=domain_or_ip_target, orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), dcv_check_parameters=DcvCheckParameters( - validation_details=DcvAcmeDns01ValidationDetails(key_authorization="7FwkJPsKf-TH54wu4eiIFA3nhzYaevsL7953ihy-tpo") - ) + validation_details=DcvAcmeDns01ValidationDetails( + key_authorization="7FwkJPsKf-TH54wu4eiIFA3nhzYaevsL7953ihy-tpo" + ) + ), ) print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body response = api_client.post(MPIC_REQUEST_PATH, json.dumps(request.model_dump())) assert response.status_code == 200 mpic_response = self.mpic_response_adapter.validate_json(response.text) - + assert mpic_response.is_valid is True + # fmt: off @pytest.mark.parametrize('domain_or_ip_target, purpose_of_test', [ ('dns-01-leading-whitespace.integration-testing.open-mpic.org', 'leading whitespace'), ('dns-01-trailing-whitespace.integration-testing.open-mpic.org', 'trailing'), ('dns-01-nxdomain.integration-testing.open-mpic.org', 'NXDOMAIN') ]) - def api_should_return_200_is_valid_false_given_invalid_dns_01_validation(self, api_client, domain_or_ip_target, purpose_of_test): + # fmt: on + def api_should_return_200_is_valid_false_given_invalid_dns_01_validation( + self, api_client, domain_or_ip_target, purpose_of_test + ): print(f"Running test for {domain_or_ip_target} ({purpose_of_test})") request = MpicDcvRequest( domain_or_ip_target=domain_or_ip_target, orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), dcv_check_parameters=DcvCheckParameters( - validation_details=DcvAcmeDns01ValidationDetails(key_authorization="7FwkJPsKf-TH54wu4eiIFA3nhzYaevsL7953ihy-tpo") - ) + validation_details=DcvAcmeDns01ValidationDetails( + key_authorization="7FwkJPsKf-TH54wu4eiIFA3nhzYaevsL7953ihy-tpo" + ) + ), ) print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body response = api_client.post(MPIC_REQUEST_PATH, json.dumps(request.model_dump())) assert response.status_code == 200 mpic_response = self.mpic_response_adapter.validate_json(response.text) - - assert mpic_response.is_valid is False + assert mpic_response.is_valid is False + # fmt: off @pytest.mark.parametrize('domain_or_ip_target, purpose_of_test, token, key_authorization', [ ('integration-testing.open-mpic.org', 'Standard http-01 test', "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA", "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA.NzbLsXh8uDCcd-6MNwXF4W_7noWXFZAfHkxZsRGC9Xs"), ('integration-testing.open-mpic.org', 'Redirect 302 http-01 test', "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oB", "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA.NzbLsXh8uDCcd-6MNwXF4W_7noWXFZAfHkxZsRGC9Xs") ]) - def api_should_return_200_given_valid_http_01_validation(self, api_client, domain_or_ip_target, purpose_of_test, token, key_authorization): + # fmt: on + def api_should_return_200_given_valid_http_01_validation( + self, api_client, domain_or_ip_target, purpose_of_test, token, key_authorization + ): print(f"Running test for {domain_or_ip_target} ({purpose_of_test})") request = MpicDcvRequest( domain_or_ip_target=domain_or_ip_target, orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), dcv_check_parameters=DcvCheckParameters( validation_details=DcvAcmeHttp01ValidationDetails(key_authorization=key_authorization, token=token) - ) + ), ) print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body response = api_client.post(MPIC_REQUEST_PATH, json.dumps(request.model_dump())) assert response.status_code == 200 mpic_response = self.mpic_response_adapter.validate_json(response.text) - - assert mpic_response.is_valid is True + assert mpic_response.is_valid is True + # fmt: off @pytest.mark.parametrize('domain_or_ip_target, purpose_of_test, token, key_authorization', [ ('integration-testing.open-mpic.org', 'Failed http-01 test', "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA", "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA.NzbLsXh8uDCcd-6MNwXF4W_7noWXFZAfHkxZsRGC9Xa"), ('integration-testing.open-mpic.org', 'Failed 302 http-01 test', "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oB", "evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA.NzbLsXh8uDCcd-6MNwXF4W_7noWXFZAfHkxZsRGC9Xa") ]) - def api_should_return_200_given_invalid_http_01_validation(self, api_client, domain_or_ip_target, purpose_of_test, token, key_authorization): + # fmt: on + def api_should_return_200_given_invalid_http_01_validation( + self, api_client, domain_or_ip_target, purpose_of_test, token, key_authorization + ): print(f"Running test for {domain_or_ip_target} ({purpose_of_test})") request = MpicDcvRequest( domain_or_ip_target=domain_or_ip_target, orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), dcv_check_parameters=DcvCheckParameters( validation_details=DcvAcmeHttp01ValidationDetails(key_authorization=key_authorization, token=token) - ) + ), ) print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body response = api_client.post(MPIC_REQUEST_PATH, json.dumps(request.model_dump())) assert response.status_code == 200 mpic_response = self.mpic_response_adapter.validate_json(response.text) - + assert mpic_response.is_valid is False + # fmt: off @pytest.mark.parametrize('domain_or_ip_target, purpose_of_test, http_token_path, challenge_value', [ ('integration-testing.open-mpic.org', 'Valid website change v2 challenge', 'validation-doc.txt', 'test-validation'), ('integration-testing.open-mpic.org', 'Valid 302 website change v2 challenge', 'validation-doc-redirect.txt', "test-validation-redirect") ]) - def api_should_return_200_given_valid_website_change_validation(self, api_client, domain_or_ip_target, purpose_of_test, http_token_path, challenge_value): + # fmt: on + def api_should_return_200_given_valid_website_change_validation( + self, api_client, domain_or_ip_target, purpose_of_test, http_token_path, challenge_value + ): print(f"Running test for {domain_or_ip_target} ({purpose_of_test})") request = MpicDcvRequest( domain_or_ip_target=domain_or_ip_target, orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), dcv_check_parameters=DcvCheckParameters( - validation_details=DcvWebsiteChangeValidationDetails(http_token_path=http_token_path, - challenge_value=challenge_value) - ) + validation_details=DcvWebsiteChangeValidationDetails( + http_token_path=http_token_path, challenge_value=challenge_value + ) + ), ) print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body response = api_client.post(MPIC_REQUEST_PATH, json.dumps(request.model_dump())) assert response.status_code == 200 mpic_response = self.mpic_response_adapter.validate_json(response.text) - + assert mpic_response.is_valid is True + # fmt: off @pytest.mark.parametrize('domain_or_ip_target, dns_record_type, challenge_value, purpose_of_test', [ ('dns-change-txt.integration-testing.open-mpic.org', DnsRecordType.TXT, "1234567890abcdefg.", 'standard TXT dns change'), ('dns-change-cname.integration-testing.open-mpic.org', DnsRecordType.CNAME, "1234567890abcdefg.", 'standard CNAME dns change'), ('dns-change-caa.integration-testing.open-mpic.org', DnsRecordType.CAA, '0 dnschange "1234567890abcdefg."', 'standard CAA dns change'), ]) - def api_should_return_200_is_valid_true_given_valid_dns_change_validation(self, api_client, domain_or_ip_target, dns_record_type, challenge_value, purpose_of_test): + # fmt: on + def api_should_return_200_is_valid_true_given_valid_dns_change_validation( + self, api_client, domain_or_ip_target, dns_record_type, challenge_value, purpose_of_test + ): print(f"Running test for {domain_or_ip_target} ({purpose_of_test})") request = MpicDcvRequest( domain_or_ip_target=domain_or_ip_target, orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), dcv_check_parameters=DcvCheckParameters( - validation_details=DcvDnsChangeValidationDetails(challenge_value=challenge_value, dns_record_type=dns_record_type, dns_name_prefix="") - ) + validation_details=DcvDnsChangeValidationDetails( + challenge_value=challenge_value, dns_record_type=dns_record_type, dns_name_prefix="" + ) + ), ) print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body @@ -278,11 +328,10 @@ def api_should_return_200_is_valid_true_given_valid_dns_change_validation(self, def api_should_return_200_and_failed_corroboration_given_failed_dcv_check(self, api_client): request = MpicDcvRequest( - domain_or_ip_target='ifconfig.me', + domain_or_ip_target="ifconfig.me", dcv_check_parameters=DcvCheckParameters( - validation_details=DcvWebsiteChangeValidationDetails(http_token_path='/', - challenge_value='test') - ) + validation_details=DcvWebsiteChangeValidationDetails(http_token_path="/", challenge_value="test") + ), ) print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body @@ -293,9 +342,13 @@ def api_should_return_200_and_failed_corroboration_given_failed_dcv_check(self, def api_should_return_400_given_invalid_orchestration_parameters_in_request(self, api_client): request = MpicCaaRequest( - domain_or_ip_target='example.com', - orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=5), # invalid quorum count - caa_check_parameters=CaaCheckParameters(certificate_type=CertificateType.TLS_SERVER, caa_domains=['mozilla.com']) + domain_or_ip_target="example.com", + orchestration_parameters=MpicRequestOrchestrationParameters( + perspective_count=3, quorum_count=5 + ), # invalid quorum count + caa_check_parameters=CaaCheckParameters( + certificate_type=CertificateType.TLS_SERVER, caa_domains=["mozilla.com"] + ), ) print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body @@ -303,20 +356,25 @@ def api_should_return_400_given_invalid_orchestration_parameters_in_request(self assert response.status_code == 400 response_body = json.loads(response.text) print("\nResponse:\n", json.dumps(response_body, indent=4)) # pretty print response body - assert response_body['error'] == MpicRequestValidationMessages.REQUEST_VALIDATION_FAILED.key - assert any(issue['issue_type'] == MpicRequestValidationMessages.INVALID_QUORUM_COUNT.key for issue in response_body['validation_issues']) + assert response_body["error"] == MpicRequestValidationMessages.REQUEST_VALIDATION_FAILED.key + assert any( + issue["issue_type"] == MpicRequestValidationMessages.INVALID_QUORUM_COUNT.key + for issue in response_body["validation_issues"] + ) def api_should_return_400_given_invalid_check_type_in_request(self, api_client): request = MpicCaaRequest( - domain_or_ip_target='example.com', + domain_or_ip_target="example.com", orchestration_parameters=MpicRequestOrchestrationParameters(perspective_count=3, quorum_count=2), - caa_check_parameters=CaaCheckParameters(certificate_type=CertificateType.TLS_SERVER, caa_domains=['mozilla.com']) + caa_check_parameters=CaaCheckParameters( + certificate_type=CertificateType.TLS_SERVER, caa_domains=["mozilla.com"] + ), ) - request.check_type = 'invalid_check_type' + request.check_type = "invalid_check_type" print("\nRequest:\n", json.dumps(request.model_dump(), indent=4)) # pretty print request body response = api_client.post(MPIC_REQUEST_PATH, json.dumps(request.model_dump())) assert response.status_code == 400 response_body = json.loads(response.text) print("\nResponse:\n", json.dumps(response_body, indent=4)) - assert response_body['error'] == MpicRequestValidationMessages.REQUEST_VALIDATION_FAILED.key + assert response_body["error"] == MpicRequestValidationMessages.REQUEST_VALIDATION_FAILED.key