Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 77 additions & 1 deletion pylabrobot/liquid_handling/liquid_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,66 @@ def _log_command(self, name: str, **kwargs) -> None:
params = ", ".join(f"{k}={self._format_param(v)}" for k, v in kwargs.items())
logger.debug("%s(%s)", name, params)

# Attempt to add function of repeated going into same well instead of raising error of well too small.
# This part changes the return into a list-valued argument to be accepted by the Backend and leave Scalars where required.

_SCALAR_BACKEND_KWARGS = {
"minimum_traverse_height_at_beginning_of_a_command",
"min_z_endpos",
}

def _normalize_backend_kwargs(self, backend_kwargs, n_ops: int, index: int | None = None):
"""Normalize backend kwargs to lists of length n_ops."""
normalized = {}

for key, val in backend_kwargs.items():
if key in self._SCALAR_BACKEND_KWARGS:
normalized[key] = val
continue
if val is None:
normalized[key] = None
elif isinstance(val, list):
if index is not None:
if index >= len(val):
raise ValueError(
f"Backend kwarg '{key}' has length {len(val)}, "
f"but channel index {index} was requested."
)
normalized[key] = [val[index]]
else:
normalized[key] = val
else:
normalized[key] = [val] * n_ops
return normalized

async def _run_backend_op(
self,
fn,
ops,
use_channels,
sequential: bool,
**backend_kwargs,
):
"""Run a backend liquid operation, sequentially if needed.

This is used to safely access a single resource with multiple channels
when geometry spacing is not possible.
"""
if sequential and len(use_channels) > 1:
# ⚠️ Debug log for sequential fallback
logger.debug(
"Sequential fallback active: operating one channel at a time " "with %d channels",
len(use_channels),
)

for i, (ch, op) in enumerate(zip(use_channels, ops)):
normalized_kwargs = self._normalize_backend_kwargs(backend_kwargs, n_ops=1, index=i)

await fn(ops=[op], use_channels=[ch], **normalized_kwargs)
else:
normalized_kwargs = self._normalize_backend_kwargs(backend_kwargs, n_ops=len(ops))
await fn(ops=ops, use_channels=use_channels, **normalized_kwargs)

def get_picked_up_resource(self) -> Optional[Resource]:
"""Get the resource that is currently picked up.

Expand Down Expand Up @@ -888,6 +948,10 @@ async def aspirate(
# If the user specified a single resource, but multiple channels to use, we will assume they
# want to space the channels evenly across the resource. Note that offsets are relative to the
# center of the resource.

# Adding Sequential function, but is default off.
sequential = False

if len(set(resources)) == 1:
resource = resources[0]
resources = [resource] * len(use_channels)
Expand All @@ -907,6 +971,9 @@ async def aspirate(
# add user defined offsets to the computed centers
offsets = [c + o for c, o in zip(center_offsets, offsets)]

# Detect sequential fallback
sequential = len(use_channels) > 1 and len(set((o.x, o.y, o.z) for o in offsets)) == 1

# create operations
aspirations = [
SingleChannelAspiration(
Expand Down Expand Up @@ -950,7 +1017,16 @@ async def aspirate(
# actually aspirate the liquid
error: Optional[Exception] = None
try:
await self.backend.aspirate(ops=aspirations, use_channels=use_channels, **backend_kwargs)
# Below is replaced with try sequential pipetting.
# await self.backend.aspirate(ops=aspirations, use_channels=use_channels, **backend_kwargs)
# Code for trying sequential pipetting.
await self._run_backend_op(
self.backend.aspirate,
ops=aspirations,
use_channels=use_channels,
sequential=sequential,
**backend_kwargs,
)
except Exception as e:
error = e

Expand Down
5 changes: 4 additions & 1 deletion pylabrobot/liquid_handling/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
def _get_centers_with_margin(dim_size: float, n: int, margin: float, min_spacing: float):
"""Get the centers of the channels with a minimum margin on the edges."""
if dim_size < margin * 2 + (n - 1) * min_spacing:
raise ValueError("Resource is too small to space channels.")
# raise ValueError("Resource is too small to space channels.")
# Returns middle of source well for sequential pipetting.
center = dim_size / 2
return [center] * n
if dim_size - (n - 1) * min_spacing <= min_spacing * 2:
remaining_space = dim_size - (n - 1) * min_spacing - margin * 2
return [margin + remaining_space / 2 + i * min_spacing for i in range(n)]
Expand Down