Skip to content

Commit

Permalink
RFC Add TaskFlow pipeline to pipeline benchmarks
Browse files Browse the repository at this point in the history
Summary: As title.  Unfortunately, the parallel version of this crashes (or hangs in TSAN mode).  Suggestions welcome.

Reviewed By: elliotsegal-fb

Differential Revision: D66110306

fbshipit-source-id: b5e6d27b89facb41c0be0fafcc72b0c9f3479716
  • Loading branch information
graphicsMan authored and facebook-github-bot committed Nov 27, 2024
1 parent bdef795 commit c3a7a3d
Showing 1 changed file with 115 additions and 0 deletions.
115 changes: 115 additions & 0 deletions benchmarks/pipeline_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
#include "tbb/pipeline.h"
#endif // !BENCHMARK_WITHOUT_TBB

#include <taskflow/taskflow.hpp>
#if TF_VERSION > 300000
#include <taskflow/algorithm/pipeline.hpp>
#endif // TF_VERSION

// (1) Generate images
// (2) Calculate geometric mean
// (3) Tonemap based on geometric mean
Expand All @@ -31,6 +36,16 @@ constexpr size_t kSeed = 55;
struct Work {
// Creates the work item for image number `idx`; the remaining members keep their
// in-class defaults until a pipeline stage fills them in.
Work(size_t idx) : index{idx} {}

// Move constructor: copies the scalar fields and takes over ownership of the source's
// input image buffer (the source's inputImage is left null).
// Declared noexcept because every operation here is non-throwing; this lets standard
// containers move Work objects instead of falling back to copies.
Work(Work&& w) noexcept
    : index(w.index), geometricMean(w.geometricMean), inputImage(std::move(w.inputImage)) {}

// Move assignment: copies the scalar fields and takes over ownership of the source's
// input image buffer, releasing whatever buffer this object previously held.
// Declared noexcept because scalar assignment and unique_ptr move assignment cannot throw.
Work& operator=(Work&& w) noexcept {
  index = w.index;
  geometricMean = w.geometricMean;
  inputImage = std::move(w.inputImage);
  return *this;
}

size_t index;
double geometricMean = 0;
std::unique_ptr<uint16_t[]> inputImage;
Expand Down Expand Up @@ -253,14 +268,114 @@ void BM_tbb_par(benchmark::State& state) {
}
#endif // !BENCHMARK_WITHOUT_TBB

// Runs the three-stage image pipeline on Taskflow with every stage SERIAL.
//
//   stage 1: generate image `token` via fillImage
//   stage 2: computeGeometricMean on that image
//   stage 3: tonemap it into results[token]
//
// results is resized to kNumImages and fully overwritten. exec is the executor the
// pipeline runs on; the call blocks until the whole pipeline has finished.
//
// Each stage addresses its work item by pf.token(), the sequence number the pipeline
// assigns to the item in stage 1, so no separate per-stage counters are needed.
void runTaskflow(std::vector<std::unique_ptr<uint8_t[]>>& results, tf::Executor& exec) {
  results.resize(kNumImages);
  // Pre-size with one empty slot per token. Stage 1 then only assigns into an existing
  // slot, so the vector itself is never modified while later stages index it from
  // other worker threads.
  std::vector<std::unique_ptr<Work>> work(kNumImages);

  tf::Taskflow taskflow;

  tf::Pipeline pl(
      std::thread::hardware_concurrency(),
      tf::Pipe{
          tf::PipeType::SERIAL,
          [&work](auto& pf) {
            const size_t index = pf.token();
            if (index < kNumImages) {
              work[index] = std::make_unique<Work>(fillImage(Work(index)));
            } else {
              // All images have been generated; stop scheduling new tokens.
              pf.stop();
            }
          }},
      tf::Pipe{
          tf::PipeType::SERIAL,
          [&work](auto& pf) {
            Work& w = *work[pf.token()];
            w = computeGeometricMean(std::move(w));
          }},
      tf::Pipe{tf::PipeType::SERIAL, [&work, &results](auto& pf) {
                 const size_t index = pf.token();
                 results[index] = tonemap(std::move(*work[index]));
               }});
  taskflow.composed_of(pl);
  exec.run(taskflow).wait();
}

// Benchmark driver for the all-serial Taskflow pipeline.
// The executor is built once, outside the timed loop, and reused by every iteration;
// the output of the final iteration is handed to checkResults.
void BM_taskflow(benchmark::State& state) {
  std::vector<std::unique_ptr<uint8_t[]>> tonemapped;
  const auto numThreads = std::thread::hardware_concurrency();
  tf::Executor pool(numThreads);

  for (auto UNUSED_VAR : state) {
    runTaskflow(tonemapped, pool);
  }

  checkResults(tonemapped);
}

// Runs the three-stage image pipeline on Taskflow with stages 2 and 3 PARALLEL.
//
//   stage 1 (SERIAL):   generate image `token` via fillImage
//   stage 2 (PARALLEL): computeGeometricMean on that image
//   stage 3 (PARALLEL): tonemap it into results[token]
//
// results is resized to kNumImages and fully overwritten. exec is the executor the
// pipeline runs on; the call blocks until the whole pipeline has finished.
//
// Every stage addresses its work item by pf.token(), the sequence number the pipeline
// assigns to the item in stage 1. This matters for the parallel stages: tokens may
// reach them in any order, so an index drawn from an independent per-stage counter is
// unrelated to the token being processed. A stage could then pick up (and move from)
// an item that another token is still working on in an earlier stage.
void runTaskflowPar(std::vector<std::unique_ptr<uint8_t[]>>& results, tf::Executor& exec) {
  results.resize(kNumImages);
  // Pre-size with one empty slot per token. Stage 1 then only assigns into an existing
  // slot, so the vector itself is never modified while the parallel stages index it
  // from other worker threads.
  std::vector<std::unique_ptr<Work>> work(kNumImages);

  tf::Taskflow taskflow;

  tf::Pipeline pl(
      std::thread::hardware_concurrency(),
      tf::Pipe{
          tf::PipeType::SERIAL,
          [&work](auto& pf) {
            const size_t index = pf.token();
            if (index < kNumImages) {
              work[index] = std::make_unique<Work>(fillImage(Work(index)));
            } else {
              // All images have been generated; stop scheduling new tokens.
              pf.stop();
            }
          }},
      tf::Pipe{
          tf::PipeType::PARALLEL,
          [&work](auto& pf) {
            Work& w = *work[pf.token()];
            w = computeGeometricMean(std::move(w));
          }},
      tf::Pipe{tf::PipeType::PARALLEL, [&work, &results](auto& pf) {
                 const size_t index = pf.token();
                 results[index] = tonemap(std::move(*work[index]));
               }});

  taskflow.composed_of(pl);
  exec.run(taskflow).wait();
}

// Benchmark driver for the Taskflow pipeline with parallel stages.
// The executor is built once, outside the timed loop, and reused by every iteration;
// the output of the final iteration is handed to checkResults.
void BM_taskflow_par(benchmark::State& state) {
  std::vector<std::unique_ptr<uint8_t[]>> tonemapped;
  const auto numThreads = std::thread::hardware_concurrency();
  tf::Executor pool(numThreads);

  for (auto UNUSED_VAR : state) {
    runTaskflowPar(tonemapped, pool);
  }

  checkResults(tonemapped);
}

// Benchmark registrations. UseRealTime() makes each benchmark measure wall-clock time
// rather than CPU time. The TBB entries are compiled out when BENCHMARK_WITHOUT_TBB is
// defined.

// First group: the variants without the `_par` suffix.
BENCHMARK(BM_serial)->UseRealTime();
BENCHMARK(BM_dispenso)->UseRealTime();
#if !defined(BENCHMARK_WITHOUT_TBB)
BENCHMARK(BM_tbb)->UseRealTime();
#endif // !BENCHMARK_WITHOUT_TBB
BENCHMARK(BM_taskflow)->UseRealTime();

// Second group: the `_par` variants.
BENCHMARK(BM_dispenso_par)->UseRealTime();
#if !defined(BENCHMARK_WITHOUT_TBB)
BENCHMARK(BM_tbb_par)->UseRealTime();
#endif // !BENCHMARK_WITHOUT_TBB

// TODO(bbudge): Re-enable once this is fixed.
// BM_taskflow_par is deliberately left unregistered until then.
// BENCHMARK(BM_taskflow_par)->UseRealTime();

// Expands to the program's main(), which runs the registered benchmarks.
BENCHMARK_MAIN();

0 comments on commit c3a7a3d

Please sign in to comment.