diff --git a/.circleci/config.yml b/.circleci/config.yml index 702a17b07..0d4d5e444 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -1003,7 +1003,8 @@ workflows: # - test-dependencies - test-lint - test-unit - - test-coverage + ## To be able to release a snapshot without code coverage + # - test-coverage - test-integration - test-functional - vulnerability-check diff --git a/audit-ci.jsonc b/audit-ci.jsonc index cad1ae8c2..9314e72e9 100644 --- a/audit-ci.jsonc +++ b/audit-ci.jsonc @@ -11,6 +11,7 @@ "GHSA-c429-5p7v-vgjp", // https://github.com/advisories/GHSA-c429-5p7v-vgjp "GHSA-g64q-3vg8-8f93", // https://github.com/advisories/GHSA-g64q-3vg8-8f93 "GHSA-mg85-8mv5-ffjr", // https://github.com/advisories/GHSA-mg85-8mv5-ffjr - "GHSA-8hc4-vh64-cxmj" // https://github.com/advisories/GHSA-8hc4-vh64-cxmj + "GHSA-8hc4-vh64-cxmj", // https://github.com/advisories/GHSA-8hc4-vh64-cxmj + "GHSA-952p-6rrq-rcjv" // https://github.com/advisories/GHSA-952p-6rrq-rcjv ] } diff --git a/package-lock.json b/package-lock.json index 0a142dacd..5e81f49d0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@mojaloop/central-ledger", - "version": "17.8.0-snapshot.7", + "version": "17.8.0-snapshot.9", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@mojaloop/central-ledger", - "version": "17.8.0-snapshot.7", + "version": "17.8.0-snapshot.9", "license": "Apache-2.0", "dependencies": { "@hapi/basic": "7.0.2", @@ -58,7 +58,7 @@ "jsdoc": "4.0.3", "jsonpath": "1.1.1", "nodemon": "3.1.4", - "npm-check-updates": "17.0.6", + "npm-check-updates": "17.1.0", "nyc": "17.0.0", "pre-commit": "1.2.2", "proxyquire": "2.1.3", @@ -9632,9 +9632,9 @@ } }, "node_modules/npm-check-updates": { - "version": "17.0.6", - "resolved": "https://registry.npmjs.org/npm-check-updates/-/npm-check-updates-17.0.6.tgz", - "integrity": "sha512-KCiaJH1cfnh/RyzKiDNjNfXgcKFyQs550Uf1OF/Yzb8xO56w+RLpP/OKRUx23/GyP/mLYwEpOO65qjmVdh6j0A==", + "version": "17.1.0", + "resolved": "https://registry.npmjs.org/npm-check-updates/-/npm-check-updates-17.1.0.tgz", + "integrity": "sha512-RcohCA/tdpxyPllBlYDkqGXFJQgTuEt0f2oPSL9s05pZ3hxYdleaUtvEcSxKl0XAg3ncBhVgLAxhXSjoryUU5Q==", "dev": true, "bin": { "ncu": "build/cli.js", diff --git a/package.json b/package.json index f6e938b20..c8e17f6e8 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@mojaloop/central-ledger", - "version": "17.8.0-snapshot.7", + "version": "17.8.0-snapshot.9", "description": "Central ledger hosted by a scheme to record and settle transfers", "license": "Apache-2.0", "author": "ModusBox", @@ -133,7 +133,7 @@ "jsdoc": "4.0.3", "jsonpath": "1.1.1", "nodemon": "3.1.4", - "npm-check-updates": "17.0.6", + "npm-check-updates": "17.1.0", "nyc": "17.0.0", "pre-commit": "1.2.2", "proxyquire": "2.1.3", diff --git a/src/domain/fx/cyril.js b/src/domain/fx/cyril.js index f176bb9a3..956328a43 100644 --- a/src/domain/fx/cyril.js +++ b/src/domain/fx/cyril.js @@ -306,7 +306,7 @@ const processFulfilMessage = async (transferId, payload, transfer) => { let sendingFxpRecord = null let receivingFxpRecord = null for (const watchListRecord of watchListRecords) { - const fxTransferRecord = await fxTransfer.getAllDetailsByCommitRequestId(watchListRecord.commitRequestId) + const fxTransferRecord = await fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer(watchListRecord.commitRequestId) // Original Plan: If the reservation is against the FXP, then this is a conversion at the creditor. Mark FXP as receiving FXP // The above condition is not required as we are setting the fxTransferType in the watchList beforehand if (watchListRecord.fxTransferTypeId === Enum.Fx.FxTransferType.PAYEE_CONVERSION) { diff --git a/src/domain/position/abort.js b/src/domain/position/abort.js index e5edc8c6c..bb1358485 100644 --- a/src/domain/position/abort.js +++ b/src/domain/position/abort.js @@ -12,19 +12,24 @@ const Logger = require('@mojaloop/central-services-logger') * @description This is the domain function to process a bin of abort / fx-abort messages of a single participant account. * * @param {array} abortBins - an array containing abort / fx-abort action bins - * @param {number} accumulatedPositionValue - value of position accumulated so far from previous bin processing - * @param {number} accumulatedPositionReservedValue - value of position reserved accumulated so far, not used but kept for consistency - * @param {object} accumulatedTransferStates - object with transfer id keys and transfer state id values. Used to check if transfer is in correct state for processing. Clone and update states for output. - * @param {object} transferInfoList - object with transfer id keys and transfer info values. Used to pass transfer info to domain function. + * @param {object} options + * @param {number} accumulatedPositionValue - value of position accumulated so far from previous bin processing + * @param {number} accumulatedPositionReservedValue - value of position reserved accumulated so far, not used but kept for consistency + * @param {object} accumulatedTransferStates - object with transfer id keys and transfer state id values. Used to check if transfer is in correct state for processing. Clone and update states for output. + * @param {object} transferInfoList - object with transfer id keys and transfer info values. Used to pass transfer info to domain function. + * @param {boolean} changePositions - whether to change positions or not * @returns {object} - Returns an object containing accumulatedPositionValue, accumulatedPositionReservedValue, accumulatedTransferStateChanges, accumulatedTransferStates, resultMessages, limitAlarms or throws an error if failed */ const processPositionAbortBin = async ( abortBins, - accumulatedPositionValue, - accumulatedPositionReservedValue, - accumulatedTransferStates, - accumulatedFxTransferStates, - isFx + { + accumulatedPositionValue, + accumulatedPositionReservedValue, + accumulatedTransferStates, + accumulatedFxTransferStates, + isFx, + changePositions = true + } ) => { const transferStateChanges = [] const participantPositionChanges = [] @@ -108,13 +113,13 @@ const processPositionAbortBin = async ( } return { - accumulatedPositionValue: runningPosition.toNumber(), + accumulatedPositionValue: changePositions ? runningPosition.toNumber() : accumulatedPositionValue, accumulatedTransferStates: accumulatedTransferStatesCopy, // finalized transfer state after fulfil processing accumulatedPositionReservedValue, // not used but kept for consistency accumulatedTransferStateChanges: transferStateChanges, // transfer state changes to be persisted in order accumulatedFxTransferStates: accumulatedFxTransferStatesCopy, // finalized fx transfer state after fulfil processing accumulatedFxTransferStateChanges: fxTransferStateChanges, // fx transfer state changes to be persisted in order - accumulatedPositionChanges: participantPositionChanges, // participant position changes to be persisted in order + accumulatedPositionChanges: changePositions ? participantPositionChanges : [], // participant position changes to be persisted in order notifyMessages: resultMessages, // array of objects containing bin item and result message. {binItem, message} followupMessages // array of objects containing bin item, message key and followup message. {binItem, messageKey, message} } diff --git a/src/domain/position/binProcessor.js b/src/domain/position/binProcessor.js index ac24fc422..97e013075 100644 --- a/src/domain/position/binProcessor.js +++ b/src/domain/position/binProcessor.js @@ -24,7 +24,6 @@ * INFITX - Vijay Kumar Guthi - - Steven Oderayi -------------- ******/ @@ -70,7 +69,7 @@ const processBins = async (bins, trx) => { // Pre fetch latest fxTransferStates for all the commitRequestIds in the account-bin const latestFxTransferStates = await _fetchLatestFxTransferStates(trx, commitRequestIdList) - const accountIds = Object.keys(bins) + const accountIds = [...Object.keys(bins).filter(accountId => accountId !== '0')] // Get all participantIdMap for the accountIds const participantCurrencyIds = await _getParticipantCurrencyIds(trx, accountIds) @@ -79,7 +78,7 @@ const processBins = async (bins, trx) => { const allSettlementModels = await SettlementModelCached.getAll() // Construct objects participantIdMap, accountIdMap and currencyIdMap - const { settlementCurrencyIds, accountIdMap, currencyIdMap } = await _constructRequiredMaps(participantCurrencyIds, allSettlementModels, trx) + const { settlementCurrencyIds, accountIdMap } = await _constructRequiredMaps(participantCurrencyIds, allSettlementModels, trx) // Pre fetch all position account balances for the account-bin and acquire lock on position const positions = await BatchPositionModel.getPositionsByAccountIdsForUpdate(trx, [ @@ -134,42 +133,58 @@ const processBins = async (bins, trx) => { Logger.isErrorEnabled && Logger.error(`Only ${allowedActions.join()} are allowed in a batch`) } - const settlementParticipantPosition = positions[accountIdMap[accountID].settlementCurrencyId].value - const settlementModel = currencyIdMap[accountIdMap[accountID].currencyId].settlementModel + let settlementParticipantPosition = 0 + let participantLimit = null - // Story #3657: The following SQL query/lookup can be optimized for performance - const participantLimit = await participantFacade.getParticipantLimitByParticipantCurrencyLimit( - accountIdMap[accountID].participantId, - accountIdMap[accountID].currencyId, - Enum.Accounts.LedgerAccountType.POSITION, - Enum.Accounts.ParticipantLimitType.NET_DEBIT_CAP - ) // Initialize accumulated values // These values will be passed across various actions in the bin - let accumulatedPositionValue = positions[accountID].value - let accumulatedPositionReservedValue = positions[accountID].reservedValue + let accumulatedPositionValue = 0 + let accumulatedPositionReservedValue = 0 let accumulatedTransferStates = latestTransferStates let accumulatedFxTransferStates = latestFxTransferStates let accumulatedTransferStateChanges = [] let accumulatedFxTransferStateChanges = [] let accumulatedPositionChanges = [] + let changePositions = false + + if (accountID !== '0') { + settlementParticipantPosition = positions[accountIdMap[accountID].settlementCurrencyId].value + + // Story #3657: The following SQL query/lookup can be optimized for performance + participantLimit = await participantFacade.getParticipantLimitByParticipantCurrencyLimit( + accountIdMap[accountID].participantId, + accountIdMap[accountID].currencyId, + Enum.Accounts.LedgerAccountType.POSITION, + Enum.Accounts.ParticipantLimitType.NET_DEBIT_CAP + ) + + accumulatedPositionValue = positions[accountID].value + accumulatedPositionReservedValue = positions[accountID].reservedValue + + changePositions = true + } // ========== FX_FULFIL ========== // If fulfil action found then call processPositionPrepareBin function // We don't need to change the position for FX transfers. All the position changes happen when actual transfer is done const fxFulfilActionResult = await PositionFxFulfilDomain.processPositionFxFulfilBin( accountBin[Enum.Events.Event.Action.FX_RESERVE], - accumulatedFxTransferStates + { + accumulatedFxTransferStates + } ) // ========== FX_TIMEOUT ========== // If fx-timeout-reserved action found then call processPositionTimeoutReserveBin function const fxTimeoutReservedActionResult = await PositionFxTimeoutReservedDomain.processPositionFxTimeoutReservedBin( accountBin[Enum.Events.Event.Action.FX_TIMEOUT_RESERVED], - accumulatedPositionValue, - accumulatedPositionReservedValue, - accumulatedFxTransferStates, - fetchedReservedPositionChangesByCommitRequestIds + { + accumulatedPositionValue, + accumulatedPositionReservedValue, + accumulatedFxTransferStates, + fetchedReservedPositionChangesByCommitRequestIds, + changePositions + } ) // Update accumulated values @@ -191,12 +206,15 @@ const processBins = async (bins, trx) => { // If fulfil action found then call processPositionPrepareBin function const fulfilActionResult = await PositionFulfilDomain.processPositionFulfilBin( [accountBin.commit, accountBin.reserve], - accumulatedPositionValue, - accumulatedPositionReservedValue, - accumulatedTransferStates, - accumulatedFxTransferStates, - latestTransferInfoByTransferId, - reservedActionTransfers + { + accumulatedPositionValue, + accumulatedPositionReservedValue, + accumulatedTransferStates, + accumulatedFxTransferStates, + transferInfoList: latestTransferInfoByTransferId, + reservedActionTransfers, + changePositions + } ) // Update accumulated values @@ -218,11 +236,14 @@ const processBins = async (bins, trx) => { ...(accountBin[Enum.Events.Event.Action.ABORT] || []), ...(accountBin[Enum.Events.Event.Action.ABORT_VALIDATION] || []) ], - accumulatedPositionValue, - accumulatedPositionReservedValue, - accumulatedTransferStates, - accumulatedFxTransferStates, - false + { + accumulatedPositionValue, + accumulatedPositionReservedValue, + accumulatedTransferStates, + accumulatedFxTransferStates, + isFx: false, + changePositions + } ) // Update accumulated values @@ -244,11 +265,14 @@ const processBins = async (bins, trx) => { ...(accountBin[Enum.Events.Event.Action.FX_ABORT] || []), ...(accountBin[Enum.Events.Event.Action.FX_ABORT_VALIDATION] || []) ], - accumulatedPositionValue, - accumulatedPositionReservedValue, - accumulatedTransferStates, - accumulatedFxTransferStates, - true + { + accumulatedPositionValue, + accumulatedPositionReservedValue, + accumulatedTransferStates, + accumulatedFxTransferStates, + isFx: true, + changePositions + } ) // Update accumulated values @@ -267,10 +291,13 @@ const processBins = async (bins, trx) => { // If timeout-reserved action found then call processPositionTimeoutReserveBin function const timeoutReservedActionResult = await PositionTimeoutReservedDomain.processPositionTimeoutReservedBin( accountBin[Enum.Events.Event.Action.TIMEOUT_RESERVED], - accumulatedPositionValue, - accumulatedPositionReservedValue, - accumulatedTransferStates, - latestTransferInfoByTransferId + { + accumulatedPositionValue, + accumulatedPositionReservedValue, + accumulatedTransferStates, + transferInfoList: latestTransferInfoByTransferId, + changePositions + } ) // Update accumulated values @@ -286,12 +313,14 @@ const processBins = async (bins, trx) => { // If prepare action found then call processPositionPrepareBin function const prepareActionResult = await PositionPrepareDomain.processPositionPrepareBin( accountBin.prepare, - accumulatedPositionValue, - accumulatedPositionReservedValue, - accumulatedTransferStates, - settlementParticipantPosition, - settlementModel, - participantLimit + { + accumulatedPositionValue, + accumulatedPositionReservedValue, + accumulatedTransferStates, + settlementParticipantPosition, + participantLimit, + changePositions + } ) // Update accumulated values @@ -307,11 +336,14 @@ const processBins = async (bins, trx) => { // If fx-prepare action found then call processPositionFxPrepareBin function const fxPrepareActionResult = await PositionFxPrepareDomain.processFxPositionPrepareBin( accountBin[Enum.Events.Event.Action.FX_PREPARE], - accumulatedPositionValue, - accumulatedPositionReservedValue, - accumulatedFxTransferStates, - settlementParticipantPosition, - participantLimit + { + accumulatedPositionValue, + accumulatedPositionReservedValue, + accumulatedFxTransferStates, + settlementParticipantPosition, + participantLimit, + changePositions + } ) // Update accumulated values @@ -325,8 +357,10 @@ const processBins = async (bins, trx) => { // ========== CONSOLIDATION ========== - // Update accumulated position values by calling a facade function - await BatchPositionModel.updateParticipantPosition(trx, positions[accountID].participantPositionId, accumulatedPositionValue, accumulatedPositionReservedValue) + if (changePositions) { + // Update accumulated position values by calling a facade function + await BatchPositionModel.updateParticipantPosition(trx, positions[accountID].participantPositionId, accumulatedPositionValue, accumulatedPositionReservedValue) + } // Bulk insert accumulated transferStateChanges by calling a facade function await BatchPositionModel.bulkInsertTransferStateChanges(trx, accumulatedTransferStateChanges) @@ -337,20 +371,24 @@ const processBins = async (bins, trx) => { const fetchedTransferStateChanges = await BatchPositionModel.getLatestTransferStateChangesByTransferIdList(trx, accumulatedTransferStateChanges.map(item => item.transferId)) // Bulk get the fxTransferStateChangeIds for commitRequestId using select whereIn const fetchedFxTransferStateChanges = await BatchPositionModel.getLatestFxTransferStateChangesByCommitRequestIdList(trx, accumulatedFxTransferStateChanges.map(item => item.commitRequestId)) - // Mutate accumulated positionChanges with transferStateChangeIds and fxTransferStateChangeIds - for (const positionChange of accumulatedPositionChanges) { - if (positionChange.transferId) { - positionChange.transferStateChangeId = fetchedTransferStateChanges[positionChange.transferId].transferStateChangeId - delete positionChange.transferId - } else if (positionChange.commitRequestId) { - positionChange.fxTransferStateChangeId = fetchedFxTransferStateChanges[positionChange.commitRequestId].fxTransferStateChangeId - delete positionChange.commitRequestId + + if (changePositions) { + // Mutate accumulated positionChanges with transferStateChangeIds and fxTransferStateChangeIds + for (const positionChange of accumulatedPositionChanges) { + if (positionChange.transferId) { + positionChange.transferStateChangeId = fetchedTransferStateChanges[positionChange.transferId].transferStateChangeId + delete positionChange.transferId + } else if (positionChange.commitRequestId) { + positionChange.fxTransferStateChangeId = fetchedFxTransferStateChanges[positionChange.commitRequestId].fxTransferStateChangeId + delete positionChange.commitRequestId + } + positionChange.participantPositionId = positions[accountID].participantPositionId + positionChange.participantCurrencyId = accountID } - positionChange.participantPositionId = positions[accountID].participantPositionId - positionChange.participantCurrencyId = accountID + + // Bulk insert accumulated positionChanges by calling a facade function + await BatchPositionModel.bulkInsertParticipantPositionChanges(trx, accumulatedPositionChanges) } - // Bulk insert accumulated positionChanges by calling a facade function - await BatchPositionModel.bulkInsertParticipantPositionChanges(trx, accumulatedPositionChanges) limitAlarms = limitAlarms.concat(prepareActionResult.limitAlarms) } diff --git a/src/domain/position/fulfil.js b/src/domain/position/fulfil.js index f8d0d82c5..4d19f0627 100644 --- a/src/domain/position/fulfil.js +++ b/src/domain/position/fulfil.js @@ -13,20 +13,25 @@ const TransferObjectTransform = require('../../domain/transfer/transform') * @description This is the domain function to process a bin of position-fulfil messages of a single participant account. * * @param {array} commitReserveFulfilBins - an array containing commit and reserve action bins - * @param {number} accumulatedPositionValue - value of position accumulated so far from previous bin processing - * @param {number} accumulatedPositionReservedValue - value of position reserved accumulated so far, not used but kept for consistency - * @param {object} accumulatedTransferStates - object with transfer id keys and transfer state id values. Used to check if transfer is in correct state for processing. Clone and update states for output. - * @param {object} transferInfoList - object with transfer id keys and transfer info values. Used to pass transfer info to domain function. + * @param {object} options + * @param {number} accumulatedPositionValue - value of position accumulated so far from previous bin processing + * @param {number} accumulatedPositionReservedValue - value of position reserved accumulated so far, not used but kept for consistency + * @param {object} accumulatedTransferStates - object with transfer id keys and transfer state id values. Used to check if transfer is in correct state for processing. Clone and update states for output. + * @param {object} transferInfoList - object with transfer id keys and transfer info values. Used to pass transfer info to domain function. + * @param {boolean} changePositions - whether to change positions or not * @returns {object} - Returns an object containing accumulatedPositionValue, accumulatedPositionReservedValue, accumulatedTransferStateChanges, accumulatedTransferStates, resultMessages, limitAlarms or throws an error if failed */ const processPositionFulfilBin = async ( commitReserveFulfilBins, - accumulatedPositionValue, - accumulatedPositionReservedValue, - accumulatedTransferStates, - accumulatedFxTransferStates, - transferInfoList, - reservedActionTransfers + { + accumulatedPositionValue, + accumulatedPositionReservedValue, + accumulatedTransferStates, + accumulatedFxTransferStates, + transferInfoList, + reservedActionTransfers, + changePositions = true + } ) => { const transferStateChanges = [] const fxTransferStateChanges = [] @@ -112,13 +117,13 @@ const processPositionFulfilBin = async ( } return { - accumulatedPositionValue: runningPosition.toNumber(), + accumulatedPositionValue: changePositions ? runningPosition.toNumber() : accumulatedPositionValue, accumulatedTransferStates: accumulatedTransferStatesCopy, // finalized transfer state after fulfil processing accumulatedFxTransferStates: accumulatedFxTransferStatesCopy, // finalized transfer state after fx fulfil processing accumulatedPositionReservedValue, // not used but kept for consistency accumulatedTransferStateChanges: transferStateChanges, // transfer state changes to be persisted in order accumulatedFxTransferStateChanges: fxTransferStateChanges, // fx-transfer state changes to be persisted in order - accumulatedPositionChanges: participantPositionChanges, // participant position changes to be persisted in order + accumulatedPositionChanges: changePositions ? participantPositionChanges : [], // participant position changes to be persisted in order notifyMessages: resultMessages, // array of objects containing bin item and result message. {binItem, message} followupMessages // array of objects containing bin item, message key and followup message. {binItem, messageKey, message} } diff --git a/src/domain/position/fx-fulfil.js b/src/domain/position/fx-fulfil.js index 188dddda7..6c08a4fdf 100644 --- a/src/domain/position/fx-fulfil.js +++ b/src/domain/position/fx-fulfil.js @@ -11,13 +11,15 @@ const Logger = require('@mojaloop/central-services-logger') * @description This is the domain function to process a bin of position-fx-fulfil messages of a single participant account. * * @param {array} binItems - an array of objects that contain a position fx reserve message and its span. {message, span} - * @param {object} accumulatedFxTransferStates - object with fx transfer id keys and transfer state id values. Used to check if transfer is in correct state for processing. Clone and update states for output. - * @param {object} transferInfoList - object with transfer id keys and transfer info values. Used to pass transfer info to domain function. + * @param {object} options + * @param {object} accumulatedFxTransferStates - object with fx transfer id keys and transfer state id values. Used to check if transfer is in correct state for processing. Clone and update states for output. * @returns {object} - Returns an object containing accumulatedFxTransferStateChanges, accumulatedFxTransferStates, resultMessages, limitAlarms or throws an error if failed */ const processPositionFxFulfilBin = async ( binItems, - accumulatedFxTransferStates + { + accumulatedFxTransferStates + } ) => { const fxTransferStateChanges = [] const resultMessages = [] diff --git a/src/domain/position/fx-prepare.js b/src/domain/position/fx-prepare.js index f35d0b876..098730db0 100644 --- a/src/domain/position/fx-prepare.js +++ b/src/domain/position/fx-prepare.js @@ -11,21 +11,26 @@ const Logger = require('@mojaloop/central-services-logger') * @async * @description This is the domain function to process a bin of position-prepare messages of a single participant account. * - * @param {array} binItems - an array of objects that contain a position prepare message and its span. {message, span} - * @param {number} accumulatedPositionValue - value of position accumulated so far from previous bin processing - * @param {number} accumulatedPositionReservedValue - value of position reserved accumulated so far, not used but kept for consistency - * @param {object} accumulatedFxTransferStates - object with fx commit request id keys and fx transfer state id values. Used to check if fx transfer is in correct state for processing. Clone and update states for output. - * @param {number} settlementParticipantPosition - position value of the participants settlement account - * @param {object} participantLimit - participant limit object for the currency + * @param {array} binItems - an array of objects that contain a position prepare message and its span. {message, decodedPayload, span} + * @param {object} options + * @param {number} accumulatedPositionValue - value of position accumulated so far from previous bin processing + * @param {number} accumulatedPositionReservedValue - value of position reserved accumulated so far, not used but kept for consistency + * @param {object} accumulatedFxTransferStates - object with fx commit request id keys and fx transfer state id values. Used to check if fx transfer is in correct state for processing. Clone and update states for output. + * @param {number} settlementParticipantPosition - position value of the participants settlement account + * @param {object} participantLimit - participant limit object for the currency + * @param {boolean} changePositions - whether to change positions or not * @returns {object} - Returns an object containing accumulatedPositionValue, accumulatedPositionReservedValue, accumulatedFxTransferStateChanges, accumulatedTransferStates, resultMessages, limitAlarms or throws an error if failed */ const processFxPositionPrepareBin = async ( binItems, - accumulatedPositionValue, - accumulatedPositionReservedValue, - accumulatedFxTransferStates, - settlementParticipantPosition, - participantLimit + { + accumulatedPositionValue, + accumulatedPositionReservedValue, + accumulatedFxTransferStates, + settlementParticipantPosition, + participantLimit, + changePositions = true + } ) => { const fxTransferStateChanges = [] const participantPositionChanges = [] @@ -34,14 +39,20 @@ const processFxPositionPrepareBin = async ( const accumulatedFxTransferStatesCopy = Object.assign({}, accumulatedFxTransferStates) let currentPosition = new MLNumber(accumulatedPositionValue) - const reservedPosition = new MLNumber(accumulatedPositionReservedValue) - const effectivePosition = new MLNumber(currentPosition.add(reservedPosition).toFixed(Config.AMOUNT.SCALE)) - const liquidityCover = new MLNumber(settlementParticipantPosition).multiply(-1) - const payerLimit = new MLNumber(participantLimit.value) - let availablePositionBasedOnLiquidityCover = new MLNumber(liquidityCover.subtract(effectivePosition).toFixed(Config.AMOUNT.SCALE)) - Logger.isInfoEnabled && Logger.info(`processFxPositionPrepareBin::availablePositionBasedOnLiquidityCover: ${availablePositionBasedOnLiquidityCover}`) - let availablePositionBasedOnPayerLimit = new MLNumber(payerLimit.subtract(effectivePosition).toFixed(Config.AMOUNT.SCALE)) - Logger.isDebugEnabled && Logger.debug(`processFxPositionPrepareBin::availablePositionBasedOnPayerLimit: ${availablePositionBasedOnPayerLimit}`) + let liquidityCover = 0 + let availablePositionBasedOnLiquidityCover = 0 + let availablePositionBasedOnPayerLimit = 0 + + if (changePositions) { + const reservedPosition = new MLNumber(accumulatedPositionReservedValue) + const effectivePosition = new MLNumber(currentPosition.add(reservedPosition).toFixed(Config.AMOUNT.SCALE)) + const payerLimit = new MLNumber(participantLimit.value) + liquidityCover = new MLNumber(settlementParticipantPosition).multiply(-1) + availablePositionBasedOnLiquidityCover = new MLNumber(liquidityCover.subtract(effectivePosition).toFixed(Config.AMOUNT.SCALE)) + Logger.isInfoEnabled && Logger.info(`processFxPositionPrepareBin::availablePositionBasedOnLiquidityCover: ${availablePositionBasedOnLiquidityCover}`) + availablePositionBasedOnPayerLimit = new MLNumber(payerLimit.subtract(effectivePosition).toFixed(Config.AMOUNT.SCALE)) + Logger.isDebugEnabled && Logger.debug(`processFxPositionPrepareBin::availablePositionBasedOnPayerLimit: ${availablePositionBasedOnPayerLimit}`) + } if (binItems && binItems.length > 0) { for (const binItem of binItems) { @@ -99,7 +110,7 @@ const processFxPositionPrepareBin = async ( binItem.result = { success: false } // Check if payer has insufficient liquidity, produce an error message and abort transfer - } else if (availablePositionBasedOnLiquidityCover.toNumber() < transferAmount) { + } else if (changePositions && availablePositionBasedOnLiquidityCover.toNumber() < transferAmount) { transferStateId = Enum.Transfers.TransferInternalState.ABORTED_REJECTED reason = ErrorHandler.Enums.FSPIOPErrorCodes.PAYER_FSP_INSUFFICIENT_LIQUIDITY.message @@ -141,7 +152,7 @@ const processFxPositionPrepareBin = async ( binItem.result = { success: false } // Check if payer has surpassed their limit, produce an error message and abort transfer - } else if (availablePositionBasedOnPayerLimit.toNumber() < transferAmount) { + } else if (changePositions && availablePositionBasedOnPayerLimit.toNumber() < transferAmount) { transferStateId = Enum.Transfers.TransferInternalState.ABORTED_REJECTED reason = ErrorHandler.Enums.FSPIOPErrorCodes.PAYER_LIMIT_ERROR.message @@ -185,9 +196,20 @@ const processFxPositionPrepareBin = async ( // Payer has sufficient liquidity and limit } else { transferStateId = Enum.Transfers.TransferInternalState.RESERVED - currentPosition = currentPosition.add(transferAmount) - availablePositionBasedOnLiquidityCover = availablePositionBasedOnLiquidityCover.add(transferAmount) - availablePositionBasedOnPayerLimit = availablePositionBasedOnPayerLimit.add(transferAmount) + + if (changePositions) { + currentPosition = currentPosition.add(transferAmount) + availablePositionBasedOnLiquidityCover = availablePositionBasedOnLiquidityCover.add(transferAmount) + availablePositionBasedOnPayerLimit = availablePositionBasedOnPayerLimit.add(transferAmount) + const participantPositionChange = { + commitRequestId: fxTransfer.commitRequestId, // Need to delete this in bin processor while updating fxTransferStateChangeId + fxTransferStateChangeId: null, // Need to update this in bin processor while executing queries + value: currentPosition.toNumber(), + reservedValue: accumulatedPositionReservedValue + } + participantPositionChanges.push(participantPositionChange) + Logger.isDebugEnabled && Logger.debug(`processFxPositionPrepareBin::participantPositionChange: ${JSON.stringify(participantPositionChange)}`) + } // forward same headers from the prepare message, except the content-length header const headers = { ...binItem.message.value.content.headers } @@ -216,19 +238,18 @@ const processFxPositionPrepareBin = async ( 'application/json' ) - const participantPositionChange = { - commitRequestId: fxTransfer.commitRequestId, // Need to delete this in bin processor while updating fxTransferStateChangeId - fxTransferStateChangeId: null, // Need to update this in bin processor while executing queries - value: currentPosition.toNumber(), - reservedValue: accumulatedPositionReservedValue - } - participantPositionChanges.push(participantPositionChange) - Logger.isDebugEnabled && Logger.debug(`processFxPositionPrepareBin::participantPositionChange: ${JSON.stringify(participantPositionChange)}`) binItem.result = { success: true } } resultMessages.push({ binItem, message: resultMessage }) + if (changePositions) { + Logger.isDebugEnabled && Logger.debug(`processFxPositionPrepareBin::limitAlarm: ${currentPosition.toNumber()} > ${liquidityCover.multiply(participantLimit.thresholdAlarmPercentage)}`) + if (currentPosition.toNumber() > liquidityCover.multiply(participantLimit.thresholdAlarmPercentage).toNumber()) { + limitAlarms.push(participantLimit) + } + } + const fxTransferStateChange = { commitRequestId: fxTransfer.commitRequestId, transferStateId, @@ -237,23 +258,18 @@ const processFxPositionPrepareBin = async ( fxTransferStateChanges.push(fxTransferStateChange) Logger.isDebugEnabled && Logger.debug(`processFxPositionPrepareBin::fxTransferStateChange: ${JSON.stringify(fxTransferStateChange)}`) - Logger.isDebugEnabled && Logger.debug(`processFxPositionPrepareBin::limitAlarm: ${currentPosition.toNumber()} > ${liquidityCover.multiply(participantLimit.thresholdAlarmPercentage)}`) - if (currentPosition.toNumber() > liquidityCover.multiply(participantLimit.thresholdAlarmPercentage).toNumber()) { - limitAlarms.push(participantLimit) - } - accumulatedFxTransferStatesCopy[fxTransfer.commitRequestId] = transferStateId Logger.isDebugEnabled && Logger.debug(`processFxPositionPrepareBin::accumulatedTransferStatesCopy:finalizedTransferState ${JSON.stringify(transferStateId)}`) } } return { - accumulatedPositionValue: currentPosition.toNumber(), + accumulatedPositionValue: changePositions ? currentPosition.toNumber() : accumulatedPositionValue, accumulatedFxTransferStates: accumulatedFxTransferStatesCopy, // finalized transfer state after prepare processing accumulatedPositionReservedValue, // not used but kept for consistency accumulatedFxTransferStateChanges: fxTransferStateChanges, // fx-transfer state changes to be persisted in order limitAlarms, // array of participant limits that have been breached - accumulatedPositionChanges: participantPositionChanges, // participant position changes to be persisted in order + accumulatedPositionChanges: changePositions ? participantPositionChanges : [], // participant position changes to be persisted in order notifyMessages: resultMessages // array of objects containing bin item and result message. {binItem, message} } } diff --git a/src/domain/position/fx-timeout-reserved.js b/src/domain/position/fx-timeout-reserved.js index acfe8b668..ccd5dee3f 100644 --- a/src/domain/position/fx-timeout-reserved.js +++ b/src/domain/position/fx-timeout-reserved.js @@ -12,18 +12,23 @@ const Logger = require('@mojaloop/central-services-logger') * @description This is the domain function to process a bin of timeout-reserved messages of a single participant account. * * @param {array} fxTimeoutReservedBins - an array containing timeout-reserved action bins - * @param {number} accumulatedPositionValue - value of position accumulated so far from previous bin processing - * @param {number} accumulatedPositionReservedValue - value of position reserved accumulated so far, not used but kept for consistency - * @param {object} accumulatedFxTransferStates - object with commitRequest id keys and fxTransfer state id values. Used to check if fxTransfer is in correct state for processing. Clone and update states for output. - * @param {object} transferInfoList - object with transfer id keys and transfer info values. Used to pass transfer info to domain function. + * @param {object} options + * @param {number} accumulatedPositionValue - value of position accumulated so far from previous bin processing + * @param {number} accumulatedPositionReservedValue - value of position reserved accumulated so far, not used but kept for consistency + * @param {object} accumulatedFxTransferStates - object with commitRequest id keys and fxTransfer state id values. Used to check if fxTransfer is in correct state for processing. Clone and update states for output. + * @param {object} transferInfoList - object with transfer id keys and transfer info values. Used to pass transfer info to domain function. + * @param {boolean} changePositions - whether to change positions or not * @returns {object} - Returns an object containing accumulatedPositionValue, accumulatedPositionReservedValue, accumulatedTransferStateChanges, accumulatedFxTransferStates, resultMessages, limitAlarms or throws an error if failed */ const processPositionFxTimeoutReservedBin = async ( fxTimeoutReservedBins, - accumulatedPositionValue, - accumulatedPositionReservedValue, - accumulatedFxTransferStates, - fetchedReservedPositionChangesByCommitRequestIds + { + accumulatedPositionValue, + accumulatedPositionReservedValue, + accumulatedFxTransferStates, + fetchedReservedPositionChangesByCommitRequestIds, + changePositions = true + } ) => { const fxTransferStateChanges = [] const participantPositionChanges = [] @@ -74,11 +79,11 @@ const processPositionFxTimeoutReservedBin = async ( } return { - accumulatedPositionValue: runningPosition.toNumber(), + accumulatedPositionValue: changePositions ? runningPosition.toNumber() : accumulatedPositionValue, accumulatedFxTransferStates: accumulatedFxTransferStatesCopy, // finalized transfer state after fx fulfil processing accumulatedPositionReservedValue, // not used but kept for consistency accumulatedFxTransferStateChanges: fxTransferStateChanges, // fx-transfer state changes to be persisted in order - accumulatedPositionChanges: participantPositionChanges, // participant position changes to be persisted in order + accumulatedPositionChanges: changePositions ? participantPositionChanges : [], // participant position changes to be persisted in order notifyMessages: resultMessages // array of objects containing bin item and result message. {binItem, message} } } diff --git a/src/domain/position/prepare.js b/src/domain/position/prepare.js index 3d23ce80a..55f1f343f 100644 --- a/src/domain/position/prepare.js +++ b/src/domain/position/prepare.js @@ -1,9 +1,9 @@ const { Enum } = require('@mojaloop/central-services-shared') const ErrorHandler = require('@mojaloop/central-services-error-handling') -const Config = require('../../lib/config') const Utility = require('@mojaloop/central-services-shared').Util const MLNumber = require('@mojaloop/ml-number') const Logger = require('@mojaloop/central-services-logger') +const Config = require('../../lib/config') /** * @function processPositionPrepareBin @@ -11,23 +11,27 @@ const Logger = require('@mojaloop/central-services-logger') * @async * @description This is the domain function to process a bin of position-prepare messages of a single participant account. * - * @param {array} binItems - an array of objects that contain a position prepare message and its span. {message, span} - * @param {number} accumulatedPositionValue - value of position accumulated so far from previous bin processing - * @param {number} accumulatedPositionReservedValue - value of position reserved accumulated so far, not used but kept for consistency - * @param {object} accumulatedTransferStates - object with transfer id keys and transfer state id values. Used to check if transfer is in correct state for processing. Clone and update states for output. - * @param {number} settlementParticipantPosition - position value of the participants settlement account - * @param {object} settlementModel - settlement model object for the currency - * @param {object} participantLimit - participant limit object for the currency + * @param {array} binItems - an array of objects that contain a position prepare message and its span. {message, decodedPayload, span} + * @param {object} options + * @param {number} accumulatedPositionValue - value of position accumulated so far from previous bin processing + * @param {number} accumulatedPositionReservedValue - value of position reserved accumulated so far, not used but kept for consistency + * @param {object} accumulatedTransferStates - object with transfer id keys and transfer state id values. Used to check if transfer is in correct state for processing. Clone and update states for output. + * @param {number} settlementParticipantPosition - position value of the participants settlement account + * @param {object} settlementModel - settlement model object for the currency + * @param {object} participantLimit - participant limit object for the currency + * @param {boolean} changePositions - whether to change positions or not * @returns {object} - Returns an object containing accumulatedPositionValue, accumulatedPositionReservedValue, accumulatedTransferStateChanges, accumulatedTransferStates, resultMessages, limitAlarms or throws an error if failed */ const processPositionPrepareBin = async ( binItems, - accumulatedPositionValue, - accumulatedPositionReservedValue, - accumulatedTransferStates, - settlementParticipantPosition, - settlementModel, - participantLimit + { + accumulatedPositionValue, + accumulatedPositionReservedValue, + accumulatedTransferStates, + settlementParticipantPosition, + participantLimit, + changePositions = true + } ) => { const transferStateChanges = [] const participantPositionChanges = [] @@ -36,14 +40,20 @@ const processPositionPrepareBin = async ( const accumulatedTransferStatesCopy = Object.assign({}, accumulatedTransferStates) let currentPosition = new MLNumber(accumulatedPositionValue) - const reservedPosition = new MLNumber(accumulatedPositionReservedValue) - const effectivePosition = new MLNumber(currentPosition.add(reservedPosition).toFixed(Config.AMOUNT.SCALE)) - const liquidityCover = new MLNumber(settlementParticipantPosition).multiply(-1) - const payerLimit = new MLNumber(participantLimit.value) - let availablePositionBasedOnLiquidityCover = new MLNumber(liquidityCover.subtract(effectivePosition).toFixed(Config.AMOUNT.SCALE)) - Logger.isInfoEnabled && Logger.info(`processPositionPrepareBin::availablePositionBasedOnLiquidityCover: ${availablePositionBasedOnLiquidityCover}`) - let availablePositionBasedOnPayerLimit = new MLNumber(payerLimit.subtract(effectivePosition).toFixed(Config.AMOUNT.SCALE)) - Logger.isDebugEnabled && Logger.debug(`processPositionPrepareBin::availablePositionBasedOnPayerLimit: ${availablePositionBasedOnPayerLimit}`) + let liquidityCover = 0 + let availablePositionBasedOnLiquidityCover = 0 + let availablePositionBasedOnPayerLimit = 0 + + if (changePositions) { + const reservedPosition = new MLNumber(accumulatedPositionReservedValue) + const effectivePosition = new MLNumber(currentPosition.add(reservedPosition).toFixed(Config.AMOUNT.SCALE)) + const payerLimit = new MLNumber(participantLimit.value) + liquidityCover = new MLNumber(settlementParticipantPosition).multiply(-1) + availablePositionBasedOnLiquidityCover = new MLNumber(liquidityCover.subtract(effectivePosition).toFixed(Config.AMOUNT.SCALE)) + Logger.isInfoEnabled && Logger.info(`processPositionPrepareBin::availablePositionBasedOnLiquidityCover: ${availablePositionBasedOnLiquidityCover}`) + availablePositionBasedOnPayerLimit = new MLNumber(payerLimit.subtract(effectivePosition).toFixed(Config.AMOUNT.SCALE)) + Logger.isDebugEnabled && Logger.debug(`processPositionPrepareBin::availablePositionBasedOnPayerLimit: ${availablePositionBasedOnPayerLimit}`) + } if (binItems && binItems.length > 0) { for (const binItem of binItems) { @@ -101,7 +111,7 @@ const processPositionPrepareBin = async ( binItem.result = { success: false } // Check if payer has insufficient liquidity, produce an error message and abort transfer - } else if (availablePositionBasedOnLiquidityCover.toNumber() < transferAmount) { + } else if (changePositions && availablePositionBasedOnLiquidityCover.toNumber() < transferAmount) { transferStateId = Enum.Transfers.TransferInternalState.ABORTED_REJECTED reason = ErrorHandler.Enums.FSPIOPErrorCodes.PAYER_FSP_INSUFFICIENT_LIQUIDITY.message @@ -143,7 +153,7 @@ const processPositionPrepareBin = async ( binItem.result = { success: false } // Check if payer has surpassed their limit, produce an error message and abort transfer - } else if (availablePositionBasedOnPayerLimit.toNumber() < transferAmount) { + } else if (changePositions && availablePositionBasedOnPayerLimit.toNumber() < transferAmount) { transferStateId = Enum.Transfers.TransferInternalState.ABORTED_REJECTED reason = ErrorHandler.Enums.FSPIOPErrorCodes.PAYER_LIMIT_ERROR.message @@ -184,12 +194,24 @@ const processPositionPrepareBin = async ( binItem.result = { success: false } - // Payer has sufficient liquidity and limit + // Payer has sufficient liquidity and limit or positions are not being changed } else { transferStateId = Enum.Transfers.TransferState.RESERVED - currentPosition = currentPosition.add(transferAmount) - availablePositionBasedOnLiquidityCover = availablePositionBasedOnLiquidityCover.add(transferAmount) - availablePositionBasedOnPayerLimit = availablePositionBasedOnPayerLimit.add(transferAmount) + if (changePositions) { + currentPosition = currentPosition.add(transferAmount) + + availablePositionBasedOnLiquidityCover = availablePositionBasedOnLiquidityCover.add(transferAmount) + availablePositionBasedOnPayerLimit = availablePositionBasedOnPayerLimit.add(transferAmount) + + const participantPositionChange = { + transferId: transfer.transferId, // Need to delete this in bin processor while updating transferStateChangeId + transferStateChangeId: null, // Need to update this in bin processor while executing queries + value: currentPosition.toNumber(), + reservedValue: accumulatedPositionReservedValue + } + participantPositionChanges.push(participantPositionChange) + Logger.isDebugEnabled && Logger.debug(`processPositionPrepareBin::participantPositionChange: ${JSON.stringify(participantPositionChange)}`) + } // forward same headers from the prepare message, except the content-length header const headers = { ...binItem.message.value.content.headers } @@ -218,19 +240,18 @@ const processPositionPrepareBin = async ( 'application/json' ) - const participantPositionChange = { - transferId: transfer.transferId, // Need to delete this in bin processor while updating transferStateChangeId - transferStateChangeId: null, // Need to update this in bin processor while executing queries - value: currentPosition.toNumber(), - reservedValue: accumulatedPositionReservedValue - } - participantPositionChanges.push(participantPositionChange) - Logger.isDebugEnabled && Logger.debug(`processPositionPrepareBin::participantPositionChange: ${JSON.stringify(participantPositionChange)}`) binItem.result = { success: true } } resultMessages.push({ binItem, message: resultMessage }) + if (changePositions) { + Logger.isDebugEnabled && Logger.debug(`processPositionPrepareBin::limitAlarm: ${currentPosition.toNumber()} > ${liquidityCover.multiply(participantLimit.thresholdAlarmPercentage)}`) + if (currentPosition.toNumber() > liquidityCover.multiply(participantLimit.thresholdAlarmPercentage).toNumber()) { + limitAlarms.push(participantLimit) + } + } + const transferStateChange = { transferId: transfer.transferId, transferStateId, @@ -239,23 +260,18 @@ const processPositionPrepareBin = async ( transferStateChanges.push(transferStateChange) Logger.isDebugEnabled && Logger.debug(`processPositionPrepareBin::transferStateChange: ${JSON.stringify(transferStateChange)}`) - Logger.isDebugEnabled && Logger.debug(`processPositionPrepareBin::limitAlarm: ${currentPosition.toNumber()} > ${liquidityCover.multiply(participantLimit.thresholdAlarmPercentage)}`) - if (currentPosition.toNumber() > liquidityCover.multiply(participantLimit.thresholdAlarmPercentage).toNumber()) { - limitAlarms.push(participantLimit) - } - accumulatedTransferStatesCopy[transfer.transferId] = transferStateId Logger.isDebugEnabled && Logger.debug(`processPositionPrepareBin::accumulatedTransferStatesCopy:finalizedTransferState ${JSON.stringify(transferStateId)}`) } } return { - accumulatedPositionValue: currentPosition.toNumber(), + accumulatedPositionValue: changePositions ? currentPosition.toNumber() : accumulatedPositionValue, accumulatedTransferStates: accumulatedTransferStatesCopy, // finalized transfer state after prepare processing accumulatedPositionReservedValue, // not used but kept for consistency accumulatedTransferStateChanges: transferStateChanges, // transfer state changes to be persisted in order limitAlarms, // array of participant limits that have been breached - accumulatedPositionChanges: participantPositionChanges, // participant position changes to be persisted in order + accumulatedPositionChanges: changePositions ? participantPositionChanges : [], // participant position changes to be persisted in order notifyMessages: resultMessages // array of objects containing bin item and result message. {binItem, message} } } diff --git a/src/domain/position/timeout-reserved.js b/src/domain/position/timeout-reserved.js index e50067135..59844ac94 100644 --- a/src/domain/position/timeout-reserved.js +++ b/src/domain/position/timeout-reserved.js @@ -12,18 +12,23 @@ const Logger = require('@mojaloop/central-services-logger') * @description This is the domain function to process a bin of timeout-reserved messages of a single participant account. * * @param {array} timeoutReservedBins - an array containing timeout-reserved action bins - * @param {number} accumulatedPositionValue - value of position accumulated so far from previous bin processing - * @param {number} accumulatedPositionReservedValue - value of position reserved accumulated so far, not used but kept for consistency - * @param {object} accumulatedTransferStates - object with transfer id keys and transfer state id values. Used to check if transfer is in correct state for processing. Clone and update states for output. - * @param {object} transferInfoList - object with transfer id keys and transfer info values. Used to pass transfer info to domain function. + * @param {object} options + * @param {number} accumulatedPositionValue - value of position accumulated so far from previous bin processing + * @param {number} accumulatedPositionReservedValue - value of position reserved accumulated so far, not used but kept for consistency + * @param {object} accumulatedTransferStates - object with transfer id keys and transfer state id values. Used to check if transfer is in correct state for processing. Clone and update states for output. + * @param {object} transferInfoList - object with transfer id keys and transfer info values. Used to pass transfer info to domain function. + * @param {boolean} changePositions - whether to change positions or not * @returns {object} - Returns an object containing accumulatedPositionValue, accumulatedPositionReservedValue, accumulatedTransferStateChanges, accumulatedTransferStates, resultMessages, limitAlarms or throws an error if failed */ const processPositionTimeoutReservedBin = async ( timeoutReservedBins, - accumulatedPositionValue, - accumulatedPositionReservedValue, - accumulatedTransferStates, - transferInfoList + { + accumulatedPositionValue, + accumulatedPositionReservedValue, + accumulatedTransferStates, + transferInfoList, + changePositions = true + } ) => { const transferStateChanges = [] const participantPositionChanges = [] @@ -74,11 +79,11 @@ const processPositionTimeoutReservedBin = async ( } return { - accumulatedPositionValue: runningPosition.toNumber(), + accumulatedPositionValue: changePositions ? runningPosition.toNumber() : accumulatedPositionValue, accumulatedTransferStates: accumulatedTransferStatesCopy, // finalized transfer state after fulfil processing accumulatedPositionReservedValue, // not used but kept for consistency accumulatedTransferStateChanges: transferStateChanges, // transfer state changes to be persisted in order - accumulatedPositionChanges: participantPositionChanges, // participant position changes to be persisted in order + accumulatedPositionChanges: changePositions ? participantPositionChanges : [], // participant position changes to be persisted in order notifyMessages: resultMessages // array of objects containing bin item and result message. {binItem, message} } } diff --git a/src/handlers/positions/handlerBatch.js b/src/handlers/positions/handlerBatch.js index f45801129..272239434 100644 --- a/src/handlers/positions/handlerBatch.js +++ b/src/handlers/positions/handlerBatch.js @@ -106,16 +106,6 @@ const positions = async (error, messages) => { const accountID = message.key.toString() - /** - * Interscheme accounting rule: - * - If the creditor and debtor are represented by the same proxy, the message key will be 0. - * In such cases, we skip position changes. - */ - if (accountID === '0') { - histTimerEnd({ success: true }) - return span.finish() - } - // Assign message to account-bin by accountID and child action-bin by action // (References to the messages to be stored in bins, no duplication of messages) const action = message.value.metadata.event.action diff --git a/src/lib/cache.js b/src/lib/cache.js index 839ca0a77..d559fc23f 100644 --- a/src/lib/cache.js +++ b/src/lib/cache.js @@ -74,7 +74,7 @@ const initCache = async function () { } const destroyCache = async function () { - catboxMemoryClient.stop() + catboxMemoryClient?.stop() catboxMemoryClient = null } diff --git a/src/lib/proxyCache.js b/src/lib/proxyCache.js index 40f50f357..45e27ee62 100644 --- a/src/lib/proxyCache.js +++ b/src/lib/proxyCache.js @@ -1,21 +1,14 @@ 'use strict' -const { createProxyCache, STORAGE_TYPES } = require('@mojaloop/inter-scheme-proxy-cache-lib') +const { createProxyCache } = require('@mojaloop/inter-scheme-proxy-cache-lib') const { Enum } = require('@mojaloop/central-services-shared') const ParticipantService = require('../../src/domain/participant') const Config = require('./config.js') let proxyCache -const init = async () => { - // enforce lazy connection for redis - const proxyConfig = - Config.PROXY_CACHE_CONFIG.type === STORAGE_TYPES.redis - ? { ...Config.PROXY_CACHE_CONFIG.proxyConfig, lazyConnect: true } - : Config.PROXY_CACHE_CONFIG.proxyConfig - - proxyCache = Object.freeze( - createProxyCache(Config.PROXY_CACHE_CONFIG.type, proxyConfig) - ) +const init = () => { + const { type, proxyConfig } = Config.PROXY_CACHE_CONFIG + proxyCache = createProxyCache(type, proxyConfig) } const connect = async () => { @@ -49,8 +42,8 @@ const getFSPProxy = async (dfspId) => { const checkSameCreditorDebtorProxy = async (debtorDfspId, creditorDfspId) => { const [debtorProxyId, creditorProxyId] = await Promise.all([ - await getCache().lookupProxyByDfspId(debtorDfspId), - await getCache().lookupProxyByDfspId(creditorDfspId) + getCache().lookupProxyByDfspId(debtorDfspId), + getCache().lookupProxyByDfspId(creditorDfspId) ]) return debtorProxyId && creditorProxyId ? debtorProxyId === creditorProxyId : false } diff --git a/src/models/misc/segment.js b/src/models/misc/segment.js index 60250ae5a..8c65002c8 100644 --- a/src/models/misc/segment.js +++ b/src/models/misc/segment.js @@ -26,7 +26,6 @@ const Db = require('../../lib/db') const ErrorHandler = require('@mojaloop/central-services-error-handling') -// const Logger = require('@mojaloop/central-services-logger') const getByParams = async (params) => { try { diff --git a/src/models/transfer/facade.js b/src/models/transfer/facade.js index 7819a1d4b..3427bd056 100644 --- a/src/models/transfer/facade.js +++ b/src/models/transfer/facade.js @@ -425,7 +425,7 @@ const saveTransferPrepared = async (payload, stateReason = null, hasPassedValida const participantCurrency = determiningTransferCheckResult && determiningTransferCheckResult.participantCurrencyValidationList.find(participantCurrencyItem => participantCurrencyItem.participantName === name) if (participantCurrency) { const participantCurrencyRecord = await ParticipantFacade.getByNameAndCurrency(participantCurrency.participantName, participantCurrency.currencyId, Enum.Accounts.LedgerAccountType.POSITION) - participants[name].participantCurrencyId = participantCurrencyRecord.participantCurrencyId + participants[name].participantCurrencyId = participantCurrencyRecord?.participantCurrencyId } if (proxyObligation?.isInitiatingFspProxy) { diff --git a/test/integration-override/handlers/transfers/fxAbort.test.js b/test/integration-override/handlers/transfers/fxAbort.test.js index acc9a30be..a4975c46c 100644 --- a/test/integration-override/handlers/transfers/fxAbort.test.js +++ b/test/integration-override/handlers/transfers/fxAbort.test.js @@ -207,6 +207,19 @@ const prepareFxTestData = async (dataObj) => { } } + const sourceTransferPayload = { + transferId, + payerFsp: payer.participant.name, + payeeFsp: fxp.participant.name, + amount: { + currency: dataObj.sourceAmount.currency, + amount: dataObj.sourceAmount.amount + }, + ilpPacket: 'AYIBgQAAAAAAAASwNGxldmVsb25lLmRmc3AxLm1lci45T2RTOF81MDdqUUZERmZlakgyOVc4bXFmNEpLMHlGTFGCAUBQU0svMS4wCk5vbmNlOiB1SXlweUYzY3pYSXBFdzVVc05TYWh3CkVuY3J5cHRpb246IG5vbmUKUGF5bWVudC1JZDogMTMyMzZhM2ItOGZhOC00MTYzLTg0NDctNGMzZWQzZGE5OGE3CgpDb250ZW50LUxlbmd0aDogMTM1CkNvbnRlbnQtVHlwZTogYXBwbGljYXRpb24vanNvbgpTZW5kZXItSWRlbnRpZmllcjogOTI4MDYzOTEKCiJ7XCJmZWVcIjowLFwidHJhbnNmZXJDb2RlXCI6XCJpbnZvaWNlXCIsXCJkZWJpdE5hbWVcIjpcImFsaWNlIGNvb3BlclwiLFwiY3JlZGl0TmFtZVwiOlwibWVyIGNoYW50XCIsXCJkZWJpdElkZW50aWZpZXJcIjpcIjkyODA2MzkxXCJ9IgA', + condition: 'GRzLaTP7DJ9t4P-a_BA0WA9wzzlsugf00-Tn6kESAfM', + expiration: dataObj.expiration + } + const fulfilPayload = { fulfilment: 'UNlJ98hZTY_dsw0cAqw4i_UN3v4utt7CZFB4yfLbVFA', completedTimestamp: dataObj.now, @@ -289,6 +302,14 @@ const prepareFxTestData = async (dataObj) => { } } + const messageProtocolSourcePrepare = Util.clone(messageProtocolPrepare) + messageProtocolSourcePrepare.to = sourceTransferPayload.payeeFsp + messageProtocolSourcePrepare.content.payload = sourceTransferPayload + messageProtocolSourcePrepare.content.headers = { + ...prepareHeaders, + 'fspiop-destination': fxp.participant.name + } + const messageProtocolFulfil = Util.clone(messageProtocolPrepare) messageProtocolFulfil.id = randomUUID() messageProtocolFulfil.from = transferPayload.payeeFsp @@ -360,6 +381,7 @@ const prepareFxTestData = async (dataObj) => { messageProtocolFulfil, messageProtocolReject, messageProtocolError, + messageProtocolSourcePrepare, topicConfTransferPrepare, topicConfTransferFulfil, topicConfFxTransferPrepare, @@ -455,68 +477,67 @@ Test('Handlers test', async handlersTest => { }) }) - // TODO: This is throwing some error in the prepare handler. Need to investigate and fix it. - // await handlersTest.test('When only tranfer is sent and followed by transfer abort', async abortTest => { - // const td = await prepareFxTestData(testFxData) + await handlersTest.test('When only tranfer is sent and followed by transfer abort', async abortTest => { + const td = await prepareFxTestData(testFxData) - // await abortTest.test('update transfer state to RESERVED by PREPARE request', async (test) => { - // const config = Utility.getKafkaConfig( - // Config.KAFKA_CONFIG, - // Enum.Kafka.Config.PRODUCER, - // TransferEventType.TRANSFER.toUpperCase(), - // TransferEventType.PREPARE.toUpperCase()) - // config.logger = Logger + await abortTest.test('update transfer state to RESERVED by PREPARE request', async (test) => { + const config = Utility.getKafkaConfig( + Config.KAFKA_CONFIG, + Enum.Kafka.Config.PRODUCER, + TransferEventType.TRANSFER.toUpperCase(), + TransferEventType.PREPARE.toUpperCase()) + config.logger = Logger - // const producerResponse = await Producer.produceMessage(td.messageProtocolPrepare, td.topicConfTransferPrepare, config) - // Logger.info(producerResponse) + const producerResponse = await Producer.produceMessage(td.messageProtocolSourcePrepare, td.topicConfTransferPrepare, config) + Logger.info(producerResponse) - // try { - // await wrapWithRetries(async () => { - // const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {} - // if (transfer?.transferState !== TransferState.RESERVED) { - // if (debug) console.log(`retrying in ${retryDelay / 1000}s..`) - // return null - // } - // return transfer - // }, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) - // } catch (err) { - // Logger.error(err) - // test.fail(err.message) - // } + try { + await wrapWithRetries(async () => { + const transfer = await TransferService.getById(td.messageProtocolSourcePrepare.content.payload.transferId) || {} + if (transfer?.transferState !== TransferState.RESERVED) { + if (debug) console.log(`retrying in ${retryDelay / 1000}s..`) + return null + } + return transfer + }, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + } catch (err) { + Logger.error(err) + test.fail(err.message) + } - // test.end() - // }) + test.end() + }) - // await abortTest.test('update transfer state to ABORTED by FULFIL-ABORT callback', async (test) => { - // const config = Utility.getKafkaConfig( - // Config.KAFKA_CONFIG, - // Enum.Kafka.Config.PRODUCER, - // TransferEventType.TRANSFER.toUpperCase(), - // TransferEventType.FULFIL.toUpperCase()) - // config.logger = Logger + await abortTest.test('update transfer state to ABORTED by FULFIL-ABORT callback', async (test) => { + const config = Utility.getKafkaConfig( + Config.KAFKA_CONFIG, + Enum.Kafka.Config.PRODUCER, + TransferEventType.TRANSFER.toUpperCase(), + TransferEventType.FULFIL.toUpperCase()) + config.logger = Logger - // await Producer.produceMessage(td.messageProtocolError, td.topicConfTransferFulfil, config) + await Producer.produceMessage(td.messageProtocolError, td.topicConfTransferFulfil, config) - // // Check for the transfer state to be ABORTED - // try { - // await wrapWithRetries(async () => { - // const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {} - // if (transfer?.transferState !== TransferInternalState.ABORTED_ERROR) { - // if (debug) console.log(`retrying in ${retryDelay / 1000}s..`) - // return null - // } - // return transfer - // }, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) - // } catch (err) { - // Logger.error(err) - // test.fail(err.message) - // } + // Check for the transfer state to be ABORTED + try { + await wrapWithRetries(async () => { + const transfer = await TransferService.getById(td.messageProtocolSourcePrepare.content.payload.transferId) || {} + if (transfer?.transferState !== TransferInternalState.ABORTED_ERROR) { + if (debug) console.log(`retrying in ${retryDelay / 1000}s..`) + return null + } + return transfer + }, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + } catch (err) { + Logger.error(err) + test.fail(err.message) + } - // test.end() - // }) + test.end() + }) - // abortTest.end() - // }) + abortTest.end() + }) await handlersTest.test('When fxTransfer followed by a transfer and transferFulfilAbort are sent', async abortTest => { const td = await prepareFxTestData(testFxData) diff --git a/test/integration-override/handlers/transfers/fxTimeout.test.js b/test/integration-override/handlers/transfers/fxTimeout.test.js index c6add0417..fbee6d783 100644 --- a/test/integration-override/handlers/transfers/fxTimeout.test.js +++ b/test/integration-override/handlers/transfers/fxTimeout.test.js @@ -370,7 +370,6 @@ Test('Handlers test', async handlersTest => { // Set up the testConsumer here await testConsumer.startListening() - // TODO: MIG - Disabling these handlers to test running the CL as a separate service independently. await new Promise(resolve => setTimeout(resolve, rebalanceDelay)) testConsumer.clearEvents() diff --git a/test/integration-override/handlers/transfers/handlers.test.js b/test/integration-override/handlers/transfers/handlers.test.js index ef03c5823..805167ed6 100644 --- a/test/integration-override/handlers/transfers/handlers.test.js +++ b/test/integration-override/handlers/transfers/handlers.test.js @@ -1394,32 +1394,43 @@ Test('Handlers test', async handlersTest => { console.error(err) } - // TODO: It seems there is an issue in position handler. Its not processing the messages with key 0. - // It should change the state of the transfer to RESERVED in the prepare step. - // Until the issue with position handler is resolved. Commenting the following test. - // // Fulfil the transfer - // const fulfilConfig = Utility.getKafkaConfig( - // Config.KAFKA_CONFIG, - // Enum.Kafka.Config.PRODUCER, - // TransferEventType.TRANSFER.toUpperCase(), - // TransferEventType.FULFIL.toUpperCase()) - // fulfilConfig.logger = Logger - - // td.messageProtocolFulfil.content.from = transferPrepareTo - // td.messageProtocolFulfil.content.headers['fspiop-source'] = transferPrepareTo - // testConsumer.clearEvents() - // await Producer.produceMessage(td.messageProtocolFulfil, td.topicConfTransferFulfil, fulfilConfig) - // try { - // const positionFulfil1 = await wrapWithRetries(() => testConsumer.getEventsForFilter({ - // topicFilter: 'topic-transfer-position-batch', - // action: 'commit', - // keyFilter: td.proxyAR.participantCurrencyId.toString() - // }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) - // test.ok(positionFulfil1[0], 'Position fulfil message with key found') - // } catch (err) { - // test.notOk('Error should not be thrown') - // console.error(err) - // } + try { + await wrapWithRetries(async () => { + const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId) || {} + if (transfer?.transferState !== TransferInternalState.RESERVED) { + if (debug) console.log(`retrying in ${retryDelay / 1000}s..`) + return null + } + return transfer + }, wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + } catch (err) { + Logger.error(err) + test.fail(err.message) + } + + // Fulfil the transfer + const fulfilConfig = Utility.getKafkaConfig( + Config.KAFKA_CONFIG, + Enum.Kafka.Config.PRODUCER, + TransferEventType.TRANSFER.toUpperCase(), + TransferEventType.FULFIL.toUpperCase()) + fulfilConfig.logger = Logger + + td.messageProtocolFulfil.content.from = transferPrepareTo + td.messageProtocolFulfil.content.headers['fspiop-source'] = transferPrepareTo + testConsumer.clearEvents() + await Producer.produceMessage(td.messageProtocolFulfil, td.topicConfTransferFulfil, fulfilConfig) + try { + const positionFulfil1 = await wrapWithRetries(() => testConsumer.getEventsForFilter({ + topicFilter: 'topic-transfer-position-batch', + action: 'commit', + keyFilter: td.proxyAR.participantCurrencyId.toString() + }), wrapWithRetriesConf.remainingRetries, wrapWithRetriesConf.timeout) + test.ok(positionFulfil1[0], 'Position fulfil message with key found') + } catch (err) { + test.notOk('Error should not be thrown') + console.error(err) + } testConsumer.clearEvents() test.end() diff --git a/test/unit/domain/fx/cyril.test.js b/test/unit/domain/fx/cyril.test.js index cd9ca013d..1fcafdad6 100644 --- a/test/unit/domain/fx/cyril.test.js +++ b/test/unit/domain/fx/cyril.test.js @@ -307,7 +307,7 @@ Test('Cyril', cyrilTest => { createdDate: new Date() }] )) - fxTransfer.getAllDetailsByCommitRequestId.returns(Promise.resolve( + fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer.returns(Promise.resolve( { initiatingFspParticipantId: 2, targetAmount: fxPayload.targetAmount.amount, @@ -327,7 +327,7 @@ Test('Cyril', cyrilTest => { ProxyCache.getProxyParticipantAccountDetails.returns(Promise.resolve(defaultGetProxyParticipantAccountDetailsResponse)) const result = await Cyril.processFulfilMessage(payload.transferId, payload, payload) test.ok(watchList.getItemsInWatchListByDeterminingTransferId.calledWith(payload.transferId)) - test.ok(fxTransfer.getAllDetailsByCommitRequestId.calledWith(fxPayload.commitRequestId)) + test.ok(fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer.calledWith(fxPayload.commitRequestId)) test.ok(ProxyCache.getProxyParticipantAccountDetails.calledWith( 'dfsp2', fxPayload.targetAmount.currency @@ -369,7 +369,7 @@ Test('Cyril', cyrilTest => { createdDate: new Date() }] )) - fxTransfer.getAllDetailsByCommitRequestId.returns(Promise.resolve( + fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer.returns(Promise.resolve( { initiatingFspParticipantId: 1, targetAmount: fxPayload.targetAmount.amount, @@ -389,7 +389,7 @@ Test('Cyril', cyrilTest => { ProxyCache.getProxyParticipantAccountDetails.returns(Promise.resolve(defaultGetProxyParticipantAccountDetailsResponse)) const result = await Cyril.processFulfilMessage(payload.transferId, payload, payload) test.ok(watchList.getItemsInWatchListByDeterminingTransferId.calledWith(payload.transferId)) - test.ok(fxTransfer.getAllDetailsByCommitRequestId.calledWith(fxPayload.commitRequestId)) + test.ok(fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer.calledWith(fxPayload.commitRequestId)) test.deepEqual(result, { isFx: true, positionChanges: [{ @@ -435,7 +435,7 @@ Test('Cyril', cyrilTest => { } ] )) - fxTransfer.getAllDetailsByCommitRequestId.returns(Promise.resolve( + fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer.returns(Promise.resolve( { initiatingFspParticipantId: 1, targetAmount: fxPayload.targetAmount.amount, @@ -455,7 +455,7 @@ Test('Cyril', cyrilTest => { ProxyCache.getProxyParticipantAccountDetails.returns(Promise.resolve(defaultGetProxyParticipantAccountDetailsResponse)) const result = await Cyril.processFulfilMessage(payload.transferId, payload, payload) test.ok(watchList.getItemsInWatchListByDeterminingTransferId.calledWith(payload.transferId)) - test.ok(fxTransfer.getAllDetailsByCommitRequestId.calledWith(fxPayload.commitRequestId)) + test.ok(fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer.calledWith(fxPayload.commitRequestId)) test.deepEqual(result, { isFx: true, positionChanges: [ @@ -500,7 +500,7 @@ Test('Cyril', cyrilTest => { createdDate: new Date() }] )) - fxTransfer.getAllDetailsByCommitRequestId.returns(Promise.resolve( + fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer.returns(Promise.resolve( { initiatingFspParticipantId: 2, targetAmount: fxPayload.targetAmount.amount, @@ -520,7 +520,7 @@ Test('Cyril', cyrilTest => { ProxyCache.getProxyParticipantAccountDetails.returns(Promise.resolve({ inScheme: false, participantCurrencyId: null })) const result = await Cyril.processFulfilMessage(payload.transferId, payload, payload) test.ok(watchList.getItemsInWatchListByDeterminingTransferId.calledWith(payload.transferId)) - test.ok(fxTransfer.getAllDetailsByCommitRequestId.calledWith(fxPayload.commitRequestId)) + test.ok(fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer.calledWith(fxPayload.commitRequestId)) test.ok(ProxyCache.getProxyParticipantAccountDetails.calledWith( 'dfsp2', fxPayload.targetAmount.currency @@ -550,7 +550,7 @@ Test('Cyril', cyrilTest => { createdDate: new Date() }] )) - fxTransfer.getAllDetailsByCommitRequestId.returns(Promise.resolve( + fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer.returns(Promise.resolve( { initiatingFspParticipantId: 2, targetAmount: fxPayload.targetAmount.amount, @@ -572,7 +572,7 @@ Test('Cyril', cyrilTest => { ProxyCache.getProxyParticipantAccountDetails.onCall(2).returns(Promise.resolve({ inScheme: false, participantCurrencyId: 345 })) // FXP Target Currency const result = await Cyril.processFulfilMessage(payload.transferId, payload, payload) test.ok(watchList.getItemsInWatchListByDeterminingTransferId.calledWith(payload.transferId)) - test.ok(fxTransfer.getAllDetailsByCommitRequestId.calledWith(fxPayload.commitRequestId)) + test.ok(fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer.calledWith(fxPayload.commitRequestId)) test.ok(ProxyCache.getProxyParticipantAccountDetails.calledWith( 'dfsp2', fxPayload.targetAmount.currency @@ -615,7 +615,7 @@ Test('Cyril', cyrilTest => { createdDate: new Date() }] )) - fxTransfer.getAllDetailsByCommitRequestId.returns(Promise.resolve( + fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer.returns(Promise.resolve( { initiatingFspParticipantId: 2, targetAmount: fxPayload.targetAmount.amount, @@ -637,7 +637,7 @@ Test('Cyril', cyrilTest => { ProxyCache.getProxyParticipantAccountDetails.onCall(2).returns(Promise.resolve({ inScheme: false, participantCurrencyId: 456 })) // FXP Target Currency const result = await Cyril.processFulfilMessage(payload.transferId, payload, payload) test.ok(watchList.getItemsInWatchListByDeterminingTransferId.calledWith(payload.transferId)) - test.ok(fxTransfer.getAllDetailsByCommitRequestId.calledWith(fxPayload.commitRequestId)) + test.ok(fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer.calledWith(fxPayload.commitRequestId)) test.ok(ProxyCache.getProxyParticipantAccountDetails.calledWith( 'dfsp2', fxPayload.targetAmount.currency @@ -674,7 +674,7 @@ Test('Cyril', cyrilTest => { createdDate: new Date() }] )) - fxTransfer.getAllDetailsByCommitRequestId.returns(Promise.resolve( + fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer.returns(Promise.resolve( { initiatingFspParticipantId: 1, targetAmount: fxPayload.targetAmount.amount, @@ -694,7 +694,7 @@ Test('Cyril', cyrilTest => { ProxyCache.getProxyParticipantAccountDetails.returns(Promise.resolve({ inScheme: false, participantCurrencyId: null })) const result = await Cyril.processFulfilMessage(payload.transferId, payload, payload) test.ok(watchList.getItemsInWatchListByDeterminingTransferId.calledWith(payload.transferId)) - test.ok(fxTransfer.getAllDetailsByCommitRequestId.calledWith(fxPayload.commitRequestId)) + test.ok(fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer.calledWith(fxPayload.commitRequestId)) test.deepEqual(result, { isFx: true, positionChanges: [], @@ -720,7 +720,7 @@ Test('Cyril', cyrilTest => { createdDate: new Date() }] )) - fxTransfer.getAllDetailsByCommitRequestId.returns(Promise.resolve( + fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer.returns(Promise.resolve( { initiatingFspParticipantId: 1, targetAmount: fxPayload.targetAmount.amount, @@ -742,7 +742,7 @@ Test('Cyril', cyrilTest => { ProxyCache.getProxyParticipantAccountDetails.onCall(2).returns(Promise.resolve({ inScheme: false, participantCurrencyId: 123 })) // Payer Source Currency const result = await Cyril.processFulfilMessage(payload.transferId, payload, payload) test.ok(watchList.getItemsInWatchListByDeterminingTransferId.calledWith(payload.transferId)) - test.ok(fxTransfer.getAllDetailsByCommitRequestId.calledWith(fxPayload.commitRequestId)) + test.ok(fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer.calledWith(fxPayload.commitRequestId)) test.deepEqual(result, { isFx: true, positionChanges: [ @@ -781,7 +781,7 @@ Test('Cyril', cyrilTest => { createdDate: new Date() }] )) - fxTransfer.getAllDetailsByCommitRequestId.returns(Promise.resolve( + fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer.returns(Promise.resolve( { initiatingFspParticipantId: 1, targetAmount: fxPayload.targetAmount.amount, @@ -803,7 +803,7 @@ Test('Cyril', cyrilTest => { ProxyCache.getProxyParticipantAccountDetails.onCall(2).returns(Promise.resolve({ inScheme: false, participantCurrencyId: 234 })) // Payer Source Currency const result = await Cyril.processFulfilMessage(payload.transferId, payload, payload) test.ok(watchList.getItemsInWatchListByDeterminingTransferId.calledWith(payload.transferId)) - test.ok(fxTransfer.getAllDetailsByCommitRequestId.calledWith(fxPayload.commitRequestId)) + test.ok(fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer.calledWith(fxPayload.commitRequestId)) test.deepEqual(result, { isFx: true, positionChanges: [ @@ -844,7 +844,7 @@ Test('Cyril', cyrilTest => { } ] )) - fxTransfer.getAllDetailsByCommitRequestId.returns(Promise.resolve( + fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer.returns(Promise.resolve( { initiatingFspParticipantId: 1, targetAmount: fxPayload.targetAmount.amount, @@ -864,7 +864,7 @@ Test('Cyril', cyrilTest => { ProxyCache.getProxyParticipantAccountDetails.returns(Promise.resolve({ inScheme: true, participantCurrencyId: null })) const result = await Cyril.processFulfilMessage(payload.transferId, payload, payload) test.ok(watchList.getItemsInWatchListByDeterminingTransferId.calledWith(payload.transferId)) - test.ok(fxTransfer.getAllDetailsByCommitRequestId.calledWith(fxPayload.commitRequestId)) + test.ok(fxTransfer.getAllDetailsByCommitRequestIdForProxiedFxTransfer.calledWith(fxPayload.commitRequestId)) test.deepEqual(result, { isFx: true, positionChanges: [], diff --git a/test/unit/domain/position/abort.test.js b/test/unit/domain/position/abort.test.js index 63588ab79..3b6705fe3 100644 --- a/test/unit/domain/position/abort.test.js +++ b/test/unit/domain/position/abort.test.js @@ -412,16 +412,18 @@ Test('abort domain', positionIndexTest => { try { await processPositionAbortBin( binItems, - 0, - 0, { - 'a0000001-0000-0000-0000-000000000000': 'INVALID_STATE', - 'a0000002-0000-0000-0000-000000000000': 'INVALID_STATE' - }, - { - 'b0000001-0000-0000-0000-000000000000': 'INVALID_STATE' - }, - false + accumulatedPositionValue: 0, + accumulatedPositionReservedValue: 0, + accumulatedTransferStates: { + 'a0000001-0000-0000-0000-000000000000': 'INVALID_STATE', + 'a0000002-0000-0000-0000-000000000000': 'INVALID_STATE' + }, + accumulatedFxTransferStates: { + 'b0000001-0000-0000-0000-000000000000': 'INVALID_STATE' + }, + isFx: false + } ) test.fail('Error not thrown') } catch (e) { @@ -438,16 +440,18 @@ Test('abort domain', positionIndexTest => { try { await processPositionAbortBin( binItems, - 0, - 0, - { - 'a0000001-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR, - 'a0000002-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR - }, { - 'b0000001-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR - }, - false + accumulatedPositionValue: 0, + accumulatedPositionReservedValue: 0, + accumulatedTransferStates: { + 'a0000001-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR, + 'a0000002-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR + }, + accumulatedFxTransferStates: { + 'b0000001-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR + }, + isFx: false + } ) test.fail('Error not thrown') } catch (e) { @@ -461,16 +465,18 @@ Test('abort domain', positionIndexTest => { try { const processedResult = await processPositionAbortBin( binItems, - 0, - 0, - { - 'a0000001-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR, - 'a0000002-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR - }, { - 'b0000001-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR - }, - false + accumulatedPositionValue: 0, + accumulatedPositionReservedValue: 0, + accumulatedTransferStates: { + 'a0000001-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR, + 'a0000002-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR + }, + accumulatedFxTransferStates: { + 'b0000001-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR + }, + isFx: false + } ) test.pass('Error not thrown') test.equal(processedResult.notifyMessages.length, 1) @@ -496,16 +502,18 @@ Test('abort domain', positionIndexTest => { try { const processedResult = await processPositionAbortBin( binItems, - 0, - 0, { - 'a0000001-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR, - 'a0000002-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR - }, - { - 'b0000001-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR - }, - false + accumulatedPositionValue: 0, + accumulatedPositionReservedValue: 0, + accumulatedTransferStates: { + 'a0000001-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR, + 'a0000002-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR + }, + accumulatedFxTransferStates: { + 'b0000001-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR + }, + isFx: false + } ) test.pass('Error not thrown') test.equal(processedResult.notifyMessages.length, 0) @@ -522,6 +530,34 @@ Test('abort domain', positionIndexTest => { test.end() }) + processPositionAbortBinTest.test('skip position changes if changePositions is false', async (test) => { + const binItems = getAbortBinItems() + try { + const processedResult = await processPositionAbortBin( + binItems, + { + accumulatedPositionValue: 0, + accumulatedPositionReservedValue: 0, + accumulatedTransferStates: { + 'a0000001-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR, + 'a0000002-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR + }, + isFx: false, + changePositions: false + } + ) + test.equal(processedResult.accumulatedPositionChanges.length, 0) + test.equal(processedResult.accumulatedPositionValue, 0) + test.equal(processedResult.accumulatedTransferStateChanges.length, 2) + processedResult.accumulatedTransferStateChanges.forEach(transferStateChange => test.equal(transferStateChange.transferStateId, Enum.Transfers.TransferInternalState.ABORTED_ERROR)) + processedResult.accumulatedTransferStates[abortMessage1.value.id] = Enum.Transfers.TransferInternalState.ABORTED_ERROR + processedResult.accumulatedTransferStates[abortMessage2.value.id] = Enum.Transfers.TransferInternalState.ABORTED_ERROR + } catch (e) { + test.fail('Error thrown') + } + test.end() + }) + processPositionAbortBinTest.end() }) @@ -531,16 +567,18 @@ Test('abort domain', positionIndexTest => { try { await processPositionAbortBin( binItems, - 0, - 0, - { - 'd0000001-0000-0000-0000-000000000000': 'INVALID_STATE' - }, { - 'c0000001-0000-0000-0000-000000000000': 'INVALID_STATE', - 'c0000002-0000-0000-0000-000000000000': 'INVALID_STATE' - }, - true + accumulatedPositionValue: 0, + accumulatedPositionReservedValue: 0, + accumulatedTransferStates: { + 'd0000001-0000-0000-0000-000000000000': 'INVALID_STATE' + }, + accumulatedFxTransferStates: { + 'c0000001-0000-0000-0000-000000000000': 'INVALID_STATE', + 'c0000002-0000-0000-0000-000000000000': 'INVALID_STATE' + }, + isFx: true + } ) test.fail('Error not thrown') } catch (e) { @@ -557,16 +595,18 @@ Test('abort domain', positionIndexTest => { try { await processPositionAbortBin( binItems, - 0, - 0, - { - 'd0000001-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR - }, { - 'c0000001-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR, - 'c0000002-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR - }, - true + accumulatedPositionValue: 0, + accumulatedPositionReservedValue: 0, + accumulatedTransferStates: { + 'd0000001-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR + }, + accumulatedFxTransferStates: { + 'c0000001-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR, + 'c0000002-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR + }, + isFx: true + } ) test.fail('Error not thrown') } catch (e) { @@ -580,16 +620,18 @@ Test('abort domain', positionIndexTest => { try { const processedResult = await processPositionAbortBin( binItems, - 0, - 0, { - 'd0000001-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR - }, - { - 'c0000001-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR, - 'c0000002-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR - }, - true + accumulatedPositionValue: 0, + accumulatedPositionReservedValue: 0, + accumulatedTransferStates: { + 'd0000001-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR + }, + accumulatedFxTransferStates: { + 'c0000001-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR, + 'c0000002-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR + }, + isFx: true + } ) test.pass('Error not thrown') test.equal(processedResult.notifyMessages.length, 1) @@ -611,16 +653,18 @@ Test('abort domain', positionIndexTest => { try { const processedResult = await processPositionAbortBin( binItems, - 0, - 0, - { - 'd0000001-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR - }, { - 'c0000001-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR, - 'c0000002-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR - }, - true + accumulatedPositionValue: 0, + accumulatedPositionReservedValue: 0, + accumulatedTransferStates: { + 'd0000001-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR + }, + accumulatedFxTransferStates: { + 'c0000001-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR, + 'c0000002-0000-0000-0000-000000000000': Enum.Transfers.TransferInternalState.RECEIVED_ERROR + }, + isFx: true + } ) test.pass('Error not thrown') test.equal(processedResult.notifyMessages.length, 0) diff --git a/test/unit/domain/position/binProcessor.test.js b/test/unit/domain/position/binProcessor.test.js index c9dd02a3f..9274fde4a 100644 --- a/test/unit/domain/position/binProcessor.test.js +++ b/test/unit/domain/position/binProcessor.test.js @@ -79,6 +79,7 @@ const fxTimeoutReservedTransfers = [ Test('BinProcessor', async (binProcessorTest) => { let sandbox + binProcessorTest.beforeEach(async test => { sandbox = Sinon.createSandbox() sandbox.stub(BatchPositionModel) @@ -439,8 +440,8 @@ Test('BinProcessor', async (binProcessorTest) => { test.end() }) - binProcessorTest.test('binProcessor should', prepareActionTest => { - prepareActionTest.test('processBins should process a bin of positions and return the expected results', async (test) => { + binProcessorTest.test('binProcessor should', processBinsTest => { + processBinsTest.test('processBins should process a bin of positions and return the expected results', async (test) => { const sampleParticipantLimitReturnValues = [ { participantId: 2, @@ -484,7 +485,7 @@ Test('BinProcessor', async (binProcessorTest) => { test.end() }) - prepareActionTest.test('processBins should handle prepare messages', async (test) => { + processBinsTest.test('processBins should handle prepare messages', async (test) => { const sampleParticipantLimitReturnValues = [ { participantId: 2, @@ -536,7 +537,7 @@ Test('BinProcessor', async (binProcessorTest) => { test.end() }) - prepareActionTest.test('processBins should handle commit messages', async (test) => { + processBinsTest.test('processBins should handle commit messages', async (test) => { const sampleParticipantLimitReturnValues = [ { participantId: 2, @@ -585,7 +586,7 @@ Test('BinProcessor', async (binProcessorTest) => { test.end() }) - prepareActionTest.test('processBins should handle reserve messages', async (test) => { + processBinsTest.test('processBins should handle reserve messages', async (test) => { const sampleParticipantLimitReturnValues = [ { participantId: 2, @@ -634,7 +635,7 @@ Test('BinProcessor', async (binProcessorTest) => { test.end() }) - prepareActionTest.test('processBins should handle timeout-reserved messages', async (test) => { + processBinsTest.test('processBins should handle timeout-reserved messages', async (test) => { const sampleParticipantLimitReturnValues = [ { participantId: 2, @@ -683,7 +684,7 @@ Test('BinProcessor', async (binProcessorTest) => { test.end() }) - prepareActionTest.test('processBins should handle fx-timeout-reserved messages', async (test) => { + processBinsTest.test('processBins should handle fx-timeout-reserved messages', async (test) => { const sampleParticipantLimitReturnValues = [ { participantId: 2, @@ -732,7 +733,7 @@ Test('BinProcessor', async (binProcessorTest) => { test.end() }) - prepareActionTest.test('processBins should throw error if any accountId cannot be matched to atleast one participantCurrencyId', async (test) => { + processBinsTest.test('processBins should throw error if any accountId cannot be matched to atleast one participantCurrencyId', async (test) => { const sampleParticipantLimitReturnValues = [ { participantId: 2, @@ -761,7 +762,7 @@ Test('BinProcessor', async (binProcessorTest) => { test.end() }) - prepareActionTest.test('processBins should throw error if no settlement model is found', async (test) => { + processBinsTest.test('processBins should throw error if no settlement model is found', async (test) => { SettlementModelCached.getAll.returns([]) const sampleParticipantLimitReturnValues = [ { @@ -787,7 +788,7 @@ Test('BinProcessor', async (binProcessorTest) => { test.end() }) - prepareActionTest.test('processBins should throw error if no default settlement model if currency model is missing', async (test) => { + processBinsTest.test('processBins should throw error if no default settlement model if currency model is missing', async (test) => { SettlementModelCached.getAll.returns([ { settlementModelId: 3, @@ -828,7 +829,7 @@ Test('BinProcessor', async (binProcessorTest) => { test.end() }) - prepareActionTest.test('processBins should use default settlement model if currency model is missing', async (test) => { + processBinsTest.test('processBins should use default settlement model if currency model is missing', async (test) => { SettlementModelCached.getAll.returns([ { settlementModelId: 2, @@ -885,7 +886,7 @@ Test('BinProcessor', async (binProcessorTest) => { test.end() }) - prepareActionTest.test('processBins should handle no binItems', async (test) => { + processBinsTest.test('processBins should handle no binItems', async (test) => { const sampleParticipantLimitReturnValues = [ { participantId: 2, @@ -936,7 +937,7 @@ Test('BinProcessor', async (binProcessorTest) => { test.end() }) - prepareActionTest.test('processBins should handle non supported bins', async (test) => { + processBinsTest.test('processBins should handle non supported bins', async (test) => { const sampleParticipantLimitReturnValues = [ { participantId: 2, @@ -964,8 +965,45 @@ Test('BinProcessor', async (binProcessorTest) => { test.end() }) - prepareActionTest.end() + + processBinsTest.test('processBins should process bins with accountId 0 differently', async (test) => { + const sampleParticipantLimitReturnValues = [ + { + participantId: 2, + currencyId: 'USD', + participantLimitTypeId: 1, + value: 1000000 + }, + { + participantId: 3, + currencyId: 'USD', + participantLimitTypeId: 1, + value: 1000000 + } + ] + participantFacade.getParticipantLimitByParticipantCurrencyLimit.returns(sampleParticipantLimitReturnValues.shift()) + const binsWithZeroId = JSON.parse(JSON.stringify(sampleBins)) + binsWithZeroId[0] = binsWithZeroId[15] + delete binsWithZeroId[15] + delete binsWithZeroId[7] + + const result = await BinProcessor.processBins(binsWithZeroId, trx) + + // Assert on result.notifyMessages + test.equal(result.notifyMessages.length, 6, 'processBins should return 6 messages') + + // Assert on number of function calls for DB update on position value + test.equal(BatchPositionModel.updateParticipantPosition.callCount, 0, 'updateParticipantPosition should not be called') + test.ok(BatchPositionModel.bulkInsertTransferStateChanges.calledOnce, 'bulkInsertTrasferStateChanges should be called once') + test.ok(BatchPositionModel.bulkInsertFxTransferStateChanges.calledOnce, 'bulkInsertFxTrasferStateChanges should be called once') + test.equal(BatchPositionModel.bulkInsertParticipantPositionChanges.callCount, 0, 'bulkInsertParticipantPositionChanges should not be called') + + test.end() + }) + + processBinsTest.end() }) + binProcessorTest.test('iterateThroughBins should', async (iterateThroughBinsTest) => { iterateThroughBinsTest.test('iterateThroughBins should call callback function for each message in bins', async (test) => { const spyCb = sandbox.spy() @@ -995,5 +1033,6 @@ Test('BinProcessor', async (binProcessorTest) => { }) iterateThroughBinsTest.end() }) + binProcessorTest.end() }) diff --git a/test/unit/domain/position/fulfil.test.js b/test/unit/domain/position/fulfil.test.js index 02f100492..6ac37b728 100644 --- a/test/unit/domain/position/fulfil.test.js +++ b/test/unit/domain/position/fulfil.test.js @@ -241,12 +241,14 @@ Test('Fulfil domain', processPositionFulfilBinTest => { // Call the function const result = await processPositionFulfilBin( [commitBinItems, []], - 0, - 0, - accumulatedTransferStates, - accumulatedFxTransferStates, - transferInfoList, - [] + { + accumulatedPositionValue: 0, + accumulatedPositionReservedValue: 0, + accumulatedTransferStates, + accumulatedFxTransferStates, + transferInfoList, + reservedActionTransfers: [] + } ) // Assert the expected results @@ -293,12 +295,14 @@ Test('Fulfil domain', processPositionFulfilBinTest => { // Call the function const result = await processPositionFulfilBin( [[], reserveBinItems], - 0, - 0, - accumulatedTransferStates, - accumulatedFxTransferStates, - transferInfoList, - reservedActionTransfers + { + accumulatedPositionValue: 0, + accumulatedPositionReservedValue: 0, + accumulatedTransferStates, + accumulatedFxTransferStates, + transferInfoList, + reservedActionTransfers + } ) // Assert the expected results @@ -349,12 +353,14 @@ Test('Fulfil domain', processPositionFulfilBinTest => { // Call the function const result = await processPositionFulfilBin( [commitBinItems, reserveBinItems], - 0, - 0, - accumulatedTransferStates, - accumulatedFxTransferStates, - transferInfoList, - reservedActionTransfers + { + accumulatedPositionValue: 0, + accumulatedPositionReservedValue: 0, + accumulatedTransferStates, + accumulatedFxTransferStates, + transferInfoList, + reservedActionTransfers + } ) // Assert the expected results @@ -415,11 +421,14 @@ Test('Fulfil domain', processPositionFulfilBinTest => { // Call the function const result = await processPositionFulfilBin( [commitBinItems, []], - 0, - 0, - accumulatedTransferStates, - accumulatedFxTransferStates, - transferInfoList + { + accumulatedPositionValue: 0, + accumulatedPositionReservedValue: 0, + accumulatedTransferStates, + accumulatedFxTransferStates, + transferInfoList, + reservedActionTransfers: [] + } ) // Assert the expected results @@ -456,11 +465,14 @@ Test('Fulfil domain', processPositionFulfilBinTest => { // Call the function const result = await processPositionFulfilBin( [commitBinItems, []], - 0, - 0, - accumulatedTransferStates, - accumulatedFxTransferStates, - transferInfoList + { + accumulatedPositionValue: 0, + accumulatedPositionReservedValue: 0, + accumulatedTransferStates, + accumulatedFxTransferStates, + transferInfoList, + reservedActionTransfers: [] + } ) // Assert the expected results @@ -490,6 +502,46 @@ Test('Fulfil domain', processPositionFulfilBinTest => { test.end() }) + processPositionFulfilBinTest.test('should skip position changes if changePosition is false', async (test) => { + const accumulatedTransferStates = { + [transferTestData1.message.value.id]: Enum.Transfers.TransferInternalState.RECEIVED_FULFIL, + [transferTestData2.message.value.id]: Enum.Transfers.TransferInternalState.RECEIVED_FULFIL + } + const accumulatedFxTransferStates = {} + const transferInfoList = { + [transferTestData1.message.value.id]: transferTestData1.transferInfo, + [transferTestData2.message.value.id]: transferTestData2.transferInfo + } + // Call the function + const result = await processPositionFulfilBin( + [commitBinItems, []], + { + accumulatedPositionValue: 0, + accumulatedPositionReservedValue: 0, + accumulatedTransferStates, + accumulatedFxTransferStates, + transferInfoList, + reservedActionTransfers: [], + changePositions: false + } + ) + + // Assert the expected results + test.equal(result.notifyMessages.length, 2) + test.equal(result.accumulatedPositionValue, 0) + test.equal(result.accumulatedTransferStateChanges.length, 2) + test.equal(result.accumulatedPositionChanges.length, 0) + + test.equal(result.accumulatedTransferStateChanges[0].transferId, transferTestData1.message.value.id) + test.equal(result.accumulatedTransferStateChanges[1].transferId, transferTestData2.message.value.id) + test.equal(result.accumulatedTransferStateChanges[0].transferStateId, Enum.Transfers.TransferState.COMMITTED) + test.equal(result.accumulatedTransferStateChanges[1].transferStateId, Enum.Transfers.TransferState.COMMITTED) + test.equal(result.accumulatedTransferStates[transferTestData1.message.value.id], Enum.Transfers.TransferState.COMMITTED) + test.equal(result.accumulatedTransferStates[transferTestData2.message.value.id], Enum.Transfers.TransferState.COMMITTED) + + test.end() + }) + // FX tests processPositionFulfilBinTest.test('should process a bin of position-commit messages involved in fx transfers', async (test) => { @@ -505,12 +557,14 @@ Test('Fulfil domain', processPositionFulfilBinTest => { // Call the function const result = await processPositionFulfilBin( [commitWithFxBinItems, []], - 0, - 0, - accumulatedTransferStates, - accumulatedFxTransferStates, - transferInfoList, - [] + { + accumulatedPositionValue: 0, + accumulatedPositionReservedValue: 0, + accumulatedTransferStates, + accumulatedFxTransferStates, + transferInfoList, + reservedActionTransfers: [] + } ) // Assert the expected results @@ -556,12 +610,14 @@ Test('Fulfil domain', processPositionFulfilBinTest => { // Call the function const result = await processPositionFulfilBin( [commitWithPartiallyProcessedFxBinItems, []], - 0, - 0, - accumulatedTransferStates, - accumulatedFxTransferStates, - transferInfoList, - [] + { + accumulatedPositionValue: 0, + accumulatedPositionReservedValue: 0, + accumulatedTransferStates, + accumulatedFxTransferStates, + transferInfoList, + reservedActionTransfers: [] + } ) // Assert the expected results diff --git a/test/unit/domain/position/fx-fulfil.test.js b/test/unit/domain/position/fx-fulfil.test.js index 22a14c81f..1924d10ff 100644 --- a/test/unit/domain/position/fx-fulfil.test.js +++ b/test/unit/domain/position/fx-fulfil.test.js @@ -161,7 +161,7 @@ Test('Fx Fulfil domain', processPositionFxFulfilBinTest => { // Call the function const processedMessages = await processPositionFxFulfilBin( reserveBinItems, - accumulatedFxTransferStates + { accumulatedFxTransferStates } ) // Assert the expected results diff --git a/test/unit/domain/position/fx-prepare.test.js b/test/unit/domain/position/fx-prepare.test.js index c9e6643de..987a373a8 100644 --- a/test/unit/domain/position/fx-prepare.test.js +++ b/test/unit/domain/position/fx-prepare.test.js @@ -189,11 +189,13 @@ Test('FX Prepare domain', positionIndexTest => { } const processedMessages = await processFxPositionPrepareBin( binItems, - 0, // Accumulated position value - 0, - accumulatedFxTransferStates, - -1000, // Settlement participant position value - participantLimit + { + accumulatedPositionValue: 0, // Accumulated position value + accumulatedPositionReservedValue: 0, + accumulatedFxTransferStates, + settlementParticipantPosition: -1000, // Settlement participant position value + participantLimit + } ) Logger.isInfoEnabled && Logger.info(processedMessages) test.equal(processedMessages.notifyMessages.length, 3) @@ -247,11 +249,13 @@ Test('FX Prepare domain', positionIndexTest => { } const processedMessages = await processFxPositionPrepareBin( binItems, - 0, // No accumulated position value - 0, - accumulatedFxTransferStates, - 0, // Settlement participant position value - participantLimit + { + accumulatedPositionValue: 0, // No accumulated position value + accumulatedPositionReservedValue: 0, + accumulatedFxTransferStates, + settlementParticipantPosition: 0, // Settlement participant position value + participantLimit + } ) Logger.isInfoEnabled && Logger.info(processedMessages) test.equal(processedMessages.notifyMessages.length, 3) @@ -314,11 +318,13 @@ Test('FX Prepare domain', positionIndexTest => { } const processedMessages = await processFxPositionPrepareBin( binItems, - 1000, // Position value has reached limit of 1000 - 0, - accumulatedFxTransferStates, - -2000, // Payer has liquidity - participantLimit + { + accumulatedPositionValue: 1000, // Position value has reached limit of 1000 + accumulatedPositionReservedValue: 0, + accumulatedFxTransferStates, + settlementParticipantPosition: -2000, // Payer has liquidity + participantLimit + } ) Logger.isInfoEnabled && Logger.info(processedMessages) test.equal(processedMessages.notifyMessages.length, 3) @@ -381,11 +387,13 @@ Test('FX Prepare domain', positionIndexTest => { } const processedMessages = await processFxPositionPrepareBin( binItems, - 0, // Accumulated position value - 0, - accumulatedFxTransferStates, - -2000, // Payer has liquidity - participantLimit + { + accumulatedPositionValue: 0, // Accumulated position value + accumulatedPositionReservedValue: 0, + accumulatedFxTransferStates, + settlementParticipantPosition: -2000, // Payer has liquidity + participantLimit + } ) Logger.isInfoEnabled && Logger.info(processedMessages) test.equal(processedMessages.notifyMessages.length, 3) @@ -444,11 +452,13 @@ Test('FX Prepare domain', positionIndexTest => { } const processedMessages = await processFxPositionPrepareBin( binItems, - 0, - 0, - accumulatedFxTransferStates, - -sourceAmount * 2, - participantLimit + { + accumulatedPositionValue: 0, + accumulatedPositionReservedValue: 0, + accumulatedFxTransferStates, + settlementParticipantPosition: -sourceAmount * 2, + participantLimit + } ) Logger.isInfoEnabled && Logger.info(processedMessages) test.equal(processedMessages.notifyMessages.length, 3) @@ -457,6 +467,81 @@ Test('FX Prepare domain', positionIndexTest => { test.end() }) + changeParticipantPositionTest.test('skip position changes if changePositions is false', async (test) => { + const participantLimit = { + participantCurrencyId: 1, + participantLimitTypeId: 1, + value: 10000, + isActive: 1, + createdBy: 'unknown', + participantLimitId: 1, + thresholdAlarmPercentage: 0.5 + } + const accumulatedFxTransferStates = { + [fxTransferTestData1.message.value.id]: Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, + [fxTransferTestData2.message.value.id]: Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, + [fxTransferTestData3.message.value.id]: 'INVALID_STATE' + } + const processedMessages = await processFxPositionPrepareBin( + binItems, + { + accumulatedPositionValue: -4, + accumulatedPositionReservedValue: 0, + accumulatedFxTransferStates, + settlementParticipantPosition: -2000, + participantLimit, + changePositions: false + } + ) + Logger.isInfoEnabled && Logger.info(processedMessages) + test.equal(processedMessages.notifyMessages.length, 3) + test.equal(processedMessages.accumulatedPositionChanges.length, 0) + test.equal(processedMessages.accumulatedPositionValue, -4) + test.end() + }) + + changeParticipantPositionTest.test('use targetAmount as transferAmount if cyrilResult currency equals targetAmount currency', async (test) => { + const participantLimit = { + participantCurrencyId: 1, + participantLimitTypeId: 1, + value: 10000, + isActive: 1, + createdBy: 'unknown', + participantLimitId: 1, + thresholdAlarmPercentage: 0.5 + } + const accumulatedFxTransferStates = { + [fxTransferTestData1.message.value.id]: Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, + [fxTransferTestData2.message.value.id]: Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, + [fxTransferTestData3.message.value.id]: 'INVALID_STATE' + } + const cyrilResult = { + participantName: 'perffsp1', + currencyId: 'XXX', + amount: 50 + } + const binItemsWithModifiedCyrilResult = binItems.map(item => { + item.message.value.content.context.cyrilResult = cyrilResult + return item + }) + const processedMessages = await processFxPositionPrepareBin( + binItemsWithModifiedCyrilResult, + { + accumulatedPositionValue: 0, + accumulatedPositionReservedValue: 0, + accumulatedFxTransferStates, + settlementParticipantPosition: -2000, + participantLimit + } + ) + Logger.isInfoEnabled && Logger.info(processedMessages) + test.equal(processedMessages.notifyMessages.length, 3) + test.equal(processedMessages.accumulatedPositionChanges.length, 2) + test.equal(processedMessages.accumulatedPositionChanges[0].value, 50) + test.equal(processedMessages.accumulatedPositionChanges[1].value, 100) + test.end() + }) + changeParticipantPositionTest.end() }) diff --git a/test/unit/domain/position/fx-timeout-reserved.test.js b/test/unit/domain/position/fx-timeout-reserved.test.js index 5cf119b3a..50acb5741 100644 --- a/test/unit/domain/position/fx-timeout-reserved.test.js +++ b/test/unit/domain/position/fx-timeout-reserved.test.js @@ -207,13 +207,15 @@ Test('timeout reserved domain', positionIndexTest => { try { await processPositionFxTimeoutReservedBin( binItems, - 0, // Accumulated position value - 0, { - 'd6a036a5-65a3-48af-a0c7-ee089c412ada': 'INVALID_STATE', - '7e3fa3f7-9a1b-4a81-83c9-5b41112dd7f5': 'INVALID_STATE' - }, - {} + accumulatedPositionValue: 0, // Accumulated position value + accumulatedPositionReservedValue: 0, + accumulatedFxTransferStates: { + 'd6a036a5-65a3-48af-a0c7-ee089c412ada': 'INVALID_STATE', + '7e3fa3f7-9a1b-4a81-83c9-5b41112dd7f5': 'INVALID_STATE' + }, + fetchedReservedPositionChangesByCommitRequestIds: {} + } ) test.fail('Error not thrown') } catch (e) { @@ -225,21 +227,23 @@ Test('timeout reserved domain', positionIndexTest => { changeParticipantPositionTest.test('produce reserved messages/position changes for valid timeout messages', async (test) => { const processedMessages = await processPositionFxTimeoutReservedBin( binItems, - 0, // Accumulated position value - 0, - { - 'd6a036a5-65a3-48af-a0c7-ee089c412ada': Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT, - '7e3fa3f7-9a1b-4a81-83c9-5b41112dd7f5': Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT - }, { - 'd6a036a5-65a3-48af-a0c7-ee089c412ada': { - 51: { - value: 10 - } + accumulatedPositionValue: 0, // Accumulated position value + accumulatedPositionReservedValue: 0, + accumulatedFxTransferStates: { + 'd6a036a5-65a3-48af-a0c7-ee089c412ada': Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT, + '7e3fa3f7-9a1b-4a81-83c9-5b41112dd7f5': Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT }, - '7e3fa3f7-9a1b-4a81-83c9-5b41112dd7f5': { - 51: { - value: 5 + fetchedReservedPositionChangesByCommitRequestIds: { + 'd6a036a5-65a3-48af-a0c7-ee089c412ada': { + 51: { + value: 10 + } + }, + '7e3fa3f7-9a1b-4a81-83c9-5b41112dd7f5': { + 51: { + value: 5 + } } } } @@ -270,6 +274,44 @@ Test('timeout reserved domain', positionIndexTest => { test.end() }) + changeParticipantPositionTest.test('skip position changes if changePositions is false', async (test) => { + const processedMessages = await processPositionFxTimeoutReservedBin( + binItems, + { + accumulatedPositionValue: 0, // Accumulated position value + accumulatedPositionReservedValue: 0, + accumulatedFxTransferStates: { + 'd6a036a5-65a3-48af-a0c7-ee089c412ada': Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT, + '7e3fa3f7-9a1b-4a81-83c9-5b41112dd7f5': Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT + }, + fetchedReservedPositionChangesByCommitRequestIds: { + 'd6a036a5-65a3-48af-a0c7-ee089c412ada': { + 51: { + value: 10 + } + }, + '7e3fa3f7-9a1b-4a81-83c9-5b41112dd7f5': { + 51: { + value: 5 + } + } + }, + changePositions: false + } + ) + test.equal(processedMessages.notifyMessages.length, 2) + test.equal(processedMessages.accumulatedPositionValue, 0) + test.equal(processedMessages.accumulatedPositionChanges.length, 0) + test.equal(processedMessages.accumulatedFxTransferStateChanges[0].commitRequestId, fxTimeoutMessage1.value.id) + test.equal(processedMessages.accumulatedFxTransferStateChanges[1].commitRequestId, fxTimeoutMessage2.value.id) + test.equal(processedMessages.accumulatedFxTransferStateChanges[0].transferStateId, Enum.Transfers.TransferInternalState.EXPIRED_RESERVED) + test.equal(processedMessages.accumulatedFxTransferStateChanges[1].transferStateId, Enum.Transfers.TransferInternalState.EXPIRED_RESERVED) + test.equal(processedMessages.accumulatedFxTransferStates[fxTimeoutMessage1.value.id], Enum.Transfers.TransferInternalState.EXPIRED_RESERVED) + test.equal(processedMessages.accumulatedFxTransferStates[fxTimeoutMessage2.value.id], Enum.Transfers.TransferInternalState.EXPIRED_RESERVED) + + test.end() + }) + changeParticipantPositionTest.end() }) diff --git a/test/unit/domain/position/prepare.test.js b/test/unit/domain/position/prepare.test.js index 19e4d6101..038c4d20e 100644 --- a/test/unit/domain/position/prepare.test.js +++ b/test/unit/domain/position/prepare.test.js @@ -324,32 +324,19 @@ Test('Prepare domain', positionIndexTest => { participantLimitId: 1, thresholdAlarmPercentage: 0.5 } - const settlementModel = { - settlementModelId: 1, - name: 'DEFERREDNET', - isActive: 1, - settlementGranularityId: 2, - settlementInterchangeId: 2, - settlementDelayId: 2, // 1 Immediate, 2 Deferred - currencyId: 'USD', - requireLiquidityCheck: 1, - ledgerAccountTypeId: 1, // 1 Position, 2 Settlement - autoPositionReset: 1, - adjustPosition: 0, - settlementAccountTypeId: 2 - } const processedMessages = await processPositionPrepareBin( binItems, - 0, // Accumulated position value - 0, { - '1cf6981b-25d8-4bd7-b9d9-b1c0fc8cdeaf': Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, - '6c2c09c3-19b6-48ba-becc-cbdffcaadd7e': Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, - '5dff336f-62c0-4619-92c6-9ccd7c8f0369': 'INVALID_STATE' - }, - -1000, // Settlement participant position value - settlementModel, - participantLimit + accumulatedPositionValue: 0, // Accumulated position value + accumulatedPositionReservedValue: 0, + accumulatedTransferStates: { + '1cf6981b-25d8-4bd7-b9d9-b1c0fc8cdeaf': Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, + '6c2c09c3-19b6-48ba-becc-cbdffcaadd7e': Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, + '5dff336f-62c0-4619-92c6-9ccd7c8f0369': 'INVALID_STATE' + }, + settlementParticipantPosition: -1000, // Settlement participant position value + participantLimit + } ) Logger.isInfoEnabled && Logger.info(processedMessages) test.equal(processedMessages.notifyMessages.length, 3) @@ -396,32 +383,19 @@ Test('Prepare domain', positionIndexTest => { participantLimitId: 1, thresholdAlarmPercentage: 0.5 } - const settlementModel = { - settlementModelId: 1, - name: 'DEFERREDNET', - isActive: 1, - settlementGranularityId: 2, - settlementInterchangeId: 2, - settlementDelayId: 2, // 1 Immediate, 2 Deferred - currencyId: 'USD', - requireLiquidityCheck: 1, - ledgerAccountTypeId: 1, // 1 Position, 2 Settlement - autoPositionReset: 1, - adjustPosition: 0, - settlementAccountTypeId: 2 - } const processedMessages = await processPositionPrepareBin( binItems, - 0, // No accumulated position value - 0, { - '1cf6981b-25d8-4bd7-b9d9-b1c0fc8cdeaf': Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, - '6c2c09c3-19b6-48ba-becc-cbdffcaadd7e': Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, - '5dff336f-62c0-4619-92c6-9ccd7c8f0369': 'INVALID_STATE' - }, - 0, // Settlement participant position value - settlementModel, - participantLimit + accumulatedPositionValue: 0, // No accumulated position value + accumulatedPositionReservedValue: 0, + accumulatedTransferStates: { + '1cf6981b-25d8-4bd7-b9d9-b1c0fc8cdeaf': Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, + '6c2c09c3-19b6-48ba-becc-cbdffcaadd7e': Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, + '5dff336f-62c0-4619-92c6-9ccd7c8f0369': 'INVALID_STATE' + }, + settlementParticipantPosition: 0, // Settlement participant position value + participantLimit + } ) Logger.isInfoEnabled && Logger.info(processedMessages) test.equal(processedMessages.notifyMessages.length, 3) @@ -477,32 +451,19 @@ Test('Prepare domain', positionIndexTest => { participantLimitId: 1, thresholdAlarmPercentage: 0.5 } - const settlementModel = { - settlementModelId: 1, - name: 'DEFERREDNET', - isActive: 1, - settlementGranularityId: 2, - settlementInterchangeId: 2, - settlementDelayId: 2, // 1 Immediate, 2 Deferred - currencyId: 'USD', - requireLiquidityCheck: 1, - ledgerAccountTypeId: 1, // 1 Position, 2 Settlement - autoPositionReset: 1, - adjustPosition: 0, - settlementAccountTypeId: 2 - } const processedMessages = await processPositionPrepareBin( binItems, - 1000, // Position value has reached limit of 1000 - 0, { - '1cf6981b-25d8-4bd7-b9d9-b1c0fc8cdeaf': Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, - '6c2c09c3-19b6-48ba-becc-cbdffcaadd7e': Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, - '5dff336f-62c0-4619-92c6-9ccd7c8f0369': 'INVALID_STATE' - }, - -2000, // Payer has liquidity - settlementModel, - participantLimit + accumulatedPositionValue: 1000, // Position value has reached limit of 1000 + accumulatedPositionReservedValue: 0, + accumulatedTransferStates: { + '1cf6981b-25d8-4bd7-b9d9-b1c0fc8cdeaf': Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, + '6c2c09c3-19b6-48ba-becc-cbdffcaadd7e': Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, + '5dff336f-62c0-4619-92c6-9ccd7c8f0369': 'INVALID_STATE' + }, + settlementParticipantPosition: -2000, // Payer has liquidity + participantLimit + } ) Logger.isInfoEnabled && Logger.info(processedMessages) test.equal(processedMessages.notifyMessages.length, 3) @@ -558,32 +519,19 @@ Test('Prepare domain', positionIndexTest => { participantLimitId: 1, thresholdAlarmPercentage: 0.5 } - const settlementModel = { - settlementModelId: 1, - name: 'DEFERREDNET', - isActive: 1, - settlementGranularityId: 2, - settlementInterchangeId: 2, - settlementDelayId: 2, // 1 Immediate, 2 Deferred - currencyId: 'USD', - requireLiquidityCheck: 1, - ledgerAccountTypeId: 1, // 1 Position, 2 Settlement - autoPositionReset: 1, - adjustPosition: 0, - settlementAccountTypeId: 2 - } const processedMessages = await processPositionPrepareBin( binItems, - -4, // Accumulated position value - 0, { - '1cf6981b-25d8-4bd7-b9d9-b1c0fc8cdeaf': Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, - '6c2c09c3-19b6-48ba-becc-cbdffcaadd7e': Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, - '5dff336f-62c0-4619-92c6-9ccd7c8f0369': 'INVALID_STATE' - }, - 0, // Settlement participant position value - settlementModel, - participantLimit + accumulatedPositionValue: -4, // Accumulated position value + accumulatedPositionReservedValue: 0, + accumulatedTransferStates: { + '1cf6981b-25d8-4bd7-b9d9-b1c0fc8cdeaf': Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, + '6c2c09c3-19b6-48ba-becc-cbdffcaadd7e': Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, + '5dff336f-62c0-4619-92c6-9ccd7c8f0369': 'INVALID_STATE' + }, + settlementParticipantPosition: 0, // Settlement participant position value + participantLimit + } ) Logger.isInfoEnabled && Logger.info(processedMessages) test.equal(processedMessages.notifyMessages.length, 3) @@ -635,20 +583,6 @@ Test('Prepare domain', positionIndexTest => { participantLimitId: 1, thresholdAlarmPercentage: 0.5 } - const settlementModel = { - settlementModelId: 1, - name: 'DEFERREDNET', - isActive: 1, - settlementGranularityId: 2, - settlementInterchangeId: 2, - settlementDelayId: 2, // 1 Immediate, 2 Deferred - currencyId: 'USD', - requireLiquidityCheck: 1, - ledgerAccountTypeId: 1, // 1 Position, 2 Settlement - autoPositionReset: 1, - adjustPosition: 0, - settlementAccountTypeId: 2 - } // Modifying first transfer message to contain a context object with cyrilResult so that it is considered an FX transfer const binItemsCopy = JSON.parse(JSON.stringify(binItems)) @@ -659,16 +593,17 @@ Test('Prepare domain', positionIndexTest => { } const processedMessages = await processPositionPrepareBin( binItemsCopy, - -20, // Accumulated position value - 0, { - '1cf6981b-25d8-4bd7-b9d9-b1c0fc8cdeaf': Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, - '6c2c09c3-19b6-48ba-becc-cbdffcaadd7e': Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, - '5dff336f-62c0-4619-92c6-9ccd7c8f0369': 'INVALID_STATE' - }, - 0, // Settlement participant position value - settlementModel, - participantLimit + accumulatedPositionValue: -20, // Accumulated position value + accumulatedPositionReservedValue: 0, + accumulatedTransferStates: { + '1cf6981b-25d8-4bd7-b9d9-b1c0fc8cdeaf': Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, + '6c2c09c3-19b6-48ba-becc-cbdffcaadd7e': Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, + '5dff336f-62c0-4619-92c6-9ccd7c8f0369': 'INVALID_STATE' + }, + settlementParticipantPosition: 0, // Settlement participant position value + participantLimit + } ) Logger.isInfoEnabled && Logger.info(processedMessages) test.equal(processedMessages.notifyMessages.length, 3) @@ -720,32 +655,19 @@ Test('Prepare domain', positionIndexTest => { participantLimitId: 1, thresholdAlarmPercentage: 0.5 } - const settlementModel = { - settlementModelId: 1, - name: 'DEFERREDNET', - isActive: 1, - settlementGranularityId: 2, - settlementInterchangeId: 2, - settlementDelayId: 2, // 1 Immediate, 2 Deferred - currencyId: null, // Default settlement model is null currencyId - requireLiquidityCheck: 1, - ledgerAccountTypeId: 1, // 1 Position, 2 Settlement - autoPositionReset: 1, - adjustPosition: 0, - settlementAccountTypeId: 2 - } const processedMessages = await processPositionPrepareBin( binItems, - -4, - 0, { - '1cf6981b-25d8-4bd7-b9d9-b1c0fc8cdeaf': Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, - '6c2c09c3-19b6-48ba-becc-cbdffcaadd7e': Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, - '5dff336f-62c0-4619-92c6-9ccd7c8f0369': 'INVALID_STATE' - }, - 0, - settlementModel, - participantLimit + accumulatedPositionValue: -4, + accumulatedPositionReservedValue: 0, + accumulatedTransferStates: { + '1cf6981b-25d8-4bd7-b9d9-b1c0fc8cdeaf': Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, + '6c2c09c3-19b6-48ba-becc-cbdffcaadd7e': Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, + '5dff336f-62c0-4619-92c6-9ccd7c8f0369': 'INVALID_STATE' + }, + settlementParticipantPosition: 0, + participantLimit + } ) Logger.isInfoEnabled && Logger.info(processedMessages) test.equal(processedMessages.notifyMessages.length, 3) @@ -796,32 +718,19 @@ Test('Prepare domain', positionIndexTest => { participantLimitId: 1, thresholdAlarmPercentage: 0.5 } - const settlementModel = { - settlementModelId: 1, - name: 'DEFERREDNET', - isActive: 1, - settlementGranularityId: 2, - settlementInterchangeId: 2, - settlementDelayId: 2, // 1 Immediate, 2 Deferred - currencyId: null, // Default settlement model is null currencyId - requireLiquidityCheck: 1, - ledgerAccountTypeId: 1, // 1 Position, 2 Settlement - autoPositionReset: 1, - adjustPosition: 0, - settlementAccountTypeId: 2 - } const processedMessages = await processPositionPrepareBin( binItems, - 0, - 0, { - '1cf6981b-25d8-4bd7-b9d9-b1c0fc8cdeaf': Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, - '6c2c09c3-19b6-48ba-becc-cbdffcaadd7e': Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, - '5dff336f-62c0-4619-92c6-9ccd7c8f0369': 'INVALID_STATE' - }, - -4, - settlementModel, - participantLimit + accumulatedPositionValue: 0, + accumulatedPositionReservedValue: 0, + accumulatedTransferStates: { + '1cf6981b-25d8-4bd7-b9d9-b1c0fc8cdeaf': Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, + '6c2c09c3-19b6-48ba-becc-cbdffcaadd7e': Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, + '5dff336f-62c0-4619-92c6-9ccd7c8f0369': 'INVALID_STATE' + }, + settlementParticipantPosition: -4, + participantLimit + } ) Logger.isInfoEnabled && Logger.info(processedMessages) test.equal(processedMessages.notifyMessages.length, 3) @@ -830,6 +739,38 @@ Test('Prepare domain', positionIndexTest => { test.end() }) + changeParticipantPositionTest.test('skip position changes if changePosition is false', async (test) => { + const participantLimit = { + participantCurrencyId: 1, + participantLimitTypeId: 1, + value: 10000, + isActive: 1, + createdBy: 'unknown', + participantLimitId: 1, + thresholdAlarmPercentage: 0.5 + } + const processedMessages = await processPositionPrepareBin( + binItems, + { + accumulatedPositionValue: -4, + accumulatedPositionReservedValue: 0, + accumulatedTransferStates: { + '1cf6981b-25d8-4bd7-b9d9-b1c0fc8cdeaf': Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, + '6c2c09c3-19b6-48ba-becc-cbdffcaadd7e': Enum.Transfers.TransferInternalState.RECEIVED_PREPARE, + '5dff336f-62c0-4619-92c6-9ccd7c8f0369': 'INVALID_STATE' + }, + settlementParticipantPosition: 0, + participantLimit, + changePositions: false + } + ) + Logger.isInfoEnabled && Logger.info(processedMessages) + test.equal(processedMessages.notifyMessages.length, 3) + test.equal(processedMessages.accumulatedPositionChanges.length, 0) + test.equal(processedMessages.accumulatedPositionValue, -4) + test.end() + }) + changeParticipantPositionTest.end() }) diff --git a/test/unit/domain/position/timeout-reserved.test.js b/test/unit/domain/position/timeout-reserved.test.js index 7e87dd1f8..1bff3f152 100644 --- a/test/unit/domain/position/timeout-reserved.test.js +++ b/test/unit/domain/position/timeout-reserved.test.js @@ -206,13 +206,15 @@ Test('timeout reserved domain', positionIndexTest => { try { await processPositionTimeoutReservedBin( binItems, - 0, // Accumulated position value - 0, { - 'd6a036a5-65a3-48af-a0c7-ee089c412ada': 'INVALID_STATE', - '7e3fa3f7-9a1b-4a81-83c9-5b41112dd7f5': 'INVALID_STATE' - }, - {} + accumulatedPositionValue: 0, // Accumulated position value + accumulatedPositionReservedValue: 0, + accumulatedTransferStates: { + 'd6a036a5-65a3-48af-a0c7-ee089c412ada': 'INVALID_STATE', + '7e3fa3f7-9a1b-4a81-83c9-5b41112dd7f5': 'INVALID_STATE' + }, + transferInfoList: {} + } ) test.fail('Error not thrown') } catch (e) { @@ -224,18 +226,20 @@ Test('timeout reserved domain', positionIndexTest => { changeParticipantPositionTest.test('produce reserved messages/position changes for valid timeout messages', async (test) => { const processedMessages = await processPositionTimeoutReservedBin( binItems, - 0, // Accumulated position value - 0, - { - 'd6a036a5-65a3-48af-a0c7-ee089c412ada': Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT, - '7e3fa3f7-9a1b-4a81-83c9-5b41112dd7f5': Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT - }, { - 'd6a036a5-65a3-48af-a0c7-ee089c412ada': { - amount: -10 + accumulatedPositionValue: 0, // Accumulated position value + accumulatedPositionReservedValue: 0, + accumulatedTransferStates: { + 'd6a036a5-65a3-48af-a0c7-ee089c412ada': Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT, + '7e3fa3f7-9a1b-4a81-83c9-5b41112dd7f5': Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT }, - '7e3fa3f7-9a1b-4a81-83c9-5b41112dd7f5': { - amount: -5 + transferInfoList: { + 'd6a036a5-65a3-48af-a0c7-ee089c412ada': { + amount: -10 + }, + '7e3fa3f7-9a1b-4a81-83c9-5b41112dd7f5': { + amount: -5 + } } } ) @@ -265,6 +269,39 @@ Test('timeout reserved domain', positionIndexTest => { test.end() }) + changeParticipantPositionTest.test('skip position changes if changePositions is false', async (test) => { + const processedMessages = await processPositionTimeoutReservedBin( + binItems, + { + accumulatedPositionValue: 0, // Accumulated position value + accumulatedPositionReservedValue: 0, + accumulatedTransferStates: { + 'd6a036a5-65a3-48af-a0c7-ee089c412ada': Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT, + '7e3fa3f7-9a1b-4a81-83c9-5b41112dd7f5': Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT + }, + transferInfoList: { + 'd6a036a5-65a3-48af-a0c7-ee089c412ada': { + amount: -10 + }, + '7e3fa3f7-9a1b-4a81-83c9-5b41112dd7f5': { + amount: -5 + } + }, + changePositions: false + } + ) + test.equal(processedMessages.notifyMessages.length, 2) + test.equal(processedMessages.accumulatedPositionChanges.length, 0) + test.equal(processedMessages.accumulatedPositionValue, 0) + test.equal(processedMessages.accumulatedTransferStateChanges[0].transferId, timeoutMessage1.value.id) + test.equal(processedMessages.accumulatedTransferStateChanges[1].transferId, timeoutMessage2.value.id) + test.equal(processedMessages.accumulatedTransferStateChanges[0].transferStateId, Enum.Transfers.TransferInternalState.EXPIRED_RESERVED) + test.equal(processedMessages.accumulatedTransferStateChanges[1].transferStateId, Enum.Transfers.TransferInternalState.EXPIRED_RESERVED) + test.equal(processedMessages.accumulatedTransferStates[timeoutMessage1.value.id], Enum.Transfers.TransferInternalState.EXPIRED_RESERVED) + test.equal(processedMessages.accumulatedTransferStates[timeoutMessage2.value.id], Enum.Transfers.TransferInternalState.EXPIRED_RESERVED) + test.end() + }) + changeParticipantPositionTest.end() }) diff --git a/test/unit/handlers/positions/handlerBatch.test.js b/test/unit/handlers/positions/handlerBatch.test.js index 28d5e5f4c..605ad261e 100644 --- a/test/unit/handlers/positions/handlerBatch.test.js +++ b/test/unit/handlers/positions/handlerBatch.test.js @@ -566,37 +566,6 @@ Test('Position handler', positionBatchHandlerTest => { } }) - positionsTest.test('skip processing if message key is 0', async test => { - // Arrange - await Consumer.createHandler(topicName, config, command) - Kafka.transformGeneralTopicName.returns(topicName) - Kafka.getKafkaConfig.returns(config) - Kafka.proceed.returns(true) - BinProcessor.processBins.resolves({ - notifyMessages: [], - followupMessages: [] - }) - - const message = { - key: '0', - value: prepareMessageValue, - topic: topicName - } - - // Act - try { - await allTransferHandlers.positions(null, [message]) - test.ok(BatchPositionModel.startDbTransaction.notCalled, 'startDbTransaction should not be called') - test.ok(BinProcessor.processBins.notCalled, 'processBins should not be called') - test.ok(Kafka.proceed.notCalled, 'kafkaProceed should not be called') - test.end() - } catch (err) { - Logger.info(err) - test.fail('Error should not be thrown') - test.end() - } - }) - positionsTest.end() }) diff --git a/test/unit/lib/proxyCache.test.js b/test/unit/lib/proxyCache.test.js index 3aa637132..4104b7570 100644 --- a/test/unit/lib/proxyCache.test.js +++ b/test/unit/lib/proxyCache.test.js @@ -46,8 +46,6 @@ Test('Proxy Cache test', async (proxyCacheTest) => { await connectTest.test('connect to cache with lazyConnect', async (test) => { await ProxyCache.connect() test.ok(connectStub.calledOnce) - const secondArg = createProxyCacheStub.getCall(0).args[1] - test.ok(secondArg.lazyConnect) test.end() })