5898 Create a cron to monitor messageChannelSyncStatus (#5933)
Closes #5898
This commit is contained in:
@ -5,7 +5,7 @@ import { EnvironmentService } from 'src/engine/integrations/environment/environm
|
|||||||
|
|
||||||
type MessagingTelemetryTrackInput = {
|
type MessagingTelemetryTrackInput = {
|
||||||
eventName: string;
|
eventName: string;
|
||||||
workspaceId: string;
|
workspaceId?: string;
|
||||||
userId?: string;
|
userId?: string;
|
||||||
connectedAccountId?: string;
|
connectedAccountId?: string;
|
||||||
messageChannelId?: string;
|
messageChannelId?: string;
|
||||||
|
|||||||
@ -4,6 +4,7 @@ import { MessagingBlocklistManagerModule } from 'src/modules/messaging/blocklist
|
|||||||
import { MessagingMessageCleanerModule } from 'src/modules/messaging/message-cleaner/messaging-message-cleaner.module';
|
import { MessagingMessageCleanerModule } from 'src/modules/messaging/message-cleaner/messaging-message-cleaner.module';
|
||||||
import { MessagingImportManagerModule } from 'src/modules/messaging/message-import-manager/messaging-import-manager.module';
|
import { MessagingImportManagerModule } from 'src/modules/messaging/message-import-manager/messaging-import-manager.module';
|
||||||
import { MessaginParticipantsManagerModule } from 'src/modules/messaging/message-participants-manager/messaging-participants-manager.module';
|
import { MessaginParticipantsManagerModule } from 'src/modules/messaging/message-participants-manager/messaging-participants-manager.module';
|
||||||
|
import { MessagingMonitoringModule } from 'src/modules/messaging/monitoring/messaging-monitoring.module';
|
||||||
|
|
||||||
@Module({
|
@Module({
|
||||||
imports: [
|
imports: [
|
||||||
@ -11,6 +12,7 @@ import { MessaginParticipantsManagerModule } from 'src/modules/messaging/message
|
|||||||
MessagingMessageCleanerModule,
|
MessagingMessageCleanerModule,
|
||||||
MessaginParticipantsManagerModule,
|
MessaginParticipantsManagerModule,
|
||||||
MessagingBlocklistManagerModule,
|
MessagingBlocklistManagerModule,
|
||||||
|
MessagingMonitoringModule,
|
||||||
],
|
],
|
||||||
providers: [],
|
providers: [],
|
||||||
exports: [],
|
exports: [],
|
||||||
|
|||||||
@ -0,0 +1,36 @@
|
|||||||
|
import { Command, CommandRunner } from 'nest-commander';
|
||||||
|
|
||||||
|
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
|
||||||
|
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
|
||||||
|
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
|
||||||
|
import { MessagingMessageChannelSyncStatusMonitoringCronJob } from 'src/modules/messaging/monitoring/crons/jobs/messaging-message-channel-sync-status-monitoring.cron';
|
||||||
|
|
||||||
|
const MESSAGING_MESSAGE_CHANNEL_SYNC_STATUS_MONITORING_CRON_PATTERN =
|
||||||
|
'0 * * * *';
|
||||||
|
|
||||||
|
@Command({
|
||||||
|
name: 'cron:messaging:monitoring:message-channel-sync-status',
|
||||||
|
description:
|
||||||
|
'Starts a cron job to monitor the sync status of message channels',
|
||||||
|
})
|
||||||
|
export class MessagingMessageChannelSyncStatusMonitoringCronCommand extends CommandRunner {
|
||||||
|
constructor(
|
||||||
|
@InjectMessageQueue(MessageQueue.cronQueue)
|
||||||
|
private readonly messageQueueService: MessageQueueService,
|
||||||
|
) {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
async run(): Promise<void> {
|
||||||
|
await this.messageQueueService.addCron<undefined>(
|
||||||
|
MessagingMessageChannelSyncStatusMonitoringCronJob.name,
|
||||||
|
undefined,
|
||||||
|
{
|
||||||
|
repeat: {
|
||||||
|
pattern:
|
||||||
|
MESSAGING_MESSAGE_CHANNEL_SYNC_STATUS_MONITORING_CRON_PATTERN,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,82 @@
|
|||||||
|
import { Logger } from '@nestjs/common';
|
||||||
|
import { InjectRepository } from '@nestjs/typeorm';
|
||||||
|
|
||||||
|
import { Repository, In } from 'typeorm';
|
||||||
|
import snakeCase from 'lodash.snakecase';
|
||||||
|
|
||||||
|
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
|
||||||
|
import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity';
|
||||||
|
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
|
||||||
|
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
|
||||||
|
import { EnvironmentService } from 'src/engine/integrations/environment/environment.service';
|
||||||
|
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
|
||||||
|
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
|
||||||
|
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
|
||||||
|
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
|
||||||
|
import { MessagingTelemetryService } from 'src/modules/messaging/common/services/messaging-telemetry.service';
|
||||||
|
|
||||||
|
@Processor(MessageQueue.cronQueue)
|
||||||
|
export class MessagingMessageChannelSyncStatusMonitoringCronJob {
|
||||||
|
private readonly logger = new Logger(
|
||||||
|
MessagingMessageChannelSyncStatusMonitoringCronJob.name,
|
||||||
|
);
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
@InjectRepository(Workspace, 'core')
|
||||||
|
private readonly workspaceRepository: Repository<Workspace>,
|
||||||
|
@InjectRepository(DataSourceEntity, 'metadata')
|
||||||
|
private readonly dataSourceRepository: Repository<DataSourceEntity>,
|
||||||
|
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
|
||||||
|
private readonly messageChannelRepository: MessageChannelRepository,
|
||||||
|
private readonly environmentService: EnvironmentService,
|
||||||
|
private readonly messagingTelemetryService: MessagingTelemetryService,
|
||||||
|
) {}
|
||||||
|
|
||||||
|
@Process(MessagingMessageChannelSyncStatusMonitoringCronJob.name)
|
||||||
|
async handle(): Promise<void> {
|
||||||
|
this.logger.log('Starting message channel sync status monitoring...');
|
||||||
|
|
||||||
|
await this.messagingTelemetryService.track({
|
||||||
|
eventName: 'message_channel.monitoring.sync_status.start',
|
||||||
|
message: 'Starting message channel sync status monitoring',
|
||||||
|
});
|
||||||
|
|
||||||
|
const workspaceIds = (
|
||||||
|
await this.workspaceRepository.find({
|
||||||
|
where: this.environmentService.get('IS_BILLING_ENABLED')
|
||||||
|
? {
|
||||||
|
subscriptionStatus: In(['active', 'trialing', 'past_due']),
|
||||||
|
}
|
||||||
|
: {},
|
||||||
|
select: ['id'],
|
||||||
|
})
|
||||||
|
).map((workspace) => workspace.id);
|
||||||
|
|
||||||
|
const dataSources = await this.dataSourceRepository.find({
|
||||||
|
where: {
|
||||||
|
workspaceId: In(workspaceIds),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const workspaceIdsWithDataSources = new Set(
|
||||||
|
dataSources.map((dataSource) => dataSource.workspaceId),
|
||||||
|
);
|
||||||
|
|
||||||
|
for (const workspaceId of workspaceIdsWithDataSources) {
|
||||||
|
const messageChannels =
|
||||||
|
await this.messageChannelRepository.getAll(workspaceId);
|
||||||
|
|
||||||
|
for (const messageChannel of messageChannels) {
|
||||||
|
await this.messagingTelemetryService.track({
|
||||||
|
eventName: `message_channel.monitoring.sync_status.${snakeCase(
|
||||||
|
messageChannel.syncStatus,
|
||||||
|
)}`,
|
||||||
|
workspaceId,
|
||||||
|
connectedAccountId: messageChannel.connectedAccountId,
|
||||||
|
messageChannelId: messageChannel.id,
|
||||||
|
message: messageChannel.syncStatus,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,22 @@
|
|||||||
|
import { Module } from '@nestjs/common';
|
||||||
|
import { TypeOrmModule } from '@nestjs/typeorm';
|
||||||
|
|
||||||
|
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
|
||||||
|
import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity';
|
||||||
|
import { MessagingCommonModule } from 'src/modules/messaging/common/messaging-common.module';
|
||||||
|
import { MessagingMessageChannelSyncStatusMonitoringCronCommand } from 'src/modules/messaging/monitoring/crons/commands/messaging-message-channel-sync-status-monitoring.cron.command';
|
||||||
|
import { MessagingMessageChannelSyncStatusMonitoringCronJob } from 'src/modules/messaging/monitoring/crons/jobs/messaging-message-channel-sync-status-monitoring.cron';
|
||||||
|
|
||||||
|
@Module({
|
||||||
|
imports: [
|
||||||
|
MessagingCommonModule,
|
||||||
|
TypeOrmModule.forFeature([Workspace], 'core'),
|
||||||
|
TypeOrmModule.forFeature([DataSourceEntity], 'metadata'),
|
||||||
|
],
|
||||||
|
providers: [
|
||||||
|
MessagingMessageChannelSyncStatusMonitoringCronCommand,
|
||||||
|
MessagingMessageChannelSyncStatusMonitoringCronJob,
|
||||||
|
],
|
||||||
|
exports: [],
|
||||||
|
})
|
||||||
|
export class MessagingMonitoringModule {}
|
||||||
Reference in New Issue
Block a user