[server] Make rate limiter configurable in quota enforcement handler (#1160)

- Add configuration to allow swapping between different rate limiter types (a sketch of the new server properties follows this list).
- Make the quota enforcement interval and capacity (used in TokenBucket) configurable.
- Update TokenBucket to support refreshing at millisecond intervals.
- Miscellaneous cleanups to keep the code style consistent.
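For context, here is a minimal sketch of how these new knobs could be supplied when building the server configuration. The property keys and default values are taken from the diff below; the class name, helper method, and the non-default interval are purely illustrative, and only the TOKEN_BUCKET_INCREMENTAL_REFILL limiter type is used because no other RateLimiterType constant is named in the files shown on this page.

import com.linkedin.venice.utils.VeniceProperties;
import java.util.Properties;

// Hypothetical helper; not part of this commit.
public class QuotaEnforcementConfigSketch {
  static VeniceProperties buildServerProperties() {
    Properties props = new Properties();
    // Rate limiter implementation for store-version-level read quota (default shown in this diff).
    props.setProperty("server.store.version.qps.rate.limiter", "TOKEN_BUCKET_INCREMENTAL_REFILL");
    // Rate limiter implementation for storage-node-level read quota.
    props.setProperty("server.storage.node.rate.limiter", "TOKEN_BUCKET_INCREMENTAL_REFILL");
    // Quota enforcement/refill interval in milliseconds (defaults to 10_000 in this diff).
    props.setProperty("server.quota.enforcement.interval.in.millis", "1000");
    // Token bucket capacity expressed as a multiple of the per-interval refill (defaults to 5).
    props.setProperty("server.quota.enforcement.capacity.multiple", "5");
    return new VeniceProperties(props);
  }
}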
sushantmane authored Sep 10, 2024
1 parent a782eee commit d4bf2ba
Showing 30 changed files with 625 additions and 416 deletions.
@@ -118,7 +118,9 @@
import static com.linkedin.venice.ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_PUBSUB_CONSUMER_POLL_RETRY_BACKOFF_MS;
import static com.linkedin.venice.ConfigKeys.SERVER_PUBSUB_CONSUMER_POLL_RETRY_TIMES;
import static com.linkedin.venice.ConfigKeys.SERVER_QUOTA_ENFORCEMENT_CAPACITY_MULTIPLE;
import static com.linkedin.venice.ConfigKeys.SERVER_QUOTA_ENFORCEMENT_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_QUOTA_ENFORCEMENT_INTERVAL_IN_MILLIS;
import static com.linkedin.venice.ConfigKeys.SERVER_RECORD_LEVEL_METRICS_WHEN_BOOTSTRAPPING_CURRENT_VERSION_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_REMOTE_CONSUMER_CONFIG_PREFIX;
import static com.linkedin.venice.ConfigKeys.SERVER_REMOTE_INGESTION_REPAIR_SLEEP_INTERVAL_SECONDS;
@@ -168,12 +170,14 @@
import com.linkedin.davinci.kafka.consumer.RemoteIngestionRepairService;
import com.linkedin.davinci.store.rocksdb.RocksDBServerConfig;
import com.linkedin.davinci.validation.KafkaDataIntegrityValidator;
import com.linkedin.venice.ConfigKeys;
import com.linkedin.venice.authorization.DefaultIdentityParser;
import com.linkedin.venice.exceptions.ConfigurationException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.IngestionMode;
import com.linkedin.venice.pubsub.PubSubClientsFactory;
import com.linkedin.venice.pubsub.adapter.kafka.admin.ApacheKafkaAdminAdapter;
import com.linkedin.venice.throttle.VeniceRateLimiter;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
@@ -295,6 +299,26 @@ public class VeniceServerConfig extends VeniceClusterConfig {
*/
private final int maxRequestSize;

/**
* Rate limiter type for store-version-level QPS enforcement.
*/
private VeniceRateLimiter.RateLimiterType storeVersionQpsRateLimiterType;

/**
* Rate limiter type for storage node.
*/
private VeniceRateLimiter.RateLimiterType storageNodeRateLimiterType;

/**
* Server quota enforcement interval in milliseconds.
*/
private final int quotaEnforcementIntervalInMs;

/**
* Server quota enforcement capacity multiple.
*/
private final int quotaEnforcementCapacityMultiple;

/**
* Time interval for offset check of topic in Hybrid Store lag measurement.
*/
@@ -570,6 +594,16 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
serverComputeThreadNum = serverProperties.getInt(SERVER_COMPUTE_THREAD_NUM, 16);
nettyIdleTimeInSeconds = serverProperties.getInt(SERVER_NETTY_IDLE_TIME_SECONDS, (int) TimeUnit.HOURS.toSeconds(3));
maxRequestSize = (int) serverProperties.getSizeInBytes(SERVER_MAX_REQUEST_SIZE, 256 * 1024);
storeVersionQpsRateLimiterType = extractRateLimiterType(
serverProperties.getString(
ConfigKeys.SERVER_STORE_VERSION_QPS_RATE_LIMITER,
VeniceRateLimiter.RateLimiterType.TOKEN_BUCKET_INCREMENTAL_REFILL.name()));
storageNodeRateLimiterType = extractRateLimiterType(
serverProperties.getString(
ConfigKeys.SERVER_STORAGE_NODE_RATE_LIMITER,
VeniceRateLimiter.RateLimiterType.TOKEN_BUCKET_INCREMENTAL_REFILL.name()));
quotaEnforcementIntervalInMs = serverProperties.getInt(SERVER_QUOTA_ENFORCEMENT_INTERVAL_IN_MILLIS, 10_000);
quotaEnforcementCapacityMultiple = serverProperties.getInt(SERVER_QUOTA_ENFORCEMENT_CAPACITY_MULTIPLE, 5);
topicOffsetCheckIntervalMs =
serverProperties.getInt(SERVER_SOURCE_TOPIC_OFFSET_CHECK_INTERVAL_MS, (int) TimeUnit.SECONDS.toMillis(60));
this.topicManagerMetadataFetcherConsumerPoolSize = serverProperties.getInt(
@@ -939,6 +973,14 @@ long extractIngestionMemoryLimit(
return extractedMemoryLimit;
}

private VeniceRateLimiter.RateLimiterType extractRateLimiterType(String rateLimiterTypeStr) {
try {
return VeniceRateLimiter.RateLimiterType.valueOf(rateLimiterTypeStr);
} catch (IllegalArgumentException e) {
throw new VeniceException("Invalid rate limiter type: " + rateLimiterTypeStr, e);
}
}

public int getListenerPort() {
return listenerPort;
}
@@ -1547,4 +1589,20 @@ public int getNonCurrentVersionNonAAWCLeaderQuotaRecordsPerSecond() {
public int getChannelOptionWriteBufferHighBytes() {
return channelOptionWriteBufferHighBytes;
}

public VeniceRateLimiter.RateLimiterType getStoreVersionQpsRateLimiterType() {
return storeVersionQpsRateLimiterType;
}

public VeniceRateLimiter.RateLimiterType getStorageNodeRateLimiterType() {
return storageNodeRateLimiterType;
}

public int getQuotaEnforcementIntervalInMs() {
return quotaEnforcementIntervalInMs;
}

public int getQuotaEnforcementCapacityMultiple() {
return quotaEnforcementCapacityMultiple;
}
}
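The commit message also says TokenBucket now refreshes at millisecond granularity, but the TokenBucket changes themselves are not among the hunks rendered on this page. The snippet below is therefore only one plausible reading of how the two new knobs could size a bucket for a store version's read quota, not the actual implementation:

// Hypothetical sizing math; the real TokenBucket wiring is in files not shown here.
long rcuPerSecond = 1_000;        // per-second read quota assigned to a store version
long intervalMs = 10_000;         // server.quota.enforcement.interval.in.millis (default)
long capacityMultiple = 5;        // server.quota.enforcement.capacity.multiple (default)

long refillPerInterval = rcuPerSecond * intervalMs / 1000;   // 10,000 tokens added per interval
long bucketCapacity = refillPerInterval * capacityMultiple;  // 50,000 tokens of burst headroom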
@@ -5,7 +5,8 @@
import static com.linkedin.davinci.kafka.consumer.ConsumerActionType.RESET_OFFSET;
import static com.linkedin.davinci.kafka.consumer.ConsumerActionType.SUBSCRIBE;
import static com.linkedin.davinci.kafka.consumer.ConsumerActionType.UNSUBSCRIBE;
import static com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType.*;
import static com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType.LEADER;
import static com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType.STANDBY;
import static com.linkedin.davinci.validation.KafkaDataIntegrityValidator.DISABLED;
import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
import static com.linkedin.venice.LogMessages.KILLED_JOB_MESSAGE;
@@ -1,6 +1,7 @@
package com.linkedin.venice;

import static org.mockito.ArgumentMatchers.*;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doCallRealMethod;
@@ -528,6 +528,27 @@ private ConfigKeys() {
*/
public static final String SERVER_NODE_CAPACITY_RCU = "server.node.capacity.rcu.per.second";

/**
* Rate limiter for store version level read quota enforcement.
*/
public static final String SERVER_STORE_VERSION_QPS_RATE_LIMITER = "server.store.version.qps.rate.limiter";

/**
* Rate limiter for storage node level read quota enforcement.
*/
public static final String SERVER_STORAGE_NODE_RATE_LIMITER = "server.storage.node.rate.limiter";

/**
* Server quota enforcement interval in milliseconds.
*/
public static final String SERVER_QUOTA_ENFORCEMENT_INTERVAL_IN_MILLIS =
"server.quota.enforcement.interval.in.millis";

/**
* Server quota enforcement capacity multiple.
*/
public static final String SERVER_QUOTA_ENFORCEMENT_CAPACITY_MULTIPLE = "server.quota.enforcement.capacity.multiple";

/**
* This config is used to control the maximum records returned by every poll request.
* So far, Store Ingestion is throttling per poll, so if the configured value is too big,
@@ -1,7 +1,8 @@
package com.linkedin.venice.acl.handler;

import static com.linkedin.venice.grpc.GrpcUtils.*;
import static com.linkedin.venice.listener.ServerHandlerUtils.*;
import static com.linkedin.venice.grpc.GrpcUtils.extractGrpcClientCert;
import static com.linkedin.venice.grpc.GrpcUtils.httpResponseStatusToGrpcStatus;
import static com.linkedin.venice.listener.ServerHandlerUtils.extractClientCert;

import com.google.common.annotations.VisibleForTesting;
import com.linkedin.venice.acl.AclCreationDeletionListener;
@@ -1,7 +1,7 @@
package com.linkedin.venice.helix;

import static com.linkedin.venice.helix.ResourceAssignment.ResourceAssignmentChanges;
import static com.linkedin.venice.meta.Store.*;
import static com.linkedin.venice.meta.Store.NON_EXISTING_VERSION;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.COMPLETED;

import com.linkedin.venice.exceptions.VeniceException;
@@ -1,10 +1,10 @@
package com.linkedin.venice.pushmonitor;

import static com.linkedin.venice.pushmonitor.ExecutionStatus.NOT_CREATED;
import static com.linkedin.venice.utils.Utils.*;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.linkedin.venice.utils.Utils;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -90,7 +90,7 @@ public List<StatusSnapshot> getReplicaHistoricStatusList(String instanceId) {
public boolean hasFatalDataValidationError() {
for (ReplicaStatus replicaStatus: replicaStatusMap.values()) {
if (ExecutionStatus.isError(replicaStatus.getCurrentStatus()) && replicaStatus.getIncrementalPushVersion() != null
&& replicaStatus.getIncrementalPushVersion().contains(FATAL_DATA_VALIDATION_ERROR)) {
&& replicaStatus.getIncrementalPushVersion().contains(Utils.FATAL_DATA_VALIDATION_ERROR)) {
return true;
}
}
@@ -30,7 +30,7 @@
* This is a generalized IoThrottler as it existed before, which can be used to
* throttle Bytes read or written, number of entries scanned, etc.
*/
public class EventThrottler {
public class EventThrottler implements VeniceRateLimiter {
private static final Logger LOGGER = LogManager.getLogger(EventThrottler.class);
private static final long DEFAULT_CHECK_INTERVAL_MS = TimeUnit.SECONDS.toMillis(30);
private static final String THROTTLER_NAME = "event-throttler";
@@ -51,6 +51,9 @@ public class EventThrottler {
private MetricConfig rateConfig = null;
private final EventThrottlingStrategy throttlingStrategy;

// Used only to check whether a newly requested quota differs from the existing quota.
private long quota = -1;

/**
* @param maxRatePerSecond Maximum rate that this throttler should allow (-1 is unlimited)
*/
@@ -279,4 +282,36 @@ protected long getConfiguredMaxRatePerSecond() {
protected boolean isCheckQuotaBeforeRecording() {
return checkQuotaBeforeRecording;
}

@Override
public boolean tryAcquirePermit(int units) {
if (getMaxRatePerSecond() < 0) {
return true;
}
long now = time.milliseconds();
try {
rateSensor.record(units, now);
return true;
} catch (QuotaViolationException e) {
return false;
}
}

@Override
public void setQuota(long quota) {
this.quota = quota;
}

@Override
public long getQuota() {
return quota;
}

@Override
public String toString() {
return "EventThrottler{" + "maxRatePerSecondProvider=" + maxRatePerSecondProvider + ", enforcementIntervalMs="
+ enforcementIntervalMs + ", throttlerName='" + throttlerName + "', checkQuotaBeforeRecording="
+ checkQuotaBeforeRecording + ", configuredMaxRatePerSecond=" + configuredMaxRatePerSecond + ", time=" + time
+ ", throttlingStrategy=" + throttlingStrategy + '}';
}
}
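The VeniceRateLimiter interface itself is not among the hunks rendered on this page. Judging from the methods that EventThrottler (above) and GuavaRateLimiter (below) override, and from the VeniceRateLimiter.RateLimiterType references in VeniceServerConfig, it presumably looks roughly like the reconstruction below; enum constants other than TOKEN_BUCKET_INCREMENTAL_REFILL are not visible here.

package com.linkedin.venice.throttle;

// Rough reconstruction from this diff; not the actual source file.
public interface VeniceRateLimiter {
  enum RateLimiterType {
    TOKEN_BUCKET_INCREMENTAL_REFILL
    // ...other constants selectable via the new server configs are not shown on this page
  }

  // Returns true if 'units' permits were granted, false if the caller should be throttled.
  boolean tryAcquirePermit(int units);

  // Quota bookkeeping, used to detect whether a newly requested quota differs from the current one.
  void setQuota(long quota);

  long getQuota();
}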
@@ -0,0 +1,37 @@
package com.linkedin.venice.throttle;

import com.google.common.util.concurrent.RateLimiter;


/**
* A wrapper around Guava's RateLimiter to provide a common interface for rate limiting.
*/
public class GuavaRateLimiter implements VeniceRateLimiter {
private final RateLimiter rateLimiter;
private long permitsPerSecond;

public GuavaRateLimiter(long permitsPerSecond) {
this.permitsPerSecond = permitsPerSecond;
this.rateLimiter = RateLimiter.create(permitsPerSecond);
}

@Override
public boolean tryAcquirePermit(int units) {
return rateLimiter.tryAcquire(units);
}

// Recorded only so callers can compare a newly requested quota with the existing one;
// the underlying Guava limiter's rate is not changed here.
@Override
public void setQuota(long quota) {
this.permitsPerSecond = quota;
}

@Override
public long getQuota() {
return permitsPerSecond;
}

@Override
public String toString() {
return "GuavaRateLimiter{" + "permitsPerSecond=" + permitsPerSecond + '}';
}
}
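A quick usage sketch for the new wrapper, assuming nothing beyond the class shown above and the reconstructed interface:

// Illustrative only: allow roughly 100 permits per second, rejecting excess instead of blocking.
VeniceRateLimiter limiter = new GuavaRateLimiter(100);
int cost = 5; // e.g., the RCU cost of a multi-get request
if (limiter.tryAcquirePermit(cost)) {
  // within quota: serve the request
} else {
  // over quota: reject, e.g. with HTTP 429 / TOO_MANY_REQUESTS
}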