Skip to content

Commit

Permalink
STREAM-1451: solana reliable tx execution (#158)
Browse files Browse the repository at this point in the history
* STREAM-1451: solana reliable tx execution
  • Loading branch information
Yolley committed Apr 5, 2024
1 parent d8f882d commit 170947f
Show file tree
Hide file tree
Showing 11 changed files with 263 additions and 130 deletions.
2 changes: 1 addition & 1 deletion lerna.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"packages": [
"packages/*"
],
"version": "6.0.3",
"version": "6.1.0",
"$schema": "node_modules/lerna/schemas/lerna-schema.json"
}
2 changes: 1 addition & 1 deletion packages/common/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@streamflow/common",
"version": "6.0.3",
"version": "6.1.0",
"description": "Common utilities and types used by streamflow packages.",
"homepage": "https://github.com/streamflow-finance/js-sdk/",
"main": "dist/index.js",
Expand Down
10 changes: 9 additions & 1 deletion packages/common/solana/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { AccountInfo, PublicKey } from "@solana/web3.js";
import { AccountInfo, BlockhashWithExpiryBlockHeight, Commitment, Context, PublicKey } from "@solana/web3.js";

export interface ITransactionSolanaExt {
computePrice?: number;
Expand All @@ -23,3 +23,11 @@ export interface AtaParams {
owner: PublicKey;
programId?: PublicKey;
}

export interface ConfirmationParams {
hash: BlockhashWithExpiryBlockHeight;
context: Context;
commitment?: Commitment;
}

export class TransactionFailedError extends Error {}
199 changes: 138 additions & 61 deletions packages/common/solana/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,24 @@ import {
import { SignerWalletAdapter } from "@solana/wallet-adapter-base";
import {
BlockhashWithExpiryBlockHeight,
BlockheightBasedTransactionConfirmationStrategy,
Commitment,
ComputeBudgetProgram,
Connection,
Keypair,
PublicKey,
sendAndConfirmRawTransaction,
Transaction,
TransactionInstruction,
TransactionExpiredBlockheightExceededError,
SignatureStatus,
TransactionMessage,
VersionedTransaction,
Context,
RpcResponseAndContext,
SimulatedTransactionResponse,
SendTransactionError,
} from "@solana/web3.js";
import bs58 from "bs58";

import { Account, AtaParams, ITransactionSolanaExt } from "./types";
import { Account, AtaParams, ConfirmationParams, ITransactionSolanaExt, TransactionFailedError } from "./types";
import { sleep } from "../utils";

/**
Expand Down Expand Up @@ -74,6 +77,15 @@ export function isSignerKeypair(walletOrKeypair: Keypair | SignerWalletAdapter):
);
}

/**
* Utility function to check whether given transaction is Versioned
* @param tx {Transaction | VersionedTransaction} - Transaction to check
* @returns {boolean} - Returns true if transaction is Versioned.
*/
export function isTransactionVersioned(tx: Transaction | VersionedTransaction): tx is VersionedTransaction {
return "message" in tx;
}

/**
* Creates a Transaction with given instructions and optionally signs it.
* @param connection - Solana client connection
Expand All @@ -90,31 +102,36 @@ export async function prepareTransaction(
commitment?: Commitment,
...partialSigners: (Keypair | undefined)[]
): Promise<{
tx: Transaction;
tx: VersionedTransaction;
hash: BlockhashWithExpiryBlockHeight;
context: Context;
}> {
const hash = await connection.getLatestBlockhash(commitment);
const tx = new Transaction({
feePayer: payer,
blockhash: hash.blockhash,
lastValidBlockHeight: hash.lastValidBlockHeight,
}).add(...ixs);

for (const signer of partialSigners) {
if (signer) {
tx.partialSign(signer);
}
}
const { value: hash, context } = await connection.getLatestBlockhashAndContext(commitment);
const messageV0 = new TransactionMessage({
payerKey: payer!,
recentBlockhash: hash.blockhash,
instructions: ixs,
}).compileToV0Message();
const tx = new VersionedTransaction(messageV0);
const signers: Keypair[] = partialSigners.filter((item): item is Keypair => !!item);
tx.sign(signers);

return { tx, hash };
return { tx, context, hash };
}

export async function signTransaction(invoker: Keypair | SignerWalletAdapter, tx: Transaction): Promise<Transaction> {
let signedTx: Transaction;
export async function signTransaction<T extends Transaction | VersionedTransaction>(
invoker: Keypair | SignerWalletAdapter,
tx: T,
): Promise<T> {
let signedTx: T;
if (isSignerWallet(invoker)) {
signedTx = await invoker.signTransaction(tx);
} else {
tx.partialSign(invoker);
if (isTransactionVersioned(tx)) {
tx.sign([invoker]);
} else {
tx.partialSign(invoker);
}
signedTx = tx;
}
return signedTx;
Expand All @@ -125,72 +142,125 @@ export async function signTransaction(invoker: Keypair | SignerWalletAdapter, tx
* @param connection - Solana client connection
* @param invoker - Keypair used as signer
* @param tx - Transaction instance
* @param hash - blockhash information, the same hash should be used in the Transaction
* @param {ConfirmationParams} confirmationParams - Confirmation Params that will be used for execution
* @returns Transaction signature
*/
export async function signAndExecuteTransaction(
connection: Connection,
invoker: Keypair | SignerWalletAdapter,
tx: Transaction,
hash: BlockhashWithExpiryBlockHeight,
tx: Transaction | VersionedTransaction,
confirmationParams: ConfirmationParams,
): Promise<string> {
const signedTx = await signTransaction(invoker, tx);

return executeTransaction(connection, signedTx, hash);
return executeTransaction(connection, signedTx, confirmationParams);
}

/**
* Sends and confirms Transaction
* Confirmation strategy is not 100% reliable here as in times of congestion there can be a case that tx is executed,
* but is not in `commitment` state and so it's not considered executed by the `sendAndConfirmRawTransaction` method,
* and it raises an expiry error even though transaction may be executed soon.
* - so we add additional 50 blocks for checks to account for such issues;
* - also, we check for SignatureStatus one last time as it could be that websocket was slow to respond.
* Uses custom confirmation logic that:
* - simulates tx before sending separately
* - sends transaction without preFlight checks but with some valuable flags https://twitter.com/jordaaash/status/1774892862049800524?s=46&t=bhZ10V0r7IX5Lk5kKzxfGw
* - rebroadcasts a tx every 500 ms
* - after broadcasting check whether tx has executed once
* - catch errors for every actionable item, throw only the ones that signal that tx has failed
* - otherwise there is a chance of marking a landed tx as failed if it was broadcasted at least once
* @param connection - Solana client connection
* @param tx - Transaction instance
* @param hash - blockhash information, the same hash should be used in the Transaction
* @param context - context at which blockhash has been retrieve
* @param commitment - optional commitment that will be used for simulation and confirmation
* @returns Transaction signature
*/
export async function executeTransaction(
connection: Connection,
tx: Transaction,
hash: BlockhashWithExpiryBlockHeight,
tx: Transaction | VersionedTransaction,
{ hash, context, commitment }: ConfirmationParams,
): Promise<string> {
const rawTx = tx.serialize();
if (!hash.lastValidBlockHeight || tx.signatures.length === 0 || !hash.blockhash) {
throw Error("Error with transaction parameters.");
}

for (let i = 0; i < 3; i++) {
let res: RpcResponseAndContext<SimulatedTransactionResponse>;
if (isTransactionVersioned(tx)) {
res = await connection.simulateTransaction(tx);
} else {
res = await connection.simulateTransaction(tx);
}
if (res.value.err) {
const errMessage = JSON.stringify(res.value.err);
if (!errMessage.includes("BlockhashNotFound") || i === 2) {
throw new SendTransactionError("failed to simulate transaction: " + errMessage, res.value.logs || undefined);
}
}
break;
}

if (!hash.lastValidBlockHeight || !tx.signature || !hash.blockhash) throw Error("Error with transaction parameters.");
const isVersioned = isTransactionVersioned(tx);

const signature = bs58.encode(tx.signature);
const confirmationStrategy: BlockheightBasedTransactionConfirmationStrategy = {
lastValidBlockHeight: hash.lastValidBlockHeight + 50,
signature,
blockhash: hash.blockhash,
};
try {
return await sendAndConfirmRawTransaction(connection, rawTx, confirmationStrategy);
} catch (e) {
// If BlockHeight expired, we will check tx status one last time to make sure
if (e instanceof TransactionExpiredBlockheightExceededError) {
await sleep(1000);
let signature: string;
if (isVersioned) {
signature = bs58.encode(tx.signatures[0]);
} else {
signature = bs58.encode(tx.signature!);
}

let blockheight = await connection.getBlockHeight(commitment);
let transactionSent = false;
const rawTransaction = tx.serialize();
while (blockheight < hash.lastValidBlockHeight) {
try {
await connection.sendRawTransaction(rawTransaction, {
maxRetries: 0,
minContextSlot: context.slot,
preflightCommitment: commitment,
skipPreflight: true,
});
transactionSent = true;
} catch (e) {
if (
transactionSent ||
(e instanceof SendTransactionError && e.message.includes("Minimum context slot has not been reached"))
) {
continue;
}
throw e;
}
await sleep(500);
try {
const value = await confirmAndEnsureTransaction(connection, signature);
if (!value) {
if (value) {
return signature;
}
} catch (e) {
if (e instanceof TransactionFailedError) {
throw e;
}
return signature;
await sleep(500);
}

try {
blockheight = await connection.getBlockHeight(commitment);
} catch (_e) {
await sleep(500);
}
throw e;
}

throw new Error(`Transaction ${signature} expired.`);
}

/**
* Confirms and validates transaction success once
* @param connection - Solana client connection
* @param signature - Transaction signature
* @param passError - return status even if tx failed
* @returns Transaction Status
*/
export async function confirmAndEnsureTransaction(
connection: Connection,
signature: string,
passError?: boolean,
): Promise<SignatureStatus | null> {
const response = await connection.getSignatureStatus(signature);
if (!response) {
Expand All @@ -200,9 +270,9 @@ export async function confirmAndEnsureTransaction(
if (!value) {
return null;
}
if (value.err) {
if (!passError && value.err) {
// That's how solana-web3js does it, `err` here is an object that won't really be handled
throw new Error(`Raw transaction ${signature} failed (${JSON.stringify({ err: value.err })})`);
throw new TransactionFailedError(`Raw transaction ${signature} failed (${JSON.stringify({ err: value.err })})`);
}
switch (connection.commitment) {
case "confirmed":
Expand Down Expand Up @@ -278,49 +348,56 @@ export async function enrichAtaParams(connection: Connection, paramsBatch: AtaPa
* @param connection - Solana client connection
* @param payer - Transaction invoker, should be a signer
* @param paramsBatch - Array of Params for an each ATA account: {mint, owner}
* @param commitment - optional commitment that will be used to fetch Blockhash
* @returns Unsigned Transaction with create ATA instructions
*/
export async function generateCreateAtaBatchTx(
connection: Connection,
payer: PublicKey,
paramsBatch: AtaParams[],
commitment?: Commitment,
): Promise<{
tx: Transaction;
tx: VersionedTransaction;
hash: BlockhashWithExpiryBlockHeight;
context: Context;
}> {
paramsBatch = await enrichAtaParams(connection, paramsBatch);
const ixs: TransactionInstruction[] = await Promise.all(
paramsBatch.map(async ({ mint, owner, programId }) => {
return createAssociatedTokenAccountInstruction(payer, await ata(mint, owner), owner, mint, programId);
}),
);
const hash = await connection.getLatestBlockhash();
const tx = new Transaction({
feePayer: payer,
blockhash: hash.blockhash,
lastValidBlockHeight: hash.lastValidBlockHeight,
}).add(...ixs);
return { tx, hash };
const { value: hash, context } = await connection.getLatestBlockhashAndContext({ commitment });
const messageV0 = new TransactionMessage({
payerKey: payer,
recentBlockhash: hash.blockhash,
instructions: ixs,
}).compileToV0Message();
const tx = new VersionedTransaction(messageV0);
return { tx, hash, context };
}

/**
* Creates ATA for an array of owners
* @param connection - Solana client connection
* @param invoker - Transaction invoker and payer
* @param paramsBatch - Array of Params for an each ATA account: {mint, owner}
* @param commitment - optional commitment that will be used to fetch Blockhash
* @returns Transaction signature
*/
export async function createAtaBatch(
connection: Connection,
invoker: Keypair | SignerWalletAdapter,
paramsBatch: AtaParams[],
commitment?: Commitment,
): Promise<string> {
const { tx, hash } = await generateCreateAtaBatchTx(
const { tx, hash, context } = await generateCreateAtaBatchTx(
connection,
invoker.publicKey!,
await enrichAtaParams(connection, paramsBatch),
commitment,
);
return signAndExecuteTransaction(connection, invoker, tx, hash);
return signAndExecuteTransaction(connection, invoker, tx, { hash, context, commitment });
}

/**
Expand Down
2 changes: 1 addition & 1 deletion packages/distributor/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@streamflow/distributor",
"version": "6.0.3",
"version": "6.1.0",
"description": "JavaScript SDK to interact with Streamflow Airdrop protocol.",
"homepage": "https://github.com/streamflow-finance/js-sdk/",
"main": "dist/index.js",
Expand Down
Loading

0 comments on commit 170947f

Please sign in to comment.