[EXPERIMENTS] Quota rejection and its impact on server latencies #1115

Draft: wants to merge 26 commits into base branch main.

Commits (26)
a419089
Threadpool for quota
sushantmane Aug 11, 2024
a1c3d34
Revert "Threadpool for quota"
sushantmane Aug 11, 2024
01f42b1
Add netty stats
sushantmane Aug 12, 2024
b6bcb3e
Add server connection stats
sushantmane Aug 12, 2024
57d08e8
Change log level to debug for ServerConnectionStatsHandler
sushantmane Aug 12, 2024
596c71e
rename sensors
sushantmane Aug 12, 2024
37cb75b
Measure writeAndFlushTime for quota rejected requests
sushantmane Aug 12, 2024
8d3accf
Add metrics for ResponseWriteAndFlushTimeForDataRequest
sushantmane Aug 12, 2024
efd1489
Fix broken code
sushantmane Aug 12, 2024
16b70bd
Add queuedTasksForReadHandler
sushantmane Aug 12, 2024
49a9c8f
Add time spent in read handler
sushantmane Aug 12, 2024
21e6f69
Add elapsed time
sushantmane Aug 12, 2024
0e89b97
Introduce write and flush handoff pool
sushantmane Aug 12, 2024
b210502
Revert "Introduce write and flush handoff pool"
sushantmane Aug 12, 2024
1e9fab0
Add back FlushConsolidationHandler
sushantmane Aug 12, 2024
e736e00
reduce flush consolidation to 16
sushantmane Aug 13, 2024
25a916d
Update connection stats
sushantmane Aug 13, 2024
78e4f45
Add recordTimeSpentInQuotaEnforcement
sushantmane Aug 13, 2024
09039a6
Skip flush in quota
sushantmane Aug 13, 2024
edae8d4
Disable flush in channelReadCompletd
sushantmane Aug 13, 2024
8fd262e
Revert "Disable flush in channelReadCompletd"
sushantmane Aug 13, 2024
950083f
Revert "Skip flush in quota"
sushantmane Aug 13, 2024
e9fb60f
Use expensive handlers only for acl and quota
sushantmane Aug 13, 2024
536ee95
Revert "Use expensive handlers only for acl and quota"
sushantmane Aug 13, 2024
d8c5aaa
[server][router] Replace synchronized with CAS in TokenBucket::update…
sushantmane Aug 13, 2024
d99fafd
Delete unnecessary set
sushantmane Aug 13, 2024
@@ -27,6 +27,7 @@ public abstract class ReadResponse {
private int hadamardProductCount = 0;
private int countOperatorCount = 0;
private int rcu = 0;
private long responseWriteAndFlushStartTimeNanos = -1;

public void setCompressionStrategy(CompressionStrategy compressionStrategy) {
this.compressionStrategy = compressionStrategy;
@@ -205,4 +206,16 @@ public int getCountOperatorCount() {
public abstract ByteBuf getResponseBody();

public abstract int getResponseSchemaIdHeader();

public void setResponseWriteAndFlushStartTimeNanos(long responseWriteAndFlushStartTimeNanos) {
this.responseWriteAndFlushStartTimeNanos = responseWriteAndFlushStartTimeNanos;
}

public void recordResponseWriteAndFlushStartTimeNanos() {
this.responseWriteAndFlushStartTimeNanos = System.nanoTime();
}

public long getResponseWriteAndFlushStartTimeNanos() {
return responseWriteAndFlushStartTimeNanos;
}
}
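
The new responseWriteAndFlushStartTimeNanos field gives each response a place to stamp the moment its write-and-flush begins, so the elapsed time can be computed once the flush completes on the wire. A minimal sketch of how that measurement could be taken with a Netty write listener follows; the ResponseWriteTimer helper and the latency sink are illustrative assumptions, only the ReadResponse accessors come from this diff.

import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;

final class ResponseWriteTimer {
  // Sketch: stamp the start time on the response, then compute elapsed nanos once the
  // write-and-flush actually completes.
  static void writeAndMeasure(ChannelHandlerContext ctx, ReadResponse response, Object httpResponse) {
    response.recordResponseWriteAndFlushStartTimeNanos();
    ctx.writeAndFlush(httpResponse).addListener((ChannelFutureListener) future -> {
      long elapsedNanos = System.nanoTime() - response.getResponseWriteAndFlushStartTimeNanos();
      // Hand elapsedNanos to whatever latency sensor is wired up; the sink is outside this diff.
    });
  }
}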
@@ -23,7 +23,7 @@ public static ThreadPoolExecutor createThreadPool(
ThreadPoolExecutor executor = new ThreadPoolExecutor(
threadCount,
threadCount,
0,
600,
TimeUnit.MILLISECONDS,
getExecutionQueue(capacity, blockingQueueType),
new DaemonThreadFactory(threadNamePrefix));
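
The keep-alive change above (0 to 600 ms) only matters if the pool is allowed to shrink: corePoolSize equals maximumPoolSize, so the executor never creates non-core threads, and core threads ignore keepAliveTime unless core-thread timeout is switched on. A sketch of that combination, assuming the intent is to let idle threads exit; the allowCoreThreadTimeOut call is not part of this diff.

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    threadCount,       // corePoolSize
    threadCount,       // maximumPoolSize (equal, so only core threads ever exist)
    600,               // keepAliveTime
    TimeUnit.MILLISECONDS,
    getExecutionQueue(capacity, blockingQueueType),
    new DaemonThreadFactory(threadNamePrefix));
// Without this, the 600 ms keep-alive is inert for a fixed-size pool.
executor.allowCoreThreadTimeOut(true);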
@@ -18,8 +18,8 @@ public class TokenBucket {
private final Clock clock;
private final AtomicLong tokens;
private final AtomicLong tokensRequestedSinceLastRefill;
private volatile long previousRefillTime;
private volatile long nextUpdateTime;
private final AtomicLong previousRefillTime = new AtomicLong();
private final AtomicLong nextUpdateTime = new AtomicLong();

/**
* This constructor should only be used by tests. Application should not specify it's own instance of Clock
@@ -51,8 +51,9 @@ public TokenBucket(long capacity, long refillAmount, long refillInterval, TimeUn

tokens = new AtomicLong(capacity);
tokensRequestedSinceLastRefill = new AtomicLong(0);
previousRefillTime = clock.millis();
nextUpdateTime = previousRefillTime + refillIntervalMs;
long timeNow = clock.millis();
previousRefillTime.set(timeNow);
nextUpdateTime.set(timeNow + refillIntervalMs);

refillPerSecond = refillAmount / (float) refillUnit.toSeconds(refillInterval);
}
@@ -74,25 +75,18 @@ public TokenBucket(long capacity, long refillAmount, long refillInterval, TimeUn
* function is short-circuited. Consumers of the token bucket should always retry.
*/
private void update() {
if (clock.millis() > nextUpdateTime) {
synchronized (this) {
long timeNow = clock.millis();
if (timeNow > nextUpdateTime) {
long refillCount = (timeNow - nextUpdateTime) / refillIntervalMs + 1;
long totalRefillAmount = refillCount * refillAmount;
tokens.getAndAccumulate(totalRefillAmount, (existing, toAdd) -> {
long newTokens = existing + toAdd;
if (newTokens > capacity) {
return capacity;
} else {
return newTokens;
}
});
previousRefillTime = timeNow;
tokensRequestedSinceLastRefill.set(0);
nextUpdateTime = timeNow + refillIntervalMs;
}
}
long timeNow = clock.millis();
long currentNextUpdateTime = nextUpdateTime.get();
if (timeNow > currentNextUpdateTime
&& nextUpdateTime.compareAndSet(currentNextUpdateTime, timeNow + refillIntervalMs)) {
long refillCount = (timeNow - currentNextUpdateTime) / refillIntervalMs + 1;
long totalRefillAmount = refillCount * refillAmount;
tokens.getAndAccumulate(totalRefillAmount, (existing, toAdd) -> {
long newTokens = existing + toAdd;
return Math.min(newTokens, capacity);
});
previousRefillTime.set(timeNow);
tokensRequestedSinceLastRefill.set(0);
}
}
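
The rewritten update() replaces the synchronized block with a compare-and-set on nextUpdateTime that acts as a single-winner gate: the one thread that advances the deadline performs the refill, and every other thread falls through without blocking. Because tokens, previousRefillTime and tokensRequestedSinceLastRefill are updated only after the gate is won, other threads can briefly observe the new deadline before the refill lands, which is why the surrounding javadoc tells consumers to always retry. The generic shape of the idiom, as a standalone sketch:

import java.util.concurrent.atomic.AtomicLong;

final class CasGateSketch {
  private final AtomicLong nextDeadline = new AtomicLong();

  void maybeDoPeriodicWork(long now, long intervalMs) {
    long deadline = nextDeadline.get();
    // Only the thread whose CAS succeeds runs the work for this interval; losers return immediately.
    if (now > deadline && nextDeadline.compareAndSet(deadline, now + intervalMs)) {
      doWorkOnce();
    }
  }

  private void doWorkOnce() {
    // refill tokens, reset counters, etc.
  }
}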

@@ -135,7 +129,7 @@ public float getAmortizedRefillPerSecond() {
}

public double getStaleUsageRatio() {
long timeSinceLastRefill = TimeUnit.MILLISECONDS.toSeconds(clock.millis() - previousRefillTime);
long timeSinceLastRefill = TimeUnit.MILLISECONDS.toSeconds(clock.millis() - previousRefillTime.get());
if (timeSinceLastRefill > 0) {
return ((double) tokensRequestedSinceLastRefill.get() / (double) timeSinceLastRefill) / refillPerSecond;
} else {
@@ -280,6 +280,18 @@ private void runTest(
}
}

@Test
public void testFastClientForNettyStats() throws Exception {
testFastClientGet(
false,
false,
false,
false,
2,
RequestType.MULTI_GET,
StoreMetadataFetchMode.SERVER_BASED_METADATA);
}

@Test(dataProvider = "FastClient-Test-Permutations", timeOut = TIME_OUT)
public void testFastClientGet(
boolean dualRead,
@@ -12,6 +12,7 @@
import com.linkedin.venice.helix.HelixCustomizedViewOfflinePushRepository;
import com.linkedin.venice.listener.ListenerService;
import com.linkedin.venice.listener.StorageReadRequestHandler;
import com.linkedin.venice.listener.VeniceServerNettyStats;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.ReadOnlyStoreRepository;
import com.linkedin.venice.security.SSLFactory;
@@ -82,7 +83,8 @@ protected StorageReadRequestHandler createRequestHandler(
boolean parallelBatchGetEnabled,
int parallelBatchGetChunkSize,
StorageEngineBackedCompressorFactory compressorFactory,
Optional<ResourceReadUsageTracker> resourceReadUsageTracker) {
Optional<ResourceReadUsageTracker> resourceReadUsageTracker,
VeniceServerNettyStats nettyStats) {

return new StorageReadRequestHandler(
executor,
@@ -98,7 +100,8 @@ protected StorageReadRequestHandler createRequestHandler(
parallelBatchGetChunkSize,
serverConfig,
compressorFactory,
resourceReadUsageTracker) {
resourceReadUsageTracker,
nettyStats) {
@Override
public void channelRead(ChannelHandlerContext context, Object message) throws Exception {
RequestHandler handler = requestHandler.get();
@@ -1,22 +1,55 @@
status = error
name = PropertiesConfig

filters = threshold

filter.threshold.type = ThresholdFilter
filter.threshold.level = debug
filter.threshold.level = info

appenders = console
#appenders = console

appender.console.type = Console
appender.console.name = STDOUT
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} - [%X{component}] %p [%c{1}] [%t] %m%n
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %p [%c{1}] [%t] %m%n

rootLogger.level = info
rootLogger.appenderRefs = stdout
rootLogger.appenderRef.stdout.ref = STDOUT
appender.rolling.type = RollingFile
appender.rolling.name = fileLogger
appender.rolling.fileName= /Users/sumane/logs/venice-test.log
appender.rolling.filePattern= ${basePath}/app_%d{yyyyMMdd}.log.gz
appender.rolling.layout.type = PatternLayout
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %p [%c{1.}] [%t] %m%n
appender.rolling.policies.type = Policies

rootLogger.level = ERROR
#rootLogger.appenderRefs = stdout
#rootLogger.appenderRef.stdout.ref = STDOUT
#rootLogger.appenderRefs = RollingFile
rootLogger.appenderRef.RollingFile.ref = fileLogger

# Set package org.apache.zookeeper log level to error.
logger.zk.name = org.apache.zookeeper
logger.zk.level = error
logger.zk.level = off
logger.apache.name = org.apache
logger.apache.level = error
logger.kafka.name = kafka
logger.kafka.level = error
logger.e.name = org.eclipse
logger.e.level = off
logger.d2.name = com.linkedin.d2
logger.d2.level = error
logger.venice.name = com.linkedin.venice
logger.venice.level = info
logger.dvc.name = com.linkedin.davinci
logger.dvc.level = info
logger.vs.name = com.linkedin.venice.serialization.avro
logger.vs.level = ERROR
logger.vu.name = com.linkedin.venice.utils
logger.vu.level = ERROR
#com.linkedin.venice.listener
logger.vl.name = com.linkedin.venice.listener
logger.vl.level = debug
#c.l.v.helix
logger.helix.name = com.linkedin.venice.helix
logger.helix.level = error
#com.linkedin.venice.listener.ServerConnectionStatsHandler
logger.scs.name = com.linkedin.venice.listener.ServerConnectionStatsHandler
logger.scs.level = debug
@@ -34,8 +34,10 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.tehuti.metrics.MetricsRepository;
import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -61,6 +63,7 @@ public class HttpChannelInitializer extends ChannelInitializer<SocketChannel> {
private final ReadQuotaEnforcementHandler quotaEnforcer;
private final VeniceHttp2PipelineInitializerBuilder http2PipelineInitializerBuilder;
private final ServerConnectionStats serverConnectionStats;
private VeniceServerNettyStats nettyStats;
AggServerQuotaUsageStats quotaUsageStats;
AggServerQuotaTokenBucketStats quotaTokenBucketStats;
List<ServerInterceptor> aclInterceptors;
@@ -78,6 +81,31 @@ public HttpChannelInitializer(
Optional<StaticAccessController> routerAccessController,
Optional<DynamicAccessController> storeAccessController,
StorageReadRequestHandler requestHandler) {
this(
storeMetadataRepository,
customizedViewRepository,
metricsRepository,
sslFactory,
sslHandshakeExecutor,
serverConfig,
routerAccessController,
storeAccessController,
requestHandler,
null);
}

public HttpChannelInitializer(
ReadOnlyStoreRepository storeMetadataRepository,
CompletableFuture<HelixCustomizedViewOfflinePushRepository> customizedViewRepository,
MetricsRepository metricsRepository,
Optional<SSLFactory> sslFactory,
Executor sslHandshakeExecutor,
VeniceServerConfig serverConfig,
Optional<StaticAccessController> routerAccessController,
Optional<DynamicAccessController> storeAccessController,
StorageReadRequestHandler requestHandler,
VeniceServerNettyStats nettyStats) {
this.nettyStats = nettyStats;
this.serverConfig = serverConfig;
this.requestHandler = requestHandler;
this.isDaVinciClient = serverConfig.isDaVinciClient();
@@ -133,7 +161,9 @@ public HttpChannelInitializer(
customizedViewRepository,
nodeId,
quotaUsageStats,
metricsRepository);
metricsRepository,
nettyStats,
Clock.systemUTC());

// Token Bucket Stats for a store must be initialized when that store is created
this.quotaTokenBucketStats = new AggServerQuotaTokenBucketStats(metricsRepository, quotaEnforcer);
@@ -174,6 +204,7 @@ interface ChannelPipelineConsumer {

@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new FlushConsolidationHandler(16, true));
if (sslFactory.isPresent()) {
SslInitializer sslInitializer = new SslInitializer(SslUtils.toAlpiniSSLFactory(sslFactory.get()), false);
if (sslHandshakeExecutor != null) {
@@ -184,9 +215,9 @@ public void initChannel(SocketChannel ch) {
}
ChannelPipelineConsumer httpPipelineInitializer = (pipeline, whetherNeedServerCodec) -> {
ServerConnectionStatsHandler serverConnectionStatsHandler =
new ServerConnectionStatsHandler(serverConnectionStats, serverConfig.getRouterPrincipalName());
new ServerConnectionStatsHandler(serverConnectionStats, nettyStats, serverConfig.getRouterPrincipalName());
pipeline.addLast(serverConnectionStatsHandler);
StatsHandler statsHandler = new StatsHandler(singleGetStats, multiGetStats, computeStats);
StatsHandler statsHandler = new StatsHandler(singleGetStats, multiGetStats, computeStats, nettyStats);
pipeline.addLast(statsHandler);
if (whetherNeedServerCodec) {
pipeline.addLast(new HttpServerCodec());
@@ -245,7 +276,7 @@ public void initChannel(SocketChannel ch) {
public VeniceServerGrpcRequestProcessor initGrpcRequestProcessor() {
VeniceServerGrpcRequestProcessor grpcServerRequestProcessor = new VeniceServerGrpcRequestProcessor();

StatsHandler statsHandler = new StatsHandler(singleGetStats, multiGetStats, computeStats);
StatsHandler statsHandler = new StatsHandler(singleGetStats, multiGetStats, computeStats, nettyStats);
GrpcStatsHandler grpcStatsHandler = new GrpcStatsHandler(statsHandler);
grpcServerRequestProcessor.addHandler(grpcStatsHandler);

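
Placing FlushConsolidationHandler at the head of the pipeline batches flushes so that repeated writeAndFlush calls do not each pay for a syscall; the commit history shows it being re-added and then tuned down to 16 consolidated flushes. An annotated restatement of the added line, with the Netty constructor arguments spelled out:

// explicitFlushAfterFlushes = 16: coalesce up to 16 flush() calls before forcing a real flush.
// consolidateWhenNoReadInProgress = true: also consolidate flushes issued outside the read loop
// (for example from another thread or a future listener).
ch.pipeline().addLast(new FlushConsolidationHandler(16, true));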
@@ -88,6 +88,8 @@ public ListenerService(
this.isGrpcEnabled = serverConfig.isGrpcEnabled();
this.grpcPort = serverConfig.getGrpcPort();

VeniceServerNettyStats nettyStats = new VeniceServerNettyStats(metricsRepository, "NettyStats");

executor = createThreadPool(
serverConfig.getRestServiceStorageThreadNum(),
"StorageExecutionThread",
@@ -121,7 +123,8 @@ public ListenerService(
serverConfig.isEnableParallelBatchGet(),
serverConfig.getParallelBatchGetChunkSize(),
compressorFactory,
resourceReadUsageTracker);
resourceReadUsageTracker,
nettyStats);

storageReadRequestHandler = requestHandler;

@@ -134,7 +137,8 @@ public ListenerService(
serverConfig,
routerAccessController,
storeAccessController,
requestHandler);
requestHandler,
nettyStats);

Class<? extends ServerChannel> serverSocketChannelClass = NioServerSocketChannel.class;
boolean epollEnabled = serverConfig.isRestServiceEpollEnabled();
@@ -186,6 +190,7 @@ public ListenerService(
public boolean startInner() throws Exception {
serverFuture = bootstrap.bind(port).sync();
LOGGER.info("Listener service started on port: {}", port);
LOGGER.info("####EXPERIMENT#### -- No flush in quota code path");

if (isGrpcEnabled) {
grpcServer.start();
@@ -236,7 +241,8 @@ protected StorageReadRequestHandler createRequestHandler(
boolean parallelBatchGetEnabled,
int parallelBatchGetChunkSize,
StorageEngineBackedCompressorFactory compressorFactory,
Optional<ResourceReadUsageTracker> resourceReadUsageTracker) {
Optional<ResourceReadUsageTracker> resourceReadUsageTracker,
VeniceServerNettyStats nettyStats) {
return new StorageReadRequestHandler(
executor,
computeExecutor,
@@ -251,6 +257,7 @@ protected StorageReadRequestHandler createRequestHandler(
parallelBatchGetChunkSize,
serverConfig,
compressorFactory,
resourceReadUsageTracker);
resourceReadUsageTracker,
nettyStats);
}
}
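
ListenerService now builds a single VeniceServerNettyStats and threads it through the request handler and the channel initializer, and the commit list adds timers such as recordTimeSpentInQuotaEnforcement. A rough sketch of how a handler could feed such a timer, assuming a nanosecond-accepting signature; the handler, the enforceQuota placeholder, and the call site are illustrative, not the PR's actual wiring (the real check lives in ReadQuotaEnforcementHandler):

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class QuotaTimingSketch extends ChannelInboundHandlerAdapter {
  private final VeniceServerNettyStats nettyStats;

  public QuotaTimingSketch(VeniceServerNettyStats nettyStats) {
    this.nettyStats = nettyStats;
  }

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) {
    long startNanos = System.nanoTime();
    boolean allowed = enforceQuota(msg); // placeholder for the token-bucket check
    nettyStats.recordTimeSpentInQuotaEnforcement(System.nanoTime() - startNanos);
    if (allowed) {
      ctx.fireChannelRead(msg); // pass the request along the pipeline
    } else {
      // Rejection path: write the quota-exceeded response; whether and when to flush here
      // is exactly what the experiments in this PR are probing.
    }
  }

  private boolean enforceQuota(Object msg) {
    return true; // stand-in only
  }
}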