Skip to content

Commit

Permalink
allow user to peek at response before parsing.
Browse files Browse the repository at this point in the history
  • Loading branch information
jpe7s committed Aug 30, 2024
1 parent 61c8798 commit 498a141
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;

import static java.net.http.HttpRequest.BodyPublishers.ofString;
import static java.net.http.HttpResponse.BodyHandlers.ofByteArray;
Expand All @@ -19,13 +19,16 @@ public abstract class JsonHttpClient {
protected final URI endpoint;
protected final HttpClient httpClient;
protected final Duration requestTimeout;
protected final Predicate<HttpResponse<byte[]>> applyResponse;

public JsonHttpClient(final URI endpoint,
final HttpClient httpClient,
final Duration requestTimeout) {
protected JsonHttpClient(final URI endpoint,
final HttpClient httpClient,
final Duration requestTimeout,
final Predicate<HttpResponse<byte[]>> applyResponse) {
this.endpoint = endpoint;
this.httpClient = httpClient;
this.requestTimeout = requestTimeout;
this.applyResponse = applyResponse;
}

public final URI endpoint() {
Expand All @@ -40,8 +43,8 @@ protected static <R> Function<HttpResponse<byte[]>, R> applyResponse(final Funct
return new JsonResponseController<>(adapter);
}

protected static <R> Function<HttpResponse<String>, R> applyResponse(final BiFunction<String, JsonIterator, R> adapter) {
return new KeepJsonStringResponseController<>(adapter);
protected <R> Function<HttpResponse<byte[]>, R> wrapParser(final Function<HttpResponse<byte[]>, R> parser) {
return applyResponse == null ? parser : response -> applyResponse.test(response) ? parser.apply(response) : null;
}

protected HttpRequest.Builder newRequest(final URI endpoint, final Duration requestTimeout) {
Expand Down Expand Up @@ -104,7 +107,9 @@ protected final <R> CompletableFuture<R> sendPostRequest(final URI endpoint,
final Duration requestTimeout,
final String body) {
// System.out.println(body);
return httpClient.sendAsync(newPostRequest(endpoint, requestTimeout, body), ofByteArray()).thenApply(parser);
return httpClient
.sendAsync(newPostRequest(endpoint, requestTimeout, body), ofByteArray())
.thenApply(wrapParser(parser));
}

protected final <R> CompletableFuture<R> sendPostRequest(final Function<HttpResponse<byte[]>, R> parser,
Expand All @@ -127,22 +132,57 @@ protected final <R> CompletableFuture<R> sendPostRequest(final URI endpoint,

protected final <R> CompletableFuture<R> sendGetRequest(final Function<HttpResponse<byte[]>, R> parser,
final String path) {
return httpClient.sendAsync(newGetRequest(path).build(), ofByteArray()).thenApply(parser);
return httpClient
.sendAsync(newGetRequest(path).build(), ofByteArray())
.thenApply(wrapParser(parser));
}

protected final <R> CompletableFuture<R> sendGetRequest(final URI endpoint,
final Function<HttpResponse<byte[]>, R> parser) {
return httpClient.sendAsync(newGetRequest(endpoint).build(), ofByteArray()).thenApply(parser);
return httpClient
.sendAsync(newGetRequest(endpoint).build(), ofByteArray())
.thenApply(wrapParser(parser));
}

protected final <R> CompletableFuture<R> sendGetRequestWithStringResponse(final Function<HttpResponse<String>, R> parser,
final Duration requestTimeout,
final String path) {
return httpClient.sendAsync(newGetRequest(path, requestTimeout).build(), HttpResponse.BodyHandlers.ofString()).thenApply(parser);
protected final <R> CompletableFuture<R> sendPostRequestNoWrap(final URI endpoint,
final Function<HttpResponse<byte[]>, R> parser,
final Duration requestTimeout,
final String body) {
// System.out.println(body);
return httpClient
.sendAsync(newPostRequest(endpoint, requestTimeout, body), ofByteArray())
.thenApply(parser);
}

protected final <R> CompletableFuture<R> sendPostRequestNoWrap(final Function<HttpResponse<byte[]>, R> parser,
final Duration requestTimeout,
final String body) {
return sendPostRequestNoWrap(endpoint, parser, requestTimeout, body);
}

protected final <R> CompletableFuture<R> sendPostRequestNoWrap(final Function<HttpResponse<byte[]>, R> parser,
final String body) {
// System.out.println(body);
return sendPostRequestNoWrap(parser, requestTimeout, body);
}

protected final <R> CompletableFuture<R> sendPostRequestNoWrap(final URI endpoint,
final Function<HttpResponse<byte[]>, R> parser,
final String body) {
return sendPostRequestNoWrap(endpoint, parser, requestTimeout, body);
}

protected final <R> CompletableFuture<R> sendGetRequestNoWrap(final Function<HttpResponse<byte[]>, R> parser,
final String path) {
return httpClient
.sendAsync(newGetRequest(path).build(), ofByteArray())
.thenApply(parser);
}

protected final <R> CompletableFuture<R> sendGetRequestWithStringResponse(final Function<HttpResponse<String>, R> parser,
final String path) {
return sendGetRequestWithStringResponse(parser, requestTimeout, path);
protected final <R> CompletableFuture<R> sendGetRequestNoWrap(final URI endpoint,
final Function<HttpResponse<byte[]>, R> parser) {
return httpClient
.sendAsync(newGetRequest(endpoint).build(), ofByteArray())
.thenApply(parser);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@
import java.time.Duration;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;

import static software.sava.rpc.json.http.client.JsonResponseController.throwUncheckedIOException;

public abstract class JsonRpcHttpClient extends JsonHttpClient {

public JsonRpcHttpClient(final URI endpoint, final HttpClient httpClient, final Duration requestTimeout) {
super(endpoint, httpClient, requestTimeout);
public JsonRpcHttpClient(final URI endpoint,
final HttpClient httpClient,
final Duration requestTimeout,
final Predicate<HttpResponse<byte[]>> applyResponse) {
super(endpoint, httpClient, requestTimeout, applyResponse);
}

static JsonIterator createJsonIterator(final HttpResponse<byte[]> httpResponse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static java.lang.String.format;
Expand All @@ -30,7 +31,7 @@
final class SolanaJsonRpcClient extends JsonRpcHttpClient implements SolanaRpcClient {

static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(8);
private static final Duration PROGRAM_ACCOUNTS_TIMEOUT = Duration.ofSeconds(120);
static final Duration PROGRAM_ACCOUNTS_TIMEOUT = Duration.ofSeconds(120);

private static final Function<HttpResponse<byte[]>, LatestBlockHash> LATEST_BLOCK_HASH = applyResponseValue(LatestBlockHash::parse);
private static final Function<HttpResponse<byte[]>, Lamports> CONTEXT_LONG_VAL = applyResponseValue(Lamports::parse);
Expand Down Expand Up @@ -91,14 +92,19 @@ final class SolanaJsonRpcClient extends JsonRpcHttpClient implements SolanaRpcCl

private final AtomicLong id;
private final Commitment defaultCommitment;
private final Function<HttpResponse<byte[]>, String> sendTxResponseParser;
private final Function<HttpResponse<byte[]>, LatestBlockHash> latestBlockhashResponseParser;

SolanaJsonRpcClient(final URI endpoint,
final HttpClient httpClient,
final Duration requestTimeout,
final Predicate<HttpResponse<byte[]>> applyResponse,
final Commitment defaultCommitment) {
super(endpoint, httpClient, requestTimeout);
super(endpoint, httpClient, requestTimeout, applyResponse);
this.id = new AtomicLong(System.currentTimeMillis());
this.defaultCommitment = defaultCommitment;
this.latestBlockhashResponseParser = wrapParser(LATEST_BLOCK_HASH);
this.sendTxResponseParser = wrapParser(SEND_TX_RESPONSE_PARSER);
}

@Override
Expand Down Expand Up @@ -388,7 +394,7 @@ public CompletableFuture<LatestBlockHash> getLatestBlockHash() {

@Override
public CompletableFuture<LatestBlockHash> getLatestBlockHash(final Commitment commitment) {
return sendPostRequest(LATEST_BLOCK_HASH, format("""
return sendPostRequestNoWrap(latestBlockhashResponseParser, format("""
{"jsonrpc":"2.0","id":%d,"method":"getLatestBlockhash","params":[{"commitment":"%s"}]}""",
id.incrementAndGet(), commitment.getValue()));
}
Expand Down Expand Up @@ -478,28 +484,31 @@ public <T> CompletableFuture<List<AccountInfo<T>>> getMultipleAccounts(final Com
}

@Override
public <T> CompletableFuture<List<AccountInfo<T>>> getProgramAccounts(final PublicKey programId,
public <T> CompletableFuture<List<AccountInfo<T>>> getProgramAccounts(final Duration requestTimeout,
final PublicKey programId,
final BiFunction<PublicKey, byte[], T> factory) {
return getProgramAccounts(programId, null, factory);
return getProgramAccounts(requestTimeout, programId, null, factory);
}

@Override
public <T> CompletableFuture<List<AccountInfo<T>>> getProgramAccounts(final PublicKey programId,
public <T> CompletableFuture<List<AccountInfo<T>>> getProgramAccounts(final Duration requestTimeout,
final PublicKey programId,
final List<Filter> filters,
final BiFunction<PublicKey, byte[], T> factory) {
return getProgramAccounts(programId, defaultCommitment, filters, factory);
return getProgramAccounts(requestTimeout, programId, defaultCommitment, filters, factory);
}

@Override
public <T> CompletableFuture<List<AccountInfo<T>>> getProgramAccounts(final PublicKey programId,
public <T> CompletableFuture<List<AccountInfo<T>>> getProgramAccounts(final Duration requestTimeout,
final PublicKey programId,
final Commitment commitment,
final List<Filter> filters,
final BiFunction<PublicKey, byte[], T> factory) {
final var filtersJson = filters == null || filters.isEmpty() ? "" : filters.stream()
.map(Filter::toJson)
.collect(Collectors.joining(",", ",\"filters\":[", "]"));
return sendPostRequest(applyResponseValue((ji, context) -> AccountInfo.parseAccounts(ji, context, factory)),
PROGRAM_ACCOUNTS_TIMEOUT,
requestTimeout,
format("""
{"jsonrpc":"2.0","id":%d,"method":"getProgramAccounts","params":["%s",{"commitment":"%s","withContext":true,"encoding":"base64"%s}]}""",
id.incrementAndGet(), programId.toBase58(), commitment.getValue(), filtersJson));
Expand Down Expand Up @@ -848,14 +857,14 @@ public CompletableFuture<String> sendTransaction(final String base64SignedTx, fi

@Override
public CompletableFuture<String> sendTransaction(final Commitment preflightCommitment, final String base64SignedTx, final int maxRetries) {
return sendPostRequest(SEND_TX_RESPONSE_PARSER, format("""
return sendPostRequestNoWrap(sendTxResponseParser, format("""
{"jsonrpc":"2.0","id":%d,"method":"sendTransaction","params":["%s",{"encoding":"base64","preflightCommitment":"%s","maxRetries":%d}]}""",
id.incrementAndGet(), base64SignedTx, preflightCommitment.getValue(), maxRetries));
}

@Override
public CompletableFuture<String> sendTransactionSkipPreflight(final Commitment preflightCommitment, final String base64SignedTx, final int maxRetries) {
return sendPostRequest(SEND_TX_RESPONSE_PARSER, format("""
return sendPostRequestNoWrap(sendTxResponseParser, format("""
{"jsonrpc":"2.0","id":%d,"method":"sendTransaction","params":["%s",{"encoding":"base64","skipPreflight":true,"preflightCommitment":"%s","maxRetries":%d}]}""",
id.incrementAndGet(), base64SignedTx, preflightCommitment.getValue(), maxRetries));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,24 @@
import software.sava.core.accounts.token.TokenAccount;
import software.sava.core.rpc.Filter;
import software.sava.core.tx.Transaction;
import software.sava.rpc.json.http.SolanaNetwork;
import software.sava.rpc.json.http.request.Commitment;
import software.sava.rpc.json.http.request.ContextBoolVal;
import software.sava.rpc.json.http.response.*;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Predicate;

import static software.sava.rpc.json.http.client.SolanaJsonRpcClient.DEFAULT_REQUEST_TIMEOUT;
import static software.sava.rpc.json.http.client.SolanaJsonRpcClient.PROGRAM_ACCOUNTS_TIMEOUT;
import static software.sava.rpc.json.http.request.Commitment.CONFIRMED;
import static software.sava.rpc.json.http.request.Commitment.PROCESSED;
import static software.sava.rpc.json.http.response.AccountInfo.BYTES_IDENTITY;
Expand All @@ -29,31 +31,29 @@ public interface SolanaRpcClient {

int MAX_MULTIPLE_ACCOUNTS = 100;

static SolanaRpcClient createHttpClient(final URI endpoint,
final HttpClient httpClient,
final Duration requestTimeout) {
return new SolanaJsonRpcClient(endpoint, httpClient, requestTimeout, CONFIRMED);
static SolanaRpcClient createClient(final URI endpoint,
final HttpClient httpClient,
final Duration requestTimeout,
final Predicate<HttpResponse<byte[]>> applyResponse,
final Commitment defaultCommitment) {
return new SolanaJsonRpcClient(endpoint, httpClient, requestTimeout, applyResponse, defaultCommitment);
}

static SolanaRpcClient createHttpClient(final URI endpoint,
final HttpClient httpClient) {
return createHttpClient(endpoint, httpClient, DEFAULT_REQUEST_TIMEOUT);
static SolanaRpcClient createClient(final URI endpoint,
final HttpClient httpClient,
final Predicate<HttpResponse<byte[]>> applyResponse) {
return createClient(endpoint, httpClient, DEFAULT_REQUEST_TIMEOUT, applyResponse, CONFIRMED);
}

static SolanaRpcClient createHttpClient(final URI endpoint) {
return createHttpClient(endpoint, HttpClient.newHttpClient());
static SolanaRpcClient createClient(final URI endpoint,
final HttpClient httpClient,
final Duration requestTimeout,
final Commitment defaultCommitment) {
return createClient(endpoint, httpClient, requestTimeout, null, defaultCommitment);
}

static SolanaRpcClient createHttpClient(final SolanaNetwork network) {
return createHttpClient(network.getEndpoint(), HttpClient.newHttpClient());
}

static SolanaRpcClient createHttpClient(final SolanaNetwork network, final HttpClient httpClient) {
return createHttpClient(network.getEndpoint(), httpClient);
}

static SolanaRpcClient createHttpClient(final String endpoint, final HttpClient httpClient) {
return createHttpClient(URI.create(endpoint), httpClient);
static SolanaRpcClient createClient(final URI endpoint, final HttpClient httpClient) {
return createClient(endpoint, httpClient, DEFAULT_REQUEST_TIMEOUT, null, CONFIRMED);
}

URI endpoint();
Expand Down Expand Up @@ -226,14 +226,35 @@ default CompletableFuture<List<AccountInfo<byte[]>>> getProgramAccounts(final Pu
return getProgramAccounts(programId, commitment, filters, BYTES_IDENTITY);
}

<T> CompletableFuture<List<AccountInfo<T>>> getProgramAccounts(final PublicKey programId,
default <T> CompletableFuture<List<AccountInfo<T>>> getProgramAccounts(final PublicKey programId,
final BiFunction<PublicKey, byte[], T> factory) {
return getProgramAccounts(PROGRAM_ACCOUNTS_TIMEOUT, programId, factory);
}

default <T> CompletableFuture<List<AccountInfo<T>>> getProgramAccounts(final PublicKey programId,
final List<Filter> filters,
final BiFunction<PublicKey, byte[], T> factory) {
return getProgramAccounts(PROGRAM_ACCOUNTS_TIMEOUT, programId, filters, factory);
}

default <T> CompletableFuture<List<AccountInfo<T>>> getProgramAccounts(final PublicKey programId,
final Commitment commitment,
final List<Filter> filters,
final BiFunction<PublicKey, byte[], T> factory) {
return getProgramAccounts(PROGRAM_ACCOUNTS_TIMEOUT, programId, commitment, filters, factory);
}

<T> CompletableFuture<List<AccountInfo<T>>> getProgramAccounts(final Duration requestTimeout,
final PublicKey programId,
final BiFunction<PublicKey, byte[], T> factory);

<T> CompletableFuture<List<AccountInfo<T>>> getProgramAccounts(final PublicKey programId,
<T> CompletableFuture<List<AccountInfo<T>>> getProgramAccounts(final Duration requestTimeout,
final PublicKey programId,
final List<Filter> filters,
final BiFunction<PublicKey, byte[], T> factory);

<T> CompletableFuture<List<AccountInfo<T>>> getProgramAccounts(final PublicKey programId,
<T> CompletableFuture<List<AccountInfo<T>>> getProgramAccounts(final Duration requestTimeout,
final PublicKey programId,
final Commitment commitment,
final List<Filter> filters,
final BiFunction<PublicKey, byte[], T> factory);
Expand Down
Loading

0 comments on commit 498a141

Please sign in to comment.