add invalid captcha and messageChannel sync status health monitoring (#10029)
Context : We want to implement some counters to monitor server health. First counters will track : messageChannel sync status during job execution and invalid captcha. How : Counters are stored in cache and grouped by one-minute windows. Controllers are created for each metric, aggregating counter over a five-minutes window. Endpoints are public and will be queried by Prometheus. closes https://github.com/twentyhq/core-team-issues/issues/55
This commit is contained in:
@ -30,6 +30,7 @@ import { FeatureFlag } from 'src/engine/core-modules/feature-flag/feature-flag.e
|
||||
import { FeatureFlagModule } from 'src/engine/core-modules/feature-flag/feature-flag.module';
|
||||
import { FileUploadModule } from 'src/engine/core-modules/file/file-upload/file-upload.module';
|
||||
import { GuardRedirectModule } from 'src/engine/core-modules/guard-redirect/guard-redirect.module';
|
||||
import { HealthModule } from 'src/engine/core-modules/health/health.module';
|
||||
import { JwtModule } from 'src/engine/core-modules/jwt/jwt.module';
|
||||
import { KeyValuePair } from 'src/engine/core-modules/key-value-pair/key-value-pair.entity';
|
||||
import { OnboardingModule } from 'src/engine/core-modules/onboarding/onboarding.module';
|
||||
@ -87,6 +88,7 @@ import { JwtAuthStrategy } from './strategies/jwt.auth.strategy';
|
||||
WorkspaceInvitationModule,
|
||||
EmailVerificationModule,
|
||||
GuardRedirectModule,
|
||||
HealthModule,
|
||||
],
|
||||
controllers: [
|
||||
GoogleAuthController,
|
||||
|
||||
@ -2,4 +2,5 @@ export enum CacheStorageNamespace {
|
||||
ModuleMessaging = 'module:messaging',
|
||||
ModuleCalendar = 'module:calendar',
|
||||
EngineWorkspace = 'engine:workspace',
|
||||
EngineHealth = 'engine:health',
|
||||
}
|
||||
|
||||
@ -7,10 +7,14 @@ import {
|
||||
import { GqlExecutionContext } from '@nestjs/graphql';
|
||||
|
||||
import { CaptchaService } from 'src/engine/core-modules/captcha/captcha.service';
|
||||
import { HealthCacheService } from 'src/engine/core-modules/health/health-cache.service';
|
||||
|
||||
@Injectable()
|
||||
export class CaptchaGuard implements CanActivate {
|
||||
constructor(private captchaService: CaptchaService) {}
|
||||
constructor(
|
||||
private captchaService: CaptchaService,
|
||||
private healthCacheService: HealthCacheService,
|
||||
) {}
|
||||
|
||||
async canActivate(context: ExecutionContext): Promise<boolean> {
|
||||
const ctx = GqlExecutionContext.create(context);
|
||||
@ -19,10 +23,14 @@ export class CaptchaGuard implements CanActivate {
|
||||
|
||||
const result = await this.captchaService.validate(token || '');
|
||||
|
||||
if (result.success) return true;
|
||||
else
|
||||
if (result.success) {
|
||||
return true;
|
||||
} else {
|
||||
await this.healthCacheService.incrementInvalidCaptchaCounter();
|
||||
|
||||
throw new BadRequestException(
|
||||
'Invalid Captcha, please try another device',
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1036,6 +1036,15 @@ export class EnvironmentVariables {
|
||||
@IsString()
|
||||
@IsOptional()
|
||||
ENTERPRISE_KEY: string;
|
||||
|
||||
@EnvironmentVariablesMetadata({
|
||||
group: EnvironmentVariablesGroup.Other,
|
||||
description: 'Health monitoring time window in minutes',
|
||||
})
|
||||
@IsNumber()
|
||||
@CastToPositiveNumber()
|
||||
@IsOptional()
|
||||
HEALTH_MONITORING_TIME_WINDOW_IN_MINUTES = 5;
|
||||
}
|
||||
|
||||
export const validate = (
|
||||
|
||||
@ -0,0 +1,133 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
|
||||
import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decorators/cache-storage.decorator';
|
||||
import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service';
|
||||
import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum';
|
||||
import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service';
|
||||
import { HealthCounterCacheKeys } from 'src/engine/core-modules/health/types/health-counter-cache-keys.type';
|
||||
import { MessageChannelSyncJobByStatusCounter } from 'src/engine/core-modules/health/types/health-metrics.types';
|
||||
import { MessageChannelSyncStatus } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
|
||||
|
||||
@Injectable()
|
||||
export class HealthCacheService {
|
||||
private readonly healthMonitoringTimeWindowInMinutes: number;
|
||||
private readonly healthCacheTtl: number;
|
||||
|
||||
constructor(
|
||||
@InjectCacheStorage(CacheStorageNamespace.EngineHealth)
|
||||
private readonly cacheStorage: CacheStorageService,
|
||||
private readonly environmentService: EnvironmentService,
|
||||
) {
|
||||
this.healthMonitoringTimeWindowInMinutes = this.environmentService.get(
|
||||
'HEALTH_MONITORING_TIME_WINDOW_IN_MINUTES',
|
||||
);
|
||||
this.healthCacheTtl = this.healthMonitoringTimeWindowInMinutes * 60000 * 2;
|
||||
}
|
||||
|
||||
private getCacheKeyWithTimestamp(key: string, timestamp?: number): string {
|
||||
const minuteTimestamp = timestamp ?? Math.floor(Date.now() / 60000) * 60000;
|
||||
|
||||
return `${key}:${minuteTimestamp}`;
|
||||
}
|
||||
|
||||
private getLastXMinutesTimestamps(minutes: number): number[] {
|
||||
const currentMinuteTimestamp = Math.floor(Date.now() / 60000) * 60000;
|
||||
|
||||
return Array.from(
|
||||
{ length: minutes },
|
||||
(_, i) => currentMinuteTimestamp - i * 60000,
|
||||
);
|
||||
}
|
||||
|
||||
async incrementMessageChannelSyncJobByStatusCounter(
|
||||
status: MessageChannelSyncStatus,
|
||||
increment: number,
|
||||
) {
|
||||
const cacheKey = this.getCacheKeyWithTimestamp(
|
||||
HealthCounterCacheKeys.MessageChannelSyncJobByStatus,
|
||||
);
|
||||
|
||||
const currentCounter =
|
||||
await this.cacheStorage.get<MessageChannelSyncJobByStatusCounter>(
|
||||
cacheKey,
|
||||
);
|
||||
|
||||
const updatedCounter = {
|
||||
...(currentCounter || {}),
|
||||
[status]: (currentCounter?.[status] || 0) + increment,
|
||||
};
|
||||
|
||||
return await this.cacheStorage.set(
|
||||
cacheKey,
|
||||
updatedCounter,
|
||||
this.healthCacheTtl,
|
||||
);
|
||||
}
|
||||
|
||||
async getMessageChannelSyncJobByStatusCounter() {
|
||||
const cacheKeys = this.getLastXMinutesTimestamps(
|
||||
this.healthMonitoringTimeWindowInMinutes,
|
||||
).map((timestamp) =>
|
||||
this.getCacheKeyWithTimestamp(
|
||||
HealthCounterCacheKeys.MessageChannelSyncJobByStatus,
|
||||
timestamp,
|
||||
),
|
||||
);
|
||||
|
||||
const aggregatedCounter = Object.fromEntries(
|
||||
Object.values(MessageChannelSyncStatus).map((status) => [status, 0]),
|
||||
);
|
||||
|
||||
for (const key of cacheKeys) {
|
||||
const counter =
|
||||
await this.cacheStorage.get<MessageChannelSyncJobByStatusCounter>(key);
|
||||
|
||||
if (!counter) continue;
|
||||
|
||||
for (const [status, count] of Object.entries(counter) as [
|
||||
MessageChannelSyncStatus,
|
||||
number,
|
||||
][]) {
|
||||
aggregatedCounter[status] += count;
|
||||
}
|
||||
}
|
||||
|
||||
return aggregatedCounter;
|
||||
}
|
||||
|
||||
async incrementInvalidCaptchaCounter() {
|
||||
const cacheKey = this.getCacheKeyWithTimestamp(
|
||||
HealthCounterCacheKeys.InvalidCaptcha,
|
||||
);
|
||||
|
||||
const currentCounter = await this.cacheStorage.get<number>(cacheKey);
|
||||
const updatedCounter = (currentCounter || 0) + 1;
|
||||
|
||||
return await this.cacheStorage.set(
|
||||
cacheKey,
|
||||
updatedCounter,
|
||||
this.healthCacheTtl,
|
||||
);
|
||||
}
|
||||
|
||||
async getInvalidCaptchaCounter() {
|
||||
const cacheKeys = this.getLastXMinutesTimestamps(
|
||||
this.healthMonitoringTimeWindowInMinutes,
|
||||
).map((timestamp) =>
|
||||
this.getCacheKeyWithTimestamp(
|
||||
HealthCounterCacheKeys.InvalidCaptcha,
|
||||
timestamp,
|
||||
),
|
||||
);
|
||||
|
||||
let aggregatedCounter = 0;
|
||||
|
||||
for (const key of cacheKeys) {
|
||||
const counter = await this.cacheStorage.get<number>(key);
|
||||
|
||||
aggregatedCounter += counter || 0;
|
||||
}
|
||||
|
||||
return aggregatedCounter;
|
||||
}
|
||||
}
|
||||
@ -1,6 +1,7 @@
|
||||
import { HealthCheckService, HttpHealthIndicator } from '@nestjs/terminus';
|
||||
import { Test, TestingModule } from '@nestjs/testing';
|
||||
|
||||
import { HealthCacheService } from 'src/engine/core-modules/health/health-cache.service';
|
||||
import { HealthController } from 'src/engine/core-modules/health/health.controller';
|
||||
|
||||
describe('HealthController', () => {
|
||||
@ -19,6 +20,10 @@ describe('HealthController', () => {
|
||||
provide: HttpHealthIndicator,
|
||||
useValue: {},
|
||||
},
|
||||
{
|
||||
provide: HealthCacheService,
|
||||
useValue: {},
|
||||
},
|
||||
],
|
||||
}).compile();
|
||||
|
||||
|
||||
@ -1,13 +1,28 @@
|
||||
import { Controller, Get } from '@nestjs/common';
|
||||
import { HealthCheck, HealthCheckService } from '@nestjs/terminus';
|
||||
|
||||
import { HealthCacheService } from 'src/engine/core-modules/health/health-cache.service';
|
||||
|
||||
@Controller('healthz')
|
||||
export class HealthController {
|
||||
constructor(private health: HealthCheckService) {}
|
||||
constructor(
|
||||
private health: HealthCheckService,
|
||||
private healthCacheService: HealthCacheService,
|
||||
) {}
|
||||
|
||||
@Get()
|
||||
@HealthCheck()
|
||||
check() {
|
||||
return this.health.check([]);
|
||||
}
|
||||
|
||||
@Get('/message-channel-sync-job-by-status-counter')
|
||||
getMessageChannelSyncJobByStatusCounter() {
|
||||
return this.healthCacheService.getMessageChannelSyncJobByStatusCounter();
|
||||
}
|
||||
|
||||
@Get('/invalid-captcha-counter')
|
||||
getInvalidCaptchaCounter() {
|
||||
return this.healthCacheService.getInvalidCaptchaCounter();
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,10 +1,13 @@
|
||||
import { Module } from '@nestjs/common';
|
||||
import { TerminusModule } from '@nestjs/terminus';
|
||||
|
||||
import { HealthCacheService } from 'src/engine/core-modules/health/health-cache.service';
|
||||
import { HealthController } from 'src/engine/core-modules/health/health.controller';
|
||||
|
||||
@Module({
|
||||
imports: [TerminusModule],
|
||||
controllers: [HealthController],
|
||||
providers: [HealthCacheService],
|
||||
exports: [HealthCacheService],
|
||||
})
|
||||
export class HealthModule {}
|
||||
|
||||
@ -0,0 +1,4 @@
|
||||
export enum HealthCounterCacheKeys {
|
||||
MessageChannelSyncJobByStatus = 'message-channel-sync-job-by-status',
|
||||
InvalidCaptcha = 'invalid-captcha',
|
||||
}
|
||||
@ -0,0 +1,5 @@
|
||||
import { MessageChannelSyncStatus } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
|
||||
|
||||
export type MessageChannelSyncJobByStatusCounter = {
|
||||
[key in MessageChannelSyncStatus]?: number;
|
||||
};
|
||||
@ -2,6 +2,7 @@ import { Module } from '@nestjs/common';
|
||||
import { TypeOrmModule } from '@nestjs/typeorm';
|
||||
|
||||
import { FeatureFlag } from 'src/engine/core-modules/feature-flag/feature-flag.entity';
|
||||
import { HealthModule } from 'src/engine/core-modules/health/health.module';
|
||||
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
|
||||
import { ConnectedAccountModule } from 'src/modules/connected-account/connected-account.module';
|
||||
import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service';
|
||||
@ -11,6 +12,7 @@ import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/se
|
||||
WorkspaceDataSourceModule,
|
||||
TypeOrmModule.forFeature([FeatureFlag], 'core'),
|
||||
ConnectedAccountModule,
|
||||
HealthModule,
|
||||
],
|
||||
providers: [MessageChannelSyncStatusService],
|
||||
exports: [MessageChannelSyncStatusService],
|
||||
|
||||
@ -5,6 +5,7 @@ import { Any } from 'typeorm';
|
||||
import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decorators/cache-storage.decorator';
|
||||
import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service';
|
||||
import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum';
|
||||
import { HealthCacheService } from 'src/engine/core-modules/health/health-cache.service';
|
||||
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
|
||||
import { AccountsToReconnectService } from 'src/modules/connected-account/services/accounts-to-reconnect.service';
|
||||
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
|
||||
@ -22,6 +23,7 @@ export class MessageChannelSyncStatusService {
|
||||
private readonly cacheStorage: CacheStorageService,
|
||||
private readonly twentyORMManager: TwentyORMManager,
|
||||
private readonly accountsToReconnectService: AccountsToReconnectService,
|
||||
private readonly healthCacheService: HealthCacheService,
|
||||
) {}
|
||||
|
||||
public async scheduleFullMessageListFetch(messageChannelIds: string[]) {
|
||||
@ -127,6 +129,11 @@ export class MessageChannelSyncStatusService {
|
||||
syncStatus: MessageChannelSyncStatus.ONGOING,
|
||||
syncStageStartedAt: new Date().toISOString(),
|
||||
});
|
||||
|
||||
await this.healthCacheService.incrementMessageChannelSyncJobByStatusCounter(
|
||||
MessageChannelSyncStatus.ONGOING,
|
||||
messageChannelIds.length,
|
||||
);
|
||||
}
|
||||
|
||||
public async markAsCompletedAndSchedulePartialMessageListFetch(
|
||||
@ -148,6 +155,11 @@ export class MessageChannelSyncStatusService {
|
||||
syncStageStartedAt: null,
|
||||
syncedAt: new Date().toISOString(),
|
||||
});
|
||||
|
||||
await this.healthCacheService.incrementMessageChannelSyncJobByStatusCounter(
|
||||
MessageChannelSyncStatus.ACTIVE,
|
||||
messageChannelIds.length,
|
||||
);
|
||||
}
|
||||
|
||||
public async markAsMessagesImportOngoing(messageChannelIds: string[]) {
|
||||
@ -189,6 +201,11 @@ export class MessageChannelSyncStatusService {
|
||||
syncStage: MessageChannelSyncStage.FAILED,
|
||||
syncStatus: MessageChannelSyncStatus.FAILED_UNKNOWN,
|
||||
});
|
||||
|
||||
await this.healthCacheService.incrementMessageChannelSyncJobByStatusCounter(
|
||||
MessageChannelSyncStatus.FAILED_UNKNOWN,
|
||||
messageChannelIds.length,
|
||||
);
|
||||
}
|
||||
|
||||
public async markAsFailedInsufficientPermissionsAndFlushMessagesToImport(
|
||||
@ -215,6 +232,11 @@ export class MessageChannelSyncStatusService {
|
||||
syncStatus: MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS,
|
||||
});
|
||||
|
||||
await this.healthCacheService.incrementMessageChannelSyncJobByStatusCounter(
|
||||
MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS,
|
||||
messageChannelIds.length,
|
||||
);
|
||||
|
||||
const connectedAccountRepository =
|
||||
await this.twentyORMManager.getRepository<ConnectedAccountWorkspaceEntity>(
|
||||
'connectedAccount',
|
||||
|
||||
Reference in New Issue
Block a user