Store feed response bodies in postgres instead of redis
synzen committed Jan 3, 2025
1 parent 67b51d0 commit 67e40e5
Showing 12 changed files with 132 additions and 35 deletions.
24 changes: 24 additions & 0 deletions services/feed-requests/migrations/Migration20250102112346.ts
@@ -0,0 +1,24 @@
import { Migration } from '@mikro-orm/migrations';

export class Migration20250102112346 extends Migration {

async up(): Promise<void> {
// create table response_bodies with text columns "content" and "hash_key"
this.addSql('CREATE TABLE "response_bodies" ("content" TEXT NOT NULL, "hash_key" TEXT NOT NULL);');
// create index on hash_key column
this.addSql('CREATE UNIQUE INDEX "response_bodies_hash_key" ON "response_bodies" ("hash_key");');

// add column response_body_hash_key to request_partitioned
this.addSql('ALTER TABLE "request_partitioned" ADD COLUMN "response_body_hash_key" TEXT DEFAULT NULL NULL;');
// add foreign key constraint to response_body_hash_key column
this.addSql(`ALTER TABLE "request_partitioned" ADD CONSTRAINT "request_partitioned_response_body_hash_key_fk" FOREIGN KEY ("response_body_hash_key") REFERENCES "response_bodies" ("hash_key") ON DELETE SET NULL;`);
}

async down(): Promise<void> {
// drop table response_bodies
this.addSql('DROP TABLE "response_bodies";');
// drop column response_body_hash_key from request_partitioned
this.addSql('ALTER TABLE "request_partitioned" DROP COLUMN "response_body_hash_key";');
}

}
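
The unique index on hash_key is what the store service's ON CONFLICT (hash_key) upsert later in this commit relies on, and ON DELETE SET NULL means request rows survive if their body row is ever removed. A minimal maintenance sketch under those assumptions (not part of this commit; em stands for the MikroORM connection handle used elsewhere in this service):

// Hypothetical cleanup: bodies that no request references any more can be deleted
// without touching request_partitioned; deleting a body that is still referenced would
// simply null out response_body_hash_key on the referencing rows.
await em.execute(
  `DELETE FROM response_bodies res
   WHERE NOT EXISTS (
     SELECT 1 FROM request_partitioned req
     WHERE req.response_body_hash_key = res.hash_key
   )`,
);
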
15 changes: 15 additions & 0 deletions services/feed-requests/migrations/Migration20250102220055.ts
@@ -0,0 +1,15 @@
import { Migration } from '@mikro-orm/migrations';

export class Migration20250102220055 extends Migration {

async up(): Promise<void> {
// add column content_hash to response_bodies
this.addSql('ALTER TABLE "response_bodies" ADD COLUMN "content_hash" TEXT DEFAULT NULL NULL;');
}

async down(): Promise<void> {
// drop column content_hash from response_bodies
this.addSql('ALTER TABLE "response_bodies" DROP COLUMN "content_hash";');
}

}
@@ -59,4 +59,7 @@ export class Response {
nullable: true,
})
createdAt: Date = new Date();

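// Fields backing Postgres-stored bodies: the hash key into the new response_bodies
// table and the (compressed) body text itself; both stay null when no body was stored.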
responseHashKey!: string | null;
content!: string | null;
}
@@ -242,7 +242,7 @@ export class FeedFetcherListenerService {
}

const latestOkRequest =
await this.partitionedRequestsStoreService.getLatestOkRequestWithResponseBody(
await this.partitionedRequestsStoreService.getLatestRequestWithOkStatus(
data.lookupKey || data.url,
{ fields: ['response_headers'] },
);
@@ -259,9 +259,9 @@
source: RequestSource.Schedule,
headers: {
...data.headers,
// 'If-Modified-Since':
// latestOkRequest?.responseHeaders?.['last-modified'] || '',
// 'If-None-Match': latestOkRequest?.responseHeaders?.etag,
'If-Modified-Since':
latestOkRequest?.responseHeaders?.['last-modified'] || '',
'If-None-Match': latestOkRequest?.responseHeaders?.etag,
},
},
);
@@ -509,8 +509,11 @@
url: string;
}): Promise<number> {
const latestOkRequest =
await this.partitionedRequestsStoreService.getLatestOkRequestWithResponseBody(
await this.partitionedRequestsStoreService.getLatestRequestWithOkStatus(
lookupKey || url,
{
include304: true,
},
);

return this.partitionedRequestsStoreService.countFailedRequests(
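
With bodies now persisted per request key, the previously commented-out conditional-request headers are switched back on: a 304 Not Modified costs no body download and does not disturb the body stored for the last 200 response. A minimal sketch of the flow these headers enable, with a plain fetch standing in for the service's own request pipeline (illustrative only, not part of this commit):

const latestOk = await partitionedRequestsStoreService.getLatestRequestWithOkStatus(
  lookupKey,
  { fields: ['response_headers'] },
);

const res = await fetch(url, {
  headers: {
    'If-Modified-Since': latestOk?.responseHeaders?.['last-modified'] || '',
    'If-None-Match': latestOk?.responseHeaders?.etag || '',
  },
});

if (res.status === 304) {
  // Feed unchanged upstream: reuse the body already stored in response_bodies via the
  // last 200 request row instead of downloading and parsing it again.
}
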
@@ -227,6 +227,7 @@ export class FeedFetcherController {

if (
data.hashToCompare &&
latestRequest.request.response?.textHash &&
data.hashToCompare === latestRequest.request.response?.textHash
) {
return {
40 changes: 22 additions & 18 deletions services/feed-requests/src/feed-fetcher/feed-fetcher.service.ts
@@ -152,10 +152,16 @@ export class FeedFetcherService {
return null;
}

if (request.response?.redisCacheKey) {
const compressedText = await this.cacheStorageService.getFeedHtmlContent({
key: request.response.redisCacheKey,
});
if (request.response?.redisCacheKey || request.response?.content) {
let compressedText: string | null = null;

if (request.response.content) {
compressedText = request.response.content;
} else if (request.response.redisCacheKey) {
compressedText = await this.cacheStorageService.getFeedHtmlContent({
key: request.response.redisCacheKey,
});
}
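// Content stored in Postgres takes precedence; the Redis lookup above remains as a
// fallback, presumably for request rows written before bodies moved into response_bodies.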

const text = compressedText
? (
@@ -277,23 +283,14 @@
}
}

response.textHash = text
? sha1.copy().update(text).digest('hex')
: '';

/**
* This has a problem where 304 responses will have no feed response, which won't refresh the
* previous cache contents for 200 codes
*/
const hashKey =
const key =
url + JSON.stringify(request.fetchOptions) + res.status.toString();

response.redisCacheKey = sha1.copy().update(hashKey).digest('hex');
if (text.length) {
response.responseHashKey = sha1.copy().update(key).digest('hex');

await this.cacheStorageService.setFeedHtmlContent({
key: response.redisCacheKey,
body: compressedText,
});
response.content = compressedText;
}
} catch (err) {
if (err instanceof FeedTooLargeException) {
throw err;
@@ -339,6 +336,13 @@
s3ObjectKey: response.s3ObjectKey || null,
redisCacheKey: response.redisCacheKey || null,
headers: response.headers,
body:
response.responseHashKey && response.content
? {
hashKey: response.responseHashKey,
contents: response.content,
}
: null,
},
};

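
Because the body's hash key is derived from the URL, the serialized fetch options, and the response status code, a 200 and a 304 for the same feed can never share a body row, and the text.length guard means a 304 (which normally carries no body text) writes nothing at all, so the last 200's stored body stays intact and reachable through its own request row. A minimal sketch of the derivation, with Node's createHash standing in for the service's reusable sha1.copy() helper and a hypothetical URL for illustration:

import { createHash } from 'crypto';

// Illustrative only: same inputs and digest as the inline assignment above.
function responseBodyHashKey(
  url: string,
  fetchOptions: object,
  statusCode: number,
): string {
  const key = url + JSON.stringify(fetchOptions) + statusCode.toString();
  return createHash('sha1').update(key).digest('hex');
}

responseBodyHashKey('https://example.com/feed.xml', {}, 200); // key for the stored 200 body
responseBodyHashKey('https://example.com/feed.xml', {}, 304); // would map to a separate row
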
@@ -32,7 +32,7 @@ export default class PartitionedRequestsStoreService {

try {
await Promise.all(
this.pendingInserts.map((responseInsert) => {
this.pendingInserts.map(async (responseInsert) => {
const urlHash = sha1.copy().update(responseInsert.url).digest('hex');
let hostHash: string | null = null;

@@ -46,6 +46,35 @@
});
}

if (responseInsert.response?.body) {
const contentHash = sha1
.copy()
.update(responseInsert.response.body.contents)
.digest('hex');

// does this content already exist in the db?
const results = await em.execute(
`SELECT 1 FROM response_bodies WHERE hash_key = ? AND content_hash = ?`,
[responseInsert.response.body.hashKey, contentHash],
);

if (results.length === 0) {
await em.execute(
`INSERT INTO response_bodies (content, content_hash, hash_key) VALUES (?, ?, ?)
ON CONFLICT (hash_key) DO UPDATE SET content = ?, content_hash = ?
`,
[
responseInsert.response.body.contents,
contentHash,
responseInsert.response.body.hashKey,
responseInsert.response.body.contents,
contentHash,
],
transaction,
);
}
}
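// Net effect of the block above: an identical body (same hash_key and content_hash) is
// skipped entirely, while changed content for the same key is overwritten in place via
// ON CONFLICT (hash_key), so at most one body row exists per hash key.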

return em.execute(
`INSERT INTO request_partitioned (
id,
@@ -61,11 +90,12 @@
error_message,
response_status_code,
response_text_hash,
response_body_hash_key,
response_s3object_key,
response_redis_cache_key,
response_headers
) VALUES (
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
)`,
[
randomUUID(),
@@ -80,7 +110,8 @@
responseInsert.nextRetryDate,
responseInsert.errorMessage,
responseInsert.response?.statusCode,
responseInsert.response?.textHash,
null,
responseInsert.response?.body?.hashKey,
responseInsert.response?.s3ObjectKey,
responseInsert.response?.redisCacheKey,
this.stringifyJson(responseInsert.response?.headers),
@@ -118,9 +149,10 @@
return new Date(result.next_retry_date) || null;
}

async getLatestOkRequestWithResponseBody(
async getLatestRequestWithOkStatus(
lookupKey: string,
opts?: {
opts: {
include304?: boolean;
fields?: Array<'response_headers'>;
},
): Promise<null | {
@@ -244,9 +276,13 @@
const em = this.orm.em.getConnection();

const [result] = await em.execute(
`SELECT * FROM request_partitioned
WHERE lookup_key = ?
AND response_status_code != 304
`SELECT req.*, res.content AS response_body_content,
res.content_hash AS response_content_hash
FROM request_partitioned req
LEFT JOIN response_bodies res
ON req.response_body_hash_key = res.hash_key
WHERE req.lookup_key = ?
AND req.response_status_code != 304
ORDER BY created_at DESC
LIMIT 1`,
[lookupKey],
@@ -274,13 +310,15 @@
? {
id: result.id,
statusCode: result.response_status_code,
textHash: result.response_text_hash,
textHash: result.response_content_hash,
hasCompressedText: true,
isCloudflare: false,
s3ObjectKey: result.response_s3object_key,
redisCacheKey: result.response_redis_cache_key,
headers: result.response_headers,
createdAt: new Date(result.created_at),
content: result.response_body_content,
responseHashKey: result.response_body_hash_key,
}
: null,
};
@@ -16,5 +16,9 @@ export interface PartitionedRequestInsert {
s3ObjectKey: string | null;
redisCacheKey: string | null;
headers: object;
body: {
hashKey: string;
contents: string;
} | null;
};
}
@@ -209,7 +209,7 @@ export class ArticleParserService {
err.message === "Not a feed" ||
err.message.startsWith("Unexpected end")
) {
reject(new InvalidFeedException("Invalid feed"));
reject(new InvalidFeedException("Invalid feed", xml));
} else {
reject(err);
}
2 changes: 1 addition & 1 deletion services/user-feeds/src/articles/articles.service.ts
@@ -969,7 +969,7 @@ export class ArticlesService {
return await this.feedParserPool.exec("getArticlesFromXml", args);
} catch (err) {
if (err instanceof Error && err.message === "Invalid feed") {
throw new InvalidFeedException(err.message);
throw new InvalidFeedException(err.message, args[0]);
}

throw err;
@@ -1 +1,5 @@
export class InvalidFeedException extends Error {}
export class InvalidFeedException extends Error {
constructor(message: string, public feedText: string) {
super(message);
}
}
@@ -480,6 +480,7 @@ export class FeedEventHandlerService {
if (err instanceof InvalidFeedException) {
logger.info(`Ignoring feed event due to invalid feed`, {
event,
xml: err.feedText,
stack: (err as Error).stack,
});

