diff --git a/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages-commands.module.ts b/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages-commands.module.ts index 64ff68d0e..b8b7fce8a 100644 --- a/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages-commands.module.ts +++ b/packages/twenty-server/src/workspace/messaging/commands/fetch-workspace-messages-commands.module.ts @@ -6,12 +6,10 @@ import { TypeORMModule } from 'src/database/typeorm/typeorm.module'; import { DataSourceModule } from 'src/metadata/data-source/data-source.module'; import { GmailFullSyncCommand } from 'src/workspace/messaging/commands/gmail-full-sync.command'; import { GmailPartialSyncCommand } from 'src/workspace/messaging/commands/gmail-partial-sync.command'; -import { MessagingModule } from 'src/workspace/messaging/messaging.module'; import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service'; @Module({ imports: [ - MessagingModule, DataSourceModule, TypeORMModule, TypeOrmModule.forFeature([FeatureFlagEntity], 'core'), diff --git a/packages/twenty-server/src/workspace/messaging/commands/gmail-full-sync.command.ts b/packages/twenty-server/src/workspace/messaging/commands/gmail-full-sync.command.ts index f5ed4b69f..24a5c131c 100644 --- a/packages/twenty-server/src/workspace/messaging/commands/gmail-full-sync.command.ts +++ b/packages/twenty-server/src/workspace/messaging/commands/gmail-full-sync.command.ts @@ -1,4 +1,5 @@ import { InjectRepository } from '@nestjs/typeorm'; +import { Inject } from '@nestjs/common'; import { Command, CommandRunner, Option } from 'nest-commander'; import { Repository } from 'typeorm'; @@ -7,8 +8,13 @@ import { FeatureFlagEntity, FeatureFlagKeys, } from 'src/core/feature-flag/feature-flag.entity'; -import { MessagingProducer } from 'src/workspace/messaging/producers/messaging-producer'; import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service'; +import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service'; +import { + GmailFullSyncJobData, + GmailFullSyncJob, +} from 'src/workspace/messaging/jobs/gmail-full-sync.job'; interface GmailFullSyncOptions { workspaceId: string; @@ -20,11 +26,11 @@ interface GmailFullSyncOptions { }) export class GmailFullSyncCommand extends CommandRunner { constructor( - private readonly messagingProducer: MessagingProducer, private readonly utils: MessagingUtilsService, - @InjectRepository(FeatureFlagEntity, 'core') private readonly featureFlagRepository: Repository, + @Inject(MessageQueue.messagingQueue) + private readonly messageQueueService: MessageQueueService, ) { super(); } @@ -58,13 +64,25 @@ export class GmailFullSyncCommand extends CommandRunner { } private async fetchWorkspaceMessages(workspaceId: string): Promise { - const connectedAccounts = - await this.utils.getConnectedAccountsFromWorkspaceId(workspaceId); + const { workspaceDataSource, dataSourceMetadata } = + await this.utils.getDataSourceMetadataWorkspaceMetadata(workspaceId); + + const connectedAccounts = await this.utils.getConnectedAccounts( + dataSourceMetadata, + workspaceDataSource, + ); for (const connectedAccount of connectedAccounts) { - await this.messagingProducer.enqueueGmailFullSync( - { workspaceId, connectedAccountId: connectedAccount.id }, - `${workspaceId}-${connectedAccount.id}`, + await this.messageQueueService.add( + GmailFullSyncJob.name, + { + workspaceId, + connectedAccountId: connectedAccount.id, + }, + { + id: `${workspaceId}-${connectedAccount.id}`, + retryLimit: 2, + }, ); } } diff --git a/packages/twenty-server/src/workspace/messaging/commands/gmail-partial-sync.command.ts b/packages/twenty-server/src/workspace/messaging/commands/gmail-partial-sync.command.ts index 64de1265c..1119d263b 100644 --- a/packages/twenty-server/src/workspace/messaging/commands/gmail-partial-sync.command.ts +++ b/packages/twenty-server/src/workspace/messaging/commands/gmail-partial-sync.command.ts @@ -1,4 +1,5 @@ import { InjectRepository } from '@nestjs/typeorm'; +import { Inject } from '@nestjs/common'; import { Command, CommandRunner, Option } from 'nest-commander'; import { Repository } from 'typeorm'; @@ -7,8 +8,13 @@ import { FeatureFlagEntity, FeatureFlagKeys, } from 'src/core/feature-flag/feature-flag.entity'; -import { MessagingProducer } from 'src/workspace/messaging/producers/messaging-producer'; import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service'; +import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service'; +import { + GmailPartialSyncJob, + GmailPartialSyncJobData, +} from 'src/workspace/messaging/jobs/gmail-partial-sync.job'; interface GmailPartialSyncOptions { workspaceId: string; @@ -20,11 +26,11 @@ interface GmailPartialSyncOptions { }) export class GmailPartialSyncCommand extends CommandRunner { constructor( - private readonly messagingProducer: MessagingProducer, private readonly utils: MessagingUtilsService, - @InjectRepository(FeatureFlagEntity, 'core') private readonly featureFlagRepository: Repository, + @Inject(MessageQueue.messagingQueue) + private readonly messageQueueService: MessageQueueService, ) { super(); } @@ -58,13 +64,25 @@ export class GmailPartialSyncCommand extends CommandRunner { } private async fetchWorkspaceMessages(workspaceId: string): Promise { - const connectedAccounts = - await this.utils.getConnectedAccountsFromWorkspaceId(workspaceId); + const { workspaceDataSource, dataSourceMetadata } = + await this.utils.getDataSourceMetadataWorkspaceMetadata(workspaceId); + + const connectedAccounts = await this.utils.getConnectedAccounts( + dataSourceMetadata, + workspaceDataSource, + ); for (const connectedAccount of connectedAccounts) { - await this.messagingProducer.enqueueGmailPartialSync( - { workspaceId, connectedAccountId: connectedAccount.id }, - `${workspaceId}-${connectedAccount.id}`, + await this.messageQueueService.add( + GmailPartialSyncJob.name, + { + workspaceId, + connectedAccountId: connectedAccount.id, + }, + { + id: `${workspaceId}-${connectedAccount.id}`, + retryLimit: 2, + }, ); } } diff --git a/packages/twenty-server/src/workspace/messaging/jobs/gmail-full-sync.job.ts b/packages/twenty-server/src/workspace/messaging/jobs/gmail-full-sync.job.ts index 5320f8ed4..4b565a2c1 100644 --- a/packages/twenty-server/src/workspace/messaging/jobs/gmail-full-sync.job.ts +++ b/packages/twenty-server/src/workspace/messaging/jobs/gmail-full-sync.job.ts @@ -9,6 +9,7 @@ import { GmailFullSyncService } from 'src/workspace/messaging/services/gmail-ful export type GmailFullSyncJobData = { workspaceId: string; connectedAccountId: string; + nextPageToken?: string; }; @Injectable() @@ -21,8 +22,10 @@ export class GmailFullSyncJob implements MessageQueueJob { async handle(data: GmailFullSyncJobData): Promise { console.log( - `fetching messages for workspace ${data.workspaceId} and account ${ + `gmail full-sync for workspace ${data.workspaceId} and account ${ data.connectedAccountId + } ${ + data.nextPageToken ? `and ${data.nextPageToken} pageToken` : '' } with ${this.environmentService.getMessageQueueDriverType()}`, ); await this.gmailRefreshAccessTokenService.refreshAndSaveAccessToken( @@ -33,6 +36,7 @@ export class GmailFullSyncJob implements MessageQueueJob { await this.fetchWorkspaceMessagesService.fetchConnectedAccountThreads( data.workspaceId, data.connectedAccountId, + data.nextPageToken, ); } } diff --git a/packages/twenty-server/src/workspace/messaging/jobs/gmail-partial-sync.job.ts b/packages/twenty-server/src/workspace/messaging/jobs/gmail-partial-sync.job.ts index 1c0951f8d..f552c7f1d 100644 --- a/packages/twenty-server/src/workspace/messaging/jobs/gmail-partial-sync.job.ts +++ b/packages/twenty-server/src/workspace/messaging/jobs/gmail-partial-sync.job.ts @@ -23,7 +23,7 @@ export class GmailPartialSyncJob async handle(data: GmailPartialSyncJobData): Promise { console.log( - `fetching messages for workspace ${data.workspaceId} and account ${ + `gmail partial-sync for workspace ${data.workspaceId} and account ${ data.connectedAccountId } with ${this.environmentService.getMessageQueueDriverType()}`, ); diff --git a/packages/twenty-server/src/workspace/messaging/messaging.module.ts b/packages/twenty-server/src/workspace/messaging/messaging.module.ts deleted file mode 100644 index b001aae35..000000000 --- a/packages/twenty-server/src/workspace/messaging/messaging.module.ts +++ /dev/null @@ -1,10 +0,0 @@ -import { Module } from '@nestjs/common'; - -import { MessagingProducer } from 'src/workspace/messaging/producers/messaging-producer'; - -@Module({ - imports: [], - providers: [MessagingProducer], - exports: [MessagingProducer], -}) -export class MessagingModule {} diff --git a/packages/twenty-server/src/workspace/messaging/producers/messaging-producer.ts b/packages/twenty-server/src/workspace/messaging/producers/messaging-producer.ts deleted file mode 100644 index 79b015c45..000000000 --- a/packages/twenty-server/src/workspace/messaging/producers/messaging-producer.ts +++ /dev/null @@ -1,45 +0,0 @@ -import { Inject, Injectable } from '@nestjs/common'; - -import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants'; -import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service'; -import { - GmailFullSyncJob, - GmailFullSyncJobData, -} from 'src/workspace/messaging/jobs/gmail-full-sync.job'; -import { - GmailPartialSyncJob, - GmailPartialSyncJobData, -} from 'src/workspace/messaging/jobs/gmail-partial-sync.job'; - -@Injectable() -export class MessagingProducer { - constructor( - @Inject(MessageQueue.messagingQueue) - private readonly messageQueueService: MessageQueueService, - ) {} - - async enqueueGmailFullSync(data: GmailFullSyncJobData, singletonKey: string) { - await this.messageQueueService.add( - GmailFullSyncJob.name, - data, - { - id: singletonKey, - retryLimit: 2, - }, - ); - } - - async enqueueGmailPartialSync( - data: GmailPartialSyncJobData, - singletonKey: string, - ) { - await this.messageQueueService.add( - GmailPartialSyncJob.name, - data, - { - id: singletonKey, - retryLimit: 2, - }, - ); - } -} diff --git a/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.module.ts b/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.module.ts index eb1e1ff8e..7238712fa 100644 --- a/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.module.ts +++ b/packages/twenty-server/src/workspace/messaging/services/fetch-workspace-messages.module.ts @@ -3,7 +3,6 @@ import { Module } from '@nestjs/common'; import { TypeORMModule } from 'src/database/typeorm/typeorm.module'; import { EnvironmentModule } from 'src/integrations/environment/environment.module'; import { DataSourceModule } from 'src/metadata/data-source/data-source.module'; -import { MessagingModule } from 'src/workspace/messaging/messaging.module'; import { GmailClientProvider } from 'src/workspace/messaging/providers/gmail/gmail-client.provider'; import { FetchMessagesByBatchesService } from 'src/workspace/messaging/services/fetch-messages-by-batches.service'; import { GmailFullSyncService } from 'src/workspace/messaging/services/gmail-full-sync.service'; @@ -12,12 +11,7 @@ import { GmailRefreshAccessTokenService } from 'src/workspace/messaging/services import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service'; @Module({ - imports: [ - MessagingModule, - TypeORMModule, - DataSourceModule, - EnvironmentModule, - ], + imports: [TypeORMModule, DataSourceModule, EnvironmentModule], providers: [ GmailFullSyncService, GmailPartialSyncService, diff --git a/packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts b/packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts index 425047865..bfdf62f22 100644 --- a/packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/gmail-full-sync.service.ts @@ -1,8 +1,14 @@ -import { Injectable } from '@nestjs/common'; +import { Inject, Injectable } from '@nestjs/common'; import { FetchMessagesByBatchesService } from 'src/workspace/messaging/services/fetch-messages-by-batches.service'; import { GmailClientProvider } from 'src/workspace/messaging/providers/gmail/gmail-client.provider'; import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service'; +import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service'; +import { + GmailFullSyncJobData, + GmailFullSyncJob, +} from 'src/workspace/messaging/jobs/gmail-full-sync.job'; @Injectable() export class GmailFullSyncService { @@ -10,18 +16,23 @@ export class GmailFullSyncService { private readonly gmailClientProvider: GmailClientProvider, private readonly fetchMessagesByBatchesService: FetchMessagesByBatchesService, private readonly utils: MessagingUtilsService, + @Inject(MessageQueue.messagingQueue) + private readonly messageQueueService: MessageQueueService, ) {} public async fetchConnectedAccountThreads( workspaceId: string, connectedAccountId: string, - maxResults = 500, + nextPageToken?: string, ): Promise { - const { workspaceDataSource, dataSourceMetadata, connectedAccount } = - await this.utils.getDataSourceMetadataWorkspaceMetadataAndConnectedAccount( - workspaceId, - connectedAccountId, - ); + const { workspaceDataSource, dataSourceMetadata } = + await this.utils.getDataSourceMetadataWorkspaceMetadata(workspaceId); + + const connectedAccount = await this.utils.getConnectedAcountByIdOrFail( + connectedAccountId, + dataSourceMetadata, + workspaceDataSource, + ); const accessToken = connectedAccount.accessToken; const refreshToken = connectedAccount.refreshToken; @@ -48,7 +59,8 @@ export class GmailFullSyncService { const messages = await gmailClient.users.messages.list({ userId: 'me', - maxResults, + maxResults: 500, + pageToken: nextPageToken, }); const messagesData = messages.data.messages; @@ -119,5 +131,20 @@ export class GmailFullSyncService { dataSourceMetadata, workspaceDataSource, ); + + if (messages.data.nextPageToken) { + await this.messageQueueService.add( + GmailFullSyncJob.name, + { + workspaceId, + connectedAccountId, + nextPageToken: messages.data.nextPageToken, + }, + { + id: `${workspaceId}-${connectedAccountId}`, + retryLimit: 2, + }, + ); + } } } diff --git a/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts b/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts index 0b138de5f..1f1c1f29f 100644 --- a/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/gmail-partial-sync.service.ts @@ -28,11 +28,14 @@ export class GmailPartialSyncService { lastSyncHistoryId: string, maxResults: number, ) { - const { connectedAccount } = - await this.utils.getDataSourceMetadataWorkspaceMetadataAndConnectedAccount( - workspaceId, - connectedAccountId, - ); + const { workspaceDataSource, dataSourceMetadata } = + await this.utils.getDataSourceMetadataWorkspaceMetadata(workspaceId); + + const connectedAccount = await this.utils.getConnectedAcountByIdOrFail( + connectedAccountId, + dataSourceMetadata, + workspaceDataSource, + ); const gmailClient = await this.gmailClientProvider.getGmailClient( connectedAccount.refreshToken, @@ -53,17 +56,19 @@ export class GmailPartialSyncService { connectedAccountId: string, maxResults = 500, ): Promise { - const { workspaceDataSource, dataSourceMetadata, connectedAccount } = - await this.utils.getDataSourceMetadataWorkspaceMetadataAndConnectedAccount( - workspaceId, - connectedAccountId, - ); + const { workspaceDataSource, dataSourceMetadata } = + await this.utils.getDataSourceMetadataWorkspaceMetadata(workspaceId); + + const connectedAccount = await this.utils.getConnectedAcountByIdOrFail( + connectedAccountId, + dataSourceMetadata, + workspaceDataSource, + ); const lastSyncHistoryId = connectedAccount.lastSyncHistoryId; if (!lastSyncHistoryId) { // Fall back to full sync - await this.messageQueueService.add( GmailFullSyncJob.name, { workspaceId, connectedAccountId }, diff --git a/packages/twenty-server/src/workspace/messaging/services/messaging-utils.service.ts b/packages/twenty-server/src/workspace/messaging/services/messaging-utils.service.ts index 7c8e2c212..722414a14 100644 --- a/packages/twenty-server/src/workspace/messaging/services/messaging-utils.service.ts +++ b/packages/twenty-server/src/workspace/messaging/services/messaging-utils.service.ts @@ -202,39 +202,39 @@ export class MessagingUtilsService { ); } - public async getConnectedAccountsFromWorkspaceId( - workspaceId: string, + public async getConnectedAccounts( + dataSourceMetadata: DataSourceEntity, + workspaceDataSource: DataSource, ): Promise { - const dataSourceMetadata = - await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail( - workspaceId, - ); - - const workspaceDataSource = - await this.typeORMService.connectToDataSource(dataSourceMetadata); - - if (!workspaceDataSource) { - throw new Error('No workspace data source found'); - } - const connectedAccounts = await workspaceDataSource?.query( `SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'google'`, ); + return connectedAccounts; + } + + public async getConnectedAcountByIdOrFail( + connectedAccountId: string, + dataSourceMetadata: DataSourceEntity, + workspaceDataSource: DataSource, + ): Promise { + const connectedAccounts = await workspaceDataSource?.query( + `SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "id" = $1`, + [connectedAccountId], + ); + if (!connectedAccounts || connectedAccounts.length === 0) { throw new Error('No connected account found'); } - return connectedAccounts; + return connectedAccounts[0]; } - public async getDataSourceMetadataWorkspaceMetadataAndConnectedAccount( + public async getDataSourceMetadataWorkspaceMetadata( workspaceId: string, - connectedAccountId: string, ): Promise<{ dataSourceMetadata: DataSourceEntity; workspaceDataSource: DataSource; - connectedAccount: any; }> { const dataSourceMetadata = await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail( @@ -248,19 +248,9 @@ export class MessagingUtilsService { throw new Error('No workspace data source found'); } - const connectedAccounts = await workspaceDataSource?.query( - `SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'google' AND "id" = $1`, - [connectedAccountId], - ); - - if (!connectedAccounts || connectedAccounts.length === 0) { - throw new Error('No connected account found'); - } - return { dataSourceMetadata, workspaceDataSource, - connectedAccount: connectedAccounts[0], }; }