# Folders

Adding the possibility to synchronize messages form more than one
microsoft folder (think "inbox" or "sent items")

It will keep the current way for gmail.

- step 1 : implement a first version of full message & partial message 
- step 2 : implement retro-compatibility which includes the command to
run the migration to backfill microsoft synccursor from messageChannelt
o messageFolders
This commit is contained in:
Guillim
2025-02-11 17:19:53 +01:00
committed by GitHub
parent b4fd408109
commit a4806b72c7
13 changed files with 644 additions and 165 deletions

View File

@ -31,12 +31,13 @@ import {
MessageChannelVisibility, MessageChannelVisibility,
MessageChannelWorkspaceEntity, MessageChannelWorkspaceEntity,
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MessageFolderWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-folder.workspace-entity';
import { MessageFolderName } from 'src/modules/messaging/message-import-manager/drivers/microsoft/types/folders';
import { import {
MessagingMessageListFetchJob, MessagingMessageListFetchJob,
MessagingMessageListFetchJobData, MessagingMessageListFetchJobData,
} from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job'; } from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job';
import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity'; import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity';
@Injectable() @Injectable()
export class MicrosoftAPIsService { export class MicrosoftAPIsService {
constructor( constructor(
@ -94,6 +95,12 @@ export class MicrosoftAPIsService {
'messageChannel', 'messageChannel',
); );
const messageFolderRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<MessageFolderWorkspaceEntity>(
workspaceId,
'messageFolder',
);
const workspaceDataSource = const workspaceDataSource =
await this.twentyORMGlobalManager.getDataSourceForWorkspace(workspaceId); await this.twentyORMGlobalManager.getDataSourceForWorkspace(workspaceId);
@ -149,6 +156,28 @@ export class MicrosoftAPIsService {
manager, manager,
); );
await messageFolderRepository.save(
{
id: v4(),
messageChannelId: newMessageChannel.id,
name: MessageFolderName.INBOX,
syncCursor: '',
},
{},
manager,
);
await messageFolderRepository.save(
{
id: v4(),
messageChannelId: newMessageChannel.id,
name: MessageFolderName.SENT_ITEMS,
syncCursor: '',
},
{},
manager,
);
const messageChannelMetadata = const messageChannelMetadata =
await this.objectMetadataRepository.findOneOrFail({ await this.objectMetadataRepository.findOneOrFail({
where: { nameSingular: 'messageChannel', workspaceId }, where: { nameSingular: 'messageChannel', workspaceId },

View File

@ -146,6 +146,7 @@ export class GmailGetMessageListService {
return { return {
messageExternalIds: messagesAddedFiltered, messageExternalIds: messagesAddedFiltered,
messageExternalIdsToDelete: messagesDeleted, messageExternalIdsToDelete: messagesDeleted,
previousSyncCursor: syncCursor,
nextSyncCursor, nextSyncCursor,
}; };
} }

View File

@ -1,15 +1,20 @@
export const microsoftGraphWithMessages = { export const microsoftGraphWithMessagesDeltaLink = {
'@odata.context': '@odata.context':
'https://graph.microsoft.com/beta/$metadata#Collection(message)', 'https://graph.microsoft.com/beta/$metadata#Collection(message)',
value: [ value: [
{ {
'@odata.type': '#microsoft.graph.message', '@odata.type': '#microsoft.graph.message',
'@odata.etag': 'W/"CQAAABYAAAAadQ+1xAL8SLCZzf1KYyk+AAACItFa"', '@odata.etag': 'W/"CQAAABYAAAAadQ+1xAL8SLCZzf1KYyk+AAACItFa"',
id: 'AAMkAGZlMDQ1NjU5LTUzN2UtNDAyMC1hNmVlLTZhZmExMGU3ZDU1NwBGAAAAAADzAhgkpMbwQYnkXH1D-Va3BwAadQ_1xAL8SLCZzf1KYyk_AAAAAAEMAAAadQ_1xAL8SLCZzf1KYyk_AAACJSmSAAA=', id: 'AAkALgAAAAAAHYQDEapmEc2byACqAC-EWg0AGnUPtcQC-Eiwmc39SmMpPgAAEksJ3gAA',
},
{
'@odata.type': '#microsoft.graph.message',
'@odata.etag': 'W/"CQAAABYAAAAadQ+1xAL8SLCZzf1KYyk+AAANikYP',
id: 'AAkALgAAAAAAHYQDEapmEc2byACqAC-EWg0AGnUPtcQC-Eiwmc39SmMpPgAADZJ8HwAA',
}, },
], ],
'@odata.nextLink': '@odata.deltaLink':
"https://graph.microsoft.com/beta/me/mailFolders('inbox')/messages/delta?$skiptoken=jWnSM_TVmEdmKBzfVjDdNbDwpt3yYSUqEf9CFdhRcTxhbogC9oaTvY1ZdONMplHuz0pwtPay_qkEcFQ5RLEuDZ3O6IgnI5FXRcfekzOECWlL7zRVdGBidZ5TkXmXV7O7P8cxtvBMFJ2_dV951teFMatpdnD6hvksBK0Ff4tJKfo.HvZwAw_DM9PR3xf90ThtbqSdMCkGCHNPkjpaedxSBN3", "https://graph.microsoft.com/beta/me/mailFolders('inbox')/messages/delta?$skiptoken=jWnSM_TVmEdmKBzfVjDdNbDwpt3yYSUqEf9CFdhRcTxhbogC9oaTvY1ZdONMplHuz0pwtPay_qkEcFQ5RLEuDZ3O6IgnI5FXRcfekzOECWlL7zRVdGBidZ5TkXmXV7O7P8cxtvBMFJ2_dV951teFMatpdnD6hvksBK0Ff4tJKfo.HvZwAw_DM9PR3xf90ThtbqSdMCkGCHNPkjpaedxSBN4",
}; };
export const microsoftGraphBatchWithTwoMessagesResponse = [ export const microsoftGraphBatchWithTwoMessagesResponse = [

View File

@ -5,13 +5,22 @@ import { ConnectedAccountProvider } from 'twenty-shared';
import { EnvironmentModule } from 'src/engine/core-modules/environment/environment.module'; import { EnvironmentModule } from 'src/engine/core-modules/environment/environment.module';
import { MicrosoftOAuth2ClientManagerService } from 'src/modules/connected-account/oauth2-client-manager/drivers/microsoft/microsoft-oauth2-client-manager.service'; import { MicrosoftOAuth2ClientManagerService } from 'src/modules/connected-account/oauth2-client-manager/drivers/microsoft/microsoft-oauth2-client-manager.service';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MessageFolderWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-folder.workspace-entity';
import { microsoftGraphWithMessagesDeltaLink } from 'src/modules/messaging/message-import-manager/drivers/microsoft/mocks/microsoft-api-examples';
import { MicrosoftClientProvider } from 'src/modules/messaging/message-import-manager/drivers/microsoft/providers/microsoft-client.provider'; import { MicrosoftClientProvider } from 'src/modules/messaging/message-import-manager/drivers/microsoft/providers/microsoft-client.provider';
import { MessageFolderName } from 'src/modules/messaging/message-import-manager/drivers/microsoft/types/folders';
import { MicrosoftGetMessageListService } from './microsoft-get-message-list.service'; import { MicrosoftGetMessageListService } from './microsoft-get-message-list.service';
import { MicrosoftHandleErrorService } from './microsoft-handle-error.service'; import { MicrosoftHandleErrorService } from './microsoft-handle-error.service';
// in case you have "Please provide a valid token" it may be because you need to pass the env varible to the .env.test file
const refreshToken = 'replace-with-your-refresh-token'; const refreshToken = 'replace-with-your-refresh-token';
const syncCursor = `replace-with-your-sync-cursor`; const syncCursor = `replace-with-your-sync-cursor`;
const mockConnectedAccount = {
id: 'connected-account-id',
provider: ConnectedAccountProvider.MICROSOFT,
refreshToken: refreshToken,
};
xdescribe('Microsoft dev tests : get message list service', () => { xdescribe('Microsoft dev tests : get message list service', () => {
let service: MicrosoftGetMessageListService; let service: MicrosoftGetMessageListService;
@ -33,14 +42,11 @@ xdescribe('Microsoft dev tests : get message list service', () => {
); );
}); });
const mockConnectedAccount = {
id: 'connected-account-id',
provider: ConnectedAccountProvider.MICROSOFT,
refreshToken: refreshToken,
};
it('Should fetch and return message list successfully', async () => { it('Should fetch and return message list successfully', async () => {
const result = await service.getFullMessageList(mockConnectedAccount); const result = await service.getFullMessageList(
mockConnectedAccount,
MessageFolderName.INBOX,
);
expect(result.messageExternalIds.length).toBeGreaterThan(0); expect(result.messageExternalIds.length).toBeGreaterThan(0);
}); });
@ -53,11 +59,15 @@ xdescribe('Microsoft dev tests : get message list service', () => {
}; };
await expect( await expect(
service.getFullMessageList(mockConnectedAccountUnvalid), service.getFullMessageList(
mockConnectedAccountUnvalid,
MessageFolderName.INBOX,
),
).rejects.toThrowError('Access token is undefined or empty'); ).rejects.toThrowError('Access token is undefined or empty');
}); });
it('Should fetch and return partial message list successfully', async () => { // if you need to run this test, you need to manually update the syncCursor to a valid one
xit('Should fetch and return partial message list successfully', async () => {
const result = await service.getPartialMessageList( const result = await service.getPartialMessageList(
mockConnectedAccount, mockConnectedAccount,
syncCursor, syncCursor,
@ -80,3 +90,174 @@ xdescribe('Microsoft dev tests : get message list service', () => {
).rejects.toThrowError(/Missing SyncCursor/g); ).rejects.toThrowError(/Missing SyncCursor/g);
}); });
}); });
xdescribe('Microsoft dev tests : get full message list service for folders', () => {
let service: MicrosoftGetMessageListService;
const inboxFolder = new MessageFolderWorkspaceEntity();
inboxFolder.id = 'inbox-folder-id';
inboxFolder.name = MessageFolderName.INBOX;
inboxFolder.syncCursor = 'inbox-sync-cursor';
inboxFolder.messageChannelId = 'message-channel-1';
const sentFolder = new MessageFolderWorkspaceEntity();
sentFolder.id = 'sent-folder-id';
sentFolder.name = MessageFolderName.SENT_ITEMS;
sentFolder.syncCursor = 'sent-sync-cursor';
sentFolder.messageChannelId = 'message-channel-1';
const otherFolder = new MessageFolderWorkspaceEntity();
otherFolder.id = 'other-folder-id';
otherFolder.name = 'other';
otherFolder.syncCursor = 'other-sync-cursor';
otherFolder.messageChannelId = 'message-channel-2';
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
imports: [EnvironmentModule.forRoot({})],
providers: [
MicrosoftGetMessageListService,
MicrosoftClientProvider,
MicrosoftHandleErrorService,
MicrosoftOAuth2ClientManagerService,
ConfigService,
],
}).compile();
service = module.get<MicrosoftGetMessageListService>(
MicrosoftGetMessageListService,
);
});
it('Should return empty array', async () => {
const result = await service.getFullMessageListForFolders(
mockConnectedAccount,
[],
);
expect(result.length).toBe(0);
});
it('Should return an array of one item', async () => {
const result = await service.getFullMessageListForFolders(
mockConnectedAccount,
[inboxFolder],
);
expect(result.length).toBe(1);
expect(result[0].folderId).toBe(inboxFolder.id);
expect(result[0].messageExternalIds.length).toBeGreaterThan(0);
});
it('Should return an array of two items', async () => {
const result = await service.getFullMessageListForFolders(
mockConnectedAccount,
[inboxFolder, sentFolder],
);
expect(result.length).toBe(2);
});
});
xdescribe('Microsoft dev tests : get partial message list service for folders', () => {
let service: MicrosoftGetMessageListService;
const inboxFolder = new MessageFolderWorkspaceEntity();
inboxFolder.id = 'inbox-folder-id';
inboxFolder.name = MessageFolderName.INBOX;
inboxFolder.syncCursor = 'inbox-sync-cursor';
inboxFolder.messageChannelId = 'message-channel-1';
const sentFolder = new MessageFolderWorkspaceEntity();
sentFolder.id = 'sent-folder-id';
sentFolder.name = MessageFolderName.SENT_ITEMS;
sentFolder.syncCursor = 'sent-sync-cursor';
sentFolder.messageChannelId = 'message-channel-1';
const otherFolder = new MessageFolderWorkspaceEntity();
otherFolder.id = 'other-folder-id';
otherFolder.name = 'other';
otherFolder.syncCursor = 'other-sync-cursor';
otherFolder.messageChannelId = 'message-channel-2';
const messageChannelNoFolders = new MessageChannelWorkspaceEntity();
messageChannelNoFolders.id = 'message-channel-0';
messageChannelNoFolders.messageFolders = [];
messageChannelNoFolders.syncCursor = '';
const messageChannelMicrosoftOneFolder = new MessageChannelWorkspaceEntity();
messageChannelMicrosoftOneFolder.id = 'message-channel-1';
messageChannelMicrosoftOneFolder.messageFolders = [inboxFolder];
messageChannelMicrosoftOneFolder.syncCursor = '';
const messageChannelMicrosoft = new MessageChannelWorkspaceEntity();
messageChannelMicrosoft.id = 'message-channel-2';
messageChannelMicrosoft.messageFolders = [inboxFolder, sentFolder];
messageChannelMicrosoft.syncCursor = '';
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
imports: [EnvironmentModule.forRoot({})],
providers: [
MicrosoftGetMessageListService,
MicrosoftClientProvider,
MicrosoftHandleErrorService,
MicrosoftOAuth2ClientManagerService,
ConfigService,
],
}).compile();
service = module.get<MicrosoftGetMessageListService>(
MicrosoftGetMessageListService,
);
const mockMicrosoftClient = {
api: jest.fn().mockReturnThis(),
version: jest.fn().mockReturnThis(),
headers: jest.fn().mockReturnThis(),
get: jest.fn().mockResolvedValue(microsoftGraphWithMessagesDeltaLink),
};
jest
.spyOn(MicrosoftClientProvider.prototype, 'getMicrosoftClient')
.mockResolvedValue(mockMicrosoftClient as any);
});
it('Should return empty array', async () => {
const result = await service.getPartialMessageListForFolders(
mockConnectedAccount,
messageChannelNoFolders,
);
expect(result.length).toBe(0);
});
it('Should return an array of one items', async () => {
const result = await service.getPartialMessageListForFolders(
mockConnectedAccount,
messageChannelMicrosoftOneFolder,
);
expect(result.length).toBe(1);
expect(result[0].folderId).toBe(inboxFolder.id);
expect(result[0].messageExternalIds.length).toBeGreaterThan(0);
});
it('Should return an array of two items', async () => {
const result = await service.getPartialMessageListForFolders(
mockConnectedAccount,
messageChannelMicrosoft,
);
expect(result.length).toBe(2);
});
});

View File

@ -5,19 +5,25 @@ import {
PageIterator, PageIterator,
PageIteratorCallback, PageIteratorCallback,
} from '@microsoft/microsoft-graph-client'; } from '@microsoft/microsoft-graph-client';
import { v4 } from 'uuid';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MessageFolderWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-folder.workspace-entity';
import { import {
MessageImportDriverException, MessageImportDriverException,
MessageImportDriverExceptionCode, MessageImportDriverExceptionCode,
} from 'src/modules/messaging/message-import-manager/drivers/exceptions/message-import-driver.exception'; } from 'src/modules/messaging/message-import-manager/drivers/exceptions/message-import-driver.exception';
import { MicrosoftClientProvider } from 'src/modules/messaging/message-import-manager/drivers/microsoft/providers/microsoft-client.provider'; import { MicrosoftClientProvider } from 'src/modules/messaging/message-import-manager/drivers/microsoft/providers/microsoft-client.provider';
import { MicrosoftHandleErrorService } from 'src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-handle-error.service'; import { MicrosoftHandleErrorService } from 'src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-handle-error.service';
import { MessageFolderName } from 'src/modules/messaging/message-import-manager/drivers/microsoft/types/folders';
import { import {
GetFullMessageListForFoldersResponse,
GetFullMessageListResponse, GetFullMessageListResponse,
GetPartialMessageListForFoldersResponse,
GetPartialMessageListResponse, GetPartialMessageListResponse,
} from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service'; } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service';
// Microsoft API limit is 999 messages per request on this endpoint // Microsoft API limit is 999 messages per request on this endpoint
const MESSAGING_MICROSOFT_USERS_MESSAGES_LIST_MAX_RESULT = 999; const MESSAGING_MICROSOFT_USERS_MESSAGES_LIST_MAX_RESULT = 999;
@ -26,14 +32,39 @@ export class MicrosoftGetMessageListService {
constructor( constructor(
private readonly microsoftClientProvider: MicrosoftClientProvider, private readonly microsoftClientProvider: MicrosoftClientProvider,
private readonly microsoftHandleErrorService: MicrosoftHandleErrorService, private readonly microsoftHandleErrorService: MicrosoftHandleErrorService,
private readonly twentyORMManager: TwentyORMManager,
) {} ) {}
public async getFullMessageListForFolders(
connectedAccount: Pick<
ConnectedAccountWorkspaceEntity,
'refreshToken' | 'id'
>,
folders: Pick<MessageFolderWorkspaceEntity, 'id' | 'name'>[],
): Promise<GetFullMessageListForFoldersResponse[]> {
const result: GetFullMessageListForFoldersResponse[] = [];
for (const folder of folders) {
const response = await this.getFullMessageList(
connectedAccount,
folder.name as MessageFolderName,
);
result.push({
...response,
folderId: folder.id,
});
}
return result;
}
public async getFullMessageList( public async getFullMessageList(
connectedAccount: Pick< connectedAccount: Pick<
ConnectedAccountWorkspaceEntity, ConnectedAccountWorkspaceEntity,
'provider' | 'refreshToken' | 'id' 'refreshToken' | 'id'
>, >,
syncCursor?: string, folderName: MessageFolderName,
): Promise<GetFullMessageListResponse> { ): Promise<GetFullMessageListResponse> {
const messageExternalIds: string[] = []; const messageExternalIds: string[] = [];
@ -41,7 +72,7 @@ export class MicrosoftGetMessageListService {
await this.microsoftClientProvider.getMicrosoftClient(connectedAccount); await this.microsoftClientProvider.getMicrosoftClient(connectedAccount);
const response: PageCollection = await microsoftClient const response: PageCollection = await microsoftClient
.api(syncCursor || '/me/mailfolders/inbox/messages/delta?$select=id') .api(`/me/mailfolders/${folderName}/messages/delta?$select=id`)
.version('beta') .version('beta')
.headers({ .headers({
Prefer: `odata.maxpagesize=${MESSAGING_MICROSOFT_USERS_MESSAGES_LIST_MAX_RESULT}, IdType="ImmutableId"`, Prefer: `odata.maxpagesize=${MESSAGING_MICROSOFT_USERS_MESSAGES_LIST_MAX_RESULT}, IdType="ImmutableId"`,
@ -66,6 +97,73 @@ export class MicrosoftGetMessageListService {
}; };
} }
public async getPartialMessageListForFolders(
connectedAccount: Pick<
ConnectedAccountWorkspaceEntity,
'provider' | 'refreshToken' | 'id'
>,
messageChannel: MessageChannelWorkspaceEntity,
): Promise<GetPartialMessageListForFoldersResponse[]> {
const result: GetPartialMessageListForFoldersResponse[] = [];
if (messageChannel.messageFolders.length === 0) {
// permanent solution:
// throw new MessageImportDriverException(
// `Message channel ${messageChannel.id} has no message folders`,
// MessageImportDriverExceptionCode.NOT_FOUND,
// );
// temporary solution: TODO: remove this once we have a permanent solution
// if no folders exist, most probably a first time sync for microsoft
// so we create the folders INBOX and SENTITEMS
// and fill the INBOX with the previous sync cursor
// and for sentitms, we do the full message list fetch
// console.warn(
// `Message channel ${messageChannel.id} has no message folders, most probably a first time`,
// );
const messageFolderRepository =
await this.twentyORMManager.getRepository<MessageFolderWorkspaceEntity>(
'messageFolder',
);
const newFolder = await messageFolderRepository.save({
id: v4(),
messageChannelId: messageChannel.id,
name: MessageFolderName.INBOX,
syncCursor: messageChannel.syncCursor,
});
const response = await this.getPartialMessageList(
connectedAccount,
messageChannel.syncCursor,
);
result.push({
...response,
folderId: newFolder.id,
});
// we are ok with not synchronizing the legacy connected microsoft accounts.
// so we return an empty array.
return result;
}
for (const folder of messageChannel.messageFolders) {
const response = await this.getPartialMessageList(
connectedAccount,
folder.syncCursor,
);
result.push({
...response,
folderId: folder.id,
});
}
return result;
}
public async getPartialMessageList( public async getPartialMessageList(
connectedAccount: Pick< connectedAccount: Pick<
ConnectedAccountWorkspaceEntity, ConnectedAccountWorkspaceEntity,
@ -114,6 +212,7 @@ export class MicrosoftGetMessageListService {
return { return {
messageExternalIds, messageExternalIds,
messageExternalIdsToDelete, messageExternalIdsToDelete,
previousSyncCursor: syncCursor,
nextSyncCursor: pageIterator.getDeltaLink() || '', nextSyncCursor: pageIterator.getDeltaLink() || '',
}; };
} }

View File

@ -0,0 +1,4 @@
export enum MessageFolderName {
INBOX = 'inbox',
SENT_ITEMS = 'sentItems',
}

View File

@ -11,4 +11,5 @@ export enum MessageImportExceptionCode {
UNKNOWN = 'UNKNOWN', UNKNOWN = 'UNKNOWN',
PROVIDER_NOT_SUPPORTED = 'PROVIDER_NOT_SUPPORTED', PROVIDER_NOT_SUPPORTED = 'PROVIDER_NOT_SUPPORTED',
MESSAGE_CHANNEL_NOT_FOUND = 'MESSAGE_CHANNEL_NOT_FOUND', MESSAGE_CHANNEL_NOT_FOUND = 'MESSAGE_CHANNEL_NOT_FOUND',
FOLDER_ID_REQUIRED = 'FOLDER_ID_REQUIRED',
} }

View File

@ -51,7 +51,7 @@ export class MessagingMessageListFetchJob {
where: { where: {
id: messageChannelId, id: messageChannelId,
}, },
relations: ['connectedAccount'], relations: ['connectedAccount', 'messageFolders'],
}); });
if (!messageChannel) { if (!messageChannel) {

View File

@ -26,6 +26,7 @@ import { MessagingMessagesImportJob } from 'src/modules/messaging/message-import
import { MessagingOngoingStaleJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job'; import { MessagingOngoingStaleJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job';
import { MessagingMessageImportManagerMessageChannelListener } from 'src/modules/messaging/message-import-manager/listeners/messaging-import-manager-message-channel.listener'; import { MessagingMessageImportManagerMessageChannelListener } from 'src/modules/messaging/message-import-manager/listeners/messaging-import-manager-message-channel.listener';
import { MessageImportExceptionHandlerService } from 'src/modules/messaging/message-import-manager/services/message-import-exception-handler.service'; import { MessageImportExceptionHandlerService } from 'src/modules/messaging/message-import-manager/services/message-import-exception-handler.service';
import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service';
import { MessagingFullMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service'; import { MessagingFullMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service';
import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service'; import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service';
import { MessagingGetMessagesService } from 'src/modules/messaging/message-import-manager/services/messaging-get-messages.service'; import { MessagingGetMessagesService } from 'src/modules/messaging/message-import-manager/services/messaging-get-messages.service';
@ -35,7 +36,6 @@ import { MessagingPartialMessageListFetchService } from 'src/modules/messaging/m
import { MessagingSaveMessagesAndEnqueueContactCreationService } from 'src/modules/messaging/message-import-manager/services/messaging-save-messages-and-enqueue-contact-creation.service'; import { MessagingSaveMessagesAndEnqueueContactCreationService } from 'src/modules/messaging/message-import-manager/services/messaging-save-messages-and-enqueue-contact-creation.service';
import { MessageParticipantManagerModule } from 'src/modules/messaging/message-participant-manager/message-participant-manager.module'; import { MessageParticipantManagerModule } from 'src/modules/messaging/message-participant-manager/message-participant-manager.module';
import { MessagingMonitoringModule } from 'src/modules/messaging/monitoring/messaging-monitoring.module'; import { MessagingMonitoringModule } from 'src/modules/messaging/monitoring/messaging-monitoring.module';
@Module({ @Module({
imports: [ imports: [
RefreshAccessTokenManagerModule, RefreshAccessTokenManagerModule,
@ -74,6 +74,7 @@ import { MessagingMonitoringModule } from 'src/modules/messaging/monitoring/mess
MessagingGetMessageListService, MessagingGetMessageListService,
MessagingGetMessagesService, MessagingGetMessagesService,
MessageImportExceptionHandlerService, MessageImportExceptionHandlerService,
MessagingCursorService,
], ],
exports: [], exports: [],
}) })

View File

@ -0,0 +1,103 @@
import { Injectable } from '@nestjs/common';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MessageFolderWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-folder.workspace-entity';
import {
MessageImportException,
MessageImportExceptionCode,
} from 'src/modules/messaging/message-import-manager/exceptions/message-import.exception';
@Injectable()
export class MessagingCursorService {
constructor(private readonly twentyORMManager: TwentyORMManager) {}
public async getCursor(
messageChannel: MessageChannelWorkspaceEntity,
connectedAccount: ConnectedAccountWorkspaceEntity,
folderId?: string,
): Promise<string> {
const folderRepository =
await this.twentyORMManager.getRepository<MessageFolderWorkspaceEntity>(
'messageFolder',
);
switch (connectedAccount.provider) {
case 'google':
return messageChannel.syncCursor;
case 'microsoft': {
const folder = await folderRepository.findOne({
where: {
id: folderId,
},
});
if (!folder) {
throw new MessageImportException(
`Folder is required to get cursor for ${connectedAccount.provider}`,
MessageImportExceptionCode.FOLDER_ID_REQUIRED,
);
}
return folder.syncCursor;
}
default:
throw new MessageImportException(
`Update Cursor for provider ${connectedAccount.provider} not implemented`,
MessageImportExceptionCode.PROVIDER_NOT_SUPPORTED,
);
}
}
public async updateCursor(
messageChannel: MessageChannelWorkspaceEntity,
nextSyncCursor: string,
folderId?: string,
) {
const messageChannelRepository =
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
'messageChannel',
);
const folderRepository =
await this.twentyORMManager.getRepository<MessageFolderWorkspaceEntity>(
'messageFolder',
);
if (!folderId) {
await messageChannelRepository.update(
{
id: messageChannel.id,
},
{
throttleFailureCount: 0,
syncStageStartedAt: null,
syncCursor:
!messageChannel.syncCursor ||
nextSyncCursor > messageChannel.syncCursor
? nextSyncCursor
: messageChannel.syncCursor,
},
);
} else {
await folderRepository.update(
{
id: folderId,
},
{
syncCursor: nextSyncCursor,
},
);
await messageChannelRepository.update(
{
id: messageChannel.id,
},
{
throttleFailureCount: 0,
syncStageStartedAt: null,
},
);
}
}
}

View File

@ -15,8 +15,8 @@ import {
MessageImportExceptionHandlerService, MessageImportExceptionHandlerService,
MessageImportSyncStep, MessageImportSyncStep,
} from 'src/modules/messaging/message-import-manager/services/message-import-exception-handler.service'; } from 'src/modules/messaging/message-import-manager/services/message-import-exception-handler.service';
import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service';
import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service'; import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service';
@Injectable() @Injectable()
export class MessagingFullMessageListFetchService { export class MessagingFullMessageListFetchService {
constructor( constructor(
@ -27,6 +27,7 @@ export class MessagingFullMessageListFetchService {
private readonly messagingGetMessageListService: MessagingGetMessageListService, private readonly messagingGetMessageListService: MessagingGetMessageListService,
private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService, private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService,
private readonly messagingMessageCleanerService: MessagingMessageCleanerService, private readonly messagingMessageCleanerService: MessagingMessageCleanerService,
private readonly messagingCursorService: MessagingCursorService,
) {} ) {}
public async processMessageListFetch( public async processMessageListFetch(
@ -39,81 +40,72 @@ export class MessagingFullMessageListFetchService {
[messageChannel.id], [messageChannel.id],
); );
const { messageExternalIds, nextSyncCursor } = const fullMessageLists =
await this.messagingGetMessageListService.getFullMessageList( await this.messagingGetMessageListService.getFullMessageLists(
connectedAccount, messageChannel,
); );
const messageChannelMessageAssociationRepository = for (const fullMessageList of fullMessageLists) {
await this.twentyORMManager.getRepository<MessageChannelMessageAssociationWorkspaceEntity>( const { messageExternalIds, nextSyncCursor, folderId } =
'messageChannelMessageAssociation', fullMessageList;
const messageChannelMessageAssociationRepository =
await this.twentyORMManager.getRepository<MessageChannelMessageAssociationWorkspaceEntity>(
'messageChannelMessageAssociation',
);
const existingMessageChannelMessageAssociations =
await messageChannelMessageAssociationRepository.find({
where: {
messageChannelId: messageChannel.id,
},
});
const existingMessageChannelMessageAssociationsExternalIds =
existingMessageChannelMessageAssociations.map(
(messageChannelMessageAssociation) =>
messageChannelMessageAssociation.messageExternalId,
);
const messageExternalIdsToImport = messageExternalIds.filter(
(messageExternalId) =>
!existingMessageChannelMessageAssociationsExternalIds.includes(
messageExternalId,
),
); );
const existingMessageChannelMessageAssociations = const messageExternalIdsToDelete =
await messageChannelMessageAssociationRepository.find({ existingMessageChannelMessageAssociationsExternalIds.filter(
where: { (existingMessageCMAExternalId) =>
existingMessageCMAExternalId &&
!messageExternalIds.includes(existingMessageCMAExternalId),
);
if (messageExternalIdsToDelete.length) {
await messageChannelMessageAssociationRepository.delete({
messageChannelId: messageChannel.id, messageChannelId: messageChannel.id,
}, messageExternalId: In(messageExternalIdsToDelete),
}); });
const existingMessageChannelMessageAssociationsExternalIds = await this.messagingMessageCleanerService.cleanWorkspaceThreads(
existingMessageChannelMessageAssociations.map( workspaceId,
(messageChannelMessageAssociation) => );
messageChannelMessageAssociation.messageExternalId, }
);
const messageExternalIdsToImport = messageExternalIds.filter( if (messageExternalIdsToImport.length) {
(messageExternalId) => await this.cacheStorage.setAdd(
!existingMessageChannelMessageAssociationsExternalIds.includes( `messages-to-import:${workspaceId}:${messageChannel.id}`,
messageExternalId, messageExternalIdsToImport,
), );
); }
const messageExternalIdsToDelete = await this.messagingCursorService.updateCursor(
existingMessageChannelMessageAssociationsExternalIds.filter( messageChannel,
(existingMessageCMAExternalId) => nextSyncCursor,
existingMessageCMAExternalId && folderId,
!messageExternalIds.includes(existingMessageCMAExternalId),
);
if (messageExternalIdsToDelete.length) {
await messageChannelMessageAssociationRepository.delete({
messageChannelId: messageChannel.id,
messageExternalId: In(messageExternalIdsToDelete),
});
await this.messagingMessageCleanerService.cleanWorkspaceThreads(
workspaceId,
); );
} }
if (messageExternalIdsToImport.length) {
await this.cacheStorage.setAdd(
`messages-to-import:${workspaceId}:${messageChannel.id}`,
messageExternalIdsToImport,
);
}
const messageChannelRepository =
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
'messageChannel',
);
await messageChannelRepository.update(
{
id: messageChannel.id,
},
{
throttleFailureCount: 0,
syncStageStartedAt: null,
syncCursor:
!messageChannel.syncCursor ||
nextSyncCursor > messageChannel.syncCursor
? nextSyncCursor
: messageChannel.syncCursor,
},
);
await this.messageChannelSyncStatusService.scheduleMessagesImport([ await this.messageChannelSyncStatusService.scheduleMessagesImport([
messageChannel.id, messageChannel.id,
]); ]);

View File

@ -1,75 +1,107 @@
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MessageFolderWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-folder.workspace-entity';
import { GmailGetMessageListService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service'; import { GmailGetMessageListService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service';
import { MicrosoftGetMessageListService } from 'src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service'; import { MicrosoftGetMessageListService } from 'src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service';
import { import {
MessageImportException, MessageImportException,
MessageImportExceptionCode, MessageImportExceptionCode,
} from 'src/modules/messaging/message-import-manager/exceptions/message-import.exception'; } from 'src/modules/messaging/message-import-manager/exceptions/message-import.exception';
import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service';
export type GetFullMessageListResponse = { export type GetFullMessageListResponse = {
messageExternalIds: string[]; messageExternalIds: string[];
nextSyncCursor: string; nextSyncCursor: string;
}; };
export type GetFullMessageListForFoldersResponse =
GetFullMessageListResponse & {
folderId: string | undefined;
};
export type GetPartialMessageListResponse = { export type GetPartialMessageListResponse = {
messageExternalIds: string[]; messageExternalIds: string[];
messageExternalIdsToDelete: string[]; messageExternalIdsToDelete: string[];
previousSyncCursor: string;
nextSyncCursor: string; nextSyncCursor: string;
}; };
export type GetPartialMessageListForFoldersResponse =
GetPartialMessageListResponse & {
folderId: string | undefined;
};
@Injectable() @Injectable()
export class MessagingGetMessageListService { export class MessagingGetMessageListService {
constructor( constructor(
private readonly gmailGetMessageListService: GmailGetMessageListService, private readonly gmailGetMessageListService: GmailGetMessageListService,
private readonly microsoftGetMessageListService: MicrosoftGetMessageListService, private readonly microsoftGetMessageListService: MicrosoftGetMessageListService,
private readonly messagingCursorService: MessagingCursorService,
private readonly twentyORMManager: TwentyORMManager,
) {} ) {}
public async getFullMessageList( public async getFullMessageLists(
connectedAccount: Pick< messageChannel: MessageChannelWorkspaceEntity,
ConnectedAccountWorkspaceEntity, ): Promise<GetFullMessageListForFoldersResponse[]> {
'provider' | 'refreshToken' | 'id' | 'handle' switch (messageChannel.connectedAccount.provider) {
>,
): Promise<GetFullMessageListResponse> {
switch (connectedAccount.provider) {
case 'google': case 'google':
return this.gmailGetMessageListService.getFullMessageList( return [
connectedAccount, {
); ...(await this.gmailGetMessageListService.getFullMessageList(
case 'microsoft': messageChannel.connectedAccount,
return this.microsoftGetMessageListService.getFullMessageList( )),
connectedAccount, folderId: undefined,
},
];
case 'microsoft': {
const folderRepository =
await this.twentyORMManager.getRepository<MessageFolderWorkspaceEntity>(
'messageFolder',
);
const folders = await folderRepository.find({
where: {
messageChannelId: messageChannel.id,
},
});
return this.microsoftGetMessageListService.getFullMessageListForFolders(
messageChannel.connectedAccount,
folders,
); );
}
default: default:
throw new MessageImportException( throw new MessageImportException(
`Provider ${connectedAccount.provider} is not supported`, `Provider ${messageChannel.connectedAccount.provider} is not supported`,
MessageImportExceptionCode.PROVIDER_NOT_SUPPORTED, MessageImportExceptionCode.PROVIDER_NOT_SUPPORTED,
); );
} }
} }
public async getPartialMessageList( public async getPartialMessageLists(
connectedAccount: Pick< messageChannel: MessageChannelWorkspaceEntity,
ConnectedAccountWorkspaceEntity, ): Promise<GetPartialMessageListForFoldersResponse[]> {
'provider' | 'refreshToken' | 'id' switch (messageChannel.connectedAccount.provider) {
>,
syncCursor: string,
): Promise<GetPartialMessageListResponse> {
switch (connectedAccount.provider) {
case 'google': case 'google':
return this.gmailGetMessageListService.getPartialMessageList( return [
connectedAccount, {
syncCursor, ...(await this.gmailGetMessageListService.getPartialMessageList(
); messageChannel.connectedAccount,
messageChannel.syncCursor,
)),
folderId: undefined,
},
];
case 'microsoft': case 'microsoft':
return this.microsoftGetMessageListService.getPartialMessageList( return this.microsoftGetMessageListService.getPartialMessageListForFolders(
connectedAccount, messageChannel.connectedAccount,
syncCursor, messageChannel,
); );
default: default:
throw new MessageImportException( throw new MessageImportException(
`Provider ${connectedAccount.provider} is not supported`, `Provider ${messageChannel.connectedAccount.provider} is not supported`,
MessageImportExceptionCode.PROVIDER_NOT_SUPPORTED, MessageImportExceptionCode.PROVIDER_NOT_SUPPORTED,
); );
} }

View File

@ -15,6 +15,7 @@ import {
MessageImportExceptionHandlerService, MessageImportExceptionHandlerService,
MessageImportSyncStep, MessageImportSyncStep,
} from 'src/modules/messaging/message-import-manager/services/message-import-exception-handler.service'; } from 'src/modules/messaging/message-import-manager/services/message-import-exception-handler.service';
import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service';
import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service'; import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service';
@Injectable() @Injectable()
@ -31,6 +32,7 @@ export class MessagingPartialMessageListFetchService {
private readonly twentyORMManager: TwentyORMManager, private readonly twentyORMManager: TwentyORMManager,
private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService, private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService,
private readonly messagingMessageCleanerService: MessagingMessageCleanerService, private readonly messagingMessageCleanerService: MessagingMessageCleanerService,
private readonly messagingCursorService: MessagingCursorService,
) {} ) {}
public async processMessageListFetch( public async processMessageListFetch(
@ -57,17 +59,79 @@ export class MessagingPartialMessageListFetchService {
}, },
); );
const syncCursor = messageChannel.syncCursor; const partialMessageLists =
await this.messagingGetMessageListService.getPartialMessageLists(
const { messageExternalIds, messageExternalIdsToDelete, nextSyncCursor } = messageChannel,
await this.messagingGetMessageListService.getPartialMessageList( );
connectedAccount,
syncCursor, for (const partialMessageList of partialMessageLists) {
const {
messageExternalIds,
messageExternalIdsToDelete,
previousSyncCursor,
nextSyncCursor,
folderId,
} = partialMessageList;
const isPartialImportFinished = this.isPartialImportFinished(
previousSyncCursor,
nextSyncCursor,
);
if (isPartialImportFinished) {
this.logger.log(
`Partial message list import done on message channel ${messageChannel.id} in folder ${folderId} for workspace ${workspaceId} and account ${connectedAccount.id}`,
);
continue;
}
await this.cacheStorage.setAdd(
`messages-to-import:${workspaceId}:${messageChannel.id}`,
messageExternalIds,
); );
if (syncCursor === nextSyncCursor) {
this.logger.log( this.logger.log(
`Partial message list import done with history ${syncCursor} and nothing to update for workspace ${workspaceId} and account ${connectedAccount.id}`, `Added ${messageExternalIds.length} messages to import for workspace ${workspaceId} and account ${connectedAccount.id}`,
);
const messageChannelMessageAssociationRepository =
await this.twentyORMManager.getRepository<MessageChannelMessageAssociationWorkspaceEntity>(
'messageChannelMessageAssociation',
);
if (messageExternalIdsToDelete.length) {
await messageChannelMessageAssociationRepository.delete({
messageChannelId: messageChannel.id,
messageExternalId: In(messageExternalIdsToDelete),
});
await this.messagingMessageCleanerService.cleanWorkspaceThreads(
workspaceId,
);
}
this.logger.log(
`Deleted ${messageExternalIdsToDelete.length} messages for workspace ${workspaceId} and account ${connectedAccount.id}`,
);
await this.messagingCursorService.updateCursor(
messageChannel,
nextSyncCursor,
folderId,
);
}
const isPartialImportFinishedForAllFolders = partialMessageLists.every(
(partialMessageList) =>
this.isPartialImportFinished(
partialMessageList.previousSyncCursor,
partialMessageList.nextSyncCursor,
),
);
if (isPartialImportFinishedForAllFolders) {
this.logger.log(
`Partial message list import done on message channel ${messageChannel.id} entirely for workspace ${workspaceId} and account ${connectedAccount.id}`,
); );
await this.messageChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch( await this.messageChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch(
@ -77,46 +141,6 @@ export class MessagingPartialMessageListFetchService {
return; return;
} }
await this.cacheStorage.setAdd(
`messages-to-import:${workspaceId}:${messageChannel.id}`,
messageExternalIds,
);
this.logger.log(
`Added ${messageExternalIds.length} messages to import for workspace ${workspaceId} and account ${connectedAccount.id}`,
);
const messageChannelMessageAssociationRepository =
await this.twentyORMManager.getRepository<MessageChannelMessageAssociationWorkspaceEntity>(
'messageChannelMessageAssociation',
);
if (messageExternalIdsToDelete.length) {
await messageChannelMessageAssociationRepository.delete({
messageChannelId: messageChannel.id,
messageExternalId: In(messageExternalIdsToDelete),
});
await this.messagingMessageCleanerService.cleanWorkspaceThreads(
workspaceId,
);
}
this.logger.log(
`Deleted ${messageExternalIdsToDelete.length} messages for workspace ${workspaceId} and account ${connectedAccount.id}`,
);
if (!syncCursor || nextSyncCursor > syncCursor) {
await messageChannelRepository.update(
{
id: messageChannel.id,
},
{
syncCursor: nextSyncCursor,
},
);
}
await this.messageChannelSyncStatusService.scheduleMessagesImport([ await this.messageChannelSyncStatusService.scheduleMessagesImport([
messageChannel.id, messageChannel.id,
]); ]);
@ -129,4 +153,11 @@ export class MessagingPartialMessageListFetchService {
); );
} }
} }
private isPartialImportFinished(
previousSyncCursor: string,
nextSyncCursor: string,
): boolean {
return previousSyncCursor === nextSyncCursor;
}
} }