Skip to content

Commit

Permalink
Introduce raw fwd index version V5 containing implicit num doc length…
Browse files Browse the repository at this point in the history
…, improving space efficiency (#14105)
  • Loading branch information
jackluo923 authored Oct 17, 2024
1 parent b556b37 commit ad37bd8
Show file tree
Hide file tree
Showing 9 changed files with 492 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@
public class VarByteChunkForwardIndexWriterV4 implements VarByteChunkWriter {
public static final int VERSION = 4;

private static final Logger LOGGER = LoggerFactory.getLogger(VarByteChunkForwardIndexWriterV4.class);
// Use the run-time concrete class to retrieve the logger
protected final Logger _logger = LoggerFactory.getLogger(getClass());

private static final String DATA_BUFFER_SUFFIX = ".buf";

private final File _dataBuffer;
Expand Down Expand Up @@ -105,11 +107,16 @@ public VarByteChunkForwardIndexWriterV4(File file, ChunkCompressionType compress
writeHeader(_chunkCompressor.compressionType(), chunkSize);
}

/**
 * Returns the format version that {@code writeHeader} writes as the version field of the index header.
 *
 * <p>This implementation returns this class's {@code VERSION}. A subclass that produces a different on-disk
 * format must override this method to return its own version number, because a {@code VERSION} constant declared
 * in a subclass is not what the {@code VERSION} reference in this class resolves to.</p>
 *
 * <p>{@code writeHeader} is called from this class's constructor, so an override is invoked before the subclass
 * constructor body has run; it should therefore only return a constant and not read subclass instance state.</p>
 *
 * @return the index format version to record in the header
 */
protected int getVersion() {
return VERSION;
}

private void writeHeader(ChunkCompressionType compressionType, int targetDecompressedChunkSize)
throws IOException {
// keep metadata BE for backwards compatibility
// (e.g. the version needs to be read by a factory which assumes BE)
_output.writeInt(VERSION);
_output.writeInt(getVersion());
_output.writeInt(targetDecompressedChunkSize);
_output.writeInt(compressionType.getValue());
// reserve a slot to write the data offset into
Expand Down Expand Up @@ -270,7 +277,7 @@ private void write(ByteBuffer buffer, boolean huge) {
_chunkOffset += compressedSize;
_docIdOffset = _nextDocId;
} catch (IOException e) {
LOGGER.error("Exception caught while compressing/writing data chunk", e);
_logger.error("Exception caught while compressing/writing data chunk", e);
throw new RuntimeException(e);
} finally {
if (mapped != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.segment.local.io.writer.impl;

import java.io.File;
import java.io.IOException;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.pinot.segment.local.utils.ArraySerDeUtils;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;


/**
 * Version 5 of the raw var-byte chunk forward index writer. It inherits everything from
 * {@link VarByteChunkForwardIndexWriterV4} and only changes how multi-value (MV) fixed-width entries are encoded:
 * the explicit element count is dropped to improve space efficiency.
 *
 * <p>Take the MV document {@code [int(1), int(2), int(3)]} as an example. With
 * {@code VarByteChunkForwardIndexWriterV4} its binary layout is:</p>
 * <pre>
 * 0x00000010 0x00000003 0x00000001 0x00000002 0x00000003
 * </pre>
 *
 * <ol>
 *   <li>{@code 0x00000010} (4 bytes): total payload length of the byte array holding the document content,
 *   16 bytes in this example.</li>
 *   <li>{@code 0x00000003} (4 bytes): number of elements in the document, i.e. 3.</li>
 *   <li>{@code 0x00000001 0x00000002 0x00000003} (12 bytes): the three integer values 1, 2 and 3.</li>
 * </ol>
 *
 * <p>A fixed-byte raw forward index in Pinot stores exactly one fixed-length data type: {@code int}, {@code long},
 * {@code float} or {@code double}. The element count of an MV document therefore does not need to be stored; it
 * can be derived as:</p>
 * <pre>
 * number of elements = buffer payload length / size of data type
 * </pre>
 *
 * <p>With the passthrough chunk compression type (i.e. no compression), omitting the explicit element count
 * saves 4 bytes per document:</p>
 *
 * <ul>
 *   <li>0 elements: 50% saved.</li>
 *   <li>1 element: 33% saved.</li>
 *   <li>2 elements: 25% saved.</li>
 *   <li>The relative saving shrinks as the number of elements grows.</li>
 * </ul>
 *
 * <p>When a chunk compressor is used, the saving can be even larger in some cases. The unit test
 * {@code VarByteChunkV5Test#validateCompressionRatioIncrease} demonstrates this with ZStandard: 1 million short MV
 * documents are inserted, with document lengths following a Gaussian distribution and somewhat repetitive integer
 * values. Under these conditions a 50%+ reduction in on-disk file size was observed compared to the V4 writer.</p>
 *
 * <p>Note on {@code VERSION}: this class declares its own {@code static final} {@code VERSION} set to {@code 5}.
 * A static field declared in a child class hides the parent's field instead of overriding it, and each field stays
 * associated with the class that declares it, so the parent class would not see this class's value through its own
 * {@code VERSION} reference. {@link #getVersion()} is overridden to return this class's {@code VERSION}, which lets
 * the parent class observe the correct version number at run time, even through a parent-typed reference.</p>
 *
 * @see VarByteChunkForwardIndexWriterV4
 * @see VarByteChunkForwardIndexWriterV5#getVersion()
 */
@NotThreadSafe
public class VarByteChunkForwardIndexWriterV5 extends VarByteChunkForwardIndexWriterV4 {
  public static final int VERSION = 5;

  /**
   * Creates a V5 writer by delegating all arguments, unchanged, to the V4 constructor.
   */
  public VarByteChunkForwardIndexWriterV5(File file, ChunkCompressionType compressionType, int chunkSize)
      throws IOException {
    super(file, compressionType, chunkSize);
  }

  /**
   * Reports this class's {@link #VERSION} in place of the parent's value.
   */
  @Override
  public int getVersion() {
    return VERSION;
  }

  /** Writes the int values serialized without an element-count prefix. */
  @Override
  public void putIntMV(int[] values) {
    byte[] payload = ArraySerDeUtils.serializeIntArrayWithoutLength(values);
    putBytes(payload);
  }

  /** Writes the long values serialized without an element-count prefix. */
  @Override
  public void putLongMV(long[] values) {
    byte[] payload = ArraySerDeUtils.serializeLongArrayWithoutLength(values);
    putBytes(payload);
  }

  /** Writes the float values serialized without an element-count prefix. */
  @Override
  public void putFloatMV(float[] values) {
    byte[] payload = ArraySerDeUtils.serializeFloatArrayWithoutLength(values);
    putBytes(payload);
  }

  /** Writes the double values serialized without an element-count prefix. */
  @Override
  public void putDoubleMV(double[] values) {
    byte[] payload = ArraySerDeUtils.serializeDoubleArrayWithoutLength(values);
    putBytes(payload);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter;
import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV5;
import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkWriter;
import org.apache.pinot.segment.spi.V1Constants.Indexes;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
Expand Down Expand Up @@ -73,18 +74,28 @@ public MultiValueFixedByteRawIndexCreator(File indexFile, ChunkCompressionType c
DataType valueType, int maxNumberOfMultiValueElements, boolean deriveNumDocsPerChunk, int writerVersion,
int targetMaxChunkSizeBytes, int targetDocsPerChunk)
throws IOException {
// Store the length followed by the values
int totalMaxLength = Integer.BYTES + (maxNumberOfMultiValueElements * valueType.getStoredType().size());
if (writerVersion < VarByteChunkForwardIndexWriterV4.VERSION) {
// Store the length followed by the values
int totalMaxLength = Integer.BYTES + (maxNumberOfMultiValueElements * valueType.getStoredType().size());
int numDocsPerChunk = deriveNumDocsPerChunk ? Math.max(targetMaxChunkSizeBytes / (totalMaxLength
+ VarByteChunkForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE), 1) : targetDocsPerChunk;
_indexWriter =
new VarByteChunkForwardIndexWriter(indexFile, compressionType, totalDocs, numDocsPerChunk, totalMaxLength,
writerVersion);
} else {
int chunkSize =
ForwardIndexUtils.getDynamicTargetChunkSize(totalMaxLength, targetDocsPerChunk, targetMaxChunkSizeBytes);
_indexWriter = new VarByteChunkForwardIndexWriterV4(indexFile, compressionType, chunkSize);
if (writerVersion == VarByteChunkForwardIndexWriterV5.VERSION) {
// Store only the values
int totalMaxLength = maxNumberOfMultiValueElements * valueType.getStoredType().size();
int chunkSize =
ForwardIndexUtils.getDynamicTargetChunkSize(totalMaxLength, targetDocsPerChunk, targetMaxChunkSizeBytes);
_indexWriter = new VarByteChunkForwardIndexWriterV5(indexFile, compressionType, chunkSize);
} else {
// Store the length followed by the values
int totalMaxLength = Integer.BYTES + (maxNumberOfMultiValueElements * valueType.getStoredType().size());
int chunkSize =
ForwardIndexUtils.getDynamicTargetChunkSize(totalMaxLength, targetDocsPerChunk, targetMaxChunkSizeBytes);
_indexWriter = new VarByteChunkForwardIndexWriterV4(indexFile, compressionType, chunkSize);
}
}
_valueType = valueType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.Arrays;
import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV5;
import org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV1;
import org.apache.pinot.segment.local.segment.index.readers.forward.CLPForwardIndexReaderV1;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVEntryDictForwardIndexReader;
Expand All @@ -30,6 +31,7 @@
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBytePower2ChunkSVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkForwardIndexReaderV4;
import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkForwardIndexReaderV5;
import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.sorted.SortedIndexReaderImpl;
Expand Down Expand Up @@ -106,7 +108,10 @@ public static ForwardIndexReader createRawIndexReader(PinotDataBuffer dataBuffer
: new FixedByteChunkSVForwardIndexReader(dataBuffer, storedType);
}

if (version == VarByteChunkForwardIndexWriterV4.VERSION) {
if (version == VarByteChunkForwardIndexWriterV5.VERSION) {
// V5 is the same as V4 except the multi-value docs have implicit value count rather than explicit
return new VarByteChunkForwardIndexReaderV5(dataBuffer, storedType, isSingleValue);
} else if (version == VarByteChunkForwardIndexWriterV4.VERSION) {
// V4 reader is common for sv var byte, mv fixed byte and mv var byte
return new VarByteChunkForwardIndexReaderV4(dataBuffer, storedType, isSingleValue);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ public class VarByteChunkForwardIndexReaderV4

public VarByteChunkForwardIndexReaderV4(PinotDataBuffer dataBuffer, FieldSpec.DataType storedType,
boolean isSingleValue) {
int version = dataBuffer.getInt(0);
Preconditions.checkState(version == VarByteChunkForwardIndexWriterV4.VERSION, "Illegal index version: %s", version);
validateIndexVersion(dataBuffer);
_storedType = storedType;
_targetDecompressedChunkSize = dataBuffer.getInt(4);
_chunkCompressionType = ChunkCompressionType.valueOf(dataBuffer.getInt(8));
Expand All @@ -81,6 +80,15 @@ public VarByteChunkForwardIndexReaderV4(PinotDataBuffer dataBuffer, FieldSpec.Da
_isSingleValue = isSingleValue;
}

/**
 * Checks that the version int stored at offset 0 of the given buffer equals {@link #getVersion()}.
 *
 * <p>Called from the constructor before any other header field is read.</p>
 *
 * @param dataBuffer buffer holding the forward index, with the version int at offset 0
 * @throws IllegalStateException (raised by {@code Preconditions.checkState}) if the stored version differs from
 *     the expected one
 */
public void validateIndexVersion(PinotDataBuffer dataBuffer) {
  int storedVersion = dataBuffer.getInt(0);
  int expectedVersion = getVersion();
  boolean versionMatches = storedVersion == expectedVersion;
  Preconditions.checkState(versionMatches, "Illegal index version: %s", storedVersion);
}

/**
 * Returns the index format version this reader accepts; {@code validateIndexVersion} compares it with the
 * version int read from offset 0 of the data buffer.
 *
 * <p>This implementation returns {@code VarByteChunkForwardIndexWriterV4.VERSION}. A subclass that reads a
 * different format version overrides this method. Because {@code validateIndexVersion} is called from the
 * constructor, an override runs before the subclass constructor body and should only return a constant.</p>
 *
 * @return the expected index format version
 */
public int getVersion() {
return VarByteChunkForwardIndexWriterV4.VERSION;
}

@Override
public boolean isDictionaryEncoded() {
return false;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.segment.local.segment.index.readers.forward;

import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4;
import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV5;
import org.apache.pinot.segment.local.utils.ArraySerDeUtils;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.spi.data.FieldSpec;


/**
 * Chunk-based raw (non-dictionary-encoded) forward index reader for format version
 * {@link VarByteChunkForwardIndexWriterV5#VERSION}.
 *
 * <p>It inherits all behavior from {@link VarByteChunkForwardIndexReaderV4} and overrides only the version check
 * and the multi-value getters for the fixed-length types ({@code int}, {@code long}, {@code float},
 * {@code double}). Those getters decode each document's bytes with the {@code ...WithoutLength} deserializers of
 * {@link ArraySerDeUtils}, i.e. the payload carries the values only and no explicit element count.</p>
 *
 * <p>For the rest of the data layout, please refer to the documentation for
 * {@link VarByteChunkForwardIndexWriterV4}.</p>
 */
public class VarByteChunkForwardIndexReaderV5 extends VarByteChunkForwardIndexReaderV4 {
  /**
   * Creates a V5 reader by delegating all arguments, unchanged, to the V4 constructor.
   */
  public VarByteChunkForwardIndexReaderV5(PinotDataBuffer dataBuffer, FieldSpec.DataType storedType,
      boolean isSingleValue) {
    super(dataBuffer, storedType, isSingleValue);
  }

  /** Reports the V5 writer's version as the index version this reader accepts. */
  @Override
  public int getVersion() {
    return VarByteChunkForwardIndexWriterV5.VERSION;
  }

  /** Decodes the count-less int payload of {@code docId} into {@code valueBuffer}. */
  @Override
  public int getIntMV(int docId, int[] valueBuffer, VarByteChunkForwardIndexReaderV4.ReaderContext context) {
    byte[] payload = context.getValue(docId);
    return ArraySerDeUtils.deserializeIntArrayWithoutLength(payload, valueBuffer);
  }

  /** Decodes the count-less int payload of {@code docId} into a new array. */
  @Override
  public int[] getIntMV(int docId, VarByteChunkForwardIndexReaderV4.ReaderContext context) {
    byte[] payload = context.getValue(docId);
    return ArraySerDeUtils.deserializeIntArrayWithoutLength(payload);
  }

  /** Decodes the count-less long payload of {@code docId} into {@code valueBuffer}. */
  @Override
  public int getLongMV(int docId, long[] valueBuffer, VarByteChunkForwardIndexReaderV4.ReaderContext context) {
    byte[] payload = context.getValue(docId);
    return ArraySerDeUtils.deserializeLongArrayWithoutLength(payload, valueBuffer);
  }

  /** Decodes the count-less long payload of {@code docId} into a new array. */
  @Override
  public long[] getLongMV(int docId, VarByteChunkForwardIndexReaderV4.ReaderContext context) {
    byte[] payload = context.getValue(docId);
    return ArraySerDeUtils.deserializeLongArrayWithoutLength(payload);
  }

  /** Decodes the count-less float payload of {@code docId} into {@code valueBuffer}. */
  @Override
  public int getFloatMV(int docId, float[] valueBuffer, VarByteChunkForwardIndexReaderV4.ReaderContext context) {
    byte[] payload = context.getValue(docId);
    return ArraySerDeUtils.deserializeFloatArrayWithoutLength(payload, valueBuffer);
  }

  /** Decodes the count-less float payload of {@code docId} into a new array. */
  @Override
  public float[] getFloatMV(int docId, VarByteChunkForwardIndexReaderV4.ReaderContext context) {
    byte[] payload = context.getValue(docId);
    return ArraySerDeUtils.deserializeFloatArrayWithoutLength(payload);
  }

  /** Decodes the count-less double payload of {@code docId} into {@code valueBuffer}. */
  @Override
  public int getDoubleMV(int docId, double[] valueBuffer, VarByteChunkForwardIndexReaderV4.ReaderContext context) {
    byte[] payload = context.getValue(docId);
    return ArraySerDeUtils.deserializeDoubleArrayWithoutLength(payload, valueBuffer);
  }

  /** Decodes the count-less double payload of {@code docId} into a new array. */
  @Override
  public double[] getDoubleMV(int docId, VarByteChunkForwardIndexReaderV4.ReaderContext context) {
    byte[] payload = context.getValue(docId);
    return ArraySerDeUtils.deserializeDoubleArrayWithoutLength(payload);
  }
}
Loading

0 comments on commit ad37bd8

Please sign in to comment.