Add pagination to partial sync and add logs (#4223)

* update gmail partial sync to add pagination

* adding logs

* update

* improve readability
This commit is contained in:
bosiraphael
2024-02-28 14:55:54 +01:00
committed by GitHub
parent 47656479ba
commit fcfc6796f7

View File

@ -46,6 +46,10 @@ export class GmailPartialSyncService {
const lastSyncHistoryId = connectedAccount.lastSyncHistoryId; const lastSyncHistoryId = connectedAccount.lastSyncHistoryId;
if (!lastSyncHistoryId) { if (!lastSyncHistoryId) {
this.logger.log(
`gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId}: no lastSyncHistoryId, falling back to full sync.`,
);
await this.fallbackToFullSync(workspaceId, connectedAccountId); await this.fallbackToFullSync(workspaceId, connectedAccountId);
return; return;
@ -58,13 +62,27 @@ export class GmailPartialSyncService {
throw new Error('No refresh token found'); throw new Error('No refresh token found');
} }
const { history, error } = await this.getHistoryFromGmail( let startTime = Date.now();
const { history, historyId, error } = await this.getHistoryFromGmail(
refreshToken, refreshToken,
lastSyncHistoryId, lastSyncHistoryId,
maxResults, maxResults,
); );
let endTime = Date.now();
this.logger.log(
`gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId} getting history in ${
endTime - startTime
}ms.`,
);
if (error && error.code === 404) { if (error && error.code === 404) {
this.logger.log(
`gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId}: invalid lastSyncHistoryId, falling back to full sync.`,
);
await this.connectedAccountService.deleteHistoryId( await this.connectedAccountService.deleteHistoryId(
connectedAccountId, connectedAccountId,
workspaceId, workspaceId,
@ -75,13 +93,11 @@ export class GmailPartialSyncService {
return; return;
} }
const newHistoryId = history?.historyId; if (!historyId) {
if (!newHistoryId) {
throw new Error('No history id found'); throw new Error('No history id found');
} }
if (newHistoryId === lastSyncHistoryId || !history?.history?.length) { if (historyId === lastSyncHistoryId || !history?.length) {
this.logger.log( this.logger.log(
`gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId} done with nothing to update.`, `gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId} done with nothing to update.`,
); );
@ -106,7 +122,7 @@ export class GmailPartialSyncService {
await this.fetchMessagesByBatchesService.fetchAllMessages( await this.fetchMessagesByBatchesService.fetchAllMessages(
messageQueries, messageQueries,
accessToken, accessToken,
'gmail full-sync', 'gmail partial-sync',
workspaceId, workspaceId,
connectedAccountId, connectedAccountId,
); );
@ -122,35 +138,53 @@ export class GmailPartialSyncService {
} }
if (messagesDeleted.length !== 0) { if (messagesDeleted.length !== 0) {
startTime = Date.now();
await this.messageService.deleteMessages( await this.messageService.deleteMessages(
messagesDeleted, messagesDeleted,
gmailMessageChannelId, gmailMessageChannelId,
workspaceId, workspaceId,
); );
endTime = Date.now();
this.logger.log(
`gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId}: deleting messages in ${
endTime - startTime
}ms.`,
);
} }
if (errors.length) throw new Error('Error fetching messages'); if (errors.length) throw new Error('Error fetching messages');
startTime = Date.now();
await this.connectedAccountService.updateLastSyncHistoryId( await this.connectedAccountService.updateLastSyncHistoryId(
newHistoryId, historyId,
connectedAccount.id, connectedAccount.id,
workspaceId, workspaceId,
); );
endTime = Date.now();
this.logger.log(
`gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId} updating lastSyncHistoryId in ${
endTime - startTime
}ms.`,
);
this.logger.log( this.logger.log(
`gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId} done.`, `gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId} done.`,
); );
} }
private async getMessageIdsFromHistory( private async getMessageIdsFromHistory(
history: gmail_v1.Schema$ListHistoryResponse, history: gmail_v1.Schema$History[],
): Promise<{ ): Promise<{
messagesAdded: string[]; messagesAdded: string[];
messagesDeleted: string[]; messagesDeleted: string[];
}> { }> {
if (!history.history) throw new Error('No history found'); const { messagesAdded, messagesDeleted } = history.reduce(
const { messagesAdded, messagesDeleted } = history.history.reduce(
( (
acc: { acc: {
messagesAdded: string[]; messagesAdded: string[];
@ -193,12 +227,15 @@ export class GmailPartialSyncService {
lastSyncHistoryId: string, lastSyncHistoryId: string,
maxResults: number, maxResults: number,
): Promise<{ ): Promise<{
history?: gmail_v1.Schema$ListHistoryResponse; history: gmail_v1.Schema$History[];
historyId?: string | null;
error?: any; error?: any;
}> { }> {
const gmailClient = const gmailClient =
await this.gmailClientProvider.getGmailClient(refreshToken); await this.gmailClientProvider.getGmailClient(refreshToken);
const fullHistory: gmail_v1.Schema$History[] = [];
try { try {
const history = await gmailClient.users.history.list({ const history = await gmailClient.users.history.list({
userId: 'me', userId: 'me',
@ -207,12 +244,36 @@ export class GmailPartialSyncService {
maxResults, maxResults,
}); });
return { history: history.data }; let nextPageToken = history?.data?.nextPageToken;
const historyId = history?.data?.historyId;
if (history?.data?.history) {
fullHistory.push(...history.data.history);
}
while (nextPageToken) {
const nextHistory = await gmailClient.users.history.list({
userId: 'me',
startHistoryId: lastSyncHistoryId,
historyTypes: ['messageAdded', 'messageDeleted'],
maxResults,
pageToken: nextPageToken,
});
nextPageToken = nextHistory?.data?.nextPageToken;
if (nextHistory?.data?.history) {
fullHistory.push(...nextHistory.data.history);
}
}
return { history: fullHistory, historyId };
} catch (error) { } catch (error) {
const errorData = error?.response?.data?.error; const errorData = error?.response?.data?.error;
if (errorData) { if (errorData) {
return { error: errorData }; return { history: [], error: errorData };
} }
throw error; throw error;