Skip to content

Commit

Permalink
Add unit tests for GrpcIoRequestProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
sushantmane committed Sep 15, 2024
1 parent f1e94cf commit 8ae1141
Show file tree
Hide file tree
Showing 10 changed files with 769 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,11 @@
import static com.linkedin.venice.listener.ReadQuotaEnforcementHandler.INVALID_REQUEST_RESOURCE_MSG;
import static com.linkedin.venice.listener.ReadQuotaEnforcementHandler.SERVER_OVER_CAPACITY_MSG;

import com.google.protobuf.ByteString;
import com.linkedin.davinci.listener.response.ReadResponse;
import com.linkedin.venice.HttpConstants;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.listener.QuotaEnforcementHandler;
import com.linkedin.venice.listener.QuotaEnforcementHandler.QuotaEnforcementResult;
import com.linkedin.venice.listener.RequestStatsRecorder;
import com.linkedin.venice.listener.StorageReadRequestHandler;
import com.linkedin.venice.listener.request.RouterRequest;
import com.linkedin.venice.listener.response.AbstractReadResponse;
import com.linkedin.venice.protocols.MultiKeyResponse;
import com.linkedin.venice.protocols.SingleGetResponse;
import com.linkedin.venice.protocols.VeniceServerResponse;
import com.linkedin.venice.response.VeniceReadResponseStatus;
import io.grpc.stub.StreamObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -46,10 +36,12 @@ public class GrpcIoRequestProcessor {
private static final Logger LOGGER = LogManager.getLogger(GrpcIoRequestProcessor.class);
private final QuotaEnforcementHandler quotaEnforcementHandler;
private final StorageReadRequestHandler storageReadRequestHandler;
private final GrpcReplyProcessor replyProcessor;

public GrpcIoRequestProcessor(GrpcServiceDependencies services) {
this.quotaEnforcementHandler = services.getQuotaEnforcementHandler();
this.storageReadRequestHandler = services.getStorageReadRequestHandler();
this.replyProcessor = services.getGrpcReplyProcessor();
}

/**
Expand All @@ -74,7 +66,7 @@ public GrpcIoRequestProcessor(GrpcServiceDependencies services) {
* indicating an unknown quota enforcement result.</li>
* </ul>
*
* After determining the appropriate response, the method calls {@link #sendResponse(GrpcRequestContext)} to
* After determining the appropriate response, the method calls {@link GrpcReplyProcessor#sendResponse(GrpcRequestContext)} to
* finalize and send the response to the client.
*
* This method is executed in the gRPC executor thread, so it should not perform any blocking operations.
Expand All @@ -88,8 +80,9 @@ public void processRequest(GrpcRequestContext requestContext) {
QuotaEnforcementResult result = quotaEnforcementHandler.enforceQuota(request);
// If the request is allowed, hand it off to the storage read request handler
if (result == ALLOWED) {
GrpcStorageResponseHandlerCallback callback = GrpcStorageResponseHandlerCallback.create(requestContext);
storageReadRequestHandler.queueIoRequestForAsyncProcessing(request, callback);
storageReadRequestHandler.queueIoRequestForAsyncProcessing(
request,
GrpcStorageResponseHandlerCallback.create(requestContext, replyProcessor));
return;
}

Expand All @@ -110,167 +103,9 @@ public void processRequest(GrpcRequestContext requestContext) {
default:
requestContext.setReadResponseStatus(VeniceReadResponseStatus.INTERNAL_SERVER_ERROR);
requestContext.setErrorMessage("Unknown quota enforcement result: " + result);
LOGGER.error("Unknown quota enforcement result: {}", result);
}

sendResponse(requestContext);
}

/**
* Callers must ensure that all fields in the request context are properly set before invoking this method.
* Callers must also use the appropriate {@link GrpcRequestContext#readResponseStatus} to comply with the API contract.
*
* @param requestContext The context of the request for which a response is being sent
* @param <T> The type of the response observer
*/
public static <T> void sendResponse(GrpcRequestContext<T> requestContext) {
GrpcRequestContext.GrpcRequestType grpcRequestType = requestContext.getGrpcRequestType();
switch (grpcRequestType) {
case SINGLE_GET:
sendSingleGetResponse((GrpcRequestContext<SingleGetResponse>) requestContext);
break;
case MULTI_GET:
case COMPUTE:
sendMultiKeyResponse((GrpcRequestContext<MultiKeyResponse>) requestContext);
break;
case LEGACY:
sendVeniceServerResponse((GrpcRequestContext<VeniceServerResponse>) requestContext);
break;
default:
VeniceException veniceException = new VeniceException("Unknown response type: " + grpcRequestType);
LOGGER.error("Unknown response type: {}", grpcRequestType, veniceException);
throw veniceException;
}
}

/**
* Sends a single get response to the client and records the request statistics via {@link #reportRequestStats}.
* Since {@link io.grpc.stub.StreamObserver} is not thread-safe, synchronization is required before invoking
* {@link io.grpc.stub.StreamObserver#onNext} and {@link io.grpc.stub.StreamObserver#onCompleted}.
*
* @param requestContext The context of the gRPC request, which includes the response and stats recorder to be updated.
*/
public static void sendSingleGetResponse(GrpcRequestContext<SingleGetResponse> requestContext) {
ReadResponse readResponse = requestContext.getReadResponse();
SingleGetResponse.Builder builder = SingleGetResponse.newBuilder();
VeniceReadResponseStatus responseStatus = requestContext.getReadResponseStatus();

if (readResponse == null) {
builder.setStatusCode(requestContext.getReadResponseStatus().getCode());
builder.setErrorMessage(requestContext.getErrorMessage());
} else if (readResponse.isFound()) {
builder.setRcu(readResponse.getRCU())
.setStatusCode(responseStatus.getCode())
.setSchemaId(readResponse.getResponseSchemaIdHeader())
.setCompressionStrategy(readResponse.getCompressionStrategy().getValue())
.setContentLength(readResponse.getResponseBody().readableBytes())
.setContentType(HttpConstants.AVRO_BINARY)
.setValue(GrpcUtils.toByteString(readResponse.getResponseBody()));
} else {
builder.setStatusCode(responseStatus.getCode())
.setRcu(readResponse.getRCU())
.setErrorMessage("Key not found")
.setContentLength(0);
}

StreamObserver<SingleGetResponse> responseObserver = requestContext.getResponseObserver();
synchronized (responseObserver) {
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}

reportRequestStats(requestContext);
}

/**
* Sends a multi key response (multiGet and compute requests) to the client and records the request statistics via {@link #reportRequestStats}.
* Since {@link io.grpc.stub.StreamObserver} is not thread-safe, synchronization is required before invoking
* {@link io.grpc.stub.StreamObserver#onNext} and {@link io.grpc.stub.StreamObserver#onCompleted}.
*
* @param requestContext The context of the gRPC request, which includes the response and stats recorder to be updated.
*/
public static void sendMultiKeyResponse(GrpcRequestContext<MultiKeyResponse> requestContext) {
ReadResponse readResponse = requestContext.getReadResponse();
MultiKeyResponse.Builder builder = MultiKeyResponse.newBuilder();
VeniceReadResponseStatus responseStatus = requestContext.getReadResponseStatus();

if (readResponse == null) {
builder.setStatusCode(responseStatus.getCode());
builder.setErrorMessage(requestContext.getErrorMessage());
} else if (readResponse.isFound()) {
builder.setStatusCode(responseStatus.getCode())
.setRcu(readResponse.getRCU())
.setCompressionStrategy(readResponse.getCompressionStrategy().getValue())
.setContentLength(readResponse.getResponseBody().readableBytes())
.setContentType(HttpConstants.AVRO_BINARY)
.setValue(GrpcUtils.toByteString(readResponse.getResponseBody()));
} else {
builder.setStatusCode(responseStatus.getCode())
.setRcu(readResponse.getRCU())
.setErrorMessage("Key not found")
.setContentLength(0);
}

StreamObserver<MultiKeyResponse> responseObserver = requestContext.getResponseObserver();
synchronized (responseObserver) {
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}
reportRequestStats(requestContext);
}

/**
* Sends response (for the legacy API) to the client and records the request statistics via {@link #reportRequestStats}.
* Since {@link io.grpc.stub.StreamObserver} is not thread-safe, synchronization is required before invoking
* {@link io.grpc.stub.StreamObserver#onNext} and {@link io.grpc.stub.StreamObserver#onCompleted}.
*
* @param requestContext The context of the gRPC request, which includes the response and stats recorder to be updated.
*/
public static void sendVeniceServerResponse(GrpcRequestContext<VeniceServerResponse> requestContext) {
ReadResponse readResponse = requestContext.getReadResponse();
VeniceServerResponse.Builder builder = VeniceServerResponse.newBuilder();
VeniceReadResponseStatus responseStatus = requestContext.getReadResponseStatus();

if (readResponse == null) {
builder.setErrorCode(responseStatus.getCode());
builder.setErrorMessage(requestContext.getErrorMessage());
} else if (readResponse.isFound()) {
builder.setErrorCode(responseStatus.getCode())
.setResponseRCU(readResponse.getRCU())
.setCompressionStrategy(readResponse.getCompressionStrategy().getValue())
.setIsStreamingResponse(readResponse.isStreamingResponse())
.setSchemaId(readResponse.getResponseSchemaIdHeader())
.setData(GrpcUtils.toByteString(readResponse.getResponseBody()));
} else {
builder.setErrorCode(responseStatus.getCode()).setErrorMessage("Key not found").setData(ByteString.EMPTY);
}

StreamObserver<VeniceServerResponse> responseObserver = requestContext.getResponseObserver();
synchronized (responseObserver) {
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}

reportRequestStats(requestContext);
}

/**
* Records the request statistics based on the provided {@link GrpcRequestContext}.
* This method updates the {@link RequestStatsRecorder} with statistics from the {@link GrpcRequestContext} and {@link ReadResponse}.
* @param requestContext The context of the gRPC request, which contains the response and stats recorder to be updated.
*/
public static void reportRequestStats(GrpcRequestContext requestContext) {
ReadResponse readResponse = requestContext.getReadResponse();
RequestStatsRecorder requestStatsRecorder = requestContext.getRequestStatsRecorder();
AbstractReadResponse abstractReadResponse = (AbstractReadResponse) readResponse;
if (readResponse == null) {
requestStatsRecorder.setReadResponseStats(null).setResponseSize(0);
} else if (readResponse.isFound()) {
requestStatsRecorder.setReadResponseStats(abstractReadResponse.getReadResponseStatsRecorder())
.setResponseSize(abstractReadResponse.getResponseBody().readableBytes());
} else {
requestStatsRecorder.setReadResponseStats(abstractReadResponse.getReadResponseStatsRecorder()).setResponseSize(0);
}

RequestStatsRecorder.recordRequestCompletionStats(requestContext.getRequestStatsRecorder(), true, -1);
replyProcessor.sendResponse(requestContext);
}
}
Loading

0 comments on commit 8ae1141

Please sign in to comment.