diff --git a/src/isar_exr/api/energy_robotics_api.py b/src/isar_exr/api/energy_robotics_api.py index eb55102..968578e 100644 --- a/src/isar_exr/api/energy_robotics_api.py +++ b/src/isar_exr/api/energy_robotics_api.py @@ -531,7 +531,7 @@ def get_battery_level(self, exr_robot_id: str) -> float: except Exception as e: message: str = "Could not check robot battery level" self.logger.error(message) - raise RobotMissionStatusException( #TODO: occurs after obstacle + raise RobotMissionStatusException( # TODO: occurs after obstacle error_description=message, ) diff --git a/src/isar_exr/robotinterface.py b/src/isar_exr/robotinterface.py index c1f5db7..cfc50d2 100644 --- a/src/isar_exr/robotinterface.py +++ b/src/isar_exr/robotinterface.py @@ -95,8 +95,10 @@ def create_new_stage(self) -> str: stage_id: str = self.api.create_stage(site_id=settings.ROBOT_EXR_SITE_ID) return stage_id - def update_site_with_tasks(self, tasks: List[Task]) -> List[str]: # Returns a list of POI IDs - new_stage_id = None + def update_site_with_tasks( + self, tasks: List[Task] + ) -> List[str]: # Returns a list of POI IDs + new_stage_id: str = None poi_ids: List[str] = [] is_possible_return_to_home_mission = False steps_n = 0 @@ -119,48 +121,59 @@ def update_site_with_tasks(self, tasks: List[Task]) -> List[str]: # Returns a li robot_pose: Pose = step.pose if isinstance(step, InspectionStep): customer_tag: str = task.tag_id + robot_pose.to_string() - existing_poi_id = self.api.get_point_of_interest_by_customer_tag( - customer_tag=customer_tag, site_id=settings.ROBOT_EXR_SITE_ID + existing_poi_id = ( + self.api.get_point_of_interest_by_customer_tag( + customer_tag=customer_tag, + site_id=settings.ROBOT_EXR_SITE_ID, + ) ) if existing_poi_id == None: - new_stage_id: str = self.create_new_stage() - poi_id: str = ( - self._create_and_add_poi( - task=task, - step=step, - robot_pose=robot_pose, # This pose is set by the previously received DriveToStep - stage_id=new_stage_id, - customer_tag=customer_tag, - ) + new_stage_id = self.create_new_stage() + poi_id: str = self._create_and_add_poi( + task=task, + step=step, + robot_pose=robot_pose, # This pose is set by the previously received DriveToStep + stage_id=new_stage_id, + customer_tag=customer_tag, ) poi_ids.append(poi_id) else: poi_ids.append(existing_poi_id) if steps_n == 0 or (steps_n == 1 and is_possible_return_to_home_mission): - time.sleep(settings.API_SLEEP_TIME) # We need to sleep to allow events to reach flotilla in the right order - raise RobotMissionNotSupportedException("Robot does not support localisation or return to home mission") + time.sleep( + settings.API_SLEEP_TIME + ) # We need to sleep to allow events to reach flotilla in the right order + raise RobotMissionNotSupportedException( + "Robot does not support localisation or return to home mission" + ) if new_stage_id is not None: # We should only do the following if we changed the site - snapshot_id: str = self.api.commit_site_to_snapshot(stage_id=new_stage_id) + snapshot_id: str = self.api.commit_site_to_snapshot( + stage_id=new_stage_id + ) self.api.set_snapshot_as_head( snapshot_id=snapshot_id, site_id=settings.ROBOT_EXR_SITE_ID ) except Exception as e: if new_stage_id is not None: - self.api.discard_stage(stage_id=new_stage_id) # Discard stage if we did not manage to use it + self.api.discard_stage( + stage_id=new_stage_id + ) # Discard stage if we did not manage to use it raise e - - if new_stage_id is not None: # Here we wait for the site update to complete + + if new_stage_id is not None: # Here we wait for the site update to complete while not self.api.is_pipeline_completed( site_id=settings.ROBOT_EXR_SITE_ID ): time.sleep(settings.API_SLEEP_TIME) return poi_ids - - def create_mission_definition(self, mission_name: str, tasks: List[Task], poi_ids: List[str]) -> str: # Returns a mission definition ID + + def create_mission_definition( + self, mission_name: str, tasks: List[Task], poi_ids: List[str] + ) -> str: # Returns a mission definition ID # Note that the POI IDs need to be in the same order as inspection steps in the provided mission mission_definition_id: str = self.api.create_mission_definition( site_id=settings.ROBOT_EXR_SITE_ID, @@ -193,7 +206,9 @@ def initiate_mission(self, mission: Mission) -> None: except RobotMissionNotSupportedException: return - mission_definition_id: str = self.create_mission_definition(mission.id, mission.tasks, poi_ids) + mission_definition_id: str = self.create_mission_definition( + mission.id, mission.tasks, poi_ids + ) self.api.start_mission_execution( mission_definition_id=mission_definition_id, robot_id=settings.ROBOT_EXR_ID