
Commit

Merge branch 'linkedin:main' into kristy_lee/650
kristyelee committed Sep 27, 2024
2 parents 53d177c + c694d96 commit d8bad05
Showing 240 changed files with 8,241 additions and 1,841 deletions.
620 changes: 583 additions & 37 deletions .github/workflows/VeniceCI-E2ETests.yml

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion build.gradle
@@ -85,6 +85,7 @@ ext.libraries = [
grpcServices: "io.grpc:grpc-services:${grpcVersion}",
grpcStub: "io.grpc:grpc-stub:${grpcVersion}",
hadoopCommon: "org.apache.hadoop:hadoop-common:${hadoopVersion}",
hadoopHdfs: "org.apache.hadoop:hadoop-hdfs:${hadoopVersion}",
httpAsyncClient: 'org.apache.httpcomponents:httpasyncclient:4.1.5',
httpClient5: 'org.apache.httpcomponents.client5:httpclient5:5.3',
httpCore5: 'org.apache.httpcomponents.core5:httpcore5:5.2.4',
@@ -125,7 +126,7 @@ ext.libraries = [
snappy: 'org.iq80.snappy:snappy:0.4',
spark: 'com.sparkjava:spark-core:2.9.4', // Spark-Java Rest framework
spotbugs: 'com.github.spotbugs:spotbugs:4.5.2',
tehuti: 'io.tehuti:tehuti:0.11.4',
tehuti: 'io.tehuti:tehuti:0.12.2',
testcontainers: 'org.testcontainers:testcontainers:1.18.0',
testng: 'org.testng:testng:6.14.3',
tomcatAnnotations: 'org.apache.tomcat:annotations-api:6.0.53',
@@ -7,6 +7,8 @@
import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_OTHER;
import static java.lang.Thread.currentThread;

import com.linkedin.davinci.blobtransfer.BlobTransferManager;
import com.linkedin.davinci.blobtransfer.BlobTransferUtil;
import com.linkedin.davinci.client.DaVinciRecordTransformer;
import com.linkedin.davinci.compression.StorageEngineBackedCompressorFactory;
import com.linkedin.davinci.config.StoreBackendConfig;
@@ -30,8 +32,6 @@
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
import com.linkedin.davinci.store.cache.backend.ObjectCacheConfig;
import com.linkedin.venice.blobtransfer.BlobTransferManager;
import com.linkedin.venice.blobtransfer.BlobTransferUtil;
import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.client.schema.StoreSchemaFetcher;
import com.linkedin.venice.client.store.ClientConfig;
@@ -293,7 +293,8 @@ public DaVinciBackend(
configLoader.getVeniceServerConfig().getDvcP2pBlobTransferServerPort(),
configLoader.getVeniceServerConfig().getDvcP2pBlobTransferClientPort(),
configLoader.getVeniceServerConfig().getRocksDBPath(),
clientConfig);
clientConfig,
storageMetadataService);
} else {
blobTransferManager = null;
}
@@ -1,4 +1,4 @@
package com.linkedin.venice.blobtransfer;
package com.linkedin.davinci.blobtransfer;

import com.linkedin.venice.annotation.Experimental;
import com.linkedin.venice.exceptions.VenicePeersNotFoundException;
@@ -0,0 +1,66 @@
package com.linkedin.davinci.blobtransfer;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.nio.ByteBuffer;


/**
 * This class holds the metadata of a partition in the blob transfer client.
 */
public class BlobTransferPartitionMetadata {
  public String topicName;
  public int partitionId;
  public ByteBuffer offsetRecord;
  public ByteBuffer storeVersionState;

  public BlobTransferPartitionMetadata() {
  }

  public BlobTransferPartitionMetadata(
      @JsonProperty("topicName") String topicName,
      @JsonProperty("partitionId") int partitionId,
      @JsonProperty("offsetRecord") ByteBuffer offsetRecord,
      @JsonProperty("storeVersionState") ByteBuffer storeVersionState) {
    this.topicName = topicName;
    this.partitionId = partitionId;
    this.offsetRecord = offsetRecord;
    this.storeVersionState = storeVersionState;
  }

  public String getTopicName() {
    return topicName;
  }

  public void setTopicName(String topicName) {
    this.topicName = topicName;
  }

  public int getPartitionId() {
    return partitionId;
  }

  public void setPartitionId(int partitionId) {
    this.partitionId = partitionId;
  }

  public ByteBuffer getOffsetRecord() {
    return offsetRecord;
  }

  public void setOffsetRecord(ByteBuffer offsetRecord) {
    this.offsetRecord = offsetRecord;
  }

  public void setStoreVersionState(ByteBuffer storeVersionState) {
    this.storeVersionState = storeVersionState;
  }

  public ByteBuffer getStoreVersionState() {
    return storeVersionState;
  }

  @Override
  public String toString() {
    return "BlobTransferPartitionMetadata {" + " topicName='" + topicName + "', partitionId=" + partitionId
        + ", offsetRecord=" + offsetRecord + ", storeVersionState=" + storeVersionState + " }";
  }
}
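
Since the constructor parameters carry @JsonProperty annotations, this metadata presumably travels as JSON. A minimal round-trip sketch (the class name, topic name, and byte payloads below are illustrative, not part of this commit; jackson-databind encodes ByteBuffer properties as base64 strings by default):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.davinci.blobtransfer.BlobTransferPartitionMetadata;
import java.nio.ByteBuffer;

public class BlobTransferMetadataRoundTrip {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    BlobTransferPartitionMetadata metadata = new BlobTransferPartitionMetadata(
        "test_store_v1",                          // hypothetical topic name
        0,
        ByteBuffer.wrap(new byte[] { 1, 2, 3 }),  // placeholder offset record bytes
        ByteBuffer.wrap(new byte[] { 4, 5, 6 })); // placeholder store version state bytes
    String json = mapper.writeValueAsString(metadata);
    BlobTransferPartitionMetadata decoded = mapper.readValue(json, BlobTransferPartitionMetadata.class);
    System.out.println(decoded); // same fields as the original
  }
}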
@@ -1,4 +1,4 @@
package com.linkedin.venice.blobtransfer;
package com.linkedin.davinci.blobtransfer;

import static com.linkedin.venice.store.rocksdb.RocksDBUtils.composePartitionDbDir;
import static com.linkedin.venice.store.rocksdb.RocksDBUtils.composeSnapshotDir;
@@ -31,4 +31,12 @@ public String getSnapshotDir() {
public String getFullResourceName() {
return Utils.getReplicaId(topicName, partition);
}

public String getTopicName() {
return topicName;
}

public int getPartition() {
return partition;
}
}
@@ -1,9 +1,12 @@
package com.linkedin.venice.blobtransfer;
package com.linkedin.davinci.blobtransfer;

import static com.linkedin.venice.client.store.ClientFactory.getTransportClient;

import com.linkedin.venice.blobtransfer.client.NettyFileTransferClient;
import com.linkedin.venice.blobtransfer.server.P2PBlobTransferService;
import com.linkedin.davinci.blobtransfer.client.NettyFileTransferClient;
import com.linkedin.davinci.blobtransfer.server.P2PBlobTransferService;
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.venice.blobtransfer.DaVinciBlobFinder;
import com.linkedin.venice.blobtransfer.ServerBlobFinder;
import com.linkedin.venice.client.store.AbstractAvroStoreClient;
import com.linkedin.venice.client.store.AvroGenericStoreClientImpl;
import com.linkedin.venice.client.store.ClientConfig;
@@ -21,27 +24,35 @@ public class BlobTransferUtil {
* @param p2pTransferPort, the port used by the P2P transfer server and client
* @param baseDir, the base directory of the underlying storage
* @param clientConfig, the client config to start up a transport client
* @param storageMetadataService, the storage metadata service
* @return the blob transfer manager
* @throws Exception
*/
public static BlobTransferManager<Void> getP2PBlobTransferManagerForDVCAndStart(
int p2pTransferPort,
String baseDir,
ClientConfig clientConfig) {
return getP2PBlobTransferManagerForDVCAndStart(p2pTransferPort, p2pTransferPort, baseDir, clientConfig);
ClientConfig clientConfig,
StorageMetadataService storageMetadataService) {
return getP2PBlobTransferManagerForDVCAndStart(
p2pTransferPort,
p2pTransferPort,
baseDir,
clientConfig,
storageMetadataService);
}

public static BlobTransferManager<Void> getP2PBlobTransferManagerForDVCAndStart(
int p2pTransferServerPort,
int p2pTransferClientPort,
String baseDir,
ClientConfig clientConfig) {
ClientConfig clientConfig,
StorageMetadataService storageMetadataService) {
try {
AbstractAvroStoreClient storeClient =
new AvroGenericStoreClientImpl<>(getTransportClient(clientConfig), false, clientConfig);
BlobTransferManager<Void> manager = new NettyP2PBlobTransferManager(
new P2PBlobTransferService(p2pTransferServerPort, baseDir),
new NettyFileTransferClient(p2pTransferClientPort, baseDir),
new P2PBlobTransferService(p2pTransferServerPort, baseDir, storageMetadataService),
new NettyFileTransferClient(p2pTransferClientPort, baseDir, storageMetadataService),
new DaVinciBlobFinder(storeClient));
manager.start();
return manager;
@@ -64,11 +75,12 @@ public static BlobTransferManager<Void> getP2PBlobTransferManagerForServerAndSta
int p2pTransferServerPort,
int p2pTransferClientPort,
String baseDir,
CompletableFuture<HelixCustomizedViewOfflinePushRepository> customizedViewFuture) {
CompletableFuture<HelixCustomizedViewOfflinePushRepository> customizedViewFuture,
StorageMetadataService storageMetadataService) {
try {
BlobTransferManager<Void> manager = new NettyP2PBlobTransferManager(
new P2PBlobTransferService(p2pTransferServerPort, baseDir),
new NettyFileTransferClient(p2pTransferClientPort, baseDir),
new P2PBlobTransferService(p2pTransferServerPort, baseDir, storageMetadataService),
new NettyFileTransferClient(p2pTransferClientPort, baseDir, storageMetadataService),
new ServerBlobFinder(customizedViewFuture));
manager.start();
return manager;
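
The javadoc above describes the parameters; for orientation, a sketch of how a Da Vinci caller might obtain a fully started manager through the single-port overload (the class name, port, and directory are hypothetical, mirroring the DaVinciBackend wiring earlier in this commit):

import com.linkedin.davinci.blobtransfer.BlobTransferManager;
import com.linkedin.davinci.blobtransfer.BlobTransferUtil;
import com.linkedin.davinci.storage.StorageMetadataService;
import com.linkedin.venice.client.store.ClientConfig;

public class DvcBlobTransferBootstrapSketch {
  static BlobTransferManager<Void> bootstrap(ClientConfig clientConfig, StorageMetadataService metadataService) {
    // Single-port overload: the same port serves the P2P server and client.
    return BlobTransferUtil.getP2PBlobTransferManagerForDVCAndStart(
        27000,                  // hypothetical P2P transfer port
        "/tmp/venice-rocksdb",  // hypothetical RocksDB base directory
        clientConfig,           // transport client config used for peer discovery
        metadataService);       // newly required: serves offset records and store version state
  }
}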
@@ -0,0 +1,27 @@
package com.linkedin.davinci.blobtransfer;

import io.netty.handler.codec.http.HttpResponse;


public class BlobTransferUtils {
public static final String BLOB_TRANSFER_STATUS = "X-Blob-Transfer-Status";
public static final String BLOB_TRANSFER_COMPLETED = "Completed";
public static final String BLOB_TRANSFER_TYPE = "X-Blob-Transfer-Type";

public enum BlobTransferType {
FILE, METADATA
}

/**
* Check if the HttpResponse message is for metadata.
* @param msg the HttpResponse message
* @return true if the message is a metadata message, false otherwise
*/
public static boolean isMetadataMessage(HttpResponse msg) {
String metadataHeader = msg.headers().get(BlobTransferUtils.BLOB_TRANSFER_TYPE);
if (metadataHeader == null) {
return false;
}
return metadataHeader.equals(BlobTransferUtils.BlobTransferType.METADATA.name());
}
}
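
To make the header contract concrete, a sketch of both sides of it using Netty's stock response classes (the wiring is assumed, not taken from this commit):

import com.linkedin.davinci.blobtransfer.BlobTransferUtils;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;

public class BlobTransferHeaderExample {
  public static void main(String[] args) {
    // Sender side: tag the response as a metadata transfer.
    HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
    response.headers().set(BlobTransferUtils.BLOB_TRANSFER_TYPE, BlobTransferUtils.BlobTransferType.METADATA.name());

    // Receiver side: classify on the header; file chunks omit it and yield false.
    System.out.println(BlobTransferUtils.isMetadataMessage(response)); // prints: true
  }
}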
@@ -1,7 +1,9 @@
package com.linkedin.venice.blobtransfer;
package com.linkedin.davinci.blobtransfer;

import com.linkedin.venice.blobtransfer.client.NettyFileTransferClient;
import com.linkedin.venice.blobtransfer.server.P2PBlobTransferService;
import com.linkedin.davinci.blobtransfer.client.NettyFileTransferClient;
import com.linkedin.davinci.blobtransfer.server.P2PBlobTransferService;
import com.linkedin.venice.blobtransfer.BlobFinder;
import com.linkedin.venice.blobtransfer.BlobPeersDiscoveryResponse;
import com.linkedin.venice.exceptions.VenicePeersNotFoundException;
import java.io.InputStream;
import java.util.List;
@@ -1,4 +1,4 @@
package com.linkedin.venice.blobtransfer;
package com.linkedin.davinci.blobtransfer;

import com.linkedin.venice.exceptions.VeniceException;
import java.util.concurrent.CompletionStage;
@@ -0,0 +1,30 @@
package com.linkedin.davinci.blobtransfer.client;

import com.linkedin.davinci.blobtransfer.BlobTransferUtils;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpResponse;


/**
 * MetadataAggregator is a custom HttpObjectAggregator that
 * only aggregates HttpResponse messages carrying metadata.
 */
public class MetadataAggregator extends HttpObjectAggregator {
public MetadataAggregator(int maxContentLength) {
super(maxContentLength);
}

@Override
public boolean acceptInboundMessage(Object msg) throws Exception {
if (msg instanceof HttpResponse) {
HttpResponse httpMessage = (HttpResponse) msg;
// only accept metadata messages to be aggregated
if (BlobTransferUtils.isMetadataMessage(httpMessage)) {
return super.acceptInboundMessage(msg);
} else {
return false;
}
}
return super.acceptInboundMessage(msg);
}
}
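
Netty's EmbeddedChannel offers a quick way to observe the filtering; a test-style sketch (the content-length cap is arbitrary):

import com.linkedin.davinci.blobtransfer.BlobTransferUtils;
import com.linkedin.davinci.blobtransfer.client.MetadataAggregator;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;

public class MetadataAggregatorSketch {
  public static void main(String[] args) {
    EmbeddedChannel channel = new EmbeddedChannel(new MetadataAggregator(1024 * 1024));

    // A response tagged as metadata is aggregated into a single FullHttpResponse.
    HttpResponse metadataResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
    metadataResponse.headers()
        .set(BlobTransferUtils.BLOB_TRANSFER_TYPE, BlobTransferUtils.BlobTransferType.METADATA.name());
    channel.writeInbound(metadataResponse, LastHttpContent.EMPTY_LAST_CONTENT);
    FullHttpResponse aggregated = channel.readInbound();

    // An untagged (file) response fails acceptInboundMessage and passes through un-aggregated.
    channel.writeInbound(new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK));
    HttpResponse passedThrough = channel.readInbound();
  }
}

Placed ahead of the file and metadata handlers in NettyFileTransferClient, this ensures the metadata handler sees one complete response body rather than a stream of HttpContent chunks.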
@@ -1,5 +1,6 @@
package com.linkedin.venice.blobtransfer.client;
package com.linkedin.davinci.blobtransfer.client;

import com.linkedin.davinci.storage.StorageMetadataService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
@@ -22,16 +23,20 @@

public class NettyFileTransferClient {
private static final Logger LOGGER = LogManager.getLogger(NettyFileTransferClient.class);
private static final int MAX_METADATA_CONTENT_LENGTH = 1024 * 1024 * 100;
EventLoopGroup workerGroup;
Bootstrap clientBootstrap;
private final String baseDir;
private final int serverPort;
private StorageMetadataService storageMetadataService;

// TODO 1: move tunable configs to a config class
// TODO 2: consider either increasing worker threads or having a dedicated thread pool to handle requests.
public NettyFileTransferClient(int serverPort, String baseDir) {
public NettyFileTransferClient(int serverPort, String baseDir, StorageMetadataService storageMetadataService) {
this.baseDir = baseDir;
this.serverPort = serverPort;
this.storageMetadataService = storageMetadataService;

clientBootstrap = new Bootstrap();
workerGroup = new NioEventLoopGroup();
clientBootstrap.group(workerGroup);
@@ -50,8 +55,13 @@ public CompletionStage<InputStream> get(String host, String storeName, int versi
// Connects to the remote host
try {
Channel ch = clientBootstrap.connect(host, serverPort).sync().channel();
// Request to get the blob file and metadata
// Attach the file handler to the pipeline
ch.pipeline().addLast(new P2PFileTransferClientHandler(baseDir, inputStream, storeName, version, partition));
// Attach the metadata handler to the pipeline
ch.pipeline()
.addLast(new MetadataAggregator(MAX_METADATA_CONTENT_LENGTH))
.addLast(new P2PFileTransferClientHandler(baseDir, inputStream, storeName, version, partition))
.addLast(new P2PMetadataTransferHandler(storageMetadataService, baseDir, storeName, version, partition));
// Send a GET request
ch.writeAndFlush(prepareRequest(storeName, version, partition));
} catch (Exception e) {
@@ -1,9 +1,10 @@
package com.linkedin.venice.blobtransfer.client;
package com.linkedin.davinci.blobtransfer.client;

import static com.linkedin.venice.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_COMPLETED;
import static com.linkedin.venice.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_STATUS;
import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_COMPLETED;
import static com.linkedin.davinci.blobtransfer.BlobTransferUtils.BLOB_TRANSFER_STATUS;

import com.linkedin.venice.blobtransfer.BlobTransferPayload;
import com.linkedin.davinci.blobtransfer.BlobTransferPayload;
import com.linkedin.davinci.blobtransfer.BlobTransferUtils;
import com.linkedin.venice.exceptions.VeniceException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
@@ -15,6 +16,7 @@
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.ReferenceCountUtil;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
@@ -63,6 +65,14 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Ex
if (!response.status().equals(HttpResponseStatus.OK)) {
throw new VeniceException("Failed to fetch file from remote peer. Response: " + response.status());
}
// redirect the message to the next handler if it's a metadata transfer
boolean isMetadataMessage = BlobTransferUtils.isMetadataMessage(response);
if (isMetadataMessage) {
ReferenceCountUtil.retain(msg);
ctx.fireChannelRead(msg);
return;
}

// Already at the end of the transfer. Close the connection and complete the future
if (response.headers().get(BLOB_TRANSFER_STATUS) != null
&& response.headers().get(BLOB_TRANSFER_STATUS).equals(BLOB_TRANSFER_COMPLETED)) {
@@ -72,11 +82,10 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Ex

// Parse the file name
this.fileName = getFileNameFromHeader(response);

if (this.fileName == null) {
throw new VeniceException("No file name specified in the response for " + payload.getFullResourceName());
}
LOGGER.info("Starting blob transfer for file: {}", fileName);
LOGGER.debug("Starting blob transfer for file: {}", fileName);
this.fileContentLength = Long.parseLong(response.headers().get(HttpHeaderNames.CONTENT_LENGTH));

// Create the directory
Expand All @@ -91,8 +100,6 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Ex
HttpContent content = (HttpContent) msg;
ByteBuf byteBuf = content.content();
if (byteBuf.readableBytes() == 0) {
// hit EMPTY_LAST_CONTENT, it indicates the end of all file transfer.
// Skip it since it's not going to be used
return;
}
// defensive check
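
One subtlety in the redirect above: the channelRead0 signature means this handler extends Netty's SimpleChannelInboundHandler, which auto-releases the message once channelRead0 returns, so the reference count must be bumped before the message is handed to P2PMetadataTransferHandler. The generic shape of that pattern, with shouldForward and process as hypothetical stand-ins:

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.HttpObject;
import io.netty.util.ReferenceCountUtil;

abstract class ForwardingHttpHandler extends SimpleChannelInboundHandler<HttpObject> {
  @Override
  protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
    if (shouldForward(msg)) {
      // The base class releases msg after this method returns,
      // so retain it to keep it alive for the downstream handler.
      ReferenceCountUtil.retain(msg);
      ctx.fireChannelRead(msg);
      return;
    }
    process(msg); // normal handling, e.g. writing file chunks to disk
  }

  abstract boolean shouldForward(HttpObject msg);

  abstract void process(HttpObject msg);
}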