Skip to content

Commit

Permalink
feat: Add high-level wrappers for GetResponse (#46)
Browse files Browse the repository at this point in the history
* feat: Add high-level wrappers for GetResponse

1. Update the GetResponse Object to internally store ByteString. This allows our wrapper to use the utility methods on ByteString rather than having to author our own.
2. Added different entry points into set to allow customer options to
   set(String key, String value, ttl)
   set(String, ByteBuffer value, ttl)
   set(byte[] key, byte[] value, ttl)

* update method names

* update java doc

* update exception construction
  • Loading branch information
gautamomento committed Sep 30, 2021
1 parent 674812b commit f52b87a
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 38 deletions.
12 changes: 6 additions & 6 deletions momento-sdk/src/intTest/java/momento/sdk/CacheTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ private static void testHappyPath(Cache cache) {
assertEquals(MomentoCacheResult.Ok, setRsp.getResult());

// Get Key that was just set
ClientGetResponse<ByteBuffer> rsp = cache.get(key);
ClientGetResponse rsp = cache.get(key);
assertEquals(MomentoCacheResult.Hit, rsp.getResult());
assertEquals("bar", StandardCharsets.US_ASCII.decode(rsp.getBody()).toString());
assertEquals("bar", rsp.asStringUtf8());
}

@Test
Expand Down Expand Up @@ -131,10 +131,10 @@ private static void testAsyncHappyPath(Cache client) throws Exception {
assertEquals(MomentoCacheResult.Ok, setRsp.toCompletableFuture().get().getResult());

// Get Key Async
ClientGetResponse<ByteBuffer> rsp = client.getAsync(key).toCompletableFuture().get();
ClientGetResponse rsp = client.getAsync(key).toCompletableFuture().get();

assertEquals(MomentoCacheResult.Hit, rsp.getResult());
assertEquals("bar", StandardCharsets.US_ASCII.decode(rsp.getBody()).toString());
assertEquals("bar", rsp.asStringUtf8());
}

@Test
Expand Down Expand Up @@ -165,7 +165,7 @@ private static void testTtlHappyPath(Cache client) throws Exception {
Thread.sleep(1500);

// Get Key that was just set
ClientGetResponse<ByteBuffer> rsp = client.get(key);
ClientGetResponse rsp = client.get(key);
assertEquals(MomentoCacheResult.Miss, rsp.getResult());
}

Expand All @@ -188,7 +188,7 @@ void testMissHappyPathWithTracing() throws Exception {

private static void testMissHappyPathInternal(Cache client) {
// Get Key that was just set
ClientGetResponse<ByteBuffer> rsp = client.get(UUID.randomUUID().toString());
ClientGetResponse rsp = client.get(UUID.randomUUID().toString());

assertEquals(MomentoCacheResult.Miss, rsp.getResult());
}
Expand Down
4 changes: 2 additions & 2 deletions momento-sdk/src/intTest/java/momento/sdk/MomentoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ void testHappyPath() {
assertEquals(MomentoCacheResult.Ok, setRsp.getResult());

// Get Key that was just set
ClientGetResponse<ByteBuffer> rsp = cache.get(key);
ClientGetResponse rsp = cache.get(key);
assertEquals(MomentoCacheResult.Hit, rsp.getResult());
assertEquals("bar", StandardCharsets.US_ASCII.decode(rsp.getBody()).toString());
assertEquals("bar", rsp.asStringUtf8());
}

// TODO: Update this to be recreated each time and add a separate test case for Already Exists
Expand Down
64 changes: 40 additions & 24 deletions momento-sdk/src/main/java/momento/sdk/Cache.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.ImplicitContextKeyed;
import io.opentelemetry.context.Scope;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -36,7 +35,6 @@
import java.util.concurrent.CompletionStage;
import javax.net.ssl.SSLException;
import momento.sdk.exceptions.CacheServiceExceptionMapper;
import momento.sdk.exceptions.ClientSdkException;
import momento.sdk.messages.ClientGetResponse;
import momento.sdk.messages.ClientSetResponse;

Expand Down Expand Up @@ -135,17 +133,15 @@ public Cache(
* java.util.concurrent.CompletionStage<ClientGetResponse>} returned instead.
*
* @param key the key of item to fetch from cache
* @return {@link ClientGetResponse} with the response object as a {@link java.nio.ByteBuffer}
* @return {@link ClientGetResponse} with the response object
* @throws IOException if an error occurs opening input stream for response body.
*/
public ClientGetResponse<ByteBuffer> get(String key) {
public ClientGetResponse get(String key) {
Optional<Span> span = buildSpan("java-sdk-get-request");
try (Scope ignored = (span.map(ImplicitContextKeyed::makeCurrent).orElse(null))) {
GetResponse rsp = blockingStub.get(buildGetRequest(key));

ByteBuffer body = rsp.getCacheBody().asReadOnlyByteBuffer();
ClientGetResponse<java.nio.ByteBuffer> clientGetResponse =
new ClientGetResponse<>(rsp.getResult(), body);
ClientGetResponse clientGetResponse =
new ClientGetResponse(rsp.getResult(), rsp.getCacheBody());
span.ifPresent(theSpan -> theSpan.setStatus(StatusCode.OK));
return clientGetResponse;
} catch (Exception e) {
Expand All @@ -172,6 +168,18 @@ public ClientGetResponse<ByteBuffer> get(String key) {
* @throws IOException if an error occurs opening ByteBuffer for request body.
*/
public ClientSetResponse set(String key, ByteBuffer value, int ttlSeconds) {
return set(convert(key), convert(value), ttlSeconds);
}

public ClientSetResponse set(String key, String value, int ttlSeconds) {
return set(convert(key), convert(value), ttlSeconds);
}

public ClientSetResponse set(byte[] key, byte[] value, int ttlSeconds) {
return set(convert(key), convert(value), ttlSeconds);
}

private ClientSetResponse set(ByteString key, ByteString value, int ttlSeconds) {
Optional<Span> span = buildSpan("java-sdk-set-request");
try (Scope ignored = (span.map(ImplicitContextKeyed::makeCurrent).orElse(null))) {
SetResponse rsp = blockingStub.set(buildSetRequest(key, value, ttlSeconds * 1000));
Expand Down Expand Up @@ -200,15 +208,15 @@ public ClientSetResponse set(String key, ByteBuffer value, int ttlSeconds) {
* CompletionStage interface wrapping standard ClientResponse with response object as a {@link
* java.io.InputStream}.
*/
public CompletionStage<ClientGetResponse<ByteBuffer>> getAsync(String key) {
public CompletionStage<ClientGetResponse> getAsync(String key) {
Optional<Span> span = buildSpan("java-sdk-get-request");
Optional<Scope> scope = (span.map(ImplicitContextKeyed::makeCurrent));
// Submit request to non-blocking stub
ListenableFuture<GetResponse> rspFuture = futureStub.get(buildGetRequest(key));

// Build a CompletableFuture to return to caller
CompletableFuture<ClientGetResponse<ByteBuffer>> returnFuture =
new CompletableFuture<ClientGetResponse<ByteBuffer>>() {
CompletableFuture<ClientGetResponse> returnFuture =
new CompletableFuture<ClientGetResponse>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
// propagate cancel to the listenable future if called on returned completable future
Expand All @@ -224,8 +232,7 @@ public boolean cancel(boolean mayInterruptIfRunning) {
new FutureCallback<GetResponse>() {
@Override
public void onSuccess(GetResponse rsp) {
ByteBuffer body = rsp.getCacheBody().asReadOnlyByteBuffer();
returnFuture.complete(new ClientGetResponse<>(rsp.getResult(), body));
returnFuture.complete(new ClientGetResponse(rsp.getResult(), rsp.getCacheBody()));
span.ifPresent(
theSpan -> {
theSpan.setStatus(StatusCode.OK);
Expand Down Expand Up @@ -265,14 +272,15 @@ public void onFailure(Throwable e) {
* CompletionStage interface wrapping standard ClientSetResponse.
* @throws IOException if an error occurs opening ByteBuffer for request body.
*/
// TODO: Update Async methods to support different input params.
public CompletionStage<ClientSetResponse> setAsync(String key, ByteBuffer value, int ttlSeconds) {

Optional<Span> span = buildSpan("java-sdk-set-request");
Optional<Scope> scope = (span.map(ImplicitContextKeyed::makeCurrent));

// Submit request to non-blocking stub
ListenableFuture<SetResponse> rspFuture =
futureStub.set(buildSetRequest(key, value, ttlSeconds * 1000));
futureStub.set(buildSetRequest(convert(key), convert(value), ttlSeconds * 1000));

// Build a CompletableFuture to return to caller
CompletableFuture<ClientSetResponse> returnFuture =
Expand Down Expand Up @@ -331,16 +339,24 @@ private GetRequest buildGetRequest(String key) {
.build();
}

private SetRequest buildSetRequest(String key, ByteBuffer value, int ttl) {
try {
return SetRequest.newBuilder()
.setCacheKey(ByteString.copyFrom(key, StandardCharsets.UTF_8))
.setCacheBody(ByteString.readFrom(new ByteArrayInputStream(value.array())))
.setTtlMilliseconds(ttl)
.build();
} catch (IOException e) {
throw new ClientSdkException("Failed to create request.");
}
private SetRequest buildSetRequest(ByteString key, ByteString value, int ttl) {
return SetRequest.newBuilder()
.setCacheKey(key)
.setCacheBody(value)
.setTtlMilliseconds(ttl)
.build();
}

private ByteString convert(String stringToEncode) {
return ByteString.copyFromUtf8(stringToEncode);
}

private ByteString convert(byte[] bytes) {
return ByteString.copyFrom(bytes);
}

private ByteString convert(ByteBuffer byteBuffer) {
return ByteString.copyFrom(byteBuffer);
}

private Optional<Span> buildSpan(String spanName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ public class SdkException extends RuntimeException {

public SdkException(String message, Throwable cause) {
super(message, cause);
this.initCause(cause);
}

public SdkException(String message) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package momento.sdk.messages;

import com.google.protobuf.ByteString;
import grpc.cache_client.ECacheResult;
import java.nio.ByteBuffer;

public final class ClientGetResponse<T> extends BaseResponse {
private final T body;
public final class ClientGetResponse extends BaseResponse {
private final ByteString body;
private final ECacheResult result;

public ClientGetResponse(ECacheResult result, T body) {
public ClientGetResponse(ECacheResult result, ByteString body) {
this.body = body;
this.result = result;
}
Expand All @@ -15,7 +17,20 @@ public MomentoCacheResult getResult() {
return this.resultMapper(this.result);
}

public T getBody() {
return body;
public byte[] asByteArray() {
return body.toByteArray();
}

public ByteBuffer asByteBuffer() {
return body.asReadOnlyByteBuffer();
}

/**
* Converts the value read from cache to a UTF-8 String
*
* @return
*/
public String asStringUtf8() {
return body.toStringUtf8();
}
}

0 comments on commit f52b87a

Please sign in to comment.