From a2017eaeb7df89f26022d41deb4f9c827367b26b Mon Sep 17 00:00:00 2001 From: Charles Bochet Date: Mon, 6 May 2024 23:43:18 +0200 Subject: [PATCH] Improve messaging/calendar create contact performance (#5314) In this PR, I'm refactoring the way we associate messageParticipant post person/company creation. Instead of looking a all person without participant, we are passing the one that were just created. Also, I'm making sure the message and messageParticipant creation transaction is commited before creating person/company creation (and then messageParticipant association) --- .../calendar-event-participant.repository.ts | 2 +- .../calendar-event-participant.service.ts | 7 +- .../create-contact/create-contact.service.ts | 9 +- .../create-company-and-contact.service.ts | 15 +- ...fetch-message-content-from-cache.module.ts | 5 +- ...etch-message-content-from-cache.service.ts | 194 ++++++++++------ .../gmail-partial-sync.module.ts | 2 - .../message-participant.service.ts | 11 +- ...-and-emit-contact-creation-event.module.ts | 22 -- ...and-emit-contact-creation-event.service.ts | 219 ------------------ .../person/repositories/person.repository.ts | 4 +- 11 files changed, 154 insertions(+), 336 deletions(-) delete mode 100644 packages/twenty-server/src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.module.ts delete mode 100644 packages/twenty-server/src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.service.ts diff --git a/packages/twenty-server/src/modules/calendar/repositories/calendar-event-participant.repository.ts b/packages/twenty-server/src/modules/calendar/repositories/calendar-event-participant.repository.ts index bbcb96759..b91e32697 100644 --- a/packages/twenty-server/src/modules/calendar/repositories/calendar-event-participant.repository.ts +++ b/packages/twenty-server/src/modules/calendar/repositories/calendar-event-participant.repository.ts @@ -212,7 +212,7 @@ export class CalendarEventParticipantRepository { handle: 'text', displayName: 'text', isOrganizer: 'boolean', - responseStatus: `${dataSourceSchema}."calendarEventParticipant_responsestatus_enum"`, + responseStatus: `${dataSourceSchema}."calendarEventParticipant_responseStatus_enum"`, }, ); diff --git a/packages/twenty-server/src/modules/calendar/services/calendar-event-participant/calendar-event-participant.service.ts b/packages/twenty-server/src/modules/calendar/services/calendar-event-participant/calendar-event-participant.service.ts index a9a83f597..578babd9b 100644 --- a/packages/twenty-server/src/modules/calendar/services/calendar-event-participant/calendar-event-participant.service.ts +++ b/packages/twenty-server/src/modules/calendar/services/calendar-event-participant/calendar-event-participant.service.ts @@ -11,6 +11,7 @@ import { CalendarEventParticipant } from 'src/modules/calendar/types/calendar-ev import { CalendarEventParticipantRepository } from 'src/modules/calendar/repositories/calendar-event-participant.repository'; import { CalendarEventParticipantObjectMetadata } from 'src/modules/calendar/standard-objects/calendar-event-participant.object-metadata'; import { AddPersonIdAndWorkspaceMemberIdService } from 'src/modules/calendar-messaging-participant/services/add-person-id-and-workspace-member-id/add-person-id-and-workspace-member-id.service'; +import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record'; @Injectable() export class CalendarEventParticipantService { @@ -24,11 +25,13 @@ export class CalendarEventParticipantService { ) {} public async updateCalendarEventParticipantsAfterPeopleCreation( + createdPeople: ObjectRecord[], workspaceId: string, transactionManager?: EntityManager, ): Promise { const participants = - await this.calendarEventParticipantRepository.getWithoutPersonIdAndWorkspaceMemberId( + await this.calendarEventParticipantRepository.getByHandles( + createdPeople.map((person) => person.email), workspaceId, ); @@ -102,7 +105,7 @@ export class CalendarEventParticipantService { handle: 'text', displayName: 'text', isOrganizer: 'boolean', - responseStatus: `${dataSourceSchema}."calendarEventParticipant_responsestatus_enum"`, + responseStatus: `${dataSourceSchema}."calendarEventParticipant_responseStatus_enum"`, personId: 'uuid', workspaceMemberId: 'uuid', }, diff --git a/packages/twenty-server/src/modules/connected-account/auto-companies-and-contacts-creation/create-contact/create-contact.service.ts b/packages/twenty-server/src/modules/connected-account/auto-companies-and-contacts-creation/create-contact/create-contact.service.ts index 720dee298..9521a8f25 100644 --- a/packages/twenty-server/src/modules/connected-account/auto-companies-and-contacts-creation/create-contact/create-contact.service.ts +++ b/packages/twenty-server/src/modules/connected-account/auto-companies-and-contacts-creation/create-contact/create-contact.service.ts @@ -7,6 +7,7 @@ import { PersonRepository } from 'src/modules/person/repositories/person.reposit import { getFirstNameAndLastNameFromHandleAndDisplayName } from 'src/modules/calendar-messaging-participant/utils/get-first-name-and-last-name-from-handle-and-display-name.util'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { PersonObjectMetadata } from 'src/modules/person/standard-objects/person.object-metadata'; +import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record'; type ContactToCreate = { handle: string; @@ -50,16 +51,16 @@ export class CreateContactService { }); } - public async createContacts( + public async createPeople( contactsToCreate: ContactToCreate[], workspaceId: string, transactionManager?: EntityManager, - ): Promise { - if (contactsToCreate.length === 0) return; + ): Promise[]> { + if (contactsToCreate.length === 0) return []; const formattedContacts = this.formatContacts(contactsToCreate); - await this.personRepository.createPeople( + return await this.personRepository.createPeople( formattedContacts, workspaceId, transactionManager, diff --git a/packages/twenty-server/src/modules/connected-account/auto-companies-and-contacts-creation/services/create-company-and-contact.service.ts b/packages/twenty-server/src/modules/connected-account/auto-companies-and-contacts-creation/services/create-company-and-contact.service.ts index fd1fd186a..d3f5bbf2b 100644 --- a/packages/twenty-server/src/modules/connected-account/auto-companies-and-contacts-creation/services/create-company-and-contact.service.ts +++ b/packages/twenty-server/src/modules/connected-account/auto-companies-and-contacts-creation/services/create-company-and-contact.service.ts @@ -20,6 +20,7 @@ import { MessageParticipantService } from 'src/modules/messaging/services/messag import { CalendarEventParticipantService } from 'src/modules/calendar/services/calendar-event-participant/calendar-event-participant.service'; import { filterOutContactsFromCompanyOrWorkspace } from 'src/modules/connected-account/auto-companies-and-contacts-creation/utils/filter-out-contacts-from-company-or-workspace.util'; import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity'; +import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record'; @Injectable() export class CreateCompanyAndContactService { @@ -37,14 +38,14 @@ export class CreateCompanyAndContactService { private readonly featureFlagRepository: Repository, ) {} - async createCompaniesAndContacts( + async createCompaniesAndPeople( connectedAccountHandle: string, contactsToCreate: Contacts, workspaceId: string, transactionManager?: EntityManager, - ) { + ): Promise[]> { if (!contactsToCreate || contactsToCreate.length === 0) { - return; + return []; } // TODO: This is a feature that may be implemented in the future @@ -68,7 +69,7 @@ export class CreateCompanyAndContactService { ); if (uniqueHandles.length === 0) { - return; + return []; } const alreadyCreatedContacts = await this.personRepository.getByEmails( @@ -120,7 +121,7 @@ export class CreateCompanyAndContactService { : undefined, })); - await this.createContactService.createContacts( + return await this.createContactService.createPeople( formattedContactsToCreate, workspaceId, transactionManager, @@ -139,7 +140,7 @@ export class CreateCompanyAndContactService { await workspaceDataSource?.transaction( async (transactionManager: EntityManager) => { - await this.createCompaniesAndContacts( + const createdPeople = await this.createCompaniesAndPeople( connectedAccountHandle, contactsToCreate, workspaceId, @@ -147,11 +148,13 @@ export class CreateCompanyAndContactService { ); await this.messageParticipantService.updateMessageParticipantsAfterPeopleCreation( + createdPeople, workspaceId, transactionManager, ); await this.calendarEventParticipantService.updateCalendarEventParticipantsAfterPeopleCreation( + createdPeople, workspaceId, transactionManager, ); diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.module.ts b/packages/twenty-server/src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.module.ts index 587619793..eabedddf9 100644 --- a/packages/twenty-server/src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.module.ts +++ b/packages/twenty-server/src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.module.ts @@ -5,8 +5,8 @@ import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/works import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata'; import { FetchMessagesByBatchesModule } from 'src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.module'; import { GmailFetchMessageContentFromCacheService } from 'src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.service'; +import { MessageParticipantModule } from 'src/modules/messaging/services/message-participant/message-participant.module'; import { MessageModule } from 'src/modules/messaging/services/message/message.module'; -import { SaveMessageAndEmitContactCreationEventModule } from 'src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.module'; import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel.object-metadata'; @Module({ @@ -16,9 +16,10 @@ import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-obj ConnectedAccountObjectMetadata, MessageChannelObjectMetadata, ]), - SaveMessageAndEmitContactCreationEventModule, MessageModule, WorkspaceDataSourceModule, + MessageModule, + MessageParticipantModule, ], providers: [GmailFetchMessageContentFromCacheService], exports: [GmailFetchMessageContentFromCacheService], diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.service.ts b/packages/twenty-server/src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.service.ts index 8a3f06555..35556a7d1 100644 --- a/packages/twenty-server/src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.service.ts +++ b/packages/twenty-server/src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.service.ts @@ -17,7 +17,6 @@ import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/typ import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service'; import { GMAIL_USERS_MESSAGES_GET_BATCH_SIZE } from 'src/modules/messaging/constants/gmail-users-messages-get-batch-size.constant'; import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; -import { SaveMessageAndEmitContactCreationEventService } from 'src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.service'; import { GmailFullSyncJobData, GmailFullSyncJob, @@ -25,6 +24,13 @@ import { import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; import { GMAIL_ONGOING_SYNC_TIMEOUT } from 'src/modules/messaging/constants/gmail-ongoing-sync-timeout.constant'; +import { MessageParticipantService } from 'src/modules/messaging/services/message-participant/message-participant.service'; +import { MessageService } from 'src/modules/messaging/services/message/message.service'; +import { ParticipantWithMessageId } from 'src/modules/messaging/types/gmail-message'; +import { + CreateCompanyAndContactJobData, + CreateCompanyAndContactJob, +} from 'src/modules/connected-account/auto-companies-and-contacts-creation/jobs/create-company-and-contact.job'; @Injectable() export class GmailFetchMessageContentFromCacheService { @@ -38,12 +44,13 @@ export class GmailFetchMessageContentFromCacheService { private readonly connectedAccountRepository: ConnectedAccountRepository, @InjectObjectMetadataRepository(MessageChannelObjectMetadata) private readonly messageChannelRepository: MessageChannelRepository, - private readonly saveMessageAndEmitContactCreationEventService: SaveMessageAndEmitContactCreationEventService, @InjectCacheStorage(CacheStorageNamespace.Messaging) private readonly cacheStorage: CacheStorageService, private readonly workspaceDataSourceService: WorkspaceDataSourceService, @Inject(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, + private readonly messageService: MessageService, + private readonly messageParticipantService: MessageParticipantService, ) {} async fetchMessageContentFromCache( @@ -164,89 +171,131 @@ export class GmailFetchMessageContentFromCacheService { workspaceId, ); - await workspaceDataSource - ?.transaction(async (transactionManager: EntityManager) => { - const messageQueries = createQueriesFromMessageIds(messageIdsToFetch); + const messageQueries = createQueriesFromMessageIds(messageIdsToFetch); - const { messages: messagesToSave, errors } = - await this.fetchMessagesByBatchesService.fetchAllMessages( - messageQueries, - accessToken, + try { + const { messages: messagesToSave, errors } = + await this.fetchMessagesByBatchesService.fetchAllMessages( + messageQueries, + accessToken, + workspaceId, + connectedAccountId, + ); + + const participantsWithMessageId = await workspaceDataSource?.transaction( + async (transactionManager: EntityManager) => { + if (!messagesToSave.length) { + await this.messageChannelRepository.updateSyncStatus( + gmailMessageChannelId, + MessageChannelSyncStatus.PENDING, + workspaceId, + ); + + return []; + } + + if (errors.length) { + const errorsCanBeIgnored = errors.every( + (error) => error.code === 404, + ); + + if (!errorsCanBeIgnored) { + throw new Error( + `Error fetching messages for ${connectedAccountId} in workspace ${workspaceId}: ${JSON.stringify( + errors, + null, + 2, + )}`, + ); + } + } + + const messageExternalIdsAndIdsMap = + await this.messageService.saveMessagesWithinTransaction( + messagesToSave, + connectedAccount, + gmailMessageChannel.id, + workspaceId, + transactionManager, + ); + + const participantsWithMessageId: (ParticipantWithMessageId & { + shouldCreateContact: boolean; + })[] = messagesToSave.flatMap((message) => { + const messageId = messageExternalIdsAndIdsMap.get( + message.externalId, + ); + + return messageId + ? message.participants.map((participant) => ({ + ...participant, + messageId, + shouldCreateContact: + gmailMessageChannel.isContactAutoCreationEnabled && + message.participants.find((p) => p.role === 'from') + ?.handle === connectedAccount.handle, + })) + : []; + }); + + await this.messageParticipantService.saveMessageParticipants( + participantsWithMessageId, workspaceId, - connectedAccountId, + transactionManager, ); - if (!messagesToSave.length) { - await this.messageChannelRepository.updateSyncStatus( - gmailMessageChannelId, - MessageChannelSyncStatus.PENDING, - workspaceId, - ); + if (messageIdsToFetch.length < GMAIL_USERS_MESSAGES_GET_BATCH_SIZE) { + await this.messageChannelRepository.updateSyncStatus( + gmailMessageChannelId, + MessageChannelSyncStatus.SUCCEEDED, + workspaceId, + transactionManager, + ); - return; - } + this.logger.log( + `Messaging import for workspace ${workspaceId} and account ${connectedAccountId} done with no more messages to import.`, + ); + } else { + await this.messageChannelRepository.updateSyncStatus( + gmailMessageChannelId, + MessageChannelSyncStatus.PENDING, + workspaceId, + transactionManager, + ); - if (errors.length) { - const errorsCanBeIgnored = errors.every( - (error) => error.code === 404, - ); - - if (!errorsCanBeIgnored) { - throw new Error( - `Error fetching messages for ${connectedAccountId} in workspace ${workspaceId}: ${JSON.stringify( - errors, - null, - 2, - )}`, + this.logger.log( + `Messaging import for workspace ${workspaceId} and account ${connectedAccountId} done with more messages to import.`, ); } - } - await this.saveMessageAndEmitContactCreationEventService.saveMessagesAndEmitContactCreationEventWithinTransaction( - messagesToSave, - connectedAccount, - workspaceId, - gmailMessageChannel, - transactionManager, + return participantsWithMessageId; + }, + ); + + if (gmailMessageChannel.isContactAutoCreationEnabled) { + const contactsToCreate = participantsWithMessageId.filter( + (participant) => participant.shouldCreateContact, ); - if (messageIdsToFetch.length < GMAIL_USERS_MESSAGES_GET_BATCH_SIZE) { - await this.messageChannelRepository.updateSyncStatus( - gmailMessageChannelId, - MessageChannelSyncStatus.SUCCEEDED, + await this.messageQueueService.add( + CreateCompanyAndContactJob.name, + { workspaceId, - transactionManager, - ); - - this.logger.log( - `Messaging import for workspace ${workspaceId} and account ${connectedAccountId} done with no more messages to import.`, - ); - } else { - await this.messageChannelRepository.updateSyncStatus( - gmailMessageChannelId, - MessageChannelSyncStatus.PENDING, - workspaceId, - transactionManager, - ); - - this.logger.log( - `Messaging import for workspace ${workspaceId} and account ${connectedAccountId} done with more messages to import.`, - ); - } - }) - .catch(async (error) => { - await this.cacheStorage.setAdd( - `messages-to-import:${workspaceId}:gmail:${gmailMessageChannelId}`, - messageIdsToFetch, + connectedAccountHandle: connectedAccount.handle, + contactsToCreate, + }, ); + } + } catch (error) { + await this.cacheStorage.setAdd( + `messages-to-import:${workspaceId}:gmail:${gmailMessageChannelId}`, + messageIdsToFetch, + ); - if (error?.message?.code === 429) { - this.logger.error( - `Error fetching messages for ${connectedAccountId} in workspace ${workspaceId}: Resource has been exhausted, locking for ${GMAIL_ONGOING_SYNC_TIMEOUT}ms...`, - ); - - return; - } + if (error?.message?.code === 429) { + this.logger.error( + `Error fetching messages for ${connectedAccountId} in workspace ${workspaceId}: Resource has been exhausted, locking for ${GMAIL_ONGOING_SYNC_TIMEOUT}ms...`, + ); await this.messageChannelRepository.updateSyncStatus( gmailMessageChannelId, @@ -257,7 +306,8 @@ export class GmailFetchMessageContentFromCacheService { throw new Error( `Error fetching messages for ${connectedAccountId} in workspace ${workspaceId}: ${error.message}`, ); - }); + } + } } private async fallbackToFullSync( diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.module.ts b/packages/twenty-server/src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.module.ts index bcc4ef34d..cac4cdd6c 100644 --- a/packages/twenty-server/src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.module.ts +++ b/packages/twenty-server/src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.module.ts @@ -10,7 +10,6 @@ import { FetchMessagesByBatchesModule } from 'src/modules/messaging/services/fet import { GmailPartialSyncV2Service as GmailPartialSyncService } from 'src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.service'; import { MessageModule } from 'src/modules/messaging/services/message/message.module'; import { MessagingProvidersModule } from 'src/modules/messaging/services/providers/messaging-providers.module'; -import { SaveMessageAndEmitContactCreationEventModule } from 'src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.module'; import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel.object-metadata'; @Module({ @@ -23,7 +22,6 @@ import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-obj BlocklistObjectMetadata, ]), MessageModule, - SaveMessageAndEmitContactCreationEventModule, TypeOrmModule.forFeature([FeatureFlagEntity], 'core'), WorkspaceDataSourceModule, ], diff --git a/packages/twenty-server/src/modules/messaging/services/message-participant/message-participant.service.ts b/packages/twenty-server/src/modules/messaging/services/message-participant/message-participant.service.ts index 98b0a3489..ed446bb36 100644 --- a/packages/twenty-server/src/modules/messaging/services/message-participant/message-participant.service.ts +++ b/packages/twenty-server/src/modules/messaging/services/message-participant/message-participant.service.ts @@ -11,6 +11,7 @@ import { getFlattenedValuesAndValuesStringForBatchRawQuery } from 'src/modules/c import { MessageParticipantRepository } from 'src/modules/messaging/repositories/message-participant.repository'; import { MessageParticipantObjectMetadata } from 'src/modules/messaging/standard-objects/message-participant.object-metadata'; import { AddPersonIdAndWorkspaceMemberIdService } from 'src/modules/calendar-messaging-participant/services/add-person-id-and-workspace-member-id/add-person-id-and-workspace-member-id.service'; +import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record'; @Injectable() export class MessageParticipantService { @@ -24,13 +25,15 @@ export class MessageParticipantService { ) {} public async updateMessageParticipantsAfterPeopleCreation( + createdPeople: ObjectRecord[], workspaceId: string, transactionManager?: EntityManager, ): Promise { - const participants = - await this.messageParticipantRepository.getWithoutPersonIdAndWorkspaceMemberId( - workspaceId, - ); + const participants = await this.messageParticipantRepository.getByHandles( + createdPeople.map((person) => person.email), + workspaceId, + transactionManager, + ); if (!participants) return; diff --git a/packages/twenty-server/src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.module.ts b/packages/twenty-server/src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.module.ts deleted file mode 100644 index 22740ba9c..000000000 --- a/packages/twenty-server/src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.module.ts +++ /dev/null @@ -1,22 +0,0 @@ -import { Module } from '@nestjs/common'; - -import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module'; -import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module'; -import { AutoCompaniesAndContactsCreationModule } from 'src/modules/connected-account/auto-companies-and-contacts-creation/auto-companies-and-contacts-creation.module'; -import { MessageParticipantModule } from 'src/modules/messaging/services/message-participant/message-participant.module'; -import { MessageModule } from 'src/modules/messaging/services/message/message.module'; -import { SaveMessageAndEmitContactCreationEventService } from 'src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.service'; -import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel.object-metadata'; - -@Module({ - imports: [ - MessageModule, - ObjectMetadataRepositoryModule.forFeature([MessageChannelObjectMetadata]), - AutoCompaniesAndContactsCreationModule, - MessageParticipantModule, - WorkspaceDataSourceModule, - ], - providers: [SaveMessageAndEmitContactCreationEventService], - exports: [SaveMessageAndEmitContactCreationEventService], -}) -export class SaveMessageAndEmitContactCreationEventModule {} diff --git a/packages/twenty-server/src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.service.ts b/packages/twenty-server/src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.service.ts deleted file mode 100644 index 7fc4587a7..000000000 --- a/packages/twenty-server/src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.service.ts +++ /dev/null @@ -1,219 +0,0 @@ -import { Injectable, Logger } from '@nestjs/common'; - -import { EntityManager } from 'typeorm'; - -import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository'; -import { - GmailMessage, - ParticipantWithMessageId, -} from 'src/modules/messaging/types/gmail-message'; -import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; -import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata'; -import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record'; -import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; -import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel.object-metadata'; -import { MessageService } from 'src/modules/messaging/services/message/message.service'; -import { MessageParticipantService } from 'src/modules/messaging/services/message-participant/message-participant.service'; -import { - CreateCompanyAndContactJobData, - CreateCompanyAndContactJob, -} from 'src/modules/connected-account/auto-companies-and-contacts-creation/jobs/create-company-and-contact.job'; -import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; -import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; -import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; - -@Injectable() -export class SaveMessageAndEmitContactCreationEventService { - private readonly logger = new Logger( - SaveMessageAndEmitContactCreationEventService.name, - ); - - constructor( - private readonly messageService: MessageService, - @InjectObjectMetadataRepository(MessageChannelObjectMetadata) - private readonly messageChannelRepository: MessageChannelRepository, - private readonly workspaceDataSourceService: WorkspaceDataSourceService, - private readonly messageParticipantService: MessageParticipantService, - @InjectMessageQueue(MessageQueue.emailQueue) - private readonly messageQueueService: MessageQueueService, - ) {} - - public async saveMessagesAndEmitContactCreationEventWithinTransaction( - messagesToSave: GmailMessage[], - connectedAccount: ObjectRecord, - workspaceId: string, - gmailMessageChannel: ObjectRecord, - transactionManager: EntityManager, - ) { - const messageExternalIdsAndIdsMap = - await this.messageService.saveMessagesWithinTransaction( - messagesToSave, - connectedAccount, - gmailMessageChannel.id, - workspaceId, - transactionManager, - ); - - const participantsWithMessageId: (ParticipantWithMessageId & { - shouldCreateContact: boolean; - })[] = messagesToSave.flatMap((message) => { - const messageId = messageExternalIdsAndIdsMap.get(message.externalId); - - return messageId - ? message.participants.map((participant) => ({ - ...participant, - messageId, - shouldCreateContact: - gmailMessageChannel.isContactAutoCreationEnabled && - message.participants.find((p) => p.role === 'from')?.handle === - connectedAccount.handle, - })) - : []; - }); - - await this.messageParticipantService.saveMessageParticipants( - participantsWithMessageId, - workspaceId, - transactionManager, - ); - - if (gmailMessageChannel.isContactAutoCreationEnabled) { - const contactsToCreate = participantsWithMessageId.filter( - (participant) => participant.shouldCreateContact, - ); - - await this.messageQueueService.add( - CreateCompanyAndContactJob.name, - { - workspaceId, - connectedAccountHandle: connectedAccount.handle, - contactsToCreate, - }, - ); - } - } - - async saveMessagesAndEmitContactCreation( - messagesToSave: GmailMessage[], - connectedAccount: ObjectRecord, - workspaceId: string, - gmailMessageChannelId: string, - ) { - const { dataSource: workspaceDataSource } = - await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( - workspaceId, - ); - - let startTime = Date.now(); - - const messageExternalIdsAndIdsMap = await this.messageService.saveMessages( - messagesToSave, - workspaceDataSource, - connectedAccount, - gmailMessageChannelId, - workspaceId, - ); - - let endTime = Date.now(); - - this.logger.log( - `Saving messages for workspace ${workspaceId} and account ${ - connectedAccount.id - } in ${endTime - startTime}ms`, - ); - - const gmailMessageChannel = - await this.messageChannelRepository.getFirstByConnectedAccountId( - connectedAccount.id, - workspaceId, - ); - - if (!gmailMessageChannel) { - this.logger.error( - `No message channel found for connected account ${connectedAccount.id} in workspace ${workspaceId} in saveMessagesAndCreateContacts`, - ); - - return; - } - - const participantsWithMessageId: (ParticipantWithMessageId & { - shouldCreateContact: boolean; - })[] = messagesToSave.flatMap((message) => { - const messageId = messageExternalIdsAndIdsMap.get(message.externalId); - - return messageId - ? message.participants.map((participant) => ({ - ...participant, - messageId, - shouldCreateContact: - gmailMessageChannel.isContactAutoCreationEnabled && - message.participants.find((p) => p.role === 'from')?.handle === - connectedAccount.handle, - })) - : []; - }); - - startTime = Date.now(); - - await this.tryToSaveMessageParticipantsOrDeleteMessagesIfError( - participantsWithMessageId, - gmailMessageChannel, - workspaceId, - connectedAccount, - ); - - endTime = Date.now(); - - this.logger.log( - `Saving message participants for workspace ${workspaceId} and account in ${ - connectedAccount.id - } ${endTime - startTime}ms`, - ); - } - - private async tryToSaveMessageParticipantsOrDeleteMessagesIfError( - participantsWithMessageId: (ParticipantWithMessageId & { - shouldCreateContact: boolean; - })[], - gmailMessageChannel: ObjectRecord, - workspaceId: string, - connectedAccount: ObjectRecord, - ) { - try { - await this.messageParticipantService.saveMessageParticipants( - participantsWithMessageId, - workspaceId, - ); - - if (gmailMessageChannel.isContactAutoCreationEnabled) { - const contactsToCreate = participantsWithMessageId.filter( - (participant) => participant.shouldCreateContact, - ); - - await this.messageQueueService.add( - CreateCompanyAndContactJob.name, - { - workspaceId, - connectedAccountHandle: connectedAccount.handle, - contactsToCreate, - }, - ); - } - } catch (error) { - this.logger.error( - `Error saving message participants for workspace ${workspaceId} and account ${connectedAccount.id}`, - error, - ); - - const messagesToDelete = participantsWithMessageId.map( - (participant) => participant.messageId, - ); - - await this.messageService.deleteMessages( - messagesToDelete, - gmailMessageChannel.id, - workspaceId, - ); - } - } -} diff --git a/packages/twenty-server/src/modules/person/repositories/person.repository.ts b/packages/twenty-server/src/modules/person/repositories/person.repository.ts index cc66dc1a3..d191d0a54 100644 --- a/packages/twenty-server/src/modules/person/repositories/person.repository.ts +++ b/packages/twenty-server/src/modules/person/repositories/person.repository.ts @@ -56,7 +56,7 @@ export class PersonRepository { }[], workspaceId: string, transactionManager?: EntityManager, - ): Promise { + ): Promise[]> { const dataSourceSchema = this.workspaceDataSourceService.getSchemaName(workspaceId); @@ -81,7 +81,7 @@ export class PersonRepository { }); return await this.workspaceDataSourceService.executeRawQuery( - `INSERT INTO ${dataSourceSchema}.person (id, email, "nameFirstName", "nameLastName", "companyId", "position") VALUES ${valuesString}`, + `INSERT INTO ${dataSourceSchema}.person (id, email, "nameFirstName", "nameLastName", "companyId", "position") VALUES ${valuesString} RETURNING *`, flattenedValues, workspaceId, transactionManager,