-
Notifications
You must be signed in to change notification settings - Fork 13
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add chunk stream groups #219
Merged
Merged
Changes from all commits
Commits
Show all changes
74 commits
Select commit
Hold shift + click to select a range
bc22a99
Fix some pytest deprecation warnings
bmerry feea31e
Refactor chunk_stream_state
bmerry dfcdddb
Move template code into header file
bmerry 38e23db
Factor out chunk_window
bmerry 1c5c176
Work in progress on chunk stream groups
bmerry 3fe8ccf
WIP on shutdown rewrite
bmerry 5befeaa
Eliminate some std::shared_ptr refcount twiddling
bmerry c24dddd
Fix signedness of a chunk_id parameter
bmerry 3e1735c
Change chunk graveyard to a linked list
bmerry e647275
Write a bunch more code for chunk_stream_ring_group
bmerry c763ba1
Avoid allocating memory in chunk_stream_group::stop
bmerry cad9f0f
Make some method protected not private
bmerry d627bba
Don't change methods from protected to private when overriding
bmerry f8fe714
Start wiring up chunk_stream_group to Python
bmerry c463a8e
Suppress a warning in release builds
bmerry cfb915f
Add Python wrapper for chunk_stream_group_config
bmerry 80c3c90
More Python wiring/fixups for chunk_stream_group
bmerry c1b3c99
Fix mismatch of ::operator new / operator delete[]
bmerry 5115ca9
Fix incorrect indexing in some py::keep_alive call
bmerry b92c237
Fix a typo in an error message
bmerry 2ba5978
Make chunk_stream_group constructor explicit
bmerry aa1f03e
Fix constructor for chunk_stream_group
bmerry 672b8f5
Fix incorrect locking in chunk_stream_group
bmerry cf76a8e
Make Python chunk_stream_group.add_free_chunk work
bmerry d15baee
Another redesign of chunk_stream_group synchronisation
bmerry eca67b7
Prevent stream's max_chunks from exceeding the group's
bmerry 108a4bf
Ensure that stopping group member streams doesn't lose chunks
bmerry f91d946
Add initial unit tests for chunk_stream_group
bmerry d2da046
Add chunk_window::empty helper
bmerry c6b888f
Fix bug in stream group flushing
bmerry 3a9fcbc
Add chunk_stream_group test where one stream does nothing
bmerry 88eca2d
Add overview documentation for chunk_stream_group
bmerry 9339d57
Add chunk_stream_group_config::eviction_mode
bmerry 5fb5be8
Make chunk_stream_group own the streams
bmerry 714f5c3
Add ChunkRingPair to public interface
bmerry dda788c
Fix Python wrapper for chunk_ring_group
bmerry a62fef2
Fix a bug with unwanted sharing of stats between stream_config
bmerry c67734c
Update recv/__init__.pyi for chunk stream groups
bmerry 1dd4c97
Fix type annotation for ChunkRingStream constructor
bmerry caebb33
Make stream_base::shared_state private
bmerry 3c085f7
Fix some errors from doxygen
bmerry bdd4c3b
Add reference C++ documentation for chunk stream groups
bmerry 15ac3d7
More documentation chunk chunk stream group C++ API
bmerry 6e1eb3b
Doc that chunk_stream_group presents a vector-like interface
bmerry 19936e1
Remove proactive flushing in chunk_stream_group
bmerry 3482abf
Simplify flushing in chunk_stream_group::get_chunk
bmerry 2cddd9f
Fix corner case in async_flush_until
bmerry afab1f7
Reduce number of calls to async_flush_until
bmerry 53927e7
Remove unnecessary reset of head_pos = tail_pos = 0
bmerry 186dea6
Add some developer documentation
bmerry 8405e45
Add example C++ program using chunk_stream_ring_group
bmerry ac79277
Add Python example code for ChunkStreamRingGroup
bmerry 1883687
Fix a null pointer dereference bug
bmerry 015db87
Add `config` property to ChunkStreamRingGroup
bmerry 64ba802
chunk_stream_group: remove refcounts
bmerry 836aa74
Eliminate live_streams from chunk_stream_group
bmerry 3703e0d
Fix deadlock on group stop in lossless mode
bmerry e8fa2a7
Eliminate min_head_chunk
bmerry 0806897
Add py-recv-chunk-group reference documentation
bmerry 9608637
Fix some errors in ChunkStreamRingGroup docs
bmerry af9463c
Implement Sequence protocol for ChunkStreamRingGroup
bmerry e60da41
Remove some unused code
bmerry 07d659c
Fix a flake8 error
bmerry d51a852
Add a unit test to validate a return value policy
bmerry c866445
Fix chunk_stream_group deadlock
bmerry f42ed76
Fix error with very large chunk ID
bmerry 5276c95
Refine test_getitem_slice_gc
bmerry 36ef6e0
Fix missing lock
bmerry 66692ec
Remove chunk_stream_group::stream_stop_received
bmerry 8bfdac7
Add an additional assert
bmerry fbf2f7a
Fix up TestChunkStreamRingRing::test_missing_chunks
bmerry 1634bb0
Fix mypy errors about ChunkStreamRingGroup being abstract
bmerry ddb7812
Update changelog for chunk stream groups
bmerry aaad4f4
Bike-shedding on documentation
bmerry File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,4 +5,5 @@ Advanced features | |
:maxdepth: 2 | ||
|
||
recv-chunk | ||
recv-chunk-group | ||
recv-stats |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
Chunking stream groups | ||
====================== | ||
|
||
For an overview, refer to :doc:`recv-chunk-group`. This page is a reference for the | ||
C++ API. | ||
|
||
.. doxygenclass:: spead2::recv::chunk_stream_group_config | ||
:members: | ||
|
||
.. doxygenclass:: spead2::recv::chunk_stream_group | ||
:members: | ||
|
||
.. doxygenclass:: spead2::recv::chunk_stream_group_member | ||
:members: | ||
|
||
Ringbuffer convenience API | ||
-------------------------- | ||
.. doxygenclass:: spead2::recv::chunk_stream_ring_group | ||
:members: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
Synchronisation in chunk stream groups | ||
====================================== | ||
.. cpp:namespace-push:: spead2::recv | ||
|
||
For chunk stream groups to achieve the goal of allowing multi-core scaling, it | ||
is necessary to minimise locking. The implementation achieves this by avoiding | ||
any packet- or heap-granularity locking, and performing locking only at chunk | ||
granularity. Chunks are assumed to be large enough that this minimises total | ||
overhead, although it should be noted that these locks are expected to be | ||
highly contended and there may be further work possible to reduce the | ||
overheads. | ||
|
||
To avoid the need for heap-level locking, each member stream has its own | ||
sliding window with pointers to the chunks, so that heaps which fall inside an | ||
existing chunk can be serviced without locking. However, this causes a problem | ||
when flushing chunks from the group's window: a stream might still be writing | ||
to the chunk at the time. Additionally, it might not be possible to allocate a | ||
new chunk until an old chunk is flushed e.g., if there is a fixed pool of | ||
chunks rather than dynamic allocation. | ||
|
||
The group keeps its own copy of the head positions (oldest chunk) from the | ||
individual streams, protected by the group mutex rather than the stream | ||
mutexes. The group then maintains its head chunk position to match the oldest | ||
head position of any of the member streams. When the group wishes to | ||
evict a chunk, it simply needs to wait for all streams to make enough progress | ||
that the group's head moves past that chunk. | ||
|
||
The wait is achieved using a condition variable that is notified whenever the | ||
head position increases. This allows the group mutex to be dropped while | ||
waiting, which prevents the deadlocks that might otherwise occur if the mutex | ||
was held while waiting and another stream was attemping to lock the group mutex | ||
to make forward progress. | ||
|
||
In lossless eviction mode, this is all that is needed, although it is | ||
non-trivial to see that this won't deadlock with all the streams sitting in | ||
the wait loop waiting for other streams to make forward progress. That this | ||
cannot happen is due to the requirement that the stream's window cannot be | ||
larger than the group's. Consider the active call to | ||
:cpp:func:`chunk_stream_group::get_chunk` with the smallest chunk ID. That | ||
stream is guaranteed to have already readied any chunk due to be evicted from | ||
the group, and the same is true of any other stream that is waiting in | ||
:cpp:func:`~chunk_stream_group::get_chunk`, and so forward progress depends | ||
only on streams that are not blocked in | ||
:cpp:func:`~chunk_stream_group::get_chunk`. | ||
|
||
In lossy eviction mode, we need to make sure that such streams make forward | ||
progress even if no new packets arrive on them. This is achieved by posting an | ||
asynchronous callback to all streams requesting them to flush out chunks that | ||
are now too old. The callback will never reach streams that have already | ||
stopped; we handle this at the time the stream stops, by treating it as having | ||
a head of ``INT64_MAX``. | ||
|
||
While lossless mode is normally allowed to block indefinitely, we do need to | ||
interrupt things in :cpp:func:`chunk_stream_group::stop`. This is handled | ||
similarly to lossy eviction mode, where all streams are requested to flush up | ||
to ``INT64_MAX``. | ||
|
||
.. cpp:namespace-pop:: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
Destruction of receive streams | ||
============================== | ||
The asynchronous and parallel nature of spead2 makes destroying a receive | ||
stream a tricky operation: there may be pending asio completion handlers that | ||
will try to push packets into the stream, leading to a race condition. While | ||
asio guarantees that closing a socket will cancel any pending asynchronous | ||
operations on that socket, this doesn't account for cases where the operation | ||
has already completed but the completion handler is either pending or is | ||
currently running. | ||
|
||
Up to version 3.11, this was handled by a shutdown protocol | ||
between :cpp:class:`spead2::recv::stream` and | ||
:cpp:class:`spead2::recv::reader`. The reader was required to notify the | ||
stream when it had completely shut down, and | ||
:cpp:func:`spead2::recv::stream::stop` would block until all readers had | ||
performed this notification (via a semaphore). This protocol was complicated, | ||
and it relied on the reader being able to make forward progress while the | ||
thread calling :cpp:func:`~spead2::recv::stream::stop` was blocked. | ||
|
||
Newer versions take a different approach based on shared pointers. The ideal | ||
case would be to have the whole stream always managed by a shared pointer, so | ||
that a completion handler that interfaces with the stream could keep a copy of | ||
the shared pointer and thus keep it alive as long as needed. However, that is | ||
not possible to do in a backwards-compatible way. Instead, a minimal set of | ||
fields is placed inside a shared pointer, namely: | ||
|
||
- The ``queue_mutex`` | ||
- A flag indicating whether the stream has stopped. | ||
|
||
For convenience, the flag is encoded as a pointer, which holds either a | ||
pointer to the stream (if not stopped) or a null pointer (if stopped). Each | ||
completion handler holds a shared reference to this structure. When it wishes | ||
to access the stream, it should: | ||
|
||
1. Lock the mutex. | ||
2. Get the pointer back to the stream from the shared structure, aborting if | ||
it gets a null pointer. | ||
3. Manipulate the stream. | ||
4. Drop the mutex. | ||
|
||
This prevents use-after-free errors because the stream cannot be destroyed | ||
without first stopping, and stopping locks the mutex. Hence, the stream cannot | ||
disappear asynchronously during step 3. Note that it can, however, stop | ||
during step 3 if the completion handler causes it to stop. | ||
|
||
Using shared pointers in this way can add overhead because atomically | ||
incrementing and decrementing reference counts can be expensive, particularly | ||
if it causes cache line migrations between processor cores. To minimise | ||
reference count manipulation, the :cpp:class:`~spead2::recv::reader` class | ||
encapsulates this workflow in its | ||
:cpp:class:`~spead2::recv::reader::bind_handler` member function, which | ||
provides the facilities to move the shared pointer along a linear chain of | ||
completion handlers so that the reference count does not need to be | ||
adjusted. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
Developer documentation | ||
======================= | ||
|
||
This section documents internal design decisions that users will generally not | ||
need to be aware of, although some of it may be useful if you plan to subclass | ||
the C++ classes to extend functionality. | ||
|
||
.. toctree:: | ||
:maxdepth: 2 | ||
|
||
dev-recv-destruction | ||
dev-recv-chunk-group |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ Contents: | |
perf | ||
tools | ||
migrate-3 | ||
developer | ||
changelog | ||
license | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
Chunking stream groups | ||
====================== | ||
|
||
For an overview, refer to :doc:`recv-chunk-group`. This page is a reference for the | ||
Python API. It extends the API for :doc:`chunks <py-recv-chunk>`. | ||
|
||
.. py:class:: spead2.recv.ChunkStreamGroupConfig(**kwargs) | ||
|
||
Parameters for a chunk stream group. The configuration options | ||
can either be passed to the constructor (as keyword arguments) or set as | ||
properties after construction. | ||
|
||
:param int max_chunks: | ||
The maximum number of chunks that can be live at the same time. | ||
:param EvictionMode eviction_mode: | ||
The chunk eviction mode. | ||
|
||
.. py:class:: EvictionMode | ||
|
||
Eviction mode when it is necessary to advance the group window. See | ||
the :doc:`overview <recv-chunk-group>` for more details. | ||
|
||
.. py:attribute:: LOSSY | ||
|
||
force streams to release incomplete chunks | ||
|
||
.. py:attribute:: LOSSLESS | ||
|
||
a chunk will only be marked ready when all streams have marked it | ||
ready | ||
|
||
.. py:class:: spead2.recv.ChunkStreamRingGroup(config, data_ringbuffer, free_ringbuffer) | ||
|
||
Stream group that uses ringbuffers to manage chunks. | ||
|
||
When a fresh chunk is needed, it is retrieved from a ringbuffer of free | ||
chunks (the "free ring"). When a chunk is flushed, it is pushed to a "data | ||
ring". These may be shared between groups, but both will be stopped as soon | ||
as any of the members streams are stopped. The intended use case is | ||
parallel groups that are started and stopped together. | ||
|
||
It behaves like a :py:class:`~collections.abc.Sequence` of the contained | ||
streams. | ||
|
||
:param config: Group configuration | ||
:type config: :py:class:`spead2.recv.ChunkStreamGroupConfig` | ||
:param data_ringbuffer: Ringbuffer onto which the completed chunks are placed. | ||
:type data_ringbuffer: :py:class:`spead2.recv.ChunkRingbuffer` | ||
:param free_ringbuffer: Ringbuffer from which new chunks are obtained. | ||
:type free_ringbuffer: :py:class:`spead2.recv.ChunkRingbuffer` | ||
|
||
.. py:attribute:: data_ringbuffer | ||
|
||
The data ringbuffer given to the constructor. | ||
|
||
.. py:attribute:: free_ringbuffer | ||
|
||
The free ringbuffer given to the constructor. | ||
|
||
.. py:method:: add_free_chunk(chunk) | ||
|
||
Add a chunk to the free ringbuffer. This takes care of zeroing out the | ||
:py:attr:`.Chunk.present` array, and it will suppress the | ||
:exc:`spead2.Stopped` exception if the free ringbuffer has been stopped. | ||
|
||
If the free ring is full, it will raise :exc:`spead2.Full` rather than | ||
blocking. The free ringbuffer should be constructed with enough slots that | ||
this does not happen. | ||
|
||
.. py:method:: emplace_back(thread_pool, config, chunk_stream_config) | ||
|
||
Add a new stream. | ||
|
||
:param thread_pool: Thread pool handling the I/O | ||
:type thread_pool: :py:class:`spead2.ThreadPool` | ||
:param config: Stream configuration | ||
:type config: :py:class:`spead2.recv.StreamConfig` | ||
:param chunk_config: Chunking configuration | ||
:type chunk_config: :py:class:`spead2.recv.ChunkStreamConfig` | ||
:rtype: :py:class:`spead2.recv.ChunkStreamGroupMember` | ||
|
||
.. py:class:: spead2.recv.ChunkStreamGroupMember | ||
|
||
A component stream in a :py:class:`~spead2.recv.ChunkStreamRingGroup`. | ||
This class cannot be instantiated directly. Use | ||
:py:meth:`.ChunkStreamRingGroup.emplace_back` instead. | ||
|
||
It provides the same methods for adding readers as | ||
:py:class:`spead2.recv.Stream`. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,3 +21,4 @@ with the C++ backend. | |
py-logging | ||
py-ibverbs | ||
py-recv-chunk | ||
py-recv-chunk-group |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
Chunking stream groups | ||
====================== | ||
.. cpp:namespace-push:: spead2::recv | ||
|
||
While the :doc:`recv-chunk` allows for high-bandwidth streams to be received | ||
with low overhead, it still has a fundamental scaling limitation: each chunk | ||
can only be constructed from a single thread. :dfn:`Chunk stream groups` allow | ||
this overhead to be overcome, although not without caveats. | ||
|
||
Each stream is still limited to a single thread. However, a :dfn:`group` of | ||
streams can share the same sequence of chunks, with each stream contributing | ||
a subset of the data in each chunk. Making use of this feature requires | ||
that load balancing is implemented at the network level, using different | ||
destination addresses or ports so that the incoming heaps can be multiplexed | ||
into multiple streams. | ||
|
||
As with a single chunk stream, the group keeps a sliding window of chunks and | ||
obtains new ones from an allocation callback. When the window slides forward, | ||
chunks that fall out the back of the window are provided to a ready callback. | ||
Each member stream also has its own sliding window, which can be smaller (but not | ||
larger) than the group's window. When the group's window slides forward, the | ||
streams' windows are adjusted to ensure they still fit within the group's | ||
window. In other words, a stream's window determines how much reordering is | ||
tolerated within a stream, while the group's window determines how out-of-sync | ||
the streams are allowed to become. | ||
|
||
When desynchronisation does occur, there is a choice of strategies. The default | ||
strategy is eager but potentially lossy: when the group's window moves forward, | ||
the trailing chunk is marked ready as soon as possible, even if this causes | ||
some stream windows to shrink below their normal size. An alternative strategy | ||
is lossless: when the group's window needs to move forward, it is blocked | ||
until all the member streams have caught up. This latter mode is intended for | ||
use with lossless transports such as TCP. However, if one of the component streams | ||
stops functioning (for example, because it is routed on a network path that is | ||
down) it prevents the entire group from making forward progress. | ||
|
||
The general flow (in C++) is | ||
|
||
1. Create a :cpp:class:`chunk_stream_group_config`. | ||
2. Create a :cpp:class:`chunk_stream_group`. | ||
3. Use :cpp:func:`chunk_stream_group::emplace_back` to | ||
create the streams. | ||
4. Add readers to the streams. | ||
5. Process the data. | ||
6. Optionally, call :cpp:func:`chunk_stream_group::stop()` | ||
(otherwise it will be called on destruction). | ||
7. Destroy the group. | ||
|
||
In Python the process is similar, although garbage collection replaces | ||
explicit destruction. | ||
|
||
Ringbuffer convenience API | ||
-------------------------- | ||
As for standalone chunk streams, there is a simplified API using ringbuffers, | ||
which is also the only API available for Python. A | ||
:cpp:class:`chunk_stream_ring_group` is a group that allocates | ||
data from one ringbuffer and send ready data to another. The description of | ||
:ref:`that api <recv-chunk-ringbuffer>` largely applies here too. The | ||
ringbuffers can be shared between groups. | ||
|
||
Caveats | ||
------- | ||
This is an advanced API that sacrifices some user-friendliness for | ||
performance, and thus some care is needed to use it safely. | ||
|
||
- It is vital that all the streams can make forward progress independently, | ||
as otherwise deadlocks can occur. For example, if they share a thread pool, | ||
the pool must have at least as many threads as streams. It's recommended | ||
that each stream has its own single-threaded thread pool. | ||
- The streams must all be added to the group before adding any readers to | ||
the streams. Once a group has received some data, an exception will be thrown | ||
if one attempts to add a new stream. | ||
- The stream ID associated with each chunk will be the stream ID of one of the | ||
component streams, but it is undefined which one. | ||
- When the allocate and ready callbacks are invoked, it's not specified which | ||
stream's batch statistics pointer will be passed. | ||
- Two streams must not write to the same bytes of a chunk (in the payload, | ||
present array or extra data), as this is undefined behaviour in C++. | ||
- Calling :cpp:func:`~stream::stop` on a member stream will stop the whole | ||
group. | ||
|
||
.. cpp:namespace-pop:: |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess I'll need to update the changelog for chunk stream groups and the new shutdown method.