Remove message thread id from mcma and update scripts (#6500)

- Remove `messageThreadId` from `messageChannelMessageAssociation`
- Update thread merging
- Update all queries which were dependent on this field
- Update some raw queries by using `twentyORM` instead

---------

Co-authored-by: Weiko <corentin@twenty.com>
This commit is contained in:
bosiraphael
2024-08-06 18:19:59 +02:00
committed by GitHub
parent 48d0a3649d
commit 018b8220dc
14 changed files with 177 additions and 571 deletions

View File

@ -27,7 +27,6 @@ import { MessagingOngoingStaleJob } from 'src/modules/messaging/message-import-m
import { MessagingMessageImportManagerMessageChannelListener } from 'src/modules/messaging/message-import-manager/listeners/messaging-import-manager-message-channel.listener';
import { MessagingErrorHandlingService } from 'src/modules/messaging/message-import-manager/services/messaging-error-handling.service';
import { MessagingFullMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service';
import { MessagingMessageThreadService } from 'src/modules/messaging/message-import-manager/services/messaging-message-thread.service';
import { MessagingMessageService } from 'src/modules/messaging/message-import-manager/services/messaging-message.service';
import { MessagingMessagesImportService } from 'src/modules/messaging/message-import-manager/services/messaging-messages-import.service';
import { MessagingPartialMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service';
@ -65,7 +64,6 @@ import { MessagingMonitoringModule } from 'src/modules/messaging/monitoring/mess
MessagingMessageImportManagerMessageChannelListener,
MessagingCleanCacheJob,
MessagingMessageService,
MessagingMessageThreadService,
MessagingErrorHandlingService,
MessagingPartialMessageListFetchService,
MessagingFullMessageListFetchService,

View File

@ -2,14 +2,14 @@ import { Injectable, Logger } from '@nestjs/common';
import { GaxiosResponse } from 'gaxios';
import { gmail_v1 } from 'googleapis';
import { EntityManager } from 'typeorm';
import { Any, EntityManager } from 'typeorm';
import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service';
import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator';
import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/common/repositories/message-channel-message-association.repository';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service';
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity';
@ -35,12 +35,9 @@ export class MessagingFullMessageListFetchService {
private readonly messageChannelRepository: MessageChannelRepository,
@InjectCacheStorage(CacheStorageNamespace.Messaging)
private readonly cacheStorage: CacheStorageService,
@InjectObjectMetadataRepository(
MessageChannelMessageAssociationWorkspaceEntity,
)
private readonly messageChannelMessageAssociationRepository: MessageChannelMessageAssociationRepository,
private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService,
private readonly gmailErrorHandlingService: MessagingErrorHandlingService,
private readonly twentyORMManager: TwentyORMManager,
) {}
public async processMessageListFetch(
@ -129,11 +126,19 @@ export class MessagingFullMessageListFetchService {
firstMessageExternalId = messageExternalIds[0];
}
const messageChannelMessageAssociationRepository =
await this.twentyORMManager.getRepository<MessageChannelMessageAssociationWorkspaceEntity>(
'messageChannelMessageAssociation',
);
const existingMessageChannelMessageAssociations =
await this.messageChannelMessageAssociationRepository.getByMessageExternalIdsAndMessageChannelId(
messageExternalIds,
messageChannelId,
workspaceId,
await messageChannelMessageAssociationRepository.find(
{
where: {
messageChannelId,
messageExternalId: Any(messageExternalIds),
},
},
transactionManager,
);

View File

@ -1,94 +0,0 @@
import { Injectable } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { v4 } from 'uuid';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/common/repositories/message-channel-message-association.repository';
import { MessageThreadRepository } from 'src/modules/messaging/common/repositories/message-thread.repository';
import { MessageRepository } from 'src/modules/messaging/common/repositories/message.repository';
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity';
import { MessageThreadSubscriberWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-thread-subscriber.workspace-entity';
import { MessageThreadWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-thread.workspace-entity';
import { MessageWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message.workspace-entity';
@Injectable()
export class MessagingMessageThreadService {
constructor(
private readonly twentyORMManager: TwentyORMManager,
@InjectObjectMetadataRepository(
MessageChannelMessageAssociationWorkspaceEntity,
)
private readonly messageChannelMessageAssociationRepository: MessageChannelMessageAssociationRepository,
@InjectObjectMetadataRepository(MessageWorkspaceEntity)
private readonly messageRepository: MessageRepository,
@InjectObjectMetadataRepository(MessageThreadWorkspaceEntity)
private readonly messageThreadRepository: MessageThreadRepository,
) {}
public async saveMessageThreadMember(
messageThreadId: string,
workspaceMemberId: string,
) {
const id = v4();
const messageThreadSubscriberRepository =
await this.twentyORMManager.getRepository<MessageThreadSubscriberWorkspaceEntity>(
'messageThreadSubscriber',
);
await messageThreadSubscriberRepository.insert({
id,
messageThreadId,
workspaceMemberId,
});
}
public async saveMessageThreadOrReturnExistingMessageThread(
headerMessageId: string,
messageThreadExternalId: string,
workspaceId: string,
manager: EntityManager,
) {
// Check if message thread already exists via threadExternalId
const existingMessageChannelMessageAssociationByMessageThreadExternalId =
await this.messageChannelMessageAssociationRepository.getFirstByMessageThreadExternalId(
messageThreadExternalId,
workspaceId,
manager,
);
const existingMessageThread =
existingMessageChannelMessageAssociationByMessageThreadExternalId?.messageThreadId;
if (existingMessageThread) {
return Promise.resolve(existingMessageThread);
}
// Check if message thread already exists via existing message headerMessageId
const existingMessageWithSameHeaderMessageId =
await this.messageRepository.getFirstOrNullByHeaderMessageId(
headerMessageId,
workspaceId,
manager,
);
if (existingMessageWithSameHeaderMessageId) {
return Promise.resolve(
existingMessageWithSameHeaderMessageId.messageThreadId,
);
}
// If message thread does not exist, create new message thread
const newMessageThreadId = v4();
await this.messageThreadRepository.insert(
newMessageThreadId,
workspaceId,
manager,
);
return Promise.resolve(newMessageThreadId);
}
}

View File

@ -3,231 +3,138 @@ import { Injectable } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { v4 } from 'uuid';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/common/repositories/message-channel-message-association.repository';
import { MessageThreadRepository } from 'src/modules/messaging/common/repositories/message-thread.repository';
import { MessageRepository } from 'src/modules/messaging/common/repositories/message.repository';
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity';
import { MessageThreadWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-thread.workspace-entity';
import { MessageWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message.workspace-entity';
import { GmailMessage } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message';
import { MessagingMessageThreadService } from 'src/modules/messaging/message-import-manager/services/messaging-message-thread.service';
@Injectable()
export class MessagingMessageService {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
@InjectObjectMetadataRepository(
MessageChannelMessageAssociationWorkspaceEntity,
)
private readonly messageChannelMessageAssociationRepository: MessageChannelMessageAssociationRepository,
@InjectObjectMetadataRepository(MessageWorkspaceEntity)
private readonly messageRepository: MessageRepository,
@InjectObjectMetadataRepository(MessageThreadWorkspaceEntity)
private readonly messageThreadRepository: MessageThreadRepository,
private readonly messageThreadService: MessagingMessageThreadService,
) {}
constructor(private readonly twentyORMManager: TwentyORMManager) {}
public async saveMessagesWithinTransaction(
messages: GmailMessage[],
connectedAccount: ConnectedAccountWorkspaceEntity,
gmailMessageChannelId: string,
workspaceId: string,
connectedAccount: Pick<
ConnectedAccountWorkspaceEntity,
'handle' | 'handleAliases'
>,
messageChannelId: string,
transactionManager: EntityManager,
): Promise<Map<string, string>> {
const messageChannelMessageAssociationRepository =
await this.twentyORMManager.getRepository<MessageChannelMessageAssociationWorkspaceEntity>(
'messageChannelMessageAssociation',
);
const messageRepository =
await this.twentyORMManager.getRepository<MessageWorkspaceEntity>(
'message',
);
const messageThreadRepository =
await this.twentyORMManager.getRepository<MessageThreadWorkspaceEntity>(
'messageThread',
);
const messageExternalIdsAndIdsMap = new Map<string, string>();
for (const message of messages) {
const existingMessageChannelMessageAssociationsCount =
await this.messageChannelMessageAssociationRepository.countByMessageExternalIdsAndMessageChannelId(
[message.externalId],
gmailMessageChannelId,
workspaceId,
const existingMessageChannelMessageAssociation =
await messageChannelMessageAssociationRepository.findOne(
{
where: {
messageExternalId: message.externalId,
messageChannelId: messageChannelId,
},
},
transactionManager,
);
if (existingMessageChannelMessageAssociationsCount > 0) {
if (existingMessageChannelMessageAssociation) {
continue;
}
// TODO: This does not handle all thread merging use cases and might create orphan threads.
const savedOrExistingMessageThreadId =
await this.messageThreadService.saveMessageThreadOrReturnExistingMessageThread(
message.headerMessageId,
message.messageThreadExternalId,
workspaceId,
const existingMessage = await messageRepository.findOne({
where: {
headerMessageId: message.headerMessageId,
},
});
if (existingMessage) {
await messageChannelMessageAssociationRepository.insert(
{
messageChannelId,
messageId: existingMessage.id,
messageExternalId: message.externalId,
messageThreadExternalId: message.messageThreadExternalId,
},
transactionManager,
);
if (!savedOrExistingMessageThreadId) {
throw new Error(
`No message thread found for message ${message.headerMessageId} in workspace ${workspaceId} in saveMessages`,
continue;
}
const existingThread = await messageThreadRepository.findOne(
{
where: {
messages: {
messageChannelMessageAssociations: {
messageThreadExternalId: message.messageThreadExternalId,
messageChannelId,
},
},
},
},
transactionManager,
);
let newOrExistingMessageThreadId = existingThread?.id;
if (!existingThread) {
newOrExistingMessageThreadId = v4();
await messageThreadRepository.insert(
{ id: newOrExistingMessageThreadId },
transactionManager,
);
}
const savedOrExistingMessageId =
await this.saveMessageOrReturnExistingMessage(
message,
savedOrExistingMessageThreadId,
connectedAccount,
workspaceId,
transactionManager,
);
const newMessageId = v4();
messageExternalIdsAndIdsMap.set(
message.externalId,
savedOrExistingMessageId,
const messageDirection =
connectedAccount.handle === message.fromHandle ||
connectedAccount.handleAliases?.includes(message.fromHandle)
? 'outgoing'
: 'incoming';
await messageRepository.insert(
{
id: newMessageId,
headerMessageId: message.headerMessageId,
subject: message.subject,
receivedAt: new Date(parseInt(message.internalDate)),
direction: messageDirection,
text: message.text,
messageThreadId: newOrExistingMessageThreadId,
},
transactionManager,
);
await this.messageChannelMessageAssociationRepository.insert(
gmailMessageChannelId,
savedOrExistingMessageId,
message.externalId,
savedOrExistingMessageThreadId,
message.messageThreadExternalId,
workspaceId,
messageExternalIdsAndIdsMap.set(message.externalId, newMessageId);
await messageChannelMessageAssociationRepository.insert(
{
messageChannelId,
messageId: newMessageId,
messageExternalId: message.externalId,
messageThreadExternalId: message.messageThreadExternalId,
},
transactionManager,
);
}
return messageExternalIdsAndIdsMap;
}
private async saveMessageOrReturnExistingMessage(
message: GmailMessage,
messageThreadId: string,
connectedAccount: ConnectedAccountWorkspaceEntity,
workspaceId: string,
manager: EntityManager,
): Promise<string> {
const existingMessage =
await this.messageRepository.getFirstOrNullByHeaderMessageId(
message.headerMessageId,
workspaceId,
);
const existingMessageId = existingMessage?.id;
if (existingMessageId) {
return Promise.resolve(existingMessageId);
}
const newMessageId = v4();
const messageDirection =
connectedAccount.handle === message.fromHandle ||
connectedAccount.handleAliases?.includes(message.fromHandle)
? 'outgoing'
: 'incoming';
const receivedAt = new Date(parseInt(message.internalDate));
await this.messageRepository.insert(
newMessageId,
message.headerMessageId,
message.subject,
receivedAt,
messageDirection,
messageThreadId,
message.text,
workspaceId,
manager,
);
return Promise.resolve(newMessageId);
}
public async deleteMessages(
messagesDeletedMessageExternalIds: string[],
gmailMessageChannelId: string,
workspaceId: string,
) {
const workspaceDataSource =
await this.workspaceDataSourceService.connectToWorkspaceDataSource(
workspaceId,
);
await workspaceDataSource?.transaction(async (manager: EntityManager) => {
const messageChannelMessageAssociationsToDelete =
await this.messageChannelMessageAssociationRepository.getByMessageExternalIdsAndMessageChannelId(
messagesDeletedMessageExternalIds,
gmailMessageChannelId,
workspaceId,
manager,
);
const messageChannelMessageAssociationIdsToDeleteIds =
messageChannelMessageAssociationsToDelete.map(
(messageChannelMessageAssociationToDelete) =>
messageChannelMessageAssociationToDelete.id,
);
await this.messageChannelMessageAssociationRepository.deleteByIds(
messageChannelMessageAssociationIdsToDeleteIds,
workspaceId,
manager,
);
const messageIdsFromMessageChannelMessageAssociationsToDelete =
messageChannelMessageAssociationsToDelete.map(
(messageChannelMessageAssociationToDelete) =>
messageChannelMessageAssociationToDelete.messageId,
);
const messageChannelMessageAssociationByMessageIds =
await this.messageChannelMessageAssociationRepository.getByMessageIds(
messageIdsFromMessageChannelMessageAssociationsToDelete,
workspaceId,
manager,
);
const messageIdsFromMessageChannelMessageAssociationByMessageIds =
messageChannelMessageAssociationByMessageIds.map(
(messageChannelMessageAssociation) =>
messageChannelMessageAssociation.messageId,
);
const messageIdsToDelete =
messageIdsFromMessageChannelMessageAssociationsToDelete.filter(
(messageId) =>
!messageIdsFromMessageChannelMessageAssociationByMessageIds.includes(
messageId,
),
);
await this.messageRepository.deleteByIds(
messageIdsToDelete,
workspaceId,
manager,
);
const messageThreadIdsFromMessageChannelMessageAssociationsToDelete =
messageChannelMessageAssociationsToDelete.map(
(messageChannelMessageAssociationToDelete) =>
messageChannelMessageAssociationToDelete.messageThreadId,
);
const messagesByThreadIds =
await this.messageRepository.getByMessageThreadIds(
messageThreadIdsFromMessageChannelMessageAssociationsToDelete,
workspaceId,
manager,
);
const threadIdsToDelete =
messageThreadIdsFromMessageChannelMessageAssociationsToDelete.filter(
(threadId) =>
!messagesByThreadIds.find(
(message) => message.messageThreadId === threadId,
),
);
await this.messageThreadRepository.deleteByIds(
threadIdsToDelete,
workspaceId,
manager,
);
});
}
}

View File

@ -1,13 +1,14 @@
import { Injectable, Logger } from '@nestjs/common';
import { gmail_v1 } from 'googleapis';
import { Any } from 'typeorm';
import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service';
import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator';
import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/common/repositories/message-channel-message-association.repository';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service';
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity';
@ -29,14 +30,11 @@ export class MessagingPartialMessageListFetchService {
private readonly messageChannelRepository: MessageChannelRepository,
@InjectCacheStorage(CacheStorageNamespace.Messaging)
private readonly cacheStorage: CacheStorageService,
@InjectObjectMetadataRepository(
MessageChannelMessageAssociationWorkspaceEntity,
)
private readonly messageChannelMessageAssociationRepository: MessageChannelMessageAssociationRepository,
private readonly gmailErrorHandlingService: MessagingErrorHandlingService,
private readonly gmailGetHistoryService: MessagingGmailHistoryService,
private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService,
private readonly gmailFetchMessageIdsToExcludeService: MessagingGmailFetchMessageIdsToExcludeService,
private readonly twentyORMManager: TwentyORMManager,
) {}
public async processMessageListFetch(
@ -135,11 +133,15 @@ export class MessagingPartialMessageListFetchService {
`Added ${messagesAddedFiltered.length} messages to import for workspace ${workspaceId} and account ${connectedAccount.id}`,
);
await this.messageChannelMessageAssociationRepository.deleteByMessageExternalIdsAndMessageChannelId(
messagesDeleted,
messageChannel.id,
workspaceId,
);
const messageChannelMessageAssociationRepository =
await this.twentyORMManager.getRepository<MessageChannelMessageAssociationWorkspaceEntity>(
'messageChannelMessageAssociation',
);
await messageChannelMessageAssociationRepository.delete({
messageChannelId: messageChannel.id,
messageExternalId: Any(messagesDeleted),
});
this.logger.log(
`Deleted ${messagesDeleted.length} messages for workspace ${workspaceId} and account ${connectedAccount.id}`,

View File

@ -52,7 +52,6 @@ export class MessagingSaveMessagesAndEnqueueContactCreationService {
messagesToSave,
connectedAccount,
messageChannel.id,
workspaceId,
transactionManager,
);