diff --git a/doc/dev-recv-chunk-group.rst b/doc/dev-recv-chunk-group.rst
index 2c51db0ed..6231ed737 100644
--- a/doc/dev-recv-chunk-group.rst
+++ b/doc/dev-recv-chunk-group.rst
@@ -18,16 +18,16 @@
 to the chunk at the time. Additionally, it might not be possible to allocate a
 new chunk until an old chunk is flushed e.g., if there is a fixed pool of
 chunks rather than dynamic allocation.

-Each chunk has a reference count, indicating the number of streams that still
-have the chunk in their window. This reference count is non-atomic since it is
-protected by the group's mutex. When the group wishes to evict a chunk, it
-first needs to wait for the reference count of the head chunk to drop to zero.
-It needs a way to be notified that it should try again, which is provided by a
-condition variable. Using a condition variable (rather than, say, replacing
-the simple reference count with a semaphore) allows the group mutex to be
-dropped while waiting, which prevents the deadlocks that might otherwise occur
-if the mutex was held while waiting and another stream was attemping to lock
-the group mutex to make forward progress.
+The group keeps its own copy of the head pointers (oldest heap) from the
+individual streams, protected by the group mutex rather than the stream
+mutexes. This allows the group to track the oldest chunk that any stream owns
+or may potentially own in future (``min_head_chunk``). When the group wishes to
+evict a chunk, it first needs to wait for ``min_head_chunk`` to become greater
+than the ID of the chunk to be evicted. The wait is achieved using a condition
+variable that is notified whenever ``min_head_chunk`` increases. This allows
+the group mutex to be dropped while waiting, which prevents the deadlocks that
+might otherwise occur if the mutex was held while waiting and another stream
+was attempting to lock the group mutex to make forward progress.

 In lossless eviction mode, this is all that is needed, although it is
 non-trivial to see that this won't deadlock with all the streams sitting in
diff --git a/doc/recv-chunk-group.rst b/doc/recv-chunk-group.rst
index c5f3c1587..ddb16ce88 100644
--- a/doc/recv-chunk-group.rst
+++ b/doc/recv-chunk-group.rst
@@ -66,9 +66,9 @@ performance, and thus some care is needed to use it safely.
   as otherwise deadlocks can occur. For example, if they share a thread pool,
   the pool must have at least as many threads as streams. It's recommended
   that each stream has its own single-threaded thread pool.
-- The streams should all be added to the group before adding any readers to
-  the streams. Things will probably work even if this is not done, but the
-  design is sufficiently complicated that it is not advisable.
+- The streams must all be added to the group before adding any readers to
+  the streams. Once the group has received some data, an exception will be
+  thrown if one attempts to add a new stream.
 - The stream ID associated with each chunk will be the stream ID of one of
   the component streams, but it is undefined which one.
 - When the allocate and ready callbacks are invoked, it's not specified which
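The eviction wait described in the design note above can be summarised with a small standalone sketch. This is not spead2 code, just an illustration of the pattern; the member names (``mutex``, ``ready_condition``, ``min_head_chunk``) mirror those introduced by this change, and the real implementation derives ``min_head_chunk`` from a per-stream table rather than receiving it directly.

    // Illustrative sketch only (not spead2 itself): how a condition variable
    // lets the group drop its mutex while waiting for eviction to become safe.
    #include <condition_variable>
    #include <cstdint>
    #include <mutex>

    class eviction_gate_sketch
    {
        std::mutex mutex;                         // plays the role of the group mutex
        std::condition_variable ready_condition;  // notified when min_head_chunk grows
        std::int64_t min_head_chunk = 0;          // oldest chunk any stream may still use

    public:
        // Block until no stream can still be using chunk `chunk_id`.
        void wait_until_evictable(std::int64_t chunk_id)
        {
            std::unique_lock<std::mutex> lock(mutex);
            // The mutex is released while waiting, so other streams can lock the
            // group and make forward progress; that is what avoids the deadlock.
            ready_condition.wait(lock, [&] { return min_head_chunk > chunk_id; });
        }

        // Called when the minimum head over all streams has been recomputed.
        void min_head_updated(std::int64_t new_min)
        {
            std::lock_guard<std::mutex> lock(mutex);
            if (new_min > min_head_chunk)
            {
                min_head_chunk = new_min;
                ready_condition.notify_all();
            }
        }
    };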
diff --git a/include/spead2/recv_chunk_stream.h b/include/spead2/recv_chunk_stream.h
index 615bae5db..33a1540ad 100644
--- a/include/spead2/recv_chunk_stream.h
+++ b/include/spead2/recv_chunk_stream.h
@@ -54,14 +54,6 @@ class chunk
     friend class chunk_stream_group;
     template friend class detail::chunk_ring_pair;
 private:
-    /**
-     * Reference count for chunks belonging to stream groups.
-     *
-     * This must only be manipulated from a single thread at a time e.g.
-     * with the group's mutex locked.
-     */
-    std::size_t ref_count = 0;
-
     /// Linked list of chunks to dispose of at shutdown
     std::unique_ptr<chunk> graveyard_next;

@@ -247,14 +239,38 @@ class chunk_window
             head_pos = 0; // wrap around the circular buffer
     }

+    /// Send the oldest chunk to the ready callback
+    template<typename F1, typename F2>
+    void flush_head(const F1 &ready_chunk, const F2 &head_updated)
+    {
+        flush_head(ready_chunk);
+        head_updated(head_chunk);
+    }
+
+    /// Send all the chunks to the ready callback
+    template<typename F1, typename F2>
+    void flush_all(const F1 &ready_chunk, const F2 &head_updated)
+    {
+        if (!empty())
+        {
+            while (!empty())
+                flush_head(ready_chunk);
+            head_updated(head_chunk);
+        }
+    }
+
     /// Flush until the head is at least @a target
-    template<typename F>
-    void flush_until(std::int64_t target, const F &ready_chunk)
+    template<typename F1, typename F2>
+    void flush_until(std::int64_t target, const F1 &ready_chunk, const F2 &head_updated)
     {
-        while (head_chunk != tail_chunk && head_chunk < target)
-            flush_head(ready_chunk);
-        if (head_chunk == tail_chunk && head_chunk < target)
-            head_chunk = tail_chunk = target;
+        if (head_chunk < target)
+        {
+            while (head_chunk != tail_chunk && head_chunk < target)
+                flush_head(ready_chunk);
+            if (head_chunk == tail_chunk && head_chunk < target)
+                head_chunk = tail_chunk = target;
+            head_updated(target);
+        }
     }

     explicit chunk_window(std::size_t max_chunks);
@@ -282,11 +298,14 @@ class chunk_window
      * Obtain a pointer to a chunk with ID @a chunk_id.
      *
      * If @a chunk_id is behind the window, returns nullptr. If it is ahead of
-     * the window, the window is advanced using @a ready_chunk and @a allocate_chunk.
+     * the window, the window is advanced using @a allocate_chunk and
+     * @a ready_chunk. If the head_chunk is updated, the new value is passed to
+     * @a head_updated.
      */
-    template<typename F1, typename F2>
+    template<typename F1, typename F2, typename F3>
     chunk *get_chunk(
-        std::int64_t chunk_id, std::uintptr_t stream_id, const F1 &allocate_chunk, const F2 &ready_chunk)
+        std::int64_t chunk_id, std::uintptr_t stream_id,
+        const F1 &allocate_chunk, const F2 &ready_chunk, const F3 &head_updated)
     {
         const std::size_t max_chunks = chunks.size();
         if (chunk_id >= head_chunk)
@@ -300,14 +319,13 @@ chunk *chunk_window::get_chunk(
                  * We leave it to the while loop below to actually allocate
                  * the chunks.
                  */
-                while (!empty())
-                    flush_head(ready_chunk);
+                flush_all(ready_chunk, head_updated);
                 head_chunk = tail_chunk = chunk_id - (max_chunks - 1);
             }
             while (chunk_id >= tail_chunk)
             {
                 if (std::size_t(tail_chunk - head_chunk) == max_chunks)
-                    flush_head(ready_chunk);
+                    flush_head(ready_chunk, head_updated);
                 chunks[tail_pos] = allocate_chunk(tail_chunk);
                 if (chunks[tail_pos])
                 {
@@ -459,6 +477,7 @@ class chunk_manager_simple
     std::uint64_t *get_batch_stats(chunk_stream_state<chunk_manager_simple> &state) const;
     chunk *allocate_chunk(chunk_stream_state<chunk_manager_simple> &state, std::int64_t chunk_id);
     void ready_chunk(chunk_stream_state<chunk_manager_simple> &state, chunk *c);
+    void head_updated(chunk_stream_state<chunk_manager_simple> &state, std::int64_t head_chunk) {}
 };

@@ -682,8 +701,10 @@ stream_config chunk_stream_state<CM>::adjust_config(const stream_config &config)
 template<typename CM>
 void chunk_stream_state<CM>::flush_chunks()
 {
-    while (!chunks.empty())
-        chunks.flush_head([this](chunk *c) { chunk_manager.ready_chunk(*this, c); });
+    chunks.flush_all(
+        [this](chunk *c) { chunk_manager.ready_chunk(*this, c); },
+        [this](std::int64_t head_chunk) { chunk_manager.head_updated(*this, head_chunk); }
+    );
 }

 template<typename CM>
@@ -754,7 +775,8 @@ chunk_stream_state<CM>::allocate(std::size_t size, const packet_header &packet)
         chunk_id, stream_id,
         [this](std::int64_t chunk_id) { return chunk_manager.allocate_chunk(*this, chunk_id); },
-        [this](chunk *c) { chunk_manager.ready_chunk(*this, c); }
+        [this](chunk *c) { chunk_manager.ready_chunk(*this, c); },
+        [this](std::int64_t head_chunk) { chunk_manager.head_updated(*this, head_chunk); }
     );
     if (chunk_ptr)
     {
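The contract of the new two-callback flush interface can be seen in a self-contained sketch (again not the spead2 class; a plain ``std::deque<int>`` stands in for the chunk pointers): ``ready_chunk`` fires once per flushed chunk, while ``head_updated`` fires at most once per call, with the final head position, and does not fire at all when the head does not move.

    // Sketch of the flush_until contract added above; illustration only.
    #include <cstdint>
    #include <deque>
    #include <iostream>

    struct window_sketch
    {
        std::int64_t head_chunk = 0;
        std::int64_t tail_chunk = 0;
        std::deque<int> chunks;   // stand-in for the real chunk pointers

        template<typename F1, typename F2>
        void flush_until(std::int64_t target, const F1 &ready_chunk, const F2 &head_updated)
        {
            if (head_chunk >= target)
                return;                            // no movement: head_updated must not fire
            while (head_chunk != tail_chunk && head_chunk < target)
            {
                ready_chunk(chunks.front());       // one callback per flushed chunk
                chunks.pop_front();
                head_chunk++;
            }
            if (head_chunk == tail_chunk && head_chunk < target)
                head_chunk = tail_chunk = target;  // window empty: jump straight to target
            head_updated(head_chunk);              // exactly one notification per call
        }
    };

    int main()
    {
        window_sketch w;
        w.chunks = {0, 1, 2};
        w.tail_chunk = 3;
        w.flush_until(
            5,
            [](int c) { std::cout << "ready chunk " << c << '\n'; },
            [](std::int64_t head) { std::cout << "head now " << head << '\n'; });
    }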
diff --git a/include/spead2/recv_chunk_stream_group.h b/include/spead2/recv_chunk_stream_group.h
index 5a3ee1aff..32eabac79 100644
--- a/include/spead2/recv_chunk_stream_group.h
+++ b/include/spead2/recv_chunk_stream_group.h
@@ -27,6 +27,7 @@
 #include 
 #include 
 #include 
+#include 
 #include 
 #include 
 #include 
@@ -103,7 +104,8 @@ class chunk_manager_group

     std::uint64_t *get_batch_stats(chunk_stream_state<chunk_manager_group> &state) const;
     chunk *allocate_chunk(chunk_stream_state<chunk_manager_group> &state, std::int64_t chunk_id);
-    void ready_chunk(chunk_stream_state<chunk_manager_group> &state, chunk *c);
+    void ready_chunk(chunk_stream_state<chunk_manager_group> &state, chunk *c) {}
+    void head_updated(chunk_stream_state<chunk_manager_group> &state, std::int64_t head_chunk);
 };

 } // namespace detail
@@ -152,6 +154,15 @@ class chunk_stream_group
      */
     std::vector<std::unique_ptr<chunk_stream_group_member>> streams;

+    /**
+     * Copy of the head chunk ID from each stream. This copy is protected by
+     * the group's mutex rather than the streams'.
+     */
+    std::vector<std::int64_t> head_chunks;
+
+    /// Minimum element of head_chunks
+    std::int64_t min_head_chunk = 0;
+
     /**
      * Last value passed to all streams' async_flush_until.
      */
@@ -172,18 +183,22 @@ class chunk_stream_group
     chunk *get_chunk(std::int64_t chunk_id, std::uintptr_t stream_id, std::uint64_t *batch_stats);

     /**
-     * Decrement chunk reference count.
-     *
-     * If the reference count reaches zero, the chunk is valid to pass to
-     * the ready callback.
-     *
-     * This function is thread-safe.
+     * Update the head_chunk copy for a stream. This version assumes the
+     * caller holds the group mutex, and is only used internally.
      */
-    void release_chunk(chunk *c, std::uint64_t *batch_stats);
+    void stream_head_updated_unlocked(chunk_stream_group_member &s, std::int64_t head_chunk);
+
+    /**
+     * Called by a stream to report movement in its head pointer. This
+     * function takes the group mutex.
+     */
+    void stream_head_updated(chunk_stream_group_member &s, std::int64_t head_chunk);

     /**
      * Pass a chunk to the user-provided ready function. The caller is
-     * responsible for ensuring that c->ref_count is zero.
+     * responsible for ensuring that the chunk is no longer in use.
+     *
+     * The caller must hold the group mutex.
      */
     void ready_chunk(chunk *c, std::uint64_t *batch_stats);

@@ -295,6 +310,7 @@ class chunk_stream_group_member : private detail::chunk_stream_state
 T &chunk_stream_group::emplace_back(Args&&... args)
 {
     std::lock_guard lock(mutex);
-    std::unique_ptr stream(new T(*this, std::forward<Args>(args)...));
+    if (chunks.get_tail_chunk() != 0 || last_flush_until != 0)
+    {
+        throw std::runtime_error("Cannot add a stream after group has started receiving data");
+    }
+    std::unique_ptr stream(new T(
+        *this, streams.size(), std::forward<Args>(args)...));
     chunk_stream_group_member &ret = *stream;
     streams.push_back(std::move(stream));
+    head_chunks.push_back(0);
+    min_head_chunk = 0; // shouldn't be necessary, but just in case
     live_streams++;
-    if (config.get_eviction_mode() == chunk_stream_group_config::eviction_mode::LOSSY
-        && last_flush_until > 0)
-    {
-        ret.async_flush_until(last_flush_until);
-    }
     stream_added(ret);
     return ret;
 }
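The bookkeeping behind ``head_chunks`` and ``min_head_chunk`` is small enough to show in isolation. The following is hypothetical standalone code, not the class above; it mirrors the logic that the implementation in src/recv_chunk_stream_group.cpp (below) provides, including the short-cut of only recomputing the minimum when the stream that moved was previously the oldest.

    // Illustration only: the per-stream head table and its cached minimum.
    #include <algorithm>
    #include <cstddef>
    #include <cstdint>
    #include <vector>

    struct head_table_sketch
    {
        std::vector<std::int64_t> head_chunks;   // one entry per stream, starts at 0
        std::int64_t min_head_chunk = 0;

        std::size_t add_stream()
        {
            head_chunks.push_back(0);
            return head_chunks.size() - 1;        // becomes the stream's group_index
        }

        // Returns true if the minimum advanced, i.e. waiters should be notified.
        // Heads only ever move forward, so the minimum can only change when the
        // stream that moved was previously sitting at the minimum.
        bool update(std::size_t group_index, std::int64_t head_chunk)
        {
            std::int64_t old = head_chunks[group_index];
            head_chunks[group_index] = head_chunk;
            if (min_head_chunk != old)
                return false;   // this stream was not the oldest; minimum unchanged
            min_head_chunk = *std::min_element(head_chunks.begin(), head_chunks.end());
            return min_head_chunk != old;
        }
    };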
The caller is - * responsible for ensuring that c->ref_count is zero. + * responsible for ensuring that the chunk is no longer in use. + * + * The caller must hold the group mutex. */ void ready_chunk(chunk *c, std::uint64_t *batch_stats); @@ -295,6 +310,7 @@ class chunk_stream_group_member : private detail::chunk_stream_state T &chunk_stream_group::emplace_back(Args&&... args) { std::lock_guard lock(mutex); - std::unique_ptr stream(new T(*this, std::forward(args)...)); + if (chunks.get_tail_chunk() != 0 || last_flush_until != 0) + { + throw std::runtime_error("Cannot add a stream after group has started receiving data"); + } + std::unique_ptr stream(new T( + *this, streams.size(), std::forward(args)...)); chunk_stream_group_member &ret = *stream; streams.push_back(std::move(stream)); + head_chunks.push_back(0); + min_head_chunk = 0; // shouldn't be necessary, but just in case live_streams++; - if (config.get_eviction_mode() == chunk_stream_group_config::eviction_mode::LOSSY - && last_flush_until > 0) - { - ret.async_flush_until(last_flush_until); - } stream_added(ret); return ret; } diff --git a/src/recv_chunk_stream_group.cpp b/src/recv_chunk_stream_group.cpp index 129e4376c..3c07ab2e0 100644 --- a/src/recv_chunk_stream_group.cpp +++ b/src/recv_chunk_stream_group.cpp @@ -21,6 +21,8 @@ #include #include #include +#include +#include #include #include @@ -76,10 +78,10 @@ chunk *chunk_manager_group::allocate_chunk( return group.get_chunk(chunk_id, state.stream_id, state.place_data->batch_stats); } -void chunk_manager_group::ready_chunk(chunk_stream_state &state, chunk *c) +void chunk_manager_group::head_updated( + chunk_stream_state &state, std::int64_t head_chunk) { - std::uint64_t *batch_stats = static_cast(&state)->batch_stats.data(); - group.release_chunk(c, batch_stats); + group.stream_head_updated(static_cast(state), head_chunk); } } // namespace detail @@ -152,16 +154,18 @@ void chunk_stream_group::stop() void chunk_stream_group::stream_stop_received(chunk_stream_group_member &s) { std::lock_guard lock(mutex); + // Set the head_chunk to the largest possible value, so that this stream + // no longer blocks anything. + stream_head_updated_unlocked(s, std::numeric_limits::max()); if (--live_streams == 0) { // Once all the streams have stopped, make all the chunks in the - // window available. It's not necessary to check c->ref_count - // because no stream can have a reference once they're all stopped. + // window available. 
@@ -207,35 +210,48 @@ chunk *chunk_stream_group::get_chunk(std::int64_t chunk_id, std::uintptr_t strea
         [](chunk *)
         {
             // Should be unreachable, as we've done the necessary flushing above
             assert(false);
-        }
+        },
+        [](std::int64_t) {} // Don't need notification for head moving
     );
-    if (c)
-        c->ref_count++;
     return c;
 }

 void chunk_stream_group::ready_chunk(chunk *c, std::uint64_t *batch_stats)
 {
-    assert(c->ref_count == 0);
+    assert(c->chunk_id < min_head_chunk);
     std::unique_ptr<chunk> owned(c);
     config.get_ready()(std::move(owned), batch_stats);
 }

-void chunk_stream_group::release_chunk(chunk *c, std::uint64_t *batch_stats)
+void chunk_stream_group::stream_head_updated_unlocked(chunk_stream_group_member &s, std::int64_t head_chunk)
+{
+    std::size_t stream_index = s.group_index;
+    std::int64_t old = head_chunks[stream_index];
+    head_chunks[stream_index] = head_chunk;
+    // Update min_head_chunk. We can skip the work if we weren't previously the oldest.
+    if (min_head_chunk == old)
+    {
+        min_head_chunk = *std::min_element(head_chunks.begin(), head_chunks.end());
+        if (min_head_chunk != old)
+            ready_condition.notify_all();
+    }
+}
+
+void chunk_stream_group::stream_head_updated(chunk_stream_group_member &s, std::int64_t head_chunk)
 {
     std::lock_guard lock(mutex);
-    if (--c->ref_count == 0)
-        ready_condition.notify_all();
+    stream_head_updated_unlocked(s, head_chunk);
 }

 chunk_stream_group_member::chunk_stream_group_member(
     chunk_stream_group &group,
+    std::size_t group_index,
     io_service_ref io_service,
     const stream_config &config,
     const chunk_stream_config &chunk_config)
     : chunk_stream_state(config, chunk_config, detail::chunk_manager_group(group)),
     stream(std::move(io_service), adjust_config(config)),
-    group(group)
+    group(group), group_index(group_index)
 {
     if (chunk_config.get_max_chunks() > group.config.get_max_chunks())
         throw std::invalid_argument("stream max_chunks must not be larger than group max_chunks");
@@ -250,9 +266,13 @@ void chunk_stream_group_member::async_flush_until(std::int64_t chunk_id)
 {
     post([chunk_id](stream_base &s) {
         chunk_stream_group_member &self = static_cast<chunk_stream_group_member &>(s);
-        self.chunks.flush_until(chunk_id, [&self](chunk *c) {
-            self.group.release_chunk(c, self.batch_stats.data());
-        });
+        self.chunks.flush_until(
+            chunk_id,
+            [](chunk *) {},
+            [&self](std::int64_t head_chunk) {
+                self.group.stream_head_updated(self, head_chunk);
+            }
+        );
     });
 }
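For a stream that belongs to a group, the two flush callbacks are wired asymmetrically, as the ``async_flush_until`` change above shows: the per-chunk callback is a no-op, because only the group may hand chunks to the user, while head movement is reported to the group. A simplified, hypothetical sketch of that wiring (``chunk_stub`` and ``flush_until_sketch`` are stand-ins, not spead2 types):

    #include <cstdint>
    #include <functional>
    #include <iostream>

    struct chunk_stub {};   // stand-in for spead2::recv::chunk

    // Simplified stand-in for chunk_window::flush_until.
    void flush_until_sketch(std::int64_t target, std::int64_t &head_chunk,
                            const std::function<void(chunk_stub *)> &ready_chunk,
                            const std::function<void(std::int64_t)> &head_updated)
    {
        while (head_chunk < target)
        {
            ready_chunk(nullptr);
            head_chunk++;
        }
        head_updated(head_chunk);
    }

    int main()
    {
        std::int64_t head = 2;
        flush_until_sketch(
            5, head,
            // A grouped stream passes a no-op here: chunk hand-off belongs to the group.
            [](chunk_stub *) {},
            // It only tells the group how far its head has moved.
            [](std::int64_t h) { std::cout << "report head " << h << " to the group\n"; });
    }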
diff --git a/tests/test_recv_chunk_stream_group.py b/tests/test_recv_chunk_stream_group.py
index 25e431ddb..6aa91034e 100644
--- a/tests/test_recv_chunk_stream_group.py
+++ b/tests/test_recv_chunk_stream_group.py
@@ -28,6 +28,8 @@
 )

 STREAMS = 4
+LOSSY_PARAM = pytest.param(recv.ChunkStreamGroupConfig.EvictionMode.LOSSY, id="lossy")
+LOSSLESS_PARAM = pytest.param(recv.ChunkStreamGroupConfig.EvictionMode.LOSSLESS, id="lossless")


 class TestChunkStreamGroupConfig:
@@ -75,9 +77,13 @@ def free_ring(self):
     def queues(self):
         return [spead2.InprocQueue() for _ in range(STREAMS)]

+    @pytest.fixture(params=[LOSSY_PARAM, LOSSLESS_PARAM])
+    def eviction_mode(self, request):
+        return request.param
+
     @pytest.fixture
-    def group(self, data_ring, free_ring, queues):
-        group_config = recv.ChunkStreamGroupConfig(max_chunks=4)
+    def group(self, eviction_mode, data_ring, free_ring, queues):
+        group_config = recv.ChunkStreamGroupConfig(max_chunks=4, eviction_mode=eviction_mode)
         group = recv.ChunkStreamRingGroup(group_config, data_ring, free_ring)
         # max_heaps is artificially high to make test_packet_too_old work
         config = spead2.recv.StreamConfig(max_heaps=128)
@@ -101,81 +107,103 @@ def group(self, data_ring, free_ring, queues):
     def send_stream(self, queues):
         return send.InprocStream(spead2.ThreadPool(), queues, send.StreamConfig())

-    def _send_data(self, send_stream, data, heaps=None):
+    def _send_data(self, send_stream, data, eviction_mode, heaps=None):
         """Send the data.

         To send only a subset of heaps (or to send out of order), pass the
         indices to skip in `heaps`.
         """
+        lossy = (eviction_mode == recv.ChunkStreamGroupConfig.EvictionMode.LOSSY)
         data_by_heap = data.reshape(-1, HEAP_PAYLOAD_SIZE)
         ig = spead2.send.ItemGroup()
         ig.add_item(0x1000, 'position', 'position in stream', (), format=[('u', 32)])
         ig.add_item(0x1001, 'payload', 'payload data', (HEAP_PAYLOAD_SIZE,), dtype=np.uint8)
-        # Stream groups are impractical to test deterministically, because
-        # they rely on concurrent forward progress. So we just feed the
-        # data in slowly enough that we expect heaps provided before a
-        # sleep to be processed before those after the sleep.
-        if heaps is None:
-            heaps = range(len(data_by_heap))
+        # In lossy mode the behaviour is inherently non-deterministic.
+        # We just feed the data in slowly enough that we expect heaps provided
+        # before a sleep to be processed before those after the sleep.
         for i in heaps:
             ig['position'].value = i
             ig['payload'].value = data_by_heap[i]
             heap = ig.get_heap(data='all', descriptors='none')
             send_stream.send_heap(heap, substream_index=i % STREAMS)
-            time.sleep(0.001)
-        # Stop all the queues, which should flush everything and stop the
-        # data ring.
-        for queue in send_stream.queues:
-            queue.stop()
+            if lossy:
+                time.sleep(0.001)

-    def test_full_in_order(self, group, queues, send_stream, data_ring, free_ring):
-        """Send all the data, in order."""
-        chunks = 20
-        rng = np.random.default_rng(seed=1)
-        data = rng.integers(0, 256, chunks * CHUNK_PAYLOAD_SIZE, np.uint8)
-        data_by_chunk = data.reshape(chunks, -1)
-        send_thread = threading.Thread(target=self._send_data, args=(send_stream, data))
-        send_thread.start()
+    def _verify(self, group, data, expected_present):
+        expected_present = expected_present.reshape(-1, HEAPS_PER_CHUNK)
+        chunks = len(expected_present)
+        data_by_heap = data.reshape(chunks, HEAPS_PER_CHUNK, -1)

-        for i in range(chunks):
-            chunk = data_ring.get()
+        for i in range(len(expected_present)):
+            chunk = group.data_ringbuffer.get()
             assert chunk.chunk_id == i
-            np.testing.assert_equal(chunk.present, 1)
-            np.testing.assert_equal(chunk.data, data_by_chunk[i])
+            np.testing.assert_equal(chunk.present, expected_present[i])
+            actual_data = chunk.data.reshape(HEAPS_PER_CHUNK, -1)
+            for j in range(HEAPS_PER_CHUNK):
+                if expected_present[i, j]:
+                    np.testing.assert_equal(actual_data[j], data_by_heap[i, j])
             group.add_free_chunk(chunk)

         # Stopping all the queues should shut down the data ringbuffer
         with pytest.raises(spead2.Stopped):
-            data_ring.get()
+            group.data_ringbuffer.get()
+
+    def _test_simple(self, group, send_stream, chunks, heaps):
+        """Send a given set of heaps (in order) and check that they arrive correctly."""
+        rng = np.random.default_rng(seed=1)
+        data = rng.integers(0, 256, chunks * CHUNK_PAYLOAD_SIZE, np.uint8)
+        data_by_heap = data.reshape(chunks, HEAPS_PER_CHUNK, -1)
+
+        def send():
+            self._send_data(send_stream, data, group.config.eviction_mode, heaps)
+            # Stop all the queues, which should flush everything and stop the
+            # data ring.
+            for queue in send_stream.queues:
+                queue.stop()
+
+        send_thread = threading.Thread(target=send)
+        send_thread.start()
+
+        expected_present = np.zeros(chunks * HEAPS_PER_CHUNK, np.uint8)
+        expected_present[heaps] = True
+        self._verify(group, data, expected_present)

         send_thread.join()

-    def test_missing_stream(self, group, queues, send_stream, data_ring, free_ring):
+    def test_full_in_order(self, group, send_stream):
+        """Send all the data, in order."""
+        chunks = 20
+        heaps = list(range(chunks * HEAPS_PER_CHUNK))
+        self._test_simple(group, send_stream, chunks, heaps)
+
+    def test_missing_stream(self, group, send_stream):
         """Skip sending data to one of the streams."""
         chunks = 20
+        heaps = [i for i in range(chunks * HEAPS_PER_CHUNK) if i % STREAMS != 2]
+        self._test_simple(group, send_stream, chunks, heaps)
+
+    @pytest.mark.parametrize("eviction_mode", [LOSSLESS_PARAM])
+    def test_lossless_late_stream(self, group, send_stream):
+        """Send one stream later than the others, to make sure lossless mode really works."""
         rng = np.random.default_rng(seed=1)
+        chunks = 20
         data = rng.integers(0, 256, chunks * CHUNK_PAYLOAD_SIZE, np.uint8)
-        data_by_heap = data.reshape(chunks, HEAPS_PER_CHUNK, -1)
-        heaps = [i for i in range(chunks * HEAPS_PER_CHUNK) if i % STREAMS != 2]
-        send_thread = threading.Thread(target=self._send_data, args=(send_stream, data, heaps))
+        heaps1 = [i for i in range(chunks * HEAPS_PER_CHUNK) if i % STREAMS != 2]
+        heaps2 = [i for i in range(chunks * HEAPS_PER_CHUNK) if i % STREAMS == 2]
+
+        def send():
+            self._send_data(send_stream, data, group.config.eviction_mode, heaps1)
+            time.sleep(0.01)
+            self._send_data(send_stream, data, group.config.eviction_mode, heaps2)
+            # Stop all the queues, which should flush everything and stop the
+            # data ring.
+            for queue in send_stream.queues:
+                queue.stop()
+
+        send_thread = threading.Thread(target=send)
         send_thread.start()

-        expected_present = np.ones(chunks * HEAPS_PER_CHUNK, bool)
-        expected_present[2::STREAMS] = False
-        expected_present = expected_present.reshape(chunks, HEAPS_PER_CHUNK)
-
-        for i in range(chunks):
-            chunk = data_ring.get()
-            assert chunk.chunk_id == i
-            np.testing.assert_equal(chunk.present, expected_present[i])
-            actual_data = chunk.data.reshape(HEAPS_PER_CHUNK, -1)
-            for j in range(HEAPS_PER_CHUNK):
-                if expected_present[i, j]:
-                    np.testing.assert_equal(actual_data[j], data_by_heap[i, j])
-            group.add_free_chunk(chunk)
-
-        # Stopping all the queues should shut down the data ringbuffer
-        with pytest.raises(spead2.Stopped):
-            data_ring.get()
+        expected_present = np.ones(chunks * HEAPS_PER_CHUNK, np.uint8)
+        self._verify(group, data, expected_present)

         send_thread.join()