diff --git a/src/video_stream.cpp b/src/video_stream.cpp index 88ff61e..8cf1cf5 100644 --- a/src/video_stream.cpp +++ b/src/video_stream.cpp @@ -63,6 +63,7 @@ boost::shared_ptr > dyn_srv; VideoStreamConfig config; std::mutex q_mutex, s_mutex, c_mutex, p_mutex; std::queue framesQueue; +bool has_captured; cv::Mat frame; boost::shared_ptr cap; std::string video_stream_provider; @@ -105,7 +106,7 @@ virtual sensor_msgs::CameraInfo get_default_camera_info_from_image(sensor_msgs:: virtual void do_capture() { NODELET_DEBUG("Capture thread started"); - cv::Mat frame; + cv::Mat capture_frame; VideoStreamConfig latest_config = config; ros::Rate camera_fps_rate(latest_config.set_camera_fps); @@ -122,13 +123,18 @@ virtual void do_capture() { cv::waitKey(100); continue; } - if (!cap->read(frame)) { - NODELET_ERROR_STREAM_THROTTLE(1.0, "Could not capture frame (frame_counter: " << frame_counter << ")"); - if (latest_config.reopen_on_read_failure) { - NODELET_WARN_STREAM_THROTTLE(1.0, "trying to reopen the device"); - unsubscribe(); - subscribe(); + try { + if (!cap->read(capture_frame)) { + NODELET_ERROR_STREAM_THROTTLE(1.0, "Could not capture frame (frame_counter: " << frame_counter << ")"); + if (latest_config.reopen_on_read_failure) { + NODELET_WARN_STREAM_THROTTLE(1.0, "trying to reopen the device"); + unsubscribe(); + subscribe(); + } } + } catch (const cv::Exception &e) { + NODELET_WARN("Error reading frame"); + continue; } frame_counter++; @@ -153,13 +159,14 @@ virtual void do_capture() { } } - if(!frame.empty()) { + if(!capture_frame.empty()) { std::lock_guard g(q_mutex); // accumulate only until max_queue_size while (framesQueue.size() >= latest_config.buffer_queue_size) { framesQueue.pop(); } - framesQueue.push(frame.clone()); + framesQueue.push(capture_frame.clone()); + has_captured = true; } } NODELET_DEBUG("Capture thread finished"); @@ -167,6 +174,7 @@ virtual void do_capture() { virtual void do_publish(const ros::TimerEvent& event) { bool is_new_image = false; + bool ready_to_publish = false; sensor_msgs::ImagePtr msg; std_msgs::Header header; VideoStreamConfig latest_config; @@ -184,10 +192,11 @@ virtual void do_publish(const ros::TimerEvent& event) { framesQueue.pop(); is_new_image = true; } + ready_to_publish = !frame.empty() && has_captured; } // Check if grabbed frame is actually filled with some content - if(!frame.empty() && is_new_image) { + if(ready_to_publish && is_new_image) { // From http://docs.opencv.org/modules/core/doc/operations_on_arrays.html#void flip(InputArray src, OutputArray dst, int flipCode) // FLIP_HORIZONTAL == 1, FLIP_VERTICAL == 0 or FLIP_BOTH == -1 // Flip the image if necessary @@ -220,7 +229,7 @@ virtual void do_publish(const ros::TimerEvent& event) { } } -virtual void subscribe() { +virtual bool subscribe() { ROS_DEBUG("Subscribe"); VideoStreamConfig& latest_config = config; // initialize camera info publisher @@ -263,8 +272,8 @@ virtual void subscribe() { cap->set(cv::CAP_PROP_POS_FRAMES, latest_config.start_frame); } if (!cap->isOpened()) { - NODELET_FATAL_STREAM("Invalid 'video_stream_provider': " << video_stream_provider); - return; + NODELET_ERROR_STREAM("Invalid 'video_stream_provider': " << video_stream_provider); + return false; } } NODELET_INFO_STREAM("Video stream provider type detected: " << video_stream_provider_type); @@ -281,7 +290,7 @@ virtual void subscribe() { cap->set(cv::CAP_PROP_FPS, latest_config.set_camera_fps); if(!cap->isOpened()){ NODELET_ERROR_STREAM("Could not open the stream."); - return; + return false; } if (latest_config.width != 0 && latest_config.height != 0){ cap->set(cv::CAP_PROP_FRAME_WIDTH, latest_config.width); @@ -301,6 +310,15 @@ virtual void subscribe() { cap->set(cv::CAP_PROP_EXPOSURE, latest_config.exposure); } + // cleanup previous session + { + std::lock_guard g(q_mutex); + while (!framesQueue.empty()) { + framesQueue.pop(); + } + has_captured = false; + } + try { capture_thread = boost::thread( boost::bind(&VideoStreamNodelet::do_capture, this)); @@ -308,14 +326,18 @@ virtual void subscribe() { ros::Duration(1.0 / latest_config.fps), &VideoStreamNodelet::do_publish, this); } catch (std::exception& e) { NODELET_ERROR_STREAM("Failed to start capture thread: " << e.what()); + return false; } + return true; } virtual void unsubscribe() { ROS_DEBUG("Unsubscribe"); publish_timer.stop(); - capture_thread_running = false; - capture_thread.join(); + if (capture_thread_running) { + capture_thread_running = false; + capture_thread.join(); + } cap.reset(); } @@ -323,40 +345,34 @@ virtual void connectionCallbackImpl() { std::lock_guard lock(s_mutex); subscriber_num++; if (subscriber_num == 1) { - subscribe(); + if (!subscribe()) { + subscriber_num--; + ROS_DEBUG("not subscribed"); + } + else { + ROS_DEBUG("subscribed"); + } } } virtual void disconnectionCallbackImpl() { std::lock_guard lock(s_mutex); - bool always_subscribe = false; - pnh->getParamCached("always_subscribe", always_subscribe); - if (video_stream_provider == "videofile" || always_subscribe) { + bool always_subscribe; + if (!pnh->getParamCached("always_subscribe", always_subscribe)) { + always_subscribe = false; + } + if (always_subscribe) { return; } - subscriber_num--; - if (subscriber_num == 0) { - unsubscribe(); + if (subscriber_num > 0) { + subscriber_num--; + if (subscriber_num == 0) { + unsubscribe(); + } } } -virtual void connectionCallback(const image_transport::SingleSubscriberPublisher&) { - connectionCallbackImpl(); -} - -virtual void infoConnectionCallback(const ros::SingleSubscriberPublisher&) { - connectionCallbackImpl(); -} - -virtual void disconnectionCallback(const image_transport::SingleSubscriberPublisher&) { - disconnectionCallbackImpl(); -} - -virtual void infoDisconnectionCallback(const ros::SingleSubscriberPublisher&) { - disconnectionCallbackImpl(); -} - virtual void configCallback(VideoStreamConfig& new_config, uint32_t level) { NODELET_DEBUG("configCallback"); @@ -444,15 +460,14 @@ virtual void onInit() { auto f = boost::bind(&VideoStreamNodelet::configCallback, this, _1, _2); dyn_srv->setCallback(f); - subscriber_num = 0; image_transport::SubscriberStatusCallback connect_cb = - boost::bind(&VideoStreamNodelet::connectionCallback, this, _1); + boost::bind(&VideoStreamNodelet::connectionCallbackImpl, this); ros::SubscriberStatusCallback info_connect_cb = - boost::bind(&VideoStreamNodelet::infoConnectionCallback, this, _1); + boost::bind(&VideoStreamNodelet::connectionCallbackImpl, this); image_transport::SubscriberStatusCallback disconnect_cb = - boost::bind(&VideoStreamNodelet::disconnectionCallback, this, _1); + boost::bind(&VideoStreamNodelet::disconnectionCallbackImpl, this); ros::SubscriberStatusCallback info_disconnect_cb = - boost::bind(&VideoStreamNodelet::infoDisconnectionCallback, this, _1); + boost::bind(&VideoStreamNodelet::disconnectionCallbackImpl, this); pub = image_transport::ImageTransport(*nh).advertiseCamera( "image_raw", 1, connect_cb, disconnect_cb,