Skip to content

Commit

Permalink
Added a config to limit the code-gen and class loading
Browse files Browse the repository at this point in the history
It sets the maximum number of fast SerDes classes generated and
loaded. It helps to limit the metaspace and codecache usage
brought by fast-avro runtime code-gen and class loading.

The limit config can be set via FastSerdeCache constructors.
  • Loading branch information
Bingfeng Xia committed Aug 5, 2020
1 parent 86f47dd commit 2510845
Show file tree
Hide file tree
Showing 11 changed files with 221 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ public class FastDeserializerGenerator<T> extends FastDeserializerGeneratorBase<
super(useGenericTypes, writer, reader, destination, classLoader, compileClassPath);
}

/**
 * Package-private constructor variant that additionally accepts a class-loading limit.
 * Every argument is handed to the superclass constructor unchanged.
 *
 * @param loadClassLimit limit forwarded to the base class together with the other arguments
 */
FastDeserializerGenerator(
    boolean useGenericTypes,
    Schema writer,
    Schema reader,
    File destination,
    ClassLoader classLoader,
    String compileClassPath,
    int loadClassLimit) {
  super(useGenericTypes, writer, reader, destination, classLoader, compileClassPath, loadClassLimit);
}

public FastDeserializer<T> generateDeserializer() {
String className = getClassName(writer, reader, useGenericTypes ? "Generic" : "Specific");
JPackage classPackage = codeModel._package(generatedPackageName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ public abstract class FastDeserializerGeneratorBase<T> extends FastSerdeBase {
this.reader = reader;
}

/**
 * Package-private constructor variant that additionally accepts a class-loading limit.
 * Calls the {@code FastSerdeBase} constructor with the description {@code "deserialization"},
 * {@code Utf8} as the default string class and {@code isForSerializer == false}, then stores
 * the writer and reader schemas on this instance.
 *
 * @param loadClassLimit limit forwarded to the base class constructor
 */
FastDeserializerGeneratorBase(
    boolean useGenericTypes,
    Schema writer,
    Schema reader,
    File destination,
    ClassLoader classLoader,
    String compileClassPath,
    int loadClassLimit) {
  super("deserialization", useGenericTypes, Utf8.class, destination, classLoader, compileClassPath, false,
      loadClassLimit);
  this.reader = reader;
  this.writer = writer;
}

protected static Symbol[] reverseSymbolArray(Symbol[] symbols) {
Symbol[] reversedSymbols = new Symbol[symbols.length];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,9 @@ public final class FastGenericDeserializerGenerator<T> extends FastDeserializerG
String compileClassPath) {
super(true, writer, reader, destination, classLoader, compileClassPath);
}

/**
 * Package-private constructor variant that additionally accepts a class-loading limit.
 * Delegates to the superclass with {@code useGenericTypes} fixed to {@code true}.
 *
 * @param loadClassLimit limit forwarded to the superclass constructor
 */
FastGenericDeserializerGenerator(
    Schema writer,
    Schema reader,
    File destination,
    ClassLoader classLoader,
    String compileClassPath,
    int loadClassLimit) {
  super(true, writer, reader, destination, classLoader, compileClassPath, loadClassLimit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,9 @@ public FastGenericSerializerGenerator(Schema schema, File destination, ClassLoad
String compileClassPath) {
super(true, schema, destination, classLoader, compileClassPath);
}

/**
 * Constructor variant that additionally accepts a class-loading limit.
 * Delegates to the superclass with {@code useGenericTypes} fixed to {@code true}.
 *
 * @param loadClassLimit limit forwarded to the superclass constructor
 */
public FastGenericSerializerGenerator(
    Schema schema,
    File destination,
    ClassLoader classLoader,
    String compileClassPath,
    int loadClassLimit) {
  super(true, schema, destination, classLoader, compileClassPath, loadClassLimit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;
import org.apache.avro.Schema;
Expand All @@ -32,6 +33,9 @@ public abstract class FastSerdeBase {
protected static final String SEP = "_";
public static final String GENERATED_PACKAGE_NAME_PREFIX = "com.linkedin.avro.fastserde.generated.";

// Maximum number of loads loadClassWithLimit will perform on this instance. Defaults to
// Integer.MAX_VALUE and is overwritten by the constructor overload that takes loadClassLimit.
private volatile int loadClassLimit = Integer.MAX_VALUE;
// Number of loads completed through loadClassWithLimit on this instance; in the code shown here it
// is only read and incremented inside that synchronized method.
private int loadClassNum = 0;

/**
* A repository of how many times a given name was used.
* N.B.: Does not actually need to be threadsafe, but it is made so just for defensive coding reasons.
Expand All @@ -47,6 +51,12 @@ public abstract class FastSerdeBase {
protected final String compileClassPath;
protected JDefinedClass generatedClass;

/**
 * Constructor variant that additionally accepts a class-loading limit.
 * Performs the same initialisation as the seven-argument constructor and then replaces the
 * default {@code loadClassLimit} ({@code Integer.MAX_VALUE}) with the supplied value.
 *
 * @param loadClassLimit maximum number of loads {@code loadClassWithLimit} will perform on this instance
 */
public FastSerdeBase(
    String description,
    boolean useGenericTypes,
    Class defaultStringClass,
    File destination,
    ClassLoader classLoader,
    String compileClassPath,
    boolean isForSerializer,
    int loadClassLimit) {
  this(description, useGenericTypes, defaultStringClass, destination, classLoader, compileClassPath,
      isForSerializer);
  this.loadClassLimit = loadClassLimit;
}

public FastSerdeBase(String description, boolean useGenericTypes, Class defaultStringClass, File destination, ClassLoader classLoader,
String compileClassPath, boolean isForSerializer) {
this.useGenericTypes = useGenericTypes;
Expand Down Expand Up @@ -136,6 +146,29 @@ protected Class compileClass(final String className, Set<String> knownUsedFullyQ
throw new FastSerdeGeneratorException("Unable to compile:" + className + " from source file: " + filePath);
}

return classLoader.loadClass(generatedPackageName + "." + className);
return loadClassWithLimit(() -> {
try {
return classLoader.loadClass(generatedPackageName + "." + className);
} catch (ClassNotFoundException e) {
throw new FastSerdeGeneratorException("Unable to load:" + className + " from source file: " + filePath, e);
}
});
}

/**
 * Runs the given class-loading function only while this instance is still below its load limit.
 *
 * <p>The method is {@code synchronized}, so the check of {@code loadClassNum} against
 * {@code loadClassLimit} and the subsequent increment happen under this instance's monitor.
 * The counter is incremented only after the supplier returns normally.
 *
 * <p>NOTE(review): {@code loadClassNum} is an instance field, so the count is kept per
 * {@code FastSerdeBase} instance - confirm that instances live long enough for this limit to matter.
 *
 * @param supplier the function that loads the fast de/serializer class
 * @param <T> type produced by the supplier
 * @return the supplier's result, or {@code null} (after logging a warning) when the limit has been reached;
 *         callers must be prepared for {@code null}
 */
protected synchronized <T> T loadClassWithLimit(Supplier<T> supplier) {
  // Guard clause: refuse to load once the per-instance limit has been reached.
  if (this.loadClassNum >= this.loadClassLimit) {
    LOGGER.warn("Loaded fast serdes classes number {}, with limit set to {}", loadClassNum, loadClassLimit);
    return null;
  }

  final T loaded = supplier.get();
  this.loadClassNum++;
  return loaded;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,20 @@ public final class FastSerdeCache {

private static volatile FastSerdeCache _INSTANCE;

/**
 * Upper bound on the total number of fast serializer/deserializer (SerDes) classes generated and loaded
 * by fast-avro.
 *
 * <p>Fast-avro generates SerDes classes at runtime and loads them into metaspace. During serialization and
 * deserialization it also relies on JIT compilation to speed up the SerDes, and the JIT-compiled code is
 * stored in the code cache. Excessive metaspace and code cache usage can cause GC/OOM issues, so a hard
 * limit is applied.
 *
 * <p>By default the limit is {@link Integer#MAX_VALUE}. Once the limit is hit, fast-avro falls back to
 * regular Avro. The limit can be set through the {@link FastSerdeCache} constructors that take a limit.
 */
private volatile int generatedFastSerDesLimit = Integer.MAX_VALUE;
// Counter compared against generatedFastSerDesLimit; it is incremented by buildFastClassWithLimit.
private final AtomicInteger generatedSerDesNum = new AtomicInteger(0);

private final Map<String, FastDeserializer<?>> fastSpecificRecordDeserializersCache =
new FastAvroConcurrentHashMap<>();
private final Map<String, FastDeserializer<?>> fastGenericRecordDeserializersCache =
Expand Down Expand Up @@ -81,6 +95,20 @@ public FastSerdeCache(Executor executorService, Supplier<String> compileClassPat
this(executorService, compileClassPathSupplier.get());
}

/**
 * Creates a cache with a custom executor, a custom classpath supplier and a custom SerDes limit.
 * Delegates to the two-argument constructor and then overrides the default limit.
 *
 * @param executorService
 *          {@link Executor} used by serializer/deserializer compile threads
 * @param compileClassPathSupplier
 *          {@link Supplier} of the custom classpath
 * @param limit
 *          value assigned to {@link #generatedFastSerDesLimit}
 */
public FastSerdeCache(
    Executor executorService,
    Supplier<String> compileClassPathSupplier,
    int limit) {
  this(executorService, compileClassPathSupplier);
  generatedFastSerDesLimit = limit;
}

public FastSerdeCache(String compileClassPath) {
this();
this.compileClassPath = Optional.ofNullable(compileClassPath);
Expand Down Expand Up @@ -118,10 +146,24 @@ public FastSerdeCache(Executor executorService) {
this.compileClassPath = Optional.empty();
}

/**
 * Creates a cache with a custom executor and a custom SerDes limit.
 * Delegates to the single-argument {@link Executor} constructor and then overrides the default limit.
 *
 * @param executorService
 *          {@link Executor} passed on to the single-argument constructor
 * @param limit
 *          value assigned to {@link #generatedFastSerDesLimit}
 */
public FastSerdeCache(
    Executor executorService,
    int limit) {
  this(executorService);
  generatedFastSerDesLimit = limit;
}

// Private no-argument constructor: delegates to the Executor-taking constructor with a null executor.
// The (Executor) cast selects that overload explicitly, since a bare null would also match the
// String-taking constructor.
private FastSerdeCache() {
this((Executor) null);
}

/**
 * Creates a cache that differs from the no-argument construction only in its SerDes limit.
 *
 * @param limit
 *          value assigned to {@link #generatedFastSerDesLimit}
 */
public FastSerdeCache(int limit) {
  this();
  generatedFastSerDesLimit = limit;
}

/**
* Gets default {@link FastSerdeCache} instance. Default instance classpath can be customized via
* {@value #CLASSPATH} or {@value #CLASSPATH_SUPPLIER} system properties.
Expand Down Expand Up @@ -296,6 +338,25 @@ private String getSchemaKey(Schema writerSchema, Schema readerSchema) {
Utils.getSchemaFingerprint(readerSchema));
}

/**
 * Runs {@code supplier} to build a fast de/serializer only while the number of counted build requests is
 * below {@code generatedFastSerDesLimit}.
 *
 * <p>A slot is claimed atomically with {@code getAndIncrement()} before the supplier runs, and every
 * decision is made on that single snapshot. Concurrent callers therefore cannot all pass the limit check
 * on the same counter value, and exactly the caller that observes {@code counter == limit} logs the
 * "limit hit" warning. If the supplier throws, the claimed slot is released so a failed build is not
 * counted against the limit, and the exception propagates to the caller.
 *
 * @param supplier the function to build and save the fast de/serializer
 * @param <T> type produced by the supplier
 * @return the supplier's result, or {@code null} when the limit has already been reached
 */
private <T> T buildFastClassWithLimit(Supplier<T> supplier) {
  final int limit = this.generatedFastSerDesLimit;
  // Claim a slot up front; the returned value is the number of requests counted before this one.
  final int claimedSlot = this.generatedSerDesNum.getAndIncrement();

  if (claimedSlot < limit) {
    boolean built = false;
    try {
      final T result = supplier.get();
      built = true;
      return result;
    } finally {
      if (!built) {
        // The build failed: give the slot back so the failure does not consume the budget.
        this.generatedSerDesNum.decrementAndGet();
      }
    }
  }

  if (claimedSlot == limit) {
    // We still want to print the warning when the limit is hit
    LOGGER.warn("Generated fast serdes classes number hits limit {}", limit);
  } else {
    LOGGER.debug("Generated serdes number {}, with fast serdes limit set to {}", claimedSlot, limit);
  }
  return null;
}

/**
* This function will generate a fast specific deserializer, and it will throw exception if anything wrong happens.
* This function can be used to verify whether current {@link FastSerdeCache} could generate proper fast deserializer.
Expand All @@ -307,7 +368,7 @@ private String getSchemaKey(Schema writerSchema, Schema readerSchema) {
public FastDeserializer<?> buildFastSpecificDeserializer(Schema writerSchema, Schema readerSchema) {
FastSpecificDeserializerGenerator<?> generator =
new FastSpecificDeserializerGenerator<>(writerSchema, readerSchema, classesDir, classLoader,
compileClassPath.orElseGet(() -> null));
compileClassPath.orElseGet(() -> null), generatedFastSerDesLimit);
LOGGER.info("Generated class dir: {}, and generation of specific FastDeserializer is done for writer schema: "
+ "[\n{}\n] and reader schema: [\n{}\n]", classesDir, writerSchema.toString(true), readerSchema.toString(true));
return generator.generateDeserializer();
Expand All @@ -322,7 +383,10 @@ public FastDeserializer<?> buildFastSpecificDeserializer(Schema writerSchema, Sc
*/
private FastDeserializer<?> buildSpecificDeserializer(Schema writerSchema, Schema readerSchema) {
try {
return buildFastSpecificDeserializer(writerSchema, readerSchema);
FastDeserializer<?> fastSpecificDeserializer = buildFastClassWithLimit(() -> buildFastSpecificDeserializer(writerSchema, readerSchema));
if (fastSpecificDeserializer != null) {
return fastSpecificDeserializer;
}
} catch (FastDeserializerGeneratorException e) {
LOGGER.warn("Deserializer generation exception when generating specific FastDeserializer for writer schema: "
+ "[\n{}\n] and reader schema: [\n{}\n]", writerSchema.toString(true), readerSchema.toString(true), e);
Expand Down Expand Up @@ -351,7 +415,7 @@ public Object deserialize(Object reuse, Decoder d) throws IOException {
public FastDeserializer<?> buildFastGenericDeserializer(Schema writerSchema, Schema readerSchema) {
FastGenericDeserializerGenerator<?> generator =
new FastGenericDeserializerGenerator<>(writerSchema, readerSchema, classesDir, classLoader,
compileClassPath.orElseGet(() -> null));
compileClassPath.orElseGet(() -> null), generatedFastSerDesLimit);
LOGGER.info("Generated classes dir: {} and generation of generic FastDeserializer is done for writer schema: "
+ "[\n{}\n] and reader schema:[\n{}\n]", classesDir, writerSchema.toString(true), readerSchema.toString(true));
return generator.generateDeserializer();
Expand All @@ -367,7 +431,10 @@ public FastDeserializer<?> buildFastGenericDeserializer(Schema writerSchema, Sch
*/
private FastDeserializer<?> buildGenericDeserializer(Schema writerSchema, Schema readerSchema) {
try {
return buildFastGenericDeserializer(writerSchema, readerSchema);
FastDeserializer<?> fastGenericDeserializer = buildFastClassWithLimit(() -> buildFastGenericDeserializer(writerSchema, readerSchema));
if (fastGenericDeserializer != null) {
return fastGenericDeserializer;
}
} catch (FastDeserializerGeneratorException e) {
LOGGER.warn("Deserializer generation exception when generating generic FastDeserializer for writer schema: [\n"
+ writerSchema.toString(true) + "\n] and reader schema:[\n" + readerSchema.toString(true) + "\n]", e);
Expand All @@ -392,7 +459,7 @@ public FastSerializer<?> buildFastSpecificSerializer(Schema schema) {
Utils.getAvroVersionsSupportedForSerializer());
}
FastSpecificSerializerGenerator<?> generator =
new FastSpecificSerializerGenerator<>(schema, classesDir, classLoader, compileClassPath.orElseGet(() -> null));
new FastSpecificSerializerGenerator<>(schema, classesDir, classLoader, compileClassPath.orElseGet(() -> null), generatedFastSerDesLimit);
LOGGER.info("Generated classes dir: {} and generation of specific FastSerializer is done for schema: [\n{}\n]",
classesDir, schema.toString(true));
return generator.generateSerializer();
Expand All @@ -402,7 +469,10 @@ private FastSerializer<?> buildSpecificSerializer(Schema schema) {
if (Utils.isSupportedAvroVersionsForSerializer()) {
// Only build fast specific serializer for supported Avro versions.
try {
return buildFastSpecificSerializer(schema);
FastSerializer<?> fastSpecificSerializer = buildFastClassWithLimit(() -> buildFastSpecificSerializer(schema));
if (fastSpecificSerializer != null) {
return fastSpecificSerializer;
}
} catch (FastDeserializerGeneratorException e) {
LOGGER.warn("Serializer generation exception when generating specific FastSerializer for schema: [\n{}\n]",
schema.toString(true), e);
Expand All @@ -428,7 +498,7 @@ public FastSerializer<?> buildFastGenericSerializer(Schema schema) {
+ Utils.getAvroVersionsSupportedForSerializer());
}
FastGenericSerializerGenerator<?> generator =
new FastGenericSerializerGenerator<>(schema, classesDir, classLoader, compileClassPath.orElseGet(() -> null));
new FastGenericSerializerGenerator<>(schema, classesDir, classLoader, compileClassPath.orElseGet(() -> null), generatedFastSerDesLimit);
LOGGER.info("Generated classes dir: {} and generation of generic FastSerializer is done for schema: [\n{}\n]",
classesDir, schema.toString(true));
return generator.generateSerializer();
Expand All @@ -438,7 +508,10 @@ private FastSerializer<?> buildGenericSerializer(Schema schema) {
if (Utils.isSupportedAvroVersionsForSerializer()) {
// Only build fast generic serializer for supported Avro versions.
try {
return buildFastGenericSerializer(schema);
FastSerializer<?> fastGenericSerializer = buildFastClassWithLimit(() -> buildFastGenericSerializer(schema));
if (fastGenericSerializer != null) {
return fastGenericSerializer;
}
} catch (FastDeserializerGeneratorException e) {
LOGGER.warn("Serializer generation exception when generating generic FastSerializer for schema: [\n{}\n]",
schema.toString(true), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ public FastSerializerGenerator(boolean useGenericTypes, Schema schema, File dest
this.schema = schema;
}

/**
 * Constructor variant that additionally accepts a class-loading limit.
 * Calls the {@code FastSerdeBase} constructor with the description {@code "serialization"},
 * {@code CharSequence} as the default string class and {@code isForSerializer == true}, then stores
 * the schema on this instance.
 *
 * @param loadClassLimit limit forwarded to the base class constructor
 */
public FastSerializerGenerator(
    boolean useGenericTypes,
    Schema schema,
    File destination,
    ClassLoader classLoader,
    String compileClassPath,
    int loadClassLimit) {
  super("serialization", useGenericTypes, CharSequence.class, destination, classLoader, compileClassPath, true,
      loadClassLimit);
  this.schema = schema;
}

public static String getClassName(Schema schema, String description) {
Long schemaId = Math.abs(Utils.getSchemaFingerprint(schema));
String typeName = SchemaAssistant.getTypeName(schema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,9 @@ public final class FastSpecificDeserializerGenerator<T> extends FastDeserializer
String compileClassPath) {
super(false, writer, reader, destination, classLoader, compileClassPath);
}

/**
 * Package-private constructor variant that additionally accepts a class-loading limit.
 * Delegates to the superclass with {@code useGenericTypes} fixed to {@code false}.
 *
 * @param loadClassLimit limit forwarded to the superclass constructor
 */
FastSpecificDeserializerGenerator(
    Schema writer,
    Schema reader,
    File destination,
    ClassLoader classLoader,
    String compileClassPath,
    int loadClassLimit) {
  super(false, writer, reader, destination, classLoader, compileClassPath, loadClassLimit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,9 @@ public FastSpecificSerializerGenerator(Schema schema, File destination, ClassLoa
String compileClassPath) {
super(false, schema, destination, classLoader, compileClassPath);
}

/**
 * Constructor variant that additionally accepts a class-loading limit.
 * Delegates to the superclass with {@code useGenericTypes} fixed to {@code false}.
 *
 * @param loadClassLimit limit forwarded to the superclass constructor
 */
public FastSpecificSerializerGenerator(
    Schema schema,
    File destination,
    ClassLoader classLoader,
    String compileClassPath,
    int loadClassLimit) {
  super(false, schema, destination, classLoader, compileClassPath, loadClassLimit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,51 @@ public void shouldCreateGenericDatumReader() throws IOException, InterruptedExce
Assert.assertEquals(new Utf8("test"),
fastGenericDatumReader.read(null, FastSerdeTestsSupport.genericDataAsDecoder(record)).get("test"));
}

/**
 * Verifies that a {@link FastSerdeCache} constructed with a limit of 1 hands out a different kind of
 * deserializer for the second schema than for the first: the first must not be a single-method class,
 * the second must declare exactly one method.
 */
@Test(groups = {"deserializationTest"})
@SuppressWarnings("unchecked")
public void shouldNotCreateFastDeserializerDueToLimit() throws IOException, InterruptedException {
  // given: a first record schema and a cache whose generatedFastSerDesLimit is 1
  Schema firstSchema = createRecord("TestSchema1", createPrimitiveUnionFieldSchema("test", Schema.Type.STRING));
  GenericRecord firstRecord = new GenericData.Record(firstSchema);
  firstRecord.put("test", "test");

  FastSerdeCache limitedCache = new FastSerdeCache(Runnable::run, 1);
  FastGenericDatumReader<GenericRecord> datumReader = new FastGenericDatumReader<>(firstSchema, limitedCache);

  // when: the first schema is read once through the limited cache
  datumReader.read(null, FastSerdeTestsSupport.genericDataAsDecoder(firstRecord));

  // then: the deserializer for the first schema is looked up (twice, as in the sibling tests)
  FastDeserializer<GenericRecord> firstDeserializer =
      (FastDeserializer<GenericRecord>) limitedCache.getFastGenericDeserializer(firstSchema, firstSchema);

  firstDeserializer =
      (FastDeserializer<GenericRecord>) limitedCache.getFastGenericDeserializer(firstSchema, firstSchema);

  Assert.assertNotNull(firstDeserializer);
  int firstDeclaredMethodCount = firstDeserializer.getClass().getDeclaredMethods().length;
  Assert.assertNotEquals(1, firstDeclaredMethodCount);
  Assert.assertEquals(new Utf8("test"),
      datumReader.read(null, FastSerdeTestsSupport.genericDataAsDecoder(firstRecord)).get("test"));

  // given: a second schema, requested after the single allowed slot has been used
  // Expectation: only FastDeserializerWithAvroGenericImpl is returned for it
  Schema secondSchema = createRecord("TestSchema2", createPrimitiveUnionFieldSchema("test", Schema.Type.STRING));
  GenericRecord secondRecord = new GenericData.Record(secondSchema);
  secondRecord.put("test", "test");

  // when
  // NOTE(review): this read goes through the reader constructed for the first schema - confirm that is intended.
  datumReader.read(null, FastSerdeTestsSupport.genericDataAsDecoder(secondRecord));

  // then: the deserializer for the second schema is looked up (again twice)
  FastDeserializer<GenericRecord> secondDeserializer =
      (FastDeserializer<GenericRecord>) limitedCache.getFastGenericDeserializer(secondSchema, secondSchema);

  secondDeserializer =
      (FastDeserializer<GenericRecord>) limitedCache.getFastGenericDeserializer(secondSchema, secondSchema);

  Assert.assertNotNull(secondDeserializer);
  int secondDeclaredMethodCount = secondDeserializer.getClass().getDeclaredMethods().length;
  Assert.assertEquals(1, secondDeclaredMethodCount);
}
}
Loading

0 comments on commit 2510845

Please sign in to comment.