From fcfc6796f79b5d4d9c35ec2c59df09a5445ee1a4 Mon Sep 17 00:00:00 2001 From: bosiraphael <71827178+bosiraphael@users.noreply.github.com> Date: Wed, 28 Feb 2024 14:55:54 +0100 Subject: [PATCH] Add pagination to partial sync and add logs (#4223) * update gmail partial sync to add pagination * adding logs * update * improve readability --- .../services/gmail-partial-sync.service.ts | 89 ++++++++++++++++--- 1 file changed, 75 insertions(+), 14 deletions(-) diff --git a/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts b/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts index 08771134a..c7ce8fa13 100644 --- a/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts @@ -46,6 +46,10 @@ export class GmailPartialSyncService { const lastSyncHistoryId = connectedAccount.lastSyncHistoryId; if (!lastSyncHistoryId) { + this.logger.log( + `gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId}: no lastSyncHistoryId, falling back to full sync.`, + ); + await this.fallbackToFullSync(workspaceId, connectedAccountId); return; @@ -58,13 +62,27 @@ export class GmailPartialSyncService { throw new Error('No refresh token found'); } - const { history, error } = await this.getHistoryFromGmail( + let startTime = Date.now(); + + const { history, historyId, error } = await this.getHistoryFromGmail( refreshToken, lastSyncHistoryId, maxResults, ); + let endTime = Date.now(); + + this.logger.log( + `gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId} getting history in ${ + endTime - startTime + }ms.`, + ); + if (error && error.code === 404) { + this.logger.log( + `gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId}: invalid lastSyncHistoryId, falling back to full sync.`, + ); + await this.connectedAccountService.deleteHistoryId( connectedAccountId, workspaceId, @@ -75,13 +93,11 @@ export class GmailPartialSyncService { return; } - const newHistoryId = history?.historyId; - - if (!newHistoryId) { + if (!historyId) { throw new Error('No history id found'); } - if (newHistoryId === lastSyncHistoryId || !history?.history?.length) { + if (historyId === lastSyncHistoryId || !history?.length) { this.logger.log( `gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId} done with nothing to update.`, ); @@ -106,7 +122,7 @@ export class GmailPartialSyncService { await this.fetchMessagesByBatchesService.fetchAllMessages( messageQueries, accessToken, - 'gmail full-sync', + 'gmail partial-sync', workspaceId, connectedAccountId, ); @@ -122,35 +138,53 @@ export class GmailPartialSyncService { } if (messagesDeleted.length !== 0) { + startTime = Date.now(); + await this.messageService.deleteMessages( messagesDeleted, gmailMessageChannelId, workspaceId, ); + + endTime = Date.now(); + + this.logger.log( + `gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId}: deleting messages in ${ + endTime - startTime + }ms.`, + ); } if (errors.length) throw new Error('Error fetching messages'); + startTime = Date.now(); + await this.connectedAccountService.updateLastSyncHistoryId( - newHistoryId, + historyId, connectedAccount.id, workspaceId, ); + endTime = Date.now(); + + this.logger.log( + `gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId} updating lastSyncHistoryId in ${ + endTime - startTime + }ms.`, + ); + this.logger.log( `gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId} done.`, ); } private async getMessageIdsFromHistory( - history: gmail_v1.Schema$ListHistoryResponse, + history: gmail_v1.Schema$History[], ): Promise<{ messagesAdded: string[]; messagesDeleted: string[]; }> { - if (!history.history) throw new Error('No history found'); - - const { messagesAdded, messagesDeleted } = history.history.reduce( + const { messagesAdded, messagesDeleted } = history.reduce( ( acc: { messagesAdded: string[]; @@ -193,12 +227,15 @@ export class GmailPartialSyncService { lastSyncHistoryId: string, maxResults: number, ): Promise<{ - history?: gmail_v1.Schema$ListHistoryResponse; + history: gmail_v1.Schema$History[]; + historyId?: string | null; error?: any; }> { const gmailClient = await this.gmailClientProvider.getGmailClient(refreshToken); + const fullHistory: gmail_v1.Schema$History[] = []; + try { const history = await gmailClient.users.history.list({ userId: 'me', @@ -207,12 +244,36 @@ export class GmailPartialSyncService { maxResults, }); - return { history: history.data }; + let nextPageToken = history?.data?.nextPageToken; + + const historyId = history?.data?.historyId; + + if (history?.data?.history) { + fullHistory.push(...history.data.history); + } + + while (nextPageToken) { + const nextHistory = await gmailClient.users.history.list({ + userId: 'me', + startHistoryId: lastSyncHistoryId, + historyTypes: ['messageAdded', 'messageDeleted'], + maxResults, + pageToken: nextPageToken, + }); + + nextPageToken = nextHistory?.data?.nextPageToken; + + if (nextHistory?.data?.history) { + fullHistory.push(...nextHistory.data.history); + } + } + + return { history: fullHistory, historyId }; } catch (error) { const errorData = error?.response?.data?.error; if (errorData) { - return { error: errorData }; + return { history: [], error: errorData }; } throw error;