Skip to content

Commit

Permalink
Perform linting
Browse files Browse the repository at this point in the history
  • Loading branch information
andchiind committed Feb 20, 2024
1 parent 270f9d6 commit 5f6bea1
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 23 deletions.
2 changes: 1 addition & 1 deletion src/isar_exr/api/energy_robotics_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ def get_battery_level(self, exr_robot_id: str) -> float:
except Exception as e:
message: str = "Could not check robot battery level"
self.logger.error(message)
raise RobotMissionStatusException( #TODO: occurs after obstacle
raise RobotMissionStatusException( # TODO: occurs after obstacle
error_description=message,
)

Expand Down
59 changes: 37 additions & 22 deletions src/isar_exr/robotinterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,10 @@ def create_new_stage(self) -> str:
stage_id: str = self.api.create_stage(site_id=settings.ROBOT_EXR_SITE_ID)
return stage_id

def update_site_with_tasks(self, tasks: List[Task]) -> List[str]: # Returns a list of POI IDs
new_stage_id = None
def update_site_with_tasks(
self, tasks: List[Task]
) -> List[str]: # Returns a list of POI IDs
new_stage_id: str = None
poi_ids: List[str] = []
is_possible_return_to_home_mission = False
steps_n = 0
Expand All @@ -119,48 +121,59 @@ def update_site_with_tasks(self, tasks: List[Task]) -> List[str]: # Returns a li
robot_pose: Pose = step.pose
if isinstance(step, InspectionStep):
customer_tag: str = task.tag_id + robot_pose.to_string()
existing_poi_id = self.api.get_point_of_interest_by_customer_tag(
customer_tag=customer_tag, site_id=settings.ROBOT_EXR_SITE_ID
existing_poi_id = (
self.api.get_point_of_interest_by_customer_tag(
customer_tag=customer_tag,
site_id=settings.ROBOT_EXR_SITE_ID,
)
)
if existing_poi_id == None:
new_stage_id: str = self.create_new_stage()
poi_id: str = (
self._create_and_add_poi(
task=task,
step=step,
robot_pose=robot_pose, # This pose is set by the previously received DriveToStep
stage_id=new_stage_id,
customer_tag=customer_tag,
)
new_stage_id = self.create_new_stage()
poi_id: str = self._create_and_add_poi(
task=task,
step=step,
robot_pose=robot_pose, # This pose is set by the previously received DriveToStep
stage_id=new_stage_id,
customer_tag=customer_tag,
)
poi_ids.append(poi_id)
else:
poi_ids.append(existing_poi_id)

if steps_n == 0 or (steps_n == 1 and is_possible_return_to_home_mission):
time.sleep(settings.API_SLEEP_TIME) # We need to sleep to allow events to reach flotilla in the right order
raise RobotMissionNotSupportedException("Robot does not support localisation or return to home mission")
time.sleep(
settings.API_SLEEP_TIME
) # We need to sleep to allow events to reach flotilla in the right order
raise RobotMissionNotSupportedException(
"Robot does not support localisation or return to home mission"
)

if new_stage_id is not None:
# We should only do the following if we changed the site
snapshot_id: str = self.api.commit_site_to_snapshot(stage_id=new_stage_id)
snapshot_id: str = self.api.commit_site_to_snapshot(
stage_id=new_stage_id
)

self.api.set_snapshot_as_head(
snapshot_id=snapshot_id, site_id=settings.ROBOT_EXR_SITE_ID
)
except Exception as e:
if new_stage_id is not None:
self.api.discard_stage(stage_id=new_stage_id) # Discard stage if we did not manage to use it
self.api.discard_stage(
stage_id=new_stage_id
) # Discard stage if we did not manage to use it
raise e
if new_stage_id is not None: # Here we wait for the site update to complete

if new_stage_id is not None: # Here we wait for the site update to complete
while not self.api.is_pipeline_completed(
site_id=settings.ROBOT_EXR_SITE_ID
):
time.sleep(settings.API_SLEEP_TIME)
return poi_ids

def create_mission_definition(self, mission_name: str, tasks: List[Task], poi_ids: List[str]) -> str: # Returns a mission definition ID

def create_mission_definition(
self, mission_name: str, tasks: List[Task], poi_ids: List[str]
) -> str: # Returns a mission definition ID
# Note that the POI IDs need to be in the same order as inspection steps in the provided mission
mission_definition_id: str = self.api.create_mission_definition(
site_id=settings.ROBOT_EXR_SITE_ID,
Expand Down Expand Up @@ -193,7 +206,9 @@ def initiate_mission(self, mission: Mission) -> None:
except RobotMissionNotSupportedException:
return

mission_definition_id: str = self.create_mission_definition(mission.id, mission.tasks, poi_ids)
mission_definition_id: str = self.create_mission_definition(
mission.id, mission.tasks, poi_ids
)

self.api.start_mission_execution(
mission_definition_id=mission_definition_id, robot_id=settings.ROBOT_EXR_ID
Expand Down

0 comments on commit 5f6bea1

Please sign in to comment.