Skip to content

Commit

Permalink
Append row number column in batch reader (#78)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #78

We extend the Nimble Velox reader to support an additional row-number column when one is present, and we add a function that returns the row number of the first row in the last read.

Differential Revision: D61506232
  • Loading branch information
MacVincent Agha-Oko authored and facebook-github-bot committed Oct 1, 2024
1 parent ce7921a commit 563f4e4
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 10 deletions.
30 changes: 24 additions & 6 deletions dwio/nimble/velox/VeloxReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ VeloxReader::VeloxReader(
// file splits will cover the rest of the file.

const auto stripeCount = tabletReader_->stripeCount();
cummulativeStripeRowCounts_.reserve(stripeCount);
firstStripe_ = stripeCount;
lastStripe_ = 0;
firstRow_ = 0;
Expand All @@ -261,6 +262,7 @@ VeloxReader::VeloxReader(
}
}
rows += stripeRowCount;
cummulativeStripeRowCounts_.push_back(rows);
}

nextStripe_ = firstStripe_;
Expand Down Expand Up @@ -372,13 +374,10 @@ uint64_t VeloxReader::estimatedRowSize() {
}

bool VeloxReader::next(uint64_t rowCount, velox::VectorPtr& result) {
if (rowsRemainingInStripe_ == 0) {
if (nextStripe_ < lastStripe_) {
loadNextStripe();
} else {
return false;
}
if (!hasMoreRows()) {
return false;
}

uint64_t rowsToRead = std::min(rowsRemainingInStripe_, rowCount);
std::optional<std::chrono::steady_clock::time_point> startTime;
if (parameters_.decodingTimeCallback) {
Expand Down Expand Up @@ -547,4 +546,23 @@ uint32_t VeloxReader::getCurrentRowInStripe() const {
static_cast<uint32_t>(rowsRemainingInStripe_);
}

// Reports the row number the reader is currently positioned at.
//
// When hasMoreRows() reports that nothing is left to read, lastRow_ is
// returned. Otherwise the position is derived from the cumulative row count
// recorded for the loaded stripe, minus the rows of that stripe that have not
// been consumed yet.
//
// Not const: hasMoreRows() may load the next stripe as a side effect.
uint64_t VeloxReader::getRowNumber() {
  if (!hasMoreRows()) {
    return lastRow_;
  }

  // value() throws std::bad_optional_access if no stripe is loaded;
  // presumably loadNextStripe() always sets loadedStripe_ — TODO confirm.
  const auto rowsThroughLoadedStripe =
      cummulativeStripeRowCounts_[loadedStripe_.value()];
  return rowsThroughLoadedStripe - rowsRemainingInStripe_;
}

// Tells whether the reader still has rows to hand out.
//
// If the current stripe still has unread rows, returns true immediately.
// Otherwise, when nextStripe_ is still below lastStripe_, the next stripe is
// loaded via loadNextStripe() and true is returned; if not, returns false.
bool VeloxReader::hasMoreRows() {
  // Rows are still pending in the current stripe: nothing to load.
  if (rowsRemainingInStripe_ != 0) {
    return true;
  }

  // Current stripe is drained and there is no further stripe in range.
  if (nextStripe_ >= lastStripe_) {
    return false;
  }

  loadNextStripe();
  return true;
}

} // namespace facebook::nimble
8 changes: 8 additions & 0 deletions dwio/nimble/velox/VeloxReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ class VeloxReader {
return pool_;
}

// Returns the current row number the reader is pointing to. If there are no
// more rows to read in the file, this will return the last row number.
uint64_t getRowNumber();

// Seeks to |rowNumber| from the beginning of the file (row 0).
// If |rowNumber| is greater than the number of rows in the file, the
// seek will stop at the end of file and following reads will return
Expand Down Expand Up @@ -163,13 +167,16 @@ class VeloxReader {

uint32_t getCurrentRowInStripe() const;

bool hasMoreRows();

velox::memory::MemoryPool& pool_;
std::shared_ptr<const TabletReader> tabletReader_;
std::optional<TabletReader::StripeIdentifier> stripeIdentifier_;
const VeloxReadParams parameters_;
std::shared_ptr<const Type> schema_;
std::shared_ptr<const velox::RowType> type_;
std::vector<uint32_t> offsets_;
std::vector<uint32_t> cummulativeStripeRowCounts_;
folly::F14FastMap<offset_size, std::unique_ptr<Decoder>> decoders_;
std::unique_ptr<FieldReaderFactory> rootFieldReaderFactory_;
std::unique_ptr<FieldReader> rootReader_;
Expand All @@ -182,6 +189,7 @@ class VeloxReader {
// Reading state for reader
uint32_t nextStripe_{0};
uint64_t rowsRemainingInStripe_{0};
// uint64_t firstRowInCurrentStripe_{0};

// stripe currently loaded.
std::optional<uint32_t> loadedStripe_;
Expand Down
Loading

0 comments on commit 563f4e4

Please sign in to comment.