Skip to content

Commit

Permalink
[Nodes Core] Fill & backfill source_url for google drive (#10029)
Browse files Browse the repository at this point in the history
* spreadsheet url fn

* add source url where needed

* backfill

* clean

* remove gdrive sharedwme url

* clean

* naming

* fix sheet backfill
  • Loading branch information
philipperolet authored Jan 16, 2025
1 parent 04ec35f commit 243ac24
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 9 deletions.
25 changes: 20 additions & 5 deletions connectors/src/connectors/google_drive/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import {
isGoogleDriveFolder,
isGoogleDriveSpreadSheetFile,
} from "@connectors/connectors/google_drive/temporal/mime_types";
import type { Sheet } from "@connectors/connectors/google_drive/temporal/spreadsheets";
import {
driveObjectToDustType,
getAuthObject,
Expand Down Expand Up @@ -67,6 +68,7 @@ import { terminateAllWorkflowsForConnectorId } from "@connectors/lib/temporal";
import logger from "@connectors/logger/logger";
import { ConnectorResource } from "@connectors/resources/connector_resource";
import type { DataSourceConfig } from "@connectors/types/data_source_config.js";
import type { GoogleDriveObjectType } from "@connectors/types/google_drive";
import { FILE_ATTRIBUTES_TO_FETCH } from "@connectors/types/google_drive";

export class GoogleDriveConnectorManager extends BaseConnectorManager<null> {
Expand Down Expand Up @@ -706,7 +708,7 @@ export class GoogleDriveConnectorManager extends BaseConnectorManager<null> {
type: "database",
title: s.name || "",
lastUpdatedAt: s.updatedAt.getTime() || null,
sourceUrl: `https://docs.google.com/spreadsheets/d/${s.driveFileId}/edit#gid=${s.driveSheetId}`,
sourceUrl: getSourceUrlForGoogleDriveSheet(s),
expandable: false,
permission: "read",
}));
Expand Down Expand Up @@ -976,12 +978,25 @@ async function getFoldersAsContentNodes({
);
}

function getSourceUrlForGoogleDriveFiles(f: GoogleDriveFiles): string {
export function getSourceUrlForGoogleDriveFiles(
f: GoogleDriveFiles | GoogleDriveObjectType
): string {
const driveFileId = f instanceof GoogleDriveFiles ? f.driveFileId : f.id;

if (isGoogleDriveSpreadSheetFile(f)) {
return `https://docs.google.com/spreadsheets/d/${f.driveFileId}/edit`;
return `https://docs.google.com/spreadsheets/d/${driveFileId}/edit`;
} else if (isGoogleDriveFolder(f)) {
return `https://drive.google.com/drive/folders/${f.driveFileId}`;
return `https://drive.google.com/drive/folders/${driveFileId}`;
}

return `https://drive.google.com/file/d/${f.driveFileId}/view`;
return `https://drive.google.com/file/d/${driveFileId}/view`;
}

/**
 * Deep link to a single sheet (tab) inside a Google spreadsheet.
 * Accepts either the connectors DB model (GoogleDriveSheet) or the in-memory
 * Sheet shape used during sync; they store the spreadsheet/tab ids under
 * different property names. The tab is addressed via the `#gid=` fragment.
 */
export function getSourceUrlForGoogleDriveSheet(
  s: GoogleDriveSheet | Sheet
): string {
  const isModel = s instanceof GoogleDriveSheet;
  const spreadsheetId = isModel ? s.driveFileId : s.spreadsheet.id;
  const tabId = isModel ? s.driveSheetId : s.id;
  return `https://docs.google.com/spreadsheets/d/${spreadsheetId}/edit#gid=${tabId}`;
}
3 changes: 3 additions & 0 deletions connectors/src/connectors/google_drive/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import StatsD from "hot-shots";
import PQueue from "p-queue";
import { Op } from "sequelize";

import { getSourceUrlForGoogleDriveFiles } from "@connectors/connectors/google_drive";
import {
GOOGLE_DRIVE_SHARED_WITH_ME_VIRTUAL_ID,
GOOGLE_DRIVE_USER_SPACE_VIRTUAL_DRIVE_ID,
Expand Down Expand Up @@ -516,6 +517,7 @@ export async function incrementalSync(
parentId: parents[1] || null,
title: driveFile.name ?? "",
mimeType: MIME_TYPES.GOOGLE_DRIVE.FOLDER,
sourceUrl: getSourceUrlForGoogleDriveFiles(driveFile),
});

await GoogleDriveFiles.upsert({
Expand Down Expand Up @@ -861,6 +863,7 @@ export async function markFolderAsVisited(
parentId: parents[1] || null,
title: file.name ?? "",
mimeType: MIME_TYPES.GOOGLE_DRIVE.FOLDER,
sourceUrl: getSourceUrlForGoogleDriveFiles(file),
});

await GoogleDriveFiles.upsert({
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import type { GoogleDriveFiles } from "@connectors/lib/models/google_drive";

export const MIME_TYPES_TO_EXPORT: { [key: string]: string } = {
"application/vnd.google-apps.document": "text/plain",
"application/vnd.google-apps.presentation": "text/plain",
Expand Down Expand Up @@ -48,7 +46,7 @@ export async function getMimeTypesToSync({
return mimeTypes;
}

export function isGoogleDriveFolder(file: GoogleDriveFiles) {
export function isGoogleDriveFolder(file: { mimeType: string }) {
return file.mimeType === "application/vnd.google-apps.folder";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type { sheets_v4 } from "googleapis";
import { google } from "googleapis";
import type { OAuth2Client } from "googleapis-common";

import { getSourceUrlForGoogleDriveSheet } from "@connectors/connectors/google_drive";
import { getFileParentsMemoized } from "@connectors/connectors/google_drive/lib/hierarchy";
import { getInternalId } from "@connectors/connectors/google_drive/temporal/utils";
import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config";
Expand All @@ -30,7 +31,7 @@ import type { GoogleDriveObjectType } from "@connectors/types/google_drive";

const MAXIMUM_NUMBER_OF_GSHEET_ROWS = 50000;

type Sheet = sheets_v4.Schema$ValueRange & {
export type Sheet = sheets_v4.Schema$ValueRange & {
id: number;
spreadsheet: {
id: string;
Expand Down Expand Up @@ -87,6 +88,7 @@ async function upsertGdriveTable(
useAppForHeaderDetection: true,
title: `${spreadsheet.title} - ${title}`,
mimeType: "application/vnd.google-apps.spreadsheet",
sourceUrl: getSourceUrlForGoogleDriveSheet(sheet),
});

logger.info(loggerArgs, "[Spreadsheet] Table upserted.");
Expand Down
1 change: 1 addition & 0 deletions connectors/src/types/google_drive.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export type GoogleDriveObjectType = {
driveId: string;
isInSharedDrive: boolean;
};

export type GoogleDriveFolderType = {
id: string;
name: string;
Expand Down
236 changes: 236 additions & 0 deletions front/migrations/20250116_backfill_google_drive_source_url.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
import type { Sequelize } from "sequelize";
import { QueryTypes } from "sequelize";

import {
getConnectorsReplicaDbConnection,
getCorePrimaryDbConnection,
} from "@app/lib/production_checks/utils";
import { DataSourceModel } from "@app/lib/resources/storage/models/data_source";
import type Logger from "@app/logger/logger";
import { makeScript } from "@app/scripts/helpers";

const BATCH_SIZE = 128;

/**
 * Backfills `source_url` for every node of one google_drive data source:
 * first the folders, then the spreadsheet tabs. Each pass gets a child
 * logger tagged with the node type so log lines are attributable.
 */
async function backfillDataSource(
  frontDataSource: DataSourceModel,
  coreSequelize: Sequelize,
  connectorsSequelize: Sequelize,
  execute: boolean,
  logger: typeof Logger
) {
  logger.info("Processing data source");

  const foldersLogger = logger.child({ type: "folders" });
  await backfillFolders(
    frontDataSource,
    coreSequelize,
    connectorsSequelize,
    execute,
    foldersLogger
  );

  const spreadsheetsLogger = logger.child({ type: "spreadsheets" });
  await backfillSpreadsheets(
    frontDataSource,
    coreSequelize,
    connectorsSequelize,
    execute,
    spreadsheetsLogger
  );
}

/**
 * Backfills core's `data_sources_nodes.source_url` for every sheet (tab) of
 * the given google_drive data source.
 *
 * Reads sheet rows from the connectors replica in id-ordered batches (keyset
 * pagination on `id > :lastId`), reconstructs each tab's URL and node_id,
 * and — when `execute` is true — bulk-updates core by joining on node_id.
 * When `execute` is false this is a dry run that only logs intended changes.
 */
async function backfillSpreadsheets(
  frontDataSource: DataSourceModel,
  coreSequelize: Sequelize,
  connectorsSequelize: Sequelize,
  execute: boolean,
  logger: typeof Logger
) {
  logger.info("Processing spreadsheets");

  // processing the spreadsheets chunk by chunk
  let lastId = 0;
  let rows: { id: number; driveFileId: string; driveSheetId: number }[] = [];

  do {
    // querying connectors for the next batch of spreadsheets
    // (replica read: raw SELECT with named replacements)
    rows = await connectorsSequelize.query(
      `SELECT id, "driveFileId", "driveSheetId"
       FROM google_drive_sheets
       WHERE id > :lastId
         AND "connectorId" = :connectorId
       ORDER BY id
       LIMIT :batchSize;`,
      {
        replacements: {
          batchSize: BATCH_SIZE,
          lastId,
          connectorId: frontDataSource.connectorId,
        },
        type: QueryTypes.SELECT,
      }
    );

    if (rows.length === 0) {
      break;
    }
    // reconstructing the URLs and node IDs
    // urls[i] and nodeIds[i] refer to the same row, which the unnest-join
    // below relies on (positional pairing of the two arrays).
    const urls = rows.map((row) =>
      getSourceUrlForGoogleDriveSheet(row.driveFileId, row.driveSheetId)
    );
    const nodeIds = rows.map((row) =>
      getGoogleSheetTableId(row.driveFileId, row.driveSheetId)
    );

    if (execute) {
      // updating on core on the nodeIds
      // NOTE(review): this UPDATE matches node_id across ALL data sources
      // (no data_source scoping) — assumes sheet node_ids are globally
      // unique; confirm before running against shared core tables.
      await coreSequelize.query(
        `UPDATE data_sources_nodes
         SET source_url = urls.url
         FROM (SELECT unnest(ARRAY [:nodeIds]::text[]) as node_id,
               unnest(ARRAY [:urls]::text[]) as url) urls
         WHERE data_sources_nodes.node_id = urls.node_id;`,
        { replacements: { urls, nodeIds } }
      );
      logger.info(
        `Updated ${rows.length} spreadsheets from id ${rows[0].id} to id ${rows[rows.length - 1].id}.`
      );
    } else {
      logger.info(
        `Would update ${rows.length} spreadsheets from id ${rows[0].id} to id ${rows[rows.length - 1].id}.`
      );
    }

    // advance the keyset cursor; a short batch means we've drained the table
    lastId = rows[rows.length - 1].id;
  } while (rows.length === BATCH_SIZE);
}

/**
 * Backfills core's `data_sources_nodes.source_url` for every folder of the
 * given google_drive data source.
 *
 * Reads folder rows (mimeType filtered in SQL) from the connectors replica
 * in id-ordered batches (keyset pagination on `id > :lastId`), rebuilds each
 * folder's URL, and — when `execute` is true — bulk-updates core by joining
 * on the stored dustFileId as node_id. Dry-run (log only) otherwise.
 */
async function backfillFolders(
  frontDataSource: DataSourceModel,
  coreSequelize: Sequelize,
  connectorsSequelize: Sequelize,
  execute: boolean,
  logger: typeof Logger
) {
  logger.info("Processing folders");

  // processing the folders chunk by chunk
  let lastId = 0;
  let rows: {
    id: number;
    driveFileId: string;
    dustFileId: string;
    mimeType: string;
  }[] = [];

  do {
    // querying connectors for the next batch of folders
    // (only google-apps folders; other files are handled elsewhere)
    rows = await connectorsSequelize.query(
      `SELECT id, "driveFileId", "dustFileId", "mimeType"
       FROM google_drive_files
       WHERE id > :lastId
         AND "connectorId" = :connectorId
         AND "mimeType" = 'application/vnd.google-apps.folder'
       ORDER BY id
       LIMIT :batchSize;`,
      {
        replacements: {
          batchSize: BATCH_SIZE,
          lastId,
          connectorId: frontDataSource.connectorId,
        },
        type: QueryTypes.SELECT,
      }
    );

    if (rows.length === 0) {
      break;
    }
    // reconstructing the URLs and node IDs
    // urls[i] pairs with nodeIds[i]; the unnest-join below is positional.
    const urls = rows.map((row) =>
      getSourceUrlForGoogleDriveFiles(row.driveFileId, row.mimeType)
    );
    const nodeIds = rows.map((row) => row.dustFileId);

    if (execute) {
      // updating on core on the nodeIds
      // NOTE(review): matches node_id globally (no data_source scoping) —
      // assumes dustFileIds are globally unique; confirm before executing.
      await coreSequelize.query(
        `UPDATE data_sources_nodes
         SET source_url = urls.url
         FROM (SELECT unnest(ARRAY [:nodeIds]::text[]) as node_id,
               unnest(ARRAY [:urls]::text[]) as url) urls
         WHERE data_sources_nodes.node_id = urls.node_id;`,
        { replacements: { urls, nodeIds } }
      );
      logger.info(
        `Updated ${rows.length} folders from id ${rows[0].id} to id ${rows[rows.length - 1].id}.`
      );
    } else {
      logger.info(
        `Would update ${rows.length} folders from id ${rows[0].id} to id ${rows[rows.length - 1].id}.`
      );
    }

    // advance the keyset cursor; a short batch means we've drained the table
    lastId = rows[rows.length - 1].id;
  } while (rows.length === BATCH_SIZE);
}

// Script entry point: enumerates every google_drive data source registered in
// front and backfills its folder/spreadsheet source URLs one data source at a
// time (sequentially, to limit load on the replica and core primary).
// `execute` comes from the makeScript CLI flag; without it this is a dry run.
makeScript({}, async ({ execute }, logger) => {
  // Writes go to the core primary; reads come from the connectors replica.
  const coreSequelize = getCorePrimaryDbConnection();
  const connectorsSequelize = getConnectorsReplicaDbConnection();
  const frontDataSources = await DataSourceModel.findAll({
    where: { connectorProvider: "google_drive" },
  });
  logger.info(`Found ${frontDataSources.length} Google Drive data sources`);

  for (const frontDataSource of frontDataSources) {
    await backfillDataSource(
      frontDataSource,
      coreSequelize,
      connectorsSequelize,
      execute,
      // tag every log line with the data source being processed
      logger.child({
        dataSourceId: frontDataSource.id,
        connectorId: frontDataSource.connectorId,
        name: frontDataSource.name,
      })
    );
  }
});

// Copy-pasted from connectors/src/connectors/google_drive/index.ts
// Maps a Drive file id to its public URL, choosing the URL shape from the
// mime type: spreadsheets and folders get their dedicated UIs, everything
// else falls back to the generic file viewer.
function getSourceUrlForGoogleDriveFiles(
  driveFileId: string,
  mimeType: string
): string {
  if (isGoogleDriveSpreadSheetFile(mimeType)) {
    return `https://docs.google.com/spreadsheets/d/${driveFileId}/edit`;
  }
  if (isGoogleDriveFolder(mimeType)) {
    return `https://drive.google.com/drive/folders/${driveFileId}`;
  }
  return `https://drive.google.com/file/d/${driveFileId}/view`;
}

// True iff the mime type denotes a Google Drive folder.
function isGoogleDriveFolder(mimeType: string) {
  const FOLDER_MIME_TYPE = "application/vnd.google-apps.folder";
  return mimeType === FOLDER_MIME_TYPE;
}

// True iff the mime type denotes a Google Sheets spreadsheet.
function isGoogleDriveSpreadSheetFile(mimeType: string) {
  const SPREADSHEET_MIME_TYPE = "application/vnd.google-apps.spreadsheet";
  return mimeType === SPREADSHEET_MIME_TYPE;
}

// Deep link to one tab of a spreadsheet; the tab is addressed via the
// `#gid=` fragment understood by the Sheets UI.
function getSourceUrlForGoogleDriveSheet(
  driveFileId: string,
  driveSheetId: number
): string {
  const spreadsheetUrl = `https://docs.google.com/spreadsheets/d/${driveFileId}/edit`;
  return `${spreadsheetUrl}#gid=${driveSheetId}`;
}

// Copy-pasted from types/src/connectors/google_drive.ts
// Deterministic node_id under which a sheet's table is stored in core;
// must stay in sync with the connectors-side id scheme.
export function getGoogleSheetTableId(
  googleFileId: string,
  googleSheetId: number
): string {
  return "google-spreadsheet-" + googleFileId + "-sheet-" + String(googleSheetId);
}

0 comments on commit 243ac24

Please sign in to comment.