chunk_stream_group: remove refcounts
Reference counts on chunks led to some corner cases that were probably
buggy and definitely hard to reason about, involving either streams with
empty windows (where the head = tail value still affected which chunks
could be allocated in future, even though no references were held) or null
chunks.

Instead, have the group keep track of the head_chunk of each stream, so
that it can determine when no stream can *ever* request the chunk we're
about to flush (important in lossless eviction mode).
bmerry committed Jun 30, 2023
1 parent 015db87 commit 64ba802
Showing 6 changed files with 212 additions and 122 deletions.
20 changes: 10 additions & 10 deletions doc/dev-recv-chunk-group.rst
@@ -18,16 +18,16 @@ to the chunk at the time. Additionally, it might not be possible to allocate a
new chunk until an old chunk is flushed e.g., if there is a fixed pool of
chunks rather than dynamic allocation.

Each chunk has a reference count, indicating the number of streams that still
have the chunk in their window. This reference count is non-atomic since it is
protected by the group's mutex. When the group wishes to evict a chunk, it
first needs to wait for the reference count of the head chunk to drop to zero.
It needs a way to be notified that it should try again, which is provided by a
condition variable. Using a condition variable (rather than, say, replacing
the simple reference count with a semaphore) allows the group mutex to be
dropped while waiting, which prevents the deadlocks that might otherwise occur
if the mutex was held while waiting and another stream was attempting to lock
the group mutex to make forward progress.
The group keeps its own copy of the head pointers (oldest heap) from the
individual streams, protected by the group mutex rather than the stream
mutexes. This allows the group to track the oldest chunk that any stream owns
or may potentially own in future (``min_head_chunk``). When the group wishes to
evict a chunk, it first needs to wait for ``min_head_chunk`` to become greater
than the ID of the chunk to be evicted. The wait is achieved using a condition
variable that is notified whenever ``min_head_chunk`` increases. This allows
the group mutex to be dropped while waiting, which prevents the deadlocks that
might otherwise occur if the mutex was held while waiting and another stream
was attempting to lock the group mutex to make forward progress.

In lossless eviction mode, this is all that is needed, although it is
non-trivial to see that this won't deadlock with all the streams sitting in
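
The waiting protocol described in the new paragraph above can be illustrated with a small self-contained sketch. This is not spead2's implementation (the class and member names are invented, and the real group also deals with stream shutdown and lossy eviction); it only shows the min_head_chunk / condition-variable interaction:

    #include <algorithm>
    #include <condition_variable>
    #include <cstddef>
    #include <cstdint>
    #include <mutex>
    #include <vector>

    // Sketch: the group mirrors each stream's head chunk ID; a thread that wants
    // to evict a chunk blocks until every stream has moved past it.
    class group_head_tracker
    {
    private:
        std::mutex mutex;
        std::condition_variable head_updated_cv;
        std::vector<std::int64_t> head_chunks;   // one entry per stream
        std::int64_t min_head_chunk = 0;

        // Caller must hold the mutex; assumes at least one stream.
        void recompute_min()
        {
            min_head_chunk = head_chunks[0];
            for (std::int64_t h : head_chunks)
                min_head_chunk = std::min(min_head_chunk, h);
        }

    public:
        explicit group_head_tracker(std::size_t n_streams) : head_chunks(n_streams, 0) {}

        // Called by stream `index` whenever the head of its window advances.
        void stream_head_updated(std::size_t index, std::int64_t head_chunk)
        {
            std::lock_guard<std::mutex> lock(mutex);
            head_chunks[index] = head_chunk;
            std::int64_t old_min = min_head_chunk;
            recompute_min();
            if (min_head_chunk > old_min)
                head_updated_cv.notify_all();   // wake any thread waiting to evict
        }

        // Block until no stream can still request `chunk_id`, i.e. until
        // min_head_chunk has moved past it. The mutex is released while
        // waiting, so the streams can keep reporting head updates.
        void wait_for_eviction(std::int64_t chunk_id)
        {
            std::unique_lock<std::mutex> lock(mutex);
            head_updated_cv.wait(lock, [&] { return min_head_chunk > chunk_id; });
        }
    };
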
6 changes: 3 additions & 3 deletions doc/recv-chunk-group.rst
@@ -66,9 +66,9 @@ performance, and thus some care is needed to use it safely.
as otherwise deadlocks can occur. For example, if they share a thread pool,
the pool must have at least as many threads as streams. It's recommended
that each stream has its own single-threaded thread pool.
- The streams should all be added to the group before adding any readers to
the streams. Things will probably work even if this is not done, but the
design is sufficiently complicated that it is not advisable.
- The streams must all be added to the group before adding any readers to
the streams (see the sketch below). Once the group has received some data,
an exception will be thrown if one attempts to add a new stream.
- The stream ID associated with each chunk will be the stream ID of one of the
component streams, but it is undefined which one.
- When the allocate and ready callbacks are invoked, it's not specified which
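
For reference, the ordering rules above look roughly like this in user code. This is a hedged sketch: the emplace_back overload, reader type and endpoint arguments are assumptions based on spead2's public API rather than something shown in this commit.

    #include <boost/asio.hpp>
    #include <spead2/common_thread_pool.h>
    #include <spead2/recv_chunk_stream_group.h>
    #include <spead2/recv_udp.h>

    // Hypothetical helper: the caller supplies one single-threaded thread pool
    // per stream, as recommended above.
    void add_streams_then_readers(
        spead2::recv::chunk_stream_group &group,
        spead2::thread_pool &tp0, spead2::thread_pool &tp1,
        const spead2::recv::stream_config &config,
        const spead2::recv::chunk_stream_config &chunk_config)
    {
        // First add all the streams to the group...
        auto &s0 = group.emplace_back(tp0, config, chunk_config);
        auto &s1 = group.emplace_back(tp1, config, chunk_config);

        // ...and only then attach readers, so that no data can arrive before
        // the group is fully populated.
        using boost::asio::ip::udp;
        s0.emplace_reader<spead2::recv::udp_reader>(udp::endpoint(udp::v4(), 8888));
        s1.emplace_reader<spead2::recv::udp_reader>(udp::endpoint(udp::v4(), 8889));
    }
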
68 changes: 45 additions & 23 deletions include/spead2/recv_chunk_stream.h
@@ -54,14 +54,6 @@ class chunk
friend class chunk_stream_group;
template<typename DataRingbuffer, typename FreeRingbuffer> friend class detail::chunk_ring_pair;
private:
/**
* Reference count for chunks belonging to stream groups.
*
* This must only be manipulated from a single thread at a time e.g.
* with the group's mutex locked.
*/
std::size_t ref_count = 0;

/// Linked list of chunks to dispose of at shutdown
std::unique_ptr<chunk> graveyard_next;

@@ -247,14 +239,38 @@ class chunk_window
head_pos = 0; // wrap around the circular buffer
}

/// Send the oldest chunk to the ready callback, then report the new head via @a head_updated
template<typename F1, typename F2>
void flush_head(const F1 &ready_chunk, const F2 &head_updated)
{
flush_head(ready_chunk);
head_updated(head_chunk);
}

/// Send all the chunks to the ready callback, then report the new head via @a head_updated (if the window was non-empty)
template<typename F1, typename F2>
void flush_all(const F1 &ready_chunk, const F2 &head_updated)
{
if (!empty())
{
while (!empty())
flush_head(ready_chunk);
head_updated(head_chunk);
}
}

/// Flush until the head is at least @a target
template<typename F>
void flush_until(std::int64_t target, const F &ready_chunk)
template<typename F1, typename F2>
void flush_until(std::int64_t target, const F1 &ready_chunk, const F2 &head_updated)
{
while (head_chunk != tail_chunk && head_chunk < target)
flush_head(ready_chunk);
if (head_chunk == tail_chunk && head_chunk < target)
head_chunk = tail_chunk = target;
if (head_chunk < target)
{
while (head_chunk != tail_chunk && head_chunk < target)
flush_head(ready_chunk);
if (head_chunk == tail_chunk && head_chunk < target)
head_chunk = tail_chunk = target;
head_updated(target);
}
}

explicit chunk_window(std::size_t max_chunks);
@@ -282,11 +298,14 @@ class chunk_window
* Obtain a pointer to a chunk with ID @a chunk_id.
*
* If @a chunk_id is behind the window, returns nullptr. If it is ahead of
* the window, the window is advanced using @a ready_chunk and @a allocate_chunk.
* the window, the window is advanced using @a allocate_chunk and
* @a ready_chunk. If the head_chunk is updated, the new value is passed to
* @a head_updated.
*/
template<typename F1, typename F2>
template<typename F1, typename F2, typename F3>
chunk *get_chunk(
std::int64_t chunk_id, std::uintptr_t stream_id, const F1 &allocate_chunk, const F2 &ready_chunk)
std::int64_t chunk_id, std::uintptr_t stream_id,
const F1 &allocate_chunk, const F2 &ready_chunk, const F3 &head_updated)
{
const std::size_t max_chunks = chunks.size();
if (chunk_id >= head_chunk)
@@ -300,14 +319,13 @@
* We leave it to the while loop below to actually allocate
* the chunks.
*/
while (!empty())
flush_head(ready_chunk);
flush_all(ready_chunk, head_updated);
head_chunk = tail_chunk = chunk_id - (max_chunks - 1);
}
while (chunk_id >= tail_chunk)
{
if (std::size_t(tail_chunk - head_chunk) == max_chunks)
flush_head(ready_chunk);
flush_head(ready_chunk, head_updated);
chunks[tail_pos] = allocate_chunk(tail_chunk);
if (chunks[tail_pos])
{
@@ -459,6 +477,7 @@ class chunk_manager_simple
std::uint64_t *get_batch_stats(chunk_stream_state<chunk_manager_simple> &state) const;
chunk *allocate_chunk(chunk_stream_state<chunk_manager_simple> &state, std::int64_t chunk_id);
void ready_chunk(chunk_stream_state<chunk_manager_simple> &state, chunk *c);
void head_updated(chunk_stream_state<chunk_manager_simple> &state, std::int64_t head_chunk) {}
};

/**
@@ -682,8 +701,10 @@ stream_config chunk_stream_state<CM>::adjust_config(const stream_config &config)
template<typename CM>
void chunk_stream_state<CM>::flush_chunks()
{
while (!chunks.empty())
chunks.flush_head([this](chunk *c) { chunk_manager.ready_chunk(*this, c); });
chunks.flush_all(
[this](chunk *c) { chunk_manager.ready_chunk(*this, c); },
[this](std::int64_t head_chunk) { chunk_manager.head_updated(*this, head_chunk); }
);
}

template<typename CM>
@@ -754,7 +775,8 @@ chunk_stream_state<CM>::allocate(std::size_t size, const packet_header &packet)
chunk_id,
stream_id,
[this](std::int64_t chunk_id) { return chunk_manager.allocate_chunk(*this, chunk_id); },
[this](chunk *c) { chunk_manager.ready_chunk(*this, c); }
[this](chunk *c) { chunk_manager.ready_chunk(*this, c); },
[this](std::int64_t head_chunk) { chunk_manager.head_updated(*this, head_chunk); }
);
if (chunk_ptr)
{
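
The refactored flush helpers above share one contract: each operation passes the affected chunks to ready_chunk and then reports the new head exactly once via head_updated. A toy standalone model of flush_until (chunk IDs stand in for chunk pointers; this is not the real chunk_window) makes that contract easy to see:

    #include <cstdint>

    // Toy sliding window over consecutive chunk IDs [head, tail).
    struct toy_window
    {
        std::int64_t head = 0, tail = 0;

        template<typename F1, typename F2>
        void flush_until(std::int64_t target, const F1 &ready_chunk, const F2 &head_updated)
        {
            if (head >= target)
                return;                    // nothing to do, and no notification
            while (head != tail && head < target)
                ready_chunk(head++);       // flush the oldest live chunk
            if (head == tail && head < target)
                head = tail = target;      // window emptied: jump straight to target
            head_updated(target);          // head == target in both branches
        }
    };

Because flush_all and flush_until call head_updated at most once, with the final head value, a group manager only has to update its copy of the head (and hence min_head_chunk) once per flush rather than once per chunk.
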
50 changes: 35 additions & 15 deletions include/spead2/recv_chunk_stream_group.h
@@ -27,6 +27,7 @@
#include <condition_variable>
#include <mutex>
#include <memory>
#include <stdexcept>
#include <boost/iterator/transform_iterator.hpp>
#include <spead2/recv_stream.h>
#include <spead2/recv_chunk_stream.h>
@@ -103,7 +104,8 @@ class chunk_manager_group

std::uint64_t *get_batch_stats(chunk_stream_state<chunk_manager_group> &state) const;
chunk *allocate_chunk(chunk_stream_state<chunk_manager_group> &state, std::int64_t chunk_id);
void ready_chunk(chunk_stream_state<chunk_manager_group> &state, chunk *c);
void ready_chunk(chunk_stream_state<chunk_manager_group> &state, chunk *c) {}
void head_updated(chunk_stream_state<chunk_manager_group> &state, std::int64_t head_chunk);
};

} // namespace detail
@@ -152,6 +154,15 @@ class chunk_stream_group
*/
std::vector<std::unique_ptr<chunk_stream_group_member>> streams;

/**
* Copy of the head chunk ID from each stream. This copy is protected by
* the group's mutex rather than the streams'.
*/
std::vector<std::int64_t> head_chunks;

/// Minimum element of head_chunks
std::int64_t min_head_chunk = 0;

/**
* Last value passed to all streams' async_flush_until.
*/
@@ -172,18 +183,22 @@
chunk *get_chunk(std::int64_t chunk_id, std::uintptr_t stream_id, std::uint64_t *batch_stats);

/**
* Decrement chunk reference count.
*
* If the reference count reaches zero, the chunk is valid to pass to
* the ready callback.
*
* This function is thread-safe.
* Update the head_chunk copy for a stream. This version assumes the caller holds
* the mutex, and is only used internally.
*/
void release_chunk(chunk *c, std::uint64_t *batch_stats);
void stream_head_updated_unlocked(chunk_stream_group_member &s, std::int64_t head_chunk);

/**
* Called by a stream to report movement in its head pointer. This function
* takes the group mutex.
*/
void stream_head_updated(chunk_stream_group_member &s, std::int64_t head_chunk);

/**
* Pass a chunk to the user-provided ready function. The caller is
* responsible for ensuring that c->ref_count is zero.
* responsible for ensuring that the chunk is no longer in use.
*
* The caller must hold the group mutex.
*/
void ready_chunk(chunk *c, std::uint64_t *batch_stats);

@@ -295,6 +310,7 @@ class chunk_stream_group_member : private detail::chunk_stream_state<detail::chu

private:
chunk_stream_group &group; // TODO: redundant - also stored inside the manager
const std::size_t group_index; ///< Position of the stream within the group

virtual void heap_ready(live_heap &&) override;

@@ -318,6 +334,7 @@ class chunk_stream_group_member : private detail::chunk_stream_state<detail::chu
* ignored, and the group's callbacks are used instead.
*
* @param group Group to which this stream belongs
* @param group_index Position of this stream within the group
* @param io_service I/O service (also used by the readers).
* @param config Basic stream configuration
* @param chunk_config Configuration for chunking
@@ -327,6 +344,7 @@
*/
chunk_stream_group_member(
chunk_stream_group &group,
std::size_t group_index,
io_service_ref io_service,
const stream_config &config,
const chunk_stream_config &chunk_config);
@@ -390,15 +408,17 @@ template<typename T, typename... Args>
T &chunk_stream_group::emplace_back(Args&&... args)
{
std::lock_guard<std::mutex> lock(mutex);
std::unique_ptr<chunk_stream_group_member> stream(new T(*this, std::forward<Args>(args)...));
if (chunks.get_tail_chunk() != 0 || last_flush_until != 0)
{
throw std::runtime_error("Cannot add a stream after group has started receiving data");
}
std::unique_ptr<chunk_stream_group_member> stream(new T(
*this, streams.size(), std::forward<Args>(args)...));
chunk_stream_group_member &ret = *stream;
streams.push_back(std::move(stream));
head_chunks.push_back(0);
min_head_chunk = 0; // shouldn't be necessary, but just in case
live_streams++;
if (config.get_eviction_mode() == chunk_stream_group_config::eviction_mode::LOSSY
&& last_flush_until > 0)
{
ret.async_flush_until(last_flush_until);
}
stream_added(ret);
return ret;
}
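
The new guard in emplace_back (shown above) means that attempting to add a stream after the group has started receiving data now fails fast. A small, hypothetical illustration, reusing the assumed names from the earlier sketch; the surrounding function and its parameters are invented:

    #include <iostream>
    #include <stdexcept>

    #include <spead2/common_thread_pool.h>
    #include <spead2/recv_chunk_stream_group.h>

    // Hypothetical: assumes `group` already has live readers that have received data.
    void try_add_stream_late(
        spead2::recv::chunk_stream_group &group,
        spead2::thread_pool &tp,
        const spead2::recv::stream_config &config,
        const spead2::recv::chunk_stream_config &chunk_config)
    {
        try
        {
            group.emplace_back(tp, config, chunk_config);
        }
        catch (const std::runtime_error &e)
        {
            // New behaviour in this commit: late additions are rejected rather
            // than silently joining a group that is already flushing chunks.
            std::cerr << "cannot add stream: " << e.what() << '\n';
        }
    }
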