Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added serde customization support in Fast-Avro #520

Merged
merged 3 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#gradle artifacts
.gradle/

**/build/*
!**/avro-fastserde-tests111/build/codegen

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
package com.linkedin.avro.fastserde;

import com.linkedin.avro.fastserde.customized.DatumReaderCustomization;
import com.linkedin.avro.fastserde.generated.avro.TestEnum;
import com.linkedin.avro.fastserde.generated.avro.TestRecord;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
Expand Down Expand Up @@ -81,7 +87,7 @@ public void shouldNotCreateSpecificDatumReader() throws IOException, Interrupted
(FastDeserializer<TestRecord>) cache.getFastSpecificDeserializer(TestRecord.SCHEMA$, faultySchema);

Assert.assertNotNull(fastSpecificDeserializer);
Assert.assertEquals(fastSpecificDeserializer.getClass().getDeclaredMethods().length, 1);
Assert.assertFalse(fastSpecificDeserializer.isBackedByGeneratedClass());
}

@Test(groups = {"deserializationTest"})
Expand Down Expand Up @@ -134,4 +140,60 @@ public void shouldCreateGenericDatumReader() throws IOException {
Assert.assertTrue(fastGenericDatumReader.isFastDeserializerUsed(), "FastGenericDatumReader should be using"
+ " Fast Deserializer when the fast deserializer generation is done.");
}

@Test(groups = {"deserializationTest"})
public void testDatumReaderWithCustomization() throws IOException, ExecutionException, InterruptedException {
Schema recordSchema = createRecord("TestSchema",
createField("testInt", Schema.create(Schema.Type.INT)),
createMapFieldSchema("testMap", Schema.create(Schema.Type.STRING)));
/**
* Test with special map type: {@link java.util.concurrent.ConcurrentHashMap}.
*/
DatumReaderCustomization customization = new DatumReaderCustomization.Builder()
.setNewMapOverrideFunc( (reuse, size) -> {
if (reuse instanceof ConcurrentHashMap) {
((ConcurrentHashMap)reuse).clear();
return reuse;
} else {
return new ConcurrentHashMap<>(size);
}
})
.build();
// Check cold datum Reader
GenericRecord record = new GenericData.Record(recordSchema);
record.put("testInt", new Integer(100));
Map<Utf8, Utf8> testMap = new HashMap<>();
testMap.put(new Utf8("key1"), new Utf8("value1"));
testMap.put(new Utf8("key2"), new Utf8("value2"));
record.put("testMap", testMap);
FastGenericDatumReader<GenericRecord> fastGenericDatumReader = new FastGenericDatumReader<>(recordSchema, recordSchema, cache, null, customization);
GenericRecord deserializedRecordByColdDatumReader = fastGenericDatumReader.read(null, FastSerdeTestsSupport.genericDataAsDecoder(record));
Assert.assertEquals(deserializedRecordByColdDatumReader.get("testInt"), new Integer(100));
Assert.assertEquals(deserializedRecordByColdDatumReader.get("testMap"), testMap);
Assert.assertTrue(deserializedRecordByColdDatumReader.get("testMap") instanceof ConcurrentHashMap);

// Block the fast deserializer generation
fastGenericDatumReader.getFastDeserializer().get();
// Decode the record by fast datum reader
GenericRecord deserializedRecordByFastDatumReader = fastGenericDatumReader.read(null, FastSerdeTestsSupport.genericDataAsDecoder(record));
Assert.assertEquals(deserializedRecordByFastDatumReader.get("testInt"), new Integer(100));
Assert.assertEquals(deserializedRecordByFastDatumReader.get("testMap"), testMap);
Assert.assertTrue(deserializedRecordByFastDatumReader.get("testMap") instanceof ConcurrentHashMap);

// Test with an empty map
GenericRecord recordWithEmptyMap = new GenericData.Record(recordSchema);
recordWithEmptyMap.put("testInt", new Integer(200));
recordWithEmptyMap.put("testMap", Collections.emptyMap());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes me think of something... using the builder introduced in this PR, we could take advantage of datasets with many empty maps to make them take up less space on heap by leveraging the singleton empty map, e.g.:

    DatumReaderCustomization customization = new DatumReaderCustomization.Builder()
        .setNewMapOverrideFunc( (reuse, size) -> {
          if (reuse instanceof ConcurrentHashMap) {
            ((ConcurrentHashMap)reuse).clear();
            return reuse;
          } else if (size == 0) {
            return Collections.emptyMap();
          } else {
            return new ConcurrentHashMap<>(size);
          }
        })
        .build();

This could be done built-into fast-avro itself, but it's probably not a good default, since it means the returned map is immutable, which would be a breaking change in some cases. But it's a good use case for customization...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, we can consider doing this in OSS Venice.

GenericRecord deserializedRecordWithEmptyMapByFastDatumReader = fastGenericDatumReader.read(null, FastSerdeTestsSupport.genericDataAsDecoder(recordWithEmptyMap));
Assert.assertEquals(deserializedRecordWithEmptyMapByFastDatumReader.get("testInt"), new Integer(200));
Assert.assertEquals(deserializedRecordWithEmptyMapByFastDatumReader.get("testMap"), Collections.emptyMap());
Assert.assertTrue(deserializedRecordWithEmptyMapByFastDatumReader.get("testMap") instanceof ConcurrentHashMap);

// Generate a new fast datum reader with the same schema, but without customization
FastGenericDatumReader<GenericRecord> fastGenericDatumReaderWithoutCustomization = new FastGenericDatumReader<>(recordSchema, cache);
GenericRecord deserializedRecordByFastDatumReaderWithoutCustomization = fastGenericDatumReaderWithoutCustomization.read(null, FastSerdeTestsSupport.genericDataAsDecoder(record));
Assert.assertEquals(deserializedRecordByFastDatumReaderWithoutCustomization.get("testInt"), new Integer(100));
Assert.assertEquals(deserializedRecordByFastDatumReaderWithoutCustomization.get("testMap"), testMap);
Assert.assertFalse(deserializedRecordByFastDatumReaderWithoutCustomization.get("testMap") instanceof ConcurrentHashMap);
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package com.linkedin.avro.fastserde;

import com.linkedin.avro.fastserde.customized.DatumWriterCustomization;
import com.linkedin.avro.fastserde.generated.avro.TestEnum;
import com.linkedin.avro.fastserde.generated.avro.TestRecord;
import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.testng.Assert;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -80,4 +85,40 @@ public void shouldCreateGenericDatumWriter() throws IOException {
Assert.assertTrue(fastGenericDatumWriter.isFastSerializerUsed(), "FastGenericDatumWriter should be using"
+ " Fast Serializer when the fast deserializer generation is done.");
}

@Test(groups = {"serializationTest"})
@SuppressWarnings("unchecked")
public void writeWithCustomizationCheck() throws IOException {
Schema recordSchema = createRecord("TestSchema",
createField("testInt", Schema.create(Schema.Type.INT)),
createMapFieldSchema("testMap", Schema.create(Schema.Type.STRING)));
/**
* Check whether the map type is a {@link java.util.LinkedHashMap} or not.
*/
DatumWriterCustomization customization = new DatumWriterCustomization.Builder()
.setCheckMapTypeFunction( o -> {
if (o == null) {
return;
}
if (! (o instanceof LinkedHashMap)) {
throw new IllegalArgumentException("The map type should be 'LinkedHashMap'");
}
}).build();
// Check cold datum Writer
GenericRecord record = new GenericData.Record(recordSchema);
record.put("testInt", new Integer(100));
Map<Utf8, Utf8> testMap = new HashMap<>();
testMap.put(new Utf8("key1"), new Utf8("value1"));
testMap.put(new Utf8("key2"), new Utf8("value2"));
record.put("testMap", testMap);
FastGenericDatumWriter<GenericRecord> fastGenericDatumWriterWithoutCustomization = new FastGenericDatumWriter<>(recordSchema, null, cache, null);
// No exception
fastGenericDatumWriterWithoutCustomization.write(record, AvroCompatibilityHelper.newBinaryEncoder(new ByteArrayOutputStream(), true, null));

FastGenericDatumWriter<GenericRecord> fastGenericDatumWriterWithCustomization = new FastGenericDatumWriter<>(recordSchema, null, cache, customization);
Assert.expectThrows(IllegalArgumentException.class,
() -> fastGenericDatumWriterWithCustomization.write(record, AvroCompatibilityHelper.newBinaryEncoder(new ByteArrayOutputStream(), true, null)));


}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1592,7 +1592,7 @@ record = new GenericData.Record(recordWithUnionMapOfUnionValuesSchema);

private static <T> T decodeRecordColdFast(Schema writerSchema, Schema readerSchema, Decoder decoder) {
FastDeserializer<T> deserializer =
new FastSerdeCache.FastDeserializerWithAvroGenericImpl<>(writerSchema, readerSchema, GenericData.get());
new FastSerdeUtils.FastDeserializerWithAvroGenericImpl<>(writerSchema, readerSchema, GenericData.get(), false);

return decodeRecordFast(deserializer, decoder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.linkedin.avro.fastserde.FastSerdeTestsSupport.getCodeGenDirectory;

import com.linkedin.avro.fastserde.FastSerdeUtils;
import java.io.File;
import java.io.IOException;
import java.net.URL;
Expand Down Expand Up @@ -136,11 +137,11 @@ protected <T extends SpecificRecordBase> byte[] verifySerializers(T data,
schema, classesDir, classLoader, null, specificData)
.generateSerializer();

FastSerdeCache.FastSerializerWithAvroGenericImpl<T> fastSerializerWithAvroGeneric =
new FastSerdeCache.FastSerializerWithAvroGenericImpl<>(schema, genericData);
FastSerdeUtils.FastSerializerWithAvroGenericImpl<T> fastSerializerWithAvroGeneric =
new FastSerdeUtils.FastSerializerWithAvroGenericImpl<>(schema, genericData, null, false);

FastSerdeCache.FastSerializerWithAvroSpecificImpl<T> fastSerializerWithAvroSpecific =
new FastSerdeCache.FastSerializerWithAvroSpecificImpl<>(schema, specificData);
FastSerdeUtils.FastSerializerWithAvroSpecificImpl<T> fastSerializerWithAvroSpecific =
new FastSerdeUtils.FastSerializerWithAvroSpecificImpl<>(schema, specificData, null, false);

GenericDatumWriter<T> genericDatumWriter = new GenericDatumWriter<>(
schema, genericData);
Expand Down Expand Up @@ -188,11 +189,11 @@ protected <T extends SpecificRecordBase> T verifyDeserializers(byte[] bytesWithH
schema, schema, classesDir, classLoader, null, specificData)
.generateDeserializer();

FastSerdeCache.FastDeserializerWithAvroGenericImpl<GenericData.Record> fastDeserializerWithAvroGeneric =
new FastSerdeCache.FastDeserializerWithAvroGenericImpl<>(schema, schema, genericData);
FastSerdeUtils.FastDeserializerWithAvroGenericImpl<GenericData.Record> fastDeserializerWithAvroGeneric =
new FastSerdeUtils.FastDeserializerWithAvroGenericImpl<>(schema, schema, genericData, false);

FastSerdeCache.FastDeserializerWithAvroSpecificImpl<T> fastDeserializerWithAvroSpecific =
new FastSerdeCache.FastDeserializerWithAvroSpecificImpl<>(schema, schema, specificData);
FastSerdeUtils.FastDeserializerWithAvroSpecificImpl<T> fastDeserializerWithAvroSpecific =
new FastSerdeUtils.FastDeserializerWithAvroSpecificImpl<>(schema, schema, specificData, false);

GenericDatumReader<GenericData.Record> genericDatumReader = new GenericDatumReader<>(schema, schema, genericData);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.List;
import com.linkedin.avro.api.PrimitiveBooleanList;
import com.linkedin.avro.fastserde.FastDeserializer;
import com.linkedin.avro.fastserde.customized.DatumReaderCustomization;
import com.linkedin.avro.fastserde.primitive.PrimitiveBooleanArrayList;
import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;
Expand All @@ -19,7 +20,7 @@ public Array_of_BOOLEAN_GenericDeserializer_869749973_869749973(Schema readerSch
this.readerSchema = readerSchema;
}

public List<Boolean> deserialize(List<Boolean> reuse, Decoder decoder)
public List<Boolean> deserialize(List<Boolean> reuse, Decoder decoder, DatumReaderCustomization customization)
throws IOException
{
PrimitiveBooleanList array0 = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.List;
import com.linkedin.avro.api.PrimitiveDoubleList;
import com.linkedin.avro.fastserde.FastDeserializer;
import com.linkedin.avro.fastserde.customized.DatumReaderCustomization;
import com.linkedin.avro.fastserde.primitive.PrimitiveDoubleArrayList;
import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;
Expand All @@ -19,7 +20,7 @@ public Array_of_DOUBLE_GenericDeserializer_18760307_18760307(Schema readerSchema
this.readerSchema = readerSchema;
}

public List<Double> deserialize(List<Double> reuse, Decoder decoder)
public List<Double> deserialize(List<Double> reuse, Decoder decoder, DatumReaderCustomization customization)
throws IOException
{
PrimitiveDoubleList array0 = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.linkedin.avro.api.PrimitiveFloatList;
import com.linkedin.avro.fastserde.BufferBackedPrimitiveFloatList;
import com.linkedin.avro.fastserde.FastDeserializer;
import com.linkedin.avro.fastserde.customized.DatumReaderCustomization;
import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;

Expand All @@ -19,7 +20,7 @@ public Array_of_FLOAT_GenericDeserializer_1012670397_1012670397(Schema readerSch
this.readerSchema = readerSchema;
}

public List<Float> deserialize(List<Float> reuse, Decoder decoder)
public List<Float> deserialize(List<Float> reuse, Decoder decoder, DatumReaderCustomization customization)
throws IOException
{
PrimitiveFloatList array0 = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.List;
import com.linkedin.avro.api.PrimitiveIntList;
import com.linkedin.avro.fastserde.FastDeserializer;
import com.linkedin.avro.fastserde.customized.DatumReaderCustomization;
import com.linkedin.avro.fastserde.primitive.PrimitiveIntArrayList;
import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;
Expand All @@ -19,7 +20,7 @@ public Array_of_INT_GenericDeserializer_1012089072_1012089072(Schema readerSchem
this.readerSchema = readerSchema;
}

public List<Integer> deserialize(List<Integer> reuse, Decoder decoder)
public List<Integer> deserialize(List<Integer> reuse, Decoder decoder, DatumReaderCustomization customization)
throws IOException
{
PrimitiveIntList array0 = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.List;
import com.linkedin.avro.api.PrimitiveLongList;
import com.linkedin.avro.fastserde.FastDeserializer;
import com.linkedin.avro.fastserde.customized.DatumReaderCustomization;
import com.linkedin.avro.fastserde.primitive.PrimitiveLongArrayList;
import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;
Expand All @@ -19,7 +20,7 @@ public Array_of_LONG_GenericDeserializer_325099267_325099267(Schema readerSchema
this.readerSchema = readerSchema;
}

public List<Long> deserialize(List<Long> reuse, Decoder decoder)
public List<Long> deserialize(List<Long> reuse, Decoder decoder, DatumReaderCustomization customization)
throws IOException
{
PrimitiveLongList array0 = null;
Expand Down
Loading
Loading