Skip to content

Commit

Permalink
feat: default to transformGeneralTopicName when batch prepare topic i…
Browse files Browse the repository at this point in the history
…s not set
  • Loading branch information
oderayi committed Dec 5, 2023
1 parent d827219 commit 2b7be99
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 53 deletions.
43 changes: 17 additions & 26 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
"@mojaloop/central-services-health": "14.0.2",
"@mojaloop/central-services-logger": "11.2.2",
"@mojaloop/central-services-metrics": "12.0.8",
"@mojaloop/central-services-shared": "18.1.3",
"@mojaloop/central-services-shared": "18.2.0",
"@mojaloop/central-services-stream": "11.2.0",
"@mojaloop/event-sdk": "14.0.0",
"@mojaloop/ml-number": "11.2.3",
Expand Down Expand Up @@ -127,7 +127,7 @@
"get-port": "5.1.1",
"jsdoc": "4.0.2",
"jsonpath": "1.1.1",
"nodemon": "3.0.1",
"nodemon": "3.0.2",
"npm-check-updates": "16.14.11",
"nyc": "15.1.0",
"pre-commit": "1.2.2",
Expand Down
49 changes: 24 additions & 25 deletions src/handlers/positions/handlerBatch.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,11 @@ const Enum = require('@mojaloop/central-services-shared').Enum
const Metrics = require('@mojaloop/central-services-metrics')
const Config = require('../../lib/config')
const { randomUUID } = require('crypto')
// const decodePayload = require('@mojaloop/central-services-shared').Util.StreamingProtocol.decodePayload
// const decodeMessages = require('@mojaloop/central-services-shared').Util.StreamingProtocol.decodeMessages
const ErrorHandler = require('@mojaloop/central-services-error-handling')
// const location = { module: 'PositionHandler', method: '', path: '' } // var object used as pointer
const BatchPositionModel = require('../../models/position/batch')
const decodePayload = require('@mojaloop/central-services-shared').Util.StreamingProtocol.decodePayload

const consumerCommit = true
// const fromSwitch = true

/**
* @function positions
Expand Down Expand Up @@ -90,9 +86,6 @@ const positions = async (error, messages) => {
const lastMessageOffset = consumedMessages[consumedMessages.length - 1]?.offset
const binId = `${firstMessageOffset}-${lastMessageOffset}`

// TODO: How to handle spans and audits for batch of messages ??
// Currently we have to create a span and do and audit for each message

// Iterate through consumedMessages
const bins = {}
for (const message of consumedMessages) {
Expand All @@ -101,20 +94,22 @@ const positions = async (error, messages) => {
'Process a prepare transfer message',
['success', 'action']
).startTimer()

// Create a span for each message
const contextFromMessage = EventSdk.Tracer.extractContextFromMessage(message.value)
const span = EventSdk.Tracer.createChildSpanFromContext('cl_transfer_position', contextFromMessage)
span.setTags({
processedAsBatch: true,
binId
})
// 1. Assign message to account-bin by accountID and child action-bin by action
// (References to the messages to be stored in bins, no duplication of messages)

// Assign message to account-bin by accountID and child action-bin by action
// (References to the messages to be stored in bins, no duplication of messages)
const accountID = message.key.toString()
const action = message.value.metadata.event.action

const accountBin = bins[accountID] || (bins[accountID] = {})
const actionBin = accountBin[action] || (accountBin[action] = [])

// Decode the payload and pass it as a separate parameter
const decodedPayload = decodePayload(message.value.content.payload)
actionBin.push({
Expand All @@ -128,39 +123,38 @@ const positions = async (error, messages) => {
await span.audit(message, EventSdk.AuditEventAction.start)
}

// 3. Start DB Transaction
// Start DB Transaction
const trx = await BatchPositionModel.startDbTransaction()

try {
// 4. Call Bin Processor with the list of account-bins and trx
// const decodedMessages = decodeMessages(consumedMessages)
// Call Bin Processor with the list of account-bins and trx
const result = await BinProcessor.processBins(bins, trx)

// 5. If Bin Processor processed bins successfully
// - 5.1. Commit Kafka offset
// If Bin Processor processed bins successfully, commit Kafka offset
// Commit the offset of last message in the array
const lastMessageToCommit = consumedMessages[consumedMessages.length - 1]
const params = { message: lastMessageToCommit, kafkaTopic: lastMessageToCommit.topic, consumer: Consumer }

// We are using Kafka.proceed() to just commit the offset of the last message in the array
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit })

// - 5.2. Commit DB transaction
// Commit DB transaction
await trx.commit()

// - 5.3. Loop through results and produce notification messages and audit messages
// Loop through results and produce notification messages and audit messages
for (const item of result.notifyMessages) {
// 5.3.1. Produce notification message and audit message
// Produce notification message and audit message
const action = item.binItem.message?.value.metadata.event.action
const eventStatus = item?.message.metadata.event.state.status === Enum.Events.EventStatus.SUCCESS.status ? Enum.Events.EventStatus.SUCCESS : Enum.Events.EventStatus.FAILURE
await Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Events.Event.Type.NOTIFICATION, action, item.message, eventStatus, null, item.binItem.span)
}
histTimerEnd({ success: true })
} catch (err) {
// 6. If Bin Processor returns failure
// 6.1. Rollback DB transaction
// If Bin Processor returns failure
// - Rollback DB transaction
await trx.rollback()

// 6.2. Audit Error for each message
// - Audit Error for each message
const fspiopError = ErrorHandler.Factory.reformatFSPIOPError(err)
const state = new EventSdk.EventStateMetadata(EventSdk.EventStatusType.failed, fspiopError.apiErrorCode.code, fspiopError.apiErrorCode.message)
await BinProcessor.iterateThroughBins(bins, async (_accountID, _action, item) => {
Expand Down Expand Up @@ -191,12 +185,17 @@ const positions = async (error, messages) => {
const registerPositionHandler = async () => {
try {
await SettlementModelCached.initialize()
// If there is no mapping, use default transformGeneralTopicName
const topicName =
Config.KAFKA_CONFIG.EVENT_TYPE_ACTION_TOPIC_MAP?.POSITION?.PREPARE ||
Kafka.transformGeneralTopicName(
Config.KAFKA_CONFIG.TOPIC_TEMPLATES.GENERAL_TOPIC_TEMPLATE.TEMPLATE,
Enum.Events.Event.Type.POSITION,
Enum.Events.Event.Action.PREPARE
)
const positionHandler = {
command: positions,
// topicName: Kafka.transformGeneralTopicName(Config.KAFKA_CONFIG.TOPIC_TEMPLATES.GENERAL_TOPIC_TEMPLATE.TEMPLATE, Enum.Events.Event.Type.POSITION, Enum.Events.Event.Action.PREPARE),
// TODO: If there is no mapping, use default transformGeneralTopicName
topicName: Config.KAFKA_CONFIG.EVENT_TYPE_ACTION_TOPIC_MAP?.POSITION?.PREPARE,
// config: Kafka.getKafkaConfig(Config.KAFKA_CONFIG, Enum.Kafka.Config.CONSUMER, Enum.Events.Event.Type.TRANSFER.toUpperCase(), Enum.Events.Event.Action.POSITION.toUpperCase())
topicName,
// There is no corresponding action for POSITION_BATCH, so using straight value
config: Kafka.getKafkaConfig(Config.KAFKA_CONFIG, Enum.Kafka.Config.CONSUMER, Enum.Events.Event.Type.TRANSFER.toUpperCase(), 'POSITION_BATCH')
}
Expand Down

0 comments on commit 2b7be99

Please sign in to comment.