diff --git a/dwio/nimble/velox/FieldReader.cpp b/dwio/nimble/velox/FieldReader.cpp index e96fb02..6a8d267 100644 --- a/dwio/nimble/velox/FieldReader.cpp +++ b/dwio/nimble/velox/FieldReader.cpp @@ -1729,11 +1729,13 @@ class RowFieldReader final : public FieldReader { Decoder* decoder, std::vector> childrenReaders, Vector& boolBuffer, - folly::Executor* executor) + folly::Executor* executor, + bool withRowNumbersColumn) : FieldReader{pool, std::move(type), decoder}, childrenReaders_{std::move(childrenReaders)}, boolBuffer_{boolBuffer}, - executor_{executor} {} + executor_{executor}, + withRowNumbersColumn_{withRowNumbersColumn} {} std::optional> estimatedRowSize() const final { uint64_t totalBytes{0}; @@ -1791,8 +1793,17 @@ class RowFieldReader final : public FieldReader { auto rowCount = scatterCount(count, scatterBitmap); auto vector = VectorInitializer::initialize( &pool_, output, type_, rowCount); - vector->children().resize(childrenReaders_.size()); vector->unsafeResize(rowCount); + + // If we determine that the presence of an additional child (representing + // row numbers) beyond those in the row's type, there is no need to resize + // its children since this only happens when the row vector is being reused. + bool reuseVector = withRowNumbersColumn_ && + (vector->children().size() == type_->size() + 1); + if (!reuseVector) { + vector->children().resize(childrenReaders_.size()); + } + const void* childrenBits = nullptr; uint32_t selectedNonNullCount = 0; @@ -1907,6 +1918,7 @@ class RowFieldReader final : public FieldReader { std::vector> childrenReaders_; Vector& boolBuffer_; folly::Executor* executor_; + bool withRowNumbersColumn_; }; class RowFieldReaderFactory final : public FieldReaderFactory { @@ -1917,11 +1929,13 @@ class RowFieldReaderFactory final : public FieldReaderFactory { velox::TypePtr veloxType, const Type* type, std::vector> children, - folly::Executor* executor) + folly::Executor* executor, + bool withRowNumbersColumn) : FieldReaderFactory{pool, std::move(veloxType), type}, children_{std::move(children)}, boolBuffer_{&pool_}, - executor_{executor} {} + executor_{executor}, + withRowNumbersColumn_{withRowNumbersColumn} {} std::unique_ptr createReader( const folly::F14FastMap>& decoders) @@ -1944,7 +1958,8 @@ class RowFieldReaderFactory final : public FieldReaderFactory { nulls, std::move(childrenReaders), boolBuffer_, - executor_); + executor_, + withRowNumbersColumn_); } return std::make_unique>( @@ -1953,13 +1968,15 @@ class RowFieldReaderFactory final : public FieldReaderFactory { nulls, std::move(childrenReaders), boolBuffer_, - executor_); + executor_, + withRowNumbersColumn_); } private: std::vector> children_; Vector boolBuffer_; folly::Executor* executor_; + bool withRowNumbersColumn_; }; // Represent a keyed value node for flat map @@ -2886,6 +2903,9 @@ std::unique_ptr createFieldReaderFactory( children.push_back(std::move(factory)); } + // Support row number column only at root level. + bool withRowNumbersColumn = level == 0 && parameters.withRowNumbersColumn; + // Underlying reader may return a different vector type than what // specified, (eg. flat map read as struct). So create new ROW type based // on children types. Note this special logic is only for Row type based @@ -2897,7 +2917,8 @@ std::unique_ptr createFieldReaderFactory( std::move(childTypes)), nimbleType.get(), std::move(children), - executor); + executor, + withRowNumbersColumn); } case velox::TypeKind::MAP: { NIMBLE_CHECK( diff --git a/dwio/nimble/velox/FieldReader.h b/dwio/nimble/velox/FieldReader.h index 4870bb9..e5512d5 100644 --- a/dwio/nimble/velox/FieldReader.h +++ b/dwio/nimble/velox/FieldReader.h @@ -51,6 +51,10 @@ struct FieldReaderParams { // Callback to populate feature projection stats when needed std::function keySelectionCallback{nullptr}; + + // If true, we'll expect a row number column to be present as the last child + // of each velox row vector. + bool withRowNumbersColumn{false}; }; class FieldReader { diff --git a/dwio/nimble/velox/VeloxReader.cpp b/dwio/nimble/velox/VeloxReader.cpp index 103472b..ab87ccc 100644 --- a/dwio/nimble/velox/VeloxReader.cpp +++ b/dwio/nimble/velox/VeloxReader.cpp @@ -264,6 +264,7 @@ VeloxReader::VeloxReader( } nextStripe_ = firstStripe_; + firstRowInCurrentStripe_ = firstRow_; if (parameters_.stripeCountCallback) { if (firstStripe_ <= lastStripe_) { @@ -333,6 +334,13 @@ void VeloxReader::loadNextStripe() { ->ensureLoaded(); } } + if (loadedStripe_.has_value()) { + firstRowInCurrentStripe_ += + tabletReader_->stripeRowCount(loadedStripe_.value()); + } + firstRowInCurrentStripe_ += stripeRowsSkipped_; + stripeRowsSkipped_ = 0; + loadedStripe_ = nextStripe_++; rootReader_ = rootFieldReaderFactory_->createReader(decoders_); } @@ -372,13 +380,10 @@ uint64_t VeloxReader::estimatedRowSize() { } bool VeloxReader::next(uint64_t rowCount, velox::VectorPtr& result) { - if (rowsRemainingInStripe_ == 0) { - if (nextStripe_ < lastStripe_) { - loadNextStripe(); - } else { - return false; - } + if (!hasMoreRows()) { + return false; } + uint64_t rowsToRead = std::min(rowsRemainingInStripe_, rowCount); std::optional startTime; if (parameters_.decodingTimeCallback) { @@ -431,6 +436,8 @@ uint64_t VeloxReader::seekToRow(uint64_t rowNumber) { << " which is outside of the allowed range [" << firstRow_ << ", " << lastRow_ << ")."; + firstRowInCurrentStripe_ = firstRow_; + stripeRowsSkipped_ = 0; nextStripe_ = firstStripe_; rowsRemainingInStripe_ = 0; return firstRow_; @@ -496,6 +503,7 @@ uint64_t VeloxReader::skipStripes( while (startStripeIndex < lastStripe_ && rowsToSkip >= tabletReader_->stripeRowCount(startStripeIndex)) { rowsToSkip -= tabletReader_->stripeRowCount(startStripeIndex); + stripeRowsSkipped_ += tabletReader_->stripeRowCount(startStripeIndex); ++startStripeIndex; } @@ -547,4 +555,22 @@ uint32_t VeloxReader::getCurrentRowInStripe() const { static_cast(rowsRemainingInStripe_); } +uint64_t VeloxReader::getRowNumber() { + if (hasMoreRows()) { + return firstRowInCurrentStripe_ + getCurrentRowInStripe(); + } + return lastRow_; +} + +bool VeloxReader::hasMoreRows() { + if (rowsRemainingInStripe_ == 0) { + if (nextStripe_ < lastStripe_) { + loadNextStripe(); + } else { + return false; + } + } + return true; +} + } // namespace facebook::nimble diff --git a/dwio/nimble/velox/VeloxReader.h b/dwio/nimble/velox/VeloxReader.h index 2f4e288..049613e 100644 --- a/dwio/nimble/velox/VeloxReader.h +++ b/dwio/nimble/velox/VeloxReader.h @@ -114,6 +114,10 @@ class VeloxReader { return pool_; } + // Returns the current row number the reader is pointing to. If there are no + // more rows to read in the file, this will return the last row number. + uint64_t getRowNumber(); + // Seeks to |rowNumber| from the beginning of the file (row 0). // If |rowNumber| is greater than the number of rows in the file, the // seek will stop at the end of file and following reads will return @@ -163,6 +167,8 @@ class VeloxReader { uint32_t getCurrentRowInStripe() const; + bool hasMoreRows(); + velox::memory::MemoryPool& pool_; std::shared_ptr tabletReader_; std::optional stripeIdentifier_; @@ -182,6 +188,8 @@ class VeloxReader { // Reading state for reader uint32_t nextStripe_{0}; uint64_t rowsRemainingInStripe_{0}; + uint64_t stripeRowsSkipped_{0}; + uint64_t firstRowInCurrentStripe_{0}; // stripe currently loaded. std::optional loadedStripe_;