From 018b8220dc315eecb5097576a1563e03c2d7dd12 Mon Sep 17 00:00:00 2001 From: bosiraphael <71827178+bosiraphael@users.noreply.github.com> Date: Tue, 6 Aug 2024 18:19:59 +0200 Subject: [PATCH] Remove message thread id from mcma and update scripts (#6500) - Remove `messageThreadId` from `messageChannelMessageAssociation` - Update thread merging - Update all queries which were dependent on this field - Update some raw queries by using `twentyORM` instead --------- Co-authored-by: Weiko --- .../message-channel-message-associations.ts | 5 - .../workspace-migration-runner.service.ts | 15 +- .../message-find-many.pre-query.hook.ts | 28 +- .../message-find-one.pre-query-hook.ts | 26 +- ...-channel-message-association.repository.ts | 192 ------------ ...el-message-association.workspace-entity.ts | 33 +- .../message-thread.workspace-entity.ts | 16 - .../message.workspace-entity.ts | 2 +- .../messaging-import-manager.module.ts | 2 - ...ssaging-full-message-list-fetch.service.ts | 25 +- .../messaging-message-thread.service.ts | 94 ------ .../services/messaging-message.service.ts | 287 ++++++------------ ...ging-partial-message-list-fetch.service.ts | 22 +- ...es-and-enqueue-contact-creation.service.ts | 1 - 14 files changed, 177 insertions(+), 571 deletions(-) delete mode 100644 packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-message-thread.service.ts diff --git a/packages/twenty-server/src/database/typeorm-seeds/workspace/message-channel-message-associations.ts b/packages/twenty-server/src/database/typeorm-seeds/workspace/message-channel-message-associations.ts index 2bc170306..3e857301a 100644 --- a/packages/twenty-server/src/database/typeorm-seeds/workspace/message-channel-message-associations.ts +++ b/packages/twenty-server/src/database/typeorm-seeds/workspace/message-channel-message-associations.ts @@ -2,7 +2,6 @@ import { EntityManager } from 'typeorm'; import { DEV_SEED_MESSAGE_CHANNEL_IDS } from 'src/database/typeorm-seeds/workspace/message-channels'; import { DEV_SEED_MESSAGE_IDS } from 'src/database/typeorm-seeds/workspace/messages'; -import { DEV_SEED_MESSAGE_THREAD_IDS } from 'src/database/typeorm-seeds/workspace/message-threads'; const tableName = 'messageChannelMessageAssociation'; @@ -25,7 +24,6 @@ export const seedMessageChannelMessageAssociation = async ( 'updatedAt', 'deletedAt', 'messageThreadExternalId', - 'messageThreadId', 'messageExternalId', 'messageId', 'messageChannelId', @@ -38,7 +36,6 @@ export const seedMessageChannelMessageAssociation = async ( updatedAt: new Date(), deletedAt: null, messageThreadExternalId: null, - messageThreadId: DEV_SEED_MESSAGE_THREAD_IDS.MESSAGE_THREAD_1, messageExternalId: null, messageId: DEV_SEED_MESSAGE_IDS.MESSAGE_1, messageChannelId: DEV_SEED_MESSAGE_CHANNEL_IDS.TIM, @@ -49,7 +46,6 @@ export const seedMessageChannelMessageAssociation = async ( updatedAt: new Date(), deletedAt: null, messageThreadExternalId: null, - messageThreadId: DEV_SEED_MESSAGE_THREAD_IDS.MESSAGE_THREAD_2, messageExternalId: null, messageId: DEV_SEED_MESSAGE_IDS.MESSAGE_2, messageChannelId: DEV_SEED_MESSAGE_CHANNEL_IDS.TIM, @@ -60,7 +56,6 @@ export const seedMessageChannelMessageAssociation = async ( updatedAt: new Date(), deletedAt: null, messageThreadExternalId: null, - messageThreadId: DEV_SEED_MESSAGE_THREAD_IDS.MESSAGE_THREAD_1, messageExternalId: null, messageId: DEV_SEED_MESSAGE_IDS.MESSAGE_3, messageChannelId: DEV_SEED_MESSAGE_CHANNEL_IDS.TIM, diff --git a/packages/twenty-server/src/engine/workspace-manager/workspace-migration-runner/workspace-migration-runner.service.ts b/packages/twenty-server/src/engine/workspace-manager/workspace-migration-runner/workspace-migration-runner.service.ts index c08afa92e..caffa38f6 100644 --- a/packages/twenty-server/src/engine/workspace-manager/workspace-migration-runner/workspace-migration-runner.service.ts +++ b/packages/twenty-server/src/engine/workspace-manager/workspace-migration-runner/workspace-migration-runner.service.ts @@ -205,7 +205,20 @@ export class WorkspaceMigrationRunnerService { ); break; case WorkspaceMigrationIndexActionType.DROP: - await queryRunner.dropIndex(`${schemaName}.${tableName}`, index.name); + try { + await queryRunner.dropIndex( + `${schemaName}.${tableName}`, + index.name, + ); + } catch (error) { + // Ignore error if index does not exist + if ( + error.message === + `Supplied index ${index.name} was not found in table ${schemaName}.${tableName}` + ) { + continue; + } + } break; default: throw new Error(`Migration index action not supported`); diff --git a/packages/twenty-server/src/modules/messaging/common/query-hooks/message/message-find-many.pre-query.hook.ts b/packages/twenty-server/src/modules/messaging/common/query-hooks/message/message-find-many.pre-query.hook.ts index 9feb04611..14fa417cf 100644 --- a/packages/twenty-server/src/modules/messaging/common/query-hooks/message/message-find-many.pre-query.hook.ts +++ b/packages/twenty-server/src/modules/messaging/common/query-hooks/message/message-find-many.pre-query.hook.ts @@ -4,20 +4,16 @@ import { WorkspaceQueryHookInstance } from 'src/engine/api/graphql/workspace-que import { FindManyResolverArgs } from 'src/engine/api/graphql/workspace-resolver-builder/interfaces/workspace-resolvers-builder.interface'; import { WorkspaceQueryHook } from 'src/engine/api/graphql/workspace-query-runner/workspace-query-hook/decorators/workspace-query-hook.decorator'; -import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; -import { CanAccessMessageThreadService } from 'src/modules/messaging/common/query-hooks/message/can-access-message-thread.service'; -import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/common/repositories/message-channel-message-association.repository'; -import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; import { AuthContext } from 'src/engine/core-modules/auth/types/auth-context.type'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { CanAccessMessageThreadService } from 'src/modules/messaging/common/query-hooks/message/can-access-message-thread.service'; +import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; @WorkspaceQueryHook(`message.findMany`) export class MessageFindManyPreQueryHook implements WorkspaceQueryHookInstance { constructor( - @InjectObjectMetadataRepository( - MessageChannelMessageAssociationWorkspaceEntity, - ) - private readonly messageChannelMessageAssociationService: MessageChannelMessageAssociationRepository, private readonly canAccessMessageThreadService: CanAccessMessageThreadService, + private readonly twentyORMManager: TwentyORMManager, ) {} async execute( @@ -33,12 +29,20 @@ export class MessageFindManyPreQueryHook implements WorkspaceQueryHookInstance { throw new BadRequestException('User id is required'); } - const messageChannelMessageAssociations = - await this.messageChannelMessageAssociationService.getByMessageThreadId( - payload?.filter?.messageThreadId?.eq, - authContext.workspace.id, + const messageChannelMessageAssociationRepository = + await this.twentyORMManager.getRepository( + 'messageChannelMessageAssociation', ); + const messageChannelMessageAssociations = + await messageChannelMessageAssociationRepository.find({ + where: { + message: { + messageThreadId: payload.filter.messageThreadId.eq, + }, + }, + }); + if (messageChannelMessageAssociations.length === 0) { throw new NotFoundException(); } diff --git a/packages/twenty-server/src/modules/messaging/common/query-hooks/message/message-find-one.pre-query-hook.ts b/packages/twenty-server/src/modules/messaging/common/query-hooks/message/message-find-one.pre-query-hook.ts index 35d7fcecc..f06f4b09f 100644 --- a/packages/twenty-server/src/modules/messaging/common/query-hooks/message/message-find-one.pre-query-hook.ts +++ b/packages/twenty-server/src/modules/messaging/common/query-hooks/message/message-find-one.pre-query-hook.ts @@ -5,20 +5,16 @@ import { WorkspaceQueryHookInstance } from 'src/engine/api/graphql/workspace-que import { FindOneResolverArgs } from 'src/engine/api/graphql/workspace-resolver-builder/interfaces/workspace-resolvers-builder.interface'; import { WorkspaceQueryHook } from 'src/engine/api/graphql/workspace-query-runner/workspace-query-hook/decorators/workspace-query-hook.decorator'; -import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; -import { CanAccessMessageThreadService } from 'src/modules/messaging/common/query-hooks/message/can-access-message-thread.service'; -import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/common/repositories/message-channel-message-association.repository'; -import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; import { AuthContext } from 'src/engine/core-modules/auth/types/auth-context.type'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { CanAccessMessageThreadService } from 'src/modules/messaging/common/query-hooks/message/can-access-message-thread.service'; +import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; @WorkspaceQueryHook(`message.findOne`) export class MessageFindOnePreQueryHook implements WorkspaceQueryHookInstance { constructor( - @InjectObjectMetadataRepository( - MessageChannelMessageAssociationWorkspaceEntity, - ) - private readonly messageChannelMessageAssociationService: MessageChannelMessageAssociationRepository, private readonly canAccessMessageThreadService: CanAccessMessageThreadService, + private readonly twentyORMManager: TwentyORMManager, ) {} async execute( @@ -30,12 +26,18 @@ export class MessageFindOnePreQueryHook implements WorkspaceQueryHookInstance { throw new NotFoundException('User id is required'); } - const messageChannelMessageAssociations = - await this.messageChannelMessageAssociationService.getByMessageIds( - [payload?.filter?.id?.eq], - authContext.workspace.id, + const messageChannelMessageAssociationRepository = + await this.twentyORMManager.getRepository( + 'messageChannelMessageAssociation', ); + const messageChannelMessageAssociations = + await messageChannelMessageAssociationRepository.find({ + where: { + messageId: payload?.filter?.id?.eq, + }, + }); + if (messageChannelMessageAssociations.length === 0) { throw new NotFoundException(); } diff --git a/packages/twenty-server/src/modules/messaging/common/repositories/message-channel-message-association.repository.ts b/packages/twenty-server/src/modules/messaging/common/repositories/message-channel-message-association.repository.ts index e9b0ffa0d..0013e036f 100644 --- a/packages/twenty-server/src/modules/messaging/common/repositories/message-channel-message-association.repository.ts +++ b/packages/twenty-server/src/modules/messaging/common/repositories/message-channel-message-association.repository.ts @@ -3,7 +3,6 @@ import { Injectable } from '@nestjs/common'; import { EntityManager } from 'typeorm'; import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; -import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; @Injectable() export class MessageChannelMessageAssociationRepository { @@ -11,61 +10,6 @@ export class MessageChannelMessageAssociationRepository { private readonly workspaceDataSourceService: WorkspaceDataSourceService, ) {} - public async getByMessageExternalIdsAndMessageChannelId( - messageExternalIds: string[], - messageChannelId: string, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - return await this.workspaceDataSourceService.executeRawQuery( - `SELECT * FROM ${dataSourceSchema}."messageChannelMessageAssociation" - WHERE "messageExternalId" = ANY($1) AND "messageChannelId" = $2`, - [messageExternalIds, messageChannelId], - workspaceId, - transactionManager, - ); - } - - public async countByMessageExternalIdsAndMessageChannelId( - messageExternalIds: string[], - messageChannelId: string, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - const result = await this.workspaceDataSourceService.executeRawQuery( - `SELECT COUNT(*) FROM ${dataSourceSchema}."messageChannelMessageAssociation" - WHERE "messageExternalId" = ANY($1) AND "messageChannelId" = $2`, - [messageExternalIds, messageChannelId], - workspaceId, - transactionManager, - ); - - return result[0]?.count; - } - - public async deleteByMessageExternalIdsAndMessageChannelId( - messageExternalIds: string[], - messageChannelId: string, - workspaceId: string, - transactionManager?: EntityManager, - ) { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `DELETE FROM ${dataSourceSchema}."messageChannelMessageAssociation" WHERE "messageExternalId" = ANY($1) AND "messageChannelId" = $2`, - [messageExternalIds, messageChannelId], - workspaceId, - transactionManager, - ); - } - public async deleteByMessageParticipantHandleAndMessageChannelIdAndRoles( messageParticipantHandle: string, messageChannelId: string, @@ -125,43 +69,6 @@ export class MessageChannelMessageAssociationRepository { ); } - public async getByMessageChannelIds( - messageChannelIds: string[], - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - return await this.workspaceDataSourceService.executeRawQuery( - `SELECT * FROM ${dataSourceSchema}."messageChannelMessageAssociation" - WHERE "messageChannelId" = ANY($1)`, - [messageChannelIds], - workspaceId, - transactionManager, - ); - } - - public async deleteByMessageChannelIds( - messageChannelIds: string[], - workspaceId: string, - transactionManager?: EntityManager, - ) { - if (messageChannelIds.length === 0) { - return; - } - - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `DELETE FROM ${dataSourceSchema}."messageChannelMessageAssociation" WHERE "messageChannelId" = ANY($1)`, - [messageChannelIds], - workspaceId, - transactionManager, - ); - } - public async deleteByIds( ids: string[], workspaceId: string, @@ -177,103 +84,4 @@ export class MessageChannelMessageAssociationRepository { transactionManager, ); } - - public async getByMessageThreadExternalIds( - messageThreadExternalIds: string[], - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - return await this.workspaceDataSourceService.executeRawQuery( - `SELECT * FROM ${dataSourceSchema}."messageChannelMessageAssociation" - WHERE "messageThreadExternalId" = ANY($1)`, - [messageThreadExternalIds], - workspaceId, - transactionManager, - ); - } - - public async getFirstByMessageThreadExternalId( - messageThreadExternalId: string, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const existingMessageChannelMessageAssociations = - await this.getByMessageThreadExternalIds( - [messageThreadExternalId], - workspaceId, - transactionManager, - ); - - if ( - !existingMessageChannelMessageAssociations || - existingMessageChannelMessageAssociations.length === 0 - ) { - return null; - } - - return existingMessageChannelMessageAssociations[0]; - } - - public async getByMessageIds( - messageIds: string[], - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - return await this.workspaceDataSourceService.executeRawQuery( - `SELECT * FROM ${dataSourceSchema}."messageChannelMessageAssociation" - WHERE "messageId" = ANY($1)`, - [messageIds], - workspaceId, - transactionManager, - ); - } - - public async getByMessageThreadId( - messageThreadId: string, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - return await this.workspaceDataSourceService.executeRawQuery( - `SELECT * FROM ${dataSourceSchema}."messageChannelMessageAssociation" - WHERE "messageThreadId" = $1`, - [messageThreadId], - workspaceId, - transactionManager, - ); - } - - public async insert( - messageChannelId: string, - messageId: string, - messageExternalId: string, - messageThreadId: string, - messageThreadExternalId: string, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise { - const dataSourceSchema = - this.workspaceDataSourceService.getSchemaName(workspaceId); - - await this.workspaceDataSourceService.executeRawQuery( - `INSERT INTO ${dataSourceSchema}."messageChannelMessageAssociation" ("messageChannelId", "messageId", "messageExternalId", "messageThreadId", "messageThreadExternalId") VALUES ($1, $2, $3, $4, $5)`, - [ - messageChannelId, - messageId, - messageExternalId, - messageThreadId, - messageThreadExternalId, - ], - workspaceId, - transactionManager, - ); - } } diff --git a/packages/twenty-server/src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity.ts b/packages/twenty-server/src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity.ts index 0e0171766..6c7baf5e8 100644 --- a/packages/twenty-server/src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity.ts +++ b/packages/twenty-server/src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity.ts @@ -1,20 +1,19 @@ import { Relation } from 'src/engine/workspace-manager/workspace-sync-metadata/interfaces/relation.interface'; import { FieldMetadataType } from 'src/engine/metadata-modules/field-metadata/field-metadata.entity'; -import { MESSAGE_CHANNEL_MESSAGE_ASSOCIATION_STANDARD_FIELD_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids'; -import { STANDARD_OBJECT_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids'; +import { RelationMetadataType } from 'src/engine/metadata-modules/relation-metadata/relation-metadata.entity'; import { BaseWorkspaceEntity } from 'src/engine/twenty-orm/base.workspace-entity'; import { WorkspaceEntity } from 'src/engine/twenty-orm/decorators/workspace-entity.decorator'; -import { WorkspaceIsNotAuditLogged } from 'src/engine/twenty-orm/decorators/workspace-is-not-audit-logged.decorator'; -import { WorkspaceIsSystem } from 'src/engine/twenty-orm/decorators/workspace-is-system.decorator'; import { WorkspaceField } from 'src/engine/twenty-orm/decorators/workspace-field.decorator'; +import { WorkspaceIsNotAuditLogged } from 'src/engine/twenty-orm/decorators/workspace-is-not-audit-logged.decorator'; import { WorkspaceIsNullable } from 'src/engine/twenty-orm/decorators/workspace-is-nullable.decorator'; -import { WorkspaceRelation } from 'src/engine/twenty-orm/decorators/workspace-relation.decorator'; -import { RelationMetadataType } from 'src/engine/metadata-modules/relation-metadata/relation-metadata.entity'; -import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; -import { MessageThreadWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-thread.workspace-entity'; -import { MessageWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message.workspace-entity'; +import { WorkspaceIsSystem } from 'src/engine/twenty-orm/decorators/workspace-is-system.decorator'; import { WorkspaceJoinColumn } from 'src/engine/twenty-orm/decorators/workspace-join-column.decorator'; +import { WorkspaceRelation } from 'src/engine/twenty-orm/decorators/workspace-relation.decorator'; +import { MESSAGE_CHANNEL_MESSAGE_ASSOCIATION_STANDARD_FIELD_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids'; +import { STANDARD_OBJECT_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids'; +import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { MessageWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message.workspace-entity'; @WorkspaceEntity({ standardId: STANDARD_OBJECT_IDS.messageChannelMessageAssociation, @@ -79,20 +78,4 @@ export class MessageChannelMessageAssociationWorkspaceEntity extends BaseWorkspa @WorkspaceJoinColumn('message') messageId: string; - - @WorkspaceRelation({ - standardId: - MESSAGE_CHANNEL_MESSAGE_ASSOCIATION_STANDARD_FIELD_IDS.messageThread, - type: RelationMetadataType.MANY_TO_ONE, - label: 'Message Thread Id', - description: 'Message Thread Id', - icon: 'IconHash', - inverseSideTarget: () => MessageThreadWorkspaceEntity, - inverseSideFieldKey: 'messageChannelMessageAssociations', - }) - @WorkspaceIsNullable() - messageThread: Relation | null; - - @WorkspaceJoinColumn('messageThread') - messageThreadId: string; } diff --git a/packages/twenty-server/src/modules/messaging/common/standard-objects/message-thread.workspace-entity.ts b/packages/twenty-server/src/modules/messaging/common/standard-objects/message-thread.workspace-entity.ts index 46b9f5c02..255cd3dda 100644 --- a/packages/twenty-server/src/modules/messaging/common/standard-objects/message-thread.workspace-entity.ts +++ b/packages/twenty-server/src/modules/messaging/common/standard-objects/message-thread.workspace-entity.ts @@ -14,7 +14,6 @@ import { WorkspaceIsSystem } from 'src/engine/twenty-orm/decorators/workspace-is import { WorkspaceRelation } from 'src/engine/twenty-orm/decorators/workspace-relation.decorator'; import { MESSAGE_THREAD_STANDARD_FIELD_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids'; import { STANDARD_OBJECT_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids'; -import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; import { MessageThreadSubscriberWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-thread-subscriber.workspace-entity'; import { MessageWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message.workspace-entity'; @@ -54,19 +53,4 @@ export class MessageThreadWorkspaceEntity extends BaseWorkspaceEntity { featureFlag: FeatureFlagKey.IsMessageThreadSubscriberEnabled, }) subscribers: Relation; - - @WorkspaceRelation({ - standardId: - MESSAGE_THREAD_STANDARD_FIELD_IDS.messageChannelMessageAssociations, - type: RelationMetadataType.ONE_TO_MANY, - label: 'Message Channel Association', - description: 'Messages from the channel', - icon: 'IconMessage', - inverseSideTarget: () => MessageChannelMessageAssociationWorkspaceEntity, - onDelete: RelationOnDeleteAction.RESTRICT, - }) - @WorkspaceIsNullable() - messageChannelMessageAssociations: Relation< - MessageChannelMessageAssociationWorkspaceEntity[] - >; } diff --git a/packages/twenty-server/src/modules/messaging/common/standard-objects/message.workspace-entity.ts b/packages/twenty-server/src/modules/messaging/common/standard-objects/message.workspace-entity.ts index 9db744163..535639adc 100644 --- a/packages/twenty-server/src/modules/messaging/common/standard-objects/message.workspace-entity.ts +++ b/packages/twenty-server/src/modules/messaging/common/standard-objects/message.workspace-entity.ts @@ -80,7 +80,7 @@ export class MessageWorkspaceEntity extends BaseWorkspaceEntity { icon: 'IconCalendar', }) @WorkspaceIsNullable() - receivedAt: string | null; + receivedAt: Date | null; @WorkspaceRelation({ standardId: MESSAGE_STANDARD_FIELD_IDS.messageThread, diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts index 541cf2588..c908b202d 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts @@ -27,7 +27,6 @@ import { MessagingOngoingStaleJob } from 'src/modules/messaging/message-import-m import { MessagingMessageImportManagerMessageChannelListener } from 'src/modules/messaging/message-import-manager/listeners/messaging-import-manager-message-channel.listener'; import { MessagingErrorHandlingService } from 'src/modules/messaging/message-import-manager/services/messaging-error-handling.service'; import { MessagingFullMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service'; -import { MessagingMessageThreadService } from 'src/modules/messaging/message-import-manager/services/messaging-message-thread.service'; import { MessagingMessageService } from 'src/modules/messaging/message-import-manager/services/messaging-message.service'; import { MessagingMessagesImportService } from 'src/modules/messaging/message-import-manager/services/messaging-messages-import.service'; import { MessagingPartialMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service'; @@ -65,7 +64,6 @@ import { MessagingMonitoringModule } from 'src/modules/messaging/monitoring/mess MessagingMessageImportManagerMessageChannelListener, MessagingCleanCacheJob, MessagingMessageService, - MessagingMessageThreadService, MessagingErrorHandlingService, MessagingPartialMessageListFetchService, MessagingFullMessageListFetchService, diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts index 5e790d9af..bda33157b 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts @@ -2,14 +2,14 @@ import { Injectable, Logger } from '@nestjs/common'; import { GaxiosResponse } from 'gaxios'; import { gmail_v1 } from 'googleapis'; -import { EntityManager } from 'typeorm'; +import { Any, EntityManager } from 'typeorm'; import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service'; import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator'; import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; -import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/common/repositories/message-channel-message-association.repository'; import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; @@ -35,12 +35,9 @@ export class MessagingFullMessageListFetchService { private readonly messageChannelRepository: MessageChannelRepository, @InjectCacheStorage(CacheStorageNamespace.Messaging) private readonly cacheStorage: CacheStorageService, - @InjectObjectMetadataRepository( - MessageChannelMessageAssociationWorkspaceEntity, - ) - private readonly messageChannelMessageAssociationRepository: MessageChannelMessageAssociationRepository, private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService, private readonly gmailErrorHandlingService: MessagingErrorHandlingService, + private readonly twentyORMManager: TwentyORMManager, ) {} public async processMessageListFetch( @@ -129,11 +126,19 @@ export class MessagingFullMessageListFetchService { firstMessageExternalId = messageExternalIds[0]; } + const messageChannelMessageAssociationRepository = + await this.twentyORMManager.getRepository( + 'messageChannelMessageAssociation', + ); + const existingMessageChannelMessageAssociations = - await this.messageChannelMessageAssociationRepository.getByMessageExternalIdsAndMessageChannelId( - messageExternalIds, - messageChannelId, - workspaceId, + await messageChannelMessageAssociationRepository.find( + { + where: { + messageChannelId, + messageExternalId: Any(messageExternalIds), + }, + }, transactionManager, ); diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-message-thread.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-message-thread.service.ts deleted file mode 100644 index c59c456a2..000000000 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-message-thread.service.ts +++ /dev/null @@ -1,94 +0,0 @@ -import { Injectable } from '@nestjs/common'; - -import { EntityManager } from 'typeorm'; -import { v4 } from 'uuid'; - -import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; -import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; -import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/common/repositories/message-channel-message-association.repository'; -import { MessageThreadRepository } from 'src/modules/messaging/common/repositories/message-thread.repository'; -import { MessageRepository } from 'src/modules/messaging/common/repositories/message.repository'; -import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; -import { MessageThreadSubscriberWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-thread-subscriber.workspace-entity'; -import { MessageThreadWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-thread.workspace-entity'; -import { MessageWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message.workspace-entity'; - -@Injectable() -export class MessagingMessageThreadService { - constructor( - private readonly twentyORMManager: TwentyORMManager, - @InjectObjectMetadataRepository( - MessageChannelMessageAssociationWorkspaceEntity, - ) - private readonly messageChannelMessageAssociationRepository: MessageChannelMessageAssociationRepository, - @InjectObjectMetadataRepository(MessageWorkspaceEntity) - private readonly messageRepository: MessageRepository, - @InjectObjectMetadataRepository(MessageThreadWorkspaceEntity) - private readonly messageThreadRepository: MessageThreadRepository, - ) {} - - public async saveMessageThreadMember( - messageThreadId: string, - workspaceMemberId: string, - ) { - const id = v4(); - - const messageThreadSubscriberRepository = - await this.twentyORMManager.getRepository( - 'messageThreadSubscriber', - ); - - await messageThreadSubscriberRepository.insert({ - id, - messageThreadId, - workspaceMemberId, - }); - } - - public async saveMessageThreadOrReturnExistingMessageThread( - headerMessageId: string, - messageThreadExternalId: string, - workspaceId: string, - manager: EntityManager, - ) { - // Check if message thread already exists via threadExternalId - const existingMessageChannelMessageAssociationByMessageThreadExternalId = - await this.messageChannelMessageAssociationRepository.getFirstByMessageThreadExternalId( - messageThreadExternalId, - workspaceId, - manager, - ); - - const existingMessageThread = - existingMessageChannelMessageAssociationByMessageThreadExternalId?.messageThreadId; - - if (existingMessageThread) { - return Promise.resolve(existingMessageThread); - } - - // Check if message thread already exists via existing message headerMessageId - const existingMessageWithSameHeaderMessageId = - await this.messageRepository.getFirstOrNullByHeaderMessageId( - headerMessageId, - workspaceId, - manager, - ); - - if (existingMessageWithSameHeaderMessageId) { - return Promise.resolve( - existingMessageWithSameHeaderMessageId.messageThreadId, - ); - } - - // If message thread does not exist, create new message thread - const newMessageThreadId = v4(); - - await this.messageThreadRepository.insert( - newMessageThreadId, - workspaceId, - manager, - ); - - return Promise.resolve(newMessageThreadId); - } -} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-message.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-message.service.ts index 378caac05..4a8468878 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-message.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-message.service.ts @@ -3,231 +3,138 @@ import { Injectable } from '@nestjs/common'; import { EntityManager } from 'typeorm'; import { v4 } from 'uuid'; -import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; -import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; -import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/common/repositories/message-channel-message-association.repository'; -import { MessageThreadRepository } from 'src/modules/messaging/common/repositories/message-thread.repository'; -import { MessageRepository } from 'src/modules/messaging/common/repositories/message.repository'; import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; import { MessageThreadWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-thread.workspace-entity'; import { MessageWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message.workspace-entity'; import { GmailMessage } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message'; -import { MessagingMessageThreadService } from 'src/modules/messaging/message-import-manager/services/messaging-message-thread.service'; @Injectable() export class MessagingMessageService { - constructor( - private readonly workspaceDataSourceService: WorkspaceDataSourceService, - @InjectObjectMetadataRepository( - MessageChannelMessageAssociationWorkspaceEntity, - ) - private readonly messageChannelMessageAssociationRepository: MessageChannelMessageAssociationRepository, - @InjectObjectMetadataRepository(MessageWorkspaceEntity) - private readonly messageRepository: MessageRepository, - @InjectObjectMetadataRepository(MessageThreadWorkspaceEntity) - private readonly messageThreadRepository: MessageThreadRepository, - private readonly messageThreadService: MessagingMessageThreadService, - ) {} + constructor(private readonly twentyORMManager: TwentyORMManager) {} public async saveMessagesWithinTransaction( messages: GmailMessage[], - connectedAccount: ConnectedAccountWorkspaceEntity, - gmailMessageChannelId: string, - workspaceId: string, + connectedAccount: Pick< + ConnectedAccountWorkspaceEntity, + 'handle' | 'handleAliases' + >, + messageChannelId: string, transactionManager: EntityManager, ): Promise> { + const messageChannelMessageAssociationRepository = + await this.twentyORMManager.getRepository( + 'messageChannelMessageAssociation', + ); + + const messageRepository = + await this.twentyORMManager.getRepository( + 'message', + ); + + const messageThreadRepository = + await this.twentyORMManager.getRepository( + 'messageThread', + ); + const messageExternalIdsAndIdsMap = new Map(); for (const message of messages) { - const existingMessageChannelMessageAssociationsCount = - await this.messageChannelMessageAssociationRepository.countByMessageExternalIdsAndMessageChannelId( - [message.externalId], - gmailMessageChannelId, - workspaceId, + const existingMessageChannelMessageAssociation = + await messageChannelMessageAssociationRepository.findOne( + { + where: { + messageExternalId: message.externalId, + messageChannelId: messageChannelId, + }, + }, transactionManager, ); - if (existingMessageChannelMessageAssociationsCount > 0) { + if (existingMessageChannelMessageAssociation) { continue; } - // TODO: This does not handle all thread merging use cases and might create orphan threads. - const savedOrExistingMessageThreadId = - await this.messageThreadService.saveMessageThreadOrReturnExistingMessageThread( - message.headerMessageId, - message.messageThreadExternalId, - workspaceId, + const existingMessage = await messageRepository.findOne({ + where: { + headerMessageId: message.headerMessageId, + }, + }); + + if (existingMessage) { + await messageChannelMessageAssociationRepository.insert( + { + messageChannelId, + messageId: existingMessage.id, + messageExternalId: message.externalId, + messageThreadExternalId: message.messageThreadExternalId, + }, transactionManager, ); - if (!savedOrExistingMessageThreadId) { - throw new Error( - `No message thread found for message ${message.headerMessageId} in workspace ${workspaceId} in saveMessages`, + continue; + } + + const existingThread = await messageThreadRepository.findOne( + { + where: { + messages: { + messageChannelMessageAssociations: { + messageThreadExternalId: message.messageThreadExternalId, + messageChannelId, + }, + }, + }, + }, + transactionManager, + ); + + let newOrExistingMessageThreadId = existingThread?.id; + + if (!existingThread) { + newOrExistingMessageThreadId = v4(); + + await messageThreadRepository.insert( + { id: newOrExistingMessageThreadId }, + transactionManager, ); } - const savedOrExistingMessageId = - await this.saveMessageOrReturnExistingMessage( - message, - savedOrExistingMessageThreadId, - connectedAccount, - workspaceId, - transactionManager, - ); + const newMessageId = v4(); - messageExternalIdsAndIdsMap.set( - message.externalId, - savedOrExistingMessageId, + const messageDirection = + connectedAccount.handle === message.fromHandle || + connectedAccount.handleAliases?.includes(message.fromHandle) + ? 'outgoing' + : 'incoming'; + + await messageRepository.insert( + { + id: newMessageId, + headerMessageId: message.headerMessageId, + subject: message.subject, + receivedAt: new Date(parseInt(message.internalDate)), + direction: messageDirection, + text: message.text, + messageThreadId: newOrExistingMessageThreadId, + }, + transactionManager, ); - await this.messageChannelMessageAssociationRepository.insert( - gmailMessageChannelId, - savedOrExistingMessageId, - message.externalId, - savedOrExistingMessageThreadId, - message.messageThreadExternalId, - workspaceId, + messageExternalIdsAndIdsMap.set(message.externalId, newMessageId); + + await messageChannelMessageAssociationRepository.insert( + { + messageChannelId, + messageId: newMessageId, + messageExternalId: message.externalId, + messageThreadExternalId: message.messageThreadExternalId, + }, transactionManager, ); } return messageExternalIdsAndIdsMap; } - - private async saveMessageOrReturnExistingMessage( - message: GmailMessage, - messageThreadId: string, - connectedAccount: ConnectedAccountWorkspaceEntity, - workspaceId: string, - manager: EntityManager, - ): Promise { - const existingMessage = - await this.messageRepository.getFirstOrNullByHeaderMessageId( - message.headerMessageId, - workspaceId, - ); - const existingMessageId = existingMessage?.id; - - if (existingMessageId) { - return Promise.resolve(existingMessageId); - } - - const newMessageId = v4(); - - const messageDirection = - connectedAccount.handle === message.fromHandle || - connectedAccount.handleAliases?.includes(message.fromHandle) - ? 'outgoing' - : 'incoming'; - - const receivedAt = new Date(parseInt(message.internalDate)); - - await this.messageRepository.insert( - newMessageId, - message.headerMessageId, - message.subject, - receivedAt, - messageDirection, - messageThreadId, - message.text, - workspaceId, - manager, - ); - - return Promise.resolve(newMessageId); - } - - public async deleteMessages( - messagesDeletedMessageExternalIds: string[], - gmailMessageChannelId: string, - workspaceId: string, - ) { - const workspaceDataSource = - await this.workspaceDataSourceService.connectToWorkspaceDataSource( - workspaceId, - ); - - await workspaceDataSource?.transaction(async (manager: EntityManager) => { - const messageChannelMessageAssociationsToDelete = - await this.messageChannelMessageAssociationRepository.getByMessageExternalIdsAndMessageChannelId( - messagesDeletedMessageExternalIds, - gmailMessageChannelId, - workspaceId, - manager, - ); - - const messageChannelMessageAssociationIdsToDeleteIds = - messageChannelMessageAssociationsToDelete.map( - (messageChannelMessageAssociationToDelete) => - messageChannelMessageAssociationToDelete.id, - ); - - await this.messageChannelMessageAssociationRepository.deleteByIds( - messageChannelMessageAssociationIdsToDeleteIds, - workspaceId, - manager, - ); - - const messageIdsFromMessageChannelMessageAssociationsToDelete = - messageChannelMessageAssociationsToDelete.map( - (messageChannelMessageAssociationToDelete) => - messageChannelMessageAssociationToDelete.messageId, - ); - - const messageChannelMessageAssociationByMessageIds = - await this.messageChannelMessageAssociationRepository.getByMessageIds( - messageIdsFromMessageChannelMessageAssociationsToDelete, - workspaceId, - manager, - ); - - const messageIdsFromMessageChannelMessageAssociationByMessageIds = - messageChannelMessageAssociationByMessageIds.map( - (messageChannelMessageAssociation) => - messageChannelMessageAssociation.messageId, - ); - - const messageIdsToDelete = - messageIdsFromMessageChannelMessageAssociationsToDelete.filter( - (messageId) => - !messageIdsFromMessageChannelMessageAssociationByMessageIds.includes( - messageId, - ), - ); - - await this.messageRepository.deleteByIds( - messageIdsToDelete, - workspaceId, - manager, - ); - - const messageThreadIdsFromMessageChannelMessageAssociationsToDelete = - messageChannelMessageAssociationsToDelete.map( - (messageChannelMessageAssociationToDelete) => - messageChannelMessageAssociationToDelete.messageThreadId, - ); - - const messagesByThreadIds = - await this.messageRepository.getByMessageThreadIds( - messageThreadIdsFromMessageChannelMessageAssociationsToDelete, - workspaceId, - manager, - ); - - const threadIdsToDelete = - messageThreadIdsFromMessageChannelMessageAssociationsToDelete.filter( - (threadId) => - !messagesByThreadIds.find( - (message) => message.messageThreadId === threadId, - ), - ); - - await this.messageThreadRepository.deleteByIds( - threadIdsToDelete, - workspaceId, - manager, - ); - }); - } } diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts index 72125b793..a186f7615 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts @@ -1,13 +1,14 @@ import { Injectable, Logger } from '@nestjs/common'; import { gmail_v1 } from 'googleapis'; +import { Any } from 'typeorm'; import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service'; import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator'; import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; -import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/common/repositories/message-channel-message-association.repository'; import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; @@ -29,14 +30,11 @@ export class MessagingPartialMessageListFetchService { private readonly messageChannelRepository: MessageChannelRepository, @InjectCacheStorage(CacheStorageNamespace.Messaging) private readonly cacheStorage: CacheStorageService, - @InjectObjectMetadataRepository( - MessageChannelMessageAssociationWorkspaceEntity, - ) - private readonly messageChannelMessageAssociationRepository: MessageChannelMessageAssociationRepository, private readonly gmailErrorHandlingService: MessagingErrorHandlingService, private readonly gmailGetHistoryService: MessagingGmailHistoryService, private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService, private readonly gmailFetchMessageIdsToExcludeService: MessagingGmailFetchMessageIdsToExcludeService, + private readonly twentyORMManager: TwentyORMManager, ) {} public async processMessageListFetch( @@ -135,11 +133,15 @@ export class MessagingPartialMessageListFetchService { `Added ${messagesAddedFiltered.length} messages to import for workspace ${workspaceId} and account ${connectedAccount.id}`, ); - await this.messageChannelMessageAssociationRepository.deleteByMessageExternalIdsAndMessageChannelId( - messagesDeleted, - messageChannel.id, - workspaceId, - ); + const messageChannelMessageAssociationRepository = + await this.twentyORMManager.getRepository( + 'messageChannelMessageAssociation', + ); + + await messageChannelMessageAssociationRepository.delete({ + messageChannelId: messageChannel.id, + messageExternalId: Any(messagesDeleted), + }); this.logger.log( `Deleted ${messagesDeleted.length} messages for workspace ${workspaceId} and account ${connectedAccount.id}`, diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-save-messages-and-enqueue-contact-creation.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-save-messages-and-enqueue-contact-creation.service.ts index e453141db..fe8c8177c 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-save-messages-and-enqueue-contact-creation.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-save-messages-and-enqueue-contact-creation.service.ts @@ -52,7 +52,6 @@ export class MessagingSaveMessagesAndEnqueueContactCreationService { messagesToSave, connectedAccount, messageChannel.id, - workspaceId, transactionManager, );