Skip to content

Commit

Permalink
[common] Retry controller client request on exceptions (#1165)
Browse files Browse the repository at this point in the history
Today, "ControllerClient#retryableRequest" retries when the controller returns an error response. It doesn't retry when an exception is thrown. We need to handle exceptions as an error condition and retry till the retryAttempt count is reached.

Also it misses to retry VeniceException from getLeaderControllerUrl which could lead to data recovery pushes to fail on a single unsuccessful controller discovery call.
---------

Co-authored-by: Sourav Maji <tester@linkedin.com>
  • Loading branch information
majisourav99 and Sourav Maji authored Sep 11, 2024
1 parent d4bf2ba commit 74e5e82
Showing 1 changed file with 30 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -713,24 +713,39 @@ public static <C extends ControllerClient, R extends ControllerResponse> R retry
throw new VeniceException(
"Querying with retries requires at least one attempt, called with " + totalAttempts + " attempts");
}
int currentAttempt = 1;
while (true) {
R response = request.apply(client);
Exception exception = null;
R response = null;

for (int currentAttempt = 1; currentAttempt <= totalAttempts; currentAttempt++) {
try {
response = request.apply(client);
} catch (Exception e) {
exception = e;
}
// Do not retry if value schema is not found. TODO: Ideally response should not be an error but should return
// INVALID schema ID in the response.
if (!response.isError() || currentAttempt == totalAttempts || valueSchemaNotFoundSchemaResponse(response)
|| abortRetryCondition.apply(response)) {
if (exception == null && (!response.isError() || valueSchemaNotFoundSchemaResponse(response)
|| abortRetryCondition.apply(response))) {
return response;
} else {
LOGGER.warn(
"Error on attempt {}/{} of querying the Controller: {}",
currentAttempt,
totalAttempts,
response.getError());
currentAttempt++;
if (exception != null) {
LOGGER
.warn("Exception on attempt {}/{} of querying the Controller", currentAttempt, totalAttempts, exception);
} else {
LOGGER.warn(
"Error on attempt {}/{} of querying the Controller: {}",
currentAttempt,
totalAttempts,
response.getError());
}
Utils.sleep(2000);
}
}
if (exception != null) {
throw new VeniceException("Could not execute query even after " + totalAttempts + " attempts.", exception);
} else {
return response;
}
}

private static <R> boolean valueSchemaNotFoundSchemaResponse(R response) {
Expand Down Expand Up @@ -1434,6 +1449,8 @@ private <T extends ControllerResponse> T request(
}
// leader controller has changed. Let's wait for a new leader to realize it.
lastException = e;
} catch (Exception e) {
lastException = e;
}

if (attempt < maxAttempts) {
Expand All @@ -1459,14 +1476,14 @@ private <T extends ControllerResponse> T request(
return makeErrorResponse(message, lastException, responseType, logErrorMessage);
}

private <T extends ControllerResponse> T makeErrorResponse(
private static <T extends ControllerResponse> T makeErrorResponse(
String message,
Exception exception,
Class<T> responseType) {
return makeErrorResponse(message, exception, responseType, true);
}

private <T extends ControllerResponse> T makeErrorResponse(
private static <T extends ControllerResponse> T makeErrorResponse(
String message,
Exception exception,
Class<T> responseType,
Expand Down

0 comments on commit 74e5e82

Please sign in to comment.