Skip to content

Commit

Permalink
move lid labware invalidation and deck slot support checking
Browse files Browse the repository at this point in the history
  • Loading branch information
CaseyBatten committed Jan 13, 2025
1 parent 069570c commit c1a17c0
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 110 deletions.
52 changes: 36 additions & 16 deletions api/src/opentrons/protocol_api/core/engine/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ def move_labware(
existing_module_ids=list(self._module_cores_by_id.keys()),
)

def move_lid(
def move_lid( # noqa: C901
self,
source_location: Union[DeckSlotName, StagingSlotName, LabwareCore],
new_location: Union[
Expand All @@ -467,9 +467,13 @@ def move_lid(
else:
strategy = LabwareMovementStrategy.MANUAL_MOVE_WITHOUT_PAUSE

if isinstance(source_location, DeckSlotName) or isinstance(source_location, StagingSlotName):
if isinstance(source_location, DeckSlotName) or isinstance(
source_location, StagingSlotName
):
# Find the source labware at the provided deck slot
labware_in_slot = self._engine_client.state.labware.get_by_slot(source_location)
labware_in_slot = self._engine_client.state.labware.get_by_slot(
source_location
)
if labware_in_slot is None:
raise LabwareNotLoadedOnLabwareError(
"Lid cannot be loaded on non-labware position."
Expand All @@ -478,11 +482,9 @@ def move_lid(
labware = LabwareCore(labware_in_slot.id, self._engine_client)
else:
labware = source_location

# if this is a labware stack, we need to find the labware at the top of the stack
if labware_validation.is_lid_stack(
labware.load_name
):
if labware_validation.is_lid_stack(labware.load_name):
lid_id = self._engine_client.state.labware.get_highest_child_labware(
labware.labware_id
)
Expand All @@ -509,21 +511,33 @@ def move_lid(
else None
)

if isinstance(new_location, DeckSlotName) or isinstance(new_location, StagingSlotName):
if isinstance(new_location, DeckSlotName) or isinstance(
new_location, StagingSlotName
):
# Find the destination labware at the provided deck slot
destination_labware_in_slot = self._engine_client.state.labware.get_by_slot(new_location)
destination_labware_in_slot = self._engine_client.state.labware.get_by_slot(
new_location
)
if destination_labware_in_slot is None:
to_location = self._convert_labware_location(location=new_location)
else:
highest_child_location = self._engine_client.state.labware.get_highest_child_labware(
destination_labware_in_slot.id
highest_child_location = (
self._engine_client.state.labware.get_highest_child_labware(
destination_labware_in_slot.id
)
)
to_location = self._convert_labware_location(
location=LabwareCore(highest_child_location, self._engine_client)
)
to_location = self._convert_labware_location(location=LabwareCore(highest_child_location, self._engine_client))
elif isinstance(new_location, LabwareCore):
highest_child_location = self._engine_client.state.labware.get_highest_child_labware(
highest_child_location = (
self._engine_client.state.labware.get_highest_child_labware(
new_location.labware_id
)
to_location = self._convert_labware_location(location=LabwareCore(highest_child_location, self._engine_client))
)
to_location = self._convert_labware_location(
location=LabwareCore(highest_child_location, self._engine_client)
)
else:
to_location = self._convert_labware_location(location=new_location)

Expand Down Expand Up @@ -559,8 +573,14 @@ def move_lid(

# If we end up create a new lid stack, return the lid stack
parent_location = self._engine_client.state.labware.get_location(lid_id)
if isinstance(parent_location, OnLabwareLocation) and labware_validation.is_lid_stack(self._engine_client.state.labware.get_load_name(parent_location.labwareId)):
return LabwareCore(labware_id=parent_location.labwareId, engine_client=self._engine_client)
if isinstance(
parent_location, OnLabwareLocation
) and labware_validation.is_lid_stack(
self._engine_client.state.labware.get_load_name(parent_location.labwareId)
):
return LabwareCore(
labware_id=parent_location.labwareId, engine_client=self._engine_client
)
return None

def _resolve_module_hardware(
Expand Down
6 changes: 3 additions & 3 deletions api/src/opentrons/protocol_engine/commands/load_lid.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ async def execute(self, params: LoadLidParams) -> SuccessData[LoadLidResult]:
state_update = StateUpdate()

# In the case of lids being loaded on top of other labware, set the parent labware's lid
state_update.set_lid(
parent_labware_id=params.location.labwareId,
lid_id=loaded_labware.labware_id,
state_update.set_lids(
parent_labware_ids=[params.location.labwareId],
lid_ids=[loaded_labware.labware_id],
)

state_update.set_loaded_labware(
Expand Down
101 changes: 12 additions & 89 deletions api/src/opentrons/protocol_engine/commands/move_lid.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Models and implementation for the ``moveLid`` command."""

from __future__ import annotations
from typing import TYPE_CHECKING, Optional, Type, Any, Dict, List
from typing import TYPE_CHECKING, Optional, Type, Any, List

from pydantic.json_schema import SkipJsonSchema
from pydantic import BaseModel, Field
Expand Down Expand Up @@ -133,7 +133,7 @@ async def execute(self, params: MoveLidParams) -> _ExecuteReturn: # noqa: C901
self._state_view.labware.get_definition(params.labwareId)
):
raise ValueError(
f"MoveLid command can only move labware with allowed role 'lid'. {self._state_view.labware.get_load_name(params.labwareId)}"
f"MoveLid command can only move labware with allowed role 'lid'. {self._state_view.labware.get_load_name(params.labwareId)}"
)

# Allow propagation of LabwareNotLoadedError.
Expand Down Expand Up @@ -192,6 +192,13 @@ async def execute(self, params: MoveLidParams) -> _ExecuteReturn: # noqa: C901
)

elif isinstance(params.newLocation, DeckSlotLocation):
if (
current_labware_definition.parameters.isDeckSlotCompatible is not None
and not current_labware_definition.parameters.isDeckSlotCompatible
):
raise ValueError(
f"Lid Labware {current_labware.loadName} cannot be moved onto a Deck Slot."
)
self._state_view.addressable_areas.raise_if_area_not_in_deck_configuration(
params.newLocation.slotName.id
)
Expand Down Expand Up @@ -325,8 +332,9 @@ async def execute(self, params: MoveLidParams) -> _ExecuteReturn: # noqa: C901
self._state_view.labware.get_load_name(current_labware.location.labwareId)
):
# if the source location is a labware stack, then this is the final lid in the stack and remove it
# NEW FUNCTION IN STATE UPDATE TO MAKE INVALIDATED LOCATION
raise ValueError("Lid Stack Invalidated")
state_update.set_labware_invalidated(
labware_id=current_labware.location.labwareId
)
elif (
self._state_view.labware.get_lid_by_labware_id(
current_labware.location.labwareId
Expand Down Expand Up @@ -400,91 +408,6 @@ async def execute(self, params: MoveLidParams) -> _ExecuteReturn: # noqa: C901
parent_labware_ids=parent_updates,
lid_ids=lid_updates,
)
# if len(parent_updates) > 1:
# raise ValueError(f"parents: {parent_updates} {self._state_view.labware.get_load_name(parent_updates[0])} lids: {lid_updates} --- labware location of the lid object currently: {current_labware.location} desired location: {available_new_location}")
# state_update.set_lids(
# parent_labware_ids=parent_updates,
# lid_ids=lid_updates,
# )

# parent_updates: List[str] = []
# lid_updates: List[str | None] = []
# # If the Lid originated on a parent labware, update the parent labware

# parent_labware = self._state_view.labware.get_labware_by_lid_id(
# lid_id=params.labwareId
# )
# if parent_labware:
# if labware_validation.is_lid_stack(load_name=parent_labware.loadName):
# # Move the empty Lid Stack Object to the Invalidated location
# # CASEY NOTE: NEED NEW STATE UPDATE TO INVALIDATE LID STACK
# state_update.set_labware_location(
# labware_id=parent_labware.id,
# new_location="invalidated",
# new_offset_id=None,
# )
# else:
# parent_updates.append(parent_labware.id)
# lid_updates.append(None)

# # If moving to a location with no lid stack, create one
# if isinstance(available_new_location, DeckSlotLocation) or (
# isinstance(available_new_location, OnLabwareLocation)
# and labware_validation.validate_definition_is_adapter(
# self._state_view.labware.get_definition(
# available_new_location.labwareId
# )
# )
# ):
# # we will need to generate a labware ID for a new lid stack
# lid_stack_object = await self._equipment.load_labware(
# load_name=_LID_STACK_PE_LABWARE,
# namespace=_LID_STACK_PE_NAMESPACE,
# version=_LID_STACK_PE_VERSION,
# location=available_new_location,
# labware_id=None,
# )
# if not labware_validation.validate_definition_is_system(
# lid_stack_object.definition
# ):
# raise ProtocolEngineError(
# message="Lid Stack Labware Object Labware Definition does not contain required allowed role 'system'."
# )
# # we will need to state update to add the lid stack to this position in space
# state_update.set_loaded_labware(
# definition=lid_stack_object.definition,
# labware_id=lid_stack_object.labware_id,
# offset_id=lid_stack_object.offsetId,
# display_name=None,
# location=available_new_location,
# )

# # Update the labware location to the new lid stack
# state_update.set_labware_location(
# labware_id=params.labwareId,
# new_location=OnLabwareLocation(labwareId=lid_stack_object.labware_id),
# new_offset_id=new_offset_id,
# )
# else:
# state_update.set_labware_location(
# labware_id=params.labwareId,
# new_location=available_new_location,
# new_offset_id=new_offset_id,
# )
# if isinstance(
# available_new_location, OnLabwareLocation
# ) and not labware_validation.is_lid_stack(
# self._state_view.labware.get_load_name(available_new_location.labwareId)
# ):
# parent_updates.append(available_new_location.labwareId)
# lid_updates.append(params.labwareId)
# # Update relevant Lid States
# state_update.set_lids(
# parent_labware_ids=parent_updates,
# lid_ids=lid_updates,
# )
# if len(parent_updates) > 1:
# raise ValueError(f"parents: {parent_updates} lids: {lid_updates}")

return SuccessData(
public=MoveLidResult(offsetId=new_offset_id),
Expand Down
14 changes: 12 additions & 2 deletions api/src/opentrons/protocol_engine/state/labware.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
LabwareMovementOffsetData,
OnDeckLabwareLocation,
OFF_DECK_LOCATION,
INVALIDATED_LOCATION,
)
from ..actions import (
Action,
Expand Down Expand Up @@ -303,6 +304,15 @@ def _set_labware_location(self, state_update: update_types.StateUpdate) -> None:

self._state.labware_by_id[labware_id].location = new_location

def _set_labware_invalidated(self, state_update: update_types.StateUpdate) -> None:
labware_invalidation_update = state_update.invalidate_labware
if labware_invalidation_update != update_types.NO_CHANGE:
labware_id = labware_invalidation_update.labware_id

self._state.labware_by_id[labware_id].offsetId = None

self._state.labware_by_id[labware_id].location = INVALIDATED_LOCATION


class LabwareView:
"""Read-only labware state view."""
Expand Down Expand Up @@ -482,7 +492,7 @@ def get_labware_stack(
return self.get_labware_stack(labware_stack)
return labware_stack

def get_lid_by_labware_id(self, labware_id) -> LoadedLabware | None:
def get_lid_by_labware_id(self, labware_id: str) -> LoadedLabware | None:
"""Get the Lid Labware that is currently on top of a given labware, if there is one."""
lid_id = self._state.labware_by_id[labware_id].lid_id
if lid_id:
Expand Down Expand Up @@ -966,7 +976,7 @@ def raise_if_labware_cannot_be_stacked( # noqa: C901
for lw in labware_stack:
if not labware_validation.validate_definition_is_adapter(
self.get_definition(lw.id)
):
) and not labware_validation.is_lid_stack(self.get_load_name(lw.id)):
stack_without_adapters.append(lw)
if len(stack_without_adapters) >= self.get_labware_stacking_maximum(
top_labware_definition
Expand Down
19 changes: 19 additions & 0 deletions api/src/opentrons/protocol_engine/state/update_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@ class LabwareLocationUpdate:
"""The ID of the labware's new offset, for its new location."""


@dataclasses.dataclass
class LabwareInvalidationUpdate:
"""Invalidate a labware's location."""

labware_id: str
"""The ID of the already-loaded labware."""


@dataclasses.dataclass
class LoadedLabwareUpdate:
"""An update that loads a new labware."""
Expand Down Expand Up @@ -362,6 +370,8 @@ class StateUpdate:

labware_location: LabwareLocationUpdate | NoChangeType = NO_CHANGE

invalidate_labware: LabwareInvalidationUpdate | NoChangeType = NO_CHANGE

loaded_labware: LoadedLabwareUpdate | NoChangeType = NO_CHANGE

loaded_lid_stack: LoadedLidStackUpdate | NoChangeType = NO_CHANGE
Expand Down Expand Up @@ -543,6 +553,15 @@ def set_lids(
)
return self

def set_labware_invalidated(
self: Self,
*,
labware_id: str,
) -> Self:
"""Invalidate a labware's location. See `LabwareInvalidationUpdate`."""
self.invalidate_labware = LabwareInvalidationUpdate(labware_id=labware_id)
return self

def set_load_pipette(
self: Self,
pipette_id: str,
Expand Down

0 comments on commit c1a17c0

Please sign in to comment.