From 4476f5215b9cb42f2168077b4d9daf62ade4aa20 Mon Sep 17 00:00:00 2001 From: Weiko Date: Tue, 12 Mar 2024 17:49:45 +0100 Subject: [PATCH] [messaging] Fix thread cleaner service subqueries (#4416) * [messaging] Fix thread cleaner service subqueries * add pagination * various fixes * Fix thread merging * fix * fix --- .../auth/services/google-gmail.service.ts | 1 - .../messaging/jobs/gmail-full-sync.job.ts | 18 +++-- .../messaging/jobs/gmail-partial-sync.job.ts | 18 +++-- .../connected-account.service.ts | 24 +++++-- ...age-channel-message-association.service.ts | 12 ---- .../message-channel.service.ts | 18 ++++- .../message-thread/message-thread.module.ts | 9 ++- .../message-thread/message-thread.service.ts | 46 ++++++++---- .../repositories/message/message.module.ts | 4 +- .../repositories/message/message.service.ts | 70 +++++++++++++------ .../services/gmail-full-sync.service.ts | 22 +++++- .../services/gmail-partial-sync.service.ts | 20 +++++- .../gmail-refresh-access-token.service.ts | 8 ++- ...ve-messages-and-create-contacts.service.ts | 15 +++- .../thread-cleaner/thread-cleaner.service.ts | 59 +++++----------- .../delete-using-pagination.util.spec.ts | 44 ++++++++++++ .../utils/delete-using-pagination.util.ts | 38 ++++++++++ 17 files changed, 311 insertions(+), 115 deletions(-) create mode 100644 packages/twenty-server/src/workspace/messaging/services/thread-cleaner/utils/delete-using-pagination.util.spec.ts create mode 100644 packages/twenty-server/src/workspace/messaging/services/thread-cleaner/utils/delete-using-pagination.util.ts diff --git a/packages/twenty-server/src/core/auth/services/google-gmail.service.ts b/packages/twenty-server/src/core/auth/services/google-gmail.service.ts index 25e33c208..8af8605ff 100644 --- a/packages/twenty-server/src/core/auth/services/google-gmail.service.ts +++ b/packages/twenty-server/src/core/auth/services/google-gmail.service.ts @@ -79,7 +79,6 @@ export class GoogleGmailService { connectedAccountId, }, { - id: connectedAccountId, retryLimit: 2, }, ); diff --git a/packages/twenty-server/src/workspace/messaging/jobs/gmail-full-sync.job.ts b/packages/twenty-server/src/workspace/messaging/jobs/gmail-full-sync.job.ts index 3ab9f5bb8..764ba2f62 100644 --- a/packages/twenty-server/src/workspace/messaging/jobs/gmail-full-sync.job.ts +++ b/packages/twenty-server/src/workspace/messaging/jobs/gmail-full-sync.job.ts @@ -26,10 +26,20 @@ export class GmailFullSyncJob implements MessageQueueJob { data.connectedAccountId } ${data.nextPageToken ? `and ${data.nextPageToken} pageToken` : ''}`, ); - await this.gmailRefreshAccessTokenService.refreshAndSaveAccessToken( - data.workspaceId, - data.connectedAccountId, - ); + + try { + await this.gmailRefreshAccessTokenService.refreshAndSaveAccessToken( + data.workspaceId, + data.connectedAccountId, + ); + } catch (e) { + this.logger.error( + `Error refreshing access token for connected account ${data.connectedAccountId} in workspace ${data.workspaceId}`, + e, + ); + + return; + } await this.gmailFullSyncService.fetchConnectedAccountThreads( data.workspaceId, diff --git a/packages/twenty-server/src/workspace/messaging/jobs/gmail-partial-sync.job.ts b/packages/twenty-server/src/workspace/messaging/jobs/gmail-partial-sync.job.ts index f8169517f..057b5f762 100644 --- a/packages/twenty-server/src/workspace/messaging/jobs/gmail-partial-sync.job.ts +++ b/packages/twenty-server/src/workspace/messaging/jobs/gmail-partial-sync.job.ts @@ -25,10 +25,20 @@ export class GmailPartialSyncJob this.logger.log( `gmail partial-sync for workspace ${data.workspaceId} and account ${data.connectedAccountId}`, ); - await this.gmailRefreshAccessTokenService.refreshAndSaveAccessToken( - data.workspaceId, - data.connectedAccountId, - ); + + try { + await this.gmailRefreshAccessTokenService.refreshAndSaveAccessToken( + data.workspaceId, + data.connectedAccountId, + ); + } catch (e) { + this.logger.error( + `Error refreshing access token for connected account ${data.connectedAccountId} in workspace ${data.workspaceId}`, + e, + ); + + return; + } await this.gmailPartialSyncService.fetchConnectedAccountThreads( data.workspaceId, diff --git a/packages/twenty-server/src/workspace/messaging/repositories/connected-account/connected-account.service.ts b/packages/twenty-server/src/workspace/messaging/repositories/connected-account/connected-account.service.ts index c20fd9529..dc32ede6e 100644 --- a/packages/twenty-server/src/workspace/messaging/repositories/connected-account/connected-account.service.ts +++ b/packages/twenty-server/src/workspace/messaging/repositories/connected-account/connected-account.service.ts @@ -43,11 +43,11 @@ export class ConnectedAccountService { ); } - public async getByIdOrFail( + public async getById( connectedAccountId: string, workspaceId: string, transactionManager?: EntityManager, - ): Promise> { + ): Promise | undefined> { const dataSourceSchema = this.workspaceDataSourceService.getSchemaName(workspaceId); @@ -59,13 +59,27 @@ export class ConnectedAccountService { transactionManager, ); - if (!connectedAccounts || connectedAccounts.length === 0) { + return connectedAccounts[0]; + } + + public async getByIdOrFail( + connectedAccountId: string, + workspaceId: string, + transactionManager?: EntityManager, + ): Promise> { + const connectedAccount = await this.getById( + connectedAccountId, + workspaceId, + transactionManager, + ); + + if (!connectedAccount) { throw new NotFoundException( - `No connected account found for id ${connectedAccountId} in workspace ${workspaceId}`, + `Connected account with id ${connectedAccountId} not found in workspace ${workspaceId}`, ); } - return connectedAccounts[0]; + return connectedAccount; } public async updateLastSyncHistoryId( diff --git a/packages/twenty-server/src/workspace/messaging/repositories/message-channel-message-association/message-channel-message-association.service.ts b/packages/twenty-server/src/workspace/messaging/repositories/message-channel-message-association/message-channel-message-association.service.ts index 18649d722..15a4edefd 100644 --- a/packages/twenty-server/src/workspace/messaging/repositories/message-channel-message-association/message-channel-message-association.service.ts +++ b/packages/twenty-server/src/workspace/messaging/repositories/message-channel-message-association/message-channel-message-association.service.ts @@ -84,18 +84,6 @@ export class MessageChannelMessageAssociationService { ); } - public async deleteByMessageChannelId( - messageChannelId: string, - workspaceId: string, - transactionManager?: EntityManager, - ) { - this.deleteByMessageChannelIds( - [messageChannelId], - workspaceId, - transactionManager, - ); - } - public async deleteByMessageChannelIds( messageChannelIds: string[], workspaceId: string, diff --git a/packages/twenty-server/src/workspace/messaging/repositories/message-channel/message-channel.service.ts b/packages/twenty-server/src/workspace/messaging/repositories/message-channel/message-channel.service.ts index 1ca85c5b1..4754afe91 100644 --- a/packages/twenty-server/src/workspace/messaging/repositories/message-channel/message-channel.service.ts +++ b/packages/twenty-server/src/workspace/messaging/repositories/message-channel/message-channel.service.ts @@ -32,17 +32,29 @@ export class MessageChannelService { connectedAccountId: string, workspaceId: string, ): Promise> { - const messageChannels = await this.getByConnectedAccountId( + const messageChannel = await this.getFirstByConnectedAccountId( connectedAccountId, workspaceId, ); - if (!messageChannels || messageChannels.length === 0) { + if (!messageChannel) { throw new Error( - `No message channel found for connected account ${connectedAccountId} in workspace ${workspaceId}`, + `Message channel for connected account ${connectedAccountId} not found in workspace ${workspaceId}`, ); } + return messageChannel; + } + + public async getFirstByConnectedAccountId( + connectedAccountId: string, + workspaceId: string, + ): Promise | undefined> { + const messageChannels = await this.getByConnectedAccountId( + connectedAccountId, + workspaceId, + ); + return messageChannels[0]; } diff --git a/packages/twenty-server/src/workspace/messaging/repositories/message-thread/message-thread.module.ts b/packages/twenty-server/src/workspace/messaging/repositories/message-thread/message-thread.module.ts index 4d4cfc932..547ba1caa 100644 --- a/packages/twenty-server/src/workspace/messaging/repositories/message-thread/message-thread.module.ts +++ b/packages/twenty-server/src/workspace/messaging/repositories/message-thread/message-thread.module.ts @@ -1,11 +1,16 @@ -import { Module } from '@nestjs/common'; +import { Module, forwardRef } from '@nestjs/common'; import { MessageChannelMessageAssociationModule } from 'src/workspace/messaging/repositories/message-channel-message-association/message-channel-message-assocation.module'; import { MessageThreadService } from 'src/workspace/messaging/repositories/message-thread/message-thread.service'; +import { MessageModule } from 'src/workspace/messaging/repositories/message/message.module'; import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/workspace-datasource.module'; @Module({ - imports: [WorkspaceDataSourceModule, MessageChannelMessageAssociationModule], + imports: [ + WorkspaceDataSourceModule, + MessageChannelMessageAssociationModule, + forwardRef(() => MessageModule), + ], providers: [MessageThreadService], exports: [MessageThreadService], }) diff --git a/packages/twenty-server/src/workspace/messaging/repositories/message-thread/message-thread.service.ts b/packages/twenty-server/src/workspace/messaging/repositories/message-thread/message-thread.service.ts index eb0b98e9c..be51802ae 100644 --- a/packages/twenty-server/src/workspace/messaging/repositories/message-thread/message-thread.service.ts +++ b/packages/twenty-server/src/workspace/messaging/repositories/message-thread/message-thread.service.ts @@ -1,4 +1,4 @@ -import { Injectable } from '@nestjs/common'; +import { Inject, Injectable, forwardRef } from '@nestjs/common'; import { EntityManager } from 'typeorm'; import { v4 } from 'uuid'; @@ -6,33 +6,38 @@ import { v4 } from 'uuid'; import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service'; import { DataSourceEntity } from 'src/metadata/data-source/data-source.entity'; import { MessageChannelMessageAssociationService } from 'src/workspace/messaging/repositories/message-channel-message-association/message-channel-message-association.service'; -import { MessageThreadObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/message-thread.object-metadata'; -import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record'; +import { MessageService } from 'src/workspace/messaging/repositories/message/message.service'; @Injectable() export class MessageThreadService { constructor( private readonly messageChannelMessageAssociationService: MessageChannelMessageAssociationService, private readonly workspaceDataSourceService: WorkspaceDataSourceService, + @Inject(forwardRef(() => MessageService)) + private readonly messageService: MessageService, ) {} - public async getOrphanThreads( + public async getOrphanThreadIdsPaginated( + limit: number, + offset: number, workspaceId: string, transactionManager?: EntityManager, - ): Promise[]> { + ): Promise { const dataSourceSchema = this.workspaceDataSourceService.getSchemaName(workspaceId); - return await this.workspaceDataSourceService.executeRawQuery( - `SELECT mt.* FROM ${dataSourceSchema}."messageThread" mt - WHERE NOT EXISTS ( - SELECT 1 FROM ${dataSourceSchema}."message" m - WHERE m."messageThreadId" = mt.id - )`, - [], + const orphanThreads = await this.workspaceDataSourceService.executeRawQuery( + `SELECT mt.id + FROM ${dataSourceSchema}."messageThread" mt + LEFT JOIN ${dataSourceSchema}."message" m ON mt.id = m."messageThreadId" + WHERE m."messageThreadId" IS NULL + LIMIT $1 OFFSET $2`, + [limit, offset], workspaceId, transactionManager, ); + + return orphanThreads.map(({ id }) => id); } public async deleteByIds( @@ -52,11 +57,13 @@ export class MessageThreadService { } public async saveMessageThreadOrReturnExistingMessageThread( + headerMessageId: string, messageThreadExternalId: string, dataSourceMetadata: DataSourceEntity, workspaceId: string, manager: EntityManager, ) { + // Check if message thread already exists via threadExternalId const existingMessageChannelMessageAssociationByMessageThreadExternalId = await this.messageChannelMessageAssociationService.getFirstByMessageThreadExternalId( messageThreadExternalId, @@ -71,6 +78,21 @@ export class MessageThreadService { return Promise.resolve(existingMessageThread); } + // Check if message thread already exists via existing message headerMessageId + const existingMessageWithSameHeaderMessageId = + await this.messageService.getFirstOrNullByHeaderMessageId( + headerMessageId, + workspaceId, + manager, + ); + + if (existingMessageWithSameHeaderMessageId) { + return Promise.resolve( + existingMessageWithSameHeaderMessageId.messageThreadId, + ); + } + + // If message thread does not exist, create new message thread const newMessageThreadId = v4(); await manager.query( diff --git a/packages/twenty-server/src/workspace/messaging/repositories/message/message.module.ts b/packages/twenty-server/src/workspace/messaging/repositories/message/message.module.ts index e7b1fed3d..202352818 100644 --- a/packages/twenty-server/src/workspace/messaging/repositories/message/message.module.ts +++ b/packages/twenty-server/src/workspace/messaging/repositories/message/message.module.ts @@ -1,4 +1,4 @@ -import { Module } from '@nestjs/common'; +import { Module, forwardRef } from '@nestjs/common'; import { MessageChannelModule } from 'src/workspace/messaging/repositories/message-channel/message-channel.module'; import { MessageChannelMessageAssociationModule } from 'src/workspace/messaging/repositories/message-channel-message-association/message-channel-message-assocation.module'; @@ -11,7 +11,7 @@ import { CreateCompaniesAndContactsModule } from 'src/workspace/messaging/servic @Module({ imports: [ WorkspaceDataSourceModule, - MessageThreadModule, + forwardRef(() => MessageThreadModule), MessageParticipantModule, MessageChannelMessageAssociationModule, MessageChannelModule, diff --git a/packages/twenty-server/src/workspace/messaging/repositories/message/message.service.ts b/packages/twenty-server/src/workspace/messaging/repositories/message/message.service.ts index 45002250c..f64740a65 100644 --- a/packages/twenty-server/src/workspace/messaging/repositories/message/message.service.ts +++ b/packages/twenty-server/src/workspace/messaging/repositories/message/message.service.ts @@ -1,4 +1,4 @@ -import { Injectable } from '@nestjs/common'; +import { Inject, Injectable, Logger, forwardRef } from '@nestjs/common'; import { DataSource, EntityManager } from 'typeorm'; import { v4 } from 'uuid'; @@ -9,42 +9,47 @@ import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object import { DataSourceEntity } from 'src/metadata/data-source/data-source.entity'; import { GmailMessage } from 'src/workspace/messaging/types/gmail-message'; import { ConnectedAccountObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/connected-account.object-metadata'; -import { MessageChannelService } from 'src/workspace/messaging/repositories/message-channel/message-channel.service'; import { MessageChannelMessageAssociationService } from 'src/workspace/messaging/repositories/message-channel-message-association/message-channel-message-association.service'; -import { MessageParticipantService } from 'src/workspace/messaging/repositories/message-participant/message-participant.service'; import { MessageThreadService } from 'src/workspace/messaging/repositories/message-thread/message-thread.service'; -import { CreateCompaniesAndContactsService } from 'src/workspace/messaging/services/create-companies-and-contacts/create-companies-and-contacts.service'; +import { MessageChannelService } from 'src/workspace/messaging/repositories/message-channel/message-channel.service'; + @Injectable() export class MessageService { + private readonly logger = new Logger(MessageService.name); + constructor( private readonly workspaceDataSourceService: WorkspaceDataSourceService, private readonly messageChannelMessageAssociationService: MessageChannelMessageAssociationService, + @Inject(forwardRef(() => MessageThreadService)) private readonly messageThreadService: MessageThreadService, - private readonly messageParticipantService: MessageParticipantService, private readonly messageChannelService: MessageChannelService, - private readonly createCompaniesAndContactsService: CreateCompaniesAndContactsService, ) {} - public async getNonAssociatedMessages( + public async getNonAssociatedMessageIdsPaginated( + limit: number, + offset: number, workspaceId: string, transactionManager?: EntityManager, - ): Promise[]> { + ): Promise { const dataSourceSchema = this.workspaceDataSourceService.getSchemaName(workspaceId); - return await this.workspaceDataSourceService.executeRawQuery( - `SELECT m.* FROM ${dataSourceSchema}."message" m - WHERE NOT EXISTS ( - SELECT 1 FROM ${dataSourceSchema}."messageChannelMessageAssociation" mcma - WHERE mcma."messageId" = m.id - )`, - [], - workspaceId, - transactionManager, - ); + const nonAssociatedMessages = + await this.workspaceDataSourceService.executeRawQuery( + `SELECT m.id FROM ${dataSourceSchema}."message" m + LEFT JOIN ${dataSourceSchema}."messageChannelMessageAssociation" mcma + ON m.id = mcma."messageId" + WHERE mcma.id IS NULL + LIMIT $1 OFFSET $2`, + [limit, offset], + workspaceId, + transactionManager, + ); + + return nonAssociatedMessages.map(({ id }) => id); } - public async getFirstByHeaderMessageId( + public async getFirstOrNullByHeaderMessageId( headerMessageId: string, workspaceId: string, transactionManager?: EntityManager, @@ -125,9 +130,32 @@ export class MessageService { const messageExternalIdsAndIdsMap = new Map(); try { + let keepImporting = true; + for (const message of messages) { + if (!keepImporting) { + break; + } + await workspaceDataSource?.transaction( async (manager: EntityManager) => { + const gmailMessageChannel = + await this.messageChannelService.getByIds( + [gmailMessageChannelId], + workspaceId, + manager, + ); + + if (gmailMessageChannel.length === 0) { + this.logger.error( + `No message channel found for connected account ${connectedAccount.id} in workspace ${workspaceId} in saveMessages`, + ); + + keepImporting = false; + + return; + } + const existingMessageChannelMessageAssociationsCount = await this.messageChannelMessageAssociationService.countByMessageExternalIdsAndMessageChannelId( [message.externalId], @@ -140,8 +168,10 @@ export class MessageService { return; } + // TODO: This does not handle all thread merging use cases and might create orphan threads. const savedOrExistingMessageThreadId = await this.messageThreadService.saveMessageThreadOrReturnExistingMessageThread( + message.headerMessageId, message.messageThreadExternalId, dataSourceMetadata, workspaceId, @@ -193,7 +223,7 @@ export class MessageService { workspaceId: string, manager: EntityManager, ): Promise { - const existingMessage = await this.getFirstByHeaderMessageId( + const existingMessage = await this.getFirstOrNullByHeaderMessageId( message.headerMessageId, workspaceId, ); diff --git a/packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts b/packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts index 047be4ec8..9e37b317d 100644 --- a/packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts @@ -46,11 +46,19 @@ export class GmailFullSyncService { connectedAccountId: string, nextPageToken?: string, ): Promise { - const connectedAccount = await this.connectedAccountService.getByIdOrFail( + const connectedAccount = await this.connectedAccountService.getById( connectedAccountId, workspaceId, ); + if (!connectedAccount) { + this.logger.error( + `Connected account ${connectedAccountId} not found in workspace ${workspaceId} during full-sync`, + ); + + return; + } + const accessToken = connectedAccount.accessToken; const refreshToken = connectedAccount.refreshToken; const workspaceMemberId = connectedAccount.accountOwnerId; @@ -62,11 +70,19 @@ export class GmailFullSyncService { } const gmailMessageChannel = - await this.messageChannelService.getFirstByConnectedAccountIdOrFail( + await this.messageChannelService.getFirstByConnectedAccountId( connectedAccountId, workspaceId, ); + if (!gmailMessageChannel) { + this.logger.error( + `No message channel found for connected account ${connectedAccountId} in workspace ${workspaceId} during full-syn`, + ); + + return; + } + const gmailMessageChannelId = gmailMessageChannel.id; const gmailClient = @@ -173,7 +189,7 @@ export class GmailFullSyncService { ); if (messagesToSave.length > 0) { - this.saveMessagesAndCreateContactsService.saveMessagesAndCreateContacts( + await this.saveMessagesAndCreateContactsService.saveMessagesAndCreateContacts( messagesToSave, connectedAccount, workspaceId, diff --git a/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts b/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts index 7ce6419fd..28cc43fa6 100644 --- a/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts @@ -48,11 +48,19 @@ export class GmailPartialSyncService { connectedAccountId: string, maxResults = 500, ): Promise { - const connectedAccount = await this.connectedAccountService.getByIdOrFail( + const connectedAccount = await this.connectedAccountService.getById( connectedAccountId, workspaceId, ); + if (!connectedAccount) { + this.logger.error( + `Connected account ${connectedAccountId} not found in workspace ${workspaceId} during partial-sync`, + ); + + return; + } + const lastSyncHistoryId = connectedAccount.lastSyncHistoryId; if (!lastSyncHistoryId) { @@ -135,11 +143,19 @@ export class GmailPartialSyncService { } const gmailMessageChannel = - await this.messageChannelService.getFirstByConnectedAccountIdOrFail( + await this.messageChannelService.getFirstByConnectedAccountId( connectedAccountId, workspaceId, ); + if (!gmailMessageChannel) { + this.logger.error( + `No message channel found for connected account ${connectedAccountId} in workspace ${workspaceId} during partial-sync`, + ); + + return; + } + const gmailMessageChannelId = gmailMessageChannel.id; const { messagesAdded, messagesDeleted } = diff --git a/packages/twenty-server/src/workspace/messaging/services/gmail-refresh-access-token.service.ts b/packages/twenty-server/src/workspace/messaging/services/gmail-refresh-access-token.service.ts index 7a01b663b..a94a372ef 100644 --- a/packages/twenty-server/src/workspace/messaging/services/gmail-refresh-access-token.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/gmail-refresh-access-token.service.ts @@ -16,11 +16,17 @@ export class GmailRefreshAccessTokenService { workspaceId: string, connectedAccountId: string, ): Promise { - const connectedAccount = await this.connectedAccountService.getByIdOrFail( + const connectedAccount = await this.connectedAccountService.getById( connectedAccountId, workspaceId, ); + if (!connectedAccount) { + throw new Error( + `No connected account found for ${connectedAccountId} in workspace ${workspaceId}`, + ); + } + const refreshToken = connectedAccount.refreshToken; if (!refreshToken) { diff --git a/packages/twenty-server/src/workspace/messaging/services/save-messages-and-create-contacts.service.ts b/packages/twenty-server/src/workspace/messaging/services/save-messages-and-create-contacts.service.ts index 380b43136..c47832ca6 100644 --- a/packages/twenty-server/src/workspace/messaging/services/save-messages-and-create-contacts.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/save-messages-and-create-contacts.service.ts @@ -59,12 +59,23 @@ export class SaveMessagesAndCreateContactsService { } in ${endTime - startTime}ms`, ); - const isContactAutoCreationEnabled = - await this.messageChannelService.getIsContactAutoCreationEnabledByConnectedAccountIdOrFail( + const gmailMessageChannel = + await this.messageChannelService.getFirstByConnectedAccountId( connectedAccount.id, workspaceId, ); + if (!gmailMessageChannel) { + this.logger.error( + `No message channel found for connected account ${connectedAccount.id} in workspace ${workspaceId} in saveMessagesAndCreateContacts`, + ); + + return; + } + + const isContactAutoCreationEnabled = + gmailMessageChannel.isContactAutoCreationEnabled; + const participantsWithMessageId: ParticipantWithMessageId[] = messagesToSave.flatMap((message) => { const messageId = messageExternalIdsAndIdsMap.get(message.externalId); diff --git a/packages/twenty-server/src/workspace/messaging/services/thread-cleaner/thread-cleaner.service.ts b/packages/twenty-server/src/workspace/messaging/services/thread-cleaner/thread-cleaner.service.ts index 2d07c8cb7..04d684bbc 100644 --- a/packages/twenty-server/src/workspace/messaging/services/thread-cleaner/thread-cleaner.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/thread-cleaner/thread-cleaner.service.ts @@ -4,6 +4,7 @@ import { TypeORMService } from 'src/database/typeorm/typeorm.service'; import { DataSourceService } from 'src/metadata/data-source/data-source.service'; import { MessageThreadService } from 'src/workspace/messaging/repositories/message-thread/message-thread.service'; import { MessageService } from 'src/workspace/messaging/repositories/message/message.service'; +import { deleteUsingPagination } from 'src/workspace/messaging/services/thread-cleaner/utils/delete-using-pagination.util'; @Injectable() export class ThreadCleanerService { @@ -15,48 +16,22 @@ export class ThreadCleanerService { ) {} public async cleanWorkspaceThreads(workspaceId: string) { - const dataSourceMetadata = - await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail( - workspaceId, - ); + await deleteUsingPagination( + workspaceId, + 500, + this.messageService.getNonAssociatedMessageIdsPaginated.bind( + this.messageService, + ), + this.messageService.deleteByIds.bind(this.messageService), + ); - const workspaceDataSource = - await this.typeORMService.connectToDataSource(dataSourceMetadata); - - await workspaceDataSource?.transaction(async (transactionManager) => { - const messagesToDelete = - await this.messageService.getNonAssociatedMessages( - workspaceId, - transactionManager, - ); - - const messageIdsToDelete = messagesToDelete.map(({ id }) => id); - - if (messageIdsToDelete.length > 0) { - await this.messageService.deleteByIds( - messageIdsToDelete, - workspaceId, - transactionManager, - ); - } - - const messageThreadsToDelete = - await this.messageThreadService.getOrphanThreads( - workspaceId, - transactionManager, - ); - - const messageThreadToDeleteIds = messageThreadsToDelete.map( - ({ id }) => id, - ); - - if (messageThreadToDeleteIds.length > 0) { - await this.messageThreadService.deleteByIds( - messageThreadToDeleteIds, - workspaceId, - transactionManager, - ); - } - }); + await deleteUsingPagination( + workspaceId, + 500, + this.messageThreadService.getOrphanThreadIdsPaginated.bind( + this.messageThreadService, + ), + this.messageThreadService.deleteByIds.bind(this.messageThreadService), + ); } } diff --git a/packages/twenty-server/src/workspace/messaging/services/thread-cleaner/utils/delete-using-pagination.util.spec.ts b/packages/twenty-server/src/workspace/messaging/services/thread-cleaner/utils/delete-using-pagination.util.spec.ts new file mode 100644 index 000000000..cca39a8d6 --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/services/thread-cleaner/utils/delete-using-pagination.util.spec.ts @@ -0,0 +1,44 @@ +import { deleteUsingPagination } from './delete-using-pagination.util'; + +describe('deleteUsingPagination', () => { + it('should delete items using pagination', async () => { + const workspaceId = 'workspace123'; + const batchSize = 10; + const getterPaginated = jest + .fn() + .mockResolvedValueOnce(['id1', 'id2']) + .mockResolvedValueOnce([]); + const deleter = jest.fn(); + const transactionManager = undefined; + + await deleteUsingPagination( + workspaceId, + batchSize, + getterPaginated, + deleter, + transactionManager, + ); + + expect(getterPaginated).toHaveBeenNthCalledWith( + 1, + batchSize, + 0, + workspaceId, + transactionManager, + ); + expect(getterPaginated).toHaveBeenNthCalledWith( + 2, + batchSize, + batchSize, + workspaceId, + transactionManager, + ); + expect(deleter).toHaveBeenNthCalledWith( + 1, + ['id1', 'id2'], + workspaceId, + transactionManager, + ); + expect(deleter).toHaveBeenCalledTimes(1); + }); +}); diff --git a/packages/twenty-server/src/workspace/messaging/services/thread-cleaner/utils/delete-using-pagination.util.ts b/packages/twenty-server/src/workspace/messaging/services/thread-cleaner/utils/delete-using-pagination.util.ts new file mode 100644 index 000000000..86564bd43 --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/services/thread-cleaner/utils/delete-using-pagination.util.ts @@ -0,0 +1,38 @@ +import { EntityManager } from 'typeorm'; + +export const deleteUsingPagination = async ( + workspaceId: string, + batchSize: number, + getterPaginated: ( + limit: number, + offset: number, + workspaceId: string, + transactionManager?: EntityManager, + ) => Promise, + deleter: ( + ids: string[], + workspaceId: string, + transactionManager?: EntityManager, + ) => Promise, + transactionManager?: EntityManager, +) => { + let offset = 0; + let hasMoreData = true; + + while (hasMoreData) { + const idsToDelete = await getterPaginated( + batchSize, + offset, + workspaceId, + transactionManager, + ); + + if (idsToDelete.length > 0) { + await deleter(idsToDelete, workspaceId, transactionManager); + } else { + hasMoreData = false; + } + + offset += batchSize; + } +};