diff --git a/packages/twenty-server/src/modules/messaging/common/services/messaging-fetch-by-batch.service.ts b/packages/twenty-server/src/modules/messaging/common/services/messaging-fetch-by-batch.service.ts index 354ab92659f0..0af4b578e855 100644 --- a/packages/twenty-server/src/modules/messaging/common/services/messaging-fetch-by-batch.service.ts +++ b/packages/twenty-server/src/modules/messaging/common/services/messaging-fetch-by-batch.service.ts @@ -5,25 +5,31 @@ import { AxiosResponse } from 'axios'; import { GmailMessageParsedResponse } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message-parsed-response'; import { BatchQueries } from 'src/modules/messaging/message-import-manager/types/batch-queries'; +import { createQueriesFromMessageIds } from 'src/modules/messaging/message-import-manager/utils/create-queries-from-message-ids.util'; @Injectable() export class MessagingFetchByBatchesService { constructor(private readonly httpService: HttpService) {} async fetchAllByBatches( - queries: BatchQueries, + messageIds: string[], accessToken: string, boundary: string, - ): Promise[]> { + ): Promise<{ + messageIdsByBatch: string[][]; + batchResponses: AxiosResponse[]; + }> { const batchLimit = 50; let batchOffset = 0; let batchResponses: AxiosResponse[] = []; - while (batchOffset < queries.length) { + const messageIdsByBatch: string[][] = []; + + while (batchOffset < messageIds.length) { const batchResponse = await this.fetchBatch( - queries, + messageIds, accessToken, batchOffset, batchLimit, @@ -32,19 +38,25 @@ export class MessagingFetchByBatchesService { batchResponses = batchResponses.concat(batchResponse); + messageIdsByBatch.push( + messageIds.slice(batchOffset, batchOffset + batchLimit), + ); + batchOffset += batchLimit; } - return batchResponses; + return { messageIdsByBatch, batchResponses }; } async fetchBatch( - queries: BatchQueries, + messageIds: string[], accessToken: string, batchOffset: number, batchLimit: number, boundary: string, ): Promise> { + const queries = createQueriesFromMessageIds(messageIds); + const limitedQueries = queries.slice(batchOffset, batchOffset + batchLimit); const response = await this.httpService.axiosRef.post( diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/commands/messaging-single-message-import.command.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/commands/messaging-single-message-import.command.ts new file mode 100644 index 000000000000..c55465f7dedc --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/commands/messaging-single-message-import.command.ts @@ -0,0 +1,69 @@ +import { Command, CommandRunner, Option } from 'nest-commander'; + +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { + MessagingAddSingleMessageToCacheForImportJob, + MessagingAddSingleMessageToCacheForImportJobData, +} from 'src/modules/messaging/message-import-manager/jobs/messaging-add-single-message-to-cache-for-import.job'; + +type MessagingSingleMessageImportCommandOptions = { + messageExternalId: string; + messageChannelId: string; + workspaceId: string; +}; + +@Command({ + name: 'messaging:single-message-import', + description: 'Enqueue a job to schedule the import of a single message', +}) +export class MessagingSingleMessageImportCommand extends CommandRunner { + constructor( + @InjectMessageQueue(MessageQueue.messagingQueue) + private readonly messageQueueService: MessageQueueService, + ) { + super(); + } + + async run( + _passedParam: string[], + options: MessagingSingleMessageImportCommandOptions, + ): Promise { + await this.messageQueueService.add( + MessagingAddSingleMessageToCacheForImportJob.name, + { + messageExternalId: options.messageExternalId, + messageChannelId: options.messageChannelId, + workspaceId: options.workspaceId, + }, + ); + } + + @Option({ + flags: '-m, --message-external-id [message_external_id]', + description: 'Message external ID', + required: true, + }) + parseMessageId(value: string): string { + return value; + } + + @Option({ + flags: '-M, --message-channel-id [message_channel_id]', + description: 'Message channel ID', + required: true, + }) + parseMessageChannelId(value: string): string { + return value; + } + + @Option({ + flags: '-w, --workspace-id [workspace_id]', + description: 'Workspace ID', + required: true, + }) + parseWorkspaceId(value: string): string { + return value; + } +} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-messages-by-batches.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-messages-by-batches.service.ts index 1e0c0c6e979c..f6cbb064df28 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-messages-by-batches.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-messages-by-batches.service.ts @@ -7,7 +7,6 @@ import { gmail_v1 } from 'googleapis'; import { assert, assertNotNull } from 'src/utils/assert'; import { GmailMessage } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message'; -import { MessageQuery } from 'src/modules/messaging/message-import-manager/types/message-or-thread-query'; import { formatAddressObjectAsParticipants } from 'src/modules/messaging/message-import-manager/utils/format-address-object-as-participants.util'; import { MessagingFetchByBatchesService } from 'src/modules/messaging/common/services/messaging-fetch-by-batch.service'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; @@ -27,7 +26,7 @@ export class MessagingGmailFetchMessagesByBatchesService { ) {} async fetchAllMessages( - queries: MessageQuery[], + messageIds: string[], connectedAccountId: string, workspaceId: string, ): Promise { @@ -46,22 +45,24 @@ export class MessagingGmailFetchMessagesByBatchesService { const accessToken = connectedAccount.accessToken; - const batchResponses = await this.fetchByBatchesService.fetchAllByBatches( - queries, - accessToken, - 'batch_gmail_messages', - ); + const { messageIdsByBatch, batchResponses } = + await this.fetchByBatchesService.fetchAllByBatches( + messageIds, + accessToken, + 'batch_gmail_messages', + ); let endTime = Date.now(); this.logger.log( `Messaging import for workspace ${workspaceId} and account ${connectedAccountId} fetching ${ - queries.length + messageIds.length } messages in ${endTime - startTime}ms`, ); startTime = Date.now(); const formattedResponse = this.formatBatchResponsesAsGmailMessages( + messageIdsByBatch, batchResponses, workspaceId, connectedAccountId, @@ -71,7 +72,7 @@ export class MessagingGmailFetchMessagesByBatchesService { this.logger.log( `Messaging import for workspace ${workspaceId} and account ${connectedAccountId} formatting ${ - queries.length + messageIds.length } messages in ${endTime - startTime}ms`, ); @@ -79,6 +80,7 @@ export class MessagingGmailFetchMessagesByBatchesService { } private formatBatchResponseAsGmailMessage( + messageIds: string[], responseCollection: AxiosResponse, workspaceId: string, connectedAccountId: string, @@ -90,94 +92,92 @@ export class MessagingGmailFetchMessagesByBatchesService { return str.replace(/\0/g, ''); }; - const formattedResponse = parsedResponses.map( - (response): GmailMessage | null => { - if ('error' in response) { - if (response.error.code === 404) { - return null; - } - - throw response.error; - } - - const { - historyId, - id, - threadId, - internalDate, - subject, - from, - to, - cc, - bcc, - headerMessageId, - text, - attachments, - deliveredTo, - } = this.parseGmailMessage(response); - - if (!from) { - this.logger.log( - `From value is missing while importing message #${id} in workspace ${workspaceId} and account ${connectedAccountId}`, - ); - - return null; - } - - if (!to && !deliveredTo && !bcc && !cc) { - this.logger.log( - `To, Delivered-To, Bcc or Cc value is missing while importing message #${id} in workspace ${workspaceId} and account ${connectedAccountId}`, - ); - - return null; - } - - if (!headerMessageId) { - this.logger.log( - `Message-ID is missing while importing message #${id} in workspace ${workspaceId} and account ${connectedAccountId}`, - ); - - return null; - } - - if (!threadId) { - this.logger.log( - `Thread Id is missing while importing message #${id} in workspace ${workspaceId} and account ${connectedAccountId}`, - ); - + const formattedResponse = parsedResponses.map((response, index) => { + if ('error' in response) { + if (response.error.code === 404) { return null; } - const participants = [ - ...formatAddressObjectAsParticipants(from, 'from'), - ...formatAddressObjectAsParticipants(to ?? deliveredTo, 'to'), - ...formatAddressObjectAsParticipants(cc, 'cc'), - ...formatAddressObjectAsParticipants(bcc, 'bcc'), - ]; - - let textWithoutReplyQuotations = text; - - if (text) { - textWithoutReplyQuotations = planer.extractFrom(text, 'text/plain'); - } - - const messageFromGmail: GmailMessage = { - historyId, - externalId: id, - headerMessageId, - subject: subject || '', - messageThreadExternalId: threadId, - internalDate, - fromHandle: from[0].address || '', - fromDisplayName: from[0].name || '', - participants, - text: sanitizeString(textWithoutReplyQuotations || ''), - attachments, - }; - - return messageFromGmail; - }, - ); + throw { ...response.error, messageId: messageIds[index] }; + } + + const { + historyId, + id, + threadId, + internalDate, + subject, + from, + to, + cc, + bcc, + headerMessageId, + text, + attachments, + deliveredTo, + } = this.parseGmailMessage(response); + + if (!from) { + this.logger.log( + `From value is missing while importing message #${id} in workspace ${workspaceId} and account ${connectedAccountId}`, + ); + + return null; + } + + if (!to && !deliveredTo && !bcc && !cc) { + this.logger.log( + `To, Delivered-To, Bcc or Cc value is missing while importing message #${id} in workspace ${workspaceId} and account ${connectedAccountId}`, + ); + + return null; + } + + if (!headerMessageId) { + this.logger.log( + `Message-ID is missing while importing message #${id} in workspace ${workspaceId} and account ${connectedAccountId}`, + ); + + return null; + } + + if (!threadId) { + this.logger.log( + `Thread Id is missing while importing message #${id} in workspace ${workspaceId} and account ${connectedAccountId}`, + ); + + return null; + } + + const participants = [ + ...formatAddressObjectAsParticipants(from, 'from'), + ...formatAddressObjectAsParticipants(to ?? deliveredTo, 'to'), + ...formatAddressObjectAsParticipants(cc, 'cc'), + ...formatAddressObjectAsParticipants(bcc, 'bcc'), + ]; + + let textWithoutReplyQuotations = text; + + if (text) { + textWithoutReplyQuotations = planer.extractFrom(text, 'text/plain'); + } + + const messageFromGmail: GmailMessage = { + historyId, + externalId: id, + headerMessageId, + subject: subject || '', + messageThreadExternalId: threadId, + internalDate, + fromHandle: from[0].address || '', + fromDisplayName: from[0].name || '', + participants, + text: sanitizeString(textWithoutReplyQuotations || ''), + attachments, + }; + + return messageFromGmail; + }); const filteredMessages = formattedResponse.filter((message) => assertNotNull(message), @@ -187,12 +187,14 @@ export class MessagingGmailFetchMessagesByBatchesService { } private formatBatchResponsesAsGmailMessages( + messageIdsByBatch: string[][], batchResponses: AxiosResponse[], workspaceId: string, connectedAccountId: string, ): GmailMessage[] { - const messageBatches = batchResponses.map((response) => { + const messageBatches = batchResponses.map((response, index) => { return this.formatBatchResponseAsGmailMessage( + messageIdsByBatch[index], response, workspaceId, connectedAccountId, diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-messages-import.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-messages-import.service.ts index bf3dec6cddad..083ff2d522af 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-messages-import.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-messages-import.service.ts @@ -14,7 +14,6 @@ import { MessageChannelWorkspaceEntity, MessageChannelSyncStage, } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; -import { createQueriesFromMessageIds } from 'src/modules/messaging/message-import-manager/utils/create-queries-from-message-ids.util'; import { filterEmails } from 'src/modules/messaging/message-import-manager/utils/filter-emails.util'; import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; import { MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE } from 'src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-users-messages-get-batch-size.constant'; @@ -95,12 +94,10 @@ export class MessagingGmailMessagesImportService { ); } - const messageQueries = createQueriesFromMessageIds(messageIdsToFetch); - try { const allMessages = await this.fetchMessagesByBatchesService.fetchAllMessages( - messageQueries, + messageIdsToFetch, connectedAccount.id, workspaceId, ); @@ -153,7 +150,9 @@ export class MessagingGmailMessagesImportService { ); } catch (error) { this.logger.log( - `Messaging import for workspace ${workspaceId} and connected account ${ + `Messaging import for messageId ${ + error.messageId + }, workspace ${workspaceId} and connected account ${ connectedAccount.id } failed with error: ${JSON.stringify(error)}`, ); diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-add-single-message-to-cache-for-import.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-add-single-message-to-cache-for-import.job.ts new file mode 100644 index 000000000000..d81bba9cd096 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-add-single-message-to-cache-for-import.job.ts @@ -0,0 +1,32 @@ +import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service'; +import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator'; +import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; + +export type MessagingAddSingleMessageToCacheForImportJobData = { + messageExternalId: string; + messageChannelId: string; + workspaceId: string; +}; + +@Processor(MessageQueue.messagingQueue) +export class MessagingAddSingleMessageToCacheForImportJob { + constructor( + @InjectCacheStorage(CacheStorageNamespace.Messaging) + private readonly cacheStorage: CacheStorageService, + ) {} + + @Process(MessagingAddSingleMessageToCacheForImportJob.name) + async handle( + data: MessagingAddSingleMessageToCacheForImportJobData, + ): Promise { + const { messageExternalId, messageChannelId, workspaceId } = data; + + await this.cacheStorage.setAdd( + `messages-to-import:${workspaceId}:gmail:${messageChannelId}`, + [messageExternalId], + ); + } +} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts index 494edba8fb6d..38d0e8db58cb 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts @@ -4,11 +4,13 @@ import { TypeOrmModule } from '@nestjs/typeorm'; import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity'; import { MessagingCommonModule } from 'src/modules/messaging/common/messaging-common.module'; +import { MessagingSingleMessageImportCommand } from 'src/modules/messaging/message-import-manager/commands/messaging-single-message-import.command'; import { MessagingMessageListFetchCronCommand } from 'src/modules/messaging/message-import-manager/crons/commands/messaging-message-list-fetch.cron.command'; import { MessagingMessagesImportCronCommand } from 'src/modules/messaging/message-import-manager/crons/commands/messaging-messages-import.cron.command'; import { MessagingMessageListFetchCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job'; import { MessagingMessagesImportCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job'; import { MessagingGmailDriverModule } from 'src/modules/messaging/message-import-manager/drivers/gmail/messaging-gmail-driver.module'; +import { MessagingAddSingleMessageToCacheForImportJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-add-single-message-to-cache-for-import.job'; import { MessagingMessageListFetchJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job'; import { MessagingMessagesImportJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job'; @@ -22,10 +24,12 @@ import { MessagingMessagesImportJob } from 'src/modules/messaging/message-import providers: [ MessagingMessageListFetchCronCommand, MessagingMessagesImportCronCommand, + MessagingSingleMessageImportCommand, MessagingMessageListFetchJob, MessagingMessagesImportJob, MessagingMessageListFetchCronJob, MessagingMessagesImportCronJob, + MessagingAddSingleMessageToCacheForImportJob, ], exports: [], })