Skip to content

Commit

Permalink
data_tamer added to rosx_introspection
Browse files Browse the repository at this point in the history
  • Loading branch information
facontidavide committed Nov 23, 2023
1 parent a5246dc commit f81bf2d
Show file tree
Hide file tree
Showing 9 changed files with 260 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,6 @@ using VarNumber = std::variant<

VarNumber DeserializeToVarNumber(BasicType type, const void* src);

/// Return the number of bytes needed to serialize the type
size_t SizeOf(const BasicType& type);

const std::string& ToStr(const BasicType& type);

/// Convert string to its type
BasicType FromStr(const std::string &str);

/**
* @brief DataTamer uses a simple "flat" schema of key/value pairs (each pair is a "field").
*/
Expand Down Expand Up @@ -132,22 +124,6 @@ bool ParseSnapshot(const Schema& schema,
//---------------------------------------------------------
//---------------------------------------------------------

inline BasicType FromStr(const std::string& str)
{
static const auto kMap = []() {
std::unordered_map<std::string, BasicType> map;
for(size_t i=0; i<TypesCount; i++)
{
auto type = static_cast<BasicType>(i);
map[ToStr(type)] = type;
}
return map;
}();

auto const it = kMap.find(str);
return it == kMap.end() ? BasicType::OTHER : it->second;
}

template<typename T> inline
T Deserialize(BufferSpan& buffer)
{
Expand Down Expand Up @@ -192,30 +168,6 @@ inline VarNumber DeserializeToVarNumber(BasicType type,
return {};
}

inline size_t SizeOf(const BasicType& type)
{
static constexpr std::array<size_t, TypesCount> kSizes =
{ 1, 1,
1, 1,
2, 2, 4, 4, 8, 8,
4, 8, 0 };
return kSizes[static_cast<size_t>(type)];
}

const std::string& ToStr(const BasicType& type)
{
static const std::array<std::string, TypesCount> kNames = {
"bool", "char",
"int8", "uint8",
"int16", "uint16",
"int32", "uint32",
"int64", "uint64",
"float32", "float64",
"other"
};
return kNames[static_cast<size_t>(type)];
}

inline bool GetBit(BufferSpan mask, size_t index)
{
const uint8_t& byte = mask.data[index >> 3];
Expand Down Expand Up @@ -255,11 +207,6 @@ bool Schema::Field::operator==(const Field &other) const

inline Schema BuilSchemaFromText(const std::string& txt)
{
auto startWith = [](const std::string& str, const std::string& match) -> bool
{
auto pos = str.find(match);
return pos == 0;
};
auto trimString = [](std::string& str)
{
while(str.back() == ' ' || str.back() == '\r') {
Expand Down Expand Up @@ -287,68 +234,104 @@ inline Schema BuilSchemaFromText(const std::string& txt)
// we are not interested to this section of the schema
break;
}
if(line.find("__version__:") != std::string::npos)

// a single space is expected
auto space_pos = line.find(' ');
if(space_pos == std::string::npos)
{
throw std::runtime_error("Unexpected line: " + line);
}
std::string str_left = line.substr(0, space_pos);
std::string str_right = line.substr(space_pos+1, line.size() - (space_pos+1));
trimString(str_left);
trimString(str_right);

const std::string* str_type = &str_left;
const std::string* str_name = &str_right;

if(str_left == "__version__:")
{
// check compatibility
line.erase(0, sizeof("__version__:"));
if(std::stoi(line) != SCHEMA_VERSION)
if(std::stoi(str_right) != SCHEMA_VERSION)
{
throw std::runtime_error("Wrong SCHEMA_VERSION");
}
continue;
}
if(line.find("__hash__:") != std::string::npos)
if(str_left == "__hash__:")
{
// check compatibility
line.erase(0, sizeof("__hash__:"));
declared_schema = std::stoul(line);
declared_schema = std::stoul(str_right);
continue;
}

if(line.find("__channel_name__:") != std::string::npos)
if(str_left == "__channel_name__:")
{
// check compatibility
line.erase(0, sizeof("__channel_name__:"));
schema.channel_name = line;
schema.channel_name = str_right;
schema.hash = std::hash<std::string>()(schema.channel_name);
continue;
}

Schema::Field field;

static const std::array<std::string, TypesCount> kNamesNew = {
"bool", "char",
"int8", "uint8",
"int16", "uint16",
"int32", "uint32",
"int64", "uint64",
"float32", "float64",
"other"
};
// backcompatibility to old format
static const std::array<std::string, TypesCount> kNamesOld = {
"BOOL", "CHAR",
"INT8", "UINT8",
"INT16", "UINT16",
"INT32", "UINT32",
"INT64", "UINT64",
"FLOAT", "DOUBLE",
"OTHER"
};

for(size_t i=0; i<TypesCount; i++)
{
auto type = static_cast<BasicType>(i);
const auto& type_name = ToStr(type);
if(startWith(line, type_name))
if(str_left.find(kNamesNew[i]) == 0)
{
field.type = static_cast<BasicType>(i);
break;
}
if(str_right.find(kNamesOld[i]) == 0)
{
field.type = type;
field.type = static_cast<BasicType>(i);
std::swap(str_type, str_name);
break;
}
}

auto offset = line.find_first_of(" [");
if(field.type == BasicType::OTHER)
{
field.custom_type_name = line.substr(0, offset);
field.custom_type_name = *str_type;
}

if(line[offset]=='[')
auto offset = str_type->find_first_of(" [");
if(offset != std::string::npos && str_type->at(offset)=='[')
{
field.is_vector = true;
auto pos = line.find(']', offset);
auto pos = str_type->find(']', offset);
if(pos != offset+1)
{
// get number
std::string sub_string = line.substr(offset+1, pos - offset);
field.array_size = static_cast<uint16_t>(std::stoi(sub_string));
std::string number_string = line.substr(offset+1, pos - offset - 1);
field.array_size = static_cast<uint16_t>(std::stoi(number_string));
}
}
offset = line.find(' ', offset);
field.name = line.substr(offset + 1, line.size() - offset -1);

field.name = *str_name;
trimString(field.name);

// update the hash
// update the hash
schema.hash = AddFieldToHash(field, schema.hash);

schema.fields.push_back(field);
Expand All @@ -361,10 +344,10 @@ inline Schema BuilSchemaFromText(const std::string& txt)
}

template <typename NumberCallback, typename CustomCallback> inline
bool ParseSnapshot(const Schema& schema,
SnapshotView snapshot,
const NumberCallback& callback_number,
const CustomCallback& callback_custom)
bool ParseSnapshot(const Schema& schema,
SnapshotView snapshot,
const NumberCallback& callback_number,
const CustomCallback& callback_custom)
{
if(schema.hash != snapshot.schema_hash)
{
Expand All @@ -376,7 +359,7 @@ bool ParseSnapshot(const Schema& schema,
{
const auto& field = schema.fields[i];
if(GetBit(snapshot.active_mask, i))
{
{
if(!field.is_vector)
{
// regular field, not vector/array
Expand Down
2 changes: 1 addition & 1 deletion plotjuggler_base/include/PlotJuggler/messageparser_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
namespace PJ
{
/*
* A messgaeParser is a clas that is able to convert a message received by
* A messageParser is a class that is able to convert a message received by
* a DataStreamer plugin into data in PlotDataMapRef.
*
* - Each data Source has its own instance of MessageParser
Expand Down
20 changes: 20 additions & 0 deletions plotjuggler_base/include/PlotJuggler/special_messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,26 @@ struct JointState
static const char* id() { return "sensor_msgs/JointState"; }
};

//--------------------

struct DataTamerSchemas
{
// no need to save any additional information

static const char* id() { return "data_tamer_msgs/Schemas"; }
};

struct DataTamerSnapshot
{
std::string prefix;
uint64_t timestamp_nsec;
uint64_t schema_hash;
std::vector<uint8_t> active_mask;
std::vector<uint8_t> payload;

static const char* id() { return "data_tamer_msgs/Snapshot"; }
};

}

#endif // SPECIAL_MESSAGES_H
37 changes: 31 additions & 6 deletions plotjuggler_plugins/DataLoadMCAP/dataload_mcap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include <QInputDialog>
#include <QPushButton>

#include "data_tamer_parser/data_tamer_parser.hpp"

#include "mcap/reader.hpp"
#include "dialog_mcap.h"

Expand Down Expand Up @@ -61,25 +63,40 @@ bool DataLoadMCAP::readDataFromFile(FileLoadInfo* info, PlotDataMapRef& plot_dat
}
auto statistics = reader.statistics();

std::unordered_map<int, mcap::SchemaPtr> schemas; // schema_id
std::unordered_map<int, mcap::SchemaPtr> mcap_schemas; // schema_id
std::unordered_map<int, mcap::ChannelPtr> channels; // channel_id
std::unordered_map<int, MessageParserPtr> parsers_by_channel; // channel_id

for (const auto& [schema_id, shema_ptr] : reader.schemas())
std::unordered_map<int, DataTamerParser::Schema> dt_schames;
int total_dt_schemas = 0;

std::unordered_set<mcap::ChannelId> channels_containing_datatamer_schema;
std::unordered_set<mcap::ChannelId> channels_containing_datatamer_data;

for (const auto& [schema_id, schema_ptr] : reader.schemas())
{
schemas.insert( {schema_id, shema_ptr} );
mcap_schemas.insert( {schema_id, schema_ptr} );
}

std::set<QString> notified_encoding_problem;

for (const auto& [channel_id, channel_ptr] : reader.channels())
{
channels.insert( {channel_id, channel_ptr} );
const auto& schema = schemas.at(channel_ptr->schemaId);
const auto& schema = mcap_schemas.at(channel_ptr->schemaId);
const auto& topic_name = channel_ptr->topic;
std::string definition(reinterpret_cast<const char*>(schema->data.data()),
schema->data.size());

if(schema->name == "data_tamer_msgs/msg/Schemas")
{
channels_containing_datatamer_schema.insert(channel_id);
total_dt_schemas += statistics->channelMessageCounts.at(channel_id);
}
if(schema->name == "data_tamer_msgs/msg/Snapshot")
{
channels_containing_datatamer_data.insert(channel_id);
}

QString channel_encoding = QString::fromStdString(channel_ptr->messageEncoding);
QString schema_encoding = QString::fromStdString(schema->encoding);
Expand Down Expand Up @@ -112,7 +129,7 @@ bool DataLoadMCAP::readDataFromFile(FileLoadInfo* info, PlotDataMapRef& plot_dat
parsers_by_channel.insert( {channel_ptr->id, parser} );
};

DialogMCAP dialog(channels, schemas);
DialogMCAP dialog(channels, mcap_schemas);
auto ret = dialog.exec();
if (ret != QDialog::Accepted)
{
Expand Down Expand Up @@ -165,7 +182,6 @@ bool DataLoadMCAP::readDataFromFile(FileLoadInfo* info, PlotDataMapRef& plot_dat

// MCAP always represents publishTime in nanoseconds
double timestamp_sec = double(msg_view.message.publishTime) * 1e-9;

auto parser_it = parsers_by_channel.find(msg_view.channel->id);
if( parser_it == parsers_by_channel.end() )
{
Expand All @@ -177,6 +193,15 @@ bool DataLoadMCAP::readDataFromFile(FileLoadInfo* info, PlotDataMapRef& plot_dat
MessageRef msg(msg_view.message.data, msg_view.message.dataSize);
parser->parseMessage(msg, timestamp_sec);

// data tamer schema
if( channels_containing_datatamer_schema.count(msg_view.channel->id) != 0)
{


}

// regular message

if (msg_count++ % 1000 == 0)
{
QApplication::processEvents();
Expand Down
2 changes: 1 addition & 1 deletion plotjuggler_plugins/ParserDataTamer/datatamer_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#include <iostream>

#include "datatamer_parser.h"
#include "contrib/data_tamer_parser.hpp"
#include "data_tamer_parser/data_tamer_parser.hpp"
#include "PlotJuggler/fmt/format.h"

using namespace PJ;
Expand Down
Loading

0 comments on commit f81bf2d

Please sign in to comment.