From fa607ebba16d9bbbb083febf14aa14b2a0f503db Mon Sep 17 00:00:00 2001 From: Dmytro Shcherbonos Date: Mon, 11 Sep 2023 15:02:31 +0300 Subject: [PATCH] recurring scheduler: purge the scheduler --- src/redux/actions/ao.js | 16 +++ src/redux/constants/ao.js | 2 + src/redux/helpers/recurring_ao.js | 4 +- src/redux/middleware/ws/on_message.js | 10 ++ .../sagas/ao/fetch_recurring_ao_atomics.js | 36 +++++-- .../sagas/ao/handle_recurring_ao_atomics.js | 7 +- src/redux/sagas/ao/index.js | 2 + src/redux/sagas/ao/process_recurring_ao.js | 27 +++-- src/redux/sagas/ao/process_recurring_aos.js | 10 +- .../schedule_fetching_recurring_ao_status.js | 23 ---- src/redux/sagas/ao/schedule_recurring_ao.js | 100 ++++++++++++++++++ 11 files changed, 189 insertions(+), 48 deletions(-) delete mode 100644 src/redux/sagas/ao/schedule_fetching_recurring_ao_status.js create mode 100644 src/redux/sagas/ao/schedule_recurring_ao.js diff --git a/src/redux/actions/ao.js b/src/redux/actions/ao.js index a44fd8b62..6542f966f 100644 --- a/src/redux/actions/ao.js +++ b/src/redux/actions/ao.js @@ -102,6 +102,20 @@ export function recvRecurringAoAtomics(orders, gid, mode) { } } +export function recvRecurringAoAtomicsFailed(gid) { + return { + type: types.GET_RECURRING_AO_ATOMICS_FAILED, + payload: { gid }, + } +} + +export function requestRecurringAoAtomics(payload) { + return { + type: types.REQUEST_RECURRING_AO_ATOMICS, + payload, + } +} + export function setFailedRecurringAoAtomics(orders) { return { type: types.SET_FAILED_RECURRING_AO_ATOMICS, @@ -122,4 +136,6 @@ export default { setShowAOsHistory, recvRecurringAoAtomics, setFailedRecurringAoAtomics, + recvRecurringAoAtomicsFailed, + requestRecurringAoAtomics, } diff --git a/src/redux/constants/ao.js b/src/redux/constants/ao.js index 9529fdf08..a6417d51e 100644 --- a/src/redux/constants/ao.js +++ b/src/redux/constants/ao.js @@ -12,4 +12,6 @@ export default { SHOW_AOS_HISTORY: 'SHOW_AOS_HISTORY', SET_FAILED_RECURRING_AO_ATOMICS: 'SET_FAILED_RECURRING_AO_ATOMICS', DATA_RECURRING_AO_ATOMICS: 'DATA_RECURRING_AO_ATOMICS', + GET_RECURRING_AO_ATOMICS_FAILED: 'GET_RECURRING_AO_ATOMICS_FAILED', + REQUEST_RECURRING_AO_ATOMICS: 'REQUEST_RECURRING_AO_ATOMICS', } diff --git a/src/redux/helpers/recurring_ao.js b/src/redux/helpers/recurring_ao.js index c0fa73b8f..2efa3209e 100644 --- a/src/redux/helpers/recurring_ao.js +++ b/src/redux/helpers/recurring_ao.js @@ -6,7 +6,7 @@ const RECURRENCE_WIDTH = { [RECURRENCE_OPTIONS.WEEKLY]: TIMEFRAME_WIDTH['1w'], [RECURRENCE_OPTIONS.MONTHLY]: TIMEFRAME_WIDTH['1M'], } -export const DELAY_FOR_FETCH = TIMEFRAME_WIDTH['1m'] * 2 +export const RECURRING_DELAY_FOR_FETCH = TIMEFRAME_WIDTH['1m'] * 1.5 export const calculateNextExecutionTime = (startedAt, endedAt, recurrence) => { const recurrenceMs = RECURRENCE_WIDTH[recurrence] @@ -18,7 +18,7 @@ export const calculateNextExecutionTime = (startedAt, endedAt, recurrence) => { let nextExecutionTime = new Date(startedAt).getTime() const endedAtTime = endedAt ? new Date(endedAt).getTime() : Infinity - while (nextExecutionTime <= endedAtTime) { + while (nextExecutionTime <= endedAtTime + RECURRING_DELAY_FOR_FETCH) { if (nextExecutionTime >= Date.now()) { break } diff --git a/src/redux/middleware/ws/on_message.js b/src/redux/middleware/ws/on_message.js index 685c6a062..3ca3e298a 100644 --- a/src/redux/middleware/ws/on_message.js +++ b/src/redux/middleware/ws/on_message.js @@ -435,6 +435,16 @@ export default (alias, store) => (e = {}) => { break } + case 'data.recur_ao_atomic_orders.status': { + const [, gid, status] = payload + + if (status === 'failed') { + store.dispatch(AOActions.recvRecurringAoAtomicsFailed(gid)) + } + + break + } + case 'bt.exec': { const [, from, to, symbol, tf, withCandles, withTrades, syncData] = payload store.dispatch(WSActions.recvBacktestExecute({ diff --git a/src/redux/sagas/ao/fetch_recurring_ao_atomics.js b/src/redux/sagas/ao/fetch_recurring_ao_atomics.js index 122cf7c6d..d5f170319 100644 --- a/src/redux/sagas/ao/fetch_recurring_ao_atomics.js +++ b/src/redux/sagas/ao/fetch_recurring_ao_atomics.js @@ -1,26 +1,50 @@ import { - call, put, select, take, + call, delay, put, select, take, } from 'redux-saga/effects' import Debug from 'debug' import WSActions from '../../actions/ws' import types from '../../constants/ao' import { getAuthToken } from '../../selectors/ws' import handleRecurringAoAtomics from './handle_recurring_ao_atomics' +import AOActions from '../../actions/ao' +import { RECURRING_DELAY_FOR_FETCH } from '../../helpers/recurring_ao' const debug = Debug('hfui:recurring-ao') -export default function* fetchRecurringAoAtomics({ gid, firstDataRequest }) { +export default function* fetchRecurringAoAtomics({ + payload: { gid, firstDataRequest }, +}) { const authToken = yield select(getAuthToken) - yield put(WSActions.send(['recurring_algo_order.orders', authToken, 'bitfinex', gid])) + yield put( + WSActions.send(['recurring_algo_order.orders', authToken, 'bitfinex', gid]), + ) debug('request atomics for recurring %s', gid) while (true) { - const action = yield take(types.DATA_RECURRING_AO_ATOMICS) - const { payload: { orders, gid: _gid, mode } } = action + const action = yield take([ + types.DATA_RECURRING_AO_ATOMICS, + types.GET_RECURRING_AO_ATOMICS_FAILED, + ]) + const { + payload: { orders, gid: _gid, mode }, + type, + } = action + if (gid === _gid) { + if (type === types.GET_RECURRING_AO_ATOMICS_FAILED) { + debug('FAILED request atomics for recurring %s, retry in 1.5s', gid) + yield delay(RECURRING_DELAY_FOR_FETCH) + yield put( + AOActions.requestRecurringAoAtomics({ gid, firstDataRequest }), + ) + break + } yield call(handleRecurringAoAtomics, { - gid, orders, firstDataRequest, mode, + gid, + orders, + firstDataRequest, + mode, }) break } diff --git a/src/redux/sagas/ao/handle_recurring_ao_atomics.js b/src/redux/sagas/ao/handle_recurring_ao_atomics.js index 4b2148c41..94819842f 100644 --- a/src/redux/sagas/ao/handle_recurring_ao_atomics.js +++ b/src/redux/sagas/ao/handle_recurring_ao_atomics.js @@ -12,9 +12,8 @@ import UIActions from '../../actions/ui' import { getAlgoOrderById, getOrderHistory } from '../../selectors/ws' import { getFailedRecurringAoAtomics } from '../../selectors/ao' import { getMarketPair } from '../../selectors/meta' -import scheduleFetchingRecurringAOStatus from './schedule_fetching_recurring_ao_status' +import scheduleRecurringAo from './schedule_recurring_ao' import TIMEFRAME_WIDTHS from '../../../util/time_frame_widths' -import fetchRecurringAoAtomics from './fetch_recurring_ao_atomics' import { getCurrentMode, getFormatTimeFn } from '../../selectors/ui' import { getLastSessionTimestamp } from '../../../util/ui' import i18n from '../../../locales/i18n' @@ -137,7 +136,7 @@ export default function* handleRecurringAoAtomics({ }) if (isResponseUseful || firstDataRequest) { - yield call(scheduleFetchingRecurringAOStatus, { + yield call(scheduleRecurringAo, { gid, startedAt, endedAt, @@ -146,6 +145,6 @@ export default function* handleRecurringAoAtomics({ } else { debug('there are not new orders for %s, fetch again in 1m', gid) yield delay(TIMEFRAME_WIDTHS['1m']) - yield call(fetchRecurringAoAtomics, { gid, firstDataRequest: false }) + yield put(AOActions.requestRecurringAoAtomics({ gid, firstDataRequest: false })) } } diff --git a/src/redux/sagas/ao/index.js b/src/redux/sagas/ao/index.js index 8f9531e35..7bec7b8ba 100644 --- a/src/redux/sagas/ao/index.js +++ b/src/redux/sagas/ao/index.js @@ -11,6 +11,7 @@ import requestAOsHistory from './request_aos_history' import handleActiveAlgoOrders from './handle_active_algo_orders' import processRecurringAOs from './process_recurring_aos' import processRecurringAO from './process_recurring_ao' +import fetchRecurringAoAtomics from './fetch_recurring_ao_atomics' export default function* () { yield takeEvery(types.RESUME_REMOVE_ACTIVE_AOS, onResumeRemoveActiveAlgoOrdersHandler) @@ -21,4 +22,5 @@ export default function* () { yield takeEvery(WSTypes.DATA_RECURRING_ALGO_ORDERS, processRecurringAOs) yield takeLatest(types.HANDLE_ACTIVE_AOS, handleActiveAlgoOrders) yield takeEvery(WSTypes.DATA_ALGO_ORDER, processRecurringAO) + yield takeEvery(types.REQUEST_RECURRING_AO_ATOMICS, fetchRecurringAoAtomics) } diff --git a/src/redux/sagas/ao/process_recurring_ao.js b/src/redux/sagas/ao/process_recurring_ao.js index 695065a54..edffbf5e6 100644 --- a/src/redux/sagas/ao/process_recurring_ao.js +++ b/src/redux/sagas/ao/process_recurring_ao.js @@ -1,19 +1,30 @@ -import { call } from 'redux-saga/effects' +import { call, put, delay } from 'redux-saga/effects' +import Debug from 'debug' import { isFuture } from 'date-fns' -import scheduleFetchingRecurringAOStatus from './schedule_fetching_recurring_ao_status' -import fetchRecurringAoAtomics from './fetch_recurring_ao_atomics' +import scheduleRecurringAo from './schedule_recurring_ao' +import AOActions from '../../actions/ao' +import { RECURRING_DELAY_FOR_FETCH } from '../../helpers/recurring_ao' + +const debug = Debug('hfui:recurring-ao') export default function* processRecurringAO({ payload }) { - const { ao: { gid, args } } = payload + const { + ao: { + gid, args, createdAt, lastActive, + }, + } = payload const { startedAt, recurrence, endedAt = null } = args - - const shouldAtomicsBeFetched = !isFuture(new Date(startedAt)) + const isOrderNew = createdAt === lastActive + const shouldAtomicsBeFetched = !isFuture(new Date(startedAt)) && isOrderNew if (!shouldAtomicsBeFetched) { - yield call(scheduleFetchingRecurringAOStatus, { + yield call(scheduleRecurringAo, { gid, startedAt, endedAt, recurrence, }) + return } - yield call(fetchRecurringAoAtomics, { gid, firstDataRequest: false }) + debug('fetch recurring ao %s atomics in 1.5m', gid) + yield delay(RECURRING_DELAY_FOR_FETCH) + yield put(AOActions.requestRecurringAoAtomics({ gid, firstDataRequest: false })) } diff --git a/src/redux/sagas/ao/process_recurring_aos.js b/src/redux/sagas/ao/process_recurring_aos.js index c34de5629..8a09cec23 100644 --- a/src/redux/sagas/ao/process_recurring_aos.js +++ b/src/redux/sagas/ao/process_recurring_aos.js @@ -1,9 +1,9 @@ import _map from 'lodash/map' -import { all, call } from 'redux-saga/effects' +import { all, call, put } from 'redux-saga/effects' import { isFuture } from 'date-fns' import _isEmpty from 'lodash/isEmpty' -import scheduleFetchingRecurringAOStatus from './schedule_fetching_recurring_ao_status' -import fetchRecurringAoAtomics from './fetch_recurring_ao_atomics' +import scheduleRecurringAo from './schedule_recurring_ao' +import AOActions from '../../actions/ao' export default function* processRecurringAOs({ payload }) { const { aos } = payload @@ -17,11 +17,11 @@ export default function* processRecurringAOs({ payload }) { const shouldAtomicsBeFetched = !isFuture(new Date(startedAt)) if (!shouldAtomicsBeFetched) { - return call(scheduleFetchingRecurringAOStatus, { + return call(scheduleRecurringAo, { gid, startedAt, endedAt, recurrence, }) } - return call(fetchRecurringAoAtomics, { gid, firstDataRequest: true }) + return put(AOActions.requestRecurringAoAtomics({ gid, firstDataRequest: true })) }) yield all(operations) diff --git a/src/redux/sagas/ao/schedule_fetching_recurring_ao_status.js b/src/redux/sagas/ao/schedule_fetching_recurring_ao_status.js deleted file mode 100644 index eddfb4514..000000000 --- a/src/redux/sagas/ao/schedule_fetching_recurring_ao_status.js +++ /dev/null @@ -1,23 +0,0 @@ -import { delay } from 'redux-saga/effects' -import Debug from 'debug' -import { DELAY_FOR_FETCH, calculateNextExecutionTime } from '../../helpers/recurring_ao' -import fetchRecurringAoAtomics from './fetch_recurring_ao_atomics' - -const debug = Debug('hfui:recurring-ao') - -export default function* scheduleFetchingRecurringAOStatus({ - gid, startedAt, endedAt, recurrence, -}) { - const nextExecutionTime = calculateNextExecutionTime(startedAt, endedAt, recurrence) - const delayTime = nextExecutionTime - Date.now() + DELAY_FOR_FETCH - - debug('scheduled fetching for %s', gid, { - startedAt, - endedAt, - recurrence, - nextExecutionTime: new Date(nextExecutionTime).toISOString(), - fetchInMs: delayTime, - }) - yield delay(delayTime) - yield fetchRecurringAoAtomics({ gid, firstDataRequest: false }) -} diff --git a/src/redux/sagas/ao/schedule_recurring_ao.js b/src/redux/sagas/ao/schedule_recurring_ao.js new file mode 100644 index 000000000..83c20c831 --- /dev/null +++ b/src/redux/sagas/ao/schedule_recurring_ao.js @@ -0,0 +1,100 @@ +import { + call, delay, put, race, take, +} from 'redux-saga/effects' +import _max from 'lodash/max' +import Debug from 'debug' +import { + RECURRING_DELAY_FOR_FETCH, + calculateNextExecutionTime, +} from '../../helpers/recurring_ao' +import AOActions from '../../actions/ao' +import WSActions from '../../actions/ws' +import WSTypes from '../../constants/ws' + +const debug = Debug('hfui:recurring-ao') + +function* purgeSchedulerOnAlgoOrderChange({ gid, sagaToExecute }) { + console.log('before race') + const { cancel } = yield race({ + delay: sagaToExecute, + cancel: take((action) => { + const { + type, + payload, + } = action + let shouldBeCanceled = false + if (type === WSTypes.DATA_ALGO_ORDER) { + const { ao: { gid: _gid } } = payload + shouldBeCanceled = gid === _gid + } + if (type === WSTypes.DATA_ALGO_ORDER_STOPPED) { + const { gid: _gid } = payload + shouldBeCanceled = gid === _gid + } + if (shouldBeCanceled) { + debug('scheduler of %s was cancelled', gid) + } + return shouldBeCanceled + }), + }) + + return cancel +} + +function* scheduleCancellation({ gid, endedAtTime, endedAt }) { + const delayTime = _max(endedAtTime - Date.now(), 0) + debug('scheduled cancelation of %s', gid, { + endedAt, + delayTime, + }) + yield delay(delayTime) + yield put(WSActions.cancelAlgoOrder(gid)) +} + +function* scheduleFetching({ + gid, nextExecutionTime, startedAt, endedAt, recurrence, +}) { + const delayTime = nextExecutionTime - Date.now() + RECURRING_DELAY_FOR_FETCH + + debug('scheduled fetching for %s', gid, { + startedAt, + endedAt, + recurrence, + nextExecutionTime: new Date(nextExecutionTime).toISOString(), + fetchInMs: delayTime, + }) + + yield delay(delayTime) + yield put( + AOActions.requestRecurringAoAtomics({ gid, firstDataRequest: false }), + ) +} + +export default function* scheduleRecurringAo({ + gid, + startedAt, + endedAt, + recurrence, +}) { + const nextExecutionTime = calculateNextExecutionTime( + startedAt, + endedAt, + recurrence, + ) + const endedAtTime = endedAt ? new Date(endedAt).getTime() : null + + const shouldBeCancelled = endedAtTime && endedAtTime < nextExecutionTime + if (shouldBeCancelled) { + yield call(purgeSchedulerOnAlgoOrderChange, { + gid, + sagaToExecute: scheduleCancellation({ gid, endedAtTime, endedAt }), + }) + return + } + yield call(purgeSchedulerOnAlgoOrderChange, { + gid, + sagaToExecute: scheduleFetching({ + gid, nextExecutionTime, startedAt, endedAt, recurrence, + }), + }) +}