diff --git a/docs/user_guide/02_analytical/plate-reading/plate-reading.ipynb b/docs/user_guide/02_analytical/plate-reading/plate-reading.ipynb index d35458037bd..dd05196bf7a 100644 --- a/docs/user_guide/02_analytical/plate-reading/plate-reading.ipynb +++ b/docs/user_guide/02_analytical/plate-reading/plate-reading.ipynb @@ -4,21 +4,7 @@ "cell_type": "markdown", "id": "39d0c1a5", "metadata": {}, - "source": [ - "# Plate reading\n", - "\n", - "PyLabRobot supports the following plate readers:\n", - "\n", - "```{toctree}\n", - ":maxdepth: 1\n", - "\n", - "bmg-clariostar\n", - "cytation\n", - "synergyh1\n", - "```\n", - "\n", - "This example uses the `PlateReaderChatterboxBackend`. When using a real machine, use the corresponding backend." - ] + "source": "# Plate reading\n\nPyLabRobot supports the following plate readers:\n\n```{toctree}\n:maxdepth: 1\n\nbmg-clariostar\ncytation\nsynergyh1\ntecan-infinite\n```\n\nThis example uses the `PlateReaderChatterboxBackend`. When using a real machine, use the corresponding backend." }, { "cell_type": "code", @@ -432,4 +418,4 @@ }, "nbformat": 4, "nbformat_minor": 5 -} +} \ No newline at end of file diff --git a/docs/user_guide/02_analytical/plate-reading/tecan-infinite.ipynb b/docs/user_guide/02_analytical/plate-reading/tecan-infinite.ipynb new file mode 100644 index 00000000000..87e19bd0bf8 --- /dev/null +++ b/docs/user_guide/02_analytical/plate-reading/tecan-infinite.ipynb @@ -0,0 +1,186 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Tecan Infinite 200 PRO\n", + "\n", + "The Tecan Infinite 200 PRO is a multimode microplate reader that supports absorbance, fluorescence, and luminescence measurements. This backend targets the Infinite \"M\" series (e.g., Infinite 200 PRO M Plex)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from pylabrobot.plate_reading import PlateReader\n", + "from pylabrobot.plate_reading.tecan import TecanInfinite200ProBackend" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pr = PlateReader(name=\"PR\", size_x=0, size_y=0, size_z=0, backend=TecanInfinite200ProBackend())\n", + "await pr.setup()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "await pr.open()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Before closing, assign a plate to the plate reader. This determines the well positions for measurements." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from pylabrobot.resources import Cor_96_wellplate_360ul_Fb\n", + "plate = Cor_96_wellplate_360ul_Fb(name=\"plate\")\n", + "pr.assign_child_resource(plate)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "await pr.close()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Absorbance\n", + "\n", + "Read absorbance at a specified wavelength (230-1000 nm)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import matplotlib.pyplot as plt\n", + "\n", + "data = await pr.read_absorbance(wavelength=450)\n", + "plt.imshow(data)\n", + "plt.colorbar(label=\"OD\")\n", + "plt.title(\"Absorbance at 450 nm\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Fluorescence\n", + "\n", + "Read fluorescence with specified excitation and emission wavelengths (230-850 nm) and focal height." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "data = await pr.read_fluorescence(\n", + " excitation_wavelength=485,\n", + " emission_wavelength=528,\n", + " focal_height=7.5\n", + ")\n", + "plt.imshow(data)\n", + "plt.colorbar(label=\"RFU\")\n", + "plt.title(\"Fluorescence (Ex: 485 nm, Em: 528 nm)\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Luminescence\n", + "\n", + "Read luminescence with a specified focal height." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "data = await pr.read_luminescence(focal_height=4.5)\n", + "plt.imshow(data)\n", + "plt.colorbar(label=\"RLU\")\n", + "plt.title(\"Luminescence\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Reading specific wells\n", + "\n", + "You can specify a subset of wells to read instead of the entire plate." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "wells = plate.get_items([\"A1\", \"A2\", \"B1\", \"B2\"])\n", + "data = await pr.read_absorbance(wavelength=450, wells=wells)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Cleanup" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "await pr.stop()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python", + "version": "3.10.0" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/docs/user_guide/machines.md b/docs/user_guide/machines.md index cc6eee2a739..814b1442240 100644 --- a/docs/user_guide/machines.md +++ b/docs/user_guide/machines.md @@ -159,6 +159,7 @@ tr > td:nth-child(5) { width: 15%; } | Byonoy | Luminescence 96 Automate | luminescence | WIP | [OEM](https://byonoy.com/luminescence-96-automate/) | | Molecular Devices | SpectraMax M5e | absorbancefluorescence time-resolved fluorescencefluorescence polarization | Full | [OEM](https://www.moleculardevices.com/products/microplate-readers/multi-mode-readers/spectramax-m-series-readers) | | Molecular Devices | SpectraMax 384plus | absorbance | Full | [OEM](https://www.moleculardevices.com/products/microplate-readers/absorbance-readers/spectramax-abs-plate-readers) | +| Tecan | Infinite 200 PRO | absorbancefluorescenceluminescence | Mostly | [PLR](https://docs.pylabrobot.org/user_guide/02_analytical/plate-reading/tecan-infinite.html) / [OEM](https://lifesciences.tecan.com/infinite-200-pro) | ### Flow Cytometers diff --git a/pylabrobot/plate_reading/__init__.py b/pylabrobot/plate_reading/__init__.py index 58e3f7fa164..7f5f18f184c 100644 --- a/pylabrobot/plate_reading/__init__.py +++ b/pylabrobot/plate_reading/__init__.py @@ -44,3 +44,4 @@ ImagingResult, Objective, ) +from .tecan import InfiniteScanConfig, TecanInfinite200ProBackend diff --git a/pylabrobot/plate_reading/tecan/__init__.py b/pylabrobot/plate_reading/tecan/__init__.py new file mode 100644 index 00000000000..f4e32fe60c2 --- /dev/null +++ b/pylabrobot/plate_reading/tecan/__init__.py @@ -0,0 +1 @@ +from .infinite_backend import InfiniteScanConfig, TecanInfinite200ProBackend diff --git a/pylabrobot/plate_reading/tecan/infinite_backend.py b/pylabrobot/plate_reading/tecan/infinite_backend.py new file mode 100644 index 00000000000..63c6840fd2b --- /dev/null +++ b/pylabrobot/plate_reading/tecan/infinite_backend.py @@ -0,0 +1,1269 @@ +"""Tecan Infinite 200 PRO backend. + +This backend targets the Infinite "M" series (e.g., Infinite 200 PRO). The +"F" series uses a different optical path and is not covered here. +""" + +from __future__ import annotations + +import asyncio +import logging +import math +import re +import time +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Dict, List, Optional, Sequence, Tuple + +from pylabrobot.io.binary import Reader +from pylabrobot.io.usb import USB +from pylabrobot.plate_reading.backend import PlateReaderBackend +from pylabrobot.resources import Plate +from pylabrobot.resources.well import Well + +logger = logging.getLogger(__name__) +BIN_RE = re.compile(r"^(\d+),BIN:$") + + +@dataclass +class InfiniteScanConfig: + """Scan configuration for Infinite plate readers.""" + + flashes: int = 25 + counts_per_mm_x: float = 1_000 + counts_per_mm_y: float = 1_000 + counts_per_mm_z: float = 1_000 + + +def _integration_microseconds_to_seconds(value: int) -> float: + # DLL/UI indicates integration time is stored in microseconds; UI displays ms by dividing by 1000. + return value / 1_000_000.0 + + +def _is_abs_calibration_len(payload_len: int) -> bool: + return payload_len >= 22 and (payload_len - 4) % 18 == 0 + + +def _is_abs_data_len(payload_len: int) -> bool: + return payload_len >= 14 and (payload_len - 4) % 10 == 0 + + +def _split_payload_and_trailer( + payload_len: int, blob: bytes +) -> Optional[Tuple[bytes, Tuple[int, int]]]: + if len(blob) != payload_len + 4: + return None + payload = blob[:payload_len] + trailer_reader = Reader(blob[payload_len:], little_endian=False) + return payload, (trailer_reader.u16(), trailer_reader.u16()) + + +@dataclass(frozen=True) +class _AbsorbanceCalibrationItem: + ticker_overflows: int + ticker_counter: int + meas_gain: int + meas_dark: int + meas_bright: int + ref_gain: int + ref_dark: int + ref_bright: int + + +@dataclass(frozen=True) +class _AbsorbanceCalibration: + ex: int + items: List[_AbsorbanceCalibrationItem] + + +def _decode_abs_calibration(payload_len: int, blob: bytes) -> Optional[_AbsorbanceCalibration]: + split = _split_payload_and_trailer(payload_len, blob) + if split is None: + return None + payload, _ = split + if len(payload) < 4 + 18: + return None + if (len(payload) - 4) % 18 != 0: + return None + reader = Reader(payload, little_endian=False) + reader.raw_bytes(2) # skip first 2 bytes + ex = reader.u16() + items: List[_AbsorbanceCalibrationItem] = [] + while reader.has_remaining(): + items.append( + _AbsorbanceCalibrationItem( + ticker_overflows=reader.u32(), + ticker_counter=reader.u16(), + meas_gain=reader.u16(), + meas_dark=reader.u16(), + meas_bright=reader.u16(), + ref_gain=reader.u16(), + ref_dark=reader.u16(), + ref_bright=reader.u16(), + ) + ) + return _AbsorbanceCalibration(ex=ex, items=items) + + +def _decode_abs_data( + payload_len: int, blob: bytes +) -> Optional[Tuple[int, int, List[Tuple[int, int]]]]: + split = _split_payload_and_trailer(payload_len, blob) + if split is None: + return None + payload, _ = split + if len(payload) < 4: + return None + reader = Reader(payload, little_endian=False) + label = reader.u16() + ex = reader.u16() + items: List[Tuple[int, int]] = [] + while reader.offset() + 10 <= len(payload): + reader.raw_bytes(6) # skip first 6 bytes of each item + meas = reader.u16() + ref = reader.u16() + items.append((meas, ref)) + if reader.offset() != len(payload): + return None + return label, ex, items + + +def _absorbance_od_calibrated( + cal: _AbsorbanceCalibration, meas_ref_items: List[Tuple[int, int]], od_max: float = 4.0 +) -> float: + if not cal.items: + raise ValueError("ABS calibration packet contained no calibration items.") + + min_corr_trans = math.pow(10.0, -od_max) + + if len(cal.items) == len(meas_ref_items) and len(cal.items) > 1: + corr_trans_vals: List[float] = [] + for (meas, ref), cal_item in zip(meas_ref_items, cal.items): + denom_corr = cal_item.meas_bright - cal_item.meas_dark + if denom_corr == 0: + continue + f_corr = (cal_item.ref_bright - cal_item.ref_dark) / denom_corr + denom = ref - cal_item.ref_dark + if denom == 0: + continue + corr_trans_vals.append(((meas - cal_item.meas_dark) / denom) * f_corr) + if not corr_trans_vals: + raise ZeroDivisionError("ABS invalid: no usable reads after per-read calibration.") + corr_trans = max(sum(corr_trans_vals) / len(corr_trans_vals), min_corr_trans) + return float(-math.log10(corr_trans)) + + cal0 = cal.items[0] + denom_corr = cal0.meas_bright - cal0.meas_dark + if denom_corr == 0: + raise ZeroDivisionError("ABS calibration invalid: meas_bright == meas_dark") + f_corr = (cal0.ref_bright - cal0.ref_dark) / denom_corr + + trans_vals: List[float] = [] + for meas, ref in meas_ref_items: + denom = ref - cal0.ref_dark + if denom == 0: + continue + trans_vals.append((meas - cal0.meas_dark) / denom) + if not trans_vals: + raise ZeroDivisionError("ABS invalid: all ref reads equal ref_dark") + + trans_mean = sum(trans_vals) / len(trans_vals) + corr_trans = max(trans_mean * f_corr, min_corr_trans) + return float(-math.log10(corr_trans)) + + +@dataclass(frozen=True) +class _FluorescenceCalibration: + ex: int + meas_dark: int + ref_dark: int + ref_bright: int + + +def _decode_flr_calibration(payload_len: int, blob: bytes) -> Optional[_FluorescenceCalibration]: + split = _split_payload_and_trailer(payload_len, blob) + if split is None: + return None + payload, _ = split + if len(payload) != 18: + return None + reader = Reader(payload, little_endian=False) + ex = reader.u16() + reader.raw_bytes(8) # skip bytes 2-9 + meas_dark = reader.u16() + reader.raw_bytes(2) # skip bytes 12-13 + ref_dark = reader.u16() + ref_bright = reader.u16() + return _FluorescenceCalibration( + ex=ex, + meas_dark=meas_dark, + ref_dark=ref_dark, + ref_bright=ref_bright, + ) + + +def _decode_flr_data( + payload_len: int, blob: bytes +) -> Optional[Tuple[int, int, int, List[Tuple[int, int]]]]: + split = _split_payload_and_trailer(payload_len, blob) + if split is None: + return None + payload, _ = split + if len(payload) < 6: + return None + reader = Reader(payload, little_endian=False) + label = reader.u16() + ex = reader.u16() + em = reader.u16() + items: List[Tuple[int, int]] = [] + while reader.offset() + 10 <= len(payload): + reader.raw_bytes(6) # skip first 6 bytes of each item + meas = reader.u16() + ref = reader.u16() + items.append((meas, ref)) + if reader.offset() != len(payload): + return None + return label, ex, em, items + + +def _fluorescence_corrected( + cal: _FluorescenceCalibration, meas_ref_items: List[Tuple[int, int]] +) -> int: + if not meas_ref_items: + return 0 + meas_mean = sum(m for m, _ in meas_ref_items) / len(meas_ref_items) + ref_mean = sum(r for _, r in meas_ref_items) / len(meas_ref_items) + denom = ref_mean - cal.ref_dark + if denom == 0: + return 0 + corr = (meas_mean - cal.meas_dark) * (cal.ref_bright - cal.ref_dark) / denom + return int(round(corr)) + + +@dataclass(frozen=True) +class _LuminescenceCalibration: + ref_dark: int + + +def _decode_lum_calibration(payload_len: int, blob: bytes) -> Optional[_LuminescenceCalibration]: + split = _split_payload_and_trailer(payload_len, blob) + if split is None: + return None + payload, _ = split + if len(payload) != 10: + return None + reader = Reader(payload, little_endian=False) + reader.raw_bytes(6) # skip bytes 0-5 + return _LuminescenceCalibration(ref_dark=reader.i32()) + + +def _decode_lum_data(payload_len: int, blob: bytes) -> Optional[Tuple[int, int, List[int]]]: + split = _split_payload_and_trailer(payload_len, blob) + if split is None: + return None + payload, _ = split + if len(payload) < 4: + return None + reader = Reader(payload, little_endian=False) + label = reader.u16() + em = reader.u16() + counts: List[int] = [] + while reader.offset() + 10 <= len(payload): + reader.raw_bytes(6) # skip first 6 bytes of each item + counts.append(reader.i32()) + if reader.offset() != len(payload): + return None + return label, em, counts + + +def _luminescence_intensity( + cal: _LuminescenceCalibration, + counts: List[int], + dark_integration_s: float, + meas_integration_s: float, +) -> int: + if not counts: + return 0 + if dark_integration_s == 0 or meas_integration_s == 0: + return 0 + count_mean = sum(counts) / len(counts) + corrected_rate = (count_mean / meas_integration_s) - (cal.ref_dark / dark_integration_s) + return int(corrected_rate) + + +StagePosition = Tuple[int, int] + + +def _consume_leading_ascii_frame(buffer: bytearray) -> Tuple[bool, Optional[str]]: + """Remove a leading STX...ETX ASCII frame if present.""" + + if not buffer or buffer[0] != 0x02: + return False, None + end = buffer.find(b"\x03", 1) + if end == -1: + return False, None + # Payload is followed by a 4-byte trailer and optional CR. + if len(buffer) < end + 5: + return False, None + text = buffer[1:end].decode("ascii", "ignore") + del buffer[: end + 5] + if buffer and buffer[0] == 0x0D: + del buffer[0] + return True, text + + +def _consume_status_frame(buffer: bytearray, length: int) -> bool: + """Drop a leading ESC-prefixed status frame if present.""" + + if len(buffer) >= length and buffer[0] == 0x1B: + del buffer[:length] + return True + return False + + +@dataclass +class _StreamEvent: + """Parsed stream event (ASCII or binary).""" + + text: Optional[str] = None + payload_len: Optional[int] = None + blob: Optional[bytes] = None + + +class _StreamParser: + """Parse mixed ASCII and binary packets from the reader.""" + + def __init__( + self, + *, + status_frame_len: Optional[int] = None, + allow_bare_ascii: bool = False, + ) -> None: + """Initialize the stream parser.""" + self._buffer = bytearray() + self._pending_bin: Optional[int] = None + self._status_frame_len = status_frame_len + self._allow_bare_ascii = allow_bare_ascii + + def has_pending_bin(self) -> bool: + """Return True if a binary payload length is pending.""" + return self._pending_bin is not None + + def feed(self, chunk: bytes) -> List[_StreamEvent]: + """Feed raw bytes and return newly parsed events.""" + self._buffer.extend(chunk) + events: List[_StreamEvent] = [] + progressed = True + while progressed: + progressed = False + if self._pending_bin is not None: + need = self._pending_bin + 4 + if len(self._buffer) < need: + break + blob = bytes(self._buffer[:need]) + del self._buffer[:need] + events.append(_StreamEvent(payload_len=self._pending_bin, blob=blob)) + self._pending_bin = None + progressed = True + continue + if self._status_frame_len and _consume_status_frame(self._buffer, self._status_frame_len): + progressed = True + continue + consumed, text = _consume_leading_ascii_frame(self._buffer) + if consumed: + events.append(_StreamEvent(text=text)) + if text: + m = BIN_RE.match(text) + if m: + self._pending_bin = int(m.group(1)) + progressed = True + continue + if self._allow_bare_ascii and self._buffer and all(32 <= b <= 126 for b in self._buffer): + text = self._buffer.decode("ascii", "ignore") + self._buffer.clear() + events.append(_StreamEvent(text=text)) + progressed = True + continue + return events + + +class _MeasurementDecoder(ABC): + """Shared incremental decoder for Infinite measurement streams.""" + + STATUS_FRAME_LEN: Optional[int] = None + + def __init__(self, expected: int) -> None: + """Initialize decoder state for a scan with expected measurements.""" + self.expected = expected + self._terminal_seen = False + self._parser = _StreamParser(status_frame_len=self.STATUS_FRAME_LEN) + + @property + @abstractmethod + def count(self) -> int: + """Return number of decoded measurements so far.""" + + @property + def done(self) -> bool: + """Return True if the decoder has seen all expected measurements.""" + return self.count >= self.expected + + def pop_terminal(self) -> bool: + """Return and clear the terminal frame seen flag.""" + seen = self._terminal_seen + self._terminal_seen = False + return seen + + def feed(self, chunk: bytes) -> None: + """Consume a raw chunk and update decoder state.""" + for event in self._parser.feed(chunk): + if event.text is not None: + if event.text == "ST": + self._terminal_seen = True + elif event.payload_len is not None and event.blob is not None: + self.feed_bin(event.payload_len, event.blob) + + def feed_bin(self, payload_len: int, blob: bytes) -> None: + """Handle a binary payload if the decoder expects one.""" + if self._should_consume_bin(payload_len): + self._handle_bin(payload_len, blob) + + def _should_consume_bin(self, _payload_len: int) -> bool: + return False + + def _handle_bin(self, _payload_len: int, _blob: bytes) -> None: + return None + + +class TecanInfinite200ProBackend(PlateReaderBackend): + """Backend shell for the Infinite 200 PRO.""" + + _MODE_CAPABILITY_COMMANDS: Dict[str, List[str]] = { + "ABS": [ + "#BEAM DIAMETER", + # Additional capabilities available but currently unused: + # "#EXCITATION WAVELENGTH", + # "#EXCITATION USAGE", + # "#EXCITATION NAME", + # "#EXCITATION BANDWIDTH", + # "#EXCITATION ATTENUATION", + # "#EXCITATION DESCRIPTION", + # "#TIME READDELAY", + # "#SHAKING MODE", + # "#SHAKING CONST.ORBITAL", + # "#SHAKING AMPLITUDE", + # "#SHAKING TIME", + # "#SHAKING CONST.LINEAR", + # "#TEMPERATURE PLATE", + ], + "FI.TOP": [ + # "#BEAM DIAMETER", + # Additional capabilities available but currently unused: + # "#EMISSION WAVELENGTH", + # "#EMISSION USAGE", + # "#EMISSION NAME", + # "#EMISSION BANDWIDTH", + # "#EMISSION ATTENUATION", + # "#EMISSION DESCRIPTION", + # "#EXCITATION WAVELENGTH", + # "#EXCITATION USAGE", + # "#EXCITATION NAME", + # "#EXCITATION BANDWIDTH", + # "#EXCITATION ATTENUATION", + # "#EXCITATION DESCRIPTION", + # "#TIME INTEGRATION", + # "#TIME LAG", + # "#TIME READDELAY", + # "#GAIN VALUE", + # "#READS SPEED", + # "#READS NUMBER", + # "#RANGES PMT,EXCITATION", + # "#RANGES PMT,EMISSION", + # "#POSITION FIL,Z", + # "#TEMPERATURE PLATE", + ], + "FI.BOTTOM": [ + # "#BEAM DIAMETER", + # Additional capabilities available but currently unused: + # "#EMISSION WAVELENGTH", + # "#EMISSION USAGE", + # "#EXCITATION WAVELENGTH", + # "#EXCITATION USAGE", + # "#TIME INTEGRATION", + # "#TIME LAG", + # "#TIME READDELAY", + ], + "LUM": [ + # "#BEAM DIAMETER", + # Additional capabilities available but currently unused: + # "#EMISSION WAVELENGTH", + # "#EMISSION USAGE", + # "#EMISSION NAME", + # "#EMISSION BANDWIDTH", + # "#EMISSION ATTENUATION", + # "#EMISSION DESCRIPTION", + # "#TIME INTEGRATION", + # "#TIME READDELAY", + ], + } + + VENDOR_ID = 0x0C47 + PRODUCT_ID = 0x8007 + + def __init__( + self, + scan_config: Optional[InfiniteScanConfig] = None, + ) -> None: + super().__init__() + self.io = USB( + id_vendor=self.VENDOR_ID, + id_product=self.PRODUCT_ID, + packet_read_timeout=3, + read_timeout=30, + ) + self.config = scan_config or InfiniteScanConfig() + self._setup_lock: Optional[asyncio.Lock] = None + self._ready = False + self._read_chunk_size = 512 + self._max_read_iterations = 200 + self._mode_capabilities: Dict[str, Dict[str, str]] = {} + self._pending_bin_events: List[Tuple[int, bytes]] = [] + self._parser = _StreamParser(allow_bare_ascii=True) + self._run_active = False + self._active_step_loss_commands: List[str] = [] + + async def setup(self) -> None: + if self._setup_lock is None: + self._setup_lock = asyncio.Lock() + async with self._setup_lock: + if self._ready: + return + await self.io.setup() + await self._initialize_device() + for mode in self._MODE_CAPABILITY_COMMANDS: + if mode not in self._mode_capabilities: + await self._query_mode_capabilities(mode) + self._ready = True + + async def stop(self) -> None: + if self._setup_lock is None: + self._setup_lock = asyncio.Lock() + async with self._setup_lock: + if not self._ready: + return + await self._cleanup_protocol() + await self.io.stop() + self._mode_capabilities.clear() + self._reset_stream_state() + self._ready = False + + async def open(self) -> None: + """Open the reader drawer.""" + + await self._send_command("ABSOLUTE MTP,OUT") + await self._send_command("BY#T5000") + + async def close(self, plate: Optional[Plate]) -> None: # noqa: ARG002 + """Close the reader drawer.""" + + await self._send_command("ABSOLUTE MTP,IN") + await self._send_command("BY#T5000") + + async def _run_scan( + self, + ordered_wells: Sequence[Well], + decoder: _MeasurementDecoder, + mode: str, + step_loss_commands: List[str], + serpentine: bool, + scan_direction: str, + ) -> None: + """Run the common scan loop for all measurement types. + + Args: + ordered_wells: The wells to scan in row-major order. + decoder: The decoder to use for parsing measurements. + mode: The mode name for logging (e.g., "Absorbance"). + step_loss_commands: Commands to run after the scan to check for step loss. + serpentine: Whether to use serpentine scan order. + scan_direction: The scan direction command (e.g., "ALTUP", "UP"). + """ + self._active_step_loss_commands = step_loss_commands + + for row_index, row_wells in self._group_by_row(ordered_wells): + start_x, end_x, count = self._scan_range(row_index, row_wells, serpentine=serpentine) + _, y_stage = self._map_well_to_stage(row_wells[0]) + + await self._send_command(f"ABSOLUTE MTP,Y={y_stage}") + await self._send_command(f"SCAN DIRECTION={scan_direction}") + await self._send_command( + f"SCANX {start_x},{end_x},{count}", wait_for_terminal=False, read_response=False + ) + logger.info( + "Queued %s scan row %s (%s wells): y=%s, x=%s..%s", + mode.lower(), + row_index, + count, + y_stage, + start_x, + end_x, + ) + await self._await_measurements(decoder, count, mode) + await self._await_scan_terminal(decoder.pop_terminal()) + + async def read_absorbance(self, plate: Plate, wells: List[Well], wavelength: int) -> List[Dict]: + """Queue and execute an absorbance scan.""" + + if not 230 <= wavelength <= 1_000: + raise ValueError("Absorbance wavelength must be between 230 nm and 1000 nm.") + + ordered_wells = wells if wells else plate.get_all_items() + scan_wells = self._scan_visit_order(ordered_wells, serpentine=True) + decoder = _AbsorbanceRunDecoder(len(scan_wells)) + + await self._begin_run() + try: + await self._configure_absorbance(wavelength) + await self._run_scan( + ordered_wells=ordered_wells, + decoder=decoder, + mode="Absorbance", + step_loss_commands=["CHECK MTP.STEPLOSS", "CHECK ABS.STEPLOSS"], + serpentine=True, + scan_direction="ALTUP", + ) + + if len(decoder.measurements) != len(scan_wells): + raise RuntimeError("Absorbance decoder did not complete scan.") + intensities: List[float] = [] + cal = decoder.calibration + if cal is None: + raise RuntimeError("ABS calibration packet not seen; cannot compute calibrated OD.") + for meas in decoder.measurements: + items = meas.items or [(meas.sample, meas.reference)] + od = _absorbance_od_calibrated(cal, items) + intensities.append(od) + matrix = self._format_plate_result(plate, scan_wells, intensities) + return [ + { + "wavelength": wavelength, + "time": time.time(), + "temperature": None, + "data": matrix, + } + ] + finally: + await self._end_run() + + async def _clear_mode_settings(self, excitation: bool = False, emission: bool = False) -> None: + """Clear mode settings before configuring a new scan.""" + if excitation: + await self._send_command("EXCITATION CLEAR", allow_timeout=True) + if emission: + await self._send_command("EMISSION CLEAR", allow_timeout=True) + await self._send_command("TIME CLEAR", allow_timeout=True) + await self._send_command("GAIN CLEAR", allow_timeout=True) + await self._send_command("READS CLEAR", allow_timeout=True) + await self._send_command("POSITION CLEAR", allow_timeout=True) + await self._send_command("MIRROR CLEAR", allow_timeout=True) + + async def _configure_absorbance(self, wavelength_nm: int) -> None: + wl_decitenth = int(round(wavelength_nm * 10)) + bw_decitenth = int(round(self._auto_bandwidth(wavelength_nm) * 10)) + reads_number = max(1, int(self.config.flashes)) + + await self._send_command("MODE ABS") + await self._clear_mode_settings(excitation=True) + await self._send_command( + f"EXCITATION 0,ABS,{wl_decitenth},{bw_decitenth},0", allow_timeout=True + ) + await self._send_command( + f"EXCITATION 1,ABS,{wl_decitenth},{bw_decitenth},0", allow_timeout=True + ) + await self._send_command(f"READS 0,NUMBER={reads_number}", allow_timeout=True) + await self._send_command(f"READS 1,NUMBER={reads_number}", allow_timeout=True) + await self._send_command("TIME 0,READDELAY=0", allow_timeout=True) + await self._send_command("TIME 1,READDELAY=0", allow_timeout=True) + await self._send_command("SCAN DIRECTION=ALTUP", allow_timeout=True) + await self._send_command("#RATIO LABELS", allow_timeout=True) + await self._send_command( + f"BEAM DIAMETER={self._capability_numeric('ABS', '#BEAM DIAMETER', 700)}", allow_timeout=True + ) + await self._send_command("RATIO LABELS=1", allow_timeout=True) + await self._send_command("PREPARE REF", allow_timeout=True, read_response=False) + + def _auto_bandwidth(self, wavelength_nm: int) -> float: + """Return bandwidth in nm based on Infinite M specification.""" + + return 9.0 if wavelength_nm > 315 else 5.0 + + async def read_fluorescence( + self, + plate: Plate, + wells: List[Well], + excitation_wavelength: int, + emission_wavelength: int, + focal_height: float, + ) -> List[Dict]: + """Queue and execute a fluorescence scan.""" + + if not 230 <= excitation_wavelength <= 850: + raise ValueError("Excitation wavelength must be between 230 nm and 850 nm.") + if not 230 <= emission_wavelength <= 850: + raise ValueError("Emission wavelength must be between 230 nm and 850 nm.") + if focal_height < 0: + raise ValueError("Focal height must be non-negative for fluorescence scans.") + + ordered_wells = wells if wells else plate.get_all_items() + scan_wells = self._scan_visit_order(ordered_wells, serpentine=True) + + await self._begin_run() + try: + await self._configure_fluorescence(excitation_wavelength, emission_wavelength, focal_height) + decoder = _FluorescenceRunDecoder(len(scan_wells)) + + await self._run_scan( + ordered_wells=ordered_wells, + decoder=decoder, + mode="Fluorescence", + step_loss_commands=[ + "CHECK MTP.STEPLOSS", + "CHECK FI.TOP.STEPLOSS", + "CHECK FI.STEPLOSS.Z", + ], + serpentine=True, + scan_direction="UP", + ) + + if len(decoder.intensities) != len(scan_wells): + raise RuntimeError("Fluorescence decoder did not complete scan.") + intensities = decoder.intensities + matrix = self._format_plate_result(plate, scan_wells, intensities) + return [ + { + "ex_wavelength": excitation_wavelength, + "em_wavelength": emission_wavelength, + "time": time.time(), + "temperature": None, + "data": matrix, + } + ] + finally: + await self._end_run() + + async def _configure_fluorescence( + self, excitation_nm: int, emission_nm: int, focal_height: float + ) -> None: + ex_decitenth = int(round(excitation_nm * 10)) + em_decitenth = int(round(emission_nm * 10)) + reads_number = max(1, int(self.config.flashes)) + beam_diameter = self._capability_numeric("FI.TOP", "#BEAM DIAMETER", 3000) + z_position = int(round(focal_height * self.config.counts_per_mm_z)) + + # UI issues the entire FI configuration twice before PREPARE REF. + for _ in range(2): + await self._send_command("MODE FI.TOP", allow_timeout=True) + await self._clear_mode_settings(excitation=True, emission=True) + await self._send_command(f"EXCITATION 0,FI,{ex_decitenth},50,0", allow_timeout=True) + await self._send_command(f"EMISSION 0,FI,{em_decitenth},200,0", allow_timeout=True) + await self._send_command("TIME 0,INTEGRATION=20", allow_timeout=True) + await self._send_command("TIME 0,LAG=0", allow_timeout=True) + await self._send_command("TIME 0,READDELAY=0", allow_timeout=True) + await self._send_command("GAIN 0,VALUE=100", allow_timeout=True) + await self._send_command(f"POSITION 0,Z={z_position}", allow_timeout=True) + await self._send_command(f"BEAM DIAMETER={beam_diameter}", allow_timeout=True) + await self._send_command("SCAN DIRECTION=UP", allow_timeout=True) + await self._send_command("RATIO LABELS=1", allow_timeout=True) + await self._send_command(f"READS 0,NUMBER={reads_number}", allow_timeout=True) + await self._send_command(f"EXCITATION 1,FI,{ex_decitenth},50,0", allow_timeout=True) + await self._send_command(f"EMISSION 1,FI,{em_decitenth},200,0", allow_timeout=True) + await self._send_command("TIME 1,INTEGRATION=20", allow_timeout=True) + await self._send_command("TIME 1,LAG=0", allow_timeout=True) + await self._send_command("TIME 1,READDELAY=0", allow_timeout=True) + await self._send_command("GAIN 1,VALUE=100", allow_timeout=True) + await self._send_command(f"POSITION 1,Z={z_position}", allow_timeout=True) + await self._send_command(f"READS 1,NUMBER={reads_number}", allow_timeout=True) + await self._send_command("PREPARE REF", allow_timeout=True, read_response=False) + + async def read_luminescence( + self, + plate: Plate, + wells: List[Well], + focal_height: float, + ) -> List[Dict]: + """Queue and execute a luminescence scan.""" + + if focal_height < 0: + raise ValueError("Focal height must be non-negative for luminescence scans.") + + ordered_wells = wells if wells else plate.get_all_items() + scan_wells = self._scan_visit_order(ordered_wells, serpentine=False) + + dark_integration = 3_000_000 + meas_integration = 1_000_000 + + await self._begin_run() + try: + await self._configure_luminescence(dark_integration, meas_integration, focal_height) + + decoder = _LuminescenceRunDecoder( + len(scan_wells), + dark_integration_s=_integration_microseconds_to_seconds(dark_integration), + meas_integration_s=_integration_microseconds_to_seconds(meas_integration), + ) + + await self._run_scan( + ordered_wells=ordered_wells, + decoder=decoder, + mode="Luminescence", + step_loss_commands=["CHECK MTP.STEPLOSS", "CHECK LUM.STEPLOSS"], + serpentine=False, + scan_direction="UP", + ) + + if len(decoder.measurements) != len(scan_wells): + raise RuntimeError("Luminescence decoder did not complete scan.") + intensities = [measurement.intensity for measurement in decoder.measurements] + matrix = self._format_plate_result(plate, scan_wells, intensities) + return [ + { + "time": time.time(), + "temperature": None, + "data": matrix, + } + ] + finally: + await self._end_run() + + async def _await_measurements( + self, decoder: "_MeasurementDecoder", row_count: int, mode: str + ) -> None: + target = decoder.count + row_count + if self._pending_bin_events: + for payload_len, blob in self._pending_bin_events: + decoder.feed_bin(payload_len, blob) + self._pending_bin_events.clear() + iterations = 0 + while decoder.count < target and iterations < self._max_read_iterations: + chunk = await self._read_packet(self._read_chunk_size) + if not chunk: + raise RuntimeError(f"{mode} read returned empty chunk; transport may not support reads.") + decoder.feed(chunk) + iterations += 1 + if decoder.count < target: + raise RuntimeError(f"Timed out while parsing {mode.lower()} results.") + + async def _await_scan_terminal(self, saw_terminal: bool) -> None: + if saw_terminal: + return + await self._read_command_response() + + async def _configure_luminescence( + self, dark_integration: int, meas_integration: int, focal_height: float + ) -> None: + await self._send_command("MODE LUM") + # Pre-flight safety checks observed in captures (queries omitted). + await self._send_command("CHECK LUM.FIBER") + await self._send_command("CHECK LUM.LID") + await self._send_command("CHECK LUM.STEPLOSS") + await self._send_command("MODE LUM") + reads_number = max(1, int(self.config.flashes)) + z_position = int(round(focal_height * self.config.counts_per_mm_z)) + await self._clear_mode_settings(emission=True) + await self._send_command(f"POSITION LUM,Z={z_position}", allow_timeout=True) + await self._send_command(f"TIME 0,INTEGRATION={dark_integration}", allow_timeout=True) + await self._send_command(f"READS 0,NUMBER={reads_number}", allow_timeout=True) + await self._send_command("SCAN DIRECTION=UP", allow_timeout=True) + await self._send_command("RATIO LABELS=1", allow_timeout=True) + await self._send_command("EMISSION 1,EMPTY,0,0,0", allow_timeout=True) + await self._send_command(f"TIME 1,INTEGRATION={meas_integration}", allow_timeout=True) + await self._send_command("TIME 1,READDELAY=0", allow_timeout=True) + await self._send_command(f"READS 1,NUMBER={reads_number}", allow_timeout=True) + await self._send_command("#EMISSION ATTENUATION", allow_timeout=True) + await self._send_command("PREPARE REF", allow_timeout=True, read_response=False) + + def _group_by_row(self, wells: Sequence[Well]) -> List[Tuple[int, List[Well]]]: + grouped: Dict[int, List[Well]] = {} + for well in wells: + grouped.setdefault(well.get_row(), []).append(well) + for row in grouped.values(): + row.sort(key=lambda w: w.get_column()) + return sorted(grouped.items(), key=lambda item: item[0]) + + def _scan_visit_order(self, wells: Sequence[Well], serpentine: bool) -> List[Well]: + visit: List[Well] = [] + for row_index, row_wells in self._group_by_row(wells): + if serpentine and row_index % 2 == 1: + visit.extend(reversed(row_wells)) + else: + visit.extend(row_wells) + return visit + + def _map_well_to_stage(self, well: Well) -> StagePosition: + if well.location is None: + raise ValueError("Well does not have a location assigned within its plate definition.") + center = well.location + well.get_anchor(x="c", y="c") + cfg = self.config + stage_x = int(round(center.x * cfg.counts_per_mm_x)) + parent_plate = well.parent + if parent_plate is None or not isinstance(parent_plate, Plate): + raise ValueError("Well is not assigned to a plate; cannot derive stage coordinates.") + plate_height_mm = parent_plate.get_size_y() + stage_y = int(round((plate_height_mm - center.y) * cfg.counts_per_mm_y)) + return stage_x, stage_y + + def _scan_range( + self, row_index: int, row_wells: Sequence[Well], serpentine: bool + ) -> Tuple[int, int, int]: + """Return start/end/count for a row, honoring serpentine layout when requested.""" + + first_x, _ = self._map_well_to_stage(row_wells[0]) + last_x, _ = self._map_well_to_stage(row_wells[-1]) + count = len(row_wells) + if not serpentine: + return min(first_x, last_x), max(first_x, last_x), count + if row_index % 2 == 0: + return first_x, last_x, count + return last_x, first_x, count + + def _format_plate_result( + self, plate: Plate, wells: Sequence[Well], values: Sequence[float] + ) -> List[List[Optional[float]]]: + matrix: List[List[Optional[float]]] = [ + [None for _ in range(plate.num_items_x)] for _ in range(plate.num_items_y) + ] + for well, val in zip(wells, values): + r, c = well.get_row(), well.get_column() + if 0 <= r < plate.num_items_y and 0 <= c < plate.num_items_x: + matrix[r][c] = float(val) + return matrix + + async def _initialize_device(self) -> None: + try: + await self._send_command("QQ") + except TimeoutError: + logger.warning("QQ produced no response; continuing with initialization.") + await self._send_command("INIT FORCE") + + async def _begin_run(self) -> None: + self._reset_stream_state() + await self._send_command("KEYLOCK ON") + self._run_active = True + + def _reset_stream_state(self) -> None: + self._pending_bin_events.clear() + self._parser = _StreamParser(allow_bare_ascii=True) + + async def _read_packet(self, size: int) -> bytes: + try: + data = await self.io.read(size=size) + except TimeoutError: + await self._recover_transport() + raise + return data + + async def _recover_transport(self) -> None: + try: + await self.io.stop() + await asyncio.sleep(0.2) + await self.io.setup() + except Exception: + return + self._mode_capabilities.clear() + self._reset_stream_state() + await self._initialize_device() + + async def _end_run(self) -> None: + try: + await self._send_command("TERMINATE", allow_timeout=True) + for cmd in self._active_step_loss_commands: + await self._send_command(cmd, allow_timeout=True) + await self._send_command("KEYLOCK OFF", allow_timeout=True) + await self._send_command("ABSOLUTE MTP,IN", allow_timeout=True) + finally: + self._run_active = False + self._active_step_loss_commands = [] + + async def _cleanup_protocol(self) -> None: + async def send_cleanup_cmd(cmd: str) -> None: + try: + await self._send_command(cmd, allow_timeout=True, read_response=False) + except Exception: + logger.warning("Cleanup command failed: %s", cmd) + + if self._run_active or self._active_step_loss_commands: + await send_cleanup_cmd("TERMINATE") + for cmd in self._active_step_loss_commands: + await send_cleanup_cmd(cmd) + await send_cleanup_cmd("KEYLOCK OFF") + await send_cleanup_cmd("ABSOLUTE MTP,IN") + self._run_active = False + self._active_step_loss_commands = [] + + async def _query_mode_capabilities(self, mode: str) -> None: + commands = self._MODE_CAPABILITY_COMMANDS.get(mode) + if not commands: + return + try: + await self._send_command(f"MODE {mode}") + except TimeoutError: + logger.warning("Capability MODE %s timed out; continuing without mode capabilities.", mode) + return + collected: Dict[str, str] = {} + for cmd in commands: + try: + frames = await self._send_command(cmd) + except TimeoutError: + logger.warning("Capability query '%s' timed out; proceeding with defaults.", cmd) + continue + if frames: + collected[cmd] = frames[-1] + if collected: + self._mode_capabilities[mode] = collected + + def _get_mode_capability(self, mode: str, command: str) -> Optional[str]: + return self._mode_capabilities.get(mode, {}).get(command) + + def _capability_numeric(self, mode: str, command: str, fallback: int) -> int: + resp = self._get_mode_capability(mode, command) + if not resp: + return fallback + token = resp.split("|")[0].split(":")[0].split("~")[0].strip() + if not token: + return fallback + try: + return int(float(token)) + except ValueError: + return fallback + + @staticmethod + def _frame_command(command: str) -> bytes: + """Return a framed command with length/checksum trailer.""" + + payload = command.encode("ascii") + xor = 0 + for byte in payload: + xor ^= byte + checksum = (xor ^ 0x01) & 0xFF + length = len(payload) & 0xFF + return b"\x02" + payload + b"\x03\x00\x00" + bytes([length, checksum]) + b"\x0d" + + async def _send_command( + self, + command: str, + wait_for_terminal: bool = True, + allow_timeout: bool = False, + read_response: bool = True, + ) -> List[str]: + logger.debug("[tecan] >> %s", command) + framed = self._frame_command(command) + await self.io.write(framed) + if not read_response: + return [] + if command.startswith(("#", "?")): + try: + return await self._read_command_response(require_terminal=False) + except TimeoutError: + if allow_timeout: + logger.warning("Timeout waiting for response to %s", command) + return [] + raise + try: + frames = await self._read_command_response(require_terminal=wait_for_terminal) + except TimeoutError: + if allow_timeout: + logger.warning("Timeout waiting for response to %s", command) + return [] + raise + for pkt in frames: + logger.debug("[tecan] << %s", pkt) + return frames + + async def _drain(self, attempts: int = 4) -> None: + """Read and discard a few packets to clear the stream.""" + for _ in range(attempts): + data = await self._read_packet(128) + if not data: + break + + async def _read_command_response( + self, max_iterations: int = 8, require_terminal: bool = True + ) -> List[str]: + """Read response frames and cache any binary payloads that arrive.""" + frames: List[str] = [] + saw_terminal = False + for _ in range(max_iterations): + chunk = await self._read_packet(128) + if not chunk: + break + for event in self._parser.feed(chunk): + if event.text is not None: + frames.append(event.text) + if self._is_terminal_frame(event.text): + saw_terminal = True + elif event.payload_len is not None and event.blob is not None: + self._pending_bin_events.append((event.payload_len, event.blob)) + if not require_terminal and frames and not self._parser.has_pending_bin(): + break + if require_terminal and saw_terminal and not self._parser.has_pending_bin(): + break + if require_terminal and not saw_terminal: + # best effort: drain once more so pending ST doesn't leak into next command + await self._drain(1) + return frames + + @staticmethod + def _is_terminal_frame(text: str) -> bool: + """Return True if the ASCII frame is a terminal marker.""" + return text in {"ST", "+", "-"} or text.startswith("BY#T") + + +@dataclass +class _AbsorbanceMeasurement: + sample: int + reference: int + items: Optional[List[Tuple[int, int]]] = None + + +class _AbsorbanceRunDecoder(_MeasurementDecoder): + """Incrementally decode absorbance measurement frames.""" + + STATUS_FRAME_LEN = 31 + + def __init__(self, expected: int) -> None: + super().__init__(expected) + self.measurements: List[_AbsorbanceMeasurement] = [] + self._calibration: Optional[_AbsorbanceCalibration] = None + + @property + def count(self) -> int: + return len(self.measurements) + + @property + def calibration(self) -> Optional[_AbsorbanceCalibration]: + """Return the absorbance calibration data, if available.""" + return self._calibration + + def _should_consume_bin(self, payload_len: int) -> bool: + return _is_abs_calibration_len(payload_len) or _is_abs_data_len(payload_len) + + def _handle_bin(self, payload_len: int, blob: bytes) -> None: + if _is_abs_calibration_len(payload_len): + if self._calibration is not None: + return + cal = _decode_abs_calibration(payload_len, blob) + if cal is not None: + self._calibration = cal + return + if _is_abs_data_len(payload_len): + data = _decode_abs_data(payload_len, blob) + if data is None: + return + _label, _ex, items = data + sample, reference = items[0] if items else (0, 0) + self.measurements.append( + _AbsorbanceMeasurement(sample=sample, reference=reference, items=items) + ) + + +class _FluorescenceRunDecoder(_MeasurementDecoder): + """Incrementally decode fluorescence measurement frames.""" + + STATUS_FRAME_LEN = 31 + + def __init__(self, expected_wells: int) -> None: + super().__init__(expected_wells) + self._intensities: List[int] = [] + self._calibration: Optional[_FluorescenceCalibration] = None + + @property + def count(self) -> int: + return len(self._intensities) + + @property + def intensities(self) -> List[int]: + """Return decoded fluorescence intensities.""" + return self._intensities + + def _should_consume_bin(self, payload_len: int) -> bool: + if payload_len == 18: + return True + if payload_len >= 16 and (payload_len - 6) % 10 == 0: + return True + return False + + def _handle_bin(self, payload_len: int, blob: bytes) -> None: + if payload_len == 18: + cal = _decode_flr_calibration(payload_len, blob) + if cal is not None: + self._calibration = cal + return + data = _decode_flr_data(payload_len, blob) + if data is None: + return + _label, _ex, _em, items = data + if self._calibration is not None: + intensity = _fluorescence_corrected(self._calibration, items) + else: + if not items: + intensity = 0 + else: + intensity = int(round(sum(m for m, _ in items) / len(items))) + self._intensities.append(intensity) + + +@dataclass +class _LuminescenceMeasurement: + intensity: int + + +class _LuminescenceRunDecoder(_MeasurementDecoder): + """Incrementally decode luminescence measurement frames.""" + + def __init__( + self, + expected: int, + *, + dark_integration_s: float = 0.0, + meas_integration_s: float = 0.0, + ) -> None: + super().__init__(expected) + self.measurements: List[_LuminescenceMeasurement] = [] + self._calibration: Optional[_LuminescenceCalibration] = None + self._dark_integration_s = float(dark_integration_s) + self._meas_integration_s = float(meas_integration_s) + + @property + def count(self) -> int: + return len(self.measurements) + + def _should_consume_bin(self, payload_len: int) -> bool: + if payload_len == 10: + return True + if payload_len >= 14 and (payload_len - 4) % 10 == 0: + return True + return False + + def _handle_bin(self, payload_len: int, blob: bytes) -> None: + if payload_len == 10: + cal = _decode_lum_calibration(payload_len, blob) + if cal is not None: + self._calibration = cal + return + data = _decode_lum_data(payload_len, blob) + if data is None: + return + _label, _em, counts = data + if self._calibration is not None and self._dark_integration_s and self._meas_integration_s: + intensity = _luminescence_intensity( + self._calibration, counts, self._dark_integration_s, self._meas_integration_s + ) + else: + intensity = int(round(sum(counts) / len(counts))) if counts else 0 + self.measurements.append(_LuminescenceMeasurement(intensity=intensity)) + + +__all__ = [ + "TecanInfinite200ProBackend", + "InfiniteScanConfig", +] diff --git a/pylabrobot/plate_reading/tecan/infinite_backend_tests.py b/pylabrobot/plate_reading/tecan/infinite_backend_tests.py new file mode 100644 index 00000000000..859456693e4 --- /dev/null +++ b/pylabrobot/plate_reading/tecan/infinite_backend_tests.py @@ -0,0 +1,882 @@ +import unittest +from unittest.mock import AsyncMock, call, patch + +from pylabrobot.io.usb import USB +from pylabrobot.plate_reading.tecan.infinite_backend import ( + InfiniteScanConfig, + TecanInfinite200ProBackend, + _absorbance_od_calibrated, + _AbsorbanceRunDecoder, + _consume_leading_ascii_frame, + _FluorescenceRunDecoder, + _LuminescenceRunDecoder, +) +from pylabrobot.resources import Coordinate, Plate, Well, create_ordered_items_2d +from pylabrobot.resources.tecan.plates import Plate_384_Well + + +def _pack_u16(words): + return b"".join(int(word).to_bytes(2, "big") for word in words) + + +def _bin_blob(payload): + payload_len = len(payload) + trailer = b"\x00\x00\x00\x00" + return payload_len, payload + trailer + + +def _abs_calibration_blob(ex_decitenth, meas_dark, meas_bright, ref_dark, ref_bright): + header = _pack_u16([0, ex_decitenth]) + item = (0).to_bytes(4, "big") + _pack_u16([0, 0, meas_dark, meas_bright, 0, ref_dark, ref_bright]) + return _bin_blob(header + item) + + +def _abs_data_blob(ex_decitenth, meas, ref): + payload = _pack_u16([0, ex_decitenth, 0, 0, 0, meas, ref]) + return _bin_blob(payload) + + +def _flr_calibration_blob(ex_decitenth, meas_dark, ref_dark, ref_bright): + words = [ex_decitenth, 0, 0, 0, 0, meas_dark, 0, ref_dark, ref_bright] + return _bin_blob(_pack_u16(words)) + + +def _flr_data_blob(ex_decitenth, em_decitenth, meas, ref): + words = [0, ex_decitenth, em_decitenth, 0, 0, 0, meas, ref] + return _bin_blob(_pack_u16(words)) + + +def _lum_data_blob(em_decitenth: int, intensity: int): + payload = bytearray(14) + payload[0:2] = (0).to_bytes(2, "big") + payload[2:4] = int(em_decitenth).to_bytes(2, "big") + payload[10:14] = int(intensity).to_bytes(4, "big", signed=True) + return _bin_blob(bytes(payload)) + + +def _make_test_plate(): + plate = Plate( + "plate", + size_x=30, + size_y=20, + size_z=10, + ordered_items=create_ordered_items_2d( + Well, + num_items_x=3, + num_items_y=2, + dx=1, + dy=2, + dz=0, + item_dx=10, + item_dy=8, + size_x=4, + size_y=4, + size_z=5, + ), + ) + plate.location = Coordinate.zero() + return plate + + +def _egg_grid(): + return [ + [ + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 9, + 31, + 46, + 42, + 7, + 2, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + ], + [ + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 1, + 24, + 69, + 100, + 137, + 142, + 70, + 24, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + ], + [ + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 24, + 77, + 128, + 135, + 123, + 68, + 52, + 26, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + ], + [ + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 4, + 60, + 104, + 114, + 86, + 72, + 48, + 2, + 2, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + ], + [ + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 9, + 75, + 122, + 82, + 71, + 99, + 69, + 4, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + ], + [ + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 3, + 64, + 132, + 148, + 61, + 75, + 137, + 86, + 17, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + ], + [ + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 23, + 98, + 160, + 87, + 92, + 139, + 133, + 65, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + ], + [ + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 4, + 53, + 100, + 93, + 104, + 125, + 146, + 46, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + ], + [ + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 33, + 73, + 103, + 128, + 143, + 164, + 169, + 61, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + ], + [ + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 4, + 60, + 93, + 113, + 90, + 107, + 124, + 137, + 118, + 7, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + ], + [ + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 3, + 64, + 97, + 98, + 63, + 94, + 95, + 135, + 121, + 8, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + ], + [ + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 36, + 100, + 118, + 119, + 126, + 140, + 154, + 65, + 1, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + ], + [ + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 3, + 40, + 98, + 141, + 150, + 121, + 61, + 6, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + ], + [ + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 8, + 75, + 88, + 12, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + ], + [ + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 45, + 53, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + ], + [ + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 9, + 11, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + 0, + ], + ] + + +class TestTecanInfiniteDecoders(unittest.TestCase): + def setUp(self): + self.backend = TecanInfinite200ProBackend() + self.plate = Plate_384_Well(name="plate") + self.grid = _egg_grid() + self.max_intensity = max(max(row) for row in self.grid) + self.scan_wells = self.backend._scan_visit_order(self.plate.get_all_items(), serpentine=True) + + def _assert_matrix(self, actual, expected): + self.assertEqual(len(actual), len(expected)) + for row_actual, row_expected in zip(actual, expected): + self.assertEqual(len(row_actual), len(row_expected)) + for value, exp in zip(row_actual, row_expected): + self.assertAlmostEqual(value or 0.0, exp) + + def _run_decoder_case(self, decoder, build_packet, extract_actual): + expected_values = [] + for well in self.scan_wells: + intensity = self.grid[well.get_row()][well.get_column()] + payload_len, blob, expected = build_packet(intensity) + decoder.feed_bin(payload_len, blob) + expected_values.append(expected) + self.assertTrue(decoder.done) + actual_values = extract_actual(decoder) + matrix = self.backend._format_plate_result(self.plate, self.scan_wells, actual_values) + expected = self.backend._format_plate_result(self.plate, self.scan_wells, expected_values) + self._assert_matrix(matrix, expected) + + def test_decode_absorbance_pattern(self): + wavelength = 600 + reference = 10000 + max_absorbance = 1.0 + decoder = _AbsorbanceRunDecoder(len(self.scan_wells)) + cal_len, cal_blob = _abs_calibration_blob( + wavelength * 10, + meas_dark=0, + meas_bright=1000, + ref_dark=0, + ref_bright=1000, + ) + decoder.feed_bin(cal_len, cal_blob) + cal = decoder.calibration + assert cal is not None + + def build_packet(intensity): + target = 0.0 + if self.max_intensity: + target = (intensity / self.max_intensity) * max_absorbance + sample = max(1, int(round(reference / (10**target)))) + payload_len, blob = _abs_data_blob(wavelength * 10, sample, reference) + expected = _absorbance_od_calibrated(cal, [(sample, reference)]) + return payload_len, blob, expected + + def extract_actual(decoder): + return [ + _absorbance_od_calibrated(cal, [(meas.sample, meas.reference)]) + for meas in decoder.measurements + ] + + self._run_decoder_case(decoder, build_packet, extract_actual) + + def test_decode_fluorescence_pattern(self): + excitation = 485 + emission = 520 + decoder = _FluorescenceRunDecoder(len(self.scan_wells)) + cal_len, cal_blob = _flr_calibration_blob( + excitation * 10, meas_dark=0, ref_dark=0, ref_bright=1000 + ) + decoder.feed_bin(cal_len, cal_blob) + + def build_packet(intensity): + payload_len, blob = _flr_data_blob(excitation * 10, emission * 10, intensity, 1000) + return payload_len, blob, intensity + + def extract_actual(decoder): + return decoder.intensities + + self._run_decoder_case(decoder, build_packet, extract_actual) + + def test_decode_luminescence_pattern(self): + decoder = _LuminescenceRunDecoder(len(self.scan_wells)) + + def build_packet(intensity): + payload_len, blob = _lum_data_blob(0, intensity) + return payload_len, blob, intensity + + def extract_actual(decoder): + return [measurement.intensity for measurement in decoder.measurements] + + self._run_decoder_case(decoder, build_packet, extract_actual) + + +class TestTecanInfiniteScanGeometry(unittest.TestCase): + def setUp(self): + self.backend = TecanInfinite200ProBackend( + scan_config=InfiniteScanConfig(counts_per_mm_x=1, counts_per_mm_y=1) + ) + self.plate = _make_test_plate() + + def test_scan_visit_order_serpentine(self): + order = self.backend._scan_visit_order(self.plate.get_all_items(), serpentine=True) + identifiers = [well.get_identifier() for well in order] + self.assertEqual(identifiers, ["A1", "A2", "A3", "B3", "B2", "B1"]) + + def test_scan_visit_order_linear(self): + order = self.backend._scan_visit_order(self.plate.get_all_items(), serpentine=False) + identifiers = [well.get_identifier() for well in order] + self.assertEqual(identifiers, ["A1", "A2", "A3", "B1", "B2", "B3"]) + + def test_scan_range_serpentine(self): + setattr(self.backend, "_map_well_to_stage", lambda well: (well.get_column(), well.get_row())) + row_index, row_wells = self.backend._group_by_row(self.plate.get_all_items())[0] + start_x, end_x, count = self.backend._scan_range(row_index, row_wells, serpentine=True) + self.assertEqual((start_x, end_x, count), (0, 2, 3)) + row_index, row_wells = self.backend._group_by_row(self.plate.get_all_items())[1] + start_x, end_x, count = self.backend._scan_range(row_index, row_wells, serpentine=True) + self.assertEqual((start_x, end_x, count), (2, 0, 3)) + + def test_map_well_to_stage(self): + stage_x, stage_y = self.backend._map_well_to_stage(self.plate.get_well("A1")) + self.assertEqual((stage_x, stage_y), (3, 8)) + stage_x, stage_y = self.backend._map_well_to_stage(self.plate.get_well("B1")) + self.assertEqual((stage_x, stage_y), (3, 16)) + + +class TestTecanInfiniteAscii(unittest.TestCase): + def test_frame_command(self): + framed = TecanInfinite200ProBackend._frame_command("A") + self.assertEqual(framed, b"\x02A\x03\x00\x00\x01\x40\x0d") + + def test_consume_leading_ascii_frame(self): + buffer = bytearray(TecanInfinite200ProBackend._frame_command("ST") + b"XYZ") + consumed, text = _consume_leading_ascii_frame(buffer) + self.assertTrue(consumed) + self.assertEqual(text, "ST") + self.assertEqual(buffer, bytearray(b"XYZ")) + + def test_terminal_frames(self): + self.assertTrue(TecanInfinite200ProBackend._is_terminal_frame("ST")) + self.assertTrue(TecanInfinite200ProBackend._is_terminal_frame("+")) + self.assertTrue(TecanInfinite200ProBackend._is_terminal_frame("-")) + self.assertTrue(TecanInfinite200ProBackend._is_terminal_frame("BY#T5000")) + self.assertFalse(TecanInfinite200ProBackend._is_terminal_frame("OK")) + + +class TestTecanInfiniteCommands(unittest.IsolatedAsyncioTestCase): + """Tests that verify correct commands are sent to the device.""" + + def setUp(self): + self.mock_usb = AsyncMock(spec=USB) + self.mock_usb.setup = AsyncMock() + self.mock_usb.stop = AsyncMock() + self.mock_usb.write = AsyncMock() + # Default to returning terminal response + self.mock_usb.read = AsyncMock(return_value=self._frame("ST")) + + patcher = patch( + "pylabrobot.plate_reading.tecan.infinite_backend.USB", + return_value=self.mock_usb, + ) + self.mock_usb_class = patcher.start() + self.addCleanup(patcher.stop) + + self.backend = TecanInfinite200ProBackend( + scan_config=InfiniteScanConfig(counts_per_mm_x=1000, counts_per_mm_y=1000) + ) + self.plate = _make_test_plate() + self.plate.location = Coordinate.zero() + + def _frame(self, command: str) -> bytes: + """Helper to frame a command.""" + return TecanInfinite200ProBackend._frame_command(command) + + async def test_open(self): + self.backend._ready = True + + await self.backend.open() + + self.mock_usb.write.assert_has_calls( + [ + call(self._frame("ABSOLUTE MTP,OUT")), + call(self._frame("BY#T5000")), + ] + ) + + async def test_close(self): + self.backend._ready = True + + await self.backend.close(self.plate) + + self.mock_usb.write.assert_has_calls( + [ + call(self._frame("ABSOLUTE MTP,IN")), + call(self._frame("BY#T5000")), + ] + ) + + async def test_read_absorbance_commands(self): + """Test that read_absorbance sends the correct configuration commands.""" + self.backend._ready = True + + async def mock_await(decoder, row_count, mode): + cal_len, cal_blob = _abs_calibration_blob(6000, 0, 1000, 0, 1000) + decoder.feed_bin(cal_len, cal_blob) + for _ in range(row_count): + data_len, data_blob = _abs_data_blob(6000, 500, 1000) + decoder.feed_bin(data_len, data_blob) + + with patch.object(self.backend, "_await_measurements", side_effect=mock_await): + with patch.object(self.backend, "_await_scan_terminal", new_callable=AsyncMock): + await self.backend.read_absorbance(self.plate, [], wavelength=600) + + self.mock_usb.write.assert_has_calls( + [ + # _begin_run + call(self._frame("KEYLOCK ON")), + # _configure_absorbance + call(self._frame("MODE ABS")), + call(self._frame("EXCITATION CLEAR")), + call(self._frame("TIME CLEAR")), + call(self._frame("GAIN CLEAR")), + call(self._frame("READS CLEAR")), + call(self._frame("POSITION CLEAR")), + call(self._frame("MIRROR CLEAR")), + call(self._frame("EXCITATION 0,ABS,6000,90,0")), + call(self._frame("EXCITATION 1,ABS,6000,90,0")), + call(self._frame("READS 0,NUMBER=25")), + call(self._frame("READS 1,NUMBER=25")), + call(self._frame("TIME 0,READDELAY=0")), + call(self._frame("TIME 1,READDELAY=0")), + call(self._frame("SCAN DIRECTION=ALTUP")), + call(self._frame("#RATIO LABELS")), + call(self._frame("BEAM DIAMETER=700")), + call(self._frame("RATIO LABELS=1")), + call(self._frame("PREPARE REF")), + # row scans (2 rows in test plate) + call(self._frame("ABSOLUTE MTP,Y=8000")), + call(self._frame("SCAN DIRECTION=ALTUP")), + call(self._frame("SCANX 3000,23000,3")), + call(self._frame("ABSOLUTE MTP,Y=16000")), + call(self._frame("SCAN DIRECTION=ALTUP")), + call(self._frame("SCANX 23000,3000,3")), + # _end_run + call(self._frame("TERMINATE")), + call(self._frame("CHECK MTP.STEPLOSS")), + call(self._frame("CHECK ABS.STEPLOSS")), + call(self._frame("KEYLOCK OFF")), + call(self._frame("ABSOLUTE MTP,IN")), + ] + ) + + async def test_read_fluorescence_commands(self): + """Test that read_fluorescence sends the correct configuration commands.""" + self.backend._ready = True + + async def mock_await(decoder, row_count, mode): + cal_len, cal_blob = _flr_calibration_blob(4850, 0, 0, 1000) + decoder.feed_bin(cal_len, cal_blob) + for _ in range(row_count): + data_len, data_blob = _flr_data_blob(4850, 5200, 500, 1000) + decoder.feed_bin(data_len, data_blob) + + with patch.object(self.backend, "_await_measurements", side_effect=mock_await): + with patch.object(self.backend, "_await_scan_terminal", new_callable=AsyncMock): + await self.backend.read_fluorescence( + self.plate, [], excitation_wavelength=485, emission_wavelength=520, focal_height=20.0 + ) + + # Fluorescence config is sent twice (UI behavior) + fl_config_commands = [ + call(self._frame("MODE FI.TOP")), + call(self._frame("EXCITATION CLEAR")), + call(self._frame("EMISSION CLEAR")), + call(self._frame("TIME CLEAR")), + call(self._frame("GAIN CLEAR")), + call(self._frame("READS CLEAR")), + call(self._frame("POSITION CLEAR")), + call(self._frame("MIRROR CLEAR")), + call(self._frame("EXCITATION 0,FI,4850,50,0")), + call(self._frame("EMISSION 0,FI,5200,200,0")), + call(self._frame("TIME 0,INTEGRATION=20")), + call(self._frame("TIME 0,LAG=0")), + call(self._frame("TIME 0,READDELAY=0")), + call(self._frame("GAIN 0,VALUE=100")), + call(self._frame("POSITION 0,Z=20000")), + call(self._frame("BEAM DIAMETER=3000")), + call(self._frame("SCAN DIRECTION=UP")), + call(self._frame("RATIO LABELS=1")), + call(self._frame("READS 0,NUMBER=25")), + call(self._frame("EXCITATION 1,FI,4850,50,0")), + call(self._frame("EMISSION 1,FI,5200,200,0")), + call(self._frame("TIME 1,INTEGRATION=20")), + call(self._frame("TIME 1,LAG=0")), + call(self._frame("TIME 1,READDELAY=0")), + call(self._frame("GAIN 1,VALUE=100")), + call(self._frame("POSITION 1,Z=20000")), + call(self._frame("READS 1,NUMBER=25")), + ] + + self.mock_usb.write.assert_has_calls( + [ + # _begin_run + call(self._frame("KEYLOCK ON")), + # _configure_fluorescence (sent twice) + *fl_config_commands, + *fl_config_commands, + call(self._frame("PREPARE REF")), + # row scans (2 rows in test plate) + call(self._frame("ABSOLUTE MTP,Y=8000")), + call(self._frame("SCAN DIRECTION=UP")), + call(self._frame("SCANX 3000,23000,3")), + call(self._frame("ABSOLUTE MTP,Y=16000")), + call(self._frame("SCAN DIRECTION=UP")), + call(self._frame("SCANX 23000,3000,3")), + # _end_run + call(self._frame("TERMINATE")), + call(self._frame("CHECK MTP.STEPLOSS")), + call(self._frame("CHECK FI.TOP.STEPLOSS")), + call(self._frame("CHECK FI.STEPLOSS.Z")), + call(self._frame("KEYLOCK OFF")), + call(self._frame("ABSOLUTE MTP,IN")), + ] + ) + + async def test_read_luminescence_commands(self): + """Test that read_luminescence sends the correct configuration commands.""" + self.backend._ready = True + + async def mock_await(decoder, row_count, mode): + cal_blob = bytes(14) + decoder.feed_bin(10, cal_blob) + for _ in range(row_count): + data_len, data_blob = _lum_data_blob(0, 1000) + decoder.feed_bin(data_len, data_blob) + + with patch.object(self.backend, "_await_measurements", side_effect=mock_await): + with patch.object(self.backend, "_await_scan_terminal", new_callable=AsyncMock): + await self.backend.read_luminescence(self.plate, [], focal_height=14.62) + + self.mock_usb.write.assert_has_calls( + [ + # _begin_run + call(self._frame("KEYLOCK ON")), + # _configure_luminescence + call(self._frame("MODE LUM")), + call(self._frame("CHECK LUM.FIBER")), + call(self._frame("CHECK LUM.LID")), + call(self._frame("CHECK LUM.STEPLOSS")), + call(self._frame("MODE LUM")), + call(self._frame("EMISSION CLEAR")), + call(self._frame("TIME CLEAR")), + call(self._frame("GAIN CLEAR")), + call(self._frame("READS CLEAR")), + call(self._frame("POSITION CLEAR")), + call(self._frame("MIRROR CLEAR")), + call(self._frame("POSITION LUM,Z=14620")), + call(self._frame("TIME 0,INTEGRATION=3000000")), + call(self._frame("READS 0,NUMBER=25")), + call(self._frame("SCAN DIRECTION=UP")), + call(self._frame("RATIO LABELS=1")), + call(self._frame("EMISSION 1,EMPTY,0,0,0")), + call(self._frame("TIME 1,INTEGRATION=1000000")), + call(self._frame("TIME 1,READDELAY=0")), + call(self._frame("READS 1,NUMBER=25")), + call(self._frame("#EMISSION ATTENUATION")), + call(self._frame("PREPARE REF")), + # row scans (2 rows, non-serpentine so both scan left-to-right) + call(self._frame("ABSOLUTE MTP,Y=8000")), + call(self._frame("SCAN DIRECTION=UP")), + call(self._frame("SCANX 3000,23000,3")), + call(self._frame("ABSOLUTE MTP,Y=16000")), + call(self._frame("SCAN DIRECTION=UP")), + call(self._frame("SCANX 3000,23000,3")), + # _end_run + call(self._frame("TERMINATE")), + call(self._frame("CHECK MTP.STEPLOSS")), + call(self._frame("CHECK LUM.STEPLOSS")), + call(self._frame("KEYLOCK OFF")), + call(self._frame("ABSOLUTE MTP,IN")), + ] + )