Skip to content

Commit

Permalink
Enable spin polling via a zero microsecond wait between lulls
Browse files Browse the repository at this point in the history
Summary: Dispenso's scheduling mechanism works roughly like this: loop until it is time to tear down the thread pool. Inside that loop, grab work as long as there is work; when we have run out of work for some time, yield to the OS for some time. There are two variants of this, but those details are unimportant to this change. With this change, if the yield time is set to zero, we no longer sleep at all. This can be important for very latency-critical cases, but it should likely be used only with thread pools that do not dominate all hardware threads in the system, so that the OS and the rest of the program keep moving smoothly.

Reviewed By: CalebVR

Differential Revision: D50331611

fbshipit-source-id: 781a9f24246ed7de759b3168f9d618d9a31efe7a
  • Loading branch information
graphicsMan authored and facebook-github-bot committed Oct 22, 2023
1 parent dd4581e commit d31040c
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 38 deletions.
6 changes: 5 additions & 1 deletion dispenso/thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ void ThreadPool::PerThreadData::stop() {
}

// Wait for the epoch to advance past currentEpoch, or for the configured
// sleep length to elapse, whichever comes first.  A sleep length of zero
// requests spin polling: return immediately with the current epoch instead
// of yielding to the OS.
//
// Returns the epoch value observed after waiting (or immediately, when spin
// polling).
uint32_t ThreadPool::wait(uint32_t currentEpoch) {
  // Snapshot the sleep length exactly once.  sleepLengthUs_ can be changed
  // concurrently (e.g. via setSignalingWake); reading it twice — once for the
  // zero check and once for the wait — could observe two different values and
  // e.g. block with a stale duration after the caller switched to polling.
  const auto sleepUs = sleepLengthUs_.load(std::memory_order_acquire);
  if (sleepUs > 0) {
    return epochWaiter_.waitFor(currentEpoch, sleepUs);
  }
  // Zero-microsecond wait: pure spin polling, never sleep.
  return epochWaiter_.current();
}
void ThreadPool::wake() {
epochWaiter_.bumpAndWake();
Expand Down
128 changes: 91 additions & 37 deletions tests/thread_pool_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class ThreadPoolTest : public testing::TestWithParam<ScheduleType> {
pool_.reset();
}

private:
protected:
std::unique_ptr<dispenso::ThreadPool> pool_;
size_t count_ = 0;
};
Expand All @@ -78,15 +78,11 @@ INSTANTIATE_TEST_SUITE_P(
TEST_P(ThreadPoolTest, SimpleWork) {
constexpr int kWorkItems = 10000;
std::vector<int> outputs(kWorkItems, 0);
std::atomic<int> completed(0);
{
initPool(10);
int i = 0;
for (int& o : outputs) {
schedule([i, &o, &completed]() {
o = i * i;
completed.fetch_add(1, std::memory_order_relaxed);
});
schedule([i, &o]() { o = i * i; });
++i;
}
destroyPool();
Expand All @@ -103,20 +99,13 @@ TEST_P(ThreadPoolTest, MixedWork) {
constexpr size_t kWorkItems = 10000;
std::vector<size_t> outputsA(kWorkItems, 0);
std::vector<size_t> outputsB(kWorkItems, 0);
std::atomic<int> completed(0);
{
initPool(10);
for (size_t i = 0; i < kWorkItems; ++i) {
auto& a = outputsA[i];
auto& b = outputsB[i];
schedule([i, &a, &completed]() {
a = i * i;
completed.fetch_add(1, std::memory_order_relaxed);
});
schedule([i, &b, &completed]() {
b = i * i * i;
completed.fetch_add(1, std::memory_order_relaxed);
});
schedule([i, &a]() { a = i * i; });
schedule([i, &b]() { b = i * i * i; });
}
destroyPool();
}
Expand All @@ -130,15 +119,11 @@ TEST_P(ThreadPoolTest, MixedWork) {
TEST(ThreadPool, ResizeConcurrent) {
constexpr int kWorkItems = 10000;
std::vector<int> outputs(kWorkItems, 0);
std::atomic<int> completed(0);
{
dispenso::ThreadPool pool(10);
int i = 0;
for (int& o : outputs) {
pool.schedule([i, &o, &completed]() {
o = i * i;
completed.fetch_add(1, std::memory_order_relaxed);
});
pool.schedule([i, &o]() { o = i * i; });
++i;

if ((i & 127) == 0) {
Expand All @@ -161,7 +146,6 @@ TEST(ThreadPool, ResizeConcurrent) {
TEST(ThreadPool, ResizeMoreConcurrent) {
constexpr int kWorkItems = 1000000;
std::vector<int64_t> outputs(kWorkItems, 0);
std::atomic<int> completed(0);
{
dispenso::ThreadPool pool(10);

Expand All @@ -179,10 +163,7 @@ TEST(ThreadPool, ResizeMoreConcurrent) {

int64_t i = 0;
for (int64_t& o : outputs) {
pool.schedule([i, &o, &completed]() {
o = i * i;
completed.fetch_add(1, std::memory_order_relaxed);
});
pool.schedule([i, &o]() { o = i * i; });
++i;
}
resizer0.join();
Expand All @@ -202,7 +183,6 @@ TEST(ThreadPool, SetSignalingWakeConcurrent) {
using namespace std::chrono_literals;
constexpr int kWorkItems = 1000000;
std::vector<int64_t> outputs(kWorkItems, 0);
std::atomic<int> completed(0);
{
dispenso::ThreadPool pool(10);

Expand All @@ -220,10 +200,7 @@ TEST(ThreadPool, SetSignalingWakeConcurrent) {

int64_t i = 0;
for (int64_t& o : outputs) {
pool.schedule([i, &o, &completed]() {
o = i * i;
completed.fetch_add(1, std::memory_order_relaxed);
});
pool.schedule([i, &o]() { o = i * i; });
++i;
}
resetter0.join();
Expand Down Expand Up @@ -285,18 +262,12 @@ TEST(ThreadPool, ResizeCheckApproxActualRunningThreads) {
TEST_P(ThreadPoolTest, CrossPoolTest) {
constexpr int kWorkItems = 10000;
std::vector<int> outputs(kWorkItems, 0);
std::atomic<int> completed(0);
{
dispenso::ThreadPool otherPool(8);
initPool(10);
int i = 0;
for (int& o : outputs) {
schedule([i, &o, &completed, &otherPool]() {
otherPool.schedule([i, &o, &completed]() {
o = i * i;
completed.fetch_add(1, std::memory_order_relaxed);
});
});
schedule([i, &o, &otherPool]() { otherPool.schedule([i, &o]() { o = i * i; }); });
++i;
}
destroyPool();
Expand All @@ -308,3 +279,86 @@ TEST_P(ThreadPoolTest, CrossPoolTest) {
++i;
}
}

// Stress test: concurrently toggle the pool between signaling wake with a
// zero-microsecond (spin-polling) wait and non-signaling wake with a 400us
// wait, while a million work items are being scheduled.  All work must still
// complete correctly and the thread count must be unchanged afterwards.
TEST(ThreadPool, SetSignalingWakeConcurrentZeroLatency) {
  using namespace std::chrono_literals;
  constexpr int kWorkItems = 1000000;
  std::vector<int64_t> outputs(kWorkItems, 0);
  {
    dispenso::ThreadPool pool(10);

    // Both togglers hammer setSignalingWake with opposing configurations.
    auto toggler = [&pool](bool signaling, std::chrono::microseconds us) {
      for (int rep = 0; rep < 100; ++rep) {
        pool.setSignalingWake(signaling, us);
      }
    };
    std::thread resetter0(toggler, true, 0us);
    std::thread resetter1(toggler, false, 400us);

    for (int64_t idx = 0; idx < kWorkItems; ++idx) {
      int64_t* out = &outputs[idx];
      pool.schedule([idx, out]() { *out = idx * idx; });
    }
    resetter0.join();
    resetter1.join();

    EXPECT_EQ(static_cast<int>(pool.numThreads()), 10);
  }

  // Every item must have been executed exactly once with its own index.
  for (int64_t idx = 0; idx < kWorkItems; ++idx) {
    EXPECT_EQ(outputs[idx], idx * idx);
  }
}

// Verify that simple work completes correctly when the pool is configured
// for a zero-microsecond wait (spin polling), under both signaling-wake
// modes of the scheduler.
TEST_P(ThreadPoolTest, SimpleWorkZeroLatencyPoll) {
  using namespace std::chrono_literals;
  constexpr int kWorkItems = 10000;
  std::vector<int> outputs(kWorkItems, 0);

  // One full schedule/teardown cycle with spin polling enabled under the
  // requested signaling mode, followed by verification of all outputs.
  auto runOnce = [&](bool signalingWake) {
    {
      initPool(10);
      pool_->setSignalingWake(signalingWake, 0us);

      for (int idx = 0; idx < kWorkItems; ++idx) {
        int* out = &outputs[idx];
        schedule([idx, out]() { *out = idx * idx; });
      }
      destroyPool();
    }

    for (int idx = 0; idx < kWorkItems; ++idx) {
      EXPECT_EQ(outputs[idx], idx * idx);
    }
  };

  runOnce(true);
  // Switch scheduling method and repeat.
  runOnce(false);
}

0 comments on commit d31040c

Please sign in to comment.