Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate web_video_server into a component and an executable #168

Merged
merged 2 commits into from
Nov 2, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 35 additions & 7 deletions CMakeLists.txt
bjsowa marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ find_package(async_web_server_cpp REQUIRED)
find_package(cv_bridge REQUIRED)
find_package(image_transport REQUIRED)
find_package(rclcpp REQUIRED)
find_package(rclcpp_components REQUIRED)
find_package(sensor_msgs REQUIRED)

find_package(OpenCV REQUIRED)
Expand Down Expand Up @@ -43,7 +44,7 @@ include_directories(include
)

## Declare a cpp executable
bjsowa marked this conversation as resolved.
Show resolved Hide resolved
add_executable(${PROJECT_NAME}
add_library(${PROJECT_NAME}_lib SHARED
src/web_video_server.cpp
src/image_streamer.cpp
src/libav_streamer.cpp
Expand All @@ -57,12 +58,8 @@ add_executable(${PROJECT_NAME}
src/utils.cpp
)

ament_target_dependencies(${PROJECT_NAME}
sensor_msgs
)

## Specify libraries to link a library or executable target against
target_link_libraries(${PROJECT_NAME}
target_link_libraries(${PROJECT_NAME}_lib
async_web_server_cpp::async_web_server_cpp
cv_bridge::cv_bridge
image_transport::image_transport
Expand All @@ -76,12 +73,43 @@ target_link_libraries(${PROJECT_NAME}
${swscale_LIBRARIES}
)

## Declare a cpp executable
add_executable(${PROJECT_NAME}
src/web_video_server_node.cpp
)

target_link_libraries(${PROJECT_NAME}
${PROJECT_NAME}_lib
)

ament_target_dependencies(${PROJECT_NAME}_lib
rclcpp_components
sensor_msgs
)

rclcpp_components_register_nodes(${PROJECT_NAME}_lib "web_video_server::WebVideoServer")

#############
## Install ##
#############

## Mark executables and/or libraries for installation
install(TARGETS ${PROJECT_NAME}
install(
DIRECTORY include/
DESTINATION include/${PROJECT_NAME}
)

install(
TARGETS ${PROJECT_NAME}_lib
EXPORT export_${PROJECT_NAME}
LIBRARY DESTINATION lib
ARCHIVE DESTINATION lib
RUNTIME DESTINATION bin
)
ament_export_targets(export_${PROJECT_NAME} HAS_LIBRARY_TARGET)

install(
TARGETS ${PROJECT_NAME}
DESTINATION lib/${PROJECT_NAME}
)

Expand Down
10 changes: 2 additions & 8 deletions include/web_video_server/web_video_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,25 +54,20 @@ namespace web_video_server
* @class WebVideoServer
* @brief
*/
class WebVideoServer
class WebVideoServer : public rclcpp::Node
{
public:
/**
* @brief Constructor
* @return
*/
explicit WebVideoServer(rclcpp::Node::SharedPtr & node);
explicit WebVideoServer(const rclcpp::NodeOptions & options);

/**
* @brief Destructor - Cleans up
*/
virtual ~WebVideoServer();

/**
* @brief Starts the server and spins
*/
void spin();

void setup_cleanup_inactive_streams();

bool handle_request(
Expand Down Expand Up @@ -104,7 +99,6 @@ class WebVideoServer
void restreamFrames(double max_age);
void cleanup_inactive_streams();

rclcpp::Node::SharedPtr node_;
rclcpp::WallTimer<rclcpp::VoidCallbackType>::SharedPtr cleanup_timer_;

// Parameters
Expand Down
2 changes: 2 additions & 0 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
<buildtool_depend>ament_cmake_ros</buildtool_depend>

<build_depend>rclcpp</build_depend>
<build_depend>rclcpp_components</build_depend>
<build_depend>cv_bridge</build_depend>
<build_depend>image_transport</build_depend>
<build_depend>async_web_server_cpp</build_depend>
<build_depend>ffmpeg</build_depend>
<build_depend>sensor_msgs</build_depend>

<exec_depend>rclcpp</exec_depend>
<exec_depend>rclcpp_components</exec_depend>
<exec_depend>cv_bridge</exec_depend>
<exec_depend>image_transport</exec_depend>
<exec_depend>async_web_server_cpp</exec_depend>
Expand Down
94 changes: 42 additions & 52 deletions src/web_video_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
#include <boost/algorithm/string/predicate.hpp>
#include <opencv2/opencv.hpp>

#include "rclcpp/rclcpp.hpp"

#include "sensor_msgs/image_encodings.hpp"
#include "web_video_server/ros_compressed_streamer.hpp"
#include "web_video_server/jpeg_streamers.hpp"
Expand All @@ -51,26 +53,26 @@ using namespace boost::placeholders; // NOLINT
namespace web_video_server
{

WebVideoServer::WebVideoServer(rclcpp::Node::SharedPtr & node)
: node_(node), handler_group_(
WebVideoServer::WebVideoServer(const rclcpp::NodeOptions & options)
: rclcpp::Node("web_video_server", options), handler_group_(
async_web_server_cpp::HttpReply::stock_reply(async_web_server_cpp::HttpReply::not_found))
{
node_->declare_parameter("port", 8080);
node_->declare_parameter("verbose", true);
node_->declare_parameter("address", "0.0.0.0");
node_->declare_parameter("server_threads", 1);
node_->declare_parameter("ros_threads", 2);
node_->declare_parameter("publish_rate", -1.0);
node_->declare_parameter("default_stream_type", "mjpeg");

node_->get_parameter("port", port_);
node_->get_parameter("verbose", verbose_);
node_->get_parameter("address", address_);
declare_parameter("port", 8080);
declare_parameter("verbose", true);
declare_parameter("address", "0.0.0.0");
declare_parameter("server_threads", 1);
declare_parameter("ros_threads", 2);
declare_parameter("publish_rate", -1.0);
declare_parameter("default_stream_type", "mjpeg");

get_parameter("port", port_);
get_parameter("verbose", verbose_);
get_parameter("address", address_);
int server_threads;
node_->get_parameter("server_threads", server_threads);
node_->get_parameter("ros_threads", ros_threads_);
node_->get_parameter("publish_rate", publish_rate_);
node_->get_parameter("default_stream_type", default_stream_type_);
get_parameter("server_threads", server_threads);
get_parameter("ros_threads", ros_threads_);
bjsowa marked this conversation as resolved.
Show resolved Hide resolved
get_parameter("publish_rate", publish_rate_);
get_parameter("default_stream_type", default_stream_type_);

stream_types_["mjpeg"] = std::make_shared<MjpegStreamerType>();
stream_types_["png"] = std::make_shared<PngStreamerType>();
Expand Down Expand Up @@ -102,33 +104,26 @@ WebVideoServer::WebVideoServer(rclcpp::Node::SharedPtr & node)
);
} catch (boost::exception & e) {
RCLCPP_ERROR(
node_->get_logger(), "Exception when creating the web server! %s:%d",
get_logger(), "Exception when creating the web server! %s:%d",
address_.c_str(), port_);
throw;
}
RCLCPP_INFO(get_logger(), "Waiting For connections on %s:%d", address_.c_str(), port_);
if (publish_rate_ > 0) {
create_wall_timer(1s / publish_rate_, [this]() {restreamFrames(1.0 / publish_rate_);});
}
server_->run();
bjsowa marked this conversation as resolved.
Show resolved Hide resolved
}

WebVideoServer::~WebVideoServer()
{
server_->stop();
}

void WebVideoServer::setup_cleanup_inactive_streams()
{
std::function<void()> callback = std::bind(&WebVideoServer::cleanup_inactive_streams, this);
cleanup_timer_ = node_->create_wall_timer(500ms, callback);
}

void WebVideoServer::spin()
{
server_->run();
RCLCPP_INFO(node_->get_logger(), "Waiting For connections on %s:%d", address_.c_str(), port_);
rclcpp::executors::MultiThreadedExecutor spinner(rclcpp::ExecutorOptions(), ros_threads_);
spinner.add_node(node_);
if (publish_rate_ > 0) {
node_->create_wall_timer(1s / publish_rate_, [this]() {restreamFrames(1.0 / publish_rate_);});
}
spinner.spin();
server_->stop();
cleanup_timer_ = create_wall_timer(500ms, callback);
}

void WebVideoServer::restreamFrames(double max_age)
Expand All @@ -149,7 +144,7 @@ void WebVideoServer::cleanup_inactive_streams()
[](const std::shared_ptr<ImageStreamer> & streamer) {return !streamer->isInactive();});
if (verbose_) {
for (auto itr = new_end; itr < image_subscribers_.end(); ++itr) {
RCLCPP_INFO(node_->get_logger(), "Removed Stream: %s", (*itr)->getTopic().c_str());
RCLCPP_INFO(get_logger(), "Removed Stream: %s", (*itr)->getTopic().c_str());
}
}
image_subscribers_.erase(new_end, image_subscribers_.end());
Expand All @@ -162,13 +157,13 @@ bool WebVideoServer::handle_request(
const char * end)
{
if (verbose_) {
RCLCPP_INFO(node_->get_logger(), "Handling Request: %s", request.uri.c_str());
RCLCPP_INFO(get_logger(), "Handling Request: %s", request.uri.c_str());
}

try {
return handler_group_(request, connection, begin, end);
} catch (std::exception & e) {
RCLCPP_WARN(node_->get_logger(), "Error Handling Request: %s", e.what());
RCLCPP_WARN(get_logger(), "Error Handling Request: %s", e.what());
return false;
}
}
Expand All @@ -184,7 +179,7 @@ bool WebVideoServer::handle_stream(
// Fallback for topics without corresponding compressed topics
if (type == std::string("ros_compressed")) {
std::string compressed_topic_name = topic + "/compressed";
auto tnat = node_->get_topic_names_and_types();
auto tnat = get_topic_names_and_types();
bool did_find_compressed_topic = false;
for (auto topic_and_types : tnat) {
if (topic_and_types.second.size() > 1) {
Expand All @@ -201,13 +196,13 @@ bool WebVideoServer::handle_stream(
}
if (!did_find_compressed_topic) {
RCLCPP_WARN(
node_->get_logger(),
get_logger(),
"Could not find compressed image topic for %s, falling back to mjpeg", topic.c_str());
type = "mjpeg";
}
}
std::shared_ptr<ImageStreamer> streamer = stream_types_[type]->create_streamer(
request, connection, node_);
request, connection, shared_from_this());
bjsowa marked this conversation as resolved.
Show resolved Hide resolved
streamer->start();
std::scoped_lock lock(subscriber_mutex_);
image_subscribers_.push_back(streamer);
Expand All @@ -224,7 +219,7 @@ bool WebVideoServer::handle_snapshot(
const char * end)
{
std::shared_ptr<ImageStreamer> streamer = std::make_shared<JpegSnapshotStreamer>(
request, connection, node_);
request, connection, shared_from_this());
streamer->start();

std::scoped_lock lock(subscriber_mutex_);
Expand All @@ -243,7 +238,7 @@ bool WebVideoServer::handle_stream_viewer(
// Fallback for topics without corresponding compressed topics
if (type == std::string("ros_compressed")) {
std::string compressed_topic_name = topic + "/compressed";
auto tnat = node_->get_topic_names_and_types();
auto tnat = get_topic_names_and_types();
bool did_find_compressed_topic = false;
for (auto topic_and_types : tnat) {
if (topic_and_types.second.size() > 1) {
Expand All @@ -260,7 +255,7 @@ bool WebVideoServer::handle_stream_viewer(
}
if (!did_find_compressed_topic) {
RCLCPP_WARN(
node_->get_logger(),
get_logger(),
"Could not find compressed image topic for %s, falling back to mjpeg", topic.c_str());
type = "mjpeg";
}
Expand Down Expand Up @@ -292,7 +287,7 @@ bool WebVideoServer::handle_list_streams(
{
std::vector<std::string> image_topics;
std::vector<std::string> camera_info_topics;
auto tnat = node_->get_topic_names_and_types();
auto tnat = get_topic_names_and_types();
for (auto topic_and_types : tnat) {
if (topic_and_types.second.size() > 1) {
// skip over topics with more than one type
Expand Down Expand Up @@ -380,14 +375,9 @@ bool WebVideoServer::handle_list_streams(

} // namespace web_video_server

int main(int argc, char ** argv)
{
rclcpp::init(argc, argv);
auto node = std::make_shared<rclcpp::Node>("web_video_server");

web_video_server::WebVideoServer server(node);
server.setup_cleanup_inactive_streams();
server.spin();
#include "rclcpp_components/register_node_macro.hpp"

return 0;
}
// Register the component with class_loader.
// This acts as a sort of entry point, allowing the component to be discoverable when its library
// is being loaded into a running process.
RCLCPP_COMPONENTS_REGISTER_NODE(web_video_server::WebVideoServer)
47 changes: 47 additions & 0 deletions src/web_video_server_node.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) 2014, Worcester Polytechnic Institute
// Copyright (c) 2024, The Robot Web Tools Contributors
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// * Neither the name of the copyright holder nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.
#include "rclcpp/rclcpp.hpp"

#include "web_video_server/web_video_server.hpp"

int main(int argc, char ** argv)
{
rclcpp::init(argc, argv);
auto node = std::make_shared<web_video_server::WebVideoServer>(rclcpp::NodeOptions());
node->setup_cleanup_inactive_streams();
bjsowa marked this conversation as resolved.
Show resolved Hide resolved

int ros_threads;
node->get_parameter("ros_threads", ros_threads);
rclcpp::executors::MultiThreadedExecutor spinner(rclcpp::ExecutorOptions(), ros_threads);
spinner.add_node(node);
spinner.spin();

return 0;
}