diff --git a/packages/twenty-server/src/command/command.module.ts b/packages/twenty-server/src/command/command.module.ts index 6926124aa..406a30230 100644 --- a/packages/twenty-server/src/command/command.module.ts +++ b/packages/twenty-server/src/command/command.module.ts @@ -1,7 +1,7 @@ import { Module } from '@nestjs/common'; import { DatabaseCommandModule } from 'src/database/commands/database-command.module'; -import { FetchWorkspaceMessagesCommandsModule } from 'src/modules/messaging/commands/fetch-workspace-messages-commands.module'; +import { MessagingCommandModule } from 'src/modules/messaging/commands/messaging-command.module'; import { WorkspaceHealthCommandModule } from 'src/engine/workspace-manager/workspace-health/commands/workspace-health-command.module'; import { WorkspaceCleanerModule } from 'src/engine/workspace-manager/workspace-cleaner/workspace-cleaner.module'; import { WorkspaceCalendarSyncCommandsModule } from 'src/modules/calendar/commands/workspace-calendar-sync-commands.module'; @@ -14,7 +14,7 @@ import { WorkspaceSyncMetadataCommandsModule } from 'src/engine/workspace-manage AppModule, WorkspaceSyncMetadataCommandsModule, DatabaseCommandModule, - FetchWorkspaceMessagesCommandsModule, + MessagingCommandModule, WorkspaceCalendarSyncCommandsModule, WorkspaceCleanerModule, WorkspaceHealthCommandModule, diff --git a/packages/twenty-server/src/database/typeorm-seeds/core/feature-flags.ts b/packages/twenty-server/src/database/typeorm-seeds/core/feature-flags.ts index f3a27c4b2..a8b6a89be 100644 --- a/packages/twenty-server/src/database/typeorm-seeds/core/feature-flags.ts +++ b/packages/twenty-server/src/database/typeorm-seeds/core/feature-flags.ts @@ -40,11 +40,6 @@ export const seedFeatureFlags = async ( workspaceId: workspaceId, value: true, }, - { - key: FeatureFlagKeys.IsFullSyncV2Enabled, - workspaceId: workspaceId, - value: true, - }, { key: FeatureFlagKeys.IsMultiSelectEnabled, workspaceId: workspaceId, diff --git a/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts b/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts index 78f7d2875..7ce15f328 100644 --- a/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts +++ b/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts @@ -9,10 +9,6 @@ import { TypeORMService } from 'src/database/typeorm/typeorm.service'; import { SaveOrUpdateConnectedAccountInput } from 'src/engine/core-modules/auth/dto/save-connected-account'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; -import { - GmailFullSyncJob, - GmailFullSyncJobData, -} from 'src/modules/messaging/jobs/gmail-full-sync.job'; import { GoogleCalendarFullSyncJob, GoogleCalendarFullSyncJobData, @@ -187,32 +183,13 @@ export class GoogleAPIsService { workspaceId: string, connectedAccountId: string, ) { - const isFullSyncV2Enabled = await this.featureFlagRepository.findOneBy({ - workspaceId, - key: FeatureFlagKeys.IsFullSyncV2Enabled, - value: true, - }); - - if (isFullSyncV2Enabled) { - await this.messageQueueService.add( - GmailFullSyncV2Job.name, - { - workspaceId, - connectedAccountId, - }, - ); - } else { - await this.messageQueueService.add( - GmailFullSyncJob.name, - { - workspaceId, - connectedAccountId, - }, - { - retryLimit: 2, - }, - ); - } + await this.messageQueueService.add( + GmailFullSyncV2Job.name, + { + workspaceId, + connectedAccountId, + }, + ); } async enqueueGoogleCalendarFullSyncJob( diff --git a/packages/twenty-server/src/engine/core-modules/feature-flag/feature-flag.entity.ts b/packages/twenty-server/src/engine/core-modules/feature-flag/feature-flag.entity.ts index 891a81dce..8da15d8fc 100644 --- a/packages/twenty-server/src/engine/core-modules/feature-flag/feature-flag.entity.ts +++ b/packages/twenty-server/src/engine/core-modules/feature-flag/feature-flag.entity.ts @@ -19,7 +19,6 @@ export enum FeatureFlagKeys { IsEventObjectEnabled = 'IS_EVENT_OBJECT_ENABLED', IsAirtableIntegrationEnabled = 'IS_AIRTABLE_INTEGRATION_ENABLED', IsPostgreSQLIntegrationEnabled = 'IS_POSTGRESQL_INTEGRATION_ENABLED', - IsFullSyncV2Enabled = 'IS_FULL_SYNC_V2_ENABLED', IsMultiSelectEnabled = 'IS_MULTI_SELECT_ENABLED', } diff --git a/packages/twenty-server/src/engine/integrations/message-queue/jobs.module.ts b/packages/twenty-server/src/engine/integrations/message-queue/jobs.module.ts index 821cc082f..e845ecd7e 100644 --- a/packages/twenty-server/src/engine/integrations/message-queue/jobs.module.ts +++ b/packages/twenty-server/src/engine/integrations/message-queue/jobs.module.ts @@ -3,7 +3,6 @@ import { ModuleRef } from '@nestjs/core'; import { HttpModule } from '@nestjs/axios'; import { TypeOrmModule } from '@nestjs/typeorm'; -import { GmailFullSyncJob } from 'src/modules/messaging/jobs/gmail-full-sync.job'; import { CallWebhookJobsJob } from 'src/engine/api/graphql/workspace-query-runner/jobs/call-webhook-jobs.job'; import { CallWebhookJob } from 'src/engine/api/graphql/workspace-query-runner/jobs/call-webhook.job'; import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module'; @@ -11,11 +10,10 @@ import { ObjectMetadataModule } from 'src/engine/metadata-modules/object-metadat import { DataSourceModule } from 'src/engine/metadata-modules/data-source/data-source.module'; import { CleanInactiveWorkspaceJob } from 'src/engine/workspace-manager/workspace-cleaner/crons/clean-inactive-workspace.job'; import { TypeORMModule } from 'src/database/typeorm/typeorm.module'; -import { GmailPartialSyncJob } from 'src/modules/messaging/jobs/gmail-partial-sync.job'; import { EmailSenderJob } from 'src/engine/integrations/email/email-sender.job'; import { UserModule } from 'src/engine/core-modules/user/user.module'; import { EnvironmentModule } from 'src/engine/integrations/environment/environment.module'; -import { FetchAllWorkspacesMessagesJob } from 'src/modules/messaging/commands/crons/fetch-all-workspaces-messages.job'; +import { GmailPartialSyncCronJob } from 'src/modules/messaging/jobs/crons/gmail-partial-sync.cron.job'; import { MatchMessageParticipantJob } from 'src/modules/messaging/jobs/match-message-participant.job'; import { CreateCompaniesAndContactsAfterSyncJob } from 'src/modules/messaging/jobs/create-companies-and-contacts-after-sync.job'; import { AutoCompaniesAndContactsCreationModule } from 'src/modules/connected-account/auto-companies-and-contacts-creation/auto-companies-and-contacts-creation.module'; @@ -34,11 +32,9 @@ import { GoogleCalendarFullSyncJob } from 'src/modules/calendar/jobs/google-cale import { CalendarEventCleanerModule } from 'src/modules/calendar/services/calendar-event-cleaner/calendar-event-cleaner.module'; import { RecordPositionBackfillJob } from 'src/engine/api/graphql/workspace-query-runner/jobs/record-position-backfill.job'; import { RecordPositionBackfillModule } from 'src/engine/api/graphql/workspace-query-runner/services/record-position-backfill-module'; -import { DeleteConnectedAccountAssociatedCalendarDataJob } from 'src/modules/messaging/jobs/delete-connected-account-associated-calendar-data.job'; +import { DeleteConnectedAccountAssociatedCalendarDataJob } from 'src/modules/calendar/jobs/delete-connected-account-associated-calendar-data.job'; import { GoogleCalendarFullSyncModule } from 'src/modules/calendar/services/google-calendar-full-sync.module'; import { GoogleAPIRefreshAccessTokenModule } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.module'; -import { GmailFullSyncModule } from 'src/modules/messaging/services/gmail-full-sync/gmail-full-sync.module'; -import { GmailPartialSyncModule } from 'src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.module'; import { MessageParticipantModule } from 'src/modules/messaging/services/message-participant/message-participant.module'; import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module'; import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata'; @@ -50,7 +46,7 @@ import { EventObjectMetadata } from 'src/modules/event/standard-objects/event.ob import { HandleWorkspaceMemberDeletedJob } from 'src/engine/core-modules/workspace/handle-workspace-member-deleted.job'; import { GmailFullSynV2Module } from 'src/modules/messaging/services/gmail-full-sync-v2/gmail-full-sync.v2.module'; import { GmailFetchMessageContentFromCacheModule } from 'src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.module'; -import { FetchAllMessagesFromCacheCronJob } from 'src/modules/messaging/commands/crons/fetch-all-messages-from-cache.cron-job'; +import { FetchAllMessagesFromCacheCronJob } from 'src/modules/messaging/jobs/crons/fetch-all-messages-from-cache.cron.job'; import { GmailFullSyncV2Job } from 'src/modules/messaging/jobs/gmail-full-sync-v2.job'; import { GmailPartialSyncV2Job } from 'src/modules/messaging/jobs/gmail-partial-sync-v2.job'; import { GmailPartialSyncV2Module } from 'src/modules/messaging/services/gmail-partial-sync-v2/gmail-partial-sync-v2.module'; @@ -76,8 +72,6 @@ import { GmailPartialSyncV2Module } from 'src/modules/messaging/services/gmail-p WorkspaceDataSourceModule, RecordPositionBackfillModule, GoogleAPIRefreshAccessTokenModule, - GmailFullSyncModule, - GmailPartialSyncModule, MessageParticipantModule, ObjectMetadataRepositoryModule.forFeature([ ConnectedAccountObjectMetadata, @@ -90,14 +84,6 @@ import { GmailPartialSyncV2Module } from 'src/modules/messaging/services/gmail-p GmailPartialSyncV2Module, ], providers: [ - { - provide: GmailFullSyncJob.name, - useClass: GmailFullSyncJob, - }, - { - provide: GmailPartialSyncJob.name, - useClass: GmailPartialSyncJob, - }, { provide: GoogleCalendarFullSyncJob.name, useClass: GoogleCalendarFullSyncJob, @@ -116,8 +102,8 @@ import { GmailPartialSyncV2Module } from 'src/modules/messaging/services/gmail-p }, { provide: EmailSenderJob.name, useClass: EmailSenderJob }, { - provide: FetchAllWorkspacesMessagesJob.name, - useClass: FetchAllWorkspacesMessagesJob, + provide: GmailPartialSyncCronJob.name, + useClass: GmailPartialSyncCronJob, }, { provide: MatchMessageParticipantJob.name, diff --git a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/commands/add-standard-id.command.ts b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/commands/add-standard-id.command.ts index 891c1d09b..d1d6d3ef9 100644 --- a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/commands/add-standard-id.command.ts +++ b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/commands/add-standard-id.command.ts @@ -58,7 +58,6 @@ export class AddStandardIdCommand extends CommandRunner { IS_EVENT_OBJECT_ENABLED: true, IS_AIRTABLE_INTEGRATION_ENABLED: true, IS_POSTGRESQL_INTEGRATION_ENABLED: true, - IS_FULL_SYNC_V2_ENABLED: false, IS_MULTI_SELECT_ENABLED: false, }, ); @@ -74,7 +73,6 @@ export class AddStandardIdCommand extends CommandRunner { IS_EVENT_OBJECT_ENABLED: true, IS_AIRTABLE_INTEGRATION_ENABLED: true, IS_POSTGRESQL_INTEGRATION_ENABLED: true, - IS_FULL_SYNC_V2_ENABLED: false, IS_MULTI_SELECT_ENABLED: false, }, ); diff --git a/packages/twenty-server/src/modules/messaging/jobs/delete-connected-account-associated-calendar-data.job.ts b/packages/twenty-server/src/modules/calendar/jobs/delete-connected-account-associated-calendar-data.job.ts similarity index 100% rename from packages/twenty-server/src/modules/messaging/jobs/delete-connected-account-associated-calendar-data.job.ts rename to packages/twenty-server/src/modules/calendar/jobs/delete-connected-account-associated-calendar-data.job.ts diff --git a/packages/twenty-server/src/modules/messaging/commands/start-fetch-all-workspaces-messages-from-cache.cron.command.ts b/packages/twenty-server/src/modules/messaging/commands/crons/gmail-fetch-messages-from-cache.cron.command.ts similarity index 71% rename from packages/twenty-server/src/modules/messaging/commands/start-fetch-all-workspaces-messages-from-cache.cron.command.ts rename to packages/twenty-server/src/modules/messaging/commands/crons/gmail-fetch-messages-from-cache.cron.command.ts index aede8efc6..9f0013f9c 100644 --- a/packages/twenty-server/src/modules/messaging/commands/start-fetch-all-workspaces-messages-from-cache.cron.command.ts +++ b/packages/twenty-server/src/modules/messaging/commands/crons/gmail-fetch-messages-from-cache.cron.command.ts @@ -4,13 +4,13 @@ import { Command, CommandRunner } from 'nest-commander'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; -import { FetchAllMessagesFromCacheCronJob } from 'src/modules/messaging/commands/crons/fetch-all-messages-from-cache.cron-job'; +import { FetchAllMessagesFromCacheCronJob } from 'src/modules/messaging/jobs/crons/fetch-all-messages-from-cache.cron.job'; @Command({ - name: 'fetch-all-workspaces-messages-from-cache:cron:start', - description: 'Starts a cron job to fetch all workspaces messages from cache', + name: 'cron:messaging:gmail-fetch-messages-from-cache', + description: 'Starts a cron job to fetch all messages from cache', }) -export class StartFetchAllWorkspacesMessagesFromCacheCronCommand extends CommandRunner { +export class GmailFetchMessagesFromCacheCronCommand extends CommandRunner { constructor( @Inject(MessageQueue.cronQueue) private readonly messageQueueService: MessageQueueService, diff --git a/packages/twenty-server/src/modules/messaging/commands/start-fetch-all-workspaces-messages.cron.command.ts b/packages/twenty-server/src/modules/messaging/commands/crons/gmail-partial-sync.cron.command.ts similarity index 61% rename from packages/twenty-server/src/modules/messaging/commands/start-fetch-all-workspaces-messages.cron.command.ts rename to packages/twenty-server/src/modules/messaging/commands/crons/gmail-partial-sync.cron.command.ts index 695fbce34..df11717a2 100644 --- a/packages/twenty-server/src/modules/messaging/commands/start-fetch-all-workspaces-messages.cron.command.ts +++ b/packages/twenty-server/src/modules/messaging/commands/crons/gmail-partial-sync.cron.command.ts @@ -4,14 +4,15 @@ import { Command, CommandRunner } from 'nest-commander'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; -import { fetchAllWorkspacesMessagesCronPattern } from 'src/modules/messaging/commands/crons/fetch-all-workspaces-messages.cron.pattern'; -import { FetchAllWorkspacesMessagesJob } from 'src/modules/messaging/commands/crons/fetch-all-workspaces-messages.job'; +import { fetchAllWorkspacesMessagesCronPattern } from 'src/modules/messaging/commands/crons/patterns/fetch-all-workspaces-messages.cron.pattern'; +import { GmailPartialSyncCronJob } from 'src/modules/messaging/jobs/crons/gmail-partial-sync.cron.job'; @Command({ - name: 'fetch-all-workspaces-messages:cron:start', - description: 'Starts a cron job to fetch all workspaces messages', + name: 'cron:messaging:gmail-partial-sync', + description: + 'Starts a cron job to sync existing connected account messages and store them in the cache', }) -export class StartFetchAllWorkspacesMessagesCronCommand extends CommandRunner { +export class GmailPartialSyncCronCommand extends CommandRunner { constructor( @Inject(MessageQueue.cronQueue) private readonly messageQueueService: MessageQueueService, @@ -21,7 +22,7 @@ export class StartFetchAllWorkspacesMessagesCronCommand extends CommandRunner { async run(): Promise { await this.messageQueueService.addCron( - FetchAllWorkspacesMessagesJob.name, + GmailPartialSyncCronJob.name, undefined, { repeat: { pattern: fetchAllWorkspacesMessagesCronPattern }, diff --git a/packages/twenty-server/src/modules/messaging/commands/crons/fetch-all-workspaces-messages.cron.pattern.ts b/packages/twenty-server/src/modules/messaging/commands/crons/patterns/fetch-all-workspaces-messages.cron.pattern.ts similarity index 100% rename from packages/twenty-server/src/modules/messaging/commands/crons/fetch-all-workspaces-messages.cron.pattern.ts rename to packages/twenty-server/src/modules/messaging/commands/crons/patterns/fetch-all-workspaces-messages.cron.pattern.ts diff --git a/packages/twenty-server/src/modules/messaging/commands/fetch-workspace-messages-commands.module.ts b/packages/twenty-server/src/modules/messaging/commands/fetch-workspace-messages-commands.module.ts deleted file mode 100644 index 0e5cb7fa8..000000000 --- a/packages/twenty-server/src/modules/messaging/commands/fetch-workspace-messages-commands.module.ts +++ /dev/null @@ -1,23 +0,0 @@ -import { Module } from '@nestjs/common'; - -import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module'; -import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata'; -import { GmailFullSyncCommand } from 'src/modules/messaging/commands/gmail-full-sync.command'; -import { GmailPartialSyncCommand } from 'src/modules/messaging/commands/gmail-partial-sync.command'; -import { StartFetchAllWorkspacesMessagesFromCacheCronCommand } from 'src/modules/messaging/commands/start-fetch-all-workspaces-messages-from-cache.cron.command'; -import { StartFetchAllWorkspacesMessagesCronCommand } from 'src/modules/messaging/commands/start-fetch-all-workspaces-messages.cron.command'; -import { StopFetchAllWorkspacesMessagesCronCommand } from 'src/modules/messaging/commands/stop-fetch-all-workspaces-messages.cron.command'; - -@Module({ - imports: [ - ObjectMetadataRepositoryModule.forFeature([ConnectedAccountObjectMetadata]), - ], - providers: [ - GmailFullSyncCommand, - GmailPartialSyncCommand, - StartFetchAllWorkspacesMessagesCronCommand, - StopFetchAllWorkspacesMessagesCronCommand, - StartFetchAllWorkspacesMessagesFromCacheCronCommand, - ], -}) -export class FetchWorkspaceMessagesCommandsModule {} diff --git a/packages/twenty-server/src/modules/messaging/commands/gmail-full-sync.command.ts b/packages/twenty-server/src/modules/messaging/commands/gmail-full-sync.command.ts deleted file mode 100644 index da47d3326..000000000 --- a/packages/twenty-server/src/modules/messaging/commands/gmail-full-sync.command.ts +++ /dev/null @@ -1,68 +0,0 @@ -import { Inject } from '@nestjs/common'; - -import { Command, CommandRunner, Option } from 'nest-commander'; - -import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; -import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; -import { - GmailFullSyncJobData, - GmailFullSyncJob, -} from 'src/modules/messaging/jobs/gmail-full-sync.job'; -import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; -import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata'; -import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; - -interface GmailFullSyncOptions { - workspaceId: string; -} - -@Command({ - name: 'workspace:gmail-full-sync', - description: 'Fetch messages of all workspaceMembers in a workspace.', -}) -export class GmailFullSyncCommand extends CommandRunner { - constructor( - @Inject(MessageQueue.messagingQueue) - private readonly messageQueueService: MessageQueueService, - @InjectObjectMetadataRepository(ConnectedAccountObjectMetadata) - private readonly connectedAccountRepository: ConnectedAccountRepository, - ) { - super(); - } - - async run( - _passedParam: string[], - options: GmailFullSyncOptions, - ): Promise { - await this.fetchWorkspaceMessages(options.workspaceId); - - return; - } - - @Option({ - flags: '-w, --workspace-id [workspace_id]', - description: 'workspace id', - required: true, - }) - parseWorkspaceId(value: string): string { - return value; - } - - private async fetchWorkspaceMessages(workspaceId: string): Promise { - const connectedAccounts = - await this.connectedAccountRepository.getAll(workspaceId); - - for (const connectedAccount of connectedAccounts) { - await this.messageQueueService.add( - GmailFullSyncJob.name, - { - workspaceId, - connectedAccountId: connectedAccount.id, - }, - { - retryLimit: 2, - }, - ); - } - } -} diff --git a/packages/twenty-server/src/modules/messaging/commands/gmail-partial-sync.command.ts b/packages/twenty-server/src/modules/messaging/commands/gmail-partial-sync.command.ts deleted file mode 100644 index 36c4dd57c..000000000 --- a/packages/twenty-server/src/modules/messaging/commands/gmail-partial-sync.command.ts +++ /dev/null @@ -1,65 +0,0 @@ -import { Inject } from '@nestjs/common'; - -import { Command, CommandRunner, Option } from 'nest-commander'; - -import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; -import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; -import { - GmailPartialSyncJob, - GmailPartialSyncJobData, -} from 'src/modules/messaging/jobs/gmail-partial-sync.job'; -import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata'; -import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; -import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; - -interface GmailPartialSyncOptions { - workspaceId: string; -} - -@Command({ - name: 'workspace:gmail-partial-sync', - description: 'Fetch messages of all workspaceMembers in a workspace.', -}) -export class GmailPartialSyncCommand extends CommandRunner { - constructor( - @Inject(MessageQueue.messagingQueue) - private readonly messageQueueService: MessageQueueService, - @InjectObjectMetadataRepository(ConnectedAccountObjectMetadata) - private readonly connectedAccountService: ConnectedAccountRepository, - ) { - super(); - } - - async run( - _passedParam: string[], - options: GmailPartialSyncOptions, - ): Promise { - await this.fetchWorkspaceMessages(options.workspaceId); - - return; - } - - @Option({ - flags: '-w, --workspace-id [workspace_id]', - description: 'workspace id', - required: true, - }) - parseWorkspaceId(value: string): string { - return value; - } - - private async fetchWorkspaceMessages(workspaceId: string): Promise { - const connectedAccounts = - await this.connectedAccountService.getAll(workspaceId); - - for (const connectedAccount of connectedAccounts) { - await this.messageQueueService.add( - GmailPartialSyncJob.name, - { - workspaceId, - connectedAccountId: connectedAccount.id, - }, - ); - } - } -} diff --git a/packages/twenty-server/src/modules/messaging/commands/messaging-command.module.ts b/packages/twenty-server/src/modules/messaging/commands/messaging-command.module.ts new file mode 100644 index 000000000..0bfa32f1f --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/commands/messaging-command.module.ts @@ -0,0 +1,17 @@ +import { Module } from '@nestjs/common'; + +import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module'; +import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata'; +import { GmailFetchMessagesFromCacheCronCommand } from 'src/modules/messaging/commands/crons/gmail-fetch-messages-from-cache.cron.command'; +import { GmailPartialSyncCronCommand } from 'src/modules/messaging/commands/crons/gmail-partial-sync.cron.command'; + +@Module({ + imports: [ + ObjectMetadataRepositoryModule.forFeature([ConnectedAccountObjectMetadata]), + ], + providers: [ + GmailPartialSyncCronCommand, + GmailFetchMessagesFromCacheCronCommand, + ], +}) +export class MessagingCommandModule {} diff --git a/packages/twenty-server/src/modules/messaging/commands/stop-fetch-all-workspaces-messages.cron.command.ts b/packages/twenty-server/src/modules/messaging/commands/stop-fetch-all-workspaces-messages.cron.command.ts deleted file mode 100644 index 3b87c84d3..000000000 --- a/packages/twenty-server/src/modules/messaging/commands/stop-fetch-all-workspaces-messages.cron.command.ts +++ /dev/null @@ -1,28 +0,0 @@ -import { Inject } from '@nestjs/common'; - -import { Command, CommandRunner } from 'nest-commander'; - -import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; -import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; -import { fetchAllWorkspacesMessagesCronPattern } from 'src/modules/messaging/commands/crons/fetch-all-workspaces-messages.cron.pattern'; -import { FetchAllWorkspacesMessagesJob } from 'src/modules/messaging/commands/crons/fetch-all-workspaces-messages.job'; - -@Command({ - name: 'fetch-all-workspaces-messages:cron:stop', - description: 'Stops the fetch all workspaces messages cron job', -}) -export class StopFetchAllWorkspacesMessagesCronCommand extends CommandRunner { - constructor( - @Inject(MessageQueue.cronQueue) - private readonly messageQueueService: MessageQueueService, - ) { - super(); - } - - async run(): Promise { - await this.messageQueueService.removeCron( - FetchAllWorkspacesMessagesJob.name, - fetchAllWorkspacesMessagesCronPattern, - ); - } -} diff --git a/packages/twenty-server/src/modules/messaging/commands/crons/fetch-all-messages-from-cache.cron-job.ts b/packages/twenty-server/src/modules/messaging/jobs/crons/fetch-all-messages-from-cache.cron.job.ts similarity index 81% rename from packages/twenty-server/src/modules/messaging/commands/crons/fetch-all-messages-from-cache.cron-job.ts rename to packages/twenty-server/src/modules/messaging/jobs/crons/fetch-all-messages-from-cache.cron.job.ts index 492a7e9ab..16ec17e18 100644 --- a/packages/twenty-server/src/modules/messaging/commands/crons/fetch-all-messages-from-cache.cron-job.ts +++ b/packages/twenty-server/src/modules/messaging/jobs/crons/fetch-all-messages-from-cache.cron.job.ts @@ -5,10 +5,6 @@ import { Repository, In } from 'typeorm'; import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; -import { - FeatureFlagEntity, - FeatureFlagKeys, -} from 'src/engine/core-modules/feature-flag/feature-flag.entity'; import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; @@ -29,8 +25,6 @@ export class FetchAllMessagesFromCacheCronJob private readonly dataSourceRepository: Repository, @InjectObjectMetadataRepository(MessageChannelObjectMetadata) private readonly messageChannelRepository: MessageChannelRepository, - @InjectRepository(FeatureFlagEntity, 'core') - private readonly featureFlagRepository: Repository, private readonly gmailFetchMessageContentFromCacheService: GmailFetchMessageContentFromCacheService, ) {} @@ -44,20 +38,9 @@ export class FetchAllMessagesFromCacheCronJob }) ).map((workspace) => workspace.id); - const workspacesWithFeatureFlagActive = - await this.featureFlagRepository.find({ - where: { - workspaceId: In(workspaceIds), - key: FeatureFlagKeys.IsFullSyncV2Enabled, - value: true, - }, - }); - const dataSources = await this.dataSourceRepository.find({ where: { - workspaceId: In( - workspacesWithFeatureFlagActive.map((w) => w.workspaceId), - ), + workspaceId: In(workspaceIds), }, }); diff --git a/packages/twenty-server/src/modules/messaging/commands/crons/fetch-all-workspaces-messages.job.ts b/packages/twenty-server/src/modules/messaging/jobs/crons/gmail-partial-sync.cron.job.ts similarity index 66% rename from packages/twenty-server/src/modules/messaging/commands/crons/fetch-all-workspaces-messages.job.ts rename to packages/twenty-server/src/modules/messaging/jobs/crons/gmail-partial-sync.cron.job.ts index 14051519a..905a84d22 100644 --- a/packages/twenty-server/src/modules/messaging/commands/crons/fetch-all-workspaces-messages.job.ts +++ b/packages/twenty-server/src/modules/messaging/jobs/crons/gmail-partial-sync.cron.job.ts @@ -9,27 +9,17 @@ import { MessageQueue } from 'src/engine/integrations/message-queue/message-queu import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; -import { - GmailPartialSyncJobData, - GmailPartialSyncJob, -} from 'src/modules/messaging/jobs/gmail-partial-sync.job'; import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata'; -import { - FeatureFlagEntity, - FeatureFlagKeys, -} from 'src/engine/core-modules/feature-flag/feature-flag.entity'; import { GmailPartialSyncV2Job as GmailPartialSyncV2Job, GmailPartialSyncV2JobData as GmailPartialSyncV2JobData, } from 'src/modules/messaging/jobs/gmail-partial-sync-v2.job'; @Injectable() -export class FetchAllWorkspacesMessagesJob - implements MessageQueueJob -{ - private readonly logger = new Logger(FetchAllWorkspacesMessagesJob.name); +export class GmailPartialSyncCronJob implements MessageQueueJob { + private readonly logger = new Logger(GmailPartialSyncCronJob.name); constructor( @InjectRepository(Workspace, 'core') @@ -40,8 +30,6 @@ export class FetchAllWorkspacesMessagesJob private readonly messageQueueService: MessageQueueService, @InjectObjectMetadataRepository(ConnectedAccountObjectMetadata) private readonly connectedAccountRepository: ConnectedAccountRepository, - @InjectRepository(FeatureFlagEntity, 'core') - private readonly featureFlagRepository: Repository, ) {} async handle(): Promise { @@ -65,39 +53,23 @@ export class FetchAllWorkspacesMessagesJob ); for (const workspaceId of workspaceIdsWithDataSources) { - await this.fetchWorkspaceMessages(workspaceId); + await this.enqueuePartialSyncs(workspaceId); } } - private async fetchWorkspaceMessages(workspaceId: string): Promise { + private async enqueuePartialSyncs(workspaceId: string): Promise { try { - const isFullSyncV2Enabled = await this.featureFlagRepository.findOneBy({ - workspaceId, - key: FeatureFlagKeys.IsFullSyncV2Enabled, - value: true, - }); - const connectedAccounts = await this.connectedAccountRepository.getAll(workspaceId); for (const connectedAccount of connectedAccounts) { - if (isFullSyncV2Enabled) { - await this.messageQueueService.add( - GmailPartialSyncV2Job.name, - { - workspaceId, - connectedAccountId: connectedAccount.id, - }, - ); - } else { - await this.messageQueueService.add( - GmailPartialSyncJob.name, - { - workspaceId, - connectedAccountId: connectedAccount.id, - }, - ); - } + await this.messageQueueService.add( + GmailPartialSyncV2Job.name, + { + workspaceId, + connectedAccountId: connectedAccount.id, + }, + ); } } catch (error) { this.logger.error( diff --git a/packages/twenty-server/src/modules/messaging/jobs/gmail-full-sync.job.ts b/packages/twenty-server/src/modules/messaging/jobs/gmail-full-sync.job.ts deleted file mode 100644 index 37c63a713..000000000 --- a/packages/twenty-server/src/modules/messaging/jobs/gmail-full-sync.job.ts +++ /dev/null @@ -1,49 +0,0 @@ -import { Injectable, Logger } from '@nestjs/common'; - -import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; - -import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service'; -import { GmailFullSyncService } from 'src/modules/messaging/services/gmail-full-sync/gmail-full-sync.service'; - -export type GmailFullSyncJobData = { - workspaceId: string; - connectedAccountId: string; - nextPageToken?: string; -}; - -@Injectable() -export class GmailFullSyncJob implements MessageQueueJob { - private readonly logger = new Logger(GmailFullSyncJob.name); - - constructor( - private readonly googleAPIsRefreshAccessTokenService: GoogleAPIRefreshAccessTokenService, - private readonly gmailFullSyncService: GmailFullSyncService, - ) {} - - async handle(data: GmailFullSyncJobData): Promise { - this.logger.log( - `gmail full-sync for workspace ${data.workspaceId} and account ${ - data.connectedAccountId - } ${data.nextPageToken ? `and ${data.nextPageToken} pageToken` : ''}`, - ); - - try { - await this.googleAPIsRefreshAccessTokenService.refreshAndSaveAccessToken( - data.workspaceId, - data.connectedAccountId, - ); - } catch (e) { - this.logger.error( - `Error refreshing access token for connected account ${data.connectedAccountId} in workspace ${data.workspaceId}`, - e, - ); - - return; - } - - await this.gmailFullSyncService.fetchConnectedAccountThreads( - data.workspaceId, - data.connectedAccountId, - ); - } -} diff --git a/packages/twenty-server/src/modules/messaging/jobs/gmail-partial-sync.job.ts b/packages/twenty-server/src/modules/messaging/jobs/gmail-partial-sync.job.ts deleted file mode 100644 index 478bb3334..000000000 --- a/packages/twenty-server/src/modules/messaging/jobs/gmail-partial-sync.job.ts +++ /dev/null @@ -1,48 +0,0 @@ -import { Injectable, Logger } from '@nestjs/common'; - -import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; - -import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service'; -import { GmailPartialSyncService } from 'src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.service'; - -export type GmailPartialSyncJobData = { - workspaceId: string; - connectedAccountId: string; -}; - -@Injectable() -export class GmailPartialSyncJob - implements MessageQueueJob -{ - private readonly logger = new Logger(GmailPartialSyncJob.name); - - constructor( - private readonly googleAPIsRefreshAccessTokenService: GoogleAPIRefreshAccessTokenService, - private readonly gmailPartialSyncService: GmailPartialSyncService, - ) {} - - async handle(data: GmailPartialSyncJobData): Promise { - this.logger.log( - `gmail partial-sync for workspace ${data.workspaceId} and account ${data.connectedAccountId}`, - ); - - try { - await this.googleAPIsRefreshAccessTokenService.refreshAndSaveAccessToken( - data.workspaceId, - data.connectedAccountId, - ); - } catch (e) { - this.logger.error( - `Error refreshing access token for connected account ${data.connectedAccountId} in workspace ${data.workspaceId}`, - e, - ); - - return; - } - - await this.gmailPartialSyncService.fetchConnectedAccountThreads( - data.workspaceId, - data.connectedAccountId, - ); - } -} diff --git a/packages/twenty-server/src/modules/messaging/listeners/messaging-connected-account.listener.ts b/packages/twenty-server/src/modules/messaging/listeners/messaging-connected-account.listener.ts index 1e363af19..4c1d17d17 100644 --- a/packages/twenty-server/src/modules/messaging/listeners/messaging-connected-account.listener.ts +++ b/packages/twenty-server/src/modules/messaging/listeners/messaging-connected-account.listener.ts @@ -10,7 +10,7 @@ import { MessageQueueService } from 'src/engine/integrations/message-queue/servi import { DeleteConnectedAccountAssociatedCalendarDataJobData, DeleteConnectedAccountAssociatedCalendarDataJob, -} from 'src/modules/messaging/jobs/delete-connected-account-associated-calendar-data.job'; +} from 'src/modules/calendar/jobs/delete-connected-account-associated-calendar-data.job'; import { DeleteConnectedAccountAssociatedMessagingDataJobData, DeleteConnectedAccountAssociatedMessagingDataJob, diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-full-sync/gmail-full-sync.module.ts b/packages/twenty-server/src/modules/messaging/services/gmail-full-sync/gmail-full-sync.module.ts deleted file mode 100644 index 357d3b5a4..000000000 --- a/packages/twenty-server/src/modules/messaging/services/gmail-full-sync/gmail-full-sync.module.ts +++ /dev/null @@ -1,31 +0,0 @@ -import { Module } from '@nestjs/common'; -import { TypeOrmModule } from '@nestjs/typeorm'; - -import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity'; -import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module'; -import { BlocklistObjectMetadata } from 'src/modules/connected-account/standard-objects/blocklist.object-metadata'; -import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata'; -import { FetchMessagesByBatchesModule } from 'src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.module'; -import { GmailFullSyncService } from 'src/modules/messaging/services/gmail-full-sync/gmail-full-sync.service'; -import { MessagingProvidersModule } from 'src/modules/messaging/services/providers/messaging-providers.module'; -import { SaveMessageAndEmitContactCreationEventModule } from 'src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.module'; -import { MessageChannelMessageAssociationObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel-message-association.object-metadata'; -import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel.object-metadata'; - -@Module({ - imports: [ - MessagingProvidersModule, - FetchMessagesByBatchesModule, - ObjectMetadataRepositoryModule.forFeature([ - ConnectedAccountObjectMetadata, - MessageChannelObjectMetadata, - MessageChannelMessageAssociationObjectMetadata, - BlocklistObjectMetadata, - ]), - SaveMessageAndEmitContactCreationEventModule, - TypeOrmModule.forFeature([FeatureFlagEntity], 'core'), - ], - providers: [GmailFullSyncService], - exports: [GmailFullSyncService], -}) -export class GmailFullSyncModule {} diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-full-sync/gmail-full-sync.service.ts b/packages/twenty-server/src/modules/messaging/services/gmail-full-sync/gmail-full-sync.service.ts deleted file mode 100644 index 67099bdf4..000000000 --- a/packages/twenty-server/src/modules/messaging/services/gmail-full-sync/gmail-full-sync.service.ts +++ /dev/null @@ -1,269 +0,0 @@ -import { Inject, Injectable, Logger } from '@nestjs/common'; -import { InjectRepository } from '@nestjs/typeorm'; - -import { Repository } from 'typeorm'; - -import { FetchMessagesByBatchesService } from 'src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.service'; -import { GmailClientProvider } from 'src/modules/messaging/services/providers/gmail/gmail-client.provider'; -import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; -import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; -import { - GmailFullSyncJobData, - GmailFullSyncJob, -} from 'src/modules/messaging/jobs/gmail-full-sync.job'; -import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; -import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository'; -import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/repositories/message-channel-message-association.repository'; -import { createQueriesFromMessageIds } from 'src/modules/messaging/utils/create-queries-from-message-ids.util'; -import { gmailSearchFilterExcludeEmails } from 'src/modules/messaging/utils/gmail-search-filter.util'; -import { BlocklistRepository } from 'src/modules/connected-account/repositories/blocklist.repository'; -import { SaveMessageAndEmitContactCreationEventService } from 'src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.service'; -import { - FeatureFlagEntity, - FeatureFlagKeys, -} from 'src/engine/core-modules/feature-flag/feature-flag.entity'; -import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; -import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata'; -import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel.object-metadata'; -import { MessageChannelMessageAssociationObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel-message-association.object-metadata'; -import { BlocklistObjectMetadata } from 'src/modules/connected-account/standard-objects/blocklist.object-metadata'; - -@Injectable() -export class GmailFullSyncService { - private readonly logger = new Logger(GmailFullSyncService.name); - - constructor( - private readonly gmailClientProvider: GmailClientProvider, - private readonly fetchMessagesByBatchesService: FetchMessagesByBatchesService, - @Inject(MessageQueue.messagingQueue) - private readonly messageQueueService: MessageQueueService, - @InjectObjectMetadataRepository(ConnectedAccountObjectMetadata) - private readonly connectedAccountRepository: ConnectedAccountRepository, - @InjectObjectMetadataRepository(MessageChannelObjectMetadata) - private readonly messageChannelRepository: MessageChannelRepository, - @InjectObjectMetadataRepository( - MessageChannelMessageAssociationObjectMetadata, - ) - private readonly messageChannelMessageAssociationRepository: MessageChannelMessageAssociationRepository, - @InjectObjectMetadataRepository(BlocklistObjectMetadata) - private readonly blocklistRepository: BlocklistRepository, - private readonly saveMessagesAndEmitContactCreationEventService: SaveMessageAndEmitContactCreationEventService, - @InjectRepository(FeatureFlagEntity, 'core') - private readonly featureFlagRepository: Repository, - ) {} - - public async fetchConnectedAccountThreads( - workspaceId: string, - connectedAccountId: string, - nextPageToken?: string, - ): Promise { - const connectedAccount = await this.connectedAccountRepository.getById( - connectedAccountId, - workspaceId, - ); - - if (!connectedAccount) { - this.logger.error( - `Connected account ${connectedAccountId} not found in workspace ${workspaceId} during full-sync`, - ); - - return; - } - - const accessToken = connectedAccount.accessToken; - const refreshToken = connectedAccount.refreshToken; - const workspaceMemberId = connectedAccount.accountOwnerId; - - if (!refreshToken) { - throw new Error( - `No refresh token found for connected account ${connectedAccountId} in workspace ${workspaceId} during full-sync`, - ); - } - - const gmailMessageChannel = - await this.messageChannelRepository.getFirstByConnectedAccountId( - connectedAccountId, - workspaceId, - ); - - if (!gmailMessageChannel) { - this.logger.error( - `No message channel found for connected account ${connectedAccountId} in workspace ${workspaceId} during full-syn`, - ); - - return; - } - - const gmailMessageChannelId = gmailMessageChannel.id; - - const gmailClient = - await this.gmailClientProvider.getGmailClient(refreshToken); - - const isBlocklistEnabledFeatureFlag = - await this.featureFlagRepository.findOneBy({ - workspaceId, - key: FeatureFlagKeys.IsBlocklistEnabled, - value: true, - }); - - const isBlocklistEnabled = - isBlocklistEnabledFeatureFlag && isBlocklistEnabledFeatureFlag.value; - - const blocklist = isBlocklistEnabled - ? await this.blocklistRepository.getByWorkspaceMemberId( - workspaceMemberId, - workspaceId, - ) - : []; - - const blocklistedEmails = blocklist.map((blocklist) => blocklist.handle); - let startTime = Date.now(); - - const messages = await gmailClient.users.messages.list({ - userId: 'me', - maxResults: 500, - pageToken: nextPageToken, - q: gmailSearchFilterExcludeEmails(blocklistedEmails), - }); - - let endTime = Date.now(); - - this.logger.log( - `gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId} getting messages list in ${ - endTime - startTime - }ms.`, - ); - - const messagesData = messages.data.messages; - - const messageExternalIds = messagesData - ? messagesData.map((message) => message.id || '') - : []; - - if (!messageExternalIds || messageExternalIds?.length === 0) { - this.logger.log( - `gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId} done with nothing to import.`, - ); - - return; - } - - startTime = Date.now(); - - const existingMessageChannelMessageAssociations = - await this.messageChannelMessageAssociationRepository.getByMessageExternalIdsAndMessageChannelId( - messageExternalIds, - gmailMessageChannelId, - workspaceId, - ); - - endTime = Date.now(); - - this.logger.log( - `gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId}: getting existing message channel message associations in ${ - endTime - startTime - }ms.`, - ); - - const existingMessageChannelMessageAssociationsExternalIds = - existingMessageChannelMessageAssociations.map( - (messageChannelMessageAssociation) => - messageChannelMessageAssociation.messageExternalId, - ); - - const messagesToFetch = messageExternalIds.filter( - (messageExternalId) => - !existingMessageChannelMessageAssociationsExternalIds.includes( - messageExternalId, - ), - ); - - const messageQueries = createQueriesFromMessageIds(messagesToFetch); - - startTime = Date.now(); - - const { messages: messagesToSave, errors } = - await this.fetchMessagesByBatchesService.fetchAllMessages( - messageQueries, - accessToken, - workspaceId, - connectedAccountId, - ); - - endTime = Date.now(); - - this.logger.log( - `gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId}: fetching all messages in ${ - endTime - startTime - }ms.`, - ); - - if (messagesToSave.length > 0) { - await this.saveMessagesAndEmitContactCreationEventService.saveMessagesAndEmitContactCreation( - messagesToSave, - connectedAccount, - workspaceId, - gmailMessageChannelId, - ); - } else { - this.logger.log( - `gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId} done with nothing to import.`, - ); - - return; - } - - if (errors.length) { - throw new Error( - `Error fetching messages for ${connectedAccountId} in workspace ${workspaceId} during full-sync`, - ); - } - const lastModifiedMessageId = messagesToFetch[0]; - - const historyId = messagesToSave.find( - (message) => message.externalId === lastModifiedMessageId, - )?.historyId; - - if (!historyId) { - throw new Error( - `No historyId found for ${connectedAccountId} in workspace ${workspaceId} during full-sync`, - ); - } - - startTime = Date.now(); - - await this.connectedAccountRepository.updateLastSyncHistoryIdIfHigher( - historyId, - connectedAccount.id, - workspaceId, - ); - - endTime = Date.now(); - - this.logger.log( - `gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId}: updating last sync history id in ${ - endTime - startTime - }ms.`, - ); - - this.logger.log( - `gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId} ${ - nextPageToken ? `and ${nextPageToken} pageToken` : '' - }done.`, - ); - - if (messages.data.nextPageToken) { - await this.messageQueueService.add( - GmailFullSyncJob.name, - { - workspaceId, - connectedAccountId, - nextPageToken: messages.data.nextPageToken, - }, - { - retryLimit: 2, - }, - ); - } - } -} diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.module.ts b/packages/twenty-server/src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.module.ts deleted file mode 100644 index 52242798f..000000000 --- a/packages/twenty-server/src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.module.ts +++ /dev/null @@ -1,31 +0,0 @@ -import { Module } from '@nestjs/common'; -import { TypeOrmModule } from '@nestjs/typeorm'; - -import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity'; -import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module'; -import { BlocklistObjectMetadata } from 'src/modules/connected-account/standard-objects/blocklist.object-metadata'; -import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata'; -import { FetchMessagesByBatchesModule } from 'src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.module'; -import { GmailPartialSyncService } from 'src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.service'; -import { MessageModule } from 'src/modules/messaging/services/message/message.module'; -import { MessagingProvidersModule } from 'src/modules/messaging/services/providers/messaging-providers.module'; -import { SaveMessageAndEmitContactCreationEventModule } from 'src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.module'; -import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel.object-metadata'; - -@Module({ - imports: [ - MessagingProvidersModule, - FetchMessagesByBatchesModule, - ObjectMetadataRepositoryModule.forFeature([ - ConnectedAccountObjectMetadata, - MessageChannelObjectMetadata, - BlocklistObjectMetadata, - ]), - MessageModule, - SaveMessageAndEmitContactCreationEventModule, - TypeOrmModule.forFeature([FeatureFlagEntity], 'core'), - ], - providers: [GmailPartialSyncService], - exports: [GmailPartialSyncService], -}) -export class GmailPartialSyncModule {} diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.service.ts b/packages/twenty-server/src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.service.ts deleted file mode 100644 index dc5573efe..000000000 --- a/packages/twenty-server/src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.service.ts +++ /dev/null @@ -1,432 +0,0 @@ -import { Inject, Injectable, Logger } from '@nestjs/common'; -import { InjectRepository } from '@nestjs/typeorm'; - -import { gmail_v1 } from 'googleapis'; -import { Repository } from 'typeorm'; - -import { FetchMessagesByBatchesService } from 'src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.service'; -import { GmailClientProvider } from 'src/modules/messaging/services/providers/gmail/gmail-client.provider'; -import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; -import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; -import { - GmailFullSyncJob, - GmailFullSyncJobData, -} from 'src/modules/messaging/jobs/gmail-full-sync.job'; -import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; -import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository'; -import { createQueriesFromMessageIds } from 'src/modules/messaging/utils/create-queries-from-message-ids.util'; -import { GmailMessage } from 'src/modules/messaging/types/gmail-message'; -import { isPersonEmail } from 'src/modules/messaging/utils/is-person-email.util'; -import { BlocklistRepository } from 'src/modules/connected-account/repositories/blocklist.repository'; -import { SaveMessageAndEmitContactCreationEventService } from 'src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.service'; -import { - FeatureFlagEntity, - FeatureFlagKeys, -} from 'src/engine/core-modules/feature-flag/feature-flag.entity'; -import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; -import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata'; -import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel.object-metadata'; -import { BlocklistObjectMetadata } from 'src/modules/connected-account/standard-objects/blocklist.object-metadata'; -import { MessageService } from 'src/modules/messaging/services/message/message.service'; - -@Injectable() -export class GmailPartialSyncService { - private readonly logger = new Logger(GmailPartialSyncService.name); - - constructor( - private readonly gmailClientProvider: GmailClientProvider, - private readonly fetchMessagesByBatchesService: FetchMessagesByBatchesService, - @Inject(MessageQueue.messagingQueue) - private readonly messageQueueService: MessageQueueService, - @InjectObjectMetadataRepository(ConnectedAccountObjectMetadata) - private readonly connectedAccountRepository: ConnectedAccountRepository, - @InjectObjectMetadataRepository(MessageChannelObjectMetadata) - private readonly messageChannelRepository: MessageChannelRepository, - private readonly messageService: MessageService, - @InjectObjectMetadataRepository(BlocklistObjectMetadata) - private readonly blocklistRepository: BlocklistRepository, - private readonly saveMessagesAndEmitContactCreationEventService: SaveMessageAndEmitContactCreationEventService, - @InjectRepository(FeatureFlagEntity, 'core') - private readonly featureFlagRepository: Repository, - ) {} - - public async fetchConnectedAccountThreads( - workspaceId: string, - connectedAccountId: string, - maxResults = 500, - ): Promise { - const connectedAccount = await this.connectedAccountRepository.getById( - connectedAccountId, - workspaceId, - ); - - if (!connectedAccount) { - this.logger.error( - `Connected account ${connectedAccountId} not found in workspace ${workspaceId} during partial-sync`, - ); - - return; - } - - const lastSyncHistoryId = connectedAccount.lastSyncHistoryId; - - if (!lastSyncHistoryId) { - this.logger.log( - `gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId}: no lastSyncHistoryId, falling back to full sync.`, - ); - - await this.fallbackToFullSync(workspaceId, connectedAccountId); - - return; - } - - const accessToken = connectedAccount.accessToken; - const refreshToken = connectedAccount.refreshToken; - - if (!refreshToken) { - throw new Error( - `No refresh token found for connected account ${connectedAccountId} in workspace ${workspaceId} during partial-sync`, - ); - } - - let startTime = Date.now(); - - const { history, historyId, error } = await this.getHistoryFromGmail( - refreshToken, - lastSyncHistoryId, - maxResults, - ); - - let endTime = Date.now(); - - this.logger.log( - `gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId} getting history in ${ - endTime - startTime - }ms.`, - ); - - if (error && error.code === 404) { - this.logger.log( - `gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId}: invalid lastSyncHistoryId, falling back to full sync.`, - ); - - await this.connectedAccountRepository.deleteHistoryId( - connectedAccountId, - workspaceId, - ); - - await this.fallbackToFullSync(workspaceId, connectedAccountId); - - return; - } - - if (error && error.code === 429) { - this.logger.log( - `gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId}: Error 429: ${error.message}, partial sync will be retried later.`, - ); - - return; - } - - if (error) { - throw new Error( - `Error getting history for ${connectedAccountId} in workspace ${workspaceId} during partial-sync: - ${JSON.stringify(error)}`, - ); - } - - if (!historyId) { - throw new Error( - `No historyId found for ${connectedAccountId} in workspace ${workspaceId} during partial-sync`, - ); - } - - if (historyId === lastSyncHistoryId || !history?.length) { - this.logger.log( - `gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId} done with nothing to update.`, - ); - - return; - } - - const gmailMessageChannel = - await this.messageChannelRepository.getFirstByConnectedAccountId( - connectedAccountId, - workspaceId, - ); - - if (!gmailMessageChannel) { - this.logger.error( - `No message channel found for connected account ${connectedAccountId} in workspace ${workspaceId} during partial-sync`, - ); - - return; - } - - const gmailMessageChannelId = gmailMessageChannel.id; - - const { messagesAdded, messagesDeleted } = - await this.getMessageIdsFromHistory(history); - - const messageQueries = createQueriesFromMessageIds(messagesAdded); - - const { messages, errors } = - await this.fetchMessagesByBatchesService.fetchAllMessages( - messageQueries, - accessToken, - workspaceId, - connectedAccountId, - ); - - const isBlocklistEnabledFeatureFlag = - await this.featureFlagRepository.findOneBy({ - workspaceId, - key: FeatureFlagKeys.IsBlocklistEnabled, - value: true, - }); - - const isBlocklistEnabled = - isBlocklistEnabledFeatureFlag && isBlocklistEnabledFeatureFlag.value; - - const blocklist = isBlocklistEnabled - ? await this.blocklistRepository.getByWorkspaceMemberId( - connectedAccount.accountOwnerId, - workspaceId, - ) - : []; - - const blocklistedEmails = blocklist.map((blocklist) => blocklist.handle); - - const messagesToSave = messages.filter( - (message) => - !this.shouldSkipImport( - connectedAccount.handle, - message, - blocklistedEmails, - ), - ); - - if (messagesToSave.length !== 0) { - await this.saveMessagesAndEmitContactCreationEventService.saveMessagesAndEmitContactCreation( - messagesToSave, - connectedAccount, - workspaceId, - gmailMessageChannelId, - ); - } - - if (messagesDeleted.length !== 0) { - startTime = Date.now(); - - await this.messageService.deleteMessages( - messagesDeleted, - gmailMessageChannelId, - workspaceId, - ); - - endTime = Date.now(); - - this.logger.log( - `gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId}: deleting messages in ${ - endTime - startTime - }ms.`, - ); - } - - if (errors.length) { - this.logger.error( - `Error fetching messages for ${connectedAccountId} in workspace ${workspaceId} during partial-sync: ${JSON.stringify( - errors, - null, - 2, - )}`, - ); - const errorsCanBeIgnored = errors.every((error) => error.code === 404); - const errorsShouldBeRetried = errors.some((error) => error.code === 429); - - if (errorsShouldBeRetried) { - return; - } - - if (!errorsCanBeIgnored) { - throw new Error( - `Error fetching messages for ${connectedAccountId} in workspace ${workspaceId} during partial-sync`, - ); - } - } - startTime = Date.now(); - - await this.connectedAccountRepository.updateLastSyncHistoryId( - historyId, - connectedAccount.id, - workspaceId, - ); - - endTime = Date.now(); - - this.logger.log( - `gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId} updating lastSyncHistoryId in ${ - endTime - startTime - }ms.`, - ); - - this.logger.log( - `gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId} done.`, - ); - } - - private async getMessageIdsFromHistory( - history: gmail_v1.Schema$History[], - ): Promise<{ - messagesAdded: string[]; - messagesDeleted: string[]; - }> { - const { messagesAdded, messagesDeleted } = history.reduce( - ( - acc: { - messagesAdded: string[]; - messagesDeleted: string[]; - }, - history, - ) => { - const messagesAdded = history.messagesAdded?.map( - (messageAdded) => messageAdded.message?.id || '', - ); - - const messagesDeleted = history.messagesDeleted?.map( - (messageDeleted) => messageDeleted.message?.id || '', - ); - - if (messagesAdded) acc.messagesAdded.push(...messagesAdded); - if (messagesDeleted) acc.messagesDeleted.push(...messagesDeleted); - - return acc; - }, - { messagesAdded: [], messagesDeleted: [] }, - ); - - const uniqueMessagesAdded = messagesAdded.filter( - (messageId) => !messagesDeleted.includes(messageId), - ); - - const uniqueMessagesDeleted = messagesDeleted.filter( - (messageId) => !messagesAdded.includes(messageId), - ); - - return { - messagesAdded: uniqueMessagesAdded, - messagesDeleted: uniqueMessagesDeleted, - }; - } - - private async getHistoryFromGmail( - refreshToken: string, - lastSyncHistoryId: string, - maxResults: number, - ): Promise<{ - history: gmail_v1.Schema$History[]; - historyId?: string | null; - error?: { - code: number; - errors: { - domain: string; - reason: string; - message: string; - locationType?: string; - location?: string; - }[]; - message: string; - }; - }> { - const gmailClient = - await this.gmailClientProvider.getGmailClient(refreshToken); - - const fullHistory: gmail_v1.Schema$History[] = []; - - try { - const history = await gmailClient.users.history.list({ - userId: 'me', - startHistoryId: lastSyncHistoryId, - historyTypes: ['messageAdded', 'messageDeleted'], - maxResults, - }); - - let nextPageToken = history?.data?.nextPageToken; - - const historyId = history?.data?.historyId; - - if (history?.data?.history) { - fullHistory.push(...history.data.history); - } - - while (nextPageToken) { - const nextHistory = await gmailClient.users.history.list({ - userId: 'me', - startHistoryId: lastSyncHistoryId, - historyTypes: ['messageAdded', 'messageDeleted'], - maxResults, - pageToken: nextPageToken, - }); - - nextPageToken = nextHistory?.data?.nextPageToken; - - if (nextHistory?.data?.history) { - fullHistory.push(...nextHistory.data.history); - } - } - - return { history: fullHistory, historyId }; - } catch (error) { - const errorData = error?.response?.data?.error; - - if (errorData) { - return { history: [], error: errorData }; - } - - throw error; - } - } - - private async fallbackToFullSync( - workspaceId: string, - connectedAccountId: string, - ) { - await this.messageQueueService.add( - GmailFullSyncJob.name, - { workspaceId, connectedAccountId }, - { - retryLimit: 2, - }, - ); - } - - private isHandleBlocked = ( - selfHandle: string, - message: GmailMessage, - blocklistedEmails: string[], - ): boolean => { - // If the message is received, check if the sender is in the blocklist - // If the message is sent, check if any of the recipients with role 'to' is in the blocklist - - if (message.fromHandle === selfHandle) { - return message.participants.some( - (participant) => - participant.role === 'to' && - blocklistedEmails.includes(participant.handle), - ); - } - - return blocklistedEmails.includes(message.fromHandle); - }; - - private shouldSkipImport( - selfHandle: string, - message: GmailMessage, - blocklistedEmails: string[], - ): boolean { - return ( - !isPersonEmail(message.fromHandle) || - this.isHandleBlocked(selfHandle, message, blocklistedEmails) - ); - } -} diff --git a/packages/twenty-server/src/modules/messaging/standard-objects/message-channel.object-metadata.ts b/packages/twenty-server/src/modules/messaging/standard-objects/message-channel.object-metadata.ts index 965d9acdb..c3c1a3f83 100644 --- a/packages/twenty-server/src/modules/messaging/standard-objects/message-channel.object-metadata.ts +++ b/packages/twenty-server/src/modules/messaging/standard-objects/message-channel.object-metadata.ts @@ -1,4 +1,3 @@ -import { FeatureFlagKeys } from 'src/engine/core-modules/feature-flag/feature-flag.entity'; import { FieldMetadataType } from 'src/engine/metadata-modules/field-metadata/field-metadata.entity'; import { RelationMetadataType, @@ -7,7 +6,6 @@ import { import { messageChannelStandardFieldIds } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids'; import { standardObjectIds } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids'; import { FieldMetadata } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/field-metadata.decorator'; -import { Gate } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/gate.decorator'; import { IsNullable } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/is-nullable.decorator'; import { IsSystem } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/is-system.decorator'; import { ObjectMetadata } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/object-metadata.decorator'; @@ -119,9 +117,6 @@ export class MessageChannelObjectMetadata extends BaseObjectMetadata { description: 'Last sync cursor', icon: 'IconHistory', }) - @Gate({ - featureFlag: FeatureFlagKeys.IsFullSyncV2Enabled, - }) syncCursor: string; @FieldMetadata({ @@ -131,9 +126,6 @@ export class MessageChannelObjectMetadata extends BaseObjectMetadata { description: 'Last sync date', icon: 'IconHistory', }) - @Gate({ - featureFlag: FeatureFlagKeys.IsFullSyncV2Enabled, - }) @IsNullable() syncedAt: string; @@ -170,9 +162,6 @@ export class MessageChannelObjectMetadata extends BaseObjectMetadata { }, ], }) - @Gate({ - featureFlag: FeatureFlagKeys.IsFullSyncV2Enabled, - }) @IsNullable() syncStatus: MessageChannelSyncStatus; @@ -183,9 +172,6 @@ export class MessageChannelObjectMetadata extends BaseObjectMetadata { description: 'Ongoing sync started at', icon: 'IconHistory', }) - @Gate({ - featureFlag: FeatureFlagKeys.IsFullSyncV2Enabled, - }) @IsNullable() ongoingSyncStartedAt: string; }