5507 modify the partial sync cron to work with the new statuses (#5512)

Closes #5507
This commit is contained in:
bosiraphael
2024-05-24 18:27:54 +02:00
committed by GitHub
parent 3de5ed3427
commit 87465b13ee
31 changed files with 1185 additions and 115 deletions

View File

@ -32,9 +32,9 @@ import {
MessageChannelVisibility,
} from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
import {
GmailFullSyncJobData,
GmailFullSyncJob,
} from 'src/modules/messaging/jobs/gmail-full-sync.job';
GmailFullMessageListFetchJobData,
GmailFullMessageListFetchJob,
} from 'src/modules/messaging/jobs/gmail-full-message-list-fetch.job';
@Injectable()
export class GoogleAPIsService {
@ -156,8 +156,8 @@ export class GoogleAPIsService {
isCalendarEnabled: boolean,
) {
if (this.environmentService.get('MESSAGING_PROVIDER_GMAIL_ENABLED')) {
await this.messageQueueService.add<GmailFullSyncJobData>(
GmailFullSyncJob.name,
await this.messageQueueService.add<GmailFullMessageListFetchJobData>(
GmailFullMessageListFetchJob.name,
{
workspaceId,
connectedAccountId,

View File

@ -17,9 +17,9 @@ import { DataSourceModule } from 'src/engine/metadata-modules/data-source/data-s
import { ObjectMetadataModule } from 'src/engine/metadata-modules/object-metadata/object-metadata.module';
import { CleanInactiveWorkspaceJob } from 'src/engine/workspace-manager/workspace-cleaner/crons/clean-inactive-workspace.job';
import { CalendarEventParticipantModule } from 'src/modules/calendar/services/calendar-event-participant/calendar-event-participant.module';
import { GmailFetchMessageContentFromCacheModule } from 'src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.module';
import { GmailFullSyncModule } from 'src/modules/messaging/services/gmail-full-sync/gmail-full-sync.module';
import { GmailPartialSyncModule } from 'src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.module';
import { GmailMessagesImportModule } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import.module';
import { GmailFullMessageListFetchModule } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.module';
import { GmailPartialMessageListFetchModule } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.module';
import { TimelineActivityModule } from 'src/modules/timeline/timeline-activity.module';
import { WorkspaceModule } from 'src/engine/core-modules/workspace/workspace.module';
import { CalendarMessagingParticipantJobModule } from 'src/modules/calendar-messaging-participant/jobs/calendar-messaging-participant-job.module';
@ -41,9 +41,9 @@ import { TimelineJobModule } from 'src/modules/timeline/jobs/timeline-job.module
BillingModule,
UserWorkspaceModule,
WorkspaceModule,
GmailFullSyncModule,
GmailFetchMessageContentFromCacheModule,
GmailPartialSyncModule,
GmailFullMessageListFetchModule,
GmailMessagesImportModule,
GmailPartialMessageListFetchModule,
CalendarEventParticipantModule,
TimelineActivityModule,
StripeModule,

View File

@ -254,4 +254,30 @@ export class ConnectedAccountRepository {
transactionManager,
);
}
public async getConnectedAccountOrThrow(
workspaceId: string,
connectedAccountId: string,
): Promise<ObjectRecord<ConnectedAccountWorkspaceEntity>> {
const connectedAccount = await this.getById(
connectedAccountId,
workspaceId,
);
if (!connectedAccount) {
throw new Error(
`Connected account ${connectedAccountId} not found in workspace ${workspaceId}`,
);
}
const refreshToken = connectedAccount.refreshToken;
if (!refreshToken) {
throw new Error(
`No refresh token found for connected account ${connectedAccountId} in workspace ${workspaceId}`,
);
}
return connectedAccount;
}
}

View File

@ -4,16 +4,16 @@ import { Command, CommandRunner } from 'nest-commander';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { GmailPartialSyncCronJob } from 'src/modules/messaging/crons/jobs/gmail-partial-sync.cron.job';
import { GmailMessageListFetchCronJob } from 'src/modules/messaging/crons/jobs/gmail-message-list-fetch.cron.job';
const GMAIL_PARTIAL_SYNC_CRON_PATTERN = '*/5 * * * *';
@Command({
name: 'cron:messaging:gmail-partial-sync',
name: 'cron:messaging:gmail-message-list-fetch',
description:
'Starts a cron job to sync existing connected account messages and store them in the cache',
})
export class GmailPartialSyncCronCommand extends CommandRunner {
export class GmailMessageListFetchCronCommand extends CommandRunner {
constructor(
@Inject(MessageQueue.cronQueue)
private readonly messageQueueService: MessageQueueService,
@ -23,7 +23,7 @@ export class GmailPartialSyncCronCommand extends CommandRunner {
async run(): Promise<void> {
await this.messageQueueService.addCron<undefined>(
GmailPartialSyncCronJob.name,
GmailMessageListFetchCronJob.name,
undefined,
{
repeat: { pattern: GMAIL_PARTIAL_SYNC_CRON_PATTERN },

View File

@ -4,13 +4,13 @@ import { Command, CommandRunner } from 'nest-commander';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { GmailFetchMessagesFromCacheCronJob } from 'src/modules/messaging/crons/jobs/gmail-fetch-messages-from-cache.cron.job';
import { GmailMessagesImportCronJob } from 'src/modules/messaging/crons/jobs/gmail-messages-import.cron.job';
@Command({
name: 'cron:messaging:gmail-fetch-messages-from-cache',
name: 'cron:messaging:gmail-messages-import',
description: 'Starts a cron job to fetch all messages from cache',
})
export class GmailFetchMessagesFromCacheCronCommand extends CommandRunner {
export class GmailMessagesImportCronCommand extends CommandRunner {
constructor(
@Inject(MessageQueue.cronQueue)
private readonly messageQueueService: MessageQueueService,
@ -20,7 +20,7 @@ export class GmailFetchMessagesFromCacheCronCommand extends CommandRunner {
async run(): Promise<void> {
await this.messageQueueService.addCron<undefined>(
GmailFetchMessagesFromCacheCronJob.name,
GmailMessagesImportCronJob.name,
undefined,
{
repeat: {

View File

@ -2,17 +2,14 @@ import { Module } from '@nestjs/common';
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { GmailFetchMessagesFromCacheCronCommand } from 'src/modules/messaging/crons/commands/gmail-fetch-messages-from-cache.cron.command';
import { GmailPartialSyncCronCommand } from 'src/modules/messaging/crons/commands/gmail-partial-sync.cron.command';
import { GmailMessagesImportCronCommand } from 'src/modules/messaging/crons/commands/gmail-messages-import.cron.command';
import { GmailMessageListFetchCronCommand } from 'src/modules/messaging/crons/commands/gmail-message-list-fetch.cron.command';
@Module({
imports: [
ObjectMetadataRepositoryModule.forFeature([
ConnectedAccountWorkspaceEntity,
]),
],
providers: [
GmailPartialSyncCronCommand,
GmailFetchMessagesFromCacheCronCommand,
],
providers: [GmailMessageListFetchCronCommand, GmailMessagesImportCronCommand],
})
export class MessagingCronCommandsModule {}

View File

@ -8,19 +8,29 @@ import { MessageQueueJob } from 'src/engine/integrations/message-queue/interface
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity';
import {
GmailPartialSyncJobData,
GmailPartialSyncJob,
} from 'src/modules/messaging/jobs/gmail-partial-sync.job';
GmailPartialMessageListFetchJobData,
GmailPartialMessageListFetchJob,
} from 'src/modules/messaging/jobs/gmail-partial-message-list-fetch.job';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
import { EnvironmentService } from 'src/engine/integrations/environment/environment.service';
import {
FeatureFlagEntity,
FeatureFlagKeys,
} from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import {
GmailMessageListFetchJobData,
GmailMessageListFetchJob,
} from 'src/modules/messaging/jobs/gmail-message-list-fetch.job';
@Injectable()
export class GmailPartialSyncCronJob implements MessageQueueJob<undefined> {
private readonly logger = new Logger(GmailPartialSyncCronJob.name);
export class GmailMessageListFetchCronJob
implements MessageQueueJob<undefined>
{
private readonly logger = new Logger(GmailMessageListFetchCronJob.name);
constructor(
@InjectRepository(Workspace, 'core')
@ -32,6 +42,8 @@ export class GmailPartialSyncCronJob implements MessageQueueJob<undefined> {
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
private readonly environmentService: EnvironmentService,
@InjectRepository(FeatureFlagEntity, 'core')
private readonly featureFlagRepository: Repository<FeatureFlagEntity>,
) {}
async handle(): Promise<void> {
@ -57,11 +69,20 @@ export class GmailPartialSyncCronJob implements MessageQueueJob<undefined> {
);
for (const workspaceId of workspaceIdsWithDataSources) {
await this.enqueuePartialSyncs(workspaceId);
await this.enqueueSyncs(workspaceId);
}
}
private async enqueuePartialSyncs(workspaceId: string): Promise<void> {
private async enqueueSyncs(workspaceId: string): Promise<void> {
const isGmailSyncV2EnabledFeatureFlag =
await this.featureFlagRepository.findOneBy({
workspaceId: workspaceId,
key: FeatureFlagKeys.IsGmailSyncV2Enabled,
value: true,
});
const isGmailSyncV2Enabled = isGmailSyncV2EnabledFeatureFlag?.value;
try {
const messageChannels =
await this.messageChannelRepository.getAll(workspaceId);
@ -71,16 +92,29 @@ export class GmailPartialSyncCronJob implements MessageQueueJob<undefined> {
continue;
}
await this.messageQueueService.add<GmailPartialSyncJobData>(
GmailPartialSyncJob.name,
{
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
},
{
retryLimit: 2,
},
);
if (isGmailSyncV2Enabled) {
await this.messageQueueService.add<GmailMessageListFetchJobData>(
GmailMessageListFetchJob.name,
{
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
},
{
retryLimit: 2,
},
);
} else {
await this.messageQueueService.add<GmailPartialMessageListFetchJobData>(
GmailPartialMessageListFetchJob.name,
{
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
},
{
retryLimit: 2,
},
);
}
}
} catch (error) {
this.logger.error(

View File

@ -1,4 +1,4 @@
import { Injectable } from '@nestjs/common';
import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository, In } from 'typeorm';
@ -10,13 +10,20 @@ import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-s
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
import { GmailFetchMessageContentFromCacheService } from 'src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.service';
import { GmailMessagesImportService } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import.service';
import { EnvironmentService } from 'src/engine/integrations/environment/environment.service';
import { GmailMessagesImportV2Service } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import-v2.service';
import {
FeatureFlagEntity,
FeatureFlagKeys,
} from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
@Injectable()
export class GmailFetchMessagesFromCacheCronJob
implements MessageQueueJob<undefined>
{
export class GmailMessagesImportCronJob implements MessageQueueJob<undefined> {
private readonly logger = new Logger(GmailMessagesImportCronJob.name);
constructor(
@InjectRepository(Workspace, 'core')
private readonly workspaceRepository: Repository<Workspace>,
@ -24,8 +31,13 @@ export class GmailFetchMessagesFromCacheCronJob
private readonly dataSourceRepository: Repository<DataSourceEntity>,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
private readonly gmailFetchMessageContentFromCacheService: GmailFetchMessageContentFromCacheService,
private readonly gmailFetchMessageContentFromCacheService: GmailMessagesImportService,
private readonly gmailFetchMessageContentFromCacheV2Service: GmailMessagesImportV2Service,
@InjectRepository(FeatureFlagEntity, 'core')
private readonly featureFlagRepository: Repository<FeatureFlagEntity>,
private readonly environmentService: EnvironmentService,
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
private readonly connectedAccountRepository: ConnectedAccountRepository,
) {}
async handle(): Promise<void> {
@ -59,11 +71,42 @@ export class GmailFetchMessagesFromCacheCronJob
const messageChannels =
await this.messageChannelRepository.getAll(workspaceId);
const isGmailSyncV2EnabledFeatureFlag =
await this.featureFlagRepository.findOneBy({
workspaceId: workspaceId,
key: FeatureFlagKeys.IsGmailSyncV2Enabled,
value: true,
});
const isGmailSyncV2Enabled = isGmailSyncV2EnabledFeatureFlag?.value;
for (const messageChannel of messageChannels) {
await this.gmailFetchMessageContentFromCacheService.fetchMessageContentFromCache(
workspaceId,
messageChannel.connectedAccountId,
);
if (!messageChannel?.isSyncEnabled) {
continue;
}
if (isGmailSyncV2Enabled) {
try {
const connectedAccount =
await this.connectedAccountRepository.getConnectedAccountOrThrow(
workspaceId,
messageChannel.connectedAccountId,
);
await this.gmailFetchMessageContentFromCacheV2Service.processMessageBatchImport(
messageChannel,
connectedAccount,
workspaceId,
);
} catch (error) {
this.logger.log(error.message);
}
} else {
await this.gmailFetchMessageContentFromCacheService.fetchMessageContentFromCache(
workspaceId,
messageChannel.connectedAccountId,
);
}
}
}
}

View File

@ -5,9 +5,9 @@ import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity';
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
import { GmailFetchMessagesFromCacheCronJob } from 'src/modules/messaging/crons/jobs/gmail-fetch-messages-from-cache.cron.job';
import { GmailPartialSyncCronJob } from 'src/modules/messaging/crons/jobs/gmail-partial-sync.cron.job';
import { GmailFetchMessageContentFromCacheModule } from 'src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.module';
import { GmailMessagesImportCronJob } from 'src/modules/messaging/crons/jobs/gmail-messages-import.cron.job';
import { GmailMessageListFetchCronJob } from 'src/modules/messaging/crons/jobs/gmail-message-list-fetch.cron.job';
import { GmailMessagesImportModule } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import.module';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
@Module({
@ -15,16 +15,16 @@ import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-ob
TypeOrmModule.forFeature([Workspace, FeatureFlagEntity], 'core'),
TypeOrmModule.forFeature([DataSourceEntity], 'metadata'),
ObjectMetadataRepositoryModule.forFeature([MessageChannelWorkspaceEntity]),
GmailFetchMessageContentFromCacheModule,
GmailMessagesImportModule,
],
providers: [
{
provide: GmailFetchMessagesFromCacheCronJob.name,
useClass: GmailFetchMessagesFromCacheCronJob,
provide: GmailMessagesImportCronJob.name,
useClass: GmailMessagesImportCronJob,
},
{
provide: GmailPartialSyncCronJob.name,
useClass: GmailPartialSyncCronJob,
provide: GmailMessageListFetchCronJob.name,
useClass: GmailMessageListFetchCronJob,
},
],
})

View File

@ -5,7 +5,7 @@ import { MessageQueueJob } from 'src/engine/integrations/message-queue/interface
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { GmailFullSyncService } from 'src/modules/messaging/services/gmail-full-sync/gmail-full-sync.service';
import { GmailFullMessageListFetchService } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.service';
export type BlocklistReimportMessagesJobData = {
workspaceId: string;
@ -22,7 +22,7 @@ export class BlocklistReimportMessagesJob
constructor(
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
private readonly connectedAccountRepository: ConnectedAccountRepository,
private readonly gmailFullSyncService: GmailFullSyncService,
private readonly gmailFullSyncService: GmailFullMessageListFetchService,
) {}
async handle(data: BlocklistReimportMessagesJobData): Promise<void> {

View File

@ -3,23 +3,25 @@ import { Injectable, Logger } from '@nestjs/common';
import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';
import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service';
import { GmailFullSyncService } from 'src/modules/messaging/services/gmail-full-sync/gmail-full-sync.service';
import { GmailFullMessageListFetchService } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.service';
export type GmailFullSyncJobData = {
export type GmailFullMessageListFetchJobData = {
workspaceId: string;
connectedAccountId: string;
};
@Injectable()
export class GmailFullSyncJob implements MessageQueueJob<GmailFullSyncJobData> {
private readonly logger = new Logger(GmailFullSyncJob.name);
export class GmailFullMessageListFetchJob
implements MessageQueueJob<GmailFullMessageListFetchJobData>
{
private readonly logger = new Logger(GmailFullMessageListFetchJob.name);
constructor(
private readonly googleAPIsRefreshAccessTokenService: GoogleAPIRefreshAccessTokenService,
private readonly gmailFullSyncService: GmailFullSyncService,
private readonly gmailFullSyncService: GmailFullMessageListFetchService,
) {}
async handle(data: GmailFullSyncJobData): Promise<void> {
async handle(data: GmailFullMessageListFetchJobData): Promise<void> {
this.logger.log(
`gmail full-sync for workspace ${data.workspaceId} and account ${data.connectedAccountId}`,
);

View File

@ -0,0 +1,97 @@
import { Injectable, Logger } from '@nestjs/common';
import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';
import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service';
import { GmailPartialMessageListFetchV2Service } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-v2.service';
import { GetConnectedAccountAndMessageChannelService } from 'src/modules/messaging/services/get-connected-account-and-message-channel/get-connected-account-and-message-channel.service';
import { MessageChannelSyncSubStatus } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
import { GmailFullMessageListFetchService } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.service';
export type GmailMessageListFetchJobData = {
workspaceId: string;
connectedAccountId: string;
};
@Injectable()
export class GmailMessageListFetchJob
implements MessageQueueJob<GmailMessageListFetchJobData>
{
private readonly logger = new Logger(GmailMessageListFetchJob.name);
constructor(
private readonly googleAPIsRefreshAccessTokenService: GoogleAPIRefreshAccessTokenService,
private readonly gmailFullSyncService: GmailFullMessageListFetchService,
private readonly gmailPartialMessageListFetchV2Service: GmailPartialMessageListFetchV2Service,
private readonly getConnectedAccountAndMessageChannelService: GetConnectedAccountAndMessageChannelService,
) {}
async handle(data: GmailMessageListFetchJobData): Promise<void> {
const { workspaceId, connectedAccountId } = data;
this.logger.log(
`Fetch gmail message list for workspace ${workspaceId} and account ${connectedAccountId}`,
);
try {
await this.googleAPIsRefreshAccessTokenService.refreshAndSaveAccessToken(
workspaceId,
connectedAccountId,
);
} catch (e) {
this.logger.error(
`Error refreshing access token for connected account ${connectedAccountId} in workspace ${workspaceId}`,
e,
);
return;
}
const { messageChannel, connectedAccount } =
await this.getConnectedAccountAndMessageChannelService.getConnectedAccountAndMessageChannelOrThrow(
workspaceId,
connectedAccountId,
);
switch (messageChannel.syncSubStatus) {
case MessageChannelSyncSubStatus.PARTIAL_MESSAGES_LIST_FETCH_PENDING:
try {
await this.gmailPartialMessageListFetchV2Service.processMessageListFetch(
messageChannel,
connectedAccount,
workspaceId,
);
} catch (e) {
this.logger.error(e);
}
return;
case MessageChannelSyncSubStatus.FULL_MESSAGES_LIST_FETCH_PENDING:
try {
await this.gmailFullSyncService.fetchConnectedAccountThreads(
workspaceId,
connectedAccountId,
);
} catch (e) {
this.logger.error(e);
}
return;
case MessageChannelSyncSubStatus.FAILED:
this.logger.error(
`Messaging import for workspace ${workspaceId} and account ${connectedAccountId} is in a failed state.`,
);
return;
default:
this.logger.error(
`Messaging import for workspace ${workspaceId} and account ${connectedAccountId} is locked, import will be retried later.`,
);
return;
}
}
}

View File

@ -3,25 +3,25 @@ import { Injectable, Logger } from '@nestjs/common';
import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';
import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service';
import { GmailPartialSyncV2Service } from 'src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.service';
import { GmailPartialMessageListFetchService } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.service';
export type GmailPartialSyncJobData = {
export type GmailPartialMessageListFetchJobData = {
workspaceId: string;
connectedAccountId: string;
};
@Injectable()
export class GmailPartialSyncJob
implements MessageQueueJob<GmailPartialSyncJobData>
export class GmailPartialMessageListFetchJob
implements MessageQueueJob<GmailPartialMessageListFetchJobData>
{
private readonly logger = new Logger(GmailPartialSyncJob.name);
private readonly logger = new Logger(GmailPartialMessageListFetchJob.name);
constructor(
private readonly googleAPIsRefreshAccessTokenService: GoogleAPIRefreshAccessTokenService,
private readonly gmailPartialSyncService: GmailPartialSyncV2Service,
private readonly gmailPartialSyncService: GmailPartialMessageListFetchService,
) {}
async handle(data: GmailPartialSyncJobData): Promise<void> {
async handle(data: GmailPartialMessageListFetchJobData): Promise<void> {
this.logger.log(
`gmail partial-sync for workspace ${data.workspaceId} and account ${data.connectedAccountId}`,
);

View File

@ -1,5 +1,7 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
import { AutoCompaniesAndContactsCreationModule } from 'src/modules/connected-account/auto-companies-and-contacts-creation/auto-companies-and-contacts-creation.module';
import { GoogleAPIRefreshAccessTokenModule } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.module';
@ -8,11 +10,13 @@ import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/s
import { BlocklistItemDeleteMessagesJob } from 'src/modules/messaging/jobs/blocklist-item-delete-messages.job';
import { BlocklistReimportMessagesJob } from 'src/modules/messaging/jobs/blocklist-reimport-messages.job';
import { DeleteConnectedAccountAssociatedMessagingDataJob } from 'src/modules/messaging/jobs/delete-connected-account-associated-messaging-data.job';
import { GmailFullSyncJob } from 'src/modules/messaging/jobs/gmail-full-sync.job';
import { GmailPartialSyncJob } from 'src/modules/messaging/jobs/gmail-partial-sync.job';
import { GmailFullMessageListFetchJob } from 'src/modules/messaging/jobs/gmail-full-message-list-fetch.job';
import { GmailMessageListFetchJob } from 'src/modules/messaging/jobs/gmail-message-list-fetch.job';
import { GmailPartialMessageListFetchJob } from 'src/modules/messaging/jobs/gmail-partial-message-list-fetch.job';
import { MessagingCreateCompanyAndContactAfterSyncJob } from 'src/modules/messaging/jobs/messaging-create-company-and-contact-after-sync.job';
import { GmailFullSyncModule } from 'src/modules/messaging/services/gmail-full-sync/gmail-full-sync.module';
import { GmailPartialSyncModule } from 'src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.module';
import { GetConnectedAccountAndMessageChannelModule } from 'src/modules/messaging/services/get-connected-account-and-message-channel/get-connected-account-and-message-channel.module';
import { GmailFullMessageListFetchModule } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.module';
import { GmailPartialMessageListFetchModule } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.module';
import { ThreadCleanerModule } from 'src/modules/messaging/services/thread-cleaner/thread-cleaner.module';
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel-message-association.workspace-entity';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
@ -27,11 +31,13 @@ import { MessageParticipantWorkspaceEntity } from 'src/modules/messaging/standar
MessageChannelMessageAssociationWorkspaceEntity,
BlocklistWorkspaceEntity,
]),
GmailFullSyncModule,
GmailPartialSyncModule,
GmailFullMessageListFetchModule,
GmailPartialMessageListFetchModule,
ThreadCleanerModule,
GoogleAPIRefreshAccessTokenModule,
AutoCompaniesAndContactsCreationModule,
TypeOrmModule.forFeature([FeatureFlagEntity], 'core'),
GetConnectedAccountAndMessageChannelModule,
],
providers: [
{
@ -43,12 +49,16 @@ import { MessageParticipantWorkspaceEntity } from 'src/modules/messaging/standar
useClass: BlocklistItemDeleteMessagesJob,
},
{
provide: GmailFullSyncJob.name,
useClass: GmailFullSyncJob,
provide: GmailFullMessageListFetchJob.name,
useClass: GmailFullMessageListFetchJob,
},
{
provide: GmailPartialSyncJob.name,
useClass: GmailPartialSyncJob,
provide: GmailPartialMessageListFetchJob.name,
useClass: GmailPartialMessageListFetchJob,
},
{
provide: GmailMessageListFetchJob.name,
useClass: GmailMessageListFetchJob,
},
{
provide: DeleteConnectedAccountAssociatedMessagingDataJob.name,

View File

@ -6,6 +6,7 @@ import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/work
import {
MessageChannelWorkspaceEntity,
MessageChannelSyncStatus,
MessageChannelSyncSubStatus,
} from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
@ -187,6 +188,23 @@ export class MessageChannelRepository {
);
}
public async updateSyncSubStatus(
id: string,
syncSubStatus: MessageChannelSyncSubStatus,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageChannel" SET "syncSubStatus" = $1 WHERE "id" = $2`,
[syncSubStatus, id],
workspaceId,
transactionManager,
);
}
public async updateLastSyncCursorIfHigher(
id: string,
syncCursor: string,

View File

@ -0,0 +1,18 @@
import { Module } from '@nestjs/common';
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { GetConnectedAccountAndMessageChannelService } from 'src/modules/messaging/services/get-connected-account-and-message-channel/get-connected-account-and-message-channel.service';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
@Module({
imports: [
ObjectMetadataRepositoryModule.forFeature([
ConnectedAccountWorkspaceEntity,
MessageChannelWorkspaceEntity,
]),
],
providers: [GetConnectedAccountAndMessageChannelService],
exports: [GetConnectedAccountAndMessageChannelService],
})
export class GetConnectedAccountAndMessageChannelModule {}

View File

@ -0,0 +1,62 @@
import { Injectable } from '@nestjs/common';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
@Injectable()
export class GetConnectedAccountAndMessageChannelService {
constructor(
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
private readonly connectedAccountRepository: ConnectedAccountRepository,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
) {}
public async getConnectedAccountAndMessageChannelOrThrow(
workspaceId: string,
connectedAccountId: string,
): Promise<{
messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>;
connectedAccount: ObjectRecord<ConnectedAccountWorkspaceEntity>;
}> {
const connectedAccount = await this.connectedAccountRepository.getById(
connectedAccountId,
workspaceId,
);
if (!connectedAccount) {
throw new Error(
`Connected account ${connectedAccountId} not found in workspace ${workspaceId}`,
);
}
const refreshToken = connectedAccount.refreshToken;
if (!refreshToken) {
throw new Error(
`No refresh token found for connected account ${connectedAccountId} in workspace ${workspaceId}`,
);
}
const messageChannel =
await this.messageChannelRepository.getFirstByConnectedAccountId(
connectedAccountId,
workspaceId,
);
if (!messageChannel) {
throw new Error(
`No message channel found for connected account ${connectedAccountId} in workspace ${workspaceId}`,
);
}
return {
messageChannel,
connectedAccount,
};
}
}

View File

@ -7,7 +7,7 @@ import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/works
import { BlocklistWorkspaceEntity } from 'src/modules/connected-account/standard-objects/blocklist.workspace-entity';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { FetchMessagesByBatchesModule } from 'src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.module';
import { GmailFullSyncService } from 'src/modules/messaging/services/gmail-full-sync/gmail-full-sync.service';
import { GmailFullMessageListFetchService } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.service';
import { MessagingProvidersModule } from 'src/modules/messaging/services/providers/messaging-providers.module';
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel-message-association.workspace-entity';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
@ -25,7 +25,7 @@ import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-ob
TypeOrmModule.forFeature([FeatureFlagEntity], 'core'),
WorkspaceDataSourceModule,
],
providers: [GmailFullSyncService],
exports: [GmailFullSyncService],
providers: [GmailFullMessageListFetchService],
exports: [GmailFullMessageListFetchService],
})
export class GmailFullSyncModule {}
export class GmailFullMessageListFetchModule {}

View File

@ -29,8 +29,8 @@ import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/work
import { gmailSearchFilterEmailAdresses } from 'src/modules/messaging/utils/gmail-search-filter.util';
@Injectable()
export class GmailFullSyncService {
private readonly logger = new Logger(GmailFullSyncService.name);
export class GmailFullMessageListFetchService {
private readonly logger = new Logger(GmailFullMessageListFetchService.name);
constructor(
private readonly gmailClientProvider: GmailClientProvider,

View File

@ -0,0 +1,146 @@
import { Injectable, Logger } from '@nestjs/common';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { FetchMessagesByBatchesService } from 'src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.service';
import {
MessageChannelWorkspaceEntity,
MessageChannelSyncSubStatus,
} from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
import { createQueriesFromMessageIds } from 'src/modules/messaging/utils/create-queries-from-message-ids.util';
import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator';
import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum';
import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service';
import { GMAIL_USERS_MESSAGES_GET_BATCH_SIZE } from 'src/modules/messaging/constants/gmail-users-messages-get-batch-size.constant';
import { GMAIL_ONGOING_SYNC_TIMEOUT } from 'src/modules/messaging/constants/gmail-ongoing-sync-timeout.constant';
import { GmailMessagesImportService } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import.service';
import { SetMessageChannelSyncStatusService } from 'src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.service';
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
import { SaveMessagesAndEnqueueContactCreationService } from 'src/modules/messaging/services/gmail-messages-import/save-messages-and-enqueue-contact-creation.service';
@Injectable()
export class GmailMessagesImportV2Service {
private readonly logger = new Logger(GmailMessagesImportService.name);
constructor(
private readonly fetchMessagesByBatchesService: FetchMessagesByBatchesService,
@InjectCacheStorage(CacheStorageNamespace.Messaging)
private readonly cacheStorage: CacheStorageService,
private readonly setMessageChannelSyncStatusService: SetMessageChannelSyncStatusService,
private readonly saveMessagesAndEnqueueContactCreationService: SaveMessagesAndEnqueueContactCreationService,
) {}
async processMessageBatchImport(
messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
connectedAccount: ObjectRecord<ConnectedAccountWorkspaceEntity>,
workspaceId: string,
) {
if (messageChannel.syncSubStatus === MessageChannelSyncSubStatus.FAILED) {
throw new Error(
`Connected account ${connectedAccount.id} in workspace ${workspaceId} is in a failed state. Skipping...`,
);
}
if (
messageChannel.syncSubStatus !==
MessageChannelSyncSubStatus.MESSAGES_IMPORT_PENDING
) {
throw new Error(
`Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} is not pending.`,
);
}
await this.setMessageChannelSyncStatusService.setMessagesImportOnGoingStatus(
messageChannel.id,
workspaceId,
);
this.logger.log(
`Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} starting...`,
);
const messageIdsToFetch =
(await this.cacheStorage.setPop(
`messages-to-import:${workspaceId}:gmail:${messageChannel.id}`,
GMAIL_USERS_MESSAGES_GET_BATCH_SIZE,
)) ?? [];
if (!messageIdsToFetch?.length) {
await this.setMessageChannelSyncStatusService.setCompletedStatus(
messageChannel.id,
workspaceId,
);
this.logger.log(
`Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} done with nothing to import or delete.`,
);
return;
}
const messageQueries = createQueriesFromMessageIds(messageIdsToFetch);
try {
const messagesToSave =
await this.fetchMessagesByBatchesService.fetchAllMessages(
messageQueries,
connectedAccount.accessToken,
workspaceId,
connectedAccount.id,
);
if (!messagesToSave.length) {
await this.setMessageChannelSyncStatusService.setCompletedStatus(
messageChannel.id,
workspaceId,
);
return [];
}
await this.saveMessagesAndEnqueueContactCreationService.saveMessagesAndEnqueueContactCreationJob(
messagesToSave,
messageChannel,
connectedAccount,
workspaceId,
);
if (messageIdsToFetch.length < GMAIL_USERS_MESSAGES_GET_BATCH_SIZE) {
await this.setMessageChannelSyncStatusService.setCompletedStatus(
messageChannel.id,
workspaceId,
);
this.logger.log(
`Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} done with no more messages to import.`,
);
} else {
await this.setMessageChannelSyncStatusService.setMessagesImportPendingStatus(
messageChannel.id,
workspaceId,
);
this.logger.log(
`Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} done with more messages to import.`,
);
}
} catch (error) {
await this.cacheStorage.setAdd(
`messages-to-import:${workspaceId}:gmail:${messageChannel.id}`,
messageIdsToFetch,
);
await this.setMessageChannelSyncStatusService.setFailedUnkownStatus(
messageChannel.id,
workspaceId,
);
this.logger.error(
`Error fetching messages for ${connectedAccount.id} in workspace ${workspaceId}: locking for ${GMAIL_ONGOING_SYNC_TIMEOUT}ms...`,
);
throw new Error(
`Error fetching messages for ${connectedAccount.id} in workspace ${workspaceId}: ${error.message}`,
);
}
}
}

View File

@ -4,9 +4,12 @@ import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repos
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { FetchMessagesByBatchesModule } from 'src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.module';
import { GmailFetchMessageContentFromCacheService } from 'src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.service';
import { GmailMessagesImportV2Service } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import-v2.service';
import { GmailMessagesImportService } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import.service';
import { SaveMessagesAndEnqueueContactCreationService } from 'src/modules/messaging/services/gmail-messages-import/save-messages-and-enqueue-contact-creation.service';
import { MessageParticipantModule } from 'src/modules/messaging/services/message-participant/message-participant.module';
import { MessageModule } from 'src/modules/messaging/services/message/message.module';
import { SetMessageChannelSyncStatusModule } from 'src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.module';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
@Module({
@ -20,8 +23,13 @@ import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-ob
WorkspaceDataSourceModule,
MessageModule,
MessageParticipantModule,
SetMessageChannelSyncStatusModule,
],
providers: [GmailFetchMessageContentFromCacheService],
exports: [GmailFetchMessageContentFromCacheService],
providers: [
GmailMessagesImportService,
GmailMessagesImportV2Service,
SaveMessagesAndEnqueueContactCreationService,
],
exports: [GmailMessagesImportService, GmailMessagesImportV2Service],
})
export class GmailFetchMessageContentFromCacheModule {}
export class GmailMessagesImportModule {}

View File

@ -18,9 +18,9 @@ import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache
import { GMAIL_USERS_MESSAGES_GET_BATCH_SIZE } from 'src/modules/messaging/constants/gmail-users-messages-get-batch-size.constant';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import {
GmailFullSyncJobData,
GmailFullSyncJob,
} from 'src/modules/messaging/jobs/gmail-full-sync.job';
GmailFullMessageListFetchJobData,
GmailFullMessageListFetchJob,
} from 'src/modules/messaging/jobs/gmail-full-message-list-fetch.job';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { GMAIL_ONGOING_SYNC_TIMEOUT } from 'src/modules/messaging/constants/gmail-ongoing-sync-timeout.constant';
@ -33,10 +33,8 @@ import {
} from 'src/modules/connected-account/auto-companies-and-contacts-creation/jobs/create-company-and-contact.job';
@Injectable()
export class GmailFetchMessageContentFromCacheService {
private readonly logger = new Logger(
GmailFetchMessageContentFromCacheService.name,
);
export class GmailMessagesImportService {
private readonly logger = new Logger(GmailMessagesImportService.name);
constructor(
private readonly fetchMessagesByBatchesService: FetchMessagesByBatchesService,
@ -296,8 +294,8 @@ export class GmailFetchMessageContentFromCacheService {
workspaceId: string,
connectedAccountId: string,
) {
await this.messageQueueService.add<GmailFullSyncJobData>(
GmailFullSyncJob.name,
await this.messageQueueService.add<GmailFullMessageListFetchJobData>(
GmailFullMessageListFetchJob.name,
{ workspaceId, connectedAccountId },
);
}

View File

@ -0,0 +1,96 @@
import { Injectable, Inject } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageParticipantService } from 'src/modules/messaging/services/message-participant/message-participant.service';
import { MessageService } from 'src/modules/messaging/services/message/message.service';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
import {
GmailMessage,
ParticipantWithMessageId,
} from 'src/modules/messaging/types/gmail-message';
import {
CreateCompanyAndContactJobData,
CreateCompanyAndContactJob,
} from 'src/modules/connected-account/auto-companies-and-contacts-creation/jobs/create-company-and-contact.job';
@Injectable()
export class SaveMessagesAndEnqueueContactCreationService {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
@Inject(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
private readonly messageService: MessageService,
private readonly messageParticipantService: MessageParticipantService,
) {}
async saveMessagesAndEnqueueContactCreationJob(
messagesToSave: GmailMessage[],
messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
connectedAccount: ObjectRecord<ConnectedAccountWorkspaceEntity>,
workspaceId: string,
) {
const workspaceDataSource =
await this.workspaceDataSourceService.connectToWorkspaceDataSource(
workspaceId,
);
const participantsWithMessageId = await workspaceDataSource?.transaction(
async (transactionManager: EntityManager) => {
const messageExternalIdsAndIdsMap =
await this.messageService.saveMessagesWithinTransaction(
messagesToSave,
connectedAccount,
messageChannel.id,
workspaceId,
transactionManager,
);
const participantsWithMessageId: (ParticipantWithMessageId & {
shouldCreateContact: boolean;
})[] = messagesToSave.flatMap((message) => {
const messageId = messageExternalIdsAndIdsMap.get(message.externalId);
return messageId
? message.participants.map((participant) => ({
...participant,
messageId,
shouldCreateContact:
messageChannel.isContactAutoCreationEnabled &&
message.participants.find((p) => p.role === 'from')
?.handle === connectedAccount.handle,
}))
: [];
});
await this.messageParticipantService.saveMessageParticipants(
participantsWithMessageId,
workspaceId,
transactionManager,
);
return participantsWithMessageId;
},
);
if (messageChannel.isContactAutoCreationEnabled) {
const contactsToCreate = participantsWithMessageId.filter(
(participant) => participant.shouldCreateContact,
);
await this.messageQueueService.add<CreateCompanyAndContactJobData>(
CreateCompanyAndContactJob.name,
{
workspaceId,
connectedAccountHandle: connectedAccount.handle,
contactsToCreate,
},
);
}
}
}

View File

@ -0,0 +1,104 @@
import { Injectable } from '@nestjs/common';
import { gmail_v1 } from 'googleapis';
import { GMAIL_USERS_HISTORY_MAX_RESULT } from 'src/modules/messaging/constants/gmail-users-history-max-result.constant';
import { GmailError } from 'src/modules/messaging/types/gmail-error';
@Injectable()
export class GmailGetHistoryService {
constructor() {}
public async getHistory(
gmailClient: gmail_v1.Gmail,
lastSyncHistoryId: string,
): Promise<{
history: gmail_v1.Schema$History[];
historyId?: string | null;
error?: GmailError;
}> {
const fullHistory: gmail_v1.Schema$History[] = [];
let pageToken: string | undefined;
let hasMoreMessages = true;
let nextHistoryId: string | undefined;
while (hasMoreMessages) {
try {
const response = await gmailClient.users.history.list({
userId: 'me',
maxResults: GMAIL_USERS_HISTORY_MAX_RESULT,
pageToken,
startHistoryId: lastSyncHistoryId,
historyTypes: ['messageAdded', 'messageDeleted'],
});
nextHistoryId = response?.data?.historyId ?? undefined;
if (response?.data?.history) {
fullHistory.push(...response.data.history);
}
pageToken = response?.data?.nextPageToken ?? undefined;
hasMoreMessages = !!pageToken;
} catch (error) {
const errorData = error?.response?.data?.error;
if (errorData) {
return {
history: [],
error: errorData,
historyId: lastSyncHistoryId,
};
}
throw error;
}
}
return { history: fullHistory, historyId: nextHistoryId };
}
public async getMessageIdsFromHistory(
history: gmail_v1.Schema$History[],
): Promise<{
messagesAdded: string[];
messagesDeleted: string[];
}> {
const { messagesAdded, messagesDeleted } = history.reduce(
(
acc: {
messagesAdded: string[];
messagesDeleted: string[];
},
history,
) => {
const messagesAdded = history.messagesAdded?.map(
(messageAdded) => messageAdded.message?.id || '',
);
const messagesDeleted = history.messagesDeleted?.map(
(messageDeleted) => messageDeleted.message?.id || '',
);
if (messagesAdded) acc.messagesAdded.push(...messagesAdded);
if (messagesDeleted) acc.messagesDeleted.push(...messagesDeleted);
return acc;
},
{ messagesAdded: [], messagesDeleted: [] },
);
const uniqueMessagesAdded = messagesAdded.filter(
(messageId) => !messagesDeleted.includes(messageId),
);
const uniqueMessagesDeleted = messagesDeleted.filter(
(messageId) => !messagesAdded.includes(messageId),
);
return {
messagesAdded: uniqueMessagesAdded,
messagesDeleted: uniqueMessagesDeleted,
};
}
}

View File

@ -0,0 +1,122 @@
import { Injectable, Logger } from '@nestjs/common';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
import {
MessageChannelSyncStatus,
MessageChannelSyncSubStatus,
MessageChannelWorkspaceEntity,
} from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
import { GmailError } from 'src/modules/messaging/types/gmail-error';
@Injectable()
export class GmailPartialMessageListFetchErrorHandlingService {
private readonly logger = new Logger(
GmailPartialMessageListFetchErrorHandlingService.name,
);
constructor(
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
private readonly connectedAccountRepository: ConnectedAccountRepository,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
) {}
public async handleGmailError(
error: GmailError | undefined,
messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
connectedAccountId: string,
workspaceId: string,
): Promise<void> {
switch (error?.code) {
case 404:
this.logger.log(
`404: Invalid lastSyncHistoryId for workspace ${workspaceId} and account ${connectedAccountId}, falling back to full sync.`,
);
await this.messageChannelRepository.resetSyncCursor(
messageChannel.id,
workspaceId,
);
await this.messageChannelRepository.updateSyncSubStatus(
messageChannel.id,
MessageChannelSyncSubStatus.FULL_MESSAGES_LIST_FETCH_PENDING,
workspaceId,
);
break;
case 429:
this.logger.log(
`429: rate limit reached for workspace ${workspaceId} and account ${connectedAccountId}: ${error.message}, import will be retried later.`,
);
await this.handleRateLimitExceeded(messageChannel, workspaceId);
break;
case 403:
if (
error?.errors?.[0]?.reason === 'rateLimitExceeded' ||
error?.errors?.[0]?.reason === 'userRateLimitExceeded'
) {
this.logger.log(
`403:${
error?.errors?.[0]?.reason === 'userRateLimitExceeded' && ' user'
} rate limit exceeded for workspace ${workspaceId} and account ${connectedAccountId}: ${
error.message
}, import will be retried later.`,
);
this.handleRateLimitExceeded(messageChannel, workspaceId);
} else {
await this.handleInsufficientPermissions(
error,
messageChannel,
workspaceId,
);
}
break;
case 401:
this.handleInsufficientPermissions(error, messageChannel, workspaceId);
break;
default:
break;
}
}
public async handleRateLimitExceeded(
messageChannel: MessageChannelWorkspaceEntity,
workspaceId: string,
): Promise<void> {
await this.messageChannelRepository.updateSyncSubStatus(
messageChannel.id,
MessageChannelSyncSubStatus.PARTIAL_MESSAGES_LIST_FETCH_PENDING,
workspaceId,
);
}
public async handleInsufficientPermissions(
error: GmailError,
messageChannel: MessageChannelWorkspaceEntity,
workspaceId: string,
): Promise<void> {
this.logger.error(
`{error?.code}: ${error.message} for workspace ${workspaceId} and account ${messageChannel.connectedAccount.id}`,
);
await this.messageChannelRepository.updateSyncStatus(
messageChannel.id,
MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS,
workspaceId,
);
await this.messageChannelRepository.updateSyncSubStatus(
messageChannel.id,
MessageChannelSyncSubStatus.PARTIAL_MESSAGES_LIST_FETCH_PENDING,
workspaceId,
);
await this.connectedAccountRepository.updateAuthFailedAt(
messageChannel.connectedAccount.id,
workspaceId,
);
}
}

View File

@ -0,0 +1,152 @@
import { Injectable, Logger } from '@nestjs/common';
import { gmail_v1 } from 'googleapis';
import { GmailClientProvider } from 'src/modules/messaging/services/providers/gmail/gmail-client.provider';
import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service';
import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator';
import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum';
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel-message-association.workspace-entity';
import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/repositories/message-channel-message-association.repository';
import { GmailPartialMessageListFetchErrorHandlingService } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-error-handling.service';
import { GmailGetHistoryService } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-get-history.service';
import { SetMessageChannelSyncStatusService } from 'src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.service';
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
@Injectable()
export class GmailPartialMessageListFetchV2Service {
private readonly logger = new Logger(
GmailPartialMessageListFetchV2Service.name,
);
constructor(
private readonly gmailClientProvider: GmailClientProvider,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
@InjectCacheStorage(CacheStorageNamespace.Messaging)
private readonly cacheStorage: CacheStorageService,
@InjectObjectMetadataRepository(
MessageChannelMessageAssociationWorkspaceEntity,
)
private readonly messageChannelMessageAssociationRepository: MessageChannelMessageAssociationRepository,
private readonly gmailPartialMessageListFetchErrorHandlingService: GmailPartialMessageListFetchErrorHandlingService,
private readonly gmailGetHistoryService: GmailGetHistoryService,
private readonly setMessageChannelSyncStatusService: SetMessageChannelSyncStatusService,
) {}
public async processMessageListFetch(
messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
connectedAccount: ObjectRecord<ConnectedAccountWorkspaceEntity>,
workspaceId: string,
): Promise<void> {
this.logger.log(
`Fetching partial message list for workspace ${workspaceId} and account ${connectedAccount.id}`,
);
await this.setMessageChannelSyncStatusService.setMessageListFetchOnGoingStatus(
messageChannel.id,
workspaceId,
);
const lastSyncHistoryId = messageChannel.syncCursor;
if (!lastSyncHistoryId) {
this.logger.log(
`No lastSyncHistoryId for workspace ${workspaceId} and account ${connectedAccount.id}, falling back to full sync.`,
);
await this.setMessageChannelSyncStatusService.setFullMessageListFetchPendingStatus(
messageChannel.id,
workspaceId,
);
return;
}
const gmailClient: gmail_v1.Gmail =
await this.gmailClientProvider.getGmailClient(
connectedAccount.refreshToken,
);
const { history, historyId, error } =
await this.gmailGetHistoryService.getHistory(
gmailClient,
lastSyncHistoryId,
);
if (error) {
await this.gmailPartialMessageListFetchErrorHandlingService.handleGmailError(
error,
messageChannel,
workspaceId,
connectedAccount.id,
);
return;
}
if (!historyId) {
throw new Error(
`No historyId found for ${connectedAccount.id} in workspace ${workspaceId} in gmail history response.`,
);
}
if (historyId === lastSyncHistoryId || !history?.length) {
this.logger.log(
`Partial message list import done with history ${historyId} and nothing to update for workspace ${workspaceId} and account ${connectedAccount.id}`,
);
await this.setMessageChannelSyncStatusService.setCompletedStatus(
messageChannel.id,
workspaceId,
);
return;
}
const { messagesAdded, messagesDeleted } =
await this.gmailGetHistoryService.getMessageIdsFromHistory(history);
await this.cacheStorage.setAdd(
`messages-to-import:${workspaceId}:gmail:${messageChannel.id}`,
messagesAdded,
);
this.logger.log(
`Added ${messagesAdded.length} messages to import for workspace ${workspaceId} and account ${connectedAccount.id}`,
);
await this.messageChannelMessageAssociationRepository.deleteByMessageExternalIdsAndMessageChannelId(
messagesDeleted,
messageChannel.id,
workspaceId,
);
this.logger.log(
`Deleted ${messagesDeleted.length} messages for workspace ${workspaceId} and account ${connectedAccount.id}`,
);
await this.messageChannelRepository.updateLastSyncCursorIfHigher(
messageChannel.id,
historyId,
workspaceId,
);
this.logger.log(
`Updated lastSyncCursor to ${historyId} for workspace ${workspaceId} and account ${connectedAccount.id}`,
);
this.logger.log(
`Partial message list import done with history ${historyId} for workspace ${workspaceId} and account ${connectedAccount.id}`,
);
await this.setMessageChannelSyncStatusService.setMessagesImportPendingStatus(
messageChannel.id,
workspaceId,
);
}
}

View File

@ -7,9 +7,13 @@ import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/works
import { BlocklistWorkspaceEntity } from 'src/modules/connected-account/standard-objects/blocklist.workspace-entity';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { FetchMessagesByBatchesModule } from 'src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.module';
import { GmailPartialSyncV2Service as GmailPartialSyncService } from 'src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.service';
import { GmailGetHistoryService } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-get-history.service';
import { GmailPartialMessageListFetchErrorHandlingService } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-error-handling.service';
import { GmailPartialMessageListFetchV2Service } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-v2.service';
import { GmailPartialMessageListFetchService } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.service';
import { MessageModule } from 'src/modules/messaging/services/message/message.module';
import { MessagingProvidersModule } from 'src/modules/messaging/services/providers/messaging-providers.module';
import { SetMessageChannelSyncStatusModule } from 'src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.module';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
@Module({
@ -24,8 +28,17 @@ import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-ob
MessageModule,
TypeOrmModule.forFeature([FeatureFlagEntity], 'core'),
WorkspaceDataSourceModule,
SetMessageChannelSyncStatusModule,
],
providers: [
GmailPartialMessageListFetchService,
GmailPartialMessageListFetchV2Service,
GmailPartialMessageListFetchErrorHandlingService,
GmailGetHistoryService,
],
exports: [
GmailPartialMessageListFetchService,
GmailPartialMessageListFetchV2Service,
],
providers: [GmailPartialSyncService],
exports: [GmailPartialSyncService],
})
export class GmailPartialSyncModule {}
export class GmailPartialMessageListFetchModule {}

View File

@ -21,15 +21,17 @@ import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decora
import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import {
GmailFullSyncJob,
GmailFullSyncJobData,
} from 'src/modules/messaging/jobs/gmail-full-sync.job';
GmailFullMessageListFetchJob,
GmailFullMessageListFetchJobData,
} from 'src/modules/messaging/jobs/gmail-full-message-list-fetch.job';
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel-message-association.workspace-entity';
import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/repositories/message-channel-message-association.repository';
@Injectable()
export class GmailPartialSyncV2Service {
private readonly logger = new Logger(GmailPartialSyncV2Service.name);
export class GmailPartialMessageListFetchService {
private readonly logger = new Logger(
GmailPartialMessageListFetchService.name,
);
constructor(
private readonly gmailClientProvider: GmailClientProvider,
@ -338,8 +340,8 @@ export class GmailPartialSyncV2Service {
workspaceId: string,
connectedAccountId: string,
) {
await this.messageQueueService.add<GmailFullSyncJobData>(
GmailFullSyncJob.name,
await this.messageQueueService.add<GmailFullMessageListFetchJobData>(
GmailFullMessageListFetchJob.name,
{ workspaceId, connectedAccountId },
);
}

View File

@ -0,0 +1,14 @@
import { Module } from '@nestjs/common';
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
import { SetMessageChannelSyncStatusService } from 'src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.service';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
@Module({
imports: [
ObjectMetadataRepositoryModule.forFeature([MessageChannelWorkspaceEntity]),
],
providers: [SetMessageChannelSyncStatusService],
exports: [SetMessageChannelSyncStatusService],
})
export class SetMessageChannelSyncStatusModule {}

View File

@ -0,0 +1,101 @@
import { Injectable } from '@nestjs/common';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
import {
MessageChannelWorkspaceEntity,
MessageChannelSyncSubStatus,
MessageChannelSyncStatus,
} from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
@Injectable()
export class SetMessageChannelSyncStatusService {
constructor(
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
) {}
public async setMessageListFetchOnGoingStatus(
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncSubStatus(
messageChannelId,
MessageChannelSyncSubStatus.MESSAGES_LIST_FETCH_ONGOING,
workspaceId,
);
await this.messageChannelRepository.updateSyncStatus(
messageChannelId,
MessageChannelSyncStatus.ONGOING,
workspaceId,
);
}
public async setFullMessageListFetchPendingStatus(
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncSubStatus(
messageChannelId,
MessageChannelSyncSubStatus.FULL_MESSAGES_LIST_FETCH_PENDING,
workspaceId,
);
}
public async setCompletedStatus(
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncSubStatus(
messageChannelId,
MessageChannelSyncSubStatus.PARTIAL_MESSAGES_LIST_FETCH_PENDING,
workspaceId,
);
await this.messageChannelRepository.updateSyncStatus(
messageChannelId,
MessageChannelSyncStatus.COMPLETED,
workspaceId,
);
}
public async setMessagesImportPendingStatus(
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncSubStatus(
messageChannelId,
MessageChannelSyncSubStatus.MESSAGES_IMPORT_PENDING,
workspaceId,
);
}
public async setMessagesImportOnGoingStatus(
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncSubStatus(
messageChannelId,
MessageChannelSyncSubStatus.MESSAGES_IMPORT_ONGOING,
workspaceId,
);
}
public async setFailedUnkownStatus(
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncSubStatus(
messageChannelId,
MessageChannelSyncSubStatus.FAILED,
workspaceId,
);
await this.messageChannelRepository.updateSyncStatus(
messageChannelId,
MessageChannelSyncStatus.FAILED_UNKNOWN,
workspaceId,
);
}
}

View File

@ -37,6 +37,7 @@ export enum MessageChannelSyncSubStatus {
MESSAGES_LIST_FETCH_ONGOING = 'MESSAGES_LIST_FETCH_ONGOING',
MESSAGES_IMPORT_PENDING = 'MESSAGES_IMPORT_PENDING',
MESSAGES_IMPORT_ONGOING = 'MESSAGES_IMPORT_ONGOING',
FAILED = 'FAILED',
}
export enum MessageChannelVisibility {
@ -262,6 +263,12 @@ export class MessageChannelWorkspaceEntity extends BaseWorkspaceEntity {
position: 4,
color: 'orange',
},
{
value: MessageChannelSyncSubStatus.FAILED,
label: 'Failed',
position: 5,
color: 'red',
},
],
defaultValue: `'${MessageChannelSyncSubStatus.FULL_MESSAGES_LIST_FETCH_PENDING}'`,
})