From 8b5eb04b07cd81864b3dd4e55b4ecb0fb09f401c Mon Sep 17 00:00:00 2001 From: Charles Bochet Date: Tue, 22 Jul 2025 11:20:57 +0200 Subject: [PATCH] Refactor mail folders (#13302) ## Context We would like to start leveraging more messageFolders during messageSync which would allow users to select folders they want to sync on their mailbox. MessageFolder is an abstraction that means folder for Microsoft, labels for Gmail, not supported for IMAP. ## What 1) We do not aim to build a FE now but we would like to take it into a consideration if they have been modified through API. So **out of scope** 2) MessageFolders will be synced regularly from Gmail, Microsoft. This is currently partially done and improving it is **out of scope** 3) Change: we were having two synchronization mechanism so far: FULL_SYNC (first time, or when no cursor is present) and PARTIAL_SYNC (when we have a cursor, we can fetch the diff since the last pull). This Full vs Partial mode was a high level concept. We now think it's an driver implementation detail (and can be inferred from the existence of a cursor). Why making this change now: as we are trying to pull several folders, a folder could need a full sync if it had no cursor, and another a partial if it has one. It does not make sense anymore to have this full vs partial difference at MessageChannel level 4) Once we are sure this work, we can start syncing different folders on Gmail side (the case for the user it to be able to fetch Promotion label) ## Implementation strategy 1) Re-use PartialMessageList implementation / API and rename it to MessageList at it's more complete 2) re-use driver level fullMessageList methods and call them messageListWithoutCursor 3) make sure that these method are folder specific ## Tests ### Gmail - Fresh fetch (without cursor): OK - Message import: OK - Additional fetch (with messageChannel cursor): OK ### Microsoft - Fresh fetch (without cursor): OK - Message import: OK - Additional fetch (with messageChannel cursor): OK ### Imap - Fresh fetch (without cursor): OK - Message import: OK - Additional fetch (with messageChannel cursor): OK --- ...ssaging-blocklist-reimport-messages.job.ts | 2 +- .../message-channel-sync-status.service.ts | 25 +-- .../message-channel.workspace-entity.ts | 8 +- .../messaging-message-list-fetch.cron.job.ts | 1 + .../exceptions/message-network.exception.ts | 1 + .../messaging-gmail-excluded-categories.ts | 10 +- .../gmail-get-message-list.service.spec.ts | 37 +++- .../gmail-get-message-list.service.ts | 102 +++++++--- .../gmail-default-message-category.type.ts | 6 + .../gmail-default-message-folder.type.ts | 6 + .../map-gmail-default-folder-to-category.ts | 37 ++++ .../gmail/utils/parse-gaxios-error.util.ts | 1 + .../services/imap-get-message-list.service.ts | 103 +++------- ...osoft-get-message-list.service.dev.spec.ts | 188 ++++++++---------- .../microsoft-get-message-list.service.ts | 151 +++----------- .../jobs/messaging-message-list-fetch.job.ts | 34 +--- .../jobs/messaging-ongoing-stale.job.ts | 2 +- .../messaging-import-manager.module.ts | 6 +- .../messaging-get-message-list.service.ts | 130 ++---------- ...saging-import-exception-handler.service.ts | 19 +- ...saging-message-list-fetch.service.spec.ts} | 37 ++-- ...> messaging-message-list-fetch.service.ts} | 17 +- .../messaging-messages-import.service.spec.ts | 4 +- .../messaging-messages-import.service.ts | 4 +- ...ging-partial-message-list-fetch.service.ts | 144 -------------- .../types/get-message-lists-args.type.ts | 15 ++ .../types/get-message-lists-response.type.ts | 9 + 27 files changed, 391 insertions(+), 708 deletions(-) create mode 100644 packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-default-message-category.type.ts create mode 100644 packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-default-message-folder.type.ts create mode 100644 packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/map-gmail-default-folder-to-category.ts rename packages/twenty-server/src/modules/messaging/message-import-manager/services/{messaging-full-message-list-fetch.service.spec.ts => messaging-message-list-fetch.service.spec.ts} (87%) rename packages/twenty-server/src/modules/messaging/message-import-manager/services/{messaging-full-message-list-fetch.service.ts => messaging-message-list-fetch.service.ts} (92%) delete mode 100644 packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts create mode 100644 packages/twenty-server/src/modules/messaging/message-import-manager/types/get-message-lists-args.type.ts create mode 100644 packages/twenty-server/src/modules/messaging/message-import-manager/types/get-message-lists-response.type.ts diff --git a/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-reimport-messages.job.ts b/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-reimport-messages.job.ts index c21ee8a2c..213508471 100644 --- a/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-reimport-messages.job.ts +++ b/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-reimport-messages.job.ts @@ -54,7 +54,7 @@ export class BlocklistReimportMessagesJob { }, }); - await this.messagingChannelSyncStatusService.resetAndScheduleFullMessageListFetch( + await this.messagingChannelSyncStatusService.resetAndScheduleMessageListFetch( messageChannels.map((messageChannel) => messageChannel.id), workspaceId, ); diff --git a/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts b/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts index a378502bf..07551f05b 100644 --- a/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts +++ b/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts @@ -27,7 +27,7 @@ export class MessageChannelSyncStatusService { private readonly metricsService: MetricsService, ) {} - public async scheduleFullMessageListFetch(messageChannelIds: string[]) { + public async scheduleMessageListFetch(messageChannelIds: string[]) { if (!messageChannelIds.length) { return; } @@ -42,21 +42,6 @@ export class MessageChannelSyncStatusService { }); } - public async schedulePartialMessageListFetch(messageChannelIds: string[]) { - if (!messageChannelIds.length) { - return; - } - - const messageChannelRepository = - await this.twentyORMManager.getRepository( - 'messageChannel', - ); - - await messageChannelRepository.update(messageChannelIds, { - syncStage: MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING, - }); - } - public async scheduleMessagesImport(messageChannelIds: string[]) { if (!messageChannelIds.length) { return; @@ -72,7 +57,7 @@ export class MessageChannelSyncStatusService { }); } - public async resetAndScheduleFullMessageListFetch( + public async resetAndScheduleMessageListFetch( messageChannelIds: string[], workspaceId: string, ) { @@ -97,7 +82,7 @@ export class MessageChannelSyncStatusService { throttleFailureCount: 0, }); - await this.scheduleFullMessageListFetch(messageChannelIds); + await this.scheduleMessageListFetch(messageChannelIds); } public async resetSyncStageStartedAt(messageChannelIds: string[]) { @@ -132,7 +117,7 @@ export class MessageChannelSyncStatusService { }); } - public async markAsCompletedAndSchedulePartialMessageListFetch( + public async markAsCompletedAndScheduleMessageListFetch( messageChannelIds: string[], ) { if (!messageChannelIds.length) { @@ -146,7 +131,7 @@ export class MessageChannelSyncStatusService { await messageChannelRepository.update(messageChannelIds, { syncStatus: MessageChannelSyncStatus.ACTIVE, - syncStage: MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING, + syncStage: MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING, throttleFailureCount: 0, syncStageStartedAt: null, syncedAt: new Date().toISOString(), diff --git a/packages/twenty-server/src/modules/messaging/common/standard-objects/message-channel.workspace-entity.ts b/packages/twenty-server/src/modules/messaging/common/standard-objects/message-channel.workspace-entity.ts index d2004971a..23c6f8cff 100644 --- a/packages/twenty-server/src/modules/messaging/common/standard-objects/message-channel.workspace-entity.ts +++ b/packages/twenty-server/src/modules/messaging/common/standard-objects/message-channel.workspace-entity.ts @@ -31,8 +31,8 @@ export enum MessageChannelSyncStatus { } export enum MessageChannelSyncStage { - FULL_MESSAGE_LIST_FETCH_PENDING = 'FULL_MESSAGE_LIST_FETCH_PENDING', - PARTIAL_MESSAGE_LIST_FETCH_PENDING = 'PARTIAL_MESSAGE_LIST_FETCH_PENDING', + FULL_MESSAGE_LIST_FETCH_PENDING = 'FULL_MESSAGE_LIST_FETCH_PENDING', // TODO: rename to MESSAGE_LIST_FETCH_PENDING + PARTIAL_MESSAGE_LIST_FETCH_PENDING = 'PARTIAL_MESSAGE_LIST_FETCH_PENDING', // TODO: to be removed, deprecated MESSAGE_LIST_FETCH_ONGOING = 'MESSAGE_LIST_FETCH_ONGOING', MESSAGES_IMPORT_PENDING = 'MESSAGES_IMPORT_PENDING', MESSAGES_IMPORT_ONGOING = 'MESSAGES_IMPORT_ONGOING', @@ -291,13 +291,13 @@ export class MessageChannelWorkspaceEntity extends BaseWorkspaceEntity { icon: 'IconStatusChange', options: [ { - value: MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING, + value: MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING, // TODO: Rename to MESSAGE_LIST_FETCH_PENDING label: 'Full messages list fetch pending', position: 0, color: 'blue', }, { - value: MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING, + value: MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING, // TODO: Deprecate label: 'Partial messages list fetch pending', position: 1, color: 'blue', diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts index 1e5306d0e..896f121b8 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts @@ -52,6 +52,7 @@ export class MessagingMessageListFetchCronJob { activeWorkspace.id, ); + // TODO: deprecate looking for FULL_MESSAGE_LIST_FETCH_PENDING as we introduce MESSAGE_LIST_FETCH_PENDING const messageChannels = await mainDataSource.query( `SELECT * FROM ${schemaName}."messageChannel" WHERE "isSyncEnabled" = true AND "syncStage" IN ('${MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING}', '${MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING}')`, ); diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/exceptions/message-network.exception.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/exceptions/message-network.exception.ts index 2a2ee92f6..aa2fef27f 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/exceptions/message-network.exception.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/exceptions/message-network.exception.ts @@ -4,4 +4,5 @@ export enum MessageNetworkExceptionCode { ECONNABORTED = 'ECONNABORTED', ETIMEDOUT = 'ETIMEDOUT', ERR_NETWORK = 'ERR_NETWORK', + EHOSTUNREACH = 'EHOSTUNREACH', } diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-excluded-categories.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-excluded-categories.ts index d9f46c48e..d795c7aa8 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-excluded-categories.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-excluded-categories.ts @@ -1,6 +1,8 @@ +import { GmailDefaultMessageCategory } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-default-message-category.type'; + export const MESSAGING_GMAIL_EXCLUDED_CATEGORIES = [ - 'promotions', - 'social', - 'forums', - 'updates', + GmailDefaultMessageCategory.promotions, + GmailDefaultMessageCategory.forums, + GmailDefaultMessageCategory.social, + GmailDefaultMessageCategory.updates, ]; diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service.spec.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service.spec.ts index dc015de1a..95e3cd8df 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service.spec.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service.spec.ts @@ -14,12 +14,13 @@ describe('GmailGetMessageListService', () => { const mockConnectedAccount: Pick< ConnectedAccountWorkspaceEntity, - 'provider' | 'refreshToken' | 'id' | 'handle' + 'provider' | 'refreshToken' | 'id' | 'handle' | 'connectionParameters' > = { id: 'connected-account-id', provider: ConnectedAccountProvider.GOOGLE, refreshToken: 'refresh-token', handle: 'test@gmail.com', + connectionParameters: {}, }; beforeEach(async () => { @@ -55,7 +56,7 @@ describe('GmailGetMessageListService', () => { gmailClientProvider = module.get(GmailClientProvider); }); - describe('getFullMessageList', () => { + describe('getMessageList', () => { it('should return 0 messageExternalIds when gmail returns 0 messages', async () => { const mockGmailClient = { users: { @@ -74,9 +75,13 @@ describe('GmailGetMessageListService', () => { mockGmailClient, ); - const result = await service.getFullMessageList(mockConnectedAccount); + const result = await service.getMessageLists({ + messageChannel: { syncCursor: '', id: 'my-id' }, + connectedAccount: mockConnectedAccount, + messageFolders: [], + }); - expect(result.messageExternalIds).toHaveLength(0); + expect(result[0].messageExternalIds).toHaveLength(0); expect(mockGmailClient.users.messages.list).toHaveBeenCalledTimes(1); }); @@ -121,9 +126,13 @@ describe('GmailGetMessageListService', () => { mockGmailClient, ); - const result = await service.getFullMessageList(mockConnectedAccount); + const result = await service.getMessageLists({ + messageChannel: { syncCursor: '', id: 'my-id' }, + connectedAccount: mockConnectedAccount, + messageFolders: [], + }); - expect(result.messageExternalIds).toHaveLength(5); + expect(result[0].messageExternalIds).toHaveLength(5); }); it('should return 3 messageExternalIds when gmail provides a nextpagetoken with 2 messages, then 1', async () => { @@ -168,9 +177,13 @@ describe('GmailGetMessageListService', () => { mockGmailClient, ); - const result = await service.getFullMessageList(mockConnectedAccount); + const result = await service.getMessageLists({ + messageChannel: { syncCursor: '', id: 'my-id' }, + connectedAccount: mockConnectedAccount, + messageFolders: [], + }); - expect(result.messageExternalIds).toHaveLength(3); + expect(result[0].messageExternalIds).toHaveLength(3); expect(mockGmailClient.users.messages.list).toHaveBeenCalledTimes(2); }); it('should go through while loop once when gmail provides a nextpagetoken but 0 messages - should never happen IRL', async () => { @@ -196,9 +209,13 @@ describe('GmailGetMessageListService', () => { mockGmailClient, ); - const result = await service.getFullMessageList(mockConnectedAccount); + const result = await service.getMessageLists({ + messageChannel: { syncCursor: '', id: 'my-id' }, + connectedAccount: mockConnectedAccount, + messageFolders: [], + }); - expect(result.messageExternalIds).toHaveLength(0); + expect(result[0].messageExternalIds).toHaveLength(0); expect(mockGmailClient.users.messages.list).toHaveBeenCalledTimes(1); }); }); diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service.ts index 8c3149b68..17aa6f686 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service.ts @@ -1,8 +1,11 @@ import { Injectable } from '@nestjs/common'; +import { isNonEmptyString } from '@sniptt/guards'; import { gmail_v1 as gmailV1 } from 'googleapis'; +import { isDefined } from 'twenty-shared/utils'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; +import { MessageFolderWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-folder.workspace-entity'; import { MessageImportDriverException, MessageImportDriverExceptionCode, @@ -14,10 +17,9 @@ import { GmailGetHistoryService } from 'src/modules/messaging/message-import-man import { GmailHandleErrorService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-handle-error.service'; import { computeGmailCategoryExcludeSearchFilter } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-gmail-category-excude-search-filter.util'; import { computeGmailCategoryLabelId } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-gmail-category-label-id.util'; -import { - GetFullMessageListResponse, - GetPartialMessageListResponse, -} from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service'; +import { mapGmailDefaultFolderToCategoryOrUndefined } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/map-gmail-default-folder-to-category'; +import { GetMessageListsArgs } from 'src/modules/messaging/message-import-manager/types/get-message-lists-args.type'; +import { GetMessageListsResponse } from 'src/modules/messaging/message-import-manager/types/get-message-lists-response.type'; import { assertNotNull } from 'src/utils/assert'; @Injectable() @@ -28,12 +30,13 @@ export class GmailGetMessageListService { private readonly gmailHandleErrorService: GmailHandleErrorService, ) {} - public async getFullMessageList( + private async getMessageListWithoutCursor( connectedAccount: Pick< ConnectedAccountWorkspaceEntity, 'provider' | 'refreshToken' | 'id' | 'handle' >, - ): Promise { + messageFolders: Pick[], + ): Promise { const gmailClient = await this.gmailClientProvider.getGmailClient(connectedAccount); @@ -41,6 +44,7 @@ export class GmailGetMessageListService { let hasMoreMessages = true; const messageExternalIds: string[] = []; + const excludedCategories = this.comptuteExcludedCategories(messageFolders); while (hasMoreMessages) { const messageList = await gmailClient.users.messages @@ -48,9 +52,7 @@ export class GmailGetMessageListService { userId: 'me', maxResults: MESSAGING_GMAIL_USERS_MESSAGES_LIST_MAX_RESULT, pageToken, - q: computeGmailCategoryExcludeSearchFilter( - MESSAGING_GMAIL_EXCLUDED_CATEGORIES, - ), + q: computeGmailCategoryExcludeSearchFilter(excludedCategories), }) .catch((error) => { this.gmailHandleErrorService.handleGmailMessageListFetchError(error); @@ -78,10 +80,15 @@ export class GmailGetMessageListService { } if (messageExternalIds.length === 0) { - return { - messageExternalIds, - nextSyncCursor: '', - }; + return [ + { + messageExternalIds, + nextSyncCursor: '', + previousSyncCursor: '', + messageExternalIdsToDelete: [], + folderId: undefined, + }, + ]; } const firstMessageExternalId = messageExternalIds[0]; @@ -106,28 +113,42 @@ export class GmailGetMessageListService { ); } - return { messageExternalIds, nextSyncCursor }; + return [ + { + messageExternalIds, + nextSyncCursor, + previousSyncCursor: '', + messageExternalIdsToDelete: [], + folderId: undefined, + }, + ]; } - public async getPartialMessageList( - connectedAccount: Pick< - ConnectedAccountWorkspaceEntity, - 'provider' | 'refreshToken' | 'id' - >, - syncCursor: string, - ): Promise { + public async getMessageLists({ + messageChannel, + connectedAccount, + messageFolders, + }: GetMessageListsArgs): Promise { const gmailClient = await this.gmailClientProvider.getGmailClient(connectedAccount); + if (!isNonEmptyString(messageChannel.syncCursor)) { + return this.getMessageListWithoutCursor(connectedAccount, messageFolders); + } + const { history, historyId: nextSyncCursor } = - await this.gmailGetHistoryService.getHistory(gmailClient, syncCursor); + await this.gmailGetHistoryService.getHistory( + gmailClient, + messageChannel.syncCursor, + ); const { messagesAdded, messagesDeleted } = await this.gmailGetHistoryService.getMessageIdsFromHistory(history); const messageIdsToFilter = await this.getEmailIdsFromExcludedCategories( gmailClient, - syncCursor, + messageChannel.syncCursor, + messageFolders, ); const messagesAddedFiltered = messagesAdded.filter( @@ -141,21 +162,42 @@ export class GmailGetMessageListService { ); } - return { - messageExternalIds: messagesAddedFiltered, - messageExternalIdsToDelete: messagesDeleted, - previousSyncCursor: syncCursor, - nextSyncCursor, - }; + return [ + { + messageExternalIds: messagesAddedFiltered, + messageExternalIdsToDelete: messagesDeleted, + previousSyncCursor: messageChannel.syncCursor, + nextSyncCursor, + folderId: undefined, + }, + ]; + } + + private comptuteExcludedCategories( + messageFolders: Pick[], + ) { + const includedDefaultCategories = messageFolders + .map((messageFolder) => + mapGmailDefaultFolderToCategoryOrUndefined(messageFolder.name), + ) + .filter(isDefined); + + return MESSAGING_GMAIL_EXCLUDED_CATEGORIES.filter( + (excludedCategory) => + !includedDefaultCategories.includes(excludedCategory), + ); } private async getEmailIdsFromExcludedCategories( gmailClient: gmailV1.Gmail, lastSyncHistoryId: string, + messageFolders: Pick[], ): Promise { const emailIds: string[] = []; - for (const category of MESSAGING_GMAIL_EXCLUDED_CATEGORIES) { + const excludedCategories = this.comptuteExcludedCategories(messageFolders); + + for (const category of excludedCategories) { const { history } = await this.gmailGetHistoryService.getHistory( gmailClient, lastSyncHistoryId, diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-default-message-category.type.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-default-message-category.type.ts new file mode 100644 index 000000000..aaa90891b --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-default-message-category.type.ts @@ -0,0 +1,6 @@ +export enum GmailDefaultMessageCategory { + promotions = 'promotions', + social = 'social', + forums = 'forums', + updates = 'updates', +} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-default-message-folder.type.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-default-message-folder.type.ts new file mode 100644 index 000000000..6cd72fb55 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-default-message-folder.type.ts @@ -0,0 +1,6 @@ +export enum GmailDefaultMessageFolder { + CATEGORY_PROMOTIONS = 'CATEGORY_PROMOTIONS', + CATEGORY_SOCIAL = 'CATEGORY_SOCIAL', + CATEGORY_FORUMS = 'CATEGORY_FORUMS', + CATEGORY_UPDATES = 'CATEGORY_UPDATES', +} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/map-gmail-default-folder-to-category.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/map-gmail-default-folder-to-category.ts new file mode 100644 index 000000000..6f93cbb70 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/map-gmail-default-folder-to-category.ts @@ -0,0 +1,37 @@ +import { isDefined } from 'twenty-shared/utils'; + +import { GmailDefaultMessageCategory } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-default-message-category.type'; +import { GmailDefaultMessageFolder } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-default-message-folder.type'; + +const DEFAULT_FOLDER_NAME_TO_CATEGORY_MAPPING = [ + { + folderName: GmailDefaultMessageFolder.CATEGORY_FORUMS, + category: GmailDefaultMessageCategory.forums, + }, + { + folderName: GmailDefaultMessageFolder.CATEGORY_PROMOTIONS, + category: GmailDefaultMessageCategory.promotions, + }, + { + folderName: GmailDefaultMessageFolder.CATEGORY_SOCIAL, + category: GmailDefaultMessageCategory.social, + }, + { + folderName: GmailDefaultMessageFolder.CATEGORY_UPDATES, + category: GmailDefaultMessageCategory.updates, + }, +]; + +export const mapGmailDefaultFolderToCategoryOrUndefined = ( + messageFolderName: string, +) => { + const mapping = DEFAULT_FOLDER_NAME_TO_CATEGORY_MAPPING.find( + ({ folderName }) => folderName === messageFolderName, + ); + + if (!isDefined(mapping)) { + return undefined; + } + + return mapping.category; +}; diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/parse-gaxios-error.util.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/parse-gaxios-error.util.ts index 2050d65c7..9c5986d36 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/parse-gaxios-error.util.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/parse-gaxios-error.util.ts @@ -17,6 +17,7 @@ export const parseGaxiosError = ( case MessageNetworkExceptionCode.ECONNABORTED: case MessageNetworkExceptionCode.ETIMEDOUT: case MessageNetworkExceptionCode.ERR_NETWORK: + case MessageNetworkExceptionCode.EHOSTUNREACH: return new MessageImportDriverException( error.message, MessageImportDriverExceptionCode.TEMPORARY_ERROR, diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/imap/services/imap-get-message-list.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/imap/services/imap-get-message-list.service.ts index f49ead617..491a45ae0 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/imap/services/imap-get-message-list.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/imap/services/imap-get-message-list.service.ts @@ -2,11 +2,11 @@ import { Injectable, Logger } from '@nestjs/common'; import { ImapFlow } from 'imapflow'; -import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { ImapClientProvider } from 'src/modules/messaging/message-import-manager/drivers/imap/providers/imap-client.provider'; import { ImapHandleErrorService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-handle-error.service'; import { findSentMailbox } from 'src/modules/messaging/message-import-manager/drivers/imap/utils/find-sent-mailbox.util'; -import { GetFullMessageListResponse } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service'; +import { GetMessageListsArgs } from 'src/modules/messaging/message-import-manager/types/get-message-lists-args.type'; +import { GetMessageListsResponse } from 'src/modules/messaging/message-import-manager/types/get-message-lists-response.type'; @Injectable() export class ImapGetMessageListService { @@ -17,74 +17,10 @@ export class ImapGetMessageListService { private readonly imapHandleErrorService: ImapHandleErrorService, ) {} - async getFullMessageList( - connectedAccount: Pick< - ConnectedAccountWorkspaceEntity, - 'id' | 'provider' | 'connectionParameters' | 'handle' - >, - ): Promise { - try { - const client = await this.imapClientProvider.getClient(connectedAccount); - - const mailboxes = ['INBOX']; - - const sentFolder = await findSentMailbox(client, this.logger); - - if (sentFolder) { - mailboxes.push(sentFolder); - } - - let allMessages: { id: string; date: string }[] = []; - - for (const mailbox of mailboxes) { - try { - const messages = await this.getMessagesFromMailbox(client, mailbox); - - allMessages = [...allMessages, ...messages]; - this.logger.log( - `Fetched ${messages.length} messages from ${mailbox}`, - ); - } catch (error) { - this.logger.warn( - `Error fetching from mailbox ${mailbox}: ${error.message}. Continuing with other mailboxes.`, - ); - } - } - - allMessages.sort( - (a, b) => new Date(b.date).getTime() - new Date(a.date).getTime(), - ); - - const messageExternalIds = allMessages.map((message) => message.id); - - const nextSyncCursor = - allMessages.length > 0 ? allMessages[allMessages.length - 1].date : ''; - - return { - messageExternalIds, - nextSyncCursor, - }; - } catch (error) { - this.logger.error( - `Error getting message list: ${error.message}`, - error.stack, - ); - - this.imapHandleErrorService.handleImapMessageListFetchError(error); - - return { messageExternalIds: [], nextSyncCursor: '' }; - } finally { - await this.imapClientProvider.closeClient(connectedAccount.id); - } - } - - async getPartialMessageList( - connectedAccount: Pick< - ConnectedAccountWorkspaceEntity, - 'id' | 'provider' | 'connectionParameters' | 'handle' - >, - syncCursor?: string, - ): Promise<{ messageExternalIds: string[]; nextSyncCursor: string }> { + async getMessageLists({ + messageChannel, + connectedAccount, + }: GetMessageListsArgs): Promise { try { const client = await this.imapClientProvider.getClient(connectedAccount); @@ -103,7 +39,7 @@ export class ImapGetMessageListService { const messages = await this.getMessagesFromMailbox( client, mailbox, - syncCursor, + messageChannel.syncCursor, ); allMessages = [...allMessages, ...messages]; @@ -126,12 +62,17 @@ export class ImapGetMessageListService { const nextSyncCursor = allMessages.length > 0 ? allMessages[allMessages.length - 1].date - : syncCursor || ''; + : messageChannel.syncCursor || ''; - return { - messageExternalIds, - nextSyncCursor, - }; + return [ + { + messageExternalIds, + nextSyncCursor, + previousSyncCursor: messageChannel.syncCursor, + messageExternalIdsToDelete: [], + folderId: undefined, + }, + ]; } catch (error) { this.logger.error( `Error getting message list: ${error.message}`, @@ -140,7 +81,15 @@ export class ImapGetMessageListService { this.imapHandleErrorService.handleImapMessageListFetchError(error); - return { messageExternalIds: [], nextSyncCursor: syncCursor || '' }; + return [ + { + messageExternalIds: [], + nextSyncCursor: messageChannel.syncCursor || '', + previousSyncCursor: messageChannel.syncCursor, + messageExternalIdsToDelete: [], + folderId: undefined, + }, + ]; } finally { await this.imapClientProvider.closeClient(connectedAccount.id); } diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.dev.spec.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.dev.spec.ts index 836586d57..f5cf39555 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.dev.spec.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.dev.spec.ts @@ -5,6 +5,7 @@ import { ConnectedAccountProvider } from 'twenty-shared/types'; import { TwentyConfigModule } from 'src/engine/core-modules/twenty-config/twenty-config.module'; import { MicrosoftOAuth2ClientManagerService } from 'src/modules/connected-account/oauth2-client-manager/drivers/microsoft/microsoft-oauth2-client-manager.service'; +import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessageFolderWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-folder.workspace-entity'; import { microsoftGraphWithMessagesDeltaLink } from 'src/modules/messaging/message-import-manager/drivers/microsoft/mocks/microsoft-api-examples'; @@ -17,10 +18,23 @@ import { MicrosoftHandleErrorService } from './microsoft-handle-error.service'; // in case you have "Please provide a valid token" it may be because you need to pass the env varible to the .env.test file const refreshToken = 'replace-with-your-refresh-token'; const syncCursor = `replace-with-your-sync-cursor`; -const mockConnectedAccount = { +const mockConnectedAccount: Pick< + ConnectedAccountWorkspaceEntity, + 'provider' | 'refreshToken' | 'id' | 'handle' | 'connectionParameters' +> = { id: 'connected-account-id', provider: ConnectedAccountProvider.MICROSOFT, refreshToken: refreshToken, + handle: 'test@gmail.com', + connectionParameters: {}, +}; + +const mockMessageChannel: Pick< + MessageChannelWorkspaceEntity, + 'id' | 'syncCursor' +> = { + id: 'message-channel-id', + syncCursor: '', // Should be empty for Microsoft as cursors are stored at the folder level }; xdescribe('Microsoft dev tests : get message list service', () => { @@ -44,12 +58,19 @@ xdescribe('Microsoft dev tests : get message list service', () => { }); it('Should fetch and return message list successfully', async () => { - const result = await service.getFullMessageList( - mockConnectedAccount, - MessageFolderName.INBOX, - ); + const result = await service.getMessageLists({ + connectedAccount: mockConnectedAccount, + messageChannel: mockMessageChannel, + messageFolders: [ + { + id: 'inbox-folder-id', + name: MessageFolderName.INBOX, + syncCursor: 'inbox-sync-cursor', + }, + ], + }); - expect(result.messageExternalIds.length).toBeGreaterThan(0); + expect(result[0].messageExternalIds.length).toBeGreaterThan(0); }); it('Should throw token error', async () => { @@ -57,113 +78,65 @@ xdescribe('Microsoft dev tests : get message list service', () => { id: 'connected-account-id', provider: ConnectedAccountProvider.MICROSOFT, refreshToken: 'invalid-token', + handle: 'test@microsoft.com', + connectionParameters: {}, }; await expect( - service.getFullMessageList( - mockConnectedAccountUnvalid, - MessageFolderName.INBOX, - ), + service.getMessageLists({ + connectedAccount: mockConnectedAccountUnvalid, + messageChannel: mockMessageChannel, + messageFolders: [ + { + id: 'inbox-folder-id', + name: MessageFolderName.INBOX, + syncCursor: 'inbox-sync-cursor', + }, + ], + }), ).rejects.toThrowError('Access token is undefined or empty'); }); // if you need to run this test, you need to manually update the syncCursor to a valid one xit('Should fetch and return partial message list successfully', async () => { - const result = await service.getPartialMessageList( - mockConnectedAccount, - syncCursor, - ); + const result = await service.getMessageLists({ + connectedAccount: mockConnectedAccount, + messageChannel: mockMessageChannel, + messageFolders: [ + { + id: 'inbox-folder-id', + name: MessageFolderName.INBOX, + syncCursor: syncCursor, + }, + ], + }); - expect(result.nextSyncCursor).toBeTruthy(); + expect(result[0].nextSyncCursor).toBeTruthy(); }); it('Should fail partial message if syncCursor is invalid', async () => { await expect( - service.getPartialMessageList(mockConnectedAccount, 'invalid-syncCursor'), + service.getMessageLists({ + messageChannel: { + id: 'message-channel-id', + syncCursor: '', + }, + connectedAccount: mockConnectedAccount, + messageFolders: [ + { + id: 'inbox-folder-id', + name: MessageFolderName.INBOX, + syncCursor: 'invalid-syncCursor', + }, + ], + }), ).rejects.toThrowError( /Resource not found for the segment|Badly formed content/g, ); }); - - it('Should fail partial message if syncCursor is missing', async () => { - await expect( - service.getPartialMessageList(mockConnectedAccount, ''), - ).rejects.toThrowError(/Missing SyncCursor/g); - }); }); -xdescribe('Microsoft dev tests : get full message list service for folders', () => { - let service: MicrosoftGetMessageListService; - - const inboxFolder = new MessageFolderWorkspaceEntity(); - - inboxFolder.id = 'inbox-folder-id'; - inboxFolder.name = MessageFolderName.INBOX; - inboxFolder.syncCursor = 'inbox-sync-cursor'; - inboxFolder.messageChannelId = 'message-channel-1'; - - const sentFolder = new MessageFolderWorkspaceEntity(); - - sentFolder.id = 'sent-folder-id'; - sentFolder.name = MessageFolderName.SENT_ITEMS; - sentFolder.syncCursor = 'sent-sync-cursor'; - sentFolder.messageChannelId = 'message-channel-1'; - - const otherFolder = new MessageFolderWorkspaceEntity(); - - otherFolder.id = 'other-folder-id'; - otherFolder.name = 'other'; - otherFolder.syncCursor = 'other-sync-cursor'; - otherFolder.messageChannelId = 'message-channel-2'; - - beforeEach(async () => { - const module: TestingModule = await Test.createTestingModule({ - imports: [TwentyConfigModule.forRoot()], - providers: [ - MicrosoftGetMessageListService, - MicrosoftClientProvider, - MicrosoftHandleErrorService, - MicrosoftOAuth2ClientManagerService, - ConfigService, - ], - }).compile(); - - service = module.get( - MicrosoftGetMessageListService, - ); - }); - - it('Should return empty array', async () => { - const result = await service.getFullMessageListForFolders( - mockConnectedAccount, - [], - ); - - expect(result.length).toBe(0); - }); - - it('Should return an array of one item', async () => { - const result = await service.getFullMessageListForFolders( - mockConnectedAccount, - [inboxFolder], - ); - - expect(result.length).toBe(1); - expect(result[0].folderId).toBe(inboxFolder.id); - expect(result[0].messageExternalIds.length).toBeGreaterThan(0); - }); - - it('Should return an array of two items', async () => { - const result = await service.getFullMessageListForFolders( - mockConnectedAccount, - [inboxFolder, sentFolder], - ); - - expect(result.length).toBe(2); - }); -}); - -xdescribe('Microsoft dev tests : get partial message list service for folders', () => { +xdescribe('Microsoft dev tests : get message list service for folders', () => { let service: MicrosoftGetMessageListService; const inboxFolder = new MessageFolderWorkspaceEntity(); @@ -234,19 +207,21 @@ xdescribe('Microsoft dev tests : get partial message list service for folders', }); it('Should return empty array', async () => { - const result = await service.getPartialMessageListForFolders( - mockConnectedAccount, - messageChannelNoFolders, - ); + const result = await service.getMessageLists({ + messageChannel: messageChannelNoFolders, + connectedAccount: mockConnectedAccount, + messageFolders: [], + }); expect(result.length).toBe(0); }); it('Should return an array of one items', async () => { - const result = await service.getPartialMessageListForFolders( - mockConnectedAccount, - messageChannelMicrosoftOneFolder, - ); + const result = await service.getMessageLists({ + messageChannel: messageChannelMicrosoftOneFolder, + connectedAccount: mockConnectedAccount, + messageFolders: [inboxFolder], + }); expect(result.length).toBe(1); expect(result[0].folderId).toBe(inboxFolder.id); @@ -254,10 +229,11 @@ xdescribe('Microsoft dev tests : get partial message list service for folders', }); it('Should return an array of two items', async () => { - const result = await service.getPartialMessageListForFolders( - mockConnectedAccount, - messageChannelMicrosoft, - ); + const result = await service.getMessageLists({ + messageChannel: messageChannelMicrosoft, + connectedAccount: mockConnectedAccount, + messageFolders: [inboxFolder, sentFolder], + }); expect(result.length).toBe(2); }); diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.ts index 02888d833..44e22f8f5 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.ts @@ -5,11 +5,11 @@ import { PageIterator, PageIteratorCallback, } from '@microsoft/microsoft-graph-client'; +import { isNonEmptyString } from '@sniptt/guards'; import { v4 } from 'uuid'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; -import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessageFolderWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-folder.workspace-entity'; import { MessageImportDriverException, @@ -19,12 +19,12 @@ import { MicrosoftClientProvider } from 'src/modules/messaging/message-import-ma import { MicrosoftHandleErrorService } from 'src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-handle-error.service'; import { MessageFolderName } from 'src/modules/messaging/message-import-manager/drivers/microsoft/types/folders'; import { isAccessTokenRefreshingError } from 'src/modules/messaging/message-import-manager/drivers/microsoft/utils/is-access-token-refreshing-error.utils'; +import { GetMessageListsArgs } from 'src/modules/messaging/message-import-manager/types/get-message-lists-args.type'; import { - GetFullMessageListForFoldersResponse, - GetFullMessageListResponse, - GetPartialMessageListForFoldersResponse, - GetPartialMessageListResponse, -} from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service'; + GetMessageListsResponse, + GetOneMessageListResponse, +} from 'src/modules/messaging/message-import-manager/types/get-message-lists-response.type'; + // Microsoft API limit is 999 messages per request on this endpoint const MESSAGING_MICROSOFT_USERS_MESSAGES_LIST_MAX_RESULT = 999; @@ -36,101 +36,14 @@ export class MicrosoftGetMessageListService { private readonly twentyORMManager: TwentyORMManager, ) {} - public async getFullMessageListForFolders( - connectedAccount: Pick< - ConnectedAccountWorkspaceEntity, - 'refreshToken' | 'id' - >, - folders: Pick[], - ): Promise { - const result: GetFullMessageListForFoldersResponse[] = []; + public async getMessageLists({ + messageChannel, + connectedAccount, + messageFolders, + }: GetMessageListsArgs): Promise { + const result: GetMessageListsResponse = []; - for (const folder of folders) { - const response = await this.getFullMessageList( - connectedAccount, - folder.name as MessageFolderName, - ); - - result.push({ - ...response, - folderId: folder.id, - }); - } - - return result; - } - - public async getFullMessageList( - connectedAccount: Pick< - ConnectedAccountWorkspaceEntity, - 'refreshToken' | 'id' - >, - folderName: MessageFolderName, - ): Promise { - const messageExternalIds: string[] = []; - - const microsoftClient = - await this.microsoftClientProvider.getMicrosoftClient(connectedAccount); - - const response: PageCollection = await microsoftClient - .api(`/me/mailfolders/${folderName}/messages/delta?$select=id`) - .version('beta') - .headers({ - Prefer: `odata.maxpagesize=${MESSAGING_MICROSOFT_USERS_MESSAGES_LIST_MAX_RESULT}, IdType="ImmutableId"`, - }) - .get() - .catch((error) => { - if (isAccessTokenRefreshingError(error?.body)) { - throw new MessageImportDriverException( - error.message, - MessageImportDriverExceptionCode.CLIENT_NOT_AVAILABLE, - ); - } - this.microsoftHandleErrorService.handleMicrosoftGetMessageListError( - error, - ); - }); - - const callback: PageIteratorCallback = (data) => { - messageExternalIds.push(data.id); - - return true; - }; - - const pageIterator = new PageIterator(microsoftClient, response, callback, { - headers: { - Prefer: `odata.maxpagesize=${MESSAGING_MICROSOFT_USERS_MESSAGES_LIST_MAX_RESULT}, IdType="ImmutableId"`, - }, - }); - - await pageIterator.iterate().catch((error) => { - if (isAccessTokenRefreshingError(error?.body)) { - throw new MessageImportDriverException( - error.message, - MessageImportDriverExceptionCode.CLIENT_NOT_AVAILABLE, - ); - } - this.microsoftHandleErrorService.handleMicrosoftGetMessageListError( - error, - ); - }); - - return { - messageExternalIds: messageExternalIds, - nextSyncCursor: pageIterator.getDeltaLink() || '', - }; - } - - public async getPartialMessageListForFolders( - connectedAccount: Pick< - ConnectedAccountWorkspaceEntity, - 'provider' | 'refreshToken' | 'id' - >, - messageChannel: MessageChannelWorkspaceEntity, - ): Promise { - const result: GetPartialMessageListForFoldersResponse[] = []; - - if (messageChannel.messageFolders.length === 0) { + if (messageFolders.length === 0) { // permanent solution: // throw new MessageImportDriverException( // `Message channel ${messageChannel.id} has no message folders`, @@ -158,10 +71,10 @@ export class MicrosoftGetMessageListService { syncCursor: messageChannel.syncCursor, }); - const response = await this.getPartialMessageList( - connectedAccount, - messageChannel.syncCursor, - ); + const response = await this.getMessageList(connectedAccount, { + name: MessageFolderName.INBOX, + syncCursor: messageChannel.syncCursor, + }); result.push({ ...response, @@ -173,11 +86,8 @@ export class MicrosoftGetMessageListService { return result; } - for (const folder of messageChannel.messageFolders) { - const response = await this.getPartialMessageList( - connectedAccount, - folder.syncCursor, - ); + for (const folder of messageFolders) { + const response = await this.getMessageList(connectedAccount, folder); result.push({ ...response, @@ -188,29 +98,25 @@ export class MicrosoftGetMessageListService { return result; } - public async getPartialMessageList( + public async getMessageList( connectedAccount: Pick< ConnectedAccountWorkspaceEntity, 'provider' | 'refreshToken' | 'id' >, - syncCursor: string, - ): Promise { - // important: otherwise tries to get the full message list - if (!syncCursor) { - throw new MessageImportDriverException( - 'Missing SyncCursor', - MessageImportDriverExceptionCode.SYNC_CURSOR_ERROR, - ); - } - + messageFolder: Pick, + ): Promise { const messageExternalIds: string[] = []; const messageExternalIdsToDelete: string[] = []; const microsoftClient = await this.microsoftClientProvider.getMicrosoftClient(connectedAccount); + const apiUrl = isNonEmptyString(messageFolder.syncCursor) + ? messageFolder.syncCursor + : `/me/mailfolders/${messageFolder.name}/messages/delta?$select=id`; + const response: PageCollection = await microsoftClient - .api(syncCursor) + .api(apiUrl) .version('beta') .headers({ Prefer: `odata.maxpagesize=${MESSAGING_MICROSOFT_USERS_MESSAGES_LIST_MAX_RESULT}, IdType="ImmutableId"`, @@ -259,8 +165,9 @@ export class MicrosoftGetMessageListService { return { messageExternalIds, messageExternalIdsToDelete, - previousSyncCursor: syncCursor, + previousSyncCursor: messageFolder.syncCursor, nextSyncCursor: pageIterator.getDeltaLink() || '', + folderId: undefined, }; } } diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts index 77bc0e57c..6aa49d3c9 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts @@ -12,12 +12,11 @@ import { MessageChannelWorkspaceEntity, } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessagingAccountAuthenticationService } from 'src/modules/messaging/message-import-manager/services/messaging-account-authentication.service'; -import { MessagingFullMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service'; import { MessageImportExceptionHandlerService, MessageImportSyncStep, } from 'src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service'; -import { MessagingPartialMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service'; +import { MessagingMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-message-list-fetch.service'; import { MessagingMonitoringService } from 'src/modules/messaging/monitoring/services/messaging-monitoring.service'; export type MessagingMessageListFetchJobData = { @@ -31,8 +30,7 @@ export type MessagingMessageListFetchJobData = { }) export class MessagingMessageListFetchJob { constructor( - private readonly messagingFullMessageListFetchService: MessagingFullMessageListFetchService, - private readonly messagingPartialMessageListFetchService: MessagingPartialMessageListFetchService, + private readonly messagingMessageListFetchService: MessagingMessageListFetchService, private readonly messagingMonitoringService: MessagingMonitoringService, private readonly twentyORMManager: TwentyORMManager, private readonly connectedAccountRefreshTokensService: ConnectedAccountRefreshTokensService, @@ -88,29 +86,7 @@ export class MessagingMessageListFetchJob { ); switch (messageChannel.syncStage) { - case MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING: - await this.messagingMonitoringService.track({ - eventName: 'partial_message_list_fetch.started', - workspaceId, - connectedAccountId: messageChannel.connectedAccount.id, - messageChannelId: messageChannel.id, - }); - - await this.messagingPartialMessageListFetchService.processMessageListFetch( - messageChannel, - messageChannel.connectedAccount, - workspaceId, - ); - - await this.messagingMonitoringService.track({ - eventName: 'partial_message_list_fetch.completed', - workspaceId, - connectedAccountId: messageChannel.connectedAccount.id, - messageChannelId: messageChannel.id, - }); - - break; - + case MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING: // TODO: deprecate as we introduce MESSAGE_LIST_FETCH_PENDING case MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING: await this.messagingMonitoringService.track({ eventName: 'full_message_list_fetch.started', @@ -119,7 +95,7 @@ export class MessagingMessageListFetchJob { messageChannelId: messageChannel.id, }); - await this.messagingFullMessageListFetchService.processMessageListFetch( + await this.messagingMessageListFetchService.processMessageListFetch( messageChannel, workspaceId, ); @@ -139,7 +115,7 @@ export class MessagingMessageListFetchJob { } catch (error) { await this.messageImportErrorHandlerService.handleDriverException( error, - MessageImportSyncStep.FULL_OR_PARTIAL_MESSAGE_LIST_FETCH, + MessageImportSyncStep.MESSAGE_LIST_FETCH, messageChannel, workspaceId, ); diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts index f8ec658d3..cd8578333 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts @@ -61,7 +61,7 @@ export class MessagingOngoingStaleJob { switch (messageChannel.syncStage) { case MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING: - await this.messageChannelSyncStatusService.schedulePartialMessageListFetch( + await this.messageChannelSyncStatusService.scheduleMessageListFetch( [messageChannel.id], ); break; diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts index aaf8617fd..f0a0aa971 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts @@ -31,13 +31,12 @@ import { MessagingOngoingStaleJob } from 'src/modules/messaging/message-import-m import { MessagingMessageImportManagerMessageChannelListener } from 'src/modules/messaging/message-import-manager/listeners/messaging-import-manager-message-channel.listener'; import { MessagingAccountAuthenticationService } from 'src/modules/messaging/message-import-manager/services/messaging-account-authentication.service'; import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service'; -import { MessagingFullMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service'; import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service'; import { MessagingGetMessagesService } from 'src/modules/messaging/message-import-manager/services/messaging-get-messages.service'; import { MessageImportExceptionHandlerService } from 'src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service'; +import { MessagingMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-message-list-fetch.service'; import { MessagingMessageService } from 'src/modules/messaging/message-import-manager/services/messaging-message.service'; import { MessagingMessagesImportService } from 'src/modules/messaging/message-import-manager/services/messaging-messages-import.service'; -import { MessagingPartialMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service'; import { MessagingSaveMessagesAndEnqueueContactCreationService } from 'src/modules/messaging/message-import-manager/services/messaging-save-messages-and-enqueue-contact-creation.service'; import { MessagingSendMessageService } from 'src/modules/messaging/message-import-manager/services/messaging-send-message.service'; import { MessageParticipantManagerModule } from 'src/modules/messaging/message-participant-manager/message-participant-manager.module'; @@ -78,8 +77,7 @@ import { MessagingMonitoringModule } from 'src/modules/messaging/monitoring/mess MessagingMessageImportManagerMessageChannelListener, MessagingCleanCacheJob, MessagingMessageService, - MessagingPartialMessageListFetchService, - MessagingFullMessageListFetchService, + MessagingMessageListFetchService, MessagingMessagesImportService, MessagingSaveMessagesAndEnqueueContactCreationService, MessagingGetMessageListService, diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-get-message-list.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-get-message-list.service.ts index 44b86b03a..12b6716ed 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-get-message-list.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-get-message-list.service.ts @@ -2,9 +2,7 @@ import { Injectable } from '@nestjs/common'; import { ConnectedAccountProvider } from 'twenty-shared/types'; -import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; -import { MessageFolderWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-folder.workspace-entity'; import { GmailGetMessageListService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service'; import { ImapGetMessageListService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-get-message-list.service'; import { MicrosoftGetMessageListService } from 'src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service'; @@ -12,29 +10,7 @@ import { MessageImportException, MessageImportExceptionCode, } from 'src/modules/messaging/message-import-manager/exceptions/message-import.exception'; -import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service'; - -export type GetFullMessageListResponse = { - messageExternalIds: string[]; - nextSyncCursor: string; -}; - -export type GetFullMessageListForFoldersResponse = - GetFullMessageListResponse & { - folderId: string | undefined; - }; - -export type GetPartialMessageListResponse = { - messageExternalIds: string[]; - messageExternalIdsToDelete: string[]; - previousSyncCursor: string; - nextSyncCursor: string; -}; - -export type GetPartialMessageListForFoldersResponse = - GetPartialMessageListResponse & { - folderId: string | undefined; - }; +import { GetMessageListsResponse } from 'src/modules/messaging/message-import-manager/types/get-message-lists-response.type'; @Injectable() export class MessagingGetMessageListService { @@ -42,100 +18,30 @@ export class MessagingGetMessageListService { private readonly gmailGetMessageListService: GmailGetMessageListService, private readonly microsoftGetMessageListService: MicrosoftGetMessageListService, private readonly imapGetMessageListService: ImapGetMessageListService, - private readonly messagingCursorService: MessagingCursorService, - private readonly twentyORMManager: TwentyORMManager, ) {} - public async getFullMessageLists( + public async getMessageLists( messageChannel: MessageChannelWorkspaceEntity, - ): Promise { - switch (messageChannel.connectedAccount.provider) { - case ConnectedAccountProvider.GOOGLE: { - const fullMessageList = - await this.gmailGetMessageListService.getFullMessageList( - messageChannel.connectedAccount, - ); - - return [ - { - ...fullMessageList, - folderId: undefined, - }, - ]; - } - case ConnectedAccountProvider.MICROSOFT: { - const folderRepository = - await this.twentyORMManager.getRepository( - 'messageFolder', - ); - - const folders = await folderRepository.find({ - where: { - messageChannelId: messageChannel.id, - }, - }); - - return this.microsoftGetMessageListService.getFullMessageListForFolders( - messageChannel.connectedAccount, - folders, - ); - } - case ConnectedAccountProvider.IMAP_SMTP_CALDAV: { - const fullMessageList = - await this.imapGetMessageListService.getFullMessageList( - messageChannel.connectedAccount, - ); - - return [ - { - ...fullMessageList, - folderId: undefined, - }, - ]; - } - default: - throw new MessageImportException( - `Provider ${messageChannel.connectedAccount.provider} is not supported`, - MessageImportExceptionCode.PROVIDER_NOT_SUPPORTED, - ); - } - } - - public async getPartialMessageLists( - messageChannel: MessageChannelWorkspaceEntity, - ): Promise { + ): Promise { switch (messageChannel.connectedAccount.provider) { case ConnectedAccountProvider.GOOGLE: - return [ - { - ...(await this.gmailGetMessageListService.getPartialMessageList( - messageChannel.connectedAccount, - messageChannel.syncCursor, - )), - folderId: undefined, - }, - ]; - case ConnectedAccountProvider.MICROSOFT: - return this.microsoftGetMessageListService.getPartialMessageListForFolders( - messageChannel.connectedAccount, + return await this.gmailGetMessageListService.getMessageLists({ messageChannel, - ); + connectedAccount: messageChannel.connectedAccount, + messageFolders: messageChannel.messageFolders, + }); + case ConnectedAccountProvider.MICROSOFT: + return this.microsoftGetMessageListService.getMessageLists({ + messageChannel, + connectedAccount: messageChannel.connectedAccount, + messageFolders: messageChannel.messageFolders, + }); case ConnectedAccountProvider.IMAP_SMTP_CALDAV: { - const messageList = - await this.imapGetMessageListService.getPartialMessageList( - messageChannel.connectedAccount, - messageChannel.syncCursor, - ); - - return [ - { - messageExternalIds: messageList.messageExternalIds, - messageExternalIdsToDelete: [], - previousSyncCursor: messageChannel.syncCursor || '', - nextSyncCursor: messageList.nextSyncCursor || '', - folderId: undefined, - }, - ]; + return await this.imapGetMessageListService.getMessageLists({ + messageChannel, + connectedAccount: messageChannel.connectedAccount, + messageFolders: messageChannel.messageFolders, + }); } default: throw new MessageImportException( diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service.ts index 32b05fd4b..97ab4fb85 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service.ts @@ -21,9 +21,9 @@ import { } from 'src/modules/messaging/message-import-manager/exceptions/message-import.exception'; export enum MessageImportSyncStep { - FULL_MESSAGE_LIST_FETCH = 'FULL_MESSAGE_LIST_FETCH', - PARTIAL_MESSAGE_LIST_FETCH = 'PARTIAL_MESSAGE_LIST_FETCH', - FULL_OR_PARTIAL_MESSAGE_LIST_FETCH = 'FULL_OR_PARTIAL_MESSAGE_LIST_FETCH', + FULL_MESSAGE_LIST_FETCH = 'FULL_MESSAGE_LIST_FETCH', // TODO: deprecate to only use MESSAGE_LIST_FETCH + PARTIAL_MESSAGE_LIST_FETCH = 'PARTIAL_MESSAGE_LIST_FETCH', // TODO: deprecate to only use MESSAGE_LIST_FETCH + MESSAGE_LIST_FETCH = 'MESSAGE_LIST_FETCH', MESSAGES_IMPORT_PENDING = 'MESSAGES_IMPORT_PENDING', MESSAGES_IMPORT_ONGOING = 'MESSAGES_IMPORT_ONGOING', } @@ -145,16 +145,11 @@ export class MessageImportExceptionHandlerService { switch (syncStep) { case MessageImportSyncStep.FULL_MESSAGE_LIST_FETCH: - await this.messageChannelSyncStatusService.scheduleFullMessageListFetch( - [messageChannel.id], - ); + await this.messageChannelSyncStatusService.scheduleMessageListFetch([ + messageChannel.id, + ]); break; - case MessageImportSyncStep.PARTIAL_MESSAGE_LIST_FETCH: - await this.messageChannelSyncStatusService.schedulePartialMessageListFetch( - [messageChannel.id], - ); - break; case MessageImportSyncStep.MESSAGES_IMPORT_PENDING: case MessageImportSyncStep.MESSAGES_IMPORT_ONGOING: await this.messageChannelSyncStatusService.scheduleMessagesImport([ @@ -233,7 +228,7 @@ export class MessageImportExceptionHandlerService { return; } - await this.messageChannelSyncStatusService.resetAndScheduleFullMessageListFetch( + await this.messageChannelSyncStatusService.resetAndScheduleMessageListFetch( [messageChannel.id], workspaceId, ); diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.spec.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-message-list-fetch.service.spec.ts similarity index 87% rename from packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.spec.ts rename to packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-message-list-fetch.service.spec.ts index bedb6aab2..fd4e3b9ef 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.spec.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-message-list-fetch.service.spec.ts @@ -10,12 +10,12 @@ import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/stan import { MessageFolderWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-folder.workspace-entity'; import { MessagingMessageCleanerService } from 'src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service'; import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service'; -import { MessagingFullMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service'; import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service'; import { MessageImportExceptionHandlerService } from 'src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service'; +import { MessagingMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-message-list-fetch.service'; -describe('MessagingFullMessageListFetchService', () => { - let messagingFullMessageListFetchService: MessagingFullMessageListFetchService; +describe('MessagingMessageListFetchService', () => { + let messagingMessageListFetchService: MessagingMessageListFetchService; let messagingGetMessageListService: MessagingGetMessageListService; let messageChannelSyncStatusService: MessageChannelSyncStatusService; let twentyORMManager: TwentyORMManager; @@ -72,7 +72,7 @@ describe('MessagingFullMessageListFetchService', () => { const module: TestingModule = await Test.createTestingModule({ providers: [ - MessagingFullMessageListFetchService, + MessagingMessageListFetchService, { provide: CacheStorageNamespace.ModuleMessaging, useValue: { @@ -82,12 +82,11 @@ describe('MessagingFullMessageListFetchService', () => { { provide: MessagingGetMessageListService, useValue: { - getFullMessageLists: jest + getMessageLists: jest .fn() - .mockImplementation((messageChannel) => { + .mockImplementation(({ connectedAccount }) => { if ( - messageChannel.connectedAccount.provider === - ConnectedAccountProvider.GOOGLE + connectedAccount.provider === ConnectedAccountProvider.GOOGLE ) { return [ { @@ -162,9 +161,9 @@ describe('MessagingFullMessageListFetchService', () => { ], }).compile(); - messagingFullMessageListFetchService = - module.get( - MessagingFullMessageListFetchService, + messagingMessageListFetchService = + module.get( + MessagingMessageListFetchService, ); messagingGetMessageListService = module.get( MessagingGetMessageListService, @@ -180,7 +179,7 @@ describe('MessagingFullMessageListFetchService', () => { }); it('should process Microsoft message list fetch correctly', async () => { - await messagingFullMessageListFetchService.processMessageListFetch( + await messagingMessageListFetchService.processMessageListFetch( mockMicrosoftMessageChannel, workspaceId, ); @@ -189,9 +188,9 @@ describe('MessagingFullMessageListFetchService', () => { messageChannelSyncStatusService.markAsMessagesListFetchOngoing, ).toHaveBeenCalledWith([mockMicrosoftMessageChannel.id]); - expect( - messagingGetMessageListService.getFullMessageLists, - ).toHaveBeenCalledWith(mockMicrosoftMessageChannel); + expect(messagingGetMessageListService.getMessageLists).toHaveBeenCalledWith( + mockMicrosoftMessageChannel, + ); expect(twentyORMManager.getRepository).toHaveBeenCalledWith( 'messageChannelMessageAssociation', @@ -209,7 +208,7 @@ describe('MessagingFullMessageListFetchService', () => { }); it('should process Google message list fetch correctly', async () => { - await messagingFullMessageListFetchService.processMessageListFetch( + await messagingMessageListFetchService.processMessageListFetch( mockGoogleMessageChannel, workspaceId, ); @@ -218,9 +217,9 @@ describe('MessagingFullMessageListFetchService', () => { messageChannelSyncStatusService.markAsMessagesListFetchOngoing, ).toHaveBeenCalledWith([mockGoogleMessageChannel.id]); - expect( - messagingGetMessageListService.getFullMessageLists, - ).toHaveBeenCalledWith(mockGoogleMessageChannel); + expect(messagingGetMessageListService.getMessageLists).toHaveBeenCalledWith( + mockGoogleMessageChannel, + ); expect(twentyORMManager.getRepository).toHaveBeenCalledWith( 'messageChannelMessageAssociation', diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-message-list-fetch.service.ts similarity index 92% rename from packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts rename to packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-message-list-fetch.service.ts index 93e34bb66..7d52f69ec 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-message-list-fetch.service.ts @@ -17,7 +17,7 @@ import { MessageImportSyncStep, } from 'src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service'; @Injectable() -export class MessagingFullMessageListFetchService { +export class MessagingMessageListFetchService { constructor( @InjectCacheStorage(CacheStorageNamespace.ModuleMessaging) private readonly cacheStorage: CacheStorageService, @@ -38,17 +38,17 @@ export class MessagingFullMessageListFetchService { [messageChannel.id], ); - const fullMessageLists = - await this.messagingGetMessageListService.getFullMessageLists( + const messageLists = + await this.messagingGetMessageListService.getMessageLists( messageChannel, ); - const isEmptyMailbox = fullMessageLists.some( - (fullMessageList) => fullMessageList.messageExternalIds.length === 0, + const isEmptyMailbox = messageLists.some( + (messageList) => messageList.messageExternalIds.length === 0, ); if (isEmptyMailbox) { - await this.messageChannelSyncStatusService.resetAndScheduleFullMessageListFetch( + await this.messageChannelSyncStatusService.resetAndScheduleMessageListFetch( [messageChannel.id], workspaceId, ); @@ -56,9 +56,8 @@ export class MessagingFullMessageListFetchService { return; } - for (const fullMessageList of fullMessageLists) { - const { messageExternalIds, nextSyncCursor, folderId } = - fullMessageList; + for (const messageList of messageLists) { + const { messageExternalIds, nextSyncCursor, folderId } = messageList; const messageChannelMessageAssociationRepository = await this.twentyORMManager.getRepository( diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.spec.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.spec.ts index ec6484ddb..51fbca8c0 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.spec.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.spec.ts @@ -66,7 +66,7 @@ describe('MessagingMessagesImportService', () => { provide: MessageChannelSyncStatusService, useValue: { markAsMessagesImportOngoing: jest.fn().mockResolvedValue(undefined), - markAsCompletedAndSchedulePartialMessageListFetch: jest + markAsCompletedAndScheduleMessageListFetch: jest .fn() .mockResolvedValue(undefined), scheduleMessagesImport: jest.fn().mockResolvedValue(undefined), @@ -191,7 +191,7 @@ describe('MessagingMessagesImportService', () => { it('should fails if SyncStage is not MESSAGES_IMPORT_PENDING', async () => { mockMessageChannel.syncStage = - MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING; + MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING; expect( service.processMessageBatchImport( diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts index 9b1ccceff..58d7e0d0d 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts @@ -88,7 +88,7 @@ export class MessagingMessagesImportService { ); if (!messageIdsToFetch?.length) { - await this.messageChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch( + await this.messageChannelSyncStatusService.markAsCompletedAndScheduleMessageListFetch( [messageChannel.id], ); @@ -125,7 +125,7 @@ export class MessagingMessagesImportService { if ( messageIdsToFetch.length < MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE ) { - await this.messageChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch( + await this.messageChannelSyncStatusService.markAsCompletedAndScheduleMessageListFetch( [messageChannel.id], ); } else { diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts deleted file mode 100644 index 21ce030e3..000000000 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts +++ /dev/null @@ -1,144 +0,0 @@ -import { Injectable } from '@nestjs/common'; - -import { In } from 'typeorm'; - -import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decorators/cache-storage.decorator'; -import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service'; -import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum'; -import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; -import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; -import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service'; -import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; -import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; -import { MessagingMessageCleanerService } from 'src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service'; -import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service'; -import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service'; -import { - MessageImportExceptionHandlerService, - MessageImportSyncStep, -} from 'src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service'; - -@Injectable() -export class MessagingPartialMessageListFetchService { - constructor( - @InjectCacheStorage(CacheStorageNamespace.ModuleMessaging) - private readonly cacheStorage: CacheStorageService, - private readonly messagingGetMessageListService: MessagingGetMessageListService, - private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService, - private readonly twentyORMManager: TwentyORMManager, - private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService, - private readonly messagingMessageCleanerService: MessagingMessageCleanerService, - private readonly messagingCursorService: MessagingCursorService, - ) {} - - public async processMessageListFetch( - messageChannel: MessageChannelWorkspaceEntity, - connectedAccount: ConnectedAccountWorkspaceEntity, - workspaceId: string, - ): Promise { - try { - await this.messageChannelSyncStatusService.markAsMessagesListFetchOngoing( - [messageChannel.id], - ); - - const messageChannelRepository = - await this.twentyORMManager.getRepository( - 'messageChannel', - ); - - await messageChannelRepository.update( - { - id: messageChannel.id, - }, - { - throttleFailureCount: 0, - }, - ); - - const partialMessageLists = - await this.messagingGetMessageListService.getPartialMessageLists( - messageChannel, - ); - - for (const partialMessageList of partialMessageLists) { - const { - messageExternalIds, - messageExternalIdsToDelete, - previousSyncCursor, - nextSyncCursor, - folderId, - } = partialMessageList; - - const isPartialImportFinished = this.isPartialImportFinished( - previousSyncCursor, - nextSyncCursor, - ); - - if (isPartialImportFinished) { - continue; - } - - await this.cacheStorage.setAdd( - `messages-to-import:${workspaceId}:${messageChannel.id}`, - messageExternalIds, - ); - - const messageChannelMessageAssociationRepository = - await this.twentyORMManager.getRepository( - 'messageChannelMessageAssociation', - ); - - if (messageExternalIdsToDelete.length) { - await messageChannelMessageAssociationRepository.delete({ - messageChannelId: messageChannel.id, - messageExternalId: In(messageExternalIdsToDelete), - }); - - await this.messagingMessageCleanerService.cleanWorkspaceThreads( - workspaceId, - ); - } - - await this.messagingCursorService.updateCursor( - messageChannel, - nextSyncCursor, - folderId, - ); - } - - const isPartialImportFinishedForAllFolders = partialMessageLists.every( - (partialMessageList) => - this.isPartialImportFinished( - partialMessageList.previousSyncCursor, - partialMessageList.nextSyncCursor, - ), - ); - - if (isPartialImportFinishedForAllFolders) { - await this.messageChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch( - [messageChannel.id], - ); - - return; - } - - await this.messageChannelSyncStatusService.scheduleMessagesImport([ - messageChannel.id, - ]); - } catch (error) { - await this.messageImportErrorHandlerService.handleDriverException( - error, - MessageImportSyncStep.PARTIAL_MESSAGE_LIST_FETCH, - messageChannel, - workspaceId, - ); - } - } - - private isPartialImportFinished( - previousSyncCursor: string, - nextSyncCursor: string, - ): boolean { - return previousSyncCursor === nextSyncCursor; - } -} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/types/get-message-lists-args.type.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/types/get-message-lists-args.type.ts new file mode 100644 index 000000000..785f43728 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/types/get-message-lists-args.type.ts @@ -0,0 +1,15 @@ +import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; +import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { MessageFolderWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-folder.workspace-entity'; + +export type GetMessageListsArgs = { + messageChannel: Pick; + connectedAccount: Pick< + ConnectedAccountWorkspaceEntity, + 'provider' | 'refreshToken' | 'id' | 'handle' | 'connectionParameters' + >; + messageFolders: Pick< + MessageFolderWorkspaceEntity, + 'name' | 'syncCursor' | 'id' + >[]; +}; diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/types/get-message-lists-response.type.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/types/get-message-lists-response.type.ts new file mode 100644 index 000000000..ca97e4820 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/types/get-message-lists-response.type.ts @@ -0,0 +1,9 @@ +export type GetOneMessageListResponse = { + messageExternalIds: string[]; + messageExternalIdsToDelete: string[]; + previousSyncCursor: string; + nextSyncCursor: string; + folderId: string | undefined; +}; + +export type GetMessageListsResponse = Array;