Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add "rosx_introspection" to support JSON encoding #307

Merged
merged 10 commits into from
Jul 29, 2024
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ elseif("$ENV{ROS_VERSION}" STREQUAL "2")
find_package(rclcpp REQUIRED)
find_package(rclcpp_components REQUIRED)
find_package(resource_retriever REQUIRED)
find_package(rosx_introspection REQUIRED)

add_library(foxglove_bridge_component SHARED
ros2_foxglove_bridge/src/message_definition_cache.cpp
Expand All @@ -168,7 +169,7 @@ elseif("$ENV{ROS_VERSION}" STREQUAL "2")
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/ros2_foxglove_bridge/include>
$<INSTALL_INTERFACE:include>
)
ament_target_dependencies(foxglove_bridge_component ament_index_cpp rclcpp rclcpp_components resource_retriever rosgraph_msgs)
ament_target_dependencies(foxglove_bridge_component ament_index_cpp rclcpp rclcpp_components resource_retriever rosgraph_msgs rosx_introspection)
target_link_libraries(foxglove_bridge_component foxglove_bridge_base)
rclcpp_components_register_nodes(foxglove_bridge_component "foxglove_bridge::FoxgloveBridge")
enable_strict_compiler_warnings(foxglove_bridge_component)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -761,10 +761,10 @@ inline void Server<ServerConfiguration>::handleBinaryMessage(ConnHandle hdl, Mes
length,
data};
_handlers.clientMessageHandler(clientMessage, hdl);
} catch (const ServiceError& e) {
} catch (const ClientChannelError& e) {
sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what());
} catch (...) {
sendStatusAndLogMsg(hdl, StatusLevel::Error, "callService: Failed to execute handler");
sendStatusAndLogMsg(hdl, StatusLevel::Error, "clientMessage: Failed to execute handler");
}
} break;
case ClientBinaryOpcode::SERVICE_CALL_REQUEST: {
Expand Down
3 changes: 3 additions & 0 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
<depend condition="$ROS_VERSION == 2">rclcpp</depend>
<depend condition="$ROS_VERSION == 2">rclcpp_components</depend>

<depend condition="$ROS_VERSION == 2">rosx_introspection</depend>


<!-- Test dependencies -->
<test_depend condition="$ROS_VERSION == 1">gtest</test_depend>
<test_depend condition="$ROS_VERSION == 1">rostest</test_depend>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <rclcpp/rclcpp.hpp>
#include <rosgraph_msgs/msg/clock.hpp>
#include <rosx_introspection/ros_parser.hpp>
#include <websocketpp/common/connection_hdl.hpp>

#include <foxglove_bridge/callback_queue.hpp>
Expand Down Expand Up @@ -83,6 +84,7 @@ class FoxgloveBridge : public rclcpp::Node {
bool _includeHidden = false;
bool _disableLoanMessage = true;
std::unique_ptr<foxglove::CallbackQueue> _fetchAssetQueue;
std::unordered_map<std::string, std::shared_ptr<RosMsgParser::Parser>> _jsonParsers;
std::atomic<bool> _shuttingDown = false;

void subscribeConnectionGraph(bool subscribe);
Expand Down
102 changes: 89 additions & 13 deletions ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ FoxgloveBridge::FoxgloveBridge(const rclcpp::NodeOptions& options)
if (_useSimTime) {
serverOptions.capabilities.push_back(foxglove::CAPABILITY_TIME);
}
serverOptions.supportedEncodings = {"cdr"};
serverOptions.supportedEncodings = {"cdr", "json"};
serverOptions.metadata = {{"ROS_DISTRO", rosDistro}};
serverOptions.sendBufferLimitBytes = send_buffer_limit;
serverOptions.sessionId = std::to_string(std::time(nullptr));
Expand Down Expand Up @@ -266,7 +266,6 @@ void FoxgloveBridge::updateAdvertisedTopics(
topicAndDatatype.first.c_str(), topicAndDatatype.second.c_str(), err.what());
continue;
}

channelsToAdd.push_back(newChannel);
}

Expand Down Expand Up @@ -602,6 +601,42 @@ void FoxgloveBridge::clientAdvertise(const foxglove::ClientAdvertisement& advert
std::to_string(advertisement.channelId) + " it had already advertised");
}

if (advertisement.schemaName.empty()) {
throw foxglove::ClientChannelError(
advertisement.channelId,
"Received client advertisement from " + _server->remoteEndpointString(hdl) + " for channel " +
std::to_string(advertisement.channelId) + " with empty schema name");
}

if (advertisement.encoding == "json") {
// register the JSON parser for this schemaName
auto parserIt = _jsonParsers.find(advertisement.schemaName);
if (parserIt == _jsonParsers.end()) {
const auto& schemaName = advertisement.schemaName;
std::string schema = "";

if (!advertisement.schema.empty()) {
// Schema is given by the advertisement
schema = std::string(reinterpret_cast<const char*>(advertisement.schema.data()),
advertisement.schema.size());
} else {
// Schema not given, look it up.
auto [format, msgDefinition] = _messageDefinitionCache.get_full_text(schemaName);
if (format != foxglove::MessageDefinitionFormat::MSG) {
throw foxglove::ClientChannelError(
advertisement.channelId,
"Message definition (.msg) for schema " + schemaName + " not found.");
}

schema = msgDefinition;
}

auto parser = std::make_shared<RosMsgParser::Parser>(
advertisement.topic, RosMsgParser::ROSType(schemaName), schema);
_jsonParsers.insert({schemaName, parser});
}
}

try {
// Create a new topic advertisement
const auto& topicName = advertisement.topic;
Expand Down Expand Up @@ -629,9 +664,10 @@ void FoxgloveBridge::clientAdvertise(const foxglove::ClientAdvertisement& advert
publisherOptions.callback_group = _clientPublishCallbackGroup;
auto publisher = this->create_generic_publisher(topicName, topicType, qos, publisherOptions);

RCLCPP_INFO(this->get_logger(), "Client %s is advertising \"%s\" (%s) on channel %d",
RCLCPP_INFO(this->get_logger(),
"Client %s is advertising \"%s\" (%s) on channel %d with encoding \"%s\"",
_server->remoteEndpointString(hdl).c_str(), topicName.c_str(), topicType.c_str(),
advertisement.channelId);
advertisement.channelId, advertisement.encoding.c_str());

// Store the new topic advertisement
clientPublications.emplace(advertisement.channelId, std::move(publisher));
Expand Down Expand Up @@ -704,17 +740,57 @@ void FoxgloveBridge::clientMessage(const foxglove::ClientMessage& message, Conne
publisher = it2->second;
}

// Copy the message payload into a SerializedMessage object
rclcpp::SerializedMessage serializedMessage{message.getLength()};
auto& rclSerializedMsg = serializedMessage.get_rcl_serialized_message();
std::memcpy(rclSerializedMsg.buffer, message.getData(), message.getLength());
rclSerializedMsg.buffer_length = message.getLength();
auto publishMessage = [publisher, this](const void* data, size_t size) {
// Copy the message payload into a SerializedMessage object
rclcpp::SerializedMessage serializedMessage{size};
auto& rclSerializedMsg = serializedMessage.get_rcl_serialized_message();
std::memcpy(rclSerializedMsg.buffer, data, size);
rclSerializedMsg.buffer_length = size;
// Publish the message
if (_disableLoanMessage || !publisher->can_loan_messages()) {
publisher->publish(serializedMessage);
} else {
publisher->publish_as_loaned_msg(serializedMessage);
}
};

// Publish the message
if (_disableLoanMessage || !publisher->can_loan_messages()) {
publisher->publish(serializedMessage);
if (message.advertisement.encoding == "cdr") {
publishMessage(message.getData(), message.getLength());
} else if (message.advertisement.encoding == "json") {
// get the specific parser for this schemaName
std::shared_ptr<RosMsgParser::Parser> parser;
{
std::lock_guard<std::mutex> lock(_clientAdvertisementsMutex);
auto parserIt = _jsonParsers.find(message.advertisement.schemaName);
if (parserIt != _jsonParsers.end()) {
parser = parserIt->second;
}
}
if (!parser) {
throw foxglove::ClientChannelError(message.advertisement.channelId,
"Dropping client message from " +
_server->remoteEndpointString(hdl) +
" with encoding \"json\": no parser found");
} else {
thread_local RosMsgParser::ROS2_Serializer serializer;
serializer.reset();
const std::string jsonMessage(reinterpret_cast<const char*>(message.getData()),
message.getLength());
try {
parser->serializeFromJson(jsonMessage, &serializer);
publishMessage(serializer.getBufferData(), serializer.getBufferSize());
} catch (const std::exception& ex) {
throw foxglove::ClientChannelError(message.advertisement.channelId,
"Dropping client message from " +
_server->remoteEndpointString(hdl) +
" with encoding \"json\": " + ex.what());
}
}
} else {
publisher->publish_as_loaned_msg(serializedMessage);
throw foxglove::ClientChannelError(
message.advertisement.channelId,
"Dropping client message from " + _server->remoteEndpointString(hdl) +
" with unknown encoding \"" + message.advertisement.encoding + "\"");
}
}

Expand Down
63 changes: 48 additions & 15 deletions ros2_foxglove_bridge/tests/smoke_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
constexpr char URI[] = "ws://localhost:8765";

// Binary representation of std_msgs/msg/String for "hello world"
constexpr uint8_t HELLO_WORLD_BINARY[] = {0, 1, 0, 0, 12, 0, 0, 0, 104, 101,
108, 108, 111, 32, 119, 111, 114, 108, 100, 0};
constexpr uint8_t HELLO_WORLD_CDR[] = {0, 1, 0, 0, 12, 0, 0, 0, 104, 101,
108, 108, 111, 32, 119, 111, 114, 108, 100, 0};
constexpr char HELLO_WORLD_JSON[] = "{\"data\": \"hello world\"}";
constexpr char STD_MSGS_STRING_SCHEMA[] = "data string";

constexpr auto ONE_SECOND = std::chrono::seconds(1);
constexpr auto DEFAULT_TIMEOUT = std::chrono::seconds(10);
Expand Down Expand Up @@ -124,7 +126,11 @@ class ServiceTest : public TestWithExecutor {
std::shared_ptr<foxglove::Client<websocketpp::config::asio_client>> _wsClient;
};

class ExistingPublisherTest : public TestWithExecutor {
class PublisherTest
: public TestWithExecutor,
public testing::WithParamInterface<std::pair<std::string, std::vector<uint8_t>>> {};

class ExistingPublisherTest : public PublisherTest {
public:
inline static const std::string TOPIC_NAME = "/some_topic";

Expand Down Expand Up @@ -198,8 +204,8 @@ TEST(SmokeTest, testSubscription) {
client->subscribe({{subscriptionId, channel.id}});
ASSERT_EQ(std::future_status::ready, msgFuture.wait_for(ONE_SECOND));
const auto msgData = msgFuture.get();
ASSERT_EQ(sizeof(HELLO_WORLD_BINARY), msgData.size());
EXPECT_EQ(0, std::memcmp(HELLO_WORLD_BINARY, msgData.data(), msgData.size()));
ASSERT_EQ(sizeof(HELLO_WORLD_CDR), msgData.size());
EXPECT_EQ(0, std::memcmp(HELLO_WORLD_CDR, msgData.data(), msgData.size()));

// Unsubscribe from the channel again.
client->unsubscribe({subscriptionId});
Expand Down Expand Up @@ -243,21 +249,25 @@ TEST(SmokeTest, testSubscriptionParallel) {
for (auto& future : futures) {
ASSERT_EQ(std::future_status::ready, future.wait_for(DEFAULT_TIMEOUT));
auto msgData = future.get();
ASSERT_EQ(sizeof(HELLO_WORLD_BINARY), msgData.size());
EXPECT_EQ(0, std::memcmp(HELLO_WORLD_BINARY, msgData.data(), msgData.size()));
ASSERT_EQ(sizeof(HELLO_WORLD_CDR), msgData.size());
EXPECT_EQ(0, std::memcmp(HELLO_WORLD_CDR, msgData.data(), msgData.size()));
}

for (auto client : clients) {
client->unsubscribe({subscriptionId});
}
}

TEST_F(TestWithExecutor, testPublishing) {
TEST_P(PublisherTest, testPublishing) {
const auto& [encoding, message] = GetParam();

foxglove::ClientAdvertisement advertisement;
advertisement.channelId = 1;
advertisement.topic = "/foo";
advertisement.encoding = "cdr";
advertisement.schemaName = "std_msgs/String";
advertisement.encoding = encoding;
advertisement.schemaName = "std_msgs/msg/String";
advertisement.schema =
std::vector<uint8_t>(STD_MSGS_STRING_SCHEMA, std::end(STD_MSGS_STRING_SCHEMA));

// Set up a ROS node with a subscriber
std::promise<std::string> msgPromise;
Expand All @@ -279,7 +289,7 @@ TEST_F(TestWithExecutor, testPublishing) {
ASSERT_EQ(std::future_status::ready, channelFuture.wait_for(ONE_SECOND));

// Publish the message and unadvertise again
client->publish(advertisement.channelId, HELLO_WORLD_BINARY, sizeof(HELLO_WORLD_BINARY));
client->publish(advertisement.channelId, message.data(), message.size());
client->unadvertise({advertisement.channelId});

// Ensure that we have received the correct message via our ROS subscriber
Expand All @@ -288,12 +298,25 @@ TEST_F(TestWithExecutor, testPublishing) {
EXPECT_EQ("hello world", msgFuture.get());
}

TEST_F(ExistingPublisherTest, testPublishingWithExistingPublisher) {
INSTANTIATE_TEST_SUITE_P(
TestPublishingCDR, PublisherTest,
testing::Values(std::make_pair("cdr", std::vector<uint8_t>(HELLO_WORLD_CDR,
std::end(HELLO_WORLD_CDR)))));

INSTANTIATE_TEST_SUITE_P(
TestPublishingJSON, PublisherTest,
testing::Values(std::make_pair("json", std::vector<uint8_t>(HELLO_WORLD_JSON,
std::end(HELLO_WORLD_JSON)))));

TEST_P(ExistingPublisherTest, testPublishingWithExistingPublisher) {
const auto& [encoding, message] = GetParam();

foxglove::ClientAdvertisement advertisement;
advertisement.channelId = 1;
advertisement.topic = TOPIC_NAME;
advertisement.encoding = "cdr";
advertisement.schemaName = "std_msgs/String";
advertisement.encoding = encoding;
advertisement.schemaName = "std_msgs/msg/String";
advertisement.schema = {}; // Schema intentionally left empty.

// Set up a ROS node with a subscriber
std::promise<std::string> msgPromise;
Expand All @@ -316,7 +339,7 @@ TEST_F(ExistingPublisherTest, testPublishingWithExistingPublisher) {
ASSERT_EQ(std::future_status::ready, channelFuture.wait_for(ONE_SECOND));

// Publish the message and unadvertise again
client->publish(advertisement.channelId, HELLO_WORLD_BINARY, sizeof(HELLO_WORLD_BINARY));
client->publish(advertisement.channelId, message.data(), message.size());
client->unadvertise({advertisement.channelId});

// Ensure that we have received the correct message via our ROS subscriber
Expand All @@ -325,6 +348,16 @@ TEST_F(ExistingPublisherTest, testPublishingWithExistingPublisher) {
EXPECT_EQ("hello world", msgFuture.get());
}

INSTANTIATE_TEST_SUITE_P(
ExistingPublisherTestCDR, ExistingPublisherTest,
testing::Values(std::make_pair("cdr", std::vector<uint8_t>(HELLO_WORLD_CDR,
std::end(HELLO_WORLD_CDR)))));

INSTANTIATE_TEST_SUITE_P(
ExistingPublisherTestJSON, ExistingPublisherTest,
testing::Values(std::make_pair("json", std::vector<uint8_t>(HELLO_WORLD_JSON,
std::end(HELLO_WORLD_JSON)))));

TEST_F(ParameterTest, testGetAllParams) {
const std::string requestId = "req-testGetAllParams";
auto future = foxglove::waitForParameters(_wsClient, requestId);
Expand Down
Loading