Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wfd-515 #187

Merged
merged 9 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion telematic_system/docker-compose.historical.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,5 @@ services:
- TO_STR_FIELDS=[]
- IGNORE_FIELDS=[]
- FILE_PROCESSING_SUBJECT=ui.file.processing
- ACCEPTED_FILE_EXTENSIONS=[".mcap"]
- ACCEPTED_FILE_EXTENSIONS=[".mcap"]
command: bash -c 'source /ws/install/setup.bash && wait-for-it ${DB_HOST}:${DB_PORT} -- ros2 run rosbag2_processing_service main'
24 changes: 15 additions & 9 deletions telematic_system/docker-compose.local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -181,38 +181,44 @@ services:
dockerfile: Dockerfile
network: host
image: usdotfhwastoldev/rosbag2_processing_service:develop
restart: always
depends_on:
- nats
- mysqldb
- influxdb
logging:
options:
max-size: "2g"
max-file: "1"
container_name: rosbag2_processing_service
entrypoint: /ws/entrypoint.sh
network_mode: host
volumes:
- /opt/telematics/logs:/opt/telematics/logs
- /opt/telematics/upload:/opt/telematics/upload
environment:
- TOPIC_EXCLUSION_LIST=["/rosout","/parameter_events","/robot_description","/tf_static","/tf","/environment/fused_external_objects","/environment/base_map","/environment/external_objects_viz","/environment/motion_computation_visualize","/environment/external_object_predictions","/localization/current_pose","/localization/map_param_loader/georeference","/environment/semantic_map","/hardware_interface/comms/outbound_binary_msg","/environment/lanelet2_map_viz","/localization/points_map"]
- NATS_SERVER_IP_PORT=${NATS_SERVERS}
- TOPIC_EXCLUSION_LIST=${ROSBAG2_EXCLUSION_LIST}
- NATS_SERVER_IP_PORT=nats://localhost:4222
- LOG_LEVEL=debug
- LOG_NAME=rosbag2_processing_service
- LOG_PATH=/opt/telematics/logs
- LOG_ROTATION_SIZE_BYTES=2147483648
- LOG_HANDLER_TYPE=console
- INFLUX_BUCKET=${INFLUXDB_DEV_INIT_BUCKET}
- INFLUX_ORG=my-org
- INFLUX_TOKEN=my-super-secret-token
- INFLUX_URL=<>
- MYSQL_HOST=${DB_HOST}# Add host IP where mysql database is hosted
- INFLUX_TOKEN=my-super-secret-auth-token
- INFLUX_URL=127.0.0.1:8086
- MYSQL_HOST=${DB_HOST} #Add host IP where mysql database is hosted
- MYSQL_PORT=${DB_PORT}
- MYSQL_DB=${MYSQL_DATABASE} # Name of Mysql databse
- MYSQL_DB=wfd_grafana # Name of Mysql databse
- MYSQL_USER=${DB_USER} # Login credentials for mysql database, User
- MYSQL_PASSWORD=${DB_PASSWORD} # Login credentials for mysql database, Password
- MYSQL_TABLE=file_infos
- UPLOAD_DESTINATION_PATH=/opt/telematics/upload # For cloud deployment:This is the directory S3 bucket is mounted
- UPLOAD_DESTINATION_PATH=${UPLOAD_DESTINATION_PATH} # For cloud deployment:This is the directory S3 bucket is mounted
- TO_STR_FIELDS=[]
- IGNORE_FIELDS=[]
- FILE_PROCESSING_SUBJECT=${FILE_PROCESSING_SUBJECT}
- FILE_PROCESSING_SUBJECT=ui.file.*
- ACCEPTED_FILE_EXTENSIONS=[".mcap"]
command: bash -c 'source /ws/install/setup.bash && wait-for-it ${DB_HOST}:${DB_PORT} -- ros2 run rosbag2_processing_service main'

secrets:
mysql_password:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@ RUN chmod +x /ws/src/rosbag2_processing_service/docker/*
RUN sudo bash /ws/src/rosbag2_processing_service/docker/checkout.sh
RUN sudo bash /ws/src/rosbag2_processing_service/docker/install.sh

COPY rosbag2_processing_service/docker/entrypoint.sh entrypoint.sh
RUN chmod +x entrypoint.sh


RUN echo "source /ws/install/setup.bash" >> ~/.bashrc

CMD ["bash"]
CMD ["ros2 run rosbag2_processing_service main"]
dan-du-car marked this conversation as resolved.
Show resolved Hide resolved

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ sudo apt update
sudo apt install -y apt-utils \
python3-pybind11 \
ros-foxy-test-msgs \
python3-colcon-common-extensions
python3-colcon-common-extensions \
wait-for-it

# Build rosbag2
colcon build --packages-up-to rosbag2_processing_service
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
# the License.
#

from mcap_ros2.reader import read_ros2_messages
from mcap_ros2.reader import read_ros2_messages, DecoderFactory
from mcap.reader import McapReader, NonSeekingReader, SeekingReader, make_reader
import re
import time
from influxdb.exceptions import InfluxDBClientError
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import ASYNCHRONOUS
from .config import Config
Expand Down Expand Up @@ -80,7 +82,17 @@ def process_rosbag(self,rosbag2_path):

# Load the rosbag from the config directory
try:
for msg in read_ros2_messages(rosbag2_path):
fd = open(rosbag2_path, "rb")
dan-du-car marked this conversation as resolved.
Show resolved Hide resolved
reader = make_reader(fd, decoder_factories=[DecoderFactory()])
unique_topics = []
for schema, channel, message in reader.iter_messages():
if channel.topic not in unique_topics:
dan-du-car marked this conversation as resolved.
Show resolved Hide resolved
unique_topics.append(channel.topic)
fd.close()

inclusion_topics = [topic for topic in unique_topics if topic not in self.config.topic_exclusion_list ]

for msg in read_ros2_messages(rosbag2_path, inclusion_topics):
if msg.channel.topic in self.config.topic_exclusion_list:
continue

Expand All @@ -89,10 +101,10 @@ def process_rosbag(self,rosbag2_path):
#Write record to influx
self.write_api.write(bucket=self.config.influx_bucket, org=self.config.influx_org, record=record)

except InfluxDBClientError as e:
self.config.logger.error(f"Error from Influx Client: {(e)}")
except influxdb.exceptions.InfluxDBClientError as e:
self.config.logger.error(f"Error from Influx Client: {(e.message)}")
except Exception as e:
self.config.logger.error(f"Failed to process ros message with exception: {(e)}")
self.config.logger.error(f"Failed to write to influx with exception: {(e)}")


except exceptions.McapError as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
from pathlib import Path

import mysql.connector
from mysql.connector import errorcode
from mysql.connector import Error
import time


class ServiceManager:
Expand Down Expand Up @@ -145,22 +146,26 @@ def create_mysql_conn(self):

try:
conn = mysql.connector.connect(user= self.config.mysql_user, password= self.config.mysql_password,
host= self.config.mysql_host,
database= self.config.mysql_db, port = self.config.mysql_port)
host= self.config.mysql_host,
database= self.config.mysql_db, port = self.config.mysql_port)
self.config.logger.info("Connected to MySQL database!")
return conn
except mysql.connector.Error as err:
dan-du-car marked this conversation as resolved.
Show resolved Hide resolved
if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
if err.errno == mysql.connector.Error.errorcode.ER_ACCESS_DENIED_ERROR:
self.config.logger.error(f"Mysql User name or password not accepted for user: {self.config.mysql_user} and pass: {self.config.mysql_password}")
elif err.errno == errorcode.ER_BAD_DB_ERROR:
elif err.errno == mysql.connector.Error.errorcode.ER_BAD_DB_ERROR:
self.config.logger.error(f"Mysql Database {self.config.mysql_db} does not exist")
else:
self.config.logger.error(f"Error connecting to mysql database: {err}")



def update_mysql_entry(self, file_name, process_status, process_error_msg="NA"):
# This method updates the mysql database entry for the rosbag to process
# Update the update fields with update values
if not self.mysql_conn.is_connected():
dan-du-car marked this conversation as resolved.
Show resolved Hide resolved
self.mysql_conn = self.create_mysql_conn()

try:
cursor = self.mysql_conn.cursor()

Expand All @@ -170,6 +175,7 @@ def update_mysql_entry(self, file_name, process_status, process_error_msg="NA"):
self.mysql_conn.commit()

self.config.logger.info(f"Updated mysql entry for {file_name} to {process_status}")
cursor.close()

except mysql.connector.Error as e:
self.config.logger.error(f"Failed to update mysql table with error: {e}")