diff --git a/lerna.json b/lerna.json index 3fa0a1dc..f8552e92 100644 --- a/lerna.json +++ b/lerna.json @@ -2,6 +2,6 @@ "packages": [ "packages/*" ], - "version": "6.3.1", + "version": "6.3.2", "$schema": "node_modules/lerna/schemas/lerna-schema.json" } diff --git a/packages/common/package.json b/packages/common/package.json index 82ec79ba..0d523573 100644 --- a/packages/common/package.json +++ b/packages/common/package.json @@ -1,6 +1,6 @@ { "name": "@streamflow/common", - "version": "6.3.1", + "version": "6.3.2", "description": "Common utilities and types used by streamflow packages.", "homepage": "https://github.com/streamflow-finance/js-sdk/", "main": "dist/index.js", @@ -42,6 +42,7 @@ "aptos": "1.4.0", "bn.js": "5.2.1", "borsh": "^2.0.0", - "bs58": "5.0.0" + "bs58": "5.0.0", + "p-queue": "^6.6.2" } } diff --git a/packages/common/solana/types.ts b/packages/common/solana/types.ts index 6e61704a..a4ef385f 100644 --- a/packages/common/solana/types.ts +++ b/packages/common/solana/types.ts @@ -1,4 +1,5 @@ import { AccountInfo, BlockhashWithExpiryBlockHeight, Commitment, Context, PublicKey } from "@solana/web3.js"; +import PQueue from "p-queue"; export interface ITransactionSolanaExt { computePrice?: number; @@ -30,6 +31,11 @@ export interface ConfirmationParams { commitment?: Commitment; } +export interface ThrottleParams { + sendRate?: number; + sendThrottler?: PQueue; +} + export class TransactionFailedError extends Error { constructor(m: string) { super(m); diff --git a/packages/common/solana/utils.ts b/packages/common/solana/utils.ts index 54e23436..19e97ad2 100644 --- a/packages/common/solana/utils.ts +++ b/packages/common/solana/utils.ts @@ -25,12 +25,24 @@ import { SendTransactionError, } from "@solana/web3.js"; import bs58 from "bs58"; +import PQueue from "p-queue"; -import { Account, AtaParams, ConfirmationParams, ITransactionSolanaExt, TransactionFailedError } from "./types"; +import { + Account, + AtaParams, + ConfirmationParams, + ITransactionSolanaExt, + ThrottleParams, + TransactionFailedError, +} from "./types"; import { sleep } from "../utils"; const SIMULATE_TRIES = 3; +export const buildSendThrottler = (sendRate: number): PQueue => { + return new PQueue({ concurrency: sendRate, intervalCap: 1, interval: 1000 }); +}; + /** * Wrapper function for Solana web3 getProgramAccounts with slightly better call interface * @param {Connection} connection - Solana web3 connection object. @@ -148,7 +160,8 @@ export async function signTransaction { const signedTx = await signTransaction(invoker, tx); - return executeTransaction(connection, signedTx, confirmationParams); + return executeTransaction(connection, signedTx, confirmationParams, throttleParams); } /** @@ -173,20 +187,48 @@ export async function signAndExecuteTransaction( * - otherwise there is a chance of marking a landed tx as failed if it was broadcasted at least once * @param connection - Solana client connection * @param tx - Transaction instance - * @param {ConfirmationParams} confirmationParams - Confirmation Params that will be used for execution + * @param confirmationParams - Confirmation Params that will be used for execution + * @param throttleParams - rate or throttler instance to throttle TX sending - to not spam the blockchain too much * @returns Transaction signature */ export async function executeTransaction( connection: Connection, tx: Transaction | VersionedTransaction, confirmationParams: ConfirmationParams, + throttleParams: ThrottleParams, ): Promise { if (tx.signatures.length === 0) { throw Error("Error with transaction parameters."); } await simulateTransaction(connection, tx); - return sendAndConfirmTransaction(connection, tx, confirmationParams); + return sendAndConfirmTransaction(connection, tx, confirmationParams, throttleParams); +} + +/** + * Launches a PromisePool with all transaction being executed at the same time, allows to throttle all TXs through one Queue + * @param connection - Solana client connection + * @param txs - Transactions + * @param confirmationParams - Confirmation Params that will be used for execution + * @param throttleParams - rate or throttler instance to throttle TX sending - to not spam the blockchain too much + * @param throttleParams.sendRate - rate + * @param throttleParams.sendThrottler - throttler instance + * @returns Raw Promise Results - should be handled by the consumer and unwrapped accordingly + */ +export async function executeMultipleTransactions( + connection: Connection, + txs: (Transaction | VersionedTransaction)[], + confirmationParams: ConfirmationParams, + { sendRate = 1, sendThrottler }: ThrottleParams, +): Promise[]> { + if (!sendThrottler) { + sendThrottler = buildSendThrottler(sendRate); + } + return Promise.allSettled( + txs.map((tx) => + executeTransaction(connection, tx, confirmationParams, { sendRate: sendRate, sendThrottler: sendThrottler }), + ), + ); } /** @@ -194,14 +236,19 @@ export async function executeTransaction( * - we add additional 30 bocks to account for validators in an PRC pool divergence * @param connection - Solana client connection * @param tx - Transaction instance - * @param hash - blockhash information, the same hash should be used in the Transaction - * @param context - context at which blockhash has been retrieve - * @param commitment - optional commitment that will be used for simulation and confirmation + * @param confirmationParams - Confirmation Params that will be used for execution + * @param confirmationParams.hash - blockhash information, the same hash should be used in the Transaction + * @param confirmationParams.context - context at which blockhash has been retrieve + * @param confirmationParams.commitment - optional commitment that will be used for simulation and confirmation + * @param throttleParams - rate or throttler instance to throttle TX sending - to not spam the blockchain too much + * @param throttleParams.sendRate - rate + * @param throttleParams.sendThrottler - throttler instance */ export async function sendAndConfirmTransaction( connection: Connection, tx: Transaction | VersionedTransaction, { hash, context, commitment }: ConfirmationParams, + { sendRate = 1, sendThrottler }: ThrottleParams, ): Promise { const isVersioned = isTransactionVersioned(tx); @@ -212,18 +259,24 @@ export async function sendAndConfirmTransaction( signature = bs58.encode(tx.signature!); } + if (!sendThrottler) { + sendThrottler = buildSendThrottler(sendRate); + } + let blockheight = await connection.getBlockHeight(commitment); let transactionSent = false; const rawTransaction = tx.serialize(); while (blockheight < hash.lastValidBlockHeight + 15) { try { if (blockheight < hash.lastValidBlockHeight || !transactionSent) { - await connection.sendRawTransaction(rawTransaction, { - maxRetries: 0, - minContextSlot: context.slot, - preflightCommitment: commitment, - skipPreflight: true, - }); + await sendThrottler.add(async () => + connection.sendRawTransaction(rawTransaction, { + maxRetries: 0, + minContextSlot: context.slot, + preflightCommitment: commitment, + skipPreflight: true, + }), + ); transactionSent = true; } } catch (e) { @@ -413,6 +466,7 @@ export async function generateCreateAtaBatchTx( * @param invoker - Transaction invoker and payer * @param paramsBatch - Array of Params for an each ATA account: {mint, owner} * @param commitment - optional commitment that will be used to fetch Blockhash + * @param rate - throttle rate for tx sending * @returns Transaction signature */ export async function createAtaBatch( @@ -420,6 +474,7 @@ export async function createAtaBatch( invoker: Keypair | SignerWalletAdapter, paramsBatch: AtaParams[], commitment?: Commitment, + rate?: number, ): Promise { const { tx, hash, context } = await generateCreateAtaBatchTx( connection, @@ -427,7 +482,7 @@ export async function createAtaBatch( await enrichAtaParams(connection, paramsBatch), commitment, ); - return signAndExecuteTransaction(connection, invoker, tx, { hash, context, commitment }); + return signAndExecuteTransaction(connection, invoker, tx, { hash, context, commitment }, { sendRate: rate }); } /** diff --git a/packages/distributor/package.json b/packages/distributor/package.json index 93343735..8316920f 100644 --- a/packages/distributor/package.json +++ b/packages/distributor/package.json @@ -1,6 +1,6 @@ { "name": "@streamflow/distributor", - "version": "6.3.1", + "version": "6.3.2", "description": "JavaScript SDK to interact with Streamflow Airdrop protocol.", "homepage": "https://github.com/streamflow-finance/js-sdk/", "main": "dist/index.js", @@ -41,6 +41,7 @@ "@streamflow/common": "workspace:*", "bn.js": "5.2.1", "borsh": "^2.0.0", - "bs58": "5.0.0" + "bs58": "5.0.0", + "p-queue": "^6.6.2" } } diff --git a/packages/distributor/solana/client.ts b/packages/distributor/solana/client.ts index 5dfddac7..4c3547fa 100644 --- a/packages/distributor/solana/client.ts +++ b/packages/distributor/solana/client.ts @@ -1,4 +1,5 @@ import BN from "bn.js"; +import PQueue from "p-queue"; import { ASSOCIATED_TOKEN_PROGRAM_ID, NATIVE_MINT, createTransferCheckedInstruction } from "@solana/spl-token"; import { Connection, @@ -16,6 +17,7 @@ import { prepareBaseInstructions, prepareTransaction, getMintAndProgram, + buildSendThrottler, } from "@streamflow/common/solana"; import { DISTRIBUTOR_PROGRAM_ID } from "./constants"; @@ -54,6 +56,8 @@ interface IInitOptions { cluster?: ICluster; commitment?: Commitment | ConnectionConfig; programId?: string; + sendRate?: number; + sendThrottler?: PQueue; } export default class SolanaDistributorClient { @@ -63,33 +67,40 @@ export default class SolanaDistributorClient { private commitment: Commitment | ConnectionConfig; + private sendThrottler: PQueue; + /** * Create Stream instance */ - constructor({ clusterUrl, cluster = ICluster.Mainnet, commitment = "confirmed", programId = "" }: IInitOptions) { + constructor({ + clusterUrl, + cluster = ICluster.Mainnet, + commitment = "confirmed", + programId = "", + sendRate = 1, + sendThrottler, + }: IInitOptions) { this.commitment = commitment; this.connection = new Connection(clusterUrl, this.commitment); this.programId = programId !== "" ? new PublicKey(programId) : new PublicKey(DISTRIBUTOR_PROGRAM_ID[cluster]); + this.sendThrottler = sendThrottler ?? buildSendThrottler(sendRate); } public getCommitment(): Commitment | undefined { return typeof this.commitment == "string" ? this.commitment : this.commitment.commitment; } - public async create( - data: ICreateDistributorData, - { invoker, isNative = false, computePrice, computeLimit }: ICreateSolanaExt, - ): Promise { - if (!invoker.publicKey) { + public async create(data: ICreateDistributorData, extParams: ICreateSolanaExt): Promise { + if (!extParams.invoker.publicKey) { throw new Error("Invoker's PublicKey is not available, check passed wallet adapter!"); } - const ixs: TransactionInstruction[] = prepareBaseInstructions(this.connection, { computePrice, computeLimit }); - const mint = isNative ? NATIVE_MINT : new PublicKey(data.mint); + const ixs: TransactionInstruction[] = prepareBaseInstructions(this.connection, extParams); + const mint = extParams.isNative ? NATIVE_MINT : new PublicKey(data.mint); const { mint: mintAccount, tokenProgramId } = await getMintAndProgram(this.connection, mint); const distributorPublicKey = getDistributorPda(this.programId, mint, data.version); const tokenVault = await ata(mint, distributorPublicKey, tokenProgramId); - const senderTokens = await ata(mint, invoker.publicKey, tokenProgramId); + const senderTokens = await ata(mint, extParams.invoker.publicKey, tokenProgramId); const args: NewDistributorArgs = { version: new BN(data.version, 10), @@ -107,14 +118,14 @@ export default class SolanaDistributorClient { clawbackReceiver: senderTokens, mint, tokenVault, - admin: invoker.publicKey, + admin: extParams.invoker.publicKey, systemProgram: SystemProgram.programId, associatedTokenProgram: ASSOCIATED_TOKEN_PROGRAM_ID, tokenProgram: tokenProgramId, }; - if (isNative) { - ixs.push(...(await prepareWrappedAccount(this.connection, invoker.publicKey, data.maxTotalClaim))); + if (extParams.isNative) { + ixs.push(...(await prepareWrappedAccount(this.connection, extParams.invoker.publicKey, data.maxTotalClaim))); } const nowTs = new BN(Math.floor(Date.now() / 1000)); @@ -130,7 +141,7 @@ export default class SolanaDistributorClient { senderTokens, mint, tokenVault, - invoker.publicKey, + extParams.invoker.publicKey, BigInt(data.maxTotalClaim.toString()), mintAccount.decimals, undefined, @@ -138,12 +149,18 @@ export default class SolanaDistributorClient { ), ); - const { tx, hash, context } = await prepareTransaction(this.connection, ixs, invoker.publicKey); - const signature = await wrappedSignAndExecuteTransaction(this.connection, invoker, tx, { - hash, - context, - commitment: this.getCommitment(), - }); + const { tx, hash, context } = await prepareTransaction(this.connection, ixs, extParams.invoker.publicKey); + const signature = await wrappedSignAndExecuteTransaction( + this.connection, + extParams.invoker, + tx, + { + hash, + context, + commitment: this.getCommitment(), + }, + { sendThrottler: this.sendThrottler }, + ); return { ixs, @@ -152,11 +169,8 @@ export default class SolanaDistributorClient { }; } - public async claim( - data: IClaimData, - { invoker, computePrice, computeLimit }: IInteractSolanaExt, - ): Promise { - if (!invoker.publicKey) { + public async claim(data: IClaimData, extParams: IInteractSolanaExt): Promise { + if (!extParams.invoker.publicKey) { throw new Error("Invoker's PublicKey is not available, check passed wallet adapter!"); } @@ -168,12 +182,22 @@ export default class SolanaDistributorClient { } const { tokenProgramId } = await getMintAndProgram(this.connection, distributor.mint); - const ixs: TransactionInstruction[] = prepareBaseInstructions(this.connection, { computePrice, computeLimit }); + const ixs: TransactionInstruction[] = prepareBaseInstructions(this.connection, extParams); ixs.push( - ...(await checkOrCreateAtaBatch(this.connection, [invoker.publicKey], distributor.mint, invoker, tokenProgramId)), + ...(await checkOrCreateAtaBatch( + this.connection, + [extParams.invoker.publicKey], + distributor.mint, + extParams.invoker, + tokenProgramId, + )), + ); + const invokerTokens = await ata(distributor.mint, extParams.invoker.publicKey, tokenProgramId); + const claimStatusPublicKey = getClaimantStatusPda( + this.programId, + distributorPublicKey, + extParams.invoker.publicKey, ); - const invokerTokens = await ata(distributor.mint, invoker.publicKey, tokenProgramId); - const claimStatusPublicKey = getClaimantStatusPda(this.programId, distributorPublicKey, invoker.publicKey); const eventAuthorityPublicKey = getEventAuthorityPda(this.programId); const claimStatus = await ClaimStatus.fetch(this.connection, claimStatusPublicKey); @@ -182,7 +206,7 @@ export default class SolanaDistributorClient { claimStatus: claimStatusPublicKey, from: distributor.tokenVault, to: invokerTokens, - claimant: invoker.publicKey, + claimant: extParams.invoker.publicKey, mint: distributor.mint, tokenProgram: tokenProgramId, systemProgram: SystemProgram.programId, @@ -204,21 +228,24 @@ export default class SolanaDistributorClient { ixs.push(claimLocked(accounts, this.programId)); } - const { tx, hash, context } = await prepareTransaction(this.connection, ixs, invoker.publicKey); - const signature = await wrappedSignAndExecuteTransaction(this.connection, invoker, tx, { - hash, - context, - commitment: this.getCommitment(), - }); + const { tx, hash, context } = await prepareTransaction(this.connection, ixs, extParams.invoker.publicKey); + const signature = await wrappedSignAndExecuteTransaction( + this.connection, + extParams.invoker, + tx, + { + hash, + context, + commitment: this.getCommitment(), + }, + { sendThrottler: this.sendThrottler }, + ); return { ixs, txId: signature }; } - public async clawback( - data: IClawbackData, - { invoker, computePrice, computeLimit }: IInteractSolanaExt, - ): Promise { - if (!invoker.publicKey) { + public async clawback(data: IClawbackData, extParams: IInteractSolanaExt): Promise { + if (!extParams.invoker.publicKey) { throw new Error("Invoker's PublicKey is not available, check passed wallet adapter!"); } @@ -230,15 +257,21 @@ export default class SolanaDistributorClient { } const { tokenProgramId } = await getMintAndProgram(this.connection, distributor.mint); - const ixs: TransactionInstruction[] = prepareBaseInstructions(this.connection, { computePrice, computeLimit }); + const ixs: TransactionInstruction[] = prepareBaseInstructions(this.connection, extParams); ixs.push( - ...(await checkOrCreateAtaBatch(this.connection, [invoker.publicKey], distributor.mint, invoker, tokenProgramId)), + ...(await checkOrCreateAtaBatch( + this.connection, + [extParams.invoker.publicKey], + distributor.mint, + extParams.invoker, + tokenProgramId, + )), ); const accounts: ClawbackAccounts = { distributor: distributorPublicKey, from: distributor.tokenVault, to: distributor.clawbackReceiver, - admin: invoker.publicKey, + admin: extParams.invoker.publicKey, mint: distributor.mint, systemProgram: SystemProgram.programId, tokenProgram: tokenProgramId, @@ -246,12 +279,18 @@ export default class SolanaDistributorClient { ixs.push(clawback(accounts, this.programId)); - const { tx, hash, context } = await prepareTransaction(this.connection, ixs, invoker.publicKey); - const signature = await wrappedSignAndExecuteTransaction(this.connection, invoker, tx, { - hash, - context, - commitment: this.getCommitment(), - }); + const { tx, hash, context } = await prepareTransaction(this.connection, ixs, extParams.invoker.publicKey); + const signature = await wrappedSignAndExecuteTransaction( + this.connection, + extParams.invoker, + tx, + { + hash, + context, + commitment: this.getCommitment(), + }, + { sendThrottler: this.sendThrottler }, + ); return { ixs, txId: signature }; } diff --git a/packages/distributor/solana/utils.ts b/packages/distributor/solana/utils.ts index a18b8089..c0ae3540 100644 --- a/packages/distributor/solana/utils.ts +++ b/packages/distributor/solana/utils.ts @@ -1,7 +1,7 @@ import { SignerWalletAdapter } from "@solana/wallet-adapter-base"; import { Connection, Keypair, PublicKey, Transaction, VersionedTransaction } from "@solana/web3.js"; import { ContractError } from "@streamflow/common"; -import { ConfirmationParams, signAndExecuteTransaction } from "@streamflow/common/solana"; +import { ConfirmationParams, signAndExecuteTransaction, ThrottleParams } from "@streamflow/common/solana"; import { fromTxError } from "./generated/errors"; @@ -38,9 +38,10 @@ export async function wrappedSignAndExecuteTransaction( invoker: Keypair | SignerWalletAdapter, tx: Transaction | VersionedTransaction, confirmationParams: ConfirmationParams, + throttleParams: ThrottleParams, ): Promise { try { - return await signAndExecuteTransaction(connection, invoker, tx, confirmationParams); + return await signAndExecuteTransaction(connection, invoker, tx, confirmationParams, throttleParams); } catch (err: any) { if (err instanceof Error) { const parsed = fromTxError(err); diff --git a/packages/eslint-config/package.json b/packages/eslint-config/package.json index d669ab04..a113bc82 100644 --- a/packages/eslint-config/package.json +++ b/packages/eslint-config/package.json @@ -1,6 +1,6 @@ { "name": "@streamflow/eslint-config", - "version": "6.3.1", + "version": "6.3.2", "license": "ISC", "main": "index.js", "files": [ diff --git a/packages/stream/common/GenericStreamClient.ts b/packages/stream/common/GenericStreamClient.ts index e4d54810..5c59ade3 100644 --- a/packages/stream/common/GenericStreamClient.ts +++ b/packages/stream/common/GenericStreamClient.ts @@ -1,5 +1,6 @@ import { Commitment, ConnectionConfig } from "@solana/web3.js"; import { Signer } from "ethers"; +import PQueue from "p-queue"; import { BaseStreamClient } from "./BaseStreamClient"; import { @@ -34,6 +35,8 @@ export interface SolanaStreamClientOptions { cluster?: ICluster; programId?: string; commitment?: Commitment | ConnectionConfig; + sendRate?: number; + sendThrottler?: PQueue; } export interface AptosStreamClientOptions { @@ -126,6 +129,8 @@ export default class GenericStreamClient extends BaseStreamCli options.cluster, options.commitment, options.programId, + options.sendRate, + options.sendThrottler, ) as StreamClientType; break; case IChain.Aptos: diff --git a/packages/stream/package.json b/packages/stream/package.json index 690e0730..1fe2e352 100644 --- a/packages/stream/package.json +++ b/packages/stream/package.json @@ -1,6 +1,6 @@ { "name": "@streamflow/stream", - "version": "6.3.1", + "version": "6.3.2", "description": "JavaScript SDK to interact with Streamflow protocol.", "homepage": "https://github.com/streamflow-finance/js-sdk/", "main": "dist/index.js", @@ -62,6 +62,7 @@ "bs58": "5.0.0", "ethereum-checksum-address": "0.0.8", "ethers": "5.7.2", - "js-sha256": "0.9.0" + "js-sha256": "0.9.0", + "p-queue": "^6.6.2" } } diff --git a/packages/stream/solana/StreamClient.ts b/packages/stream/solana/StreamClient.ts index a3d5734c..68cc688c 100644 --- a/packages/stream/solana/StreamClient.ts +++ b/packages/stream/solana/StreamClient.ts @@ -2,6 +2,7 @@ import BN from "bn.js"; import { Buffer } from "buffer"; +import PQueue from "p-queue"; import { ASSOCIATED_TOKEN_PROGRAM_ID, NATIVE_MINT } from "@solana/spl-token"; import { Connection, @@ -25,6 +26,9 @@ import { prepareTransaction, prepareBaseInstructions, getMintAndProgram, + executeTransaction, + executeMultipleTransactions, + buildSendThrottler, } from "@streamflow/common/solana"; import * as borsh from "borsh"; @@ -33,8 +37,6 @@ import { MetadataRecipientHashMap, Contract, BatchItem, - BatchItemSuccess, - BatchItemError, ICreateStreamSolanaExt, IInteractStreamSolanaExt, ITopUpStreamSolanaExt, @@ -104,6 +106,8 @@ export default class SolanaStreamClient extends BaseStreamClient { private commitment: Commitment | ConnectionConfig; + private sendThrottler: PQueue; + /** * Create Stream instance */ @@ -112,11 +116,14 @@ export default class SolanaStreamClient extends BaseStreamClient { cluster: ICluster = ICluster.Mainnet, commitment: Commitment | ConnectionConfig = "confirmed", programId = "", + sendRate = 1, + sendThrottler?: PQueue, ) { super(); this.commitment = commitment; this.connection = new Connection(clusterUrl, this.commitment); this.programId = programId !== "" ? new PublicKey(programId) : new PublicKey(PROGRAM_ID[cluster]); + this.sendThrottler = sendThrottler ?? buildSendThrottler(sendRate); } public getConnection(): Connection { @@ -144,11 +151,17 @@ export default class SolanaStreamClient extends BaseStreamClient { undefined, metadata, ); - const signature = await signAndExecuteTransaction(this.connection, extParams.sender, tx, { - hash, - context, - commitment: this.getCommitment(), - }); + const signature = await signAndExecuteTransaction( + this.connection, + extParams.sender, + tx, + { + hash, + context, + commitment: this.getCommitment(), + }, + { sendThrottler: this.sendThrottler }, + ); return { ixs, txId: signature, metadataId: metadataPubKey.toBase58() }; } @@ -281,11 +294,17 @@ export default class SolanaStreamClient extends BaseStreamClient { undefined, metadata, ); - const signature = await signAndExecuteTransaction(this.connection, extParams.sender, tx, { - hash, - context, - commitment: this.getCommitment(), - }); + const signature = await signAndExecuteTransaction( + this.connection, + extParams.sender, + tx, + { + hash, + context, + commitment: this.getCommitment(), + }, + { sendThrottler: this.sendThrottler }, + ); return { ixs, txId: signature, metadataId: metadataPubKey.toBase58() }; } @@ -476,17 +495,22 @@ export default class SolanaStreamClient extends BaseStreamClient { if (isNative) { const prepareTx = signedBatch.pop(); - await sendAndConfirmStreamRawTransaction(this.connection, prepareTx!, { hash, context }); + await sendAndConfirmStreamRawTransaction( + this.connection, + prepareTx!, + { hash, context }, + { sendThrottler: this.sendThrottler }, + ); } - const responses: PromiseSettledResult[] = []; + const responses: PromiseSettledResult[] = []; if (metadataPubKeys.length > 0) { //if metadata pub keys were passed we should execute transaction sequentially //ephemeral signer need to be used first before proceeding with the next for (const batchTx of signedBatch) { responses.push( ...(await Promise.allSettled([ - sendAndConfirmStreamRawTransaction(this.connection, batchTx, { hash, context }), + executeTransaction(this.connection, batchTx.tx, { hash, context }, { sendThrottler: this.sendThrottler }), ])), ); } @@ -494,24 +518,27 @@ export default class SolanaStreamClient extends BaseStreamClient { //send all transactions in parallel and wait for them to settle. //it allows to speed up the process of sending transactions //we then filter all promise responses and handle failed transactions - const batchTransactionsCalls = signedBatch.map((el) => - sendAndConfirmStreamRawTransaction(this.connection, el, { hash, context }), + responses.push( + ...(await executeMultipleTransactions( + this.connection, + signedBatch.map((item) => item.tx), + { hash, context }, + { sendThrottler: this.sendThrottler }, + )), ); - responses.push(...(await Promise.allSettled(batchTransactionsCalls))); } - const successes = responses - .filter((el): el is PromiseFulfilledResult => el.status === "fulfilled") - .map((el) => el.value); - signatures.push(...successes.map((el) => el.signature)); - - const failures = responses - .filter((el): el is PromiseRejectedResult => el.status === "rejected") - .map((el) => ({ - ...(el.reason as BatchItemError), - contractErrorCode: this.extractErrorCode(el.reason.error) || undefined, - })); - errors.push(...failures); + responses.forEach((item, index) => { + if (item.status === "fulfilled") { + signatures.push(item.value); + } else { + errors.push({ + recipient: signedBatch[index].recipient, + error: item.reason, + contractErrorCode: this.extractErrorCode(item.reason) || undefined, + }); + } + }); return { txs: signatures, metadatas, metadataToRecipient, errors }; } @@ -525,11 +552,17 @@ export default class SolanaStreamClient extends BaseStreamClient { ): Promise { const ixs: TransactionInstruction[] = await this.prepareWithdrawInstructions({ id, amount }, extParams); const { tx, hash, context } = await prepareTransaction(this.connection, ixs, extParams.invoker.publicKey); - const signature = await signAndExecuteTransaction(this.connection, extParams.invoker, tx, { - hash, - context, - commitment: this.getCommitment(), - }); + const signature = await signAndExecuteTransaction( + this.connection, + extParams.invoker, + tx, + { + hash, + context, + commitment: this.getCommitment(), + }, + { sendThrottler: this.sendThrottler }, + ); return { ixs, txId: signature }; } @@ -587,11 +620,17 @@ export default class SolanaStreamClient extends BaseStreamClient { public async cancel({ id }: ICancelData, extParams: IInteractStreamSolanaExt): Promise { const ixs = await this.prepareCancelInstructions({ id }, extParams); const { tx, hash, context } = await prepareTransaction(this.connection, ixs, extParams.invoker.publicKey); - const signature = await signAndExecuteTransaction(this.connection, extParams.invoker, tx, { - hash, - context, - commitment: this.getCommitment(), - }); + const signature = await signAndExecuteTransaction( + this.connection, + extParams.invoker, + tx, + { + hash, + context, + commitment: this.getCommitment(), + }, + { sendThrottler: this.sendThrottler }, + ); return { ixs, txId: signature }; } @@ -656,11 +695,17 @@ export default class SolanaStreamClient extends BaseStreamClient { ): Promise { const ixs: TransactionInstruction[] = await this.prepareTransferInstructions({ id, newRecipient }, extParams); const { tx, hash, context } = await prepareTransaction(this.connection, ixs, extParams.invoker.publicKey); - const signature = await signAndExecuteTransaction(this.connection, extParams.invoker, tx, { - hash, - context, - commitment: this.getCommitment(), - }); + const signature = await signAndExecuteTransaction( + this.connection, + extParams.invoker, + tx, + { + hash, + context, + commitment: this.getCommitment(), + }, + { sendThrottler: this.sendThrottler }, + ); return { ixs, txId: signature }; } @@ -715,11 +760,17 @@ export default class SolanaStreamClient extends BaseStreamClient { public async topup({ id, amount }: ITopUpData, extParams: ITopUpStreamSolanaExt): Promise { const ixs: TransactionInstruction[] = await this.prepareTopupInstructions({ id, amount }, extParams); const { tx, hash, context } = await prepareTransaction(this.connection, ixs, extParams.invoker.publicKey); - const signature = await signAndExecuteTransaction(this.connection, extParams.invoker, tx, { - hash, - context, - commitment: this.getCommitment(), - }); + const signature = await signAndExecuteTransaction( + this.connection, + extParams.invoker, + tx, + { + hash, + context, + commitment: this.getCommitment(), + }, + { sendThrottler: this.sendThrottler }, + ); return { ixs, txId: signature }; } @@ -837,11 +888,17 @@ export default class SolanaStreamClient extends BaseStreamClient { public async update(data: IUpdateData, extParams: IInteractStreamSolanaExt): Promise { const ixs = await this.prepareUpdateInstructions(data, extParams); const { tx, hash, context } = await prepareTransaction(this.connection, ixs, extParams.invoker.publicKey); - const signature = await signAndExecuteTransaction(this.connection, extParams.invoker, tx, { - hash, - context, - commitment: this.getCommitment(), - }); + const signature = await signAndExecuteTransaction( + this.connection, + extParams.invoker, + tx, + { + hash, + context, + commitment: this.getCommitment(), + }, + { sendThrottler: this.sendThrottler }, + ); return { ixs, diff --git a/packages/stream/solana/utils.ts b/packages/stream/solana/utils.ts index ac84f12a..83a10330 100644 --- a/packages/stream/solana/utils.ts +++ b/packages/stream/solana/utils.ts @@ -1,6 +1,12 @@ import { SignerWalletAdapter } from "@solana/wallet-adapter-base"; import { Connection, Keypair, PublicKey } from "@solana/web3.js"; -import { ConfirmationParams, executeTransaction, isSignerKeypair, isSignerWallet } from "@streamflow/common/solana"; +import { + ConfirmationParams, + executeTransaction, + isSignerKeypair, + isSignerWallet, + ThrottleParams, +} from "@streamflow/common/solana"; import BN from "bn.js"; import { streamLayout } from "./layout"; @@ -93,16 +99,18 @@ export async function signAllTransactionWithRecipients( * Sign passed BatchItems with wallet request or KeyPair * @param {Connection} connection - Solana web3 connection object. * @param {BatchItem} batchItem - Signed transaction ready to be send. - * @param {ConfirmationParams} confirmationParams - Confirmation Params that will be used for execution + * @param confirmationParams - Confirmation Params that will be used for execution + * @param throttleParams - rate or throttler instance to throttle TX sending - to not spam the blockchain too much * @return {Promise} - Returns settled transaction item */ export async function sendAndConfirmStreamRawTransaction( connection: Connection, batchItem: BatchItem, confirmationParams: ConfirmationParams, + throttleParams: ThrottleParams, ): Promise { try { - const completedTxSignature = await executeTransaction(connection, batchItem.tx, confirmationParams); + const completedTxSignature = await executeTransaction(connection, batchItem.tx, confirmationParams, throttleParams); return { ...batchItem, signature: completedTxSignature }; } catch (error: any) { throw { diff --git a/packages/stream/sui/constants.ts b/packages/stream/sui/constants.ts index 5667ff00..f6f74196 100644 --- a/packages/stream/sui/constants.ts +++ b/packages/stream/sui/constants.ts @@ -3,23 +3,23 @@ import { ContractErrorCode, ICluster } from "../common/types"; // TODO: remove Devnet and Local addresses as they are not deployed, they are just a copy Testnet values export const SUI_PROGRAM_IDS: Record = { [ICluster.Mainnet]: "0x19007172b22fcfc00c22f1598a9cec7ab52de2b0ea0d111813bc1295a136fc10", - [ICluster.Devnet]: "0xf1916c119a6c917d4b36f96ffc0443930745789f3126a716e05a62223c48993a", - [ICluster.Testnet]: "0xc76590fa8711cd77b9360e00761d80cd09ceb8638e6c8ebcbcd749dcd21a9466", - [ICluster.Local]: "0xf1916c119a6c917d4b36f96ffc0443930745789f3126a716e05a62223c48993a", + [ICluster.Devnet]: "0x34dacd839e1758ebb70bdcd4e08f18fbfef5e2b96d9aa7ce2b4a71d96ca0df5a", + [ICluster.Testnet]: "0x34dacd839e1758ebb70bdcd4e08f18fbfef5e2b96d9aa7ce2b4a71d96ca0df5a", + [ICluster.Local]: "0x34dacd839e1758ebb70bdcd4e08f18fbfef5e2b96d9aa7ce2b4a71d96ca0df5a", }; export const SUI_CONFIG_IDS: Record = { [ICluster.Mainnet]: "0x6cf6760b64245b8d23ef57c28ddceb6adbd540a23a509fef29b82237da4ab87b", - [ICluster.Devnet]: "0x9cdb344873cd2995cab624f192fbe0b358e136c33acbdf7523916e32f24df44b", - [ICluster.Testnet]: "0x9cdb344873cd2995cab624f192fbe0b358e136c33acbdf7523916e32f24df44b", - [ICluster.Local]: "0x9cdb344873cd2995cab624f192fbe0b358e136c33acbdf7523916e32f24df44b", + [ICluster.Devnet]: "0xd6c9f5074584f58074ce56e3c5cc436d82258ec3285f186a0d6438a60bacdbf8", + [ICluster.Testnet]: "0xd6c9f5074584f58074ce56e3c5cc436d82258ec3285f186a0d6438a60bacdbf8", + [ICluster.Local]: "0xd6c9f5074584f58074ce56e3c5cc436d82258ec3285f186a0d6438a60bacdbf8", }; export const SUI_FEE_TABLE_IDS: Record = { [ICluster.Mainnet]: "0xad9b75399632583fb9fcae6b5bcca34e6542ab3bedb630ecbd3f15cb1cc48dbe", - [ICluster.Devnet]: "0xf3661941207b5027fb4b85a74ca5a9fd1389fb57a8f2c57bd312b950e7d48012", - [ICluster.Testnet]: "0xf3661941207b5027fb4b85a74ca5a9fd1389fb57a8f2c57bd312b950e7d48012", - [ICluster.Local]: "0xf3661941207b5027fb4b85a74ca5a9fd1389fb57a8f2c57bd312b950e7d48012", + [ICluster.Devnet]: "0x6057b093904c5bdfec1067d2b8a4dc35f65bcab03b3029ad716c4882b0f36078", + [ICluster.Testnet]: "0x6057b093904c5bdfec1067d2b8a4dc35f65bcab03b3029ad716c4882b0f36078", + [ICluster.Local]: "0x6057b093904c5bdfec1067d2b8a4dc35f65bcab03b3029ad716c4882b0f36078", }; export const SUI_ERROR_MATCH_REGEX = diff --git a/packages/stream/sui/wallet.ts b/packages/stream/sui/wallet.ts index 18fd8d63..ce2227f8 100644 --- a/packages/stream/sui/wallet.ts +++ b/packages/stream/sui/wallet.ts @@ -6,14 +6,15 @@ import { SuiSignAndExecuteTransactionBlockInput } from "./types"; /** * Utility function to check if the transaction initiator is a Wallet object - * @param {Keypair | SignerWalletAdapter} walletOrKeypair - Wallet or Keypair in question + * @param {Keypair | WalletContextState} walletOrKeypair - Wallet or Keypair in question * @return {boolean} - Returns true if parameter is a Wallet. */ export function isSignerKeypair(walletOrKeypair: Keypair | WalletContextState): walletOrKeypair is Keypair { return ( walletOrKeypair instanceof Keypair || walletOrKeypair.constructor === Keypair || - walletOrKeypair.constructor.name === Keypair.prototype.constructor.name + walletOrKeypair.constructor.name === Keypair.prototype.constructor.name || + "export" in walletOrKeypair ); } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 78caf7cf..6c0335a1 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -68,6 +68,9 @@ importers: bs58: specifier: 5.0.0 version: 5.0.0 + p-queue: + specifier: ^6.6.2 + version: 6.6.2 devDependencies: '@streamflow/eslint-config': specifier: workspace:* @@ -120,6 +123,9 @@ importers: bs58: specifier: 5.0.0 version: 5.0.0 + p-queue: + specifier: ^6.6.2 + version: 6.6.2 devDependencies: '@streamflow/eslint-config': specifier: workspace:* @@ -217,6 +223,9 @@ importers: js-sha256: specifier: 0.9.0 version: 0.9.0 + p-queue: + specifier: ^6.6.2 + version: 6.6.2 devDependencies: '@streamflow/eslint-config': specifier: workspace:* @@ -8679,7 +8688,6 @@ packages: /p-finally@1.0.0: resolution: {integrity: sha512-LICb2p9CB7FS+0eR1oqWnHhp0FljGLZCWBE9aix0Uye9W8LTQPwMTYVGWQWIw9RdQiDg4+epXQODwIYJtSJaow==} engines: {node: '>=4'} - dev: true /p-limit@1.3.0: resolution: {integrity: sha512-vvcXsLAJ9Dr5rQOPk7toZQZJApBl2K4J6dANSsEuh6QI41JYcsS/qhTGa9ErIUUgK3WNQoJYvylxvjqmiqEA9Q==} @@ -8744,7 +8752,6 @@ packages: dependencies: eventemitter3: 4.0.7 p-timeout: 3.2.0 - dev: true /p-reduce@2.1.0: resolution: {integrity: sha512-2USApvnsutq8uoxZBGbbWM0JIYLiEMJ9RlaN7fAzVNb9OZN0SHjjTTfIcb667XynS5Y1VhwDJVDa72TnPzAYWw==} @@ -8756,7 +8763,6 @@ packages: engines: {node: '>=8'} dependencies: p-finally: 1.0.0 - dev: true /p-try@1.0.0: resolution: {integrity: sha512-U1etNYuMJoIz3ZXSrrySFjsXQTWOx2/jdi86L+2pRvph/qMKL6sbcCYdH23fqsbm8TH2Gn0OybpT4eSFlCVHww==}