From bd5f31a88c130af37a8322cb048056ba9b397367 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Tue, 21 May 2024 14:33:22 -0400 Subject: [PATCH 1/9] Added an API for working with wheel files --- src/packaging/wheelfile.py | 547 +++++++++++++++++++++++++++++++++++++ tests/test_wheelfile.py | 218 +++++++++++++++ 2 files changed, 765 insertions(+) create mode 100644 src/packaging/wheelfile.py create mode 100644 tests/test_wheelfile.py diff --git a/src/packaging/wheelfile.py b/src/packaging/wheelfile.py new file mode 100644 index 000000000..8b1693b39 --- /dev/null +++ b/src/packaging/wheelfile.py @@ -0,0 +1,547 @@ +from __future__ import annotations + +import csv +import hashlib +import os.path +import re +import stat +import time +from base64 import urlsafe_b64decode, urlsafe_b64encode +from collections import OrderedDict +from collections.abc import Iterable, Iterator +from contextlib import ExitStack +from datetime import datetime, timezone +from email.message import Message +from email.policy import EmailPolicy +from io import BytesIO, StringIO, UnsupportedOperation +from os import PathLike +from pathlib import Path, PurePath +from types import TracebackType +from typing import IO, NamedTuple +from zipfile import ZIP_DEFLATED, ZIP_STORED, ZipFile, ZipInfo + +from . 
import __version__ as wheel_version +from .tags import Tag +from .utils import ( + InvalidWheelFilename, + NormalizedName, + parse_wheel_filename, +) +from .version import Version + +_DIST_NAME_RE = re.compile(r"[^A-Za-z0-9.]+") +_EXCLUDE_FILENAMES = ("RECORD", "RECORD.jws", "RECORD.p7s") +DEFAULT_TIMESTAMP = datetime(1980, 1, 1, tzinfo=timezone.utc) +EMAIL_POLICY = EmailPolicy(max_line_length=0, mangle_from_=False, utf8=True) + + +class WheelMetadata(NamedTuple): + name: NormalizedName + version: Version + build_tag: tuple[int, str] | tuple[()] + tags: frozenset[Tag] + + @classmethod + def from_filename(cls, fname: str) -> WheelMetadata: + try: + name, version, build, tags = parse_wheel_filename(fname) + except InvalidWheelFilename as exc: + raise WheelError(f"Bad wheel filename {fname!r}") from exc + + return cls(name, version, build, tags) + + +class WheelRecordEntry(NamedTuple): + hash_algorithm: str + hash_value: bytes + filesize: int + + +class WheelContentElement(NamedTuple): + path: PurePath + hash_value: bytes + size: int + stream: IO[bytes] + + +def _encode_hash_value(hash_value: bytes) -> str: + return urlsafe_b64encode(hash_value).rstrip(b"=").decode("ascii") + + +def _decode_hash_value(encoded_hash: str) -> bytes: + pad = b"=" * (4 - (len(encoded_hash) & 3)) + return urlsafe_b64decode(encoded_hash.encode("ascii") + pad) + + +def make_filename( + name: str, + version: str, + build_tag: str | int | None = None, + impl_tag: str = "py3", + abi_tag: str = "none", + plat_tag: str = "any", +) -> str: + name = _DIST_NAME_RE.sub("_", name) + version = _DIST_NAME_RE.sub("_", version) + filename = f"{name}-{version}" + if build_tag: + filename = f"{filename}-{build_tag}" + + return f"{filename}-{impl_tag}-{abi_tag}-{plat_tag}.whl" + + +class WheelError(Exception): + pass + + +class WheelArchiveFile: + def __init__( + self, fp: IO[bytes], arcname: str, record_entry: WheelRecordEntry | None + ): + self._fp = fp + self._arcname = arcname + self._record_entry = 
record_entry + if record_entry: + self._hash = hashlib.new(record_entry.hash_algorithm) + self._num_bytes_read = 0 + + def read(self, amount: int = -1) -> bytes: + data = self._fp.read(amount) + if amount and self._record_entry is not None: + if data: + self._hash.update(data) + self._num_bytes_read += len(data) + elif self._record_entry: + # The file has been read in full – check that hash and file size match + # with the entry in RECORD + if self._hash.digest() != self._record_entry.hash_value: + raise WheelError(f"Hash mismatch for file {self._arcname!r}") + elif self._num_bytes_read != self._record_entry.filesize: + raise WheelError( + f"{self._arcname}: file size mismatch: " + f"{self._record_entry.filesize} bytes in RECORD, " + f"{self._num_bytes_read} bytes in archive" + ) + + return data + + def __enter__(self) -> WheelArchiveFile: + return self + + def __exit__( + self, + exc_type: type[BaseException], + exc_val: BaseException, + exc_tb: TracebackType, + ) -> None: + self._fp.close() + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({self._fp!r}, {self._arcname!r})" + + +class WheelReader: + name: NormalizedName + version: Version + _zip: ZipFile + _dist_info_dir: str + _data_dir: str + _record_entries: OrderedDict[str, WheelRecordEntry] + + def __init__(self, path_or_fd: str | PathLike[str] | IO[bytes]): + self.path_or_fd = path_or_fd + + if isinstance(path_or_fd, (str, PathLike)): + fname = Path(path_or_fd).name + try: + self.name, self.version = parse_wheel_filename(fname)[:2] + except InvalidWheelFilename as exc: + raise WheelError(str(exc)) from None + + def __enter__(self) -> WheelReader: + self._zip = ZipFile(self.path_or_fd, "r") + + # See if the expected .dist-info directory is in place by searching for RECORD + # in the expected directory. Wheels made with older versions of "wheel" did not + # properly normalize the names, so the name of the .dist-info directory does not + # match the expectation there. 
+ dist_info_dir: str | None = None + if hasattr(self, "name"): + dist_info_dir = f"{self.name}-{self.version}.dist-info" + try: + self._zip.getinfo(f"{dist_info_dir}/RECORD") + except KeyError: + dist_info_dir = None + else: + self._dist_info_dir = dist_info_dir + self._data_dir = f"{self.name}-{self.version}.data" + + # If no .dist-info directory could not be found yet, resort to scanning the + # archive's file names for any .dist-info directory containing a RECORD file. + if dist_info_dir is None: + try: + for zinfo in reversed(self._zip.infolist()): + if zinfo.filename.endswith(".dist-info/RECORD"): + dist_info_dir = zinfo.filename.rsplit("/", 1)[0] + namever = dist_info_dir.rsplit(".", 1)[0] + name, version = namever.rpartition("-")[::2] + if name and version: + self.name = NormalizedName(name) + self.version = Version(version) + self._dist_info_dir = dist_info_dir + self._data_dir = dist_info_dir.replace( + ".dist-info", ".data" + ) + break + else: + raise WheelError( + "Cannot find a valid .dist-info directory. " + "Is this really a wheel file?" 
+ ) + except BaseException: + self._zip.close() + raise + + self._record_entries = self._read_record() + return self + + def __exit__( + self, + exc_type: type[BaseException], + exc_val: BaseException, + exc_tb: TracebackType, + ) -> None: + self._zip.close() + self._record_entries.clear() + del self._zip + + def _read_record(self) -> OrderedDict[str, WheelRecordEntry]: + entries = OrderedDict() + try: + contents = self.read_dist_info("RECORD") + except WheelError: + raise WheelError(f"Missing {self._dist_info_dir}/RECORD file") from None + + reader = csv.reader( + contents.strip().split("\n"), + delimiter=",", + quotechar='"', + lineterminator="\n", + ) + for row in reader: + if not row: + break + + path, hash_digest, filesize = row + if hash_digest: + algorithm, hash_digest = hash_digest.split("=") + try: + hashlib.new(algorithm) + except ValueError: + raise WheelError( + f"Unsupported hash algorithm: {algorithm}" + ) from None + + if algorithm.lower() in {"md5", "sha1"}: + raise WheelError( + f"Weak hash algorithm ({algorithm}) is not permitted by PEP 427" + ) + + entries[path] = WheelRecordEntry( + algorithm, _decode_hash_value(hash_digest), int(filesize) + ) + + return entries + + @property + def dist_info_dir(self) -> str: + return self._dist_info_dir + + @property + def data_dir(self) -> str: + return self._data_dir + + @property + def dist_info_filenames(self) -> list[PurePath]: + return [ + PurePath(fname) + for fname in self._zip.namelist() + if fname.startswith(self._dist_info_dir) + ] + + @property + def filenames(self) -> list[PurePath]: + return [PurePath(fname) for fname in self._zip.namelist()] + + def read_dist_info(self, filename: str) -> str: + filename = self.dist_info_dir + "/" + filename + try: + contents = self._zip.read(filename) + except KeyError: + raise WheelError(f"File {filename!r} not found") from None + + return contents.decode("utf-8") + + def get_contents(self) -> Iterator[WheelContentElement]: + for fname, entry in 
self._record_entries.items(): + with self._zip.open(fname, "r") as stream: + yield WheelContentElement( + PurePath(fname), entry.hash_value, entry.filesize, stream + ) + + def validate_record(self) -> None: + """Verify the integrity of the contained files.""" + for zinfo in self._zip.infolist(): + # Ignore signature files + basename = os.path.basename(zinfo.filename) + if basename in _EXCLUDE_FILENAMES: + continue + + try: + record = self._record_entries[zinfo.filename] + except KeyError: + raise WheelError(f"No hash found for file {zinfo.filename!r}") from None + + hash_ = hashlib.new(record.hash_algorithm) + with self._zip.open(zinfo) as fp: + hash_.update(fp.read()) # hash the whole member, not just its first 64 KiB + + if hash_.digest() != record.hash_value: + raise WheelError(f"Hash mismatch for file {zinfo.filename!r}") + + def extractall(self, base_path: str | PathLike[str]) -> None: + basedir = Path(base_path) + if not basedir.exists(): + raise WheelError(f"{basedir} does not exist") + elif not basedir.is_dir(): + raise WheelError(f"{basedir} is not a directory") + + for fname in self._zip.namelist(): + target_path = basedir.joinpath(fname) + target_path.parent.mkdir(0o755, True, True) + with self._open_file(fname) as infile, target_path.open("wb") as outfile: + while True: + data = infile.read(65536) + if not data: + break + + outfile.write(data) + + def _open_file(self, archive_name: str) -> WheelArchiveFile: + basename = os.path.basename(archive_name) + if basename in _EXCLUDE_FILENAMES: + record_entry = None + else: + record_entry = self._record_entries[archive_name] + + return WheelArchiveFile( + self._zip.open(archive_name), archive_name, record_entry + ) + + def read_file(self, archive_name: str) -> bytes: + with self._open_file(archive_name) as fp: + return fp.read() + + def read_data_file(self, filename: str) -> bytes: + archive_path = self._data_dir + "/" + filename.strip("/") + return self.read_file(archive_path) + + def read_distinfo_file(self, filename: str) -> bytes: + archive_path = 
self._dist_info_dir + "/" + filename.strip("/") + return self.read_file(archive_path) + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({self.path_or_fd})" + + +def write_wheelfile( + fp: IO[bytes], metadata: WheelMetadata, generator: str, root_is_purelib: bool +) -> None: + msg = Message(policy=EMAIL_POLICY) + msg["Wheel-Version"] = "1.0" # of the spec + msg["Generator"] = generator + msg["Root-Is-Purelib"] = str(root_is_purelib).lower() + if metadata.build_tag: + msg["Build"] = str(metadata.build_tag[0]) + metadata.build_tag[1] + + for tag in sorted(metadata.tags, key=lambda t: (t.interpreter, t.abi, t.platform)): + msg["Tag"] = f"{tag.interpreter}-{tag.abi}-{tag.platform}" + + fp.write(msg.as_bytes()) + + +class WheelWriter: + def __init__( + self, + path_or_fd: str | PathLike[str] | IO[bytes], + metadata: WheelMetadata | None = None, + *, + generator: str | None = None, + root_is_purelib: bool = True, + compress: bool = True, + hash_algorithm: str = "sha256", + ): + self.path_or_fd = path_or_fd + self.generator = generator or f"packaging ({wheel_version})" + self.root_is_purelib = root_is_purelib + self.hash_algorithm = hash_algorithm + self._compress_type = ZIP_DEFLATED if compress else ZIP_STORED + + if metadata: + self.metadata = metadata + elif isinstance(path_or_fd, (str, PathLike)): + filename = Path(path_or_fd).name + self.metadata = WheelMetadata.from_filename(filename) + else: + raise WheelError("path_or_fd is not a path, and metadata was not provided") + + if hash_algorithm not in hashlib.algorithms_available: + raise ValueError(f"Hash algorithm {hash_algorithm!r} is not available") + elif hash_algorithm in ("md5", "sha1"): + raise ValueError( + f"Weak hash algorithm ({hash_algorithm}) is not permitted by PEP 427" + ) + + self._dist_info_dir = f"{self.metadata.name}-{self.metadata.version}.dist-info" + self._data_dir = f"{self.metadata.name}-{self.metadata.version}.data" + self._record_path = f"{self._dist_info_dir}/RECORD" + 
self._record_entries: dict[str, WheelRecordEntry] = OrderedDict() + + def __enter__(self) -> WheelWriter: + self._zip = ZipFile(self.path_or_fd, "w", compression=self._compress_type) + return self + + def __exit__( + self, + exc_type: type[BaseException], + exc_val: BaseException, + exc_tb: TracebackType, + ) -> None: + try: + if not exc_type: + if f"{self._dist_info_dir}/WHEEL" not in self._record_entries: + self._write_wheelfile() + + self._write_record() + finally: + self._zip.close() + + def _write_record(self) -> None: + data = StringIO() + writer = csv.writer(data, delimiter=",", quotechar='"', lineterminator="\n") + writer.writerows( + [ + ( + fname, + entry.hash_algorithm + "=" + _encode_hash_value(entry.hash_value), + entry.filesize, + ) + for fname, entry in self._record_entries.items() + ] + ) + writer.writerow((self._record_path, "", "")) + self.write_distinfo_file("RECORD", data.getvalue()) + + def _write_wheelfile(self) -> None: + buffer = BytesIO() + write_wheelfile(buffer, self.metadata, self.generator, self.root_is_purelib) + self.write_distinfo_file("WHEEL", buffer.getvalue()) + + def write_metadata(self, items: Iterable[tuple[str, str]]) -> None: + msg = Message(policy=EMAIL_POLICY) + for key, value in items: + key = key.title() + if key == "Description": + msg.set_payload(value, "utf-8") + else: + msg.add_header(key, value) + + if "Metadata-Version" not in msg: + msg["Metadata-Version"] = "2.1" + if "Name" not in msg: + msg["Name"] = self.metadata.name + if "Version" not in msg: + msg["Version"] = str(self.metadata.version) + + self.write_distinfo_file("METADATA", msg.as_bytes()) + + def write_file( + self, + name: str | PurePath, + contents: bytes | str | PathLike[str] | IO[bytes], + timestamp: datetime = DEFAULT_TIMESTAMP, + ) -> None: + arcname = PurePath(name).as_posix() + gmtime = time.gmtime(timestamp.timestamp()) + zinfo = ZipInfo(arcname, gmtime[:6]) + zinfo.compress_type = self._compress_type + zinfo.external_attr = 0o664 << 16 + with 
ExitStack() as exit_stack: + fp = exit_stack.enter_context(self._zip.open(zinfo, "w")) + if isinstance(contents, str): + contents = contents.encode("utf-8") + elif isinstance(contents, PathLike): + contents = exit_stack.enter_context(Path(contents).open("rb")) + + if isinstance(contents, bytes): + file_size = len(contents) + fp.write(contents) + hash_ = hashlib.new(self.hash_algorithm, contents) + else: + try: + st = os.stat(contents.fileno()) + except (AttributeError, UnsupportedOperation): + pass + else: + zinfo.external_attr = ( + stat.S_IMODE(st.st_mode) | stat.S_IFMT(st.st_mode) + ) << 16 + + hash_, file_size = hashlib.new(self.hash_algorithm), 0 + while True: + buffer = contents.read(65536) + if not buffer: + break + + file_size += len(buffer) # tell() may be offset or unsupported + hash_.update(buffer) + fp.write(buffer) + + self._record_entries[arcname] = WheelRecordEntry( + self.hash_algorithm, hash_.digest(), file_size + ) + + def write_files_from_directory(self, directory: str | PathLike[str]) -> None: + basedir = Path(directory) + if not basedir.exists(): + raise WheelError(f"{basedir} does not exist") + elif not basedir.is_dir(): + raise WheelError(f"{basedir} is not a directory") + + for root, _dirs, files in os.walk(basedir): + for fname in files: + path = Path(root) / fname + relative = path.relative_to(basedir) + if relative.as_posix() != self._record_path: + self.write_file(relative, path) + + def write_data_file( + self, + filename: str, + contents: bytes | str | PathLike[str] | IO[bytes], + timestamp: datetime = DEFAULT_TIMESTAMP, + ) -> None: + archive_path = self._data_dir + "/" + filename.strip("/") + self.write_file(archive_path, contents, timestamp) + + def write_distinfo_file( + self, + filename: str, + contents: bytes | str | IO[bytes], + timestamp: datetime = DEFAULT_TIMESTAMP, + ) -> None: + archive_path = self._dist_info_dir + "/" + filename.strip("/") + self.write_file(archive_path, contents, timestamp) + + def __repr__(self) -> str: + return 
f"{self.__class__.__name__}({self.path_or_fd!r})" diff --git a/tests/test_wheelfile.py b/tests/test_wheelfile.py new file mode 100644 index 000000000..f6a8ebe51 --- /dev/null +++ b/tests/test_wheelfile.py @@ -0,0 +1,218 @@ +from __future__ import annotations + +import os.path +import sys +from pathlib import Path +from zipfile import ZIP_DEFLATED, ZipFile + +import pytest +from pytest import MonkeyPatch, TempPathFactory + +from packaging.wheel import WheelError, WheelReader, WheelWriter + + +@pytest.fixture +def wheel_path(tmp_path: Path) -> Path: + return tmp_path / "test-1.0-py2.py3-none-any.whl" + + +@pytest.mark.parametrize( + "filename, reason", + [ + pytest.param("test.whl", "wrong number of parts"), + pytest.param("test-1.0.whl", "wrong number of parts"), + pytest.param("test-1.0-py2.whl", "wrong number of parts"), + pytest.param("test-1.0-py2-none.whl", "wrong number of parts"), + pytest.param("test-1.0-py2-none-any", "extension must be '.whl'"), + pytest.param( + "test-1.0-py 2-none-any.whl", + "bad file name", + marks=[pytest.mark.xfail(reason="packaging does not fail this yet")], + ), + ], +) +def test_bad_wheel_filename(filename: str, reason: str) -> None: + basename = os.path.splitext(filename)[0] if filename.endswith(".whl") else filename + exc = pytest.raises(WheelError, WheelReader, filename) + exc.match(rf"^Invalid wheel filename \({reason}\): {basename}$") + + +def test_missing_record(wheel_path: Path) -> None: + with ZipFile(wheel_path, "w") as zf: + zf.writestr("hello/héllö.py", 'print("Héllö, w0rld!")\n') + + with pytest.raises( + WheelError, + match=( + "^Cannot find a valid .dist-info directory. 
Is this really a wheel file\\?$" + ), + ): + with WheelReader(wheel_path): + pass + + +def test_unsupported_hash_algorithm(wheel_path: Path) -> None: + with ZipFile(wheel_path, "w") as zf: + zf.writestr("hello/héllö.py", 'print("Héllö, w0rld!")\n') + zf.writestr( + "test-1.0.dist-info/RECORD", + "hello/héllö.py,sha000=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,25", + ) + + with pytest.raises(WheelError, match="^Unsupported hash algorithm: sha000$"): + with WheelReader(wheel_path): + pass + + +@pytest.mark.parametrize( + "algorithm, digest", + [ + pytest.param("md5", "4J-scNa2qvSgy07rS4at-Q", id="md5"), + pytest.param("sha1", "QjCnGu5Qucb6-vir1a6BVptvOA4", id="sha1"), + ], +) +def test_weak_hash_algorithm(wheel_path: Path, algorithm: str, digest: str) -> None: + hash_string = f"{algorithm}={digest}" + with ZipFile(wheel_path, "w") as zf: + zf.writestr("hello/héllö.py", 'print("Héllö, w0rld!")\n') + zf.writestr("test-1.0.dist-info/RECORD", f"hello/héllö.py,{hash_string},25") + + with pytest.raises( + WheelError, + match=rf"^Weak hash algorithm \({algorithm}\) is not permitted by PEP 427$", + ): + with WheelReader(wheel_path): + pass + + +@pytest.mark.parametrize( + "algorithm, digest", + [ + ("sha256", "bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo"), + ("sha384", "cDXriAy_7i02kBeDkN0m2RIDz85w6pwuHkt2PZ4VmT2PQc1TZs8Ebvf6eKDFcD_S"), + ( + "sha512", + "kdX9CQlwNt4FfOpOKO_X0pn_v1opQuksE40SrWtMyP1NqooWVWpzCE3myZTfpy8g2azZON_" + "iLNpWVxTwuDWqBQ", + ), + ], + ids=["sha256", "sha384", "sha512"], +) +def test_validate_record(wheel_path: Path, algorithm: str, digest: str) -> None: + hash_string = f"{algorithm}={digest}" + with ZipFile(wheel_path, "w") as zf: + zf.writestr("hello/héllö.py", 'print("Héllö, world!")\n') + zf.writestr("test-1.0.dist-info/RECORD", f"hello/héllö.py,{hash_string},25") + + with WheelReader(wheel_path) as wf: + wf.validate_record() + + +def test_testzip_missing_hash(wheel_path: Path) -> None: + with ZipFile(wheel_path, "w") as zf: + 
zf.writestr("hello/héllö.py", 'print("Héllö, world!")\n') + zf.writestr("test-1.0.dist-info/RECORD", "") + + with WheelReader(wheel_path) as wf: + exc = pytest.raises(WheelError, wf.validate_record) + exc.match("^No hash found for file 'hello/héllö.py'$") + + +def test_validate_record_bad_hash(wheel_path: Path) -> None: + with ZipFile(wheel_path, "w") as zf: + zf.writestr("hello/héllö.py", 'print("Héllö, w0rld!")\n') + zf.writestr( + "test-1.0.dist-info/RECORD", + "hello/héllö.py,sha256=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,25", + ) + + with WheelReader(wheel_path) as wf: + exc = pytest.raises(WheelError, wf.validate_record) + exc.match("^Hash mismatch for file 'hello/héllö.py'$") + + +def test_write_file(wheel_path: Path) -> None: + with WheelWriter(wheel_path, generator="generator 1.0") as wf: + wf.write_file("hello/héllö.py", 'print("Héllö, world!")\n') + wf.write_file("hello/h,ll,.py", 'print("Héllö, world!")\n') + + with ZipFile(wheel_path, "r") as zf: + infolist = zf.infolist() + assert len(infolist) == 4 + assert infolist[0].filename == "hello/héllö.py" + assert infolist[0].file_size == 25 + assert infolist[1].filename == "hello/h,ll,.py" + assert infolist[1].file_size == 25 + assert infolist[2].filename == "test-1.0.dist-info/WHEEL" + assert infolist[3].filename == "test-1.0.dist-info/RECORD" + + record = zf.read("test-1.0.dist-info/RECORD") + assert record.decode("utf-8") == ( + "hello/héllö.py,sha256=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,25\n" + '"hello/h,ll,.py",sha256=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,25\n' + "test-1.0.dist-info/WHEEL," + "sha256=KzXSdMADLwiK8h1P5UAQ76v3nVuO2ZRU8e9GCHCC6Qs,103\n" + "test-1.0.dist-info/RECORD,,\n" + ) + + +def test_timestamp( + tmp_path_factory: TempPathFactory, wheel_path: Path, monkeypatch: MonkeyPatch +) -> None: + # An environment variable can be used to influence the timestamp on + # TarInfo objects inside the zip. See issue #143. 
+ build_dir = tmp_path_factory.mktemp("build") + for filename in ("one", "two", "three"): + build_dir.joinpath(filename).write_text(filename + "\n") + + # The earliest date representable in TarInfos, 1980-01-01 + monkeypatch.setenv("SOURCE_DATE_EPOCH", "315576060") + + with WheelWriter(wheel_path) as wf: + wf.write_files_from_directory(build_dir) + + with ZipFile(wheel_path, "r") as zf: + for info in zf.infolist(): + assert info.date_time == (1980, 1, 1, 0, 0, 0) + assert info.compress_type == ZIP_DEFLATED + + +@pytest.mark.skipif( + sys.platform == "win32", reason="Windows does not support UNIX-like permissions" +) +def test_attributes(tmp_path_factory: TempPathFactory, wheel_path: Path) -> None: + # With the change from ZipFile.write() to .writestr(), we need to manually + # set member attributes. + build_dir = tmp_path_factory.mktemp("build") + files = (("foo", 0o644), ("bar", 0o755)) + for filename, mode in files: + path = build_dir / filename + path.write_text(filename + "\n") + path.chmod(mode) + + with WheelWriter(wheel_path) as wf: + wf.write_files_from_directory(build_dir) + + with ZipFile(wheel_path, "r") as zf: + for filename, mode in files: + info = zf.getinfo(filename) + assert info.external_attr == (mode | 0o100000) << 16 + assert info.compress_type == ZIP_DEFLATED + + info = zf.getinfo("test-1.0.dist-info/RECORD") + permissions = (info.external_attr >> 16) & 0o777 + assert permissions == 0o664 + + +def test_unnormalized_wheel(tmp_path: Path) -> None: + # Previous versions of "wheel" did not correctly normalize the names; test that we + # can still read such wheels + wheel_path = tmp_path / "Test_foo_bar-1.0.0-py3-none-any.whl" + with ZipFile(wheel_path, "w") as zf: + zf.writestr( + "Test_foo_bar-1.0.0.dist-info/RECORD", + "Test_foo_bar-1.0.0.dist-info/RECORD,,\n", + ) + + with WheelReader(wheel_path): + pass From a1603bb188f083e5694132a1884fba72e9634aa9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Tue, 21 May 2024 14:38:39 
-0400 Subject: [PATCH 2/9] Updated import --- tests/test_wheelfile.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_wheelfile.py b/tests/test_wheelfile.py index f6a8ebe51..f903b8845 100644 --- a/tests/test_wheelfile.py +++ b/tests/test_wheelfile.py @@ -8,7 +8,7 @@ import pytest from pytest import MonkeyPatch, TempPathFactory -from packaging.wheel import WheelError, WheelReader, WheelWriter +from packaging.wheelfile import WheelError, WheelReader, WheelWriter @pytest.fixture From 84883aa6e3f39e0b8e396cdedfea40caedd2b0f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Wed, 22 May 2024 12:23:10 -0400 Subject: [PATCH 3/9] Shuffled code around and improved test coverage --- src/packaging/utils.py | 24 +++ src/packaging/wheelfile.py | 83 ++++---- tests/test_utils.py | 61 ++++++ tests/test_wheelfile.py | 407 +++++++++++++++++++------------------ 4 files changed, 339 insertions(+), 236 deletions(-) diff --git a/src/packaging/utils.py b/src/packaging/utils.py index d33da5bb8..c18ebf7fc 100644 --- a/src/packaging/utils.py +++ b/src/packaging/utils.py @@ -5,6 +5,7 @@ from __future__ import annotations import re +from collections.abc import Collection from typing import NewType, Tuple, Union, cast from .tags import Tag, parse_tag @@ -40,6 +41,7 @@ class InvalidSdistFilename(ValueError): _normalized_regex = re.compile(r"^([a-z0-9]|[a-z0-9]([a-z0-9-](?!--))*[a-z0-9])$") # PEP 427: The build number must start with a digit. 
_build_tag_regex = re.compile(r"(\d+)(.*)") +_dist_name_re = re.compile(r"[^a-z0-9.]+", re.IGNORECASE) def canonicalize_name(name: str, *, validate: bool = False) -> NormalizedName: @@ -102,6 +104,28 @@ def canonicalize_version( return "".join(parts) +def make_wheel_filename( + name: str, + version: str | Version, + tags: Collection[Tag], + *, + build_tag: BuildTag | None = None, +) -> str: + if not tags: + raise ValueError("At least one tag is required") + + name = canonicalize_name(name).replace("-", "_").lower() + version = canonicalize_version(version) + filename = f"{name}-{version}" + if build_tag: + filename = f"{filename}-{build_tag[0]}{build_tag[1]}" + + interpreter_tags = ".".join(tag.interpreter for tag in tags) + abi_tags = ".".join(tag.abi for tag in tags) + platform_tags = ".".join(tag.platform for tag in tags) + return f"{filename}-{interpreter_tags}-{abi_tags}-{platform_tags}.whl" + + def parse_wheel_filename( filename: str, ) -> tuple[NormalizedName, Version, BuildTag, frozenset[Tag]]: diff --git a/src/packaging/wheelfile.py b/src/packaging/wheelfile.py index 8b1693b39..8b6bc4d95 100644 --- a/src/packaging/wheelfile.py +++ b/src/packaging/wheelfile.py @@ -1,9 +1,19 @@ from __future__ import annotations +__all__ = [ + "WheelMetadata", + "WheelRecordEntry", + "WheelContentElement", + "WheelError", + "WheelArchiveFile", + "WheelReader", + "write_wheelfile", + "WheelWriter", +] + import csv import hashlib import os.path -import re import stat import time from base64 import urlsafe_b64decode, urlsafe_b64encode @@ -20,34 +30,29 @@ from typing import IO, NamedTuple from zipfile import ZIP_DEFLATED, ZIP_STORED, ZipFile, ZipInfo -from . 
import __version__ as wheel_version from .tags import Tag from .utils import ( + BuildTag, InvalidWheelFilename, NormalizedName, parse_wheel_filename, ) from .version import Version -_DIST_NAME_RE = re.compile(r"[^A-Za-z0-9.]+") -_EXCLUDE_FILENAMES = ("RECORD", "RECORD.jws", "RECORD.p7s") -DEFAULT_TIMESTAMP = datetime(1980, 1, 1, tzinfo=timezone.utc) -EMAIL_POLICY = EmailPolicy(max_line_length=0, mangle_from_=False, utf8=True) +_exclude_filenames = ("RECORD", "RECORD.jws", "RECORD.p7s") +_default_timestamp = datetime(1980, 1, 1, tzinfo=timezone.utc) +_email_policy = EmailPolicy(max_line_length=0, mangle_from_=False, utf8=True) class WheelMetadata(NamedTuple): name: NormalizedName version: Version - build_tag: tuple[int, str] | tuple[()] + build_tag: BuildTag tags: frozenset[Tag] @classmethod def from_filename(cls, fname: str) -> WheelMetadata: - try: - name, version, build, tags = parse_wheel_filename(fname) - except InvalidWheelFilename as exc: - raise WheelError(f"Bad wheel filename {fname!r}") from exc - + name, version, build, tags = parse_wheel_filename(fname) return cls(name, version, build, tags) @@ -73,23 +78,6 @@ def _decode_hash_value(encoded_hash: str) -> bytes: return urlsafe_b64decode(encoded_hash.encode("ascii") + pad) -def make_filename( - name: str, - version: str, - build_tag: str | int | None = None, - impl_tag: str = "py3", - abi_tag: str = "none", - plat_tag: str = "any", -) -> str: - name = _DIST_NAME_RE.sub("_", name) - version = _DIST_NAME_RE.sub("_", version) - filename = f"{name}-{version}" - if build_tag: - filename = f"{filename}-{build_tag}" - - return f"{filename}-{impl_tag}-{abi_tag}-{plat_tag}.whl" - - class WheelError(Exception): pass @@ -294,7 +282,7 @@ def validate_record(self) -> None: for zinfo in self._zip.infolist(): # Ignore signature files basename = os.path.basename(zinfo.filename) - if basename in _EXCLUDE_FILENAMES: + if basename in _exclude_filenames: continue try: @@ -329,7 +317,7 @@ def extractall(self, base_path: str | 
PathLike[str]) -> None: def _open_file(self, archive_name: str) -> WheelArchiveFile: basename = os.path.basename(archive_name) - if basename in _EXCLUDE_FILENAMES: + if basename in _exclude_filenames: record_entry = None else: record_entry = self._record_entries[archive_name] @@ -355,9 +343,9 @@ def __repr__(self) -> str: def write_wheelfile( - fp: IO[bytes], metadata: WheelMetadata, generator: str, root_is_purelib: bool + fp: IO[bytes], /, *, generator: str, metadata: WheelMetadata, root_is_purelib: bool ) -> None: - msg = Message(policy=EMAIL_POLICY) + msg = Message(policy=_email_policy) msg["Wheel-Version"] = "1.0" # of the spec msg["Generator"] = generator msg["Root-Is-Purelib"] = str(root_is_purelib).lower() @@ -374,15 +362,16 @@ class WheelWriter: def __init__( self, path_or_fd: str | PathLike[str] | IO[bytes], - metadata: WheelMetadata | None = None, + /, *, - generator: str | None = None, + generator: str, + metadata: WheelMetadata | None = None, root_is_purelib: bool = True, compress: bool = True, hash_algorithm: str = "sha256", ): self.path_or_fd = path_or_fd - self.generator = generator or f"packaging ({wheel_version})" + self.generator = generator self.root_is_purelib = root_is_purelib self.hash_algorithm = hash_algorithm self._compress_type = ZIP_DEFLATED if compress else ZIP_STORED @@ -444,11 +433,16 @@ def _write_record(self) -> None: def _write_wheelfile(self) -> None: buffer = BytesIO() - write_wheelfile(buffer, self.metadata, self.generator, self.root_is_purelib) + write_wheelfile( + buffer, + generator=self.generator, + metadata=self.metadata, + root_is_purelib=self.root_is_purelib, + ) self.write_distinfo_file("WHEEL", buffer.getvalue()) def write_metadata(self, items: Iterable[tuple[str, str]]) -> None: - msg = Message(policy=EMAIL_POLICY) + msg = Message(policy=_email_policy) for key, value in items: key = key.title() if key == "Description": @@ -469,7 +463,8 @@ def write_file( self, name: str | PurePath, contents: bytes | str | PathLike[str] 
| IO[bytes], - timestamp: datetime = DEFAULT_TIMESTAMP, + *, + timestamp: datetime = _default_timestamp, ) -> None: arcname = PurePath(name).as_posix() gmtime = time.gmtime(timestamp.timestamp()) @@ -529,19 +524,21 @@ def write_data_file( self, filename: str, contents: bytes | str | PathLike[str] | IO[bytes], - timestamp: datetime = DEFAULT_TIMESTAMP, + *, + timestamp: datetime = _default_timestamp, ) -> None: archive_path = self._data_dir + "/" + filename.strip("/") - self.write_file(archive_path, contents, timestamp) + self.write_file(archive_path, contents, timestamp=timestamp) def write_distinfo_file( self, filename: str, contents: bytes | str | IO[bytes], - timestamp: datetime = DEFAULT_TIMESTAMP, + *, + timestamp: datetime = _default_timestamp, ) -> None: archive_path = self._dist_info_dir + "/" + filename.strip() - self.write_file(archive_path, contents, timestamp) + self.write_file(archive_path, contents, timestamp=timestamp) def __repr__(self) -> str: return f"{self.__class__.__name__}({self.path_or_fd!r})" diff --git a/tests/test_utils.py b/tests/test_utils.py index 87c86eefd..ab93da988 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,17 +1,20 @@ # This file is dual licensed under the terms of the Apache License, Version # 2.0, and the BSD License. See the LICENSE file in the root of this repository # for complete details. 
+from __future__ import annotations import pytest from packaging.tags import Tag from packaging.utils import ( + BuildTag, InvalidName, InvalidSdistFilename, InvalidWheelFilename, canonicalize_name, canonicalize_version, is_normalized_name, + make_wheel_filename, parse_sdist_filename, parse_wheel_filename, ) @@ -92,6 +95,64 @@ def test_canonicalize_version_no_strip_trailing_zero(version): assert canonicalize_version(version, strip_trailing_zero=False) == version +@pytest.mark.parametrize( + ("expected_filename", "name", "version", "build", "tags"), + [ + pytest.param( + "foo-1.0-py3-none-any.whl", + "foo", + Version("1.0"), + (), + {Tag("py3", "none", "any")}, + id="simple", + ), + pytest.param( + "foo-1.4.0-py3-none-any.whl", + "foo", + "1.4.0.0.0", + (), + {Tag("py3", "none", "any")}, + id="longversion", + ), + pytest.param( + "some_pack_age-1.0-py3-none-any.whl", + "some-PACK.AGE", + Version("1.0"), + (), + {Tag("py3", "none", "any")}, + id="normalizename", + ), + pytest.param( + "foo-1.0-1000-py3-none-any.whl", + "foo", + Version("1.0"), + (1000, ""), + {Tag("py3", "none", "any")}, + id="numericbuildtag", + ), + pytest.param( + "foo-1.0-1000abc-py3-none-any.whl", + "foo", + Version("1.0"), + (1000, "abc"), + {Tag("py3", "none", "any")}, + id="complexbuildtag", + ), + ], +) +def test_make_wheel_filename( + expected_filename: str, name: str, version: Version, build: BuildTag, tags: set[Tag] +) -> None: + assert ( + make_wheel_filename(name, version, tags, build_tag=build) == expected_filename + ) + + +def test_make_wheel_filename_no_tags() -> None: + with pytest.raises(ValueError, match="At least one tag is required"): + make_wheel_filename("foo", "1.0", []) + + @pytest.mark.parametrize( ("filename", "name", "version", "build", "tags"), [ diff --git a/tests/test_wheelfile.py b/tests/test_wheelfile.py index f903b8845..cd23a2948 100644 --- a/tests/test_wheelfile.py +++ b/tests/test_wheelfile.py @@ -8,6 +8,7 @@ import pytest from pytest import MonkeyPatch, 
TempPathFactory +from packaging.utils import InvalidWheelFilename from packaging.wheelfile import WheelError, WheelReader, WheelWriter @@ -16,203 +17,223 @@ def wheel_path(tmp_path: Path) -> Path: return tmp_path / "test-1.0-py2.py3-none-any.whl" -@pytest.mark.parametrize( - "filename, reason", - [ - pytest.param("test.whl", "wrong number of parts"), - pytest.param("test-1.0.whl", "wrong number of parts"), - pytest.param("test-1.0-py2.whl", "wrong number of parts"), - pytest.param("test-1.0-py2-none.whl", "wrong number of parts"), - pytest.param("test-1.0-py2-none-any", "extension must be '.whl'"), - pytest.param( - "test-1.0-py 2-none-any.whl", - "bad file name", - marks=[pytest.mark.xfail(reason="packaging does not fail this yet")], - ), - ], -) -def test_bad_wheel_filename(filename: str, reason: str) -> None: - basename = os.path.splitext(filename)[0] if filename.endswith(".whl") else filename - exc = pytest.raises(WheelError, WheelReader, filename) - exc.match(rf"^Invalid wheel filename \({reason}\): {basename}$") - - -def test_missing_record(wheel_path: Path) -> None: - with ZipFile(wheel_path, "w") as zf: - zf.writestr("hello/héllö.py", 'print("Héllö, w0rld!")\n') - - with pytest.raises( - WheelError, - match=( - "^Cannot find a valid .dist-info directory. 
Is this really a wheel file\\?$" - ), - ): - with WheelReader(wheel_path): - pass - - -def test_unsupported_hash_algorithm(wheel_path: Path) -> None: - with ZipFile(wheel_path, "w") as zf: - zf.writestr("hello/héllö.py", 'print("Héllö, w0rld!")\n') - zf.writestr( - "test-1.0.dist-info/RECORD", - "hello/héllö.py,sha000=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,25", - ) +class TestWheelReader: + def test_bad_wheel_filename(self) -> None: + with pytest.raises(WheelError, match="Invalid wheel filename"): + WheelReader("badname") + + def test_missing_record(self, wheel_path: Path) -> None: + with ZipFile(wheel_path, "w") as zf: + zf.writestr("hello/héllö.py", 'print("Héllö, w0rld!")\n') + + with pytest.raises( + WheelError, + match=( + r"^Cannot find a valid .dist-info directory. Is this really a wheel " + r"file\?$" + ), + ): + with WheelReader(wheel_path): + pass + + def test_unsupported_hash_algorithm(self, wheel_path: Path) -> None: + with ZipFile(wheel_path, "w") as zf: + zf.writestr("hello/héllö.py", 'print("Héllö, w0rld!")\n') + zf.writestr( + "test-1.0.dist-info/RECORD", + "hello/héllö.py,sha000=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,25", + ) + + with pytest.raises(WheelError, match="^Unsupported hash algorithm: sha000$"): + with WheelReader(wheel_path): + pass + + @pytest.mark.parametrize( + "algorithm, digest", + [ + pytest.param("md5", "4J-scNa2qvSgy07rS4at-Q", id="md5"), + pytest.param("sha1", "QjCnGu5Qucb6-vir1a6BVptvOA4", id="sha1"), + ], + ) + def test_weak_hash_algorithm( + self, wheel_path: Path, algorithm: str, digest: str + ) -> None: + hash_string = f"{algorithm}={digest}" + with ZipFile(wheel_path, "w") as zf: + zf.writestr("hello/héllö.py", 'print("Héllö, w0rld!")\n') + zf.writestr("test-1.0.dist-info/RECORD", f"hello/héllö.py,{hash_string},25") + + with pytest.raises( + WheelError, + match=rf"^Weak hash algorithm \({algorithm}\) is not permitted by PEP 427$", + ): + with WheelReader(wheel_path): + pass + + @pytest.mark.parametrize( + 
"algorithm, digest", + [ + ("sha256", "bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo"), + ( + "sha384", + "cDXriAy_7i02kBeDkN0m2RIDz85w6pwuHkt2PZ4VmT2PQc1TZs8Ebvf6eKDFcD_S", + ), + ( + "sha512", + "kdX9CQlwNt4FfOpOKO_X0pn_v1opQuksE40SrWtMyP1NqooWVWpzCE3myZTfpy8g2azZON_" + "iLNpWVxTwuDWqBQ", + ), + ], + ids=["sha256", "sha384", "sha512"], + ) + def test_validate_record( + self, wheel_path: Path, algorithm: str, digest: str + ) -> None: + hash_string = f"{algorithm}={digest}" + with ZipFile(wheel_path, "w") as zf: + zf.writestr("hello/héllö.py", 'print("Héllö, world!")\n') + zf.writestr("test-1.0.dist-info/RECORD", f"hello/héllö.py,{hash_string},25") + + with WheelReader(wheel_path) as wf: + wf.validate_record() + + def test_testzip_missing_hash(self, wheel_path: Path) -> None: + with ZipFile(wheel_path, "w") as zf: + zf.writestr("hello/héllö.py", 'print("Héllö, world!")\n') + zf.writestr("test-1.0.dist-info/RECORD", "") + + with WheelReader(wheel_path) as wf: + exc = pytest.raises(WheelError, wf.validate_record) + exc.match("^No hash found for file 'hello/héllö.py'$") + + def test_validate_record_bad_hash(self, wheel_path: Path) -> None: + with ZipFile(wheel_path, "w") as zf: + zf.writestr("hello/héllö.py", 'print("Héllö, w0rld!")\n') + zf.writestr( + "test-1.0.dist-info/RECORD", + "hello/héllö.py,sha256=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,25", + ) + + with WheelReader(wheel_path) as wf: + exc = pytest.raises(WheelError, wf.validate_record) + exc.match("^Hash mismatch for file 'hello/héllö.py'$") + + def test_unnormalized_wheel(self, tmp_path: Path) -> None: + # Previous versions of "wheel" did not correctly normalize the names; test that + # we can still read such wheels + wheel_path = tmp_path / "Test_foo_bar-1.0.0-py3-none-any.whl" + with ZipFile(wheel_path, "w") as zf: + zf.writestr( + "Test_foo_bar-1.0.0.dist-info/RECORD", + "Test_foo_bar-1.0.0.dist-info/RECORD,,\n", + ) - with pytest.raises(WheelError, match="^Unsupported hash algorithm: sha000$"): with 
WheelReader(wheel_path): pass -@pytest.mark.parametrize( - "algorithm, digest", - [ - pytest.param("md5", "4J-scNa2qvSgy07rS4at-Q", id="md5"), - pytest.param("sha1", "QjCnGu5Qucb6-vir1a6BVptvOA4", id="sha1"), - ], -) -def test_weak_hash_algorithm(wheel_path: Path, algorithm: str, digest: str) -> None: - hash_string = f"{algorithm}={digest}" - with ZipFile(wheel_path, "w") as zf: - zf.writestr("hello/héllö.py", 'print("Héllö, w0rld!")\n') - zf.writestr("test-1.0.dist-info/RECORD", f"hello/héllö.py,{hash_string},25") - - with pytest.raises( - WheelError, - match=rf"^Weak hash algorithm \({algorithm}\) is not permitted by PEP 427$", - ): - with WheelReader(wheel_path): - pass - - -@pytest.mark.parametrize( - "algorithm, digest", - [ - ("sha256", "bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo"), - ("sha384", "cDXriAy_7i02kBeDkN0m2RIDz85w6pwuHkt2PZ4VmT2PQc1TZs8Ebvf6eKDFcD_S"), - ( - "sha512", - "kdX9CQlwNt4FfOpOKO_X0pn_v1opQuksE40SrWtMyP1NqooWVWpzCE3myZTfpy8g2azZON_" - "iLNpWVxTwuDWqBQ", - ), - ], - ids=["sha256", "sha384", "sha512"], -) -def test_validate_record(wheel_path: Path, algorithm: str, digest: str) -> None: - hash_string = f"{algorithm}={digest}" - with ZipFile(wheel_path, "w") as zf: - zf.writestr("hello/héllö.py", 'print("Héllö, world!")\n') - zf.writestr("test-1.0.dist-info/RECORD", f"hello/héllö.py,{hash_string},25") - - with WheelReader(wheel_path) as wf: - wf.validate_record() - - -def test_testzip_missing_hash(wheel_path: Path) -> None: - with ZipFile(wheel_path, "w") as zf: - zf.writestr("hello/héllö.py", 'print("Héllö, world!")\n') - zf.writestr("test-1.0.dist-info/RECORD", "") - - with WheelReader(wheel_path) as wf: - exc = pytest.raises(WheelError, wf.validate_record) - exc.match("^No hash found for file 'hello/héllö.py'$") - - -def test_validate_record_bad_hash(wheel_path: Path) -> None: - with ZipFile(wheel_path, "w") as zf: - zf.writestr("hello/héllö.py", 'print("Héllö, w0rld!")\n') - zf.writestr( - "test-1.0.dist-info/RECORD", - 
"hello/héllö.py,sha256=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,25", - ) - - with WheelReader(wheel_path) as wf: - exc = pytest.raises(WheelError, wf.validate_record) - exc.match("^Hash mismatch for file 'hello/héllö.py'$") - - -def test_write_file(wheel_path: Path) -> None: - with WheelWriter(wheel_path, generator="generator 1.0") as wf: - wf.write_file("hello/héllö.py", 'print("Héllö, world!")\n') - wf.write_file("hello/h,ll,.py", 'print("Héllö, world!")\n') - - with ZipFile(wheel_path, "r") as zf: - infolist = zf.infolist() - assert len(infolist) == 4 - assert infolist[0].filename == "hello/héllö.py" - assert infolist[0].file_size == 25 - assert infolist[1].filename == "hello/h,ll,.py" - assert infolist[1].file_size == 25 - assert infolist[2].filename == "test-1.0.dist-info/WHEEL" - assert infolist[3].filename == "test-1.0.dist-info/RECORD" - - record = zf.read("test-1.0.dist-info/RECORD") - assert record.decode("utf-8") == ( - "hello/héllö.py,sha256=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,25\n" - '"hello/h,ll,.py",sha256=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,25\n' - "test-1.0.dist-info/WHEEL," - "sha256=KzXSdMADLwiK8h1P5UAQ76v3nVuO2ZRU8e9GCHCC6Qs,103\n" - "test-1.0.dist-info/RECORD,,\n" +class TestWheelWriter: + @pytest.mark.parametrize( + "filename, reason", + [ + pytest.param("test.whl", "wrong number of parts"), + pytest.param("test-1.0.whl", "wrong number of parts"), + pytest.param("test-1.0-py2.whl", "wrong number of parts"), + pytest.param("test-1.0-py2-none.whl", "wrong number of parts"), + pytest.param("test-1.0-py2-none-any", "extension must be '.whl'"), + pytest.param( + "test-1.0-py 2-none-any.whl", + "bad file name", + marks=[ + pytest.mark.xfail( + reason="parse_wheel_filename() does not fail this yet" + ) + ], + ), + ], + ) + def test_bad_wheel_filename(self, filename: str, reason: str) -> None: + basename = ( + os.path.splitext(filename)[0] if filename.endswith(".whl") else filename ) - - -def test_timestamp( - tmp_path_factory: 
TempPathFactory, wheel_path: Path, monkeypatch: MonkeyPatch -) -> None: - # An environment variable can be used to influence the timestamp on - # TarInfo objects inside the zip. See issue #143. - build_dir = tmp_path_factory.mktemp("build") - for filename in ("one", "two", "three"): - build_dir.joinpath(filename).write_text(filename + "\n") - - # The earliest date representable in TarInfos, 1980-01-01 - monkeypatch.setenv("SOURCE_DATE_EPOCH", "315576060") - - with WheelWriter(wheel_path) as wf: - wf.write_files_from_directory(build_dir) - - with ZipFile(wheel_path, "r") as zf: - for info in zf.infolist(): - assert info.date_time == (1980, 1, 1, 0, 0, 0) - assert info.compress_type == ZIP_DEFLATED - - -@pytest.mark.skipif( - sys.platform == "win32", reason="Windows does not support UNIX-like permissions" -) -def test_attributes(tmp_path_factory: TempPathFactory, wheel_path: Path) -> None: - # With the change from ZipFile.write() to .writestr(), we need to manually - # set member attributes. 
- build_dir = tmp_path_factory.mktemp("build") - files = (("foo", 0o644), ("bar", 0o755)) - for filename, mode in files: - path = build_dir / filename - path.write_text(filename + "\n") - path.chmod(mode) - - with WheelWriter(wheel_path) as wf: - wf.write_files_from_directory(build_dir) - - with ZipFile(wheel_path, "r") as zf: + with pytest.raises( + InvalidWheelFilename, + match=rf"^Invalid wheel filename \({reason}\): {basename}$", + ): + WheelWriter(filename, generator="foo") + + def test_write_file(self, wheel_path: Path) -> None: + with WheelWriter(wheel_path, generator="generator 1.0") as wf: + wf.write_file("hello/héllö.py", 'print("Héllö, world!")\n') + wf.write_file("hello/h,ll,.py", 'print("Héllö, world!")\n') + + with ZipFile(wheel_path, "r") as zf: + infolist = zf.infolist() + assert len(infolist) == 4 + assert infolist[0].filename == "hello/héllö.py" + assert infolist[0].file_size == 25 + assert infolist[1].filename == "hello/h,ll,.py" + assert infolist[1].file_size == 25 + assert infolist[2].filename == "test-1.0.dist-info/WHEEL" + assert infolist[3].filename == "test-1.0.dist-info/RECORD" + + record = zf.read("test-1.0.dist-info/RECORD") + assert record.decode("utf-8") == ( + "hello/héllö.py,sha256=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,25\n" + '"hello/h,ll,.py",sha256=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,' + "25\n" + "test-1.0.dist-info/WHEEL," + "sha256=KzXSdMADLwiK8h1P5UAQ76v3nVuO2ZRU8e9GCHCC6Qs,103\n" + "test-1.0.dist-info/RECORD,,\n" + ) + + def test_timestamp( + self, + tmp_path_factory: TempPathFactory, + wheel_path: Path, + monkeypatch: MonkeyPatch, + ) -> None: + # An environment variable can be used to influence the timestamp on + # TarInfo objects inside the zip. See issue #143. 
+ build_dir = tmp_path_factory.mktemp("build") + for filename in ("one", "two", "three"): + build_dir.joinpath(filename).write_text(filename + "\n") + + # The earliest date representable in TarInfos, 1980-01-01 + monkeypatch.setenv("SOURCE_DATE_EPOCH", "315576060") + + with WheelWriter(wheel_path, generator="generator 1.0") as wf: + wf.write_files_from_directory(build_dir) + + with ZipFile(wheel_path, "r") as zf: + for info in zf.infolist(): + assert info.date_time == (1980, 1, 1, 0, 0, 0) + assert info.compress_type == ZIP_DEFLATED + + @pytest.mark.skipif( + sys.platform == "win32", reason="Windows does not support UNIX-like permissions" + ) + def test_attributes( + self, tmp_path_factory: TempPathFactory, wheel_path: Path + ) -> None: + # With the change from ZipFile.write() to .writestr(), we need to manually + # set member attributes. + build_dir = tmp_path_factory.mktemp("build") + files = (("foo", 0o644), ("bar", 0o755)) for filename, mode in files: - info = zf.getinfo(filename) - assert info.external_attr == (mode | 0o100000) << 16 - assert info.compress_type == ZIP_DEFLATED - - info = zf.getinfo("test-1.0.dist-info/RECORD") - permissions = (info.external_attr >> 16) & 0o777 - assert permissions == 0o664 - - -def test_unnormalized_wheel(tmp_path: Path) -> None: - # Previous versions of "wheel" did not correctly normalize the names; test that we - # can still read such wheels - wheel_path = tmp_path / "Test_foo_bar-1.0.0-py3-none-any.whl" - with ZipFile(wheel_path, "w") as zf: - zf.writestr( - "Test_foo_bar-1.0.0.dist-info/RECORD", - "Test_foo_bar-1.0.0.dist-info/RECORD,,\n", - ) - - with WheelReader(wheel_path): - pass + path = build_dir / filename + path.write_text(filename + "\n") + path.chmod(mode) + + with WheelWriter(wheel_path, generator="generator 1.0") as wf: + wf.write_files_from_directory(build_dir) + + with ZipFile(wheel_path, "r") as zf: + for filename, mode in files: + info = zf.getinfo(filename) + assert info.external_attr == (mode | 0o100000) 
<< 16 + assert info.compress_type == ZIP_DEFLATED + + info = zf.getinfo("test-1.0.dist-info/RECORD") + permissions = (info.external_attr >> 16) & 0o777 + assert permissions == 0o664 From bb2efe6cd21bf69c59c935eb22589d18b52dee68 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Wed, 22 May 2024 13:28:16 -0400 Subject: [PATCH 4/9] Added changelog note and stub documentation --- CHANGELOG.rst | 3 ++- docs/index.rst | 1 + docs/wheelfile.rst | 4 ++++ 3 files changed, 7 insertions(+), 1 deletion(-) create mode 100644 docs/wheelfile.rst diff --git a/CHANGELOG.rst b/CHANGELOG.rst index c0d78f86e..8aedfb6c9 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -4,7 +4,8 @@ Changelog *unreleased* ~~~~~~~~~~~~ -No unreleased changes. +* Added the ``packaging.wheelfile`` module for reading and creating wheel files + (:issue:`697`) 24.0 - 2024-03-10 ~~~~~~~~~~~~~~~~~ diff --git a/docs/index.rst b/docs/index.rst index e658ec086..594dd9ba7 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -29,6 +29,7 @@ The ``packaging`` library uses calendar-based versioning (``YY.N``). requirements metadata tags + wheelfile utils .. toctree:: diff --git a/docs/wheelfile.rst b/docs/wheelfile.rst new file mode 100644 index 000000000..f9d1cd840 --- /dev/null +++ b/docs/wheelfile.rst @@ -0,0 +1,4 @@ +Wheel Files +=========== + +.. 
currentmodule:: packaging.wheelfile From 46eff6127b6001e0310a2b1195ebf4f9f6ebb75b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sun, 21 Jul 2024 12:43:30 +0300 Subject: [PATCH 5/9] Updated default metadata version to 2.3 --- src/packaging/wheelfile.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/packaging/wheelfile.py b/src/packaging/wheelfile.py index 8b6bc4d95..2230eb915 100644 --- a/src/packaging/wheelfile.py +++ b/src/packaging/wheelfile.py @@ -451,7 +451,7 @@ def write_metadata(self, items: Iterable[tuple[str, str]]) -> None: msg.add_header(key, value) if "Metadata-Version" not in msg: - msg["Metadata-Version"] = "2.1" + msg["Metadata-Version"] = "2.3" if "Name" not in msg: msg["Name"] = self.metadata.name if "Version" not in msg: From 2d5d033f9301eb4f151cc883f842df8da8ac6f1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Wed, 31 Jul 2024 12:14:45 +0300 Subject: [PATCH 6/9] Fixed trailing zeroes missing from version number in make_wheel_filename() --- src/packaging/utils.py | 1 - tests/test_utils.py | 8 -------- 2 files changed, 9 deletions(-) diff --git a/src/packaging/utils.py b/src/packaging/utils.py index c18ebf7fc..72cf90fd6 100644 --- a/src/packaging/utils.py +++ b/src/packaging/utils.py @@ -115,7 +115,6 @@ def make_wheel_filename( raise ValueError("At least one tag is required") name = canonicalize_name(name).replace("-", "_").lower() - version = canonicalize_version(version) filename = f"{name}-{version}" if build_tag: filename = f"{filename}-{build_tag[0]}{build_tag[1]}" diff --git a/tests/test_utils.py b/tests/test_utils.py index ab93da988..733494e35 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -106,14 +106,6 @@ def test_canonicalize_version_no_strip_trailing_zero(version): {Tag("py3", "none", "any")}, id="simple", ), - pytest.param( - "foo-1.4.0-py3-none-any.whl", - "foo", - "1.4.0.0.0", - (), - {Tag("py3", "none", "any")}, - id="longversion", - ), pytest.param( 
"some_pack_age-1.0-py3-none-any.whl", "some-PACK.AGE", From 50a8631e4b3e71ef8fa7952387b0037ef2624f00 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sat, 3 Aug 2024 02:02:14 +0300 Subject: [PATCH 7/9] Lots of bug fixes, refactorings and coverage improvements --- src/packaging/wheelfile.py | 77 ++++++----- tests/test_wheelfile.py | 270 ++++++++++++++++++++++++++++++++++++- 2 files changed, 306 insertions(+), 41 deletions(-) diff --git a/src/packaging/wheelfile.py b/src/packaging/wheelfile.py index 2230eb915..224c80cc5 100644 --- a/src/packaging/wheelfile.py +++ b/src/packaging/wheelfile.py @@ -68,6 +68,9 @@ class WheelContentElement(NamedTuple): size: int stream: IO[bytes] + def __repr__(self) -> str: + return f"{self.__class__.__name__}({str(self.path)!r}, size={self.size!r})" + def _encode_hash_value(hash_value: bytes) -> str: return urlsafe_b64encode(hash_value).rstrip(b"=").decode("ascii") @@ -95,21 +98,28 @@ def __init__( def read(self, amount: int = -1) -> bytes: data = self._fp.read(amount) - if amount and self._record_entry is not None: - if data: - self._hash.update(data) - self._num_bytes_read += len(data) - elif self._record_entry: - # The file has been read in full – check that hash and file size match - # with the entry in RECORD - if self._hash.digest() != self._record_entry.hash_value: - raise WheelError(f"Hash mismatch for file {self._arcname!r}") - elif self._num_bytes_read != self._record_entry.filesize: - raise WheelError( - f"{self._arcname}: file size mismatch: " - f"{self._record_entry.filesize} bytes in RECORD, " - f"{self._num_bytes_read} bytes in archive" - ) + if self._record_entry is None: + return data + + if data: + self._hash.update(data) + self._num_bytes_read += len(data) + + if amount < 0 or len(data) < amount: + # The file has been read in full – check that hash and file size match + # with the entry in RECORD + if self._num_bytes_read != self._record_entry.filesize: + raise WheelError( + f"{self._arcname}: 
file size mismatch: " + f"{self._record_entry.filesize} bytes in RECORD, " + f"{self._num_bytes_read} bytes in archive" + ) + elif self._hash.digest() != self._record_entry.hash_value: + raise WheelError( + f"{self._arcname}: hash mismatch: " + f"{self._record_entry.hash_value.hex()} in RECORD, " + f"{self._hash.hexdigest()} in archive" + ) return data @@ -125,7 +135,7 @@ def __exit__( self._fp.close() def __repr__(self) -> str: - return f"{self.__class__.__name__}({self._fp!r}, {self._arcname!r})" + return f"{self.__class__.__name__}({self._arcname!r})" class WheelReader: @@ -270,7 +280,7 @@ def read_dist_info(self, filename: str) -> str: return contents.decode("utf-8") - def get_contents(self) -> Iterator[WheelContentElement]: + def iterate_contents(self) -> Iterator[WheelContentElement]: for fname, entry in self._record_entries.items(): with self._zip.open(fname, "r") as stream: yield WheelContentElement( @@ -285,17 +295,10 @@ def validate_record(self) -> None: if basename in _exclude_filenames: continue - try: - record = self._record_entries[zinfo.filename] - except KeyError: - raise WheelError(f"No hash found for file {zinfo.filename!r}") from None - - hash_ = hashlib.new(record.hash_algorithm) - with self._zip.open(zinfo) as fp: - hash_.update(fp.read(65536)) - - if hash_.digest() != record.hash_value: - raise WheelError(f"Hash mismatch for file {zinfo.filename!r}") + with self.open(zinfo.filename) as fp: + while True: + if not fp.read(65536): + break def extractall(self, base_path: str | PathLike[str]) -> None: basedir = Path(base_path) @@ -307,7 +310,7 @@ def extractall(self, base_path: str | PathLike[str]) -> None: for fname in self._zip.namelist(): target_path = basedir.joinpath(fname) target_path.parent.mkdir(0o755, True, True) - with self._open_file(fname) as infile, target_path.open("wb") as outfile: + with self.open(fname) as infile, target_path.open("wb") as outfile: while True: data = infile.read(65536) if not data: @@ -315,19 +318,22 @@ def 
extractall(self, base_path: str | PathLike[str]) -> None: outfile.write(data) - def _open_file(self, archive_name: str) -> WheelArchiveFile: + def open(self, archive_name: str) -> WheelArchiveFile: basename = os.path.basename(archive_name) if basename in _exclude_filenames: record_entry = None else: - record_entry = self._record_entries[archive_name] + try: + record_entry = self._record_entries[archive_name] + except KeyError: + raise WheelError(f"No hash found for file {archive_name!r}") from None return WheelArchiveFile( self._zip.open(archive_name), archive_name, record_entry ) def read_file(self, archive_name: str) -> bytes: - with self._open_file(archive_name) as fp: + with self.open(archive_name) as fp: return fp.read() def read_data_file(self, filename: str) -> bytes: @@ -446,7 +452,7 @@ def write_metadata(self, items: Iterable[tuple[str, str]]) -> None: for key, value in items: key = key.title() if key == "Description": - msg.set_payload(value, "utf-8") + msg.set_payload(value.encode("utf-8")) else: msg.add_header(key, value) @@ -541,4 +547,7 @@ def write_distinfo_file( self.write_file(archive_path, contents, timestamp=timestamp) def __repr__(self) -> str: - return f"{self.__class__.__name__}({self.path_or_fd!r})" + return ( + f"{self.__class__.__name__}({self.path_or_fd}, " + f"generator={self.generator!r})" + ) diff --git a/tests/test_wheelfile.py b/tests/test_wheelfile.py index cd23a2948..4b1b7389f 100644 --- a/tests/test_wheelfile.py +++ b/tests/test_wheelfile.py @@ -2,7 +2,9 @@ import os.path import sys -from pathlib import Path +from io import BytesIO +from pathlib import Path, PurePath +from textwrap import dedent from zipfile import ZIP_DEFLATED, ZipFile import pytest @@ -18,10 +20,45 @@ def wheel_path(tmp_path: Path) -> Path: class TestWheelReader: + @pytest.fixture(scope="class") + def valid_wheel(self, tmp_path_factory: TempPathFactory) -> Path: + path = tmp_path_factory.mktemp("reader") / "test-1.0-py2.py3-none-any.whl" + with ZipFile(path, "w") 
as zf: + zf.writestr("hello/héllö.py", 'print("Héllö, world!")\n') + zf.writestr( + "test-1.0.dist-info/RECORD", + "hello/héllö.py,sha256=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,25", + ) + + return path + + def test_properties(self, valid_wheel: Path) -> None: + with WheelReader(valid_wheel) as reader: + assert reader.dist_info_dir == "test-1.0.dist-info" + assert reader.data_dir == "test-1.0.data" + assert reader.dist_info_filenames == [PurePath("test-1.0.dist-info/RECORD")] + def test_bad_wheel_filename(self) -> None: with pytest.raises(WheelError, match="Invalid wheel filename"): WheelReader("badname") + def test_str_filename(self, valid_wheel: Path) -> None: + reader = WheelReader(str(valid_wheel)) + assert reader.path_or_fd == str(valid_wheel) + + def test_pathlike_filename(self, valid_wheel: Path) -> None: + class Foo: + def __fspath__(self) -> str: + return str(valid_wheel) + + foo = Foo() + with WheelReader(foo) as reader: + assert reader.path_or_fd is foo + + def test_pass_open_file(self, valid_wheel: Path) -> None: + with valid_wheel.open("rb") as fp, WheelReader(fp) as reader: + assert reader.path_or_fd is fp + def test_missing_record(self, wheel_path: Path) -> None: with ZipFile(wheel_path, "w") as zf: zf.writestr("hello/héllö.py", 'print("Héllö, w0rld!")\n') @@ -97,7 +134,7 @@ def test_validate_record( with WheelReader(wheel_path) as wf: wf.validate_record() - def test_testzip_missing_hash(self, wheel_path: Path) -> None: + def test_validate_record_missing_hash(self, wheel_path: Path) -> None: with ZipFile(wheel_path, "w") as zf: zf.writestr("hello/héllö.py", 'print("Héllö, world!")\n') zf.writestr("test-1.0.dist-info/RECORD", "") @@ -116,7 +153,13 @@ def test_validate_record_bad_hash(self, wheel_path: Path) -> None: with WheelReader(wheel_path) as wf: exc = pytest.raises(WheelError, wf.validate_record) - exc.match("^Hash mismatch for file 'hello/héllö.py'$") + exc.match( + "hello/héllö.py: hash mismatch: " + 
"6eff9057745c8900b6bf7ccbf14be177f6aba78d09e40f719b2b9b377e0e570a in " + "RECORD, " + "1eac82375d38fdb8a4c653c6c2b3c363058d5c193cf24bafcd1df040d344597e in " + "archive$" + ) def test_unnormalized_wheel(self, tmp_path: Path) -> None: # Previous versions of "wheel" did not correctly normalize the names; test that @@ -131,6 +174,131 @@ def test_unnormalized_wheel(self, tmp_path: Path) -> None: with WheelReader(wheel_path): pass + def test_read_file(self, valid_wheel: Path) -> None: + with WheelReader(valid_wheel) as wf: + contents = wf.read_file("hello/héllö.py") + + assert contents == b'print("H\xc3\xa9ll\xc3\xb6, world!")\n' + + @pytest.mark.parametrize( + "amount", + [ + pytest.param(-1, id="oneshot"), + pytest.param(2, id="gradual"), + ], + ) + def test_read_file_bad_hash(self, wheel_path: Path, amount: int) -> None: + with ZipFile(wheel_path, "w") as zf: + zf.writestr("hello/héllö.py", 'print("Héllö, w0rld!")\n') + zf.writestr( + "test-1.0.dist-info/RECORD", + "hello/héllö.py,sha256=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,25", + ) + + with pytest.raises( + WheelError, + match=( + "^hello/héllö.py: hash mismatch: " + "6eff9057745c8900b6bf7ccbf14be177f6aba78d09e40f719b2b9b377e0e570a in " + "RECORD, " + "1eac82375d38fdb8a4c653c6c2b3c363058d5c193cf24bafcd1df040d344597e in " + "archive$" + ), + ), WheelReader(wheel_path) as wf, wf.open("hello/héllö.py") as f: + assert repr(f) == "WheelArchiveFile('hello/héllö.py')" + while f.read(amount): + pass + + @pytest.mark.parametrize( + "amount", + [ + pytest.param(-1, id="oneshot"), + pytest.param(2, id="gradual"), + ], + ) + def test_read_file_bad_size(self, wheel_path: Path, amount: int) -> None: + with ZipFile(wheel_path, "w") as zf: + zf.writestr("hello/héllö.py", 'print("Héllö, w0rld!")\n') + zf.writestr( + "test-1.0.dist-info/RECORD", + "hello/héllö.py,sha256=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,24", + ) + + with pytest.raises( + WheelError, + match=( + "^hello/héllö.py: file size mismatch: 24 bytes in RECORD, 
25 bytes in " + "archive$" + ), + ), WheelReader(wheel_path) as wf, wf.open("hello/héllö.py") as f: + while f.read(amount): + pass + + def test_read_data_file(self, wheel_path: Path) -> None: + with ZipFile(wheel_path, "w") as zf: + zf.writestr("test-1.0.data/héllö.py", 'print("Héllö, world!")\n') + zf.writestr( + "test-1.0.dist-info/RECORD", + "test-1.0.data/héllö.py," + "sha256=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,25", + ) + + with WheelReader(wheel_path) as wf: + contents = wf.read_data_file("héllö.py") + + assert contents == b'print("H\xc3\xa9ll\xc3\xb6, world!")\n' + + def test_read_distinfo_file(self, valid_wheel: Path) -> None: + with WheelReader(valid_wheel) as wf: + contents = wf.read_distinfo_file("RECORD") + + assert ( + contents == b"hello/h\xc3\xa9ll\xc3\xb6.py," + b"sha256=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,25" + ) + + def test_iterate_contents(self, valid_wheel: Path) -> None: + with WheelReader(valid_wheel) as wf: + for element in wf.iterate_contents(): + assert element.path == PurePath("hello", "héllö.py") + assert element.size == 25 + assert ( + element.hash_value.hex() + == "6eff9057745c8900b6bf7ccbf14be177f6aba78d09e40f719b2b9b377e0e570" + "a" + ) + assert ( + element.stream.read() == b'print("H\xc3\xa9ll\xc3\xb6, world!")\n' + ) + assert repr(element) == "WheelContentElement('hello/héllö.py', size=25)" + + def test_extractall( + self, valid_wheel: Path, tmp_path_factory: TempPathFactory + ) -> None: + dest_dir = tmp_path_factory.mktemp("wheel_contents") + with WheelReader(valid_wheel) as wf: + wf.extractall(dest_dir) + + iterator = os.walk(dest_dir) + dirpath, dirnames, filenames = next(iterator) + assert dirnames == ["hello", "test-1.0.dist-info"] + assert not filenames + + dirpath, dirnames, filenames = next(iterator) + assert dirpath.endswith("hello") + assert filenames == ["héllö.py"] + assert ( + Path(dirpath).joinpath(filenames[0]).read_text() + == 'print("Héllö, world!")\n' + ) + + dirpath, dirnames, filenames = 
next(iterator) + assert dirpath.endswith("test-1.0.dist-info") + assert filenames == ["RECORD"] + assert Path(dirpath).joinpath(filenames[0]).read_text() == ( + "hello/héllö.py,sha256=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,25" + ) + class TestWheelWriter: @pytest.mark.parametrize( @@ -162,31 +330,90 @@ def test_bad_wheel_filename(self, filename: str, reason: str) -> None: ): WheelWriter(filename, generator="foo") - def test_write_file(self, wheel_path: Path) -> None: + def test_unavailable_hash_algorithm(self, wheel_path: Path) -> None: + with pytest.raises( + ValueError, + match=r"^Hash algorithm 'sha000' is not available$", + ): + WheelWriter(wheel_path, generator="generator 1.0", hash_algorithm="sha000") + + @pytest.mark.parametrize( + "algorithm", + [ + pytest.param("md5"), + pytest.param("sha1"), + ], + ) + def test_weak_hash_algorithm(self, wheel_path: Path, algorithm: str) -> None: + with pytest.raises( + ValueError, + match=rf"^Weak hash algorithm \({algorithm}\) is not permitted by PEP 427$", + ): + WheelWriter(wheel_path, generator="generator 1.0", hash_algorithm=algorithm) + + def test_write_files(self, wheel_path: Path) -> None: with WheelWriter(wheel_path, generator="generator 1.0") as wf: wf.write_file("hello/héllö.py", 'print("Héllö, world!")\n') wf.write_file("hello/h,ll,.py", 'print("Héllö, world!")\n') + wf.write_data_file("mydata.txt", "Dummy") + wf.write_distinfo_file("LICENSE.txt", "License text") with ZipFile(wheel_path, "r") as zf: infolist = zf.infolist() - assert len(infolist) == 4 + assert len(infolist) == 6 assert infolist[0].filename == "hello/héllö.py" assert infolist[0].file_size == 25 assert infolist[1].filename == "hello/h,ll,.py" assert infolist[1].file_size == 25 - assert infolist[2].filename == "test-1.0.dist-info/WHEEL" - assert infolist[3].filename == "test-1.0.dist-info/RECORD" + assert infolist[2].filename == "test-1.0.data/mydata.txt" + assert infolist[2].file_size == 5 + assert infolist[3].filename == 
"test-1.0.dist-info/LICENSE.txt" + assert infolist[4].filename == "test-1.0.dist-info/WHEEL" + assert infolist[5].filename == "test-1.0.dist-info/RECORD" record = zf.read("test-1.0.dist-info/RECORD") assert record.decode("utf-8") == ( "hello/héllö.py,sha256=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,25\n" '"hello/h,ll,.py",sha256=bv-QV3RciQC2v3zL8Uvhd_arp40J5A9xmyubN34OVwo,' "25\n" + "test-1.0.data/mydata.txt," + "sha256=0mB6s81UJCwa14-jUFK6fIqv1PR4FQPyJ0wxBjqF9WA,5\n" + "test-1.0.dist-info/LICENSE.txt," + "sha256=Bk_bWStYk3YYSmcUeZRgnr3cqIs1oJW485Zb_XBvOgM,12\n" "test-1.0.dist-info/WHEEL," "sha256=KzXSdMADLwiK8h1P5UAQ76v3nVuO2ZRU8e9GCHCC6Qs,103\n" "test-1.0.dist-info/RECORD,,\n" ) + def test_write_metadata(self, wheel_path: Path) -> None: + with WheelWriter(wheel_path, generator="generator 1.0") as wf: + wf.write_metadata( + [ + ("Foo", "Bar"), + ("Description", "Long description\nspanning\nthree rows"), + ] + ) + + with ZipFile(wheel_path, "r") as zf: + infolist = zf.infolist() + assert len(infolist) == 3 + assert infolist[0].filename == "test-1.0.dist-info/METADATA" + assert infolist[1].filename == "test-1.0.dist-info/WHEEL" + assert infolist[2].filename == "test-1.0.dist-info/RECORD" + + metadata = zf.read("test-1.0.dist-info/METADATA") + assert metadata.decode("utf-8") == dedent( + """\ + Foo: Bar + Metadata-Version: 2.3 + Name: test + Version: 1.0 + + Long description + spanning + three rows""" + ) + def test_timestamp( self, tmp_path_factory: TempPathFactory, @@ -237,3 +464,32 @@ def test_attributes( info = zf.getinfo("test-1.0.dist-info/RECORD") permissions = (info.external_attr >> 16) & 0o777 assert permissions == 0o664 + + def test_write_file_from_bytesio(self, wheel_path: Path) -> None: + with WheelWriter(wheel_path, generator="generator 1.0") as wf: + buffer = BytesIO(b"test content") + wf.write_file("test", buffer) + + with ZipFile(wheel_path, "r") as zf: + assert zf.open("test", "r").read() == b"test content" + + def 
test_write_files_from_dir_source_nonexistent( + self, wheel_path: Path, tmp_path: Path + ) -> None: + source_dir = tmp_path / "nonexistent" + with WheelWriter(wheel_path, generator="generator 1.0") as wf: + with pytest.raises(WheelError, match=f"{source_dir} does not exist"): + wf.write_files_from_directory(source_dir) + + def test_write_files_from_dir_source_not_dir( + self, wheel_path: Path, tmp_path: Path + ) -> None: + source_dir = tmp_path / "file" + source_dir.touch() + with WheelWriter(wheel_path, generator="generator 1.0") as wf: + with pytest.raises(WheelError, match=f"{source_dir} is not a directory"): + wf.write_files_from_directory(source_dir) + + def test_repr(self, wheel_path: Path) -> None: + with WheelWriter(wheel_path, generator="generator 1.0") as wf: + assert repr(wf) == f"WheelWriter({wheel_path}, generator='generator 1.0')" From 4e7b0142347771fce93adfdd0f456dbe4d8908ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sun, 27 Apr 2025 01:08:20 +0300 Subject: [PATCH 8/9] Fixed Ruff error --- src/packaging/wheelfile.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/packaging/wheelfile.py b/src/packaging/wheelfile.py index 224c80cc5..4d38c2d09 100644 --- a/src/packaging/wheelfile.py +++ b/src/packaging/wheelfile.py @@ -1,14 +1,14 @@ from __future__ import annotations __all__ = [ - "WheelMetadata", - "WheelRecordEntry", + "WheelArchiveFile", "WheelContentElement", "WheelError", - "WheelArchiveFile", + "WheelMetadata", "WheelReader", - "write_wheelfile", + "WheelRecordEntry", "WheelWriter", + "write_wheelfile", ] import csv From 3d9afa9145f4a7c7a35f4c18d471a887866ac212 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sun, 27 Apr 2025 12:07:24 +0300 Subject: [PATCH 9/9] Fixed test failures --- tests/test_wheelfile.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_wheelfile.py b/tests/test_wheelfile.py index 4b1b7389f..549071697 100644 --- 
a/tests/test_wheelfile.py +++ b/tests/test_wheelfile.py @@ -281,6 +281,7 @@ def test_extractall( iterator = os.walk(dest_dir) dirpath, dirnames, filenames = next(iterator) + dirnames.sort() assert dirnames == ["hello", "test-1.0.dist-info"] assert not filenames @@ -326,7 +327,7 @@ def test_bad_wheel_filename(self, filename: str, reason: str) -> None: ) with pytest.raises( InvalidWheelFilename, - match=rf"^Invalid wheel filename \({reason}\): {basename}$", + match=rf"^Invalid wheel filename \({reason}\): {basename!r}$", ): WheelWriter(filename, generator="foo")