feat: use retry interceptor for streaming calls #400

Merged
merged 1 commit into from
Nov 13, 2024

Conversation

rishtigupta

@rishtigupta rishtigupta commented Nov 8, 2024

PR Description:

Overview:

  • This PR enables the RetryInterceptor to retry server-side streaming calls (e.g., getBatch) in addition to unary calls, without changing the interceptor's core retry logic. The existing logic in RetryInterceptor bypasses retry handling only when the client may send more than one message:
if (!method.getType().clientSendsOneMessage()) {
  return channel.newCall(method, callOptions);
}

This relies on gRPC's MethodDescriptor.MethodType.clientSendsOneMessage() check, which is defined as:

public final boolean clientSendsOneMessage() {
  return this == UNARY || this == SERVER_STREAMING;
}

Since both unary and server-side streaming calls return true for clientSendsOneMessage(), the existing interceptor logic is inherently compatible with server-side streaming calls. To enable retries for getBatch specifically, this PR simply adds it to the list of API calls that should trigger retries.
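
To make the effect of the guard concrete, here is a minimal sketch that evaluates it for every call type. MethodKind is a local stand-in that mirrors the values of io.grpc.MethodDescriptor.MethodType so the example runs without the gRPC dependency, and passesRetryGuard is a hypothetical helper name, not part of the SDK.

```java
final class RetryGuardSketch {
  // Local stand-in mirroring io.grpc.MethodDescriptor.MethodType (assumption:
  // used here only so the sketch compiles without gRPC on the classpath).
  enum MethodKind {
    UNARY,
    CLIENT_STREAMING,
    SERVER_STREAMING,
    BIDI_STREAMING,
    UNKNOWN;

    // Same predicate as the gRPC method quoted above.
    boolean clientSendsOneMessage() {
      return this == UNARY || this == SERVER_STREAMING;
    }
  }

  // Hypothetical helper: true when the interceptor's guard would NOT bypass
  // retry handling for this kind of call.
  static boolean passesRetryGuard(MethodKind kind) {
    return kind.clientSendsOneMessage();
  }

  public static void main(String[] args) {
    for (MethodKind kind : MethodKind.values()) {
      System.out.println(kind + " passes retry guard: " + passesRetryGuard(kind));
    }
  }
}
```

Only UNARY and SERVER_STREAMING pass the guard, which is why a server-streaming call such as getBatch needs no new branch in the interceptor. Whether a call that passes is actually retried still depends on the list of retriable API calls.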

Enhancements:

  • Resource Cleanup: Added a cancelAttempt call that releases the current attempt's resources once the retry strategy returns no further delay, i.e., when the client stops retrying. This prevents the program from hanging on resources that were never cleaned up.
  • API Whitelisting: Added getBatch to the list of retriable API calls, enabling retry behavior for streaming calls without modifications to the core interceptor logic.
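
The two enhancements can be sketched together as a simplified, synchronous retry loop. This is an illustration under stated assumptions, not the interceptor's actual code: the method names in RETRYABLE_METHODS, the nextDelay strategy, and the Attempt and Result types are all hypothetical, and the real interceptor schedules retries asynchronously instead of looping.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.Set;

final class RetryCleanupSketch {
  // Hypothetical eligibility list; the SDK's real list uses its own method names.
  static final Set<String> RETRYABLE_METHODS = Set.of("Get", "Set", "GetBatch");

  // Hypothetical strategy: an empty Optional means "stop retrying", either
  // because the method is not eligible or because attempts are exhausted.
  static Optional<Duration> nextDelay(String method, int attempt, int maxAttempts) {
    if (!RETRYABLE_METHODS.contains(method) || attempt >= maxAttempts) {
      return Optional.empty();
    }
    return Optional.of(Duration.ofMillis(10L * attempt));
  }

  // Stand-in for an in-flight call attempt that holds resources until cancelled.
  static final class Attempt {
    boolean cancelled;

    void cancel() {
      cancelled = true;
    }
  }

  static final class Result {
    final int attempts;
    final boolean succeeded;
    final boolean cancelled;

    Result(int attempts, boolean succeeded, boolean cancelled) {
      this.attempts = attempts;
      this.succeeded = succeeded;
      this.cancelled = cancelled;
    }
  }

  // Simulates a call that fails failuresBeforeSuccess times before succeeding.
  static Result call(String method, int failuresBeforeSuccess, int maxAttempts) {
    int attempt = 0;
    while (true) {
      attempt++;
      Attempt current = new Attempt();
      if (attempt > failuresBeforeSuccess) {
        return new Result(attempt, true, current.cancelled);
      }
      Optional<Duration> delay = nextDelay(method, attempt, maxAttempts);
      if (!delay.isPresent()) {
        // No further retries: release the attempt's resources before giving up.
        current.cancel();
        return new Result(attempt, false, current.cancelled);
      }
      // A real interceptor would wait for the delay here; the sketch skips the wait.
    }
  }

  public static void main(String[] args) {
    Result recovered = call("GetBatch", 2, 3);
    System.out.println("GetBatch succeeded=" + recovered.succeeded + " attempts=" + recovered.attempts);
    Result exhausted = call("GetBatch", 5, 3);
    System.out.println("GetBatch succeeded=" + exhausted.succeeded + " cancelled=" + exhausted.cancelled);
  }
}
```

The point of the sketch is the branch where no delay is present: giving up and cleaning up happen in the same place, so an abandoned attempt never outlives the decision to stop retrying.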

Testing

To verify the retry functionality for streaming calls, I:

  • Configured a development cell to simulate real server-side behavior, including 5XX errors.
  • Developed a test program (below) that attempts to retrieve cached items using getBatch, retrying on failure. This allowed for validating that retries work as expected and that resources are released properly upon completion.
package momento.sdk;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import momento.sdk.auth.CredentialProvider;
import momento.sdk.config.Configuration;
import momento.sdk.config.Configurations;
import momento.sdk.responses.cache.GetBatchResponse;
import momento.sdk.responses.cache.SetBatchResponse;

public class Program {
  public static void main(String[] args) {
    String cacheName = "java-test-cache";
    CredentialProvider credentialProvider = CredentialProvider.fromEnvVar("MOMENTO_API_KEY");

    Configuration configuration = Configurations.Laptop.v1();
    CacheClient cacheClient =
            new CacheClientBuilder(credentialProvider, configuration, Duration.ofMinutes(1)).build();

    final Map<String, String> items = new HashMap<>();
    for (int i = 0; i < 10; i++) {
      items.put("key" + i, "val" + i);
    }

    // Populate the cache with setBatch so that getBatch has items to retrieve
    final SetBatchResponse setBatchResponse =
        cacheClient.setBatch(cacheName, items, Duration.ofMinutes(60)).join();
    if (setBatchResponse instanceof SetBatchResponse.Success) {
      System.out.println("Set batch successful");
    } else {
      System.out.println("Failed to set batch: " + setBatchResponse);
      SetBatchResponse.Error error = (SetBatchResponse.Error) setBatchResponse;
      System.out.println("Error code: " + error.getErrorCode());
    }

    while (true) {
      try {
        final GetBatchResponse getBatchResponse =
                cacheClient.getBatch(cacheName, items.keySet()).join();
        if (getBatchResponse instanceof GetBatchResponse.Success) {
          System.out.println("Get batch successful");
          Map<String, String> values =
                  ((GetBatchResponse.Success) getBatchResponse).valueMapStringString();
          for (Map.Entry<String, String> entry : values.entrySet()) {
            System.out.println("Key: " + entry.getKey() + " Value: " + entry.getValue());
          }
          // Exit the loop if all items are found
          if (values.size() == items.size()) {
            break;
          }
        } else {
          System.out.println("Failed to get batch: " + getBatchResponse);
          GetBatchResponse.Error error = (GetBatchResponse.Error) getBatchResponse;
          System.out.println("Error code: " + error.getErrorCode());
        }
      } catch (Exception e) {
        System.out.println("Exception during getBatch: " + e.getMessage());
      }

      try {
        // Add a delay between each retry
        Thread.sleep(5000);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      }
    }

    System.out.println("Done");
    // cacheClient.close()
  }
}

This setup confirmed that RetryInterceptor retries both unary and server-side streaming calls such as getBatch, and that resources are released once retrying stops.

Issue

#398

@rishtigupta rishtigupta marked this pull request as ready for review November 8, 2024 21:59

@anitarua anitarua left a comment

lgtm!

@@ -126,12 +124,13 @@ public void onClose(Status status, Metadata trailers) {
  // a delay not present indicates we have exhausted retries or exceeded
  // delay or any variable the strategy author wishes to not retry anymore
  if (!retryDelay.isPresent()) {
    cancelAttempt();

Good catch!

@rishtigupta rishtigupta merged commit 29d38e1 into main Nov 13, 2024
5 checks passed
@rishtigupta rishtigupta deleted the feat/streaming-retries branch November 13, 2024 19:48