[vpj] Use native Spark readers for reading from Avro and VSON files (#1191)

Currently, we implement a custom input data source to read Avro and VSON files from HDFS. This input data source is not optimized for large files (i.e., files larger than the HDFS block size) and is not aware of block boundaries or block locations on HDFS. This PR modifies the input logic to use Spark's native Avro and SequenceFile input formats, which should be much more efficient for files larger than the HDFS block size.

Additionally, internally at LinkedIn, we have another input format that reads from a Spark table. This commit adds much of the base code needed to support arbitrary Spark DataFrames as input formats. The remaining gap is the schema validation performed before the DataWriterComputeJob.
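For orientation, here is a minimal sketch of what reading the same inputs through Spark's split-aware native readers looks like. This is not the actual VPJ wiring (that lives in the Spark data-writer job changes below); the class name, session setup, and input path are hypothetical, and the read calls mirror the pattern used in the diff.

import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

public final class NativeInputSketch {
  public static void main(String[] args) {
    // Hypothetical session and input path, for illustration only.
    SparkSession spark = SparkSession.builder().appName("native-input-sketch").getOrCreate();
    String inputPath = "hdfs:///tmp/example-input";

    // Avro: Spark's built-in "avro" data source (spark-avro) splits large files
    // along HDFS blocks and schedules tasks with block-location awareness.
    Dataset<Row> avroDf = spark.read().format("avro").load(inputPath);
    avroDf.printSchema();

    // VSON inputs are SequenceFiles of BytesWritable key/value pairs, which the
    // native sequenceFile reader handles with the same split awareness.
    JavaRDD<Tuple2<BytesWritable, BytesWritable>> vsonRdd = spark.sparkContext()
        .sequenceFile(inputPath, BytesWritable.class, BytesWritable.class)
        .toJavaRDD();
    vsonRdd.count();

    spark.stop();
  }
}

In the actual job, the existing custom VeniceHdfsSource remains the default, and the native path is gated behind the SPARK_NATIVE_INPUT_FORMAT_ENABLED property, as shown in the diff below.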
nisargthakkar committed Sep 24, 2024
1 parent 9b33502 commit 5d92c7d
Showing 10 changed files with 1,803 additions and 44 deletions.
1 change: 1 addition & 0 deletions build.gradle
@@ -85,6 +85,7 @@ ext.libraries = [
grpcServices: "io.grpc:grpc-services:${grpcVersion}",
grpcStub: "io.grpc:grpc-stub:${grpcVersion}",
hadoopCommon: "org.apache.hadoop:hadoop-common:${hadoopVersion}",
hadoopHdfs: "org.apache.hadoop:hadoop-hdfs:${hadoopVersion}",
httpAsyncClient: 'org.apache.httpcomponents:httpasyncclient:4.1.5',
httpClient5: 'org.apache.httpcomponents.client5:httpclient5:5.3',
httpCore5: 'org.apache.httpcomponents.core5:httpcore5:5.2.4',
7 changes: 7 additions & 0 deletions clients/venice-push-job/build.gradle
@@ -27,6 +27,13 @@ dependencies {
exclude group: 'javax.servlet'
}

implementation (libraries.hadoopHdfs) {
// Exclude transitive dependencies
exclude group: 'org.apache.avro'
exclude group: 'javax.servlet'
exclude group: 'com.fasterxml.jackson.core'
}

implementation (libraries.apacheSparkAvro) {
// Spark 3.1 depends on Avro 1.8.2 - which uses avro-mapred with the hadoop2 classifier. Starting from Avro 1.9
// onwards, avro-mapred is no longer published with a hadoop2 classifier, but Gradle still looks for one.

@@ -10,9 +10,7 @@
import static com.linkedin.venice.vpj.VenicePushJobConstants.TELEMETRY_MESSAGE_INTERVAL;
import static com.linkedin.venice.vpj.VenicePushJobConstants.TOPIC_PROP;
import static com.linkedin.venice.vpj.VenicePushJobConstants.VALUE_SCHEMA_ID_PROP;
import static com.linkedin.venice.vpj.VenicePushJobConstants.VSON_PUSH;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.venice.ConfigKeys;
import com.linkedin.venice.annotation.NotThreadsafe;
import com.linkedin.venice.exceptions.RecordTooLargeException;
@@ -21,17 +19,12 @@
import com.linkedin.venice.guid.GuidUtils;
import com.linkedin.venice.hadoop.InputStorageQuotaTracker;
import com.linkedin.venice.hadoop.engine.EngineTaskConfigProvider;
import com.linkedin.venice.hadoop.input.recordreader.AbstractVeniceRecordReader;
import com.linkedin.venice.hadoop.input.recordreader.avro.VeniceAvroRecordReader;
import com.linkedin.venice.hadoop.input.recordreader.vson.VeniceVsonRecordReader;
import com.linkedin.venice.hadoop.task.TaskTracker;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.partitioner.VenicePartitioner;
import com.linkedin.venice.pubsub.api.PubSubProduceResult;
import com.linkedin.venice.pubsub.api.PubSubProducerCallback;
import com.linkedin.venice.serialization.DefaultSerializer;
import com.linkedin.venice.serializer.FastSerializerDeserializerFactory;
import com.linkedin.venice.serializer.RecordDeserializer;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.PartitionUtils;
import com.linkedin.venice.utils.SystemTime;
@@ -44,7 +37,6 @@
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
import com.linkedin.venice.writer.VeniceWriterOptions;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -56,9 +48,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@@ -247,7 +236,7 @@ protected VeniceWriterMessage extract(
if (duplicateKeyPrinter == null) {
throw new VeniceException("'DuplicateKeyPrinter' is not initialized properly");
}
duplicateKeyPrinter.detectAndHandleDuplicateKeys(keyBytes, valueBytes, values, dataWriterTaskTracker);
duplicateKeyPrinter.detectAndHandleDuplicateKeys(valueBytes, values, dataWriterTaskTracker);
return new VeniceWriterMessage(
keyBytes,
valueBytes,
@@ -545,24 +534,13 @@ public static class DuplicateKeyPrinter implements AutoCloseable, Closeable {

private final boolean isDupKeyAllowed;

private final Schema keySchema;
private final RecordDeserializer<?> keyDeserializer;
private final GenericDatumWriter<Object> avroDatumWriter;
private int numOfDupKey = 0;

DuplicateKeyPrinter(VeniceProperties props) {
this.isDupKeyAllowed = props.getBoolean(ALLOW_DUPLICATE_KEY, false);

AbstractVeniceRecordReader schemaReader = props.getBoolean(VSON_PUSH, false)
? new VeniceVsonRecordReader(props)
: VeniceAvroRecordReader.fromProps(props);
this.keySchema = schemaReader.getKeySchema();
this.keyDeserializer = FastSerializerDeserializerFactory.getFastAvroGenericDeserializer(keySchema, keySchema);
this.avroDatumWriter = new GenericDatumWriter<>(keySchema);
}

protected void detectAndHandleDuplicateKeys(
byte[] keyBytes,
byte[] valueBytes,
Iterator<byte[]> values,
DataWriterTaskTracker dataWriterTaskTracker) {
@@ -579,7 +557,8 @@ protected void detectAndHandleDuplicateKeys(
identicalValuesToKeyCount++;
if (shouldPrint) {
shouldPrint = false;
LOGGER.warn(printDuplicateKey(keyBytes));
numOfDupKey++;
LOGGER.warn("There are multiple records for the same key");
}
} else {
// Distinct values map to the same key. E.g. key:[ value_1, value_2 ]
@@ -588,7 +567,8 @@
if (isDupKeyAllowed) {
if (shouldPrint) {
shouldPrint = false;
LOGGER.warn(printDuplicateKey(keyBytes));
numOfDupKey++;
LOGGER.warn("There are multiple records for the same key");
}
}
}
@@ -597,21 +577,6 @@
dataWriterTaskTracker.trackDuplicateKeyWithDistinctValue(distinctValuesToKeyCount);
}

private String printDuplicateKey(byte[] keyBytes) {
Object keyRecord = keyDeserializer.deserialize(keyBytes);
try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
Encoder jsonEncoder = AvroCompatibilityHelper.newJsonEncoder(keySchema, output, false);
avroDatumWriter.write(keyRecord, jsonEncoder);
jsonEncoder.flush();
output.flush();

numOfDupKey++;
return String.format("There are multiple records for key:\n%s", new String(output.toByteArray()));
} catch (IOException exception) {
throw new VeniceException(exception);
}
}

@Override
public void close() {
// Nothing to do

@@ -1,23 +1,40 @@
package com.linkedin.venice.spark.datawriter.jobs;

import static com.linkedin.venice.spark.SparkConstants.DEFAULT_SCHEMA;
import static com.linkedin.venice.vpj.VenicePushJobConstants.ETL_VALUE_SCHEMA_TRANSFORMATION;
import static com.linkedin.venice.vpj.VenicePushJobConstants.FILE_KEY_SCHEMA;
import static com.linkedin.venice.vpj.VenicePushJobConstants.FILE_VALUE_SCHEMA;
import static com.linkedin.venice.vpj.VenicePushJobConstants.GENERATE_PARTIAL_UPDATE_RECORD_FROM_INPUT;
import static com.linkedin.venice.vpj.VenicePushJobConstants.GLOB_FILTER_PATTERN;
import static com.linkedin.venice.vpj.VenicePushJobConstants.INPUT_PATH_PROP;
import static com.linkedin.venice.vpj.VenicePushJobConstants.KEY_FIELD_PROP;
import static com.linkedin.venice.vpj.VenicePushJobConstants.SCHEMA_STRING_PROP;
import static com.linkedin.venice.vpj.VenicePushJobConstants.SPARK_NATIVE_INPUT_FORMAT_ENABLED;
import static com.linkedin.venice.vpj.VenicePushJobConstants.UPDATE_SCHEMA_STRING_PROP;
import static com.linkedin.venice.vpj.VenicePushJobConstants.VALUE_FIELD_PROP;
import static com.linkedin.venice.vpj.VenicePushJobConstants.VSON_PUSH;

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.venice.hadoop.PushJobSetting;
import com.linkedin.venice.hadoop.input.recordreader.avro.VeniceAvroRecordReader;
import com.linkedin.venice.hadoop.input.recordreader.vson.VeniceVsonRecordReader;
import com.linkedin.venice.spark.input.hdfs.VeniceHdfsSource;
import com.linkedin.venice.spark.utils.RowToAvroConverter;
import com.linkedin.venice.utils.VeniceProperties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;


/**
@@ -29,6 +46,19 @@ protected Dataset<Row> getUserInputDataFrame() {
SparkSession sparkSession = getSparkSession();
PushJobSetting pushJobSetting = getPushJobSetting();

VeniceProperties jobProps = getJobProperties();
boolean useNativeInputFormat = jobProps.getBoolean(SPARK_NATIVE_INPUT_FORMAT_ENABLED, false);

if (!useNativeInputFormat) {
return getDataFrameFromCustomInputFormat(sparkSession, pushJobSetting);
} else if (pushJobSetting.isAvro) {
return getAvroDataFrame(sparkSession, pushJobSetting);
} else {
return getVsonDataFrame(sparkSession, pushJobSetting);
}
}

private Dataset<Row> getDataFrameFromCustomInputFormat(SparkSession sparkSession, PushJobSetting pushJobSetting) {
DataFrameReader dataFrameReader = sparkSession.read();
dataFrameReader.format(VeniceHdfsSource.class.getCanonicalName());
setInputConf(sparkSession, dataFrameReader, INPUT_PATH_PROP, new Path(pushJobSetting.inputURI).toString());
@@ -55,4 +85,53 @@ protected Dataset<Row> getUserInputDataFrame() {
}
return dataFrameReader.load();
}

private Dataset<Row> getAvroDataFrame(SparkSession sparkSession, PushJobSetting pushJobSetting) {
Dataset<Row> df =
sparkSession.read().format("avro").option("pathGlobFilter", GLOB_FILTER_PATTERN).load(pushJobSetting.inputURI);

// Transforming the input data format
df = df.map((MapFunction<Row, Row>) (record) -> {
Schema updateSchema = null;
if (pushJobSetting.generatePartialUpdateRecordFromInput) {
updateSchema = AvroCompatibilityHelper.parse(pushJobSetting.valueSchemaString);
}

GenericRecord rowRecord = RowToAvroConverter.convert(record, pushJobSetting.inputDataSchema);
VeniceAvroRecordReader recordReader = new VeniceAvroRecordReader(
pushJobSetting.inputDataSchema,
pushJobSetting.keyField,
pushJobSetting.valueField,
pushJobSetting.etlValueSchemaTransformation,
updateSchema);

AvroWrapper<IndexedRecord> recordAvroWrapper = new AvroWrapper<>(rowRecord);
final byte[] inputKeyBytes = recordReader.getKeyBytes(recordAvroWrapper, null);
final byte[] inputValueBytes = recordReader.getValueBytes(recordAvroWrapper, null);

return new GenericRowWithSchema(new Object[] { inputKeyBytes, inputValueBytes }, DEFAULT_SCHEMA);
}, RowEncoder.apply(DEFAULT_SCHEMA));

return df;
}

@Deprecated
private Dataset<Row> getVsonDataFrame(SparkSession sparkSession, PushJobSetting pushJobSetting) {
JavaRDD<Row> rdd = sparkSession.sparkContext()
.sequenceFile(pushJobSetting.inputURI, BytesWritable.class, BytesWritable.class)
.toJavaRDD()
.map(record -> {
VeniceVsonRecordReader recordReader = new VeniceVsonRecordReader(
pushJobSetting.vsonInputKeySchemaString,
pushJobSetting.vsonInputValueSchemaString,
pushJobSetting.keyField,
pushJobSetting.valueField);

final byte[] inputKeyBytes = recordReader.getKeyBytes(record._1, record._2);
final byte[] inputValueBytes = recordReader.getValueBytes(record._1, record._2);

return new GenericRowWithSchema(new Object[] { inputKeyBytes, inputValueBytes }, DEFAULT_SCHEMA);
});
return sparkSession.createDataFrame(rdd, DEFAULT_SCHEMA);
}
}