diff --git a/pylabrobot/io/binary.py b/pylabrobot/io/binary.py new file mode 100644 index 00000000000..29683556438 --- /dev/null +++ b/pylabrobot/io/binary.py @@ -0,0 +1,221 @@ +"""Primitive byte de/serialization. Nice wrapper around struct packing/unpacking. + +This module provides low-level byte serialization/deserialization without any +protocol-specific wrapping. + +Example: + # Writing + data = Writer().u8(1).u16(100).string("test").finish() + + # Reading + reader = Reader(data) + val1 = reader.u8() + val2 = reader.u16() + val3 = reader.string() +""" + +from __future__ import annotations + +import struct +from io import BytesIO + + +class Writer: + """Raw byte writer. + + Provides fluent interface for building byte sequences. All integers use little-endian encoding. + """ + + def __init__(self): + self._buffer = BytesIO() + + def u8(self, value: int) -> "Writer": + """Write unsigned 8-bit integer (0-255).""" + self._buffer.write(struct.pack(" "Writer": + """Write unsigned 16-bit integer (little-endian).""" + self._buffer.write(struct.pack(" "Writer": + """Write unsigned 32-bit integer (little-endian).""" + self._buffer.write(struct.pack(" "Writer": + """Write unsigned 64-bit integer (little-endian).""" + self._buffer.write(struct.pack(" "Writer": + """Write signed 8-bit integer (-128 to 127).""" + self._buffer.write(struct.pack(" "Writer": + """Write signed 16-bit integer (little-endian).""" + self._buffer.write(struct.pack(" "Writer": + """Write signed 32-bit integer (little-endian).""" + self._buffer.write(struct.pack(" "Writer": + """Write signed 64-bit integer (little-endian).""" + self._buffer.write(struct.pack(" "Writer": + """Write 32-bit float (little-endian).""" + self._buffer.write(struct.pack(" "Writer": + """Write 64-bit double (little-endian).""" + self._buffer.write(struct.pack(" "Writer": + """Write null-terminated UTF-8 string.""" + self._buffer.write(value.encode("utf-8")) + self._buffer.write(b"\x00") + return self + + def raw_bytes(self, value: bytes) -> "Writer": + """Write raw bytes.""" + self._buffer.write(value) + return self + + def finish(self) -> bytes: + """Return the built byte sequence.""" + return self._buffer.getvalue() + + +class Reader: + """Raw byte reader. + + Reads primitive values from byte sequences. All integers use little-endian encoding. + """ + + def __init__(self, data: bytes): + self._data = data + self._offset = 0 + + def u8(self) -> int: + """Read unsigned 8-bit integer.""" + if self._offset + 1 > len(self._data): + raise ValueError(f"Not enough data for u8 at offset {self._offset}") + value: int = struct.unpack(" int: + """Read unsigned 16-bit integer (little-endian).""" + if self._offset + 2 > len(self._data): + raise ValueError(f"Not enough data for u16 at offset {self._offset}") + value: int = struct.unpack(" int: + """Read unsigned 32-bit integer (little-endian).""" + if self._offset + 4 > len(self._data): + raise ValueError(f"Not enough data for u32 at offset {self._offset}") + value: int = struct.unpack(" int: + """Read unsigned 64-bit integer (little-endian).""" + if self._offset + 8 > len(self._data): + raise ValueError(f"Not enough data for u64 at offset {self._offset}") + value: int = struct.unpack(" int: + """Read signed 8-bit integer.""" + if self._offset + 1 > len(self._data): + raise ValueError(f"Not enough data for i8 at offset {self._offset}") + value: int = struct.unpack(" int: + """Read signed 16-bit integer (little-endian).""" + if self._offset + 2 > len(self._data): + raise ValueError(f"Not enough data for i16 at offset {self._offset}") + value: int = struct.unpack(" int: + """Read signed 32-bit integer (little-endian).""" + if self._offset + 4 > len(self._data): + raise ValueError(f"Not enough data for i32 at offset {self._offset}") + value: int = struct.unpack(" int: + """Read signed 64-bit integer (little-endian).""" + if self._offset + 8 > len(self._data): + raise ValueError(f"Not enough data for i64 at offset {self._offset}") + value: int = struct.unpack(" float: + """Read 32-bit float (little-endian).""" + if self._offset + 4 > len(self._data): + raise ValueError(f"Not enough data for f32 at offset {self._offset}") + value: float = struct.unpack(" float: + """Read 64-bit double (little-endian).""" + if self._offset + 8 > len(self._data): + raise ValueError(f"Not enough data for f64 at offset {self._offset}") + value: float = struct.unpack(" str: + """Read null-terminated UTF-8 string.""" + # Find null terminator + null_pos = self._data.find(b"\x00", self._offset) + if null_pos == -1: + raise ValueError(f"No null terminator found for string at offset {self._offset}") + + # Extract string (excluding null terminator) + string_bytes = self._data[self._offset : null_pos] + self._offset = null_pos + 1 # Move past null terminator + + return string_bytes.decode("utf-8") + + def raw_bytes(self, n: int) -> bytes: + """Read n raw bytes.""" + if self._offset + n > len(self._data): + raise ValueError(f"Not enough data for {n} bytes at offset {self._offset}") + value = self._data[self._offset : self._offset + n] + self._offset += n + return value + + def remaining(self) -> bytes: + """Return all remaining unread bytes.""" + remaining = self._data[self._offset :] + self._offset = len(self._data) + return remaining + + def has_remaining(self) -> bool: + """Check if there are unread bytes.""" + return self._offset < len(self._data) + + def offset(self) -> int: + """Get current read offset.""" + return self._offset