diff --git a/services/feed-requests/migrations/Migration20250102112346.ts b/services/feed-requests/migrations/Migration20250102112346.ts new file mode 100644 index 000000000..9e12b2ddb --- /dev/null +++ b/services/feed-requests/migrations/Migration20250102112346.ts @@ -0,0 +1,24 @@ +import { Migration } from '@mikro-orm/migrations'; + +export class Migration20250102112346 extends Migration { + + async up(): Promise { + // create table response_bodies with auto increment id, text column "body" and hash_key column "hash_key" + this.addSql('CREATE TABLE "response_bodies" ("content" TEXT NOT NULL, "hash_key" TEXT NOT NULL);'); + // create index on hash_key column + this.addSql('CREATE UNIQUE INDEX "response_bodies_hash_key" ON "response_bodies" ("hash_key");'); + + // add column response_body_hash_key to request_partitioned + this.addSql('ALTER TABLE "request_partitioned" ADD COLUMN "response_body_hash_key" TEXT DEFAULT NULL NULL;'); + // add foreign key constraint to response_body_hash_key column + this.addSql(`ALTER TABLE "request_partitioned" ADD CONSTRAINT "request_partitioned_response_body_hash_key_fk" FOREIGN KEY ("response_body_hash_key") REFERENCES "response_bodies" ("hash_key") ON DELETE SET NULL;`); + } + + async down(): Promise { + // drop table response_bodies + this.addSql('DROP TABLE "response_bodies";'); + // drop column response_body_hash_key from request_partitioned + this.addSql('ALTER TABLE "request_partitioned" DROP COLUMN "response_body_hash_key";'); + } + +} diff --git a/services/feed-requests/migrations/Migration20250102220055.ts b/services/feed-requests/migrations/Migration20250102220055.ts new file mode 100644 index 000000000..c3c4752c2 --- /dev/null +++ b/services/feed-requests/migrations/Migration20250102220055.ts @@ -0,0 +1,15 @@ +import { Migration } from '@mikro-orm/migrations'; + +export class Migration20250102220055 extends Migration { + + async up(): Promise { + // add column content_hash to response_bodies + this.addSql('ALTER TABLE "response_bodies" ADD COLUMN "content_hash" TEXT DEFAULT NULL NULL;'); + } + + async down(): Promise { + // drop column content_hash from response_bodies + this.addSql('ALTER TABLE "response_bodies" DROP COLUMN "content_hash";'); + } + +} diff --git a/services/feed-requests/src/feed-fetcher/entities/response.entity.ts b/services/feed-requests/src/feed-fetcher/entities/response.entity.ts index abbca7bf3..121419f1c 100644 --- a/services/feed-requests/src/feed-fetcher/entities/response.entity.ts +++ b/services/feed-requests/src/feed-fetcher/entities/response.entity.ts @@ -59,4 +59,7 @@ export class Response { nullable: true, }) createdAt: Date = new Date(); + + responseHashKey!: string | null; + content!: string | null; } diff --git a/services/feed-requests/src/feed-fetcher/feed-fetcher-listener.service.ts b/services/feed-requests/src/feed-fetcher/feed-fetcher-listener.service.ts index 1859b1303..fe72ae5fc 100644 --- a/services/feed-requests/src/feed-fetcher/feed-fetcher-listener.service.ts +++ b/services/feed-requests/src/feed-fetcher/feed-fetcher-listener.service.ts @@ -242,7 +242,7 @@ export class FeedFetcherListenerService { } const latestOkRequest = - await this.partitionedRequestsStoreService.getLatestOkRequestWithResponseBody( + await this.partitionedRequestsStoreService.getLatestRequestWithOkStatus( data.lookupKey || data.url, { fields: ['response_headers'] }, ); @@ -259,9 +259,9 @@ export class FeedFetcherListenerService { source: RequestSource.Schedule, headers: { ...data.headers, - // 'If-Modified-Since': - // latestOkRequest?.responseHeaders?.['last-modified'] || '', - // 'If-None-Match': latestOkRequest?.responseHeaders?.etag, + 'If-Modified-Since': + latestOkRequest?.responseHeaders?.['last-modified'] || '', + 'If-None-Match': latestOkRequest?.responseHeaders?.etag, }, }, ); @@ -509,8 +509,11 @@ export class FeedFetcherListenerService { url: string; }): Promise { const latestOkRequest = - await this.partitionedRequestsStoreService.getLatestOkRequestWithResponseBody( + await this.partitionedRequestsStoreService.getLatestRequestWithOkStatus( lookupKey || url, + { + include304: true, + }, ); return this.partitionedRequestsStoreService.countFailedRequests( diff --git a/services/feed-requests/src/feed-fetcher/feed-fetcher.controller.ts b/services/feed-requests/src/feed-fetcher/feed-fetcher.controller.ts index 2320ba882..f7270a116 100644 --- a/services/feed-requests/src/feed-fetcher/feed-fetcher.controller.ts +++ b/services/feed-requests/src/feed-fetcher/feed-fetcher.controller.ts @@ -227,6 +227,7 @@ export class FeedFetcherController { if ( data.hashToCompare && + latestRequest.request.response?.textHash && data.hashToCompare === latestRequest.request.response?.textHash ) { return { diff --git a/services/feed-requests/src/feed-fetcher/feed-fetcher.service.ts b/services/feed-requests/src/feed-fetcher/feed-fetcher.service.ts index dcee7edd5..9756693ba 100644 --- a/services/feed-requests/src/feed-fetcher/feed-fetcher.service.ts +++ b/services/feed-requests/src/feed-fetcher/feed-fetcher.service.ts @@ -152,10 +152,16 @@ export class FeedFetcherService { return null; } - if (request.response?.redisCacheKey) { - const compressedText = await this.cacheStorageService.getFeedHtmlContent({ - key: request.response.redisCacheKey, - }); + if (request.response?.redisCacheKey || request.response?.content) { + let compressedText: string | null = null; + + if (request.response.content) { + compressedText = request.response.content; + } else if (request.response.redisCacheKey) { + compressedText = await this.cacheStorageService.getFeedHtmlContent({ + key: request.response.redisCacheKey, + }); + } const text = compressedText ? ( @@ -277,23 +283,14 @@ export class FeedFetcherService { } } - response.textHash = text - ? sha1.copy().update(text).digest('hex') - : ''; - - /** - * This has a problem where 304 responses will have no feed response, which won't refresh the - * previous cache contents for 200 codes - */ - const hashKey = + const key = url + JSON.stringify(request.fetchOptions) + res.status.toString(); - response.redisCacheKey = sha1.copy().update(hashKey).digest('hex'); + if (text.length) { + response.responseHashKey = sha1.copy().update(key).digest('hex'); - await this.cacheStorageService.setFeedHtmlContent({ - key: response.redisCacheKey, - body: compressedText, - }); + response.content = compressedText; + } } catch (err) { if (err instanceof FeedTooLargeException) { throw err; @@ -339,6 +336,13 @@ export class FeedFetcherService { s3ObjectKey: response.s3ObjectKey || null, redisCacheKey: response.redisCacheKey || null, headers: response.headers, + body: + response.responseHashKey && response.content + ? { + hashKey: response.responseHashKey, + contents: response.content, + } + : null, }, }; diff --git a/services/feed-requests/src/partitioned-requests-store/partitioned-requests-store.service.ts b/services/feed-requests/src/partitioned-requests-store/partitioned-requests-store.service.ts index 40bf1ff23..96516c3e8 100644 --- a/services/feed-requests/src/partitioned-requests-store/partitioned-requests-store.service.ts +++ b/services/feed-requests/src/partitioned-requests-store/partitioned-requests-store.service.ts @@ -32,7 +32,7 @@ export default class PartitionedRequestsStoreService { try { await Promise.all( - this.pendingInserts.map((responseInsert) => { + this.pendingInserts.map(async (responseInsert) => { const urlHash = sha1.copy().update(responseInsert.url).digest('hex'); let hostHash: string | null = null; @@ -46,6 +46,35 @@ export default class PartitionedRequestsStoreService { }); } + if (responseInsert.response?.body) { + const contentHash = sha1 + .copy() + .update(responseInsert.response.body.contents) + .digest('hex'); + + // does this content already exist in the db? + const results = await em.execute( + `SELECT 1 FROM response_bodies WHERE hash_key = ? AND content_hash = ?`, + [responseInsert.response.body.hashKey, contentHash], + ); + + if (results.length === 0) { + await em.execute( + `INSERT INTO response_bodies (content, content_hash, hash_key) VALUES (?, ?, ?) + ON CONFLICT (hash_key) DO UPDATE SET content = ?, content_hash = ? + `, + [ + responseInsert.response.body.contents, + contentHash, + responseInsert.response.body.hashKey, + responseInsert.response.body.contents, + contentHash, + ], + transaction, + ); + } + } + return em.execute( `INSERT INTO request_partitioned ( id, @@ -61,11 +90,12 @@ export default class PartitionedRequestsStoreService { error_message, response_status_code, response_text_hash, + response_body_hash_key, response_s3object_key, response_redis_cache_key, response_headers ) VALUES ( - ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? + ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )`, [ randomUUID(), @@ -80,7 +110,8 @@ export default class PartitionedRequestsStoreService { responseInsert.nextRetryDate, responseInsert.errorMessage, responseInsert.response?.statusCode, - responseInsert.response?.textHash, + null, + responseInsert.response?.body?.hashKey, responseInsert.response?.s3ObjectKey, responseInsert.response?.redisCacheKey, this.stringifyJson(responseInsert.response?.headers), @@ -118,9 +149,10 @@ export default class PartitionedRequestsStoreService { return new Date(result.next_retry_date) || null; } - async getLatestOkRequestWithResponseBody( + async getLatestRequestWithOkStatus( lookupKey: string, - opts?: { + opts: { + include304?: boolean; fields?: Array<'response_headers'>; }, ): Promise