3239 create a command to do a partial sync with the gmail api using the historyid (#3405)

* create utils service

* getLastSyncHistoryId

* getHistory

* add historyTypes messageAdded and messageDeleted

* getMessageIdsAndThreadIdsNotInDatabase

* wip

* fix messageThreadId null

* no need to fetch threads anymore

* get messagesAdded in partial sync

* adding errors

* save lastSyncHistoryId

* improve

* renaming

* create partial sync job

* improve partial sync

* adding messages with partial sync is working

* now adding messages with partial sync is working

* deleting messages and empty threads is working

* wip

* wip

* fix bug to delete threads

* update partial sync to cover edge cases

* renaming

* modify ambiguous naming

* renaming
This commit is contained in:
bosiraphael
2024-01-12 17:46:55 +01:00
committed by GitHub
parent 4f306f8955
commit 5a61e34f4c
18 changed files with 705 additions and 296 deletions

View File

@ -5,12 +5,12 @@ import { v4 } from 'uuid';
import { DataSourceService } from 'src/metadata/data-source/data-source.service'; import { DataSourceService } from 'src/metadata/data-source/data-source.service';
import { TypeORMService } from 'src/database/typeorm/typeorm.service'; import { TypeORMService } from 'src/database/typeorm/typeorm.service';
import { SaveConnectedAccountInput } from 'src/core/auth/dto/save-connected-account'; import { SaveConnectedAccountInput } from 'src/core/auth/dto/save-connected-account';
import {
FetchAllMessagesFromConnectedAccountJobData,
FetchAllMessagesFromConnectedAccountJob,
} from 'src/workspace/messaging/jobs/fetch-all-messages-from-connected-account.job';
import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants'; import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service'; import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service';
import {
GmailFullSyncJob,
GmailFullSyncJobData,
} from 'src/workspace/messaging/jobs/gmail-full-sync.job';
@Injectable() @Injectable()
export class GoogleGmailService { export class GoogleGmailService {
@ -73,8 +73,8 @@ export class GoogleGmailService {
); );
}); });
await this.messageQueueService.add<FetchAllMessagesFromConnectedAccountJobData>( await this.messageQueueService.add<GmailFullSyncJobData>(
FetchAllMessagesFromConnectedAccountJob.name, GmailFullSyncJob.name,
{ {
workspaceId, workspaceId,
connectedAccountId, connectedAccountId,

View File

@ -2,7 +2,7 @@ import { Module } from '@nestjs/common';
import { ModuleRef } from '@nestjs/core'; import { ModuleRef } from '@nestjs/core';
import { HttpModule } from '@nestjs/axios'; import { HttpModule } from '@nestjs/axios';
import { FetchAllMessagesFromConnectedAccountJob } from 'src/workspace/messaging/jobs/fetch-all-messages-from-connected-account.job'; import { GmailFullSyncJob } from 'src/workspace/messaging/jobs/gmail-full-sync.job';
import { CallWebhookJobsJob } from 'src/workspace/workspace-query-runner/jobs/call-webhook-jobs.job'; import { CallWebhookJobsJob } from 'src/workspace/workspace-query-runner/jobs/call-webhook-jobs.job';
import { CallWebhookJob } from 'src/workspace/workspace-query-runner/jobs/call-webhook.job'; import { CallWebhookJob } from 'src/workspace/workspace-query-runner/jobs/call-webhook.job';
import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/workspace-datasource.module'; import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/workspace-datasource.module';
@ -10,6 +10,7 @@ import { ObjectMetadataModule } from 'src/metadata/object-metadata/object-metada
import { DataSourceModule } from 'src/metadata/data-source/data-source.module'; import { DataSourceModule } from 'src/metadata/data-source/data-source.module';
import { TypeORMModule } from 'src/database/typeorm/typeorm.module'; import { TypeORMModule } from 'src/database/typeorm/typeorm.module';
import { FetchWorkspaceMessagesModule } from 'src/workspace/messaging/services/fetch-workspace-messages.module'; import { FetchWorkspaceMessagesModule } from 'src/workspace/messaging/services/fetch-workspace-messages.module';
import { GmailPartialSyncJob } from 'src/workspace/messaging/jobs/gmail-partial-sync.job';
import { EmailSenderJob } from 'src/integrations/email/email-sender.job'; import { EmailSenderJob } from 'src/integrations/email/email-sender.job';
@Module({ @Module({
@ -23,8 +24,12 @@ import { EmailSenderJob } from 'src/integrations/email/email-sender.job';
], ],
providers: [ providers: [
{ {
provide: FetchAllMessagesFromConnectedAccountJob.name, provide: GmailFullSyncJob.name,
useClass: FetchAllMessagesFromConnectedAccountJob, useClass: GmailFullSyncJob,
},
{
provide: GmailPartialSyncJob.name,
useClass: GmailPartialSyncJob,
}, },
{ {
provide: CallWebhookJobsJob.name, provide: CallWebhookJobsJob.name,

View File

@ -4,8 +4,10 @@ import { TypeOrmModule } from '@nestjs/typeorm';
import { FeatureFlagEntity } from 'src/core/feature-flag/feature-flag.entity'; import { FeatureFlagEntity } from 'src/core/feature-flag/feature-flag.entity';
import { TypeORMModule } from 'src/database/typeorm/typeorm.module'; import { TypeORMModule } from 'src/database/typeorm/typeorm.module';
import { DataSourceModule } from 'src/metadata/data-source/data-source.module'; import { DataSourceModule } from 'src/metadata/data-source/data-source.module';
import { FetchWorkspaceMessagesCommand } from 'src/workspace/messaging/commands/fetch-workspace-messages.command'; import { GmailFullSyncCommand } from 'src/workspace/messaging/commands/gmail-full-sync.command';
import { GmailPartialSyncCommand } from 'src/workspace/messaging/commands/gmail-partial-sync.command';
import { MessagingModule } from 'src/workspace/messaging/messaging.module'; import { MessagingModule } from 'src/workspace/messaging/messaging.module';
import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service';
@Module({ @Module({
imports: [ imports: [
@ -14,6 +16,10 @@ import { MessagingModule } from 'src/workspace/messaging/messaging.module';
TypeORMModule, TypeORMModule,
TypeOrmModule.forFeature([FeatureFlagEntity], 'core'), TypeOrmModule.forFeature([FeatureFlagEntity], 'core'),
], ],
providers: [FetchWorkspaceMessagesCommand], providers: [
GmailFullSyncCommand,
GmailPartialSyncCommand,
MessagingUtilsService,
],
}) })
export class FetchWorkspaceMessagesCommandsModule {} export class FetchWorkspaceMessagesCommandsModule {}

View File

@ -4,23 +4,21 @@ import { Command, CommandRunner, Option } from 'nest-commander';
import { Repository } from 'typeorm'; import { Repository } from 'typeorm';
import { FeatureFlagEntity } from 'src/core/feature-flag/feature-flag.entity'; import { FeatureFlagEntity } from 'src/core/feature-flag/feature-flag.entity';
import { TypeORMService } from 'src/database/typeorm/typeorm.service';
import { DataSourceService } from 'src/metadata/data-source/data-source.service';
import { MessagingProducer } from 'src/workspace/messaging/producers/messaging-producer'; import { MessagingProducer } from 'src/workspace/messaging/producers/messaging-producer';
import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service';
interface FetchWorkspaceMessagesOptions { interface GmailFullSyncOptions {
workspaceId: string; workspaceId: string;
} }
@Command({ @Command({
name: 'workspace:fetch-messages', name: 'workspace:gmail-full-sync',
description: 'Fetch messages of all workspaceMembers in a workspace.', description: 'Fetch messages of all workspaceMembers in a workspace.',
}) })
export class FetchWorkspaceMessagesCommand extends CommandRunner { export class GmailFullSyncCommand extends CommandRunner {
constructor( constructor(
private readonly dataSourceService: DataSourceService,
private readonly typeORMService: TypeORMService,
private readonly messagingProducer: MessagingProducer, private readonly messagingProducer: MessagingProducer,
private readonly utils: MessagingUtilsService,
@InjectRepository(FeatureFlagEntity, 'core') @InjectRepository(FeatureFlagEntity, 'core')
private readonly featureFlagRepository: Repository<FeatureFlagEntity>, private readonly featureFlagRepository: Repository<FeatureFlagEntity>,
@ -30,7 +28,7 @@ export class FetchWorkspaceMessagesCommand extends CommandRunner {
async run( async run(
_passedParam: string[], _passedParam: string[],
options: FetchWorkspaceMessagesOptions, options: GmailFullSyncOptions,
): Promise<void> { ): Promise<void> {
const isMessagingEnabled = await this.featureFlagRepository.findOneBy({ const isMessagingEnabled = await this.featureFlagRepository.findOneBy({
workspaceId: options.workspaceId, workspaceId: options.workspaceId,
@ -57,28 +55,11 @@ export class FetchWorkspaceMessagesCommand extends CommandRunner {
} }
private async fetchWorkspaceMessages(workspaceId: string): Promise<void> { private async fetchWorkspaceMessages(workspaceId: string): Promise<void> {
const dataSourceMetadata = const connectedAccounts =
await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail( await this.utils.getConnectedAccountsFromWorkspaceId(workspaceId);
workspaceId,
);
const workspaceDataSource =
await this.typeORMService.connectToDataSource(dataSourceMetadata);
if (!workspaceDataSource) {
throw new Error('No workspace data source found');
}
const connectedAccounts = await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'gmail'`,
);
if (!connectedAccounts || connectedAccounts.length === 0) {
throw new Error('No connected account found');
}
for (const connectedAccount of connectedAccounts) { for (const connectedAccount of connectedAccounts) {
await this.messagingProducer.enqueueFetchAllMessagesFromConnectedAccount( await this.messagingProducer.enqueueGmailFullSync(
{ workspaceId, connectedAccountId: connectedAccount.id }, { workspaceId, connectedAccountId: connectedAccount.id },
`${workspaceId}-${connectedAccount.id}`, `${workspaceId}-${connectedAccount.id}`,
); );

View File

@ -0,0 +1,68 @@
import { InjectRepository } from '@nestjs/typeorm';
import { Command, CommandRunner, Option } from 'nest-commander';
import { Repository } from 'typeorm';
import { FeatureFlagEntity } from 'src/core/feature-flag/feature-flag.entity';
import { MessagingProducer } from 'src/workspace/messaging/producers/messaging-producer';
import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service';
interface GmailPartialSyncOptions {
workspaceId: string;
}
@Command({
name: 'workspace:gmail-partial-sync',
description: 'Fetch messages of all workspaceMembers in a workspace.',
})
export class GmailPartialSyncCommand extends CommandRunner {
constructor(
private readonly messagingProducer: MessagingProducer,
private readonly utils: MessagingUtilsService,
@InjectRepository(FeatureFlagEntity, 'core')
private readonly featureFlagRepository: Repository<FeatureFlagEntity>,
) {
super();
}
async run(
_passedParam: string[],
options: GmailPartialSyncOptions,
): Promise<void> {
const isMessagingEnabled = await this.featureFlagRepository.findOneBy({
workspaceId: options.workspaceId,
key: 'IS_MESSAGING_ENABLED',
value: true,
});
if (!isMessagingEnabled) {
throw new Error('Messaging is not enabled for this workspace');
}
await this.fetchWorkspaceMessages(options.workspaceId);
return;
}
@Option({
flags: '-w, --workspace-id [workspace_id]',
description: 'workspace id',
required: true,
})
parseWorkspaceId(value: string): string {
return value;
}
private async fetchWorkspaceMessages(workspaceId: string): Promise<void> {
const connectedAccounts =
await this.utils.getConnectedAccountsFromWorkspaceId(workspaceId);
for (const connectedAccount of connectedAccounts) {
await this.messagingProducer.enqueueGmailPartialSync(
{ workspaceId, connectedAccountId: connectedAccount.id },
`${workspaceId}-${connectedAccount.id}`,
);
}
}
}

View File

@ -3,33 +3,29 @@ import { Injectable } from '@nestjs/common';
import { MessageQueueJob } from 'src/integrations/message-queue/interfaces/message-queue-job.interface'; import { MessageQueueJob } from 'src/integrations/message-queue/interfaces/message-queue-job.interface';
import { EnvironmentService } from 'src/integrations/environment/environment.service'; import { EnvironmentService } from 'src/integrations/environment/environment.service';
import { RefreshAccessTokenService } from 'src/workspace/messaging/services/refresh-access-token.service'; import { GmailRefreshAccessTokenService } from 'src/workspace/messaging/services/gmail-refresh-access-token.service';
import { FetchWorkspaceMessagesService } from 'src/workspace/messaging/services/fetch-workspace-messages.service'; import { GmailFullSyncService } from 'src/workspace/messaging/services/gmail-full-sync.service';
export type FetchAllMessagesFromConnectedAccountJobData = { export type GmailFullSyncJobData = {
workspaceId: string; workspaceId: string;
connectedAccountId: string; connectedAccountId: string;
}; };
@Injectable() @Injectable()
export class FetchAllMessagesFromConnectedAccountJob export class GmailFullSyncJob implements MessageQueueJob<GmailFullSyncJobData> {
implements MessageQueueJob<FetchAllMessagesFromConnectedAccountJobData>
{
constructor( constructor(
private readonly environmentService: EnvironmentService, private readonly environmentService: EnvironmentService,
private readonly refreshAccessTokenService: RefreshAccessTokenService, private readonly gmailRefreshAccessTokenService: GmailRefreshAccessTokenService,
private readonly fetchWorkspaceMessagesService: FetchWorkspaceMessagesService, private readonly fetchWorkspaceMessagesService: GmailFullSyncService,
) {} ) {}
async handle( async handle(data: GmailFullSyncJobData): Promise<void> {
data: FetchAllMessagesFromConnectedAccountJobData,
): Promise<void> {
console.log( console.log(
`fetching messages for workspace ${data.workspaceId} and account ${ `fetching messages for workspace ${data.workspaceId} and account ${
data.connectedAccountId data.connectedAccountId
} with ${this.environmentService.getMessageQueueDriverType()}`, } with ${this.environmentService.getMessageQueueDriverType()}`,
); );
await this.refreshAccessTokenService.refreshAndSaveAccessToken( await this.gmailRefreshAccessTokenService.refreshAndSaveAccessToken(
data.workspaceId, data.workspaceId,
data.connectedAccountId, data.connectedAccountId,
); );

View File

@ -0,0 +1,40 @@
import { Injectable } from '@nestjs/common';
import { MessageQueueJob } from 'src/integrations/message-queue/interfaces/message-queue-job.interface';
import { EnvironmentService } from 'src/integrations/environment/environment.service';
import { GmailRefreshAccessTokenService } from 'src/workspace/messaging/services/gmail-refresh-access-token.service';
import { GmailPartialSyncService } from 'src/workspace/messaging/services/gmail-partial-sync.service';
export type GmailPartialSyncJobData = {
workspaceId: string;
connectedAccountId: string;
};
@Injectable()
export class GmailPartialSyncJob
implements MessageQueueJob<GmailPartialSyncJobData>
{
constructor(
private readonly environmentService: EnvironmentService,
private readonly gmailRefreshAccessTokenService: GmailRefreshAccessTokenService,
private readonly gmailPartialSyncService: GmailPartialSyncService,
) {}
async handle(data: GmailPartialSyncJobData): Promise<void> {
console.log(
`fetching messages for workspace ${data.workspaceId} and account ${
data.connectedAccountId
} with ${this.environmentService.getMessageQueueDriverType()}`,
);
await this.gmailRefreshAccessTokenService.refreshAndSaveAccessToken(
data.workspaceId,
data.connectedAccountId,
);
await this.gmailPartialSyncService.fetchConnectedAccountThreads(
data.workspaceId,
data.connectedAccountId,
);
}
}

View File

@ -3,9 +3,13 @@ import { Inject, Injectable } from '@nestjs/common';
import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants'; import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service'; import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service';
import { import {
FetchAllMessagesFromConnectedAccountJob, GmailFullSyncJob,
FetchAllMessagesFromConnectedAccountJobData, GmailFullSyncJobData,
} from 'src/workspace/messaging/jobs/fetch-all-messages-from-connected-account.job'; } from 'src/workspace/messaging/jobs/gmail-full-sync.job';
import {
GmailPartialSyncJob,
GmailPartialSyncJobData,
} from 'src/workspace/messaging/jobs/gmail-partial-sync.job';
@Injectable() @Injectable()
export class MessagingProducer { export class MessagingProducer {
@ -14,12 +18,23 @@ export class MessagingProducer {
private readonly messageQueueService: MessageQueueService, private readonly messageQueueService: MessageQueueService,
) {} ) {}
async enqueueFetchAllMessagesFromConnectedAccount( async enqueueGmailFullSync(data: GmailFullSyncJobData, singletonKey: string) {
data: FetchAllMessagesFromConnectedAccountJobData, await this.messageQueueService.add<GmailFullSyncJobData>(
GmailFullSyncJob.name,
data,
{
id: singletonKey,
retryLimit: 2,
},
);
}
async enqueueGmailPartialSync(
data: GmailPartialSyncJobData,
singletonKey: string, singletonKey: string,
) { ) {
await this.messageQueueService.add<FetchAllMessagesFromConnectedAccountJobData>( await this.messageQueueService.add<GmailPartialSyncJobData>(
FetchAllMessagesFromConnectedAccountJob.name, GmailPartialSyncJob.name,
data, data,
{ {
id: singletonKey, id: singletonKey,

View File

@ -7,16 +7,12 @@ import {
GmailMessage, GmailMessage,
Recipient, Recipient,
} from 'src/workspace/messaging/types/gmailMessage'; } from 'src/workspace/messaging/types/gmailMessage';
import { MessageOrThreadQuery } from 'src/workspace/messaging/types/messageOrThreadQuery'; import { MessageQuery } from 'src/workspace/messaging/types/messageOrThreadQuery';
import { GmailMessageParsedResponse } from 'src/workspace/messaging/types/gmailMessageParsedResponse'; import { GmailMessageParsedResponse } from 'src/workspace/messaging/types/gmailMessageParsedResponse';
import { GmailThreadParsedResponse } from 'src/workspace/messaging/types/gmailThreadParsedResponse';
import { GmailThread } from 'src/workspace/messaging/types/gmailThread';
import { GmailClientProvider } from 'src/workspace/messaging/providers/gmail/gmail-client.provider';
@Injectable() @Injectable()
export class FetchBatchMessagesService { export class FetchMessagesByBatchesService {
private readonly httpService: AxiosInstance; private readonly httpService: AxiosInstance;
private readonly gmailClientProvider: GmailClientProvider;
constructor() { constructor() {
this.httpService = axios.create({ this.httpService = axios.create({
@ -25,39 +21,20 @@ export class FetchBatchMessagesService {
} }
async fetchAllMessages( async fetchAllMessages(
queries: MessageOrThreadQuery[], queries: MessageQuery[],
accessToken: string, accessToken: string,
): Promise<GmailMessage[]> { ): Promise<{ messages: GmailMessage[]; errors: any[] }> {
const batchResponses = await this.fetchAllByBatches( const batchResponses = await this.fetchAllByBatches(
queries, queries,
accessToken, accessToken,
'batch_gmail_messages', 'batch_gmail_messages',
); );
const messages = return this.formatBatchResponsesAsGmailMessages(batchResponses);
await this.formatBatchResponsesAsGmailMessages(batchResponses);
return messages;
}
async fetchAllThreads(
queries: MessageOrThreadQuery[],
accessToken: string,
): Promise<GmailThread[]> {
const batchResponses = await this.fetchAllByBatches(
queries,
accessToken,
'batch_gmail_threads',
);
const threads =
await this.formatBatchResponsesAsGmailThreads(batchResponses);
return threads;
} }
async fetchAllByBatches( async fetchAllByBatches(
queries: MessageOrThreadQuery[], queries: MessageQuery[],
accessToken: string, accessToken: string,
boundary: string, boundary: string,
): Promise<AxiosResponse<any, any>[]> { ): Promise<AxiosResponse<any, any>[]> {
@ -85,7 +62,7 @@ export class FetchBatchMessagesService {
} }
async fetchBatch( async fetchBatch(
queries: MessageOrThreadQuery[], queries: MessageQuery[],
accessToken: string, accessToken: string,
batchOffset: number, batchOffset: number,
batchLimit: number, batchLimit: number,
@ -107,13 +84,10 @@ export class FetchBatchMessagesService {
return response; return response;
} }
createBatchBody( createBatchBody(queries: MessageQuery[], boundary: string): string {
messageQueries: MessageOrThreadQuery[],
boundary: string,
): string {
let batchBody: string[] = []; let batchBody: string[] = [];
messageQueries.forEach(function (call) { queries.forEach(function (call) {
const method = 'GET'; const method = 'GET';
const uri = call.uri; const uri = call.uri;
@ -136,10 +110,8 @@ export class FetchBatchMessagesService {
parseBatch( parseBatch(
responseCollection: AxiosResponse<any, any>, responseCollection: AxiosResponse<any, any>,
): GmailMessageParsedResponse[] | GmailThreadParsedResponse[] { ): GmailMessageParsedResponse[] {
const responseItems: const responseItems: GmailMessageParsedResponse[] = [];
| GmailMessageParsedResponse[]
| GmailThreadParsedResponse[] = [];
const boundary = this.getBatchSeparator(responseCollection); const boundary = this.getBatchSeparator(responseCollection);
@ -179,20 +151,24 @@ export class FetchBatchMessagesService {
async formatBatchResponseAsGmailMessage( async formatBatchResponseAsGmailMessage(
responseCollection: AxiosResponse<any, any>, responseCollection: AxiosResponse<any, any>,
): Promise<GmailMessage[]> { ): Promise<{ messages: GmailMessage[]; errors: any[] }> {
const parsedResponses = this.parseBatch( const parsedResponses = this.parseBatch(
responseCollection, responseCollection,
) as GmailMessageParsedResponse[]; ) as GmailMessageParsedResponse[];
const errors: any = [];
const formattedResponse = Promise.all( const formattedResponse = Promise.all(
parsedResponses.map(async (message: GmailMessageParsedResponse) => { parsedResponses.map(async (message: GmailMessageParsedResponse) => {
if (message.error) { if (message.error) {
console.log('Error', message.error); console.log('Error', message.error);
errors.push(message.error);
return; return;
} }
const { id, threadId, internalDate, raw } = message; const { historyId, id, threadId, internalDate, raw } = message;
const body = atob(raw?.replace(/-/g, '+').replace(/_/g, '/')); const body = atob(raw?.replace(/-/g, '+').replace(/_/g, '/'));
@ -222,10 +198,11 @@ export class FetchBatchMessagesService {
]; ];
const messageFromGmail: GmailMessage = { const messageFromGmail: GmailMessage = {
historyId,
externalId: id, externalId: id,
headerMessageId: messageId || '', headerMessageId: messageId || '',
subject: subject || '', subject: subject || '',
messageThreadId: threadId, messageThreadExternalId: threadId,
internalDate, internalDate,
fromHandle: from.value[0].address || '', fromHandle: from.value[0].address || '',
fromDisplayName: from.value[0].name || '', fromDisplayName: from.value[0].name || '',
@ -238,15 +215,17 @@ export class FetchBatchMessagesService {
return messageFromGmail; return messageFromGmail;
} catch (error) { } catch (error) {
console.log('Error', error); console.log('Error', error);
errors.push(error);
} }
}), }),
); );
const filteredResponse = (await formattedResponse).filter( const filteredMessages = (await formattedResponse).filter(
(message) => message, (message) => message,
) as GmailMessage[]; ) as GmailMessage[];
return filteredResponse; return { messages: filteredMessages, errors };
} }
formatAddressObjectAsArray( formatAddressObjectAsArray(
@ -281,65 +260,17 @@ export class FetchBatchMessagesService {
async formatBatchResponsesAsGmailMessages( async formatBatchResponsesAsGmailMessages(
batchResponses: AxiosResponse<any, any>[], batchResponses: AxiosResponse<any, any>[],
): Promise<GmailMessage[]> { ): Promise<{ messages: GmailMessage[]; errors: any[] }> {
const formattedResponses = await Promise.all( const messagesAndErrors = await Promise.all(
batchResponses.map(async (response) => { batchResponses.map(async (response) => {
const formattedResponse = return this.formatBatchResponseAsGmailMessage(response);
await this.formatBatchResponseAsGmailMessage(response);
return formattedResponse;
}), }),
); );
return formattedResponses.flat(); const messages = messagesAndErrors.map((item) => item.messages).flat();
}
async formatBatchResponseAsGmailThread( const errors = messagesAndErrors.map((item) => item.errors).flat();
responseCollection: AxiosResponse<any, any>,
): Promise<GmailThread[]> {
const parsedResponses = this.parseBatch(
responseCollection,
) as GmailThreadParsedResponse[];
const formattedResponse = Promise.all( return { messages, errors };
parsedResponses.map(async (thread: GmailThreadParsedResponse) => {
if (thread.error) {
console.log('Error', thread.error);
return;
}
try {
const { id, messages } = thread;
return {
id,
messageIds: messages.map((message) => message.id) || [],
};
} catch (error) {
console.log('Error', error);
}
}),
);
const filteredResponse = (await formattedResponse).filter(
(item) => item,
) as GmailThread[];
return filteredResponse;
}
async formatBatchResponsesAsGmailThreads(
batchResponses: AxiosResponse<any, any>[],
): Promise<GmailThread[]> {
const formattedResponses = await Promise.all(
batchResponses.map(async (response) => {
const formattedResponse =
await this.formatBatchResponseAsGmailThread(response);
return formattedResponse;
}),
);
return formattedResponses.flat();
} }
} }

View File

@ -4,10 +4,12 @@ import { TypeORMModule } from 'src/database/typeorm/typeorm.module';
import { EnvironmentModule } from 'src/integrations/environment/environment.module'; import { EnvironmentModule } from 'src/integrations/environment/environment.module';
import { DataSourceModule } from 'src/metadata/data-source/data-source.module'; import { DataSourceModule } from 'src/metadata/data-source/data-source.module';
import { MessagingModule } from 'src/workspace/messaging/messaging.module'; import { MessagingModule } from 'src/workspace/messaging/messaging.module';
import { MessagingProvidersModule } from 'src/workspace/messaging/providers/messaging-providers.module'; import { GmailClientProvider } from 'src/workspace/messaging/providers/gmail/gmail-client.provider';
import { FetchBatchMessagesService } from 'src/workspace/messaging/services/fetch-batch-messages.service'; import { FetchMessagesByBatchesService } from 'src/workspace/messaging/services/fetch-messages-by-batches.service';
import { FetchWorkspaceMessagesService } from 'src/workspace/messaging/services/fetch-workspace-messages.service'; import { GmailFullSyncService } from 'src/workspace/messaging/services/gmail-full-sync.service';
import { RefreshAccessTokenService } from 'src/workspace/messaging/services/refresh-access-token.service'; import { GmailPartialSyncService } from 'src/workspace/messaging/services/gmail-partial-sync.service';
import { GmailRefreshAccessTokenService } from 'src/workspace/messaging/services/gmail-refresh-access-token.service';
import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service';
@Module({ @Module({
imports: [ imports: [
@ -15,13 +17,20 @@ import { RefreshAccessTokenService } from 'src/workspace/messaging/services/refr
TypeORMModule, TypeORMModule,
DataSourceModule, DataSourceModule,
EnvironmentModule, EnvironmentModule,
MessagingProvidersModule,
], ],
providers: [ providers: [
FetchWorkspaceMessagesService, GmailFullSyncService,
FetchBatchMessagesService, GmailPartialSyncService,
RefreshAccessTokenService, FetchMessagesByBatchesService,
GmailRefreshAccessTokenService,
MessagingUtilsService,
GmailClientProvider,
],
exports: [
GmailPartialSyncService,
GmailFullSyncService,
GmailRefreshAccessTokenService,
MessagingUtilsService,
], ],
exports: [FetchWorkspaceMessagesService, RefreshAccessTokenService],
}) })
export class FetchWorkspaceMessagesModule {} export class FetchWorkspaceMessagesModule {}

View File

@ -0,0 +1,113 @@
import { Injectable } from '@nestjs/common';
import { FetchMessagesByBatchesService } from 'src/workspace/messaging/services/fetch-messages-by-batches.service';
import { GmailClientProvider } from 'src/workspace/messaging/providers/gmail/gmail-client.provider';
import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service';
@Injectable()
export class GmailFullSyncService {
constructor(
private readonly gmailClientProvider: GmailClientProvider,
private readonly fetchMessagesByBatchesService: FetchMessagesByBatchesService,
private readonly utils: MessagingUtilsService,
) {}
public async fetchConnectedAccountThreads(
workspaceId: string,
connectedAccountId: string,
maxResults = 500,
): Promise<void> {
const { workspaceDataSource, dataSourceMetadata, connectedAccount } =
await this.utils.getDataSourceMetadataWorkspaceMetadataAndConnectedAccount(
workspaceId,
connectedAccountId,
);
const accessToken = connectedAccount.accessToken;
const refreshToken = connectedAccount.refreshToken;
if (!refreshToken) {
throw new Error('No refresh token found');
}
const gmailClient =
await this.gmailClientProvider.getGmailClient(refreshToken);
const messages = await gmailClient.users.messages.list({
userId: 'me',
maxResults,
});
const messagesData = messages.data.messages;
const messageExternalIds = messagesData
? messagesData.map((message) => message.id || '')
: [];
if (!messagesData || messagesData?.length === 0) {
return;
}
const { savedMessageIds, savedThreadIds } =
await this.utils.getSavedMessageIdsAndThreadIds(
messageExternalIds,
connectedAccountId,
dataSourceMetadata,
workspaceDataSource,
);
const messageIdsToSave = messageExternalIds.filter(
(messageId) => !savedMessageIds.includes(messageId),
);
const messageQueries =
this.utils.createQueriesFromMessageIds(messageIdsToSave);
const { messages: messagesToSave, errors } =
await this.fetchMessagesByBatchesService.fetchAllMessages(
messageQueries,
accessToken,
);
const threads = this.utils.getThreadsFromMessages(messagesToSave);
const threadsToSave = threads.filter(
(threadId) => !savedThreadIds.includes(threadId.id),
);
await this.utils.saveMessageThreads(
threadsToSave,
dataSourceMetadata,
workspaceDataSource,
connectedAccount.id,
);
await this.utils.saveMessages(
messagesToSave,
dataSourceMetadata,
workspaceDataSource,
connectedAccount,
);
if (errors.length) throw new Error('Error fetching messages');
if (messagesToSave.length === 0) {
return;
}
const lastModifiedMessageId = messagesData[0].id;
const historyId = messagesToSave.find(
(message) => message.externalId === lastModifiedMessageId,
)?.historyId;
if (!historyId) throw new Error('No history id found');
await this.utils.saveLastSyncHistoryId(
historyId,
connectedAccount.id,
dataSourceMetadata,
workspaceDataSource,
);
}
}

View File

@ -0,0 +1,235 @@
import { Inject, Injectable } from '@nestjs/common';
import { gmail_v1 } from 'googleapis';
import { FetchMessagesByBatchesService } from 'src/workspace/messaging/services/fetch-messages-by-batches.service';
import { GmailClientProvider } from 'src/workspace/messaging/providers/gmail/gmail-client.provider';
import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service';
import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service';
import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants';
import {
GmailFullSyncJob,
GmailFullSyncJobData,
} from 'src/workspace/messaging/jobs/gmail-full-sync.job';
@Injectable()
export class GmailPartialSyncService {
constructor(
private readonly gmailClientProvider: GmailClientProvider,
private readonly fetchMessagesByBatchesService: FetchMessagesByBatchesService,
private readonly utils: MessagingUtilsService,
@Inject(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
) {}
private async getHistory(
workspaceId: string,
connectedAccountId: string,
lastSyncHistoryId: string,
maxResults: number,
) {
const { connectedAccount } =
await this.utils.getDataSourceMetadataWorkspaceMetadataAndConnectedAccount(
workspaceId,
connectedAccountId,
);
const gmailClient = await this.gmailClientProvider.getGmailClient(
connectedAccount.refreshToken,
);
const history = await gmailClient.users.history.list({
userId: 'me',
startHistoryId: lastSyncHistoryId,
historyTypes: ['messageAdded', 'messageDeleted'],
maxResults,
});
return history.data;
}
public async fetchConnectedAccountThreads(
workspaceId: string,
connectedAccountId: string,
maxResults = 500,
): Promise<void> {
const { workspaceDataSource, dataSourceMetadata, connectedAccount } =
await this.utils.getDataSourceMetadataWorkspaceMetadataAndConnectedAccount(
workspaceId,
connectedAccountId,
);
const lastSyncHistoryId = connectedAccount.lastSyncHistoryId;
if (!lastSyncHistoryId) {
// Fall back to full sync
await this.messageQueueService.add<GmailFullSyncJobData>(
GmailFullSyncJob.name,
{ workspaceId, connectedAccountId },
{
id: `${workspaceId}-${connectedAccount.id}`,
retryLimit: 2,
},
);
return;
}
const accessToken = connectedAccount.accessToken;
const refreshToken = connectedAccount.refreshToken;
if (!refreshToken) {
throw new Error('No refresh token found');
}
const history = await this.getHistory(
workspaceId,
connectedAccountId,
lastSyncHistoryId,
maxResults,
);
const historyId = history.historyId;
if (!historyId) {
throw new Error('No history id found');
}
if (historyId === lastSyncHistoryId) {
return;
}
if (!history.history) {
await this.utils.saveLastSyncHistoryId(
historyId,
connectedAccountId,
dataSourceMetadata,
workspaceDataSource,
);
return;
}
const { messagesAdded, messagesDeleted } =
await this.getMessageIdsAndThreadIdsFromHistory(history);
const {
savedMessageIds: messagesAddedAlreadySaved,
savedThreadIds: threadsAddedAlreadySaved,
} = await this.utils.getSavedMessageIdsAndThreadIds(
messagesAdded,
connectedAccountId,
dataSourceMetadata,
workspaceDataSource,
);
const messageExternalIdsToSave = messagesAdded.filter(
(messageId) =>
!messagesAddedAlreadySaved.includes(messageId) &&
!messagesDeleted.includes(messageId),
);
const { savedMessageIds: messagesDeletedAlreadySaved } =
await this.utils.getSavedMessageIdsAndThreadIds(
messagesDeleted,
connectedAccountId,
dataSourceMetadata,
workspaceDataSource,
);
const messageExternalIdsToDelete = messagesDeleted.filter((messageId) =>
messagesDeletedAlreadySaved.includes(messageId),
);
const messageQueries = this.utils.createQueriesFromMessageIds(
messageExternalIdsToSave,
);
const { messages: messagesToSave, errors } =
await this.fetchMessagesByBatchesService.fetchAllMessages(
messageQueries,
accessToken,
);
const threads = this.utils.getThreadsFromMessages(messagesToSave);
const threadsToSave = threads.filter(
(thread) => !threadsAddedAlreadySaved.includes(thread.id),
);
await this.utils.saveMessageThreads(
threadsToSave,
dataSourceMetadata,
workspaceDataSource,
connectedAccount.id,
);
await this.utils.saveMessages(
messagesToSave,
dataSourceMetadata,
workspaceDataSource,
connectedAccount,
);
await this.utils.deleteMessages(
messageExternalIdsToDelete,
dataSourceMetadata,
workspaceDataSource,
);
await this.utils.deleteEmptyThreads(
messagesDeleted,
connectedAccountId,
dataSourceMetadata,
workspaceDataSource,
);
if (errors.length) throw new Error('Error fetching messages');
await this.utils.saveLastSyncHistoryId(
historyId,
connectedAccount.id,
dataSourceMetadata,
workspaceDataSource,
);
}
private async getMessageIdsAndThreadIdsFromHistory(
history: gmail_v1.Schema$ListHistoryResponse,
): Promise<{
messagesAdded: string[];
messagesDeleted: string[];
}> {
if (!history.history) throw new Error('No history found');
const { messagesAdded, messagesDeleted } = history.history.reduce(
(
acc: {
messagesAdded: string[];
messagesDeleted: string[];
},
history,
) => {
const messagesAdded = history.messagesAdded?.map(
(messageAdded) => messageAdded.message?.id || '',
);
const messagesDeleted = history.messagesDeleted?.map(
(messageDeleted) => messageDeleted.message?.id || '',
);
if (messagesAdded) acc.messagesAdded.push(...messagesAdded);
if (messagesDeleted) acc.messagesDeleted.push(...messagesDeleted);
return acc;
},
{ messagesAdded: [], messagesDeleted: [] },
);
return {
messagesAdded,
messagesDeleted,
};
}
}

View File

@ -7,7 +7,7 @@ import { EnvironmentService } from 'src/integrations/environment/environment.ser
import { DataSourceService } from 'src/metadata/data-source/data-source.service'; import { DataSourceService } from 'src/metadata/data-source/data-source.service';
@Injectable() @Injectable()
export class RefreshAccessTokenService { export class GmailRefreshAccessTokenService {
constructor( constructor(
private readonly environmentService: EnvironmentService, private readonly environmentService: EnvironmentService,
private readonly dataSourceService: DataSourceService, private readonly dataSourceService: DataSourceService,

View File

@ -1,119 +1,48 @@
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { gmail_v1 } from 'googleapis'; import { EntityManager, DataSource } from 'typeorm';
import { v4 } from 'uuid'; import { v4 } from 'uuid';
import { DataSource, EntityManager } from 'typeorm';
import { TypeORMService } from 'src/database/typeorm/typeorm.service'; import { TypeORMService } from 'src/database/typeorm/typeorm.service';
import { DataSourceService } from 'src/metadata/data-source/data-source.service'; import { DataSourceService } from 'src/metadata/data-source/data-source.service';
import { FetchBatchMessagesService } from 'src/workspace/messaging/services/fetch-batch-messages.service'; import { DataSourceEntity } from 'src/metadata/data-source/data-source.entity';
import { import {
GmailMessage, GmailMessage,
Recipient, Recipient,
} from 'src/workspace/messaging/types/gmailMessage'; } from 'src/workspace/messaging/types/gmailMessage';
import { MessageOrThreadQuery } from 'src/workspace/messaging/types/messageOrThreadQuery'; import { GmailThread } from 'src/workspace/messaging/types/gmailThread';
import { DataSourceEntity } from 'src/metadata/data-source/data-source.entity'; import { MessageQuery } from 'src/workspace/messaging/types/messageOrThreadQuery';
import { GmailClientProvider } from 'src/workspace/messaging/providers/gmail/gmail-client.provider';
@Injectable() @Injectable()
export class FetchWorkspaceMessagesService { export class MessagingUtilsService {
constructor( constructor(
private readonly gmailClientProvider: GmailClientProvider,
private readonly dataSourceService: DataSourceService, private readonly dataSourceService: DataSourceService,
private readonly typeORMService: TypeORMService, private readonly typeORMService: TypeORMService,
private readonly fetchBatchMessagesService: FetchBatchMessagesService,
) {} ) {}
public async fetchConnectedAccountThreads( public createQueriesFromMessageIds(
workspaceId: string, messageExternalIds: string[],
connectedAccountId: string, ): MessageQuery[] {
maxResults = 500, return messageExternalIds.map((messageId) => ({
): Promise<void> { uri: '/gmail/v1/users/me/messages/' + messageId + '?format=RAW',
const { workspaceDataSource, dataSourceMetadata, connectedAccount } =
await this.getDataSourceMetadataWorkspaceMetadataAndConnectedAccount(
workspaceId,
connectedAccountId,
);
const accessToken = connectedAccount.accessToken;
const refreshToken = connectedAccount.refreshToken;
if (!refreshToken) {
throw new Error('No refresh token found');
}
const gmailClient =
await this.gmailClientProvider.getGmailClient(refreshToken);
const threads = await gmailClient.users.threads.list({
userId: 'me',
maxResults,
});
const threadsData = threads.data.threads;
if (!threadsData || threadsData?.length === 0) {
return;
}
const { savedMessageIds, savedThreadIds } =
await this.getAllSavedMessagesIdsAndMessageThreadsIdsForConnectedAccount(
dataSourceMetadata,
workspaceDataSource,
connectedAccount.id,
);
const threadsToSave = threadsData.filter(
(thread) => thread.id && !savedThreadIds.includes(thread.id),
);
await this.saveMessageThreads(
threadsToSave,
dataSourceMetadata,
workspaceDataSource,
connectedAccount.id,
);
const threadQueries: MessageOrThreadQuery[] = threadsData.map((thread) => ({
uri: '/gmail/v1/users/me/threads/' + thread.id + '?format=minimal',
})); }));
const threadsWithMessageIds =
await this.fetchBatchMessagesService.fetchAllThreads(
threadQueries,
accessToken,
);
const messageIds = threadsWithMessageIds
.map((thread) => thread.messageIds)
.flat();
const messageIdsToSave = messageIds.filter(
(messageId) => !savedMessageIds.includes(messageId),
);
const messageQueries: MessageOrThreadQuery[] = messageIdsToSave.map(
(messageId) => ({
uri: '/gmail/v1/users/me/messages/' + messageId + '?format=RAW',
}),
);
const messagesResponse =
await this.fetchBatchMessagesService.fetchAllMessages(
messageQueries,
accessToken,
);
await this.saveMessages(
messagesResponse,
dataSourceMetadata,
workspaceDataSource,
connectedAccount,
);
} }
private async saveMessageThreads( public getThreadsFromMessages(messages: GmailMessage[]): GmailThread[] {
threads: gmail_v1.Schema$Thread[], return messages.reduce((acc, message) => {
if (message.externalId === message.messageThreadExternalId) {
acc.push({
id: message.messageThreadExternalId,
subject: message.subject,
});
}
return acc;
}, [] as GmailThread[]);
}
public async saveMessageThreads(
threads: GmailThread[],
dataSourceMetadata: DataSourceEntity, dataSourceMetadata: DataSourceEntity,
workspaceDataSource: DataSource, workspaceDataSource: DataSource,
connectedAccountId: string, connectedAccountId: string,
@ -130,12 +59,12 @@ export class FetchWorkspaceMessagesService {
for (const thread of threads) { for (const thread of threads) {
await workspaceDataSource?.query( await workspaceDataSource?.query(
`INSERT INTO ${dataSourceMetadata.schema}."messageThread" ("externalId", "subject", "messageChannelId", "visibility") VALUES ($1, $2, $3, $4)`, `INSERT INTO ${dataSourceMetadata.schema}."messageThread" ("externalId", "subject", "messageChannelId", "visibility") VALUES ($1, $2, $3, $4)`,
[thread.id, thread.snippet, messageChannel[0].id, 'default'], [thread.id, thread.subject, messageChannel[0].id, 'default'],
); );
} }
} }
private async saveMessages( public async saveMessages(
messages: GmailMessage[], messages: GmailMessage[],
dataSourceMetadata: DataSourceEntity, dataSourceMetadata: DataSourceEntity,
workspaceDataSource: DataSource, workspaceDataSource: DataSource,
@ -146,7 +75,7 @@ export class FetchWorkspaceMessagesService {
externalId, externalId,
headerMessageId, headerMessageId,
subject, subject,
messageThreadId, messageThreadExternalId,
internalDate, internalDate,
fromHandle, fromHandle,
fromDisplayName, fromDisplayName,
@ -158,7 +87,7 @@ export class FetchWorkspaceMessagesService {
const messageThread = await workspaceDataSource?.query( const messageThread = await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."messageThread" WHERE "externalId" = $1`, `SELECT * FROM ${dataSourceMetadata.schema}."messageThread" WHERE "externalId" = $1`,
[messageThreadId], [messageThreadExternalId],
); );
const messageId = v4(); const messageId = v4();
@ -219,7 +148,7 @@ export class FetchWorkspaceMessagesService {
} }
} }
async saveMessageRecipients( public async saveMessageRecipients(
recipients: Recipient[], recipients: Recipient[],
dataSourceMetadata: DataSourceEntity, dataSourceMetadata: DataSourceEntity,
messageId: string, messageId: string,
@ -258,34 +187,70 @@ export class FetchWorkspaceMessagesService {
} }
} }
private async getAllSavedMessagesIdsAndMessageThreadsIdsForConnectedAccount( public async getSavedMessageIdsAndThreadIds(
messageEternalIds: string[],
connectedAccountId: string,
dataSourceMetadata: DataSourceEntity, dataSourceMetadata: DataSourceEntity,
workspaceDataSource: DataSource, workspaceDataSource: DataSource,
connectedAccountId: string,
): Promise<{ ): Promise<{
savedMessageIds: string[]; savedMessageIds: string[];
savedThreadIds: string[]; savedThreadIds: string[];
}> { }> {
const messageIds: { messageId: string; messageThreadId: string }[] = const messageIdsInDatabase: {
await workspaceDataSource?.query( messageExternalId: string;
`SELECT message."externalId" AS "messageId", messageThreadExternalId: string;
"messageThread"."externalId" AS "messageThreadId" }[] = await workspaceDataSource?.query(
`SELECT message."externalId" AS "messageExternalId",
"messageThread"."externalId" AS "messageThreadExternalId"
FROM ${dataSourceMetadata.schema}."message" message FROM ${dataSourceMetadata.schema}."message" message
LEFT JOIN ${dataSourceMetadata.schema}."messageThread" "messageThread" ON message."messageThreadId" = "messageThread"."id" LEFT JOIN ${dataSourceMetadata.schema}."messageThread" "messageThread" ON message."messageThreadId" = "messageThread"."id"
LEFT JOIN ${dataSourceMetadata.schema}."messageChannel" ON "messageThread"."messageChannelId" = ${dataSourceMetadata.schema}."messageChannel"."id" LEFT JOIN ${dataSourceMetadata.schema}."messageChannel" ON "messageThread"."messageChannelId" = ${dataSourceMetadata.schema}."messageChannel"."id"
WHERE ${dataSourceMetadata.schema}."messageChannel"."connectedAccountId" = $1`, WHERE ${dataSourceMetadata.schema}."messageChannel"."connectedAccountId" = $1
[connectedAccountId], AND message."externalId" = ANY($2)`,
); [connectedAccountId, messageEternalIds],
);
return { return {
savedMessageIds: messageIds.map((message) => message.messageId), savedMessageIds: messageIdsInDatabase.map(
(message) => message.messageExternalId,
),
savedThreadIds: [ savedThreadIds: [
...new Set(messageIds.map((message) => message.messageThreadId)), ...new Set(
messageIdsInDatabase.map(
(message) => message.messageThreadExternalId,
),
),
], ],
}; };
} }
private async getDataSourceMetadataWorkspaceMetadataAndConnectedAccount( public async getConnectedAccountsFromWorkspaceId(
workspaceId: string,
): Promise<any[]> {
const dataSourceMetadata =
await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail(
workspaceId,
);
const workspaceDataSource =
await this.typeORMService.connectToDataSource(dataSourceMetadata);
if (!workspaceDataSource) {
throw new Error('No workspace data source found');
}
const connectedAccounts = await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'gmail'`,
);
if (!connectedAccounts || connectedAccounts.length === 0) {
throw new Error('No connected account found');
}
return connectedAccounts;
}
public async getDataSourceMetadataWorkspaceMetadataAndConnectedAccount(
workspaceId: string, workspaceId: string,
connectedAccountId: string, connectedAccountId: string,
): Promise<{ ): Promise<{
@ -320,4 +285,62 @@ export class FetchWorkspaceMessagesService {
connectedAccount: connectedAccounts[0], connectedAccount: connectedAccounts[0],
}; };
} }
public async saveLastSyncHistoryId(
historyId: string,
connectedAccountId: string,
dataSourceMetadata: DataSourceEntity,
workspaceDataSource: DataSource,
) {
await workspaceDataSource?.query(
`UPDATE ${dataSourceMetadata.schema}."connectedAccount" SET "lastSyncHistoryId" = $1 WHERE "id" = $2`,
[historyId, connectedAccountId],
);
}
public async deleteMessages(
messageIds: string[],
dataSourceMetadata: DataSourceEntity,
workspaceDataSource: DataSource,
) {
if (!messageIds || messageIds.length === 0) {
return;
}
await workspaceDataSource?.query(
`DELETE FROM ${dataSourceMetadata.schema}."message" WHERE "externalId" = ANY($1)`,
[messageIds],
);
}
public async deleteEmptyThreads(
messageIds: string[],
connectedAccountId: string,
dataSourceMetadata: DataSourceEntity,
workspaceDataSource: DataSource,
) {
const messageThreadsToDelete = await workspaceDataSource?.query(
`SELECT "messageThread"."id" FROM ${dataSourceMetadata.schema}."messageThread" "messageThread"
LEFT JOIN ${dataSourceMetadata.schema}."message" message ON "messageThread"."id" = message."messageThreadId"
LEFT JOIN ${dataSourceMetadata.schema}."messageChannel" ON "messageThread"."messageChannelId" = ${dataSourceMetadata.schema}."messageChannel"."id"
WHERE "messageThread"."externalId" = ANY($1)
AND ${dataSourceMetadata.schema}."messageChannel"."connectedAccountId" = $2
GROUP BY "messageThread"."id"
HAVING COUNT(message."id") = 0`,
[messageIds, connectedAccountId],
);
if (!messageThreadsToDelete || messageThreadsToDelete.length === 0) {
return;
}
const messageThreadIdsToDelete = messageThreadsToDelete.map(
(messageThread) => messageThread.id,
);
await workspaceDataSource?.query(
`DELETE FROM ${dataSourceMetadata.schema}."messageThread" WHERE "id" = ANY($1)`,
[messageThreadIdsToDelete],
);
}
} }

View File

@ -1,10 +1,11 @@
import { Attachment } from 'mailparser'; import { Attachment } from 'mailparser';
export type GmailMessage = { export type GmailMessage = {
historyId: string;
externalId: string; externalId: string;
headerMessageId: string; headerMessageId: string;
subject: string; subject: string;
messageThreadId: string; messageThreadExternalId: string;
internalDate: string; internalDate: string;
fromHandle: string; fromHandle: string;
fromDisplayName: string; fromDisplayName: string;

View File

@ -1,4 +1,4 @@
export type GmailThread = { export type GmailThread = {
id: string; id: string;
messageIds: string[]; subject: string;
}; };

View File

@ -1,14 +0,0 @@
type Message = {
id: string;
labels: string[];
};
export type GmailThreadParsedResponse = {
id: string;
messages: Message[];
error?: {
code: number;
message: string;
status: string;
};
};

View File

@ -1,3 +1,3 @@
export type MessageOrThreadQuery = { export type MessageQuery = {
uri: string; uri: string;
}; };