Skip to content

Commit

Permalink
Add test to do map->flatmap and then passthrough that same flatmap
Browse files Browse the repository at this point in the history
Summary:
Test that creates a fuzz map, writes it to a nimble file with flatmap encoding turned on, then reads it back and writes the same vector again. Test ensures that the two paths 
map input ->flatmap vector to write
and
flatmap input->flatmap vector to write

have the same right side output

Reviewed By: helfman

Differential Revision: D62110004
  • Loading branch information
Kunal Kataria authored and facebook-github-bot committed Sep 26, 2024
1 parent 46a7d80 commit 47c51a0
Showing 1 changed file with 145 additions and 19 deletions.
164 changes: 145 additions & 19 deletions dwio/nimble/velox/tests/VeloxReaderTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -555,24 +555,13 @@ class VeloxReaderTests : public ::testing::Test {
}
}

template <typename T = int32_t>
void writeAndVerify(
std::vector<velox::VectorPtr> writeFile(
std::mt19937& rng,
velox::memory::MemoryPool& pool,
const velox::RowTypePtr& type,
std::function<velox::VectorPtr(const velox::RowTypePtr&)> generator,
std::function<bool(
const velox::VectorPtr&,
const velox::VectorPtr&,
velox::vector_size_t)> validator,
size_t count,
nimble::VeloxWriterOptions writerOptions = {},
nimble::VeloxReadParams readParams = {},
std::function<bool(std::string&)> isKeyPresent = nullptr,
std::function<void(const velox::VectorPtr&)> comparator = nullptr,
bool multiSkip = false,
bool checkMemoryLeak = false) {
std::string file;
std::function<velox::VectorPtr(const velox::RowTypePtr&, int)> generator,
size_t batchCount,
std::string& file,
nimble::VeloxWriterOptions writerOptions) {
auto writeFile = std::make_unique<velox::InMemoryWriteFile>(&file);
nimble::FlushDecision decision;
writerOptions.enableChunking = true;
Expand All @@ -586,8 +575,8 @@ class VeloxReaderTests : public ::testing::Test {
nimble::VeloxWriter writer(
*rootPool_, type, std::move(writeFile), std::move(writerOptions));
bool perBatchFlush = folly::Random::oneIn(2, rng);
for (auto i = 0; i < count; ++i) {
auto vector = generator(type);
for (auto i = 0; i < batchCount; ++i) {
auto vector = generator(type, i);
int32_t rowIndex = 0;
while (rowIndex < vector->size()) {
decision = nimble::FlushDecision::None;
Expand All @@ -612,6 +601,34 @@ class VeloxReaderTests : public ::testing::Test {
folly::writeFile(file, FLAGS_output_test_file_path.c_str());
}

return expected;
}

template <typename T = int32_t>
void writeAndVerify(
std::mt19937& rng,
velox::memory::MemoryPool& pool,
const velox::RowTypePtr& type,
std::function<velox::VectorPtr(const velox::RowTypePtr&)> generator,
std::function<bool(
const velox::VectorPtr&,
const velox::VectorPtr&,
velox::vector_size_t)> validator,
size_t batchCount,
nimble::VeloxWriterOptions writerOptions = {},
nimble::VeloxReadParams readParams = {},
std::function<bool(std::string&)> isKeyPresent = nullptr,
std::function<void(const velox::VectorPtr&)> comparator = nullptr,
bool multiSkip = false,
bool checkMemoryLeak = false) {
std::string file;
auto generatorWithIndex = [&](const velox::RowTypePtr& rowType,
int /* index */) {
return generator(rowType);
};
auto expected = writeFile(
rng, type, generatorWithIndex, batchCount, file, writerOptions);

velox::InMemoryReadFile readFile(file);
auto selector = std::make_shared<velox::dwio::common::ColumnSelector>(type);
// new pool with to limit already used memory and with tracking enabled
Expand Down Expand Up @@ -3312,10 +3329,10 @@ TEST_F(VeloxReaderTests, FlatMapPassthroughFuzzer) {
}
uint32_t seed = FLAGS_reader_tests_seed > 0 ? FLAGS_reader_tests_seed
: folly::Random::rand32();
LOG(INFO) << "seed: " << seed;
VeloxMapGeneratorConfig generatorConfig{
.seed = seed,
};
LOG(INFO) << "seed: " << seed;
VeloxMapGenerator generator(leafPool_.get(), generatorConfig);

auto compareFlatMapToFlatMap = [](const velox::VectorPtr& expected,
Expand Down Expand Up @@ -3372,6 +3389,115 @@ TEST_F(VeloxReaderTests, FlatMapPassthroughFuzzer) {
}
}

TEST_F(VeloxReaderTests, MapToFlatMapAndPassthrough) {
auto mapType = velox::ROW(
{{"id_list_features",
velox::MAP(velox::INTEGER(), velox::ARRAY(velox::BIGINT()))}});
auto type = std::dynamic_pointer_cast<const velox::RowType>(mapType);

uint32_t seed = FLAGS_reader_tests_seed > 0 ? FLAGS_reader_tests_seed
: folly::Random::rand32();
LOG(INFO) << "seed: " << seed;
VeloxMapGeneratorConfig generatorConfig{
.featureTypes = mapType,
.keyType = velox::TypeKind::INTEGER,
.maxNumKVPerRow = 10,
.variantNumKV = true,
.seed = seed};
VeloxMapGenerator generator(leafPool_.get(), generatorConfig);
auto& rng = generator.rng();

nimble::VeloxWriterOptions writerOptions;
writerOptions.flatMapColumns.insert("id_list_features");
auto numFeatures = 5;
auto numRows = 6;
auto vectorGenerator = [&](const velox::RowTypePtr&, int) {
return generator.generateBatch(numRows);
};

auto batchCount = 10;

// First write, with a map input, converted to flatmap by nimble writer
std::string file;
auto expected =
writeFile(rng, type, vectorGenerator, batchCount, file, writerOptions);

velox::InMemoryReadFile readFile(file);
auto selector = std::make_shared<velox::dwio::common::ColumnSelector>(type);
// new pool with to limit already used memory and with tracking enabled
auto leakDetectPool =
facebook::velox::memory::deprecatedDefaultMemoryManager().addRootPool(
"memory_leak_detect");
auto readerPool = leakDetectPool->addLeafChild("reader_pool");

nimble::VeloxReadParams readParams;
readParams.readFlatMapFieldAsStruct.insert("id_list_features");
for (auto i = 0; i < numFeatures; ++i) {
readParams.flatMapFeatureSelector["id_list_features"].features.push_back(
folly::to<std::string>(i));
}

nimble::VeloxReader reader(
*readerPool.get(), &readFile, selector, readParams);

auto rootTypeFromSchema = convertToVeloxType(*reader.schema());
EXPECT_EQ(*type, *rootTypeFromSchema)
<< "Expected: " << type->toString()
<< ", actual: " << rootTypeFromSchema->toString();

std::vector<velox::VectorPtr> fullReadResult;
for (auto i = 0; i < expected.size(); ++i) {
velox::VectorPtr result;
auto& current = expected.at(i);
ASSERT_TRUE(reader.next(current->size(), result));

// ensure result is all flatmaps
auto rowVector = result->as<velox::RowVector>();
for (int i = 0; i < rowVector->childrenSize(); ++i) {
ASSERT_TRUE(
result->as<velox::RowVector>()->childAt(i)->type()->kind() ==
velox::TypeKind::ROW);
}
fullReadResult.push_back(std::move(result));
}
velox::VectorPtr lastResult;
ASSERT_FALSE(reader.next(1, lastResult));

auto readResultGenerator = [&](const velox::RowTypePtr&, int index) {
return fullReadResult.at(index);
};
std::string newFile;

// Do second write, this time with a flatmap input
auto nextExpected = writeFile(
rng, type, readResultGenerator, batchCount, newFile, writerOptions);

// sanity check, they should be the exact same vectors.
EXPECT_EQ(nextExpected.size(), fullReadResult.size());

velox::InMemoryReadFile flatMapReadFile(newFile);
nimble::VeloxReader flatReader(
*readerPool.get(), &flatMapReadFile, selector, readParams);

auto flatRootTypeFromSchema = convertToVeloxType(*flatReader.schema());
EXPECT_EQ(*type, *flatRootTypeFromSchema)
<< "Expected: " << type->toString()
<< ", actual: " << flatRootTypeFromSchema->toString();

// fullReadResult and nextExpected are interchangable at this point
for (auto i = 0; i < fullReadResult.size(); ++i) {
velox::VectorPtr flatResult;
auto& current = fullReadResult.at(i);
ASSERT_TRUE(flatReader.next(current->size(), flatResult));
ASSERT_EQ(current->size(), flatResult->size());
for (int j = 0; j < current->size(); ++j) {
ASSERT_TRUE(current->equalValueAt(flatResult.get(), j, j));
}
}
velox::VectorPtr lastFlatResult;
ASSERT_FALSE(flatReader.next(1, lastFlatResult));
}

TEST_F(VeloxReaderTests, FlatMapToStruct) {
auto floatFeatures = velox::MAP(velox::INTEGER(), velox::REAL());
auto idListFeatures =
Expand Down

0 comments on commit 47c51a0

Please sign in to comment.