Refactor mail folders (#13302)
## Context We would like to start leveraging more messageFolders during messageSync which would allow users to select folders they want to sync on their mailbox. MessageFolder is an abstraction that means folder for Microsoft, labels for Gmail, not supported for IMAP. ## What 1) We do not aim to build a FE now but we would like to take it into a consideration if they have been modified through API. So **out of scope** 2) MessageFolders will be synced regularly from Gmail, Microsoft. This is currently partially done and improving it is **out of scope** 3) Change: we were having two synchronization mechanism so far: FULL_SYNC (first time, or when no cursor is present) and PARTIAL_SYNC (when we have a cursor, we can fetch the diff since the last pull). This Full vs Partial mode was a high level concept. We now think it's an driver implementation detail (and can be inferred from the existence of a cursor). Why making this change now: as we are trying to pull several folders, a folder could need a full sync if it had no cursor, and another a partial if it has one. It does not make sense anymore to have this full vs partial difference at MessageChannel level 4) Once we are sure this work, we can start syncing different folders on Gmail side (the case for the user it to be able to fetch Promotion label) ## Implementation strategy 1) Re-use PartialMessageList implementation / API and rename it to MessageList at it's more complete 2) re-use driver level fullMessageList methods and call them messageListWithoutCursor 3) make sure that these method are folder specific ## Tests ### Gmail - Fresh fetch (without cursor): OK - Message import: OK - Additional fetch (with messageChannel cursor): OK ### Microsoft - Fresh fetch (without cursor): OK - Message import: OK - Additional fetch (with messageChannel cursor): OK ### Imap - Fresh fetch (without cursor): OK - Message import: OK - Additional fetch (with messageChannel cursor): OK
This commit is contained in:
@ -54,7 +54,7 @@ export class BlocklistReimportMessagesJob {
|
||||
},
|
||||
});
|
||||
|
||||
await this.messagingChannelSyncStatusService.resetAndScheduleFullMessageListFetch(
|
||||
await this.messagingChannelSyncStatusService.resetAndScheduleMessageListFetch(
|
||||
messageChannels.map((messageChannel) => messageChannel.id),
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
@ -27,7 +27,7 @@ export class MessageChannelSyncStatusService {
|
||||
private readonly metricsService: MetricsService,
|
||||
) {}
|
||||
|
||||
public async scheduleFullMessageListFetch(messageChannelIds: string[]) {
|
||||
public async scheduleMessageListFetch(messageChannelIds: string[]) {
|
||||
if (!messageChannelIds.length) {
|
||||
return;
|
||||
}
|
||||
@ -42,21 +42,6 @@ export class MessageChannelSyncStatusService {
|
||||
});
|
||||
}
|
||||
|
||||
public async schedulePartialMessageListFetch(messageChannelIds: string[]) {
|
||||
if (!messageChannelIds.length) {
|
||||
return;
|
||||
}
|
||||
|
||||
const messageChannelRepository =
|
||||
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
|
||||
'messageChannel',
|
||||
);
|
||||
|
||||
await messageChannelRepository.update(messageChannelIds, {
|
||||
syncStage: MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING,
|
||||
});
|
||||
}
|
||||
|
||||
public async scheduleMessagesImport(messageChannelIds: string[]) {
|
||||
if (!messageChannelIds.length) {
|
||||
return;
|
||||
@ -72,7 +57,7 @@ export class MessageChannelSyncStatusService {
|
||||
});
|
||||
}
|
||||
|
||||
public async resetAndScheduleFullMessageListFetch(
|
||||
public async resetAndScheduleMessageListFetch(
|
||||
messageChannelIds: string[],
|
||||
workspaceId: string,
|
||||
) {
|
||||
@ -97,7 +82,7 @@ export class MessageChannelSyncStatusService {
|
||||
throttleFailureCount: 0,
|
||||
});
|
||||
|
||||
await this.scheduleFullMessageListFetch(messageChannelIds);
|
||||
await this.scheduleMessageListFetch(messageChannelIds);
|
||||
}
|
||||
|
||||
public async resetSyncStageStartedAt(messageChannelIds: string[]) {
|
||||
@ -132,7 +117,7 @@ export class MessageChannelSyncStatusService {
|
||||
});
|
||||
}
|
||||
|
||||
public async markAsCompletedAndSchedulePartialMessageListFetch(
|
||||
public async markAsCompletedAndScheduleMessageListFetch(
|
||||
messageChannelIds: string[],
|
||||
) {
|
||||
if (!messageChannelIds.length) {
|
||||
@ -146,7 +131,7 @@ export class MessageChannelSyncStatusService {
|
||||
|
||||
await messageChannelRepository.update(messageChannelIds, {
|
||||
syncStatus: MessageChannelSyncStatus.ACTIVE,
|
||||
syncStage: MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING,
|
||||
syncStage: MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING,
|
||||
throttleFailureCount: 0,
|
||||
syncStageStartedAt: null,
|
||||
syncedAt: new Date().toISOString(),
|
||||
|
||||
@ -31,8 +31,8 @@ export enum MessageChannelSyncStatus {
|
||||
}
|
||||
|
||||
export enum MessageChannelSyncStage {
|
||||
FULL_MESSAGE_LIST_FETCH_PENDING = 'FULL_MESSAGE_LIST_FETCH_PENDING',
|
||||
PARTIAL_MESSAGE_LIST_FETCH_PENDING = 'PARTIAL_MESSAGE_LIST_FETCH_PENDING',
|
||||
FULL_MESSAGE_LIST_FETCH_PENDING = 'FULL_MESSAGE_LIST_FETCH_PENDING', // TODO: rename to MESSAGE_LIST_FETCH_PENDING
|
||||
PARTIAL_MESSAGE_LIST_FETCH_PENDING = 'PARTIAL_MESSAGE_LIST_FETCH_PENDING', // TODO: to be removed, deprecated
|
||||
MESSAGE_LIST_FETCH_ONGOING = 'MESSAGE_LIST_FETCH_ONGOING',
|
||||
MESSAGES_IMPORT_PENDING = 'MESSAGES_IMPORT_PENDING',
|
||||
MESSAGES_IMPORT_ONGOING = 'MESSAGES_IMPORT_ONGOING',
|
||||
@ -291,13 +291,13 @@ export class MessageChannelWorkspaceEntity extends BaseWorkspaceEntity {
|
||||
icon: 'IconStatusChange',
|
||||
options: [
|
||||
{
|
||||
value: MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING,
|
||||
value: MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING, // TODO: Rename to MESSAGE_LIST_FETCH_PENDING
|
||||
label: 'Full messages list fetch pending',
|
||||
position: 0,
|
||||
color: 'blue',
|
||||
},
|
||||
{
|
||||
value: MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING,
|
||||
value: MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING, // TODO: Deprecate
|
||||
label: 'Partial messages list fetch pending',
|
||||
position: 1,
|
||||
color: 'blue',
|
||||
|
||||
@ -52,6 +52,7 @@ export class MessagingMessageListFetchCronJob {
|
||||
activeWorkspace.id,
|
||||
);
|
||||
|
||||
// TODO: deprecate looking for FULL_MESSAGE_LIST_FETCH_PENDING as we introduce MESSAGE_LIST_FETCH_PENDING
|
||||
const messageChannels = await mainDataSource.query(
|
||||
`SELECT * FROM ${schemaName}."messageChannel" WHERE "isSyncEnabled" = true AND "syncStage" IN ('${MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING}', '${MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING}')`,
|
||||
);
|
||||
|
||||
@ -4,4 +4,5 @@ export enum MessageNetworkExceptionCode {
|
||||
ECONNABORTED = 'ECONNABORTED',
|
||||
ETIMEDOUT = 'ETIMEDOUT',
|
||||
ERR_NETWORK = 'ERR_NETWORK',
|
||||
EHOSTUNREACH = 'EHOSTUNREACH',
|
||||
}
|
||||
|
||||
@ -1,6 +1,8 @@
|
||||
import { GmailDefaultMessageCategory } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-default-message-category.type';
|
||||
|
||||
export const MESSAGING_GMAIL_EXCLUDED_CATEGORIES = [
|
||||
'promotions',
|
||||
'social',
|
||||
'forums',
|
||||
'updates',
|
||||
GmailDefaultMessageCategory.promotions,
|
||||
GmailDefaultMessageCategory.forums,
|
||||
GmailDefaultMessageCategory.social,
|
||||
GmailDefaultMessageCategory.updates,
|
||||
];
|
||||
|
||||
@ -14,12 +14,13 @@ describe('GmailGetMessageListService', () => {
|
||||
|
||||
const mockConnectedAccount: Pick<
|
||||
ConnectedAccountWorkspaceEntity,
|
||||
'provider' | 'refreshToken' | 'id' | 'handle'
|
||||
'provider' | 'refreshToken' | 'id' | 'handle' | 'connectionParameters'
|
||||
> = {
|
||||
id: 'connected-account-id',
|
||||
provider: ConnectedAccountProvider.GOOGLE,
|
||||
refreshToken: 'refresh-token',
|
||||
handle: 'test@gmail.com',
|
||||
connectionParameters: {},
|
||||
};
|
||||
|
||||
beforeEach(async () => {
|
||||
@ -55,7 +56,7 @@ describe('GmailGetMessageListService', () => {
|
||||
gmailClientProvider = module.get<GmailClientProvider>(GmailClientProvider);
|
||||
});
|
||||
|
||||
describe('getFullMessageList', () => {
|
||||
describe('getMessageList', () => {
|
||||
it('should return 0 messageExternalIds when gmail returns 0 messages', async () => {
|
||||
const mockGmailClient = {
|
||||
users: {
|
||||
@ -74,9 +75,13 @@ describe('GmailGetMessageListService', () => {
|
||||
mockGmailClient,
|
||||
);
|
||||
|
||||
const result = await service.getFullMessageList(mockConnectedAccount);
|
||||
const result = await service.getMessageLists({
|
||||
messageChannel: { syncCursor: '', id: 'my-id' },
|
||||
connectedAccount: mockConnectedAccount,
|
||||
messageFolders: [],
|
||||
});
|
||||
|
||||
expect(result.messageExternalIds).toHaveLength(0);
|
||||
expect(result[0].messageExternalIds).toHaveLength(0);
|
||||
expect(mockGmailClient.users.messages.list).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
@ -121,9 +126,13 @@ describe('GmailGetMessageListService', () => {
|
||||
mockGmailClient,
|
||||
);
|
||||
|
||||
const result = await service.getFullMessageList(mockConnectedAccount);
|
||||
const result = await service.getMessageLists({
|
||||
messageChannel: { syncCursor: '', id: 'my-id' },
|
||||
connectedAccount: mockConnectedAccount,
|
||||
messageFolders: [],
|
||||
});
|
||||
|
||||
expect(result.messageExternalIds).toHaveLength(5);
|
||||
expect(result[0].messageExternalIds).toHaveLength(5);
|
||||
});
|
||||
|
||||
it('should return 3 messageExternalIds when gmail provides a nextpagetoken with 2 messages, then 1', async () => {
|
||||
@ -168,9 +177,13 @@ describe('GmailGetMessageListService', () => {
|
||||
mockGmailClient,
|
||||
);
|
||||
|
||||
const result = await service.getFullMessageList(mockConnectedAccount);
|
||||
const result = await service.getMessageLists({
|
||||
messageChannel: { syncCursor: '', id: 'my-id' },
|
||||
connectedAccount: mockConnectedAccount,
|
||||
messageFolders: [],
|
||||
});
|
||||
|
||||
expect(result.messageExternalIds).toHaveLength(3);
|
||||
expect(result[0].messageExternalIds).toHaveLength(3);
|
||||
expect(mockGmailClient.users.messages.list).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
it('should go through while loop once when gmail provides a nextpagetoken but 0 messages - should never happen IRL', async () => {
|
||||
@ -196,9 +209,13 @@ describe('GmailGetMessageListService', () => {
|
||||
mockGmailClient,
|
||||
);
|
||||
|
||||
const result = await service.getFullMessageList(mockConnectedAccount);
|
||||
const result = await service.getMessageLists({
|
||||
messageChannel: { syncCursor: '', id: 'my-id' },
|
||||
connectedAccount: mockConnectedAccount,
|
||||
messageFolders: [],
|
||||
});
|
||||
|
||||
expect(result.messageExternalIds).toHaveLength(0);
|
||||
expect(result[0].messageExternalIds).toHaveLength(0);
|
||||
expect(mockGmailClient.users.messages.list).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
@ -1,8 +1,11 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
|
||||
import { isNonEmptyString } from '@sniptt/guards';
|
||||
import { gmail_v1 as gmailV1 } from 'googleapis';
|
||||
import { isDefined } from 'twenty-shared/utils';
|
||||
|
||||
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
|
||||
import { MessageFolderWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-folder.workspace-entity';
|
||||
import {
|
||||
MessageImportDriverException,
|
||||
MessageImportDriverExceptionCode,
|
||||
@ -14,10 +17,9 @@ import { GmailGetHistoryService } from 'src/modules/messaging/message-import-man
|
||||
import { GmailHandleErrorService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-handle-error.service';
|
||||
import { computeGmailCategoryExcludeSearchFilter } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-gmail-category-excude-search-filter.util';
|
||||
import { computeGmailCategoryLabelId } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-gmail-category-label-id.util';
|
||||
import {
|
||||
GetFullMessageListResponse,
|
||||
GetPartialMessageListResponse,
|
||||
} from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service';
|
||||
import { mapGmailDefaultFolderToCategoryOrUndefined } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/map-gmail-default-folder-to-category';
|
||||
import { GetMessageListsArgs } from 'src/modules/messaging/message-import-manager/types/get-message-lists-args.type';
|
||||
import { GetMessageListsResponse } from 'src/modules/messaging/message-import-manager/types/get-message-lists-response.type';
|
||||
import { assertNotNull } from 'src/utils/assert';
|
||||
|
||||
@Injectable()
|
||||
@ -28,12 +30,13 @@ export class GmailGetMessageListService {
|
||||
private readonly gmailHandleErrorService: GmailHandleErrorService,
|
||||
) {}
|
||||
|
||||
public async getFullMessageList(
|
||||
private async getMessageListWithoutCursor(
|
||||
connectedAccount: Pick<
|
||||
ConnectedAccountWorkspaceEntity,
|
||||
'provider' | 'refreshToken' | 'id' | 'handle'
|
||||
>,
|
||||
): Promise<GetFullMessageListResponse> {
|
||||
messageFolders: Pick<MessageFolderWorkspaceEntity, 'name'>[],
|
||||
): Promise<GetMessageListsResponse> {
|
||||
const gmailClient =
|
||||
await this.gmailClientProvider.getGmailClient(connectedAccount);
|
||||
|
||||
@ -41,6 +44,7 @@ export class GmailGetMessageListService {
|
||||
let hasMoreMessages = true;
|
||||
|
||||
const messageExternalIds: string[] = [];
|
||||
const excludedCategories = this.comptuteExcludedCategories(messageFolders);
|
||||
|
||||
while (hasMoreMessages) {
|
||||
const messageList = await gmailClient.users.messages
|
||||
@ -48,9 +52,7 @@ export class GmailGetMessageListService {
|
||||
userId: 'me',
|
||||
maxResults: MESSAGING_GMAIL_USERS_MESSAGES_LIST_MAX_RESULT,
|
||||
pageToken,
|
||||
q: computeGmailCategoryExcludeSearchFilter(
|
||||
MESSAGING_GMAIL_EXCLUDED_CATEGORIES,
|
||||
),
|
||||
q: computeGmailCategoryExcludeSearchFilter(excludedCategories),
|
||||
})
|
||||
.catch((error) => {
|
||||
this.gmailHandleErrorService.handleGmailMessageListFetchError(error);
|
||||
@ -78,10 +80,15 @@ export class GmailGetMessageListService {
|
||||
}
|
||||
|
||||
if (messageExternalIds.length === 0) {
|
||||
return {
|
||||
messageExternalIds,
|
||||
nextSyncCursor: '',
|
||||
};
|
||||
return [
|
||||
{
|
||||
messageExternalIds,
|
||||
nextSyncCursor: '',
|
||||
previousSyncCursor: '',
|
||||
messageExternalIdsToDelete: [],
|
||||
folderId: undefined,
|
||||
},
|
||||
];
|
||||
}
|
||||
|
||||
const firstMessageExternalId = messageExternalIds[0];
|
||||
@ -106,28 +113,42 @@ export class GmailGetMessageListService {
|
||||
);
|
||||
}
|
||||
|
||||
return { messageExternalIds, nextSyncCursor };
|
||||
return [
|
||||
{
|
||||
messageExternalIds,
|
||||
nextSyncCursor,
|
||||
previousSyncCursor: '',
|
||||
messageExternalIdsToDelete: [],
|
||||
folderId: undefined,
|
||||
},
|
||||
];
|
||||
}
|
||||
|
||||
public async getPartialMessageList(
|
||||
connectedAccount: Pick<
|
||||
ConnectedAccountWorkspaceEntity,
|
||||
'provider' | 'refreshToken' | 'id'
|
||||
>,
|
||||
syncCursor: string,
|
||||
): Promise<GetPartialMessageListResponse> {
|
||||
public async getMessageLists({
|
||||
messageChannel,
|
||||
connectedAccount,
|
||||
messageFolders,
|
||||
}: GetMessageListsArgs): Promise<GetMessageListsResponse> {
|
||||
const gmailClient =
|
||||
await this.gmailClientProvider.getGmailClient(connectedAccount);
|
||||
|
||||
if (!isNonEmptyString(messageChannel.syncCursor)) {
|
||||
return this.getMessageListWithoutCursor(connectedAccount, messageFolders);
|
||||
}
|
||||
|
||||
const { history, historyId: nextSyncCursor } =
|
||||
await this.gmailGetHistoryService.getHistory(gmailClient, syncCursor);
|
||||
await this.gmailGetHistoryService.getHistory(
|
||||
gmailClient,
|
||||
messageChannel.syncCursor,
|
||||
);
|
||||
|
||||
const { messagesAdded, messagesDeleted } =
|
||||
await this.gmailGetHistoryService.getMessageIdsFromHistory(history);
|
||||
|
||||
const messageIdsToFilter = await this.getEmailIdsFromExcludedCategories(
|
||||
gmailClient,
|
||||
syncCursor,
|
||||
messageChannel.syncCursor,
|
||||
messageFolders,
|
||||
);
|
||||
|
||||
const messagesAddedFiltered = messagesAdded.filter(
|
||||
@ -141,21 +162,42 @@ export class GmailGetMessageListService {
|
||||
);
|
||||
}
|
||||
|
||||
return {
|
||||
messageExternalIds: messagesAddedFiltered,
|
||||
messageExternalIdsToDelete: messagesDeleted,
|
||||
previousSyncCursor: syncCursor,
|
||||
nextSyncCursor,
|
||||
};
|
||||
return [
|
||||
{
|
||||
messageExternalIds: messagesAddedFiltered,
|
||||
messageExternalIdsToDelete: messagesDeleted,
|
||||
previousSyncCursor: messageChannel.syncCursor,
|
||||
nextSyncCursor,
|
||||
folderId: undefined,
|
||||
},
|
||||
];
|
||||
}
|
||||
|
||||
private comptuteExcludedCategories(
|
||||
messageFolders: Pick<MessageFolderWorkspaceEntity, 'name'>[],
|
||||
) {
|
||||
const includedDefaultCategories = messageFolders
|
||||
.map((messageFolder) =>
|
||||
mapGmailDefaultFolderToCategoryOrUndefined(messageFolder.name),
|
||||
)
|
||||
.filter(isDefined);
|
||||
|
||||
return MESSAGING_GMAIL_EXCLUDED_CATEGORIES.filter(
|
||||
(excludedCategory) =>
|
||||
!includedDefaultCategories.includes(excludedCategory),
|
||||
);
|
||||
}
|
||||
|
||||
private async getEmailIdsFromExcludedCategories(
|
||||
gmailClient: gmailV1.Gmail,
|
||||
lastSyncHistoryId: string,
|
||||
messageFolders: Pick<MessageFolderWorkspaceEntity, 'name'>[],
|
||||
): Promise<string[]> {
|
||||
const emailIds: string[] = [];
|
||||
|
||||
for (const category of MESSAGING_GMAIL_EXCLUDED_CATEGORIES) {
|
||||
const excludedCategories = this.comptuteExcludedCategories(messageFolders);
|
||||
|
||||
for (const category of excludedCategories) {
|
||||
const { history } = await this.gmailGetHistoryService.getHistory(
|
||||
gmailClient,
|
||||
lastSyncHistoryId,
|
||||
|
||||
@ -0,0 +1,6 @@
|
||||
export enum GmailDefaultMessageCategory {
|
||||
promotions = 'promotions',
|
||||
social = 'social',
|
||||
forums = 'forums',
|
||||
updates = 'updates',
|
||||
}
|
||||
@ -0,0 +1,6 @@
|
||||
export enum GmailDefaultMessageFolder {
|
||||
CATEGORY_PROMOTIONS = 'CATEGORY_PROMOTIONS',
|
||||
CATEGORY_SOCIAL = 'CATEGORY_SOCIAL',
|
||||
CATEGORY_FORUMS = 'CATEGORY_FORUMS',
|
||||
CATEGORY_UPDATES = 'CATEGORY_UPDATES',
|
||||
}
|
||||
@ -0,0 +1,37 @@
|
||||
import { isDefined } from 'twenty-shared/utils';
|
||||
|
||||
import { GmailDefaultMessageCategory } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-default-message-category.type';
|
||||
import { GmailDefaultMessageFolder } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-default-message-folder.type';
|
||||
|
||||
const DEFAULT_FOLDER_NAME_TO_CATEGORY_MAPPING = [
|
||||
{
|
||||
folderName: GmailDefaultMessageFolder.CATEGORY_FORUMS,
|
||||
category: GmailDefaultMessageCategory.forums,
|
||||
},
|
||||
{
|
||||
folderName: GmailDefaultMessageFolder.CATEGORY_PROMOTIONS,
|
||||
category: GmailDefaultMessageCategory.promotions,
|
||||
},
|
||||
{
|
||||
folderName: GmailDefaultMessageFolder.CATEGORY_SOCIAL,
|
||||
category: GmailDefaultMessageCategory.social,
|
||||
},
|
||||
{
|
||||
folderName: GmailDefaultMessageFolder.CATEGORY_UPDATES,
|
||||
category: GmailDefaultMessageCategory.updates,
|
||||
},
|
||||
];
|
||||
|
||||
export const mapGmailDefaultFolderToCategoryOrUndefined = (
|
||||
messageFolderName: string,
|
||||
) => {
|
||||
const mapping = DEFAULT_FOLDER_NAME_TO_CATEGORY_MAPPING.find(
|
||||
({ folderName }) => folderName === messageFolderName,
|
||||
);
|
||||
|
||||
if (!isDefined(mapping)) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
return mapping.category;
|
||||
};
|
||||
@ -17,6 +17,7 @@ export const parseGaxiosError = (
|
||||
case MessageNetworkExceptionCode.ECONNABORTED:
|
||||
case MessageNetworkExceptionCode.ETIMEDOUT:
|
||||
case MessageNetworkExceptionCode.ERR_NETWORK:
|
||||
case MessageNetworkExceptionCode.EHOSTUNREACH:
|
||||
return new MessageImportDriverException(
|
||||
error.message,
|
||||
MessageImportDriverExceptionCode.TEMPORARY_ERROR,
|
||||
|
||||
@ -2,11 +2,11 @@ import { Injectable, Logger } from '@nestjs/common';
|
||||
|
||||
import { ImapFlow } from 'imapflow';
|
||||
|
||||
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
|
||||
import { ImapClientProvider } from 'src/modules/messaging/message-import-manager/drivers/imap/providers/imap-client.provider';
|
||||
import { ImapHandleErrorService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-handle-error.service';
|
||||
import { findSentMailbox } from 'src/modules/messaging/message-import-manager/drivers/imap/utils/find-sent-mailbox.util';
|
||||
import { GetFullMessageListResponse } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service';
|
||||
import { GetMessageListsArgs } from 'src/modules/messaging/message-import-manager/types/get-message-lists-args.type';
|
||||
import { GetMessageListsResponse } from 'src/modules/messaging/message-import-manager/types/get-message-lists-response.type';
|
||||
|
||||
@Injectable()
|
||||
export class ImapGetMessageListService {
|
||||
@ -17,74 +17,10 @@ export class ImapGetMessageListService {
|
||||
private readonly imapHandleErrorService: ImapHandleErrorService,
|
||||
) {}
|
||||
|
||||
async getFullMessageList(
|
||||
connectedAccount: Pick<
|
||||
ConnectedAccountWorkspaceEntity,
|
||||
'id' | 'provider' | 'connectionParameters' | 'handle'
|
||||
>,
|
||||
): Promise<GetFullMessageListResponse> {
|
||||
try {
|
||||
const client = await this.imapClientProvider.getClient(connectedAccount);
|
||||
|
||||
const mailboxes = ['INBOX'];
|
||||
|
||||
const sentFolder = await findSentMailbox(client, this.logger);
|
||||
|
||||
if (sentFolder) {
|
||||
mailboxes.push(sentFolder);
|
||||
}
|
||||
|
||||
let allMessages: { id: string; date: string }[] = [];
|
||||
|
||||
for (const mailbox of mailboxes) {
|
||||
try {
|
||||
const messages = await this.getMessagesFromMailbox(client, mailbox);
|
||||
|
||||
allMessages = [...allMessages, ...messages];
|
||||
this.logger.log(
|
||||
`Fetched ${messages.length} messages from ${mailbox}`,
|
||||
);
|
||||
} catch (error) {
|
||||
this.logger.warn(
|
||||
`Error fetching from mailbox ${mailbox}: ${error.message}. Continuing with other mailboxes.`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
allMessages.sort(
|
||||
(a, b) => new Date(b.date).getTime() - new Date(a.date).getTime(),
|
||||
);
|
||||
|
||||
const messageExternalIds = allMessages.map((message) => message.id);
|
||||
|
||||
const nextSyncCursor =
|
||||
allMessages.length > 0 ? allMessages[allMessages.length - 1].date : '';
|
||||
|
||||
return {
|
||||
messageExternalIds,
|
||||
nextSyncCursor,
|
||||
};
|
||||
} catch (error) {
|
||||
this.logger.error(
|
||||
`Error getting message list: ${error.message}`,
|
||||
error.stack,
|
||||
);
|
||||
|
||||
this.imapHandleErrorService.handleImapMessageListFetchError(error);
|
||||
|
||||
return { messageExternalIds: [], nextSyncCursor: '' };
|
||||
} finally {
|
||||
await this.imapClientProvider.closeClient(connectedAccount.id);
|
||||
}
|
||||
}
|
||||
|
||||
async getPartialMessageList(
|
||||
connectedAccount: Pick<
|
||||
ConnectedAccountWorkspaceEntity,
|
||||
'id' | 'provider' | 'connectionParameters' | 'handle'
|
||||
>,
|
||||
syncCursor?: string,
|
||||
): Promise<{ messageExternalIds: string[]; nextSyncCursor: string }> {
|
||||
async getMessageLists({
|
||||
messageChannel,
|
||||
connectedAccount,
|
||||
}: GetMessageListsArgs): Promise<GetMessageListsResponse> {
|
||||
try {
|
||||
const client = await this.imapClientProvider.getClient(connectedAccount);
|
||||
|
||||
@ -103,7 +39,7 @@ export class ImapGetMessageListService {
|
||||
const messages = await this.getMessagesFromMailbox(
|
||||
client,
|
||||
mailbox,
|
||||
syncCursor,
|
||||
messageChannel.syncCursor,
|
||||
);
|
||||
|
||||
allMessages = [...allMessages, ...messages];
|
||||
@ -126,12 +62,17 @@ export class ImapGetMessageListService {
|
||||
const nextSyncCursor =
|
||||
allMessages.length > 0
|
||||
? allMessages[allMessages.length - 1].date
|
||||
: syncCursor || '';
|
||||
: messageChannel.syncCursor || '';
|
||||
|
||||
return {
|
||||
messageExternalIds,
|
||||
nextSyncCursor,
|
||||
};
|
||||
return [
|
||||
{
|
||||
messageExternalIds,
|
||||
nextSyncCursor,
|
||||
previousSyncCursor: messageChannel.syncCursor,
|
||||
messageExternalIdsToDelete: [],
|
||||
folderId: undefined,
|
||||
},
|
||||
];
|
||||
} catch (error) {
|
||||
this.logger.error(
|
||||
`Error getting message list: ${error.message}`,
|
||||
@ -140,7 +81,15 @@ export class ImapGetMessageListService {
|
||||
|
||||
this.imapHandleErrorService.handleImapMessageListFetchError(error);
|
||||
|
||||
return { messageExternalIds: [], nextSyncCursor: syncCursor || '' };
|
||||
return [
|
||||
{
|
||||
messageExternalIds: [],
|
||||
nextSyncCursor: messageChannel.syncCursor || '',
|
||||
previousSyncCursor: messageChannel.syncCursor,
|
||||
messageExternalIdsToDelete: [],
|
||||
folderId: undefined,
|
||||
},
|
||||
];
|
||||
} finally {
|
||||
await this.imapClientProvider.closeClient(connectedAccount.id);
|
||||
}
|
||||
|
||||
@ -5,6 +5,7 @@ import { ConnectedAccountProvider } from 'twenty-shared/types';
|
||||
|
||||
import { TwentyConfigModule } from 'src/engine/core-modules/twenty-config/twenty-config.module';
|
||||
import { MicrosoftOAuth2ClientManagerService } from 'src/modules/connected-account/oauth2-client-manager/drivers/microsoft/microsoft-oauth2-client-manager.service';
|
||||
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
|
||||
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
|
||||
import { MessageFolderWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-folder.workspace-entity';
|
||||
import { microsoftGraphWithMessagesDeltaLink } from 'src/modules/messaging/message-import-manager/drivers/microsoft/mocks/microsoft-api-examples';
|
||||
@ -17,10 +18,23 @@ import { MicrosoftHandleErrorService } from './microsoft-handle-error.service';
|
||||
// in case you have "Please provide a valid token" it may be because you need to pass the env varible to the .env.test file
|
||||
const refreshToken = 'replace-with-your-refresh-token';
|
||||
const syncCursor = `replace-with-your-sync-cursor`;
|
||||
const mockConnectedAccount = {
|
||||
const mockConnectedAccount: Pick<
|
||||
ConnectedAccountWorkspaceEntity,
|
||||
'provider' | 'refreshToken' | 'id' | 'handle' | 'connectionParameters'
|
||||
> = {
|
||||
id: 'connected-account-id',
|
||||
provider: ConnectedAccountProvider.MICROSOFT,
|
||||
refreshToken: refreshToken,
|
||||
handle: 'test@gmail.com',
|
||||
connectionParameters: {},
|
||||
};
|
||||
|
||||
const mockMessageChannel: Pick<
|
||||
MessageChannelWorkspaceEntity,
|
||||
'id' | 'syncCursor'
|
||||
> = {
|
||||
id: 'message-channel-id',
|
||||
syncCursor: '', // Should be empty for Microsoft as cursors are stored at the folder level
|
||||
};
|
||||
|
||||
xdescribe('Microsoft dev tests : get message list service', () => {
|
||||
@ -44,12 +58,19 @@ xdescribe('Microsoft dev tests : get message list service', () => {
|
||||
});
|
||||
|
||||
it('Should fetch and return message list successfully', async () => {
|
||||
const result = await service.getFullMessageList(
|
||||
mockConnectedAccount,
|
||||
MessageFolderName.INBOX,
|
||||
);
|
||||
const result = await service.getMessageLists({
|
||||
connectedAccount: mockConnectedAccount,
|
||||
messageChannel: mockMessageChannel,
|
||||
messageFolders: [
|
||||
{
|
||||
id: 'inbox-folder-id',
|
||||
name: MessageFolderName.INBOX,
|
||||
syncCursor: 'inbox-sync-cursor',
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
expect(result.messageExternalIds.length).toBeGreaterThan(0);
|
||||
expect(result[0].messageExternalIds.length).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
it('Should throw token error', async () => {
|
||||
@ -57,113 +78,65 @@ xdescribe('Microsoft dev tests : get message list service', () => {
|
||||
id: 'connected-account-id',
|
||||
provider: ConnectedAccountProvider.MICROSOFT,
|
||||
refreshToken: 'invalid-token',
|
||||
handle: 'test@microsoft.com',
|
||||
connectionParameters: {},
|
||||
};
|
||||
|
||||
await expect(
|
||||
service.getFullMessageList(
|
||||
mockConnectedAccountUnvalid,
|
||||
MessageFolderName.INBOX,
|
||||
),
|
||||
service.getMessageLists({
|
||||
connectedAccount: mockConnectedAccountUnvalid,
|
||||
messageChannel: mockMessageChannel,
|
||||
messageFolders: [
|
||||
{
|
||||
id: 'inbox-folder-id',
|
||||
name: MessageFolderName.INBOX,
|
||||
syncCursor: 'inbox-sync-cursor',
|
||||
},
|
||||
],
|
||||
}),
|
||||
).rejects.toThrowError('Access token is undefined or empty');
|
||||
});
|
||||
|
||||
// if you need to run this test, you need to manually update the syncCursor to a valid one
|
||||
xit('Should fetch and return partial message list successfully', async () => {
|
||||
const result = await service.getPartialMessageList(
|
||||
mockConnectedAccount,
|
||||
syncCursor,
|
||||
);
|
||||
const result = await service.getMessageLists({
|
||||
connectedAccount: mockConnectedAccount,
|
||||
messageChannel: mockMessageChannel,
|
||||
messageFolders: [
|
||||
{
|
||||
id: 'inbox-folder-id',
|
||||
name: MessageFolderName.INBOX,
|
||||
syncCursor: syncCursor,
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
expect(result.nextSyncCursor).toBeTruthy();
|
||||
expect(result[0].nextSyncCursor).toBeTruthy();
|
||||
});
|
||||
|
||||
it('Should fail partial message if syncCursor is invalid', async () => {
|
||||
await expect(
|
||||
service.getPartialMessageList(mockConnectedAccount, 'invalid-syncCursor'),
|
||||
service.getMessageLists({
|
||||
messageChannel: {
|
||||
id: 'message-channel-id',
|
||||
syncCursor: '',
|
||||
},
|
||||
connectedAccount: mockConnectedAccount,
|
||||
messageFolders: [
|
||||
{
|
||||
id: 'inbox-folder-id',
|
||||
name: MessageFolderName.INBOX,
|
||||
syncCursor: 'invalid-syncCursor',
|
||||
},
|
||||
],
|
||||
}),
|
||||
).rejects.toThrowError(
|
||||
/Resource not found for the segment|Badly formed content/g,
|
||||
);
|
||||
});
|
||||
|
||||
it('Should fail partial message if syncCursor is missing', async () => {
|
||||
await expect(
|
||||
service.getPartialMessageList(mockConnectedAccount, ''),
|
||||
).rejects.toThrowError(/Missing SyncCursor/g);
|
||||
});
|
||||
});
|
||||
|
||||
xdescribe('Microsoft dev tests : get full message list service for folders', () => {
|
||||
let service: MicrosoftGetMessageListService;
|
||||
|
||||
const inboxFolder = new MessageFolderWorkspaceEntity();
|
||||
|
||||
inboxFolder.id = 'inbox-folder-id';
|
||||
inboxFolder.name = MessageFolderName.INBOX;
|
||||
inboxFolder.syncCursor = 'inbox-sync-cursor';
|
||||
inboxFolder.messageChannelId = 'message-channel-1';
|
||||
|
||||
const sentFolder = new MessageFolderWorkspaceEntity();
|
||||
|
||||
sentFolder.id = 'sent-folder-id';
|
||||
sentFolder.name = MessageFolderName.SENT_ITEMS;
|
||||
sentFolder.syncCursor = 'sent-sync-cursor';
|
||||
sentFolder.messageChannelId = 'message-channel-1';
|
||||
|
||||
const otherFolder = new MessageFolderWorkspaceEntity();
|
||||
|
||||
otherFolder.id = 'other-folder-id';
|
||||
otherFolder.name = 'other';
|
||||
otherFolder.syncCursor = 'other-sync-cursor';
|
||||
otherFolder.messageChannelId = 'message-channel-2';
|
||||
|
||||
beforeEach(async () => {
|
||||
const module: TestingModule = await Test.createTestingModule({
|
||||
imports: [TwentyConfigModule.forRoot()],
|
||||
providers: [
|
||||
MicrosoftGetMessageListService,
|
||||
MicrosoftClientProvider,
|
||||
MicrosoftHandleErrorService,
|
||||
MicrosoftOAuth2ClientManagerService,
|
||||
ConfigService,
|
||||
],
|
||||
}).compile();
|
||||
|
||||
service = module.get<MicrosoftGetMessageListService>(
|
||||
MicrosoftGetMessageListService,
|
||||
);
|
||||
});
|
||||
|
||||
it('Should return empty array', async () => {
|
||||
const result = await service.getFullMessageListForFolders(
|
||||
mockConnectedAccount,
|
||||
[],
|
||||
);
|
||||
|
||||
expect(result.length).toBe(0);
|
||||
});
|
||||
|
||||
it('Should return an array of one item', async () => {
|
||||
const result = await service.getFullMessageListForFolders(
|
||||
mockConnectedAccount,
|
||||
[inboxFolder],
|
||||
);
|
||||
|
||||
expect(result.length).toBe(1);
|
||||
expect(result[0].folderId).toBe(inboxFolder.id);
|
||||
expect(result[0].messageExternalIds.length).toBeGreaterThan(0);
|
||||
});
|
||||
|
||||
it('Should return an array of two items', async () => {
|
||||
const result = await service.getFullMessageListForFolders(
|
||||
mockConnectedAccount,
|
||||
[inboxFolder, sentFolder],
|
||||
);
|
||||
|
||||
expect(result.length).toBe(2);
|
||||
});
|
||||
});
|
||||
|
||||
xdescribe('Microsoft dev tests : get partial message list service for folders', () => {
|
||||
xdescribe('Microsoft dev tests : get message list service for folders', () => {
|
||||
let service: MicrosoftGetMessageListService;
|
||||
|
||||
const inboxFolder = new MessageFolderWorkspaceEntity();
|
||||
@ -234,19 +207,21 @@ xdescribe('Microsoft dev tests : get partial message list service for folders',
|
||||
});
|
||||
|
||||
it('Should return empty array', async () => {
|
||||
const result = await service.getPartialMessageListForFolders(
|
||||
mockConnectedAccount,
|
||||
messageChannelNoFolders,
|
||||
);
|
||||
const result = await service.getMessageLists({
|
||||
messageChannel: messageChannelNoFolders,
|
||||
connectedAccount: mockConnectedAccount,
|
||||
messageFolders: [],
|
||||
});
|
||||
|
||||
expect(result.length).toBe(0);
|
||||
});
|
||||
|
||||
it('Should return an array of one items', async () => {
|
||||
const result = await service.getPartialMessageListForFolders(
|
||||
mockConnectedAccount,
|
||||
messageChannelMicrosoftOneFolder,
|
||||
);
|
||||
const result = await service.getMessageLists({
|
||||
messageChannel: messageChannelMicrosoftOneFolder,
|
||||
connectedAccount: mockConnectedAccount,
|
||||
messageFolders: [inboxFolder],
|
||||
});
|
||||
|
||||
expect(result.length).toBe(1);
|
||||
expect(result[0].folderId).toBe(inboxFolder.id);
|
||||
@ -254,10 +229,11 @@ xdescribe('Microsoft dev tests : get partial message list service for folders',
|
||||
});
|
||||
|
||||
it('Should return an array of two items', async () => {
|
||||
const result = await service.getPartialMessageListForFolders(
|
||||
mockConnectedAccount,
|
||||
messageChannelMicrosoft,
|
||||
);
|
||||
const result = await service.getMessageLists({
|
||||
messageChannel: messageChannelMicrosoft,
|
||||
connectedAccount: mockConnectedAccount,
|
||||
messageFolders: [inboxFolder, sentFolder],
|
||||
});
|
||||
|
||||
expect(result.length).toBe(2);
|
||||
});
|
||||
|
||||
@ -5,11 +5,11 @@ import {
|
||||
PageIterator,
|
||||
PageIteratorCallback,
|
||||
} from '@microsoft/microsoft-graph-client';
|
||||
import { isNonEmptyString } from '@sniptt/guards';
|
||||
import { v4 } from 'uuid';
|
||||
|
||||
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
|
||||
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
|
||||
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
|
||||
import { MessageFolderWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-folder.workspace-entity';
|
||||
import {
|
||||
MessageImportDriverException,
|
||||
@ -19,12 +19,12 @@ import { MicrosoftClientProvider } from 'src/modules/messaging/message-import-ma
|
||||
import { MicrosoftHandleErrorService } from 'src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-handle-error.service';
|
||||
import { MessageFolderName } from 'src/modules/messaging/message-import-manager/drivers/microsoft/types/folders';
|
||||
import { isAccessTokenRefreshingError } from 'src/modules/messaging/message-import-manager/drivers/microsoft/utils/is-access-token-refreshing-error.utils';
|
||||
import { GetMessageListsArgs } from 'src/modules/messaging/message-import-manager/types/get-message-lists-args.type';
|
||||
import {
|
||||
GetFullMessageListForFoldersResponse,
|
||||
GetFullMessageListResponse,
|
||||
GetPartialMessageListForFoldersResponse,
|
||||
GetPartialMessageListResponse,
|
||||
} from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service';
|
||||
GetMessageListsResponse,
|
||||
GetOneMessageListResponse,
|
||||
} from 'src/modules/messaging/message-import-manager/types/get-message-lists-response.type';
|
||||
|
||||
// Microsoft API limit is 999 messages per request on this endpoint
|
||||
const MESSAGING_MICROSOFT_USERS_MESSAGES_LIST_MAX_RESULT = 999;
|
||||
|
||||
@ -36,101 +36,14 @@ export class MicrosoftGetMessageListService {
|
||||
private readonly twentyORMManager: TwentyORMManager,
|
||||
) {}
|
||||
|
||||
public async getFullMessageListForFolders(
|
||||
connectedAccount: Pick<
|
||||
ConnectedAccountWorkspaceEntity,
|
||||
'refreshToken' | 'id'
|
||||
>,
|
||||
folders: Pick<MessageFolderWorkspaceEntity, 'id' | 'name'>[],
|
||||
): Promise<GetFullMessageListForFoldersResponse[]> {
|
||||
const result: GetFullMessageListForFoldersResponse[] = [];
|
||||
public async getMessageLists({
|
||||
messageChannel,
|
||||
connectedAccount,
|
||||
messageFolders,
|
||||
}: GetMessageListsArgs): Promise<GetMessageListsResponse> {
|
||||
const result: GetMessageListsResponse = [];
|
||||
|
||||
for (const folder of folders) {
|
||||
const response = await this.getFullMessageList(
|
||||
connectedAccount,
|
||||
folder.name as MessageFolderName,
|
||||
);
|
||||
|
||||
result.push({
|
||||
...response,
|
||||
folderId: folder.id,
|
||||
});
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
public async getFullMessageList(
|
||||
connectedAccount: Pick<
|
||||
ConnectedAccountWorkspaceEntity,
|
||||
'refreshToken' | 'id'
|
||||
>,
|
||||
folderName: MessageFolderName,
|
||||
): Promise<GetFullMessageListResponse> {
|
||||
const messageExternalIds: string[] = [];
|
||||
|
||||
const microsoftClient =
|
||||
await this.microsoftClientProvider.getMicrosoftClient(connectedAccount);
|
||||
|
||||
const response: PageCollection = await microsoftClient
|
||||
.api(`/me/mailfolders/${folderName}/messages/delta?$select=id`)
|
||||
.version('beta')
|
||||
.headers({
|
||||
Prefer: `odata.maxpagesize=${MESSAGING_MICROSOFT_USERS_MESSAGES_LIST_MAX_RESULT}, IdType="ImmutableId"`,
|
||||
})
|
||||
.get()
|
||||
.catch((error) => {
|
||||
if (isAccessTokenRefreshingError(error?.body)) {
|
||||
throw new MessageImportDriverException(
|
||||
error.message,
|
||||
MessageImportDriverExceptionCode.CLIENT_NOT_AVAILABLE,
|
||||
);
|
||||
}
|
||||
this.microsoftHandleErrorService.handleMicrosoftGetMessageListError(
|
||||
error,
|
||||
);
|
||||
});
|
||||
|
||||
const callback: PageIteratorCallback = (data) => {
|
||||
messageExternalIds.push(data.id);
|
||||
|
||||
return true;
|
||||
};
|
||||
|
||||
const pageIterator = new PageIterator(microsoftClient, response, callback, {
|
||||
headers: {
|
||||
Prefer: `odata.maxpagesize=${MESSAGING_MICROSOFT_USERS_MESSAGES_LIST_MAX_RESULT}, IdType="ImmutableId"`,
|
||||
},
|
||||
});
|
||||
|
||||
await pageIterator.iterate().catch((error) => {
|
||||
if (isAccessTokenRefreshingError(error?.body)) {
|
||||
throw new MessageImportDriverException(
|
||||
error.message,
|
||||
MessageImportDriverExceptionCode.CLIENT_NOT_AVAILABLE,
|
||||
);
|
||||
}
|
||||
this.microsoftHandleErrorService.handleMicrosoftGetMessageListError(
|
||||
error,
|
||||
);
|
||||
});
|
||||
|
||||
return {
|
||||
messageExternalIds: messageExternalIds,
|
||||
nextSyncCursor: pageIterator.getDeltaLink() || '',
|
||||
};
|
||||
}
|
||||
|
||||
public async getPartialMessageListForFolders(
|
||||
connectedAccount: Pick<
|
||||
ConnectedAccountWorkspaceEntity,
|
||||
'provider' | 'refreshToken' | 'id'
|
||||
>,
|
||||
messageChannel: MessageChannelWorkspaceEntity,
|
||||
): Promise<GetPartialMessageListForFoldersResponse[]> {
|
||||
const result: GetPartialMessageListForFoldersResponse[] = [];
|
||||
|
||||
if (messageChannel.messageFolders.length === 0) {
|
||||
if (messageFolders.length === 0) {
|
||||
// permanent solution:
|
||||
// throw new MessageImportDriverException(
|
||||
// `Message channel ${messageChannel.id} has no message folders`,
|
||||
@ -158,10 +71,10 @@ export class MicrosoftGetMessageListService {
|
||||
syncCursor: messageChannel.syncCursor,
|
||||
});
|
||||
|
||||
const response = await this.getPartialMessageList(
|
||||
connectedAccount,
|
||||
messageChannel.syncCursor,
|
||||
);
|
||||
const response = await this.getMessageList(connectedAccount, {
|
||||
name: MessageFolderName.INBOX,
|
||||
syncCursor: messageChannel.syncCursor,
|
||||
});
|
||||
|
||||
result.push({
|
||||
...response,
|
||||
@ -173,11 +86,8 @@ export class MicrosoftGetMessageListService {
|
||||
return result;
|
||||
}
|
||||
|
||||
for (const folder of messageChannel.messageFolders) {
|
||||
const response = await this.getPartialMessageList(
|
||||
connectedAccount,
|
||||
folder.syncCursor,
|
||||
);
|
||||
for (const folder of messageFolders) {
|
||||
const response = await this.getMessageList(connectedAccount, folder);
|
||||
|
||||
result.push({
|
||||
...response,
|
||||
@ -188,29 +98,25 @@ export class MicrosoftGetMessageListService {
|
||||
return result;
|
||||
}
|
||||
|
||||
public async getPartialMessageList(
|
||||
public async getMessageList(
|
||||
connectedAccount: Pick<
|
||||
ConnectedAccountWorkspaceEntity,
|
||||
'provider' | 'refreshToken' | 'id'
|
||||
>,
|
||||
syncCursor: string,
|
||||
): Promise<GetPartialMessageListResponse> {
|
||||
// important: otherwise tries to get the full message list
|
||||
if (!syncCursor) {
|
||||
throw new MessageImportDriverException(
|
||||
'Missing SyncCursor',
|
||||
MessageImportDriverExceptionCode.SYNC_CURSOR_ERROR,
|
||||
);
|
||||
}
|
||||
|
||||
messageFolder: Pick<MessageFolderWorkspaceEntity, 'name' | 'syncCursor'>,
|
||||
): Promise<GetOneMessageListResponse> {
|
||||
const messageExternalIds: string[] = [];
|
||||
const messageExternalIdsToDelete: string[] = [];
|
||||
|
||||
const microsoftClient =
|
||||
await this.microsoftClientProvider.getMicrosoftClient(connectedAccount);
|
||||
|
||||
const apiUrl = isNonEmptyString(messageFolder.syncCursor)
|
||||
? messageFolder.syncCursor
|
||||
: `/me/mailfolders/${messageFolder.name}/messages/delta?$select=id`;
|
||||
|
||||
const response: PageCollection = await microsoftClient
|
||||
.api(syncCursor)
|
||||
.api(apiUrl)
|
||||
.version('beta')
|
||||
.headers({
|
||||
Prefer: `odata.maxpagesize=${MESSAGING_MICROSOFT_USERS_MESSAGES_LIST_MAX_RESULT}, IdType="ImmutableId"`,
|
||||
@ -259,8 +165,9 @@ export class MicrosoftGetMessageListService {
|
||||
return {
|
||||
messageExternalIds,
|
||||
messageExternalIdsToDelete,
|
||||
previousSyncCursor: syncCursor,
|
||||
previousSyncCursor: messageFolder.syncCursor,
|
||||
nextSyncCursor: pageIterator.getDeltaLink() || '',
|
||||
folderId: undefined,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@ -12,12 +12,11 @@ import {
|
||||
MessageChannelWorkspaceEntity,
|
||||
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
|
||||
import { MessagingAccountAuthenticationService } from 'src/modules/messaging/message-import-manager/services/messaging-account-authentication.service';
|
||||
import { MessagingFullMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service';
|
||||
import {
|
||||
MessageImportExceptionHandlerService,
|
||||
MessageImportSyncStep,
|
||||
} from 'src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service';
|
||||
import { MessagingPartialMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service';
|
||||
import { MessagingMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-message-list-fetch.service';
|
||||
import { MessagingMonitoringService } from 'src/modules/messaging/monitoring/services/messaging-monitoring.service';
|
||||
|
||||
export type MessagingMessageListFetchJobData = {
|
||||
@ -31,8 +30,7 @@ export type MessagingMessageListFetchJobData = {
|
||||
})
|
||||
export class MessagingMessageListFetchJob {
|
||||
constructor(
|
||||
private readonly messagingFullMessageListFetchService: MessagingFullMessageListFetchService,
|
||||
private readonly messagingPartialMessageListFetchService: MessagingPartialMessageListFetchService,
|
||||
private readonly messagingMessageListFetchService: MessagingMessageListFetchService,
|
||||
private readonly messagingMonitoringService: MessagingMonitoringService,
|
||||
private readonly twentyORMManager: TwentyORMManager,
|
||||
private readonly connectedAccountRefreshTokensService: ConnectedAccountRefreshTokensService,
|
||||
@ -88,29 +86,7 @@ export class MessagingMessageListFetchJob {
|
||||
);
|
||||
|
||||
switch (messageChannel.syncStage) {
|
||||
case MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING:
|
||||
await this.messagingMonitoringService.track({
|
||||
eventName: 'partial_message_list_fetch.started',
|
||||
workspaceId,
|
||||
connectedAccountId: messageChannel.connectedAccount.id,
|
||||
messageChannelId: messageChannel.id,
|
||||
});
|
||||
|
||||
await this.messagingPartialMessageListFetchService.processMessageListFetch(
|
||||
messageChannel,
|
||||
messageChannel.connectedAccount,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
await this.messagingMonitoringService.track({
|
||||
eventName: 'partial_message_list_fetch.completed',
|
||||
workspaceId,
|
||||
connectedAccountId: messageChannel.connectedAccount.id,
|
||||
messageChannelId: messageChannel.id,
|
||||
});
|
||||
|
||||
break;
|
||||
|
||||
case MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING: // TODO: deprecate as we introduce MESSAGE_LIST_FETCH_PENDING
|
||||
case MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING:
|
||||
await this.messagingMonitoringService.track({
|
||||
eventName: 'full_message_list_fetch.started',
|
||||
@ -119,7 +95,7 @@ export class MessagingMessageListFetchJob {
|
||||
messageChannelId: messageChannel.id,
|
||||
});
|
||||
|
||||
await this.messagingFullMessageListFetchService.processMessageListFetch(
|
||||
await this.messagingMessageListFetchService.processMessageListFetch(
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
@ -139,7 +115,7 @@ export class MessagingMessageListFetchJob {
|
||||
} catch (error) {
|
||||
await this.messageImportErrorHandlerService.handleDriverException(
|
||||
error,
|
||||
MessageImportSyncStep.FULL_OR_PARTIAL_MESSAGE_LIST_FETCH,
|
||||
MessageImportSyncStep.MESSAGE_LIST_FETCH,
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
@ -61,7 +61,7 @@ export class MessagingOngoingStaleJob {
|
||||
|
||||
switch (messageChannel.syncStage) {
|
||||
case MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING:
|
||||
await this.messageChannelSyncStatusService.schedulePartialMessageListFetch(
|
||||
await this.messageChannelSyncStatusService.scheduleMessageListFetch(
|
||||
[messageChannel.id],
|
||||
);
|
||||
break;
|
||||
|
||||
@ -31,13 +31,12 @@ import { MessagingOngoingStaleJob } from 'src/modules/messaging/message-import-m
|
||||
import { MessagingMessageImportManagerMessageChannelListener } from 'src/modules/messaging/message-import-manager/listeners/messaging-import-manager-message-channel.listener';
|
||||
import { MessagingAccountAuthenticationService } from 'src/modules/messaging/message-import-manager/services/messaging-account-authentication.service';
|
||||
import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service';
|
||||
import { MessagingFullMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service';
|
||||
import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service';
|
||||
import { MessagingGetMessagesService } from 'src/modules/messaging/message-import-manager/services/messaging-get-messages.service';
|
||||
import { MessageImportExceptionHandlerService } from 'src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service';
|
||||
import { MessagingMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-message-list-fetch.service';
|
||||
import { MessagingMessageService } from 'src/modules/messaging/message-import-manager/services/messaging-message.service';
|
||||
import { MessagingMessagesImportService } from 'src/modules/messaging/message-import-manager/services/messaging-messages-import.service';
|
||||
import { MessagingPartialMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service';
|
||||
import { MessagingSaveMessagesAndEnqueueContactCreationService } from 'src/modules/messaging/message-import-manager/services/messaging-save-messages-and-enqueue-contact-creation.service';
|
||||
import { MessagingSendMessageService } from 'src/modules/messaging/message-import-manager/services/messaging-send-message.service';
|
||||
import { MessageParticipantManagerModule } from 'src/modules/messaging/message-participant-manager/message-participant-manager.module';
|
||||
@ -78,8 +77,7 @@ import { MessagingMonitoringModule } from 'src/modules/messaging/monitoring/mess
|
||||
MessagingMessageImportManagerMessageChannelListener,
|
||||
MessagingCleanCacheJob,
|
||||
MessagingMessageService,
|
||||
MessagingPartialMessageListFetchService,
|
||||
MessagingFullMessageListFetchService,
|
||||
MessagingMessageListFetchService,
|
||||
MessagingMessagesImportService,
|
||||
MessagingSaveMessagesAndEnqueueContactCreationService,
|
||||
MessagingGetMessageListService,
|
||||
|
||||
@ -2,9 +2,7 @@ import { Injectable } from '@nestjs/common';
|
||||
|
||||
import { ConnectedAccountProvider } from 'twenty-shared/types';
|
||||
|
||||
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
|
||||
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
|
||||
import { MessageFolderWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-folder.workspace-entity';
|
||||
import { GmailGetMessageListService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service';
|
||||
import { ImapGetMessageListService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-get-message-list.service';
|
||||
import { MicrosoftGetMessageListService } from 'src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service';
|
||||
@ -12,29 +10,7 @@ import {
|
||||
MessageImportException,
|
||||
MessageImportExceptionCode,
|
||||
} from 'src/modules/messaging/message-import-manager/exceptions/message-import.exception';
|
||||
import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service';
|
||||
|
||||
export type GetFullMessageListResponse = {
|
||||
messageExternalIds: string[];
|
||||
nextSyncCursor: string;
|
||||
};
|
||||
|
||||
export type GetFullMessageListForFoldersResponse =
|
||||
GetFullMessageListResponse & {
|
||||
folderId: string | undefined;
|
||||
};
|
||||
|
||||
export type GetPartialMessageListResponse = {
|
||||
messageExternalIds: string[];
|
||||
messageExternalIdsToDelete: string[];
|
||||
previousSyncCursor: string;
|
||||
nextSyncCursor: string;
|
||||
};
|
||||
|
||||
export type GetPartialMessageListForFoldersResponse =
|
||||
GetPartialMessageListResponse & {
|
||||
folderId: string | undefined;
|
||||
};
|
||||
import { GetMessageListsResponse } from 'src/modules/messaging/message-import-manager/types/get-message-lists-response.type';
|
||||
|
||||
@Injectable()
|
||||
export class MessagingGetMessageListService {
|
||||
@ -42,100 +18,30 @@ export class MessagingGetMessageListService {
|
||||
private readonly gmailGetMessageListService: GmailGetMessageListService,
|
||||
private readonly microsoftGetMessageListService: MicrosoftGetMessageListService,
|
||||
private readonly imapGetMessageListService: ImapGetMessageListService,
|
||||
private readonly messagingCursorService: MessagingCursorService,
|
||||
private readonly twentyORMManager: TwentyORMManager,
|
||||
) {}
|
||||
|
||||
public async getFullMessageLists(
|
||||
public async getMessageLists(
|
||||
messageChannel: MessageChannelWorkspaceEntity,
|
||||
): Promise<GetFullMessageListForFoldersResponse[]> {
|
||||
switch (messageChannel.connectedAccount.provider) {
|
||||
case ConnectedAccountProvider.GOOGLE: {
|
||||
const fullMessageList =
|
||||
await this.gmailGetMessageListService.getFullMessageList(
|
||||
messageChannel.connectedAccount,
|
||||
);
|
||||
|
||||
return [
|
||||
{
|
||||
...fullMessageList,
|
||||
folderId: undefined,
|
||||
},
|
||||
];
|
||||
}
|
||||
case ConnectedAccountProvider.MICROSOFT: {
|
||||
const folderRepository =
|
||||
await this.twentyORMManager.getRepository<MessageFolderWorkspaceEntity>(
|
||||
'messageFolder',
|
||||
);
|
||||
|
||||
const folders = await folderRepository.find({
|
||||
where: {
|
||||
messageChannelId: messageChannel.id,
|
||||
},
|
||||
});
|
||||
|
||||
return this.microsoftGetMessageListService.getFullMessageListForFolders(
|
||||
messageChannel.connectedAccount,
|
||||
folders,
|
||||
);
|
||||
}
|
||||
case ConnectedAccountProvider.IMAP_SMTP_CALDAV: {
|
||||
const fullMessageList =
|
||||
await this.imapGetMessageListService.getFullMessageList(
|
||||
messageChannel.connectedAccount,
|
||||
);
|
||||
|
||||
return [
|
||||
{
|
||||
...fullMessageList,
|
||||
folderId: undefined,
|
||||
},
|
||||
];
|
||||
}
|
||||
default:
|
||||
throw new MessageImportException(
|
||||
`Provider ${messageChannel.connectedAccount.provider} is not supported`,
|
||||
MessageImportExceptionCode.PROVIDER_NOT_SUPPORTED,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public async getPartialMessageLists(
|
||||
messageChannel: MessageChannelWorkspaceEntity,
|
||||
): Promise<GetPartialMessageListForFoldersResponse[]> {
|
||||
): Promise<GetMessageListsResponse> {
|
||||
switch (messageChannel.connectedAccount.provider) {
|
||||
case ConnectedAccountProvider.GOOGLE:
|
||||
return [
|
||||
{
|
||||
...(await this.gmailGetMessageListService.getPartialMessageList(
|
||||
messageChannel.connectedAccount,
|
||||
messageChannel.syncCursor,
|
||||
)),
|
||||
folderId: undefined,
|
||||
},
|
||||
];
|
||||
case ConnectedAccountProvider.MICROSOFT:
|
||||
return this.microsoftGetMessageListService.getPartialMessageListForFolders(
|
||||
messageChannel.connectedAccount,
|
||||
return await this.gmailGetMessageListService.getMessageLists({
|
||||
messageChannel,
|
||||
);
|
||||
connectedAccount: messageChannel.connectedAccount,
|
||||
messageFolders: messageChannel.messageFolders,
|
||||
});
|
||||
case ConnectedAccountProvider.MICROSOFT:
|
||||
return this.microsoftGetMessageListService.getMessageLists({
|
||||
messageChannel,
|
||||
connectedAccount: messageChannel.connectedAccount,
|
||||
messageFolders: messageChannel.messageFolders,
|
||||
});
|
||||
case ConnectedAccountProvider.IMAP_SMTP_CALDAV: {
|
||||
const messageList =
|
||||
await this.imapGetMessageListService.getPartialMessageList(
|
||||
messageChannel.connectedAccount,
|
||||
messageChannel.syncCursor,
|
||||
);
|
||||
|
||||
return [
|
||||
{
|
||||
messageExternalIds: messageList.messageExternalIds,
|
||||
messageExternalIdsToDelete: [],
|
||||
previousSyncCursor: messageChannel.syncCursor || '',
|
||||
nextSyncCursor: messageList.nextSyncCursor || '',
|
||||
folderId: undefined,
|
||||
},
|
||||
];
|
||||
return await this.imapGetMessageListService.getMessageLists({
|
||||
messageChannel,
|
||||
connectedAccount: messageChannel.connectedAccount,
|
||||
messageFolders: messageChannel.messageFolders,
|
||||
});
|
||||
}
|
||||
default:
|
||||
throw new MessageImportException(
|
||||
|
||||
@ -21,9 +21,9 @@ import {
|
||||
} from 'src/modules/messaging/message-import-manager/exceptions/message-import.exception';
|
||||
|
||||
export enum MessageImportSyncStep {
|
||||
FULL_MESSAGE_LIST_FETCH = 'FULL_MESSAGE_LIST_FETCH',
|
||||
PARTIAL_MESSAGE_LIST_FETCH = 'PARTIAL_MESSAGE_LIST_FETCH',
|
||||
FULL_OR_PARTIAL_MESSAGE_LIST_FETCH = 'FULL_OR_PARTIAL_MESSAGE_LIST_FETCH',
|
||||
FULL_MESSAGE_LIST_FETCH = 'FULL_MESSAGE_LIST_FETCH', // TODO: deprecate to only use MESSAGE_LIST_FETCH
|
||||
PARTIAL_MESSAGE_LIST_FETCH = 'PARTIAL_MESSAGE_LIST_FETCH', // TODO: deprecate to only use MESSAGE_LIST_FETCH
|
||||
MESSAGE_LIST_FETCH = 'MESSAGE_LIST_FETCH',
|
||||
MESSAGES_IMPORT_PENDING = 'MESSAGES_IMPORT_PENDING',
|
||||
MESSAGES_IMPORT_ONGOING = 'MESSAGES_IMPORT_ONGOING',
|
||||
}
|
||||
@ -145,16 +145,11 @@ export class MessageImportExceptionHandlerService {
|
||||
|
||||
switch (syncStep) {
|
||||
case MessageImportSyncStep.FULL_MESSAGE_LIST_FETCH:
|
||||
await this.messageChannelSyncStatusService.scheduleFullMessageListFetch(
|
||||
[messageChannel.id],
|
||||
);
|
||||
await this.messageChannelSyncStatusService.scheduleMessageListFetch([
|
||||
messageChannel.id,
|
||||
]);
|
||||
break;
|
||||
|
||||
case MessageImportSyncStep.PARTIAL_MESSAGE_LIST_FETCH:
|
||||
await this.messageChannelSyncStatusService.schedulePartialMessageListFetch(
|
||||
[messageChannel.id],
|
||||
);
|
||||
break;
|
||||
case MessageImportSyncStep.MESSAGES_IMPORT_PENDING:
|
||||
case MessageImportSyncStep.MESSAGES_IMPORT_ONGOING:
|
||||
await this.messageChannelSyncStatusService.scheduleMessagesImport([
|
||||
@ -233,7 +228,7 @@ export class MessageImportExceptionHandlerService {
|
||||
return;
|
||||
}
|
||||
|
||||
await this.messageChannelSyncStatusService.resetAndScheduleFullMessageListFetch(
|
||||
await this.messageChannelSyncStatusService.resetAndScheduleMessageListFetch(
|
||||
[messageChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
@ -10,12 +10,12 @@ import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/stan
|
||||
import { MessageFolderWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-folder.workspace-entity';
|
||||
import { MessagingMessageCleanerService } from 'src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service';
|
||||
import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service';
|
||||
import { MessagingFullMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service';
|
||||
import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service';
|
||||
import { MessageImportExceptionHandlerService } from 'src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service';
|
||||
import { MessagingMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-message-list-fetch.service';
|
||||
|
||||
describe('MessagingFullMessageListFetchService', () => {
|
||||
let messagingFullMessageListFetchService: MessagingFullMessageListFetchService;
|
||||
describe('MessagingMessageListFetchService', () => {
|
||||
let messagingMessageListFetchService: MessagingMessageListFetchService;
|
||||
let messagingGetMessageListService: MessagingGetMessageListService;
|
||||
let messageChannelSyncStatusService: MessageChannelSyncStatusService;
|
||||
let twentyORMManager: TwentyORMManager;
|
||||
@ -72,7 +72,7 @@ describe('MessagingFullMessageListFetchService', () => {
|
||||
|
||||
const module: TestingModule = await Test.createTestingModule({
|
||||
providers: [
|
||||
MessagingFullMessageListFetchService,
|
||||
MessagingMessageListFetchService,
|
||||
{
|
||||
provide: CacheStorageNamespace.ModuleMessaging,
|
||||
useValue: {
|
||||
@ -82,12 +82,11 @@ describe('MessagingFullMessageListFetchService', () => {
|
||||
{
|
||||
provide: MessagingGetMessageListService,
|
||||
useValue: {
|
||||
getFullMessageLists: jest
|
||||
getMessageLists: jest
|
||||
.fn()
|
||||
.mockImplementation((messageChannel) => {
|
||||
.mockImplementation(({ connectedAccount }) => {
|
||||
if (
|
||||
messageChannel.connectedAccount.provider ===
|
||||
ConnectedAccountProvider.GOOGLE
|
||||
connectedAccount.provider === ConnectedAccountProvider.GOOGLE
|
||||
) {
|
||||
return [
|
||||
{
|
||||
@ -162,9 +161,9 @@ describe('MessagingFullMessageListFetchService', () => {
|
||||
],
|
||||
}).compile();
|
||||
|
||||
messagingFullMessageListFetchService =
|
||||
module.get<MessagingFullMessageListFetchService>(
|
||||
MessagingFullMessageListFetchService,
|
||||
messagingMessageListFetchService =
|
||||
module.get<MessagingMessageListFetchService>(
|
||||
MessagingMessageListFetchService,
|
||||
);
|
||||
messagingGetMessageListService = module.get<MessagingGetMessageListService>(
|
||||
MessagingGetMessageListService,
|
||||
@ -180,7 +179,7 @@ describe('MessagingFullMessageListFetchService', () => {
|
||||
});
|
||||
|
||||
it('should process Microsoft message list fetch correctly', async () => {
|
||||
await messagingFullMessageListFetchService.processMessageListFetch(
|
||||
await messagingMessageListFetchService.processMessageListFetch(
|
||||
mockMicrosoftMessageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
@ -189,9 +188,9 @@ describe('MessagingFullMessageListFetchService', () => {
|
||||
messageChannelSyncStatusService.markAsMessagesListFetchOngoing,
|
||||
).toHaveBeenCalledWith([mockMicrosoftMessageChannel.id]);
|
||||
|
||||
expect(
|
||||
messagingGetMessageListService.getFullMessageLists,
|
||||
).toHaveBeenCalledWith(mockMicrosoftMessageChannel);
|
||||
expect(messagingGetMessageListService.getMessageLists).toHaveBeenCalledWith(
|
||||
mockMicrosoftMessageChannel,
|
||||
);
|
||||
|
||||
expect(twentyORMManager.getRepository).toHaveBeenCalledWith(
|
||||
'messageChannelMessageAssociation',
|
||||
@ -209,7 +208,7 @@ describe('MessagingFullMessageListFetchService', () => {
|
||||
});
|
||||
|
||||
it('should process Google message list fetch correctly', async () => {
|
||||
await messagingFullMessageListFetchService.processMessageListFetch(
|
||||
await messagingMessageListFetchService.processMessageListFetch(
|
||||
mockGoogleMessageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
@ -218,9 +217,9 @@ describe('MessagingFullMessageListFetchService', () => {
|
||||
messageChannelSyncStatusService.markAsMessagesListFetchOngoing,
|
||||
).toHaveBeenCalledWith([mockGoogleMessageChannel.id]);
|
||||
|
||||
expect(
|
||||
messagingGetMessageListService.getFullMessageLists,
|
||||
).toHaveBeenCalledWith(mockGoogleMessageChannel);
|
||||
expect(messagingGetMessageListService.getMessageLists).toHaveBeenCalledWith(
|
||||
mockGoogleMessageChannel,
|
||||
);
|
||||
|
||||
expect(twentyORMManager.getRepository).toHaveBeenCalledWith(
|
||||
'messageChannelMessageAssociation',
|
||||
@ -17,7 +17,7 @@ import {
|
||||
MessageImportSyncStep,
|
||||
} from 'src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service';
|
||||
@Injectable()
|
||||
export class MessagingFullMessageListFetchService {
|
||||
export class MessagingMessageListFetchService {
|
||||
constructor(
|
||||
@InjectCacheStorage(CacheStorageNamespace.ModuleMessaging)
|
||||
private readonly cacheStorage: CacheStorageService,
|
||||
@ -38,17 +38,17 @@ export class MessagingFullMessageListFetchService {
|
||||
[messageChannel.id],
|
||||
);
|
||||
|
||||
const fullMessageLists =
|
||||
await this.messagingGetMessageListService.getFullMessageLists(
|
||||
const messageLists =
|
||||
await this.messagingGetMessageListService.getMessageLists(
|
||||
messageChannel,
|
||||
);
|
||||
|
||||
const isEmptyMailbox = fullMessageLists.some(
|
||||
(fullMessageList) => fullMessageList.messageExternalIds.length === 0,
|
||||
const isEmptyMailbox = messageLists.some(
|
||||
(messageList) => messageList.messageExternalIds.length === 0,
|
||||
);
|
||||
|
||||
if (isEmptyMailbox) {
|
||||
await this.messageChannelSyncStatusService.resetAndScheduleFullMessageListFetch(
|
||||
await this.messageChannelSyncStatusService.resetAndScheduleMessageListFetch(
|
||||
[messageChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
@ -56,9 +56,8 @@ export class MessagingFullMessageListFetchService {
|
||||
return;
|
||||
}
|
||||
|
||||
for (const fullMessageList of fullMessageLists) {
|
||||
const { messageExternalIds, nextSyncCursor, folderId } =
|
||||
fullMessageList;
|
||||
for (const messageList of messageLists) {
|
||||
const { messageExternalIds, nextSyncCursor, folderId } = messageList;
|
||||
|
||||
const messageChannelMessageAssociationRepository =
|
||||
await this.twentyORMManager.getRepository<MessageChannelMessageAssociationWorkspaceEntity>(
|
||||
@ -66,7 +66,7 @@ describe('MessagingMessagesImportService', () => {
|
||||
provide: MessageChannelSyncStatusService,
|
||||
useValue: {
|
||||
markAsMessagesImportOngoing: jest.fn().mockResolvedValue(undefined),
|
||||
markAsCompletedAndSchedulePartialMessageListFetch: jest
|
||||
markAsCompletedAndScheduleMessageListFetch: jest
|
||||
.fn()
|
||||
.mockResolvedValue(undefined),
|
||||
scheduleMessagesImport: jest.fn().mockResolvedValue(undefined),
|
||||
@ -191,7 +191,7 @@ describe('MessagingMessagesImportService', () => {
|
||||
|
||||
it('should fails if SyncStage is not MESSAGES_IMPORT_PENDING', async () => {
|
||||
mockMessageChannel.syncStage =
|
||||
MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING;
|
||||
MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING;
|
||||
|
||||
expect(
|
||||
service.processMessageBatchImport(
|
||||
|
||||
@ -88,7 +88,7 @@ export class MessagingMessagesImportService {
|
||||
);
|
||||
|
||||
if (!messageIdsToFetch?.length) {
|
||||
await this.messageChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch(
|
||||
await this.messageChannelSyncStatusService.markAsCompletedAndScheduleMessageListFetch(
|
||||
[messageChannel.id],
|
||||
);
|
||||
|
||||
@ -125,7 +125,7 @@ export class MessagingMessagesImportService {
|
||||
if (
|
||||
messageIdsToFetch.length < MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE
|
||||
) {
|
||||
await this.messageChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch(
|
||||
await this.messageChannelSyncStatusService.markAsCompletedAndScheduleMessageListFetch(
|
||||
[messageChannel.id],
|
||||
);
|
||||
} else {
|
||||
|
||||
@ -1,144 +0,0 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
|
||||
import { In } from 'typeorm';
|
||||
|
||||
import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decorators/cache-storage.decorator';
|
||||
import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service';
|
||||
import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum';
|
||||
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
|
||||
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
|
||||
import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service';
|
||||
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity';
|
||||
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
|
||||
import { MessagingMessageCleanerService } from 'src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service';
|
||||
import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service';
|
||||
import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service';
|
||||
import {
|
||||
MessageImportExceptionHandlerService,
|
||||
MessageImportSyncStep,
|
||||
} from 'src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service';
|
||||
|
||||
@Injectable()
|
||||
export class MessagingPartialMessageListFetchService {
|
||||
constructor(
|
||||
@InjectCacheStorage(CacheStorageNamespace.ModuleMessaging)
|
||||
private readonly cacheStorage: CacheStorageService,
|
||||
private readonly messagingGetMessageListService: MessagingGetMessageListService,
|
||||
private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService,
|
||||
private readonly twentyORMManager: TwentyORMManager,
|
||||
private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService,
|
||||
private readonly messagingMessageCleanerService: MessagingMessageCleanerService,
|
||||
private readonly messagingCursorService: MessagingCursorService,
|
||||
) {}
|
||||
|
||||
public async processMessageListFetch(
|
||||
messageChannel: MessageChannelWorkspaceEntity,
|
||||
connectedAccount: ConnectedAccountWorkspaceEntity,
|
||||
workspaceId: string,
|
||||
): Promise<void> {
|
||||
try {
|
||||
await this.messageChannelSyncStatusService.markAsMessagesListFetchOngoing(
|
||||
[messageChannel.id],
|
||||
);
|
||||
|
||||
const messageChannelRepository =
|
||||
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
|
||||
'messageChannel',
|
||||
);
|
||||
|
||||
await messageChannelRepository.update(
|
||||
{
|
||||
id: messageChannel.id,
|
||||
},
|
||||
{
|
||||
throttleFailureCount: 0,
|
||||
},
|
||||
);
|
||||
|
||||
const partialMessageLists =
|
||||
await this.messagingGetMessageListService.getPartialMessageLists(
|
||||
messageChannel,
|
||||
);
|
||||
|
||||
for (const partialMessageList of partialMessageLists) {
|
||||
const {
|
||||
messageExternalIds,
|
||||
messageExternalIdsToDelete,
|
||||
previousSyncCursor,
|
||||
nextSyncCursor,
|
||||
folderId,
|
||||
} = partialMessageList;
|
||||
|
||||
const isPartialImportFinished = this.isPartialImportFinished(
|
||||
previousSyncCursor,
|
||||
nextSyncCursor,
|
||||
);
|
||||
|
||||
if (isPartialImportFinished) {
|
||||
continue;
|
||||
}
|
||||
|
||||
await this.cacheStorage.setAdd(
|
||||
`messages-to-import:${workspaceId}:${messageChannel.id}`,
|
||||
messageExternalIds,
|
||||
);
|
||||
|
||||
const messageChannelMessageAssociationRepository =
|
||||
await this.twentyORMManager.getRepository<MessageChannelMessageAssociationWorkspaceEntity>(
|
||||
'messageChannelMessageAssociation',
|
||||
);
|
||||
|
||||
if (messageExternalIdsToDelete.length) {
|
||||
await messageChannelMessageAssociationRepository.delete({
|
||||
messageChannelId: messageChannel.id,
|
||||
messageExternalId: In(messageExternalIdsToDelete),
|
||||
});
|
||||
|
||||
await this.messagingMessageCleanerService.cleanWorkspaceThreads(
|
||||
workspaceId,
|
||||
);
|
||||
}
|
||||
|
||||
await this.messagingCursorService.updateCursor(
|
||||
messageChannel,
|
||||
nextSyncCursor,
|
||||
folderId,
|
||||
);
|
||||
}
|
||||
|
||||
const isPartialImportFinishedForAllFolders = partialMessageLists.every(
|
||||
(partialMessageList) =>
|
||||
this.isPartialImportFinished(
|
||||
partialMessageList.previousSyncCursor,
|
||||
partialMessageList.nextSyncCursor,
|
||||
),
|
||||
);
|
||||
|
||||
if (isPartialImportFinishedForAllFolders) {
|
||||
await this.messageChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch(
|
||||
[messageChannel.id],
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
await this.messageChannelSyncStatusService.scheduleMessagesImport([
|
||||
messageChannel.id,
|
||||
]);
|
||||
} catch (error) {
|
||||
await this.messageImportErrorHandlerService.handleDriverException(
|
||||
error,
|
||||
MessageImportSyncStep.PARTIAL_MESSAGE_LIST_FETCH,
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private isPartialImportFinished(
|
||||
previousSyncCursor: string,
|
||||
nextSyncCursor: string,
|
||||
): boolean {
|
||||
return previousSyncCursor === nextSyncCursor;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,15 @@
|
||||
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
|
||||
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
|
||||
import { MessageFolderWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-folder.workspace-entity';
|
||||
|
||||
export type GetMessageListsArgs = {
|
||||
messageChannel: Pick<MessageChannelWorkspaceEntity, 'syncCursor' | 'id'>;
|
||||
connectedAccount: Pick<
|
||||
ConnectedAccountWorkspaceEntity,
|
||||
'provider' | 'refreshToken' | 'id' | 'handle' | 'connectionParameters'
|
||||
>;
|
||||
messageFolders: Pick<
|
||||
MessageFolderWorkspaceEntity,
|
||||
'name' | 'syncCursor' | 'id'
|
||||
>[];
|
||||
};
|
||||
@ -0,0 +1,9 @@
|
||||
export type GetOneMessageListResponse = {
|
||||
messageExternalIds: string[];
|
||||
messageExternalIdsToDelete: string[];
|
||||
previousSyncCursor: string;
|
||||
nextSyncCursor: string;
|
||||
folderId: string | undefined;
|
||||
};
|
||||
|
||||
export type GetMessageListsResponse = Array<GetOneMessageListResponse>;
|
||||
Reference in New Issue
Block a user