From 54c1d245ab4e0de945529cba6196e9b707747026 Mon Sep 17 00:00:00 2001 From: bosiraphael <71827178+bosiraphael@users.noreply.github.com> Date: Thu, 4 Jan 2024 13:36:37 +0100 Subject: [PATCH] 3218 make the function fetchworkspacememberthreads idempotent (#3230) * wip * fetch only the messages which are not in the db * fetch only the messages and threads which are not in the db * fix bugs * merge * remove eslint-plugins-twenty * get saved message thread ids and message ids at the same time --- .../fetch-workspace-messages.service.ts | 110 ++++++++++++++---- 1 file changed, 87 insertions(+), 23 deletions(-) diff --git a/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.service.ts b/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.service.ts index 46bbed59b..268b3fa2a 100644 --- a/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.service.ts @@ -39,29 +39,14 @@ export class FetchWorkspaceMessagesService { workspaceMemberId: string, maxResults = 500, ): Promise { - const dataSourceMetadata = - await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail( + const { workspaceDataSource, dataSourceMetadata, connectedAccount } = + await this.getDataSourceMetadataWorkspaceMetadataAndConnectedAccount( workspaceId, + workspaceMemberId, ); - const workspaceDataSource = - await this.typeORMService.connectToDataSource(dataSourceMetadata); - - if (!workspaceDataSource) { - throw new Error('No workspace data source found'); - } - - const connectedAccounts = await workspaceDataSource?.query( - `SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'gmail' AND "accountOwnerId" = $1`, - [workspaceMemberId], - ); - - if (!connectedAccounts || connectedAccounts.length === 0) { - throw new Error('No connected account found'); - } - - const accessToken = connectedAccounts[0]?.accessToken; - const refreshToken = connectedAccounts[0]?.refreshToken; + const accessToken = connectedAccount.accessToken; + const refreshToken = connectedAccount.refreshToken; if (!refreshToken) { throw new Error('No refresh token found'); @@ -80,11 +65,22 @@ export class FetchWorkspaceMessagesService { return; } + const { savedMessageIds, savedThreadIds } = + await this.getAllSavedMessagesIdsAndMessageThreadsIdsForConnectedAccount( + dataSourceMetadata, + workspaceDataSource, + connectedAccount.id, + ); + + const threadsToSave = threadsData.filter( + (thread) => thread.id && !savedThreadIds.includes(thread.id), + ); + await this.saveMessageThreads( - threadsData, + threadsToSave, dataSourceMetadata, workspaceDataSource, - connectedAccounts[0].id, + connectedAccount.id, ); const threadQueries: MessageOrThreadQuery[] = threadsData.map((thread) => ({ @@ -101,7 +97,11 @@ export class FetchWorkspaceMessagesService { .map((thread) => thread.messageIds) .flat(); - const messageQueries: MessageOrThreadQuery[] = messageIds.map( + const messageIdsToSave = messageIds.filter( + (messageId) => !savedMessageIds.includes(messageId), + ); + + const messageQueries: MessageOrThreadQuery[] = messageIdsToSave.map( (messageId) => ({ uri: '/gmail/v1/users/me/messages/' + messageId + '?format=RAW', }), @@ -224,4 +224,68 @@ export class FetchWorkspaceMessagesService { }); } } + + async getAllSavedMessagesIdsAndMessageThreadsIdsForConnectedAccount( + dataSourceMetadata: DataSourceEntity, + workspaceDataSource: DataSource, + connectedAccountId: string, + ): Promise<{ + savedMessageIds: string[]; + savedThreadIds: string[]; + }> { + const messageIds: { messageId: string; messageThreadId: string }[] = + await workspaceDataSource?.query( + `SELECT message."externalId" AS "messageId", + "messageThread"."externalId" AS "messageThreadId" + FROM ${dataSourceMetadata.schema}."message" message + LEFT JOIN ${dataSourceMetadata.schema}."messageThread" "messageThread" ON message."messageThreadId" = "messageThread"."id" + LEFT JOIN ${dataSourceMetadata.schema}."messageChannel" ON "messageThread"."messageChannelId" = ${dataSourceMetadata.schema}."messageChannel"."id" + WHERE ${dataSourceMetadata.schema}."messageChannel"."connectedAccountId" = $1`, + [connectedAccountId], + ); + + return { + savedMessageIds: messageIds.map((message) => message.messageId), + savedThreadIds: [ + ...new Set(messageIds.map((message) => message.messageThreadId)), + ], + }; + } + + async getDataSourceMetadataWorkspaceMetadataAndConnectedAccount( + workspaceId: string, + workspaceMemberId: string, + ): Promise<{ + dataSourceMetadata: DataSourceEntity; + workspaceDataSource: DataSource; + connectedAccount: any; + }> { + const dataSourceMetadata = + await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail( + workspaceId, + ); + + const workspaceDataSource = await this.typeORMService.connectToDataSource( + dataSourceMetadata, + ); + + if (!workspaceDataSource) { + throw new Error('No workspace data source found'); + } + + const connectedAccounts = await workspaceDataSource?.query( + `SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'gmail' AND "accountOwnerId" = $1`, + [workspaceMemberId], + ); + + if (!connectedAccounts || connectedAccounts.length === 0) { + throw new Error('No connected account found'); + } + + return { + dataSourceMetadata, + workspaceDataSource, + connectedAccount: connectedAccounts[0], + }; + } }