From c3a7a3d5323778ff31dbab7a7ee6c0b7f0f945c6 Mon Sep 17 00:00:00 2001
From: Brian Budge
Date: Tue, 26 Nov 2024 17:21:21 -0800
Subject: [PATCH] RFC Add TaskFlow pipeline to pipeline benchmarks

Summary:
As title.

The parallel variant indexes every stage by the pipeline token (pf.token())
rather than by per-stage counters. With counters, a parallel stage 3 call
could claim a work item whose stage 2 call was still running on another
line, which matches the crash (or hang in TSAN mode) seen with the first
version of this change.

Reviewed By: elliotsegal-fb

Differential Revision: D66110306

fbshipit-source-id: b5e6d27b89facb41c0be0fafcc72b0c9f3479716
---
Note: the text this patch was recovered from had lost its line breaks, its
include targets and its template arguments. The taskflow header names, the
element types (std::unique_ptr<uint16_t[]>, std::unique_ptr<uint8_t[]>) and the
position of blank context lines are assumed from usage; confirm them against
benchmarks/pipeline_benchmark.cpp before applying.

 benchmarks/pipeline_benchmark.cpp | 119 ++++++++++++++++++++++++++++++
 1 file changed, 119 insertions(+)

diff --git a/benchmarks/pipeline_benchmark.cpp b/benchmarks/pipeline_benchmark.cpp
index 676813b..44b1168 100644
--- a/benchmarks/pipeline_benchmark.cpp
+++ b/benchmarks/pipeline_benchmark.cpp
@@ -19,6 +19,11 @@
 #include "tbb/pipeline.h"
 #endif // !BENCHMARK_WITHOUT_TBB
 
+#include <taskflow/taskflow.hpp>
+#if TF_VERSION > 300000
+#include <taskflow/algorithm/pipeline.hpp>
+#endif // TF_VERSION
+
 // (1) Generate images
 // (2) Calculate geometric mean
 // (3) Tonemap based on geometric mean
@@ -31,6 +36,16 @@ constexpr size_t kSeed = 55;
 struct Work {
   Work(size_t idx) : index(idx) {}
 
+  Work(Work&& w) noexcept
+      : index(w.index), geometricMean(w.geometricMean), inputImage(std::move(w.inputImage)) {}
+
+  Work& operator=(Work&& w) noexcept {
+    index = w.index;
+    geometricMean = w.geometricMean;
+    inputImage = std::move(w.inputImage);
+    return *this;
+  }
+
   size_t index;
   double geometricMean = 0;
   std::unique_ptr<uint16_t[]> inputImage;
@@ -253,14 +268,118 @@ void BM_tbb_par(benchmark::State& state) {
 }
 #endif // !BENCHMARK_WITHOUT_TBB
 
+// Runs the three stages (fill image -> geometric mean -> tonemap) as a Taskflow pipeline in
+// which every stage is SERIAL. Each counter is touched by exactly one stage, so plain
+// (non-atomic) counters are used to pair a stage call with its work item.
+void runTaskflow(std::vector<std::unique_ptr<uint8_t[]>>& results, tf::Executor& exec) {
+  results.resize(kNumImages);
+  std::vector<std::unique_ptr<Work>> work;
+  // Ensure we don't resize underlying buffer causing data races
+  work.reserve(kNumImages);
+
+  tf::Taskflow taskflow;
+
+  size_t counter2 = 0;
+  size_t counter3 = 0;
+  tf::Pipeline pl(
+      std::thread::hardware_concurrency(),
+      tf::Pipe{
+          tf::PipeType::SERIAL,
+          [&work](auto& pf) mutable {
+            if (work.size() < kNumImages) {
+              work.push_back(std::make_unique<Work>(fillImage(Work(work.size()))));
+            } else {
+              pf.stop();
+            }
+          }},
+      tf::Pipe{
+          tf::PipeType::SERIAL,
+          [&counter2, &work](auto& pf) mutable {
+            Work& w = *work[counter2++];
+            w = computeGeometricMean(std::move(w));
+          }},
+      tf::Pipe{tf::PipeType::SERIAL, [&counter3, &work, &results](auto& pf) mutable {
+                 Work& w = *work[counter3];
+                 results[counter3++] = tonemap(std::move(w));
+               }});
+  taskflow.composed_of(pl);
+  exec.run(taskflow).wait();
+}
+
+// Benchmarks the all-serial Taskflow pipeline; one executor is reused across iterations.
+void BM_taskflow(benchmark::State& state) {
+  std::vector<std::unique_ptr<uint8_t[]>> results;
+  tf::Executor executor(std::thread::hardware_concurrency());
+
+  for (auto UNUSED_VAR : state) {
+    runTaskflow(results, executor);
+  }
+
+  checkResults(results);
+}
+
+// Same pipeline with PARALLEL second and third stages. Parallel stages can run several tokens
+// at once and finish them out of order, so every stage indexes by pf.token(): token t creates,
+// updates and consumes work[t] only. Per-stage counters do not work here, because stage 3
+// could claim an item whose stage 2 call was still running on another line.
+void runTaskflowPar(std::vector<std::unique_ptr<uint8_t[]>>& results, tf::Executor& exec) {
+  results.resize(kNumImages);
+  // Pre-sized so stages only ever write distinct elements, never the vector itself.
+  std::vector<std::unique_ptr<Work>> work(kNumImages);
+
+  tf::Taskflow taskflow;
+
+  tf::Pipeline pl(
+      std::thread::hardware_concurrency(),
+      tf::Pipe{
+          tf::PipeType::SERIAL,
+          [&work](auto& pf) {
+            const size_t index = pf.token();
+            if (index < kNumImages) {
+              work[index] = std::make_unique<Work>(fillImage(Work(index)));
+            } else {
+              pf.stop();
+            }
+          }},
+      tf::Pipe{
+          tf::PipeType::PARALLEL,
+          [&work](auto& pf) {
+            Work& w = *work[pf.token()];
+            w = computeGeometricMean(std::move(w));
+          }},
+      tf::Pipe{tf::PipeType::PARALLEL, [&work, &results](auto& pf) {
+                 const size_t index = pf.token();
+                 results[index] = tonemap(std::move(*work[index]));
+               }});
+
+  taskflow.composed_of(pl);
+  exec.run(taskflow).wait();
+}
+
+// Benchmarks the Taskflow pipeline with parallel second and third stages.
+void BM_taskflow_par(benchmark::State& state) {
+  std::vector<std::unique_ptr<uint8_t[]>> results;
+  tf::Executor executor(std::thread::hardware_concurrency());
+
+  for (auto UNUSED_VAR : state) {
+    runTaskflowPar(results, executor);
+  }
+
+  checkResults(results);
+}
+
 BENCHMARK(BM_serial)->UseRealTime();
 BENCHMARK(BM_dispenso)->UseRealTime();
 #if !defined(BENCHMARK_WITHOUT_TBB)
 BENCHMARK(BM_tbb)->UseRealTime();
 #endif // !BENCHMARK_WITHOUT_TBB
+BENCHMARK(BM_taskflow)->UseRealTime();
+
 BENCHMARK(BM_dispenso_par)->UseRealTime();
 #if !defined(BENCHMARK_WITHOUT_TBB)
 BENCHMARK(BM_tbb_par)->UseRealTime();
 #endif // !BENCHMARK_WITHOUT_TBB
 
+BENCHMARK(BM_taskflow_par)->UseRealTime();
+
 BENCHMARK_MAIN();