From 0de3ecc958cfc8935b32173cc6d7504a6900a3e1 Mon Sep 17 00:00:00 2001 From: Aric Lasry Date: Tue, 23 Apr 2024 11:51:54 +0200 Subject: [PATCH] [Gdrive] Narrower incremental sync (#4805) * [Gdrive] Narrower incremental sync * Addressing PopDaph comments --- .../src/connectors/google_drive/index.ts | 4 +- connectors/src/connectors/google_drive/lib.ts | 8 ++-- .../google_drive/temporal/activities.ts | 42 +++++++++++-------- .../google_drive/temporal/workflows.ts | 8 ++-- 4 files changed, 34 insertions(+), 28 deletions(-) diff --git a/connectors/src/connectors/google_drive/index.ts b/connectors/src/connectors/google_drive/index.ts index a38deddea73a..a21e74710691 100644 --- a/connectors/src/connectors/google_drive/index.ts +++ b/connectors/src/connectors/google_drive/index.ts @@ -18,7 +18,7 @@ import { nangoDeleteConnection } from "@connectors/lib/nango_client"; import logger from "@connectors/logger/logger"; import type { DataSourceConfig } from "@connectors/types/data_source_config.js"; -import { folderHasChildren, getDrivesIds } from "./temporal/activities"; +import { folderHasChildren, getDrives } from "./temporal/activities"; import { launchGoogleDriveFullSyncWorkflow, launchGoogleGarbageCollector, @@ -438,7 +438,7 @@ export async function retrieveGoogleDriveConnectorPermissions({ } else if (filterPermission === null) { if (parentInternalId === null) { // Return the list of remote shared drives. - const drives = await getDrivesIds(c.id); + const drives = await getDrives(c.id); const nodes: ContentNode[] = await Promise.all( drives.map(async (d): Promise => { diff --git a/connectors/src/connectors/google_drive/lib.ts b/connectors/src/connectors/google_drive/lib.ts index 3a7c5eb51d72..60ef63baf454 100644 --- a/connectors/src/connectors/google_drive/lib.ts +++ b/connectors/src/connectors/google_drive/lib.ts @@ -13,7 +13,7 @@ import { v4 as uuidv4 } from "uuid"; import { GOOGLE_DRIVE_WEBHOOK_LIFE_MS } from "@connectors/connectors/google_drive/lib/config"; import { getGoogleDriveObject } from "@connectors/connectors/google_drive/lib/google_drive_api"; import { - getDrivesIdsToSync, + getDrivesToSync, getSyncPageToken, } from "@connectors/connectors/google_drive/temporal/activities"; import { isGoogleDriveSpreadSheetFile } from "@connectors/connectors/google_drive/temporal/mime_types"; @@ -40,10 +40,10 @@ export async function registerWebhooksForAllDrives({ connector: ConnectorResource; marginMs: number; }): Promise> { - const driveIdsToSync = await getDrivesIdsToSync(connector.id); + const drivesToSync = await getDrivesToSync(connector.id); const allRes = await Promise.all( - driveIdsToSync.map((driveId) => { - return ensureWebhookForDriveId(connector, driveId, marginMs); + drivesToSync.map((drive) => { + return ensureWebhookForDriveId(connector, drive.id, marginMs); }) ); diff --git a/connectors/src/connectors/google_drive/temporal/activities.ts b/connectors/src/connectors/google_drive/temporal/activities.ts index 97c1ff574f22..9a9afbd189a3 100644 --- a/connectors/src/connectors/google_drive/temporal/activities.ts +++ b/connectors/src/connectors/google_drive/temporal/activities.ts @@ -47,15 +47,17 @@ import { FILE_ATTRIBUTES_TO_FETCH } from "@connectors/types/google_drive"; const FILES_SYNC_CONCURRENCY = 10; const FILES_GC_CONCURRENCY = 5; +type LightGoogledrive = { + id: string; + name: string; + isSharedDrive: boolean; +}; + export const statsDClient = new StatsD(); -export async function getDrivesIds(connectorId: ModelId): Promise< - { - id: string; - name: string; - sharedDrive: boolean; - }[] -> { +export async function getDrives( + connectorId: ModelId +): Promise { const connector = await ConnectorResource.fetchById(connectorId); if (!connector) { throw new Error(`Connector ${connectorId} not found`); @@ -64,9 +66,9 @@ export async function getDrivesIds(connectorId: ModelId): Promise< let nextPageToken: string | undefined | null = undefined; const authCredentials = await getAuthObject(connector.connectionId); - const ids: { id: string; name: string; sharedDrive: boolean }[] = []; + const drives: LightGoogledrive[] = []; const myDriveId = await getMyDriveIdCached(authCredentials); - ids.push({ id: myDriveId, name: "My Drive", sharedDrive: false }); + drives.push({ id: myDriveId, name: "My Drive", isSharedDrive: false }); do { const res: GaxiosResponse = await drive.drives.list({ @@ -84,19 +86,19 @@ export async function getDrivesIds(connectorId: ModelId): Promise< } for (const drive of res.data.drives) { if (drive.id && drive.name) { - ids.push({ id: drive.id, name: drive.name, sharedDrive: true }); + drives.push({ id: drive.id, name: drive.name, isSharedDrive: true }); } } nextPageToken = res.data.nextPageToken; } while (nextPageToken); - return ids; + return drives; } // Get the list of drives that have folders selected for sync. -export async function getDrivesIdsToSync( +export async function getDrivesToSync( connectorId: ModelId -): Promise { +): Promise { const selectedFolders = await GoogleDriveFolders.findAll({ where: { connectorId: connectorId, @@ -107,7 +109,7 @@ export async function getDrivesIdsToSync( throw new Error(`Connector ${connectorId} not found`); } const authCredentials = await getAuthObject(connector.connectionId); - const driveIds = new Set(); + const drives: Record = {}; for (const folder of selectedFolders) { const remoteFolder = await getGoogleDriveObject( @@ -118,11 +120,15 @@ export async function getDrivesIdsToSync( if (!remoteFolder.driveId) { throw new Error(`Folder ${folder.folderId} does not have a driveId.`); } - driveIds.add(remoteFolder.driveId); + drives[remoteFolder.driveId] = { + id: remoteFolder.driveId, + name: remoteFolder.name, + isSharedDrive: remoteFolder.isInSharedDrive, + }; } } - return [...driveIds]; + return Object.values(drives); } export async function syncFiles( @@ -691,12 +697,12 @@ export async function populateSyncTokens(connectorId: ModelId) { if (!connector) { throw new Error(`Connector ${connectorId} not found`); } - const drivesIds = await getDrivesIds(connector.id); + const drivesIds = await getDrives(connector.id); for (const drive of drivesIds) { const lastSyncToken = await getSyncPageToken( connectorId, drive.id, - drive.sharedDrive + drive.isSharedDrive ); await GoogleDriveSyncToken.upsert({ connectorId: connectorId, diff --git a/connectors/src/connectors/google_drive/temporal/workflows.ts b/connectors/src/connectors/google_drive/temporal/workflows.ts index ce442617e532..e743bbb15790 100644 --- a/connectors/src/connectors/google_drive/temporal/workflows.ts +++ b/connectors/src/connectors/google_drive/temporal/workflows.ts @@ -16,7 +16,7 @@ import { GDRIVE_INCREMENTAL_SYNC_DEBOUNCE_SEC } from "./config"; import { newWebhookSignal } from "./signals"; const { - getDrivesIds, + getDrivesToSync, garbageCollector, getFoldersToSync, renewWebhooks, @@ -151,16 +151,16 @@ export async function googleDriveIncrementalSync( passCount++; console.log("Doing pass number", passCount); console.log(`Processing after debouncing ${debounceCount} time(s)`); - const drivesIds = await getDrivesIds(connectorId); + const drives = await getDrivesToSync(connectorId); const startSyncTs = new Date().getTime(); - for (const googleDrive of drivesIds) { + for (const googleDrive of drives) { let nextPageToken: undefined | string = undefined; do { nextPageToken = await incrementalSync( connectorId, dataSourceConfig, googleDrive.id, - googleDrive.sharedDrive, + googleDrive.isSharedDrive, startSyncTs, nextPageToken );