Skip to content
This repository has been archived by the owner on Aug 30, 2024. It is now read-only.

Commit

Permalink
feat: mempool uses separate delay configurations across live and arch…
Browse files Browse the repository at this point in the history
…ival orders (#123)

* feat: mempool uses separate delay configurations across live and archival orders

* fixes
  • Loading branch information
alexeychr authored Sep 14, 2023
1 parent db68b40 commit 22e7805
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 53 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@debridge-finance/dln-taker",
"version": "2.13.3",
"version": "2.13.4",
"description": "DLN executor is the rule-based daemon service developed to automatically execute orders placed on the deSwap Liquidity Network (DLN) across supported blockchains",
"license": "GPL-3.0-only",
"author": "deBridge",
Expand Down
79 changes: 43 additions & 36 deletions src/processors/mempool.service.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,37 @@
import { Logger } from 'pino';

import { setTimeout } from 'timers/promises';
import { OrderId } from './base';

export type OrderConsumer = (orderId: OrderId) => void;

export type MempoolOpts = {
baseDelay: number;
baseArchivalDelay: number;
delayStep: number;
archivalDelayStep: number;
};

export class MempoolService {
readonly #logger: Logger;

readonly #trackedOrders = new Set<OrderId>();
readonly #trackedOrders = new Map<OrderId, ReturnType<typeof setTimeout>>();

constructor(
logger: Logger,
private readonly opts: MempoolOpts,
private readonly orderConsumer: OrderConsumer,
private readonly maxReprocessDelay: number,
private readonly delayStep: number = 30,
) {
this.#logger = logger.child({ service: 'MempoolService' });
}

private static getDelayPromise(delay: number) {
return setTimeout(delay * 1000);
delayArchivalOrder(orderId: OrderId, attempt: number) {
this.addOrder(
orderId,
this.opts.baseArchivalDelay + this.opts.archivalDelayStep * (attempt - 1),
);
}

delayOrder(orderId: OrderId, attempt: number) {
this.addOrder(orderId, this.opts.baseDelay + this.opts.delayStep * (attempt - 1));
}

/**
Expand All @@ -29,46 +40,42 @@ export class MempoolService {
* @param params
* @param triggerOrDelay
*/
addOrder(orderId: OrderId, delay?: number, attempt: number = 0) {
addOrder(orderId: OrderId, delay: number = 0) {
const orderLogger = this.#logger.child({ orderId });

if (this.#trackedOrders.has(orderId)) {
orderLogger.debug('already present in the mempool, not adding again');
return;
clearTimeout(this.#trackedOrders.get(orderId));
}

this.#trackedOrders.add(orderId);
orderLogger.debug(`added to mempool, new mempool size: ${this.#trackedOrders.size} order(s)`);
const timeoutId = setTimeout(this.getTimeoutFunc(orderId, orderLogger), delay * 1000);
this.#trackedOrders.set(orderId, timeoutId);

const promiseStartTime = new Date();
const maxTimeoutPromise = MempoolService.getDelayPromise(
this.maxReprocessDelay + this.delayStep * attempt,
orderLogger.debug(
`added to mempool (delay: ${delay}s), new mempool size: ${this.#trackedOrders.size} order(s)`,
);
const trigger = delay
? Promise.any([MempoolService.getDelayPromise(delay), maxTimeoutPromise])
: maxTimeoutPromise;

trigger
.catch((reason) => {
orderLogger.error(`mempool promise triggered error: ${reason}`);
orderLogger.error(reason);
})
.finally(() => {
const settlementTime = new Date();
const waitingTime = (settlementTime.getTime() - promiseStartTime.getTime()) / 1000;
orderLogger.debug(`mempool promise triggered after ${waitingTime}s`);
if (this.#trackedOrders.has(orderId)) {
orderLogger.debug(`invoking order processing routine`);
this.#trackedOrders.delete(orderId);
this.orderConsumer(orderId);
} else {
orderLogger.debug(`order does not exist in the mempool`);
}
});
}

delete(orderId: string) {
if (this.#trackedOrders.has(orderId)) {
clearTimeout(this.#trackedOrders.get(orderId));
}
this.#trackedOrders.delete(orderId);
this.#logger.child({ orderId }).debug('order has been removed from the mempool');
}

private getTimeoutFunc(orderId: OrderId, logger: Logger) {
const promiseStartTime = new Date();
return () => {
const settlementTime = new Date();
const waitingTime = (settlementTime.getTime() - promiseStartTime.getTime()) / 1000;
logger.debug(`mempool promise triggered after ${waitingTime}s`);
if (this.#trackedOrders.has(orderId)) {
logger.debug(`invoking order processing routine`);
this.#trackedOrders.delete(orderId);
this.orderConsumer(orderId);
} else {
logger.debug(`order does not exist in the mempool`);
}
};
}
}
34 changes: 20 additions & 14 deletions src/processors/universal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import {
OrderProcessorInitializer,
} from './base';
import { BatchUnlocker } from './BatchUnlocker';
import { MempoolService } from './mempool.service';
import { MempoolOpts, MempoolService } from './mempool.service';
import { PostponingReason, RejectionReason } from '../hooks/HookEnums';
import { isRevertedError } from './utils/isRevertedError';
import { DexlessChains } from '../config';
Expand All @@ -63,14 +63,6 @@ export type UniversalProcessorParams = {
* the deBridge app and the API suggest users placing orders with as much margin as 4bps
*/
minProfitabilityBps: number;
/**
* Mempool: max amount of seconds to wait before second attempt to process an order; default: 60s
*/
mempoolInterval: number;
/**
* Mempool: amount of seconds to add to the max amount of seconds on each subsequent attempt; default: 30s
*/
mempoolMaxDelayStep: number;
/**
* Number of orders (per every chain where orders are coming from and to) to accumulate to unlock them in batches
* Min: 1; max: 10, default: 10.
Expand All @@ -93,6 +85,8 @@ export type UniversalProcessorParams = {
* Max slippage that can be used for swap from reserveToken to takeToken when calculated automatically
*/
preFulfillSwapMaxAllowedSlippageBps: number;

mempool: MempoolOpts;
};

// Represents all necessary information about Created order during its internal lifecycle
Expand Down Expand Up @@ -123,11 +117,15 @@ class UniversalProcessor extends BaseOrderProcessor {

private params: UniversalProcessorParams = {
minProfitabilityBps: 4,
mempoolInterval: 60,
mempoolMaxDelayStep: 30,
batchUnlockSize: 10,
preFulfillSwapMinAllowedSlippageBps: 5,
preFulfillSwapMaxAllowedSlippageBps: 400,
mempool: {
baseDelay: 5,
baseArchivalDelay: 60 * 2,
delayStep: 10,
archivalDelayStep: 60 * 5,
},
};

constructor(params?: Partial<UniversalProcessorParams>) {
Expand Down Expand Up @@ -165,8 +163,8 @@ class UniversalProcessor extends BaseOrderProcessor {

this.mempoolService = new MempoolService(
logger.child({ takeChainId: chainId }),
this.params.mempool,
(orderId: string) => this.consume(orderId),
this.params.mempoolInterval,
);

if (chainId !== ChainId.Solana) {
Expand Down Expand Up @@ -371,7 +369,15 @@ class UniversalProcessor extends BaseOrderProcessor {
attempts,
});

if (addToMempool) this.mempoolService.addOrder(metadata.orderId, remainingDelay, attempts);
if (addToMempool) {
if (remainingDelay) {
this.mempoolService.addOrder(metadata.orderId, remainingDelay);
} else if (metadata.context.orderInfo.status === OrderInfoStatus.ArchivalCreated) {
this.mempoolService.delayArchivalOrder(metadata.orderId, attempts);
} else {
this.mempoolService.delayOrder(metadata.orderId, attempts);
}
}
}

private rejectOrder(
Expand Down Expand Up @@ -922,7 +928,7 @@ class UniversalProcessor extends BaseOrderProcessor {
const fulfillCheckDelay: number =
this.takeChain.fulfillProvider.avgBlockSpeed *
this.takeChain.fulfillProvider.finalizedBlockCount;
this.mempoolService.addOrder(metadata.orderId, fulfillCheckDelay, metadata.attempts);
this.mempoolService.addOrder(metadata.orderId, fulfillCheckDelay);

return Promise.resolve();
}
Expand Down

0 comments on commit 22e7805

Please sign in to comment.