Add chunk stream groups #219

Merged
merged 74 commits into from
Jul 9, 2023
bc22a99
Fix some pytest deprecation warnings
bmerry Jun 21, 2023
feea31e
Refactor chunk_stream_state
bmerry Jun 19, 2023
dfcdddb
Move template code into header file
bmerry Jun 19, 2023
38e23db
Factor out chunk_window
bmerry Jun 19, 2023
1c5c176
Work in progress on chunk stream groups
bmerry Jun 19, 2023
3fe8ccf
WIP on shutdown rewrite
bmerry Jun 20, 2023
5befeaa
Eliminate some std::shared_ptr refcount twiddling
bmerry Jun 21, 2023
c24dddd
Fix signedness of a chunk_id parameter
bmerry Jun 21, 2023
3e1735c
Change chunk graveyard to a linked list
bmerry Jun 21, 2023
e647275
Write a bunch more code for chunk_stream_ring_group
bmerry Jun 21, 2023
c763ba1
Avoid allocating memory in chunk_stream_group::stop
bmerry Jun 22, 2023
cad9f0f
Make some method protected not private
bmerry Jun 22, 2023
d627bba
Don't change methods from protected to private when overriding
bmerry Jun 22, 2023
f8fe714
Start wiring up chunk_stream_group to Python
bmerry Jun 22, 2023
c463a8e
Suppress a warning in release builds
bmerry Jun 22, 2023
cfb915f
Add Python wrapper for chunk_stream_group_config
bmerry Jun 22, 2023
80c3c90
More Python wiring/fixups for chunk_stream_group
bmerry Jun 22, 2023
c1b3c99
Fix mismatch of ::operator new / operator delete[]
bmerry Jun 22, 2023
5115ca9
Fix incorrect indexing in some py::keep_alive call
bmerry Jun 22, 2023
b92c237
Fix a typo in an error message
bmerry Jun 22, 2023
2ba5978
Make chunk_stream_group constructor explicit
bmerry Jun 22, 2023
aa1f03e
Fix constructor for chunk_stream_group
bmerry Jun 22, 2023
672b8f5
Fix incorrect locking in chunk_stream_group
bmerry Jun 22, 2023
cf76a8e
Make Python chunk_stream_group.add_free_chunk work
bmerry Jun 22, 2023
d15baee
Another redesign of chunk_stream_group synchronisation
bmerry Jun 23, 2023
eca67b7
Prevent stream's max_chunks from exceeding the group's
bmerry Jun 23, 2023
108a4bf
Ensure that stopping group member streams doesn't lose chunks
bmerry Jun 23, 2023
f91d946
Add initial unit tests for chunk_stream_group
bmerry Jun 23, 2023
d2da046
Add chunk_window::empty helper
bmerry Jun 23, 2023
c6b888f
Fix bug in stream group flushing
bmerry Jun 23, 2023
3a9fcbc
Add chunk_stream_group test where one stream does nothing
bmerry Jun 23, 2023
88eca2d
Add overview documentation for chunk_stream_group
bmerry Jun 23, 2023
9339d57
Add chunk_stream_group_config::eviction_mode
bmerry Jun 23, 2023
5fb5be8
Make chunk_stream_group own the streams
bmerry Jun 26, 2023
714f5c3
Add ChunkRingPair to public interface
bmerry Jun 26, 2023
dda788c
Fix Python wrapper for chunk_ring_group
bmerry Jun 26, 2023
a62fef2
Fix a bug with unwanted sharing of stats between stream_config
bmerry Jun 27, 2023
c67734c
Update recv/__init__.pyi for chunk stream groups
bmerry Jun 27, 2023
1dd4c97
Fix type annotation for ChunkRingStream constructor
bmerry Jun 27, 2023
caebb33
Make stream_base::shared_state private
bmerry Jun 27, 2023
3c085f7
Fix some errors from doxygen
bmerry Jun 28, 2023
bdd4c3b
Add reference C++ documentation for chunk stream groups
bmerry Jun 28, 2023
15ac3d7
More documentation chunk chunk stream group C++ API
bmerry Jun 28, 2023
6e1eb3b
Doc that chunk_stream_group presents a vector-like interface
bmerry Jun 29, 2023
19936e1
Remove proactive flushing in chunk_stream_group
bmerry Jun 29, 2023
3482abf
Simplify flushing in chunk_stream_group::get_chunk
bmerry Jun 29, 2023
2cddd9f
Fix corner case in async_flush_until
bmerry Jun 29, 2023
afab1f7
Reduce number of calls to async_flush_until
bmerry Jun 29, 2023
53927e7
Remove unnecessary reset of head_pos = tail_pos = 0
bmerry Jun 29, 2023
186dea6
Add some developer documentation
bmerry Jun 29, 2023
8405e45
Add example C++ program using chunk_stream_ring_group
bmerry Jun 29, 2023
ac79277
Add Python example code for ChunkStreamRingGroup
bmerry Jun 29, 2023
1883687
Fix a null pointer dereference bug
bmerry Jun 30, 2023
015db87
Add `config` property to ChunkStreamRingGroup
bmerry Jun 30, 2023
64ba802
chunk_stream_group: remove refcounts
bmerry Jun 30, 2023
836aa74
Eliminate live_streams from chunk_stream_group
bmerry Jun 30, 2023
3703e0d
Fix deadlock on group stop in lossless mode
bmerry Jun 30, 2023
e8fa2a7
Eliminate min_head_chunk
bmerry Jun 30, 2023
0806897
Add py-recv-chunk-group reference documentation
bmerry Jul 3, 2023
9608637
Fix some errors in ChunkStreamRingGroup docs
bmerry Jul 3, 2023
af9463c
Implement Sequence protocol for ChunkStreamRingGroup
bmerry Jul 3, 2023
e60da41
Remove some unused code
bmerry Jul 4, 2023
07d659c
Fix a flake8 error
bmerry Jul 4, 2023
d51a852
Add a unit test to validate a return value policy
bmerry Jul 4, 2023
c866445
Fix chunk_stream_group deadlock
bmerry Jul 4, 2023
f42ed76
Fix error with very large chunk ID
bmerry Jul 4, 2023
5276c95
Refine test_getitem_slice_gc
bmerry Jul 4, 2023
36ef6e0
Fix missing lock
bmerry Jul 4, 2023
66692ec
Remove chunk_stream_group::stream_stop_received
bmerry Jul 4, 2023
8bfdac7
Add an additional assert
bmerry Jul 4, 2023
fbf2f7a
Fix up TestChunkStreamRingRing::test_missing_chunks
bmerry Jul 4, 2023
1634bb0
Fix mypy errors about ChunkStreamRingGroup being abstract
bmerry Jul 6, 2023
ddb7812
Update changelog for chunk stream groups
bmerry Jul 9, 2023
aaad4f4
Bike-shedding on documentation
bmerry Jul 9, 2023
1 change: 1 addition & 0 deletions doc/advanced.rst
Original file line number Diff line number Diff line change
@@ -5,4 +5,5 @@ Advanced features
:maxdepth: 2

recv-chunk
recv-chunk-group
recv-stats
13 changes: 13 additions & 0 deletions doc/changelog.rst
@@ -3,8 +3,21 @@ Changelog

.. rubric:: Development version

- Add support for :doc:`recv-chunk-group` to assemble chunks in parallel.
- Simplify the way receive streams shut down. Users should not notice any
change, but custom reader implementations will need to be updated.
- Update :meth:`!test_async_flush` and :meth:`!test_async_flush_fail` to keep
handles to async tasks, to prevent them being garbage collected too early.
- Fix a bug where copying a :cpp:class:`spead2::recv::stream_config` would not
deep copy the names of custom statistics, and so any statistics added to the
copy would also affect the original, and there were also potential race
conditions if a stream config was modified while holding stream statistics.
- Fix a bug (caused by the bug above) where passing a
:cpp:class:`spead2::recv::stream_config` to construct a
:cpp:class:`spead2::recv::chunk_stream` would modify the config. Passing
the same config to construct two chunk streams would fail with an error.
- Fix the type annotation for the :py:class:`~.ChunkRingStream` constructor:
the parameter name for ``chunk_stream_config`` was incorrect.
Contributor Author

I guess I'll need to update the changelog for chunk stream groups and the new shutdown method.


.. rubric:: 3.11.1

19 changes: 19 additions & 0 deletions doc/cpp-recv-chunk-group.rst
@@ -0,0 +1,19 @@
Chunking stream groups
======================

For an overview, refer to :doc:`recv-chunk-group`. This page is a reference for the
C++ API.

.. doxygenclass:: spead2::recv::chunk_stream_group_config
:members:

.. doxygenclass:: spead2::recv::chunk_stream_group
:members:

.. doxygenclass:: spead2::recv::chunk_stream_group_member
:members:

Ringbuffer convenience API
--------------------------
.. doxygenclass:: spead2::recv::chunk_stream_ring_group
:members:
1 change: 1 addition & 0 deletions doc/cpp.rst
@@ -40,3 +40,4 @@ search path, and you need to set :envvar:`PKG_CONFIG_PATH` to
cpp-logging
cpp-ibverbs
cpp-recv-chunk
cpp-recv-chunk-group
58 changes: 58 additions & 0 deletions doc/dev-recv-chunk-group.rst
@@ -0,0 +1,58 @@
Synchronisation in chunk stream groups
======================================
.. cpp:namespace-push:: spead2::recv

For chunk stream groups to achieve the goal of allowing multi-core scaling, it
is necessary to minimise locking. The implementation achieves this by avoiding
any packet- or heap-granularity locking, and performing locking only at chunk
granularity. Chunks are assumed to be large enough that this minimises total
overhead, although it should be noted that these locks are expected to be
highly contended and there may be further work possible to reduce the
overheads.

To avoid the need for heap-level locking, each member stream has its own
sliding window with pointers to the chunks, so that heaps which fall inside an
existing chunk can be serviced without locking. However, this causes a problem
when flushing chunks from the group's window: a stream might still be writing
to the chunk at the time. Additionally, it might not be possible to allocate a
new chunk until an old chunk is flushed, e.g. if there is a fixed pool of
chunks rather than dynamic allocation.

The group keeps its own copy of the head positions (oldest chunk) from the
individual streams, protected by the group mutex rather than the stream
mutexes. The group then maintains its head chunk position to match the oldest
head position of any of the member streams. When the group wishes to
evict a chunk, it simply needs to wait for all streams to make enough progress
that the group's head moves past that chunk.

The wait is achieved using a condition variable that is notified whenever the
head position increases. This allows the group mutex to be dropped while
waiting, which prevents the deadlocks that might otherwise occur if the mutex
were held while waiting and another stream was attempting to lock the group mutex
to make forward progress.
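
The head tracking and the wait can be illustrated with a minimal Python sketch. It is a toy model built on :py:class:`threading.Condition`, not the C++ implementation; the class ``GroupHeads`` and its method names are hypothetical and exist only for this illustration.

```python
import threading


class GroupHeads:
    """Toy model of the group's copy of the member streams' head positions."""

    def __init__(self, n_streams):
        # The condition variable wraps the (hypothetical) group mutex.
        self._cond = threading.Condition()
        self._stream_heads = [0] * n_streams

    @property
    def head(self):
        """Group head: the oldest head position of any member stream."""
        with self._cond:
            return min(self._stream_heads)

    def stream_head_updated(self, stream_index, new_head):
        """Called by a stream after it has advanced its own window."""
        with self._cond:
            old_group_head = min(self._stream_heads)
            self._stream_heads[stream_index] = new_head
            # Only wake waiters when the group head actually increased.
            if min(self._stream_heads) > old_group_head:
                self._cond.notify_all()

    def wait_until_evicted(self, chunk_id, timeout=None):
        """Block until the group head has moved past chunk_id.

        The mutex is released while waiting, so other streams can still
        call stream_head_updated. Returns False if the timeout expires.
        """
        with self._cond:
            return self._cond.wait_for(
                lambda: min(self._stream_heads) > chunk_id, timeout
            )
```

Because ``wait_for`` releases the lock while it sleeps, a stream that is blocked waiting for an eviction never prevents another stream from reporting progress, which is the property the paragraph above relies on.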

In lossless eviction mode, this is all that is needed, although it is
non-trivial to see that this won't deadlock with all the streams sitting in
the wait loop waiting for other streams to make forward progress. That this
cannot happen is due to the requirement that the stream's window cannot be
larger than the group's. Consider the active call to
:cpp:func:`chunk_stream_group::get_chunk` with the smallest chunk ID. That
stream is guaranteed to have already readied any chunk due to be evicted from
the group, and the same is true of any other stream that is waiting in
:cpp:func:`~chunk_stream_group::get_chunk`, and so forward progress depends
only on streams that are not blocked in
:cpp:func:`~chunk_stream_group::get_chunk`.

In lossy eviction mode, we need to make sure that such streams make forward
progress even if no new packets arrive on them. This is achieved by posting an
asynchronous callback to all streams requesting them to flush out chunks that
are now too old. The callback will never reach streams that have already
stopped; we handle this at the time the stream stops, by treating it as having
a head of ``INT64_MAX``.

While lossless mode is normally allowed to block indefinitely, we do need to
interrupt things in :cpp:func:`chunk_stream_group::stop`. This is handled
similarly to lossy eviction mode, where all streams are requested to flush up
to ``INT64_MAX``.
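
As a rough sketch of the flush requests described above (synchronous here for simplicity, whereas the real callbacks are posted asynchronously), the following toy Python model treats a stopped stream as having a head of ``INT64_MAX``. The names ``MemberStream`` and ``request_flush`` are hypothetical.

```python
INT64_MAX = 2**63 - 1


class MemberStream:
    """Toy stand-in for a member stream; only the head position is modelled."""

    def __init__(self):
        self.head = 0
        self.stopped = False

    def flush_until(self, chunk_id):
        """Release every chunk older than chunk_id (the head never moves back)."""
        self.head = max(self.head, chunk_id)

    def stop(self):
        # A stopped stream can no longer receive callbacks, so it is
        # treated as if it had already flushed everything.
        self.stopped = True
        self.head = INT64_MAX


def request_flush(streams, chunk_id):
    """Ask every running stream to flush; return the resulting group head."""
    for stream in streams:
        if not stream.stopped:
            stream.flush_until(chunk_id)
    return min(stream.head for stream in streams)
```

Stopping the whole group then corresponds to ``request_flush(streams, INT64_MAX)``, after which no stream holds any chunk back.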

.. cpp:namespace-pop::
54 changes: 54 additions & 0 deletions doc/dev-recv-destruction.rst
@@ -0,0 +1,54 @@
Destruction of receive streams
==============================
The asynchronous and parallel nature of spead2 makes destroying a receive
stream a tricky operation: there may be pending asio completion handlers that
will try to push packets into the stream, leading to a race condition. While
asio guarantees that closing a socket will cancel any pending asynchronous
operations on that socket, this doesn't account for cases where the operation
has already completed but the completion handler is either pending or is
currently running.

Up to version 3.11, this was handled by a shutdown protocol
between :cpp:class:`spead2::recv::stream` and
:cpp:class:`spead2::recv::reader`. The reader was required to notify the
stream when it had completely shut down, and
:cpp:func:`spead2::recv::stream::stop` would block until all readers had
performed this notification (via a semaphore). This protocol was complicated,
and it relied on the reader being able to make forward progress while the
thread calling :cpp:func:`~spead2::recv::stream::stop` was blocked.

Newer versions take a different approach based on shared pointers. The ideal
case would be to have the whole stream always managed by a shared pointer, so
that a completion handler that interfaces with the stream could keep a copy of
the shared pointer and thus keep it alive as long as needed. However, that is
not possible to do in a backwards-compatible way. Instead, a minimal set of
fields is placed inside a shared pointer, namely:

- The ``queue_mutex``
- A flag indicating whether the stream has stopped.

For convenience, the flag is encoded as a pointer, which holds either a
pointer to the stream (if not stopped) or a null pointer (if stopped). Each
completion handler holds a shared reference to this structure. When it wishes
to access the stream, it should:

1. Lock the mutex.
2. Get the pointer back to the stream from the shared structure, aborting if
it gets a null pointer.
3. Manipulate the stream.
4. Drop the mutex.

This prevents use-after-free errors because the stream cannot be destroyed
without first stopping, and stopping locks the mutex. Hence, the stream cannot
disappear asynchronously during step 3. Note that it can, however, stop
during step 3 if the completion handler causes it to stop.
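
The four steps can be sketched in Python as follows. This is only an analogy: Python has no shared pointers, so an ordinary object reference stands in for the ``std::shared_ptr``, and the names ``SharedState``, ``Stream`` and ``make_handler`` are hypothetical.

```python
import threading


class SharedState:
    """The small structure that may outlive the stream."""

    def __init__(self, stream):
        self.queue_mutex = threading.Lock()
        self.stream = stream  # set to None once the stream has stopped


class Stream:
    def __init__(self):
        self.shared = SharedState(self)
        self.packets = []

    def stop(self):
        # Stopping takes the mutex, so it cannot overlap with a handler
        # that is in the middle of manipulating the stream.
        with self.shared.queue_mutex:
            self.shared.stream = None


def make_handler(shared):
    """Build a completion handler that holds only the shared structure."""

    def handler(packet):
        with shared.queue_mutex:           # 1. lock the mutex
            stream = shared.stream         # 2. get the pointer to the stream
            if stream is None:
                return False               #    stopped: abort
            stream.packets.append(packet)  # 3. manipulate the stream
        return True                        # 4. mutex dropped on leaving `with`

    return handler
```

A handler created before ``stop()`` can still be invoked afterwards; it simply sees the null pointer and returns without touching the stream.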

Using shared pointers in this way can add overhead because atomically
incrementing and decrementing reference counts can be expensive, particularly
if it causes cache line migrations between processor cores. To minimise
reference count manipulation, the :cpp:class:`~spead2::recv::reader` class
encapsulates this workflow in its
:cpp:func:`~spead2::recv::reader::bind_handler` member function, which
provides the facilities to move the shared pointer along a linear chain of
completion handlers so that the reference count does not need to be
adjusted.
12 changes: 12 additions & 0 deletions doc/developer.rst
@@ -0,0 +1,12 @@
Developer documentation
=======================

This section documents internal design decisions that users will generally not
need to be aware of, although some of it may be useful if you plan to subclass
the C++ classes to extend functionality.

.. toctree::
:maxdepth: 2

dev-recv-destruction
dev-recv-chunk-group
1 change: 1 addition & 0 deletions doc/index.rst
@@ -18,6 +18,7 @@ Contents:
perf
tools
migrate-3
developer
changelog
license

89 changes: 89 additions & 0 deletions doc/py-recv-chunk-group.rst
@@ -0,0 +1,89 @@
Chunking stream groups
======================

For an overview, refer to :doc:`recv-chunk-group`. This page is a reference for the
Python API. It extends the API for :doc:`chunks <py-recv-chunk>`.

.. py:class:: spead2.recv.ChunkStreamGroupConfig(**kwargs)

Parameters for a chunk stream group. The configuration options
can either be passed to the constructor (as keyword arguments) or set as
properties after construction.

:param int max_chunks:
The maximum number of chunks that can be live at the same time.
:param EvictionMode eviction_mode:
The chunk eviction mode.

.. py:class:: EvictionMode

Eviction mode when it is necessary to advance the group window. See
the :doc:`overview <recv-chunk-group>` for more details.

.. py:attribute:: LOSSY

force streams to release incomplete chunks

.. py:attribute:: LOSSLESS

a chunk will only be marked ready when all streams have marked it
ready

.. py:class:: spead2.recv.ChunkStreamRingGroup(config, data_ringbuffer, free_ringbuffer)

Stream group that uses ringbuffers to manage chunks.

When a fresh chunk is needed, it is retrieved from a ringbuffer of free
chunks (the "free ring"). When a chunk is flushed, it is pushed to a "data
ring". These may be shared between groups, but both will be stopped as soon
as any of the member streams are stopped. The intended use case is
parallel groups that are started and stopped together.

It behaves like a :py:class:`~collections.abc.Sequence` of the contained
streams.

:param config: Group configuration
:type config: :py:class:`spead2.recv.ChunkStreamGroupConfig`
:param data_ringbuffer: Ringbuffer onto which the completed chunks are placed.
:type data_ringbuffer: :py:class:`spead2.recv.ChunkRingbuffer`
:param free_ringbuffer: Ringbuffer from which new chunks are obtained.
:type free_ringbuffer: :py:class:`spead2.recv.ChunkRingbuffer`

.. py:attribute:: data_ringbuffer

The data ringbuffer given to the constructor.

.. py:attribute:: free_ringbuffer

The free ringbuffer given to the constructor.

.. py:method:: add_free_chunk(chunk)

Add a chunk to the free ringbuffer. This takes care of zeroing out the
:py:attr:`.Chunk.present` array, and it will suppress the
:exc:`spead2.Stopped` exception if the free ringbuffer has been stopped.

If the free ring is full, it will raise :exc:`spead2.Full` rather than
blocking. The free ringbuffer should be constructed with enough slots that
this does not happen.

.. py:method:: emplace_back(thread_pool, config, chunk_stream_config)

Add a new stream.

:param thread_pool: Thread pool handling the I/O
:type thread_pool: :py:class:`spead2.ThreadPool`
:param config: Stream configuration
:type config: :py:class:`spead2.recv.StreamConfig`
:param chunk_stream_config: Chunking configuration
:type chunk_stream_config: :py:class:`spead2.recv.ChunkStreamConfig`
:rtype: :py:class:`spead2.recv.ChunkStreamGroupMember`

.. py:class:: spead2.recv.ChunkStreamGroupMember

A component stream in a :py:class:`~spead2.recv.ChunkStreamRingGroup`.
This class cannot be instantiated directly. Use
:py:meth:`.ChunkStreamRingGroup.emplace_back` instead.

It provides the same methods for adding readers as
:py:class:`spead2.recv.Stream`.
16 changes: 8 additions & 8 deletions doc/py-recv-chunk.rst
@@ -225,18 +225,18 @@ Reference

.. py:attribute:: data_ringbuffer

The data ringbuffer given to the constructor.

.. py:attribute:: free_ringbuffer

The free ringbuffer given to the constructor.

.. py:method:: add_free_chunk(chunk)

Add a chunk to the free ringbuffer. This takes care of zeroing out the
:py:attr:`.Chunk.present` array, and it will suppress the
:exc:`spead2.Stopped` exception if the free ringbuffer has been stopped.

If the free ring is full, it will raise :exc:`spead2.Full` rather than
blocking. The free ringbuffer should be constructed with enough slots that
this does not happen.
1 change: 1 addition & 0 deletions doc/py.rst
@@ -21,3 +21,4 @@ with the C++ backend.
py-logging
py-ibverbs
py-recv-chunk
py-recv-chunk-group
82 changes: 82 additions & 0 deletions doc/recv-chunk-group.rst
@@ -0,0 +1,82 @@
Chunking stream groups
======================
.. cpp:namespace-push:: spead2::recv

While the :doc:`recv-chunk` allows for high-bandwidth streams to be received
with low overhead, it still has a fundamental scaling limitation: each chunk
can only be constructed from a single thread. :dfn:`Chunk stream groups` allow
this limitation to be overcome, although not without caveats.

Each stream is still limited to a single thread. However, a :dfn:`group` of
streams can share the same sequence of chunks, with each stream contributing
a subset of the data in each chunk. Making use of this feature requires
that load balancing is implemented at the network level, using different
destination addresses or ports so that the incoming heaps can be multiplexed
into multiple streams.
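
To make the division of labour concrete, here is a small Python sketch under assumed parameters: heaps of a fixed size are numbered consecutively, the sender chooses the destination port from the heap index modulo the number of streams, and each heap has a fixed position within its chunk. None of the constants or function names come from spead2; they are hypothetical.

```python
HEAP_SIZE = 1024      # bytes per heap (assumed)
HEAPS_PER_CHUNK = 8   # assumed
N_STREAMS = 4         # assumed


def route(heap_index):
    """Member stream that receives this heap (the sender's choice of port)."""
    return heap_index % N_STREAMS


def place(heap_index):
    """Return (chunk_id, byte_offset) for a heap."""
    chunk_id, heap_in_chunk = divmod(heap_index, HEAPS_PER_CHUNK)
    return chunk_id, heap_in_chunk * HEAP_SIZE


def offsets_by_stream(chunk_id):
    """Byte offsets within one chunk that each stream is responsible for."""
    result = {stream: [] for stream in range(N_STREAMS)}
    first = chunk_id * HEAPS_PER_CHUNK
    for heap_index in range(first, first + HEAPS_PER_CHUNK):
        result[route(heap_index)].append(place(heap_index)[1])
    return result
```

With this scheme every stream fills a disjoint subset of each chunk, so the streams never write to the same bytes.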

As with a single chunk stream, the group keeps a sliding window of chunks and
obtains new ones from an allocation callback. When the window slides forward,
chunks that fall out the back of the window are provided to a ready callback.
Each member stream also has its own sliding window, which can be smaller (but not
larger) than the group's window. When the group's window slides forward, the
streams' windows are adjusted to ensure they still fit within the group's
window. In other words, a stream's window determines how much reordering is
tolerated within a stream, while the group's window determines how out-of-sync
the streams are allowed to become.
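
The relationship between the two kinds of window can be modelled with a short Python sketch. It is a simplification that tracks chunk IDs only; the class ``Window`` and its methods are hypothetical and are not part of the spead2 API.

```python
class Window:
    """Half-open range [head, tail) of chunk IDs."""

    def __init__(self, max_chunks):
        self.max_chunks = max_chunks
        self.head = 0
        self.tail = 0

    def slide_to_include(self, chunk_id):
        """Advance so that chunk_id is inside; return the IDs that fell out."""
        if chunk_id < self.tail:
            return []
        new_tail = chunk_id + 1
        new_head = max(self.head, new_tail - self.max_chunks)
        # Only chunks that were actually in the old window can fall out.
        evicted = list(range(self.head, min(new_head, self.tail)))
        self.head, self.tail = new_head, new_tail
        return evicted

    def clamp_head(self, group_head):
        """Shrink from the back so that the window fits inside the group's."""
        self.head = max(self.head, group_head)
        self.tail = max(self.tail, self.head)


group = Window(max_chunks=4)
stream_a = Window(max_chunks=2)
stream_b = Window(max_chunks=2)

# Both windows initially cover chunks 0 and 1 via stream A.
stream_a.slide_to_include(1)
group.slide_to_include(1)

# Stream B runs ahead to chunk 5, dragging the group's window forward.
stream_b.slide_to_include(5)
ready = group.slide_to_include(5)

# Stream A must be adjusted so that it still fits inside the group's window.
stream_a.clamp_head(group.head)
```

In this run chunks 0 and 1 fall out of the group's window, and stream A is left with an empty window starting at chunk 2 until it receives more data.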

When desynchronisation does occur, there is a choice of strategies. The default
strategy is eager but potentially lossy: when the group's window moves forward,
the trailing chunk is marked ready as soon as possible, even if this causes
some stream windows to shrink below their normal size. An alternative strategy
is lossless: when the group's window needs to move forward, it is blocked
until all the member streams have caught up. This latter mode is intended for
use with lossless transports such as TCP. However, if one of the component streams
stops functioning (for example, because it is routed on a network path that is
down) it prevents the entire group from making forward progress.

The general flow (in C++) is:

1. Create a :cpp:class:`chunk_stream_group_config`.
2. Create a :cpp:class:`chunk_stream_group`.
3. Use :cpp:func:`chunk_stream_group::emplace_back` to
create the streams.
4. Add readers to the streams.
5. Process the data.
6. Optionally, call :cpp:func:`chunk_stream_group::stop()`
(otherwise it will be called on destruction).
7. Destroy the group.

In Python the process is similar, although garbage collection replaces
explicit destruction.

Ringbuffer convenience API
--------------------------
As for standalone chunk streams, there is a simplified API using ringbuffers,
which is also the only API available for Python. A
:cpp:class:`chunk_stream_ring_group` is a group that allocates
data from one ringbuffer and sends ready data to another. The description of
:ref:`that API <recv-chunk-ringbuffer>` largely applies here too. The
ringbuffers can be shared between groups.

Caveats
-------
This is an advanced API that sacrifices some user-friendliness for
performance, and thus some care is needed to use it safely.

- It is vital that all the streams can make forward progress independently,
as otherwise deadlocks can occur. For example, if they share a thread pool,
the pool must have at least as many threads as streams. It's recommended
that each stream has its own single-threaded thread pool.
- The streams must all be added to the group before adding any readers to
the streams. Once a group has received some data, an exception will be thrown
if one attempts to add a new stream.
- The stream ID associated with each chunk will be the stream ID of one of the
component streams, but it is undefined which one.
- When the allocate and ready callbacks are invoked, it's not specified which
stream's batch statistics pointer will be passed.
- Two streams must not write to the same bytes of a chunk (in the payload,
present array or extra data), as this is undefined behaviour in C++.
- Calling :cpp:func:`~stream::stop` on a member stream will stop the whole
group.

.. cpp:namespace-pop::