Skip to content

Commit

Permalink
feat: pep up queue, helps to alive broken cron tasks via ...:delay
Browse files Browse the repository at this point in the history
…steam

* fix: job name not necessarily

* feat: add pepUp queue mutation
  • Loading branch information
intpro authored May 21, 2020
1 parent 30a90f4 commit 1013f4f
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/composeBull.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
createJobUpdateFC,
createJobLogAddFC,
createJobMoveToDelayedFC,
createQueuePepUpFC,
} from './mutation';
import { wrapMutationFC, wrapQueueArgs, composeFC } from './helpers';

Expand Down Expand Up @@ -58,6 +59,7 @@ export function composeBull(opts: Options & { schemaComposer?: SchemaComposer<an
jobUpdate: wrapMutation(createJobUpdateFC),
jobLogAdd: wrapMutation(createJobLogAddFC),
jobMoveToDelayed: wrapMutation(createJobMoveToDelayedFC),
queuePepUp: wrapMutation(createQueuePepUpFC),
},
};
}
64 changes: 64 additions & 0 deletions src/helpers/fixDelayStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { MutationError, ErrorCodeEnum } from './MutationError';
import { getBullConnection } from './getBullConnection';
import { Options } from '../definitions';

type Record = {
id: string;
nextTimestamp: string;
};

type Result = {
first: Record;
last: Record;
fixLast: Record;
};

export async function addFixRecordToDelayStream(
prefix: string,
queueName: string,
opts: Options,
checkExistence: boolean = true
): Promise<Result> {
const redis = getBullConnection(opts);

const fullQueueName = [prefix, queueName].join(':');

if (checkExistence) {
const queueExists = await redis.exists([fullQueueName, 'meta'].join(':'));

if (!queueExists) {
throw new MutationError('Queue not found!', ErrorCodeEnum.QUEUE_NOT_FOUND);
}
}

const streamName = fullQueueName + ':delay';

const first = await redis.xrange(streamName, '-', '+', 'count', 1);
const last = await redis.xrevrange(streamName, '+', '-', 'count', 1);
const defaultRec = {
id: '0-0',
nextTimestamp: '',
};
const fixLastNextTimestamp = `${Date.now()}`;
const fixLastId = await redis.xadd(streamName, '*', 'nextTimestamp', fixLastNextTimestamp);

function readRecords(records: [string, string[]][]): Record {
if (records.length > 0) {
return {
id: records[0][0],
nextTimestamp: records[0][1][1],
};
} else {
return defaultRec;
}
}

return {
first: readRecords(first),
last: readRecords(last),
fixLast: {
id: fixLastId,
nextTimestamp: fixLastNextTimestamp,
},
};
}
1 change: 1 addition & 0 deletions src/helpers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ export * from './wrapMutationFC';
export * from './wrapQueueArgs';
export * from './composeFC';
export * from './deleteKeys';
export * from './fixDelayStream';
1 change: 1 addition & 0 deletions src/mutation/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ export { createJobRetryFC } from './jobRetry';
export { createJobUpdateFC } from './jobUpdate';
export { createJobLogAddFC } from './jobLogAdd';
export { createJobMoveToDelayedFC } from './jobMoveToDelayed';
export { createQueuePepUpFC } from './queuePepUp';
32 changes: 32 additions & 0 deletions src/mutation/queuePepUp.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { SchemaComposer, ObjectTypeComposerFieldConfigAsObjectDefinition } from 'graphql-compose';
import { addFixRecordToDelayStream } from '../helpers';
import { Options } from '../definitions';

export function createQueuePepUpFC(
sc: SchemaComposer<any>,
opts: Options
): ObjectTypeComposerFieldConfigAsObjectDefinition<any, any> {
const { typePrefix } = opts;

return {
type: sc.createObjectTC({
name: `${typePrefix}QueuePepUpPayload`,
fields: { records: 'JSON' },
}),
args: {
prefix: {
type: 'String!',
defaultValue: 'bull',
},
queueName: 'String!',
checkExistence: {
type: 'Boolean',
defaultValue: true,
},
},
resolve: async (_, { prefix, queueName, checkExistence }) => {
const records = await addFixRecordToDelayStream(prefix, queueName, opts, checkExistence);
return { records };
},
};
}

0 comments on commit 1013f4f

Please sign in to comment.