Skip to content

Commit

Permalink
recurring scheduler: purge the scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
dmytroshch committed Sep 11, 2023
1 parent 49bcd0a commit fa607eb
Show file tree
Hide file tree
Showing 11 changed files with 189 additions and 48 deletions.
16 changes: 16 additions & 0 deletions src/redux/actions/ao.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,20 @@ export function recvRecurringAoAtomics(orders, gid, mode) {
}
}

export function recvRecurringAoAtomicsFailed(gid) {
return {
type: types.GET_RECURRING_AO_ATOMICS_FAILED,
payload: { gid },
}
}

export function requestRecurringAoAtomics(payload) {
return {
type: types.REQUEST_RECURRING_AO_ATOMICS,
payload,
}
}

export function setFailedRecurringAoAtomics(orders) {
return {
type: types.SET_FAILED_RECURRING_AO_ATOMICS,
Expand All @@ -122,4 +136,6 @@ export default {
setShowAOsHistory,
recvRecurringAoAtomics,
setFailedRecurringAoAtomics,
recvRecurringAoAtomicsFailed,
requestRecurringAoAtomics,
}
2 changes: 2 additions & 0 deletions src/redux/constants/ao.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ export default {
SHOW_AOS_HISTORY: 'SHOW_AOS_HISTORY',
SET_FAILED_RECURRING_AO_ATOMICS: 'SET_FAILED_RECURRING_AO_ATOMICS',
DATA_RECURRING_AO_ATOMICS: 'DATA_RECURRING_AO_ATOMICS',
GET_RECURRING_AO_ATOMICS_FAILED: 'GET_RECURRING_AO_ATOMICS_FAILED',
REQUEST_RECURRING_AO_ATOMICS: 'REQUEST_RECURRING_AO_ATOMICS',
}
4 changes: 2 additions & 2 deletions src/redux/helpers/recurring_ao.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const RECURRENCE_WIDTH = {
[RECURRENCE_OPTIONS.WEEKLY]: TIMEFRAME_WIDTH['1w'],
[RECURRENCE_OPTIONS.MONTHLY]: TIMEFRAME_WIDTH['1M'],
}
export const DELAY_FOR_FETCH = TIMEFRAME_WIDTH['1m'] * 2
export const RECURRING_DELAY_FOR_FETCH = TIMEFRAME_WIDTH['1m'] * 1.5

export const calculateNextExecutionTime = (startedAt, endedAt, recurrence) => {
const recurrenceMs = RECURRENCE_WIDTH[recurrence]
Expand All @@ -18,7 +18,7 @@ export const calculateNextExecutionTime = (startedAt, endedAt, recurrence) => {
let nextExecutionTime = new Date(startedAt).getTime()
const endedAtTime = endedAt ? new Date(endedAt).getTime() : Infinity

while (nextExecutionTime <= endedAtTime) {
while (nextExecutionTime <= endedAtTime + RECURRING_DELAY_FOR_FETCH) {
if (nextExecutionTime >= Date.now()) {
break
}
Expand Down
10 changes: 10 additions & 0 deletions src/redux/middleware/ws/on_message.js
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,16 @@ export default (alias, store) => (e = {}) => {
break
}

case 'data.recur_ao_atomic_orders.status': {
const [, gid, status] = payload

if (status === 'failed') {
store.dispatch(AOActions.recvRecurringAoAtomicsFailed(gid))
}

break
}

case 'bt.exec': {
const [, from, to, symbol, tf, withCandles, withTrades, syncData] = payload
store.dispatch(WSActions.recvBacktestExecute({
Expand Down
36 changes: 30 additions & 6 deletions src/redux/sagas/ao/fetch_recurring_ao_atomics.js
Original file line number Diff line number Diff line change
@@ -1,26 +1,50 @@
import {
call, put, select, take,
call, delay, put, select, take,
} from 'redux-saga/effects'
import Debug from 'debug'
import WSActions from '../../actions/ws'
import types from '../../constants/ao'
import { getAuthToken } from '../../selectors/ws'
import handleRecurringAoAtomics from './handle_recurring_ao_atomics'
import AOActions from '../../actions/ao'
import { RECURRING_DELAY_FOR_FETCH } from '../../helpers/recurring_ao'

const debug = Debug('hfui:recurring-ao')

export default function* fetchRecurringAoAtomics({ gid, firstDataRequest }) {
export default function* fetchRecurringAoAtomics({
payload: { gid, firstDataRequest },
}) {
const authToken = yield select(getAuthToken)

yield put(WSActions.send(['recurring_algo_order.orders', authToken, 'bitfinex', gid]))
yield put(
WSActions.send(['recurring_algo_order.orders', authToken, 'bitfinex', gid]),
)
debug('request atomics for recurring %s', gid)

while (true) {
const action = yield take(types.DATA_RECURRING_AO_ATOMICS)
const { payload: { orders, gid: _gid, mode } } = action
const action = yield take([
types.DATA_RECURRING_AO_ATOMICS,
types.GET_RECURRING_AO_ATOMICS_FAILED,
])
const {
payload: { orders, gid: _gid, mode },
type,
} = action

if (gid === _gid) {
if (type === types.GET_RECURRING_AO_ATOMICS_FAILED) {
debug('FAILED request atomics for recurring %s, retry in 1.5s', gid)
yield delay(RECURRING_DELAY_FOR_FETCH)
yield put(
AOActions.requestRecurringAoAtomics({ gid, firstDataRequest }),
)
break
}
yield call(handleRecurringAoAtomics, {
gid, orders, firstDataRequest, mode,
gid,
orders,
firstDataRequest,
mode,
})
break
}
Expand Down
7 changes: 3 additions & 4 deletions src/redux/sagas/ao/handle_recurring_ao_atomics.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ import UIActions from '../../actions/ui'
import { getAlgoOrderById, getOrderHistory } from '../../selectors/ws'
import { getFailedRecurringAoAtomics } from '../../selectors/ao'
import { getMarketPair } from '../../selectors/meta'
import scheduleFetchingRecurringAOStatus from './schedule_fetching_recurring_ao_status'
import scheduleRecurringAo from './schedule_recurring_ao'
import TIMEFRAME_WIDTHS from '../../../util/time_frame_widths'
import fetchRecurringAoAtomics from './fetch_recurring_ao_atomics'
import { getCurrentMode, getFormatTimeFn } from '../../selectors/ui'
import { getLastSessionTimestamp } from '../../../util/ui'
import i18n from '../../../locales/i18n'
Expand Down Expand Up @@ -137,7 +136,7 @@ export default function* handleRecurringAoAtomics({
})

if (isResponseUseful || firstDataRequest) {
yield call(scheduleFetchingRecurringAOStatus, {
yield call(scheduleRecurringAo, {
gid,
startedAt,
endedAt,
Expand All @@ -146,6 +145,6 @@ export default function* handleRecurringAoAtomics({
} else {
debug('there are not new orders for %s, fetch again in 1m', gid)
yield delay(TIMEFRAME_WIDTHS['1m'])
yield call(fetchRecurringAoAtomics, { gid, firstDataRequest: false })
yield put(AOActions.requestRecurringAoAtomics({ gid, firstDataRequest: false }))
}
}
2 changes: 2 additions & 0 deletions src/redux/sagas/ao/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import requestAOsHistory from './request_aos_history'
import handleActiveAlgoOrders from './handle_active_algo_orders'
import processRecurringAOs from './process_recurring_aos'
import processRecurringAO from './process_recurring_ao'
import fetchRecurringAoAtomics from './fetch_recurring_ao_atomics'

export default function* () {
yield takeEvery(types.RESUME_REMOVE_ACTIVE_AOS, onResumeRemoveActiveAlgoOrdersHandler)
Expand All @@ -21,4 +22,5 @@ export default function* () {
yield takeEvery(WSTypes.DATA_RECURRING_ALGO_ORDERS, processRecurringAOs)
yield takeLatest(types.HANDLE_ACTIVE_AOS, handleActiveAlgoOrders)
yield takeEvery(WSTypes.DATA_ALGO_ORDER, processRecurringAO)
yield takeEvery(types.REQUEST_RECURRING_AO_ATOMICS, fetchRecurringAoAtomics)
}
27 changes: 19 additions & 8 deletions src/redux/sagas/ao/process_recurring_ao.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,30 @@
import { call } from 'redux-saga/effects'
import { call, put, delay } from 'redux-saga/effects'
import Debug from 'debug'
import { isFuture } from 'date-fns'
import scheduleFetchingRecurringAOStatus from './schedule_fetching_recurring_ao_status'
import fetchRecurringAoAtomics from './fetch_recurring_ao_atomics'
import scheduleRecurringAo from './schedule_recurring_ao'
import AOActions from '../../actions/ao'
import { RECURRING_DELAY_FOR_FETCH } from '../../helpers/recurring_ao'

const debug = Debug('hfui:recurring-ao')

export default function* processRecurringAO({ payload }) {
const { ao: { gid, args } } = payload
const {
ao: {
gid, args, createdAt, lastActive,
},
} = payload

const { startedAt, recurrence, endedAt = null } = args

const shouldAtomicsBeFetched = !isFuture(new Date(startedAt))
const isOrderNew = createdAt === lastActive
const shouldAtomicsBeFetched = !isFuture(new Date(startedAt)) && isOrderNew

if (!shouldAtomicsBeFetched) {
yield call(scheduleFetchingRecurringAOStatus, {
yield call(scheduleRecurringAo, {
gid, startedAt, endedAt, recurrence,
})
return
}
yield call(fetchRecurringAoAtomics, { gid, firstDataRequest: false })
debug('fetch recurring ao %s atomics in 1.5m', gid)
yield delay(RECURRING_DELAY_FOR_FETCH)
yield put(AOActions.requestRecurringAoAtomics({ gid, firstDataRequest: false }))
}
10 changes: 5 additions & 5 deletions src/redux/sagas/ao/process_recurring_aos.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import _map from 'lodash/map'
import { all, call } from 'redux-saga/effects'
import { all, call, put } from 'redux-saga/effects'
import { isFuture } from 'date-fns'
import _isEmpty from 'lodash/isEmpty'
import scheduleFetchingRecurringAOStatus from './schedule_fetching_recurring_ao_status'
import fetchRecurringAoAtomics from './fetch_recurring_ao_atomics'
import scheduleRecurringAo from './schedule_recurring_ao'
import AOActions from '../../actions/ao'

export default function* processRecurringAOs({ payload }) {
const { aos } = payload
Expand All @@ -17,11 +17,11 @@ export default function* processRecurringAOs({ payload }) {
const shouldAtomicsBeFetched = !isFuture(new Date(startedAt))

if (!shouldAtomicsBeFetched) {
return call(scheduleFetchingRecurringAOStatus, {
return call(scheduleRecurringAo, {
gid, startedAt, endedAt, recurrence,
})
}
return call(fetchRecurringAoAtomics, { gid, firstDataRequest: true })
return put(AOActions.requestRecurringAoAtomics({ gid, firstDataRequest: true }))
})

yield all(operations)
Expand Down
23 changes: 0 additions & 23 deletions src/redux/sagas/ao/schedule_fetching_recurring_ao_status.js

This file was deleted.

100 changes: 100 additions & 0 deletions src/redux/sagas/ao/schedule_recurring_ao.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import {
call, delay, put, race, take,
} from 'redux-saga/effects'
import _max from 'lodash/max'
import Debug from 'debug'
import {
RECURRING_DELAY_FOR_FETCH,
calculateNextExecutionTime,
} from '../../helpers/recurring_ao'
import AOActions from '../../actions/ao'
import WSActions from '../../actions/ws'
import WSTypes from '../../constants/ws'

const debug = Debug('hfui:recurring-ao')

function* purgeSchedulerOnAlgoOrderChange({ gid, sagaToExecute }) {
console.log('before race')
const { cancel } = yield race({
delay: sagaToExecute,
cancel: take((action) => {
const {
type,
payload,
} = action
let shouldBeCanceled = false
if (type === WSTypes.DATA_ALGO_ORDER) {
const { ao: { gid: _gid } } = payload
shouldBeCanceled = gid === _gid
}
if (type === WSTypes.DATA_ALGO_ORDER_STOPPED) {
const { gid: _gid } = payload
shouldBeCanceled = gid === _gid
}
if (shouldBeCanceled) {
debug('scheduler of %s was cancelled', gid)
}
return shouldBeCanceled
}),
})

return cancel
}

function* scheduleCancellation({ gid, endedAtTime, endedAt }) {
const delayTime = _max(endedAtTime - Date.now(), 0)
debug('scheduled cancelation of %s', gid, {
endedAt,
delayTime,
})
yield delay(delayTime)
yield put(WSActions.cancelAlgoOrder(gid))
}

function* scheduleFetching({
gid, nextExecutionTime, startedAt, endedAt, recurrence,
}) {
const delayTime = nextExecutionTime - Date.now() + RECURRING_DELAY_FOR_FETCH

debug('scheduled fetching for %s', gid, {
startedAt,
endedAt,
recurrence,
nextExecutionTime: new Date(nextExecutionTime).toISOString(),
fetchInMs: delayTime,
})

yield delay(delayTime)
yield put(
AOActions.requestRecurringAoAtomics({ gid, firstDataRequest: false }),
)
}

export default function* scheduleRecurringAo({
gid,
startedAt,
endedAt,
recurrence,
}) {
const nextExecutionTime = calculateNextExecutionTime(
startedAt,
endedAt,
recurrence,
)
const endedAtTime = endedAt ? new Date(endedAt).getTime() : null

const shouldBeCancelled = endedAtTime && endedAtTime < nextExecutionTime
if (shouldBeCancelled) {
yield call(purgeSchedulerOnAlgoOrderChange, {
gid,
sagaToExecute: scheduleCancellation({ gid, endedAtTime, endedAt }),
})
return
}
yield call(purgeSchedulerOnAlgoOrderChange, {
gid,
sagaToExecute: scheduleFetching({
gid, nextExecutionTime, startedAt, endedAt, recurrence,
}),
})
}

0 comments on commit fa607eb

Please sign in to comment.