From 250bb6134e01ae829f584e72ba994d5ccc467bd2 Mon Sep 17 00:00:00 2001 From: Weiko Date: Fri, 8 Mar 2024 14:06:21 +0100 Subject: [PATCH] [messaging] remove partial sync retry and fix missing datasource error (#4371) * [messaging] remove partial sync retry and fix missing datasource error * revert * fix * add 429 * fix * fix * fix * remove duplicate log * fix cron pattern --- .../integrations/message-queue/jobs.module.ts | 2 ++ .../fetch-all-workspaces-messages.job.ts | 20 ++++++++++++++----- .../commands/gmail-partial-sync.command.ts | 3 --- .../fetch-messages-by-batches.service.ts | 2 -- .../services/gmail-partial-sync.service.ts | 20 +++++++++++++++++-- 5 files changed, 35 insertions(+), 12 deletions(-) diff --git a/packages/twenty-server/src/integrations/message-queue/jobs.module.ts b/packages/twenty-server/src/integrations/message-queue/jobs.module.ts index d38b47fab..c3d2442c5 100644 --- a/packages/twenty-server/src/integrations/message-queue/jobs.module.ts +++ b/packages/twenty-server/src/integrations/message-queue/jobs.module.ts @@ -33,6 +33,7 @@ import { UserWorkspaceModule } from 'src/core/user-workspace/user-workspace.modu import { StripeModule } from 'src/core/billing/stripe/stripe.module'; import { Workspace } from 'src/core/workspace/workspace.entity'; import { FeatureFlagEntity } from 'src/core/feature-flag/feature-flag.entity'; +import { DataSourceEntity } from 'src/metadata/data-source/data-source.entity'; @Module({ imports: [ @@ -51,6 +52,7 @@ import { FeatureFlagEntity } from 'src/core/feature-flag/feature-flag.entity'; ThreadCleanerModule, TypeORMModule, TypeOrmModule.forFeature([Workspace, FeatureFlagEntity], 'core'), + TypeOrmModule.forFeature([DataSourceEntity], 'metadata'), UserModule, UserWorkspaceModule, WorkspaceDataSourceModule, diff --git a/packages/twenty-server/src/workspace/messaging/commands/crons/fetch-all-workspaces-messages.job.ts b/packages/twenty-server/src/workspace/messaging/commands/crons/fetch-all-workspaces-messages.job.ts index 9f5712d3a..9d02f2705 100644 --- a/packages/twenty-server/src/workspace/messaging/commands/crons/fetch-all-workspaces-messages.job.ts +++ b/packages/twenty-server/src/workspace/messaging/commands/crons/fetch-all-workspaces-messages.job.ts @@ -1,7 +1,7 @@ import { Inject, Injectable, Logger } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; -import { Repository } from 'typeorm'; +import { Repository, In } from 'typeorm'; import { MessageQueueJob } from 'src/integrations/message-queue/interfaces/message-queue-job.interface'; @@ -13,6 +13,7 @@ import { GmailPartialSyncJobData, GmailPartialSyncJob, } from 'src/workspace/messaging/jobs/gmail-partial-sync.job'; +import { DataSourceEntity } from 'src/metadata/data-source/data-source.entity'; @Injectable() export class FetchAllWorkspacesMessagesJob @@ -23,6 +24,8 @@ export class FetchAllWorkspacesMessagesJob constructor( @InjectRepository(Workspace, 'core') private readonly workspaceRepository: Repository, + @InjectRepository(DataSourceEntity, 'metadata') + private readonly dataSourceRepository: Repository, @Inject(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, private readonly connectedAccountService: ConnectedAccountService, @@ -38,7 +41,17 @@ export class FetchAllWorkspacesMessagesJob }) ).map((workspace) => workspace.id); - for (const workspaceId of workspaceIds) { + const dataSources = await this.dataSourceRepository.find({ + where: { + workspaceId: In(workspaceIds), + }, + }); + + const workspaceIdsWithDataSources = new Set( + dataSources.map((dataSource) => dataSource.workspaceId), + ); + + for (const workspaceId of workspaceIdsWithDataSources) { await this.fetchWorkspaceMessages(workspaceId); } } @@ -55,9 +68,6 @@ export class FetchAllWorkspacesMessagesJob workspaceId, connectedAccountId: connectedAccount.id, }, - { - retryLimit: 2, - }, ); } } catch (error) { diff --git a/packages/twenty-server/src/workspace/messaging/commands/gmail-partial-sync.command.ts b/packages/twenty-server/src/workspace/messaging/commands/gmail-partial-sync.command.ts index b911f6122..134023630 100644 --- a/packages/twenty-server/src/workspace/messaging/commands/gmail-partial-sync.command.ts +++ b/packages/twenty-server/src/workspace/messaging/commands/gmail-partial-sync.command.ts @@ -56,9 +56,6 @@ export class GmailPartialSyncCommand extends CommandRunner { workspaceId, connectedAccountId: connectedAccount.id, }, - { - retryLimit: 2, - }, ); } } diff --git a/packages/twenty-server/src/workspace/messaging/services/fetch-messages-by-batches.service.ts b/packages/twenty-server/src/workspace/messaging/services/fetch-messages-by-batches.service.ts index bde65dc2a..a88ef5db6 100644 --- a/packages/twenty-server/src/workspace/messaging/services/fetch-messages-by-batches.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/fetch-messages-by-batches.service.ts @@ -187,8 +187,6 @@ export class FetchMessagesByBatchesService { const formattedResponse = Promise.all( parsedResponses.map(async (message: GmailMessageParsedResponse) => { if (message.error) { - console.log('Error', message.error); - errors.push(message.error); return; diff --git a/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts b/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts index fae82f7af..7ce6419fd 100644 --- a/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts @@ -213,9 +213,25 @@ export class GmailPartialSyncService { } if (errors.length) { - throw new Error( - `Error fetching messages for ${connectedAccountId} in workspace ${workspaceId} during partial-sync`, + this.logger.error( + `Error fetching messages for ${connectedAccountId} in workspace ${workspaceId} during partial-sync: ${JSON.stringify( + errors, + null, + 2, + )}`, ); + const errorsCanBeIgnored = errors.every((error) => error.code === 404); + const errorsShouldBeRetried = errors.some((error) => error.code === 429); + + if (errorsShouldBeRetried) { + return; + } + + if (!errorsCanBeIgnored) { + throw new Error( + `Error fetching messages for ${connectedAccountId} in workspace ${workspaceId} during partial-sync`, + ); + } } startTime = Date.now();