Skip to content

Commit

Permalink
feat: add jobsRetry mutation, where id argument is array (#75)
Browse files Browse the repository at this point in the history
  • Loading branch information
intpro authored Apr 26, 2023
1 parent 3ca1567 commit de3224b
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 5 deletions.
14 changes: 10 additions & 4 deletions example/src/demo_queues/fetchMetrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,22 @@ export const metricsQueue = new Queue(queueSettings.name, {
});

metricsQueue.add(
'fetch_metrics_every_5m',
{ field1: 'asdasdadas' },
{ repeat: { cron: '*/1 * * * *' } }
'fetch_metrics_every_2m',
{ field1: 'some data' },
{ repeat: { cron: '*/2 * * * *' } }
);

metricsQueue.add('fetch_metrics_every_5000', { field1: 'asdasdadas' }, { repeat: { every: 5000 } });
metricsQueue.add(
'fetch_metrics_every_30000',
{ field1: 'some data' },
{ repeat: { every: 30000 } }
);

const metricsWorker = new Worker(
queueSettings.name,
async (job) => {
if (Math.random() > 0.8) throw new Error('Bull worker random error');

for (let i = 0; i < 5; i++) {
job.updateProgress(i * 20);
await new Promise((resolve) => setTimeout(resolve, 1000));
Expand Down
2 changes: 2 additions & 0 deletions src/composeBull.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
createJobRremoveFC,
createJobRremoveBulkFC,
createJobRetryFC,
createJobsRetryFC,
createJobUpdateFC,
createJobLogAddFC,
createJobMoveToDelayedFC,
Expand Down Expand Up @@ -91,6 +92,7 @@ export function composeBull(
jobRemove: wrapMutation(createJobRremoveFC),
jobRemoveBulk: wrapMutation(createJobRremoveBulkFC),
jobRetry: wrapMutation(createJobRetryFC),
jobsRetry: wrapMutation(createJobsRetryFC),
jobUpdate: wrapMutation(createJobUpdateFC),
jobLogAdd: wrapMutation(createJobLogAddFC),
jobMoveToDelayed: wrapMutation(createJobMoveToDelayedFC),
Expand Down
2 changes: 1 addition & 1 deletion src/helpers/MutationError.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
export class MutationError extends Error {
constructor(message: string, public code: ErrorCodeEnum) {
constructor(message: string, public code: ErrorCodeEnum, public id?: string) {
super(message);
Object.setPrototypeOf(this, MutationError.prototype);
}
Expand Down
3 changes: 3 additions & 0 deletions src/helpers/getAsArray.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export function getAsArray<T>(value: T | T[]): T[] {
return Array.isArray(value) ? value : [value];
}
1 change: 1 addition & 0 deletions src/mutation/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export { createjobPromoteFC } from './jobPromote';
export { createJobRremoveFC } from './jobRemove';
export { createJobRremoveBulkFC } from './jobRemoveBulk';
export { createJobRetryFC } from './jobRetry';
export { createJobsRetryFC } from './jobsRetry';
export { createJobUpdateFC } from './jobUpdate';
export { createJobLogAddFC } from './jobLogAdd';
export { createJobMoveToDelayedFC } from './jobMoveToDelayed';
Expand Down
63 changes: 63 additions & 0 deletions src/mutation/jobsRetry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { SchemaComposer, ObjectTypeComposerFieldConfigAsObjectDefinition } from 'graphql-compose';
import { MutationError, ErrorCodeEnum } from '../helpers';
import { JobStatusEnum, getJobStatusEnumTC } from '../types';
import { findQueue } from '../helpers';
import { Options } from '../definitions';
import { getAsArray } from '../helpers/getAsArray';

export function createJobsRetryFC(
sc: SchemaComposer<any>,
opts: Options
): ObjectTypeComposerFieldConfigAsObjectDefinition<any, any> {
const { typePrefix } = opts;

return {
type: sc.createObjectTC({
name: `${typePrefix}JobsRetryPayload`,
fields: {
ids: '[String]',
state: getJobStatusEnumTC(sc, opts),
},
}),
args: {
prefix: {
type: 'String!',
defaultValue: 'bull',
},
queueName: 'String!',
ids: '[String!]!',
},
resolve: async (_, { prefix, queueName, ids }) => {
const queue = await findQueue(prefix, queueName, opts);
const _ids = getAsArray(ids);

if (_ids.length > 100) {
throw new MutationError(
'Arg. <id> constraint: send less than 100 IDs.',
ErrorCodeEnum.OTHER_ERROR
);
}

const promises: Promise<void>[] = [];

for (const _id of _ids) {
promises.push(
queue.getJob(_id).then((job) => {
if (!job)
throw new MutationError(`Job ${_id} not found!`, ErrorCodeEnum.JOB_NOT_FOUND, _id);
return job.retry();
})
);
}

// Let there be a delay (await),
// this will make the execution more obvious to client.
await Promise.all(promises);

return {
ids,
state: JobStatusEnum.WAITING,
};
},
};
}

0 comments on commit de3224b

Please sign in to comment.