Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

STREAM-1500: throttle tx sending #167

Merged
merged 6 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lerna.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"packages": [
"packages/*"
],
"version": "6.3.1",
"version": "6.3.2",
"$schema": "node_modules/lerna/schemas/lerna-schema.json"
}
5 changes: 3 additions & 2 deletions packages/common/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@streamflow/common",
"version": "6.3.1",
"version": "6.3.2",
"description": "Common utilities and types used by streamflow packages.",
"homepage": "https://github.com/streamflow-finance/js-sdk/",
"main": "dist/index.js",
Expand Down Expand Up @@ -42,6 +42,7 @@
"aptos": "1.4.0",
"bn.js": "5.2.1",
"borsh": "^2.0.0",
"bs58": "5.0.0"
"bs58": "5.0.0",
"p-queue": "^6.6.2"
}
}
6 changes: 6 additions & 0 deletions packages/common/solana/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { AccountInfo, BlockhashWithExpiryBlockHeight, Commitment, Context, PublicKey } from "@solana/web3.js";
import PQueue from "p-queue";

export interface ITransactionSolanaExt {
computePrice?: number;
Expand Down Expand Up @@ -30,6 +31,11 @@ export interface ConfirmationParams {
commitment?: Commitment;
}

export interface ThrottleParams {
sendRate?: number;
sendThrottler?: PQueue;
}

export class TransactionFailedError extends Error {
constructor(m: string) {
super(m);
Expand Down
85 changes: 70 additions & 15 deletions packages/common/solana/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,24 @@ import {
SendTransactionError,
} from "@solana/web3.js";
import bs58 from "bs58";
import PQueue from "p-queue";

import { Account, AtaParams, ConfirmationParams, ITransactionSolanaExt, TransactionFailedError } from "./types";
import {
Account,
AtaParams,
ConfirmationParams,
ITransactionSolanaExt,
ThrottleParams,
TransactionFailedError,
} from "./types";
import { sleep } from "../utils";

const SIMULATE_TRIES = 3;

export const buildSendThrottler = (sendRate: number): PQueue => {
return new PQueue({ concurrency: sendRate, intervalCap: 1, interval: 1000 });
};

/**
* Wrapper function for Solana web3 getProgramAccounts with slightly better call interface
* @param {Connection} connection - Solana web3 connection object.
Expand Down Expand Up @@ -148,18 +160,20 @@ export async function signTransaction<T extends Transaction | VersionedTransacti
* @param connection - Solana client connection
* @param invoker - Keypair used as signer
* @param tx - Transaction instance
* @param {ConfirmationParams} confirmationParams - Confirmation Params that will be used for execution
* @param confirmationParams - Confirmation Params that will be used for execution
* @param throttleParams - rate or throttler instance to throttle TX sending - to not spam the blockchain too much
* @returns Transaction signature
*/
export async function signAndExecuteTransaction(
connection: Connection,
invoker: Keypair | SignerWalletAdapter,
tx: Transaction | VersionedTransaction,
confirmationParams: ConfirmationParams,
throttleParams: ThrottleParams,
): Promise<string> {
const signedTx = await signTransaction(invoker, tx);

return executeTransaction(connection, signedTx, confirmationParams);
return executeTransaction(connection, signedTx, confirmationParams, throttleParams);
}

/**
Expand All @@ -173,35 +187,68 @@ export async function signAndExecuteTransaction(
* - otherwise there is a chance of marking a landed tx as failed if it was broadcasted at least once
* @param connection - Solana client connection
* @param tx - Transaction instance
* @param {ConfirmationParams} confirmationParams - Confirmation Params that will be used for execution
* @param confirmationParams - Confirmation Params that will be used for execution
* @param throttleParams - rate or throttler instance to throttle TX sending - to not spam the blockchain too much
* @returns Transaction signature
*/
export async function executeTransaction(
connection: Connection,
tx: Transaction | VersionedTransaction,
confirmationParams: ConfirmationParams,
throttleParams: ThrottleParams,
): Promise<string> {
if (tx.signatures.length === 0) {
throw Error("Error with transaction parameters.");
}
await simulateTransaction(connection, tx);

return sendAndConfirmTransaction(connection, tx, confirmationParams);
return sendAndConfirmTransaction(connection, tx, confirmationParams, throttleParams);
}

/**
* Launches a PromisePool with all transaction being executed at the same time, allows to throttle all TXs through one Queue
* @param connection - Solana client connection
* @param txs - Transactions
* @param confirmationParams - Confirmation Params that will be used for execution
* @param throttleParams - rate or throttler instance to throttle TX sending - to not spam the blockchain too much
* @param throttleParams.sendRate - rate
* @param throttleParams.sendThrottler - throttler instance
* @returns Raw Promise Results - should be handled by the consumer and unwrapped accordingly
*/
export async function executeMultipleTransactions(
connection: Connection,
txs: (Transaction | VersionedTransaction)[],
confirmationParams: ConfirmationParams,
{ sendRate = 1, sendThrottler }: ThrottleParams,
): Promise<PromiseSettledResult<string>[]> {
if (!sendThrottler) {
sendThrottler = buildSendThrottler(sendRate);
}
return Promise.allSettled(
txs.map((tx) =>
executeTransaction(connection, tx, confirmationParams, { sendRate: sendRate, sendThrottler: sendThrottler }),
),
);
}

/**
* Sends and confirm transaction in a loop, constantly re-broadcsting the tx until Blockheight expires.
* - we add additional 30 bocks to account for validators in an PRC pool divergence
* @param connection - Solana client connection
* @param tx - Transaction instance
* @param hash - blockhash information, the same hash should be used in the Transaction
* @param context - context at which blockhash has been retrieve
* @param commitment - optional commitment that will be used for simulation and confirmation
* @param confirmationParams - Confirmation Params that will be used for execution
* @param confirmationParams.hash - blockhash information, the same hash should be used in the Transaction
* @param confirmationParams.context - context at which blockhash has been retrieve
* @param confirmationParams.commitment - optional commitment that will be used for simulation and confirmation
* @param throttleParams - rate or throttler instance to throttle TX sending - to not spam the blockchain too much
* @param throttleParams.sendRate - rate
* @param throttleParams.sendThrottler - throttler instance
*/
export async function sendAndConfirmTransaction(
connection: Connection,
tx: Transaction | VersionedTransaction,
{ hash, context, commitment }: ConfirmationParams,
{ sendRate = 1, sendThrottler }: ThrottleParams,
): Promise<string> {
const isVersioned = isTransactionVersioned(tx);

Expand All @@ -212,18 +259,24 @@ export async function sendAndConfirmTransaction(
signature = bs58.encode(tx.signature!);
}

if (!sendThrottler) {
sendThrottler = buildSendThrottler(sendRate);
}

let blockheight = await connection.getBlockHeight(commitment);
let transactionSent = false;
const rawTransaction = tx.serialize();
while (blockheight < hash.lastValidBlockHeight + 15) {
try {
if (blockheight < hash.lastValidBlockHeight || !transactionSent) {
await connection.sendRawTransaction(rawTransaction, {
maxRetries: 0,
minContextSlot: context.slot,
preflightCommitment: commitment,
skipPreflight: true,
});
await sendThrottler.add(async () =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it required for us to have global-like queue so we don't hit RPCs limit across several invocations of send... APi

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be handled on the client-side, like FE should have this throttler and pass it to methods. But not in the sdk I think.

connection.sendRawTransaction(rawTransaction, {
maxRetries: 0,
minContextSlot: context.slot,
preflightCommitment: commitment,
skipPreflight: true,
}),
);
transactionSent = true;
}
} catch (e) {
Expand Down Expand Up @@ -413,21 +466,23 @@ export async function generateCreateAtaBatchTx(
* @param invoker - Transaction invoker and payer
* @param paramsBatch - Array of Params for an each ATA account: {mint, owner}
* @param commitment - optional commitment that will be used to fetch Blockhash
* @param rate - throttle rate for tx sending
* @returns Transaction signature
*/
export async function createAtaBatch(
connection: Connection,
invoker: Keypair | SignerWalletAdapter,
paramsBatch: AtaParams[],
commitment?: Commitment,
rate?: number,
): Promise<string> {
const { tx, hash, context } = await generateCreateAtaBatchTx(
connection,
invoker.publicKey!,
await enrichAtaParams(connection, paramsBatch),
commitment,
);
return signAndExecuteTransaction(connection, invoker, tx, { hash, context, commitment });
return signAndExecuteTransaction(connection, invoker, tx, { hash, context, commitment }, { sendRate: rate });
}

/**
Expand Down
5 changes: 3 additions & 2 deletions packages/distributor/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@streamflow/distributor",
"version": "6.3.1",
"version": "6.3.2",
"description": "JavaScript SDK to interact with Streamflow Airdrop protocol.",
"homepage": "https://github.com/streamflow-finance/js-sdk/",
"main": "dist/index.js",
Expand Down Expand Up @@ -41,6 +41,7 @@
"@streamflow/common": "workspace:*",
"bn.js": "5.2.1",
"borsh": "^2.0.0",
"bs58": "5.0.0"
"bs58": "5.0.0",
"p-queue": "^6.6.2"
}
}
Loading
Loading