diff --git a/dwio/nimble/tablet/TabletReader.cpp b/dwio/nimble/tablet/TabletReader.cpp index d1ab40a..03f9a42 100644 --- a/dwio/nimble/tablet/TabletReader.cpp +++ b/dwio/nimble/tablet/TabletReader.cpp @@ -153,13 +153,11 @@ MetadataBuffer::MetadataBuffer( iobuf.computeChainDataLength(), type} {} -void TabletReader::StripeGroup::reset( +TabletReader::StripeGroup::StripeGroup( uint32_t stripeGroupIndex, const MetadataBuffer& stripes, - uint32_t stripeIndex, - std::unique_ptr stripeGroup) { - index_ = stripeGroupIndex; - metadata_ = std::move(stripeGroup); + std::unique_ptr stripeGroup) + : metadata_{std::move(stripeGroup)}, index_{stripeGroupIndex} { auto metadataRoot = asFlatBuffersRoot(metadata_->content()); auto stripesRoot = @@ -179,13 +177,16 @@ void TabletReader::StripeGroup::reset( // Find the first stripe that use this stripe group auto groupIndices = stripesRoot->group_indices()->data(); - while (stripeIndex > 0) { - if (groupIndices[stripeIndex] != groupIndices[stripeIndex - 1]) { - break; + for (uint32_t stripeIndex = 0, + groupIndicesSize = stripesRoot->group_indices()->size(); + stripeIndex < groupIndicesSize; + ++stripeIndex) { + if (groupIndices[stripeIndex] == stripeGroupIndex) { + firstStripe_ = stripeIndex; + return; } - --stripeIndex; } - firstStripe_ = stripeIndex; + NIMBLE_UNREACHABLE("No stripe found for stripe group"); } std::span TabletReader::StripeGroup::streamOffsets( @@ -254,12 +255,18 @@ TabletReader::TabletReader( ownedFile_{std::move(readFile)}, ps_{std::move(postscript)}, footer_{std::make_unique(memoryPool, footer)}, - stripes_{std::make_unique(memoryPool, stripes)} { - stripeGroup_.reset( - /* stripeGroupIndex */ 0, - *stripes_, - /* stripeIndex */ 0, - std::make_unique(memoryPool, stripeGroup)); + stripes_{std::make_unique(memoryPool, stripes)}, + stripeGroupCache_{[this](uint32_t stripeGroupIndex) { + return loadStripeGroup(stripeGroupIndex); + }} { + auto stripeGroupPtr = + stripeGroupCache_.get(0, [this, stripeGroup](uint32_t stripeGroupIndex) { + return std::make_shared( + stripeGroupIndex, + *stripes_, + std::make_unique(memoryPool_, stripeGroup)); + }); + *firstStripeGroup_.wlock() = std::move(stripeGroupPtr); initStripes(); auto optionalSectionsCacheLock = optionalSectionsCache_.wlock(); for (auto& pair : optionalSections) { @@ -281,7 +288,11 @@ TabletReader::TabletReader( MemoryPool& memoryPool, velox::ReadFile* readFile, const std::vector& preloadOptionalSections) - : memoryPool_{memoryPool}, file_{readFile} { + : memoryPool_{memoryPool}, + file_{readFile}, + stripeGroupCache_{[this](uint32_t stripeGroupIndex) { + return loadStripeGroup(stripeGroupIndex); + }} { // We make an initial read of the last piece of the file, and then do // another read if our first one didn't cover the whole footer. We could // make this a parameter to the constructor later. @@ -343,16 +354,20 @@ TabletReader::TabletReader( auto stripeGroup = stripeGroups->Get(0); if (stripeGroups->size() == 1 && stripeGroup->offset() + readSize >= fileSize) { - stripeGroup_.reset( - /* stripeGroupIndex */ 0, - *stripes_, - /* stripeIndex */ 0, - std::make_unique( - memoryPool_, - footerIOBuf, - stripeGroup->offset() + readSize - fileSize, - stripeGroup->size(), - static_cast(stripeGroup->compression_type()))); + auto stripeGroupPtr = + stripeGroupCache_.get(0, [&](uint32_t stripeGroupIndex) { + return std::make_shared( + stripeGroupIndex, + *stripes_, + std::make_unique( + memoryPool_, + footerIOBuf, + stripeGroup->offset() + readSize - fileSize, + stripeGroup->size(), + static_cast( + stripeGroup->compression_type()))); + }); + *firstStripeGroup_.wlock() = std::move(stripeGroupPtr); } } @@ -514,55 +529,63 @@ void TabletReader::initStripes() { } } -void TabletReader::ensureStripeGroup(uint32_t stripe) const { +uint32_t TabletReader::getStripeGroupIndex(uint32_t stripeIndex) const { + const auto stripesRoot = + asFlatBuffersRoot(stripes_->content()); + return stripesRoot->group_indices()->Get(stripeIndex); +} + +std::shared_ptr TabletReader::loadStripeGroup( + uint32_t stripeGroupIndex) const { auto footerRoot = asFlatBuffersRoot(footer_->content()); - auto stripesRoot = - asFlatBuffersRoot(stripes_->content()); - auto targetIndex = stripesRoot->group_indices()->Get(stripe); - if (targetIndex != stripeGroup_.index()) { - auto stripeGroup = footerRoot->stripe_groups()->Get(targetIndex); - velox::common::Region stripeGroupRegion{ - stripeGroup->offset(), stripeGroup->size(), "StripeGroup"}; - folly::IOBuf result; - file_->preadv({&stripeGroupRegion, 1}, {&result, 1}); - - stripeGroup_.reset( - targetIndex, - *stripes_, - stripe, - std::make_unique( - memoryPool_, - result, - static_cast(stripeGroup->compression_type()))); - } + auto stripeGroupInfo = footerRoot->stripe_groups()->Get(stripeGroupIndex); + velox::common::Region stripeGroupRegion{ + stripeGroupInfo->offset(), stripeGroupInfo->size(), "StripeGroup"}; + folly::IOBuf buffer; + file_->preadv({&stripeGroupRegion, 1}, {&buffer, 1}); + + // Reset the first stripe group that was loaded when we load another one + firstStripeGroup_.wlock()->reset(); + + return std::make_shared( + stripeGroupIndex, + *stripes_, + std::make_unique( + memoryPool_, + buffer, + static_cast(stripeGroupInfo->compression_type()))); +} + +std::shared_ptr TabletReader::getStripeGroup( + uint32_t stripeGroupIndex) const { + return stripeGroupCache_.get(stripeGroupIndex); } -std::span TabletReader::streamOffsets(uint32_t stripe) const { - ensureStripeGroup(stripe); - return stripeGroup_.streamOffsets(stripe); +std::span TabletReader::streamOffsets( + const StripeIdentifier& stripe) const { + return stripe.stripeGroup_->streamOffsets(stripe.stripeId_); } -std::span TabletReader::streamSizes(uint32_t stripe) const { - ensureStripeGroup(stripe); - return stripeGroup_.streamSizes(stripe); +std::span TabletReader::streamSizes( + const StripeIdentifier& stripe) const { + return stripe.stripeGroup_->streamSizes(stripe.stripeId_); } -uint32_t TabletReader::streamCount(uint32_t stripe) const { - ensureStripeGroup(stripe); - return stripeGroup_.streamCount(); +uint32_t TabletReader::streamCount(const StripeIdentifier& stripe) const { + return stripe.stripeGroup_->streamCount(); } std::vector> TabletReader::load( - uint32_t stripe, + const StripeIdentifier& stripe, std::span streamIdentifiers, std::function streamLabel) const { - NIMBLE_CHECK(stripe < stripeCount_, "Stripe is out of range."); + NIMBLE_CHECK(stripe.stripeId_ < stripeCount_, "Stripe is out of range."); - const uint64_t stripeOffset = this->stripeOffset(stripe); - ensureStripeGroup(stripe); - const auto stripeStreamOffsets = stripeGroup_.streamOffsets(stripe); - const auto stripeStreamSizes = stripeGroup_.streamSizes(stripe); + const uint64_t stripeOffset = this->stripeOffset(stripe.stripeId_); + const auto& stripeGroup = stripe.stripeGroup_; + const auto stripeStreamOffsets = stripeGroup->streamOffsets(stripe.stripeId_); + const auto stripeStreamSizes = stripeGroup->streamSizes(stripe.stripeId_); const uint32_t streamsToLoad = streamIdentifiers.size(); std::vector> streams(streamsToLoad); @@ -573,7 +596,7 @@ std::vector> TabletReader::load( for (uint32_t i = 0; i < streamsToLoad; ++i) { const uint32_t streamIdentifier = streamIdentifiers[i]; - if (streamIdentifier >= stripeGroup_.streamCount()) { + if (streamIdentifier >= stripeGroup->streamCount()) { streams[i] = nullptr; continue; } diff --git a/dwio/nimble/tablet/TabletReader.h b/dwio/nimble/tablet/TabletReader.h index 8e36544..3530e42 100644 --- a/dwio/nimble/tablet/TabletReader.h +++ b/dwio/nimble/tablet/TabletReader.h @@ -130,6 +130,48 @@ class StreamLoader { virtual const std::string_view getStream() const = 0; }; +template +class ReferenceCountedCache { + public: + using BuilderCallback = std::function(Key)>; + + explicit ReferenceCountedCache(BuilderCallback builder) + : builder_{std::move(builder)} {} + + std::shared_ptr get(Key key) { + return getPopulatedCacheEntry(key, builder_); + } + + std::shared_ptr get(Key key, const BuilderCallback& builder) { + return getPopulatedCacheEntry(key, builder); + } + + private: + folly::Synchronized>& getCacheEntry(Key key) { + return cache_.wlock()->emplace(key, std::weak_ptr()).first->second; + } + + std::shared_ptr getPopulatedCacheEntry( + Key key, + const BuilderCallback& builder) { + auto& entry = getCacheEntry(key); + auto wlockedEntry = entry.wlock(); + auto sharedPtr = wlockedEntry->lock(); + if (sharedPtr) { + return sharedPtr; + } + auto element = builder(key); + std::weak_ptr(element).swap(*wlockedEntry); + NIMBLE_DASSERT(!wlockedEntry->expired(), "Shouldn't be expired"); + return element; + } + + BuilderCallback builder_; + folly::Synchronized< + std::unordered_map>>> + cache_; +}; + // Provides read access to a tablet written by a TabletWriter. // Example usage to read all streams from stripe 0 in a file: // auto readFile = std::make_unique("/tmp/myfile"); @@ -138,7 +180,45 @@ class StreamLoader { // |serializedStreams[i]| now contains the stream corresponding to // the stream identifier provided in the input vector. class TabletReader { + struct StripeGroup { + StripeGroup( + uint32_t stripeGroupIndex, + const MetadataBuffer& stripes, + std::unique_ptr metadata); + + uint32_t index() const { + return index_; + } + + uint32_t streamCount() const { + return streamCount_; + } + + std::span streamOffsets(uint32_t stripe) const; + std::span streamSizes(uint32_t stripe) const; + + private: + std::unique_ptr metadata_; + uint32_t index_; + uint32_t streamCount_; + uint32_t firstStripe_; + const uint32_t* streamOffsets_; + const uint32_t* streamSizes_; + }; + public: + class StripeIdentifier { + explicit StripeIdentifier( + uint32_t stripeId, + std::shared_ptr stripeGroup) + : stripeId_{stripeId}, stripeGroup_{std::move(stripeGroup)} {} + + uint32_t stripeId_; + std::shared_ptr stripeGroup_; + + friend class TabletReader; + }; + // Compute checksum from the beginning of the file all the way to footer // size and footer compression type field in postscript. // chunkSize means each time reads up to chunkSize, until all data are done. @@ -161,7 +241,7 @@ class TabletReader { // span. If a stream was not present in the given stripe a nullptr is returned // in its slot. std::vector> load( - uint32_t stripe, + const StripeIdentifier& stripe, std::span streamIdentifiers, std::function streamLabel = [](uint32_t) { return std::string_view{}; @@ -221,48 +301,28 @@ class TabletReader { // Returns stream offsets for the specified stripe. Number of streams is // determined by schema node count at the time when stripe is written, so it // may have fewer number of items than the final schema node count - std::span streamOffsets(uint32_t stripe) const; + std::span streamOffsets(const StripeIdentifier& stripe) const; // Returns stream sizes for the specified stripe. Has same constraint as // `streamOffsets()`. - std::span streamSizes(uint32_t stripe) const; + std::span streamSizes(const StripeIdentifier& stripe) const; // Returns stream count for the specified stripe. Has same constraint as // `streamOffsets()`. - uint32_t streamCount(uint32_t stripe) const; + uint32_t streamCount(const StripeIdentifier& stripe) const; - private: - struct StripeGroup { - uint32_t index() const { - return index_; - } - - uint32_t streamCount() const { - return streamCount_; - } - - void reset( - uint32_t stripeGroupIndex, - const MetadataBuffer& stripes, - uint32_t stripeIndex, - std::unique_ptr metadata); - - std::span streamOffsets(uint32_t stripe) const; - std::span streamSizes(uint32_t stripe) const; + StripeIdentifier getStripeIdentifier(uint32_t stripeIndex) const { + return StripeIdentifier{ + stripeIndex, getStripeGroup(getStripeGroupIndex(stripeIndex))}; + } - private: - std::unique_ptr metadata_; - uint32_t index_{std::numeric_limits::max()}; - uint32_t streamCount_{0}; - uint32_t firstStripe_{0}; - const uint32_t* streamOffsets_{nullptr}; - const uint32_t* streamSizes_{nullptr}; - }; + private: + uint32_t getStripeGroupIndex(uint32_t stripeIndex) const; + std::shared_ptr loadStripeGroup(uint32_t stripeGroupIndex) const; + std::shared_ptr getStripeGroup(uint32_t stripeGroupIndex) const; void initStripes(); - void ensureStripeGroup(uint32_t stripe) const; - // For testing use TabletReader( MemoryPool& memoryPool, @@ -280,7 +340,9 @@ class TabletReader { Postscript ps_; std::unique_ptr footer_; std::unique_ptr stripes_; - mutable StripeGroup stripeGroup_; + + mutable ReferenceCountedCache stripeGroupCache_; + mutable folly::Synchronized> firstStripeGroup_; uint64_t tabletRowCount_; uint32_t stripeCount_{0}; diff --git a/dwio/nimble/tablet/tests/TabletTests.cpp b/dwio/nimble/tablet/tests/TabletTests.cpp index 108b412..5597b31 100644 --- a/dwio/nimble/tablet/tests/TabletTests.cpp +++ b/dwio/nimble/tablet/tests/TabletTests.cpp @@ -154,10 +154,11 @@ void parameterizedTest( EXPECT_EQ(stripesData[stripe].rowCount, tablet.stripeRowCount(stripe)); readFile.resetChunks(); - std::vector identifiers(tablet.streamCount(stripe)); + auto stripeIdentifier = tablet.getStripeIdentifier(stripe); + std::vector identifiers(tablet.streamCount(stripeIdentifier)); std::iota(identifiers.begin(), identifiers.end(), 0); - auto serializedStreams = - tablet.load(stripe, {identifiers.cbegin(), identifiers.cend()}); + auto serializedStreams = tablet.load( + stripeIdentifier, {identifiers.cbegin(), identifiers.cend()}); auto chunks = readFile.chunks(); auto expectedReads = stripesData[stripe].streams.size(); auto diff = chunks.size() - expectedReads; @@ -755,3 +756,223 @@ TEST(TabletTests, OptionalSectionsPreload) { }); } } + +namespace { + +enum class ActionEnum { kCreated, kDestroyed }; + +using Action = std::pair; +using Actions = std::vector; + +class Guard { + public: + Guard(int id, Actions& actions) : id_{id}, actions_{actions} { + actions_.push_back(std::make_pair(ActionEnum::kCreated, id_)); + } + + ~Guard() { + actions_.push_back(std::make_pair(ActionEnum::kDestroyed, id_)); + } + + Guard(const Guard&) = delete; + Guard(Guard&&) = delete; + Guard& operator=(const Guard&) = delete; + Guard& operator=(Guard&&) = delete; + + int id() const { + return id_; + } + + private: + int id_; + Actions& actions_; +}; + +} // namespace + +TEST(TabletTests, ReferenceCountedCache) { + Actions actions; + facebook::nimble::ReferenceCountedCache cache{ + [&](int id) { return std::make_shared(id, actions); }}; + + auto e1 = cache.get(0); + EXPECT_EQ(e1->id(), 0); + EXPECT_EQ(actions, Actions({{ActionEnum::kCreated, 0}})); + + auto e2 = cache.get(0); + EXPECT_EQ(e2->id(), 0); + EXPECT_EQ(actions, Actions({{ActionEnum::kCreated, 0}})); + e2.reset(); + EXPECT_EQ(actions, Actions({{ActionEnum::kCreated, 0}})); + + auto e3 = cache.get(1); + EXPECT_EQ(e3->id(), 1); + EXPECT_EQ( + actions, Actions({{ActionEnum::kCreated, 0}, {ActionEnum::kCreated, 1}})); + + e1.reset(); + EXPECT_EQ( + actions, + Actions( + {{ActionEnum::kCreated, 0}, + {ActionEnum::kCreated, 1}, + {ActionEnum::kDestroyed, 0}})); + + auto e4 = e3; + EXPECT_EQ(e4->id(), 1); + e3.reset(); + EXPECT_EQ( + actions, + Actions( + {{ActionEnum::kCreated, 0}, + {ActionEnum::kCreated, 1}, + {ActionEnum::kDestroyed, 0}})); + + e4.reset(); + EXPECT_EQ( + actions, + Actions( + {{ActionEnum::kCreated, 0}, + {ActionEnum::kCreated, 1}, + {ActionEnum::kDestroyed, 0}, + {ActionEnum::kDestroyed, 1}})); + + auto e5 = cache.get(1); + EXPECT_EQ(e5->id(), 1); + EXPECT_EQ( + actions, + Actions( + {{ActionEnum::kCreated, 0}, + {ActionEnum::kCreated, 1}, + {ActionEnum::kDestroyed, 0}, + {ActionEnum::kDestroyed, 1}, + {ActionEnum::kCreated, 1}})); + + auto e6 = cache.get(0); + EXPECT_EQ(e6->id(), 0); + EXPECT_EQ( + actions, + Actions( + {{ActionEnum::kCreated, 0}, + {ActionEnum::kCreated, 1}, + {ActionEnum::kDestroyed, 0}, + {ActionEnum::kDestroyed, 1}, + {ActionEnum::kCreated, 1}, + {ActionEnum::kCreated, 0}})); + + e5.reset(); + EXPECT_EQ( + actions, + Actions( + {{ActionEnum::kCreated, 0}, + {ActionEnum::kCreated, 1}, + {ActionEnum::kDestroyed, 0}, + {ActionEnum::kDestroyed, 1}, + {ActionEnum::kCreated, 1}, + {ActionEnum::kCreated, 0}, + {ActionEnum::kDestroyed, 1}})); + + e6.reset(); + EXPECT_EQ( + actions, + Actions( + {{ActionEnum::kCreated, 0}, + {ActionEnum::kCreated, 1}, + {ActionEnum::kDestroyed, 0}, + {ActionEnum::kDestroyed, 1}, + {ActionEnum::kCreated, 1}, + {ActionEnum::kCreated, 0}, + {ActionEnum::kDestroyed, 1}, + {ActionEnum::kDestroyed, 0}})); +} + +TEST(TabletTests, ReferenceCountedCacheStressParallelDuplicates) { + std::atomic_int counter{0}; + facebook::nimble::ReferenceCountedCache cache{[&](int id) { + ++counter; + return std::make_shared(id); + }}; + folly::CPUThreadPoolExecutor executor(10); + velox::dwio::common::ExecutorBarrier barrier(executor); + constexpr int kEntryIds = 100; + constexpr int kEntryDuplicates = 10; + for (int i = 0; i < kEntryIds; ++i) { + for (int n = 0; n < kEntryDuplicates; ++n) { + barrier.add([i, &cache]() { + auto e = cache.get(i); + EXPECT_EQ(*e, i); + }); + } + } + barrier.waitAll(); + EXPECT_GE(counter.load(), kEntryIds); +} + +TEST(TabletTests, ReferenceCountedCacheStressParallelDuplicatesSaveEntries) { + std::atomic_int counter{0}; + facebook::nimble::ReferenceCountedCache cache{[&](int id) { + ++counter; + return std::make_shared(id); + }}; + folly::Synchronized>> entries; + folly::CPUThreadPoolExecutor executor(10); + velox::dwio::common::ExecutorBarrier barrier(executor); + constexpr int kEntryIds = 100; + constexpr int kEntryDuplicates = 10; + for (int i = 0; i < kEntryIds; ++i) { + for (int n = 0; n < kEntryDuplicates; ++n) { + barrier.add([i, &cache, &entries]() { + auto e = cache.get(i); + EXPECT_EQ(*e, i); + entries.wlock()->push_back(e); + }); + } + } + barrier.waitAll(); + EXPECT_EQ(counter.load(), kEntryIds); +} + +TEST(TabletTests, ReferenceCountedCacheStress) { + std::atomic_int counter{0}; + facebook::nimble::ReferenceCountedCache cache{[&](int id) { + ++counter; + return std::make_shared(id); + }}; + folly::CPUThreadPoolExecutor executor(10); + velox::dwio::common::ExecutorBarrier barrier(executor); + constexpr int kEntryIds = 100; + constexpr int kEntryDuplicates = 10; + for (int n = 0; n < kEntryDuplicates; ++n) { + for (int i = 0; i < kEntryIds; ++i) { + barrier.add([i, &cache]() { + auto e = cache.get(i); + EXPECT_EQ(*e, i); + }); + } + } + barrier.waitAll(); + EXPECT_GE(counter.load(), kEntryIds); +} +TEST(TabletTests, ReferenceCountedCacheStressSaveEntries) { + std::atomic_int counter{0}; + facebook::nimble::ReferenceCountedCache cache{[&](int id) { + ++counter; + return std::make_shared(id); + }}; + folly::Synchronized>> entries; + folly::CPUThreadPoolExecutor executor(10); + velox::dwio::common::ExecutorBarrier barrier(executor); + constexpr int kEntryIds = 100; + constexpr int kEntryDuplicates = 10; + for (int n = 0; n < kEntryDuplicates; ++n) { + for (int i = 0; i < kEntryIds; ++i) { + barrier.add([i, &cache, &entries]() { + auto e = cache.get(i); + EXPECT_EQ(*e, i); + entries.wlock()->push_back(e); + }); + } + } + barrier.waitAll(); + EXPECT_EQ(counter.load(), kEntryIds); +} diff --git a/dwio/nimble/tools/NimbleDumpLib.cpp b/dwio/nimble/tools/NimbleDumpLib.cpp index a1efe6b..65ffb5a 100644 --- a/dwio/nimble/tools/NimbleDumpLib.cpp +++ b/dwio/nimble/tools/NimbleDumpLib.cpp @@ -120,10 +120,13 @@ void traverseTablet( stripeVisitor(i); } if (streamVisitor) { - std::vector identifiers(tabletReader.streamCount(i)); - std::iota(identifiers.begin(), identifiers.end(), 0); - auto streams = - tabletReader.load(i, {identifiers.cbegin(), identifiers.cend()}); + auto stripeIdentifier = tabletReader.getStripeIdentifier(i); + std::vector streamIdentifiers( + tabletReader.streamCount(stripeIdentifier)); + std::iota(streamIdentifiers.begin(), streamIdentifiers.end(), 0); + auto streams = tabletReader.load( + stripeIdentifier, + {streamIdentifiers.cbegin(), streamIdentifiers.cend()}); for (uint32_t j = 0; j < streams.size(); ++j) { auto& stream = streams[j]; if (stream) { @@ -332,7 +335,8 @@ void NimbleDumpLib::emitStripes(bool noHeader) { {"Row Count", 15}}, noHeader); traverseTablet(*pool_, tabletReader, std::nullopt, [&](uint32_t stripeIndex) { - auto sizes = tabletReader.streamSizes(stripeIndex); + auto stripeIdentifier = tabletReader.getStripeIdentifier(stripeIndex); + auto sizes = tabletReader.streamSizes(stripeIdentifier); auto stripeSize = std::accumulate(sizes.begin(), sizes.end(), 0UL); formatter.writeRow({ folly::to(stripeIndex), @@ -374,6 +378,7 @@ void NimbleDumpLib::emitStreams( stripeId, nullptr /* stripeVisitor */, [&](ChunkedStream& stream, uint32_t stripeId, uint32_t streamId) { + auto stripeIdentifier = tabletReader->getStripeIdentifier(stripeId); uint32_t itemCount = 0; while (stream.hasNext()) { auto chunk = stream.nextChunk(); @@ -384,9 +389,9 @@ void NimbleDumpLib::emitStreams( values.push_back(folly::to(stripeId)); values.push_back(folly::to(streamId)); values.push_back(folly::to( - tabletReader->streamOffsets(stripeId)[streamId])); + tabletReader->streamOffsets(stripeIdentifier)[streamId])); values.push_back(folly::to( - tabletReader->streamSizes(stripeId)[streamId])); + tabletReader->streamSizes(stripeIdentifier)[streamId])); values.push_back(folly::to(itemCount)); if (streamLabels) { auto it = values.emplace_back(labels->streamLabel(streamId)); @@ -463,15 +468,16 @@ void NimbleDumpLib::emitContent( uint32_t maxStreamCount; bool found = false; traverseTablet(*pool_, tabletReader, stripeId, [&](uint32_t stripeId) { + auto stripeIdentifier = tabletReader.getStripeIdentifier(stripeId); maxStreamCount = - std::max(maxStreamCount, tabletReader.streamCount(stripeId)); - if (streamId >= tabletReader.streamCount(stripeId)) { + std::max(maxStreamCount, tabletReader.streamCount(stripeIdentifier)); + if (streamId >= tabletReader.streamCount(stripeIdentifier)) { return; } found = true; - auto streams = tabletReader.load(stripeId, std::vector{streamId}); + auto streams = tabletReader.load(stripeIdentifier, std::vector{streamId}); if (auto& stream = streams[0]) { InMemoryChunkedStream chunkedStream{*pool_, std::move(stream)}; @@ -503,16 +509,17 @@ void NimbleDumpLib::emitBinary( uint32_t streamId, uint32_t stripeId) { TabletReader tabletReader{*pool_, file_.get()}; - if (streamId >= tabletReader.streamCount(stripeId)) { + auto stripeIdentifier = tabletReader.getStripeIdentifier(stripeId); + if (streamId >= tabletReader.streamCount(stripeIdentifier)) { throw folly::ProgramExit( -1, fmt::format( "Stream identifier {} is out of bound. Must be between 0 and {}\n", streamId, - tabletReader.streamCount(stripeId))); + tabletReader.streamCount(stripeIdentifier))); } - auto streams = tabletReader.load(stripeId, std::vector{streamId}); + auto streams = tabletReader.load(stripeIdentifier, std::vector{streamId}); if (auto& stream = streams[0]) { auto output = outputFactory(); diff --git a/dwio/nimble/velox/VeloxReader.cpp b/dwio/nimble/velox/VeloxReader.cpp index 597666a..a25da33 100644 --- a/dwio/nimble/velox/VeloxReader.cpp +++ b/dwio/nimble/velox/VeloxReader.cpp @@ -233,8 +233,10 @@ void VeloxReader::loadStripe() { // streams than later stripes. // In the extreme case, a stripe can return zero streams (for example, if // all the streams in that stripe were contained all nulls). + stripeIdentifier_.emplace( + tabletReader_->getStripeIdentifier(nextStripe_)); auto streams = tabletReader_->load( - nextStripe_, offsets_, [this](offset_size offset) { + stripeIdentifier_.value(), offsets_, [this](offset_size offset) { return streamLabels_.streamLabel(offset); }); for (uint32_t i = 0; i < streams.size(); ++i) { diff --git a/dwio/nimble/velox/VeloxReader.h b/dwio/nimble/velox/VeloxReader.h index 5b1b972..93eb8ee 100644 --- a/dwio/nimble/velox/VeloxReader.h +++ b/dwio/nimble/velox/VeloxReader.h @@ -154,6 +154,7 @@ class VeloxReader { velox::memory::MemoryPool& pool_; std::shared_ptr tabletReader_; + std::optional stripeIdentifier_; const VeloxReadParams parameters_; std::shared_ptr schema_; StreamLabels streamLabels_; diff --git a/dwio/nimble/velox/tests/VeloxReaderTests.cpp b/dwio/nimble/velox/tests/VeloxReaderTests.cpp index 69a72df..3ab6b33 100644 --- a/dwio/nimble/velox/tests/VeloxReaderTests.cpp +++ b/dwio/nimble/velox/tests/VeloxReaderTests.cpp @@ -345,7 +345,8 @@ size_t streamsReadCount( VELOX_CHECK_EQ(false, readFile->shouldCoalesce()); nimble::TabletReader tablet(pool, readFile); VELOX_CHECK_GE(tablet.stripeCount(), 1); - auto offsets = tablet.streamOffsets(0); + auto stripeIdentifier = tablet.getStripeIdentifier(0); + auto offsets = tablet.streamOffsets(stripeIdentifier); std::unordered_set streamOffsets; LOG(INFO) << "Number of streams: " << offsets.size(); for (auto offset : offsets) { diff --git a/dwio/nimble/velox/tests/VeloxWriterTests.cpp b/dwio/nimble/velox/tests/VeloxWriterTests.cpp index 214f801..3ec254d 100644 --- a/dwio/nimble/velox/tests/VeloxWriterTests.cpp +++ b/dwio/nimble/velox/tests/VeloxWriterTests.cpp @@ -544,12 +544,13 @@ TEST_F(VeloxWriterTests, EncodingLayout) { const auto& flatMapKey2Node = findChild(flatMapNode, "2")->asScalar(); for (auto i = 0; i < tablet.stripeCount(); ++i) { + auto stripeIdentifier = tablet.getStripeIdentifier(i); std::vector identifiers{ mapNode.lengthsDescriptor().offset(), mapValuesNode.scalarDescriptor().offset(), flatMapKey1Node.scalarDescriptor().offset(), flatMapKey2Node.scalarDescriptor().offset()}; - auto streams = tablet.load(i, identifiers); + auto streams = tablet.load(stripeIdentifier, identifiers); { nimble::InMemoryChunkedStream chunkedStream{ *leafPool_, std::move(streams[0])}; @@ -997,15 +998,17 @@ TEST_F(VeloxWriterTests, ChunkedStreamsRowAllNullsNoChunks) { {{vector, nimble::FlushDecision::None}, {vector, nimble::FlushDecision::None}}, [&](const auto& tablet) { + auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); // Logically, there should be two streams in the tablet. // However, when writing stripes, we do not write empty streams. // In this case, the integer column is empty, and therefore, omitted. - ASSERT_EQ(1, tablet.streamCount(0)); - EXPECT_LT(0, tablet.streamSizes(0)[0]); + ASSERT_EQ(1, tablet.streamCount(stripeIdentifier)); + EXPECT_LT(0, tablet.streamSizes(stripeIdentifier)[0]); - auto streamLoaders = tablet.load(0, std::array{0}); + auto streamLoaders = + tablet.load(stripeIdentifier, std::array{0}); ASSERT_EQ(1, streamLoaders.size()); // No chunks used, so expecting single chunk @@ -1031,15 +1034,17 @@ TEST_F(VeloxWriterTests, ChunkedStreamsRowAllNullsWithChunksMinSizeBig) { {{vector, nimble::FlushDecision::Chunk}, {vector, nimble::FlushDecision::Chunk}}, [&](const auto& tablet) { + auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); // Logically, there should be two streams in the tablet. // However, when writing stripes, we do not write empty streams. // In this case, the integer column is empty, and therefore, omitted. - ASSERT_EQ(1, tablet.streamCount(0)); - EXPECT_LT(0, tablet.streamSizes(0)[0]); + ASSERT_EQ(1, tablet.streamCount(stripeIdentifier)); + EXPECT_LT(0, tablet.streamSizes(stripeIdentifier)[0]); - auto streamLoaders = tablet.load(0, std::array{0}); + auto streamLoaders = + tablet.load(stripeIdentifier, std::array{0}); ASSERT_EQ(1, streamLoaders.size()); // Chunks requested, but min chunk size is too big, so expecting one @@ -1066,15 +1071,17 @@ TEST_F(VeloxWriterTests, ChunkedStreamsRowAllNullsWithChunksMinSizeZero) { {{vector, nimble::FlushDecision::Chunk}, {vector, nimble::FlushDecision::Chunk}}, [&](const auto& tablet) { + auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); // Logically, there should be two streams in the tablet. // However, when writing stripes, we do not write empty streams. // In this case, the integer column is empty, and therefore, omitted. - ASSERT_EQ(1, tablet.streamCount(0)); - EXPECT_LT(0, tablet.streamSizes(0)[0]); + ASSERT_EQ(1, tablet.streamCount(stripeIdentifier)); + EXPECT_LT(0, tablet.streamSizes(stripeIdentifier)[0]); - auto streamLoaders = tablet.load(0, std::array{0}); + auto streamLoaders = + tablet.load(stripeIdentifier, std::array{0}); ASSERT_EQ(1, streamLoaders.size()); // Chunks requested, and min chunk size is zero, so expecting two @@ -1108,14 +1115,16 @@ TEST_F(VeloxWriterTests, ChunkedStreamsRowSomeNullsNoChunks) { {{nullsVector, nimble::FlushDecision::None}, {nonNullsVector, nimble::FlushDecision::None}}, [&](const auto& tablet) { + auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); // We have values in stream 2, so it is not optimized away. - ASSERT_EQ(2, tablet.streamCount(0)); - EXPECT_LT(0, tablet.streamSizes(0)[0]); - EXPECT_LT(0, tablet.streamSizes(0)[1]); + ASSERT_EQ(2, tablet.streamCount(stripeIdentifier)); + EXPECT_LT(0, tablet.streamSizes(stripeIdentifier)[0]); + EXPECT_LT(0, tablet.streamSizes(stripeIdentifier)[1]); - auto streamLoaders = tablet.load(0, std::array{0, 1}); + auto streamLoaders = + tablet.load(stripeIdentifier, std::array{0, 1}); ASSERT_EQ(2, streamLoaders.size()); { // No chunks requested, so expecting single chunk. @@ -1155,13 +1164,15 @@ TEST_F(VeloxWriterTests, ChunkedStreamsRowSomeNullsWithChunksMinSizeBig) { {{nullsVector, nimble::FlushDecision::Chunk}, {nonNullsVector, nimble::FlushDecision::Chunk}}, [&](const auto& tablet) { + auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); - ASSERT_EQ(2, tablet.streamCount(0)); - EXPECT_LT(0, tablet.streamSizes(0)[0]); - EXPECT_LT(0, tablet.streamSizes(0)[1]); + ASSERT_EQ(2, tablet.streamCount(stripeIdentifier)); + EXPECT_LT(0, tablet.streamSizes(stripeIdentifier)[0]); + EXPECT_LT(0, tablet.streamSizes(stripeIdentifier)[1]); - auto streamLoaders = tablet.load(0, std::array{0, 1}); + auto streamLoaders = + tablet.load(stripeIdentifier, std::array{0, 1}); ASSERT_EQ(2, streamLoaders.size()); { // Chunks requested, but min chunk size is too big, so expecting one @@ -1203,13 +1214,15 @@ TEST_F(VeloxWriterTests, ChunkedStreamsRowSomeNullsWithChunksMinSizeZero) { {{nullsVector, nimble::FlushDecision::Chunk}, {nonNullsVector, nimble::FlushDecision::Chunk}}, [&](const auto& tablet) { + auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); - ASSERT_EQ(2, tablet.streamCount(0)); - EXPECT_LT(0, tablet.streamSizes(0)[0]); - EXPECT_LT(0, tablet.streamSizes(0)[1]); + ASSERT_EQ(2, tablet.streamCount(stripeIdentifier)); + EXPECT_LT(0, tablet.streamSizes(stripeIdentifier)[0]); + EXPECT_LT(0, tablet.streamSizes(stripeIdentifier)[1]); - auto streamLoaders = tablet.load(0, std::array{0, 1}); + auto streamLoaders = + tablet.load(stripeIdentifier, std::array{0, 1}); ASSERT_EQ(2, streamLoaders.size()); { // Chunks requested, and min chunk size is zero, so expecting two @@ -1243,15 +1256,17 @@ TEST_F(VeloxWriterTests, ChunkedStreamsRowNoNullsNoChunks) { {{vector, nimble::FlushDecision::None}, {vector, nimble::FlushDecision::None}}, [&](const auto& tablet) { + auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); - ASSERT_EQ(2, tablet.streamCount(0)); + ASSERT_EQ(2, tablet.streamCount(stripeIdentifier)); // When there are no nulls, the nulls stream is omitted. - EXPECT_EQ(0, tablet.streamSizes(0)[0]); - EXPECT_LT(0, tablet.streamSizes(0)[1]); + EXPECT_EQ(0, tablet.streamSizes(stripeIdentifier)[0]); + EXPECT_LT(0, tablet.streamSizes(stripeIdentifier)[1]); - auto streamLoaders = tablet.load(0, std::array{0, 1}); + auto streamLoaders = + tablet.load(stripeIdentifier, std::array{0, 1}); ASSERT_EQ(2, streamLoaders.size()); { // Nulls stream should be missing, as all values are non-null @@ -1281,15 +1296,17 @@ TEST_F(VeloxWriterTests, ChunkedStreamsRowNoNullsWithChunksMinSizeBig) { {{vector, nimble::FlushDecision::Chunk}, {vector, nimble::FlushDecision::Chunk}}, [&](const auto& tablet) { + auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); - ASSERT_EQ(2, tablet.streamCount(0)); + ASSERT_EQ(2, tablet.streamCount(stripeIdentifier)); // When there are no nulls, the nulls stream is omitted. - EXPECT_EQ(0, tablet.streamSizes(0)[0]); - EXPECT_LT(0, tablet.streamSizes(0)[1]); + EXPECT_EQ(0, tablet.streamSizes(stripeIdentifier)[0]); + EXPECT_LT(0, tablet.streamSizes(stripeIdentifier)[1]); - auto streamLoaders = tablet.load(0, std::array{0, 1}); + auto streamLoaders = + tablet.load(stripeIdentifier, std::array{0, 1}); ASSERT_EQ(2, streamLoaders.size()); { // Nulls stream should be missing, as all values are non-null @@ -1320,15 +1337,17 @@ TEST_F(VeloxWriterTests, ChunkedStreamsRowNoNullsWithChunksMinSizeZero) { {{vector, nimble::FlushDecision::Chunk}, {vector, nimble::FlushDecision::Chunk}}, [&](const auto& tablet) { + auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); - ASSERT_EQ(2, tablet.streamCount(0)); + ASSERT_EQ(2, tablet.streamCount(stripeIdentifier)); // When there are no nulls, the nulls stream is omitted. - EXPECT_EQ(0, tablet.streamSizes(0)[0]); - EXPECT_LT(0, tablet.streamSizes(0)[1]); + EXPECT_EQ(0, tablet.streamSizes(stripeIdentifier)[0]); + EXPECT_LT(0, tablet.streamSizes(stripeIdentifier)[1]); - auto streamLoaders = tablet.load(0, std::array{0, 1}); + auto streamLoaders = + tablet.load(stripeIdentifier, std::array{0, 1}); ASSERT_EQ(2, streamLoaders.size()); { // Nulls stream should be missing, as all values are non-null @@ -1359,12 +1378,13 @@ TEST_F(VeloxWriterTests, ChunkedStreamsChildAllNullsNoChunks) { {{vector, nimble::FlushDecision::None}, {vector, nimble::FlushDecision::None}}, [&](const auto& tablet) { + auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); // When all rows are not null, the nulls stream is omitted. // When all values are null, the values stream is omitted. // Since these are the last two stream, they are optimized away. - ASSERT_EQ(0, tablet.streamCount(0)); + ASSERT_EQ(0, tablet.streamCount(stripeIdentifier)); }); } @@ -1384,12 +1404,13 @@ TEST_F(VeloxWriterTests, ChunkedStreamsChildAllNullsWithChunksMinSizeBig) { {{vector, nimble::FlushDecision::Chunk}, {vector, nimble::FlushDecision::Chunk}}, [&](const auto& tablet) { + auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); // When all rows are not null, the nulls stream is omitted. // When all values are null, the values stream is omitted. // Since these are the last two stream, they are optimized away. - ASSERT_EQ(0, tablet.streamCount(0)); + ASSERT_EQ(0, tablet.streamCount(stripeIdentifier)); }); } @@ -1409,12 +1430,13 @@ TEST_F(VeloxWriterTests, ChunkedStreamsChildAllNullsWithChunksMinSizeZero) { {{vector, nimble::FlushDecision::Chunk}, {vector, nimble::FlushDecision::Chunk}}, [&](const auto& tablet) { + auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); // When all rows are not null, the nulls stream is omitted. // When all values are null, the values stream is omitted. // Since these are the last two stream, they are optimized away. - ASSERT_EQ(0, tablet.streamCount(0)); + ASSERT_EQ(0, tablet.streamCount(stripeIdentifier)); }); } @@ -1441,16 +1463,18 @@ TEST_F(VeloxWriterTests, ChunkedStreamsFlatmapAllNullsNoChunks) { {{vector, nimble::FlushDecision::None}, {vector, nimble::FlushDecision::None}}, [&](const auto& tablet) { + auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); // Expected streams: // 0: Row nulls stream (expected empty, as all values are not null) // 1: Flatmap nulls stream - ASSERT_EQ(2, tablet.streamCount(0)); - EXPECT_EQ(0, tablet.streamSizes(0)[0]); - EXPECT_LT(0, tablet.streamSizes(0)[1]); + ASSERT_EQ(2, tablet.streamCount(stripeIdentifier)); + EXPECT_EQ(0, tablet.streamSizes(stripeIdentifier)[0]); + EXPECT_LT(0, tablet.streamSizes(stripeIdentifier)[1]); - auto streamLoaders = tablet.load(0, std::array{0, 1}); + auto streamLoaders = + tablet.load(stripeIdentifier, std::array{0, 1}); ASSERT_EQ(2, streamLoaders.size()); // No chunks used, so expecting single chunk @@ -1482,16 +1506,18 @@ TEST_F(VeloxWriterTests, ChunkedStreamsFlatmapAllNullsWithChunksMinSizeBig) { {{vector, nimble::FlushDecision::Chunk}, {vector, nimble::FlushDecision::Chunk}}, [&](const auto& tablet) { + auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); // Expected streams: // 0: Row nulls stream (expected empty, as all values are not null) // 1: Flatmap nulls stream - ASSERT_EQ(2, tablet.streamCount(0)); - EXPECT_EQ(0, tablet.streamSizes(0)[0]); - EXPECT_LT(0, tablet.streamSizes(0)[1]); + ASSERT_EQ(2, tablet.streamCount(stripeIdentifier)); + EXPECT_EQ(0, tablet.streamSizes(stripeIdentifier)[0]); + EXPECT_LT(0, tablet.streamSizes(stripeIdentifier)[1]); - auto streamLoaders = tablet.load(0, std::array{0, 1}); + auto streamLoaders = + tablet.load(stripeIdentifier, std::array{0, 1}); ASSERT_EQ(2, streamLoaders.size()); // Chunks requested, but min size is too big, so expecting single merged @@ -1524,16 +1550,18 @@ TEST_F(VeloxWriterTests, ChunkedStreamsFlatmapAllNullsWithChunksMinSizeZero) { {{vector, nimble::FlushDecision::Chunk}, {vector, nimble::FlushDecision::Chunk}}, [&](const auto& tablet) { + auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); // Expected streams: // 0: Row nulls stream (expected empty, as all values are not null) // 1: Flatmap nulls stream - ASSERT_EQ(2, tablet.streamCount(0)); - EXPECT_EQ(0, tablet.streamSizes(0)[0]); - EXPECT_LT(0, tablet.streamSizes(0)[1]); + ASSERT_EQ(2, tablet.streamCount(stripeIdentifier)); + EXPECT_EQ(0, tablet.streamSizes(stripeIdentifier)[0]); + EXPECT_LT(0, tablet.streamSizes(stripeIdentifier)[1]); - auto streamLoaders = tablet.load(0, std::array{0, 1}); + auto streamLoaders = + tablet.load(stripeIdentifier, std::array{0, 1}); ASSERT_EQ(2, streamLoaders.size()); // Chunks requested, with min size zero, so expecting two chunks @@ -1577,6 +1605,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsFlatmapSomeNullsNoChunks) { {{nullsVector, nimble::FlushDecision::None}, {nonNullsVector, nimble::FlushDecision::None}}, [&](const auto& tablet) { + auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); // Expected streams: @@ -1584,14 +1613,14 @@ TEST_F(VeloxWriterTests, ChunkedStreamsFlatmapSomeNullsNoChunks) { // 1: Flatmap nulls stream // 2: Scalar stream (flatmap value for key 5) // 3: Scalar stream (flatmap in-map for key 5) - ASSERT_EQ(4, tablet.streamCount(0)); - EXPECT_EQ(0, tablet.streamSizes(0)[0]); - EXPECT_LT(0, tablet.streamSizes(0)[1]); - EXPECT_LT(0, tablet.streamSizes(0)[2]); - EXPECT_LT(0, tablet.streamSizes(0)[3]); + ASSERT_EQ(4, tablet.streamCount(stripeIdentifier)); + EXPECT_EQ(0, tablet.streamSizes(stripeIdentifier)[0]); + EXPECT_LT(0, tablet.streamSizes(stripeIdentifier)[1]); + EXPECT_LT(0, tablet.streamSizes(stripeIdentifier)[2]); + EXPECT_LT(0, tablet.streamSizes(stripeIdentifier)[3]); auto streamLoaders = - tablet.load(0, std::array{0, 1, 2, 3}); + tablet.load(stripeIdentifier, std::array{0, 1, 2, 3}); ASSERT_EQ(4, streamLoaders.size()); EXPECT_FALSE(streamLoaders[0]); @@ -1654,6 +1683,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsFlatmapSomeNullsWithChunksMinSizeBig) { {{nullsVector, nimble::FlushDecision::Chunk}, {nonNullsVector, nimble::FlushDecision::Chunk}}, [&](const auto& tablet) { + auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); // Expected streams: @@ -1661,14 +1691,14 @@ TEST_F(VeloxWriterTests, ChunkedStreamsFlatmapSomeNullsWithChunksMinSizeBig) { // 1: Flatmap nulls stream // 2: Scalar stream (flatmap value for key 5) // 3: Scalar stream (flatmap in-map for key 5) - ASSERT_EQ(4, tablet.streamCount(0)); - EXPECT_EQ(0, tablet.streamSizes(0)[0]); - EXPECT_LT(0, tablet.streamSizes(0)[1]); - EXPECT_LT(0, tablet.streamSizes(0)[2]); - EXPECT_LT(0, tablet.streamSizes(0)[3]); + ASSERT_EQ(4, tablet.streamCount(stripeIdentifier)); + EXPECT_EQ(0, tablet.streamSizes(stripeIdentifier)[0]); + EXPECT_LT(0, tablet.streamSizes(stripeIdentifier)[1]); + EXPECT_LT(0, tablet.streamSizes(stripeIdentifier)[2]); + EXPECT_LT(0, tablet.streamSizes(stripeIdentifier)[3]); auto streamLoaders = - tablet.load(0, std::array{0, 1, 2, 3}); + tablet.load(stripeIdentifier, std::array{0, 1, 2, 3}); ASSERT_EQ(4, streamLoaders.size()); EXPECT_FALSE(streamLoaders[0]); @@ -1734,6 +1764,7 @@ TEST_F(VeloxWriterTests, ChunkedStreamsFlatmapSomeNullsWithChunksMinSizeZero) { {{nullsVector, nimble::FlushDecision::Chunk}, {nonNullsVector, nimble::FlushDecision::Chunk}}, [&](const auto& tablet) { + auto stripeIdentifier = tablet.getStripeIdentifier(0); ASSERT_EQ(1, tablet.stripeCount()); // Expected streams: @@ -1741,14 +1772,14 @@ TEST_F(VeloxWriterTests, ChunkedStreamsFlatmapSomeNullsWithChunksMinSizeZero) { // 1: Flatmap nulls stream // 2: Scalar stream (flatmap value for key 5) // 3: Scalar stream (flatmap in-map for key 5) - ASSERT_EQ(4, tablet.streamCount(0)); - EXPECT_EQ(0, tablet.streamSizes(0)[0]); - EXPECT_LT(0, tablet.streamSizes(0)[1]); - EXPECT_LT(0, tablet.streamSizes(0)[2]); - EXPECT_LT(0, tablet.streamSizes(0)[3]); + ASSERT_EQ(4, tablet.streamCount(stripeIdentifier)); + EXPECT_EQ(0, tablet.streamSizes(stripeIdentifier)[0]); + EXPECT_LT(0, tablet.streamSizes(stripeIdentifier)[1]); + EXPECT_LT(0, tablet.streamSizes(stripeIdentifier)[2]); + EXPECT_LT(0, tablet.streamSizes(stripeIdentifier)[3]); auto streamLoaders = - tablet.load(0, std::array{0, 1, 2, 3}); + tablet.load(stripeIdentifier, std::array{0, 1, 2, 3}); ASSERT_EQ(4, streamLoaders.size()); EXPECT_FALSE(streamLoaders[0]);