4209 speed up gmail full sync by using search params to query only the relevant emails (#4213)
* create blocklist service * blocklist is working on email import in full sync * add log * add blocklist to partial sync * define rule for blocklist imports * gmail filter is working * correct typo * fix bugs * getCompanyNameFromDomainName * renaming * remove unused service * add transaction
This commit is contained in:
@ -27,6 +27,7 @@ import { CompanyModule } from 'src/workspace/messaging/repositories/company/comp
|
||||
import { PersonModule } from 'src/workspace/messaging/repositories/person/person.module';
|
||||
import { SaveMessagesAndCreateContactsService } from 'src/workspace/messaging/services/save-messages-and-create-contacts.service';
|
||||
import { MessagingConnectedAccountListener } from 'src/workspace/messaging/listeners/messaging-connected-account.listener';
|
||||
import { BlocklistModule } from 'src/workspace/messaging/repositories/blocklist/blocklist.module';
|
||||
@Module({
|
||||
imports: [
|
||||
EnvironmentModule,
|
||||
@ -42,6 +43,7 @@ import { MessagingConnectedAccountListener } from 'src/workspace/messaging/liste
|
||||
TypeOrmModule.forFeature([FeatureFlagEntity], 'core'),
|
||||
CompanyModule,
|
||||
PersonModule,
|
||||
BlocklistModule,
|
||||
],
|
||||
providers: [
|
||||
GmailFullSyncService,
|
||||
|
||||
@ -0,0 +1,11 @@
|
||||
import { Module } from '@nestjs/common';
|
||||
|
||||
import { BlocklistService } from 'src/workspace/messaging/repositories/blocklist/blocklist.service';
|
||||
import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/workspace-datasource.module';
|
||||
|
||||
@Module({
|
||||
imports: [WorkspaceDataSourceModule],
|
||||
providers: [BlocklistService],
|
||||
exports: [BlocklistService],
|
||||
})
|
||||
export class BlocklistModule {}
|
||||
@ -0,0 +1,30 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
|
||||
import { EntityManager } from 'typeorm';
|
||||
|
||||
import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service';
|
||||
import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record';
|
||||
import { BlocklistObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/blocklist.object-metadata';
|
||||
|
||||
@Injectable()
|
||||
export class BlocklistService {
|
||||
constructor(
|
||||
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
|
||||
) {}
|
||||
|
||||
public async getByWorkspaceMemberId(
|
||||
workspaceMemberId: string,
|
||||
workspaceId: string,
|
||||
transactionManager?: EntityManager,
|
||||
): Promise<ObjectRecord<BlocklistObjectMetadata>[]> {
|
||||
const dataSourceSchema =
|
||||
this.workspaceDataSourceService.getSchemaName(workspaceId);
|
||||
|
||||
return await this.workspaceDataSourceService.executeRawQuery(
|
||||
`SELECT * FROM ${dataSourceSchema}."blocklist" WHERE "workspaceMemberId" = $1`,
|
||||
[workspaceMemberId],
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
);
|
||||
}
|
||||
}
|
||||
@ -13,7 +13,6 @@ import { MessageChannelService } from 'src/workspace/messaging/repositories/mess
|
||||
import { MessageChannelMessageAssociationService } from 'src/workspace/messaging/repositories/message-channel-message-association/message-channel-message-association.service';
|
||||
import { MessageParticipantService } from 'src/workspace/messaging/repositories/message-participant/message-participant.service';
|
||||
import { MessageThreadService } from 'src/workspace/messaging/repositories/message-thread/message-thread.service';
|
||||
import { isPersonEmail } from 'src/workspace/messaging/utils/is-person-email.util';
|
||||
import { CreateCompaniesAndContactsService } from 'src/workspace/messaging/services/create-companies-and-contacts/create-companies-and-contacts.service';
|
||||
@Injectable()
|
||||
export class MessageService {
|
||||
@ -126,10 +125,6 @@ export class MessageService {
|
||||
const messageExternalIdsAndIdsMap = new Map<string, string>();
|
||||
|
||||
for (const message of messages) {
|
||||
if (this.shouldSkipImport(message)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
await workspaceDataSource?.transaction(async (manager: EntityManager) => {
|
||||
const existingMessageChannelMessageAssociationsCount =
|
||||
await this.messageChannelMessageAssociationService.countByMessageExternalIdsAndMessageChannelId(
|
||||
@ -182,10 +177,6 @@ export class MessageService {
|
||||
return messageExternalIdsAndIdsMap;
|
||||
}
|
||||
|
||||
private shouldSkipImport(message: GmailMessage): boolean {
|
||||
return !isPersonEmail(message.fromHandle);
|
||||
}
|
||||
|
||||
private async saveMessageOrReturnExistingMessage(
|
||||
message: GmailMessage,
|
||||
messageThreadId: string,
|
||||
|
||||
@ -5,7 +5,7 @@ import { v4 } from 'uuid';
|
||||
import axios, { AxiosInstance } from 'axios';
|
||||
|
||||
import { CompanyService } from 'src/workspace/messaging/repositories/company/company.service';
|
||||
import { capitalize } from 'src/utils/capitalize';
|
||||
import { getCompanyNameFromDomainName } from 'src/workspace/messaging/utils/get-company-name-from-domain-name.util';
|
||||
@Injectable()
|
||||
export class CreateCompanyService {
|
||||
private readonly httpService: AxiosInstance;
|
||||
@ -100,12 +100,12 @@ export class CreateCompanyService {
|
||||
const data = response.data;
|
||||
|
||||
return {
|
||||
name: data.name,
|
||||
name: data.name ?? getCompanyNameFromDomainName(domainName),
|
||||
city: data.city,
|
||||
};
|
||||
} catch (e) {
|
||||
return {
|
||||
name: capitalize(domainName.split('.')[0]),
|
||||
name: getCompanyNameFromDomainName(domainName),
|
||||
city: '',
|
||||
};
|
||||
}
|
||||
|
||||
@ -12,6 +12,8 @@ import { ConnectedAccountService } from 'src/workspace/messaging/repositories/co
|
||||
import { MessageChannelService } from 'src/workspace/messaging/repositories/message-channel/message-channel.service';
|
||||
import { MessageChannelMessageAssociationService } from 'src/workspace/messaging/repositories/message-channel-message-association/message-channel-message-association.service';
|
||||
import { createQueriesFromMessageIds } from 'src/workspace/messaging/utils/create-queries-from-message-ids.util';
|
||||
import { gmailSearchFilterExcludeEmails } from 'src/workspace/messaging/utils/gmail-search-filter';
|
||||
import { BlocklistService } from 'src/workspace/messaging/repositories/blocklist/blocklist.service';
|
||||
import { SaveMessagesAndCreateContactsService } from 'src/workspace/messaging/services/save-messages-and-create-contacts.service';
|
||||
|
||||
@Injectable()
|
||||
@ -26,6 +28,7 @@ export class GmailFullSyncService {
|
||||
private readonly connectedAccountService: ConnectedAccountService,
|
||||
private readonly messageChannelService: MessageChannelService,
|
||||
private readonly messageChannelMessageAssociationService: MessageChannelMessageAssociationService,
|
||||
private readonly blocklistService: BlocklistService,
|
||||
private readonly saveMessagesAndCreateContactsService: SaveMessagesAndCreateContactsService,
|
||||
) {}
|
||||
|
||||
@ -41,6 +44,7 @@ export class GmailFullSyncService {
|
||||
|
||||
const accessToken = connectedAccount.accessToken;
|
||||
const refreshToken = connectedAccount.refreshToken;
|
||||
const workspaceMemberId = connectedAccount.accountOwnerId;
|
||||
|
||||
if (!refreshToken) {
|
||||
throw new Error('No refresh token found');
|
||||
@ -57,12 +61,19 @@ export class GmailFullSyncService {
|
||||
const gmailClient =
|
||||
await this.gmailClientProvider.getGmailClient(refreshToken);
|
||||
|
||||
const blocklist = await this.blocklistService.getByWorkspaceMemberId(
|
||||
workspaceMemberId,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
const blocklistedEmails = blocklist.map((blocklist) => blocklist.handle);
|
||||
let startTime = Date.now();
|
||||
|
||||
const messages = await gmailClient.users.messages.list({
|
||||
userId: 'me',
|
||||
maxResults: 500,
|
||||
pageToken: nextPageToken,
|
||||
q: gmailSearchFilterExcludeEmails(blocklistedEmails),
|
||||
});
|
||||
|
||||
let endTime = Date.now();
|
||||
@ -80,6 +91,10 @@ export class GmailFullSyncService {
|
||||
: [];
|
||||
|
||||
if (!messageExternalIds || messageExternalIds?.length === 0) {
|
||||
this.logger.log(
|
||||
`gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId} done with nothing to import.`,
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@ -11,10 +11,12 @@ import {
|
||||
GmailFullSyncJobData,
|
||||
} from 'src/workspace/messaging/jobs/gmail-full-sync.job';
|
||||
import { ConnectedAccountService } from 'src/workspace/messaging/repositories/connected-account/connected-account.service';
|
||||
import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service';
|
||||
import { MessageChannelService } from 'src/workspace/messaging/repositories/message-channel/message-channel.service';
|
||||
import { MessageService } from 'src/workspace/messaging/repositories/message/message.service';
|
||||
import { createQueriesFromMessageIds } from 'src/workspace/messaging/utils/create-queries-from-message-ids.util';
|
||||
import { GmailMessage } from 'src/workspace/messaging/types/gmail-message';
|
||||
import { isPersonEmail } from 'src/workspace/messaging/utils/is-person-email.util';
|
||||
import { BlocklistService } from 'src/workspace/messaging/repositories/blocklist/blocklist.service';
|
||||
import { SaveMessagesAndCreateContactsService } from 'src/workspace/messaging/services/save-messages-and-create-contacts.service';
|
||||
|
||||
@Injectable()
|
||||
@ -26,10 +28,10 @@ export class GmailPartialSyncService {
|
||||
private readonly fetchMessagesByBatchesService: FetchMessagesByBatchesService,
|
||||
@Inject(MessageQueue.messagingQueue)
|
||||
private readonly messageQueueService: MessageQueueService,
|
||||
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
|
||||
private readonly connectedAccountService: ConnectedAccountService,
|
||||
private readonly messageChannelService: MessageChannelService,
|
||||
private readonly messageService: MessageService,
|
||||
private readonly blocklistService: BlocklistService,
|
||||
private readonly saveMessagesAndCreateContactsService: SaveMessagesAndCreateContactsService,
|
||||
) {}
|
||||
|
||||
@ -118,7 +120,7 @@ export class GmailPartialSyncService {
|
||||
|
||||
const messageQueries = createQueriesFromMessageIds(messagesAdded);
|
||||
|
||||
const { messages: messagesToSave, errors } =
|
||||
const { messages, errors } =
|
||||
await this.fetchMessagesByBatchesService.fetchAllMessages(
|
||||
messageQueries,
|
||||
accessToken,
|
||||
@ -127,6 +129,22 @@ export class GmailPartialSyncService {
|
||||
connectedAccountId,
|
||||
);
|
||||
|
||||
const blocklist = await this.blocklistService.getByWorkspaceMemberId(
|
||||
connectedAccount.accountOwnerId,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
const blocklistedEmails = blocklist.map((blocklist) => blocklist.handle);
|
||||
|
||||
const messagesToSave = messages.filter(
|
||||
(message) =>
|
||||
!this.shouldSkipImport(
|
||||
connectedAccount.handle,
|
||||
message,
|
||||
blocklistedEmails,
|
||||
),
|
||||
);
|
||||
|
||||
if (messagesToSave.length !== 0) {
|
||||
await this.saveMessagesAndCreateContactsService.saveMessagesAndCreateContacts(
|
||||
messagesToSave,
|
||||
@ -292,4 +310,34 @@ export class GmailPartialSyncService {
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
private isHandleBlocked = (
|
||||
selfHandle: string,
|
||||
message: GmailMessage,
|
||||
blocklistedEmails: string[],
|
||||
): boolean => {
|
||||
// If the message is received, check if the sender is in the blocklist
|
||||
// If the message is sent, check if any of the recipients with role 'to' is in the blocklist
|
||||
|
||||
if (message.fromHandle === selfHandle) {
|
||||
return message.participants.some(
|
||||
(participant) =>
|
||||
participant.role === 'to' &&
|
||||
blocklistedEmails.includes(participant.handle),
|
||||
);
|
||||
}
|
||||
|
||||
return blocklistedEmails.includes(message.fromHandle);
|
||||
};
|
||||
|
||||
private shouldSkipImport(
|
||||
selfHandle: string,
|
||||
message: GmailMessage,
|
||||
blocklistedEmails: string[],
|
||||
): boolean {
|
||||
return (
|
||||
!isPersonEmail(message.fromHandle) ||
|
||||
this.isHandleBlocked(selfHandle, message, blocklistedEmails)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,5 +1,7 @@
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
|
||||
import { EntityManager } from 'typeorm';
|
||||
|
||||
import { MessageChannelService } from 'src/workspace/messaging/repositories/message-channel/message-channel.service';
|
||||
import { MessageParticipantService } from 'src/workspace/messaging/repositories/message-participant/message-participant.service';
|
||||
import { MessageService } from 'src/workspace/messaging/repositories/message/message.service';
|
||||
@ -82,10 +84,15 @@ export class SaveMessagesAndCreateContactsService {
|
||||
if (isContactAutoCreationEnabled) {
|
||||
startTime = Date.now();
|
||||
|
||||
await this.createCompaniesAndContactsService.createCompaniesAndContacts(
|
||||
connectedAccount.handle,
|
||||
contactsToCreate,
|
||||
workspaceId,
|
||||
await workspaceDataSource?.transaction(
|
||||
async (transactionManager: EntityManager) => {
|
||||
await this.createCompaniesAndContactsService.createCompaniesAndContacts(
|
||||
connectedAccount.handle,
|
||||
contactsToCreate,
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
);
|
||||
},
|
||||
);
|
||||
|
||||
const handles = participantsWithMessageId.map(
|
||||
|
||||
@ -0,0 +1,5 @@
|
||||
import { capitalize } from 'src/utils/capitalize';
|
||||
|
||||
export const getCompanyNameFromDomainName = (domainName: string): string => {
|
||||
return capitalize(domainName.split('.').slice(-2, -1)[0]);
|
||||
};
|
||||
@ -0,0 +1,14 @@
|
||||
export const gmailSearchFilterNonPersonalEmails =
|
||||
'noreply|no-reply|do_not_reply|no.reply|accounts@|info@|admin@|contact@|hello@|support@|sales@|feedback@|service@|help@|mailer-daemon|notifications|digest|auto|apps|assign|comments|customer-success|enterprise|esign|express|forum|gc@|learn|mailer|marketing|messages|news|notification|payments|receipts|recrutement|security|service|support|team';
|
||||
|
||||
export const gmailSearchFilterExcludeEmails = (emails: string[]): string => {
|
||||
if (emails.length === 0) {
|
||||
return `from:-(${gmailSearchFilterNonPersonalEmails}`;
|
||||
}
|
||||
|
||||
return `(in:inbox from:-(${gmailSearchFilterNonPersonalEmails}|${emails.join(
|
||||
'|',
|
||||
)})|(in:sent to:-(${gmailSearchFilterNonPersonalEmails}|${emails.join(
|
||||
'|',
|
||||
)}))`;
|
||||
};
|
||||
Reference in New Issue
Block a user