Skip to content

Commit

Permalink
Avoid duplicate feed request processing via redis
Browse files Browse the repository at this point in the history
  • Loading branch information
synzen committed Jan 5, 2025
1 parent 25ad0a3 commit edcb95b
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 18 deletions.
33 changes: 33 additions & 0 deletions services/feed-requests/src/cache-storage/cache-storage.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,39 @@ export class CacheStorageService {
return `feed-requests:${key}`;
}

async set({
key,
body,
expSeconds,
getOldValue,
}: {
body: string;
key: string;
expSeconds?: number;
getOldValue?: boolean;
}) {
try {
return await this.redisClient.set(this.generateKey(key), body, {
EX: expSeconds,
GET: getOldValue ? true : undefined,
});
} catch (err) {
logger.error(`Failed to set content in cache storage`, {
err: (err as Error).stack,
});
}
}

async del(key: string) {
try {
await this.redisClient.del(this.generateKey(key));
} catch (err) {
logger.error(`Failed to delete content from cache storage`, {
err: (err as Error).stack,
});
}
}

async increment(
key: string,
opts?: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { HostRateLimiterService } from '../host-rate-limiter/host-rate-limiter.s
import retryUntilTrue, {
RetryException,
} from '../shared/utils/retry-until-true';
import { CacheStorageService } from '../cache-storage/cache-storage.service';

interface BatchRequestMessage {
timestamp: number;
Expand All @@ -40,6 +41,7 @@ export class FeedFetcherListenerService {
private readonly em: EntityManager,
private readonly partitionedRequestsStoreService: PartitionedRequestsStoreService,
private readonly hostRateLimiterService: HostRateLimiterService,
private readonly cacheStorageService: CacheStorageService,
) {
this.maxFailAttempts = this.configService.get(
'FEED_REQUESTS_MAX_FAIL_ATTEMPTS',
Expand All @@ -49,6 +51,14 @@ export class FeedFetcherListenerService {
);
}

calculateCacheKeyForMessage = (
event: BatchRequestMessage['data'][number],
) => {
const lookupKey = event.lookupKey || event.url;

return `listener-service-${lookupKey}`;
};

static BASE_FAILED_ATTEMPT_WAIT_MINUTES = 5;

@RabbitSubscribe({
Expand Down Expand Up @@ -94,6 +104,23 @@ export class FeedFetcherListenerService {
const { url, lookupKey, saveToObjectStorage, headers } = message;
let request: PartitionedRequestInsert | undefined = undefined;

const currentlyProcessing = await this.cacheStorageService.set({
key: this.calculateCacheKeyForMessage(message),
body: '1',
getOldValue: true,
expSeconds: rateSeconds,
});

if (currentlyProcessing) {
logger.info(
`Request with key ${
lookupKey || url
} with rate ${rateSeconds} is already being processed, skipping`,
);

return;
}

try {
await retryUntilTrue(
async () => {
Expand Down Expand Up @@ -163,6 +190,10 @@ export class FeedFetcherListenerService {
},
);
}

await this.cacheStorageService.del(
this.calculateCacheKeyForMessage(message),
);
}
}),
);
Expand Down Expand Up @@ -208,24 +239,6 @@ export class FeedFetcherListenerService {
const rateSeconds = data.rateSeconds;
const lookupKey = data.lookupKey;

const dateToCheck = dayjs()
.subtract(Math.round(rateSeconds * 0.75), 'seconds')
.toDate();

const latestRequestAfterTime =
await this.partitionedRequestsStoreService.getLatestStatusAfterTime(
lookupKey || url,
dateToCheck,
);

if (latestRequestAfterTime) {
logger.debug(
`Request ${url} with rate ${rateSeconds} has been recently processed, skipping`,
);

return { successful: latestRequestAfterTime.status === RequestStatus.OK };
}

const { skip, nextRetryDate, failedAttemptsCount } =
await this.shouldSkipAfterPreviousFailedAttempt({
lookupKey,
Expand Down

0 comments on commit edcb95b

Please sign in to comment.