Skip to content

Commit

Permalink
feat: add ability to unpause connectors
Browse files Browse the repository at this point in the history
  • Loading branch information
Henry Fontanier committed Apr 4, 2024
1 parent 081d591 commit 5e75077
Show file tree
Hide file tree
Showing 13 changed files with 243 additions and 1 deletion.
60 changes: 60 additions & 0 deletions connectors/src/api/unpause_connector.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import type { WithConnectorsAPIErrorReponse } from "@dust-tt/types";
import type { Request, Response } from "express";

import { UNPAUSE_CONNECTOR_BY_TYPE } from "@connectors/connectors";
import { errorFromAny } from "@connectors/lib/error";
import logger from "@connectors/logger/logger";
import { apiError, withLogging } from "@connectors/logger/withlogging";
import { ConnectorResource } from "@connectors/resources/connector_resource";

type ConnectorUnpauseResBody = WithConnectorsAPIErrorReponse<{
connectorId: string;
}>;

const _unpauseConnectorAPIHandler = async (
req: Request<{ connector_id: string }, ConnectorUnpauseResBody>,
res: Response<ConnectorUnpauseResBody>
) => {
try {
const connector = await ConnectorResource.fetchById(
req.params.connector_id
);
if (!connector) {
return apiError(req, res, {
api_error: {
type: "connector_not_found",
message: "Connector not found",
},
status_code: 404,
});
}
const connectorUnpauser = UNPAUSE_CONNECTOR_BY_TYPE[connector.type];

const unpauseRes = await connectorUnpauser(connector.id);

if (unpauseRes.isErr()) {
return apiError(req, res, {
status_code: 500,
api_error: {
type: "internal_server_error",
message: unpauseRes.error.message,
},
});
}

return res.sendStatus(204);
} catch (e) {
logger.error(errorFromAny(e), "Failed to unpause the connector");
return apiError(req, res, {
status_code: 500,
api_error: {
type: "internal_server_error",
message: "Could not unpause the connector",
},
});
}
};

export const unpauseConnectorAPIHandler = withLogging(
_unpauseConnectorAPIHandler
);
2 changes: 2 additions & 0 deletions connectors/src/api_server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
} from "@connectors/api/slack_channels_linked_with_agent";
import { stopConnectorAPIHandler } from "@connectors/api/stop_connector";
import { syncConnectorAPIHandler } from "@connectors/api/sync_connector";
import { unpauseConnectorAPIHandler } from "@connectors/api/unpause_connector";
import { postConnectorUpdateAPIHandler } from "@connectors/api/update_connector";
import { webhookGithubAPIHandler } from "@connectors/api/webhooks/webhook_github";
import { webhookGoogleDriveAPIHandler } from "@connectors/api/webhooks/webhook_google_drive";
Expand Down Expand Up @@ -91,6 +92,7 @@ export function startServer(port: number) {
app.post("/connectors/update/:connector_id/", postConnectorUpdateAPIHandler);
app.post("/connectors/stop/:connector_id", stopConnectorAPIHandler);
app.post("/connectors/pause/:connector_id", pauseConnectorAPIHandler);
app.post("/connectors/unpause/:connector_id", unpauseConnectorAPIHandler);
app.post("/connectors/resume/:connector_id", resumeConnectorAPIHandler);
app.delete("/connectors/delete/:connector_id", deleteConnectorAPIHandler);
app.get("/connectors/:connector_id", getConnectorAPIHandler);
Expand Down
32 changes: 32 additions & 0 deletions connectors/src/connectors/confluence/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,38 @@ export async function resumeConfluenceConnector(
}
}

export async function pauseConfluenceConnector(
connectorId: ModelId
): Promise<Result<undefined, Error>> {
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
logger.error({ connectorId }, "Connector not found.");
return new Err(new Error("Connector not found"));
}

await connector.markAsPaused();

return new Ok(undefined);
}

export async function unpauseConfluenceConnector(
connectorId: ModelId
): Promise<Result<undefined, Error>> {
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
logger.error({ connectorId }, "Connector not found.");
return new Err(new Error("Connector not found"));
}

await connector.markAsUnpaused();
const r = await launchConfluenceSyncWorkflow(connectorId, null);
if (r.isErr()) {
return r;
}

return new Ok(undefined);
}

export async function cleanupConfluenceConnector(
connectorId: ModelId
): Promise<Result<undefined, Error>> {
Expand Down
17 changes: 17 additions & 0 deletions connectors/src/connectors/github/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,23 @@ export async function pauseGithubConnector(
return new Ok(undefined);
}

export async function unpauseGithubConnector(
connectorId: ModelId
): Promise<Result<undefined, Error>> {
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
logger.error({ connectorId }, "Connector not found");
return new Err(new Error("Connector not found"));
}
await connector.markAsUnpaused();
await launchGithubFullSyncWorkflow({
connectorId,
syncCodeOnly: false,
});

return new Ok(undefined);
}

export async function resumeGithubConnector(
connectorId: ModelId
): Promise<Result<undefined, Error>> {
Expand Down
13 changes: 13 additions & 0 deletions connectors/src/connectors/google_drive/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -836,3 +836,16 @@ export async function pauseGoogleDriveConnector(connectorId: ModelId) {
await terminateAllWorkflowsForConnectorId(connectorId);
return new Ok(undefined);
}

export async function unpauseGoogleDriveConnector(connectorId: ModelId) {
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
return new Err(new Error(`Connector not found with id ${connectorId}`));
}
await connector.markAsUnpaused();
const r = await launchGoogleDriveFullSyncWorkflow(connectorId, null);
if (r.isErr()) {
return r;
}
return new Ok(undefined);
}
26 changes: 25 additions & 1 deletion connectors/src/connectors/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import { Err, Ok } from "@dust-tt/types";
import {
cleanupConfluenceConnector,
createConfluenceConnector,
pauseConfluenceConnector,
resumeConfluenceConnector,
retrieveConfluenceConnectorPermissions,
retrieveConfluenceContentNodeParents,
retrieveConfluenceContentNodes,
setConfluenceConnectorPermissions,
stopConfluenceConnector,
unpauseConfluenceConnector,
updateConfluenceConnector,
} from "@connectors/connectors/confluence";
import { launchConfluenceSyncWorkflow } from "@connectors/connectors/confluence/temporal/client";
Expand All @@ -29,6 +31,7 @@ import {
retrieveGithubReposContentNodes,
setGithubConfig,
stopGithubConnector,
unpauseGithubConnector,
updateGithubConnector,
} from "@connectors/connectors/github";
import {
Expand All @@ -42,6 +45,7 @@ import {
retrieveGoogleDriveContentNodes,
setGoogleDriveConfig,
setGoogleDriveConnectorPermissions,
unpauseGoogleDriveConnector,
updateGoogleDriveConnector,
} from "@connectors/connectors/google_drive";
import { launchGoogleDriveFullSyncWorkflow } from "@connectors/connectors/google_drive/temporal/client";
Expand All @@ -56,6 +60,7 @@ import {
retrieveIntercomContentNodes,
setIntercomConnectorPermissions,
stopIntercomConnector,
unpauseIntercomConnector,
updateIntercomConnector,
} from "@connectors/connectors/intercom";
import type {
Expand All @@ -71,6 +76,7 @@ import type {
ConnectorProviderUpdateConfigurationMapping,
ConnectorResumer,
ConnectorStopper,
ConnectorUnpauser,
ConnectorUpdater,
ContentNodeParentsRetriever,
SyncConnector,
Expand All @@ -85,6 +91,7 @@ import {
retrieveNotionContentNodeParents,
retrieveNotionContentNodes,
stopNotionConnector,
unpauseNotionConnector,
updateNotionConnector,
} from "@connectors/connectors/notion";
import {
Expand All @@ -96,6 +103,7 @@ import {
retrieveSlackContentNodes,
setSlackConfig,
setSlackConnectorPermissions,
unpauseSlackConnector,
updateSlackConnector,
} from "@connectors/connectors/slack";
import { launchSlackSyncWorkflow } from "@connectors/connectors/slack/temporal/client";
Expand All @@ -110,6 +118,7 @@ import {
retrieveWebCrawlerContentNodes,
setWebcrawlerConfiguration,
stopWebcrawlerConnector,
unpauseWebcrawlerConnector,
} from "./webcrawler";
import { launchCrawlWebsiteWorkflow } from "./webcrawler/temporal/client";

Expand Down Expand Up @@ -368,11 +377,26 @@ export const PAUSE_CONNECTOR_BY_TYPE: Record<
ConnectorProvider,
ConnectorPauser
> = {
confluence: stopConfluenceConnector,
confluence: pauseConfluenceConnector,
slack: pauseSlackConnector,
notion: pauseNotionConnector,
github: pauseGithubConnector,
google_drive: pauseGoogleDriveConnector,
intercom: pauseIntercomConnector,
webcrawler: pauseWebcrawlerConnector,
};

// If the connector has webhooks: resume processing them, and trigger a full sync.
// If the connector has long-running workflows: resume them. If they support "partial resync" do that, otherwise trigger a full sync.
export const UNPAUSE_CONNECTOR_BY_TYPE: Record<
ConnectorProvider,
ConnectorUnpauser
> = {
confluence: unpauseConfluenceConnector,
slack: unpauseSlackConnector,
notion: unpauseNotionConnector,
github: unpauseGithubConnector,
google_drive: unpauseGoogleDriveConnector,
intercom: unpauseIntercomConnector,
webcrawler: unpauseWebcrawlerConnector,
};
19 changes: 19 additions & 0 deletions connectors/src/connectors/intercom/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -756,3 +756,22 @@ export async function pauseIntercomConnector(connectorId: ModelId) {

return new Ok(undefined);
}

export async function unpauseIntercomConnector(connectorId: ModelId) {
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
logger.error({ connectorId }, "[Intercom] Connector not found.");
return new Err(new Error("Connector not found"));
}

await connector.markAsUnpaused();

const r = await launchIntercomSyncWorkflow({
connectorId,
});
if (r.isErr()) {
return r;
}

return new Ok(undefined);
}
4 changes: 4 additions & 0 deletions connectors/src/connectors/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ export type ConnectorGarbageCollector = (
export type ConnectorPauser = (
connectorId: ModelId
) => Promise<Result<undefined, Error>>;
export type ConnectorUnpauser = (
connectorId: ModelId
) => Promise<Result<undefined, Error>>;

export type ConnectorConfigurationSetter<T extends ConnectorConfiguration> = (
connectorId: ModelId,
configuration: T
Expand Down
25 changes: 25 additions & 0 deletions connectors/src/connectors/notion/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,31 @@ export async function pauseNotionConnector(
return new Ok(undefined);
}

export async function unpauseNotionConnector(
connectorId: ModelId
): Promise<Result<undefined, Error>> {
const connector = await ConnectorResource.fetchById(connectorId);

if (!connector) {
logger.error(
{
connectorId,
},
"Notion connector not found."
);

return new Err(new Error("Connector not found"));
}

await connector.markAsUnpaused();
const r = await resumeNotionConnector(connector.id);
if (r.isErr()) {
return r;
}

return new Ok(undefined);
}

export async function resumeNotionConnector(
connectorId: ModelId
): Promise<Result<undefined, Error>> {
Expand Down
13 changes: 13 additions & 0 deletions connectors/src/connectors/slack/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -653,3 +653,16 @@ export async function pauseSlackConnector(connectorId: ModelId) {
await terminateAllWorkflowsForConnectorId(connectorId);
return new Ok(undefined);
}

export async function unpauseSlackConnector(connectorId: ModelId) {
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
return new Err(new Error(`Connector not found with id ${connectorId}`));
}
await connector.markAsUnpaused();
const r = await launchSlackSyncWorkflow(connectorId, null);
if (r.isErr()) {
return r;
}
return new Ok(undefined);
}
15 changes: 15 additions & 0 deletions connectors/src/connectors/webcrawler/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,21 @@ export async function pauseWebcrawlerConnector(
return new Ok(undefined);
}

export async function unpauseWebcrawlerConnector(
connectorId: ModelId
): Promise<Result<undefined, Error>> {
const connector = await ConnectorResource.fetchById(connectorId);
if (!connector) {
throw new Error("Connector not found.");
}
await connector.markAsUnpaused();
const startRes = await launchCrawlWebsiteWorkflow(connectorId);
if (startRes.isErr()) {
return startRes;
}
return new Ok(undefined);
}

export async function cleanupWebcrawlerConnector(
connectorId: ModelId
): Promise<Result<undefined, Error>> {
Expand Down
4 changes: 4 additions & 0 deletions connectors/src/resources/connector_resource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,8 @@ export class ConnectorResource extends BaseResource<ConnectorModel> {
async markAsPaused() {
return this.update({ pausedAt: new Date() });
}

async markAsUnpaused() {
return this.update({ pausedAt: null });
}
}
14 changes: 14 additions & 0 deletions types/src/front/lib/connectors_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,20 @@ export class ConnectorsAPI {
return this._resultFromResponse(res);
}

async unpauseConnector(
connectorId: string
): Promise<ConnectorsAPIResponse<undefined>> {
const res = await fetch(
`${CONNECTORS_API}/connectors/unpause/${encodeURIComponent(connectorId)}`,
{
method: "POST",
headers: this.getDefaultHeaders(),
}
);

return this._resultFromResponse(res);
}

async resumeConnector(
connectorId: string
): Promise<ConnectorsAPIResponse<undefined>> {
Expand Down

0 comments on commit 5e75077

Please sign in to comment.