diff --git a/CHANGELOG.md b/CHANGELOG.md
index fd82c5e..fa66f48 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,4 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - `print()` method now supports the `file` argument
 - `as_json()` and `as_text()` methods
 
+### Fixed
+- `SubFile` methods `readinto`, `readline`, and `readlines` now terminate at the end of the subfile
+
 ## [0.5.1] - 2025-12-17
diff --git a/jbpy/core.py b/jbpy/core.py
index f696e2d..a0207a4 100644
--- a/jbpy/core.py
+++ b/jbpy/core.py
@@ -124,26 +124,56 @@ def read(self, size: int = -1) -> bytes:
         self._pos += len(data)
         return data
 
-    def readinto(self, b) -> None:
+    def readinto(self, b) -> int | None:
+        """Fill ``b`` with at most the bytes left in the subfile.
+
+        Returns the number of bytes stored (0 once the subfile is exhausted),
+        or ``None`` if the underlying file's ``readinto`` returns ``None``.
+        """
+        if self._pos >= self._length:
+            return 0
         self._file.seek(self._start + self._pos)
-        num_read = self._file.readinto(b)
+        bytes_remaining = self._length - self._pos
+        # Cast to a flat byte view so the slice below counts bytes, not items
+        # (``b`` may be a buffer whose items are wider than one byte).
+        v = memoryview(b).cast("B")
+        num_read = self._file.readinto(v[:bytes_remaining])
         if num_read is not None:
             self._pos += num_read
         return num_read
 
     def readline(self, size=-1) -> bytes:
+        """Read one line, stopping at the end of the subfile.
+
+        ``size`` caps the number of bytes returned; ``None`` or any negative
+        value means no cap other than the end of the subfile.
+        """
+        if self._pos >= self._length:
+            return b""
         self._file.seek(self._start + self._pos)
-        data = self._file.readline(size)
+        limit = self._length - self._pos
+        if size is not None and size >= 0:
+            limit = min(limit, size)
+        data = self._file.readline(limit)
         self._pos += len(data)
         return data
 
     def readlines(self, hint=-1) -> list[bytes]:
-        self._file.seek(self._start + self._pos)
-        before = self._file.tell()
-        data = self._file.readlines(hint)
-        after = self._file.tell()
-        self._pos += after - before
-        return data
+        """Read lines until the end of the subfile or until ``hint`` is reached.
+
+        No further line is read once the total size of the lines read so far
+        reaches ``hint``; ``None`` and values <= 0 mean no limit.
+        """
+        unlimited = hint is None or hint <= 0
+        lines = []
+        total = 0
+        # ``readline`` does the seeking and the end-of-subfile clamping.
+        while line := self.readline():
+            lines.append(line)
+            total += len(line)
+            if not unlimited and total >= hint:
+                break
+        return lines
 
     def readable(self) -> bool:
         return self._file.readable()
diff --git a/test/test_core.py b/test/test_core.py
index 1c9b7a0..e0d8921 100644
--- a/test/test_core.py
+++ b/test/test_core.py
@@ -754,11 +754,8 @@ def test_as_filelike(tmp_path):
 def test_subfile(tmp_path):
     filename = tmp_path / "random.bin"
     all_data = bytearray(
-        "".join(random.choices(string.ascii_letters + string.digits, k=1000)).encode()
+        "".join(random.choices(string.ascii_letters + string.digits, k=50)).encode()
     )
-    all_data[500] = ord("\n")
-    all_data[550] = ord("\n")
-    all_data[600] = ord("\n")
     filename.write_bytes(all_data)
 
     with filename.open("rb") as file:
@@ -787,30 +784,120 @@ def test_subfile(tmp_path):
         subfile.seek(length + 100)
         assert subfile.read() == b""
 
-        expected_pos = 100
-        subfile.seek(expected_pos)
-        ba = bytearray(50)
-        assert subfile.readinto(ba) == len(ba)
-        assert ba == all_data[start + expected_pos : start + expected_pos + len(ba)]
+        assert subfile.readable()
 
-        expected_pos = 400
-        subfile.seek(expected_pos)
-        assert (
-            subfile.readline(3)
-            == all_data[start + expected_pos : start + expected_pos + 3]
-        )
-        assert subfile.readline() == all_data[start + expected_pos + 3 : 501]
-        assert subfile.readlines() == [
-            all_data[501:551],
-            all_data[551:601],
-            all_data[601:],
-        ]
-
-        expected_pos = 400
-        subfile.seek(offset=expected_pos)
-        assert subfile.readlines(95) == [
-            all_data[start + expected_pos : 501],
-            all_data[501:551],
-        ]
-        assert subfile.readable()
 
+def test_subfile_readinto():
+    all_data = bytearray(
+        "".join(random.choices(string.ascii_letters + string.digits, k=1000)).encode()
+    )
+    start = 400
+    length = 400
+    f_all = io.BytesIO(all_data)
+    f_io = io.BytesIO(all_data[start : start + length])
+    f_jb = jbpy.core.SubFile(f_all, start, length)
+
+    # read small piece
+    piece_size = 50
+    b_io = bytearray(piece_size)
+    b_jb = bytearray(piece_size)
+    assert f_jb.readinto(b_jb) == f_io.readinto(b_io)
+    assert b_jb == b_io
+
+    # read rest of subfile into too large of a buffer
+    b_io += bytearray(length)
+    b_jb += bytearray(length)
+    assert f_jb.readinto(b_jb) == f_io.readinto(b_io)
+    assert (
+        b_jb[: length - piece_size]
+        == b_io[: length - piece_size]
+        == all_data[start + piece_size : start + length]
+    )
+
+    # try to read, but we're at the end
+    assert f_jb.readinto(b_jb) == f_io.readinto(b_io) == 0
+
+
+def test_subfile_readline():
+    all_data = bytearray(
+        "".join(random.choices(string.ascii_letters + string.digits, k=1000)).encode()
+    )
+    newline_positions = (500, 550, 600, 650, 700)
+    for pos in newline_positions:
+        all_data[pos] = ord("\n")
+
+    start = 400
+    length = 400
+    f_all = io.BytesIO(all_data)
+    f_io = io.BytesIO(all_data[start : start + length])
+    f_jb = jbpy.core.SubFile(f_all, start, length)
+
+    # Hit size limit, don't read full line
+    assert f_jb.readline(3) == f_io.readline(3) == all_data[start : start + 3]
+    # Set limit way too high, hit first newline
+    assert (
+        f_jb.readline(3 * length)
+        == f_io.readline(3 * length)
+        == all_data[start + 3 : 501]
+    )
+    # Read the rest of the lines
+    for _ in range(len(newline_positions)):
+        assert f_jb.readline() == (b := f_io.readline())
+        assert len(b)
+    # Try to read but we're at the end
+    assert f_jb.readline() == f_io.readline() == b""
+    assert f_jb.tell() == f_io.tell() == length
+
+
+def test_subfile_readlines():
+    all_data = bytearray(
+        "".join(random.choices(string.ascii_letters + string.digits, k=1000)).encode()
+    )
+    newline_positions = (500, 550, 600, 650, 700)
+    for pos in newline_positions:
+        all_data[pos] = ord("\n")
+
+    start = 400
+    length = 400
+    f_all = io.BytesIO(all_data)
+    f_io = io.BytesIO(all_data[start : start + length])
+    f_jb = jbpy.core.SubFile(f_all, start, length)
+
+    # Hit size limit, only read first line
+    assert (
+        f_jb.readlines(1)
+        == f_io.readlines(1)
+        == [all_data[start : newline_positions[0] + 1]]
+    )
+    # Hit size limit, read next two lines
+    assert f_jb.readlines(70) == (l_io := f_io.readlines(70))
+    assert len(l_io) == 2
+    # Read the rest
+    assert f_jb.readlines() == (l_io := f_io.readlines())
+    assert len(l_io) == 3
+    # Try to read but we're at the end
+    assert f_jb.readlines() == f_io.readlines() == []
+    assert f_jb.tell() == f_io.tell() == length
+
+
+def test_subfile_unbounded_size_arguments():
+    all_data = bytearray(b"ab\ncd\nef\ngh\n")
+    start = 3
+    length = 5
+    f_io = io.BytesIO(all_data[start : start + length])
+    f_jb = jbpy.core.SubFile(io.BytesIO(all_data), start, length)
+
+    # None and any negative size mean "no limit" but must still stop at the end
+    assert f_jb.readline(None) == f_io.readline(None) == b"cd\n"
+    assert f_jb.readline(-2) == f_io.readline(-2) == b"ef"
+    assert f_jb.readline(-2) == f_io.readline(-2) == b""
+
+    f_jb.seek(0)
+    f_io.seek(0)
+    assert f_jb.readlines(None) == f_io.readlines(None) == [b"cd\n", b"ef"]
+
+    # The buffer is clamped in bytes even when its items are wider than one byte
+    f_jb.seek(0)
+    wide = memoryview(bytearray(2 * length)).cast("H")
+    assert f_jb.readinto(wide) == length
+    assert wide.tobytes()[:length] == all_data[start : start + length]