Fix performance issue mail (#5780)

In this PR, I'm mainly doing two things:
- uniformizing messaging-messages-import and
messaging-message-list-fetch behaviors (cron.job and job)
- improving performances of these cron.jobs by not triggering the jobs
if the stage is not relevant
- making sure these jobs have same signature (workspaceId +
messageChannelId)
This commit is contained in:
Charles Bochet
2024-06-08 11:07:36 +02:00
committed by GitHub
parent 520a883c73
commit 7f9fdf3ff6
6 changed files with 158 additions and 109 deletions

View File

@ -28,6 +28,7 @@ import {
MessageChannelWorkspaceEntity,
MessageChannelType,
MessageChannelVisibility,
MessageChannelSyncStatus,
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import {
MessagingMessageListFetchJob,
@ -114,6 +115,7 @@ export class GoogleAPIsService {
handle,
visibility:
messageVisibility || MessageChannelVisibility.SHARE_EVERYTHING,
syncStatus: MessageChannelSyncStatus.ONGOING,
},
workspaceId,
manager,
@ -150,26 +152,22 @@ export class GoogleAPIsService {
}
});
await this.enqueueSyncJobs(
newOrExistingConnectedAccountId,
workspaceId,
isCalendarEnabled,
);
}
private async enqueueSyncJobs(
connectedAccountId: string,
workspaceId: string,
isCalendarEnabled: boolean,
) {
if (this.environmentService.get('MESSAGING_PROVIDER_GMAIL_ENABLED')) {
await this.messageQueueService.add<MessagingMessageListFetchJobData>(
MessagingMessageListFetchJob.name,
{
const messageChannels =
await this.messageChannelRepository.getByConnectedAccountId(
newOrExistingConnectedAccountId,
workspaceId,
connectedAccountId,
},
);
);
for (const messageChannel of messageChannels) {
await this.messageQueueService.add<MessagingMessageListFetchJobData>(
MessagingMessageListFetchJob.name,
{
workspaceId,
messageChannelId: messageChannel.id,
},
);
}
}
if (
@ -180,7 +178,7 @@ export class GoogleAPIsService {
GoogleCalendarSyncJob.name,
{
workspaceId,
connectedAccountId,
connectedAccountId: newOrExistingConnectedAccountId,
},
{
retryLimit: 2,

View File

@ -19,7 +19,12 @@ export class MessageChannelRepository {
public async create(
messageChannel: Pick<
ObjectRecord<MessageChannelWorkspaceEntity>,
'id' | 'connectedAccountId' | 'type' | 'handle' | 'visibility'
| 'id'
| 'connectedAccountId'
| 'type'
| 'handle'
| 'visibility'
| 'syncStatus'
>,
workspaceId: string,
transactionManager?: EntityManager,
@ -28,14 +33,15 @@ export class MessageChannelRepository {
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`INSERT INTO ${dataSourceSchema}."messageChannel" ("id", "connectedAccountId", "type", "handle", "visibility")
VALUES ($1, $2, $3, $4, $5)`,
`INSERT INTO ${dataSourceSchema}."messageChannel" ("id", "connectedAccountId", "type", "handle", "visibility", "syncStatus")
VALUES ($1, $2, $3, $4, $5, $6)`,
[
messageChannel.id,
messageChannel.connectedAccountId,
messageChannel.type,
messageChannel.handle,
messageChannel.visibility,
messageChannel.syncStatus,
],
workspaceId,
transactionManager,
@ -138,6 +144,25 @@ export class MessageChannelRepository {
);
}
public async getById(
id: string,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<MessageChannelWorkspaceEntity>> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const messageChannels =
await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."messageChannel" WHERE "id" = $1`,
[id],
workspaceId,
transactionManager,
);
return messageChannels[0];
}
public async getIdsByWorkspaceMemberId(
workspaceMemberId: string,
workspaceId: string,

View File

@ -12,7 +12,10 @@ import { MessageQueueService } from 'src/engine/integrations/message-queue/servi
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { EnvironmentService } from 'src/engine/integrations/environment/environment.service';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import {
MessageChannelSyncStage,
MessageChannelWorkspaceEntity,
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import {
MessagingMessageListFetchJobData,
MessagingMessageListFetchJob,
@ -59,35 +62,26 @@ export class MessagingMessageListFetchCronJob
);
for (const workspaceId of workspaceIdsWithDataSources) {
await this.enqueueSyncs(workspaceId);
}
}
private async enqueueSyncs(workspaceId: string): Promise<void> {
try {
const messageChannels =
await this.messageChannelRepository.getAll(workspaceId);
for (const messageChannel of messageChannels) {
if (!messageChannel?.isSyncEnabled) {
continue;
if (
(messageChannel.isSyncEnabled &&
messageChannel.syncStage ===
MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING) ||
messageChannel.syncStage ===
MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING
) {
await this.messageQueueService.add<MessagingMessageListFetchJobData>(
MessagingMessageListFetchJob.name,
{
workspaceId,
messageChannelId: messageChannel.id,
},
);
}
await this.messageQueueService.add<MessagingMessageListFetchJobData>(
MessagingMessageListFetchJob.name,
{
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
},
);
}
} catch (error) {
this.logger.error(
`Error while fetching workspace messages for workspace ${workspaceId}`,
);
this.logger.error(error);
return;
}
}
}

View File

@ -1,4 +1,4 @@
import { Inject, Injectable, Logger } from '@nestjs/common';
import { Inject, Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository, In } from 'typeorm';
@ -14,13 +14,17 @@ import {
MessagingMessagesImportJobData,
MessagingMessagesImportJob,
} from 'src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import {
MessageChannelSyncStage,
MessageChannelWorkspaceEntity,
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
@Injectable()
export class MessagingMessagesImportCronJob
implements MessageQueueJob<undefined>
{
private readonly logger = new Logger(MessagingMessagesImportCronJob.name);
constructor(
@InjectRepository(Workspace, 'core')
private readonly workspaceRepository: Repository<Workspace>,
@ -29,6 +33,8 @@ export class MessagingMessagesImportCronJob
private readonly environmentService: EnvironmentService,
@Inject(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
) {}
async handle(): Promise<void> {
@ -54,12 +60,24 @@ export class MessagingMessagesImportCronJob
);
for (const workspaceId of workspaceIdsWithDataSources) {
await this.messageQueueService.add<MessagingMessagesImportJobData>(
MessagingMessagesImportJob.name,
{
workspaceId,
},
);
const messageChannels =
await this.messageChannelRepository.getAll(workspaceId);
for (const messageChannel of messageChannels) {
if (
messageChannel.isSyncEnabled &&
messageChannel.syncStage ===
MessageChannelSyncStage.MESSAGES_IMPORT_PENDING
) {
await this.messageQueueService.add<MessagingMessagesImportJobData>(
MessagingMessagesImportJob.name,
{
workspaceId,
messageChannelId: messageChannel.id,
},
);
}
}
}
}
}

View File

@ -16,8 +16,8 @@ import { MessagingGmailPartialMessageListFetchService } from 'src/modules/messag
import { isThrottled } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/is-throttled';
export type MessagingMessageListFetchJobData = {
messageChannelId: string;
workspaceId: string;
connectedAccountId: string;
};
@Injectable()
@ -37,42 +37,36 @@ export class MessagingMessageListFetchJob
) {}
async handle(data: MessagingMessageListFetchJobData): Promise<void> {
const { workspaceId, connectedAccountId } = data;
const { messageChannelId, workspaceId } = data;
await this.messagingTelemetryService.track({
eventName: 'message_list_fetch_job.triggered',
messageChannelId,
workspaceId,
connectedAccountId,
});
const connectedAccount = await this.connectedAccountRepository.getById(
connectedAccountId,
const messageChannel = await this.messageChannelRepository.getById(
messageChannelId,
workspaceId,
);
if (!connectedAccount) {
if (!messageChannel) {
await this.messagingTelemetryService.track({
eventName: 'message_list_fetch_job.error.connected_account_not_found',
eventName: 'message_list_fetch_job.error.message_channel_not_found',
messageChannelId,
workspaceId,
connectedAccountId,
});
return;
}
const messageChannel =
await this.messageChannelRepository.getFirstByConnectedAccountId(
connectedAccountId,
const connectedAccount =
await this.connectedAccountRepository.getByIdOrFail(
messageChannel.connectedAccountId,
workspaceId,
);
if (!messageChannel) {
await this.messagingTelemetryService.track({
eventName: 'message_list_fetch_job.error.message_channel_not_found',
workspaceId,
connectedAccountId,
});
if (!messageChannel?.isSyncEnabled) {
return;
}
@ -88,13 +82,13 @@ export class MessagingMessageListFetchJob
switch (messageChannel.syncStage) {
case MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING:
this.logger.log(
`Fetching partial message list for workspace ${workspaceId} and account ${connectedAccount.id}`,
`Fetching partial message list for workspace ${workspaceId} and messageChannelId ${messageChannel.id}`,
);
await this.messagingTelemetryService.track({
eventName: 'partial_message_list_fetch.started',
workspaceId,
connectedAccountId,
connectedAccountId: connectedAccount.id,
messageChannelId: messageChannel.id,
});
@ -107,7 +101,7 @@ export class MessagingMessageListFetchJob
await this.messagingTelemetryService.track({
eventName: 'partial_message_list_fetch.completed',
workspaceId,
connectedAccountId,
connectedAccountId: connectedAccount.id,
messageChannelId: messageChannel.id,
});
@ -121,7 +115,7 @@ export class MessagingMessageListFetchJob
await this.messagingTelemetryService.track({
eventName: 'full_message_list_fetch.started',
workspaceId,
connectedAccountId,
connectedAccountId: connectedAccount.id,
messageChannelId: messageChannel.id,
});
@ -134,7 +128,7 @@ export class MessagingMessageListFetchJob
await this.messagingTelemetryService.track({
eventName: 'full_message_list_fetch.completed',
workspaceId,
connectedAccountId,
connectedAccountId: connectedAccount.id,
messageChannelId: messageChannel.id,
});

View File

@ -7,11 +7,15 @@ import { ConnectedAccountRepository } from 'src/modules/connected-account/reposi
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import { MessagingTelemetryService } from 'src/modules/messaging/common/services/messaging-telemetry.service';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import {
MessageChannelSyncStage,
MessageChannelWorkspaceEntity,
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MessagingGmailMessagesImportService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-messages-import.service';
import { isThrottled } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/is-throttled';
export type MessagingMessagesImportJobData = {
messageChannelId: string;
workspaceId: string;
};
@ -29,43 +33,59 @@ export class MessagingMessagesImportJob
) {}
async handle(data: MessagingMessagesImportJobData): Promise<void> {
const { workspaceId } = data;
const { messageChannelId, workspaceId } = data;
const messageChannels =
await this.messageChannelRepository.getAll(workspaceId);
await this.messagingTelemetryService.track({
eventName: 'messages_import.triggered',
workspaceId,
messageChannelId,
});
for (const messageChannel of messageChannels) {
if (!messageChannel?.isSyncEnabled) {
continue;
}
const messageChannel = await this.messageChannelRepository.getById(
messageChannelId,
workspaceId,
);
if (!messageChannel) {
await this.messagingTelemetryService.track({
eventName: 'messages_import.triggered',
eventName: 'messages_import.error.message_channel_not_found',
messageChannelId,
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
messageChannelId: messageChannel.id,
});
if (
isThrottled(
messageChannel.syncStageStartedAt,
messageChannel.throttleFailureCount,
)
) {
continue;
}
const connectedAccount =
await this.connectedAccountRepository.getConnectedAccountOrThrow(
workspaceId,
messageChannel.connectedAccountId,
);
await this.gmailFetchMessageContentFromCacheService.processMessageBatchImport(
messageChannel,
connectedAccount,
workspaceId,
);
return;
}
const connectedAccount =
await this.connectedAccountRepository.getConnectedAccountOrThrow(
workspaceId,
messageChannel.connectedAccountId,
);
if (!messageChannel?.isSyncEnabled) {
return;
}
if (
isThrottled(
messageChannel.syncStageStartedAt,
messageChannel.throttleFailureCount,
)
) {
return;
}
if (
messageChannel.syncStage !==
MessageChannelSyncStage.MESSAGES_IMPORT_PENDING
) {
return;
}
await this.gmailFetchMessageContentFromCacheService.processMessageBatchImport(
messageChannel,
connectedAccount,
workspaceId,
);
}
}