diff --git a/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-reimport-messages.job.ts b/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-reimport-messages.job.ts index c21ee8a2c..213508471 100644 --- a/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-reimport-messages.job.ts +++ b/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-reimport-messages.job.ts @@ -54,7 +54,7 @@ export class BlocklistReimportMessagesJob { }, }); - await this.messagingChannelSyncStatusService.resetAndScheduleFullMessageListFetch( + await this.messagingChannelSyncStatusService.resetAndScheduleMessageListFetch( messageChannels.map((messageChannel) => messageChannel.id), workspaceId, ); diff --git a/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts b/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts index a378502bf..07551f05b 100644 --- a/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts +++ b/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts @@ -27,7 +27,7 @@ export class MessageChannelSyncStatusService { private readonly metricsService: MetricsService, ) {} - public async scheduleFullMessageListFetch(messageChannelIds: string[]) { + public async scheduleMessageListFetch(messageChannelIds: string[]) { if (!messageChannelIds.length) { return; } @@ -42,21 +42,6 @@ export class MessageChannelSyncStatusService { }); } - public async schedulePartialMessageListFetch(messageChannelIds: string[]) { - if (!messageChannelIds.length) { - return; - } - - const messageChannelRepository = - await this.twentyORMManager.getRepository( - 'messageChannel', - ); - - await messageChannelRepository.update(messageChannelIds, { - syncStage: MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING, - }); - } - public async scheduleMessagesImport(messageChannelIds: string[]) { if (!messageChannelIds.length) { return; @@ -72,7 +57,7 @@ export class MessageChannelSyncStatusService { }); } - public async resetAndScheduleFullMessageListFetch( + public async resetAndScheduleMessageListFetch( messageChannelIds: string[], workspaceId: string, ) { @@ -97,7 +82,7 @@ export class MessageChannelSyncStatusService { throttleFailureCount: 0, }); - await this.scheduleFullMessageListFetch(messageChannelIds); + await this.scheduleMessageListFetch(messageChannelIds); } public async resetSyncStageStartedAt(messageChannelIds: string[]) { @@ -132,7 +117,7 @@ export class MessageChannelSyncStatusService { }); } - public async markAsCompletedAndSchedulePartialMessageListFetch( + public async markAsCompletedAndScheduleMessageListFetch( messageChannelIds: string[], ) { if (!messageChannelIds.length) { @@ -146,7 +131,7 @@ export class MessageChannelSyncStatusService { await messageChannelRepository.update(messageChannelIds, { syncStatus: MessageChannelSyncStatus.ACTIVE, - syncStage: MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING, + syncStage: MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING, throttleFailureCount: 0, syncStageStartedAt: null, syncedAt: new Date().toISOString(), diff --git a/packages/twenty-server/src/modules/messaging/common/standard-objects/message-channel.workspace-entity.ts b/packages/twenty-server/src/modules/messaging/common/standard-objects/message-channel.workspace-entity.ts index d2004971a..23c6f8cff 100644 --- a/packages/twenty-server/src/modules/messaging/common/standard-objects/message-channel.workspace-entity.ts +++ b/packages/twenty-server/src/modules/messaging/common/standard-objects/message-channel.workspace-entity.ts @@ -31,8 +31,8 @@ export enum MessageChannelSyncStatus { } export enum MessageChannelSyncStage { - FULL_MESSAGE_LIST_FETCH_PENDING = 'FULL_MESSAGE_LIST_FETCH_PENDING', - PARTIAL_MESSAGE_LIST_FETCH_PENDING = 'PARTIAL_MESSAGE_LIST_FETCH_PENDING', + FULL_MESSAGE_LIST_FETCH_PENDING = 'FULL_MESSAGE_LIST_FETCH_PENDING', // TODO: rename to MESSAGE_LIST_FETCH_PENDING + PARTIAL_MESSAGE_LIST_FETCH_PENDING = 'PARTIAL_MESSAGE_LIST_FETCH_PENDING', // TODO: to be removed, deprecated MESSAGE_LIST_FETCH_ONGOING = 'MESSAGE_LIST_FETCH_ONGOING', MESSAGES_IMPORT_PENDING = 'MESSAGES_IMPORT_PENDING', MESSAGES_IMPORT_ONGOING = 'MESSAGES_IMPORT_ONGOING', @@ -291,13 +291,13 @@ export class MessageChannelWorkspaceEntity extends BaseWorkspaceEntity { icon: 'IconStatusChange', options: [ { - value: MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING, + value: MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING, // TODO: Rename to MESSAGE_LIST_FETCH_PENDING label: 'Full messages list fetch pending', position: 0, color: 'blue', }, { - value: MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING, + value: MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING, // TODO: Deprecate label: 'Partial messages list fetch pending', position: 1, color: 'blue', diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts index 1e5306d0e..896f121b8 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts @@ -52,6 +52,7 @@ export class MessagingMessageListFetchCronJob { activeWorkspace.id, ); + // TODO: deprecate looking for FULL_MESSAGE_LIST_FETCH_PENDING as we introduce MESSAGE_LIST_FETCH_PENDING const messageChannels = await mainDataSource.query( `SELECT * FROM ${schemaName}."messageChannel" WHERE "isSyncEnabled" = true AND "syncStage" IN ('${MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING}', '${MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING}')`, ); diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/exceptions/message-network.exception.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/exceptions/message-network.exception.ts index 2a2ee92f6..aa2fef27f 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/exceptions/message-network.exception.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/exceptions/message-network.exception.ts @@ -4,4 +4,5 @@ export enum MessageNetworkExceptionCode { ECONNABORTED = 'ECONNABORTED', ETIMEDOUT = 'ETIMEDOUT', ERR_NETWORK = 'ERR_NETWORK', + EHOSTUNREACH = 'EHOSTUNREACH', } diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-excluded-categories.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-excluded-categories.ts index d9f46c48e..d795c7aa8 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-excluded-categories.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-excluded-categories.ts @@ -1,6 +1,8 @@ +import { GmailDefaultMessageCategory } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-default-message-category.type'; + export const MESSAGING_GMAIL_EXCLUDED_CATEGORIES = [ - 'promotions', - 'social', - 'forums', - 'updates', + GmailDefaultMessageCategory.promotions, + GmailDefaultMessageCategory.forums, + GmailDefaultMessageCategory.social, + GmailDefaultMessageCategory.updates, ]; diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service.spec.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service.spec.ts index dc015de1a..95e3cd8df 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service.spec.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service.spec.ts @@ -14,12 +14,13 @@ describe('GmailGetMessageListService', () => { const mockConnectedAccount: Pick< ConnectedAccountWorkspaceEntity, - 'provider' | 'refreshToken' | 'id' | 'handle' + 'provider' | 'refreshToken' | 'id' | 'handle' | 'connectionParameters' > = { id: 'connected-account-id', provider: ConnectedAccountProvider.GOOGLE, refreshToken: 'refresh-token', handle: 'test@gmail.com', + connectionParameters: {}, }; beforeEach(async () => { @@ -55,7 +56,7 @@ describe('GmailGetMessageListService', () => { gmailClientProvider = module.get(GmailClientProvider); }); - describe('getFullMessageList', () => { + describe('getMessageList', () => { it('should return 0 messageExternalIds when gmail returns 0 messages', async () => { const mockGmailClient = { users: { @@ -74,9 +75,13 @@ describe('GmailGetMessageListService', () => { mockGmailClient, ); - const result = await service.getFullMessageList(mockConnectedAccount); + const result = await service.getMessageLists({ + messageChannel: { syncCursor: '', id: 'my-id' }, + connectedAccount: mockConnectedAccount, + messageFolders: [], + }); - expect(result.messageExternalIds).toHaveLength(0); + expect(result[0].messageExternalIds).toHaveLength(0); expect(mockGmailClient.users.messages.list).toHaveBeenCalledTimes(1); }); @@ -121,9 +126,13 @@ describe('GmailGetMessageListService', () => { mockGmailClient, ); - const result = await service.getFullMessageList(mockConnectedAccount); + const result = await service.getMessageLists({ + messageChannel: { syncCursor: '', id: 'my-id' }, + connectedAccount: mockConnectedAccount, + messageFolders: [], + }); - expect(result.messageExternalIds).toHaveLength(5); + expect(result[0].messageExternalIds).toHaveLength(5); }); it('should return 3 messageExternalIds when gmail provides a nextpagetoken with 2 messages, then 1', async () => { @@ -168,9 +177,13 @@ describe('GmailGetMessageListService', () => { mockGmailClient, ); - const result = await service.getFullMessageList(mockConnectedAccount); + const result = await service.getMessageLists({ + messageChannel: { syncCursor: '', id: 'my-id' }, + connectedAccount: mockConnectedAccount, + messageFolders: [], + }); - expect(result.messageExternalIds).toHaveLength(3); + expect(result[0].messageExternalIds).toHaveLength(3); expect(mockGmailClient.users.messages.list).toHaveBeenCalledTimes(2); }); it('should go through while loop once when gmail provides a nextpagetoken but 0 messages - should never happen IRL', async () => { @@ -196,9 +209,13 @@ describe('GmailGetMessageListService', () => { mockGmailClient, ); - const result = await service.getFullMessageList(mockConnectedAccount); + const result = await service.getMessageLists({ + messageChannel: { syncCursor: '', id: 'my-id' }, + connectedAccount: mockConnectedAccount, + messageFolders: [], + }); - expect(result.messageExternalIds).toHaveLength(0); + expect(result[0].messageExternalIds).toHaveLength(0); expect(mockGmailClient.users.messages.list).toHaveBeenCalledTimes(1); }); }); diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service.ts index 8c3149b68..17aa6f686 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service.ts @@ -1,8 +1,11 @@ import { Injectable } from '@nestjs/common'; +import { isNonEmptyString } from '@sniptt/guards'; import { gmail_v1 as gmailV1 } from 'googleapis'; +import { isDefined } from 'twenty-shared/utils'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; +import { MessageFolderWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-folder.workspace-entity'; import { MessageImportDriverException, MessageImportDriverExceptionCode, @@ -14,10 +17,9 @@ import { GmailGetHistoryService } from 'src/modules/messaging/message-import-man import { GmailHandleErrorService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-handle-error.service'; import { computeGmailCategoryExcludeSearchFilter } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-gmail-category-excude-search-filter.util'; import { computeGmailCategoryLabelId } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-gmail-category-label-id.util'; -import { - GetFullMessageListResponse, - GetPartialMessageListResponse, -} from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service'; +import { mapGmailDefaultFolderToCategoryOrUndefined } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/map-gmail-default-folder-to-category'; +import { GetMessageListsArgs } from 'src/modules/messaging/message-import-manager/types/get-message-lists-args.type'; +import { GetMessageListsResponse } from 'src/modules/messaging/message-import-manager/types/get-message-lists-response.type'; import { assertNotNull } from 'src/utils/assert'; @Injectable() @@ -28,12 +30,13 @@ export class GmailGetMessageListService { private readonly gmailHandleErrorService: GmailHandleErrorService, ) {} - public async getFullMessageList( + private async getMessageListWithoutCursor( connectedAccount: Pick< ConnectedAccountWorkspaceEntity, 'provider' | 'refreshToken' | 'id' | 'handle' >, - ): Promise { + messageFolders: Pick[], + ): Promise { const gmailClient = await this.gmailClientProvider.getGmailClient(connectedAccount); @@ -41,6 +44,7 @@ export class GmailGetMessageListService { let hasMoreMessages = true; const messageExternalIds: string[] = []; + const excludedCategories = this.comptuteExcludedCategories(messageFolders); while (hasMoreMessages) { const messageList = await gmailClient.users.messages @@ -48,9 +52,7 @@ export class GmailGetMessageListService { userId: 'me', maxResults: MESSAGING_GMAIL_USERS_MESSAGES_LIST_MAX_RESULT, pageToken, - q: computeGmailCategoryExcludeSearchFilter( - MESSAGING_GMAIL_EXCLUDED_CATEGORIES, - ), + q: computeGmailCategoryExcludeSearchFilter(excludedCategories), }) .catch((error) => { this.gmailHandleErrorService.handleGmailMessageListFetchError(error); @@ -78,10 +80,15 @@ export class GmailGetMessageListService { } if (messageExternalIds.length === 0) { - return { - messageExternalIds, - nextSyncCursor: '', - }; + return [ + { + messageExternalIds, + nextSyncCursor: '', + previousSyncCursor: '', + messageExternalIdsToDelete: [], + folderId: undefined, + }, + ]; } const firstMessageExternalId = messageExternalIds[0]; @@ -106,28 +113,42 @@ export class GmailGetMessageListService { ); } - return { messageExternalIds, nextSyncCursor }; + return [ + { + messageExternalIds, + nextSyncCursor, + previousSyncCursor: '', + messageExternalIdsToDelete: [], + folderId: undefined, + }, + ]; } - public async getPartialMessageList( - connectedAccount: Pick< - ConnectedAccountWorkspaceEntity, - 'provider' | 'refreshToken' | 'id' - >, - syncCursor: string, - ): Promise { + public async getMessageLists({ + messageChannel, + connectedAccount, + messageFolders, + }: GetMessageListsArgs): Promise { const gmailClient = await this.gmailClientProvider.getGmailClient(connectedAccount); + if (!isNonEmptyString(messageChannel.syncCursor)) { + return this.getMessageListWithoutCursor(connectedAccount, messageFolders); + } + const { history, historyId: nextSyncCursor } = - await this.gmailGetHistoryService.getHistory(gmailClient, syncCursor); + await this.gmailGetHistoryService.getHistory( + gmailClient, + messageChannel.syncCursor, + ); const { messagesAdded, messagesDeleted } = await this.gmailGetHistoryService.getMessageIdsFromHistory(history); const messageIdsToFilter = await this.getEmailIdsFromExcludedCategories( gmailClient, - syncCursor, + messageChannel.syncCursor, + messageFolders, ); const messagesAddedFiltered = messagesAdded.filter( @@ -141,21 +162,42 @@ export class GmailGetMessageListService { ); } - return { - messageExternalIds: messagesAddedFiltered, - messageExternalIdsToDelete: messagesDeleted, - previousSyncCursor: syncCursor, - nextSyncCursor, - }; + return [ + { + messageExternalIds: messagesAddedFiltered, + messageExternalIdsToDelete: messagesDeleted, + previousSyncCursor: messageChannel.syncCursor, + nextSyncCursor, + folderId: undefined, + }, + ]; + } + + private comptuteExcludedCategories( + messageFolders: Pick[], + ) { + const includedDefaultCategories = messageFolders + .map((messageFolder) => + mapGmailDefaultFolderToCategoryOrUndefined(messageFolder.name), + ) + .filter(isDefined); + + return MESSAGING_GMAIL_EXCLUDED_CATEGORIES.filter( + (excludedCategory) => + !includedDefaultCategories.includes(excludedCategory), + ); } private async getEmailIdsFromExcludedCategories( gmailClient: gmailV1.Gmail, lastSyncHistoryId: string, + messageFolders: Pick[], ): Promise { const emailIds: string[] = []; - for (const category of MESSAGING_GMAIL_EXCLUDED_CATEGORIES) { + const excludedCategories = this.comptuteExcludedCategories(messageFolders); + + for (const category of excludedCategories) { const { history } = await this.gmailGetHistoryService.getHistory( gmailClient, lastSyncHistoryId, diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-default-message-category.type.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-default-message-category.type.ts new file mode 100644 index 000000000..aaa90891b --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-default-message-category.type.ts @@ -0,0 +1,6 @@ +export enum GmailDefaultMessageCategory { + promotions = 'promotions', + social = 'social', + forums = 'forums', + updates = 'updates', +} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-default-message-folder.type.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-default-message-folder.type.ts new file mode 100644 index 000000000..6cd72fb55 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-default-message-folder.type.ts @@ -0,0 +1,6 @@ +export enum GmailDefaultMessageFolder { + CATEGORY_PROMOTIONS = 'CATEGORY_PROMOTIONS', + CATEGORY_SOCIAL = 'CATEGORY_SOCIAL', + CATEGORY_FORUMS = 'CATEGORY_FORUMS', + CATEGORY_UPDATES = 'CATEGORY_UPDATES', +} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/map-gmail-default-folder-to-category.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/map-gmail-default-folder-to-category.ts new file mode 100644 index 000000000..6f93cbb70 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/map-gmail-default-folder-to-category.ts @@ -0,0 +1,37 @@ +import { isDefined } from 'twenty-shared/utils'; + +import { GmailDefaultMessageCategory } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-default-message-category.type'; +import { GmailDefaultMessageFolder } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-default-message-folder.type'; + +const DEFAULT_FOLDER_NAME_TO_CATEGORY_MAPPING = [ + { + folderName: GmailDefaultMessageFolder.CATEGORY_FORUMS, + category: GmailDefaultMessageCategory.forums, + }, + { + folderName: GmailDefaultMessageFolder.CATEGORY_PROMOTIONS, + category: GmailDefaultMessageCategory.promotions, + }, + { + folderName: GmailDefaultMessageFolder.CATEGORY_SOCIAL, + category: GmailDefaultMessageCategory.social, + }, + { + folderName: GmailDefaultMessageFolder.CATEGORY_UPDATES, + category: GmailDefaultMessageCategory.updates, + }, +]; + +export const mapGmailDefaultFolderToCategoryOrUndefined = ( + messageFolderName: string, +) => { + const mapping = DEFAULT_FOLDER_NAME_TO_CATEGORY_MAPPING.find( + ({ folderName }) => folderName === messageFolderName, + ); + + if (!isDefined(mapping)) { + return undefined; + } + + return mapping.category; +}; diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/parse-gaxios-error.util.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/parse-gaxios-error.util.ts index 2050d65c7..9c5986d36 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/parse-gaxios-error.util.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/parse-gaxios-error.util.ts @@ -17,6 +17,7 @@ export const parseGaxiosError = ( case MessageNetworkExceptionCode.ECONNABORTED: case MessageNetworkExceptionCode.ETIMEDOUT: case MessageNetworkExceptionCode.ERR_NETWORK: + case MessageNetworkExceptionCode.EHOSTUNREACH: return new MessageImportDriverException( error.message, MessageImportDriverExceptionCode.TEMPORARY_ERROR, diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/imap/services/imap-get-message-list.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/imap/services/imap-get-message-list.service.ts index f49ead617..491a45ae0 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/imap/services/imap-get-message-list.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/imap/services/imap-get-message-list.service.ts @@ -2,11 +2,11 @@ import { Injectable, Logger } from '@nestjs/common'; import { ImapFlow } from 'imapflow'; -import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { ImapClientProvider } from 'src/modules/messaging/message-import-manager/drivers/imap/providers/imap-client.provider'; import { ImapHandleErrorService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-handle-error.service'; import { findSentMailbox } from 'src/modules/messaging/message-import-manager/drivers/imap/utils/find-sent-mailbox.util'; -import { GetFullMessageListResponse } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service'; +import { GetMessageListsArgs } from 'src/modules/messaging/message-import-manager/types/get-message-lists-args.type'; +import { GetMessageListsResponse } from 'src/modules/messaging/message-import-manager/types/get-message-lists-response.type'; @Injectable() export class ImapGetMessageListService { @@ -17,74 +17,10 @@ export class ImapGetMessageListService { private readonly imapHandleErrorService: ImapHandleErrorService, ) {} - async getFullMessageList( - connectedAccount: Pick< - ConnectedAccountWorkspaceEntity, - 'id' | 'provider' | 'connectionParameters' | 'handle' - >, - ): Promise { - try { - const client = await this.imapClientProvider.getClient(connectedAccount); - - const mailboxes = ['INBOX']; - - const sentFolder = await findSentMailbox(client, this.logger); - - if (sentFolder) { - mailboxes.push(sentFolder); - } - - let allMessages: { id: string; date: string }[] = []; - - for (const mailbox of mailboxes) { - try { - const messages = await this.getMessagesFromMailbox(client, mailbox); - - allMessages = [...allMessages, ...messages]; - this.logger.log( - `Fetched ${messages.length} messages from ${mailbox}`, - ); - } catch (error) { - this.logger.warn( - `Error fetching from mailbox ${mailbox}: ${error.message}. Continuing with other mailboxes.`, - ); - } - } - - allMessages.sort( - (a, b) => new Date(b.date).getTime() - new Date(a.date).getTime(), - ); - - const messageExternalIds = allMessages.map((message) => message.id); - - const nextSyncCursor = - allMessages.length > 0 ? allMessages[allMessages.length - 1].date : ''; - - return { - messageExternalIds, - nextSyncCursor, - }; - } catch (error) { - this.logger.error( - `Error getting message list: ${error.message}`, - error.stack, - ); - - this.imapHandleErrorService.handleImapMessageListFetchError(error); - - return { messageExternalIds: [], nextSyncCursor: '' }; - } finally { - await this.imapClientProvider.closeClient(connectedAccount.id); - } - } - - async getPartialMessageList( - connectedAccount: Pick< - ConnectedAccountWorkspaceEntity, - 'id' | 'provider' | 'connectionParameters' | 'handle' - >, - syncCursor?: string, - ): Promise<{ messageExternalIds: string[]; nextSyncCursor: string }> { + async getMessageLists({ + messageChannel, + connectedAccount, + }: GetMessageListsArgs): Promise { try { const client = await this.imapClientProvider.getClient(connectedAccount); @@ -103,7 +39,7 @@ export class ImapGetMessageListService { const messages = await this.getMessagesFromMailbox( client, mailbox, - syncCursor, + messageChannel.syncCursor, ); allMessages = [...allMessages, ...messages]; @@ -126,12 +62,17 @@ export class ImapGetMessageListService { const nextSyncCursor = allMessages.length > 0 ? allMessages[allMessages.length - 1].date - : syncCursor || ''; + : messageChannel.syncCursor || ''; - return { - messageExternalIds, - nextSyncCursor, - }; + return [ + { + messageExternalIds, + nextSyncCursor, + previousSyncCursor: messageChannel.syncCursor, + messageExternalIdsToDelete: [], + folderId: undefined, + }, + ]; } catch (error) { this.logger.error( `Error getting message list: ${error.message}`, @@ -140,7 +81,15 @@ export class ImapGetMessageListService { this.imapHandleErrorService.handleImapMessageListFetchError(error); - return { messageExternalIds: [], nextSyncCursor: syncCursor || '' }; + return [ + { + messageExternalIds: [], + nextSyncCursor: messageChannel.syncCursor || '', + previousSyncCursor: messageChannel.syncCursor, + messageExternalIdsToDelete: [], + folderId: undefined, + }, + ]; } finally { await this.imapClientProvider.closeClient(connectedAccount.id); } diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.dev.spec.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.dev.spec.ts index 836586d57..f5cf39555 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.dev.spec.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.dev.spec.ts @@ -5,6 +5,7 @@ import { ConnectedAccountProvider } from 'twenty-shared/types'; import { TwentyConfigModule } from 'src/engine/core-modules/twenty-config/twenty-config.module'; import { MicrosoftOAuth2ClientManagerService } from 'src/modules/connected-account/oauth2-client-manager/drivers/microsoft/microsoft-oauth2-client-manager.service'; +import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessageFolderWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-folder.workspace-entity'; import { microsoftGraphWithMessagesDeltaLink } from 'src/modules/messaging/message-import-manager/drivers/microsoft/mocks/microsoft-api-examples'; @@ -17,10 +18,23 @@ import { MicrosoftHandleErrorService } from './microsoft-handle-error.service'; // in case you have "Please provide a valid token" it may be because you need to pass the env varible to the .env.test file const refreshToken = 'replace-with-your-refresh-token'; const syncCursor = `replace-with-your-sync-cursor`; -const mockConnectedAccount = { +const mockConnectedAccount: Pick< + ConnectedAccountWorkspaceEntity, + 'provider' | 'refreshToken' | 'id' | 'handle' | 'connectionParameters' +> = { id: 'connected-account-id', provider: ConnectedAccountProvider.MICROSOFT, refreshToken: refreshToken, + handle: 'test@gmail.com', + connectionParameters: {}, +}; + +const mockMessageChannel: Pick< + MessageChannelWorkspaceEntity, + 'id' | 'syncCursor' +> = { + id: 'message-channel-id', + syncCursor: '', // Should be empty for Microsoft as cursors are stored at the folder level }; xdescribe('Microsoft dev tests : get message list service', () => { @@ -44,12 +58,19 @@ xdescribe('Microsoft dev tests : get message list service', () => { }); it('Should fetch and return message list successfully', async () => { - const result = await service.getFullMessageList( - mockConnectedAccount, - MessageFolderName.INBOX, - ); + const result = await service.getMessageLists({ + connectedAccount: mockConnectedAccount, + messageChannel: mockMessageChannel, + messageFolders: [ + { + id: 'inbox-folder-id', + name: MessageFolderName.INBOX, + syncCursor: 'inbox-sync-cursor', + }, + ], + }); - expect(result.messageExternalIds.length).toBeGreaterThan(0); + expect(result[0].messageExternalIds.length).toBeGreaterThan(0); }); it('Should throw token error', async () => { @@ -57,113 +78,65 @@ xdescribe('Microsoft dev tests : get message list service', () => { id: 'connected-account-id', provider: ConnectedAccountProvider.MICROSOFT, refreshToken: 'invalid-token', + handle: 'test@microsoft.com', + connectionParameters: {}, }; await expect( - service.getFullMessageList( - mockConnectedAccountUnvalid, - MessageFolderName.INBOX, - ), + service.getMessageLists({ + connectedAccount: mockConnectedAccountUnvalid, + messageChannel: mockMessageChannel, + messageFolders: [ + { + id: 'inbox-folder-id', + name: MessageFolderName.INBOX, + syncCursor: 'inbox-sync-cursor', + }, + ], + }), ).rejects.toThrowError('Access token is undefined or empty'); }); // if you need to run this test, you need to manually update the syncCursor to a valid one xit('Should fetch and return partial message list successfully', async () => { - const result = await service.getPartialMessageList( - mockConnectedAccount, - syncCursor, - ); + const result = await service.getMessageLists({ + connectedAccount: mockConnectedAccount, + messageChannel: mockMessageChannel, + messageFolders: [ + { + id: 'inbox-folder-id', + name: MessageFolderName.INBOX, + syncCursor: syncCursor, + }, + ], + }); - expect(result.nextSyncCursor).toBeTruthy(); + expect(result[0].nextSyncCursor).toBeTruthy(); }); it('Should fail partial message if syncCursor is invalid', async () => { await expect( - service.getPartialMessageList(mockConnectedAccount, 'invalid-syncCursor'), + service.getMessageLists({ + messageChannel: { + id: 'message-channel-id', + syncCursor: '', + }, + connectedAccount: mockConnectedAccount, + messageFolders: [ + { + id: 'inbox-folder-id', + name: MessageFolderName.INBOX, + syncCursor: 'invalid-syncCursor', + }, + ], + }), ).rejects.toThrowError( /Resource not found for the segment|Badly formed content/g, ); }); - - it('Should fail partial message if syncCursor is missing', async () => { - await expect( - service.getPartialMessageList(mockConnectedAccount, ''), - ).rejects.toThrowError(/Missing SyncCursor/g); - }); }); -xdescribe('Microsoft dev tests : get full message list service for folders', () => { - let service: MicrosoftGetMessageListService; - - const inboxFolder = new MessageFolderWorkspaceEntity(); - - inboxFolder.id = 'inbox-folder-id'; - inboxFolder.name = MessageFolderName.INBOX; - inboxFolder.syncCursor = 'inbox-sync-cursor'; - inboxFolder.messageChannelId = 'message-channel-1'; - - const sentFolder = new MessageFolderWorkspaceEntity(); - - sentFolder.id = 'sent-folder-id'; - sentFolder.name = MessageFolderName.SENT_ITEMS; - sentFolder.syncCursor = 'sent-sync-cursor'; - sentFolder.messageChannelId = 'message-channel-1'; - - const otherFolder = new MessageFolderWorkspaceEntity(); - - otherFolder.id = 'other-folder-id'; - otherFolder.name = 'other'; - otherFolder.syncCursor = 'other-sync-cursor'; - otherFolder.messageChannelId = 'message-channel-2'; - - beforeEach(async () => { - const module: TestingModule = await Test.createTestingModule({ - imports: [TwentyConfigModule.forRoot()], - providers: [ - MicrosoftGetMessageListService, - MicrosoftClientProvider, - MicrosoftHandleErrorService, - MicrosoftOAuth2ClientManagerService, - ConfigService, - ], - }).compile(); - - service = module.get( - MicrosoftGetMessageListService, - ); - }); - - it('Should return empty array', async () => { - const result = await service.getFullMessageListForFolders( - mockConnectedAccount, - [], - ); - - expect(result.length).toBe(0); - }); - - it('Should return an array of one item', async () => { - const result = await service.getFullMessageListForFolders( - mockConnectedAccount, - [inboxFolder], - ); - - expect(result.length).toBe(1); - expect(result[0].folderId).toBe(inboxFolder.id); - expect(result[0].messageExternalIds.length).toBeGreaterThan(0); - }); - - it('Should return an array of two items', async () => { - const result = await service.getFullMessageListForFolders( - mockConnectedAccount, - [inboxFolder, sentFolder], - ); - - expect(result.length).toBe(2); - }); -}); - -xdescribe('Microsoft dev tests : get partial message list service for folders', () => { +xdescribe('Microsoft dev tests : get message list service for folders', () => { let service: MicrosoftGetMessageListService; const inboxFolder = new MessageFolderWorkspaceEntity(); @@ -234,19 +207,21 @@ xdescribe('Microsoft dev tests : get partial message list service for folders', }); it('Should return empty array', async () => { - const result = await service.getPartialMessageListForFolders( - mockConnectedAccount, - messageChannelNoFolders, - ); + const result = await service.getMessageLists({ + messageChannel: messageChannelNoFolders, + connectedAccount: mockConnectedAccount, + messageFolders: [], + }); expect(result.length).toBe(0); }); it('Should return an array of one items', async () => { - const result = await service.getPartialMessageListForFolders( - mockConnectedAccount, - messageChannelMicrosoftOneFolder, - ); + const result = await service.getMessageLists({ + messageChannel: messageChannelMicrosoftOneFolder, + connectedAccount: mockConnectedAccount, + messageFolders: [inboxFolder], + }); expect(result.length).toBe(1); expect(result[0].folderId).toBe(inboxFolder.id); @@ -254,10 +229,11 @@ xdescribe('Microsoft dev tests : get partial message list service for folders', }); it('Should return an array of two items', async () => { - const result = await service.getPartialMessageListForFolders( - mockConnectedAccount, - messageChannelMicrosoft, - ); + const result = await service.getMessageLists({ + messageChannel: messageChannelMicrosoft, + connectedAccount: mockConnectedAccount, + messageFolders: [inboxFolder, sentFolder], + }); expect(result.length).toBe(2); }); diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.ts index 02888d833..44e22f8f5 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.ts @@ -5,11 +5,11 @@ import { PageIterator, PageIteratorCallback, } from '@microsoft/microsoft-graph-client'; +import { isNonEmptyString } from '@sniptt/guards'; import { v4 } from 'uuid'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; -import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessageFolderWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-folder.workspace-entity'; import { MessageImportDriverException, @@ -19,12 +19,12 @@ import { MicrosoftClientProvider } from 'src/modules/messaging/message-import-ma import { MicrosoftHandleErrorService } from 'src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-handle-error.service'; import { MessageFolderName } from 'src/modules/messaging/message-import-manager/drivers/microsoft/types/folders'; import { isAccessTokenRefreshingError } from 'src/modules/messaging/message-import-manager/drivers/microsoft/utils/is-access-token-refreshing-error.utils'; +import { GetMessageListsArgs } from 'src/modules/messaging/message-import-manager/types/get-message-lists-args.type'; import { - GetFullMessageListForFoldersResponse, - GetFullMessageListResponse, - GetPartialMessageListForFoldersResponse, - GetPartialMessageListResponse, -} from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service'; + GetMessageListsResponse, + GetOneMessageListResponse, +} from 'src/modules/messaging/message-import-manager/types/get-message-lists-response.type'; + // Microsoft API limit is 999 messages per request on this endpoint const MESSAGING_MICROSOFT_USERS_MESSAGES_LIST_MAX_RESULT = 999; @@ -36,101 +36,14 @@ export class MicrosoftGetMessageListService { private readonly twentyORMManager: TwentyORMManager, ) {} - public async getFullMessageListForFolders( - connectedAccount: Pick< - ConnectedAccountWorkspaceEntity, - 'refreshToken' | 'id' - >, - folders: Pick[], - ): Promise { - const result: GetFullMessageListForFoldersResponse[] = []; + public async getMessageLists({ + messageChannel, + connectedAccount, + messageFolders, + }: GetMessageListsArgs): Promise { + const result: GetMessageListsResponse = []; - for (const folder of folders) { - const response = await this.getFullMessageList( - connectedAccount, - folder.name as MessageFolderName, - ); - - result.push({ - ...response, - folderId: folder.id, - }); - } - - return result; - } - - public async getFullMessageList( - connectedAccount: Pick< - ConnectedAccountWorkspaceEntity, - 'refreshToken' | 'id' - >, - folderName: MessageFolderName, - ): Promise { - const messageExternalIds: string[] = []; - - const microsoftClient = - await this.microsoftClientProvider.getMicrosoftClient(connectedAccount); - - const response: PageCollection = await microsoftClient - .api(`/me/mailfolders/${folderName}/messages/delta?$select=id`) - .version('beta') - .headers({ - Prefer: `odata.maxpagesize=${MESSAGING_MICROSOFT_USERS_MESSAGES_LIST_MAX_RESULT}, IdType="ImmutableId"`, - }) - .get() - .catch((error) => { - if (isAccessTokenRefreshingError(error?.body)) { - throw new MessageImportDriverException( - error.message, - MessageImportDriverExceptionCode.CLIENT_NOT_AVAILABLE, - ); - } - this.microsoftHandleErrorService.handleMicrosoftGetMessageListError( - error, - ); - }); - - const callback: PageIteratorCallback = (data) => { - messageExternalIds.push(data.id); - - return true; - }; - - const pageIterator = new PageIterator(microsoftClient, response, callback, { - headers: { - Prefer: `odata.maxpagesize=${MESSAGING_MICROSOFT_USERS_MESSAGES_LIST_MAX_RESULT}, IdType="ImmutableId"`, - }, - }); - - await pageIterator.iterate().catch((error) => { - if (isAccessTokenRefreshingError(error?.body)) { - throw new MessageImportDriverException( - error.message, - MessageImportDriverExceptionCode.CLIENT_NOT_AVAILABLE, - ); - } - this.microsoftHandleErrorService.handleMicrosoftGetMessageListError( - error, - ); - }); - - return { - messageExternalIds: messageExternalIds, - nextSyncCursor: pageIterator.getDeltaLink() || '', - }; - } - - public async getPartialMessageListForFolders( - connectedAccount: Pick< - ConnectedAccountWorkspaceEntity, - 'provider' | 'refreshToken' | 'id' - >, - messageChannel: MessageChannelWorkspaceEntity, - ): Promise { - const result: GetPartialMessageListForFoldersResponse[] = []; - - if (messageChannel.messageFolders.length === 0) { + if (messageFolders.length === 0) { // permanent solution: // throw new MessageImportDriverException( // `Message channel ${messageChannel.id} has no message folders`, @@ -158,10 +71,10 @@ export class MicrosoftGetMessageListService { syncCursor: messageChannel.syncCursor, }); - const response = await this.getPartialMessageList( - connectedAccount, - messageChannel.syncCursor, - ); + const response = await this.getMessageList(connectedAccount, { + name: MessageFolderName.INBOX, + syncCursor: messageChannel.syncCursor, + }); result.push({ ...response, @@ -173,11 +86,8 @@ export class MicrosoftGetMessageListService { return result; } - for (const folder of messageChannel.messageFolders) { - const response = await this.getPartialMessageList( - connectedAccount, - folder.syncCursor, - ); + for (const folder of messageFolders) { + const response = await this.getMessageList(connectedAccount, folder); result.push({ ...response, @@ -188,29 +98,25 @@ export class MicrosoftGetMessageListService { return result; } - public async getPartialMessageList( + public async getMessageList( connectedAccount: Pick< ConnectedAccountWorkspaceEntity, 'provider' | 'refreshToken' | 'id' >, - syncCursor: string, - ): Promise { - // important: otherwise tries to get the full message list - if (!syncCursor) { - throw new MessageImportDriverException( - 'Missing SyncCursor', - MessageImportDriverExceptionCode.SYNC_CURSOR_ERROR, - ); - } - + messageFolder: Pick, + ): Promise { const messageExternalIds: string[] = []; const messageExternalIdsToDelete: string[] = []; const microsoftClient = await this.microsoftClientProvider.getMicrosoftClient(connectedAccount); + const apiUrl = isNonEmptyString(messageFolder.syncCursor) + ? messageFolder.syncCursor + : `/me/mailfolders/${messageFolder.name}/messages/delta?$select=id`; + const response: PageCollection = await microsoftClient - .api(syncCursor) + .api(apiUrl) .version('beta') .headers({ Prefer: `odata.maxpagesize=${MESSAGING_MICROSOFT_USERS_MESSAGES_LIST_MAX_RESULT}, IdType="ImmutableId"`, @@ -259,8 +165,9 @@ export class MicrosoftGetMessageListService { return { messageExternalIds, messageExternalIdsToDelete, - previousSyncCursor: syncCursor, + previousSyncCursor: messageFolder.syncCursor, nextSyncCursor: pageIterator.getDeltaLink() || '', + folderId: undefined, }; } } diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts index 77bc0e57c..6aa49d3c9 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts @@ -12,12 +12,11 @@ import { MessageChannelWorkspaceEntity, } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessagingAccountAuthenticationService } from 'src/modules/messaging/message-import-manager/services/messaging-account-authentication.service'; -import { MessagingFullMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service'; import { MessageImportExceptionHandlerService, MessageImportSyncStep, } from 'src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service'; -import { MessagingPartialMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service'; +import { MessagingMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-message-list-fetch.service'; import { MessagingMonitoringService } from 'src/modules/messaging/monitoring/services/messaging-monitoring.service'; export type MessagingMessageListFetchJobData = { @@ -31,8 +30,7 @@ export type MessagingMessageListFetchJobData = { }) export class MessagingMessageListFetchJob { constructor( - private readonly messagingFullMessageListFetchService: MessagingFullMessageListFetchService, - private readonly messagingPartialMessageListFetchService: MessagingPartialMessageListFetchService, + private readonly messagingMessageListFetchService: MessagingMessageListFetchService, private readonly messagingMonitoringService: MessagingMonitoringService, private readonly twentyORMManager: TwentyORMManager, private readonly connectedAccountRefreshTokensService: ConnectedAccountRefreshTokensService, @@ -88,29 +86,7 @@ export class MessagingMessageListFetchJob { ); switch (messageChannel.syncStage) { - case MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING: - await this.messagingMonitoringService.track({ - eventName: 'partial_message_list_fetch.started', - workspaceId, - connectedAccountId: messageChannel.connectedAccount.id, - messageChannelId: messageChannel.id, - }); - - await this.messagingPartialMessageListFetchService.processMessageListFetch( - messageChannel, - messageChannel.connectedAccount, - workspaceId, - ); - - await this.messagingMonitoringService.track({ - eventName: 'partial_message_list_fetch.completed', - workspaceId, - connectedAccountId: messageChannel.connectedAccount.id, - messageChannelId: messageChannel.id, - }); - - break; - + case MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING: // TODO: deprecate as we introduce MESSAGE_LIST_FETCH_PENDING case MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING: await this.messagingMonitoringService.track({ eventName: 'full_message_list_fetch.started', @@ -119,7 +95,7 @@ export class MessagingMessageListFetchJob { messageChannelId: messageChannel.id, }); - await this.messagingFullMessageListFetchService.processMessageListFetch( + await this.messagingMessageListFetchService.processMessageListFetch( messageChannel, workspaceId, ); @@ -139,7 +115,7 @@ export class MessagingMessageListFetchJob { } catch (error) { await this.messageImportErrorHandlerService.handleDriverException( error, - MessageImportSyncStep.FULL_OR_PARTIAL_MESSAGE_LIST_FETCH, + MessageImportSyncStep.MESSAGE_LIST_FETCH, messageChannel, workspaceId, ); diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts index f8ec658d3..cd8578333 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts @@ -61,7 +61,7 @@ export class MessagingOngoingStaleJob { switch (messageChannel.syncStage) { case MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING: - await this.messageChannelSyncStatusService.schedulePartialMessageListFetch( + await this.messageChannelSyncStatusService.scheduleMessageListFetch( [messageChannel.id], ); break; diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts index aaf8617fd..f0a0aa971 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts @@ -31,13 +31,12 @@ import { MessagingOngoingStaleJob } from 'src/modules/messaging/message-import-m import { MessagingMessageImportManagerMessageChannelListener } from 'src/modules/messaging/message-import-manager/listeners/messaging-import-manager-message-channel.listener'; import { MessagingAccountAuthenticationService } from 'src/modules/messaging/message-import-manager/services/messaging-account-authentication.service'; import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service'; -import { MessagingFullMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service'; import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service'; import { MessagingGetMessagesService } from 'src/modules/messaging/message-import-manager/services/messaging-get-messages.service'; import { MessageImportExceptionHandlerService } from 'src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service'; +import { MessagingMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-message-list-fetch.service'; import { MessagingMessageService } from 'src/modules/messaging/message-import-manager/services/messaging-message.service'; import { MessagingMessagesImportService } from 'src/modules/messaging/message-import-manager/services/messaging-messages-import.service'; -import { MessagingPartialMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service'; import { MessagingSaveMessagesAndEnqueueContactCreationService } from 'src/modules/messaging/message-import-manager/services/messaging-save-messages-and-enqueue-contact-creation.service'; import { MessagingSendMessageService } from 'src/modules/messaging/message-import-manager/services/messaging-send-message.service'; import { MessageParticipantManagerModule } from 'src/modules/messaging/message-participant-manager/message-participant-manager.module'; @@ -78,8 +77,7 @@ import { MessagingMonitoringModule } from 'src/modules/messaging/monitoring/mess MessagingMessageImportManagerMessageChannelListener, MessagingCleanCacheJob, MessagingMessageService, - MessagingPartialMessageListFetchService, - MessagingFullMessageListFetchService, + MessagingMessageListFetchService, MessagingMessagesImportService, MessagingSaveMessagesAndEnqueueContactCreationService, MessagingGetMessageListService, diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-get-message-list.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-get-message-list.service.ts index 44b86b03a..12b6716ed 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-get-message-list.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-get-message-list.service.ts @@ -2,9 +2,7 @@ import { Injectable } from '@nestjs/common'; import { ConnectedAccountProvider } from 'twenty-shared/types'; -import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; -import { MessageFolderWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-folder.workspace-entity'; import { GmailGetMessageListService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service'; import { ImapGetMessageListService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-get-message-list.service'; import { MicrosoftGetMessageListService } from 'src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service'; @@ -12,29 +10,7 @@ import { MessageImportException, MessageImportExceptionCode, } from 'src/modules/messaging/message-import-manager/exceptions/message-import.exception'; -import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service'; - -export type GetFullMessageListResponse = { - messageExternalIds: string[]; - nextSyncCursor: string; -}; - -export type GetFullMessageListForFoldersResponse = - GetFullMessageListResponse & { - folderId: string | undefined; - }; - -export type GetPartialMessageListResponse = { - messageExternalIds: string[]; - messageExternalIdsToDelete: string[]; - previousSyncCursor: string; - nextSyncCursor: string; -}; - -export type GetPartialMessageListForFoldersResponse = - GetPartialMessageListResponse & { - folderId: string | undefined; - }; +import { GetMessageListsResponse } from 'src/modules/messaging/message-import-manager/types/get-message-lists-response.type'; @Injectable() export class MessagingGetMessageListService { @@ -42,100 +18,30 @@ export class MessagingGetMessageListService { private readonly gmailGetMessageListService: GmailGetMessageListService, private readonly microsoftGetMessageListService: MicrosoftGetMessageListService, private readonly imapGetMessageListService: ImapGetMessageListService, - private readonly messagingCursorService: MessagingCursorService, - private readonly twentyORMManager: TwentyORMManager, ) {} - public async getFullMessageLists( + public async getMessageLists( messageChannel: MessageChannelWorkspaceEntity, - ): Promise { - switch (messageChannel.connectedAccount.provider) { - case ConnectedAccountProvider.GOOGLE: { - const fullMessageList = - await this.gmailGetMessageListService.getFullMessageList( - messageChannel.connectedAccount, - ); - - return [ - { - ...fullMessageList, - folderId: undefined, - }, - ]; - } - case ConnectedAccountProvider.MICROSOFT: { - const folderRepository = - await this.twentyORMManager.getRepository( - 'messageFolder', - ); - - const folders = await folderRepository.find({ - where: { - messageChannelId: messageChannel.id, - }, - }); - - return this.microsoftGetMessageListService.getFullMessageListForFolders( - messageChannel.connectedAccount, - folders, - ); - } - case ConnectedAccountProvider.IMAP_SMTP_CALDAV: { - const fullMessageList = - await this.imapGetMessageListService.getFullMessageList( - messageChannel.connectedAccount, - ); - - return [ - { - ...fullMessageList, - folderId: undefined, - }, - ]; - } - default: - throw new MessageImportException( - `Provider ${messageChannel.connectedAccount.provider} is not supported`, - MessageImportExceptionCode.PROVIDER_NOT_SUPPORTED, - ); - } - } - - public async getPartialMessageLists( - messageChannel: MessageChannelWorkspaceEntity, - ): Promise { + ): Promise { switch (messageChannel.connectedAccount.provider) { case ConnectedAccountProvider.GOOGLE: - return [ - { - ...(await this.gmailGetMessageListService.getPartialMessageList( - messageChannel.connectedAccount, - messageChannel.syncCursor, - )), - folderId: undefined, - }, - ]; - case ConnectedAccountProvider.MICROSOFT: - return this.microsoftGetMessageListService.getPartialMessageListForFolders( - messageChannel.connectedAccount, + return await this.gmailGetMessageListService.getMessageLists({ messageChannel, - ); + connectedAccount: messageChannel.connectedAccount, + messageFolders: messageChannel.messageFolders, + }); + case ConnectedAccountProvider.MICROSOFT: + return this.microsoftGetMessageListService.getMessageLists({ + messageChannel, + connectedAccount: messageChannel.connectedAccount, + messageFolders: messageChannel.messageFolders, + }); case ConnectedAccountProvider.IMAP_SMTP_CALDAV: { - const messageList = - await this.imapGetMessageListService.getPartialMessageList( - messageChannel.connectedAccount, - messageChannel.syncCursor, - ); - - return [ - { - messageExternalIds: messageList.messageExternalIds, - messageExternalIdsToDelete: [], - previousSyncCursor: messageChannel.syncCursor || '', - nextSyncCursor: messageList.nextSyncCursor || '', - folderId: undefined, - }, - ]; + return await this.imapGetMessageListService.getMessageLists({ + messageChannel, + connectedAccount: messageChannel.connectedAccount, + messageFolders: messageChannel.messageFolders, + }); } default: throw new MessageImportException( diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service.ts index 32b05fd4b..97ab4fb85 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service.ts @@ -21,9 +21,9 @@ import { } from 'src/modules/messaging/message-import-manager/exceptions/message-import.exception'; export enum MessageImportSyncStep { - FULL_MESSAGE_LIST_FETCH = 'FULL_MESSAGE_LIST_FETCH', - PARTIAL_MESSAGE_LIST_FETCH = 'PARTIAL_MESSAGE_LIST_FETCH', - FULL_OR_PARTIAL_MESSAGE_LIST_FETCH = 'FULL_OR_PARTIAL_MESSAGE_LIST_FETCH', + FULL_MESSAGE_LIST_FETCH = 'FULL_MESSAGE_LIST_FETCH', // TODO: deprecate to only use MESSAGE_LIST_FETCH + PARTIAL_MESSAGE_LIST_FETCH = 'PARTIAL_MESSAGE_LIST_FETCH', // TODO: deprecate to only use MESSAGE_LIST_FETCH + MESSAGE_LIST_FETCH = 'MESSAGE_LIST_FETCH', MESSAGES_IMPORT_PENDING = 'MESSAGES_IMPORT_PENDING', MESSAGES_IMPORT_ONGOING = 'MESSAGES_IMPORT_ONGOING', } @@ -145,16 +145,11 @@ export class MessageImportExceptionHandlerService { switch (syncStep) { case MessageImportSyncStep.FULL_MESSAGE_LIST_FETCH: - await this.messageChannelSyncStatusService.scheduleFullMessageListFetch( - [messageChannel.id], - ); + await this.messageChannelSyncStatusService.scheduleMessageListFetch([ + messageChannel.id, + ]); break; - case MessageImportSyncStep.PARTIAL_MESSAGE_LIST_FETCH: - await this.messageChannelSyncStatusService.schedulePartialMessageListFetch( - [messageChannel.id], - ); - break; case MessageImportSyncStep.MESSAGES_IMPORT_PENDING: case MessageImportSyncStep.MESSAGES_IMPORT_ONGOING: await this.messageChannelSyncStatusService.scheduleMessagesImport([ @@ -233,7 +228,7 @@ export class MessageImportExceptionHandlerService { return; } - await this.messageChannelSyncStatusService.resetAndScheduleFullMessageListFetch( + await this.messageChannelSyncStatusService.resetAndScheduleMessageListFetch( [messageChannel.id], workspaceId, ); diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.spec.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-message-list-fetch.service.spec.ts similarity index 87% rename from packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.spec.ts rename to packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-message-list-fetch.service.spec.ts index bedb6aab2..fd4e3b9ef 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.spec.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-message-list-fetch.service.spec.ts @@ -10,12 +10,12 @@ import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/stan import { MessageFolderWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-folder.workspace-entity'; import { MessagingMessageCleanerService } from 'src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service'; import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service'; -import { MessagingFullMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service'; import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service'; import { MessageImportExceptionHandlerService } from 'src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service'; +import { MessagingMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-message-list-fetch.service'; -describe('MessagingFullMessageListFetchService', () => { - let messagingFullMessageListFetchService: MessagingFullMessageListFetchService; +describe('MessagingMessageListFetchService', () => { + let messagingMessageListFetchService: MessagingMessageListFetchService; let messagingGetMessageListService: MessagingGetMessageListService; let messageChannelSyncStatusService: MessageChannelSyncStatusService; let twentyORMManager: TwentyORMManager; @@ -72,7 +72,7 @@ describe('MessagingFullMessageListFetchService', () => { const module: TestingModule = await Test.createTestingModule({ providers: [ - MessagingFullMessageListFetchService, + MessagingMessageListFetchService, { provide: CacheStorageNamespace.ModuleMessaging, useValue: { @@ -82,12 +82,11 @@ describe('MessagingFullMessageListFetchService', () => { { provide: MessagingGetMessageListService, useValue: { - getFullMessageLists: jest + getMessageLists: jest .fn() - .mockImplementation((messageChannel) => { + .mockImplementation(({ connectedAccount }) => { if ( - messageChannel.connectedAccount.provider === - ConnectedAccountProvider.GOOGLE + connectedAccount.provider === ConnectedAccountProvider.GOOGLE ) { return [ { @@ -162,9 +161,9 @@ describe('MessagingFullMessageListFetchService', () => { ], }).compile(); - messagingFullMessageListFetchService = - module.get( - MessagingFullMessageListFetchService, + messagingMessageListFetchService = + module.get( + MessagingMessageListFetchService, ); messagingGetMessageListService = module.get( MessagingGetMessageListService, @@ -180,7 +179,7 @@ describe('MessagingFullMessageListFetchService', () => { }); it('should process Microsoft message list fetch correctly', async () => { - await messagingFullMessageListFetchService.processMessageListFetch( + await messagingMessageListFetchService.processMessageListFetch( mockMicrosoftMessageChannel, workspaceId, ); @@ -189,9 +188,9 @@ describe('MessagingFullMessageListFetchService', () => { messageChannelSyncStatusService.markAsMessagesListFetchOngoing, ).toHaveBeenCalledWith([mockMicrosoftMessageChannel.id]); - expect( - messagingGetMessageListService.getFullMessageLists, - ).toHaveBeenCalledWith(mockMicrosoftMessageChannel); + expect(messagingGetMessageListService.getMessageLists).toHaveBeenCalledWith( + mockMicrosoftMessageChannel, + ); expect(twentyORMManager.getRepository).toHaveBeenCalledWith( 'messageChannelMessageAssociation', @@ -209,7 +208,7 @@ describe('MessagingFullMessageListFetchService', () => { }); it('should process Google message list fetch correctly', async () => { - await messagingFullMessageListFetchService.processMessageListFetch( + await messagingMessageListFetchService.processMessageListFetch( mockGoogleMessageChannel, workspaceId, ); @@ -218,9 +217,9 @@ describe('MessagingFullMessageListFetchService', () => { messageChannelSyncStatusService.markAsMessagesListFetchOngoing, ).toHaveBeenCalledWith([mockGoogleMessageChannel.id]); - expect( - messagingGetMessageListService.getFullMessageLists, - ).toHaveBeenCalledWith(mockGoogleMessageChannel); + expect(messagingGetMessageListService.getMessageLists).toHaveBeenCalledWith( + mockGoogleMessageChannel, + ); expect(twentyORMManager.getRepository).toHaveBeenCalledWith( 'messageChannelMessageAssociation', diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-message-list-fetch.service.ts similarity index 92% rename from packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts rename to packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-message-list-fetch.service.ts index 93e34bb66..7d52f69ec 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-message-list-fetch.service.ts @@ -17,7 +17,7 @@ import { MessageImportSyncStep, } from 'src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service'; @Injectable() -export class MessagingFullMessageListFetchService { +export class MessagingMessageListFetchService { constructor( @InjectCacheStorage(CacheStorageNamespace.ModuleMessaging) private readonly cacheStorage: CacheStorageService, @@ -38,17 +38,17 @@ export class MessagingFullMessageListFetchService { [messageChannel.id], ); - const fullMessageLists = - await this.messagingGetMessageListService.getFullMessageLists( + const messageLists = + await this.messagingGetMessageListService.getMessageLists( messageChannel, ); - const isEmptyMailbox = fullMessageLists.some( - (fullMessageList) => fullMessageList.messageExternalIds.length === 0, + const isEmptyMailbox = messageLists.some( + (messageList) => messageList.messageExternalIds.length === 0, ); if (isEmptyMailbox) { - await this.messageChannelSyncStatusService.resetAndScheduleFullMessageListFetch( + await this.messageChannelSyncStatusService.resetAndScheduleMessageListFetch( [messageChannel.id], workspaceId, ); @@ -56,9 +56,8 @@ export class MessagingFullMessageListFetchService { return; } - for (const fullMessageList of fullMessageLists) { - const { messageExternalIds, nextSyncCursor, folderId } = - fullMessageList; + for (const messageList of messageLists) { + const { messageExternalIds, nextSyncCursor, folderId } = messageList; const messageChannelMessageAssociationRepository = await this.twentyORMManager.getRepository( diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.spec.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.spec.ts index ec6484ddb..51fbca8c0 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.spec.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.spec.ts @@ -66,7 +66,7 @@ describe('MessagingMessagesImportService', () => { provide: MessageChannelSyncStatusService, useValue: { markAsMessagesImportOngoing: jest.fn().mockResolvedValue(undefined), - markAsCompletedAndSchedulePartialMessageListFetch: jest + markAsCompletedAndScheduleMessageListFetch: jest .fn() .mockResolvedValue(undefined), scheduleMessagesImport: jest.fn().mockResolvedValue(undefined), @@ -191,7 +191,7 @@ describe('MessagingMessagesImportService', () => { it('should fails if SyncStage is not MESSAGES_IMPORT_PENDING', async () => { mockMessageChannel.syncStage = - MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING; + MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING; expect( service.processMessageBatchImport( diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts index 9b1ccceff..58d7e0d0d 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts @@ -88,7 +88,7 @@ export class MessagingMessagesImportService { ); if (!messageIdsToFetch?.length) { - await this.messageChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch( + await this.messageChannelSyncStatusService.markAsCompletedAndScheduleMessageListFetch( [messageChannel.id], ); @@ -125,7 +125,7 @@ export class MessagingMessagesImportService { if ( messageIdsToFetch.length < MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE ) { - await this.messageChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch( + await this.messageChannelSyncStatusService.markAsCompletedAndScheduleMessageListFetch( [messageChannel.id], ); } else { diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts deleted file mode 100644 index 21ce030e3..000000000 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts +++ /dev/null @@ -1,144 +0,0 @@ -import { Injectable } from '@nestjs/common'; - -import { In } from 'typeorm'; - -import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decorators/cache-storage.decorator'; -import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service'; -import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum'; -import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; -import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; -import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service'; -import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; -import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; -import { MessagingMessageCleanerService } from 'src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service'; -import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service'; -import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service'; -import { - MessageImportExceptionHandlerService, - MessageImportSyncStep, -} from 'src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service'; - -@Injectable() -export class MessagingPartialMessageListFetchService { - constructor( - @InjectCacheStorage(CacheStorageNamespace.ModuleMessaging) - private readonly cacheStorage: CacheStorageService, - private readonly messagingGetMessageListService: MessagingGetMessageListService, - private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService, - private readonly twentyORMManager: TwentyORMManager, - private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService, - private readonly messagingMessageCleanerService: MessagingMessageCleanerService, - private readonly messagingCursorService: MessagingCursorService, - ) {} - - public async processMessageListFetch( - messageChannel: MessageChannelWorkspaceEntity, - connectedAccount: ConnectedAccountWorkspaceEntity, - workspaceId: string, - ): Promise { - try { - await this.messageChannelSyncStatusService.markAsMessagesListFetchOngoing( - [messageChannel.id], - ); - - const messageChannelRepository = - await this.twentyORMManager.getRepository( - 'messageChannel', - ); - - await messageChannelRepository.update( - { - id: messageChannel.id, - }, - { - throttleFailureCount: 0, - }, - ); - - const partialMessageLists = - await this.messagingGetMessageListService.getPartialMessageLists( - messageChannel, - ); - - for (const partialMessageList of partialMessageLists) { - const { - messageExternalIds, - messageExternalIdsToDelete, - previousSyncCursor, - nextSyncCursor, - folderId, - } = partialMessageList; - - const isPartialImportFinished = this.isPartialImportFinished( - previousSyncCursor, - nextSyncCursor, - ); - - if (isPartialImportFinished) { - continue; - } - - await this.cacheStorage.setAdd( - `messages-to-import:${workspaceId}:${messageChannel.id}`, - messageExternalIds, - ); - - const messageChannelMessageAssociationRepository = - await this.twentyORMManager.getRepository( - 'messageChannelMessageAssociation', - ); - - if (messageExternalIdsToDelete.length) { - await messageChannelMessageAssociationRepository.delete({ - messageChannelId: messageChannel.id, - messageExternalId: In(messageExternalIdsToDelete), - }); - - await this.messagingMessageCleanerService.cleanWorkspaceThreads( - workspaceId, - ); - } - - await this.messagingCursorService.updateCursor( - messageChannel, - nextSyncCursor, - folderId, - ); - } - - const isPartialImportFinishedForAllFolders = partialMessageLists.every( - (partialMessageList) => - this.isPartialImportFinished( - partialMessageList.previousSyncCursor, - partialMessageList.nextSyncCursor, - ), - ); - - if (isPartialImportFinishedForAllFolders) { - await this.messageChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch( - [messageChannel.id], - ); - - return; - } - - await this.messageChannelSyncStatusService.scheduleMessagesImport([ - messageChannel.id, - ]); - } catch (error) { - await this.messageImportErrorHandlerService.handleDriverException( - error, - MessageImportSyncStep.PARTIAL_MESSAGE_LIST_FETCH, - messageChannel, - workspaceId, - ); - } - } - - private isPartialImportFinished( - previousSyncCursor: string, - nextSyncCursor: string, - ): boolean { - return previousSyncCursor === nextSyncCursor; - } -} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/types/get-message-lists-args.type.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/types/get-message-lists-args.type.ts new file mode 100644 index 000000000..785f43728 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/types/get-message-lists-args.type.ts @@ -0,0 +1,15 @@ +import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; +import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { MessageFolderWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-folder.workspace-entity'; + +export type GetMessageListsArgs = { + messageChannel: Pick; + connectedAccount: Pick< + ConnectedAccountWorkspaceEntity, + 'provider' | 'refreshToken' | 'id' | 'handle' | 'connectionParameters' + >; + messageFolders: Pick< + MessageFolderWorkspaceEntity, + 'name' | 'syncCursor' | 'id' + >[]; +}; diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/types/get-message-lists-response.type.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/types/get-message-lists-response.type.ts new file mode 100644 index 000000000..ca97e4820 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/types/get-message-lists-response.type.ts @@ -0,0 +1,9 @@ +export type GetOneMessageListResponse = { + messageExternalIds: string[]; + messageExternalIdsToDelete: string[]; + previousSyncCursor: string; + nextSyncCursor: string; + folderId: string | undefined; +}; + +export type GetMessageListsResponse = Array;