From edcb95bcabeee31f66294df592337683028131ea Mon Sep 17 00:00:00 2001 From: gintil Date: Sun, 5 Jan 2025 14:16:41 -0500 Subject: [PATCH] Avoid duplicate feed request processing via redis --- .../cache-storage/cache-storage.service.ts | 33 +++++++++++++ .../feed-fetcher-listener.service.ts | 49 ++++++++++++------- 2 files changed, 64 insertions(+), 18 deletions(-) diff --git a/services/feed-requests/src/cache-storage/cache-storage.service.ts b/services/feed-requests/src/cache-storage/cache-storage.service.ts index dc566478a..b4bdc146b 100644 --- a/services/feed-requests/src/cache-storage/cache-storage.service.ts +++ b/services/feed-requests/src/cache-storage/cache-storage.service.ts @@ -17,6 +17,39 @@ export class CacheStorageService { return `feed-requests:${key}`; } + async set({ + key, + body, + expSeconds, + getOldValue, + }: { + body: string; + key: string; + expSeconds?: number; + getOldValue?: boolean; + }) { + try { + return await this.redisClient.set(this.generateKey(key), body, { + EX: expSeconds, + GET: getOldValue ? true : undefined, + }); + } catch (err) { + logger.error(`Failed to set content in cache storage`, { + err: (err as Error).stack, + }); + } + } + + async del(key: string) { + try { + await this.redisClient.del(this.generateKey(key)); + } catch (err) { + logger.error(`Failed to delete content from cache storage`, { + err: (err as Error).stack, + }); + } + } + async increment( key: string, opts?: { diff --git a/services/feed-requests/src/feed-fetcher/feed-fetcher-listener.service.ts b/services/feed-requests/src/feed-fetcher/feed-fetcher-listener.service.ts index fe72ae5fc..9dc789f16 100644 --- a/services/feed-requests/src/feed-fetcher/feed-fetcher-listener.service.ts +++ b/services/feed-requests/src/feed-fetcher/feed-fetcher-listener.service.ts @@ -15,6 +15,7 @@ import { HostRateLimiterService } from '../host-rate-limiter/host-rate-limiter.s import retryUntilTrue, { RetryException, } from '../shared/utils/retry-until-true'; +import { CacheStorageService } from '../cache-storage/cache-storage.service'; interface BatchRequestMessage { timestamp: number; @@ -40,6 +41,7 @@ export class FeedFetcherListenerService { private readonly em: EntityManager, private readonly partitionedRequestsStoreService: PartitionedRequestsStoreService, private readonly hostRateLimiterService: HostRateLimiterService, + private readonly cacheStorageService: CacheStorageService, ) { this.maxFailAttempts = this.configService.get( 'FEED_REQUESTS_MAX_FAIL_ATTEMPTS', @@ -49,6 +51,14 @@ export class FeedFetcherListenerService { ); } + calculateCacheKeyForMessage = ( + event: BatchRequestMessage['data'][number], + ) => { + const lookupKey = event.lookupKey || event.url; + + return `listener-service-${lookupKey}`; + }; + static BASE_FAILED_ATTEMPT_WAIT_MINUTES = 5; @RabbitSubscribe({ @@ -94,6 +104,23 @@ export class FeedFetcherListenerService { const { url, lookupKey, saveToObjectStorage, headers } = message; let request: PartitionedRequestInsert | undefined = undefined; + const currentlyProcessing = await this.cacheStorageService.set({ + key: this.calculateCacheKeyForMessage(message), + body: '1', + getOldValue: true, + expSeconds: rateSeconds, + }); + + if (currentlyProcessing) { + logger.info( + `Request with key ${ + lookupKey || url + } with rate ${rateSeconds} is already being processed, skipping`, + ); + + return; + } + try { await retryUntilTrue( async () => { @@ -163,6 +190,10 @@ export class FeedFetcherListenerService { }, ); } + + await this.cacheStorageService.del( + this.calculateCacheKeyForMessage(message), + ); } }), ); @@ -208,24 +239,6 @@ export class FeedFetcherListenerService { const rateSeconds = data.rateSeconds; const lookupKey = data.lookupKey; - const dateToCheck = dayjs() - .subtract(Math.round(rateSeconds * 0.75), 'seconds') - .toDate(); - - const latestRequestAfterTime = - await this.partitionedRequestsStoreService.getLatestStatusAfterTime( - lookupKey || url, - dateToCheck, - ); - - if (latestRequestAfterTime) { - logger.debug( - `Request ${url} with rate ${rateSeconds} has been recently processed, skipping`, - ); - - return { successful: latestRequestAfterTime.status === RequestStatus.OK }; - } - const { skip, nextRetryDate, failedAttemptsCount } = await this.shouldSkipAfterPreviousFailedAttempt({ lookupKey,