Fix error with very large chunk ID
A chunk ID close to 2^63 could lead to signed-overflow bugs. Fix this by
using unsigned values for the head and tail chunk IDs. This does require
more careful handling when determining whether a chunk ID is behind the
head, since that is now a comparison between a signed and an unsigned
value.
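
To make the failure mode concrete, here is a minimal standalone sketch
(illustrative only, not part of the spead2 API): with signed IDs, the old
window test adds the window size to a tail ID near 2^63, which overflows a
signed 64-bit integer (undefined behaviour); with unsigned IDs the
subtraction form is well defined, and a signed ID from the place callback
must be rejected explicitly before any signed/unsigned comparison.

    #include <cassert>
    #include <cstdint>
    #include <limits>

    int main()
    {
        // Old form (signed): chunk_id >= tail_chunk + std::int64_t(max_chunks)
        // overflows once tail_chunk is within max_chunks of 2^63 - 1.
        std::uint64_t tail_chunk =
            std::uint64_t(std::numeric_limits<std::int64_t>::max()) - 1;
        std::uint64_t max_chunks = 4;

        // New form (unsigned): guard the ordering first, then compare the
        // well-defined difference against the window size.
        std::uint64_t chunk_id = tail_chunk + 2;
        bool window_stale = chunk_id >= tail_chunk
            && chunk_id - tail_chunk >= max_chunks;
        assert(!window_stale);  // only 2 past the tail of a 4-chunk window

        // Place callbacks still produce signed IDs, so a staleness test
        // against the unsigned head must reject negatives explicitly:
        // converting -1 to uint64_t would give 2^64 - 1, a huge "valid" ID.
        std::int64_t id = -1;
        std::uint64_t head_chunk = 10;
        bool too_old = id < 0 || std::uint64_t(id) < head_chunk;
        assert(too_old);
    }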
bmerry committed Jul 4, 2023
1 parent c866445 commit f42ed76
Showing 5 changed files with 125 additions and 48 deletions.
48 changes: 29 additions & 19 deletions include/spead2/recv_chunk_stream.h
@@ -214,13 +214,16 @@ namespace detail
 
 /**
  * Sliding window of chunk pointers.
+ *
+ * @internal The chunk IDs are kept as unsigned values, so that the tail can
+ * be larger than any actual chunk ID.
  */
 class chunk_window
 {
 private:
     /// Circular buffer of chunks under construction.
     std::vector<chunk *> chunks;
-    std::int64_t head_chunk = 0, tail_chunk = 0;  ///< chunk IDs of valid chunk range
+    std::uint64_t head_chunk = 0, tail_chunk = 0;  ///< chunk IDs of valid chunk range
     std::size_t head_pos = 0, tail_pos = 0;  ///< Positions corresponding to @ref head and @ref tail in @ref chunks
 
 public:
@@ -253,9 +256,9 @@ class chunk_window
      * the head and tail are both advanced to @a next_chunk.
      */
     template<typename F1, typename F2>
-    void flush_all(std::int64_t next_chunk, const F1 &ready_chunk, const F2 &head_updated)
+    void flush_all(std::uint64_t next_chunk, const F1 &ready_chunk, const F2 &head_updated)
     {
-        std::int64_t orig_head = head_chunk;
+        std::uint64_t orig_head = head_chunk;
         while (!empty())
             flush_head(ready_chunk);
         head_chunk = tail_chunk = next_chunk;
@@ -265,13 +268,13 @@
 
     /// Flush until the head is at least @a target
     template<typename F1, typename F2>
-    void flush_until(std::int64_t target, const F1 &ready_chunk, const F2 &head_updated)
+    void flush_until(std::uint64_t target, const F1 &ready_chunk, const F2 &head_updated)
     {
         if (head_chunk < target)
         {
             while (head_chunk != tail_chunk && head_chunk < target)
                 flush_head(ready_chunk);
-            if (head_chunk == tail_chunk && head_chunk < target)
+            if (head_chunk < target)
                 head_chunk = tail_chunk = target;
             head_updated(target);
         }
@@ -284,7 +287,7 @@
      *
      * If @a chunk_id falls outside the window, returns nullptr.
      */
-    chunk *get_chunk(std::int64_t chunk_id) const
+    chunk *get_chunk(std::uint64_t chunk_id) const
     {
         if (chunk_id >= head_chunk && chunk_id < tail_chunk)
         {
@@ -308,15 +311,17 @@
      */
     template<typename F1, typename F2, typename F3>
     chunk *get_chunk(
-        std::int64_t chunk_id, std::uintptr_t stream_id,
+        std::uint64_t chunk_id, std::uintptr_t stream_id,
         const F1 &allocate_chunk, const F2 &ready_chunk, const F3 &head_updated)
     {
+        // chunk_id must be a valid int64_t
+        assert(chunk_id <= std::uint64_t(std::numeric_limits<std::int64_t>::max()));
         const std::size_t max_chunks = chunks.size();
         if (chunk_id >= head_chunk)
         {
             // We've moved beyond the end of our current window, and need to
             // allocate fresh chunks.
-            if (chunk_id >= tail_chunk + std::int64_t(max_chunks))
+            if (chunk_id >= tail_chunk && chunk_id - tail_chunk >= max_chunks)
             {
                 /* We've jumped ahead so far that the entire current window
                  * is stale. Flush it all and fast-forward to the new window.
@@ -327,7 +332,7 @@
             }
             while (chunk_id >= tail_chunk)
             {
-                if (std::size_t(tail_chunk - head_chunk) == max_chunks)
+                if (tail_chunk - head_chunk == max_chunks)
                     flush_head(ready_chunk, head_updated);
                 chunks[tail_pos] = allocate_chunk(tail_chunk);
                 if (chunks[tail_pos])
@@ -350,8 +355,8 @@
         return nullptr;
     }
 
-    std::int64_t get_head_chunk() const { return head_chunk; }
-    std::int64_t get_tail_chunk() const { return tail_chunk; }
+    std::uint64_t get_head_chunk() const { return head_chunk; }
+    std::uint64_t get_tail_chunk() const { return tail_chunk; }
     bool empty() const { return head_chunk == tail_chunk; }
 };
 
@@ -395,8 +400,13 @@ class chunk_stream_state_base
     void do_heap_ready(live_heap &&lh);
 
 protected:
-    std::int64_t get_head_chunk() const { return chunks.get_head_chunk(); }
-    std::int64_t get_tail_chunk() const { return chunks.get_tail_chunk(); }
+    std::uint64_t get_head_chunk() const { return chunks.get_head_chunk(); }
+    std::uint64_t get_tail_chunk() const { return chunks.get_tail_chunk(); }
+    bool chunk_too_old(std::int64_t chunk_id) const
+    {
+        // Need to check against 0 explicitly to avoid signed/unsigned mixup
+        return chunk_id < 0 || std::uint64_t(chunk_id) < chunks.get_head_chunk();
+    }
 
 public:
     /// Constructor
@@ -480,7 +490,7 @@ class chunk_manager_simple
     std::uint64_t *get_batch_stats(chunk_stream_state<chunk_manager_simple> &state) const;
     chunk *allocate_chunk(chunk_stream_state<chunk_manager_simple> &state, std::int64_t chunk_id);
     void ready_chunk(chunk_stream_state<chunk_manager_simple> &state, chunk *c);
-    void head_updated(chunk_stream_state<chunk_manager_simple> &state, std::int64_t head_chunk) {}
+    void head_updated(chunk_stream_state<chunk_manager_simple> &state, std::uint64_t head_chunk) {}
 };
 
 /**
@@ -705,9 +715,9 @@ template<typename CM>
 void chunk_stream_state<CM>::flush_chunks()
 {
     chunks.flush_all(
-        std::numeric_limits<std::int64_t>::max(),
+        std::numeric_limits<std::uint64_t>::max(),
         [this](chunk *c) { chunk_manager.ready_chunk(*this, c); },
-        [this](std::int64_t head_chunk) { chunk_manager.head_updated(*this, head_chunk); }
+        [this](std::uint64_t head_chunk) { chunk_manager.head_updated(*this, head_chunk); }
     );
 }
 
@@ -763,8 +773,8 @@ chunk_stream_state<CM>::allocate(std::size_t size, const packet_header &packet)
     place_data->extra_offset = 0;
     place_data->extra_size = 0;
     chunk_config.get_place()(place_data, sizeof(*place_data));
-    auto chunk_id = place_data->chunk_id;
-    if (chunk_id < get_head_chunk())
+    std::int64_t chunk_id = place_data->chunk_id;
+    if (chunk_too_old(chunk_id))
     {
         // We don't want this heap.
         metadata.chunk_id = -1;
@@ -780,7 +790,7 @@
         stream_id,
         [this](std::int64_t chunk_id) { return chunk_manager.allocate_chunk(*this, chunk_id); },
         [this](chunk *c) { chunk_manager.ready_chunk(*this, c); },
-        [this](std::int64_t head_chunk) { chunk_manager.head_updated(*this, head_chunk); }
+        [this](std::uint64_t head_chunk) { chunk_manager.head_updated(*this, head_chunk); }
     );
     if (chunk_ptr)
     {
14 changes: 7 additions & 7 deletions include/spead2/recv_chunk_stream_group.h
@@ -105,7 +105,7 @@ class chunk_manager_group
     std::uint64_t *get_batch_stats(chunk_stream_state<chunk_manager_group> &state) const;
     chunk *allocate_chunk(chunk_stream_state<chunk_manager_group> &state, std::int64_t chunk_id);
     void ready_chunk(chunk_stream_state<chunk_manager_group> &state, chunk *c) {}
-    void head_updated(chunk_stream_state<chunk_manager_group> &state, std::int64_t head_chunk);
+    void head_updated(chunk_stream_state<chunk_manager_group> &state, std::uint64_t head_chunk);
 };
 
 } // namespace detail
@@ -161,12 +161,12 @@ class chunk_stream_group
      *
      * The minimum element must always be equal to @c chunks.get_head_chunk().
      */
-    std::vector<std::int64_t> head_chunks;
+    std::vector<std::uint64_t> head_chunks;
 
     /**
     * Last value passed to all streams' async_flush_until.
     */
-    std::int64_t last_flush_until = 0;
+    std::uint64_t last_flush_until = 0;
 
     /**
      * Obtain the chunk with a given ID.
@@ -177,19 +177,19 @@
      *
      * This function is thread-safe.
      */
-    chunk *get_chunk(std::int64_t chunk_id, std::uintptr_t stream_id, std::uint64_t *batch_stats);
+    chunk *get_chunk(std::uint64_t chunk_id, std::uintptr_t stream_id, std::uint64_t *batch_stats);
 
     /**
      * Update the head_chunk copy for a stream. This version assumes the caller takes
      * the mutex, and is only used internally.
      */
-    void stream_head_updated_unlocked(chunk_stream_group_member &s, std::int64_t head_chunk);
+    void stream_head_updated_unlocked(chunk_stream_group_member &s, std::uint64_t head_chunk);
 
     /**
      * Called by a stream to report movement in its head pointer. This function
      * takes the group mutex.
      */
-    void stream_head_updated(chunk_stream_group_member &s, std::int64_t head_chunk);
+    void stream_head_updated(chunk_stream_group_member &s, std::uint64_t head_chunk);
 
     /**
      * Pass a chunk to the user-provided ready function. The caller is
@@ -306,7 +306,7 @@ class chunk_stream_group_member : private detail::chunk_stream_state<detail::chu
      * This function returns immediately, and the work is done later on the
      * io_service. It is safe to call from any thread.
      */
-    void async_flush_until(std::int64_t chunk_id);
+    void async_flush_until(std::uint64_t chunk_id);
 
 protected:
     /**
5 changes: 3 additions & 2 deletions src/recv_chunk_stream.cpp
@@ -176,7 +176,7 @@ void chunk_stream_state_base::packet_memcpy(
     const packet_header &packet) const
 {
     const heap_metadata &metadata = *get_heap_metadata(allocation);
-    if (metadata.chunk_id < get_head_chunk())
+    if (chunk_too_old(metadata.chunk_id))
     {
         // The packet corresponds to a chunk that has already been aged out
         // TODO: increment a counter / log a warning
@@ -201,7 +201,8 @@ void chunk_stream_state_base::do_heap_ready(live_heap &&lh)
     auto metadata = get_heap_metadata(h.get_payload());
     // We need to check the chunk_id because the chunk might have been aged
     // out while the heap was incomplete.
-    if (metadata && metadata->chunk_ptr && metadata->chunk_id >= get_head_chunk()
+    if (metadata && metadata->chunk_ptr
+        && !chunk_too_old(metadata->chunk_id)
         && !get_chunk_config().get_packet_presence_payload_size())
     {
         assert(metadata->heap_index < metadata->chunk_ptr->present_size);
26 changes: 13 additions & 13 deletions src/recv_chunk_stream_group.cpp
@@ -79,7 +79,7 @@ chunk *chunk_manager_group::allocate_chunk(
 }
 
 void chunk_manager_group::head_updated(
-    chunk_stream_state<chunk_manager_group> &state, std::int64_t head_chunk)
+    chunk_stream_state<chunk_manager_group> &state, std::uint64_t head_chunk)
 {
     group.stream_head_updated(static_cast<chunk_stream_group_member &>(state), head_chunk);
 }
@@ -152,7 +152,7 @@ void chunk_stream_group::stop()
          */
         for (const auto &stream : streams)
         {
-            stream->async_flush_until(std::numeric_limits<std::int64_t>::max());
+            stream->async_flush_until(std::numeric_limits<std::uint64_t>::max());
         }
     }
     for (const auto &stream : streams)
@@ -164,10 +164,10 @@ void chunk_stream_group::stream_stop_received(chunk_stream_group_member &s)
     std::lock_guard<std::mutex> lock(mutex);
     // Set the head_chunk to the largest possible value, so that this stream
     // no longer blocks anything.
-    stream_head_updated_unlocked(s, std::numeric_limits<std::int64_t>::max());
+    stream_head_updated_unlocked(s, std::numeric_limits<std::uint64_t>::max());
 }
 
-chunk *chunk_stream_group::get_chunk(std::int64_t chunk_id, std::uintptr_t stream_id, std::uint64_t *batch_stats)
+chunk *chunk_stream_group::get_chunk(std::uint64_t chunk_id, std::uintptr_t stream_id, std::uint64_t *batch_stats)
 {
     std::unique_lock<std::mutex> lock(mutex);
     /* Streams should not be requesting chunks older than their heads, and the group
@@ -184,9 +184,9 @@ chunk *chunk_stream_group::get_chunk(std::int64_t chunk_id, std::uintptr_t strea
      * state after a wait.
      */
     const std::size_t max_chunks = config.get_max_chunks();
-    if (std::uint64_t(chunk_id - chunks.get_head_chunk()) >= max_chunks)
+    if (chunk_id - chunks.get_head_chunk() >= max_chunks)
     {
-        std::int64_t target = chunk_id - max_chunks + 1;  // first chunk we don't need to flush
+        std::uint64_t target = chunk_id - (max_chunks - 1);  // first chunk we don't need to flush
         if (config.get_eviction_mode() == chunk_stream_group_config::eviction_mode::LOSSY
             && target > last_flush_until)
         {
@@ -210,7 +210,7 @@ chunk *chunk_stream_group::get_chunk(std::int64_t chunk_id, std::uintptr_t strea
             // Should be unreachable, as we've ensured this by waiting above
             assert(false);
         },
-        [](std::int64_t) {}  // Don't need notification for head moving
+        [](std::uint64_t) {}  // Don't need notification for head moving
     );
     return c;
 }
@@ -221,10 +221,10 @@ void chunk_stream_group::ready_chunk(chunk *c, std::uint64_t *batch_stats)
     config.get_ready()(std::move(owned), batch_stats);
 }
 
-void chunk_stream_group::stream_head_updated_unlocked(chunk_stream_group_member &s, std::int64_t head_chunk)
+void chunk_stream_group::stream_head_updated_unlocked(chunk_stream_group_member &s, std::uint64_t head_chunk)
 {
     std::size_t stream_index = s.group_index;
-    std::int64_t old = head_chunks[stream_index];
+    std::uint64_t old = head_chunks[stream_index];
     head_chunks[stream_index] = head_chunk;
     // Update so that our head chunk is min(head_chunks). We can skip the work
     // if we weren't previously the oldest.
@@ -234,12 +234,12 @@ void chunk_stream_group::stream_head_updated_unlocked(chunk_stream_group_member
         chunks.flush_until(
             min_head_chunk,
             [this, &s](chunk *c) { ready_chunk(c, s.batch_stats.data()); },
-            [this](std::int64_t) { ready_condition.notify_all(); }
+            [this](std::uint64_t) { ready_condition.notify_all(); }
         );
     }
 }
 
-void chunk_stream_group::stream_head_updated(chunk_stream_group_member &s, std::int64_t head_chunk)
+void chunk_stream_group::stream_head_updated(chunk_stream_group_member &s, std::uint64_t head_chunk)
 {
     std::lock_guard<std::mutex> lock(mutex);
     stream_head_updated_unlocked(s, head_chunk);
@@ -264,14 +264,14 @@ void chunk_stream_group_member::heap_ready(live_heap &&lh)
     do_heap_ready(std::move(lh));
 }
 
-void chunk_stream_group_member::async_flush_until(std::int64_t chunk_id)
+void chunk_stream_group_member::async_flush_until(std::uint64_t chunk_id)
 {
     post([chunk_id](stream_base &s) {
         chunk_stream_group_member &self = static_cast<chunk_stream_group_member &>(s);
         self.chunks.flush_until(
             chunk_id,
             [](chunk *) {},
-            [&self](std::int64_t head_chunk) {
+            [&self](std::uint64_t head_chunk) {
                 self.group.stream_head_updated(self, head_chunk);
             }
         );
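
A side note on the sentinel flush used in stop() and stream_stop_received()
above: with unsigned IDs, std::numeric_limits<std::uint64_t>::max() can never
collide with a real chunk ID, because get_chunk asserts that real IDs fit in
an int64_t. A minimal sketch of the flush_until behaviour this relies on
(hypothetical stand-ins, not the library API):

    #include <cassert>
    #include <cstdint>
    #include <limits>

    int main()
    {
        // Stand-ins for chunk_window's head/tail bookkeeping.
        std::uint64_t head_chunk = 5, tail_chunk = 8;

        // Mirrors flush_until: advance the head, and if the window empties
        // before reaching the target, jump head and tail straight to it.
        auto flush_until = [&](std::uint64_t target) {
            while (head_chunk != tail_chunk && head_chunk < target)
                ++head_chunk;  // stands in for flush_head(ready_chunk)
            if (head_chunk < target)
                head_chunk = tail_chunk = target;
        };

        // A stopped stream flushes to the unsigned maximum, so it can never
        // again be the minimum of the group's head_chunks and block others.
        flush_until(std::numeric_limits<std::uint64_t>::max());
        assert(head_chunk == std::numeric_limits<std::uint64_t>::max());
        assert(head_chunk == tail_chunk);
    }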