[spark] [hadoop] [vpj] Refactor to make spark a first class citizen of com.linkedin.venice (#1183)

* Refactor to make spark a first class citizen of com.linkedin.venice
* refactor tests in the same fashion
eldernewborn committed Sep 18, 2024
1 parent 7678a57 commit 88e0f5b
Showing 32 changed files with 85 additions and 87 deletions.
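In practice, every class previously under the com.linkedin.venice.hadoop.spark package tree now lives under the corresponding package in com.linkedin.venice.spark, with the subpackage layout otherwise unchanged; for example (taken from the diff below):

    package com.linkedin.venice.hadoop.spark.datawriter.jobs;   // before
    package com.linkedin.venice.spark.datawriter.jobs;          // after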
@@ -1,4 +1,4 @@
package com.linkedin.venice.hadoop.spark;
package com.linkedin.venice.spark;

import static org.apache.spark.sql.types.DataTypes.BinaryType;
import static org.apache.spark.sql.types.DataTypes.IntegerType;
@@ -1,4 +1,4 @@
package com.linkedin.venice.hadoop.spark.datawriter.jobs;
package com.linkedin.venice.spark.datawriter.jobs;

import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
import static com.linkedin.venice.ConfigKeys.KAFKA_PRODUCER_DELIVERY_TIMEOUT_MS;
@@ -36,17 +36,17 @@
import static com.linkedin.venice.hadoop.VenicePushJobConstants.ZSTD_COMPRESSION_LEVEL;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.ZSTD_DICTIONARY_CREATION_REQUIRED;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.ZSTD_DICTIONARY_CREATION_SUCCESS;
import static com.linkedin.venice.hadoop.spark.SparkConstants.DEFAULT_SCHEMA;
import static com.linkedin.venice.hadoop.spark.SparkConstants.DEFAULT_SCHEMA_WITH_PARTITION;
import static com.linkedin.venice.hadoop.spark.SparkConstants.DEFAULT_SPARK_CLUSTER;
import static com.linkedin.venice.hadoop.spark.SparkConstants.KEY_COLUMN_NAME;
import static com.linkedin.venice.hadoop.spark.SparkConstants.PARTITION_COLUMN_NAME;
import static com.linkedin.venice.hadoop.spark.SparkConstants.SPARK_CASE_SENSITIVE_CONFIG;
import static com.linkedin.venice.hadoop.spark.SparkConstants.SPARK_CLUSTER_CONFIG;
import static com.linkedin.venice.hadoop.spark.SparkConstants.SPARK_DATA_WRITER_CONF_PREFIX;
import static com.linkedin.venice.hadoop.spark.SparkConstants.SPARK_LEADER_CONFIG;
import static com.linkedin.venice.hadoop.spark.SparkConstants.SPARK_SESSION_CONF_PREFIX;
import static com.linkedin.venice.hadoop.spark.SparkConstants.VALUE_COLUMN_NAME;
import static com.linkedin.venice.spark.SparkConstants.DEFAULT_SCHEMA;
import static com.linkedin.venice.spark.SparkConstants.DEFAULT_SCHEMA_WITH_PARTITION;
import static com.linkedin.venice.spark.SparkConstants.DEFAULT_SPARK_CLUSTER;
import static com.linkedin.venice.spark.SparkConstants.KEY_COLUMN_NAME;
import static com.linkedin.venice.spark.SparkConstants.PARTITION_COLUMN_NAME;
import static com.linkedin.venice.spark.SparkConstants.SPARK_CASE_SENSITIVE_CONFIG;
import static com.linkedin.venice.spark.SparkConstants.SPARK_CLUSTER_CONFIG;
import static com.linkedin.venice.spark.SparkConstants.SPARK_DATA_WRITER_CONF_PREFIX;
import static com.linkedin.venice.spark.SparkConstants.SPARK_LEADER_CONFIG;
import static com.linkedin.venice.spark.SparkConstants.SPARK_SESSION_CONF_PREFIX;
import static com.linkedin.venice.spark.SparkConstants.VALUE_COLUMN_NAME;

import com.github.luben.zstd.Zstd;
import com.linkedin.venice.compression.CompressionStrategy;
@@ -55,16 +55,16 @@
import com.linkedin.venice.hadoop.PushJobSetting;
import com.linkedin.venice.hadoop.input.kafka.ttl.TTLResolutionPolicy;
import com.linkedin.venice.hadoop.jobs.DataWriterComputeJob;
import com.linkedin.venice.hadoop.spark.datawriter.partition.PartitionSorter;
import com.linkedin.venice.hadoop.spark.datawriter.partition.VeniceSparkPartitioner;
import com.linkedin.venice.hadoop.spark.datawriter.recordprocessor.SparkInputRecordProcessorFactory;
import com.linkedin.venice.hadoop.spark.datawriter.task.DataWriterAccumulators;
import com.linkedin.venice.hadoop.spark.datawriter.task.SparkDataWriterTaskTracker;
import com.linkedin.venice.hadoop.spark.datawriter.writer.SparkPartitionWriterFactory;
import com.linkedin.venice.hadoop.spark.utils.SparkPartitionUtils;
import com.linkedin.venice.hadoop.spark.utils.SparkScalaUtils;
import com.linkedin.venice.hadoop.ssl.TempFileSSLConfigurator;
import com.linkedin.venice.hadoop.task.datawriter.DataWriterTaskTracker;
import com.linkedin.venice.spark.datawriter.partition.PartitionSorter;
import com.linkedin.venice.spark.datawriter.partition.VeniceSparkPartitioner;
import com.linkedin.venice.spark.datawriter.recordprocessor.SparkInputRecordProcessorFactory;
import com.linkedin.venice.spark.datawriter.task.DataWriterAccumulators;
import com.linkedin.venice.spark.datawriter.task.SparkDataWriterTaskTracker;
import com.linkedin.venice.spark.datawriter.writer.SparkPartitionWriterFactory;
import com.linkedin.venice.spark.utils.SparkPartitionUtils;
import com.linkedin.venice.spark.utils.SparkScalaUtils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.writer.VeniceWriter;
import java.io.IOException;
@@ -1,4 +1,4 @@
package com.linkedin.venice.hadoop.spark.datawriter.jobs;
package com.linkedin.venice.spark.datawriter.jobs;

import static com.linkedin.venice.hadoop.VenicePushJobConstants.ETL_VALUE_SCHEMA_TRANSFORMATION;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.FILE_KEY_SCHEMA;
@@ -12,7 +12,7 @@
import static com.linkedin.venice.hadoop.VenicePushJobConstants.VSON_PUSH;

import com.linkedin.venice.hadoop.PushJobSetting;
import com.linkedin.venice.hadoop.spark.input.hdfs.VeniceHdfsSource;
import com.linkedin.venice.spark.input.hdfs.VeniceHdfsSource;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
@@ -1,5 +1,6 @@
package com.linkedin.venice.hadoop.spark.datawriter.partition;
package com.linkedin.venice.spark.datawriter.partition;

import com.linkedin.venice.spark.datawriter.writer.SparkPartitionWriter;
import com.linkedin.venice.utils.ArrayUtils;
import java.io.Serializable;
import java.util.Comparator;
@@ -10,7 +11,7 @@
* Sort the rows based on the key and value in ascending order using unsigned byte comparison.
* <ul>
* <li>The sorting on the key is the same as what RocksDB and Shuffle-Sort in MapReduce use.</li>
* <li>The sorting on the value is to make {@link com.linkedin.venice.hadoop.spark.datawriter.writer.SparkPartitionWriter} be able to optimize the de-duping of values.</li>
* <li>The sorting on the value is to make {@link SparkPartitionWriter} be able to optimize the de-duping of values.</li>
* </ul>
*/
public class PartitionSorter implements Comparator<Row>, Serializable {
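As the Javadoc above notes, the ordering is a lexicographic comparison of the raw key and value bytes with each byte treated as unsigned. A minimal standalone sketch of that comparison follows; it is illustrative only (the production class presumably relies on the imported ArrayUtils helper rather than re-implementing this):

    // Illustrative sketch: compare two byte arrays lexicographically, treating each
    // byte as an unsigned value in the range 0..255. Per the Javadoc above, this is
    // the same key ordering that RocksDB and MapReduce's shuffle-sort use.
    static int compareUnsigned(byte[] a, byte[] b) {
      int minLength = Math.min(a.length, b.length);
      for (int i = 0; i < minLength; i++) {
        int cmp = Integer.compare(Byte.toUnsignedInt(a[i]), Byte.toUnsignedInt(b[i]));
        if (cmp != 0) {
          return cmp;
        }
      }
      // All shared bytes are equal; the shorter array sorts first.
      return Integer.compare(a.length, b.length);
    }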
@@ -1,4 +1,4 @@
package com.linkedin.venice.hadoop.spark.datawriter.partition;
package com.linkedin.venice.spark.datawriter.partition;

import static com.linkedin.venice.hadoop.VenicePushJobConstants.PARTITION_COUNT;

@@ -1,16 +1,13 @@
package com.linkedin.venice.hadoop.spark.datawriter.recordprocessor;

import static com.linkedin.venice.hadoop.spark.SparkConstants.DEFAULT_SCHEMA;
import static com.linkedin.venice.hadoop.spark.SparkConstants.KEY_COLUMN_NAME;
import static com.linkedin.venice.hadoop.spark.SparkConstants.VALUE_COLUMN_NAME;
package com.linkedin.venice.spark.datawriter.recordprocessor;

import com.linkedin.venice.hadoop.input.recordreader.AbstractVeniceRecordReader;
import com.linkedin.venice.hadoop.input.recordreader.avro.IdentityVeniceRecordReader;
import com.linkedin.venice.hadoop.spark.datawriter.task.DataWriterAccumulators;
import com.linkedin.venice.hadoop.spark.datawriter.task.SparkDataWriterTaskTracker;
import com.linkedin.venice.hadoop.spark.engine.SparkEngineTaskConfigProvider;
import com.linkedin.venice.hadoop.task.datawriter.AbstractInputRecordProcessor;
import com.linkedin.venice.hadoop.task.datawriter.DataWriterTaskTracker;
import com.linkedin.venice.spark.SparkConstants;
import com.linkedin.venice.spark.datawriter.task.DataWriterAccumulators;
import com.linkedin.venice.spark.datawriter.task.SparkDataWriterTaskTracker;
import com.linkedin.venice.spark.engine.SparkEngineTaskConfigProvider;
import com.linkedin.venice.utils.VeniceProperties;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -36,8 +33,8 @@ public SparkInputRecordProcessor(Properties jobProperties, DataWriterAccumulator

public Iterator<Row> processRecord(Row record) {
List<Row> outputRows = new ArrayList<>();
ByteBuffer keyBB = ByteBuffer.wrap(record.getAs(KEY_COLUMN_NAME));
byte[] value = record.getAs(VALUE_COLUMN_NAME);
ByteBuffer keyBB = ByteBuffer.wrap(record.getAs(SparkConstants.KEY_COLUMN_NAME));
byte[] value = record.getAs(SparkConstants.VALUE_COLUMN_NAME);
ByteBuffer valueBB = value == null ? null : ByteBuffer.wrap(value);
super.processRecord(keyBB, valueBB, getRecordEmitter(outputRows), dataWriterTaskTracker);
return outputRows.iterator();
@@ -50,7 +47,7 @@ protected AbstractVeniceRecordReader<ByteBuffer, ByteBuffer> getRecordReader(Ven

private BiConsumer<byte[], byte[]> getRecordEmitter(List<Row> rows) {
return (key, value) -> {
rows.add(new GenericRowWithSchema(new Object[] { key, value }, DEFAULT_SCHEMA));
rows.add(new GenericRowWithSchema(new Object[] { key, value }, SparkConstants.DEFAULT_SCHEMA));
};
}
}
@@ -1,6 +1,6 @@
package com.linkedin.venice.hadoop.spark.datawriter.recordprocessor;
package com.linkedin.venice.spark.datawriter.recordprocessor;

import com.linkedin.venice.hadoop.spark.datawriter.task.DataWriterAccumulators;
import com.linkedin.venice.spark.datawriter.task.DataWriterAccumulators;
import java.util.Iterator;
import java.util.Properties;
import org.apache.spark.api.java.function.FlatMapFunction;
@@ -1,4 +1,4 @@
package com.linkedin.venice.hadoop.spark.datawriter.task;
package com.linkedin.venice.spark.datawriter.task;

import java.io.Serializable;
import org.apache.spark.SparkContext;
@@ -1,4 +1,4 @@
package com.linkedin.venice.hadoop.spark.datawriter.task;
package com.linkedin.venice.spark.datawriter.task;

import com.linkedin.venice.hadoop.task.datawriter.DataWriterTaskTracker;

@@ -1,12 +1,12 @@
package com.linkedin.venice.hadoop.spark.datawriter.writer;
package com.linkedin.venice.spark.datawriter.writer;

import static com.linkedin.venice.hadoop.spark.SparkConstants.KEY_COLUMN_NAME;
import static com.linkedin.venice.hadoop.spark.SparkConstants.VALUE_COLUMN_NAME;
import static com.linkedin.venice.spark.SparkConstants.KEY_COLUMN_NAME;
import static com.linkedin.venice.spark.SparkConstants.VALUE_COLUMN_NAME;

import com.linkedin.venice.hadoop.spark.datawriter.task.DataWriterAccumulators;
import com.linkedin.venice.hadoop.spark.datawriter.task.SparkDataWriterTaskTracker;
import com.linkedin.venice.hadoop.spark.engine.SparkEngineTaskConfigProvider;
import com.linkedin.venice.hadoop.task.datawriter.AbstractPartitionWriter;
import com.linkedin.venice.spark.datawriter.task.DataWriterAccumulators;
import com.linkedin.venice.spark.datawriter.task.SparkDataWriterTaskTracker;
import com.linkedin.venice.spark.engine.SparkEngineTaskConfigProvider;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
@@ -1,6 +1,6 @@
package com.linkedin.venice.hadoop.spark.datawriter.writer;
package com.linkedin.venice.spark.datawriter.writer;

import com.linkedin.venice.hadoop.spark.datawriter.task.DataWriterAccumulators;
import com.linkedin.venice.spark.datawriter.task.DataWriterAccumulators;
import java.util.Iterator;
import java.util.Properties;
import org.apache.spark.api.java.function.MapPartitionsFunction;
@@ -1,6 +1,6 @@
package com.linkedin.venice.hadoop.spark.engine;
package com.linkedin.venice.spark.engine;

import static com.linkedin.venice.hadoop.spark.SparkConstants.SPARK_APP_NAME_CONFIG;
import static com.linkedin.venice.spark.SparkConstants.SPARK_APP_NAME_CONFIG;

import com.linkedin.venice.hadoop.engine.EngineTaskConfigProvider;
import java.util.Properties;
@@ -1,4 +1,4 @@
package com.linkedin.venice.hadoop.spark.input;
package com.linkedin.venice.spark.input;

import com.linkedin.venice.hadoop.input.recordreader.VeniceRecordIterator;
import com.linkedin.venice.utils.Utils;
@@ -1,4 +1,4 @@
package com.linkedin.venice.hadoop.spark.input.hdfs;
package com.linkedin.venice.spark.input.hdfs;

import java.net.URI;
import org.apache.hadoop.fs.Path;
@@ -1,4 +1,4 @@
package com.linkedin.venice.hadoop.spark.input.hdfs;
package com.linkedin.venice.spark.input.hdfs;

import static com.linkedin.venice.hadoop.VenicePushJobConstants.VSON_PUSH;

@@ -8,7 +8,7 @@
import com.linkedin.venice.hadoop.input.recordreader.avro.VeniceAvroRecordReader;
import com.linkedin.venice.hadoop.input.recordreader.vson.VeniceVsonFileIterator;
import com.linkedin.venice.hadoop.input.recordreader.vson.VeniceVsonRecordReader;
import com.linkedin.venice.hadoop.spark.input.VeniceAbstractPartitionReader;
import com.linkedin.venice.spark.input.VeniceAbstractPartitionReader;
import com.linkedin.venice.utils.VeniceProperties;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
@@ -1,4 +1,4 @@
package com.linkedin.venice.hadoop.spark.input.hdfs;
package com.linkedin.venice.spark.input.hdfs;

import com.linkedin.venice.utils.VeniceProperties;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -1,10 +1,10 @@
package com.linkedin.venice.hadoop.spark.input.hdfs;
package com.linkedin.venice.spark.input.hdfs;

import static com.linkedin.venice.hadoop.VenicePushJobConstants.INPUT_PATH_PROP;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.PATH_FILTER;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.hadoop.spark.SparkConstants;
import com.linkedin.venice.spark.SparkConstants;
import com.linkedin.venice.utils.VeniceProperties;
import java.io.IOException;
import java.util.ArrayList;
@@ -1,4 +1,4 @@
package com.linkedin.venice.hadoop.spark.input.hdfs;
package com.linkedin.venice.spark.input.hdfs;

import com.linkedin.venice.utils.VeniceProperties;
import org.apache.spark.sql.connector.read.Scan;
@@ -1,6 +1,6 @@
package com.linkedin.venice.hadoop.spark.input.hdfs;
package com.linkedin.venice.spark.input.hdfs;

import static com.linkedin.venice.hadoop.spark.SparkConstants.DEFAULT_SCHEMA;
import static com.linkedin.venice.spark.SparkConstants.DEFAULT_SCHEMA;

import com.linkedin.venice.utils.VeniceProperties;
import java.util.Collections;
@@ -1,6 +1,6 @@
package com.linkedin.venice.hadoop.spark.input.hdfs;
package com.linkedin.venice.spark.input.hdfs;

import static com.linkedin.venice.hadoop.spark.SparkConstants.DEFAULT_SCHEMA;
import static com.linkedin.venice.spark.SparkConstants.DEFAULT_SCHEMA;

import com.linkedin.venice.utils.VeniceProperties;
import java.util.Map;
@@ -1,4 +1,4 @@
package com.linkedin.venice.hadoop.spark.utils;
package com.linkedin.venice.spark.utils;

import java.util.Comparator;
import org.apache.spark.Partitioner;
@@ -1,4 +1,4 @@
package com.linkedin.venice.hadoop.spark.utils;
package com.linkedin.venice.spark.utils;

import org.apache.spark.sql.types.StructType;
import scala.Int;
@@ -1,14 +1,14 @@
package com.linkedin.venice.hadoop.spark.datawriter.jobs;
package com.linkedin.venice.spark.datawriter.jobs;

import static com.linkedin.venice.ConfigKeys.KAFKA_CONFIG_PREFIX;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.DEFAULT_KEY_FIELD_PROP;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.DEFAULT_VALUE_FIELD_PROP;
import static com.linkedin.venice.hadoop.spark.SparkConstants.KEY_COLUMN_NAME;
import static com.linkedin.venice.hadoop.spark.SparkConstants.SPARK_APP_NAME_CONFIG;
import static com.linkedin.venice.hadoop.spark.SparkConstants.SPARK_DATA_WRITER_CONF_PREFIX;
import static com.linkedin.venice.hadoop.spark.SparkConstants.SPARK_SESSION_CONF_PREFIX;
import static com.linkedin.venice.hadoop.spark.SparkConstants.VALUE_COLUMN_NAME;
import static com.linkedin.venice.meta.Store.UNLIMITED_STORAGE_QUOTA;
import static com.linkedin.venice.spark.SparkConstants.KEY_COLUMN_NAME;
import static com.linkedin.venice.spark.SparkConstants.SPARK_APP_NAME_CONFIG;
import static com.linkedin.venice.spark.SparkConstants.SPARK_DATA_WRITER_CONF_PREFIX;
import static com.linkedin.venice.spark.SparkConstants.SPARK_SESSION_CONF_PREFIX;
import static com.linkedin.venice.spark.SparkConstants.VALUE_COLUMN_NAME;
import static org.apache.spark.sql.types.DataTypes.BinaryType;
import static org.apache.spark.sql.types.DataTypes.StringType;

@@ -1,4 +1,4 @@
package com.linkedin.venice.hadoop.spark.datawriter.partition;
package com.linkedin.venice.spark.datawriter.partition;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
@@ -1,11 +1,11 @@
package com.linkedin.venice.hadoop.spark.datawriter.partition;
package com.linkedin.venice.spark.datawriter.partition;

import static com.linkedin.venice.hadoop.VenicePushJobConstants.PARTITION_COUNT;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.hadoop.spark.SparkConstants;
import com.linkedin.venice.partitioner.DefaultVenicePartitioner;
import com.linkedin.venice.partitioner.VenicePartitioner;
import com.linkedin.venice.spark.SparkConstants;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.VeniceProperties;
import java.util.Properties;
@@ -1,6 +1,6 @@
package com.linkedin.venice.hadoop.spark.datawriter.task;
package com.linkedin.venice.spark.datawriter.task;

import com.linkedin.venice.hadoop.spark.SparkConstants;
import com.linkedin.venice.spark.SparkConstants;
import org.apache.spark.sql.SparkSession;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -1,6 +1,6 @@
package com.linkedin.venice.hadoop.spark.engine;
package com.linkedin.venice.spark.engine;

import com.linkedin.venice.hadoop.spark.SparkConstants;
import com.linkedin.venice.spark.SparkConstants;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
@@ -1,4 +1,4 @@
package com.linkedin.venice.hadoop.spark.input.hdfs;
package com.linkedin.venice.spark.input.hdfs;

import static com.linkedin.venice.hadoop.VenicePushJobConstants.DEFAULT_KEY_FIELD_PROP;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.DEFAULT_VALUE_FIELD_PROP;
@@ -11,8 +11,8 @@
import static com.linkedin.venice.hadoop.VenicePushJobConstants.UPDATE_SCHEMA_STRING_PROP;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.VALUE_FIELD_PROP;
import static com.linkedin.venice.hadoop.VenicePushJobConstants.VSON_PUSH;
import static com.linkedin.venice.hadoop.spark.SparkConstants.DEFAULT_SCHEMA;
import static com.linkedin.venice.hadoop.spark.input.hdfs.VeniceHdfsInputTable.INPUT_TABLE_NAME;
import static com.linkedin.venice.spark.SparkConstants.DEFAULT_SCHEMA;
import static com.linkedin.venice.spark.input.hdfs.VeniceHdfsInputTable.INPUT_TABLE_NAME;
import static com.linkedin.venice.utils.TestWriteUtils.DEFAULT_USER_DATA_VALUE_PREFIX;
import static com.linkedin.venice.utils.TestWriteUtils.NAME_RECORD_V2_SCHEMA;
import static com.linkedin.venice.utils.TestWriteUtils.STRING_TO_STRING_SCHEMA;
@@ -1,7 +1,7 @@
package com.linkedin.venice.hadoop.spark.utils;
package com.linkedin.venice.spark.utils;

import static com.linkedin.venice.hadoop.spark.SparkConstants.KEY_COLUMN_NAME;
import static com.linkedin.venice.hadoop.spark.SparkConstants.VALUE_COLUMN_NAME;
import static com.linkedin.venice.spark.SparkConstants.KEY_COLUMN_NAME;
import static com.linkedin.venice.spark.SparkConstants.VALUE_COLUMN_NAME;
import static org.apache.spark.sql.types.DataTypes.BinaryType;

import org.apache.spark.sql.types.Metadata;
@@ -70,7 +70,6 @@
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.hadoop.VenicePushJob;
import com.linkedin.venice.hadoop.spark.datawriter.jobs.DataWriterSparkJob;
import com.linkedin.venice.helix.VeniceJsonSerializer;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
@@ -93,6 +92,7 @@
import com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.ChunkedValueManifestSerializer;
import com.linkedin.venice.spark.datawriter.jobs.DataWriterSparkJob;
import com.linkedin.venice.stats.AbstractVeniceStats;
import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
import com.linkedin.venice.tehuti.MetricsUtils;