From caebb330d9989e89547765a0c5d754dd7a8f93eb Mon Sep 17 00:00:00 2001 From: Bruce Merry Date: Tue, 27 Jun 2023 16:34:31 +0200 Subject: [PATCH] Make stream_base::shared_state private It's now only exposed indirectly via reader::handler_context and the new stream_base::get_queue_mutex and stream_base::post. --- include/spead2/recv_chunk_stream.h | 2 +- include/spead2/recv_stream.h | 59 +++++++++++++++++++++++++++++- src/recv_chunk_stream.cpp | 2 +- src/recv_chunk_stream_group.cpp | 17 +++------ 4 files changed, 66 insertions(+), 14 deletions(-) diff --git a/include/spead2/recv_chunk_stream.h b/include/spead2/recv_chunk_stream.h index 9c3defc1c..57262df9a 100644 --- a/include/spead2/recv_chunk_stream.h +++ b/include/spead2/recv_chunk_stream.h @@ -890,7 +890,7 @@ void chunk_ring_stream::stop() { // Locking is probably not needed, as all readers are terminated by // chunk_stream::stop(). But it should be safe. - std::lock_guard lock(shared->queue_mutex); + std::lock_guard lock(get_queue_mutex()); this->graveyard.reset(); // free chunks that didn't make it into data_ring } } diff --git a/include/spead2/recv_stream.h b/include/spead2/recv_stream.h index 3a1ca7bda..aa592e5bc 100644 --- a/include/spead2/recv_stream.h +++ b/include/spead2/recv_stream.h @@ -570,7 +570,7 @@ class stream_base /// Stream configuration const stream_config config; -protected: +private: struct shared_state { /** @@ -667,6 +667,50 @@ class stream_base */ virtual void stop_received(); + std::mutex &get_queue_mutex() const { return shared->queue_mutex; } + + /** + * Schedule a function to be called on an executor, with the lock held. + * This is a fire-and-forget operation. If the stream is stopped before the + * callback fires, the callback is silently ignored. + */ + template + void post(ExecutionContext &ex, F &&func) + { + class wrapper + { + private: + std::shared_ptr shared; + typename std::decay::type func; + + public: + wrapper(std::shared_ptr shared, F&& func) + : shared(std::move(shared)), func(std::forward(func)) + { + } + + /* Prevent copying, while allowing moving (copying is safe but inefficient) + * Move assignment is not implemented because it fails to compile if + * F is not move-assignable. This can probably be solved with + * std::enable_if, but it doesn't seem worth the effort. + */ + wrapper(const wrapper &) = delete; + wrapper &operator=(const wrapper &) = delete; + wrapper(wrapper &&) = default; + + void operator()() const + { + std::lock_guard(shared->queue_mutex); + stream_base *self = shared->self; + if (self) + func(*self); + } + }; + + // TODO: can do this with a lambda (with perfect forwarding) in C++14 + boost::asio::post(ex, wrapper(shared, std::forward(func))); + } + public: /** * State for a batch of calls to @ref add_packet. Constructing this object @@ -940,6 +984,19 @@ class stream : protected stream_base /// Actual implementation of @ref stop void stop_impl(); + using stream_base::post; // Make base class version visible, despite being overloaded + + /** + * Schedule a function to be called on the stream's io_service, with the + * lock held. This is a fire-and-forget operation. If the stream is stopped + * before the callback fires, the callback is silently dropped. + */ + template + void post(F &&func) + { + post(get_io_service(), std::forward(func)); + } + public: using stream_base::get_config; using stream_base::get_stats; diff --git a/src/recv_chunk_stream.cpp b/src/recv_chunk_stream.cpp index c79586659..ba7911a4e 100644 --- a/src/recv_chunk_stream.cpp +++ b/src/recv_chunk_stream.cpp @@ -270,7 +270,7 @@ void chunk_stream::stop_received() void chunk_stream::stop() { { - std::lock_guard lock(shared->queue_mutex); + std::lock_guard lock(get_queue_mutex()); flush_chunks(); } stream::stop(); diff --git a/src/recv_chunk_stream_group.cpp b/src/recv_chunk_stream_group.cpp index 3e8dd978e..0d3656486 100644 --- a/src/recv_chunk_stream_group.cpp +++ b/src/recv_chunk_stream_group.cpp @@ -246,17 +246,12 @@ void chunk_stream_group_member::heap_ready(live_heap &&lh) void chunk_stream_group_member::async_flush_until(std::int64_t chunk_id) { - std::shared_ptr shared = this->shared; - // TODO: once we depend on C++14, move rather than copying into the lambda - boost::asio::post(get_io_service(), [shared, chunk_id]() { - std::lock_guard lock(shared->queue_mutex); - if (!shared->self) - return; // We've stopped, which means everything is flushed - chunk_stream_group_member *self = static_cast(shared->self); - while (self->chunks.get_head_chunk() < chunk_id && !self->chunks.empty()) + post([chunk_id](stream_base &s) { + chunk_stream_group_member &self = static_cast(s); + while (self.chunks.get_head_chunk() < chunk_id && !self.chunks.empty()) { - self->chunks.flush_head([self](chunk *c) { - self->group.release_chunk(c, self->batch_stats.data()); + self.chunks.flush_head([&self](chunk *c) { + self.group.release_chunk(c, self.batch_stats.data()); }); } }); @@ -273,7 +268,7 @@ void chunk_stream_group_member::stop() { group.stream_pre_stop(*this); { - std::lock_guard lock(shared->queue_mutex); + std::lock_guard lock(get_queue_mutex()); flush_chunks(); } stream::stop();