Skip to content

Commit

Permalink
int96 parity with native parquet reader (#12496)
Browse files Browse the repository at this point in the history
  • Loading branch information
swaminathanmanish authored Feb 27, 2024
1 parent 8977c85 commit 3267a74
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public GenericRow extract(GenericRecord from, GenericRow to) {
value = AvroSchemaUtil.applyLogicalType(field, value);
}
if (value != null) {
value = convert(value);
value = transformValue(value, field);
}
to.putValue(fieldName, value);
}
Expand All @@ -80,14 +80,18 @@ public GenericRow extract(GenericRecord from, GenericRow to) {
value = AvroSchemaUtil.applyLogicalType(field, value);
}
if (value != null) {
value = convert(value);
value = transformValue(value, field);
}
to.putValue(fieldName, value);
}
}
return to;
}

  /**
   * Hook for subclasses to customize conversion of a single extracted field value.
   * The base implementation ignores {@code field} and simply delegates to
   * {@code convert(value)}; subclasses can inspect the field's Avro schema to apply
   * type-specific handling (e.g. a Parquet-backed extractor remapping deprecated types).
   *
   * @param value non-null raw value read from the record (callers null-check before invoking)
   * @param field the Avro field the value was read from
   * @return the converted value to store into the output row
   */
  protected Object transformValue(Object value, Schema.Field field) {
    return convert(value);
  }

/**
* Returns whether the object is an Avro GenericRecord.
*/
Expand Down Expand Up @@ -116,7 +120,7 @@ protected Object convertRecord(Object value) {
String fieldName = field.name();
Object fieldValue = record.get(fieldName);
if (fieldValue != null) {
fieldValue = convert(fieldValue);
fieldValue = transformValue(fieldValue, field);
}
convertedMap.put(fieldName, fieldValue);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.plugin.inputformat.parquet;

import java.util.Set;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
import org.apache.pinot.spi.data.readers.RecordExtractorConfig;


/**
 * Avro-based record extractor for Parquet files that additionally remaps values of
 * deprecated Parquet physical types (currently INT96) to the same representation
 * produced by {@link ParquetNativeRecordExtractor}, so both readers yield identical rows.
 */
public class ParquetAvroRecordExtractor extends AvroRecordExtractor {

  // NOTE(review): adds nothing over the parent implementation; kept as the explicit
  // hook point in case Parquet-specific initialization is needed later.
  @Override
  public void init(@Nullable Set<String> fields, @Nullable RecordExtractorConfig recordExtractorConfig) {
    super.init(fields, recordExtractorConfig);
  }

  @Override
  protected Object transformValue(Object value, Schema.Field field) {
    return handleDeprecatedTypes(convert(value), field);
  }

  /**
   * Rewrites values whose Avro schema corresponds to a deprecated Parquet physical type.
   * INT96 (surfaced by parquet-avro as a fixed(12) schema named "INT96") is converted to
   * epoch milliseconds, matching the native Parquet extractor.
   *
   * @param value already-converted field value (non-null)
   * @param field Avro field the value came from
   * @return the converted value, or {@code value} unchanged if no deprecated type applies
   */
  Object handleDeprecatedTypes(Object value, Schema.Field field) {
    Schema effectiveSchema = resolveNonNullSchema(field.schema());
    // INT96 is deprecated. Convert to long (epoch millis) as the native parquet extractor does.
    if (effectiveSchema != null
        && PrimitiveType.PrimitiveTypeName.INT96.name().equals(effectiveSchema.getName())) {
      return ParquetNativeRecordExtractor.convertInt96ToLong((byte[]) value);
    }
    return value;
  }

  /**
   * Resolves the effective value schema: for a nullable UNION this is its single non-NULL
   * branch; for any other schema it is the schema itself (this also covers required INT96
   * columns, which are not wrapped in a UNION).
   *
   * @return the effective schema, or {@code null} if a UNION contains only NULL branches
   *         (guards the NPE the previous inline loop would have hit in that case)
   * @throws IllegalStateException if a UNION has more than one non-NULL branch
   */
  @Nullable
  private static Schema resolveNonNullSchema(Schema schema) {
    if (schema.getType() != Schema.Type.UNION) {
      return schema;
    }
    Schema nonNullSchema = null;
    for (Schema branch : schema.getTypes()) {
      if (branch.getType() != Schema.Type.NULL) {
        if (nonNullSchema != null) {
          throw new IllegalStateException("More than one non-null schema in UNION schema");
        }
        nonNullSchema = branch;
      }
    }
    return nonNullSchema;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,11 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.pinot.plugin.inputformat.avro.AvroRecordExtractor;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
import org.apache.pinot.spi.data.readers.RecordReaderUtils;


/**
* Avro Record reader for Parquet file. This reader doesn't read parquet file with incompatible Avro schemas,
* e.g. INT96, DECIMAL. Please use {@link org.apache.pinot.plugin.inputformat.parquet.ParquetNativeRecordReader}
Expand All @@ -44,7 +42,7 @@ public class ParquetAvroRecordReader implements RecordReader {
private static final String EXTENSION = "parquet";

private Path _dataFilePath;
private AvroRecordExtractor _recordExtractor;
private ParquetAvroRecordExtractor _recordExtractor;
private ParquetReader<GenericRecord> _parquetReader;
private GenericRecord _nextRecord;

Expand All @@ -54,7 +52,7 @@ public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable Re
File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, EXTENSION);
_dataFilePath = new Path(parquetFile.getAbsolutePath());
_parquetReader = ParquetUtils.getParquetAvroReader(_dataFilePath);
_recordExtractor = new AvroRecordExtractor();
_recordExtractor = new ParquetAvroRecordExtractor();
_recordExtractor.init(fieldsToRead, null);
_nextRecord = _parquetReader.read();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,7 @@ private Object extractValue(Group from, int fieldIndex, Type fieldType, int inde
return from.getValueToString(fieldIndex, index);
case INT96:
Binary int96 = from.getInt96(fieldIndex, index);
ByteBuffer buf = ByteBuffer.wrap(int96.getBytes()).order(ByteOrder.LITTLE_ENDIAN);
return (buf.getInt(8) - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY
+ buf.getLong(0) / NANOS_PER_MILLISECOND;
return convertInt96ToLong(int96.getBytes());
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
Expand Down Expand Up @@ -204,6 +202,12 @@ private Object extractValue(Group from, int fieldIndex, Type fieldType, int inde
return null;
}

public static long convertInt96ToLong(byte[] int96Bytes) {
ByteBuffer buf = ByteBuffer.wrap(int96Bytes).order(ByteOrder.LITTLE_ENDIAN);
return (buf.getInt(8) - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * DateTimeConstants.MILLIS_PER_DAY
+ buf.getLong(0) / NANOS_PER_MILLISECOND;
}

public Object[] extractList(Group group) {
int repFieldCount = group.getType().getFieldCount();
if (repFieldCount < 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,28 +125,20 @@ public void testFileMetadataParsing()
@Test
public void testComparison()
throws IOException {
testComparison(_dataFile, SAMPLE_RECORDS_SIZE, false);
testComparison(new File(getClass().getClassLoader().getResource("users.parquet").getFile()), 1, false);
testComparison(new File(getClass().getClassLoader().getResource("test-comparison.gz.parquet").getFile()), 363667,
false);
testComparison(new File(getClass().getClassLoader().getResource("test-comparison.snappy.parquet").getFile()), 2870,
false);
testComparison(new File(getClass().getClassLoader().getResource("baseballStats.snappy.parquet").getFile()), 97889,
false);
testComparison(new File(getClass().getClassLoader().getResource("baseballStats.zstd.parquet").getFile()), 97889,
false);
testComparison(new File(getClass().getClassLoader().getResource("githubEvents.snappy.parquet").getFile()), 10000,
false);
testComparison(new File(getClass().getClassLoader().getResource("starbucksStores.snappy.parquet").getFile()), 6443,
false);
testComparison(new File(getClass().getClassLoader().getResource("airlineStats.snappy.parquet").getFile()), 19492,
false);
testComparison(new File(getClass().getClassLoader().getResource("githubActivities.gz.parquet").getFile()), 2000,
false);
testComparison(new File(getClass().getClassLoader().getResource("int96AvroParquet.parquet").getFile()), 1, true);
testComparison(_dataFile, SAMPLE_RECORDS_SIZE);
testComparison(new File(getClass().getClassLoader().getResource("users.parquet").getFile()), 1);
testComparison(new File(getClass().getClassLoader().getResource("test-comparison.gz.parquet").getFile()), 363667);
testComparison(new File(getClass().getClassLoader().getResource("test-comparison.snappy.parquet").getFile()), 2870);
testComparison(new File(getClass().getClassLoader().getResource("baseballStats.snappy.parquet").getFile()), 97889);
testComparison(new File(getClass().getClassLoader().getResource("baseballStats.zstd.parquet").getFile()), 97889);
testComparison(new File(getClass().getClassLoader().getResource("githubEvents.snappy.parquet").getFile()), 10000);
testComparison(new File(getClass().getClassLoader().getResource("starbucksStores.snappy.parquet").getFile()), 6443);
testComparison(new File(getClass().getClassLoader().getResource("airlineStats.snappy.parquet").getFile()), 19492);
testComparison(new File(getClass().getClassLoader().getResource("githubActivities.gz.parquet").getFile()), 2000);
testComparison(new File(getClass().getClassLoader().getResource("int96AvroParquet.parquet").getFile()), 1);
}

private void testComparison(File dataFile, int totalRecords, boolean skipIndividualRecordComparison)
private void testComparison(File dataFile, int totalRecords)
throws IOException {
final ParquetRecordReader avroRecordReader = new ParquetRecordReader();
ParquetRecordReaderConfig avroRecordReaderConfig = new ParquetRecordReaderConfig();
Expand All @@ -159,14 +151,14 @@ private void testComparison(File dataFile, int totalRecords, boolean skipIndivid
Assert.assertTrue(avroRecordReader.useAvroParquetRecordReader());
Assert.assertFalse(nativeRecordReader.useAvroParquetRecordReader());

testComparison(avroRecordReader, nativeRecordReader, totalRecords, skipIndividualRecordComparison);
testComparison(avroRecordReader, nativeRecordReader, totalRecords);
avroRecordReader.rewind();
nativeRecordReader.rewind();
testComparison(avroRecordReader, nativeRecordReader, totalRecords, skipIndividualRecordComparison);
testComparison(avroRecordReader, nativeRecordReader, totalRecords);
}

private void testComparison(ParquetRecordReader avroRecordReader, ParquetRecordReader nativeRecordReader,
int totalRecords, boolean skipIndividualRecordComparison)
int totalRecords)
throws IOException {
GenericRow avroReuse = new GenericRow();
GenericRow nativeReuse = new GenericRow();
Expand All @@ -175,10 +167,8 @@ private void testComparison(ParquetRecordReader avroRecordReader, ParquetRecordR
Assert.assertTrue(nativeRecordReader.hasNext());
final GenericRow avroReaderRow = avroRecordReader.next(avroReuse);
final GenericRow nativeReaderRow = nativeRecordReader.next(nativeReuse);
if (!skipIndividualRecordComparison) {
Assert.assertEquals(nativeReaderRow.toString(), avroReaderRow.toString());
Assert.assertTrue(avroReaderRow.equals(nativeReaderRow));
}
Assert.assertEquals(nativeReaderRow.toString(), avroReaderRow.toString());
Assert.assertTrue(avroReaderRow.equals(nativeReaderRow));
recordsRead++;
}
Assert.assertFalse(nativeRecordReader.hasNext());
Expand Down

0 comments on commit 3267a74

Please sign in to comment.