diff --git a/README.md b/README.md index 56dc0d35..691f230b 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ - [Managing cross-chain risk/reward ratio](#managing-cross-chain-riskreward-ratio) - [Reducing transaction finality constraint](#reducing-transaction-finality-constraint) - [Setting a budget for non-finalized orders](#setting-a-budget-for-non-finalized-orders) + - [Delayed fulfillments](#delayed-fulfillments) - [Testing the order execution flow in the wild](#testing-the-order-execution-flow-in-the-wild) - [Restricting orders from fulfillment](#restricting-orders-from-fulfillment) - [Placing new orders](#placing-new-orders) @@ -222,6 +223,72 @@ and there is an accidental flood of 100,000 orders worth $1 occurs, you probably This budged is a hard cap for orders that were not yet finalized after your `dln-taker`'s instance have successfully fulfilled them. As soon as such orders got a finalization status, they got removed effectively releasing the room for other non-finalized orders that can be attempted to be fulfulled. +### Delayed fulfillments + +Sometimes you may be willing to run several instances of dln-taker to reduce risks of managing a full bag of assets under one roof, or create a virtual fault tolerant cluster of `dln-taker`'s in which second instance catches up with the new orders when the first instance runs out money. + +To avoid a race when all of the instances are attempting to fulfill the same order simultaneously and thus burning each other's gas, there is an opt-in feature that allows configuring delayed fulfillments. This gives the ability to make specific instance wait the given amount of time before starting its attempt to fulfill newly arrived order. + +For example, when you want your instance#1 to delay the fulfillment of orders under $1000 by 30s, and all other orders by 60s, you may use `constraints.requiredConfirmationsThresholds[].fulfillmentDelay` and `constraints.fulfillmentDelay` accordingly for each source chain: + +```ts +{ + chain: ChainId.Avalanche, + chainRpc: `${process.env.AVALANCHE_RPC}`, + + constraints: { + requiredConfirmationsThresholds: [ + // expects to receive orders under $1,000 coming from Avalanche as soon as 1 block confirmations, + // and starts processing it after a 30s delay + { + thresholdAmountInUSD: 1000, // USD + minBlockConfirmations: 1, // see transaction finality + fulfillmentDelay: 30 // seconds + }, + + // expects to receive orders under $10,000 coming from Avalanche as soon as 6 block confirmations, + // and starts processing it after a 60s delay (see higher level default value) + { + thresholdAmountInUSD: 10_000, // USD + minBlockConfirmations: 6, // see transaction finality + // default value of fulfillmentDelay is implicitly inherited and is set to 60s + }, + ], + + // optional: start processing orders over >$1,000 coming from Avalanche after a 60s delay + fulfillmentDelay: 60 // seconds + }, +}, +``` + +At the same time, instance#2 may be configured without such timeouts, so if it goes offline or runs out of money, your instance#1 will catch up with the orders missed by instance#2 after the given time frame. + +Additionally, you may be willing to delay orders coming to the specific chain (regardless its source chain). In this case, `dstConstraints` must be used: + +```ts +{ + chain: ChainId.Ethereum, + chainRpc: `${process.env.ETHEREUM_RPC}`, + + dstConstraints: { + perOrderValueUpperThreshold: [ + // start processing all orders under $1,000 coming to Ethereum (from any supported chain) after a 30s delay, + // regardless of constraints specified for the supported chains + { + upperThreshold: 1000, // USD + fulfillmentDelay: 30 // seconds + }, + ], + + // optional: start processing orders over >$1,000 coming to ethereum (from any supported chain) after a 40s delay, + // regardless of constraints specified for the supported chains + fulfillmentDelay: 40 // seconds + }, +}, +``` + +Mind that `dstConstraints` property has precedence over `constraints`. + ## Testing the order execution flow in the wild After you have set up and launched `dln-taker`, you may wish to give it a try in a limited conditions. diff --git a/package-lock.json b/package-lock.json index 9e232550..266f4fdd 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@debridge-finance/dln-executor", - "version": "2.4.0", + "version": "2.5.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@debridge-finance/dln-executor", - "version": "2.4.0", + "version": "2.5.0", "license": "GPL-3.0-only", "dependencies": { "@debridge-finance/dln-client": "5.2.0", diff --git a/package.json b/package.json index 33622fdb..8199d669 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@debridge-finance/dln-executor", "description": "DLN executor is the rule-based daemon service developed to automatically execute orders placed on the deSwap Liquidity Network (DLN) across supported blockchains", - "version": "2.4.0", + "version": "2.5.0", "author": "deBridge", "license": "GPL-3.0-only", "homepage": "https://debridge.finance", diff --git a/src/config.ts b/src/config.ts index e9b75caa..47cedaf8 100644 --- a/src/config.ts +++ b/src/config.ts @@ -81,6 +81,22 @@ export type ChainEnvironment = { }; }; +export type DstOrderConstraints = { + /** + * Defines a delay (in seconds) the dln-taker should wait before starting to process each new (non-archival) order + * coming to this chain after it first saw it. + */ + fulfillmentDelay?: number; +} + +export type SrcOrderConstraints = { + /** + * Defines a delay (in seconds) the dln-taker should wait before starting to process each new (non-archival) order + * coming from this chain after it first saw it. + */ + fulfillmentDelay?: number; +} + /** * Represents a chain configuration where orders can be fulfilled. */ @@ -113,7 +129,7 @@ export interface ChainDefinition { /** * Defines constraints imposed on all orders coming from this chain */ - constraints?: { + constraints?: SrcOrderConstraints & { /** * Defines necessary and sufficient block confirmation thresholds per worth of order expressed in dollars. * For example, you may want to fulfill orders coming from Ethereum: @@ -129,7 +145,10 @@ export interface ChainDefinition { * ] * ``` */ - requiredConfirmationsThresholds?: Array<{thresholdAmountInUSD: number, minBlockConfirmations: number}>; + requiredConfirmationsThresholds?: Array; /** * Defines a budget (a hard cap) of all successfully fulfilled orders' value (expressed in USD) that @@ -143,7 +162,21 @@ export interface ChainDefinition { * one by one as soon as fulfilled orders are being finalized. */ nonFinalizedTVLBudget?: number; - } + }, + + /** + * Defines constraints imposed on all orders coming to this chain. These properties have precedence over `constraints` property + */ + dstConstraints?: DstOrderConstraints & { + /** + * Defines custom constraints for orders falling into the given upper thresholds expressed in US dollars. + * + * Mind that these constraints have precedence over higher order constraints + */ + perOrderValueUpperThreshold?: Array + }, // // taker related diff --git a/src/executors/executor.ts b/src/executors/executor.ts index 9cd89ff9..a1883102 100644 --- a/src/executors/executor.ts +++ b/src/executors/executor.ts @@ -29,7 +29,7 @@ import { ProviderAdapter } from "../providers/provider.adapter"; import { SolanaProviderAdapter } from "../providers/solana.provider.adapter"; import { HooksEngine } from "../hooks/HooksEngine"; import { NonFinalizedOrdersBudgetController } from "../processors/NonFinalizedOrdersBudgetController"; - +import { DstOrderConstraints as RawDstOrderConstraints, SrcOrderConstraints as RawSrcOrderConstraints } from "../config"; const BLOCK_CONFIRMATIONS_HARD_CAPS: { [key in SupportedChain]: number } = { [SupportedChain.Avalanche]: 15, @@ -49,25 +49,45 @@ export type ExecutorInitializingChain = Readonly<{ client: Solana.PmmClient | Evm.PmmEvmClient; }>; -type UsdWorthBlockConfirmationConstraints = Array<{ - usdWorthFrom: number, - usdWorthTo: number, - minBlockConfirmations: number, -}>; +type DstOrderConstraints = Readonly<{ + fulfillmentDelay: number; +}> + +type DstConstraintsPerOrderValue = Array< + DstOrderConstraints & Readonly<{ + upperThreshold: number; + }> +>; + +type SrcOrderConstraints = Readonly<{ + fulfillmentDelay: number; +}> -export type ExecutorSupportedChain = { +type SrcConstraintsPerOrderValue = Array< + SrcOrderConstraints & Readonly<{ + upperThreshold: number; + minBlockConfirmations: number; + }> +>; + +export type ExecutorSupportedChain = Readonly<{ chain: ChainId; chainRpc: string; srcFilters: OrderFilter[]; dstFilters: OrderFilter[]; - usdAmountConfirmations: UsdWorthBlockConfirmationConstraints; nonFinalizedOrdersBudgetController: NonFinalizedOrdersBudgetController; + srcConstraints: Readonly, + dstConstraints: Readonly, orderProcessor: processors.IOrderProcessor; unlockProvider: ProviderAdapter; fulfillProvider: ProviderAdapter; beneficiary: string; client: Solana.PmmClient | Evm.PmmEvmClient; -}; +}>; export interface IExecutor { readonly tokenPriceService: PriceTokenService; @@ -259,8 +279,15 @@ export class Executor implements IExecutor { this.logger ), client, - usdAmountConfirmations: this.getConfirmationRanges(chain.chain as unknown as SupportedChain, chain), beneficiary: chain.beneficiary, + srcConstraints: { + ...this.getSrcConstraints(chain.constraints || {}), + perOrderValue: this.getSrcConstraintsPerOrderValue(chain.chain as unknown as SupportedChain, chain.constraints || {}) + }, + dstConstraints: { + ...this.getDstConstraints(chain.dstConstraints || {}), + perOrderValue: this.getDstConstraintsPerOrderValue(chain.dstConstraints || {}), + } }; clients[chain.chain] = client; @@ -284,10 +311,14 @@ export class Executor implements IExecutor { address: chain.unlockProvider.address as string, }; }); - const minConfirmationThresholds = Object.values(this.chains).map(chain => ({ - chainId: chain.chain, - points: chain.usdAmountConfirmations.map(t => t.minBlockConfirmations) - })) + const minConfirmationThresholds = Object.values(this.chains) + .map(chain => ({ + chainId: chain.chain, + points: chain.srcConstraints.perOrderValue + .map(t => t.minBlockConfirmations) + .filter(t => t > 0) // skip empty block confirmations + })) + .filter(range => range.points.length > 0); // skip chains without necessary confirmation points orderFeed.init(this.execute.bind(this), unlockAuthorities, minConfirmationThresholds, hooksEngine); // Override internal slippage calculation: do not reserve slippage buffer for pre-fulfill swap @@ -296,29 +327,43 @@ export class Executor implements IExecutor { this.isInitialized = true; } - private getConfirmationRanges(chain: SupportedChain, definition: ChainDefinition): UsdWorthBlockConfirmationConstraints { - const ranges: UsdWorthBlockConfirmationConstraints = []; - const requiredConfirmationsThresholds = definition.constraints?.requiredConfirmationsThresholds || []; - requiredConfirmationsThresholds - .sort((a, b) => a.thresholdAmountInUSD < b.thresholdAmountInUSD ? -1 : 1) // sort by usdWorth ASC - .forEach((threshold, index, thresholdsSortedByUsdWorth) => { - const prev = index === 0 ? {minBlockConfirmations: 0, thresholdAmountInUSD: 0} : thresholdsSortedByUsdWorth[index - 1]; + private getDstConstraintsPerOrderValue(configDstConstraints: ChainDefinition['dstConstraints']): DstConstraintsPerOrderValue { + return (configDstConstraints?.perOrderValueUpperThreshold || []) + .map(constraint => ({ + upperThreshold: constraint.upperThreshold, + ...this.getDstConstraints(constraint, configDstConstraints) + })) + // important to sort by upper bound ASC for easier finding of the corresponding range + .sort((constraintA, constraintB) => constraintA.upperThreshold - constraintB.upperThreshold); + } - if (threshold.minBlockConfirmations <= prev.minBlockConfirmations) { - throw new Error(`Unable to set required confirmation threshold for $${threshold.thresholdAmountInUSD} on ${SupportedChain[chain]}: minBlockConfirmations (${threshold.minBlockConfirmations}) must be greater than ${prev.minBlockConfirmations}`) - } - if (BLOCK_CONFIRMATIONS_HARD_CAPS[chain] <= threshold.minBlockConfirmations) { - throw new Error(`Unable to set required confirmation threshold for $${threshold.thresholdAmountInUSD} on ${SupportedChain[chain]}: minBlockConfirmations (${threshold.minBlockConfirmations}) must be less than max block confirmations (${BLOCK_CONFIRMATIONS_HARD_CAPS[chain]})`) + private getDstConstraints(primaryConstraints: RawDstOrderConstraints, defaultConstraints?: RawDstOrderConstraints): DstOrderConstraints { + return { + fulfillmentDelay: primaryConstraints?.fulfillmentDelay || defaultConstraints?.fulfillmentDelay || 0 + } + } + + private getSrcConstraintsPerOrderValue(chain: SupportedChain, configDstConstraints: ChainDefinition['constraints']): SrcConstraintsPerOrderValue { + return (configDstConstraints?.requiredConfirmationsThresholds || []) + .map(constraint => { + if (BLOCK_CONFIRMATIONS_HARD_CAPS[chain] <= (constraint.minBlockConfirmations || 0)) { + throw new Error(`Unable to set required confirmation threshold for $${constraint.thresholdAmountInUSD} on ${SupportedChain[chain]}: minBlockConfirmations (${constraint.minBlockConfirmations}) must be less than max block confirmations (${BLOCK_CONFIRMATIONS_HARD_CAPS[chain]})`); } - ranges.push({ - usdWorthFrom: prev.thresholdAmountInUSD, - usdWorthTo: threshold.thresholdAmountInUSD, - minBlockConfirmations: threshold.minBlockConfirmations - }) - }); + return { + upperThreshold: constraint.thresholdAmountInUSD, + minBlockConfirmations: constraint.minBlockConfirmations || 0, + ...this.getSrcConstraints(constraint, configDstConstraints) + } + }) + // important to sort by upper bound ASC for easier finding of the corresponding range + .sort((constraintA, constraintB) => constraintA.upperThreshold - constraintB.upperThreshold); + } - return ranges; + private getSrcConstraints(primaryConstraints: RawSrcOrderConstraints, defaultConstraints?: RawSrcOrderConstraints): SrcOrderConstraints { + return { + fulfillmentDelay: primaryConstraints?.fulfillmentDelay || defaultConstraints?.fulfillmentDelay || 0 + } } async execute(nextOrderInfo: IncomingOrder) { @@ -410,8 +455,8 @@ export class Executor implements IExecutor { logger, config: this, giveChain, + takeChain }, - attempts: 0 }); return true; diff --git a/src/hooks/HookEnums.ts b/src/hooks/HookEnums.ts index 723e27e5..35f9777c 100644 --- a/src/hooks/HookEnums.ts +++ b/src/hooks/HookEnums.ts @@ -49,6 +49,11 @@ export enum PostponingReason { * Unexpected error */ UNHANDLED_ERROR, + + /** + * indicates that this order is forcibly delayed according to this dln-takers instance configuration + */ + FORCED_DELAY, } export enum RejectionReason { diff --git a/src/interfaces.ts b/src/interfaces.ts index 02555400..b479ef3f 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -1,7 +1,7 @@ import { ChainId, OrderData } from "@debridge-finance/dln-client"; import { Logger } from "pino"; -import { OrderProcessorContext } from "./processors/base"; +import { OrderId, OrderProcessorContext } from "./processors/base"; import { HooksEngine } from "./hooks/HooksEngine"; export enum OrderInfoStatus { @@ -35,12 +35,11 @@ export type IncomingOrder = { ) & (T extends OrderInfoStatus.Fulfilled ? { unlockAuthority: string } : {} ) & (T extends OrderInfoStatus.Created ? { finalization_info: FinalizationInfo } : {}) -export type ProcessOrder = (params: IncomingOrderContext) => Promise; +export type ProcessOrder = (orderId: OrderId) => Promise; export type IncomingOrderContext = { orderInfo: IncomingOrder; context: OrderProcessorContext; - attempts: number; }; export type OrderProcessorFunc = (order: IncomingOrder) => Promise; diff --git a/src/processors/base.ts b/src/processors/base.ts index 0216cadd..9c85115d 100644 --- a/src/processors/base.ts +++ b/src/processors/base.ts @@ -16,10 +16,13 @@ import { import { IncomingOrderContext } from "../interfaces"; import { HooksEngine } from "../hooks/HooksEngine"; +export type OrderId = string; + export class OrderProcessorContext { logger: Logger; config: IExecutor; giveChain: ExecutorSupportedChain; + takeChain: ExecutorSupportedChain; } export class OrderProcessorInitContext { diff --git a/src/processors/mempool.service.ts b/src/processors/mempool.service.ts index 83e0e4a4..e45ec726 100644 --- a/src/processors/mempool.service.ts +++ b/src/processors/mempool.service.ts @@ -1,18 +1,19 @@ import { Logger } from "pino"; -import { IncomingOrderContext, ProcessOrder } from "../interfaces"; +import { ProcessOrder } from "../interfaces"; import { setTimeout } from 'timers/promises' +import { OrderId } from "./base"; export class MempoolService { - private readonly logger: Logger; - private readonly orderParams = new Map(); + readonly #logger: Logger; + readonly #trackedOrders = new Set(); constructor( logger: Logger, private readonly processOrderFunction: ProcessOrder, private readonly maxReprocessDelay: number, private readonly delayStep: number = 30 ) { - this.logger = logger.child({ service: "MempoolService" }); + this.#logger = logger.child({ service: "MempoolService" }); } private getDelayPromise(delay: number) { @@ -25,20 +26,19 @@ export class MempoolService { * @param params * @param triggerOrDelay */ - addOrder(params: IncomingOrderContext, triggerOrDelay?: Promise | number) { - const orderId = params.orderInfo.orderId; - this.orderParams.set(orderId, params); + addOrder(orderId: OrderId, triggerOrDelay?: Promise | number, attempt: number = 0) { + const orderLogger = this.#logger.child({ orderId }); - // logging from the order's context - params.context.logger.debug("added to mempool"); + if (this.#trackedOrders.has(orderId)) { + orderLogger.debug("already present in the mempool, not adding again"); + return; + } - // logging from the service's context - this.logger.debug( - `current mempool size: ${this.orderParams.size} order(s)` - ); + this.#trackedOrders.add(orderId); + orderLogger.debug(`added to mempool, new mempool size: ${this.#trackedOrders.size} order(s)`); const promiseStartTime = new Date() - const maxTimeoutPromise = this.getDelayPromise(this.maxReprocessDelay + (this.delayStep * params.attempts)) + const maxTimeoutPromise = this.getDelayPromise(this.maxReprocessDelay + (this.delayStep * attempt)) if (triggerOrDelay && typeof triggerOrDelay === 'number') triggerOrDelay = this.getDelayPromise(triggerOrDelay); @@ -48,26 +48,27 @@ export class MempoolService { trigger .catch((reason) => { - params.context.logger.error(`mempool promise triggered error: ${reason}`) - params.context.logger.error(reason); + orderLogger.error(`mempool promise triggered error: ${reason}`) + orderLogger.error(reason); }) .finally(() => { const settlementTime = new Date(); const waitingTime = (settlementTime.getTime() - promiseStartTime.getTime()) / 1000; - params.context.logger.debug(`mempool promise triggered after ${waitingTime}s`) - if (this.orderParams.has(orderId)) { - params.context.logger.debug(`invoking order processing routine`) - this.orderParams.delete(orderId); - params.attempts++; - this.processOrderFunction(params); + orderLogger.debug(`mempool promise triggered after ${waitingTime}s`) + if (this.#trackedOrders.has(orderId)) { + orderLogger.debug(`invoking order processing routine`) + this.#trackedOrders.delete(orderId); + this.processOrderFunction(orderId); } else { - params.context.logger.debug(`order does not exist in the mempool`) + orderLogger.debug(`order does not exist in the mempool`) } }) } delete(orderId: string) { - this.orderParams.delete(orderId); + this.#trackedOrders.delete(orderId); + this.#logger.child({orderId}) + .debug("order has been removed from the mempool") } } diff --git a/src/processors/universal.ts b/src/processors/universal.ts index 4478f72b..1f33f8fb 100644 --- a/src/processors/universal.ts +++ b/src/processors/universal.ts @@ -25,7 +25,7 @@ import { createClientLogger } from "../logger"; import { EvmProviderAdapter, Tx } from "../providers/evm.provider.adapter"; import { SolanaProviderAdapter } from "../providers/solana.provider.adapter"; -import { BaseOrderProcessor, OrderProcessorContext, OrderProcessorInitContext, OrderProcessorInitializer } from "./base"; +import { BaseOrderProcessor, OrderId, OrderProcessorContext, OrderProcessorInitContext, OrderProcessorInitializer } from "./base"; import { BatchUnlocker } from "./BatchUnlocker"; import { MempoolService } from "./mempool.service"; import { PostponingReason, RejectionReason } from "../hooks/HookEnums"; @@ -43,7 +43,7 @@ const EVM_FULFILL_GAS_MULTIPLIER = 1.25; // bump until const EVM_FULFILL_GAS_PRICE_MULTIPLIER = 1.3; -// dummy slippage used before any estimetions are performed, this is needed only for estimation purposes +// dummy slippage used before any estimations are performed, this is needed only for estimation purposes const DUMMY_SLIPPAGE_BPS = 400; // 4% export type UniversalProcessorParams = { @@ -84,14 +84,23 @@ export type UniversalProcessorParams = { preFulfillSwapMaxAllowedSlippageBps: number; }; +// Represents all necessary information about Created order during its internal lifecycle +type CreatedOrderMetadata = { + readonly orderId: OrderId, + readonly arrivedAt: Date, + attempts: number, + context: IncomingOrderContext +}; + class UniversalProcessor extends BaseOrderProcessor { private mempoolService: MempoolService; - private priorityQueue = new Set(); // queue of orderid for processing created order - private queue = new Set(); // queue of orderid for retry processing order - private incomingOrdersMap = new Map(); // key orderid, contains incoming order from order feed + private priorityQueue = new Set(); // queue of orderid for processing created order + private queue = new Set(); // queue of orderid for retry processing order private isLocked: boolean = false; private batchUnlocker: BatchUnlocker; + readonly #createdOrdersMetadata = new Map() + private params: UniversalProcessorParams = { minProfitabilityBps: 4, mempoolInterval: 60, @@ -135,7 +144,7 @@ class UniversalProcessor extends BaseOrderProcessor { this.mempoolService = new MempoolService( logger.child({ takeChainId: chainId }), - this.process.bind(this), + this.tryProcess.bind(this), this.params.mempoolInterval ); @@ -179,11 +188,25 @@ class UniversalProcessor extends BaseOrderProcessor { }); switch (orderInfo.status) { - case OrderInfoStatus.ArchivalCreated: - case OrderInfoStatus.Created: { - // must remove this order from all queues bc new order can be an updated version - this.incomingOrdersMap.set(orderInfo.orderId, params); - return this.tryProcess(orderInfo.orderId); + case OrderInfoStatus.Created: + case OrderInfoStatus.ArchivalCreated: { + + if (!this.#createdOrdersMetadata.has(orderId)) { + this.#createdOrdersMetadata.set(orderId, { + orderId, + arrivedAt: new Date(), + attempts: 0, + context: params + }) + } + + // dequeue everything? right now I don't see any possible side effect of not dequeueing + // this.clearInternalQueues(orderId); + + // override order params because there can be refreshed data (patches, newer confirmations, etc) + this.#createdOrdersMetadata.get(orderId)!.context = params; + + return this.tryProcess(orderId); } case OrderInfoStatus.ArchivalFulfilled: { this.batchUnlocker.unlockOrder(orderId, orderInfo.order, context); @@ -191,13 +214,17 @@ class UniversalProcessor extends BaseOrderProcessor { } case OrderInfoStatus.Cancelled: { this.clearInternalQueues(orderId); + this.clearOrderStore(orderId); context.logger.debug(`deleted from queues`); return; } case OrderInfoStatus.Fulfilled: { + context.giveChain.nonFinalizedOrdersBudgetController.removeOrder(orderId); + this.clearInternalQueues(orderId); + this.clearOrderStore(orderId); context.logger.debug(`deleted from queues`); - context.giveChain.nonFinalizedOrdersBudgetController.removeOrder(orderId); + this.batchUnlocker.unlockOrder(orderId, orderInfo.order, context); return; } @@ -213,13 +240,16 @@ class UniversalProcessor extends BaseOrderProcessor { private clearInternalQueues(orderId: string): void { this.queue.delete(orderId); this.priorityQueue.delete(orderId); - this.incomingOrdersMap.delete(orderId) this.mempoolService.delete(orderId); } + private clearOrderStore(orderId: string): void { + this.#createdOrdersMetadata.delete(orderId) + } + private async tryProcess(orderId: string): Promise { - const params = this.incomingOrdersMap.get(orderId); - if (!params) throw new Error("Unexpected: missing data for order"); + const metadata = this.getCreatedOrderMetadata(orderId); + const params = metadata.context const logger = params.context.logger; const orderInfo = params.orderInfo; @@ -252,37 +282,25 @@ class UniversalProcessor extends BaseOrderProcessor { // process this order this.isLocked = true; try { - await this.processOrder(orderId); + await this.processOrder(metadata); } catch (e) { const message = `processing order failed with an unhandled error: ${e}`; logger.error(message); logger.error(e); - const params = this.incomingOrdersMap.get(orderId); - if (params) { - const { context, orderInfo } = params; - this.hooksEngine.handleOrderPostponed({ - order: orderInfo, - context, - reason: PostponingReason.UNHANDLED_ERROR, - attempts: params.attempts, - message, - }); - this.mempoolService.addOrder(params); - } else { - logger.debug(`order data is not presented in the map`); - } + this.postponeOrder(metadata, message, PostponingReason.UNHANDLED_ERROR, true); } + metadata.attempts++; this.isLocked = false; // forward to the next order // TODO try to get rid of recursion here. Use setInterval? - const nextOrder = this.pickNextOrder(); - if (nextOrder) { - this.tryProcess(nextOrder); + const nextOrderId = this.pickNextOrderId(); + if (nextOrderId) { + this.tryProcess(nextOrderId); } } - private pickNextOrder(): string | undefined { + private pickNextOrderId(): OrderId | undefined { const nextOrderId = this.priorityQueue.values().next().value || this.queue.values().next().value; @@ -295,10 +313,58 @@ class UniversalProcessor extends BaseOrderProcessor { } } - private async processOrder(orderId: string): Promise { - const params = this.incomingOrdersMap.get(orderId); - if (!params) throw new Error("Unexpected: missing data for order"); - const { context, orderInfo } = params; + // gets the amount of sec to additionally wait until this order can be processed + private getOrderRemainingDelay(firstSeen: Date, delay: number): number { + if (delay > 0) { + const delayMs = delay * 1000; + + const orderKnownFor = new Date().getTime() - firstSeen.getTime(); + + if (delayMs > orderKnownFor) { + return (delayMs - orderKnownFor) / 1000 + } + } + + return 0; + } + + private getCreatedOrderMetadata(orderId: OrderId): CreatedOrderMetadata { + if (!this.#createdOrdersMetadata.has(orderId)) throw new Error(`Unexpected: missing created order data`) + return this.#createdOrdersMetadata.get(orderId)!; + } + + private postponeOrder(metadata: CreatedOrderMetadata, message: string, reason: PostponingReason, addToMempool: boolean = true, remainingDelay?: number) { + const { attempts, context: { context, orderInfo } } = metadata; + + context.logger.info(message); + this.hooksEngine.handleOrderPostponed({ + order: orderInfo, + context, + message, + reason, + attempts, + }); + + if (addToMempool) + this.mempoolService.addOrder(metadata.orderId, remainingDelay, attempts) + } + + private rejectOrder(metadata: CreatedOrderMetadata, message: string, reason: RejectionReason) { + const { attempts, context: { context, orderInfo } } = metadata; + + context.logger.info(message); + this.hooksEngine.handleOrderRejected({ + order: orderInfo, + context, + message, + reason, + attempts, + }); + } + + private async processOrder(metadata: CreatedOrderMetadata): Promise { + const { context, orderInfo } = metadata.context; + const orderId = orderInfo.orderId; const logger = context.logger; const bucket = context.config.buckets.find( @@ -308,15 +374,7 @@ class UniversalProcessor extends BaseOrderProcessor { ); if (bucket === undefined) { const message = `no bucket found to cover order's give token: ${tokenAddressToString(orderInfo.order.give.chainId, orderInfo.order.give.tokenAddress)}`; - logger.info(message); - this.hooksEngine.handleOrderRejected({ - order: orderInfo, - reason: RejectionReason.UNEXPECTED_GIVE_TOKEN, - context, - attempts: params.attempts, - message - }); - return; + return this.rejectOrder(metadata, message, RejectionReason.UNEXPECTED_GIVE_TOKEN); } // calculate USD worth of order @@ -344,20 +402,29 @@ class UniversalProcessor extends BaseOrderProcessor { // compare worthiness of the order against block confirmation thresholds if (orderInfo.status == OrderInfoStatus.Created) { + // find corresponding srcConstraints + const srcConstraintsByValue = context.giveChain.srcConstraints.perOrderValue.find(srcConstraints => usdWorth <= srcConstraints.upperThreshold); + const srcConstraints = srcConstraintsByValue || context.giveChain.srcConstraints; + + // find corresponding dstConstraints (they may supersede srcConstraints) + const dstConstraintsByValue = + context.takeChain.dstConstraints.perOrderValue.find(dstConstraints => usdWorth <= dstConstraints.upperThreshold) + || context.takeChain.dstConstraints; + + // determine if we should postpone the order + const fulfillmentDelay = dstConstraintsByValue.fulfillmentDelay || srcConstraints.fulfillmentDelay; + const remainingDelay = this.getOrderRemainingDelay(metadata.arrivedAt, fulfillmentDelay); + if (remainingDelay > 0) { + const message = `order should be delayed by ${remainingDelay}s (why: fulfillment delay is set to ${fulfillmentDelay}s)`; + return this.postponeOrder(metadata, message, PostponingReason.FORCED_DELAY, true, remainingDelay); + } + const finalizationInfo = (orderInfo as IncomingOrder).finalization_info; if (finalizationInfo == 'Revoked') { this.clearInternalQueues(orderInfo.orderId); const message = 'order has been revoked by the order feed due to chain reorganization'; - logger.info(message); - this.hooksEngine.handleOrderRejected({ - order: orderInfo, - reason: RejectionReason.REVOKED, - attempts: params.attempts, - context, - message, - }); - return; + return this.rejectOrder(metadata, message, RejectionReason.REVOKED); } else if ('Confirmed' in finalizationInfo) { // we don't rely on ACTUAL finality (which can be retrieved from dln-taker's RPC node) @@ -369,37 +436,16 @@ class UniversalProcessor extends BaseOrderProcessor { // ensure we can afford fulfilling this order and thus increasing our TVL if (!context.giveChain.nonFinalizedOrdersBudgetController.isFitsBudget(orderId, usdWorth)) { const message = 'order does not fit the budget, rejecting'; - logger.info(message); - this.hooksEngine.handleOrderRejected({ - order: orderInfo, - reason: RejectionReason.NON_FINALIZED_ORDERS_BUDGET_EXCEEDED, - attempts: params.attempts, - context, - message, - }); - return; + return this.rejectOrder(metadata, message, RejectionReason.NON_FINALIZED_ORDERS_BUDGET_EXCEEDED) } - // find appropriate range corresponding to this USD worth - const range = context.giveChain.usdAmountConfirmations.find( - usdWorthRange => usdWorthRange.usdWorthFrom < usdWorth && usdWorth <= usdWorthRange.usdWorthTo - ); - // range found, ensure current block confirmation >= expected - if (range?.minBlockConfirmations) { - logger.debug(`usdAmountConfirmationRange found: (${range.usdWorthFrom}, ${range.usdWorthTo}]`) - - if (announcedConfirmation < range.minBlockConfirmations) { - const message = `announced block confirmations (${ announcedConfirmation }) is less than the block confirmation constraint (${range.minBlockConfirmations} for order worth of $${usdWorth.toFixed(2)}`; - logger.info(message) - this.hooksEngine.handleOrderRejected({ - order: orderInfo, - reason: RejectionReason.NOT_ENOUGH_BLOCK_CONFIRMATIONS_FOR_ORDER_WORTH, - attempts: params.attempts, - context, - message, - }); - return; + if (srcConstraintsByValue?.minBlockConfirmations) { + logger.debug(`usdAmountConfirmationRange found: <=$${srcConstraintsByValue.upperThreshold}`) + + if (announcedConfirmation < srcConstraintsByValue.minBlockConfirmations) { + const message = `announced block confirmations (${ announcedConfirmation }) is less than the block confirmation constraint (${srcConstraintsByValue.minBlockConfirmations} for order worth of $${usdWorth.toFixed(2)}`; + return this.rejectOrder(metadata, message, RejectionReason.NOT_ENOUGH_BLOCK_CONFIRMATIONS_FOR_ORDER_WORTH) } else { logger.debug("accepting order for execution") @@ -407,15 +453,7 @@ class UniversalProcessor extends BaseOrderProcessor { } else { // range not found: we do not accept this order, let it come finalized const message = `non-finalized order worth of $${usdWorth.toFixed(2)} is not covered by any custom block confirmation range`; - logger.debug(message); - this.hooksEngine.handleOrderRejected({ - order: orderInfo, - reason: RejectionReason.NOT_YET_FINALIZED, - context, - attempts: params.attempts, - message, - }); - return; + return this.rejectOrder(metadata, message, RejectionReason.NOT_YET_FINALIZED); } } @@ -439,15 +477,7 @@ class UniversalProcessor extends BaseOrderProcessor { takeOrderStatus?.status !== undefined ) { const message = `order is already handled on the take chain (${ ChainId[ orderInfo.order.take.chainId ] }), actual status: ${takeOrderStatus?.status}`; - logger.info(message); - this.hooksEngine.handleOrderRejected({ - order: orderInfo, - reason: RejectionReason.ALREADY_FULFILLED_OR_CANCELLED, - context, - attempts: params.attempts, - message, - }); - return; + return this.rejectOrder(metadata, message, RejectionReason.ALREADY_FULFILLED_OR_CANCELLED) } // validate that order is created @@ -459,28 +489,12 @@ class UniversalProcessor extends BaseOrderProcessor { if (giveOrderStatus?.status === undefined) { const message = `order does not exist on the give chain (${ChainId[orderInfo.order.give.chainId]})`; - logger.info(message); - this.hooksEngine.handleOrderRejected({ - order: orderInfo, - reason: RejectionReason.MISSING, - context, - attempts: params.attempts, - message - }); - return; + return this.rejectOrder(metadata, message, RejectionReason.MISSING) } if (giveOrderStatus?.status !== OrderState.Created) { const message = `order has unexpected give status (${giveOrderStatus?.status}) on the give chain (${ChainId[ orderInfo.order.give.chainId]})`; - logger.info(message); - this.hooksEngine.handleOrderRejected({ - order: orderInfo, - reason: RejectionReason.UNEXPECTED_GIVE_STATUS, - attempts: params.attempts, - context, - message, - }); - return; + return this.rejectOrder(metadata, message, RejectionReason.UNEXPECTED_GIVE_STATUS); } // perform rough estimation: assuming order.give.amount is what we need on balance @@ -504,17 +518,7 @@ class UniversalProcessor extends BaseOrderProcessor { `actual balance: ${new BigNumber(accountReserveBalance).div(BigNumber(10).pow(reserveDstTokenDecimals))}, `, `but expected ${new BigNumber(roughReserveDstAmount).div(BigNumber(10).pow(roughReserveDstDecimals))}` ].join(''); - logger.info(message); - this.hooksEngine.handleOrderPostponed({ - order: orderInfo, - context, - message, - reason: PostponingReason.NOT_ENOUGH_BALANCE, - attempts: params.attempts, - }); - if (isFinalizedOrder) - this.mempoolService.addOrder(params); - return; + return this.postponeOrder(metadata, message, PostponingReason.NOT_ENOUGH_BALANCE, isFinalizedOrder) } logger.debug(`enough balance (${accountReserveBalance.toString()}) to cover order (${roughReserveDstAmount.toString()})`) @@ -586,16 +590,7 @@ class UniversalProcessor extends BaseOrderProcessor { logger.error(message); logger.error(e); } - this.hooksEngine.handleOrderPostponed({ - order: orderInfo, - context, - reason: PostponingReason.FULFILLMENT_EVM_TX_PREESTIMATION_FAILED, - message, - attempts: params.attempts, - }); - if (isFinalizedOrder) - this.mempoolService.addOrder(params); - return; + return this.postponeOrder(metadata, message, PostponingReason.FULFILLMENT_EVM_TX_PREESTIMATION_FAILED, isFinalizedOrder); } } @@ -668,16 +663,7 @@ class UniversalProcessor extends BaseOrderProcessor { ].join(""); } logger.info(`order is not profitable: ${message}`); - this.hooksEngine.handleOrderPostponed({ - order: orderInfo, - context, - message, - reason: PostponingReason.NOT_PROFITABLE, - attempts: params.attempts, - }); - if (isFinalizedOrder) - this.mempoolService.addOrder(params); - return; + return this.postponeOrder(metadata, message, PostponingReason.NOT_PROFITABLE, isFinalizedOrder); } if (!buffersAreEqual(reserveDstToken, pickedBucket.reserveDstToken)) { @@ -706,39 +692,20 @@ while calculateExpectedTakeAmount returned ${tokenAddressToString(orderInfo.orde logger.debug(`final fulfill tx gas estimation: ${evmFulfillGas}`) if (evmFulfillGas > evmFulfillGasLimit!) { const message = `final fulfill tx requires more gas units (${evmFulfillGas}) than it was declared during pre-estimation (${evmFulfillGasLimit})`; - logger.info(message) - this.hooksEngine.handleOrderPostponed({ - order: orderInfo, - context, - message, - reason: PostponingReason.FULFILLMENT_EVM_TX_ESTIMATION_EXCEEDED_PREESTIMATION, - attempts: params.attempts, - }); - - if (isFinalizedOrder) { - // reprocess order after 5s delay, but no more than two times in a row - const maxFastTrackAttempts = 2; // attempts - const fastTrackDelay = 5; // seconds - this.mempoolService.addOrder(params, params.attempts < maxFastTrackAttempts ? fastTrackDelay : undefined); - } - return; + // reprocess order after 5s delay, but no more than two times in a row + const maxFastTrackAttempts = 2; // attempts + const fastTrackDelay = 5; // seconds + const delay = metadata.attempts <= maxFastTrackAttempts ? fastTrackDelay : undefined; + + return this.postponeOrder(metadata, message, PostponingReason.FULFILLMENT_EVM_TX_ESTIMATION_EXCEEDED_PREESTIMATION, isFinalizedOrder, delay) } } catch (e) { const message = `unable to estimate fullfil tx: ${e}`; logger.error(message) logger.error(e); - this.hooksEngine.handleOrderPostponed({ - order: orderInfo, - context, - message, - reason: PostponingReason.FULFILLMENT_EVM_TX_ESTIMATION_FAILED, - attempts: params.attempts, - }); - if (isFinalizedOrder) - this.mempoolService.addOrder(params); - return; + return this.postponeOrder(metadata, message, PostponingReason.FULFILLMENT_EVM_TX_ESTIMATION_FAILED, isFinalizedOrder); } (fulfillTx as Tx).gas = evmFulfillGasLimit; @@ -767,19 +734,14 @@ while calculateExpectedTakeAmount returned ${tokenAddressToString(orderInfo.orde const message = `fulfill transaction failed: ${e}`; logger.error(message); logger.error(e); - this.hooksEngine.handleOrderPostponed({ - order: orderInfo, - context, - reason: isRevertedError(e as Error) - ? PostponingReason.FULFILLMENT_TX_REVERTED - : PostponingReason.FULFILLMENT_TX_FAILED, + return this.postponeOrder( + metadata, message, - attempts: params.attempts, - }); - - if (isFinalizedOrder) - this.mempoolService.addOrder(params); - return; + isRevertedError(e as Error) + ? PostponingReason.FULFILLMENT_TX_REVERTED + : PostponingReason.FULFILLMENT_TX_FAILED, + isFinalizedOrder + ); } await this.waitIsOrderFulfilled(orderInfo.orderId, orderInfo.order, context, logger);