Add chunk stream groups #219

Merged: 74 commits merged into master, Jul 9, 2023

Conversation

bmerry
Copy link
Contributor

@bmerry bmerry commented Jul 5, 2023

This allows multiple streams to work on assembling a shared chunk, giving better scalability provided the incoming data can be distributed across streams. A good starting point for review is doc/recv-chunk-group.rst.

This is quite a large PR, because it also incorporates a change in the way streams are shut down.

I'm not totally happy with the amount of testing, but I'm starting the ball rolling, and I can always add more tests before release.

bmerry added 30 commits June 21, 2023 15:03
It wanted setup renamed to setup_method and teardown to teardown_method.
The logic around acquiring and readying chunks is now handed off to a
template parameter helper class. This will simplify plugging in a policy
for stream groups later.
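For context, a minimal sketch of the policy-class idea described above; all names here are invented for illustration and are not the actual spead2 classes:

```cpp
#include <cstdint>
#include <utility>

/* Hypothetical sketch: the stream state is parametrised on a small helper
 * class that decides how chunks are acquired and readied, so a group-aware
 * policy can later be plugged in without changing the stream logic itself.
 */
struct chunk
{
    std::int64_t chunk_id = -1;
    // ... payload pointers etc. ...
};

template<typename ChunkPolicy>
class chunk_stream_state
{
public:
    explicit chunk_stream_state(ChunkPolicy policy) : policy(std::move(policy)) {}

    void on_new_chunk_needed(std::int64_t chunk_id)
    {
        // Single-stream policy: allocate directly.
        // Group policy: coordinate with the other streams in the group.
        chunk *c = policy.acquire(chunk_id);
        // ... packets are scattered into *c here ...
        policy.ready(c);   // hand the completed chunk back
    }

private:
    ChunkPolicy policy;
};
```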
This will simplify reusing some logic for chunk_stream_group.
It still needs some mechanisms to unblock streams that aren't receiving
data, and it's failing unit tests.
This is an attempt to simplify the reader shutdown path. Instead of
stream::stop blocking until the readers signal that their completion
handlers have all run, try to make it safe for the handlers to run after
the stream has shut down.

To do this, the queue_mutex is moved inside an object managed by
shared_ptr, so that the handlers can keep it alive even after the stream
is destroyed. This does end up requiring creation and destruction of a
shared_ptr for every handler invocation, which isn't ideal; perhaps
there is some way to transfer it down the handler chain (custom
executor?).

This also means that readers no longer need a specific `stop` member
function; they just stop things in their destructors, generally
automatically.
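A minimal sketch of the shared_ptr-based shutdown described above, assuming a much-simplified `shared_state` and handler; the real spead2 classes are more involved:

```cpp
#include <memory>
#include <mutex>

// Sketch only: the mutex lives in a shared_ptr-managed object, so a
// completion handler can lock it safely even if the owning stream has
// already been destroyed.
struct shared_state
{
    std::mutex queue_mutex;
    bool stopped = false;   // set when the stream shuts down
};

class stream
{
public:
    stream() : state(std::make_shared<shared_state>()) {}

    ~stream()
    {
        std::lock_guard<std::mutex> lock(state->queue_mutex);
        state->stopped = true;   // handlers that run later will see this
    }

    // A reader's completion handler captures the shared_ptr, not `this`.
    auto make_handler()
    {
        return [state = state]()
        {
            std::lock_guard<std::mutex> lock(state->queue_mutex);
            if (state->stopped)
                return;          // stream already shut down; do nothing
            // ... process the packet against the live stream ...
        };
    }

private:
    std::shared_ptr<shared_state> state;
};
```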
Readers typically have a chain of completion handlers, each of which
schedules the next. Previously, each completion handler would create a
new copy of the std::shared_ptr<stream_base::shared_state>, which would
cause atomic refcount twiddling potentially for every packet.

Instead, encapsulate the shared_ptr in an opaque `handler_context`
structure which gets passed along the chain. The `handler_context` is
marked as non-copyable, to ensure that we don't accidentally inc and dec
the ref-count unnecessarily. This revealed that the legacy
`io_context::post` function doesn't support non-copyable completion
handlers, so we had to use `boost::asio::post` instead. That means
Boost 1.66 will be the minimum required version.
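A rough sketch of the move-only context idea; `handler_context` is named in the commit message, but the member layout and helper functions below are assumptions for illustration:

```cpp
#include <boost/asio.hpp>
#include <memory>
#include <mutex>
#include <utility>

struct shared_state
{
    std::mutex queue_mutex;   // protects the stream's queues (see earlier commit)
};

/* Sketch: a move-only wrapper around the shared_ptr, handed from one
 * completion handler to the next, so the reference count is adjusted once
 * per handler chain rather than once per packet.
 */
class handler_context
{
public:
    explicit handler_context(std::shared_ptr<shared_state> state)
        : state(std::move(state)) {}

    handler_context(handler_context &&) = default;
    handler_context &operator=(handler_context &&) = default;
    handler_context(const handler_context &) = delete;
    handler_context &operator=(const handler_context &) = delete;

    shared_state &operator*() const { return *state; }

private:
    std::shared_ptr<shared_state> state;
};

void handle_packet(handler_context ctx)
{
    std::lock_guard<std::mutex> lock((*ctx).queue_mutex);
    // ... process one packet, then schedule the next read, moving ctx along ...
}

void schedule(boost::asio::io_context &io, handler_context ctx)
{
    // boost::asio::post (unlike the deprecated io_context::post) accepts
    // move-only handlers, so the context can be moved rather than copied.
    boost::asio::post(io, [ctx = std::move(ctx)]() mutable
    {
        handle_packet(std::move(ctx));
    });
}
```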
The linked list pointers are stored inside the chunks themselves. This
allows the graveyard to grow arbitrarily large without incurring any
memory allocations, which could throw during a destructor.
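A minimal sketch of the intrusive-list idea (illustrative only; the field and class names are made up):

```cpp
// Each chunk carries its own "next" pointer, so pushing onto the graveyard
// never allocates and therefore cannot throw from a destructor.
struct chunk
{
    chunk *graveyard_next = nullptr;
    // ... payload, chunk ID, etc. ...
};

class graveyard
{
public:
    void push(chunk *c) noexcept   // no allocation, so no exceptions
    {
        c->graveyard_next = head;
        head = c;
    }

    chunk *pop() noexcept
    {
        chunk *c = head;
        if (c != nullptr)
            head = c->graveyard_next;
        return c;
    }

private:
    chunk *head = nullptr;
};
```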
It's all untested beyond the fact that it compiles.

chunk_ring_pair was split out of chunk_ring_stream to reduce
duplication.
Move from `streams` instead of copying.
clang was (quite rightly I believe) complaining about the derived
class implementation chaining to the base class when the base class
methods were private.
This already exposed a bunch of compilation issues that appear just
because templates are now being instantiated. chunk_ring_stream is
extended to take on the responsibility for making the chunk allocate and
ready functions, simplifying adjust_chunk_config and
adjust_group_config.

The Python wiring is incomplete, as it does not include
chunk_stream_group_config.
This affected the chunk_stream_group feature.
release_chunk_unlocked was nevertheless locking, leading to a
self-deadlock.
add_free_chunk was moved to a chunk_ring_pair base class, but the
signature for the lambda was not updated properly.
Now there is one true window, and when something falls off the back we
wait for all the streams to get there, using a condition variable (and
dropping the group mutex) to avoid deadlocks.
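An illustrative sketch of the wait-on-condition-variable scheme; the names and the single aggregated `min_stream_head` below are stand-ins for the real per-stream bookkeeping:

```cpp
#include <condition_variable>
#include <cstdint>
#include <mutex>

// When the shared window needs to evict its oldest chunk, wait until every
// stream's head has moved past it, releasing the group mutex while blocked.
class group_window
{
public:
    // Called with `mutex` NOT held by the caller.
    void flush_oldest(std::uint64_t oldest_chunk_id)
    {
        std::unique_lock<std::mutex> lock(mutex);
        // wait() releases `mutex` while blocked, so the streams can still
        // make progress and eventually notify us; this avoids the deadlock.
        ready.wait(lock, [&] { return min_stream_head > oldest_chunk_id; });
        // ... safe to hand the chunk back to the user here ...
    }

    // Called when a stream's head chunk advances.
    void stream_head_updated(std::uint64_t new_min_head)
    {
        {
            std::lock_guard<std::mutex> lock(mutex);
            min_stream_head = new_min_head;
        }
        ready.notify_all();
    }

private:
    std::mutex mutex;
    std::condition_variable ready;
    std::uint64_t min_stream_head = 0;
};
```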
This would potentially lead to deadlocks.
When a stream flushes a chunk, also flush from the group as far as
possible.
This simplifies some code.

Also removed chunk_stream_state::flush_head since it wasn't necessary.
It could call flush_heap on an empty chunk_window, which is illegal.
bmerry added 22 commits June 29, 2023 14:40
Also add the corresponding getter to the C++ API.

For consistency, I also renamed the `group_config` constructor argument
to `config` in Python.
Reference counts on chunks led to some corner cases that were probably
buggy and definitely hard to reason about, involving either streams with
empty windows (where the head = tail value still impacted which streams
could be allocated in future, but no references were held) or null
chunks.

Instead, have the group keep track of the head_chunk of each stream, so
that it can determine when no stream can *ever* request the chunk we're
about to flush (important in lossless eviction mode).
Instead, re-instate proactive flushing in stream_head_updated, which
automatically handles the problem that live_streams was meant to solve.
There are a few changes here:
- Stopping any stream stops the whole group
- Stopping the group in lossless eviction mode uses async_flush_until to
  ensure that all streams flush their data and hence get unblocked.
It was always the same as chunks.get_head_chunk(), so I just made that
explicit.

Also some documentation tidying.
It was partially implemented, but now it's fully implemented and
registered as a virtual subclass of collections.abc.Sequence.

As part of this, the C++ implementation of __iter__ was removed, in
favour of the mixin version from collections.abc. The latter is more
robust as it won't access undefined memory if the sequence is mutated
while iterating.
Ensures that ChunkStreamRingGroup.__getitem__(slice) returns streams
that keep the group alive.
When fast-forwarding the window, get_chunk was not correctly calling
head_updated to reflect the fast-forward.
A chunk ID close to 2^63 could lead to overflow bugs. Fix it by using
unsigned chunk IDs when dealing with head and tail chunk IDs. This does
require some more careful handling for determining whether a chunk ID is
behind the head, since it's a comparison of a signed and an unsigned
value.
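A sketch of the kind of signed/unsigned comparison this requires (a hypothetical helper, not the actual spead2 function): chunk IDs arriving from heaps are signed, while the head is tracked as unsigned so it can advance as far as UINT64_MAX.

```cpp
#include <cstdint>

// A negative chunk ID is always behind the head; otherwise it is safe to
// widen it to unsigned and compare directly, with no risk of overflow.
inline bool is_behind_head(std::int64_t chunk_id, std::uint64_t head_chunk)
{
    if (chunk_id < 0)
        return true;
    return static_cast<std::uint64_t>(chunk_id) < head_chunk;
}
```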
Apart from fixing a spurious flake8 warning, this strengthens the test.
Classic mistake of initialising the lock_guard as a temporary (which
immediately evaporates) instead of as a variable.
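For readers unfamiliar with this bug class, an illustrative example (not the actual spead2 code):

```cpp
#include <mutex>

class stream_group
{
public:
    void broken()
    {
        // Bug: this creates a temporary lock_guard that is destroyed at the
        // end of the statement, so the mutex is released immediately.
        std::lock_guard<std::mutex>(this->mutex);
        // ... "protected" work actually runs without the lock ...
    }

    void fixed()
    {
        // Fix: name the guard so it lives until the end of the scope.
        std::lock_guard<std::mutex> lock(this->mutex);
        // ... work is now genuinely protected ...
    }

private:
    std::mutex mutex;
};
```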
It's still there, but a no-op, since
chunk_stream_group_member::stop_received takes care of advancing the
chunk id window to UINT64_MAX.

Also removed stream_head_updated_unlocked since it didn't need to exist
as a separate function (folded into single caller).
It was failing because the indexing wasn't accounting for the missing
chunks.
:cpp:class:`spead2::recv::chunk_stream` would modify the config. Passing
the same config to construct two chunk streams would fail with an error.
- Fix the type annotation for the :py:class:`~.ChunkRingStream` constructor:
the parameter name for `chunk_stream_config` was incorrect.
@bmerry (Contributor Author) commented:

I guess I'll need to update the changelog for chunk stream groups and the new shutdown method.

Sequence considers __getitem__ and __len__ to be abstract, so they need
to be repeated as concrete in the subclass.
@james-smith-za (Contributor) left a comment:

In all honesty I don't have a lot to contribute here apart from a little spelling / punctuation bike-shedding.

I am not familiar enough (or at all, really) with the internals of spead2 to be able to comment on the nitty-gritty.
What I will say, though, is that the user-level documentation was pretty clear, and the examples were easy enough to follow. My C++ is good enough to follow that one at least.

Comment on lines +46 to +50
/* The definition order is important here: the buffer must outlive the peer
* socket, so that the destructor cancels an asynchronous buffer read
* before the buffer is destroyed.
*
* Similarly, the accepter must be destroyed before the peer.
@james-smith-za (Contributor) commented:

Interesting. I wouldn't have thought that the order made a difference.
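For context, C++ destroys non-static data members in reverse order of declaration, which is what the comment above relies on. An illustrative sketch (not the actual test code):

```cpp
#include <array>
#include <boost/asio.hpp>

// Members are destroyed in reverse declaration order. Declaring the buffer
// before the peer socket guarantees that the socket's destructor (which
// cancels any outstanding asynchronous read into the buffer) runs before the
// buffer itself is destroyed.
struct fixture
{
    boost::asio::io_context io;                 // destroyed last
    std::array<unsigned char, 65536> buffer;    // destroyed second
    boost::asio::ip::tcp::socket peer{io};      // destroyed first
};
```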

Two further review threads on doc/recv-chunk-group.rst were marked as outdated and resolved.
bmerry and others added 2 commits July 9, 2023 11:31
Co-authored-by: James Smith <james-smith-za@users.noreply.github.com>
@bmerry bmerry merged commit f31c615 into master Jul 9, 2023
16 checks passed
@bmerry bmerry deleted the chunk_stream_group branch July 9, 2023 09:33