From af092fa6b9e1c8b149cc5ad21051f09ad06fdca8 Mon Sep 17 00:00:00 2001
From: Aubin
Date: Thu, 16 Jan 2025 18:36:41 +0100
Subject: [PATCH] remove the pages backfill and leverage an index in the update

---
 ...20250115_backfill_webcrawler_source_url.ts | 137 +++++++++---------
 1 file changed, 70 insertions(+), 67 deletions(-)

diff --git a/front/migrations/20250115_backfill_webcrawler_source_url.ts b/front/migrations/20250115_backfill_webcrawler_source_url.ts
index d57c0f66e6ce..5324c51f2735 100644
--- a/front/migrations/20250115_backfill_webcrawler_source_url.ts
+++ b/front/migrations/20250115_backfill_webcrawler_source_url.ts
@@ -5,30 +5,15 @@ import {
   getConnectorsReplicaDbConnection,
   getCorePrimaryDbConnection,
 } from "@app/lib/production_checks/utils";
+import { DataSourceModel } from "@app/lib/resources/storage/models/data_source";
 import type Logger from "@app/logger/logger";
 import { makeScript } from "@app/scripts/helpers";
 
 const BATCH_SIZE = 1024;
 
-async function updateNodes(
-  coreSequelize: Sequelize,
-  nodeIds: string[],
-  urls: string[]
-) {
-  await coreSequelize.query(
-    // No possible mismatch even though some pages are upserted in connectors' db but not as document
-    // - unnest preserves array order and creates parallel tuples,
-    `UPDATE data_sources_nodes
-     SET source_url = urls.url
-     FROM (SELECT unnest(ARRAY [:nodeIds]::text[]) as node_id,
-                  unnest(ARRAY [:urls]::text[]) as url) urls
-     WHERE data_sources_nodes.node_id = urls.node_id;`,
-    { replacements: { urls, nodeIds } }
-  );
-}
-
 // see the call to upsertDataSourceFolder in webcrawler/temporal/activities.ts
 async function backfillFolders(
+  frontDataSource: DataSourceModel,
   coreSequelize: Sequelize,
   connectorsSequelize: Sequelize,
   execute: boolean,
@@ -44,10 +29,15 @@
       `SELECT id, "internalId", "url"
        FROM webcrawler_folders
        WHERE id > :lastId
+         AND "connectorId" = :connectorId -- does not leverage any index; we'll see if it's too slow
        ORDER BY id
        LIMIT :batchSize;`,
       {
-        replacements: { batchSize: BATCH_SIZE, lastId },
+        replacements: {
+          batchSize: BATCH_SIZE,
+          lastId,
+          connectorId: frontDataSource.connectorId,
+        },
         type: QueryTypes.SELECT,
       }
     );
@@ -57,11 +47,42 @@
     }
 
     logger.info({ row: rows[0] }, "Sample row.");
+    const urls = rows.map((row) => row.url);
+    const nodeIds = rows.map((row) => row.internalId);
+
     if (execute) {
-      await updateNodes(
-        coreSequelize,
-        rows.map((row) => row.internalId),
-        rows.map((row) => row.url)
+      const coreDataSourceIds: { id: number }[] = await coreSequelize.query(
+        `SELECT id
+         FROM data_sources
+         WHERE project = :projectId
+           AND data_source_id = :dataSourceId;`,
+        {
+          replacements: {
+            dataSourceId: frontDataSource.dustAPIDataSourceId,
+            projectId: frontDataSource.dustAPIProjectId,
+            nodeIds,
+          },
+          type: QueryTypes.SELECT,
+        }
+      );
+      const coreDataSourceId = coreDataSourceIds[0]?.id;
+      if (!coreDataSourceId) {
+        logger.error(
+          "No core data source found for the given front data source."
+        );
+        return;
+      }
+
+      await coreSequelize.query(
+        // No possible mismatch even though some pages are upserted in connectors' db but not as documents:
+        // unnest preserves array order and creates parallel tuples.
+        `UPDATE data_sources_nodes
+         SET source_url = urls.url
+         FROM (SELECT unnest(ARRAY [:nodeIds]::text[]) as node_id,
+                      unnest(ARRAY [:urls]::text[]) as url) urls
+         WHERE data_sources_nodes.node_id = urls.node_id
+           AND data_source = :dataSourceId;`,
+        { replacements: { urls, nodeIds, dataSourceId: coreDataSourceId } }
       );
       logger.info(
         `Updated ${rows.length} folders from id ${rows[0].id} to id ${rows[rows.length - 1].id}.`
@@ -76,59 +97,41 @@
   } while (rows.length === BATCH_SIZE);
 }
 
-// see the call to upsertDataSourceDocument in webcrawler/temporal/activities.ts
-async function backfillPages(
+async function backfillDataSource(
+  frontDataSource: DataSourceModel,
   coreSequelize: Sequelize,
   connectorsSequelize: Sequelize,
   execute: boolean,
   logger: typeof Logger
 ) {
-  logger.info("Processing pages");
-
-  let lastId = 0;
-  let rows: { id: number; documentId: string; url: string }[] = [];
-
-  do {
-    rows = await connectorsSequelize.query(
-      `SELECT id, "documentId", "url"
-       FROM webcrawler_pages
-       WHERE id > :lastId
-       ORDER BY id
-       LIMIT :batchSize;`,
-      {
-        replacements: { batchSize: BATCH_SIZE, lastId },
-        type: QueryTypes.SELECT,
-      }
-    );
-
-    if (rows.length === 0) {
-      break;
-    }
-
-    logger.info({ row: rows[0] }, "Sample row.");
-    if (execute) {
-      await updateNodes(
-        coreSequelize,
-        rows.map((row) => row.documentId),
-        rows.map((row) => row.url)
-      );
-      logger.info(
-        `Updated ${rows.length} pages from id ${rows[0].id} to id ${rows[rows.length - 1].id}.`
-      );
-    } else {
-      logger.info(
-        `Would update ${rows.length} pages from id ${rows[0].id} to id ${rows[rows.length - 1].id}.`
-      );
-    }
-
-    lastId = rows[rows.length - 1].id;
-  } while (rows.length === BATCH_SIZE);
+  logger.info("Processing data source");
+
+  await backfillFolders(
+    frontDataSource,
+    coreSequelize,
+    connectorsSequelize,
+    execute,
+    logger
+  );
 }
 
 makeScript({}, async ({ execute }, logger) => {
   const coreSequelize = getCorePrimaryDbConnection();
   const connectorsSequelize = getConnectorsReplicaDbConnection();
-
-  await backfillFolders(coreSequelize, connectorsSequelize, execute, logger);
-  await backfillPages(coreSequelize, connectorsSequelize, execute, logger);
+  const frontDataSources = await DataSourceModel.findAll({
+    where: { connectorProvider: "webcrawler" },
+  });
+  logger.info(`Found ${frontDataSources.length} Webcrawler data sources`);
+  for (const frontDataSource of frontDataSources) {
+    await backfillDataSource(
+      frontDataSource,
+      coreSequelize,
+      connectorsSequelize,
+      execute,
+      logger.child({
+        dataSourceId: frontDataSource.id,
+        connectorId: frontDataSource.connectorId,
+      })
    );
  }
 });
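
For reference, a minimal sketch of the bulk-update pattern the new code relies on: unnest turns the two parallel arrays into (node_id, url) tuples, and the added data_source filter scopes the UPDATE to a single core data source, which is what lets it resolve through an index on data_sources_nodes that includes data_source (presumably something like (data_source, node_id)) instead of matching node_id across the whole table. The helper name updateNodesSourceUrlForDataSource and the standalone wiring below are illustrative assumptions, not part of this change; the SQL is the same statement the patch adds in backfillFolders.

// Illustrative sketch only: hypothetical helper, assumes an index on
// data_sources_nodes covering (data_source, node_id).
import type { Sequelize } from "sequelize";

async function updateNodesSourceUrlForDataSource(
  coreSequelize: Sequelize,
  coreDataSourceId: number,
  nodeIds: string[],
  urls: string[]
) {
  // unnest keeps both arrays aligned, producing parallel (node_id, url) tuples;
  // the data_source predicate narrows the update to one core data source.
  await coreSequelize.query(
    `UPDATE data_sources_nodes
     SET source_url = urls.url
     FROM (SELECT unnest(ARRAY [:nodeIds]::text[]) as node_id,
                  unnest(ARRAY [:urls]::text[]) as url) urls
     WHERE data_sources_nodes.node_id = urls.node_id
       AND data_source = :dataSourceId;`,
    { replacements: { nodeIds, urls, dataSourceId: coreDataSourceId } }
  );
}

Such a helper would be called once per batch of webcrawler_folders rows, inside the per-data-source loop that makeScript now drives.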