From 1d1976ef22a5c10ebddb0b76d7b1932588457e05 Mon Sep 17 00:00:00 2001 From: bosiraphael <71827178+bosiraphael@users.noreply.github.com> Date: Tue, 13 Feb 2024 14:24:28 +0100 Subject: [PATCH] 3807 auto creation of contact when importing emails (#3888) * Add CreateContactService to messaging services * Add logic to create a contact if it doesn't exist * Add name * Improvements * contact creation working * fix bug * Add IsPersonEmailService to check if an email is personal or not * filter is working * improve filter * create companies and people * Refactor createContactFromHandleAndDisplayName to createContactAndCompanyFromHandleAndDisplayName * improve regex * reorganizing services * updates * reorganize folders * wip * use transaction * wip * wip * wip * batch queries * almost working * working --- .../create-company/create-company.module.ts | 10 + .../create-company.service.ts | 59 +++- .../create-contact/create-contact.module.ts | 10 + .../create-contact/create-contact.service.ts | 88 +++++ .../message-participant.module.ts | 8 +- .../message-participant.service.ts | 107 ++++++ .../message-thread/message-thread.module.ts | 3 +- .../message-thread/message-thread.service.ts | 34 ++ .../messaging/message/message.module.ts | 10 +- .../messaging/message/message.service.ts | 199 ++++++++++- .../workspace/messaging/messaging.module.ts | 13 +- .../fetch-messages-by-batches.service.ts | 5 +- .../services/gmail-full-sync.service.ts | 12 +- .../services/gmail-partial-sync.service.ts | 14 +- .../services/is-person-email.service.ts | 15 + .../services/messaging-utils.service.ts | 310 ------------------ ...create-queries-from-message-ids.service.ts | 16 + 17 files changed, 576 insertions(+), 337 deletions(-) create mode 100644 packages/twenty-server/src/workspace/messaging/create-company/create-company.module.ts rename packages/twenty-server/src/workspace/messaging/{services => create-company}/create-company.service.ts (53%) create mode 100644 packages/twenty-server/src/workspace/messaging/create-contact/create-contact.module.ts create mode 100644 packages/twenty-server/src/workspace/messaging/create-contact/create-contact.service.ts create mode 100644 packages/twenty-server/src/workspace/messaging/services/is-person-email.service.ts delete mode 100644 packages/twenty-server/src/workspace/messaging/services/messaging-utils.service.ts create mode 100644 packages/twenty-server/src/workspace/messaging/services/utils/create-queries-from-message-ids.service.ts diff --git a/packages/twenty-server/src/workspace/messaging/create-company/create-company.module.ts b/packages/twenty-server/src/workspace/messaging/create-company/create-company.module.ts new file mode 100644 index 000000000..36a00ae43 --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/create-company/create-company.module.ts @@ -0,0 +1,10 @@ +import { Module } from '@nestjs/common'; + +import { CreateCompanyService } from 'src/workspace/messaging/create-company/create-company.service'; + +@Module({ + imports: [], + providers: [CreateCompanyService], + exports: [CreateCompanyService], +}) +export class CreateCompanyModule {} diff --git a/packages/twenty-server/src/workspace/messaging/services/create-company.service.ts b/packages/twenty-server/src/workspace/messaging/create-company/create-company.service.ts similarity index 53% rename from packages/twenty-server/src/workspace/messaging/services/create-company.service.ts rename to packages/twenty-server/src/workspace/messaging/create-company/create-company.service.ts index 5ca92c13c..67d45e770 100644 --- a/packages/twenty-server/src/workspace/messaging/services/create-company.service.ts +++ b/packages/twenty-server/src/workspace/messaging/create-company/create-company.service.ts @@ -16,19 +16,60 @@ export class CreateCompanyService { }); } - async createCompanyFromDomainName( + async createCompanies( + domainNames: string[], + dataSourceMetadata: DataSourceEntity, + manager: EntityManager, + ): Promise<{ + [domainName: string]: string; + }> { + const uniqueDomainNames = [...new Set(domainNames)]; + + const existingCompanies = await manager.query( + `SELECT id, "domainName" FROM ${dataSourceMetadata.schema}.company WHERE "domainName" = ANY($1)`, + [uniqueDomainNames], + ); + + const companiesObject = existingCompanies.reduce( + ( + acc: { + [domainName: string]: string; + }, + company: { + domainName: string; + id: string; + }, + ) => ({ + ...acc, + [company.domainName]: company.id, + }), + {}, + ); + + const filteredDomainNames = uniqueDomainNames.filter( + (domainName) => + !existingCompanies.some( + (company: { domainName: string }) => + company.domainName === domainName, + ), + ); + + for (const domainName of filteredDomainNames) { + companiesObject[domainName] = await this.createCompany( + domainName, + dataSourceMetadata, + manager, + ); + } + + return companiesObject; + } + + async createCompany( domainName: string, dataSourceMetadata: DataSourceEntity, manager: EntityManager, ): Promise { - const existingCompany = await manager.query( - `SELECT * FROM ${dataSourceMetadata.schema}.company WHERE "domainName" = '${domainName}'`, - ); - - if (existingCompany.length > 0) { - return existingCompany[0].id; - } - const companyId = v4(); const { name, city } = await this.getCompanyInfoFromDomainName(domainName); diff --git a/packages/twenty-server/src/workspace/messaging/create-contact/create-contact.module.ts b/packages/twenty-server/src/workspace/messaging/create-contact/create-contact.module.ts new file mode 100644 index 000000000..f90a1c6b1 --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/create-contact/create-contact.module.ts @@ -0,0 +1,10 @@ +import { Module } from '@nestjs/common'; + +import { CreateContactService } from 'src/workspace/messaging/create-contact/create-contact.service'; + +@Module({ + imports: [], + providers: [CreateContactService], + exports: [CreateContactService], +}) +export class CreateContactModule {} diff --git a/packages/twenty-server/src/workspace/messaging/create-contact/create-contact.service.ts b/packages/twenty-server/src/workspace/messaging/create-contact/create-contact.service.ts new file mode 100644 index 000000000..478aa2524 --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/create-contact/create-contact.service.ts @@ -0,0 +1,88 @@ +import { Injectable } from '@nestjs/common'; + +import { EntityManager } from 'typeorm'; +import { v4 } from 'uuid'; + +import { DataSourceEntity } from 'src/metadata/data-source/data-source.entity'; +import { capitalize } from 'src/utils/capitalize'; + +type ContactToCreate = { + handle: string; + displayName: string; + companyId: string; +}; + +type FormattedContactToCreate = { + id: string; + handle: string; + firstName: string; + lastName: string; + companyId: string; +}; + +@Injectable() +export class CreateContactService { + constructor() {} + + formatContacts( + contactsToCreate: ContactToCreate[], + ): FormattedContactToCreate[] { + return contactsToCreate.map((contact) => { + const { handle, displayName, companyId } = contact; + + const contactFirstName = displayName.split(' ')[0]; + const contactLastName = displayName.split(' ')[1]; + + const contactFullNameFromHandle = handle.split('@')[0]; + const contactFirstNameFromHandle = + contactFullNameFromHandle.split('.')[0]; + const contactLastNameFromHandle = contactFullNameFromHandle.split('.')[1]; + + const id = v4(); + + return { + id, + handle, + firstName: capitalize( + contactFirstName || contactFirstNameFromHandle || '', + ), + lastName: capitalize( + contactLastName || contactLastNameFromHandle || '', + ), + companyId, + }; + }); + } + + async createContacts( + contactsToCreate: ContactToCreate[], + dataSourceMetadata: DataSourceEntity, + manager: EntityManager, + ): Promise { + if (contactsToCreate.length === 0) return; + + const formattedContacts = this.formatContacts(contactsToCreate); + + const valuesString = formattedContacts + .map( + (_, index) => + `($${index * 5 + 1}, $${index * 5 + 2}, $${index * 5 + 3}, $${ + index * 5 + 4 + }, $${index * 5 + 5})`, + ) + .join(', '); + + await manager.query( + `INSERT INTO ${dataSourceMetadata.schema}.person (id, email, "nameFirstName", "nameLastName", "companyId") VALUES ${valuesString}`, + formattedContacts + .map((contact) => [ + contact.id, + contact.handle, + contact.firstName, + contact.lastName, + contact.companyId, + ]) + .flat(), + ); + } +} diff --git a/packages/twenty-server/src/workspace/messaging/message-participant/message-participant.module.ts b/packages/twenty-server/src/workspace/messaging/message-participant/message-participant.module.ts index ae44e5411..9a866627b 100644 --- a/packages/twenty-server/src/workspace/messaging/message-participant/message-participant.module.ts +++ b/packages/twenty-server/src/workspace/messaging/message-participant/message-participant.module.ts @@ -1,10 +1,16 @@ import { Module } from '@nestjs/common'; +import { CreateCompanyModule } from 'src/workspace/messaging/create-company/create-company.module'; +import { CreateContactModule } from 'src/workspace/messaging/create-contact/create-contact.module'; import { MessageParticipantService } from 'src/workspace/messaging/message-participant/message-participant.service'; import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/workspace-datasource.module'; @Module({ - imports: [WorkspaceDataSourceModule], + imports: [ + WorkspaceDataSourceModule, + CreateContactModule, + CreateCompanyModule, + ], providers: [MessageParticipantService], exports: [MessageParticipantService], }) diff --git a/packages/twenty-server/src/workspace/messaging/message-participant/message-participant.service.ts b/packages/twenty-server/src/workspace/messaging/message-participant/message-participant.service.ts index cec95aa06..9a8680a93 100644 --- a/packages/twenty-server/src/workspace/messaging/message-participant/message-participant.service.ts +++ b/packages/twenty-server/src/workspace/messaging/message-participant/message-participant.service.ts @@ -5,11 +5,17 @@ import { EntityManager } from 'typeorm'; import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service'; import { MessageParticipantObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/message-participant.object-metadata'; import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record'; +import { DataSourceEntity } from 'src/metadata/data-source/data-source.entity'; +import { Participant } from 'src/workspace/messaging/types/gmail-message'; +import { CreateContactService } from 'src/workspace/messaging/create-contact/create-contact.service'; +import { CreateCompanyService } from 'src/workspace/messaging/create-company/create-company.service'; @Injectable() export class MessageParticipantService { constructor( private readonly workspaceDataSourceService: WorkspaceDataSourceService, + private readonly createContactService: CreateContactService, + private readonly createCompaniesService: CreateCompanyService, ) {} public async getByHandles( @@ -61,4 +67,105 @@ export class MessageParticipantService { transactionManager, ); } + + public async saveMessageParticipants( + participants: Participant[], + messageId: string, + dataSourceMetadata: DataSourceEntity, + manager: EntityManager, + ): Promise { + if (!participants) return; + + const alreadyCreatedContacts = await manager.query( + `SELECT email FROM ${dataSourceMetadata.schema}."person" WHERE "email" = ANY($1)`, + [participants.map((participant) => participant.handle)], + ); + + const alreadyCreatedContactEmails: string[] = alreadyCreatedContacts?.map( + ({ email }) => email, + ); + + const filteredParticipants = participants.filter( + (participant) => + !alreadyCreatedContactEmails.includes(participant.handle) && + participant.handle.includes('@'), + ); + + const filteredParticipantsWihCompanyDomainNames = filteredParticipants?.map( + (participant) => ({ + handle: participant.handle, + displayName: participant.displayName, + companyDomainName: participant.handle + .split('@')?.[1] + .split('.') + .slice(-2) + .join('.') + .toLowerCase(), + }), + ); + + const domainNamesToCreate = filteredParticipantsWihCompanyDomainNames.map( + (participant) => participant.companyDomainName, + ); + + const companiesObject = await this.createCompaniesService.createCompanies( + domainNamesToCreate, + dataSourceMetadata, + manager, + ); + + const contactsToCreate = filteredParticipantsWihCompanyDomainNames.map( + (participant) => ({ + handle: participant.handle, + displayName: participant.displayName, + companyId: companiesObject[participant.companyDomainName], + }), + ); + + await this.createContactService.createContacts( + contactsToCreate, + dataSourceMetadata, + manager, + ); + + const handles = participants.map((participant) => participant.handle); + + const participantPersonIds = await manager.query( + `SELECT id, email FROM ${dataSourceMetadata.schema}."person" WHERE "email" = ANY($1)`, + [handles], + ); + + const participantWorkspaceMemberIds = await manager.query( + `SELECT "workspaceMember"."id", "connectedAccount"."handle" AS email FROM ${dataSourceMetadata.schema}."workspaceMember" + JOIN ${dataSourceMetadata.schema}."connectedAccount" ON ${dataSourceMetadata.schema}."workspaceMember"."id" = ${dataSourceMetadata.schema}."connectedAccount"."accountOwnerId" + WHERE ${dataSourceMetadata.schema}."connectedAccount"."handle" = ANY($1)`, + [handles], + ); + + const messageParticipantsToSave = participants.map((participant) => [ + messageId, + participant.role, + participant.handle, + participant.displayName, + participantPersonIds.find((e) => e.email === participant.handle)?.id, + participantWorkspaceMemberIds.find((e) => e.email === participant.handle) + ?.id, + ]); + + const valuesString = messageParticipantsToSave + .map( + (_, index) => + `($${index * 6 + 1}, $${index * 6 + 2}, $${index * 6 + 3}, $${ + index * 6 + 4 + }, $${index * 6 + 5}, $${index * 6 + 6})`, + ) + .join(', '); + + if (messageParticipantsToSave.length === 0) return; + + await manager.query( + `INSERT INTO ${dataSourceMetadata.schema}."messageParticipant" ("messageId", "role", "handle", "displayName", "personId", "workspaceMemberId") VALUES ${valuesString}`, + messageParticipantsToSave.flat(), + ); + } } diff --git a/packages/twenty-server/src/workspace/messaging/message-thread/message-thread.module.ts b/packages/twenty-server/src/workspace/messaging/message-thread/message-thread.module.ts index 8ee9ec903..f2c9b54c5 100644 --- a/packages/twenty-server/src/workspace/messaging/message-thread/message-thread.module.ts +++ b/packages/twenty-server/src/workspace/messaging/message-thread/message-thread.module.ts @@ -1,10 +1,11 @@ import { Module } from '@nestjs/common'; +import { MessageChannelMessageAssociationModule } from 'src/workspace/messaging/message-channel-message-association/message-channel-message-assocation.module'; import { MessageThreadService } from 'src/workspace/messaging/message-thread/message-thread.service'; import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/workspace-datasource.module'; @Module({ - imports: [WorkspaceDataSourceModule], + imports: [WorkspaceDataSourceModule, MessageChannelMessageAssociationModule], providers: [MessageThreadService], exports: [MessageThreadService], }) diff --git a/packages/twenty-server/src/workspace/messaging/message-thread/message-thread.service.ts b/packages/twenty-server/src/workspace/messaging/message-thread/message-thread.service.ts index f25fa16b3..57fbda912 100644 --- a/packages/twenty-server/src/workspace/messaging/message-thread/message-thread.service.ts +++ b/packages/twenty-server/src/workspace/messaging/message-thread/message-thread.service.ts @@ -1,12 +1,16 @@ import { Injectable } from '@nestjs/common'; import { EntityManager } from 'typeorm'; +import { v4 } from 'uuid'; import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service'; +import { DataSourceEntity } from 'src/metadata/data-source/data-source.entity'; +import { MessageChannelMessageAssociationService } from 'src/workspace/messaging/message-channel-message-association/message-channel-message-association.service'; @Injectable() export class MessageThreadService { constructor( + private readonly messageChannelMessageAssociationService: MessageChannelMessageAssociationService, private readonly workspaceDataSourceService: WorkspaceDataSourceService, ) {} @@ -25,4 +29,34 @@ export class MessageThreadService { transactionManager, ); } + + public async saveMessageThreadOrReturnExistingMessageThread( + messageThreadExternalId: string, + dataSourceMetadata: DataSourceEntity, + workspaceId: string, + manager: EntityManager, + ) { + const existingMessageChannelMessageAssociationByMessageThreadExternalId = + await this.messageChannelMessageAssociationService.getFirstByMessageThreadExternalId( + messageThreadExternalId, + workspaceId, + manager, + ); + + const existingMessageThread = + existingMessageChannelMessageAssociationByMessageThreadExternalId?.messageThreadId; + + if (existingMessageThread) { + return Promise.resolve(existingMessageThread); + } + + const newMessageThreadId = v4(); + + await manager.query( + `INSERT INTO ${dataSourceMetadata.schema}."messageThread" ("id") VALUES ($1)`, + [newMessageThreadId], + ); + + return Promise.resolve(newMessageThreadId); + } } diff --git a/packages/twenty-server/src/workspace/messaging/message/message.module.ts b/packages/twenty-server/src/workspace/messaging/message/message.module.ts index b9380587f..cf3b92975 100644 --- a/packages/twenty-server/src/workspace/messaging/message/message.module.ts +++ b/packages/twenty-server/src/workspace/messaging/message/message.module.ts @@ -1,10 +1,18 @@ import { Module } from '@nestjs/common'; +import { MessageChannelMessageAssociationModule } from 'src/workspace/messaging/message-channel-message-association/message-channel-message-assocation.module'; +import { MessageParticipantModule } from 'src/workspace/messaging/message-participant/message-participant.module'; +import { MessageThreadModule } from 'src/workspace/messaging/message-thread/message-thread.module'; import { MessageService } from 'src/workspace/messaging/message/message.service'; import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/workspace-datasource.module'; @Module({ - imports: [WorkspaceDataSourceModule], + imports: [ + WorkspaceDataSourceModule, + MessageThreadModule, + MessageParticipantModule, + MessageChannelMessageAssociationModule, + ], providers: [MessageService], exports: [MessageService], }) diff --git a/packages/twenty-server/src/workspace/messaging/message/message.service.ts b/packages/twenty-server/src/workspace/messaging/message/message.service.ts index cfa4a21e8..861cf1822 100644 --- a/packages/twenty-server/src/workspace/messaging/message/message.service.ts +++ b/packages/twenty-server/src/workspace/messaging/message/message.service.ts @@ -1,15 +1,25 @@ import { Injectable } from '@nestjs/common'; -import { EntityManager } from 'typeorm'; +import { DataSource, EntityManager } from 'typeorm'; +import { v4 } from 'uuid'; import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service'; import { MessageObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/message.object-metadata'; import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record'; +import { DataSourceEntity } from 'src/metadata/data-source/data-source.entity'; +import { GmailMessage } from 'src/workspace/messaging/types/gmail-message'; +import { ConnectedAccountObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/connected-account.object-metadata'; +import { MessageChannelMessageAssociationService } from 'src/workspace/messaging/message-channel-message-association/message-channel-message-association.service'; +import { MessageThreadService } from 'src/workspace/messaging/message-thread/message-thread.service'; +import { MessageParticipantService } from 'src/workspace/messaging/message-participant/message-participant.service'; @Injectable() export class MessageService { constructor( private readonly workspaceDataSourceService: WorkspaceDataSourceService, + private readonly messageChannelMessageAssociationService: MessageChannelMessageAssociationService, + private readonly messageThreadService: MessageThreadService, + private readonly messageParticipantService: MessageParticipantService, ) {} public async getFirstByHeaderMessageId( @@ -81,4 +91,191 @@ export class MessageService { transactionManager, ); } + + public async saveMessages( + messages: GmailMessage[], + dataSourceMetadata: DataSourceEntity, + workspaceDataSource: DataSource, + connectedAccount: ObjectRecord, + gmailMessageChannelId: string, + workspaceId: string, + ) { + for (const message of messages) { + await workspaceDataSource?.transaction(async (manager: EntityManager) => { + const existingMessageChannelMessageAssociationsCount = + await this.messageChannelMessageAssociationService.countByMessageExternalIdsAndMessageChannelId( + [message.externalId], + gmailMessageChannelId, + workspaceId, + manager, + ); + + if (existingMessageChannelMessageAssociationsCount > 0) { + return; + } + + const savedOrExistingMessageThreadId = + await this.messageThreadService.saveMessageThreadOrReturnExistingMessageThread( + message.messageThreadExternalId, + dataSourceMetadata, + workspaceId, + manager, + ); + + const savedOrExistingMessageId = + await this.saveMessageOrReturnExistingMessage( + message, + savedOrExistingMessageThreadId, + connectedAccount, + dataSourceMetadata, + workspaceId, + manager, + ); + + await manager.query( + `INSERT INTO ${dataSourceMetadata.schema}."messageChannelMessageAssociation" ("messageChannelId", "messageId", "messageExternalId", "messageThreadId", "messageThreadExternalId") VALUES ($1, $2, $3, $4, $5)`, + [ + gmailMessageChannelId, + savedOrExistingMessageId, + message.externalId, + savedOrExistingMessageThreadId, + message.messageThreadExternalId, + ], + ); + }); + } + } + + private async saveMessageOrReturnExistingMessage( + message: GmailMessage, + messageThreadId: string, + connectedAccount: ObjectRecord, + dataSourceMetadata: DataSourceEntity, + workspaceId: string, + manager: EntityManager, + ): Promise { + const existingMessage = await this.getFirstByHeaderMessageId( + message.headerMessageId, + workspaceId, + ); + const existingMessageId = existingMessage?.id; + + if (existingMessageId) { + return Promise.resolve(existingMessageId); + } + + const newMessageId = v4(); + + const messageDirection = + connectedAccount.handle === message.fromHandle ? 'outgoing' : 'incoming'; + + const receivedAt = new Date(parseInt(message.internalDate)); + + await manager.query( + `INSERT INTO ${dataSourceMetadata.schema}."message" ("id", "headerMessageId", "subject", "receivedAt", "direction", "messageThreadId", "text", "html") VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, + [ + newMessageId, + message.headerMessageId, + message.subject, + receivedAt, + messageDirection, + messageThreadId, + message.text, + message.html, + ], + ); + + await this.messageParticipantService.saveMessageParticipants( + message.participants, + newMessageId, + dataSourceMetadata, + manager, + ); + + return Promise.resolve(newMessageId); + } + + public async deleteMessages( + workspaceDataSource: DataSource, + messagesDeletedMessageExternalIds: string[], + gmailMessageChannelId: string, + workspaceId: string, + ) { + await workspaceDataSource?.transaction(async (manager: EntityManager) => { + const messageChannelMessageAssociationsToDelete = + await this.messageChannelMessageAssociationService.getByMessageExternalIdsAndMessageChannelId( + messagesDeletedMessageExternalIds, + gmailMessageChannelId, + workspaceId, + manager, + ); + + const messageChannelMessageAssociationIdsToDeleteIds = + messageChannelMessageAssociationsToDelete.map( + (messageChannelMessageAssociationToDelete) => + messageChannelMessageAssociationToDelete.id, + ); + + await this.messageChannelMessageAssociationService.deleteByIds( + messageChannelMessageAssociationIdsToDeleteIds, + workspaceId, + manager, + ); + + const messageIdsFromMessageChannelMessageAssociationsToDelete = + messageChannelMessageAssociationsToDelete.map( + (messageChannelMessageAssociationToDelete) => + messageChannelMessageAssociationToDelete.messageId, + ); + + const messageChannelMessageAssociationByMessageIds = + await this.messageChannelMessageAssociationService.getByMessageIds( + messageIdsFromMessageChannelMessageAssociationsToDelete, + workspaceId, + manager, + ); + + const messageIdsFromMessageChannelMessageAssociationByMessageIds = + messageChannelMessageAssociationByMessageIds.map( + (messageChannelMessageAssociation) => + messageChannelMessageAssociation.messageId, + ); + + const messageIdsToDelete = + messageIdsFromMessageChannelMessageAssociationsToDelete.filter( + (messageId) => + !messageIdsFromMessageChannelMessageAssociationByMessageIds.includes( + messageId, + ), + ); + + await this.deleteByIds(messageIdsToDelete, workspaceId, manager); + + const messageThreadIdsFromMessageChannelMessageAssociationsToDelete = + messageChannelMessageAssociationsToDelete.map( + (messageChannelMessageAssociationToDelete) => + messageChannelMessageAssociationToDelete.messageThreadId, + ); + + const messagesByThreadIds = await this.getByMessageThreadIds( + messageThreadIdsFromMessageChannelMessageAssociationsToDelete, + workspaceId, + manager, + ); + + const threadIdsToDelete = + messageThreadIdsFromMessageChannelMessageAssociationsToDelete.filter( + (threadId) => + !messagesByThreadIds.find( + (message) => message.messageThreadId === threadId, + ), + ); + + await this.messageThreadService.deleteByIds( + threadIdsToDelete, + workspaceId, + manager, + ); + }); + } } diff --git a/packages/twenty-server/src/workspace/messaging/messaging.module.ts b/packages/twenty-server/src/workspace/messaging/messaging.module.ts index 72613cd9f..3a349d3c9 100644 --- a/packages/twenty-server/src/workspace/messaging/messaging.module.ts +++ b/packages/twenty-server/src/workspace/messaging/messaging.module.ts @@ -4,19 +4,22 @@ import { ConnectedAccountModule } from 'src/workspace/messaging/connected-accoun import { MessageChannelMessageAssociationModule } from 'src/workspace/messaging/message-channel-message-association/message-channel-message-assocation.module'; import { MessageChannelModule } from 'src/workspace/messaging/message-channel/message-channel.module'; import { MessageThreadModule } from 'src/workspace/messaging/message-thread/message-thread.module'; -import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service'; import { EnvironmentModule } from 'src/integrations/environment/environment.module'; import { MessagingPersonListener } from 'src/workspace/messaging/listeners/messaging-person.listener'; import { MessageModule } from 'src/workspace/messaging/message/message.module'; import { GmailClientProvider } from 'src/workspace/messaging/providers/gmail/gmail-client.provider'; -import { CreateCompanyService } from 'src/workspace/messaging/services/create-company.service'; +import { CreateContactService } from 'src/workspace/messaging/create-contact/create-contact.service'; +import { CreateCompanyService } from 'src/workspace/messaging/create-company/create-company.service'; import { FetchMessagesByBatchesService } from 'src/workspace/messaging/services/fetch-messages-by-batches.service'; import { GmailFullSyncService } from 'src/workspace/messaging/services/gmail-full-sync.service'; import { GmailPartialSyncService } from 'src/workspace/messaging/services/gmail-partial-sync.service'; import { GmailRefreshAccessTokenService } from 'src/workspace/messaging/services/gmail-refresh-access-token.service'; +import { IsPersonEmailService } from 'src/workspace/messaging/services/is-person-email.service'; import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/workspace-datasource.module'; import { MessageParticipantModule } from 'src/workspace/messaging/message-participant/message-participant.module'; import { MessagingWorkspaceMemberListener } from 'src/workspace/messaging/listeners/messaging-workspace-member.listener'; +import { MessageService } from 'src/workspace/messaging/message/message.service'; +import { CreateQueriesFromMessageIdsService } from 'src/workspace/messaging/services/utils/create-queries-from-message-ids.service'; @Module({ imports: [ @@ -34,17 +37,19 @@ import { MessagingWorkspaceMemberListener } from 'src/workspace/messaging/listen GmailPartialSyncService, FetchMessagesByBatchesService, GmailRefreshAccessTokenService, - MessagingUtilsService, GmailClientProvider, + CreateContactService, + IsPersonEmailService, CreateCompanyService, MessagingPersonListener, MessagingWorkspaceMemberListener, + MessageService, + CreateQueriesFromMessageIdsService, ], exports: [ GmailPartialSyncService, GmailFullSyncService, GmailRefreshAccessTokenService, - MessagingUtilsService, ], }) export class MessagingModule {} diff --git a/packages/twenty-server/src/workspace/messaging/services/fetch-messages-by-batches.service.ts b/packages/twenty-server/src/workspace/messaging/services/fetch-messages-by-batches.service.ts index 66006a906..a9a60b063 100644 --- a/packages/twenty-server/src/workspace/messaging/services/fetch-messages-by-batches.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/fetch-messages-by-batches.service.ts @@ -10,12 +10,13 @@ import { } from 'src/workspace/messaging/types/gmail-message'; import { MessageQuery } from 'src/workspace/messaging/types/message-or-thread-query'; import { GmailMessageParsedResponse } from 'src/workspace/messaging/types/gmail-message-parsed-response'; +import { IsPersonEmailService } from 'src/workspace/messaging/services/is-person-email.service'; @Injectable() export class FetchMessagesByBatchesService { private readonly httpService: AxiosInstance; - constructor() { + constructor(private readonly isPersonEmailService: IsPersonEmailService) { this.httpService = axios.create({ baseURL: 'https://www.googleapis.com/batch/gmail/v1', }); @@ -189,6 +190,8 @@ export class FetchMessagesByBatchesService { } = parsed; if (!from) throw new Error('From value is missing'); + if (!this.isPersonEmailService.isPersonEmail(from.value[0].address)) + return; if (!to) throw new Error('To value is missing'); const participants = [ diff --git a/packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts b/packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts index 55a34a8bc..e456d9f9a 100644 --- a/packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts @@ -2,7 +2,6 @@ import { Inject, Injectable, Logger } from '@nestjs/common'; import { FetchMessagesByBatchesService } from 'src/workspace/messaging/services/fetch-messages-by-batches.service'; import { GmailClientProvider } from 'src/workspace/messaging/providers/gmail/gmail-client.provider'; -import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service'; import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service'; import { @@ -13,6 +12,8 @@ import { ConnectedAccountService } from 'src/workspace/messaging/connected-accou import { MessageChannelService } from 'src/workspace/messaging/message-channel/message-channel.service'; import { MessageChannelMessageAssociationService } from 'src/workspace/messaging/message-channel-message-association/message-channel-message-association.service'; import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service'; +import { CreateQueriesFromMessageIdsService } from 'src/workspace/messaging/services/utils/create-queries-from-message-ids.service'; +import { MessageService } from 'src/workspace/messaging/message/message.service'; @Injectable() export class GmailFullSyncService { @@ -21,13 +22,14 @@ export class GmailFullSyncService { constructor( private readonly gmailClientProvider: GmailClientProvider, private readonly fetchMessagesByBatchesService: FetchMessagesByBatchesService, - private readonly utils: MessagingUtilsService, @Inject(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, private readonly workspaceDataSourceService: WorkspaceDataSourceService, private readonly connectedAccountService: ConnectedAccountService, private readonly messageChannelService: MessageChannelService, private readonly messageChannelMessageAssociationService: MessageChannelMessageAssociationService, + private readonly messageService: MessageService, + private readonly createQueriesFromMessageIdsService: CreateQueriesFromMessageIdsService, ) {} public async fetchConnectedAccountThreads( @@ -100,7 +102,9 @@ export class GmailFullSyncService { ); const messageQueries = - this.utils.createQueriesFromMessageIds(messagesToFetch); + this.createQueriesFromMessageIdsService.createQueriesFromMessageIds( + messagesToFetch, + ); const { messages: messagesToSave, errors } = await this.fetchMessagesByBatchesService.fetchAllMessages( @@ -116,7 +120,7 @@ export class GmailFullSyncService { return; } - await this.utils.saveMessages( + await this.messageService.saveMessages( messagesToSave, dataSourceMetadata, workspaceDataSource, diff --git a/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts b/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts index 7f835f33a..048394c42 100644 --- a/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts @@ -13,7 +13,8 @@ import { import { ConnectedAccountService } from 'src/workspace/messaging/connected-account/connected-account.service'; import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service'; import { MessageChannelService } from 'src/workspace/messaging/message-channel/message-channel.service'; -import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service'; +import { MessageService } from 'src/workspace/messaging/message/message.service'; +import { CreateQueriesFromMessageIdsService } from 'src/workspace/messaging/services/utils/create-queries-from-message-ids.service'; @Injectable() export class GmailPartialSyncService { @@ -22,12 +23,13 @@ export class GmailPartialSyncService { constructor( private readonly gmailClientProvider: GmailClientProvider, private readonly fetchMessagesByBatchesService: FetchMessagesByBatchesService, - private readonly utils: MessagingUtilsService, @Inject(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, private readonly workspaceDataSourceService: WorkspaceDataSourceService, private readonly connectedAccountService: ConnectedAccountService, private readonly messageChannelService: MessageChannelService, + private readonly messageService: MessageService, + private readonly createQueriesFromMessageIdsService: CreateQueriesFromMessageIdsService, ) {} public async fetchConnectedAccountThreads( @@ -98,7 +100,9 @@ export class GmailPartialSyncService { await this.getMessageIdsFromHistory(history); const messageQueries = - this.utils.createQueriesFromMessageIds(messagesAdded); + this.createQueriesFromMessageIdsService.createQueriesFromMessageIds( + messagesAdded, + ); const { messages: messagesToSave, errors } = await this.fetchMessagesByBatchesService.fetchAllMessages( @@ -107,7 +111,7 @@ export class GmailPartialSyncService { ); if (messagesToSave.length !== 0) { - await this.utils.saveMessages( + await this.messageService.saveMessages( messagesToSave, dataSourceMetadata, workspaceDataSource, @@ -118,7 +122,7 @@ export class GmailPartialSyncService { } if (messagesDeleted.length !== 0) { - await this.utils.deleteMessages( + await this.messageService.deleteMessages( workspaceDataSource, messagesDeleted, gmailMessageChannelId, diff --git a/packages/twenty-server/src/workspace/messaging/services/is-person-email.service.ts b/packages/twenty-server/src/workspace/messaging/services/is-person-email.service.ts new file mode 100644 index 000000000..6ec343212 --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/services/is-person-email.service.ts @@ -0,0 +1,15 @@ +import { Injectable } from '@nestjs/common'; + +@Injectable() +export class IsPersonEmailService { + constructor() {} + + isPersonEmail(email: string | undefined): boolean { + if (!email) return false; + + const nonPersonalPattern = + /noreply|no-reply|do_not_reply|no\.reply|^(accounts@|info@|admin@|contact@|hello@|support@|sales@|feedback@|service@|help@|mailer-daemon|notifications?|digest|auto|apps|assign|comments|customer-success|enterprise|esign|express|forum|gc@|learn|mailer|marketing|messages|news|notification|payments|receipts|recrutement|security|service|support|team)/; + + return !nonPersonalPattern.test(email); + } +} diff --git a/packages/twenty-server/src/workspace/messaging/services/messaging-utils.service.ts b/packages/twenty-server/src/workspace/messaging/services/messaging-utils.service.ts deleted file mode 100644 index 51436215d..000000000 --- a/packages/twenty-server/src/workspace/messaging/services/messaging-utils.service.ts +++ /dev/null @@ -1,310 +0,0 @@ -import { Injectable } from '@nestjs/common'; - -import { EntityManager, DataSource } from 'typeorm'; -import { v4 } from 'uuid'; - -import { DataSourceEntity } from 'src/metadata/data-source/data-source.entity'; -import { - GmailMessage, - Participant, -} from 'src/workspace/messaging/types/gmail-message'; -import { MessageQuery } from 'src/workspace/messaging/types/message-or-thread-query'; -import { MessageChannelMessageAssociationService } from 'src/workspace/messaging/message-channel-message-association/message-channel-message-association.service'; -import { MessageService } from 'src/workspace/messaging/message/message.service'; -import { MessageThreadService } from 'src/workspace/messaging/message-thread/message-thread.service'; -import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record'; -import { ConnectedAccountObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/connected-account.object-metadata'; -import { CreateCompanyService } from 'src/workspace/messaging/services/create-company.service'; - -@Injectable() -export class MessagingUtilsService { - constructor( - private readonly messageChannelMessageAssociationService: MessageChannelMessageAssociationService, - private readonly messageService: MessageService, - private readonly messageThreadService: MessageThreadService, - private readonly createCompaniesService: CreateCompanyService, - ) {} - - public createQueriesFromMessageIds( - messageExternalIds: string[], - ): MessageQuery[] { - return messageExternalIds.map((messageId) => ({ - uri: '/gmail/v1/users/me/messages/' + messageId + '?format=RAW', - })); - } - - public async saveMessages( - messages: GmailMessage[], - dataSourceMetadata: DataSourceEntity, - workspaceDataSource: DataSource, - connectedAccount: ObjectRecord, - gmailMessageChannelId: string, - workspaceId: string, - ) { - for (const message of messages) { - await workspaceDataSource?.transaction(async (manager: EntityManager) => { - const existingMessageChannelMessageAssociationsCount = - await this.messageChannelMessageAssociationService.countByMessageExternalIdsAndMessageChannelId( - [message.externalId], - gmailMessageChannelId, - workspaceId, - manager, - ); - - if (existingMessageChannelMessageAssociationsCount > 0) { - return; - } - - const savedOrExistingMessageThreadId = - await this.saveMessageThreadOrReturnExistingMessageThread( - message.messageThreadExternalId, - dataSourceMetadata, - workspaceId, - manager, - ); - - const savedOrExistingMessageId = - await this.saveMessageOrReturnExistingMessage( - message, - savedOrExistingMessageThreadId, - connectedAccount, - dataSourceMetadata, - workspaceId, - manager, - ); - - await manager.query( - `INSERT INTO ${dataSourceMetadata.schema}."messageChannelMessageAssociation" ("messageChannelId", "messageId", "messageExternalId", "messageThreadId", "messageThreadExternalId") VALUES ($1, $2, $3, $4, $5)`, - [ - gmailMessageChannelId, - savedOrExistingMessageId, - message.externalId, - savedOrExistingMessageThreadId, - message.messageThreadExternalId, - ], - ); - }); - } - } - - private async saveMessageOrReturnExistingMessage( - message: GmailMessage, - messageThreadId: string, - connectedAccount: ObjectRecord, - dataSourceMetadata: DataSourceEntity, - workspaceId: string, - manager: EntityManager, - ): Promise { - const existingMessage = await this.messageService.getFirstByHeaderMessageId( - message.headerMessageId, - workspaceId, - ); - const existingMessageId = existingMessage?.id; - - if (existingMessageId) { - return Promise.resolve(existingMessageId); - } - - const newMessageId = v4(); - - const messageDirection = - connectedAccount.handle === message.fromHandle ? 'outgoing' : 'incoming'; - - const receivedAt = new Date(parseInt(message.internalDate)); - - await manager.query( - `INSERT INTO ${dataSourceMetadata.schema}."message" ("id", "headerMessageId", "subject", "receivedAt", "direction", "messageThreadId", "text", "html") VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, - [ - newMessageId, - message.headerMessageId, - message.subject, - receivedAt, - messageDirection, - messageThreadId, - message.text, - message.html, - ], - ); - - await this.saveMessageParticipants( - message.participants, - newMessageId, - dataSourceMetadata, - manager, - ); - - return Promise.resolve(newMessageId); - } - - private async saveMessageThreadOrReturnExistingMessageThread( - messageThreadExternalId: string, - dataSourceMetadata: DataSourceEntity, - workspaceId: string, - manager: EntityManager, - ) { - const existingMessageChannelMessageAssociationByMessageThreadExternalId = - await this.messageChannelMessageAssociationService.getFirstByMessageThreadExternalId( - messageThreadExternalId, - workspaceId, - manager, - ); - - const existingMessageThread = - existingMessageChannelMessageAssociationByMessageThreadExternalId?.messageThreadId; - - if (existingMessageThread) { - return Promise.resolve(existingMessageThread); - } - - const newMessageThreadId = v4(); - - await manager.query( - `INSERT INTO ${dataSourceMetadata.schema}."messageThread" ("id") VALUES ($1)`, - [newMessageThreadId], - ); - - return Promise.resolve(newMessageThreadId); - } - - private async saveMessageParticipants( - participants: Participant[], - messageId: string, - dataSourceMetadata: DataSourceEntity, - manager: EntityManager, - ): Promise { - if (!participants) return; - - for (const participant of participants) { - const participantPerson = await manager.query( - `SELECT "person"."id" FROM ${dataSourceMetadata.schema}."person" WHERE "email" = $1 LIMIT 1`, - [participant.handle], - ); - - const participantPersonId = participantPerson[0]?.id; - - const workspaceMember = await manager.query( - `SELECT "workspaceMember"."id" FROM ${dataSourceMetadata.schema}."workspaceMember" - JOIN ${dataSourceMetadata.schema}."connectedAccount" ON ${dataSourceMetadata.schema}."workspaceMember"."id" = ${dataSourceMetadata.schema}."connectedAccount"."accountOwnerId" - WHERE ${dataSourceMetadata.schema}."connectedAccount"."handle" = $1 - LIMIT 1`, - [participant.handle], - ); - - const participantWorkspaceMemberId = workspaceMember[0]?.id; - - await manager.query( - `INSERT INTO ${dataSourceMetadata.schema}."messageParticipant" ("messageId", "role", "handle", "displayName", "personId", "workspaceMemberId") VALUES ($1, $2, $3, $4, $5, $6)`, - [ - messageId, - participant.role, - participant.handle, - participant.displayName, - participantPersonId, - participantWorkspaceMemberId, - ], - ); - - const companyDomainName = participant.handle - .split('@')?.[1] - .split('.') - .slice(-2) - .join('.') - .toLowerCase(); - - await this.createCompaniesService.createCompanyFromDomainName( - companyDomainName, - dataSourceMetadata, - manager, - ); - } - } - - public async deleteMessages( - workspaceDataSource: DataSource, - messagesDeletedMessageExternalIds: string[], - gmailMessageChannelId: string, - workspaceId: string, - ) { - await workspaceDataSource?.transaction(async (manager: EntityManager) => { - const messageChannelMessageAssociationsToDelete = - await this.messageChannelMessageAssociationService.getByMessageExternalIdsAndMessageChannelId( - messagesDeletedMessageExternalIds, - gmailMessageChannelId, - workspaceId, - manager, - ); - - const messageChannelMessageAssociationIdsToDeleteIds = - messageChannelMessageAssociationsToDelete.map( - (messageChannelMessageAssociationToDelete) => - messageChannelMessageAssociationToDelete.id, - ); - - await this.messageChannelMessageAssociationService.deleteByIds( - messageChannelMessageAssociationIdsToDeleteIds, - workspaceId, - manager, - ); - - const messageIdsFromMessageChannelMessageAssociationsToDelete = - messageChannelMessageAssociationsToDelete.map( - (messageChannelMessageAssociationToDelete) => - messageChannelMessageAssociationToDelete.messageId, - ); - - const messageChannelMessageAssociationByMessageIds = - await this.messageChannelMessageAssociationService.getByMessageIds( - messageIdsFromMessageChannelMessageAssociationsToDelete, - workspaceId, - manager, - ); - - const messageIdsFromMessageChannelMessageAssociationByMessageIds = - messageChannelMessageAssociationByMessageIds.map( - (messageChannelMessageAssociation) => - messageChannelMessageAssociation.messageId, - ); - - const messageIdsToDelete = - messageIdsFromMessageChannelMessageAssociationsToDelete.filter( - (messageId) => - !messageIdsFromMessageChannelMessageAssociationByMessageIds.includes( - messageId, - ), - ); - - await this.messageService.deleteByIds( - messageIdsToDelete, - workspaceId, - manager, - ); - - const messageThreadIdsFromMessageChannelMessageAssociationsToDelete = - messageChannelMessageAssociationsToDelete.map( - (messageChannelMessageAssociationToDelete) => - messageChannelMessageAssociationToDelete.messageThreadId, - ); - - const messagesByThreadIds = - await this.messageService.getByMessageThreadIds( - messageThreadIdsFromMessageChannelMessageAssociationsToDelete, - workspaceId, - manager, - ); - - const threadIdsToDelete = - messageThreadIdsFromMessageChannelMessageAssociationsToDelete.filter( - (threadId) => - !messagesByThreadIds.find( - (message) => message.messageThreadId === threadId, - ), - ); - - await this.messageThreadService.deleteByIds( - threadIdsToDelete, - workspaceId, - manager, - ); - }); - } -} diff --git a/packages/twenty-server/src/workspace/messaging/services/utils/create-queries-from-message-ids.service.ts b/packages/twenty-server/src/workspace/messaging/services/utils/create-queries-from-message-ids.service.ts new file mode 100644 index 000000000..3371ad5d6 --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/services/utils/create-queries-from-message-ids.service.ts @@ -0,0 +1,16 @@ +import { Injectable } from '@nestjs/common'; + +import { MessageQuery } from 'src/workspace/messaging/types/message-or-thread-query'; + +@Injectable() +export class CreateQueriesFromMessageIdsService { + constructor() {} + + public createQueriesFromMessageIds( + messageExternalIds: string[], + ): MessageQuery[] { + return messageExternalIds.map((messageId) => ({ + uri: '/gmail/v1/users/me/messages/' + messageId + '?format=RAW', + })); + } +}