diff --git a/packages/twenty-server/src/database/commands/data-seed-demo-workspace/crons/start-data-seed-demo-workspace.cron.command.ts b/packages/twenty-server/src/database/commands/data-seed-demo-workspace/crons/start-data-seed-demo-workspace.cron.command.ts index 1bcf5c58d..e9a8df6d3 100644 --- a/packages/twenty-server/src/database/commands/data-seed-demo-workspace/crons/start-data-seed-demo-workspace.cron.command.ts +++ b/packages/twenty-server/src/database/commands/data-seed-demo-workspace/crons/start-data-seed-demo-workspace.cron.command.ts @@ -23,7 +23,11 @@ export class StartDataSeedDemoWorkspaceCronCommand extends CommandRunner { await this.messageQueueService.addCron( DataSeedDemoWorkspaceJob.name, undefined, - dataSeedDemoWorkspaceCronPattern, + { + repeat: { + pattern: dataSeedDemoWorkspaceCronPattern, + }, + }, ); } } diff --git a/packages/twenty-server/src/database/typeorm-seeds/core/feature-flags.ts b/packages/twenty-server/src/database/typeorm-seeds/core/feature-flags.ts index 2c5440d87..082a0d853 100644 --- a/packages/twenty-server/src/database/typeorm-seeds/core/feature-flags.ts +++ b/packages/twenty-server/src/database/typeorm-seeds/core/feature-flags.ts @@ -40,6 +40,11 @@ export const seedFeatureFlags = async ( workspaceId: workspaceId, value: true, }, + { + key: FeatureFlagKeys.IsFullSyncV2Enabled, + workspaceId: workspaceId, + value: true, + }, ]) .execute(); }; diff --git a/packages/twenty-server/src/engine/api/graphql/workspace-schema-storage/workspace-schema-storage.service.ts b/packages/twenty-server/src/engine/api/graphql/workspace-schema-storage/workspace-schema-storage.service.ts index 6733aa8bd..c92f911a2 100644 --- a/packages/twenty-server/src/engine/api/graphql/workspace-schema-storage/workspace-schema-storage.service.ts +++ b/packages/twenty-server/src/engine/api/graphql/workspace-schema-storage/workspace-schema-storage.service.ts @@ -1,6 +1,7 @@ -import { Inject, Injectable } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service'; +import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator'; import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum'; import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity'; import { WorkspaceCacheVersionService } from 'src/engine/metadata-modules/workspace-cache-version/workspace-cache-version.service'; @@ -8,7 +9,7 @@ import { WorkspaceCacheVersionService } from 'src/engine/metadata-modules/worksp @Injectable() export class WorkspaceSchemaStorageService { constructor( - @Inject(CacheStorageNamespace.WorkspaceSchema) + @InjectCacheStorage(CacheStorageNamespace.WorkspaceSchema) private readonly workspaceSchemaCache: CacheStorageService, private readonly workspaceCacheVersionService: WorkspaceCacheVersionService, diff --git a/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts b/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts index 93d0eb502..18c61d069 100644 --- a/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts +++ b/packages/twenty-server/src/engine/core-modules/auth/services/google-apis.service.ts @@ -22,6 +22,10 @@ import { FeatureFlagEntity, FeatureFlagKeys, } from 'src/engine/core-modules/feature-flag/feature-flag.entity'; +import { + GmailFullSyncV2Job, + GmailFullSyncV2JobData, +} from 'src/modules/messaging/jobs/gmail-full-sync-v2.job'; @Injectable() export class GoogleAPIsService { @@ -75,6 +79,12 @@ export class GoogleAPIsService { value: true, }); + const isFullSyncV2Enabled = await this.featureFlagRepository.findOneBy({ + workspaceId, + key: FeatureFlagKeys.IsFullSyncV2Enabled, + value: true, + }); + await workspaceDataSource?.transaction(async (manager) => { await manager.query( `INSERT INTO ${dataSourceMetadata.schema}."connectedAccount" ("id", "handle", "provider", "accessToken", "refreshToken", "accountOwnerId") VALUES ($1, $2, $3, $4, $5, $6)`, @@ -107,16 +117,26 @@ export class GoogleAPIsService { }); if (this.environmentService.get('MESSAGING_PROVIDER_GMAIL_ENABLED')) { - await this.messageQueueService.add( - GmailFullSyncJob.name, - { - workspaceId, - connectedAccountId, - }, - { - retryLimit: 2, - }, - ); + if (isFullSyncV2Enabled) { + await this.messageQueueService.add( + GmailFullSyncV2Job.name, + { + workspaceId, + connectedAccountId, + }, + ); + } else { + await this.messageQueueService.add( + GmailFullSyncJob.name, + { + workspaceId, + connectedAccountId, + }, + { + retryLimit: 2, + }, + ); + } } if ( diff --git a/packages/twenty-server/src/engine/core-modules/feature-flag/feature-flag.entity.ts b/packages/twenty-server/src/engine/core-modules/feature-flag/feature-flag.entity.ts index d5504308e..036c340ca 100644 --- a/packages/twenty-server/src/engine/core-modules/feature-flag/feature-flag.entity.ts +++ b/packages/twenty-server/src/engine/core-modules/feature-flag/feature-flag.entity.ts @@ -19,6 +19,7 @@ export enum FeatureFlagKeys { IsEventObjectEnabled = 'IS_EVENT_OBJECT_ENABLED', IsAirtableIntegrationEnabled = 'IS_AIRTABLE_INTEGRATION_ENABLED', IsPostgreSQLIntegrationEnabled = 'IS_POSTGRESQL_INTEGRATION_ENABLED', + IsFullSyncV2Enabled = 'IS_FULL_SYNC_V2_ENABLED', } @Entity({ name: 'featureFlag', schema: 'core' }) diff --git a/packages/twenty-server/src/engine/integrations/cache-storage/__tests__/cache-storage.service.spec.ts b/packages/twenty-server/src/engine/integrations/cache-storage/__tests__/cache-storage.service.spec.ts deleted file mode 100644 index d8ee9c8a5..000000000 --- a/packages/twenty-server/src/engine/integrations/cache-storage/__tests__/cache-storage.service.spec.ts +++ /dev/null @@ -1,78 +0,0 @@ -import { Cache } from '@nestjs/cache-manager'; - -import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service'; -import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum'; - -const cacheStorageNamespace = CacheStorageNamespace.Messaging; - -describe('CacheStorageService', () => { - let cacheStorageService: CacheStorageService; - let cacheManagerMock: Partial; - - beforeEach(() => { - cacheManagerMock = { - get: jest.fn(), - set: jest.fn(), - }; - - cacheStorageService = new CacheStorageService( - cacheManagerMock as Cache, - cacheStorageNamespace, - ); - }); - - afterEach(() => { - jest.clearAllMocks(); - }); - - describe('get', () => { - it('should call cacheManager.get with the correct namespaced key', async () => { - const key = 'testKey'; - const namespacedKey = `${cacheStorageNamespace}:${key}`; - - await cacheStorageService.get(key); - - expect(cacheManagerMock.get).toHaveBeenCalledWith(namespacedKey); - }); - - it('should return the value returned by cacheManager.get', async () => { - const key = 'testKey'; - const value = 'testValue'; - - jest.spyOn(cacheManagerMock, 'get').mockResolvedValue(value); - - const result = await cacheStorageService.get(key); - - expect(result).toBe(value); - }); - }); - - describe('set', () => { - it('should call cacheManager.set with the correct namespaced key, value, and optional ttl', async () => { - const key = 'testKey'; - const value = 'testValue'; - const ttl = 60; - const namespacedKey = `${cacheStorageNamespace}:${key}`; - - await cacheStorageService.set(key, value, ttl); - - expect(cacheManagerMock.set).toHaveBeenCalledWith( - namespacedKey, - value, - ttl, - ); - }); - - it('should not throw if cacheManager.set resolves successfully', async () => { - const key = 'testKey'; - const value = 'testValue'; - const ttl = 60; - - jest.spyOn(cacheManagerMock, 'set').mockResolvedValue(undefined); - - await expect( - cacheStorageService.set(key, value, ttl), - ).resolves.not.toThrow(); - }); - }); -}); diff --git a/packages/twenty-server/src/engine/integrations/cache-storage/cache-storage.service.ts b/packages/twenty-server/src/engine/integrations/cache-storage/cache-storage.service.ts index e0b6fba9a..529025012 100644 --- a/packages/twenty-server/src/engine/integrations/cache-storage/cache-storage.service.ts +++ b/packages/twenty-server/src/engine/integrations/cache-storage/cache-storage.service.ts @@ -1,25 +1,67 @@ import { Inject, Injectable } from '@nestjs/common'; import { CACHE_MANAGER, Cache } from '@nestjs/cache-manager'; +import { RedisCache } from 'cache-manager-redis-yet'; + import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum'; @Injectable() export class CacheStorageService { constructor( @Inject(CACHE_MANAGER) - private readonly cacheManager: Cache, + private readonly cache: Cache, private readonly namespace: CacheStorageNamespace, ) {} async get(key: string): Promise { - return this.cacheManager.get(`${this.namespace}:${key}`); + return this.cache.get(`${this.namespace}:${key}`); } async set(key: string, value: T, ttl?: number) { - return this.cacheManager.set(`${this.namespace}:${key}`, value, ttl); + return this.cache.set(`${this.namespace}:${key}`, value, ttl); } async del(key: string) { - return this.cacheManager.del(`${this.namespace}:${key}`); + return this.cache.del(`${this.namespace}:${key}`); + } + + async setAdd(key: string, value: string[]) { + if (value.length === 0) { + return; + } + if (this.isRedisCache()) { + return (this.cache as RedisCache).store.client.sAdd( + `${this.namespace}:${key}`, + value, + ); + } + this.get(key).then((res: string[]) => { + if (res) { + this.set(key, [...res, ...value]); + } else { + this.set(key, value); + } + }); + } + + async setPop(key: string, size: number = 1) { + if (this.isRedisCache()) { + return (this.cache as RedisCache).store.client.sPop( + `${this.namespace}:${key}`, + size, + ); + } + + return this.get(key).then((res: string[]) => { + if (res) { + this.set(key, res.slice(0, -size)); + } + + return res; + }); + } + + private isRedisCache() { + return (this.cache.store as any)?.name === 'redis'; } } diff --git a/packages/twenty-server/src/engine/integrations/cache-storage/decorators/cache-storage.decorator.ts b/packages/twenty-server/src/engine/integrations/cache-storage/decorators/cache-storage.decorator.ts new file mode 100644 index 000000000..4782059b2 --- /dev/null +++ b/packages/twenty-server/src/engine/integrations/cache-storage/decorators/cache-storage.decorator.ts @@ -0,0 +1,9 @@ +import { Inject } from '@nestjs/common'; + +import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum'; + +export const InjectCacheStorage = ( + cacheStorageNamespace: CacheStorageNamespace, +) => { + return Inject(cacheStorageNamespace); +}; diff --git a/packages/twenty-server/src/engine/integrations/message-queue/decorators/message-queue.decorator.ts b/packages/twenty-server/src/engine/integrations/message-queue/decorators/message-queue.decorator.ts new file mode 100644 index 000000000..69fa50ad5 --- /dev/null +++ b/packages/twenty-server/src/engine/integrations/message-queue/decorators/message-queue.decorator.ts @@ -0,0 +1,7 @@ +import { Inject } from '@nestjs/common'; + +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; + +export const InjectMessageQueue = (messageQueueName: MessageQueue) => { + return Inject(messageQueueName); +}; diff --git a/packages/twenty-server/src/engine/integrations/message-queue/drivers/bullmq.driver.ts b/packages/twenty-server/src/engine/integrations/message-queue/drivers/bullmq.driver.ts index 988f8716d..be282e726 100644 --- a/packages/twenty-server/src/engine/integrations/message-queue/drivers/bullmq.driver.ts +++ b/packages/twenty-server/src/engine/integrations/message-queue/drivers/bullmq.driver.ts @@ -1,6 +1,9 @@ import { Queue, QueueOptions, Worker } from 'bullmq'; -import { QueueJobOptions } from 'src/engine/integrations/message-queue/drivers/interfaces/job-options.interface'; +import { + QueueCronJobOptions, + QueueJobOptions, +} from 'src/engine/integrations/message-queue/drivers/interfaces/job-options.interface'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; @@ -53,8 +56,7 @@ export class BullMQDriver implements MessageQueueDriver { queueName: MessageQueue, jobName: string, data: T, - pattern: string, - options?: QueueJobOptions, + options?: QueueCronJobOptions, ): Promise { if (!this.queueMap[queueName]) { throw new Error( @@ -64,9 +66,7 @@ export class BullMQDriver implements MessageQueueDriver { const queueOptions = { jobId: options?.id, priority: options?.priority, - repeat: { - pattern, - }, + repeat: options?.repeat, }; await this.queueMap[queueName].add(jobName, data, queueOptions); diff --git a/packages/twenty-server/src/engine/integrations/message-queue/drivers/interfaces/job-options.interface.ts b/packages/twenty-server/src/engine/integrations/message-queue/drivers/interfaces/job-options.interface.ts index 304c0648a..f475ec2c4 100644 --- a/packages/twenty-server/src/engine/integrations/message-queue/drivers/interfaces/job-options.interface.ts +++ b/packages/twenty-server/src/engine/integrations/message-queue/drivers/interfaces/job-options.interface.ts @@ -3,3 +3,11 @@ export interface QueueJobOptions { priority?: number; retryLimit?: number; } + +export interface QueueCronJobOptions extends QueueJobOptions { + repeat?: { + every?: number; + pattern?: string; + limit?: number; + }; +} diff --git a/packages/twenty-server/src/engine/integrations/message-queue/drivers/interfaces/message-queue-driver.interface.ts b/packages/twenty-server/src/engine/integrations/message-queue/drivers/interfaces/message-queue-driver.interface.ts index 517125503..cdd30913f 100644 --- a/packages/twenty-server/src/engine/integrations/message-queue/drivers/interfaces/message-queue-driver.interface.ts +++ b/packages/twenty-server/src/engine/integrations/message-queue/drivers/interfaces/message-queue-driver.interface.ts @@ -1,4 +1,7 @@ -import { QueueJobOptions } from 'src/engine/integrations/message-queue/drivers/interfaces/job-options.interface'; +import { + QueueCronJobOptions, + QueueJobOptions, +} from 'src/engine/integrations/message-queue/drivers/interfaces/job-options.interface'; import { MessageQueueJobData } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; @@ -18,8 +21,7 @@ export interface MessageQueueDriver { queueName: MessageQueue, jobName: string, data: T, - pattern: string, - options?: QueueJobOptions, + options?: QueueCronJobOptions, ); removeCron(queueName: MessageQueue, jobName: string, pattern?: string); stop?(): Promise; diff --git a/packages/twenty-server/src/engine/integrations/message-queue/drivers/pg-boss.driver.ts b/packages/twenty-server/src/engine/integrations/message-queue/drivers/pg-boss.driver.ts index 8911f2409..eba836b75 100644 --- a/packages/twenty-server/src/engine/integrations/message-queue/drivers/pg-boss.driver.ts +++ b/packages/twenty-server/src/engine/integrations/message-queue/drivers/pg-boss.driver.ts @@ -1,6 +1,9 @@ import PgBoss from 'pg-boss'; -import { QueueJobOptions } from 'src/engine/integrations/message-queue/drivers/interfaces/job-options.interface'; +import { + QueueCronJobOptions, + QueueJobOptions, +} from 'src/engine/integrations/message-queue/drivers/interfaces/job-options.interface'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; @@ -8,6 +11,8 @@ import { MessageQueueDriver } from './interfaces/message-queue-driver.interface' export type PgBossDriverOptions = PgBoss.ConstructorOptions; +const DEFAULT_PG_BOSS_CRON_PATTERN_WHEN_NOT_PROVIDED = '*/10 * * * *'; + export class PgBossDriver implements MessageQueueDriver { private pgBoss: PgBoss; @@ -34,16 +39,15 @@ export class PgBossDriver implements MessageQueueDriver { queueName: MessageQueue, jobName: string, data: T, - pattern: string, - options?: QueueJobOptions, + options?: QueueCronJobOptions, ): Promise { await this.pgBoss.schedule( `${queueName}.${jobName}`, - pattern, + options?.repeat?.pattern ?? + DEFAULT_PG_BOSS_CRON_PATTERN_WHEN_NOT_PROVIDED, data as object, options ? { - ...options, singletonKey: options?.id, } : {}, diff --git a/packages/twenty-server/src/engine/integrations/message-queue/drivers/sync.driver.ts b/packages/twenty-server/src/engine/integrations/message-queue/drivers/sync.driver.ts index ec6d070be..d432e1d22 100644 --- a/packages/twenty-server/src/engine/integrations/message-queue/drivers/sync.driver.ts +++ b/packages/twenty-server/src/engine/integrations/message-queue/drivers/sync.driver.ts @@ -33,9 +33,8 @@ export class SyncDriver implements MessageQueueDriver { _queueName: MessageQueue, jobName: string, data: T, - pattern: string, ): Promise { - this.logger.log(`Running '${pattern}' cron job with SyncDriver`); + this.logger.log(`Running cron job with SyncDriver`); const jobClassName = getJobClassName(jobName); const job: MessageQueueCronJobData = diff --git a/packages/twenty-server/src/engine/integrations/message-queue/jobs.module.ts b/packages/twenty-server/src/engine/integrations/message-queue/jobs.module.ts index ce7ca5e99..2257cf4ad 100644 --- a/packages/twenty-server/src/engine/integrations/message-queue/jobs.module.ts +++ b/packages/twenty-server/src/engine/integrations/message-queue/jobs.module.ts @@ -44,9 +44,15 @@ import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repos import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata'; import { MessageParticipantObjectMetadata } from 'src/modules/messaging/standard-objects/message-participant.object-metadata'; import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel.object-metadata'; -import { CreateCompanyAndContactJob } from 'src/modules/connected-account/auto-companies-and-contacts-creation/jobs/create-company-and-contact.job'; import { SaveEventToDbJob } from 'src/engine/api/graphql/workspace-query-runner/jobs/save-event-to-db.job'; +import { CreateCompanyAndContactJob } from 'src/modules/connected-account/auto-companies-and-contacts-creation/jobs/create-company-and-contact.job'; import { EventObjectMetadata } from 'src/modules/event/standard-objects/event.object-metadata'; +import { GmailFullSynV2Module } from 'src/modules/messaging/services/gmail-full-sync-v2/gmail-full-sync.v2.module'; +import { GmailFetchMessageContentFromCacheModule } from 'src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.module'; +import { FetchAllMessagesFromCacheCronJob } from 'src/modules/messaging/commands/crons/fetch-all-messages-from-cache.cron-job'; +import { GmailFullSyncV2Job } from 'src/modules/messaging/jobs/gmail-full-sync-v2.job'; +import { GmailPartialSyncV2Job } from 'src/modules/messaging/jobs/gmail-partial-sync-v2.job'; +import { GmailPartialSyncV2Module } from 'src/modules/messaging/services/gmail-partial-sync-v2/gmail-partial-sync-v2.module'; @Module({ imports: [ @@ -78,6 +84,9 @@ import { EventObjectMetadata } from 'src/modules/event/standard-objects/event.ob MessageChannelObjectMetadata, EventObjectMetadata, ]), + GmailFullSynV2Module, + GmailFetchMessageContentFromCacheModule, + GmailPartialSyncV2Module, ], providers: [ { @@ -142,6 +151,18 @@ import { EventObjectMetadata } from 'src/modules/event/standard-objects/event.ob provide: SaveEventToDbJob.name, useClass: SaveEventToDbJob, }, + { + provide: FetchAllMessagesFromCacheCronJob.name, + useClass: FetchAllMessagesFromCacheCronJob, + }, + { + provide: GmailFullSyncV2Job.name, + useClass: GmailFullSyncV2Job, + }, + { + provide: GmailPartialSyncV2Job.name, + useClass: GmailPartialSyncV2Job, + }, ], }) export class JobsModule { diff --git a/packages/twenty-server/src/engine/integrations/message-queue/services/message-queue.service.ts b/packages/twenty-server/src/engine/integrations/message-queue/services/message-queue.service.ts index b082ec7ce..87899696f 100644 --- a/packages/twenty-server/src/engine/integrations/message-queue/services/message-queue.service.ts +++ b/packages/twenty-server/src/engine/integrations/message-queue/services/message-queue.service.ts @@ -1,6 +1,9 @@ import { Inject, Injectable, OnModuleDestroy } from '@nestjs/common'; -import { QueueJobOptions } from 'src/engine/integrations/message-queue/drivers/interfaces/job-options.interface'; +import { + QueueCronJobOptions, + QueueJobOptions, +} from 'src/engine/integrations/message-queue/drivers/interfaces/job-options.interface'; import { MessageQueueDriver } from 'src/engine/integrations/message-queue/drivers/interfaces/message-queue-driver.interface'; import { MessageQueueJobData } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; @@ -37,10 +40,9 @@ export class MessageQueueService implements OnModuleDestroy { addCron( jobName: string, data: T, - pattern: string, - options?: QueueJobOptions, + options?: QueueCronJobOptions, ): Promise { - return this.driver.addCron(this.queueName, jobName, data, pattern, options); + return this.driver.addCron(this.queueName, jobName, data, options); } removeCron(jobName: string, pattern: string): Promise { diff --git a/packages/twenty-server/src/engine/workspace-manager/workspace-cleaner/commands/start-clean-inactive-workspaces.cron.command.ts b/packages/twenty-server/src/engine/workspace-manager/workspace-cleaner/commands/start-clean-inactive-workspaces.cron.command.ts index 6188b7faf..62b7f0fa0 100644 --- a/packages/twenty-server/src/engine/workspace-manager/workspace-cleaner/commands/start-clean-inactive-workspaces.cron.command.ts +++ b/packages/twenty-server/src/engine/workspace-manager/workspace-cleaner/commands/start-clean-inactive-workspaces.cron.command.ts @@ -23,8 +23,7 @@ export class StartCleanInactiveWorkspacesCronCommand extends CommandRunner { await this.messageQueueService.addCron( CleanInactiveWorkspaceJob.name, undefined, - cleanInactiveWorkspaceCronPattern, - { retryLimit: 3 }, + { retryLimit: 3, repeat: { pattern: cleanInactiveWorkspaceCronPattern } }, ); } } diff --git a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/commands/add-standard-id.command.ts b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/commands/add-standard-id.command.ts index 33c60a3fd..17b60e263 100644 --- a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/commands/add-standard-id.command.ts +++ b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/commands/add-standard-id.command.ts @@ -58,6 +58,7 @@ export class AddStandardIdCommand extends CommandRunner { IS_EVENT_OBJECT_ENABLED: true, IS_AIRTABLE_INTEGRATION_ENABLED: true, IS_POSTGRESQL_INTEGRATION_ENABLED: true, + IS_FULL_SYNC_V2_ENABLED: false, }, ); const standardFieldMetadataCollection = this.standardFieldFactory.create( @@ -72,6 +73,7 @@ export class AddStandardIdCommand extends CommandRunner { IS_EVENT_OBJECT_ENABLED: true, IS_AIRTABLE_INTEGRATION_ENABLED: true, IS_POSTGRESQL_INTEGRATION_ENABLED: true, + IS_FULL_SYNC_V2_ENABLED: false, }, ); diff --git a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts index 5edb4e971..22f04503e 100644 --- a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts +++ b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts @@ -169,6 +169,10 @@ export const messageChannelStandardFieldIds = { type: '20202020-ae95-42d9-a3f1-797a2ea22122', isContactAutoCreationEnabled: '20202020-fabd-4f14-b7c6-3310f6d132c6', messageChannelMessageAssociations: '20202020-49b8-4766-88fd-75f1e21b3d5f', + syncExternalId: '20202020-79d1-41cf-b738-bcf5ed61e256', + syncedAt: '20202020-263d-4c6b-ad51-137ada56f7d4', + syncStatus: '20202020-56a1-4f7e-9880-a8493bb899cc', + ongoingSyncStartedAt: '20202020-8c61-4a42-ae63-73c1c3c52e06', }; export const messageParticipantStandardFieldIds = { diff --git a/packages/twenty-server/src/modules/connected-account/auto-companies-and-contacts-creation/services/create-company-and-contact.service.ts b/packages/twenty-server/src/modules/connected-account/auto-companies-and-contacts-creation/services/create-company-and-contact.service.ts index f805c7d7f..cfce5432e 100644 --- a/packages/twenty-server/src/modules/connected-account/auto-companies-and-contacts-creation/services/create-company-and-contact.service.ts +++ b/packages/twenty-server/src/modules/connected-account/auto-companies-and-contacts-creation/services/create-company-and-contact.service.ts @@ -119,8 +119,9 @@ export class CreateCompanyAndContactService { handle: contact.handle, displayName: contact.displayName, companyId: - contact.companyDomainName && - companiesObject[contact.companyDomainName], + contact.companyDomainName && contact.companyDomainName !== '' + ? companiesObject[contact.companyDomainName] + : undefined, })); await this.createContactService.createContacts( diff --git a/packages/twenty-server/src/modules/messaging/commands/crons/fetch-all-messages-from-cache.cron-job.ts b/packages/twenty-server/src/modules/messaging/commands/crons/fetch-all-messages-from-cache.cron-job.ts new file mode 100644 index 000000000..492a7e9ab --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/commands/crons/fetch-all-messages-from-cache.cron-job.ts @@ -0,0 +1,84 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; + +import { Repository, In } from 'typeorm'; + +import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; + +import { + FeatureFlagEntity, + FeatureFlagKeys, +} from 'src/engine/core-modules/feature-flag/feature-flag.entity'; +import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; +import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity'; +import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository'; +import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel.object-metadata'; +import { GmailFetchMessageContentFromCacheService } from 'src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.service'; + +@Injectable() +export class FetchAllMessagesFromCacheCronJob + implements MessageQueueJob +{ + private readonly logger = new Logger(FetchAllMessagesFromCacheCronJob.name); + + constructor( + @InjectRepository(Workspace, 'core') + private readonly workspaceRepository: Repository, + @InjectRepository(DataSourceEntity, 'metadata') + private readonly dataSourceRepository: Repository, + @InjectObjectMetadataRepository(MessageChannelObjectMetadata) + private readonly messageChannelRepository: MessageChannelRepository, + @InjectRepository(FeatureFlagEntity, 'core') + private readonly featureFlagRepository: Repository, + private readonly gmailFetchMessageContentFromCacheService: GmailFetchMessageContentFromCacheService, + ) {} + + async handle(): Promise { + const workspaceIds = ( + await this.workspaceRepository.find({ + where: { + subscriptionStatus: 'active', + }, + select: ['id'], + }) + ).map((workspace) => workspace.id); + + const workspacesWithFeatureFlagActive = + await this.featureFlagRepository.find({ + where: { + workspaceId: In(workspaceIds), + key: FeatureFlagKeys.IsFullSyncV2Enabled, + value: true, + }, + }); + + const dataSources = await this.dataSourceRepository.find({ + where: { + workspaceId: In( + workspacesWithFeatureFlagActive.map((w) => w.workspaceId), + ), + }, + }); + + const workspaceIdsWithDataSources = new Set( + dataSources.map((dataSource) => dataSource.workspaceId), + ); + + for (const workspaceId of workspaceIdsWithDataSources) { + await this.fetchWorkspaceMessages(workspaceId); + } + } + + private async fetchWorkspaceMessages(workspaceId: string): Promise { + const messageChannels = + await this.messageChannelRepository.getAll(workspaceId); + + for (const messageChannel of messageChannels) { + await this.gmailFetchMessageContentFromCacheService.fetchMessageContentFromCache( + workspaceId, + messageChannel.connectedAccountId, + ); + } + } +} diff --git a/packages/twenty-server/src/modules/messaging/commands/crons/fetch-all-workspaces-messages.cron.pattern.ts b/packages/twenty-server/src/modules/messaging/commands/crons/fetch-all-workspaces-messages.cron.pattern.ts index 99d9acaa7..83d6b6db0 100644 --- a/packages/twenty-server/src/modules/messaging/commands/crons/fetch-all-workspaces-messages.cron.pattern.ts +++ b/packages/twenty-server/src/modules/messaging/commands/crons/fetch-all-workspaces-messages.cron.pattern.ts @@ -1 +1 @@ -export const fetchAllWorkspacesMessagesCronPattern = '*/10 * * * *'; +export const fetchAllWorkspacesMessagesCronPattern = '*/5 * * * *'; diff --git a/packages/twenty-server/src/modules/messaging/commands/crons/fetch-all-workspaces-messages.job.ts b/packages/twenty-server/src/modules/messaging/commands/crons/fetch-all-workspaces-messages.job.ts index 804573470..14051519a 100644 --- a/packages/twenty-server/src/modules/messaging/commands/crons/fetch-all-workspaces-messages.job.ts +++ b/packages/twenty-server/src/modules/messaging/commands/crons/fetch-all-workspaces-messages.job.ts @@ -16,6 +16,14 @@ import { import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata'; +import { + FeatureFlagEntity, + FeatureFlagKeys, +} from 'src/engine/core-modules/feature-flag/feature-flag.entity'; +import { + GmailPartialSyncV2Job as GmailPartialSyncV2Job, + GmailPartialSyncV2JobData as GmailPartialSyncV2JobData, +} from 'src/modules/messaging/jobs/gmail-partial-sync-v2.job'; @Injectable() export class FetchAllWorkspacesMessagesJob @@ -32,6 +40,8 @@ export class FetchAllWorkspacesMessagesJob private readonly messageQueueService: MessageQueueService, @InjectObjectMetadataRepository(ConnectedAccountObjectMetadata) private readonly connectedAccountRepository: ConnectedAccountRepository, + @InjectRepository(FeatureFlagEntity, 'core') + private readonly featureFlagRepository: Repository, ) {} async handle(): Promise { @@ -61,17 +71,33 @@ export class FetchAllWorkspacesMessagesJob private async fetchWorkspaceMessages(workspaceId: string): Promise { try { + const isFullSyncV2Enabled = await this.featureFlagRepository.findOneBy({ + workspaceId, + key: FeatureFlagKeys.IsFullSyncV2Enabled, + value: true, + }); + const connectedAccounts = await this.connectedAccountRepository.getAll(workspaceId); for (const connectedAccount of connectedAccounts) { - await this.messageQueueService.add( - GmailPartialSyncJob.name, - { - workspaceId, - connectedAccountId: connectedAccount.id, - }, - ); + if (isFullSyncV2Enabled) { + await this.messageQueueService.add( + GmailPartialSyncV2Job.name, + { + workspaceId, + connectedAccountId: connectedAccount.id, + }, + ); + } else { + await this.messageQueueService.add( + GmailPartialSyncJob.name, + { + workspaceId, + connectedAccountId: connectedAccount.id, + }, + ); + } } } catch (error) { this.logger.error( diff --git a/packages/twenty-server/src/modules/messaging/commands/fetch-workspace-messages-commands.module.ts b/packages/twenty-server/src/modules/messaging/commands/fetch-workspace-messages-commands.module.ts index c285a6235..0e5cb7fa8 100644 --- a/packages/twenty-server/src/modules/messaging/commands/fetch-workspace-messages-commands.module.ts +++ b/packages/twenty-server/src/modules/messaging/commands/fetch-workspace-messages-commands.module.ts @@ -4,6 +4,7 @@ import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repos import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata'; import { GmailFullSyncCommand } from 'src/modules/messaging/commands/gmail-full-sync.command'; import { GmailPartialSyncCommand } from 'src/modules/messaging/commands/gmail-partial-sync.command'; +import { StartFetchAllWorkspacesMessagesFromCacheCronCommand } from 'src/modules/messaging/commands/start-fetch-all-workspaces-messages-from-cache.cron.command'; import { StartFetchAllWorkspacesMessagesCronCommand } from 'src/modules/messaging/commands/start-fetch-all-workspaces-messages.cron.command'; import { StopFetchAllWorkspacesMessagesCronCommand } from 'src/modules/messaging/commands/stop-fetch-all-workspaces-messages.cron.command'; @@ -16,6 +17,7 @@ import { StopFetchAllWorkspacesMessagesCronCommand } from 'src/modules/messaging GmailPartialSyncCommand, StartFetchAllWorkspacesMessagesCronCommand, StopFetchAllWorkspacesMessagesCronCommand, + StartFetchAllWorkspacesMessagesFromCacheCronCommand, ], }) export class FetchWorkspaceMessagesCommandsModule {} diff --git a/packages/twenty-server/src/modules/messaging/commands/start-fetch-all-workspaces-messages-from-cache.cron.command.ts b/packages/twenty-server/src/modules/messaging/commands/start-fetch-all-workspaces-messages-from-cache.cron.command.ts new file mode 100644 index 000000000..aede8efc6 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/commands/start-fetch-all-workspaces-messages-from-cache.cron.command.ts @@ -0,0 +1,32 @@ +import { Inject } from '@nestjs/common'; + +import { Command, CommandRunner } from 'nest-commander'; + +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { FetchAllMessagesFromCacheCronJob } from 'src/modules/messaging/commands/crons/fetch-all-messages-from-cache.cron-job'; + +@Command({ + name: 'fetch-all-workspaces-messages-from-cache:cron:start', + description: 'Starts a cron job to fetch all workspaces messages from cache', +}) +export class StartFetchAllWorkspacesMessagesFromCacheCronCommand extends CommandRunner { + constructor( + @Inject(MessageQueue.cronQueue) + private readonly messageQueueService: MessageQueueService, + ) { + super(); + } + + async run(): Promise { + await this.messageQueueService.addCron( + FetchAllMessagesFromCacheCronJob.name, + undefined, + { + repeat: { + every: 5000, + }, + }, + ); + } +} diff --git a/packages/twenty-server/src/modules/messaging/commands/start-fetch-all-workspaces-messages.cron.command.ts b/packages/twenty-server/src/modules/messaging/commands/start-fetch-all-workspaces-messages.cron.command.ts index 4da6824a2..695fbce34 100644 --- a/packages/twenty-server/src/modules/messaging/commands/start-fetch-all-workspaces-messages.cron.command.ts +++ b/packages/twenty-server/src/modules/messaging/commands/start-fetch-all-workspaces-messages.cron.command.ts @@ -23,7 +23,9 @@ export class StartFetchAllWorkspacesMessagesCronCommand extends CommandRunner { await this.messageQueueService.addCron( FetchAllWorkspacesMessagesJob.name, undefined, - fetchAllWorkspacesMessagesCronPattern, + { + repeat: { pattern: fetchAllWorkspacesMessagesCronPattern }, + }, ); } } diff --git a/packages/twenty-server/src/modules/messaging/constants/gmail-ongoing-sync-timeout.constant.ts b/packages/twenty-server/src/modules/messaging/constants/gmail-ongoing-sync-timeout.constant.ts new file mode 100644 index 000000000..c4c67700d --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/constants/gmail-ongoing-sync-timeout.constant.ts @@ -0,0 +1 @@ +export const GMAIL_ONGOING_SYNC_TIMEOUT = 1000 * 60 * 60; // 1 hour diff --git a/packages/twenty-server/src/modules/messaging/constants/gmail-users-history-max-result.constant.ts b/packages/twenty-server/src/modules/messaging/constants/gmail-users-history-max-result.constant.ts new file mode 100644 index 000000000..d208d7d24 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/constants/gmail-users-history-max-result.constant.ts @@ -0,0 +1 @@ +export const GMAIL_USERS_HISTORY_MAX_RESULT = 500; diff --git a/packages/twenty-server/src/modules/messaging/constants/gmail-users-messages-get-batch-size.constant.ts b/packages/twenty-server/src/modules/messaging/constants/gmail-users-messages-get-batch-size.constant.ts new file mode 100644 index 000000000..a883ab0d4 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/constants/gmail-users-messages-get-batch-size.constant.ts @@ -0,0 +1 @@ +export const GMAIL_USERS_MESSAGES_GET_BATCH_SIZE = 50; diff --git a/packages/twenty-server/src/modules/messaging/constants/gmail-users-messages-list-max-result.constant.ts b/packages/twenty-server/src/modules/messaging/constants/gmail-users-messages-list-max-result.constant.ts new file mode 100644 index 000000000..29b301215 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/constants/gmail-users-messages-list-max-result.constant.ts @@ -0,0 +1 @@ +export const GMAIL_USERS_MESSAGES_LIST_MAX_RESULT = 500; diff --git a/packages/twenty-server/src/modules/messaging/constants/messages-to-delete-from-cache-batch-size.constant.ts b/packages/twenty-server/src/modules/messaging/constants/messages-to-delete-from-cache-batch-size.constant.ts new file mode 100644 index 000000000..1f8a59b88 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/constants/messages-to-delete-from-cache-batch-size.constant.ts @@ -0,0 +1 @@ +export const MESSAGES_TO_DELETE_FROM_CACHE_BATCH_SIZE = 1000; diff --git a/packages/twenty-server/src/modules/messaging/jobs/gmail-full-sync-v2.job.ts b/packages/twenty-server/src/modules/messaging/jobs/gmail-full-sync-v2.job.ts new file mode 100644 index 000000000..ac90589f5 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/jobs/gmail-full-sync-v2.job.ts @@ -0,0 +1,48 @@ +import { Injectable, Logger } from '@nestjs/common'; + +import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; + +import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service'; +import { GmailFullSyncV2Service } from 'src/modules/messaging/services/gmail-full-sync-v2/gmail-full-sync.v2.service'; + +export type GmailFullSyncV2JobData = { + workspaceId: string; + connectedAccountId: string; +}; + +@Injectable() +export class GmailFullSyncV2Job + implements MessageQueueJob +{ + private readonly logger = new Logger(GmailFullSyncV2Job.name); + + constructor( + private readonly googleAPIsRefreshAccessTokenService: GoogleAPIRefreshAccessTokenService, + private readonly gmailFullSyncV2Service: GmailFullSyncV2Service, + ) {} + + async handle(data: GmailFullSyncV2JobData): Promise { + this.logger.log( + `gmail full-sync for workspace ${data.workspaceId} and account ${data.connectedAccountId}`, + ); + + try { + await this.googleAPIsRefreshAccessTokenService.refreshAndSaveAccessToken( + data.workspaceId, + data.connectedAccountId, + ); + } catch (e) { + this.logger.error( + `Error refreshing access token for connected account ${data.connectedAccountId} in workspace ${data.workspaceId}`, + e, + ); + + return; + } + + await this.gmailFullSyncV2Service.fetchConnectedAccountThreads( + data.workspaceId, + data.connectedAccountId, + ); + } +} diff --git a/packages/twenty-server/src/modules/messaging/jobs/gmail-full-sync.job.ts b/packages/twenty-server/src/modules/messaging/jobs/gmail-full-sync.job.ts index 7011ded5b..37c63a713 100644 --- a/packages/twenty-server/src/modules/messaging/jobs/gmail-full-sync.job.ts +++ b/packages/twenty-server/src/modules/messaging/jobs/gmail-full-sync.job.ts @@ -44,7 +44,6 @@ export class GmailFullSyncJob implements MessageQueueJob { await this.gmailFullSyncService.fetchConnectedAccountThreads( data.workspaceId, data.connectedAccountId, - data.nextPageToken, ); } } diff --git a/packages/twenty-server/src/modules/messaging/jobs/gmail-partial-sync-v2.job.ts b/packages/twenty-server/src/modules/messaging/jobs/gmail-partial-sync-v2.job.ts new file mode 100644 index 000000000..d327e0ebc --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/jobs/gmail-partial-sync-v2.job.ts @@ -0,0 +1,48 @@ +import { Injectable, Logger } from '@nestjs/common'; + +import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface'; + +import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service'; +import { GmailPartialSyncV2Service } from 'src/modules/messaging/services/gmail-partial-sync-v2/gmail-partial-sync-v2.service'; + +export type GmailPartialSyncV2JobData = { + workspaceId: string; + connectedAccountId: string; +}; + +@Injectable() +export class GmailPartialSyncV2Job + implements MessageQueueJob +{ + private readonly logger = new Logger(GmailPartialSyncV2Job.name); + + constructor( + private readonly googleAPIsRefreshAccessTokenService: GoogleAPIRefreshAccessTokenService, + private readonly gmailPartialSyncV2Service: GmailPartialSyncV2Service, + ) {} + + async handle(data: GmailPartialSyncV2JobData): Promise { + this.logger.log( + `gmail partial-sync for workspace ${data.workspaceId} and account ${data.connectedAccountId}`, + ); + + try { + await this.googleAPIsRefreshAccessTokenService.refreshAndSaveAccessToken( + data.workspaceId, + data.connectedAccountId, + ); + } catch (e) { + this.logger.error( + `Error refreshing access token for connected account ${data.connectedAccountId} in workspace ${data.workspaceId}`, + e, + ); + + return; + } + + await this.gmailPartialSyncV2Service.fetchConnectedAccountThreads( + data.workspaceId, + data.connectedAccountId, + ); + } +} diff --git a/packages/twenty-server/src/modules/messaging/repositories/message-channel.repository.ts b/packages/twenty-server/src/modules/messaging/repositories/message-channel.repository.ts index 59c0ddab7..4c7318df5 100644 --- a/packages/twenty-server/src/modules/messaging/repositories/message-channel.repository.ts +++ b/packages/twenty-server/src/modules/messaging/repositories/message-channel.repository.ts @@ -3,7 +3,10 @@ import { Injectable } from '@nestjs/common'; import { EntityManager } from 'typeorm'; import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; -import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel.object-metadata'; +import { + MessageChannelObjectMetadata, + MessageChannelSyncStatus, +} from 'src/modules/messaging/standard-objects/message-channel.object-metadata'; import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record'; @Injectable() @@ -12,6 +15,21 @@ export class MessageChannelRepository { private readonly workspaceDataSourceService: WorkspaceDataSourceService, ) {} + public async getAll( + workspaceId: string, + transactionManager?: EntityManager, + ): Promise[]> { + const dataSourceSchema = + this.workspaceDataSourceService.getSchemaName(workspaceId); + + return await this.workspaceDataSourceService.executeRawQuery( + `SELECT * FROM ${dataSourceSchema}."messageChannel"`, + [], + workspaceId, + transactionManager, + ); + } + public async getByConnectedAccountId( connectedAccountId: string, workspaceId: string, @@ -49,27 +67,17 @@ export class MessageChannelRepository { public async getFirstByConnectedAccountId( connectedAccountId: string, workspaceId: string, + transactionManager?: EntityManager, ): Promise | undefined> { const messageChannels = await this.getByConnectedAccountId( connectedAccountId, workspaceId, + transactionManager, ); return messageChannels[0]; } - public async getIsContactAutoCreationEnabledByConnectedAccountIdOrFail( - connectedAccountId: string, - workspaceId: string, - ): Promise { - const messageChannel = await this.getFirstByConnectedAccountIdOrFail( - connectedAccountId, - workspaceId, - ); - - return messageChannel.isContactAutoCreationEnabled; - } - public async getByIds( ids: string[], workspaceId: string, @@ -85,4 +93,69 @@ export class MessageChannelRepository { transactionManager, ); } + + public async updateSyncStatus( + id: string, + syncStatus: MessageChannelSyncStatus, + workspaceId: string, + transactionManager?: EntityManager, + ): Promise { + const dataSourceSchema = + this.workspaceDataSourceService.getSchemaName(workspaceId); + + const needsToUpdateSyncedAt = + syncStatus === MessageChannelSyncStatus.SUCCEEDED; + + const needsToUpdateOngoingSyncStartedAt = + syncStatus === MessageChannelSyncStatus.ONGOING; + + await this.workspaceDataSourceService.executeRawQuery( + `UPDATE ${dataSourceSchema}."messageChannel" SET "syncStatus" = $1 ${ + needsToUpdateSyncedAt ? `, "syncedAt" = NOW()` : '' + } ${ + needsToUpdateOngoingSyncStartedAt + ? `, "ongoingSyncStartedAt" = NOW()` + : `, "ongoingSyncStartedAt" = NULL` + } WHERE "id" = $2`, + [syncStatus, id], + workspaceId, + transactionManager, + ); + } + + public async updateLastSyncExternalIdIfHigher( + id: string, + syncExternalId: string, + workspaceId: string, + transactionManager?: EntityManager, + ) { + const dataSourceSchema = + this.workspaceDataSourceService.getSchemaName(workspaceId); + + await this.workspaceDataSourceService.executeRawQuery( + `UPDATE ${dataSourceSchema}."messageChannel" SET "syncExternalId" = $1 + WHERE "id" = $2 + AND ("syncExternalId" < $1 OR "syncExternalId" = '')`, + [syncExternalId, id], + workspaceId, + transactionManager, + ); + } + + public async resetSyncExternalId( + id: string, + workspaceId: string, + transactionManager?: EntityManager, + ) { + const dataSourceSchema = + this.workspaceDataSourceService.getSchemaName(workspaceId); + + await this.workspaceDataSourceService.executeRawQuery( + `UPDATE ${dataSourceSchema}."messageChannel" SET "syncExternalId" = '' + WHERE "id" = $1`, + [id], + workspaceId, + transactionManager, + ); + } } diff --git a/packages/twenty-server/src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.service.ts b/packages/twenty-server/src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.service.ts index 63b2b2e0d..cde2d7dcc 100644 --- a/packages/twenty-server/src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.service.ts +++ b/packages/twenty-server/src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.service.ts @@ -19,7 +19,6 @@ export class FetchMessagesByBatchesService { async fetchAllMessages( queries: MessageQuery[], accessToken: string, - jobName?: string, workspaceId?: string, connectedAccountId?: string, ): Promise<{ messages: GmailMessage[]; errors: any[] }> { @@ -32,7 +31,7 @@ export class FetchMessagesByBatchesService { let endTime = Date.now(); this.logger.log( - `${jobName} for workspace ${workspaceId} and account ${connectedAccountId} fetching ${ + `Messaging import for workspace ${workspaceId} and account ${connectedAccountId} fetching ${ queries.length } messages in ${endTime - startTime}ms`, ); @@ -45,7 +44,7 @@ export class FetchMessagesByBatchesService { endTime = Date.now(); this.logger.log( - `${jobName} for workspace ${workspaceId} and account ${connectedAccountId} formatting ${ + `Messaging import for workspace ${workspaceId} and account ${connectedAccountId} formatting ${ queries.length } messages in ${endTime - startTime}ms`, ); @@ -62,6 +61,10 @@ export class FetchMessagesByBatchesService { const errors: any = []; + const sanitizeString = (str: string) => { + return str.replace(/\0/g, ''); + }; + const formattedResponse = Promise.all( parsedResponses.map(async (message: GmailMessageParsedResponse) => { if (message.error) { @@ -119,7 +122,7 @@ export class FetchMessagesByBatchesService { fromHandle: from.value[0].address || '', fromDisplayName: from.value[0].name || '', participants, - text: textWithoutReplyQuotations || '', + text: sanitizeString(textWithoutReplyQuotations || ''), attachments, }; diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.module.ts b/packages/twenty-server/src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.module.ts new file mode 100644 index 000000000..587619793 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.module.ts @@ -0,0 +1,26 @@ +import { Module } from '@nestjs/common'; + +import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module'; +import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module'; +import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata'; +import { FetchMessagesByBatchesModule } from 'src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.module'; +import { GmailFetchMessageContentFromCacheService } from 'src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.service'; +import { MessageModule } from 'src/modules/messaging/services/message/message.module'; +import { SaveMessageAndEmitContactCreationEventModule } from 'src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.module'; +import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel.object-metadata'; + +@Module({ + imports: [ + FetchMessagesByBatchesModule, + ObjectMetadataRepositoryModule.forFeature([ + ConnectedAccountObjectMetadata, + MessageChannelObjectMetadata, + ]), + SaveMessageAndEmitContactCreationEventModule, + MessageModule, + WorkspaceDataSourceModule, + ], + providers: [GmailFetchMessageContentFromCacheService], + exports: [GmailFetchMessageContentFromCacheService], +}) +export class GmailFetchMessageContentFromCacheModule {} diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.service.ts b/packages/twenty-server/src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.service.ts new file mode 100644 index 000000000..46e18e85f --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.service.ts @@ -0,0 +1,257 @@ +import { Inject, Injectable, Logger } from '@nestjs/common'; + +import { EntityManager } from 'typeorm'; + +import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; +import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata'; +import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository'; +import { FetchMessagesByBatchesService } from 'src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.service'; +import { + MessageChannelObjectMetadata, + MessageChannelSyncStatus, +} from 'src/modules/messaging/standard-objects/message-channel.object-metadata'; +import { createQueriesFromMessageIds } from 'src/modules/messaging/utils/create-queries-from-message-ids.util'; +import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator'; +import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum'; +import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service'; +import { GMAIL_USERS_MESSAGES_GET_BATCH_SIZE } from 'src/modules/messaging/constants/gmail-users-messages-get-batch-size.constant'; +import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; +import { SaveMessageAndEmitContactCreationEventService } from 'src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.service'; +import { + GmailFullSyncV2JobData, + GmailFullSyncV2Job, +} from 'src/modules/messaging/jobs/gmail-full-sync-v2.job'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { GMAIL_ONGOING_SYNC_TIMEOUT } from 'src/modules/messaging/constants/gmail-ongoing-sync-timeout.constant'; + +@Injectable() +export class GmailFetchMessageContentFromCacheService { + private readonly logger = new Logger( + GmailFetchMessageContentFromCacheService.name, + ); + + constructor( + private readonly fetchMessagesByBatchesService: FetchMessagesByBatchesService, + @InjectObjectMetadataRepository(ConnectedAccountObjectMetadata) + private readonly connectedAccountRepository: ConnectedAccountRepository, + @InjectObjectMetadataRepository(MessageChannelObjectMetadata) + private readonly messageChannelRepository: MessageChannelRepository, + private readonly saveMessageAndEmitContactCreationEventService: SaveMessageAndEmitContactCreationEventService, + @InjectCacheStorage(CacheStorageNamespace.Messaging) + private readonly cacheStorage: CacheStorageService, + private readonly workspaceDataSourceService: WorkspaceDataSourceService, + @Inject(MessageQueue.messagingQueue) + private readonly messageQueueService: MessageQueueService, + ) {} + + async fetchMessageContentFromCache( + workspaceId: string, + connectedAccountId: string, + ) { + const connectedAccount = await this.connectedAccountRepository.getById( + connectedAccountId, + workspaceId, + ); + + if (!connectedAccount) { + this.logger.error( + `Connected account ${connectedAccountId} not found in workspace ${workspaceId}`, + ); + + return; + } + + const accessToken = connectedAccount.accessToken; + const refreshToken = connectedAccount.refreshToken; + + if (!refreshToken) { + throw new Error( + `No refresh token found for connected account ${connectedAccountId} in workspace ${workspaceId}`, + ); + } + + const gmailMessageChannel = + await this.messageChannelRepository.getFirstByConnectedAccountId( + connectedAccountId, + workspaceId, + ); + + if (!gmailMessageChannel) { + this.logger.error( + `No message channel found for connected account ${connectedAccountId} in workspace ${workspaceId}`, + ); + + return; + } + + if (gmailMessageChannel.syncStatus !== MessageChannelSyncStatus.PENDING) { + this.logger.log( + `Messaging import for workspace ${workspaceId} and account ${connectedAccountId} is not pending.`, + ); + + if (gmailMessageChannel.syncStatus !== MessageChannelSyncStatus.ONGOING) { + return; + } + + const ongoingSyncStartedAt = new Date( + gmailMessageChannel.ongoingSyncStartedAt, + ); + + if ( + ongoingSyncStartedAt < new Date(Date.now() - GMAIL_ONGOING_SYNC_TIMEOUT) + ) { + this.logger.log( + `Messaging import for workspace ${workspaceId} and account ${connectedAccountId} failed due to ongoing sync timeout. Restarting full-sync...`, + ); + + await this.messageChannelRepository.updateSyncStatus( + gmailMessageChannel.id, + MessageChannelSyncStatus.FAILED, + workspaceId, + ); + + await this.fallbackToFullSync(workspaceId, connectedAccountId); + + return; + } + + return; + } + + const gmailMessageChannelId = gmailMessageChannel.id; + + const messageIdsToFetch = + (await this.cacheStorage.setPop( + `messages-to-import:${workspaceId}:gmail:${gmailMessageChannelId}`, + GMAIL_USERS_MESSAGES_GET_BATCH_SIZE, + )) ?? []; + + if (!messageIdsToFetch?.length) { + await this.messageChannelRepository.updateSyncStatus( + gmailMessageChannelId, + MessageChannelSyncStatus.SUCCEEDED, + workspaceId, + ); + + this.logger.log( + `Messaging import for workspace ${workspaceId} and account ${connectedAccountId} done with nothing to import or delete.`, + ); + + return; + } + + await this.messageChannelRepository.updateSyncStatus( + gmailMessageChannelId, + MessageChannelSyncStatus.ONGOING, + workspaceId, + ); + + this.logger.log( + `Messaging import for workspace ${workspaceId} and account ${connectedAccountId} starting...`, + ); + + const workspaceDataSource = + await this.workspaceDataSourceService.connectToWorkspaceDataSource( + workspaceId, + ); + + await workspaceDataSource + ?.transaction(async (transactionManager: EntityManager) => { + const messageQueries = createQueriesFromMessageIds(messageIdsToFetch); + + const { messages: messagesToSave, errors } = + await this.fetchMessagesByBatchesService.fetchAllMessages( + messageQueries, + accessToken, + workspaceId, + connectedAccountId, + ); + + if (!messagesToSave.length) { + await this.messageChannelRepository.updateSyncStatus( + gmailMessageChannelId, + MessageChannelSyncStatus.PENDING, + workspaceId, + ); + + return; + } + + if (errors.length) { + const errorsCanBeIgnored = errors.every( + (error) => error.code === 404, + ); + + if (!errorsCanBeIgnored) { + throw new Error( + `Error fetching messages for ${connectedAccountId} in workspace ${workspaceId}: ${JSON.stringify( + errors, + null, + 2, + )}`, + ); + } + } + + await this.saveMessageAndEmitContactCreationEventService.saveMessagesAndEmitContactCreationEventWithinTransaction( + messagesToSave, + connectedAccount, + workspaceId, + gmailMessageChannel, + transactionManager, + ); + + if (messageIdsToFetch.length < GMAIL_USERS_MESSAGES_GET_BATCH_SIZE) { + await this.messageChannelRepository.updateSyncStatus( + gmailMessageChannelId, + MessageChannelSyncStatus.SUCCEEDED, + workspaceId, + transactionManager, + ); + + this.logger.log( + `Messaging import for workspace ${workspaceId} and account ${connectedAccountId} done with no more messages to import.`, + ); + } else { + await this.messageChannelRepository.updateSyncStatus( + gmailMessageChannelId, + MessageChannelSyncStatus.PENDING, + workspaceId, + transactionManager, + ); + + this.logger.log( + `Messaging import for workspace ${workspaceId} and account ${connectedAccountId} done with more messages to import.`, + ); + } + }) + .catch(async (error) => { + await this.cacheStorage.setAdd( + `messages-to-import:${workspaceId}:gmail:${gmailMessageChannelId}`, + messageIdsToFetch, + ); + + await this.messageChannelRepository.updateSyncStatus( + gmailMessageChannelId, + MessageChannelSyncStatus.FAILED, + workspaceId, + ); + + throw new Error( + `Error fetching messages for ${connectedAccountId} in workspace ${workspaceId}: ${error.message}`, + ); + }); + } + + private async fallbackToFullSync( + workspaceId: string, + connectedAccountId: string, + ) { + await this.messageQueueService.add( + GmailFullSyncV2Job.name, + { workspaceId, connectedAccountId }, + ); + } +} diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-full-sync-v2/gmail-full-sync.v2.module.ts b/packages/twenty-server/src/modules/messaging/services/gmail-full-sync-v2/gmail-full-sync.v2.module.ts new file mode 100644 index 000000000..c0bc6bc73 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/services/gmail-full-sync-v2/gmail-full-sync.v2.module.ts @@ -0,0 +1,31 @@ +import { Module } from '@nestjs/common'; +import { TypeOrmModule } from '@nestjs/typeorm'; + +import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity'; +import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module'; +import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module'; +import { BlocklistObjectMetadata } from 'src/modules/connected-account/standard-objects/blocklist.object-metadata'; +import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata'; +import { FetchMessagesByBatchesModule } from 'src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.module'; +import { GmailFullSyncV2Service } from 'src/modules/messaging/services/gmail-full-sync-v2/gmail-full-sync.v2.service'; +import { MessagingProvidersModule } from 'src/modules/messaging/services/providers/messaging-providers.module'; +import { MessageChannelMessageAssociationObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel-message-association.object-metadata'; +import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel.object-metadata'; + +@Module({ + imports: [ + MessagingProvidersModule, + FetchMessagesByBatchesModule, + ObjectMetadataRepositoryModule.forFeature([ + ConnectedAccountObjectMetadata, + MessageChannelObjectMetadata, + MessageChannelMessageAssociationObjectMetadata, + BlocklistObjectMetadata, + ]), + TypeOrmModule.forFeature([FeatureFlagEntity], 'core'), + WorkspaceDataSourceModule, + ], + providers: [GmailFullSyncV2Service], + exports: [GmailFullSyncV2Service], +}) +export class GmailFullSynV2Module {} diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-full-sync-v2/gmail-full-sync.v2.service.ts b/packages/twenty-server/src/modules/messaging/services/gmail-full-sync-v2/gmail-full-sync.v2.service.ts new file mode 100644 index 000000000..e05fa8162 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/services/gmail-full-sync-v2/gmail-full-sync.v2.service.ts @@ -0,0 +1,302 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; + +import { EntityManager, Repository } from 'typeorm'; +import { gmail_v1 } from 'googleapis'; + +import { + FeatureFlagEntity, + FeatureFlagKeys, +} from 'src/engine/core-modules/feature-flag/feature-flag.entity'; +import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service'; +import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator'; +import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum'; +import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { BlocklistRepository } from 'src/modules/connected-account/repositories/blocklist.repository'; +import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; +import { BlocklistObjectMetadata } from 'src/modules/connected-account/standard-objects/blocklist.object-metadata'; +import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata'; +import { GMAIL_USERS_MESSAGES_LIST_MAX_RESULT } from 'src/modules/messaging/constants/gmail-users-messages-list-max-result.constant'; +import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/repositories/message-channel-message-association.repository'; +import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository'; +import { GmailClientProvider } from 'src/modules/messaging/services/providers/gmail/gmail-client.provider'; +import { MessageChannelMessageAssociationObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel-message-association.object-metadata'; +import { + MessageChannelObjectMetadata, + MessageChannelSyncStatus, +} from 'src/modules/messaging/standard-objects/message-channel.object-metadata'; +import { gmailSearchFilterExcludeEmails } from 'src/modules/messaging/utils/gmail-search-filter.util'; +import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; + +@Injectable() +export class GmailFullSyncV2Service { + private readonly logger = new Logger(GmailFullSyncV2Service.name); + + constructor( + private readonly gmailClientProvider: GmailClientProvider, + @InjectObjectMetadataRepository(ConnectedAccountObjectMetadata) + private readonly connectedAccountRepository: ConnectedAccountRepository, + @InjectObjectMetadataRepository(MessageChannelObjectMetadata) + private readonly messageChannelRepository: MessageChannelRepository, + @InjectObjectMetadataRepository(BlocklistObjectMetadata) + private readonly blocklistRepository: BlocklistRepository, + @InjectRepository(FeatureFlagEntity, 'core') + private readonly featureFlagRepository: Repository, + @InjectCacheStorage(CacheStorageNamespace.Messaging) + private readonly cacheStorage: CacheStorageService, + @InjectObjectMetadataRepository( + MessageChannelMessageAssociationObjectMetadata, + ) + private readonly messageChannelMessageAssociationRepository: MessageChannelMessageAssociationRepository, + private readonly workspaceDataSourceService: WorkspaceDataSourceService, + ) {} + + public async fetchConnectedAccountThreads( + workspaceId: string, + connectedAccountId: string, + ) { + const connectedAccount = await this.connectedAccountRepository.getById( + connectedAccountId, + workspaceId, + ); + + if (!connectedAccount) { + this.logger.error( + `Connected account ${connectedAccountId} not found in workspace ${workspaceId}`, + ); + + return; + } + + const refreshToken = connectedAccount.refreshToken; + + if (!refreshToken) { + throw new Error( + `No refresh token found for connected account ${connectedAccountId} in workspace ${workspaceId}`, + ); + } + + const gmailMessageChannel = + await this.messageChannelRepository.getFirstByConnectedAccountId( + connectedAccountId, + workspaceId, + ); + + if (!gmailMessageChannel) { + this.logger.error( + `No message channel found for connected account ${connectedAccountId} in workspace ${workspaceId}`, + ); + + return; + } + + if (gmailMessageChannel.syncStatus === MessageChannelSyncStatus.ONGOING) { + this.logger.log( + `Messaging import for workspace ${workspaceId} and account ${connectedAccountId} is locked, import will be retried later.`, + ); + + return; + } + + await this.messageChannelRepository.updateSyncStatus( + gmailMessageChannel.id, + MessageChannelSyncStatus.ONGOING, + workspaceId, + ); + + const workspaceDataSource = + await this.workspaceDataSourceService.connectToWorkspaceDataSource( + workspaceId, + ); + + await workspaceDataSource + ?.transaction(async (transactionManager) => { + const gmailClient: gmail_v1.Gmail = + await this.gmailClientProvider.getGmailClient(refreshToken); + + const blocklistedEmails = await this.fetchBlocklistEmails( + connectedAccount.accountOwnerId, + workspaceId, + ); + + await this.fetchAllMessageIdsFromGmailAndStoreInCache( + gmailClient, + gmailMessageChannel.id, + blocklistedEmails, + workspaceId, + transactionManager, + ); + + await this.messageChannelRepository.updateSyncStatus( + gmailMessageChannel.id, + MessageChannelSyncStatus.PENDING, + workspaceId, + transactionManager, + ); + }) + .catch(async (error) => { + await this.messageChannelRepository.updateSyncStatus( + gmailMessageChannel.id, + MessageChannelSyncStatus.FAILED, + workspaceId, + ); + + throw new Error( + `Error fetching messages for ${connectedAccountId} in workspace ${workspaceId}: ${error.message}`, + ); + }); + } + + public async fetchAllMessageIdsFromGmailAndStoreInCache( + gmailClient: gmail_v1.Gmail, + messageChannelId: string, + blocklistedEmails: string[], + workspaceId: string, + transactionManager?: EntityManager, + ) { + let pageToken: string | undefined; + let hasMoreMessages = true; + let messageIdsToFetch = 0; + let firstMessageExternalId; + + while (hasMoreMessages) { + const response = await gmailClient.users.messages.list({ + userId: 'me', + maxResults: GMAIL_USERS_MESSAGES_LIST_MAX_RESULT, + pageToken, + q: gmailSearchFilterExcludeEmails(blocklistedEmails), + }); + + if (response.data?.messages) { + const messageExternalIds = response.data.messages + .filter((message): message is { id: string } => message.id != null) + .map((message) => message.id); + + if (!firstMessageExternalId) { + firstMessageExternalId = messageExternalIds[0]; + } + + const existingMessageChannelMessageAssociations = + await this.messageChannelMessageAssociationRepository.getByMessageExternalIdsAndMessageChannelId( + messageExternalIds, + messageChannelId, + workspaceId, + transactionManager, + ); + + const existingMessageChannelMessageAssociationsExternalIds = + existingMessageChannelMessageAssociations.map( + (messageChannelMessageAssociation) => + messageChannelMessageAssociation.messageExternalId, + ); + + const messageIdsToImport = messageExternalIds.filter( + (messageExternalId) => + !existingMessageChannelMessageAssociationsExternalIds.includes( + messageExternalId, + ), + ); + + if (messageIdsToImport && messageIdsToImport.length) { + await this.cacheStorage.setAdd( + `messages-to-import:${workspaceId}:gmail:${messageChannelId}`, + messageIdsToImport, + ); + + messageIdsToFetch += messageIdsToImport.length; + } + } + + pageToken = response.data.nextPageToken ?? undefined; + hasMoreMessages = !!pageToken; + } + + if (!messageIdsToFetch) { + this.logger.log( + `No messages found in Gmail for messageChannel ${messageChannelId} in workspace ${workspaceId}`, + ); + + return; + } + + this.logger.log( + `Fetched all ${messageIdsToFetch} message ids from Gmail for messageChannel ${messageChannelId} in workspace ${workspaceId} and added to cache for import`, + ); + + await this.updateLastSyncExternalId( + gmailClient, + messageChannelId, + firstMessageExternalId, + workspaceId, + transactionManager, + ); + } + + public async fetchBlocklistEmails( + workspaceMemberId: string, + workspaceId: string, + ) { + const isBlocklistEnabledFeatureFlag = + await this.featureFlagRepository.findOneBy({ + workspaceId, + key: FeatureFlagKeys.IsBlocklistEnabled, + value: true, + }); + + const isBlocklistEnabled = + isBlocklistEnabledFeatureFlag && isBlocklistEnabledFeatureFlag.value; + + const blocklist = isBlocklistEnabled + ? await this.blocklistRepository.getByWorkspaceMemberId( + workspaceMemberId, + workspaceId, + ) + : []; + + return blocklist.map((blocklist) => blocklist.handle); + } + + private async updateLastSyncExternalId( + gmailClient: gmail_v1.Gmail, + messageChannelId: string, + firstMessageExternalId: string, + workspaceId: string, + transactionManager?: EntityManager, + ) { + if (!firstMessageExternalId) { + throw new Error( + `No first message found for workspace ${workspaceId} and account ${messageChannelId}, can't update sync external id`, + ); + } + + const firstMessageContent = await gmailClient.users.messages.get({ + userId: 'me', + id: firstMessageExternalId, + }); + + if (!firstMessageContent?.data) { + throw new Error( + `No first message content found for message ${firstMessageExternalId} in workspace ${workspaceId}`, + ); + } + + const historyId = firstMessageContent?.data?.historyId; + + if (!historyId) { + throw new Error( + `No historyId found for message ${firstMessageExternalId} in workspace ${workspaceId}`, + ); + } + + this.logger.log( + `Updating last external id: ${historyId} for workspace ${workspaceId} and account ${messageChannelId} succeeded.`, + ); + + await this.messageChannelRepository.updateLastSyncExternalIdIfHigher( + messageChannelId, + historyId, + workspaceId, + transactionManager, + ); + } +} diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-full-sync/gmail-full-sync.service.ts b/packages/twenty-server/src/modules/messaging/services/gmail-full-sync/gmail-full-sync.service.ts index d3ae3ab56..67099bdf4 100644 --- a/packages/twenty-server/src/modules/messaging/services/gmail-full-sync/gmail-full-sync.service.ts +++ b/packages/twenty-server/src/modules/messaging/services/gmail-full-sync/gmail-full-sync.service.ts @@ -47,7 +47,7 @@ export class GmailFullSyncService { private readonly messageChannelMessageAssociationRepository: MessageChannelMessageAssociationRepository, @InjectObjectMetadataRepository(BlocklistObjectMetadata) private readonly blocklistRepository: BlocklistRepository, - private readonly saveMessagesAndCreateContactsService: SaveMessageAndEmitContactCreationEventService, + private readonly saveMessagesAndEmitContactCreationEventService: SaveMessageAndEmitContactCreationEventService, @InjectRepository(FeatureFlagEntity, 'core') private readonly featureFlagRepository: Repository, ) {} @@ -186,7 +186,6 @@ export class GmailFullSyncService { await this.fetchMessagesByBatchesService.fetchAllMessages( messageQueries, accessToken, - 'gmail full-sync', workspaceId, connectedAccountId, ); @@ -200,12 +199,11 @@ export class GmailFullSyncService { ); if (messagesToSave.length > 0) { - await this.saveMessagesAndCreateContactsService.saveMessagesAndCreateContacts( + await this.saveMessagesAndEmitContactCreationEventService.saveMessagesAndEmitContactCreation( messagesToSave, connectedAccount, workspaceId, gmailMessageChannelId, - 'gmail full-sync', ); } else { this.logger.log( diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-partial-sync-v2/gmail-partial-sync-v2.module.ts b/packages/twenty-server/src/modules/messaging/services/gmail-partial-sync-v2/gmail-partial-sync-v2.module.ts new file mode 100644 index 000000000..dcd759d80 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/services/gmail-partial-sync-v2/gmail-partial-sync-v2.module.ts @@ -0,0 +1,33 @@ +import { Module } from '@nestjs/common'; +import { TypeOrmModule } from '@nestjs/typeorm'; + +import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity'; +import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module'; +import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module'; +import { BlocklistObjectMetadata } from 'src/modules/connected-account/standard-objects/blocklist.object-metadata'; +import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata'; +import { FetchMessagesByBatchesModule } from 'src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.module'; +import { GmailPartialSyncV2Service } from 'src/modules/messaging/services/gmail-partial-sync-v2/gmail-partial-sync-v2.service'; +import { MessageModule } from 'src/modules/messaging/services/message/message.module'; +import { MessagingProvidersModule } from 'src/modules/messaging/services/providers/messaging-providers.module'; +import { SaveMessageAndEmitContactCreationEventModule } from 'src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.module'; +import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel.object-metadata'; + +@Module({ + imports: [ + MessagingProvidersModule, + FetchMessagesByBatchesModule, + ObjectMetadataRepositoryModule.forFeature([ + ConnectedAccountObjectMetadata, + MessageChannelObjectMetadata, + BlocklistObjectMetadata, + ]), + MessageModule, + SaveMessageAndEmitContactCreationEventModule, + TypeOrmModule.forFeature([FeatureFlagEntity], 'core'), + WorkspaceDataSourceModule, + ], + providers: [GmailPartialSyncV2Service], + exports: [GmailPartialSyncV2Service], +}) +export class GmailPartialSyncV2Module {} diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-partial-sync-v2/gmail-partial-sync-v2.service.ts b/packages/twenty-server/src/modules/messaging/services/gmail-partial-sync-v2/gmail-partial-sync-v2.service.ts new file mode 100644 index 000000000..f07d80908 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/services/gmail-partial-sync-v2/gmail-partial-sync-v2.service.ts @@ -0,0 +1,338 @@ +import { Inject, Injectable, Logger } from '@nestjs/common'; + +import { gmail_v1 } from 'googleapis'; +import { EntityManager } from 'typeorm'; + +import { GmailClientProvider } from 'src/modules/messaging/services/providers/gmail/gmail-client.provider'; +import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; +import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository'; +import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata'; +import { + MessageChannelObjectMetadata, + MessageChannelSyncStatus, +} from 'src/modules/messaging/standard-objects/message-channel.object-metadata'; +import { GMAIL_USERS_HISTORY_MAX_RESULT } from 'src/modules/messaging/constants/gmail-users-history-max-result.constant'; +import { GmailError } from 'src/modules/messaging/types/gmail-error'; +import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service'; +import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator'; +import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum'; +import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; +import { + GmailFullSyncV2Job, + GmailFullSyncV2JobData, +} from 'src/modules/messaging/jobs/gmail-full-sync-v2.job'; + +@Injectable() +export class GmailPartialSyncV2Service { + private readonly logger = new Logger(GmailPartialSyncV2Service.name); + + constructor( + private readonly gmailClientProvider: GmailClientProvider, + @Inject(MessageQueue.messagingQueue) + private readonly messageQueueService: MessageQueueService, + @InjectObjectMetadataRepository(ConnectedAccountObjectMetadata) + private readonly connectedAccountRepository: ConnectedAccountRepository, + @InjectObjectMetadataRepository(MessageChannelObjectMetadata) + private readonly messageChannelRepository: MessageChannelRepository, + @InjectCacheStorage(CacheStorageNamespace.Messaging) + private readonly cacheStorage: CacheStorageService, + private readonly workspaceDataSourceService: WorkspaceDataSourceService, + ) {} + + public async fetchConnectedAccountThreads( + workspaceId: string, + connectedAccountId: string, + ): Promise { + const connectedAccount = await this.connectedAccountRepository.getById( + connectedAccountId, + workspaceId, + ); + + if (!connectedAccount) { + this.logger.error( + `Connected account ${connectedAccountId} not found in workspace ${workspaceId}`, + ); + + return; + } + + const refreshToken = connectedAccount.refreshToken; + + if (!refreshToken) { + throw new Error( + `No refresh token found for connected account ${connectedAccountId} in workspace ${workspaceId}`, + ); + } + + const gmailMessageChannel = + await this.messageChannelRepository.getFirstByConnectedAccountId( + connectedAccountId, + workspaceId, + ); + + if (!gmailMessageChannel) { + this.logger.error( + `No message channel found for connected account ${connectedAccountId} in workspace ${workspaceId}`, + ); + + return; + } + + if (gmailMessageChannel.syncStatus !== MessageChannelSyncStatus.SUCCEEDED) { + this.logger.log( + `Messaging import for workspace ${workspaceId} and account ${connectedAccountId} is locked, import will be retried later.`, + ); + + return; + } + + await this.messageChannelRepository.updateSyncStatus( + gmailMessageChannel.id, + MessageChannelSyncStatus.ONGOING, + workspaceId, + ); + + const workspaceDataSource = + await this.workspaceDataSourceService.connectToWorkspaceDataSource( + workspaceId, + ); + + await workspaceDataSource + ?.transaction(async (transactionManager: EntityManager) => { + const lastSyncHistoryId = gmailMessageChannel.syncExternalId; + + if (!lastSyncHistoryId) { + this.logger.log( + `No lastSyncHistoryId for workspace ${workspaceId} and account ${connectedAccountId}, falling back to full sync.`, + ); + + await this.messageChannelRepository.updateSyncStatus( + gmailMessageChannel.id, + MessageChannelSyncStatus.PENDING, + workspaceId, + transactionManager, + ); + + await this.fallbackToFullSync(workspaceId, connectedAccountId); + + return; + } + + const gmailClient: gmail_v1.Gmail = + await this.gmailClientProvider.getGmailClient(refreshToken); + + const { history, historyId, error } = await this.getHistoryFromGmail( + gmailClient, + lastSyncHistoryId, + ); + + if (error?.code === 404) { + this.logger.log( + `404: Invalid lastSyncHistoryId: ${lastSyncHistoryId} for workspace ${workspaceId} and account ${connectedAccountId}, falling back to full sync.`, + ); + + await this.messageChannelRepository.resetSyncExternalId( + gmailMessageChannel.id, + workspaceId, + transactionManager, + ); + + await this.messageChannelRepository.updateSyncStatus( + gmailMessageChannel.id, + MessageChannelSyncStatus.PENDING, + workspaceId, + transactionManager, + ); + + await this.fallbackToFullSync(workspaceId, connectedAccountId); + + return; + } + + if (error?.code === 429) { + this.logger.log( + `429: rate limit reached for workspace ${workspaceId} and account ${connectedAccountId}: ${error.message}, import will be retried later.`, + ); + + await this.messageChannelRepository.updateSyncStatus( + gmailMessageChannel.id, + MessageChannelSyncStatus.PENDING, + workspaceId, + transactionManager, + ); + + return; + } + + if (error) { + throw new Error( + `Error fetching messages for ${connectedAccountId} in workspace ${workspaceId}: ${error.message}`, + ); + } + + if (!historyId) { + throw new Error( + `No historyId found for ${connectedAccountId} in workspace ${workspaceId} in gmail history response.`, + ); + } + + if (historyId === lastSyncHistoryId || !history?.length) { + this.logger.log( + `Messaging import done with history ${historyId} and nothing to update for workspace ${workspaceId} and account ${connectedAccountId}`, + ); + + await this.messageChannelRepository.updateSyncStatus( + gmailMessageChannel.id, + MessageChannelSyncStatus.PENDING, + workspaceId, + ); + + return; + } + + const { messagesAdded, messagesDeleted } = + await this.getMessageIdsFromHistory(history); + + await this.cacheStorage.setAdd( + `messages-to-import:${workspaceId}:gmail:${gmailMessageChannel.id}`, + messagesAdded, + ); + + await this.cacheStorage.setAdd( + `messages-to-delete:${workspaceId}:gmail:${gmailMessageChannel.id}`, + messagesDeleted, + ); + + await this.messageChannelRepository.updateLastSyncExternalIdIfHigher( + gmailMessageChannel.id, + historyId, + workspaceId, + transactionManager, + ); + + await this.messageChannelRepository.updateSyncStatus( + gmailMessageChannel.id, + MessageChannelSyncStatus.PENDING, + workspaceId, + transactionManager, + ); + }) + .catch(async (error) => { + await this.messageChannelRepository.updateSyncStatus( + gmailMessageChannel.id, + MessageChannelSyncStatus.FAILED, + workspaceId, + ); + + throw new Error( + `Error fetching messages for ${connectedAccountId} in workspace ${workspaceId}: ${error.message}`, + ); + }); + } + + private async getMessageIdsFromHistory( + history: gmail_v1.Schema$History[], + ): Promise<{ + messagesAdded: string[]; + messagesDeleted: string[]; + }> { + const { messagesAdded, messagesDeleted } = history.reduce( + ( + acc: { + messagesAdded: string[]; + messagesDeleted: string[]; + }, + history, + ) => { + const messagesAdded = history.messagesAdded?.map( + (messageAdded) => messageAdded.message?.id || '', + ); + + const messagesDeleted = history.messagesDeleted?.map( + (messageDeleted) => messageDeleted.message?.id || '', + ); + + if (messagesAdded) acc.messagesAdded.push(...messagesAdded); + if (messagesDeleted) acc.messagesDeleted.push(...messagesDeleted); + + return acc; + }, + { messagesAdded: [], messagesDeleted: [] }, + ); + + const uniqueMessagesAdded = messagesAdded.filter( + (messageId) => !messagesDeleted.includes(messageId), + ); + + const uniqueMessagesDeleted = messagesDeleted.filter( + (messageId) => !messagesAdded.includes(messageId), + ); + + return { + messagesAdded: uniqueMessagesAdded, + messagesDeleted: uniqueMessagesDeleted, + }; + } + + private async getHistoryFromGmail( + gmailClient: gmail_v1.Gmail, + lastSyncHistoryId: string, + ): Promise<{ + history: gmail_v1.Schema$History[]; + historyId?: string | null; + error?: GmailError; + }> { + const fullHistory: gmail_v1.Schema$History[] = []; + let pageToken: string | undefined; + let hasMoreMessages = true; + let nextHistoryId: string | undefined; + + while (hasMoreMessages) { + try { + const response = await gmailClient.users.history.list({ + userId: 'me', + maxResults: GMAIL_USERS_HISTORY_MAX_RESULT, + pageToken, + startHistoryId: lastSyncHistoryId, + historyTypes: ['messageAdded', 'messageDeleted'], + }); + + nextHistoryId = response?.data?.historyId ?? undefined; + + if (response?.data?.history) { + fullHistory.push(...response.data.history); + } + + pageToken = response?.data?.nextPageToken ?? undefined; + hasMoreMessages = !!pageToken; + } catch (error) { + const errorData = error?.response?.data?.error; + + if (errorData) { + return { + history: [], + error: errorData, + historyId: lastSyncHistoryId, + }; + } + + throw error; + } + } + + return { history: fullHistory, historyId: nextHistoryId }; + } + + private async fallbackToFullSync( + workspaceId: string, + connectedAccountId: string, + ) { + await this.messageQueueService.add( + GmailFullSyncV2Job.name, + { workspaceId, connectedAccountId }, + ); + } +} diff --git a/packages/twenty-server/src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.service.ts b/packages/twenty-server/src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.service.ts index 3bfb4df34..dc5573efe 100644 --- a/packages/twenty-server/src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.service.ts +++ b/packages/twenty-server/src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.service.ts @@ -45,7 +45,7 @@ export class GmailPartialSyncService { private readonly messageService: MessageService, @InjectObjectMetadataRepository(BlocklistObjectMetadata) private readonly blocklistRepository: BlocklistRepository, - private readonly saveMessagesAndCreateContactsService: SaveMessageAndEmitContactCreationEventService, + private readonly saveMessagesAndEmitContactCreationEventService: SaveMessageAndEmitContactCreationEventService, @InjectRepository(FeatureFlagEntity, 'core') private readonly featureFlagRepository: Repository, ) {} @@ -174,7 +174,6 @@ export class GmailPartialSyncService { await this.fetchMessagesByBatchesService.fetchAllMessages( messageQueries, accessToken, - 'gmail partial-sync', workspaceId, connectedAccountId, ); @@ -208,12 +207,11 @@ export class GmailPartialSyncService { ); if (messagesToSave.length !== 0) { - await this.saveMessagesAndCreateContactsService.saveMessagesAndCreateContacts( + await this.saveMessagesAndEmitContactCreationEventService.saveMessagesAndEmitContactCreation( messagesToSave, connectedAccount, workspaceId, gmailMessageChannelId, - 'gmail partial-sync', ); } diff --git a/packages/twenty-server/src/modules/messaging/services/message/message.service.ts b/packages/twenty-server/src/modules/messaging/services/message/message.service.ts index 9a73b7b7c..f3cc41c9f 100644 --- a/packages/twenty-server/src/modules/messaging/services/message/message.service.ts +++ b/packages/twenty-server/src/modules/messaging/services/message/message.service.ts @@ -6,7 +6,6 @@ import { v4 } from 'uuid'; import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; import { MessageObjectMetadata } from 'src/modules/messaging/standard-objects/message.object-metadata'; import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record'; -import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity'; import { GmailMessage } from 'src/modules/messaging/types/gmail-message'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata'; @@ -38,9 +37,67 @@ export class MessageService { private readonly messageThreadService: MessageThreadService, ) {} + public async saveMessagesWithinTransaction( + messages: GmailMessage[], + connectedAccount: ObjectRecord, + gmailMessageChannelId: string, + workspaceId: string, + transactionManager: EntityManager, + ): Promise> { + const messageExternalIdsAndIdsMap = new Map(); + + for (const message of messages) { + const existingMessageChannelMessageAssociationsCount = + await this.messageChannelMessageAssociationRepository.countByMessageExternalIdsAndMessageChannelId( + [message.externalId], + gmailMessageChannelId, + workspaceId, + transactionManager, + ); + + if (existingMessageChannelMessageAssociationsCount > 0) { + continue; + } + + // TODO: This does not handle all thread merging use cases and might create orphan threads. + const savedOrExistingMessageThreadId = + await this.messageThreadService.saveMessageThreadOrReturnExistingMessageThread( + message.headerMessageId, + message.messageThreadExternalId, + workspaceId, + transactionManager, + ); + + const savedOrExistingMessageId = + await this.saveMessageOrReturnExistingMessage( + message, + savedOrExistingMessageThreadId, + connectedAccount, + workspaceId, + transactionManager, + ); + + messageExternalIdsAndIdsMap.set( + message.externalId, + savedOrExistingMessageId, + ); + + await this.messageChannelMessageAssociationRepository.insert( + gmailMessageChannelId, + savedOrExistingMessageId, + message.externalId, + savedOrExistingMessageThreadId, + message.messageThreadExternalId, + workspaceId, + transactionManager, + ); + } + + return messageExternalIdsAndIdsMap; + } + public async saveMessages( messages: GmailMessage[], - dataSourceMetadata: DataSourceEntity, workspaceDataSource: DataSource, connectedAccount: ObjectRecord, gmailMessageChannelId: string, @@ -101,7 +158,6 @@ export class MessageService { message, savedOrExistingMessageThreadId, connectedAccount, - dataSourceMetadata, workspaceId, manager, ); @@ -136,7 +192,6 @@ export class MessageService { message: GmailMessage, messageThreadId: string, connectedAccount: ObjectRecord, - dataSourceMetadata: DataSourceEntity, workspaceId: string, manager: EntityManager, ): Promise { diff --git a/packages/twenty-server/src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.service.ts b/packages/twenty-server/src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.service.ts index 6e998f7b4..0576fce71 100644 --- a/packages/twenty-server/src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.service.ts +++ b/packages/twenty-server/src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.service.ts @@ -1,6 +1,8 @@ import { Injectable, Logger } from '@nestjs/common'; import { EventEmitter2 } from '@nestjs/event-emitter'; +import { EntityManager } from 'typeorm'; + import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository'; import { MessageParticipantRepository } from 'src/modules/messaging/repositories/message-participant.repository'; import { @@ -31,14 +33,65 @@ export class SaveMessageAndEmitContactCreationEventService { private readonly workspaceDataSourceService: WorkspaceDataSourceService, ) {} - async saveMessagesAndCreateContacts( + public async saveMessagesAndEmitContactCreationEventWithinTransaction( + messagesToSave: GmailMessage[], + connectedAccount: ObjectRecord, + workspaceId: string, + gmailMessageChannel: ObjectRecord, + transactionManager: EntityManager, + ) { + const messageExternalIdsAndIdsMap = + await this.messageService.saveMessagesWithinTransaction( + messagesToSave, + connectedAccount, + gmailMessageChannel.id, + workspaceId, + transactionManager, + ); + + const participantsWithMessageId: (ParticipantWithMessageId & { + shouldCreateContact: boolean; + })[] = messagesToSave.flatMap((message) => { + const messageId = messageExternalIdsAndIdsMap.get(message.externalId); + + return messageId + ? message.participants.map((participant) => ({ + ...participant, + messageId, + shouldCreateContact: + gmailMessageChannel.isContactAutoCreationEnabled && + message.participants.find((p) => p.role === 'from')?.handle === + connectedAccount.handle, + })) + : []; + }); + + await this.messageParticipantRepository.saveMessageParticipants( + participantsWithMessageId, + workspaceId, + transactionManager, + ); + + if (gmailMessageChannel.isContactAutoCreationEnabled) { + const contactsToCreate = participantsWithMessageId.filter( + (participant) => participant.shouldCreateContact, + ); + + this.eventEmitter.emit(`createContacts`, { + workspaceId, + connectedAccountHandle: connectedAccount.handle, + contactsToCreate, + }); + } + } + + async saveMessagesAndEmitContactCreation( messagesToSave: GmailMessage[], connectedAccount: ObjectRecord, workspaceId: string, gmailMessageChannelId: string, - jobName?: string, ) { - const { dataSource: workspaceDataSource, dataSourceMetadata } = + const { dataSource: workspaceDataSource } = await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata( workspaceId, ); @@ -47,7 +100,6 @@ export class SaveMessageAndEmitContactCreationEventService { const messageExternalIdsAndIdsMap = await this.messageService.saveMessages( messagesToSave, - dataSourceMetadata, workspaceDataSource, connectedAccount, gmailMessageChannelId, @@ -57,7 +109,7 @@ export class SaveMessageAndEmitContactCreationEventService { let endTime = Date.now(); this.logger.log( - `${jobName} saving messages for workspace ${workspaceId} and account ${ + `Saving messages for workspace ${workspaceId} and account ${ connectedAccount.id } in ${endTime - startTime}ms`, ); @@ -100,13 +152,12 @@ export class SaveMessageAndEmitContactCreationEventService { gmailMessageChannel, workspaceId, connectedAccount, - jobName, ); endTime = Date.now(); this.logger.log( - `${jobName} saving message participants for workspace ${workspaceId} and account in ${ + `Saving message participants for workspace ${workspaceId} and account in ${ connectedAccount.id } ${endTime - startTime}ms`, ); @@ -119,7 +170,6 @@ export class SaveMessageAndEmitContactCreationEventService { gmailMessageChannel: ObjectRecord, workspaceId: string, connectedAccount: ObjectRecord, - jobName?: string, ) { try { await this.messageParticipantRepository.saveMessageParticipants( @@ -140,7 +190,7 @@ export class SaveMessageAndEmitContactCreationEventService { } } catch (error) { this.logger.error( - `${jobName} error saving message participants for workspace ${workspaceId} and account ${connectedAccount.id}`, + `Error saving message participants for workspace ${workspaceId} and account ${connectedAccount.id}`, error, ); diff --git a/packages/twenty-server/src/modules/messaging/standard-objects/message-channel.object-metadata.ts b/packages/twenty-server/src/modules/messaging/standard-objects/message-channel.object-metadata.ts index 2d0ec4072..41d098b3c 100644 --- a/packages/twenty-server/src/modules/messaging/standard-objects/message-channel.object-metadata.ts +++ b/packages/twenty-server/src/modules/messaging/standard-objects/message-channel.object-metadata.ts @@ -1,3 +1,4 @@ +import { FeatureFlagKeys } from 'src/engine/core-modules/feature-flag/feature-flag.entity'; import { FieldMetadataType } from 'src/engine/metadata-modules/field-metadata/field-metadata.entity'; import { RelationMetadataType, @@ -6,6 +7,7 @@ import { import { messageChannelStandardFieldIds } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids'; import { standardObjectIds } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids'; import { FieldMetadata } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/field-metadata.decorator'; +import { Gate } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/gate.decorator'; import { IsNullable } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/is-nullable.decorator'; import { IsSystem } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/is-system.decorator'; import { ObjectMetadata } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/object-metadata.decorator'; @@ -14,6 +16,13 @@ import { BaseObjectMetadata } from 'src/engine/workspace-manager/workspace-sync- import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata'; import { MessageChannelMessageAssociationObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel-message-association.object-metadata'; +export enum MessageChannelSyncStatus { + PENDING = 'PENDING', + ONGOING = 'ONGOING', + SUCCEEDED = 'SUCCEEDED', + FAILED = 'FAILED', +} + @ObjectMetadata({ standardId: standardObjectIds.messageChannel, namePlural: 'messageChannels', @@ -102,4 +111,81 @@ export class MessageChannelObjectMetadata extends BaseObjectMetadata { }) @IsNullable() messageChannelMessageAssociations: MessageChannelMessageAssociationObjectMetadata[]; + + @FieldMetadata({ + standardId: messageChannelStandardFieldIds.syncExternalId, + type: FieldMetadataType.TEXT, + label: 'Last sync external ID', + description: 'Last sync external ID', + icon: 'IconHistory', + }) + @Gate({ + featureFlag: FeatureFlagKeys.IsFullSyncV2Enabled, + }) + syncExternalId: string; + + @FieldMetadata({ + standardId: messageChannelStandardFieldIds.syncedAt, + type: FieldMetadataType.DATE_TIME, + label: 'Last sync date', + description: 'Last sync date', + icon: 'IconHistory', + }) + @Gate({ + featureFlag: FeatureFlagKeys.IsFullSyncV2Enabled, + }) + @IsNullable() + syncedAt: string; + + @FieldMetadata({ + standardId: messageChannelStandardFieldIds.syncStatus, + type: FieldMetadataType.SELECT, + label: 'Last sync status', + description: 'Last sync status', + icon: 'IconHistory', + options: [ + { + value: MessageChannelSyncStatus.PENDING, + label: 'Pending', + position: 0, + color: 'blue', + }, + { + value: MessageChannelSyncStatus.ONGOING, + label: 'Ongoing', + position: 1, + color: 'yellow', + }, + { + value: MessageChannelSyncStatus.SUCCEEDED, + label: 'Succeeded', + position: 2, + color: 'green', + }, + { + value: MessageChannelSyncStatus.FAILED, + label: 'Failed', + position: 3, + color: 'red', + }, + ], + }) + @Gate({ + featureFlag: FeatureFlagKeys.IsFullSyncV2Enabled, + }) + @IsNullable() + syncStatus: MessageChannelSyncStatus; + + @FieldMetadata({ + standardId: messageChannelStandardFieldIds.ongoingSyncStartedAt, + type: FieldMetadataType.DATE_TIME, + label: 'Ongoing sync started at', + description: 'Ongoing sync started at', + icon: 'IconHistory', + }) + @Gate({ + featureFlag: FeatureFlagKeys.IsFullSyncV2Enabled, + }) + @IsNullable() + ongoingSyncStartedAt: string; } diff --git a/packages/twenty-server/src/modules/messaging/types/gmail-error.ts b/packages/twenty-server/src/modules/messaging/types/gmail-error.ts new file mode 100644 index 000000000..ff715e217 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/types/gmail-error.ts @@ -0,0 +1,11 @@ +export type GmailError = { + code: number; + errors: { + domain: string; + reason: string; + message: string; + locationType?: string; + location?: string; + }[]; + message: string; +};