Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix subscribe/unsubscribe behavior #80

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 59 additions & 44 deletions src/video_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ boost::shared_ptr<dynamic_reconfigure::Server<VideoStreamConfig> > dyn_srv;
VideoStreamConfig config;
std::mutex q_mutex, s_mutex, c_mutex, p_mutex;
std::queue<cv::Mat> framesQueue;
bool has_captured;
cv::Mat frame;
boost::shared_ptr<cv::VideoCapture> cap;
std::string video_stream_provider;
Expand Down Expand Up @@ -105,7 +106,7 @@ virtual sensor_msgs::CameraInfo get_default_camera_info_from_image(sensor_msgs::

virtual void do_capture() {
NODELET_DEBUG("Capture thread started");
cv::Mat frame;
cv::Mat capture_frame;
VideoStreamConfig latest_config = config;
ros::Rate camera_fps_rate(latest_config.set_camera_fps);

Expand All @@ -122,13 +123,18 @@ virtual void do_capture() {
cv::waitKey(100);
continue;
}
if (!cap->read(frame)) {
NODELET_ERROR_STREAM_THROTTLE(1.0, "Could not capture frame (frame_counter: " << frame_counter << ")");
if (latest_config.reopen_on_read_failure) {
NODELET_WARN_STREAM_THROTTLE(1.0, "trying to reopen the device");
unsubscribe();
subscribe();
try {
if (!cap->read(capture_frame)) {
NODELET_ERROR_STREAM_THROTTLE(1.0, "Could not capture frame (frame_counter: " << frame_counter << ")");
if (latest_config.reopen_on_read_failure) {
NODELET_WARN_STREAM_THROTTLE(1.0, "trying to reopen the device");
unsubscribe();
subscribe();
}
}
} catch (const cv::Exception &e) {
NODELET_WARN("Error reading frame");
continue;
}

frame_counter++;
Expand All @@ -153,20 +159,22 @@ virtual void do_capture() {
}
}

if(!frame.empty()) {
if(!capture_frame.empty()) {
std::lock_guard<std::mutex> g(q_mutex);
// accumulate only until max_queue_size
while (framesQueue.size() >= latest_config.buffer_queue_size) {
framesQueue.pop();
}
framesQueue.push(frame.clone());
framesQueue.push(capture_frame.clone());
has_captured = true;
}
}
NODELET_DEBUG("Capture thread finished");
}

virtual void do_publish(const ros::TimerEvent& event) {
bool is_new_image = false;
bool ready_to_publish = false;
sensor_msgs::ImagePtr msg;
std_msgs::Header header;
VideoStreamConfig latest_config;
Expand All @@ -184,10 +192,11 @@ virtual void do_publish(const ros::TimerEvent& event) {
framesQueue.pop();
is_new_image = true;
}
ready_to_publish = !frame.empty() && has_captured;
}

// Check if grabbed frame is actually filled with some content
if(!frame.empty() && is_new_image) {
if(ready_to_publish && is_new_image) {
// From http://docs.opencv.org/modules/core/doc/operations_on_arrays.html#void flip(InputArray src, OutputArray dst, int flipCode)
// FLIP_HORIZONTAL == 1, FLIP_VERTICAL == 0 or FLIP_BOTH == -1
// Flip the image if necessary
Expand Down Expand Up @@ -220,7 +229,7 @@ virtual void do_publish(const ros::TimerEvent& event) {
}
}

virtual void subscribe() {
virtual bool subscribe() {
ROS_DEBUG("Subscribe");
VideoStreamConfig& latest_config = config;
// initialize camera info publisher
Expand Down Expand Up @@ -263,8 +272,8 @@ virtual void subscribe() {
cap->set(cv::CAP_PROP_POS_FRAMES, latest_config.start_frame);
}
if (!cap->isOpened()) {
NODELET_FATAL_STREAM("Invalid 'video_stream_provider': " << video_stream_provider);
return;
NODELET_ERROR_STREAM("Invalid 'video_stream_provider': " << video_stream_provider);
return false;
}
}
NODELET_INFO_STREAM("Video stream provider type detected: " << video_stream_provider_type);
Expand All @@ -281,7 +290,7 @@ virtual void subscribe() {
cap->set(cv::CAP_PROP_FPS, latest_config.set_camera_fps);
if(!cap->isOpened()){
NODELET_ERROR_STREAM("Could not open the stream.");
return;
return false;
}
if (latest_config.width != 0 && latest_config.height != 0){
cap->set(cv::CAP_PROP_FRAME_WIDTH, latest_config.width);
Expand All @@ -301,62 +310,69 @@ virtual void subscribe() {
cap->set(cv::CAP_PROP_EXPOSURE, latest_config.exposure);
}

// cleanup previous session
{
std::lock_guard<std::mutex> g(q_mutex);
while (!framesQueue.empty()) {
framesQueue.pop();
}
has_captured = false;
}

try {
capture_thread = boost::thread(
boost::bind(&VideoStreamNodelet::do_capture, this));
publish_timer = nh->createTimer(
ros::Duration(1.0 / latest_config.fps), &VideoStreamNodelet::do_publish, this);
} catch (std::exception& e) {
NODELET_ERROR_STREAM("Failed to start capture thread: " << e.what());
return false;
}
return true;
}

virtual void unsubscribe() {
ROS_DEBUG("Unsubscribe");
publish_timer.stop();
capture_thread_running = false;
capture_thread.join();
if (capture_thread_running) {
capture_thread_running = false;
capture_thread.join();
}
cap.reset();
}

virtual void connectionCallbackImpl() {
std::lock_guard<std::mutex> lock(s_mutex);
subscriber_num++;
if (subscriber_num == 1) {
subscribe();
if (!subscribe()) {
subscriber_num--;
ROS_DEBUG("not subscribed");
}
else {
ROS_DEBUG("subscribed");
}
}
}

virtual void disconnectionCallbackImpl() {
std::lock_guard<std::mutex> lock(s_mutex);
bool always_subscribe = false;
pnh->getParamCached("always_subscribe", always_subscribe);
if (video_stream_provider == "videofile" || always_subscribe) {
bool always_subscribe;
if (!pnh->getParamCached("always_subscribe", always_subscribe)) {
always_subscribe = false;
}
if (always_subscribe) {
return;
}

subscriber_num--;
if (subscriber_num == 0) {
unsubscribe();
if (subscriber_num > 0) {
subscriber_num--;
if (subscriber_num == 0) {
unsubscribe();
}
}
}

virtual void connectionCallback(const image_transport::SingleSubscriberPublisher&) {
connectionCallbackImpl();
}

virtual void infoConnectionCallback(const ros::SingleSubscriberPublisher&) {
connectionCallbackImpl();
}

virtual void disconnectionCallback(const image_transport::SingleSubscriberPublisher&) {
disconnectionCallbackImpl();
}

virtual void infoDisconnectionCallback(const ros::SingleSubscriberPublisher&) {
disconnectionCallbackImpl();
}

virtual void configCallback(VideoStreamConfig& new_config, uint32_t level) {
NODELET_DEBUG("configCallback");

Expand Down Expand Up @@ -444,15 +460,14 @@ virtual void onInit() {
auto f = boost::bind(&VideoStreamNodelet::configCallback, this, _1, _2);
dyn_srv->setCallback(f);

subscriber_num = 0;
image_transport::SubscriberStatusCallback connect_cb =
boost::bind(&VideoStreamNodelet::connectionCallback, this, _1);
boost::bind(&VideoStreamNodelet::connectionCallbackImpl, this);
ros::SubscriberStatusCallback info_connect_cb =
boost::bind(&VideoStreamNodelet::infoConnectionCallback, this, _1);
boost::bind(&VideoStreamNodelet::connectionCallbackImpl, this);
image_transport::SubscriberStatusCallback disconnect_cb =
boost::bind(&VideoStreamNodelet::disconnectionCallback, this, _1);
boost::bind(&VideoStreamNodelet::disconnectionCallbackImpl, this);
ros::SubscriberStatusCallback info_disconnect_cb =
boost::bind(&VideoStreamNodelet::infoDisconnectionCallback, this, _1);
boost::bind(&VideoStreamNodelet::disconnectionCallbackImpl, this);
pub = image_transport::ImageTransport(*nh).advertiseCamera(
"image_raw", 1,
connect_cb, disconnect_cb,
Expand Down