diff --git a/src/helpers/queueTitles.ts b/src/helpers/queueTitles.ts index 4c600f7..dd0d2ca 100644 --- a/src/helpers/queueTitles.ts +++ b/src/helpers/queueTitles.ts @@ -16,3 +16,35 @@ export async function fetchQueueTitles(prefix: string, opts: Options): Promise> { + const connection = getBullConnection(opts); + const normalizedPrefixGlob = normalizePrefixGlob(prefix); + + const result = await new Promise((resolve, reject) => { + //redis-cli: scan 0 match bull.vpc:*:meta count 100000 + const stream = connection.scanStream({ match: normalizedPrefixGlob, count: 100000 }); + + const titles: string[] = []; + + stream.on('data', async (keys) => { + for (let i = 0; i < keys.length; i++) { + titles.push(keys[i]); + } + }); + + stream.on('end', () => { + resolve(titles); + }); + + stream.on('error', reject); + }); + + return result.map((key) => { + const parts = key.split(':'); + return { + prefix: parts.slice(0, -2).join(':'), + queueName: parts[parts.length - 2], + }; + }); +} diff --git a/src/query/queueKeys.ts b/src/query/queueKeys.ts index 9c28781..38b5450 100644 --- a/src/query/queueKeys.ts +++ b/src/query/queueKeys.ts @@ -1,5 +1,5 @@ import { SchemaComposer, ObjectTypeComposerFieldConfigAsObjectDefinition } from 'graphql-compose'; -import { fetchQueueTitles } from '../helpers'; +import { scanQueueTitles } from '../helpers'; import { Options } from '../definitions'; export function createQueueKeysFC( @@ -22,7 +22,7 @@ export function createQueueKeysFC( }, }, resolve: async (_, { prefixGlob }) => { - return fetchQueueTitles(prefixGlob, opts); + return scanQueueTitles(prefixGlob, opts); }, }; } diff --git a/src/query/queues.ts b/src/query/queues.ts index 01afdad..b619ac6 100644 --- a/src/query/queues.ts +++ b/src/query/queues.ts @@ -1,6 +1,6 @@ import { SchemaComposer, ObjectTypeComposerFieldConfigAsObjectDefinition } from 'graphql-compose'; import { getQueueTC } from '../types/queue/Queue'; -import { fetchQueueTitles, getQueues } from '../helpers'; +import { scanQueueTitles, getQueues } from '../helpers'; import { Options } from '../definitions'; export function createQueuesFC( @@ -16,7 +16,7 @@ export function createQueuesFC( }, }, resolve: async (_, { prefix }) => { - const titles = await fetchQueueTitles(prefix, opts); + const titles = await scanQueueTitles(prefix, opts); return getQueues(titles, opts); }, };