From 3f9f2c3ba611b082f0b4f230283eeef937fbd729 Mon Sep 17 00:00:00 2001 From: bosiraphael <71827178+bosiraphael@users.noreply.github.com> Date: Tue, 4 Jun 2024 10:29:05 +0200 Subject: [PATCH] 5620 implement throttle logic for message and calendar sync (#5718) Closes #5620 and improve messages filters --- .../docs/start/self-hosting/self-hosting.mdx | 4 +- ...google-api-refresh-access-token.service.ts | 2 +- .../constants/messaging-throttle-duration.ts | 1 + .../messaging-throttle-max-attempts.ts | 1 + .../message-channel.repository.ts | 35 ++++++++++++++ .../messaging-error-handling.service.ts | 47 +++++++++++++++++-- .../messaging-gmail-excluded-categories.ts | 1 + ...g-gmail-full-message-list-fetch.service.ts | 5 ++ ...messaging-gmail-messages-import.service.ts | 8 ++++ ...mail-partial-message-list-fetch.service.ts | 5 ++ .../jobs/messaging-message-list-fetch.job.ts | 9 ++++ .../jobs/messaging-messages-import.job.ts | 7 +++ .../utils/filter-emails.util.ts | 2 +- 13 files changed, 120 insertions(+), 7 deletions(-) create mode 100644 packages/twenty-server/src/modules/messaging/common/constants/messaging-throttle-duration.ts create mode 100644 packages/twenty-server/src/modules/messaging/common/constants/messaging-throttle-max-attempts.ts diff --git a/packages/twenty-docs/docs/start/self-hosting/self-hosting.mdx b/packages/twenty-docs/docs/start/self-hosting/self-hosting.mdx index a6ea99e0d..de75c17e7 100644 --- a/packages/twenty-docs/docs/start/self-hosting/self-hosting.mdx +++ b/packages/twenty-docs/docs/start/self-hosting/self-hosting.mdx @@ -22,8 +22,8 @@ import TabItem from '@theme/TabItem'; Twenty offers integrations with Gmail and Google Calendar. To enable these features, you need to connect to register the following recurring jobs: ``` # from your worker container -yarn command:prod cron:messaging:gmail-messages-import -yarn command:prod cron:messaging:gmail-message-list-fetch +yarn command:prod cron:messaging:messages-import +yarn command:prod cron:messaging:message-list-fetch ``` # Setup Environment Variables diff --git a/packages/twenty-server/src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service.ts b/packages/twenty-server/src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service.ts index 0bdfcefb0..dc687e1a4 100644 --- a/packages/twenty-server/src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service.ts +++ b/packages/twenty-server/src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service.ts @@ -68,7 +68,7 @@ export class GoogleAPIRefreshAccessTokenService { } await this.messagingTelemetryService.track({ - eventName: `refresh-token.error.insufficient_permissions`, + eventName: `refresh_token.error.insufficient_permissions`, workspaceId, connectedAccountId: messageChannel.connectedAccountId, messageChannelId: messageChannel.id, diff --git a/packages/twenty-server/src/modules/messaging/common/constants/messaging-throttle-duration.ts b/packages/twenty-server/src/modules/messaging/common/constants/messaging-throttle-duration.ts new file mode 100644 index 000000000..193810102 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/common/constants/messaging-throttle-duration.ts @@ -0,0 +1 @@ +export const MESSAGING_THROTTLE_DURATION = 1000 * 60 * 1; // 1 minute diff --git a/packages/twenty-server/src/modules/messaging/common/constants/messaging-throttle-max-attempts.ts b/packages/twenty-server/src/modules/messaging/common/constants/messaging-throttle-max-attempts.ts new file mode 100644 index 000000000..319c126e2 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/common/constants/messaging-throttle-max-attempts.ts @@ -0,0 +1 @@ +export const MESSAGING_THROTTLE_MAX_ATTEMPTS = 4; diff --git a/packages/twenty-server/src/modules/messaging/common/repositories/message-channel.repository.ts b/packages/twenty-server/src/modules/messaging/common/repositories/message-channel.repository.ts index 0c86ba4de..a6b2f9585 100644 --- a/packages/twenty-server/src/modules/messaging/common/repositories/message-channel.repository.ts +++ b/packages/twenty-server/src/modules/messaging/common/repositories/message-channel.repository.ts @@ -240,4 +240,39 @@ export class MessageChannelRepository { transactionManager, ); } + + public async updateThrottlePauseUntilAndIncrementThrottleFailureCount( + id: string, + throttleDurationMs: number, + workspaceId: string, + transactionManager?: EntityManager, + ) { + const dataSourceSchema = + this.workspaceDataSourceService.getSchemaName(workspaceId); + + await this.workspaceDataSourceService.executeRawQuery( + `UPDATE ${dataSourceSchema}."messageChannel" SET "throttlePauseUntil" = NOW() + ($1 || ' milliseconds')::interval, "throttleFailureCount" = "throttleFailureCount" + 1 + WHERE "id" = $2`, + [throttleDurationMs, id], + workspaceId, + transactionManager, + ); + } + + public async resetThrottlePauseUntilAndThrottleFailureCount( + id: string, + workspaceId: string, + transactionManager?: EntityManager, + ) { + const dataSourceSchema = + this.workspaceDataSourceService.getSchemaName(workspaceId); + + await this.workspaceDataSourceService.executeRawQuery( + `UPDATE ${dataSourceSchema}."messageChannel" SET "throttlePauseUntil" = NULL, "throttleFailureCount" = 0 + WHERE "id" = $1`, + [id], + workspaceId, + transactionManager, + ); + } } diff --git a/packages/twenty-server/src/modules/messaging/common/services/messaging-error-handling.service.ts b/packages/twenty-server/src/modules/messaging/common/services/messaging-error-handling.service.ts index 5073ade63..b79fbfa38 100644 --- a/packages/twenty-server/src/modules/messaging/common/services/messaging-error-handling.service.ts +++ b/packages/twenty-server/src/modules/messaging/common/services/messaging-error-handling.service.ts @@ -9,6 +9,9 @@ import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/s import { MessagingTelemetryService } from 'src/modules/messaging/common/services/messaging-telemetry.service'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; +import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; +import { MESSAGING_THROTTLE_DURATION } from 'src/modules/messaging/common/constants/messaging-throttle-duration'; +import { MESSAGING_THROTTLE_MAX_ATTEMPTS } from 'src/modules/messaging/common/constants/messaging-throttle-max-attempts'; type SyncStep = | 'partial-message-list-fetch' @@ -27,6 +30,8 @@ export class MessagingErrorHandlingService { private readonly connectedAccountRepository: ConnectedAccountRepository, private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService, private readonly messagingTelemetryService: MessagingTelemetryService, + @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) + private readonly messageChannelRepository: MessageChannelRepository, ) {} public async handleGmailError( @@ -100,7 +105,7 @@ export class MessagingErrorHandlingService { } } - public async handleRateLimitExceeded( + private async handleRateLimitExceeded( error: GmailError, syncStep: SyncStep, messageChannel: ObjectRecord, @@ -114,6 +119,19 @@ export class MessagingErrorHandlingService { message: `${error.code}: ${error.reason}`, }); + if ( + messageChannel.throttleFailureCount >= MESSAGING_THROTTLE_MAX_ATTEMPTS + ) { + await this.messagingChannelSyncStatusService.markAsFailedUnknownAndFlushMessagesToImport( + messageChannel.id, + workspaceId, + ); + + return; + } + + await this.throttle(messageChannel, workspaceId); + switch (syncStep) { case 'full-message-list-fetch': await this.messagingChannelSyncStatusService.scheduleFullMessageListFetch( @@ -141,7 +159,7 @@ export class MessagingErrorHandlingService { } } - public async handleInsufficientPermissions( + private async handleInsufficientPermissions( error: GmailError, syncStep: SyncStep, messageChannel: ObjectRecord, @@ -166,7 +184,7 @@ export class MessagingErrorHandlingService { ); } - public async handleNotFound( + private async handleNotFound( error: GmailError, syncStep: SyncStep, messageChannel: ObjectRecord, @@ -189,4 +207,27 @@ export class MessagingErrorHandlingService { workspaceId, ); } + + private async throttle( + messageChannel: ObjectRecord, + workspaceId: string, + ): Promise { + const throttleDuration = + MESSAGING_THROTTLE_DURATION * + Math.pow(2, messageChannel.throttleFailureCount); + + await this.messageChannelRepository.updateThrottlePauseUntilAndIncrementThrottleFailureCount( + messageChannel.id, + throttleDuration, + workspaceId, + ); + + await this.messagingTelemetryService.track({ + eventName: 'message_channel.throttle', + workspaceId, + connectedAccountId: messageChannel.connectedAccountId, + messageChannelId: messageChannel.id, + message: `Throttling for ${throttleDuration}ms`, + }); + } } diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-excluded-categories.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-excluded-categories.ts index 1b6cd0152..d9f46c48e 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-excluded-categories.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-excluded-categories.ts @@ -2,4 +2,5 @@ export const MESSAGING_GMAIL_EXCLUDED_CATEGORIES = [ 'promotions', 'social', 'forums', + 'updates', ]; diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-full-message-list-fetch.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-full-message-list-fetch.service.ts index 1fb1c4b95..07e2c1eef 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-full-message-list-fetch.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-full-message-list-fetch.service.ts @@ -77,6 +77,11 @@ export class MessagingGmailFullMessageListFetchService { return; } + await this.messageChannelRepository.resetThrottlePauseUntilAndThrottleFailureCount( + messageChannel.id, + workspaceId, + ); + await this.messagingChannelSyncStatusService.scheduleMessagesImport( messageChannel.id, workspaceId, diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-messages-import.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-messages-import.service.ts index 259e6de68..a00590b05 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-messages-import.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-messages-import.service.ts @@ -21,6 +21,7 @@ import { MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE } from 'src/modules/messa import { MessagingGmailFetchMessagesByBatchesService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-messages-by-batches.service'; import { MessagingErrorHandlingService } from 'src/modules/messaging/common/services/messaging-error-handling.service'; import { MessagingSaveMessagesAndEnqueueContactCreationService } from 'src/modules/messaging/common/services/messaging-save-messages-and-enqueue-contact-creation.service'; +import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; @Injectable() export class MessagingGmailMessagesImportService { @@ -39,6 +40,8 @@ export class MessagingGmailMessagesImportService { private readonly messagingTelemetryService: MessagingTelemetryService, @InjectObjectMetadataRepository(BlocklistWorkspaceEntity) private readonly blocklistRepository: BlocklistRepository, + @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) + private readonly messageChannelRepository: MessageChannelRepository, ) {} async processMessageBatchImport( @@ -134,6 +137,11 @@ export class MessagingGmailMessagesImportService { ); } + await this.messageChannelRepository.resetThrottlePauseUntilAndThrottleFailureCount( + messageChannel.id, + workspaceId, + ); + return await this.trackMessageImportCompleted( messageChannel, workspaceId, diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-partial-message-list-fetch.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-partial-message-list-fetch.service.ts index f45c5be52..36e86080f 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-partial-message-list-fetch.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-partial-message-list-fetch.service.ts @@ -74,6 +74,11 @@ export class MessagingGmailPartialMessageListFetchService { return; } + await this.messageChannelRepository.resetThrottlePauseUntilAndThrottleFailureCount( + messageChannel.id, + workspaceId, + ); + if (!historyId) { throw new Error( `No historyId found for ${connectedAccount.id} in workspace ${workspaceId} in gmail history response.`, diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts index f180a91a9..13e4fe715 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts @@ -75,6 +75,13 @@ export class MessagingMessageListFetchJob return; } + if ( + messageChannel.throttlePauseUntil && + messageChannel.throttlePauseUntil > new Date() + ) { + return; + } + switch (messageChannel.syncSubStatus) { case MessageChannelSyncSubStatus.PARTIAL_MESSAGE_LIST_FETCH_PENDING: this.logger.log( @@ -85,6 +92,7 @@ export class MessagingMessageListFetchJob eventName: 'partial_message_list_fetch.started', workspaceId, connectedAccountId, + messageChannelId: messageChannel.id, }); await this.gmailPartialMessageListFetchV2Service.processMessageListFetch( @@ -111,6 +119,7 @@ export class MessagingMessageListFetchJob eventName: 'full_message_list_fetch.started', workspaceId, connectedAccountId, + messageChannelId: messageChannel.id, }); await this.gmailFullMessageListFetchService.processMessageListFetch( diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts index 4529e8105..139a0b667 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts @@ -45,6 +45,13 @@ export class MessagingMessagesImportJob messageChannelId: messageChannel.id, }); + if ( + messageChannel.throttlePauseUntil && + messageChannel.throttlePauseUntil > new Date() + ) { + continue; + } + const connectedAccount = await this.connectedAccountRepository.getConnectedAccountOrThrow( workspaceId, diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/utils/filter-emails.util.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/utils/filter-emails.util.ts index 01140a00f..9e1ebecfe 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/utils/filter-emails.util.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/utils/filter-emails.util.ts @@ -38,7 +38,7 @@ const filterOutIcsAttachments = (messages: GmailMessage[]) => { const isPersonEmail = (email: string): boolean => { const nonPersonalPattern = - /noreply|no-reply|do_not_reply|no\.reply|^(info@|contact@|hello@|support@|feedback@|service@|help@|invites@|invite@|welcome@|alerts@|team@)/; + /noreply|no-reply|do_not_reply|no\.reply|^(info@|contact@|hello@|support@|feedback@|service@|help@|invites@|invite@|welcome@|alerts@|team@|notifications@|notification@|news@)/; return !nonPersonalPattern.test(email); };