5531 update gmail full sync to v2 (#5674)

Closes #5531

---------

Co-authored-by: Charles Bochet <charles@twenty.com>
This commit is contained in:
bosiraphael
2024-05-31 13:29:58 +02:00
committed by GitHub
parent fe941c64be
commit f166171a1c
39 changed files with 1062 additions and 581 deletions

View File

@ -21,3 +21,5 @@ STORAGE_TYPE=local
# STORAGE_S3_REGION=eu-west3
# STORAGE_S3_NAME=my-bucket
# STORAGE_S3_ENDPOINT=
MESSAGE_QUEUE_TYPE=pg-boss

View File

@ -13,6 +13,7 @@ services:
PG_DATABASE_URL: postgres://twenty:twenty@${PG_DATABASE_HOST}/default
SERVER_URL: ${SERVER_URL}
FRONT_BASE_URL: ${FRONT_BASE_URL:-$SERVER_URL}
MESSAGE_QUEUE_TYPE: ${MESSAGE_QUEUE_TYPE}
ENABLE_DB_MIGRATIONS: "true"
@ -35,6 +36,32 @@ services:
retries: 10
restart: always
worker:
image: twentycrm/twenty:${TAG}
volumes:
- worker-local-data:/app/${STORAGE_LOCAL_PATH:-.local-storage}
command: ["yarn", "worker:prod"]
environment:
PG_DATABASE_URL: postgres://twenty:twenty@${PG_DATABASE_HOST}/default
SERVER_URL: ${SERVER_URL}
FRONT_BASE_URL: ${FRONT_BASE_URL:-$SERVER_URL}
MESSAGE_QUEUE_TYPE: ${MESSAGE_QUEUE_TYPE}
ENABLE_DB_MIGRATIONS: "true"
STORAGE_TYPE: ${STORAGE_TYPE}
STORAGE_S3_REGION: ${STORAGE_S3_REGION}
STORAGE_S3_NAME: ${STORAGE_S3_NAME}
STORAGE_S3_ENDPOINT: ${STORAGE_S3_ENDPOINT}
ACCESS_TOKEN_SECRET: ${ACCESS_TOKEN_SECRET}
LOGIN_TOKEN_SECRET: ${LOGIN_TOKEN_SECRET}
REFRESH_TOKEN_SECRET: ${REFRESH_TOKEN_SECRET}
FILE_TOKEN_SECRET: ${FILE_TOKEN_SECRET}
depends_on:
db:
condition: service_healthy
restart: always
db:
image: twentycrm/twenty-postgres:${TAG}
volumes:
@ -51,3 +78,4 @@ services:
volumes:
db-data:
server-local-data:
worker-local-data:

View File

@ -17,6 +17,15 @@ import TabItem from '@theme/TabItem';
<DocCardList/>
# Setup Messaging & Calendar sync
Twenty offers integrations with Gmail and Google Calendar. To enable these features, you need to connect to register the following recurring jobs:
```
# from your worker container
yarn command:prod cron:messaging:gmail-messages-import
yarn command:prod cron:messaging:gmail-message-list-fetch
```
# Setup Environment Variables
## Frontend
@ -209,3 +218,4 @@ import TabItem from '@theme/TabItem';
['CAPTCHA_SITE_KEY', '', 'The captcha site key'],
['CAPTCHA_SECRET_KEY', '', 'The captcha secret key'],
]}></OptionTable>

View File

@ -43,7 +43,7 @@ export const seedMessageChannel = async (
handle: 'tim@apple.dev',
visibility: 'share_everything',
syncSubStatus:
MessageChannelSyncSubStatus.FULL_MESSAGES_LIST_FETCH_PENDING,
MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING,
},
{
id: DEV_SEED_MESSAGE_CHANNEL_IDS.JONY,
@ -56,7 +56,7 @@ export const seedMessageChannel = async (
handle: 'jony.ive@apple.dev',
visibility: 'share_everything',
syncSubStatus:
MessageChannelSyncSubStatus.FULL_MESSAGES_LIST_FETCH_PENDING,
MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING,
},
{
id: DEV_SEED_MESSAGE_CHANNEL_IDS.PHIL,
@ -69,7 +69,7 @@ export const seedMessageChannel = async (
handle: 'phil.schiler@apple.dev',
visibility: 'share_everything',
syncSubStatus:
MessageChannelSyncSubStatus.FULL_MESSAGES_LIST_FETCH_PENDING,
MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING,
},
])
.execute();

View File

@ -2,7 +2,6 @@ import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { AnalyticsService } from 'src/engine/core-modules/analytics/analytics.service';
import { CreateAnalyticsInput } from 'src/engine/core-modules/analytics/dto/create-analytics.input';
import { ObjectRecordCreateEvent } from 'src/engine/integrations/event-emitter/types/object-record-create.event';
@Injectable()
@ -11,14 +10,13 @@ export class TelemetryListener {
@OnEvent('*.created')
async handleAllCreate(payload: ObjectRecordCreateEvent<any>) {
this.analyticsService.create(
await this.analyticsService.create(
{
type: 'track',
name: payload.name,
data: JSON.parse(`{
"eventName": "${payload.name}"
}`),
} as CreateAnalyticsInput,
data: {
eventName: payload.name,
},
},
payload.userId,
payload.workspaceId,
'', // voluntarely not retrieving this

View File

@ -12,7 +12,7 @@ import { User } from 'src/engine/core-modules/user/user.entity';
import { AnalyticsService } from './analytics.service';
import { Analytics } from './analytics.entity';
import { CreateAnalyticsInput } from './dto/create-analytics.input';
import { CreateAnalyticsInput } from './dtos/create-analytics.input';
@UseGuards(OptionalJwtAuthGuard)
@Resolver(() => Analytics)

View File

@ -4,7 +4,10 @@ import { HttpService } from '@nestjs/axios';
import { anonymize } from 'src/utils/anonymize';
import { EnvironmentService } from 'src/engine/integrations/environment/environment.service';
import { CreateAnalyticsInput } from './dto/create-analytics.input';
type CreateEventInput = {
type: string;
data: object;
};
@Injectable()
export class AnalyticsService {
@ -16,7 +19,7 @@ export class AnalyticsService {
) {}
async create(
createEventInput: CreateAnalyticsInput,
createEventInput: CreateEventInput,
userId: string | undefined,
workspaceId: string | undefined,
workspaceDisplayName: string | undefined,

View File

@ -270,14 +270,6 @@ export class ConnectedAccountRepository {
);
}
const refreshToken = connectedAccount.refreshToken;
if (!refreshToken) {
throw new Error(
`No refresh token found for connected account ${connectedAccountId} in workspace ${workspaceId}`,
);
}
return connectedAccount;
}
}

View File

@ -3,12 +3,14 @@ import { Module } from '@nestjs/common';
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { GmailErrorHandlingModule } from 'src/modules/messaging/services/gmail-error-handling/gmail-error-handling.module';
@Module({
imports: [
ObjectMetadataRepositoryModule.forFeature([
ConnectedAccountWorkspaceEntity,
]),
GmailErrorHandlingModule,
],
providers: [GoogleAPIRefreshAccessTokenService],
exports: [GoogleAPIRefreshAccessTokenService],

View File

@ -6,6 +6,9 @@ import { EnvironmentService } from 'src/engine/integrations/environment/environm
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
import { GmailErrorHandlingService } from 'src/modules/messaging/services/gmail-error-handling/gmail-error-handling.service';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
@Injectable()
export class GoogleAPIRefreshAccessTokenService {
@ -13,6 +16,9 @@ export class GoogleAPIRefreshAccessTokenService {
private readonly environmentService: EnvironmentService,
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
private readonly connectedAccountRepository: ConnectedAccountRepository,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
private readonly gmailErrorHandlingService: GmailErrorHandlingService,
) {}
async refreshAndSaveAccessToken(
@ -30,12 +36,6 @@ export class GoogleAPIRefreshAccessTokenService {
);
}
if (connectedAccount.authFailedAt) {
throw new Error(
`Skipping refresh of access token for connected account ${connectedAccountId} in workspace ${workspaceId} because auth already failed, a new refresh token is needed`,
);
}
const refreshToken = connectedAccount.refreshToken;
if (!refreshToken) {
@ -44,50 +44,55 @@ export class GoogleAPIRefreshAccessTokenService {
);
}
const accessToken = await this.refreshAccessToken(
refreshToken,
connectedAccountId,
workspaceId,
);
await this.connectedAccountRepository.updateAccessToken(
accessToken,
connectedAccountId,
workspaceId,
);
}
async refreshAccessToken(
refreshToken: string,
connectedAccountId: string,
workspaceId: string,
): Promise<string> {
try {
const response = await axios.post(
'https://oauth2.googleapis.com/token',
{
client_id: this.environmentService.get('AUTH_GOOGLE_CLIENT_ID'),
client_secret: this.environmentService.get(
'AUTH_GOOGLE_CLIENT_SECRET',
),
refresh_token: refreshToken,
grant_type: 'refresh_token',
},
{
headers: {
'Content-Type': 'application/json',
},
},
);
const accessToken = await this.refreshAccessToken(refreshToken);
return response.data.access_token;
} catch (error) {
await this.connectedAccountRepository.updateAuthFailedAt(
await this.connectedAccountRepository.updateAccessToken(
accessToken,
connectedAccountId,
workspaceId,
);
} catch (error) {
const messageChannel =
await this.messageChannelRepository.getFirstByConnectedAccountId(
connectedAccountId,
workspaceId,
);
throw new Error(`Error refreshing access token: ${error.message}`);
if (!messageChannel) {
throw new Error(
`No message channel found for connected account ${connectedAccountId} in workspace ${workspaceId}`,
);
}
await this.gmailErrorHandlingService.handleGmailError(
{
code: error.code,
reason: error.response.data.error,
},
'messages-import',
messageChannel,
workspaceId,
);
}
}
async refreshAccessToken(refreshToken: string): Promise<string> {
const response = await axios.post(
'https://oauth2.googleapis.com/token',
{
client_id: this.environmentService.get('AUTH_GOOGLE_CLIENT_ID'),
client_secret: this.environmentService.get('AUTH_GOOGLE_CLIENT_SECRET'),
refresh_token: refreshToken,
grant_type: 'refresh_token',
},
{
headers: {
'Content-Type': 'application/json',
},
},
);
return response.data.access_token;
}
}

View File

@ -6,7 +6,7 @@ import { MessageQueue } from 'src/engine/integrations/message-queue/message-queu
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { GmailMessageListFetchCronJob } from 'src/modules/messaging/crons/jobs/gmail-message-list-fetch.cron.job';
const GMAIL_PARTIAL_SYNC_CRON_PATTERN = '*/5 * * * *';
const GMAIL_MESSAGE_LIST_FETCH_CRON_PATTERN = '*/5 * * * *';
@Command({
name: 'cron:messaging:gmail-message-list-fetch',
@ -26,7 +26,7 @@ export class GmailMessageListFetchCronCommand extends CommandRunner {
GmailMessageListFetchCronJob.name,
undefined,
{
repeat: { pattern: GMAIL_PARTIAL_SYNC_CRON_PATTERN },
repeat: { pattern: GMAIL_MESSAGE_LIST_FETCH_CRON_PATTERN },
},
);
}

View File

@ -99,9 +99,6 @@ export class GmailMessageListFetchCronJob
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
},
{
retryLimit: 2,
},
);
} else {
await this.messageQueueService.add<GmailPartialMessageListFetchJobData>(
@ -110,9 +107,6 @@ export class GmailMessageListFetchCronJob
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
},
{
retryLimit: 2,
},
);
}
}

View File

@ -19,6 +19,7 @@ import {
} from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
import { MessagingTelemetryService } from 'src/modules/messaging/services/telemetry/messaging-telemetry.service';
@Injectable()
export class GmailMessagesImportCronJob implements MessageQueueJob<undefined> {
@ -38,6 +39,7 @@ export class GmailMessagesImportCronJob implements MessageQueueJob<undefined> {
private readonly environmentService: EnvironmentService,
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
private readonly connectedAccountRepository: ConnectedAccountRepository,
private readonly messagingTelemetryService: MessagingTelemetryService,
) {}
async handle(): Promise<void> {
@ -86,21 +88,24 @@ export class GmailMessagesImportCronJob implements MessageQueueJob<undefined> {
}
if (isGmailSyncV2Enabled) {
try {
const connectedAccount =
await this.connectedAccountRepository.getConnectedAccountOrThrow(
workspaceId,
messageChannel.connectedAccountId,
);
await this.messagingTelemetryService.track({
eventName: 'messages_import.triggered',
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
messageChannelId: messageChannel.id,
});
await this.gmailFetchMessageContentFromCacheV2Service.processMessageBatchImport(
messageChannel,
connectedAccount,
const connectedAccount =
await this.connectedAccountRepository.getConnectedAccountOrThrow(
workspaceId,
messageChannel.connectedAccountId,
);
} catch (error) {
this.logger.log(error.message);
}
await this.gmailFetchMessageContentFromCacheV2Service.processMessageBatchImport(
messageChannel,
connectedAccount,
workspaceId,
);
} else {
await this.gmailFetchMessageContentFromCacheService.fetchMessageContentFromCache(
workspaceId,

View File

@ -9,6 +9,7 @@ import { GmailMessagesImportCronJob } from 'src/modules/messaging/crons/jobs/gma
import { GmailMessageListFetchCronJob } from 'src/modules/messaging/crons/jobs/gmail-message-list-fetch.cron.job';
import { GmailMessagesImportModule } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import.module';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
import { MessagingTelemetryModule } from 'src/modules/messaging/services/telemetry/messaging-telemetry.module';
@Module({
imports: [
@ -16,6 +17,7 @@ import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-ob
TypeOrmModule.forFeature([DataSourceEntity], 'metadata'),
ObjectMetadataRepositoryModule.forFeature([MessageChannelWorkspaceEntity]),
GmailMessagesImportModule,
MessagingTelemetryModule,
],
providers: [
{

View File

@ -22,7 +22,7 @@ export class BlocklistReimportMessagesJob
constructor(
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
private readonly connectedAccountRepository: ConnectedAccountRepository,
private readonly gmailFullSyncService: GmailFullMessageListFetchService,
private readonly gmailFullMessageListFetchService: GmailFullMessageListFetchService,
) {}
async handle(data: BlocklistReimportMessagesJobData): Promise<void> {
@ -46,7 +46,7 @@ export class BlocklistReimportMessagesJob
return;
}
await this.gmailFullSyncService.fetchConnectedAccountThreads(
await this.gmailFullMessageListFetchService.fetchConnectedAccountThreads(
workspaceId,
connectedAccount[0].id,
[handle],

View File

@ -1,9 +1,23 @@
import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';
import {
FeatureFlagEntity,
FeatureFlagKeys,
} from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service';
import { GmailFullMessageListFetchService } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.service';
import { GmailFullMessageListFetchV2Service } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch-v2.service';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
import { MessagingTelemetryService } from 'src/modules/messaging/services/telemetry/messaging-telemetry.service';
export type GmailFullMessageListFetchJobData = {
workspaceId: string;
@ -18,31 +32,95 @@ export class GmailFullMessageListFetchJob
constructor(
private readonly googleAPIsRefreshAccessTokenService: GoogleAPIRefreshAccessTokenService,
private readonly gmailFullSyncService: GmailFullMessageListFetchService,
private readonly gmailFullMessageListFetchService: GmailFullMessageListFetchService,
@InjectRepository(FeatureFlagEntity, 'core')
private readonly featureFlagRepository: Repository<FeatureFlagEntity>,
private readonly gmailFullMessageListFetchV2Service: GmailFullMessageListFetchV2Service,
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
private readonly connectedAccountRepository: ConnectedAccountRepository,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
private readonly messagingTelemetryService: MessagingTelemetryService,
) {}
async handle(data: GmailFullMessageListFetchJobData): Promise<void> {
const { workspaceId, connectedAccountId } = data;
this.logger.log(
`gmail full-sync for workspace ${data.workspaceId} and account ${data.connectedAccountId}`,
`gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId}`,
);
try {
await this.googleAPIsRefreshAccessTokenService.refreshAndSaveAccessToken(
data.workspaceId,
data.connectedAccountId,
workspaceId,
connectedAccountId,
);
} catch (e) {
this.logger.error(
`Error refreshing access token for connected account ${data.connectedAccountId} in workspace ${data.workspaceId}`,
`Error refreshing access token for connected account ${connectedAccountId} in workspace ${workspaceId}`,
e,
);
return;
}
await this.gmailFullSyncService.fetchConnectedAccountThreads(
data.workspaceId,
data.connectedAccountId,
);
const isGmailSyncV2EnabledFeatureFlag =
await this.featureFlagRepository.findOneBy({
workspaceId: workspaceId,
key: FeatureFlagKeys.IsGmailSyncV2Enabled,
value: true,
});
const isGmailSyncV2Enabled = isGmailSyncV2EnabledFeatureFlag?.value;
if (isGmailSyncV2Enabled) {
// Todo delete this code block after migration
const connectedAccount = await this.connectedAccountRepository.getById(
connectedAccountId,
workspaceId,
);
if (!connectedAccount) {
throw new Error(
`Connected account ${connectedAccountId} not found in workspace ${workspaceId}`,
);
}
const messageChannel =
await this.messageChannelRepository.getFirstByConnectedAccountId(
connectedAccountId,
workspaceId,
);
if (!messageChannel) {
throw new Error(
`No message channel found for connected account ${connectedAccountId} in workspace ${workspaceId}`,
);
}
await this.messagingTelemetryService.track({
eventName: 'full_message_list_fetch.started',
workspaceId,
connectedAccountId,
});
await this.gmailFullMessageListFetchV2Service.processMessageListFetch(
messageChannel,
connectedAccount,
workspaceId,
);
await this.messagingTelemetryService.track({
eventName: 'full_message_list_fetch.completed',
workspaceId,
connectedAccountId,
messageChannelId: messageChannel.id,
});
} else {
await this.gmailFullMessageListFetchService.fetchConnectedAccountThreads(
data.workspaceId,
data.connectedAccountId,
);
}
}
}

View File

@ -2,11 +2,17 @@ import { Injectable, Logger } from '@nestjs/common';
import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';
import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service';
import { GmailPartialMessageListFetchV2Service } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-v2.service';
import { GetConnectedAccountAndMessageChannelService } from 'src/modules/messaging/services/get-connected-account-and-message-channel/get-connected-account-and-message-channel.service';
import { MessageChannelSyncSubStatus } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
import { GmailFullMessageListFetchService } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.service';
import {
MessageChannelSyncSubStatus,
MessageChannelWorkspaceEntity,
} from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
import { GmailFullMessageListFetchV2Service } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch-v2.service';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
import { MessagingTelemetryService } from 'src/modules/messaging/services/telemetry/messaging-telemetry.service';
export type GmailMessageListFetchJobData = {
workspaceId: string;
@ -20,78 +26,110 @@ export class GmailMessageListFetchJob
private readonly logger = new Logger(GmailMessageListFetchJob.name);
constructor(
private readonly googleAPIsRefreshAccessTokenService: GoogleAPIRefreshAccessTokenService,
private readonly gmailFullSyncService: GmailFullMessageListFetchService,
private readonly gmailFullMessageListFetchV2Service: GmailFullMessageListFetchV2Service,
private readonly gmailPartialMessageListFetchV2Service: GmailPartialMessageListFetchV2Service,
private readonly getConnectedAccountAndMessageChannelService: GetConnectedAccountAndMessageChannelService,
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
private readonly connectedAccountRepository: ConnectedAccountRepository,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
private readonly messagingTelemetryService: MessagingTelemetryService,
) {}
async handle(data: GmailMessageListFetchJobData): Promise<void> {
const { workspaceId, connectedAccountId } = data;
this.logger.log(
`Fetch gmail message list for workspace ${workspaceId} and account ${connectedAccountId}`,
await this.messagingTelemetryService.track({
eventName: 'message_list_fetch_job.triggered',
workspaceId,
connectedAccountId,
});
const connectedAccount = await this.connectedAccountRepository.getById(
connectedAccountId,
workspaceId,
);
try {
await this.googleAPIsRefreshAccessTokenService.refreshAndSaveAccessToken(
if (!connectedAccount) {
await this.messagingTelemetryService.track({
eventName: 'message_list_fetch_job.error.connected_account_not_found',
workspaceId,
connectedAccountId,
);
} catch (e) {
this.logger.error(
`Error refreshing access token for connected account ${connectedAccountId} in workspace ${workspaceId}`,
e,
);
});
return;
}
const { messageChannel, connectedAccount } =
await this.getConnectedAccountAndMessageChannelService.getConnectedAccountAndMessageChannelOrThrow(
workspaceId,
const messageChannel =
await this.messageChannelRepository.getFirstByConnectedAccountId(
connectedAccountId,
workspaceId,
);
if (!messageChannel) {
await this.messagingTelemetryService.track({
eventName: 'message_list_fetch_job.error.message_channel_not_found',
workspaceId,
connectedAccountId,
});
return;
}
switch (messageChannel.syncSubStatus) {
case MessageChannelSyncSubStatus.PARTIAL_MESSAGES_LIST_FETCH_PENDING:
try {
await this.gmailPartialMessageListFetchV2Service.processMessageListFetch(
messageChannel,
connectedAccount,
workspaceId,
);
} catch (e) {
this.logger.error(e);
}
return;
case MessageChannelSyncSubStatus.FULL_MESSAGES_LIST_FETCH_PENDING:
try {
await this.gmailFullSyncService.fetchConnectedAccountThreads(
workspaceId,
connectedAccountId,
);
} catch (e) {
this.logger.error(e);
}
return;
case MessageChannelSyncSubStatus.FAILED:
this.logger.error(
`Messaging import for workspace ${workspaceId} and account ${connectedAccountId} is in a failed state.`,
case MessageChannelSyncSubStatus.PARTIAL_MESSAGE_LIST_FETCH_PENDING:
this.logger.log(
`Fetching partial message list for workspace ${workspaceId} and account ${connectedAccount.id}`,
);
return;
await this.messagingTelemetryService.track({
eventName: 'partial_message_list_fetch.started',
workspaceId,
connectedAccountId,
});
await this.gmailPartialMessageListFetchV2Service.processMessageListFetch(
messageChannel,
connectedAccount,
workspaceId,
);
await this.messagingTelemetryService.track({
eventName: 'partial_message_list_fetch.completed',
workspaceId,
connectedAccountId,
messageChannelId: messageChannel.id,
});
break;
case MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING:
this.logger.log(
`Fetching full message list for workspace ${workspaceId} and account ${connectedAccount.id}`,
);
await this.messagingTelemetryService.track({
eventName: 'full_message_list_fetch.started',
workspaceId,
connectedAccountId,
});
await this.gmailFullMessageListFetchV2Service.processMessageListFetch(
messageChannel,
connectedAccount,
workspaceId,
);
await this.messagingTelemetryService.track({
eventName: 'full_message_list_fetch.completed',
workspaceId,
connectedAccountId,
messageChannelId: messageChannel.id,
});
break;
default:
this.logger.error(
`Messaging import for workspace ${workspaceId} and account ${connectedAccountId} is locked, import will be retried later.`,
);
return;
break;
}
}
}

View File

@ -14,9 +14,10 @@ import { GmailFullMessageListFetchJob } from 'src/modules/messaging/jobs/gmail-f
import { GmailMessageListFetchJob } from 'src/modules/messaging/jobs/gmail-message-list-fetch.job';
import { GmailPartialMessageListFetchJob } from 'src/modules/messaging/jobs/gmail-partial-message-list-fetch.job';
import { MessagingCreateCompanyAndContactAfterSyncJob } from 'src/modules/messaging/jobs/messaging-create-company-and-contact-after-sync.job';
import { GetConnectedAccountAndMessageChannelModule } from 'src/modules/messaging/services/get-connected-account-and-message-channel/get-connected-account-and-message-channel.module';
import { GmailFullMessageListFetchModule } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.module';
import { GmailPartialMessageListFetchModule } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.module';
import { SetMessageChannelSyncStatusModule } from 'src/modules/messaging/services/message-channel-sync-status/message-channel-sync-status.module';
import { MessagingTelemetryModule } from 'src/modules/messaging/services/telemetry/messaging-telemetry.module';
import { ThreadCleanerModule } from 'src/modules/messaging/services/thread-cleaner/thread-cleaner.module';
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel-message-association.workspace-entity';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
@ -31,13 +32,14 @@ import { MessageParticipantWorkspaceEntity } from 'src/modules/messaging/standar
MessageChannelMessageAssociationWorkspaceEntity,
BlocklistWorkspaceEntity,
]),
MessagingTelemetryModule,
GmailFullMessageListFetchModule,
GmailPartialMessageListFetchModule,
ThreadCleanerModule,
GoogleAPIRefreshAccessTokenModule,
AutoCompaniesAndContactsCreationModule,
TypeOrmModule.forFeature([FeatureFlagEntity], 'core'),
GetConnectedAccountAndMessageChannelModule,
SetMessageChannelSyncStatusModule,
],
providers: [
{

View File

@ -1,18 +0,0 @@
import { Module } from '@nestjs/common';
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { GetConnectedAccountAndMessageChannelService } from 'src/modules/messaging/services/get-connected-account-and-message-channel/get-connected-account-and-message-channel.service';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
@Module({
imports: [
ObjectMetadataRepositoryModule.forFeature([
ConnectedAccountWorkspaceEntity,
MessageChannelWorkspaceEntity,
]),
],
providers: [GetConnectedAccountAndMessageChannelService],
exports: [GetConnectedAccountAndMessageChannelService],
})
export class GetConnectedAccountAndMessageChannelModule {}

View File

@ -1,62 +0,0 @@
import { Injectable } from '@nestjs/common';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
@Injectable()
export class GetConnectedAccountAndMessageChannelService {
constructor(
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
private readonly connectedAccountRepository: ConnectedAccountRepository,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
) {}
public async getConnectedAccountAndMessageChannelOrThrow(
workspaceId: string,
connectedAccountId: string,
): Promise<{
messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>;
connectedAccount: ObjectRecord<ConnectedAccountWorkspaceEntity>;
}> {
const connectedAccount = await this.connectedAccountRepository.getById(
connectedAccountId,
workspaceId,
);
if (!connectedAccount) {
throw new Error(
`Connected account ${connectedAccountId} not found in workspace ${workspaceId}`,
);
}
const refreshToken = connectedAccount.refreshToken;
if (!refreshToken) {
throw new Error(
`No refresh token found for connected account ${connectedAccountId} in workspace ${workspaceId}`,
);
}
const messageChannel =
await this.messageChannelRepository.getFirstByConnectedAccountId(
connectedAccountId,
workspaceId,
);
if (!messageChannel) {
throw new Error(
`No message channel found for connected account ${connectedAccountId} in workspace ${workspaceId}`,
);
}
return {
messageChannel,
connectedAccount,
};
}
}

View File

@ -0,0 +1,12 @@
import { Module } from '@nestjs/common';
import { GmailErrorHandlingService } from 'src/modules/messaging/services/gmail-error-handling/gmail-error-handling.service';
import { SetMessageChannelSyncStatusModule } from 'src/modules/messaging/services/message-channel-sync-status/message-channel-sync-status.module';
import { MessagingTelemetryModule } from 'src/modules/messaging/services/telemetry/messaging-telemetry.module';
@Module({
imports: [SetMessageChannelSyncStatusModule, MessagingTelemetryModule],
providers: [GmailErrorHandlingService],
exports: [GmailErrorHandlingService],
})
export class GmailErrorHandlingModule {}

View File

@ -0,0 +1,192 @@
import { Injectable } from '@nestjs/common';
import snakeCase from 'lodash.snakecase';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelSyncStatusService } from 'src/modules/messaging/services/message-channel-sync-status/message-channel-sync-status.service';
import { MessagingTelemetryService } from 'src/modules/messaging/services/telemetry/messaging-telemetry.service';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
type SyncStep =
| 'partial-message-list-fetch'
| 'full-message-list-fetch'
| 'messages-import';
export type GmailError = {
code: number;
reason: string;
};
@Injectable()
export class GmailErrorHandlingService {
constructor(
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
private readonly connectedAccountRepository: ConnectedAccountRepository,
private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService,
private readonly messagingTelemetryService: MessagingTelemetryService,
) {}
public async handleGmailError(
error: GmailError,
syncStep: SyncStep,
messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
workspaceId: string,
): Promise<void> {
const { code, reason } = error;
switch (code) {
case 400:
if (reason === 'invalid_grant') {
await this.handleInsufficientPermissions(
error,
syncStep,
messageChannel,
workspaceId,
);
}
break;
case 404:
await this.handleNotFound(error, syncStep, messageChannel, workspaceId);
break;
case 429:
await this.handleRateLimitExceeded(
error,
syncStep,
messageChannel,
workspaceId,
);
break;
case 403:
if (
reason === 'rateLimitExceeded' ||
reason === 'userRateLimitExceeded'
) {
await this.handleRateLimitExceeded(
error,
syncStep,
messageChannel,
workspaceId,
);
} else {
await this.handleInsufficientPermissions(
error,
syncStep,
messageChannel,
workspaceId,
);
}
break;
case 401:
await this.handleInsufficientPermissions(
error,
syncStep,
messageChannel,
workspaceId,
);
break;
default:
await this.messageChannelSyncStatusService.markAsFailedUnknownAndFlushMessagesToImport(
messageChannel.id,
workspaceId,
);
break;
}
}
public async handleRateLimitExceeded(
error: GmailError,
syncStep: SyncStep,
messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
workspaceId: string,
): Promise<void> {
await this.messagingTelemetryService.track({
eventName: `${snakeCase(syncStep)}.error.rate_limit_exceeded`,
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
messageChannelId: messageChannel.id,
message: `${error.code}: ${error.reason}`,
});
switch (syncStep) {
case 'full-message-list-fetch':
await this.messageChannelSyncStatusService.scheduleFullMessageListFetch(
messageChannel.id,
workspaceId,
);
break;
case 'partial-message-list-fetch':
await this.messageChannelSyncStatusService.schedulePartialMessageListFetch(
messageChannel.id,
workspaceId,
);
break;
case 'messages-import':
await this.messageChannelSyncStatusService.scheduleMessagesImport(
messageChannel.id,
workspaceId,
);
break;
default:
break;
}
}
public async handleInsufficientPermissions(
error: GmailError,
syncStep: SyncStep,
messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
workspaceId: string,
): Promise<void> {
await this.messagingTelemetryService.track({
eventName: `${snakeCase(syncStep)}.error.insufficient_permissions`,
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
messageChannelId: messageChannel.id,
message: `${error.code}: ${error.reason}`,
});
await this.messageChannelSyncStatusService.markAsFailedInsufficientPermissionsAndFlushMessagesToImport(
messageChannel.id,
workspaceId,
);
await this.connectedAccountRepository.updateAuthFailedAt(
messageChannel.connectedAccountId,
workspaceId,
);
}
public async handleNotFound(
error: GmailError,
syncStep: SyncStep,
messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
workspaceId: string,
): Promise<void> {
if (syncStep === 'messages-import') {
return;
}
await this.messagingTelemetryService.track({
eventName: `${snakeCase(syncStep)}.error.not_found`,
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
messageChannelId: messageChannel.id,
message: `404: ${error.reason}`,
});
await this.messageChannelSyncStatusService.resetAndScheduleFullMessageListFetch(
messageChannel.id,
workspaceId,
);
}
}

View File

@ -0,0 +1,208 @@
import { Injectable, Logger } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { gmail_v1 } from 'googleapis';
import { GaxiosResponse } from 'gaxios';
import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service';
import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator';
import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { GMAIL_USERS_MESSAGES_LIST_MAX_RESULT } from 'src/modules/messaging/constants/gmail-users-messages-list-max-result.constant';
import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/repositories/message-channel-message-association.repository';
import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
import { GmailClientProvider } from 'src/modules/messaging/services/providers/gmail/gmail-client.provider';
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel-message-association.workspace-entity';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
import {
GmailError,
GmailErrorHandlingService,
} from 'src/modules/messaging/services/gmail-error-handling/gmail-error-handling.service';
import { MessageChannelSyncStatusService } from 'src/modules/messaging/services/message-channel-sync-status/message-channel-sync-status.service';
@Injectable()
export class GmailFullMessageListFetchV2Service {
private readonly logger = new Logger(GmailFullMessageListFetchV2Service.name);
constructor(
private readonly gmailClientProvider: GmailClientProvider,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
@InjectCacheStorage(CacheStorageNamespace.Messaging)
private readonly cacheStorage: CacheStorageService,
@InjectObjectMetadataRepository(
MessageChannelMessageAssociationWorkspaceEntity,
)
private readonly messageChannelMessageAssociationRepository: MessageChannelMessageAssociationRepository,
private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService,
private readonly gmailErrorHandlingService: GmailErrorHandlingService,
) {}
public async processMessageListFetch(
messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
connectedAccount: ObjectRecord<ConnectedAccountWorkspaceEntity>,
workspaceId: string,
) {
await this.messageChannelSyncStatusService.markAsMessagesListFetchOngoing(
messageChannel.id,
workspaceId,
);
const gmailClient: gmail_v1.Gmail =
await this.gmailClientProvider.getGmailClient(
connectedAccount.refreshToken,
);
const { error: gmailError } =
await this.fetchAllMessageIdsFromGmailAndStoreInCache(
gmailClient,
messageChannel.id,
workspaceId,
);
if (gmailError) {
await this.gmailErrorHandlingService.handleGmailError(
gmailError,
'full-message-list-fetch',
messageChannel,
workspaceId,
);
return;
}
await this.messageChannelSyncStatusService.scheduleMessagesImport(
messageChannel.id,
workspaceId,
);
}
private async fetchAllMessageIdsFromGmailAndStoreInCache(
gmailClient: gmail_v1.Gmail,
messageChannelId: string,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<{ error?: GmailError }> {
let pageToken: string | undefined;
let fetchedMessageIdsCount = 0;
let hasMoreMessages = true;
let firstMessageExternalId: string | undefined;
let response: GaxiosResponse<gmail_v1.Schema$ListMessagesResponse>;
while (hasMoreMessages) {
try {
response = await gmailClient.users.messages.list({
userId: 'me',
maxResults: GMAIL_USERS_MESSAGES_LIST_MAX_RESULT,
pageToken,
});
} catch (error) {
return {
error: {
code: error.response?.status,
reason: error.response?.data?.error,
},
};
}
if (response.data?.messages) {
const messageExternalIds = response.data.messages
.filter((message): message is { id: string } => message.id != null)
.map((message) => message.id);
if (!firstMessageExternalId) {
firstMessageExternalId = messageExternalIds[0];
}
const existingMessageChannelMessageAssociations =
await this.messageChannelMessageAssociationRepository.getByMessageExternalIdsAndMessageChannelId(
messageExternalIds,
messageChannelId,
workspaceId,
transactionManager,
);
const existingMessageChannelMessageAssociationsExternalIds =
existingMessageChannelMessageAssociations.map(
(messageChannelMessageAssociation) =>
messageChannelMessageAssociation.messageExternalId,
);
const messageIdsToImport = messageExternalIds.filter(
(messageExternalId) =>
!existingMessageChannelMessageAssociationsExternalIds.includes(
messageExternalId,
),
);
if (messageIdsToImport.length) {
await this.cacheStorage.setAdd(
`messages-to-import:${workspaceId}:gmail:${messageChannelId}`,
messageIdsToImport,
);
}
fetchedMessageIdsCount += messageExternalIds.length;
}
pageToken = response.data.nextPageToken ?? undefined;
hasMoreMessages = !!pageToken;
}
this.logger.log(
`Added ${fetchedMessageIdsCount} messages ids from Gmail for messageChannel ${messageChannelId} in workspace ${workspaceId} and added to cache for import`,
);
if (!firstMessageExternalId) {
throw new Error(
`No first message found for workspace ${workspaceId} and account ${messageChannelId}, can't update sync external id`,
);
}
await this.updateLastSyncCursor(
gmailClient,
messageChannelId,
firstMessageExternalId,
workspaceId,
transactionManager,
);
return {};
}
private async updateLastSyncCursor(
gmailClient: gmail_v1.Gmail,
messageChannelId: string,
firstMessageExternalId: string,
workspaceId: string,
transactionManager?: EntityManager,
) {
const firstMessageContent = await gmailClient.users.messages.get({
userId: 'me',
id: firstMessageExternalId,
});
if (!firstMessageContent?.data) {
throw new Error(
`No first message content found for message ${firstMessageExternalId} in workspace ${workspaceId}`,
);
}
const historyId = firstMessageContent?.data?.historyId;
if (!historyId) {
throw new Error(
`No historyId found for message ${firstMessageExternalId} in workspace ${workspaceId}`,
);
}
await this.messageChannelRepository.updateLastSyncCursorIfHigher(
messageChannelId,
historyId,
workspaceId,
transactionManager,
);
}
}

View File

@ -7,7 +7,10 @@ import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/works
import { BlocklistWorkspaceEntity } from 'src/modules/connected-account/standard-objects/blocklist.workspace-entity';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { FetchMessagesByBatchesModule } from 'src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.module';
import { GmailErrorHandlingModule } from 'src/modules/messaging/services/gmail-error-handling/gmail-error-handling.module';
import { GmailFullMessageListFetchV2Service } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch-v2.service';
import { GmailFullMessageListFetchService } from 'src/modules/messaging/services/gmail-full-message-list-fetch/gmail-full-message-list-fetch.service';
import { SetMessageChannelSyncStatusModule } from 'src/modules/messaging/services/message-channel-sync-status/message-channel-sync-status.module';
import { MessagingProvidersModule } from 'src/modules/messaging/services/providers/messaging-providers.module';
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel-message-association.workspace-entity';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
@ -16,6 +19,7 @@ import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-ob
imports: [
MessagingProvidersModule,
FetchMessagesByBatchesModule,
GmailErrorHandlingModule,
ObjectMetadataRepositoryModule.forFeature([
ConnectedAccountWorkspaceEntity,
MessageChannelWorkspaceEntity,
@ -24,8 +28,15 @@ import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-ob
]),
TypeOrmModule.forFeature([FeatureFlagEntity], 'core'),
WorkspaceDataSourceModule,
SetMessageChannelSyncStatusModule,
],
providers: [
GmailFullMessageListFetchService,
GmailFullMessageListFetchV2Service,
],
exports: [
GmailFullMessageListFetchService,
GmailFullMessageListFetchV2Service,
],
providers: [GmailFullMessageListFetchService],
exports: [GmailFullMessageListFetchService],
})
export class GmailFullMessageListFetchModule {}

View File

@ -11,11 +11,13 @@ import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decora
import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum';
import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service';
import { GMAIL_USERS_MESSAGES_GET_BATCH_SIZE } from 'src/modules/messaging/constants/gmail-users-messages-get-batch-size.constant';
import { GMAIL_ONGOING_SYNC_TIMEOUT } from 'src/modules/messaging/constants/gmail-ongoing-sync-timeout.constant';
import { GmailMessagesImportService } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import.service';
import { SetMessageChannelSyncStatusService } from 'src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.service';
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
import { SaveMessagesAndEnqueueContactCreationService } from 'src/modules/messaging/services/gmail-messages-import/save-messages-and-enqueue-contact-creation.service';
import { GmailErrorHandlingService } from 'src/modules/messaging/services/gmail-error-handling/gmail-error-handling.service';
import { MessageChannelSyncStatusService } from 'src/modules/messaging/services/message-channel-sync-status/message-channel-sync-status.service';
import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service';
import { GmailMessagesImportService } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import.service';
import { MessagingTelemetryService } from 'src/modules/messaging/services/telemetry/messaging-telemetry.service';
@Injectable()
export class GmailMessagesImportV2Service {
@ -25,8 +27,11 @@ export class GmailMessagesImportV2Service {
private readonly fetchMessagesByBatchesService: FetchMessagesByBatchesService,
@InjectCacheStorage(CacheStorageNamespace.Messaging)
private readonly cacheStorage: CacheStorageService,
private readonly setMessageChannelSyncStatusService: SetMessageChannelSyncStatusService,
private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService,
private readonly saveMessagesAndEnqueueContactCreationService: SaveMessagesAndEnqueueContactCreationService,
private readonly gmailErrorHandlingService: GmailErrorHandlingService,
private readonly googleAPIsRefreshAccessTokenService: GoogleAPIRefreshAccessTokenService,
private readonly messagingTelemetryService: MessagingTelemetryService,
) {}
async processMessageBatchImport(
@ -34,28 +39,32 @@ export class GmailMessagesImportV2Service {
connectedAccount: ObjectRecord<ConnectedAccountWorkspaceEntity>,
workspaceId: string,
) {
if (messageChannel.syncSubStatus === MessageChannelSyncSubStatus.FAILED) {
throw new Error(
`Connected account ${connectedAccount.id} in workspace ${workspaceId} is in a failed state. Skipping...`,
);
}
if (
messageChannel.syncSubStatus !==
MessageChannelSyncSubStatus.MESSAGES_IMPORT_PENDING
) {
throw new Error(
`Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} is not pending.`,
);
return;
}
await this.setMessageChannelSyncStatusService.setMessagesImportOnGoingStatus(
await this.messagingTelemetryService.track({
eventName: 'messages_import.started',
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
messageChannelId: messageChannel.id,
});
this.logger.log(
`Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} starting...`,
);
await this.messageChannelSyncStatusService.markAsMessagesImportOngoing(
messageChannel.id,
workspaceId,
);
this.logger.log(
`Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} starting...`,
await this.googleAPIsRefreshAccessTokenService.refreshAndSaveAccessToken(
workspaceId,
connectedAccount.id,
);
const messageIdsToFetch =
@ -65,16 +74,15 @@ export class GmailMessagesImportV2Service {
)) ?? [];
if (!messageIdsToFetch?.length) {
await this.setMessageChannelSyncStatusService.setCompletedStatus(
await this.messageChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch(
messageChannel.id,
workspaceId,
);
this.logger.log(
`Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} done with nothing to import or delete.`,
return await this.trackMessageImportCompleted(
messageChannel,
workspaceId,
);
return;
}
const messageQueries = createQueriesFromMessageIds(messageIdsToFetch);
@ -89,12 +97,15 @@ export class GmailMessagesImportV2Service {
);
if (!messagesToSave.length) {
await this.setMessageChannelSyncStatusService.setCompletedStatus(
await this.messageChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch(
messageChannel.id,
workspaceId,
);
return [];
return await this.trackMessageImportCompleted(
messageChannel,
workspaceId,
);
}
await this.saveMessagesAndEnqueueContactCreationService.saveMessagesAndEnqueueContactCreationJob(
@ -105,42 +116,53 @@ export class GmailMessagesImportV2Service {
);
if (messageIdsToFetch.length < GMAIL_USERS_MESSAGES_GET_BATCH_SIZE) {
await this.setMessageChannelSyncStatusService.setCompletedStatus(
await this.messageChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch(
messageChannel.id,
workspaceId,
);
this.logger.log(
`Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} done with no more messages to import.`,
);
} else {
await this.setMessageChannelSyncStatusService.setMessagesImportPendingStatus(
await this.messageChannelSyncStatusService.scheduleMessagesImport(
messageChannel.id,
workspaceId,
);
this.logger.log(
`Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} done with more messages to import.`,
);
}
return await this.trackMessageImportCompleted(
messageChannel,
workspaceId,
);
} catch (error) {
await this.cacheStorage.setAdd(
`messages-to-import:${workspaceId}:gmail:${messageChannel.id}`,
messageIdsToFetch,
);
await this.setMessageChannelSyncStatusService.setFailedUnkownStatus(
messageChannel.id,
await this.gmailErrorHandlingService.handleGmailError(
{
code: error.code,
reason: error.errors?.[0]?.reason,
},
'messages-import',
messageChannel,
workspaceId,
);
this.logger.error(
`Error fetching messages for ${connectedAccount.id} in workspace ${workspaceId}: locking for ${GMAIL_ONGOING_SYNC_TIMEOUT}ms...`,
);
throw new Error(
`Error fetching messages for ${connectedAccount.id} in workspace ${workspaceId}: ${error.message}`,
return await this.trackMessageImportCompleted(
messageChannel,
workspaceId,
);
}
}
private async trackMessageImportCompleted(
messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
workspaceId: string,
) {
await this.messagingTelemetryService.track({
eventName: 'messages_import.completed',
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
messageChannelId: messageChannel.id,
});
}
}

View File

@ -4,14 +4,17 @@ import { TypeOrmModule } from '@nestjs/typeorm';
import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
import { GoogleAPIRefreshAccessTokenModule } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.module';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { FetchMessagesByBatchesModule } from 'src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.module';
import { GmailErrorHandlingModule } from 'src/modules/messaging/services/gmail-error-handling/gmail-error-handling.module';
import { GmailMessagesImportV2Service } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import-v2.service';
import { GmailMessagesImportService } from 'src/modules/messaging/services/gmail-messages-import/gmail-messages-import.service';
import { SaveMessagesAndEnqueueContactCreationService } from 'src/modules/messaging/services/gmail-messages-import/save-messages-and-enqueue-contact-creation.service';
import { SetMessageChannelSyncStatusModule } from 'src/modules/messaging/services/message-channel-sync-status/message-channel-sync-status.module';
import { MessageParticipantModule } from 'src/modules/messaging/services/message-participant/message-participant.module';
import { MessageModule } from 'src/modules/messaging/services/message/message.module';
import { SetMessageChannelSyncStatusModule } from 'src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.module';
import { MessagingTelemetryModule } from 'src/modules/messaging/services/telemetry/messaging-telemetry.module';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
@Module({
@ -27,6 +30,9 @@ import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-ob
MessageParticipantModule,
SetMessageChannelSyncStatusModule,
TypeOrmModule.forFeature([FeatureFlagEntity], 'core'),
GmailErrorHandlingModule,
GoogleAPIRefreshAccessTokenModule,
MessagingTelemetryModule,
],
providers: [
GmailMessagesImportService,

View File

@ -1,9 +1,10 @@
import { Injectable } from '@nestjs/common';
import { gmail_v1 } from 'googleapis';
import { GaxiosResponse } from 'gaxios';
import { GMAIL_USERS_HISTORY_MAX_RESULT } from 'src/modules/messaging/constants/gmail-users-history-max-result.constant';
import { GmailError } from 'src/modules/messaging/types/gmail-error';
import { GmailError } from 'src/modules/messaging/services/gmail-error-handling/gmail-error-handling.service';
@Injectable()
export class GmailGetHistoryService {
@ -21,38 +22,36 @@ export class GmailGetHistoryService {
let pageToken: string | undefined;
let hasMoreMessages = true;
let nextHistoryId: string | undefined;
let response: GaxiosResponse<gmail_v1.Schema$ListHistoryResponse>;
while (hasMoreMessages) {
try {
const response = await gmailClient.users.history.list({
response = await gmailClient.users.history.list({
userId: 'me',
maxResults: GMAIL_USERS_HISTORY_MAX_RESULT,
pageToken,
startHistoryId: lastSyncHistoryId,
historyTypes: ['messageAdded', 'messageDeleted'],
});
nextHistoryId = response?.data?.historyId ?? undefined;
if (response?.data?.history) {
fullHistory.push(...response.data.history);
}
pageToken = response?.data?.nextPageToken ?? undefined;
hasMoreMessages = !!pageToken;
} catch (error) {
const errorData = error?.response?.data?.error;
if (errorData) {
return {
history: [],
error: errorData,
historyId: lastSyncHistoryId,
};
}
throw error;
return {
history: [],
error: {
code: error.response?.status,
reason: error.response?.data?.error,
},
historyId: lastSyncHistoryId,
};
}
nextHistoryId = response?.data?.historyId ?? undefined;
if (response?.data?.history) {
fullHistory.push(...response.data.history);
}
pageToken = response?.data?.nextPageToken ?? undefined;
hasMoreMessages = !!pageToken;
}
return { history: fullHistory, historyId: nextHistoryId };

View File

@ -1,122 +0,0 @@
import { Injectable, Logger } from '@nestjs/common';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
import {
MessageChannelSyncStatus,
MessageChannelSyncSubStatus,
MessageChannelWorkspaceEntity,
} from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
import { GmailError } from 'src/modules/messaging/types/gmail-error';
@Injectable()
export class GmailPartialMessageListFetchErrorHandlingService {
private readonly logger = new Logger(
GmailPartialMessageListFetchErrorHandlingService.name,
);
constructor(
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
private readonly connectedAccountRepository: ConnectedAccountRepository,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
) {}
public async handleGmailError(
error: GmailError | undefined,
messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
connectedAccountId: string,
workspaceId: string,
): Promise<void> {
switch (error?.code) {
case 404:
this.logger.log(
`404: Invalid lastSyncHistoryId for workspace ${workspaceId} and account ${connectedAccountId}, falling back to full sync.`,
);
await this.messageChannelRepository.resetSyncCursor(
messageChannel.id,
workspaceId,
);
await this.messageChannelRepository.updateSyncSubStatus(
messageChannel.id,
MessageChannelSyncSubStatus.FULL_MESSAGES_LIST_FETCH_PENDING,
workspaceId,
);
break;
case 429:
this.logger.log(
`429: rate limit reached for workspace ${workspaceId} and account ${connectedAccountId}: ${error.message}, import will be retried later.`,
);
await this.handleRateLimitExceeded(messageChannel, workspaceId);
break;
case 403:
if (
error?.errors?.[0]?.reason === 'rateLimitExceeded' ||
error?.errors?.[0]?.reason === 'userRateLimitExceeded'
) {
this.logger.log(
`403:${
error?.errors?.[0]?.reason === 'userRateLimitExceeded' && ' user'
} rate limit exceeded for workspace ${workspaceId} and account ${connectedAccountId}: ${
error.message
}, import will be retried later.`,
);
this.handleRateLimitExceeded(messageChannel, workspaceId);
} else {
await this.handleInsufficientPermissions(
error,
messageChannel,
workspaceId,
);
}
break;
case 401:
this.handleInsufficientPermissions(error, messageChannel, workspaceId);
break;
default:
break;
}
}
public async handleRateLimitExceeded(
messageChannel: MessageChannelWorkspaceEntity,
workspaceId: string,
): Promise<void> {
await this.messageChannelRepository.updateSyncSubStatus(
messageChannel.id,
MessageChannelSyncSubStatus.PARTIAL_MESSAGES_LIST_FETCH_PENDING,
workspaceId,
);
}
public async handleInsufficientPermissions(
error: GmailError,
messageChannel: MessageChannelWorkspaceEntity,
workspaceId: string,
): Promise<void> {
this.logger.error(
`{error?.code}: ${error.message} for workspace ${workspaceId} and account ${messageChannel.connectedAccount.id}`,
);
await this.messageChannelRepository.updateSyncStatus(
messageChannel.id,
MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS,
workspaceId,
);
await this.messageChannelRepository.updateSyncSubStatus(
messageChannel.id,
MessageChannelSyncSubStatus.PARTIAL_MESSAGES_LIST_FETCH_PENDING,
workspaceId,
);
await this.connectedAccountRepository.updateAuthFailedAt(
messageChannel.connectedAccount.id,
workspaceId,
);
}
}

View File

@ -12,10 +12,10 @@ import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decora
import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum';
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel-message-association.workspace-entity';
import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/repositories/message-channel-message-association.repository';
import { GmailPartialMessageListFetchErrorHandlingService } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-error-handling.service';
import { GmailGetHistoryService } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-get-history.service';
import { SetMessageChannelSyncStatusService } from 'src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.service';
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
import { GmailErrorHandlingService } from 'src/modules/messaging/services/gmail-error-handling/gmail-error-handling.service';
import { MessageChannelSyncStatusService } from 'src/modules/messaging/services/message-channel-sync-status/message-channel-sync-status.service';
@Injectable()
export class GmailPartialMessageListFetchV2Service {
@ -33,9 +33,9 @@ export class GmailPartialMessageListFetchV2Service {
MessageChannelMessageAssociationWorkspaceEntity,
)
private readonly messageChannelMessageAssociationRepository: MessageChannelMessageAssociationRepository,
private readonly gmailPartialMessageListFetchErrorHandlingService: GmailPartialMessageListFetchErrorHandlingService,
private readonly gmailErrorHandlingService: GmailErrorHandlingService,
private readonly gmailGetHistoryService: GmailGetHistoryService,
private readonly setMessageChannelSyncStatusService: SetMessageChannelSyncStatusService,
private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService,
) {}
public async processMessageListFetch(
@ -43,30 +43,13 @@ export class GmailPartialMessageListFetchV2Service {
connectedAccount: ObjectRecord<ConnectedAccountWorkspaceEntity>,
workspaceId: string,
): Promise<void> {
this.logger.log(
`Fetching partial message list for workspace ${workspaceId} and account ${connectedAccount.id}`,
);
await this.setMessageChannelSyncStatusService.setMessageListFetchOnGoingStatus(
await this.messageChannelSyncStatusService.markAsMessagesListFetchOngoing(
messageChannel.id,
workspaceId,
);
const lastSyncHistoryId = messageChannel.syncCursor;
if (!lastSyncHistoryId) {
this.logger.log(
`No lastSyncHistoryId for workspace ${workspaceId} and account ${connectedAccount.id}, falling back to full sync.`,
);
await this.setMessageChannelSyncStatusService.setFullMessageListFetchPendingStatus(
messageChannel.id,
workspaceId,
);
return;
}
const gmailClient: gmail_v1.Gmail =
await this.gmailClientProvider.getGmailClient(
connectedAccount.refreshToken,
@ -79,11 +62,11 @@ export class GmailPartialMessageListFetchV2Service {
);
if (error) {
await this.gmailPartialMessageListFetchErrorHandlingService.handleGmailError(
await this.gmailErrorHandlingService.handleGmailError(
error,
'partial-message-list-fetch',
messageChannel,
workspaceId,
connectedAccount.id,
);
return;
@ -100,7 +83,7 @@ export class GmailPartialMessageListFetchV2Service {
`Partial message list import done with history ${historyId} and nothing to update for workspace ${workspaceId} and account ${connectedAccount.id}`,
);
await this.setMessageChannelSyncStatusService.setCompletedStatus(
await this.messageChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch(
messageChannel.id,
workspaceId,
);
@ -136,15 +119,7 @@ export class GmailPartialMessageListFetchV2Service {
workspaceId,
);
this.logger.log(
`Updated lastSyncCursor to ${historyId} for workspace ${workspaceId} and account ${connectedAccount.id}`,
);
this.logger.log(
`Partial message list import done with history ${historyId} for workspace ${workspaceId} and account ${connectedAccount.id}`,
);
await this.setMessageChannelSyncStatusService.setMessagesImportPendingStatus(
await this.messageChannelSyncStatusService.scheduleMessagesImport(
messageChannel.id,
workspaceId,
);

View File

@ -7,13 +7,13 @@ import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/works
import { BlocklistWorkspaceEntity } from 'src/modules/connected-account/standard-objects/blocklist.workspace-entity';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { FetchMessagesByBatchesModule } from 'src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.module';
import { GmailErrorHandlingModule } from 'src/modules/messaging/services/gmail-error-handling/gmail-error-handling.module';
import { GmailGetHistoryService } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-get-history.service';
import { GmailPartialMessageListFetchErrorHandlingService } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-error-handling.service';
import { GmailPartialMessageListFetchV2Service } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch-v2.service';
import { GmailPartialMessageListFetchService } from 'src/modules/messaging/services/gmail-partial-message-list-fetch/gmail-partial-message-list-fetch.service';
import { SetMessageChannelSyncStatusModule } from 'src/modules/messaging/services/message-channel-sync-status/message-channel-sync-status.module';
import { MessageModule } from 'src/modules/messaging/services/message/message.module';
import { MessagingProvidersModule } from 'src/modules/messaging/services/providers/messaging-providers.module';
import { SetMessageChannelSyncStatusModule } from 'src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.module';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
@Module({
@ -29,11 +29,11 @@ import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-ob
TypeOrmModule.forFeature([FeatureFlagEntity], 'core'),
WorkspaceDataSourceModule,
SetMessageChannelSyncStatusModule,
GmailErrorHandlingModule,
],
providers: [
GmailPartialMessageListFetchService,
GmailPartialMessageListFetchV2Service,
GmailPartialMessageListFetchErrorHandlingService,
GmailGetHistoryService,
],
exports: [

View File

@ -15,7 +15,6 @@ import {
MessageChannelSyncStatus,
} from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
import { GMAIL_USERS_HISTORY_MAX_RESULT } from 'src/modules/messaging/constants/gmail-users-history-max-result.constant';
import { GmailError } from 'src/modules/messaging/types/gmail-error';
import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service';
import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator';
import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum';
@ -293,7 +292,7 @@ export class GmailPartialMessageListFetchService {
): Promise<{
history: gmail_v1.Schema$History[];
historyId?: string | null;
error?: GmailError;
error?: any;
}> {
const fullHistory: gmail_v1.Schema$History[] = [];
let pageToken: string | undefined;

View File

@ -1,14 +1,14 @@
import { Module } from '@nestjs/common';
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
import { SetMessageChannelSyncStatusService } from 'src/modules/messaging/services/set-message-channel-sync-status/set-message-channel-sync-status.service';
import { MessageChannelSyncStatusService } from 'src/modules/messaging/services/message-channel-sync-status/message-channel-sync-status.service';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
@Module({
imports: [
ObjectMetadataRepositoryModule.forFeature([MessageChannelWorkspaceEntity]),
],
providers: [SetMessageChannelSyncStatusService],
exports: [SetMessageChannelSyncStatusService],
providers: [MessageChannelSyncStatusService],
exports: [MessageChannelSyncStatusService],
})
export class SetMessageChannelSyncStatusModule {}

View File

@ -0,0 +1,156 @@
import { Injectable } from '@nestjs/common';
import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service';
import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator';
import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
import {
MessageChannelWorkspaceEntity,
MessageChannelSyncSubStatus,
MessageChannelSyncStatus,
} from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
@Injectable()
export class MessageChannelSyncStatusService {
constructor(
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
@InjectCacheStorage(CacheStorageNamespace.Messaging)
private readonly cacheStorage: CacheStorageService,
) {}
public async scheduleFullMessageListFetch(
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncSubStatus(
messageChannelId,
MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING,
workspaceId,
);
}
public async schedulePartialMessageListFetch(
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncSubStatus(
messageChannelId,
MessageChannelSyncSubStatus.PARTIAL_MESSAGE_LIST_FETCH_PENDING,
workspaceId,
);
}
public async scheduleMessagesImport(
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncSubStatus(
messageChannelId,
MessageChannelSyncSubStatus.MESSAGES_IMPORT_PENDING,
workspaceId,
);
}
public async resetAndScheduleFullMessageListFetch(
messageChannelId: string,
workspaceId: string,
) {
await this.cacheStorage.setPop(
`messages-to-import:${workspaceId}:gmail:${messageChannelId}`,
);
// TODO: remove nextPageToken from cache
await this.messageChannelRepository.resetSyncCursor(
messageChannelId,
workspaceId,
);
await this.scheduleFullMessageListFetch(messageChannelId, workspaceId);
}
public async markAsMessagesListFetchOngoing(
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncSubStatus(
messageChannelId,
MessageChannelSyncSubStatus.MESSAGE_LIST_FETCH_ONGOING,
workspaceId,
);
await this.messageChannelRepository.updateSyncStatus(
messageChannelId,
MessageChannelSyncStatus.ONGOING,
workspaceId,
);
}
public async markAsCompletedAndSchedulePartialMessageListFetch(
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncStatus(
messageChannelId,
MessageChannelSyncStatus.COMPLETED,
workspaceId,
);
await this.schedulePartialMessageListFetch(messageChannelId, workspaceId);
}
public async markAsMessagesImportOngoing(
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncSubStatus(
messageChannelId,
MessageChannelSyncSubStatus.MESSAGES_IMPORT_ONGOING,
workspaceId,
);
}
public async markAsFailedUnknownAndFlushMessagesToImport(
messageChannelId: string,
workspaceId: string,
) {
await this.cacheStorage.setPop(
`messages-to-import:${workspaceId}:gmail:${messageChannelId}`,
);
await this.messageChannelRepository.updateSyncSubStatus(
messageChannelId,
MessageChannelSyncSubStatus.FAILED,
workspaceId,
);
await this.messageChannelRepository.updateSyncStatus(
messageChannelId,
MessageChannelSyncStatus.FAILED_UNKNOWN,
workspaceId,
);
}
public async markAsFailedInsufficientPermissionsAndFlushMessagesToImport(
messageChannelId: string,
workspaceId: string,
) {
await this.cacheStorage.setPop(
`messages-to-import:${workspaceId}:gmail:${messageChannelId}`,
);
await this.messageChannelRepository.updateSyncSubStatus(
messageChannelId,
MessageChannelSyncSubStatus.FAILED,
workspaceId,
);
await this.messageChannelRepository.updateSyncStatus(
messageChannelId,
MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS,
workspaceId,
);
}
}

View File

@ -1,101 +0,0 @@
import { Injectable } from '@nestjs/common';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
import {
MessageChannelWorkspaceEntity,
MessageChannelSyncSubStatus,
MessageChannelSyncStatus,
} from 'src/modules/messaging/standard-objects/message-channel.workspace-entity';
@Injectable()
export class SetMessageChannelSyncStatusService {
constructor(
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
) {}
public async setMessageListFetchOnGoingStatus(
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncSubStatus(
messageChannelId,
MessageChannelSyncSubStatus.MESSAGES_LIST_FETCH_ONGOING,
workspaceId,
);
await this.messageChannelRepository.updateSyncStatus(
messageChannelId,
MessageChannelSyncStatus.ONGOING,
workspaceId,
);
}
public async setFullMessageListFetchPendingStatus(
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncSubStatus(
messageChannelId,
MessageChannelSyncSubStatus.FULL_MESSAGES_LIST_FETCH_PENDING,
workspaceId,
);
}
public async setCompletedStatus(
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncSubStatus(
messageChannelId,
MessageChannelSyncSubStatus.PARTIAL_MESSAGES_LIST_FETCH_PENDING,
workspaceId,
);
await this.messageChannelRepository.updateSyncStatus(
messageChannelId,
MessageChannelSyncStatus.COMPLETED,
workspaceId,
);
}
public async setMessagesImportPendingStatus(
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncSubStatus(
messageChannelId,
MessageChannelSyncSubStatus.MESSAGES_IMPORT_PENDING,
workspaceId,
);
}
public async setMessagesImportOnGoingStatus(
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncSubStatus(
messageChannelId,
MessageChannelSyncSubStatus.MESSAGES_IMPORT_ONGOING,
workspaceId,
);
}
public async setFailedUnkownStatus(
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncSubStatus(
messageChannelId,
MessageChannelSyncSubStatus.FAILED,
workspaceId,
);
await this.messageChannelRepository.updateSyncStatus(
messageChannelId,
MessageChannelSyncStatus.FAILED_UNKNOWN,
workspaceId,
);
}
}

View File

@ -0,0 +1,11 @@
import { Module } from '@nestjs/common';
import { AnalyticsModule } from 'src/engine/core-modules/analytics/analytics.module';
import { MessagingTelemetryService } from 'src/modules/messaging/services/telemetry/messaging-telemetry.service';
@Module({
imports: [AnalyticsModule],
providers: [MessagingTelemetryService],
exports: [MessagingTelemetryService],
})
export class MessagingTelemetryModule {}

View File

@ -0,0 +1,45 @@
import { Injectable } from '@nestjs/common';
import { AnalyticsService } from 'src/engine/core-modules/analytics/analytics.service';
type MessagingTelemetryTrackInput = {
eventName: string;
workspaceId: string;
userId?: string;
connectedAccountId?: string;
messageChannelId?: string;
message?: string;
};
@Injectable()
export class MessagingTelemetryService {
constructor(private readonly analyticsService: AnalyticsService) {}
public async track({
eventName,
workspaceId,
userId,
connectedAccountId,
messageChannelId,
message,
}: MessagingTelemetryTrackInput): Promise<void> {
await this.analyticsService.create(
{
type: 'track',
data: {
eventName: `messaging.${eventName}`,
workspaceId,
userId,
connectedAccountId,
messageChannelId,
message,
},
},
userId,
workspaceId,
'', // voluntarely not retrieving this
'', // to avoid slowing down
'',
);
}
}

View File

@ -32,9 +32,9 @@ export enum MessageChannelSyncStatus {
}
export enum MessageChannelSyncSubStatus {
FULL_MESSAGES_LIST_FETCH_PENDING = 'FULL_MESSAGES_LIST_FETCH_PENDING',
PARTIAL_MESSAGES_LIST_FETCH_PENDING = 'PARTIAL_MESSAGES_LIST_FETCH_PENDING',
MESSAGES_LIST_FETCH_ONGOING = 'MESSAGES_LIST_FETCH_ONGOING',
FULL_MESSAGE_LIST_FETCH_PENDING = 'FULL_MESSAGE_LIST_FETCH_PENDING',
PARTIAL_MESSAGE_LIST_FETCH_PENDING = 'PARTIAL_MESSAGE_LIST_FETCH_PENDING',
MESSAGE_LIST_FETCH_ONGOING = 'MESSAGE_LIST_FETCH_ONGOING',
MESSAGES_IMPORT_PENDING = 'MESSAGES_IMPORT_PENDING',
MESSAGES_IMPORT_ONGOING = 'MESSAGES_IMPORT_ONGOING',
FAILED = 'FAILED',
@ -234,19 +234,19 @@ export class MessageChannelWorkspaceEntity extends BaseWorkspaceEntity {
icon: 'IconStatusChange',
options: [
{
value: MessageChannelSyncSubStatus.FULL_MESSAGES_LIST_FETCH_PENDING,
value: MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING,
label: 'Full messages list fetch pending',
position: 0,
color: 'blue',
},
{
value: MessageChannelSyncSubStatus.PARTIAL_MESSAGES_LIST_FETCH_PENDING,
value: MessageChannelSyncSubStatus.PARTIAL_MESSAGE_LIST_FETCH_PENDING,
label: 'Partial messages list fetch pending',
position: 1,
color: 'blue',
},
{
value: MessageChannelSyncSubStatus.MESSAGES_LIST_FETCH_ONGOING,
value: MessageChannelSyncSubStatus.MESSAGE_LIST_FETCH_ONGOING,
label: 'Messages list fetch ongoing',
position: 2,
color: 'orange',
@ -270,7 +270,7 @@ export class MessageChannelWorkspaceEntity extends BaseWorkspaceEntity {
color: 'red',
},
],
defaultValue: `'${MessageChannelSyncSubStatus.FULL_MESSAGES_LIST_FETCH_PENDING}'`,
defaultValue: `'${MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING}'`,
})
syncSubStatus: MessageChannelSyncSubStatus;

View File

@ -1,11 +0,0 @@
export type GmailError = {
code: number;
errors: {
domain: string;
reason: string;
message: string;
locationType?: string;
location?: string;
}[];
message: string;
};