Skip to content

Commit

Permalink
Use chrono steady_clock for frame timing
Browse files Browse the repository at this point in the history
  • Loading branch information
bjsowa committed Nov 11, 2024
1 parent 697883f commit ed719e5
Show file tree
Hide file tree
Showing 14 changed files with 90 additions and 63 deletions.
10 changes: 6 additions & 4 deletions include/web_video_server/image_streamer.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#ifndef IMAGE_STREAMER_H_
#define IMAGE_STREAMER_H_

#include <chrono>

#include <ros/ros.h>
#include <image_transport/image_transport.h>
#include <opencv2/opencv.hpp>
Expand Down Expand Up @@ -29,7 +31,7 @@ class ImageStreamer
/**
* Restreams the last received image frame if older than max_age.
*/
virtual void restreamFrame(double max_age) = 0;
virtual void restreamFrame(std::chrono::duration<double> max_age) = 0;

std::string getTopic()
{
Expand All @@ -56,8 +58,8 @@ class ImageTransportImageStreamer : public ImageStreamer

protected:
virtual cv::Mat decodeImage(const sensor_msgs::ImageConstPtr& msg);
virtual void sendImage(const cv::Mat &, const ros::Time &time) = 0;
virtual void restreamFrame(double max_age);
virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point &time) = 0;
virtual void restreamFrame(std::chrono::duration<double> max_age);
virtual void initialize(const cv::Mat &);

image_transport::Subscriber image_sub_;
Expand All @@ -66,7 +68,7 @@ class ImageTransportImageStreamer : public ImageStreamer
bool invert_;
std::string default_transport_;

ros::Time last_frame;
std::chrono::steady_clock::time_point last_frame_;
cv::Mat output_size_image;
boost::mutex send_mutex_;

Expand Down
4 changes: 2 additions & 2 deletions include/web_video_server/jpeg_streamers.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class MjpegStreamer : public ImageTransportImageStreamer
ros::NodeHandle& nh);
~MjpegStreamer();
protected:
virtual void sendImage(const cv::Mat &, const ros::Time &time);
virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point &time);

private:
MultipartStream stream_;
Expand All @@ -41,7 +41,7 @@ class JpegSnapshotStreamer : public ImageTransportImageStreamer
async_web_server_cpp::HttpConnectionPtr connection, ros::NodeHandle& nh);
~JpegSnapshotStreamer();
protected:
virtual void sendImage(const cv::Mat &, const ros::Time &time);
virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point &time);

private:
int quality_;
Expand Down
6 changes: 4 additions & 2 deletions include/web_video_server/libav_streamer.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#ifndef LIBAV_STREAMERS_H_
#define LIBAV_STREAMERS_H_

#include <optional>

#include <image_transport/image_transport.h>
#include "web_video_server/image_streamer.h"
#include "async_web_server_cpp/http_request.hpp"
Expand Down Expand Up @@ -32,7 +34,7 @@ class LibavStreamer : public ImageTransportImageStreamer

protected:
virtual void initializeEncoder();
virtual void sendImage(const cv::Mat&, const ros::Time& time);
virtual void sendImage(const cv::Mat&, const std::chrono::steady_clock::time_point& time);
virtual void initialize(const cv::Mat&);
AVOutputFormat* output_format_;
AVFormatContext* format_context_;
Expand All @@ -45,7 +47,7 @@ class LibavStreamer : public ImageTransportImageStreamer
private:
AVFrame* frame_;
struct SwsContext* sws_context_;
ros::Time first_image_timestamp_;
std::optional<std::chrono::steady_clock::time_point> first_image_timestamp_;
boost::mutex encode_mutex_;

std::string format_name_;
Expand Down
10 changes: 5 additions & 5 deletions include/web_video_server/multipart_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace web_video_server
{

struct PendingFooter {
ros::Time timestamp;
std::chrono::steady_clock::time_point timestamp;
boost::weak_ptr<std::string> contents;
};

Expand All @@ -21,10 +21,10 @@ class MultipartStream {
std::size_t max_queue_size=1);

void sendInitialHeader();
void sendPartHeader(const ros::Time &time, const std::string& type, size_t payload_size);
void sendPartFooter(const ros::Time &time);
void sendPartAndClear(const ros::Time &time, const std::string& type, std::vector<unsigned char> &data);
void sendPart(const ros::Time &time, const std::string& type, const boost::asio::const_buffer &buffer,
void sendPartHeader(const std::chrono::steady_clock::time_point &time, const std::string& type, size_t payload_size);
void sendPartFooter(const std::chrono::steady_clock::time_point &time);
void sendPartAndClear(const std::chrono::steady_clock::time_point &time, const std::string& type, std::vector<unsigned char> &data);
void sendPart(const std::chrono::steady_clock::time_point &time, const std::string& type, const boost::asio::const_buffer &buffer,
async_web_server_cpp::HttpConnection::ResourcePtr resource);

private:
Expand Down
4 changes: 2 additions & 2 deletions include/web_video_server/png_streamers.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class PngStreamer : public ImageTransportImageStreamer
ros::NodeHandle& nh);
~PngStreamer();
protected:
virtual void sendImage(const cv::Mat &, const ros::Time &time);
virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point &time);
virtual cv::Mat decodeImage(const sensor_msgs::ImageConstPtr& msg);

private:
Expand All @@ -42,7 +42,7 @@ class PngSnapshotStreamer : public ImageTransportImageStreamer
async_web_server_cpp::HttpConnectionPtr connection, ros::NodeHandle& nh);
~PngSnapshotStreamer();
protected:
virtual void sendImage(const cv::Mat &, const ros::Time &time);
virtual void sendImage(const cv::Mat &, const std::chrono::steady_clock::time_point &time);
virtual cv::Mat decodeImage(const sensor_msgs::ImageConstPtr& msg);

private:
Expand Down
6 changes: 3 additions & 3 deletions include/web_video_server/ros_compressed_streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@ class RosCompressedStreamer : public ImageStreamer
~RosCompressedStreamer();

virtual void start();
virtual void restreamFrame(double max_age);
virtual void restreamFrame(std::chrono::duration<double> max_age);

protected:
virtual void sendImage(const sensor_msgs::CompressedImageConstPtr &msg, const ros::Time &time);
virtual void sendImage(const sensor_msgs::CompressedImageConstPtr &msg, const std::chrono::steady_clock::time_point &time);

private:
void imageCallback(const sensor_msgs::CompressedImageConstPtr &msg);
MultipartStream stream_;
ros::Subscriber image_sub_;
ros::Time last_frame;
std::chrono::steady_clock::time_point last_frame_;
sensor_msgs::CompressedImageConstPtr last_msg;
boost::mutex send_mutex_;
};
Expand Down
2 changes: 1 addition & 1 deletion include/web_video_server/web_video_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class WebVideoServer
async_web_server_cpp::HttpConnectionPtr connection, const char* begin, const char* end);

private:
void restreamFrames(double max_age);
void restreamFrames(std::chrono::duration<double> max_age);
void cleanup_inactive_streams();

ros::NodeHandle nh_;
Expand Down
11 changes: 6 additions & 5 deletions src/image_streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,15 @@ void ImageTransportImageStreamer::initialize(const cv::Mat &)
{
}

void ImageTransportImageStreamer::restreamFrame(double max_age)
void ImageTransportImageStreamer::restreamFrame(std::chrono::duration<double> max_age)
{
if (inactive_ || !initialized_ )
return;
try {
if ( last_frame + ros::Duration(max_age) < ros::Time::now() ) {
if (last_frame_ + max_age < std::chrono::steady_clock::now()) {
boost::mutex::scoped_lock lock(send_mutex_);
sendImage(output_size_image, ros::Time::now() ); // don't update last_frame, it may remain an old value.
// don't update last_frame, it may remain an old value.
sendImage(output_size_image, std::chrono::steady_clock::now());
}
}
catch (boost::system::system_error &e)
Expand Down Expand Up @@ -148,8 +149,8 @@ void ImageTransportImageStreamer::imageCallback(const sensor_msgs::ImageConstPtr
initialized_ = true;
}

last_frame = ros::Time::now();
sendImage(output_size_image, msg->header.stamp);
last_frame_ = std::chrono::steady_clock::now();
sendImage(output_size_image, last_frame_);
}
catch (cv_bridge::Exception &e)
{
Expand Down
11 changes: 8 additions & 3 deletions src/jpeg_streamers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ MjpegStreamer::~MjpegStreamer()
boost::mutex::scoped_lock lock(send_mutex_); // protects sendImage.
}

void MjpegStreamer::sendImage(const cv::Mat &img, const ros::Time &time)
void MjpegStreamer::sendImage(
const cv::Mat & img,
const std::chrono::steady_clock::time_point & time)
{
std::vector<int> encode_params;
#if CV_VERSION_MAJOR >= 3
Expand Down Expand Up @@ -64,7 +66,9 @@ JpegSnapshotStreamer::~JpegSnapshotStreamer()
boost::mutex::scoped_lock lock(send_mutex_); // protects sendImage.
}

void JpegSnapshotStreamer::sendImage(const cv::Mat &img, const ros::Time &time)
void JpegSnapshotStreamer::sendImage(
const cv::Mat & img,
const std::chrono::steady_clock::time_point & time)
{
std::vector<int> encode_params;
#if CV_VERSION_MAJOR >= 3
Expand All @@ -78,7 +82,8 @@ void JpegSnapshotStreamer::sendImage(const cv::Mat &img, const ros::Time &time)
cv::imencode(".jpeg", img, encoded_buffer, encode_params);

char stamp[20];
sprintf(stamp, "%.06lf", time.toSec());
snprintf(stamp, sizeof(stamp), "%.06lf",
std::chrono::duration_cast<std::chrono::duration<double>>(time.time_since_epoch()).count());
async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok)
.header("Connection", "close")
.header("Server", "web_video_server")
Expand Down
12 changes: 7 additions & 5 deletions src/libav_streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ LibavStreamer::LibavStreamer(const async_web_server_cpp::HttpRequest &request,
const std::string &format_name, const std::string &codec_name,
const std::string &content_type) :
ImageTransportImageStreamer(request, connection, nh), output_format_(0), format_context_(0), codec_(0), codec_context_(0), video_stream_(
0), frame_(0), sws_context_(0), first_image_timestamp_(0), format_name_(
0), frame_(0), sws_context_(0), first_image_timestamp_(std::nullopt), format_name_(
format_name), codec_name_(codec_name), content_type_(content_type), opt_(0), io_buffer_(0)
{

Expand Down Expand Up @@ -256,11 +256,12 @@ void LibavStreamer::initializeEncoder()
{
}

void LibavStreamer::sendImage(const cv::Mat &img, const ros::Time &time)
void LibavStreamer::sendImage(
const cv::Mat & img,
const std::chrono::steady_clock::time_point & time)
{
boost::mutex::scoped_lock lock(encode_mutex_);
if (first_image_timestamp_.isZero())
{
if (!first_image_timestamp_.has_value()) {
first_image_timestamp_ = time;
}
std::vector<uint8_t> encoded_frame;
Expand Down Expand Up @@ -353,7 +354,8 @@ void LibavStreamer::sendImage(const cv::Mat &img, const ros::Time &time)
std::size_t size;
uint8_t *output_buf;

double seconds = (time - first_image_timestamp_).toSec();
double seconds = std::chrono::duration_cast<std::chrono::duration<double>>(time -
first_image_timestamp_.value()).count();
// Encode video at 1/0.95 to minimize delay
pkt.pts = (int64_t)(seconds / av_q2d(video_stream_->time_base) * 0.95);
if (pkt.pts <= 0)
Expand Down
45 changes: 27 additions & 18 deletions src/multipart_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ void MultipartStream::sendInitialHeader() {
connection_->write("--"+boundry_+"\r\n");
}

void MultipartStream::sendPartHeader(const ros::Time &time, const std::string& type, size_t payload_size) {
void MultipartStream::sendPartHeader(
const std::chrono::steady_clock::time_point & time, const std::string & type,
size_t payload_size)
{
char stamp[20];
sprintf(stamp, "%.06lf", time.toSec());
snprintf(stamp, sizeof(stamp), "%.06lf",
std::chrono::duration_cast<std::chrono::duration<double>>(time.time_since_epoch()).count());
boost::shared_ptr<std::vector<async_web_server_cpp::HttpHeader> > headers(
new std::vector<async_web_server_cpp::HttpHeader>());
headers->push_back(async_web_server_cpp::HttpHeader("Content-type", type));
Expand All @@ -37,7 +41,8 @@ void MultipartStream::sendPartHeader(const ros::Time &time, const std::string& t
connection_->write(async_web_server_cpp::HttpReply::to_buffers(*headers), headers);
}

void MultipartStream::sendPartFooter(const ros::Time &time) {
void MultipartStream::sendPartFooter(const std::chrono::steady_clock::time_point & time)
{
boost::shared_ptr<std::string> str(new std::string("\r\n--"+boundry_+"\r\n"));
PendingFooter pf;
pf.timestamp = time;
Expand All @@ -46,36 +51,40 @@ void MultipartStream::sendPartFooter(const ros::Time &time) {
if (max_queue_size_ > 0) pending_footers_.push(pf);
}

void MultipartStream::sendPartAndClear(const ros::Time &time, const std::string& type,
std::vector<unsigned char> &data) {
if (!isBusy())
{
void MultipartStream::sendPartAndClear(
const std::chrono::steady_clock::time_point & time, const std::string & type,
std::vector<unsigned char> & data)
{
if (!isBusy()) {
sendPartHeader(time, type, data.size());
connection_->write_and_clear(data);
sendPartFooter(time);
}
}

void MultipartStream::sendPart(const ros::Time &time, const std::string& type,
const boost::asio::const_buffer &buffer,
async_web_server_cpp::HttpConnection::ResourcePtr resource) {
if (!isBusy())
{
void MultipartStream::sendPart(
const std::chrono::steady_clock::time_point & time, const std::string & type,
const boost::asio::const_buffer & buffer,
async_web_server_cpp::HttpConnection::ResourcePtr resource)
{
if (!isBusy()) {
sendPartHeader(time, type, boost::asio::buffer_size(buffer));
connection_->write(buffer, resource);
sendPartFooter(time);
}
}

bool MultipartStream::isBusy() {
ros::Time currentTime = ros::Time::now();
while (!pending_footers_.empty())
{
bool MultipartStream::isBusy()
{
auto current_time = std::chrono::steady_clock::now();
while (!pending_footers_.empty()) {
if (pending_footers_.front().contents.expired()) {
pending_footers_.pop();
} else {
ros::Time footerTime = pending_footers_.front().timestamp;
if ((currentTime - footerTime).toSec() > 0.5) {
auto footer_time = pending_footers_.front().timestamp;
if (std::chrono::duration_cast<std::chrono::duration<double>>((current_time -
footer_time)).count() > 0.5)
{
pending_footers_.pop();
} else {
break;
Expand Down
9 changes: 5 additions & 4 deletions src/png_streamers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ cv::Mat PngStreamer::decodeImage(const sensor_msgs::ImageConstPtr& msg)
}
}


void PngStreamer::sendImage(const cv::Mat &img, const ros::Time &time)
void PngStreamer::sendImage(const cv::Mat & img, const std::chrono::steady_clock::time_point & time)
{
std::vector<int> encode_params;
#if CV_VERSION_MAJOR >= 3
Expand Down Expand Up @@ -95,7 +94,7 @@ cv::Mat PngSnapshotStreamer::decodeImage(const sensor_msgs::ImageConstPtr& msg)
}
}

void PngSnapshotStreamer::sendImage(const cv::Mat &img, const ros::Time &time)
void PngSnapshotStreamer::sendImage(const cv::Mat &img, const std::chrono::steady_clock::time_point &time)
{
std::vector<int> encode_params;
#if CV_VERSION_MAJOR >= 3
Expand All @@ -109,7 +108,9 @@ void PngSnapshotStreamer::sendImage(const cv::Mat &img, const ros::Time &time)
cv::imencode(".png", img, encoded_buffer, encode_params);

char stamp[20];
sprintf(stamp, "%.06lf", time.toSec());
snprintf(
stamp, sizeof(stamp), "%.06lf",
std::chrono::duration_cast<std::chrono::duration<double>>(time.time_since_epoch()).count());
async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok)
.header("Connection", "close")
.header("Server", "web_video_server")
Expand Down
Loading

0 comments on commit ed719e5

Please sign in to comment.