3019 timebox add typing and checks in workspacemessagingservices (#3112)

* throw error

* fetchWorkspaceMessages fetches messages and threads

* renaming

* improve typing

* improve typing and error handling

* improve typing and error handling

* improve typing and error handling

* improve fetch-batch

* fix bug

* replace return types

* imporving typing and error handling

* improve typing and error handling

* improve typing and error handling

* improve typing and error handling

* improve typing and error handling

* remove console log
This commit is contained in:
bosiraphael
2023-12-26 18:07:40 +01:00
committed by GitHub
parent 58a62e8d17
commit 526a3d7d9a
6 changed files with 178 additions and 90 deletions

View File

@ -28,10 +28,6 @@ export class FetchWorkspaceMessagesCommand extends CommandRunner {
options.workspaceId, options.workspaceId,
); );
await this.fetchWorkspaceMessagesService.fetchWorkspaceThreads(
options.workspaceId,
);
await this.fetchWorkspaceMessagesService.fetchWorkspaceMessages( await this.fetchWorkspaceMessagesService.fetchWorkspaceMessages(
options.workspaceId, options.workspaceId,
); );

View File

@ -1,8 +1,12 @@
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import axios, { AxiosInstance } from 'axios'; import axios, { AxiosInstance, AxiosResponse } from 'axios';
import { simpleParser } from 'mailparser'; import { simpleParser } from 'mailparser';
import { GmailMessage } from 'src/workspace/messaging/types/gmailMessage';
import { MessageQuery } from 'src/workspace/messaging/types/messageQuery';
import { GmailParsedResponse } from 'src/workspace/messaging/types/gmailParsedResponse';
@Injectable() @Injectable()
export class FetchBatchMessagesService { export class FetchBatchMessagesService {
private readonly httpService: AxiosInstance; private readonly httpService: AxiosInstance;
@ -13,13 +17,16 @@ export class FetchBatchMessagesService {
}); });
} }
async fetchAllByBatches(messageQueries, accessToken: string): Promise<any> { async fetchAllByBatches(
messageQueries: MessageQuery[],
accessToken: string,
): Promise<GmailMessage[]> {
const batchLimit = 100; const batchLimit = 100;
let messages = [];
let batchOffset = 0; let batchOffset = 0;
let messages: GmailMessage[] = [];
while (batchOffset < messageQueries.length) { while (batchOffset < messageQueries.length) {
const batchResponse = await this.fetchBatch( const batchResponse = await this.fetchBatch(
messageQueries, messageQueries,
@ -37,11 +44,11 @@ export class FetchBatchMessagesService {
} }
async fetchBatch( async fetchBatch(
messageQueries, messageQueries: MessageQuery[],
accessToken: string, accessToken: string,
batchOffset: number, batchOffset: number,
batchLimit: number, batchLimit: number,
): Promise<any> { ): Promise<GmailMessage[]> {
const limitedMessageQueries = messageQueries.slice( const limitedMessageQueries = messageQueries.slice(
batchOffset, batchOffset,
batchOffset + batchLimit, batchOffset + batchLimit,
@ -63,7 +70,7 @@ export class FetchBatchMessagesService {
return formattedResponse; return formattedResponse;
} }
createBatchBody(messageQueries, boundary: string): string { createBatchBody(messageQueries: MessageQuery[], boundary: string): string {
let batchBody: string[] = []; let batchBody: string[] = [];
messageQueries.forEach(function (call) { messageQueries.forEach(function (call) {
@ -87,81 +94,105 @@ export class FetchBatchMessagesService {
return batchBody.concat(['--', boundary, '--']).join(''); return batchBody.concat(['--', boundary, '--']).join('');
} }
parseBatch(responseCollection) { parseBatch(
const items: any = []; responseCollection: AxiosResponse<any, any>,
): GmailParsedResponse[] {
const responseItems: GmailParsedResponse[] = [];
const boundary = this.getBatchSeparator(responseCollection); const boundary = this.getBatchSeparator(responseCollection);
const responseLines = responseCollection.data.split('--' + boundary); const responseLines: string[] = responseCollection.data.split(
'--' + boundary,
);
responseLines.forEach(function (response) { responseLines.forEach(function (response) {
const startJson = response.indexOf('{'); const startJson = response.indexOf('{');
const endJson = response.lastIndexOf('}'); const endJson = response.lastIndexOf('}');
if (startJson < 0 || endJson < 0) { if (startJson < 0 || endJson < 0) return;
return;
}
const responseJson = response.substr(startJson, endJson - startJson + 1); const responseJson = response.substring(startJson, endJson + 1);
const item = JSON.parse(responseJson); const item = JSON.parse(responseJson);
items.push(item); responseItems.push(item);
}); });
return items; return responseItems;
} }
getBatchSeparator(response) { getBatchSeparator(response: AxiosResponse<any, any>): string {
const headers = response.headers; const headers = response.headers;
if (!headers['content-type']) return ''; const contentType: string = headers['content-type'];
const components = headers['content-type'].split('; '); if (!contentType) return '';
const boundary = components.find((o) => o.startsWith('boundary=')); const components = contentType.split('; ');
return boundary.replace('boundary=', '').trim('; '); const boundary = components.find((item) => item.startsWith('boundary='));
return boundary?.replace('boundary=', '').trim() || '';
} }
async formatBatchResponse(response) { async formatBatchResponse(
const parsedResponse = this.parseBatch(response); response: AxiosResponse<any, any>,
): Promise<GmailMessage[]> {
const parsedResponses = this.parseBatch(response);
const formattedResponse = Promise.all(
parsedResponses.map(async (item) => {
if (item.error) {
console.log('Error', item.error);
return;
}
return Promise.all(
parsedResponse.map(async (item) => {
const { id, threadId, internalDate, raw } = item; const { id, threadId, internalDate, raw } = item;
const message = atob(raw?.replace(/-/g, '+').replace(/_/g, '/')); const message = atob(raw?.replace(/-/g, '+').replace(/_/g, '/'));
const parsed = await simpleParser(message); try {
const parsed = await simpleParser(message);
const { const {
subject, subject,
messageId, messageId,
from, from,
to, to,
cc, cc,
bcc, bcc,
text, text,
html, html,
attachments, attachments,
} = parsed; } = parsed;
return { const messageFromGmail: GmailMessage = {
externalId: id, externalId: id,
headerMessageId: messageId, headerMessageId: messageId || '',
subject: subject, subject: subject || '',
messageThreadId: threadId, messageThreadId: threadId,
internalDate, internalDate,
from, from,
to, to,
cc, cc,
bcc, bcc,
text, text: text || '',
html, html: html || '',
attachments, attachments,
}; };
return messageFromGmail;
} catch (error) {
console.log('Error', error);
}
}), }),
); );
const filteredResponse = (await formattedResponse).filter(
(item) => item,
) as GmailMessage[];
return filteredResponse;
} }
} }

View File

@ -1,12 +1,16 @@
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { google } from 'googleapis'; import { gmail_v1, google } from 'googleapis';
import { v4 } from 'uuid'; import { v4 } from 'uuid';
import { DataSource } from 'typeorm';
import { TypeORMService } from 'src/database/typeorm/typeorm.service'; import { TypeORMService } from 'src/database/typeorm/typeorm.service';
import { EnvironmentService } from 'src/integrations/environment/environment.service'; import { EnvironmentService } from 'src/integrations/environment/environment.service';
import { DataSourceService } from 'src/metadata/data-source/data-source.service'; import { DataSourceService } from 'src/metadata/data-source/data-source.service';
import { FetchBatchMessagesService } from 'src/workspace/messaging/services/fetch-batch-messages.service'; import { FetchBatchMessagesService } from 'src/workspace/messaging/services/fetch-batch-messages.service';
import { GmailMessage } from 'src/workspace/messaging/types/gmailMessage';
import { MessageQuery } from 'src/workspace/messaging/types/messageQuery';
import { DataSourceEntity } from 'src/metadata/data-source/data-source.entity';
@Injectable() @Injectable()
export class FetchWorkspaceMessagesService { export class FetchWorkspaceMessagesService {
@ -17,15 +21,12 @@ export class FetchWorkspaceMessagesService {
private readonly fetchBatchMessagesService: FetchBatchMessagesService, private readonly fetchBatchMessagesService: FetchBatchMessagesService,
) {} ) {}
async fetchWorkspaceThreads(workspaceId: string): Promise<any> { async fetchWorkspaceMessages(workspaceId: string): Promise<void> {
return await this.fetchWorkspaceMemberThreads( await this.fetchWorkspaceMemberThreads(
workspaceId, workspaceId,
'20202020-0687-4c41-b707-ed1bfca972a7', '20202020-0687-4c41-b707-ed1bfca972a7',
); );
} await this.fetchWorkspaceMemberMessages(
async fetchWorkspaceMessages(workspaceId: string): Promise<any> {
return await this.fetchWorkspaceMemberMessages(
workspaceId, workspaceId,
'20202020-0687-4c41-b707-ed1bfca972a7', '20202020-0687-4c41-b707-ed1bfca972a7',
); );
@ -35,7 +36,7 @@ export class FetchWorkspaceMessagesService {
workspaceId: string, workspaceId: string,
workspaceMemberId: string, workspaceMemberId: string,
maxResults = 500, maxResults = 500,
): Promise<any> { ): Promise<void> {
const dataSourceMetadata = const dataSourceMetadata =
await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail( await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail(
workspaceId, workspaceId,
@ -45,16 +46,28 @@ export class FetchWorkspaceMessagesService {
dataSourceMetadata, dataSourceMetadata,
); );
const connectedAccount = await workspaceDataSource?.query( if (!workspaceDataSource) {
throw new Error('No workspace data source found');
}
const connectedAccounts = await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'gmail' AND "accountOwnerId" = $1`, `SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'gmail' AND "accountOwnerId" = $1`,
[workspaceMemberId], [workspaceMemberId],
); );
const refreshToken = connectedAccount[0].refreshToken; if (!connectedAccounts || connectedAccounts.length === 0) {
throw new Error('No connected account found');
}
const gmail = await this.getGmailClient(refreshToken); const refreshToken = connectedAccounts[0]?.refreshToken;
const threads = await gmail.users.threads.list({ if (!refreshToken) {
throw new Error('No refresh token found');
}
const gmailClient = await this.getGmailClient(refreshToken);
const threads = await gmailClient.users.threads.list({
userId: 'me', userId: 'me',
maxResults, maxResults,
}); });
@ -69,17 +82,15 @@ export class FetchWorkspaceMessagesService {
threadsData, threadsData,
dataSourceMetadata, dataSourceMetadata,
workspaceDataSource, workspaceDataSource,
connectedAccount[0].id, connectedAccounts[0].id,
); );
return threads;
} }
async fetchWorkspaceMemberMessages( async fetchWorkspaceMemberMessages(
workspaceId: string, workspaceId: string,
workspaceMemberId: string, workspaceMemberId: string,
maxResults = 500, maxResults = 500,
): Promise<any> { ): Promise<void> {
const dataSourceMetadata = const dataSourceMetadata =
await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail( await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail(
workspaceId, workspaceId,
@ -89,28 +100,40 @@ export class FetchWorkspaceMessagesService {
dataSourceMetadata, dataSourceMetadata,
); );
const connectedAccount = await workspaceDataSource?.query( if (!workspaceDataSource) {
throw new Error('No workspace data source found');
}
const connectedAccounts = await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'gmail' AND "accountOwnerId" = $1`, `SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'gmail' AND "accountOwnerId" = $1`,
[workspaceMemberId], [workspaceMemberId],
); );
const accessToken = connectedAccount[0].accessToken; if (!connectedAccounts || connectedAccounts.length === 0) {
const refreshToken = connectedAccount[0].refreshToken; throw new Error('No connected account found');
}
const gmail = await this.getGmailClient(refreshToken); const accessToken = connectedAccounts[0]?.accessToken;
const refreshToken = connectedAccounts[0]?.refreshToken;
const messages = await gmail.users.messages.list({ if (!accessToken || !refreshToken) {
throw new Error('No access token or refresh token found');
}
const gmailClient = await this.getGmailClient(refreshToken);
const messages = await gmailClient.users.messages.list({
userId: 'me', userId: 'me',
maxResults, maxResults,
}); });
const messagesData = messages.data.messages; const messagesData = messages.data.messages;
if (!messagesData) { if (!messagesData || messagesData?.length === 0) {
return; return;
} }
const messageQueries = messagesData.map((message) => ({ const messageQueries: MessageQuery[] = messagesData.map((message) => ({
uri: '/gmail/v1/users/me/messages/' + message.id + '?format=RAW', uri: '/gmail/v1/users/me/messages/' + message.id + '?format=RAW',
})); }));
@ -126,11 +149,9 @@ export class FetchWorkspaceMessagesService {
workspaceDataSource, workspaceDataSource,
workspaceMemberId, workspaceMemberId,
); );
return messages;
} }
async getGmailClient(refreshToken) { async getGmailClient(refreshToken: string): Promise<gmail_v1.Gmail> {
const gmailClientId = this.environmentService.getAuthGoogleClientId(); const gmailClientId = this.environmentService.getAuthGoogleClientId();
const gmailClientSecret = const gmailClientSecret =
@ -145,23 +166,29 @@ export class FetchWorkspaceMessagesService {
refresh_token: refreshToken, refresh_token: refreshToken,
}); });
return google.gmail({ const gmailClient = google.gmail({
version: 'v1', version: 'v1',
auth: oAuth2Client, auth: oAuth2Client,
}); });
return gmailClient;
} }
async saveMessageThreads( async saveMessageThreads(
threads, threads: gmail_v1.Schema$Thread[],
dataSourceMetadata, dataSourceMetadata: DataSourceEntity,
workspaceDataSource, workspaceDataSource: DataSource,
connectedAccountId, connectedAccountId: string,
) { ) {
const messageChannel = await workspaceDataSource?.query( const messageChannel = await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."messageChannel" WHERE "connectedAccountId" = $1`, `SELECT * FROM ${dataSourceMetadata.schema}."messageChannel" WHERE "connectedAccountId" = $1`,
[connectedAccountId], [connectedAccountId],
); );
if (!messageChannel.length) {
throw new Error('No message channel found for this connected account');
}
for (const thread of threads) { for (const thread of threads) {
await workspaceDataSource?.query( await workspaceDataSource?.query(
`INSERT INTO ${dataSourceMetadata.schema}."messageThread" ("externalId", "subject", "messageChannelId", "visibility") VALUES ($1, $2, $3, $4)`, `INSERT INTO ${dataSourceMetadata.schema}."messageThread" ("externalId", "subject", "messageChannelId", "visibility") VALUES ($1, $2, $3, $4)`,
@ -171,10 +198,10 @@ export class FetchWorkspaceMessagesService {
} }
async saveMessages( async saveMessages(
messages, messages: GmailMessage[],
dataSourceMetadata, dataSourceMetadata: DataSourceEntity,
workspaceDataSource, workspaceDataSource: DataSource,
workspaceMemberId, workspaceMemberId: string,
) { ) {
for (const message of messages) { for (const message of messages) {
const { const {

View File

@ -0,0 +1,16 @@
import { AddressObject, Attachment } from 'mailparser';
export type GmailMessage = {
externalId: string;
headerMessageId: string;
subject: string;
messageThreadId: string;
internalDate: string;
from: AddressObject | undefined;
to: AddressObject | AddressObject[] | undefined;
cc: AddressObject | AddressObject[] | undefined;
bcc: AddressObject | AddressObject[] | undefined;
text: string;
html: string;
attachments: Attachment[];
};

View File

@ -0,0 +1,15 @@
export type GmailParsedResponse = {
id: string;
threadId: string;
labelIds: string[];
snippet: string;
sizeEstimate: number;
raw: string;
historyId: string;
internalDate: string;
error?: {
code: number;
message: string;
status: string;
};
};

View File

@ -0,0 +1,3 @@
export type MessageQuery = {
uri: string;
};