[fast-avro] Schema evolution: complex fields changed to nullable #538

Merged
@@ -0,0 +1,35 @@
{
"type": "record",
"name": "OuterRecordWithNestedNotNullComplexFields",
"namespace": "com.linkedin.avro.fastserde.generated.avro",
"doc": "Used in tests of fast-serde to serialize record with schema having not-null complex fields",
"fields": [
{
"name": "innerRecord",
"type": {
"name": "InnerRecordNotNull",
"type": "record",
"fields": [
{
"name": "comment",
"type": "string"
}
]
}
},
{
"name": "innerMap",
"type": {
"type": "map",
"values": "int"
}
},
{
"name": "innerArray",
"type": {
"type": "array",
"items": "int"
}
}
]
}
@@ -0,0 +1,50 @@
{
"type": "record",
"name": "OuterRecordWithNestedNullableComplexFields",
"aliases": [
"OuterRecordWithNestedNotNullComplexFields"
],
"namespace": "com.linkedin.avro.fastserde.generated.avro",
"doc": "Used in tests of fast-serde to deserialize record serialized with schema having not-null complex fields",
"fields": [
{
"name": "innerRecord",
"type": [
"null",
{
"name": "InnerRecordNullable",
"aliases": [
"InnerRecordNotNull"
],
"type": "record",
"fields": [
{
"name": "comment",
"type": "string"
}
]
}
]
},
{
"name": "innerMap",
"type": [
"null",
{
"type": "map",
"values": "int"
}
]
},
{
"name": "innerArray",
"type": [
"null",
{
"type": "array",
"items": "int"
}
]
}
]
}
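
For orientation (a sketch, not part of the diff): Avro's standard schema-resolution rules already allow this evolution, because a reader union such as ["null", T] accepts data written with plain T by matching the non-null branch, and no union index is expected on the wire. The minimal, self-contained sketch below uses vanilla Avro APIs and a simplified, hypothetical pair of schemas (only the map field) to illustrate the round trip that the fast-serde tests further down exercise against the two generated schemas above.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collections;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

/** Hypothetical, simplified illustration of the not-null -> nullable evolution tested in this PR. */
public class NullableEvolutionSketch {

    // Writer schema: the complex field is a plain (not-null) map.
    private static final Schema WRITER_SCHEMA = new Schema.Parser().parse(
        "{\"type\": \"record\", \"name\": \"Outer\", \"fields\": ["
            + "{\"name\": \"innerMap\", \"type\": {\"type\": \"map\", \"values\": \"int\"}}]}");

    // Reader schema: the same field evolved to a nullable union ["null", map<int>].
    private static final Schema READER_SCHEMA = new Schema.Parser().parse(
        "{\"type\": \"record\", \"name\": \"Outer\", \"fields\": ["
            + "{\"name\": \"innerMap\", \"type\": [\"null\", {\"type\": \"map\", \"values\": \"int\"}]}]}");

    public static void main(String[] args) throws IOException {
        // Serialize with the not-null writer schema; no union branch index is written.
        GenericRecord written = new GenericData.Record(WRITER_SCHEMA);
        written.put("innerMap", Collections.singletonMap("one", 1));

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(WRITER_SCHEMA).write(written, encoder);
        encoder.flush();

        // Deserialize with the nullable reader schema; resolution picks the map branch of the union.
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericRecord read =
            new GenericDatumReader<GenericRecord>(WRITER_SCHEMA, READER_SCHEMA).read(null, decoder);

        System.out.println(read); // {"innerMap": {"one": 1}}
    }
}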
@@ -5,8 +5,14 @@
import com.linkedin.avro.api.PrimitiveFloatList;
import com.linkedin.avro.api.PrimitiveIntList;
import com.linkedin.avro.api.PrimitiveLongList;
import com.linkedin.avro.fastserde.generated.avro.InnerRecordNotNull;
import com.linkedin.avro.fastserde.generated.avro.OuterRecordWithNestedNotNullComplexFields;
import com.linkedin.avro.fastserde.generated.avro.OuterRecordWithNestedNullableComplexFields;
import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.avroutil1.compatibility.AvroRecordUtil;
import com.linkedin.avroutil1.compatibility.AvroVersion;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
@@ -26,16 +32,21 @@
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.util.Utf8;
import org.testng.Assert;
import org.testng.SkipException;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import org.testng.collections.Lists;
import org.testng.internal.collections.Pair;

import static com.linkedin.avro.fastserde.FastSerdeTestsSupport.*;

import static com.linkedin.avro.fastserde.FastSpecificDeserializerGeneratorTest.createAndSerializeOuterRecordWithNotNullComplexFields;


public class FastGenericDeserializerGeneratorTest {
@@ -1590,6 +1601,25 @@ record = new GenericData.Record(recordWithUnionMapOfUnionValuesSchema);
Assert.assertEquals(((Map<Utf8,Integer>) recordB.get("someInts")).get(new Utf8("3")), Integer.valueOf(3));
}

@Test(groups = {"deserializationTest"}, dataProvider = "Implementation")
void deserializeNullableFieldsPreviouslySerializedAsNotNull(Implementation implementation) throws IOException {
// given: outerRecord1 serialized using schema with not-null complex fields
Pair<OuterRecordWithNestedNotNullComplexFields, byte[]> pair = createAndSerializeOuterRecordWithNotNullComplexFields();
OuterRecordWithNestedNotNullComplexFields outerRecord1 = pair.first();
byte[] serializedOuterRecord1 = pair.second();

Schema writerSchema = OuterRecordWithNestedNotNullComplexFields.SCHEMA$; // without nullable fields
Schema readerSchema = OuterRecordWithNestedNullableComplexFields.SCHEMA$; // contains nullable fields
BinaryDecoder binaryDecoder = AvroCompatibilityHelper.newBinaryDecoder(serializedOuterRecord1);

// when: serialized outerRecord1 is deserialized using readerSchema with nullable complex fields
GenericRecord outerRecord2 = implementation.decode(writerSchema, readerSchema, binaryDecoder);

// then: deserialized outerRecord2 is the same as outerRecord1 (initial one)
Assert.assertNotNull(outerRecord2);
Assert.assertEquals(outerRecord2.toString(), outerRecord1.toString());
}

private static <T> T decodeRecordColdFast(Schema writerSchema, Schema readerSchema, Decoder decoder) {
FastDeserializer<T> deserializer =
new FastSerdeUtils.FastDeserializerWithAvroGenericImpl<>(writerSchema, readerSchema, GenericData.get(), false);
@@ -2,9 +2,12 @@
package com.linkedin.avro.fastserde;

import com.linkedin.avro.fastserde.generated.avro.FullRecord;
import com.linkedin.avro.fastserde.generated.avro.InnerRecordNotNull;
import com.linkedin.avro.fastserde.generated.avro.IntRecord;
import com.linkedin.avro.fastserde.generated.avro.MyEnumV2;
import com.linkedin.avro.fastserde.generated.avro.MyRecordV2;
import com.linkedin.avro.fastserde.generated.avro.OuterRecordWithNestedNotNullComplexFields;
import com.linkedin.avro.fastserde.generated.avro.OuterRecordWithNestedNullableComplexFields;
import com.linkedin.avro.fastserde.generated.avro.RecordWithLargeUnionField;
import com.linkedin.avro.fastserde.generated.avro.RemovedTypesTestRecord;
import com.linkedin.avro.fastserde.generated.avro.SplitRecordTest1;
@@ -16,6 +19,9 @@
import com.linkedin.avro.fastserde.generated.avro.TestRecord;
import com.linkedin.avro.fastserde.generated.avro.UnionOfRecordsWithSameNameEnumField;
import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.avroutil1.compatibility.AvroRecordUtil;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.URL;
@@ -31,13 +37,18 @@
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.util.Utf8;
import org.testng.Assert;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import org.testng.collections.Lists;
import org.testng.internal.collections.Pair;

import static com.linkedin.avro.fastserde.FastSerdeTestsSupport.*;

@@ -844,6 +855,61 @@ public void largeSchemasWithUnionCanBeHandled() {
}
}

@Test(groups = {"deserializationTest"}, dataProvider = "SlowFastDeserializer")
void deserializeNullableFieldsPreviouslySerializedAsNotNull(boolean useFastSerializer) throws IOException {
// given: outerRecord1 serialized using schema with not-null complex fields
Pair<OuterRecordWithNestedNotNullComplexFields, byte[]> pair = createAndSerializeOuterRecordWithNotNullComplexFields();
OuterRecordWithNestedNotNullComplexFields outerRecord1 = pair.first();
byte[] serializedOuterRecord1 = pair.second();
BinaryDecoder binaryDecoder = AvroCompatibilityHelper.newBinaryDecoder(serializedOuterRecord1);

Schema writerSchema = OuterRecordWithNestedNotNullComplexFields.SCHEMA$; // without nullable fields
Schema readerSchema = OuterRecordWithNestedNullableComplexFields.SCHEMA$; // contains nullable fields

// when: serialized outerRecord1 is deserialized using readerSchema with nullable complex fields
OuterRecordWithNestedNullableComplexFields outerRecord2;
if (useFastSerializer) {
outerRecord2 = decodeRecordFast(readerSchema, writerSchema, binaryDecoder);
} else {
outerRecord2 = decodeRecordSlow(readerSchema, writerSchema, binaryDecoder);
}

// then: deserialized outerRecord2 is the same as outerRecord1 (initial one)
Assert.assertNotNull(outerRecord2);
Assert.assertEquals(outerRecord2.toString(), outerRecord1.toString());
}

/**
* @return serialized {@link OuterRecordWithNestedNotNullComplexFields}
*/
static Pair<OuterRecordWithNestedNotNullComplexFields, byte[]> createAndSerializeOuterRecordWithNotNullComplexFields() throws IOException {
InnerRecordNotNull innerRecord = AvroRecordUtil.setField(new InnerRecordNotNull(), "comment", "awesome comment");

Map<String, Integer> innerMap = new HashMap<>();
innerMap.put("one", 1);
innerMap.put("twotwo", 22);
innerMap.put("three x 3", 333);

List<Integer> innerArray = Lists.newArrayList(234, 2342, 948563);

OuterRecordWithNestedNotNullComplexFields outerRecord1 = new OuterRecordWithNestedNotNullComplexFields();
AvroRecordUtil.setField(outerRecord1, "innerRecord", innerRecord);
AvroRecordUtil.setField(outerRecord1, "innerMap", innerMap);
AvroRecordUtil.setField(outerRecord1, "innerArray", innerArray);

Schema writerSchema = OuterRecordWithNestedNotNullComplexFields.SCHEMA$;
SpecificDatumWriter<OuterRecordWithNestedNotNullComplexFields> datumWriter = new SpecificDatumWriter<>(writerSchema);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryEncoder binaryEncoder = AvroCompatibilityHelper.newBinaryEncoder(baos);

datumWriter.write(outerRecord1, binaryEncoder);
binaryEncoder.flush();

byte[] serializedOuterRecord1 = baos.toByteArray();

return Pair.of(outerRecord1, serializedOuterRecord1);
}

private <T> T decodeRecordFast(Schema readerSchema, Schema writerSchema, Decoder decoder) {
FastDeserializer<T> deserializer =
new FastSpecificDeserializerGenerator<T>(writerSchema, readerSchema, tempDir, classLoader,
@@ -0,0 +1,113 @@

package com.linkedin.avro.fastserde.generated.deserialization.AVRO_1_11;

import java.io.IOException;
import java.util.Map;
import com.linkedin.avro.api.PrimitiveIntList;
import com.linkedin.avro.fastserde.FastDeserializer;
import com.linkedin.avro.fastserde.customized.DatumReaderCustomization;
import com.linkedin.avro.fastserde.primitive.PrimitiveIntArrayList;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.util.Utf8;

public class OuterRecordWithNestedNullableComplexFields_GenericDeserializer_1244262185_49792023
implements FastDeserializer<IndexedRecord>
{

private final Schema readerSchema;
private final Schema innerRecord0;
private final Schema innerRecordNullableRecordSchema0;
private final Schema innerMap0;
private final Schema innerMapMapSchema0;
private final Schema innerArray0;
private final Schema innerArrayArraySchema0;

public OuterRecordWithNestedNullableComplexFields_GenericDeserializer_1244262185_49792023(Schema readerSchema) {
this.readerSchema = readerSchema;
this.innerRecord0 = readerSchema.getField("innerRecord").schema();
this.innerRecordNullableRecordSchema0 = innerRecord0 .getTypes().get(1);
this.innerMap0 = readerSchema.getField("innerMap").schema();
this.innerMapMapSchema0 = innerMap0 .getTypes().get(1);
this.innerArray0 = readerSchema.getField("innerArray").schema();
this.innerArrayArraySchema0 = innerArray0 .getTypes().get(1);
}

public IndexedRecord deserialize(IndexedRecord reuse, Decoder decoder, DatumReaderCustomization customization)
throws IOException
{
return deserializeOuterRecordWithNestedNullableComplexFields0((reuse), (decoder), (customization));
}

public IndexedRecord deserializeOuterRecordWithNestedNullableComplexFields0(Object reuse, Decoder decoder, DatumReaderCustomization customization)
throws IOException
{
IndexedRecord OuterRecordWithNestedNullableComplexFields;
if ((((reuse)!= null)&&((reuse) instanceof IndexedRecord))&&(((IndexedRecord)(reuse)).getSchema() == readerSchema)) {
OuterRecordWithNestedNullableComplexFields = ((IndexedRecord)(reuse));
} else {
OuterRecordWithNestedNullableComplexFields = new org.apache.avro.generic.GenericData.Record(readerSchema);
}
OuterRecordWithNestedNullableComplexFields.put(0, deserializeInnerRecordNullable0(OuterRecordWithNestedNullableComplexFields.get(0), (decoder), (customization)));
populate_OuterRecordWithNestedNullableComplexFields0((OuterRecordWithNestedNullableComplexFields), (customization), (decoder));
return OuterRecordWithNestedNullableComplexFields;
}

public IndexedRecord deserializeInnerRecordNullable0(Object reuse, Decoder decoder, DatumReaderCustomization customization)
throws IOException
{
IndexedRecord InnerRecordNullable;
if ((((reuse)!= null)&&((reuse) instanceof IndexedRecord))&&(((IndexedRecord)(reuse)).getSchema() == innerRecordNullableRecordSchema0)) {
InnerRecordNullable = ((IndexedRecord)(reuse));
} else {
InnerRecordNullable = new org.apache.avro.generic.GenericData.Record(innerRecordNullableRecordSchema0);
}
Utf8 charSequence0;
Object oldString0 = InnerRecordNullable.get(0);
if (oldString0 instanceof Utf8) {
charSequence0 = (decoder).readString(((Utf8) oldString0));
} else {
charSequence0 = (decoder).readString(null);
}
InnerRecordNullable.put(0, charSequence0);
return InnerRecordNullable;
}

private void populate_OuterRecordWithNestedNullableComplexFields0(IndexedRecord OuterRecordWithNestedNullableComplexFields, DatumReaderCustomization customization, Decoder decoder)
throws IOException
{
Map<Utf8, Integer> innerMap1 = null;
long chunkLen0 = (decoder.readMapStart());
if (chunkLen0 > 0) {
innerMap1 = ((Map)(customization).getNewMapOverrideFunc().apply(OuterRecordWithNestedNullableComplexFields.get(1), ((int) chunkLen0)));
do {
for (int counter0 = 0; (counter0 <chunkLen0); counter0 ++) {
Utf8 key0 = (decoder.readString(null));
innerMap1 .put(key0, (decoder.readInt()));
}
chunkLen0 = (decoder.mapNext());
} while (chunkLen0 > 0);
} else {
innerMap1 = ((Map)(customization).getNewMapOverrideFunc().apply(OuterRecordWithNestedNullableComplexFields.get(1), 0));
}
OuterRecordWithNestedNullableComplexFields.put(1, innerMap1);
PrimitiveIntList innerArray1 = null;
long chunkLen1 = (decoder.readArrayStart());
Object oldArray0 = OuterRecordWithNestedNullableComplexFields.get(2);
if (oldArray0 instanceof PrimitiveIntList) {
innerArray1 = ((PrimitiveIntList) oldArray0);
innerArray1 .clear();
} else {
innerArray1 = new PrimitiveIntArrayList(((int) chunkLen1));
}
while (chunkLen1 > 0) {
for (int counter1 = 0; (counter1 <chunkLen1); counter1 ++) {
innerArray1 .addPrimitive((decoder.readInt()));
}
chunkLen1 = (decoder.arrayNext());
}
OuterRecordWithNestedNullableComplexFields.put(2, innerArray1);
}

}
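
A quick sanity check (again a sketch, not part of the change, and assuming an Avro version whose compatibility checker applies aliases): Avro's built-in SchemaCompatibility helper can confirm that the nullable reader schema is able to read data written with the not-null writer schema used by the tests above.

import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;

import com.linkedin.avro.fastserde.generated.avro.OuterRecordWithNestedNotNullComplexFields;
import com.linkedin.avro.fastserde.generated.avro.OuterRecordWithNestedNullableComplexFields;

/** Sketch: check reader/writer compatibility for the two schemas added in this PR. */
public class NullableEvolutionCompatibilityCheck {

    public static void main(String[] args) {
        Schema writerSchema = OuterRecordWithNestedNotNullComplexFields.SCHEMA$;  // not-null complex fields
        Schema readerSchema = OuterRecordWithNestedNullableComplexFields.SCHEMA$; // nullable complex fields

        // Expected COMPATIBLE: each reader union ["null", T] accepts the writer's plain T,
        // and the reader aliases cover the renamed record types (if the checker honors aliases).
        SchemaPairCompatibility compatibility =
            SchemaCompatibility.checkReaderWriterCompatibility(readerSchema, writerSchema);

        System.out.println(compatibility.getType());
        System.out.println(compatibility.getDescription());
    }
}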