diff --git a/connectors/migrations/20230725_slack_channel_permissions.ts b/connectors/migrations/20230725_slack_channel_permissions.ts index 4097e4ea80acb..d8a0e6db1aa23 100644 --- a/connectors/migrations/20230725_slack_channel_permissions.ts +++ b/connectors/migrations/20230725_slack_channel_permissions.ts @@ -26,7 +26,7 @@ async function main() { ); const accessToken = await getAccessToken(c.connectionId); - const channelsInSlack = await getChannels(accessToken); + const channelsInSlack = await getChannels(accessToken, true); const channelIdsInSlackSet = new Set( channelsInSlack.map((c) => c.id).filter((id) => id) ); diff --git a/connectors/src/api/webhooks/webhook_slack.ts b/connectors/src/api/webhooks/webhook_slack.ts index d67c8e65c4eea..c0d4d0f847159 100644 --- a/connectors/src/api/webhooks/webhook_slack.ts +++ b/connectors/src/api/webhooks/webhook_slack.ts @@ -6,7 +6,6 @@ import { getBotUserIdMemoized, } from "@connectors/connectors/slack/temporal/activities"; import { - launchSlackBotJoinedWorkflow, launchSlackSyncOneMessageWorkflow, launchSlackSyncOneThreadWorkflow, } from "@connectors/connectors/slack/temporal/client"; @@ -17,7 +16,7 @@ import { SlackChannel, SlackConfiguration, } from "@connectors/lib/models"; -import { Err, Ok } from "@connectors/lib/result"; +import { Ok } from "@connectors/lib/result"; import logger from "@connectors/logger/logger"; import { apiError, withLogging } from "@connectors/logger/withlogging"; @@ -327,76 +326,6 @@ const _webhookSlackAPIHandler = async ( break; } - /** - * `member_joined_channel` handler. - */ - case "member_joined_channel": { - if (!req.body.event?.channel || !req.body.event?.user) { - return apiError(req, res, { - api_error: { - type: "invalid_request_error", - message: - "Missing channel or user in request body for member_joined_channel event", - }, - status_code: 400, - }); - } - const channel = req.body.event.channel; - const user = req.body.event.user; - let err: Error | null = null; - - const results = await Promise.all( - slackConfigurations.map((c) => - (async (): Promise | Ok> => { - const connector = await Connector.findByPk(c.connectorId); - if (!connector) { - return new Err( - new Error(`Connector ${c.connectorId} not found`) - ); - } - - const slackAccessToken = await getAccessToken( - connector.connectionId - ); - const myUserId = await getBotUserIdMemoized(slackAccessToken); - if (myUserId !== user) { - return new Ok(""); - } - return await launchSlackBotJoinedWorkflow( - c.connectorId.toString(), - channel - ); - })() - ) - ); - for (const r of results) { - if (r.isErr()) { - err = r.error; - } - } - - if (err) { - return apiError(req, res, { - api_error: { - type: "internal_server_error", - message: err.message, - }, - status_code: 500, - }); - } else { - logger.info( - { - type: req.body.event.type, - channel: req.body.event.channel, - user: req.body.event.user, - }, - `Successfully processed Slack Webhook` - ); - return res.status(200).send(); - } - break; - } - /** * `channel_left`, `channel_deleted` handler. */ diff --git a/connectors/src/connectors/slack/bot.ts b/connectors/src/connectors/slack/bot.ts index 4ba33a5ff3f63..f9b4e6a65a6bf 100644 --- a/connectors/src/connectors/slack/bot.ts +++ b/connectors/src/connectors/slack/bot.ts @@ -351,7 +351,6 @@ async function botAnswerMessage( } let conversation: ConversationType | undefined = undefined; let userMessage: UserMessageType | undefined = undefined; - if (lastSlackChatBotMessage?.conversationId) { if (buildContentFragmentRes.value) { const contentFragmentRes = await dustAPI.postContentFragment({ @@ -609,7 +608,7 @@ async function makeContentFragment( startingAtTs: string | null, connector: Connector ) { - const slackChannelPromise = slackClient.conversations.info({ + const slackChannelPromise = await slackClient.conversations.info({ channel: channelId, }); let allMessages: Message[] = []; diff --git a/connectors/src/connectors/slack/index.ts b/connectors/src/connectors/slack/index.ts index 19bc61b53692a..3c3e2b919a739 100644 --- a/connectors/src/connectors/slack/index.ts +++ b/connectors/src/connectors/slack/index.ts @@ -1,6 +1,5 @@ import { WebClient } from "@slack/web-api"; import PQueue from "p-queue"; -import { Op } from "sequelize"; import { ConnectorPermissionRetriever } from "@connectors/connectors/interface"; import { @@ -30,7 +29,12 @@ import { ConnectorResource, } from "@connectors/types/resources"; -import { getAccessToken, getSlackClient } from "./temporal/activities"; +import { + getAccessToken, + getChannels, + getSlackClient, + joinChannel, +} from "./temporal/activities"; const { NANGO_SLACK_CONNECTOR_ID, SLACK_CLIENT_ID, SLACK_CLIENT_SECRET } = process.env; @@ -169,15 +173,19 @@ export async function updateSlackConnector( const updateParams: Parameters[0] = {}; if (connectionId) { - const newConnectionRes = await nango_client().getConnection( - NANGO_SLACK_CONNECTOR_ID, - connectionId, - false, - false - ); - const newTeamId = newConnectionRes?.team?.id || null; + const accessToken = await getAccessToken(connectionId); + const slackClient = await getSlackClient(accessToken); + const teamInfoRes = await slackClient.team.info(); + if (!teamInfoRes.ok || !teamInfoRes.team?.id) { + return new Err({ + error: { + message: "Can't get the Slack team information.", + }, + }); + } - if (!newTeamId || newTeamId !== currentSlackConfig.slackTeamId) { + const newTeamId = teamInfoRes.team.id; + if (newTeamId !== currentSlackConfig.slackTeamId) { return new Err({ error: { type: "connector_oauth_target_mismatch", @@ -353,20 +361,40 @@ export async function retrieveSlackConnectorPermissions({ break; } - const slackChannels = await SlackChannel.findAll({ - where: filterPermission - ? { - connectorId: connectorId, - permission: { [Op.or]: permissionToFilter }, - } - : { - connectorId: connectorId, - }, - order: [ - ["createdAt", "DESC"], - ["slackChannelName", "ASC"], - ], - }); + const slackChannels: { + slackChannelId: string; + slackChannelName: string; + permission: ConnectorPermission; + }[] = []; + + const accessToken = await getAccessToken(c.connectionId); + const remoteChannels = await getChannels(accessToken, false); + for (const remoteChannel of remoteChannels) { + if (!remoteChannel.id || !remoteChannel.name) { + continue; + } + + const permissions = + ( + await SlackChannel.findOne({ + where: { + connectorId: connectorId, + slackChannelId: remoteChannel.id, + }, + }) + )?.permission || (remoteChannel.is_member ? "write" : "none"); + + if ( + permissionToFilter.length === 0 || + permissionToFilter.includes(permissions) + ) { + slackChannels.push({ + slackChannelId: remoteChannel.id, + slackChannelName: remoteChannel.name, + permission: permissions, + }); + } + } const resources: ConnectorResource[] = slackChannels.map((ch) => ({ provider: "slack", @@ -427,17 +455,49 @@ export async function setSlackConnectorPermissions( let shouldGarbageCollect = false; for (const [id, permission] of Object.entries(permissions)) { - const channel = channels[id]; + let channel = channels[id]; + const slackAccessToken = await getAccessToken(connector.connectionId); + const slackClient = await getSlackClient(slackAccessToken); if (!channel) { - logger.error( - { connectorId, channelId: id }, - "Could not find channel in DB" - ); - continue; + const remoteChannel = await slackClient.conversations.info({ + channel: id, + }); + if (!remoteChannel.ok || !remoteChannel.channel?.name) { + logger.error( + { + connectorId, + channelId: id, + error: remoteChannel.error, + }, + "Could not get the Slack channel information" + ); + return new Err( + new Error("Could not get the Slack channel information.") + ); + } + const joinRes = await joinChannel(connectorId, id); + if (joinRes === "cant_join") { + return new Err( + new Error( + `Our Slack bot (@Dust) was not able to join the Slack channel #${remoteChannel.channel.name}. Please re-authorize Slack or invite @Dust to #${remoteChannel.channel.name} manually.` + ) + ); + } + const slackChannel = await SlackChannel.create({ + connectorId: connectorId, + slackChannelId: id, + slackChannelName: remoteChannel.channel.name, + permission: "none", + }); + channels[id] = slackChannel; + channel = slackChannel; } promises.push( q.add(async () => { + if (!channel) { + return; + } const oldPermission = channel.permission; if (oldPermission === permission) { return; @@ -451,6 +511,16 @@ export async function setSlackConnectorPermissions( ["read", "read_write"].includes(permission) ) { // handle read permission enabled + const joinChannelRes = await joinChannel( + connectorId, + channel.slackChannelId + ); + if (joinChannelRes === "cant_join") { + throw new Error( + `Our Slack bot (@Dust) was not able to join the Slack channel #${channel.slackChannelName}. Please re-authorize Slack or invite @Dust to #${channel.slackChannelName} manually.` + ); + } + const res = await launchSlackBotJoinedWorkflow( connectorId.toString(), channel.slackChannelId diff --git a/connectors/src/connectors/slack/temporal/activities.ts b/connectors/src/connectors/slack/temporal/activities.ts index d5716110f90b3..d78d2ab103151 100644 --- a/connectors/src/connectors/slack/temporal/activities.ts +++ b/connectors/src/connectors/slack/temporal/activities.ts @@ -25,7 +25,12 @@ import { upsertToDatasource, } from "@connectors/lib/data_sources"; import { WorkflowError } from "@connectors/lib/error"; -import { SlackChannel, SlackMessages } from "@connectors/lib/models"; +import { + Connector, + ModelId, + SlackChannel, + SlackMessages, +} from "@connectors/lib/models"; import { getAccessTokenFromNango } from "@connectors/lib/nango_helpers"; import { reportInitialSyncProgress, @@ -56,7 +61,8 @@ const NETWORK_REQUEST_TIMEOUT_MS = 30000; */ export async function getChannels( - slackAccessToken: string + slackAccessToken: string, + joinedOnly: boolean ): Promise { const client = getSlackClient(slackAccessToken); const allChannels = []; @@ -79,8 +85,10 @@ export async function getChannels( ); } for (const channel of c.channels) { - if (channel && channel.id && channel.is_member) { - allChannels.push(channel); + if (channel && channel.id) { + if (!joinedOnly || channel.is_member) { + allChannels.push(channel); + } } } } while (nextCursor); @@ -119,6 +127,48 @@ interface SyncChannelRes { weeksSynced: Record; } +export async function joinChannel( + connectorId: ModelId, + channelId: string +): Promise<"ok" | "already_joined" | "cant_join"> { + const connector = await Connector.findByPk(connectorId); + if (!connector) { + throw new Error(`Connector ${connectorId} not found`); + } + const accessToken = await getAccessToken(connector.connectionId); + const client = getSlackClient(accessToken); + try { + const channelInfo = await client.conversations.info({ channel: channelId }); + if (channelInfo.channel?.is_member) { + return "already_joined"; + } + const joinRes = await client.conversations.join({ channel: channelId }); + if (joinRes.ok) { + return "ok"; + } else { + return "already_joined"; + } + } catch (e) { + const slackError = e as CodedError; + if (slackError.code === ErrorCode.PlatformError) { + const platformError = slackError as WebAPIPlatformError; + if (platformError.data.error === "missing_scope") { + logger.info( + { + channelId, + connectorId, + }, + "Could not join channel because of a missing scope. User need to re-authorize it's Slack connection to get the channels:join scope." + ); + return "cant_join"; + } + throw e; + } + } + + return "cant_join"; +} + export async function syncChannel( slackAccessToken: string, channelId: string, @@ -858,7 +908,7 @@ export async function getChannelsToGarbageCollect( ); const remoteChannels = new Set( - (await getChannels(slackAccessToken)) + (await getChannels(slackAccessToken, true)) .filter((c) => c.id) .map((c) => c.id as string) ); diff --git a/connectors/src/connectors/slack/temporal/workflows.ts b/connectors/src/connectors/slack/temporal/workflows.ts index 833d98f0d4fa9..bfc9e6a178c8b 100644 --- a/connectors/src/connectors/slack/temporal/workflows.ts +++ b/connectors/src/connectors/slack/temporal/workflows.ts @@ -27,6 +27,7 @@ const { reportInitialSyncProgressActivity, getChannelsToGarbageCollect, deleteChannel, + joinChannel, deleteChannelsFromConnectorDb, } = proxyActivities({ startToCloseTimeout: "10 minutes", @@ -52,7 +53,7 @@ export async function workspaceFullSync( ): Promise { const slackAccessToken = await getAccessToken(nangoConnectionId); await fetchUsers(slackAccessToken, connectorId); - const channels = await getChannels(slackAccessToken); + const channels = await getChannels(slackAccessToken, true); let i = 0; for (const channel of channels) { if (!channel.id || !channel.name) { @@ -88,6 +89,7 @@ export async function syncOneChannel( fromTs: number | null ) { console.log(`Syncing channel ${channelName} (${channelId})`); + await joinChannel(parseInt(connectorId, 10), channelId); const slackAccessToken = await getAccessToken(nangoConnectionId); let messagesCursor: string | undefined = undefined; diff --git a/front/components/ConnectorPermissionsModal.tsx b/front/components/ConnectorPermissionsModal.tsx index d4bd939e90fd3..45d891e7dcc20 100644 --- a/front/components/ConnectorPermissionsModal.tsx +++ b/front/components/ConnectorPermissionsModal.tsx @@ -122,7 +122,8 @@ export default function ConnectorPermissionsModal({ ); if (!r.ok) { - window.alert("An unexpected error occurred"); + const error: { error: { message: string } } = await r.json(); + window.alert(error.error.message); } await mutate( diff --git a/front/pages/api/w/[wId]/data_sources/[name]/managed/permissions/index.ts b/front/pages/api/w/[wId]/data_sources/[name]/managed/permissions/index.ts index 587abccdb043c..19539fbe376cf 100644 --- a/front/pages/api/w/[wId]/data_sources/[name]/managed/permissions/index.ts +++ b/front/pages/api/w/[wId]/data_sources/[name]/managed/permissions/index.ts @@ -182,7 +182,7 @@ async function handler( status_code: 500, api_error: { type: "internal_server_error", - message: `An error occurred while setting the data source permissions.`, + message: connectorsRes.error.error.message, }, }); }