Skip to content

Commit

Permalink
Add caching support for dictionary encoded passthrough (#88)
Browse files Browse the repository at this point in the history
Summary:

Allows for caching of the last element when a dictionary encoded array is given for passthrough.

This will bring us up to parity of local file encoding with the original arrayWithOffsets, as two consecutive datasets, either passed as unencoded array or already encoded dictionaryArray will now have the same output into the file.

Reviewed By: helfman

Differential Revision: D60413733
  • Loading branch information
Kunal Kataria authored and facebook-github-bot committed Oct 10, 2024
1 parent e712dfd commit dd7b52d
Show file tree
Hide file tree
Showing 2 changed files with 311 additions and 28 deletions.
35 changes: 30 additions & 5 deletions dwio/nimble/velox/FieldWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1124,13 +1124,27 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
dictionaryVector.valueVector()->template as<velox::ArrayVector>();

auto previousOffset = -1;
bool newElementIngested = false;
auto ingestDictionaryIndex = [&](auto index) {
// skip writing length if previous offset was the same
if (previousOffset < 0 || offsetsData.empty() ||
offsets[index] != previousOffset) {
bool match = false;
// Only write length if first element or if consecutive offset is
// different, meaning we have reached a new value element.
if (previousOffset >= 0) {
match = (offsets[index] == previousOffset);
} else if (cached_) {
velox::CompareFlags flags;
match =
(valuesArrayVector->sizeAt(offsets[index]) == cachedSize_ &&
valuesArrayVector
->compare(cachedValue_.get(), offsets[index], 0, flags)
.value_or(-1) == 0);
}

if (!match) {
auto arrayOffset = valuesArrayVector->offsetAt(offsets[index]);
auto length = valuesArrayVector->sizeAt(offsets[index]);
lengthsData.push_back(length);
newElementIngested = true;
if (length > 0) {
filteredRanges.add(arrayOffset, length);
}
Expand All @@ -1152,8 +1166,19 @@ class ArrayWithOffsetsFieldWriter : public FieldWriter {
} else {
ranges.applyEach([&](auto index) { ingestDictionaryIndex(index); });
}
// ensure that we mark cache as invalidated
cached_ = false;

// insert last element discovered into cache
if (newElementIngested) {
cached_ = true;
cachedSize_ = lengthsData[lengthsData.size() - 1];
cachedValue_->prepareForReuse();
velox::BaseVector::CopyRange cacheRange{
static_cast<velox::vector_size_t>(previousOffset) /* source index*/,
0 /* target index*/,
1 /* count*/};
cachedValue_->copyRanges(valuesArrayVector, folly::Range(&cacheRange, 1));
}

return valuesArrayVector;
}

Expand Down
Loading

0 comments on commit dd7b52d

Please sign in to comment.