Skip to content

Commit

Permalink
Always return PrimitiveFloatList, even when cold
Browse files Browse the repository at this point in the history
The PrimitiveFloatList is an API which users should expect to rely on,
so it is wrong to degrade to the more constrained List<Float> API while
Fast-Avro is still cold. This commit introduces several changes to make
the extended API reliably present whenever using Fast-Avro, regardless
of being cold or warm.

- Changed PrimitiveFloatList to an interface, in a new package called:
  com.linkedin.avro.api; since the package name migration makes this an
  incompatible change, it would be desirable for the next release to
  not increment only the patch version. Having a proper package name
  for API extension should make things cleaner in the future as we add
  other optimized APIs (e.g. PR linkedin#45).
- Renamed the old class to ByteBufferBackedPrimitiveFloatList, and made
  it implement the new interface.
- Added new several new classes to ensure that the PrimitiveFloatList
  is always returned even when Fast-Avro falls back to vanilla Avro:
  - ColdPrimitiveFloatList which is a naive implementation that simply
    implements the new API by delegating to the regular Avro functions.
    This does not provide any GC benefits, but at least maintains the
    API.
  - ColdGenericDatumReader and ColdSpecificDatumReader which extend the
    GenericDatumReader and SpecificDatumReader classes, respectively,
    from vanilla Avro.
  - ColdDatumReaderMixIn which provides a utility function to minimize
    repeated code between the two DatumReader functions.
- Significantly refactored the FastGenericDeserializerGeneratorTest so
  that it tests three permutations: vanilla, cold fast and warm fast.
  As part of doing this, several test short-comings were discovered and
  fixed. In particular, the decodeRecordSlow function had some flipped
  parameters which led to test failures on vanilla Avro, and those test
  failures were hidden by the fact that some tests ignored he provided
  permutation param and systematically tested only Fast-Avro.
  • Loading branch information
FelixGV committed May 28, 2020
1 parent 10bef2f commit 0500654
Show file tree
Hide file tree
Showing 10 changed files with 301 additions and 181 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.linkedin.avro.api;

import java.util.List;

/**
* A {@link List} implementation with additional functions to prevent boxing.
*/
public interface PrimitiveFloatList extends List<Float> {
float getPrimitive(int index);
boolean addPrimitive(float o);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.avro.fastserde;

import com.linkedin.avro.api.PrimitiveFloatList;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.AbstractList;
Expand Down Expand Up @@ -33,8 +34,8 @@
*
* TODO: Provide arrays for other primitive types.
*/
public class PrimitiveFloatList extends AbstractList<Float>
implements GenericArray<Float>, Comparable<GenericArray<Float>> {
public class ByteBufferBackedPrimitiveFloatList extends AbstractList<Float>
implements GenericArray<Float>, Comparable<GenericArray<Float>>, PrimitiveFloatList {
private static final float[] EMPTY = new float[0];
private static final int FLOAT_SIZE = Float.BYTES;
private static final Schema FLOAT_SCHEMA = Schema.create(Schema.Type.FLOAT);
Expand All @@ -44,15 +45,15 @@ public class PrimitiveFloatList extends AbstractList<Float>
private boolean isCached = false;
private CompositeByteBuffer byteBuffer;

public PrimitiveFloatList(int capacity) {
public ByteBufferBackedPrimitiveFloatList(int capacity) {
if (capacity != 0) {
elements = new float[capacity];
}
// create empty ByteBuffer if capacity != 0 ( List<Float> interface usage case)
byteBuffer = new CompositeByteBuffer(capacity != 0);
}

public PrimitiveFloatList(Collection<Float> c) {
public ByteBufferBackedPrimitiveFloatList(Collection<Float> c) {
if (c != null) {
elements = new float[c.size()];
addAll(c);
Expand All @@ -61,21 +62,21 @@ public PrimitiveFloatList(Collection<Float> c) {
}

/**
* Instantiate (or re-use) and populate a {@link PrimitiveFloatList} from a {@link org.apache.avro.io.Decoder}.
* Instantiate (or re-use) and populate a {@link ByteBufferBackedPrimitiveFloatList} from a {@link org.apache.avro.io.Decoder}.
*
* N.B.: the caller must ensure the data is of the appropriate type by calling {@link #isFloatArray(Schema)}.
*
* @param old old {@link PrimitiveFloatList} to reuse
* @param old old {@link ByteBufferBackedPrimitiveFloatList} to reuse
* @param in {@link org.apache.avro.io.Decoder} to read new list from
* @return a {@link PrimitiveFloatList} with data, possibly the old argument reused
* @return a {@link ByteBufferBackedPrimitiveFloatList} with data, possibly the old argument reused
* @throws IOException on io errors
*/
public static Object readPrimitiveFloatArray(Object old, Decoder in) throws IOException {
long length = in.readArrayStart();
long totalLength = 0;

if (length > 0) {
PrimitiveFloatList array = (PrimitiveFloatList) newPrimitiveFloatArray(old);
ByteBufferBackedPrimitiveFloatList array = (ByteBufferBackedPrimitiveFloatList) newPrimitiveFloatArray(old);
int index = 0;

do {
Expand All @@ -90,11 +91,11 @@ public static Object readPrimitiveFloatArray(Object old, Decoder in) throws IOEx
setupElements(array, (int)totalLength);
return array;
} else {
return new PrimitiveFloatList(0);
return new ByteBufferBackedPrimitiveFloatList(0);
}
}

private static void setupElements(PrimitiveFloatList list, int totalSize) {
private static void setupElements(ByteBufferBackedPrimitiveFloatList list, int totalSize) {
if (list.elements.length != 0) {
if (totalSize <= list.getCapacity()) {
// reuse the float array directly
Expand All @@ -111,7 +112,7 @@ private static void setupElements(PrimitiveFloatList list, int totalSize) {

/**
* @param expected {@link Schema} to inspect
* @return true if the {@code expected} SCHEMA is of the right type to decode as a {@link PrimitiveFloatList}
* @return true if the {@code expected} SCHEMA is of the right type to decode as a {@link ByteBufferBackedPrimitiveFloatList}
* false otherwise
*/
public static boolean isFloatArray(Schema expected) {
Expand All @@ -120,15 +121,15 @@ public static boolean isFloatArray(Schema expected) {
}

private static Object newPrimitiveFloatArray(Object old) {
if (old instanceof PrimitiveFloatList) {
PrimitiveFloatList oldFloatList = (PrimitiveFloatList) old;
if (old instanceof ByteBufferBackedPrimitiveFloatList) {
ByteBufferBackedPrimitiveFloatList oldFloatList = (ByteBufferBackedPrimitiveFloatList) old;
oldFloatList.byteBuffer.clear();
oldFloatList.isCached = false;
oldFloatList.size = 0;
return oldFloatList;
} else {
// Just a place holder, will set up the elements later.
return new PrimitiveFloatList(0);
return new ByteBufferBackedPrimitiveFloatList(0);
}
}

Expand Down Expand Up @@ -282,8 +283,8 @@ public Float peek() {
@Override
public int compareTo(GenericArray<Float> that) {
cacheFromByteBuffer();
if (that instanceof PrimitiveFloatList) {
PrimitiveFloatList thatPrimitiveList = (PrimitiveFloatList) that;
if (that instanceof ByteBufferBackedPrimitiveFloatList) {
ByteBufferBackedPrimitiveFloatList thatPrimitiveList = (ByteBufferBackedPrimitiveFloatList) that;
if (this.size == thatPrimitiveList.size) {
for (int i = 0; i < this.size; i++) {
int compare = Float.compare(this.elements[i], thatPrimitiveList.elements[i]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -589,10 +589,10 @@ private void processArray(JVar arraySchemaVar, final String name, final Schema a

final JVar arrayVar = action.getShouldRead() ? declareValueVar(name, arraySchema, parentBody) : null;
/**
* Special optimization for float array by leveraging {@link PrimitiveFloatList}.
* Special optimization for float array by leveraging {@link ByteBufferBackedPrimitiveFloatList}.
*/
if (action.getShouldRead() && arraySchema.getElementType().getType().equals(Schema.Type.FLOAT)) {
JClass primitiveFloatList = codeModel.ref(PrimitiveFloatList.class);
JClass primitiveFloatList = codeModel.ref(ByteBufferBackedPrimitiveFloatList.class);
JExpression readPrimitiveFloatArrayInvocation = primitiveFloatList.staticInvoke("readPrimitiveFloatArray").
arg(reuseSupplier.get()).arg(JExpr.direct(DECODER));
JExpression castedResult =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.linkedin.avro.fastserde;

import org.apache.avro.generic.ColdGenericDatumReader;
import org.apache.avro.generic.ColdSpecificDatumReader;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.ParameterizedType;
Expand Down Expand Up @@ -478,7 +480,7 @@ public static class FastDeserializerWithAvroSpecificImpl<V> implements FastDeser
private final SpecificDatumReader<V> datumReader;

public FastDeserializerWithAvroSpecificImpl(Schema writerSchema, Schema readerSchema) {
this.datumReader = new SpecificDatumReader<>(writerSchema, readerSchema);
this.datumReader = new ColdSpecificDatumReader<>(writerSchema, readerSchema);
}

@Override
Expand All @@ -491,7 +493,7 @@ public static class FastDeserializerWithAvroGenericImpl<V> implements FastDeseri
private final GenericDatumReader<V> datumReader;

public FastDeserializerWithAvroGenericImpl(Schema writerSchema, Schema readerSchema) {
this.datumReader = new GenericDatumReader<>(writerSchema, readerSchema);
this.datumReader = new ColdGenericDatumReader<>(writerSchema, readerSchema);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.linkedin.avro.fastserde.coldstart;

import com.linkedin.avro.api.PrimitiveFloatList;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

public class ColdPrimitiveFloatList extends GenericData.Array<Float> implements PrimitiveFloatList {
private static final Schema SCHEMA = Schema.createArray(Schema.create(Schema.Type.FLOAT));
public ColdPrimitiveFloatList(int capacity) {
super(capacity, SCHEMA);
}

@Override
public float getPrimitive(int index) {
return get(index);
}

@Override
public boolean addPrimitive(float o) {
return add(o);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.apache.avro.generic;

import com.linkedin.avro.fastserde.coldstart.ColdPrimitiveFloatList;
import org.apache.avro.Schema;


/**
* An interface with default implementation in order to defeat the lack of multiple inheritance.
*/
public interface ColdDatumReaderMixIn {
default Object newArray(Object old, int size, Schema schema, NewArrayFunction fallBackFunction) {
switch (schema.getElementType().getType()) {
case FLOAT:
if (null == old || !(old instanceof ColdPrimitiveFloatList)) {
old = new ColdPrimitiveFloatList(size);
}
return old;
// TODO: Add more cases when we support more primitive array types
default:
return fallBackFunction.newArray(old, size, schema);
}
}

interface NewArrayFunction {
Object newArray(Object old, int size, Schema schema);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.apache.avro.generic;

import org.apache.avro.Schema;


/**
* A light-weight extension of {@link GenericDatumReader} which merely ensures that the types of the
* extended API are always returned.
*
* This class needs to be in the org.apache.avro.generic package in order to access protected methods.
*/
public class ColdGenericDatumReader<T> extends GenericDatumReader<T> implements ColdDatumReaderMixIn {
public ColdGenericDatumReader(Schema writerSchema, Schema readerSchema) {
super(writerSchema, readerSchema);
}

@Override
protected Object newArray(Object old, int size, Schema schema) {
return newArray(old, size, schema, super::newArray);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.apache.avro.generic;

import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificDatumReader;


/**
* A light-weight extension of {@link GenericDatumReader} which merely ensures that the types of
* the extended API are always returned.
*
* This class needs to be in the org.apache.avro.generic package in order to access protected methods.
*/
public class ColdSpecificDatumReader<T> extends SpecificDatumReader<T> implements ColdDatumReaderMixIn {
public ColdSpecificDatumReader(Schema writerSchema, Schema readerSchema) {
super(writerSchema, readerSchema);
}

@Override
protected Object newArray(Object old, int size, Schema schema) {
return newArray(old, size, schema, super::newArray);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void testPrimitiveFloatListAddPrimitive() {
long startTime = System.currentTimeMillis();

for (int i = 0; i < iteration; i++) {
PrimitiveFloatList list = new PrimitiveFloatList(array_size);
ByteBufferBackedPrimitiveFloatList list = new ByteBufferBackedPrimitiveFloatList(array_size);

for (int l = 0; l < array_size; l++) {
list.addPrimitive((float) l);
Expand Down
Loading

0 comments on commit 0500654

Please sign in to comment.