Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance of avro codegen #535

Merged
merged 1 commit into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@
import com.squareup.javapoet.JavaFile;
import java.io.File;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
Expand Down Expand Up @@ -57,85 +55,65 @@ public void createOperations(BuilderPluginContext context) {
}

private void generateCode(OperationContext opContext) {
//mkdir any output folders that don't exist
if (!config.getOutputSpecificRecordClassesRoot().exists() && !config.getOutputSpecificRecordClassesRoot()
.mkdirs()) {
throw new IllegalStateException(
"unable to create destination folder " + config.getOutputSpecificRecordClassesRoot());
// Make sure the output folder exists
File outputFolder = config.getOutputSpecificRecordClassesRoot();
if (!outputFolder.exists() && !outputFolder.mkdirs()) {
throw new IllegalStateException("unable to create output folder " + outputFolder);
}
final Path outputDirectoryPath = outputFolder.toPath();

final AtomicInteger schemaCounter = new AtomicInteger(0);
final int schemaChunkSize = 500;
Collection<List<AvroNamedSchema>> allNamedSchemaList = opContext.getAvroSchemas().stream().flatMap(schema -> {
if (schema instanceof AvroNamedSchema) {
return Stream.of((AvroNamedSchema) schema);
} else if (AvroType.UNION.equals(schema.type())) {
return ((AvroUnionSchema) schema).getTypes()
.stream()
.map(schemaOrRef -> (AvroNamedSchema) schemaOrRef.getSchema());
} else {
return Stream.empty();
}
}).collect(Collectors.groupingBy(it -> schemaCounter.getAndIncrement() / schemaChunkSize)).values();

// Collect all named schemas.
long collectStart = System.currentTimeMillis();
Collection<AvroNamedSchema> allNamedSchemas = opContext.getAvroSchemas()
.stream()
.flatMap(schema -> {
if (schema instanceof AvroNamedSchema) {
return Stream.of((AvroNamedSchema) schema);
} else if (AvroType.UNION.equals(schema.type())) {
return ((AvroUnionSchema) schema).getTypes()
.stream()
.filter(schemaOrRef -> schemaOrRef.getSchema() instanceof AvroNamedSchema)
.map(schemaOrRef -> (AvroNamedSchema) schemaOrRef.getSchema());
} else {
return Stream.empty();
}
})
.flatMap(namedSchema -> {
// Collect inner schemas.
return Stream.concat(Stream.of(namedSchema),
SpecificRecordGeneratorUtil.getNestedInternalSchemaList(namedSchema).stream());
})
.filter(namedSchema -> {
// Skip codegen if schema is on classpath and config says to skip
return !config.shouldSkipCodegenIfSchemaOnClasspath() || !doesSchemaExistOnClasspath(namedSchema,
opContext.getLookupSchemaSet());
})
.collect(Collectors.toMap(AvroNamedSchema::getFullName, Function.identity(), (s1, s2) -> s1))
.values();
long collectEnd = System.currentTimeMillis();
LOGGER.info("Collected {} named avro schemas in {} millis", allNamedSchemas.size(), collectEnd - collectStart);

// Generate avro binding java classes.
long genStart = System.currentTimeMillis();

final SpecificRecordGenerationConfig generationConfig =
SpecificRecordGenerationConfig.getBroadCompatibilitySpecificRecordGenerationConfig(
AvroJavaStringRepresentation.fromJson(config.getStringRepresentation().toString()),
AvroJavaStringRepresentation.fromJson(config.getMethodStringRepresentation().toString()),
config.getMinAvroVersion(), config.isUtf8EncodingPutByIndexEnabled());

// Make sure the output folder exists
File outputFolder = config.getOutputSpecificRecordClassesRoot();
if (!outputFolder.exists() && !outputFolder.mkdirs()) {
throw new IllegalStateException("unable to create output folder " + outputFolder);
}
final Path outputDirectoryPath = outputFolder.toPath();
final SpecificRecordClassGenerator generator = new SpecificRecordClassGenerator();

int totalGeneratedClasses = allNamedSchemaList.parallelStream().map(allNamedSchemas -> {
HashSet<String> alreadyGeneratedSchemaNames = new HashSet<>();
List<JavaFile> generatedSpecificClasses = new ArrayList<>(allNamedSchemas.size());
for (AvroNamedSchema namedSchema : allNamedSchemas) {
try {
if (!alreadyGeneratedSchemaNames.contains(namedSchema.getFullName())) {
// skip codegen if schema is on classpath and config says to skip
if (config.shouldSkipCodegenIfSchemaOnClasspath() &&
doesSchemaExistOnClasspath(namedSchema, opContext.getLookupSchemaSet())) {
continue;
}

//top level schema
alreadyGeneratedSchemaNames.add(namedSchema.getFullName());
generatedSpecificClasses.add(generator.generateSpecificClass(namedSchema, generationConfig));

// generate internal schemas if not already present
List<AvroNamedSchema> internalSchemaList =
SpecificRecordGeneratorUtil.getNestedInternalSchemaList(namedSchema);
for (AvroNamedSchema namedInternalSchema : internalSchemaList) {
if (!alreadyGeneratedSchemaNames.contains(namedInternalSchema.getFullName())) {
// skip codegen for nested schemas if schema is on classpath and config says to skip
if (config.shouldSkipCodegenIfSchemaOnClasspath() &&
doesSchemaExistOnClasspath(namedInternalSchema, opContext.getLookupSchemaSet())) {
continue;
}

generatedSpecificClasses.add(generator.generateSpecificClass(namedInternalSchema, generationConfig));
alreadyGeneratedSchemaNames.add(namedInternalSchema.getFullName());
}
}
}
} catch (Exception e) {
throw new RuntimeException("failed to generate class for " + namedSchema.getFullName(), e);
}
List<JavaFile> generatedClasses = allNamedSchemas.parallelStream().map(namedSchema -> {
try {
// Top level schema
return generator.generateSpecificClass(namedSchema, generationConfig);
} catch (Exception e) {
throw new RuntimeException("failed to generate class for " + namedSchema.getFullName(), e);
}
writeJavaFilesToDisk(generatedSpecificClasses, outputDirectoryPath);
return generatedSpecificClasses.size();
}).reduce(0, Integer::sum);

}).collect(Collectors.toList());
long genEnd = System.currentTimeMillis();
LOGGER.info("Generated {} java source files in {} millis", totalGeneratedClasses, genEnd - genStart);
LOGGER.info("Generated {} java source files in {} millis", generatedClasses.size(), genEnd - genStart);

// Write to disk.
writeJavaFilesToDisk(generatedClasses, outputDirectoryPath);
}

private boolean doesSchemaExistOnClasspath(AvroNamedSchema schema, SchemaSet schemaSet) {
Expand All @@ -162,7 +140,7 @@ private void writeJavaFilesToDisk(Collection<JavaFile> javaFiles, Path outputFol
}).reduce(0, Integer::sum);

long writeEnd = System.currentTimeMillis();
LOGGER.info("wrote out {} generated java source files under {} in {} millis", filesWritten, outputFolderPath,
LOGGER.info("Wrote out {} generated java source files under {} in {} millis", filesWritten, outputFolderPath,
writeEnd - writeStart);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.squareup.javapoet.TypeName;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -206,7 +207,7 @@ public static List<AvroNamedSchema> getNestedInternalSchemaList(AvroNamedSchema
switch (type) {
case ENUM:
case FIXED:
return new ArrayList<>();
return Collections.emptyList();
case RECORD:
return getNestedInternalSchemaListForRecord((AvroRecordSchema) topLevelSchema);
default:
Expand Down
Loading