[Messaging] Delete empty threads after message deletion import (#3716)

* [Messaging] Delete empty threads after message deletion import

* fix
This commit is contained in:
Weiko
2024-02-02 18:13:41 +01:00
committed by GitHub
parent c56153cab1
commit 729e2dc651
12 changed files with 334 additions and 158 deletions

View File

@ -1,5 +1,7 @@
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service'; import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service';
import { ConnectedAccountObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/connected-account.object-metadata'; import { ConnectedAccountObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/connected-account.object-metadata';
import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record'; import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record';
@ -12,29 +14,34 @@ export class ConnectedAccountService {
public async getAll( public async getAll(
workspaceId: string, workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<ConnectedAccountObjectMetadata>[]> { ): Promise<ObjectRecord<ConnectedAccountObjectMetadata>[]> {
const { dataSource: workspaceDataSource, dataSourceMetadata } = const dataSourceSchema =
await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( this.workspaceDataSourceService.getSchemaName(workspaceId);
workspaceId,
);
return await workspaceDataSource?.query( return await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'google'`, `SELECT * FROM ${dataSourceSchema}."connectedAccount" WHERE "provider" = 'google'`,
[],
workspaceId,
transactionManager,
); );
} }
public async getByIdOrFail( public async getByIdOrFail(
connectedAccountId: string, connectedAccountId: string,
workspaceId: string, workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<ConnectedAccountObjectMetadata>> { ): Promise<ObjectRecord<ConnectedAccountObjectMetadata>> {
const { dataSource: workspaceDataSource, dataSourceMetadata } = const dataSourceSchema =
await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( this.workspaceDataSourceService.getSchemaName(workspaceId);
const connectedAccounts =
await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."connectedAccount" WHERE "id" = $1 LIMIT 1`,
[connectedAccountId],
workspaceId, workspaceId,
transactionManager,
); );
const connectedAccounts = await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "id" = $1 LIMIT 1`,
[connectedAccountId],
);
if (!connectedAccounts || connectedAccounts.length === 0) { if (!connectedAccounts || connectedAccounts.length === 0) {
throw new Error('No connected account found'); throw new Error('No connected account found');
@ -43,19 +50,37 @@ export class ConnectedAccountService {
return connectedAccounts[0]; return connectedAccounts[0];
} }
public async saveLastSyncHistoryId( public async updateLastSyncHistoryId(
historyId: string, historyId: string,
connectedAccountId: string, connectedAccountId: string,
workspaceId: string, workspaceId: string,
transactionManager?: EntityManager,
) { ) {
const { dataSource: workspaceDataSource, dataSourceMetadata } = const dataSourceSchema =
await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( this.workspaceDataSourceService.getSchemaName(workspaceId);
workspaceId,
);
await workspaceDataSource?.query( await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceMetadata.schema}."connectedAccount" SET "lastSyncHistoryId" = $1 WHERE "id" = $2`, `UPDATE ${dataSourceSchema}."connectedAccount" SET "lastSyncHistoryId" = $1 WHERE "id" = $2`,
[historyId, connectedAccountId], [historyId, connectedAccountId],
workspaceId,
transactionManager,
);
}
public async updateAccessToken(
accessToken: string,
connectedAccountId: string,
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."connectedAccount" SET "accessToken" = $1 WHERE "id" = $2`,
[accessToken, connectedAccountId],
workspaceId,
transactionManager,
); );
} }
} }

View File

@ -1,5 +1,7 @@
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service'; import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service';
import { MessageChannelMessageAssociationObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/message-channel-message-association.object-metadata'; import { MessageChannelMessageAssociationObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/message-channel-message-association.object-metadata';
import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record'; import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record';
@ -14,16 +16,17 @@ export class MessageChannelMessageAssociationService {
messageExternalIds: string[], messageExternalIds: string[],
messageChannelId: string, messageChannelId: string,
workspaceId: string, workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<MessageChannelMessageAssociationObjectMetadata>[]> { ): Promise<ObjectRecord<MessageChannelMessageAssociationObjectMetadata>[]> {
const { dataSource: workspaceDataSource, dataSourceMetadata } = const dataSourceSchema =
await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( this.workspaceDataSourceService.getSchemaName(workspaceId);
workspaceId,
);
return await workspaceDataSource?.query( return await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation" `SELECT * FROM ${dataSourceSchema}."messageChannelMessageAssociation"
WHERE "messageExternalId" = ANY($1) AND "messageChannelId" = $2`, WHERE "messageExternalId" = ANY($1) AND "messageChannelId" = $2`,
[messageExternalIds, messageChannelId], [messageExternalIds, messageChannelId],
workspaceId,
transactionManager,
); );
} }
@ -31,16 +34,17 @@ export class MessageChannelMessageAssociationService {
messageExternalIds: string[], messageExternalIds: string[],
messageChannelId: string, messageChannelId: string,
workspaceId: string, workspaceId: string,
transactionManager?: EntityManager,
): Promise<number> { ): Promise<number> {
const { dataSource: workspaceDataSource, dataSourceMetadata } = const dataSourceSchema =
await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( this.workspaceDataSourceService.getSchemaName(workspaceId);
workspaceId,
);
const result = await workspaceDataSource?.query( const result = await this.workspaceDataSourceService.executeRawQuery(
`SELECT COUNT(*) FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation" `SELECT COUNT(*) FROM ${dataSourceSchema}."messageChannelMessageAssociation"
WHERE "messageExternalId" = ANY($1) AND "messageChannelId" = $2`, WHERE "messageExternalId" = ANY($1) AND "messageChannelId" = $2`,
[messageExternalIds, messageChannelId], [messageExternalIds, messageChannelId],
workspaceId,
transactionManager,
); );
return result[0]?.count; return result[0]?.count;
@ -50,42 +54,62 @@ export class MessageChannelMessageAssociationService {
messageExternalIds: string[], messageExternalIds: string[],
messageChannelId: string, messageChannelId: string,
workspaceId: string, workspaceId: string,
transactionManager?: EntityManager,
) { ) {
const { dataSource: workspaceDataSource, dataSourceMetadata } = const dataSourceSchema =
await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( this.workspaceDataSourceService.getSchemaName(workspaceId);
workspaceId,
);
await workspaceDataSource?.query( await this.workspaceDataSourceService.executeRawQuery(
`DELETE FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation" WHERE "messageExternalId" = ANY($1) AND "messageChannelId" = $2`, `DELETE FROM ${dataSourceSchema}."messageChannelMessageAssociation" WHERE "messageExternalId" = ANY($1) AND "messageChannelId" = $2`,
[messageExternalIds, messageChannelId], [messageExternalIds, messageChannelId],
workspaceId,
transactionManager,
);
}
public async deleteByIds(
ids: string[],
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`DELETE FROM ${dataSourceSchema}."messageChannelMessageAssociation" WHERE "id" = ANY($1)`,
[ids],
workspaceId,
transactionManager,
); );
} }
public async getByMessageThreadExternalIds( public async getByMessageThreadExternalIds(
messageThreadExternalIds: string[], messageThreadExternalIds: string[],
workspaceId: string, workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<MessageChannelMessageAssociationObjectMetadata>[]> { ): Promise<ObjectRecord<MessageChannelMessageAssociationObjectMetadata>[]> {
const { dataSource: workspaceDataSource, dataSourceMetadata } = const dataSourceSchema =
await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( this.workspaceDataSourceService.getSchemaName(workspaceId);
workspaceId,
);
return await workspaceDataSource?.query( return await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation" `SELECT * FROM ${dataSourceSchema}."messageChannelMessageAssociation"
WHERE "messageThreadExternalId" = ANY($1)`, WHERE "messageThreadExternalId" = ANY($1)`,
[messageThreadExternalIds], [messageThreadExternalIds],
workspaceId,
transactionManager,
); );
} }
public async getFirstByMessageThreadExternalId( public async getFirstByMessageThreadExternalId(
messageThreadExternalId: string, messageThreadExternalId: string,
workspaceId: string, workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<MessageChannelMessageAssociationObjectMetadata> | null> { ): Promise<ObjectRecord<MessageChannelMessageAssociationObjectMetadata> | null> {
const existingMessageChannelMessageAssociations = const existingMessageChannelMessageAssociations =
await this.getByMessageThreadExternalIds( await this.getByMessageThreadExternalIds(
[messageThreadExternalId], [messageThreadExternalId],
workspaceId, workspaceId,
transactionManager,
); );
if ( if (
@ -101,16 +125,17 @@ export class MessageChannelMessageAssociationService {
public async getByMessageIds( public async getByMessageIds(
messageIds: string[], messageIds: string[],
workspaceId: string, workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<MessageChannelMessageAssociationObjectMetadata>[]> { ): Promise<ObjectRecord<MessageChannelMessageAssociationObjectMetadata>[]> {
const { dataSource: workspaceDataSource, dataSourceMetadata } = const dataSourceSchema =
await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( this.workspaceDataSourceService.getSchemaName(workspaceId);
workspaceId,
);
return await workspaceDataSource?.query( return await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation" `SELECT * FROM ${dataSourceSchema}."messageChannelMessageAssociation"
WHERE "messageId" = ANY($1)`, WHERE "messageId" = ANY($1)`,
[messageIds], [messageIds],
workspaceId,
transactionManager,
); );
} }
} }

View File

@ -1,5 +1,7 @@
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service'; import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service';
import { MessageChannelObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/message-channel.object-metadata'; import { MessageChannelObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/message-channel.object-metadata';
import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record'; import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record';
@ -11,27 +13,28 @@ export class MessageChannelService {
) {} ) {}
public async getByConnectedAccountId( public async getByConnectedAccountId(
workspaceId: string,
connectedAccountId: string, connectedAccountId: string,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<MessageChannelObjectMetadata>[]> { ): Promise<ObjectRecord<MessageChannelObjectMetadata>[]> {
const { dataSource: workspaceDataSource, dataSourceMetadata } = const dataSourceSchema =
await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( this.workspaceDataSourceService.getSchemaName(workspaceId);
workspaceId,
);
return await workspaceDataSource?.query( return await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceMetadata.schema}."messageChannel" WHERE "connectedAccountId" = $1 AND "type" = 'email' LIMIT 1`, `SELECT * FROM ${dataSourceSchema}."messageChannel" WHERE "connectedAccountId" = $1 AND "type" = 'email' LIMIT 1`,
[connectedAccountId], [connectedAccountId],
workspaceId,
transactionManager,
); );
} }
public async getFirstByConnectedAccountIdOrFail( public async getFirstByConnectedAccountIdOrFail(
workspaceId: string,
connectedAccountId: string, connectedAccountId: string,
workspaceId: string,
): Promise<ObjectRecord<MessageChannelObjectMetadata>> { ): Promise<ObjectRecord<MessageChannelObjectMetadata>> {
const messageChannels = await this.getByConnectedAccountId( const messageChannels = await this.getByConnectedAccountId(
workspaceId,
connectedAccountId, connectedAccountId,
workspaceId,
); );
if (!messageChannels || messageChannels.length === 0) { if (!messageChannels || messageChannels.length === 0) {

View File

@ -0,0 +1,11 @@
import { Module } from '@nestjs/common';
import { MessageThreadService } from 'src/workspace/messaging/message-thread/message-thread.service';
import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/workspace-datasource.module';
@Module({
imports: [WorkspaceDataSourceModule],
providers: [MessageThreadService],
exports: [MessageThreadService],
})
export class MessageThreadModule {}

View File

@ -0,0 +1,28 @@
import { Injectable } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service';
@Injectable()
export class MessageThreadService {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
) {}
public async deleteByIds(
messageThreadIds: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`DELETE FROM ${dataSourceSchema}."messageThread" WHERE id = ANY($1)`,
[messageThreadIds],
workspaceId,
transactionManager,
);
}
}

View File

@ -1,5 +1,7 @@
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service'; import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service';
import { MessageObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/message.object-metadata'; import { MessageObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/message.object-metadata';
import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record'; import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record';
@ -11,17 +13,18 @@ export class MessageService {
) {} ) {}
public async getFirstByHeaderMessageId( public async getFirstByHeaderMessageId(
workspaceId: string,
headerMessageId: string, headerMessageId: string,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<MessageObjectMetadata> | null> { ): Promise<ObjectRecord<MessageObjectMetadata> | null> {
const { dataSource: workspaceDataSource, dataSourceMetadata } = const dataSourceSchema =
await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( this.workspaceDataSourceService.getSchemaName(workspaceId);
workspaceId,
);
const messages = await workspaceDataSource?.query( const messages = await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceMetadata.schema}."message" WHERE "headerMessageId" = $1 LIMIT 1`, `SELECT * FROM ${dataSourceSchema}."message" WHERE "headerMessageId" = $1 LIMIT 1`,
[headerMessageId], [headerMessageId],
workspaceId,
transactionManager,
); );
if (!messages || messages.length === 0) { if (!messages || messages.length === 0) {
@ -32,32 +35,50 @@ export class MessageService {
} }
public async getByIds( public async getByIds(
workspaceId: string,
messageIds: string[], messageIds: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<MessageObjectMetadata>[]> { ): Promise<ObjectRecord<MessageObjectMetadata>[]> {
const { dataSource: workspaceDataSource, dataSourceMetadata } = const dataSourceSchema =
await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( this.workspaceDataSourceService.getSchemaName(workspaceId);
workspaceId,
);
return await workspaceDataSource?.query( return await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceMetadata.schema}."message" WHERE "id" = ANY($1)`, `SELECT * FROM ${dataSourceSchema}."message" WHERE "id" = ANY($1)`,
[messageIds], [messageIds],
workspaceId,
transactionManager,
); );
} }
public async deleteByIds( public async deleteByIds(
workspaceId: string,
messageIds: string[], messageIds: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> { ): Promise<void> {
const { dataSource: workspaceDataSource, dataSourceMetadata } = const dataSourceSchema =
await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( this.workspaceDataSourceService.getSchemaName(workspaceId);
workspaceId,
);
await workspaceDataSource?.query( await this.workspaceDataSourceService.executeRawQuery(
`DELETE FROM ${dataSourceMetadata.schema}."message" WHERE "id" = ANY($1)`, `DELETE FROM ${dataSourceSchema}."message" WHERE "id" = ANY($1)`,
[messageIds], [messageIds],
workspaceId,
transactionManager,
);
}
public async getByMessageThreadIds(
messageThreadIds: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<MessageObjectMetadata>[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
return await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."message" WHERE "messageThreadId" = ANY($1)`,
[messageThreadIds],
workspaceId,
transactionManager,
); );
} }
} }

View File

@ -4,6 +4,7 @@ import { EnvironmentModule } from 'src/integrations/environment/environment.modu
import { ConnectedAccountModule } from 'src/workspace/messaging/connected-account/connected-account.module'; import { ConnectedAccountModule } from 'src/workspace/messaging/connected-account/connected-account.module';
import { MessageChannelMessageAssociationModule } from 'src/workspace/messaging/message-channel-message-association/message-channel-message-assocation.module'; import { MessageChannelMessageAssociationModule } from 'src/workspace/messaging/message-channel-message-association/message-channel-message-assocation.module';
import { MessageChannelModule } from 'src/workspace/messaging/message-channel/message-channel.module'; import { MessageChannelModule } from 'src/workspace/messaging/message-channel/message-channel.module';
import { MessageThreadModule } from 'src/workspace/messaging/message-thread/message-thread.module';
import { MessageModule } from 'src/workspace/messaging/message/message.module'; import { MessageModule } from 'src/workspace/messaging/message/message.module';
import { GmailClientProvider } from 'src/workspace/messaging/providers/gmail/gmail-client.provider'; import { GmailClientProvider } from 'src/workspace/messaging/providers/gmail/gmail-client.provider';
import { FetchMessagesByBatchesService } from 'src/workspace/messaging/services/fetch-messages-by-batches.service'; import { FetchMessagesByBatchesService } from 'src/workspace/messaging/services/fetch-messages-by-batches.service';
@ -21,6 +22,7 @@ import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/wo
MessageChannelModule, MessageChannelModule,
MessageChannelMessageAssociationModule, MessageChannelMessageAssociationModule,
MessageModule, MessageModule,
MessageThreadModule,
], ],
providers: [ providers: [
GmailFullSyncService, GmailFullSyncService,

View File

@ -52,8 +52,8 @@ export class GmailFullSyncService {
const gmailMessageChannel = const gmailMessageChannel =
await this.messageChannelService.getFirstByConnectedAccountIdOrFail( await this.messageChannelService.getFirstByConnectedAccountIdOrFail(
workspaceId,
connectedAccountId, connectedAccountId,
workspaceId,
); );
const gmailMessageChannelId = gmailMessageChannel.id; const gmailMessageChannelId = gmailMessageChannel.id;
@ -129,7 +129,7 @@ export class GmailFullSyncService {
if (!historyId) throw new Error('No history id found'); if (!historyId) throw new Error('No history id found');
await this.connectedAccountService.saveLastSyncHistoryId( await this.connectedAccountService.updateLastSyncHistoryId(
historyId, historyId,
connectedAccount.id, connectedAccount.id,
workspaceId, workspaceId,

View File

@ -82,8 +82,8 @@ export class GmailPartialSyncService {
const gmailMessageChannel = const gmailMessageChannel =
await this.messageChannelService.getFirstByConnectedAccountIdOrFail( await this.messageChannelService.getFirstByConnectedAccountIdOrFail(
workspaceId,
connectedAccountId, connectedAccountId,
workspaceId,
); );
const gmailMessageChannelId = gmailMessageChannel.id; const gmailMessageChannelId = gmailMessageChannel.id;
@ -100,24 +100,29 @@ export class GmailPartialSyncService {
accessToken, accessToken,
); );
await this.utils.saveMessages( if (messagesToSave.length !== 0) {
messagesToSave, await this.utils.saveMessages(
dataSourceMetadata, messagesToSave,
workspaceDataSource, dataSourceMetadata,
connectedAccount, workspaceDataSource,
gmailMessageChannelId, connectedAccount,
workspaceId, gmailMessageChannelId,
); workspaceId,
);
}
await this.utils.deleteMessages( if (messagesDeleted.length !== 0) {
messagesDeleted, await this.utils.deleteMessages(
gmailMessageChannelId, workspaceDataSource,
workspaceId, messagesDeleted,
); gmailMessageChannelId,
workspaceId,
);
}
if (errors.length) throw new Error('Error fetching messages'); if (errors.length) throw new Error('Error fetching messages');
await this.connectedAccountService.saveLastSyncHistoryId( await this.connectedAccountService.updateLastSyncHistoryId(
newHistoryId, newHistoryId,
connectedAccount.id, connectedAccount.id,
workspaceId, workspaceId,

View File

@ -3,38 +3,25 @@ import { Injectable } from '@nestjs/common';
import axios from 'axios'; import axios from 'axios';
import { EnvironmentService } from 'src/integrations/environment/environment.service'; import { EnvironmentService } from 'src/integrations/environment/environment.service';
import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service'; import { ConnectedAccountService } from 'src/workspace/messaging/connected-account/connected-account.service';
@Injectable() @Injectable()
export class GmailRefreshAccessTokenService { export class GmailRefreshAccessTokenService {
constructor( constructor(
private readonly environmentService: EnvironmentService, private readonly environmentService: EnvironmentService,
private readonly workspaceDataSourceService: WorkspaceDataSourceService, private readonly connectedAccountService: ConnectedAccountService,
) {} ) {}
async refreshAndSaveAccessToken( async refreshAndSaveAccessToken(
workspaceId: string, workspaceId: string,
connectedAccountId: string, connectedAccountId: string,
): Promise<void> { ): Promise<void> {
const { dataSource: workspaceDataSource, dataSourceMetadata } = const connectedAccount = await this.connectedAccountService.getByIdOrFail(
await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( connectedAccountId,
workspaceId, workspaceId,
);
if (!workspaceDataSource) {
throw new Error('No workspace data source found');
}
const connectedAccounts = await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'google' AND "id" = $1`,
[connectedAccountId],
); );
if (!connectedAccounts || connectedAccounts.length === 0) { const refreshToken = connectedAccount.refreshToken;
throw new Error('No connected account found');
}
const refreshToken = connectedAccounts[0]?.refreshToken;
if (!refreshToken) { if (!refreshToken) {
throw new Error('No refresh token found'); throw new Error('No refresh token found');
@ -42,9 +29,10 @@ export class GmailRefreshAccessTokenService {
const accessToken = await this.refreshAccessToken(refreshToken); const accessToken = await this.refreshAccessToken(refreshToken);
await workspaceDataSource?.query( await this.connectedAccountService.updateAccessToken(
`UPDATE ${dataSourceMetadata.schema}."connectedAccount" SET "accessToken" = $1 WHERE "id" = $2`, accessToken,
[accessToken, connectedAccounts[0].id], connectedAccountId,
workspaceId,
); );
} }

View File

@ -11,12 +11,16 @@ import {
import { MessageQuery } from 'src/workspace/messaging/types/message-or-thread-query'; import { MessageQuery } from 'src/workspace/messaging/types/message-or-thread-query';
import { MessageChannelMessageAssociationService } from 'src/workspace/messaging/message-channel-message-association/message-channel-message-association.service'; import { MessageChannelMessageAssociationService } from 'src/workspace/messaging/message-channel-message-association/message-channel-message-association.service';
import { MessageService } from 'src/workspace/messaging/message/message.service'; import { MessageService } from 'src/workspace/messaging/message/message.service';
import { MessageThreadService } from 'src/workspace/messaging/message-thread/message-thread.service';
import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record';
import { ConnectedAccountObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/connected-account.object-metadata';
@Injectable() @Injectable()
export class MessagingUtilsService { export class MessagingUtilsService {
constructor( constructor(
private readonly messageChannelMessageAssociationService: MessageChannelMessageAssociationService, private readonly messageChannelMessageAssociationService: MessageChannelMessageAssociationService,
private readonly messageService: MessageService, private readonly messageService: MessageService,
private readonly messageThreadService: MessageThreadService,
) {} ) {}
public createQueriesFromMessageIds( public createQueriesFromMessageIds(
@ -31,17 +35,18 @@ export class MessagingUtilsService {
messages: GmailMessage[], messages: GmailMessage[],
dataSourceMetadata: DataSourceEntity, dataSourceMetadata: DataSourceEntity,
workspaceDataSource: DataSource, workspaceDataSource: DataSource,
connectedAccount, connectedAccount: ObjectRecord<ConnectedAccountObjectMetadata>,
gmailMessageChannelId: string, gmailMessageChannelId: string,
workspaceId: string, workspaceId: string,
) { ) {
for (const message of messages) { for (const message of messages) {
await workspaceDataSource?.transaction(async (manager) => { await workspaceDataSource?.transaction(async (manager: EntityManager) => {
const existingMessageChannelMessageAssociationsCount = const existingMessageChannelMessageAssociationsCount =
await this.messageChannelMessageAssociationService.countByMessageExternalIdsAndMessageChannelId( await this.messageChannelMessageAssociationService.countByMessageExternalIdsAndMessageChannelId(
[message.externalId], [message.externalId],
gmailMessageChannelId, gmailMessageChannelId,
workspaceId, workspaceId,
manager,
); );
if (existingMessageChannelMessageAssociationsCount > 0) { if (existingMessageChannelMessageAssociationsCount > 0) {
@ -52,8 +57,8 @@ export class MessagingUtilsService {
await this.saveMessageThreadOrReturnExistingMessageThread( await this.saveMessageThreadOrReturnExistingMessageThread(
message.messageThreadExternalId, message.messageThreadExternalId,
dataSourceMetadata, dataSourceMetadata,
manager,
workspaceId, workspaceId,
manager,
); );
const savedOrExistingMessageId = const savedOrExistingMessageId =
@ -62,8 +67,8 @@ export class MessagingUtilsService {
savedOrExistingMessageThreadId, savedOrExistingMessageThreadId,
connectedAccount, connectedAccount,
dataSourceMetadata, dataSourceMetadata,
manager,
workspaceId, workspaceId,
manager,
); );
await manager.query( await manager.query(
@ -83,14 +88,14 @@ export class MessagingUtilsService {
private async saveMessageOrReturnExistingMessage( private async saveMessageOrReturnExistingMessage(
message: GmailMessage, message: GmailMessage,
messageThreadId: string, messageThreadId: string,
connectedAccount, connectedAccount: ObjectRecord<ConnectedAccountObjectMetadata>,
dataSourceMetadata: DataSourceEntity, dataSourceMetadata: DataSourceEntity,
manager: EntityManager,
workspaceId: string, workspaceId: string,
manager: EntityManager,
): Promise<string> { ): Promise<string> {
const existingMessage = await this.messageService.getFirstByHeaderMessageId( const existingMessage = await this.messageService.getFirstByHeaderMessageId(
workspaceId,
message.headerMessageId, message.headerMessageId,
workspaceId,
); );
const existingMessageId = existingMessage?.id; const existingMessageId = existingMessage?.id;
@ -132,13 +137,14 @@ export class MessagingUtilsService {
private async saveMessageThreadOrReturnExistingMessageThread( private async saveMessageThreadOrReturnExistingMessageThread(
messageThreadExternalId: string, messageThreadExternalId: string,
dataSourceMetadata: DataSourceEntity, dataSourceMetadata: DataSourceEntity,
manager: EntityManager,
workspaceId: string, workspaceId: string,
manager: EntityManager,
) { ) {
const existingMessageChannelMessageAssociationByMessageThreadExternalId = const existingMessageChannelMessageAssociationByMessageThreadExternalId =
await this.messageChannelMessageAssociationService.getFirstByMessageThreadExternalId( await this.messageChannelMessageAssociationService.getFirstByMessageThreadExternalId(
messageThreadExternalId, messageThreadExternalId,
workspaceId, workspaceId,
manager,
); );
const existingMessageThread = const existingMessageThread =
@ -199,49 +205,91 @@ export class MessagingUtilsService {
} }
public async deleteMessages( public async deleteMessages(
messagesDeleted: string[], workspaceDataSource: DataSource,
messagesDeletedMessageExternalIds: string[],
gmailMessageChannelId: string, gmailMessageChannelId: string,
workspaceId: string, workspaceId: string,
) { ) {
const messageChannelMessageAssociationsToDelete = await workspaceDataSource?.transaction(async (manager: EntityManager) => {
await this.messageChannelMessageAssociationService.getByMessageExternalIdsAndMessageChannelId( const messageChannelMessageAssociationsToDelete =
messagesDeleted, await this.messageChannelMessageAssociationService.getByMessageExternalIdsAndMessageChannelId(
gmailMessageChannelId, messagesDeletedMessageExternalIds,
gmailMessageChannelId,
workspaceId,
manager,
);
const messageChannelMessageAssociationIdsToDeleteIds =
messageChannelMessageAssociationsToDelete.map(
(messageChannelMessageAssociationToDelete) =>
messageChannelMessageAssociationToDelete.id,
);
await this.messageChannelMessageAssociationService.deleteByIds(
messageChannelMessageAssociationIdsToDeleteIds,
workspaceId, workspaceId,
manager,
); );
const messageIdsFromMessageChannelMessageAssociationsToDelete = const messageIdsFromMessageChannelMessageAssociationsToDelete =
messageChannelMessageAssociationsToDelete.map( messageChannelMessageAssociationsToDelete.map(
(messageChannelMessageAssociationToDelete) => (messageChannelMessageAssociationToDelete) =>
messageChannelMessageAssociationToDelete.messageId, messageChannelMessageAssociationToDelete.messageId,
); );
await this.messageChannelMessageAssociationService.deleteByMessageExternalIdsAndMessageChannelId( const messageChannelMessageAssociationByMessageIds =
messagesDeleted, await this.messageChannelMessageAssociationService.getByMessageIds(
gmailMessageChannelId, messageIdsFromMessageChannelMessageAssociationsToDelete,
workspaceId, workspaceId,
); manager,
);
const messageChannelMessageAssociationByMessageIds = const messageIdsFromMessageChannelMessageAssociationByMessageIds =
await this.messageChannelMessageAssociationService.getByMessageIds( messageChannelMessageAssociationByMessageIds.map(
messageIdsFromMessageChannelMessageAssociationsToDelete, (messageChannelMessageAssociation) =>
messageChannelMessageAssociation.messageId,
);
const messageIdsToDelete =
messageIdsFromMessageChannelMessageAssociationsToDelete.filter(
(messageId) =>
!messageIdsFromMessageChannelMessageAssociationByMessageIds.includes(
messageId,
),
);
await this.messageService.deleteByIds(
messageIdsToDelete,
workspaceId, workspaceId,
manager,
); );
const messageIdsFromMessageChannelMessageAssociationByMessageIds = const messageThreadIdsFromMessageChannelMessageAssociationsToDelete =
messageChannelMessageAssociationByMessageIds.map( messageChannelMessageAssociationsToDelete.map(
(messageChannelMessageAssociation) => (messageChannelMessageAssociationToDelete) =>
messageChannelMessageAssociation.messageId, messageChannelMessageAssociationToDelete.messageThreadId,
); );
const messageIdsToDelete = const messagesByThreadIds =
messageIdsFromMessageChannelMessageAssociationsToDelete.filter( await this.messageService.getByMessageThreadIds(
(messageId) => messageThreadIdsFromMessageChannelMessageAssociationsToDelete,
!messageIdsFromMessageChannelMessageAssociationByMessageIds.includes( workspaceId,
messageId, manager,
), );
);
await this.messageService.deleteByIds(workspaceId, messageIdsToDelete); const threadIdsToDelete =
messageThreadIdsFromMessageChannelMessageAssociationsToDelete.filter(
(threadId) =>
!messagesByThreadIds.find(
(message) => message.messageThreadId === threadId,
),
);
await this.messageThreadService.deleteByIds(
threadIdsToDelete,
workspaceId,
manager,
);
});
} }
} }

View File

@ -1,6 +1,6 @@
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { DataSource } from 'typeorm'; import { DataSource, EntityManager } from 'typeorm';
import { DataSourceService } from 'src/metadata/data-source/data-source.service'; import { DataSourceService } from 'src/metadata/data-source/data-source.service';
import { TypeORMService } from 'src/database/typeorm/typeorm.service'; import { TypeORMService } from 'src/database/typeorm/typeorm.service';
@ -77,6 +77,7 @@ export class WorkspaceDataSourceService {
/** /**
* *
* Get the schema name for a workspace * Get the schema name for a workspace
* Note: This is assuming that the workspace only has one schema but we should prefer querying the metadata table instead.
* *
* @param workspaceId * @param workspaceId
* @returns string * @returns string
@ -106,4 +107,23 @@ export class WorkspaceDataSourceService {
return `${devId ? 'twenty_' : ''}${base36String}`; return `${devId ? 'twenty_' : ''}${base36String}`;
} }
public async executeRawQuery(
query: string,
parameters: any[] = [],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<any> {
if (transactionManager) {
return await transactionManager.query(query, parameters);
}
const workspaceDataSource =
await this.connectToWorkspaceDataSource(workspaceId);
if (workspaceDataSource) {
return await workspaceDataSource.query(query, parameters);
}
throw new Error('No data source found or transaction manager provided');
}
} }