diff --git a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py index 42a63aa6cb6..513d85a7095 100644 --- a/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py +++ b/pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py @@ -59,6 +59,8 @@ SingleChannelDispense, ) from pylabrobot.liquid_handling.utils import ( + MIN_SPACING_BETWEEN_CHANNELS, + MIN_SPACING_EDGE, get_tight_single_resource_liquid_op_offsets, get_wide_single_resource_liquid_op_offsets, ) @@ -1781,6 +1783,38 @@ async def probe_liquid_heights( search_speed: float = 10.0, n_replicates: int = 1, move_to_z_safety_after: bool = True, + allow_duplicate_channels: bool = False, + # Traverse height parameters (None = full Z safety, float = absolute Z position in mm) + min_traverse_height_at_beginning_of_command: Optional[float] = None, + min_traverse_height_during_command: Optional[float] = None, + z_position_at_end_of_command: Optional[float] = None, + # Shared detection parameters + channel_acceleration: float = 800.0, + post_detection_trajectory: Literal[0, 1] = 1, + post_detection_dist: float = 0.0, + # cLLD-specific parameters (used when lld_mode=GAMMA) + detection_edge: int = 10, + detection_drop: int = 2, + # pLLD-specific parameters (used when lld_mode=PRESSURE) + channel_speed_above_start_pos_search: float = 120.0, + z_drive_current_limit: int = 3, + tip_has_filter: bool = False, + dispense_drive_speed: float = 5.0, + dispense_drive_acceleration: float = 0.2, + dispense_drive_max_speed: float = 14.5, + dispense_drive_current_limit: int = 3, + plld_detection_edge: int = 30, + plld_detection_drop: int = 10, + clld_verification: bool = False, + clld_detection_edge: int = 10, + clld_detection_drop: int = 2, + max_delta_plld_clld: float = 5.0, + plld_mode: Optional[PressureLLDMode] = None, # defaults to PressureLLDMode.LIQUID + plld_foam_detection_drop: int = 30, + plld_foam_detection_edge_tolerance: int = 30, + plld_foam_ad_values: int = 30, + plld_foam_search_speed: float = 10.0, + dispense_back_plld_volume: Optional[float] = None, ) -> List[float]: """Probe liquid surface heights in containers using liquid level detection. @@ -1791,43 +1825,94 @@ async def probe_liquid_heights( Args: containers: List of Container objects to probe, one per channel. use_channels: Channel indices to use for probing (0-indexed). - resource_offsets: Optional XYZ offsets from container centers. Auto-calculated for single containers with odd channel counts to avoid center dividers. Defaults to container centers. - lld_mode: Detection mode - LLDMode(1) for capacitive, LLDMode(2) for pressure-based. Defaults to capacitive. + resource_offsets: Optional XYZ offsets from container centers. Auto-calculated for single + containers with odd channel counts to avoid center dividers. Defaults to container centers. + lld_mode: Detection mode - LLDMode(1) for capacitive, LLDMode(2) for pressure-based. + Defaults to capacitive. search_speed: Z-axis search speed in mm/s. Default 10.0 mm/s. n_replicates: Number of measurements per channel. Default 1. - move_to_z_safety_after: Whether to move channels to safe Z height after probing. Default True. + move_to_z_safety_after: Whether to move channels to safe Z height after probing. + Default True. + allow_duplicate_channels: Whether to allow the same channel index to appear multiple times + in use_channels. Default False. + min_traverse_height_at_beginning_of_command: Absolute Z height (mm) to move involved + channels to before the first batch. None (default) uses full Z safety. + min_traverse_height_during_command: Absolute Z height (mm) to move involved channels to + between batches (X groups and Y sub-batches). None (default) uses full Z safety. + z_position_at_end_of_command: Absolute Z height (mm) to move involved channels to after + probing (only used when move_to_z_safety_after is True). None (default) uses full Z safety. + channel_acceleration: Search acceleration in mm/s^2. Default 800.0. + post_detection_trajectory: Post-detection move mode (0 or 1). Default 1. + post_detection_dist: Distance in mm to move up after detection. Default 0.0. + detection_edge: cLLD edge steepness threshold (0-1023). Default 10. + detection_drop: cLLD offset after edge detection (0-1023). Default 2. + channel_speed_above_start_pos_search: pLLD speed above search start in mm/s. Default 120.0. + z_drive_current_limit: pLLD Z-drive current limit. Default 3. + tip_has_filter: Whether tip has a filter. Default False. + dispense_drive_speed: pLLD dispense drive speed in mm/s. Default 5.0. + dispense_drive_acceleration: pLLD dispense drive acceleration in mm/s^2. Default 0.2. + dispense_drive_max_speed: pLLD dispense drive max speed in mm/s. Default 14.5. + dispense_drive_current_limit: pLLD dispense drive current limit. Default 3. + plld_detection_edge: pLLD edge detection threshold. Default 30. + plld_detection_drop: pLLD detection drop. Default 10. + clld_verification: Enable cLLD verification in pLLD mode. Default False. + clld_detection_edge: cLLD verification edge threshold. Default 10. + clld_detection_drop: cLLD verification drop. Default 2. + max_delta_plld_clld: Max allowed delta between pLLD and cLLD in mm. Default 5.0. + plld_mode: Pressure LLD mode. Defaults to PressureLLDMode.LIQUID for pLLD. + plld_foam_detection_drop: Foam detection drop. Default 30. + plld_foam_detection_edge_tolerance: Foam detection edge tolerance. Default 30. + plld_foam_ad_values: Foam AD values. Default 30. + plld_foam_search_speed: Foam search speed in mm/s. Default 10.0. + dispense_back_plld_volume: Volume to dispense back after pLLD in uL. Default None. Returns: Mean of measured liquid heights for each container (mm from cavity bottom). Raises: RuntimeError: If channels lack tips. - NotImplementedError: If channels require different X positions. Notes: - All specified channels must have tips attached - - All channels must be at the same X position (single-row operation) + - Containers at different X positions are probed in sequential groups (single X carriage) - For single containers with odd channel counts, Y-offsets are applied to avoid center dividers (Hamilton 1000 uL spacing: 9mm, offset: 5.5mm) """ if use_channels is None: use_channels = list(range(len(containers))) + if len(use_channels) == 0: + raise ValueError("use_channels must not be empty.") + if not all(0 <= ch < self.num_channels for ch in use_channels): + raise ValueError( + f"All use_channels must be integers in range [0, {self.num_channels - 1}], " + f"got {use_channels}." + ) # Handle tip positioning ... if SINGLE container instance if resource_offsets is None: if len(set(containers)) == 1: - resource_offsets = get_wide_single_resource_liquid_op_offsets( - resource=containers[0], num_channels=len(containers) + container_size_y = containers[0].get_absolute_size_y() + # For non-consecutive channels (e.g. [0,1,2,5,6,7]), we must account for + # phantom intermediate channels (3,4) that physically exist between them. + # Compute offsets for the full channel range (min to max), then pick only + # the offsets corresponding to the actual channels being used. + num_channels_in_span = max(use_channels) - min(use_channels) + 1 + min_required = ( + MIN_SPACING_EDGE * 2 + (num_channels_in_span - 1) * MIN_SPACING_BETWEEN_CHANNELS ) + if container_size_y >= min_required: + all_offsets = get_wide_single_resource_liquid_op_offsets( + containers[0], num_channels_in_span + ) + min_ch = min(use_channels) + resource_offsets = [all_offsets[ch - min_ch] for ch in use_channels] - if len(use_channels) % 2 != 0: - # Hamilton 1000 uL channels are 9 mm apart, so offset by half the distance - # + extra for the potential central 'splash guard' - y_offset = 5.5 - resource_offsets = [ - resource_offsets[i] + Coordinate(0, y_offset, 0) for i in range(len(use_channels)) - ] + if num_channels_in_span % 2 != 0: + y_offset = 5.5 + resource_offsets = [offset + Coordinate(0, y_offset, 0) for offset in resource_offsets] + # else: container too small to fit all channels — fall back to center offsets. + # Y sub-batching will serialize channels that can't coexist. resource_offsets = resource_offsets or [Coordinate.zero()] * len(containers) @@ -1835,9 +1920,14 @@ async def probe_liquid_heights( if lld_mode not in {self.LLDMode.GAMMA, self.LLDMode.PRESSURE}: raise ValueError(f"LLDMode must be 1 (capacitive) or 2 (pressure-based), is {lld_mode}") + if not allow_duplicate_channels and len(use_channels) != len(set(use_channels)): + raise ValueError( + "use_channels must not contain duplicates. Set allow_duplicate_channels=True to override." + ) + if not len(containers) == len(use_channels) == len(resource_offsets): raise ValueError( - "Length of containers, use_channels, resource_offsets and tip_lengths must match." + "Length of containers, use_channels, and resource_offsets must match." f"are {len(containers)}, {len(use_channels)}, {len(resource_offsets)}." ) @@ -1848,111 +1938,202 @@ async def probe_liquid_heights( tip_lengths = [await self.request_tip_len_on_channel(channel_idx=idx) for idx in use_channels] - # Move channels to safe Z height before starting + # Move all channels to Z safety first (including uninvolved channels), then optionally + # lower only the involved channels to the requested traverse height. await self.move_all_channels_in_z_safety() + if min_traverse_height_at_beginning_of_command is not None: + await self.position_channels_in_z_direction( + {ch: min_traverse_height_at_beginning_of_command for ch in use_channels} + ) - # Check if all channels are on the same x position, then move there + # Compute X and Y positions for all containers x_pos = [ resource.get_location_wrt(self.deck, x="c", y="c", z="b").x + offset.x for resource, offset in zip(containers, resource_offsets) ] - if len(set(x_pos)) > 1: # TODO: implement - raise NotImplementedError( - "probe_liquid_heights is not yet supported for multiple x positions." - ) - await self.move_channel_x(0, x_pos[0]) - - # Move channels to their y positions y_pos = [ resource.get_location_wrt(self.deck, x="c", y="c", z="b").y + offset.y for resource, offset in zip(containers, resource_offsets) ] - await self.position_channels_in_y_direction( - {channel: y for channel, y in zip(use_channels, y_pos)} - ) - # Detect liquid heights + # Group indices by unique X position (preserving order of first appearance). + # Round to 0.1mm to avoid floating point splitting of same-position containers. + x_groups: Dict[float, List[int]] = {} + for i, x in enumerate(x_pos): + x_rounded = round(x, 1) + x_groups.setdefault(x_rounded, []).append(i) + + # Precompute detection function and kwargs (mode doesn't change between groups) + detect_func: Callable[..., Any] + if lld_mode == self.LLDMode.GAMMA: + detect_func = self._move_z_drive_to_liquid_surface_using_clld + extra_kwargs: dict = { + "channel_acceleration": channel_acceleration, + "detection_edge": detection_edge, + "detection_drop": detection_drop, + "post_detection_trajectory": post_detection_trajectory, + "post_detection_dist": post_detection_dist, + } + else: + detect_func = self._search_for_surface_using_plld + extra_kwargs = { + "channel_acceleration": channel_acceleration, + "channel_speed_above_start_pos_search": channel_speed_above_start_pos_search, + "z_drive_current_limit": z_drive_current_limit, + "tip_has_filter": tip_has_filter, + "dispense_drive_speed": dispense_drive_speed, + "dispense_drive_acceleration": dispense_drive_acceleration, + "dispense_drive_max_speed": dispense_drive_max_speed, + "dispense_drive_current_limit": dispense_drive_current_limit, + "plld_detection_edge": plld_detection_edge, + "plld_detection_drop": plld_detection_drop, + "clld_verification": clld_verification, + "clld_detection_edge": clld_detection_edge, + "clld_detection_drop": clld_detection_drop, + "max_delta_plld_clld": max_delta_plld_clld, + "plld_mode": plld_mode if plld_mode is not None else self.PressureLLDMode.LIQUID, + "plld_foam_detection_drop": plld_foam_detection_drop, + "plld_foam_detection_edge_tolerance": plld_foam_detection_edge_tolerance, + "plld_foam_ad_values": plld_foam_ad_values, + "plld_foam_search_speed": plld_foam_search_speed, + "dispense_back_plld_volume": dispense_back_plld_volume, + "post_detection_trajectory": post_detection_trajectory, + "post_detection_dist": post_detection_dist, + } + + # Detect liquid heights, iterating over X groups sequentially (single X carriage) absolute_heights_measurements: Dict[int, List[Optional[float]]] = { ch: [] for ch in use_channels } - lowest_immers_positions = [ - container.get_absolute_location("c", "c", "cavity_bottom").z - + tip_len - - self.DEFAULT_TIP_FITTING_DEPTH - for container, tip_len in zip(containers, tip_lengths) - ] - start_pos_searches = [ - container.get_absolute_location("c", "c", "t").z - + tip_len - - self.DEFAULT_TIP_FITTING_DEPTH - + 5 - for container, tip_len in zip(containers, tip_lengths) - ] - try: - for _ in range(n_replicates): - if lld_mode == self.LLDMode.GAMMA: - results = await asyncio.gather( - *[ - self._move_z_drive_to_liquid_surface_using_clld( - channel_idx=channel, - lowest_immers_pos=lip, - start_pos_search=sps, - channel_speed=search_speed, - ) - for channel, lip, sps in zip( - use_channels, lowest_immers_positions, start_pos_searches + is_first_x_group = True + prev_indices: Optional[List[int]] = None + for _, indices in x_groups.items(): + # Use the actual (non-rounded) X position of the first container in this group + group_x = x_pos[indices[0]] + + # Raise channels before moving X carriage (tips may be lowered from previous group) + if not is_first_x_group: + assert prev_indices is not None + if min_traverse_height_during_command is None: + await self.move_all_channels_in_z_safety() + else: + prev_channels = [use_channels[i] for i in prev_indices] + await self.position_channels_in_z_direction( + {ch: min_traverse_height_during_command for ch in prev_channels} + ) + await self.move_channel_x(0, group_x) + + # Within this X group, partition into Y sub-batches of channels that can be + # positioned simultaneously. Channels must be in descending Y order by channel + # index, with at least _channel_minimum_y_spacing between consecutive channels. + # Sort by channel index ascending, then greedily assign to compatible batches. + sorted_entries = sorted(indices, key=lambda i: use_channels[i]) + y_batches: List[List[int]] = [] # each batch is a list of indices into original arrays + for idx in sorted_entries: + ch = use_channels[idx] + y = y_pos[idx] + placed = False + for batch in y_batches: + last_idx = batch[-1] + last_ch = use_channels[last_idx] + last_y = y_pos[last_idx] + # Channel index increases, so Y must decrease by at least + # (channel gap) * minimum_spacing to leave room for intermediate channels. + if last_y - y >= (ch - last_ch) * self._channel_minimum_y_spacing: + batch.append(idx) + placed = True + break + if not placed: + y_batches.append([idx]) + + for y_batch_idx, y_batch in enumerate(y_batches): + batch_channels = [use_channels[i] for i in y_batch] + batch_containers = [containers[i] for i in y_batch] + batch_tip_lengths = [tip_lengths[i] for i in y_batch] + + # Raise channels before Y repositioning (skip first batch in each X group — + # already safe from the X-group-level raise or initial raise) + if y_batch_idx > 0: + if min_traverse_height_during_command is None: + await self.move_all_channels_in_z_safety() + else: + prev_batch_channels = [use_channels[i] for i in y_batches[y_batch_idx - 1]] + await self.position_channels_in_z_direction( + {ch: min_traverse_height_during_command for ch in prev_batch_channels} ) - ], - return_exceptions=True, - ) - else: - results = await asyncio.gather( - *[ - self._search_for_surface_using_plld( - channel_idx=channel, - lowest_immers_pos=lip, - start_pos_search=sps, - channel_speed=search_speed, - dispense_drive_speed=5.0, - plld_mode=self.PressureLLDMode.LIQUID, - clld_verification=False, - post_detection_dist=0.0, - ) - for channel, lip, sps in zip( - use_channels, lowest_immers_positions, start_pos_searches - ) - ], - return_exceptions=True, - ) + # Position the batch's channels in Y, including any intermediate channels + # (channels between batch members that aren't part of this batch) to ensure + # they don't violate the descending-order / minimum-spacing constraint. + y_positions: Dict[int, float] = {use_channels[i]: y_pos[i] for i in y_batch} + sorted_batch_chs = sorted(batch_channels) + for k in range(len(sorted_batch_chs) - 1): + ch_lo, ch_hi = sorted_batch_chs[k], sorted_batch_chs[k + 1] + for intermediate_ch in range(ch_lo + 1, ch_hi): + if intermediate_ch not in y_positions: + y_positions[intermediate_ch] = ( + y_positions[ch_lo] - (intermediate_ch - ch_lo) * self._channel_minimum_y_spacing + ) + await self.position_channels_in_y_direction(y_positions) + + # Compute Z search bounds for this batch + batch_lowest_immers = [ + container.get_absolute_location("c", "c", "cavity_bottom").z + + tip_len + - self.DEFAULT_TIP_FITTING_DEPTH + for container, tip_len in zip(batch_containers, batch_tip_lengths) + ] + batch_start_pos = [ + container.get_absolute_location("c", "c", "t").z + + tip_len + - self.DEFAULT_TIP_FITTING_DEPTH + + 5 + for container, tip_len in zip(batch_containers, batch_tip_lengths) + ] - # Get heights for ALL channels, handling failures for channels with no liquid - # (indexed 0 to self.num_channels-1) but only store for used channels - current_absolute_liquid_heights = await self.request_pip_height_last_lld() - for idx, (ch_idx, result) in enumerate(zip(use_channels, results)): - if isinstance(result, STARFirmwareError): - # Check if it's specifically the "no liquid found" error - error_msg = str(result).lower() - if "no liquid level found" in error_msg or "no liquid was present" in error_msg: - height = None # No liquid detected - this is expected - msg = ( - f"Channel {ch_idx}: No liquid detected. Could be because there is " - f"no liquid in container {containers[idx].name} or liquid level is too low." - ) - if lld_mode == self.LLDMode.GAMMA: - msg += " Consider using pressure-based LLD if liquid is believed to exist." - logger.warning(msg) - else: - # Some other firmware error - re-raise it - raise result - elif isinstance(result, Exception): - # Some other unexpected error - re-raise it - raise result - else: - height = current_absolute_liquid_heights[ch_idx] - absolute_heights_measurements[ch_idx].append(height) + # Run n_replicates detection loop for this batch + for _ in range(n_replicates): + results = await asyncio.gather( + *[ + detect_func( + channel_idx=channel, + lowest_immers_pos=lip, + start_pos_search=sps, + channel_speed=search_speed, + **extra_kwargs, + ) + for channel, lip, sps in zip(batch_channels, batch_lowest_immers, batch_start_pos) + ], + return_exceptions=True, + ) + + # Get heights for ALL channels, handling failures for channels with no liquid + current_absolute_liquid_heights = await self.request_pip_height_last_lld() + for local_idx, (ch_idx, result) in enumerate(zip(batch_channels, results)): + orig_idx = y_batch[local_idx] + if isinstance(result, STARFirmwareError): + error_msg = str(result).lower() + if "no liquid level found" in error_msg or "no liquid was present" in error_msg: + height = None + msg = ( + f"Channel {ch_idx}: No liquid detected. Could be because there is " + f"no liquid in container {containers[orig_idx].name} or liquid level " + f"is too low." + ) + if lld_mode == self.LLDMode.GAMMA: + msg += " Consider using pressure-based LLD if liquid is believed to exist." + logger.warning(msg) + else: + raise result + elif isinstance(result, Exception): + raise result + else: + height = current_absolute_liquid_heights[ch_idx] + absolute_heights_measurements[ch_idx].append(height) + prev_indices = y_batches[-1] # last Y batch's indices, for Z raise on next X group + is_first_x_group = False except: await self.move_all_channels_in_z_safety() raise @@ -1983,7 +2164,12 @@ async def probe_liquid_heights( ) if move_to_z_safety_after: - await self.move_all_channels_in_z_safety() + if z_position_at_end_of_command is None: + await self.move_all_channels_in_z_safety() + else: + await self.position_channels_in_z_direction( + {ch: z_position_at_end_of_command for ch in use_channels} + ) return relative_to_well @@ -2016,8 +2202,7 @@ async def probe_liquid_volumes( Volumes in each container (uL). Raises: - ValueError: If any container doesn't support height-to-volume conversion (raised by probe_liquid_heights). - NotImplementedError: If channels require different X positions. + ValueError: If any container doesn't support height-to-volume conversion. Notes: - Delegates all motion, LLD, validation, and safety logic to probe_liquid_heights @@ -2474,6 +2659,7 @@ async def aspirate( use_channels=use_channels, resource_offsets=[op.offset for op in ops], move_to_z_safety_after=False, + allow_duplicate_channels=True, ) # override minimum traversal height because we don't want to move channels up. we are already above the liquid. @@ -2836,6 +3022,7 @@ async def dispense( use_channels=use_channels, resource_offsets=[op.offset for op in ops], move_to_z_safety_after=False, + allow_duplicate_channels=True, ) # override minimum traversal height because we don't want to move channels up. we are already above the liquid.