From f95c56b1cb0399a648729955d710940bdd43b27c Mon Sep 17 00:00:00 2001 From: bosiraphael <71827178+bosiraphael@users.noreply.github.com> Date: Fri, 15 Dec 2023 16:35:56 +0100 Subject: [PATCH] 2880 timebox create a poc to fetch emails from the gmail api (#2993) * create empty service * getting threads is working * insert message channel * save threads in the db * clean * fetch messages * create a service to fetch a batch of messages * batch messages * use httpService instead * parse batch * base 64 decoding working * solve parsing bug * saving messages is working * bug to fix in fetchAllByBatches * fetching all messages is working but not saving yet * fecth 500 messages and threads is working * remove unused package and console log * set direction to incoming * fix bug after merging main --- .../auth/services/google-gmail.service.ts | 26 ++- ...etch-workspace-messages-commands.module.ts | 3 +- .../fetch-workspace-messages.command.ts | 14 +- .../services/fetch-batch-messages.service.ts | 160 ++++++++++++++ .../fetch-workspace-messages.module.ts | 14 ++ .../fetch-workspace-messages.service.ts | 200 ++++++++++++++++++ .../message-channel.object-metadata.ts | 2 +- 7 files changed, 412 insertions(+), 7 deletions(-) create mode 100644 packages/twenty-server/src/workspace/messaging/services/fetch-batch-messages.service.ts create mode 100644 packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.module.ts create mode 100644 packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.service.ts diff --git a/packages/twenty-server/src/core/auth/services/google-gmail.service.ts b/packages/twenty-server/src/core/auth/services/google-gmail.service.ts index 11422a13b..08e1d7bd7 100644 --- a/packages/twenty-server/src/core/auth/services/google-gmail.service.ts +++ b/packages/twenty-server/src/core/auth/services/google-gmail.service.ts @@ -1,5 +1,7 @@ import { Injectable } from '@nestjs/common'; +import { v4 } from 'uuid'; + import { DataSourceService } from 'src/metadata/data-source/data-source.service'; import { TypeORMService } from 'src/database/typeorm/typeorm.service'; import { SaveConnectedAccountInput } from 'src/core/auth/dto/save-connected-account'; @@ -43,10 +45,26 @@ export class GoogleGmailService { return; } - await workspaceDataSource?.query( - `INSERT INTO ${dataSourceMetadata.schema}."connectedAccount" ("handle", "provider", "accessToken", "refreshToken", "accountOwnerId") VALUES ($1, $2, $3, $4, $5)`, - [handle, provider, accessToken, refreshToken, workspaceMemberId], - ); + const connectedAccountId = v4(); + + await workspaceDataSource?.transaction(async (manager) => { + await manager.query( + `INSERT INTO ${dataSourceMetadata.schema}."connectedAccount" ("id", "handle", "provider", "accessToken", "refreshToken", "accountOwnerId") VALUES ($1, $2, $3, $4, $5, $6)`, + [ + connectedAccountId, + handle, + provider, + accessToken, + refreshToken, + workspaceMemberId, + ], + ); + + await manager.query( + `INSERT INTO ${dataSourceMetadata.schema}."messageChannel" ("visibility", "handle", "connectedAccountId", "type") VALUES ($1, $2, $3, $4)`, + ['share_everything', handle, connectedAccountId, 'gmail'], + ); + }); return; } diff --git a/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages-commands.module.ts b/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages-commands.module.ts index 5b17178c3..f9be46d33 100644 --- a/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages-commands.module.ts +++ b/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages-commands.module.ts @@ -2,9 +2,10 @@ import { Module } from '@nestjs/common'; import { FetchWorkspaceMessagesCommand } from 'src/workspace/messaging/commands/fetch-workspace-messages.command'; import { MessagingModule } from 'src/workspace/messaging/messaging.module'; +import { FetchWorkspaceMessagesModule } from 'src/workspace/messaging/services/fetch-workspace-messages.module'; @Module({ - imports: [MessagingModule], + imports: [MessagingModule, FetchWorkspaceMessagesModule], providers: [FetchWorkspaceMessagesCommand], }) export class FetchWorkspaceMessagesCommandsModule {} diff --git a/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages.command.ts b/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages.command.ts index 537d66b37..7dfa9a6ef 100644 --- a/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages.command.ts +++ b/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages.command.ts @@ -1,5 +1,6 @@ import { Command, CommandRunner, Option } from 'nest-commander'; +import { FetchWorkspaceMessagesService } from 'src/workspace/messaging/services/fetch-workspace-messages.service'; import { MessagingProducer } from 'src/workspace/messaging/producers/messaging-producer'; interface FetchWorkspaceMessagesOptions { @@ -11,7 +12,10 @@ interface FetchWorkspaceMessagesOptions { description: 'Fetch messages of all workspaceMembers in a workspace.', }) export class FetchWorkspaceMessagesCommand extends CommandRunner { - constructor(private readonly messagingProducer: MessagingProducer) { + constructor( + private readonly fetchWorkspaceMessagesService: FetchWorkspaceMessagesService, + private readonly messagingProducer: MessagingProducer, + ) { super(); } @@ -24,6 +28,14 @@ export class FetchWorkspaceMessagesCommand extends CommandRunner { options.workspaceId, ); + await this.fetchWorkspaceMessagesService.fetchWorkspaceThreads( + options.workspaceId, + ); + + await this.fetchWorkspaceMessagesService.fetchWorkspaceMessages( + options.workspaceId, + ); + return; } diff --git a/packages/twenty-server/src/workspace/messaging/services/fetch-batch-messages.service.ts b/packages/twenty-server/src/workspace/messaging/services/fetch-batch-messages.service.ts new file mode 100644 index 000000000..24260bb13 --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/services/fetch-batch-messages.service.ts @@ -0,0 +1,160 @@ +import { Injectable } from '@nestjs/common'; + +import axios, { AxiosInstance } from 'axios'; + +@Injectable() +export class FetchBatchMessagesService { + private readonly httpService: AxiosInstance; + + constructor() { + this.httpService = axios.create({ + baseURL: 'https://www.googleapis.com/batch/gmail/v1', + }); + } + + async fetchAllByBatches(messageQueries, accessToken: string): Promise { + const batchLimit = 100; + + let messages = []; + + let batchOffset = 0; + + while (batchOffset < messageQueries.length) { + const batchResponse = await this.fetchBatch( + messageQueries, + accessToken, + batchOffset, + batchLimit, + ); + + messages = messages.concat(batchResponse); + + batchOffset += batchLimit; + } + + return messages; + } + + async fetchBatch( + messageQueries, + accessToken: string, + batchOffset: number, + batchLimit: number, + ): Promise { + const limitedMessageQueries = messageQueries.slice( + batchOffset, + batchOffset + batchLimit, + ); + + const response = await this.httpService.post( + '/', + this.createBatchBody(limitedMessageQueries, 'batch_gmail_messages'), + { + headers: { + 'Content-Type': 'multipart/mixed; boundary=batch_gmail_messages', + Authorization: 'Bearer ' + accessToken, + }, + }, + ); + + return this.formatBatchResponse(response); + } + + createBatchBody(messageQueries, boundary: string): string { + let batchBody: string[] = []; + + messageQueries.forEach(function (call) { + const method = 'GET'; + const uri = call.uri; + + batchBody = batchBody.concat([ + '--', + boundary, + '\r\n', + 'Content-Type: application/http', + '\r\n\r\n', + + method, + ' ', + uri, + '\r\n\r\n', + ]); + }); + + return batchBody.concat(['--', boundary, '--']).join(''); + } + + parseBatch(responseCollection) { + const items: any = []; + + const boundary = this.getBatchSeparator(responseCollection); + + const responseLines = responseCollection.data.split('--' + boundary); + + responseLines.forEach(function (response) { + const startJson = response.indexOf('{'); + const endJson = response.lastIndexOf('}'); + + if (startJson < 0 || endJson < 0) { + return; + } + + const responseJson = response.substr(startJson, endJson - startJson + 1); + + const item = JSON.parse(responseJson); + + items.push(item); + }); + + return items; + } + + getBatchSeparator(response) { + const headers = response.headers; + + if (!headers['content-type']) return ''; + + const components = headers['content-type'].split('; '); + + const boundary = components.find((o) => o.startsWith('boundary=')); + + return boundary.replace('boundary=', '').trim('; '); + } + + formatBatchResponse(response) { + const parsedResponse = this.parseBatch(response); + + return parsedResponse + .map((item) => { + const { id, threadId, payload } = item; + + const headers = payload?.headers; + + const parts = payload?.parts; + + if (!parts) { + return; + } + + const bodyBase64 = parts[0]?.body?.data; + + if (!bodyBase64) { + return; + } + + const body = atob(bodyBase64.replace(/-/g, '+').replace(/_/g, '/')); + + return { + externalId: id, + headerMessageId: headers?.find( + (header) => header.name === 'Message-ID', + )?.value, + subject: headers?.find((header) => header.name === 'Subject')?.value, + messageThreadId: threadId, + from: headers?.find((header) => header.name === 'From')?.value, + body, + }; + }) + .filter((item) => item); + } +} diff --git a/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.module.ts b/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.module.ts new file mode 100644 index 000000000..c4c394b2f --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.module.ts @@ -0,0 +1,14 @@ +import { Module } from '@nestjs/common'; + +import { TypeORMModule } from 'src/database/typeorm/typeorm.module'; +import { EnvironmentModule } from 'src/integrations/environment/environment.module'; +import { DataSourceModule } from 'src/metadata/data-source/data-source.module'; +import { FetchBatchMessagesService } from 'src/workspace/messaging/services/fetch-batch-messages.service'; +import { FetchWorkspaceMessagesService } from 'src/workspace/messaging/services/fetch-workspace-messages.service'; + +@Module({ + imports: [TypeORMModule, DataSourceModule, EnvironmentModule], + providers: [FetchWorkspaceMessagesService, FetchBatchMessagesService], + exports: [FetchWorkspaceMessagesService], +}) +export class FetchWorkspaceMessagesModule {} diff --git a/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.service.ts b/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.service.ts new file mode 100644 index 000000000..0d31fcf4f --- /dev/null +++ b/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.service.ts @@ -0,0 +1,200 @@ +import { Injectable } from '@nestjs/common'; + +import { google } from 'googleapis'; + +import { TypeORMService } from 'src/database/typeorm/typeorm.service'; +import { EnvironmentService } from 'src/integrations/environment/environment.service'; +import { DataSourceService } from 'src/metadata/data-source/data-source.service'; +import { FetchBatchMessagesService } from 'src/workspace/messaging/services/fetch-batch-messages.service'; + +@Injectable() +export class FetchWorkspaceMessagesService { + constructor( + private readonly environmentService: EnvironmentService, + private readonly dataSourceService: DataSourceService, + private readonly typeORMService: TypeORMService, + private readonly fetchBatchMessagesService: FetchBatchMessagesService, + ) {} + + async fetchWorkspaceThreads(workspaceId: string): Promise { + return await this.fetchWorkspaceMemberThreads( + workspaceId, + '20202020-0687-4c41-b707-ed1bfca972a7', + ); + } + + async fetchWorkspaceMessages(workspaceId: string): Promise { + return await this.fetchWorkspaceMemberMessages( + workspaceId, + '20202020-0687-4c41-b707-ed1bfca972a7', + ); + } + + async fetchWorkspaceMemberThreads( + workspaceId: string, + workspaceMemberId: string, + maxResults = 500, + ): Promise { + const dataSourceMetadata = + await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail( + workspaceId, + ); + + const workspaceDataSource = await this.typeORMService.connectToDataSource( + dataSourceMetadata, + ); + + const connectedAccount = await workspaceDataSource?.query( + `SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'gmail' AND "accountOwnerId" = $1`, + [workspaceMemberId], + ); + + const refreshToken = connectedAccount[0].refreshToken; + + const gmail = await this.getGmailClient(refreshToken); + + const threads = await gmail.users.threads.list({ + userId: 'me', + maxResults, + }); + + const threadsData = threads.data.threads; + + if (!threadsData) { + return; + } + + await this.saveMessageThreads( + threadsData, + dataSourceMetadata, + workspaceDataSource, + connectedAccount[0].id, + ); + + return threads; + } + + async fetchWorkspaceMemberMessages( + workspaceId: string, + workspaceMemberId: string, + maxResults = 500, + ): Promise { + const dataSourceMetadata = + await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail( + workspaceId, + ); + + const workspaceDataSource = await this.typeORMService.connectToDataSource( + dataSourceMetadata, + ); + + const connectedAccount = await workspaceDataSource?.query( + `SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'gmail' AND "accountOwnerId" = $1`, + [workspaceMemberId], + ); + + const accessToken = connectedAccount[0].accessToken; + const refreshToken = connectedAccount[0].refreshToken; + + const gmail = await this.getGmailClient(refreshToken); + + const messages = await gmail.users.messages.list({ + userId: 'me', + maxResults, + }); + + const messagesData = messages.data.messages; + + if (!messagesData) { + return; + } + + const messageQueries = messagesData.map((message) => ({ + uri: '/gmail/v1/users/me/messages/' + message.id, + })); + + const messagesResponse = + await this.fetchBatchMessagesService.fetchAllByBatches( + messageQueries, + accessToken, + ); + + await this.saveMessages( + messagesResponse, + dataSourceMetadata, + workspaceDataSource, + ); + + return messages; + } + + async getGmailClient(refreshToken) { + const gmailClientId = this.environmentService.getAuthGoogleClientId(); + + const gmailClientSecret = + this.environmentService.getAuthGoogleClientSecret(); + + const oAuth2Client = new google.auth.OAuth2( + gmailClientId, + gmailClientSecret, + ); + + oAuth2Client.setCredentials({ + refresh_token: refreshToken, + }); + + return google.gmail({ + version: 'v1', + auth: oAuth2Client, + }); + } + + async saveMessageThreads( + threads, + dataSourceMetadata, + workspaceDataSource, + connectedAccountId, + ) { + const messageChannel = await workspaceDataSource?.query( + `SELECT * FROM ${dataSourceMetadata.schema}."messageChannel" WHERE "connectedAccountId" = $1`, + [connectedAccountId], + ); + + for (const thread of threads) { + await workspaceDataSource?.query( + `INSERT INTO ${dataSourceMetadata.schema}."messageThread" ("externalId", "subject", "messageChannelId", "visibility") VALUES ($1, $2, $3, $4)`, + [thread.id, thread.snippet, messageChannel[0].id, 'default'], + ); + } + } + + async saveMessages(messages, dataSourceMetadata, workspaceDataSource) { + for (const message of messages) { + const { + externalId, + headerMessageId, + subject, + messageThreadId, + from, + body, + } = message; + + const messageThread = await workspaceDataSource?.query( + `SELECT * FROM ${dataSourceMetadata.schema}."messageThread" WHERE "externalId" = $1`, + [messageThreadId], + ); + + await workspaceDataSource?.query( + `INSERT INTO ${dataSourceMetadata.schema}."message" ("externalId", "headerMessageId", "subject", "messageThreadId", "direction", "body") VALUES ($1, $2, $3, $4, $5, $6)`, + [ + externalId, + headerMessageId, + subject, + messageThread[0]?.id, + 'incoming', + body, + ], + ); + } + } +} diff --git a/packages/twenty-server/src/workspace/workspace-sync-metadata/standard-objects/message-channel.object-metadata.ts b/packages/twenty-server/src/workspace/workspace-sync-metadata/standard-objects/message-channel.object-metadata.ts index ae87e1e6d..83b878d07 100644 --- a/packages/twenty-server/src/workspace/workspace-sync-metadata/standard-objects/message-channel.object-metadata.ts +++ b/packages/twenty-server/src/workspace/workspace-sync-metadata/standard-objects/message-channel.object-metadata.ts @@ -25,7 +25,7 @@ import { MessageThreadObjectMetadata } from 'src/workspace/workspace-sync-metada @IsSystem() export class MessageChannelObjectMetadata extends BaseObjectMetadata { @FieldMetadata({ - // This will be a type select later + // This will be a type select later: metadata, subject, share_everything type: FieldMetadataType.TEXT, label: 'Visibility', description: 'Visibility',