Skip to content

Commit

Permalink
Extract common MV ser/de logic into ArraySerDeUtils (#14209)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored Oct 11, 2024
1 parent 782e769 commit b067ae1
Show file tree
Hide file tree
Showing 8 changed files with 456 additions and 353 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.IOException;
import java.math.BigDecimal;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.pinot.segment.local.utils.ArraySerDeUtils;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.spi.utils.BigDecimalUtils;

Expand Down Expand Up @@ -97,54 +98,34 @@ public void putBytes(byte[] value) {
writeChunkIfNecessary();
}

// Note: some duplication is tolerated between these overloads for the sake of memory efficiency
@Override
public void putIntMV(int[] values) {
putBytes(ArraySerDeUtils.serializeIntArrayWithLength(values));
}

@Override
public void putLongMV(long[] values) {
putBytes(ArraySerDeUtils.serializeLongArrayWithLength(values));
}

@Override
public void putFloatMV(float[] values) {
putBytes(ArraySerDeUtils.serializeFloatArrayWithLength(values));
}

@Override
public void putDoubleMV(double[] values) {
putBytes(ArraySerDeUtils.serializeDoubleArrayWithLength(values));
}

@Override
public void putStringMV(String[] values) {
// the entire String[] will be encoded as a single string, write the header here
_chunkBuffer.putInt(_chunkHeaderOffset, _chunkDataOffSet);
_chunkHeaderOffset += CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
// write all the strings into the data buffer as if it's a single string,
// but with its own embedded header so offsets to strings within the body
// can be located
_chunkBuffer.putInt(_chunkDataOffSet, values.length);
_chunkDataOffSet += Integer.BYTES;
int headerSize = Integer.BYTES * values.length;
int bodyPosition = _chunkDataOffSet + headerSize;
_chunkBuffer.position(bodyPosition);
int bodySize = 0;
for (int i = 0, h = _chunkDataOffSet; i < values.length; i++, h += Integer.BYTES) {
byte[] utf8 = values[i].getBytes(UTF_8);
_chunkBuffer.putInt(h, utf8.length);
_chunkBuffer.put(utf8);
bodySize += utf8.length;
}
_chunkDataOffSet += headerSize + bodySize;
writeChunkIfNecessary();
putBytes(ArraySerDeUtils.serializeStringArray(values));
}

@Override
public void putBytesMV(byte[][] values) {
// the entire byte[][] will be encoded as a single string, write the header here
_chunkBuffer.putInt(_chunkHeaderOffset, _chunkDataOffSet);
_chunkHeaderOffset += CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
// write all the byte[]s into the data buffer as if it's a single byte[],
// but with its own embedded header so offsets to byte[]s within the body
// can be located
_chunkBuffer.putInt(_chunkDataOffSet, values.length);
_chunkDataOffSet += Integer.BYTES;
int headerSize = Integer.BYTES * values.length;
int bodyPosition = _chunkDataOffSet + headerSize;
_chunkBuffer.position(bodyPosition);
int bodySize = 0;
for (int i = 0, h = _chunkDataOffSet; i < values.length; i++, h += Integer.BYTES) {
byte[] bytes = values[i];
_chunkBuffer.putInt(h, bytes.length);
_chunkBuffer.put(bytes);
bodySize += bytes.length;
}
_chunkDataOffSet += headerSize + bodySize;
writeChunkIfNecessary();
putBytes(ArraySerDeUtils.serializeBytesArray(values));
}

private void writeChunkIfNecessary() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,14 @@
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
import org.apache.pinot.segment.local.utils.ArraySerDeUtils;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.compression.ChunkCompressor;
import org.apache.pinot.segment.spi.memory.CleanerUtil;
import org.apache.pinot.spi.utils.BigDecimalUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.nio.charset.StandardCharsets.UTF_8;


/**
* Chunk-based raw (non-dictionary-encoded) forward index writer where each chunk contains variable number of docs, and
Expand Down Expand Up @@ -145,49 +144,33 @@ public void putBytes(byte[] bytes) {
}

@Override
public void putStringMV(String[] values) {
// num values + length of each value
int headerSize = Integer.BYTES + Integer.BYTES * values.length;
int size = headerSize;
byte[][] stringBytes = new byte[values.length][];
for (int i = 0; i < values.length; i++) {
stringBytes[i] = values[i].getBytes(UTF_8);
size += stringBytes[i].length;
}
public void putIntMV(int[] values) {
putBytes(ArraySerDeUtils.serializeIntArrayWithLength(values));
}

// Format : [numValues][length1][length2]...[lengthN][value1][value2]...[valueN]
byte[] serializedBytes = new byte[size];
ByteBuffer byteBuffer = ByteBuffer.wrap(serializedBytes);
byteBuffer.putInt(values.length);
byteBuffer.position(headerSize);
for (int i = 0; i < values.length; i++) {
byteBuffer.putInt((i + 1) * Integer.BYTES, stringBytes[i].length);
byteBuffer.put(stringBytes[i]);
}
@Override
public void putLongMV(long[] values) {
putBytes(ArraySerDeUtils.serializeLongArrayWithLength(values));
}

putBytes(serializedBytes);
@Override
public void putFloatMV(float[] values) {
putBytes(ArraySerDeUtils.serializeFloatArrayWithLength(values));
}

@Override
public void putBytesMV(byte[][] values) {
// num values + length of each value
int headerSize = Integer.BYTES + Integer.BYTES * values.length;
int size = headerSize;
for (byte[] value : values) {
size += value.length;
}
public void putDoubleMV(double[] values) {
putBytes(ArraySerDeUtils.serializeDoubleArrayWithLength(values));
}

// Format : [numValues][length1][length2]...[lengthN][bytes1][bytes2]...[bytesN]
byte[] serializedBytes = new byte[size];
ByteBuffer byteBuffer = ByteBuffer.wrap(serializedBytes);
byteBuffer.putInt(values.length);
byteBuffer.position(headerSize);
for (int i = 0; i < values.length; i++) {
byteBuffer.putInt((i + 1) * Integer.BYTES, values[i].length);
byteBuffer.put(values[i]);
}
@Override
public void putStringMV(String[] values) {
putBytes(ArraySerDeUtils.serializeStringArray(values));
}

putBytes(serializedBytes);
@Override
public void putBytesMV(byte[][] values) {
putBytes(ArraySerDeUtils.serializeBytesArray(values));
}

private void writeHugeChunk(byte[] bytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ public interface VarByteChunkWriter extends Closeable {

void putBytes(byte[] value);

void putIntMV(int[] values);

void putLongMV(long[] values);

void putFloatMV(float[] values);

void putDoubleMV(double[] values);

void putStringMV(String[] values);

void putBytesMV(byte[][] values);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter;
import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkWriter;
Expand Down Expand Up @@ -70,7 +69,6 @@ public MultiValueFixedByteRawIndexCreator(File indexFile, ChunkCompressionType c
ForwardIndexConfig.DEFAULT_TARGET_DOCS_PER_CHUNK);
}


public MultiValueFixedByteRawIndexCreator(File indexFile, ChunkCompressionType compressionType, int totalDocs,
DataType valueType, int maxNumberOfMultiValueElements, boolean deriveNumDocsPerChunk, int writerVersion,
int targetMaxChunkSizeBytes, int targetDocsPerChunk)
Expand Down Expand Up @@ -108,54 +106,22 @@ public DataType getValueType() {

@Override
public void putIntMV(int[] values) {
byte[] bytes = new byte[Integer.BYTES + values.length * Integer.BYTES];
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
//write the length
byteBuffer.putInt(values.length);
//write the content of each element
for (int value : values) {
byteBuffer.putInt(value);
}
_indexWriter.putBytes(bytes);
_indexWriter.putIntMV(values);
}

@Override
public void putLongMV(long[] values) {
byte[] bytes = new byte[Integer.BYTES + values.length * Long.BYTES];
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
//write the length
byteBuffer.putInt(values.length);
//write the content of each element
for (long value : values) {
byteBuffer.putLong(value);
}
_indexWriter.putBytes(bytes);
_indexWriter.putLongMV(values);
}

@Override
public void putFloatMV(float[] values) {
byte[] bytes = new byte[Integer.BYTES + values.length * Float.BYTES];
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
//write the length
byteBuffer.putInt(values.length);
//write the content of each element
for (float value : values) {
byteBuffer.putFloat(value);
}
_indexWriter.putBytes(bytes);
_indexWriter.putFloatMV(values);
}

@Override
public void putDoubleMV(double[] values) {
byte[] bytes = new byte[Integer.BYTES + values.length * Double.BYTES];
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
//write the length
byteBuffer.putInt(values.length);
//write the content of each element
for (double value : values) {
byteBuffer.putDouble(value);
}
_indexWriter.putBytes(bytes);
_indexWriter.putDoubleMV(values);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import javax.annotation.Nullable;
import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter;
import org.apache.pinot.segment.local.utils.ArraySerDeUtils;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.spi.data.FieldSpec.DataType;

Expand Down Expand Up @@ -53,92 +54,47 @@ public ChunkReaderContext createContext() {

@Override
public int getIntMV(int docId, int[] valueBuffer, ChunkReaderContext context) {
ByteBuffer byteBuffer = slice(docId, context);
int numValues = byteBuffer.getInt();
for (int i = 0; i < numValues; i++) {
valueBuffer[i] = byteBuffer.getInt();
}
return numValues;
return ArraySerDeUtils.deserializeIntArrayWithLength(slice(docId, context), valueBuffer);
}

@Override
public int[] getIntMV(int docId, ChunkReaderContext context) {
ByteBuffer byteBuffer = slice(docId, context);
int numValues = byteBuffer.getInt();
int[] valueBuffer = new int[numValues];
for (int i = 0; i < numValues; i++) {
valueBuffer[i] = byteBuffer.getInt();
}
return valueBuffer;
return ArraySerDeUtils.deserializeIntArrayWithLength(slice(docId, context));
}

@Override
public int getLongMV(int docId, long[] valueBuffer, ChunkReaderContext context) {
ByteBuffer byteBuffer = slice(docId, context);
int numValues = byteBuffer.getInt();
for (int i = 0; i < numValues; i++) {
valueBuffer[i] = byteBuffer.getLong();
}
return numValues;
return ArraySerDeUtils.deserializeLongArrayWithLength(slice(docId, context), valueBuffer);
}

@Override
public long[] getLongMV(int docId, ChunkReaderContext context) {
ByteBuffer byteBuffer = slice(docId, context);
int numValues = byteBuffer.getInt();
long[] valueBuffer = new long[numValues];
for (int i = 0; i < numValues; i++) {
valueBuffer[i] = byteBuffer.getLong();
}
return valueBuffer;
return ArraySerDeUtils.deserializeLongArrayWithLength(slice(docId, context));
}

@Override
public int getFloatMV(int docId, float[] valueBuffer, ChunkReaderContext context) {
ByteBuffer byteBuffer = slice(docId, context);
int numValues = byteBuffer.getInt();
for (int i = 0; i < numValues; i++) {
valueBuffer[i] = byteBuffer.getFloat();
}
return numValues;
return ArraySerDeUtils.deserializeFloatArrayWithLength(slice(docId, context), valueBuffer);
}

@Override
public float[] getFloatMV(int docId, ChunkReaderContext context) {
ByteBuffer byteBuffer = slice(docId, context);
int numValues = byteBuffer.getInt();
float[] valueBuffer = new float[numValues];
for (int i = 0; i < numValues; i++) {
valueBuffer[i] = byteBuffer.getFloat();
}
return valueBuffer;
return ArraySerDeUtils.deserializeFloatArrayWithLength(slice(docId, context));
}

@Override
public int getDoubleMV(int docId, double[] valueBuffer, ChunkReaderContext context) {
ByteBuffer byteBuffer = slice(docId, context);
int numValues = byteBuffer.getInt();
for (int i = 0; i < numValues; i++) {
valueBuffer[i] = byteBuffer.getDouble();
}
return numValues;
return ArraySerDeUtils.deserializeDoubleArrayWithLength(slice(docId, context), valueBuffer);
}

@Override
public double[] getDoubleMV(int docId, ChunkReaderContext context) {
ByteBuffer byteBuffer = slice(docId, context);
int numValues = byteBuffer.getInt();
double[] valueBuffer = new double[numValues];
for (int i = 0; i < numValues; i++) {
valueBuffer[i] = byteBuffer.getDouble();
}
return valueBuffer;
return ArraySerDeUtils.deserializeDoubleArrayWithLength(slice(docId, context));
}

@Override
public int getNumValuesMV(int docId, ChunkReaderContext context) {
ByteBuffer byteBuffer = slice(docId, context);
return byteBuffer.getInt();
return slice(docId, context).getInt();
}

private ByteBuffer slice(int docId, ChunkReaderContext context) {
Expand Down
Loading

0 comments on commit b067ae1

Please sign in to comment.