From a4806b72c715d286fc4914a34c6fd110795f9ff5 Mon Sep 17 00:00:00 2001 From: Guillim Date: Tue, 11 Feb 2025 17:19:53 +0100 Subject: [PATCH] folders (#10081) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # Folders Adding the possibility to synchronize messages form more than one microsoft folder (think "inbox" or "sent items") It will keep the current way for gmail. - step 1 : implement a first version of full message & partial message ✅ - step 2 : implement retro-compatibility which includes the command to run the migration to backfill microsoft synccursor from messageChannelt o messageFolders --- .../auth/services/microsoft-apis.service.ts | 31 ++- .../gmail-get-message-list.service.ts | 1 + .../microsoft/mocks/microsoft-api-examples.ts | 13 +- ...osoft-get-message-list.service.dev.spec.ts | 201 +++++++++++++++++- .../microsoft-get-message-list.service.ts | 107 +++++++++- .../drivers/microsoft/types/folders.ts | 4 + .../exceptions/message-import.exception.ts | 1 + .../jobs/messaging-message-list-fetch.job.ts | 2 +- .../messaging-import-manager.module.ts | 3 +- .../services/messaging-cursor.service.ts | 103 +++++++++ ...ssaging-full-message-list-fetch.service.ts | 122 +++++------ .../messaging-get-message-list.service.ts | 94 +++++--- ...ging-partial-message-list-fetch.service.ts | 127 ++++++----- 13 files changed, 644 insertions(+), 165 deletions(-) create mode 100644 packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/types/folders.ts create mode 100644 packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-cursor.service.ts diff --git a/packages/twenty-server/src/engine/core-modules/auth/services/microsoft-apis.service.ts b/packages/twenty-server/src/engine/core-modules/auth/services/microsoft-apis.service.ts index 44c6d1227..c95dfe2e8 100644 --- a/packages/twenty-server/src/engine/core-modules/auth/services/microsoft-apis.service.ts +++ b/packages/twenty-server/src/engine/core-modules/auth/services/microsoft-apis.service.ts @@ -31,12 +31,13 @@ import { MessageChannelVisibility, MessageChannelWorkspaceEntity, } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { MessageFolderWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-folder.workspace-entity'; +import { MessageFolderName } from 'src/modules/messaging/message-import-manager/drivers/microsoft/types/folders'; import { MessagingMessageListFetchJob, MessagingMessageListFetchJobData, } from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job'; import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity'; - @Injectable() export class MicrosoftAPIsService { constructor( @@ -94,6 +95,12 @@ export class MicrosoftAPIsService { 'messageChannel', ); + const messageFolderRepository = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + workspaceId, + 'messageFolder', + ); + const workspaceDataSource = await this.twentyORMGlobalManager.getDataSourceForWorkspace(workspaceId); @@ -149,6 +156,28 @@ export class MicrosoftAPIsService { manager, ); + await messageFolderRepository.save( + { + id: v4(), + messageChannelId: newMessageChannel.id, + name: MessageFolderName.INBOX, + syncCursor: '', + }, + {}, + manager, + ); + + await messageFolderRepository.save( + { + id: v4(), + messageChannelId: newMessageChannel.id, + name: MessageFolderName.SENT_ITEMS, + syncCursor: '', + }, + {}, + manager, + ); + const messageChannelMetadata = await this.objectMetadataRepository.findOneOrFail({ where: { nameSingular: 'messageChannel', workspaceId }, diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service.ts index afb5ecb22..73cf16736 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service.ts @@ -146,6 +146,7 @@ export class GmailGetMessageListService { return { messageExternalIds: messagesAddedFiltered, messageExternalIdsToDelete: messagesDeleted, + previousSyncCursor: syncCursor, nextSyncCursor, }; } diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/mocks/microsoft-api-examples.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/mocks/microsoft-api-examples.ts index 0e08ccb67..d7ad24225 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/mocks/microsoft-api-examples.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/mocks/microsoft-api-examples.ts @@ -1,15 +1,20 @@ -export const microsoftGraphWithMessages = { +export const microsoftGraphWithMessagesDeltaLink = { '@odata.context': 'https://graph.microsoft.com/beta/$metadata#Collection(message)', value: [ { '@odata.type': '#microsoft.graph.message', '@odata.etag': 'W/"CQAAABYAAAAadQ+1xAL8SLCZzf1KYyk+AAACItFa"', - id: 'AAMkAGZlMDQ1NjU5LTUzN2UtNDAyMC1hNmVlLTZhZmExMGU3ZDU1NwBGAAAAAADzAhgkpMbwQYnkXH1D-Va3BwAadQ_1xAL8SLCZzf1KYyk_AAAAAAEMAAAadQ_1xAL8SLCZzf1KYyk_AAACJSmSAAA=', + id: 'AAkALgAAAAAAHYQDEapmEc2byACqAC-EWg0AGnUPtcQC-Eiwmc39SmMpPgAAEksJ3gAA', + }, + { + '@odata.type': '#microsoft.graph.message', + '@odata.etag': 'W/"CQAAABYAAAAadQ+1xAL8SLCZzf1KYyk+AAANikYP', + id: 'AAkALgAAAAAAHYQDEapmEc2byACqAC-EWg0AGnUPtcQC-Eiwmc39SmMpPgAADZJ8HwAA', }, ], - '@odata.nextLink': - "https://graph.microsoft.com/beta/me/mailFolders('inbox')/messages/delta?$skiptoken=jWnSM_TVmEdmKBzfVjDdNbDwpt3yYSUqEf9CFdhRcTxhbogC9oaTvY1ZdONMplHuz0pwtPay_qkEcFQ5RLEuDZ3O6IgnI5FXRcfekzOECWlL7zRVdGBidZ5TkXmXV7O7P8cxtvBMFJ2_dV951teFMatpdnD6hvksBK0Ff4tJKfo.HvZwAw_DM9PR3xf90ThtbqSdMCkGCHNPkjpaedxSBN3", + '@odata.deltaLink': + "https://graph.microsoft.com/beta/me/mailFolders('inbox')/messages/delta?$skiptoken=jWnSM_TVmEdmKBzfVjDdNbDwpt3yYSUqEf9CFdhRcTxhbogC9oaTvY1ZdONMplHuz0pwtPay_qkEcFQ5RLEuDZ3O6IgnI5FXRcfekzOECWlL7zRVdGBidZ5TkXmXV7O7P8cxtvBMFJ2_dV951teFMatpdnD6hvksBK0Ff4tJKfo.HvZwAw_DM9PR3xf90ThtbqSdMCkGCHNPkjpaedxSBN4", }; export const microsoftGraphBatchWithTwoMessagesResponse = [ diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.dev.spec.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.dev.spec.ts index 29b38ada6..e78ffb518 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.dev.spec.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.dev.spec.ts @@ -5,13 +5,22 @@ import { ConnectedAccountProvider } from 'twenty-shared'; import { EnvironmentModule } from 'src/engine/core-modules/environment/environment.module'; import { MicrosoftOAuth2ClientManagerService } from 'src/modules/connected-account/oauth2-client-manager/drivers/microsoft/microsoft-oauth2-client-manager.service'; +import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { MessageFolderWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-folder.workspace-entity'; +import { microsoftGraphWithMessagesDeltaLink } from 'src/modules/messaging/message-import-manager/drivers/microsoft/mocks/microsoft-api-examples'; import { MicrosoftClientProvider } from 'src/modules/messaging/message-import-manager/drivers/microsoft/providers/microsoft-client.provider'; +import { MessageFolderName } from 'src/modules/messaging/message-import-manager/drivers/microsoft/types/folders'; import { MicrosoftGetMessageListService } from './microsoft-get-message-list.service'; import { MicrosoftHandleErrorService } from './microsoft-handle-error.service'; - +// in case you have "Please provide a valid token" it may be because you need to pass the env varible to the .env.test file const refreshToken = 'replace-with-your-refresh-token'; const syncCursor = `replace-with-your-sync-cursor`; +const mockConnectedAccount = { + id: 'connected-account-id', + provider: ConnectedAccountProvider.MICROSOFT, + refreshToken: refreshToken, +}; xdescribe('Microsoft dev tests : get message list service', () => { let service: MicrosoftGetMessageListService; @@ -33,14 +42,11 @@ xdescribe('Microsoft dev tests : get message list service', () => { ); }); - const mockConnectedAccount = { - id: 'connected-account-id', - provider: ConnectedAccountProvider.MICROSOFT, - refreshToken: refreshToken, - }; - it('Should fetch and return message list successfully', async () => { - const result = await service.getFullMessageList(mockConnectedAccount); + const result = await service.getFullMessageList( + mockConnectedAccount, + MessageFolderName.INBOX, + ); expect(result.messageExternalIds.length).toBeGreaterThan(0); }); @@ -53,11 +59,15 @@ xdescribe('Microsoft dev tests : get message list service', () => { }; await expect( - service.getFullMessageList(mockConnectedAccountUnvalid), + service.getFullMessageList( + mockConnectedAccountUnvalid, + MessageFolderName.INBOX, + ), ).rejects.toThrowError('Access token is undefined or empty'); }); - it('Should fetch and return partial message list successfully', async () => { + // if you need to run this test, you need to manually update the syncCursor to a valid one + xit('Should fetch and return partial message list successfully', async () => { const result = await service.getPartialMessageList( mockConnectedAccount, syncCursor, @@ -80,3 +90,174 @@ xdescribe('Microsoft dev tests : get message list service', () => { ).rejects.toThrowError(/Missing SyncCursor/g); }); }); + +xdescribe('Microsoft dev tests : get full message list service for folders', () => { + let service: MicrosoftGetMessageListService; + + const inboxFolder = new MessageFolderWorkspaceEntity(); + + inboxFolder.id = 'inbox-folder-id'; + inboxFolder.name = MessageFolderName.INBOX; + inboxFolder.syncCursor = 'inbox-sync-cursor'; + inboxFolder.messageChannelId = 'message-channel-1'; + + const sentFolder = new MessageFolderWorkspaceEntity(); + + sentFolder.id = 'sent-folder-id'; + sentFolder.name = MessageFolderName.SENT_ITEMS; + sentFolder.syncCursor = 'sent-sync-cursor'; + sentFolder.messageChannelId = 'message-channel-1'; + + const otherFolder = new MessageFolderWorkspaceEntity(); + + otherFolder.id = 'other-folder-id'; + otherFolder.name = 'other'; + otherFolder.syncCursor = 'other-sync-cursor'; + otherFolder.messageChannelId = 'message-channel-2'; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + imports: [EnvironmentModule.forRoot({})], + providers: [ + MicrosoftGetMessageListService, + MicrosoftClientProvider, + MicrosoftHandleErrorService, + MicrosoftOAuth2ClientManagerService, + ConfigService, + ], + }).compile(); + + service = module.get( + MicrosoftGetMessageListService, + ); + }); + + it('Should return empty array', async () => { + const result = await service.getFullMessageListForFolders( + mockConnectedAccount, + [], + ); + + expect(result.length).toBe(0); + }); + + it('Should return an array of one item', async () => { + const result = await service.getFullMessageListForFolders( + mockConnectedAccount, + [inboxFolder], + ); + + expect(result.length).toBe(1); + expect(result[0].folderId).toBe(inboxFolder.id); + expect(result[0].messageExternalIds.length).toBeGreaterThan(0); + }); + + it('Should return an array of two items', async () => { + const result = await service.getFullMessageListForFolders( + mockConnectedAccount, + [inboxFolder, sentFolder], + ); + + expect(result.length).toBe(2); + }); +}); + +xdescribe('Microsoft dev tests : get partial message list service for folders', () => { + let service: MicrosoftGetMessageListService; + + const inboxFolder = new MessageFolderWorkspaceEntity(); + + inboxFolder.id = 'inbox-folder-id'; + inboxFolder.name = MessageFolderName.INBOX; + inboxFolder.syncCursor = 'inbox-sync-cursor'; + inboxFolder.messageChannelId = 'message-channel-1'; + + const sentFolder = new MessageFolderWorkspaceEntity(); + + sentFolder.id = 'sent-folder-id'; + sentFolder.name = MessageFolderName.SENT_ITEMS; + sentFolder.syncCursor = 'sent-sync-cursor'; + sentFolder.messageChannelId = 'message-channel-1'; + + const otherFolder = new MessageFolderWorkspaceEntity(); + + otherFolder.id = 'other-folder-id'; + otherFolder.name = 'other'; + otherFolder.syncCursor = 'other-sync-cursor'; + otherFolder.messageChannelId = 'message-channel-2'; + + const messageChannelNoFolders = new MessageChannelWorkspaceEntity(); + + messageChannelNoFolders.id = 'message-channel-0'; + messageChannelNoFolders.messageFolders = []; + messageChannelNoFolders.syncCursor = ''; + + const messageChannelMicrosoftOneFolder = new MessageChannelWorkspaceEntity(); + + messageChannelMicrosoftOneFolder.id = 'message-channel-1'; + messageChannelMicrosoftOneFolder.messageFolders = [inboxFolder]; + messageChannelMicrosoftOneFolder.syncCursor = ''; + + const messageChannelMicrosoft = new MessageChannelWorkspaceEntity(); + + messageChannelMicrosoft.id = 'message-channel-2'; + messageChannelMicrosoft.messageFolders = [inboxFolder, sentFolder]; + messageChannelMicrosoft.syncCursor = ''; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + imports: [EnvironmentModule.forRoot({})], + providers: [ + MicrosoftGetMessageListService, + MicrosoftClientProvider, + MicrosoftHandleErrorService, + MicrosoftOAuth2ClientManagerService, + ConfigService, + ], + }).compile(); + + service = module.get( + MicrosoftGetMessageListService, + ); + + const mockMicrosoftClient = { + api: jest.fn().mockReturnThis(), + version: jest.fn().mockReturnThis(), + headers: jest.fn().mockReturnThis(), + get: jest.fn().mockResolvedValue(microsoftGraphWithMessagesDeltaLink), + }; + + jest + .spyOn(MicrosoftClientProvider.prototype, 'getMicrosoftClient') + .mockResolvedValue(mockMicrosoftClient as any); + }); + + it('Should return empty array', async () => { + const result = await service.getPartialMessageListForFolders( + mockConnectedAccount, + messageChannelNoFolders, + ); + + expect(result.length).toBe(0); + }); + + it('Should return an array of one items', async () => { + const result = await service.getPartialMessageListForFolders( + mockConnectedAccount, + messageChannelMicrosoftOneFolder, + ); + + expect(result.length).toBe(1); + expect(result[0].folderId).toBe(inboxFolder.id); + expect(result[0].messageExternalIds.length).toBeGreaterThan(0); + }); + + it('Should return an array of two items', async () => { + const result = await service.getPartialMessageListForFolders( + mockConnectedAccount, + messageChannelMicrosoft, + ); + + expect(result.length).toBe(2); + }); +}); diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.ts index 335665346..4d10286a0 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service.ts @@ -5,19 +5,25 @@ import { PageIterator, PageIteratorCallback, } from '@microsoft/microsoft-graph-client'; +import { v4 } from 'uuid'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; +import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { MessageFolderWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-folder.workspace-entity'; import { MessageImportDriverException, MessageImportDriverExceptionCode, } from 'src/modules/messaging/message-import-manager/drivers/exceptions/message-import-driver.exception'; import { MicrosoftClientProvider } from 'src/modules/messaging/message-import-manager/drivers/microsoft/providers/microsoft-client.provider'; import { MicrosoftHandleErrorService } from 'src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-handle-error.service'; +import { MessageFolderName } from 'src/modules/messaging/message-import-manager/drivers/microsoft/types/folders'; import { + GetFullMessageListForFoldersResponse, GetFullMessageListResponse, + GetPartialMessageListForFoldersResponse, GetPartialMessageListResponse, } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service'; - // Microsoft API limit is 999 messages per request on this endpoint const MESSAGING_MICROSOFT_USERS_MESSAGES_LIST_MAX_RESULT = 999; @@ -26,14 +32,39 @@ export class MicrosoftGetMessageListService { constructor( private readonly microsoftClientProvider: MicrosoftClientProvider, private readonly microsoftHandleErrorService: MicrosoftHandleErrorService, + private readonly twentyORMManager: TwentyORMManager, ) {} + public async getFullMessageListForFolders( + connectedAccount: Pick< + ConnectedAccountWorkspaceEntity, + 'refreshToken' | 'id' + >, + folders: Pick[], + ): Promise { + const result: GetFullMessageListForFoldersResponse[] = []; + + for (const folder of folders) { + const response = await this.getFullMessageList( + connectedAccount, + folder.name as MessageFolderName, + ); + + result.push({ + ...response, + folderId: folder.id, + }); + } + + return result; + } + public async getFullMessageList( connectedAccount: Pick< ConnectedAccountWorkspaceEntity, - 'provider' | 'refreshToken' | 'id' + 'refreshToken' | 'id' >, - syncCursor?: string, + folderName: MessageFolderName, ): Promise { const messageExternalIds: string[] = []; @@ -41,7 +72,7 @@ export class MicrosoftGetMessageListService { await this.microsoftClientProvider.getMicrosoftClient(connectedAccount); const response: PageCollection = await microsoftClient - .api(syncCursor || '/me/mailfolders/inbox/messages/delta?$select=id') + .api(`/me/mailfolders/${folderName}/messages/delta?$select=id`) .version('beta') .headers({ Prefer: `odata.maxpagesize=${MESSAGING_MICROSOFT_USERS_MESSAGES_LIST_MAX_RESULT}, IdType="ImmutableId"`, @@ -66,6 +97,73 @@ export class MicrosoftGetMessageListService { }; } + public async getPartialMessageListForFolders( + connectedAccount: Pick< + ConnectedAccountWorkspaceEntity, + 'provider' | 'refreshToken' | 'id' + >, + messageChannel: MessageChannelWorkspaceEntity, + ): Promise { + const result: GetPartialMessageListForFoldersResponse[] = []; + + if (messageChannel.messageFolders.length === 0) { + // permanent solution: + // throw new MessageImportDriverException( + // `Message channel ${messageChannel.id} has no message folders`, + // MessageImportDriverExceptionCode.NOT_FOUND, + // ); + + // temporary solution: TODO: remove this once we have a permanent solution + // if no folders exist, most probably a first time sync for microsoft + // so we create the folders INBOX and SENTITEMS + // and fill the INBOX with the previous sync cursor + // and for sentitms, we do the full message list fetch + // console.warn( + // `Message channel ${messageChannel.id} has no message folders, most probably a first time`, + // ); + + const messageFolderRepository = + await this.twentyORMManager.getRepository( + 'messageFolder', + ); + + const newFolder = await messageFolderRepository.save({ + id: v4(), + messageChannelId: messageChannel.id, + name: MessageFolderName.INBOX, + syncCursor: messageChannel.syncCursor, + }); + + const response = await this.getPartialMessageList( + connectedAccount, + messageChannel.syncCursor, + ); + + result.push({ + ...response, + folderId: newFolder.id, + }); + + // we are ok with not synchronizing the legacy connected microsoft accounts. + // so we return an empty array. + return result; + } + + for (const folder of messageChannel.messageFolders) { + const response = await this.getPartialMessageList( + connectedAccount, + folder.syncCursor, + ); + + result.push({ + ...response, + folderId: folder.id, + }); + } + + return result; + } + public async getPartialMessageList( connectedAccount: Pick< ConnectedAccountWorkspaceEntity, @@ -114,6 +212,7 @@ export class MicrosoftGetMessageListService { return { messageExternalIds, messageExternalIdsToDelete, + previousSyncCursor: syncCursor, nextSyncCursor: pageIterator.getDeltaLink() || '', }; } diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/types/folders.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/types/folders.ts new file mode 100644 index 000000000..aeb45fc87 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/microsoft/types/folders.ts @@ -0,0 +1,4 @@ +export enum MessageFolderName { + INBOX = 'inbox', + SENT_ITEMS = 'sentItems', +} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/exceptions/message-import.exception.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/exceptions/message-import.exception.ts index 1363ddca7..f4701f536 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/exceptions/message-import.exception.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/exceptions/message-import.exception.ts @@ -11,4 +11,5 @@ export enum MessageImportExceptionCode { UNKNOWN = 'UNKNOWN', PROVIDER_NOT_SUPPORTED = 'PROVIDER_NOT_SUPPORTED', MESSAGE_CHANNEL_NOT_FOUND = 'MESSAGE_CHANNEL_NOT_FOUND', + FOLDER_ID_REQUIRED = 'FOLDER_ID_REQUIRED', } diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts index c7ff4bced..1a8aaf4e6 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts @@ -51,7 +51,7 @@ export class MessagingMessageListFetchJob { where: { id: messageChannelId, }, - relations: ['connectedAccount'], + relations: ['connectedAccount', 'messageFolders'], }); if (!messageChannel) { diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts index d5320042f..6e9ee2ee2 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts @@ -26,6 +26,7 @@ import { MessagingMessagesImportJob } from 'src/modules/messaging/message-import import { MessagingOngoingStaleJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job'; import { MessagingMessageImportManagerMessageChannelListener } from 'src/modules/messaging/message-import-manager/listeners/messaging-import-manager-message-channel.listener'; import { MessageImportExceptionHandlerService } from 'src/modules/messaging/message-import-manager/services/message-import-exception-handler.service'; +import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service'; import { MessagingFullMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service'; import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service'; import { MessagingGetMessagesService } from 'src/modules/messaging/message-import-manager/services/messaging-get-messages.service'; @@ -35,7 +36,6 @@ import { MessagingPartialMessageListFetchService } from 'src/modules/messaging/m import { MessagingSaveMessagesAndEnqueueContactCreationService } from 'src/modules/messaging/message-import-manager/services/messaging-save-messages-and-enqueue-contact-creation.service'; import { MessageParticipantManagerModule } from 'src/modules/messaging/message-participant-manager/message-participant-manager.module'; import { MessagingMonitoringModule } from 'src/modules/messaging/monitoring/messaging-monitoring.module'; - @Module({ imports: [ RefreshAccessTokenManagerModule, @@ -74,6 +74,7 @@ import { MessagingMonitoringModule } from 'src/modules/messaging/monitoring/mess MessagingGetMessageListService, MessagingGetMessagesService, MessageImportExceptionHandlerService, + MessagingCursorService, ], exports: [], }) diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-cursor.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-cursor.service.ts new file mode 100644 index 000000000..300a7ec45 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-cursor.service.ts @@ -0,0 +1,103 @@ +import { Injectable } from '@nestjs/common'; + +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; +import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { MessageFolderWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-folder.workspace-entity'; +import { + MessageImportException, + MessageImportExceptionCode, +} from 'src/modules/messaging/message-import-manager/exceptions/message-import.exception'; + +@Injectable() +export class MessagingCursorService { + constructor(private readonly twentyORMManager: TwentyORMManager) {} + + public async getCursor( + messageChannel: MessageChannelWorkspaceEntity, + connectedAccount: ConnectedAccountWorkspaceEntity, + folderId?: string, + ): Promise { + const folderRepository = + await this.twentyORMManager.getRepository( + 'messageFolder', + ); + + switch (connectedAccount.provider) { + case 'google': + return messageChannel.syncCursor; + case 'microsoft': { + const folder = await folderRepository.findOne({ + where: { + id: folderId, + }, + }); + + if (!folder) { + throw new MessageImportException( + `Folder is required to get cursor for ${connectedAccount.provider}`, + MessageImportExceptionCode.FOLDER_ID_REQUIRED, + ); + } + + return folder.syncCursor; + } + + default: + throw new MessageImportException( + `Update Cursor for provider ${connectedAccount.provider} not implemented`, + MessageImportExceptionCode.PROVIDER_NOT_SUPPORTED, + ); + } + } + + public async updateCursor( + messageChannel: MessageChannelWorkspaceEntity, + nextSyncCursor: string, + folderId?: string, + ) { + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); + const folderRepository = + await this.twentyORMManager.getRepository( + 'messageFolder', + ); + + if (!folderId) { + await messageChannelRepository.update( + { + id: messageChannel.id, + }, + { + throttleFailureCount: 0, + syncStageStartedAt: null, + syncCursor: + !messageChannel.syncCursor || + nextSyncCursor > messageChannel.syncCursor + ? nextSyncCursor + : messageChannel.syncCursor, + }, + ); + } else { + await folderRepository.update( + { + id: folderId, + }, + { + syncCursor: nextSyncCursor, + }, + ); + await messageChannelRepository.update( + { + id: messageChannel.id, + }, + { + throttleFailureCount: 0, + syncStageStartedAt: null, + }, + ); + } + } +} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts index 8b9072bbe..7c343078e 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts @@ -15,8 +15,8 @@ import { MessageImportExceptionHandlerService, MessageImportSyncStep, } from 'src/modules/messaging/message-import-manager/services/message-import-exception-handler.service'; +import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service'; import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service'; - @Injectable() export class MessagingFullMessageListFetchService { constructor( @@ -27,6 +27,7 @@ export class MessagingFullMessageListFetchService { private readonly messagingGetMessageListService: MessagingGetMessageListService, private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService, private readonly messagingMessageCleanerService: MessagingMessageCleanerService, + private readonly messagingCursorService: MessagingCursorService, ) {} public async processMessageListFetch( @@ -39,81 +40,72 @@ export class MessagingFullMessageListFetchService { [messageChannel.id], ); - const { messageExternalIds, nextSyncCursor } = - await this.messagingGetMessageListService.getFullMessageList( - connectedAccount, + const fullMessageLists = + await this.messagingGetMessageListService.getFullMessageLists( + messageChannel, ); - const messageChannelMessageAssociationRepository = - await this.twentyORMManager.getRepository( - 'messageChannelMessageAssociation', + for (const fullMessageList of fullMessageLists) { + const { messageExternalIds, nextSyncCursor, folderId } = + fullMessageList; + + const messageChannelMessageAssociationRepository = + await this.twentyORMManager.getRepository( + 'messageChannelMessageAssociation', + ); + + const existingMessageChannelMessageAssociations = + await messageChannelMessageAssociationRepository.find({ + where: { + messageChannelId: messageChannel.id, + }, + }); + + const existingMessageChannelMessageAssociationsExternalIds = + existingMessageChannelMessageAssociations.map( + (messageChannelMessageAssociation) => + messageChannelMessageAssociation.messageExternalId, + ); + + const messageExternalIdsToImport = messageExternalIds.filter( + (messageExternalId) => + !existingMessageChannelMessageAssociationsExternalIds.includes( + messageExternalId, + ), ); - const existingMessageChannelMessageAssociations = - await messageChannelMessageAssociationRepository.find({ - where: { + const messageExternalIdsToDelete = + existingMessageChannelMessageAssociationsExternalIds.filter( + (existingMessageCMAExternalId) => + existingMessageCMAExternalId && + !messageExternalIds.includes(existingMessageCMAExternalId), + ); + + if (messageExternalIdsToDelete.length) { + await messageChannelMessageAssociationRepository.delete({ messageChannelId: messageChannel.id, - }, - }); + messageExternalId: In(messageExternalIdsToDelete), + }); - const existingMessageChannelMessageAssociationsExternalIds = - existingMessageChannelMessageAssociations.map( - (messageChannelMessageAssociation) => - messageChannelMessageAssociation.messageExternalId, - ); + await this.messagingMessageCleanerService.cleanWorkspaceThreads( + workspaceId, + ); + } - const messageExternalIdsToImport = messageExternalIds.filter( - (messageExternalId) => - !existingMessageChannelMessageAssociationsExternalIds.includes( - messageExternalId, - ), - ); + if (messageExternalIdsToImport.length) { + await this.cacheStorage.setAdd( + `messages-to-import:${workspaceId}:${messageChannel.id}`, + messageExternalIdsToImport, + ); + } - const messageExternalIdsToDelete = - existingMessageChannelMessageAssociationsExternalIds.filter( - (existingMessageCMAExternalId) => - existingMessageCMAExternalId && - !messageExternalIds.includes(existingMessageCMAExternalId), - ); - - if (messageExternalIdsToDelete.length) { - await messageChannelMessageAssociationRepository.delete({ - messageChannelId: messageChannel.id, - messageExternalId: In(messageExternalIdsToDelete), - }); - - await this.messagingMessageCleanerService.cleanWorkspaceThreads( - workspaceId, + await this.messagingCursorService.updateCursor( + messageChannel, + nextSyncCursor, + folderId, ); } - if (messageExternalIdsToImport.length) { - await this.cacheStorage.setAdd( - `messages-to-import:${workspaceId}:${messageChannel.id}`, - messageExternalIdsToImport, - ); - } - - const messageChannelRepository = - await this.twentyORMManager.getRepository( - 'messageChannel', - ); - - await messageChannelRepository.update( - { - id: messageChannel.id, - }, - { - throttleFailureCount: 0, - syncStageStartedAt: null, - syncCursor: - !messageChannel.syncCursor || - nextSyncCursor > messageChannel.syncCursor - ? nextSyncCursor - : messageChannel.syncCursor, - }, - ); - await this.messageChannelSyncStatusService.scheduleMessagesImport([ messageChannel.id, ]); diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-get-message-list.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-get-message-list.service.ts index a0308572f..2e5cab1d3 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-get-message-list.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-get-message-list.service.ts @@ -1,75 +1,107 @@ import { Injectable } from '@nestjs/common'; -import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { MessageFolderWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-folder.workspace-entity'; import { GmailGetMessageListService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service'; import { MicrosoftGetMessageListService } from 'src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service'; import { MessageImportException, MessageImportExceptionCode, } from 'src/modules/messaging/message-import-manager/exceptions/message-import.exception'; +import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service'; export type GetFullMessageListResponse = { messageExternalIds: string[]; nextSyncCursor: string; }; +export type GetFullMessageListForFoldersResponse = + GetFullMessageListResponse & { + folderId: string | undefined; + }; + export type GetPartialMessageListResponse = { messageExternalIds: string[]; messageExternalIdsToDelete: string[]; + previousSyncCursor: string; nextSyncCursor: string; }; +export type GetPartialMessageListForFoldersResponse = + GetPartialMessageListResponse & { + folderId: string | undefined; + }; + @Injectable() export class MessagingGetMessageListService { constructor( private readonly gmailGetMessageListService: GmailGetMessageListService, private readonly microsoftGetMessageListService: MicrosoftGetMessageListService, + private readonly messagingCursorService: MessagingCursorService, + private readonly twentyORMManager: TwentyORMManager, ) {} - public async getFullMessageList( - connectedAccount: Pick< - ConnectedAccountWorkspaceEntity, - 'provider' | 'refreshToken' | 'id' | 'handle' - >, - ): Promise { - switch (connectedAccount.provider) { + public async getFullMessageLists( + messageChannel: MessageChannelWorkspaceEntity, + ): Promise { + switch (messageChannel.connectedAccount.provider) { case 'google': - return this.gmailGetMessageListService.getFullMessageList( - connectedAccount, - ); - case 'microsoft': - return this.microsoftGetMessageListService.getFullMessageList( - connectedAccount, + return [ + { + ...(await this.gmailGetMessageListService.getFullMessageList( + messageChannel.connectedAccount, + )), + folderId: undefined, + }, + ]; + case 'microsoft': { + const folderRepository = + await this.twentyORMManager.getRepository( + 'messageFolder', + ); + + const folders = await folderRepository.find({ + where: { + messageChannelId: messageChannel.id, + }, + }); + + return this.microsoftGetMessageListService.getFullMessageListForFolders( + messageChannel.connectedAccount, + folders, ); + } default: throw new MessageImportException( - `Provider ${connectedAccount.provider} is not supported`, + `Provider ${messageChannel.connectedAccount.provider} is not supported`, MessageImportExceptionCode.PROVIDER_NOT_SUPPORTED, ); } } - public async getPartialMessageList( - connectedAccount: Pick< - ConnectedAccountWorkspaceEntity, - 'provider' | 'refreshToken' | 'id' - >, - syncCursor: string, - ): Promise { - switch (connectedAccount.provider) { + public async getPartialMessageLists( + messageChannel: MessageChannelWorkspaceEntity, + ): Promise { + switch (messageChannel.connectedAccount.provider) { case 'google': - return this.gmailGetMessageListService.getPartialMessageList( - connectedAccount, - syncCursor, - ); + return [ + { + ...(await this.gmailGetMessageListService.getPartialMessageList( + messageChannel.connectedAccount, + messageChannel.syncCursor, + )), + folderId: undefined, + }, + ]; case 'microsoft': - return this.microsoftGetMessageListService.getPartialMessageList( - connectedAccount, - syncCursor, + return this.microsoftGetMessageListService.getPartialMessageListForFolders( + messageChannel.connectedAccount, + messageChannel, ); default: throw new MessageImportException( - `Provider ${connectedAccount.provider} is not supported`, + `Provider ${messageChannel.connectedAccount.provider} is not supported`, MessageImportExceptionCode.PROVIDER_NOT_SUPPORTED, ); } diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts index b80ebc9c8..ea75a01ce 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts @@ -15,6 +15,7 @@ import { MessageImportExceptionHandlerService, MessageImportSyncStep, } from 'src/modules/messaging/message-import-manager/services/message-import-exception-handler.service'; +import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service'; import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service'; @Injectable() @@ -31,6 +32,7 @@ export class MessagingPartialMessageListFetchService { private readonly twentyORMManager: TwentyORMManager, private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService, private readonly messagingMessageCleanerService: MessagingMessageCleanerService, + private readonly messagingCursorService: MessagingCursorService, ) {} public async processMessageListFetch( @@ -57,17 +59,79 @@ export class MessagingPartialMessageListFetchService { }, ); - const syncCursor = messageChannel.syncCursor; - - const { messageExternalIds, messageExternalIdsToDelete, nextSyncCursor } = - await this.messagingGetMessageListService.getPartialMessageList( - connectedAccount, - syncCursor, + const partialMessageLists = + await this.messagingGetMessageListService.getPartialMessageLists( + messageChannel, + ); + + for (const partialMessageList of partialMessageLists) { + const { + messageExternalIds, + messageExternalIdsToDelete, + previousSyncCursor, + nextSyncCursor, + folderId, + } = partialMessageList; + + const isPartialImportFinished = this.isPartialImportFinished( + previousSyncCursor, + nextSyncCursor, + ); + + if (isPartialImportFinished) { + this.logger.log( + `Partial message list import done on message channel ${messageChannel.id} in folder ${folderId} for workspace ${workspaceId} and account ${connectedAccount.id}`, + ); + continue; + } + + await this.cacheStorage.setAdd( + `messages-to-import:${workspaceId}:${messageChannel.id}`, + messageExternalIds, ); - if (syncCursor === nextSyncCursor) { this.logger.log( - `Partial message list import done with history ${syncCursor} and nothing to update for workspace ${workspaceId} and account ${connectedAccount.id}`, + `Added ${messageExternalIds.length} messages to import for workspace ${workspaceId} and account ${connectedAccount.id}`, + ); + + const messageChannelMessageAssociationRepository = + await this.twentyORMManager.getRepository( + 'messageChannelMessageAssociation', + ); + + if (messageExternalIdsToDelete.length) { + await messageChannelMessageAssociationRepository.delete({ + messageChannelId: messageChannel.id, + messageExternalId: In(messageExternalIdsToDelete), + }); + + await this.messagingMessageCleanerService.cleanWorkspaceThreads( + workspaceId, + ); + } + + this.logger.log( + `Deleted ${messageExternalIdsToDelete.length} messages for workspace ${workspaceId} and account ${connectedAccount.id}`, + ); + + await this.messagingCursorService.updateCursor( + messageChannel, + nextSyncCursor, + folderId, + ); + } + + const isPartialImportFinishedForAllFolders = partialMessageLists.every( + (partialMessageList) => + this.isPartialImportFinished( + partialMessageList.previousSyncCursor, + partialMessageList.nextSyncCursor, + ), + ); + + if (isPartialImportFinishedForAllFolders) { + this.logger.log( + `Partial message list import done on message channel ${messageChannel.id} entirely for workspace ${workspaceId} and account ${connectedAccount.id}`, ); await this.messageChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch( @@ -77,46 +141,6 @@ export class MessagingPartialMessageListFetchService { return; } - await this.cacheStorage.setAdd( - `messages-to-import:${workspaceId}:${messageChannel.id}`, - messageExternalIds, - ); - - this.logger.log( - `Added ${messageExternalIds.length} messages to import for workspace ${workspaceId} and account ${connectedAccount.id}`, - ); - - const messageChannelMessageAssociationRepository = - await this.twentyORMManager.getRepository( - 'messageChannelMessageAssociation', - ); - - if (messageExternalIdsToDelete.length) { - await messageChannelMessageAssociationRepository.delete({ - messageChannelId: messageChannel.id, - messageExternalId: In(messageExternalIdsToDelete), - }); - - await this.messagingMessageCleanerService.cleanWorkspaceThreads( - workspaceId, - ); - } - - this.logger.log( - `Deleted ${messageExternalIdsToDelete.length} messages for workspace ${workspaceId} and account ${connectedAccount.id}`, - ); - - if (!syncCursor || nextSyncCursor > syncCursor) { - await messageChannelRepository.update( - { - id: messageChannel.id, - }, - { - syncCursor: nextSyncCursor, - }, - ); - } - await this.messageChannelSyncStatusService.scheduleMessagesImport([ messageChannel.id, ]); @@ -129,4 +153,11 @@ export class MessagingPartialMessageListFetchService { ); } } + + private isPartialImportFinished( + previousSyncCursor: string, + nextSyncCursor: string, + ): boolean { + return previousSyncCursor === nextSyncCursor; + } }