diff --git a/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/file/FastSerdeWithDataFileStreamTest.java b/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/file/FastSerdeWithDataFileStreamTest.java index 4e38ef84..1e7b9711 100644 --- a/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/file/FastSerdeWithDataFileStreamTest.java +++ b/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/file/FastSerdeWithDataFileStreamTest.java @@ -53,7 +53,7 @@ void dataFileStreamShouldReadTheSameValuesAsValuesSentToDataFileWriter() throws DataFileStream dataFileStream = new DataFileStream<>(inputStream, datumReader); // when: pre-populated bytes array is consumed by DataFileStream - for (OuterRecord1 outerRecord : dataFileStream) { // throws NPE + for (OuterRecord1 outerRecord : dataFileStream) { InnerRecord1 innerRecord = (InnerRecord1) FastSerdeTestsSupport.getField(outerRecord, "innerRecord"); CharSequence comment = (CharSequence) FastSerdeTestsSupport.getField(innerRecord, "comment"); actualComments.add(comment.toString()); diff --git a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializer.java b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializer.java index fb6c579c..f7f4097c 100644 --- a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializer.java +++ b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializer.java @@ -2,6 +2,8 @@ import com.linkedin.avro.fastserde.customized.DatumReaderCustomization; import java.io.IOException; + +import org.apache.avro.Schema; import org.apache.avro.io.Decoder; import static com.linkedin.avro.fastserde.customized.DatumReaderCustomization.*; @@ -17,5 +19,15 @@ default T deserialize(T reuse, Decoder d) throws IOException { return deserialize(reuse, d, DEFAULT_DATUM_READER_CUSTOMIZATION); } + /** + * Set the writer's schema. + * @see org.apache.avro.io.DatumReader#setSchema(Schema) + */ + default void setSchema(Schema writerSchema) { + // Implement this method only in vanilla-avro-based classes (e.g. fallback scenario). + // Normally for generated deserializers it doesn't make sense. + throw new UnsupportedOperationException("Can't change schema for already generated class."); + } + T deserialize(T reuse, Decoder d, DatumReaderCustomization customization) throws IOException; } diff --git a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastGenericDatumReader.java b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastGenericDatumReader.java index fb0d663f..213d0776 100644 --- a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastGenericDatumReader.java +++ b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastGenericDatumReader.java @@ -87,6 +87,8 @@ public void setSchema(Schema schema) { if (readerSchema == null) { readerSchema = writerSchema; } + + coldDeserializer.setSchema(schema); } @Override diff --git a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerdeCache.java b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerdeCache.java index 99204964..29503d54 100644 --- a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerdeCache.java +++ b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerdeCache.java @@ -183,7 +183,7 @@ public static boolean isSupportedForFastSerializer(Schema.Type schemaType) { Schema.Type.ARRAY); } - public static boolean isFastDeserializer(FastDeserializer deserializer) { + public static boolean isFastDeserializer(FastDeserializer deserializer) { return deserializer.isBackedByGeneratedClass(); } @@ -476,7 +476,7 @@ private FastDeserializer buildSpecificDeserializer(Schema writerSchema, Schem LOGGER.error("Deserializer class instantiation exception", e); } - return new FastSerdeUtils.FastDeserializerWithAvroSpecificImpl(writerSchema, readerSchema, modelData, customization, failFast, true); + return new FastSerdeUtils.FastDeserializerWithAvroSpecificImpl<>(writerSchema, readerSchema, modelData, customization, failFast, true); } /** @@ -536,7 +536,7 @@ private FastDeserializer buildGenericDeserializer(Schema writerSchema, Schema LOGGER.error("Deserializer class instantiation exception:", e); } - return new FastSerdeUtils.FastDeserializerWithAvroGenericImpl(writerSchema, readerSchema, modelData, customization, failFast, true); + return new FastSerdeUtils.FastDeserializerWithAvroGenericImpl<>(writerSchema, readerSchema, modelData, customization, failFast, true); } public FastSerializer buildFastSpecificSerializer(Schema schema, SpecificData modelData) { diff --git a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerdeUtils.java b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerdeUtils.java index c9bc079f..333ac419 100644 --- a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerdeUtils.java +++ b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerdeUtils.java @@ -52,6 +52,11 @@ public FastDeserializerWithAvroSpecificImpl(Schema writerSchema, Schema readerSc this.runtimeClassGenerationDone = runtimeClassGenerationDone; } + @Override + public void setSchema(Schema writerSchema) { + this.customizedDatumReader.setSchema(writerSchema); + } + @Override public V deserialize(V reuse, Decoder d, DatumReaderCustomization customization) throws IOException { if (failFast) { @@ -103,6 +108,11 @@ public FastDeserializerWithAvroGenericImpl(Schema writerSchema, Schema readerSch this.runtimeClassGenerationDone = runtimeClassGenerationDone; } + @Override + public void setSchema(Schema writerSchema) { + customizedDatumReader.setSchema(writerSchema); + } + @Override public V deserialize(V reuse, Decoder d, DatumReaderCustomization customization) throws IOException { if (failFast) {