From f3150788c4f7a8bcf0fa05d00ec1b9aef602fe15 Mon Sep 17 00:00:00 2001 From: gintil Date: Mon, 23 Dec 2024 10:32:07 -0500 Subject: [PATCH] Auto retry when a feed is rated limited based on host --- .../feed-fetcher-listener.service.ts | 133 +++++++++++------- .../host-rate-limiter.service.ts | 7 + .../partitioned-requests-store.service.ts | 17 ++- .../src/shared/utils/retry-until-true.ts | 58 ++++++++ .../feed-requests/src/utils/get-url-host.ts | 5 + 5 files changed, 165 insertions(+), 55 deletions(-) create mode 100644 services/feed-requests/src/shared/utils/retry-until-true.ts create mode 100644 services/feed-requests/src/utils/get-url-host.ts diff --git a/services/feed-requests/src/feed-fetcher/feed-fetcher-listener.service.ts b/services/feed-requests/src/feed-fetcher/feed-fetcher-listener.service.ts index 49457dc30..483b8fe37 100644 --- a/services/feed-requests/src/feed-fetcher/feed-fetcher-listener.service.ts +++ b/services/feed-requests/src/feed-fetcher/feed-fetcher-listener.service.ts @@ -12,6 +12,9 @@ import { RequestSource } from './constants/request-source.constants'; import PartitionedRequestsStoreService from '../partitioned-requests-store/partitioned-requests-store.service'; import { PartitionedRequestInsert } from '../partitioned-requests-store/types/partitioned-request.type'; import { HostRateLimiterService } from '../host-rate-limiter/host-rate-limiter.service'; +import retryUntilTrue, { + RetryException, +} from '../shared/utils/retry-until-true'; interface BatchRequestMessage { timestamp: number; @@ -65,16 +68,16 @@ export class FeedFetcherListenerService { @UseRequestContext() private async onBrokerFetchRequestBatchHandler( - message: BatchRequestMessage, + batchRequest: BatchRequestMessage, ): Promise { - const urls = message?.data?.map((u) => u.url); - const rateSeconds = message?.rateSeconds; + const urls = batchRequest?.data?.map((u) => u.url); + const rateSeconds = batchRequest?.rateSeconds; - if (!message.data || rateSeconds == null) { + if (!batchRequest.data || rateSeconds == null) { logger.error( `Received fetch batch request message has no urls and/or rateSeconds, skipping`, { - event: message, + event: batchRequest, }, ); @@ -82,64 +85,86 @@ export class FeedFetcherListenerService { } logger.debug(`Fetch batch request message received for batch urls`, { - event: message, + event: batchRequest, }); try { const results = await Promise.allSettled( - message.data.map( - async ({ url, lookupKey, saveToObjectStorage, headers }) => { - let request: PartitionedRequestInsert | undefined = undefined; - - try { - const { isRateLimited } = - await this.hostRateLimiterService.incrementUrlCount(url); - - if (isRateLimited) { - logger.debug(`Host ${url} is rate limited, skipping`); - - return; - } + batchRequest.data.map(async (message) => { + const { url, lookupKey, saveToObjectStorage, headers } = message; + let request: PartitionedRequestInsert | undefined = undefined; + + try { + await retryUntilTrue( + async () => { + const { isRateLimited } = + await this.hostRateLimiterService.incrementUrlCount(url); + + if (isRateLimited) { + logger.debug( + `Host ${url} is still rate limited, retrying later`, + ); + } + + return !isRateLimited; + }, + 5000, + (rateSeconds * 1000) / 1.5, // 1.5 is the backoff factor of retryUntilTrue + ); + + const result = await this.handleBrokerFetchRequest({ + lookupKey, + url, + rateSeconds, + saveToObjectStorage, + headers, + }); + + if (result) { + request = result.request; + } - const result = await this.handleBrokerFetchRequest({ + if 
(result.successful) { + await this.emitFetchCompleted({ lookupKey, url, - rateSeconds, - saveToObjectStorage, - headers, + rateSeconds: rateSeconds, }); - - if (result) { - request = result.request; - } - - if (result.successful) { - await this.emitFetchCompleted({ - lookupKey, + } + } catch (err) { + if (err instanceof RetryException) { + logger.error( + `Error while retrying due to host rate limits: ${err.message}`, + { + event: message, + err: (err as Error).stack, + }, + ); + } else { + logger.error(`Error processing fetch request message`, { + event: message, + err: (err as Error).stack, + }); + } + } finally { + if (batchRequest.timestamp) { + const nowTs = Date.now(); + const finishedTs = nowTs - batchRequest.timestamp; + + logger.datadog( + `Finished handling feed requests batch event URL in ${finishedTs}s`, + { + duration: finishedTs, url, - rateSeconds: rateSeconds, - }); - } - } finally { - if (message.timestamp) { - const nowTs = Date.now(); - const finishedTs = nowTs - message.timestamp; - - logger.datadog( - `Finished handling feed requests batch event URL in ${finishedTs}s`, - { - duration: finishedTs, - url, - lookupKey, - requestStatus: request?.status, - statusCode: request?.response?.statusCode, - errorMessage: request?.errorMessage, - }, - ); - } + lookupKey, + requestStatus: request?.status, + statusCode: request?.response?.statusCode, + errorMessage: request?.errorMessage, + }, + ); } - }, - ), + } + }), ); for (let i = 0; i < results.length; ++i) { @@ -163,7 +188,7 @@ export class FeedFetcherListenerService { }); } catch (err) { logger.error(`Error processing fetch batch request message`, { - event: message, + event: batchRequest, err: (err as Error).stack, }); } diff --git a/services/feed-requests/src/host-rate-limiter/host-rate-limiter.service.ts b/services/feed-requests/src/host-rate-limiter/host-rate-limiter.service.ts index e1123cc26..15fd964bb 100644 --- a/services/feed-requests/src/host-rate-limiter/host-rate-limiter.service.ts +++ b/services/feed-requests/src/host-rate-limiter/host-rate-limiter.service.ts @@ -15,6 +15,13 @@ const RATE_LIMITED_HOSTS = new Map([ intervalSec: 2, }, ], + [ + 'nasdaq.com', + { + requestLimit: 1, + intervalSec: 30, + }, + ], ]); @Injectable() diff --git a/services/feed-requests/src/partitioned-requests-store/partitioned-requests-store.service.ts b/services/feed-requests/src/partitioned-requests-store/partitioned-requests-store.service.ts index d7daf47ac..c514ef0bf 100644 --- a/services/feed-requests/src/partitioned-requests-store/partitioned-requests-store.service.ts +++ b/services/feed-requests/src/partitioned-requests-store/partitioned-requests-store.service.ts @@ -5,6 +5,8 @@ import { MikroORM } from '@mikro-orm/core'; import { RequestSource } from '../feed-fetcher/constants/request-source.constants'; import { Request } from '../feed-fetcher/entities'; import { RequestStatus } from '../feed-fetcher/constants'; +import { getUrlHost } from '../utils/get-url-host'; +import logger from '../utils/logger'; const sha1 = createHash('sha1'); @@ -32,6 +34,17 @@ export default class PartitionedRequestsStoreService { await Promise.all( this.pendingInserts.map((responseInsert) => { const urlHash = sha1.copy().update(responseInsert.url).digest('hex'); + let hostHash: string | null = null; + + try { + const host = getUrlHost(responseInsert.url); + hostHash = sha1.copy().update(host).digest('hex'); + } catch (err) { + logger.error('Failed to get host from url', { + url: responseInsert.url, + err: (err as Error).stack, + }); + } return 
em.execute( `INSERT INTO request_partitioned ( @@ -41,6 +54,7 @@ export default class PartitionedRequestsStoreService { fetch_options, url, url_hash, + host_hash, lookup_key, created_at, next_retry_date, @@ -51,7 +65,7 @@ export default class PartitionedRequestsStoreService { response_redis_cache_key, response_headers ) VALUES ( - ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )`, [ randomUUID(), @@ -60,6 +74,7 @@ export default class PartitionedRequestsStoreService { this.stringifyJson(responseInsert.fetchOptions), responseInsert.url, urlHash, + hostHash, responseInsert.lookupKey, responseInsert.createdAt, responseInsert.nextRetryDate, diff --git a/services/feed-requests/src/shared/utils/retry-until-true.ts b/services/feed-requests/src/shared/utils/retry-until-true.ts new file mode 100644 index 000000000..d48eb6759 --- /dev/null +++ b/services/feed-requests/src/shared/utils/retry-until-true.ts @@ -0,0 +1,58 @@ +export class RetryException extends Error {} + +const delay = (timeout: number) => + new Promise((resolve) => setTimeout(resolve, timeout)); + +/** + * Repeatedly calls the provided function until it returns true or the maximum timeout is reached. + * + * @param {() => Promise} fn - The function to be called repeatedly. + * @param {number} startTimeout - The initial timeout in milliseconds. + * @param {number} maxTimeout - The maximum timeout in milliseconds. + * Uses exponential backoff for retries. + * @returns {Promise} A promise that resolves when the function returns true or rejects if + * the maximum timeout is reached. + */ +const retryUntilTrue = ( + fn: () => Promise, + startTimeout: number, + maxTimeout: number, +) => { + return new Promise(async (resolve, reject) => { + try { + let currentTimeout = startTimeout; + + while (true) { + if (await fn()) { + resolve(); + + return; + } + + await delay(currentTimeout); + currentTimeout = Math.min(currentTimeout * 1.5, maxTimeout); + + if (currentTimeout >= maxTimeout) { + break; + } + } + + reject( + new RetryException( + `Timeout reached (next timeout of ${currentTimeout} is greater than max timeout of` + + ` ${maxTimeout})`, + ), + ); + } catch (err) { + const toThrow = new RetryException((err as Error).message); + + if (err instanceof Error) { + toThrow.stack = err.stack; + } + + reject(toThrow); + } + }); +}; + +export default retryUntilTrue; diff --git a/services/feed-requests/src/utils/get-url-host.ts b/services/feed-requests/src/utils/get-url-host.ts new file mode 100644 index 000000000..9055a7260 --- /dev/null +++ b/services/feed-requests/src/utils/get-url-host.ts @@ -0,0 +1,5 @@ +import { URL } from 'node:url'; + +export const getUrlHost = (url: string) => { + return new URL(url).host; +};
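
A quick usage sketch of the new retryUntilTrue helper, wired the same way as the listener above: a startTimeout of 5000 ms and a maxTimeout of (rateSeconds * 1000) / 1.5, so retries stop before a single backoff delay reaches the feed's refresh interval. The checkHost stand-in and the import path are hypothetical; in the service the actual check is hostRateLimiterService.incrementUrlCount(url).

import retryUntilTrue, {
  RetryException,
} from './shared/utils/retry-until-true';

// Hypothetical stand-in for hostRateLimiterService.incrementUrlCount(url).
const checkHost = async (url: string): Promise<boolean> => {
  const isRateLimited = Math.random() < 0.5; // pretend limiter state
  return !isRateLimited;
};

const fetchWhenHostAllows = async (url: string, rateSeconds: number) => {
  try {
    // Poll every 5 s at first, backing off by 1.5x per attempt; the helper
    // rejects once the next computed delay would reach (rateSeconds * 1000) / 1.5 ms.
    await retryUntilTrue(() => checkHost(url), 5000, (rateSeconds * 1000) / 1.5);
    // Host is no longer rate limited here; proceed with the actual fetch.
  } catch (err) {
    if (err instanceof RetryException) {
      // Host stayed rate limited for the whole retry window; skip this cycle.
    }
  }
};

With a 10-minute feed (rateSeconds = 600) the delays grow 5 s, 7.5 s, 11.25 s, and so on, and the promise rejects with RetryException once the next delay reaches the 400 s cap.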
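The new host_hash column follows the existing url_hash scheme; below is a small sketch of the derivation, assuming the same sha1 copy-and-digest approach used in PartitionedRequestsStoreService (the import path and example URLs are hypothetical).

import { createHash } from 'node:crypto';
import { getUrlHost } from './utils/get-url-host';

const sha1 = createHash('sha1');

// sha1 hex digest of the URL's host, or null when the URL cannot be parsed,
// matching the fallback the store uses before inserting the row.
const hashHost = (url: string): string | null => {
  try {
    return sha1.copy().update(getUrlHost(url)).digest('hex');
  } catch {
    return null;
  }
};

// Feeds on the same host share one host_hash regardless of path:
hashHost('https://www.nasdaq.com/feeds/example-a');
hashHost('https://www.nasdaq.com/feeds/example-b'); // same digest as above

Storing the digest alongside url_hash presumably makes it straightforward to group request rows by host, in line with the per-host limits added here (for example, nasdaq.com at one request per 30 seconds).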