Skip to content

Commit

Permalink
[Feature] Add queue and queue message classes
Browse files Browse the repository at this point in the history
  • Loading branch information
levinmr committed Jan 8, 2025
1 parent 90b7603 commit 9d3351a
Show file tree
Hide file tree
Showing 6 changed files with 310 additions and 90 deletions.
136 changes: 46 additions & 90 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
const { AsyncLocalStorage } = require("node:async_hooks");
const knex = require("knex");
const PgBoss = require("pg-boss");
const util = require("util");
const AppConfig = require("./src/app_config");
const ReportProcessingContext = require("./src/report_processing_context");
const Logger = require("./src/logger");
const Processor = require("./src/processor");
const PgBossKnexAdapter = require("./src/pg_boss_knex_adapter");
const Queue = require("./src/queue/queue");
const ReportJobQueueMessage = require("./src/queue/report_job_queue_message");

/**
* Gets an array of JSON report objects from the application confing, then runs
Expand Down Expand Up @@ -37,12 +37,16 @@ async function run(options = {}) {
const appConfig = new AppConfig(options);
const context = new ReportProcessingContext(new AsyncLocalStorage());
const reportConfigs = appConfig.filteredReportConfigurations;
const knexInstance = appConfig.shouldWriteToDatabase
? await knex(appConfig.knexConfig)
: undefined;
const processor = Processor.buildAnalyticsProcessor(
appConfig,
Logger.initialize({
agencyName: appConfig.agencyLogName,
scriptName: appConfig.scriptName,
}),
knexInstance,
);

for (const reportConfig of reportConfigs) {
Expand Down Expand Up @@ -124,57 +128,34 @@ async function runQueuePublish(options = {}) {
scriptName: appConfig.scriptName,
});
const knexInstance = await knex(appConfig.knexConfig);
const queueClient = await _initQueueClient(knexInstance, appLogger);
const queueClient = await _initQueueClient(
knexInstance,
appConfig.messageQueueName,
appLogger,
);

for (const agency of agencies) {
for (const reportConfig of reportConfigs) {
process.env.AGENCY_NAME = agency.agencyName;
const reportLogger = Logger.initialize({
agencyName: appConfig.agencyLogName,
scriptName: appConfig.scriptName,
reportName: reportConfig.name,
});
try {
let jobId = await queueClient.send(
appConfig.messageQueueName,
_createQueueMessage(
options,
agency,
await queueClient.sendMessage(
new ReportJobQueueMessage({
agencyName: agency.agencyName,
analyticsReportIds: agency.analyticsReportIds,
awsBucketPath: agency.awsBucketPath,
reportOptions: options,
reportConfig,
appConfig.scriptName,
),
{
priority: _messagePriority(reportConfig),
retryLimit: 2,
retryDelay: 10,
retryBackoff: true,
singletonKey: `${appConfig.scriptName}-${agency.agencyName}-${reportConfig.name}`,
},
scriptName: appConfig.scriptName,
}),
);
if (jobId) {
reportLogger.info(
`Created job in queue: ${appConfig.messageQueueName} with job ID: ${jobId}`,
);
} else {
reportLogger.info(
`Found a duplicate job in queue: ${appConfig.messageQueueName}`,
);
}
} catch (e) {
reportLogger.error(
`Error sending to queue: ${appConfig.messageQueueName}`,
);
reportLogger.error(util.inspect(e));
// Do nothing so that the remaining messages still process.
}
}
}

try {
await queueClient.stop();
appLogger.debug(`Stopping queue client`);
} catch (e) {
appLogger.error("Error stopping queue client");
appLogger.error(util.inspect(e));
} finally {
appLogger.debug(`Destroying database connection pool`);
knexInstance.destroy();
Expand All @@ -198,49 +179,29 @@ function _initAgencies(agencies_file) {
return Array.isArray(agencies) ? agencies : legacyAgencies;
}

async function _initQueueClient(knexInstance, logger) {
let queueClient;
try {
queueClient = new PgBoss({ db: new PgBossKnexAdapter(knexInstance) });
await queueClient.start();
logger.debug("Starting queue client");
} catch (e) {
logger.error("Error starting queue client");
logger.error(util.inspect(e));
}

async function _initQueueClient(knexInstance, queueName, logger) {
const queueClient = Queue.buildQueue({
knexInstance,
queueName,
messageClass: ReportJobQueueMessage,
logger,
});
await queueClient.start();
return queueClient;
}

function _createQueueMessage(options, agency, reportConfig, scriptName) {
return {
...agency,
options,
reportConfig,
scriptName,
};
}

function _messagePriority(reportConfig) {
if (!reportConfig.frequency) {
return 0;
} else if (reportConfig.frequency == "daily") {
return 1;
} else if (reportConfig.frequency == "hourly") {
return 2;
} else if (reportConfig.frequency == "realtime") {
return 3;
}
}

/**
* @returns {Promise} when the process ends
*/
async function runQueueConsume() {
const appConfig = new AppConfig();
const appLogger = Logger.initialize();
const knexInstance = await knex(appConfig.knexConfig);
const queueClient = await _initQueueClient(knexInstance, appLogger);
const queueClient = await _initQueueClient(
knexInstance,
appConfig.messageQueueName,
appLogger,
);

try {
const context = new ReportProcessingContext(new AsyncLocalStorage());
Expand All @@ -250,24 +211,19 @@ async function runQueueConsume() {
knexInstance,
);

await queueClient.work(
appConfig.messageQueueName,
{ newJobCheckIntervalSeconds: 1 },
async (message) => {
appLogger.info("Queue message received");
process.env.AGENCY_NAME = message.data.agencyName;
process.env.ANALYTICS_REPORT_IDS = message.data.analyticsReportIds;
process.env.AWS_BUCKET_PATH = message.data.awsBucketPath;
process.env.ANALYTICS_SCRIPT_NAME = message.data.scriptName;

await _processReport(
new AppConfig(message.data.options),
context,
message.data.reportConfig,
processor,
);
},
);
await queueClient.poll(async (message) => {
process.env.AGENCY_NAME = message.agencyName;
process.env.ANALYTICS_REPORT_IDS = message.analyticsReportIds;
process.env.AWS_BUCKET_PATH = message.awsBucketPath;
process.env.ANALYTICS_SCRIPT_NAME = message.scriptName;

await _processReport(
new AppConfig(message.options),
context,
message.reportConfig,
processor,
);
});
} catch (e) {
appLogger.error("Error polling queue for messages");
appLogger.error(util.inspect(e));
Expand Down
File renamed without changes.
130 changes: 130 additions & 0 deletions src/queue/queue.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
const PgBoss = require("pg-boss");
const PgBossKnexAdapter = require("./pg_boss_knex_adapter");
const util = require("util");

/**
* Implements a message queue using the PgBoss library.
*/
class Queue {
#queueClient;
#queueName;
#messageClass;
#logger;

/**
* @param {object} params the parameter object
* @param {import('pg-boss')} params.queueClient the queue client instance to
* use for queue operations.
* @param {string} params.queueName the identifier for the queue.
* @param {*} params.messageClass a class which implements the fromMessage
* static method to return an instance of the class from a PgBoss message
* object. This can be omitted if the queue instance only sends messages.
* @param {import('winston').Logger} params.logger an application logger instance.
*/
constructor({ queueClient, queueName, messageClass, logger }) {
this.#queueClient = queueClient;
this.#queueName = queueName;
this.#messageClass = messageClass;
this.#logger = logger;
}

/**
* @returns {string} the queue name
*/
get name() {
return this.#queueName;
}

/**
* @returns {Promise} resolves when the PgBoss queue client has started
*/
async start() {
try {
await this.#queueClient.start();
this.#logger.debug("Starting queue client");
} catch (e) {
this.#logger.error("Error starting queue client");
this.#logger.error(util.inspect(e));
throw e;
}
}

/**
* @returns {Promise} resolves when the PgBoss queue client has stopped
*/
async stop() {
try {
await this.#queueClient.stop();
this.#logger.debug(`Stopping queue client`);
} catch (e) {
this.#logger.error("Error stopping queue client");
this.#logger.error(util.inspect(e));
throw e;
}
}

/**
* @param {import('./queue_message')} queueMessage a QueueMessage instance
* @returns {string} a message ID or null if a duplicate message exists on the
* queue.
*/
async sendMessage(queueMessage) {
try {
const messageId = await this.#queueClient.send(
this.#queueName,
queueMessage.toJSON(),
queueMessage.sendOptions(),
);
if (messageId) {
this.#logger.info(
`Created message in queue: ${this.#queueName} with message ID: ${messageId}`,
);
} else {
this.#logger.info(
`Found a duplicate message in queue: ${this.#queueName}`,
);
}
return messageId;
} catch (e) {
this.#logger.error(`Error sending to queue: ${this.#queueName}`);
this.#logger.error(util.inspect(e));
throw e;
}
}

/**
* @param {Function} callback the function to call for each message
* @param {object} options the options to pass to the PgBoss work function
* @returns {Promise} resolves when the queue poller process stops
*/
poll(callback, options = { newJobCheckIntervalSeconds: 1 }) {
return this.#queueClient.work(this.#queueName, options, async (message) => {
this.#logger.info("Queue message received");
await callback(this.#messageClass.fromMessage(message).toJSON());
});
}

/**
* @param {object} params the parameter object
* @param {import('knex')} params.knexInstance an initialized instance of the knex
* library which provides a database connection.
* @param {string} params.queueName the name of the queue to use for the
* client.
* @param {*} params.messageClass a class which implements the fromMessage
* static method to return an instance of the class from a PgBoss message
* object. This can be omitted if the queue instance only sends messages.
* @param {import('winston').Logger} params.logger an application logger instance.
* @returns {Queue} the queue instance configured with the PgBoss queue
* client.
*/
static buildQueue({ knexInstance, queueName, messageClass, logger }) {
return new Queue({
queueClient: new PgBoss({ db: new PgBossKnexAdapter(knexInstance) }),
queueName,
messageClass,
logger,
});
}
}

module.exports = Queue;
28 changes: 28 additions & 0 deletions src/queue/queue_message.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/**
* Abstract class for a queue message to be sent to a PgBoss queue client.
*/
class QueueMessage {
/**
* @returns {object} the class converted to a JSON object.
*/
toJSON() {
return {};
}

/**
* @returns {object} an options object for the PgBoss send method
*/
sendOptions() {
return {};
}

/**
* @param {object} message a PgBoss message object from the report job queue.
* @returns {QueueMessage} the built queue message instance.
*/
static fromMessage(message) {
return new QueueMessage(message.data);
}
}

module.exports = QueueMessage;
Loading

0 comments on commit 9d3351a

Please sign in to comment.