Skip to content

Commit

Permalink
Safeguard against link overflows in ConcurrentHashMap
Browse files Browse the repository at this point in the history
Summary:
PROBLEM
Folly ConcurrentHashMaps use Hazard pointers to ensure map entries that were recently removed (using `erase`, `insert_or_assign`, etc) aren't cleaned up when there are readers for those objects. Instead, they are removed as part of a reclamation process which typically happens asynchronously. Moreover within ConcurrentHashMap, entries are linked to one another, and this linkage needs to be known within the hazard pointer logic to ensure we don't clean up an object that itself doesn't have any direct hazard pointers, but is referenced by another object that might have hazard pointers. That logic is within `HazptrObjLinked`.
Under high contention situations (see #2097 ) , the link counting logic can overflow, because a single object has too many dangling links. For example, consider a map that has 2 entries with the same hash code- `(A,0)` and `(B,0)`. Let's assume that `A` is stored before `B` internally within the `ConcurrentHashMap`'s `BucketTable`. `B` stores initially that it has a 1 link (to `A`). Now, let's assume that we replace `(A,0)` with `(A,1)`. While `(A,0)` is erased out of the `ConcurrentHashMap`, its not immediately reclaimed/deleted. During this interim, `B` has a link count of 2 to the 2 entries of `A`. This link count is stored as a 16 bit unsigned integer. If the above operation happens very quickly, then we end up in a situation where `B`'s link count overflows past 65535, and wraps around.
This situation is caught in debug compilation (due to `DCHECK`), but in opt builds, it results in bad retirements. For eg, if `B`'s link count goes past 65535 to 65537 (i.e. `1`), then when 1 object of `A` is reclaimed, the `B`'s link count would decrement past `1` back to `0`, causing `B` to be incorrectly retired. Now if we actually end up removing all of `A`, the link count will overflow backwards, from `0` back to `65535` and then back to `0`, causing a double retirement - a sign to corruption.

SOLUTION
While the situation is rare, it can arise for skewed data with a lot of contention. There are 3 options to "solve" this:
1. Increase the link count data structure size from 16bit to something higher - Simple, but a work-around. Eventually high-enough contention would bugs to show up there as well.
2. Crash the process when there is very high contention - Maintains the current performance guarantees, and when ConcurrentHashMap cannot meet those guarantees, it causes a fatal error.
3. Slow ConcurrentHashMap erasures under high contention (this diff) - Very high contention would cause ConcurrentHashMap to slow down, and give reclamation time to act. Functionally `ConcurrentHashMap` remains the same, but does exhibit different perf characteristics.

In this change, the `HazptrObjLinked` code is changed is disallow for overflows since it leads to corruption, and the callers are responsible for handling cases where links cannot be created. For `ConcurrentHashMap`, we keep waiting, until we can acquire a link : which means erasures under high contention are lock-free but not wait-free.
For reclamation, there are buffers within the cohort to store both retired objects (aka `list`) and reclaimed objects (aka `safe list`). In cases where `ConcurrentHashMap` is unable to acquire a link, it's imperative it tries to initiate a reclamation cycle to make progress, and thus I added a `cleanup()` method within the cohort to flush any existing retired objects to the hazard pointer domain for retirement-evaluation, kick off a reclamation cycle, and also retire any retired objects pending within the cohort.

Differential Revision: D51647789
  • Loading branch information
instw authored and facebook-github-bot committed Nov 30, 2023
1 parent 18a1822 commit 2a18f7a
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 19 deletions.
34 changes: 27 additions & 7 deletions folly/concurrency/detail/ConcurrentHashMap-detail.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ class NodeT : public hazptr_obj_base_linked<
this->set_deleter( // defined in hazptr_obj
concurrenthashmap::HazptrDeleter<Allocator>());
this->set_cohort_tag(cohort); // defined in hazptr_obj
this->acquire_link_safe(); // defined in hazptr_obj_base_linked
CHECK(this->acquire_link_safe()); // defined in hazptr_obj_base_linked
}

ValueHolder<KeyType, ValueType, Allocator, Atom> item_;
Expand Down Expand Up @@ -361,7 +361,11 @@ class alignas(64) BucketTable {
}
}
// Set longest last run in new bucket, incrementing the refcount.
lastrun->acquire_link(); // defined in hazptr_obj_base_linked
while (!lastrun->acquire_link()) {
cohort->cleanup();
std::this_thread::yield();
}

newbuckets->buckets_[lastidx]().store(lastrun, std::memory_order_relaxed);
// Clone remaining nodes
for (; node != lastrun;
Expand Down Expand Up @@ -407,7 +411,12 @@ class alignas(64) BucketTable {
}

template <typename K, typename MatchFunc>
std::size_t erase(size_t h, const K& key, Iterator* iter, MatchFunc match) {
std::size_t erase(
size_t h,
const K& key,
Iterator* iter,
MatchFunc match,
hazptr_obj_cohort<Atom>* cohort) {
Node* node{nullptr};
{
std::lock_guard<Mutex> g(m_);
Expand All @@ -426,7 +435,10 @@ class alignas(64) BucketTable {
}
auto next = node->next_.load(std::memory_order_relaxed);
if (next) {
next->acquire_link(); // defined in hazptr_obj_base_linked
while (!next->acquire_link()) {
cohort->cleanup();
std::this_thread::yield();
} // defined in hazptr_obj_base_linked
}
if (prev) {
prev->next_.store(next, std::memory_order_release);
Expand Down Expand Up @@ -709,7 +721,10 @@ class alignas(64) BucketTable {
auto next = node->next_.load(std::memory_order_relaxed);
cur->next_.store(next, std::memory_order_relaxed);
if (next) {
next->acquire_link(); // defined in hazptr_obj_base_linked
while (!next->acquire_link()) {
cohort->cleanup();
std::this_thread::yield();
} // defined in hazptr_obj_base_linked
}
prev->store(cur, std::memory_order_release);
it.setNode(cur, buckets, bcount, idx);
Expand Down Expand Up @@ -1347,7 +1362,12 @@ class alignas(64) SIMDTable {
}

template <typename K, typename MatchFunc>
std::size_t erase(size_t h, const K& key, Iterator* iter, MatchFunc match) {
std::size_t erase(
size_t h,
const K& key,
Iterator* iter,
MatchFunc match,
hazptr_obj_cohort<Atom>* /* cohort */) {
const HashPair hp = splitHash(h);

std::unique_lock<Mutex> g(m_);
Expand Down Expand Up @@ -1880,7 +1900,7 @@ class alignas(64) ConcurrentHashMapSegment {
template <typename K, typename MatchFunc>
size_type erase_internal(
size_t h, const K& key, Iterator* iter, MatchFunc match) {
return impl_.erase(h, key, iter, match);
return impl_.erase(h, key, iter, match, cohort_);
}

// Unfortunately because we are reusing nodes on rehash, we can't
Expand Down
52 changes: 51 additions & 1 deletion folly/concurrency/test/ConcurrentHashMapTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
#include <folly/concurrency/ConcurrentHashMap.h>

#include <atomic>
#include <latch>
#include <limits>
#include <memory>
#include <thread>
#include <vector>

#include <folly/Traits.h>
#include <folly/container/test/TrackingTypes.h>
Expand Down Expand Up @@ -1131,6 +1134,52 @@ TYPED_TEST_P(ConcurrentHashMapTest, ConcurrentInsertClear) {
}
}

TYPED_TEST_P(ConcurrentHashMapTest, StressTestReclamation) {
// Create a map where we keep reclaiming a lot of objects that are linked to
// one node.

// Ensure all entries are mapped to a single segment.
auto constant_hash = [](unsigned long) -> uint64_t { return 0; };
CHM<unsigned long, unsigned long, decltype(constant_hash)> map;
static constexpr unsigned long key_prev =
0; // A key that the test key has a link to - to guard against immediate
// reclamation.
static constexpr unsigned long key_test =
1; // A key that keeps being reclaimed repeatedly.
static constexpr unsigned long key_link_explosion =
2; // A key that is linked to the test key.

EXPECT_TRUE(map.insert(std::make_pair(key_prev, 0)).second);
EXPECT_TRUE(map.insert(std::make_pair(key_test, 0)).second);
EXPECT_TRUE(map.insert(std::make_pair(key_link_explosion, 0)).second);

std::vector<std::thread> threads;
// The number of links are stored as a uint16_t, so having 65K threads should
// cause sufficient racing
static constexpr uint64_t num_threads = std::numeric_limits<uint16_t>::max();
static constexpr uint64_t iters = 100;
std::latch start{num_threads};
for (uint64_t t = 0; t < num_threads; t++) {
threads.push_back(lib::thread([t, &map, &start]() {
start.arrive_and_wait();
static constexpr uint64_t progress_report_pct =
(iters / 20); // Every 5% we log progress
for (uint64_t i = 0; i < iters; i++) {
if (t == 0 && (i % progress_report_pct) == 0) {
// To a casual observer - to know that the test is progressing, even
// if slowly
LOG(INFO) << "Progress: " << (i * 100 / iters);
}

map.insert_or_assign(key_test, i * num_threads);
}
}));
}
for (auto& t : threads) {
join;
}
}

REGISTER_TYPED_TEST_SUITE_P(
ConcurrentHashMapTest,
MapTest,
Expand Down Expand Up @@ -1174,7 +1223,8 @@ REGISTER_TYPED_TEST_SUITE_P(
HeterogeneousInsert,
InsertOrAssignIterator,
EraseClonedNonCopyable,
ConcurrentInsertClear);
ConcurrentInsertClear,
StressTestReclamation);

using folly::detail::concurrenthashmap::bucket::BucketTable;

Expand Down
7 changes: 7 additions & 0 deletions folly/synchronization/HazptrObj.h
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,13 @@ class hazptr_obj_cohort {
DCHECK(l_.empty());
}

/** force reclaiming any items that are retired. Executes reclamation */
void cleanup() {
check_threshold_push();
default_hazptr_domain<Atom>().cleanup();
reclaim_safe_list();
}

private:
friend class hazptr_domain<Atom>;
friend class hazptr_obj<Atom>;
Expand Down
30 changes: 19 additions & 11 deletions folly/synchronization/HazptrObjLinked.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,13 @@ class hazptr_obj_linked : public hazptr_obj<Atom> {
Atom<Count> count_{0};

public:
void acquire_link() noexcept { count_inc(kLink); }
bool acquire_link() noexcept { return count_inc(kLink, kLinkMask); }

void acquire_link_safe() noexcept { count_inc_safe(kLink); }
bool acquire_link_safe() noexcept { return count_inc_safe(kLink, kLinkMask); }

void acquire_ref() noexcept { count_inc(kRef); }
bool acquire_ref() noexcept { return count_inc(kRef, kRefMask); }

void acquire_ref_safe() noexcept { count_inc_safe(kRef); }
bool acquire_ref_safe() noexcept { return count_inc_safe(kRef, kRefMask); }

private:
template <typename, template <typename> class, typename>
Expand All @@ -116,17 +116,25 @@ class hazptr_obj_linked : public hazptr_obj<Atom> {
count_.store(val, std::memory_order_release);
}

void count_inc(Count add) noexcept {
auto oldval = count_.fetch_add(add, std::memory_order_acq_rel);
DCHECK_LT(oldval & kLinkMask, kLinkMask);
DCHECK_LT(oldval & kRefMask, kRefMask);
bool count_inc(Count add, Count mask) noexcept {
Count oldval = count();
while (true) {
if ((oldval & mask) == mask) {
return false;
}
if (count_cas(oldval, oldval + add)) {
return true;
}
}
}

void count_inc_safe(Count add) noexcept {
bool count_inc_safe(Count add, Count mask) noexcept {
auto oldval = count();
if ((oldval & mask) == mask) {
return false;
}
count_set(oldval + add);
DCHECK_LT(oldval & kLinkMask, kLinkMask);
DCHECK_LT(oldval & kRefMask, kRefMask);
return true;
}

bool count_cas(Count& oldval, Count newval) noexcept {
Expand Down

0 comments on commit 2a18f7a

Please sign in to comment.