Skip to content

Commit

Permalink
[Gdrive] Narrower incremental sync (#4805)
Browse files: browse the repository at this point in the history
* [Gdrive] Narrower incremental sync

* Addressing PopDaph comments
  • Loading branch information
lasryaric authored Apr 23, 2024
1 parent 32d8ef7 commit 0de3ecc
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 28 deletions.
4 changes: 2 additions & 2 deletions connectors/src/connectors/google_drive/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import { nangoDeleteConnection } from "@connectors/lib/nango_client";
import logger from "@connectors/logger/logger";
import type { DataSourceConfig } from "@connectors/types/data_source_config.js";

import { folderHasChildren, getDrivesIds } from "./temporal/activities";
import { folderHasChildren, getDrives } from "./temporal/activities";
import {
launchGoogleDriveFullSyncWorkflow,
launchGoogleGarbageCollector,
Expand Down Expand Up @@ -438,7 +438,7 @@ export async function retrieveGoogleDriveConnectorPermissions({
} else if (filterPermission === null) {
if (parentInternalId === null) {
// Return the list of remote shared drives.
const drives = await getDrivesIds(c.id);
const drives = await getDrives(c.id);

const nodes: ContentNode[] = await Promise.all(
drives.map(async (d): Promise<ContentNode> => {
Expand Down
8 changes: 4 additions & 4 deletions connectors/src/connectors/google_drive/lib.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { v4 as uuidv4 } from "uuid";
import { GOOGLE_DRIVE_WEBHOOK_LIFE_MS } from "@connectors/connectors/google_drive/lib/config";
import { getGoogleDriveObject } from "@connectors/connectors/google_drive/lib/google_drive_api";
import {
getDrivesIdsToSync,
getDrivesToSync,
getSyncPageToken,
} from "@connectors/connectors/google_drive/temporal/activities";
import { isGoogleDriveSpreadSheetFile } from "@connectors/connectors/google_drive/temporal/mime_types";
Expand All @@ -40,10 +40,10 @@ export async function registerWebhooksForAllDrives({
connector: ConnectorResource;
marginMs: number;
}): Promise<Result<undefined, Error[]>> {
const driveIdsToSync = await getDrivesIdsToSync(connector.id);
const drivesToSync = await getDrivesToSync(connector.id);
const allRes = await Promise.all(
driveIdsToSync.map((driveId) => {
return ensureWebhookForDriveId(connector, driveId, marginMs);
drivesToSync.map((drive) => {
return ensureWebhookForDriveId(connector, drive.id, marginMs);
})
);

Expand Down
42 changes: 24 additions & 18 deletions connectors/src/connectors/google_drive/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,17 @@ import { FILE_ATTRIBUTES_TO_FETCH } from "@connectors/types/google_drive";
const FILES_SYNC_CONCURRENCY = 10;
const FILES_GC_CONCURRENCY = 5;

/**
 * Minimal description of a Google Drive, shared by the drive-listing
 * activities in this file (`getDrives` and `getDrivesToSync`).
 *
 * NOTE(review): `getDrivesToSync` fills `name` from the selected remote
 * folder's name rather than from the drive itself — confirm callers only rely
 * on `id` and `isSharedDrive` for drives coming from that function.
 */
type LightGoogledrive = {
// Google Drive identifier of the drive.
id: string;
// Display name; `getDrives` uses the literal "My Drive" for the user's own drive.
name: string;
// `getDrives` sets this to false for "My Drive" and true for drives returned by `drive.drives.list`.
isSharedDrive: boolean;
};

export const statsDClient = new StatsD();

export async function getDrivesIds(connectorId: ModelId): Promise<
{
id: string;
name: string;
sharedDrive: boolean;
}[]
> {
export async function getDrives(
connectorId: ModelId
): Promise<LightGoogledrive[]> {
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
throw new Error(`Connector ${connectorId} not found`);
Expand All @@ -64,9 +66,9 @@ export async function getDrivesIds(connectorId: ModelId): Promise<

let nextPageToken: string | undefined | null = undefined;
const authCredentials = await getAuthObject(connector.connectionId);
const ids: { id: string; name: string; sharedDrive: boolean }[] = [];
const drives: LightGoogledrive[] = [];
const myDriveId = await getMyDriveIdCached(authCredentials);
ids.push({ id: myDriveId, name: "My Drive", sharedDrive: false });
drives.push({ id: myDriveId, name: "My Drive", isSharedDrive: false });
do {
const res: GaxiosResponse<drive_v3.Schema$DriveList> =
await drive.drives.list({
Expand All @@ -84,19 +86,19 @@ export async function getDrivesIds(connectorId: ModelId): Promise<
}
for (const drive of res.data.drives) {
if (drive.id && drive.name) {
ids.push({ id: drive.id, name: drive.name, sharedDrive: true });
drives.push({ id: drive.id, name: drive.name, isSharedDrive: true });
}
}
nextPageToken = res.data.nextPageToken;
} while (nextPageToken);

return ids;
return drives;
}

// Get the list of drives that have folders selected for sync.
export async function getDrivesIdsToSync(
export async function getDrivesToSync(
connectorId: ModelId
): Promise<string[]> {
): Promise<LightGoogledrive[]> {
const selectedFolders = await GoogleDriveFolders.findAll({
where: {
connectorId: connectorId,
Expand All @@ -107,7 +109,7 @@ export async function getDrivesIdsToSync(
throw new Error(`Connector ${connectorId} not found`);
}
const authCredentials = await getAuthObject(connector.connectionId);
const driveIds = new Set<string>();
const drives: Record<string, LightGoogledrive> = {};

for (const folder of selectedFolders) {
const remoteFolder = await getGoogleDriveObject(
Expand All @@ -118,11 +120,15 @@ export async function getDrivesIdsToSync(
if (!remoteFolder.driveId) {
throw new Error(`Folder ${folder.folderId} does not have a driveId.`);
}
driveIds.add(remoteFolder.driveId);
drives[remoteFolder.driveId] = {
id: remoteFolder.driveId,
name: remoteFolder.name,
isSharedDrive: remoteFolder.isInSharedDrive,
};
}
}

return [...driveIds];
return Object.values(drives);
}

export async function syncFiles(
Expand Down Expand Up @@ -691,12 +697,12 @@ export async function populateSyncTokens(connectorId: ModelId) {
if (!connector) {
throw new Error(`Connector ${connectorId} not found`);
}
const drivesIds = await getDrivesIds(connector.id);
const drivesIds = await getDrives(connector.id);
for (const drive of drivesIds) {
const lastSyncToken = await getSyncPageToken(
connectorId,
drive.id,
drive.sharedDrive
drive.isSharedDrive
);
await GoogleDriveSyncToken.upsert({
connectorId: connectorId,
Expand Down
8 changes: 4 additions & 4 deletions connectors/src/connectors/google_drive/temporal/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { GDRIVE_INCREMENTAL_SYNC_DEBOUNCE_SEC } from "./config";
import { newWebhookSignal } from "./signals";

const {
getDrivesIds,
getDrivesToSync,
garbageCollector,
getFoldersToSync,
renewWebhooks,
Expand Down Expand Up @@ -151,16 +151,16 @@ export async function googleDriveIncrementalSync(
passCount++;
console.log("Doing pass number", passCount);
console.log(`Processing after debouncing ${debounceCount} time(s)`);
const drivesIds = await getDrivesIds(connectorId);
const drives = await getDrivesToSync(connectorId);
const startSyncTs = new Date().getTime();
for (const googleDrive of drivesIds) {
for (const googleDrive of drives) {
let nextPageToken: undefined | string = undefined;
do {
nextPageToken = await incrementalSync(
connectorId,
dataSourceConfig,
googleDrive.id,
googleDrive.sharedDrive,
googleDrive.isSharedDrive,
startSyncTs,
nextPageToken
);
Expand Down

0 comments on commit 0de3ecc

Please sign in to comment.