Skip to content

Commit

Permalink
Switch to storage read if SSD cache load fails (facebookincubator#10256)
Browse files Browse the repository at this point in the history
Summary:
Currently, if an exception occurs while loading data from the SSD cache, the query fails. Instead, we should fall back to reading the data from storage.

Pull Request resolved: facebookincubator#10256

Reviewed By: xiaoxmeng, Yuhta

Differential Revision: D58768825

Pulled By: zacw7

fbshipit-source-id: d61b9f0f93e93157d6c1470d130b0eb62bdc61d0
  • Loading branch information
zacw7 authored and facebook-github-bot committed Jun 22, 2024
1 parent 3c2cc4b commit 0048553
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 15 deletions.
27 changes: 12 additions & 15 deletions velox/dwio/common/CacheInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,21 +301,18 @@ bool CacheInputStream::loadFromSsd(
MicrosecondTimer timer(&ssdLoadUs);
file.load(ssdPins, pins);
} catch (const std::exception& e) {
try {
LOG(ERROR) << "IOERR: Failed SSD loadSync " << entry.toString()
<< e.what() << process::TraceContext::statusLine()
<< fmt::format(
"stream region {} {}b, start of load {} file {}",
region_.offset,
region_.length,
region.offset - region_.offset,
fileIds().string(fileNum_));
// Remove the non-loadable entry so that next access goes to storage.
file.erase(cache::RawFileCacheKey{fileNum_, region.offset});
} catch (const std::exception&) {
// Ignore error inside logging the error.
}
throw;
LOG(ERROR) << "IOERR: Failed SSD loadSync " << entry.toString() << ' '
<< e.what() << process::TraceContext::statusLine()
<< fmt::format(
"stream region {} {}b, start of load {} file {}",
region_.offset,
region_.length,
region.offset - region_.offset,
fileIds().string(fileNum_));
// Remove the non-loadable entry so that next access goes to storage.
file.erase(cache::RawFileCacheKey{fileNum_, region.offset});
pin_ = std::move(pins[0]);
return false;
}

VELOX_CHECK(pin_.empty());
Expand Down
35 changes: 35 additions & 0 deletions velox/dwio/dwrf/test/CacheInputTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "velox/dwio/dwrf/test/TestReadFile.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"

#include <fcntl.h>
#include <gtest/gtest.h>

using namespace facebook::velox;
Expand Down Expand Up @@ -131,6 +132,14 @@ class CacheTest : public ::testing::Test {
}
}

// Corrupts the file by invalidate its content.
static void corruptSsdFile(const std::string& path) {
const auto fd = ::open(path.c_str(), O_WRONLY);
const auto size = ::lseek(fd, 0, SEEK_END);
ASSERT_EQ(ftruncate(fd, 0), 0);
ASSERT_EQ(ftruncate(fd, size), 0);
}

static void checkEntry(const cache::AsyncDataCacheEntry& entry) {
uint64_t seed = entry.key().fileNum.id();
if (entry.tinyData()) {
Expand Down Expand Up @@ -932,4 +941,30 @@ TEST_F(CacheTest, ssdReadVerification) {
ASSERT_GT(ioStats_->read().sum(), 0);
ASSERT_GT(ioStats_->ramHit().sum(), 0);
ASSERT_GT(ioStats_->ssdRead().sum(), 0);

// Corrupt SSD cache file.
corruptSsdFile(fmt::format("{}/cache0", tempDirectory_->getPath()));
// Clear memory cache to force ssd read.
cache_->testingClear();

// Record the baseline stats.
const auto prevStats = cache_->refreshStats();
const auto prevRead = ioStats_->read().sum();
const auto prevRamHit = ioStats_->ramHit().sum();
const auto prevSsdRead = ioStats_->ssdRead().sum();

// Read from the corrupted cache.
readData(kSsdBytes);
waitForWrite();
stats = cache_->refreshStats();
// Expect all new reads to be recorded as corruptions.
ASSERT_GT(ioStats_->read().sum(), prevRead);
ASSERT_GT(stats.ssdStats->readSsdCorruptions, 0);
ASSERT_EQ(
stats.ssdStats->readSsdCorruptions,
stats.ssdStats->entriesRead - prevStats.ssdStats->entriesRead);
// Expect no new succeeded cache hits.
ASSERT_EQ(stats.numHit, prevStats.numHit);
ASSERT_EQ(ioStats_->ramHit().sum(), prevRamHit);
ASSERT_EQ(ioStats_->ssdRead().sum(), prevSsdRead);
}

0 comments on commit 0048553

Please sign in to comment.