From 7fc09667d5e22c684fdeff81da529b79cc974fee Mon Sep 17 00:00:00 2001
From: Kevin Wilfong
Date: Tue, 26 Mar 2024 13:09:58 -0700
Subject: [PATCH] Add estimateSerializedSize to BatchVectorSerializer (#8712)

Summary:
Pull Request resolved: https://github.com/facebookincubator/velox/pull/8712

This change adds an estimateSerializedSize function to the BatchVectorSerializer
interface.

In the DefaultBatchVectorSerializer this simply defers to the existing
estimateSerializedSize in VectorSerde. However, since PrestoBatchVectorSerializer
preserves encodings, its implementation needs to take this into account.

PrestoBatchVectorSerializer's implementation shares a lot of code with
PrestoVectorSerde's estimateSerializedSize, but adds the following:
* estimateConstantSerializedSize: this sets the size to the size of the single
  constant value, regardless of the width of the range.
* estimateDictionarySerializedSize: this sets the size to the aggregate size of
  the indices plus the size of the selected entries in the dictionary.
* estimateSerializedSizeImpl: like estimateSerializedSizeInt in
  PrestoVectorSerde, this drives the estimation, but now calls our new functions.

I found a few bugs in the existing estimation logic, which I fixed.

Note that we inherit a number of inaccuracies from PrestoVectorSerde's
estimateSerializedSize, but these are mostly constants if you ignore that they
become multiplied when they occur in the elements of a complex type (it really
is just an estimate). It also doesn't account for the fact that the Serializer
may disable dictionary encoding if it doesn't provide value. I plan to add this
in a follow-up to keep this change from becoming more complicated.

Reviewed By: bikramSingh91

Differential Revision: D53593847

fbshipit-source-id: 639d01437797e07c378bc837e0a0283c7d08343d
---
 .../tests/SumDataSizeForStatsTest.cpp    |   6 +-
 velox/serializers/PrestoSerializer.cpp   | 306 ++++++++++++++--
 .../tests/PrestoSerializerTest.cpp       | 342 ++++++++++++++++++
 velox/vector/VectorStream.cpp            |   8 +
 velox/vector/VectorStream.h              |   6 +
 5 files changed, 638 insertions(+), 30 deletions(-)

diff --git a/velox/functions/prestosql/aggregates/tests/SumDataSizeForStatsTest.cpp b/velox/functions/prestosql/aggregates/tests/SumDataSizeForStatsTest.cpp
index d3d9d5f89915..eab51e9e6a66 100644
--- a/velox/functions/prestosql/aggregates/tests/SumDataSizeForStatsTest.cpp
+++ b/velox/functions/prestosql/aggregates/tests/SumDataSizeForStatsTest.cpp
@@ -217,7 +217,7 @@ TEST_F(SumDataSizeForStatsTest, complexRecursiveGlobalAggregate) {
       }),
   })};
 
-  testAggregations(vectors, {}, {"sum_data_size_for_stats(c0)"}, "SELECT 118");
+  testAggregations(vectors, {}, {"sum_data_size_for_stats(c0)"}, "SELECT 115");
 }
 
 TEST_F(SumDataSizeForStatsTest, constantEncodingTest) {
@@ -269,10 +269,10 @@ TEST_F(SumDataSizeForStatsTest, dictionaryEncodingTest) {
       BaseVector::wrapInDictionary(nullptr, indices, size, columnTwo);
   auto vectors = {makeRowVector({columnOne, columnTwoDictionaryEncoded})};
 
-  testAggregations(vectors, {}, {"sum_data_size_for_stats(c1)"}, "SELECT 118");
+  testAggregations(vectors, {}, {"sum_data_size_for_stats(c1)"}, "SELECT 115");
 
   testAggregations(
-      vectors, {"c0"}, {"sum_data_size_for_stats(c1)"}, "VALUES (1,82),(2,36)");
+      vectors, {"c0"}, {"sum_data_size_for_stats(c1)"}, "VALUES (1,79),(2,36)");
 }
 
 TEST_F(SumDataSizeForStatsTest, mask) {
diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp
index 97b8cd52e100..602301f82599 100644
--- a/velox/serializers/PrestoSerializer.cpp
+++
b/velox/serializers/PrestoSerializer.cpp @@ -2069,6 +2069,30 @@ static inline int32_t rangesTotalSize( return total; } +template +vector_size_t computeSelectedIndices( + const DictionaryVector* dictionaryVector, + const folly::Range& ranges, + Scratch& scratch, + vector_size_t* selectedIndices) { + // Create a bit set to track which values in the Dictionary are used. + ScratchPtr usedIndicesHolder(scratch); + auto* usedIndices = usedIndicesHolder.get( + bits::nwords(dictionaryVector->valueVector()->size())); + simd::memset(usedIndices, 0, usedIndicesHolder.size() * sizeof(uint64_t)); + + auto* indices = dictionaryVector->indices()->template as(); + for (const auto& range : ranges) { + for (auto i = 0; i < range.size; ++i) { + bits::setBit(usedIndices, indices[range.begin + i]); + } + } + + // Convert the bitset to a list of the used indices. + return simd::indicesOfSetBits( + usedIndices, 0, dictionaryVector->valueVector()->size(), selectedIndices); +} + template void serializeDictionaryVector( const VectorPtr& vector, @@ -2082,9 +2106,11 @@ void serializeDictionaryVector( return; } + auto numRows = rangesTotalSize(ranges); + // Cannot serialize dictionary as PrestoPage dictionary if it has nulls. if (vector->nulls()) { - stream->flattenStream(vector, rangesTotalSize(ranges)); + stream->flattenStream(vector, numRows); serializeWrapped(vector, ranges, stream, scratch); return; } @@ -2092,27 +2118,11 @@ void serializeDictionaryVector( using T = typename KindToFlatVector::WrapperType; auto dictionaryVector = vector->as>(); - // Create a bit set to track which values in the Dictionary are used. - ScratchPtr usedIndicesHolder(scratch); - auto* usedIndices = usedIndicesHolder.get( - bits::nwords(dictionaryVector->valueVector()->size())); - simd::memset(usedIndices, 0, usedIndicesHolder.size() * sizeof(uint64_t)); - - auto* indices = dictionaryVector->indices()->template as(); - vector_size_t numRows = 0; - for (const auto& range : ranges) { - numRows += range.size; - for (auto i = 0; i < range.size; ++i) { - bits::setBit(usedIndices, indices[range.begin + i]); - } - } - - // Convert the bitset to a list of the used indices. ScratchPtr selectedIndicesHolder(scratch); auto* mutableSelectedIndices = selectedIndicesHolder.get(dictionaryVector->valueVector()->size()); - auto numUsed = simd::indicesOfSetBits( - usedIndices, 0, selectedIndicesHolder.size(), mutableSelectedIndices); + auto numUsed = computeSelectedIndices( + dictionaryVector, ranges, scratch, mutableSelectedIndices); // If the values are fixed width and we aren't getting enough reuse to justify // the dictionary, flatten it. @@ -2158,6 +2168,7 @@ void serializeDictionaryVector( // Write out the indices, translating them using the above mapping. stream->appendNonNull(numRows); + auto* indices = dictionaryVector->indices()->template as(); for (const auto& range : ranges) { for (auto i = 0; i < range.size; ++i) { stream->appendOne(updatedIndices[indices[range.begin + i]]); @@ -2358,6 +2369,10 @@ int32_t rowsToRanges( auto ranges = rangesHolder.get(numInner); int32_t fill = 0; for (auto i = 0; i < numInner; ++i) { + // Add the size of the length. + if (sizesPtr) { + *sizesPtr[rawNulls ? 
nonNullRows[i] : i] += sizeof(int32_t); + } if (sizes[innerRows[i]] == 0) { continue; } @@ -2904,14 +2919,23 @@ void expandRepeatedRanges( for (int32_t i = 0; i < ranges.size(); ++i) { int32_t begin = ranges[i].begin; int32_t end = begin + ranges[i].size; - *sizes[i] += sizeof(int32_t); + bool hasNull = false; for (int32_t offset = begin; offset < end; ++offset) { - if (!vector->isNullAt(offset)) { + if (vector->isNullAt(offset)) { + hasNull = true; + } else { + // Add the size of the length. + *sizes[i] += sizeof(int32_t); childRanges->push_back( IndexRange{rawOffsets[offset], rawSizes[offset]}); childSizes->push_back(sizes[i]); } } + + if (hasNull) { + // Add the size of the null bit mask. + *sizes[i] += bits::nbytes(ranges[i].size); + } } } @@ -2926,11 +2950,17 @@ void estimateFlatSerializedSize( for (int32_t i = 0; i < ranges.size(); ++i) { auto end = ranges[i].begin + ranges[i].size; auto numValues = bits::countBits(rawNulls, ranges[i].begin, end); - *(sizes[i]) += - numValues * valueSize + bits::nbytes(ranges[i].size - numValues); + // Add the size of the values. + *(sizes[i]) += numValues * valueSize; + // Add the size of the null bit mask if there are nulls in the range. + if (numValues != ranges[i].size) { + *(sizes[i]) += bits::nbytes(ranges[i].size); + } } } else { for (int32_t i = 0; i < ranges.size(); ++i) { + // Add the size of the values (there's not bit mask since there are no + // nulls). *(sizes[i]) += ranges[i].size * valueSize; } } @@ -3024,7 +3054,7 @@ void estimateWrapperSerializedSize( } template -void estimateConstantSerializedSize( +void estimateFlattenedConstantSerializedSize( const BaseVector* vector, const folly::Range& ranges, vector_size_t** sizes, @@ -3065,7 +3095,7 @@ void estimateSerializedSizeInt( break; case VectorEncoding::Simple::CONSTANT: VELOX_DYNAMIC_TYPE_DISPATCH_ALL( - estimateConstantSerializedSize, + estimateFlattenedConstantSerializedSize, vector->typeKind(), vector, ranges, @@ -3278,7 +3308,7 @@ void estimateWrapperSerializedSize( } template -void estimateConstantSerializedSize( +void estimateFlattenedConstantSerializedSize( const BaseVector* vector, const folly::Range& rows, vector_size_t** sizes, @@ -3333,7 +3363,7 @@ void estimateSerializedSizeInt( } case VectorEncoding::Simple::CONSTANT: VELOX_DYNAMIC_TYPE_DISPATCH_ALL( - estimateConstantSerializedSize, + estimateFlattenedConstantSerializedSize, vector->typeKind(), vector, rows, @@ -3623,6 +3653,91 @@ FlushSizes flushStreams( } } +template +void estimateConstantSerializedSize( + const VectorPtr& vector, + const folly::Range& ranges, + vector_size_t** sizes, + Scratch& scratch) { + VELOX_CHECK(vector->encoding() == VectorEncoding::Simple::CONSTANT); + using T = typename KindToFlatVector::WrapperType; + auto constantVector = vector->as>(); + vector_size_t elementSize = 0; + if (constantVector->isNullAt(0)) { + // There's just a bit mask for the one null. + elementSize = 1; + } else if (constantVector->valueVector()) { + std::vector newRanges; + newRanges.push_back({constantVector->index(), 1}); + auto* elementSizePtr = &elementSize; + // In PrestoBatchVectorSerializer we don't preserve the encodings for the + // valueVector for a ConstantVector. 
+ estimateSerializedSizeInt( + constantVector->valueVector().get(), + newRanges, + &elementSizePtr, + scratch); + } else if (std::is_same_v) { + auto value = constantVector->valueAt(0); + auto string = reinterpret_cast(&value); + elementSize = string->size(); + } else { + elementSize = sizeof(T); + } + + for (int32_t i = 0; i < ranges.size(); ++i) { + *sizes[i] += elementSize; + } +} + +template +void estimateDictionarySerializedSize( + const VectorPtr& vector, + const folly::Range& ranges, + vector_size_t** sizes, + Scratch& scratch) { + VELOX_CHECK(vector->encoding() == VectorEncoding::Simple::DICTIONARY); + using T = typename KindToFlatVector::WrapperType; + auto dictionaryVector = vector->as>(); + + // We don't currently support serializing DictionaryVectors with nulls, so use + // the flattened size. + if (dictionaryVector->nulls()) { + estimateWrapperSerializedSize(ranges, sizes, vector.get(), scratch); + return; + } + + // This will ultimately get passed to simd::transpose, so it needs to be a + // raw_vector. + raw_vector childIndices; + std::vector childSizes; + for (int rangeIndex = 0; rangeIndex < ranges.size(); rangeIndex++) { + ScratchPtr selectedIndicesHolder(scratch); + auto* mutableSelectedIndices = + selectedIndicesHolder.get(dictionaryVector->valueVector()->size()); + auto numUsed = computeSelectedIndices( + dictionaryVector, + ranges.subpiece(rangeIndex, 1), + scratch, + mutableSelectedIndices); + for (int i = 0; i < numUsed; i++) { + childIndices.push_back(mutableSelectedIndices[i]); + childSizes.push_back(sizes[rangeIndex]); + } + + // Add the size of the indices. + *sizes[rangeIndex] += sizeof(int32_t) * ranges[rangeIndex].size; + } + + // In PrestoBatchVectorSerializer we don't preserve the encodings for the + // valueVector for a DictionaryVector. + estimateSerializedSizeInt( + dictionaryVector->valueVector().get(), + childIndices, + childSizes.data(), + scratch); +} + class PrestoBatchVectorSerializer : public BatchVectorSerializer { public: PrestoBatchVectorSerializer(memory::MemoryPool* pool, const SerdeOpts& opts) @@ -3659,7 +3774,144 @@ class PrestoBatchVectorSerializer : public BatchVectorSerializer { streams, numRows, arena, *codec_, opts_.minCompressionRatio, stream); } + void estimateSerializedSize( + VectorPtr vector, + const folly::Range& ranges, + vector_size_t** sizes, + Scratch& scratch) override { + estimateSerializedSizeImpl(vector, ranges, sizes, scratch); + } + private: + void estimateSerializedSizeImpl( + const VectorPtr& vector, + const folly::Range& ranges, + vector_size_t** sizes, + Scratch& scratch) { + switch (vector->encoding()) { + case VectorEncoding::Simple::FLAT: + VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL( + estimateFlatSerializedSize, + vector->typeKind(), + vector.get(), + ranges, + sizes); + break; + case VectorEncoding::Simple::CONSTANT: + VELOX_DYNAMIC_TYPE_DISPATCH_ALL( + estimateConstantSerializedSize, + vector->typeKind(), + vector, + ranges, + sizes, + scratch); + break; + case VectorEncoding::Simple::DICTIONARY: + VELOX_DYNAMIC_TYPE_DISPATCH_ALL( + estimateDictionarySerializedSize, + vector->typeKind(), + vector, + ranges, + sizes, + scratch); + break; + case VectorEncoding::Simple::ROW: { + if (!vector->mayHaveNulls()) { + // Add the size of the offsets in the Row encoding. 
+          for (int32_t i = 0; i < ranges.size(); ++i) {
+            *sizes[i] += ranges[i].size * sizeof(int32_t);
+          }
+
+          auto rowVector = vector->as<RowVector>();
+          auto& children = rowVector->children();
+          for (auto& child : children) {
+            if (child) {
+              estimateSerializedSizeImpl(child, ranges, sizes, scratch);
+            }
+          }
+
+          break;
+        }
+
+        std::vector<IndexRange> childRanges;
+        std::vector<vector_size_t*> childSizes;
+        for (int32_t i = 0; i < ranges.size(); ++i) {
+          // Add the size of the nulls bit mask.
+          *sizes[i] += bits::nbytes(ranges[i].size);
+
+          auto begin = ranges[i].begin;
+          auto end = begin + ranges[i].size;
+          for (auto offset = begin; offset < end; ++offset) {
+            // Add the size of the offset.
+            *sizes[i] += sizeof(int32_t);
+            if (!vector->isNullAt(offset)) {
+              childRanges.push_back(IndexRange{offset, 1});
+              childSizes.push_back(sizes[i]);
+            }
+          }
+        }
+
+        auto rowVector = vector->as<RowVector>();
+        auto& children = rowVector->children();
+        for (auto& child : children) {
+          if (child) {
+            estimateSerializedSizeImpl(
+                child,
+                folly::Range(childRanges.data(), childRanges.size()),
+                childSizes.data(),
+                scratch);
+          }
+        }
+
+        break;
+      }
+      case VectorEncoding::Simple::MAP: {
+        auto mapVector = vector->as<MapVector>();
+        std::vector<IndexRange> childRanges;
+        std::vector<vector_size_t*> childSizes;
+        expandRepeatedRanges(
+            mapVector,
+            mapVector->rawOffsets(),
+            mapVector->rawSizes(),
+            ranges,
+            sizes,
+            &childRanges,
+            &childSizes);
+        estimateSerializedSizeImpl(
+            mapVector->mapKeys(), childRanges, childSizes.data(), scratch);
+        estimateSerializedSizeImpl(
+            mapVector->mapValues(), childRanges, childSizes.data(), scratch);
+        break;
+      }
+      case VectorEncoding::Simple::ARRAY: {
+        auto arrayVector = vector->as<ArrayVector>();
+        std::vector<IndexRange> childRanges;
+        std::vector<vector_size_t*> childSizes;
+        expandRepeatedRanges(
+            arrayVector,
+            arrayVector->rawOffsets(),
+            arrayVector->rawSizes(),
+            ranges,
+            sizes,
+            &childRanges,
+            &childSizes);
+        estimateSerializedSizeImpl(
+            arrayVector->elements(), childRanges, childSizes.data(), scratch);
+        break;
+      }
+      case VectorEncoding::Simple::LAZY:
+        estimateSerializedSizeImpl(
+            vector->as<LazyVector>()->loadedVectorShared(),
+            ranges,
+            sizes,
+            scratch);
+        break;
+      default:
+        VELOX_CHECK(
+            false, "Unsupported vector encoding {}", vector->encoding());
+    }
+  }
+
   memory::MemoryPool* pool_;
   const std::unique_ptr<folly::io::Codec> codec_;
   SerdeOpts opts_;
diff --git a/velox/serializers/tests/PrestoSerializerTest.cpp b/velox/serializers/tests/PrestoSerializerTest.cpp
index fedf8d33867b..951d89b393a7 100644
--- a/velox/serializers/tests/PrestoSerializerTest.cpp
+++ b/velox/serializers/tests/PrestoSerializerTest.cpp
@@ -1528,3 +1528,345 @@ TEST_F(PrestoSerializerTest, deserializeSingleColumn) {
     testRoundTripSingleColumn(data);
   }
 }
+
+class PrestoSerializerBatchEstimateSizeTest : public testing::Test,
+                                              public VectorTestBase {
+ protected:
+  static void SetUpTestSuite() {
+    if (!isRegisteredVectorSerde()) {
+      serializer::presto::PrestoVectorSerde::registerVectorSerde();
+    }
+
+    memory::MemoryManager::testingSetInstance({});
+  }
+
+  void SetUp() override {
+    serde_ = std::make_unique<serializer::presto::PrestoVectorSerde>();
+    serializer_ = serde_->createBatchSerializer(pool_.get(), &paramOptions_);
+  }
+
+  void testEstimateSerializedSize(
+      VectorPtr input,
+      const std::vector<IndexRange>& ranges,
+      const std::vector<vector_size_t>& expectedSizes) {
+    ASSERT_EQ(ranges.size(), expectedSizes.size());
+
+    // Wrap the input in a RowVector to better emulate production.
+ auto row = makeRowVector({input}); + + std::vector sizes(ranges.size(), 0); + std::vector sizesPtrs(ranges.size()); + for (int i = 0; i < ranges.size(); i++) { + sizesPtrs[i] = &sizes[i]; + } + + Scratch scratch; + serializer_->estimateSerializedSize(row, ranges, sizesPtrs.data(), scratch); + + for (int i = 0; i < expectedSizes.size(); i++) { + // Add 4 bytes for each row in the wrapper. This is needed because we wrap + // the input in a RowVector. + ASSERT_EQ(sizes[i], expectedSizes[i] + 4 * ranges[i].size) + << "Mismatched estimated size for range" << i << " " + << ranges[i].begin << ":" << ranges[i].size; + } + } + + void testEstimateSerializedSize( + const VectorPtr& vector, + vector_size_t totalExpectedSize) { + // The whole Vector is a single range. + testEstimateSerializedSize( + vector, {{0, vector->size()}}, {totalExpectedSize}); + // Split the Vector into two equal ranges. + testEstimateSerializedSize( + vector, + {{0, vector->size() / 2}, {vector->size() / 2, vector->size() / 2}}, + {totalExpectedSize / 2, totalExpectedSize / 2}); + // Split the Vector into three ranges of 1/4, 1/2, 1/4. + testEstimateSerializedSize( + vector, + {{0, vector->size() / 4}, + {vector->size() / 4, vector->size() / 2}, + {vector->size() * 3 / 4, vector->size() / 4}}, + {totalExpectedSize / 4, totalExpectedSize / 2, totalExpectedSize / 4}); + } + + std::unique_ptr serde_; + std::unique_ptr serializer_; + serializer::presto::PrestoVectorSerde::PrestoOptions paramOptions_; +}; + +TEST_F(PrestoSerializerBatchEstimateSizeTest, flat) { + auto flatBoolVector = + makeFlatVector(32, [](vector_size_t row) { return row % 2 == 0; }); + + // Bools are 1 byte each. + // 32 * 1 = 32 + testEstimateSerializedSize(flatBoolVector, 32); + + auto flatIntVector = + makeFlatVector(32, [](vector_size_t row) { return row; }); + + // Ints are 4 bytes each. + // 4 * 32 = 128 + testEstimateSerializedSize(flatIntVector, 128); + + auto flatDoubleVector = + makeFlatVector(32, [](vector_size_t row) { return row; }); + + // Doubles are 8 bytes each. + // 8 * 32 = 256 + testEstimateSerializedSize(flatDoubleVector, 256); + + auto flatStringVector = makeFlatVector( + 32, [](vector_size_t row) { return fmt::format("{}", row); }); + + // Strings are variable length, the first 10 are 1 byte each, the rest are 2 + // bytes. Plus 4 bytes for the length of each string. + // 10 * 1 + 22 * 2 + 4 * 32 = 182 + testEstimateSerializedSize(flatStringVector, {{0, 32}}, {182}); + testEstimateSerializedSize(flatStringVector, {{0, 16}, {16, 16}}, {86, 96}); + testEstimateSerializedSize( + flatStringVector, {{0, 8}, {8, 16}, {24, 8}}, {40, 94, 48}); + + auto flatVectorWithNulls = makeFlatVector( + 32, + [](vector_size_t row) { return row; }, + [](vector_size_t row) { return row % 4 == 0; }); + + // Doubles are 8 bytes each, and only non-null doubles are counted. In + // addition there's a null bit per row. 
+ // 8 * 24 + (32 / 8) = 196 + testEstimateSerializedSize(flatVectorWithNulls, {{0, 32}}, {196}); + testEstimateSerializedSize( + flatVectorWithNulls, {{0, 16}, {16, 16}}, {98, 98}); + testEstimateSerializedSize( + flatVectorWithNulls, {{0, 8}, {8, 16}, {24, 8}}, {49, 98, 49}); +} + +TEST_F(PrestoSerializerBatchEstimateSizeTest, array) { + std::vector offsets{ + 0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30}; + auto elements = + makeFlatVector(32, [](vector_size_t row) { return row; }); + auto arrayVector = makeArrayVector(offsets, elements); + + // The ints in the array are 4 bytes each, and the array length is another 4 + // bytes per row. + // 4 * 32 + 4 * 16 = 192 + testEstimateSerializedSize(arrayVector, 192); + + std::vector offsetsWithEmptyOrNulls{ + 0, 4, 4, 8, 8, 12, 12, 16, 16, 20, 20, 24, 24, 28, 28, 32}; + auto arrayVectorWithEmptyArrays = + makeArrayVector(offsetsWithEmptyOrNulls, elements); + + // The ints in the array are 4 bytes each, and the array length is another 4 + // bytes per row. + // 4 * 32 + 4 * 16 = 192 + testEstimateSerializedSize(arrayVectorWithEmptyArrays, 192); + + std::vector nullOffsets{1, 3, 5, 7, 9, 11, 13, 15}; + auto arrayVectorWithNulls = + makeArrayVector(offsetsWithEmptyOrNulls, elements, nullOffsets); + + // The ints in the array are 4 bytes each, and the array length is another 4 + // bytes per non-null row, and 1 null bit per row. + // 4 * 32 + 4 * 8 + 16 / 8 = 162 + testEstimateSerializedSize(arrayVectorWithNulls, {{0, 16}}, {162}); + testEstimateSerializedSize(arrayVectorWithNulls, {{0, 8}, {8, 8}}, {81, 81}); + testEstimateSerializedSize( + arrayVectorWithNulls, {{0, 4}, {4, 8}, {8, 4}}, {41, 81, 41}); +} + +TEST_F(PrestoSerializerBatchEstimateSizeTest, map) { + std::vector offsets{ + 0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30}; + auto keys = + makeFlatVector(32, [](vector_size_t row) { return row; }); + auto values = + makeFlatVector(32, [](vector_size_t row) { return row; }); + auto mapVector = makeMapVector(offsets, keys, values); + + // The ints in the map are 4 bytes each, the doubles are 8 bytes, and the map + // length is another 4 bytes per row. + // 4 * 32 + 8 * 32 + 4 * 16 = 448 + testEstimateSerializedSize(mapVector, 448); + + std::vector offsetsWithEmptyOrNulls{ + 0, 4, 4, 8, 8, 12, 12, 16, 16, 20, 20, 24, 24, 28, 28, 32}; + auto mapVectorWithEmptyMaps = + makeMapVector(offsetsWithEmptyOrNulls, keys, values); + + // The ints in the map are 4 bytes each, the doubles are 8 bytes, and the map + // length is another 4 bytes per row. + // 4 * 32 + 8 * 32 + 4 * 16 = 448 + testEstimateSerializedSize(mapVectorWithEmptyMaps, 448); + + std::vector nullOffsets{1, 3, 5, 7, 9, 11, 13, 15}; + auto mapVectorWithNulls = + makeMapVector(offsetsWithEmptyOrNulls, keys, values, nullOffsets); + + // The ints in the map are 4 bytes each, the doubles are 8 bytes, and the map + // length is another 4 bytes per non-null row, and 1 null bit per row. 
+  // 4 * 32 + 8 * 32 + 4 * 8 + 16 / 8 = 418
+  testEstimateSerializedSize(mapVectorWithNulls, {{0, 16}}, {418});
+  testEstimateSerializedSize(mapVectorWithNulls, {{0, 8}, {8, 8}}, {209, 209});
+  testEstimateSerializedSize(
+      mapVectorWithNulls, {{0, 4}, {4, 8}, {12, 4}}, {105, 209, 105});
+}
+
+TEST_F(PrestoSerializerBatchEstimateSizeTest, row) {
+  auto field1 =
+      makeFlatVector<int32_t>(32, [](vector_size_t row) { return row; });
+  auto field2 =
+      makeFlatVector<double>(32, [](vector_size_t row) { return row; });
+  auto rowVector = makeRowVector({field1, field2});
+
+  // The ints in the row are 4 bytes each, the doubles are 8 bytes, and the
+  // offsets are 4 bytes per row.
+  // 4 * 32 + 8 * 32 + 4 * 32 = 512
+  testEstimateSerializedSize(rowVector, 512);
+
+  auto rowVectorWithNulls =
+      makeRowVector({field1, field2}, [](auto row) { return row % 4 == 0; });
+
+  // The ints in the row are 4 bytes each, the doubles are 8 bytes, and the
+  // offsets are 4 bytes per row, and 1 null bit per row.
+  // 4 * 24 + 8 * 24 + 4 * 32 + 32 / 8 = 420
+  testEstimateSerializedSize(rowVectorWithNulls, 420);
+}
+
+TEST_F(PrestoSerializerBatchEstimateSizeTest, constant) {
+  auto flatVector =
+      makeFlatVector<int32_t>(32, [](vector_size_t row) { return row; });
+  auto constantInt = BaseVector::wrapInConstant(32, 0, flatVector);
+
+  // The single constant int is 4 bytes.
+  testEstimateSerializedSize(constantInt, {{0, 32}}, {4});
+  testEstimateSerializedSize(constantInt, {{0, 16}, {16, 16}}, {4, 4});
+  testEstimateSerializedSize(
+      constantInt, {{0, 8}, {8, 16}, {24, 8}}, {4, 4, 4});
+
+  auto nullConstant = BaseVector::createNullConstant(BIGINT(), 32, pool_.get());
+
+  // The single constant null is 1 byte (for the bit mask).
+  testEstimateSerializedSize(nullConstant, {{0, 32}}, {1});
+  testEstimateSerializedSize(nullConstant, {{0, 16}, {16, 16}}, {1, 1});
+  testEstimateSerializedSize(
+      nullConstant, {{0, 8}, {8, 16}, {24, 8}}, {1, 1, 1});
+
+  std::vector<vector_size_t> offsets{
+      0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30};
+  auto arrayVector = makeArrayVector(offsets, flatVector);
+  auto constantArray = BaseVector::wrapInConstant(32, 0, arrayVector);
+
+  // The single constant array is 4 bytes for the length, and 4 bytes for each
+  // of the 2 integer elements.
+  // 4 + 2 * 4 = 12
+  testEstimateSerializedSize(constantArray, {{0, 32}}, {12});
+  testEstimateSerializedSize(constantArray, {{0, 16}, {16, 16}}, {12, 12});
+  testEstimateSerializedSize(
+      constantArray, {{0, 8}, {8, 16}, {24, 8}}, {12, 12, 12});
+
+  auto arrayVectorWithConstantElements = makeArrayVector(offsets, constantInt);
+  auto constantArrayWithConstantElements =
+      BaseVector::wrapInConstant(32, 0, arrayVectorWithConstantElements);
+
+  // The single constant array is 4 bytes for the length, and 4 bytes for each
+  // of the 2 integer elements (encodings for children of encoded complex types
+  // are not currently preserved).
+ // 4 + 2 * 4 = 12 + testEstimateSerializedSize( + constantArrayWithConstantElements, {{0, 32}}, {12}); + testEstimateSerializedSize( + constantArrayWithConstantElements, {{0, 16}, {16, 16}}, {12, 12}); + testEstimateSerializedSize( + constantArrayWithConstantElements, + {{0, 8}, {8, 16}, {24, 8}}, + {12, 12, 12}); +} + +TEST_F(PrestoSerializerBatchEstimateSizeTest, dictionary) { + auto indices = makeIndices(32, [](auto row) { return (row * 2) % 32; }); + auto flatVector = + makeFlatVector(32, [](vector_size_t row) { return row; }); + auto dictionaryInts = + BaseVector::wrapInDictionary(nullptr, indices, 32, flatVector); + + // The indices are 4 bytes, and the dictionary entries are 4 bytes each. + // 4 * 32 + 4 * 16 = 192 + testEstimateSerializedSize(dictionaryInts, {{0, 32}}, {192}); + testEstimateSerializedSize(dictionaryInts, {{0, 16}, {16, 16}}, {128, 128}); + testEstimateSerializedSize( + dictionaryInts, {{0, 8}, {8, 16}, {24, 8}}, {64, 128, 64}); + + auto flatVectorWithNulls = makeFlatVector( + 32, + [](vector_size_t row) { return row; }, + [](vector_size_t row) { return row % 4 == 0; }); + auto dictionaryNullElements = + BaseVector::wrapInDictionary(nullptr, indices, 32, flatVectorWithNulls); + + // The indices are 4 bytes, half the dictionary entries are 8 byte doubles. + // Note that the bytes for the null bits in the entries are not accounted for, + // this is a limitation of having non-contiguous ranges selected from the + // dictionary values. + // 4 * 32 + 8 * 8 = 192 + testEstimateSerializedSize(dictionaryNullElements, {{0, 32}}, {192}); + testEstimateSerializedSize( + dictionaryNullElements, {{0, 16}, {16, 16}}, {128, 128}); + testEstimateSerializedSize( + dictionaryNullElements, {{0, 8}, {8, 16}, {24, 8}}, {64, 128, 64}); + + auto arrayIndices = makeIndices(16, [](auto row) { return (row * 2) % 16; }); + std::vector offsets{ + 0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30}; + auto arrayVector = makeArrayVector(offsets, flatVector); + auto dictionaryArray = + BaseVector::wrapInDictionary(nullptr, arrayIndices, 16, arrayVector); + + // The indices are 4 bytes, and the dictionary entries are 4 bytes length + 4 + // bytes for each of the 2 array elements. + // 4 * 16 + 4 * 8 + 2 * 8 * 4 = 160 + testEstimateSerializedSize(dictionaryArray, {{0, 16}}, {160}); + testEstimateSerializedSize(dictionaryArray, {{0, 8}, {8, 8}}, {128, 128}); + testEstimateSerializedSize( + dictionaryArray, {{0, 4}, {4, 8}, {12, 4}}, {64, 128, 64}); + + auto constantInt = BaseVector::wrapInConstant(32, 0, flatVector); + auto arrayVectorWithConstantElements = makeArrayVector(offsets, constantInt); + auto dictionaryArrayWithConstantElements = BaseVector::wrapInDictionary( + nullptr, arrayIndices, 16, arrayVectorWithConstantElements); + + // The indices are 4 bytes, and the dictionary entries are 4 bytes length + 4 + // bytes for each of the 2 array elements (encodings for children of encoded + // complex types are not currently preserved). 
+  // 4 * 16 + 4 * 8 + 2 * 8 * 4 = 160
+  testEstimateSerializedSize(
+      dictionaryArrayWithConstantElements, {{0, 16}}, {160});
+  testEstimateSerializedSize(
+      dictionaryArrayWithConstantElements, {{0, 8}, {8, 8}}, {128, 128});
+  testEstimateSerializedSize(
+      dictionaryArrayWithConstantElements,
+      {{0, 4}, {4, 8}, {12, 4}},
+      {64, 128, 64});
+
+  auto dictionaryWithNulls = BaseVector::wrapInDictionary(
+      makeNulls(32, [](auto row) { return row % 2 == 0; }),
+      indices,
+      32,
+      flatVector);
+
+  // When nulls are present in the dictionary, currently we flatten the data.
+  // So there are 4 bytes per non-null row. Null bits are only accounted for
+  // the null elements because the non-null elements in the wrapped vector are
+  // non-contiguous.
+  // 4 * 16 + 16 / 8 = 66
+  testEstimateSerializedSize(dictionaryWithNulls, {{0, 32}}, {66});
+  testEstimateSerializedSize(
+      dictionaryWithNulls, {{0, 16}, {16, 16}}, {33, 33});
+  testEstimateSerializedSize(
+      dictionaryWithNulls, {{0, 8}, {8, 16}, {24, 8}}, {17, 33, 17});
+}
diff --git a/velox/vector/VectorStream.cpp b/velox/vector/VectorStream.cpp
index a6fdefec8fc5..119611047146 100644
--- a/velox/vector/VectorStream.cpp
+++ b/velox/vector/VectorStream.cpp
@@ -47,6 +47,14 @@ class DefaultBatchVectorSerializer : public BatchVectorSerializer {
     serializer->flush(stream);
   }
 
+  void estimateSerializedSize(
+      VectorPtr vector,
+      const folly::Range<const IndexRange*>& ranges,
+      vector_size_t** sizes,
+      Scratch& scratch) override {
+    serde_->estimateSerializedSize(vector.get(), ranges, sizes, scratch);
+  }
+
  private:
   memory::MemoryPool* const pool_;
   VectorSerde* const serde_;
diff --git a/velox/vector/VectorStream.h b/velox/vector/VectorStream.h
index a54f60bded6b..b8304bdc2e28 100644
--- a/velox/vector/VectorStream.h
+++ b/velox/vector/VectorStream.h
@@ -125,6 +125,12 @@ class BatchVectorSerializer {
     serialize(vector, ranges, scratch, stream);
   }
 
+  virtual void estimateSerializedSize(
+      VectorPtr vector,
+      const folly::Range<const IndexRange*>& ranges,
+      vector_size_t** sizes,
+      Scratch& scratch) = 0;
+
  /// Serializes all rows in a vector.
  void serialize(const RowVectorPtr& vector, OutputStream* stream);
};
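For context beyond the patch itself, the following is a minimal usage sketch of the BatchVectorSerializer::estimateSerializedSize API added above. It mirrors the serde setup used in PrestoSerializerBatchEstimateSizeTest; the function name, the exact includes, and the option handling are illustrative assumptions, not part of the change.

// Usage sketch only -- not part of the patch. Shows the new
// BatchVectorSerializer::estimateSerializedSize next to an actual serialize
// call. Setup mirrors PrestoSerializerBatchEstimateSizeTest above; the helper
// name and includes are assumptions for illustration.
#include "velox/serializers/PrestoSerializer.h"
#include "velox/vector/VectorStream.h"

using namespace facebook::velox;

void estimateThenSerialize(
    const RowVectorPtr& data,
    memory::MemoryPool* pool,
    OutputStream* out) {
  serializer::presto::PrestoVectorSerde serde;
  serializer::presto::PrestoVectorSerde::PrestoOptions options;
  auto serializer = serde.createBatchSerializer(pool, &options);

  // One IndexRange per chunk of rows to estimate; here a single range covering
  // the whole vector. The estimate for each range is written through the
  // corresponding entry of the 'sizes' pointer array.
  std::vector<IndexRange> ranges{{0, data->size()}};
  vector_size_t size = 0;
  vector_size_t* sizePtr = &size;
  Scratch scratch;
  serializer->estimateSerializedSize(data, ranges, &sizePtr, scratch);

  // 'size' now holds the estimated serialized byte count for the range and can
  // drive buffer sizing or flush decisions before serializing.
  serializer->serialize(data, out);
}

Passing one IndexRange per destination (for example, per output partition) yields per-destination estimates in a single call, which is how the array-of-pointers shape of the sizes parameter is exercised in the tests above.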