diff --git a/packages/twenty-server/src/modules/messaging/common/services/messaging-telemetry.service.ts b/packages/twenty-server/src/modules/messaging/common/services/messaging-telemetry.service.ts index ae9cec0fa..0234f7fbc 100644 --- a/packages/twenty-server/src/modules/messaging/common/services/messaging-telemetry.service.ts +++ b/packages/twenty-server/src/modules/messaging/common/services/messaging-telemetry.service.ts @@ -5,7 +5,7 @@ import { EnvironmentService } from 'src/engine/integrations/environment/environm type MessagingTelemetryTrackInput = { eventName: string; - workspaceId: string; + workspaceId?: string; userId?: string; connectedAccountId?: string; messageChannelId?: string; diff --git a/packages/twenty-server/src/modules/messaging/messaging.module.ts b/packages/twenty-server/src/modules/messaging/messaging.module.ts index 0797e3853..5e2afc037 100644 --- a/packages/twenty-server/src/modules/messaging/messaging.module.ts +++ b/packages/twenty-server/src/modules/messaging/messaging.module.ts @@ -4,6 +4,7 @@ import { MessagingBlocklistManagerModule } from 'src/modules/messaging/blocklist import { MessagingMessageCleanerModule } from 'src/modules/messaging/message-cleaner/messaging-message-cleaner.module'; import { MessagingImportManagerModule } from 'src/modules/messaging/message-import-manager/messaging-import-manager.module'; import { MessaginParticipantsManagerModule } from 'src/modules/messaging/message-participants-manager/messaging-participants-manager.module'; +import { MessagingMonitoringModule } from 'src/modules/messaging/monitoring/messaging-monitoring.module'; @Module({ imports: [ @@ -11,6 +12,7 @@ import { MessaginParticipantsManagerModule } from 'src/modules/messaging/message MessagingMessageCleanerModule, MessaginParticipantsManagerModule, MessagingBlocklistManagerModule, + MessagingMonitoringModule, ], providers: [], exports: [], diff --git a/packages/twenty-server/src/modules/messaging/monitoring/crons/commands/messaging-message-channel-sync-status-monitoring.cron.command.ts b/packages/twenty-server/src/modules/messaging/monitoring/crons/commands/messaging-message-channel-sync-status-monitoring.cron.command.ts new file mode 100644 index 000000000..689678562 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/monitoring/crons/commands/messaging-message-channel-sync-status-monitoring.cron.command.ts @@ -0,0 +1,36 @@ +import { Command, CommandRunner } from 'nest-commander'; + +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { MessagingMessageChannelSyncStatusMonitoringCronJob } from 'src/modules/messaging/monitoring/crons/jobs/messaging-message-channel-sync-status-monitoring.cron'; + +const MESSAGING_MESSAGE_CHANNEL_SYNC_STATUS_MONITORING_CRON_PATTERN = + '0 * * * *'; + +@Command({ + name: 'cron:messaging:monitoring:message-channel-sync-status', + description: + 'Starts a cron job to monitor the sync status of message channels', +}) +export class MessagingMessageChannelSyncStatusMonitoringCronCommand extends CommandRunner { + constructor( + @InjectMessageQueue(MessageQueue.cronQueue) + private readonly messageQueueService: MessageQueueService, + ) { + super(); + } + + async run(): Promise { + await this.messageQueueService.addCron( + MessagingMessageChannelSyncStatusMonitoringCronJob.name, + undefined, + { + repeat: { + pattern: + MESSAGING_MESSAGE_CHANNEL_SYNC_STATUS_MONITORING_CRON_PATTERN, + }, + }, + ); + } +} diff --git a/packages/twenty-server/src/modules/messaging/monitoring/crons/jobs/messaging-message-channel-sync-status-monitoring.cron.ts b/packages/twenty-server/src/modules/messaging/monitoring/crons/jobs/messaging-message-channel-sync-status-monitoring.cron.ts new file mode 100644 index 000000000..ee420467c --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/monitoring/crons/jobs/messaging-message-channel-sync-status-monitoring.cron.ts @@ -0,0 +1,82 @@ +import { Logger } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; + +import { Repository, In } from 'typeorm'; +import snakeCase from 'lodash.snakecase'; + +import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; +import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; +import { EnvironmentService } from 'src/engine/integrations/environment/environment.service'; +import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; +import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; +import { MessagingTelemetryService } from 'src/modules/messaging/common/services/messaging-telemetry.service'; + +@Processor(MessageQueue.cronQueue) +export class MessagingMessageChannelSyncStatusMonitoringCronJob { + private readonly logger = new Logger( + MessagingMessageChannelSyncStatusMonitoringCronJob.name, + ); + + constructor( + @InjectRepository(Workspace, 'core') + private readonly workspaceRepository: Repository, + @InjectRepository(DataSourceEntity, 'metadata') + private readonly dataSourceRepository: Repository, + @InjectObjectMetadataRepository(MessageChannelWorkspaceEntity) + private readonly messageChannelRepository: MessageChannelRepository, + private readonly environmentService: EnvironmentService, + private readonly messagingTelemetryService: MessagingTelemetryService, + ) {} + + @Process(MessagingMessageChannelSyncStatusMonitoringCronJob.name) + async handle(): Promise { + this.logger.log('Starting message channel sync status monitoring...'); + + await this.messagingTelemetryService.track({ + eventName: 'message_channel.monitoring.sync_status.start', + message: 'Starting message channel sync status monitoring', + }); + + const workspaceIds = ( + await this.workspaceRepository.find({ + where: this.environmentService.get('IS_BILLING_ENABLED') + ? { + subscriptionStatus: In(['active', 'trialing', 'past_due']), + } + : {}, + select: ['id'], + }) + ).map((workspace) => workspace.id); + + const dataSources = await this.dataSourceRepository.find({ + where: { + workspaceId: In(workspaceIds), + }, + }); + + const workspaceIdsWithDataSources = new Set( + dataSources.map((dataSource) => dataSource.workspaceId), + ); + + for (const workspaceId of workspaceIdsWithDataSources) { + const messageChannels = + await this.messageChannelRepository.getAll(workspaceId); + + for (const messageChannel of messageChannels) { + await this.messagingTelemetryService.track({ + eventName: `message_channel.monitoring.sync_status.${snakeCase( + messageChannel.syncStatus, + )}`, + workspaceId, + connectedAccountId: messageChannel.connectedAccountId, + messageChannelId: messageChannel.id, + message: messageChannel.syncStatus, + }); + } + } + } +} diff --git a/packages/twenty-server/src/modules/messaging/monitoring/messaging-monitoring.module.ts b/packages/twenty-server/src/modules/messaging/monitoring/messaging-monitoring.module.ts new file mode 100644 index 000000000..2d6c04c75 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/monitoring/messaging-monitoring.module.ts @@ -0,0 +1,22 @@ +import { Module } from '@nestjs/common'; +import { TypeOrmModule } from '@nestjs/typeorm'; + +import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; +import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity'; +import { MessagingCommonModule } from 'src/modules/messaging/common/messaging-common.module'; +import { MessagingMessageChannelSyncStatusMonitoringCronCommand } from 'src/modules/messaging/monitoring/crons/commands/messaging-message-channel-sync-status-monitoring.cron.command'; +import { MessagingMessageChannelSyncStatusMonitoringCronJob } from 'src/modules/messaging/monitoring/crons/jobs/messaging-message-channel-sync-status-monitoring.cron'; + +@Module({ + imports: [ + MessagingCommonModule, + TypeOrmModule.forFeature([Workspace], 'core'), + TypeOrmModule.forFeature([DataSourceEntity], 'metadata'), + ], + providers: [ + MessagingMessageChannelSyncStatusMonitoringCronCommand, + MessagingMessageChannelSyncStatusMonitoringCronJob, + ], + exports: [], +}) +export class MessagingMonitoringModule {}