diff --git a/telematic_system/docker-compose.historical.yml b/telematic_system/docker-compose.historical.yml index 55e7fce3..1ed6cc28 100644 --- a/telematic_system/docker-compose.historical.yml +++ b/telematic_system/docker-compose.historical.yml @@ -36,4 +36,5 @@ services: - TO_STR_FIELDS=[] - IGNORE_FIELDS=[] - FILE_PROCESSING_SUBJECT=ui.file.processing - - ACCEPTED_FILE_EXTENSIONS=[".mcap"] \ No newline at end of file + - ACCEPTED_FILE_EXTENSIONS=[".mcap"] + command: bash -c 'source /ws/install/setup.bash && wait-for-it ${DB_HOST}:${DB_PORT} -- ros2 run rosbag2_processing_service main' \ No newline at end of file diff --git a/telematic_system/docker-compose.local.yml b/telematic_system/docker-compose.local.yml index cccba9b6..b6f6ac7f 100644 --- a/telematic_system/docker-compose.local.yml +++ b/telematic_system/docker-compose.local.yml @@ -187,6 +187,7 @@ services: context: ./telematic_historical_data_processing dockerfile: Dockerfile network: host + image: usdotfhwastoldev/rosbag2_processing_service:develop container_name: rosbag2_processing_service restart: always depends_on: @@ -197,14 +198,14 @@ services: options: max-size: "2g" max-file: "1" + container_name: rosbag2_processing_service network_mode: host - entrypoint: /ws/entrypoint.sh volumes: - /opt/telematics/logs:/opt/telematics/logs - /opt/telematics/upload:/opt/telematics/upload environment: - - TOPIC_EXCLUSION_LIST=["/rosout","/parameter_events","/robot_description","/tf_static","/tf","/environment/fused_external_objects","/environment/base_map","/environment/external_objects_viz","/environment/motion_computation_visualize","/environment/external_object_predictions","/localization/current_pose","/localization/map_param_loader/georeference","/environment/semantic_map","/hardware_interface/comms/outbound_binary_msg","/environment/lanelet2_map_viz","/localization/points_map"] - - NATS_SERVER_IP_PORT=${NATS_SERVERS} + - TOPIC_EXCLUSION_LIST=${ROSBAG2_EXCLUSION_LIST} + - NATS_SERVER_IP_PORT=nats://localhost:4222 - LOG_LEVEL=debug - LOG_NAME=rosbag2_processing_service - LOG_PATH=/opt/telematics/logs @@ -212,19 +213,20 @@ services: - LOG_HANDLER_TYPE=console - INFLUX_BUCKET=${INFLUXDB_DEV_INIT_BUCKET} - INFLUX_ORG=my-org - - INFLUX_TOKEN=my-super-secret-token - - INFLUX_URL=${INFLUX_URL} - - MYSQL_HOST=${DB_HOST} # Add host IP where mysql database is hosted + - INFLUX_TOKEN=my-super-secret-auth-token + - INFLUX_URL=127.0.0.1:8086 + - MYSQL_HOST=${DB_HOST} #Add host IP where mysql database is hosted - MYSQL_PORT=${DB_PORT} - - MYSQL_DB=${MYSQL_DATABASE} # Name of Mysql databse + - MYSQL_DB=wfd_grafana # Name of Mysql databse - MYSQL_USER=${DB_USER} # Login credentials for mysql database, User - MYSQL_PASSWORD=${DB_PASSWORD} # Login credentials for mysql database, Password - MYSQL_TABLE=file_infos - - UPLOAD_DESTINATION_PATH=/opt/telematics/upload # For cloud deployment:This is the directory S3 bucket is mounted + - UPLOAD_DESTINATION_PATH=${UPLOAD_DESTINATION_PATH} # For cloud deployment:This is the directory S3 bucket is mounted - TO_STR_FIELDS=[] - IGNORE_FIELDS=[] - - FILE_PROCESSING_SUBJECT=${FILE_PROCESSING_SUBJECT} + - FILE_PROCESSING_SUBJECT=ui.file.* - ACCEPTED_FILE_EXTENSIONS=[".mcap"] + command: bash -c 'source /ws/install/setup.bash && wait-for-it ${DB_HOST}:${DB_PORT} -- ros2 run rosbag2_processing_service main' secrets: mysql_password: diff --git a/telematic_system/telematic_historical_data_processing/Dockerfile b/telematic_system/telematic_historical_data_processing/Dockerfile index dd15c58d..e176541b 100644 --- a/telematic_system/telematic_historical_data_processing/Dockerfile +++ b/telematic_system/telematic_historical_data_processing/Dockerfile @@ -29,10 +29,7 @@ RUN chmod +x /ws/src/rosbag2_processing_service/docker/* RUN sudo bash /ws/src/rosbag2_processing_service/docker/checkout.sh RUN sudo bash /ws/src/rosbag2_processing_service/docker/install.sh -COPY rosbag2_processing_service/docker/entrypoint.sh entrypoint.sh -RUN chmod +x entrypoint.sh - RUN echo "source /ws/install/setup.bash" >> ~/.bashrc -CMD ["bash"] \ No newline at end of file +CMD ["ros2 run rosbag2_processing_service main"] \ No newline at end of file diff --git a/telematic_system/telematic_historical_data_processing/rosbag2_processing_service/docker/entrypoint.sh b/telematic_system/telematic_historical_data_processing/rosbag2_processing_service/docker/entrypoint.sh deleted file mode 100644 index d7ccd4f2..00000000 --- a/telematic_system/telematic_historical_data_processing/rosbag2_processing_service/docker/entrypoint.sh +++ /dev/null @@ -1,18 +0,0 @@ -#!/bin/bash - -# Copyright (C) 2024 LEIDOS. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may not -# use this file except in compliance with the License. You may obtain a copy of -# the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations under -# the License - -source /ws/install/setup.bash -ros2 run rosbag2_processing_service main \ No newline at end of file diff --git a/telematic_system/telematic_historical_data_processing/rosbag2_processing_service/docker/install.sh b/telematic_system/telematic_historical_data_processing/rosbag2_processing_service/docker/install.sh index 73dc6c66..789628ba 100644 --- a/telematic_system/telematic_historical_data_processing/rosbag2_processing_service/docker/install.sh +++ b/telematic_system/telematic_historical_data_processing/rosbag2_processing_service/docker/install.sh @@ -31,7 +31,8 @@ sudo apt update sudo apt install -y apt-utils \ python3-pybind11 \ ros-foxy-test-msgs \ - python3-colcon-common-extensions + python3-colcon-common-extensions \ + wait-for-it # Build rosbag2 colcon build --packages-up-to rosbag2_processing_service \ No newline at end of file diff --git a/telematic_system/telematic_historical_data_processing/rosbag2_processing_service/rosbag2_processing_service/rosbag_processor.py b/telematic_system/telematic_historical_data_processing/rosbag2_processing_service/rosbag2_processing_service/rosbag_processor.py index 9e2a474e..65954ada 100644 --- a/telematic_system/telematic_historical_data_processing/rosbag2_processing_service/rosbag2_processing_service/rosbag_processor.py +++ b/telematic_system/telematic_historical_data_processing/rosbag2_processing_service/rosbag2_processing_service/rosbag_processor.py @@ -14,7 +14,8 @@ # the License. # -from mcap_ros2.reader import read_ros2_messages +from mcap_ros2.reader import read_ros2_messages, DecoderFactory +from mcap.reader import McapReader, NonSeekingReader, SeekingReader, make_reader import re import time from influxdb.exceptions import InfluxDBClientError @@ -81,7 +82,16 @@ def process_rosbag(self,rosbag2_path): # Load the rosbag from the config directory try: - for msg in read_ros2_messages(rosbag2_path): + with open(rosbag2_path, "rb") as file: + reader = make_reader(file, decoder_factories=[DecoderFactory()]) + unique_topics = set() + for schema, channel, message in reader.iter_messages(): + unique_topics.add(channel.topic) + + + inclusion_topics = [topic for topic in unique_topics if topic not in self.config.topic_exclusion_list ] + + for msg in read_ros2_messages(rosbag2_path, inclusion_topics): if msg.channel.topic in self.config.topic_exclusion_list: continue @@ -91,9 +101,9 @@ def process_rosbag(self,rosbag2_path): self.write_api.write(bucket=self.config.influx_bucket, org=self.config.influx_org, record=record) except influxdb.exceptions.InfluxDBClientError as e: - self.config.logger.error(f"Error from Influx Client: {(e)}") + self.config.logger.error(f"Error from Influx Client: {(e.message)}") except Exception as e: - self.config.logger.error(f"Failed to process ros message with exception: {(e)}") + self.config.logger.error(f"Failed to write to influx with exception: {(e)}") except exceptions.McapError as e: diff --git a/telematic_system/telematic_historical_data_processing/rosbag2_processing_service/rosbag2_processing_service/service_manager.py b/telematic_system/telematic_historical_data_processing/rosbag2_processing_service/rosbag2_processing_service/service_manager.py index f15392f1..c94966b1 100644 --- a/telematic_system/telematic_historical_data_processing/rosbag2_processing_service/rosbag2_processing_service/service_manager.py +++ b/telematic_system/telematic_historical_data_processing/rosbag2_processing_service/rosbag2_processing_service/service_manager.py @@ -24,7 +24,8 @@ from pathlib import Path import mysql.connector -from mysql.connector import errorcode +from mysql.connector import Error +import time class ServiceManager: @@ -114,12 +115,12 @@ async def nats_error_cb(err): # Nats request callback async def get_file_path_from_nats(self, msg): - self.config.logger.info("Entering process nats request") data = msg.data.decode() msg_json_object = json.loads(data) rosbag_path = msg_json_object["filepath"] + self.config.logger.info(f"Received nats request to process {rosbag_path}") # Add rosbag name to queue self.rosbag_queue.append(rosbag_path) @@ -145,22 +146,23 @@ def create_mysql_conn(self): try: conn = mysql.connector.connect(user= self.config.mysql_user, password= self.config.mysql_password, - host= self.config.mysql_host, - database= self.config.mysql_db, port = self.config.mysql_port) + host= self.config.mysql_host, + database= self.config.mysql_db, port = self.config.mysql_port) self.config.logger.info("Connected to MySQL database!") return conn except mysql.connector.Error as err: - if err.errno == errorcode.ER_ACCESS_DENIED_ERROR: - self.config.logger.error(f"Mysql User name or password not accepted for user: {self.config.mysql_user} and pass: {self.config.mysql_password}") - elif err.errno == errorcode.ER_BAD_DB_ERROR: - self.config.logger.error(f"Mysql Database {self.config.mysql_db} does not exist") - else: - self.config.logger.error(f"Error connecting to mysql database: {err}") + self.config.logger.error(f"Error connecting to mysql database: {err.msg}") + def update_mysql_entry(self, file_name, process_status, process_error_msg="NA"): # This method updates the mysql database entry for the rosbag to process # Update the update fields with update values + if not self.mysql_conn.is_connected(): + # TODO restart service if not connected + self.config.logger.error("Mysqldb not connected") + + try: cursor = self.mysql_conn.cursor() @@ -170,6 +172,7 @@ def update_mysql_entry(self, file_name, process_status, process_error_msg="NA"): self.mysql_conn.commit() self.config.logger.info(f"Updated mysql entry for {file_name} to {process_status}") + cursor.close() except mysql.connector.Error as e: self.config.logger.error(f"Failed to update mysql table with error: {e}")