From c1a17c0921de17419779c0375a51efc6bd0e5cbc Mon Sep 17 00:00:00 2001 From: CaseyBatten Date: Mon, 13 Jan 2025 18:13:35 -0500 Subject: [PATCH] move lid labware invalidation and deck slot support checking --- .../protocol_api/core/engine/protocol.py | 52 ++++++--- .../protocol_engine/commands/load_lid.py | 6 +- .../protocol_engine/commands/move_lid.py | 101 +++--------------- .../protocol_engine/state/labware.py | 14 ++- .../protocol_engine/state/update_types.py | 19 ++++ 5 files changed, 82 insertions(+), 110 deletions(-) diff --git a/api/src/opentrons/protocol_api/core/engine/protocol.py b/api/src/opentrons/protocol_api/core/engine/protocol.py index 47a5186e553..d3b71210fc5 100644 --- a/api/src/opentrons/protocol_api/core/engine/protocol.py +++ b/api/src/opentrons/protocol_api/core/engine/protocol.py @@ -443,7 +443,7 @@ def move_labware( existing_module_ids=list(self._module_cores_by_id.keys()), ) - def move_lid( + def move_lid( # noqa: C901 self, source_location: Union[DeckSlotName, StagingSlotName, LabwareCore], new_location: Union[ @@ -467,9 +467,13 @@ def move_lid( else: strategy = LabwareMovementStrategy.MANUAL_MOVE_WITHOUT_PAUSE - if isinstance(source_location, DeckSlotName) or isinstance(source_location, StagingSlotName): + if isinstance(source_location, DeckSlotName) or isinstance( + source_location, StagingSlotName + ): # Find the source labware at the provided deck slot - labware_in_slot = self._engine_client.state.labware.get_by_slot(source_location) + labware_in_slot = self._engine_client.state.labware.get_by_slot( + source_location + ) if labware_in_slot is None: raise LabwareNotLoadedOnLabwareError( "Lid cannot be loaded on non-labware position." @@ -478,11 +482,9 @@ def move_lid( labware = LabwareCore(labware_in_slot.id, self._engine_client) else: labware = source_location - + # if this is a labware stack, we need to find the labware at the top of the stack - if labware_validation.is_lid_stack( - labware.load_name - ): + if labware_validation.is_lid_stack(labware.load_name): lid_id = self._engine_client.state.labware.get_highest_child_labware( labware.labware_id ) @@ -509,21 +511,33 @@ def move_lid( else None ) - if isinstance(new_location, DeckSlotName) or isinstance(new_location, StagingSlotName): + if isinstance(new_location, DeckSlotName) or isinstance( + new_location, StagingSlotName + ): # Find the destination labware at the provided deck slot - destination_labware_in_slot = self._engine_client.state.labware.get_by_slot(new_location) + destination_labware_in_slot = self._engine_client.state.labware.get_by_slot( + new_location + ) if destination_labware_in_slot is None: to_location = self._convert_labware_location(location=new_location) else: - highest_child_location = self._engine_client.state.labware.get_highest_child_labware( - destination_labware_in_slot.id + highest_child_location = ( + self._engine_client.state.labware.get_highest_child_labware( + destination_labware_in_slot.id + ) + ) + to_location = self._convert_labware_location( + location=LabwareCore(highest_child_location, self._engine_client) ) - to_location = self._convert_labware_location(location=LabwareCore(highest_child_location, self._engine_client)) elif isinstance(new_location, LabwareCore): - highest_child_location = self._engine_client.state.labware.get_highest_child_labware( + highest_child_location = ( + self._engine_client.state.labware.get_highest_child_labware( new_location.labware_id ) - to_location = self._convert_labware_location(location=LabwareCore(highest_child_location, self._engine_client)) + ) + to_location = self._convert_labware_location( + location=LabwareCore(highest_child_location, self._engine_client) + ) else: to_location = self._convert_labware_location(location=new_location) @@ -559,8 +573,14 @@ def move_lid( # If we end up create a new lid stack, return the lid stack parent_location = self._engine_client.state.labware.get_location(lid_id) - if isinstance(parent_location, OnLabwareLocation) and labware_validation.is_lid_stack(self._engine_client.state.labware.get_load_name(parent_location.labwareId)): - return LabwareCore(labware_id=parent_location.labwareId, engine_client=self._engine_client) + if isinstance( + parent_location, OnLabwareLocation + ) and labware_validation.is_lid_stack( + self._engine_client.state.labware.get_load_name(parent_location.labwareId) + ): + return LabwareCore( + labware_id=parent_location.labwareId, engine_client=self._engine_client + ) return None def _resolve_module_hardware( diff --git a/api/src/opentrons/protocol_engine/commands/load_lid.py b/api/src/opentrons/protocol_engine/commands/load_lid.py index 4f2e49c7447..323b1ab4271 100644 --- a/api/src/opentrons/protocol_engine/commands/load_lid.py +++ b/api/src/opentrons/protocol_engine/commands/load_lid.py @@ -99,9 +99,9 @@ async def execute(self, params: LoadLidParams) -> SuccessData[LoadLidResult]: state_update = StateUpdate() # In the case of lids being loaded on top of other labware, set the parent labware's lid - state_update.set_lid( - parent_labware_id=params.location.labwareId, - lid_id=loaded_labware.labware_id, + state_update.set_lids( + parent_labware_ids=[params.location.labwareId], + lid_ids=[loaded_labware.labware_id], ) state_update.set_loaded_labware( diff --git a/api/src/opentrons/protocol_engine/commands/move_lid.py b/api/src/opentrons/protocol_engine/commands/move_lid.py index 14ad6517fb9..34808c018a5 100644 --- a/api/src/opentrons/protocol_engine/commands/move_lid.py +++ b/api/src/opentrons/protocol_engine/commands/move_lid.py @@ -1,7 +1,7 @@ """Models and implementation for the ``moveLid`` command.""" from __future__ import annotations -from typing import TYPE_CHECKING, Optional, Type, Any, Dict, List +from typing import TYPE_CHECKING, Optional, Type, Any, List from pydantic.json_schema import SkipJsonSchema from pydantic import BaseModel, Field @@ -133,7 +133,7 @@ async def execute(self, params: MoveLidParams) -> _ExecuteReturn: # noqa: C901 self._state_view.labware.get_definition(params.labwareId) ): raise ValueError( - f"MoveLid command can only move labware with allowed role 'lid'. {self._state_view.labware.get_load_name(params.labwareId)}" + f"MoveLid command can only move labware with allowed role 'lid'. {self._state_view.labware.get_load_name(params.labwareId)}" ) # Allow propagation of LabwareNotLoadedError. @@ -192,6 +192,13 @@ async def execute(self, params: MoveLidParams) -> _ExecuteReturn: # noqa: C901 ) elif isinstance(params.newLocation, DeckSlotLocation): + if ( + current_labware_definition.parameters.isDeckSlotCompatible is not None + and not current_labware_definition.parameters.isDeckSlotCompatible + ): + raise ValueError( + f"Lid Labware {current_labware.loadName} cannot be moved onto a Deck Slot." + ) self._state_view.addressable_areas.raise_if_area_not_in_deck_configuration( params.newLocation.slotName.id ) @@ -325,8 +332,9 @@ async def execute(self, params: MoveLidParams) -> _ExecuteReturn: # noqa: C901 self._state_view.labware.get_load_name(current_labware.location.labwareId) ): # if the source location is a labware stack, then this is the final lid in the stack and remove it - # NEW FUNCTION IN STATE UPDATE TO MAKE INVALIDATED LOCATION - raise ValueError("Lid Stack Invalidated") + state_update.set_labware_invalidated( + labware_id=current_labware.location.labwareId + ) elif ( self._state_view.labware.get_lid_by_labware_id( current_labware.location.labwareId @@ -400,91 +408,6 @@ async def execute(self, params: MoveLidParams) -> _ExecuteReturn: # noqa: C901 parent_labware_ids=parent_updates, lid_ids=lid_updates, ) - # if len(parent_updates) > 1: - # raise ValueError(f"parents: {parent_updates} {self._state_view.labware.get_load_name(parent_updates[0])} lids: {lid_updates} --- labware location of the lid object currently: {current_labware.location} desired location: {available_new_location}") - # state_update.set_lids( - # parent_labware_ids=parent_updates, - # lid_ids=lid_updates, - # ) - - # parent_updates: List[str] = [] - # lid_updates: List[str | None] = [] - # # If the Lid originated on a parent labware, update the parent labware - - # parent_labware = self._state_view.labware.get_labware_by_lid_id( - # lid_id=params.labwareId - # ) - # if parent_labware: - # if labware_validation.is_lid_stack(load_name=parent_labware.loadName): - # # Move the empty Lid Stack Object to the Invalidated location - # # CASEY NOTE: NEED NEW STATE UPDATE TO INVALIDATE LID STACK - # state_update.set_labware_location( - # labware_id=parent_labware.id, - # new_location="invalidated", - # new_offset_id=None, - # ) - # else: - # parent_updates.append(parent_labware.id) - # lid_updates.append(None) - - # # If moving to a location with no lid stack, create one - # if isinstance(available_new_location, DeckSlotLocation) or ( - # isinstance(available_new_location, OnLabwareLocation) - # and labware_validation.validate_definition_is_adapter( - # self._state_view.labware.get_definition( - # available_new_location.labwareId - # ) - # ) - # ): - # # we will need to generate a labware ID for a new lid stack - # lid_stack_object = await self._equipment.load_labware( - # load_name=_LID_STACK_PE_LABWARE, - # namespace=_LID_STACK_PE_NAMESPACE, - # version=_LID_STACK_PE_VERSION, - # location=available_new_location, - # labware_id=None, - # ) - # if not labware_validation.validate_definition_is_system( - # lid_stack_object.definition - # ): - # raise ProtocolEngineError( - # message="Lid Stack Labware Object Labware Definition does not contain required allowed role 'system'." - # ) - # # we will need to state update to add the lid stack to this position in space - # state_update.set_loaded_labware( - # definition=lid_stack_object.definition, - # labware_id=lid_stack_object.labware_id, - # offset_id=lid_stack_object.offsetId, - # display_name=None, - # location=available_new_location, - # ) - - # # Update the labware location to the new lid stack - # state_update.set_labware_location( - # labware_id=params.labwareId, - # new_location=OnLabwareLocation(labwareId=lid_stack_object.labware_id), - # new_offset_id=new_offset_id, - # ) - # else: - # state_update.set_labware_location( - # labware_id=params.labwareId, - # new_location=available_new_location, - # new_offset_id=new_offset_id, - # ) - # if isinstance( - # available_new_location, OnLabwareLocation - # ) and not labware_validation.is_lid_stack( - # self._state_view.labware.get_load_name(available_new_location.labwareId) - # ): - # parent_updates.append(available_new_location.labwareId) - # lid_updates.append(params.labwareId) - # # Update relevant Lid States - # state_update.set_lids( - # parent_labware_ids=parent_updates, - # lid_ids=lid_updates, - # ) - # if len(parent_updates) > 1: - # raise ValueError(f"parents: {parent_updates} lids: {lid_updates}") return SuccessData( public=MoveLidResult(offsetId=new_offset_id), diff --git a/api/src/opentrons/protocol_engine/state/labware.py b/api/src/opentrons/protocol_engine/state/labware.py index 7f820ea64a5..d272520690f 100644 --- a/api/src/opentrons/protocol_engine/state/labware.py +++ b/api/src/opentrons/protocol_engine/state/labware.py @@ -49,6 +49,7 @@ LabwareMovementOffsetData, OnDeckLabwareLocation, OFF_DECK_LOCATION, + INVALIDATED_LOCATION, ) from ..actions import ( Action, @@ -303,6 +304,15 @@ def _set_labware_location(self, state_update: update_types.StateUpdate) -> None: self._state.labware_by_id[labware_id].location = new_location + def _set_labware_invalidated(self, state_update: update_types.StateUpdate) -> None: + labware_invalidation_update = state_update.invalidate_labware + if labware_invalidation_update != update_types.NO_CHANGE: + labware_id = labware_invalidation_update.labware_id + + self._state.labware_by_id[labware_id].offsetId = None + + self._state.labware_by_id[labware_id].location = INVALIDATED_LOCATION + class LabwareView: """Read-only labware state view.""" @@ -482,7 +492,7 @@ def get_labware_stack( return self.get_labware_stack(labware_stack) return labware_stack - def get_lid_by_labware_id(self, labware_id) -> LoadedLabware | None: + def get_lid_by_labware_id(self, labware_id: str) -> LoadedLabware | None: """Get the Lid Labware that is currently on top of a given labware, if there is one.""" lid_id = self._state.labware_by_id[labware_id].lid_id if lid_id: @@ -966,7 +976,7 @@ def raise_if_labware_cannot_be_stacked( # noqa: C901 for lw in labware_stack: if not labware_validation.validate_definition_is_adapter( self.get_definition(lw.id) - ): + ) and not labware_validation.is_lid_stack(self.get_load_name(lw.id)): stack_without_adapters.append(lw) if len(stack_without_adapters) >= self.get_labware_stacking_maximum( top_labware_definition diff --git a/api/src/opentrons/protocol_engine/state/update_types.py b/api/src/opentrons/protocol_engine/state/update_types.py index 66c0d5f11d8..d48421e074a 100644 --- a/api/src/opentrons/protocol_engine/state/update_types.py +++ b/api/src/opentrons/protocol_engine/state/update_types.py @@ -104,6 +104,14 @@ class LabwareLocationUpdate: """The ID of the labware's new offset, for its new location.""" +@dataclasses.dataclass +class LabwareInvalidationUpdate: + """Invalidate a labware's location.""" + + labware_id: str + """The ID of the already-loaded labware.""" + + @dataclasses.dataclass class LoadedLabwareUpdate: """An update that loads a new labware.""" @@ -362,6 +370,8 @@ class StateUpdate: labware_location: LabwareLocationUpdate | NoChangeType = NO_CHANGE + invalidate_labware: LabwareInvalidationUpdate | NoChangeType = NO_CHANGE + loaded_labware: LoadedLabwareUpdate | NoChangeType = NO_CHANGE loaded_lid_stack: LoadedLidStackUpdate | NoChangeType = NO_CHANGE @@ -543,6 +553,15 @@ def set_lids( ) return self + def set_labware_invalidated( + self: Self, + *, + labware_id: str, + ) -> Self: + """Invalidate a labware's location. See `LabwareInvalidationUpdate`.""" + self.invalidate_labware = LabwareInvalidationUpdate(labware_id=labware_id) + return self + def set_load_pipette( self: Self, pipette_id: str,