remove the pages backfill and leverage an index in the update
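
In short: the standalone pages backfill and the shared updateNodes helper are removed, folders are now backfilled one data source at a time, and the node update gains an AND data_source = :dataSourceId condition so it can leverage an index (presumably one leading on data_source) rather than matching node_id across every data source.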
aubin-tchoi committed Jan 16, 2025
1 parent b17527b commit af092fa
Showing 1 changed file with 70 additions and 67 deletions.
137 changes: 70 additions & 67 deletions front/migrations/20250115_backfill_webcrawler_source_url.ts
import { QueryTypes } from "sequelize";
import type { Sequelize } from "sequelize";

import {
  getConnectorsReplicaDbConnection,
  getCorePrimaryDbConnection,
} from "@app/lib/production_checks/utils";
import { DataSourceModel } from "@app/lib/resources/storage/models/data_source";
import type Logger from "@app/logger/logger";
import { makeScript } from "@app/scripts/helpers";

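// Size of each paginated read from the connectors replica; the matching core
// update is applied once per batch.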
const BATCH_SIZE = 1024;

// see the call to upsertDataSourceFolder in webcrawler/temporal/activities.ts
async function backfillFolders(
  frontDataSource: DataSourceModel,
  coreSequelize: Sequelize,
  connectorsSequelize: Sequelize,
  execute: boolean,
  logger: typeof Logger
) {
  logger.info("Processing folders");

  let lastId = 0;
  let rows: { id: number; internalId: string; url: string }[] = [];

  do {
    rows = await connectorsSequelize.query(
      `SELECT id, "internalId", "url"
       FROM webcrawler_folders
       WHERE id > :lastId
         AND "connectorId" = :connectorId -- does not leverage any index; we'll see whether it's too slow.
       ORDER BY id
       LIMIT :batchSize;`,
      {
        replacements: {
          batchSize: BATCH_SIZE,
          lastId,
          connectorId: frontDataSource.connectorId,
        },
        type: QueryTypes.SELECT,
      }
    );

    if (rows.length === 0) {
      break;
    }

    logger.info({ row: rows[0] }, "Sample row.");
    const urls = rows.map((row) => row.url);
    const nodeIds = rows.map((row) => row.internalId);

    if (execute) {
      const coreDataSourceIds: { id: number }[] = await coreSequelize.query(
        `SELECT id
         FROM data_sources
         WHERE project = :projectId
           AND data_source_id = :dataSourceId;`,
        {
          replacements: {
            dataSourceId: frontDataSource.dustAPIDataSourceId,
            projectId: frontDataSource.dustAPIProjectId,
          },
          type: QueryTypes.SELECT,
        }
      );
      const coreDataSourceId = coreDataSourceIds[0]?.id;
      if (!coreDataSourceId) {
        logger.error(
          "No core data source found for the given front data source."
        );
        return;
      }

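      // unnest pairs the two arrays by position, e.g.
      //   SELECT unnest(ARRAY['n1','n2']) AS node_id, unnest(ARRAY['u1','u2']) AS url
      // yields the rows ('n1','u1'), ('n2','u2'); the data_source filter below
      // narrows the node_id match to this data source's rows, which is what
      // lets the update leverage an index.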
      await coreSequelize.query(
        // No possible mismatch even though some pages are upserted in connectors'
        // db but not as documents: unnest preserves array order and creates
        // parallel tuples.
        `UPDATE data_sources_nodes
         SET source_url = urls.url
         FROM (SELECT unnest(ARRAY [:nodeIds]::text[]) as node_id,
                      unnest(ARRAY [:urls]::text[]) as url) urls
         WHERE data_sources_nodes.node_id = urls.node_id
           AND data_source = :dataSourceId;`,
        { replacements: { urls, nodeIds, dataSourceId: coreDataSourceId } }
      );
      logger.info(
        `Updated ${rows.length} folders from id ${rows[0].id} to id ${rows[rows.length - 1].id}.`
      );
    } else {
      logger.info(
        `Would update ${rows.length} folders from id ${rows[0].id} to id ${rows[rows.length - 1].id}.`
      );
    }

    lastId = rows[rows.length - 1].id;
  } while (rows.length === BATCH_SIZE);
}

async function backfillDataSource(
  frontDataSource: DataSourceModel,
  coreSequelize: Sequelize,
  connectorsSequelize: Sequelize,
  execute: boolean,
  logger: typeof Logger
) {
  logger.info("Processing data source");

  await backfillFolders(
    frontDataSource,
    coreSequelize,
    connectorsSequelize,
    execute,
    logger
  );
}

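// Entry point: iterates over all webcrawler data sources, giving each a child
// logger tagged with the data source and connector ids.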
makeScript({}, async ({ execute }, logger) => {
  const coreSequelize = getCorePrimaryDbConnection();
  const connectorsSequelize = getConnectorsReplicaDbConnection();

  const frontDataSources = await DataSourceModel.findAll({
    where: { connectorProvider: "webcrawler" },
  });
  logger.info(`Found ${frontDataSources.length} Webcrawler data sources.`);

  for (const frontDataSource of frontDataSources) {
    await backfillDataSource(
      frontDataSource,
      coreSequelize,
      connectorsSequelize,
      execute,
      logger.child({
        dataSourceId: frontDataSource.id,
        connectorId: frontDataSource.connectorId,
      })
    );
  }
});

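For reference, a dry run followed by the actual backfill might look like this (assuming that makeScript maps an --execute CLI flag onto the execute parameter and that these migrations are run with npx tsx; both are assumptions, not something this diff shows):

npx tsx front/migrations/20250115_backfill_webcrawler_source_url.ts
npx tsx front/migrations/20250115_backfill_webcrawler_source_url.ts --execute

Without --execute, the script only logs the "Would update ..." lines, so batch boundaries can be sanity-checked before anything is written.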