
Auto retry when a feed is rate limited based on host
synzen committed Dec 23, 2024
1 parent e4c0030 commit f315078
Showing 5 changed files with 165 additions and 55 deletions.
@@ -12,6 +12,9 @@ import { RequestSource } from './constants/request-source.constants';
 import PartitionedRequestsStoreService from '../partitioned-requests-store/partitioned-requests-store.service';
 import { PartitionedRequestInsert } from '../partitioned-requests-store/types/partitioned-request.type';
 import { HostRateLimiterService } from '../host-rate-limiter/host-rate-limiter.service';
+import retryUntilTrue, {
+  RetryException,
+} from '../shared/utils/retry-until-true';
 
 interface BatchRequestMessage {
   timestamp: number;
@@ -65,81 +68,103 @@ export class FeedFetcherListenerService {
 
   @UseRequestContext()
   private async onBrokerFetchRequestBatchHandler(
-    message: BatchRequestMessage,
+    batchRequest: BatchRequestMessage,
   ): Promise<void> {
-    const urls = message?.data?.map((u) => u.url);
-    const rateSeconds = message?.rateSeconds;
+    const urls = batchRequest?.data?.map((u) => u.url);
+    const rateSeconds = batchRequest?.rateSeconds;
 
-    if (!message.data || rateSeconds == null) {
+    if (!batchRequest.data || rateSeconds == null) {
       logger.error(
         `Received fetch batch request message has no urls and/or rateSeconds, skipping`,
         {
-          event: message,
+          event: batchRequest,
         },
       );
 
       return;
     }
 
     logger.debug(`Fetch batch request message received for batch urls`, {
-      event: message,
+      event: batchRequest,
     });
 
     try {
       const results = await Promise.allSettled(
-        message.data.map(
-          async ({ url, lookupKey, saveToObjectStorage, headers }) => {
-            let request: PartitionedRequestInsert | undefined = undefined;
-
-            try {
-              const { isRateLimited } =
-                await this.hostRateLimiterService.incrementUrlCount(url);
-
-              if (isRateLimited) {
-                logger.debug(`Host ${url} is rate limited, skipping`);
-
-                return;
-              }
-
-              const result = await this.handleBrokerFetchRequest({
-                lookupKey,
-                url,
-                rateSeconds,
-                saveToObjectStorage,
-                headers,
-              });
-
-              if (result) {
-                request = result.request;
-              }
-
-              if (result.successful) {
-                await this.emitFetchCompleted({
-                  lookupKey,
-                  url,
-                  rateSeconds: rateSeconds,
-                });
-              }
-            } finally {
-              if (message.timestamp) {
-                const nowTs = Date.now();
-                const finishedTs = nowTs - message.timestamp;
-
-                logger.datadog(
-                  `Finished handling feed requests batch event URL in ${finishedTs}s`,
-                  {
-                    duration: finishedTs,
-                    url,
-                    lookupKey,
-                    requestStatus: request?.status,
-                    statusCode: request?.response?.statusCode,
-                    errorMessage: request?.errorMessage,
-                  },
-                );
-              }
-            }
-          },
-        ),
+        batchRequest.data.map(async (message) => {
+          const { url, lookupKey, saveToObjectStorage, headers } = message;
+          let request: PartitionedRequestInsert | undefined = undefined;
+
+          try {
+            await retryUntilTrue(
+              async () => {
+                const { isRateLimited } =
+                  await this.hostRateLimiterService.incrementUrlCount(url);
+
+                if (isRateLimited) {
+                  logger.debug(
+                    `Host ${url} is still rate limited, retrying later`,
+                  );
+                }
+
+                return !isRateLimited;
+              },
+              5000,
+              (rateSeconds * 1000) / 1.5, // 1.5 is the backoff factor of retryUntilTrue
+            );
+
+            const result = await this.handleBrokerFetchRequest({
+              lookupKey,
+              url,
+              rateSeconds,
+              saveToObjectStorage,
+              headers,
+            });
+
+            if (result) {
+              request = result.request;
+            }
+
+            if (result.successful) {
+              await this.emitFetchCompleted({
+                lookupKey,
+                url,
+                rateSeconds: rateSeconds,
+              });
+            }
+          } catch (err) {
+            if (err instanceof RetryException) {
+              logger.error(
+                `Error while retrying due to host rate limits: ${err.message}`,
+                {
+                  event: message,
+                  err: (err as Error).stack,
+                },
+              );
+            } else {
+              logger.error(`Error processing fetch request message`, {
+                event: message,
+                err: (err as Error).stack,
+              });
+            }
+          } finally {
+            if (batchRequest.timestamp) {
+              const nowTs = Date.now();
+              const finishedTs = nowTs - batchRequest.timestamp;
+
+              logger.datadog(
+                `Finished handling feed requests batch event URL in ${finishedTs}s`,
+                {
+                  duration: finishedTs,
+                  url,
+                  lookupKey,
+                  requestStatus: request?.status,
+                  statusCode: request?.response?.statusCode,
+                  errorMessage: request?.errorMessage,
+                },
+              );
+            }
+          }
+        }),
       );
 
       for (let i = 0; i < results.length; ++i) {
@@ -163,7 +188,7 @@ export class FeedFetcherListenerService {
       });
     } catch (err) {
       logger.error(`Error processing fetch batch request message`, {
-        event: message,
+        event: batchRequest,
         err: (err as Error).stack,
       });
     }
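A note on the maxTimeout argument above: dividing the batch interval by retryUntilTrue's 1.5 backoff factor means the utility gives up before any single sleep reaches the full interval. A rough sketch of the delay schedule when every check fails (illustrative only, not part of the commit), for rateSeconds = 600:

const delays: number[] = [];
let current = 5_000; // startTimeout passed by the listener
const max = 600_000 / 1.5; // maxTimeout = (rateSeconds * 1000) / 1.5

while (true) {
  delays.push(current); // the sleep retryUntilTrue would actually take
  current = Math.min(current * 1.5, max);
  if (current >= max) break; // the point where retryUntilTrue rejects
}

console.log(delays.map((ms) => Math.round(ms / 1000)));
// => [5, 8, 11, 17, 25, 38, 57, 85, 128, 192, 288]

Each individual sleep stays below the 600-second batch interval, though the cumulative wait across the eleven retries (roughly 855 seconds) can still exceed it.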
@@ -15,6 +15,13 @@ const RATE_LIMITED_HOSTS = new Map<string, RateLimitData>([
       intervalSec: 2,
     },
   ],
+  [
+    'nasdaq.com',
+    {
+      requestLimit: 1,
+      intervalSec: 30,
+    },
+  ],
 ]);

@Injectable()
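The new entry caps nasdaq.com at one request per 30-second window. For context, a minimal sketch of the kind of fixed-window counter incrementUrlCount could sit on — hypothetical, since HostRateLimiterService's implementation is not part of this diff:

// Hypothetical fixed-window counter, for illustration only — the actual
// HostRateLimiterService implementation is not shown in this commit.
interface RateLimitData {
  requestLimit: number;
  intervalSec: number;
}

const windows = new Map<string, { windowStart: number; count: number }>();

function incrementHostCount(
  host: string,
  limit: RateLimitData,
): { isRateLimited: boolean } {
  const now = Date.now();
  const entry = windows.get(host);

  // Start a fresh window if none exists or the current one has expired.
  if (!entry || now - entry.windowStart >= limit.intervalSec * 1000) {
    windows.set(host, { windowStart: now, count: 1 });

    return { isRateLimited: false };
  }

  // Over budget for this window: callers (like the listener above) retry later.
  if (entry.count >= limit.requestLimit) {
    return { isRateLimited: true };
  }

  entry.count += 1;

  return { isRateLimited: false };
}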
@@ -5,6 +5,8 @@ import { MikroORM } from '@mikro-orm/core';
 import { RequestSource } from '../feed-fetcher/constants/request-source.constants';
 import { Request } from '../feed-fetcher/entities';
 import { RequestStatus } from '../feed-fetcher/constants';
+import { getUrlHost } from '../utils/get-url-host';
+import logger from '../utils/logger';
 
 const sha1 = createHash('sha1');
 
@@ -32,6 +34,17 @@
     await Promise.all(
       this.pendingInserts.map((responseInsert) => {
         const urlHash = sha1.copy().update(responseInsert.url).digest('hex');
+        let hostHash: string | null = null;
+
+        try {
+          const host = getUrlHost(responseInsert.url);
+          hostHash = sha1.copy().update(host).digest('hex');
+        } catch (err) {
+          logger.error('Failed to get host from url', {
+            url: responseInsert.url,
+            err: (err as Error).stack,
+          });
+        }
 
         return em.execute(
           `INSERT INTO request_partitioned (
@@ -41,6 +54,7 @@
             fetch_options,
             url,
             url_hash,
+            host_hash,
             lookup_key,
             created_at,
             next_retry_date,
@@ -51,7 +65,7 @@
             response_redis_cache_key,
             response_headers
           ) VALUES (
-            ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
+            ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
           )`,
           [
             randomUUID(),
@@ -60,6 +74,7 @@
             this.stringifyJson(responseInsert.fetchOptions),
             responseInsert.url,
             urlHash,
+            hostHash,
             responseInsert.lookupKey,
             responseInsert.createdAt,
             responseInsert.nextRetryDate,
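Storing host_hash alongside url_hash lets request volume be grouped per host without re-parsing stored URLs. A hypothetical aggregation over the new column (not part of this commit; the driver import is an assumption):

import { EntityManager } from '@mikro-orm/postgresql'; // assumed SQL driver whose EntityManager exposes execute()

// Hypothetical, for illustration: count the past hour's requests per host,
// e.g. to see which hosts are under rate-limit pressure.
async function countRecentRequestsByHost(em: EntityManager) {
  return em.execute(
    `SELECT host_hash, COUNT(*) AS request_count
     FROM request_partitioned
     WHERE created_at > NOW() - INTERVAL '1 hour'
     GROUP BY host_hash
     ORDER BY request_count DESC`,
  );
}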
58 changes: 58 additions & 0 deletions services/feed-requests/src/shared/utils/retry-until-true.ts
@@ -0,0 +1,58 @@
export class RetryException extends Error {}

const delay = (timeout: number) =>
new Promise<void>((resolve) => setTimeout(resolve, timeout));

/**
 * Repeatedly calls the provided function until it returns true, using
 * exponential backoff (factor 1.5) between attempts.
 *
 * @param {() => Promise<boolean>} fn - The function to be called repeatedly.
 * @param {number} startTimeout - The initial delay between attempts, in milliseconds.
 * @param {number} maxTimeout - The maximum delay between attempts, in milliseconds.
 * @returns {Promise<void>} A promise that resolves when the function returns true, or rejects
 * with a RetryException once the next backoff delay would reach maxTimeout.
 */
const retryUntilTrue = (
fn: () => Promise<boolean>,
startTimeout: number,
maxTimeout: number,
) => {
return new Promise<void>(async (resolve, reject) => {
try {
let currentTimeout = startTimeout;

while (true) {
if (await fn()) {
resolve();

return;
}

await delay(currentTimeout);
currentTimeout = Math.min(currentTimeout * 1.5, maxTimeout);

if (currentTimeout >= maxTimeout) {
break;
}
}

reject(
new RetryException(
`Timeout reached (next timeout of ${currentTimeout} is greater than max timeout of` +
` ${maxTimeout})`,
),
);
} catch (err) {
const toThrow = new RetryException((err as Error).message);

if (err instanceof Error) {
toThrow.stack = err.stack;
}

reject(toThrow);
}
});
};

export default retryUntilTrue;
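A quick usage sketch of the new utility (scenario and names hypothetical):

import retryUntilTrue, { RetryException } from './retry-until-true';

// Hypothetical condition: a flag that flips after ~12 seconds.
let ready = false;
setTimeout(() => {
  ready = true;
}, 12_000);

async function main() {
  try {
    // First retry after 5 s, backing off by 1.5x; give up once the next
    // delay would reach 60 s.
    await retryUntilTrue(async () => ready, 5_000, 60_000);
    console.log('condition became true');
  } catch (err) {
    if (err instanceof RetryException) {
      console.error(`gave up waiting: ${err.message}`);
    }
  }
}

void main();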
5 changes: 5 additions & 0 deletions services/feed-requests/src/utils/get-url-host.ts
@@ -0,0 +1,5 @@
import { URL } from 'node:url';

export const getUrlHost = (url: string) => {
return new URL(url).host;
};
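Note that URL#host includes any subdomain and a non-default port. For example:

import { getUrlHost } from './get-url-host';

console.log(getUrlHost('https://www.nasdaq.com/feed/rss')); // 'www.nasdaq.com'
console.log(getUrlHost('https://example.com:8080/feed.xml')); // 'example.com:8080'
// Invalid input throws, which the partitioned store catches and logs:
// getUrlHost('not a url') -> TypeError [ERR_INVALID_URL]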
