Skip to content

Commit

Permalink
progress on lookup table service.
Browse files Browse the repository at this point in the history
  • Loading branch information
jpe7s committed Sep 20, 2024
1 parent de1d84f commit 3b81abe
Show file tree
Hide file tree
Showing 8 changed files with 350 additions and 7 deletions.
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.10-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.10.1-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
Expand Down
2 changes: 1 addition & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ dependencyResolutionManagement {
}
versionCatalogs {
libs {
from("software.sava:solana-version-catalog:0.1.3")
from("software.sava:solana-version-catalog:0.1.14")
}
}
}
8 changes: 3 additions & 5 deletions solana/src/main/java/module-info.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
module software.sava.solana_services {
requires systems.comodal.json_iterator;

requires org.bouncycastle.provider;

requires software.sava.core;
requires software.sava.rpc;
requires software.sava.core_services;
requires software.sava.solana_programs;
requires software.sava.rpc;
requires software.sava.core;

exports software.sava.services.solana.accounts.lookup;
exports software.sava.services.solana.remote.call;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package software.sava.services.solana.accounts.lookup;

import software.sava.core.accounts.lookup.AddressLookupTable;

record IndexedTable(int index, AddressLookupTable table) {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package software.sava.services.solana.accounts.lookup;

import software.sava.core.accounts.lookup.AddressLookupTable;
import software.sava.rpc.json.http.response.AccountInfo;
import software.sava.services.core.remote.call.Call;

import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.Predicate;

class LookupTableCallHandler implements Function<List<AccountInfo<AddressLookupTable>>, IndexedTable[]> {

private static final Function<AccountInfo<AddressLookupTable>, AddressLookupTable> GET_TABLE = AccountInfo::data;
private static final Predicate<AddressLookupTable> IS_ACTIVE = AddressLookupTable::isActive;
private static final Comparator<AddressLookupTable> BY_NUM_TABLES = (a, b) -> Integer.compare(b.numAccounts(), a.numAccounts());
private static final IntFunction<AddressLookupTable[]> ARRAY_GENERATOR = AddressLookupTable[]::new;

protected final ExecutorService executorService;
protected final Call<List<AccountInfo<AddressLookupTable>>> call;
protected final Predicate<AddressLookupTable> filter;

LookupTableCallHandler(final ExecutorService executorService,
final Call<List<AccountInfo<AddressLookupTable>>> call,
final Predicate<AddressLookupTable> minAccountsFilter) {
this.executorService = executorService;
this.call = call;
this.filter = IS_ACTIVE.and(minAccountsFilter);
}

@Override
public IndexedTable[] apply(final List<AccountInfo<AddressLookupTable>> accountInfos) {
final var filteredAndSorted = accountInfos.stream()
.map(GET_TABLE)
.filter(filter)
.sorted(BY_NUM_TABLES)
.toArray(ARRAY_GENERATOR);
final var indexed = new IndexedTable[filteredAndSorted.length];
for (int i = 0; i < indexed.length; ++i) {
indexed[i] = new IndexedTable(i, filteredAndSorted[i].withReverseLookup());
}
return indexed;
}


CompletableFuture<IndexedTable[]> callAndApply() {
return call.async(executorService).thenApply(this);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
package software.sava.services.solana.accounts.lookup;

import software.sava.core.accounts.PublicKey;
import software.sava.core.accounts.lookup.AddressLookupTable;
import software.sava.core.accounts.meta.AccountMeta;
import software.sava.core.accounts.sysvar.Clock;
import software.sava.core.encoding.ByteUtil;
import software.sava.core.rpc.Filter;
import software.sava.core.tx.Instruction;
import software.sava.core.tx.Transaction;
import software.sava.rpc.json.http.client.SolanaRpcClient;
import software.sava.services.core.remote.call.BalancedErrorHandler;
import software.sava.services.core.remote.call.Call;
import software.sava.services.core.remote.load_balance.LoadBalancer;
import software.sava.services.core.request_capacity.context.CallContext;
import software.sava.solana.programs.clients.NativeProgramClient;

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.IntStream;

import static java.lang.System.Logger.Level.INFO;
import static java.util.concurrent.TimeUnit.MINUTES;
import static software.sava.core.accounts.lookup.AddressLookupTable.AUTHORITY_OPTION_OFFSET;
import static software.sava.core.accounts.lookup.AddressLookupTable.DEACTIVATION_SLOT_OFFSET;

public final class LookupTableDiscoveryService implements Runnable {

private static final System.Logger logger = System.getLogger(LookupTableDiscoveryService.class.getName());

private static final int MAX_ACCOUNTS_PER_TX = 64;

private static final BiFunction<PublicKey, byte[], AddressLookupTable> WITHOUT_REVERSE_LOOKUP_FACTORY = AddressLookupTable::readWithoutReverseLookup;

private static final int NUM_PARTITIONS = 257;
private static final Filter ACTIVE_FILTER;
private static final Filter NO_AUTHORITY_FILTER = Filter.createMemCompFilter(AUTHORITY_OPTION_OFFSET, new byte[]{0});
private static final Filter[] PARTITION_FILTERS;

static {
final byte[] stillActive = new byte[Long.BYTES];
ByteUtil.putInt64LE(stillActive, 0, Clock.MAX_SLOT);
ACTIVE_FILTER = Filter.createMemCompFilter(DEACTIVATION_SLOT_OFFSET, stillActive);

final var partitionFilters = new Filter[NUM_PARTITIONS];
final byte[] partition = new byte[]{1, 0};
for (int i = 0; i < NUM_PARTITIONS; ++i) {
partition[1] = (byte) i;
partitionFilters[i] = Filter.createMemCompFilter(AUTHORITY_OPTION_OFFSET, partition);
}
PARTITION_FILTERS = partitionFilters;
}

private final CompletableFuture<Integer> initialized;
private final PublicKey altProgram;
private final int maxConcurrentRequests;
private final AtomicReferenceArray<IndexedTable[]> partitions;
private final PartitionedLookupTableCallHandler[] partitionedCallHandlers;

public LookupTableDiscoveryService(final ExecutorService executorService,
final LoadBalancer<SolanaRpcClient> rpcClients,
final BalancedErrorHandler<SolanaRpcClient> balancedErrorHandler,
final NativeProgramClient nativeProgramClient,
final int maxConcurrentRequests,
final int minAccountsPerTable) {
this.initialized = new CompletableFuture<>();
this.altProgram = nativeProgramClient.accounts().addressLookupTableProgram();
this.maxConcurrentRequests = maxConcurrentRequests;
this.partitions = new AtomicReferenceArray<>(NUM_PARTITIONS);
final Predicate<AddressLookupTable> minAccountsFilter = alt -> alt.numAccounts() > minAccountsPerTable;
final var noAuthorityCall = Call.createCall(
rpcClients, rpcClient -> rpcClient.getProgramAccounts(
altProgram,
List.of(
ACTIVE_FILTER,
NO_AUTHORITY_FILTER
),
WITHOUT_REVERSE_LOOKUP_FACTORY
),
CallContext.DEFAULT_CALL_CONTEXT,
1, Integer.MAX_VALUE, false,
balancedErrorHandler,
"rpcClient::getProgramAccounts"
);
this.partitionedCallHandlers = new PartitionedLookupTableCallHandler[NUM_PARTITIONS];
this.partitionedCallHandlers[0] = new PartitionedLookupTableCallHandler(
executorService,
noAuthorityCall,
minAccountsFilter,
0,
partitions
);
for (int i = 1; i < NUM_PARTITIONS; ++i) {
final var partitionFilter = PARTITION_FILTERS[i];
final var call = Call.createCall(
rpcClients, rpcClient -> rpcClient.getProgramAccounts(
altProgram,
List.of(
ACTIVE_FILTER,
partitionFilter
),
WITHOUT_REVERSE_LOOKUP_FACTORY
),
CallContext.DEFAULT_CALL_CONTEXT,
1, Integer.MAX_VALUE, false,
balancedErrorHandler,
"rpcClient::getProgramAccounts"
);
this.partitionedCallHandlers[i] = new PartitionedLookupTableCallHandler(
executorService,
call,
minAccountsFilter,
i,
partitions
);
}
}


private static ScoredTable[] rankTables(final IndexedTable[] partition,
final Set<PublicKey> accounts,
final int limit) {
final int[] scores = new int[partition.length];
IndexedTable table;
for (int i = 0, score; i < scores.length; ++i) {
table = partition[i];
score = 0;
for (final var pubKey : accounts) {
if (table.table().containKey(pubKey)) {
++score;
}
}
scores[i] = score;
}
return Arrays.stream(partition)
.filter(t -> scores[t.index()] > 1)
.sorted((a, b) -> Integer.compare(scores[b.index()], scores[a.index()]))
.limit(limit)
.map(t -> new ScoredTable(scores[t.index()], t.table()))
.toArray(ScoredTable[]::new);
}

public List<AddressLookupTable> findOptimalSetOfTables(final Set<PublicKey> distinctAccounts) {
final var scoredTables = IntStream.range(0, NUM_PARTITIONS).parallel()
.mapToObj(i -> rankTables(partitions.get(i), distinctAccounts, 10))
.flatMap(Arrays::stream)
.sorted()
.toArray(ScoredTable[]::new);

final int numAccounts = distinctAccounts.size();
final var tables = new ArrayList<AddressLookupTable>(MAX_ACCOUNTS_PER_TX >> 1);
int numFound = 0;

for (final var scoredTable : scoredTables) {
final var table = scoredTable.table();
final var iterator = distinctAccounts.iterator();
do {
if (table.containKey(iterator.next())) {
iterator.remove();
if (++numFound == numAccounts) {
return tables;
}
}
} while (iterator.hasNext());
}
return tables;
}

public List<AddressLookupTable> findOptimalSetOfTables(final Transaction transaction) {
final var distinctAccounts = HashSet.<PublicKey>newHashSet(MAX_ACCOUNTS_PER_TX);
transaction.instructions()
.stream()
.map(Instruction::accounts)
.flatMap(List::stream)
.map(AccountMeta::publicKey)
.forEach(distinctAccounts::add);
return findOptimalSetOfTables(distinctAccounts);
}

@SuppressWarnings("unchecked")
public void run() {
try {
final CompletableFuture<IndexedTable[]>[] concurrentFutures = new CompletableFuture[maxConcurrentRequests];
int numTables = 0;
for (int nt; ; ) {
for (int i = 0, c = 0; i < NUM_PARTITIONS; ) {
for (; c < maxConcurrentRequests && i < NUM_PARTITIONS; ++c, ++i) {
concurrentFutures[c] = partitionedCallHandlers[i].callAndApply();
}
for (final var future : concurrentFutures) {
if (future == null) {
break;
}
final var tables = future.join();
nt = tables.length;
numTables += nt;
final var stats = Arrays.stream(tables)
.map(IndexedTable::table)
.mapToInt(AddressLookupTable::numAccounts)
.summaryStatistics();
logger.log(INFO, String.format("""
[partition=%d] [numTables=%s]
%s
""", i, nt, stats));
}
}
initialized.complete(numTables);
try {
MINUTES.sleep(60);
} catch (final InterruptedException e) {
return;
}
}
} catch (final RuntimeException ex) {
initialized.completeExceptionally(ex);
throw ex;
}
}

public static void main(final String[] args) {
try (final var executorService = Executors.newVirtualThreadPerTaskExecutor()) {
final var tableService = new LookupTableDiscoveryService(
executorService,
null,
null,
null,
4,
3
);
executorService.execute(tableService);
tableService.initialized.join();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package software.sava.services.solana.accounts.lookup;

import software.sava.core.accounts.lookup.AddressLookupTable;
import software.sava.rpc.json.http.response.AccountInfo;
import software.sava.services.core.remote.call.Call;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Predicate;

final class PartitionedLookupTableCallHandler extends LookupTableCallHandler {

private final int partition;
private final AtomicReferenceArray<IndexedTable[]> partitions;

PartitionedLookupTableCallHandler(final ExecutorService executorService,
final Call<List<AccountInfo<AddressLookupTable>>> call,
final Predicate<AddressLookupTable> minAccountsFilter,
final int partition,
final AtomicReferenceArray<IndexedTable[]> partitions) {
super(executorService, call, minAccountsFilter);
this.partition = partition;
this.partitions = partitions;
}


@Override
public IndexedTable[] apply(final List<AccountInfo<AddressLookupTable>> accountInfos) {
final var tables = super.apply(accountInfos);
partitions.set(partition, tables);
return tables;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package software.sava.services.solana.accounts.lookup;

import software.sava.core.accounts.lookup.AddressLookupTable;

import java.util.Comparator;

record ScoredTable(int score, AddressLookupTable table) implements Comparator<ScoredTable> {

@Override
public int compare(final ScoredTable o1, final ScoredTable o2) {
return Integer.compare(o2.score, o1.score);
}
}

0 comments on commit 3b81abe

Please sign in to comment.