diff --git a/velox/functions/prestosql/aggregates/tests/SumDataSizeForStatsTest.cpp b/velox/functions/prestosql/aggregates/tests/SumDataSizeForStatsTest.cpp
index d3d9d5f89915..eab51e9e6a66 100644
--- a/velox/functions/prestosql/aggregates/tests/SumDataSizeForStatsTest.cpp
+++ b/velox/functions/prestosql/aggregates/tests/SumDataSizeForStatsTest.cpp
@@ -217,7 +217,7 @@ TEST_F(SumDataSizeForStatsTest, complexRecursiveGlobalAggregate) {
       }),
   })};
 
-  testAggregations(vectors, {}, {"sum_data_size_for_stats(c0)"}, "SELECT 118");
+  testAggregations(vectors, {}, {"sum_data_size_for_stats(c0)"}, "SELECT 115");
 }
 
 TEST_F(SumDataSizeForStatsTest, constantEncodingTest) {
@@ -269,10 +269,10 @@ TEST_F(SumDataSizeForStatsTest, dictionaryEncodingTest) {
       BaseVector::wrapInDictionary(nullptr, indices, size, columnTwo);
   auto vectors = {makeRowVector({columnOne, columnTwoDictionaryEncoded})};
 
-  testAggregations(vectors, {}, {"sum_data_size_for_stats(c1)"}, "SELECT 118");
+  testAggregations(vectors, {}, {"sum_data_size_for_stats(c1)"}, "SELECT 115");
 
   testAggregations(
-      vectors, {"c0"}, {"sum_data_size_for_stats(c1)"}, "VALUES (1,82),(2,36)");
+      vectors, {"c0"}, {"sum_data_size_for_stats(c1)"}, "VALUES (1,79),(2,36)");
 }
 
 TEST_F(SumDataSizeForStatsTest, mask) {
diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp
index 97b8cd52e100..602301f82599 100644
--- a/velox/serializers/PrestoSerializer.cpp
+++ b/velox/serializers/PrestoSerializer.cpp
@@ -2069,6 +2069,30 @@ static inline int32_t rangesTotalSize(
   return total;
 }
 
+template <typename T>
+vector_size_t computeSelectedIndices(
+    const DictionaryVector<T>* dictionaryVector,
+    const folly::Range<const IndexRange*>& ranges,
+    Scratch& scratch,
+    vector_size_t* selectedIndices) {
+  // Create a bit set to track which values in the Dictionary are used.
+  ScratchPtr<uint64_t, 4> usedIndicesHolder(scratch);
+  auto* usedIndices = usedIndicesHolder.get(
+      bits::nwords(dictionaryVector->valueVector()->size()));
+  simd::memset(usedIndices, 0, usedIndicesHolder.size() * sizeof(uint64_t));
+
+  auto* indices = dictionaryVector->indices()->template as<vector_size_t>();
+  for (const auto& range : ranges) {
+    for (auto i = 0; i < range.size; ++i) {
+      bits::setBit(usedIndices, indices[range.begin + i]);
+    }
+  }
+
+  // Convert the bitset to a list of the used indices.
+  return simd::indicesOfSetBits(
+      usedIndices, 0, dictionaryVector->valueVector()->size(), selectedIndices);
+}
+
 template <TypeKind kind>
 void serializeDictionaryVector(
     const VectorPtr& vector,
@@ -2082,9 +2106,11 @@ void serializeDictionaryVector(
     return;
   }
 
+  auto numRows = rangesTotalSize(ranges);
+
   // Cannot serialize dictionary as PrestoPage dictionary if it has nulls.
   if (vector->nulls()) {
-    stream->flattenStream(vector, rangesTotalSize(ranges));
+    stream->flattenStream(vector, numRows);
     serializeWrapped(vector, ranges, stream, scratch);
     return;
   }
@@ -2092,27 +2118,11 @@ void serializeDictionaryVector(
   using T = typename KindToFlatVector<kind>::WrapperType;
   auto dictionaryVector = vector->as<DictionaryVector<T>>();
 
-  // Create a bit set to track which values in the Dictionary are used.
-  ScratchPtr<uint64_t, 4> usedIndicesHolder(scratch);
-  auto* usedIndices = usedIndicesHolder.get(
-      bits::nwords(dictionaryVector->valueVector()->size()));
-  simd::memset(usedIndices, 0, usedIndicesHolder.size() * sizeof(uint64_t));
-
-  auto* indices = dictionaryVector->indices()->template as<vector_size_t>();
-  vector_size_t numRows = 0;
-  for (const auto& range : ranges) {
-    numRows += range.size;
-    for (auto i = 0; i < range.size; ++i) {
-      bits::setBit(usedIndices, indices[range.begin + i]);
-    }
-  }
-
-  // Convert the bitset to a list of the used indices.
   ScratchPtr<vector_size_t, 64> selectedIndicesHolder(scratch);
   auto* mutableSelectedIndices =
       selectedIndicesHolder.get(dictionaryVector->valueVector()->size());
-  auto numUsed = simd::indicesOfSetBits(
-      usedIndices, 0, selectedIndicesHolder.size(), mutableSelectedIndices);
+  auto numUsed = computeSelectedIndices(
+      dictionaryVector, ranges, scratch, mutableSelectedIndices);
 
   // If the values are fixed width and we aren't getting enough reuse to justify
   // the dictionary, flatten it.
@@ -2158,6 +2168,7 @@ void serializeDictionaryVector(
 
   // Write out the indices, translating them using the above mapping.
   stream->appendNonNull(numRows);
+  auto* indices = dictionaryVector->indices()->template as<vector_size_t>();
   for (const auto& range : ranges) {
     for (auto i = 0; i < range.size; ++i) {
       stream->appendOne(updatedIndices[indices[range.begin + i]]);
@@ -2358,6 +2369,10 @@ int32_t rowsToRanges(
   auto ranges = rangesHolder.get(numInner);
   int32_t fill = 0;
   for (auto i = 0; i < numInner; ++i) {
+    // Add the size of the length.
+    if (sizesPtr) {
+      *sizesPtr[rawNulls ? nonNullRows[i] : i] += sizeof(int32_t);
+    }
    if (sizes[innerRows[i]] == 0) {
       continue;
     }
@@ -2904,14 +2919,23 @@ void expandRepeatedRanges(
   for (int32_t i = 0; i < ranges.size(); ++i) {
     int32_t begin = ranges[i].begin;
     int32_t end = begin + ranges[i].size;
-    *sizes[i] += sizeof(int32_t);
+    bool hasNull = false;
     for (int32_t offset = begin; offset < end; ++offset) {
-      if (!vector->isNullAt(offset)) {
+      if (vector->isNullAt(offset)) {
+        hasNull = true;
+      } else {
+        // Add the size of the length.
+        *sizes[i] += sizeof(int32_t);
         childRanges->push_back(
             IndexRange{rawOffsets[offset], rawSizes[offset]});
         childSizes->push_back(sizes[i]);
       }
     }
+
+    if (hasNull) {
+      // Add the size of the null bit mask.
+      *sizes[i] += bits::nbytes(ranges[i].size);
+    }
   }
 }
 
@@ -2926,11 +2950,17 @@ void estimateFlatSerializedSize(
     for (int32_t i = 0; i < ranges.size(); ++i) {
       auto end = ranges[i].begin + ranges[i].size;
       auto numValues = bits::countBits(rawNulls, ranges[i].begin, end);
-      *(sizes[i]) +=
-          numValues * valueSize + bits::nbytes(ranges[i].size - numValues);
+      // Add the size of the values.
+      *(sizes[i]) += numValues * valueSize;
+      // Add the size of the null bit mask if there are nulls in the range.
+      if (numValues != ranges[i].size) {
+        *(sizes[i]) += bits::nbytes(ranges[i].size);
+      }
     }
   } else {
     for (int32_t i = 0; i < ranges.size(); ++i) {
+      // Add the size of the values (there's no bit mask since there are no
+      // nulls).
       *(sizes[i]) += ranges[i].size * valueSize;
     }
   }
@@ -3024,7 +3054,7 @@ void estimateWrapperSerializedSize(
 }
 
 template <TypeKind kind>
-void estimateConstantSerializedSize(
+void estimateFlattenedConstantSerializedSize(
     const BaseVector* vector,
     const folly::Range<const IndexRange*>& ranges,
     vector_size_t** sizes,
@@ -3065,7 +3095,7 @@ void estimateSerializedSizeInt(
       break;
     case VectorEncoding::Simple::CONSTANT:
       VELOX_DYNAMIC_TYPE_DISPATCH_ALL(
-          estimateConstantSerializedSize,
+          estimateFlattenedConstantSerializedSize,
           vector->typeKind(),
           vector,
           ranges,
@@ -3278,7 +3308,7 @@ void estimateWrapperSerializedSize(
 }
 
 template <TypeKind kind>
-void estimateConstantSerializedSize(
+void estimateFlattenedConstantSerializedSize(
     const BaseVector* vector,
     const folly::Range<const vector_size_t*>& rows,
     vector_size_t** sizes,
@@ -3333,7 +3363,7 @@ void estimateSerializedSizeInt(
       }
     case VectorEncoding::Simple::CONSTANT:
       VELOX_DYNAMIC_TYPE_DISPATCH_ALL(
-          estimateConstantSerializedSize,
+          estimateFlattenedConstantSerializedSize,
          vector->typeKind(),
          vector,
          rows,
@@ -3623,6 +3653,91 @@ FlushSizes flushStreams(
   }
 }
 
+template <TypeKind kind>
+void estimateConstantSerializedSize(
+    const VectorPtr& vector,
+    const folly::Range<const IndexRange*>& ranges,
+    vector_size_t** sizes,
+    Scratch& scratch) {
+  VELOX_CHECK(vector->encoding() == VectorEncoding::Simple::CONSTANT);
+  using T = typename KindToFlatVector<kind>::WrapperType;
+  auto constantVector = vector->as<ConstantVector<T>>();
+  vector_size_t elementSize = 0;
+  if (constantVector->isNullAt(0)) {
+    // There's just a bit mask for the one null.
+    elementSize = 1;
+  } else if (constantVector->valueVector()) {
+    std::vector<IndexRange> newRanges;
+    newRanges.push_back({constantVector->index(), 1});
+    auto* elementSizePtr = &elementSize;
+    // In PrestoBatchVectorSerializer we don't preserve the encodings for the
+    // valueVector for a ConstantVector.
+    estimateSerializedSizeInt(
+        constantVector->valueVector().get(),
+        newRanges,
+        &elementSizePtr,
+        scratch);
+  } else if (std::is_same_v<T, StringView>) {
+    auto value = constantVector->valueAt(0);
+    auto string = reinterpret_cast<const StringView*>(&value);
+    elementSize = string->size();
+  } else {
+    elementSize = sizeof(T);
+  }
+
+  for (int32_t i = 0; i < ranges.size(); ++i) {
+    *sizes[i] += elementSize;
+  }
+}
+
+template <TypeKind kind>
+void estimateDictionarySerializedSize(
+    const VectorPtr& vector,
+    const folly::Range<const IndexRange*>& ranges,
+    vector_size_t** sizes,
+    Scratch& scratch) {
+  VELOX_CHECK(vector->encoding() == VectorEncoding::Simple::DICTIONARY);
+  using T = typename KindToFlatVector<kind>::WrapperType;
+  auto dictionaryVector = vector->as<DictionaryVector<T>>();
+
+  // We don't currently support serializing DictionaryVectors with nulls, so
+  // use the flattened size.
+  if (dictionaryVector->nulls()) {
+    estimateWrapperSerializedSize(ranges, sizes, vector.get(), scratch);
+    return;
+  }
+
+  // This will ultimately get passed to simd::transpose, so it needs to be a
+  // raw_vector.
+  raw_vector<vector_size_t> childIndices;
+  std::vector<vector_size_t*> childSizes;
+  for (int rangeIndex = 0; rangeIndex < ranges.size(); rangeIndex++) {
+    ScratchPtr<vector_size_t, 64> selectedIndicesHolder(scratch);
+    auto* mutableSelectedIndices =
+        selectedIndicesHolder.get(dictionaryVector->valueVector()->size());
+    auto numUsed = computeSelectedIndices(
+        dictionaryVector,
+        ranges.subpiece(rangeIndex, 1),
+        scratch,
+        mutableSelectedIndices);
+    for (int i = 0; i < numUsed; i++) {
+      childIndices.push_back(mutableSelectedIndices[i]);
+      childSizes.push_back(sizes[rangeIndex]);
+    }
+
+    // Add the size of the indices.
+    *sizes[rangeIndex] += sizeof(int32_t) * ranges[rangeIndex].size;
+  }
+
+  // In PrestoBatchVectorSerializer we don't preserve the encodings for the
+  // valueVector for a DictionaryVector.
+  estimateSerializedSizeInt(
+      dictionaryVector->valueVector().get(),
+      childIndices,
+      childSizes.data(),
+      scratch);
+}
+
 class PrestoBatchVectorSerializer : public BatchVectorSerializer {
  public:
   PrestoBatchVectorSerializer(memory::MemoryPool* pool, const SerdeOpts& opts)
@@ -3659,7 +3774,144 @@ class PrestoBatchVectorSerializer : public BatchVectorSerializer {
         streams, numRows, arena, *codec_, opts_.minCompressionRatio, stream);
   }
 
+  void estimateSerializedSize(
+      VectorPtr vector,
+      const folly::Range<const IndexRange*>& ranges,
+      vector_size_t** sizes,
+      Scratch& scratch) override {
+    estimateSerializedSizeImpl(vector, ranges, sizes, scratch);
+  }
+
  private:
+  void estimateSerializedSizeImpl(
+      const VectorPtr& vector,
+      const folly::Range<const IndexRange*>& ranges,
+      vector_size_t** sizes,
+      Scratch& scratch) {
+    switch (vector->encoding()) {
+      case VectorEncoding::Simple::FLAT:
+        VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
+            estimateFlatSerializedSize,
+            vector->typeKind(),
+            vector.get(),
+            ranges,
+            sizes);
+        break;
+      case VectorEncoding::Simple::CONSTANT:
+        VELOX_DYNAMIC_TYPE_DISPATCH_ALL(
+            estimateConstantSerializedSize,
+            vector->typeKind(),
+            vector,
+            ranges,
+            sizes,
+            scratch);
+        break;
+      case VectorEncoding::Simple::DICTIONARY:
+        VELOX_DYNAMIC_TYPE_DISPATCH_ALL(
+            estimateDictionarySerializedSize,
+            vector->typeKind(),
+            vector,
+            ranges,
+            sizes,
+            scratch);
+        break;
+      case VectorEncoding::Simple::ROW: {
+        if (!vector->mayHaveNulls()) {
+          // Add the size of the offsets in the Row encoding.
+          for (int32_t i = 0; i < ranges.size(); ++i) {
+            *sizes[i] += ranges[i].size * sizeof(int32_t);
+          }
+
+          auto rowVector = vector->as<RowVector>();
+          auto& children = rowVector->children();
+          for (auto& child : children) {
+            if (child) {
+              estimateSerializedSizeImpl(child, ranges, sizes, scratch);
+            }
+          }
+
+          break;
+        }
+
+        std::vector<IndexRange> childRanges;
+        std::vector<vector_size_t*> childSizes;
+        for (int32_t i = 0; i < ranges.size(); ++i) {
+          // Add the size of the nulls bit mask.
+          *sizes[i] += bits::nbytes(ranges[i].size);
+
+          auto begin = ranges[i].begin;
+          auto end = begin + ranges[i].size;
+          for (auto offset = begin; offset < end; ++offset) {
+            // Add the size of the offset.
+            *sizes[i] += sizeof(int32_t);
+            if (!vector->isNullAt(offset)) {
+              childRanges.push_back(IndexRange{offset, 1});
+              childSizes.push_back(sizes[i]);
+            }
+          }
+        }
+
+        auto rowVector = vector->as<RowVector>();
+        auto& children = rowVector->children();
+        for (auto& child : children) {
+          if (child) {
+            estimateSerializedSizeImpl(
+                child,
+                folly::Range<const IndexRange*>(childRanges.data(), childRanges.size()),
+                childSizes.data(),
+                scratch);
+          }
+        }
+
+        break;
+      }
+      case VectorEncoding::Simple::MAP: {
+        auto mapVector = vector->as<MapVector>();
+        std::vector<IndexRange> childRanges;
+        std::vector<vector_size_t*> childSizes;
+        expandRepeatedRanges(
+            mapVector,
+            mapVector->rawOffsets(),
+            mapVector->rawSizes(),
+            ranges,
+            sizes,
+            &childRanges,
+            &childSizes);
+        estimateSerializedSizeImpl(
+            mapVector->mapKeys(), childRanges, childSizes.data(), scratch);
+        estimateSerializedSizeImpl(
+            mapVector->mapValues(), childRanges, childSizes.data(), scratch);
+        break;
+      }
+      case VectorEncoding::Simple::ARRAY: {
+        auto arrayVector = vector->as<ArrayVector>();
+        std::vector<IndexRange> childRanges;
+        std::vector<vector_size_t*> childSizes;
+        expandRepeatedRanges(
+            arrayVector,
+            arrayVector->rawOffsets(),
+            arrayVector->rawSizes(),
+            ranges,
+            sizes,
+            &childRanges,
+            &childSizes);
+        estimateSerializedSizeImpl(
+            arrayVector->elements(), childRanges, childSizes.data(), scratch);
+        break;
+      }
+      case VectorEncoding::Simple::LAZY:
+        estimateSerializedSizeImpl(
+            vector->as<LazyVector>()->loadedVectorShared(),
+            ranges,
+            sizes,
+            scratch);
+        break;
+      default:
+        VELOX_CHECK(
+            false, "Unsupported vector encoding {}", vector->encoding());
+    }
+  }
+
   memory::MemoryPool* pool_;
   const std::unique_ptr<folly::io::Codec> codec_;
   SerdeOpts opts_;
diff --git a/velox/serializers/tests/PrestoSerializerTest.cpp b/velox/serializers/tests/PrestoSerializerTest.cpp
index fedf8d33867b..951d89b393a7 100644
--- a/velox/serializers/tests/PrestoSerializerTest.cpp
+++ b/velox/serializers/tests/PrestoSerializerTest.cpp
@@ -1528,3 +1528,345 @@ TEST_F(PrestoSerializerTest, deserializeSingleColumn) {
     testRoundTripSingleColumn(data);
   }
 }
+
+class PrestoSerializerBatchEstimateSizeTest : public testing::Test,
+                                              public VectorTestBase {
+ protected:
+  static void SetUpTestSuite() {
+    if (!isRegisteredVectorSerde()) {
+      serializer::presto::PrestoVectorSerde::registerVectorSerde();
+    }
+
+    memory::MemoryManager::testingSetInstance({});
+  }
+
+  void SetUp() override {
+    serde_ = std::make_unique<serializer::presto::PrestoVectorSerde>();
+    serializer_ = serde_->createBatchSerializer(pool_.get(), &paramOptions_);
+  }
+
+  void testEstimateSerializedSize(
+      VectorPtr input,
+      const std::vector<IndexRange>& ranges,
+      const std::vector<vector_size_t>& expectedSizes) {
+    ASSERT_EQ(ranges.size(), expectedSizes.size());
+
+    // Wrap the input in a RowVector to better emulate production.
+    auto row = makeRowVector({input});
+
+    std::vector<vector_size_t> sizes(ranges.size(), 0);
+    std::vector<vector_size_t*> sizesPtrs(ranges.size());
+    for (int i = 0; i < ranges.size(); i++) {
+      sizesPtrs[i] = &sizes[i];
+    }
+
+    Scratch scratch;
+    serializer_->estimateSerializedSize(row, ranges, sizesPtrs.data(), scratch);
+
+    for (int i = 0; i < expectedSizes.size(); i++) {
+      // Add 4 bytes for each row in the wrapper. This is needed because we
+      // wrap the input in a RowVector.
+      ASSERT_EQ(sizes[i], expectedSizes[i] + 4 * ranges[i].size)
+          << "Mismatched estimated size for range " << i << " "
+          << ranges[i].begin << ":" << ranges[i].size;
+    }
+  }
+
+  void testEstimateSerializedSize(
+      const VectorPtr& vector,
+      vector_size_t totalExpectedSize) {
+    // The whole Vector is a single range.
+    testEstimateSerializedSize(
+        vector, {{0, vector->size()}}, {totalExpectedSize});
+    // Split the Vector into two equal ranges.
+    testEstimateSerializedSize(
+        vector,
+        {{0, vector->size() / 2}, {vector->size() / 2, vector->size() / 2}},
+        {totalExpectedSize / 2, totalExpectedSize / 2});
+    // Split the Vector into three ranges of 1/4, 1/2, 1/4.
+    testEstimateSerializedSize(
+        vector,
+        {{0, vector->size() / 4},
+         {vector->size() / 4, vector->size() / 2},
+         {vector->size() * 3 / 4, vector->size() / 4}},
+        {totalExpectedSize / 4, totalExpectedSize / 2, totalExpectedSize / 4});
+  }
+
+  std::unique_ptr<serializer::presto::PrestoVectorSerde> serde_;
+  std::unique_ptr<BatchVectorSerializer> serializer_;
+  serializer::presto::PrestoVectorSerde::PrestoOptions paramOptions_;
+};
+
+TEST_F(PrestoSerializerBatchEstimateSizeTest, flat) {
+  auto flatBoolVector =
+      makeFlatVector<bool>(32, [](vector_size_t row) { return row % 2 == 0; });
+
+  // Bools are 1 byte each.
+  // 32 * 1 = 32
+  testEstimateSerializedSize(flatBoolVector, 32);
+
+  auto flatIntVector =
+      makeFlatVector<int32_t>(32, [](vector_size_t row) { return row; });
+
+  // Ints are 4 bytes each.
+  // 4 * 32 = 128
+  testEstimateSerializedSize(flatIntVector, 128);
+
+  auto flatDoubleVector =
+      makeFlatVector<double>(32, [](vector_size_t row) { return row; });
+
+  // Doubles are 8 bytes each.
+  // 8 * 32 = 256
+  testEstimateSerializedSize(flatDoubleVector, 256);
+
+  auto flatStringVector = makeFlatVector<std::string>(
+      32, [](vector_size_t row) { return fmt::format("{}", row); });
+
+  // Strings are variable length: the first 10 are 1 byte each, the rest are 2
+  // bytes, plus 4 bytes for the length of each string.
+  // 10 * 1 + 22 * 2 + 4 * 32 = 182
+  testEstimateSerializedSize(flatStringVector, {{0, 32}}, {182});
+  testEstimateSerializedSize(flatStringVector, {{0, 16}, {16, 16}}, {86, 96});
+  testEstimateSerializedSize(
+      flatStringVector, {{0, 8}, {8, 16}, {24, 8}}, {40, 94, 48});
+
+  auto flatVectorWithNulls = makeFlatVector<double>(
+      32,
+      [](vector_size_t row) { return row; },
+      [](vector_size_t row) { return row % 4 == 0; });
+
+  // Doubles are 8 bytes each, and only non-null doubles are counted. In
+  // addition there's a null bit per row.
+  // 8 * 24 + (32 / 8) = 196
+  testEstimateSerializedSize(flatVectorWithNulls, {{0, 32}}, {196});
+  testEstimateSerializedSize(
+      flatVectorWithNulls, {{0, 16}, {16, 16}}, {98, 98});
+  testEstimateSerializedSize(
+      flatVectorWithNulls, {{0, 8}, {8, 16}, {24, 8}}, {49, 98, 49});
+}
+
+TEST_F(PrestoSerializerBatchEstimateSizeTest, array) {
+  std::vector<vector_size_t> offsets{
+      0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30};
+  auto elements =
+      makeFlatVector<int32_t>(32, [](vector_size_t row) { return row; });
+  auto arrayVector = makeArrayVector(offsets, elements);
+
+  // The ints in the array are 4 bytes each, and the array length is another 4
+  // bytes per row.
+  // 4 * 32 + 4 * 16 = 192
+  testEstimateSerializedSize(arrayVector, 192);
+
+  std::vector<vector_size_t> offsetsWithEmptyOrNulls{
+      0, 4, 4, 8, 8, 12, 12, 16, 16, 20, 20, 24, 24, 28, 28, 32};
+  auto arrayVectorWithEmptyArrays =
+      makeArrayVector(offsetsWithEmptyOrNulls, elements);
+
+  // The ints in the array are 4 bytes each, and the array length is another 4
+  // bytes per row.
+  // 4 * 32 + 4 * 16 = 192
+  testEstimateSerializedSize(arrayVectorWithEmptyArrays, 192);
+
+  std::vector<vector_size_t> nullOffsets{1, 3, 5, 7, 9, 11, 13, 15};
+  auto arrayVectorWithNulls =
+      makeArrayVector(offsetsWithEmptyOrNulls, elements, nullOffsets);
+
+  // The ints in the array are 4 bytes each, the array length is another 4
+  // bytes per non-null row, and there's 1 null bit per row.
+  // 4 * 32 + 4 * 8 + 16 / 8 = 162
+  testEstimateSerializedSize(arrayVectorWithNulls, {{0, 16}}, {162});
+  testEstimateSerializedSize(arrayVectorWithNulls, {{0, 8}, {8, 8}}, {81, 81});
+  testEstimateSerializedSize(
+      arrayVectorWithNulls, {{0, 4}, {4, 8}, {8, 4}}, {41, 81, 41});
+}
+
+TEST_F(PrestoSerializerBatchEstimateSizeTest, map) {
+  std::vector<vector_size_t> offsets{
+      0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30};
+  auto keys =
+      makeFlatVector<int32_t>(32, [](vector_size_t row) { return row; });
+  auto values =
+      makeFlatVector<double>(32, [](vector_size_t row) { return row; });
+  auto mapVector = makeMapVector(offsets, keys, values);
+
+  // The ints in the map are 4 bytes each, the doubles are 8 bytes, and the map
+  // length is another 4 bytes per row.
+  // 4 * 32 + 8 * 32 + 4 * 16 = 448
+  testEstimateSerializedSize(mapVector, 448);
+
+  std::vector<vector_size_t> offsetsWithEmptyOrNulls{
+      0, 4, 4, 8, 8, 12, 12, 16, 16, 20, 20, 24, 24, 28, 28, 32};
+  auto mapVectorWithEmptyMaps =
+      makeMapVector(offsetsWithEmptyOrNulls, keys, values);
+
+  // The ints in the map are 4 bytes each, the doubles are 8 bytes, and the map
+  // length is another 4 bytes per row.
+  // 4 * 32 + 8 * 32 + 4 * 16 = 448
+  testEstimateSerializedSize(mapVectorWithEmptyMaps, 448);
+
+  std::vector<vector_size_t> nullOffsets{1, 3, 5, 7, 9, 11, 13, 15};
+  auto mapVectorWithNulls =
+      makeMapVector(offsetsWithEmptyOrNulls, keys, values, nullOffsets);
+
+  // The ints in the map are 4 bytes each, the doubles are 8 bytes, the map
+  // length is another 4 bytes per non-null row, and there's 1 null bit per row.
+  // 4 * 32 + 8 * 32 + 4 * 8 + 16 / 8 = 418
+  testEstimateSerializedSize(mapVectorWithNulls, {{0, 16}}, {418});
+  testEstimateSerializedSize(mapVectorWithNulls, {{0, 8}, {8, 8}}, {209, 209});
+  testEstimateSerializedSize(
+      mapVectorWithNulls, {{0, 4}, {4, 8}, {12, 4}}, {105, 209, 105});
+}
+
+TEST_F(PrestoSerializerBatchEstimateSizeTest, row) {
+  auto field1 =
+      makeFlatVector<int32_t>(32, [](vector_size_t row) { return row; });
+  auto field2 =
+      makeFlatVector<double>(32, [](vector_size_t row) { return row; });
+  auto rowVector = makeRowVector({field1, field2});
+
+  // The ints in the row are 4 bytes each, the doubles are 8 bytes, and the
+  // offsets are 4 bytes per row.
+  // 4 * 32 + 8 * 32 + 4 * 32 = 512
+  testEstimateSerializedSize(rowVector, 512);
+
+  auto rowVectorWithNulls =
+      makeRowVector({field1, field2}, [](auto row) { return row % 4 == 0; });
+
+  // The ints in the row are 4 bytes each, the doubles are 8 bytes, the
+  // offsets are 4 bytes per row, and there's 1 null bit per row.
+  // 4 * 24 + 8 * 24 + 4 * 32 + 32 / 8 = 420
+  testEstimateSerializedSize(rowVectorWithNulls, 420);
+}
+
+TEST_F(PrestoSerializerBatchEstimateSizeTest, constant) {
+  auto flatVector =
+      makeFlatVector<int32_t>(32, [](vector_size_t row) { return row; });
+  auto constantInt = BaseVector::wrapInConstant(32, 0, flatVector);
+
+  // The single constant int is 4 bytes.
+  testEstimateSerializedSize(constantInt, {{0, 32}}, {4});
+  testEstimateSerializedSize(constantInt, {{0, 16}, {16, 16}}, {4, 4});
+  testEstimateSerializedSize(
+      constantInt, {{0, 8}, {8, 16}, {24, 8}}, {4, 4, 4});
+
+  auto nullConstant = BaseVector::createNullConstant(BIGINT(), 32, pool_.get());
+
+  // The single constant null is 1 byte (for the bit mask).
+  testEstimateSerializedSize(nullConstant, {{0, 32}}, {1});
+  testEstimateSerializedSize(nullConstant, {{0, 16}, {16, 16}}, {1, 1});
+  testEstimateSerializedSize(
+      nullConstant, {{0, 8}, {8, 16}, {24, 8}}, {1, 1, 1});
+
+  std::vector<vector_size_t> offsets{
+      0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30};
+  auto arrayVector = makeArrayVector(offsets, flatVector);
+  auto constantArray = BaseVector::wrapInConstant(32, 0, arrayVector);
+
+  // The single constant array is 4 bytes for the length, and 4 bytes for each
+  // of the 2 integer elements.
+  // 4 + 2 * 4 = 12
+  testEstimateSerializedSize(constantArray, {{0, 32}}, {12});
+  testEstimateSerializedSize(constantArray, {{0, 16}, {16, 16}}, {12, 12});
+  testEstimateSerializedSize(
+      constantArray, {{0, 8}, {8, 16}, {24, 8}}, {12, 12, 12});
+
+  auto arrayVectorWithConstantElements = makeArrayVector(offsets, constantInt);
+  auto constantArrayWithConstantElements =
+      BaseVector::wrapInConstant(32, 0, arrayVectorWithConstantElements);
+
+  // The single constant array is 4 bytes for the length, and 4 bytes for each
+  // of the 2 integer elements (encodings for children of encoded complex types
+  // are not currently preserved).
+  // 4 + 2 * 4 = 12
+  testEstimateSerializedSize(
+      constantArrayWithConstantElements, {{0, 32}}, {12});
+  testEstimateSerializedSize(
+      constantArrayWithConstantElements, {{0, 16}, {16, 16}}, {12, 12});
+  testEstimateSerializedSize(
+      constantArrayWithConstantElements,
+      {{0, 8}, {8, 16}, {24, 8}},
+      {12, 12, 12});
+}
+
+TEST_F(PrestoSerializerBatchEstimateSizeTest, dictionary) {
+  auto indices = makeIndices(32, [](auto row) { return (row * 2) % 32; });
+  auto flatVector =
+      makeFlatVector<int32_t>(32, [](vector_size_t row) { return row; });
+  auto dictionaryInts =
+      BaseVector::wrapInDictionary(nullptr, indices, 32, flatVector);
+
+  // The indices are 4 bytes each, and the used dictionary entries are 4 bytes
+  // each.
+  // 4 * 32 + 4 * 16 = 192
+  testEstimateSerializedSize(dictionaryInts, {{0, 32}}, {192});
+  testEstimateSerializedSize(dictionaryInts, {{0, 16}, {16, 16}}, {128, 128});
+  testEstimateSerializedSize(
+      dictionaryInts, {{0, 8}, {8, 16}, {24, 8}}, {64, 128, 64});
+
+  auto flatVectorWithNulls = makeFlatVector<double>(
+      32,
+      [](vector_size_t row) { return row; },
+      [](vector_size_t row) { return row % 4 == 0; });
+  auto dictionaryNullElements =
+      BaseVector::wrapInDictionary(nullptr, indices, 32, flatVectorWithNulls);
+
+  // The indices are 4 bytes each, and half the used dictionary entries are
+  // 8-byte doubles. Note that the bytes for the null bits in the entries are
+  // not accounted for; this is a limitation of having non-contiguous ranges
+  // selected from the dictionary values.
+  // 4 * 32 + 8 * 8 = 192
+  testEstimateSerializedSize(dictionaryNullElements, {{0, 32}}, {192});
+  testEstimateSerializedSize(
+      dictionaryNullElements, {{0, 16}, {16, 16}}, {128, 128});
+  testEstimateSerializedSize(
+      dictionaryNullElements, {{0, 8}, {8, 16}, {24, 8}}, {64, 128, 64});
+
+  auto arrayIndices = makeIndices(16, [](auto row) { return (row * 2) % 16; });
+  std::vector<vector_size_t> offsets{
+      0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30};
+  auto arrayVector = makeArrayVector(offsets, flatVector);
+  auto dictionaryArray =
+      BaseVector::wrapInDictionary(nullptr, arrayIndices, 16, arrayVector);
+
+  // The indices are 4 bytes each, and each used dictionary entry is 4 bytes
+  // for the length plus 4 bytes for each of its 2 array elements.
+  // 4 * 16 + 4 * 8 + 2 * 8 * 4 = 160
+  testEstimateSerializedSize(dictionaryArray, {{0, 16}}, {160});
+  testEstimateSerializedSize(dictionaryArray, {{0, 8}, {8, 8}}, {128, 128});
+  testEstimateSerializedSize(
+      dictionaryArray, {{0, 4}, {4, 8}, {12, 4}}, {64, 128, 64});
+
+  auto constantInt = BaseVector::wrapInConstant(32, 0, flatVector);
+  auto arrayVectorWithConstantElements = makeArrayVector(offsets, constantInt);
+  auto dictionaryArrayWithConstantElements = BaseVector::wrapInDictionary(
+      nullptr, arrayIndices, 16, arrayVectorWithConstantElements);
+
+  // The indices are 4 bytes each, and each used dictionary entry is 4 bytes
+  // for the length plus 4 bytes for each of its 2 array elements (encodings
+  // for children of encoded complex types are not currently preserved).
+  // 4 * 16 + 4 * 8 + 2 * 8 * 4 = 160
+  testEstimateSerializedSize(
+      dictionaryArrayWithConstantElements, {{0, 16}}, {160});
+  testEstimateSerializedSize(
+      dictionaryArrayWithConstantElements, {{0, 8}, {8, 8}}, {128, 128});
+  testEstimateSerializedSize(
+      dictionaryArrayWithConstantElements,
+      {{0, 4}, {4, 8}, {12, 4}},
+      {64, 128, 64});
+
+  auto dictionaryWithNulls = BaseVector::wrapInDictionary(
+      makeNulls(32, [](auto row) { return row % 2 == 0; }),
+      indices,
+      32,
+      flatVector);
+
+  // When nulls are present in the dictionary, we currently flatten the data,
+  // so there are 4 bytes per non-null row. Null bits are only counted for the
+  // null rows because the non-null elements selected from the wrapped vector
+  // are non-contiguous.
+  // 4 * 16 + 16 / 8 = 66
+  testEstimateSerializedSize(dictionaryWithNulls, {{0, 32}}, {66});
+  testEstimateSerializedSize(
+      dictionaryWithNulls, {{0, 16}, {16, 16}}, {33, 33});
+  testEstimateSerializedSize(
+      dictionaryWithNulls, {{0, 8}, {8, 16}, {24, 8}}, {17, 33, 17});
+}
diff --git a/velox/vector/VectorStream.cpp b/velox/vector/VectorStream.cpp
index a6fdefec8fc5..119611047146 100644
--- a/velox/vector/VectorStream.cpp
+++ b/velox/vector/VectorStream.cpp
@@ -47,6 +47,14 @@ class DefaultBatchVectorSerializer : public BatchVectorSerializer {
     serializer->flush(stream);
   }
 
+  void estimateSerializedSize(
+      VectorPtr vector,
+      const folly::Range<const IndexRange*>& ranges,
+      vector_size_t** sizes,
+      Scratch& scratch) override {
+    serde_->estimateSerializedSize(vector.get(), ranges, sizes, scratch);
+  }
+
  private:
   memory::MemoryPool* const pool_;
   VectorSerde* const serde_;
diff --git a/velox/vector/VectorStream.h b/velox/vector/VectorStream.h
index a54f60bded6b..b8304bdc2e28 100644
--- a/velox/vector/VectorStream.h
+++ b/velox/vector/VectorStream.h
@@ -125,6 +125,12 @@ class BatchVectorSerializer {
     serialize(vector, ranges, scratch, stream);
   }
 
+  virtual void estimateSerializedSize(
+      VectorPtr vector,
+      const folly::Range<const IndexRange*>& ranges,
+      vector_size_t** sizes,
+      Scratch& scratch) = 0;
+
   /// Serializes all rows in a vector.
   void serialize(const RowVectorPtr& vector, OutputStream* stream);
 };
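For reference, the new BatchVectorSerializer::estimateSerializedSize entry point introduced in this patch can be driven the same way the test helper above drives it. The sketch below is illustrative only (not part of the patch); `pool`, `options`, and `rowVector` stand for a memory pool, serde options, and input RowVector the caller already has.

    // Minimal sketch: estimate the serialized size of all rows of a RowVector.
    auto serde = std::make_unique<serializer::presto::PrestoVectorSerde>();
    auto serializer = serde->createBatchSerializer(pool, &options);

    // One range covering the whole vector; each range points at its own size
    // counter.
    std::vector<IndexRange> ranges{{0, rowVector->size()}};
    vector_size_t estimate = 0;
    vector_size_t* estimatePtr = &estimate;
    Scratch scratch;
    serializer->estimateSerializedSize(rowVector, ranges, &estimatePtr, scratch);
    // `estimate` now holds the estimated serialized size, in bytes, of the rows
    // covered by `ranges`.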