Skip to content

Commit

Permalink
[connectors] Backfill microsoft folders (#9424)
Browse files Browse the repository at this point in the history
* Backfill ms folders

* include drives
  • Loading branch information
tdraier authored Dec 17, 2024
1 parent 099bc11 commit cdac695
Showing 1 changed file with 162 additions and 0 deletions.
162 changes: 162 additions & 0 deletions connectors/migrations/20241216_backfill_ms_folders.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import { concurrentExecutor } from "@dust-tt/types";
import { makeScript } from "scripts/helpers";
import { Op } from "sequelize";

import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config";
import { getFolderNode, upsertFolderNode } from "@connectors/lib/data_sources";
import {} from "@connectors/lib/models/google_drive";
import { MicrosoftNodeModel } from "@connectors/lib/models/microsoft";
import type { Logger } from "@connectors/logger/logger";
import logger from "@connectors/logger/logger";
import { ConnectorModel } from "@connectors/resources/storage/models/connector_model";

// Page size for the paginated Sequelize reads over MicrosoftNodeModel rows.
const QUERY_BATCH_SIZE = 1024;

/**
 * Walks the parent chain of `fileId` through `parentsMap` and returns the
 * list of internal ids from `fileId` up to a root (a node whose mapped
 * parent is null/empty).
 *
 * Returns null — after logging an error — when the chain references a node
 * missing from `parentsMap`, or when the chain contains a cycle (the
 * previous implementation looped forever on a cycle such as a -> b -> a).
 */
function getParents(
  fileId: string | null,
  parentsMap: Record<string, string | null>,
  logger: Logger
) {
  const parents: string[] = [];
  const seen = new Set<string>();
  let current: string | null = fileId;
  while (current) {
    if (seen.has(current)) {
      // Defensive: corrupted parent links would otherwise hang the script.
      logger.error({ fileId: current }, "Cycle detected in parents");
      return null;
    }
    seen.add(current);
    parents.push(current);
    if (typeof parentsMap[current] === "undefined") {
      logger.error({ fileId: current }, "Parent not found");
      return null;
    }
    current = parentsMap[current] || null;
  }
  return parents;
}

async function backfillFolder({
connector,
execute,
}: {
connector: ConnectorModel;
execute: boolean;
}) {
const childLogger = logger.child({ connectorId: connector.id });
childLogger.info(`Processing connector ${connector.id}...`);

const dataSourceConfig = dataSourceConfigFromConnector(connector);
const parentsMap: Record<string, string | null> = {};
let nextId: number | undefined = 0;
do {
const msFolders: MicrosoftNodeModel[] = await MicrosoftNodeModel.findAll({
where: {
connectorId: connector.id,
id: {
[Op.gt]: nextId,
},
nodeType: { [Op.or]: ["folder", "drive"] },
},
});

msFolders.forEach((file) => {
parentsMap[file.internalId] = file.parentInternalId;
});

nextId = msFolders[msFolders.length - 1]?.id;
} while (nextId);

nextId = 0;
do {
const msFolders: MicrosoftNodeModel[] = await MicrosoftNodeModel.findAll({
where: {
connectorId: connector.id,
id: {
[Op.gt]: nextId,
},
nodeType: { [Op.or]: ["folder", "drive"] },
},
order: [["id", "ASC"]],
limit: QUERY_BATCH_SIZE,
});

await concurrentExecutor(
msFolders,
async (file) => {
const internalId = file.internalId;
const parents = getParents(
file.parentInternalId,
parentsMap,
childLogger.child({ nodeId: internalId })
);
if (!parents) {
return;
}
parents.unshift(internalId);

const folder = await getFolderNode({
dataSourceConfig,
folderId: internalId,
});
if (!folder || folder.parents.join("/") !== parents.join("/")) {
childLogger.info(
{ folderId: file.internalId, parents },
"Upsert folder"
);

if (execute) {
// upsert repository as folder
await upsertFolderNode({
dataSourceConfig,
folderId: file.internalId,
parents,
parentId: file.parentInternalId,
title: file.name || "",
});
}
}
},
{ concurrency: 16 }
);

nextId = msFolders[msFolders.length - 1]?.id;
} while (nextId);
}

// Entry point: backfill a single connector when `connectorId` is given,
// otherwise walk every Microsoft connector with id > `nextConnectorId`.
makeScript(
  {
    nextConnectorId: {
      type: "number",
      required: false,
      default: 0,
    },
    connectorId: {
      type: "number",
      required: false,
      default: 0,
    },
  },
  async ({ nextConnectorId, connectorId, execute }) => {
    // Single-connector mode: a non-zero connectorId targets one connector.
    if (connectorId) {
      const connector = await ConnectorModel.findByPk(connectorId);
      if (!connector) {
        return;
      }
      await backfillFolder({ connector, execute });
      return;
    }

    // Batch mode: process all Microsoft connectors past the cursor, in
    // ascending id order so the run is resumable via nextConnectorId.
    const connectors = await ConnectorModel.findAll({
      where: {
        type: "microsoft",
        id: {
          [Op.gt]: nextConnectorId,
        },
      },
      order: [["id", "ASC"]],
    });

    for (const connector of connectors) {
      await backfillFolder({ connector, execute });
    }
  }
);

0 comments on commit cdac695

Please sign in to comment.