import { Inject, Injectable } from '@nestjs/common'; import { gmail_v1 } from 'googleapis'; import { FetchMessagesByBatchesService } from 'src/workspace/messaging/services/fetch-messages-by-batches.service'; import { GmailClientProvider } from 'src/workspace/messaging/providers/gmail/gmail-client.provider'; import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service'; import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service'; import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants'; import { GmailFullSyncJob, GmailFullSyncJobData, } from 'src/workspace/messaging/jobs/gmail-full-sync.job'; @Injectable() export class GmailPartialSyncService { constructor( private readonly gmailClientProvider: GmailClientProvider, private readonly fetchMessagesByBatchesService: FetchMessagesByBatchesService, private readonly utils: MessagingUtilsService, @Inject(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, ) {} private async getHistory( workspaceId: string, connectedAccountId: string, lastSyncHistoryId: string, maxResults: number, ) { const { connectedAccount } = await this.utils.getDataSourceMetadataWorkspaceMetadataAndConnectedAccount( workspaceId, connectedAccountId, ); const gmailClient = await this.gmailClientProvider.getGmailClient( connectedAccount.refreshToken, ); const history = await gmailClient.users.history.list({ userId: 'me', startHistoryId: lastSyncHistoryId, historyTypes: ['messageAdded', 'messageDeleted'], maxResults, }); return history.data; } public async fetchConnectedAccountThreads( workspaceId: string, connectedAccountId: string, maxResults = 500, ): Promise { const { workspaceDataSource, dataSourceMetadata, connectedAccount } = await this.utils.getDataSourceMetadataWorkspaceMetadataAndConnectedAccount( workspaceId, connectedAccountId, ); const lastSyncHistoryId = connectedAccount.lastSyncHistoryId; if (!lastSyncHistoryId) { // Fall back to full sync await this.messageQueueService.add( GmailFullSyncJob.name, { workspaceId, connectedAccountId }, { id: `${workspaceId}-${connectedAccount.id}`, retryLimit: 2, }, ); return; } const accessToken = connectedAccount.accessToken; const refreshToken = connectedAccount.refreshToken; if (!refreshToken) { throw new Error('No refresh token found'); } const history = await this.getHistory( workspaceId, connectedAccountId, lastSyncHistoryId, maxResults, ); const historyId = history.historyId; if (!historyId) { throw new Error('No history id found'); } if (historyId === lastSyncHistoryId) { return; } if (!history.history) { await this.utils.saveLastSyncHistoryId( historyId, connectedAccountId, dataSourceMetadata, workspaceDataSource, ); return; } const gmailMessageChannel = await workspaceDataSource?.query( `SELECT * FROM ${dataSourceMetadata.schema}."messageChannel" WHERE "connectedAccountId" = $1 AND "type" = 'email' LIMIT 1`, [connectedAccountId], ); if (!gmailMessageChannel.length) { throw new Error( `No gmail message channel found for connected account ${connectedAccountId}`, ); } const gmailMessageChannelId = gmailMessageChannel[0].id; const { messagesAdded, messagesDeleted } = await this.getMessageIdsFromHistory(history); const messageQueries = this.utils.createQueriesFromMessageIds(messagesAdded); const { messages: messagesToSave, errors } = await this.fetchMessagesByBatchesService.fetchAllMessages( messageQueries, accessToken, ); await this.utils.saveMessages( messagesToSave, dataSourceMetadata, workspaceDataSource, connectedAccount, gmailMessageChannelId, ); await this.utils.deleteMessageChannelMessageAssociations( messagesDeleted, gmailMessageChannelId, dataSourceMetadata, workspaceDataSource, ); if (errors.length) throw new Error('Error fetching messages'); await this.utils.saveLastSyncHistoryId( historyId, connectedAccount.id, dataSourceMetadata, workspaceDataSource, ); } private async getMessageIdsFromHistory( history: gmail_v1.Schema$ListHistoryResponse, ): Promise<{ messagesAdded: string[]; messagesDeleted: string[]; }> { if (!history.history) throw new Error('No history found'); const { messagesAdded, messagesDeleted } = history.history.reduce( ( acc: { messagesAdded: string[]; messagesDeleted: string[]; }, history, ) => { const messagesAdded = history.messagesAdded?.map( (messageAdded) => messageAdded.message?.id || '', ); const messagesDeleted = history.messagesDeleted?.map( (messageDeleted) => messageDeleted.message?.id || '', ); if (messagesAdded) acc.messagesAdded.push(...messagesAdded); if (messagesDeleted) acc.messagesDeleted.push(...messagesDeleted); return acc; }, { messagesAdded: [], messagesDeleted: [] }, ); const uniqueMessagesAdded = messagesAdded.filter( (messageId) => !messagesDeleted.includes(messageId), ); const uniqueMessagesDeleted = messagesDeleted.filter( (messageId) => !messagesAdded.includes(messageId), ); return { messagesAdded: uniqueMessagesAdded, messagesDeleted: uniqueMessagesDeleted, }; } }