diff --git a/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages.command.ts b/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages.command.ts index 7dfa9a6ef..f382dcfe4 100644 --- a/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages.command.ts +++ b/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages.command.ts @@ -28,10 +28,6 @@ export class FetchWorkspaceMessagesCommand extends CommandRunner { options.workspaceId, ); - await this.fetchWorkspaceMessagesService.fetchWorkspaceThreads( - options.workspaceId, - ); - await this.fetchWorkspaceMessagesService.fetchWorkspaceMessages( options.workspaceId, ); diff --git a/packages/twenty-server/src/workspace/messaging/services/fetch-batch-messages.service.ts b/packages/twenty-server/src/workspace/messaging/services/fetch-batch-messages.service.ts index 08ecb67a7..43066e98e 100644 --- a/packages/twenty-server/src/workspace/messaging/services/fetch-batch-messages.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/fetch-batch-messages.service.ts @@ -1,8 +1,12 @@ import { Injectable } from '@nestjs/common'; -import axios, { AxiosInstance } from 'axios'; +import axios, { AxiosInstance, AxiosResponse } from 'axios'; import { simpleParser } from 'mailparser'; +import { GmailMessage } from 'src/workspace/messaging/types/gmailMessage'; +import { MessageQuery } from 'src/workspace/messaging/types/messageQuery'; +import { GmailParsedResponse } from 'src/workspace/messaging/types/gmailParsedResponse'; + @Injectable() export class FetchBatchMessagesService { private readonly httpService: AxiosInstance; @@ -13,13 +17,16 @@ export class FetchBatchMessagesService { }); } - async fetchAllByBatches(messageQueries, accessToken: string): Promise { + async fetchAllByBatches( + messageQueries: MessageQuery[], + accessToken: string, + ): Promise { const batchLimit = 100; - let messages = []; - let batchOffset = 0; + let messages: GmailMessage[] = []; + while (batchOffset < messageQueries.length) { const batchResponse = await this.fetchBatch( messageQueries, @@ -37,11 +44,11 @@ export class FetchBatchMessagesService { } async fetchBatch( - messageQueries, + messageQueries: MessageQuery[], accessToken: string, batchOffset: number, batchLimit: number, - ): Promise { + ): Promise { const limitedMessageQueries = messageQueries.slice( batchOffset, batchOffset + batchLimit, @@ -63,7 +70,7 @@ export class FetchBatchMessagesService { return formattedResponse; } - createBatchBody(messageQueries, boundary: string): string { + createBatchBody(messageQueries: MessageQuery[], boundary: string): string { let batchBody: string[] = []; messageQueries.forEach(function (call) { @@ -87,81 +94,105 @@ export class FetchBatchMessagesService { return batchBody.concat(['--', boundary, '--']).join(''); } - parseBatch(responseCollection) { - const items: any = []; + parseBatch( + responseCollection: AxiosResponse, + ): GmailParsedResponse[] { + const responseItems: GmailParsedResponse[] = []; const boundary = this.getBatchSeparator(responseCollection); - const responseLines = responseCollection.data.split('--' + boundary); + const responseLines: string[] = responseCollection.data.split( + '--' + boundary, + ); responseLines.forEach(function (response) { const startJson = response.indexOf('{'); const endJson = response.lastIndexOf('}'); - if (startJson < 0 || endJson < 0) { - return; - } + if (startJson < 0 || endJson < 0) return; - const responseJson = response.substr(startJson, endJson - startJson + 1); + const responseJson = response.substring(startJson, endJson + 1); const item = JSON.parse(responseJson); - items.push(item); + responseItems.push(item); }); - return items; + return responseItems; } - getBatchSeparator(response) { + getBatchSeparator(response: AxiosResponse): string { const headers = response.headers; - if (!headers['content-type']) return ''; + const contentType: string = headers['content-type']; - const components = headers['content-type'].split('; '); + if (!contentType) return ''; - const boundary = components.find((o) => o.startsWith('boundary=')); + const components = contentType.split('; '); - return boundary.replace('boundary=', '').trim('; '); + const boundary = components.find((item) => item.startsWith('boundary=')); + + return boundary?.replace('boundary=', '').trim() || ''; } - async formatBatchResponse(response) { - const parsedResponse = this.parseBatch(response); + async formatBatchResponse( + response: AxiosResponse, + ): Promise { + const parsedResponses = this.parseBatch(response); + + const formattedResponse = Promise.all( + parsedResponses.map(async (item) => { + if (item.error) { + console.log('Error', item.error); + + return; + } - return Promise.all( - parsedResponse.map(async (item) => { const { id, threadId, internalDate, raw } = item; const message = atob(raw?.replace(/-/g, '+').replace(/_/g, '/')); - const parsed = await simpleParser(message); + try { + const parsed = await simpleParser(message); - const { - subject, - messageId, - from, - to, - cc, - bcc, - text, - html, - attachments, - } = parsed; + const { + subject, + messageId, + from, + to, + cc, + bcc, + text, + html, + attachments, + } = parsed; - return { - externalId: id, - headerMessageId: messageId, - subject: subject, - messageThreadId: threadId, - internalDate, - from, - to, - cc, - bcc, - text, - html, - attachments, - }; + const messageFromGmail: GmailMessage = { + externalId: id, + headerMessageId: messageId || '', + subject: subject || '', + messageThreadId: threadId, + internalDate, + from, + to, + cc, + bcc, + text: text || '', + html: html || '', + attachments, + }; + + return messageFromGmail; + } catch (error) { + console.log('Error', error); + } }), ); + + const filteredResponse = (await formattedResponse).filter( + (item) => item, + ) as GmailMessage[]; + + return filteredResponse; } } diff --git a/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.service.ts b/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.service.ts index 2c3e14f81..d97f112c8 100644 --- a/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.service.ts @@ -1,12 +1,16 @@ import { Injectable } from '@nestjs/common'; -import { google } from 'googleapis'; +import { gmail_v1, google } from 'googleapis'; import { v4 } from 'uuid'; +import { DataSource } from 'typeorm'; import { TypeORMService } from 'src/database/typeorm/typeorm.service'; import { EnvironmentService } from 'src/integrations/environment/environment.service'; import { DataSourceService } from 'src/metadata/data-source/data-source.service'; import { FetchBatchMessagesService } from 'src/workspace/messaging/services/fetch-batch-messages.service'; +import { GmailMessage } from 'src/workspace/messaging/types/gmailMessage'; +import { MessageQuery } from 'src/workspace/messaging/types/messageQuery'; +import { DataSourceEntity } from 'src/metadata/data-source/data-source.entity'; @Injectable() export class FetchWorkspaceMessagesService { @@ -17,15 +21,12 @@ export class FetchWorkspaceMessagesService { private readonly fetchBatchMessagesService: FetchBatchMessagesService, ) {} - async fetchWorkspaceThreads(workspaceId: string): Promise { - return await this.fetchWorkspaceMemberThreads( + async fetchWorkspaceMessages(workspaceId: string): Promise { + await this.fetchWorkspaceMemberThreads( workspaceId, '20202020-0687-4c41-b707-ed1bfca972a7', ); - } - - async fetchWorkspaceMessages(workspaceId: string): Promise { - return await this.fetchWorkspaceMemberMessages( + await this.fetchWorkspaceMemberMessages( workspaceId, '20202020-0687-4c41-b707-ed1bfca972a7', ); @@ -35,7 +36,7 @@ export class FetchWorkspaceMessagesService { workspaceId: string, workspaceMemberId: string, maxResults = 500, - ): Promise { + ): Promise { const dataSourceMetadata = await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail( workspaceId, @@ -45,16 +46,28 @@ export class FetchWorkspaceMessagesService { dataSourceMetadata, ); - const connectedAccount = await workspaceDataSource?.query( + if (!workspaceDataSource) { + throw new Error('No workspace data source found'); + } + + const connectedAccounts = await workspaceDataSource?.query( `SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'gmail' AND "accountOwnerId" = $1`, [workspaceMemberId], ); - const refreshToken = connectedAccount[0].refreshToken; + if (!connectedAccounts || connectedAccounts.length === 0) { + throw new Error('No connected account found'); + } - const gmail = await this.getGmailClient(refreshToken); + const refreshToken = connectedAccounts[0]?.refreshToken; - const threads = await gmail.users.threads.list({ + if (!refreshToken) { + throw new Error('No refresh token found'); + } + + const gmailClient = await this.getGmailClient(refreshToken); + + const threads = await gmailClient.users.threads.list({ userId: 'me', maxResults, }); @@ -69,17 +82,15 @@ export class FetchWorkspaceMessagesService { threadsData, dataSourceMetadata, workspaceDataSource, - connectedAccount[0].id, + connectedAccounts[0].id, ); - - return threads; } async fetchWorkspaceMemberMessages( workspaceId: string, workspaceMemberId: string, maxResults = 500, - ): Promise { + ): Promise { const dataSourceMetadata = await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail( workspaceId, @@ -89,28 +100,40 @@ export class FetchWorkspaceMessagesService { dataSourceMetadata, ); - const connectedAccount = await workspaceDataSource?.query( + if (!workspaceDataSource) { + throw new Error('No workspace data source found'); + } + + const connectedAccounts = await workspaceDataSource?.query( `SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'gmail' AND "accountOwnerId" = $1`, [workspaceMemberId], ); - const accessToken = connectedAccount[0].accessToken; - const refreshToken = connectedAccount[0].refreshToken; + if (!connectedAccounts || connectedAccounts.length === 0) { + throw new Error('No connected account found'); + } - const gmail = await this.getGmailClient(refreshToken); + const accessToken = connectedAccounts[0]?.accessToken; + const refreshToken = connectedAccounts[0]?.refreshToken; - const messages = await gmail.users.messages.list({ + if (!accessToken || !refreshToken) { + throw new Error('No access token or refresh token found'); + } + + const gmailClient = await this.getGmailClient(refreshToken); + + const messages = await gmailClient.users.messages.list({ userId: 'me', maxResults, }); const messagesData = messages.data.messages; - if (!messagesData) { + if (!messagesData || messagesData?.length === 0) { return; } - const messageQueries = messagesData.map((message) => ({ + const messageQueries: MessageQuery[] = messagesData.map((message) => ({ uri: '/gmail/v1/users/me/messages/' + message.id + '?format=RAW', })); @@ -126,11 +149,9 @@ export class FetchWorkspaceMessagesService { workspaceDataSource, workspaceMemberId, ); - - return messages; } - async getGmailClient(refreshToken) { + async getGmailClient(refreshToken: string): Promise { const gmailClientId = this.environmentService.getAuthGoogleClientId(); const gmailClientSecret = @@ -145,23 +166,29 @@ export class FetchWorkspaceMessagesService { refresh_token: refreshToken, }); - return google.gmail({ + const gmailClient = google.gmail({ version: 'v1', auth: oAuth2Client, }); + + return gmailClient; } async saveMessageThreads( - threads, - dataSourceMetadata, - workspaceDataSource, - connectedAccountId, + threads: gmail_v1.Schema$Thread[], + dataSourceMetadata: DataSourceEntity, + workspaceDataSource: DataSource, + connectedAccountId: string, ) { const messageChannel = await workspaceDataSource?.query( `SELECT * FROM ${dataSourceMetadata.schema}."messageChannel" WHERE "connectedAccountId" = $1`, [connectedAccountId], ); + if (!messageChannel.length) { + throw new Error('No message channel found for this connected account'); + } + for (const thread of threads) { await workspaceDataSource?.query( `INSERT INTO ${dataSourceMetadata.schema}."messageThread" ("externalId", "subject", "messageChannelId", "visibility") VALUES ($1, $2, $3, $4)`, @@ -171,10 +198,10 @@ export class FetchWorkspaceMessagesService { } async saveMessages( - messages, - dataSourceMetadata, - workspaceDataSource, - workspaceMemberId, + messages: GmailMessage[], + dataSourceMetadata: DataSourceEntity, + workspaceDataSource: DataSource, + workspaceMemberId: string, ) { for (const message of messages) { const { diff --git a/packages/twenty-server/src/workspace/messaging/types/gmailMessage.ts b/packages/twenty-server/src/workspace/messaging/types/gmailMessage.ts new file mode 100644 index 000000000..9293ff065 --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/types/gmailMessage.ts @@ -0,0 +1,16 @@ +import { AddressObject, Attachment } from 'mailparser'; + +export type GmailMessage = { + externalId: string; + headerMessageId: string; + subject: string; + messageThreadId: string; + internalDate: string; + from: AddressObject | undefined; + to: AddressObject | AddressObject[] | undefined; + cc: AddressObject | AddressObject[] | undefined; + bcc: AddressObject | AddressObject[] | undefined; + text: string; + html: string; + attachments: Attachment[]; +}; diff --git a/packages/twenty-server/src/workspace/messaging/types/gmailParsedResponse.ts b/packages/twenty-server/src/workspace/messaging/types/gmailParsedResponse.ts new file mode 100644 index 000000000..ef967cb3c --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/types/gmailParsedResponse.ts @@ -0,0 +1,15 @@ +export type GmailParsedResponse = { + id: string; + threadId: string; + labelIds: string[]; + snippet: string; + sizeEstimate: number; + raw: string; + historyId: string; + internalDate: string; + error?: { + code: number; + message: string; + status: string; + }; +}; diff --git a/packages/twenty-server/src/workspace/messaging/types/messageQuery.ts b/packages/twenty-server/src/workspace/messaging/types/messageQuery.ts new file mode 100644 index 000000000..b5ddedb90 --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/types/messageQuery.ts @@ -0,0 +1,3 @@ +export type MessageQuery = { + uri: string; +};