From 7f9fdf3ff68529cdc1ef2c6213f06b19300f0145 Mon Sep 17 00:00:00 2001 From: Charles Bochet Date: Sat, 8 Jun 2024 11:07:36 +0200 Subject: [PATCH] Fix performance issue mail (#5780) In this PR, I'm mainly doing two things: - uniformizing messaging-messages-import and messaging-message-list-fetch behaviors (cron.job and job) - improving performances of these cron.jobs by not triggering the jobs if the stage is not relevant - making sure these jobs have same signature (workspaceId + messageChannelId) --- .../auth/services/google-apis.service.ts | 36 ++++---- .../message-channel.repository.ts | 31 ++++++- .../messaging-message-list-fetch.cron.job.ts | 42 ++++------ .../messaging-messages-import.cron.job.ts | 36 ++++++-- .../jobs/messaging-message-list-fetch.job.ts | 40 ++++----- .../jobs/messaging-messages-import.job.ts | 82 ++++++++++++------- 6 files changed, 158 insertions(+), 109 deletions(-) diff --git a/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts b/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts index 3e6bc89a9..ed4b58e59 100644 --- a/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts +++ b/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts @@ -28,6 +28,7 @@ import { MessageChannelWorkspaceEntity, MessageChannelType, MessageChannelVisibility, + MessageChannelSyncStatus, } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessagingMessageListFetchJob, @@ -114,6 +115,7 @@ export class GoogleAPIsService { handle, visibility: messageVisibility || MessageChannelVisibility.SHARE_EVERYTHING, + syncStatus: MessageChannelSyncStatus.ONGOING, }, workspaceId, manager, @@ -150,26 +152,22 @@ export class GoogleAPIsService { } }); - await this.enqueueSyncJobs( - newOrExistingConnectedAccountId, - workspaceId, - isCalendarEnabled, - ); - } - - private async enqueueSyncJobs( - connectedAccountId: string, - workspaceId: string, - isCalendarEnabled: boolean, - ) { if (this.environmentService.get('MESSAGING_PROVIDER_GMAIL_ENABLED')) { - await this.messageQueueService.add( - MessagingMessageListFetchJob.name, - { + const messageChannels = + await this.messageChannelRepository.getByConnectedAccountId( + newOrExistingConnectedAccountId, workspaceId, - connectedAccountId, - }, - ); + ); + + for (const messageChannel of messageChannels) { + await this.messageQueueService.add( + MessagingMessageListFetchJob.name, + { + workspaceId, + messageChannelId: messageChannel.id, + }, + ); + } } if ( @@ -180,7 +178,7 @@ export class GoogleAPIsService { GoogleCalendarSyncJob.name, { workspaceId, - connectedAccountId, + connectedAccountId: newOrExistingConnectedAccountId, }, { retryLimit: 2, diff --git a/packages/twenty-server/src/modules/messaging/common/repositories/message-channel.repository.ts b/packages/twenty-server/src/modules/messaging/common/repositories/message-channel.repository.ts index fa229445b..a3a8c727c 100644 --- a/packages/twenty-server/src/modules/messaging/common/repositories/message-channel.repository.ts +++ b/packages/twenty-server/src/modules/messaging/common/repositories/message-channel.repository.ts @@ -19,7 +19,12 @@ export class MessageChannelRepository { public async create( messageChannel: Pick< ObjectRecord, - 'id' | 'connectedAccountId' | 'type' | 'handle' | 'visibility' + | 'id' + | 'connectedAccountId' + | 'type' + | 'handle' + | 'visibility' + | 'syncStatus' >, workspaceId: string, transactionManager?: EntityManager, @@ -28,14 +33,15 @@ export class MessageChannelRepository { this.workspaceDataSourceService.getSchemaName(workspaceId); await this.workspaceDataSourceService.executeRawQuery( - `INSERT INTO ${dataSourceSchema}."messageChannel" ("id", "connectedAccountId", "type", "handle", "visibility") - VALUES ($1, $2, $3, $4, $5)`, + `INSERT INTO ${dataSourceSchema}."messageChannel" ("id", "connectedAccountId", "type", "handle", "visibility", "syncStatus") + VALUES ($1, $2, $3, $4, $5, $6)`, [ messageChannel.id, messageChannel.connectedAccountId, messageChannel.type, messageChannel.handle, messageChannel.visibility, + messageChannel.syncStatus, ], workspaceId, transactionManager, @@ -138,6 +144,25 @@ export class MessageChannelRepository { ); } + public async getById( + id: string, + workspaceId: string, + transactionManager?: EntityManager, + ): Promise> { + const dataSourceSchema = + this.workspaceDataSourceService.getSchemaName(workspaceId); + + const messageChannels = + await this.workspaceDataSourceService.executeRawQuery( + `SELECT * FROM ${dataSourceSchema}."messageChannel" WHERE "id" = $1`, + [id], + workspaceId, + transactionManager, + ); + + return messageChannels[0]; + } + public async getIdsByWorkspaceMemberId( workspaceMemberId: string, workspaceId: string, diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts index 206990a72..9a2cb5474 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts @@ -12,7 +12,10 @@ import { MessageQueueService } from 'src/engine/integrations/message-queue/servi import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { EnvironmentService } from 'src/engine/integrations/environment/environment.service'; import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; -import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { + MessageChannelSyncStage, + MessageChannelWorkspaceEntity, +} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessagingMessageListFetchJobData, MessagingMessageListFetchJob, @@ -59,35 +62,26 @@ export class MessagingMessageListFetchCronJob ); for (const workspaceId of workspaceIdsWithDataSources) { - await this.enqueueSyncs(workspaceId); - } - } - - private async enqueueSyncs(workspaceId: string): Promise { - try { const messageChannels = await this.messageChannelRepository.getAll(workspaceId); for (const messageChannel of messageChannels) { - if (!messageChannel?.isSyncEnabled) { - continue; + if ( + (messageChannel.isSyncEnabled && + messageChannel.syncStage === + MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING) || + messageChannel.syncStage === + MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING + ) { + await this.messageQueueService.add( + MessagingMessageListFetchJob.name, + { + workspaceId, + messageChannelId: messageChannel.id, + }, + ); } - - await this.messageQueueService.add( - MessagingMessageListFetchJob.name, - { - workspaceId, - connectedAccountId: messageChannel.connectedAccountId, - }, - ); } - } catch (error) { - this.logger.error( - `Error while fetching workspace messages for workspace ${workspaceId}`, - ); - this.logger.error(error); - - return; } } } diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts index 6780059bd..2faafc14a 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts @@ -1,4 +1,4 @@ -import { Inject, Injectable, Logger } from '@nestjs/common'; +import { Inject, Injectable } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository, In } from 'typeorm'; @@ -14,13 +14,17 @@ import { MessagingMessagesImportJobData, MessagingMessagesImportJob, } from 'src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job'; +import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; +import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { + MessageChannelSyncStage, + MessageChannelWorkspaceEntity, +} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; @Injectable() export class MessagingMessagesImportCronJob implements MessageQueueJob { - private readonly logger = new Logger(MessagingMessagesImportCronJob.name); - constructor( @InjectRepository(Workspace, 'core') private readonly workspaceRepository: Repository, @@ -29,6 +33,8 @@ export class MessagingMessagesImportCronJob private readonly environmentService: EnvironmentService, @Inject(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, + @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) + private readonly messageChannelRepository: MessageChannelRepository, ) {} async handle(): Promise { @@ -54,12 +60,24 @@ export class MessagingMessagesImportCronJob ); for (const workspaceId of workspaceIdsWithDataSources) { - await this.messageQueueService.add( - MessagingMessagesImportJob.name, - { - workspaceId, - }, - ); + const messageChannels = + await this.messageChannelRepository.getAll(workspaceId); + + for (const messageChannel of messageChannels) { + if ( + messageChannel.isSyncEnabled && + messageChannel.syncStage === + MessageChannelSyncStage.MESSAGES_IMPORT_PENDING + ) { + await this.messageQueueService.add( + MessagingMessagesImportJob.name, + { + workspaceId, + messageChannelId: messageChannel.id, + }, + ); + } + } } } } diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts index a39672d20..4cd4b40e5 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts @@ -16,8 +16,8 @@ import { MessagingGmailPartialMessageListFetchService } from 'src/modules/messag import { isThrottled } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/is-throttled'; export type MessagingMessageListFetchJobData = { + messageChannelId: string; workspaceId: string; - connectedAccountId: string; }; @Injectable() @@ -37,42 +37,36 @@ export class MessagingMessageListFetchJob ) {} async handle(data: MessagingMessageListFetchJobData): Promise { - const { workspaceId, connectedAccountId } = data; + const { messageChannelId, workspaceId } = data; await this.messagingTelemetryService.track({ eventName: 'message_list_fetch_job.triggered', + messageChannelId, workspaceId, - connectedAccountId, }); - const connectedAccount = await this.connectedAccountRepository.getById( - connectedAccountId, + const messageChannel = await this.messageChannelRepository.getById( + messageChannelId, workspaceId, ); - if (!connectedAccount) { + if (!messageChannel) { await this.messagingTelemetryService.track({ - eventName: 'message_list_fetch_job.error.connected_account_not_found', + eventName: 'message_list_fetch_job.error.message_channel_not_found', + messageChannelId, workspaceId, - connectedAccountId, }); return; } - const messageChannel = - await this.messageChannelRepository.getFirstByConnectedAccountId( - connectedAccountId, + const connectedAccount = + await this.connectedAccountRepository.getByIdOrFail( + messageChannel.connectedAccountId, workspaceId, ); - if (!messageChannel) { - await this.messagingTelemetryService.track({ - eventName: 'message_list_fetch_job.error.message_channel_not_found', - workspaceId, - connectedAccountId, - }); - + if (!messageChannel?.isSyncEnabled) { return; } @@ -88,13 +82,13 @@ export class MessagingMessageListFetchJob switch (messageChannel.syncStage) { case MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING: this.logger.log( - `Fetching partial message list for workspace ${workspaceId} and account ${connectedAccount.id}`, + `Fetching partial message list for workspace ${workspaceId} and messageChannelId ${messageChannel.id}`, ); await this.messagingTelemetryService.track({ eventName: 'partial_message_list_fetch.started', workspaceId, - connectedAccountId, + connectedAccountId: connectedAccount.id, messageChannelId: messageChannel.id, }); @@ -107,7 +101,7 @@ export class MessagingMessageListFetchJob await this.messagingTelemetryService.track({ eventName: 'partial_message_list_fetch.completed', workspaceId, - connectedAccountId, + connectedAccountId: connectedAccount.id, messageChannelId: messageChannel.id, }); @@ -121,7 +115,7 @@ export class MessagingMessageListFetchJob await this.messagingTelemetryService.track({ eventName: 'full_message_list_fetch.started', workspaceId, - connectedAccountId, + connectedAccountId: connectedAccount.id, messageChannelId: messageChannel.id, }); @@ -134,7 +128,7 @@ export class MessagingMessageListFetchJob await this.messagingTelemetryService.track({ eventName: 'full_message_list_fetch.completed', workspaceId, - connectedAccountId, + connectedAccountId: connectedAccount.id, messageChannelId: messageChannel.id, }); diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts index 757fbbf41..246f5f8b1 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts @@ -7,11 +7,15 @@ import { ConnectedAccountRepository } from 'src/modules/connected-account/reposi import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; import { MessagingTelemetryService } from 'src/modules/messaging/common/services/messaging-telemetry.service'; -import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { + MessageChannelSyncStage, + MessageChannelWorkspaceEntity, +} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessagingGmailMessagesImportService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-messages-import.service'; import { isThrottled } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/is-throttled'; export type MessagingMessagesImportJobData = { + messageChannelId: string; workspaceId: string; }; @@ -29,43 +33,59 @@ export class MessagingMessagesImportJob ) {} async handle(data: MessagingMessagesImportJobData): Promise { - const { workspaceId } = data; + const { messageChannelId, workspaceId } = data; - const messageChannels = - await this.messageChannelRepository.getAll(workspaceId); + await this.messagingTelemetryService.track({ + eventName: 'messages_import.triggered', + workspaceId, + messageChannelId, + }); - for (const messageChannel of messageChannels) { - if (!messageChannel?.isSyncEnabled) { - continue; - } + const messageChannel = await this.messageChannelRepository.getById( + messageChannelId, + workspaceId, + ); + if (!messageChannel) { await this.messagingTelemetryService.track({ - eventName: 'messages_import.triggered', + eventName: 'messages_import.error.message_channel_not_found', + messageChannelId, workspaceId, - connectedAccountId: messageChannel.connectedAccountId, - messageChannelId: messageChannel.id, }); - if ( - isThrottled( - messageChannel.syncStageStartedAt, - messageChannel.throttleFailureCount, - ) - ) { - continue; - } - - const connectedAccount = - await this.connectedAccountRepository.getConnectedAccountOrThrow( - workspaceId, - messageChannel.connectedAccountId, - ); - - await this.gmailFetchMessageContentFromCacheService.processMessageBatchImport( - messageChannel, - connectedAccount, - workspaceId, - ); + return; } + + const connectedAccount = + await this.connectedAccountRepository.getConnectedAccountOrThrow( + workspaceId, + messageChannel.connectedAccountId, + ); + + if (!messageChannel?.isSyncEnabled) { + return; + } + + if ( + isThrottled( + messageChannel.syncStageStartedAt, + messageChannel.throttleFailureCount, + ) + ) { + return; + } + + if ( + messageChannel.syncStage !== + MessageChannelSyncStage.MESSAGES_IMPORT_PENDING + ) { + return; + } + + await this.gmailFetchMessageContentFromCacheService.processMessageBatchImport( + messageChannel, + connectedAccount, + workspaceId, + ); } }