diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java index e7bc30fc702..332c52d0c59 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV4.java @@ -76,7 +76,9 @@ public class VarByteChunkForwardIndexWriterV4 implements VarByteChunkWriter { public static final int VERSION = 4; - private static final Logger LOGGER = LoggerFactory.getLogger(VarByteChunkForwardIndexWriterV4.class); + // Use the run-time concrete class to retrieve the logger + protected final Logger _logger = LoggerFactory.getLogger(getClass()); + private static final String DATA_BUFFER_SUFFIX = ".buf"; private final File _dataBuffer; @@ -105,11 +107,16 @@ public VarByteChunkForwardIndexWriterV4(File file, ChunkCompressionType compress writeHeader(_chunkCompressor.compressionType(), chunkSize); } + // Child class must override this class instance method + protected int getVersion() { + return VERSION; + } + private void writeHeader(ChunkCompressionType compressionType, int targetDecompressedChunkSize) throws IOException { // keep metadata BE for backwards compatibility // (e.g. 
the version needs to be read by a factory which assumes BE) - _output.writeInt(VERSION); + _output.writeInt(getVersion()); _output.writeInt(targetDecompressedChunkSize); _output.writeInt(compressionType.getValue()); // reserve a slot to write the data offset into @@ -270,7 +277,7 @@ private void write(ByteBuffer buffer, boolean huge) { _chunkOffset += compressedSize; _docIdOffset = _nextDocId; } catch (IOException e) { - LOGGER.error("Exception caught while compressing/writing data chunk", e); + _logger.error("Exception caught while compressing/writing data chunk", e); throw new RuntimeException(e); } finally { if (mapped != null) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV5.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV5.java new file mode 100644 index 00000000000..b96812a0593 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/writer/impl/VarByteChunkForwardIndexWriterV5.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.io.writer.impl; + +import java.io.File; +import java.io.IOException; +import javax.annotation.concurrent.NotThreadSafe; +import org.apache.pinot.segment.local.utils.ArraySerDeUtils; +import org.apache.pinot.segment.spi.compression.ChunkCompressionType; + + +/** + * Forward index writer that extends {@link VarByteChunkForwardIndexWriterV4} and overrides the data layout for + * multi-value fixed byte operations to improve space efficiency. + * + *

Consider the following multi-value document as an example: {@code [int(1), int(2), int(3)]}. + * The current binary data layout in {@code VarByteChunkForwardIndexWriterV4} is as follows:

+ *
+ *     0x00000010 0x00000003 0x00000001 0x00000002 0x00000003
+ * 
+ * + *
    + *
  1. The first 4 bytes ({@code 0x00000010}) represent the total payload length of the byte array + * containing the multi-value document content, which in this case is 16 bytes.
  2. + * + *
  3. The next 4 bytes ({@code 0x00000003}) represent the number of elements in the multi-value document (i.e., 3). + *
  4. + * + *
  5. The remaining 12 bytes ({@code 0x00000001 0x00000002 0x00000003}) represent the 3 integer values of the + * multi-value document: 1, 2, and 3.
  6. + *
+ * + *

In Pinot, the fixed byte raw forward index can only store one specific fixed-length data type: + * {@code int}, {@code long}, {@code float}, or {@code double}. Instead of explicitly storing the number of elements + * for each multi-value document, this value can be inferred as:

+ *
+ *     number of elements = buffer payload length / size of data type
+ * 
+ * + *

If the forward index uses the passthrough chunk compression type (i.e., no compression), we can save + * 4 bytes per document by omitting the explicit element count (the example above shrinks from 20 to 16 bytes).

+ * + * + * + *

For forward indexes that use compression to reduce data size, the savings can be even more significant + * in certain cases. This is demonstrated in the unit test {@link VarByteChunkV5Test#validateCompressionRatioIncrease}, + * where ZStandard was used as the chunk compressor. In the test, 1 million short multi-value (MV) documents + * were inserted, following a Gaussian distribution for document lengths. Additionally, the values of each integer + * in the MV documents were somewhat repetitive. Under these conditions, we observed a 50%+ reduction in on-disk + * file size compared to the V4 forward index writer version.

+ * + *

Note that the {@code VERSION} tag is a {@code static final} class variable set to {@code 5}. Static fields + * are not overridden: a subclass field of the same name merely hides the parent's, so code in the parent class + * that reads {@code VERSION} directly always sees the parent's value. To handle + * this cleanly and correctly, the {@code getVersion()} method is overridden to return the concrete subclass's + * {@code VERSION} value, ensuring that the correct version number is returned even when using a reference to the + * parent class.

+ * + * @see VarByteChunkForwardIndexWriterV4 + * @see VarByteChunkForwardIndexWriterV5#getVersion() + */ +@NotThreadSafe +public class VarByteChunkForwardIndexWriterV5 extends VarByteChunkForwardIndexWriterV4 { + public static final int VERSION = 5; + + public VarByteChunkForwardIndexWriterV5(File file, ChunkCompressionType compressionType, int chunkSize) + throws IOException { + super(file, compressionType, chunkSize); + } + + // Override the parent class getVersion(); + @Override + public int getVersion() { + return VERSION; + } + + @Override + public void putIntMV(int[] values) { + putBytes(ArraySerDeUtils.serializeIntArrayWithoutLength(values)); + } + + @Override + public void putLongMV(long[] values) { + putBytes(ArraySerDeUtils.serializeLongArrayWithoutLength(values)); + } + + @Override + public void putFloatMV(float[] values) { + putBytes(ArraySerDeUtils.serializeFloatArrayWithoutLength(values)); + } + + @Override + public void putDoubleMV(double[] values) { + putBytes(ArraySerDeUtils.serializeDoubleArrayWithoutLength(values)); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java index e7877f8b606..b8a6bd6daaf 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriter; import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4; +import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV5; import 
org.apache.pinot.segment.local.io.writer.impl.VarByteChunkWriter; import org.apache.pinot.segment.spi.V1Constants.Indexes; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; @@ -73,18 +74,28 @@ public MultiValueFixedByteRawIndexCreator(File indexFile, ChunkCompressionType c DataType valueType, int maxNumberOfMultiValueElements, boolean deriveNumDocsPerChunk, int writerVersion, int targetMaxChunkSizeBytes, int targetDocsPerChunk) throws IOException { - // Store the length followed by the values - int totalMaxLength = Integer.BYTES + (maxNumberOfMultiValueElements * valueType.getStoredType().size()); if (writerVersion < VarByteChunkForwardIndexWriterV4.VERSION) { + // Store the length followed by the values + int totalMaxLength = Integer.BYTES + (maxNumberOfMultiValueElements * valueType.getStoredType().size()); int numDocsPerChunk = deriveNumDocsPerChunk ? Math.max(targetMaxChunkSizeBytes / (totalMaxLength + VarByteChunkForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE), 1) : targetDocsPerChunk; _indexWriter = new VarByteChunkForwardIndexWriter(indexFile, compressionType, totalDocs, numDocsPerChunk, totalMaxLength, writerVersion); } else { - int chunkSize = - ForwardIndexUtils.getDynamicTargetChunkSize(totalMaxLength, targetDocsPerChunk, targetMaxChunkSizeBytes); - _indexWriter = new VarByteChunkForwardIndexWriterV4(indexFile, compressionType, chunkSize); + if (writerVersion == VarByteChunkForwardIndexWriterV5.VERSION) { + // Store only the values + int totalMaxLength = maxNumberOfMultiValueElements * valueType.getStoredType().size(); + int chunkSize = + ForwardIndexUtils.getDynamicTargetChunkSize(totalMaxLength, targetDocsPerChunk, targetMaxChunkSizeBytes); + _indexWriter = new VarByteChunkForwardIndexWriterV5(indexFile, compressionType, chunkSize); + } else { + // Store the length followed by the values + int totalMaxLength = Integer.BYTES + (maxNumberOfMultiValueElements * valueType.getStoredType().size()); + int chunkSize = + 
ForwardIndexUtils.getDynamicTargetChunkSize(totalMaxLength, targetDocsPerChunk, targetMaxChunkSizeBytes); + _indexWriter = new VarByteChunkForwardIndexWriterV4(indexFile, compressionType, chunkSize); + } } _valueType = valueType; } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java index db815761d9e..cc7201ed985 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/forward/ForwardIndexReaderFactory.java @@ -21,6 +21,7 @@ import java.util.Arrays; import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4; +import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV5; import org.apache.pinot.segment.local.segment.creator.impl.fwd.CLPForwardIndexCreatorV1; import org.apache.pinot.segment.local.segment.index.readers.forward.CLPForwardIndexReaderV1; import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVEntryDictForwardIndexReader; @@ -30,6 +31,7 @@ import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader; import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBytePower2ChunkSVForwardIndexReader; import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkForwardIndexReaderV4; +import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkForwardIndexReaderV5; import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader; import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader; import 
org.apache.pinot.segment.local.segment.index.readers.sorted.SortedIndexReaderImpl; @@ -106,7 +108,10 @@ public static ForwardIndexReader createRawIndexReader(PinotDataBuffer dataBuffer : new FixedByteChunkSVForwardIndexReader(dataBuffer, storedType); } - if (version == VarByteChunkForwardIndexWriterV4.VERSION) { + if (version == VarByteChunkForwardIndexWriterV5.VERSION) { + // V5 is the same as V4 except the multi-value docs have implicit value count rather than explicit + return new VarByteChunkForwardIndexReaderV5(dataBuffer, storedType, isSingleValue); + } else if (version == VarByteChunkForwardIndexWriterV4.VERSION) { // V4 reader is common for sv var byte, mv fixed byte and mv var byte return new VarByteChunkForwardIndexReaderV4(dataBuffer, storedType, isSingleValue); } else { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java index 277805d22c9..cf2a8b4de4d 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV4.java @@ -67,8 +67,7 @@ public class VarByteChunkForwardIndexReaderV4 public VarByteChunkForwardIndexReaderV4(PinotDataBuffer dataBuffer, FieldSpec.DataType storedType, boolean isSingleValue) { - int version = dataBuffer.getInt(0); - Preconditions.checkState(version == VarByteChunkForwardIndexWriterV4.VERSION, "Illegal index version: %s", version); + validateIndexVersion(dataBuffer); _storedType = storedType; _targetDecompressedChunkSize = dataBuffer.getInt(4); _chunkCompressionType = ChunkCompressionType.valueOf(dataBuffer.getInt(8)); @@ -81,6 +80,15 @@ public 
VarByteChunkForwardIndexReaderV4(PinotDataBuffer dataBuffer, FieldSpec.Da _isSingleValue = isSingleValue; } + public void validateIndexVersion(PinotDataBuffer dataBuffer) { + int version = dataBuffer.getInt(0); + Preconditions.checkState(version == getVersion(), "Illegal index version: %s", version); + } + + public int getVersion() { + return VarByteChunkForwardIndexWriterV4.VERSION; + } + @Override public boolean isDictionaryEncoded() { return false; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV5.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV5.java new file mode 100644 index 00000000000..e72fedfc584 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkForwardIndexReaderV5.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.segment.index.readers.forward; + +import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV4; +import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV5; +import org.apache.pinot.segment.local.utils.ArraySerDeUtils; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; +import org.apache.pinot.spi.data.FieldSpec; + + +/** + * Chunk-based raw (non-dictionary-encoded) forward index reader for values of SV variable length data types + * (BIG_DECIMAL, STRING, BYTES), MV fixed length and MV variable length data types. + *

For data layout, please refer to the documentation for {@link VarByteChunkForwardIndexWriterV4} + */ +public class VarByteChunkForwardIndexReaderV5 extends VarByteChunkForwardIndexReaderV4 { + public VarByteChunkForwardIndexReaderV5(PinotDataBuffer dataBuffer, FieldSpec.DataType storedType, + boolean isSingleValue) { + super(dataBuffer, storedType, isSingleValue); + } + + @Override + public int getVersion() { + return VarByteChunkForwardIndexWriterV5.VERSION; + } + + @Override + public int getIntMV(int docId, int[] valueBuffer, VarByteChunkForwardIndexReaderV4.ReaderContext context) { + return ArraySerDeUtils.deserializeIntArrayWithoutLength(context.getValue(docId), valueBuffer); + } + + @Override + public int[] getIntMV(int docId, VarByteChunkForwardIndexReaderV4.ReaderContext context) { + return ArraySerDeUtils.deserializeIntArrayWithoutLength(context.getValue(docId)); + } + + @Override + public int getLongMV(int docId, long[] valueBuffer, VarByteChunkForwardIndexReaderV4.ReaderContext context) { + return ArraySerDeUtils.deserializeLongArrayWithoutLength(context.getValue(docId), valueBuffer); + } + + @Override + public long[] getLongMV(int docId, VarByteChunkForwardIndexReaderV4.ReaderContext context) { + return ArraySerDeUtils.deserializeLongArrayWithoutLength(context.getValue(docId)); + } + + @Override + public int getFloatMV(int docId, float[] valueBuffer, VarByteChunkForwardIndexReaderV4.ReaderContext context) { + return ArraySerDeUtils.deserializeFloatArrayWithoutLength(context.getValue(docId), valueBuffer); + } + + @Override + public float[] getFloatMV(int docId, VarByteChunkForwardIndexReaderV4.ReaderContext context) { + return ArraySerDeUtils.deserializeFloatArrayWithoutLength(context.getValue(docId)); + } + + @Override + public int getDoubleMV(int docId, double[] valueBuffer, VarByteChunkForwardIndexReaderV4.ReaderContext context) { + return ArraySerDeUtils.deserializeDoubleArrayWithoutLength(context.getValue(docId), valueBuffer); + } + + @Override + 
public double[] getDoubleMV(int docId, VarByteChunkForwardIndexReaderV4.ReaderContext context) { + return ArraySerDeUtils.deserializeDoubleArrayWithoutLength(context.getValue(docId)); + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java index 33c8525a42d..9a2105726aa 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java @@ -49,10 +49,9 @@ public class MultiValueFixedByteRawIndexCreatorTest { - private static final String OUTPUT_DIR = - System.getProperty("java.io.tmpdir") + File.separator + "mvFixedRawTest"; + protected static String _outputDir; - private static final Random RANDOM = new Random(); + protected static final Random RANDOM = new Random(); @DataProvider(name = "compressionTypes") public Object[][] compressionTypes() { @@ -64,7 +63,8 @@ public Object[][] compressionTypes() { @BeforeClass public void setup() throws Exception { - FileUtils.forceMkdir(new File(OUTPUT_DIR)); + _outputDir = System.getProperty("java.io.tmpdir") + File.separator + "mvFixedRawTest"; + FileUtils.forceMkdir(new File(_outputDir)); } /** @@ -72,7 +72,7 @@ public void setup() */ @AfterClass public void cleanup() { - FileUtils.deleteQuietly(new File(OUTPUT_DIR)); + FileUtils.deleteQuietly(new File(_outputDir)); } @Test(dataProvider = "compressionTypes") @@ -147,25 +147,34 @@ public void testMVDouble(ChunkCompressionType compressionType, int writerVersion }, compressionType, writerVersion); } + public MultiValueFixedByteRawIndexCreator getMultiValueFixedByteRawIndexCreator(ChunkCompressionType compressionType, + String column, int 
numDocs, DataType dataType, int maxElements, int writerVersion) + throws IOException { + return new MultiValueFixedByteRawIndexCreator(new File(_outputDir), compressionType, column, numDocs, dataType, + maxElements, false, writerVersion, 1024 * 1024, 1000); + } + + public ForwardIndexReader getForwardIndexReader(PinotDataBuffer buffer, DataType dataType, int writerVersion) { + return writerVersion == VarByteChunkForwardIndexWriterV4.VERSION ? new VarByteChunkForwardIndexReaderV4(buffer, + dataType.getStoredType(), false) : new FixedByteChunkMVForwardIndexReader(buffer, dataType.getStoredType()); + } + public void testMV(DataType dataType, List inputs, ToIntFunction sizeof, IntFunction constructor, Injector injector, Extractor extractor, ChunkCompressionType compressionType, int writerVersion) throws IOException { String column = "testCol_" + dataType; int numDocs = inputs.size(); int maxElements = inputs.stream().mapToInt(sizeof).max().orElseThrow(RuntimeException::new); - File file = new File(OUTPUT_DIR, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION); + File file = new File(_outputDir, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION); file.delete(); MultiValueFixedByteRawIndexCreator creator = - new MultiValueFixedByteRawIndexCreator(new File(OUTPUT_DIR), compressionType, column, numDocs, dataType, - maxElements, false, writerVersion, 1024 * 1024, 1000); + getMultiValueFixedByteRawIndexCreator(compressionType, column, numDocs, dataType, maxElements, writerVersion); inputs.forEach(input -> injector.inject(creator, input)); creator.close(); //read final PinotDataBuffer buffer = PinotDataBuffer.mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, ""); - ForwardIndexReader reader = - writerVersion == VarByteChunkForwardIndexWriterV4.VERSION ? 
new VarByteChunkForwardIndexReaderV4(buffer, - dataType.getStoredType(), false) : new FixedByteChunkMVForwardIndexReader(buffer, dataType.getStoredType()); + ForwardIndexReader reader = getForwardIndexReader(buffer, dataType, writerVersion); final ForwardIndexReaderContext context = reader.createContext(); T valueBuffer = constructor.apply(maxElements); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java index 70313d91e70..8387c93dc83 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV4Test.java @@ -193,7 +193,7 @@ private void testWriteRead(File file, ChunkCompressionType compressionType, } } - private Stream randomStrings(int count, int lengthOfLongestEntry) { + protected Stream randomStrings(int count, int lengthOfLongestEntry) { return IntStream.range(0, count) .mapToObj(i -> { int length = ThreadLocalRandom.current().nextInt(lengthOfLongestEntry); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV5Test.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV5Test.java new file mode 100644 index 00000000000..c5bbc75c276 --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/VarByteChunkV5Test.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.segment.index.creator; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.function.BiConsumer; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkForwardIndexWriterV5; +import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueFixedByteRawIndexCreator; +import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkForwardIndexReaderV5; +import org.apache.pinot.segment.spi.compression.ChunkCompressionType; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; +import org.apache.pinot.spi.data.FieldSpec; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + + +public class VarByteChunkV5Test extends VarByteChunkV4Test { + private static final Random RANDOM = new Random(); + private static File[] _dirs; + + @DataProvider(parallel = true) + public Object[][] params() { + Object[][] params = new Object[][]{ + {null, 
ChunkCompressionType.LZ4, 20, 1024}, + {null, ChunkCompressionType.LZ4_LENGTH_PREFIXED, 20, 1024}, + {null, ChunkCompressionType.PASS_THROUGH, 20, 1024}, + {null, ChunkCompressionType.SNAPPY, 20, 1024}, + {null, ChunkCompressionType.ZSTANDARD, 20, 1024}, + {null, ChunkCompressionType.LZ4, 2048, 1024}, + {null, ChunkCompressionType.LZ4_LENGTH_PREFIXED, 2048, 1024}, + {null, ChunkCompressionType.PASS_THROUGH, 2048, 1024}, + {null, ChunkCompressionType.SNAPPY, 2048, 1024}, + {null, ChunkCompressionType.ZSTANDARD, 2048, 1024} + }; + + for (int i = 0; i < _dirs.length; i++) { + params[i][0] = _dirs[i]; + } + + return params; + } + + @BeforeClass + public void forceMkDirs() + throws IOException { + _dirs = new File[10]; + for (int i = 0; i < _dirs.length; i++) { + _dirs[i] = new File(new File(FileUtils.getTempDirectory(), UUID.randomUUID().toString()), "VarByteChunkV5Test"); + FileUtils.forceMkdir(_dirs[i]); + } + } + + @AfterClass + public void deleteDirs() { + for (File dir : _dirs) { + FileUtils.deleteQuietly(dir); + } + } + + @Test(dataProvider = "params") + public void testStringSV(File file, ChunkCompressionType compressionType, int longestEntry, int chunkSize) + throws IOException { + File stringSVFile = new File(file, "testStringSV"); + testWriteRead(stringSVFile, compressionType, longestEntry, chunkSize, FieldSpec.DataType.STRING, x -> x, + VarByteChunkForwardIndexWriterV5::putString, (reader, context, docId) -> reader.getString(docId, context)); + FileUtils.deleteQuietly(stringSVFile); + } + + @Test(dataProvider = "params") + public void testBytesSV(File file, ChunkCompressionType compressionType, int longestEntry, int chunkSize) + throws IOException { + File bytesSVFile = new File(file, "testBytesSV"); + testWriteRead(bytesSVFile, compressionType, longestEntry, chunkSize, FieldSpec.DataType.BYTES, + x -> x.getBytes(StandardCharsets.UTF_8), VarByteChunkForwardIndexWriterV5::putBytes, + (reader, context, docId) -> reader.getBytes(docId, context)); + 
FileUtils.deleteQuietly(bytesSVFile); + } + + @Test(dataProvider = "params") + public void testStringMV(File file, ChunkCompressionType compressionType, int longestEntry, int chunkSize) + throws IOException { + File stringMVFile = new File(file, "testStringMV"); + testWriteRead(stringMVFile, compressionType, longestEntry, chunkSize, FieldSpec.DataType.STRING, + new StringSplitterMV(), VarByteChunkForwardIndexWriterV5::putStringMV, + (reader, context, docId) -> reader.getStringMV(docId, context)); + FileUtils.deleteQuietly(stringMVFile); + } + + @Test(dataProvider = "params") + public void testBytesMV(File file, ChunkCompressionType compressionType, int longestEntry, int chunkSize) + throws IOException { + File bytesMVFile = new File(file, "testBytesMV"); + testWriteRead(bytesMVFile, compressionType, longestEntry, chunkSize, FieldSpec.DataType.BYTES, new ByteSplitterMV(), + VarByteChunkForwardIndexWriterV5::putBytesMV, (reader, context, docId) -> reader.getBytesMV(docId, context)); + FileUtils.deleteQuietly(bytesMVFile); + } + + @Test + public void validateCompressionRatioIncrease() + throws IOException { + // Generate input data containing short MV docs with somewhat repetitive data + int numDocs = 1000000; + int numElements = 0; + int maxMVRowSize = 0; + List inputData = new ArrayList<>(numDocs); + for (int i = 0; i < numDocs; i++) { + long[] mvRow = new long[Math.abs((int) Math.floor(RANDOM.nextGaussian()))]; + maxMVRowSize = Math.max(maxMVRowSize, mvRow.length); + numElements += mvRow.length; + for (int j = 0; j < mvRow.length; j++, numElements++) { + mvRow[j] = numElements % 10; + } + inputData.add(mvRow); + } + + for (int i = 0; i < _dirs.length; i++) { + _dirs[i] = new File(new File(FileUtils.getTempDirectory(), UUID.randomUUID().toString()), "VarByteChunkV5Test"); + FileUtils.forceMkdir(_dirs[i]); + } + + // Generate MV fixed byte raw fwd index with explicit length + int rawIndexVersionV4 = 4; + File explicitLengthFwdIndexFile = new 
File(FileUtils.getTempDirectory(), Integer.toString(rawIndexVersionV4)); + FileUtils.deleteQuietly(explicitLengthFwdIndexFile); + try (MultiValueFixedByteRawIndexCreator creator = new MultiValueFixedByteRawIndexCreator(explicitLengthFwdIndexFile, + ChunkCompressionType.ZSTANDARD, numDocs, FieldSpec.DataType.LONG, numElements, true, rawIndexVersionV4)) { + for (long[] mvRow : inputData) { + creator.putLongMV(mvRow); + } + } + + // Generate MV fixed byte raw fwd index with implicit length + int rawIndexVersionV5 = 5; + File implicitLengthFwdIndexFile = new File(FileUtils.getTempDirectory(), Integer.toString(rawIndexVersionV5)); + FileUtils.deleteQuietly(implicitLengthFwdIndexFile); + try (MultiValueFixedByteRawIndexCreator creator = new MultiValueFixedByteRawIndexCreator(implicitLengthFwdIndexFile, + ChunkCompressionType.ZSTANDARD, numDocs, FieldSpec.DataType.LONG, numElements, true, rawIndexVersionV5)) { + for (long[] mvRow : inputData) { + creator.putLongMV(mvRow); + } + } + + // For the input data, the explicit length compressed MV fixed byte raw forward index is expected to be at least + // 2x larger size in explicit length variant in V4 compared to the new implicit length variant in V5 + long expectedImplicitLengthFwdIndexMaxSize = Math.round(implicitLengthFwdIndexFile.length() * 2.0d); + Assert.assertTrue(expectedImplicitLengthFwdIndexMaxSize < explicitLengthFwdIndexFile.length()); + + // Cleanup + FileUtils.deleteQuietly(explicitLengthFwdIndexFile); + FileUtils.deleteQuietly(implicitLengthFwdIndexFile); + } + + private void testWriteRead(File file, ChunkCompressionType compressionType, int longestEntry, int chunkSize, + FieldSpec.DataType dataType, Function forwardMapper, + BiConsumer write, Read read) + throws IOException { + List values = randomStrings(1000, longestEntry).map(forwardMapper).collect(Collectors.toList()); + try (VarByteChunkForwardIndexWriterV5 writer = new VarByteChunkForwardIndexWriterV5(file, compressionType, + chunkSize)) { + for (T value 
: values) { + write.accept(writer, value); + } + } + try (PinotDataBuffer buffer = PinotDataBuffer.mapReadOnlyBigEndianFile(file)) { + try (VarByteChunkForwardIndexReaderV5 reader = new VarByteChunkForwardIndexReaderV5(buffer, dataType, true); + VarByteChunkForwardIndexReaderV5.ReaderContext context = reader.createContext()) { + for (int i = 0; i < values.size(); i++) { + assertEquals(read.read(reader, context, i), values.get(i)); + } + for (int i = 0; i < values.size(); i += 2) { + assertEquals(read.read(reader, context, i), values.get(i)); + } + for (int i = 1; i < values.size(); i += 2) { + assertEquals(read.read(reader, context, i), values.get(i)); + } + for (int i = 1; i < values.size(); i += 100) { + assertEquals(read.read(reader, context, i), values.get(i)); + } + for (int i = values.size() - 1; i >= 0; i--) { + assertEquals(read.read(reader, context, i), values.get(i)); + } + for (int i = values.size() - 1; i >= 0; i -= 2) { + assertEquals(read.read(reader, context, i), values.get(i)); + } + for (int i = values.size() - 2; i >= 0; i -= 2) { + assertEquals(read.read(reader, context, i), values.get(i)); + } + for (int i = values.size() - 1; i >= 0; i -= 100) { + assertEquals(read.read(reader, context, i), values.get(i)); + } + } + } + } + + @FunctionalInterface + interface Read { + T read(VarByteChunkForwardIndexReaderV5 reader, VarByteChunkForwardIndexReaderV5.ReaderContext context, int docId); + } +}