Skip to content

Commit

Permalink
feat: bull full coverage (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
KagChi authored Jul 4, 2022
1 parent bdb0a30 commit c368231
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 7 deletions.
16 changes: 14 additions & 2 deletions src/Structures/TaskManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import { ListenerStore } from "../Stores/ListenerStore.js";
import { Util } from "../Utilities/Util.js";
import { createAmqp, RoutingPublisher, RpcSubscriber } from "@nezuchan/cordis-brokers";
import { handleJob } from "../Utilities/handleJob.js";
import { Result } from "@sapphire/result";
import { cast } from "@sapphire/utilities";

export class TaskManager extends EventEmitter {
public stores = new StoreRegistry();
Expand Down Expand Up @@ -68,14 +70,24 @@ export class TaskManager extends EventEmitter {
name: `${process.env.AMQP_QUEUE_NAME ?? "scheduled-tasks"}.send`,
cb: async message => {
const isJobReady = await this.bull.isReady();
return handleJob(message, isJobReady, this.clusterId, this);
const result = await Result.fromAsync(() => handleJob(message, isJobReady, this.clusterId, this));
if (result.isErr()) {
this.logger.error(result.unwrapErr());
return JSON.stringify({ error: cast<string>(result.unwrapErr()).toString() });
}
return result.unwrap();
}
});
await this.amqpReceiverCluster.init({
name: `${process.env.AMQP_QUEUE_NAME ?? "scheduled-tasks"}.send-cluster-${this.clusterId}`,
cb: async message => {
const isJobReady = await this.bull.isReady();
return handleJob(message, isJobReady, this.clusterId, this);
const result = await Result.fromAsync(() => handleJob(message, isJobReady, this.clusterId, this));
if (result.isErr()) {
this.logger.error(result.unwrapErr());
return JSON.stringify({ error: cast<string>(result.unwrapErr()).toString() });
}
return result.unwrap();
}
});
await this.amqpSender.init({ name: `${process.env.AMQP_QUEUE_NAME ?? "scheduled-tasks"}.recv`, durable: true, exchangeType: "topic", useExchangeBinding: true });
Expand Down
171 changes: 166 additions & 5 deletions src/Utilities/handleJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { TaskManager } from "../Structures/TaskManager.js";
export async function handleJob(message: Record<string, any>, bull: Bull.Queue, clusterId: number, manager: TaskManager): Promise<string> {
switch (message.type) {
case "add": {
if (!message.payload) {
if (!message.data) {
return JSON.stringify({ message: "Please provide payload to sent when job is done !" });
}
if (!message.options) {
Expand All @@ -16,10 +16,9 @@ export async function handleJob(message: Record<string, any>, bull: Bull.Queue,
return JSON.stringify({ message: "Please provide name of the job to be created !" });
}
manager.logger.info(`Adding job ${message.name} to queue ${bull.name} in cluster ${manager.clusterId}`);
const job = await bull.add(cast<string>(message.name), message.payload, cast<Bull.JobOptions>(message.options));
const job = await bull.add(cast<string>(message.name), message.data, cast<Bull.JobOptions>(message.options));
return JSON.stringify({ message: "Job added successfully !", jobId: job.id.toString(), fromCluster: clusterId });
}

case "remove": {
if (!message.jobId) {
return JSON.stringify({ message: "Please provide jobId to be removed !" });
Expand All @@ -37,9 +36,171 @@ export async function handleJob(message: Record<string, any>, bull: Bull.Queue,
manager.logger.info(`Job with id ${message.jobId} removed successfully !`);
return JSON.stringify({ message: "Job success deleted !", fromCluster: clusterId });
}
case "pause": {
manager.logger.info(`Received request to pause queue ${bull.name} in cluster ${clusterId}`);
await bull.pause(cast<boolean | undefined>(message.isLocal), cast<boolean | undefined>(message.doNotWaitActive));
return JSON.stringify({ message: "Bull job success paused !", fromCluster: clusterId });
}
case "resume": {
manager.logger.info(`Received request to resume queue ${bull.name} in cluster ${clusterId}`);
await bull.resume(cast<boolean | undefined>(message.isLocal));
return JSON.stringify({ message: "Bull job success resumed !", fromCluster: clusterId });
}
case "getJob": {
if (!message.jobId) {
return JSON.stringify({ message: "Please provide jobId to lookup !" });
}

manager.logger.info(`Received request to get job ${message.jobId} in cluster ${clusterId}`);

const job = await bull.getJob(cast<string>(message.jobId));
if (!job) {
manager.logger.info(`Job with id ${message.jobId} not found in cluster ${clusterId}`);
return JSON.stringify({ message: "Job not found !" });
}

return JSON.stringify({ message: "Job found !", job: job.toJSON(), fromCluster: clusterId });
}
case "getJobs": {
manager.logger.info(`Received request to get jobs in cluster ${clusterId}`);

if (!message.types) {
return JSON.stringify({ message: "Please provide types to lookup !" });
}

const jobs = await bull.getJobs(cast<Bull.JobStatus[]>(message.types), cast<number>(message.start), cast<number>(message.end), cast<boolean>(message.asc));
return JSON.stringify({ message: "Jobs found !", jobs: jobs.map(j => j.toJSON()), fromCluster: clusterId });
}
case "getWaiting": {
manager.logger.info(`Received request to get waiting jobs in cluster ${clusterId}`);

const jobs = await bull.getWaiting();
return JSON.stringify({ message: "Jobs found !", jobs: jobs.map(j => j.toJSON()), fromCluster: clusterId });
}
case "getActive": {
manager.logger.info(`Received request to get active jobs in cluster ${clusterId}`);

const jobs = await bull.getActive();
return JSON.stringify({ message: "Jobs found !", jobs: jobs.map(j => j.toJSON()), fromCluster: clusterId });
}
case "getCompleted": {
manager.logger.info(`Received request to get completed jobs in cluster ${clusterId}`);

const jobs = await bull.getCompleted();
return JSON.stringify({ message: "Jobs found !", jobs: jobs.map(j => j.toJSON()), fromCluster: clusterId });
}
case "getFailed": {
manager.logger.info(`Received request to get failed jobs in cluster ${clusterId}`);

const jobs = await bull.getFailed();
return JSON.stringify({ message: "Jobs found !", jobs: jobs.map(j => j.toJSON()), fromCluster: clusterId });
}
case "getDelayed": {
manager.logger.info(`Received request to get delayed jobs in cluster ${clusterId}`);

const jobs = await bull.getDelayed();
return JSON.stringify({ message: "Jobs found !", jobs: jobs.map(j => j.toJSON()), fromCluster: clusterId });
}
case "getActiveCount": {
manager.logger.info(`Received request to get active count jobs in cluster ${clusterId}`);

const active = await bull.getActiveCount();
return JSON.stringify({ message: "Jobs found !", active, fromCluster: clusterId });
}
case "getWaitingCount": {
manager.logger.info(`Received request to get waiting jobs in cluster ${clusterId}`);

const waiting = await bull.getWaitingCount();
return JSON.stringify({ message: "Jobs found !", waiting, fromCluster: clusterId });
}
case "getCompletedCount": {
manager.logger.info(`Received request to get completed jobs in cluster ${clusterId}`);

const completed = await bull.getCompletedCount();
return JSON.stringify({ message: "Jobs found !", completed, fromCluster: clusterId });
}
case "getFailedCount": {
manager.logger.info(`Received request to get failed jobs in cluster ${clusterId}`);

const failed = await bull.getFailedCount();
return JSON.stringify({ message: "Jobs found !", failed, fromCluster: clusterId });
}
case "getDelayedCount": {
manager.logger.info(`Received request to get delayed jobs in cluster ${clusterId}`);

const delayed = await bull.getDelayedCount();
return JSON.stringify({ message: "Jobs found !", delayed, fromCluster: clusterId });
}
case "addBulk": {
manager.logger.info(`Received request to add bulk jobs in cluster ${clusterId}`);
if (!Array.isArray(message.jobs)) {
return JSON.stringify({ message: "Please provide jobs to add !" });
}
const jobs = await bull.addBulk(cast<BulkJobType[]>(message.jobs));
return JSON.stringify({ message: "Jobs added !", jobs: jobs.map(j => j.toJSON()), fromCluster: clusterId });
}
case "removeJobs": {
manager.logger.info(`Received request to remove jobs in cluster ${clusterId}`);

if (!message.pattern) {
return JSON.stringify({ message: "Please provide jobIds to remove !" });
}

await bull.removeJobs(cast<string>(message.pattern));
return JSON.stringify({ message: "Jobs removed !", fromCluster: clusterId });
}
case "getNextJob": {
manager.logger.info(`Received request to get next job in cluster ${clusterId}`);

const job = await bull.getNextJob();

if (!job) {
manager.logger.info(`No job found in cluster ${clusterId}`);
return JSON.stringify({ message: "No job found !", fromCluster: clusterId });
}

return JSON.stringify({ message: "Job found !", job: job.toJSON(), fromCluster: clusterId });
}
case "getJobCountByTypes": {
manager.logger.info(`Received request to get job count by types in cluster ${clusterId}`);

if (!message.types) {
return JSON.stringify({ message: "Please provide types to lookup !" });
}

const jobCountByTypes = await bull.getJobCountByTypes(cast<Bull.JobStatus | Bull.JobStatus[]>(message.types));
return JSON.stringify({ message: "Job count by types found !", jobCountByTypes, fromCluster: clusterId });
}
case "nextRepeatableJob": {
manager.logger.info(`Received request to get next repeatable job in cluster ${clusterId}`);

if (!message.data) {
return JSON.stringify({ message: "Please provide data to loop up !" });
}
if (!message.options) {
return JSON.stringify({ message: "Please provide options to be passed to bull job !" });
}
if (!message.name) {
return JSON.stringify({ message: "Please provide name of the job to look up !" });
}
const job = await bull.nextRepeatableJob(cast<string>(message.name), message.data, cast<Bull.JobOptions>(message.options));
return JSON.stringify({ message: "Job found !", job: job.toJSON(), fromCluster: clusterId });
}
case "getRepeatableJobs": {
manager.logger.info(`Received request to get repeatable jobs in cluster ${clusterId}`);

const jobs = await bull.getRepeatableJobs(cast<number>(message.start), cast<number>(message.end), cast<boolean>(message.asc));
return JSON.stringify({ message: "Jobs found !", jobs, fromCluster: clusterId });
}
default: {
manager.logger.info(message, `Unhandled job type ${message.type}`);
return JSON.stringify({ message: "Unhandled job, please open issue if you need this to be handled !" });
manager.logger.warn(message, `Unhandled job type ${message.type}`);
return JSON.stringify({ message: "Unhandled job, please open issue if you need this to be handled or this is missing from the implementation !" });
}
}
}

interface BulkJobType {
name?: string | undefined;
data: any;
opts?: Omit<Bull.JobOptions, "repeat"> | undefined;
}

0 comments on commit c368231

Please sign in to comment.