Skip to content

Commit

Permalink
Select Slack channels to sync directly on Dust and have the bot auto …
Browse files Browse the repository at this point in the history
…join them.
  • Loading branch information
lasryaric committed Oct 26, 2023
1 parent bf42a4f commit c9c4278
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async function main() {
);

const accessToken = await getAccessToken(c.connectionId);
const channelsInSlack = await getChannels(accessToken);
const channelsInSlack = await getChannels(accessToken, true);
const channelIdsInSlackSet = new Set(
channelsInSlack.map((c) => c.id).filter((id) => id)
);
Expand Down
73 changes: 1 addition & 72 deletions connectors/src/api/webhooks/webhook_slack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import {
getBotUserIdMemoized,
} from "@connectors/connectors/slack/temporal/activities";
import {
launchSlackBotJoinedWorkflow,
launchSlackSyncOneMessageWorkflow,
launchSlackSyncOneThreadWorkflow,
} from "@connectors/connectors/slack/temporal/client";
Expand All @@ -17,7 +16,7 @@ import {
SlackChannel,
SlackConfiguration,
} from "@connectors/lib/models";
import { Err, Ok } from "@connectors/lib/result";
import { Ok } from "@connectors/lib/result";
import logger from "@connectors/logger/logger";
import { apiError, withLogging } from "@connectors/logger/withlogging";

Expand Down Expand Up @@ -327,76 +326,6 @@ const _webhookSlackAPIHandler = async (
break;
}

/**
* `member_joined_channel` handler.
*/
case "member_joined_channel": {
if (!req.body.event?.channel || !req.body.event?.user) {
return apiError(req, res, {
api_error: {
type: "invalid_request_error",
message:
"Missing channel or user in request body for member_joined_channel event",
},
status_code: 400,
});
}
const channel = req.body.event.channel;
const user = req.body.event.user;
let err: Error | null = null;

const results = await Promise.all(
slackConfigurations.map((c) =>
(async (): Promise<Err<Error> | Ok<string>> => {
const connector = await Connector.findByPk(c.connectorId);
if (!connector) {
return new Err(
new Error(`Connector ${c.connectorId} not found`)
);
}

const slackAccessToken = await getAccessToken(
connector.connectionId
);
const myUserId = await getBotUserIdMemoized(slackAccessToken);
if (myUserId !== user) {
return new Ok("");
}
return await launchSlackBotJoinedWorkflow(
c.connectorId.toString(),
channel
);
})()
)
);
for (const r of results) {
if (r.isErr()) {
err = r.error;
}
}

if (err) {
return apiError(req, res, {
api_error: {
type: "internal_server_error",
message: err.message,
},
status_code: 500,
});
} else {
logger.info(
{
type: req.body.event.type,
channel: req.body.event.channel,
user: req.body.event.user,
},
`Successfully processed Slack Webhook`
);
return res.status(200).send();
}
break;
}

/**
* `channel_left`, `channel_deleted` handler.
*/
Expand Down
3 changes: 1 addition & 2 deletions connectors/src/connectors/slack/bot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,6 @@ async function botAnswerMessage(
}
let conversation: ConversationType | undefined = undefined;
let userMessage: UserMessageType | undefined = undefined;

if (lastSlackChatBotMessage?.conversationId) {
if (buildContentFragmentRes.value) {
const contentFragmentRes = await dustAPI.postContentFragment({
Expand Down Expand Up @@ -609,7 +608,7 @@ async function makeContentFragment(
startingAtTs: string | null,
connector: Connector
) {
const slackChannelPromise = slackClient.conversations.info({
const slackChannelPromise = await slackClient.conversations.info({
channel: channelId,
});
let allMessages: Message[] = [];
Expand Down
130 changes: 100 additions & 30 deletions connectors/src/connectors/slack/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { WebClient } from "@slack/web-api";
import PQueue from "p-queue";
import { Op } from "sequelize";

import { ConnectorPermissionRetriever } from "@connectors/connectors/interface";
import {
Expand Down Expand Up @@ -30,7 +29,12 @@ import {
ConnectorResource,
} from "@connectors/types/resources";

import { getAccessToken, getSlackClient } from "./temporal/activities";
import {
getAccessToken,
getChannels,
getSlackClient,
joinChannel,
} from "./temporal/activities";

const { NANGO_SLACK_CONNECTOR_ID, SLACK_CLIENT_ID, SLACK_CLIENT_SECRET } =
process.env;
Expand Down Expand Up @@ -169,15 +173,19 @@ export async function updateSlackConnector(
const updateParams: Parameters<typeof c.update>[0] = {};

if (connectionId) {
const newConnectionRes = await nango_client().getConnection(
NANGO_SLACK_CONNECTOR_ID,
connectionId,
false,
false
);
const newTeamId = newConnectionRes?.team?.id || null;
const accessToken = await getAccessToken(connectionId);
const slackClient = await getSlackClient(accessToken);
const teamInfoRes = await slackClient.team.info();
if (!teamInfoRes.ok || !teamInfoRes.team?.id) {
return new Err({
error: {
message: "Can't get the Slack team information.",
},
});
}

if (!newTeamId || newTeamId !== currentSlackConfig.slackTeamId) {
const newTeamId = teamInfoRes.team.id;
if (newTeamId !== currentSlackConfig.slackTeamId) {
return new Err({
error: {
type: "connector_oauth_target_mismatch",
Expand Down Expand Up @@ -353,20 +361,40 @@ export async function retrieveSlackConnectorPermissions({
break;
}

const slackChannels = await SlackChannel.findAll({
where: filterPermission
? {
connectorId: connectorId,
permission: { [Op.or]: permissionToFilter },
}
: {
connectorId: connectorId,
},
order: [
["createdAt", "DESC"],
["slackChannelName", "ASC"],
],
});
const slackChannels: {
slackChannelId: string;
slackChannelName: string;
permission: ConnectorPermission;
}[] = [];

const accessToken = await getAccessToken(c.connectionId);
const remoteChannels = await getChannels(accessToken, false);
for (const remoteChannel of remoteChannels) {
if (!remoteChannel.id || !remoteChannel.name) {
continue;
}

const permissions =
(
await SlackChannel.findOne({
where: {
connectorId: connectorId,
slackChannelId: remoteChannel.id,
},
})
)?.permission || (remoteChannel.is_member ? "write" : "none");

if (
permissionToFilter.length === 0 ||
permissionToFilter.includes(permissions)
) {
slackChannels.push({
slackChannelId: remoteChannel.id,
slackChannelName: remoteChannel.name,
permission: permissions,
});
}
}

const resources: ConnectorResource[] = slackChannels.map((ch) => ({
provider: "slack",
Expand Down Expand Up @@ -427,17 +455,49 @@ export async function setSlackConnectorPermissions(

let shouldGarbageCollect = false;
for (const [id, permission] of Object.entries(permissions)) {
const channel = channels[id];
let channel = channels[id];
const slackAccessToken = await getAccessToken(connector.connectionId);
const slackClient = await getSlackClient(slackAccessToken);
if (!channel) {
logger.error(
{ connectorId, channelId: id },
"Could not find channel in DB"
);
continue;
const remoteChannel = await slackClient.conversations.info({
channel: id,
});
if (!remoteChannel.ok || !remoteChannel.channel?.name) {
logger.error(
{
connectorId,
channelId: id,
error: remoteChannel.error,
},
"Could not get the Slack channel information"
);
return new Err(
new Error("Could not get the Slack channel information.")
);
}
const joinRes = await joinChannel(connectorId, id);
if (joinRes === "cant_join") {
return new Err(
new Error(
`Our Slack bot (@Dust) was not able to join the Slack channel #${remoteChannel.channel.name}. Please re-authorize Slack or invite @Dust to #${remoteChannel.channel.name} manually.`
)
);
}
const slackChannel = await SlackChannel.create({
connectorId: connectorId,
slackChannelId: id,
slackChannelName: remoteChannel.channel.name,
permission: "none",
});
channels[id] = slackChannel;
channel = slackChannel;
}

promises.push(
q.add(async () => {
if (!channel) {
return;
}
const oldPermission = channel.permission;
if (oldPermission === permission) {
return;
Expand All @@ -451,6 +511,16 @@ export async function setSlackConnectorPermissions(
["read", "read_write"].includes(permission)
) {
// handle read permission enabled
const joinChannelRes = await joinChannel(
connectorId,
channel.slackChannelId
);
if (joinChannelRes === "cant_join") {
throw new Error(
`Our Slack bot (@Dust) was not able to join the Slack channel #${channel.slackChannelName}. Please re-authorize Slack or invite @Dust to #${channel.slackChannelName} manually.`
);
}

const res = await launchSlackBotJoinedWorkflow(
connectorId.toString(),
channel.slackChannelId
Expand Down
60 changes: 55 additions & 5 deletions connectors/src/connectors/slack/temporal/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ import {
upsertToDatasource,
} from "@connectors/lib/data_sources";
import { WorkflowError } from "@connectors/lib/error";
import { SlackChannel, SlackMessages } from "@connectors/lib/models";
import {
Connector,
ModelId,
SlackChannel,
SlackMessages,
} from "@connectors/lib/models";
import { getAccessTokenFromNango } from "@connectors/lib/nango_helpers";
import {
reportInitialSyncProgress,
Expand Down Expand Up @@ -56,7 +61,8 @@ const NETWORK_REQUEST_TIMEOUT_MS = 30000;
*/

export async function getChannels(
slackAccessToken: string
slackAccessToken: string,
joinedOnly: boolean
): Promise<Channel[]> {
const client = getSlackClient(slackAccessToken);
const allChannels = [];
Expand All @@ -79,8 +85,10 @@ export async function getChannels(
);
}
for (const channel of c.channels) {
if (channel && channel.id && channel.is_member) {
allChannels.push(channel);
if (channel && channel.id) {
if (!joinedOnly || channel.is_member) {
allChannels.push(channel);
}
}
}
} while (nextCursor);
Expand Down Expand Up @@ -119,6 +127,48 @@ interface SyncChannelRes {
weeksSynced: Record<number, boolean>;
}

export async function joinChannel(
connectorId: ModelId,
channelId: string
): Promise<"ok" | "already_joined" | "cant_join"> {
const connector = await Connector.findByPk(connectorId);
if (!connector) {
throw new Error(`Connector ${connectorId} not found`);
}
const accessToken = await getAccessToken(connector.connectionId);
const client = getSlackClient(accessToken);
try {
const channelInfo = await client.conversations.info({ channel: channelId });
if (channelInfo.channel?.is_member) {
return "already_joined";
}
const joinRes = await client.conversations.join({ channel: channelId });
if (joinRes.ok) {
return "ok";
} else {
return "already_joined";
}
} catch (e) {
const slackError = e as CodedError;
if (slackError.code === ErrorCode.PlatformError) {
const platformError = slackError as WebAPIPlatformError;
if (platformError.data.error === "missing_scope") {
logger.info(
{
channelId,
connectorId,
},
"Could not join channel because of a missing scope. User need to re-authorize it's Slack connection to get the channels:join scope."
);
return "cant_join";
}
throw e;
}
}

return "cant_join";
}

export async function syncChannel(
slackAccessToken: string,
channelId: string,
Expand Down Expand Up @@ -858,7 +908,7 @@ export async function getChannelsToGarbageCollect(
);

const remoteChannels = new Set(
(await getChannels(slackAccessToken))
(await getChannels(slackAccessToken, true))
.filter((c) => c.id)
.map((c) => c.id as string)
);
Expand Down
Loading

0 comments on commit c9c4278

Please sign in to comment.