Skip to content

Commit

Permalink
improve concurrent fetching efficiency.
Browse files Browse the repository at this point in the history
  • Loading branch information
jpe7s committed Sep 20, 2024
1 parent 3b81abe commit d36ed01
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 41 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ hs_err_pid*
out/
benchmark-results/
gradle.properties
ignore
ignore
configs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package software.sava.services.core.request_capacity;

import systems.comodal.jsoniter.FieldBufferPredicate;
import systems.comodal.jsoniter.JsonIterator;
import systems.comodal.jsoniter.ValueType;

import java.net.URI;
import java.net.http.HttpResponse;
import java.util.Objects;

import static systems.comodal.jsoniter.JsonIterator.fieldEquals;

public record UriCapacityConfig(URI endpoint, CapacityConfig capacityConfig) {

public static UriCapacityConfig parseConfig(final JsonIterator ji) {
if (ji.whatIsNext() == ValueType.STRING) {
final var endpoint = ji.readString();
return new UriCapacityConfig(URI.create(endpoint), null);
} else {
final var parser = new UriCapacityConfig.Builder();
ji.testObject(parser);
return parser.create();
}
}

public ErrorTrackedCapacityMonitor<HttpResponse<byte[]>> createMonitor(final String serviceName,
final CapacityConfig defaultConfig) {
return Objects.requireNonNullElse(capacityConfig, defaultConfig).createHttpResponseMonitor(serviceName);
}

private static final class Builder implements FieldBufferPredicate {

private URI endpoint;
private CapacityConfig capacityConfig;

private Builder() {
}

private UriCapacityConfig create() {
return new UriCapacityConfig(endpoint, capacityConfig);
}

@Override
public boolean test(final char[] buf, final int offset, final int len, final JsonIterator ji) {
if (fieldEquals("url", buf, offset, len)) {
final var endpoint = ji.readString();
if (endpoint != null && !endpoint.isBlank()) {
this.endpoint = URI.create(endpoint);
}
} else if (fieldEquals("capacity", buf, offset, len)) {
capacityConfig = CapacityConfig.parse(ji);
} else {
ji.skip();
}
return true;
}
}
}
2 changes: 2 additions & 0 deletions solana/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
requires software.sava.solana_programs;
requires software.sava.rpc;
requires software.sava.core;
requires java.net.http;
requires systems.comodal.json_iterator;

exports software.sava.services.solana.accounts.lookup;
exports software.sava.services.solana.remote.call;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,38 @@
import software.sava.rpc.json.http.client.SolanaRpcClient;
import software.sava.services.core.remote.call.BalancedErrorHandler;
import software.sava.services.core.remote.call.Call;
import software.sava.services.core.remote.load_balance.BalancedItem;
import software.sava.services.core.remote.load_balance.LoadBalancer;
import software.sava.services.core.request_capacity.UriCapacityConfig;
import software.sava.services.core.request_capacity.context.CallContext;
import software.sava.solana.programs.clients.NativeProgramClient;
import systems.comodal.jsoniter.JsonIterator;

import java.io.IOException;
import java.net.http.HttpClient;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.IntStream;

import static java.lang.System.Logger.Level.ERROR;
import static java.lang.System.Logger.Level.INFO;
import static java.util.concurrent.TimeUnit.MINUTES;
import static software.sava.core.accounts.lookup.AddressLookupTable.AUTHORITY_OPTION_OFFSET;
import static software.sava.core.accounts.lookup.AddressLookupTable.DEACTIVATION_SLOT_OFFSET;
import static software.sava.services.core.remote.call.ErrorHandler.linearBackoff;
import static software.sava.services.solana.remote.call.RemoteCallUtil.createRpcClientErrorHandler;

@SuppressWarnings("ALL")
public final class LookupTableDiscoveryService implements Runnable {

private static final System.Logger logger = System.getLogger(LookupTableDiscoveryService.class.getName());
Expand Down Expand Up @@ -182,58 +196,99 @@ public List<AddressLookupTable> findOptimalSetOfTables(final Transaction transac
return findOptimalSetOfTables(distinctAccounts);
}

private static record Worker(AtomicInteger nextPartition,
CountDownLatch latch,
PartitionedLookupTableCallHandler[] partitionedCallHandlers) implements Runnable {

@Override
public void run() {
for (; ; ) {
final int partition = nextPartition.incrementAndGet();
if (partition >= NUM_PARTITIONS) {
return;
}
try {
final var tables = partitionedCallHandlers[partition].callAndApply().join();
final var stats = Arrays.stream(tables)
.map(IndexedTable::table)
.mapToInt(AddressLookupTable::numAccounts)
.summaryStatistics();
logger.log(INFO, String.format("""
[partition=%d] [numTables=%s] [averageNumAccounts=%f.1]
""", partition, tables.length, stats.getAverage()));
latch.countDown();
} catch (final RuntimeException ex) {
logger.log(ERROR, "Failed to get lookup tables for partition " + partition, ex);
throw ex;
}
}
}
}

@SuppressWarnings("unchecked")
public void run() {
try {
final CompletableFuture<IndexedTable[]>[] concurrentFutures = new CompletableFuture[maxConcurrentRequests];
int numTables = 0;
for (int nt; ; ) {
for (int i = 0, c = 0; i < NUM_PARTITIONS; ) {
for (; c < maxConcurrentRequests && i < NUM_PARTITIONS; ++c, ++i) {
concurrentFutures[c] = partitionedCallHandlers[i].callAndApply();
}
for (final var future : concurrentFutures) {
if (future == null) {
break;
}
final var tables = future.join();
nt = tables.length;
numTables += nt;
final var stats = Arrays.stream(tables)
.map(IndexedTable::table)
.mapToInt(AddressLookupTable::numAccounts)
.summaryStatistics();
logger.log(INFO, String.format("""
[partition=%d] [numTables=%s]
%s
""", i, nt, stats));
}
try (final var executor = Executors.newFixedThreadPool(maxConcurrentRequests)) {
final var nextPartition = new AtomicInteger(0);
final var latch = new CountDownLatch(NUM_PARTITIONS);
final var workers = IntStream.range(0, maxConcurrentRequests)
.mapToObj(i -> new Worker(nextPartition, latch, partitionedCallHandlers))
.toArray(Worker[]::new);

for (long start; ; ) {
for (final var worker : workers) {
executor.execute(worker);
}
start = System.currentTimeMillis();
latch.await();
final var duration = Duration.ofMillis(System.currentTimeMillis() - start);

final int numTables = IntStream.range(0, NUM_PARTITIONS)
.mapToObj(partitions::get)
.flatMap(Arrays::stream)
.map(IndexedTable::table)
.mapToInt(AddressLookupTable::numAccounts)
.sum();

initialized.complete(numTables);
try {
MINUTES.sleep(60);
} catch (final InterruptedException e) {
return;
}
logger.log(INFO, String.format("""
%s to fetch all %d tables.""", duration, numTables
));

nextPartition.set(0);
MINUTES.sleep(60);
}

} catch (final InterruptedException e) {
// return;
} catch (final RuntimeException ex) {
initialized.completeExceptionally(ex);
throw ex;
}
}

public static void main(final String[] args) {
public static void main(final String[] args) throws IOException {
try (final var executorService = Executors.newVirtualThreadPerTaskExecutor()) {
final var tableService = new LookupTableDiscoveryService(
executorService,
null,
null,
null,
4,
3
);
executorService.execute(tableService);
tableService.initialized.join();
try (final var httpClient = HttpClient.newHttpClient()) {
final var configFile = Path.of("").toAbsolutePath().resolve(System.getProperty("software.sava.services.solana.rpcConfigFile"));
final UriCapacityConfig rpcConfig = UriCapacityConfig.parseConfig(JsonIterator.parse(Files.readAllBytes(configFile)));
final var endpoint = rpcConfig.endpoint();
final var monitor = rpcConfig.createMonitor(endpoint.getHost(), null);
final var rpcClient = SolanaRpcClient.createClient(rpcConfig.endpoint(), httpClient, monitor.errorTracker());
final var defaultErrorHandler = createRpcClientErrorHandler(
linearBackoff(1, 21)
);
final var nativeProgramClient = NativeProgramClient.createClient();
final var tableService = new LookupTableDiscoveryService(
executorService,
LoadBalancer.createBalancer(BalancedItem.createItem(rpcClient, monitor), defaultErrorHandler),
defaultErrorHandler,
nativeProgramClient,
4,
3
);
executorService.execute(tableService);
tableService.initialized.join();
}
}
}
}

0 comments on commit d36ed01

Please sign in to comment.