Skip to content

Commit

Permalink
Append row number column in batch reader (#78)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #78

We extend the Nimble Velox reader to support an additional row-numbers column when one is present. We also add a function that returns the row number of the first row in the last read.

Differential Revision: D61506232
  • Loading branch information
MacVincent Agha-Oko authored and facebook-github-bot committed Sep 17, 2024
1 parent 6a53405 commit db15ee6
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 14 deletions.
37 changes: 29 additions & 8 deletions dwio/nimble/velox/FieldReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1729,11 +1729,13 @@ class RowFieldReader final : public FieldReader {
Decoder* decoder,
std::vector<std::unique_ptr<FieldReader>> childrenReaders,
Vector<bool>& boolBuffer,
folly::Executor* executor)
folly::Executor* executor,
bool withRowNumbersColumn)
: FieldReader{pool, std::move(type), decoder},
childrenReaders_{std::move(childrenReaders)},
boolBuffer_{boolBuffer},
executor_{executor} {}
executor_{executor},
withRowNumbersColumn_{withRowNumbersColumn} {}

std::optional<std::pair<uint32_t, uint64_t>> estimatedRowSize() const final {
uint64_t totalBytes{0};
Expand Down Expand Up @@ -1791,8 +1793,17 @@ class RowFieldReader final : public FieldReader {
auto rowCount = scatterCount(count, scatterBitmap);
auto vector = VectorInitializer<velox::RowVector>::initialize(
&pool_, output, type_, rowCount);
vector->children().resize(childrenReaders_.size());
vector->unsafeResize(rowCount);

// When the row numbers column is enabled and the vector already has one more
// child than the row's type (the extra child representing row numbers), there
// is no need to resize its children: this only happens when the row vector is
// being reused.
bool reuseVector = withRowNumbersColumn_ &&
(vector->children().size() == type_->size() + 1);
if (!reuseVector) {
vector->children().resize(childrenReaders_.size());
}

const void* childrenBits = nullptr;
uint32_t selectedNonNullCount = 0;

Expand Down Expand Up @@ -1907,6 +1918,7 @@ class RowFieldReader final : public FieldReader {
std::vector<std::unique_ptr<FieldReader>> childrenReaders_;
Vector<bool>& boolBuffer_;
folly::Executor* executor_;
bool withRowNumbersColumn_;
};

class RowFieldReaderFactory final : public FieldReaderFactory {
Expand All @@ -1917,11 +1929,13 @@ class RowFieldReaderFactory final : public FieldReaderFactory {
velox::TypePtr veloxType,
const Type* type,
std::vector<std::unique_ptr<FieldReaderFactory>> children,
folly::Executor* executor)
folly::Executor* executor,
bool withRowNumbersColumn)
: FieldReaderFactory{pool, std::move(veloxType), type},
children_{std::move(children)},
boolBuffer_{&pool_},
executor_{executor} {}
executor_{executor},
withRowNumbersColumn_{withRowNumbersColumn} {}

std::unique_ptr<FieldReader> createReader(
const folly::F14FastMap<offset_size, std::unique_ptr<Decoder>>& decoders)
Expand All @@ -1944,7 +1958,8 @@ class RowFieldReaderFactory final : public FieldReaderFactory {
nulls,
std::move(childrenReaders),
boolBuffer_,
executor_);
executor_,
withRowNumbersColumn_);
}

return std::make_unique<RowFieldReader<true>>(
Expand All @@ -1953,13 +1968,15 @@ class RowFieldReaderFactory final : public FieldReaderFactory {
nulls,
std::move(childrenReaders),
boolBuffer_,
executor_);
executor_,
withRowNumbersColumn_);
}

private:
std::vector<std::unique_ptr<FieldReaderFactory>> children_;
Vector<bool> boolBuffer_;
folly::Executor* executor_;
bool withRowNumbersColumn_;
};

// Represent a keyed value node for flat map
Expand Down Expand Up @@ -2886,6 +2903,9 @@ std::unique_ptr<FieldReaderFactory> createFieldReaderFactory(
children.push_back(std::move(factory));
}

// Support row number column only at root level.
bool withRowNumbersColumn = level == 0 && parameters.withRowNumbersColumn;

// Underlying reader may return a different vector type than what
// specified, (eg. flat map read as struct). So create new ROW type based
// on children types. Note this special logic is only for Row type based
Expand All @@ -2897,7 +2917,8 @@ std::unique_ptr<FieldReaderFactory> createFieldReaderFactory(
std::move(childTypes)),
nimbleType.get(),
std::move(children),
executor);
executor,
withRowNumbersColumn);
}
case velox::TypeKind::MAP: {
NIMBLE_CHECK(
Expand Down
4 changes: 4 additions & 0 deletions dwio/nimble/velox/FieldReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ struct FieldReaderParams {
// Callback to populate feature projection stats when needed
std::function<void(velox::dwio::common::flatmap::FlatMapKeySelectionStats)>
keySelectionCallback{nullptr};

// If true, we'll expect a row number column to be present as the last child
// of the root-level velox row vector (nested rows are not affected).
bool withRowNumbersColumn{false};
};

class FieldReader {
Expand Down
38 changes: 32 additions & 6 deletions dwio/nimble/velox/VeloxReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ VeloxReader::VeloxReader(
}

nextStripe_ = firstStripe_;
firstRowInCurrentStripe_ = firstRow_;

if (parameters_.stripeCountCallback) {
if (firstStripe_ <= lastStripe_) {
Expand Down Expand Up @@ -333,6 +334,13 @@ void VeloxReader::loadNextStripe() {
->ensureLoaded();
}
}
if (loadedStripe_.has_value()) {
firstRowInCurrentStripe_ +=
tabletReader_->stripeRowCount(loadedStripe_.value());
}
firstRowInCurrentStripe_ += stripeRowsSkipped_;
stripeRowsSkipped_ = 0;

loadedStripe_ = nextStripe_++;
rootReader_ = rootFieldReaderFactory_->createReader(decoders_);
}
Expand Down Expand Up @@ -372,13 +380,10 @@ uint64_t VeloxReader::estimatedRowSize() {
}

bool VeloxReader::next(uint64_t rowCount, velox::VectorPtr& result) {
if (rowsRemainingInStripe_ == 0) {
if (nextStripe_ < lastStripe_) {
loadNextStripe();
} else {
return false;
}
if (!hasMoreRows()) {
return false;
}

uint64_t rowsToRead = std::min(rowsRemainingInStripe_, rowCount);
std::optional<std::chrono::steady_clock::time_point> startTime;
if (parameters_.decodingTimeCallback) {
Expand Down Expand Up @@ -431,6 +436,8 @@ uint64_t VeloxReader::seekToRow(uint64_t rowNumber) {
<< " which is outside of the allowed range [" << firstRow_ << ", "
<< lastRow_ << ").";

firstRowInCurrentStripe_ = firstRow_;
stripeRowsSkipped_ = 0;
nextStripe_ = firstStripe_;
rowsRemainingInStripe_ = 0;
return firstRow_;
Expand Down Expand Up @@ -496,6 +503,7 @@ uint64_t VeloxReader::skipStripes(
while (startStripeIndex < lastStripe_ &&
rowsToSkip >= tabletReader_->stripeRowCount(startStripeIndex)) {
rowsToSkip -= tabletReader_->stripeRowCount(startStripeIndex);
stripeRowsSkipped_ += tabletReader_->stripeRowCount(startStripeIndex);
++startStripeIndex;
}

Expand Down Expand Up @@ -547,4 +555,22 @@ uint32_t VeloxReader::getCurrentRowInStripe() const {
static_cast<uint32_t>(rowsRemainingInStripe_);
}

// Reports the row number at the reader's current position. When no further
// rows are available, lastRow_ is reported instead.
uint64_t VeloxReader::getRowNumber() {
  // hasMoreRows() has to run before the position is computed: it may call
  // loadNextStripe(), which adjusts firstRowInCurrentStripe_.
  if (!hasMoreRows()) {
    return lastRow_;
  }
  const auto rowInStripe = getCurrentRowInStripe();
  return firstRowInCurrentStripe_ + rowInStripe;
}

// Tells whether another row can be read. This is not a pure query: when no
// rows remain in the current stripe and further stripes exist, it calls
// loadNextStripe() before answering.
bool VeloxReader::hasMoreRows() {
  if (rowsRemainingInStripe_ != 0) {
    return true;
  }
  // Nothing left in the current stripe; give up unless another one follows.
  if (nextStripe_ >= lastStripe_) {
    return false;
  }
  loadNextStripe();
  return true;
}

} // namespace facebook::nimble
8 changes: 8 additions & 0 deletions dwio/nimble/velox/VeloxReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ class VeloxReader {
return pool_;
}

// Returns the current row number the reader is pointing to. If there are no
// more rows to read in the file, this will return the end of the reader's row
// range (the exclusive upper bound, i.e. one past the last readable row).
uint64_t getRowNumber();

// Seeks to |rowNumber| from the beginning of the file (row 0).
// If |rowNumber| is greater than the number of rows in the file, the
// seek will stop at the end of file and following reads will return
Expand Down Expand Up @@ -163,6 +167,8 @@ class VeloxReader {

uint32_t getCurrentRowInStripe() const;

bool hasMoreRows();

velox::memory::MemoryPool& pool_;
std::shared_ptr<const TabletReader> tabletReader_;
std::optional<TabletReader::StripeIdentifier> stripeIdentifier_;
Expand All @@ -182,6 +188,8 @@ class VeloxReader {
// Reading state for reader
uint32_t nextStripe_{0};
uint64_t rowsRemainingInStripe_{0};
uint64_t stripeRowsSkipped_{0};
uint64_t firstRowInCurrentStripe_{0};

// stripe currently loaded.
std::optional<uint32_t> loadedStripe_;
Expand Down

0 comments on commit db15ee6

Please sign in to comment.