Rework messaging modules (#5710)

In this PR, I'm refactoring the messaging module into smaller pieces
that have **ONE** responsibility: import messages, clean messages,
handle message participant creation, instead of having ~30 modules (1
per service, jobs, cron, ...). This is mandatory to start introducing
drivers (gmail, office365, ...) IMO. It is too difficult to enforce
common interfaces as we have too many interfaces (30 modules...). All
modules should not be exposed

Right now, we have services that are almost functions:
do-that-and-this.service.ts / do-that-and-this.module.ts
I believe we should have something more organized at a high level and it
does not matter that much if we have a bit of code duplicates.

Note that the proposal is not fully implemented in the current PR that
has only focused on messaging folder (biggest part)

Here is the high level proposal:
- connected-account: token-refresher
- blocklist
- messaging: message-importer, message-cleaner, message-participants,
... (right now I'm keeping a big messaging-common but this will
disappear see below)
- calendar: calendar-importer, calendar-cleaner, ...

Consequences:
1) It's OK to re-implement several times some things. Example:
- error handling in connected-account, messaging, and calendar instead
of trying to unify. They are actually different kinds of error handling. The only
thing that might be in common is the GmailError => CommonError parsing,
and I'm not even sure it makes a lot of sense as these 3 APIs might
actually have different formats
- auto-creation. Calendar and Messaging could actually have different
rules

2) **We should not have circular dependencies:** 
- I believe this was the reason why we had so many modules, to be able
to cherry pick the one we wanted to avoid circular deps. This is not the
right approach IMO, we need to architect the whole messaging by defining
high level blocks that won't have circular dependencies by design. If we
encounter one, we should rethink and break the block in a way that makes
sense.
- ex: connected-account.resolver is not in the same module as
token-refresher. ==> connected-account.resolver => message-importer (as
we trigger full sync job when we connect an account) => token-refresher
(as we refresh token on message import).

connected-account.resolver and token-refresher both in connected-account
folder but should be in different modules. Otherwise it's a circular
dependency. It does not mean that we should create 1 module per service
as it was done before

In a nutshell: The code needs to be thought of in terms of responsibilities
and in a way that enforces high level interfaces (and avoids circular
dependencies)

Bonus: As you can see, this code is also removing a lot of code because
of the removal of many .module.ts (also because I'm removing the sync
scripts v2 feature flag and removing old code)
Bonus: I have prefixed services name with Messaging to improve dev xp.
GmailErrorHandler could be different between MessagingGmailErrorHandler
and CalendarGmailErrorHandler for instance
This commit is contained in:
Charles Bochet
2024-06-03 11:16:05 +02:00
committed by GitHub
parent c4b6b1e076
commit eab8deb211
121 changed files with 690 additions and 2065 deletions

View File

@ -0,0 +1,5 @@
/**
 * Gmail category names that the messaging import leaves out.
 */
export const MESSAGING_GMAIL_EXCLUDED_CATEGORIES: string[] = ['promotions', 'social', 'forums'];

View File

@ -0,0 +1 @@
// Page size sent as `maxResults` when listing Gmail history records.
export const MESSAGING_GMAIL_USERS_HISTORY_MAX_RESULT = 500;

View File

@ -0,0 +1 @@
// Number of message ids taken from the import cache and fetched per import run.
export const MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE = 20;

View File

@ -0,0 +1 @@
// Page size sent as `maxResults` when listing Gmail message ids.
export const MESSAGING_GMAIL_USERS_MESSAGES_LIST_MAX_RESULT = 500;

View File

@ -0,0 +1,53 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { EnvironmentModule } from 'src/engine/integrations/environment/environment.module';
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
import { GoogleAPIRefreshAccessTokenModule } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.module';
import { BlocklistWorkspaceEntity } from 'src/modules/connected-account/standard-objects/blocklist.workspace-entity';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessagingCommonModule } from 'src/modules/messaging/common/messaging-common.module';
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MessagingGmailClientProvider } from 'src/modules/messaging/message-import-manager/drivers/gmail/providers/messaging-gmail-client.provider';
import { MessagingGmailFetchMessagesByBatchesService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-messages-by-batches.service';
import { MessagingGmailFetchMessageIdsToExcludeService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-messages-ids-to-exclude.service';
import { MessagingGmailFullMessageListFetchService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-full-message-list-fetch.service';
import { MessagingGmailHistoryService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-history.service';
import { MessagingGmailMessagesImportService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-messages-import.service';
import { MessagingGmailPartialMessageListFetchService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-partial-message-list-fetch.service';
// Every service/provider of the Gmail driver. The module registers all of
// them and re-exports the very same list.
const MESSAGING_GMAIL_DRIVER_PROVIDERS = [
  MessagingGmailClientProvider,
  MessagingGmailHistoryService,
  MessagingGmailFetchMessagesByBatchesService,
  MessagingGmailPartialMessageListFetchService,
  MessagingGmailFullMessageListFetchService,
  MessagingGmailMessagesImportService,
  MessagingGmailFetchMessageIdsToExcludeService,
];

/**
 * NestJS module grouping the Gmail message-import driver: it wires the
 * dependencies the driver services inject and exposes those services to
 * importing modules.
 */
@Module({
  imports: [
    GoogleAPIRefreshAccessTokenModule,
    EnvironmentModule,
    ObjectMetadataRepositoryModule.forFeature([
      ConnectedAccountWorkspaceEntity,
      MessageChannelWorkspaceEntity,
      MessageChannelMessageAssociationWorkspaceEntity,
      BlocklistWorkspaceEntity,
    ]),
    MessagingCommonModule,
    TypeOrmModule.forFeature([FeatureFlagEntity], 'core'),
  ],
  providers: [...MESSAGING_GMAIL_DRIVER_PROVIDERS],
  exports: [...MESSAGING_GMAIL_DRIVER_PROVIDERS],
})
export class MessagingGmailDriverModule {}

View File

@ -0,0 +1,40 @@
import { Injectable } from '@nestjs/common';
import { OAuth2Client } from 'google-auth-library';
import { gmail_v1, google } from 'googleapis';
import { EnvironmentService } from 'src/engine/integrations/environment/environment.service';
/**
 * Creates Gmail API clients authenticated through OAuth2, using the Google
 * client id/secret read from the environment and a per-account refresh token.
 */
@Injectable()
export class MessagingGmailClientProvider {
  constructor(private readonly environmentService: EnvironmentService) {}

  /**
   * Returns a Gmail v1 client whose requests are authorized with the given
   * refresh token.
   *
   * @param refreshToken OAuth2 refresh token of the connected account.
   */
  public async getGmailClient(refreshToken: string): Promise<gmail_v1.Gmail> {
    const auth = await this.getOAuth2Client(refreshToken);

    return google.gmail({ version: 'v1', auth });
  }

  /**
   * Builds the OAuth2 client from the `AUTH_GOOGLE_CLIENT_ID` and
   * `AUTH_GOOGLE_CLIENT_SECRET` environment values and attaches the refresh
   * token as its credentials.
   */
  private async getOAuth2Client(refreshToken: string): Promise<OAuth2Client> {
    const clientId = this.environmentService.get('AUTH_GOOGLE_CLIENT_ID');
    const clientSecret = this.environmentService.get(
      'AUTH_GOOGLE_CLIENT_SECRET',
    );

    const client = new google.auth.OAuth2(clientId, clientSecret);

    client.setCredentials({ refresh_token: refreshToken });

    return client;
  }
}

View File

@ -0,0 +1,246 @@
import { Injectable, Logger } from '@nestjs/common';
import { AxiosResponse } from 'axios';
import planer from 'planer';
import addressparser from 'addressparser';
import { gmail_v1 } from 'googleapis';
import { assert, assertNotNull } from 'src/utils/assert';
import { GmailMessage } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message';
import { MessageQuery } from 'src/modules/messaging/message-import-manager/types/message-or-thread-query';
import { formatAddressObjectAsParticipants } from 'src/modules/messaging/message-import-manager/utils/format-address-object-as-participants.util';
import { MessagingFetchByBatchesService } from 'src/modules/messaging/common/services/messaging-fetch-by-batch.service';
/**
 * Fetches Gmail messages through the shared batch-fetch service and converts
 * the raw batch responses into `GmailMessage` objects.
 */
@Injectable()
export class MessagingGmailFetchMessagesByBatchesService {
  private readonly logger = new Logger(
    MessagingGmailFetchMessagesByBatchesService.name,
  );

  constructor(
    private readonly fetchByBatchesService: MessagingFetchByBatchesService,
  ) {}

  /**
   * Fetches every message described by `queries` and returns the formatted
   * messages. Fetching and formatting durations are logged separately.
   *
   * @param queries Batch queries, one per message to fetch.
   * @param accessToken Access token forwarded to the batch-fetch service.
   * @param workspaceId Only used for log context here.
   * @param connectedAccountId Only used for log context here.
   * @returns The messages that could be parsed; skipped messages are omitted.
   * @throws Whatever error a batch item carries, except 404 items which are
   *   skipped (see `formatBatchResponseAsGmailMessage`).
   */
  async fetchAllMessages(
    queries: MessageQuery[],
    accessToken: string,
    workspaceId: string,
    connectedAccountId: string,
  ): Promise<GmailMessage[]> {
    let startTime = Date.now();

    const batchResponses = await this.fetchByBatchesService.fetchAllByBatches(
      queries,
      accessToken,
      'batch_gmail_messages',
    );

    let endTime = Date.now();

    this.logger.log(
      `Messaging import for workspace ${workspaceId} and account ${connectedAccountId} fetching ${
        queries.length
      } messages in ${endTime - startTime}ms`,
    );

    startTime = Date.now();

    const formattedResponse = this.formatBatchResponsesAsGmailMessages(
      batchResponses,
      workspaceId,
      connectedAccountId,
    );

    endTime = Date.now();

    this.logger.log(
      `Messaging import for workspace ${workspaceId} and account ${connectedAccountId} formatting ${
        queries.length
      } messages in ${endTime - startTime}ms`,
    );

    return formattedResponse;
  }

  /**
   * Parses one batch response and formats each contained message.
   *
   * Items are dropped (not returned) when:
   * - the item is an error with code 404,
   * - the `From` header is missing or parses to no address,
   * - none of `To`, `Delivered-To`, `Cc`, `Bcc` is present.
   * Any other error item is thrown as-is.
   */
  private formatBatchResponseAsGmailMessage(
    responseCollection: AxiosResponse<any, any>,
    workspaceId: string,
    connectedAccountId: string,
  ): GmailMessage[] {
    const parsedResponses =
      this.fetchByBatchesService.parseBatch(responseCollection);

    // Strip NUL characters from the text before it is returned.
    const sanitizeString = (str: string) => {
      return str.replace(/\0/g, '');
    };

    const formattedResponse = parsedResponses.map(
      (response): GmailMessage | null => {
        if ('error' in response) {
          if (response.error.code === 404) {
            return null;
          }

          throw response.error;
        }

        const {
          historyId,
          id,
          threadId,
          internalDate,
          subject,
          from,
          to,
          cc,
          bcc,
          headerMessageId,
          text,
          attachments,
          deliveredTo,
        } = this.parseGmailMessage(response);

        // An empty parsed list must be rejected too: `from[0]` is read below
        // and would otherwise throw and fail the whole batch.
        if (!from || from.length === 0) {
          this.logger.log(
            `From value is missing while importing message in workspace ${workspaceId} and account ${connectedAccountId}`,
          );

          return null;
        }

        if (!to && !deliveredTo && !bcc && !cc) {
          this.logger.log(
            `To, Delivered-To, Bcc or Cc value is missing while importing message in workspace ${workspaceId} and account ${connectedAccountId}`,
          );

          return null;
        }

        const participants = [
          ...formatAddressObjectAsParticipants(from, 'from'),
          // Delivered-To stands in for a missing To header.
          ...formatAddressObjectAsParticipants(to ?? deliveredTo, 'to'),
          ...formatAddressObjectAsParticipants(cc, 'cc'),
          ...formatAddressObjectAsParticipants(bcc, 'bcc'),
        ];

        let textWithoutReplyQuotations = text;

        if (text) {
          textWithoutReplyQuotations = planer.extractFrom(text, 'text/plain');
        }

        const messageFromGmail: GmailMessage = {
          historyId,
          externalId: id,
          headerMessageId,
          subject: subject || '',
          messageThreadExternalId: threadId,
          internalDate,
          fromHandle: from[0].address || '',
          fromDisplayName: from[0].name || '',
          participants,
          text: sanitizeString(textWithoutReplyQuotations || ''),
          attachments,
        };

        return messageFromGmail;
      },
    );

    const filteredMessages = formattedResponse.filter((message) =>
      assertNotNull(message),
    ) as GmailMessage[];

    return filteredMessages;
  }

  /** Formats every batch response and flattens the results into one list. */
  private formatBatchResponsesAsGmailMessages(
    batchResponses: AxiosResponse<any, any>[],
    workspaceId: string,
    connectedAccountId: string,
  ): GmailMessage[] {
    return batchResponses.flatMap((response) =>
      this.formatBatchResponseAsGmailMessage(
        response,
        workspaceId,
        connectedAccountId,
      ),
    );
  }

  /**
   * Extracts headers, ids, plain-text body and attachment metadata from a
   * raw Gmail message. Asserts that `id`, `Message-ID`, `threadId`,
   * `historyId` and `internalDate` are present.
   */
  private parseGmailMessage(message: gmail_v1.Schema$Message) {
    const subject = this.getPropertyFromHeaders(message, 'Subject');
    const rawFrom = this.getPropertyFromHeaders(message, 'From');
    const rawTo = this.getPropertyFromHeaders(message, 'To');
    const rawDeliveredTo = this.getPropertyFromHeaders(message, 'Delivered-To');
    const rawCc = this.getPropertyFromHeaders(message, 'Cc');
    const rawBcc = this.getPropertyFromHeaders(message, 'Bcc');
    const messageId = this.getPropertyFromHeaders(message, 'Message-ID');
    const id = message.id;
    const threadId = message.threadId;
    const historyId = message.historyId;
    const internalDate = message.internalDate;

    assert(id);
    assert(messageId);
    assert(threadId);
    assert(historyId);
    assert(internalDate);

    const bodyData = this.getBodyData(message);
    const text = bodyData ? Buffer.from(bodyData, 'base64').toString() : '';

    const attachments = this.getAttachmentData(message);

    return {
      id,
      headerMessageId: messageId,
      threadId,
      historyId,
      internalDate,
      subject,
      from: rawFrom ? addressparser(rawFrom) : undefined,
      deliveredTo: rawDeliveredTo ? addressparser(rawDeliveredTo) : undefined,
      to: rawTo ? addressparser(rawTo) : undefined,
      cc: rawCc ? addressparser(rawCc) : undefined,
      bcc: rawBcc ? addressparser(rawBcc) : undefined,
      text,
      attachments,
    };
  }

  /**
   * Returns the encoded `text/plain` body of the message, if any.
   *
   * Lookup order: the first top-level part, then the children of that first
   * part, and finally the payload itself for single-part messages (which
   * carry their body on the payload and have no `parts`).
   */
  private getBodyData(message: gmail_v1.Schema$Message) {
    const payload = message.payload;
    const firstPart = payload?.parts?.[0];

    if (firstPart?.mimeType === 'text/plain') {
      return firstPart?.body?.data;
    }

    const nestedPlainTextData = firstPart?.parts?.find(
      (part) => part.mimeType === 'text/plain',
    )?.body?.data;

    // Non-multipart text/plain message: the body lives on the payload itself.
    if (
      !nestedPlainTextData &&
      !payload?.parts?.length &&
      payload?.mimeType === 'text/plain'
    ) {
      return payload.body?.data;
    }

    return nestedPlainTextData;
  }

  /**
   * Lists the top-level parts that have both a filename and an attachment
   * id, as `{ filename, id, mimeType, size }` records.
   */
  private getAttachmentData(message: gmail_v1.Schema$Message) {
    return (
      message.payload?.parts
        ?.filter((part) => part.filename && part.body?.attachmentId)
        .map((part) => ({
          filename: part.filename || '',
          id: part.body?.attachmentId || '',
          mimeType: part.mimeType || '',
          size: part.body?.size || 0,
        })) || []
    );
  }

  /** Returns the value of the first header matching `property`, case-insensitively. */
  private getPropertyFromHeaders(
    message: gmail_v1.Schema$Message,
    property: string,
  ) {
    const header = message.payload?.headers?.find(
      (header) => header.name?.toLowerCase() === property.toLowerCase(),
    );

    return header?.value;
  }
}

View File

@ -0,0 +1,46 @@
import { Injectable } from '@nestjs/common';
import { gmail_v1 } from 'googleapis';
import { MESSAGING_GMAIL_EXCLUDED_CATEGORIES } from 'src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-excluded-categories';
import { MessagingGmailHistoryService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-history.service';
import { computeGmailCategoryLabelId } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-gmail-category-label-id';
import { assertNotNull } from 'src/utils/assert';
/**
 * Collects the ids of messages added, since a given history id, under the
 * labels of the excluded Gmail categories.
 */
@Injectable()
export class MessagingGmailFetchMessageIdsToExcludeService {
  constructor(
    private readonly gmailGetHistoryService: MessagingGmailHistoryService,
  ) {}

  /**
   * Queries the `messageAdded` history once per excluded category
   * (sequentially) and gathers the ids of the added messages.
   *
   * @param gmailClient Authenticated Gmail client.
   * @param lastSyncHistoryId History id to start listing from.
   * @returns Ids of all messages found, in category order.
   * @throws The error object reported by the history service, as-is.
   */
  public async fetchEmailIdsToExcludeOrThrow(
    gmailClient: gmail_v1.Gmail,
    lastSyncHistoryId: string,
  ): Promise<string[]> {
    const excludedEmailIds: string[] = [];

    for (const category of MESSAGING_GMAIL_EXCLUDED_CATEGORIES) {
      const categoryResult = await this.gmailGetHistoryService.getHistory(
        gmailClient,
        lastSyncHistoryId,
        ['messageAdded'],
        computeGmailCategoryLabelId(category),
      );

      if (categoryResult.error) {
        throw categoryResult.error;
      }

      for (const historyRecord of categoryResult.history) {
        // Records without added messages contribute nothing.
        for (const addedEntry of historyRecord.messagesAdded ?? []) {
          const addedId = addedEntry?.message?.id;

          if (addedId && assertNotNull(addedId)) {
            excludedEmailIds.push(addedId);
          }
        }
      }
    }

    return excludedEmailIds;
  }
}

View File

@ -0,0 +1,215 @@
import { Injectable, Logger } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { gmail_v1 } from 'googleapis';
import { GaxiosResponse } from 'gaxios';
import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service';
import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator';
import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/common/repositories/message-channel-message-association.repository';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import {
GmailError,
MessagingErrorHandlingService,
} from 'src/modules/messaging/common/services/messaging-error-handling.service';
import { computeGmailCategoryExcludeSearchFilter } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-gmail-category-excude-search-filter';
import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service';
import { MessagingGmailClientProvider } from 'src/modules/messaging/message-import-manager/drivers/gmail/providers/messaging-gmail-client.provider';
import { MESSAGING_GMAIL_USERS_MESSAGES_LIST_MAX_RESULT } from 'src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-users-messages-list-max-result.constant';
import { MESSAGING_GMAIL_EXCLUDED_CATEGORIES } from 'src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-excluded-categories';
/**
 * Full message-list fetch for a Gmail message channel: lists every message
 * id (minus the excluded categories), queues the ids not yet associated with
 * the channel in the cache for import, and records the sync cursor.
 */
@Injectable()
export class MessagingGmailFullMessageListFetchService {
  private readonly logger = new Logger(
    MessagingGmailFullMessageListFetchService.name,
  );

  constructor(
    private readonly gmailClientProvider: MessagingGmailClientProvider,
    @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
    private readonly messageChannelRepository: MessageChannelRepository,
    @InjectCacheStorage(CacheStorageNamespace.Messaging)
    private readonly cacheStorage: CacheStorageService,
    @InjectObjectMetadataRepository(
      MessageChannelMessageAssociationWorkspaceEntity,
    )
    private readonly messageChannelMessageAssociationRepository: MessageChannelMessageAssociationRepository,
    private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService,
    private readonly gmailErrorHandlingService: MessagingErrorHandlingService,
  ) {}

  /**
   * Runs the full message-list fetch for one channel.
   *
   * Marks the channel as "messages list fetch ongoing", fetches and caches
   * the message ids, then schedules the messages import. When listing
   * reports a Gmail error, it is handed to the error-handling service and
   * the import is not scheduled.
   */
  public async processMessageListFetch(
    messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
    connectedAccount: ObjectRecord<ConnectedAccountWorkspaceEntity>,
    workspaceId: string,
  ) {
    await this.messagingChannelSyncStatusService.markAsMessagesListFetchOngoing(
      messageChannel.id,
      workspaceId,
    );

    const gmailClient: gmail_v1.Gmail =
      await this.gmailClientProvider.getGmailClient(
        connectedAccount.refreshToken,
      );

    const { error: gmailError } =
      await this.fetchAllMessageIdsFromGmailAndStoreInCache(
        gmailClient,
        messageChannel.id,
        workspaceId,
      );

    if (gmailError) {
      await this.gmailErrorHandlingService.handleGmailError(
        gmailError,
        'full-message-list-fetch',
        messageChannel,
        workspaceId,
      );

      return;
    }

    await this.messagingChannelSyncStatusService.scheduleMessagesImport(
      messageChannel.id,
      workspaceId,
    );
  }

  /**
   * Pages through `users.messages.list`, adds the ids that have no existing
   * association for this channel to the import cache, then updates the
   * channel's sync cursor from the first listed message.
   *
   * @returns `{ error }` when a list call fails, `{}` otherwise.
   * @throws Error when no message id was listed at all.
   */
  private async fetchAllMessageIdsFromGmailAndStoreInCache(
    gmailClient: gmail_v1.Gmail,
    messageChannelId: string,
    workspaceId: string,
    transactionManager?: EntityManager,
  ): Promise<{ error?: GmailError }> {
    let pageToken: string | undefined;
    let fetchedMessageIdsCount = 0;
    let hasMoreMessages = true;
    let firstMessageExternalId: string | undefined;
    let response: GaxiosResponse<gmail_v1.Schema$ListMessagesResponse>;

    while (hasMoreMessages) {
      try {
        response = await gmailClient.users.messages.list({
          userId: 'me',
          maxResults: MESSAGING_GMAIL_USERS_MESSAGES_LIST_MAX_RESULT,
          pageToken,
          q: computeGmailCategoryExcludeSearchFilter(
            MESSAGING_GMAIL_EXCLUDED_CATEGORIES,
          ),
        });
      } catch (error) {
        // Listing failures are reported to the caller instead of thrown.
        return {
          error: {
            code: error.response?.status,
            reason: error.response?.data?.error,
          },
        };
      }

      if (response.data?.messages) {
        const messageExternalIds = response.data.messages
          .filter((message): message is { id: string } => message.id != null)
          .map((message) => message.id);

        // Remember the very first id listed; it drives the sync cursor below.
        if (!firstMessageExternalId) {
          firstMessageExternalId = messageExternalIds[0];
        }

        const existingMessageChannelMessageAssociations =
          await this.messageChannelMessageAssociationRepository.getByMessageExternalIdsAndMessageChannelId(
            messageExternalIds,
            messageChannelId,
            workspaceId,
            transactionManager,
          );

        // Set lookup keeps the filtering linear in the page size.
        const existingExternalIds = new Set(
          existingMessageChannelMessageAssociations.map(
            (messageChannelMessageAssociation) =>
              messageChannelMessageAssociation.messageExternalId,
          ),
        );

        const messageIdsToImport = messageExternalIds.filter(
          (messageExternalId) => !existingExternalIds.has(messageExternalId),
        );

        if (messageIdsToImport.length) {
          await this.cacheStorage.setAdd(
            `messages-to-import:${workspaceId}:gmail:${messageChannelId}`,
            messageIdsToImport,
          );
        }

        fetchedMessageIdsCount += messageExternalIds.length;
      }

      pageToken = response.data.nextPageToken ?? undefined;
      hasMoreMessages = !!pageToken;
    }

    this.logger.log(
      `Added ${fetchedMessageIdsCount} messages ids from Gmail for messageChannel ${messageChannelId} in workspace ${workspaceId} and added to cache for import`,
    );

    // NOTE(review): this also throws when the listing is simply empty (e.g.
    // nothing matches the filter) — confirm that is the intended outcome.
    if (!firstMessageExternalId) {
      throw new Error(
        `No first message found for workspace ${workspaceId} and account ${messageChannelId}, can't update sync external id`,
      );
    }

    await this.updateLastSyncCursor(
      gmailClient,
      messageChannelId,
      firstMessageExternalId,
      workspaceId,
      transactionManager,
    );

    return {};
  }

  /**
   * Reads the given message's `historyId` from Gmail and stores it as the
   * channel's sync cursor through `updateLastSyncCursorIfHigher`.
   *
   * @throws Error when the message content or its `historyId` is missing.
   */
  private async updateLastSyncCursor(
    gmailClient: gmail_v1.Gmail,
    messageChannelId: string,
    firstMessageExternalId: string,
    workspaceId: string,
    transactionManager?: EntityManager,
  ) {
    const firstMessageContent = await gmailClient.users.messages.get({
      userId: 'me',
      id: firstMessageExternalId,
    });

    if (!firstMessageContent?.data) {
      throw new Error(
        `No first message content found for message ${firstMessageExternalId} in workspace ${workspaceId}`,
      );
    }

    const historyId = firstMessageContent.data.historyId;

    if (!historyId) {
      throw new Error(
        `No historyId found for message ${firstMessageExternalId} in workspace ${workspaceId}`,
      );
    }

    await this.messageChannelRepository.updateLastSyncCursorIfHigher(
      messageChannelId,
      historyId,
      workspaceId,
      transactionManager,
    );
  }
}

View File

@ -0,0 +1,106 @@
import { Injectable } from '@nestjs/common';
import { gmail_v1 } from 'googleapis';
import { GaxiosResponse } from 'gaxios';
import { GmailError } from 'src/modules/messaging/common/services/messaging-error-handling.service';
import { MESSAGING_GMAIL_USERS_HISTORY_MAX_RESULT } from 'src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-users-history-max-result.constant';
/**
 * Thin wrapper around the Gmail history API: pages through history records
 * and derives added/deleted message ids from them.
 */
@Injectable()
export class MessagingGmailHistoryService {
  /**
   * Lists every history record after `lastSyncHistoryId`, following page
   * tokens until the listing is exhausted.
   *
   * @param gmailClient Authenticated Gmail client.
   * @param lastSyncHistoryId History id to start listing from.
   * @param historyTypes History types to request; defaults to both
   *   `messageAdded` and `messageDeleted`.
   * @param labelId Optional label id forwarded to the list call.
   * @returns The collected records and the `historyId` of the last page. On
   *   a failed list call: an empty `history`, the `error` built from the
   *   failed response, and `historyId` set to `lastSyncHistoryId` (records
   *   from earlier pages are not returned).
   */
  public async getHistory(
    gmailClient: gmail_v1.Gmail,
    lastSyncHistoryId: string,
    historyTypes?: ('messageAdded' | 'messageDeleted')[],
    labelId?: string,
  ): Promise<{
    history: gmail_v1.Schema$History[];
    historyId?: string | null;
    error?: GmailError;
  }> {
    const fullHistory: gmail_v1.Schema$History[] = [];
    let pageToken: string | undefined;
    let hasMoreMessages = true;
    let nextHistoryId: string | undefined;
    let response: GaxiosResponse<gmail_v1.Schema$ListHistoryResponse>;

    while (hasMoreMessages) {
      try {
        response = await gmailClient.users.history.list({
          userId: 'me',
          maxResults: MESSAGING_GMAIL_USERS_HISTORY_MAX_RESULT,
          pageToken,
          startHistoryId: lastSyncHistoryId,
          historyTypes: historyTypes || ['messageAdded', 'messageDeleted'],
          labelId,
        });
      } catch (error) {
        return {
          history: [],
          error: {
            code: error.response?.status,
            reason: error.response?.data?.error,
          },
          historyId: lastSyncHistoryId,
        };
      }

      nextHistoryId = response?.data?.historyId ?? undefined;

      if (response?.data?.history) {
        fullHistory.push(...response.data.history);
      }

      pageToken = response?.data?.nextPageToken ?? undefined;
      hasMoreMessages = !!pageToken;
    }

    return { history: fullHistory, historyId: nextHistoryId };
  }

  /**
   * Splits history records into added and deleted message ids.
   *
   * An id present in both lists is removed from both. Entries without a
   * message id are skipped rather than reported as empty strings.
   * Duplicates within one list are kept.
   */
  public async getMessageIdsFromHistory(
    history: gmail_v1.Schema$History[],
  ): Promise<{
    messagesAdded: string[];
    messagesDeleted: string[];
  }> {
    const addedIds: string[] = [];
    const deletedIds: string[] = [];

    for (const historyRecord of history) {
      for (const messageAdded of historyRecord.messagesAdded ?? []) {
        const id = messageAdded.message?.id;

        if (id) addedIds.push(id);
      }

      for (const messageDeleted of historyRecord.messagesDeleted ?? []) {
        const id = messageDeleted.message?.id;

        if (id) deletedIds.push(id);
      }
    }

    // Sets make the cross-exclusion linear instead of quadratic.
    const addedIdSet = new Set(addedIds);
    const deletedIdSet = new Set(deletedIds);

    return {
      messagesAdded: addedIds.filter(
        (messageId) => !deletedIdSet.has(messageId),
      ),
      messagesDeleted: deletedIds.filter(
        (messageId) => !addedIdSet.has(messageId),
      ),
    };
  }
}

View File

@ -0,0 +1,175 @@
import { Injectable, Logger } from '@nestjs/common';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator';
import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum';
import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service';
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service';
import { BlocklistWorkspaceEntity } from 'src/modules/connected-account/standard-objects/blocklist.workspace-entity';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { BlocklistRepository } from 'src/modules/connected-account/repositories/blocklist.repository';
import { MessagingTelemetryService } from 'src/modules/messaging/common/services/messaging-telemetry.service';
import {
MessageChannelWorkspaceEntity,
MessageChannelSyncSubStatus,
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { createQueriesFromMessageIds } from 'src/modules/messaging/message-import-manager/utils/create-queries-from-message-ids.util';
import { filterEmails } from 'src/modules/messaging/message-import-manager/utils/filter-emails.util';
import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service';
import { MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE } from 'src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-users-messages-get-batch-size.constant';
import { MessagingGmailFetchMessagesByBatchesService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-messages-by-batches.service';
import { MessagingErrorHandlingService } from 'src/modules/messaging/common/services/messaging-error-handling.service';
import { MessagingSaveMessagesAndEnqueueContactCreationService } from 'src/modules/messaging/common/services/messaging-save-messages-and-enqueue-contact-creation.service';
/**
 * Imports one batch of Gmail messages for a message channel: pops queued
 * message ids from the cache, fetches and filters the messages, saves them,
 * and moves the channel to its next sync step.
 */
@Injectable()
export class MessagingGmailMessagesImportService {
  private readonly logger = new Logger(
    MessagingGmailMessagesImportService.name,
  );

  constructor(
    private readonly fetchMessagesByBatchesService: MessagingGmailFetchMessagesByBatchesService,
    @InjectCacheStorage(CacheStorageNamespace.Messaging)
    private readonly cacheStorage: CacheStorageService,
    private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService,
    private readonly saveMessagesAndEnqueueContactCreationService: MessagingSaveMessagesAndEnqueueContactCreationService,
    private readonly gmailErrorHandlingService: MessagingErrorHandlingService,
    private readonly googleAPIsRefreshAccessTokenService: GoogleAPIRefreshAccessTokenService,
    private readonly messagingTelemetryService: MessagingTelemetryService,
    @InjectObjectMetadataRepository(BlocklistWorkspaceEntity)
    private readonly blocklistRepository: BlocklistRepository,
  ) {}

  /**
   * Processes a single import batch for the channel.
   *
   * Does nothing unless the channel's sync sub-status is
   * `MESSAGES_IMPORT_PENDING`. When the popped batch is empty or smaller
   * than the batch size, the channel is marked completed and a partial list
   * fetch is scheduled; otherwise another import is scheduled. On failure
   * the popped ids are put back in the cache and the error is passed to the
   * error-handling service. A `messages_import.completed` event is tracked
   * on every path past the initial guard.
   */
  async processMessageBatchImport(
    messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
    connectedAccount: ObjectRecord<ConnectedAccountWorkspaceEntity>,
    workspaceId: string,
  ) {
    const isImportPending =
      messageChannel.syncSubStatus ===
      MessageChannelSyncSubStatus.MESSAGES_IMPORT_PENDING;

    if (!isImportPending) {
      return;
    }

    await this.messagingTelemetryService.track({
      eventName: 'messages_import.started',
      workspaceId,
      connectedAccountId: messageChannel.connectedAccountId,
      messageChannelId: messageChannel.id,
    });

    this.logger.log(
      `Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} starting...`,
    );

    await this.messagingChannelSyncStatusService.markAsMessagesImportOngoing(
      messageChannel.id,
      workspaceId,
    );

    // NOTE(review): the fetch below reads `connectedAccount.accessToken` from
    // the object passed in — confirm it reflects the token refreshed here.
    await this.googleAPIsRefreshAccessTokenService.refreshAndSaveAccessToken(
      workspaceId,
      connectedAccount.id,
    );

    // Same cache set is popped from here and refilled on failure.
    const importQueueKey = `messages-to-import:${workspaceId}:gmail:${messageChannel.id}`;

    const poppedMessageIds = await this.cacheStorage.setPop(
      importQueueKey,
      MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE,
    );
    const messageIdsToFetch = poppedMessageIds ?? [];

    if (!messageIdsToFetch.length) {
      await this.messagingChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch(
        messageChannel.id,
        workspaceId,
      );

      await this.trackMessageImportCompleted(messageChannel, workspaceId);

      return;
    }

    const messageQueries = createQueriesFromMessageIds(messageIdsToFetch);

    try {
      const fetchedMessages =
        await this.fetchMessagesByBatchesService.fetchAllMessages(
          messageQueries,
          connectedAccount.accessToken,
          workspaceId,
          connectedAccount.id,
        );

      const blocklist = await this.blocklistRepository.getByWorkspaceMemberId(
        connectedAccount.accountOwnerId,
        workspaceId,
      );
      const blockedHandles = blocklist.map(
        (blocklistItem) => blocklistItem.handle,
      );

      const messagesToSave = filterEmails(fetchedMessages, blockedHandles);

      await this.saveMessagesAndEnqueueContactCreationService.saveMessagesAndEnqueueContactCreationJob(
        messagesToSave,
        messageChannel,
        connectedAccount,
        workspaceId,
      );

      // A short batch means the import queue has been drained.
      const isLastBatch =
        messageIdsToFetch.length <
        MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE;

      if (isLastBatch) {
        await this.messagingChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch(
          messageChannel.id,
          workspaceId,
        );
      } else {
        await this.messagingChannelSyncStatusService.scheduleMessagesImport(
          messageChannel.id,
          workspaceId,
        );
      }

      // Awaited inside the try so a tracking failure is handled below too.
      await this.trackMessageImportCompleted(messageChannel, workspaceId);

      return;
    } catch (error) {
      // Put the popped ids back into the cache set.
      await this.cacheStorage.setAdd(importQueueKey, messageIdsToFetch);

      await this.gmailErrorHandlingService.handleGmailError(
        {
          code: error.code,
          reason: error.errors?.[0]?.reason,
        },
        'messages-import',
        messageChannel,
        workspaceId,
      );

      await this.trackMessageImportCompleted(messageChannel, workspaceId);

      return;
    }
  }

  /** Tracks the `messages_import.completed` telemetry event for the channel. */
  private async trackMessageImportCompleted(
    messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
    workspaceId: string,
  ) {
    await this.messagingTelemetryService.track({
      eventName: 'messages_import.completed',
      workspaceId,
      connectedAccountId: messageChannel.connectedAccountId,
      messageChannelId: messageChannel.id,
    });
  }
}

View File

@ -0,0 +1,152 @@
import { Injectable, Logger } from '@nestjs/common';
import { gmail_v1 } from 'googleapis';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service';
import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator';
import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum';
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/common/repositories/message-channel-message-association.repository';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MessagingGmailClientProvider } from 'src/modules/messaging/message-import-manager/drivers/gmail/providers/messaging-gmail-client.provider';
import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service';
import { MessagingGmailFetchMessageIdsToExcludeService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-messages-ids-to-exclude.service';
import { MessagingErrorHandlingService } from 'src/modules/messaging/common/services/messaging-error-handling.service';
import { MessagingGmailHistoryService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-history.service';
/**
 * Runs an incremental Gmail message-list fetch for a message channel, driven
 * by the Gmail history starting at the channel's stored sync cursor.
 *
 * The service queues the ids of newly added messages in cache storage for a
 * later import step, removes the channel associations of deleted messages and
 * advances the channel's sync cursor.
 */
@Injectable()
export class MessagingGmailPartialMessageListFetchService {
  private readonly logger = new Logger(
    MessagingGmailPartialMessageListFetchService.name,
  );

  constructor(
    private readonly gmailClientProvider: MessagingGmailClientProvider,
    @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
    private readonly messageChannelRepository: MessageChannelRepository,
    @InjectCacheStorage(CacheStorageNamespace.Messaging)
    private readonly cacheStorage: CacheStorageService,
    @InjectObjectMetadataRepository(
      MessageChannelMessageAssociationWorkspaceEntity,
    )
    private readonly messageChannelMessageAssociationRepository: MessageChannelMessageAssociationRepository,
    private readonly gmailErrorHandlingService: MessagingErrorHandlingService,
    private readonly gmailGetHistoryService: MessagingGmailHistoryService,
    private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService,
    private readonly gmailFetchMessageIdsToExcludeService: MessagingGmailFetchMessageIdsToExcludeService,
  ) {}

  /**
   * Processes one partial message-list fetch for `messageChannel`.
   *
   * Steps, in order:
   * 1. Marks the channel as "messages list fetch ongoing".
   * 2. Loads the Gmail history since `messageChannel.syncCursor`.
   * 3. If the history call reports an error, delegates to the error handling
   *    service and returns.
   * 4. If nothing changed (same history id or empty history), marks the
   *    channel as completed, schedules the next partial fetch and returns.
   * 5. Otherwise queues added message ids (minus excluded ones) for import,
   *    deletes associations of deleted messages, updates the sync cursor and
   *    schedules the messages import.
   *
   * @param messageChannel - channel being synchronised; its `id` and
   *   `syncCursor` are read.
   * @param connectedAccount - account whose `refreshToken` is used to build
   *   the Gmail client; its `id` is used in log and error messages.
   * @param workspaceId - workspace the channel belongs to.
   * @throws Error when the history response carries no `historyId`.
   */
  public async processMessageListFetch(
    messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
    connectedAccount: ObjectRecord<ConnectedAccountWorkspaceEntity>,
    workspaceId: string,
  ): Promise<void> {
    await this.messagingChannelSyncStatusService.markAsMessagesListFetchOngoing(
      messageChannel.id,
      workspaceId,
    );

    const lastSyncHistoryId = messageChannel.syncCursor;

    const gmailClient: gmail_v1.Gmail =
      await this.gmailClientProvider.getGmailClient(
        connectedAccount.refreshToken,
      );

    const { history, historyId, error } =
      await this.gmailGetHistoryService.getHistory(
        gmailClient,
        lastSyncHistoryId,
      );

    if (error) {
      await this.gmailErrorHandlingService.handleGmailError(
        error,
        'partial-message-list-fetch',
        messageChannel,
        workspaceId,
      );

      return;
    }

    if (!historyId) {
      throw new Error(
        `No historyId found for ${connectedAccount.id} in workspace ${workspaceId} in gmail history response.`,
      );
    }

    // Nothing new since the last sync: finish this cycle and schedule the next one.
    if (historyId === lastSyncHistoryId || !history?.length) {
      this.logger.log(
        `Partial message list import done with history ${historyId} and nothing to update for workspace ${workspaceId} and account ${connectedAccount.id}`,
      );

      await this.messagingChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch(
        messageChannel.id,
        workspaceId,
      );

      return;
    }

    const { messagesAdded, messagesDeleted } =
      await this.gmailGetHistoryService.getMessageIdsFromHistory(history);

    let messageIdsToFilter: string[] = [];

    try {
      messageIdsToFilter =
        await this.gmailFetchMessageIdsToExcludeService.fetchEmailIdsToExcludeOrThrow(
          gmailClient,
          lastSyncHistoryId,
        );
    } catch (error) {
      await this.gmailErrorHandlingService.handleGmailError(
        error,
        'partial-message-list-fetch',
        messageChannel,
        workspaceId,
      );

      return;
    }

    // Set lookup keeps the filter linear instead of scanning the exclusion
    // list once per added message.
    const excludedMessageIds = new Set(messageIdsToFilter);
    const messagesAddedFiltered = messagesAdded.filter(
      (messageId) => !excludedMessageIds.has(messageId),
    );

    const messagesToImportCacheKey = `messages-to-import:${workspaceId}:gmail:${messageChannel.id}`;

    await this.cacheStorage.setAdd(
      messagesToImportCacheKey,
      messagesAddedFiltered,
    );

    this.logger.log(
      `Added ${messagesAddedFiltered.length} messages to import for workspace ${workspaceId} and account ${connectedAccount.id}`,
    );

    await this.messageChannelMessageAssociationRepository.deleteByMessageExternalIdsAndMessageChannelId(
      messagesDeleted,
      messageChannel.id,
      workspaceId,
    );

    this.logger.log(
      `Deleted ${messagesDeleted.length} messages for workspace ${workspaceId} and account ${connectedAccount.id}`,
    );

    // The cursor is only advanced after the queue and deletions succeeded.
    await this.messageChannelRepository.updateLastSyncCursorIfHigher(
      messageChannel.id,
      historyId,
      workspaceId,
    );

    await this.messagingChannelSyncStatusService.scheduleMessagesImport(
      messageChannel.id,
      workspaceId,
    );
  }
}

View File

@ -0,0 +1,13 @@
import { gmail_v1 } from 'googleapis';
/**
 * Error-shaped payload: an object whose single `error` field carries a
 * numeric code plus message and status strings.
 * NOTE(review): presumably the error envelope Gmail returns for a failed
 * message request — confirm against the response parser.
 */
type GmailMessageError = {
error: {
code: number;
message: string;
status: string;
};
};
/**
 * A parsed response entry: either a Gmail message (`gmail_v1.Schema$Message`)
 * or an error payload (`GmailMessageError`).
 */
export type GmailMessageParsedResponse =
| gmail_v1.Schema$Message
| GmailMessageError;

View File

@ -0,0 +1,32 @@
/**
 * Flat representation of a single Gmail message: identifiers, subject,
 * sender, participants, text body and attachments.
 * NOTE(review): the format of `internalDate` (a string here) and the exact
 * meaning of each identifier are not visible in this file — confirm against
 * the code that builds this object.
 */
export type GmailMessage = {
historyId: string;
externalId: string;
headerMessageId: string;
subject: string;
messageThreadExternalId: string;
internalDate: string;
fromHandle: string;
fromDisplayName: string;
participants: Participant[];
text: string;
attachments: Attachment[];
};
/**
 * A message participant: the role it plays on the message (`from`, `to`,
 * `cc` or `bcc`), its handle and its display name.
 */
export type Participant = {
role: 'from' | 'to' | 'cc' | 'bcc';
handle: string;
displayName: string;
};
/** A `Participant` extended with a `messageId` string. */
export type ParticipantWithMessageId = Participant & { messageId: string };
/** A `Participant` extended with an `id` string. */
export type ParticipantWithId = Participant & {
id: string;
};
/**
 * Metadata describing a message attachment.
 * NOTE(review): the unit of `size` is not shown here (presumably bytes) —
 * confirm against the producer.
 */
export type Attachment = {
id: string;
filename: string;
size: number;
mimeType: string;
};

View File

@ -0,0 +1,4 @@
/** Minimal description of a Gmail thread: its id and its subject. */
export type GmailThread = {
id: string;
subject: string;
};

View File

@ -0,0 +1,27 @@
import { computeGmailCategoryExcludeSearchFilter } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-gmail-category-excude-search-filter';
// Table-driven spec: each entry registers one test with the given name,
// input categories and expected search filter.
describe('computeGmailCategoryExcludeSearchFilter', () => {
  const testCases: Array<{
    name: string;
    categories: string[];
    expected: string;
  }> = [
    {
      name: 'should return correct exclude search filter with empty category array',
      categories: [],
      expected: '',
    },
    {
      name: 'should return correct exclude search filter with one category',
      categories: ['CATEGORY1'],
      expected: '-category:CATEGORY1',
    },
    {
      name: 'should return correct exclude search filter with multiple categories',
      categories: ['CATEGORY1', 'CATEGORY2', 'CATEGORY3'],
      expected: '-category:CATEGORY1 -category:CATEGORY2 -category:CATEGORY3',
    },
  ];

  testCases.forEach(({ name, categories, expected }) => {
    it(name, () => {
      expect(computeGmailCategoryExcludeSearchFilter(categories)).toBe(
        expected,
      );
    });
  });
});

View File

@ -0,0 +1,9 @@
import { computeGmailCategoryLabelId } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-gmail-category-label-id';
// Spec for computeGmailCategoryLabelId: checks the `CATEGORY_` prefix and the
// upper-casing of the category name.
describe('computeGmailCategoryLabelId', () => {
  it('should return correct category label id', () => {
    const result = computeGmailCategoryLabelId('CATEGORY1');

    expect(result).toBe('CATEGORY_CATEGORY1');
  });

  // The original case used an already upper-case input, so the upper-casing
  // step was never exercised.
  it('should upper-case the category in the label id', () => {
    const result = computeGmailCategoryLabelId('promotions');

    expect(result).toBe('CATEGORY_PROMOTIONS');
  });
});

View File

@ -0,0 +1,3 @@
/**
 * Builds a search-filter string that excludes the given categories.
 *
 * Each category becomes a `-category:<name>` term; terms are joined with a
 * single space. An empty list yields an empty string.
 *
 * @param excludedCategories - category names to exclude, used verbatim.
 * @returns the space-separated exclusion terms.
 */
export const computeGmailCategoryExcludeSearchFilter = (
  excludedCategories: string[],
) => {
  const toExclusionTerm = (category: string) => `-category:${category}`;
  const exclusionTerms = excludedCategories.map((category) =>
    toExclusionTerm(category),
  );

  return exclusionTerms.join(' ');
};

View File

@ -0,0 +1,2 @@
/**
 * Derives the label id for a category: the category name upper-cased and
 * prefixed with `CATEGORY_`.
 *
 * @param category - category name in any letter case.
 * @returns the label id, e.g. `CATEGORY_PROMOTIONS` for `promotions`.
 */
export const computeGmailCategoryLabelId = (category: string) => {
  const upperCasedCategory = category.toUpperCase();

  return 'CATEGORY_' + upperCasedCategory;
};