Speed up sourceUrl backfill scripts by leveraging indexes #10039

Merged · 5 commits · Jan 16, 2025
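Both scripts previously ran their UPDATE against `data_sources_nodes` matching on `node_id` alone, across every data source at once. They now iterate over the webcrawler (resp. Notion) data sources from the front database, resolve each one's core data source id from `dustAPIProjectId` and `dustAPIDataSourceId`, and constrain the UPDATE to that data source. Presumably `data_sources_nodes` is indexed on `(data_source, node_id)`, so the added predicate turns the node lookup into an index scan instead of a cross-data-source match. A minimal sketch of the scoped update pattern, with an illustrative helper name:

```typescript
import type { Sequelize } from "sequelize";

// Illustrative sketch (not part of the diff): the scoped update pattern both
// scripts now use. Assumes data_sources_nodes carries an index covering
// (data_source, node_id), so the added data_source predicate lets Postgres
// avoid matching node_id across every data source.
async function updateNodesForDataSource(
  coreSequelize: Sequelize,
  coreDataSourceId: number,
  nodeIds: string[],
  urls: string[]
) {
  await coreSequelize.query(
    `UPDATE data_sources_nodes
     SET source_url = urls.url
     FROM (SELECT unnest(ARRAY [:nodeIds]::text[]) as node_id,
                  unnest(ARRAY [:urls]::text[]) as url) urls
     WHERE data_sources_nodes.data_source = :dataSourceId
       AND data_sources_nodes.node_id = urls.node_id;`,
    { replacements: { urls, nodeIds, dataSourceId: coreDataSourceId } }
  );
}
```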
134 changes: 67 additions & 67 deletions front/migrations/20250115_backfill_webcrawler_source_url.ts
@@ -5,37 +5,41 @@ import {
getConnectorsReplicaDbConnection,
getCorePrimaryDbConnection,
} from "@app/lib/production_checks/utils";
import { DataSourceModel } from "@app/lib/resources/storage/models/data_source";
import type Logger from "@app/logger/logger";
import { makeScript } from "@app/scripts/helpers";

const BATCH_SIZE = 1024;

async function updateNodes(
coreSequelize: Sequelize,
nodeIds: string[],
urls: string[]
) {
await coreSequelize.query(
// No possible mismatch even though some pages are upserted in connectors' db but not as document
// - unnest preserves array order and creates parallel tuples,
`UPDATE data_sources_nodes
SET source_url = urls.url
FROM (SELECT unnest(ARRAY [:nodeIds]::text[]) as node_id,
unnest(ARRAY [:urls]::text[]) as url) urls
WHERE data_sources_nodes.node_id = urls.node_id;`,
{ replacements: { urls, nodeIds } }
);
}

// see the call to upsertDataSourceFolder in webcrawler/temporal/activities.ts
async function backfillFolders(
frontDataSource: DataSourceModel,
coreSequelize: Sequelize,
connectorsSequelize: Sequelize,
execute: boolean,
logger: typeof Logger
) {
logger.info("Processing folders");

const coreDataSourceIds: { id: number }[] = await coreSequelize.query(
`SELECT id
FROM data_sources
WHERE project = :projectId
AND data_source_id = :dataSourceId;`,
{
replacements: {
dataSourceId: frontDataSource.dustAPIDataSourceId,
projectId: frontDataSource.dustAPIProjectId,
},
type: QueryTypes.SELECT,
}
);
const coreDataSourceId = coreDataSourceIds[0].id;
if (!coreDataSourceId) {
logger.error("No core data source found for the given front data source.");
return;
}

let lastId = 0;
let rows: { id: number; internalId: string; url: string }[] = [];

@@ -44,10 +48,15 @@ async function backfillFolders(
`SELECT id, "internalId", "url"
FROM webcrawler_folders
WHERE id > :lastId
AND "connectorId" = :connectorId -- does not leverage any index, we'll see if too slow or not
Contributor: yep, likely not used but still good to have there

ORDER BY id
LIMIT :batchSize;`,
{
replacements: { batchSize: BATCH_SIZE, lastId },
replacements: {
batchSize: BATCH_SIZE,
lastId,
connectorId: frontDataSource.connectorId,
},
type: QueryTypes.SELECT,
}
);
@@ -57,11 +66,20 @@
}

logger.info({ row: rows[0] }, "Sample row.");
const urls = rows.map((row) => row.url);
const nodeIds = rows.map((row) => row.internalId);

if (execute) {
await updateNodes(
coreSequelize,
rows.map((row) => row.internalId),
rows.map((row) => row.url)
await coreSequelize.query(
// No possible mismatch even though some pages are upserted in connectors' db but not as document
// - unnest preserves array order and creates parallel tuples,
`UPDATE data_sources_nodes
SET source_url = urls.url
FROM (SELECT unnest(ARRAY [:nodeIds]::text[]) as node_id,
unnest(ARRAY [:urls]::text[]) as url) urls
WHERE data_sources_nodes.data_source = :dataSourceId
AND data_sources_nodes.node_id = urls.node_id;`,
{ replacements: { urls, nodeIds, dataSourceId: coreDataSourceId } }
);
logger.info(
`Updated ${rows.length} folders from id ${rows[0].id} to id ${rows[rows.length - 1].id}.`
@@ -76,59 +94,41 @@
} while (rows.length === BATCH_SIZE);
}

// see the call to upsertDataSourceDocument in webcrawler/temporal/activities.ts
async function backfillPages(
async function backfillDataSource(
frontDataSource: DataSourceModel,
coreSequelize: Sequelize,
connectorsSequelize: Sequelize,
execute: boolean,
logger: typeof Logger
) {
logger.info("Processing pages");

let lastId = 0;
let rows: { id: number; documentId: string; url: string }[] = [];

do {
rows = await connectorsSequelize.query(
`SELECT id, "documentId", "url"
FROM webcrawler_pages
WHERE id > :lastId
ORDER BY id
LIMIT :batchSize;`,
{
replacements: { batchSize: BATCH_SIZE, lastId },
type: QueryTypes.SELECT,
}
);

if (rows.length === 0) {
break;
}

logger.info({ row: rows[0] }, "Sample row.");
if (execute) {
await updateNodes(
coreSequelize,
rows.map((row) => row.documentId),
rows.map((row) => row.url)
);
logger.info(
`Updated ${rows.length} pages from id ${rows[0].id} to id ${rows[rows.length - 1].id}.`
);
} else {
logger.info(
`Would update ${rows.length} pages from id ${rows[0].id} to id ${rows[rows.length - 1].id}.`
);
}

lastId = rows[rows.length - 1].id;
} while (rows.length === BATCH_SIZE);
logger.info("Processing data source");

await backfillFolders(
frontDataSource,
coreSequelize,
connectorsSequelize,
execute,
logger
);
}

makeScript({}, async ({ execute }, logger) => {
const coreSequelize = getCorePrimaryDbConnection();
const connectorsSequelize = getConnectorsReplicaDbConnection();

await backfillFolders(coreSequelize, connectorsSequelize, execute, logger);
await backfillPages(coreSequelize, connectorsSequelize, execute, logger);
const frontDataSources = await DataSourceModel.findAll({
where: { connectorProvider: "webcrawler" },
});
logger.info(`Found ${frontDataSources.length} Webcrawler data sources`);
for (const frontDataSource of frontDataSources) {
await backfillDataSource(
frontDataSource,
coreSequelize,
connectorsSequelize,
execute,
logger.child({
dataSourceId: frontDataSource.id,
connectorId: frontDataSource.connectorId,
})
);
}
});
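The connectors-side reads stay on keyset pagination: each batch selects rows with `id > lastId`, ordered by `id` and limited to `BATCH_SIZE`, so every iteration is a bounded range scan on the primary key rather than an ever-growing OFFSET. A minimal sketch of that loop, assuming the `webcrawler_folders` shape shown above (the helper name and callback are illustrative):

```typescript
import { QueryTypes, Sequelize } from "sequelize";

const BATCH_SIZE = 1024;

// Sketch of the keyset-pagination loop used by the backfills. Each iteration
// reads the next BATCH_SIZE rows strictly after the last id seen, so the read
// stays a bounded scan on the primary key instead of an increasingly costly
// OFFSET, and stops once a partial batch comes back.
async function forEachFolderBatch(
  connectorsSequelize: Sequelize,
  handle: (rows: { id: number; internalId: string; url: string }[]) => Promise<void>
) {
  let lastId = 0;
  let rows: { id: number; internalId: string; url: string }[] = [];
  do {
    rows = await connectorsSequelize.query(
      `SELECT id, "internalId", "url"
       FROM webcrawler_folders
       WHERE id > :lastId
       ORDER BY id
       LIMIT :batchSize;`,
      { replacements: { batchSize: BATCH_SIZE, lastId }, type: QueryTypes.SELECT }
    );
    if (rows.length === 0) {
      break;
    }
    await handle(rows);
    lastId = rows[rows.length - 1].id;
  } while (rows.length === BATCH_SIZE);
}
```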
149 changes: 68 additions & 81 deletions front/migrations/20250116_backfill_notion_source_url.ts
@@ -7,79 +7,38 @@ import {
} from "@app/lib/production_checks/utils";
import type Logger from "@app/logger/logger";
import { makeScript } from "@app/scripts/helpers";
import { DataSourceModel } from "@app/lib/resources/storage/models/data_source";

const BATCH_SIZE = 1024;

async function updateNodes(
coreSequelize: Sequelize,
nodeIds: string[],
urls: string[]
) {
await coreSequelize.query(
`UPDATE data_sources_nodes
SET source_url = urls.url
FROM (SELECT unnest(ARRAY [:nodeIds]::text[]) as node_id,
unnest(ARRAY [:urls]::text[]) as url) urls
WHERE data_sources_nodes.node_id = urls.node_id;`,
{ replacements: { urls, nodeIds } }
);
}

async function backfillPages(
coreSequelize: Sequelize,
connectorsSequelize: Sequelize,
execute: boolean,
logger: typeof Logger
) {
logger.info("Processing pages");

let lastId = 0;
let rows: { id: number; notionPageId: string; notionUrl: string }[] = [];

do {
rows = await connectorsSequelize.query(
`SELECT id, "notionPageId", "notionUrl"
FROM notion_pages
WHERE id > :lastId
ORDER BY id
LIMIT :batchSize;`,
{
replacements: { batchSize: BATCH_SIZE, lastId },
type: QueryTypes.SELECT,
}
);

const urls = rows.map((row) => row.notionUrl);
// taken from connectors/migrations/20241030_fix_notion_parents.ts
const nodeIds = rows.map((row) => `notion-${row.notionPageId}`);

if (rows.length === 0) {
break;
}

if (execute) {
await updateNodes(coreSequelize, nodeIds, urls);
logger.info(
`Updated ${rows.length} pages from id ${rows[0].id} to id ${rows[rows.length - 1].id}.`
);
} else {
logger.info(
`Would update ${rows.length} pages from id ${rows[0].id} to id ${rows[rows.length - 1].id}.`
);
}

lastId = rows[rows.length - 1].id;
} while (rows.length === BATCH_SIZE);
}

async function backfillDatabases(
frontDataSource: DataSourceModel,
coreSequelize: Sequelize,
connectorsSequelize: Sequelize,
execute: boolean,
logger: typeof Logger
) {
logger.info("Processing databases");

const coreDataSourceIds: { id: number }[] = await coreSequelize.query(
`SELECT id
FROM data_sources
WHERE project = :projectId
AND data_source_id = :dataSourceId;`,
{
replacements: {
dataSourceId: frontDataSource.dustAPIDataSourceId,
projectId: frontDataSource.dustAPIProjectId,
},
type: QueryTypes.SELECT,
}
);
const coreDataSourceId = coreDataSourceIds[0].id;
if (!coreDataSourceId) {
logger.error("No core data source found for the given front data source.");
return;
}

let lastId = 0;
let rows: {
id: number;
@@ -102,12 +61,8 @@ async function backfillDatabases(
);

// taken from connectors/migrations/20241030_fix_notion_parents.ts
// for each database, we upsert documents with an id starting in `notion-database-` +
// for each database, we upsert documents (not backfilled here) with an id starting in `notion-database-` +
// if structuredDataEnabled we also upsert a table with an id starting in `notion-`
const documentUrls = rows.map((row) => row.notionUrl);
const documentNodeIds = rows.map(
(row) => `notion-database-${row.notionDatabaseId}`
);
const tableRows = rows.filter(
(row) => row.structuredDataUpsertedTs !== null
);
@@ -120,20 +75,23 @@
break;
}

if (execute) {
await updateNodes(coreSequelize, documentNodeIds, documentUrls);
logger.info(
`Updated ${rows.length} databases (documents) from id ${rows[0].id} to id ${rows[rows.length - 1].id}.`
);
} else {
logger.info(
`Would update ${rows.length} databases (documents) from id ${rows[0].id} to id ${rows[rows.length - 1].id}.`
);
}

if (tableRows.length > 0) {
if (execute) {
await updateNodes(coreSequelize, tableNodeIds, tableUrls);
await coreSequelize.query(
`UPDATE data_sources_nodes
SET source_url = urls.url
FROM (SELECT unnest(ARRAY [:nodeIds]::text[]) as node_id,
unnest(ARRAY [:urls]::text[]) as url) urls
WHERE data_sources_nodes.data_source = :dataSourceId
AND data_sources_nodes.node_id = urls.node_id;`,
{
replacements: {
urls: tableUrls,
nodeIds: tableNodeIds,
dataSourceId: coreDataSourceId,
},
}
);
logger.info(
`Updated ${tableRows.length} databases (tables) from id ${tableRows[0].id} to id ${tableRows[tableRows.length - 1].id}.`
);
@@ -148,10 +106,39 @@
} while (rows.length === BATCH_SIZE);
}

async function backfillDataSource(
frontDataSource: DataSourceModel,
coreSequelize: Sequelize,
connectorsSequelize: Sequelize,
execute: boolean,
logger: typeof Logger
) {
logger.info("Processing data source");

await backfillDatabases(
frontDataSource,
coreSequelize,
connectorsSequelize,
execute,
logger
);
}

makeScript({}, async ({ execute }, logger) => {
const coreSequelize = getCorePrimaryDbConnection();
const connectorsSequelize = getConnectorsReplicaDbConnection();

await backfillPages(coreSequelize, connectorsSequelize, execute, logger);
await backfillDatabases(coreSequelize, connectorsSequelize, execute, logger);
const frontDataSources = await DataSourceModel.findAll({
where: { connectorProvider: "notion" },
});
logger.info(`Found ${frontDataSources.length} Notion data sources`);
for (const frontDataSource of frontDataSources) {
await backfillDataSource(
frontDataSource,
coreSequelize,
connectorsSequelize,
execute,
logger
);
}
});
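To sanity-check that the scoped UPDATE actually picks up an index, one could ask Postgres for the plan through the same Sequelize connection. This is a hypothetical verification snippet, not something either migration runs:

```typescript
import { QueryTypes, Sequelize } from "sequelize";

// Hypothetical check (not part of the migrations): print the plan Postgres
// picks for the scoped update, to confirm an index scan on data_sources_nodes
// rather than a sequential scan. EXPLAIN without ANALYZE only plans the
// statement; it does not execute the update.
async function explainScopedUpdate(coreSequelize: Sequelize, dataSourceId: number) {
  const plan = await coreSequelize.query(
    `EXPLAIN
     UPDATE data_sources_nodes
     SET source_url = urls.url
     FROM (SELECT unnest(ARRAY [:nodeIds]::text[]) as node_id,
                  unnest(ARRAY [:urls]::text[]) as url) urls
     WHERE data_sources_nodes.data_source = :dataSourceId
       AND data_sources_nodes.node_id = urls.node_id;`,
    {
      replacements: {
        nodeIds: ["example-node-id"],
        urls: ["https://example.com"],
        dataSourceId,
      },
      type: QueryTypes.SELECT,
    }
  );
  console.log(plan);
}
```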