From b067ae1924a532282229e0506188b9375f0ec6d2 Mon Sep 17 00:00:00 2001 From: "Xiaotian (Jackie) Jiang" <17555551+Jackie-Jiang@users.noreply.github.com> Date: Fri, 11 Oct 2024 16:00:51 -0700 Subject: [PATCH] Extract common MV ser/de logic into ArraySerDeUtils (#14209) --- .../impl/VarByteChunkForwardIndexWriter.java | 63 ++- .../VarByteChunkForwardIndexWriterV4.java | 59 +-- .../io/writer/impl/VarByteChunkWriter.java | 8 + .../MultiValueFixedByteRawIndexCreator.java | 42 +- .../FixedByteChunkMVForwardIndexReader.java | 64 +-- .../VarByteChunkForwardIndexReaderV4.java | 109 +----- .../VarByteChunkMVForwardIndexReader.java | 101 +---- .../segment/local/utils/ArraySerDeUtils.java | 363 ++++++++++++++++++ 8 files changed, 456 insertions(+), 353 deletions(-) create mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ArraySerDeUtils.java diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriter.java index fadcce827e0..0a6bd479256 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriter.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriter.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.math.BigDecimal; import javax.annotation.concurrent.NotThreadSafe; +import org.apache.pinot.segment.local.utils.ArraySerDeUtils; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; import org.apache.pinot.spi.utils.BigDecimalUtils; @@ -97,54 +98,34 @@ public void putBytes(byte[] value) { writeChunkIfNecessary(); } - // Note: some duplication is tolerated between these overloads for the sake of memory efficiency + @Override + public void putIntMV(int[] values) { + 
putBytes(ArraySerDeUtils.serializeIntArrayWithLength(values)); + } + + @Override + public void putLongMV(long[] values) { + putBytes(ArraySerDeUtils.serializeLongArrayWithLength(values)); + } + + @Override + public void putFloatMV(float[] values) { + putBytes(ArraySerDeUtils.serializeFloatArrayWithLength(values)); + } + + @Override + public void putDoubleMV(double[] values) { + putBytes(ArraySerDeUtils.serializeDoubleArrayWithLength(values)); + } @Override public void putStringMV(String[] values) { - // the entire String[] will be encoded as a single string, write the header here - _chunkBuffer.putInt(_chunkHeaderOffset, _chunkDataOffSet); - _chunkHeaderOffset += CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE; - // write all the strings into the data buffer as if it's a single string, - // but with its own embedded header so offsets to strings within the body - // can be located - _chunkBuffer.putInt(_chunkDataOffSet, values.length); - _chunkDataOffSet += Integer.BYTES; - int headerSize = Integer.BYTES * values.length; - int bodyPosition = _chunkDataOffSet + headerSize; - _chunkBuffer.position(bodyPosition); - int bodySize = 0; - for (int i = 0, h = _chunkDataOffSet; i < values.length; i++, h += Integer.BYTES) { - byte[] utf8 = values[i].getBytes(UTF_8); - _chunkBuffer.putInt(h, utf8.length); - _chunkBuffer.put(utf8); - bodySize += utf8.length; - } - _chunkDataOffSet += headerSize + bodySize; - writeChunkIfNecessary(); + putBytes(ArraySerDeUtils.serializeStringArray(values)); } @Override public void putBytesMV(byte[][] values) { - // the entire byte[][] will be encoded as a single string, write the header here - _chunkBuffer.putInt(_chunkHeaderOffset, _chunkDataOffSet); - _chunkHeaderOffset += CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE; - // write all the byte[]s into the data buffer as if it's a single byte[], - // but with its own embedded header so offsets to byte[]s within the body - // can be located - _chunkBuffer.putInt(_chunkDataOffSet, values.length); - _chunkDataOffSet += 
Integer.BYTES; - int headerSize = Integer.BYTES * values.length; - int bodyPosition = _chunkDataOffSet + headerSize; - _chunkBuffer.position(bodyPosition); - int bodySize = 0; - for (int i = 0, h = _chunkDataOffSet; i < values.length; i++, h += Integer.BYTES) { - byte[] bytes = values[i]; - _chunkBuffer.putInt(h, bytes.length); - _chunkBuffer.put(bytes); - bodySize += bytes.length; - } - _chunkDataOffSet += headerSize + bodySize; - writeChunkIfNecessary(); + putBytes(ArraySerDeUtils.serializeBytesArray(values)); } private void writeChunkIfNecessary() { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java index 440808a6b0b..e7bc30fc702 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java @@ -30,6 +30,7 @@ import javax.annotation.concurrent.NotThreadSafe; import org.apache.commons.io.FileUtils; import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory; +import org.apache.pinot.segment.local.utils.ArraySerDeUtils; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; import org.apache.pinot.segment.spi.compression.ChunkCompressor; import org.apache.pinot.segment.spi.memory.CleanerUtil; @@ -37,8 +38,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static java.nio.charset.StandardCharsets.UTF_8; - /** * Chunk-based raw (non-dictionary-encoded) forward index writer where each chunk contains variable number of docs, and @@ -145,49 +144,33 @@ public void putBytes(byte[] bytes) { } @Override - public void putStringMV(String[] values) { - // num values + length of each value - int headerSize = Integer.BYTES + Integer.BYTES * 
values.length; - int size = headerSize; - byte[][] stringBytes = new byte[values.length][]; - for (int i = 0; i < values.length; i++) { - stringBytes[i] = values[i].getBytes(UTF_8); - size += stringBytes[i].length; - } + public void putIntMV(int[] values) { + putBytes(ArraySerDeUtils.serializeIntArrayWithLength(values)); + } - // Format : [numValues][length1][length2]...[lengthN][value1][value2]...[valueN] - byte[] serializedBytes = new byte[size]; - ByteBuffer byteBuffer = ByteBuffer.wrap(serializedBytes); - byteBuffer.putInt(values.length); - byteBuffer.position(headerSize); - for (int i = 0; i < values.length; i++) { - byteBuffer.putInt((i + 1) * Integer.BYTES, stringBytes[i].length); - byteBuffer.put(stringBytes[i]); - } + @Override + public void putLongMV(long[] values) { + putBytes(ArraySerDeUtils.serializeLongArrayWithLength(values)); + } - putBytes(serializedBytes); + @Override + public void putFloatMV(float[] values) { + putBytes(ArraySerDeUtils.serializeFloatArrayWithLength(values)); } @Override - public void putBytesMV(byte[][] values) { - // num values + length of each value - int headerSize = Integer.BYTES + Integer.BYTES * values.length; - int size = headerSize; - for (byte[] value : values) { - size += value.length; - } + public void putDoubleMV(double[] values) { + putBytes(ArraySerDeUtils.serializeDoubleArrayWithLength(values)); + } - // Format : [numValues][length1][length2]...[lengthN][bytes1][bytes2]...[bytesN] - byte[] serializedBytes = new byte[size]; - ByteBuffer byteBuffer = ByteBuffer.wrap(serializedBytes); - byteBuffer.putInt(values.length); - byteBuffer.position(headerSize); - for (int i = 0; i < values.length; i++) { - byteBuffer.putInt((i + 1) * Integer.BYTES, values[i].length); - byteBuffer.put(values[i]); - } + @Override + public void putStringMV(String[] values) { + putBytes(ArraySerDeUtils.serializeStringArray(values)); + } - putBytes(serializedBytes); + @Override + public void putBytesMV(byte[][] values) { + 
putBytes(ArraySerDeUtils.serializeBytesArray(values)); } private void writeHugeChunk(byte[] bytes) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkWriter.java index bf3537d67c9..82fbd2b4691 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkWriter.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkWriter.java @@ -29,6 +29,14 @@ public interface VarByteChunkWriter extends Closeable { void putBytes(byte[] value); + void putIntMV(int[] values); + + void putLongMV(long[] values); + + void putFloatMV(float[] values); + + void putDoubleMV(double[] values); + void putStringMV(String[] values); void putBytesMV(byte[][] values); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java index dcdcb997051..e7877f8b606 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java @@ -20,7 +20,6 @@ import java.io.File; import java.io.IOException; -import java.nio.ByteBuffer; import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter; import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4; import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkWriter; @@ -70,7 +69,6 @@ public MultiValueFixedByteRawIndexCreator(File indexFile, ChunkCompressionType c ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK); } - public 
MultiValueFixedByteRawIndexCreator(File indexFile, ChunkCompressionType compressionType, int totalDocs, DataType valueType, int maxNumberOfMultiValueElements, boolean deriveNumDocsPerChunk, int writerVersion, int targetMaxChunkSizeBytes, int targetDocsPerChunk) @@ -108,54 +106,22 @@ public DataType getValueType() { @Override public void putIntMV(int[] values) { - byte[] bytes = new byte[Integer.BYTES + values.length * Integer.BYTES]; - ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); - //write the length - byteBuffer.putInt(values.length); - //write the content of each element - for (int value : values) { - byteBuffer.putInt(value); - } - _indexWriter.putBytes(bytes); + _indexWriter.putIntMV(values); } @Override public void putLongMV(long[] values) { - byte[] bytes = new byte[Integer.BYTES + values.length * Long.BYTES]; - ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); - //write the length - byteBuffer.putInt(values.length); - //write the content of each element - for (long value : values) { - byteBuffer.putLong(value); - } - _indexWriter.putBytes(bytes); + _indexWriter.putLongMV(values); } @Override public void putFloatMV(float[] values) { - byte[] bytes = new byte[Integer.BYTES + values.length * Float.BYTES]; - ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); - //write the length - byteBuffer.putInt(values.length); - //write the content of each element - for (float value : values) { - byteBuffer.putFloat(value); - } - _indexWriter.putBytes(bytes); + _indexWriter.putFloatMV(values); } @Override public void putDoubleMV(double[] values) { - byte[] bytes = new byte[Integer.BYTES + values.length * Double.BYTES]; - ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); - //write the length - byteBuffer.putInt(values.length); - //write the content of each element - for (double value : values) { - byteBuffer.putDouble(value); - } - _indexWriter.putBytes(bytes); + _indexWriter.putDoubleMV(values); } @Override diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java index f96ed6e878a..8e53ecb1563 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java @@ -22,6 +22,7 @@ import java.util.List; import javax.annotation.Nullable; import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter; +import org.apache.pinot.segment.local.utils.ArraySerDeUtils; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.spi.data.FieldSpec.DataType; @@ -53,92 +54,47 @@ public ChunkReaderContext createContext() { @Override public int getIntMV(int docId, int[] valueBuffer, ChunkReaderContext context) { - ByteBuffer byteBuffer = slice(docId, context); - int numValues = byteBuffer.getInt(); - for (int i = 0; i < numValues; i++) { - valueBuffer[i] = byteBuffer.getInt(); - } - return numValues; + return ArraySerDeUtils.deserializeIntArrayWithLength(slice(docId, context), valueBuffer); } @Override public int[] getIntMV(int docId, ChunkReaderContext context) { - ByteBuffer byteBuffer = slice(docId, context); - int numValues = byteBuffer.getInt(); - int[] valueBuffer = new int[numValues]; - for (int i = 0; i < numValues; i++) { - valueBuffer[i] = byteBuffer.getInt(); - } - return valueBuffer; + return ArraySerDeUtils.deserializeIntArrayWithLength(slice(docId, context)); } @Override public int getLongMV(int docId, long[] valueBuffer, ChunkReaderContext context) { - ByteBuffer byteBuffer = slice(docId, context); - int numValues = byteBuffer.getInt(); - for (int i = 0; i < numValues; i++) { - valueBuffer[i] = 
byteBuffer.getLong(); - } - return numValues; + return ArraySerDeUtils.deserializeLongArrayWithLength(slice(docId, context), valueBuffer); } @Override public long[] getLongMV(int docId, ChunkReaderContext context) { - ByteBuffer byteBuffer = slice(docId, context); - int numValues = byteBuffer.getInt(); - long[] valueBuffer = new long[numValues]; - for (int i = 0; i < numValues; i++) { - valueBuffer[i] = byteBuffer.getLong(); - } - return valueBuffer; + return ArraySerDeUtils.deserializeLongArrayWithLength(slice(docId, context)); } @Override public int getFloatMV(int docId, float[] valueBuffer, ChunkReaderContext context) { - ByteBuffer byteBuffer = slice(docId, context); - int numValues = byteBuffer.getInt(); - for (int i = 0; i < numValues; i++) { - valueBuffer[i] = byteBuffer.getFloat(); - } - return numValues; + return ArraySerDeUtils.deserializeFloatArrayWithLength(slice(docId, context), valueBuffer); } @Override public float[] getFloatMV(int docId, ChunkReaderContext context) { - ByteBuffer byteBuffer = slice(docId, context); - int numValues = byteBuffer.getInt(); - float[] valueBuffer = new float[numValues]; - for (int i = 0; i < numValues; i++) { - valueBuffer[i] = byteBuffer.getFloat(); - } - return valueBuffer; + return ArraySerDeUtils.deserializeFloatArrayWithLength(slice(docId, context)); } @Override public int getDoubleMV(int docId, double[] valueBuffer, ChunkReaderContext context) { - ByteBuffer byteBuffer = slice(docId, context); - int numValues = byteBuffer.getInt(); - for (int i = 0; i < numValues; i++) { - valueBuffer[i] = byteBuffer.getDouble(); - } - return numValues; + return ArraySerDeUtils.deserializeDoubleArrayWithLength(slice(docId, context), valueBuffer); } @Override public double[] getDoubleMV(int docId, ChunkReaderContext context) { - ByteBuffer byteBuffer = slice(docId, context); - int numValues = byteBuffer.getInt(); - double[] valueBuffer = new double[numValues]; - for (int i = 0; i < numValues; i++) { - valueBuffer[i] = 
byteBuffer.getDouble(); - } - return valueBuffer; + return ArraySerDeUtils.deserializeDoubleArrayWithLength(slice(docId, context)); } @Override public int getNumValuesMV(int docId, ChunkReaderContext context) { - ByteBuffer byteBuffer = slice(docId, context); - return byteBuffer.getInt(); + return slice(docId, context).getInt(); } private ByteBuffer slice(int docId, ChunkReaderContext context) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java index 851f5f3e7d7..277805d22c9 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java @@ -29,6 +29,7 @@ import java.util.Map; import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory; import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4; +import org.apache.pinot.segment.local.utils.ArraySerDeUtils; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; import org.apache.pinot.segment.spi.compression.ChunkDecompressor; import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; @@ -46,6 +47,8 @@ * Chunk-based raw (non-dictionary-encoded) forward index reader for values of SV variable length data types * (BIG_DECIMAL, STRING, BYTES), MV fixed length and MV variable length data types. *
<p>
For data layout, please refer to the documentation for {@link VarByteChunkForwardIndexWriterV4} + * + * TODO: Consider reading directly from sliced ByteBuffer instead of copying to byte[] first */ public class VarByteChunkForwardIndexReaderV4 implements ForwardIndexReader { @@ -130,144 +133,62 @@ public Map getMap(int docId, ReaderContext context) { @Override public int getIntMV(int docId, int[] valueBuffer, VarByteChunkForwardIndexReaderV4.ReaderContext context) { - ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); - int numValues = byteBuffer.getInt(); - for (int i = 0; i < numValues; i++) { - valueBuffer[i] = byteBuffer.getInt(); - } - return numValues; + return ArraySerDeUtils.deserializeIntArrayWithLength(context.getValue(docId), valueBuffer); } @Override public int[] getIntMV(int docId, VarByteChunkForwardIndexReaderV4.ReaderContext context) { - ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); - int numValues = byteBuffer.getInt(); - int[] valueBuffer = new int[numValues]; - for (int i = 0; i < numValues; i++) { - valueBuffer[i] = byteBuffer.getInt(); - } - return valueBuffer; + return ArraySerDeUtils.deserializeIntArrayWithLength(context.getValue(docId)); } @Override public int getLongMV(int docId, long[] valueBuffer, VarByteChunkForwardIndexReaderV4.ReaderContext context) { - ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); - int numValues = byteBuffer.getInt(); - for (int i = 0; i < numValues; i++) { - valueBuffer[i] = byteBuffer.getLong(); - } - return numValues; + return ArraySerDeUtils.deserializeLongArrayWithLength(context.getValue(docId), valueBuffer); } @Override public long[] getLongMV(int docId, VarByteChunkForwardIndexReaderV4.ReaderContext context) { - ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); - int numValues = byteBuffer.getInt(); - long[] valueBuffer = new long[numValues]; - for (int i = 0; i < numValues; i++) { - valueBuffer[i] = byteBuffer.getLong(); - } - return 
valueBuffer; + return ArraySerDeUtils.deserializeLongArrayWithLength(context.getValue(docId)); } @Override public int getFloatMV(int docId, float[] valueBuffer, VarByteChunkForwardIndexReaderV4.ReaderContext context) { - ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); - int numValues = byteBuffer.getInt(); - for (int i = 0; i < numValues; i++) { - valueBuffer[i] = byteBuffer.getFloat(); - } - return numValues; + return ArraySerDeUtils.deserializeFloatArrayWithLength(context.getValue(docId), valueBuffer); } @Override public float[] getFloatMV(int docId, VarByteChunkForwardIndexReaderV4.ReaderContext context) { - ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); - int numValues = byteBuffer.getInt(); - float[] valueBuffer = new float[numValues]; - for (int i = 0; i < numValues; i++) { - valueBuffer[i] = byteBuffer.getFloat(); - } - return valueBuffer; + return ArraySerDeUtils.deserializeFloatArrayWithLength(context.getValue(docId)); } @Override public int getDoubleMV(int docId, double[] valueBuffer, VarByteChunkForwardIndexReaderV4.ReaderContext context) { - ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); - int numValues = byteBuffer.getInt(); - for (int i = 0; i < numValues; i++) { - valueBuffer[i] = byteBuffer.getDouble(); - } - return numValues; + return ArraySerDeUtils.deserializeDoubleArrayWithLength(context.getValue(docId), valueBuffer); } @Override public double[] getDoubleMV(int docId, VarByteChunkForwardIndexReaderV4.ReaderContext context) { - ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); - int numValues = byteBuffer.getInt(); - double[] valueBuffer = new double[numValues]; - for (int i = 0; i < numValues; i++) { - valueBuffer[i] = byteBuffer.getFloat(); - } - return valueBuffer; + return ArraySerDeUtils.deserializeDoubleArrayWithLength(context.getValue(docId)); } @Override public int getStringMV(int docId, String[] valueBuffer, VarByteChunkForwardIndexReaderV4.ReaderContext context) { - 
byte[] bytes = context.getValue(docId); - ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); - int numValues = byteBuffer.getInt(); - int offset = (numValues + 1) * Integer.BYTES; - for (int i = 0; i < numValues; i++) { - int length = byteBuffer.getInt((i + 1) * Integer.BYTES); - valueBuffer[i] = new String(bytes, offset, length, StandardCharsets.UTF_8); - offset += length; - } - return numValues; + return ArraySerDeUtils.deserializeStringArray(context.getValue(docId), valueBuffer); } @Override public String[] getStringMV(int docId, VarByteChunkForwardIndexReaderV4.ReaderContext context) { - byte[] bytes = context.getValue(docId); - ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); - int numValues = byteBuffer.getInt(); - int offset = (numValues + 1) * Integer.BYTES; - String[] valueBuffer = new String[numValues]; - for (int i = 0; i < numValues; i++) { - int length = byteBuffer.getInt((i + 1) * Integer.BYTES); - valueBuffer[i] = new String(bytes, offset, length, StandardCharsets.UTF_8); - offset += length; - } - return valueBuffer; + return ArraySerDeUtils.deserializeStringArray(context.getValue(docId)); } @Override public int getBytesMV(int docId, byte[][] valueBuffer, VarByteChunkForwardIndexReaderV4.ReaderContext context) { - ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); - int numValues = byteBuffer.getInt(); - byteBuffer.position((numValues + 1) * Integer.BYTES); - for (int i = 0; i < numValues; i++) { - int length = byteBuffer.getInt((i + 1) * Integer.BYTES); - byte[] bytes = new byte[length]; - byteBuffer.get(bytes, 0, length); - valueBuffer[i] = bytes; - } - return numValues; + return ArraySerDeUtils.deserializeBytesArray(context.getValue(docId), valueBuffer); } @Override public byte[][] getBytesMV(int docId, VarByteChunkForwardIndexReaderV4.ReaderContext context) { - ByteBuffer byteBuffer = ByteBuffer.wrap(context.getValue(docId)); - int numValues = byteBuffer.getInt(); - byteBuffer.position((numValues + 1) * Integer.BYTES); - byte[][] 
valueBuffer = new byte[numValues][]; - for (int i = 0; i < numValues; i++) { - int length = byteBuffer.getInt((i + 1) * Integer.BYTES); - byte[] bytes = new byte[length]; - byteBuffer.get(bytes, 0, length); - valueBuffer[i] = bytes; - } - return valueBuffer; + return ArraySerDeUtils.deserializeBytesArray(context.getValue(docId)); } @Override diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java index 1d133bb896c..bd7dc5dbb74 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java @@ -19,10 +19,10 @@ package org.apache.pinot.segment.local.segment.index.readers.forward; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.util.List; import javax.annotation.Nullable; import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter; +import org.apache.pinot.segment.local.utils.ArraySerDeUtils; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.spi.data.FieldSpec.DataType; @@ -31,6 +31,8 @@ * Chunk-based multi-value raw (non-dictionary-encoded) forward index reader for values of variable length data type * (STRING, BYTES). *
<p>
For data layout, please refer to the documentation for {@link VarByteChunkForwardIndexWriter} + * + * TODO: Consider reading directly from sliced ByteBuffer instead of copying to byte[] first */ public final class VarByteChunkMVForwardIndexReader extends BaseChunkForwardIndexReader { private static final int ROW_OFFSET_SIZE = VarByteChunkForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE; @@ -53,105 +55,28 @@ public ChunkReaderContext createContext() { } @Override - public int getStringMV(final int docId, final String[] valueBuffer, final ChunkReaderContext context) { - byte[] compressedBytes; - if (_isCompressed) { - compressedBytes = getBytesCompressed(docId, context); - } else { - compressedBytes = getBytesUncompressed(docId); - } - ByteBuffer byteBuffer = ByteBuffer.wrap(compressedBytes); - int numValues = byteBuffer.getInt(); - int contentOffset = (numValues + 1) * Integer.BYTES; - for (int i = 0; i < numValues; i++) { - int length = byteBuffer.getInt((i + 1) * Integer.BYTES); - byte[] bytes = new byte[length]; - byteBuffer.position(contentOffset); - byteBuffer.get(bytes, 0, length); - valueBuffer[i] = new String(bytes, StandardCharsets.UTF_8); - contentOffset += length; - } - return numValues; + public int getStringMV(int docId, String[] valueBuffer, ChunkReaderContext context) { + return ArraySerDeUtils.deserializeStringArray(getBytes(docId, context), valueBuffer); } @Override - public String[] getStringMV(final int docId, final ChunkReaderContext context) { - byte[] compressedBytes; - if (_isCompressed) { - compressedBytes = getBytesCompressed(docId, context); - } else { - compressedBytes = getBytesUncompressed(docId); - } - ByteBuffer byteBuffer = ByteBuffer.wrap(compressedBytes); - int numValues = byteBuffer.getInt(); - int contentOffset = (numValues + 1) * Integer.BYTES; - String[] valueBuffer = new String[numValues]; - for (int i = 0; i < numValues; i++) { - int length = byteBuffer.getInt((i + 1) * Integer.BYTES); - byte[] bytes = new byte[length]; - 
byteBuffer.position(contentOffset); - byteBuffer.get(bytes, 0, length); - valueBuffer[i] = new String(bytes, StandardCharsets.UTF_8); - contentOffset += length; - } - return valueBuffer; + public String[] getStringMV(int docId, ChunkReaderContext context) { + return ArraySerDeUtils.deserializeStringArray(getBytes(docId, context)); } @Override - public int getBytesMV(final int docId, final byte[][] valueBuffer, final ChunkReaderContext context) { - byte[] compressedBytes; - if (_isCompressed) { - compressedBytes = getBytesCompressed(docId, context); - } else { - compressedBytes = getBytesUncompressed(docId); - } - ByteBuffer byteBuffer = ByteBuffer.wrap(compressedBytes); - int numValues = byteBuffer.getInt(); - int contentOffset = (numValues + 1) * Integer.BYTES; - for (int i = 0; i < numValues; i++) { - int length = byteBuffer.getInt((i + 1) * Integer.BYTES); - byte[] bytes = new byte[length]; - byteBuffer.position(contentOffset); - byteBuffer.get(bytes, 0, length); - valueBuffer[i] = bytes; - contentOffset += length; - } - return numValues; + public int getBytesMV(int docId, byte[][] valueBuffer, ChunkReaderContext context) { + return ArraySerDeUtils.deserializeBytesArray(getBytes(docId, context), valueBuffer); } @Override - public byte[][] getBytesMV(final int docId, final ChunkReaderContext context) { - byte[] compressedBytes; - if (_isCompressed) { - compressedBytes = getBytesCompressed(docId, context); - } else { - compressedBytes = getBytesUncompressed(docId); - } - ByteBuffer byteBuffer = ByteBuffer.wrap(compressedBytes); - int numValues = byteBuffer.getInt(); - int contentOffset = (numValues + 1) * Integer.BYTES; - byte[][] valueBuffer = new byte[numValues][]; - for (int i = 0; i < numValues; i++) { - int length = byteBuffer.getInt((i + 1) * Integer.BYTES); - byte[] bytes = new byte[length]; - byteBuffer.position(contentOffset); - byteBuffer.get(bytes, 0, length); - valueBuffer[i] = bytes; - contentOffset += length; - } - return valueBuffer; + public 
byte[][] getBytesMV(int docId, ChunkReaderContext context) { + return ArraySerDeUtils.deserializeBytesArray(getBytes(docId, context)); } @Override - public int getNumValuesMV(final int docId, final ChunkReaderContext context) { - byte[] compressedBytes; - if (_isCompressed) { - compressedBytes = getBytesCompressed(docId, context); - } else { - compressedBytes = getBytesUncompressed(docId); - } - ByteBuffer byteBuffer = ByteBuffer.wrap(compressedBytes); - return byteBuffer.getInt(); + public int getNumValuesMV(int docId, ChunkReaderContext context) { + return ByteBuffer.wrap(getBytes(docId, context)).getInt(); } @Override diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ArraySerDeUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ArraySerDeUtils.java new file mode 100644 index 00000000000..58238a33e06 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ArraySerDeUtils.java @@ -0,0 +1,363 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/**
 * Utility methods to serialize/deserialize multi-value (array) entries to/from bytes.
 *
 * <p>Fixed-width arrays (int/long/float/double) come in two layouts:
 * <ul>
 *   <li>"WithLength": {@code [numValues (int)][value1][value2]...[valueN]}</li>
 *   <li>"WithoutLength": {@code [value1][value2]...[valueN]}, where the number of values is derived from the number
 *   of bytes</li>
 * </ul>
 * Variable-width arrays (String/bytes) use the layout
 * {@code [numValues (int)][length1]...[lengthN][value1]...[valueN]}; strings are encoded as UTF-8.
 *
 * <p>Methods taking a {@code byte[]} wrap it with {@link ByteBuffer#wrap(byte[])}, so values are big-endian. Methods
 * taking a {@link ByteBuffer} read relatively from the buffer's current position, using the buffer's current byte
 * order, and advance its position past the consumed bytes.
 *
 * <p>Methods taking a destination array fill its first {@code numValues} slots and return {@code numValues}. They
 * fail before modifying the destination when the decoded count is negative or does not fit into it.
 */
public final class ArraySerDeUtils {
  private ArraySerDeUtils() {
  }

  /**
   * Computes {@code headerSize + numValues * valueSize} using long arithmetic so that an oversized array results in
   * an {@link ArithmeticException} instead of a silently wrapped (too small or negative) buffer size.
   */
  private static int serializedSize(int headerSize, int numValues, int valueSize) {
    return Math.toIntExact(headerSize + (long) numValues * valueSize);
  }

  /**
   * Validates a decoded number of values against the capacity of the array it is about to be read into.
   *
   * @throws IllegalArgumentException if {@code numValues} is negative (corrupt header)
   * @throws ArrayIndexOutOfBoundsException if {@code numValues} exceeds {@code capacity}
   */
  private static void checkNumValues(int numValues, int capacity) {
    if (numValues < 0) {
      throw new IllegalArgumentException("Invalid number of values: " + numValues);
    }
    if (numValues > capacity) {
      // Same exception type the unchecked element-by-element read would eventually raise, but thrown up front so
      // that the destination array is never left partially overwritten
      throw new ArrayIndexOutOfBoundsException(
          "Cannot read " + numValues + " values into an array of length " + capacity);
    }
  }

  /**
   * Serializes the values as {@code [numValues][value1]...[valueN]}.
   */
  public static byte[] serializeIntArrayWithLength(int[] values) {
    byte[] bytes = new byte[serializedSize(Integer.BYTES, values.length, Integer.BYTES)];
    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
    byteBuffer.putInt(values.length);
    writeValues(byteBuffer, values);
    return bytes;
  }

  /**
   * Deserializes a length-prefixed int array into a newly allocated array.
   */
  public static int[] deserializeIntArrayWithLength(byte[] bytes) {
    return deserializeIntArrayWithLength(ByteBuffer.wrap(bytes));
  }

  /**
   * Deserializes a length-prefixed int array from the buffer's current position into a newly allocated array.
   */
  public static int[] deserializeIntArrayWithLength(ByteBuffer byteBuffer) {
    int length = byteBuffer.getInt();
    int[] values = new int[length];
    readValues(byteBuffer, values, length);
    return values;
  }

  /**
   * Deserializes a length-prefixed int array into the given array.
   *
   * @return the number of values read
   */
  public static int deserializeIntArrayWithLength(byte[] bytes, int[] values) {
    return deserializeIntArrayWithLength(ByteBuffer.wrap(bytes), values);
  }

  /**
   * Deserializes a length-prefixed int array from the buffer's current position into the given array.
   *
   * @return the number of values read
   */
  public static int deserializeIntArrayWithLength(ByteBuffer byteBuffer, int[] values) {
    int length = byteBuffer.getInt();
    readValues(byteBuffer, values, length);
    return length;
  }

  /**
   * Serializes the values back to back, without a length prefix.
   */
  public static byte[] serializeIntArrayWithoutLength(int[] values) {
    byte[] bytes = new byte[serializedSize(0, values.length, Integer.BYTES)];
    writeValues(ByteBuffer.wrap(bytes), values);
    return bytes;
  }

  /**
   * Deserializes {@code bytes.length / Integer.BYTES} values into a newly allocated array. Trailing bytes that do not
   * form a whole value are ignored.
   */
  public static int[] deserializeIntArrayWithoutLength(byte[] bytes) {
    int length = bytes.length / Integer.BYTES;
    int[] values = new int[length];
    readValues(ByteBuffer.wrap(bytes), values, length);
    return values;
  }

  /**
   * Deserializes {@code bytes.length / Integer.BYTES} values into the given array. Trailing bytes that do not form a
   * whole value are ignored.
   *
   * @return the number of values read
   */
  public static int deserializeIntArrayWithoutLength(byte[] bytes, int[] values) {
    int length = bytes.length / Integer.BYTES;
    readValues(ByteBuffer.wrap(bytes), values, length);
    return length;
  }

  private static void writeValues(ByteBuffer byteBuffer, int[] values) {
    for (int value : values) {
      byteBuffer.putInt(value);
    }
  }

  private static void readValues(ByteBuffer byteBuffer, int[] values, int length) {
    checkNumValues(length, values.length);
    for (int i = 0; i < length; i++) {
      values[i] = byteBuffer.getInt();
    }
  }

  /**
   * Serializes the values as {@code [numValues][value1]...[valueN]}.
   */
  public static byte[] serializeLongArrayWithLength(long[] values) {
    byte[] bytes = new byte[serializedSize(Integer.BYTES, values.length, Long.BYTES)];
    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
    byteBuffer.putInt(values.length);
    writeValues(byteBuffer, values);
    return bytes;
  }

  /**
   * Deserializes a length-prefixed long array into a newly allocated array.
   */
  public static long[] deserializeLongArrayWithLength(byte[] bytes) {
    return deserializeLongArrayWithLength(ByteBuffer.wrap(bytes));
  }

  /**
   * Deserializes a length-prefixed long array from the buffer's current position into a newly allocated array.
   */
  public static long[] deserializeLongArrayWithLength(ByteBuffer byteBuffer) {
    int length = byteBuffer.getInt();
    long[] values = new long[length];
    readValues(byteBuffer, values, length);
    return values;
  }

  /**
   * Deserializes a length-prefixed long array into the given array.
   *
   * @return the number of values read
   */
  public static int deserializeLongArrayWithLength(byte[] bytes, long[] values) {
    return deserializeLongArrayWithLength(ByteBuffer.wrap(bytes), values);
  }

  /**
   * Deserializes a length-prefixed long array from the buffer's current position into the given array.
   *
   * @return the number of values read
   */
  public static int deserializeLongArrayWithLength(ByteBuffer byteBuffer, long[] values) {
    int length = byteBuffer.getInt();
    readValues(byteBuffer, values, length);
    return length;
  }

  /**
   * Serializes the values back to back, without a length prefix.
   */
  public static byte[] serializeLongArrayWithoutLength(long[] values) {
    byte[] bytes = new byte[serializedSize(0, values.length, Long.BYTES)];
    writeValues(ByteBuffer.wrap(bytes), values);
    return bytes;
  }

  /**
   * Deserializes {@code bytes.length / Long.BYTES} values into a newly allocated array. Trailing bytes that do not
   * form a whole value are ignored.
   */
  public static long[] deserializeLongArrayWithoutLength(byte[] bytes) {
    int length = bytes.length / Long.BYTES;
    long[] values = new long[length];
    readValues(ByteBuffer.wrap(bytes), values, length);
    return values;
  }

  /**
   * Deserializes {@code bytes.length / Long.BYTES} values into the given array. Trailing bytes that do not form a
   * whole value are ignored.
   *
   * @return the number of values read
   */
  public static int deserializeLongArrayWithoutLength(byte[] bytes, long[] values) {
    int length = bytes.length / Long.BYTES;
    readValues(ByteBuffer.wrap(bytes), values, length);
    return length;
  }

  private static void writeValues(ByteBuffer byteBuffer, long[] values) {
    for (long value : values) {
      byteBuffer.putLong(value);
    }
  }

  private static void readValues(ByteBuffer byteBuffer, long[] values, int length) {
    checkNumValues(length, values.length);
    for (int i = 0; i < length; i++) {
      values[i] = byteBuffer.getLong();
    }
  }

  /**
   * Serializes the values as {@code [numValues][value1]...[valueN]}.
   */
  public static byte[] serializeFloatArrayWithLength(float[] values) {
    byte[] bytes = new byte[serializedSize(Integer.BYTES, values.length, Float.BYTES)];
    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
    byteBuffer.putInt(values.length);
    writeValues(byteBuffer, values);
    return bytes;
  }

  /**
   * Deserializes a length-prefixed float array into a newly allocated array.
   */
  public static float[] deserializeFloatArrayWithLength(byte[] bytes) {
    return deserializeFloatArrayWithLength(ByteBuffer.wrap(bytes));
  }

  /**
   * Deserializes a length-prefixed float array from the buffer's current position into a newly allocated array.
   */
  public static float[] deserializeFloatArrayWithLength(ByteBuffer byteBuffer) {
    int length = byteBuffer.getInt();
    float[] values = new float[length];
    readValues(byteBuffer, values, length);
    return values;
  }

  /**
   * Deserializes a length-prefixed float array into the given array.
   *
   * @return the number of values read
   */
  public static int deserializeFloatArrayWithLength(byte[] bytes, float[] values) {
    return deserializeFloatArrayWithLength(ByteBuffer.wrap(bytes), values);
  }

  /**
   * Deserializes a length-prefixed float array from the buffer's current position into the given array.
   *
   * @return the number of values read
   */
  public static int deserializeFloatArrayWithLength(ByteBuffer byteBuffer, float[] values) {
    int length = byteBuffer.getInt();
    readValues(byteBuffer, values, length);
    return length;
  }

  /**
   * Serializes the values back to back, without a length prefix.
   */
  public static byte[] serializeFloatArrayWithoutLength(float[] values) {
    byte[] bytes = new byte[serializedSize(0, values.length, Float.BYTES)];
    writeValues(ByteBuffer.wrap(bytes), values);
    return bytes;
  }

  /**
   * Deserializes {@code bytes.length / Float.BYTES} values into a newly allocated array. Trailing bytes that do not
   * form a whole value are ignored.
   */
  public static float[] deserializeFloatArrayWithoutLength(byte[] bytes) {
    int length = bytes.length / Float.BYTES;
    float[] values = new float[length];
    readValues(ByteBuffer.wrap(bytes), values, length);
    return values;
  }

  /**
   * Deserializes {@code bytes.length / Float.BYTES} values into the given array. Trailing bytes that do not form a
   * whole value are ignored.
   *
   * @return the number of values read
   */
  public static int deserializeFloatArrayWithoutLength(byte[] bytes, float[] values) {
    int length = bytes.length / Float.BYTES;
    readValues(ByteBuffer.wrap(bytes), values, length);
    return length;
  }

  private static void writeValues(ByteBuffer byteBuffer, float[] values) {
    for (float value : values) {
      byteBuffer.putFloat(value);
    }
  }

  private static void readValues(ByteBuffer byteBuffer, float[] values, int length) {
    checkNumValues(length, values.length);
    for (int i = 0; i < length; i++) {
      values[i] = byteBuffer.getFloat();
    }
  }

  /**
   * Serializes the values as {@code [numValues][value1]...[valueN]}.
   */
  public static byte[] serializeDoubleArrayWithLength(double[] values) {
    byte[] bytes = new byte[serializedSize(Integer.BYTES, values.length, Double.BYTES)];
    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
    byteBuffer.putInt(values.length);
    writeValues(byteBuffer, values);
    return bytes;
  }

  /**
   * Deserializes a length-prefixed double array into a newly allocated array.
   */
  public static double[] deserializeDoubleArrayWithLength(byte[] bytes) {
    return deserializeDoubleArrayWithLength(ByteBuffer.wrap(bytes));
  }

  /**
   * Deserializes a length-prefixed double array from the buffer's current position into a newly allocated array.
   */
  public static double[] deserializeDoubleArrayWithLength(ByteBuffer byteBuffer) {
    int length = byteBuffer.getInt();
    double[] values = new double[length];
    readValues(byteBuffer, values, length);
    return values;
  }

  /**
   * Deserializes a length-prefixed double array into the given array.
   *
   * @return the number of values read
   */
  public static int deserializeDoubleArrayWithLength(byte[] bytes, double[] values) {
    return deserializeDoubleArrayWithLength(ByteBuffer.wrap(bytes), values);
  }

  /**
   * Deserializes a length-prefixed double array from the buffer's current position into the given array.
   *
   * @return the number of values read
   */
  public static int deserializeDoubleArrayWithLength(ByteBuffer byteBuffer, double[] values) {
    int length = byteBuffer.getInt();
    readValues(byteBuffer, values, length);
    return length;
  }

  /**
   * Serializes the values back to back, without a length prefix.
   */
  public static byte[] serializeDoubleArrayWithoutLength(double[] values) {
    byte[] bytes = new byte[serializedSize(0, values.length, Double.BYTES)];
    writeValues(ByteBuffer.wrap(bytes), values);
    return bytes;
  }

  /**
   * Deserializes {@code bytes.length / Double.BYTES} values into a newly allocated array. Trailing bytes that do not
   * form a whole value are ignored.
   */
  public static double[] deserializeDoubleArrayWithoutLength(byte[] bytes) {
    int length = bytes.length / Double.BYTES;
    double[] values = new double[length];
    readValues(ByteBuffer.wrap(bytes), values, length);
    return values;
  }

  /**
   * Deserializes {@code bytes.length / Double.BYTES} values into the given array. Trailing bytes that do not form a
   * whole value are ignored.
   *
   * @return the number of values read
   */
  public static int deserializeDoubleArrayWithoutLength(byte[] bytes, double[] values) {
    int length = bytes.length / Double.BYTES;
    readValues(ByteBuffer.wrap(bytes), values, length);
    return length;
  }

  private static void writeValues(ByteBuffer byteBuffer, double[] values) {
    for (double value : values) {
      byteBuffer.putDouble(value);
    }
  }

  private static void readValues(ByteBuffer byteBuffer, double[] values, int length) {
    checkNumValues(length, values.length);
    for (int i = 0; i < length; i++) {
      values[i] = byteBuffer.getDouble();
    }
  }

  /**
   * Serializes the strings (UTF-8 encoded) as
   * {@code [numValues][length1][length2]...[lengthN][value1][value2]...[valueN]}.
   */
  public static byte[] serializeStringArray(String[] values) {
    int numValues = values.length;
    byte[][] stringBytes = new byte[numValues][];
    for (int i = 0; i < numValues; i++) {
      stringBytes[i] = values[i].getBytes(UTF_8);
    }
    // Once encoded, a String[] has exactly the same layout as a byte[][]
    return serializeBytesArray(stringBytes);
  }

  /**
   * Deserializes a string array written by {@link #serializeStringArray(String[])} into a newly allocated array.
   */
  public static String[] deserializeStringArray(byte[] bytes) {
    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
    int numValues = byteBuffer.getInt();
    String[] values = new String[numValues];
    readValues(bytes, byteBuffer, values, numValues);
    return values;
  }

  /**
   * Deserializes a string array written by {@link #serializeStringArray(String[])} into the given array.
   *
   * @return the number of values read
   */
  public static int deserializeStringArray(byte[] bytes, String[] values) {
    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
    int numValues = byteBuffer.getInt();
    readValues(bytes, byteBuffer, values, numValues);
    return numValues;
  }

  /**
   * Serializes the byte arrays as {@code [numValues][length1][length2]...[lengthN][value1][value2]...[valueN]}.
   */
  public static byte[] serializeBytesArray(byte[][] values) {
    int numValues = values.length;
    int headerSize = serializedSize(Integer.BYTES, numValues, Integer.BYTES);
    // Accumulate in long so that an oversized payload fails in toIntExact() instead of wrapping around
    long size = headerSize;
    for (byte[] value : values) {
      size += value.length;
    }
    byte[] bytes = new byte[Math.toIntExact(size)];
    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
    byteBuffer.putInt(numValues);
    // Lengths are written with absolute puts into the header, values with relative puts into the body
    byteBuffer.position(headerSize);
    for (int i = 0; i < numValues; i++) {
      byte[] value = values[i];
      byteBuffer.putInt((i + 1) * Integer.BYTES, value.length);
      byteBuffer.put(value);
    }
    return bytes;
  }

  /**
   * Deserializes a bytes array written by {@link #serializeBytesArray(byte[][])} into a newly allocated array.
   */
  public static byte[][] deserializeBytesArray(byte[] bytes) {
    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
    int numValues = byteBuffer.getInt();
    byte[][] values = new byte[numValues][];
    readValues(byteBuffer, values, numValues);
    return values;
  }

  /**
   * Deserializes a bytes array written by {@link #serializeBytesArray(byte[][])} into the given array.
   *
   * @return the number of values read
   */
  public static int deserializeBytesArray(byte[] bytes, byte[][] values) {
    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
    int numValues = byteBuffer.getInt();
    readValues(byteBuffer, values, numValues);
    return numValues;
  }

  /**
   * Reads the strings; expects {@code byteBuffer} to wrap {@code bytes} and to be positioned at the first length.
   */
  private static void readValues(byte[] bytes, ByteBuffer byteBuffer, String[] values, int numValues) {
    checkNumValues(numValues, values.length);
    // Lengths are read sequentially from the header; the string bodies start right after it
    int offset = (numValues + 1) * Integer.BYTES;
    for (int i = 0; i < numValues; i++) {
      int length = byteBuffer.getInt();
      values[i] = new String(bytes, offset, length, UTF_8);
      offset += length;
    }
  }

  /**
   * Reads the byte arrays; lengths are read with absolute gets from the header, values with relative gets.
   */
  private static void readValues(ByteBuffer byteBuffer, byte[][] values, int numValues) {
    checkNumValues(numValues, values.length);
    byteBuffer.position((numValues + 1) * Integer.BYTES);
    for (int i = 0; i < numValues; i++) {
      int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
      values[i] = new byte[length];
      byteBuffer.get(values[i]);
    }
  }
}