diff --git a/src/open_mpic_core/common_util/domain_encoder.py b/src/open_mpic_core/common_util/domain_encoder.py index 218e132..b6135e1 100644 --- a/src/open_mpic_core/common_util/domain_encoder.py +++ b/src/open_mpic_core/common_util/domain_encoder.py @@ -8,6 +8,15 @@ class DomainEncoder: @staticmethod def prepare_target_for_lookup(domain_or_ip_target) -> str: + # Handle bracketed IPv6 addresses (e.g., [2606:4700:4700::1111]) + if domain_or_ip_target.startswith("[") and domain_or_ip_target.endswith("]"): + inner = domain_or_ip_target[1:-1] + try: + ipaddress.ip_address(inner) + return inner # Return the IPv6 address without brackets + except ValueError: + pass # Not a valid IP, will be handled below + try: # First check if it's an IP address ipaddress.ip_address(domain_or_ip_target) diff --git a/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py b/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py index 62166fd..eac0e50 100644 --- a/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py +++ b/src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py @@ -173,21 +173,33 @@ async def perform_dns_resolution(self, name_to_resolve, validation_method, dns_r lookup = await self.resolver.resolve(qname=name_to_resolve, rdtype=dns_rdata_type) return lookup + @staticmethod + def format_host_for_url(domain_or_ip_target: str) -> str: + """Format host for URL, wrapping IPv6 addresses in square brackets if needed.""" + try: + ip = ipaddress.ip_address(domain_or_ip_target) + if isinstance(ip, ipaddress.IPv6Address): + return f"[{domain_or_ip_target}]" + except ValueError: + pass + return domain_or_ip_target + async def perform_http_based_validation(self, request: DcvCheckRequest) -> DcvCheckResponse: validation_method = request.dcv_check_parameters.validation_method domain_or_ip_target = request.domain_or_ip_target + formatted_host = MpicDcvChecker.format_host_for_url(domain_or_ip_target) http_headers = request.dcv_check_parameters.http_headers if validation_method == DcvValidationMethod.WEBSITE_CHANGE: expected_response_content = request.dcv_check_parameters.challenge_value url_scheme = request.dcv_check_parameters.url_scheme token_path = request.dcv_check_parameters.http_token_path - token_url = f"{url_scheme}://{domain_or_ip_target}/{MpicDcvChecker.WELL_KNOWN_PKI_PATH}/{token_path}" # noqa E501 (http) + token_url = f"{url_scheme}://{formatted_host}/{MpicDcvChecker.WELL_KNOWN_PKI_PATH}/{token_path}" # noqa E501 (http) dcv_check_response = DcvUtils.create_empty_check_response(DcvValidationMethod.WEBSITE_CHANGE) else: expected_response_content = request.dcv_check_parameters.key_authorization token = request.dcv_check_parameters.token token_url = ( - f"http://{domain_or_ip_target}/{MpicDcvChecker.WELL_KNOWN_ACME_PATH}/{token}" # noqa E501 (http) + f"http://{formatted_host}/{MpicDcvChecker.WELL_KNOWN_ACME_PATH}/{token}" # noqa E501 (http) ) dcv_check_response = DcvUtils.create_empty_check_response(DcvValidationMethod.ACME_HTTP_01) try: diff --git a/tests/unit/open_mpic_core/test_domain_encoder.py b/tests/unit/open_mpic_core/test_domain_encoder.py index 3ee8ad9..c979e44 100644 --- a/tests/unit/open_mpic_core/test_domain_encoder.py +++ b/tests/unit/open_mpic_core/test_domain_encoder.py @@ -12,6 +12,10 @@ class TestDomainEncoder: ("café.example.com", "xn--caf-dma.example.com"), ("bücher.example.de", "xn--bcher-kva.example.de"), ("127.0.0.1", "127.0.0.1"), + ("2606:4700:4700::1111", "2606:4700:4700::1111"), + ("[2606:4700:4700::1111]", "2606:4700:4700::1111"), + ("::1", "::1"), + ("[::1]", "::1"), ("example.com", "example.com"), ("subdomain.café.example.com", "subdomain.xn--caf-dma.example.com"), ], diff --git a/tests/unit/open_mpic_core/test_mpic_dcv_checker.py b/tests/unit/open_mpic_core/test_mpic_dcv_checker.py index a9bca4e..97b2258 100644 --- a/tests/unit/open_mpic_core/test_mpic_dcv_checker.py +++ b/tests/unit/open_mpic_core/test_mpic_dcv_checker.py @@ -1012,6 +1012,72 @@ def _mock_successful_tls_alpn_validation_entirely(self, dcv_request, mocker): DcvTlsAlpnValidator, "perform_tls_alpn_validation", return_value=response ) + # fmt: off + @pytest.mark.parametrize("input_target, expected_output", [ + ("example.com", "example.com"), # regular domain unchanged + ("192.168.1.1", "192.168.1.1"), # IPv4 address unchanged + ("2001:db8::1", "[2001:db8::1]"), # IPv6 address wrapped in brackets + ("::1", "[::1]"), # IPv6 loopback wrapped in brackets + ("2001:0db8:85a3:0000:0000:8a2e:0370:7334", "[2001:0db8:85a3:0000:0000:8a2e:0370:7334]"), # full IPv6 + ("fe80::1%eth0", "[fe80::1%eth0]"), # IPv6 with zone ID also wrapped + ]) + # fmt: on + def format_host_for_url__should_wrap_ipv6_addresses_in_square_brackets(self, input_target, expected_output): + result = MpicDcvChecker.format_host_for_url(input_target) + assert result == expected_output + + @pytest.mark.parametrize("dcv_method", [DcvValidationMethod.WEBSITE_CHANGE, DcvValidationMethod.ACME_HTTP_01]) + async def http_based_dcv_checks__should_format_ipv6_addresses_with_square_brackets_in_url( + self, dcv_method, mocker + ): + dcv_request = ValidCheckCreator.create_valid_dcv_check_request(dcv_method) + ipv6_address = "2001:db8::1" + dcv_request.domain_or_ip_target = ipv6_address + + if dcv_method == DcvValidationMethod.WEBSITE_CHANGE: + expected_challenge = dcv_request.dcv_check_parameters.challenge_value + url_scheme = dcv_request.dcv_check_parameters.url_scheme + http_token_path = dcv_request.dcv_check_parameters.http_token_path + expected_url = f"{url_scheme}://[{ipv6_address}]/{MpicDcvChecker.WELL_KNOWN_PKI_PATH}/{http_token_path}" + else: + expected_challenge = dcv_request.dcv_check_parameters.key_authorization + token = dcv_request.dcv_check_parameters.token + expected_url = f"http://[{ipv6_address}]/{MpicDcvChecker.WELL_KNOWN_ACME_PATH}/{token}" + + success_response = TestMpicDcvChecker._create_mock_http_response(200, expected_challenge) + mock_get = mocker.patch( + "aiohttp.ClientSession.get", + side_effect=lambda *args, **kwargs: AsyncMock(__aenter__=AsyncMock(return_value=success_response)), + ) + + dcv_response = await self.dcv_checker.check_dcv(dcv_request) + + # Verify the URL used in the request contains properly formatted IPv6 + assert mock_get.call_args.kwargs["url"] == expected_url + assert dcv_response.details.response_url == expected_url + + async def http_based_dcv_checks__should_not_modify_ipv4_addresses_in_url(self, mocker): + dcv_request = ValidCheckCreator.create_valid_dcv_check_request(DcvValidationMethod.WEBSITE_CHANGE) + ipv4_address = "192.168.1.1" + dcv_request.domain_or_ip_target = ipv4_address + + expected_challenge = dcv_request.dcv_check_parameters.challenge_value + url_scheme = dcv_request.dcv_check_parameters.url_scheme + http_token_path = dcv_request.dcv_check_parameters.http_token_path + expected_url = f"{url_scheme}://{ipv4_address}/{MpicDcvChecker.WELL_KNOWN_PKI_PATH}/{http_token_path}" + + success_response = TestMpicDcvChecker._create_mock_http_response(200, expected_challenge) + mock_get = mocker.patch( + "aiohttp.ClientSession.get", + side_effect=lambda *args, **kwargs: AsyncMock(__aenter__=AsyncMock(return_value=success_response)), + ) + + dcv_response = await self.dcv_checker.check_dcv(dcv_request) + + # Verify the URL used in the request contains IPv4 without modification + assert mock_get.call_args.kwargs["url"] == expected_url + assert dcv_response.details.response_url == expected_url + @staticmethod def shuffle_case(string_to_shuffle: str) -> str: result = "".join(