diff --git a/pylabrobot/liquid_handling/liquid_handler.py b/pylabrobot/liquid_handling/liquid_handler.py index 027ec486ab5..f20084b6244 100644 --- a/pylabrobot/liquid_handling/liquid_handler.py +++ b/pylabrobot/liquid_handling/liquid_handler.py @@ -327,6 +327,66 @@ def _log_command(self, name: str, **kwargs) -> None: params = ", ".join(f"{k}={self._format_param(v)}" for k, v in kwargs.items()) logger.debug("%s(%s)", name, params) + # Attempt to add function of repeated going into same well instead of raising error of well too small. + # This part changes the return into a list-valued argument to be accepted by the Backend and leave Scalars where required. + + _SCALAR_BACKEND_KWARGS = { + "minimum_traverse_height_at_beginning_of_a_command", + "min_z_endpos", + } + + def _normalize_backend_kwargs(self, backend_kwargs, n_ops: int, index: int | None = None): + """Normalize backend kwargs to lists of length n_ops.""" + normalized = {} + + for key, val in backend_kwargs.items(): + if key in self._SCALAR_BACKEND_KWARGS: + normalized[key] = val + continue + if val is None: + normalized[key] = None + elif isinstance(val, list): + if index is not None: + if index >= len(val): + raise ValueError( + f"Backend kwarg '{key}' has length {len(val)}, " + f"but channel index {index} was requested." + ) + normalized[key] = [val[index]] + else: + normalized[key] = val + else: + normalized[key] = [val] * n_ops + return normalized + + async def _run_backend_op( + self, + fn, + ops, + use_channels, + sequential: bool, + **backend_kwargs, + ): + """Run a backend liquid operation, sequentially if needed. + + This is used to safely access a single resource with multiple channels + when geometry spacing is not possible. + """ + if sequential and len(use_channels) > 1: + # ⚠️ Debug log for sequential fallback + logger.debug( + "Sequential fallback active: operating one channel at a time " "with %d channels", + len(use_channels), + ) + + for i, (ch, op) in enumerate(zip(use_channels, ops)): + normalized_kwargs = self._normalize_backend_kwargs(backend_kwargs, n_ops=1, index=i) + + await fn(ops=[op], use_channels=[ch], **normalized_kwargs) + else: + normalized_kwargs = self._normalize_backend_kwargs(backend_kwargs, n_ops=len(ops)) + await fn(ops=ops, use_channels=use_channels, **normalized_kwargs) + def get_picked_up_resource(self) -> Optional[Resource]: """Get the resource that is currently picked up. @@ -888,6 +948,10 @@ async def aspirate( # If the user specified a single resource, but multiple channels to use, we will assume they # want to space the channels evenly across the resource. Note that offsets are relative to the # center of the resource. + + # Adding Sequential function, but is default off. + sequential = False + if len(set(resources)) == 1: resource = resources[0] resources = [resource] * len(use_channels) @@ -907,6 +971,9 @@ async def aspirate( # add user defined offsets to the computed centers offsets = [c + o for c, o in zip(center_offsets, offsets)] + # Detect sequential fallback + sequential = len(use_channels) > 1 and len(set((o.x, o.y, o.z) for o in offsets)) == 1 + # create operations aspirations = [ SingleChannelAspiration( @@ -950,7 +1017,16 @@ async def aspirate( # actually aspirate the liquid error: Optional[Exception] = None try: - await self.backend.aspirate(ops=aspirations, use_channels=use_channels, **backend_kwargs) + # Below is replaced with try sequential pipetting. + # await self.backend.aspirate(ops=aspirations, use_channels=use_channels, **backend_kwargs) + # Code for trying sequential pipetting. + await self._run_backend_op( + self.backend.aspirate, + ops=aspirations, + use_channels=use_channels, + sequential=sequential, + **backend_kwargs, + ) except Exception as e: error = e diff --git a/pylabrobot/liquid_handling/utils.py b/pylabrobot/liquid_handling/utils.py index 0437be31037..b62b55cee9e 100644 --- a/pylabrobot/liquid_handling/utils.py +++ b/pylabrobot/liquid_handling/utils.py @@ -11,7 +11,10 @@ def _get_centers_with_margin(dim_size: float, n: int, margin: float, min_spacing: float): """Get the centers of the channels with a minimum margin on the edges.""" if dim_size < margin * 2 + (n - 1) * min_spacing: - raise ValueError("Resource is too small to space channels.") + # raise ValueError("Resource is too small to space channels.") + # Returns middle of source well for sequential pipetting. + center = dim_size / 2 + return [center] * n if dim_size - (n - 1) * min_spacing <= min_spacing * 2: remaining_space = dim_size - (n - 1) * min_spacing - margin * 2 return [margin + remaining_space / 2 + i * min_spacing for i in range(n)]