diff --git a/rpc/src/main/java/software/sava/rpc/json/http/client/JsonHttpClient.java b/rpc/src/main/java/software/sava/rpc/json/http/client/JsonHttpClient.java index 1115d3c..1510e27 100644 --- a/rpc/src/main/java/software/sava/rpc/json/http/client/JsonHttpClient.java +++ b/rpc/src/main/java/software/sava/rpc/json/http/client/JsonHttpClient.java @@ -8,8 +8,8 @@ import java.net.http.HttpResponse; import java.time.Duration; import java.util.concurrent.CompletableFuture; -import java.util.function.BiFunction; import java.util.function.Function; +import java.util.function.Predicate; import static java.net.http.HttpRequest.BodyPublishers.ofString; import static java.net.http.HttpResponse.BodyHandlers.ofByteArray; @@ -19,13 +19,16 @@ public abstract class JsonHttpClient { protected final URI endpoint; protected final HttpClient httpClient; protected final Duration requestTimeout; + protected final Predicate> applyResponse; - public JsonHttpClient(final URI endpoint, - final HttpClient httpClient, - final Duration requestTimeout) { + protected JsonHttpClient(final URI endpoint, + final HttpClient httpClient, + final Duration requestTimeout, + final Predicate> applyResponse) { this.endpoint = endpoint; this.httpClient = httpClient; this.requestTimeout = requestTimeout; + this.applyResponse = applyResponse; } public final URI endpoint() { @@ -40,8 +43,8 @@ protected static Function, R> applyResponse(final Funct return new JsonResponseController<>(adapter); } - protected static Function, R> applyResponse(final BiFunction adapter) { - return new KeepJsonStringResponseController<>(adapter); + protected Function, R> wrapParser(final Function, R> parser) { + return applyResponse == null ? parser : response -> applyResponse.test(response) ? parser.apply(response) : null; } protected HttpRequest.Builder newRequest(final URI endpoint, final Duration requestTimeout) { @@ -104,7 +107,9 @@ protected final CompletableFuture sendPostRequest(final URI endpoint, final Duration requestTimeout, final String body) { // System.out.println(body); - return httpClient.sendAsync(newPostRequest(endpoint, requestTimeout, body), ofByteArray()).thenApply(parser); + return httpClient + .sendAsync(newPostRequest(endpoint, requestTimeout, body), ofByteArray()) + .thenApply(wrapParser(parser)); } protected final CompletableFuture sendPostRequest(final Function, R> parser, @@ -127,22 +132,57 @@ protected final CompletableFuture sendPostRequest(final URI endpoint, protected final CompletableFuture sendGetRequest(final Function, R> parser, final String path) { - return httpClient.sendAsync(newGetRequest(path).build(), ofByteArray()).thenApply(parser); + return httpClient + .sendAsync(newGetRequest(path).build(), ofByteArray()) + .thenApply(wrapParser(parser)); } protected final CompletableFuture sendGetRequest(final URI endpoint, final Function, R> parser) { - return httpClient.sendAsync(newGetRequest(endpoint).build(), ofByteArray()).thenApply(parser); + return httpClient + .sendAsync(newGetRequest(endpoint).build(), ofByteArray()) + .thenApply(wrapParser(parser)); } - protected final CompletableFuture sendGetRequestWithStringResponse(final Function, R> parser, - final Duration requestTimeout, - final String path) { - return httpClient.sendAsync(newGetRequest(path, requestTimeout).build(), HttpResponse.BodyHandlers.ofString()).thenApply(parser); + protected final CompletableFuture sendPostRequestNoWrap(final URI endpoint, + final Function, R> parser, + final Duration requestTimeout, + final String body) { +// System.out.println(body); + return httpClient + .sendAsync(newPostRequest(endpoint, requestTimeout, body), ofByteArray()) + .thenApply(parser); + } + + protected final CompletableFuture sendPostRequestNoWrap(final Function, R> parser, + final Duration requestTimeout, + final String body) { + return sendPostRequestNoWrap(endpoint, parser, requestTimeout, body); + } + + protected final CompletableFuture sendPostRequestNoWrap(final Function, R> parser, + final String body) { + // System.out.println(body); + return sendPostRequestNoWrap(parser, requestTimeout, body); + } + + protected final CompletableFuture sendPostRequestNoWrap(final URI endpoint, + final Function, R> parser, + final String body) { + return sendPostRequestNoWrap(endpoint, parser, requestTimeout, body); + } + + protected final CompletableFuture sendGetRequestNoWrap(final Function, R> parser, + final String path) { + return httpClient + .sendAsync(newGetRequest(path).build(), ofByteArray()) + .thenApply(parser); } - protected final CompletableFuture sendGetRequestWithStringResponse(final Function, R> parser, - final String path) { - return sendGetRequestWithStringResponse(parser, requestTimeout, path); + protected final CompletableFuture sendGetRequestNoWrap(final URI endpoint, + final Function, R> parser) { + return httpClient + .sendAsync(newGetRequest(endpoint).build(), ofByteArray()) + .thenApply(parser); } } diff --git a/rpc/src/main/java/software/sava/rpc/json/http/client/JsonRpcHttpClient.java b/rpc/src/main/java/software/sava/rpc/json/http/client/JsonRpcHttpClient.java index 4bba258..3dd8a75 100644 --- a/rpc/src/main/java/software/sava/rpc/json/http/client/JsonRpcHttpClient.java +++ b/rpc/src/main/java/software/sava/rpc/json/http/client/JsonRpcHttpClient.java @@ -11,13 +11,17 @@ import java.time.Duration; import java.util.function.BiFunction; import java.util.function.Function; +import java.util.function.Predicate; import static software.sava.rpc.json.http.client.JsonResponseController.throwUncheckedIOException; public abstract class JsonRpcHttpClient extends JsonHttpClient { - public JsonRpcHttpClient(final URI endpoint, final HttpClient httpClient, final Duration requestTimeout) { - super(endpoint, httpClient, requestTimeout); + public JsonRpcHttpClient(final URI endpoint, + final HttpClient httpClient, + final Duration requestTimeout, + final Predicate> applyResponse) { + super(endpoint, httpClient, requestTimeout, applyResponse); } static JsonIterator createJsonIterator(final HttpResponse httpResponse) { diff --git a/rpc/src/main/java/software/sava/rpc/json/http/client/SolanaJsonRpcClient.java b/rpc/src/main/java/software/sava/rpc/json/http/client/SolanaJsonRpcClient.java index fbfcb26..87fa253 100644 --- a/rpc/src/main/java/software/sava/rpc/json/http/client/SolanaJsonRpcClient.java +++ b/rpc/src/main/java/software/sava/rpc/json/http/client/SolanaJsonRpcClient.java @@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import static java.lang.String.format; @@ -30,7 +31,7 @@ final class SolanaJsonRpcClient extends JsonRpcHttpClient implements SolanaRpcClient { static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(8); - private static final Duration PROGRAM_ACCOUNTS_TIMEOUT = Duration.ofSeconds(120); + static final Duration PROGRAM_ACCOUNTS_TIMEOUT = Duration.ofSeconds(120); private static final Function, LatestBlockHash> LATEST_BLOCK_HASH = applyResponseValue(LatestBlockHash::parse); private static final Function, Lamports> CONTEXT_LONG_VAL = applyResponseValue(Lamports::parse); @@ -91,14 +92,19 @@ final class SolanaJsonRpcClient extends JsonRpcHttpClient implements SolanaRpcCl private final AtomicLong id; private final Commitment defaultCommitment; + private final Function, String> sendTxResponseParser; + private final Function, LatestBlockHash> latestBlockhashResponseParser; SolanaJsonRpcClient(final URI endpoint, final HttpClient httpClient, final Duration requestTimeout, + final Predicate> applyResponse, final Commitment defaultCommitment) { - super(endpoint, httpClient, requestTimeout); + super(endpoint, httpClient, requestTimeout, applyResponse); this.id = new AtomicLong(System.currentTimeMillis()); this.defaultCommitment = defaultCommitment; + this.latestBlockhashResponseParser = wrapParser(LATEST_BLOCK_HASH); + this.sendTxResponseParser = wrapParser(SEND_TX_RESPONSE_PARSER); } @Override @@ -388,7 +394,7 @@ public CompletableFuture getLatestBlockHash() { @Override public CompletableFuture getLatestBlockHash(final Commitment commitment) { - return sendPostRequest(LATEST_BLOCK_HASH, format(""" + return sendPostRequestNoWrap(latestBlockhashResponseParser, format(""" {"jsonrpc":"2.0","id":%d,"method":"getLatestBlockhash","params":[{"commitment":"%s"}]}""", id.incrementAndGet(), commitment.getValue())); } @@ -478,20 +484,23 @@ public CompletableFuture>> getMultipleAccounts(final Com } @Override - public CompletableFuture>> getProgramAccounts(final PublicKey programId, + public CompletableFuture>> getProgramAccounts(final Duration requestTimeout, + final PublicKey programId, final BiFunction factory) { - return getProgramAccounts(programId, null, factory); + return getProgramAccounts(requestTimeout, programId, null, factory); } @Override - public CompletableFuture>> getProgramAccounts(final PublicKey programId, + public CompletableFuture>> getProgramAccounts(final Duration requestTimeout, + final PublicKey programId, final List filters, final BiFunction factory) { - return getProgramAccounts(programId, defaultCommitment, filters, factory); + return getProgramAccounts(requestTimeout, programId, defaultCommitment, filters, factory); } @Override - public CompletableFuture>> getProgramAccounts(final PublicKey programId, + public CompletableFuture>> getProgramAccounts(final Duration requestTimeout, + final PublicKey programId, final Commitment commitment, final List filters, final BiFunction factory) { @@ -499,7 +508,7 @@ public CompletableFuture>> getProgramAccounts(final Publ .map(Filter::toJson) .collect(Collectors.joining(",", ",\"filters\":[", "]")); return sendPostRequest(applyResponseValue((ji, context) -> AccountInfo.parseAccounts(ji, context, factory)), - PROGRAM_ACCOUNTS_TIMEOUT, + requestTimeout, format(""" {"jsonrpc":"2.0","id":%d,"method":"getProgramAccounts","params":["%s",{"commitment":"%s","withContext":true,"encoding":"base64"%s}]}""", id.incrementAndGet(), programId.toBase58(), commitment.getValue(), filtersJson)); @@ -848,14 +857,14 @@ public CompletableFuture sendTransaction(final String base64SignedTx, fi @Override public CompletableFuture sendTransaction(final Commitment preflightCommitment, final String base64SignedTx, final int maxRetries) { - return sendPostRequest(SEND_TX_RESPONSE_PARSER, format(""" + return sendPostRequestNoWrap(sendTxResponseParser, format(""" {"jsonrpc":"2.0","id":%d,"method":"sendTransaction","params":["%s",{"encoding":"base64","preflightCommitment":"%s","maxRetries":%d}]}""", id.incrementAndGet(), base64SignedTx, preflightCommitment.getValue(), maxRetries)); } @Override public CompletableFuture sendTransactionSkipPreflight(final Commitment preflightCommitment, final String base64SignedTx, final int maxRetries) { - return sendPostRequest(SEND_TX_RESPONSE_PARSER, format(""" + return sendPostRequestNoWrap(sendTxResponseParser, format(""" {"jsonrpc":"2.0","id":%d,"method":"sendTransaction","params":["%s",{"encoding":"base64","skipPreflight":true,"preflightCommitment":"%s","maxRetries":%d}]}""", id.incrementAndGet(), base64SignedTx, preflightCommitment.getValue(), maxRetries)); } diff --git a/rpc/src/main/java/software/sava/rpc/json/http/client/SolanaRpcClient.java b/rpc/src/main/java/software/sava/rpc/json/http/client/SolanaRpcClient.java index 2390fb6..2a75a37 100644 --- a/rpc/src/main/java/software/sava/rpc/json/http/client/SolanaRpcClient.java +++ b/rpc/src/main/java/software/sava/rpc/json/http/client/SolanaRpcClient.java @@ -5,13 +5,13 @@ import software.sava.core.accounts.token.TokenAccount; import software.sava.core.rpc.Filter; import software.sava.core.tx.Transaction; -import software.sava.rpc.json.http.SolanaNetwork; import software.sava.rpc.json.http.request.Commitment; import software.sava.rpc.json.http.request.ContextBoolVal; import software.sava.rpc.json.http.response.*; import java.net.URI; import java.net.http.HttpClient; +import java.net.http.HttpResponse; import java.time.Duration; import java.time.Instant; import java.util.Collection; @@ -19,8 +19,10 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.function.BiFunction; +import java.util.function.Predicate; import static software.sava.rpc.json.http.client.SolanaJsonRpcClient.DEFAULT_REQUEST_TIMEOUT; +import static software.sava.rpc.json.http.client.SolanaJsonRpcClient.PROGRAM_ACCOUNTS_TIMEOUT; import static software.sava.rpc.json.http.request.Commitment.CONFIRMED; import static software.sava.rpc.json.http.request.Commitment.PROCESSED; import static software.sava.rpc.json.http.response.AccountInfo.BYTES_IDENTITY; @@ -29,31 +31,29 @@ public interface SolanaRpcClient { int MAX_MULTIPLE_ACCOUNTS = 100; - static SolanaRpcClient createHttpClient(final URI endpoint, - final HttpClient httpClient, - final Duration requestTimeout) { - return new SolanaJsonRpcClient(endpoint, httpClient, requestTimeout, CONFIRMED); + static SolanaRpcClient createClient(final URI endpoint, + final HttpClient httpClient, + final Duration requestTimeout, + final Predicate> applyResponse, + final Commitment defaultCommitment) { + return new SolanaJsonRpcClient(endpoint, httpClient, requestTimeout, applyResponse, defaultCommitment); } - static SolanaRpcClient createHttpClient(final URI endpoint, - final HttpClient httpClient) { - return createHttpClient(endpoint, httpClient, DEFAULT_REQUEST_TIMEOUT); + static SolanaRpcClient createClient(final URI endpoint, + final HttpClient httpClient, + final Predicate> applyResponse) { + return createClient(endpoint, httpClient, DEFAULT_REQUEST_TIMEOUT, applyResponse, CONFIRMED); } - static SolanaRpcClient createHttpClient(final URI endpoint) { - return createHttpClient(endpoint, HttpClient.newHttpClient()); + static SolanaRpcClient createClient(final URI endpoint, + final HttpClient httpClient, + final Duration requestTimeout, + final Commitment defaultCommitment) { + return createClient(endpoint, httpClient, requestTimeout, null, defaultCommitment); } - static SolanaRpcClient createHttpClient(final SolanaNetwork network) { - return createHttpClient(network.getEndpoint(), HttpClient.newHttpClient()); - } - - static SolanaRpcClient createHttpClient(final SolanaNetwork network, final HttpClient httpClient) { - return createHttpClient(network.getEndpoint(), httpClient); - } - - static SolanaRpcClient createHttpClient(final String endpoint, final HttpClient httpClient) { - return createHttpClient(URI.create(endpoint), httpClient); + static SolanaRpcClient createClient(final URI endpoint, final HttpClient httpClient) { + return createClient(endpoint, httpClient, DEFAULT_REQUEST_TIMEOUT, null, CONFIRMED); } URI endpoint(); @@ -226,14 +226,35 @@ default CompletableFuture>> getProgramAccounts(final Pu return getProgramAccounts(programId, commitment, filters, BYTES_IDENTITY); } - CompletableFuture>> getProgramAccounts(final PublicKey programId, + default CompletableFuture>> getProgramAccounts(final PublicKey programId, + final BiFunction factory) { + return getProgramAccounts(PROGRAM_ACCOUNTS_TIMEOUT, programId, factory); + } + + default CompletableFuture>> getProgramAccounts(final PublicKey programId, + final List filters, + final BiFunction factory) { + return getProgramAccounts(PROGRAM_ACCOUNTS_TIMEOUT, programId, filters, factory); + } + + default CompletableFuture>> getProgramAccounts(final PublicKey programId, + final Commitment commitment, + final List filters, + final BiFunction factory) { + return getProgramAccounts(PROGRAM_ACCOUNTS_TIMEOUT, programId, commitment, filters, factory); + } + + CompletableFuture>> getProgramAccounts(final Duration requestTimeout, + final PublicKey programId, final BiFunction factory); - CompletableFuture>> getProgramAccounts(final PublicKey programId, + CompletableFuture>> getProgramAccounts(final Duration requestTimeout, + final PublicKey programId, final List filters, final BiFunction factory); - CompletableFuture>> getProgramAccounts(final PublicKey programId, + CompletableFuture>> getProgramAccounts(final Duration requestTimeout, + final PublicKey programId, final Commitment commitment, final List filters, final BiFunction factory); diff --git a/rpc/src/test/java/test/soljava/rpc/json/SignerTest.java b/rpc/src/test/java/test/software/sava/rpc/json/SignerTest.java similarity index 94% rename from rpc/src/test/java/test/soljava/rpc/json/SignerTest.java rename to rpc/src/test/java/test/software/sava/rpc/json/SignerTest.java index 7070f86..1b12f89 100644 --- a/rpc/src/test/java/test/soljava/rpc/json/SignerTest.java +++ b/rpc/src/test/java/test/software/sava/rpc/json/SignerTest.java @@ -1,4 +1,4 @@ -package test.soljava.rpc.json; +package test.software.sava.rpc.json; import org.junit.jupiter.api.Test; import software.sava.rpc.json.PrivateKeyEncoding; diff --git a/rpc/src/test/java/test/soljava/rpc/json/http/client/HttpClientTests.java b/rpc/src/test/java/test/software/sava/rpc/json/http/client/HttpClientTests.java similarity index 91% rename from rpc/src/test/java/test/soljava/rpc/json/http/client/HttpClientTests.java rename to rpc/src/test/java/test/software/sava/rpc/json/http/client/HttpClientTests.java index d4023dc..d032d48 100644 --- a/rpc/src/test/java/test/soljava/rpc/json/http/client/HttpClientTests.java +++ b/rpc/src/test/java/test/software/sava/rpc/json/http/client/HttpClientTests.java @@ -6,6 +6,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.net.InetSocketAddress; +import java.net.http.HttpClient; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -19,6 +20,10 @@ final class HttpClientTests { private static final ExecutorService HTTP_EXECUTOR = Executors.newVirtualThreadPerTaskExecutor(); + static HttpClient createClient() { + return HttpClient.newBuilder().executor(HTTP_EXECUTOR).build(); + } + static HttpServerRecord createServer() { try { final var httpServer = HttpServer.create(new InetSocketAddress(0), 0); diff --git a/rpc/src/test/java/test/soljava/rpc/json/http/client/HttpServerRecord.java b/rpc/src/test/java/test/software/sava/rpc/json/http/client/HttpServerRecord.java similarity index 100% rename from rpc/src/test/java/test/soljava/rpc/json/http/client/HttpServerRecord.java rename to rpc/src/test/java/test/software/sava/rpc/json/http/client/HttpServerRecord.java diff --git a/rpc/src/test/java/test/soljava/rpc/json/http/client/SolanaRpcTests.java b/rpc/src/test/java/test/software/sava/rpc/json/http/client/SolanaRpcTests.java similarity index 64% rename from rpc/src/test/java/test/soljava/rpc/json/http/client/SolanaRpcTests.java rename to rpc/src/test/java/test/software/sava/rpc/json/http/client/SolanaRpcTests.java index d0b0e36..4376599 100644 --- a/rpc/src/test/java/test/soljava/rpc/json/http/client/SolanaRpcTests.java +++ b/rpc/src/test/java/test/software/sava/rpc/json/http/client/SolanaRpcTests.java @@ -5,9 +5,11 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import software.sava.rpc.json.http.client.SolanaRpcClient; +import software.sava.rpc.json.http.response.NodeHealth; import systems.comodal.jsoniter.JsonIterator; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static test.software.sava.rpc.json.http.client.HttpClientTests.createServer; import static test.software.sava.rpc.json.http.client.HttpClientTests.writeResponse; @@ -34,19 +36,40 @@ static void setupHttpServer() { }); HTTP_SERVER = httpServerRecord.httpServer(); - RPC_CLIENT = SolanaRpcClient.createHttpClient(httpServerRecord.endpoint()); + final var httpClient = HttpClientTests.createClient(); + RPC_CLIENT = SolanaRpcClient.createClient(httpServerRecord.endpoint(), httpClient); } @AfterAll - static void shutdownServer() { + static void shutdown() { + RPC_CLIENT.httpClient().close(); HTTP_SERVER.stop(0); } - @Test - void testNodeHealth() { - final var nodeHealth = RPC_CLIENT.getHealth().join(); + private void validateNodeHealth(final NodeHealth nodeHealth) { assertEquals(-32005, nodeHealth.code()); assertEquals(0, nodeHealth.numSlotsBehind()); assertEquals("Node is unhealthy", nodeHealth.message()); } + + @Test + void testNodeHealth() { + final var nodeHealth = RPC_CLIENT.getHealth().join(); + validateNodeHealth(nodeHealth); + } + + @Test + void testPeekResponse() { + var rpcClient = SolanaRpcClient.createClient(RPC_CLIENT.endpoint(), RPC_CLIENT.httpClient(), + response -> { + assertEquals(200, response.statusCode()); + return false; + }); + var nodeHealth = rpcClient.getHealth().join(); + assertNull(nodeHealth); + + rpcClient = SolanaRpcClient.createClient(RPC_CLIENT.endpoint(), RPC_CLIENT.httpClient(), _ -> true); + nodeHealth = rpcClient.getHealth().join(); + validateNodeHealth(nodeHealth); + } }