Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use retry interceptor for streaming calls #400

Merged
merged 1 commit into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 13 additions & 14 deletions momento-sdk/src/main/java/momento/sdk/RetryClientInterceptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,27 @@
import javax.annotation.Nullable;
import momento.sdk.retry.RetryEligibilityStrategy;
import momento.sdk.retry.RetryStrategy;
import momento.sdk.retry.RetryingUnaryClientCall;
import momento.sdk.retry.RetryingClientCall;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Interceptor for retrying client calls with gRPC servers. This interceptor is responsible for
* handling retry logic when making unary (single request, single response) gRPC calls.
* handling retry logic when making unary (single request, single response) and streaming gRPC
* calls.
*
* <p>A {@link ClientCall} is essentially an instance of a gRPC invoker. Every gRPC interceptor
* expects us to return such client call(s) that it will execute in order. Each call has a "start"
* method, which is the entry point for the call.
*
* <p>This retry client interceptor returns an instance of a {@link RetryingUnaryClientCall}, which
* is a client call designed to handle retrying unary (single request, single response) operations.
* The interceptor uses a provided {@link RetryStrategy} to determine when and how to retry failed
* calls.
* <p>This retry client interceptor returns an instance of a {@link RetryingClientCall}, which is a
* client call designed to handle retrying unary (single request, single response) and streaming
* call operations. The interceptor uses a provided {@link RetryStrategy} to determine when and how
* to retry failed calls.
*
* <p>When a gRPC call is intercepted, the interceptor checks whether the method is unary (client
* sends one message), and if so, it wraps the original {@link ClientCall} with the {@link
* RetryingUnaryClientCall}. This custom call is responsible for handling the retry logic.
* sends one message) or streaming, and if so, it wraps the original {@link ClientCall} with the
* {@link RetryingClientCall}. This custom call is responsible for handling the retry logic.
*
* <p>When the gRPC call is closed, the {@code onClose} method is called, which is the point where
* we can safely check the status of the initial request that was made and determine if we want to
Expand All @@ -48,8 +49,6 @@
* we should not retry anymore), the interceptor propagates the final result to the original
* listener, effectively completing the call with the last status received.
*
* <p>Note that the interceptor only supports unary operations for retrying.
*
* @see RetryStrategy
* @see RetryEligibilityStrategy
* @param <ReqT> The type of the request message.
Expand All @@ -76,12 +75,12 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
final MethodDescriptor<ReqT, RespT> method,
final CallOptions callOptions,
final Channel channel) {
// currently the SDK only supports unary operations which we want to retry on

if (!method.getType().clientSendsOneMessage()) {
return channel.newCall(method, callOptions);
}

return new RetryingUnaryClientCall<ReqT, RespT>(channel.newCall(method, callOptions)) {
return new RetryingClientCall<ReqT, RespT>(channel.newCall(method, callOptions)) {
private int attemptNumber = 0;
@Nullable private Future<?> future = null;

Expand All @@ -90,7 +89,6 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
super.start(
new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
responseListener) {

/**
* At this point, the ClientCall has been closed. Any additional calls to the
* ClientCall will not be processed by the server. The server does not send any
Expand Down Expand Up @@ -126,12 +124,13 @@ public void onClose(Status status, Metadata trailers) {
// a delay not present indicates we have exhausted retries or exceeded
// delay or any variable the strategy author wishes to not retry anymore
if (!retryDelay.isPresent()) {
cancelAttempt();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch!

super.onClose(status, trailers);
return;
}

logger.debug(
"Retrying request {} on error code {} with delay {} millisecodns",
"Retrying request {} on error code {} with delay {} milliseconds",
method.getFullMethodName(),
status.getCode().toString(),
retryDelay.get().toMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class DefaultRetryEligibilityStrategy implements RetryEligibilityStrategy
add("cache_client.Scs/ListLength");
// not idempotent: "/cache_client.Scs/ListConcatenateFront",
// not idempotent: "/cache_client.Scs/ListConcatenateBack"
add("cache_client.Scs/GetBatch");
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,20 @@

/**
* A custom implementation of {@link ClientCall} that handles retrying unary (single request, single
* response) operations. This class is used by the RetryClientInterceptor to manage retry logic for
* failed gRPC calls.
* response) and streaming call operations. This class is used by the RetryClientInterceptor to
* manage retry logic for failed gRPC calls.
*
* <p>The {@code RetryingUnaryClientCall} wraps an original {@link ClientCall} and intercepts the
* methods related to starting, sending messages, and handling the response. If the original call
* encounters an error, the interceptor schedules a retry attempt based on the configured retry
* strategy and eligibility rules.
* <p>The {@code RetryingClientCall} wraps an original {@link ClientCall} and intercepts the methods
* related to starting, sending messages, and handling the response. If the original call encounters
* an error, the interceptor schedules a retry attempt based on the configured retry strategy and
* eligibility rules.
*
* <p>Each instance of {@code RetryingUnaryClientCall} maintains its own state, including the
* request message, response listener, headers, and other properties specific to the call. When a
* retry is needed, a new instance of this class is created with the original request details, and
* the retry attempt is initiated.
*
* <p>Note that this implementation assumes that the gRPC call is unary, meaning the client sends
* one message and receives one response. For streaming calls or other call types, a different
* approach or implementation would be required.
* <p>Each instance of {@code RetryingClientCall} maintains its own state, including the request
* message, response listener, headers, and other properties specific to the call. When a retry is
* needed, a new instance of this class is created with the original request details, and the retry
* attempt is initiated.
*/
public class RetryingUnaryClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
public class RetryingClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {

private ClientCall<ReqT, RespT> delegate;
private Listener<RespT> responseListener;
Expand All @@ -34,12 +30,12 @@ public class RetryingUnaryClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT
private boolean compressionEnabled;

/**
* Constructs a new instance of {@code RetryingUnaryClientCall} with the provided delegate.
* Constructs a new instance of {@code RetryingClientCall} with the provided delegate.
*
* @param delegate The original {@link ClientCall} to be wrapped and managed by this retrying
* call.
*/
public RetryingUnaryClientCall(final ClientCall<ReqT, RespT> delegate) {
public RetryingClientCall(final ClientCall<ReqT, RespT> delegate) {
this.delegate = delegate;
}

Expand Down
Loading