Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rosbag processing service #175

Merged
merged 35 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
2ddf0ae
Add rosbag2 processing service
adev4a Feb 5, 2024
e7164f5
add copyright
adev4a Feb 5, 2024
45d755a
revert change
adev4a Feb 6, 2024
203ee04
fix rosmsg record
adev4a Feb 6, 2024
a217817
update async
adev4a Feb 7, 2024
d946f40
adds nats request
adev4a Feb 9, 2024
53fb6be
add dockerfile
adev4a Feb 12, 2024
dafc7bb
update docker build files
adev4a Feb 12, 2024
ae1a49c
add docker compose and update async implementation
adev4a Feb 13, 2024
387b1ac
update docker compose env vars
adev4a Feb 13, 2024
342562c
add nats request topic to env
adev4a Feb 13, 2024
34ca0e6
address comments
adev4a Feb 15, 2024
7edd074
add nats connect to separate thread
adev4a Feb 15, 2024
5f4c9b4
fix multi-threading
adev4a Feb 15, 2024
4adc368
add initial unit tests
adev4a Feb 15, 2024
13eabb8
address comments
adev4a Feb 15, 2024
a0d7749
address comments
adev4a Feb 15, 2024
689077d
catch exception for rosbag reading error
adev4a Feb 15, 2024
93595b8
add config
adev4a Feb 15, 2024
b57d3ec
add comment for generic exception
adev4a Feb 16, 2024
3d0c37e
fix
adev4a Feb 16, 2024
af7bdbb
address comments
adev4a Feb 16, 2024
92f3e48
Feature add CI/CD for rosbag2_processing_service (#177)
SaikrishnaBairamoni Feb 19, 2024
fefb79f
update unit tests and ci steps
adev4a Feb 20, 2024
3a06741
fix pytest.ini
adev4a Feb 21, 2024
564d307
fix unit test
adev4a Feb 21, 2024
627b29f
update resource file path
adev4a Feb 22, 2024
e9b3e95
update build steps
adev4a Feb 22, 2024
2753411
update tests
adev4a Feb 22, 2024
615f4f2
add check for file exists
adev4a Feb 22, 2024
6be9b5a
fix unit test
adev4a Feb 22, 2024
bafbd04
fix test
adev4a Feb 22, 2024
a48a60c
fix nats request path
adev4a Feb 22, 2024
133137f
fix unit test
adev4a Feb 22, 2024
45799fd
rename dir
adev4a Feb 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we name this historical_data_processing.env?

Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
TOPIC_EXCLUSION_LIST=["/rosout","/parameter_events","/robot_description","/tf_static","/tf","/environment/fused_external_objects","/environment/base_map","/environment/external_objects_viz","/environment/motion_computation_visualize","/environment/external_object_predictions","/localization/current_pose","/localization/map_param_loader/georeference","/environment/semantic_map","/hardware_interface/comms/outbound_binary_msg","/environment/lanelet2_map_viz","/localization/points_map"]
NATS_SERVER_IP_PORT=<>
LOG_LEVEL=debug
LOG_NAME=rosbag2_processing_service
LOG_PATH=/var/logs
LOG_ROTATION_SIZE_BYTES=2147483648
LOG_HANDLER_TYPE=console
INFLUX_BUCKET=test-bucket
INFLUX_ORG=my-org
INFLUX_TOKEN=<>
INFLUX_URL=<>
ROSBAG_STORAGE_DIR=/home/carma/rosbag_log
TO_STR_FIELDS=[]
IGNORE_FIELDS=[]
dan-du-car marked this conversation as resolved.
Show resolved Hide resolved
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#
# Copyright (C) 2024 LEIDOS.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
#

import asyncio
from .service_manager import ServiceManager

from dotenv import load_dotenv

load_dotenv('/home/carma/cda-telematics/telematic_system/historical_data_processing/rosbag2_processing_service/.env')
dan-du-car marked this conversation as resolved.
Show resolved Hide resolved


def main(args=None):
adamlm marked this conversation as resolved.
Show resolved Hide resolved

loop = asyncio.get_event_loop()

service_manager = ServiceManager()

tasks = [
loop.create_task(service_manager.nats_connect()),
loop.create_task(service_manager.process_rosbag())
]

loop.run_until_complete(asyncio.wait(tasks))

loop.close()

if __name__ == '__main__':
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#
# Copyright (C) 2024 LEIDOS.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
#

from mcap_ros2.reader import read_ros2_messages
import json
dan-du-car marked this conversation as resolved.
Show resolved Hide resolved
import re
import time
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import ASYNCHRONOUS

import asyncio
import os


class Rosbag2Parser:
# Class that defines the rosbag parser
# Stores the global definitions that don't change on each iteration of rosbag processing
# influx authentication
def __init__(self, influx_bucket, influx_org, influx_token, influx_url, topic_exclusion_list, log_dir, to_str_fields, ignore_fields, logger):

# Set influxdb parameters
self.influx_bucket = influx_bucket
self.influx_org = influx_org
self.influx_client = InfluxDBClient(url=influx_url, token=influx_token, org=influx_org)
self.topic_exclusion_list = topic_exclusion_list
self.log_dir = log_dir
self.logger = logger

#Fields in the ros message to force to string type.
self.to_str_fields= to_str_fields
# Fields in the ros message to ignore
self.ignore_fields= ignore_fields

# Create write API
self.write_api = self.influx_client.write_api(write_options=ASYNCHRONOUS)

# Processing status
self.is_processing = False


async def process_rosbag(self,rosbag2_name):
measurement_name = rosbag2_name.split('.mcap')[0] # Measurement name is rosbag name without mcap extension
adamlm marked this conversation as resolved.
Show resolved Hide resolved

rosbag_path = os.path.join(self.log_dir, rosbag2_name)


# Load the rosbag from the config directory
for msg in read_ros2_messages(rosbag_path):
try:
if msg.channel.topic not in self.topic_exclusion_list:
adamlm marked this conversation as resolved.
Show resolved Hide resolved
topic = msg.channel.topic
ros_msg = msg.ros_msg
msg_attributes = self.extract_attributes(ros_msg)
msg_timestamp = msg.publish_time_ns

record = f"{measurement_name},topic_name={topic},"

for attr_name, attr_value in msg_attributes:
if attr_name in self.ignore_fields:
continue

elif attr_name in self.to_str_fields:
attr_value = f'"{attr_value}"'
record += f"{attr_name}={attr_value},"

elif isinstance(attr_value, list): # Handle arrays
record += f'{attr_name}="{str(attr_value)}",'
else:
if isinstance(attr_value, str):
attr_value = f'"{attr_value}"' # Correctly format string values
record += f"{attr_name}={attr_value},"


# Remove last comma
record = record[:-1]
# Add timestamp at the end
record += f" timestamp={msg_timestamp}"
# print(record)
#Write record to influx
self.write_api.write(bucket=self.influx_bucket, org=self.influx_org, record=record)

except Exception as e:
adamlm marked this conversation as resolved.
Show resolved Hide resolved
self.logger.warn(f"Failed to process ros message with exception: " + str(e))
dan-du-car marked this conversation as resolved.
Show resolved Hide resolved
self.logger.info("Completed rosbag processing for {rosbag2_name}")
self.is_processing = False


def extract_attributes(self, obj, parent_attr=None):
attributes = []
for attr_name in dir(obj):
try:
if not callable(getattr(obj, attr_name)) and not attr_name.startswith("__") and not attr_name.startswith("_"):
adamlm marked this conversation as resolved.
Show resolved Hide resolved
attr_value = getattr(obj, attr_name)
if parent_attr:
attr_name = f"{parent_attr}.{attr_name}"
if hasattr(attr_value, '__dict__'):
# Recursively extract attributes for nested objects
nested_attributes = self.extract_attributes(attr_value, attr_name)
attributes.extend(nested_attributes)
else:
attributes.append((attr_name, attr_value))
except Exception as e:
adamlm marked this conversation as resolved.
Show resolved Hide resolved
self.logger.error("Unable to get attributes for ros message with exception: " + str(e))
return attributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
#
# Copyright (C) 2024 LEIDOS.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
#
import asyncio
from nats.aio.client import Client as NATS
import os
import logging
from logging.handlers import RotatingFileHandler
from datetime import datetime
from enum import Enum
from .rosbag_processor import Rosbag2Parser

class LogType(Enum):
FILE = "file"
CONSOLE = "console"
ALL = "all"

class ServiceManager:

rosbag_queue = []
def __init__(self):
#NATS client to receive requests from the web ui
self.nc = NATS()
# List of rosbags to be processeds
self.rosbag_queue = ['rosbag2_2024_01_26-15_00_24_0.mcap']
dan-du-car marked this conversation as resolved.
Show resolved Hide resolved

# Load config parameters
# Configured directory to read rosbags from
self.rosbag_dir = os.getenv("ROSBAG_STORAGE_DIR")
# List of topics to be excluded from reading
self.topic_exclusion_list = os.getenv("TOPIC_EXCLUSION_LIST")
#NATS params
self.nats_ip_port = os.getenv("NATS_SERVER_IP_PORT")
#Logging configuration parameters
self.log_level = os.getenv("LOG_LEVEL")
self.log_name = os.getenv("LOG_NAME")
self.log_path = os.getenv("LOG_PATH")
self.log_rotation = int(os.getenv("LOG_ROTATION_SIZE_BYTES"))
self.log_handler_type = os.getenv("LOG_HANDLER_TYPE")

# Create logger
if self.log_handler_type == LogType.ALL.value:
# If all create log handler for both file and console
self.createLogger(LogType.FILE.value)
self.createLogger(LogType.CONSOLE.value)
elif self.log_handler_type == LogType.FILE.value or self.log_handler_type == LogType.CONSOLE.value:
self.createLogger(self.log_handler_type)
else:
self.createLogger(LogType.CONSOLE.value)
self.logger.warn("Incorrect Log type defined, defaulting to console")

# Load Influx params
self.influx_bucket = os.getenv("INFLUX_BUCKET")
self.influx_org = os.getenv("INFLUX_ORG")
self.influx_token = os.getenv("INFLUX_TOKEN")
self.influx_url = os.getenv("INFLUX_URL")

#Fields in the ros message to force to string type.
self.to_str_fields= os.getenv("TO_STR_FIELDS")
# Fields in the ros message to ignore
self.ignore_fields= os.getenv("IGNORE_FIELDS")

# Create rosbag parser object
self.rosbag_parser = Rosbag2Parser(self.influx_bucket, self.influx_org, self.influx_token, self.influx_url, self.topic_exclusion_list, self.rosbag_dir, self.to_str_fields, self.ignore_fields, self.logger)

#nats connection status
self.is_nats_connected = False


def createLogger(self, log_type):
"""Creates log file for the ROS2NatsBridge with configuration items based on the settings input in the params.yaml file"""
# create log file and set log levels
self.logger = logging.getLogger(self.log_name)
now = datetime.now()
dt_string = now.strftime("_%m_%d_%Y_%H_%M_%S")
log_name = self.log_name + dt_string + ".log"
formatter = logging.Formatter(
dan-du-car marked this conversation as resolved.
Show resolved Hide resolved
'%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# Create a rotating log handler that will rotate after maxBytes rotation, that can be configured in the
# params yaml file. The backup count is how many rotating logs will be created after reaching the maxBytes size
if log_type == LogType.FILE.value:
self.log_handler = RotatingFileHandler(
self.log_path+log_name, maxBytes=self.log_rotation, backupCount=5)
else:
self.log_handler = logging.StreamHandler()
self.log_handler.setFormatter(formatter)

if(self.log_level == "debug"):
self.logger.setLevel(logging.DEBUG)
self.log_handler.setLevel(logging.DEBUG)
elif(self.log_level == "info"):
self.logger.setLevel(logging.INFO)
self.log_handler.setLevel(logging.INFO)
elif(self.log_level == "error"):
self.logger.setLevel(logging.ERROR)
self.log_handler.setLevel(logging.ERROR)

self.logger.addHandler(self.log_handler)

async def nats_connect(self):
"""
connect to nats server on EC2
"""

async def disconnected_cb():
self.registered = False
self.logger.warn("Got disconnected...")

async def reconnected_cb():
self.logger.warn("Got reconnected...")

async def error_cb(err):
self.logger.error("{0}".format(err))

# while self.running:
if not self.is_nats_connected:
try:
await self.nc.connect(self.nats_ip_port,
reconnected_cb=reconnected_cb,
disconnected_cb=disconnected_cb,
error_cb=error_cb,
max_reconnect_attempts=-1)
self.logger.info("Connected to NATS Server!")
self.is_nats_connected = True
finally:
self.logger.info("Client is trying to connect to NATS Server Done.")



async def process_rosbag(self):
# This task is responsible for processing the rosbag in the queue - As long as the queue is not empty - keep processin
# This is async because we should be able to keep adding items to the rosbag and keep this task active at the same time
#while self.running:
#If Queue is not empty - create a new rosbag parser
if self.rosbag_queue and not self.rosbag_parser.is_processing:
self.logger.info("Entering queue processing")
self.rosbag_parser.is_processing = True
await self.rosbag_parser.process_rosbag(self.rosbag_queue.pop())
Loading