diff --git a/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job.ts b/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job.ts index 00f146f7e..7d498ecdb 100644 --- a/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job.ts +++ b/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job.ts @@ -1,4 +1,4 @@ -import { Logger } from '@nestjs/common'; +import { Logger, Scope } from '@nestjs/common'; import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; @@ -17,7 +17,10 @@ export type BlocklistItemDeleteMessagesJobData = { blocklistItemId: string; }; -@Processor(MessageQueue.messagingQueue) +@Processor({ + queueName: MessageQueue.messagingQueue, + scope: Scope.REQUEST, +}) export class BlocklistItemDeleteMessagesJob { private readonly logger = new Logger(BlocklistItemDeleteMessagesJob.name); diff --git a/packages/twenty-server/src/modules/messaging/message-cleaner/jobs/messaging-connected-account-deletion-cleanup.job.ts b/packages/twenty-server/src/modules/messaging/message-cleaner/jobs/messaging-connected-account-deletion-cleanup.job.ts index fbeb4f88a..c9cc4e9e9 100644 --- a/packages/twenty-server/src/modules/messaging/message-cleaner/jobs/messaging-connected-account-deletion-cleanup.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-cleaner/jobs/messaging-connected-account-deletion-cleanup.job.ts @@ -1,4 +1,4 @@ -import { Logger } from '@nestjs/common'; +import { Logger, Scope } from '@nestjs/common'; import { MessagingMessageCleanerService } from 'src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service'; import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; @@ -10,7 +10,10 @@ export type MessagingConnectedAccountDeletionCleanupJobData = { connectedAccountId: string; }; -@Processor(MessageQueue.messagingQueue) +@Processor({ + queueName: MessageQueue.messagingQueue, + scope: Scope.REQUEST, +}) export class MessagingConnectedAccountDeletionCleanupJob { private readonly logger = new Logger( MessagingConnectedAccountDeletionCleanupJob.name, diff --git a/packages/twenty-server/src/modules/messaging/message-cleaner/messaging-message-cleaner.module.ts b/packages/twenty-server/src/modules/messaging/message-cleaner/messaging-message-cleaner.module.ts index ef28ce0ef..ae4221a85 100644 --- a/packages/twenty-server/src/modules/messaging/message-cleaner/messaging-message-cleaner.module.ts +++ b/packages/twenty-server/src/modules/messaging/message-cleaner/messaging-message-cleaner.module.ts @@ -1,6 +1,6 @@ import { Module } from '@nestjs/common'; -import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module'; +import { TwentyORMModule } from 'src/engine/twenty-orm/twenty-orm.module'; import { MessageThreadWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-thread.workspace-entity'; import { MessageWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message.workspace-entity'; import { MessagingConnectedAccountDeletionCleanupJob } from 'src/modules/messaging/message-cleaner/jobs/messaging-connected-account-deletion-cleanup.job'; @@ -9,7 +9,7 @@ import { MessagingMessageCleanerService } from 'src/modules/messaging/message-cl @Module({ imports: [ - ObjectMetadataRepositoryModule.forFeature([ + TwentyORMModule.forFeature([ MessageWorkspaceEntity, MessageThreadWorkspaceEntity, ]), diff --git a/packages/twenty-server/src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service.ts b/packages/twenty-server/src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service.ts index 19a9d6b2d..2642baa93 100644 --- a/packages/twenty-server/src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service.ts @@ -1,8 +1,9 @@ import { Injectable } from '@nestjs/common'; -import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; -import { MessageThreadRepository } from 'src/modules/messaging/common/repositories/message-thread.repository'; -import { MessageRepository } from 'src/modules/messaging/common/repositories/message.repository'; +import { EntityManager, IsNull } from 'typeorm'; + +import { InjectWorkspaceRepository } from 'src/engine/twenty-orm/decorators/inject-workspace-repository.decorator'; +import { WorkspaceRepository } from 'src/engine/twenty-orm/repository/workspace.repository'; import { MessageThreadWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-thread.workspace-entity'; import { MessageWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message.workspace-entity'; import { deleteUsingPagination } from 'src/modules/messaging/message-cleaner/utils/delete-using-pagination.util'; @@ -10,31 +11,73 @@ import { deleteUsingPagination } from 'src/modules/messaging/message-cleaner/uti @Injectable() export class MessagingMessageCleanerService { constructor( - @InjectObjectMetadataRepository(MessageWorkspaceEntity) - private readonly messageRepository: MessageRepository, - @InjectObjectMetadataRepository(MessageThreadWorkspaceEntity) - private readonly messageThreadRepository: MessageThreadRepository, + @InjectWorkspaceRepository(MessageWorkspaceEntity) + private readonly messageRepository: WorkspaceRepository, + @InjectWorkspaceRepository(MessageThreadWorkspaceEntity) + private readonly messageThreadRepository: WorkspaceRepository, ) {} public async cleanWorkspaceThreads(workspaceId: string) { await deleteUsingPagination( workspaceId, 500, - this.messageRepository.getNonAssociatedMessageIdsPaginated.bind( - this.messageRepository, - ), - this.messageRepository.deleteByIds.bind(this.messageRepository), + async ( + limit: number, + offset: number, + workspaceId: string, + transactionManager?: EntityManager, + ) => { + const nonAssociatedMessages = await this.messageRepository.find( + { + where: { + messageChannelMessageAssociations: IsNull(), + }, + take: limit, + skip: offset, + }, + transactionManager, + ); + + return nonAssociatedMessages.map(({ id }) => id); + }, + async ( + ids: string[], + workspaceId: string, + transactionManager?: EntityManager, + ) => { + await this.messageRepository.delete(ids, transactionManager); + }, ); await deleteUsingPagination( workspaceId, 500, - this.messageThreadRepository.getOrphanThreadIdsPaginated.bind( - this.messageThreadRepository, - ), - this.messageThreadRepository.deleteByIds.bind( - this.messageThreadRepository, - ), + async ( + limit: number, + offset: number, + workspaceId: string, + transactionManager?: EntityManager, + ) => { + const orphanThreads = await this.messageThreadRepository.find( + { + where: { + messages: IsNull(), + }, + take: limit, + skip: offset, + }, + transactionManager, + ); + + return orphanThreads.map(({ id }) => id); + }, + async ( + ids: string[], + workspaceId: string, + transactionManager?: EntityManager, + ) => { + await this.messageThreadRepository.delete(ids, transactionManager); + }, ); } }