diff --git a/docs/user_guide/01_material-handling/storage/liconic.ipynb b/docs/user_guide/01_material-handling/storage/liconic.ipynb
new file mode 100644
index 00000000000..aa361977397
--- /dev/null
+++ b/docs/user_guide/01_material-handling/storage/liconic.ipynb
@@ -0,0 +1,241 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "id": "b63b4656",
+ "metadata": {},
+ "source": [
+ "
Liconic STX Series
\n",
+ "\n",
+ "The Liconic STX line of automated incubators come in a variety of sizes including STX 1000, STX 500, STX 280, STX 220, STX 110, STX 44. Which corresponds to the number of plates each size can store using the standard 22 plate capacity cassettes/cartridges (plate height 17mm, 505mm total height.) There are other cassette size for plates height ranging from 5 to 104mm in height (higher plates = less number of plates storage capacity.)\n",
+ "\n",
+ "The Liconic STX line comes in a variety of climate control options including Ultra High Temp. (HTT), Incubator (IC), Dry Storage (DC2), Humid Cooler (HC), Humid Wide Range (HR), Dry Wide Range (DR2), Humidity Controlled (AR), Deep Freezer (DF) and Ultra Deep Freezer (UDF). Each have different ranges of temperatures and humidity control ability.\n",
+ "\n",
+ "Other accessories that can be included with the STX and can be utilized with this driver include N2 gassing, CO2 gassing, a Turn Station (rotation of plates 90 degrees on the transfer station), internal barcode scanners, a swap station (two transfer plate positions that can be rotated 180 degrees) and internal shaking. \n",
+ "\n",
+ "This tutorial shows how to\n",
+ " - Connect the Liconic incubator\n",
+ " - Configure racks\n",
+ " - Move plates in and out\n",
+ " - Set and monitor temperature and humidity values"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "fcd75e15",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "\n",
+ "from pylabrobot.resources.coordinate import Coordinate\n",
+ "from pylabrobot.storage import LiconicBackend\n",
+ "from pylabrobot.storage.incubator import Incubator\n",
+ "from pylabrobot.storage.liconic.racks import liconic_rack_17mm_22, liconic_rack_44mm_10\n",
+ "\n",
+ "backend = LiconicBackend(port=\"COM3\", model=\"STX220_HC\", barcode_installed=True, barcode_port=\"COM4\")\n",
+ "\n",
+ "rack = [\n",
+ " liconic_rack_44mm_10(\"cassette_0\"),\n",
+ " liconic_rack_44mm_10(\"cassette_1\"),\n",
+ " liconic_rack_44mm_10(\"cassette_2\"),\n",
+ " liconic_rack_17mm_22(\"cassette_3\"),\n",
+ " liconic_rack_17mm_22(\"cassette_4\"),\n",
+ " liconic_rack_17mm_22(\"cassette_5\"),\n",
+ " liconic_rack_17mm_22(\"cassette_6\"),\n",
+ " liconic_rack_17mm_22(\"cassette_7\"),\n",
+ " liconic_rack_17mm_22(\"cassette_8\"),\n",
+ " liconic_rack_17mm_22(\"cassette_9\")\n",
+ " ]\n",
+ "\n",
+ "incubator = Incubator(\n",
+ " backend=backend,\n",
+ " name=\"My Incubator\",\n",
+ " size_x=100,\n",
+ " size_y=100,\n",
+ " size_z=100,\n",
+ " racks=rack,\n",
+ " loading_tray_location=Coordinate(x=0, y=0, z=0),\n",
+ ")\n",
+ "\n",
+ "await incubator.setup()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "19b3a6cc",
+ "metadata": {
+ "vscode": {
+ "languageId": "plaintext"
+ }
+ },
+ "source": [
+ "Setup\n",
+ "\n",
+ "To setup the incubator and start sending commands first the backed needs to be declared. For the Liconic the LiconcBackend class is used with the COM port used for connection (in this case COM3) and the model needs to specified (in this case the STX 220 Humid Cooler, STX220_HC). If an internal barcode is installed the barcode_installed parameter is set to True and its COM port is also specified. These two parameters are optional so can be omitted for Liconics without an internal barcode scanner. \n",
+ "\n",
+ "Given a STX 220 (220 plate position / 22 plates per rack = 10 racks) can hold 10 racks the list of racks is built and includes mixing and matching different plate height racks. The differences in racks are handled prior to plate retrieval and storage. \n",
+ "\n",
+ "Once the these are built the base Incubator class is created and the connection to the incubator is initialized using:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "9d7a4f49",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "await incubator.setup()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "52f79811",
+ "metadata": {},
+ "source": [
+ "To store a plate first a plate resource is initialized and then assigned to the loading tray. The method take_in_plate is then called on the incubator object.\n",
+ "\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "d26e039d",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from pylabrobot.resources import Azenta4titudeFrameStar_96_wellplate_200ul_Vb\n",
+ "\n",
+ "new_plate = Azenta4titudeFrameStar_96_wellplate_200ul_Vb(name=\"TEST\")\n",
+ "incubator.loading_tray.assign_child_resource(new_plate)\n",
+ "await incubator.take_in_plate(\"smallest\") # choose the smallest free site\n",
+ "\n",
+ "# other options:\n",
+ "# await incubator.take_in_plate(\"random\") # random free site\n",
+ "# await incubator.take_in_plate(rack[3]) # store at rack position 3"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "85dcddb7",
+ "metadata": {},
+ "source": [
+ "To retrieve a plate the plate name can used"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "00308838",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "await incubator.fetch_plate_to_loading_tray(plate=\"TEST\")\n",
+ "retrieved = incubator.loading_tray.resource"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "0045e703",
+ "metadata": {},
+ "source": [
+ "You can also print a barcode from this call (if barcode is installed per the backend insatiation). Returning of the barcode as a return object still needs to be implemented. Currently the barcode is just printed to the terminal.\n",
+ "\n",
+ "Barcode can returned by setting the read_barcode to True for \n",
+ "- take_in_plate\n",
+ "- fetch_plate_to_loading_tray\n",
+ "- move_position_to_position"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "c8730560",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "position = rack[9][0] # rack number 9 position 1\n",
+ "\n",
+ "await incubator.fetch_plate_to_loading_tray(plate_name=\"TEST\",read_barcode=True)\n",
+ "\n",
+ "await incubator.take_in_plate(position,read_barcode=True)\n",
+ "\n",
+ "await incubator.move_position_to_position(plate_name=\"TEST\",dest_site=position,read_barcode=True)\n",
+ "# will print the barcode to the terminal"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "14efdf69",
+ "metadata": {},
+ "source": [
+ "The humdity, temperature, N2 gas and CO2 gas levels can all be controlled and queried. For temperature for example:\n",
+ "\n",
+ "- To get the current temperature"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "73e38f2a",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "temperature = await incubator.get_temperature() # returns temperature as float in Celsius to the 10th place\n",
+ "print(str(temperature))"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "c7383277",
+ "metadata": {},
+ "source": [
+ "- To set the temperature of the Liconic"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "2c51c385",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "await incubator.set_temperature(8.0) # set the temperature to 8 degrees Celsius"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "4f07f349",
+ "metadata": {},
+ "source": [
+ "- You can also retrieve the set value (the value sent for set_temperature) using:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "288dea91",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "set_temperature = await incubator.get_set_temperature() # will return a float for the set temperature in degrees Celsius"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "3a1d9ef3",
+ "metadata": {},
+ "source": [
+ "This pattern is the same for CO2, N2 and Humidity control of the Liconic. "
+ ]
+ }
+ ],
+ "metadata": {
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/pylabrobot/barcode_scanners/__init__.py b/pylabrobot/barcode_scanners/__init__.py
new file mode 100644
index 00000000000..befd981f9a9
--- /dev/null
+++ b/pylabrobot/barcode_scanners/__init__.py
@@ -0,0 +1,3 @@
+from .backend import BarcodeScannerBackend, BarcodeScannerError
+from .barcode_scanner import BarcodeScanner
+from .keyence import KeyenceBarcodeScannerBackend
diff --git a/pylabrobot/barcode_scanners/backend.py b/pylabrobot/barcode_scanners/backend.py
new file mode 100644
index 00000000000..ce7bbe2b7a1
--- /dev/null
+++ b/pylabrobot/barcode_scanners/backend.py
@@ -0,0 +1,18 @@
+from abc import ABCMeta, abstractmethod
+
+from pylabrobot.machines.backend import MachineBackend
+from pylabrobot.resources.barcode import Barcode
+
+
+class BarcodeScannerError(Exception):
+ """Error raised by a barcode scanner backend."""
+
+
+class BarcodeScannerBackend(MachineBackend, metaclass=ABCMeta):
+ def __init__(self):
+ super().__init__()
+
+ @abstractmethod
+ async def scan_barcode(self) -> Barcode:
+ """Scan a barcode and return its value."""
+ pass
diff --git a/pylabrobot/barcode_scanners/barcode_scanner.py b/pylabrobot/barcode_scanners/barcode_scanner.py
new file mode 100644
index 00000000000..821e5789ae2
--- /dev/null
+++ b/pylabrobot/barcode_scanners/barcode_scanner.py
@@ -0,0 +1,15 @@
+from pylabrobot.barcode_scanners.backend import BarcodeScannerBackend
+from pylabrobot.machines.machine import Machine
+from pylabrobot.resources.barcode import Barcode
+
+
+class BarcodeScanner(Machine):
+ """Frontend for barcode scanners."""
+
+ def __init__(self, backend: BarcodeScannerBackend):
+ super().__init__(backend=backend)
+ self.backend: BarcodeScannerBackend = backend
+
+ async def scan(self) -> Barcode:
+ """Scan a barcode and return its value."""
+ return await self.backend.scan_barcode()
diff --git a/pylabrobot/barcode_scanners/keyence/__init__.py b/pylabrobot/barcode_scanners/keyence/__init__.py
new file mode 100644
index 00000000000..7f99f5acbdd
--- /dev/null
+++ b/pylabrobot/barcode_scanners/keyence/__init__.py
@@ -0,0 +1 @@
+from .barcode_scanner_backend import KeyenceBarcodeScannerBackend
diff --git a/pylabrobot/barcode_scanners/keyence/barcode_scanner_backend.py b/pylabrobot/barcode_scanners/keyence/barcode_scanner_backend.py
new file mode 100644
index 00000000000..219b37a23d2
--- /dev/null
+++ b/pylabrobot/barcode_scanners/keyence/barcode_scanner_backend.py
@@ -0,0 +1,111 @@
+import asyncio
+import time
+from typing import Awaitable, Callable, Optional
+
+import serial
+
+from pylabrobot.barcode_scanners.backend import (
+ BarcodeScannerBackend,
+ BarcodeScannerError,
+)
+from pylabrobot.io.serial import Serial
+from pylabrobot.resources.barcode import Barcode
+
+
+class KeyenceBarcodeScannerBackend(BarcodeScannerBackend):
+ default_baudrate = 9600
+ serial_messaging_encoding = "ascii"
+ init_timeout = 1.0 # seconds
+ poll_interval = 0.2 # seconds
+
+ def __init__(
+ self,
+ serial_port: str,
+ ):
+ super().__init__()
+
+ # BL-1300 Barcode reader factory default serial communication settings
+ # should be the same factory default for the BL-600HA and BL-1300 models
+ self.io = Serial(
+ port=serial_port,
+ baudrate=self.default_baudrate,
+ bytesize=serial.SEVENBITS,
+ parity=serial.PARITY_EVEN,
+ stopbits=serial.STOPBITS_ONE,
+ write_timeout=1,
+ timeout=1,
+ rtscts=False,
+ )
+
+ async def setup(self):
+ await self.io.setup()
+ await self.initialize_scanner()
+
+ async def initialize_scanner(self):
+ """Initialize the Keyence barcode scanner."""
+
+ response = await self.send_command("RMOTOR")
+
+ deadline = time.time() + self.init_timeout
+ while time.time() < deadline:
+ response = await self.send_command("RMOTOR")
+ if response.strip() == "MOTORON":
+ print("Barcode scanner motor is ON.")
+ break
+ elif response.strip() == "MOTOROFF":
+ raise BarcodeScannerError("Failed to initialize Keyence barcode scanner: Motor is off.")
+ await asyncio.sleep(self.poll_interval)
+ else:
+ raise BarcodeScannerError(
+ "Failed to initialize Keyence barcode scanner: " "Timeout waiting for motor to turn on."
+ )
+
+ async def send_command(self, command: str) -> str:
+ """Send a command to the barcode scanner and return the response.
+ Keyence uses carriage return \r as the line ending by default."""
+
+ await self.io.write((command + "\r").encode(self.serial_messaging_encoding))
+ response = await self.io.read()
+ return response.decode(self.serial_messaging_encoding).strip()
+
+ async def send_command_and_stream(
+ self,
+ command: str,
+ on_response: Callable[[str], Awaitable[None]],
+ timeout: float = 5.0,
+ stop_condition: Optional[Callable[[str], bool]] = None,
+ ):
+ """Send a command and call on_response for each barcode response."""
+ await self.io.write((command + "\r").encode(self.serial_messaging_encoding))
+ deadline = time.time() + timeout
+
+ while time.time() < deadline:
+ try:
+ response = await asyncio.wait_for(self.io.readline(), timeout=1.0)
+ if response:
+ decoded = response.decode(self.serial_messaging_encoding).strip()
+ print(f"Received from barcode scanner: {decoded}")
+ if decoded:
+ try:
+ await on_response(decoded) # Call the callback
+ except Exception as e:
+ print(f"Error in callback: {e}")
+ if stop_condition and stop_condition(decoded):
+ break
+ except asyncio.TimeoutError:
+ print("Barcode scanner timeout, continuing...")
+ continue
+ except Exception as e:
+ print(f"Error reading from barcode scanner: {e}")
+ continue
+
+ async def stop(self):
+ await self.io.stop()
+
+ async def scan_barcode(self) -> Barcode:
+ data = await self.send_command("LON")
+ if data.startswith("NG"):
+ raise BarcodeScannerError("Barcode reader is off: cannot read barcode")
+ if data.startswith("ERR99"):
+ raise BarcodeScannerError(f"Error response from barcode reader: {data}")
+ return Barcode(data=data, symbology="unknown", position_on_resource="front")
diff --git a/pylabrobot/storage/__init__.py b/pylabrobot/storage/__init__.py
index 0a987a7df1f..0a6f5fb87ac 100644
--- a/pylabrobot/storage/__init__.py
+++ b/pylabrobot/storage/__init__.py
@@ -2,4 +2,5 @@
from .chatterbox import IncubatorChatterboxBackend
from .cytomat import CytomatBackend
from .incubator import Incubator
+from .liconic import LiconicBackend
# from .inheco import *
diff --git a/pylabrobot/storage/backend.py b/pylabrobot/storage/backend.py
index 82af1917e04..2a0bacbac2b 100644
--- a/pylabrobot/storage/backend.py
+++ b/pylabrobot/storage/backend.py
@@ -3,6 +3,7 @@
from pylabrobot.machines.backend import MachineBackend
from pylabrobot.resources import Plate, PlateCarrier, PlateHolder
+from pylabrobot.resources.barcode import Barcode
class IncubatorBackend(MachineBackend, metaclass=ABCMeta):
@@ -27,11 +28,11 @@ async def close_door(self):
pass
@abstractmethod
- async def fetch_plate_to_loading_tray(self, plate: Plate):
+ async def fetch_plate_to_loading_tray(self, plate: Plate, **kwargs):
pass
@abstractmethod
- async def take_in_plate(self, plate: Plate, site: PlateHolder):
+ async def take_in_plate(self, plate: Plate, site: PlateHolder, **kwargs):
pass
@abstractmethod
@@ -50,3 +51,87 @@ async def start_shaking(self, frequency: float):
@abstractmethod
async def stop_shaking(self):
pass
+
+ """ Methods added for Liconic incubator options."""
+
+ @abstractmethod
+ async def get_set_temperature(self) -> float:
+ """Get the set value temperature of the incubator in degrees Celsius."""
+ pass
+
+ @abstractmethod
+ async def set_humidity(self, humidity: float):
+ """Set operation humidity of the incubator in % RH; e.g. 90.0% RH."""
+ pass
+
+ @abstractmethod
+ async def get_humidity(self) -> float:
+ """Get the current humidity of the incubator in % RH; e.g. 90.0% RH."""
+ pass
+
+ @abstractmethod
+ async def get_set_humidity(self) -> float:
+ """Get the set value humidity of the incubator in % RH; e.g. 90.0% RH."""
+ pass
+
+ @abstractmethod
+ async def set_co2_level(self, co2_level: float):
+ """Set operation CO2 level of the incubator in %; e.g. 5.0%."""
+ pass
+
+ @abstractmethod
+ async def get_co2_level(self) -> float:
+ """Get the current CO2 level of the incubator in %; e.g. 5.0%."""
+ pass
+
+ @abstractmethod
+ async def get_set_co2_level(self) -> float:
+ """Get the set value CO2 level of the incubator in %; e.g. 5.0%."""
+ pass
+
+ @abstractmethod
+ async def set_n2_level(self, n2_level: float):
+ """Set operation N2 level of the incubator in %; e.g. 90.0%."""
+ pass
+
+ @abstractmethod
+ async def get_n2_level(self) -> float:
+ """Get the current N2 level of the incubator in %; e.g. 90.0%."""
+ pass
+
+ @abstractmethod
+ async def get_set_n2_level(self) -> float:
+ """Get the set value N2 level of the incubator in %; e.g. 90.0%."""
+ pass
+
+ @abstractmethod
+ async def turn_swap_station(self, home: bool):
+ """Swap the incubator station to home or 180 degree position."""
+ pass
+
+ @abstractmethod
+ async def check_shovel_sensor(self) -> bool:
+ """Check if there is a plate on the shovel plate sensor."""
+ pass
+
+ @abstractmethod
+ async def check_transfer_sensor(self) -> bool:
+ """Check if there is a plate on the transfer sensor."""
+ pass
+
+ @abstractmethod
+ async def check_second_transfer_sensor(self) -> bool:
+ """Check 2nd transfer station plate sensor."""
+ pass
+
+ @abstractmethod
+ async def scan_barcode(self, site: PlateHolder) -> Barcode:
+ """Scan barcode at given position."""
+ pass
+
+ @abstractmethod
+ async def move_position_to_position(
+ self, plate: Plate, dest_site: PlateHolder, read_barcode: bool = False
+ ):
+ """Move plate to another position in the storage unit"""
+ pass
diff --git a/pylabrobot/storage/incubator.py b/pylabrobot/storage/incubator.py
index 6ed68482173..a54764893b6 100644
--- a/pylabrobot/storage/incubator.py
+++ b/pylabrobot/storage/incubator.py
@@ -73,13 +73,13 @@ def get_site_by_plate_name(self, plate_name: str) -> PlateHolder:
return site
raise ResourceNotFoundError(f"Plate {plate_name} not found in incubator '{self.name}'")
- async def fetch_plate_to_loading_tray(self, plate_name: str) -> Plate:
+ async def fetch_plate_to_loading_tray(self, plate_name: str, **backend_kwargs) -> Plate:
"""Fetch a plate from the incubator and put it on the loading tray."""
site = self.get_site_by_plate_name(plate_name)
plate = site.resource
assert plate is not None
- await self.backend.fetch_plate_to_loading_tray(plate)
+ await self.backend.fetch_plate_to_loading_tray(plate, **backend_kwargs)
plate.unassign()
self.loading_tray.assign_child_resource(plate)
return plate
@@ -112,7 +112,9 @@ def find_smallest_site_for_plate(self, plate: Plate) -> PlateHolder:
def find_random_site(self, plate: Plate) -> PlateHolder:
return random.choice(self._find_available_sites_sorted(plate))
- async def take_in_plate(self, site: Union[PlateHolder, Literal["random", "smallest"]]):
+ async def take_in_plate(
+ self, site: Union[PlateHolder, Literal["random", "smallest"]], **backend_kwargs
+ ):
"""Take a plate from the loading tray and put it in the incubator."""
plate = cast(Plate, self.loading_tray.resource)
@@ -128,7 +130,7 @@ async def take_in_plate(self, site: Union[PlateHolder, Literal["random", "smalle
raise ValueError(f"Site {site.name} is not available for plate {plate.name}")
else:
raise ValueError(f"Invalid site: {site}")
- await self.backend.take_in_plate(plate, site)
+ await self.backend.take_in_plate(plate, site, **backend_kwargs)
plate.unassign()
site.assign_child_resource(plate)
@@ -207,3 +209,74 @@ def deserialize(cls, data: dict, allow_marshal: bool = False):
category=data["category"],
model=data["model"],
)
+
+ async def scan_barcode(self, site: PlateHolder):
+ return await self.backend.scan_barcode(site)
+
+ async def get_set_temperature(self) -> float:
+ """Get the set value temperature of the incubator in degrees Celsius."""
+ return await self.backend.get_set_temperature()
+
+ async def set_humidity(self, humidity: float):
+ """Set the humidity of the incubator in percentage (%)."""
+ return await self.backend.set_humidity(humidity)
+
+ async def get_humidity(self) -> float:
+ """Get the humidity of the incubator in percentage (%)."""
+ return await self.backend.get_humidity()
+
+ async def get_set_humidity(self) -> float:
+ """Get the set value humidity of the incubator in percentage (%)."""
+ return await self.backend.get_set_humidity()
+
+ async def set_co2_level(self, co2_level: float):
+ """Set the CO2 level of the incubator in percentage (%)."""
+ return await self.backend.set_co2_level(co2_level)
+
+ async def get_co2_level(self) -> float:
+ """Get the CO2 level of the incubator in percentage (%)."""
+ return await self.backend.get_co2_level()
+
+ async def get_set_co2_level(self) -> float:
+ """Get the set value CO2 level of the incubator in percentage (%)."""
+ return await self.backend.get_set_co2_level()
+
+ async def set_n2_level(self, n2_level: float):
+ """Set the N2 level of the incubator in percentage (%)."""
+ return await self.backend.set_n2_level(n2_level)
+
+ async def get_n2_level(self) -> float:
+ """Get the N2 level of the incubator in percentage (%)."""
+ return await self.backend.get_n2_level()
+
+ async def get_set_n2_level(self) -> float:
+ """Get the set value N2 level of the incubator in percentage (%)."""
+ return await self.backend.get_set_n2_level()
+
+ async def turn_swap_station(self, home: bool):
+ """Turn the swap station of the incubator. If home is True, turn to home position."""
+ return await self.backend.turn_swap_station(home)
+
+ async def check_shovel_sensor(self) -> bool:
+ """Check if the shovel plate sensor is activated."""
+ return await self.backend.check_shovel_sensor()
+
+ async def check_transfer_sensor(self) -> bool:
+ """Check if the transfer plate sensor is activated."""
+ return await self.backend.check_transfer_sensor()
+
+ async def check_second_transfer_sensor(self) -> bool:
+ """Check if the second transfer plate sensor is activated."""
+ return await self.backend.check_second_transfer_sensor()
+
+ async def move_position_to_position(
+ self, plate_name: str, dest_site: PlateHolder, **backend_kwargs
+ ) -> Plate:
+ """Move a plate to another internal position in the storage unit."""
+ site = self.get_site_by_plate_name(plate_name)
+ plate = site.resource
+ assert plate is not None
+ await self.backend.move_position_to_position(plate, dest_site, **backend_kwargs)
+ plate.unassign()
+ dest_site.assign_child_resource(plate)
+ return plate
diff --git a/pylabrobot/storage/liconic/__init__.py b/pylabrobot/storage/liconic/__init__.py
new file mode 100644
index 00000000000..99b93f73f0c
--- /dev/null
+++ b/pylabrobot/storage/liconic/__init__.py
@@ -0,0 +1 @@
+from .liconic_backend import LiconicBackend
diff --git a/pylabrobot/storage/liconic/constants.py b/pylabrobot/storage/liconic/constants.py
new file mode 100644
index 00000000000..73617096d96
--- /dev/null
+++ b/pylabrobot/storage/liconic/constants.py
@@ -0,0 +1,178 @@
+from enum import Enum, IntEnum
+
+
+class LiconicType(Enum):
+ STX44_IC = "STX44_IC" # incubator
+ STX44_HC = "STX44_HC" # humid cooler
+ STX44_DC2 = "STX44_DC2" # dry storage
+ STX44_HR = "STX44_HR" # humid wide range
+ STX44_DR2 = "STX44_DR2" # dry wide range
+ STX44_AR = "STX44_AR" # humidity controlled
+ STX44_DF = "STX44_DF" # deep freezer
+ STX44_NC = "STX44_NC" # no climate
+ STX44_DH = "STX44_DH" # dry humid
+
+ STX110_IC = "STX110_IC" # incubator
+ STX110_HC = "STX110_HC" # humid cooler
+ STX110_DC2 = "STX110_DC2" # dry storage
+ STX110_HR = "STX110_HR" # humid wide range
+ STX110_DR2 = "STX110_DR2" # dry wide range
+ STX110_AR = "STX110_AR" # humidity controlled
+ STX110_DF = "STX110_DF" # deep freezer
+ STX110_NC = "STX110_NC" # no climate
+ STX110_DH = "STX110_DH" # dry humid
+
+ STX220_IC = "STX220_IC" # incubator
+ STX220_HC = "STX220_HC" # humid cooler
+ STX220_DC2 = "STX220_DC2" # dry storage
+ STX220_HR = "STX220_HR" # humid wide range
+ STX220_DR2 = "STX220_DR2" # dry wide range
+ STX220_AR = "STX220_AR" # humidity controlled
+ STX220_DF = "STX220_DF" # deep freezer
+ STX220_NC = "STX220_NC" # no climate
+ STX220_DH = "STX220_DH" # dry humid
+
+ STX280_IC = "STX280_IC" # incubator
+ STX280_HC = "STX280_HC" # humid cooler
+ STX280_DC2 = "STX280_DC2" # dry storage
+ STX280_HR = "STX280_HR" # humid wide range
+ STX280_DR2 = "STX280_DR2" # dry wide range
+ STX280_AR = "STX280_AR" # humidity controlled
+ STX280_DF = "STX280_DF" # deep freezer
+ STX280_NC = "STX280_NC" # no climate
+ STX280_DH = "STX44_DH" # dry humid
+
+ STX500_IC = "STX500_IC" # incubator
+ STX500_HC = "STX500_HC" # humid cooler
+ STX500_DC2 = "STX500_DC2" # dry storage
+ STX500_HR = "STX500_HR" # humid wide range
+ STX500_DR2 = "STX500_DR2" # dry wide range
+ STX500_AR = "STX500_AR" # humidity controlled
+ STX500_DF = "STX500_DF" # deep freezer
+ STX500_NC = "STX500_NC" # no climate
+ STX500_DH = "STX500_DH" # dry humid
+
+ STX1000_IC = "STX1000_IC" # incubator
+ STX1000_HC = "STX1000_HC" # humid cooler
+ STX1000_DC2 = "STX1000_DC2" # dry storage
+ STX1000_HR = "STX1000_HR" # humid wide range
+ STX1000_DR2 = "STX1000_DR2" # dry wide range
+ STX1000_AR = "STX1000_AR" # humidity controlled
+ STX1000_DF = "STX1000_DF" # deep freezer
+ STX1000_NC = "STX1000_NC" # no climate
+ STX1000_DH = "STX1000_DH" # dry humid
+
+
+class ControllerError(Enum):
+ RELAY_ERROR = "E0"
+ COMMAND_ERROR = "E1"
+ PROGRAM_ERROR = "E2"
+ HARDWARE_ERROR = "E3"
+ WRITE_PROTECTED_ERROR = "E4"
+ BASE_UNIT_ERROR = "E5"
+
+
+class HandlingError(Enum):
+ GENERAL_HANDLING_ERROR = "00001"
+ GATE_OPEN_ERROR = "00007"
+ GATE_CLOSE_ERROR = "00008"
+ GENERAL_LIFT_POSITIONING_ERROR = "00009"
+ USER_ACCESS_ERROR = "00010"
+ STACKER_SLOT_ERROR = "00011"
+ REMOTE_ACCESS_LEVEL_ERROR = "00012"
+ PLATE_TRANSFER_DETECTION_ERROR = "00013"
+ LIFT_INITIALIZATION_ERROR = "00014"
+ PLATE_ON_SHOVEL_DETECTION = "00015"
+ NO_PLATE_ON_SHOVEL_DETECTION = "00016"
+ NO_RECOVERY = "00017"
+
+ IMPORT_PLATE_STACKER_POSITIONING_ERROR = "00100"
+ IMPORT_PLATE_HANDLER_TRANSFER_TURN_OUT_ERROR = "00101"
+ IMPORT_PLATE_SHOVEL_TRANSFER_OUTER_ERROR = "00102"
+ IMPORT_PLATE_LIFT_TRANSFER_ERROR = "00103"
+ IMPORT_PLATE_SHOVEL_TRANSFER_INNER_ERROR = "00104"
+ IMPORT_PLATE_HANDLER_TRANSFER_TURN_IN_ERROR = "00105"
+ IMPORT_PLATE_LIFT_STACKER_TRAVEL_ERROR = "00106"
+ IMPORT_PLATE_SHOVEL_STACKER_FRONT_ERROR = "00107"
+ IMPORT_PLATE_LIFT_STACKER_PLACE_ERROR = "00108"
+ IMPORT_PLATE_SHOVEL_STACKER_INNER_ERROR = "00109"
+ IMPORT_PLATE_LIFT_TRAVEL_BACK_ERROR = "00110"
+ IMPORT_PLATE_LIFT_INIT_ERROR = "00111"
+
+ EXPORT_PLATE_LIFT_STACKER_TRAVEL_ERROR = "00200"
+ EXPORT_PLATE_SHOVEL_STACKER_FRONT_ERROR = "00201"
+ EXPORT_PLATE_LIFT_STACKER_IMPORT_ERROR = "00202"
+ EXPORT_PLATE_SHOVEL_STACKER_INNER_ERROR = "00203"
+ EXPORT_PLATE_LIFT_TRANSFER_POSITIONING_ERROR = "00204"
+ EXPORT_PLATE_HANDLER_TRANSFER_TURN_OUT_ERROR = "00205"
+ EXPORT_PLATE_SHOVEL_TRANSFER_OUTER_ERROR = "00206"
+ EXPORT_PLATE_LIFT_TRANSFER_PLACE_ERROR = "00207"
+ EXPORT_PLATE_SHOVEL_TRANSFER_INNER_ERROR = "00208"
+ EXPORT_PLATE_HANDLER_TRANSFER_TURN_IN_ERROR = "00209"
+ EXPORT_PLATE_LIFT_TRAVEL_BACK_ERROR = "00210"
+ EXPORT_PLATE_LIFT_INITIALIZING_ERROR = "00211"
+
+ PLATE_REMOVE_GENERAL_HANDLING_ERROR = "00301"
+ PLATE_REMOVE_GATE_OPEN_ERROR = "00307"
+ PLATE_REMOVE_GATE_CLOSE_ERROR = "00308"
+ PLATE_REMOVE_GENERAL_LIFT_POSITIONING_ERROR = "00309"
+ PLATE_REMOVE_USER_ACCESS_ERROR = "00310"
+ PLATE_REMOVE_STACKER_SLOT_ERROR = "00311"
+ PLATE_REMOVE_REMOTE_ACCESS_LEVEL_ERROR = "00312"
+ PLATE_REMOVE_PLATE_TRANSFER_DETECTION_ERROR = "00313"
+ PLATE_REMOVE_LIFT_INITIALIZATION_ERROR = "00314"
+ PLATE_REMOVE_PLATE_ON_SHOVEL_DETECTION = "00315"
+ PLATE_REMOVE_NO_PLATE_ON_SHOVEL_DETECTION = "00316"
+ PLATE_REMOVE_NO_RECOVERY = "00317"
+
+ BARCODE_READ_GENERAL_HANDLING_ERROR = "00401"
+ BARCODE_READ_GATE_OPEN_ERROR = "00407"
+ BARCODE_READ_GATE_CLOSE_ERROR = "00408"
+ BARCODE_READ_GENERAL_LIFT_POSITIONING_ERROR = "00409"
+ BARCODE_READ_USER_ACCESS_ERROR = "00410"
+ BARCODE_READ_STACKER_SLOT_ERROR = "00411"
+ BARCODE_READ_REMOTE_ACCESS_LEVEL_ERROR = "00412"
+ BARCODE_READ_PLATE_TRANSFER_DETECTION_ERROR = "00413"
+ BARCODE_READ_LIFT_INITIALIZATION_ERROR = "00414"
+ BARCODE_READ_PLATE_ON_SHOVEL_DETECTION = "00415"
+ BARCODE_READ_NO_PLATE_ON_SHOVEL_DETECTION = "00416"
+ BARCODE_READ_NO_RECOVERY = "00417"
+
+ PLATE_PLACE_GENERAL_HANDLING_ERROR = "00501"
+ PLATE_PLACE_GATE_OPEN_ERROR = "00507"
+ PLATE_PLACE_GATE_CLOSE_ERROR = "00508"
+ PLATE_PLACE_GENERAL_LIFT_POSITIONING_ERROR = "00509"
+ PLATE_PLACE_USER_ACCESS_ERROR = "00510"
+ PLATE_PLACE_STACKER_SLOT_ERROR = "00511"
+ PLATE_PLACE_REMOTE_ACCESS_LEVEL_ERROR = "00512"
+ PLATE_PLACE_PLATE_TRANSFER_DETECTION_ERROR = "00513"
+ PLATE_PLACE_LIFT_INITIALIZATION_ERROR = "00514"
+ PLATE_PLACE_PLATE_ON_SHOVEL_DETECTION = "00515"
+ PLATE_PLACE_NO_PLATE_ON_SHOVEL_DETECTION = "00516"
+ PLATE_PLACE_NO_RECOVERY = "00517"
+
+ PLATE_SET_GENERAL_HANDLING_ERROR = "000601"
+ PLATE_SET_GATE_OPEN_ERROR = "00607"
+ PLATE_SET_GATE_CLOSE_ERROR = "00608"
+ PLATE_SET_GENERAL_LIFT_POSITIONING_ERROR = "00609"
+ PLATE_SET_USER_ACCESS_ERROR = "00610"
+ PLATE_SET_STACKER_SLOT_ERROR = "00611"
+ PLATE_SET_REMOTE_ACCESS_LEVEL_ERROR = "00612"
+ PLATE_SET_PLATE_TRANSFER_DETECTION_ERROR = "00613"
+ PLATE_SET_LIFT_INITIALIZATION_ERROR = "00614"
+ PLATE_SET_PLATE_ON_SHOVEL_DETECTION = "00615"
+ PLATE_SET_NO_PLATE_ON_SHOVEL_DETECTION = "00616"
+ PLATE_SET_NO_RECOVERY = "00617"
+
+ PLATE_GET_GENERAL_HANDLING_ERROR = "00701"
+ PLATE_GET_GATE_OPEN_ERROR = "00707"
+ PLATE_GET_GATE_CLOSE_ERROR = "00708"
+ PLATE_GET_GENERAL_LIFT_POSITIONING_ERROR = "00709"
+ PLATE_GET_USER_ACCESS_ERROR = "00710"
+ PLATE_GET_STACKER_SLOT_ERROR = "00711"
+ PLATE_GET_REMOTE_ACCESS_LEVEL_ERROR = "00712"
+ PLATE_GET_PLATE_TRANSFER_DETECTION_ERROR = "00713"
+ PLATE_GET_LIFT_INITIALIZATION_ERROR = "00714"
+ PLATE_GET_PLATE_ON_SHOVEL_DETECTION = "00715"
+ PLATE_GET_NO_PLATE_ON_SHOVEL_DETECTION = "00716"
+ PLATE_GET_NO_RECOVERY = "00717"
diff --git a/pylabrobot/storage/liconic/errors.py b/pylabrobot/storage/liconic/errors.py
new file mode 100644
index 00000000000..03e65894d66
--- /dev/null
+++ b/pylabrobot/storage/liconic/errors.py
@@ -0,0 +1,365 @@
+from typing import Dict
+
+from pylabrobot.storage.liconic.constants import ControllerError, HandlingError
+
+
+class LiconicControllerRelayError(Exception):
+ pass
+
+
+class LiconicControllerCommandError(Exception):
+ pass
+
+
+class LiconicControllerProgramError(Exception):
+ pass
+
+
+class LiconicControllerHardwareError(Exception):
+ pass
+
+
+class LiconicControllerWriteProtectedError(Exception):
+ pass
+
+
+class LiconicControllerBaseUnitError(Exception):
+ pass
+
+
+controller_error_map: Dict[ControllerError, Exception] = {
+ ControllerError.RELAY_ERROR: LiconicControllerRelayError(
+ "Controller system error. Undefined timer, counter, data memory, check if requested unit is valid"
+ ),
+ ControllerError.COMMAND_ERROR: LiconicControllerCommandError(
+ "Controller system error. Invalid command, check if communication is opened by CR, check command sent to controller, check for interruptions during string transmission"
+ ),
+ ControllerError.PROGRAM_ERROR: LiconicControllerProgramError(
+ "Controller system error. Firmware lost, reprogram controller"
+ ),
+ ControllerError.HARDWARE_ERROR: LiconicControllerHardwareError(
+ "Controller hardware error, turn controller ON/OFF, controller is faulty has to be replaced"
+ ),
+ ControllerError.WRITE_PROTECTED_ERROR: LiconicControllerWriteProtectedError(
+ "Controller system error. Unauthorized Access"
+ ),
+ ControllerError.BASE_UNIT_ERROR: LiconicControllerBaseUnitError(
+ "Controller system error. Unauthorized Access"
+ ),
+}
+
+
+class LiconicHandlerPlateRemoveError(Exception):
+ pass
+
+
+class LiconicHandlerBarcodeReadError(Exception):
+ pass
+
+
+class LiconicHandlerPlatePlaceError(Exception):
+ pass
+
+
+class LiconicHandlerPlateSetError(Exception):
+ pass
+
+
+class LiconicHandlerPlateGetError(Exception):
+ pass
+
+
+class LiconicHandlerImportPlateError(Exception):
+ pass
+
+
+class LiconicHandlerExportPlateError(Exception):
+ pass
+
+
+class LiconicHandlerGeneralError(Exception):
+ pass
+
+
+handler_error_map: Dict[HandlingError, Exception] = {
+ HandlingError.GENERAL_HANDLING_ERROR: LiconicHandlerGeneralError(
+ "Handling action could not be performed in time"
+ ),
+ HandlingError.GATE_OPEN_ERROR: LiconicHandlerGeneralError(
+ "Gate could not reach upper position or Gate did not reach upper position in time"
+ ),
+ HandlingError.GATE_CLOSE_ERROR: LiconicHandlerGeneralError(
+ "Gate could not reach lower position or Gate did not reach lower position in time"
+ ),
+ HandlingError.GENERAL_LIFT_POSITIONING_ERROR: LiconicHandlerGeneralError(
+ "Handler-Lift could not reach desired level position or does not move"
+ ),
+ HandlingError.USER_ACCESS_ERROR: LiconicHandlerGeneralError(
+ "Unauthorized user access in combination with manual rotation of carrousel"
+ ),
+ HandlingError.STACKER_SLOT_ERROR: LiconicHandlerGeneralError("Stacker slot cannot be reached "),
+ HandlingError.REMOTE_ACCESS_LEVEL_ERROR: LiconicHandlerGeneralError(
+ "Undefined stacker level has been requested"
+ ),
+ HandlingError.PLATE_TRANSFER_DETECTION_ERROR: LiconicHandlerGeneralError(
+ "Export operation while plate is on transfer station"
+ ),
+ HandlingError.LIFT_INITIALIZATION_ERROR: LiconicHandlerGeneralError(
+ "Lift could not be initialized "
+ ),
+ HandlingError.PLATE_ON_SHOVEL_DETECTION: LiconicHandlerGeneralError(
+ "Trying to load a plate, when a plate is already on the shovel"
+ ),
+ HandlingError.NO_PLATE_ON_SHOVEL_DETECTION: LiconicHandlerGeneralError(
+ "Trying to remove or place plate with no plate on the shovel"
+ ),
+ HandlingError.NO_RECOVERY: LiconicHandlerGeneralError("Recovery was not possible "),
+ HandlingError.IMPORT_PLATE_STACKER_POSITIONING_ERROR: LiconicHandlerImportPlateError(
+ "Carrousel could not reach desired radial position during Import Plate procedure or Lift could not reach transfer level during Import Plate procedure."
+ ),
+ HandlingError.IMPORT_PLATE_HANDLER_TRANSFER_TURN_OUT_ERROR: LiconicHandlerImportPlateError(
+ "Handler could not reach outer turn position at transfer level during Import Plate procedure."
+ ),
+ HandlingError.IMPORT_PLATE_SHOVEL_TRANSFER_OUTER_ERROR: LiconicHandlerImportPlateError(
+ "Shovel could not reach outer position at transfer level during Import Plate procedure."
+ ),
+ HandlingError.IMPORT_PLATE_LIFT_TRANSFER_ERROR: LiconicHandlerImportPlateError(
+ "Lift did not reach upper pick position at transfer level during Import Plate procedure."
+ ),
+ HandlingError.IMPORT_PLATE_SHOVEL_TRANSFER_INNER_ERROR: LiconicHandlerImportPlateError(
+ "Shovel could not reach inner position at transfer level during Import Plate procedure."
+ ),
+ HandlingError.IMPORT_PLATE_HANDLER_TRANSFER_TURN_IN_ERROR: LiconicHandlerImportPlateError(
+ "Handler could not reach inner turn position at transfer level during Import Plate procedure."
+ ),
+ HandlingError.IMPORT_PLATE_LIFT_STACKER_TRAVEL_ERROR: LiconicHandlerImportPlateError(
+ "Lift could not reach desired stacker level during Import Plate procedure."
+ ),
+ HandlingError.IMPORT_PLATE_SHOVEL_STACKER_FRONT_ERROR: LiconicHandlerImportPlateError(
+ "Shovel could not reach front position on stacker access during Plate Import procedure."
+ ),
+ HandlingError.IMPORT_PLATE_LIFT_STACKER_PLACE_ERROR: LiconicHandlerImportPlateError(
+ "Lift could not reach stacker place level during Import Plate procedure."
+ ),
+ HandlingError.IMPORT_PLATE_SHOVEL_STACKER_INNER_ERROR: LiconicHandlerImportPlateError(
+ "Shovel could not reach inner position at stacker plate placement during Import Plate procedure."
+ ),
+ HandlingError.IMPORT_PLATE_LIFT_TRAVEL_BACK_ERROR: LiconicHandlerImportPlateError(
+ "Lift could not reach zero level during Import Plate procedure."
+ ),
+ HandlingError.IMPORT_PLATE_LIFT_INIT_ERROR: LiconicHandlerImportPlateError(
+ "Lift could not be initialized after Import Plate procedure."
+ ),
+ HandlingError.EXPORT_PLATE_LIFT_STACKER_TRAVEL_ERROR: LiconicHandlerExportPlateError(
+ "Carrousel could not reach desired radial position during Export Plate procedure or Lift could not reach desired stacker level during Export Plate procedure."
+ ),
+ HandlingError.EXPORT_PLATE_SHOVEL_STACKER_FRONT_ERROR: LiconicHandlerExportPlateError(
+ "Shovel could not reach front position on stacker access during Plate Export procedure."
+ ),
+ HandlingError.EXPORT_PLATE_LIFT_STACKER_IMPORT_ERROR: LiconicHandlerExportPlateError(
+ "Lift could not reach stacker pick level during Export Plate procedure."
+ ),
+ HandlingError.EXPORT_PLATE_SHOVEL_STACKER_INNER_ERROR: LiconicHandlerExportPlateError(
+ "Shovel could not reach inner position at stacker plate pick during Export Plate procedure."
+ ),
+ HandlingError.EXPORT_PLATE_LIFT_TRANSFER_POSITIONING_ERROR: LiconicHandlerExportPlateError(
+ "Lift could not reach transfer level during Export Plate procedure."
+ ),
+ HandlingError.EXPORT_PLATE_HANDLER_TRANSFER_TURN_OUT_ERROR: LiconicHandlerExportPlateError(
+ "Handler could not reach outer turn position at transfer level during Export Plate procedure."
+ ),
+ HandlingError.EXPORT_PLATE_SHOVEL_TRANSFER_OUTER_ERROR: LiconicHandlerExportPlateError(
+ "Shovel could not reach outer position at transfer level during Export Plate procedure."
+ ),
+ HandlingError.EXPORT_PLATE_LIFT_TRANSFER_PLACE_ERROR: LiconicHandlerExportPlateError(
+ "Lift did not reach lower place position at transfer level during Export Plate procedure."
+ ),
+ HandlingError.EXPORT_PLATE_SHOVEL_TRANSFER_INNER_ERROR: LiconicHandlerExportPlateError(
+ "Shovel could not reach inner position at transfer level during Export Plate procedure."
+ ),
+ HandlingError.EXPORT_PLATE_HANDLER_TRANSFER_TURN_IN_ERROR: LiconicHandlerExportPlateError(
+ "Handler could not reach inner turn position at transfer level during Export Plate procedure"
+ ),
+ HandlingError.EXPORT_PLATE_LIFT_TRAVEL_BACK_ERROR: LiconicHandlerExportPlateError(
+ "Lift could not reach Zero position during Export Plate procedure."
+ ),
+ HandlingError.EXPORT_PLATE_LIFT_INITIALIZING_ERROR: LiconicHandlerExportPlateError(
+ "Lift could not be initialized after Export Plate procedure."
+ ),
+ HandlingError.PLATE_REMOVE_GENERAL_HANDLING_ERROR: LiconicHandlerPlateRemoveError(
+ "Handling action could not be performed in time."
+ ),
+ HandlingError.PLATE_REMOVE_GATE_OPEN_ERROR: LiconicHandlerPlateRemoveError(
+ "Gate could not reach upper position or Gate did not reach upper position in time"
+ ),
+ HandlingError.PLATE_REMOVE_GATE_CLOSE_ERROR: LiconicHandlerPlateRemoveError(
+ "Gate could not reach lower position or Gate did not reach lower position in time"
+ ),
+ HandlingError.PLATE_REMOVE_GENERAL_LIFT_POSITIONING_ERROR: LiconicHandlerPlateRemoveError(
+ "Handler-Lift could not reach desired level position or does not move"
+ ),
+ HandlingError.PLATE_REMOVE_USER_ACCESS_ERROR: LiconicHandlerPlateRemoveError(
+ "Unauthorized user access in combination with manual rotation of carrousel"
+ ),
+ HandlingError.PLATE_REMOVE_STACKER_SLOT_ERROR: LiconicHandlerPlateRemoveError(
+ "Stacker slot cannot be reached"
+ ),
+ HandlingError.PLATE_REMOVE_REMOTE_ACCESS_LEVEL_ERROR: LiconicHandlerPlateRemoveError(
+ "Undefined stacker level has been requested"
+ ),
+ HandlingError.PLATE_REMOVE_PLATE_TRANSFER_DETECTION_ERROR: LiconicHandlerPlateRemoveError(
+ "Export operation while plate is on transfer station"
+ ),
+ HandlingError.PLATE_REMOVE_LIFT_INITIALIZATION_ERROR: LiconicHandlerPlateRemoveError(
+ "Lift could not be initialized"
+ ),
+ HandlingError.PLATE_REMOVE_PLATE_ON_SHOVEL_DETECTION: LiconicHandlerPlateRemoveError(
+ "Trying to load a plate, when a plate is already on the shovel"
+ ),
+ HandlingError.PLATE_REMOVE_NO_PLATE_ON_SHOVEL_DETECTION: LiconicHandlerPlateRemoveError(
+ "Trying to remove or place plate with no plate on the shovel"
+ ),
+ HandlingError.PLATE_REMOVE_NO_RECOVERY: LiconicHandlerPlateRemoveError(
+ "Recovery was not possible"
+ ),
+ HandlingError.BARCODE_READ_GENERAL_HANDLING_ERROR: LiconicHandlerBarcodeReadError(
+ "Handling action could not be performed in time."
+ ),
+ HandlingError.BARCODE_READ_GATE_OPEN_ERROR: LiconicHandlerBarcodeReadError(
+ "Gate could not reach upper position or Gate did not reach upper position in time"
+ ),
+ HandlingError.BARCODE_READ_GATE_CLOSE_ERROR: LiconicHandlerBarcodeReadError(
+ "Gate could not reach lower position or Gate did not reach lower position in time"
+ ),
+ HandlingError.BARCODE_READ_GENERAL_LIFT_POSITIONING_ERROR: LiconicHandlerBarcodeReadError(
+ "Handler-Lift could not reach desired level position or does not move"
+ ),
+ HandlingError.BARCODE_READ_USER_ACCESS_ERROR: LiconicHandlerBarcodeReadError(
+ "Unauthorized user access in combination with manual rotation of carrousel"
+ ),
+ HandlingError.BARCODE_READ_STACKER_SLOT_ERROR: LiconicHandlerBarcodeReadError(
+ "Stacker slot cannot be reached"
+ ),
+ HandlingError.BARCODE_READ_REMOTE_ACCESS_LEVEL_ERROR: LiconicHandlerBarcodeReadError(
+ "Undefined stacker level has been requested"
+ ),
+ HandlingError.BARCODE_READ_PLATE_TRANSFER_DETECTION_ERROR: LiconicHandlerBarcodeReadError(
+ "Export operation while plate is on transfer station"
+ ),
+ HandlingError.BARCODE_READ_LIFT_INITIALIZATION_ERROR: LiconicHandlerBarcodeReadError(
+ "Lift could not be initialized"
+ ),
+ HandlingError.BARCODE_READ_PLATE_ON_SHOVEL_DETECTION: LiconicHandlerBarcodeReadError(
+ "Trying to load a plate, when a plate is already on the shovel"
+ ),
+ HandlingError.BARCODE_READ_NO_PLATE_ON_SHOVEL_DETECTION: LiconicHandlerBarcodeReadError(
+ "Trying to remove or place plate with no plate on the shovel"
+ ),
+ HandlingError.BARCODE_READ_NO_RECOVERY: LiconicHandlerBarcodeReadError(
+ "Recovery was not possible"
+ ),
+ HandlingError.PLATE_PLACE_GENERAL_HANDLING_ERROR: LiconicHandlerPlatePlaceError(
+ "Handling action could not be performed in time."
+ ),
+ HandlingError.PLATE_PLACE_GATE_OPEN_ERROR: LiconicHandlerPlatePlaceError(
+ "Gate could not reach upper position or Gate did not reach upper position in time"
+ ),
+ HandlingError.PLATE_PLACE_GATE_CLOSE_ERROR: LiconicHandlerPlatePlaceError(
+ "Gate could not reach lower position or Gate did not reach lower position in time"
+ ),
+ HandlingError.PLATE_PLACE_GENERAL_LIFT_POSITIONING_ERROR: LiconicHandlerPlatePlaceError(
+ "Handler-Lift could not reach desired level position or does not move"
+ ),
+ HandlingError.PLATE_PLACE_USER_ACCESS_ERROR: LiconicHandlerPlatePlaceError(
+ "Unauthorized user access in combination with manual rotation of carrousel"
+ ),
+ HandlingError.PLATE_PLACE_STACKER_SLOT_ERROR: LiconicHandlerPlatePlaceError(
+ "Stacker slot cannot be reached"
+ ),
+ HandlingError.PLATE_PLACE_REMOTE_ACCESS_LEVEL_ERROR: LiconicHandlerPlatePlaceError(
+ "Undefined stacker level has been requested"
+ ),
+ HandlingError.PLATE_PLACE_PLATE_TRANSFER_DETECTION_ERROR: LiconicHandlerPlatePlaceError(
+ "Export operation while plate is on transfer station"
+ ),
+ HandlingError.PLATE_PLACE_LIFT_INITIALIZATION_ERROR: LiconicHandlerPlatePlaceError(
+ "Lift could not be initialized"
+ ),
+ HandlingError.PLATE_PLACE_PLATE_ON_SHOVEL_DETECTION: LiconicHandlerPlatePlaceError(
+ "Trying to load a plate, when a plate is already on the shovel"
+ ),
+ HandlingError.PLATE_PLACE_NO_PLATE_ON_SHOVEL_DETECTION: LiconicHandlerPlatePlaceError(
+ "Trying to remove or place plate with no plate on the shovel"
+ ),
+ HandlingError.PLATE_PLACE_NO_RECOVERY: LiconicHandlerPlatePlaceError("Recovery was not possible"),
+ HandlingError.PLATE_SET_GENERAL_HANDLING_ERROR: LiconicHandlerPlateSetError(
+ "Handling action could not be performed in time."
+ ),
+ HandlingError.PLATE_SET_GATE_OPEN_ERROR: LiconicHandlerPlateSetError(
+ "Gate could not reach upper position or Gate did not reach upper position in time"
+ ),
+ HandlingError.PLATE_SET_GATE_CLOSE_ERROR: LiconicHandlerPlateSetError(
+ "Gate could not reach lower position or Gate did not reach lower position in time"
+ ),
+ HandlingError.PLATE_SET_GENERAL_LIFT_POSITIONING_ERROR: LiconicHandlerPlateSetError(
+ "Handler-Lift could not reach desired level position or does not move"
+ ),
+ HandlingError.PLATE_SET_USER_ACCESS_ERROR: LiconicHandlerPlateSetError(
+ "Unauthorized user access in combination with manual rotation of carrousel"
+ ),
+ HandlingError.PLATE_SET_STACKER_SLOT_ERROR: LiconicHandlerPlateSetError(
+ "Stacker slot cannot be reached"
+ ),
+ HandlingError.PLATE_SET_REMOTE_ACCESS_LEVEL_ERROR: LiconicHandlerPlateSetError(
+ "Undefined stacker level has been requested"
+ ),
+ HandlingError.PLATE_SET_PLATE_TRANSFER_DETECTION_ERROR: LiconicHandlerPlateSetError(
+ "Export operation while plate is on transfer station"
+ ),
+ HandlingError.PLATE_SET_LIFT_INITIALIZATION_ERROR: LiconicHandlerPlateSetError(
+ "Lift could not be initialized"
+ ),
+ HandlingError.PLATE_SET_PLATE_ON_SHOVEL_DETECTION: LiconicHandlerPlateSetError(
+ "Trying to load a plate, when a plate is already on the shovel"
+ ),
+ HandlingError.PLATE_SET_NO_PLATE_ON_SHOVEL_DETECTION: LiconicHandlerPlateSetError(
+ "Trying to remove or place plate with no plate on the shovel"
+ ),
+ HandlingError.PLATE_SET_NO_RECOVERY: LiconicHandlerPlateSetError("Recovery was not possible"),
+ HandlingError.PLATE_GET_GENERAL_HANDLING_ERROR: LiconicHandlerPlateGetError(
+ "Handling action could not be performed in time."
+ ),
+ HandlingError.PLATE_GET_GATE_OPEN_ERROR: LiconicHandlerPlateGetError(
+ "Gate could not reach upper position or Gate did not reach upper position in time"
+ ),
+ HandlingError.PLATE_GET_GATE_CLOSE_ERROR: LiconicHandlerPlateGetError(
+ "Gate could not reach lower position or Gate did not reach lower position in time"
+ ),
+ HandlingError.PLATE_GET_GENERAL_LIFT_POSITIONING_ERROR: LiconicHandlerPlateGetError(
+ "Handler-Lift could not reach desired level position or does not move"
+ ),
+ HandlingError.PLATE_GET_USER_ACCESS_ERROR: LiconicHandlerPlateGetError(
+ "Unauthorized user access in combination with manual rotation of carrousel"
+ ),
+ HandlingError.PLATE_GET_STACKER_SLOT_ERROR: LiconicHandlerPlateGetError(
+ "Stacker slot cannot be reached"
+ ),
+ HandlingError.PLATE_GET_REMOTE_ACCESS_LEVEL_ERROR: LiconicHandlerPlateGetError(
+ "Undefined stacker level has been requested"
+ ),
+ HandlingError.PLATE_GET_PLATE_TRANSFER_DETECTION_ERROR: LiconicHandlerPlateGetError(
+ "Export operation while plate is on transfer station"
+ ),
+ HandlingError.PLATE_GET_LIFT_INITIALIZATION_ERROR: LiconicHandlerPlateGetError(
+ "Lift could not be initialized"
+ ),
+ HandlingError.PLATE_GET_PLATE_ON_SHOVEL_DETECTION: LiconicHandlerPlateGetError(
+ "Trying to load a plate, when a plate is already on the shovel"
+ ),
+ HandlingError.PLATE_GET_NO_PLATE_ON_SHOVEL_DETECTION: LiconicHandlerPlateGetError(
+ "Trying to remove or place plate with no plate on the shovel"
+ ),
+ HandlingError.PLATE_GET_NO_RECOVERY: LiconicHandlerPlateGetError(
+ "Recovery was not possible during get plate"
+ ),
+}
diff --git a/pylabrobot/storage/liconic/liconic_backend.py b/pylabrobot/storage/liconic/liconic_backend.py
new file mode 100644
index 00000000000..16269cb9458
--- /dev/null
+++ b/pylabrobot/storage/liconic/liconic_backend.py
@@ -0,0 +1,559 @@
+import asyncio
+import logging
+import re
+import time
+import warnings
+from typing import List, Optional, Tuple, Union
+
+import serial
+
+from pylabrobot.barcode_scanners import BarcodeScanner
+from pylabrobot.io.serial import Serial
+from pylabrobot.resources import Plate, PlateHolder
+from pylabrobot.resources.barcode import Barcode
+from pylabrobot.resources.carrier import PlateCarrier
+from pylabrobot.storage.backend import IncubatorBackend
+from pylabrobot.storage.liconic.constants import ControllerError, HandlingError, LiconicType
+from pylabrobot.storage.liconic.errors import controller_error_map, handler_error_map
+
+logger = logging.getLogger(__name__)
+
+# Mapping site_height to motor steps for Liconic cassettes
+LICONIC_SITE_HEIGHT_TO_STEPS = {
+ 5: 377, # pitch=11, site_height=5
+ 11: 582, # pitch=17, site_height=11
+ 12: 617, # pitch=18, site_height=12
+ 17: 788, # pitch=23, site_height=17
+ 22: 959, # pitch=28, site_height=22
+ 23: 994, # pitch=29, site_height=23
+ 24: 1028, # pitch=30, site_height=24
+ 27: 1131, # pitch=33, site_height=27
+ 44: 1713, # pitch=50, site_height=44
+ 53: 2021, # pitch=59, site_height=53
+ 66: 2467, # pitch=72, site_height=66
+ 104: 3563, # pitch=110, site_height=104
+}
+
+
+class LiconicBackend(IncubatorBackend):
+ """Backend for Liconic incubators.
+
+ Optionally accepts a BarcodeScanner instance for internal barcode reading.
+ """
+
+ default_baud = 9600
+ serial_message_encoding = "ascii"
+ init_timeout = 1.0
+ start_timeout = 15.0
+ poll_interval = 0.2
+
+ def __init__(
+ self,
+ model: Union[LiconicType, str],
+ port: str,
+ barcode_scanner: Optional[BarcodeScanner] = None,
+ ):
+ super().__init__()
+
+ self.barcode_scanner = barcode_scanner
+
+ if isinstance(model, str):
+ try:
+ model = LiconicType(model)
+ except ValueError:
+ raise ValueError(f"Unsupported Liconic model: '{model}")
+
+ self.model = model
+ self._racks: List[PlateCarrier] = []
+
+ self.io_plc = Serial(
+ port=port,
+ baudrate=self.default_baud,
+ bytesize=serial.EIGHTBITS,
+ parity=serial.PARITY_EVEN,
+ stopbits=serial.STOPBITS_ONE,
+ write_timeout=1,
+ timeout=1,
+ rtscts=True,
+ )
+
+ self.co2_installed: Optional[bool] = None
+ self.n2_installed: Optional[bool] = None
+
+ # Function to setup serial connection with Liconic PLC
+ async def setup(self):
+ """
+ 1. Open serial port (9600 8E1, RTS/CTS) via the Serial wrapper.
+ 2. Send >200 ms break, wait 150 ms, flush buffers.
+ 3. Handshake: CR → wait for CC
+ 4. Activate handling: ST 1801 → expect OK
+ 5. Poll ready-flag: RD 1915 → wait for "1"
+ """
+ try:
+ await self.io_plc.setup()
+ except serial.SerialException as e:
+ raise RuntimeError(f"Could not open {self.io_plc.port}: {e}")
+
+ await self.io_plc.send_break(duration=0.2) # >100 ms required
+ await asyncio.sleep(0.15)
+ await self.io_plc.reset_input_buffer()
+ await self.io_plc.reset_output_buffer()
+
+ await self.io_plc.write(b"CR\r")
+ deadline = time.time() + self.init_timeout
+ while time.time() < deadline:
+ resp = await self.io_plc.readline() # reads through LF
+ if resp.strip() == b"CC":
+ break
+ else:
+ await self.io_plc.stop()
+ raise TimeoutError(f"No CC response from Liconic PLC within {self.init_timeout} seconds")
+
+ await self.io_plc.write(b"ST 1801\r")
+ resp = await self.io_plc.readline()
+ if resp.strip() != b"OK":
+ await self.io_plc.stop()
+ raise RuntimeError(f"Unexpected reply to ST 1801: {resp!r}")
+
+ deadline = time.time() + self.start_timeout
+ while time.time() < deadline:
+ await self.io_plc.write(b"RD 1915\r")
+ flag = await self.io_plc.readline()
+ if flag.strip() == b"1":
+ break
+ await asyncio.sleep(self.poll_interval)
+ else:
+ await self.io_plc.stop()
+ raise TimeoutError(f"PLC did not signal ready within {self.start_timeout} seconds")
+
+ def _site_to_m_n(self, site: PlateHolder) -> Tuple[int, int]:
+ rack = site.parent
+ assert isinstance(rack, PlateCarrier), "Site not in rack"
+ assert self._racks is not None, "Racks not set"
+ rack_idx = self._racks.index(rack) + 1 # plr is 0-indexed, liconic is 1-indexed
+ site_idx = next(idx for idx, s in rack.sites.items() if s == site) + 1 # 1-indexed
+ return rack_idx, site_idx
+
+ # Wrote this function to return motor step size and plate position number from PlateCarrier model name
+ def _carrier_to_steps_pos(self, site: PlateHolder) -> Tuple[int, int]:
+ rack = site.parent
+ assert isinstance(rack, PlateCarrier), "Site not in rack"
+ assert self._racks is not None, "Racks not set"
+ if not rack.model.startswith("liconic"):
+ raise ValueError(f"The plate carrier used: {rack.model} is not compatible with the Liconic")
+ match = re.search(r"_(\d+)mm", rack.model)
+ if match:
+ site_height = int(match.group(1))
+ site_num = int(rack.model.split("_")[-1])
+ return LICONIC_SITE_HEIGHT_TO_STEPS.get(site_height), site_num
+ raise ValueError(
+ f"Could not parse site height and pos num from PlateCarrier model: {rack.model}"
+ )
+
+ async def stop(self):
+ await self.io_plc.stop()
+
+ async def set_racks(self, racks: List[PlateCarrier]):
+ await super().set_racks(racks)
+ warnings.warn("Liconic racks need to be configured manually on each setup")
+
+ async def initialize(self):
+ await self._send_command_plc("ST 1900")
+ await self._send_command_plc("ST 1801")
+ await self._wait_ready()
+
+ async def open_door(self):
+ await self._send_command_plc("ST 1901")
+ await self._wait_ready()
+
+ async def close_door(self):
+ await self._send_command_plc("ST 1902")
+ await self._wait_ready()
+
+ async def fetch_plate_to_loading_tray(self, plate: Plate, read_barcode: bool = False):
+ """Fetch a plate from the incubator to the loading tray."""
+ site = plate.parent
+ assert isinstance(site, PlateHolder), "Plate not in storage"
+
+ m, n = self._site_to_m_n(site)
+ step_size, pos_num = self._carrier_to_steps_pos(site)
+
+ await self._send_command_plc(f"WR DM0 {m}") # carousel number
+ await self._send_command_plc(f"WR DM23 {step_size}") # motor step size
+ await self._send_command_plc(f"WR DM25 {pos_num}") # number of positions in cassette
+ await self._send_command_plc(f"WR DM5 {n}") # plate position in carousel
+
+ if read_barcode:
+ plate.barcode = await self.read_barcode_inline(m, n)
+
+ await self._send_command_plc("ST 1905") # plate to transfer station
+ await self._wait_ready()
+ await self._send_command_plc("ST 1903") # terminate access
+
+ async def take_in_plate(self, plate: Plate, site: PlateHolder, read_barcode: bool = False):
+ """Take in a plate from the loading tray to the incubator."""
+ m, n = self._site_to_m_n(site)
+ step_size, pos_num = self._carrier_to_steps_pos(site)
+
+ await self._send_command_plc(f"WR DM0 {m}") # carousel number
+ await self._send_command_plc(f"WR DM23 {step_size}") # motor step size
+ await self._send_command_plc(f"WR DM25 {pos_num}") # number of positions in cassette
+ await self._send_command_plc(f"WR DM5 {n}") # plate position in cassette
+ await self._send_command_plc("ST 1904") # plate from transfer station
+ await self._wait_ready()
+
+ if read_barcode:
+ plate.barcode = await self.read_barcode_inline(m, n)
+
+ await self._send_command_plc("ST 1903") # terminate access
+
+ async def move_position_to_position(
+ self, plate: Plate, dest_site: PlateHolder, read_barcode: bool = False
+ ):
+ """Move plate from one internal position to another"""
+ orig_site = plate.parent
+ assert isinstance(orig_site, PlateHolder)
+ assert isinstance(dest_site, PlateHolder)
+
+ if dest_site.resource is not None:
+ raise RuntimeError(f"Position {dest_site} already has a plate assigned!")
+
+ orig_m, orig_n = self._site_to_m_n(orig_site) # origin cassette # and plate position #
+ dest_m, dest_n = self._site_to_m_n(dest_site) # destination cassette # and plate position #
+
+ await self._send_command_plc(f"WR DM0 {orig_m}") # origin cassette #
+ orig_step_size, orig_pos_num = self._carrier_to_steps_pos(orig_site)
+ dest_step_size, dest_pos_num = self._carrier_to_steps_pos(dest_site)
+
+ await self._send_command_plc(f"WR DM0 {orig_m}") # carousel number
+ await self._send_command_plc(f"WR DM23 {orig_step_size}") # motor step size
+ await self._send_command_plc(f"WR DM25 {orig_pos_num}") # number of positions in cassette
+ await self._send_command_plc(f"WR DM5 {orig_n}") # origin plate position #
+
+ if read_barcode:
+ plate.barcode = await self.read_barcode_inline(orig_m, orig_n)
+
+ await self._send_command_plc("ST 1908") # pick plate from origin position
+
+ await self._wait_ready()
+
+ if orig_m != dest_m:
+ await self._send_command_plc(f"WR DM0 {dest_m}") # destination cassette # if different
+ await self._send_command_plc(f"WR DM23 {dest_step_size}") # motor step size
+ await self._send_command_plc(f"WR DM25 {dest_pos_num}") # number of positions in cassette
+ await self._send_command_plc(f"WR DM5 {dest_n}") # destination plate position #
+ await self._send_command_plc("ST 1909") # place plate in destination position
+
+ await self._wait_ready()
+ await self._send_command_plc("ST 1903") # terminate access
+
+ async def read_barcode_inline(self, cassette: int, plt_position: int) -> Barcode:
+ if self.barcode_scanner is None:
+ raise RuntimeError("Barcode scanner not configured for this incubator instance")
+
+ await self._send_command_plc("ST 1910") # move shovel to barcode reading position
+ await self._wait_ready()
+ barcode = await self.barcode_scanner.scan()
+ logger.info(
+ f"Read barcode from plate at cassette {cassette}, position {plt_position}: {barcode.data}"
+ )
+ reset = await self._send_command_plc("RS 1910") # move shovel back to normal position
+ if reset != "OK":
+ raise RuntimeError("Failed to reset shovel position after barcode reading")
+ await self._wait_ready()
+ return barcode
+
+ async def _send_command_plc(self, command: str) -> str:
+ """
+ Send an ASCII command to the Liconic PLC over serial and return the response.
+ """
+ cmd = command.strip() + "\r"
+ logger.debug(f"Sending command to Liconic PLC: {cmd!r}")
+ await self.io_plc.write(cmd.encode(self.serial_message_encoding))
+ resp = (await self.io_plc.read(128)).decode(self.serial_message_encoding)
+ if not resp:
+ raise RuntimeError(f"No response from Liconic PLC for command {command!r}")
+ resp = resp.strip()
+ if resp.startswith("E"):
+ logger.error(f"Command {command} failed with {resp}")
+ for member in ControllerError:
+ if resp == member.value:
+ raise controller_error_map[member]
+ raise RuntimeError(f"Unknown error {resp} when sending command {command}")
+ return resp
+
+ async def _wait_plate_ready(self, timeout: int = 60):
+ """
+ Poll the plate-ready flag (RD 1914) until it is set, or timeout is reached.
+ """
+ start = time.time()
+ deadline = start + timeout
+ while time.time() < deadline:
+ resp = await self._send_command_plc("RD 1914")
+ if resp == "1":
+ return
+ await asyncio.sleep(0.1)
+ raise TimeoutError(f"Plate did not become ready within {timeout} seconds")
+
+ async def _wait_ready(self, timeout: int = 60):
+ """
+ Poll the ready-flag (RD 1915) until it is set. If timeout is reached
+ the error flag is read and if true aka "1" then the error register is read.
+ """
+ start = time.time()
+ deadline = start + timeout
+ while time.time() < deadline:
+ resp = await self._send_command_plc("RD 1915")
+ if resp == "1":
+ return
+ await asyncio.sleep(0.1)
+ err_flag = await self._send_command_plc("RD 1814")
+ if err_flag == "1":
+ error = await self._send_command_plc("RD DM200")
+ for member in HandlingError:
+ if error == member.value:
+ raise handler_error_map[member]
+ raise RuntimeError(f" Liconic Handler in unknown error state with memory showing {error}")
+ raise TimeoutError(f"Incubator did not become ready within {timeout} seconds")
+
+ async def set_temperature(self, temperature: float):
+ """Set the temperature of the incubator in degrees Celsius. Using command WR DM890 ttttt
+ where ttttt is temperature in 0.1 degrees Celsius (e.g. 37.0C = 370)"""
+ if self.model.value.split("_")[-1] == "NC":
+ raise NotImplementedError("Climate control is not supported on this model")
+
+ temp_value = int(temperature * 10)
+ temp_str = str(temp_value).zfill(5)
+ await self._send_command_plc(f"WR DM890 {temp_str}")
+ await self._wait_ready()
+
+ async def get_temperature(self) -> float:
+ """Get the temperature of the incubator in degrees Celsius. Using command RD DM982"""
+ if self.model.value.split("_")[-1] == "NC":
+ raise NotImplementedError("Climate control is not supported on this model")
+
+ resp = await self._send_command_plc("RD DM982")
+ try:
+ temp_value = int(resp)
+ temperature = temp_value / 10.0
+ return temperature
+ except ValueError:
+ raise RuntimeError(f"Invalid temperature value received from incubator: {resp!r}")
+
+ # UNTESTED
+ # Unsure if 1 means ON and 0 means OFF, needs to be confirmed.
+ async def shaker_status(self) -> int:
+ """Determines whether the shaker is ON (1) or OFF (0)"""
+ # TODO: Missing PLC command - need to determine correct command from Liconic documentation
+ raise NotImplementedError("shaker_status command not yet implemented")
+
+ # UNTESTED
+ # Unsure if a liconic will return 00250 for 25 or 00025. Assuming former.
+ # Should be in Hz
+ async def get_shaker_speed(self) -> float:
+ """Gets the current shaker speed default = 25"""
+ speed_val = await self._send_command_plc("RD DM39")
+ speed = int(speed_val) / 10.0
+ await self._wait_ready()
+ return speed
+
+ # UNTESTED
+ # Unsure if setting WR DM39 00250 will set it at 25 Hz or if WR DM39 00025 will. Assuming former
+ async def start_shaking(self, frequency):
+ """Start shaking. Must be between 1 and 50 Hz. Frequency by default is 10 Hz. Using command
+ ST 1913. This functionality is not currently able to be tested."""
+ if frequency < 1.0 or frequency > 50.0:
+ raise ValueError("Shaking frequency must be between 1.0 and 50.0 Hz")
+ else:
+ frequency_value = int(frequency) # assuming incubator expects frequency in 0.1 Hz units
+ frequency = frequency_value * 10
+ await self._send_command_plc(f"WR DM39 {str(frequency).zfill(5)}")
+ await self._send_command_plc("ST 1913")
+ await self._wait_ready()
+
+ # UNTESTED
+ async def stop_shaking(self):
+ """Stop shaking. Using command RS 1913"""
+ await self._send_command_plc("RS 1913")
+ await self._wait_ready()
+
+ async def get_set_temperature(self) -> float:
+ """Get the set value temperature of the incubator in degrees Celsius."""
+ if self.model.value.split("_")[-1] == "NC":
+ raise NotImplementedError("Climate control is not supported on this model")
+
+ resp = await self._send_command_plc("RD DM890")
+ try:
+ temp_value = int(resp)
+ temperature = temp_value / 10.0
+ return temperature
+ except ValueError:
+ raise RuntimeError(f"Invalid set temperature value received from incubator: {resp!r}")
+
+ async def set_humidity(self, humidity: float):
+ """Set the humidity of the incubator in percentage (%)."""
+ if self.model.value.split("_")[-1] == "NC":
+ raise NotImplementedError("Climate control is not supported on this model")
+
+ humidity_val = int(humidity * 10)
+ await self._send_command_plc(f"WR DM893 {str(humidity_val).zfill(5)}")
+ await self._wait_ready()
+
+ async def get_humidity(self) -> float:
+ """Get the actual humidity of the incubator in percentage (%)."""
+ if self.model.value.split("_")[-1] == "NC":
+ raise NotImplementedError("Climate control is not supported on this model")
+
+ resp = await self._send_command_plc("RD DM983")
+ try:
+ humidity_value = int(resp)
+ humidity = humidity_value / 10.0
+ return humidity
+ except ValueError:
+ raise RuntimeError(f"Invalid humidity value received from incubator: {resp!r}")
+
+ async def get_set_humidity(self) -> float:
+ """Get the set value humidity of the incubator in percentage (%)."""
+ if self.model.value.split("_")[-1] == "NC":
+ raise NotImplementedError("Climate control is not supported on this model")
+
+ resp = await self._send_command_plc("RD DM893")
+ try:
+ humidity_value = int(resp)
+ humidity = humidity_value / 10.0
+ return humidity
+ except ValueError:
+ raise RuntimeError(f"Invalid set humidity value received from incubator: {resp!r}")
+
+ # UNTESTED
+ async def set_co2_level(self, co2_level: float):
+ """Set the CO2 level of the incubator in 1/100% vol. percentage (%) 500 = 5.0 % ."""
+ co2_val = int(co2_level * 100)
+ await self._send_command_plc(f"WR DM894 {str(co2_val).zfill(5)}")
+ await self._wait_ready()
+
+ # UNTESTED
+ async def get_co2_level(self) -> float:
+ """Get the CO2 level of the incubator in percentage (%)."""
+ resp = await self._send_command_plc("RD DM984")
+ try:
+ co2_value = int(resp)
+ co2 = co2_value / 100.0
+ return co2
+ except ValueError:
+ raise RuntimeError(f"Invalid co2 value received from incubator: {resp!r}")
+
+ # UNTESTED
+ async def get_set_co2_level(self) -> float:
+ """Get the set value CO2 level of the incubator in percentage (%)."""
+ resp = await self._send_command_plc("RD DM894")
+ try:
+ co2_set_value = int(resp)
+ co2 = co2_set_value / 100.0
+ return co2
+ except ValueError:
+ raise RuntimeError(f"Invalid co2 set value received from incubator: {resp!r}")
+
+ # UNTESTED
+ async def set_n2_level(self, n2_level: float):
+ """Set the N2 level of the incubator in percentage (%)."""
+ n2_val = int(n2_level * 100)
+ await self._send_command_plc(f"WR DM895 {str(n2_val).zfill(5)}")
+
+ # UNTESTED
+ async def get_n2_level(self) -> float:
+ """Get the N2 level of the incubator in percentage (%)."""
+ resp = await self._send_command_plc("RD DM985")
+ try:
+ n2_value = int(resp)
+ n2 = n2_value / 100.0
+ return n2
+ except ValueError:
+ raise RuntimeError(f"Invalid N2 value received from incubator: {resp!r}")
+
+ # UNTESTED
+ async def get_set_n2_level(self) -> float:
+ """Get the set value N2 level of the incubator in percentage (%)."""
+ resp = await self._send_command_plc("RD DM895")
+ try:
+ n2_set_value = int(resp)
+ n2 = n2_set_value / 100.0
+ return n2
+ except ValueError:
+ raise RuntimeError(f"Invalid N2 set value received from incubator: {resp!r}")
+
+ # UNTESTED
+ # Unsure what RD 1912 returns (is 1 home or swapped?)
+ # Another avenue is to read the first byte of T16 or T17 but don't have ability to test
+ async def turn_swap_station(self, home: bool):
+ """Turn the swap station of the incubator. If home is True, turn to home position."""
+ resp = await self._send_command_plc("RD 1912")
+ if home and resp == "1":
+ await self._send_command_plc("RS 1912")
+ else:
+ await self._send_command_plc("ST 1912")
+
+ # UNTESTED
+ # Activate plate sensor (ST 1911) used in HT units only because it is off by default
+ async def check_shovel_sensor(self) -> bool:
+ """First need to activate shovel transfer sensor deactivated by default, wait 0.1 seconds
+ and then Check if the shovel plate sensor is activated."""
+ await self._send_command_plc("ST 1911")
+ await asyncio.sleep(0.1)
+ resp = await self._send_command_plc("RD 1812")
+ if resp == "1":
+ return True
+ elif resp == "0":
+ return False
+ else:
+ raise RuntimeError(f"Unexpected response from incubator read shovel sensor: {resp!r}")
+
+ # UNTESTED
+ async def check_transfer_sensor(self) -> bool:
+ """Check if the transfer plate sensor is activated."""
+ resp = await self._send_command_plc("RD 1813")
+ if resp == "1":
+ return True
+ elif resp == "0":
+ return False
+ else:
+ raise RuntimeError(f"Unexpected response from read transfer station sensor: {resp!r}")
+
+ # UNTESTED
+ async def check_second_transfer_sensor(self) -> bool:
+ """Check if the second transfer plate sensor is activated."""
+ resp = await self._send_command_plc("RD 1807")
+ if resp == "1":
+ return True
+ elif resp == "0":
+ return False
+ else:
+ raise RuntimeError(f"Unexpected response from read 2nd transfer station sensor: {resp!r}")
+
+ async def scan_barcode(self, site: PlateHolder) -> Barcode:
+ """Scan a barcode using the internal barcode reader."""
+ if self.barcode_scanner is None:
+ raise RuntimeError("Barcode scanner not configured for this incubator instance")
+
+ m, n = self._site_to_m_n(site)
+ step_size, pos_num = self._carrier_to_steps_pos(site)
+
+ await self._send_command_plc(f"WR DM0 {m}") # carousel number
+ await self._send_command_plc(f"WR DM23 {step_size}") # pitch of plate in mm
+ await self._send_command_plc(f"WR DM25 {pos_num}") # plate
+ await self._send_command_plc(f"WR DM5 {n}") # plate position in carousel
+ await self._send_command_plc("ST 1910") # move shovel to barcode reading position
+
+ barcode = await self.barcode_scanner.scan()
+ logger.info(f"Scanned barcode: {barcode.data}")
+ return barcode
+
+ def serialize(self) -> dict:
+ return {
+ **super().serialize(),
+ "port": self.io_plc.port,
+ }
+
+ @classmethod
+ def deserialize(cls, data: dict):
+ return cls(port=data["port"])
diff --git a/pylabrobot/storage/liconic/racks.py b/pylabrobot/storage/liconic/racks.py
new file mode 100644
index 00000000000..0fc340d43b7
--- /dev/null
+++ b/pylabrobot/storage/liconic/racks.py
@@ -0,0 +1,435 @@
+from typing import Optional
+
+from pylabrobot.resources import Coordinate
+from pylabrobot.resources.carrier import PlateCarrier, PlateHolder
+
+
+def _liconic_rack(
+ name: str,
+ pitch: int,
+ steps: int,
+ site_height: int,
+ num_sites: int,
+ model: str,
+ total_height: Optional[int] = 505, # 645 and 1210 for STX 500 and STX1000 only
+ bicarousel: Optional[bool] = False, # for STX500 and STX1000 only
+):
+ start = 17.2 # rough height of first plate position
+ pitch = (pitch,)
+ steps = (steps,)
+ bicarousel = (bicarousel,)
+ return PlateCarrier(
+ name=name,
+ size_x=109, # based off cytomat rack dimensions roughly the same
+ size_y=142,
+ size_z=total_height,
+ sites={
+ i: PlateHolder(
+ size_x=85.48,
+ size_y=127.27,
+ # estimates
+ size_z=max(site_height, total_height - site_height) if i == num_sites - 1 else site_height,
+ name=f"{name} - {i + 1}",
+ pedestal_size_z=0,
+ ).at(
+ Coordinate(
+ x=11.76, # estimate
+ y=0,
+ z=start + site_height * i,
+ )
+ )
+ for i in range(num_sites)
+ },
+ model=model,
+ )
+
+
+""" The motor step size used to set DM23 in the Liconic is calculated using the known step size of
+ for 23mm pitch which is 788 and for 50 mm which is 1713.
+
+ Therefore for the other pitch sizes: step size = pitch / (50 / 1713) and then rounded to nearest
+ whole number"""
+
+
+def liconic_rack_5mm_42(name: str):
+ return _liconic_rack(
+ name=name, pitch=11, steps=377, site_height=5, num_sites=42, model="liconic_rack_5mm_42"
+ )
+
+
+def liconic_rack_5mm_55(name: str):
+ return _liconic_rack(
+ name=name,
+ pitch=11,
+ steps=377,
+ site_height=5,
+ num_sites=55,
+ model="liconic_rack_5mm_55",
+ total_height=645,
+ bicarousel=True,
+ )
+
+
+def liconic_rack_5mm_111(name: str):
+ return _liconic_rack(
+ name=name,
+ pitch=11,
+ steps=377,
+ site_height=5,
+ num_sites=111,
+ model="liconic_rack_5mm_111",
+ total_height=1210,
+ bicarousel=True,
+ )
+
+
+def liconic_rack_11mm_28(name: str):
+ return _liconic_rack(
+ name=name, pitch=17, steps=582, site_height=11, num_sites=28, model="liconic_rack_11mm_28"
+ )
+
+
+def liconic_rack_11mm_37(name: str):
+ return _liconic_rack(
+ name=name,
+ pitch=17,
+ steps=582,
+ site_height=11,
+ num_sites=37,
+ model="liconic_rack_11mm_37",
+ total_height=645,
+ bicarousel=True,
+ )
+
+
+def liconic_rack_11mm_72(name: str):
+ return _liconic_rack(
+ name=name,
+ pitch=17,
+ steps=582,
+ site_height=11,
+ num_sites=72,
+ model="liconic_rack_11mm_72",
+ total_height=1210,
+ bicarousel=True,
+ )
+
+
+def liconic_rack_12mm_27(name: str):
+ return _liconic_rack(
+ name=name, pitch=18, steps=617, site_height=12, num_sites=27, model="liconic_rack_12mm_27"
+ )
+
+
+def liconic_rack_12mm_35(name: str):
+ return _liconic_rack(
+ name=name,
+ pitch=18,
+ steps=617,
+ site_height=12,
+ num_sites=35,
+ model="liconic_rack_12mm_35",
+ total_height=645,
+ bicarousel=True,
+ )
+
+
+def liconic_rack_12mm_68(name: str):
+ return _liconic_rack(
+ name=name,
+ pitch=18,
+ steps=617,
+ site_height=12,
+ num_sites=68,
+ model="liconic_rack_12mm_68",
+ total_height=1210,
+ bicarousel=True,
+ )
+
+
+def liconic_rack_17mm_22(name: str):
+ return _liconic_rack(
+ name=name, pitch=23, steps=788, site_height=17, num_sites=22, model="liconic_rack_17mm_22"
+ )
+
+
+def liconic_rack_17mm_28(name: str):
+ return _liconic_rack(
+ name=name,
+ pitch=23,
+ steps=788,
+ site_height=17,
+ num_sites=28,
+ model="liconic_rack_17mm_28",
+ total_height=645,
+ bicarousel=True,
+ )
+
+
+def liconic_rack_17mm_53(name: str):
+ return _liconic_rack(
+ name=name,
+ pitch=23,
+ steps=788,
+ site_height=17,
+ num_sites=53,
+ model="liconic_rack_17mm_53",
+ total_height=1210,
+ bicarousel=True,
+ )
+
+
+def liconic_rack_22mm_17(name: str):
+ return _liconic_rack(
+ name=name, pitch=28, steps=959, site_height=22, num_sites=17, model="liconic_rack_22mm_17"
+ )
+
+
+def liconic_rack_22mm_23(name: str):
+ return _liconic_rack(
+ name=name,
+ pitch=28,
+ steps=959,
+ site_height=22,
+ num_sites=23,
+ model="liconic_rack_22mm_23",
+ total_height=645,
+ bicarousel=True,
+ )
+
+
+def liconic_rack_22mm_43(name: str):
+ return _liconic_rack(
+ name=name,
+ pitch=28,
+ steps=959,
+ site_height=22,
+ num_sites=43,
+ model="liconic_rack_22mm_43",
+ total_height=1210,
+ bicarousel=True,
+ )
+
+
+def liconic_rack_23mm_17(name: str):
+ return _liconic_rack(
+ name=name, pitch=29, steps=994, site_height=23, num_sites=17, model="liconic_rack_23mm_17"
+ )
+
+
+def liconic_rack_23mm_22(name: str):
+ return _liconic_rack(
+ name=name,
+ pitch=29,
+ steps=994,
+ site_height=23,
+ num_sites=22,
+ model="liconic_rack_23mm_22",
+ total_height=645,
+ bicarousel=True,
+ )
+
+
+def liconic_rack_23mm_42(name: str):
+ return _liconic_rack(
+ name=name,
+ pitch=29,
+ steps=994,
+ site_height=23,
+ num_sites=42,
+ model="liconic_rack_23mm_42",
+ total_height=1210,
+ bicarousel=True,
+ )
+
+
+def liconic_rack_24mm_17(name: str):
+ return _liconic_rack(
+ name=name, pitch=30, steps=1028, site_height=24, num_sites=17, model="liconic_rack_24mm_17"
+ )
+
+
+def liconic_rack_24mm_21(name: str):
+ return _liconic_rack(
+ name=name,
+ pitch=30,
+ steps=1028,
+ site_height=24,
+ num_sites=21,
+ model="liconic_rack_24mm_21",
+ total_height=645,
+ bicarousel=True,
+ )
+
+
+def liconic_rack_24mm_41(name: str):
+ return _liconic_rack(
+ name=name,
+ pitch=30,
+ steps=1028,
+ site_height=24,
+ num_sites=41,
+ model="liconic_rack_24mm_41",
+ total_height=1210,
+ bicarousel=True,
+ )
+
+
+def liconic_rack_27mm_15(name: str):
+ return _liconic_rack(
+ name=name, pitch=33, steps=1131, site_height=27, num_sites=15, model="liconic_rack_27mm_15"
+ )
+
+
+def liconic_rack_27mm_19(name: str):
+ return _liconic_rack(
+ name=name,
+ pitch=33,
+ steps=1131,
+ site_height=27,
+ num_sites=19,
+ model="liconic_rack_27mm_19",
+ total_height=645,
+ bicarousel=True,
+ )
+
+
+def liconic_rack_27mm_37(name: str):
+ return _liconic_rack(
+ name=name,
+ pitch=33,
+ steps=1131,
+ site_height=27,
+ num_sites=37,
+ model="liconic_rack_27mm_37",
+ total_height=1210,
+ bicarousel=True,
+ )
+
+
+def liconic_rack_44mm_10(name: str):
+ return _liconic_rack(
+ name=name, pitch=50, steps=1713, site_height=44, num_sites=10, model="liconic_rack_44mm_10"
+ )
+
+
+def liconic_rack_44mm_13(name: str):
+ return _liconic_rack(
+ name=name,
+ pitch=50,
+ steps=1713,
+ site_height=44,
+ num_sites=13,
+ model="liconic_rack_44mm_13",
+ total_height=645,
+ bicarousel=True,
+ )
+
+
+def liconic_rack_44mm_25(name: str):
+ return _liconic_rack(
+ name=name,
+ pitch=50,
+ steps=1713,
+ site_height=44,
+ num_sites=25,
+ model="liconic_rack_44mm_25",
+ total_height=1210,
+ bicarousel=True,
+ )
+
+
+def liconic_rack_53mm_8(name: str):
+ return _liconic_rack(
+ name=name, pitch=59, steps=2021, site_height=53, num_sites=8, model="liconic_rack_53mm_8"
+ )
+
+
+def liconic_rack_53mm_10(name: str):
+ return _liconic_rack(
+ name=name,
+ pitch=59,
+ steps=2021,
+ site_height=53,
+ num_sites=10,
+ model="liconic_rack_53mm_10",
+ total_height=645,
+ bicarousel=True,
+ )
+
+
+def liconic_rack_53mm_21(name: str):
+ return _liconic_rack(
+ name=name,
+ pitch=59,
+ steps=2021,
+ site_height=53,
+ num_sites=21,
+ model="liconic_rack_53mm_21",
+ total_height=1210,
+ bicarousel=True,
+ )
+
+
+def liconic_rack_66mm_7(name: str):
+ return _liconic_rack(
+ name=name, pitch=72, steps=2467, site_height=66, num_sites=7, model="liconic_rack_66mm_7"
+ )
+
+
+def liconic_rack_66mm_8(name: str):
+ return _liconic_rack(
+ name=name,
+ pitch=72,
+ steps=2467,
+ site_height=66,
+ num_sites=8,
+ model="liconic_rack_66mm_8",
+ total_height=645,
+ bicarousel=True,
+ )
+
+
+def liconic_rack_66mm_17(name: str):
+ return _liconic_rack(
+ name=name,
+ pitch=72,
+ steps=2467,
+ site_height=66,
+ num_sites=17,
+ model="liconic_rack_66mm_17",
+ total_height=1210,
+ bicarousel=True,
+ )
+
+
+def liconic_rack_104mm_4(name: str):
+ return _liconic_rack(
+ name=name, pitch=110, steps=3563, site_height=104, num_sites=4, model="liconic_rack_104mm_4"
+ )
+
+
+def liconic_rack_104mm_5(name: str):
+ return _liconic_rack(
+ name=name,
+ pitch=110,
+ steps=3563,
+ site_height=104,
+ num_sites=5,
+ model="liconic_rack_104mm_5",
+ total_height=645,
+ bicarousel=True,
+ )
+
+
+def liconic_rack_104mm_11(name: str):
+ return _liconic_rack(
+ name=name,
+ pitch=110,
+ steps=3563,
+ site_height=104,
+ num_sites=11,
+ model="liconic_rack_104mm_11",
+ total_height=1210,
+ bicarousel=True,
+ )