Skip to content

Commit

Permalink
Add time spent in read handler
Browse files Browse the repository at this point in the history
  • Loading branch information
sushantmane committed Aug 12, 2024
1 parent 16b70bd commit 49a9c8f
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public static ThreadPoolExecutor createThreadPool(
ThreadPoolExecutor executor = new ThreadPoolExecutor(
threadCount,
threadCount,
0,
600,
TimeUnit.MILLISECONDS,
getExecutionQueue(capacity, blockingQueueType),
new DaemonThreadFactory(threadNamePrefix));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,67 +270,67 @@ public void channelRead(ChannelHandlerContext context, Object message) throws Ex

if (message instanceof RouterRequest) {
RouterRequest request = (RouterRequest) message;
resourceReadUsageTracker.ifPresent(tracker -> tracker.recordReadUsage(request.getResourceName()));
// Check before putting the request to the intermediate queue
if (request.shouldRequestBeTerminatedEarly()) {
// Try to make the response short
VeniceRequestEarlyTerminationException earlyTerminationException =
new VeniceRequestEarlyTerminationException(request.getStoreName());
writeAndFlushBadRequests(
context,
new HttpShortcutResponse(
earlyTerminationException.getMessage(),
earlyTerminationException.getHttpResponseStatus()));
return;
}
/**
* For now, we are evaluating whether parallel lookup is good overall or not.
* Eventually, we either pick up the new parallel implementation or keep the original one, so it is fine
* to have some duplicate code for the time-being.
*/
if (parallelBatchGetEnabled && request.getRequestType().equals(RequestType.MULTI_GET)) {
handleMultiGetRequestInParallel((MultiGetRouterRequestWrapper) request, parallelBatchGetChunkSize)
.whenComplete((v, e) -> {
if (e == null) {
writeAndFlush(context, v);
return;
}
if (e instanceof VeniceRequestEarlyTerminationException) {
VeniceRequestEarlyTerminationException earlyTerminationException =
(VeniceRequestEarlyTerminationException) e;
writeAndFlushBadRequests(
context,
new HttpShortcutResponse(
earlyTerminationException.getMessage(),
earlyTerminationException.getHttpResponseStatus()));
} else if (e instanceof VeniceNoStoreException) {
HttpResponseStatus status = getHttpResponseStatus((VeniceNoStoreException) e);
writeAndFlushBadRequests(
context,
new HttpShortcutResponse(
"No storage exists for: " + ((VeniceNoStoreException) e).getStoreName(),
status));
} else {
LOGGER.error("Exception thrown in parallel batch get for {}", request.getResourceName(), e);
HttpShortcutResponse shortcutResponse =
new HttpShortcutResponse(e.getMessage(), HttpResponseStatus.INTERNAL_SERVER_ERROR);
shortcutResponse.setMisroutedStoreVersion(checkMisroutedStoreVersionRequest(request));
writeAndFlushBadRequests(context, shortcutResponse);
}
});
return;
}
// resourceReadUsageTracker.ifPresent(tracker -> tracker.recordReadUsage(request.getResourceName()));
// // Check before putting the request to the intermediate queue
// if (request.shouldRequestBeTerminatedEarly()) {
// // Try to make the response short
// VeniceRequestEarlyTerminationException earlyTerminationException =
// new VeniceRequestEarlyTerminationException(request.getStoreName());
// writeAndFlushBadRequests(
// context,
// new HttpShortcutResponse(
// earlyTerminationException.getMessage(),
// earlyTerminationException.getHttpResponseStatus()));
// return;
// }
// /**
// * For now, we are evaluating whether parallel lookup is good overall or not.
// * Eventually, we either pick up the new parallel implementation or keep the original one, so it is fine
// * to have some duplicate code for the time-being.
// */
// if (parallelBatchGetEnabled && request.getRequestType().equals(RequestType.MULTI_GET)) {
// handleMultiGetRequestInParallel((MultiGetRouterRequestWrapper) request, parallelBatchGetChunkSize)
// .whenComplete((v, e) -> {
// if (e == null) {
// writeAndFlush(context, v);
// return;
// }
// if (e instanceof VeniceRequestEarlyTerminationException) {
// VeniceRequestEarlyTerminationException earlyTerminationException =
// (VeniceRequestEarlyTerminationException) e;
// writeAndFlushBadRequests(
// context,
// new HttpShortcutResponse(
// earlyTerminationException.getMessage(),
// earlyTerminationException.getHttpResponseStatus()));
// } else if (e instanceof VeniceNoStoreException) {
// HttpResponseStatus status = getHttpResponseStatus((VeniceNoStoreException) e);
// writeAndFlushBadRequests(
// context,
// new HttpShortcutResponse(
// "No storage exists for: " + ((VeniceNoStoreException) e).getStoreName(),
// status));
// } else {
// LOGGER.error("Exception thrown in parallel batch get for {}", request.getResourceName(), e);
// HttpShortcutResponse shortcutResponse =
// new HttpShortcutResponse(e.getMessage(), HttpResponseStatus.INTERNAL_SERVER_ERROR);
// shortcutResponse.setMisroutedStoreVersion(checkMisroutedStoreVersionRequest(request));
// writeAndFlushBadRequests(context, shortcutResponse);
// }
// });
// return;
// }

final ThreadPoolExecutor executor = getExecutor(request.getRequestType());
executor.execute(() -> {
long startTime = System.nanoTime();
try {
nettyStats.incrementActiveReadHandlerThreads();

double submissionWaitTime = LatencyUtils.convertNSToMS(startTime - preSubmissionTimeNs);
try {
if (request.shouldRequestBeTerminatedEarly()) {
throw new VeniceRequestEarlyTerminationException(request.getStoreName());
}
double submissionWaitTime = LatencyUtils.getElapsedTimeFromNSToMS(preSubmissionTimeNs);
int queueLen = executor.getQueue().size();
ReadResponse response;
switch (request.getRequestType()) {
Expand Down Expand Up @@ -389,6 +389,7 @@ public void channelRead(ChannelHandlerContext context, Object message) throws Ex
} finally {
nettyStats.decrementActiveReadHandlerThreads();
nettyStats.decrementQueuedTasksForReadHandler();
nettyStats.recordTimeSpentInReadHandler(startTime);
}
});
nettyStats.incrementQueuedTasksForReadHandler();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public class VeniceServerNettyStats extends AbstractVeniceStats {
private final Sensor writeAndFlushTimeOkRequests;
private final Sensor writeAndFlushTimeBadRequests;
private final Sensor writeAndFlushCompletionTimeForDataRequest;
private final Sensor timeSpentInReadHandler;
private final AtomicInteger queuedTasksForReadHandler = new AtomicInteger();

public VeniceServerNettyStats(MetricsRepository metricsRepository, String name) {
Expand Down Expand Up @@ -60,6 +61,15 @@ public VeniceServerNettyStats(MetricsRepository metricsRepository, String name)
new Avg(),
TehutiUtils.getPercentileStat(
getName() + AbstractVeniceStats.DELIMITER + responseWriteAndFlushStartTimeNanosSensorName));

String timeSpentInReadHandlerSensorName = "TimeSpentInReadHandler";
timeSpentInReadHandler = registerSensorIfAbsent(
timeSpentInReadHandlerSensorName,
new OccurrenceRate(),
new Max(),
new Min(),
new Avg(),
TehutiUtils.getPercentileStat(getName() + AbstractVeniceStats.DELIMITER + timeSpentInReadHandlerSensorName));
}

public static long getElapsedTimeInMicros(long startTimeNanos) {
Expand Down Expand Up @@ -105,4 +115,8 @@ public void incrementQueuedTasksForReadHandler() {
public void decrementQueuedTasksForReadHandler() {
queuedTasksForReadHandler.decrementAndGet();
}

public void recordTimeSpentInReadHandler(long startTimeNanos) {
timeSpentInReadHandler.record(getElapsedTimeInMicros(startTimeNanos));
}
}

0 comments on commit 49a9c8f

Please sign in to comment.