diff --git a/main.py b/main.py index 1ad85e7a..66b67a84 100644 --- a/main.py +++ b/main.py @@ -19,9 +19,6 @@ from isar.services.service_connections.mqtt.robot_info_publisher import ( RobotInfoPublisher, ) -from isar.services.service_connections.mqtt.robot_status_publisher import ( - RobotStatusPublisher, -) from isar.state_machine.state_machine import StateMachine, main from isar.storage.uploader import Uploader from robot_interface.robot_interface import RobotInterface @@ -56,15 +53,6 @@ target=mqtt_client.run, name="ISAR MQTT Client", daemon=True ) threads.append(mqtt_thread) - robot_status_publisher: RobotStatusPublisher = RobotStatusPublisher( - mqtt_queue=queues.mqtt_queue, robot=robot, state_machine=state_machine - ) - robot_status_thread: Thread = Thread( - target=robot_status_publisher.run, - name="ISAR Robot Status Publisher", - daemon=True, - ) - threads.append(robot_status_thread) robot_info_publisher: RobotInfoPublisher = RobotInfoPublisher( mqtt_queue=queues.mqtt_queue diff --git a/src/isar/config/settings.py b/src/isar/config/settings.py index d4d3ac38..26ebde31 100644 --- a/src/isar/config/settings.py +++ b/src/isar/config/settings.py @@ -229,14 +229,13 @@ def __init__(self) -> None: DATA_CLASSIFICATION: str = Field(default="internal") # List of MQTT Topics - TOPIC_ISAR_STATE: str = Field(default="state", validate_default=True) + TOPIC_ISAR_STATUS: str = Field(default="status", validate_default=True) TOPIC_ISAR_MISSION: str = Field(default="mission", validate_default=True) TOPIC_ISAR_TASK: str = Field(default="task", validate_default=True) TOPIC_ISAR_STEP: str = Field(default="step", validate_default=True) TOPIC_ISAR_INSPECTION_RESULT: str = Field( default="inspection_result", validate_default=True ) - TOPIC_ISAR_ROBOT_STATUS: str = Field(default="robot_status", validate_default=True) TOPIC_ISAR_ROBOT_INFO: str = Field(default="robot_info", validate_default=True) TOPIC_ISAR_ROBOT_HEARTBEAT: str = Field( default="robot_heartbeat", validate_default=True @@ -284,11 +283,10 @@ def set_log_levels(cls, v: Any, info: ValidationInfo) -> dict: } @field_validator( - "TOPIC_ISAR_STATE", + "TOPIC_ISAR_STATUS", "TOPIC_ISAR_MISSION", "TOPIC_ISAR_TASK", "TOPIC_ISAR_STEP", - "TOPIC_ISAR_ROBOT_STATUS", "TOPIC_ISAR_ROBOT_INFO", "TOPIC_ISAR_ROBOT_HEARTBEAT", "TOPIC_ISAR_INSPECTION_RESULT", diff --git a/src/isar/services/service_connections/mqtt/robot_status_publisher.py b/src/isar/services/service_connections/mqtt/robot_status_publisher.py deleted file mode 100644 index e0ee3f1c..00000000 --- a/src/isar/services/service_connections/mqtt/robot_status_publisher.py +++ /dev/null @@ -1,125 +0,0 @@ -import json -import logging -import time -from datetime import datetime -from logging import Logger -from queue import Queue -from threading import Thread -from typing import Optional - -from isar.config.settings import settings -from isar.state_machine.state_machine import StateMachine -from isar.state_machine.states_enum import States -from robot_interface.models.exceptions.robot_exceptions import ( - RobotAPIException, - RobotCommunicationException, - RobotException, -) -from robot_interface.models.mission.status import RobotStatus -from robot_interface.robot_interface import RobotInterface -from robot_interface.telemetry.mqtt_client import MqttPublisher -from robot_interface.telemetry.payloads import RobotStatusPayload -from robot_interface.utilities.json_service import EnhancedJSONEncoder - - -class RobotStatusPublisher: - def __init__( - self, mqtt_queue: Queue, robot: RobotInterface, state_machine: StateMachine - ): - self.mqtt_publisher: MqttPublisher = MqttPublisher(mqtt_queue=mqtt_queue) - self.robot: RobotInterface = robot - self.state_machine: StateMachine = state_machine - self.logger: Logger = logging.getLogger("mqtt") - - def _get_combined_robot_status( - self, robot_status: RobotStatus, current_state: States - ) -> RobotStatus: - if robot_status == RobotStatus.Offline: - return RobotStatus.Offline - elif current_state == States.Idle and robot_status == RobotStatus.Available: - return RobotStatus.Available - elif robot_status == RobotStatus.Blocked: - return RobotStatus.Blocked - elif current_state != States.Idle or robot_status == RobotStatus.Busy: - return RobotStatus.Busy - return None - - def run(self) -> None: - robot_status_monitor: RobotStatusMonitor = RobotStatusMonitor(robot=self.robot) - robot_status_thread: Thread = Thread( - target=robot_status_monitor.run, - name="Robot Status Monitor", - daemon=True, - ) - robot_status_thread.start() - - previous_robot_status: Optional[RobotStatus] = None - - while True: - time.sleep(settings.ROBOT_STATUS_PUBLISH_INTERVAL) - - combined_status: RobotStatus = self._get_combined_robot_status( - robot_status=robot_status_monitor.robot_status, - current_state=self.state_machine.current_state, - ) - - if previous_robot_status: - if previous_robot_status == combined_status: - continue - - payload: RobotStatusPayload = RobotStatusPayload( - isar_id=settings.ISAR_ID, - robot_name=settings.ROBOT_NAME, - robot_status=combined_status, - previous_robot_status=previous_robot_status, - current_isar_state=self.state_machine.current_state, - current_mission_id=( - self.state_machine.current_mission.id - if self.state_machine.current_mission - else None - ), - current_task_id=( - self.state_machine.current_task.id - if self.state_machine.current_task - else None - ), - current_step_id=( - self.state_machine.current_step.id - if self.state_machine.current_step - else None - ), - timestamp=datetime.utcnow(), - ) - - self.logger.info( - f"Combined status has changed to {combined_status} from {previous_robot_status} " - f"with current state {self.state_machine.current_state}" - ) - - self.mqtt_publisher.publish( - topic=settings.TOPIC_ISAR_ROBOT_STATUS, - payload=json.dumps(payload, cls=EnhancedJSONEncoder), - ) - - previous_robot_status = combined_status - - -class RobotStatusMonitor: - def __init__(self, robot: RobotInterface): - self.robot: RobotInterface = robot - self.robot_status: RobotStatus = RobotStatus.Offline - self.logger: Logger = logging.getLogger("robot_status_monitor") - - def run(self) -> None: - while True: - try: - self.robot_status = self.robot.robot_status() - except ( - RobotCommunicationException, - RobotAPIException, - RobotException, - ) as e: - self.logger.error( - f"Failed to get robot status because: {e.error_description}" - ) - time.sleep(settings.ROBOT_API_STATUS_POLL_INTERVAL) diff --git a/src/isar/state_machine/state_machine.py b/src/isar/state_machine/state_machine.py index 1486ad74..903e5933 100644 --- a/src/isar/state_machine/state_machine.py +++ b/src/isar/state_machine/state_machine.py @@ -32,7 +32,12 @@ from robot_interface.models.exceptions.robot_exceptions import ErrorMessage from robot_interface.models.initialize.initialize_params import InitializeParams from robot_interface.models.mission.mission import Mission -from robot_interface.models.mission.status import MissionStatus, StepStatus, TaskStatus +from robot_interface.models.mission.status import ( + MissionStatus, + RobotStatus, + StepStatus, + TaskStatus, +) from robot_interface.models.mission.step import Step from robot_interface.models.mission.task import Task from robot_interface.robot_interface import RobotInterface @@ -438,7 +443,7 @@ def update_state(self): self.send_state_status() self._log_state_transition(self.current_state) self.logger.info(f"State: {self.current_state}") - self.publish_state() + self.publish_status() def reset_state_machine(self) -> None: self.logger.info("Resetting state machine") @@ -580,25 +585,33 @@ def publish_step_status(self, step: Step) -> None: retain=False, ) - def publish_state(self) -> None: + def publish_status(self) -> None: if not self.mqtt_publisher: return payload: str = json.dumps( { "isar_id": settings.ISAR_ID, "robot_name": settings.ROBOT_NAME, - "state": self.current_state, + "status": self._current_status(), "timestamp": datetime.utcnow(), }, cls=EnhancedJSONEncoder, ) self.mqtt_publisher.publish( - topic=settings.TOPIC_ISAR_STATE, + topic=settings.TOPIC_ISAR_STATUS, payload=payload, retain=False, ) + def _current_status(self) -> RobotStatus: + if self.current_state == States.Idle: + return RobotStatus.Available + elif self.current_state == States.Offline: + return RobotStatus.Offline + else: + return RobotStatus.Busy + def _log_state_transition(self, next_state): """Logs all state transitions that are not self-transitions.""" self.transitions_list.append(next_state)