Skip to content

Commit

Permalink
feat(api): handle stalls in addressable area moves
Browse files Browse the repository at this point in the history
Both move to addressable are and move to addressable area for drop tip
handle stalls now.
  • Loading branch information
sfoster1 committed Nov 18, 2024
1 parent 7839fde commit 452efc0
Show file tree
Hide file tree
Showing 5 changed files with 276 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,29 @@
from opentrons_shared_data.pipette.types import PipetteNameType

from ..errors import LocationNotAccessibleByPipetteError
from ..state import update_types
from ..types import DeckPoint, AddressableOffsetVector
from ..types import AddressableOffsetVector
from ..resources import fixture_validation
from .pipetting_common import (
PipetteIdMixin,
)
from .movement_common import (
MovementMixin,
DestinationPositionResult,
move_to_addressable_area,
StallOrCollisionError,
)
from .command import (
AbstractCommandImpl,
BaseCommand,
BaseCommandCreate,
SuccessData,
DefinedErrorData,
)
from .command import AbstractCommandImpl, BaseCommand, BaseCommandCreate, SuccessData
from ..errors.error_occurrence import ErrorOccurrence

if TYPE_CHECKING:
from ..execution import MovementHandler
from ..state.state import StateView
from ..resources.model_utils import ModelUtils

MoveToAddressableAreaCommandType = Literal["moveToAddressableArea"]

Expand Down Expand Up @@ -76,25 +83,29 @@ class MoveToAddressableAreaResult(DestinationPositionResult):
pass


_ExecuteReturn = (
SuccessData[MoveToAddressableAreaResult] | DefinedErrorData[StallOrCollisionError]
)


class MoveToAddressableAreaImplementation(
AbstractCommandImpl[
MoveToAddressableAreaParams, SuccessData[MoveToAddressableAreaResult]
]
AbstractCommandImpl[MoveToAddressableAreaParams, _ExecuteReturn]
):
"""Move to addressable area command implementation."""

def __init__(
self, movement: MovementHandler, state_view: StateView, **kwargs: object
self,
movement: MovementHandler,
state_view: StateView,
model_utils: ModelUtils,
**kwargs: object,
) -> None:
self._movement = movement
self._state_view = state_view
self._model_utils = model_utils

async def execute(
self, params: MoveToAddressableAreaParams
) -> SuccessData[MoveToAddressableAreaResult]:
async def execute(self, params: MoveToAddressableAreaParams) -> _ExecuteReturn:
"""Move the requested pipette to the requested addressable area."""
state_update = update_types.StateUpdate()

self._state_view.addressable_areas.raise_if_area_not_in_deck_configuration(
params.addressableAreaName
)
Expand All @@ -117,7 +128,9 @@ async def execute(
f"Cannot move pipette to staging slot {params.addressableAreaName}"
)

x, y, z = await self._movement.move_to_addressable_area(
result = await move_to_addressable_area(
movement=self._movement,
model_utils=self._model_utils,
pipette_id=params.pipetteId,
addressable_area_name=params.addressableAreaName,
offset=params.offset,
Expand All @@ -127,22 +140,20 @@ async def execute(
stay_at_highest_possible_z=params.stayAtHighestPossibleZ,
highest_possible_z_extra_offset=extra_z_offset,
)
deck_point = DeckPoint.construct(x=x, y=y, z=z)
state_update.set_pipette_location(
pipette_id=params.pipetteId,
new_addressable_area_name=params.addressableAreaName,
new_deck_point=deck_point,
)

return SuccessData(
public=MoveToAddressableAreaResult(position=DeckPoint(x=x, y=y, z=z)),
state_update=state_update,
)
if isinstance(result, DefinedErrorData):
return result
else:
return SuccessData(
public=MoveToAddressableAreaResult(position=result.public.position),
state_update=result.state_update,
)


class MoveToAddressableArea(
BaseCommand[
MoveToAddressableAreaParams, MoveToAddressableAreaResult, ErrorOccurrence
MoveToAddressableAreaParams,
MoveToAddressableAreaResult,
StallOrCollisionError,
]
):
"""Move to addressable area command model."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,29 @@
from typing_extensions import Literal

from ..errors import LocationNotAccessibleByPipetteError
from ..state import update_types
from ..types import DeckPoint, AddressableOffsetVector
from ..types import AddressableOffsetVector
from ..resources import fixture_validation
from .pipetting_common import (
PipetteIdMixin,
)
from .movement_common import (
MovementMixin,
DestinationPositionResult,
move_to_addressable_area,
StallOrCollisionError,
)
from .command import (
AbstractCommandImpl,
BaseCommand,
BaseCommandCreate,
SuccessData,
DefinedErrorData,
)
from .command import AbstractCommandImpl, BaseCommand, BaseCommandCreate, SuccessData
from ..errors.error_occurrence import ErrorOccurrence

if TYPE_CHECKING:
from ..execution import MovementHandler
from ..state.state import StateView
from ..resources.model_utils import ModelUtils

MoveToAddressableAreaForDropTipCommandType = Literal["moveToAddressableAreaForDropTip"]

Expand Down Expand Up @@ -85,26 +92,32 @@ class MoveToAddressableAreaForDropTipResult(DestinationPositionResult):
pass


_ExecuteReturn = (
SuccessData[MoveToAddressableAreaForDropTipResult]
| DefinedErrorData[StallOrCollisionError]
)


class MoveToAddressableAreaForDropTipImplementation(
AbstractCommandImpl[
MoveToAddressableAreaForDropTipParams,
SuccessData[MoveToAddressableAreaForDropTipResult],
]
AbstractCommandImpl[MoveToAddressableAreaForDropTipParams, _ExecuteReturn]
):
"""Move to addressable area for drop tip command implementation."""

def __init__(
self, movement: MovementHandler, state_view: StateView, **kwargs: object
self,
movement: MovementHandler,
state_view: StateView,
model_utils: ModelUtils,
**kwargs: object,
) -> None:
self._movement = movement
self._state_view = state_view
self._model_utils = model_utils

async def execute(
self, params: MoveToAddressableAreaForDropTipParams
) -> SuccessData[MoveToAddressableAreaForDropTipResult]:
) -> _ExecuteReturn:
"""Move the requested pipette to the requested addressable area in preperation of a drop tip."""
state_update = update_types.StateUpdate()

self._state_view.addressable_areas.raise_if_area_not_in_deck_configuration(
params.addressableAreaName
)
Expand All @@ -122,7 +135,9 @@ async def execute(
else:
offset = params.offset

x, y, z = await self._movement.move_to_addressable_area(
result = await move_to_addressable_area(
movement=self._movement,
model_utils=self._model_utils,
pipette_id=params.pipetteId,
addressable_area_name=params.addressableAreaName,
offset=offset,
Expand All @@ -131,26 +146,22 @@ async def execute(
speed=params.speed,
ignore_tip_configuration=params.ignoreTipConfiguration,
)
deck_point = DeckPoint.construct(x=x, y=y, z=z)
state_update.set_pipette_location(
pipette_id=params.pipetteId,
new_addressable_area_name=params.addressableAreaName,
new_deck_point=deck_point,
)

return SuccessData(
public=MoveToAddressableAreaForDropTipResult(
position=DeckPoint(x=x, y=y, z=z)
),
state_update=state_update,
)
if isinstance(result, DefinedErrorData):
return result
else:
return SuccessData(
public=MoveToAddressableAreaForDropTipResult(
position=result.public.position,
),
state_update=result.state_update,
)


class MoveToAddressableAreaForDropTip(
BaseCommand[
MoveToAddressableAreaForDropTipParams,
MoveToAddressableAreaForDropTipResult,
ErrorOccurrence,
StallOrCollisionError,
]
):
"""Move to addressable area for drop tip command model."""
Expand Down
54 changes: 54 additions & 0 deletions api/src/opentrons/protocol_engine/commands/movement_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
DeckPoint,
CurrentWell,
MovementAxis,
AddressableOffsetVector,
)
from ..state.update_types import StateUpdate
from .command import SuccessData, DefinedErrorData
Expand Down Expand Up @@ -223,3 +224,56 @@ async def move_relative(
pipette_id=pipette_id, new_deck_point=deck_point
),
)


async def move_to_addressable_area(
movement: MovementHandler,
model_utils: ModelUtils,
pipette_id: str,
addressable_area_name: str,
offset: AddressableOffsetVector,
force_direct: bool = False,
minimum_z_height: float | None = None,
speed: float | None = None,
stay_at_highest_possible_z: bool = False,
ignore_tip_configuration: bool | None = True,
highest_possible_z_extra_offset: float | None = None,
) -> SuccessData[DestinationPositionResult] | DefinedErrorData[StallOrCollisionError]:
"""Move to an addressable area identified by name."""
try:
x, y, z = await movement.move_to_addressable_area(
pipette_id=pipette_id,
addressable_area_name=addressable_area_name,
offset=offset,
force_direct=force_direct,
minimum_z_height=minimum_z_height,
speed=speed,
stay_at_highest_possible_z=stay_at_highest_possible_z,
ignore_tip_configuration=ignore_tip_configuration,
highest_possible_z_extra_offset=highest_possible_z_extra_offset,
)
except StallOrCollisionDetectedError as e:
return DefinedErrorData(
public=StallOrCollisionError(
id=model_utils.generate_id(),
createdAt=model_utils.get_timestamp(),
wrappedErrors=[
ErrorOccurrence.from_failed(
id=model_utils.generate_id(),
createdAt=model_utils.get_timestamp(),
error=e,
)
],
),
state_update=StateUpdate().clear_all_pipette_locations(),
)
else:
deck_point = DeckPoint.construct(x=x, y=y, z=z)
return SuccessData(
public=DestinationPositionResult(position=deck_point),
state_update=StateUpdate().set_pipette_location(
pipette_id=pipette_id,
new_addressable_area_name=addressable_area_name,
new_deck_point=deck_point,
),
)
Loading

0 comments on commit 452efc0

Please sign in to comment.