Fix onboarding status performance issues (#6512)

Updated the onboardingStatus computation to improve performances

---------

Co-authored-by: Charles Bochet <charles@twenty.com>
This commit is contained in:
martmull
2024-08-04 00:33:33 +02:00
committed by GitHub
parent e01d3fd0be
commit 7cd5427589
40 changed files with 757 additions and 767 deletions

View File

@ -1,9 +1,15 @@
import { Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository, In } from 'typeorm';
import { Repository } from 'typeorm';
import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity';
import {
Workspace,
WorkspaceActivationStatus,
} from 'src/engine/core-modules/workspace/workspace.entity';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
@ -13,46 +19,35 @@ import {
MessageChannelWorkspaceEntity,
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import {
MessagingMessageListFetchJobData,
MessagingMessageListFetchJob,
MessagingMessageListFetchJobData,
} from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
import { BillingService } from 'src/engine/core-modules/billing/billing.service';
@Processor(MessageQueue.cronQueue)
export class MessagingMessageListFetchCronJob {
private readonly logger = new Logger(MessagingMessageListFetchCronJob.name);
constructor(
@InjectRepository(DataSourceEntity, 'metadata')
private readonly dataSourceRepository: Repository<DataSourceEntity>,
@InjectRepository(Workspace, 'core')
private readonly workspaceRepository: Repository<Workspace>,
@InjectMessageQueue(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
private readonly billingService: BillingService,
) {}
@Process(MessagingMessageListFetchCronJob.name)
async handle(): Promise<void> {
const workspaceIds =
await this.billingService.getActiveSubscriptionWorkspaceIds();
const dataSources = await this.dataSourceRepository.find({
const activeWorkspaces = await this.workspaceRepository.find({
where: {
workspaceId: In(workspaceIds),
activationStatus: WorkspaceActivationStatus.ACTIVE,
},
});
const workspaceIdsWithDataSources = new Set(
dataSources.map((dataSource) => dataSource.workspaceId),
);
for (const workspaceId of workspaceIdsWithDataSources) {
const messageChannels =
await this.messageChannelRepository.getAll(workspaceId);
for (const activeWorkspace of activeWorkspaces) {
const messageChannels = await this.messageChannelRepository.getAll(
activeWorkspace.id,
);
for (const messageChannel of messageChannels) {
if (
@ -65,7 +60,7 @@ export class MessagingMessageListFetchCronJob {
await this.messageQueueService.add<MessagingMessageListFetchJobData>(
MessagingMessageListFetchJob.name,
{
workspaceId,
workspaceId: activeWorkspace.id,
messageChannelId: messageChannel.id,
},
);

View File

@ -1,58 +1,53 @@
import { Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository, In } from 'typeorm';
import { Repository } from 'typeorm';
import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity';
import {
Workspace,
WorkspaceActivationStatus,
} from 'src/engine/core-modules/workspace/workspace.entity';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import {
MessagingMessagesImportJobData,
MessagingMessagesImportJob,
} from 'src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import {
MessageChannelSyncStage,
MessageChannelWorkspaceEntity,
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { BillingService } from 'src/engine/core-modules/billing/billing.service';
import {
MessagingMessagesImportJob,
MessagingMessagesImportJobData,
} from 'src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job';
@Processor(MessageQueue.cronQueue)
export class MessagingMessagesImportCronJob {
private readonly logger = new Logger(MessagingMessagesImportCronJob.name);
constructor(
@InjectRepository(DataSourceEntity, 'metadata')
private readonly dataSourceRepository: Repository<DataSourceEntity>,
@InjectRepository(Workspace, 'core')
private readonly workspaceRepository: Repository<Workspace>,
@InjectMessageQueue(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
private readonly billingService: BillingService,
) {}
@Process(MessagingMessagesImportCronJob.name)
async handle(): Promise<void> {
const workspaceIds =
await this.billingService.getActiveSubscriptionWorkspaceIds();
const dataSources = await this.dataSourceRepository.find({
const activeWorkspaces = await this.workspaceRepository.find({
where: {
workspaceId: In(workspaceIds),
activationStatus: WorkspaceActivationStatus.ACTIVE,
},
});
const workspaceIdsWithDataSources = new Set(
dataSources.map((dataSource) => dataSource.workspaceId),
);
for (const workspaceId of workspaceIdsWithDataSources) {
const messageChannels =
await this.messageChannelRepository.getAll(workspaceId);
for (const activeWorkspace of activeWorkspaces) {
const messageChannels = await this.messageChannelRepository.getAll(
activeWorkspace.id,
);
for (const messageChannel of messageChannels) {
if (
@ -63,7 +58,7 @@ export class MessagingMessagesImportCronJob {
await this.messageQueueService.add<MessagingMessagesImportJobData>(
MessagingMessagesImportJob.name,
{
workspaceId,
workspaceId: activeWorkspace.id,
messageChannelId: messageChannel.id,
},
);

View File

@ -1,49 +1,43 @@
import { InjectRepository } from '@nestjs/typeorm';
import { Repository, In } from 'typeorm';
import { Repository } from 'typeorm';
import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
import {
Workspace,
WorkspaceActivationStatus,
} from 'src/engine/core-modules/workspace/workspace.entity';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import {
MessagingOngoingStaleJobData,
MessagingOngoingStaleJob,
MessagingOngoingStaleJobData,
} from 'src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job';
import { BillingService } from 'src/engine/core-modules/billing/billing.service';
@Processor(MessageQueue.cronQueue)
export class MessagingOngoingStaleCronJob {
constructor(
@InjectRepository(DataSourceEntity, 'metadata')
private readonly dataSourceRepository: Repository<DataSourceEntity>,
@InjectRepository(Workspace, 'core')
private readonly workspaceRepository: Repository<Workspace>,
@InjectMessageQueue(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
private readonly billingService: BillingService,
) {}
@Process(MessagingOngoingStaleCronJob.name)
async handle(): Promise<void> {
const workspaceIds =
await this.billingService.getActiveSubscriptionWorkspaceIds();
const dataSources = await this.dataSourceRepository.find({
const activeWorkspaces = await this.workspaceRepository.find({
where: {
workspaceId: In(workspaceIds),
activationStatus: WorkspaceActivationStatus.ACTIVE,
},
});
const workspaceIdsWithDataSources = new Set(
dataSources.map((dataSource) => dataSource.workspaceId),
);
for (const workspaceId of workspaceIdsWithDataSources) {
for (const activeWorkspace of activeWorkspaces) {
await this.messageQueueService.add<MessagingOngoingStaleJobData>(
MessagingOngoingStaleJob.name,
{
workspaceId,
workspaceId: activeWorkspace.id,
},
);
}

View File

@ -2,14 +2,15 @@ import { Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import snakeCase from 'lodash.snakecase';
import { In, Repository } from 'typeorm';
import { Repository } from 'typeorm';
import { BillingService } from 'src/engine/core-modules/billing/billing.service';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import {
Workspace,
WorkspaceActivationStatus,
} from 'src/engine/core-modules/workspace/workspace.entity';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
@ -24,11 +25,8 @@ export class MessagingMessageChannelSyncStatusMonitoringCronJob {
constructor(
@InjectRepository(Workspace, 'core')
private readonly workspaceRepository: Repository<Workspace>,
@InjectRepository(DataSourceEntity, 'metadata')
private readonly dataSourceRepository: Repository<DataSourceEntity>,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
private readonly billingService: BillingService,
private readonly messagingTelemetryService: MessagingTelemetryService,
) {}
@ -41,22 +39,16 @@ export class MessagingMessageChannelSyncStatusMonitoringCronJob {
message: 'Starting message channel sync status monitoring',
});
const workspaceIds =
await this.billingService.getActiveSubscriptionWorkspaceIds();
const dataSources = await this.dataSourceRepository.find({
const activeWorkspaces = await this.workspaceRepository.find({
where: {
workspaceId: In(workspaceIds),
activationStatus: WorkspaceActivationStatus.ACTIVE,
},
});
const workspaceIdsWithDataSources = new Set(
dataSources.map((dataSource) => dataSource.workspaceId),
);
for (const workspaceId of workspaceIdsWithDataSources) {
const messageChannels =
await this.messageChannelRepository.getAll(workspaceId);
for (const activeWorkspace of activeWorkspaces) {
const messageChannels = await this.messageChannelRepository.getAll(
activeWorkspace.id,
);
for (const messageChannel of messageChannels) {
if (!messageChannel.syncStatus) {
@ -66,7 +58,7 @@ export class MessagingMessageChannelSyncStatusMonitoringCronJob {
eventName: `message_channel.monitoring.sync_status.${snakeCase(
messageChannel.syncStatus,
)}`,
workspaceId,
workspaceId: activeWorkspace.id,
connectedAccountId: messageChannel.connectedAccountId,
messageChannelId: messageChannel.id,
message: messageChannel.syncStatus,