From b7ddfe2c4d32ffba0c0e8b8ddf4e327c98deb4f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Thu, 6 Jul 2023 08:29:47 +0200 Subject: [PATCH 01/30] bugfix --- .../src/service/conversation.service.ts | 10 +++++-- services/libs/sqs/src/client.ts | 14 +++++++-- services/libs/sqs/src/queue.ts | 30 +++++++++++++++++-- 3 files changed, 47 insertions(+), 7 deletions(-) diff --git a/services/libs/conversations/src/service/conversation.service.ts b/services/libs/conversations/src/service/conversation.service.ts index 9b99d463ee..e47cbd6694 100644 --- a/services/libs/conversations/src/service/conversation.service.ts +++ b/services/libs/conversations/src/service/conversation.service.ts @@ -248,9 +248,13 @@ export class ConversationService extends LoggerBase { if (conversationSettings.autoPublish.status === 'all') { shouldAutoPublish = true } else if (conversationSettings.autoPublish.status === 'custom') { - shouldAutoPublish = - conversationSettings.autoPublish.channelsByPlatform[platform] && - conversationSettings.autoPublish.channelsByPlatform[platform].includes(channel) + if (conversationSettings.autoPublish.channelsByPlatform) { + const channels = conversationSettings.autoPublish.channelsByPlatform[platform] + if (channels && Array.isArray(channels)) { + shouldAutoPublish = + conversationSettings.autoPublish.channelsByPlatform[platform].includes(channel) + } + } } return shouldAutoPublish } diff --git a/services/libs/sqs/src/client.ts b/services/libs/sqs/src/client.ts index 752960caab..637781eec1 100644 --- a/services/libs/sqs/src/client.ts +++ b/services/libs/sqs/src/client.ts @@ -1,15 +1,18 @@ -import { IS_DEV_ENV, IS_STAGING_ENV } from '@crowd/common' -import { getServiceChildLogger } from '@crowd/logging' import { DeleteMessageCommand, DeleteMessageRequest, ReceiveMessageCommand, ReceiveMessageRequest, SQSClient, + SendMessageBatchCommand, + SendMessageBatchCommandOutput, + SendMessageBatchRequest, SendMessageCommand, SendMessageRequest, SendMessageResult, } from '@aws-sdk/client-sqs' +import { IS_DEV_ENV, IS_STAGING_ENV } from '@crowd/common' +import { getServiceChildLogger } from '@crowd/logging' import { ISqsClientConfig, SqsClient, SqsMessage } from './types' const log = getServiceChildLogger('sqs.client') @@ -67,3 +70,10 @@ export const sendMessage = async ( ): Promise => { return client.send(new SendMessageCommand(params)) } + +export const sendMessagesBulk = async ( + client: SqsClient, + params: SendMessageBatchRequest, +): Promise => { + return client.send(new SendMessageBatchCommand(params)) +} diff --git a/services/libs/sqs/src/queue.ts b/services/libs/sqs/src/queue.ts index 3bebd228b1..c38b796cc6 100644 --- a/services/libs/sqs/src/queue.ts +++ b/services/libs/sqs/src/queue.ts @@ -3,11 +3,12 @@ import { DeleteMessageRequest, GetQueueUrlCommand, ReceiveMessageRequest, + SendMessageBatchRequestEntry, SendMessageRequest, } from '@aws-sdk/client-sqs' -import { IS_PROD_ENV, IS_STAGING_ENV, timeout } from '@crowd/common' +import { IS_PROD_ENV, IS_STAGING_ENV, generateUUIDv1, timeout } from '@crowd/common' import { Logger, LoggerBase } from '@crowd/logging' -import { deleteMessage, receiveMessage, sendMessage } from './client' +import { deleteMessage, receiveMessage, sendMessage, sendMessagesBulk } from './client' import { ISqsQueueConfig, SqsClient, SqsMessage, SqsQueueType } from './types' import { IQueueMessage, ISqsQueueEmitter } from '@crowd/types' @@ -193,4 +194,29 @@ export abstract class SqsQueueEmitter extends SqsQueueBase implements ISqsQueueE await sendMessage(this.sqsClient, params) } + + public async sendMessages( + messages: { payload: T; groupId: string; deduplicationId?: string; id?: string }[], + ): Promise { + if (messages.length > 10) { + throw new Error('Maximum number of messages to send is 10!') + } + const time = new Date().getTime() + + const entries: SendMessageBatchRequestEntry[] = messages.map((msg) => { + return { + Id: msg.id || generateUUIDv1(), + MessageBody: JSON.stringify(msg.payload), + MessageDeduplicationId: this.isFifo + ? msg.deduplicationId || `${msg.groupId}-${time}` + : undefined, + MessageGroupId: msg.groupId, + } + }) + + await sendMessagesBulk(this.sqsClient, { + QueueUrl: this.getQueueUrl(), + Entries: entries, + }) + } } From 8633071bf830979c817e34f187a656aa4a274d16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Thu, 6 Jul 2023 09:57:25 +0200 Subject: [PATCH 02/30] Ignore non-pending results. --- .../apps/data_sink_worker/src/service/dataSink.service.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/services/apps/data_sink_worker/src/service/dataSink.service.ts b/services/apps/data_sink_worker/src/service/dataSink.service.ts index 53d0e4f62a..865cb80161 100644 --- a/services/apps/data_sink_worker/src/service/dataSink.service.ts +++ b/services/apps/data_sink_worker/src/service/dataSink.service.ts @@ -58,10 +58,10 @@ export default class DataSinkService extends LoggerBase { }) if (resultInfo.state !== IntegrationResultState.PENDING) { - this.log.error({ actualState: resultInfo.state }, 'Result is not pending.') - await this.triggerResultError(resultId, 'check-result-state', 'Result is not pending.', { - actualState: resultInfo.state, - }) + this.log.warn({ actualState: resultInfo.state }, 'Result is not pending. Skipping.') + // await this.triggerResultError(resultId, 'check-result-state', 'Result is not pending.', { + // actualState: resultInfo.state, + // }) return } From 37d6561357208ee7edfaaf521093327079c0e74f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Thu, 6 Jul 2023 09:59:10 +0200 Subject: [PATCH 03/30] fixes --- .../data_sink_worker/src/bin/restart-failed-results.ts | 2 +- services/apps/data_sink_worker/src/bin/restart-result.ts | 2 +- services/apps/data_sink_worker/src/repo/dataSink.repo.ts | 2 +- .../apps/data_sink_worker/src/service/dataSink.service.ts | 8 +++++++- 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/services/apps/data_sink_worker/src/bin/restart-failed-results.ts b/services/apps/data_sink_worker/src/bin/restart-failed-results.ts index 10e818695c..9b383f57ce 100644 --- a/services/apps/data_sink_worker/src/bin/restart-failed-results.ts +++ b/services/apps/data_sink_worker/src/bin/restart-failed-results.ts @@ -28,7 +28,7 @@ setImmediate(async () => { let results = await repo.getFailedResults(runId, 1, 20) while (results.length > 0) { - await repo.resetFailedResults(results.map((r) => r.id)) + await repo.resetResults(results.map((r) => r.id)) for (const result of results) { await emitter.sendMessage( diff --git a/services/apps/data_sink_worker/src/bin/restart-result.ts b/services/apps/data_sink_worker/src/bin/restart-result.ts index 42c665814b..2d11bdc2bf 100644 --- a/services/apps/data_sink_worker/src/bin/restart-result.ts +++ b/services/apps/data_sink_worker/src/bin/restart-result.ts @@ -31,7 +31,7 @@ setImmediate(async () => { log.error(`Result ${resultId} not found!`) process.exit(1) } else { - await repo.resetFailedResults([resultId]) + await repo.resetResults([resultId]) await emitter.sendMessage( `results-${result.tenantId}-${result.platform}`, new ProcessIntegrationResultQueueMessage(result.id), diff --git a/services/apps/data_sink_worker/src/repo/dataSink.repo.ts b/services/apps/data_sink_worker/src/repo/dataSink.repo.ts index 14be2b821e..8ffecc0c92 100644 --- a/services/apps/data_sink_worker/src/repo/dataSink.repo.ts +++ b/services/apps/data_sink_worker/src/repo/dataSink.repo.ts @@ -121,7 +121,7 @@ export default class DataSinkRepository extends RepositoryBase { + public async resetResults(resultIds: string[]): Promise { const result = await this.db().result( `update integration.results set state = $(newState), diff --git a/services/apps/data_sink_worker/src/service/dataSink.service.ts b/services/apps/data_sink_worker/src/service/dataSink.service.ts index 865cb80161..0e8e94b262 100644 --- a/services/apps/data_sink_worker/src/service/dataSink.service.ts +++ b/services/apps/data_sink_worker/src/service/dataSink.service.ts @@ -58,7 +58,13 @@ export default class DataSinkService extends LoggerBase { }) if (resultInfo.state !== IntegrationResultState.PENDING) { - this.log.warn({ actualState: resultInfo.state }, 'Result is not pending. Skipping.') + this.log.warn({ actualState: resultInfo.state }, 'Result is not pending.') + if (resultInfo.state === IntegrationResultState.PROCESSED) { + this.log.warn('Result has already been processed. Skipping...') + return + } + + await this.repo.resetResults([resultId]) // await this.triggerResultError(resultId, 'check-result-state', 'Result is not pending.', { // actualState: resultInfo.state, // }) From 73da85c670851b29a95178094bc1be42042a968e Mon Sep 17 00:00:00 2001 From: Igor Kotua <36304232+garrrikkotua@users.noreply.github.com> Date: Thu, 6 Jul 2023 18:00:38 +0300 Subject: [PATCH 04/30] Fix missing segment ids in the conversation service (#1084) --- .../src/service/activity.service.ts | 4 +- .../src/repo/conversation.data.ts | 2 +- .../src/repo/conversation.repo.ts | 46 ++++++++++--- .../src/service/conversation.service.ts | 68 +++++++++++++------ 4 files changed, 86 insertions(+), 34 deletions(-) diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts index 5b514dff7a..65e90f6cbd 100644 --- a/services/apps/data_sink_worker/src/service/activity.service.ts +++ b/services/apps/data_sink_worker/src/service/activity.service.ts @@ -80,7 +80,7 @@ export default class ActivityService extends LoggerBase { return id }) await this.nodejsWorkerEmitter.processAutomationForNewActivity(tenantId, id, segmentId) - const affectedIds = await this.conversationService.processActivity(tenantId, id) + const affectedIds = await this.conversationService.processActivity(tenantId, segmentId, id) if (fireSync) { await this.searchSyncWorkerEmitter.triggerMemberSync(tenantId, activity.memberId) @@ -158,7 +158,7 @@ export default class ActivityService extends LoggerBase { }) if (updated) { - await this.conversationService.processActivity(tenantId, id) + await this.conversationService.processActivity(tenantId, segmentId, id) if (fireSync) { await this.searchSyncWorkerEmitter.triggerMemberSync(tenantId, activity.memberId) diff --git a/services/libs/conversations/src/repo/conversation.data.ts b/services/libs/conversations/src/repo/conversation.data.ts index 263c9cd04c..d8f1ad13a6 100644 --- a/services/libs/conversations/src/repo/conversation.data.ts +++ b/services/libs/conversations/src/repo/conversation.data.ts @@ -27,7 +27,7 @@ export const getInsertConversationColumnSet = (instance: DbInstance): DbColumnSe if (insertConversationColumnSet) return insertConversationColumnSet insertConversationColumnSet = new instance.helpers.ColumnSet( - ['id', 'title', 'slug', 'published', 'tenantId', 'createdAt', 'updatedAt'], + ['id', 'title', 'slug', 'published', 'tenantId', 'segmentId', 'createdAt', 'updatedAt'], { table: { table: 'conversations', diff --git a/services/libs/conversations/src/repo/conversation.repo.ts b/services/libs/conversations/src/repo/conversation.repo.ts index 03ea80afc5..ede5b7f991 100644 --- a/services/libs/conversations/src/repo/conversation.repo.ts +++ b/services/libs/conversations/src/repo/conversation.repo.ts @@ -18,6 +18,7 @@ export class ConversationRepository extends RepositoryBase { + public async checkSlugExists( + tenantId: string, + segmentId: string, + slug: string, + ): Promise { const results = await this.db().any( - `select id from conversations where "tenantId" = $(tenantId) and slug = $(slug)`, + `select id from conversations where "tenantId" = $(tenantId) and slug = $(slug) and "segmentId" = $(segmentId)`, { tenantId, slug, + segmentId, }, ) @@ -90,26 +97,36 @@ export class ConversationRepository extends RepositoryBase { + public async getConversation( + tenantId: string, + segmentId: string, + id: string, + ): Promise { const result = await this.db().oneOrNone( - `select id, published from conversations where "tenantId" = $(tenantId) and id = $(id)`, + `select id, published from conversations where "tenantId" = $(tenantId) and "segmentId" = $(segmentId) and id = $(id)`, { tenantId, id, + segmentId, }, ) return result } - public async getActivityData(tenantId: string, activityId: string): Promise { + public async getActivityData( + tenantId: string, + segmentId: string, + activityId: string, + ): Promise { const results = await this.db().one( `select id, "conversationId", "sourceId", "sourceParentId", "parentId", platform, body, title, channel from activities - where "tenantId" = $(tenantId) and id = $(activityId)`, + where "tenantId" = $(tenantId) and id = $(activityId) and "segmentId" = $(segmentId)`, { tenantId, activityId, + segmentId, }, ) return results @@ -117,6 +134,7 @@ export class ConversationRepository extends RepositoryBase { const result = await this.db().result( - `update activities set "conversationId" = $(conversationId) where id = $(activityId) and "tenantId" = $(tenantId)`, + `update activities set "conversationId" = $(conversationId) where id = $(activityId) and "tenantId" = $(tenantId) and "segmentId" = $(segmentId)`, { activityId, conversationId, tenantId, + segmentId, }, ) this.checkUpdateRowCount(result.rowCount, 1) } - public async getConversationCount(tenantId: string): Promise { + public async getConversationCount(tenantId: string, segmentId: string): Promise { const result = await this.db().one( - `select count(id) as count from conversations where "tenantId" = $(tenantId)`, + `select count(id) as count from conversations where "tenantId" = $(tenantId) and "segmentId" = $(segmentId)`, { tenantId, + segmentId, }, ) diff --git a/services/libs/conversations/src/service/conversation.service.ts b/services/libs/conversations/src/service/conversation.service.ts index 9b99d463ee..ade9e15747 100644 --- a/services/libs/conversations/src/service/conversation.service.ts +++ b/services/libs/conversations/src/service/conversation.service.ts @@ -17,10 +17,11 @@ export class ConversationService extends LoggerBase { private async getConversation( tenantId: string, + segmentId: string, id: string, repo: ConversationRepository, ): Promise { - const conversation = await repo.getConversation(tenantId, id) + const conversation = await repo.getConversation(tenantId, segmentId, id) if (!conversation) { throw new Error(`Conversation ${id} does not exist!`) @@ -29,10 +30,15 @@ export class ConversationService extends LoggerBase { return conversation } - public async generateTitle(tenantId: string, title: string, isHtml = false): Promise { + public async generateTitle( + tenantId: string, + segmentId: string, + title: string, + isHtml = false, + ): Promise { if (!title && getCleanString(title).length === 0) { const repo = new ConversationRepository(this.store, this.log) - const count = await repo.getConversationCount(tenantId) + const count = await repo.getConversationCount(tenantId, segmentId) return `conversation-${count}` } @@ -48,7 +54,7 @@ export class ConversationService extends LoggerBase { } static readonly MAX_SLUG_WORD_LENGTH = 10 - public async generateSlug(tenantId: string, title: string): Promise { + public async generateSlug(tenantId: string, segmentId: string, title: string): Promise { // Remove non-standart characters and extra whitespaces const cleanedTitle = getCleanString(title) @@ -68,7 +74,7 @@ export class ConversationService extends LoggerBase { // check generated slug already exists in tenant const repo = new ConversationRepository(this.store, this.log) - let slugExists = await repo.checkSlugExists(tenantId, cleanedSlug) + let slugExists = await repo.checkSlugExists(tenantId, segmentId, cleanedSlug) // generated slug already exists in the tenant, start adding suffixes and re-check if (slugExists) { @@ -78,7 +84,7 @@ export class ConversationService extends LoggerBase { while (slugExists) { const suffixedSlug = `${slugCopy}-${suffix}` - slugExists = await repo.checkSlugExists(tenantId, cleanedSlug) + slugExists = await repo.checkSlugExists(tenantId, segmentId, cleanedSlug) suffix += 1 cleanedSlug = suffixedSlug } @@ -88,23 +94,27 @@ export class ConversationService extends LoggerBase { } // returns activity ids that were changed - public async processActivity(tenantId: string, activityId: string): Promise { + public async processActivity( + tenantId: string, + segmentId: string, + activityId: string, + ): Promise { const repo = new ConversationRepository(this.store, this.log) - const activity = await repo.getActivityData(tenantId, activityId) + const activity = await repo.getActivityData(tenantId, segmentId, activityId) if (activity.parentId) { - const parent = await repo.getActivityData(tenantId, activity.parentId) - return await this.addToConversation(tenantId, activity, parent) + const parent = await repo.getActivityData(tenantId, segmentId, activity.parentId) + return await this.addToConversation(tenantId, segmentId, activity, parent) } else { const ids: string[] = [] await processPaginated( async (page) => { - return repo.getActivities(tenantId, activity.sourceId, page, 10) + return repo.getActivities(tenantId, segmentId, activity.sourceId, page, 10) }, async (activities) => { for (const child of activities) { - const results = await this.addToConversation(tenantId, child, activity) + const results = await this.addToConversation(tenantId, segmentId, child, activity) ids.push(...results) } }, @@ -116,6 +126,7 @@ export class ConversationService extends LoggerBase { public async addToConversation( tenantId: string, + segmentId: string, child: IDbActivityInfo, parent: IDbActivityInfo, ): Promise { @@ -133,43 +144,59 @@ export class ConversationService extends LoggerBase { // check if parent is in a conversation already if (parent.conversationId) { - conversation = await this.getConversation(tenantId, parent.conversationId, txRepo) - await txRepo.setActivityConversationId(tenantId, child.id, parent.conversationId) + conversation = await this.getConversation( + tenantId, + segmentId, + parent.conversationId, + txRepo, + ) + await txRepo.setActivityConversationId(tenantId, segmentId, child.id, parent.conversationId) affectedIds.push(child.id) } // if child is already in a conversation else if (child.conversationId) { - conversation = await this.getConversation(tenantId, child.conversationId, txRepo) + conversation = await this.getConversation(tenantId, segmentId, child.conversationId, txRepo) if (!conversation.published) { const txService = new ConversationService(txStore, this.log) const newConversationTitle = await txService.generateTitle( tenantId, + segmentId, parent.title || parent.body, ConversationService.hasHtmlActivities(parent.platform as PlatformType), ) - const newConversationSlug = await txService.generateSlug(tenantId, newConversationTitle) + const newConversationSlug = await txService.generateSlug( + tenantId, + segmentId, + newConversationTitle, + ) await txRepo.setConversationTitleAndSlug( tenantId, + segmentId, conversation.id, newConversationTitle, newConversationSlug, ) } - await txRepo.setActivityConversationId(tenantId, parent.id, conversation.id) + await txRepo.setActivityConversationId(tenantId, segmentId, parent.id, conversation.id) affectedIds.push(parent.id) } else { // create a new conversation const txService = new ConversationService(txStore, this.log) const conversationTitle = await txService.generateTitle( tenantId, + segmentId, parent.title || parent.body, ConversationService.hasHtmlActivities(parent.platform as PlatformType), ) - const conversationSlug = await txService.generateSlug(tenantId, conversationTitle) + const conversationSlug = await txService.generateSlug( + tenantId, + segmentId, + conversationTitle, + ) const conversationSettings = await txRepo.getConversationSettings(tenantId) const channel = ConversationService.getChannelFromActivity( parent.platform as PlatformType, @@ -184,6 +211,7 @@ export class ConversationService extends LoggerBase { const conversationId = await txRepo.createConversation( tenantId, + segmentId, conversationTitle, published, conversationSlug, @@ -194,8 +222,8 @@ export class ConversationService extends LoggerBase { published, } - await txRepo.setActivityConversationId(tenantId, parent.id, conversationId) - await txRepo.setActivityConversationId(tenantId, child.id, conversationId) + await txRepo.setActivityConversationId(tenantId, segmentId, parent.id, conversationId) + await txRepo.setActivityConversationId(tenantId, segmentId, child.id, conversationId) affectedIds.push(parent.id) affectedIds.push(child.id) } From 848fa90b4e574300b2b3190d87591bdae11f39bd Mon Sep 17 00:00:00 2001 From: Igor Kotua <36304232+garrrikkotua@users.noreply.github.com> Date: Thu, 6 Jul 2023 18:13:18 +0300 Subject: [PATCH 05/30] Migrate Slack integration to the new framework (#1080) --- .../services/integrationProcessor.ts | 2 - .../integrations/slackIntegrationService.ts | 586 ------------------ .../integrations/types/slackTypes.ts | 87 --- backend/src/services/integrationService.ts | 23 +- .../src/repo/integrationData.data.ts | 1 + .../src/repo/integrationData.repo.ts | 1 + .../src/service/integrationDataService.ts | 1 + .../src/repo/integrationRun.data.ts | 1 + .../src/repo/integrationRun.repo.ts | 1 + .../src/service/integrationRunService.ts | 26 + .../config/default.json | 3 + .../config/production.json | 6 +- .../src/repo/integrationStream.data.ts | 1 + .../src/repo/integrationStream.repo.ts | 1 + .../src/service/integrationStreamService.ts | 1 + .../integrations/slack/api}/errorHandler.ts | 11 +- .../integrations/slack/api}/getChannels.ts | 10 +- .../src/integrations/slack/api}/getMember.ts | 10 +- .../src/integrations/slack/api}/getMembers.ts | 10 +- .../integrations/slack/api}/getMessages.ts | 24 +- .../slack/api}/getMessagesInThreads.ts | 18 +- .../src/integrations/slack/api}/getProfile.ts | 10 +- .../src/integrations/slack/api}/getTeam.ts | 13 +- .../src/integrations/slack/generateStreams.ts | 20 + .../src/integrations/slack/index.ts | 27 + .../src/integrations/slack/processData.ts | 176 ++++++ .../src/integrations/slack/processStream.ts | 341 ++++++++++ .../src/integrations/slack/types.ts | 155 +++++ services/libs/integrations/src/types.ts | 11 + services/libs/types/src/integrations.ts | 1 + 30 files changed, 858 insertions(+), 720 deletions(-) delete mode 100644 backend/src/serverless/integrations/services/integrations/slackIntegrationService.ts delete mode 100644 backend/src/serverless/integrations/types/slackTypes.ts rename {backend/src/serverless/integrations/usecases/slack => services/libs/integrations/src/integrations/slack/api}/errorHandler.ts (71%) rename {backend/src/serverless/integrations/usecases/slack => services/libs/integrations/src/integrations/slack/api}/getChannels.ts (73%) rename {backend/src/serverless/integrations/usecases/slack => services/libs/integrations/src/integrations/slack/api}/getMember.ts (82%) rename {backend/src/serverless/integrations/usecases/slack => services/libs/integrations/src/integrations/slack/api}/getMembers.ts (86%) rename {backend/src/serverless/integrations/usecases/slack => services/libs/integrations/src/integrations/slack/api}/getMessages.ts (58%) rename {backend/src/serverless/integrations/usecases/slack => services/libs/integrations/src/integrations/slack/api}/getMessagesInThreads.ts (67%) rename {backend/src/serverless/integrations/usecases/slack => services/libs/integrations/src/integrations/slack/api}/getProfile.ts (82%) rename {backend/src/serverless/integrations/usecases/slack => services/libs/integrations/src/integrations/slack/api}/getTeam.ts (64%) create mode 100644 services/libs/integrations/src/integrations/slack/generateStreams.ts create mode 100644 services/libs/integrations/src/integrations/slack/index.ts create mode 100644 services/libs/integrations/src/integrations/slack/processData.ts create mode 100644 services/libs/integrations/src/integrations/slack/processStream.ts diff --git a/backend/src/serverless/integrations/services/integrationProcessor.ts b/backend/src/serverless/integrations/services/integrationProcessor.ts index 6a6b8de053..0e4c0c5ef6 100644 --- a/backend/src/serverless/integrations/services/integrationProcessor.ts +++ b/backend/src/serverless/integrations/services/integrationProcessor.ts @@ -11,7 +11,6 @@ import { IntegrationTickProcessor } from './integrationTickProcessor' import { DiscordIntegrationService } from './integrations/discordIntegrationService' import { DiscourseIntegrationService } from './integrations/discourseIntegrationService' import { GithubIntegrationService } from './integrations/githubIntegrationService' -import { SlackIntegrationService } from './integrations/slackIntegrationService' import { TwitterIntegrationService } from './integrations/twitterIntegrationService' import { TwitterReachIntegrationService } from './integrations/twitterReachIntegrationService' import { WebhookProcessor } from './webhookProcessor' @@ -32,7 +31,6 @@ export class IntegrationProcessor extends LoggerBase { new DiscordIntegrationService(), new TwitterIntegrationService(), new TwitterReachIntegrationService(), - new SlackIntegrationService(), new GithubIntegrationService(), new DiscourseIntegrationService(), ] diff --git a/backend/src/serverless/integrations/services/integrations/slackIntegrationService.ts b/backend/src/serverless/integrations/services/integrations/slackIntegrationService.ts deleted file mode 100644 index 78d06b25d7..0000000000 --- a/backend/src/serverless/integrations/services/integrations/slackIntegrationService.ts +++ /dev/null @@ -1,586 +0,0 @@ -import moment from 'moment/moment' -import sanitizeHtml from 'sanitize-html' -import { SLACK_GRID, SLACK_MEMBER_ATTRIBUTES, SlackActivityType } from '@crowd/integrations' -import { RedisCache, getRedisClient } from '@crowd/redis' -import { timeout } from '@crowd/common' -import { IntegrationType, MemberAttributeName, PlatformType } from '@crowd/types' -import { SLACK_CONFIG, REDIS_CONFIG } from '../../../../conf' -import { - IIntegrationStream, - IPendingStream, - IProcessStreamResults, - IStepContext, - IStreamResultOperation, -} from '../../../../types/integration/stepResult' -import { SlackMessages } from '../../types/slackTypes' -import { IntegrationServiceBase } from '../integrationServiceBase' -import MemberAttributeSettingsService from '../../../../services/memberAttributeSettingsService' -import getChannels from '../../usecases/slack/getChannels' -import { Thread } from '../../types/iteratorTypes' -import getMessagesThreads from '../../usecases/slack/getMessagesInThreads' -import getMessages from '../../usecases/slack/getMessages' -import getTeam from '../../usecases/slack/getTeam' -import { AddActivitiesSingle, Member, PlatformIdentities } from '../../types/messageTypes' -import Operations from '../../../dbOperations/operations' -import getMember from '../../usecases/slack/getMember' -import getMembers from '../../usecases/slack/getMembers' - -/* eslint class-methods-use-this: 0 */ - -/* eslint-disable @typescript-eslint/no-unused-vars */ - -/* eslint-disable no-case-declarations */ - -export class SlackIntegrationService extends IntegrationServiceBase { - static maxRetrospect: number = SLACK_CONFIG.maxRetrospectInSeconds || 3600 - - constructor() { - super(IntegrationType.SLACK, 20) - - this.globalLimit = SLACK_CONFIG.globalLimit || 0 - } - - async preprocess(context: IStepContext): Promise { - const redis = await getRedisClient(REDIS_CONFIG, true) - const membersCache = new RedisCache('slack-members', redis, context.logger) - - let channelsFromSlackAPI = await getChannels( - { token: context.integration.token }, - context.logger, - ) - - const channels = context.integration.settings.channels - ? context.integration.settings.channels - : [] - - channelsFromSlackAPI = channelsFromSlackAPI.map((c) => { - if (channels.filter((a) => a.id === c.id).length <= 0) { - return { ...c, new: true } - } - return c - }) - - const team = await getTeam({ token: context.integration.token }, context.logger) - const teamUrl = team.url - - context.pipelineData = { - membersCache, - channels: channelsFromSlackAPI, - team, - teamUrl, - channelsInfo: channelsFromSlackAPI.reduce((acc, channel) => { - acc[channel.id] = { - name: channel.name, - new: !!(channel as any).new, - } - return acc - }, {}), - } - } - - async createMemberAttributes(context: IStepContext): Promise { - const service = new MemberAttributeSettingsService(context.repoContext) - await service.createPredefined(SLACK_MEMBER_ATTRIBUTES) - } - - async getStreams(context: IStepContext): Promise { - const streams = [] - - if (context.onboarding) { - streams.push({ - value: 'members', - metadata: { page: '' }, - }) - } - - const channelStreams = context.pipelineData.channels.map((c) => ({ - value: 'channel', - metadata: { channelId: c.id, page: '', general: c.general }, - })) - if (channelStreams.length > 0) { - streams.push(...channelStreams) - } - - return streams - } - - async processStream( - stream: IIntegrationStream, - context: IStepContext, - ): Promise { - await timeout(1000) - - const operations: IStreamResultOperation[] = [] - let nextPage: string - let newStreams: IPendingStream[] - let lastRecord - - switch (stream.value) { - case 'channel': { - const result = await getMessages( - { - channelId: stream.metadata.channelId, - page: stream.metadata.page, - perPage: 200, - token: context.integration.token, - }, - context.logger, - ) - - nextPage = result.nextPage - - if (result.records.length > 0) { - const { activities, additionalStreams } = await this.parseActivities( - result.records, - stream, - context, - ) - - operations.push({ - type: Operations.UPSERT_ACTIVITIES_WITH_MEMBERS, - records: activities, - }) - newStreams = additionalStreams - lastRecord = activities.length > 0 ? activities[activities.length - 1] : undefined - } - - break - } - case 'threads': { - const result = await getMessagesThreads( - { - token: context.integration.token, - channelId: stream.metadata.channelId, - page: stream.metadata.page, - perPage: 200, - threadId: stream.metadata.threadId, - }, - context.logger, - ) - - nextPage = result.nextPage - - if (result.records.length > 0) { - const { activities, additionalStreams } = await this.parseActivities( - result.records, - stream, - context, - ) - - operations.push({ - type: Operations.UPSERT_ACTIVITIES_WITH_MEMBERS, - records: activities, - }) - newStreams = additionalStreams - lastRecord = activities.length > 0 ? activities[activities.length - 1] : undefined - } - break - } - case 'members': { - const result = await getMembers( - { - token: context.integration.token, - page: stream.metadata.page, - perPage: 200, - teamId: context.pipelineData.team.id, - }, - context.logger, - ) - - nextPage = result.nextPage - if (result.records.length > 0) { - const { activities } = await this.parseActivities(result.records, stream, context) - - operations.push({ - type: Operations.UPSERT_ACTIVITIES_WITH_MEMBERS, - records: activities, - }) - } - break - } - - default: - throw new Error(`Unknown stream value ${stream.value}!`) - } - - const nextPageStream: IPendingStream = nextPage - ? { value: stream.value, metadata: { ...(stream.metadata || {}), page: nextPage } } - : undefined - - return { - operations, - lastRecord, - lastRecordTimestamp: lastRecord ? lastRecord.timestamp.getTime() : undefined, - newStreams, - nextPageStream, - } - } - - async postprocess(context: IStepContext): Promise { - // Strip the new property from channels - context.integration.settings.channels = context.pipelineData.channels.map((ch) => { - const { new: _, ...raw } = ch - return raw - }) - } - - private async parseActivities( - records: any[], - stream: IIntegrationStream, - context: IStepContext, - ): Promise<{ activities: AddActivitiesSingle[]; additionalStreams: IPendingStream[] }> { - switch (stream.value) { - case 'members': { - const members = await this.parseMembers(records, context) - return { - activities: members, - additionalStreams: [], - } - } - case 'threads': - const parseMessagesInThreadsResult = await this.parseMessagesInThreads( - records, - stream, - context, - ) - return { - activities: parseMessagesInThreadsResult.activities, - additionalStreams: parseMessagesInThreadsResult.additionalStreams, - } - default: - const parseMessagesResult = await this.parseMessages(records, stream, context) - return { - activities: parseMessagesResult.activities, - additionalStreams: parseMessagesResult.additionalStreams, - } - } - } - - /** - * Get the URL for a Slack message - * @param stream Stream we are parsing - * @param pipelineData Pipeline data - * @param record Message record - * @returns Return the url: workspaceUrl + channelUrl + messageUrl - */ - private static getUrl(stream, pipelineData, record) { - const channelId = stream.metadata.channelId - return `${pipelineData.teamUrl}archives/${channelId}/p${record.ts.replace('.', '')}` - } - - private static parseMember(record: any, context: IStepContext): Member { - const member: Member = { - displayName: record.profile.real_name, - username: { - [PlatformType.SLACK]: { - username: record.name, - integrationId: context.integration.id, - sourceId: record.id, - }, - } as PlatformIdentities, - emails: record.profile.email ? [record.profile.email] : [], - attributes: { - [MemberAttributeName.SOURCE_ID]: { - [PlatformType.SLACK]: record.id, - }, - ...(record.profile.image_72 && { - [MemberAttributeName.AVATAR_URL]: { - [PlatformType.SLACK]: record.profile.image_72, - }, - }), - ...(record.tz_label && { - [MemberAttributeName.TIMEZONE]: { - [PlatformType.SLACK]: record.tz_label, - }, - }), - ...(record.profile.title && { - [MemberAttributeName.JOB_TITLE]: { - [PlatformType.SLACK]: record.profile.title, - }, - }), - }, - } - - ;(member as any).platform = PlatformType.SLACK - - return member - } - - private static async getMember(userId: string, context: IStepContext): Promise { - const membersCache: RedisCache = context.pipelineData.membersCache - - const cached = await membersCache.get(userId) - if (cached) { - if (cached === 'null') { - return undefined - } - - return JSON.parse(cached) - } - const result = await getMember({ token: context.integration.token, userId }, context.logger) - - const member = result.records - - if (member) { - await membersCache.set(userId, JSON.stringify(member), 24 * 60 * 60) - - return member - } - - await membersCache.set(userId, 'null', 24 * 60 * 60) - return undefined - } - - private async fetchAndParseMember(context: IStepContext, userId: string): Promise { - try { - if (userId === undefined) { - return undefined - } - - const record = await SlackIntegrationService.getMember(userId, context) - - if (!record || record.is_bot) { - return undefined - } - - return SlackIntegrationService.parseMember(record, context) - } catch (e) { - context.logger.error('Error getting member in Slack', { userId }) - throw e - } - } - - /** - * Map the messages coming from Slack to activities and members - * @param records List of records coming from the API - * @param stream - * @param context - * @returns List of activities and members - */ - private async parseMessages( - records: SlackMessages, - stream: IIntegrationStream, - context: IStepContext, - ): Promise<{ activities: AddActivitiesSingle[]; additionalStreams: IPendingStream[] }> { - const newStreams: IPendingStream[] = [] - const activities: AddActivitiesSingle[] = [] - - for (const record of records) { - const member = await this.fetchAndParseMember(context, record.user) - - if (member !== undefined) { - let body = record.text - ? await SlackIntegrationService.removeMentions(record.text, context) - : '' - - let activityType - let score - let isContribution - let sourceId - if (record.subtype === 'channel_join') { - activityType = 'channel_joined' - score = SLACK_GRID[SlackActivityType.JOINED_CHANNEL].score - isContribution = SLACK_GRID[SlackActivityType.JOINED_CHANNEL].isContribution - body = undefined - sourceId = record.user - } else { - activityType = 'message' - score = SLACK_GRID[SlackActivityType.MESSAGE].score - isContribution = SLACK_GRID[SlackActivityType.MESSAGE].isContribution - sourceId = record.ts - } - activities.push({ - username: member.username[PlatformType.SLACK].username, - tenant: context.integration.tenantId, - platform: PlatformType.SLACK, - type: activityType, - sourceId, - sourceParentId: '', - timestamp: moment(parseInt(record.ts, 10) * 1000) - .utc() - .toDate(), - body, - url: SlackIntegrationService.getUrl(stream, context.pipelineData, record), - channel: context.pipelineData.channelsInfo[stream.metadata.channelId].name, - attributes: { - thread: false, - reactions: record.reactions ? record.reactions : [], - attachments: record.attachments ? record.attachments : [], - }, - score, - isContribution, - member, - }) - if (record.thread_ts) { - newStreams.push({ - value: 'threads', - metadata: { - page: '', - threadId: record.thread_ts, - channel: context.pipelineData.channelsInfo[stream.metadata.channelId].name, - channelId: stream.metadata.channelId, - placeholder: body, - new: context.pipelineData.channelsInfo[stream.metadata.channelId].new, - }, - }) - } - } - } - - return { - activities, - additionalStreams: newStreams, - } - } - - private async parseMembers( - records: any[], - context: IStepContext, - ): Promise { - const activities: AddActivitiesSingle[] = [] - for (const record of records) { - if (record.is_bot) { - // eslint-disable-next-line no-continue - continue - } - - const member = SlackIntegrationService.parseMember(record, context) - - activities.push({ - username: member.username[PlatformType.SLACK].username, - tenant: context.integration.tenantId, - platform: PlatformType.SLACK, - type: 'channel_joined', - sourceId: record.id, - timestamp: moment('1970-01-01T00:00:00+00:00').utc().toDate(), - body: undefined, - attributes: { - thread: false, - reactions: record.reactions ? record.reactions : [], - attachments: record.attachments ? record.attachments : [], - }, - score: SLACK_GRID[SlackActivityType.JOINED_CHANNEL].score, - isContribution: SLACK_GRID[SlackActivityType.JOINED_CHANNEL].isContribution, - member, - }) - } - return activities - } - - /** - * Map the messages coming from Slack to activities and members records to the format of the message to add activities and members - * @param records List of records coming from the API - * @param stream - * @param context - * @returns List of activities and members - */ - private async parseMessagesInThreads( - records: SlackMessages, - stream: IIntegrationStream, - context: IStepContext, - ): Promise<{ activities: AddActivitiesSingle[]; additionalStreams: IIntegrationStream[] }> { - const threadInfo = stream.metadata - const activities: AddActivitiesSingle[] = [] - for (const record of records) { - const member = await this.fetchAndParseMember(context, record.user) - if (member !== undefined) { - const body = record.text - ? await SlackIntegrationService.removeMentions(record.text, context) - : '' - activities.push({ - username: member.username[PlatformType.SLACK].username, - tenant: context.integration.tenantId, - platform: PlatformType.SLACK, - type: 'message', - sourceId: record.ts, - sourceParentId: threadInfo.threadId, - timestamp: moment(parseInt(record.ts, 10) * 1000) - .utc() - .toDate(), - body, - url: SlackIntegrationService.getUrl(stream, context.pipelineData, record), - channel: threadInfo.channel, - attributes: { - thread: { - body: sanitizeHtml(threadInfo.placeholder), - id: threadInfo.threadId, - }, - reactions: record.reactions ? record.reactions : [], - attachments: record.attachments ? record.attachments : [], - }, - member, - score: SLACK_GRID[SlackActivityType.MESSAGE].score, - isContribution: SLACK_GRID[SlackActivityType.MESSAGE].isContribution, - }) - } - } - - return { - activities, - additionalStreams: [], - } - } - - /** - * Parse mentions - * @param text Message text - * @param context - * @returns Message text, swapping mention IDs by mentions - */ - private static async removeMentions(text: string, context: IStepContext): Promise { - const regex = /<@!?[^>]*>/ - const globalRegex = /<@!?[^>]*>/g - const matches = text.match(globalRegex) - if (matches) { - for (let match of matches) { - match = match.replace('<', '').replace('>', '').replace('@', '').replace('!', '') - - const user = await SlackIntegrationService.getMember(match, context) - const username = user ? user.name : 'mention' - text = text.replace(regex, `@${username}`) - } - } - - return text - } - - async isProcessingFinished( - context: IStepContext, - currentStream: IIntegrationStream, - lastOperations: IStreamResultOperation[], - lastRecord?: any, - lastRecordTimestamp?: number, - ): Promise { - switch (currentStream.value) { - case 'members': - if (lastRecord === undefined) return true - - return lastRecord.sourceId in context.pipelineData.members - case 'threads': - if ((currentStream.metadata as Thread).new) { - return false - } - - if (lastRecordTimestamp === undefined) return true - - return IntegrationServiceBase.isRetrospectOver( - lastRecordTimestamp, - context.startTimestamp, - SlackIntegrationService.maxRetrospect, - ) - - default: - if (context.pipelineData.channelsInfo[currentStream.metadata.channelId].new) { - return false - } - - if (lastRecordTimestamp === undefined) return true - - return IntegrationServiceBase.isRetrospectOver( - lastRecordTimestamp, - context.startTimestamp, - SlackIntegrationService.maxRetrospect, - ) - } - } -} diff --git a/backend/src/serverless/integrations/types/slackTypes.ts b/backend/src/serverless/integrations/types/slackTypes.ts deleted file mode 100644 index 327cb8a45d..0000000000 --- a/backend/src/serverless/integrations/types/slackTypes.ts +++ /dev/null @@ -1,87 +0,0 @@ -export interface SlackGetChannelsInput { - token: string -} - -export interface SlackGetMessagesInput { - channelId: string - token: string - page: string | undefined - perPage: number | 100 -} - -export interface SlackGetMessagesInThreadsInput { - channelId: string - threadId: string - token: string - page: string | undefined - perPage: number | 100 -} - -export interface SlackGetMembersInput { - token: string - teamId: string - page: string | undefined - perPage: number | 100 -} - -export interface SlackGetMemberInput { - token: string - userId: string -} - -export interface SlackChannel { - id: string - name: string - is_member?: boolean - is_general: boolean -} - -export interface SlackTeam { - id: string - name: string - url: string - domain: string -} - -export type SlackChannels = SlackChannel[] - -export interface SlackMessage { - ts: string - type: string - text: string - subtype?: string - reactions?: any - attachments?: any - user: string - thread_ts?: string -} - -export type SlackMessages = SlackMessage[] - -export interface SlackMember { - id: string - name: string - tz_label: string - is_bot: boolean - profile: { - title: string - real_name: string - image_72: string - email: string - } -} - -export type SlackMembers = SlackMember[] - -export interface SlackParsedResponse { - records: any - nextPage: string -} - -export interface SlackGetMembersOutput extends SlackParsedResponse { - records: SlackMembers | [] -} - -export interface SlackGetMemberOutput extends SlackParsedResponse { - records: SlackMember -} diff --git a/backend/src/services/integrationService.ts b/backend/src/services/integrationService.ts index 3730470df2..afba2b93ea 100644 --- a/backend/src/services/integrationService.ts +++ b/backend/src/services/integrationService.ts @@ -703,9 +703,9 @@ export default class IntegrationService { const transaction = await SequelizeRepository.createTransaction(this.options) let integration - let run try { + this.options.log.info('Creating Slack integration!') integration = await this.createOrUpdate( { platform: PlatformType.SLACK, @@ -715,23 +715,24 @@ export default class IntegrationService { transaction, ) - const isOnboarding: boolean = !('channels' in integration.settings) - - run = await new IntegrationRunRepository({ ...this.options, transaction }).create({ - integrationId: integration.id, - tenantId: integration.tenantId, - onboarding: isOnboarding, - state: IntegrationRunState.PENDING, - }) await SequelizeRepository.commitTransaction(transaction) } catch (err) { await SequelizeRepository.rollbackTransaction(transaction) throw err } - await sendNodeWorkerMessage( + this.options.log.info( + { tenantId: integration.tenantId }, + 'Sending Slack message to int-run-worker!', + ) + + const isOnboarding: boolean = !('channels' in integration.settings) + const emitter = await getIntegrationRunWorkerEmitter() + await emitter.triggerIntegrationRun( integration.tenantId, - new NodeWorkerIntegrationProcessMessage(run.id), + integration.platform, + integration.id, + isOnboarding, ) return integration diff --git a/services/apps/integration_data_worker/src/repo/integrationData.data.ts b/services/apps/integration_data_worker/src/repo/integrationData.data.ts index 2e3ac04684..fa93078e8b 100644 --- a/services/apps/integration_data_worker/src/repo/integrationData.data.ts +++ b/services/apps/integration_data_worker/src/repo/integrationData.data.ts @@ -6,6 +6,7 @@ export interface IApiDataInfo { integrationType: string integrationState: IntegrationState integrationIdentifier: string | null + integrationToken: string | null runState: IntegrationRunState streamId: string runId: string diff --git a/services/apps/integration_data_worker/src/repo/integrationData.repo.ts b/services/apps/integration_data_worker/src/repo/integrationData.repo.ts index ecafbeee6a..ca0e58925b 100644 --- a/services/apps/integration_data_worker/src/repo/integrationData.repo.ts +++ b/services/apps/integration_data_worker/src/repo/integrationData.repo.ts @@ -20,6 +20,7 @@ export default class IntegrationDataRepository extends RepositoryBase s.type === runInfo.integrationType, + ) + + if (service.postProcess) { + let settings = runInfo.integrationSettings as object + const newSettings = service.postProcess(settings) + + if (newSettings) { + settings = { ...settings, ...newSettings } + await this.updateIntegrationSettings(runId, settings) + } + + this.log.info('Finished post processing integration settings!') + } else { + this.log.info('Integration does not have post processing!') + } + } catch (err) { + this.log.error({ err }, 'Error while post processing integration settings!') + } + await this.repo.markRunProcessed(runId) await this.repo.markIntegration(runId, 'done') @@ -327,6 +352,7 @@ export default class IntegrationRunService extends LoggerBase { platform: runInfo.integrationType, status: runInfo.integrationState, settings: runInfo.integrationSettings, + token: runInfo.integrationToken, }, log: this.log, diff --git a/services/apps/integration_stream_worker/config/default.json b/services/apps/integration_stream_worker/config/default.json index 38c63e21c9..fb9e4f8f91 100644 --- a/services/apps/integration_stream_worker/config/default.json +++ b/services/apps/integration_stream_worker/config/default.json @@ -5,5 +5,8 @@ "nango": {}, "worker": { "maxStreamRetries": 5 + }, + "slack": { + "maxRetrospectInSeconds": 86400 } } diff --git a/services/apps/integration_stream_worker/config/production.json b/services/apps/integration_stream_worker/config/production.json index 0967ef424b..67928f018b 100644 --- a/services/apps/integration_stream_worker/config/production.json +++ b/services/apps/integration_stream_worker/config/production.json @@ -1 +1,5 @@ -{} +{ + "slack": { + "maxRetrospectInSeconds": 86400 + } +} diff --git a/services/apps/integration_stream_worker/src/repo/integrationStream.data.ts b/services/apps/integration_stream_worker/src/repo/integrationStream.data.ts index 4683ca9699..1655f90238 100644 --- a/services/apps/integration_stream_worker/src/repo/integrationStream.data.ts +++ b/services/apps/integration_stream_worker/src/repo/integrationStream.data.ts @@ -6,6 +6,7 @@ export interface IStreamData { integrationType: string integrationState: IntegrationState integrationIdentifier: string | null + integrationToken: string | null runState: IntegrationRunState runId: string tenantId: string diff --git a/services/apps/integration_stream_worker/src/repo/integrationStream.repo.ts b/services/apps/integration_stream_worker/src/repo/integrationStream.repo.ts index 072ecc44f4..e80b8781c5 100644 --- a/services/apps/integration_stream_worker/src/repo/integrationStream.repo.ts +++ b/services/apps/integration_stream_worker/src/repo/integrationStream.repo.ts @@ -69,6 +69,7 @@ export default class IntegrationStreamRepository extends RepositoryBase, - input: any, + config: AxiosRequestConfig, + input: SlackGetMessagesInput | SlackGetChannelsInput | SlackGetMembersInput, logger: Logger, -): any => { +) => { const queryParams: string[] = [] if (config.params) { for (const [key, value] of Object.entries(config.params)) { - queryParams.push(`${key}=${encodeURIComponent(value as any)}`) + queryParams.push(`${key}=${encodeURIComponent(value as string)}`) } } diff --git a/backend/src/serverless/integrations/usecases/slack/getChannels.ts b/services/libs/integrations/src/integrations/slack/api/getChannels.ts similarity index 73% rename from backend/src/serverless/integrations/usecases/slack/getChannels.ts rename to services/libs/integrations/src/integrations/slack/api/getChannels.ts index 0c1f61064c..114d2d684d 100644 --- a/backend/src/serverless/integrations/usecases/slack/getChannels.ts +++ b/services/libs/integrations/src/integrations/slack/api/getChannels.ts @@ -1,13 +1,15 @@ import axios, { AxiosRequestConfig } from 'axios' -import { Logger } from '@crowd/logging' import { timeout } from '@crowd/common' import { handleSlackError } from './errorHandler' -import { SlackChannels, SlackGetChannelsInput } from '../../types/slackTypes' +import { SlackChannels, SlackGetChannelsInput } from '../types' +import { IProcessStreamContext } from '@/types' -async function getChannels(input: SlackGetChannelsInput, logger: Logger): Promise { +async function getChannels(input: SlackGetChannelsInput, ctx: IProcessStreamContext) { await timeout(2000) - const config: AxiosRequestConfig = { + const logger = ctx.log + + const config: AxiosRequestConfig = { method: 'get', url: 'https://slack.com/api/conversations.list', params: { diff --git a/backend/src/serverless/integrations/usecases/slack/getMember.ts b/services/libs/integrations/src/integrations/slack/api/getMember.ts similarity index 82% rename from backend/src/serverless/integrations/usecases/slack/getMember.ts rename to services/libs/integrations/src/integrations/slack/api/getMember.ts index 4a3efa4fd4..3bfefed71b 100644 --- a/backend/src/serverless/integrations/usecases/slack/getMember.ts +++ b/services/libs/integrations/src/integrations/slack/api/getMember.ts @@ -1,16 +1,18 @@ import axios, { AxiosRequestConfig } from 'axios' -import { Logger } from '@crowd/logging' import { timeout } from '@crowd/common' -import { SlackGetMemberInput, SlackGetMemberOutput } from '../../types/slackTypes' +import { SlackGetMemberInput, SlackGetMemberOutput } from '../types' import { handleSlackError } from './errorHandler' +import { IProcessStreamContext } from '@/types' async function getMembers( input: SlackGetMemberInput, - logger: Logger, + ctx: IProcessStreamContext, ): Promise { await timeout(2000) - const config: AxiosRequestConfig = { + const logger = ctx.log + + const config: AxiosRequestConfig = { method: 'get', url: `https://slack.com/api/users.info`, params: { diff --git a/backend/src/serverless/integrations/usecases/slack/getMembers.ts b/services/libs/integrations/src/integrations/slack/api/getMembers.ts similarity index 86% rename from backend/src/serverless/integrations/usecases/slack/getMembers.ts rename to services/libs/integrations/src/integrations/slack/api/getMembers.ts index c7259507dd..b855a9a27b 100644 --- a/backend/src/serverless/integrations/usecases/slack/getMembers.ts +++ b/services/libs/integrations/src/integrations/slack/api/getMembers.ts @@ -1,16 +1,18 @@ import axios, { AxiosRequestConfig } from 'axios' -import { Logger } from '@crowd/logging' import { timeout } from '@crowd/common' -import { SlackGetMembersInput, SlackGetMembersOutput, SlackMembers } from '../../types/slackTypes' +import { SlackGetMembersInput, SlackGetMembersOutput, SlackMembers } from '../types' import { handleSlackError } from './errorHandler' +import { IProcessStreamContext } from '@/types' async function getMembers( input: SlackGetMembersInput, - logger: Logger, + ctx: IProcessStreamContext, ): Promise { await timeout(2000) - const config: AxiosRequestConfig = { + const logger = ctx.log + + const config: AxiosRequestConfig = { method: 'get', url: 'https://slack.com/api/users.list', params: {}, diff --git a/backend/src/serverless/integrations/usecases/slack/getMessages.ts b/services/libs/integrations/src/integrations/slack/api/getMessages.ts similarity index 58% rename from backend/src/serverless/integrations/usecases/slack/getMessages.ts rename to services/libs/integrations/src/integrations/slack/api/getMessages.ts index 1e712e1396..6c32ee1619 100644 --- a/backend/src/serverless/integrations/usecases/slack/getMessages.ts +++ b/services/libs/integrations/src/integrations/slack/api/getMessages.ts @@ -1,16 +1,26 @@ import axios, { AxiosRequestConfig } from 'axios' -import { Logger } from '@crowd/logging' import { timeout } from '@crowd/common' -import { SlackGetMessagesInput, SlackMessages, SlackParsedResponse } from '../../types/slackTypes' +import { + SlackGetMessagesInput, + SlackMessages, + SlackParsedResponse, + ISlackPlatformSettings, +} from '../types' import { handleSlackError } from './errorHandler' +import { IProcessStreamContext } from '@/types' async function getMessages( input: SlackGetMessagesInput, - logger: Logger, + ctx: IProcessStreamContext, ): Promise { await timeout(2000) - const config: AxiosRequestConfig = { + const logger = ctx.log + + const platformSettings = ctx.platformSettings as ISlackPlatformSettings + const maxRetrospectInSeconds = platformSettings.maxRetrospectInSeconds + + const config: AxiosRequestConfig = { method: 'get', url: `https://slack.com/api/conversations.history`, params: { @@ -29,6 +39,12 @@ async function getMessages( config.params.limit = input.perPage } + if (!ctx.onboarding && !input.new) { + // we don't want to get messages older than maxRetrospectInSeconds during incremental sync + // but if it's a completely new channel, we want to get all messages + config.params.oldest = new Date(Date.now() - maxRetrospectInSeconds * 1000).getTime() / 1000 + } + try { const response = await axios(config) const records: SlackMessages = response.data.messages diff --git a/backend/src/serverless/integrations/usecases/slack/getMessagesInThreads.ts b/services/libs/integrations/src/integrations/slack/api/getMessagesInThreads.ts similarity index 67% rename from backend/src/serverless/integrations/usecases/slack/getMessagesInThreads.ts rename to services/libs/integrations/src/integrations/slack/api/getMessagesInThreads.ts index d33ea939b7..242e31cb73 100644 --- a/backend/src/serverless/integrations/usecases/slack/getMessagesInThreads.ts +++ b/services/libs/integrations/src/integrations/slack/api/getMessagesInThreads.ts @@ -1,20 +1,25 @@ import axios, { AxiosRequestConfig } from 'axios' -import { Logger } from '@crowd/logging' import { timeout } from '@crowd/common' import { SlackGetMessagesInThreadsInput, SlackMessages, SlackParsedResponse, -} from '../../types/slackTypes' + ISlackPlatformSettings, +} from '../types' import { handleSlackError } from './errorHandler' +import { IProcessStreamContext } from '@/types' async function getMessagesInThreads( input: SlackGetMessagesInThreadsInput, - logger: Logger, + ctx: IProcessStreamContext, ): Promise { await timeout(2000) - const config: AxiosRequestConfig = { + const logger = ctx.log + const platformSettings = ctx.platformSettings as ISlackPlatformSettings + const maxRetrospectInSeconds = platformSettings.maxRetrospectInSeconds + + const config: AxiosRequestConfig = { method: 'get', url: `https://slack.com/api/conversations.replies`, params: { @@ -34,6 +39,11 @@ async function getMessagesInThreads( config.params.limit = input.perPage } + if (!ctx.onboarding && !input.new) { + // we don't want to get messages older than maxRetrospectInSeconds + config.params.oldest = new Date(Date.now() - maxRetrospectInSeconds * 1000).getTime() / 1000 + } + try { const response = await axios(config) const records: SlackMessages = response.data.messages diff --git a/backend/src/serverless/integrations/usecases/slack/getProfile.ts b/services/libs/integrations/src/integrations/slack/api/getProfile.ts similarity index 82% rename from backend/src/serverless/integrations/usecases/slack/getProfile.ts rename to services/libs/integrations/src/integrations/slack/api/getProfile.ts index 42fd929517..4a33a1e6b8 100644 --- a/backend/src/serverless/integrations/usecases/slack/getProfile.ts +++ b/services/libs/integrations/src/integrations/slack/api/getProfile.ts @@ -1,16 +1,18 @@ import axios, { AxiosRequestConfig } from 'axios' -import { Logger } from '@crowd/logging' import { timeout } from '@crowd/common' -import { SlackGetMemberInput, SlackGetMemberOutput } from '../../types/slackTypes' +import { SlackGetMemberInput, SlackGetMemberOutput } from '../types' import { handleSlackError } from './errorHandler' +import { IProcessStreamContext } from '@/types' async function getMembers( input: SlackGetMemberInput, - logger: Logger, + ctx: IProcessStreamContext, ): Promise { await timeout(2000) - const config: AxiosRequestConfig = { + const logger = ctx.log + + const config: AxiosRequestConfig = { method: 'get', url: `https://slack.com/api/users.profile.get`, params: { diff --git a/backend/src/serverless/integrations/usecases/slack/getTeam.ts b/services/libs/integrations/src/integrations/slack/api/getTeam.ts similarity index 64% rename from backend/src/serverless/integrations/usecases/slack/getTeam.ts rename to services/libs/integrations/src/integrations/slack/api/getTeam.ts index d02c92054a..a90f38cae5 100644 --- a/backend/src/serverless/integrations/usecases/slack/getTeam.ts +++ b/services/libs/integrations/src/integrations/slack/api/getTeam.ts @@ -1,13 +1,18 @@ import axios, { AxiosRequestConfig } from 'axios' -import { Logger } from '@crowd/logging' import { timeout } from '@crowd/common' -import { SlackGetChannelsInput, SlackTeam } from '../../types/slackTypes' +import { SlackGetChannelsInput, SlackTeam } from '../types' import { handleSlackError } from './errorHandler' +import { IProcessStreamContext } from '@/types' -async function getChannels(input: SlackGetChannelsInput, logger: Logger): Promise { +async function getChannels( + input: SlackGetChannelsInput, + ctx: IProcessStreamContext, +): Promise { await timeout(2000) - const config: AxiosRequestConfig = { + const logger = ctx.log + + const config: AxiosRequestConfig = { method: 'get', url: 'https://slack.com/api/team.info', headers: { diff --git a/services/libs/integrations/src/integrations/slack/generateStreams.ts b/services/libs/integrations/src/integrations/slack/generateStreams.ts new file mode 100644 index 0000000000..7c9c97f8f5 --- /dev/null +++ b/services/libs/integrations/src/integrations/slack/generateStreams.ts @@ -0,0 +1,20 @@ +import { GenerateStreamsHandler } from '../../types' +import { ISlackIntegrationSettings, SlackStreamType, ISlackRootStreamData } from './types' + +const handler: GenerateStreamsHandler = async (ctx) => { + const settings = ctx.integration.settings as ISlackIntegrationSettings + const token = ctx.integration.token + + const channels = settings?.channels || [] + + if (!token) { + await ctx.abortRunWithError('No Slack token found, aborting run!') + } + + await ctx.publishStream(`${SlackStreamType.ROOT}`, { + token, + channels, + }) +} + +export default handler diff --git a/services/libs/integrations/src/integrations/slack/index.ts b/services/libs/integrations/src/integrations/slack/index.ts new file mode 100644 index 0000000000..7ed5820950 --- /dev/null +++ b/services/libs/integrations/src/integrations/slack/index.ts @@ -0,0 +1,27 @@ +import { IIntegrationDescriptor } from '../../types' +import generateStreams from './generateStreams' +import { SLACK_MEMBER_ATTRIBUTES } from './memberAttributes' +import processStream from './processStream' +import processData from './processData' +import { PlatformType } from '@crowd/types' + +const descriptor: IIntegrationDescriptor = { + type: PlatformType.SLACK, + memberAttributes: SLACK_MEMBER_ATTRIBUTES, + checkEvery: 20, + generateStreams, + processStream, + processData, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + postProcess: (settings: any) => { + const copy = { ...settings } + copy.channels = settings.channels.map((ch) => { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const { new: _, ...raw } = ch + return raw + }) + return copy + }, +} + +export default descriptor diff --git a/services/libs/integrations/src/integrations/slack/processData.ts b/services/libs/integrations/src/integrations/slack/processData.ts new file mode 100644 index 0000000000..9ea90f19af --- /dev/null +++ b/services/libs/integrations/src/integrations/slack/processData.ts @@ -0,0 +1,176 @@ +import { ProcessDataHandler, IProcessDataContext } from '@/types' +import { ISlackAPIData, SlackActivityType, SlackMember, SlackMessage } from './types' +import { SLACK_GRID } from './grid' +import { IActivityData, IMemberData, MemberAttributeName, PlatformType } from '@crowd/types' +import sanitizeHtml from 'sanitize-html' + +/** + * Get the URL for a Slack message + * @param stream Stream we are parsing + * @param pipelineData Pipeline data + * @param record Message record + * @returns Return the url: workspaceUrl + channelUrl + messageUrl + */ +function getUrl(channelId: string, teamUrl: string, record: SlackMessage) { + return `${teamUrl}archives/${channelId}/p${record.ts.replace('.', '')}` +} + +function parseMember(record: SlackMember): IMemberData { + const member: IMemberData = { + displayName: record.profile.real_name, + identities: [ + { + platform: PlatformType.SLACK, + username: record.name, + sourceId: record.id, + }, + ], + emails: record.profile.email ? [record.profile.email] : [], + attributes: { + [MemberAttributeName.SOURCE_ID]: { + [PlatformType.SLACK]: record.id, + }, + ...(record.profile.image_72 && { + [MemberAttributeName.AVATAR_URL]: { + [PlatformType.SLACK]: record.profile.image_72, + }, + }), + ...(record.tz_label && { + [MemberAttributeName.TIMEZONE]: { + [PlatformType.SLACK]: record.tz_label, + }, + }), + ...(record.profile.title && { + [MemberAttributeName.JOB_TITLE]: { + [PlatformType.SLACK]: record.profile.title, + }, + }), + }, + } + + return member +} + +const parseChannel = async (ctx: IProcessDataContext) => { + const data = ctx.data as ISlackAPIData + + const message = data.message + const member = data.member + const channelId = data.channelId + + if (member !== undefined) { + let body = message.text // it's already cleaned from mentions + let activityType + let score + let isContribution + let sourceId + if (message.subtype === 'channel_join') { + activityType = 'channel_joined' + score = SLACK_GRID[SlackActivityType.JOINED_CHANNEL].score + isContribution = SLACK_GRID[SlackActivityType.JOINED_CHANNEL].isContribution + body = undefined + sourceId = message.user + } else { + activityType = 'message' + score = SLACK_GRID[SlackActivityType.MESSAGE].score + isContribution = SLACK_GRID[SlackActivityType.MESSAGE].isContribution + sourceId = message.ts + } + + const activity: IActivityData = { + type: activityType, + sourceId, + sourceParentId: '', + timestamp: new Date(parseInt(message.ts, 10) * 1000).toISOString(), + body, + url: getUrl(channelId, data.base.teamUrl, message), + channel: data.base.channelsInfo[channelId].name, + attributes: { + thread: false, + reactions: message.reactions ? message.reactions : [], + attachments: message.attachments ? message.attachments : [], + }, + score, + isContribution, + member: parseMember(member), + } + + await ctx.publishActivity(activity) + } +} + +const parseThreads = async (ctx: IProcessDataContext) => { + const data = ctx.data as ISlackAPIData + + const message = data.message + const member = data.member + + if (member !== undefined) { + const activity: IActivityData = { + type: 'message', + sourceId: message.ts, + sourceParentId: data.threadId, + timestamp: new Date(parseInt(message.ts, 10) * 1000).toISOString(), + body: message.text ? sanitizeHtml(message.text) : '', + url: getUrl(data.channelId, data.base.teamUrl, message), + channel: data.channel, + attributes: { + thread: { + body: sanitizeHtml(data.placeholder), + id: data.threadId, + }, + reactions: message.reactions ? message.reactions : [], + attachments: message.attachments ? message.attachments : [], + }, + score: SLACK_GRID[SlackActivityType.MESSAGE].score, + isContribution: SLACK_GRID[SlackActivityType.MESSAGE].isContribution, + member: parseMember(member), + } + + await ctx.publishActivity(activity) + } +} + +const parseMembers = async (ctx: IProcessDataContext) => { + const data = ctx.data as ISlackAPIData + + const member = data.member + + if (member !== undefined && !member.is_bot) { + const activity: IActivityData = { + type: 'channel_joined', + sourceId: member.id, + timestamp: new Date('1970-01-01T00:00:00+00:00').toISOString(), + body: undefined, + attributes: { + thread: false, + }, + score: SLACK_GRID[SlackActivityType.JOINED_CHANNEL].score, + isContribution: SLACK_GRID[SlackActivityType.JOINED_CHANNEL].isContribution, + member: parseMember(member), + } + + await ctx.publishActivity(activity) + } +} + +const handler: ProcessDataHandler = async (ctx) => { + const data = ctx.data as ISlackAPIData + switch (data.type) { + case 'channel': + await parseChannel(ctx) + break + case 'threads': + await parseThreads(ctx) + break + case 'members': + await parseMembers(ctx) + break + default: + ctx.log.warn(`Unknown data type: ${data.type}`) + await ctx.abortRunWithError(`Unknown data type: ${data.type}`) + break + } +} + +export default handler diff --git a/services/libs/integrations/src/integrations/slack/processStream.ts b/services/libs/integrations/src/integrations/slack/processStream.ts new file mode 100644 index 0000000000..03f0370a10 --- /dev/null +++ b/services/libs/integrations/src/integrations/slack/processStream.ts @@ -0,0 +1,341 @@ +import { IProcessStreamContext, ProcessStreamHandler } from '@/types' +import { + SlackStreamType, + ISlackRootStreamData, + ISlackMemberStreamData, + ISlackChannelStreamData, + ISlackThreadStreamData, + ISlackStreamBase, + ISlackAPIData, + SlackMessage, + SlackMember, +} from './types' +import getChannels from './api/getChannels' +import getTeam from './api/getTeam' +import getMessages from './api/getMessages' +import getMember from './api/getMember' +import getMembers from './api/getMembers' +import getMessagesInThreads from './api/getMessagesInThreads' + +async function removeMentions(text: string, ctx: IProcessStreamContext): Promise { + const regex = /<@!?[^>]*>/ + const globalRegex = /<@!?[^>]*>/g + const matches = text.match(globalRegex) + if (matches) { + for (let match of matches) { + match = match.replace('<', '').replace('>', '').replace('@', '').replace('!', '') + + const user = await getSlackMember(match, ctx) + const username = user ? user.name : 'mention' + text = text.replace(regex, `@${username}`) + } + } + + return text +} + +async function getSlackMember( + userId: string, + ctx: IProcessStreamContext, +): Promise { + const membersCache = ctx.cache + const metadata = ctx.stream.data as ISlackStreamBase + + const prefix = (val: string) => `slack-member:${val}` + + const cached = await membersCache.get(prefix(userId)) + if (cached) { + if (cached === 'null') { + return undefined + } + + return JSON.parse(cached) + } + const result = await getMember( + { + token: metadata.token, + userId, + }, + ctx, + ) + + const member = result.records + + if (member) { + await membersCache.set(prefix(userId), JSON.stringify(member), 24 * 60 * 60) + + return member + } + + await membersCache.set(prefix(userId), 'null', 24 * 60 * 60) + return undefined +} + +const processRootStream: ProcessStreamHandler = async (ctx) => { + const metadata = ctx.stream.data as ISlackRootStreamData + const token = metadata.token + const channels = metadata.channels ? metadata.channels : [] + + let channelsFromSlackAPI = await getChannels({ token }, ctx) + + channelsFromSlackAPI = channelsFromSlackAPI.map((c) => { + if (channels.filter((a) => a.id === c.id).length <= 0) { + return { ...c, new: true } + } + return c + }) + + await ctx.updateIntegrationSettings({ + ...(ctx.integration.settings as object), + channels: channelsFromSlackAPI, + }) + + const team = await getTeam({ token }, ctx) + const teamUrl = team.url + + const channelsInfo = channelsFromSlackAPI.reduce((acc, channel) => { + acc[channel.id] = { + name: channel.name, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + new: !!(channel as any).new, + } + return acc + }, {}) + + if (ctx.onboarding) { + await ctx.publishStream(`${SlackStreamType.MEMBERS}:`, { + page: '', + token, + channelsInfo, + teamUrl, + team, + channels: channelsFromSlackAPI, + }) + } + + for (const c of channelsFromSlackAPI) { + await ctx.publishStream(`${SlackStreamType.CHANNEL}:${c.id}`, { + channelId: c.id, + page: '', + general: c.general, + token, + channelsInfo, + teamUrl, + team, + channels: channelsFromSlackAPI, + }) + } +} + +const processChannelStream: ProcessStreamHandler = async (ctx) => { + const metadata = ctx.stream.data as ISlackChannelStreamData + + const result = await getMessages( + { + channelId: metadata.channelId, + page: metadata.page, + perPage: 200, + token: metadata.token, + new: metadata.channelsInfo[metadata.channelId].new, + }, + ctx, + ) + + const nextPage = result.nextPage + + // publishing next page stream + if (nextPage) { + await ctx.publishStream( + `${SlackStreamType.CHANNEL}:${metadata.channelId}:${nextPage}`, + { + page: nextPage, + channelId: metadata.channelId, + general: metadata.general, + token: metadata.token, + channelsInfo: metadata.channelsInfo, + teamUrl: metadata.teamUrl, + team: metadata.team, + channels: metadata.channels, + }, + ) + } + + const messages = result.records as SlackMessage[] + + // publishing new streams for each thread + while (messages.length > 0) { + const message = messages.shift() + const userId = message.user + const member = await getSlackMember(userId, ctx) + + // if member is undefined we don't publish the activity and thread stream + if (member !== undefined) { + // removing methions here instead of in the processData handler because we need to do API calls + message.text = message.text ? await removeMentions(message.text, ctx) : '' + + const thread_ts = message.thread_ts + + if (thread_ts) { + await ctx.publishStream( + `${SlackStreamType.THREADS}:${metadata.channelId}:${thread_ts}`, + { + page: '', + threadId: thread_ts, + channel: metadata.channelsInfo[metadata.channelId].name, + channelId: metadata.channelId, + new: metadata.channelsInfo[metadata.channelId].new, + token: metadata.token, + channelsInfo: metadata.channelsInfo, + teamUrl: metadata.teamUrl, + team: metadata.team, + channels: metadata.channels, + placeholder: message.text, + }, + ) + } + + await ctx.publishData({ + type: 'channel', + message, + member, + channelId: metadata.channelId, + base: { + token: metadata.token, + channelsInfo: metadata.channelsInfo, + teamUrl: metadata.teamUrl, + team: metadata.team, + channels: metadata.channels, + }, + }) + } + } +} + +const processThreadStream: ProcessStreamHandler = async (ctx) => { + const metadata = ctx.stream.data as ISlackThreadStreamData + const result = await getMessagesInThreads( + { + token: metadata.token, + channelId: metadata.channelId, + page: metadata.page, + perPage: 200, + threadId: metadata.threadId, + new: metadata.new, + }, + ctx, + ) + + const nextPage = result.nextPage + + // publishing next page stream + if (nextPage) { + await ctx.publishStream( + `${SlackStreamType.THREADS}:${metadata.channelId}:${metadata.threadId}:${nextPage}`, + { + page: nextPage, + channelId: metadata.channelId, + token: metadata.token, + channelsInfo: metadata.channelsInfo, + teamUrl: metadata.teamUrl, + team: metadata.team, + channels: metadata.channels, + threadId: metadata.threadId, + channel: metadata.channel, + new: metadata.new, + placeholder: metadata.placeholder, + }, + ) + } + + const messages = result.records as SlackMessage[] + + // publishing new streams for each thread + while (messages.length > 0) { + const message = messages.shift() + const userId = message.user + const member = await getSlackMember(userId, ctx) + + // removing methions here instead of in the processData handler because we need to do API calls + message.text = message.text ? await removeMentions(message.text, ctx) : '' + + // if member is undefined we don't publish the activity + if (member !== undefined) { + await ctx.publishData({ + type: 'threads', + message, + member, + channelId: metadata.channelId, + threadId: metadata.threadId, + channel: metadata.channel, + placeholder: metadata.placeholder, + base: { + token: metadata.token, + channelsInfo: metadata.channelsInfo, + teamUrl: metadata.teamUrl, + team: metadata.team, + channels: metadata.channels, + }, + }) + } + } +} + +const processMembersStream: ProcessStreamHandler = async (ctx) => { + const metadata = ctx.stream.data as ISlackMemberStreamData + + const result = await getMembers( + { + token: metadata.token, + page: metadata.page, + perPage: 200, + teamId: metadata.team.id, + }, + ctx, + ) + + const nextPage = result.nextPage + + // publishing next page stream + if (nextPage) { + await ctx.publishStream(`${SlackStreamType.MEMBERS}:${nextPage}`, { + page: nextPage, + token: metadata.token, + channelsInfo: metadata.channelsInfo, + teamUrl: metadata.teamUrl, + team: metadata.team, + channels: metadata.channels, + }) + } + + const members = result.records as SlackMember[] + while (members.length > 0) { + const member = members.shift() + await ctx.publishData({ + type: 'members', + member, + base: { + token: metadata.token, + channelsInfo: metadata.channelsInfo, + teamUrl: metadata.teamUrl, + team: metadata.team, + channels: metadata.channels, + }, + }) + } +} + +const handler: ProcessStreamHandler = async (ctx) => { + if (ctx.stream.identifier.startsWith(SlackStreamType.ROOT)) { + await processRootStream(ctx) + } else if (ctx.stream.identifier.startsWith(SlackStreamType.CHANNEL)) { + await processChannelStream(ctx) + } else if (ctx.stream.identifier.startsWith(SlackStreamType.MEMBERS)) { + await processMembersStream(ctx) + } else if (ctx.stream.identifier.startsWith(SlackStreamType.THREADS)) { + await processThreadStream(ctx) + } else { + await ctx.abortRunWithError(`Unknown stream type: ${ctx.stream.identifier}`) + } +} + +export default handler diff --git a/services/libs/integrations/src/integrations/slack/types.ts b/services/libs/integrations/src/integrations/slack/types.ts index 77aa856106..708302be66 100644 --- a/services/libs/integrations/src/integrations/slack/types.ts +++ b/services/libs/integrations/src/integrations/slack/types.ts @@ -2,3 +2,158 @@ export enum SlackActivityType { JOINED_CHANNEL = 'channel_joined', MESSAGE = 'message', } + +export enum SlackStreamType { + ROOT = 'root', + CHANNEL = 'channel', + MEMBERS = 'members', + THREADS = 'threads', +} +export interface ISlackPlatformSettings { + maxRetrospectInSeconds: number +} + +export interface ISlackStreamBase { + token: string + // eslint-disable-next-line @typescript-eslint/no-explicit-any + channelsInfo: any + teamUrl: string + team: SlackTeam + // eslint-disable-next-line @typescript-eslint/no-explicit-any + channels: any[] +} + +export interface ISlackAPIData { + type: 'members' | 'threads' | 'channel' + message?: SlackMessage + member: SlackMember + channelId?: string + threadId?: string + placeholder?: string + channel?: string + base: ISlackStreamBase +} + +export interface ISlackRootStreamData { + token: string + // eslint-disable-next-line @typescript-eslint/no-explicit-any + channels: any[] +} + +export interface ISlackMemberStreamData extends ISlackStreamBase { + page: string +} + +export interface ISlackChannelStreamData extends ISlackStreamBase { + channelId: string + page: string + // eslint-disable-next-line @typescript-eslint/no-explicit-any + general: any +} + +export interface ISlackThreadStreamData extends ISlackStreamBase { + channelId: string + threadId: string + page: string + channel: string + new: boolean + placeholder: string +} + +export interface ISlackIntegrationSettings { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + channels: any[] +} + +export interface SlackGetChannelsInput { + token: string +} + +export interface SlackGetMessagesInput { + channelId: string + token: string + page: string | undefined + perPage: number | 100 + new: boolean +} + +export interface SlackGetMessagesInThreadsInput { + channelId: string + threadId: string + token: string + page: string | undefined + perPage: number | 100 + new: boolean +} + +export interface SlackGetMembersInput { + token: string + teamId: string + page: string | undefined + perPage: number | 100 +} + +export interface SlackGetMemberInput { + token: string + userId: string +} + +export interface SlackChannel { + id: string + name: string + is_member?: boolean + is_general: boolean +} + +export interface SlackTeam { + id: string + name: string + url: string + domain: string +} + +export type SlackChannels = SlackChannel[] + +export interface SlackMessage { + ts: string + type: string + text: string + subtype?: string + // eslint-disable-next-line @typescript-eslint/no-explicit-any + reactions?: any + // eslint-disable-next-line @typescript-eslint/no-explicit-any + attachments?: any + user: string + thread_ts?: string +} + +export type SlackMessages = SlackMessage[] + +export interface SlackMember { + id: string + name: string + tz_label: string + is_bot: boolean + profile: { + title: string + real_name: string + image_72: string + email: string + } +} + +export type SlackMembers = SlackMember[] + +export interface SlackParsedResponse { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + records: any + nextPage: string +} + +export interface SlackGetMembersOutput extends SlackParsedResponse { + records: SlackMembers | [] +} + +export interface SlackGetMemberOutput extends SlackParsedResponse { + records: SlackMember +} diff --git a/services/libs/integrations/src/types.ts b/services/libs/integrations/src/types.ts index a8fb5a3a48..a47ddb4ddf 100644 --- a/services/libs/integrations/src/types.ts +++ b/services/libs/integrations/src/types.ts @@ -77,6 +77,17 @@ export interface IIntegrationDescriptor { */ processData: ProcessDataHandler + /** + * Function that will be called in the end of successful integration run. + * The result of this function should be new settings of the integration. + * The new settings will be merged with the old settings and saved. + * + * @param settings current settings of the integration + * @returns new settings of the integration + */ + // eslint-disable-next-line @typescript-eslint/no-explicit-any + postProcess?: (settings: any) => any + // type of integration service type: string diff --git a/services/libs/types/src/integrations.ts b/services/libs/types/src/integrations.ts index a67b65be29..7aebf4d1da 100644 --- a/services/libs/types/src/integrations.ts +++ b/services/libs/types/src/integrations.ts @@ -21,4 +21,5 @@ export interface IIntegration { platform: string status: IntegrationState settings: unknown + token: string | null } From 864bf41af2dfb446e0abf203399a331fb6653e2e Mon Sep 17 00:00:00 2001 From: Igor Kotua <36304232+garrrikkotua@users.noreply.github.com> Date: Thu, 6 Jul 2023 18:13:47 +0300 Subject: [PATCH 06/30] Add post processing to the new framework (#1083) From d7bd8b6d482a5e1a9356f82e01b4f4986a56cb37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 7 Jul 2023 08:03:04 +0200 Subject: [PATCH 07/30] deduplication ids --- services/libs/sqs/src/instances/dataSinkWorker.ts | 1 + .../sqs/src/instances/integrationDataWorker.ts | 1 + .../sqs/src/instances/integrationRunWorker.ts | 6 +++++- .../sqs/src/instances/integrationStreamWorker.ts | 1 + services/libs/sqs/src/instances/nodejsWorker.ts | 15 ++++++++++++--- 5 files changed, 20 insertions(+), 4 deletions(-) diff --git a/services/libs/sqs/src/instances/dataSinkWorker.ts b/services/libs/sqs/src/instances/dataSinkWorker.ts index 30a3ae07d7..e8d3886381 100644 --- a/services/libs/sqs/src/instances/dataSinkWorker.ts +++ b/services/libs/sqs/src/instances/dataSinkWorker.ts @@ -13,6 +13,7 @@ export class DataSinkWorkerEmitter extends SqsQueueEmitter { await this.sendMessage( `results-${tenantId}-${platform}`, new ProcessIntegrationResultQueueMessage(resultId), + resultId, ) } } diff --git a/services/libs/sqs/src/instances/integrationDataWorker.ts b/services/libs/sqs/src/instances/integrationDataWorker.ts index 351bb34612..3a5ab2dfbd 100644 --- a/services/libs/sqs/src/instances/integrationDataWorker.ts +++ b/services/libs/sqs/src/instances/integrationDataWorker.ts @@ -13,6 +13,7 @@ export class IntegrationDataWorkerEmitter extends SqsQueueEmitter { await this.sendMessage( `data-${tenantId}-${platform}`, new ProcessStreamDataQueueMessage(dataId), + dataId, ) } } diff --git a/services/libs/sqs/src/instances/integrationRunWorker.ts b/services/libs/sqs/src/instances/integrationRunWorker.ts index 1ad9dcb716..38c62000c2 100644 --- a/services/libs/sqs/src/instances/integrationRunWorker.ts +++ b/services/libs/sqs/src/instances/integrationRunWorker.ts @@ -35,7 +35,11 @@ export class IntegrationRunWorkerEmitter extends SqsQueueEmitter { platform: string, runId: string, ): Promise { - await this.sendMessage(`${tenantId}-${platform}`, new GenerateRunStreamsQueueMessage(runId)) + await this.sendMessage( + `${tenantId}-${platform}`, + new GenerateRunStreamsQueueMessage(runId), + runId, + ) } public async streamProcessed(tenantId: string, platform: string, runId: string): Promise { diff --git a/services/libs/sqs/src/instances/integrationStreamWorker.ts b/services/libs/sqs/src/instances/integrationStreamWorker.ts index 128ca6fae2..0957a547f0 100644 --- a/services/libs/sqs/src/instances/integrationStreamWorker.ts +++ b/services/libs/sqs/src/instances/integrationStreamWorker.ts @@ -36,6 +36,7 @@ export class IntegrationStreamWorkerEmitter extends SqsQueueEmitter { await this.sendMessage( `streams-${tenantId}-${platform}`, new ProcessStreamQueueMessage(streamId), + streamId, ) } } diff --git a/services/libs/sqs/src/instances/nodejsWorker.ts b/services/libs/sqs/src/instances/nodejsWorker.ts index 0634b4e268..f65aab1690 100644 --- a/services/libs/sqs/src/instances/nodejsWorker.ts +++ b/services/libs/sqs/src/instances/nodejsWorker.ts @@ -13,14 +13,18 @@ export class NodejsWorkerEmitter extends SqsQueueEmitter { super(client, NODEJS_WORKER_QUEUE_SETTINGS, parentLog) } - public override sendMessage(groupId: string, message: IQueueMessage): Promise { + public override sendMessage( + groupId: string, + message: IQueueMessage, + deduplicationId: string, + ): Promise { this.log.info( { messageType: message.type, }, 'Sending nodejs-worker sqs message!', ) - return super.sendMessage(groupId, message) + return super.sendMessage(groupId, message, deduplicationId) } public async processAutomationForNewActivity( @@ -31,10 +35,15 @@ export class NodejsWorkerEmitter extends SqsQueueEmitter { await this.sendMessage( tenantId, new NewActivityAutomationQueueMessage(tenantId, activityId, segmentId), + `${activityId}--${segmentId}`, ) } public async processAutomationForNewMember(tenantId: string, memberId: string): Promise { - await this.sendMessage(tenantId, new NewMemberAutomationQueueMessage(tenantId, memberId)) + await this.sendMessage( + tenantId, + new NewMemberAutomationQueueMessage(tenantId, memberId), + memberId, + ) } } From 6814c151dedd234654256273cc6a2fa3053a410c Mon Sep 17 00:00:00 2001 From: Mish Savelyev <1564970+sausage-todd@users.noreply.github.com> Date: Fri, 7 Jul 2023 19:50:12 +0200 Subject: [PATCH 08/30] Add `activitives.platform` to some cubejs pre-aggregations (#1076) --- backend/src/cubejs/schema/Members.js | 59 +++++++++++++++++++--- backend/src/cubejs/schema/Organizations.js | 16 +++++- 2 files changed, 65 insertions(+), 10 deletions(-) diff --git a/backend/src/cubejs/schema/Members.js b/backend/src/cubejs/schema/Members.js index d9e7bf567a..920cce0798 100644 --- a/backend/src/cubejs/schema/Members.js +++ b/backend/src/cubejs/schema/Members.js @@ -28,6 +28,7 @@ cube(`Members`, { Members.isBot, Members.isOrganization, Segments.id, + Activities.platform, ], timeDimension: Members.joinedAt, granularity: `day`, @@ -36,17 +37,54 @@ cube(`Members`, { }, }, - ActiveMembers: { + MembersByJoinedAtPure: { measures: [Members.count], dimensions: [ Members.score, Members.location, Members.tenantId, + Members.isTeamMember, + Members.isBot, + Members.isOrganization, + Segments.id, + ], + timeDimension: Members.joinedAt, + granularity: `day`, + refreshKey: { + every: `10 minute`, + }, + }, + + MembersByJoinedAtTags: { + measures: [Members.count], + dimensions: [ + Members.score, + Members.location, + Members.tenantId, + Members.isTeamMember, + Members.isBot, + Members.isOrganization, + Segments.id, Tags.name, + ], + timeDimension: Members.joinedAt, + granularity: `day`, + refreshKey: { + every: `10 minute`, + }, + }, + + MembersByJoinedAtPlatform: { + measures: [Members.count], + dimensions: [ + Members.score, + Members.location, + Members.tenantId, Members.isTeamMember, Members.isBot, Members.isOrganization, Segments.id, + Activities.platform, ], timeDimension: Members.joinedAt, granularity: `day`, @@ -55,9 +93,11 @@ cube(`Members`, { }, }, - MembersActivities: { + MembersByActivityPure: { measures: [Members.count], dimensions: [ + Members.score, + Members.location, Members.tenantId, Members.isTeamMember, Members.isBot, @@ -71,16 +111,17 @@ cube(`Members`, { }, }, - MActivitiesDupDimensions: { + MembersByActivityPlatform: { measures: [Members.count], dimensions: [ + Members.score, + Members.location, Members.tenantId, - Activities.platform, - Activities.type, Members.isTeamMember, Members.isBot, Members.isOrganization, Segments.id, + Activities.platform, ], timeDimension: Activities.date, granularity: `day`, @@ -89,17 +130,19 @@ cube(`Members`, { }, }, - MembersTags: { + MembersByActivityActivityType: { measures: [Members.count], dimensions: [ + Members.score, + Members.location, Members.tenantId, - Tags.name, Members.isTeamMember, Members.isBot, Members.isOrganization, Segments.id, + Activities.type, ], - timeDimension: Members.joinedAt, + timeDimension: Activities.date, granularity: `day`, refreshKey: { every: `10 minute`, diff --git a/backend/src/cubejs/schema/Organizations.js b/backend/src/cubejs/schema/Organizations.js index 1bffe9ba73..c49387734f 100644 --- a/backend/src/cubejs/schema/Organizations.js +++ b/backend/src/cubejs/schema/Organizations.js @@ -5,13 +5,25 @@ cube(`Organizations`, { preAggregations: { newOrganizations: { measures: [Organizations.count], - dimensions: [Organizations.tenantId, Members.isTeamMember, Members.isBot, Segments.id], + dimensions: [ + Organizations.tenantId, + Members.isTeamMember, + Members.isBot, + Segments.id, + Activities.platform, + ], timeDimension: Organizations.joinedAt, granularity: `day`, }, activeOrganizations: { measures: [Organizations.count], - dimensions: [Organizations.tenantId, Members.isTeamMember, Members.isBot, Segments.id], + dimensions: [ + Organizations.tenantId, + Members.isTeamMember, + Members.isBot, + Segments.id, + Activities.platform, + ], timeDimension: Activities.date, granularity: `day`, }, From 1eb654f26ae5aecd134f8bf40b3a0e5867bba670 Mon Sep 17 00:00:00 2001 From: Yeganathan S Date: Fri, 7 Jul 2023 23:36:41 +0530 Subject: [PATCH 09/30] Fill `jobTitle` from member enrichment (#1081) --- .../services/premium/enrichment/memberEnrichmentService.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/backend/src/services/premium/enrichment/memberEnrichmentService.ts b/backend/src/services/premium/enrichment/memberEnrichmentService.ts index 1a41792cc2..9bc9f71845 100644 --- a/backend/src/services/premium/enrichment/memberEnrichmentService.ts +++ b/backend/src/services/premium/enrichment/memberEnrichmentService.ts @@ -300,6 +300,12 @@ export default class MemberEnrichmentService extends LoggerBase { organization.location = organizationsByWorkExperience[0].location organization.linkedin = organizationsByWorkExperience[0].companyLinkedInUrl organization.url = organizationsByWorkExperience[0].companyUrl + + // fetch jobTitle from most recent work experience + member.attributes.jobTitle = { + custom: organizationsByWorkExperience[0].title, + default: organizationsByWorkExperience[0].title, + } } } From 87daee1f7844892ab28a3aec78cd6823e185beb3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Mon, 10 Jul 2023 08:45:46 +0200 Subject: [PATCH 10/30] remove results after processing --- .../data_sink_worker/src/repo/dataSink.repo.ts | 18 ++++-------------- .../src/service/dataSink.service.ts | 2 +- 2 files changed, 5 insertions(+), 15 deletions(-) diff --git a/services/apps/data_sink_worker/src/repo/dataSink.repo.ts b/services/apps/data_sink_worker/src/repo/dataSink.repo.ts index 8ffecc0c92..8098dd72f9 100644 --- a/services/apps/data_sink_worker/src/repo/dataSink.repo.ts +++ b/services/apps/data_sink_worker/src/repo/dataSink.repo.ts @@ -65,20 +65,10 @@ export default class DataSinkRepository extends RepositoryBase { - const result = await this.db().result( - `update integration.results - set state = $(state), - "processedAt" = now(), - "updatedAt" = now() - where id = $(resultId)`, - { - resultId, - state: IntegrationResultState.PROCESSED, - }, - ) - - this.checkUpdateRowCount(result.rowCount, 1) + public async deleteResult(resultId: string): Promise { + await this.db().none(`delete from integration.results where id = $(resultId)`, { + resultId, + }) } public async touchRun(runId: string): Promise { diff --git a/services/apps/data_sink_worker/src/service/dataSink.service.ts b/services/apps/data_sink_worker/src/service/dataSink.service.ts index 0e8e94b262..588345b03b 100644 --- a/services/apps/data_sink_worker/src/service/dataSink.service.ts +++ b/services/apps/data_sink_worker/src/service/dataSink.service.ts @@ -100,7 +100,7 @@ export default class DataSinkService extends LoggerBase { throw new Error(`Unknown result type: ${data.type}`) } } - await this.repo.markResultProcessed(resultId) + await this.repo.deleteResult(resultId) } catch (err) { this.log.error(err, 'Error processing result.') await this.triggerResultError( From ae041167a58b3f5163170b62029b77f94d6cac81 Mon Sep 17 00:00:00 2001 From: Yeganathan S Date: Mon, 10 Jul 2023 14:01:09 +0530 Subject: [PATCH 11/30] Assign members to organization based on Email (#1086) --- backend/src/services/memberService.ts | 21 +++++++++++++++++++++ backend/src/services/organizationService.ts | 4 ++++ 2 files changed, 25 insertions(+) diff --git a/backend/src/services/memberService.ts b/backend/src/services/memberService.ts index d23348d6a4..14fa366f51 100644 --- a/backend/src/services/memberService.ts +++ b/backend/src/services/memberService.ts @@ -327,6 +327,27 @@ export default class MemberService extends LoggerBase { organizationsIds.push(organizationRecord.id) } } + + // Auto assign member to organization if email domain matches + if (data.emails) { + const emailDomains = new Set() + + // Collect unique domains + for (const email of data.emails) { + const domain = email.split('@')[1] + emailDomains.add(domain) + } + + // Fetch organization ids for these domains + const organizationService = new OrganizationService(this.options) + for (const domain of emailDomains) { + const organizationRecord = await organizationService.findByUrl(domain) + if (organizationRecord) { + organizationsIds.push(organizationRecord.id) + } + } + } + // Remove dups data.organizations = [...new Set(organizationsIds)] } diff --git a/backend/src/services/organizationService.ts b/backend/src/services/organizationService.ts index 05864a88c3..ade9b550c2 100644 --- a/backend/src/services/organizationService.ts +++ b/backend/src/services/organizationService.ts @@ -192,6 +192,10 @@ export default class OrganizationService extends LoggerBase { return OrganizationRepository.findAndCountAll(args, this.options) } + async findByUrl(url) { + return OrganizationRepository.findByUrl(url, this.options) + } + async query(data) { const advancedFilter = data.filter const orderBy = data.orderBy From e4d3bfd856f006e735fef5e8d1cd78f9238c5cbd Mon Sep 17 00:00:00 2001 From: Dhruwang Jariwala <67850763+Dhruwang@users.noreply.github.com> Date: Mon, 10 Jul 2023 15:11:37 +0530 Subject: [PATCH 12/30] Fix : Eagle Eye onboarding wizard redirects back to first step (#1014) Co-authored-by: Joana Maia --- frontend/src/modules/layout/components/layout.vue | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/frontend/src/modules/layout/components/layout.vue b/frontend/src/modules/layout/components/layout.vue index 3e3dec8d7f..21c5d8a631 100644 --- a/frontend/src/modules/layout/components/layout.vue +++ b/frontend/src/modules/layout/components/layout.vue @@ -1,7 +1,7 @@