diff --git a/.gitignore b/.gitignore index 9d074de..2881a90 100644 --- a/.gitignore +++ b/.gitignore @@ -22,4 +22,5 @@ hs_err_pid* out/ benchmark-results/ gradle.properties -ignore \ No newline at end of file +ignore +configs \ No newline at end of file diff --git a/core/src/main/java/software/sava/services/core/request_capacity/UriCapacityConfig.java b/core/src/main/java/software/sava/services/core/request_capacity/UriCapacityConfig.java new file mode 100644 index 0000000..d820a9d --- /dev/null +++ b/core/src/main/java/software/sava/services/core/request_capacity/UriCapacityConfig.java @@ -0,0 +1,58 @@ +package software.sava.services.core.request_capacity; + +import systems.comodal.jsoniter.FieldBufferPredicate; +import systems.comodal.jsoniter.JsonIterator; +import systems.comodal.jsoniter.ValueType; + +import java.net.URI; +import java.net.http.HttpResponse; +import java.util.Objects; + +import static systems.comodal.jsoniter.JsonIterator.fieldEquals; + +public record UriCapacityConfig(URI endpoint, CapacityConfig capacityConfig) { + + public static UriCapacityConfig parseConfig(final JsonIterator ji) { + if (ji.whatIsNext() == ValueType.STRING) { + final var endpoint = ji.readString(); + return new UriCapacityConfig(URI.create(endpoint), null); + } else { + final var parser = new UriCapacityConfig.Builder(); + ji.testObject(parser); + return parser.create(); + } + } + + public ErrorTrackedCapacityMonitor> createMonitor(final String serviceName, + final CapacityConfig defaultConfig) { + return Objects.requireNonNullElse(capacityConfig, defaultConfig).createHttpResponseMonitor(serviceName); + } + + private static final class Builder implements FieldBufferPredicate { + + private URI endpoint; + private CapacityConfig capacityConfig; + + private Builder() { + } + + private UriCapacityConfig create() { + return new UriCapacityConfig(endpoint, capacityConfig); + } + + @Override + public boolean test(final char[] buf, final int offset, final int len, final JsonIterator ji) { + if (fieldEquals("url", buf, offset, len)) { + final var endpoint = ji.readString(); + if (endpoint != null && !endpoint.isBlank()) { + this.endpoint = URI.create(endpoint); + } + } else if (fieldEquals("capacity", buf, offset, len)) { + capacityConfig = CapacityConfig.parse(ji); + } else { + ji.skip(); + } + return true; + } + } +} diff --git a/solana/src/main/java/module-info.java b/solana/src/main/java/module-info.java index f6d5281..19e6706 100644 --- a/solana/src/main/java/module-info.java +++ b/solana/src/main/java/module-info.java @@ -4,6 +4,8 @@ requires software.sava.solana_programs; requires software.sava.rpc; requires software.sava.core; + requires java.net.http; + requires systems.comodal.json_iterator; exports software.sava.services.solana.accounts.lookup; exports software.sava.services.solana.remote.call; diff --git a/solana/src/main/java/software/sava/services/solana/accounts/lookup/LookupTableDiscoveryService.java b/solana/src/main/java/software/sava/services/solana/accounts/lookup/LookupTableDiscoveryService.java index 9dad457..28f2755 100644 --- a/solana/src/main/java/software/sava/services/solana/accounts/lookup/LookupTableDiscoveryService.java +++ b/solana/src/main/java/software/sava/services/solana/accounts/lookup/LookupTableDiscoveryService.java @@ -11,24 +11,38 @@ import software.sava.rpc.json.http.client.SolanaRpcClient; import software.sava.services.core.remote.call.BalancedErrorHandler; import software.sava.services.core.remote.call.Call; +import software.sava.services.core.remote.load_balance.BalancedItem; import software.sava.services.core.remote.load_balance.LoadBalancer; +import software.sava.services.core.request_capacity.UriCapacityConfig; import software.sava.services.core.request_capacity.context.CallContext; import software.sava.solana.programs.clients.NativeProgramClient; +import systems.comodal.jsoniter.JsonIterator; +import java.io.IOException; +import java.net.http.HttpClient; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.function.BiFunction; import java.util.function.Predicate; import java.util.stream.IntStream; +import static java.lang.System.Logger.Level.ERROR; import static java.lang.System.Logger.Level.INFO; import static java.util.concurrent.TimeUnit.MINUTES; import static software.sava.core.accounts.lookup.AddressLookupTable.AUTHORITY_OPTION_OFFSET; import static software.sava.core.accounts.lookup.AddressLookupTable.DEACTIVATION_SLOT_OFFSET; +import static software.sava.services.core.remote.call.ErrorHandler.linearBackoff; +import static software.sava.services.solana.remote.call.RemoteCallUtil.createRpcClientErrorHandler; +@SuppressWarnings("ALL") public final class LookupTableDiscoveryService implements Runnable { private static final System.Logger logger = System.getLogger(LookupTableDiscoveryService.class.getName()); @@ -182,58 +196,99 @@ public List findOptimalSetOfTables(final Transaction transac return findOptimalSetOfTables(distinctAccounts); } + private static record Worker(AtomicInteger nextPartition, + CountDownLatch latch, + PartitionedLookupTableCallHandler[] partitionedCallHandlers) implements Runnable { + + @Override + public void run() { + for (; ; ) { + final int partition = nextPartition.incrementAndGet(); + if (partition >= NUM_PARTITIONS) { + return; + } + try { + final var tables = partitionedCallHandlers[partition].callAndApply().join(); + final var stats = Arrays.stream(tables) + .map(IndexedTable::table) + .mapToInt(AddressLookupTable::numAccounts) + .summaryStatistics(); + logger.log(INFO, String.format(""" + [partition=%d] [numTables=%s] [averageNumAccounts=%f.1] + """, partition, tables.length, stats.getAverage())); + latch.countDown(); + } catch (final RuntimeException ex) { + logger.log(ERROR, "Failed to get lookup tables for partition " + partition, ex); + throw ex; + } + } + } + } + @SuppressWarnings("unchecked") public void run() { - try { - final CompletableFuture[] concurrentFutures = new CompletableFuture[maxConcurrentRequests]; - int numTables = 0; - for (int nt; ; ) { - for (int i = 0, c = 0; i < NUM_PARTITIONS; ) { - for (; c < maxConcurrentRequests && i < NUM_PARTITIONS; ++c, ++i) { - concurrentFutures[c] = partitionedCallHandlers[i].callAndApply(); - } - for (final var future : concurrentFutures) { - if (future == null) { - break; - } - final var tables = future.join(); - nt = tables.length; - numTables += nt; - final var stats = Arrays.stream(tables) - .map(IndexedTable::table) - .mapToInt(AddressLookupTable::numAccounts) - .summaryStatistics(); - logger.log(INFO, String.format(""" - [partition=%d] [numTables=%s] - %s - """, i, nt, stats)); - } + try (final var executor = Executors.newFixedThreadPool(maxConcurrentRequests)) { + final var nextPartition = new AtomicInteger(0); + final var latch = new CountDownLatch(NUM_PARTITIONS); + final var workers = IntStream.range(0, maxConcurrentRequests) + .mapToObj(i -> new Worker(nextPartition, latch, partitionedCallHandlers)) + .toArray(Worker[]::new); + + for (long start; ; ) { + for (final var worker : workers) { + executor.execute(worker); } + start = System.currentTimeMillis(); + latch.await(); + final var duration = Duration.ofMillis(System.currentTimeMillis() - start); + + final int numTables = IntStream.range(0, NUM_PARTITIONS) + .mapToObj(partitions::get) + .flatMap(Arrays::stream) + .map(IndexedTable::table) + .mapToInt(AddressLookupTable::numAccounts) + .sum(); + initialized.complete(numTables); - try { - MINUTES.sleep(60); - } catch (final InterruptedException e) { - return; - } + logger.log(INFO, String.format(""" + %s to fetch all %d tables.""", duration, numTables + )); + + nextPartition.set(0); + MINUTES.sleep(60); } + + } catch (final InterruptedException e) { + // return; } catch (final RuntimeException ex) { initialized.completeExceptionally(ex); throw ex; } } - public static void main(final String[] args) { + public static void main(final String[] args) throws IOException { try (final var executorService = Executors.newVirtualThreadPerTaskExecutor()) { - final var tableService = new LookupTableDiscoveryService( - executorService, - null, - null, - null, - 4, - 3 - ); - executorService.execute(tableService); - tableService.initialized.join(); + try (final var httpClient = HttpClient.newHttpClient()) { + final var configFile = Path.of("").toAbsolutePath().resolve(System.getProperty("software.sava.services.solana.rpcConfigFile")); + final UriCapacityConfig rpcConfig = UriCapacityConfig.parseConfig(JsonIterator.parse(Files.readAllBytes(configFile))); + final var endpoint = rpcConfig.endpoint(); + final var monitor = rpcConfig.createMonitor(endpoint.getHost(), null); + final var rpcClient = SolanaRpcClient.createClient(rpcConfig.endpoint(), httpClient, monitor.errorTracker()); + final var defaultErrorHandler = createRpcClientErrorHandler( + linearBackoff(1, 21) + ); + final var nativeProgramClient = NativeProgramClient.createClient(); + final var tableService = new LookupTableDiscoveryService( + executorService, + LoadBalancer.createBalancer(BalancedItem.createItem(rpcClient, monitor), defaultErrorHandler), + defaultErrorHandler, + nativeProgramClient, + 4, + 3 + ); + executorService.execute(tableService); + tableService.initialized.join(); + } } } }