Fix HashBuild unspilling stuck (facebookincubator#8715)
Summary:
Pull Request resolved: facebookincubator#8715

When the input to `HashBuild` comes from spilling, all rows belong to the same
spill partition, so the spill partition bits are identical across the hashes of
these rows. If the hash table is large, the hash bits used to compute the
bucket index can overlap with the bits used for spill partitioning. Because the
overlapping bits are high bits and are fixed for every row, insertion is
confined to a small region of the hash table. This causes heavy hash
collisions: the hash build takes an extremely long time and blocks driver
threads.
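
To make the failure mode concrete, here is a standalone sketch (illustrative only, not Velox code; the table size, start bit, and partition-bit count are assumed values) that pins the spill partition bits of otherwise random hashes and counts how many distinct bucket indices stay reachable:

```cpp
#include <cstdint>
#include <iostream>
#include <random>
#include <unordered_set>

int main() {
  // Assumed constants: bucket indexing consumes the low 32 bits of the hash;
  // spill partitioning uses 3 bits starting at bit 29 (the old default).
  constexpr int kTableBits = 32;
  constexpr int kSpillStartBit = 29;
  constexpr int kSpillPartitionBits = 3;
  constexpr uint64_t kBucketMask = (uint64_t{1} << kTableBits) - 1;
  constexpr uint64_t kSpillMask = ((uint64_t{1} << kSpillPartitionBits) - 1)
      << kSpillStartBit;

  std::mt19937_64 rng(0);
  std::unordered_set<uint64_t> usedBuckets;
  for (int i = 0; i < 100'000; ++i) {
    uint64_t hash = rng();
    // Rows restored from one spill partition all carry the same partition
    // bits; model that by pinning those bits to a fixed value (here 0).
    hash &= ~kSpillMask;
    usedBuckets.insert(hash & kBucketMask);
  }
  // Bits 29..31 of every bucket index are now identical, so only 1/8 of the
  // bucket-index space is reachable and every insert piles into that region.
  std::cout << "distinct buckets used: " << usedBuckets.size()
            << " (reachable fraction: 1/" << (1 << kSpillPartitionBits)
            << ")\n";
}
```

With three fixed partition bits inside the bucket-index range, collision chains grow roughly eight times longer than they should.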

Fix this by adding a check that the spill partitioning bits and the bits used
for bucket indexing do not overlap, and by raising the default spill start
partition bit from 29 to 48.
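
The guard below is a simplified restatement of the `checkHashBitsOverlap()` helper this diff adds to `BaseHashTable` (see velox/exec/HashTable.h); the free-function shape and the `isArrayMode` parameter are flattened for illustration. Array-mode tables are exempt because they index by key value rather than by hash bits:

```cpp
#include <cstdint>
#include <stdexcept>

constexpr int8_t kNoSpillInputStartPartitionBit = -1;

// 'sizeBits' is the number of low hash bits consumed by bucket indexing.
// Input restored from a spill partition has every bit at position
// >= spillInputStartPartitionBit pinned, so the two ranges must be disjoint.
void checkHashBitsOverlap(
    int sizeBits,
    int8_t spillInputStartPartitionBit,
    bool isArrayMode) {
  if (spillInputStartPartitionBit != kNoSpillInputStartPartitionBit &&
      !isArrayMode && sizeBits > spillInputStartPartitionBit) {
    throw std::logic_error("bucket index bits overlap spill partition bits");
  }
}

int main() {
  checkHashBitsOverlap(20, 48, false); // ok: bucket bits stay below bit 48
  try {
    checkHashBitsOverlap(33, 29, false); // throws: bits 29..32 overlap
  } catch (const std::logic_error&) {
  }
  return 0;
}
```

Tests that need a low start bit (for example to exercise multiple spill levels) now pin `kSpillStartPartitionBit` explicitly, as several of the test diffs below do with the old default of 29.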

Reviewed By: oerling

Differential Revision: D53589502

fbshipit-source-id: 969fe24f09a04ea3abaa4ff750de4541e438d988
Yuhta authored and facebook-github-bot committed Feb 9, 2024
1 parent 6a40488 commit 9cf0ef0
Showing 9 changed files with 90 additions and 19 deletions.
2 changes: 1 addition & 1 deletion velox/core/QueryConfig.h
@@ -578,7 +578,7 @@ class QueryConfig {
   /// calculate the spilling partition number for join spill or aggregation
   /// spill.
   uint8_t spillStartPartitionBit() const {
-    constexpr uint8_t kDefaultStartBit = 29;
+    constexpr uint8_t kDefaultStartBit = 48;
     return get<uint8_t>(kSpillStartPartitionBit, kDefaultStartBit);
   }

7 changes: 6 additions & 1 deletion velox/exec/GroupingSet.cpp
@@ -224,7 +224,12 @@ void GroupingSet::addInputForActiveRows(
   TestValue::adjust(
       "facebook::velox::exec::GroupingSet::addInputForActiveRows", this);
 
-  table_->prepareForGroupProbe(*lookup_, input, activeRows_, ignoreNullKeys_);
+  table_->prepareForGroupProbe(
+      *lookup_,
+      input,
+      activeRows_,
+      ignoreNullKeys_,
+      BaseHashTable::kNoSpillInputStartPartitionBit);
   if (lookup_->rows.empty()) {
     // No rows to probe. Can happen when ignoreNullKeys_ is true and all rows
     // have null keys.
4 changes: 3 additions & 1 deletion velox/exec/HashBuild.cpp
@@ -823,7 +823,9 @@ bool HashBuild::finishHashBuild() {
   table_->prepareJoinTable(
       std::move(otherTables),
       allowParallelJoinBuild ? operatorCtx_->task()->queryCtx()->executor()
-                             : nullptr);
+                             : nullptr,
+      isInputFromSpill() ? spillConfig()->startPartitionBit
+                         : BaseHashTable::kNoSpillInputStartPartitionBit);
   addRuntimeStats();
   if (joinBridge_->setHashTable(
           std::move(table_), std::move(spillPartitions), joinHasNullKeys_)) {
11 changes: 8 additions & 3 deletions velox/exec/HashTable.cpp
@@ -1607,7 +1608,8 @@ bool mayUseValueIds(const BaseHashTable& table) {
 template <bool ignoreNullKeys>
 void HashTable<ignoreNullKeys>::prepareJoinTable(
     std::vector<std::unique_ptr<BaseHashTable>> tables,
-    folly::Executor* executor) {
+    folly::Executor* executor,
+    int8_t spillInputStartPartitionBit) {
   buildExecutor_ = executor;
   otherTables_.reserve(tables.size());
   for (auto& table : tables) {
@@ -1650,6 +1651,7 @@ void HashTable<ignoreNullKeys>::prepareJoinTable(
   } else {
     decideHashMode(0);
   }
+  checkHashBitsOverlap(spillInputStartPartitionBit);
 }
 
 template <bool ignoreNullKeys>
@@ -1982,7 +1984,9 @@ void BaseHashTable::prepareForGroupProbe(
     HashLookup& lookup,
     const RowVectorPtr& input,
     SelectivityVector& rows,
-    bool ignoreNullKeys) {
+    bool ignoreNullKeys,
+    int8_t spillInputStartPartitionBit) {
+  checkHashBitsOverlap(spillInputStartPartitionBit);
   auto& hashers = lookup.hashers;
 
   for (auto& hasher : hashers) {
@@ -2015,7 +2019,8 @@ void BaseHashTable::prepareForGroupProbe(
     decideHashMode(input->size());
     // Do not forward 'ignoreNullKeys' to avoid redundant evaluation of
     // deselectRowsWithNulls.
-    prepareForGroupProbe(lookup, input, rows, false);
+    prepareForGroupProbe(
+        lookup, input, rows, false, spillInputStartPartitionBit);
     return;
   }
 }
44 changes: 35 additions & 9 deletions velox/exec/HashTable.h
@@ -121,6 +121,8 @@ class BaseHashTable {
   /// Specifies the hash mode of a table.
   enum class HashMode { kHash, kArray, kNormalizedKey };
 
+  static constexpr int8_t kNoSpillInputStartPartitionBit = -1;
+
   /// Returns the string of the given 'mode'.
   static std::string modeString(HashMode mode);
 
@@ -181,7 +183,8 @@
       HashLookup& lookup,
       const RowVectorPtr& input,
       SelectivityVector& rows,
-      bool ignoreNullKeys);
+      bool ignoreNullKeys,
+      int8_t spillInputStartPartitionBit);
 
   /// Finds or creates a group for each key in 'lookup'. The keys are
   /// returned in 'lookup.hits'.
@@ -248,7 +251,8 @@
 
   virtual void prepareJoinTable(
       std::vector<std::unique_ptr<BaseHashTable>> tables,
-      folly::Executor* executor = nullptr) = 0;
+      folly::Executor* executor = nullptr,
+      int8_t spillInputStartPartitionBit = kNoSpillInputStartPartitionBit) = 0;
 
   /// Returns the memory footprint in bytes for any data structures
   /// owned by 'this'.
@@ -328,7 +332,12 @@
 
   /// Extracts a 7 bit tag from a hash number. The high bit is always set.
   static uint8_t hashTag(uint64_t hash) {
-    return static_cast<uint8_t>(hash >> 32) | 0x80;
+    // This is likely all 0 for small key types (<= 32 bits). Not an issue
+    // because small types have a range that makes them normalized key cases.
+    // If there are multiple small type keys, they are mixed which makes them a
+    // 64 bit hash. Normalized keys are mixed before being used as hash
+    // numbers.
+    return static_cast<uint8_t>(hash >> 38) | 0x80;
   }
 
   /// Loads a vector of tags for bulk comparison. Disables tsan errors
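
A plausible reading of the `>> 32` to `>> 38` change above (an inference; the commit message does not spell it out): the tag's variable bits now come from hash bits 38..44, which can never be pinned by spill partitioning that starts at bit 48, whereas bits 32..39 could be pinned by deeper spill levels under the old start bit of 29 (with, say, 3 partition bits per level: bits 29..31, 32..34, 35..37, ...), leaving near-constant tags for restored rows. A small disjointness check under those assumptions:

```cpp
#include <cassert>
#include <cstdint>

int main() {
  // Assumed layout: 7 variable tag bits from hash bits 38..44 (the bit taken
  // from position 45 is overwritten by the forced high bit), and spill
  // partition bits at positions 48 and above.
  constexpr uint64_t kTagSourceMask = uint64_t{0x7F} << 38;
  constexpr uint64_t kSpillBitsMask = ~uint64_t{0} << 48;
  static_assert((kTagSourceMask & kSpillBitsMask) == 0);

  // A hash whose only set bits are spill partition bits yields the same tag
  // as a zero hash: the partition no longer leaks into the tag.
  uint64_t hash = uint64_t{5} << 48;
  uint8_t tag = static_cast<uint8_t>(hash >> 38) | 0x80;
  assert(tag == 0x80);
  return 0;
}
```
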
@@ -365,6 +374,20 @@
 
   virtual void setHashMode(HashMode mode, int32_t numNew) = 0;
 
+  virtual int sizeBits() const = 0;
+
+  // We don't want any overlap in the bit ranges used by bucket index and
+  // those used by spill partitioning; otherwise, because we receive data
+  // from only one partition, the overlapped bits would be the same and only
+  // a fraction of the buckets would be used. This would cause insertion to
+  // take a very long time and block driver threads.
+  void checkHashBitsOverlap(int8_t spillInputStartPartitionBit) {
+    if (spillInputStartPartitionBit != kNoSpillInputStartPartitionBit &&
+        hashMode() != HashMode::kArray) {
+      VELOX_CHECK_LE(sizeBits(), spillInputStartPartitionBit);
+    }
+  }
+
   std::vector<std::unique_ptr<VectorHasher>> hashers_;
   std::unique_ptr<RowContainer> rows_;
 
@@ -525,7 +548,9 @@ class HashTable : public BaseHashTable {
   // and VectorHashers and decides the hash mode and representation.
   void prepareJoinTable(
       std::vector<std::unique_ptr<BaseHashTable>> tables,
-      folly::Executor* executor = nullptr) override;
+      folly::Executor* executor = nullptr,
+      int8_t spillInputStartPartitionBit =
+          kNoSpillInputStartPartitionBit) override;
 
   uint64_t hashTableSizeIncrease(int32_t numNewDistinct) const override {
     if (numDistinct_ + numNewDistinct > rehashSize()) {
@@ -587,10 +612,6 @@
   // occupy exactly two (64 bytes) cache lines.
   class Bucket {
    public:
-    Bucket() {
-      static_assert(sizeof(Bucket) == 128);
-    }
-
     uint8_t tagAt(int32_t slotIndex) {
       return reinterpret_cast<uint8_t*>(&tags_)[slotIndex];
     }
@@ -622,6 +643,7 @@
     char padding_[16];
   };
 
+  static_assert(sizeof(Bucket) == 128);
   static constexpr uint64_t kBucketSize = sizeof(Bucket);
 
   // Returns the bucket at byte offset 'offset' from 'table_'.
@@ -881,6 +903,10 @@
     }
   }
 
+  int sizeBits() const final {
+    return sizeBits_;
+  }
+
   // The min table size in row to trigger parallel join table build.
   const uint32_t minTableSizeForParallelJoinBuild_;
 
@@ -938,7 +964,7 @@
 
   // Executor for parallelizing hash join build. This may be the
   // executor for Drivers. If this executor is indefinitely taken by
-  // other work, the thread of prepareJoinTables() will sequentially
+  // other work, the thread of prepareJoinTable() will sequentially
   // execute the parallel build steps.
   folly::Executor* buildExecutor_{nullptr};
 
13 changes: 10 additions & 3 deletions velox/exec/RowNumber.cpp
@@ -78,7 +78,12 @@ void RowNumber::addInput(RowVectorPtr input) {
   }
 
   SelectivityVector rows(numInput);
-  table_->prepareForGroupProbe(*lookup_, input, rows, false);
+  table_->prepareForGroupProbe(
+      *lookup_,
+      input,
+      rows,
+      false,
+      BaseHashTable::kNoSpillInputStartPartitionBit);
   table_->groupProbe(*lookup_);
 
   // Initialize new partitions with zeros.
@@ -93,7 +98,8 @@
 void RowNumber::addSpillInput() {
   const auto numInput = input_->size();
   SelectivityVector rows(numInput);
-  table_->prepareForGroupProbe(*lookup_, input_, rows, false);
+  table_->prepareForGroupProbe(
+      *lookup_, input_, rows, false, spillConfig_->startPartitionBit);
   table_->groupProbe(*lookup_);
 
   // Initialize new partitions with zeros.
@@ -157,7 +163,8 @@
 
   const auto numInput = input->size();
   SelectivityVector rows(numInput);
-  table_->prepareForGroupProbe(*lookup_, input, rows, false);
+  table_->prepareForGroupProbe(
+      *lookup_, input, rows, false, spillConfig_->startPartitionBit);
   table_->groupProbe(*lookup_);
 
   auto* counts = data->children().back()->as<FlatVector<int64_t>>();
7 changes: 6 additions & 1 deletion velox/exec/TopNRowNumber.cpp
@@ -191,7 +191,12 @@ void TopNRowNumber::addInput(RowVectorPtr input) {
   ensureInputFits(input);
 
   SelectivityVector rows(numInput);
-  table_->prepareForGroupProbe(*lookup_, input, rows, false);
+  table_->prepareForGroupProbe(
+      *lookup_,
+      input,
+      rows,
+      false,
+      BaseHashTable::kNoSpillInputStartPartitionBit);
   table_->groupProbe(*lookup_);
 
   // Initialize new partitions.
20 changes: 20 additions & 0 deletions velox/exec/tests/HashJoinTest.cpp
@@ -5038,6 +5038,22 @@ TEST_F(HashJoinTest, spillFileSize) {
   }
 }
 
+TEST_F(HashJoinTest, spillPartitionBitsOverlap) {
+  auto builder =
+      HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
+          .numDrivers(numDrivers_)
+          .keyTypes({BIGINT(), BIGINT()})
+          .probeVectors(2'000, 3)
+          .buildVectors(2'000, 3)
+          .referenceQuery(
+              "SELECT t_k0, t_k1, t_data, u_k0, u_k1, u_data FROM t, u WHERE t_k0 = u_k0 and t_k1 = u_k1")
+          .config(core::QueryConfig::kSpillStartPartitionBit, "8")
+          .config(core::QueryConfig::kJoinSpillPartitionBits, "1")
+          .checkSpillStats(false)
+          .maxSpillLevel(0);
+  VELOX_ASSERT_THROW(builder.run(), "vs. 8");
+}
+
 // The test is to verify if the hash build reservation has been released on
 // task error.
 DEBUG_ONLY_TEST_F(HashJoinTest, buildReservationReleaseCheck) {
@@ -5242,6 +5258,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringInputProcessing) {
           .spillDirectory(testData.spillEnabled ? tempDirectory->path : "")
           .referenceQuery(
               "SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1")
+          .config(core::QueryConfig::kSpillStartPartitionBit, "29")
           .verifier([&](const std::shared_ptr<Task>& task, bool /*unused*/) {
             const auto statsPair = taskSpilledStats(*task);
             if (testData.expectedReclaimable) {
@@ -5394,6 +5411,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringReserve) {
           .spillDirectory(tempDirectory->path)
           .referenceQuery(
               "SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1")
+          .config(core::QueryConfig::kSpillStartPartitionBit, "29")
           .verifier([&](const std::shared_ptr<Task>& task, bool /*unused*/) {
             const auto statsPair = taskSpilledStats(*task);
             ASSERT_GT(statsPair.first.spilledBytes, 0);
@@ -5788,6 +5806,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringWaitForProbe) {
           .spillDirectory(tempDirectory->path)
           .referenceQuery(
               "SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1")
+          .config(core::QueryConfig::kSpillStartPartitionBit, "29")
           .verifier([&](const std::shared_ptr<Task>& task, bool /*unused*/) {
             const auto statsPair = taskSpilledStats(*task);
             ASSERT_GT(statsPair.first.spilledBytes, 0);
@@ -6351,6 +6370,7 @@ TEST_F(HashJoinTest, exceededMaxSpillLevel) {
       .spillDirectory(tempDirectory->path)
       .referenceQuery(
           "SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1")
+      .config(core::QueryConfig::kSpillStartPartitionBit, "29")
      .verifier([&](const std::shared_ptr<Task>& task, bool /*unused*/) {
        auto joinStats = task->taskStats()
                             .pipelineStats.back()
1 change: 1 addition & 0 deletions velox/exec/tests/utils/ArbitratorTestUtil.cpp
@@ -99,6 +99,7 @@ QueryTestResult runHashJoinTask(
         .spillDirectory(spillDirectory->path)
         .config(core::QueryConfig::kSpillEnabled, true)
         .config(core::QueryConfig::kJoinSpillEnabled, true)
+        .config(core::QueryConfig::kSpillStartPartitionBit, "29")
         .queryCtx(queryCtx)
         .maxDrivers(numDrivers)
         .copyResults(pool, result.task);
