diff --git a/package.json b/package.json index 442e81a8..1fa30ce2 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "bfx-report", - "version": "4.10.3", + "version": "4.10.4", "description": "Reporting tool", "main": "worker.js", "license": "Apache-2.0", diff --git a/workers/loc.api/bfx.api.router/index.js b/workers/loc.api/bfx.api.router/index.js index 083aff11..8badc274 100644 --- a/workers/loc.api/bfx.api.router/index.js +++ b/workers/loc.api/bfx.api.router/index.js @@ -4,7 +4,7 @@ const { decorateInjectable } = require('../di/utils') class BfxApiRouter { // Method does an idle job to be overridden in framework mode - route (methodName, method) { + route (methodName, method, interrupter) { return method() } } diff --git a/workers/loc.api/generate-report-file/csv-writer/weighted-averages-report-csv-writer.js b/workers/loc.api/generate-report-file/csv-writer/weighted-averages-report-csv-writer.js index 15134bdf..99288db6 100644 --- a/workers/loc.api/generate-report-file/csv-writer/weighted-averages-report-csv-writer.js +++ b/workers/loc.api/generate-report-file/csv-writer/weighted-averages-report-csv-writer.js @@ -41,7 +41,8 @@ module.exports = ( const { res } = await getDataFromApi({ getData: rService[name].bind(rService), args, - callerName: 'CSV_WRITER' + callerName: 'CSV_WRITER', + shouldNotInterrupt: true }) wStream.setMaxListeners(50) diff --git a/workers/loc.api/helpers/get-data-from-api.js b/workers/loc.api/helpers/get-data-from-api.js index 86e83b7f..f2fee277 100644 --- a/workers/loc.api/helpers/get-data-from-api.js +++ b/workers/loc.api/helpers/get-data-from-api.js @@ -49,7 +49,7 @@ const _getEmptyArrRes = () => { } module.exports = ( - interrupter, + commonInterrupter, wsEventEmitter ) => async ({ getData, @@ -59,11 +59,12 @@ module.exports = ( callerName, eNetErrorAttemptsTimeframeMin = 10, // min eNetErrorAttemptsTimeoutMs = 10000, // ms - shouldNotInterrupt + shouldNotInterrupt, + interrupter }) => { const _interrupter = shouldNotInterrupt ? null - : interrupter + : interrupter ?? commonInterrupter const ms = 80000 @@ -87,6 +88,7 @@ module.exports = ( * due to an internet connection issue */ _args.shouldNotBeLoggedToStdErrorStream = true + _args.interrupter = _interrupter if ( typeof getData === 'string' && diff --git a/workers/loc.api/helpers/get-rest.js b/workers/loc.api/helpers/get-rest.js index a4d98c47..85fa8e57 100644 --- a/workers/loc.api/helpers/get-rest.js +++ b/workers/loc.api/helpers/get-rest.js @@ -41,18 +41,25 @@ const _bfxFactory = (conf) => { }) } -const _route = (bfxApiRouter, methodName, args) => { +const _route = (bfxApiRouter, methodName, args, interrupter) => { if (!(bfxApiRouter instanceof BfxApiRouter)) { return Reflect.apply(...args) } return bfxApiRouter.route( methodName, - () => Reflect.apply(...args) + () => Reflect.apply(...args), + interrupter ) } -const _asyncApplyHook = async (bfxApiRouter, incomingRes, propKey, ...args) => { +const _asyncApplyHook = async ( + bfxApiRouter, + incomingRes, + propKey, + args, + interrupter +) => { let attemptsCount = 0 let caughtErr = null @@ -70,7 +77,8 @@ const _asyncApplyHook = async (bfxApiRouter, incomingRes, propKey, ...args) => { const res = await _route( bfxApiRouter, propKey, - args + args, + interrupter ) return res @@ -99,7 +107,11 @@ const _isNotPromiseOrBluebird = (instance) => ( ) ) -const _getRestProxy = (rest, bfxApiRouter) => { +const _getRestProxy = (rest, deps) => { + const { + bfxApiRouter, + interrupter + } = deps return new Proxy(rest, { get (target, propKey) { if (typeof target[propKey] !== 'function') { @@ -121,14 +133,21 @@ const _getRestProxy = (rest, bfxApiRouter) => { const res = _route( bfxApiRouter, propKey, - args + args, + interrupter ) if (_isNotPromiseOrBluebird(res)) { return res } - return _asyncApplyHook(bfxApiRouter, res, propKey, ...args) + return _asyncApplyHook( + bfxApiRouter, + res, + propKey, + args, + interrupter + ) } catch (err) { if (isNonceSmallError(err)) { attemptsCount += 1 @@ -165,7 +184,8 @@ module.exports = (conf, bfxApiRouter) => { authToken: _authToken = '' } = auth const { - timeout = 90000 + timeout = 90000, + interrupter } = opts ?? {} /* @@ -185,7 +205,10 @@ module.exports = (conf, bfxApiRouter) => { } const rest = bfxInstance.rest(2, restOpts) - const proxy = _getRestProxy(rest, bfxApiRouter) + const proxy = _getRestProxy(rest, { + bfxApiRouter, + interrupter + }) return proxy } diff --git a/workers/loc.api/helpers/prepare-response/helpers/is-not-contained-same-mts.js b/workers/loc.api/helpers/prepare-response/helpers/is-not-contained-same-mts.js index 9eb830a5..5011f950 100644 --- a/workers/loc.api/helpers/prepare-response/helpers/is-not-contained-same-mts.js +++ b/workers/loc.api/helpers/prepare-response/helpers/is-not-contained-same-mts.js @@ -2,6 +2,14 @@ const { getMethodLimit } = require('../../limit-param.helpers') +const checkLength = (apiRes, apiMethodName) => { + if (apiMethodName === 'candles') { + return apiRes.length <= 1 + } + + return apiRes.length === 0 +} + module.exports = (args, opts) => { const { apiRes, @@ -27,7 +35,7 @@ module.exports = (args, opts) => { ) return ( - apiRes.length <= 1 || // Check makes sense to prevent double requests to api + checkLength(apiRes, apiMethodName) || // Check makes sense to prevent double requests to api methodLimit > limit || apiRes.some((item) => ( item?.[datePropName] !== mts diff --git a/workers/loc.api/helpers/prepare-response/index.js b/workers/loc.api/helpers/prepare-response/index.js index b9d5ffa5..a3817bee 100644 --- a/workers/loc.api/helpers/prepare-response/index.js +++ b/workers/loc.api/helpers/prepare-response/index.js @@ -22,9 +22,10 @@ const _requestToApi = ( getREST, apiMethodName, params, - auth + auth, + interrupter ) => { - const rest = getREST(auth) + const rest = getREST(auth, { interrupter }) const bfxApiMethodName = getBfxApiMethodName(apiMethodName) const fn = rest[bfxApiMethodName].bind(rest) @@ -55,7 +56,8 @@ const _getResAndParams = async ( getREST, apiMethodName, queryParams, - args.auth + args.auth, + args?.interrupter ) return { diff --git a/workers/loc.api/interrupter/index.js b/workers/loc.api/interrupter/index.js index 6741c4b7..3b9a3598 100644 --- a/workers/loc.api/interrupter/index.js +++ b/workers/loc.api/interrupter/index.js @@ -2,6 +2,8 @@ const EventEmitter = require('events') +const INTERRUPTER_NAMES = require('./interrupter.names') + const { decorateInjectable } = require('../di/utils') class Interrupter extends EventEmitter { @@ -14,6 +16,14 @@ class Interrupter extends EventEmitter { this._isInterrupted = false this._interruptPromise = Promise.resolve() + + this.name = INTERRUPTER_NAMES.COMMON_INTERRUPTER + } + + setName (name) { + this.name = name ?? INTERRUPTER_NAMES.COMMON_INTERRUPTER + + return this } hasInterrupted () { @@ -63,6 +73,10 @@ class Interrupter extends EventEmitter { this.off(this.INTERRUPT_EVENT, cb) } + onceInterrupted (cb) { + this.once(this.INTERRUPTED_EVENT, cb) + } + emitInterrupted (error, progress) { if (error) { this.emit(this.INTERRUPTED_WITH_ERR_EVENT, error) diff --git a/workers/loc.api/interrupter/interrupter.names.js b/workers/loc.api/interrupter/interrupter.names.js new file mode 100644 index 00000000..85e4af31 --- /dev/null +++ b/workers/loc.api/interrupter/interrupter.names.js @@ -0,0 +1,8 @@ +'use strict' + +module.exports = { + COMMON_INTERRUPTER: 'COMMON_INTERRUPTER', + + SYNC_INTERRUPTER: 'SYNC_INTERRUPTER', + TRX_TAX_REPORT_INTERRUPTER: 'TRX_TAX_REPORT_INTERRUPTER' +} diff --git a/workers/loc.api/queue/write-data-to-stream/index.js b/workers/loc.api/queue/write-data-to-stream/index.js index 5e128f94..1f9bd2e8 100644 --- a/workers/loc.api/queue/write-data-to-stream/index.js +++ b/workers/loc.api/queue/write-data-to-stream/index.js @@ -49,7 +49,8 @@ module.exports = ( const _res = await getDataFromApi({ getData, args: currIterationArgs, - callerName: 'REPORT_FILE_WRITER' + callerName: 'REPORT_FILE_WRITER', + shouldNotInterrupt: true }) const isGetWalletsMethod = method === 'getWallets' diff --git a/workers/loc.api/service.report.js b/workers/loc.api/service.report.js index 5c1399b2..090a7413 100644 --- a/workers/loc.api/service.report.js +++ b/workers/loc.api/service.report.js @@ -26,7 +26,9 @@ class ReportService extends Api { } _generateToken (args, opts) { - const rest = this._getREST(args?.auth) + const rest = this._getREST(args?.auth, { + interrupter: args?.interrupter + }) return rest.generateToken({ ttl: opts?.ttl ?? 3600, @@ -38,26 +40,34 @@ class ReportService extends Api { } _invalidateAuthToken (args) { - const rest = this._getREST(args?.auth) + const rest = this._getREST(args?.auth, { + interrupter: args?.interrupter + }) const { authToken } = args?.params ?? {} return rest.invalidateAuthToken({ authToken }) } _getUserInfo (args) { - const rest = this._getREST(args.auth) + const rest = this._getREST(args.auth, { + interrupter: args?.interrupter + }) return rest.userInfo() } - _getSymbols () { - const rest = this._getREST({}) + _getSymbols (args) { + const rest = this._getREST({}, { + interrupter: args?.interrupter + }) return rest.symbols() } - _getFutures () { - const rest = this._getREST({}) + _getFutures (args) { + const rest = this._getREST({}, { + interrupter: args?.interrupter + }) return rest.futures() } @@ -68,41 +78,46 @@ class ReportService extends Api { return rest.currencies() } - _getInactiveSymbols () { - const rest = this._getREST({}) + _getInactiveSymbols (args) { + const rest = this._getREST({}, { + interrupter: args?.interrupter + }) return rest.inactiveSymbols() } - async _getConf (opts) { - const { keys: _keys } = opts ?? {} + async _getConf (args) { + const { keys: _keys, interrupter } = args ?? {} const keys = Array.isArray(_keys) ? _keys : [_keys] - const rest = this._getREST({}) + const rest = this._getREST({}, { interrupter }) const res = await rest.conf({ keys }) return Array.isArray(res) ? res : [] } - async _getMapSymbols () { + async _getMapSymbols (args) { const [res] = await this._getConf({ - keys: 'pub:map:pair:sym' + keys: 'pub:map:pair:sym', + interrupter: args?.interrupter }) return Array.isArray(res) ? res : [] } - async _getInactiveCurrencies () { + async _getInactiveCurrencies (args) { const [res] = await this._getConf({ - keys: 'pub:list:currency:inactive' + keys: 'pub:list:currency:inactive', + interrupter: args?.interrupter }) return Array.isArray(res) ? res : [] } - async _getMarginCurrencyList () { + async _getMarginCurrencyList (args) { const [res] = await this._getConf({ - keys: 'pub:list:currency:margin' + keys: 'pub:list:currency:margin', + interrupter: args?.interrupter }) return Array.isArray(res) ? res : [] @@ -111,7 +126,9 @@ class ReportService extends Api { _getWeightedAveragesReportFromApi (args) { const { auth, params } = args ?? {} - const rest = this._getREST(auth) + const rest = this._getREST(auth, { + interrupter: args?.interrupter + }) return rest.getWeightedAverages(params) } @@ -218,13 +235,13 @@ class ReportService extends Api { inactiveCurrencies, marginCurrencyList ] = await Promise.all([ - this._getSymbols(), - this._getFutures(), - this._getCurrencies(), - this._getInactiveSymbols(), - this._getMapSymbols(), - this._getInactiveCurrencies(), - this._getMarginCurrencyList() + this._getSymbols(args), + this._getFutures(args), + this._getCurrencies(args), + this._getInactiveSymbols(args), + this._getMapSymbols(args), + this._getInactiveCurrencies(args), + this._getMarginCurrencyList(args) ]) const res = prepareSymbolResponse({ @@ -248,7 +265,9 @@ class ReportService extends Api { const { auth, params } = args ?? {} const { keys = [] } = params ?? {} - const rest = this._getREST(auth) + const rest = this._getREST(auth, { + interrupter: args?.interrupter + }) return rest.getSettings({ keys }) }, 'getSettings', args, cb) @@ -259,7 +278,9 @@ class ReportService extends Api { const { auth, params } = args ?? {} const { settings = {} } = params ?? {} - const rest = this._getREST(auth) + const rest = this._getREST(auth, { + interrupter: args?.interrupter + }) return rest.updateSettings({ settings }) }, 'updateSettings', args, cb) @@ -311,7 +332,9 @@ class ReportService extends Api { getActivePositions (space, args, cb) { return this._responder(async () => { - const rest = this._getREST(args.auth) + const rest = this._getREST(args.auth, { + interrupter: args?.interrupter + }) const positions = omitPrivateModelFields( await rest.positions() ) @@ -341,7 +364,9 @@ class ReportService extends Api { return this._responder(async () => { checkParams(args, 'paramsSchemaForWallets') - const rest = this._getREST(args.auth) + const rest = this._getREST(args.auth, { + interrupter: args?.interrupter + }) return omitPrivateModelFields(await rest.wallets()) }, 'getWallets', args, cb) @@ -510,7 +535,9 @@ class ReportService extends Api { getActiveOrders (space, args, cb) { return this._responder(async () => { - const rest = this._getREST(args.auth) + const rest = this._getREST(args.auth, { + interrupter: args?.interrupter + }) const _res = omitPrivateModelFields( await rest.activeOrders() @@ -537,8 +564,8 @@ class ReportService extends Api { return this._responder(async () => { checkParams(args, 'paramsSchemaForMovementInfo', ['id']) - const { auth, params } = args ?? {} - const rest = this._getREST(auth) + const { auth, params, interrupter } = args ?? {} + const rest = this._getREST(auth, { interrupter }) const res = omitPrivateModelFields( await rest.movementInfo({ id: params?.id }) @@ -604,7 +631,10 @@ class ReportService extends Api { getAccountSummary (space, args, cb) { return this._responder(async () => { const { auth } = { ...args } - const rest = this._getREST(auth, { timeout: 30000 }) + const rest = this._getREST(auth, { + timeout: 30000, + interrupter: args?.interrupter + }) const res = omitPrivateModelFields( await rest.accountSummary()