Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix RID_Owner synchronization #98488

Merged
merged 1 commit into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 108 additions & 3 deletions core/templates/rid_owner.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,40 @@
#include <stdio.h>
#include <typeinfo>

#ifdef SANITIZERS_ENABLED
#ifdef __has_feature
#if __has_feature(thread_sanitizer)
#define TSAN_ENABLED
#endif
#elif defined(__SANITIZE_THREAD__)
#define TSAN_ENABLED
#endif
#endif

#ifdef TSAN_ENABLED
#include <sanitizer/tsan_interface.h>
#endif

// The following macros would need to be implemented somehow
// for purely weakly ordered architectures. There's a test case
// ("[RID_Owner] Thread safety") with potential to catch issues
// on such architectures if these primitives fail to be implemented.
// For now, they will be just markers about needs that may arise.
#define WEAK_MEMORY_ORDER 0
#if WEAK_MEMORY_ORDER
// Ideally, we'd have implementations that collaborate with the
// sync mechanism used (e.g., the mutex) so instead of some full
// memory barriers being issued, some acquire-release on the
// primitive itself. However, these implementations will at least
// provide correctness.
#define SYNC_ACQUIRE std::atomic_thread_fence(std::memory_order_acquire);
#define SYNC_RELEASE std::atomic_thread_fence(std::memory_order_release);
#else
// Compiler barriers are enough in this case.
#define SYNC_ACQUIRE std::atomic_signal_fence(std::memory_order_acquire);
#define SYNC_RELEASE std::atomic_signal_fence(std::memory_order_release);
#endif

class RID_AllocBase {
static SafeNumeric<uint64_t> base_id;

Expand Down Expand Up @@ -120,7 +154,12 @@ class RID_Alloc : public RID_AllocBase {
free_list_chunks[chunk_count][i] = alloc_count + i;
}

max_alloc += elements_in_chunk;
if constexpr (THREAD_SAFE) {
// Store atomically to avoid data race with the load in get_or_null().
((std::atomic<uint32_t> *)&max_alloc)->store(max_alloc + elements_in_chunk, std::memory_order_relaxed);
} else {
max_alloc += elements_in_chunk;
}
}

uint32_t free_index = free_list_chunks[alloc_count / elements_in_chunk][alloc_count % elements_in_chunk];
Expand Down Expand Up @@ -168,9 +207,19 @@ class RID_Alloc : public RID_AllocBase {
return nullptr;
}

if constexpr (THREAD_SAFE) {
SYNC_ACQUIRE;
}

uint64_t id = p_rid.get_id();
uint32_t idx = uint32_t(id & 0xFFFFFFFF);
if (unlikely(idx >= max_alloc)) {
uint32_t ma;
if constexpr (THREAD_SAFE) { // Read atomically to avoid data race with the store in _allocate_rid().
ma = ((std::atomic<uint32_t> *)&max_alloc)->load(std::memory_order_relaxed);
} else {
ma = max_alloc;
}
if (unlikely(idx >= ma)) {
return nullptr;
}

Expand All @@ -179,7 +228,23 @@ class RID_Alloc : public RID_AllocBase {

uint32_t validator = uint32_t(id >> 32);

if constexpr (THREAD_SAFE) {
#ifdef TSAN_ENABLED
__tsan_acquire(&chunks[idx_chunk]); // We know not a race in practice.
__tsan_acquire(&chunks[idx_chunk][idx_element]); // We know not a race in practice.
#endif
}

Chunk &c = chunks[idx_chunk][idx_element];

if constexpr (THREAD_SAFE) {
#ifdef TSAN_ENABLED
__tsan_release(&chunks[idx_chunk]);
__tsan_release(&chunks[idx_chunk][idx_element]);
__tsan_acquire(&c.validator); // We know not a race in practice.
#endif
}

if (unlikely(p_initialize)) {
if (unlikely(!(c.validator & 0x80000000))) {
ERR_FAIL_V_MSG(nullptr, "Initializing already initialized RID");
Expand All @@ -198,19 +263,54 @@ class RID_Alloc : public RID_AllocBase {
return nullptr;
}

if constexpr (THREAD_SAFE) {
#ifdef TSAN_ENABLED
__tsan_release(&c.validator);
#endif
}

T *ptr = &c.data;

return ptr;
}
void initialize_rid(RID p_rid) {
T *mem = get_or_null(p_rid, true);
ERR_FAIL_NULL(mem);

if constexpr (THREAD_SAFE) {
#ifdef TSAN_ENABLED
__tsan_acquire(mem); // We know not a race in practice.
#endif
}

memnew_placement(mem, T);

if constexpr (THREAD_SAFE) {
#ifdef TSAN_ENABLED
__tsan_release(mem);
#endif
SYNC_RELEASE;
}
}

void initialize_rid(RID p_rid, const T &p_value) {
T *mem = get_or_null(p_rid, true);
ERR_FAIL_NULL(mem);

if constexpr (THREAD_SAFE) {
#ifdef TSAN_ENABLED
__tsan_acquire(mem); // We know not a race in practice.
#endif
}

memnew_placement(mem, T(p_value));

if constexpr (THREAD_SAFE) {
#ifdef TSAN_ENABLED
__tsan_release(mem);
#endif
SYNC_RELEASE;
}
}

_FORCE_INLINE_ bool owns(const RID &p_rid) const {
Expand Down Expand Up @@ -329,16 +429,21 @@ class RID_Alloc : public RID_AllocBase {
chunk_limit = (p_maximum_number_of_elements / elements_in_chunk) + 1;
chunks = (Chunk **)memalloc(sizeof(Chunk *) * chunk_limit);
free_list_chunks = (uint32_t **)memalloc(sizeof(uint32_t *) * chunk_limit);
SYNC_RELEASE;
}
}

~RID_Alloc() {
if constexpr (THREAD_SAFE) {
SYNC_ACQUIRE;
}

if (alloc_count) {
print_error(vformat("ERROR: %d RID allocations of type '%s' were leaked at exit.",
alloc_count, description ? description : typeid(T).name()));

for (size_t i = 0; i < max_alloc; i++) {
uint64_t validator = chunks[i / elements_in_chunk][i % elements_in_chunk].validator;
uint32_t validator = chunks[i / elements_in_chunk][i % elements_in_chunk].validator;
if (validator & 0x80000000) {
continue; //uninitialized
}
Expand Down
153 changes: 153 additions & 0 deletions tests/core/templates/test_rid.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,27 @@
#ifndef TEST_RID_H
#define TEST_RID_H

#include "core/os/thread.h"
#include "core/templates/local_vector.h"
#include "core/templates/rid.h"
#include "core/templates/rid_owner.h"

#include "tests/test_macros.h"

#ifdef SANITIZERS_ENABLED
#ifdef __has_feature
#if __has_feature(thread_sanitizer)
#define TSAN_ENABLED
#endif
#elif defined(__SANITIZE_THREAD__)
#define TSAN_ENABLED
#endif
#endif

#ifdef TSAN_ENABLED
#include <sanitizer/tsan_interface.h>
#endif

namespace TestRID {
TEST_CASE("[RID] Default Constructor") {
RID rid;
Expand Down Expand Up @@ -96,6 +113,142 @@ TEST_CASE("[RID] 'get_local_index'") {
CHECK(RID::from_uint64(4'294'967'295).get_local_index() == 4'294'967'295);
CHECK(RID::from_uint64(4'294'967'297).get_local_index() == 1);
}

// This case would let sanitizers realize data races.
// Additionally, on purely weakly ordered architectures, it would detect synchronization issues
// if RID_Alloc failed to impose proper memory ordering and the test's threads are distributed
// among multiple L1 caches.
TEST_CASE("[RID_Owner] Thread safety") {
struct DataHolder {
char data[Thread::CACHE_LINE_BYTES];
};

struct RID_OwnerTester {
uint32_t thread_count = 0;
RID_Owner<DataHolder, true> rid_owner;
TightLocalVector<Thread> threads;
SafeNumeric<uint32_t> next_thread_idx;
// Using std::atomic directly since SafeNumeric doesn't support relaxed ordering.
TightLocalVector<std::atomic<uint64_t>> rids;
std::atomic<uint32_t> sync[2] = {};
std::atomic<uint32_t> correct = 0;

// A barrier that doesn't introduce memory ordering constraints, only compiler ones.
// The idea is not to cause any sync traffic that would make the code we want to test
// seem correct as a side effect.
void lockstep(uint32_t p_step) {
uint32_t buf_idx = p_step % 2;
uint32_t target = (p_step / 2 + 1) * threads.size();
sync[buf_idx].fetch_add(1, std::memory_order_relaxed);
do {
std::this_thread::yield();
} while (sync[buf_idx].load(std::memory_order_relaxed) != target);
}

explicit RID_OwnerTester(bool p_chunk_for_all, bool p_chunks_preallocated) :
thread_count(OS::get_singleton()->get_processor_count()),
rid_owner(sizeof(DataHolder) * (p_chunk_for_all ? thread_count : 1)) {
threads.resize(thread_count);
rids.resize(threads.size());
if (p_chunks_preallocated) {
LocalVector<RID> prealloc_rids;
for (uint32_t i = 0; i < (p_chunk_for_all ? 1 : threads.size()); i++) {
prealloc_rids.push_back(rid_owner.make_rid());
}
for (uint32_t i = 0; i < prealloc_rids.size(); i++) {
rid_owner.free(prealloc_rids[i]);
}
}
}

~RID_OwnerTester() {
for (uint32_t i = 0; i < threads.size(); i++) {
rid_owner.free(RID::from_uint64(rids[i].load(std::memory_order_relaxed)));
}
}

void test() {
for (uint32_t i = 0; i < threads.size(); i++) {
threads[i].start(
[](void *p_data) {
RID_OwnerTester *rot = (RID_OwnerTester *)p_data;

auto _compute_thread_unique_byte = [](uint32_t p_idx) -> char {
return ((p_idx & 0xff) ^ (0b11111110 << (p_idx % 8)));
};

// 1. Each thread gets a zero-based index.
uint32_t self_th_idx = rot->next_thread_idx.postincrement();

rot->lockstep(0);

// 2. Each thread makes a RID holding unique data.
DataHolder initial_data;
memset(&initial_data, _compute_thread_unique_byte(self_th_idx), Thread::CACHE_LINE_BYTES);
RID my_rid = rot->rid_owner.make_rid(initial_data);
rot->rids[self_th_idx].store(my_rid.get_id(), std::memory_order_relaxed);

rot->lockstep(1);

// 3. Each thread verifies all the others.
uint32_t local_correct = 0;
for (uint32_t th_idx = 0; th_idx < rot->threads.size(); th_idx++) {
if (th_idx == self_th_idx) {
continue;
}
char expected_unique_byte = _compute_thread_unique_byte(th_idx);
RID rid = RID::from_uint64(rot->rids[th_idx].load(std::memory_order_relaxed));
DataHolder *data = rot->rid_owner.get_or_null(rid);
#ifdef TSAN_ENABLED
__tsan_acquire(data); // We know not a race in practice.
#endif
bool ok = true;
for (uint32_t j = 0; j < Thread::CACHE_LINE_BYTES; j++) {
if (data->data[j] != expected_unique_byte) {
ok = false;
break;
}
}
if (ok) {
local_correct++;
}
#ifdef TSAN_ENABLED
__tsan_release(data);
#endif
}

rot->lockstep(2);

rot->correct.fetch_add(local_correct, std::memory_order_acq_rel);
},
this);
}

for (uint32_t i = 0; i < threads.size(); i++) {
threads[i].wait_to_finish();
}

CHECK_EQ(correct.load(), threads.size() * (threads.size() - 1));
}
};

SUBCASE("All items in one chunk, pre-allocated") {
RID_OwnerTester tester(true, true);
tester.test();
}
SUBCASE("All items in one chunk, NOT pre-allocated") {
RID_OwnerTester tester(true, false);
tester.test();
}
SUBCASE("One item per chunk, pre-allocated") {
RID_OwnerTester tester(false, true);
tester.test();
}
SUBCASE("One item per chunk, NOT pre-allocated") {
RID_OwnerTester tester(false, false);
tester.test();
}
}
} // namespace TestRID

#endif // TEST_RID_H