Enable limiting chunk sizes in parallel_for
Summary: The input workload for a parallel_for often varies, and when each loop iteration is lightweight, parallel_for had high overhead relative to the work done. We add a new option `minItemsPerChunk` that controls the minimum size of the chunks divvied up for threads to work on when load balancing is kAuto or kStatic. This option is ignored if a concrete chunk size is provided to makeChunkedRange.
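
A minimal usage sketch of the new option, modeled on the test added in tests/chunked_for_test.cpp (the vector contents and the value 10000 are illustrative):

#include <dispenso/parallel_for.h>

#include <vector>

int main() {
  std::vector<int> input(1000000, 1), output(1000000, 0);

  dispenso::ParForOptions options;
  options.minItemsPerChunk = 10000; // iterations are cheap; large chunks amortize scheduling overhead

  dispenso::parallel_for(
      dispenso::makeChunkedRange(0, 1000000, dispenso::ParForChunking::kAuto),
      [&input, &output](int begin, int end) {
        for (int i = begin; i < end; ++i) {
          output[i] = 2 * input[i];
        }
      },
      options);
}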

Reviewed By: EscapeZero

Differential Revision: D49334866

fbshipit-source-id: 62040329e0a6dcf872bc8a27514b821403097ffe
graphicsMan authored and facebook-github-bot committed Oct 10, 2023
1 parent 5c96355 commit c297716
Showing 6 changed files with 164 additions and 39 deletions.
47 changes: 34 additions & 13 deletions benchmarks/simple_for_benchmark.cpp
@@ -26,6 +26,8 @@ static constexpr int kSmallSize = 1000;
static constexpr int kMediumSize = 1000000;
static constexpr int kLargeSize = 100000000;

static constexpr uint32_t kMinSizePerChunk = 10000;

const std::vector<int>& getInputs(int num_elements) {
static std::unordered_map<int, std::vector<int>> vecs;
auto it = vecs.find(num_elements);
@@ -73,12 +75,18 @@ void BM_dispenso(benchmark::State& state) {
std::vector<int> output(num_elements, 0);
dispenso::ThreadPool pool(num_threads);

dispenso::ParForOptions options;
options.minItemsPerChunk = kMinSizePerChunk;

auto& input = getInputs(num_elements);
for (auto UNUSED_VAR : state) {
dispenso::TaskSet tasks(pool);
dispenso::parallel_for(tasks, 0, num_elements, [&input, &output](size_t i) {
output[i] = input[i] * input[i] - 3 * input[i];
});
dispenso::parallel_for(
tasks,
0,
num_elements,
[&input, &output](size_t i) { output[i] = input[i] * input[i] - 3 * input[i]; },
options);
}
checkResults(input, output);
}
@@ -90,15 +98,22 @@ void BM_dispenso_static_chunk(benchmark::State& state) {
std::vector<int> output(num_elements, 0);
dispenso::ThreadPool pool(num_threads);

dispenso::ParForOptions options;
options.minItemsPerChunk = kMinSizePerChunk;

auto& input = getInputs(num_elements);
for (auto UNUSED_VAR : state) {
dispenso::TaskSet tasks(pool);
auto range = dispenso::makeChunkedRange(0, num_elements, dispenso::ParForChunking::kStatic);
dispenso::parallel_for(tasks, range, [&input, &output](size_t begin, size_t end) {
for (size_t i = begin; i < end; ++i) {
output[i] = input[i] * input[i] - 3 * input[i];
}
});
dispenso::parallel_for(
tasks,
range,
[&input, &output](size_t begin, size_t end) {
for (size_t i = begin; i < end; ++i) {
output[i] = input[i] * input[i] - 3 * input[i];
}
},
options);
}
checkResults(input, output);
}
@@ -109,16 +124,22 @@ void BM_dispenso_auto_chunk(benchmark::State& state) {

std::vector<int> output(num_elements, 0);
dispenso::ThreadPool pool(num_threads);
dispenso::ParForOptions options;
options.minItemsPerChunk = kMinSizePerChunk;

auto& input = getInputs(num_elements);
for (auto UNUSED_VAR : state) {
dispenso::TaskSet tasks(pool);
auto range = dispenso::makeChunkedRange(0, num_elements, dispenso::ParForChunking::kAuto);
dispenso::parallel_for(tasks, range, [&input, &output](size_t begin, size_t end) {
for (size_t i = begin; i < end; ++i) {
output[i] = input[i] * input[i] - 3 * input[i];
}
});
dispenso::parallel_for(
tasks,
range,
[&input, &output](size_t begin, size_t end) {
for (size_t i = begin; i < end; ++i) {
output[i] = input[i] * input[i] - 3 * input[i];
}
},
options);
}
checkResults(input, output);
}
6 changes: 5 additions & 1 deletion benchmarks/summing_for_benchmark.cpp
@@ -80,6 +80,9 @@ void BM_dispenso(benchmark::State& state) {
int64_t sum = 0;
int foo = 0;

dispenso::ParForOptions options;
options.minItemsPerChunk = 50000;

auto& input = getInputs(num_elements);
for (auto UNUSED_VAR : state) {
dispenso::TaskSet tasks(pool);
@@ -98,7 +101,8 @@ void BM_dispenso(benchmark::State& state) {
lsum += input[i] * input[i] - 3 * foo * input[i];
}
lsumStore += lsum;
});
},
options);
sum = 0;
for (auto s : sums) {
sum += s;
6 changes: 5 additions & 1 deletion benchmarks/trivial_compute_benchmark.cpp
@@ -77,6 +77,9 @@ void BM_dispenso(benchmark::State& state) {
uint64_t sum = 0;
int foo = 0;

dispenso::ParForOptions options;
options.minItemsPerChunk = 4000;

auto input = getInputs(num_elements);
for (auto UNUSED_VAR : state) {
dispenso::TaskSet tasks(pool);
@@ -95,7 +98,8 @@ void BM_dispenso(benchmark::State& state) {
lsum += calculate(input, i, foo);
}
lsumStore += lsum;
});
},
options);
sum = 0;
for (auto s : sums) {
sum += s;
89 changes: 65 additions & 24 deletions dispenso/parallel_for.h
@@ -56,6 +56,13 @@ struct ParForOptions {
* used when invoking the version of parallel_for that takes index parameters (vs a ChunkedRange).
**/
ParForChunking defaultChunking = ParForChunking::kStatic;

/**
* Specify a minimum number of items per chunk for static or auto load balancing. Cheaper
* per-item workloads should use a larger minItemsPerChunk. This option is ignored if an
* explicit chunk size is provided to ChunkedRange.
**/
uint32_t minItemsPerChunk = 1;
};
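// Usage sketch (illustrative values): a cheap loop body benefits from a larger
// minimum, e.g. options.minItemsPerChunk = 4096. Per the commit summary, passing a
// concrete chunk size instead, e.g. makeChunkedRange(0, n, 512), overrides this
// option entirely.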

/**
@@ -110,6 +117,10 @@ struct ChunkedRange {
return chunk == kStatic;
}

bool isAuto() const {
return chunk == 0;
}

bool empty() const {
return end <= start;
}
@@ -119,17 +130,19 @@
}

template <typename OtherInt>
std::tuple<size_type, size_type> calcChunkSize(OtherInt numLaunched, bool oneOnCaller) const {
std::tuple<size_type, size_type>
calcChunkSize(OtherInt numLaunched, bool oneOnCaller, size_type minChunkSize) const {
size_type workingThreads = static_cast<size_type>(numLaunched) + size_type{oneOnCaller};
assert(workingThreads > 1);

if (!chunk) {
// TODO(bbudge): play with different load balancing factors for auto.
// IntegerT dynFactor = std::log2(1.0f + (end_ - start_) / (cyclesPerIndex_ *
// cyclesPerIndex_));
constexpr size_type dynFactor = 16;
const size_type chunks = dynFactor * workingThreads;
const size_type chunkSize = (size() + chunks - 1) / chunks;
size_type dynFactor = std::min<size_type>(16, size() / workingThreads);
size_type chunkSize;
do {
size_type roughChunks = dynFactor * workingThreads;
chunkSize = (size() + roughChunks - 1) / roughChunks;
--dynFactor;
} while (chunkSize < minChunkSize);
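// Worked example (illustrative numbers): size() == 1000000, workingThreads == 8,
// minChunkSize == 200. dynFactor starts at min(16, 125000) == 16, so
// roughChunks == 128 and chunkSize == ceil(1000000 / 128) == 7813 >= 200,
// and the loop exits after one pass.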
return {chunkSize, (size() + chunkSize - 1) / chunkSize};
} else if (chunk == kStatic) {
// This should never be called. The static distribution versions of the parallel_for
@@ -219,26 +232,29 @@ void parallel_for_staticImpl(
const StateGen& defaultState,
const ChunkedRange<IntegerT>& range,
F&& f,
ParForOptions options) {
ssize_t numThreads = std::min<ssize_t>(taskSet.numPoolThreads(), options.maxThreads);
ssize_t maxThreads,
bool wait) {
using size_type = typename ChunkedRange<IntegerT>::size_type;

size_type numThreads = std::min<size_type>(taskSet.numPoolThreads(), maxThreads);
// Reduce threads used if they exceed work to be done.
numThreads = std::min<ssize_t>(numThreads, range.size()) + options.wait;
numThreads = std::min(numThreads, range.size()) + wait;

for (ssize_t i = 0; i < numThreads; ++i) {
for (size_type i = 0; i < numThreads; ++i) {
states.emplace_back(defaultState());
}

auto chunking = detail::staticChunkSize(range.size(), numThreads);
IntegerT chunkSize = static_cast<IntegerT>(chunking.ceilChunkSize);

bool perfectlyChunked = chunking.transitionTaskIndex == numThreads;
bool perfectlyChunked = static_cast<size_type>(chunking.transitionTaskIndex) == numThreads;

// (!perfectlyChunked) ? chunking.transitionTaskIndex : numThreads - 1;
ssize_t firstLoopLen = chunking.transitionTaskIndex - perfectlyChunked;
size_type firstLoopLen = chunking.transitionTaskIndex - perfectlyChunked;

auto stateIt = states.begin();
IntegerT start = range.start;
ssize_t t;
size_type t;
for (t = 0; t < firstLoopLen; ++t) {
IntegerT next = static_cast<IntegerT>(start + chunkSize);
taskSet.schedule([it = stateIt++, start, next, f]() {
@@ -260,7 +276,7 @@ void parallel_for_staticImpl(
start = next;
}

if (options.wait) {
if (wait) {
f(*stateIt, start, range.end);
taskSet.wait();
} else {
@@ -310,8 +326,16 @@ void parallel_for(
}
return;
}
const ssize_t N = taskSet.numPoolThreads();
if (N == 0 || !options.maxThreads || range.size() == 1 ||

using size_type = typename ChunkedRange<IntegerT>::size_type;

// Ensure minItemsPerChunk is sane
uint32_t minItemsPerChunk = std::max<uint32_t>(1, options.minItemsPerChunk);
size_type maxThreads = options.maxThreads;
bool isStatic = range.isStatic();

const size_type N = taskSet.numPoolThreads();
if (N == 0 || !options.maxThreads || range.size() <= minItemsPerChunk ||
detail::PerPoolPerThreadInfo::isParForRecursive(&taskSet.pool())) {
states.emplace_back(defaultState());
f(*states.begin(), range.start, range.end);
@@ -321,15 +345,32 @@
return;
}

if (range.isStatic()) {
// Adjust down workers if we would have too-small chunks
if (minItemsPerChunk > 1) {
size_type maxWorkers = range.size() / minItemsPerChunk;
if (maxWorkers < maxThreads) {
maxThreads = static_cast<uint32_t>(maxWorkers);
}
if (range.size() / (maxThreads + options.wait) < minItemsPerChunk && range.isAuto()) {
isStatic = true;
}
} else if (range.size() <= N + options.wait) {
if (range.isAuto()) {
isStatic = true;
} else if (!range.isStatic()) {
maxThreads = range.size() - options.wait;
}
}
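// Worked example (illustrative numbers): range.size() == 1000,
// minItemsPerChunk == 300, a 16-thread pool, options.wait == true:
// maxWorkers == 1000 / 300 == 3 caps maxThreads at 3, and since
// 1000 / (3 + 1) == 250 < 300, an auto range falls back to static chunking.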

if (isStatic) {
detail::parallel_for_staticImpl(
taskSet, states, defaultState, range, std::forward<F>(f), options);
taskSet, states, defaultState, range, std::forward<F>(f), maxThreads, options.wait);
return;
}

const ssize_t numToLaunch = std::min<ssize_t>(options.maxThreads, N);
const size_type numToLaunch = std::min<size_type>(maxThreads, N);

for (ssize_t i = 0; i < numToLaunch + options.wait; ++i) {
for (size_type i = 0; i < numToLaunch + options.wait; ++i) {
states.emplace_back(defaultState());
}

@@ -340,7 +381,7 @@
return;
}

auto chunkInfo = range.calcChunkSize(numToLaunch, options.wait);
auto chunkInfo = range.calcChunkSize(numToLaunch, options.wait, minItemsPerChunk);
auto chunkSize = std::get<0>(chunkInfo);
auto numChunks = std::get<1>(chunkInfo);

@@ -365,7 +406,7 @@
};

auto it = states.begin();
for (ssize_t i = 0; i < numToLaunch; ++i) {
for (size_type i = 0; i < numToLaunch; ++i) {
taskSet.schedule([&s = *it++, worker]() { worker(s); });
}
worker(*it);
@@ -400,7 +441,7 @@
};

auto it = states.begin();
for (ssize_t i = 0; i < numToLaunch; ++i) {
for (size_type i = 0; i < numToLaunch; ++i) {
taskSet.schedule([&s = *it++, worker]() { worker(s); }, ForceQueuingTag());
}
}
41 changes: 41 additions & 0 deletions tests/chunked_for_test.cpp
@@ -8,6 +8,7 @@
#include <list>
#include <vector>

#include <dispenso/concurrent_vector.h>
#include <dispenso/parallel_for.h>
#include <gtest/gtest.h>

@@ -267,3 +268,43 @@ TEST(ChunkedFor, LoopSmallRangeWithStateWithExternalWait) {
}
EXPECT_EQ(total, (1 << 16) - 1);
}

void minChunkSize(dispenso::ParForChunking choice, int start, int end, int minSize) {
dispenso::ConcurrentVector<std::pair<int, int>> ranges;

dispenso::ThreadPool pool(16);

dispenso::ParForOptions options;
options.minItemsPerChunk = minSize;

dispenso::parallel_for(
dispenso::makeChunkedRange(start, end, choice),
[&ranges](int ystart, int yend) {
ranges.push_back({ystart, yend});
},
options);

EXPECT_GE(ranges.size(), 1);

for (auto& r : ranges) {
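// Chunks that do not touch the end of the range must hold at least minSize
// items; the final chunk is allowed to be smaller.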
if (r.second != end) {
EXPECT_LE(minSize, r.second - r.first);
}
}
}

TEST(ChunkedFor, MinChunkSizeLoopAuto) {
minChunkSize(dispenso::ParForChunking::kAuto, 0, 1000000, 200);
minChunkSize(dispenso::ParForChunking::kAuto, 0, 100, 200);
minChunkSize(dispenso::ParForChunking::kAuto, 10000, 10020, 200);
minChunkSize(dispenso::ParForChunking::kAuto, 1000000, 10000000, 20000);
minChunkSize(dispenso::ParForChunking::kAuto, -10000000, -1000000, 20000);
}

TEST(ChunkedFor, MinChunkSizeLoopStatic) {
minChunkSize(dispenso::ParForChunking::kStatic, 0, 1000000, 200);
minChunkSize(dispenso::ParForChunking::kStatic, 0, 100, 200);
minChunkSize(dispenso::ParForChunking::kStatic, 10000, 10020, 200);
minChunkSize(dispenso::ParForChunking::kStatic, 1000000, 10000000, 20000);
minChunkSize(dispenso::ParForChunking::kStatic, -10000000, -1000000, 20000);
}
14 changes: 14 additions & 0 deletions tests/greedy_for_test.cpp
@@ -359,3 +359,17 @@ TEST(GreedyFor, ZeroThreadsWithState) {
dispenso::ThreadPool pool(0);
loopWithStateImpl<std::vector<int64_t>>(pool);
}

TEST(GreedyFor, SimpleLoopFewerItemsThanThreads) {
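// Only 3 outer iterations across a 5-thread pool: exercises the path where the
// range is smaller than the available thread count.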
int w = 1000;
int h = 3;
dispenso::ThreadPool pool(5);
std::vector<int> image(w * h, 7);

std::atomic<int64_t> sum(0);

dispenso::TaskSet tasks(pool);
dispenso::parallel_for(tasks, 0, h, [w, &image, &sum](int y) { simpleInner(w, y, image, sum); });

EXPECT_EQ(sum.load(std::memory_order_relaxed), w * h * 7);
}
