diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/listener/response/ReadResponse.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/listener/response/ReadResponse.java index b955943217..86aa06fa27 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/listener/response/ReadResponse.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/listener/response/ReadResponse.java @@ -27,6 +27,7 @@ public abstract class ReadResponse { private int hadamardProductCount = 0; private int countOperatorCount = 0; private int rcu = 0; + private long responseWriteAndFlushStartTimeNanos = -1; public void setCompressionStrategy(CompressionStrategy compressionStrategy) { this.compressionStrategy = compressionStrategy; @@ -205,4 +206,16 @@ public int getCountOperatorCount() { public abstract ByteBuf getResponseBody(); public abstract int getResponseSchemaIdHeader(); + + public void setResponseWriteAndFlushStartTimeNanos(long responseWriteAndFlushStartTimeNanos) { + this.responseWriteAndFlushStartTimeNanos = responseWriteAndFlushStartTimeNanos; + } + + public void recordResponseWriteAndFlushStartTimeNanos() { + this.responseWriteAndFlushStartTimeNanos = System.nanoTime(); + } + + public long getResponseWriteAndFlushStartTimeNanos() { + return responseWriteAndFlushStartTimeNanos; + } } diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/utils/concurrent/ThreadPoolFactory.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/utils/concurrent/ThreadPoolFactory.java index b89ef3731a..4fa3c453a4 100644 --- a/internal/venice-client-common/src/main/java/com/linkedin/venice/utils/concurrent/ThreadPoolFactory.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/utils/concurrent/ThreadPoolFactory.java @@ -23,7 +23,7 @@ public static ThreadPoolExecutor createThreadPool( ThreadPoolExecutor executor = new ThreadPoolExecutor( threadCount, threadCount, - 0, + 600, TimeUnit.MILLISECONDS, getExecutionQueue(capacity, blockingQueueType), new DaemonThreadFactory(threadNamePrefix)); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/throttle/TokenBucket.java b/internal/venice-common/src/main/java/com/linkedin/venice/throttle/TokenBucket.java index d694720d44..968e265cfa 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/throttle/TokenBucket.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/throttle/TokenBucket.java @@ -18,8 +18,8 @@ public class TokenBucket { private final Clock clock; private final AtomicLong tokens; private final AtomicLong tokensRequestedSinceLastRefill; - private volatile long previousRefillTime; - private volatile long nextUpdateTime; + private final AtomicLong previousRefillTime = new AtomicLong(); + private final AtomicLong nextUpdateTime = new AtomicLong(); /** * This constructor should only be used by tests. Application should not specify it's own instance of Clock @@ -51,8 +51,9 @@ public TokenBucket(long capacity, long refillAmount, long refillInterval, TimeUn tokens = new AtomicLong(capacity); tokensRequestedSinceLastRefill = new AtomicLong(0); - previousRefillTime = clock.millis(); - nextUpdateTime = previousRefillTime + refillIntervalMs; + long timeNow = clock.millis(); + previousRefillTime.set(timeNow); + nextUpdateTime.set(timeNow + refillIntervalMs); refillPerSecond = refillAmount / (float) refillUnit.toSeconds(refillInterval); } @@ -74,25 +75,18 @@ public TokenBucket(long capacity, long refillAmount, long refillInterval, TimeUn * function is short-circuited. Consumers of the token bucket should always retry. */ private void update() { - if (clock.millis() > nextUpdateTime) { - synchronized (this) { - long timeNow = clock.millis(); - if (timeNow > nextUpdateTime) { - long refillCount = (timeNow - nextUpdateTime) / refillIntervalMs + 1; - long totalRefillAmount = refillCount * refillAmount; - tokens.getAndAccumulate(totalRefillAmount, (existing, toAdd) -> { - long newTokens = existing + toAdd; - if (newTokens > capacity) { - return capacity; - } else { - return newTokens; - } - }); - previousRefillTime = timeNow; - tokensRequestedSinceLastRefill.set(0); - nextUpdateTime = timeNow + refillIntervalMs; - } - } + long timeNow = clock.millis(); + long currentNextUpdateTime = nextUpdateTime.get(); + if (timeNow > currentNextUpdateTime + && nextUpdateTime.compareAndSet(currentNextUpdateTime, timeNow + refillIntervalMs)) { + long refillCount = (timeNow - currentNextUpdateTime) / refillIntervalMs + 1; + long totalRefillAmount = refillCount * refillAmount; + tokens.getAndAccumulate(totalRefillAmount, (existing, toAdd) -> { + long newTokens = existing + toAdd; + return Math.min(newTokens, capacity); + }); + previousRefillTime.set(timeNow); + tokensRequestedSinceLastRefill.set(0); } } @@ -135,7 +129,7 @@ public float getAmortizedRefillPerSecond() { } public double getStaleUsageRatio() { - long timeSinceLastRefill = TimeUnit.MILLISECONDS.toSeconds(clock.millis() - previousRefillTime); + long timeSinceLastRefill = TimeUnit.MILLISECONDS.toSeconds(clock.millis() - previousRefillTime.get()); if (timeSinceLastRefill > 0) { return ((double) tokensRequestedSinceLastRefill.get() / (double) timeSinceLastRefill) / refillPerSecond; } else { diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientEndToEndTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientEndToEndTest.java index ebccaf75df..9de04b5dd6 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientEndToEndTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/fastclient/AvroStoreClientEndToEndTest.java @@ -280,6 +280,18 @@ private void runTest( } } + @Test + public void testFastClientForNettyStats() throws Exception { + testFastClientGet( + false, + false, + false, + false, + 2, + RequestType.MULTI_GET, + StoreMetadataFetchMode.SERVER_BASED_METADATA); + } + @Test(dataProvider = "FastClient-Test-Permutations", timeOut = TIME_OUT) public void testFastClientGet( boolean dualRead, diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/TestVeniceServer.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/TestVeniceServer.java index cb755c1e69..257394e191 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/TestVeniceServer.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/TestVeniceServer.java @@ -12,6 +12,7 @@ import com.linkedin.venice.helix.HelixCustomizedViewOfflinePushRepository; import com.linkedin.venice.listener.ListenerService; import com.linkedin.venice.listener.StorageReadRequestHandler; +import com.linkedin.venice.listener.VeniceServerNettyStats; import com.linkedin.venice.meta.ReadOnlySchemaRepository; import com.linkedin.venice.meta.ReadOnlyStoreRepository; import com.linkedin.venice.security.SSLFactory; @@ -82,7 +83,8 @@ protected StorageReadRequestHandler createRequestHandler( boolean parallelBatchGetEnabled, int parallelBatchGetChunkSize, StorageEngineBackedCompressorFactory compressorFactory, - Optional resourceReadUsageTracker) { + Optional resourceReadUsageTracker, + VeniceServerNettyStats nettyStats) { return new StorageReadRequestHandler( executor, @@ -98,7 +100,8 @@ protected StorageReadRequestHandler createRequestHandler( parallelBatchGetChunkSize, serverConfig, compressorFactory, - resourceReadUsageTracker) { + resourceReadUsageTracker, + nettyStats) { @Override public void channelRead(ChannelHandlerContext context, Object message) throws Exception { RequestHandler handler = requestHandler.get(); diff --git a/internal/venice-test-common/src/integrationTest/resources/log4j2.properties b/internal/venice-test-common/src/integrationTest/resources/log4j2.properties index b3730b4077..ea8996f79f 100644 --- a/internal/venice-test-common/src/integrationTest/resources/log4j2.properties +++ b/internal/venice-test-common/src/integrationTest/resources/log4j2.properties @@ -1,22 +1,55 @@ -status = error name = PropertiesConfig filters = threshold filter.threshold.type = ThresholdFilter -filter.threshold.level = debug +filter.threshold.level = info -appenders = console +#appenders = console appender.console.type = Console appender.console.name = STDOUT appender.console.layout.type = PatternLayout -appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} - [%X{component}] %p [%c{1}] [%t] %m%n +appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %p [%c{1}] [%t] %m%n -rootLogger.level = info -rootLogger.appenderRefs = stdout -rootLogger.appenderRef.stdout.ref = STDOUT +appender.rolling.type = RollingFile +appender.rolling.name = fileLogger +appender.rolling.fileName= /Users/sumane/logs/venice-test.log +appender.rolling.filePattern= ${basePath}/app_%d{yyyyMMdd}.log.gz +appender.rolling.layout.type = PatternLayout +appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %p [%c{1.}] [%t] %m%n +appender.rolling.policies.type = Policies + +rootLogger.level = ERROR +#rootLogger.appenderRefs = stdout +#rootLogger.appenderRef.stdout.ref = STDOUT +#rootLogger.appenderRefs = RollingFile +rootLogger.appenderRef.RollingFile.ref = fileLogger -# Set package org.apache.zookeeper log level to error. logger.zk.name = org.apache.zookeeper -logger.zk.level = error \ No newline at end of file +logger.zk.level = off +logger.apache.name = org.apache +logger.apache.level = error +logger.kafka.name = kafka +logger.kafka.level = error +logger.e.name = org.eclipse +logger.e.level = off +logger.d2.name = com.linkedin.d2 +logger.d2.level = error +logger.venice.name = com.linkedin.venice +logger.venice.level = info +logger.dvc.name = com.linkedin.davinci +logger.dvc.level = info +logger.vs.name = com.linkedin.venice.serialization.avro +logger.vs.level = ERROR +logger.vu.name = com.linkedin.venice.utils +logger.vu.level = ERROR +#com.linkedin.venice.listener +logger.vl.name = com.linkedin.venice.listener +logger.vl.level = debug +#c.l.v.helix +logger.helix.name = com.linkedin.venice.helix +logger.helix.level = error +#com.linkedin.venice.listener.ServerConnectionStatsHandler +logger.scs.name = com.linkedin.venice.listener.ServerConnectionStatsHandler +logger.scs.level = debug \ No newline at end of file diff --git a/services/venice-server/src/main/java/com/linkedin/venice/listener/HttpChannelInitializer.java b/services/venice-server/src/main/java/com/linkedin/venice/listener/HttpChannelInitializer.java index 1f9eaaecfd..86a641c502 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/listener/HttpChannelInitializer.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/listener/HttpChannelInitializer.java @@ -34,8 +34,10 @@ import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.flush.FlushConsolidationHandler; import io.netty.handler.timeout.IdleStateHandler; import io.tehuti.metrics.MetricsRepository; +import java.time.Clock; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -61,6 +63,7 @@ public class HttpChannelInitializer extends ChannelInitializer { private final ReadQuotaEnforcementHandler quotaEnforcer; private final VeniceHttp2PipelineInitializerBuilder http2PipelineInitializerBuilder; private final ServerConnectionStats serverConnectionStats; + private VeniceServerNettyStats nettyStats; AggServerQuotaUsageStats quotaUsageStats; AggServerQuotaTokenBucketStats quotaTokenBucketStats; List aclInterceptors; @@ -78,6 +81,31 @@ public HttpChannelInitializer( Optional routerAccessController, Optional storeAccessController, StorageReadRequestHandler requestHandler) { + this( + storeMetadataRepository, + customizedViewRepository, + metricsRepository, + sslFactory, + sslHandshakeExecutor, + serverConfig, + routerAccessController, + storeAccessController, + requestHandler, + null); + } + + public HttpChannelInitializer( + ReadOnlyStoreRepository storeMetadataRepository, + CompletableFuture customizedViewRepository, + MetricsRepository metricsRepository, + Optional sslFactory, + Executor sslHandshakeExecutor, + VeniceServerConfig serverConfig, + Optional routerAccessController, + Optional storeAccessController, + StorageReadRequestHandler requestHandler, + VeniceServerNettyStats nettyStats) { + this.nettyStats = nettyStats; this.serverConfig = serverConfig; this.requestHandler = requestHandler; this.isDaVinciClient = serverConfig.isDaVinciClient(); @@ -133,7 +161,9 @@ public HttpChannelInitializer( customizedViewRepository, nodeId, quotaUsageStats, - metricsRepository); + metricsRepository, + nettyStats, + Clock.systemUTC()); // Token Bucket Stats for a store must be initialized when that store is created this.quotaTokenBucketStats = new AggServerQuotaTokenBucketStats(metricsRepository, quotaEnforcer); @@ -174,6 +204,7 @@ interface ChannelPipelineConsumer { @Override public void initChannel(SocketChannel ch) { + ch.pipeline().addLast(new FlushConsolidationHandler(16, true)); if (sslFactory.isPresent()) { SslInitializer sslInitializer = new SslInitializer(SslUtils.toAlpiniSSLFactory(sslFactory.get()), false); if (sslHandshakeExecutor != null) { @@ -184,9 +215,9 @@ public void initChannel(SocketChannel ch) { } ChannelPipelineConsumer httpPipelineInitializer = (pipeline, whetherNeedServerCodec) -> { ServerConnectionStatsHandler serverConnectionStatsHandler = - new ServerConnectionStatsHandler(serverConnectionStats, serverConfig.getRouterPrincipalName()); + new ServerConnectionStatsHandler(serverConnectionStats, nettyStats, serverConfig.getRouterPrincipalName()); pipeline.addLast(serverConnectionStatsHandler); - StatsHandler statsHandler = new StatsHandler(singleGetStats, multiGetStats, computeStats); + StatsHandler statsHandler = new StatsHandler(singleGetStats, multiGetStats, computeStats, nettyStats); pipeline.addLast(statsHandler); if (whetherNeedServerCodec) { pipeline.addLast(new HttpServerCodec()); @@ -245,7 +276,7 @@ public void initChannel(SocketChannel ch) { public VeniceServerGrpcRequestProcessor initGrpcRequestProcessor() { VeniceServerGrpcRequestProcessor grpcServerRequestProcessor = new VeniceServerGrpcRequestProcessor(); - StatsHandler statsHandler = new StatsHandler(singleGetStats, multiGetStats, computeStats); + StatsHandler statsHandler = new StatsHandler(singleGetStats, multiGetStats, computeStats, nettyStats); GrpcStatsHandler grpcStatsHandler = new GrpcStatsHandler(statsHandler); grpcServerRequestProcessor.addHandler(grpcStatsHandler); diff --git a/services/venice-server/src/main/java/com/linkedin/venice/listener/ListenerService.java b/services/venice-server/src/main/java/com/linkedin/venice/listener/ListenerService.java index 8982aba7fe..43db911299 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/listener/ListenerService.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/listener/ListenerService.java @@ -88,6 +88,8 @@ public ListenerService( this.isGrpcEnabled = serverConfig.isGrpcEnabled(); this.grpcPort = serverConfig.getGrpcPort(); + VeniceServerNettyStats nettyStats = new VeniceServerNettyStats(metricsRepository, "NettyStats"); + executor = createThreadPool( serverConfig.getRestServiceStorageThreadNum(), "StorageExecutionThread", @@ -121,7 +123,8 @@ public ListenerService( serverConfig.isEnableParallelBatchGet(), serverConfig.getParallelBatchGetChunkSize(), compressorFactory, - resourceReadUsageTracker); + resourceReadUsageTracker, + nettyStats); storageReadRequestHandler = requestHandler; @@ -134,7 +137,8 @@ public ListenerService( serverConfig, routerAccessController, storeAccessController, - requestHandler); + requestHandler, + nettyStats); Class serverSocketChannelClass = NioServerSocketChannel.class; boolean epollEnabled = serverConfig.isRestServiceEpollEnabled(); @@ -186,6 +190,7 @@ public ListenerService( public boolean startInner() throws Exception { serverFuture = bootstrap.bind(port).sync(); LOGGER.info("Listener service started on port: {}", port); + LOGGER.info("####EXPERIMENT#### -- No flush in quota code path"); if (isGrpcEnabled) { grpcServer.start(); @@ -236,7 +241,8 @@ protected StorageReadRequestHandler createRequestHandler( boolean parallelBatchGetEnabled, int parallelBatchGetChunkSize, StorageEngineBackedCompressorFactory compressorFactory, - Optional resourceReadUsageTracker) { + Optional resourceReadUsageTracker, + VeniceServerNettyStats nettyStats) { return new StorageReadRequestHandler( executor, computeExecutor, @@ -251,6 +257,7 @@ protected StorageReadRequestHandler createRequestHandler( parallelBatchGetChunkSize, serverConfig, compressorFactory, - resourceReadUsageTracker); + resourceReadUsageTracker, + nettyStats); } } diff --git a/services/venice-server/src/main/java/com/linkedin/venice/listener/OutboundHttpWrapperHandler.java b/services/venice-server/src/main/java/com/linkedin/venice/listener/OutboundHttpWrapperHandler.java index a093a45dc9..cb19c705fe 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/listener/OutboundHttpWrapperHandler.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/listener/OutboundHttpWrapperHandler.java @@ -183,22 +183,24 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) ctx.writeAndFlush(response); } - public void setStats(ServerStatsContext statsContext, ReadResponse obj) { - statsContext.setDatabaseLookupLatency(obj.getDatabaseLookupLatency()); - statsContext.setStorageExecutionHandlerSubmissionWaitTime(obj.getStorageExecutionHandlerSubmissionWaitTime()); - statsContext.setStorageExecutionQueueLen(obj.getStorageExecutionQueueLen()); - statsContext.setSuccessRequestKeyCount(obj.getRecordCount()); - statsContext.setMultiChunkLargeValueCount(obj.getMultiChunkLargeValueCount()); - statsContext.setReadComputeLatency(obj.getReadComputeLatency()); - statsContext.setReadComputeDeserializationLatency(obj.getReadComputeDeserializationLatency()); - statsContext.setReadComputeSerializationLatency(obj.getReadComputeSerializationLatency()); - statsContext.setDotProductCount(obj.getDotProductCount()); - statsContext.setCosineSimilarityCount(obj.getCosineSimilarityCount()); - statsContext.setHadamardProductCount(obj.getHadamardProductCount()); - statsContext.setCountOperatorCount(obj.getCountOperatorCount()); - statsContext.setKeySizeList(obj.getKeySizeList()); - statsContext.setValueSizeList(obj.getValueSizeList()); - statsContext.setValueSize(obj.getValueSize()); - statsContext.setReadComputeOutputSize(obj.getReadComputeOutputSize()); + public void setStats(ServerStatsContext statsContext, ReadResponse readResponse) { + statsContext.setDatabaseLookupLatency(readResponse.getDatabaseLookupLatency()); + statsContext + .setStorageExecutionHandlerSubmissionWaitTime(readResponse.getStorageExecutionHandlerSubmissionWaitTime()); + statsContext.setStorageExecutionQueueLen(readResponse.getStorageExecutionQueueLen()); + statsContext.setSuccessRequestKeyCount(readResponse.getRecordCount()); + statsContext.setMultiChunkLargeValueCount(readResponse.getMultiChunkLargeValueCount()); + statsContext.setReadComputeLatency(readResponse.getReadComputeLatency()); + statsContext.setReadComputeDeserializationLatency(readResponse.getReadComputeDeserializationLatency()); + statsContext.setReadComputeSerializationLatency(readResponse.getReadComputeSerializationLatency()); + statsContext.setDotProductCount(readResponse.getDotProductCount()); + statsContext.setCosineSimilarityCount(readResponse.getCosineSimilarityCount()); + statsContext.setHadamardProductCount(readResponse.getHadamardProductCount()); + statsContext.setCountOperatorCount(readResponse.getCountOperatorCount()); + statsContext.setKeySizeList(readResponse.getKeySizeList()); + statsContext.setValueSizeList(readResponse.getValueSizeList()); + statsContext.setValueSize(readResponse.getValueSize()); + statsContext.setReadComputeOutputSize(readResponse.getReadComputeOutputSize()); + statsContext.setResponseWriteAndFlushStartTimeNanos(readResponse.getResponseWriteAndFlushStartTimeNanos()); } } diff --git a/services/venice-server/src/main/java/com/linkedin/venice/listener/ReadQuotaEnforcementHandler.java b/services/venice-server/src/main/java/com/linkedin/venice/listener/ReadQuotaEnforcementHandler.java index 59a35832cc..dbc1a7a328 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/listener/ReadQuotaEnforcementHandler.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/listener/ReadQuotaEnforcementHandler.java @@ -58,6 +58,7 @@ public class ReadQuotaEnforcementHandler extends SimpleChannelInboundHandler CHANNEL_ACTIVATED = AttributeKey.valueOf("channelActivated"); public ServerConnectionStatsHandler(ServerConnectionStats serverConnectionStats, String routerPrincipalName) { this.serverConnectionStats = serverConnectionStats; this.routerPrincipalName = routerPrincipalName; } + public ServerConnectionStatsHandler( + ServerConnectionStats serverConnectionStats, + VeniceServerNettyStats nettyStats, + String routerPrincipalName) { + this.serverConnectionStats = serverConnectionStats; + this.routerPrincipalName = routerPrincipalName; + this.nettyStats = nettyStats; + } + @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { SslHandler sslHandler = extractSslHandler(ctx); if (sslHandler == null) { // No ssl enabled, record all connections as client connections - serverConnectionStats.incrementClientConnectionCount(); + long cnt = serverConnectionStats.incrementClientConnectionCount(); + LOGGER.debug("####Channel registered: {} - clientCount: {}", ctx.channel().remoteAddress(), cnt); return; } + long cnt; String principalName = getPrincipal(sslHandler); - if (principalName.equals(routerPrincipalName)) { - serverConnectionStats.incrementRouterConnectionCount(); + if (principalName != null && principalName.contains("venice-router")) { + cnt = serverConnectionStats.incrementRouterConnectionCount(); } else { - serverConnectionStats.incrementClientConnectionCount(); + cnt = serverConnectionStats.incrementClientConnectionCount(); + } + LOGGER.debug("####Channel registered: {} - clientCount: {}", ctx.channel().remoteAddress(), cnt); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + Attribute activated = ctx.channel().attr(CHANNEL_ACTIVATED); + if (activated.get() != null && activated.get()) { + return; + } + activated.set(true); + int activeConnections = nettyStats.incrementActiveConnections(); + LOGGER.debug("####Channel active: {} - activeCount: {}", ctx.channel().remoteAddress(), activeConnections); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + Attribute activated = ctx.channel().attr(CHANNEL_ACTIVATED); + if (activated.get() == null || !activated.get()) { + return; } + activated.set(false); + int activeConnections = nettyStats.decrementActiveConnections(); + LOGGER.debug("####Channel inactive: {} - activeCount: {}", ctx.channel().remoteAddress(), activeConnections); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + if (ctx.channel().remoteAddress() == null) { + return; + } SslHandler sslHandler = extractSslHandler(ctx); if (sslHandler == null) { // No ssl enabled, record all connections as client connections - serverConnectionStats.decrementClientConnectionCount(); + long cnt = serverConnectionStats.decrementClientConnectionCount(); + LOGGER.debug("####Channel unregistered: {} - clientCount: {}", ctx.channel().remoteAddress(), cnt); return; } String principalName = getPrincipal(sslHandler); - if (principalName.equals(routerPrincipalName)) { - serverConnectionStats.decrementRouterConnectionCount(); + long cnt; + if (principalName != null && principalName.contains("venice-router")) { + cnt = serverConnectionStats.decrementRouterConnectionCount(); } else { - serverConnectionStats.decrementClientConnectionCount(); + cnt = serverConnectionStats.decrementClientConnectionCount(); } + LOGGER.debug("####Channel unregistered: {} - clientCount: {}", ctx.channel().remoteAddress(), cnt); } protected SslHandler extractSslHandler(ChannelHandlerContext ctx) { diff --git a/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerStatsContext.java b/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerStatsContext.java index 16f4b35fd5..20ec186751 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerStatsContext.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/listener/ServerStatsContext.java @@ -36,6 +36,7 @@ public class ServerStatsContext { private int hadamardProductCount = 0; private int countOperatorCount = 0; private boolean isRequestTerminatedEarly = false; + private long responseWriteAndFlushStartTimeNanos = -1; private IntList keySizeList; private IntList valueSizeList; @@ -157,8 +158,8 @@ public void resetContext() { isRequestTerminatedEarly = false; isComplete = false; isMisroutedStoreVersion = false; - newRequest = false; + responseWriteAndFlushStartTimeNanos = -1; } public void setFirstPartLatency(double firstPartLatency) { @@ -435,4 +436,12 @@ public void setMisroutedStoreVersion(boolean misroutedStoreVersion) { public boolean isMisroutedStoreVersion() { return isMisroutedStoreVersion; } + + public void setResponseWriteAndFlushStartTimeNanos(long startTimeNanos) { + responseWriteAndFlushStartTimeNanos = startTimeNanos; + } + + public long getResponseWriteAndFlushStartTimeNanos() { + return responseWriteAndFlushStartTimeNanos; + } } diff --git a/services/venice-server/src/main/java/com/linkedin/venice/listener/StatsHandler.java b/services/venice-server/src/main/java/com/linkedin/venice/listener/StatsHandler.java index 2c1f64f835..de022ff19f 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/listener/StatsHandler.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/listener/StatsHandler.java @@ -1,5 +1,6 @@ package com.linkedin.venice.listener; +import static com.linkedin.venice.listener.VeniceServerNettyStats.FIRST_HANDLER_TIMESTAMP_KEY; import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; import static io.netty.handler.codec.http.HttpResponseStatus.OK; import static io.netty.handler.codec.http.HttpResponseStatus.TOO_MANY_REQUESTS; @@ -16,22 +17,27 @@ import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http.HttpResponseStatus; import it.unimi.dsi.fastutil.ints.IntList; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class StatsHandler extends ChannelDuplexHandler { + private static final Logger LOGGER = LogManager.getLogger(StatsHandler.class); private final ServerStatsContext serverStatsContext; private final AggServerHttpRequestStats singleGetStats; private final AggServerHttpRequestStats multiGetStats; private final AggServerHttpRequestStats computeStats; + private final VeniceServerNettyStats nettyStats; public StatsHandler( AggServerHttpRequestStats singleGetStats, AggServerHttpRequestStats multiGetStats, - AggServerHttpRequestStats computeStats) { + AggServerHttpRequestStats computeStats, + VeniceServerNettyStats nettyStats) { this.singleGetStats = singleGetStats; this.multiGetStats = multiGetStats; this.computeStats = computeStats; - + this.nettyStats = nettyStats; this.serverStatsContext = new ServerStatsContext(singleGetStats, multiGetStats, computeStats); } @@ -153,6 +159,7 @@ public void setMisroutedStoreVersionRequest(boolean misroutedStoreVersionRequest @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { + ctx.channel().attr(FIRST_HANDLER_TIMESTAMP_KEY).set(System.nanoTime()); if (serverStatsContext.isNewRequest()) { // Reset for every request serverStatsContext.resetContext(); @@ -215,6 +222,11 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) serverStatsContext.errorRequest(serverHttpRequestStats, elapsedTime); } serverStatsContext.setStatCallBackExecuted(true); + + long responseWriteAndFlushStartTimeNanosStartTime = serverStatsContext.getResponseWriteAndFlushStartTimeNanos(); + if (responseWriteAndFlushStartTimeNanosStartTime > 0) { + nettyStats.recordWriteAndFlushCompletionTimeForDataRequest(responseWriteAndFlushStartTimeNanosStartTime); + } } }); } diff --git a/services/venice-server/src/main/java/com/linkedin/venice/listener/StorageReadRequestHandler.java b/services/venice-server/src/main/java/com/linkedin/venice/listener/StorageReadRequestHandler.java index ed9f7bac36..4e48cf16ba 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/listener/StorageReadRequestHandler.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/listener/StorageReadRequestHandler.java @@ -123,6 +123,7 @@ public class StorageReadRequestHandler extends ChannelInboundHandlerAdapter { new VeniceConcurrentHashMap<>(); private final StorageEngineBackedCompressorFactory compressorFactory; private final Optional resourceReadUsageTracker; + private final VeniceServerNettyStats nettyStats; private static class PerStoreVersionState { final StoreDeserializerCache storeDeserializerCache; @@ -188,6 +189,41 @@ public StorageReadRequestHandler( VeniceServerConfig serverConfig, StorageEngineBackedCompressorFactory compressorFactory, Optional resourceReadUsageTracker) { + this( + executor, + computeExecutor, + storageEngineRepository, + metadataStoreRepository, + schemaRepository, + ingestionMetadataRetriever, + readMetadataRetriever, + healthCheckService, + fastAvroEnabled, + parallelBatchGetEnabled, + parallelBatchGetChunkSize, + serverConfig, + compressorFactory, + resourceReadUsageTracker, + null); + } + + public StorageReadRequestHandler( + ThreadPoolExecutor executor, + ThreadPoolExecutor computeExecutor, + StorageEngineRepository storageEngineRepository, + ReadOnlyStoreRepository metadataStoreRepository, + ReadOnlySchemaRepository schemaRepository, + IngestionMetadataRetriever ingestionMetadataRetriever, + ReadMetadataRetriever readMetadataRetriever, + DiskHealthCheckService healthCheckService, + boolean fastAvroEnabled, + boolean parallelBatchGetEnabled, + int parallelBatchGetChunkSize, + VeniceServerConfig serverConfig, + StorageEngineBackedCompressorFactory compressorFactory, + Optional resourceReadUsageTracker, + VeniceServerNettyStats nettyStats) { + this.nettyStats = nettyStats; this.executor = executor; this.computeExecutor = computeExecutor; this.storageEngineRepository = storageEngineRepository; @@ -234,116 +270,139 @@ public void channelRead(ChannelHandlerContext context, Object message) throws Ex if (message instanceof RouterRequest) { RouterRequest request = (RouterRequest) message; - resourceReadUsageTracker.ifPresent(tracker -> tracker.recordReadUsage(request.getResourceName())); - // Check before putting the request to the intermediate queue - if (request.shouldRequestBeTerminatedEarly()) { - // Try to make the response short - VeniceRequestEarlyTerminationException earlyTerminationException = - new VeniceRequestEarlyTerminationException(request.getStoreName()); - context.writeAndFlush( - new HttpShortcutResponse( - earlyTerminationException.getMessage(), - earlyTerminationException.getHttpResponseStatus())); - return; - } - /** - * For now, we are evaluating whether parallel lookup is good overall or not. - * Eventually, we either pick up the new parallel implementation or keep the original one, so it is fine - * to have some duplicate code for the time-being. - */ - if (parallelBatchGetEnabled && request.getRequestType().equals(RequestType.MULTI_GET)) { - handleMultiGetRequestInParallel((MultiGetRouterRequestWrapper) request, parallelBatchGetChunkSize) - .whenComplete((v, e) -> { - if (e != null) { - if (e instanceof VeniceRequestEarlyTerminationException) { - VeniceRequestEarlyTerminationException earlyTerminationException = - (VeniceRequestEarlyTerminationException) e; - context.writeAndFlush( - new HttpShortcutResponse( - earlyTerminationException.getMessage(), - earlyTerminationException.getHttpResponseStatus())); - } else if (e instanceof VeniceNoStoreException) { - HttpResponseStatus status = getHttpResponseStatus((VeniceNoStoreException) e); - context.writeAndFlush( - new HttpShortcutResponse( - "No storage exists for: " + ((VeniceNoStoreException) e).getStoreName(), - status)); - } else { - LOGGER.error("Exception thrown in parallel batch get for {}", request.getResourceName(), e); - HttpShortcutResponse shortcutResponse = - new HttpShortcutResponse(e.getMessage(), HttpResponseStatus.INTERNAL_SERVER_ERROR); - shortcutResponse.setMisroutedStoreVersion(checkMisroutedStoreVersionRequest(request)); - context.writeAndFlush(shortcutResponse); - } - } else { - context.writeAndFlush(v); - } - }); - return; - } + // resourceReadUsageTracker.ifPresent(tracker -> tracker.recordReadUsage(request.getResourceName())); + // // Check before putting the request to the intermediate queue + // if (request.shouldRequestBeTerminatedEarly()) { + // // Try to make the response short + // VeniceRequestEarlyTerminationException earlyTerminationException = + // new VeniceRequestEarlyTerminationException(request.getStoreName()); + // writeAndFlushBadRequests( + // context, + // new HttpShortcutResponse( + // earlyTerminationException.getMessage(), + // earlyTerminationException.getHttpResponseStatus())); + // return; + // } + // /** + // * For now, we are evaluating whether parallel lookup is good overall or not. + // * Eventually, we either pick up the new parallel implementation or keep the original one, so it is fine + // * to have some duplicate code for the time-being. + // */ + // if (parallelBatchGetEnabled && request.getRequestType().equals(RequestType.MULTI_GET)) { + // handleMultiGetRequestInParallel((MultiGetRouterRequestWrapper) request, parallelBatchGetChunkSize) + // .whenComplete((v, e) -> { + // if (e == null) { + // writeAndFlush(context, v); + // return; + // } + // if (e instanceof VeniceRequestEarlyTerminationException) { + // VeniceRequestEarlyTerminationException earlyTerminationException = + // (VeniceRequestEarlyTerminationException) e; + // writeAndFlushBadRequests( + // context, + // new HttpShortcutResponse( + // earlyTerminationException.getMessage(), + // earlyTerminationException.getHttpResponseStatus())); + // } else if (e instanceof VeniceNoStoreException) { + // HttpResponseStatus status = getHttpResponseStatus((VeniceNoStoreException) e); + // writeAndFlushBadRequests( + // context, + // new HttpShortcutResponse( + // "No storage exists for: " + ((VeniceNoStoreException) e).getStoreName(), + // status)); + // } else { + // LOGGER.error("Exception thrown in parallel batch get for {}", request.getResourceName(), e); + // HttpShortcutResponse shortcutResponse = + // new HttpShortcutResponse(e.getMessage(), HttpResponseStatus.INTERNAL_SERVER_ERROR); + // shortcutResponse.setMisroutedStoreVersion(checkMisroutedStoreVersionRequest(request)); + // writeAndFlushBadRequests(context, shortcutResponse); + // } + // }); + // return; + // } final ThreadPoolExecutor executor = getExecutor(request.getRequestType()); - executor.submit(() -> { + executor.execute(() -> { + long startTime = System.nanoTime(); try { - if (request.shouldRequestBeTerminatedEarly()) { - throw new VeniceRequestEarlyTerminationException(request.getStoreName()); - } - double submissionWaitTime = LatencyUtils.getElapsedTimeFromNSToMS(preSubmissionTimeNs); - int queueLen = executor.getQueue().size(); - ReadResponse response; - switch (request.getRequestType()) { - case SINGLE_GET: - response = handleSingleGetRequest((GetRouterRequest) request); - break; - case MULTI_GET: - response = handleMultiGetRequest((MultiGetRouterRequestWrapper) request); - break; - case COMPUTE: - response = handleComputeRequest((ComputeRouterRequestWrapper) message); - break; - default: - throw new VeniceException("Unknown request type: " + request.getRequestType()); - } - response.setStorageExecutionSubmissionWaitTime(submissionWaitTime); - response.setStorageExecutionQueueLen(queueLen); - response.setRCU(ReadQuotaEnforcementHandler.getRcu(request)); - if (request.isStreamingRequest()) { - response.setStreamingResponse(); - } - context.writeAndFlush(response); - } catch (VeniceNoStoreException e) { - String msg = "No storage exists for store: " + e.getStoreName(); - if (!REDUNDANT_LOGGING_FILTER.isRedundantException(msg)) { - LOGGER.error(msg, e); - } - HttpResponseStatus status = getHttpResponseStatus(e); - context.writeAndFlush(new HttpShortcutResponse("No storage exists for: " + e.getStoreName(), status)); - } catch (VeniceRequestEarlyTerminationException e) { - String msg = "Request timed out for store: " + e.getStoreName(); - if (!REDUNDANT_LOGGING_FILTER.isRedundantException(msg)) { - LOGGER.error(msg, e); - } - context.writeAndFlush(new HttpShortcutResponse(e.getMessage(), HttpResponseStatus.REQUEST_TIMEOUT)); - } catch (OperationNotAllowedException e) { - String msg = "METHOD_NOT_ALLOWED: " + e.getMessage(); - if (!REDUNDANT_LOGGING_FILTER.isRedundantException(msg)) { - LOGGER.error(msg, e); + nettyStats.incrementActiveReadHandlerThreads(); + double submissionWaitTime = LatencyUtils.convertNSToMS(startTime - preSubmissionTimeNs); + try { + if (request.shouldRequestBeTerminatedEarly()) { + throw new VeniceRequestEarlyTerminationException(request.getStoreName()); + } + int queueLen = executor.getQueue().size(); + ReadResponse response; + switch (request.getRequestType()) { + case SINGLE_GET: + response = handleSingleGetRequest((GetRouterRequest) request); + break; + case MULTI_GET: + response = handleMultiGetRequest((MultiGetRouterRequestWrapper) request); + break; + case COMPUTE: + response = handleComputeRequest((ComputeRouterRequestWrapper) message); + break; + default: + throw new VeniceException("Unknown request type: " + request.getRequestType()); + } + response.setStorageExecutionSubmissionWaitTime(submissionWaitTime); + response.setStorageExecutionQueueLen(queueLen); + response.setRCU(ReadQuotaEnforcementHandler.getRcu(request)); + response.recordResponseWriteAndFlushStartTimeNanos(); + if (request.isStreamingRequest()) { + response.setStreamingResponse(); + } + writeAndFlush(context, response); + } catch (VeniceNoStoreException e) { + String msg = "No storage exists for store: " + e.getStoreName(); + if (!REDUNDANT_LOGGING_FILTER.isRedundantException(msg)) { + LOGGER.error(msg, e); + } + HttpResponseStatus status = getHttpResponseStatus(e); + writeAndFlushBadRequests( + context, + new HttpShortcutResponse("No storage exists for: " + e.getStoreName(), status)); + } catch (VeniceRequestEarlyTerminationException e) { + String msg = "Request timed out for store: " + e.getStoreName(); + if (!REDUNDANT_LOGGING_FILTER.isRedundantException(msg)) { + LOGGER.error(msg, e); + } + writeAndFlushBadRequests( + context, + new HttpShortcutResponse(e.getMessage(), HttpResponseStatus.REQUEST_TIMEOUT)); + } catch (OperationNotAllowedException e) { + String msg = "METHOD_NOT_ALLOWED: " + e.getMessage(); + if (!REDUNDANT_LOGGING_FILTER.isRedundantException(msg)) { + LOGGER.error(msg, e); + } + writeAndFlushBadRequests( + context, + new HttpShortcutResponse(e.getMessage(), HttpResponseStatus.METHOD_NOT_ALLOWED)); + } catch (Exception e) { + LOGGER.error("Exception thrown for {}", request.getResourceName(), e); + HttpShortcutResponse shortcutResponse = + new HttpShortcutResponse(e.getMessage(), HttpResponseStatus.INTERNAL_SERVER_ERROR); + shortcutResponse.setMisroutedStoreVersion(checkMisroutedStoreVersionRequest(request)); + writeAndFlushBadRequests(context, shortcutResponse); } - context.writeAndFlush(new HttpShortcutResponse(e.getMessage(), HttpResponseStatus.METHOD_NOT_ALLOWED)); - } catch (Exception e) { - LOGGER.error("Exception thrown for {}", request.getResourceName(), e); - HttpShortcutResponse shortcutResponse = - new HttpShortcutResponse(e.getMessage(), HttpResponseStatus.INTERNAL_SERVER_ERROR); - shortcutResponse.setMisroutedStoreVersion(checkMisroutedStoreVersionRequest(request)); - context.writeAndFlush(shortcutResponse); + } finally { + nettyStats.decrementActiveReadHandlerThreads(); + nettyStats.decrementQueuedTasksForReadHandler(); + nettyStats.recordTimeSpentInReadHandler(startTime); } }); - + Long startTime = context.channel().attr(VeniceServerNettyStats.FIRST_HANDLER_TIMESTAMP_KEY).get(); + if (startTime != null) { + nettyStats.recordTimeSpentTillHandoffToReadHandler(startTime); + } + nettyStats.incrementQueuedTasksForReadHandler(); } else if (message instanceof HealthCheckRequest) { if (diskHealthCheckService.isDiskHealthy()) { - context.writeAndFlush(new HttpShortcutResponse("OK", HttpResponseStatus.OK)); + writeAndFlush(context, new HttpShortcutResponse("OK", HttpResponseStatus.OK)); } else { - context.writeAndFlush( + writeAndFlushBadRequests( + context, new HttpShortcutResponse( "Venice storage node hardware is not healthy!", HttpResponseStatus.INTERNAL_SERVER_ERROR)); @@ -353,35 +412,48 @@ public void channelRead(ChannelHandlerContext context, Object message) throws Ex } } else if (message instanceof DictionaryFetchRequest) { BinaryResponse response = handleDictionaryFetchRequest((DictionaryFetchRequest) message); - context.writeAndFlush(response); + writeAndFlush(context, response); } else if (message instanceof AdminRequest) { AdminResponse response = handleServerAdminRequest((AdminRequest) message); - context.writeAndFlush(response); + writeAndFlush(context, response); } else if (message instanceof MetadataFetchRequest) { try { MetadataResponse response = handleMetadataFetchRequest((MetadataFetchRequest) message); - context.writeAndFlush(response); + writeAndFlush(context, response); } catch (UnsupportedOperationException e) { LOGGER.warn( "Metadata requested by a storage node read quota not enabled store: {}", ((MetadataFetchRequest) message).getStoreName()); - context.writeAndFlush(new HttpShortcutResponse(e.getMessage(), HttpResponseStatus.FORBIDDEN)); + writeAndFlushBadRequests(context, new HttpShortcutResponse(e.getMessage(), HttpResponseStatus.FORBIDDEN)); } } else if (message instanceof CurrentVersionRequest) { ServerCurrentVersionResponse response = handleCurrentVersionRequest((CurrentVersionRequest) message); - context.writeAndFlush(response); + writeAndFlush(context, response); } else if (message instanceof TopicPartitionIngestionContextRequest) { TopicPartitionIngestionContextResponse response = handleTopicPartitionIngestionContextRequest((TopicPartitionIngestionContextRequest) message); - context.writeAndFlush(response); + writeAndFlush(context, response); } else { - context.writeAndFlush( + writeAndFlushBadRequests( + context, new HttpShortcutResponse( "Unrecognized object in StorageExecutionHandler", HttpResponseStatus.INTERNAL_SERVER_ERROR)); } } + private void writeAndFlush(ChannelHandlerContext context, Object message) { + long startTime = System.nanoTime(); + context.writeAndFlush(message); + nettyStats.recordWriteAndFlushTimeOkRequests(startTime); + } + + private void writeAndFlushBadRequests(ChannelHandlerContext context, Object message) { + long startTime = System.nanoTime(); + context.writeAndFlush(message); + nettyStats.recordWriteAndFlushTimeBadRequests(startTime); + } + private HttpResponseStatus getHttpResponseStatus(VeniceNoStoreException e) { String topic = e.getStoreName(); String storeName = Version.parseStoreFromKafkaTopicName(topic); diff --git a/services/venice-server/src/main/java/com/linkedin/venice/listener/VeniceServerNettyStats.java b/services/venice-server/src/main/java/com/linkedin/venice/listener/VeniceServerNettyStats.java new file mode 100644 index 0000000000..35f026bfc7 --- /dev/null +++ b/services/venice-server/src/main/java/com/linkedin/venice/listener/VeniceServerNettyStats.java @@ -0,0 +1,173 @@ +package com.linkedin.venice.listener; + +import com.linkedin.venice.stats.AbstractVeniceStats; +import com.linkedin.venice.stats.TehutiUtils; +import io.netty.util.AttributeKey; +import io.tehuti.metrics.MetricsRepository; +import io.tehuti.metrics.Sensor; +import io.tehuti.metrics.stats.AsyncGauge; +import io.tehuti.metrics.stats.Avg; +import io.tehuti.metrics.stats.Max; +import io.tehuti.metrics.stats.Min; +import io.tehuti.metrics.stats.OccurrenceRate; +import java.util.concurrent.atomic.AtomicInteger; + + +public class VeniceServerNettyStats extends AbstractVeniceStats { + public static final AttributeKey FIRST_HANDLER_TIMESTAMP_KEY = AttributeKey.valueOf("FirstHandlerTimestamp"); + + private final AtomicInteger activeConnections = new AtomicInteger(); + + private final AtomicInteger activeReadHandlerThreads = new AtomicInteger(); + private final Sensor writeAndFlushTimeOkRequests; + private final Sensor writeAndFlushTimeBadRequests; + private final Sensor writeAndFlushTimeCombined; + private final Sensor writeAndFlushCompletionTimeForDataRequest; + private final Sensor timeSpentInReadHandler; + private final Sensor timeSpentTillHandoffToReadHandler; + // time spent in quota enforcement logic + private final Sensor timeSpentInQuotaEnforcement; + private final AtomicInteger queuedTasksForReadHandler = new AtomicInteger(); + + public VeniceServerNettyStats(MetricsRepository metricsRepository, String name) { + super(metricsRepository, name); + registerSensorIfAbsent(new AsyncGauge((ignored, ignored2) -> activeConnections.get(), "active_connections")); + + registerSensorIfAbsent( + new AsyncGauge((ignored, ignored2) -> activeReadHandlerThreads.get(), "active_read_handler_threads")); + + registerSensorIfAbsent( + new AsyncGauge((ignored, ignored2) -> queuedTasksForReadHandler.get(), "queued_tasks_for_read_handler")); + + String writeAndFlushTimeCombinedSensorName = "WriteAndFlushTimeCombined"; + writeAndFlushTimeCombined = registerSensorIfAbsent( + writeAndFlushTimeCombinedSensorName, + new OccurrenceRate(), + new Max(), + new Min(), + new Avg(), + TehutiUtils.getPercentileStat(getName() + AbstractVeniceStats.DELIMITER + writeAndFlushTimeCombinedSensorName)); + + String writeAndFlushTimeOkRequestsSensorName = "WriteAndFlushTimeOkRequests"; + writeAndFlushTimeOkRequests = registerSensorIfAbsent( + writeAndFlushTimeOkRequestsSensorName, + new OccurrenceRate(), + new Max(), + new Min(), + new Avg(), + TehutiUtils + .getPercentileStat(getName() + AbstractVeniceStats.DELIMITER + writeAndFlushTimeOkRequestsSensorName)); + + String writeAndFlushTimeBadRequestsSensorName = "WriteAndFlushTimeBadRequests"; + writeAndFlushTimeBadRequests = registerSensorIfAbsent( + writeAndFlushTimeBadRequestsSensorName, + new OccurrenceRate(), + new Max(), + new Min(), + new Avg(), + TehutiUtils + .getPercentileStat(getName() + AbstractVeniceStats.DELIMITER + writeAndFlushTimeBadRequestsSensorName)); + + String responseWriteAndFlushStartTimeNanosSensorName = "WriteAndFlushCompletionTimeForDataRequest"; + writeAndFlushCompletionTimeForDataRequest = registerSensorIfAbsent( + responseWriteAndFlushStartTimeNanosSensorName, + new OccurrenceRate(), + new Max(), + new Min(), + new Avg(), + TehutiUtils.getPercentileStat( + getName() + AbstractVeniceStats.DELIMITER + responseWriteAndFlushStartTimeNanosSensorName)); + + String timeSpentInReadHandlerSensorName = "TimeSpentInReadHandler"; + timeSpentInReadHandler = registerSensorIfAbsent( + timeSpentInReadHandlerSensorName, + new OccurrenceRate(), + new Max(), + new Min(), + new Avg(), + TehutiUtils.getPercentileStat(getName() + AbstractVeniceStats.DELIMITER + timeSpentInReadHandlerSensorName)); + + String timeSpentTillHandoffToReadHandlerSensorName = "TimeSpentTillHandoffToReadHandler"; + timeSpentTillHandoffToReadHandler = registerSensorIfAbsent( + timeSpentTillHandoffToReadHandlerSensorName, + new OccurrenceRate(), + new Max(), + new Min(), + new Avg(), + TehutiUtils.getPercentileStat( + getName() + AbstractVeniceStats.DELIMITER + timeSpentTillHandoffToReadHandlerSensorName)); + + String timeSpentInQuotaEnforcementSensorName = "TimeSpentInQuotaEnforcement"; + timeSpentInQuotaEnforcement = registerSensorIfAbsent( + timeSpentInQuotaEnforcementSensorName, + new OccurrenceRate(), + new Max(), + new Min(), + new Avg(), + TehutiUtils + .getPercentileStat(getName() + AbstractVeniceStats.DELIMITER + timeSpentInQuotaEnforcementSensorName)); + } + + public static double getElapsedTimeInMicros(long startTimeNanos) { + return (System.nanoTime() - startTimeNanos) / 1000.0; + } + + public static long getElapsedTimeInNanos(long startTimeNanos) { + return System.nanoTime() - startTimeNanos; + } + + public int incrementActiveReadHandlerThreads() { + return activeReadHandlerThreads.incrementAndGet(); + } + + public int decrementActiveReadHandlerThreads() { + return activeReadHandlerThreads.decrementAndGet(); + } + + public int incrementActiveConnections() { + return activeConnections.incrementAndGet(); + } + + // get activeConnections + public int getActiveConnections() { + return activeConnections.get(); + } + + public int decrementActiveConnections() { + return activeConnections.decrementAndGet(); + } + + public void recordWriteAndFlushTimeOkRequests(long startTimeNanos) { + writeAndFlushTimeOkRequests.record(getElapsedTimeInMicros(startTimeNanos)); + writeAndFlushTimeCombined.record(getElapsedTimeInMicros(startTimeNanos)); + } + + public void recordWriteAndFlushTimeBadRequests(long startTimeNanos) { + writeAndFlushTimeBadRequests.record(getElapsedTimeInMicros(startTimeNanos)); + writeAndFlushTimeCombined.record(getElapsedTimeInMicros(startTimeNanos)); + } + + public void recordWriteAndFlushCompletionTimeForDataRequest(long startTimeNanos) { + writeAndFlushCompletionTimeForDataRequest.record(getElapsedTimeInMicros(startTimeNanos)); + } + + public void incrementQueuedTasksForReadHandler() { + queuedTasksForReadHandler.incrementAndGet(); + } + + public void decrementQueuedTasksForReadHandler() { + queuedTasksForReadHandler.decrementAndGet(); + } + + public void recordTimeSpentInReadHandler(long startTimeNanos) { + timeSpentInReadHandler.record(getElapsedTimeInMicros(startTimeNanos)); + } + + public void recordTimeSpentTillHandoffToReadHandler(long startTimeNanos) { + timeSpentTillHandoffToReadHandler.record(getElapsedTimeInMicros(startTimeNanos)); + } + + public void recordTimeSpentInQuotaEnforcement(long startTimeNanos) { + timeSpentInQuotaEnforcement.record(getElapsedTimeInMicros(startTimeNanos)); + } +} diff --git a/services/venice-server/src/main/java/com/linkedin/venice/stats/ServerConnectionStats.java b/services/venice-server/src/main/java/com/linkedin/venice/stats/ServerConnectionStats.java index 70dc89a173..e30dcc8e8f 100644 --- a/services/venice-server/src/main/java/com/linkedin/venice/stats/ServerConnectionStats.java +++ b/services/venice-server/src/main/java/com/linkedin/venice/stats/ServerConnectionStats.java @@ -29,21 +29,23 @@ public ServerConnectionStats(MetricsRepository metricsRepository, String name) { clientConnectionRequestSensor = registerSensorIfAbsent(CLIENT_CONNECTION_REQUEST, new OccurrenceRate()); } - public void incrementRouterConnectionCount() { - routerConnectionCount.incrementAndGet(); + public long incrementRouterConnectionCount() { + long count = routerConnectionCount.incrementAndGet(); routerConnectionRequestSensor.record(1); + return count; } - public void decrementRouterConnectionCount() { - routerConnectionCount.decrementAndGet(); + public long decrementRouterConnectionCount() { + return routerConnectionCount.decrementAndGet(); } - public void incrementClientConnectionCount() { - clientConnectionCount.incrementAndGet(); + public long incrementClientConnectionCount() { + long count = clientConnectionCount.incrementAndGet(); clientConnectionRequestSensor.record(1); + return count; } - public void decrementClientConnectionCount() { - clientConnectionCount.decrementAndGet(); + public long decrementClientConnectionCount() { + return clientConnectionCount.decrementAndGet(); } } diff --git a/services/venice-server/src/test/java/com/linkedin/venice/listener/ReadQuotaEnforcementHandlerTest.java b/services/venice-server/src/test/java/com/linkedin/venice/listener/ReadQuotaEnforcementHandlerTest.java index 9a6f03ae6d..c16872e4fa 100644 --- a/services/venice-server/src/test/java/com/linkedin/venice/listener/ReadQuotaEnforcementHandlerTest.java +++ b/services/venice-server/src/test/java/com/linkedin/venice/listener/ReadQuotaEnforcementHandlerTest.java @@ -90,6 +90,7 @@ public void setUp() { thisNodeId, stats, metricsRepository, + null, clock); grpcQuotaEnforcer = new GrpcReadQuotaEnforcementHandler(quotaEnforcer); VeniceServerGrpcHandler mockNextHandler = mock(VeniceServerGrpcHandler.class);