From 0cf65301e03a3b8e8ff3ffc8fd5c1d12887e44bb Mon Sep 17 00:00:00 2001 From: Aubin <60398825+aubin-tchoi@users.noreply.github.com> Date: Fri, 20 Dec 2024 10:46:56 +0100 Subject: [PATCH] github - add new workflows with the folders upsert (#9562) * add new workflows that are duplicates of the old ones with the new upsert activities * fix backfill script * rename workflows with v2 * :memo: --- .../20241219_backfill_github_folders.ts | 2 +- .../src/connectors/github/temporal/client.ts | 10 +- .../connectors/github/temporal/workflows.ts | 363 ++++++++++++++++++ 3 files changed, 369 insertions(+), 6 deletions(-) diff --git a/connectors/migrations/20241219_backfill_github_folders.ts b/connectors/migrations/20241219_backfill_github_folders.ts index 403502e7eba5..a8ee24170d11 100644 --- a/connectors/migrations/20241219_backfill_github_folders.ts +++ b/connectors/migrations/20241219_backfill_github_folders.ts @@ -139,7 +139,7 @@ async function upsertFoldersForConnector( } } makeScript({}, async ({ execute }, logger) => { - const connectors = await ConnectorResource.listByType("zendesk", {}); + const connectors = await ConnectorResource.listByType("github", {}); for (const connector of connectors) { logger.info(`Upserting folders for connector ${connector.id}`); diff --git a/connectors/src/connectors/github/temporal/client.ts b/connectors/src/connectors/github/temporal/client.ts index 3ad80d072a9e..78a0b26441f6 100644 --- a/connectors/src/connectors/github/temporal/client.ts +++ b/connectors/src/connectors/github/temporal/client.ts @@ -23,11 +23,11 @@ import { githubCodeSyncWorkflow, githubDiscussionGarbageCollectWorkflow, githubDiscussionSyncWorkflow, - githubFullSyncWorkflow, + githubFullSyncWorkflowV2, githubIssueGarbageCollectWorkflow, githubIssueSyncWorkflow, githubRepoGarbageCollectWorkflow, - githubReposSyncWorkflow, + githubReposSyncWorkflowV2, } from "@connectors/connectors/github/temporal/workflows"; import { dataSourceConfigFromConnector } from "@connectors/lib/api/data_source_config"; import { getTemporalClient } from "@connectors/lib/temporal"; @@ -68,7 +68,7 @@ export async function launchGithubFullSyncWorkflow({ return; } - await client.workflow.start(githubFullSyncWorkflow, { + await client.workflow.start(githubFullSyncWorkflowV2, { args: [dataSourceConfig, connectorId, syncCodeOnly, forceCodeResync], taskQueue: QUEUE_NAME, workflowId: getFullSyncWorkflowId(connectorId), @@ -87,7 +87,7 @@ export async function getGithubFullSyncWorkflow(connectorId: ModelId): Promise<{ } | null> { const client = await getTemporalClient(); - const handle: WorkflowHandle = + const handle: WorkflowHandle = client.workflow.getHandle(getFullSyncWorkflowId(connectorId)); try { @@ -116,7 +116,7 @@ export async function launchGithubReposSyncWorkflow( } const dataSourceConfig = dataSourceConfigFromConnector(connector); - await client.workflow.start(githubReposSyncWorkflow, { + await client.workflow.start(githubReposSyncWorkflowV2, { args: [dataSourceConfig, connectorId, orgLogin, repos], taskQueue: QUEUE_NAME, workflowId: getReposSyncWorkflowId(connectorId), diff --git a/connectors/src/connectors/github/temporal/workflows.ts b/connectors/src/connectors/github/temporal/workflows.ts index 5e79a411b0b4..1c839de56a83 100644 --- a/connectors/src/connectors/github/temporal/workflows.ts +++ b/connectors/src/connectors/github/temporal/workflows.ts @@ -29,6 +29,9 @@ const { githubGetRepoDiscussionsResultPageActivity, githubIssueGarbageCollectActivity, githubDiscussionGarbageCollectActivity, + githubUpsertDiscussionsFolderActivity, + githubUpsertIssuesFolderActivity, + githubUpsertRepositoryFolderActivity, } = proxyActivities({ startToCloseTimeout: "5 minute", }); @@ -58,6 +61,10 @@ const { githubCodeSyncActivity } = proxyActivities({ const MAX_CONCURRENT_REPO_SYNC_WORKFLOWS = 3; const MAX_CONCURRENT_ISSUE_SYNC_ACTIVITIES_PER_WORKFLOW = 8; +/** + * Duplicate of the one belows apart from the child workflow (githubSyncRepoWorkflow vs githubRepoSyncWorkflow). + * Kept for backwards compatibility (to avoid non-deterministic errors). + */ export async function githubFullSyncWorkflow( dataSourceConfig: DataSourceConfig, connectorId: ModelId, @@ -118,6 +125,74 @@ export async function githubFullSyncWorkflow( await githubSaveSuccessSyncActivity(dataSourceConfig); } +/** + * This workflow is used to fetch and sync all the repositories of a GitHub connector. + * It's called v2 because we had to add it when there was already a workflow without the v2 to avoid non-deterministic errors. + */ +export async function githubFullSyncWorkflowV2( + dataSourceConfig: DataSourceConfig, + connectorId: ModelId, + // Used to re-trigger a code-only full-sync after code syncing is enabled/disabled. + syncCodeOnly: boolean, + forceCodeResync = false +) { + await githubSaveStartSyncActivity(dataSourceConfig); + + const queue = new PQueue({ concurrency: MAX_CONCURRENT_REPO_SYNC_WORKFLOWS }); + const promises: Promise[] = []; + + let pageNumber = 1; // 1-indexed + + for (;;) { + const resultsPage = await githubGetReposResultPageActivity( + connectorId, + pageNumber, + { syncCodeOnly: syncCodeOnly.toString() } + ); + if (!resultsPage.length) { + break; + } + pageNumber += 1; + + for (const repo of resultsPage) { + const fullSyncWorkflowId = getFullSyncWorkflowId(connectorId); + const childWorkflowId = `${fullSyncWorkflowId}-repo-${repo.id}-syncCodeOnly-${syncCodeOnly}`; + promises.push( + queue.add(() => + executeChild(githubRepoSyncWorkflowV2, { + workflowId: childWorkflowId, + searchAttributes: { + connectorId: [connectorId], + }, + args: [ + { + dataSourceConfig, + connectorId, + repoName: repo.name, + repoId: repo.id, + repoLogin: repo.login, + syncCodeOnly, + isFullSync: true, + forceCodeResync, + }, + ], + parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE, + memo: workflowInfo().memo, + }) + ) + ); + } + } + + await Promise.all(promises); + + await githubSaveSuccessSyncActivity(dataSourceConfig); +} + +/** + * Duplicate of the one belows apart from the child workflow (githubSyncRepoWorkflow vs githubRepoSyncWorkflow). + * Kept for backwards compatibility (to avoid non-deterministic errors). + */ export async function githubReposSyncWorkflow( dataSourceConfig: DataSourceConfig, connectorId: ModelId, @@ -159,6 +234,55 @@ export async function githubReposSyncWorkflow( await githubSaveSuccessSyncActivity(dataSourceConfig); } +/** + * This workflow is used to sync the given repositories of a GitHub connector. + * It's called v2 because we had to add it when there was already a workflow without the v2 to avoid non-deterministic errors. + */ +export async function githubReposSyncWorkflowV2( + dataSourceConfig: DataSourceConfig, + connectorId: ModelId, + orgLogin: string, + repos: { name: string; id: number }[] +) { + const queue = new PQueue({ concurrency: MAX_CONCURRENT_REPO_SYNC_WORKFLOWS }); + const promises: Promise[] = []; + + for (const repo of repos) { + const reposSyncWorkflowId = getReposSyncWorkflowId(connectorId); + const childWorkflowId = `${reposSyncWorkflowId}-repo-${repo.id}`; + promises.push( + queue.add(() => + executeChild(githubRepoSyncWorkflowV2, { + workflowId: childWorkflowId, + searchAttributes: { + connectorId: [connectorId], + }, + args: [ + { + dataSourceConfig, + connectorId, + repoName: repo.name, + repoId: repo.id, + repoLogin: orgLogin, + syncCodeOnly: false, + isFullSync: false, + }, + ], + parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE, + memo: workflowInfo().memo, + }) + ) + ); + } + + await Promise.all(promises); + await githubSaveSuccessSyncActivity(dataSourceConfig); +} + +/** + * Duplicate of the one belows apart from the missing activity githubUpsertIssuesFolderActivity. + * Kept for backwards compatibility (to avoid non-deterministic errors). + */ export async function githubRepoIssuesSyncWorkflow({ dataSourceConfig, connectorId, @@ -213,6 +337,71 @@ export async function githubRepoIssuesSyncWorkflow({ return true; } +/** + * This workflow is used to sync all the issues of a GitHub connector. + * It's called v2 because we had to add it when there was already a workflow without the v2 to avoid non-deterministic errors. + */ +export async function githubRepoIssuesSyncWorkflowV2({ + dataSourceConfig, + connectorId, + repoName, + repoId, + repoLogin, + pageNumber, +}: { + dataSourceConfig: DataSourceConfig; + connectorId: ModelId; + repoName: string; + repoId: number; + repoLogin: string; + pageNumber: number; +}): Promise { + // upserting the folder with all the issues + await githubUpsertIssuesFolderActivity({ connectorId, repoId }); + + const queue = new PQueue({ + concurrency: MAX_CONCURRENT_ISSUE_SYNC_ACTIVITIES_PER_WORKFLOW, + }); + const promises: Promise[] = []; + + const resultsPage = await githubGetRepoIssuesResultPageActivity( + connectorId, + repoName, + repoLogin, + pageNumber, + { repoId } + ); + + if (!resultsPage.length) { + return false; + } + + for (const issueNumber of resultsPage) { + promises.push( + queue.add(() => + githubUpsertIssueActivity( + connectorId, + repoName, + repoId, + repoLogin, + issueNumber, + dataSourceConfig, + {}, + true // isBatchSync + ) + ) + ); + } + + await Promise.all(promises); + + return true; +} + +/** + * Duplicate of the one belows apart from the missing activity githubUpsertDiscussionsFolderActivity. + * Kept for backwards compatibility (to avoid non-deterministic errors). + */ export async function githubRepoDiscussionsSyncWorkflow({ dataSourceConfig, connectorId, @@ -264,6 +453,69 @@ export async function githubRepoDiscussionsSyncWorkflow({ return cursor; } +/** + * This workflow is used to sync all the discussions of a GitHub connector. + * It's called v2 because we had to add it when there was already a workflow without the v2 to avoid non-deterministic errors. + */ +export async function githubRepoDiscussionsSyncWorkflowV2({ + dataSourceConfig, + connectorId, + repoName, + repoId, + repoLogin, + nextCursor, +}: { + dataSourceConfig: DataSourceConfig; + connectorId: ModelId; + repoName: string; + repoId: number; + repoLogin: string; + nextCursor: string | null; +}): Promise { + // upserting the folder with all the discussions + await githubUpsertDiscussionsFolderActivity({ connectorId, repoId }); + + const queue = new PQueue({ + concurrency: MAX_CONCURRENT_ISSUE_SYNC_ACTIVITIES_PER_WORKFLOW, + }); + const promises: Promise[] = []; + + const { cursor, discussionNumbers } = + await githubGetRepoDiscussionsResultPageActivity( + connectorId, + repoName, + repoLogin, + nextCursor, + { repoId } + ); + + for (const discussionNumber of discussionNumbers) { + promises.push( + queue.add(() => + githubUpsertDiscussionActivity( + connectorId, + repoName, + repoId, + repoLogin, + discussionNumber, + dataSourceConfig, + {}, + true // isBatchSync + ) + ) + ); + } + + await Promise.all(promises); + + return cursor; +} + +/** + * Duplicate of the one belows apart from the missing activity githubUpsertRepositoryFolderActivity and the fact + * that this one calls the old workflows githubRepoIssuesSyncWorkflow and githubRepoDiscussionsSyncWorkflow. + * Kept for backwards compatibility (to avoid non-deterministic errors). + */ export async function githubRepoSyncWorkflow({ dataSourceConfig, connectorId, @@ -365,6 +617,117 @@ export async function githubRepoSyncWorkflow({ }); } +/** + * This workflow is used to sync all the issues, discussions and code of a GitHub connector. + * It's called v2 because we had to add it when there was already a workflow without the v2 to avoid non-deterministic errors. + */ +export async function githubRepoSyncWorkflowV2({ + dataSourceConfig, + connectorId, + repoName, + repoId, + repoLogin, + syncCodeOnly, + isFullSync, + forceCodeResync = false, +}: { + dataSourceConfig: DataSourceConfig; + connectorId: ModelId; + repoName: string; + repoId: number; + repoLogin: string; + syncCodeOnly: boolean; + isFullSync: boolean; + forceCodeResync?: boolean; +}) { + // upserting the root folder for the repository + await githubUpsertRepositoryFolderActivity({ connectorId, repoId, repoName }); + + if (!syncCodeOnly) { + let pageNumber = 1; // 1-indexed + for (;;) { + const childWorkflowId = `${ + isFullSync + ? getFullSyncWorkflowId(connectorId) + : getReposSyncWorkflowId(connectorId) + }-repo-${repoId}-issues-page-${pageNumber}`; + + const shouldContinue = await executeChild( + githubRepoIssuesSyncWorkflowV2, + { + workflowId: childWorkflowId, + searchAttributes: { + connectorId: [connectorId], + }, + args: [ + { + dataSourceConfig, + connectorId, + repoName, + repoId, + repoLogin, + pageNumber, + }, + ], + parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE, + memo: workflowInfo().memo, + } + ); + + if (!shouldContinue) { + break; + } + pageNumber += 1; + } + + let nextCursor: string | null = null; + let cursorIteration = 0; + for (;;) { + const childWorkflowId = `${ + isFullSync + ? getFullSyncWorkflowId(connectorId) + : getReposSyncWorkflowId(connectorId) + }-repo-${repoId}-issues-page-${cursorIteration}`; + + nextCursor = await executeChild(githubRepoDiscussionsSyncWorkflowV2, { + workflowId: childWorkflowId, + searchAttributes: { + connectorId: [connectorId], + }, + args: [ + { + dataSourceConfig, + connectorId, + repoName, + repoId, + repoLogin, + nextCursor, + }, + ], + parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE, + memo: workflowInfo().memo, + }); + + if (!nextCursor) { + break; + } + cursorIteration += 1; + } + } + + // Start code syncing activity. + await githubCodeSyncActivity({ + dataSourceConfig, + connectorId, + repoLogin, + repoName, + repoId, + loggerArgs: { syncCodeOnly: syncCodeOnly ? "true" : "false" }, + isBatchSync: true, + forceResync: forceCodeResync, + }); +} + export async function githubCodeSyncWorkflow( dataSourceConfig: DataSourceConfig, connectorId: ModelId,