Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/open_mpic_core/common_util/domain_encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@
class DomainEncoder:
@staticmethod
def prepare_target_for_lookup(domain_or_ip_target) -> str:
# Handle bracketed IPv6 addresses (e.g., [2606:4700:4700::1111])
if domain_or_ip_target.startswith("[") and domain_or_ip_target.endswith("]"):
inner = domain_or_ip_target[1:-1]
try:
ipaddress.ip_address(inner)
return inner # Return the IPv6 address without brackets
except ValueError:
pass # Not a valid IP, will be handled below

try:
# First check if it's an IP address
ipaddress.ip_address(domain_or_ip_target)
Expand Down
16 changes: 14 additions & 2 deletions src/open_mpic_core/mpic_dcv_checker/mpic_dcv_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,21 +173,33 @@ async def perform_dns_resolution(self, name_to_resolve, validation_method, dns_r
lookup = await self.resolver.resolve(qname=name_to_resolve, rdtype=dns_rdata_type)
return lookup

@staticmethod
def format_host_for_url(domain_or_ip_target: str) -> str:
"""Format host for URL, wrapping IPv6 addresses in square brackets if needed."""
try:
ip = ipaddress.ip_address(domain_or_ip_target)
if isinstance(ip, ipaddress.IPv6Address):
return f"[{domain_or_ip_target}]"
except ValueError:
pass
return domain_or_ip_target

async def perform_http_based_validation(self, request: DcvCheckRequest) -> DcvCheckResponse:
validation_method = request.dcv_check_parameters.validation_method
domain_or_ip_target = request.domain_or_ip_target
formatted_host = MpicDcvChecker.format_host_for_url(domain_or_ip_target)
http_headers = request.dcv_check_parameters.http_headers
if validation_method == DcvValidationMethod.WEBSITE_CHANGE:
expected_response_content = request.dcv_check_parameters.challenge_value
url_scheme = request.dcv_check_parameters.url_scheme
token_path = request.dcv_check_parameters.http_token_path
token_url = f"{url_scheme}://{domain_or_ip_target}/{MpicDcvChecker.WELL_KNOWN_PKI_PATH}/{token_path}" # noqa E501 (http)
token_url = f"{url_scheme}://{formatted_host}/{MpicDcvChecker.WELL_KNOWN_PKI_PATH}/{token_path}" # noqa E501 (http)
dcv_check_response = DcvUtils.create_empty_check_response(DcvValidationMethod.WEBSITE_CHANGE)
else:
expected_response_content = request.dcv_check_parameters.key_authorization
token = request.dcv_check_parameters.token
token_url = (
f"http://{domain_or_ip_target}/{MpicDcvChecker.WELL_KNOWN_ACME_PATH}/{token}" # noqa E501 (http)
f"http://{formatted_host}/{MpicDcvChecker.WELL_KNOWN_ACME_PATH}/{token}" # noqa E501 (http)
)
dcv_check_response = DcvUtils.create_empty_check_response(DcvValidationMethod.ACME_HTTP_01)
try:
Expand Down
4 changes: 4 additions & 0 deletions tests/unit/open_mpic_core/test_domain_encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ class TestDomainEncoder:
("café.example.com", "xn--caf-dma.example.com"),
("bücher.example.de", "xn--bcher-kva.example.de"),
("127.0.0.1", "127.0.0.1"),
("2606:4700:4700::1111", "2606:4700:4700::1111"),
("[2606:4700:4700::1111]", "2606:4700:4700::1111"),
("::1", "::1"),
("[::1]", "::1"),
("example.com", "example.com"),
("subdomain.café.example.com", "subdomain.xn--caf-dma.example.com"),
],
Expand Down
66 changes: 66 additions & 0 deletions tests/unit/open_mpic_core/test_mpic_dcv_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,72 @@ def _mock_successful_tls_alpn_validation_entirely(self, dcv_request, mocker):
DcvTlsAlpnValidator, "perform_tls_alpn_validation", return_value=response
)

# fmt: off
@pytest.mark.parametrize("input_target, expected_output", [
("example.com", "example.com"), # regular domain unchanged
("192.168.1.1", "192.168.1.1"), # IPv4 address unchanged
("2001:db8::1", "[2001:db8::1]"), # IPv6 address wrapped in brackets
("::1", "[::1]"), # IPv6 loopback wrapped in brackets
("2001:0db8:85a3:0000:0000:8a2e:0370:7334", "[2001:0db8:85a3:0000:0000:8a2e:0370:7334]"), # full IPv6
("fe80::1%eth0", "[fe80::1%eth0]"), # IPv6 with zone ID also wrapped
])
# fmt: on
def format_host_for_url__should_wrap_ipv6_addresses_in_square_brackets(self, input_target, expected_output):
result = MpicDcvChecker.format_host_for_url(input_target)
assert result == expected_output

@pytest.mark.parametrize("dcv_method", [DcvValidationMethod.WEBSITE_CHANGE, DcvValidationMethod.ACME_HTTP_01])
async def http_based_dcv_checks__should_format_ipv6_addresses_with_square_brackets_in_url(
self, dcv_method, mocker
):
dcv_request = ValidCheckCreator.create_valid_dcv_check_request(dcv_method)
ipv6_address = "2001:db8::1"
dcv_request.domain_or_ip_target = ipv6_address

if dcv_method == DcvValidationMethod.WEBSITE_CHANGE:
expected_challenge = dcv_request.dcv_check_parameters.challenge_value
url_scheme = dcv_request.dcv_check_parameters.url_scheme
http_token_path = dcv_request.dcv_check_parameters.http_token_path
expected_url = f"{url_scheme}://[{ipv6_address}]/{MpicDcvChecker.WELL_KNOWN_PKI_PATH}/{http_token_path}"
else:
expected_challenge = dcv_request.dcv_check_parameters.key_authorization
token = dcv_request.dcv_check_parameters.token
expected_url = f"http://[{ipv6_address}]/{MpicDcvChecker.WELL_KNOWN_ACME_PATH}/{token}"

success_response = TestMpicDcvChecker._create_mock_http_response(200, expected_challenge)
mock_get = mocker.patch(
"aiohttp.ClientSession.get",
side_effect=lambda *args, **kwargs: AsyncMock(__aenter__=AsyncMock(return_value=success_response)),
)

dcv_response = await self.dcv_checker.check_dcv(dcv_request)

# Verify the URL used in the request contains properly formatted IPv6
assert mock_get.call_args.kwargs["url"] == expected_url
assert dcv_response.details.response_url == expected_url

async def http_based_dcv_checks__should_not_modify_ipv4_addresses_in_url(self, mocker):
dcv_request = ValidCheckCreator.create_valid_dcv_check_request(DcvValidationMethod.WEBSITE_CHANGE)
ipv4_address = "192.168.1.1"
dcv_request.domain_or_ip_target = ipv4_address

expected_challenge = dcv_request.dcv_check_parameters.challenge_value
url_scheme = dcv_request.dcv_check_parameters.url_scheme
http_token_path = dcv_request.dcv_check_parameters.http_token_path
expected_url = f"{url_scheme}://{ipv4_address}/{MpicDcvChecker.WELL_KNOWN_PKI_PATH}/{http_token_path}"

success_response = TestMpicDcvChecker._create_mock_http_response(200, expected_challenge)
mock_get = mocker.patch(
"aiohttp.ClientSession.get",
side_effect=lambda *args, **kwargs: AsyncMock(__aenter__=AsyncMock(return_value=success_response)),
)

dcv_response = await self.dcv_checker.check_dcv(dcv_request)

# Verify the URL used in the request contains IPv4 without modification
assert mock_get.call_args.kwargs["url"] == expected_url
assert dcv_response.details.response_url == expected_url

@staticmethod
def shuffle_case(string_to_shuffle: str) -> str:
result = "".join(
Expand Down