diff --git a/packages/twenty-server/src/workspace/messaging/services/fetch-batch-messages.service.ts b/packages/twenty-server/src/workspace/messaging/services/fetch-batch-messages.service.ts index 43066e98e..c1b4d737a 100644 --- a/packages/twenty-server/src/workspace/messaging/services/fetch-batch-messages.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/fetch-batch-messages.service.ts @@ -4,8 +4,10 @@ import axios, { AxiosInstance, AxiosResponse } from 'axios'; import { simpleParser } from 'mailparser'; import { GmailMessage } from 'src/workspace/messaging/types/gmailMessage'; -import { MessageQuery } from 'src/workspace/messaging/types/messageQuery'; -import { GmailParsedResponse } from 'src/workspace/messaging/types/gmailParsedResponse'; +import { MessageOrThreadQuery } from 'src/workspace/messaging/types/messageOrThreadQuery'; +import { GmailMessageParsedResponse } from 'src/workspace/messaging/types/gmailMessageParsedResponse'; +import { GmailThreadParsedResponse } from 'src/workspace/messaging/types/gmailThreadParsedResponse'; +import { GmailThread } from 'src/workspace/messaging/types/gmailThread'; @Injectable() export class FetchBatchMessagesService { @@ -17,60 +19,95 @@ export class FetchBatchMessagesService { }); } - async fetchAllByBatches( - messageQueries: MessageQuery[], + async fetchAllMessages( + queries: MessageOrThreadQuery[], accessToken: string, ): Promise { - const batchLimit = 100; + const batchResponses = await this.fetchAllByBatches( + queries, + accessToken, + 'batch_gmail_messages', + ); - let batchOffset = 0; - - let messages: GmailMessage[] = []; - - while (batchOffset < messageQueries.length) { - const batchResponse = await this.fetchBatch( - messageQueries, - accessToken, - batchOffset, - batchLimit, - ); - - messages = messages.concat(batchResponse); - - batchOffset += batchLimit; - } + const messages = await this.formatBatchResponsesAsGmailMessages( + batchResponses, + ); return messages; } + async fetchAllThreads( + queries: MessageOrThreadQuery[], + accessToken: string, + ): Promise { + const batchResponses = await this.fetchAllByBatches( + queries, + accessToken, + 'batch_gmail_threads', + ); + + const threads = await this.formatBatchResponsesAsGmailThreads( + batchResponses, + ); + + return threads; + } + + async fetchAllByBatches( + queries: MessageOrThreadQuery[], + accessToken: string, + boundary: string, + ): Promise[]> { + const batchLimit = 100; + + let batchOffset = 0; + + let batchResponses: AxiosResponse[] = []; + + while (batchOffset < queries.length) { + const batchResponse = await this.fetchBatch( + queries, + accessToken, + batchOffset, + batchLimit, + boundary, + ); + + batchResponses = batchResponses.concat(batchResponse); + + batchOffset += batchLimit; + } + + return batchResponses; + } + async fetchBatch( - messageQueries: MessageQuery[], + queries: MessageOrThreadQuery[], accessToken: string, batchOffset: number, batchLimit: number, - ): Promise { - const limitedMessageQueries = messageQueries.slice( - batchOffset, - batchOffset + batchLimit, - ); + boundary: string, + ): Promise> { + const limitedQueries = queries.slice(batchOffset, batchOffset + batchLimit); const response = await this.httpService.post( '/', - this.createBatchBody(limitedMessageQueries, 'batch_gmail_messages'), + this.createBatchBody(limitedQueries, boundary), { headers: { - 'Content-Type': 'multipart/mixed; boundary=batch_gmail_messages', + 'Content-Type': 'multipart/mixed; boundary=' + boundary, Authorization: 'Bearer ' + accessToken, }, }, ); - const formattedResponse = await this.formatBatchResponse(response); - - return formattedResponse; + return response; } - createBatchBody(messageQueries: MessageQuery[], boundary: string): string { + createBatchBody( + messageQueries: MessageOrThreadQuery[], + boundary: string, + ): string { let batchBody: string[] = []; messageQueries.forEach(function (call) { @@ -96,8 +133,10 @@ export class FetchBatchMessagesService { parseBatch( responseCollection: AxiosResponse, - ): GmailParsedResponse[] { - const responseItems: GmailParsedResponse[] = []; + ): GmailMessageParsedResponse[] | GmailThreadParsedResponse[] { + const responseItems: + | GmailMessageParsedResponse[] + | GmailThreadParsedResponse[] = []; const boundary = this.getBatchSeparator(responseCollection); @@ -121,8 +160,8 @@ export class FetchBatchMessagesService { return responseItems; } - getBatchSeparator(response: AxiosResponse): string { - const headers = response.headers; + getBatchSeparator(responseCollection: AxiosResponse): string { + const headers = responseCollection.headers; const contentType: string = headers['content-type']; @@ -135,25 +174,27 @@ export class FetchBatchMessagesService { return boundary?.replace('boundary=', '').trim() || ''; } - async formatBatchResponse( - response: AxiosResponse, + async formatBatchResponseAsGmailMessage( + responseCollection: AxiosResponse, ): Promise { - const parsedResponses = this.parseBatch(response); + const parsedResponses = this.parseBatch( + responseCollection, + ) as GmailMessageParsedResponse[]; const formattedResponse = Promise.all( - parsedResponses.map(async (item) => { - if (item.error) { - console.log('Error', item.error); + parsedResponses.map(async (message: GmailMessageParsedResponse) => { + if (message.error) { + console.log('Error', message.error); return; } - const { id, threadId, internalDate, raw } = item; + const { id, threadId, internalDate, raw } = message; - const message = atob(raw?.replace(/-/g, '+').replace(/_/g, '/')); + const body = atob(raw?.replace(/-/g, '+').replace(/_/g, '/')); try { - const parsed = await simpleParser(message); + const parsed = await simpleParser(body); const { subject, @@ -190,9 +231,75 @@ export class FetchBatchMessagesService { ); const filteredResponse = (await formattedResponse).filter( - (item) => item, + (message) => message, ) as GmailMessage[]; return filteredResponse; } + + async formatBatchResponsesAsGmailMessages( + batchResponses: AxiosResponse[], + ): Promise { + const formattedResponses = await Promise.all( + batchResponses.map(async (response) => { + const formattedResponse = await this.formatBatchResponseAsGmailMessage( + response, + ); + + return formattedResponse; + }), + ); + + return formattedResponses.flat(); + } + + async formatBatchResponseAsGmailThread( + responseCollection: AxiosResponse, + ): Promise { + const parsedResponses = this.parseBatch( + responseCollection, + ) as GmailThreadParsedResponse[]; + + const formattedResponse = Promise.all( + parsedResponses.map(async (thread: GmailThreadParsedResponse) => { + if (thread.error) { + console.log('Error', thread.error); + + return; + } + try { + const { id, messages } = thread; + + return { + id, + messageIds: messages.map((message) => message.id) || [], + }; + } catch (error) { + console.log('Error', error); + } + }), + ); + + const filteredResponse = (await formattedResponse).filter( + (item) => item, + ) as GmailThread[]; + + return filteredResponse; + } + + async formatBatchResponsesAsGmailThreads( + batchResponses: AxiosResponse[], + ): Promise { + const formattedResponses = await Promise.all( + batchResponses.map(async (response) => { + const formattedResponse = await this.formatBatchResponseAsGmailThread( + response, + ); + + return formattedResponse; + }), + ); + + return formattedResponses.flat(); + } } diff --git a/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.service.ts b/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.service.ts index d97f112c8..61e4ade85 100644 --- a/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.service.ts @@ -9,7 +9,7 @@ import { EnvironmentService } from 'src/integrations/environment/environment.ser import { DataSourceService } from 'src/metadata/data-source/data-source.service'; import { FetchBatchMessagesService } from 'src/workspace/messaging/services/fetch-batch-messages.service'; import { GmailMessage } from 'src/workspace/messaging/types/gmailMessage'; -import { MessageQuery } from 'src/workspace/messaging/types/messageQuery'; +import { MessageOrThreadQuery } from 'src/workspace/messaging/types/messageOrThreadQuery'; import { DataSourceEntity } from 'src/metadata/data-source/data-source.entity'; @Injectable() @@ -26,10 +26,6 @@ export class FetchWorkspaceMessagesService { workspaceId, '20202020-0687-4c41-b707-ed1bfca972a7', ); - await this.fetchWorkspaceMemberMessages( - workspaceId, - '20202020-0687-4c41-b707-ed1bfca972a7', - ); } async fetchWorkspaceMemberThreads( @@ -59,6 +55,7 @@ export class FetchWorkspaceMessagesService { throw new Error('No connected account found'); } + const accessToken = connectedAccounts[0]?.accessToken; const refreshToken = connectedAccounts[0]?.refreshToken; if (!refreshToken) { @@ -74,7 +71,7 @@ export class FetchWorkspaceMessagesService { const threadsData = threads.data.threads; - if (!threadsData) { + if (!threadsData || threadsData?.length === 0) { return; } @@ -84,61 +81,29 @@ export class FetchWorkspaceMessagesService { workspaceDataSource, connectedAccounts[0].id, ); - } - async fetchWorkspaceMemberMessages( - workspaceId: string, - workspaceMemberId: string, - maxResults = 500, - ): Promise { - const dataSourceMetadata = - await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail( - workspaceId, - ); - - const workspaceDataSource = await this.typeORMService.connectToDataSource( - dataSourceMetadata, - ); - - if (!workspaceDataSource) { - throw new Error('No workspace data source found'); - } - - const connectedAccounts = await workspaceDataSource?.query( - `SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'gmail' AND "accountOwnerId" = $1`, - [workspaceMemberId], - ); - - if (!connectedAccounts || connectedAccounts.length === 0) { - throw new Error('No connected account found'); - } - - const accessToken = connectedAccounts[0]?.accessToken; - const refreshToken = connectedAccounts[0]?.refreshToken; - - if (!accessToken || !refreshToken) { - throw new Error('No access token or refresh token found'); - } - - const gmailClient = await this.getGmailClient(refreshToken); - - const messages = await gmailClient.users.messages.list({ - userId: 'me', - maxResults, - }); - - const messagesData = messages.data.messages; - - if (!messagesData || messagesData?.length === 0) { - return; - } - - const messageQueries: MessageQuery[] = messagesData.map((message) => ({ - uri: '/gmail/v1/users/me/messages/' + message.id + '?format=RAW', + const threadQueries: MessageOrThreadQuery[] = threadsData.map((thread) => ({ + uri: '/gmail/v1/users/me/threads/' + thread.id + '?format=minimal', })); + const threadsWithMessageIds = + await this.fetchBatchMessagesService.fetchAllThreads( + threadQueries, + accessToken, + ); + + const messageIds = threadsWithMessageIds + .map((thread) => thread.messageIds) + .flat(); + + const messageQueries: MessageOrThreadQuery[] = messageIds.map( + (messageId) => ({ + uri: '/gmail/v1/users/me/messages/' + messageId + '?format=RAW', + }), + ); + const messagesResponse = - await this.fetchBatchMessagesService.fetchAllByBatches( + await this.fetchBatchMessagesService.fetchAllMessages( messageQueries, accessToken, ); diff --git a/packages/twenty-server/src/workspace/messaging/types/gmailParsedResponse.ts b/packages/twenty-server/src/workspace/messaging/types/gmailMessageParsedResponse.ts similarity index 84% rename from packages/twenty-server/src/workspace/messaging/types/gmailParsedResponse.ts rename to packages/twenty-server/src/workspace/messaging/types/gmailMessageParsedResponse.ts index ef967cb3c..ef888321b 100644 --- a/packages/twenty-server/src/workspace/messaging/types/gmailParsedResponse.ts +++ b/packages/twenty-server/src/workspace/messaging/types/gmailMessageParsedResponse.ts @@ -1,4 +1,4 @@ -export type GmailParsedResponse = { +export type GmailMessageParsedResponse = { id: string; threadId: string; labelIds: string[]; diff --git a/packages/twenty-server/src/workspace/messaging/types/gmailThread.ts b/packages/twenty-server/src/workspace/messaging/types/gmailThread.ts new file mode 100644 index 000000000..b1e847f4d --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/types/gmailThread.ts @@ -0,0 +1,4 @@ +export type GmailThread = { + id: string; + messageIds: string[]; +}; diff --git a/packages/twenty-server/src/workspace/messaging/types/gmailThreadParsedResponse.ts b/packages/twenty-server/src/workspace/messaging/types/gmailThreadParsedResponse.ts new file mode 100644 index 000000000..169f22373 --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/types/gmailThreadParsedResponse.ts @@ -0,0 +1,14 @@ +type Message = { + id: string; + labels: string[]; +}; + +export type GmailThreadParsedResponse = { + id: string; + messages: Message[]; + error?: { + code: number; + message: string; + status: string; + }; +}; diff --git a/packages/twenty-server/src/workspace/messaging/types/messageOrThreadQuery.ts b/packages/twenty-server/src/workspace/messaging/types/messageOrThreadQuery.ts new file mode 100644 index 000000000..93368b56c --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/types/messageOrThreadQuery.ts @@ -0,0 +1,3 @@ +export type MessageOrThreadQuery = { + uri: string; +}; diff --git a/packages/twenty-server/src/workspace/messaging/types/messageQuery.ts b/packages/twenty-server/src/workspace/messaging/types/messageQuery.ts deleted file mode 100644 index b5ddedb90..000000000 --- a/packages/twenty-server/src/workspace/messaging/types/messageQuery.ts +++ /dev/null @@ -1,3 +0,0 @@ -export type MessageQuery = { - uri: string; -};