Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `print()` method now supports the `file` argument
- `as_json()` and `as_text()` methods

## Fixed
- `SubFile` methods `readinto`, `readline`, and `readlines` now terminate at the end of the subfile


## [0.5.1] - 2025-12-17

Expand Down
31 changes: 23 additions & 8 deletions jbpy/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,26 +124,41 @@ def read(self, size: int = -1) -> bytes:
self._pos += len(data)
return data

def readinto(self, b) -> None:
def readinto(self, b) -> int | None:
if self._pos >= self._length:
return 0
self._file.seek(self._start + self._pos)
num_read = self._file.readinto(b)
bytes_remaining = self._length - self._pos
v = memoryview(b)
num_read = self._file.readinto(v[:bytes_remaining])
if num_read is not None:
self._pos += num_read
return num_read

def readline(self, size=-1) -> bytes:
if self._pos >= self._length:
return b""
self._file.seek(self._start + self._pos)
data = self._file.readline(size)
bytes_remaining = self._length - self._pos
_sz = bytes_remaining if size == -1 else min(bytes_remaining, size)
data = self._file.readline(_sz)
self._pos += len(data)
return data

def readlines(self, hint=-1) -> list[bytes]:
if self._pos >= self._length:
return []
self._file.seek(self._start + self._pos)
before = self._file.tell()
data = self._file.readlines(hint)
after = self._file.tell()
self._pos += after - before
return data
line = self.readline()
n = len(line)
lines = [line]

while line and (n < hint or (hint <= 0 or hint is None)):
line = self.readline()
if line:
lines.append(line)
n += len(line)
return lines

def readable(self) -> bool:
return self._file.readable()
Expand Down
122 changes: 93 additions & 29 deletions test/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -754,11 +754,8 @@ def test_as_filelike(tmp_path):
def test_subfile(tmp_path):
filename = tmp_path / "random.bin"
all_data = bytearray(
"".join(random.choices(string.ascii_letters + string.digits, k=1000)).encode()
"".join(random.choices(string.ascii_letters + string.digits, k=50)).encode()
)
all_data[500] = ord("\n")
all_data[550] = ord("\n")
all_data[600] = ord("\n")
filename.write_bytes(all_data)

with filename.open("rb") as file:
Expand Down Expand Up @@ -787,30 +784,97 @@ def test_subfile(tmp_path):
subfile.seek(length + 100)
assert subfile.read() == b""

expected_pos = 100
subfile.seek(expected_pos)
ba = bytearray(50)
assert subfile.readinto(ba) == len(ba)
assert ba == all_data[start + expected_pos : start + expected_pos + len(ba)]
assert subfile.readable()

expected_pos = 400
subfile.seek(expected_pos)
assert (
subfile.readline(3)
== all_data[start + expected_pos : start + expected_pos + 3]
)
assert subfile.readline() == all_data[start + expected_pos + 3 : 501]
assert subfile.readlines() == [
all_data[501:551],
all_data[551:601],
all_data[601:],
]

expected_pos = 400
subfile.seek(offset=expected_pos)
assert subfile.readlines(95) == [
all_data[start + expected_pos : 501],
all_data[501:551],
]

assert subfile.readable()
def test_subfile_readinto():
all_data = bytearray(
"".join(random.choices(string.ascii_letters + string.digits, k=1000)).encode()
)
start = 400
length = 400
f_all = io.BytesIO(all_data)
f_io = io.BytesIO(all_data[start : start + length])
f_jb = jbpy.core.SubFile(f_all, start, length)

# read small piece
piece_size = 50
b_io = bytearray(piece_size)
b_jb = bytearray(piece_size)
assert f_jb.readinto(b_jb) == f_io.readinto(b_io)
assert b_jb == b_io

# read rest of subfile into too large of a buffer
b_io += bytearray(length)
b_jb += bytearray(length)
assert f_jb.readinto(b_jb) == f_io.readinto(b_io)
assert (
b_jb[: length - piece_size]
== b_io[: length - piece_size]
== all_data[start + piece_size : start + length]
)

# try to read, but we're at the end
assert f_jb.readinto(b_jb) == f_io.readinto(b_io) == 0


def test_subfile_readline():
all_data = bytearray(
"".join(random.choices(string.ascii_letters + string.digits, k=1000)).encode()
)
newline_positions = (500, 550, 600, 650, 700)
for pos in newline_positions:
all_data[pos] = ord("\n")

start = 400
length = 400
f_all = io.BytesIO(all_data)
f_io = io.BytesIO(all_data[start : start + length])
f_jb = jbpy.core.SubFile(f_all, start, length)

# Hit size limit, don't read full line
assert f_jb.readline(3) == f_io.readline(3) == all_data[start : start + 3]
# Set limit way too high, hit first newline
assert (
f_jb.readline(3 * length)
== f_io.readline(3 * length)
== all_data[start + 3 : 501]
)
# Read the rest of the lines
for _ in range(len(newline_positions)):
assert f_jb.readline() == (b := f_io.readline())
assert len(b)
# Try to read but we're at the end
assert f_jb.readline() == f_io.readline() == b""
assert f_jb.tell() == f_io.tell() == length


def test_subfile_readlines():
all_data = bytearray(
"".join(random.choices(string.ascii_letters + string.digits, k=1000)).encode()
)
newline_positions = (500, 550, 600, 650, 700)
for pos in newline_positions:
all_data[pos] = ord("\n")

start = 400
length = 400
f_all = io.BytesIO(all_data)
f_io = io.BytesIO(all_data[start : start + length])
f_jb = jbpy.core.SubFile(f_all, start, length)

# Hit size limit, only read first line
assert (
f_jb.readlines(1)
== f_io.readlines(1)
== [all_data[start : newline_positions[0] + 1]]
)
# Hit size limit, read next two lines
assert f_jb.readlines(70) == (l_io := f_io.readlines(70))
assert len(l_io) == 2
# Read the rest
assert f_jb.readlines() == (l_io := f_io.readlines())
assert len(l_io) == 3
# Try to read but we're at the end
assert f_jb.readlines() == f_io.readlines() == []
assert f_jb.tell() == f_io.tell() == length