Skip to content

Commit

Permalink
STREAM-1500: throttle tx sending (#167)
Browse files Browse the repository at this point in the history
* STREAM-1500: throttle tx sending
  • Loading branch information
Yolley authored Apr 22, 2024
1 parent dd90162 commit 254c8af
Show file tree
Hide file tree
Showing 15 changed files with 327 additions and 146 deletions.
2 changes: 1 addition & 1 deletion lerna.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"packages": [
"packages/*"
],
"version": "6.3.1",
"version": "6.3.2",
"$schema": "node_modules/lerna/schemas/lerna-schema.json"
}
5 changes: 3 additions & 2 deletions packages/common/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@streamflow/common",
"version": "6.3.1",
"version": "6.3.2",
"description": "Common utilities and types used by streamflow packages.",
"homepage": "https://github.com/streamflow-finance/js-sdk/",
"main": "dist/index.js",
Expand Down Expand Up @@ -42,6 +42,7 @@
"aptos": "1.4.0",
"bn.js": "5.2.1",
"borsh": "^2.0.0",
"bs58": "5.0.0"
"bs58": "5.0.0",
"p-queue": "^6.6.2"
}
}
6 changes: 6 additions & 0 deletions packages/common/solana/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { AccountInfo, BlockhashWithExpiryBlockHeight, Commitment, Context, PublicKey } from "@solana/web3.js";
import PQueue from "p-queue";

export interface ITransactionSolanaExt {
computePrice?: number;
Expand Down Expand Up @@ -30,6 +31,11 @@ export interface ConfirmationParams {
commitment?: Commitment;
}

export interface ThrottleParams {
sendRate?: number;
sendThrottler?: PQueue;
}

export class TransactionFailedError extends Error {
constructor(m: string) {
super(m);
Expand Down
85 changes: 70 additions & 15 deletions packages/common/solana/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,24 @@ import {
SendTransactionError,
} from "@solana/web3.js";
import bs58 from "bs58";
import PQueue from "p-queue";

import { Account, AtaParams, ConfirmationParams, ITransactionSolanaExt, TransactionFailedError } from "./types";
import {
Account,
AtaParams,
ConfirmationParams,
ITransactionSolanaExt,
ThrottleParams,
TransactionFailedError,
} from "./types";
import { sleep } from "../utils";

const SIMULATE_TRIES = 3;

export const buildSendThrottler = (sendRate: number): PQueue => {
return new PQueue({ concurrency: sendRate, intervalCap: 1, interval: 1000 });
};

/**
* Wrapper function for Solana web3 getProgramAccounts with slightly better call interface
* @param {Connection} connection - Solana web3 connection object.
Expand Down Expand Up @@ -148,18 +160,20 @@ export async function signTransaction<T extends Transaction | VersionedTransacti
* @param connection - Solana client connection
* @param invoker - Keypair used as signer
* @param tx - Transaction instance
* @param {ConfirmationParams} confirmationParams - Confirmation Params that will be used for execution
* @param confirmationParams - Confirmation Params that will be used for execution
* @param throttleParams - rate or throttler instance to throttle TX sending - to not spam the blockchain too much
* @returns Transaction signature
*/
export async function signAndExecuteTransaction(
connection: Connection,
invoker: Keypair | SignerWalletAdapter,
tx: Transaction | VersionedTransaction,
confirmationParams: ConfirmationParams,
throttleParams: ThrottleParams,
): Promise<string> {
const signedTx = await signTransaction(invoker, tx);

return executeTransaction(connection, signedTx, confirmationParams);
return executeTransaction(connection, signedTx, confirmationParams, throttleParams);
}

/**
Expand All @@ -173,35 +187,68 @@ export async function signAndExecuteTransaction(
* - otherwise there is a chance of marking a landed tx as failed if it was broadcasted at least once
* @param connection - Solana client connection
* @param tx - Transaction instance
* @param {ConfirmationParams} confirmationParams - Confirmation Params that will be used for execution
* @param confirmationParams - Confirmation Params that will be used for execution
* @param throttleParams - rate or throttler instance to throttle TX sending - to not spam the blockchain too much
* @returns Transaction signature
*/
export async function executeTransaction(
connection: Connection,
tx: Transaction | VersionedTransaction,
confirmationParams: ConfirmationParams,
throttleParams: ThrottleParams,
): Promise<string> {
if (tx.signatures.length === 0) {
throw Error("Error with transaction parameters.");
}
await simulateTransaction(connection, tx);

return sendAndConfirmTransaction(connection, tx, confirmationParams);
return sendAndConfirmTransaction(connection, tx, confirmationParams, throttleParams);
}

/**
* Launches a PromisePool with all transaction being executed at the same time, allows to throttle all TXs through one Queue
* @param connection - Solana client connection
* @param txs - Transactions
* @param confirmationParams - Confirmation Params that will be used for execution
* @param throttleParams - rate or throttler instance to throttle TX sending - to not spam the blockchain too much
* @param throttleParams.sendRate - rate
* @param throttleParams.sendThrottler - throttler instance
* @returns Raw Promise Results - should be handled by the consumer and unwrapped accordingly
*/
export async function executeMultipleTransactions(
connection: Connection,
txs: (Transaction | VersionedTransaction)[],
confirmationParams: ConfirmationParams,
{ sendRate = 1, sendThrottler }: ThrottleParams,
): Promise<PromiseSettledResult<string>[]> {
if (!sendThrottler) {
sendThrottler = buildSendThrottler(sendRate);
}
return Promise.allSettled(
txs.map((tx) =>
executeTransaction(connection, tx, confirmationParams, { sendRate: sendRate, sendThrottler: sendThrottler }),
),
);
}

/**
* Sends and confirm transaction in a loop, constantly re-broadcsting the tx until Blockheight expires.
* - we add additional 30 bocks to account for validators in an PRC pool divergence
* @param connection - Solana client connection
* @param tx - Transaction instance
* @param hash - blockhash information, the same hash should be used in the Transaction
* @param context - context at which blockhash has been retrieve
* @param commitment - optional commitment that will be used for simulation and confirmation
* @param confirmationParams - Confirmation Params that will be used for execution
* @param confirmationParams.hash - blockhash information, the same hash should be used in the Transaction
* @param confirmationParams.context - context at which blockhash has been retrieve
* @param confirmationParams.commitment - optional commitment that will be used for simulation and confirmation
* @param throttleParams - rate or throttler instance to throttle TX sending - to not spam the blockchain too much
* @param throttleParams.sendRate - rate
* @param throttleParams.sendThrottler - throttler instance
*/
export async function sendAndConfirmTransaction(
connection: Connection,
tx: Transaction | VersionedTransaction,
{ hash, context, commitment }: ConfirmationParams,
{ sendRate = 1, sendThrottler }: ThrottleParams,
): Promise<string> {
const isVersioned = isTransactionVersioned(tx);

Expand All @@ -212,18 +259,24 @@ export async function sendAndConfirmTransaction(
signature = bs58.encode(tx.signature!);
}

if (!sendThrottler) {
sendThrottler = buildSendThrottler(sendRate);
}

let blockheight = await connection.getBlockHeight(commitment);
let transactionSent = false;
const rawTransaction = tx.serialize();
while (blockheight < hash.lastValidBlockHeight + 15) {
try {
if (blockheight < hash.lastValidBlockHeight || !transactionSent) {
await connection.sendRawTransaction(rawTransaction, {
maxRetries: 0,
minContextSlot: context.slot,
preflightCommitment: commitment,
skipPreflight: true,
});
await sendThrottler.add(async () =>
connection.sendRawTransaction(rawTransaction, {
maxRetries: 0,
minContextSlot: context.slot,
preflightCommitment: commitment,
skipPreflight: true,
}),
);
transactionSent = true;
}
} catch (e) {
Expand Down Expand Up @@ -413,21 +466,23 @@ export async function generateCreateAtaBatchTx(
* @param invoker - Transaction invoker and payer
* @param paramsBatch - Array of Params for an each ATA account: {mint, owner}
* @param commitment - optional commitment that will be used to fetch Blockhash
* @param rate - throttle rate for tx sending
* @returns Transaction signature
*/
export async function createAtaBatch(
connection: Connection,
invoker: Keypair | SignerWalletAdapter,
paramsBatch: AtaParams[],
commitment?: Commitment,
rate?: number,
): Promise<string> {
const { tx, hash, context } = await generateCreateAtaBatchTx(
connection,
invoker.publicKey!,
await enrichAtaParams(connection, paramsBatch),
commitment,
);
return signAndExecuteTransaction(connection, invoker, tx, { hash, context, commitment });
return signAndExecuteTransaction(connection, invoker, tx, { hash, context, commitment }, { sendRate: rate });
}

/**
Expand Down
5 changes: 3 additions & 2 deletions packages/distributor/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@streamflow/distributor",
"version": "6.3.1",
"version": "6.3.2",
"description": "JavaScript SDK to interact with Streamflow Airdrop protocol.",
"homepage": "https://github.com/streamflow-finance/js-sdk/",
"main": "dist/index.js",
Expand Down Expand Up @@ -41,6 +41,7 @@
"@streamflow/common": "workspace:*",
"bn.js": "5.2.1",
"borsh": "^2.0.0",
"bs58": "5.0.0"
"bs58": "5.0.0",
"p-queue": "^6.6.2"
}
}
Loading

0 comments on commit 254c8af

Please sign in to comment.