diff --git a/src/Structures/TaskManager.ts b/src/Structures/TaskManager.ts index 3b64808..3bc65c4 100644 --- a/src/Structures/TaskManager.ts +++ b/src/Structures/TaskManager.ts @@ -10,6 +10,8 @@ import { ListenerStore } from "../Stores/ListenerStore.js"; import { Util } from "../Utilities/Util.js"; import { createAmqp, RoutingPublisher, RpcSubscriber } from "@nezuchan/cordis-brokers"; import { handleJob } from "../Utilities/handleJob.js"; +import { Result } from "@sapphire/result"; +import { cast } from "@sapphire/utilities"; export class TaskManager extends EventEmitter { public stores = new StoreRegistry(); @@ -68,14 +70,24 @@ export class TaskManager extends EventEmitter { name: `${process.env.AMQP_QUEUE_NAME ?? "scheduled-tasks"}.send`, cb: async message => { const isJobReady = await this.bull.isReady(); - return handleJob(message, isJobReady, this.clusterId, this); + const result = await Result.fromAsync(() => handleJob(message, isJobReady, this.clusterId, this)); + if (result.isErr()) { + this.logger.error(result.unwrapErr()); + return JSON.stringify({ error: cast(result.unwrapErr()).toString() }); + } + return result.unwrap(); } }); await this.amqpReceiverCluster.init({ name: `${process.env.AMQP_QUEUE_NAME ?? "scheduled-tasks"}.send-cluster-${this.clusterId}`, cb: async message => { const isJobReady = await this.bull.isReady(); - return handleJob(message, isJobReady, this.clusterId, this); + const result = await Result.fromAsync(() => handleJob(message, isJobReady, this.clusterId, this)); + if (result.isErr()) { + this.logger.error(result.unwrapErr()); + return JSON.stringify({ error: cast(result.unwrapErr()).toString() }); + } + return result.unwrap(); } }); await this.amqpSender.init({ name: `${process.env.AMQP_QUEUE_NAME ?? "scheduled-tasks"}.recv`, durable: true, exchangeType: "topic", useExchangeBinding: true }); diff --git a/src/Utilities/handleJob.ts b/src/Utilities/handleJob.ts index 29a1547..2ec724c 100644 --- a/src/Utilities/handleJob.ts +++ b/src/Utilities/handleJob.ts @@ -6,7 +6,7 @@ import { TaskManager } from "../Structures/TaskManager.js"; export async function handleJob(message: Record, bull: Bull.Queue, clusterId: number, manager: TaskManager): Promise { switch (message.type) { case "add": { - if (!message.payload) { + if (!message.data) { return JSON.stringify({ message: "Please provide payload to sent when job is done !" }); } if (!message.options) { @@ -16,10 +16,9 @@ export async function handleJob(message: Record, bull: Bull.Queue, return JSON.stringify({ message: "Please provide name of the job to be created !" }); } manager.logger.info(`Adding job ${message.name} to queue ${bull.name} in cluster ${manager.clusterId}`); - const job = await bull.add(cast(message.name), message.payload, cast(message.options)); + const job = await bull.add(cast(message.name), message.data, cast(message.options)); return JSON.stringify({ message: "Job added successfully !", jobId: job.id.toString(), fromCluster: clusterId }); } - case "remove": { if (!message.jobId) { return JSON.stringify({ message: "Please provide jobId to be removed !" }); @@ -37,9 +36,171 @@ export async function handleJob(message: Record, bull: Bull.Queue, manager.logger.info(`Job with id ${message.jobId} removed successfully !`); return JSON.stringify({ message: "Job success deleted !", fromCluster: clusterId }); } + case "pause": { + manager.logger.info(`Received request to pause queue ${bull.name} in cluster ${clusterId}`); + await bull.pause(cast(message.isLocal), cast(message.doNotWaitActive)); + return JSON.stringify({ message: "Bull job success paused !", fromCluster: clusterId }); + } + case "resume": { + manager.logger.info(`Received request to resume queue ${bull.name} in cluster ${clusterId}`); + await bull.resume(cast(message.isLocal)); + return JSON.stringify({ message: "Bull job success resumed !", fromCluster: clusterId }); + } + case "getJob": { + if (!message.jobId) { + return JSON.stringify({ message: "Please provide jobId to lookup !" }); + } + + manager.logger.info(`Received request to get job ${message.jobId} in cluster ${clusterId}`); + + const job = await bull.getJob(cast(message.jobId)); + if (!job) { + manager.logger.info(`Job with id ${message.jobId} not found in cluster ${clusterId}`); + return JSON.stringify({ message: "Job not found !" }); + } + + return JSON.stringify({ message: "Job found !", job: job.toJSON(), fromCluster: clusterId }); + } + case "getJobs": { + manager.logger.info(`Received request to get jobs in cluster ${clusterId}`); + + if (!message.types) { + return JSON.stringify({ message: "Please provide types to lookup !" }); + } + + const jobs = await bull.getJobs(cast(message.types), cast(message.start), cast(message.end), cast(message.asc)); + return JSON.stringify({ message: "Jobs found !", jobs: jobs.map(j => j.toJSON()), fromCluster: clusterId }); + } + case "getWaiting": { + manager.logger.info(`Received request to get waiting jobs in cluster ${clusterId}`); + + const jobs = await bull.getWaiting(); + return JSON.stringify({ message: "Jobs found !", jobs: jobs.map(j => j.toJSON()), fromCluster: clusterId }); + } + case "getActive": { + manager.logger.info(`Received request to get active jobs in cluster ${clusterId}`); + + const jobs = await bull.getActive(); + return JSON.stringify({ message: "Jobs found !", jobs: jobs.map(j => j.toJSON()), fromCluster: clusterId }); + } + case "getCompleted": { + manager.logger.info(`Received request to get completed jobs in cluster ${clusterId}`); + + const jobs = await bull.getCompleted(); + return JSON.stringify({ message: "Jobs found !", jobs: jobs.map(j => j.toJSON()), fromCluster: clusterId }); + } + case "getFailed": { + manager.logger.info(`Received request to get failed jobs in cluster ${clusterId}`); + + const jobs = await bull.getFailed(); + return JSON.stringify({ message: "Jobs found !", jobs: jobs.map(j => j.toJSON()), fromCluster: clusterId }); + } + case "getDelayed": { + manager.logger.info(`Received request to get delayed jobs in cluster ${clusterId}`); + + const jobs = await bull.getDelayed(); + return JSON.stringify({ message: "Jobs found !", jobs: jobs.map(j => j.toJSON()), fromCluster: clusterId }); + } + case "getActiveCount": { + manager.logger.info(`Received request to get active count jobs in cluster ${clusterId}`); + + const active = await bull.getActiveCount(); + return JSON.stringify({ message: "Jobs found !", active, fromCluster: clusterId }); + } + case "getWaitingCount": { + manager.logger.info(`Received request to get waiting jobs in cluster ${clusterId}`); + + const waiting = await bull.getWaitingCount(); + return JSON.stringify({ message: "Jobs found !", waiting, fromCluster: clusterId }); + } + case "getCompletedCount": { + manager.logger.info(`Received request to get completed jobs in cluster ${clusterId}`); + + const completed = await bull.getCompletedCount(); + return JSON.stringify({ message: "Jobs found !", completed, fromCluster: clusterId }); + } + case "getFailedCount": { + manager.logger.info(`Received request to get failed jobs in cluster ${clusterId}`); + + const failed = await bull.getFailedCount(); + return JSON.stringify({ message: "Jobs found !", failed, fromCluster: clusterId }); + } + case "getDelayedCount": { + manager.logger.info(`Received request to get delayed jobs in cluster ${clusterId}`); + + const delayed = await bull.getDelayedCount(); + return JSON.stringify({ message: "Jobs found !", delayed, fromCluster: clusterId }); + } + case "addBulk": { + manager.logger.info(`Received request to add bulk jobs in cluster ${clusterId}`); + if (!Array.isArray(message.jobs)) { + return JSON.stringify({ message: "Please provide jobs to add !" }); + } + const jobs = await bull.addBulk(cast(message.jobs)); + return JSON.stringify({ message: "Jobs added !", jobs: jobs.map(j => j.toJSON()), fromCluster: clusterId }); + } + case "removeJobs": { + manager.logger.info(`Received request to remove jobs in cluster ${clusterId}`); + + if (!message.pattern) { + return JSON.stringify({ message: "Please provide jobIds to remove !" }); + } + + await bull.removeJobs(cast(message.pattern)); + return JSON.stringify({ message: "Jobs removed !", fromCluster: clusterId }); + } + case "getNextJob": { + manager.logger.info(`Received request to get next job in cluster ${clusterId}`); + + const job = await bull.getNextJob(); + + if (!job) { + manager.logger.info(`No job found in cluster ${clusterId}`); + return JSON.stringify({ message: "No job found !", fromCluster: clusterId }); + } + + return JSON.stringify({ message: "Job found !", job: job.toJSON(), fromCluster: clusterId }); + } + case "getJobCountByTypes": { + manager.logger.info(`Received request to get job count by types in cluster ${clusterId}`); + + if (!message.types) { + return JSON.stringify({ message: "Please provide types to lookup !" }); + } + + const jobCountByTypes = await bull.getJobCountByTypes(cast(message.types)); + return JSON.stringify({ message: "Job count by types found !", jobCountByTypes, fromCluster: clusterId }); + } + case "nextRepeatableJob": { + manager.logger.info(`Received request to get next repeatable job in cluster ${clusterId}`); + + if (!message.data) { + return JSON.stringify({ message: "Please provide data to loop up !" }); + } + if (!message.options) { + return JSON.stringify({ message: "Please provide options to be passed to bull job !" }); + } + if (!message.name) { + return JSON.stringify({ message: "Please provide name of the job to look up !" }); + } + const job = await bull.nextRepeatableJob(cast(message.name), message.data, cast(message.options)); + return JSON.stringify({ message: "Job found !", job: job.toJSON(), fromCluster: clusterId }); + } + case "getRepeatableJobs": { + manager.logger.info(`Received request to get repeatable jobs in cluster ${clusterId}`); + + const jobs = await bull.getRepeatableJobs(cast(message.start), cast(message.end), cast(message.asc)); + return JSON.stringify({ message: "Jobs found !", jobs, fromCluster: clusterId }); + } default: { - manager.logger.info(message, `Unhandled job type ${message.type}`); - return JSON.stringify({ message: "Unhandled job, please open issue if you need this to be handled !" }); + manager.logger.warn(message, `Unhandled job type ${message.type}`); + return JSON.stringify({ message: "Unhandled job, please open issue if you need this to be handled or this is missing from the implementation !" }); } } } + +interface BulkJobType { + name?: string | undefined; + data: any; + opts?: Omit | undefined; +}