From f81bf2d13f592dc6014e657129195d797ec7d56a Mon Sep 17 00:00:00 2001 From: Davide Faconti Date: Thu, 23 Nov 2023 14:37:55 +0100 Subject: [PATCH] data_tamer added to rosx_introspection --- .../data_tamer_parser}/data_tamer_parser.hpp | 143 ++++++++---------- .../include/PlotJuggler/messageparser_base.h | 2 +- .../include/PlotJuggler/special_messages.h | 20 +++ .../DataLoadMCAP/dataload_mcap.cpp | 37 ++++- .../ParserDataTamer/datatamer_parser.cpp | 2 +- plotjuggler_plugins/ParserROS/ros_parser.cpp | 101 +++++++++++-- plotjuggler_plugins/ParserROS/ros_parser.h | 15 +- .../rosx_introspection/deserializer.hpp | 24 +-- .../rosx_introspection/src/deserializer.cpp | 36 ++++- 9 files changed, 260 insertions(+), 120 deletions(-) rename {plotjuggler_plugins/ParserDataTamer/contrib => 3rdparty/data_tamer_parser}/data_tamer_parser.hpp (77%) diff --git a/plotjuggler_plugins/ParserDataTamer/contrib/data_tamer_parser.hpp b/3rdparty/data_tamer_parser/data_tamer_parser.hpp similarity index 77% rename from plotjuggler_plugins/ParserDataTamer/contrib/data_tamer_parser.hpp rename to 3rdparty/data_tamer_parser/data_tamer_parser.hpp index 68ffad940..292892735 100644 --- a/plotjuggler_plugins/ParserDataTamer/contrib/data_tamer_parser.hpp +++ b/3rdparty/data_tamer_parser/data_tamer_parser.hpp @@ -51,14 +51,6 @@ using VarNumber = std::variant< VarNumber DeserializeToVarNumber(BasicType type, const void* src); -/// Return the number of bytes needed to serialize the type -size_t SizeOf(const BasicType& type); - -const std::string& ToStr(const BasicType& type); - -/// Convert string to its type -BasicType FromStr(const std::string &str); - /** * @brief DataTamer uses a simple "flat" schema of key/value pairs (each pair is a "field"). */ @@ -132,22 +124,6 @@ bool ParseSnapshot(const Schema& schema, //--------------------------------------------------------- //--------------------------------------------------------- -inline BasicType FromStr(const std::string& str) -{ - static const auto kMap = []() { - std::unordered_map map; - for(size_t i=0; i(i); - map[ToStr(type)] = type; - } - return map; - }(); - - auto const it = kMap.find(str); - return it == kMap.end() ? BasicType::OTHER : it->second; -} - template inline T Deserialize(BufferSpan& buffer) { @@ -192,30 +168,6 @@ inline VarNumber DeserializeToVarNumber(BasicType type, return {}; } -inline size_t SizeOf(const BasicType& type) -{ - static constexpr std::array kSizes = - { 1, 1, - 1, 1, - 2, 2, 4, 4, 8, 8, - 4, 8, 0 }; - return kSizes[static_cast(type)]; -} - -const std::string& ToStr(const BasicType& type) -{ - static const std::array kNames = { - "bool", "char", - "int8", "uint8", - "int16", "uint16", - "int32", "uint32", - "int64", "uint64", - "float32", "float64", - "other" - }; - return kNames[static_cast(type)]; -} - inline bool GetBit(BufferSpan mask, size_t index) { const uint8_t& byte = mask.data[index >> 3]; @@ -255,11 +207,6 @@ bool Schema::Field::operator==(const Field &other) const inline Schema BuilSchemaFromText(const std::string& txt) { - auto startWith = [](const std::string& str, const std::string& match) -> bool - { - auto pos = str.find(match); - return pos == 0; - }; auto trimString = [](std::string& str) { while(str.back() == ' ' || str.back() == '\r') { @@ -287,68 +234,104 @@ inline Schema BuilSchemaFromText(const std::string& txt) // we are not interested to this section of the schema break; } - if(line.find("__version__:") != std::string::npos) + + // a single space is expected + auto space_pos = line.find(' '); + if(space_pos == std::string::npos) + { + throw std::runtime_error("Unexpected line: " + line); + } + std::string str_left = line.substr(0, space_pos); + std::string str_right = line.substr(space_pos+1, line.size() - (space_pos+1)); + trimString(str_left); + trimString(str_right); + + const std::string* str_type = &str_left; + const std::string* str_name = &str_right; + + if(str_left == "__version__:") { // check compatibility - line.erase(0, sizeof("__version__:")); - if(std::stoi(line) != SCHEMA_VERSION) + if(std::stoi(str_right) != SCHEMA_VERSION) { throw std::runtime_error("Wrong SCHEMA_VERSION"); } continue; } - if(line.find("__hash__:") != std::string::npos) + if(str_left == "__hash__:") { // check compatibility - line.erase(0, sizeof("__hash__:")); - declared_schema = std::stoul(line); + declared_schema = std::stoul(str_right); continue; } - if(line.find("__channel_name__:") != std::string::npos) + if(str_left == "__channel_name__:") { // check compatibility - line.erase(0, sizeof("__channel_name__:")); - schema.channel_name = line; + schema.channel_name = str_right; schema.hash = std::hash()(schema.channel_name); continue; } Schema::Field field; + static const std::array kNamesNew = { + "bool", "char", + "int8", "uint8", + "int16", "uint16", + "int32", "uint32", + "int64", "uint64", + "float32", "float64", + "other" + }; + // backcompatibility to old format + static const std::array kNamesOld = { + "BOOL", "CHAR", + "INT8", "UINT8", + "INT16", "UINT16", + "INT32", "UINT32", + "INT64", "UINT64", + "FLOAT", "DOUBLE", + "OTHER" + }; + for(size_t i=0; i(i); - const auto& type_name = ToStr(type); - if(startWith(line, type_name)) + if(str_left.find(kNamesNew[i]) == 0) + { + field.type = static_cast(i); + break; + } + if(str_right.find(kNamesOld[i]) == 0) { - field.type = type; + field.type = static_cast(i); + std::swap(str_type, str_name); break; } } - auto offset = line.find_first_of(" ["); if(field.type == BasicType::OTHER) { - field.custom_type_name = line.substr(0, offset); + field.custom_type_name = *str_type; } - if(line[offset]=='[') + auto offset = str_type->find_first_of(" ["); + if(offset != std::string::npos && str_type->at(offset)=='[') { field.is_vector = true; - auto pos = line.find(']', offset); + auto pos = str_type->find(']', offset); if(pos != offset+1) { // get number - std::string sub_string = line.substr(offset+1, pos - offset); - field.array_size = static_cast(std::stoi(sub_string)); + std::string number_string = line.substr(offset+1, pos - offset - 1); + field.array_size = static_cast(std::stoi(number_string)); } } - offset = line.find(' ', offset); - field.name = line.substr(offset + 1, line.size() - offset -1); + + field.name = *str_name; trimString(field.name); - // update the hash + // update the hash schema.hash = AddFieldToHash(field, schema.hash); schema.fields.push_back(field); @@ -361,10 +344,10 @@ inline Schema BuilSchemaFromText(const std::string& txt) } template inline -bool ParseSnapshot(const Schema& schema, - SnapshotView snapshot, - const NumberCallback& callback_number, - const CustomCallback& callback_custom) + bool ParseSnapshot(const Schema& schema, + SnapshotView snapshot, + const NumberCallback& callback_number, + const CustomCallback& callback_custom) { if(schema.hash != snapshot.schema_hash) { @@ -376,7 +359,7 @@ bool ParseSnapshot(const Schema& schema, { const auto& field = schema.fields[i]; if(GetBit(snapshot.active_mask, i)) - { + { if(!field.is_vector) { // regular field, not vector/array diff --git a/plotjuggler_base/include/PlotJuggler/messageparser_base.h b/plotjuggler_base/include/PlotJuggler/messageparser_base.h index bcd2055f3..c506e13ba 100644 --- a/plotjuggler_base/include/PlotJuggler/messageparser_base.h +++ b/plotjuggler_base/include/PlotJuggler/messageparser_base.h @@ -20,7 +20,7 @@ namespace PJ { /* - * A messgaeParser is a clas that is able to convert a message received by + * A messageParser is a class that is able to convert a message received by * a DataStreamer plugin into data in PlotDataMapRef. * * - Each data Source has its own instance of MessageParser diff --git a/plotjuggler_base/include/PlotJuggler/special_messages.h b/plotjuggler_base/include/PlotJuggler/special_messages.h index c80f73520..3c3e8037d 100644 --- a/plotjuggler_base/include/PlotJuggler/special_messages.h +++ b/plotjuggler_base/include/PlotJuggler/special_messages.h @@ -113,6 +113,26 @@ struct JointState static const char* id() { return "sensor_msgs/JointState"; } }; +//-------------------- + +struct DataTamerSchemas +{ + // no need to save any additional information + + static const char* id() { return "data_tamer_msgs/Schemas"; } +}; + +struct DataTamerSnapshot +{ + std::string prefix; + uint64_t timestamp_nsec; + uint64_t schema_hash; + std::vector active_mask; + std::vector payload; + + static const char* id() { return "data_tamer_msgs/Snapshot"; } +}; + } #endif // SPECIAL_MESSAGES_H diff --git a/plotjuggler_plugins/DataLoadMCAP/dataload_mcap.cpp b/plotjuggler_plugins/DataLoadMCAP/dataload_mcap.cpp index b1bf5e9bd..15a9cb19b 100644 --- a/plotjuggler_plugins/DataLoadMCAP/dataload_mcap.cpp +++ b/plotjuggler_plugins/DataLoadMCAP/dataload_mcap.cpp @@ -9,6 +9,8 @@ #include #include +#include "data_tamer_parser/data_tamer_parser.hpp" + #include "mcap/reader.hpp" #include "dialog_mcap.h" @@ -61,13 +63,19 @@ bool DataLoadMCAP::readDataFromFile(FileLoadInfo* info, PlotDataMapRef& plot_dat } auto statistics = reader.statistics(); - std::unordered_map schemas; // schema_id + std::unordered_map mcap_schemas; // schema_id std::unordered_map channels; // channel_id std::unordered_map parsers_by_channel; // channel_id - for (const auto& [schema_id, shema_ptr] : reader.schemas()) + std::unordered_map dt_schames; + int total_dt_schemas = 0; + + std::unordered_set channels_containing_datatamer_schema; + std::unordered_set channels_containing_datatamer_data; + + for (const auto& [schema_id, schema_ptr] : reader.schemas()) { - schemas.insert( {schema_id, shema_ptr} ); + mcap_schemas.insert( {schema_id, schema_ptr} ); } std::set notified_encoding_problem; @@ -75,11 +83,20 @@ bool DataLoadMCAP::readDataFromFile(FileLoadInfo* info, PlotDataMapRef& plot_dat for (const auto& [channel_id, channel_ptr] : reader.channels()) { channels.insert( {channel_id, channel_ptr} ); - const auto& schema = schemas.at(channel_ptr->schemaId); + const auto& schema = mcap_schemas.at(channel_ptr->schemaId); const auto& topic_name = channel_ptr->topic; std::string definition(reinterpret_cast(schema->data.data()), schema->data.size()); + if(schema->name == "data_tamer_msgs/msg/Schemas") + { + channels_containing_datatamer_schema.insert(channel_id); + total_dt_schemas += statistics->channelMessageCounts.at(channel_id); + } + if(schema->name == "data_tamer_msgs/msg/Snapshot") + { + channels_containing_datatamer_data.insert(channel_id); + } QString channel_encoding = QString::fromStdString(channel_ptr->messageEncoding); QString schema_encoding = QString::fromStdString(schema->encoding); @@ -112,7 +129,7 @@ bool DataLoadMCAP::readDataFromFile(FileLoadInfo* info, PlotDataMapRef& plot_dat parsers_by_channel.insert( {channel_ptr->id, parser} ); }; - DialogMCAP dialog(channels, schemas); + DialogMCAP dialog(channels, mcap_schemas); auto ret = dialog.exec(); if (ret != QDialog::Accepted) { @@ -165,7 +182,6 @@ bool DataLoadMCAP::readDataFromFile(FileLoadInfo* info, PlotDataMapRef& plot_dat // MCAP always represents publishTime in nanoseconds double timestamp_sec = double(msg_view.message.publishTime) * 1e-9; - auto parser_it = parsers_by_channel.find(msg_view.channel->id); if( parser_it == parsers_by_channel.end() ) { @@ -177,6 +193,15 @@ bool DataLoadMCAP::readDataFromFile(FileLoadInfo* info, PlotDataMapRef& plot_dat MessageRef msg(msg_view.message.data, msg_view.message.dataSize); parser->parseMessage(msg, timestamp_sec); + // data tamer schema + if( channels_containing_datatamer_schema.count(msg_view.channel->id) != 0) + { + + + } + + // regular message + if (msg_count++ % 1000 == 0) { QApplication::processEvents(); diff --git a/plotjuggler_plugins/ParserDataTamer/datatamer_parser.cpp b/plotjuggler_plugins/ParserDataTamer/datatamer_parser.cpp index 0b4f04a03..0496c2977 100644 --- a/plotjuggler_plugins/ParserDataTamer/datatamer_parser.cpp +++ b/plotjuggler_plugins/ParserDataTamer/datatamer_parser.cpp @@ -2,7 +2,7 @@ #include #include "datatamer_parser.h" -#include "contrib/data_tamer_parser.hpp" +#include "data_tamer_parser/data_tamer_parser.hpp" #include "PlotJuggler/fmt/format.h" using namespace PJ; diff --git a/plotjuggler_plugins/ParserROS/ros_parser.cpp b/plotjuggler_plugins/ParserROS/ros_parser.cpp index 32496eaea..b123f9baf 100644 --- a/plotjuggler_plugins/ParserROS/ros_parser.cpp +++ b/plotjuggler_plugins/ParserROS/ros_parser.cpp @@ -1,4 +1,5 @@ #include "ros_parser.h" +#include "data_tamer_parser/data_tamer_parser.hpp" #include "PlotJuggler/fmt/format.h" using namespace PJ; @@ -7,6 +8,8 @@ using namespace RosMsgParser; static ROSType quaternion_type( Msg::Quaternion::id() ); constexpr double RAD_TO_DEG = 180.0 / M_PI; +static std::unordered_map _global_data_tamer_schemas; + ParserROS::ParserROS(const std::string& topic_name, const std::string& type_name, const std::string& schema, @@ -22,10 +25,28 @@ ParserROS::ParserROS(const std::string& topic_name, _parser.setMaxArrayPolicy( policy, maxArraySize() ); - - _is_diangostic_msg = ( Msg::DiagnosticStatus::id() == type_name); - _is_jointstate_msg = ( Msg::JointState::id() == type_name); - _is_tf2_msg = ( Msg::TFMessage::id() == type_name); + using std::placeholders::_1; + using std::placeholders::_2; + if( Msg::DiagnosticStatus::id() == type_name) + { + _customized_parser = std::bind(&ParserROS::parseDiagnosticMsg, this, _1, _2); + } + else if( Msg::JointState::id() == type_name) + { + _customized_parser = std::bind(&ParserROS::parseJointStateMsg, this, _1, _2); + } + else if( Msg::TFMessage::id() == type_name) + { + _customized_parser = std::bind(&ParserROS::parseTF2Msg, this, _1, _2); + } + else if( Msg::DataTamerSchemas::id() == type_name) + { + _customized_parser = std::bind(&ParserROS::parseDataTamerSchemasMsg, this, _1, _2); + } + else if( Msg::DataTamerSnapshot::id() == type_name) + { + _customized_parser = std::bind(&ParserROS::parseDataTamerSnapshotMsg, this, _1, _2); + } //---------- detect quaternion in schema -------- auto hasQuaternion = [this](const auto& node) { @@ -40,19 +61,9 @@ ParserROS::ParserROS(const std::string& topic_name, bool ParserROS::parseMessage(const PJ::MessageRef serialized_msg, double ×tamp) { - if( _is_diangostic_msg ) - { - parseDiagnosticMsg(serialized_msg, timestamp); - return true; - } - if( _is_jointstate_msg ) + if( _customized_parser ) { - parseJointStateMsg(serialized_msg, timestamp); - return true; - } - if( _is_tf2_msg ) - { - parseTF2Msg(serialized_msg, timestamp); + _customized_parser(serialized_msg, timestamp); return true; } @@ -334,3 +345,61 @@ void ParserROS::parseTF2Msg(const MessageRef msg_buffer, double &stamp) } } + +void ParserROS::parseDataTamerSchemasMsg(const PJ::MessageRef msg_buffer, double ×tamp) +{ + _deserializer->init( Span(msg_buffer.data(), msg_buffer.size()) ); + + const size_t vector_size = _deserializer->deserializeUInt32(); + + for(size_t i=0; ideserialize(BuiltinType::UINT64).convert(); + std::string channel_name; + _deserializer->deserializeString(channel_name); + std::string schema_text; + _deserializer->deserializeString(schema_text); + + auto dt_schema = DataTamerParser::BuilSchemaFromText(schema_text); + dt_schema.channel_name = channel_name; + _global_data_tamer_schemas.insert({dt_schema.hash, dt_schema}); + } +} + +void ParserROS::parseDataTamerSnapshotMsg(const PJ::MessageRef msg_buffer, double ×tamp) +{ + _deserializer->init( Span(msg_buffer.data(), msg_buffer.size()) ); + + DataTamerParser::SnapshotView snapshot; + + snapshot.timestamp = _deserializer->deserialize(BuiltinType::UINT64).convert(); + snapshot.schema_hash = _deserializer->deserialize(BuiltinType::UINT64).convert(); + + auto active_mask = _deserializer->deserializeByteSequence(); + snapshot.active_mask = {active_mask.data(), active_mask.size()}; + + auto payload = _deserializer->deserializeByteSequence(); + snapshot.payload = {payload.data(), payload.size()}; + + auto it = _global_data_tamer_schemas.find(snapshot.schema_hash); + if(it == _global_data_tamer_schemas.end()) + { + return; + } + const auto& dt_schema = it->second; + + const auto toDouble = [](const auto& value) { + return static_cast(value); + }; + + auto callback = [&](const std::string& name_field, + const DataTamerParser::VarNumber& value) + { + double timestamp = double(snapshot.timestamp) * 1e-9; + auto name = fmt::format("{}/{}/{}", _topic_name, dt_schema.channel_name, name_field); + getSeries(name).pushBack( {timestamp, std::visit(toDouble, value)} ); + }; + + DataTamerParser::ParseSnapshot(dt_schema, snapshot, callback); +} diff --git a/plotjuggler_plugins/ParserROS/ros_parser.h b/plotjuggler_plugins/ParserROS/ros_parser.h index a32b7c4ac..68bcf1429 100644 --- a/plotjuggler_plugins/ParserROS/ros_parser.h +++ b/plotjuggler_plugins/ParserROS/ros_parser.h @@ -28,16 +28,19 @@ class ParserROS : public PJ::MessageParser void parseHeader(PJ::Msg::Header& header); - void parseDiagnosticMsg(const PJ::MessageRef serialized_msg, double ×tamp); + void parseDiagnosticMsg(const PJ::MessageRef msg_buffer, double ×tamp); - void parseJointStateMsg(const PJ::MessageRef serialized_msg, double ×tamp); + void parseJointStateMsg(const PJ::MessageRef msg_buffer, double ×tamp); - void parseTF2Msg(const PJ::MessageRef serialized_msg, double ×tamp); + void parseTF2Msg(const PJ::MessageRef msg_buffer, double ×tamp); + + void parseDataTamerSchemasMsg(const PJ::MessageRef msg_buffer, double ×tamp); + + void parseDataTamerSnapshotMsg(const PJ::MessageRef msg_buffer, double ×tamp); + + std::function _customized_parser; bool _contains_quaternion = false; - bool _is_diangostic_msg = false; - bool _is_jointstate_msg = false; - bool _is_tf2_msg = false; }; #endif // ROS_PARSER_H diff --git a/plotjuggler_plugins/ParserROS/rosx_introspection/include/rosx_introspection/deserializer.hpp b/plotjuggler_plugins/ParserROS/rosx_introspection/include/rosx_introspection/deserializer.hpp index 306175eb3..4ad0c9bff 100644 --- a/plotjuggler_plugins/ParserROS/rosx_introspection/include/rosx_introspection/deserializer.hpp +++ b/plotjuggler_plugins/ParserROS/rosx_introspection/include/rosx_introspection/deserializer.hpp @@ -35,6 +35,8 @@ class Deserializer // deserialize the current pointer into a variant (not a string) virtual Variant deserialize(BuiltinType type) = 0; + virtual Span deserializeByteSequence() = 0; + // deserialize the current pointer into a string virtual void deserializeString(std::string& out) = 0; @@ -61,17 +63,19 @@ class ROS_Deserializer : public Deserializer { public: - virtual Variant deserialize(BuiltinType type) override; + Variant deserialize(BuiltinType type) override; + + void deserializeString(std::string& dst) override; - virtual void deserializeString(std::string& dst) override; + uint32_t deserializeUInt32() override; - virtual uint32_t deserializeUInt32() override; + Span deserializeByteSequence() override; - virtual const uint8_t* getCurrentPtr() const override; + const uint8_t* getCurrentPtr() const override; void jump(size_t bytes) override; - virtual void reset() override; + void reset() override; protected: @@ -100,13 +104,15 @@ class FastCDR_Deserializer : public Deserializer { public: - virtual Variant deserialize(BuiltinType type) override; + Variant deserialize(BuiltinType type) override; + + void deserializeString(std::string& dst) override; - virtual void deserializeString(std::string& dst) override; + uint32_t deserializeUInt32() override; - virtual uint32_t deserializeUInt32() override; + Span deserializeByteSequence() override; - virtual const uint8_t* getCurrentPtr() const override; + const uint8_t* getCurrentPtr() const override; void jump(size_t bytes) override; diff --git a/plotjuggler_plugins/ParserROS/rosx_introspection/src/deserializer.cpp b/plotjuggler_plugins/ParserROS/rosx_introspection/src/deserializer.cpp index e740a1883..d242f70cb 100644 --- a/plotjuggler_plugins/ParserROS/rosx_introspection/src/deserializer.cpp +++ b/plotjuggler_plugins/ParserROS/rosx_introspection/src/deserializer.cpp @@ -46,7 +46,7 @@ void ROS_Deserializer::deserializeString(std::string &dst) if( string_size > _bytes_left ) { - throw std::runtime_error("Buffer overrun in RosMsgParser::ReadFromBuffer"); + throw std::runtime_error("Buffer overrun in ROS_Deserializer::deserializeString"); } if (string_size == 0) { @@ -66,6 +66,21 @@ uint32_t ROS_Deserializer::deserializeUInt32() return deserialize(); } +Span ROS_Deserializer::deserializeByteSequence() +{ + uint32_t vect_size = deserialize(); + if( vect_size > _bytes_left ) + { + throw std::runtime_error("Buffer overrun in ROS_Deserializer::deserializeByteSequence"); + } + if (vect_size == 0) { + return {}; + } + Span out(_ptr, vect_size); + jump(vect_size); + return out; +} + const uint8_t *ROS_Deserializer::getCurrentPtr() const { return _ptr; @@ -142,6 +157,25 @@ uint32_t FastCDR_Deserializer::deserializeUInt32() return Deserialize(*_cdr); } +Span FastCDR_Deserializer::deserializeByteSequence() +{ +// thread_local std::vector tmp; +// _cdr->deserialize(tmp); +// return {tmp.data(), tmp.size()}; + + uint32_t seqLength = 0; + _cdr->deserialize(seqLength); + + // dirty trick to change the internal state of cdr + auto* ptr = _cdr->getCurrentPosition(); + + uint8_t dummy; + _cdr->deserialize(dummy); + + _cdr->jump(seqLength - 1); + return {reinterpret_cast(ptr), seqLength}; +} + const uint8_t *FastCDR_Deserializer::getCurrentPtr() const { return reinterpret_cast(_cdr->getBufferPointer());