Skip to content

Commit

Permalink
[server] Refactor ReadQuotaEnforcementHandler to reduce code duplicat…
Browse files Browse the repository at this point in the history
…ion across gRPC and Netty handlers (#1154)

- Extracted common quota logic into ReadQuotaEnforcementHandler::enforceQuota
- Kept handler-specific logic for gRPC and Netty in their respective methods
- Renamed GrpcErrorCodes to VeniceReadResponseStatus for shared use across HTTP and gRPC
  • Loading branch information
sushantmane authored Sep 5, 2024
1 parent ba6ead3 commit 31a442c
Show file tree
Hide file tree
Showing 12 changed files with 420 additions and 263 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
import com.linkedin.venice.client.store.transport.TransportClientResponse;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.fastclient.GrpcClientConfig;
import com.linkedin.venice.grpc.GrpcErrorCodes;
import com.linkedin.venice.grpc.GrpcUtils;
import com.linkedin.venice.protocols.VeniceClientRequest;
import com.linkedin.venice.protocols.VeniceReadServiceGrpc;
import com.linkedin.venice.protocols.VeniceServerResponse;
import com.linkedin.venice.response.VeniceReadResponseStatus;
import com.linkedin.venice.security.SSLFactory;
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.grpc.ChannelCredentials;
Expand Down Expand Up @@ -267,7 +267,7 @@ public VeniceGrpcStreamObserver(CompletableFuture<TransportClientResponse> respo

@Override
public void onNext(VeniceServerResponse value) {
if (value.getErrorCode() != GrpcErrorCodes.OK) {
if (value.getErrorCode() != VeniceReadResponseStatus.OK) {
handleResponseError(value);
return;
}
Expand Down Expand Up @@ -310,13 +310,13 @@ void handleResponseError(VeniceServerResponse response) {
Exception exception;

switch (statusCode) {
case GrpcErrorCodes.BAD_REQUEST:
case VeniceReadResponseStatus.BAD_REQUEST:
exception = new VeniceClientHttpException(errorMessage, statusCode);
break;
case GrpcErrorCodes.TOO_MANY_REQUESTS:
case VeniceReadResponseStatus.TOO_MANY_REQUESTS:
exception = new VeniceClientRateExceededException(errorMessage);
break;
case GrpcErrorCodes.KEY_NOT_FOUND:
case VeniceReadResponseStatus.KEY_NOT_FOUND:
exception = null;
break;
default:
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ public HttpShortcutResponse(String message, HttpResponseStatus status) {
this.status = status;
}

/**
 * Creates a shortcut response that carries only a status: delegates to the
 * two-argument constructor with an empty string as the message.
 *
 * @param status the HTTP status to respond with
 */
public HttpShortcutResponse(HttpResponseStatus status) {
this("", status);
}

/**
 * @return the message text held by this shortcut response
 */
public String getMessage() {
return message;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.linkedin.venice.response;

/**
 * Response status codes for Venice read requests, shared by the HTTP and gRPC read paths.
 * <p>
 * This is a holder of {@code int} constants rather than a Java {@code enum}, so the values can be
 * used directly as {@code switch} case labels and compared against raw integer codes.
 * <ul>
 *   <li><b>Positive values</b> correspond to standard HTTP status codes and can be used directly in
 *       HTTP responses.</li>
 *   <li><b>Negative values</b> represent custom Venice-specific codes that have no HTTP equivalent.</li>
 * </ul>
 * For example, {@code 200} ({@link #OK}) indicates a successful read, while {@code -420}
 * ({@link #KEY_NOT_FOUND}) is a Venice-specific code.
 */
public final class VeniceReadResponseStatus {
  /** Venice-specific (non-HTTP) code: the requested key was not found. */
  public static final int KEY_NOT_FOUND = -420;

  /** HTTP 200: the read succeeded. */
  public static final int OK = 200;
  /** HTTP 400: the request was malformed or referenced an invalid resource. */
  public static final int BAD_REQUEST = 400;
  /** HTTP 500: the server failed while processing the request. */
  public static final int INTERNAL_ERROR = 500;
  /** HTTP 429: the request was rejected because a quota was exceeded. */
  public static final int TOO_MANY_REQUESTS = 429;
  /** HTTP 503: the server is currently unable to handle the request. */
  public static final int SERVICE_UNAVAILABLE = 503;

  /** Constants holder; not meant to be instantiated. */
  private VeniceReadResponseStatus() {
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,10 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
responseStatus = shortcutResponse.getStatus();
String message = shortcutResponse.getMessage();
if (message == null) {
message = "";
body = Unpooled.EMPTY_BUFFER;
} else {
body = Unpooled.wrappedBuffer(message.getBytes(StandardCharsets.UTF_8));
}
body = Unpooled.wrappedBuffer(message.getBytes(StandardCharsets.UTF_8));
contentType = HttpConstants.TEXT_PLAIN;
if (shortcutResponse.getStatus().equals(VeniceRequestEarlyTerminationException.getHttpResponseStatus())) {
statsHandler.setRequestTerminatedEarly();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceNoHelixResourceException;
import com.linkedin.venice.grpc.GrpcErrorCodes;
import com.linkedin.venice.helix.HelixCustomizedViewOfflinePushRepository;
import com.linkedin.venice.listener.grpc.GrpcRequestContext;
import com.linkedin.venice.listener.request.RouterRequest;
import com.linkedin.venice.listener.response.HttpShortcutResponse;
import com.linkedin.venice.meta.Instance;
Expand Down Expand Up @@ -45,8 +43,10 @@ public class ReadQuotaEnforcementHandler extends SimpleChannelInboundHandler<Rou
implements RoutingDataRepository.RoutingDataChangedListener, StoreDataChangedListener {
private static final Logger LOGGER = LogManager.getLogger(ReadQuotaEnforcementHandler.class);
private static final String SERVER_BUCKET_STATS_NAME = "venice-storage-node-token-bucket";
public static final String SERVER_OVER_CAPACITY_MSG = "Server over capacity";
public static final String INVALID_REQUEST_RESOURCE_MSG = "Invalid request resource: ";

private final ConcurrentMap<String, TokenBucket> storeVersionBuckets = new VeniceConcurrentHashMap<>();
private final TokenBucket storageNodeBucket;
private final ServerQuotaTokenBucketStats storageNodeTokenBucketStats;
private final ReadOnlyStoreRepository storeRepository;
private final String thisNodeId;
Expand All @@ -55,9 +55,11 @@ public class ReadQuotaEnforcementHandler extends SimpleChannelInboundHandler<Rou
// TODO make these configurable
private final int enforcementIntervalSeconds = 10; // TokenBucket refill interval
private final int enforcementCapacityMultiple = 5; // Token bucket capacity is refill amount times this multiplier

private HelixCustomizedViewOfflinePushRepository customizedViewRepository;
private volatile boolean initializedVolatile = false;
private boolean initialized = false;
private TokenBucket storageNodeBucket;

public ReadQuotaEnforcementHandler(
long storageNodeRcuCapacity,
Expand Down Expand Up @@ -184,152 +186,87 @@ public boolean isInitialized() {
return false;
}

@Override
public void channelRead0(ChannelHandlerContext ctx, RouterRequest request) {
public enum QuotaEnforcementResult {
ALLOWED, // request is allowed
REJECTED, // too many requests (store level quota enforcement)
OVER_CAPACITY, // server over capacity (server level quota enforcement)
BAD_REQUEST, // bad request
}

/**
* Enforce quota for a given request. This is common to both HTTP and GRPC handlers. Respective handlers will
* take actions such as retaining the request and passing it to the next handler, or sending an error response.
* @param request RouterRequest
* @return QuotaEnforcementResult
*/
public QuotaEnforcementResult enforceQuota(RouterRequest request) {
String storeName = request.getStoreName();
Store store = storeRepository.getStore(storeName);

if (checkStoreNull(ctx, request, null, false, store)) {
return;
if (store == null) {
return QuotaEnforcementResult.BAD_REQUEST;
}

if (checkInitAndQuotaEnabledToSkipQuotaEnforcement(ctx, request, store, false)) {
return;
/*
* If we haven't completed initialization or store does not have SN read quota enabled, allow all requests
*/
if (!isInitialized() || !store.isStorageNodeReadQuotaEnabled()) {
return QuotaEnforcementResult.ALLOWED;
}

int rcu = getRcu(request); // read capacity units
int readCapacityUnits = getRcu(request);

/**
* First check store bucket for capacity don't throttle retried request at store version level
/*
* First check per store version bucket for capacity; don't throttle retried request at store version level
*/
TokenBucket tokenBucket = storeVersionBuckets.get(request.getResourceName());
if (tokenBucket != null) {
if (!request.isRetryRequest() && !tokenBucket.tryConsume(rcu)
&& handleTooManyRequests(ctx, request, null, store, rcu, false)) {
// Enforce store version quota for non-retry requests.
// TODO: check if extra node capacity and can still process this request out of quota
return;
TokenBucket veniceRateLimiter = storeVersionBuckets.get(request.getResourceName());
if (veniceRateLimiter != null) {
if (!request.isRetryRequest() && !veniceRateLimiter.tryConsume(readCapacityUnits)) {
stats.recordRejected(request.getStoreName(), readCapacityUnits);
return QuotaEnforcementResult.REJECTED;
}
} else {
// If this happens it is probably due to a short-lived race condition where the resource is being accessed before
// the bucket is allocated. The request will be allowed based on node/server capacity so emit metrics accordingly.
stats.recordAllowedUnintentionally(storeName, rcu);
stats.recordAllowedUnintentionally(storeName, readCapacityUnits);
}

/**
/*
* Once we know store bucket has capacity, check node bucket for capacity;
* retried requests need to be throttled at node capacity level
*/
if (!storageNodeBucket.tryConsume(rcu)) {
if (handleServerOverCapacity(ctx, null, storeName, rcu, false))
return;
if (!storageNodeBucket.tryConsume(readCapacityUnits)) {
return QuotaEnforcementResult.OVER_CAPACITY;
}
handleEpilogue(ctx, request, storeName, rcu, false);

stats.recordAllowed(storeName, readCapacityUnits);
return QuotaEnforcementResult.ALLOWED;
}

public boolean checkStoreNull(
ChannelHandlerContext ctx,
RouterRequest request,
GrpcRequestContext grpcCtx,
boolean isGrpc,
Store store) {
if (store != null) {
return false;
}
@Override
public void channelRead0(ChannelHandlerContext ctx, RouterRequest request) {
QuotaEnforcementResult result = enforceQuota(request);

if (!isGrpc) {
if (result == QuotaEnforcementResult.BAD_REQUEST) {
ctx.writeAndFlush(
new HttpShortcutResponse(
"Invalid request resource " + request.getResourceName(),
INVALID_REQUEST_RESOURCE_MSG + request.getResourceName(),
HttpResponseStatus.BAD_REQUEST));
} else {
String error = "Invalid request resource " + request.getResourceName();
grpcCtx.setError();
grpcCtx.getVeniceServerResponseBuilder().setErrorCode(GrpcErrorCodes.BAD_REQUEST).setErrorMessage(error);
}

return true;
}

public boolean checkInitAndQuotaEnabledToSkipQuotaEnforcement(
ChannelHandlerContext ctx,
RouterRequest request,
Store store,
boolean isGrpc) {
if (!isInitialized() || !store.isStorageNodeReadQuotaEnabled()) {
// If we haven't completed initialization or store does not have SN read quota enabled, allow all requests
// Note: not recording any metrics. Lack of metrics indicates an issue with initialization
ReferenceCountUtil.retain(request);

if (!isGrpc) {
ctx.fireChannelRead(request);
}

return true;
return;
}

return false;
}

public boolean handleTooManyRequests(
ChannelHandlerContext ctx,
RouterRequest request,
GrpcRequestContext grpcCtx,
Store store,
int rcu,
boolean isGrpc) {
stats.recordRejected(request.getStoreName(), rcu);

long storeQuota = store.getReadQuotaInCU();
float thisNodeRcuPerSecond = storeVersionBuckets.get(request.getResourceName()).getAmortizedRefillPerSecond();
String errorMessage =
"Total quota for store " + request.getStoreName() + " is " + storeQuota + " RCU per second. Storage Node "
+ thisNodeId + " is allocated " + thisNodeRcuPerSecond + " RCU per second which has been exceeded.";

if (!isGrpc) {
ctx.writeAndFlush(new HttpShortcutResponse(errorMessage, HttpResponseStatus.TOO_MANY_REQUESTS));
} else {
grpcCtx.setError();
grpcCtx.getVeniceServerResponseBuilder()
.setErrorCode(GrpcErrorCodes.TOO_MANY_REQUESTS)
.setErrorMessage(errorMessage);
if (result == QuotaEnforcementResult.REJECTED) {
ctx.writeAndFlush(new HttpShortcutResponse(HttpResponseStatus.TOO_MANY_REQUESTS));
return;
}

return true;
}

public boolean handleServerOverCapacity(
ChannelHandlerContext ctx,
GrpcRequestContext grpcCtx,
String storeName,
int rcu,
boolean isGrpc) {
stats.recordRejected(storeName, rcu);

if (!isGrpc) {
ctx.writeAndFlush(new HttpShortcutResponse("Server over capacity", HttpResponseStatus.SERVICE_UNAVAILABLE));
} else {
String errorMessage = "Server over capacity";
grpcCtx.setError();
grpcCtx.getVeniceServerResponseBuilder()
.setErrorCode(GrpcErrorCodes.SERVICE_UNAVAILABLE)
.setErrorMessage(errorMessage);
if (result == QuotaEnforcementResult.OVER_CAPACITY) {
ctx.writeAndFlush(new HttpShortcutResponse(SERVER_OVER_CAPACITY_MSG, HttpResponseStatus.SERVICE_UNAVAILABLE));
return;
}

return true;
}

private void handleEpilogue(
ChannelHandlerContext ctx,
RouterRequest request,
String storeName,
int rcu,
boolean isGrpc) {
// If we reach here, the request is allowed; retain the request and pass it to the next handler
ReferenceCountUtil.retain(request);
if (!isGrpc) {
ctx.fireChannelRead(request);
}
stats.recordAllowed(storeName, rcu);
ctx.fireChannelRead(request);
}

/**
Expand Down Expand Up @@ -534,8 +471,7 @@ private boolean isLatestVersion(int version, List<String> topics) {
}

/**
* For tests
* @return
* Helper methods for unit testing
*/
protected Set<String> listTopics() {
return storeVersionBuckets.keySet();
Expand All @@ -560,15 +496,27 @@ public ReadOnlyStoreRepository getStoreRepository() {
return storeRepository;
}

public ConcurrentMap<String, TokenBucket> getStoreVersionBuckets() {
return storeVersionBuckets;
public AggServerQuotaUsageStats getStats() {
return stats;
}

public boolean storageConsumeRcu(int rcu) {
return !storageNodeBucket.tryConsume(rcu);
void setInitialized(boolean initialized) {
this.initialized = initialized;
}

public AggServerQuotaUsageStats getStats() {
return stats;
void setInitializedVolatile(boolean initializedVolatile) {
this.initializedVolatile = initializedVolatile;
}

TokenBucket getStoreVersionBucket(String storeVersion) {
return storeVersionBuckets.get(storeVersion);
}

void setStoreVersionBucket(String storeVersion, TokenBucket bucket) {
storeVersionBuckets.put(storeVersion, bucket);
}

void setStorageNodeBucket(TokenBucket bucket) {
storageNodeBucket = bucket;
}
}
Loading

0 comments on commit 31a442c

Please sign in to comment.