2880 timebox create a poc to fetch emails from the gmail api (#2993)

* create empty service

* getting threads is working

* insert message channel

* save threads in the db

* clean

* fetch messages

* create a service to fetch a batch of messages

* batch messages

* use httpService instead

* parse batch

* base 64 decoding working

* solve parsing bug

* saving messages is working

* bug to fix in fetchAllByBatches

* fetching all messages is working but not saving yet

* fecth 500 messages and threads is working

* remove unused package and console log

* set direction to incoming

* fix bug after merging main
This commit is contained in:
bosiraphael
2023-12-15 16:35:56 +01:00
committed by GitHub
parent ac3c517c82
commit f95c56b1cb
7 changed files with 412 additions and 7 deletions

View File

@ -1,5 +1,7 @@
import { Injectable } from '@nestjs/common';
import { v4 } from 'uuid';
import { DataSourceService } from 'src/metadata/data-source/data-source.service';
import { TypeORMService } from 'src/database/typeorm/typeorm.service';
import { SaveConnectedAccountInput } from 'src/core/auth/dto/save-connected-account';
@ -43,10 +45,26 @@ export class GoogleGmailService {
return;
}
await workspaceDataSource?.query(
`INSERT INTO ${dataSourceMetadata.schema}."connectedAccount" ("handle", "provider", "accessToken", "refreshToken", "accountOwnerId") VALUES ($1, $2, $3, $4, $5)`,
[handle, provider, accessToken, refreshToken, workspaceMemberId],
);
const connectedAccountId = v4();
await workspaceDataSource?.transaction(async (manager) => {
await manager.query(
`INSERT INTO ${dataSourceMetadata.schema}."connectedAccount" ("id", "handle", "provider", "accessToken", "refreshToken", "accountOwnerId") VALUES ($1, $2, $3, $4, $5, $6)`,
[
connectedAccountId,
handle,
provider,
accessToken,
refreshToken,
workspaceMemberId,
],
);
await manager.query(
`INSERT INTO ${dataSourceMetadata.schema}."messageChannel" ("visibility", "handle", "connectedAccountId", "type") VALUES ($1, $2, $3, $4)`,
['share_everything', handle, connectedAccountId, 'gmail'],
);
});
return;
}

View File

@ -2,9 +2,10 @@ import { Module } from '@nestjs/common';
import { FetchWorkspaceMessagesCommand } from 'src/workspace/messaging/commands/fetch-workspace-messages.command';
import { MessagingModule } from 'src/workspace/messaging/messaging.module';
import { FetchWorkspaceMessagesModule } from 'src/workspace/messaging/services/fetch-workspace-messages.module';
@Module({
imports: [MessagingModule],
imports: [MessagingModule, FetchWorkspaceMessagesModule],
providers: [FetchWorkspaceMessagesCommand],
})
export class FetchWorkspaceMessagesCommandsModule {}

View File

@ -1,5 +1,6 @@
import { Command, CommandRunner, Option } from 'nest-commander';
import { FetchWorkspaceMessagesService } from 'src/workspace/messaging/services/fetch-workspace-messages.service';
import { MessagingProducer } from 'src/workspace/messaging/producers/messaging-producer';
interface FetchWorkspaceMessagesOptions {
@ -11,7 +12,10 @@ interface FetchWorkspaceMessagesOptions {
description: 'Fetch messages of all workspaceMembers in a workspace.',
})
export class FetchWorkspaceMessagesCommand extends CommandRunner {
constructor(private readonly messagingProducer: MessagingProducer) {
constructor(
private readonly fetchWorkspaceMessagesService: FetchWorkspaceMessagesService,
private readonly messagingProducer: MessagingProducer,
) {
super();
}
@ -24,6 +28,14 @@ export class FetchWorkspaceMessagesCommand extends CommandRunner {
options.workspaceId,
);
await this.fetchWorkspaceMessagesService.fetchWorkspaceThreads(
options.workspaceId,
);
await this.fetchWorkspaceMessagesService.fetchWorkspaceMessages(
options.workspaceId,
);
return;
}

View File

@ -0,0 +1,160 @@
import { Injectable } from '@nestjs/common';
import axios, { AxiosInstance } from 'axios';
@Injectable()
export class FetchBatchMessagesService {
private readonly httpService: AxiosInstance;
constructor() {
this.httpService = axios.create({
baseURL: 'https://www.googleapis.com/batch/gmail/v1',
});
}
async fetchAllByBatches(messageQueries, accessToken: string): Promise<any> {
const batchLimit = 100;
let messages = [];
let batchOffset = 0;
while (batchOffset < messageQueries.length) {
const batchResponse = await this.fetchBatch(
messageQueries,
accessToken,
batchOffset,
batchLimit,
);
messages = messages.concat(batchResponse);
batchOffset += batchLimit;
}
return messages;
}
async fetchBatch(
messageQueries,
accessToken: string,
batchOffset: number,
batchLimit: number,
): Promise<any> {
const limitedMessageQueries = messageQueries.slice(
batchOffset,
batchOffset + batchLimit,
);
const response = await this.httpService.post(
'/',
this.createBatchBody(limitedMessageQueries, 'batch_gmail_messages'),
{
headers: {
'Content-Type': 'multipart/mixed; boundary=batch_gmail_messages',
Authorization: 'Bearer ' + accessToken,
},
},
);
return this.formatBatchResponse(response);
}
createBatchBody(messageQueries, boundary: string): string {
let batchBody: string[] = [];
messageQueries.forEach(function (call) {
const method = 'GET';
const uri = call.uri;
batchBody = batchBody.concat([
'--',
boundary,
'\r\n',
'Content-Type: application/http',
'\r\n\r\n',
method,
' ',
uri,
'\r\n\r\n',
]);
});
return batchBody.concat(['--', boundary, '--']).join('');
}
parseBatch(responseCollection) {
const items: any = [];
const boundary = this.getBatchSeparator(responseCollection);
const responseLines = responseCollection.data.split('--' + boundary);
responseLines.forEach(function (response) {
const startJson = response.indexOf('{');
const endJson = response.lastIndexOf('}');
if (startJson < 0 || endJson < 0) {
return;
}
const responseJson = response.substr(startJson, endJson - startJson + 1);
const item = JSON.parse(responseJson);
items.push(item);
});
return items;
}
getBatchSeparator(response) {
const headers = response.headers;
if (!headers['content-type']) return '';
const components = headers['content-type'].split('; ');
const boundary = components.find((o) => o.startsWith('boundary='));
return boundary.replace('boundary=', '').trim('; ');
}
formatBatchResponse(response) {
const parsedResponse = this.parseBatch(response);
return parsedResponse
.map((item) => {
const { id, threadId, payload } = item;
const headers = payload?.headers;
const parts = payload?.parts;
if (!parts) {
return;
}
const bodyBase64 = parts[0]?.body?.data;
if (!bodyBase64) {
return;
}
const body = atob(bodyBase64.replace(/-/g, '+').replace(/_/g, '/'));
return {
externalId: id,
headerMessageId: headers?.find(
(header) => header.name === 'Message-ID',
)?.value,
subject: headers?.find((header) => header.name === 'Subject')?.value,
messageThreadId: threadId,
from: headers?.find((header) => header.name === 'From')?.value,
body,
};
})
.filter((item) => item);
}
}

View File

@ -0,0 +1,14 @@
import { Module } from '@nestjs/common';
import { TypeORMModule } from 'src/database/typeorm/typeorm.module';
import { EnvironmentModule } from 'src/integrations/environment/environment.module';
import { DataSourceModule } from 'src/metadata/data-source/data-source.module';
import { FetchBatchMessagesService } from 'src/workspace/messaging/services/fetch-batch-messages.service';
import { FetchWorkspaceMessagesService } from 'src/workspace/messaging/services/fetch-workspace-messages.service';
@Module({
imports: [TypeORMModule, DataSourceModule, EnvironmentModule],
providers: [FetchWorkspaceMessagesService, FetchBatchMessagesService],
exports: [FetchWorkspaceMessagesService],
})
export class FetchWorkspaceMessagesModule {}

View File

@ -0,0 +1,200 @@
import { Injectable } from '@nestjs/common';
import { google } from 'googleapis';
import { TypeORMService } from 'src/database/typeorm/typeorm.service';
import { EnvironmentService } from 'src/integrations/environment/environment.service';
import { DataSourceService } from 'src/metadata/data-source/data-source.service';
import { FetchBatchMessagesService } from 'src/workspace/messaging/services/fetch-batch-messages.service';
@Injectable()
export class FetchWorkspaceMessagesService {
constructor(
private readonly environmentService: EnvironmentService,
private readonly dataSourceService: DataSourceService,
private readonly typeORMService: TypeORMService,
private readonly fetchBatchMessagesService: FetchBatchMessagesService,
) {}
async fetchWorkspaceThreads(workspaceId: string): Promise<any> {
return await this.fetchWorkspaceMemberThreads(
workspaceId,
'20202020-0687-4c41-b707-ed1bfca972a7',
);
}
async fetchWorkspaceMessages(workspaceId: string): Promise<any> {
return await this.fetchWorkspaceMemberMessages(
workspaceId,
'20202020-0687-4c41-b707-ed1bfca972a7',
);
}
async fetchWorkspaceMemberThreads(
workspaceId: string,
workspaceMemberId: string,
maxResults = 500,
): Promise<any> {
const dataSourceMetadata =
await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail(
workspaceId,
);
const workspaceDataSource = await this.typeORMService.connectToDataSource(
dataSourceMetadata,
);
const connectedAccount = await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'gmail' AND "accountOwnerId" = $1`,
[workspaceMemberId],
);
const refreshToken = connectedAccount[0].refreshToken;
const gmail = await this.getGmailClient(refreshToken);
const threads = await gmail.users.threads.list({
userId: 'me',
maxResults,
});
const threadsData = threads.data.threads;
if (!threadsData) {
return;
}
await this.saveMessageThreads(
threadsData,
dataSourceMetadata,
workspaceDataSource,
connectedAccount[0].id,
);
return threads;
}
async fetchWorkspaceMemberMessages(
workspaceId: string,
workspaceMemberId: string,
maxResults = 500,
): Promise<any> {
const dataSourceMetadata =
await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail(
workspaceId,
);
const workspaceDataSource = await this.typeORMService.connectToDataSource(
dataSourceMetadata,
);
const connectedAccount = await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'gmail' AND "accountOwnerId" = $1`,
[workspaceMemberId],
);
const accessToken = connectedAccount[0].accessToken;
const refreshToken = connectedAccount[0].refreshToken;
const gmail = await this.getGmailClient(refreshToken);
const messages = await gmail.users.messages.list({
userId: 'me',
maxResults,
});
const messagesData = messages.data.messages;
if (!messagesData) {
return;
}
const messageQueries = messagesData.map((message) => ({
uri: '/gmail/v1/users/me/messages/' + message.id,
}));
const messagesResponse =
await this.fetchBatchMessagesService.fetchAllByBatches(
messageQueries,
accessToken,
);
await this.saveMessages(
messagesResponse,
dataSourceMetadata,
workspaceDataSource,
);
return messages;
}
async getGmailClient(refreshToken) {
const gmailClientId = this.environmentService.getAuthGoogleClientId();
const gmailClientSecret =
this.environmentService.getAuthGoogleClientSecret();
const oAuth2Client = new google.auth.OAuth2(
gmailClientId,
gmailClientSecret,
);
oAuth2Client.setCredentials({
refresh_token: refreshToken,
});
return google.gmail({
version: 'v1',
auth: oAuth2Client,
});
}
async saveMessageThreads(
threads,
dataSourceMetadata,
workspaceDataSource,
connectedAccountId,
) {
const messageChannel = await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."messageChannel" WHERE "connectedAccountId" = $1`,
[connectedAccountId],
);
for (const thread of threads) {
await workspaceDataSource?.query(
`INSERT INTO ${dataSourceMetadata.schema}."messageThread" ("externalId", "subject", "messageChannelId", "visibility") VALUES ($1, $2, $3, $4)`,
[thread.id, thread.snippet, messageChannel[0].id, 'default'],
);
}
}
async saveMessages(messages, dataSourceMetadata, workspaceDataSource) {
for (const message of messages) {
const {
externalId,
headerMessageId,
subject,
messageThreadId,
from,
body,
} = message;
const messageThread = await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."messageThread" WHERE "externalId" = $1`,
[messageThreadId],
);
await workspaceDataSource?.query(
`INSERT INTO ${dataSourceMetadata.schema}."message" ("externalId", "headerMessageId", "subject", "messageThreadId", "direction", "body") VALUES ($1, $2, $3, $4, $5, $6)`,
[
externalId,
headerMessageId,
subject,
messageThread[0]?.id,
'incoming',
body,
],
);
}
}
}

View File

@ -25,7 +25,7 @@ import { MessageThreadObjectMetadata } from 'src/workspace/workspace-sync-metada
@IsSystem()
export class MessageChannelObjectMetadata extends BaseObjectMetadata {
@FieldMetadata({
// This will be a type select later
// This will be a type select later: metadata, subject, share_everything
type: FieldMetadataType.TEXT,
label: 'Visibility',
description: 'Visibility',