diff --git a/docs/user_guide/01_material-handling/thermocycling/inheco-odtc.ipynb b/docs/user_guide/01_material-handling/thermocycling/inheco-odtc.ipynb new file mode 100644 index 00000000000..82220fb2dcc --- /dev/null +++ b/docs/user_guide/01_material-handling/thermocycling/inheco-odtc.ipynb @@ -0,0 +1,310 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Inheco ODTC (On Deck Thermal Cycler)\n", + "\n", + "The Inheco ODTC is an on-deck thermal cycler designed for automated PCR workflows. It features:\n", + "\n", + "- Precise temperature control for PCR cycling\n", + "- Heated lid to prevent condensation\n", + "- Motorized door for automated plate handling\n", + "- SiLA 2 communication interface\n", + "\n", + "**Specifications:**\n", + "- Temperature range: 4°C to 99°C\n", + "- Heating/cooling rate: up to 4.4°C/s\n", + "- 96-well plate format\n", + "\n", + "See the [Inheco ODTC product page](https://www.inheco.com/odtc.html) for more information." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "\n", + "## Setup\n", + "\n", + "The ODTC communicates over Ethernet using the SiLA 2 protocol. You'll need:\n", + "1. The IP address of the ODTC\n", + "2. Network connectivity between your computer and the ODTC" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from pylabrobot.thermocycling.inheco import ODTC\n", + "\n", + "odtc = ODTC(\n", + " name=\"odtc\",\n", + " ip=\"169.254.151.99\", # Replace with your ODTC's IP address\n", + ")\n", + "await odtc.setup()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "\n", + "## Door Control\n", + "\n", + "Open and close the door for plate access:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "await odtc.open_lid()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "await odtc.close_lid()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "\n", + "## Temperature Control\n", + "\n", + "### Reading Sensor Data\n", + "\n", + "Get current temperatures from all sensors:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "sensor_data = await odtc.get_sensor_data()\n", + "print(sensor_data)\n", + "# Example output:\n", + "# {'Mount': 25.0, 'Mount_Monitor': 25.1, 'Lid': 30.0, 'Lid_Monitor': 30.1,\n", + "# 'Ambient': 22.0, 'PCB': 28.0, 'Heatsink': 26.0, 'Heatsink_TEC': 25.5}" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Setting Block Temperature\n", + "\n", + "Set a constant block temperature. Note that the ODTC uses a \"pre-method\" approach which takes several minutes to stabilize:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "await odtc.set_block_temperature([37.0]) # Set to 37°C" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Check current block temperature\n", + "temp = await odtc.get_block_current_temperature()\n", + "print(f\"Block temperature: {temp[0]}°C\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Deactivating Temperature Control" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "await odtc.deactivate_block()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "\n", + "## Running PCR Protocols\n", + "\n", + "The ODTC can run complex PCR protocols defined using `Protocol`, `Stage`, and `Step` objects.\n", + "\n", + "### Defining a Protocol\n", + "\n", + "A protocol consists of stages, each containing steps with temperature and hold time. Stages can repeat multiple times for cycling." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from pylabrobot.thermocycling.standard import Protocol, Stage, Step\n", + "\n", + "# Example: Standard 3-step PCR protocol\n", + "pcr_protocol = Protocol(\n", + " stages=[\n", + " # Initial denaturation\n", + " Stage(\n", + " steps=[Step(temperature=[95.0], hold_seconds=300)], # 95°C for 5 min\n", + " repeats=1\n", + " ),\n", + " # PCR cycling (30 cycles)\n", + " Stage(\n", + " steps=[\n", + " Step(temperature=[95.0], hold_seconds=30), # Denature: 95°C for 30s\n", + " Step(temperature=[55.0], hold_seconds=30), # Anneal: 55°C for 30s\n", + " Step(temperature=[72.0], hold_seconds=60), # Extend: 72°C for 60s\n", + " ],\n", + " repeats=30\n", + " ),\n", + " # Final extension\n", + " Stage(\n", + " steps=[Step(temperature=[72.0], hold_seconds=600)], # 72°C for 10 min\n", + " repeats=1\n", + " ),\n", + " # Hold\n", + " Stage(\n", + " steps=[Step(temperature=[4.0], hold_seconds=0)], # 4°C hold\n", + " repeats=1\n", + " ),\n", + " ]\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Running the Protocol" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "await odtc.run_protocol(\n", + " protocol=pcr_protocol,\n", + " block_max_volume=20.0, # Maximum sample volume in µL\n", + " start_block_temperature=25.0, # Starting block temperature\n", + " start_lid_temperature=105.0, # Lid temperature (typically 105°C to prevent condensation)\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Custom Ramp Rates\n", + "\n", + "You can specify custom temperature ramp rates for each step:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Protocol with custom ramp rates\n", + "custom_protocol = Protocol(\n", + " stages=[\n", + " Stage(\n", + " steps=[\n", + " Step(temperature=[95.0], hold_seconds=60, rate=4.4), # Fast ramp (4.4°C/s)\n", + " Step(temperature=[60.0], hold_seconds=30, rate=2.0), # Slower ramp (2.0°C/s)\n", + " ],\n", + " repeats=1\n", + " ),\n", + " ]\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "await odtc.run_protocol(\n", + " protocol=custom_protocol,\n", + " block_max_volume=25.0,\n", + " start_block_temperature=25.0,\n", + " start_lid_temperature=105.0,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "---\n", + "\n", + "## Closing the Connection" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "await odtc.stop()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "env", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.24" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/docs/user_guide/01_material-handling/thermocycling/thermocycling.md b/docs/user_guide/01_material-handling/thermocycling/thermocycling.md index b08df7730c1..7b8e1dee2ee 100644 --- a/docs/user_guide/01_material-handling/thermocycling/thermocycling.md +++ b/docs/user_guide/01_material-handling/thermocycling/thermocycling.md @@ -13,4 +13,5 @@ Thermocyclers are essential for temperature-controlled processes like PCR (Polym ## Supported Thermocyclers +- [Inheco ODTC](inheco-odtc.ipynb) - Opentrons Thermocycler diff --git a/pylabrobot/storage/inheco/scila/inheco_sila_interface.py b/pylabrobot/storage/inheco/scila/inheco_sila_interface.py index 64b505215aa..7197d6a213b 100644 --- a/pylabrobot/storage/inheco/scila/inheco_sila_interface.py +++ b/pylabrobot/storage/inheco/scila/inheco_sila_interface.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import datetime import http.server import logging import random @@ -12,7 +13,13 @@ from dataclasses import dataclass from typing import Any, Optional, Tuple -from pylabrobot.storage.inheco.scila.soap import XSI, soap_decode, soap_encode +from pylabrobot.storage.inheco.scila.soap import ( + XSI, + _localname, + soap_body_payload, + soap_decode, + soap_encode, +) SOAP_RESPONSE_ResponseEventResponse = """ """ +SOAP_RESPONSE_DataEventResponse = """ + + + + 1 + Success + PT0S + 0 + + + +""" + + def _get_local_ip(machine_ip: str) -> str: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: @@ -57,6 +79,15 @@ def _get_local_ip(machine_ip: str) -> str: return local_ip +class SiLAError(RuntimeError): + def __init__(self, code: int, message: str, command: str, details: Optional[dict] = None): + self.code = code + self.message = message + self.command = command + self.details = details or {} + super().__init__(f"Command {command} failed with code {code}: '{message}'") + + class InhecoSiLAInterface: @dataclass(frozen=True) class _HTTPRequest: @@ -188,33 +219,50 @@ async def _on_http(self, req: _HTTPRequest) -> bytes: cmd = self._pending - if cmd is not None and not cmd.fut.done(): - response_event = soap_decode(req.body.decode("utf-8")) - if "ResponseEvent" in response_event: - request_id = response_event["ResponseEvent"].get("requestId") - if request_id != cmd.request_id: - self._logger.warning("Request ID does not match pending command.") - else: - return_value = response_event["ResponseEvent"].get("returnValue", {}) - return_code = return_value.get("returnCode") - if return_code != 3: # error - err_msg = return_value.get("message", "Unknown error").replace("\n", " ") + try: + xml_str = req.body.decode("utf-8") + payload = soap_body_payload(xml_str) + tag_local = _localname(payload.tag) + + if cmd is not None and not cmd.fut.done() and tag_local == "ResponseEvent": + response_event = soap_decode(xml_str) + if response_event["ResponseEvent"].get("requestId") == cmd.request_id: + ret = response_event["ResponseEvent"].get("returnValue", {}) + rc = ret.get("returnCode") + if rc != 3: # 3=Success cmd.fut.set_exception( - RuntimeError(f"Command {cmd.name} failed with code {return_code}: '{err_msg}'") + SiLAError(rc, ret.get("message", "").replace(chr(10), " "), cmd.name, details=ret) ) else: - response_data = response_event["ResponseEvent"].get("responseData", "") - root = ET.fromstring(response_data) - cmd.fut.set_result(root) - else: - self._logger.warning("No pending command to match response to.") + cmd.fut.set_result( + ET.fromstring(d) + if (d := response_event["ResponseEvent"].get("responseData")) + else ET.Element("EmptyResponse") + ) + + if tag_local == "DataEvent": + try: + raw = next(e.text for e in payload.iter() if _localname(e.tag) == "dataValue") + series = ET.fromstring(ET.fromstring(raw).find(".//AnyData").text).findall( + ".//dataSeries" + ) + data = {} + for s in series: + val = s.findall(".//integerValue")[-1].text + unit = s.get("unit") + data[s.get("nameId")] = f"{val} {unit}" if unit else val + print(f"[{datetime.datetime.now().strftime('%H:%M:%S.%f')[:-3]}] [SiLA DataEvent] {data}") + except: + pass + return SOAP_RESPONSE_DataEventResponse.encode("utf-8") + + if tag_local == "StatusEvent": + return SOAP_RESPONSE_StatusEventResponse.encode("utf-8") + return SOAP_RESPONSE_ResponseEventResponse.encode("utf-8") - if "ResponseEvent" in req.body.decode("utf-8"): + except Exception as e: + self._logger.error(f"Error handling event: {e}") return SOAP_RESPONSE_ResponseEventResponse.encode("utf-8") - if "StatusEvent" in req.body.decode("utf-8"): - return SOAP_RESPONSE_StatusEventResponse.encode("utf-8") - self._logger.warning("Unknown event type received.") - return SOAP_RESPONSE_ResponseEventResponse.encode("utf-8") def _get_return_code_and_message(self, command_name: str, response: Any) -> Tuple[int, str]: resp_level = response.get(f"{command_name}Response", {}) # first level diff --git a/pylabrobot/thermocycling/inheco/__init__.py b/pylabrobot/thermocycling/inheco/__init__.py new file mode 100644 index 00000000000..d3ff376c539 --- /dev/null +++ b/pylabrobot/thermocycling/inheco/__init__.py @@ -0,0 +1,2 @@ +from .odtc import ODTC +from .odtc_backend import ODTCBackend diff --git a/pylabrobot/thermocycling/inheco/odtc.py b/pylabrobot/thermocycling/inheco/odtc.py new file mode 100644 index 00000000000..773b88a66bb --- /dev/null +++ b/pylabrobot/thermocycling/inheco/odtc.py @@ -0,0 +1,47 @@ +from typing import Dict, Optional + +from pylabrobot.resources import Coordinate, Rotation +from pylabrobot.thermocycling.thermocycler import Thermocycler + +from .odtc_backend import ODTCBackend + + +class ODTC(Thermocycler): + """Inheco ODTC (On Deck Thermal Cycler).""" + + def __init__( + self, + name: str, + ip: str, + client_ip: Optional[str] = None, + child_location: Coordinate = Coordinate(0, 0, 0), + rotation: Optional[Rotation] = None, + ): + """ + Initialize the Inheco ODTC. + + Args: + name: The name of the resource. + ip: The IP address of the ODTC. + client_ip: The IP address of the client (this computer). If None, it will be automatically + determined. + child_location: The location of the child resource (plate) relative to the ODTC. + rotation: The rotation of the ODTC. + """ + backend = ODTCBackend(ip=ip, client_ip=client_ip) + super().__init__( + name=name, + size_x=159.0, # Approximate dimensions, verify with spec + size_y=245.0, + size_z=228.0, + backend=backend, + child_location=child_location, + category="thermocycler", + model="ODTC", + ) + if rotation is not None: + self.rotation = rotation + + async def get_sensor_data(self) -> Dict[str, float]: + """Get all sensor data from the device.""" + return await self.backend.get_sensor_data() # type: ignore diff --git a/pylabrobot/thermocycling/inheco/odtc_backend.py b/pylabrobot/thermocycling/inheco/odtc_backend.py new file mode 100644 index 00000000000..d734a830fe9 --- /dev/null +++ b/pylabrobot/thermocycling/inheco/odtc_backend.py @@ -0,0 +1,392 @@ +import asyncio +import datetime +import time +import xml.etree.ElementTree as ET +from typing import Any, Dict, List, Optional + +from pylabrobot.storage.inheco.scila.inheco_sila_interface import InhecoSiLAInterface, SiLAError +from pylabrobot.thermocycling.backend import ThermocyclerBackend +from pylabrobot.thermocycling.standard import BlockStatus, LidStatus, Protocol + + +def _format_number(n: Any) -> str: + if n is None: + return "0" + try: + f = float(n) + return str(int(f)) if f.is_integer() else str(f) + except: + return str(n) + + +def _recursive_find_key(data: Any, key: str) -> Any: + if isinstance(data, dict): + if key in data: + return data[key] + for v in data.values(): + item = _recursive_find_key(v, key) + if item is not None: + return item + elif isinstance(data, list): + for v in data: + item = _recursive_find_key(v, key) + if item is not None: + return item + elif hasattr(data, "find"): + node = data.find(f".//{key}") + if node is not None: + return node.text + if str(data.tag).endswith(key): + return data.text + return None + + +class ODTCBackend(ThermocyclerBackend): + def __init__(self, ip: str, client_ip: Optional[str] = None) -> None: + self._sila_interface = InhecoSiLAInterface(client_ip=client_ip, machine_ip=ip) + self._block_target_temp: Optional[float] = None + self._lid_target_temp: Optional[float] = None + self._current_sensors: Dict[str, float] = {} + self._temp_update_time = 0 + + async def setup(self) -> None: + await self._sila_interface.setup() + await self._reset_and_initialize() + + async def stop(self): + await self._sila_interface.close() + + async def _reset_and_initialize(self) -> None: + try: + event_uri = f"http://{self._sila_interface._client_ip}:{self._sila_interface.bound_port}/" + await self._sila_interface.send_command( + command="Reset", deviceId="ODTC", eventReceiverURI=event_uri, simulationMode=False + ) + await self._sila_interface.send_command("Initialize") + except Exception as e: + print(f"Warning during ODTC initialization: {e}") + + async def _wait_for_idle(self, timeout=30): + """Wait until device state is not Busy.""" + start = time.time() + while time.time() - start < timeout: + root = await self._sila_interface.send_command("GetStatus") + st = _recursive_find_key(root, "state") + if st and st in ["idle", "standby"]: + return + await asyncio.sleep(1) + raise RuntimeError("Timeout waiting for ODTC idle state") + + # ------------------------------------------------------------------------- + # Lid + # ------------------------------------------------------------------------- + + async def open_lid(self): + await self._sila_interface.send_command("OpenDoor") + + async def close_lid(self): + await self._sila_interface.send_command("CloseDoor") + + async def get_lid_open(self) -> bool: + raise NotImplementedError() + + async def get_lid_status(self) -> LidStatus: + raise NotImplementedError() + + # ------------------------------------------------------------------------- + # Temperature Helpers + # ------------------------------------------------------------------------- + + async def get_sensor_data(self) -> Dict[str, float]: + """ + Get all sensor data from the device. + Returns a dictionary with keys: 'Mount', 'Mount_Monitor', 'Lid', 'Lid_Monitor', + 'Ambient', 'PCB', 'Heatsink', 'Heatsink_TEC'. + Values are in degrees Celsius. + """ + if time.time() - self._temp_update_time < 2.0 and self._current_sensors: + return self._current_sensors + + try: + root = await self._sila_interface.send_command("ReadActualTemperature") + + embedded_xml = _recursive_find_key(root, "String") + + if embedded_xml and isinstance(embedded_xml, str): + sensor_root = ET.fromstring(embedded_xml) + + data = {} + for child in sensor_root: + if child.tag and child.text: + try: + # Values are integers scaled by 100 (3700 -> 37.0 C) + data[child.tag] = float(child.text) / 100.0 + except ValueError: + pass + + self._current_sensors = data + self._temp_update_time = time.time() + return self._current_sensors + except Exception as e: + print(f"Error reading sensor data: {e}") + pass + return self._current_sensors + + async def _run_pre_method(self, block_temp: float, lid_temp: float, dynamic_time: bool = True): + """ + Define and run a PreMethod (Hold) used for setting constant temperature. + WARNING: ODTC pre-methods take 7-10 minutes to pre-warm evenly the block and lid before a run. + This command is not ideal for quick temperature changes. + dynamic_time: if True, method will complete in less than 10 minutes (like 7) + if False, command holds temp for 10 minutes before proceeding + """ + now = datetime.datetime.now().astimezone() + method_name = f"PLR_Hold_{now.strftime('%Y%m%d_%H%M%S')}" + + methods_xml = ( + f'' + f"" + f"false" + f'' + f"{_format_number(block_temp)}" + f"{_format_number(lid_temp)}" + f"{'true' if dynamic_time else 'false'}" + f"" + f"" + ) + + ps = ET.Element("ParameterSet") + pm = ET.SubElement(ps, "Parameter", name="MethodsXML") + ET.SubElement(pm, "String").text = methods_xml + params_xml = ET.tostring(ps, encoding="unicode") + + await self.stop_method() + await self._wait_for_idle() + + await self._sila_interface.send_command("SetParameters", paramsXML=params_xml) + await self._sila_interface.send_command("ExecuteMethod", methodName=method_name) + + # ------------------------------------------------------------------------- + # Block Temperature + # ------------------------------------------------------------------------- + + async def set_block_temperature(self, temperature: List[float], dynamic_time: bool = True): + if not temperature: + return + self._block_target_temp = temperature[0] + lid = self._lid_target_temp if self._lid_target_temp is not None else 105.0 + await self._run_pre_method(self._block_target_temp, lid, dynamic_time=dynamic_time) + + async def deactivate_block(self): + await self.stop_method() + + async def get_block_current_temperature(self) -> List[float]: + temps = await self.get_sensor_data() + return [temps.get("Mount", 0.0)] + + async def get_block_target_temperature(self) -> List[float]: + raise NotImplementedError() + + async def get_block_status(self) -> BlockStatus: + raise NotImplementedError() + + # ------------------------------------------------------------------------- + # Lid Temperature + # ------------------------------------------------------------------------- + + async def set_lid_temperature(self, temperature: List[float], dynamic_time: bool = True): + if not temperature: + return + self._lid_target_temp = temperature[0] + block = self._block_target_temp if self._block_target_temp is not None else 25.0 + await self._run_pre_method(block, self._lid_target_temp, dynamic_time=dynamic_time) + + async def deactivate_lid(self): + raise NotImplementedError() + + async def get_lid_current_temperature(self) -> List[float]: + temps = await self.get_sensor_data() + return [temps.get("Lid", 0.0)] + + async def get_lid_target_temperature(self) -> List[float]: + raise NotImplementedError() + + # ------------------------------------------------------------------------- + # Protocol + # ------------------------------------------------------------------------- + + def _generate_method_xml( + self, + protocol: Protocol, + block_max_volume: float, + start_block_temperature: float, + start_lid_temperature: float, + post_heating: bool, + method_name: Optional[str] = None, + **kwargs, + ) -> tuple[str, str]: + if not method_name: + method_name = f"PLR_Protocol_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}" + + if block_max_volume < 30.0: + fluid_quantity = "0" + elif block_max_volume < 75.0: + fluid_quantity = "1" + else: + fluid_quantity = "2" + + # Use ISO format with timezone for strict SiLA compliance (e.g. 2026-01-06T18:39:30.503368-08:00) + now = datetime.datetime.now().astimezone() + now_str = now.isoformat() + + root = ET.Element("MethodSet") + ET.SubElement(root, "DeleteAllMethods").text = "false" + + method_elem = ET.SubElement( + root, "Method", methodName=method_name, creator="PyLabRobot", dateTime=now_str + ) + ET.SubElement(method_elem, "Variant").text = "960000" + ET.SubElement(method_elem, "PlateType").text = "0" + ET.SubElement(method_elem, "FluidQuantity").text = fluid_quantity + + ET.SubElement(method_elem, "PostHeating").text = "true" if post_heating else "false" + + ET.SubElement(method_elem, "StartBlockTemperature").text = _format_number( + start_block_temperature + ) + ET.SubElement(method_elem, "StartLidTemperature").text = _format_number(start_lid_temperature) + + # Step defaults + def_slope = _format_number(kwargs.get("slope", "4.4")) + def_os_slope1 = _format_number(kwargs.get("overshoot_slope1", "0.1")) + def_os_temp = _format_number(kwargs.get("overshoot_temperature", "0")) + def_os_time = _format_number(kwargs.get("overshoot_time", "0")) + def_os_slope2 = _format_number(kwargs.get("overshoot_slope2", "0.1")) + pid_number = _format_number(kwargs.get("pid_number", "1")) + + step_counter = 1 + for stage_idx, stage in enumerate(protocol.stages): + if not stage.steps: + continue + start_of_stage = step_counter + + for i, step in enumerate(stage.steps): + b_temp = step.temperature[0] if step.temperature else 25 + l_temp = start_lid_temperature # Keep lid at start temp, could be extended to support step-specific lid temps + duration = step.hold_seconds + s_slope = _format_number(step.rate) if step.rate is not None else def_slope + + s = ET.SubElement(method_elem, "Step") + ET.SubElement(s, "Number").text = str(step_counter) + ET.SubElement(s, "Slope").text = s_slope + ET.SubElement(s, "PlateauTemperature").text = _format_number(b_temp) + ET.SubElement(s, "PlateauTime").text = _format_number(duration) + + # OverShoot params - use defaults passed to function + ET.SubElement(s, "OverShootSlope1").text = def_os_slope1 + ET.SubElement(s, "OverShootTemperature").text = def_os_temp + ET.SubElement(s, "OverShootTime").text = def_os_time + ET.SubElement(s, "OverShootSlope2").text = def_os_slope2 + + # Loop logic on the last step of the stage + if i == len(stage.steps) - 1 and stage.repeats > 1: + ET.SubElement(s, "GotoNumber").text = str(start_of_stage) + ET.SubElement(s, "LoopNumber").text = str(stage.repeats - 1) + else: + ET.SubElement(s, "GotoNumber").text = "0" + ET.SubElement(s, "LoopNumber").text = "0" + + ET.SubElement(s, "PIDNumber").text = pid_number + ET.SubElement(s, "LidTemp").text = _format_number(l_temp) + step_counter += 1 + + # Default PID + pid_set = ET.SubElement(method_elem, "PIDSet") + pid = ET.SubElement(pid_set, "PID", number=pid_number) + defaults = { + "PHeating": "60", + "PCooling": "80", + "IHeating": "250", + "ICooling": "100", + "DHeating": "10", + "DCooling": "10", + "PLid": "100", + "ILid": "70", + } + for k, v in defaults.items(): + # Allow kwargs to override specific PID values, e.g. PHeating="70" + val = kwargs.get(k, v) + ET.SubElement(pid, k).text = _format_number(val) + + xml_str = '' + ET.tostring(root, encoding="unicode") + return xml_str, method_name + + async def run_protocol( + self, + protocol: Protocol, + block_max_volume: float = 20.0, + start_block_temperature: float = 25.0, + start_lid_temperature: float = 30.0, + post_heating: bool = True, + method_name: Optional[str] = None, + **kwargs, + ): + """ + Run a PCR protocol. + + Args: + protocol: The protocol to run. + block_max_volume: Maximum block volume in microliters. + start_block_temperature: The starting block temperature in C. + start_lid_temperature: The starting lid temperature in C. + post_heating: Whether to keep last temperature after method end. + method_name: Optional name for the method on the device. + **kwargs: Additional XML parameters for the ODTC method, including: + slope, overshoot_slope1, overshoot_temperature, overshoot_time, overshoot_slope2, + pid_number, and PID parameters (PHeating, PCooling, etc.) + """ + + method_xml, method_name = self._generate_method_xml( + protocol, + block_max_volume, + start_block_temperature, + start_lid_temperature, + post_heating, + method_name=method_name, + **kwargs, + ) + + ps = ET.Element("ParameterSet") + pm = ET.SubElement(ps, "Parameter", name="MethodsXML") + ET.SubElement(pm, "String").text = method_xml + params_xml = ET.tostring(ps, encoding="unicode") + + print("[ODTC] Uploading MethodSet...") + await self._sila_interface.send_command("SetParameters", paramsXML=params_xml) + + print(f"[ODTC] Executing method '{method_name}'") + try: + await self._sila_interface.send_command("ExecuteMethod", methodName=method_name) + except SiLAError as e: + if e.code == 12: # SuccessWithWarning + print(f"[ODTC Warning] {e.message}") + else: + raise e + + async def stop_method(self): + await self._sila_interface.send_command("StopMethod") + + async def get_hold_time(self) -> float: + raise NotImplementedError() + + async def get_current_cycle_index(self) -> int: + raise NotImplementedError() + + async def get_total_cycle_count(self) -> int: + raise NotImplementedError() + + async def get_current_step_index(self) -> int: + raise NotImplementedError() + + async def get_total_step_count(self) -> int: + raise NotImplementedError()