Skip to content

Commit

Permalink
Merge pull request #375 from bitfinexcom/staging
Browse files Browse the repository at this point in the history
Release version to master
  • Loading branch information
prdn authored Jul 2, 2024
2 parents 72a97ed + ebbe2df commit 7e2d0c8
Show file tree
Hide file tree
Showing 11 changed files with 143 additions and 54 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "bfx-report",
"version": "4.10.3",
"version": "4.10.4",
"description": "Reporting tool",
"main": "worker.js",
"license": "Apache-2.0",
Expand Down
2 changes: 1 addition & 1 deletion workers/loc.api/bfx.api.router/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const { decorateInjectable } = require('../di/utils')

class BfxApiRouter {
// Method does an idle job to be overridden in framework mode
route (methodName, method) {
route (methodName, method, interrupter) {
return method()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ module.exports = (
const { res } = await getDataFromApi({
getData: rService[name].bind(rService),
args,
callerName: 'CSV_WRITER'
callerName: 'CSV_WRITER',
shouldNotInterrupt: true
})

wStream.setMaxListeners(50)
Expand Down
8 changes: 5 additions & 3 deletions workers/loc.api/helpers/get-data-from-api.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ const _getEmptyArrRes = () => {
}

module.exports = (
interrupter,
commonInterrupter,
wsEventEmitter
) => async ({
getData,
Expand All @@ -59,11 +59,12 @@ module.exports = (
callerName,
eNetErrorAttemptsTimeframeMin = 10, // min
eNetErrorAttemptsTimeoutMs = 10000, // ms
shouldNotInterrupt
shouldNotInterrupt,
interrupter
}) => {
const _interrupter = shouldNotInterrupt
? null
: interrupter
: interrupter ?? commonInterrupter

const ms = 80000

Expand All @@ -87,6 +88,7 @@ module.exports = (
* due to an internet connection issue
*/
_args.shouldNotBeLoggedToStdErrorStream = true
_args.interrupter = _interrupter

if (
typeof getData === 'string' &&
Expand Down
41 changes: 32 additions & 9 deletions workers/loc.api/helpers/get-rest.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,25 @@ const _bfxFactory = (conf) => {
})
}

const _route = (bfxApiRouter, methodName, args) => {
const _route = (bfxApiRouter, methodName, args, interrupter) => {
if (!(bfxApiRouter instanceof BfxApiRouter)) {
return Reflect.apply(...args)
}

return bfxApiRouter.route(
methodName,
() => Reflect.apply(...args)
() => Reflect.apply(...args),
interrupter
)
}

const _asyncApplyHook = async (bfxApiRouter, incomingRes, propKey, ...args) => {
const _asyncApplyHook = async (
bfxApiRouter,
incomingRes,
propKey,
args,
interrupter
) => {
let attemptsCount = 0
let caughtErr = null

Expand All @@ -70,7 +77,8 @@ const _asyncApplyHook = async (bfxApiRouter, incomingRes, propKey, ...args) => {
const res = await _route(
bfxApiRouter,
propKey,
args
args,
interrupter
)

return res
Expand Down Expand Up @@ -99,7 +107,11 @@ const _isNotPromiseOrBluebird = (instance) => (
)
)

const _getRestProxy = (rest, bfxApiRouter) => {
const _getRestProxy = (rest, deps) => {
const {
bfxApiRouter,
interrupter
} = deps
return new Proxy(rest, {
get (target, propKey) {
if (typeof target[propKey] !== 'function') {
Expand All @@ -121,14 +133,21 @@ const _getRestProxy = (rest, bfxApiRouter) => {
const res = _route(
bfxApiRouter,
propKey,
args
args,
interrupter
)

if (_isNotPromiseOrBluebird(res)) {
return res
}

return _asyncApplyHook(bfxApiRouter, res, propKey, ...args)
return _asyncApplyHook(
bfxApiRouter,
res,
propKey,
args,
interrupter
)
} catch (err) {
if (isNonceSmallError(err)) {
attemptsCount += 1
Expand Down Expand Up @@ -165,7 +184,8 @@ module.exports = (conf, bfxApiRouter) => {
authToken: _authToken = ''
} = auth
const {
timeout = 90000
timeout = 90000,
interrupter
} = opts ?? {}

/*
Expand All @@ -185,7 +205,10 @@ module.exports = (conf, bfxApiRouter) => {
}

const rest = bfxInstance.rest(2, restOpts)
const proxy = _getRestProxy(rest, bfxApiRouter)
const proxy = _getRestProxy(rest, {
bfxApiRouter,
interrupter
})

return proxy
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@

const { getMethodLimit } = require('../../limit-param.helpers')

const checkLength = (apiRes, apiMethodName) => {
if (apiMethodName === 'candles') {
return apiRes.length <= 1
}

return apiRes.length === 0
}

module.exports = (args, opts) => {
const {
apiRes,
Expand All @@ -27,7 +35,7 @@ module.exports = (args, opts) => {
)

return (
apiRes.length <= 1 || // Check makes sense to prevent double requests to api
checkLength(apiRes, apiMethodName) || // Check makes sense to prevent double requests to api
methodLimit > limit ||
apiRes.some((item) => (
item?.[datePropName] !== mts
Expand Down
8 changes: 5 additions & 3 deletions workers/loc.api/helpers/prepare-response/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ const _requestToApi = (
getREST,
apiMethodName,
params,
auth
auth,
interrupter
) => {
const rest = getREST(auth)
const rest = getREST(auth, { interrupter })
const bfxApiMethodName = getBfxApiMethodName(apiMethodName)

const fn = rest[bfxApiMethodName].bind(rest)
Expand Down Expand Up @@ -55,7 +56,8 @@ const _getResAndParams = async (
getREST,
apiMethodName,
queryParams,
args.auth
args.auth,
args?.interrupter
)

return {
Expand Down
14 changes: 14 additions & 0 deletions workers/loc.api/interrupter/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

const EventEmitter = require('events')

const INTERRUPTER_NAMES = require('./interrupter.names')

const { decorateInjectable } = require('../di/utils')

class Interrupter extends EventEmitter {
Expand All @@ -14,6 +16,14 @@ class Interrupter extends EventEmitter {

this._isInterrupted = false
this._interruptPromise = Promise.resolve()

this.name = INTERRUPTER_NAMES.COMMON_INTERRUPTER
}

setName (name) {
this.name = name ?? INTERRUPTER_NAMES.COMMON_INTERRUPTER

return this
}

hasInterrupted () {
Expand Down Expand Up @@ -63,6 +73,10 @@ class Interrupter extends EventEmitter {
this.off(this.INTERRUPT_EVENT, cb)
}

onceInterrupted (cb) {
this.once(this.INTERRUPTED_EVENT, cb)
}

emitInterrupted (error, progress) {
if (error) {
this.emit(this.INTERRUPTED_WITH_ERR_EVENT, error)
Expand Down
8 changes: 8 additions & 0 deletions workers/loc.api/interrupter/interrupter.names.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
'use strict'

module.exports = {
COMMON_INTERRUPTER: 'COMMON_INTERRUPTER',

SYNC_INTERRUPTER: 'SYNC_INTERRUPTER',
TRX_TAX_REPORT_INTERRUPTER: 'TRX_TAX_REPORT_INTERRUPTER'
}
3 changes: 2 additions & 1 deletion workers/loc.api/queue/write-data-to-stream/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ module.exports = (
const _res = await getDataFromApi({
getData,
args: currIterationArgs,
callerName: 'REPORT_FILE_WRITER'
callerName: 'REPORT_FILE_WRITER',
shouldNotInterrupt: true
})

const isGetWalletsMethod = method === 'getWallets'
Expand Down
Loading

0 comments on commit 7e2d0c8

Please sign in to comment.