Skip to content

Commit

Permalink
Make stream_base::shared_state private
Browse files Browse the repository at this point in the history
It's now only exposed indirectly via reader::handler_context and the new
stream_base::get_queue_mutex and stream_base::post.
  • Loading branch information
bmerry committed Jun 27, 2023
1 parent 1dd4c97 commit caebb33
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 14 deletions.
2 changes: 1 addition & 1 deletion include/spead2/recv_chunk_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,7 @@ void chunk_ring_stream<DataRingbuffer, FreeRingbuffer>::stop()
{
// Locking is probably not needed, as all readers are terminated by
// chunk_stream::stop(). But it should be safe.
std::lock_guard<std::mutex> lock(shared->queue_mutex);
std::lock_guard<std::mutex> lock(get_queue_mutex());
this->graveyard.reset(); // free chunks that didn't make it into data_ring
}
}
Expand Down
59 changes: 58 additions & 1 deletion include/spead2/recv_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ class stream_base
/// Stream configuration
const stream_config config;

protected:
private:
struct shared_state
{
/**
Expand Down Expand Up @@ -667,6 +667,50 @@ class stream_base
*/
virtual void stop_received();

std::mutex &get_queue_mutex() const { return shared->queue_mutex; }

/**
* Schedule a function to be called on an executor, with the lock held.
* This is a fire-and-forget operation. If the stream is stopped before the
* callback fires, the callback is silently ignored.
*/
template<typename ExecutionContext, typename F>
void post(ExecutionContext &ex, F &&func)
{
class wrapper
{
private:
std::shared_ptr<shared_state> shared;
typename std::decay<F>::type func;

public:
wrapper(std::shared_ptr<shared_state> shared, F&& func)
: shared(std::move(shared)), func(std::forward<F>(func))
{
}

/* Prevent copying, while allowing moving (copying is safe but inefficient)
* Move assignment is not implemented because it fails to compile if
* F is not move-assignable. This can probably be solved with
* std::enable_if, but it doesn't seem worth the effort.
*/
wrapper(const wrapper &) = delete;
wrapper &operator=(const wrapper &) = delete;
wrapper(wrapper &&) = default;

void operator()() const
{
std::lock_guard<std::mutex>(shared->queue_mutex);
stream_base *self = shared->self;
if (self)
func(*self);
}
};

// TODO: can do this with a lambda (with perfect forwarding) in C++14
boost::asio::post(ex, wrapper(shared, std::forward<F>(func)));
}

public:
/**
* State for a batch of calls to @ref add_packet. Constructing this object
Expand Down Expand Up @@ -940,6 +984,19 @@ class stream : protected stream_base
/// Actual implementation of @ref stop
void stop_impl();

using stream_base::post; // Make base class version visible, despite being overloaded

/**
* Schedule a function to be called on the stream's io_service, with the
* lock held. This is a fire-and-forget operation. If the stream is stopped
* before the callback fires, the callback is silently dropped.
*/
template<typename F>
void post(F &&func)
{
post(get_io_service(), std::forward<F>(func));
}

public:
using stream_base::get_config;
using stream_base::get_stats;
Expand Down
2 changes: 1 addition & 1 deletion src/recv_chunk_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ void chunk_stream::stop_received()
void chunk_stream::stop()
{
{
std::lock_guard<std::mutex> lock(shared->queue_mutex);
std::lock_guard<std::mutex> lock(get_queue_mutex());
flush_chunks();
}
stream::stop();
Expand Down
17 changes: 6 additions & 11 deletions src/recv_chunk_stream_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,17 +246,12 @@ void chunk_stream_group_member::heap_ready(live_heap &&lh)

void chunk_stream_group_member::async_flush_until(std::int64_t chunk_id)
{
std::shared_ptr<stream_base::shared_state> shared = this->shared;
// TODO: once we depend on C++14, move rather than copying into the lambda
boost::asio::post(get_io_service(), [shared, chunk_id]() {
std::lock_guard<std::mutex> lock(shared->queue_mutex);
if (!shared->self)
return; // We've stopped, which means everything is flushed
chunk_stream_group_member *self = static_cast<chunk_stream_group_member *>(shared->self);
while (self->chunks.get_head_chunk() < chunk_id && !self->chunks.empty())
post([chunk_id](stream_base &s) {
chunk_stream_group_member &self = static_cast<chunk_stream_group_member &>(s);
while (self.chunks.get_head_chunk() < chunk_id && !self.chunks.empty())
{
self->chunks.flush_head([self](chunk *c) {
self->group.release_chunk(c, self->batch_stats.data());
self.chunks.flush_head([&self](chunk *c) {
self.group.release_chunk(c, self.batch_stats.data());
});
}
});
Expand All @@ -273,7 +268,7 @@ void chunk_stream_group_member::stop()
{
group.stream_pre_stop(*this);
{
std::lock_guard<std::mutex> lock(shared->queue_mutex);
std::lock_guard<std::mutex> lock(get_queue_mutex());
flush_chunks();
}
stream::stop();
Expand Down

0 comments on commit caebb33

Please sign in to comment.