From 87465b13eee3a21c5f387ca17791e46d6566114e Mon Sep 17 00:00:00 2001 From: bosiraphael <71827178+bosiraphael@users.noreply.github.com> Date: Fri, 24 May 2024 18:27:54 +0200 Subject: [PATCH] 5507 modify the partial sync cron to work with the new statuses (#5512) Closes #5507 --- .../auth/services/google-apis.service.ts | 10 +- .../integrations/message-queue/jobs.module.ts | 12 +- .../connected-account.repository.ts | 26 +++ ... gmail-message-list-fetch.cron.command.ts} | 8 +- ... => gmail-messages-import.cron.command.ts} | 8 +- .../messaging-cron-commands.module.ts | 9 +- ...s => gmail-message-list-fetch.cron.job.ts} | 68 ++++++-- ...b.ts => gmail-messages-import.cron.job.ts} | 63 ++++++-- .../crons/jobs/messaging-cron-job.module.ts | 16 +- .../jobs/blocklist-reimport-messages.job.ts | 4 +- ...s => gmail-full-message-list-fetch.job.ts} | 14 +- .../jobs/gmail-message-list-fetch.job.ts | 97 +++++++++++ ...> gmail-partial-message-list-fetch.job.ts} | 14 +- .../messaging/jobs/messaging-job.module.ts | 30 ++-- .../message-channel.repository.ts | 18 +++ ...cted-account-and-message-channel.module.ts | 18 +++ ...ted-account-and-message-channel.service.ts | 62 +++++++ .../gmail-full-message-list-fetch.module.ts} | 8 +- .../gmail-full-message-list-fetch.service.ts} | 4 +- .../gmail-messages-import-v2.service.ts | 146 +++++++++++++++++ .../gmail-messages-import.module.ts} | 16 +- .../gmail-messages-import.service.ts} | 16 +- ...es-and-enqueue-contact-creation.service.ts | 96 +++++++++++ .../gmail-get-history.service.ts | 104 ++++++++++++ ...ssage-list-fetch-error-handling.service.ts | 122 ++++++++++++++ ...l-partial-message-list-fetch-v2.service.ts | 152 ++++++++++++++++++ ...mail-partial-message-list-fetch.module.ts} | 21 ++- ...ail-partial-message-list-fetch.service.ts} | 16 +- .../set-message-channel-sync-status.module.ts | 14 ++ ...set-message-channel-sync-status.service.ts | 101 ++++++++++++ .../message-channel.workspace-entity.ts | 7 + 31 files changed, 1185 insertions(+), 115 deletions(-) rename packages/twenty-server/src/modules/messaging/crons/commands/{gmail-partial-sync.cron.command.ts => gmail-message-list-fetch.cron.command.ts} (74%) rename packages/twenty-server/src/modules/messaging/crons/commands/{gmail-fetch-messages-from-cache.cron.command.ts => gmail-messages-import.cron.command.ts} (69%) rename packages/twenty-server/src/modules/messaging/crons/jobs/{gmail-partial-sync.cron.job.ts => gmail-message-list-fetch.cron.job.ts} (61%) rename packages/twenty-server/src/modules/messaging/crons/jobs/{gmail-fetch-messages-from-cache.cron.job.ts => gmail-messages-import.cron.job.ts} (50%) rename packages/twenty-server/src/modules/messaging/jobs/{gmail-full-sync.job.ts => gmail-full-message-list-fetch.job.ts} (68%) create mode 100644 packages/twenty-server/src/modules/messaging/jobs/gmail-message-list-fetch.job.ts rename packages/twenty-server/src/modules/messaging/jobs/{gmail-partial-sync.job.ts => gmail-partial-message-list-fetch.job.ts} (67%) create mode 100644 packages/twenty-server/src/modules/messaging/services/get-connected-account-and-message-channel/get-connected-account-and-message-channel.module.ts create mode 100644 packages/twenty-server/src/modules/messaging/services/get-connected-account-and-message-channel/get-connected-account-and-message-channel.service.ts rename packages/twenty-server/src/modules/messaging/services/{gmail-full-sync/gmail-full-sync.module.ts => gmail-full-message-list-fetch/gmail-full-message-list-fetch.module.ts} (84%) rename packages/twenty-server/src/modules/messaging/services/{gmail-full-sync/gmail-full-sync.service.ts => gmail-full-message-list-fetch/gmail-full-message-list-fetch.service.ts} (98%) create mode 100644 packages/twenty-server/src/modules/messaging/services/gmail-messages-import/gmail-messages-import-v2.service.ts rename packages/twenty-server/src/modules/messaging/services/{gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.module.ts => gmail-messages-import/gmail-messages-import.module.ts} (57%) rename packages/twenty-server/src/modules/messaging/services/{gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.service.ts => gmail-messages-import/gmail-messages-import.service.ts} (96%) create mode 100644 packages/twenty-server/src/modules/messaging/services/gmail-messages-import/save-messages-and-enqueue-contact-creation.service.ts create mode 100644 packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-get-history.service.ts create mode 100644 packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-error-handling.service.ts create mode 100644 packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-v2.service.ts rename packages/twenty-server/src/modules/messaging/services/{gmail-partial-sync/gmail-partial-sync.module.ts => gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.module.ts} (55%) rename packages/twenty-server/src/modules/messaging/services/{gmail-partial-sync/gmail-partial-sync.service.ts => gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.service.ts} (96%) create mode 100644 packages/twenty-server/src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.module.ts create mode 100644 packages/twenty-server/src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.service.ts diff --git a/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts b/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts index 3fee59544..3dd53036e 100644 --- a/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts +++ b/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts @@ -32,9 +32,9 @@ import { MessageChannelVisibility, } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity'; import { - GmailFullSyncJobData, - GmailFullSyncJob, -} from 'src/modules/messaging/jobs/gmail-full-sync.job'; + GmailFullMessageListFetchJobData, + GmailFullMessageListFetchJob, +} from 'src/modules/messaging/jobs/gmail-full-message-list-fetch.job'; @Injectable() export class GoogleAPIsService { @@ -156,8 +156,8 @@ export class GoogleAPIsService { isCalendarEnabled: boolean, ) { if (this.environmentService.get('MESSAGING_PROVIDER_GMAIL_ENABLED')) { - await this.messageQueueService.add( - GmailFullSyncJob.name, + await this.messageQueueService.add( + GmailFullMessageListFetchJob.name, { workspaceId, connectedAccountId, diff --git a/packages/twenty-server/src/engine/integrations/message-queue/jobs.module.ts b/packages/twenty-server/src/engine/integrations/message-queue/jobs.module.ts index 5ef30f9be..6acee1a8b 100644 --- a/packages/twenty-server/src/engine/integrations/message-queue/jobs.module.ts +++ b/packages/twenty-server/src/engine/integrations/message-queue/jobs.module.ts @@ -17,9 +17,9 @@ import { DataSourceModule } from 'src/engine/metadata-modules/data-source/data-s import { ObjectMetadataModule } from 'src/engine/metadata-modules/object-metadata/object-metadata.module'; import { CleanInactiveWorkspaceJob } from 'src/engine/workspace-manager/workspace-cleaner/crons/clean-inactive-workspace.job'; import { CalendarEventParticipantModule } from 'src/modules/calendar/services/calendar-event-participant/calendar-event-participant.module'; -import { GmailFetchMessageContentFromCacheModule } from 'src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.module'; -import { GmailFullSyncModule } from 'src/modules/messaging/services/gmail-full-sync/gmail-full-sync.module'; -import { GmailPartialSyncModule } from 'src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.module'; +import { GmailMessagesImportModule } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import.module'; +import { GmailFullMessageListFetchModule } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.module'; +import { GmailPartialMessageListFetchModule } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.module'; import { TimelineActivityModule } from 'src/modules/timeline/timeline-activity.module'; import { WorkspaceModule } from 'src/engine/core-modules/workspace/workspace.module'; import { CalendarMessagingParticipantJobModule } from 'src/modules/calendar-messaging-participant/jobs/calendar-messaging-participant-job.module'; @@ -41,9 +41,9 @@ import { TimelineJobModule } from 'src/modules/timeline/jobs/timeline-job.module BillingModule, UserWorkspaceModule, WorkspaceModule, - GmailFullSyncModule, - GmailFetchMessageContentFromCacheModule, - GmailPartialSyncModule, + GmailFullMessageListFetchModule, + GmailMessagesImportModule, + GmailPartialMessageListFetchModule, CalendarEventParticipantModule, TimelineActivityModule, StripeModule, diff --git a/packages/twenty-server/src/modules/connected-account/repositories/connected-account.repository.ts b/packages/twenty-server/src/modules/connected-account/repositories/connected-account.repository.ts index cb07db84f..0f94f2861 100644 --- a/packages/twenty-server/src/modules/connected-account/repositories/connected-account.repository.ts +++ b/packages/twenty-server/src/modules/connected-account/repositories/connected-account.repository.ts @@ -254,4 +254,30 @@ export class ConnectedAccountRepository { transactionManager, ); } + + public async getConnectedAccountOrThrow( + workspaceId: string, + connectedAccountId: string, + ): Promise> { + const connectedAccount = await this.getById( + connectedAccountId, + workspaceId, + ); + + if (!connectedAccount) { + throw new Error( + `Connected account ${connectedAccountId} not found in workspace ${workspaceId}`, + ); + } + + const refreshToken = connectedAccount.refreshToken; + + if (!refreshToken) { + throw new Error( + `No refresh token found for connected account ${connectedAccountId} in workspace ${workspaceId}`, + ); + } + + return connectedAccount; + } } diff --git a/packages/twenty-server/src/modules/messaging/crons/commands/gmail-partial-sync.cron.command.ts b/packages/twenty-server/src/modules/messaging/crons/commands/gmail-message-list-fetch.cron.command.ts similarity index 74% rename from packages/twenty-server/src/modules/messaging/crons/commands/gmail-partial-sync.cron.command.ts rename to packages/twenty-server/src/modules/messaging/crons/commands/gmail-message-list-fetch.cron.command.ts index 3b122ef6d..da419abd5 100644 --- a/packages/twenty-server/src/modules/messaging/crons/commands/gmail-partial-sync.cron.command.ts +++ b/packages/twenty-server/src/modules/messaging/crons/commands/gmail-message-list-fetch.cron.command.ts @@ -4,16 +4,16 @@ import { Command, CommandRunner } from 'nest-commander'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; -import { GmailPartialSyncCronJob } from 'src/modules/messaging/crons/jobs/gmail-partial-sync.cron.job'; +import { GmailMessageListFetchCronJob } from 'src/modules/messaging/crons/jobs/gmail-message-list-fetch.cron.job'; const GMAIL_PARTIAL_SYNC_CRON_PATTERN = '*/5 * * * *'; @Command({ - name: 'cron:messaging:gmail-partial-sync', + name: 'cron:messaging:gmail-message-list-fetch', description: 'Starts a cron job to sync existing connected account messages and store them in the cache', }) -export class GmailPartialSyncCronCommand extends CommandRunner { +export class GmailMessageListFetchCronCommand extends CommandRunner { constructor( @Inject(MessageQueue.cronQueue) private readonly messageQueueService: MessageQueueService, @@ -23,7 +23,7 @@ export class GmailPartialSyncCronCommand extends CommandRunner { async run(): Promise { await this.messageQueueService.addCron( - GmailPartialSyncCronJob.name, + GmailMessageListFetchCronJob.name, undefined, { repeat: { pattern: GMAIL_PARTIAL_SYNC_CRON_PATTERN }, diff --git a/packages/twenty-server/src/modules/messaging/crons/commands/gmail-fetch-messages-from-cache.cron.command.ts b/packages/twenty-server/src/modules/messaging/crons/commands/gmail-messages-import.cron.command.ts similarity index 69% rename from packages/twenty-server/src/modules/messaging/crons/commands/gmail-fetch-messages-from-cache.cron.command.ts rename to packages/twenty-server/src/modules/messaging/crons/commands/gmail-messages-import.cron.command.ts index d08eee18c..ec2986f6a 100644 --- a/packages/twenty-server/src/modules/messaging/crons/commands/gmail-fetch-messages-from-cache.cron.command.ts +++ b/packages/twenty-server/src/modules/messaging/crons/commands/gmail-messages-import.cron.command.ts @@ -4,13 +4,13 @@ import { Command, CommandRunner } from 'nest-commander'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; -import { GmailFetchMessagesFromCacheCronJob } from 'src/modules/messaging/crons/jobs/gmail-fetch-messages-from-cache.cron.job'; +import { GmailMessagesImportCronJob } from 'src/modules/messaging/crons/jobs/gmail-messages-import.cron.job'; @Command({ - name: 'cron:messaging:gmail-fetch-messages-from-cache', + name: 'cron:messaging:gmail-messages-import', description: 'Starts a cron job to fetch all messages from cache', }) -export class GmailFetchMessagesFromCacheCronCommand extends CommandRunner { +export class GmailMessagesImportCronCommand extends CommandRunner { constructor( @Inject(MessageQueue.cronQueue) private readonly messageQueueService: MessageQueueService, @@ -20,7 +20,7 @@ export class GmailFetchMessagesFromCacheCronCommand extends CommandRunner { async run(): Promise { await this.messageQueueService.addCron( - GmailFetchMessagesFromCacheCronJob.name, + GmailMessagesImportCronJob.name, undefined, { repeat: { diff --git a/packages/twenty-server/src/modules/messaging/crons/commands/messaging-cron-commands.module.ts b/packages/twenty-server/src/modules/messaging/crons/commands/messaging-cron-commands.module.ts index eff0c5b4f..082aff092 100644 --- a/packages/twenty-server/src/modules/messaging/crons/commands/messaging-cron-commands.module.ts +++ b/packages/twenty-server/src/modules/messaging/crons/commands/messaging-cron-commands.module.ts @@ -2,17 +2,14 @@ import { Module } from '@nestjs/common'; import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; -import { GmailFetchMessagesFromCacheCronCommand } from 'src/modules/messaging/crons/commands/gmail-fetch-messages-from-cache.cron.command'; -import { GmailPartialSyncCronCommand } from 'src/modules/messaging/crons/commands/gmail-partial-sync.cron.command'; +import { GmailMessagesImportCronCommand } from 'src/modules/messaging/crons/commands/gmail-messages-import.cron.command'; +import { GmailMessageListFetchCronCommand } from 'src/modules/messaging/crons/commands/gmail-message-list-fetch.cron.command'; @Module({ imports: [ ObjectMetadataRepositoryModule.forFeature([ ConnectedAccountWorkspaceEntity, ]), ], - providers: [ - GmailPartialSyncCronCommand, - GmailFetchMessagesFromCacheCronCommand, - ], + providers: [GmailMessageListFetchCronCommand, GmailMessagesImportCronCommand], }) export class MessagingCronCommandsModule {} diff --git a/packages/twenty-server/src/modules/messaging/crons/jobs/gmail-partial-sync.cron.job.ts b/packages/twenty-server/src/modules/messaging/crons/jobs/gmail-message-list-fetch.cron.job.ts similarity index 61% rename from packages/twenty-server/src/modules/messaging/crons/jobs/gmail-partial-sync.cron.job.ts rename to packages/twenty-server/src/modules/messaging/crons/jobs/gmail-message-list-fetch.cron.job.ts index 19ed32667..6d1ba52f1 100644 --- a/packages/twenty-server/src/modules/messaging/crons/jobs/gmail-partial-sync.cron.job.ts +++ b/packages/twenty-server/src/modules/messaging/crons/jobs/gmail-message-list-fetch.cron.job.ts @@ -8,19 +8,29 @@ import { MessageQueueJob } from 'src/engine/integrations/message-queue/interface import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity'; import { - GmailPartialSyncJobData, - GmailPartialSyncJob, -} from 'src/modules/messaging/jobs/gmail-partial-sync.job'; + GmailPartialMessageListFetchJobData, + GmailPartialMessageListFetchJob, +} from 'src/modules/messaging/jobs/gmail-partial-message-list-fetch.job'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity'; import { EnvironmentService } from 'src/engine/integrations/environment/environment.service'; +import { + FeatureFlagEntity, + FeatureFlagKeys, +} from 'src/engine/core-modules/feature-flag/feature-flag.entity'; +import { + GmailMessageListFetchJobData, + GmailMessageListFetchJob, +} from 'src/modules/messaging/jobs/gmail-message-list-fetch.job'; @Injectable() -export class GmailPartialSyncCronJob implements MessageQueueJob { - private readonly logger = new Logger(GmailPartialSyncCronJob.name); +export class GmailMessageListFetchCronJob + implements MessageQueueJob +{ + private readonly logger = new Logger(GmailMessageListFetchCronJob.name); constructor( @InjectRepository(Workspace, 'core') @@ -32,6 +42,8 @@ export class GmailPartialSyncCronJob implements MessageQueueJob { @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) private readonly messageChannelRepository: MessageChannelRepository, private readonly environmentService: EnvironmentService, + @InjectRepository(FeatureFlagEntity, 'core') + private readonly featureFlagRepository: Repository, ) {} async handle(): Promise { @@ -57,11 +69,20 @@ export class GmailPartialSyncCronJob implements MessageQueueJob { ); for (const workspaceId of workspaceIdsWithDataSources) { - await this.enqueuePartialSyncs(workspaceId); + await this.enqueueSyncs(workspaceId); } } - private async enqueuePartialSyncs(workspaceId: string): Promise { + private async enqueueSyncs(workspaceId: string): Promise { + const isGmailSyncV2EnabledFeatureFlag = + await this.featureFlagRepository.findOneBy({ + workspaceId: workspaceId, + key: FeatureFlagKeys.IsGmailSyncV2Enabled, + value: true, + }); + + const isGmailSyncV2Enabled = isGmailSyncV2EnabledFeatureFlag?.value; + try { const messageChannels = await this.messageChannelRepository.getAll(workspaceId); @@ -71,16 +92,29 @@ export class GmailPartialSyncCronJob implements MessageQueueJob { continue; } - await this.messageQueueService.add( - GmailPartialSyncJob.name, - { - workspaceId, - connectedAccountId: messageChannel.connectedAccountId, - }, - { - retryLimit: 2, - }, - ); + if (isGmailSyncV2Enabled) { + await this.messageQueueService.add( + GmailMessageListFetchJob.name, + { + workspaceId, + connectedAccountId: messageChannel.connectedAccountId, + }, + { + retryLimit: 2, + }, + ); + } else { + await this.messageQueueService.add( + GmailPartialMessageListFetchJob.name, + { + workspaceId, + connectedAccountId: messageChannel.connectedAccountId, + }, + { + retryLimit: 2, + }, + ); + } } } catch (error) { this.logger.error( diff --git a/packages/twenty-server/src/modules/messaging/crons/jobs/gmail-fetch-messages-from-cache.cron.job.ts b/packages/twenty-server/src/modules/messaging/crons/jobs/gmail-messages-import.cron.job.ts similarity index 50% rename from packages/twenty-server/src/modules/messaging/crons/jobs/gmail-fetch-messages-from-cache.cron.job.ts rename to packages/twenty-server/src/modules/messaging/crons/jobs/gmail-messages-import.cron.job.ts index 38eb5c838..9d1ea699d 100644 --- a/packages/twenty-server/src/modules/messaging/crons/jobs/gmail-fetch-messages-from-cache.cron.job.ts +++ b/packages/twenty-server/src/modules/messaging/crons/jobs/gmail-messages-import.cron.job.ts @@ -1,4 +1,4 @@ -import { Injectable } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; import { Repository, In } from 'typeorm'; @@ -10,13 +10,20 @@ import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-s import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity'; -import { GmailFetchMessageContentFromCacheService } from 'src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.service'; +import { GmailMessagesImportService } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import.service'; import { EnvironmentService } from 'src/engine/integrations/environment/environment.service'; +import { GmailMessagesImportV2Service } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import-v2.service'; +import { + FeatureFlagEntity, + FeatureFlagKeys, +} from 'src/engine/core-modules/feature-flag/feature-flag.entity'; +import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; +import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; @Injectable() -export class GmailFetchMessagesFromCacheCronJob - implements MessageQueueJob -{ +export class GmailMessagesImportCronJob implements MessageQueueJob { + private readonly logger = new Logger(GmailMessagesImportCronJob.name); + constructor( @InjectRepository(Workspace, 'core') private readonly workspaceRepository: Repository, @@ -24,8 +31,13 @@ export class GmailFetchMessagesFromCacheCronJob private readonly dataSourceRepository: Repository, @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) private readonly messageChannelRepository: MessageChannelRepository, - private readonly gmailFetchMessageContentFromCacheService: GmailFetchMessageContentFromCacheService, + private readonly gmailFetchMessageContentFromCacheService: GmailMessagesImportService, + private readonly gmailFetchMessageContentFromCacheV2Service: GmailMessagesImportV2Service, + @InjectRepository(FeatureFlagEntity, 'core') + private readonly featureFlagRepository: Repository, private readonly environmentService: EnvironmentService, + @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity) + private readonly connectedAccountRepository: ConnectedAccountRepository, ) {} async handle(): Promise { @@ -59,11 +71,42 @@ export class GmailFetchMessagesFromCacheCronJob const messageChannels = await this.messageChannelRepository.getAll(workspaceId); + const isGmailSyncV2EnabledFeatureFlag = + await this.featureFlagRepository.findOneBy({ + workspaceId: workspaceId, + key: FeatureFlagKeys.IsGmailSyncV2Enabled, + value: true, + }); + + const isGmailSyncV2Enabled = isGmailSyncV2EnabledFeatureFlag?.value; + for (const messageChannel of messageChannels) { - await this.gmailFetchMessageContentFromCacheService.fetchMessageContentFromCache( - workspaceId, - messageChannel.connectedAccountId, - ); + if (!messageChannel?.isSyncEnabled) { + continue; + } + + if (isGmailSyncV2Enabled) { + try { + const connectedAccount = + await this.connectedAccountRepository.getConnectedAccountOrThrow( + workspaceId, + messageChannel.connectedAccountId, + ); + + await this.gmailFetchMessageContentFromCacheV2Service.processMessageBatchImport( + messageChannel, + connectedAccount, + workspaceId, + ); + } catch (error) { + this.logger.log(error.message); + } + } else { + await this.gmailFetchMessageContentFromCacheService.fetchMessageContentFromCache( + workspaceId, + messageChannel.connectedAccountId, + ); + } } } } diff --git a/packages/twenty-server/src/modules/messaging/crons/jobs/messaging-cron-job.module.ts b/packages/twenty-server/src/modules/messaging/crons/jobs/messaging-cron-job.module.ts index 8d171c915..7ef4fedf8 100644 --- a/packages/twenty-server/src/modules/messaging/crons/jobs/messaging-cron-job.module.ts +++ b/packages/twenty-server/src/modules/messaging/crons/jobs/messaging-cron-job.module.ts @@ -5,9 +5,9 @@ import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature- import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity'; import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module'; -import { GmailFetchMessagesFromCacheCronJob } from 'src/modules/messaging/crons/jobs/gmail-fetch-messages-from-cache.cron.job'; -import { GmailPartialSyncCronJob } from 'src/modules/messaging/crons/jobs/gmail-partial-sync.cron.job'; -import { GmailFetchMessageContentFromCacheModule } from 'src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.module'; +import { GmailMessagesImportCronJob } from 'src/modules/messaging/crons/jobs/gmail-messages-import.cron.job'; +import { GmailMessageListFetchCronJob } from 'src/modules/messaging/crons/jobs/gmail-message-list-fetch.cron.job'; +import { GmailMessagesImportModule } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import.module'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity'; @Module({ @@ -15,16 +15,16 @@ import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-ob TypeOrmModule.forFeature([Workspace, FeatureFlagEntity], 'core'), TypeOrmModule.forFeature([DataSourceEntity], 'metadata'), ObjectMetadataRepositoryModule.forFeature([MessageChannelWorkspaceEntity]), - GmailFetchMessageContentFromCacheModule, + GmailMessagesImportModule, ], providers: [ { - provide: GmailFetchMessagesFromCacheCronJob.name, - useClass: GmailFetchMessagesFromCacheCronJob, + provide: GmailMessagesImportCronJob.name, + useClass: GmailMessagesImportCronJob, }, { - provide: GmailPartialSyncCronJob.name, - useClass: GmailPartialSyncCronJob, + provide: GmailMessageListFetchCronJob.name, + useClass: GmailMessageListFetchCronJob, }, ], }) diff --git a/packages/twenty-server/src/modules/messaging/jobs/blocklist-reimport-messages.job.ts b/packages/twenty-server/src/modules/messaging/jobs/blocklist-reimport-messages.job.ts index 62fdf9a4e..44ffeb4d0 100644 --- a/packages/twenty-server/src/modules/messaging/jobs/blocklist-reimport-messages.job.ts +++ b/packages/twenty-server/src/modules/messaging/jobs/blocklist-reimport-messages.job.ts @@ -5,7 +5,7 @@ import { MessageQueueJob } from 'src/engine/integrations/message-queue/interface import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; -import { GmailFullSyncService } from 'src/modules/messaging/services/gmail-full-sync/gmail-full-sync.service'; +import { GmailFullMessageListFetchService } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.service'; export type BlocklistReimportMessagesJobData = { workspaceId: string; @@ -22,7 +22,7 @@ export class BlocklistReimportMessagesJob constructor( @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity) private readonly connectedAccountRepository: ConnectedAccountRepository, - private readonly gmailFullSyncService: GmailFullSyncService, + private readonly gmailFullSyncService: GmailFullMessageListFetchService, ) {} async handle(data: BlocklistReimportMessagesJobData): Promise { diff --git a/packages/twenty-server/src/modules/messaging/jobs/gmail-full-sync.job.ts b/packages/twenty-server/src/modules/messaging/jobs/gmail-full-message-list-fetch.job.ts similarity index 68% rename from packages/twenty-server/src/modules/messaging/jobs/gmail-full-sync.job.ts rename to packages/twenty-server/src/modules/messaging/jobs/gmail-full-message-list-fetch.job.ts index ad744c91d..14d364fd0 100644 --- a/packages/twenty-server/src/modules/messaging/jobs/gmail-full-sync.job.ts +++ b/packages/twenty-server/src/modules/messaging/jobs/gmail-full-message-list-fetch.job.ts @@ -3,23 +3,25 @@ import { Injectable, Logger } from '@nestjs/common'; import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service'; -import { GmailFullSyncService } from 'src/modules/messaging/services/gmail-full-sync/gmail-full-sync.service'; +import { GmailFullMessageListFetchService } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.service'; -export type GmailFullSyncJobData = { +export type GmailFullMessageListFetchJobData = { workspaceId: string; connectedAccountId: string; }; @Injectable() -export class GmailFullSyncJob implements MessageQueueJob { - private readonly logger = new Logger(GmailFullSyncJob.name); +export class GmailFullMessageListFetchJob + implements MessageQueueJob +{ + private readonly logger = new Logger(GmailFullMessageListFetchJob.name); constructor( private readonly googleAPIsRefreshAccessTokenService: GoogleAPIRefreshAccessTokenService, - private readonly gmailFullSyncService: GmailFullSyncService, + private readonly gmailFullSyncService: GmailFullMessageListFetchService, ) {} - async handle(data: GmailFullSyncJobData): Promise { + async handle(data: GmailFullMessageListFetchJobData): Promise { this.logger.log( `gmail full-sync for workspace ${data.workspaceId} and account ${data.connectedAccountId}`, ); diff --git a/packages/twenty-server/src/modules/messaging/jobs/gmail-message-list-fetch.job.ts b/packages/twenty-server/src/modules/messaging/jobs/gmail-message-list-fetch.job.ts new file mode 100644 index 000000000..ca638890e --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/jobs/gmail-message-list-fetch.job.ts @@ -0,0 +1,97 @@ +import { Injectable, Logger } from '@nestjs/common'; + +import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; + +import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service'; +import { GmailPartialMessageListFetchV2Service } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-v2.service'; +import { GetConnectedAccountAndMessageChannelService } from 'src/modules/messaging/services/get-connected-account-and-message-channel/get-connected-account-and-message-channel.service'; +import { MessageChannelSyncSubStatus } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity'; +import { GmailFullMessageListFetchService } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.service'; + +export type GmailMessageListFetchJobData = { + workspaceId: string; + connectedAccountId: string; +}; + +@Injectable() +export class GmailMessageListFetchJob + implements MessageQueueJob +{ + private readonly logger = new Logger(GmailMessageListFetchJob.name); + + constructor( + private readonly googleAPIsRefreshAccessTokenService: GoogleAPIRefreshAccessTokenService, + private readonly gmailFullSyncService: GmailFullMessageListFetchService, + private readonly gmailPartialMessageListFetchV2Service: GmailPartialMessageListFetchV2Service, + private readonly getConnectedAccountAndMessageChannelService: GetConnectedAccountAndMessageChannelService, + ) {} + + async handle(data: GmailMessageListFetchJobData): Promise { + const { workspaceId, connectedAccountId } = data; + + this.logger.log( + `Fetch gmail message list for workspace ${workspaceId} and account ${connectedAccountId}`, + ); + + try { + await this.googleAPIsRefreshAccessTokenService.refreshAndSaveAccessToken( + workspaceId, + connectedAccountId, + ); + } catch (e) { + this.logger.error( + `Error refreshing access token for connected account ${connectedAccountId} in workspace ${workspaceId}`, + e, + ); + + return; + } + + const { messageChannel, connectedAccount } = + await this.getConnectedAccountAndMessageChannelService.getConnectedAccountAndMessageChannelOrThrow( + workspaceId, + connectedAccountId, + ); + + switch (messageChannel.syncSubStatus) { + case MessageChannelSyncSubStatus.PARTIAL_MESSAGES_LIST_FETCH_PENDING: + try { + await this.gmailPartialMessageListFetchV2Service.processMessageListFetch( + messageChannel, + connectedAccount, + workspaceId, + ); + } catch (e) { + this.logger.error(e); + } + + return; + + case MessageChannelSyncSubStatus.FULL_MESSAGES_LIST_FETCH_PENDING: + try { + await this.gmailFullSyncService.fetchConnectedAccountThreads( + workspaceId, + connectedAccountId, + ); + } catch (e) { + this.logger.error(e); + } + + return; + + case MessageChannelSyncSubStatus.FAILED: + this.logger.error( + `Messaging import for workspace ${workspaceId} and account ${connectedAccountId} is in a failed state.`, + ); + + return; + + default: + this.logger.error( + `Messaging import for workspace ${workspaceId} and account ${connectedAccountId} is locked, import will be retried later.`, + ); + + return; + } + } +} diff --git a/packages/twenty-server/src/modules/messaging/jobs/gmail-partial-sync.job.ts b/packages/twenty-server/src/modules/messaging/jobs/gmail-partial-message-list-fetch.job.ts similarity index 67% rename from packages/twenty-server/src/modules/messaging/jobs/gmail-partial-sync.job.ts rename to packages/twenty-server/src/modules/messaging/jobs/gmail-partial-message-list-fetch.job.ts index abc39b569..9954492ba 100644 --- a/packages/twenty-server/src/modules/messaging/jobs/gmail-partial-sync.job.ts +++ b/packages/twenty-server/src/modules/messaging/jobs/gmail-partial-message-list-fetch.job.ts @@ -3,25 +3,25 @@ import { Injectable, Logger } from '@nestjs/common'; import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service'; -import { GmailPartialSyncV2Service } from 'src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.service'; +import { GmailPartialMessageListFetchService } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.service'; -export type GmailPartialSyncJobData = { +export type GmailPartialMessageListFetchJobData = { workspaceId: string; connectedAccountId: string; }; @Injectable() -export class GmailPartialSyncJob - implements MessageQueueJob +export class GmailPartialMessageListFetchJob + implements MessageQueueJob { - private readonly logger = new Logger(GmailPartialSyncJob.name); + private readonly logger = new Logger(GmailPartialMessageListFetchJob.name); constructor( private readonly googleAPIsRefreshAccessTokenService: GoogleAPIRefreshAccessTokenService, - private readonly gmailPartialSyncService: GmailPartialSyncV2Service, + private readonly gmailPartialSyncService: GmailPartialMessageListFetchService, ) {} - async handle(data: GmailPartialSyncJobData): Promise { + async handle(data: GmailPartialMessageListFetchJobData): Promise { this.logger.log( `gmail partial-sync for workspace ${data.workspaceId} and account ${data.connectedAccountId}`, ); diff --git a/packages/twenty-server/src/modules/messaging/jobs/messaging-job.module.ts b/packages/twenty-server/src/modules/messaging/jobs/messaging-job.module.ts index 41e613f26..820ebf476 100644 --- a/packages/twenty-server/src/modules/messaging/jobs/messaging-job.module.ts +++ b/packages/twenty-server/src/modules/messaging/jobs/messaging-job.module.ts @@ -1,5 +1,7 @@ import { Module } from '@nestjs/common'; +import { TypeOrmModule } from '@nestjs/typeorm'; +import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity'; import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module'; import { AutoCompaniesAndContactsCreationModule } from 'src/modules/connected-account/auto-companies-and-contacts-creation/auto-companies-and-contacts-creation.module'; import { GoogleAPIRefreshAccessTokenModule } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.module'; @@ -8,11 +10,13 @@ import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/s import { BlocklistItemDeleteMessagesJob } from 'src/modules/messaging/jobs/blocklist-item-delete-messages.job'; import { BlocklistReimportMessagesJob } from 'src/modules/messaging/jobs/blocklist-reimport-messages.job'; import { DeleteConnectedAccountAssociatedMessagingDataJob } from 'src/modules/messaging/jobs/delete-connected-account-associated-messaging-data.job'; -import { GmailFullSyncJob } from 'src/modules/messaging/jobs/gmail-full-sync.job'; -import { GmailPartialSyncJob } from 'src/modules/messaging/jobs/gmail-partial-sync.job'; +import { GmailFullMessageListFetchJob } from 'src/modules/messaging/jobs/gmail-full-message-list-fetch.job'; +import { GmailMessageListFetchJob } from 'src/modules/messaging/jobs/gmail-message-list-fetch.job'; +import { GmailPartialMessageListFetchJob } from 'src/modules/messaging/jobs/gmail-partial-message-list-fetch.job'; import { MessagingCreateCompanyAndContactAfterSyncJob } from 'src/modules/messaging/jobs/messaging-create-company-and-contact-after-sync.job'; -import { GmailFullSyncModule } from 'src/modules/messaging/services/gmail-full-sync/gmail-full-sync.module'; -import { GmailPartialSyncModule } from 'src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.module'; +import { GetConnectedAccountAndMessageChannelModule } from 'src/modules/messaging/services/get-connected-account-and-message-channel/get-connected-account-and-message-channel.module'; +import { GmailFullMessageListFetchModule } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.module'; +import { GmailPartialMessageListFetchModule } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.module'; import { ThreadCleanerModule } from 'src/modules/messaging/services/thread-cleaner/thread-cleaner.module'; import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel-message-association.workspace-entity'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity'; @@ -27,11 +31,13 @@ import { MessageParticipantWorkspaceEntity } from 'src/modules/messaging/standar MessageChannelMessageAssociationWorkspaceEntity, BlocklistWorkspaceEntity, ]), - GmailFullSyncModule, - GmailPartialSyncModule, + GmailFullMessageListFetchModule, + GmailPartialMessageListFetchModule, ThreadCleanerModule, GoogleAPIRefreshAccessTokenModule, AutoCompaniesAndContactsCreationModule, + TypeOrmModule.forFeature([FeatureFlagEntity], 'core'), + GetConnectedAccountAndMessageChannelModule, ], providers: [ { @@ -43,12 +49,16 @@ import { MessageParticipantWorkspaceEntity } from 'src/modules/messaging/standar useClass: BlocklistItemDeleteMessagesJob, }, { - provide: GmailFullSyncJob.name, - useClass: GmailFullSyncJob, + provide: GmailFullMessageListFetchJob.name, + useClass: GmailFullMessageListFetchJob, }, { - provide: GmailPartialSyncJob.name, - useClass: GmailPartialSyncJob, + provide: GmailPartialMessageListFetchJob.name, + useClass: GmailPartialMessageListFetchJob, + }, + { + provide: GmailMessageListFetchJob.name, + useClass: GmailMessageListFetchJob, }, { provide: DeleteConnectedAccountAssociatedMessagingDataJob.name, diff --git a/packages/twenty-server/src/modules/messaging/repositories/message-channel.repository.ts b/packages/twenty-server/src/modules/messaging/repositories/message-channel.repository.ts index 4bc249f4a..3eb6f9d76 100644 --- a/packages/twenty-server/src/modules/messaging/repositories/message-channel.repository.ts +++ b/packages/twenty-server/src/modules/messaging/repositories/message-channel.repository.ts @@ -6,6 +6,7 @@ import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/work import { MessageChannelWorkspaceEntity, MessageChannelSyncStatus, + MessageChannelSyncSubStatus, } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity'; import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record'; @@ -187,6 +188,23 @@ export class MessageChannelRepository { ); } + public async updateSyncSubStatus( + id: string, + syncSubStatus: MessageChannelSyncSubStatus, + workspaceId: string, + transactionManager?: EntityManager, + ): Promise { + const dataSourceSchema = + this.workspaceDataSourceService.getSchemaName(workspaceId); + + await this.workspaceDataSourceService.executeRawQuery( + `UPDATE ${dataSourceSchema}."messageChannel" SET "syncSubStatus" = $1 WHERE "id" = $2`, + [syncSubStatus, id], + workspaceId, + transactionManager, + ); + } + public async updateLastSyncCursorIfHigher( id: string, syncCursor: string, diff --git a/packages/twenty-server/src/modules/messaging/services/get-connected-account-and-message-channel/get-connected-account-and-message-channel.module.ts b/packages/twenty-server/src/modules/messaging/services/get-connected-account-and-message-channel/get-connected-account-and-message-channel.module.ts new file mode 100644 index 000000000..137fc7a4b --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/services/get-connected-account-and-message-channel/get-connected-account-and-message-channel.module.ts @@ -0,0 +1,18 @@ +import { Module } from '@nestjs/common'; + +import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module'; +import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; +import { GetConnectedAccountAndMessageChannelService } from 'src/modules/messaging/services/get-connected-account-and-message-channel/get-connected-account-and-message-channel.service'; +import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity'; + +@Module({ + imports: [ + ObjectMetadataRepositoryModule.forFeature([ + ConnectedAccountWorkspaceEntity, + MessageChannelWorkspaceEntity, + ]), + ], + providers: [GetConnectedAccountAndMessageChannelService], + exports: [GetConnectedAccountAndMessageChannelService], +}) +export class GetConnectedAccountAndMessageChannelModule {} diff --git a/packages/twenty-server/src/modules/messaging/services/get-connected-account-and-message-channel/get-connected-account-and-message-channel.service.ts b/packages/twenty-server/src/modules/messaging/services/get-connected-account-and-message-channel/get-connected-account-and-message-channel.service.ts new file mode 100644 index 000000000..441696ea1 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/services/get-connected-account-and-message-channel/get-connected-account-and-message-channel.service.ts @@ -0,0 +1,62 @@ +import { Injectable } from '@nestjs/common'; + +import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record'; +import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; +import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; +import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository'; +import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity'; + +@Injectable() +export class GetConnectedAccountAndMessageChannelService { + constructor( + @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity) + private readonly connectedAccountRepository: ConnectedAccountRepository, + @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) + private readonly messageChannelRepository: MessageChannelRepository, + ) {} + + public async getConnectedAccountAndMessageChannelOrThrow( + workspaceId: string, + connectedAccountId: string, + ): Promise<{ + messageChannel: ObjectRecord; + connectedAccount: ObjectRecord; + }> { + const connectedAccount = await this.connectedAccountRepository.getById( + connectedAccountId, + workspaceId, + ); + + if (!connectedAccount) { + throw new Error( + `Connected account ${connectedAccountId} not found in workspace ${workspaceId}`, + ); + } + + const refreshToken = connectedAccount.refreshToken; + + if (!refreshToken) { + throw new Error( + `No refresh token found for connected account ${connectedAccountId} in workspace ${workspaceId}`, + ); + } + + const messageChannel = + await this.messageChannelRepository.getFirstByConnectedAccountId( + connectedAccountId, + workspaceId, + ); + + if (!messageChannel) { + throw new Error( + `No message channel found for connected account ${connectedAccountId} in workspace ${workspaceId}`, + ); + } + + return { + messageChannel, + connectedAccount, + }; + } +} diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-full-sync/gmail-full-sync.module.ts b/packages/twenty-server/src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.module.ts similarity index 84% rename from packages/twenty-server/src/modules/messaging/services/gmail-full-sync/gmail-full-sync.module.ts rename to packages/twenty-server/src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.module.ts index 75c2de0b3..b782de73a 100644 --- a/packages/twenty-server/src/modules/messaging/services/gmail-full-sync/gmail-full-sync.module.ts +++ b/packages/twenty-server/src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.module.ts @@ -7,7 +7,7 @@ import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/works import { BlocklistWorkspaceEntity } from 'src/modules/connected-account/standard-objects/blocklist.workspace-entity'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { FetchMessagesByBatchesModule } from 'src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.module'; -import { GmailFullSyncService } from 'src/modules/messaging/services/gmail-full-sync/gmail-full-sync.service'; +import { GmailFullMessageListFetchService } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.service'; import { MessagingProvidersModule } from 'src/modules/messaging/services/providers/messaging-providers.module'; import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel-message-association.workspace-entity'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity'; @@ -25,7 +25,7 @@ import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-ob TypeOrmModule.forFeature([FeatureFlagEntity], 'core'), WorkspaceDataSourceModule, ], - providers: [GmailFullSyncService], - exports: [GmailFullSyncService], + providers: [GmailFullMessageListFetchService], + exports: [GmailFullMessageListFetchService], }) -export class GmailFullSyncModule {} +export class GmailFullMessageListFetchModule {} diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-full-sync/gmail-full-sync.service.ts b/packages/twenty-server/src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.service.ts similarity index 98% rename from packages/twenty-server/src/modules/messaging/services/gmail-full-sync/gmail-full-sync.service.ts rename to packages/twenty-server/src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.service.ts index 043c1a3c8..51dd0157b 100644 --- a/packages/twenty-server/src/modules/messaging/services/gmail-full-sync/gmail-full-sync.service.ts +++ b/packages/twenty-server/src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.service.ts @@ -29,8 +29,8 @@ import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/work import { gmailSearchFilterEmailAdresses } from 'src/modules/messaging/utils/gmail-search-filter.util'; @Injectable() -export class GmailFullSyncService { - private readonly logger = new Logger(GmailFullSyncService.name); +export class GmailFullMessageListFetchService { + private readonly logger = new Logger(GmailFullMessageListFetchService.name); constructor( private readonly gmailClientProvider: GmailClientProvider, diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-messages-import/gmail-messages-import-v2.service.ts b/packages/twenty-server/src/modules/messaging/services/gmail-messages-import/gmail-messages-import-v2.service.ts new file mode 100644 index 000000000..5b86f3093 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/services/gmail-messages-import/gmail-messages-import-v2.service.ts @@ -0,0 +1,146 @@ +import { Injectable, Logger } from '@nestjs/common'; + +import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; +import { FetchMessagesByBatchesService } from 'src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.service'; +import { + MessageChannelWorkspaceEntity, + MessageChannelSyncSubStatus, +} from 'src/modules/messaging/standard-objects/message-channel.workspace-entity'; +import { createQueriesFromMessageIds } from 'src/modules/messaging/utils/create-queries-from-message-ids.util'; +import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator'; +import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum'; +import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service'; +import { GMAIL_USERS_MESSAGES_GET_BATCH_SIZE } from 'src/modules/messaging/constants/gmail-users-messages-get-batch-size.constant'; +import { GMAIL_ONGOING_SYNC_TIMEOUT } from 'src/modules/messaging/constants/gmail-ongoing-sync-timeout.constant'; +import { GmailMessagesImportService } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import.service'; +import { SetMessageChannelSyncStatusService } from 'src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.service'; +import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record'; +import { SaveMessagesAndEnqueueContactCreationService } from 'src/modules/messaging/services/gmail-messages-import/save-messages-and-enqueue-contact-creation.service'; + +@Injectable() +export class GmailMessagesImportV2Service { + private readonly logger = new Logger(GmailMessagesImportService.name); + + constructor( + private readonly fetchMessagesByBatchesService: FetchMessagesByBatchesService, + @InjectCacheStorage(CacheStorageNamespace.Messaging) + private readonly cacheStorage: CacheStorageService, + private readonly setMessageChannelSyncStatusService: SetMessageChannelSyncStatusService, + private readonly saveMessagesAndEnqueueContactCreationService: SaveMessagesAndEnqueueContactCreationService, + ) {} + + async processMessageBatchImport( + messageChannel: ObjectRecord, + connectedAccount: ObjectRecord, + workspaceId: string, + ) { + if (messageChannel.syncSubStatus === MessageChannelSyncSubStatus.FAILED) { + throw new Error( + `Connected account ${connectedAccount.id} in workspace ${workspaceId} is in a failed state. Skipping...`, + ); + } + + if ( + messageChannel.syncSubStatus !== + MessageChannelSyncSubStatus.MESSAGES_IMPORT_PENDING + ) { + throw new Error( + `Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} is not pending.`, + ); + } + + await this.setMessageChannelSyncStatusService.setMessagesImportOnGoingStatus( + messageChannel.id, + workspaceId, + ); + + this.logger.log( + `Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} starting...`, + ); + + const messageIdsToFetch = + (await this.cacheStorage.setPop( + `messages-to-import:${workspaceId}:gmail:${messageChannel.id}`, + GMAIL_USERS_MESSAGES_GET_BATCH_SIZE, + )) ?? []; + + if (!messageIdsToFetch?.length) { + await this.setMessageChannelSyncStatusService.setCompletedStatus( + messageChannel.id, + workspaceId, + ); + + this.logger.log( + `Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} done with nothing to import or delete.`, + ); + + return; + } + + const messageQueries = createQueriesFromMessageIds(messageIdsToFetch); + + try { + const messagesToSave = + await this.fetchMessagesByBatchesService.fetchAllMessages( + messageQueries, + connectedAccount.accessToken, + workspaceId, + connectedAccount.id, + ); + + if (!messagesToSave.length) { + await this.setMessageChannelSyncStatusService.setCompletedStatus( + messageChannel.id, + workspaceId, + ); + + return []; + } + + await this.saveMessagesAndEnqueueContactCreationService.saveMessagesAndEnqueueContactCreationJob( + messagesToSave, + messageChannel, + connectedAccount, + workspaceId, + ); + + if (messageIdsToFetch.length < GMAIL_USERS_MESSAGES_GET_BATCH_SIZE) { + await this.setMessageChannelSyncStatusService.setCompletedStatus( + messageChannel.id, + workspaceId, + ); + + this.logger.log( + `Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} done with no more messages to import.`, + ); + } else { + await this.setMessageChannelSyncStatusService.setMessagesImportPendingStatus( + messageChannel.id, + workspaceId, + ); + + this.logger.log( + `Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} done with more messages to import.`, + ); + } + } catch (error) { + await this.cacheStorage.setAdd( + `messages-to-import:${workspaceId}:gmail:${messageChannel.id}`, + messageIdsToFetch, + ); + + await this.setMessageChannelSyncStatusService.setFailedUnkownStatus( + messageChannel.id, + workspaceId, + ); + + this.logger.error( + `Error fetching messages for ${connectedAccount.id} in workspace ${workspaceId}: locking for ${GMAIL_ONGOING_SYNC_TIMEOUT}ms...`, + ); + + throw new Error( + `Error fetching messages for ${connectedAccount.id} in workspace ${workspaceId}: ${error.message}`, + ); + } + } +} diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.module.ts b/packages/twenty-server/src/modules/messaging/services/gmail-messages-import/gmail-messages-import.module.ts similarity index 57% rename from packages/twenty-server/src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.module.ts rename to packages/twenty-server/src/modules/messaging/services/gmail-messages-import/gmail-messages-import.module.ts index a287550a9..6ceabf3e2 100644 --- a/packages/twenty-server/src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.module.ts +++ b/packages/twenty-server/src/modules/messaging/services/gmail-messages-import/gmail-messages-import.module.ts @@ -4,9 +4,12 @@ import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repos import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { FetchMessagesByBatchesModule } from 'src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.module'; -import { GmailFetchMessageContentFromCacheService } from 'src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.service'; +import { GmailMessagesImportV2Service } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import-v2.service'; +import { GmailMessagesImportService } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import.service'; +import { SaveMessagesAndEnqueueContactCreationService } from 'src/modules/messaging/services/gmail-messages-import/save-messages-and-enqueue-contact-creation.service'; import { MessageParticipantModule } from 'src/modules/messaging/services/message-participant/message-participant.module'; import { MessageModule } from 'src/modules/messaging/services/message/message.module'; +import { SetMessageChannelSyncStatusModule } from 'src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.module'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity'; @Module({ @@ -20,8 +23,13 @@ import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-ob WorkspaceDataSourceModule, MessageModule, MessageParticipantModule, + SetMessageChannelSyncStatusModule, ], - providers: [GmailFetchMessageContentFromCacheService], - exports: [GmailFetchMessageContentFromCacheService], + providers: [ + GmailMessagesImportService, + GmailMessagesImportV2Service, + SaveMessagesAndEnqueueContactCreationService, + ], + exports: [GmailMessagesImportService, GmailMessagesImportV2Service], }) -export class GmailFetchMessageContentFromCacheModule {} +export class GmailMessagesImportModule {} diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.service.ts b/packages/twenty-server/src/modules/messaging/services/gmail-messages-import/gmail-messages-import.service.ts similarity index 96% rename from packages/twenty-server/src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.service.ts rename to packages/twenty-server/src/modules/messaging/services/gmail-messages-import/gmail-messages-import.service.ts index 8d4ef6896..c59721c0f 100644 --- a/packages/twenty-server/src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.service.ts +++ b/packages/twenty-server/src/modules/messaging/services/gmail-messages-import/gmail-messages-import.service.ts @@ -18,9 +18,9 @@ import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache import { GMAIL_USERS_MESSAGES_GET_BATCH_SIZE } from 'src/modules/messaging/constants/gmail-users-messages-get-batch-size.constant'; import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; import { - GmailFullSyncJobData, - GmailFullSyncJob, -} from 'src/modules/messaging/jobs/gmail-full-sync.job'; + GmailFullMessageListFetchJobData, + GmailFullMessageListFetchJob, +} from 'src/modules/messaging/jobs/gmail-full-message-list-fetch.job'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; import { GMAIL_ONGOING_SYNC_TIMEOUT } from 'src/modules/messaging/constants/gmail-ongoing-sync-timeout.constant'; @@ -33,10 +33,8 @@ import { } from 'src/modules/connected-account/auto-companies-and-contacts-creation/jobs/create-company-and-contact.job'; @Injectable() -export class GmailFetchMessageContentFromCacheService { - private readonly logger = new Logger( - GmailFetchMessageContentFromCacheService.name, - ); +export class GmailMessagesImportService { + private readonly logger = new Logger(GmailMessagesImportService.name); constructor( private readonly fetchMessagesByBatchesService: FetchMessagesByBatchesService, @@ -296,8 +294,8 @@ export class GmailFetchMessageContentFromCacheService { workspaceId: string, connectedAccountId: string, ) { - await this.messageQueueService.add( - GmailFullSyncJob.name, + await this.messageQueueService.add( + GmailFullMessageListFetchJob.name, { workspaceId, connectedAccountId }, ); } diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-messages-import/save-messages-and-enqueue-contact-creation.service.ts b/packages/twenty-server/src/modules/messaging/services/gmail-messages-import/save-messages-and-enqueue-contact-creation.service.ts new file mode 100644 index 000000000..fc99e3de2 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/services/gmail-messages-import/save-messages-and-enqueue-contact-creation.service.ts @@ -0,0 +1,96 @@ +import { Injectable, Inject } from '@nestjs/common'; + +import { EntityManager } from 'typeorm'; + +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; +import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record'; +import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; +import { MessageParticipantService } from 'src/modules/messaging/services/message-participant/message-participant.service'; +import { MessageService } from 'src/modules/messaging/services/message/message.service'; +import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity'; +import { + GmailMessage, + ParticipantWithMessageId, +} from 'src/modules/messaging/types/gmail-message'; +import { + CreateCompanyAndContactJobData, + CreateCompanyAndContactJob, +} from 'src/modules/connected-account/auto-companies-and-contacts-creation/jobs/create-company-and-contact.job'; + +@Injectable() +export class SaveMessagesAndEnqueueContactCreationService { + constructor( + private readonly workspaceDataSourceService: WorkspaceDataSourceService, + @Inject(MessageQueue.messagingQueue) + private readonly messageQueueService: MessageQueueService, + private readonly messageService: MessageService, + private readonly messageParticipantService: MessageParticipantService, + ) {} + + async saveMessagesAndEnqueueContactCreationJob( + messagesToSave: GmailMessage[], + messageChannel: ObjectRecord, + connectedAccount: ObjectRecord, + workspaceId: string, + ) { + const workspaceDataSource = + await this.workspaceDataSourceService.connectToWorkspaceDataSource( + workspaceId, + ); + + const participantsWithMessageId = await workspaceDataSource?.transaction( + async (transactionManager: EntityManager) => { + const messageExternalIdsAndIdsMap = + await this.messageService.saveMessagesWithinTransaction( + messagesToSave, + connectedAccount, + messageChannel.id, + workspaceId, + transactionManager, + ); + + const participantsWithMessageId: (ParticipantWithMessageId & { + shouldCreateContact: boolean; + })[] = messagesToSave.flatMap((message) => { + const messageId = messageExternalIdsAndIdsMap.get(message.externalId); + + return messageId + ? message.participants.map((participant) => ({ + ...participant, + messageId, + shouldCreateContact: + messageChannel.isContactAutoCreationEnabled && + message.participants.find((p) => p.role === 'from') + ?.handle === connectedAccount.handle, + })) + : []; + }); + + await this.messageParticipantService.saveMessageParticipants( + participantsWithMessageId, + workspaceId, + transactionManager, + ); + + return participantsWithMessageId; + }, + ); + + if (messageChannel.isContactAutoCreationEnabled) { + const contactsToCreate = participantsWithMessageId.filter( + (participant) => participant.shouldCreateContact, + ); + + await this.messageQueueService.add( + CreateCompanyAndContactJob.name, + { + workspaceId, + connectedAccountHandle: connectedAccount.handle, + contactsToCreate, + }, + ); + } + } +} diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-get-history.service.ts b/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-get-history.service.ts new file mode 100644 index 000000000..e2953b0b8 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-get-history.service.ts @@ -0,0 +1,104 @@ +import { Injectable } from '@nestjs/common'; + +import { gmail_v1 } from 'googleapis'; + +import { GMAIL_USERS_HISTORY_MAX_RESULT } from 'src/modules/messaging/constants/gmail-users-history-max-result.constant'; +import { GmailError } from 'src/modules/messaging/types/gmail-error'; + +@Injectable() +export class GmailGetHistoryService { + constructor() {} + + public async getHistory( + gmailClient: gmail_v1.Gmail, + lastSyncHistoryId: string, + ): Promise<{ + history: gmail_v1.Schema$History[]; + historyId?: string | null; + error?: GmailError; + }> { + const fullHistory: gmail_v1.Schema$History[] = []; + let pageToken: string | undefined; + let hasMoreMessages = true; + let nextHistoryId: string | undefined; + + while (hasMoreMessages) { + try { + const response = await gmailClient.users.history.list({ + userId: 'me', + maxResults: GMAIL_USERS_HISTORY_MAX_RESULT, + pageToken, + startHistoryId: lastSyncHistoryId, + historyTypes: ['messageAdded', 'messageDeleted'], + }); + + nextHistoryId = response?.data?.historyId ?? undefined; + + if (response?.data?.history) { + fullHistory.push(...response.data.history); + } + + pageToken = response?.data?.nextPageToken ?? undefined; + hasMoreMessages = !!pageToken; + } catch (error) { + const errorData = error?.response?.data?.error; + + if (errorData) { + return { + history: [], + error: errorData, + historyId: lastSyncHistoryId, + }; + } + + throw error; + } + } + + return { history: fullHistory, historyId: nextHistoryId }; + } + + public async getMessageIdsFromHistory( + history: gmail_v1.Schema$History[], + ): Promise<{ + messagesAdded: string[]; + messagesDeleted: string[]; + }> { + const { messagesAdded, messagesDeleted } = history.reduce( + ( + acc: { + messagesAdded: string[]; + messagesDeleted: string[]; + }, + history, + ) => { + const messagesAdded = history.messagesAdded?.map( + (messageAdded) => messageAdded.message?.id || '', + ); + + const messagesDeleted = history.messagesDeleted?.map( + (messageDeleted) => messageDeleted.message?.id || '', + ); + + if (messagesAdded) acc.messagesAdded.push(...messagesAdded); + if (messagesDeleted) acc.messagesDeleted.push(...messagesDeleted); + + return acc; + }, + { messagesAdded: [], messagesDeleted: [] }, + ); + + const uniqueMessagesAdded = messagesAdded.filter( + (messageId) => !messagesDeleted.includes(messageId), + ); + + const uniqueMessagesDeleted = messagesDeleted.filter( + (messageId) => !messagesAdded.includes(messageId), + ); + + return { + messagesAdded: uniqueMessagesAdded, + messagesDeleted: uniqueMessagesDeleted, + }; + } +} diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-error-handling.service.ts b/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-error-handling.service.ts new file mode 100644 index 000000000..638ffffdb --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-error-handling.service.ts @@ -0,0 +1,122 @@ +import { Injectable, Logger } from '@nestjs/common'; + +import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record'; +import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; +import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; +import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository'; +import { + MessageChannelSyncStatus, + MessageChannelSyncSubStatus, + MessageChannelWorkspaceEntity, +} from 'src/modules/messaging/standard-objects/message-channel.workspace-entity'; +import { GmailError } from 'src/modules/messaging/types/gmail-error'; + +@Injectable() +export class GmailPartialMessageListFetchErrorHandlingService { + private readonly logger = new Logger( + GmailPartialMessageListFetchErrorHandlingService.name, + ); + + constructor( + @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity) + private readonly connectedAccountRepository: ConnectedAccountRepository, + @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) + private readonly messageChannelRepository: MessageChannelRepository, + ) {} + + public async handleGmailError( + error: GmailError | undefined, + messageChannel: ObjectRecord, + connectedAccountId: string, + workspaceId: string, + ): Promise { + switch (error?.code) { + case 404: + this.logger.log( + `404: Invalid lastSyncHistoryId for workspace ${workspaceId} and account ${connectedAccountId}, falling back to full sync.`, + ); + await this.messageChannelRepository.resetSyncCursor( + messageChannel.id, + workspaceId, + ); + await this.messageChannelRepository.updateSyncSubStatus( + messageChannel.id, + MessageChannelSyncSubStatus.FULL_MESSAGES_LIST_FETCH_PENDING, + workspaceId, + ); + break; + + case 429: + this.logger.log( + `429: rate limit reached for workspace ${workspaceId} and account ${connectedAccountId}: ${error.message}, import will be retried later.`, + ); + await this.handleRateLimitExceeded(messageChannel, workspaceId); + break; + + case 403: + if ( + error?.errors?.[0]?.reason === 'rateLimitExceeded' || + error?.errors?.[0]?.reason === 'userRateLimitExceeded' + ) { + this.logger.log( + `403:${ + error?.errors?.[0]?.reason === 'userRateLimitExceeded' && ' user' + } rate limit exceeded for workspace ${workspaceId} and account ${connectedAccountId}: ${ + error.message + }, import will be retried later.`, + ); + this.handleRateLimitExceeded(messageChannel, workspaceId); + } else { + await this.handleInsufficientPermissions( + error, + messageChannel, + workspaceId, + ); + } + break; + + case 401: + this.handleInsufficientPermissions(error, messageChannel, workspaceId); + break; + + default: + break; + } + } + + public async handleRateLimitExceeded( + messageChannel: MessageChannelWorkspaceEntity, + workspaceId: string, + ): Promise { + await this.messageChannelRepository.updateSyncSubStatus( + messageChannel.id, + MessageChannelSyncSubStatus.PARTIAL_MESSAGES_LIST_FETCH_PENDING, + workspaceId, + ); + } + + public async handleInsufficientPermissions( + error: GmailError, + messageChannel: MessageChannelWorkspaceEntity, + workspaceId: string, + ): Promise { + this.logger.error( + `{error?.code}: ${error.message} for workspace ${workspaceId} and account ${messageChannel.connectedAccount.id}`, + ); + await this.messageChannelRepository.updateSyncStatus( + messageChannel.id, + MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS, + workspaceId, + ); + await this.messageChannelRepository.updateSyncSubStatus( + messageChannel.id, + MessageChannelSyncSubStatus.PARTIAL_MESSAGES_LIST_FETCH_PENDING, + workspaceId, + ); + await this.connectedAccountRepository.updateAuthFailedAt( + messageChannel.connectedAccount.id, + workspaceId, + ); + } +} diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-v2.service.ts b/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-v2.service.ts new file mode 100644 index 000000000..b3e85fb2d --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-v2.service.ts @@ -0,0 +1,152 @@ +import { Injectable, Logger } from '@nestjs/common'; + +import { gmail_v1 } from 'googleapis'; + +import { GmailClientProvider } from 'src/modules/messaging/services/providers/gmail/gmail-client.provider'; +import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository'; +import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; +import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity'; +import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service'; +import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator'; +import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum'; +import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel-message-association.workspace-entity'; +import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/repositories/message-channel-message-association.repository'; +import { GmailPartialMessageListFetchErrorHandlingService } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-error-handling.service'; +import { GmailGetHistoryService } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-get-history.service'; +import { SetMessageChannelSyncStatusService } from 'src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.service'; +import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record'; + +@Injectable() +export class GmailPartialMessageListFetchV2Service { + private readonly logger = new Logger( + GmailPartialMessageListFetchV2Service.name, + ); + + constructor( + private readonly gmailClientProvider: GmailClientProvider, + @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) + private readonly messageChannelRepository: MessageChannelRepository, + @InjectCacheStorage(CacheStorageNamespace.Messaging) + private readonly cacheStorage: CacheStorageService, + @InjectObjectMetadataRepository( + MessageChannelMessageAssociationWorkspaceEntity, + ) + private readonly messageChannelMessageAssociationRepository: MessageChannelMessageAssociationRepository, + private readonly gmailPartialMessageListFetchErrorHandlingService: GmailPartialMessageListFetchErrorHandlingService, + private readonly gmailGetHistoryService: GmailGetHistoryService, + private readonly setMessageChannelSyncStatusService: SetMessageChannelSyncStatusService, + ) {} + + public async processMessageListFetch( + messageChannel: ObjectRecord, + connectedAccount: ObjectRecord, + workspaceId: string, + ): Promise { + this.logger.log( + `Fetching partial message list for workspace ${workspaceId} and account ${connectedAccount.id}`, + ); + + await this.setMessageChannelSyncStatusService.setMessageListFetchOnGoingStatus( + messageChannel.id, + workspaceId, + ); + + const lastSyncHistoryId = messageChannel.syncCursor; + + if (!lastSyncHistoryId) { + this.logger.log( + `No lastSyncHistoryId for workspace ${workspaceId} and account ${connectedAccount.id}, falling back to full sync.`, + ); + + await this.setMessageChannelSyncStatusService.setFullMessageListFetchPendingStatus( + messageChannel.id, + workspaceId, + ); + + return; + } + + const gmailClient: gmail_v1.Gmail = + await this.gmailClientProvider.getGmailClient( + connectedAccount.refreshToken, + ); + + const { history, historyId, error } = + await this.gmailGetHistoryService.getHistory( + gmailClient, + lastSyncHistoryId, + ); + + if (error) { + await this.gmailPartialMessageListFetchErrorHandlingService.handleGmailError( + error, + messageChannel, + workspaceId, + connectedAccount.id, + ); + + return; + } + + if (!historyId) { + throw new Error( + `No historyId found for ${connectedAccount.id} in workspace ${workspaceId} in gmail history response.`, + ); + } + + if (historyId === lastSyncHistoryId || !history?.length) { + this.logger.log( + `Partial message list import done with history ${historyId} and nothing to update for workspace ${workspaceId} and account ${connectedAccount.id}`, + ); + + await this.setMessageChannelSyncStatusService.setCompletedStatus( + messageChannel.id, + workspaceId, + ); + + return; + } + + const { messagesAdded, messagesDeleted } = + await this.gmailGetHistoryService.getMessageIdsFromHistory(history); + + await this.cacheStorage.setAdd( + `messages-to-import:${workspaceId}:gmail:${messageChannel.id}`, + messagesAdded, + ); + + this.logger.log( + `Added ${messagesAdded.length} messages to import for workspace ${workspaceId} and account ${connectedAccount.id}`, + ); + + await this.messageChannelMessageAssociationRepository.deleteByMessageExternalIdsAndMessageChannelId( + messagesDeleted, + messageChannel.id, + workspaceId, + ); + + this.logger.log( + `Deleted ${messagesDeleted.length} messages for workspace ${workspaceId} and account ${connectedAccount.id}`, + ); + + await this.messageChannelRepository.updateLastSyncCursorIfHigher( + messageChannel.id, + historyId, + workspaceId, + ); + + this.logger.log( + `Updated lastSyncCursor to ${historyId} for workspace ${workspaceId} and account ${connectedAccount.id}`, + ); + + this.logger.log( + `Partial message list import done with history ${historyId} for workspace ${workspaceId} and account ${connectedAccount.id}`, + ); + + await this.setMessageChannelSyncStatusService.setMessagesImportPendingStatus( + messageChannel.id, + workspaceId, + ); + } +} diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.module.ts b/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.module.ts similarity index 55% rename from packages/twenty-server/src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.module.ts rename to packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.module.ts index dd7663e17..ce288a5b7 100644 --- a/packages/twenty-server/src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.module.ts +++ b/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.module.ts @@ -7,9 +7,13 @@ import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/works import { BlocklistWorkspaceEntity } from 'src/modules/connected-account/standard-objects/blocklist.workspace-entity'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { FetchMessagesByBatchesModule } from 'src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.module'; -import { GmailPartialSyncV2Service as GmailPartialSyncService } from 'src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.service'; +import { GmailGetHistoryService } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-get-history.service'; +import { GmailPartialMessageListFetchErrorHandlingService } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-error-handling.service'; +import { GmailPartialMessageListFetchV2Service } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-v2.service'; +import { GmailPartialMessageListFetchService } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.service'; import { MessageModule } from 'src/modules/messaging/services/message/message.module'; import { MessagingProvidersModule } from 'src/modules/messaging/services/providers/messaging-providers.module'; +import { SetMessageChannelSyncStatusModule } from 'src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.module'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity'; @Module({ @@ -24,8 +28,17 @@ import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-ob MessageModule, TypeOrmModule.forFeature([FeatureFlagEntity], 'core'), WorkspaceDataSourceModule, + SetMessageChannelSyncStatusModule, + ], + providers: [ + GmailPartialMessageListFetchService, + GmailPartialMessageListFetchV2Service, + GmailPartialMessageListFetchErrorHandlingService, + GmailGetHistoryService, + ], + exports: [ + GmailPartialMessageListFetchService, + GmailPartialMessageListFetchV2Service, ], - providers: [GmailPartialSyncService], - exports: [GmailPartialSyncService], }) -export class GmailPartialSyncModule {} +export class GmailPartialMessageListFetchModule {} diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.service.ts b/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.service.ts similarity index 96% rename from packages/twenty-server/src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.service.ts rename to packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.service.ts index 4631b384c..7ffac02d7 100644 --- a/packages/twenty-server/src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.service.ts +++ b/packages/twenty-server/src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.service.ts @@ -21,15 +21,17 @@ import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decora import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum'; import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; import { - GmailFullSyncJob, - GmailFullSyncJobData, -} from 'src/modules/messaging/jobs/gmail-full-sync.job'; + GmailFullMessageListFetchJob, + GmailFullMessageListFetchJobData, +} from 'src/modules/messaging/jobs/gmail-full-message-list-fetch.job'; import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel-message-association.workspace-entity'; import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/repositories/message-channel-message-association.repository'; @Injectable() -export class GmailPartialSyncV2Service { - private readonly logger = new Logger(GmailPartialSyncV2Service.name); +export class GmailPartialMessageListFetchService { + private readonly logger = new Logger( + GmailPartialMessageListFetchService.name, + ); constructor( private readonly gmailClientProvider: GmailClientProvider, @@ -338,8 +340,8 @@ export class GmailPartialSyncV2Service { workspaceId: string, connectedAccountId: string, ) { - await this.messageQueueService.add( - GmailFullSyncJob.name, + await this.messageQueueService.add( + GmailFullMessageListFetchJob.name, { workspaceId, connectedAccountId }, ); } diff --git a/packages/twenty-server/src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.module.ts b/packages/twenty-server/src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.module.ts new file mode 100644 index 000000000..6d20cbac4 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.module.ts @@ -0,0 +1,14 @@ +import { Module } from '@nestjs/common'; + +import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module'; +import { SetMessageChannelSyncStatusService } from 'src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.service'; +import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity'; + +@Module({ + imports: [ + ObjectMetadataRepositoryModule.forFeature([MessageChannelWorkspaceEntity]), + ], + providers: [SetMessageChannelSyncStatusService], + exports: [SetMessageChannelSyncStatusService], +}) +export class SetMessageChannelSyncStatusModule {} diff --git a/packages/twenty-server/src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.service.ts b/packages/twenty-server/src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.service.ts new file mode 100644 index 000000000..4c24f4349 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.service.ts @@ -0,0 +1,101 @@ +import { Injectable } from '@nestjs/common'; + +import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository'; +import { + MessageChannelWorkspaceEntity, + MessageChannelSyncSubStatus, + MessageChannelSyncStatus, +} from 'src/modules/messaging/standard-objects/message-channel.workspace-entity'; + +@Injectable() +export class SetMessageChannelSyncStatusService { + constructor( + @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) + private readonly messageChannelRepository: MessageChannelRepository, + ) {} + + public async setMessageListFetchOnGoingStatus( + messageChannelId: string, + workspaceId: string, + ) { + await this.messageChannelRepository.updateSyncSubStatus( + messageChannelId, + MessageChannelSyncSubStatus.MESSAGES_LIST_FETCH_ONGOING, + workspaceId, + ); + + await this.messageChannelRepository.updateSyncStatus( + messageChannelId, + MessageChannelSyncStatus.ONGOING, + workspaceId, + ); + } + + public async setFullMessageListFetchPendingStatus( + messageChannelId: string, + workspaceId: string, + ) { + await this.messageChannelRepository.updateSyncSubStatus( + messageChannelId, + MessageChannelSyncSubStatus.FULL_MESSAGES_LIST_FETCH_PENDING, + workspaceId, + ); + } + + public async setCompletedStatus( + messageChannelId: string, + workspaceId: string, + ) { + await this.messageChannelRepository.updateSyncSubStatus( + messageChannelId, + MessageChannelSyncSubStatus.PARTIAL_MESSAGES_LIST_FETCH_PENDING, + workspaceId, + ); + + await this.messageChannelRepository.updateSyncStatus( + messageChannelId, + MessageChannelSyncStatus.COMPLETED, + workspaceId, + ); + } + + public async setMessagesImportPendingStatus( + messageChannelId: string, + workspaceId: string, + ) { + await this.messageChannelRepository.updateSyncSubStatus( + messageChannelId, + MessageChannelSyncSubStatus.MESSAGES_IMPORT_PENDING, + workspaceId, + ); + } + + public async setMessagesImportOnGoingStatus( + messageChannelId: string, + workspaceId: string, + ) { + await this.messageChannelRepository.updateSyncSubStatus( + messageChannelId, + MessageChannelSyncSubStatus.MESSAGES_IMPORT_ONGOING, + workspaceId, + ); + } + + public async setFailedUnkownStatus( + messageChannelId: string, + workspaceId: string, + ) { + await this.messageChannelRepository.updateSyncSubStatus( + messageChannelId, + MessageChannelSyncSubStatus.FAILED, + workspaceId, + ); + + await this.messageChannelRepository.updateSyncStatus( + messageChannelId, + MessageChannelSyncStatus.FAILED_UNKNOWN, + workspaceId, + ); + } +} diff --git a/packages/twenty-server/src/modules/messaging/standard-objects/message-channel.workspace-entity.ts b/packages/twenty-server/src/modules/messaging/standard-objects/message-channel.workspace-entity.ts index 7895e8514..e06b40f2f 100644 --- a/packages/twenty-server/src/modules/messaging/standard-objects/message-channel.workspace-entity.ts +++ b/packages/twenty-server/src/modules/messaging/standard-objects/message-channel.workspace-entity.ts @@ -37,6 +37,7 @@ export enum MessageChannelSyncSubStatus { MESSAGES_LIST_FETCH_ONGOING = 'MESSAGES_LIST_FETCH_ONGOING', MESSAGES_IMPORT_PENDING = 'MESSAGES_IMPORT_PENDING', MESSAGES_IMPORT_ONGOING = 'MESSAGES_IMPORT_ONGOING', + FAILED = 'FAILED', } export enum MessageChannelVisibility { @@ -262,6 +263,12 @@ export class MessageChannelWorkspaceEntity extends BaseWorkspaceEntity { position: 4, color: 'orange', }, + { + value: MessageChannelSyncSubStatus.FAILED, + label: 'Failed', + position: 5, + color: 'red', + }, ], defaultValue: `'${MessageChannelSyncSubStatus.FULL_MESSAGES_LIST_FETCH_PENDING}'`, })