From 729e2dc651eb1484b697442056008c06a7762331 Mon Sep 17 00:00:00 2001 From: Weiko Date: Fri, 2 Feb 2024 18:13:41 +0100 Subject: [PATCH] [Messaging] Delete empty threads after message deletion import (#3716) * [Messaging] Delete empty threads after message deletion import * fix --- .../connected-account.service.ts | 63 ++++++--- ...age-channel-message-association.service.ts | 85 +++++++----- .../message-channel.service.ts | 21 +-- .../message-thread/message-thread.module.ts | 11 ++ .../message-thread/message-thread.service.ts | 28 ++++ .../messaging/message/message.service.ts | 63 ++++++--- .../fetch-workspace-messages.module.ts | 2 + .../services/gmail-full-sync.service.ts | 4 +- .../services/gmail-partial-sync.service.ts | 35 ++--- .../gmail-refresh-access-token.service.ts | 32 ++--- .../services/messaging-utils.service.ts | 126 ++++++++++++------ .../workspace-datasource.service.ts | 22 ++- 12 files changed, 334 insertions(+), 158 deletions(-) create mode 100644 packages/twenty-server/src/workspace/messaging/message-thread/message-thread.module.ts create mode 100644 packages/twenty-server/src/workspace/messaging/message-thread/message-thread.service.ts diff --git a/packages/twenty-server/src/workspace/messaging/connected-account/connected-account.service.ts b/packages/twenty-server/src/workspace/messaging/connected-account/connected-account.service.ts index 0a7ef2278..accc80902 100644 --- a/packages/twenty-server/src/workspace/messaging/connected-account/connected-account.service.ts +++ b/packages/twenty-server/src/workspace/messaging/connected-account/connected-account.service.ts @@ -1,5 +1,7 @@ import { Injectable } from '@nestjs/common'; +import { EntityManager } from 'typeorm'; + import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service'; import { ConnectedAccountObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/connected-account.object-metadata'; import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record'; @@ -12,29 +14,34 @@ export class ConnectedAccountService { public async getAll( workspaceId: string, + transactionManager?: EntityManager, ): Promise[]> { - const { dataSource: workspaceDataSource, dataSourceMetadata } = - await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( - workspaceId, - ); + const dataSourceSchema = + this.workspaceDataSourceService.getSchemaName(workspaceId); - return await workspaceDataSource?.query( - `SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'google'`, + return await this.workspaceDataSourceService.executeRawQuery( + `SELECT * FROM ${dataSourceSchema}."connectedAccount" WHERE "provider" = 'google'`, + [], + workspaceId, + transactionManager, ); } public async getByIdOrFail( connectedAccountId: string, workspaceId: string, + transactionManager?: EntityManager, ): Promise> { - const { dataSource: workspaceDataSource, dataSourceMetadata } = - await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( + const dataSourceSchema = + this.workspaceDataSourceService.getSchemaName(workspaceId); + + const connectedAccounts = + await this.workspaceDataSourceService.executeRawQuery( + `SELECT * FROM ${dataSourceSchema}."connectedAccount" WHERE "id" = $1 LIMIT 1`, + [connectedAccountId], workspaceId, + transactionManager, ); - const connectedAccounts = await workspaceDataSource?.query( - `SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "id" = $1 LIMIT 1`, - [connectedAccountId], - ); if (!connectedAccounts || connectedAccounts.length === 0) { throw new Error('No connected account found'); @@ -43,19 +50,37 @@ export class ConnectedAccountService { return connectedAccounts[0]; } - public async saveLastSyncHistoryId( + public async updateLastSyncHistoryId( historyId: string, connectedAccountId: string, workspaceId: string, + transactionManager?: EntityManager, ) { - const { dataSource: workspaceDataSource, dataSourceMetadata } = - await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( - workspaceId, - ); + const dataSourceSchema = + this.workspaceDataSourceService.getSchemaName(workspaceId); - await workspaceDataSource?.query( - `UPDATE ${dataSourceMetadata.schema}."connectedAccount" SET "lastSyncHistoryId" = $1 WHERE "id" = $2`, + await this.workspaceDataSourceService.executeRawQuery( + `UPDATE ${dataSourceSchema}."connectedAccount" SET "lastSyncHistoryId" = $1 WHERE "id" = $2`, [historyId, connectedAccountId], + workspaceId, + transactionManager, + ); + } + + public async updateAccessToken( + accessToken: string, + connectedAccountId: string, + workspaceId: string, + transactionManager?: EntityManager, + ) { + const dataSourceSchema = + this.workspaceDataSourceService.getSchemaName(workspaceId); + + await this.workspaceDataSourceService.executeRawQuery( + `UPDATE ${dataSourceSchema}."connectedAccount" SET "accessToken" = $1 WHERE "id" = $2`, + [accessToken, connectedAccountId], + workspaceId, + transactionManager, ); } } diff --git a/packages/twenty-server/src/workspace/messaging/message-channel-message-association/message-channel-message-association.service.ts b/packages/twenty-server/src/workspace/messaging/message-channel-message-association/message-channel-message-association.service.ts index 96168d668..cf2406c32 100644 --- a/packages/twenty-server/src/workspace/messaging/message-channel-message-association/message-channel-message-association.service.ts +++ b/packages/twenty-server/src/workspace/messaging/message-channel-message-association/message-channel-message-association.service.ts @@ -1,5 +1,7 @@ import { Injectable } from '@nestjs/common'; +import { EntityManager } from 'typeorm'; + import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service'; import { MessageChannelMessageAssociationObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/message-channel-message-association.object-metadata'; import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record'; @@ -14,16 +16,17 @@ export class MessageChannelMessageAssociationService { messageExternalIds: string[], messageChannelId: string, workspaceId: string, + transactionManager?: EntityManager, ): Promise[]> { - const { dataSource: workspaceDataSource, dataSourceMetadata } = - await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( - workspaceId, - ); + const dataSourceSchema = + this.workspaceDataSourceService.getSchemaName(workspaceId); - return await workspaceDataSource?.query( - `SELECT * FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation" + return await this.workspaceDataSourceService.executeRawQuery( + `SELECT * FROM ${dataSourceSchema}."messageChannelMessageAssociation" WHERE "messageExternalId" = ANY($1) AND "messageChannelId" = $2`, [messageExternalIds, messageChannelId], + workspaceId, + transactionManager, ); } @@ -31,16 +34,17 @@ export class MessageChannelMessageAssociationService { messageExternalIds: string[], messageChannelId: string, workspaceId: string, + transactionManager?: EntityManager, ): Promise { - const { dataSource: workspaceDataSource, dataSourceMetadata } = - await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( - workspaceId, - ); + const dataSourceSchema = + this.workspaceDataSourceService.getSchemaName(workspaceId); - const result = await workspaceDataSource?.query( - `SELECT COUNT(*) FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation" + const result = await this.workspaceDataSourceService.executeRawQuery( + `SELECT COUNT(*) FROM ${dataSourceSchema}."messageChannelMessageAssociation" WHERE "messageExternalId" = ANY($1) AND "messageChannelId" = $2`, [messageExternalIds, messageChannelId], + workspaceId, + transactionManager, ); return result[0]?.count; @@ -50,42 +54,62 @@ export class MessageChannelMessageAssociationService { messageExternalIds: string[], messageChannelId: string, workspaceId: string, + transactionManager?: EntityManager, ) { - const { dataSource: workspaceDataSource, dataSourceMetadata } = - await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( - workspaceId, - ); + const dataSourceSchema = + this.workspaceDataSourceService.getSchemaName(workspaceId); - await workspaceDataSource?.query( - `DELETE FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation" WHERE "messageExternalId" = ANY($1) AND "messageChannelId" = $2`, + await this.workspaceDataSourceService.executeRawQuery( + `DELETE FROM ${dataSourceSchema}."messageChannelMessageAssociation" WHERE "messageExternalId" = ANY($1) AND "messageChannelId" = $2`, [messageExternalIds, messageChannelId], + workspaceId, + transactionManager, + ); + } + + public async deleteByIds( + ids: string[], + workspaceId: string, + transactionManager?: EntityManager, + ) { + const dataSourceSchema = + this.workspaceDataSourceService.getSchemaName(workspaceId); + + await this.workspaceDataSourceService.executeRawQuery( + `DELETE FROM ${dataSourceSchema}."messageChannelMessageAssociation" WHERE "id" = ANY($1)`, + [ids], + workspaceId, + transactionManager, ); } public async getByMessageThreadExternalIds( messageThreadExternalIds: string[], workspaceId: string, + transactionManager?: EntityManager, ): Promise[]> { - const { dataSource: workspaceDataSource, dataSourceMetadata } = - await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( - workspaceId, - ); + const dataSourceSchema = + this.workspaceDataSourceService.getSchemaName(workspaceId); - return await workspaceDataSource?.query( - `SELECT * FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation" + return await this.workspaceDataSourceService.executeRawQuery( + `SELECT * FROM ${dataSourceSchema}."messageChannelMessageAssociation" WHERE "messageThreadExternalId" = ANY($1)`, [messageThreadExternalIds], + workspaceId, + transactionManager, ); } public async getFirstByMessageThreadExternalId( messageThreadExternalId: string, workspaceId: string, + transactionManager?: EntityManager, ): Promise | null> { const existingMessageChannelMessageAssociations = await this.getByMessageThreadExternalIds( [messageThreadExternalId], workspaceId, + transactionManager, ); if ( @@ -101,16 +125,17 @@ export class MessageChannelMessageAssociationService { public async getByMessageIds( messageIds: string[], workspaceId: string, + transactionManager?: EntityManager, ): Promise[]> { - const { dataSource: workspaceDataSource, dataSourceMetadata } = - await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( - workspaceId, - ); + const dataSourceSchema = + this.workspaceDataSourceService.getSchemaName(workspaceId); - return await workspaceDataSource?.query( - `SELECT * FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation" + return await this.workspaceDataSourceService.executeRawQuery( + `SELECT * FROM ${dataSourceSchema}."messageChannelMessageAssociation" WHERE "messageId" = ANY($1)`, [messageIds], + workspaceId, + transactionManager, ); } } diff --git a/packages/twenty-server/src/workspace/messaging/message-channel/message-channel.service.ts b/packages/twenty-server/src/workspace/messaging/message-channel/message-channel.service.ts index 0749f1ce6..b9ece9fa2 100644 --- a/packages/twenty-server/src/workspace/messaging/message-channel/message-channel.service.ts +++ b/packages/twenty-server/src/workspace/messaging/message-channel/message-channel.service.ts @@ -1,5 +1,7 @@ import { Injectable } from '@nestjs/common'; +import { EntityManager } from 'typeorm'; + import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service'; import { MessageChannelObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/message-channel.object-metadata'; import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record'; @@ -11,27 +13,28 @@ export class MessageChannelService { ) {} public async getByConnectedAccountId( - workspaceId: string, connectedAccountId: string, + workspaceId: string, + transactionManager?: EntityManager, ): Promise[]> { - const { dataSource: workspaceDataSource, dataSourceMetadata } = - await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( - workspaceId, - ); + const dataSourceSchema = + this.workspaceDataSourceService.getSchemaName(workspaceId); - return await workspaceDataSource?.query( - `SELECT * FROM ${dataSourceMetadata.schema}."messageChannel" WHERE "connectedAccountId" = $1 AND "type" = 'email' LIMIT 1`, + return await this.workspaceDataSourceService.executeRawQuery( + `SELECT * FROM ${dataSourceSchema}."messageChannel" WHERE "connectedAccountId" = $1 AND "type" = 'email' LIMIT 1`, [connectedAccountId], + workspaceId, + transactionManager, ); } public async getFirstByConnectedAccountIdOrFail( - workspaceId: string, connectedAccountId: string, + workspaceId: string, ): Promise> { const messageChannels = await this.getByConnectedAccountId( - workspaceId, connectedAccountId, + workspaceId, ); if (!messageChannels || messageChannels.length === 0) { diff --git a/packages/twenty-server/src/workspace/messaging/message-thread/message-thread.module.ts b/packages/twenty-server/src/workspace/messaging/message-thread/message-thread.module.ts new file mode 100644 index 000000000..8ee9ec903 --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/message-thread/message-thread.module.ts @@ -0,0 +1,11 @@ +import { Module } from '@nestjs/common'; + +import { MessageThreadService } from 'src/workspace/messaging/message-thread/message-thread.service'; +import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/workspace-datasource.module'; + +@Module({ + imports: [WorkspaceDataSourceModule], + providers: [MessageThreadService], + exports: [MessageThreadService], +}) +export class MessageThreadModule {} diff --git a/packages/twenty-server/src/workspace/messaging/message-thread/message-thread.service.ts b/packages/twenty-server/src/workspace/messaging/message-thread/message-thread.service.ts new file mode 100644 index 000000000..f25fa16b3 --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/message-thread/message-thread.service.ts @@ -0,0 +1,28 @@ +import { Injectable } from '@nestjs/common'; + +import { EntityManager } from 'typeorm'; + +import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service'; + +@Injectable() +export class MessageThreadService { + constructor( + private readonly workspaceDataSourceService: WorkspaceDataSourceService, + ) {} + + public async deleteByIds( + messageThreadIds: string[], + workspaceId: string, + transactionManager?: EntityManager, + ): Promise { + const dataSourceSchema = + this.workspaceDataSourceService.getSchemaName(workspaceId); + + await this.workspaceDataSourceService.executeRawQuery( + `DELETE FROM ${dataSourceSchema}."messageThread" WHERE id = ANY($1)`, + [messageThreadIds], + workspaceId, + transactionManager, + ); + } +} diff --git a/packages/twenty-server/src/workspace/messaging/message/message.service.ts b/packages/twenty-server/src/workspace/messaging/message/message.service.ts index d5a444d26..cfa4a21e8 100644 --- a/packages/twenty-server/src/workspace/messaging/message/message.service.ts +++ b/packages/twenty-server/src/workspace/messaging/message/message.service.ts @@ -1,5 +1,7 @@ import { Injectable } from '@nestjs/common'; +import { EntityManager } from 'typeorm'; + import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service'; import { MessageObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/message.object-metadata'; import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record'; @@ -11,17 +13,18 @@ export class MessageService { ) {} public async getFirstByHeaderMessageId( - workspaceId: string, headerMessageId: string, + workspaceId: string, + transactionManager?: EntityManager, ): Promise | null> { - const { dataSource: workspaceDataSource, dataSourceMetadata } = - await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( - workspaceId, - ); + const dataSourceSchema = + this.workspaceDataSourceService.getSchemaName(workspaceId); - const messages = await workspaceDataSource?.query( - `SELECT * FROM ${dataSourceMetadata.schema}."message" WHERE "headerMessageId" = $1 LIMIT 1`, + const messages = await this.workspaceDataSourceService.executeRawQuery( + `SELECT * FROM ${dataSourceSchema}."message" WHERE "headerMessageId" = $1 LIMIT 1`, [headerMessageId], + workspaceId, + transactionManager, ); if (!messages || messages.length === 0) { @@ -32,32 +35,50 @@ export class MessageService { } public async getByIds( - workspaceId: string, messageIds: string[], + workspaceId: string, + transactionManager?: EntityManager, ): Promise[]> { - const { dataSource: workspaceDataSource, dataSourceMetadata } = - await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( - workspaceId, - ); + const dataSourceSchema = + this.workspaceDataSourceService.getSchemaName(workspaceId); - return await workspaceDataSource?.query( - `SELECT * FROM ${dataSourceMetadata.schema}."message" WHERE "id" = ANY($1)`, + return await this.workspaceDataSourceService.executeRawQuery( + `SELECT * FROM ${dataSourceSchema}."message" WHERE "id" = ANY($1)`, [messageIds], + workspaceId, + transactionManager, ); } public async deleteByIds( - workspaceId: string, messageIds: string[], + workspaceId: string, + transactionManager?: EntityManager, ): Promise { - const { dataSource: workspaceDataSource, dataSourceMetadata } = - await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( - workspaceId, - ); + const dataSourceSchema = + this.workspaceDataSourceService.getSchemaName(workspaceId); - await workspaceDataSource?.query( - `DELETE FROM ${dataSourceMetadata.schema}."message" WHERE "id" = ANY($1)`, + await this.workspaceDataSourceService.executeRawQuery( + `DELETE FROM ${dataSourceSchema}."message" WHERE "id" = ANY($1)`, [messageIds], + workspaceId, + transactionManager, + ); + } + + public async getByMessageThreadIds( + messageThreadIds: string[], + workspaceId: string, + transactionManager?: EntityManager, + ): Promise[]> { + const dataSourceSchema = + this.workspaceDataSourceService.getSchemaName(workspaceId); + + return await this.workspaceDataSourceService.executeRawQuery( + `SELECT * FROM ${dataSourceSchema}."message" WHERE "messageThreadId" = ANY($1)`, + [messageThreadIds], + workspaceId, + transactionManager, ); } } diff --git a/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.module.ts b/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.module.ts index 92994bd4a..65ed671db 100644 --- a/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.module.ts +++ b/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.module.ts @@ -4,6 +4,7 @@ import { EnvironmentModule } from 'src/integrations/environment/environment.modu import { ConnectedAccountModule } from 'src/workspace/messaging/connected-account/connected-account.module'; import { MessageChannelMessageAssociationModule } from 'src/workspace/messaging/message-channel-message-association/message-channel-message-assocation.module'; import { MessageChannelModule } from 'src/workspace/messaging/message-channel/message-channel.module'; +import { MessageThreadModule } from 'src/workspace/messaging/message-thread/message-thread.module'; import { MessageModule } from 'src/workspace/messaging/message/message.module'; import { GmailClientProvider } from 'src/workspace/messaging/providers/gmail/gmail-client.provider'; import { FetchMessagesByBatchesService } from 'src/workspace/messaging/services/fetch-messages-by-batches.service'; @@ -21,6 +22,7 @@ import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/wo MessageChannelModule, MessageChannelMessageAssociationModule, MessageModule, + MessageThreadModule, ], providers: [ GmailFullSyncService, diff --git a/packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts b/packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts index 62324b1b2..41ed2598c 100644 --- a/packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts @@ -52,8 +52,8 @@ export class GmailFullSyncService { const gmailMessageChannel = await this.messageChannelService.getFirstByConnectedAccountIdOrFail( - workspaceId, connectedAccountId, + workspaceId, ); const gmailMessageChannelId = gmailMessageChannel.id; @@ -129,7 +129,7 @@ export class GmailFullSyncService { if (!historyId) throw new Error('No history id found'); - await this.connectedAccountService.saveLastSyncHistoryId( + await this.connectedAccountService.updateLastSyncHistoryId( historyId, connectedAccount.id, workspaceId, diff --git a/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts b/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts index ebc9edabe..40a9b9bd9 100644 --- a/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts @@ -82,8 +82,8 @@ export class GmailPartialSyncService { const gmailMessageChannel = await this.messageChannelService.getFirstByConnectedAccountIdOrFail( - workspaceId, connectedAccountId, + workspaceId, ); const gmailMessageChannelId = gmailMessageChannel.id; @@ -100,24 +100,29 @@ export class GmailPartialSyncService { accessToken, ); - await this.utils.saveMessages( - messagesToSave, - dataSourceMetadata, - workspaceDataSource, - connectedAccount, - gmailMessageChannelId, - workspaceId, - ); + if (messagesToSave.length !== 0) { + await this.utils.saveMessages( + messagesToSave, + dataSourceMetadata, + workspaceDataSource, + connectedAccount, + gmailMessageChannelId, + workspaceId, + ); + } - await this.utils.deleteMessages( - messagesDeleted, - gmailMessageChannelId, - workspaceId, - ); + if (messagesDeleted.length !== 0) { + await this.utils.deleteMessages( + workspaceDataSource, + messagesDeleted, + gmailMessageChannelId, + workspaceId, + ); + } if (errors.length) throw new Error('Error fetching messages'); - await this.connectedAccountService.saveLastSyncHistoryId( + await this.connectedAccountService.updateLastSyncHistoryId( newHistoryId, connectedAccount.id, workspaceId, diff --git a/packages/twenty-server/src/workspace/messaging/services/gmail-refresh-access-token.service.ts b/packages/twenty-server/src/workspace/messaging/services/gmail-refresh-access-token.service.ts index faca5245d..99fb83e46 100644 --- a/packages/twenty-server/src/workspace/messaging/services/gmail-refresh-access-token.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/gmail-refresh-access-token.service.ts @@ -3,38 +3,25 @@ import { Injectable } from '@nestjs/common'; import axios from 'axios'; import { EnvironmentService } from 'src/integrations/environment/environment.service'; -import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service'; +import { ConnectedAccountService } from 'src/workspace/messaging/connected-account/connected-account.service'; @Injectable() export class GmailRefreshAccessTokenService { constructor( private readonly environmentService: EnvironmentService, - private readonly workspaceDataSourceService: WorkspaceDataSourceService, + private readonly connectedAccountService: ConnectedAccountService, ) {} async refreshAndSaveAccessToken( workspaceId: string, connectedAccountId: string, ): Promise { - const { dataSource: workspaceDataSource, dataSourceMetadata } = - await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( - workspaceId, - ); - - if (!workspaceDataSource) { - throw new Error('No workspace data source found'); - } - - const connectedAccounts = await workspaceDataSource?.query( - `SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'google' AND "id" = $1`, - [connectedAccountId], + const connectedAccount = await this.connectedAccountService.getByIdOrFail( + connectedAccountId, + workspaceId, ); - if (!connectedAccounts || connectedAccounts.length === 0) { - throw new Error('No connected account found'); - } - - const refreshToken = connectedAccounts[0]?.refreshToken; + const refreshToken = connectedAccount.refreshToken; if (!refreshToken) { throw new Error('No refresh token found'); @@ -42,9 +29,10 @@ export class GmailRefreshAccessTokenService { const accessToken = await this.refreshAccessToken(refreshToken); - await workspaceDataSource?.query( - `UPDATE ${dataSourceMetadata.schema}."connectedAccount" SET "accessToken" = $1 WHERE "id" = $2`, - [accessToken, connectedAccounts[0].id], + await this.connectedAccountService.updateAccessToken( + accessToken, + connectedAccountId, + workspaceId, ); } diff --git a/packages/twenty-server/src/workspace/messaging/services/messaging-utils.service.ts b/packages/twenty-server/src/workspace/messaging/services/messaging-utils.service.ts index bc92959b1..328b81d80 100644 --- a/packages/twenty-server/src/workspace/messaging/services/messaging-utils.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/messaging-utils.service.ts @@ -11,12 +11,16 @@ import { import { MessageQuery } from 'src/workspace/messaging/types/message-or-thread-query'; import { MessageChannelMessageAssociationService } from 'src/workspace/messaging/message-channel-message-association/message-channel-message-association.service'; import { MessageService } from 'src/workspace/messaging/message/message.service'; +import { MessageThreadService } from 'src/workspace/messaging/message-thread/message-thread.service'; +import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record'; +import { ConnectedAccountObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/connected-account.object-metadata'; @Injectable() export class MessagingUtilsService { constructor( private readonly messageChannelMessageAssociationService: MessageChannelMessageAssociationService, private readonly messageService: MessageService, + private readonly messageThreadService: MessageThreadService, ) {} public createQueriesFromMessageIds( @@ -31,17 +35,18 @@ export class MessagingUtilsService { messages: GmailMessage[], dataSourceMetadata: DataSourceEntity, workspaceDataSource: DataSource, - connectedAccount, + connectedAccount: ObjectRecord, gmailMessageChannelId: string, workspaceId: string, ) { for (const message of messages) { - await workspaceDataSource?.transaction(async (manager) => { + await workspaceDataSource?.transaction(async (manager: EntityManager) => { const existingMessageChannelMessageAssociationsCount = await this.messageChannelMessageAssociationService.countByMessageExternalIdsAndMessageChannelId( [message.externalId], gmailMessageChannelId, workspaceId, + manager, ); if (existingMessageChannelMessageAssociationsCount > 0) { @@ -52,8 +57,8 @@ export class MessagingUtilsService { await this.saveMessageThreadOrReturnExistingMessageThread( message.messageThreadExternalId, dataSourceMetadata, - manager, workspaceId, + manager, ); const savedOrExistingMessageId = @@ -62,8 +67,8 @@ export class MessagingUtilsService { savedOrExistingMessageThreadId, connectedAccount, dataSourceMetadata, - manager, workspaceId, + manager, ); await manager.query( @@ -83,14 +88,14 @@ export class MessagingUtilsService { private async saveMessageOrReturnExistingMessage( message: GmailMessage, messageThreadId: string, - connectedAccount, + connectedAccount: ObjectRecord, dataSourceMetadata: DataSourceEntity, - manager: EntityManager, workspaceId: string, + manager: EntityManager, ): Promise { const existingMessage = await this.messageService.getFirstByHeaderMessageId( - workspaceId, message.headerMessageId, + workspaceId, ); const existingMessageId = existingMessage?.id; @@ -132,13 +137,14 @@ export class MessagingUtilsService { private async saveMessageThreadOrReturnExistingMessageThread( messageThreadExternalId: string, dataSourceMetadata: DataSourceEntity, - manager: EntityManager, workspaceId: string, + manager: EntityManager, ) { const existingMessageChannelMessageAssociationByMessageThreadExternalId = await this.messageChannelMessageAssociationService.getFirstByMessageThreadExternalId( messageThreadExternalId, workspaceId, + manager, ); const existingMessageThread = @@ -199,49 +205,91 @@ export class MessagingUtilsService { } public async deleteMessages( - messagesDeleted: string[], + workspaceDataSource: DataSource, + messagesDeletedMessageExternalIds: string[], gmailMessageChannelId: string, workspaceId: string, ) { - const messageChannelMessageAssociationsToDelete = - await this.messageChannelMessageAssociationService.getByMessageExternalIdsAndMessageChannelId( - messagesDeleted, - gmailMessageChannelId, + await workspaceDataSource?.transaction(async (manager: EntityManager) => { + const messageChannelMessageAssociationsToDelete = + await this.messageChannelMessageAssociationService.getByMessageExternalIdsAndMessageChannelId( + messagesDeletedMessageExternalIds, + gmailMessageChannelId, + workspaceId, + manager, + ); + + const messageChannelMessageAssociationIdsToDeleteIds = + messageChannelMessageAssociationsToDelete.map( + (messageChannelMessageAssociationToDelete) => + messageChannelMessageAssociationToDelete.id, + ); + + await this.messageChannelMessageAssociationService.deleteByIds( + messageChannelMessageAssociationIdsToDeleteIds, workspaceId, + manager, ); - const messageIdsFromMessageChannelMessageAssociationsToDelete = - messageChannelMessageAssociationsToDelete.map( - (messageChannelMessageAssociationToDelete) => - messageChannelMessageAssociationToDelete.messageId, - ); + const messageIdsFromMessageChannelMessageAssociationsToDelete = + messageChannelMessageAssociationsToDelete.map( + (messageChannelMessageAssociationToDelete) => + messageChannelMessageAssociationToDelete.messageId, + ); - await this.messageChannelMessageAssociationService.deleteByMessageExternalIdsAndMessageChannelId( - messagesDeleted, - gmailMessageChannelId, - workspaceId, - ); + const messageChannelMessageAssociationByMessageIds = + await this.messageChannelMessageAssociationService.getByMessageIds( + messageIdsFromMessageChannelMessageAssociationsToDelete, + workspaceId, + manager, + ); - const messageChannelMessageAssociationByMessageIds = - await this.messageChannelMessageAssociationService.getByMessageIds( - messageIdsFromMessageChannelMessageAssociationsToDelete, + const messageIdsFromMessageChannelMessageAssociationByMessageIds = + messageChannelMessageAssociationByMessageIds.map( + (messageChannelMessageAssociation) => + messageChannelMessageAssociation.messageId, + ); + + const messageIdsToDelete = + messageIdsFromMessageChannelMessageAssociationsToDelete.filter( + (messageId) => + !messageIdsFromMessageChannelMessageAssociationByMessageIds.includes( + messageId, + ), + ); + + await this.messageService.deleteByIds( + messageIdsToDelete, workspaceId, + manager, ); - const messageIdsFromMessageChannelMessageAssociationByMessageIds = - messageChannelMessageAssociationByMessageIds.map( - (messageChannelMessageAssociation) => - messageChannelMessageAssociation.messageId, - ); + const messageThreadIdsFromMessageChannelMessageAssociationsToDelete = + messageChannelMessageAssociationsToDelete.map( + (messageChannelMessageAssociationToDelete) => + messageChannelMessageAssociationToDelete.messageThreadId, + ); - const messageIdsToDelete = - messageIdsFromMessageChannelMessageAssociationsToDelete.filter( - (messageId) => - !messageIdsFromMessageChannelMessageAssociationByMessageIds.includes( - messageId, - ), - ); + const messagesByThreadIds = + await this.messageService.getByMessageThreadIds( + messageThreadIdsFromMessageChannelMessageAssociationsToDelete, + workspaceId, + manager, + ); - await this.messageService.deleteByIds(workspaceId, messageIdsToDelete); + const threadIdsToDelete = + messageThreadIdsFromMessageChannelMessageAssociationsToDelete.filter( + (threadId) => + !messagesByThreadIds.find( + (message) => message.messageThreadId === threadId, + ), + ); + + await this.messageThreadService.deleteByIds( + threadIdsToDelete, + workspaceId, + manager, + ); + }); } } diff --git a/packages/twenty-server/src/workspace/workspace-datasource/workspace-datasource.service.ts b/packages/twenty-server/src/workspace/workspace-datasource/workspace-datasource.service.ts index 5bc48263f..350ec645a 100644 --- a/packages/twenty-server/src/workspace/workspace-datasource/workspace-datasource.service.ts +++ b/packages/twenty-server/src/workspace/workspace-datasource/workspace-datasource.service.ts @@ -1,6 +1,6 @@ import { Injectable } from '@nestjs/common'; -import { DataSource } from 'typeorm'; +import { DataSource, EntityManager } from 'typeorm'; import { DataSourceService } from 'src/metadata/data-source/data-source.service'; import { TypeORMService } from 'src/database/typeorm/typeorm.service'; @@ -77,6 +77,7 @@ export class WorkspaceDataSourceService { /** * * Get the schema name for a workspace + * Note: This is assuming that the workspace only has one schema but we should prefer querying the metadata table instead. * * @param workspaceId * @returns string @@ -106,4 +107,23 @@ export class WorkspaceDataSourceService { return `${devId ? 'twenty_' : ''}${base36String}`; } + + public async executeRawQuery( + query: string, + parameters: any[] = [], + workspaceId: string, + transactionManager?: EntityManager, + ): Promise { + if (transactionManager) { + return await transactionManager.query(query, parameters); + } + const workspaceDataSource = + await this.connectToWorkspaceDataSource(workspaceId); + + if (workspaceDataSource) { + return await workspaceDataSource.query(query, parameters); + } + + throw new Error('No data source found or transaction manager provided'); + } }