From a45e8774b229f2b2892e1b5a528409e8ed07814a Mon Sep 17 00:00:00 2001 From: gintil Date: Mon, 9 Dec 2024 17:16:56 -0500 Subject: [PATCH] Fix temp table not being used in transaction --- ...tioned-feed-article-field-store.service.ts | 46 +++++++++++-------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/services/user-feeds/src/articles/partitioned-feed-article-field-store.service.ts b/services/user-feeds/src/articles/partitioned-feed-article-field-store.service.ts index 560891d38..0aecd359a 100644 --- a/services/user-feeds/src/articles/partitioned-feed-article-field-store.service.ts +++ b/services/user-feeds/src/articles/partitioned-feed-article-field-store.service.ts @@ -9,6 +9,7 @@ import dayjs from "dayjs"; import logger from "../shared/utils/logger"; import PartitionedFeedArticleFieldInsert from "./types/pending-feed-article-field-insert.types"; import { AsyncLocalStorage } from "node:async_hooks"; +import { PostgreSqlDriver } from "@mikro-orm/postgresql"; interface AsyncStore { toInsert: PartitionedFeedArticleFieldInsert[]; @@ -21,7 +22,7 @@ export class PartitionedFeedArticleFieldStoreService { connection: Connection; TABLE_NAME = "feed_article_field_partitioned"; - constructor(private readonly orm: MikroORM) { + constructor(private readonly orm: MikroORM) { this.connection = this.orm.em.getConnection(); } @@ -127,25 +128,30 @@ export class PartitionedFeedArticleFieldStoreService { [oneMonthAgo, feedId, ...ids] ); } else { - const temporaryTableName = `current_article_ids_${feedId}`; - const sql = - `CREATE TEMP TABLE ${temporaryTableName} AS` + - ` SELECT * FROM (VALUES ${ids.map(() => "(?)").join(", ")}) AS t(id)`; - - await this.connection.execute(sql, ids); - - const result = await this.connection.execute( - `SELECT field_hashed_value` + - ` FROM ${this.TABLE_NAME}` + - ` INNER JOIN ${temporaryTableName} t ON (field_hashed_value = t.id)` + - ` WHERE ${ - olderThanOneMonth ? `created_at <= ?` : `created_at > ?` - } AND feed_id = ? AND field_name = 'id'` + - ``, - [oneMonthAgo, feedId] - ); - - await this.connection.execute(`DROP TABLE ${temporaryTableName}`); + const result = await this.orm.em.transactional(async (em) => { + const temporaryTableName = `current_article_ids_${feedId}`; + const sql = + `CREATE TEMP TABLE ${temporaryTableName} AS` + + ` SELECT * FROM (VALUES ${ids.map(() => "(?)").join(", ")}) AS t(id)`; + + await em.execute(sql, ids); + + const result = await em.execute( + `SELECT field_hashed_value` + + ` FROM ${this.TABLE_NAME}` + + ` INNER JOIN ${temporaryTableName} t ON (field_hashed_value = t.id)` + + ` WHERE ${ + olderThanOneMonth ? `created_at <= ?` : `created_at > ?` + } AND feed_id = ? AND field_name = 'id'` + + ``, + [oneMonthAgo, feedId] + ); + + await em.execute(`DROP TABLE ${temporaryTableName}`); + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + return result as any; + }); return result; }