From 1013f4fcc463b3b6011153259a63943ea59a906d Mon Sep 17 00:00:00 2001 From: intpro Date: Thu, 21 May 2020 13:38:32 +0600 Subject: [PATCH] feat: pep up queue, helps to alive broken cron tasks via `...:delay` steam * fix: job name not necessarily * feat: add pepUp queue mutation --- src/composeBull.ts | 2 ++ src/helpers/fixDelayStream.ts | 64 +++++++++++++++++++++++++++++++++++ src/helpers/index.ts | 1 + src/mutation/index.ts | 1 + src/mutation/queuePepUp.ts | 32 ++++++++++++++++++ 5 files changed, 100 insertions(+) create mode 100644 src/helpers/fixDelayStream.ts create mode 100644 src/mutation/queuePepUp.ts diff --git a/src/composeBull.ts b/src/composeBull.ts index 125e4fb..165beb1 100644 --- a/src/composeBull.ts +++ b/src/composeBull.ts @@ -21,6 +21,7 @@ import { createJobUpdateFC, createJobLogAddFC, createJobMoveToDelayedFC, + createQueuePepUpFC, } from './mutation'; import { wrapMutationFC, wrapQueueArgs, composeFC } from './helpers'; @@ -58,6 +59,7 @@ export function composeBull(opts: Options & { schemaComposer?: SchemaComposer { + const redis = getBullConnection(opts); + + const fullQueueName = [prefix, queueName].join(':'); + + if (checkExistence) { + const queueExists = await redis.exists([fullQueueName, 'meta'].join(':')); + + if (!queueExists) { + throw new MutationError('Queue not found!', ErrorCodeEnum.QUEUE_NOT_FOUND); + } + } + + const streamName = fullQueueName + ':delay'; + + const first = await redis.xrange(streamName, '-', '+', 'count', 1); + const last = await redis.xrevrange(streamName, '+', '-', 'count', 1); + const defaultRec = { + id: '0-0', + nextTimestamp: '', + }; + const fixLastNextTimestamp = `${Date.now()}`; + const fixLastId = await redis.xadd(streamName, '*', 'nextTimestamp', fixLastNextTimestamp); + + function readRecords(records: [string, string[]][]): Record { + if (records.length > 0) { + return { + id: records[0][0], + nextTimestamp: records[0][1][1], + }; + } else { + return defaultRec; + } + } + + return { + first: readRecords(first), + last: readRecords(last), + fixLast: { + id: fixLastId, + nextTimestamp: fixLastNextTimestamp, + }, + }; +} diff --git a/src/helpers/index.ts b/src/helpers/index.ts index 9e3af47..2a89cf5 100644 --- a/src/helpers/index.ts +++ b/src/helpers/index.ts @@ -6,3 +6,4 @@ export * from './wrapMutationFC'; export * from './wrapQueueArgs'; export * from './composeFC'; export * from './deleteKeys'; +export * from './fixDelayStream'; diff --git a/src/mutation/index.ts b/src/mutation/index.ts index a2840ae..68b9636 100644 --- a/src/mutation/index.ts +++ b/src/mutation/index.ts @@ -16,3 +16,4 @@ export { createJobRetryFC } from './jobRetry'; export { createJobUpdateFC } from './jobUpdate'; export { createJobLogAddFC } from './jobLogAdd'; export { createJobMoveToDelayedFC } from './jobMoveToDelayed'; +export { createQueuePepUpFC } from './queuePepUp'; diff --git a/src/mutation/queuePepUp.ts b/src/mutation/queuePepUp.ts new file mode 100644 index 0000000..fe2db2b --- /dev/null +++ b/src/mutation/queuePepUp.ts @@ -0,0 +1,32 @@ +import { SchemaComposer, ObjectTypeComposerFieldConfigAsObjectDefinition } from 'graphql-compose'; +import { addFixRecordToDelayStream } from '../helpers'; +import { Options } from '../definitions'; + +export function createQueuePepUpFC( + sc: SchemaComposer, + opts: Options +): ObjectTypeComposerFieldConfigAsObjectDefinition { + const { typePrefix } = opts; + + return { + type: sc.createObjectTC({ + name: `${typePrefix}QueuePepUpPayload`, + fields: { records: 'JSON' }, + }), + args: { + prefix: { + type: 'String!', + defaultValue: 'bull', + }, + queueName: 'String!', + checkExistence: { + type: 'Boolean', + defaultValue: true, + }, + }, + resolve: async (_, { prefix, queueName, checkExistence }) => { + const records = await addFixRecordToDelayStream(prefix, queueName, opts, checkExistence); + return { records }; + }, + }; +}