diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriter.java index fadcce827e0..0a6bd479256 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriter.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriter.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.math.BigDecimal; import javax.annotation.concurrent.NotThreadSafe; +import org.apache.pinot.segment.local.utils.ArraySerDeUtils; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; import org.apache.pinot.spi.utils.BigDecimalUtils; @@ -97,54 +98,34 @@ public void putBytes(byte[] value) { writeChunkIfNecessary(); } - // Note: some duplication is tolerated between these overloads for the sake of memory efficiency + @Override + public void putIntMV(int[] values) { + putBytes(ArraySerDeUtils.serializeIntArrayWithLength(values)); + } + + @Override + public void putLongMV(long[] values) { + putBytes(ArraySerDeUtils.serializeLongArrayWithLength(values)); + } + + @Override + public void putFloatMV(float[] values) { + putBytes(ArraySerDeUtils.serializeFloatArrayWithLength(values)); + } + + @Override + public void putDoubleMV(double[] values) { + putBytes(ArraySerDeUtils.serializeDoubleArrayWithLength(values)); + } @Override public void putStringMV(String[] values) { - // the entire String[] will be encoded as a single string, write the header here - _chunkBuffer.putInt(_chunkHeaderOffset, _chunkDataOffSet); - _chunkHeaderOffset += CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE; - // write all the strings into the data buffer as if it's a single string, - // but with its own embedded header so offsets to strings within the body - // can be located - _chunkBuffer.putInt(_chunkDataOffSet, values.length); - _chunkDataOffSet += Integer.BYTES; - int headerSize = Integer.BYTES * values.length; - int bodyPosition = _chunkDataOffSet + headerSize; - _chunkBuffer.position(bodyPosition); - int bodySize = 0; - for (int i = 0, h = _chunkDataOffSet; i < values.length; i++, h += Integer.BYTES) { - byte[] utf8 = values[i].getBytes(UTF_8); - _chunkBuffer.putInt(h, utf8.length); - _chunkBuffer.put(utf8); - bodySize += utf8.length; - } - _chunkDataOffSet += headerSize + bodySize; - writeChunkIfNecessary(); + putBytes(ArraySerDeUtils.serializeStringArray(values)); } @Override public void putBytesMV(byte[][] values) { - // the entire byte[][] will be encoded as a single string, write the header here - _chunkBuffer.putInt(_chunkHeaderOffset, _chunkDataOffSet); - _chunkHeaderOffset += CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE; - // write all the byte[]s into the data buffer as if it's a single byte[], - // but with its own embedded header so offsets to byte[]s within the body - // can be located - _chunkBuffer.putInt(_chunkDataOffSet, values.length); - _chunkDataOffSet += Integer.BYTES; - int headerSize = Integer.BYTES * values.length; - int bodyPosition = _chunkDataOffSet + headerSize; - _chunkBuffer.position(bodyPosition); - int bodySize = 0; - for (int i = 0, h = _chunkDataOffSet; i < values.length; i++, h += Integer.BYTES) { - byte[] bytes = values[i]; - _chunkBuffer.putInt(h, bytes.length); - _chunkBuffer.put(bytes); - bodySize += bytes.length; - } - _chunkDataOffSet += headerSize + bodySize; - writeChunkIfNecessary(); + putBytes(ArraySerDeUtils.serializeBytesArray(values)); } private void writeChunkIfNecessary() { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java index 440808a6b0b..e7bc30fc702 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java @@ -30,6 +30,7 @@ import javax.annotation.concurrent.NotThreadSafe; import org.apache.commons.io.FileUtils; import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory; +import org.apache.pinot.segment.local.utils.ArraySerDeUtils; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; import org.apache.pinot.segment.spi.compression.ChunkCompressor; import org.apache.pinot.segment.spi.memory.CleanerUtil; @@ -37,8 +38,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static java.nio.charset.StandardCharsets.UTF_8; - /** * Chunk-based raw (non-dictionary-encoded) forward index writer where each chunk contains variable number of docs, and @@ -145,49 +144,33 @@ public void putBytes(byte[] bytes) { } @Override - public void putStringMV(String[] values) { - // num values + length of each value - int headerSize = Integer.BYTES + Integer.BYTES * values.length; - int size = headerSize; - byte[][] stringBytes = new byte[values.length][]; - for (int i = 0; i < values.length; i++) { - stringBytes[i] = values[i].getBytes(UTF_8); - size += stringBytes[i].length; - } + public void putIntMV(int[] values) { + putBytes(ArraySerDeUtils.serializeIntArrayWithLength(values)); + } - // Format : [numValues][length1][length2]...[lengthN][value1][value2]...[valueN] - byte[] serializedBytes = new byte[size]; - ByteBuffer byteBuffer = ByteBuffer.wrap(serializedBytes); - byteBuffer.putInt(values.length); - byteBuffer.position(headerSize); - for (int i = 0; i < values.length; i++) { - byteBuffer.putInt((i + 1) * Integer.BYTES, stringBytes[i].length); - byteBuffer.put(stringBytes[i]); - } + @Override + public void putLongMV(long[] values) { + putBytes(ArraySerDeUtils.serializeLongArrayWithLength(values)); + } - putBytes(serializedBytes); + @Override + public void putFloatMV(float[] values) { + putBytes(ArraySerDeUtils.serializeFloatArrayWithLength(values)); } @Override - public void putBytesMV(byte[][] values) { - // num values + length of each value - int headerSize = Integer.BYTES + Integer.BYTES * values.length; - int size = headerSize; - for (byte[] value : values) { - size += value.length; - } + public void putDoubleMV(double[] values) { + putBytes(ArraySerDeUtils.serializeDoubleArrayWithLength(values)); + } - // Format : [numValues][length1][length2]...[lengthN][bytes1][bytes2]...[bytesN] - byte[] serializedBytes = new byte[size]; - ByteBuffer byteBuffer = ByteBuffer.wrap(serializedBytes); - byteBuffer.putInt(values.length); - byteBuffer.position(headerSize); - for (int i = 0; i < values.length; i++) { - byteBuffer.putInt((i + 1) * Integer.BYTES, values[i].length); - byteBuffer.put(values[i]); - } + @Override + public void putStringMV(String[] values) { + putBytes(ArraySerDeUtils.serializeStringArray(values)); + } - putBytes(serializedBytes); + @Override + public void putBytesMV(byte[][] values) { + putBytes(ArraySerDeUtils.serializeBytesArray(values)); } private void writeHugeChunk(byte[] bytes) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkWriter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkWriter.java index bf3537d67c9..82fbd2b4691 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkWriter.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkWriter.java @@ -29,6 +29,14 @@ public interface VarByteChunkWriter extends Closeable { void putBytes(byte[] value); + void putIntMV(int[] values); + + void putLongMV(long[] values); + + void putFloatMV(float[] values); + + void putDoubleMV(double[] values); + void putStringMV(String[] values); void putBytesMV(byte[][] values); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java index dcdcb997051..e7877f8b606 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java @@ -20,7 +20,6 @@ import java.io.File; import java.io.IOException; -import java.nio.ByteBuffer; import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter; import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4; import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkWriter; @@ -70,7 +69,6 @@ public MultiValueFixedByteRawIndexCreator(File indexFile, ChunkCompressionType c ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK); } - public MultiValueFixedByteRawIndexCreator(File indexFile, ChunkCompressionType compressionType, int totalDocs, DataType valueType, int maxNumberOfMultiValueElements, boolean deriveNumDocsPerChunk, int writerVersion, int targetMaxChunkSizeBytes, int targetDocsPerChunk) @@ -108,54 +106,22 @@ public DataType getValueType() { @Override public void putIntMV(int[] values) { - byte[] bytes = new byte[Integer.BYTES + values.length * Integer.BYTES]; - ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); - //write the length - byteBuffer.putInt(values.length); - //write the content of each element - for (int value : values) { - byteBuffer.putInt(value); - } - _indexWriter.putBytes(bytes); + _indexWriter.putIntMV(values); } @Override public void putLongMV(long[] values) { - byte[] bytes = new byte[Integer.BYTES + values.length * Long.BYTES]; - ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); - //write the length - byteBuffer.putInt(values.length); - //write the content of each element - for (long value : values) { - byteBuffer.putLong(value); - } - _indexWriter.putBytes(bytes); + _indexWriter.putLongMV(values); } @Override public void putFloatMV(float[] values) { - byte[] bytes = new byte[Integer.BYTES + values.length * Float.BYTES]; - ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); - //write the length - byteBuffer.putInt(values.length); - //write the content of each element - for (float value : values) { - byteBuffer.putFloat(value); - } - _indexWriter.putBytes(bytes); + _indexWriter.putFloatMV(values); } @Override public void putDoubleMV(double[] values) { - byte[] bytes = new byte[Integer.BYTES + values.length * Double.BYTES]; - ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); - //write the length - byteBuffer.putInt(values.length); - //write the content of each element - for (double value : values) { - byteBuffer.putDouble(value); - } - _indexWriter.putBytes(bytes); + _indexWriter.putDoubleMV(values); } @Override diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java index f96ed6e878a..8e53ecb1563 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java @@ -22,6 +22,7 @@ import java.util.List; import javax.annotation.Nullable; import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter; +import org.apache.pinot.segment.local.utils.ArraySerDeUtils; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.spi.data.FieldSpec.DataType; @@ -53,92 +54,47 @@ public ChunkReaderContext createContext() { @Override public int getIntMV(int docId, int[] valueBuffer, ChunkReaderContext context) { - ByteBuffer byteBuffer = slice(docId, context); - int numValues = byteBuffer.getInt(); - for (int i = 0; i < numValues; i++) { - valueBuffer[i] = byteBuffer.getInt(); - } - return numValues; + return ArraySerDeUtils.deserializeIntArrayWithLength(slice(docId, context), valueBuffer); } @Override public int[] getIntMV(int docId, ChunkReaderContext context) { - ByteBuffer byteBuffer = slice(docId, context); - int numValues = byteBuffer.getInt(); - int[] valueBuffer = new int[numValues]; - for (int i = 0; i < numValues; i++) { - valueBuffer[i] = byteBuffer.getInt(); - } - return valueBuffer; + return ArraySerDeUtils.deserializeIntArrayWithLength(slice(docId, context)); } @Override public int getLongMV(int docId, long[] valueBuffer, ChunkReaderContext context) { - ByteBuffer byteBuffer = slice(docId, context); - int numValues = byteBuffer.getInt(); - for (int i = 0; i < numValues; i++) { - valueBuffer[i] = byteBuffer.getLong(); - } - return numValues; + return ArraySerDeUtils.deserializeLongArrayWithLength(slice(docId, context), valueBuffer); } @Override public long[] getLongMV(int docId, ChunkReaderContext context) { - ByteBuffer byteBuffer = slice(docId, context); - int numValues = byteBuffer.getInt(); - long[] valueBuffer = new long[numValues]; - for (int i = 0; i < numValues; i++) { - valueBuffer[i] = byteBuffer.getLong(); - } - return valueBuffer; + return ArraySerDeUtils.deserializeLongArrayWithLength(slice(docId, context)); } @Override public int getFloatMV(int docId, float[] valueBuffer, ChunkReaderContext context) { - ByteBuffer byteBuffer = slice(docId, context); - int numValues = byteBuffer.getInt(); - for (int i = 0; i < numValues; i++) { - valueBuffer[i] = byteBuffer.getFloat(); - } - return numValues; + return ArraySerDeUtils.deserializeFloatArrayWithLength(slice(docId, context), valueBuffer); } @Override public float[] getFloatMV(int docId, ChunkReaderContext context) { - ByteBuffer byteBuffer = slice(docId, context); - int numValues = byteBuffer.getInt(); - float[] valueBuffer = new float[numValues]; - for (int i = 0; i < numValues; i++) { - valueBuffer[i] = byteBuffer.getFloat(); - } - return valueBuffer; + return ArraySerDeUtils.deserializeFloatArrayWithLength(slice(docId, context)); } @Override public int getDoubleMV(int docId, double[] valueBuffer, ChunkReaderContext context) { - ByteBuffer byteBuffer = slice(docId, context); - int numValues = byteBuffer.getInt(); - for (int i = 0; i < numValues; i++) { - valueBuffer[i] = byteBuffer.getDouble(); - } - return numValues; + return ArraySerDeUtils.deserializeDoubleArrayWithLength(slice(docId, context), valueBuffer); } @Override public double[] getDoubleMV(int docId, ChunkReaderContext context) { - ByteBuffer byteBuffer = slice(docId, context); - int numValues = byteBuffer.getInt(); - double[] valueBuffer = new double[numValues]; - for (int i = 0; i < numValues; i++) { - valueBuffer[i] = byteBuffer.getDouble(); - } - return valueBuffer; + return ArraySerDeUtils.deserializeDoubleArrayWithLength(slice(docId, context)); } @Override public int getNumValuesMV(int docId, ChunkReaderContext context) { - ByteBuffer byteBuffer = slice(docId, context); - return byteBuffer.getInt(); + return slice(docId, context).getInt(); } private ByteBuffer slice(int docId, ChunkReaderContext context) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java index 851f5f3e7d7..277805d22c9 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java @@ -29,6 +29,7 @@ import java.util.Map; import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory; import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4; +import org.apache.pinot.segment.local.utils.ArraySerDeUtils; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; import org.apache.pinot.segment.spi.compression.ChunkDecompressor; import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; @@ -46,6 +47,8 @@ * Chunk-based raw (non-dictionary-encoded) forward index reader for values of SV variable length data types * (BIG_DECIMAL, STRING, BYTES), MV fixed length and MV variable length data types. *
For data layout, please refer to the documentation for {@link VarByteChunkForwardIndexWriterV4}
+ *
+ * TODO: Consider reading directly from sliced ByteBuffer instead of copying to byte[] first
*/
public class VarByteChunkForwardIndexReaderV4
implements ForwardIndexReader For data layout, please refer to the documentation for {@link VarByteChunkForwardIndexWriter}
+ *
+ * TODO: Consider reading directly from sliced ByteBuffer instead of copying to byte[] first
*/
public final class VarByteChunkMVForwardIndexReader extends BaseChunkForwardIndexReader {
private static final int ROW_OFFSET_SIZE = VarByteChunkForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
@@ -53,105 +55,28 @@ public ChunkReaderContext createContext() {
}
@Override
- public int getStringMV(final int docId, final String[] valueBuffer, final ChunkReaderContext context) {
- byte[] compressedBytes;
- if (_isCompressed) {
- compressedBytes = getBytesCompressed(docId, context);
- } else {
- compressedBytes = getBytesUncompressed(docId);
- }
- ByteBuffer byteBuffer = ByteBuffer.wrap(compressedBytes);
- int numValues = byteBuffer.getInt();
- int contentOffset = (numValues + 1) * Integer.BYTES;
- for (int i = 0; i < numValues; i++) {
- int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
- byte[] bytes = new byte[length];
- byteBuffer.position(contentOffset);
- byteBuffer.get(bytes, 0, length);
- valueBuffer[i] = new String(bytes, StandardCharsets.UTF_8);
- contentOffset += length;
- }
- return numValues;
+ public int getStringMV(int docId, String[] valueBuffer, ChunkReaderContext context) {
+ return ArraySerDeUtils.deserializeStringArray(getBytes(docId, context), valueBuffer);
}
@Override
- public String[] getStringMV(final int docId, final ChunkReaderContext context) {
- byte[] compressedBytes;
- if (_isCompressed) {
- compressedBytes = getBytesCompressed(docId, context);
- } else {
- compressedBytes = getBytesUncompressed(docId);
- }
- ByteBuffer byteBuffer = ByteBuffer.wrap(compressedBytes);
- int numValues = byteBuffer.getInt();
- int contentOffset = (numValues + 1) * Integer.BYTES;
- String[] valueBuffer = new String[numValues];
- for (int i = 0; i < numValues; i++) {
- int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
- byte[] bytes = new byte[length];
- byteBuffer.position(contentOffset);
- byteBuffer.get(bytes, 0, length);
- valueBuffer[i] = new String(bytes, StandardCharsets.UTF_8);
- contentOffset += length;
- }
- return valueBuffer;
+ public String[] getStringMV(int docId, ChunkReaderContext context) {
+ return ArraySerDeUtils.deserializeStringArray(getBytes(docId, context));
}
@Override
- public int getBytesMV(final int docId, final byte[][] valueBuffer, final ChunkReaderContext context) {
- byte[] compressedBytes;
- if (_isCompressed) {
- compressedBytes = getBytesCompressed(docId, context);
- } else {
- compressedBytes = getBytesUncompressed(docId);
- }
- ByteBuffer byteBuffer = ByteBuffer.wrap(compressedBytes);
- int numValues = byteBuffer.getInt();
- int contentOffset = (numValues + 1) * Integer.BYTES;
- for (int i = 0; i < numValues; i++) {
- int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
- byte[] bytes = new byte[length];
- byteBuffer.position(contentOffset);
- byteBuffer.get(bytes, 0, length);
- valueBuffer[i] = bytes;
- contentOffset += length;
- }
- return numValues;
+ public int getBytesMV(int docId, byte[][] valueBuffer, ChunkReaderContext context) {
+ return ArraySerDeUtils.deserializeBytesArray(getBytes(docId, context), valueBuffer);
}
@Override
- public byte[][] getBytesMV(final int docId, final ChunkReaderContext context) {
- byte[] compressedBytes;
- if (_isCompressed) {
- compressedBytes = getBytesCompressed(docId, context);
- } else {
- compressedBytes = getBytesUncompressed(docId);
- }
- ByteBuffer byteBuffer = ByteBuffer.wrap(compressedBytes);
- int numValues = byteBuffer.getInt();
- int contentOffset = (numValues + 1) * Integer.BYTES;
- byte[][] valueBuffer = new byte[numValues][];
- for (int i = 0; i < numValues; i++) {
- int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
- byte[] bytes = new byte[length];
- byteBuffer.position(contentOffset);
- byteBuffer.get(bytes, 0, length);
- valueBuffer[i] = bytes;
- contentOffset += length;
- }
- return valueBuffer;
+ public byte[][] getBytesMV(int docId, ChunkReaderContext context) {
+ return ArraySerDeUtils.deserializeBytesArray(getBytes(docId, context));
}
@Override
- public int getNumValuesMV(final int docId, final ChunkReaderContext context) {
- byte[] compressedBytes;
- if (_isCompressed) {
- compressedBytes = getBytesCompressed(docId, context);
- } else {
- compressedBytes = getBytesUncompressed(docId);
- }
- ByteBuffer byteBuffer = ByteBuffer.wrap(compressedBytes);
- return byteBuffer.getInt();
+ public int getNumValuesMV(int docId, ChunkReaderContext context) {
+ return ByteBuffer.wrap(getBytes(docId, context)).getInt();
}
@Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ArraySerDeUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ArraySerDeUtils.java
new file mode 100644
index 00000000000..58238a33e06
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/ArraySerDeUtils.java
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils;
+
+import java.nio.ByteBuffer;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+
+public class ArraySerDeUtils {
+ private ArraySerDeUtils() {
+ }
+
+ public static byte[] serializeIntArrayWithLength(int[] values) {
+ byte[] bytes = new byte[Integer.BYTES + values.length * Integer.BYTES];
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ byteBuffer.putInt(values.length);
+ writeValues(byteBuffer, values);
+ return bytes;
+ }
+
+ public static int[] deserializeIntArrayWithLength(byte[] bytes) {
+ return deserializeIntArrayWithLength(ByteBuffer.wrap(bytes));
+ }
+
+ public static int[] deserializeIntArrayWithLength(ByteBuffer byteBuffer) {
+ int length = byteBuffer.getInt();
+ int[] values = new int[length];
+ readValues(byteBuffer, values, length);
+ return values;
+ }
+
+ public static int deserializeIntArrayWithLength(byte[] bytes, int[] values) {
+ return deserializeIntArrayWithLength(ByteBuffer.wrap(bytes), values);
+ }
+
+ public static int deserializeIntArrayWithLength(ByteBuffer byteBuffer, int[] values) {
+ int length = byteBuffer.getInt();
+ readValues(byteBuffer, values, length);
+ return length;
+ }
+
+ public static byte[] serializeIntArrayWithoutLength(int[] values) {
+ byte[] bytes = new byte[values.length * Integer.BYTES];
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ writeValues(byteBuffer, values);
+ return bytes;
+ }
+
+ public static int[] deserializeIntArrayWithoutLength(byte[] bytes) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ int length = bytes.length / Integer.BYTES;
+ int[] values = new int[length];
+ readValues(byteBuffer, values, length);
+ return values;
+ }
+
+ public static int deserializeIntArrayWithoutLength(byte[] bytes, int[] values) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ int length = bytes.length / Integer.BYTES;
+ readValues(byteBuffer, values, length);
+ return length;
+ }
+
+ private static void writeValues(ByteBuffer byteBuffer, int[] values) {
+ for (int value : values) {
+ byteBuffer.putInt(value);
+ }
+ }
+
+ private static void readValues(ByteBuffer byteBuffer, int[] values, int length) {
+ for (int i = 0; i < length; i++) {
+ values[i] = byteBuffer.getInt();
+ }
+ }
+
+ public static byte[] serializeLongArrayWithLength(long[] values) {
+ byte[] bytes = new byte[Integer.BYTES + values.length * Long.BYTES];
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ byteBuffer.putInt(values.length);
+ writeValues(byteBuffer, values);
+ return bytes;
+ }
+
+ public static long[] deserializeLongArrayWithLength(byte[] bytes) {
+ return deserializeLongArrayWithLength(ByteBuffer.wrap(bytes));
+ }
+
+ public static long[] deserializeLongArrayWithLength(ByteBuffer byteBuffer) {
+ int length = byteBuffer.getInt();
+ long[] values = new long[length];
+ readValues(byteBuffer, values, length);
+ return values;
+ }
+
+ public static int deserializeLongArrayWithLength(byte[] bytes, long[] values) {
+ return deserializeLongArrayWithLength(ByteBuffer.wrap(bytes), values);
+ }
+
+ public static int deserializeLongArrayWithLength(ByteBuffer byteBuffer, long[] values) {
+ int length = byteBuffer.getInt();
+ readValues(byteBuffer, values, length);
+ return length;
+ }
+
+ public static byte[] serializeLongArrayWithoutLength(long[] values) {
+ byte[] bytes = new byte[values.length * Long.BYTES];
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ writeValues(byteBuffer, values);
+ return bytes;
+ }
+
+ public static long[] deserializeLongArrayWithoutLength(byte[] bytes) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ int length = bytes.length / Long.BYTES;
+ long[] values = new long[length];
+ readValues(byteBuffer, values, length);
+ return values;
+ }
+
+ public static int deserializeLongArrayWithoutLength(byte[] bytes, long[] values) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ int length = bytes.length / Long.BYTES;
+ readValues(byteBuffer, values, length);
+ return length;
+ }
+
+ private static void writeValues(ByteBuffer byteBuffer, long[] values) {
+ for (long value : values) {
+ byteBuffer.putLong(value);
+ }
+ }
+
+ private static void readValues(ByteBuffer byteBuffer, long[] values, int length) {
+ for (int i = 0; i < length; i++) {
+ values[i] = byteBuffer.getLong();
+ }
+ }
+
+ public static byte[] serializeFloatArrayWithLength(float[] values) {
+ byte[] bytes = new byte[Integer.BYTES + values.length * Float.BYTES];
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ byteBuffer.putInt(values.length);
+ writeValues(byteBuffer, values);
+ return bytes;
+ }
+
+ public static float[] deserializeFloatArrayWithLength(byte[] bytes) {
+ return deserializeFloatArrayWithLength(ByteBuffer.wrap(bytes));
+ }
+
+ public static float[] deserializeFloatArrayWithLength(ByteBuffer byteBuffer) {
+ int length = byteBuffer.getInt();
+ float[] values = new float[length];
+ readValues(byteBuffer, values, length);
+ return values;
+ }
+
+ public static int deserializeFloatArrayWithLength(byte[] bytes, float[] values) {
+ return deserializeFloatArrayWithLength(ByteBuffer.wrap(bytes), values);
+ }
+
+ public static int deserializeFloatArrayWithLength(ByteBuffer byteBuffer, float[] values) {
+ int length = byteBuffer.getInt();
+ readValues(byteBuffer, values, length);
+ return length;
+ }
+
+ public static byte[] serializeFloatArrayWithoutLength(float[] values) {
+ byte[] bytes = new byte[values.length * Float.BYTES];
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ writeValues(byteBuffer, values);
+ return bytes;
+ }
+
+ public static float[] deserializeFloatArrayWithoutLength(byte[] bytes) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ int length = bytes.length / Float.BYTES;
+ float[] values = new float[length];
+ readValues(byteBuffer, values, length);
+ return values;
+ }
+
+ public static int deserializeFloatArrayWithoutLength(byte[] bytes, float[] values) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ int length = bytes.length / Float.BYTES;
+ readValues(byteBuffer, values, length);
+ return length;
+ }
+
+ private static void writeValues(ByteBuffer byteBuffer, float[] values) {
+ for (float value : values) {
+ byteBuffer.putFloat(value);
+ }
+ }
+
+ private static void readValues(ByteBuffer byteBuffer, float[] values, int length) {
+ for (int i = 0; i < length; i++) {
+ values[i] = byteBuffer.getFloat();
+ }
+ }
+
+ public static byte[] serializeDoubleArrayWithLength(double[] values) {
+ byte[] bytes = new byte[Integer.BYTES + values.length * Double.BYTES];
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ byteBuffer.putInt(values.length);
+ writeValues(byteBuffer, values);
+ return bytes;
+ }
+
+ public static double[] deserializeDoubleArrayWithLength(byte[] bytes) {
+ return deserializeDoubleArrayWithLength(ByteBuffer.wrap(bytes));
+ }
+
+ public static double[] deserializeDoubleArrayWithLength(ByteBuffer byteBuffer) {
+ int length = byteBuffer.getInt();
+ double[] values = new double[length];
+ readValues(byteBuffer, values, length);
+ return values;
+ }
+
+ public static int deserializeDoubleArrayWithLength(byte[] bytes, double[] values) {
+ return deserializeDoubleArrayWithLength(ByteBuffer.wrap(bytes), values);
+ }
+
+ public static int deserializeDoubleArrayWithLength(ByteBuffer byteBuffer, double[] values) {
+ int length = byteBuffer.getInt();
+ readValues(byteBuffer, values, length);
+ return length;
+ }
+
+ public static byte[] serializeDoubleArrayWithoutLength(double[] values) {
+ byte[] bytes = new byte[values.length * Double.BYTES];
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ writeValues(byteBuffer, values);
+ return bytes;
+ }
+
+ public static double[] deserializeDoubleArrayWithoutLength(byte[] bytes) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ int length = bytes.length / Double.BYTES;
+ double[] values = new double[length];
+ readValues(byteBuffer, values, length);
+ return values;
+ }
+
+ public static int deserializeDoubleArrayWithoutLength(byte[] bytes, double[] values) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ int length = bytes.length / Double.BYTES;
+ readValues(byteBuffer, values, length);
+ return length;
+ }
+
+ private static void writeValues(ByteBuffer byteBuffer, double[] values) {
+ for (double value : values) {
+ byteBuffer.putDouble(value);
+ }
+ }
+
+ private static void readValues(ByteBuffer byteBuffer, double[] values, int length) {
+ for (int i = 0; i < length; i++) {
+ values[i] = byteBuffer.getDouble();
+ }
+ }
+
+ public static byte[] serializeStringArray(String[] values) {
+ // Format: [numValues][length1][length2]...[lengthN][value1][value2]...[valueN]
+ int headerSize = Integer.BYTES + Integer.BYTES * values.length;
+ int size = headerSize;
+ byte[][] stringBytes = new byte[values.length][];
+ for (int i = 0; i < values.length; i++) {
+ stringBytes[i] = values[i].getBytes(UTF_8);
+ size += stringBytes[i].length;
+ }
+ return writeValues(stringBytes, size, headerSize);
+ }
+
+ public static String[] deserializeStringArray(byte[] bytes) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ int numValues = byteBuffer.getInt();
+ String[] values = new String[numValues];
+ readValues(bytes, byteBuffer, values, numValues);
+ return values;
+ }
+
+ public static int deserializeStringArray(byte[] bytes, String[] values) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ int numValues = byteBuffer.getInt();
+ readValues(bytes, byteBuffer, values, numValues);
+ return numValues;
+ }
+
+ public static byte[] serializeBytesArray(byte[][] values) {
+ // Format: [numValues][length1][length2]...[lengthN][value1][value2]...[valueN]
+ int headerSize = Integer.BYTES + Integer.BYTES * values.length;
+ int size = headerSize;
+ for (byte[] value : values) {
+ size += value.length;
+ }
+ return writeValues(values, size, headerSize);
+ }
+
+ public static byte[][] deserializeBytesArray(byte[] bytes) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ int numValues = byteBuffer.getInt();
+ byte[][] values = new byte[numValues][];
+ readValues(byteBuffer, values, numValues);
+ return values;
+ }
+
+ public static int deserializeBytesArray(byte[] bytes, byte[][] values) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ int numValues = byteBuffer.getInt();
+ readValues(byteBuffer, values, numValues);
+ return numValues;
+ }
+
+ private static byte[] writeValues(byte[][] values, int size, int headerSize) {
+ byte[] bytes = new byte[size];
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ byteBuffer.putInt(values.length);
+ byteBuffer.position(headerSize);
+ for (int i = 0; i < values.length; i++) {
+ byteBuffer.putInt((i + 1) * Integer.BYTES, values[i].length);
+ byteBuffer.put(values[i]);
+ }
+ return bytes;
+ }
+
+ private static void readValues(byte[] bytes, ByteBuffer byteBuffer, String[] values, int numValues) {
+ int offset = (numValues + 1) * Integer.BYTES;
+ for (int i = 0; i < numValues; i++) {
+ int length = byteBuffer.getInt();
+ values[i] = new String(bytes, offset, length, UTF_8);
+ offset += length;
+ }
+ }
+
+ private static void readValues(ByteBuffer byteBuffer, byte[][] values, int numValues) {
+ byteBuffer.position((numValues + 1) * Integer.BYTES);
+ for (int i = 0; i < numValues; i++) {
+ int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
+ values[i] = new byte[length];
+ byteBuffer.get(values[i]);
+ }
+ }
+}