Skip to content

Commit

Permalink
Added serde customization support in Fast-Avro (#520)
Browse files Browse the repository at this point in the history
* Added serde customization support in Fast-Avro

We have the following requirements:
For serialization, we would like to validate whether all the map fields are using the desired map type.
For deserialization, we would like to deserialize the map type into a special map impelementation for later use.

These customized requirements are not supported in the past because of the following reasons:

1. Fast classes generated are shared, so it is possible different users of the same schema may have different requirement.
2. For the same process, for different schema, the requirements can be different too.
3. No way to specify customized logic/data type when generating fast classes.
This PR adds a new functionality to specify customized logic and it is expandable and backward compatible.
DatumReaderCustomization : customization for read
DatumWriterCustomization : customization for write

Currently, these classes only support the requirements mentioned at the beginning.

How it works internally?

1. Each Fast DatumReader/DatumWriter constructor will take a new param for customization.
2. Each Fast DatumReader/DatumWriter will keep a local vanilla-Avro based implementation with customization support since the shared vanilla-Avro based implementation is still the default implementation.
3. Each generated Fast class will have a new param for customization in serialize/deserialize APIs.
4. Fast DatumReader/DatumWriter will call this new API with customization param of Fast classes.
5. The read/write API in Fast DatumReader/DatumWriter doesn't change, so it is backward compatible.

* Addressed comments

* Code gen for #520
  • Loading branch information
gaojieliu authored Oct 26, 2023
1 parent 2fb570c commit cd17a70
Show file tree
Hide file tree
Showing 151 changed files with 4,323 additions and 4,302 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#gradle artifacts
.gradle/

**/build/*
!**/avro-fastserde-tests111/build/codegen

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
package com.linkedin.avro.fastserde;

import com.linkedin.avro.fastserde.customized.DatumReaderCustomization;
import com.linkedin.avro.fastserde.generated.avro.TestEnum;
import com.linkedin.avro.fastserde.generated.avro.TestRecord;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
Expand Down Expand Up @@ -81,7 +87,7 @@ public void shouldNotCreateSpecificDatumReader() throws IOException, Interrupted
(FastDeserializer<TestRecord>) cache.getFastSpecificDeserializer(TestRecord.SCHEMA$, faultySchema);

Assert.assertNotNull(fastSpecificDeserializer);
Assert.assertEquals(fastSpecificDeserializer.getClass().getDeclaredMethods().length, 1);
Assert.assertFalse(fastSpecificDeserializer.isBackedByGeneratedClass());
}

@Test(groups = {"deserializationTest"})
Expand Down Expand Up @@ -134,4 +140,60 @@ public void shouldCreateGenericDatumReader() throws IOException {
Assert.assertTrue(fastGenericDatumReader.isFastDeserializerUsed(), "FastGenericDatumReader should be using"
+ " Fast Deserializer when the fast deserializer generation is done.");
}

@Test(groups = {"deserializationTest"})
public void testDatumReaderWithCustomization() throws IOException, ExecutionException, InterruptedException {
Schema recordSchema = createRecord("TestSchema",
createField("testInt", Schema.create(Schema.Type.INT)),
createMapFieldSchema("testMap", Schema.create(Schema.Type.STRING)));
/**
* Test with special map type: {@link java.util.concurrent.ConcurrentHashMap}.
*/
DatumReaderCustomization customization = new DatumReaderCustomization.Builder()
.setNewMapOverrideFunc( (reuse, size) -> {
if (reuse instanceof ConcurrentHashMap) {
((ConcurrentHashMap)reuse).clear();
return reuse;
} else {
return new ConcurrentHashMap<>(size);
}
})
.build();
// Check cold datum Reader
GenericRecord record = new GenericData.Record(recordSchema);
record.put("testInt", new Integer(100));
Map<Utf8, Utf8> testMap = new HashMap<>();
testMap.put(new Utf8("key1"), new Utf8("value1"));
testMap.put(new Utf8("key2"), new Utf8("value2"));
record.put("testMap", testMap);
FastGenericDatumReader<GenericRecord> fastGenericDatumReader = new FastGenericDatumReader<>(recordSchema, recordSchema, cache, null, customization);
GenericRecord deserializedRecordByColdDatumReader = fastGenericDatumReader.read(null, FastSerdeTestsSupport.genericDataAsDecoder(record));
Assert.assertEquals(deserializedRecordByColdDatumReader.get("testInt"), new Integer(100));
Assert.assertEquals(deserializedRecordByColdDatumReader.get("testMap"), testMap);
Assert.assertTrue(deserializedRecordByColdDatumReader.get("testMap") instanceof ConcurrentHashMap);

// Block the fast deserializer generation
fastGenericDatumReader.getFastDeserializer().get();
// Decode the record by fast datum reader
GenericRecord deserializedRecordByFastDatumReader = fastGenericDatumReader.read(null, FastSerdeTestsSupport.genericDataAsDecoder(record));
Assert.assertEquals(deserializedRecordByFastDatumReader.get("testInt"), new Integer(100));
Assert.assertEquals(deserializedRecordByFastDatumReader.get("testMap"), testMap);
Assert.assertTrue(deserializedRecordByFastDatumReader.get("testMap") instanceof ConcurrentHashMap);

// Test with an empty map
GenericRecord recordWithEmptyMap = new GenericData.Record(recordSchema);
recordWithEmptyMap.put("testInt", new Integer(200));
recordWithEmptyMap.put("testMap", Collections.emptyMap());
GenericRecord deserializedRecordWithEmptyMapByFastDatumReader = fastGenericDatumReader.read(null, FastSerdeTestsSupport.genericDataAsDecoder(recordWithEmptyMap));
Assert.assertEquals(deserializedRecordWithEmptyMapByFastDatumReader.get("testInt"), new Integer(200));
Assert.assertEquals(deserializedRecordWithEmptyMapByFastDatumReader.get("testMap"), Collections.emptyMap());
Assert.assertTrue(deserializedRecordWithEmptyMapByFastDatumReader.get("testMap") instanceof ConcurrentHashMap);

// Generate a new fast datum reader with the same schema, but without customization
FastGenericDatumReader<GenericRecord> fastGenericDatumReaderWithoutCustomization = new FastGenericDatumReader<>(recordSchema, cache);
GenericRecord deserializedRecordByFastDatumReaderWithoutCustomization = fastGenericDatumReaderWithoutCustomization.read(null, FastSerdeTestsSupport.genericDataAsDecoder(record));
Assert.assertEquals(deserializedRecordByFastDatumReaderWithoutCustomization.get("testInt"), new Integer(100));
Assert.assertEquals(deserializedRecordByFastDatumReaderWithoutCustomization.get("testMap"), testMap);
Assert.assertFalse(deserializedRecordByFastDatumReaderWithoutCustomization.get("testMap") instanceof ConcurrentHashMap);
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package com.linkedin.avro.fastserde;

import com.linkedin.avro.fastserde.customized.DatumWriterCustomization;
import com.linkedin.avro.fastserde.generated.avro.TestEnum;
import com.linkedin.avro.fastserde.generated.avro.TestRecord;
import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.testng.Assert;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -80,4 +85,40 @@ public void shouldCreateGenericDatumWriter() throws IOException {
Assert.assertTrue(fastGenericDatumWriter.isFastSerializerUsed(), "FastGenericDatumWriter should be using"
+ " Fast Serializer when the fast deserializer generation is done.");
}

@Test(groups = {"serializationTest"})
@SuppressWarnings("unchecked")
public void writeWithCustomizationCheck() throws IOException {
Schema recordSchema = createRecord("TestSchema",
createField("testInt", Schema.create(Schema.Type.INT)),
createMapFieldSchema("testMap", Schema.create(Schema.Type.STRING)));
/**
* Check whether the map type is a {@link java.util.LinkedHashMap} or not.
*/
DatumWriterCustomization customization = new DatumWriterCustomization.Builder()
.setCheckMapTypeFunction( o -> {
if (o == null) {
return;
}
if (! (o instanceof LinkedHashMap)) {
throw new IllegalArgumentException("The map type should be 'LinkedHashMap'");
}
}).build();
// Check cold datum Writer
GenericRecord record = new GenericData.Record(recordSchema);
record.put("testInt", new Integer(100));
Map<Utf8, Utf8> testMap = new HashMap<>();
testMap.put(new Utf8("key1"), new Utf8("value1"));
testMap.put(new Utf8("key2"), new Utf8("value2"));
record.put("testMap", testMap);
FastGenericDatumWriter<GenericRecord> fastGenericDatumWriterWithoutCustomization = new FastGenericDatumWriter<>(recordSchema, null, cache, null);
// No exception
fastGenericDatumWriterWithoutCustomization.write(record, AvroCompatibilityHelper.newBinaryEncoder(new ByteArrayOutputStream(), true, null));

FastGenericDatumWriter<GenericRecord> fastGenericDatumWriterWithCustomization = new FastGenericDatumWriter<>(recordSchema, null, cache, customization);
Assert.expectThrows(IllegalArgumentException.class,
() -> fastGenericDatumWriterWithCustomization.write(record, AvroCompatibilityHelper.newBinaryEncoder(new ByteArrayOutputStream(), true, null)));


}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1592,7 +1592,7 @@ record = new GenericData.Record(recordWithUnionMapOfUnionValuesSchema);

private static <T> T decodeRecordColdFast(Schema writerSchema, Schema readerSchema, Decoder decoder) {
FastDeserializer<T> deserializer =
new FastSerdeCache.FastDeserializerWithAvroGenericImpl<>(writerSchema, readerSchema, GenericData.get());
new FastSerdeUtils.FastDeserializerWithAvroGenericImpl<>(writerSchema, readerSchema, GenericData.get(), false);

return decodeRecordFast(deserializer, decoder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static com.linkedin.avro.fastserde.FastSerdeTestsSupport.getCodeGenDirectory;

import com.linkedin.avro.fastserde.FastSerdeUtils;
import java.io.File;
import java.io.IOException;
import java.net.URL;
Expand Down Expand Up @@ -136,11 +137,11 @@ protected <T extends SpecificRecordBase> byte[] verifySerializers(T data,
schema, classesDir, classLoader, null, specificData)
.generateSerializer();

FastSerdeCache.FastSerializerWithAvroGenericImpl<T> fastSerializerWithAvroGeneric =
new FastSerdeCache.FastSerializerWithAvroGenericImpl<>(schema, genericData);
FastSerdeUtils.FastSerializerWithAvroGenericImpl<T> fastSerializerWithAvroGeneric =
new FastSerdeUtils.FastSerializerWithAvroGenericImpl<>(schema, genericData, null, false);

FastSerdeCache.FastSerializerWithAvroSpecificImpl<T> fastSerializerWithAvroSpecific =
new FastSerdeCache.FastSerializerWithAvroSpecificImpl<>(schema, specificData);
FastSerdeUtils.FastSerializerWithAvroSpecificImpl<T> fastSerializerWithAvroSpecific =
new FastSerdeUtils.FastSerializerWithAvroSpecificImpl<>(schema, specificData, null, false);

GenericDatumWriter<T> genericDatumWriter = new GenericDatumWriter<>(
schema, genericData);
Expand Down Expand Up @@ -188,11 +189,11 @@ protected <T extends SpecificRecordBase> T verifyDeserializers(byte[] bytesWithH
schema, schema, classesDir, classLoader, null, specificData)
.generateDeserializer();

FastSerdeCache.FastDeserializerWithAvroGenericImpl<GenericData.Record> fastDeserializerWithAvroGeneric =
new FastSerdeCache.FastDeserializerWithAvroGenericImpl<>(schema, schema, genericData);
FastSerdeUtils.FastDeserializerWithAvroGenericImpl<GenericData.Record> fastDeserializerWithAvroGeneric =
new FastSerdeUtils.FastDeserializerWithAvroGenericImpl<>(schema, schema, genericData, false);

FastSerdeCache.FastDeserializerWithAvroSpecificImpl<T> fastDeserializerWithAvroSpecific =
new FastSerdeCache.FastDeserializerWithAvroSpecificImpl<>(schema, schema, specificData);
FastSerdeUtils.FastDeserializerWithAvroSpecificImpl<T> fastDeserializerWithAvroSpecific =
new FastSerdeUtils.FastDeserializerWithAvroSpecificImpl<>(schema, schema, specificData, false);

GenericDatumReader<GenericData.Record> genericDatumReader = new GenericDatumReader<>(schema, schema, genericData);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.List;
import com.linkedin.avro.api.PrimitiveBooleanList;
import com.linkedin.avro.fastserde.FastDeserializer;
import com.linkedin.avro.fastserde.customized.DatumReaderCustomization;
import com.linkedin.avro.fastserde.primitive.PrimitiveBooleanArrayList;
import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;
Expand All @@ -19,7 +20,7 @@ public Array_of_BOOLEAN_GenericDeserializer_869749973_869749973(Schema readerSch
this.readerSchema = readerSchema;
}

public List<Boolean> deserialize(List<Boolean> reuse, Decoder decoder)
public List<Boolean> deserialize(List<Boolean> reuse, Decoder decoder, DatumReaderCustomization customization)
throws IOException
{
PrimitiveBooleanList array0 = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.List;
import com.linkedin.avro.api.PrimitiveDoubleList;
import com.linkedin.avro.fastserde.FastDeserializer;
import com.linkedin.avro.fastserde.customized.DatumReaderCustomization;
import com.linkedin.avro.fastserde.primitive.PrimitiveDoubleArrayList;
import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;
Expand All @@ -19,7 +20,7 @@ public Array_of_DOUBLE_GenericDeserializer_18760307_18760307(Schema readerSchema
this.readerSchema = readerSchema;
}

public List<Double> deserialize(List<Double> reuse, Decoder decoder)
public List<Double> deserialize(List<Double> reuse, Decoder decoder, DatumReaderCustomization customization)
throws IOException
{
PrimitiveDoubleList array0 = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.linkedin.avro.api.PrimitiveFloatList;
import com.linkedin.avro.fastserde.BufferBackedPrimitiveFloatList;
import com.linkedin.avro.fastserde.FastDeserializer;
import com.linkedin.avro.fastserde.customized.DatumReaderCustomization;
import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;

Expand All @@ -19,7 +20,7 @@ public Array_of_FLOAT_GenericDeserializer_1012670397_1012670397(Schema readerSch
this.readerSchema = readerSchema;
}

public List<Float> deserialize(List<Float> reuse, Decoder decoder)
public List<Float> deserialize(List<Float> reuse, Decoder decoder, DatumReaderCustomization customization)
throws IOException
{
PrimitiveFloatList array0 = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.List;
import com.linkedin.avro.api.PrimitiveIntList;
import com.linkedin.avro.fastserde.FastDeserializer;
import com.linkedin.avro.fastserde.customized.DatumReaderCustomization;
import com.linkedin.avro.fastserde.primitive.PrimitiveIntArrayList;
import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;
Expand All @@ -19,7 +20,7 @@ public Array_of_INT_GenericDeserializer_1012089072_1012089072(Schema readerSchem
this.readerSchema = readerSchema;
}

public List<Integer> deserialize(List<Integer> reuse, Decoder decoder)
public List<Integer> deserialize(List<Integer> reuse, Decoder decoder, DatumReaderCustomization customization)
throws IOException
{
PrimitiveIntList array0 = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.List;
import com.linkedin.avro.api.PrimitiveLongList;
import com.linkedin.avro.fastserde.FastDeserializer;
import com.linkedin.avro.fastserde.customized.DatumReaderCustomization;
import com.linkedin.avro.fastserde.primitive.PrimitiveLongArrayList;
import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;
Expand All @@ -19,7 +20,7 @@ public Array_of_LONG_GenericDeserializer_325099267_325099267(Schema readerSchema
this.readerSchema = readerSchema;
}

public List<Long> deserialize(List<Long> reuse, Decoder decoder)
public List<Long> deserialize(List<Long> reuse, Decoder decoder, DatumReaderCustomization customization)
throws IOException
{
PrimitiveLongList array0 = null;
Expand Down
Loading

0 comments on commit cd17a70

Please sign in to comment.