From 9cf0ef0f59485a31e871a2311278af08a980cd0e Mon Sep 17 00:00:00 2001
From: Jimmy Lu
Date: Fri, 9 Feb 2024 14:46:46 -0800
Subject: [PATCH] Fix HashBuild unspilling stuck (#8715)

Summary:
Pull Request resolved: https://github.com/facebookincubator/velox/pull/8715

When the input of `HashBuild` comes from spilling, all the rows belong to the
same spill partition, so the spill partition bits are all the same for the
hashes of these rows. If the hash table is large, the hash bits used to
calculate the bucket index can overlap the bits used for spill partitioning.
Because these overlapping bits are high bits and are fixed across all rows, we
end up restricted to a small region of the hash table. This results in heavy
hash collisions, so the hash build takes a very long time and blocks driver
threads.

Fix this by adding a check that the spill partitioning bits do not overlap the
bits used for bucket indexing, and by increasing the default spill start
partition bit to 48. (Illustrative sketches of the failure mode and of the new
check are included after the relevant hunks below.)

Reviewed By: oerling

Differential Revision: D53589502

fbshipit-source-id: 969fe24f09a04ea3abaa4ff750de4541e438d988
---
 velox/core/QueryConfig.h                      |  2 +-
 velox/exec/GroupingSet.cpp                    |  7 ++-
 velox/exec/HashBuild.cpp                      |  4 +-
 velox/exec/HashTable.cpp                      | 11 +++--
 velox/exec/HashTable.h                        | 44 +++++++++++++++----
 velox/exec/RowNumber.cpp                      | 13 ++++--
 velox/exec/TopNRowNumber.cpp                  |  7 ++-
 velox/exec/tests/HashJoinTest.cpp             | 20 +++++++++
 velox/exec/tests/utils/ArbitratorTestUtil.cpp |  1 +
 9 files changed, 90 insertions(+), 19 deletions(-)

diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h
index 774c87f52517..39bf52cfdab9 100644
--- a/velox/core/QueryConfig.h
+++ b/velox/core/QueryConfig.h
@@ -578,7 +578,7 @@ class QueryConfig {
   /// calculate the spilling partition number for join spill or aggregation
   /// spill.
   uint8_t spillStartPartitionBit() const {
-    constexpr uint8_t kDefaultStartBit = 29;
+    constexpr uint8_t kDefaultStartBit = 48;
     return get<uint8_t>(kSpillStartPartitionBit, kDefaultStartBit);
   }
 
diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp
index e45f4e5cb8c6..a0fb9136a89a 100644
--- a/velox/exec/GroupingSet.cpp
+++ b/velox/exec/GroupingSet.cpp
@@ -224,7 +224,12 @@ void GroupingSet::addInputForActiveRows(
   TestValue::adjust(
       "facebook::velox::exec::GroupingSet::addInputForActiveRows", this);
 
-  table_->prepareForGroupProbe(*lookup_, input, activeRows_, ignoreNullKeys_);
+  table_->prepareForGroupProbe(
+      *lookup_,
+      input,
+      activeRows_,
+      ignoreNullKeys_,
+      BaseHashTable::kNoSpillInputStartPartitionBit);
   if (lookup_->rows.empty()) {
     // No rows to probe. Can happen when ignoreNullKeys_ is true and all rows
     // have null keys.
diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp
index ce8a2f76ceb8..bfeb1cd6ff4e 100644
--- a/velox/exec/HashBuild.cpp
+++ b/velox/exec/HashBuild.cpp
@@ -823,7 +823,9 @@ bool HashBuild::finishHashBuild() {
   table_->prepareJoinTable(
       std::move(otherTables),
       allowParallelJoinBuild ? operatorCtx_->task()->queryCtx()->executor()
-                             : nullptr);
+                             : nullptr,
+      isInputFromSpill() ? spillConfig()->startPartitionBit
+                         : BaseHashTable::kNoSpillInputStartPartitionBit);
   addRuntimeStats();
   if (joinBridge_->setHashTable(
           std::move(table_), std::move(spillPartitions), joinHasNullKeys_)) {
diff --git a/velox/exec/HashTable.cpp b/velox/exec/HashTable.cpp
index ce89fe1d60dd..d09b3bc041f4 100644
--- a/velox/exec/HashTable.cpp
+++ b/velox/exec/HashTable.cpp
@@ -1607,7 +1607,8 @@ bool mayUseValueIds(const BaseHashTable& table) {
 template <bool ignoreNullKeys>
 void HashTable<ignoreNullKeys>::prepareJoinTable(
     std::vector<std::unique_ptr<BaseHashTable>> tables,
-    folly::Executor* executor) {
+    folly::Executor* executor,
+    int8_t spillInputStartPartitionBit) {
   buildExecutor_ = executor;
   otherTables_.reserve(tables.size());
   for (auto& table : tables) {
@@ -1650,6 +1651,7 @@ void HashTable<ignoreNullKeys>::prepareJoinTable(
   } else {
     decideHashMode(0);
   }
+  checkHashBitsOverlap(spillInputStartPartitionBit);
 }
 
 template <bool ignoreNullKeys>
@@ -1982,7 +1984,9 @@ void BaseHashTable::prepareForGroupProbe(
     HashLookup& lookup,
     const RowVectorPtr& input,
     SelectivityVector& rows,
-    bool ignoreNullKeys) {
+    bool ignoreNullKeys,
+    int8_t spillInputStartPartitionBit) {
+  checkHashBitsOverlap(spillInputStartPartitionBit);
   auto& hashers = lookup.hashers;
 
   for (auto& hasher : hashers) {
@@ -2015,7 +2019,8 @@ void BaseHashTable::prepareForGroupProbe(
     decideHashMode(input->size());
     // Do not forward 'ignoreNullKeys' to avoid redundant evaluation of
     // deselectRowsWithNulls.
-    prepareForGroupProbe(lookup, input, rows, false);
+    prepareForGroupProbe(
+        lookup, input, rows, false, spillInputStartPartitionBit);
     return;
   }
 }
diff --git a/velox/exec/HashTable.h b/velox/exec/HashTable.h
index 5dc6e128934c..eec394caf599 100644
--- a/velox/exec/HashTable.h
+++ b/velox/exec/HashTable.h
@@ -121,6 +121,8 @@ class BaseHashTable {
   /// Specifies the hash mode of a table.
   enum class HashMode { kHash, kArray, kNormalizedKey };
 
+  static constexpr int8_t kNoSpillInputStartPartitionBit = -1;
+
   /// Returns the string of the given 'mode'.
   static std::string modeString(HashMode mode);
 
@@ -181,7 +183,8 @@ class BaseHashTable {
       HashLookup& lookup,
       const RowVectorPtr& input,
       SelectivityVector& rows,
-      bool ignoreNullKeys);
+      bool ignoreNullKeys,
+      int8_t spillInputStartPartitionBit);
 
   /// Finds or creates a group for each key in 'lookup'. The keys are
   /// returned in 'lookup.hits'.
@@ -248,7 +251,8 @@ class BaseHashTable {
   virtual void prepareJoinTable(
       std::vector<std::unique_ptr<BaseHashTable>> tables,
-      folly::Executor* executor = nullptr) = 0;
+      folly::Executor* executor = nullptr,
+      int8_t spillInputStartPartitionBit = kNoSpillInputStartPartitionBit) = 0;
 
   /// Returns the memory footprint in bytes for any data structures
   /// owned by 'this'.
@@ -328,7 +332,12 @@ class BaseHashTable {
   /// Extracts a 7 bit tag from a hash number. The high bit is always set.
   static uint8_t hashTag(uint64_t hash) {
-    return static_cast<uint8_t>(hash >> 32) | 0x80;
+    // This is likely all 0 for small key types (<= 32 bits). Not an issue
+    // because small types have a range that makes them normalized key cases.
+    // If there are multiple small type keys, they are mixed, which makes the
+    // result a 64-bit hash. Normalized keys are mixed before being used as
+    // hash numbers.
+    return static_cast<uint8_t>(hash >> 38) | 0x80;
   }
 
   /// Loads a vector of tags for bulk comparison. Disables tsan errors.
@@ -365,6 +374,20 @@ class BaseHashTable {
 
   virtual void setHashMode(HashMode mode, int32_t numNew) = 0;
 
+  virtual int sizeBits() const = 0;
+
+  // We don't want any overlap between the bit ranges used for the bucket
+  // index and those used for spill partitioning; otherwise, because we
+  // receive data from only one partition, the overlapping bits would be the
+  // same for all rows and only a fraction of the buckets would be used. This
+  // would cause insertion to take a very long time and block driver threads.
+  void checkHashBitsOverlap(int8_t spillInputStartPartitionBit) {
+    if (spillInputStartPartitionBit != kNoSpillInputStartPartitionBit &&
+        hashMode() != HashMode::kArray) {
+      VELOX_CHECK_LE(sizeBits(), spillInputStartPartitionBit);
+    }
+  }
+
   std::vector<std::unique_ptr<VectorHasher>> hashers_;
   std::unique_ptr<RowContainer> rows_;
 
@@ -525,7 +548,9 @@ class HashTable : public BaseHashTable {
   // and VectorHashers and decides the hash mode and representation.
   void prepareJoinTable(
       std::vector<std::unique_ptr<BaseHashTable>> tables,
-      folly::Executor* executor = nullptr) override;
+      folly::Executor* executor = nullptr,
+      int8_t spillInputStartPartitionBit =
+          kNoSpillInputStartPartitionBit) override;
 
   uint64_t hashTableSizeIncrease(int32_t numNewDistinct) const override {
     if (numDistinct_ + numNewDistinct > rehashSize()) {
@@ -587,10 +612,6 @@ class HashTable : public BaseHashTable {
   // occupy exactly two (64 bytes) cache lines.
   class Bucket {
    public:
-    Bucket() {
-      static_assert(sizeof(Bucket) == 128);
-    }
-
     uint8_t tagAt(int32_t slotIndex) {
       return reinterpret_cast<uint8_t*>(&tags_)[slotIndex];
     }
@@ -622,6 +643,7 @@ class HashTable : public BaseHashTable {
     char padding_[16];
   };
 
+  static_assert(sizeof(Bucket) == 128);
   static constexpr uint64_t kBucketSize = sizeof(Bucket);
 
   // Returns the bucket at byte offset 'offset' from 'table_'.
@@ -881,6 +903,10 @@ class HashTable : public BaseHashTable {
     }
   }
 
+  int sizeBits() const final {
+    return sizeBits_;
+  }
+
   // The min table size in row to trigger parallel join table build.
   const uint32_t minTableSizeForParallelJoinBuild_;
 
@@ -938,7 +964,7 @@ class HashTable : public BaseHashTable {
   // Executor for parallelizing hash join build. This may be the
   // executor for Drivers. If this executor is indefinitely taken by
-  // other work, the thread of prepareJoinTables() will sequentially
+  // other work, the thread of prepareJoinTable() will sequentially
   // execute the parallel build steps.
   folly::Executor* buildExecutor_{nullptr};
 
diff --git a/velox/exec/RowNumber.cpp b/velox/exec/RowNumber.cpp
index f753cde0e38f..04f289c818ee 100644
--- a/velox/exec/RowNumber.cpp
+++ b/velox/exec/RowNumber.cpp
@@ -78,7 +78,12 @@ void RowNumber::addInput(RowVectorPtr input) {
   }
 
   SelectivityVector rows(numInput);
-  table_->prepareForGroupProbe(*lookup_, input, rows, false);
+  table_->prepareForGroupProbe(
+      *lookup_,
+      input,
+      rows,
+      false,
+      BaseHashTable::kNoSpillInputStartPartitionBit);
   table_->groupProbe(*lookup_);
 
   // Initialize new partitions with zeros.
@@ -93,7 +98,8 @@ void RowNumber::addInput(RowVectorPtr input) {
 void RowNumber::addSpillInput() {
   const auto numInput = input_->size();
   SelectivityVector rows(numInput);
-  table_->prepareForGroupProbe(*lookup_, input_, rows, false);
+  table_->prepareForGroupProbe(
+      *lookup_, input_, rows, false, spillConfig_->startPartitionBit);
   table_->groupProbe(*lookup_);
 
   // Initialize new partitions with zeros.
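[Illustrative aside, not part of the patch] The checkHashBitsOverlap() guard
above exists because rows restored from a single spill partition all carry the
same spill partition bits. The standalone C++ sketch below uses made-up
constants (kTableSizeBits, kSpillStartBit, kSpillBits) rather than Velox's
real values; it shows how an overlap between the bucket-index bit range and
the spill partition bit range confines such rows to a fraction of the buckets.

#include <cstdint>
#include <iostream>
#include <random>
#include <unordered_set>

int main() {
  constexpr int kTableSizeBits = 16;   // bucket index uses hash bits [0, 16)
  constexpr int kSpillStartBit = 13;   // spill partitioning uses bits [13, 16)
  constexpr int kSpillBits = 3;
  constexpr uint64_t kNumBuckets = 1ull << kTableSizeBits;

  // When unspilling, all rows come from one spill partition, so the spill
  // partition bits of their hashes are identical. Model that by pinning those
  // bits to zero (partition 0).
  const uint64_t spillMask = ((1ull << kSpillBits) - 1) << kSpillStartBit;

  std::mt19937_64 gen(42);
  std::unordered_set<uint64_t> bucketsHit;
  for (int i = 0; i < 50'000; ++i) {
    const uint64_t hash = gen() & ~spillMask;
    bucketsHit.insert(hash & (kNumBuckets - 1));
  }

  // Because bits [13, 16) overlap the bucket-index range [0, 16), at most
  // 2^13 = 8192 of the 65536 buckets are reachable: the table behaves as if
  // it were 8x smaller, and collisions grow accordingly.
  std::cout << "reachable buckets: " << bucketsHit.size() << " of "
            << kNumBuckets << "\n";

  // The patch rejects such configurations up front: the bucket-index bits
  // must end at or before the first spill partition bit.
  std::cout << "overlap detected: " << std::boolalpha
            << (kTableSizeBits > kSpillStartBit) << "\n";
  return 0;
}

Run as-is, this reports roughly 8,000 reachable buckets out of 65,536: the
table effectively shrinks by a factor of 2 for every overlapping bit.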
@@ -157,7 +163,8 @@ void RowNumber::restoreNextSpillPartition() {
   const auto numInput = input->size();
 
   SelectivityVector rows(numInput);
-  table_->prepareForGroupProbe(*lookup_, input, rows, false);
+  table_->prepareForGroupProbe(
+      *lookup_, input, rows, false, spillConfig_->startPartitionBit);
   table_->groupProbe(*lookup_);
 
   auto* counts = data->children().back()->as<FlatVector<int64_t>>();
diff --git a/velox/exec/TopNRowNumber.cpp b/velox/exec/TopNRowNumber.cpp
index ceb9dc2131e3..5ad9184c0bd5 100644
--- a/velox/exec/TopNRowNumber.cpp
+++ b/velox/exec/TopNRowNumber.cpp
@@ -191,7 +191,12 @@ void TopNRowNumber::addInput(RowVectorPtr input) {
   ensureInputFits(input);
 
   SelectivityVector rows(numInput);
-  table_->prepareForGroupProbe(*lookup_, input, rows, false);
+  table_->prepareForGroupProbe(
+      *lookup_,
+      input,
+      rows,
+      false,
+      BaseHashTable::kNoSpillInputStartPartitionBit);
   table_->groupProbe(*lookup_);
 
   // Initialize new partitions.
diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp
index 573c618989b4..08dc5b8f7604 100644
--- a/velox/exec/tests/HashJoinTest.cpp
+++ b/velox/exec/tests/HashJoinTest.cpp
@@ -5038,6 +5038,22 @@ TEST_F(HashJoinTest, spillFileSize) {
   }
 }
 
+TEST_F(HashJoinTest, spillPartitionBitsOverlap) {
+  auto builder =
+      HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
+          .numDrivers(numDrivers_)
+          .keyTypes({BIGINT(), BIGINT()})
+          .probeVectors(2'000, 3)
+          .buildVectors(2'000, 3)
+          .referenceQuery(
+              "SELECT t_k0, t_k1, t_data, u_k0, u_k1, u_data FROM t, u WHERE t_k0 = u_k0 and t_k1 = u_k1")
+          .config(core::QueryConfig::kSpillStartPartitionBit, "8")
+          .config(core::QueryConfig::kJoinSpillPartitionBits, "1")
+          .checkSpillStats(false)
+          .maxSpillLevel(0);
+  VELOX_ASSERT_THROW(builder.run(), "vs. 8");
+}
+
 // The test is to verify if the hash build reservation has been released on
 // task error.
 DEBUG_ONLY_TEST_F(HashJoinTest, buildReservationReleaseCheck) {
@@ -5242,6 +5258,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringInputProcessing) {
           .spillDirectory(testData.spillEnabled ? tempDirectory->path : "")
          .referenceQuery(
              "SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1")
+         .config(core::QueryConfig::kSpillStartPartitionBit, "29")
          .verifier([&](const std::shared_ptr<Task>& task, bool /*unused*/) {
            const auto statsPair = taskSpilledStats(*task);
            if (testData.expectedReclaimable) {
@@ -5394,6 +5411,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringReserve) {
          .spillDirectory(tempDirectory->path)
          .referenceQuery(
              "SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1")
+         .config(core::QueryConfig::kSpillStartPartitionBit, "29")
          .verifier([&](const std::shared_ptr<Task>& task, bool /*unused*/) {
            const auto statsPair = taskSpilledStats(*task);
            ASSERT_GT(statsPair.first.spilledBytes, 0);
@@ -5788,6 +5806,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringWaitForProbe) {
          .spillDirectory(tempDirectory->path)
          .referenceQuery(
              "SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1")
+         .config(core::QueryConfig::kSpillStartPartitionBit, "29")
          .verifier([&](const std::shared_ptr<Task>& task, bool /*unused*/) {
            const auto statsPair = taskSpilledStats(*task);
            ASSERT_GT(statsPair.first.spilledBytes, 0);
@@ -6351,6 +6370,7 @@ TEST_F(HashJoinTest, exceededMaxSpillLevel) {
          .spillDirectory(tempDirectory->path)
          .referenceQuery(
              "SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1")
+         .config(core::QueryConfig::kSpillStartPartitionBit, "29")
          .verifier([&](const std::shared_ptr<Task>& task, bool /*unused*/) {
            auto joinStats = task->taskStats()
                                 .pipelineStats.back()
diff --git a/velox/exec/tests/utils/ArbitratorTestUtil.cpp b/velox/exec/tests/utils/ArbitratorTestUtil.cpp
index aeb75584beab..62761c9f33d8 100644
--- a/velox/exec/tests/utils/ArbitratorTestUtil.cpp
+++ b/velox/exec/tests/utils/ArbitratorTestUtil.cpp
@@ -99,6 +99,7 @@ QueryTestResult runHashJoinTask(
             .spillDirectory(spillDirectory->path)
             .config(core::QueryConfig::kSpillEnabled, true)
             .config(core::QueryConfig::kJoinSpillEnabled, true)
+            .config(core::QueryConfig::kSpillStartPartitionBit, "29")
             .queryCtx(queryCtx)
             .maxDrivers(numDrivers)
             .copyResults(pool, result.task);
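[Illustrative aside, not part of the patch] A minimal sketch of the guard the
patch adds, paraphrased as a free function rather than the real
BaseHashTable::checkHashBitsOverlap() member. Here 'tableSizeBits' stands in
for sizeBits(), 'arrayMode' for hashMode() == HashMode::kArray, and the
std::runtime_error stands in for the VELOX_CHECK_LE failure.

#include <cstdint>
#include <stdexcept>
#include <string>

constexpr int8_t kNoSpillInputStartPartitionBit = -1;

void checkHashBitsOverlap(
    int32_t tableSizeBits,
    int8_t spillInputStartPartitionBit,
    bool arrayMode) {
  // No check when the input did not come from spilling, or when the table is
  // in array mode and does not hash into buckets at all.
  if (spillInputStartPartitionBit == kNoSpillInputStartPartitionBit ||
      arrayMode) {
    return;
  }
  // Mirrors VELOX_CHECK_LE(sizeBits(), spillInputStartPartitionBit): the
  // bucket-index bits must end at or before the first spill partition bit.
  if (tableSizeBits > spillInputStartPartitionBit) {
    throw std::runtime_error(
        std::to_string(tableSizeBits) + " vs. " +
        std::to_string(spillInputStartPartitionBit));
  }
}

The new spillPartitionBitsOverlap test drives exactly this path: it forces
kSpillStartPartitionBit down to 8 so that the join table trips the check, and
it matches the "vs. 8" fragment of the resulting error message. The
pre-existing spill tests instead pin the bit to 29, the previous default,
presumably to keep their original spill-partitioning setup now that the
default has moved to 48.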