Skip to content

Commit

Permalink
Make TabletReader thread safe
Browse files Browse the repository at this point in the history
Summary: We want to use this class in parallel.

Reviewed By: helfman

Differential Revision: D56796589
  • Loading branch information
Daniel Munoz authored and facebook-github-bot committed May 2, 2024
1 parent 07590e1 commit 3a4f520
Show file tree
Hide file tree
Showing 8 changed files with 529 additions and 181 deletions.
149 changes: 86 additions & 63 deletions dwio/nimble/tablet/TabletReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,11 @@ MetadataBuffer::MetadataBuffer(
iobuf.computeChainDataLength(),
type} {}

void TabletReader::StripeGroup::reset(
TabletReader::StripeGroup::StripeGroup(
uint32_t stripeGroupIndex,
const MetadataBuffer& stripes,
uint32_t stripeIndex,
std::unique_ptr<MetadataBuffer> stripeGroup) {
index_ = stripeGroupIndex;
metadata_ = std::move(stripeGroup);
std::unique_ptr<MetadataBuffer> stripeGroup)
: metadata_{std::move(stripeGroup)}, index_{stripeGroupIndex} {
auto metadataRoot =
asFlatBuffersRoot<serialization::StripeGroup>(metadata_->content());
auto stripesRoot =
Expand All @@ -179,13 +177,16 @@ void TabletReader::StripeGroup::reset(

// Find the first stripe that use this stripe group
auto groupIndices = stripesRoot->group_indices()->data();
while (stripeIndex > 0) {
if (groupIndices[stripeIndex] != groupIndices[stripeIndex - 1]) {
break;
for (uint32_t stripeIndex = 0,
groupIndicesSize = stripesRoot->group_indices()->size();
stripeIndex < groupIndicesSize;
++stripeIndex) {
if (groupIndices[stripeIndex] == stripeGroupIndex) {
firstStripe_ = stripeIndex;
return;
}
--stripeIndex;
}
firstStripe_ = stripeIndex;
NIMBLE_UNREACHABLE("No stripe found for stripe group");
}

std::span<const uint32_t> TabletReader::StripeGroup::streamOffsets(
Expand Down Expand Up @@ -254,12 +255,18 @@ TabletReader::TabletReader(
ownedFile_{std::move(readFile)},
ps_{std::move(postscript)},
footer_{std::make_unique<MetadataBuffer>(memoryPool, footer)},
stripes_{std::make_unique<MetadataBuffer>(memoryPool, stripes)} {
stripeGroup_.reset(
/* stripeGroupIndex */ 0,
*stripes_,
/* stripeIndex */ 0,
std::make_unique<MetadataBuffer>(memoryPool, stripeGroup));
stripes_{std::make_unique<MetadataBuffer>(memoryPool, stripes)},
stripeGroupCache_{[this](uint32_t stripeGroupIndex) {
return loadStripeGroup(stripeGroupIndex);
}} {
auto stripeGroupPtr =
stripeGroupCache_.get(0, [this, stripeGroup](uint32_t stripeGroupIndex) {
return std::make_shared<StripeGroup>(
stripeGroupIndex,
*stripes_,
std::make_unique<MetadataBuffer>(memoryPool_, stripeGroup));
});
*firstStripeGroup_.wlock() = std::move(stripeGroupPtr);
initStripes();
auto optionalSectionsCacheLock = optionalSectionsCache_.wlock();
for (auto& pair : optionalSections) {
Expand All @@ -281,7 +288,11 @@ TabletReader::TabletReader(
MemoryPool& memoryPool,
velox::ReadFile* readFile,
const std::vector<std::string>& preloadOptionalSections)
: memoryPool_{memoryPool}, file_{readFile} {
: memoryPool_{memoryPool},
file_{readFile},
stripeGroupCache_{[this](uint32_t stripeGroupIndex) {
return loadStripeGroup(stripeGroupIndex);
}} {
// We make an initial read of the last piece of the file, and then do
// another read if our first one didn't cover the whole footer. We could
// make this a parameter to the constructor later.
Expand Down Expand Up @@ -343,16 +354,20 @@ TabletReader::TabletReader(
auto stripeGroup = stripeGroups->Get(0);
if (stripeGroups->size() == 1 &&
stripeGroup->offset() + readSize >= fileSize) {
stripeGroup_.reset(
/* stripeGroupIndex */ 0,
*stripes_,
/* stripeIndex */ 0,
std::make_unique<MetadataBuffer>(
memoryPool_,
footerIOBuf,
stripeGroup->offset() + readSize - fileSize,
stripeGroup->size(),
static_cast<CompressionType>(stripeGroup->compression_type())));
auto stripeGroupPtr =
stripeGroupCache_.get(0, [&](uint32_t stripeGroupIndex) {
return std::make_shared<StripeGroup>(
stripeGroupIndex,
*stripes_,
std::make_unique<MetadataBuffer>(
memoryPool_,
footerIOBuf,
stripeGroup->offset() + readSize - fileSize,
stripeGroup->size(),
static_cast<CompressionType>(
stripeGroup->compression_type())));
});
*firstStripeGroup_.wlock() = std::move(stripeGroupPtr);
}
}

Expand Down Expand Up @@ -514,55 +529,63 @@ void TabletReader::initStripes() {
}
}

void TabletReader::ensureStripeGroup(uint32_t stripe) const {
uint32_t TabletReader::getStripeGroupIndex(uint32_t stripeIndex) const {
const auto stripesRoot =
asFlatBuffersRoot<serialization::Stripes>(stripes_->content());
return stripesRoot->group_indices()->Get(stripeIndex);
}

std::shared_ptr<TabletReader::StripeGroup> TabletReader::loadStripeGroup(
uint32_t stripeGroupIndex) const {
auto footerRoot =
asFlatBuffersRoot<serialization::Footer>(footer_->content());
auto stripesRoot =
asFlatBuffersRoot<serialization::Stripes>(stripes_->content());
auto targetIndex = stripesRoot->group_indices()->Get(stripe);
if (targetIndex != stripeGroup_.index()) {
auto stripeGroup = footerRoot->stripe_groups()->Get(targetIndex);
velox::common::Region stripeGroupRegion{
stripeGroup->offset(), stripeGroup->size(), "StripeGroup"};
folly::IOBuf result;
file_->preadv({&stripeGroupRegion, 1}, {&result, 1});

stripeGroup_.reset(
targetIndex,
*stripes_,
stripe,
std::make_unique<MetadataBuffer>(
memoryPool_,
result,
static_cast<CompressionType>(stripeGroup->compression_type())));
}
auto stripeGroupInfo = footerRoot->stripe_groups()->Get(stripeGroupIndex);
velox::common::Region stripeGroupRegion{
stripeGroupInfo->offset(), stripeGroupInfo->size(), "StripeGroup"};
folly::IOBuf buffer;
file_->preadv({&stripeGroupRegion, 1}, {&buffer, 1});

// Reset the first stripe group that was loaded when we load another one
firstStripeGroup_.wlock()->reset();

return std::make_shared<StripeGroup>(
stripeGroupIndex,
*stripes_,
std::make_unique<MetadataBuffer>(
memoryPool_,
buffer,
static_cast<CompressionType>(stripeGroupInfo->compression_type())));
}

std::shared_ptr<TabletReader::StripeGroup> TabletReader::getStripeGroup(
uint32_t stripeGroupIndex) const {
return stripeGroupCache_.get(stripeGroupIndex);
}

std::span<const uint32_t> TabletReader::streamOffsets(uint32_t stripe) const {
ensureStripeGroup(stripe);
return stripeGroup_.streamOffsets(stripe);
std::span<const uint32_t> TabletReader::streamOffsets(
const StripeIdentifier& stripe) const {
return stripe.stripeGroup_->streamOffsets(stripe.stripeId_);
}

std::span<const uint32_t> TabletReader::streamSizes(uint32_t stripe) const {
ensureStripeGroup(stripe);
return stripeGroup_.streamSizes(stripe);
std::span<const uint32_t> TabletReader::streamSizes(
const StripeIdentifier& stripe) const {
return stripe.stripeGroup_->streamSizes(stripe.stripeId_);
}

uint32_t TabletReader::streamCount(uint32_t stripe) const {
ensureStripeGroup(stripe);
return stripeGroup_.streamCount();
uint32_t TabletReader::streamCount(const StripeIdentifier& stripe) const {
return stripe.stripeGroup_->streamCount();
}

std::vector<std::unique_ptr<StreamLoader>> TabletReader::load(
uint32_t stripe,
const StripeIdentifier& stripe,
std::span<const uint32_t> streamIdentifiers,
std::function<std::string_view(uint32_t)> streamLabel) const {
NIMBLE_CHECK(stripe < stripeCount_, "Stripe is out of range.");
NIMBLE_CHECK(stripe.stripeId_ < stripeCount_, "Stripe is out of range.");

const uint64_t stripeOffset = this->stripeOffset(stripe);
ensureStripeGroup(stripe);
const auto stripeStreamOffsets = stripeGroup_.streamOffsets(stripe);
const auto stripeStreamSizes = stripeGroup_.streamSizes(stripe);
const uint64_t stripeOffset = this->stripeOffset(stripe.stripeId_);
const auto& stripeGroup = stripe.stripeGroup_;
const auto stripeStreamOffsets = stripeGroup->streamOffsets(stripe.stripeId_);
const auto stripeStreamSizes = stripeGroup->streamSizes(stripe.stripeId_);
const uint32_t streamsToLoad = streamIdentifiers.size();

std::vector<std::unique_ptr<StreamLoader>> streams(streamsToLoad);
Expand All @@ -573,7 +596,7 @@ std::vector<std::unique_ptr<StreamLoader>> TabletReader::load(

for (uint32_t i = 0; i < streamsToLoad; ++i) {
const uint32_t streamIdentifier = streamIdentifiers[i];
if (streamIdentifier >= stripeGroup_.streamCount()) {
if (streamIdentifier >= stripeGroup->streamCount()) {
streams[i] = nullptr;
continue;
}
Expand Down
128 changes: 95 additions & 33 deletions dwio/nimble/tablet/TabletReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,48 @@ class StreamLoader {
virtual const std::string_view getStream() const = 0;
};

template <typename Key, typename Value>
class ReferenceCountedCache {
public:
using BuilderCallback = std::function<std::shared_ptr<Value>(Key)>;

explicit ReferenceCountedCache(BuilderCallback builder)
: builder_{std::move(builder)} {}

std::shared_ptr<Value> get(Key key) {
return getPopulatedCacheEntry(key, builder_);
}

std::shared_ptr<Value> get(Key key, const BuilderCallback& builder) {
return getPopulatedCacheEntry(key, builder);
}

private:
folly::Synchronized<std::weak_ptr<Value>>& getCacheEntry(Key key) {
return cache_.wlock()->emplace(key, std::weak_ptr<Value>()).first->second;
}

std::shared_ptr<Value> getPopulatedCacheEntry(
Key key,
const BuilderCallback& builder) {
auto& entry = getCacheEntry(key);
auto wlockedEntry = entry.wlock();
auto sharedPtr = wlockedEntry->lock();
if (sharedPtr) {
return sharedPtr;
}
auto element = builder(key);
std::weak_ptr<Value>(element).swap(*wlockedEntry);
NIMBLE_DASSERT(!wlockedEntry->expired(), "Shouldn't be expired");
return element;
}

BuilderCallback builder_;
folly::Synchronized<
std::unordered_map<Key, folly::Synchronized<std::weak_ptr<Value>>>>
cache_;
};

// Provides read access to a tablet written by a TabletWriter.
// Example usage to read all streams from stripe 0 in a file:
// auto readFile = std::make_unique<LocalReadFile>("/tmp/myfile");
Expand All @@ -138,7 +180,45 @@ class StreamLoader {
// |serializedStreams[i]| now contains the stream corresponding to
// the stream identifier provided in the input vector.
class TabletReader {
struct StripeGroup {
StripeGroup(
uint32_t stripeGroupIndex,
const MetadataBuffer& stripes,
std::unique_ptr<MetadataBuffer> metadata);

uint32_t index() const {
return index_;
}

uint32_t streamCount() const {
return streamCount_;
}

std::span<const uint32_t> streamOffsets(uint32_t stripe) const;
std::span<const uint32_t> streamSizes(uint32_t stripe) const;

private:
std::unique_ptr<MetadataBuffer> metadata_;
uint32_t index_;
uint32_t streamCount_;
uint32_t firstStripe_;
const uint32_t* streamOffsets_;
const uint32_t* streamSizes_;
};

public:
class StripeIdentifier {
explicit StripeIdentifier(
uint32_t stripeId,
std::shared_ptr<StripeGroup> stripeGroup)
: stripeId_{stripeId}, stripeGroup_{std::move(stripeGroup)} {}

uint32_t stripeId_;
std::shared_ptr<StripeGroup> stripeGroup_;

friend class TabletReader;
};

// Compute checksum from the beginning of the file all the way to footer
// size and footer compression type field in postscript.
// chunkSize means each time reads up to chunkSize, until all data are done.
Expand All @@ -161,7 +241,7 @@ class TabletReader {
// span. If a stream was not present in the given stripe a nullptr is returned
// in its slot.
std::vector<std::unique_ptr<StreamLoader>> load(
uint32_t stripe,
const StripeIdentifier& stripe,
std::span<const uint32_t> streamIdentifiers,
std::function<std::string_view(uint32_t)> streamLabel = [](uint32_t) {
return std::string_view{};
Expand Down Expand Up @@ -221,48 +301,28 @@ class TabletReader {
// Returns stream offsets for the specified stripe. Number of streams is
// determined by schema node count at the time when stripe is written, so it
// may have fewer number of items than the final schema node count
std::span<const uint32_t> streamOffsets(uint32_t stripe) const;
std::span<const uint32_t> streamOffsets(const StripeIdentifier& stripe) const;

// Returns stream sizes for the specified stripe. Has same constraint as
// `streamOffsets()`.
std::span<const uint32_t> streamSizes(uint32_t stripe) const;
std::span<const uint32_t> streamSizes(const StripeIdentifier& stripe) const;

// Returns stream count for the specified stripe. Has same constraint as
// `streamOffsets()`.
uint32_t streamCount(uint32_t stripe) const;
uint32_t streamCount(const StripeIdentifier& stripe) const;

private:
struct StripeGroup {
uint32_t index() const {
return index_;
}

uint32_t streamCount() const {
return streamCount_;
}

void reset(
uint32_t stripeGroupIndex,
const MetadataBuffer& stripes,
uint32_t stripeIndex,
std::unique_ptr<MetadataBuffer> metadata);

std::span<const uint32_t> streamOffsets(uint32_t stripe) const;
std::span<const uint32_t> streamSizes(uint32_t stripe) const;
StripeIdentifier getStripeIdentifier(uint32_t stripeIndex) const {
return StripeIdentifier{
stripeIndex, getStripeGroup(getStripeGroupIndex(stripeIndex))};
}

private:
std::unique_ptr<MetadataBuffer> metadata_;
uint32_t index_{std::numeric_limits<uint32_t>::max()};
uint32_t streamCount_{0};
uint32_t firstStripe_{0};
const uint32_t* streamOffsets_{nullptr};
const uint32_t* streamSizes_{nullptr};
};
private:
uint32_t getStripeGroupIndex(uint32_t stripeIndex) const;
std::shared_ptr<StripeGroup> loadStripeGroup(uint32_t stripeGroupIndex) const;
std::shared_ptr<StripeGroup> getStripeGroup(uint32_t stripeGroupIndex) const;

void initStripes();

void ensureStripeGroup(uint32_t stripe) const;

// For testing use
TabletReader(
MemoryPool& memoryPool,
Expand All @@ -280,7 +340,9 @@ class TabletReader {
Postscript ps_;
std::unique_ptr<MetadataBuffer> footer_;
std::unique_ptr<MetadataBuffer> stripes_;
mutable StripeGroup stripeGroup_;

mutable ReferenceCountedCache<uint32_t, StripeGroup> stripeGroupCache_;
mutable folly::Synchronized<std::shared_ptr<StripeGroup>> firstStripeGroup_;

uint64_t tabletRowCount_;
uint32_t stripeCount_{0};
Expand Down
Loading

0 comments on commit 3a4f520

Please sign in to comment.