-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add rosbag processing service #175
Merged
Changes from 5 commits
Commits
Show all changes
35 commits
Select commit
Hold shift + click to select a range
2ddf0ae
Add rosbag2 processing service
adev4a e7164f5
add copyright
adev4a 45d755a
revert change
adev4a 203ee04
fix rosmsg record
adev4a a217817
update async
adev4a d946f40
adds nats request
adev4a 53fb6be
add dockerfile
adev4a dafc7bb
update docker build files
adev4a ae1a49c
add docker compose and update async implementation
adev4a 387b1ac
update docker compose env vars
adev4a 342562c
add nats request topic to env
adev4a 34ca0e6
address comments
adev4a 7edd074
add nats connect to separate thread
adev4a 5f4c9b4
fix multi-threading
adev4a 4adc368
add initial unit tests
adev4a 13eabb8
address comments
adev4a a0d7749
address comments
adev4a 689077d
catch exception for rosbag reading error
adev4a 93595b8
add config
adev4a b57d3ec
add comment for generic exception
adev4a 3d0c37e
fix
adev4a af7bdbb
address comments
adev4a 92f3e48
Feature add CI/CD for rosbag2_processing_service (#177)
SaikrishnaBairamoni fefb79f
update unit tests and ci steps
adev4a 3a06741
fix pytest.ini
adev4a 564d307
fix unit test
adev4a 627b29f
update resource file path
adev4a e9b3e95
update build steps
adev4a 2753411
update tests
adev4a 615f4f2
add check for file exists
adev4a 6be9b5a
fix unit test
adev4a bafbd04
fix test
adev4a a48a60c
fix nats request path
adev4a 133137f
fix unit test
adev4a 45799fd
rename dir
adev4a File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
14 changes: 14 additions & 0 deletions
14
telematic_system/historical_data_processing/rosbag2_processing_service/.env
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
TOPIC_EXCLUSION_LIST=["/rosout","/parameter_events","/robot_description","/tf_static","/tf","/environment/fused_external_objects","/environment/base_map","/environment/external_objects_viz","/environment/motion_computation_visualize","/environment/external_object_predictions","/localization/current_pose","/localization/map_param_loader/georeference","/environment/semantic_map","/hardware_interface/comms/outbound_binary_msg","/environment/lanelet2_map_viz","/localization/points_map"] | ||
NATS_SERVER_IP_PORT=<> | ||
LOG_LEVEL=debug | ||
LOG_NAME=rosbag2_processing_service | ||
LOG_PATH=/var/logs | ||
LOG_ROTATION_SIZE_BYTES=2147483648 | ||
LOG_HANDLER_TYPE=console | ||
INFLUX_BUCKET=test-bucket | ||
INFLUX_ORG=my-org | ||
INFLUX_TOKEN=<> | ||
INFLUX_URL=<> | ||
ROSBAG_STORAGE_DIR=/home/carma/rosbag_log | ||
TO_STR_FIELDS=[] | ||
IGNORE_FIELDS=[] |
dan-du-car marked this conversation as resolved.
Show resolved
Hide resolved
|
Empty file.
41 changes: 41 additions & 0 deletions
41
.../historical_data_processing/rosbag2_processing_service/rosbag2_processing_service/main.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
# | ||
# Copyright (C) 2024 LEIDOS. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
# use this file except in compliance with the License. You may obtain a copy of | ||
# the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
# License for the specific language governing permissions and limitations under | ||
# the License. | ||
# | ||
|
||
import asyncio | ||
from .service_manager import ServiceManager | ||
|
||
from dotenv import load_dotenv | ||
|
||
load_dotenv('/home/carma/cda-telematics/telematic_system/historical_data_processing/rosbag2_processing_service/.env') | ||
dan-du-car marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
def main(args=None): | ||
adamlm marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
loop = asyncio.get_event_loop() | ||
|
||
service_manager = ServiceManager() | ||
|
||
tasks = [ | ||
loop.create_task(service_manager.nats_connect()), | ||
loop.create_task(service_manager.process_rosbag()) | ||
] | ||
|
||
loop.run_until_complete(asyncio.wait(tasks)) | ||
|
||
loop.close() | ||
|
||
if __name__ == '__main__': | ||
main() |
117 changes: 117 additions & 0 deletions
117
...data_processing/rosbag2_processing_service/rosbag2_processing_service/rosbag_processor.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
# | ||
# Copyright (C) 2024 LEIDOS. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
# use this file except in compliance with the License. You may obtain a copy of | ||
# the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
# License for the specific language governing permissions and limitations under | ||
# the License. | ||
# | ||
|
||
from mcap_ros2.reader import read_ros2_messages | ||
import json | ||
dan-du-car marked this conversation as resolved.
Show resolved
Hide resolved
|
||
import re | ||
import time | ||
from influxdb_client import InfluxDBClient, Point | ||
from influxdb_client.client.write_api import ASYNCHRONOUS | ||
|
||
import asyncio | ||
import os | ||
|
||
|
||
class Rosbag2Parser: | ||
# Class that defines the rosbag parser | ||
# Stores the global definitions that don't change on each iteration of rosbag processing | ||
# influx authentication | ||
def __init__(self, influx_bucket, influx_org, influx_token, influx_url, topic_exclusion_list, log_dir, to_str_fields, ignore_fields, logger): | ||
|
||
# Set influxdb parameters | ||
self.influx_bucket = influx_bucket | ||
self.influx_org = influx_org | ||
self.influx_client = InfluxDBClient(url=influx_url, token=influx_token, org=influx_org) | ||
self.topic_exclusion_list = topic_exclusion_list | ||
self.log_dir = log_dir | ||
self.logger = logger | ||
|
||
#Fields in the ros message to force to string type. | ||
self.to_str_fields= to_str_fields | ||
# Fields in the ros message to ignore | ||
self.ignore_fields= ignore_fields | ||
|
||
# Create write API | ||
self.write_api = self.influx_client.write_api(write_options=ASYNCHRONOUS) | ||
|
||
# Processing status | ||
self.is_processing = False | ||
|
||
|
||
async def process_rosbag(self,rosbag2_name): | ||
measurement_name = rosbag2_name.split('.mcap')[0] # Measurement name is rosbag name without mcap extension | ||
adamlm marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
rosbag_path = os.path.join(self.log_dir, rosbag2_name) | ||
|
||
|
||
# Load the rosbag from the config directory | ||
for msg in read_ros2_messages(rosbag_path): | ||
try: | ||
if msg.channel.topic not in self.topic_exclusion_list: | ||
adamlm marked this conversation as resolved.
Show resolved
Hide resolved
|
||
topic = msg.channel.topic | ||
ros_msg = msg.ros_msg | ||
msg_attributes = self.extract_attributes(ros_msg) | ||
msg_timestamp = msg.publish_time_ns | ||
|
||
record = f"{measurement_name},topic_name={topic}," | ||
|
||
for attr_name, attr_value in msg_attributes: | ||
if attr_name in self.ignore_fields: | ||
continue | ||
|
||
elif attr_name in self.to_str_fields: | ||
attr_value = f'"{attr_value}"' | ||
record += f"{attr_name}={attr_value}," | ||
|
||
elif isinstance(attr_value, list): # Handle arrays | ||
record += f'{attr_name}="{str(attr_value)}",' | ||
else: | ||
if isinstance(attr_value, str): | ||
attr_value = f'"{attr_value}"' # Correctly format string values | ||
record += f"{attr_name}={attr_value}," | ||
|
||
|
||
# Remove last comma | ||
record = record[:-1] | ||
# Add timestamp at the end | ||
record += f" timestamp={msg_timestamp}" | ||
# print(record) | ||
#Write record to influx | ||
self.write_api.write(bucket=self.influx_bucket, org=self.influx_org, record=record) | ||
|
||
except Exception as e: | ||
adamlm marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self.logger.warn(f"Failed to process ros message with exception: " + str(e)) | ||
dan-du-car marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self.logger.info("Completed rosbag processing for {rosbag2_name}") | ||
self.is_processing = False | ||
|
||
|
||
def extract_attributes(self, obj, parent_attr=None): | ||
attributes = [] | ||
for attr_name in dir(obj): | ||
try: | ||
if not callable(getattr(obj, attr_name)) and not attr_name.startswith("__") and not attr_name.startswith("_"): | ||
adamlm marked this conversation as resolved.
Show resolved
Hide resolved
|
||
attr_value = getattr(obj, attr_name) | ||
if parent_attr: | ||
attr_name = f"{parent_attr}.{attr_name}" | ||
if hasattr(attr_value, '__dict__'): | ||
# Recursively extract attributes for nested objects | ||
nested_attributes = self.extract_attributes(attr_value, attr_name) | ||
attributes.extend(nested_attributes) | ||
else: | ||
attributes.append((attr_name, attr_value)) | ||
except Exception as e: | ||
adamlm marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self.logger.error("Unable to get attributes for ros message with exception: " + str(e)) | ||
return attributes |
151 changes: 151 additions & 0 deletions
151
..._data_processing/rosbag2_processing_service/rosbag2_processing_service/service_manager.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
# | ||
# Copyright (C) 2024 LEIDOS. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
# use this file except in compliance with the License. You may obtain a copy of | ||
# the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
# License for the specific language governing permissions and limitations under | ||
# the License. | ||
# | ||
import asyncio | ||
from nats.aio.client import Client as NATS | ||
import os | ||
import logging | ||
from logging.handlers import RotatingFileHandler | ||
from datetime import datetime | ||
from enum import Enum | ||
from .rosbag_processor import Rosbag2Parser | ||
|
||
class LogType(Enum): | ||
FILE = "file" | ||
CONSOLE = "console" | ||
ALL = "all" | ||
|
||
class ServiceManager: | ||
|
||
rosbag_queue = [] | ||
def __init__(self): | ||
#NATS client to receive requests from the web ui | ||
self.nc = NATS() | ||
# List of rosbags to be processeds | ||
self.rosbag_queue = ['rosbag2_2024_01_26-15_00_24_0.mcap'] | ||
dan-du-car marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# Load config parameters | ||
# Configured directory to read rosbags from | ||
self.rosbag_dir = os.getenv("ROSBAG_STORAGE_DIR") | ||
# List of topics to be excluded from reading | ||
self.topic_exclusion_list = os.getenv("TOPIC_EXCLUSION_LIST") | ||
#NATS params | ||
self.nats_ip_port = os.getenv("NATS_SERVER_IP_PORT") | ||
#Logging configuration parameters | ||
self.log_level = os.getenv("LOG_LEVEL") | ||
self.log_name = os.getenv("LOG_NAME") | ||
self.log_path = os.getenv("LOG_PATH") | ||
self.log_rotation = int(os.getenv("LOG_ROTATION_SIZE_BYTES")) | ||
self.log_handler_type = os.getenv("LOG_HANDLER_TYPE") | ||
|
||
# Create logger | ||
if self.log_handler_type == LogType.ALL.value: | ||
# If all create log handler for both file and console | ||
self.createLogger(LogType.FILE.value) | ||
self.createLogger(LogType.CONSOLE.value) | ||
elif self.log_handler_type == LogType.FILE.value or self.log_handler_type == LogType.CONSOLE.value: | ||
self.createLogger(self.log_handler_type) | ||
else: | ||
self.createLogger(LogType.CONSOLE.value) | ||
self.logger.warn("Incorrect Log type defined, defaulting to console") | ||
|
||
# Load Influx params | ||
self.influx_bucket = os.getenv("INFLUX_BUCKET") | ||
self.influx_org = os.getenv("INFLUX_ORG") | ||
self.influx_token = os.getenv("INFLUX_TOKEN") | ||
self.influx_url = os.getenv("INFLUX_URL") | ||
|
||
#Fields in the ros message to force to string type. | ||
self.to_str_fields= os.getenv("TO_STR_FIELDS") | ||
# Fields in the ros message to ignore | ||
self.ignore_fields= os.getenv("IGNORE_FIELDS") | ||
|
||
# Create rosbag parser object | ||
self.rosbag_parser = Rosbag2Parser(self.influx_bucket, self.influx_org, self.influx_token, self.influx_url, self.topic_exclusion_list, self.rosbag_dir, self.to_str_fields, self.ignore_fields, self.logger) | ||
|
||
#nats connection status | ||
self.is_nats_connected = False | ||
|
||
|
||
def createLogger(self, log_type): | ||
"""Creates log file for the ROS2NatsBridge with configuration items based on the settings input in the params.yaml file""" | ||
# create log file and set log levels | ||
self.logger = logging.getLogger(self.log_name) | ||
now = datetime.now() | ||
dt_string = now.strftime("_%m_%d_%Y_%H_%M_%S") | ||
log_name = self.log_name + dt_string + ".log" | ||
formatter = logging.Formatter( | ||
dan-du-car marked this conversation as resolved.
Show resolved
Hide resolved
|
||
'%(asctime)s - %(name)s - %(levelname)s - %(message)s') | ||
|
||
# Create a rotating log handler that will rotate after maxBytes rotation, that can be configured in the | ||
# params yaml file. The backup count is how many rotating logs will be created after reaching the maxBytes size | ||
if log_type == LogType.FILE.value: | ||
self.log_handler = RotatingFileHandler( | ||
self.log_path+log_name, maxBytes=self.log_rotation, backupCount=5) | ||
else: | ||
self.log_handler = logging.StreamHandler() | ||
self.log_handler.setFormatter(formatter) | ||
|
||
if(self.log_level == "debug"): | ||
self.logger.setLevel(logging.DEBUG) | ||
self.log_handler.setLevel(logging.DEBUG) | ||
elif(self.log_level == "info"): | ||
self.logger.setLevel(logging.INFO) | ||
self.log_handler.setLevel(logging.INFO) | ||
elif(self.log_level == "error"): | ||
self.logger.setLevel(logging.ERROR) | ||
self.log_handler.setLevel(logging.ERROR) | ||
|
||
self.logger.addHandler(self.log_handler) | ||
|
||
async def nats_connect(self): | ||
""" | ||
connect to nats server on EC2 | ||
""" | ||
|
||
async def disconnected_cb(): | ||
self.registered = False | ||
self.logger.warn("Got disconnected...") | ||
|
||
async def reconnected_cb(): | ||
self.logger.warn("Got reconnected...") | ||
|
||
async def error_cb(err): | ||
self.logger.error("{0}".format(err)) | ||
|
||
# while self.running: | ||
if not self.is_nats_connected: | ||
try: | ||
await self.nc.connect(self.nats_ip_port, | ||
reconnected_cb=reconnected_cb, | ||
disconnected_cb=disconnected_cb, | ||
error_cb=error_cb, | ||
max_reconnect_attempts=-1) | ||
self.logger.info("Connected to NATS Server!") | ||
self.is_nats_connected = True | ||
finally: | ||
self.logger.info("Client is trying to connect to NATS Server Done.") | ||
|
||
|
||
|
||
async def process_rosbag(self): | ||
# This task is responsible for processing the rosbag in the queue - As long as the queue is not empty - keep processin | ||
# This is async because we should be able to keep adding items to the rosbag and keep this task active at the same time | ||
#while self.running: | ||
#If Queue is not empty - create a new rosbag parser | ||
if self.rosbag_queue and not self.rosbag_parser.is_processing: | ||
self.logger.info("Entering queue processing") | ||
self.rosbag_parser.is_processing = True | ||
await self.rosbag_parser.process_rosbag(self.rosbag_queue.pop()) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we name this historical_data_processing.env?