diff --git a/package-lock.json b/package-lock.json index 4ff8246d..90452ff2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@debridge-finance/dln-taker", - "version": "2.13.3", + "version": "2.13.4", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@debridge-finance/dln-taker", - "version": "2.13.3", + "version": "2.13.4", "license": "GPL-3.0-only", "dependencies": { "@debridge-finance/dln-client": "7.0.1", diff --git a/package.json b/package.json index 871de0b8..d7fee22e 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@debridge-finance/dln-taker", - "version": "2.13.3", + "version": "2.13.4", "description": "DLN executor is the rule-based daemon service developed to automatically execute orders placed on the deSwap Liquidity Network (DLN) across supported blockchains", "license": "GPL-3.0-only", "author": "deBridge", diff --git a/src/processors/mempool.service.ts b/src/processors/mempool.service.ts index d40d9757..7a2ebc4b 100644 --- a/src/processors/mempool.service.ts +++ b/src/processors/mempool.service.ts @@ -1,26 +1,37 @@ import { Logger } from 'pino'; - -import { setTimeout } from 'timers/promises'; import { OrderId } from './base'; export type OrderConsumer = (orderId: OrderId) => void; +export type MempoolOpts = { + baseDelay: number; + baseArchivalDelay: number; + delayStep: number; + archivalDelayStep: number; +}; + export class MempoolService { readonly #logger: Logger; - readonly #trackedOrders = new Set(); + readonly #trackedOrders = new Map>(); constructor( logger: Logger, + private readonly opts: MempoolOpts, private readonly orderConsumer: OrderConsumer, - private readonly maxReprocessDelay: number, - private readonly delayStep: number = 30, ) { this.#logger = logger.child({ service: 'MempoolService' }); } - private static getDelayPromise(delay: number) { - return setTimeout(delay * 1000); + delayArchivalOrder(orderId: OrderId, attempt: number) { + this.addOrder( + orderId, + this.opts.baseArchivalDelay + this.opts.archivalDelayStep * (attempt - 1), + ); + } + + delayOrder(orderId: OrderId, attempt: number) { + this.addOrder(orderId, this.opts.baseDelay + this.opts.delayStep * (attempt - 1)); } /** @@ -29,46 +40,42 @@ export class MempoolService { * @param params * @param triggerOrDelay */ - addOrder(orderId: OrderId, delay?: number, attempt: number = 0) { + addOrder(orderId: OrderId, delay: number = 0) { const orderLogger = this.#logger.child({ orderId }); if (this.#trackedOrders.has(orderId)) { - orderLogger.debug('already present in the mempool, not adding again'); - return; + clearTimeout(this.#trackedOrders.get(orderId)); } - this.#trackedOrders.add(orderId); - orderLogger.debug(`added to mempool, new mempool size: ${this.#trackedOrders.size} order(s)`); + const timeoutId = setTimeout(this.getTimeoutFunc(orderId, orderLogger), delay * 1000); + this.#trackedOrders.set(orderId, timeoutId); - const promiseStartTime = new Date(); - const maxTimeoutPromise = MempoolService.getDelayPromise( - this.maxReprocessDelay + this.delayStep * attempt, + orderLogger.debug( + `added to mempool (delay: ${delay}s), new mempool size: ${this.#trackedOrders.size} order(s)`, ); - const trigger = delay - ? Promise.any([MempoolService.getDelayPromise(delay), maxTimeoutPromise]) - : maxTimeoutPromise; - - trigger - .catch((reason) => { - orderLogger.error(`mempool promise triggered error: ${reason}`); - orderLogger.error(reason); - }) - .finally(() => { - const settlementTime = new Date(); - const waitingTime = (settlementTime.getTime() - promiseStartTime.getTime()) / 1000; - orderLogger.debug(`mempool promise triggered after ${waitingTime}s`); - if (this.#trackedOrders.has(orderId)) { - orderLogger.debug(`invoking order processing routine`); - this.#trackedOrders.delete(orderId); - this.orderConsumer(orderId); - } else { - orderLogger.debug(`order does not exist in the mempool`); - } - }); } delete(orderId: string) { + if (this.#trackedOrders.has(orderId)) { + clearTimeout(this.#trackedOrders.get(orderId)); + } this.#trackedOrders.delete(orderId); this.#logger.child({ orderId }).debug('order has been removed from the mempool'); } + + private getTimeoutFunc(orderId: OrderId, logger: Logger) { + const promiseStartTime = new Date(); + return () => { + const settlementTime = new Date(); + const waitingTime = (settlementTime.getTime() - promiseStartTime.getTime()) / 1000; + logger.debug(`mempool promise triggered after ${waitingTime}s`); + if (this.#trackedOrders.has(orderId)) { + logger.debug(`invoking order processing routine`); + this.#trackedOrders.delete(orderId); + this.orderConsumer(orderId); + } else { + logger.debug(`order does not exist in the mempool`); + } + }; + } } diff --git a/src/processors/universal.ts b/src/processors/universal.ts index c4bfc695..6ffb6e7d 100644 --- a/src/processors/universal.ts +++ b/src/processors/universal.ts @@ -36,7 +36,7 @@ import { OrderProcessorInitializer, } from './base'; import { BatchUnlocker } from './BatchUnlocker'; -import { MempoolService } from './mempool.service'; +import { MempoolOpts, MempoolService } from './mempool.service'; import { PostponingReason, RejectionReason } from '../hooks/HookEnums'; import { isRevertedError } from './utils/isRevertedError'; import { DexlessChains } from '../config'; @@ -63,14 +63,6 @@ export type UniversalProcessorParams = { * the deBridge app and the API suggest users placing orders with as much margin as 4bps */ minProfitabilityBps: number; - /** - * Mempool: max amount of seconds to wait before second attempt to process an order; default: 60s - */ - mempoolInterval: number; - /** - * Mempool: amount of seconds to add to the max amount of seconds on each subsequent attempt; default: 30s - */ - mempoolMaxDelayStep: number; /** * Number of orders (per every chain where orders are coming from and to) to accumulate to unlock them in batches * Min: 1; max: 10, default: 10. @@ -93,6 +85,8 @@ export type UniversalProcessorParams = { * Max slippage that can be used for swap from reserveToken to takeToken when calculated automatically */ preFulfillSwapMaxAllowedSlippageBps: number; + + mempool: MempoolOpts; }; // Represents all necessary information about Created order during its internal lifecycle @@ -123,11 +117,15 @@ class UniversalProcessor extends BaseOrderProcessor { private params: UniversalProcessorParams = { minProfitabilityBps: 4, - mempoolInterval: 60, - mempoolMaxDelayStep: 30, batchUnlockSize: 10, preFulfillSwapMinAllowedSlippageBps: 5, preFulfillSwapMaxAllowedSlippageBps: 400, + mempool: { + baseDelay: 5, + baseArchivalDelay: 60 * 2, + delayStep: 10, + archivalDelayStep: 60 * 5, + }, }; constructor(params?: Partial) { @@ -165,8 +163,8 @@ class UniversalProcessor extends BaseOrderProcessor { this.mempoolService = new MempoolService( logger.child({ takeChainId: chainId }), + this.params.mempool, (orderId: string) => this.consume(orderId), - this.params.mempoolInterval, ); if (chainId !== ChainId.Solana) { @@ -371,7 +369,15 @@ class UniversalProcessor extends BaseOrderProcessor { attempts, }); - if (addToMempool) this.mempoolService.addOrder(metadata.orderId, remainingDelay, attempts); + if (addToMempool) { + if (remainingDelay) { + this.mempoolService.addOrder(metadata.orderId, remainingDelay); + } else if (metadata.context.orderInfo.status === OrderInfoStatus.ArchivalCreated) { + this.mempoolService.delayArchivalOrder(metadata.orderId, attempts); + } else { + this.mempoolService.delayOrder(metadata.orderId, attempts); + } + } } private rejectOrder( @@ -922,7 +928,7 @@ class UniversalProcessor extends BaseOrderProcessor { const fulfillCheckDelay: number = this.takeChain.fulfillProvider.avgBlockSpeed * this.takeChain.fulfillProvider.finalizedBlockCount; - this.mempoolService.addOrder(metadata.orderId, fulfillCheckDelay, metadata.attempts); + this.mempoolService.addOrder(metadata.orderId, fulfillCheckDelay); return Promise.resolve(); }