Health monitor status for admin panel (#10186)

# Health Monitoring for Self-Hosted Instances

This PR implements basic health monitoring for self-hosted instances in
the admin panel.

## Service Status Checks
We're adding real-time health checks for:
- Redis Connection
- Database Connection
- Worker Status
- Message Sync Status

## Existing Functionality
We already have message sync and captcha counters that store aggregated
metrics in cache within a configurable time window (default: 5 minutes).

## New Endpoints
1. `/healthz` - Basic server health check for Kubernetes pod monitoring
2. `/healthz/{serviceName}` - Individual service health checks (returns
200 if healthy)
3. `/metricsz/{metricName}` - Time-windowed metrics (message sync,
captcha)
4. GraphQL resolver in admin panel for UI consumption

All endpoints use the same underlying service, with different
presentation layers for infrastructure and UI needs.

---------

Co-authored-by: Félix Malfait <felix@twenty.com>
This commit is contained in:
nitin
2025-02-18 20:22:19 +05:30
committed by GitHub
parent 2fca60436b
commit d6655a2c3b
54 changed files with 2307 additions and 95 deletions

View File

@ -0,0 +1,12 @@
export const HEALTH_ERROR_MESSAGES = {
NO_ACTIVE_WORKERS: 'No active workers found',
WORKER_TIMEOUT: 'Worker check timeout',
DATABASE_TIMEOUT: 'Database timeout',
REDIS_TIMEOUT: 'Redis timeout',
DATABASE_CONNECTION_FAILED: 'Database connection failed',
REDIS_CONNECTION_FAILED: 'Unknown Redis error',
WORKER_CHECK_FAILED: 'Worker check failed',
MESSAGE_SYNC_TIMEOUT: 'Message sync check timeout',
MESSAGE_SYNC_CHECK_FAILED: 'Message sync check failed',
MESSAGE_SYNC_HIGH_FAILURE_RATE: 'High failure rate in message sync jobs',
} as const;

View File

@ -0,0 +1 @@
export const HEALTH_INDICATORS_TIMEOUT = 3000;

View File

@ -0,0 +1,41 @@
import { HealthCheckService } from '@nestjs/terminus';
import { Test, TestingModule } from '@nestjs/testing';
import { HealthController } from 'src/engine/core-modules/health/controllers/health.controller';
import { DatabaseHealthIndicator } from 'src/engine/core-modules/health/indicators/database.health';
import { RedisHealthIndicator } from 'src/engine/core-modules/health/indicators/redis.health';
import { WorkerHealthIndicator } from 'src/engine/core-modules/health/indicators/worker.health';
describe('HealthController', () => {
let healthController: HealthController;
beforeEach(async () => {
const testingModule: TestingModule = await Test.createTestingModule({
controllers: [HealthController],
providers: [
{
provide: HealthCheckService,
useValue: { check: jest.fn() },
},
{
provide: DatabaseHealthIndicator,
useValue: { isHealthy: jest.fn() },
},
{
provide: RedisHealthIndicator,
useValue: { isHealthy: jest.fn() },
},
{
provide: WorkerHealthIndicator,
useValue: { isHealthy: jest.fn() },
},
],
}).compile();
healthController = testingModule.get<HealthController>(HealthController);
});
it('should be defined', () => {
expect(healthController).toBeDefined();
});
});

View File

@ -0,0 +1,29 @@
import { Test, TestingModule } from '@nestjs/testing';
import { MetricsController } from 'src/engine/core-modules/health/controllers/metrics.controller';
import { HealthCacheService } from 'src/engine/core-modules/health/health-cache.service';
describe('MetricsController', () => {
let metricsController: MetricsController;
beforeEach(async () => {
const testingModule: TestingModule = await Test.createTestingModule({
controllers: [MetricsController],
providers: [
{
provide: HealthCacheService,
useValue: {
getMessageChannelSyncJobByStatusCounter: jest.fn(),
getInvalidCaptchaCounter: jest.fn(),
},
},
],
}).compile();
metricsController = testingModule.get<MetricsController>(MetricsController);
});
it('should be defined', () => {
expect(metricsController).toBeDefined();
});
});

View File

@ -0,0 +1,39 @@
import { BadRequestException, Controller, Get, Param } from '@nestjs/common';
import { HealthCheck, HealthCheckService } from '@nestjs/terminus';
import { HealthServiceName } from 'src/engine/core-modules/health/enums/health-service-name.enum';
import { DatabaseHealthIndicator } from 'src/engine/core-modules/health/indicators/database.health';
import { RedisHealthIndicator } from 'src/engine/core-modules/health/indicators/redis.health';
import { WorkerHealthIndicator } from 'src/engine/core-modules/health/indicators/worker.health';
@Controller('healthz')
export class HealthController {
constructor(
private readonly health: HealthCheckService,
private readonly databaseHealth: DatabaseHealthIndicator,
private readonly redisHealth: RedisHealthIndicator,
private readonly workerHealth: WorkerHealthIndicator,
) {}
@Get()
@HealthCheck()
check() {
return this.health.check([]);
}
@Get('/:serviceName')
@HealthCheck()
checkService(@Param('serviceName') serviceName: HealthServiceName) {
const checks = {
[HealthServiceName.DATABASE]: () => this.databaseHealth.isHealthy(),
[HealthServiceName.REDIS]: () => this.redisHealth.isHealthy(),
[HealthServiceName.WORKER]: () => this.workerHealth.isHealthy(),
};
if (!(serviceName in checks)) {
throw new BadRequestException(`Invalid service name: ${serviceName}`);
}
return this.health.check([checks[serviceName]]);
}
}

View File

@ -1,20 +1,10 @@
import { Controller, Get } from '@nestjs/common';
import { HealthCheck, HealthCheckService } from '@nestjs/terminus';
import { HealthCacheService } from 'src/engine/core-modules/health/health-cache.service';
@Controller('healthz')
export class HealthController {
constructor(
private health: HealthCheckService,
private healthCacheService: HealthCacheService,
) {}
@Get()
@HealthCheck()
check() {
return this.health.check([]);
}
@Controller('metricsz')
export class MetricsController {
constructor(private readonly healthCacheService: HealthCacheService) {}
@Get('/message-channel-sync-job-by-status-counter')
getMessageChannelSyncJobByStatusCounter() {

View File

@ -0,0 +1,6 @@
export enum HealthServiceName {
DATABASE = 'database',
REDIS = 'redis',
WORKER = 'worker',
MESSAGE_SYNC = 'messageSync',
}

View File

@ -5,7 +5,7 @@ import { CacheStorageService } from 'src/engine/core-modules/cache-storage/servi
import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum';
import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service';
import { HealthCounterCacheKeys } from 'src/engine/core-modules/health/types/health-counter-cache-keys.type';
import { MessageChannelSyncJobByStatusCounter } from 'src/engine/core-modules/health/types/health-metrics.types';
import { MessageChannelSyncJobByStatusCounter } from 'src/engine/core-modules/health/types/message-sync-metrics.types';
import { MessageChannelSyncStatus } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
@Injectable()

View File

@ -1,36 +0,0 @@
import { HealthCheckService, HttpHealthIndicator } from '@nestjs/terminus';
import { Test, TestingModule } from '@nestjs/testing';
import { HealthCacheService } from 'src/engine/core-modules/health/health-cache.service';
import { HealthController } from 'src/engine/core-modules/health/health.controller';
describe('HealthController', () => {
let healthController: HealthController;
let testingModule: TestingModule;
beforeEach(async () => {
testingModule = await Test.createTestingModule({
providers: [
HealthController,
{
provide: HealthCheckService,
useValue: {},
},
{
provide: HttpHealthIndicator,
useValue: {},
},
{
provide: HealthCacheService,
useValue: {},
},
],
}).compile();
healthController = testingModule.get<HealthController>(HealthController);
});
it('should be defined', () => {
expect(healthController).toBeDefined();
});
});

View File

@ -1,13 +1,32 @@
import { Module } from '@nestjs/common';
import { TerminusModule } from '@nestjs/terminus';
import { HealthCacheService } from 'src/engine/core-modules/health/health-cache.service';
import { HealthController } from 'src/engine/core-modules/health/health.controller';
import { HealthController } from 'src/engine/core-modules/health/controllers/health.controller';
import { MetricsController } from 'src/engine/core-modules/health/controllers/metrics.controller';
import { MessageSyncHealthIndicator } from 'src/engine/core-modules/health/indicators/message-sync.health';
import { RedisClientModule } from 'src/engine/core-modules/redis-client/redis-client.module';
import { HealthCacheService } from './health-cache.service';
import { DatabaseHealthIndicator } from './indicators/database.health';
import { RedisHealthIndicator } from './indicators/redis.health';
import { WorkerHealthIndicator } from './indicators/worker.health';
@Module({
imports: [TerminusModule],
controllers: [HealthController],
providers: [HealthCacheService],
exports: [HealthCacheService],
imports: [TerminusModule, RedisClientModule],
controllers: [HealthController, MetricsController],
providers: [
HealthCacheService,
DatabaseHealthIndicator,
RedisHealthIndicator,
WorkerHealthIndicator,
MessageSyncHealthIndicator,
],
exports: [
HealthCacheService,
DatabaseHealthIndicator,
RedisHealthIndicator,
WorkerHealthIndicator,
MessageSyncHealthIndicator,
],
})
export class HealthModule {}

View File

@ -0,0 +1,116 @@
import { HealthIndicatorService } from '@nestjs/terminus';
import { Test, TestingModule } from '@nestjs/testing';
import { DataSource } from 'typeorm';
import { HEALTH_ERROR_MESSAGES } from 'src/engine/core-modules/health/constants/health-error-messages.constants';
import { HEALTH_INDICATORS_TIMEOUT } from 'src/engine/core-modules/health/constants/health-indicators-timeout.conts';
import { DatabaseHealthIndicator } from 'src/engine/core-modules/health/indicators/database.health';
describe('DatabaseHealthIndicator', () => {
let service: DatabaseHealthIndicator;
let dataSource: jest.Mocked<DataSource>;
let healthIndicatorService: jest.Mocked<HealthIndicatorService>;
beforeEach(async () => {
dataSource = {
query: jest.fn(),
} as any;
healthIndicatorService = {
check: jest.fn().mockReturnValue({
up: jest.fn().mockImplementation((data) => ({
database: { status: 'up', ...data },
})),
down: jest.fn().mockImplementation((error) => ({
database: {
status: 'down',
error,
},
})),
}),
} as any;
const module: TestingModule = await Test.createTestingModule({
providers: [
DatabaseHealthIndicator,
{
provide: 'coreDataSource',
useValue: dataSource,
},
{
provide: HealthIndicatorService,
useValue: healthIndicatorService,
},
],
}).compile();
service = module.get<DatabaseHealthIndicator>(DatabaseHealthIndicator);
jest.useFakeTimers();
});
afterEach(() => {
jest.useRealTimers();
});
it('should be defined', () => {
expect(service).toBeDefined();
});
it('should return up status with details when database responds', async () => {
const mockResponses = [
[{ version: 'PostgreSQL 15.6' }],
[{ count: '5' }],
[{ max_connections: '100' }],
[{ uptime: '3600' }],
[{ size: '1 GB' }],
[{ table_stats: [] }],
[{ ratio: '95.5' }],
[{ deadlocks: '0' }],
[{ count: '0' }],
];
mockResponses.forEach((response) => {
dataSource.query.mockResolvedValueOnce(response);
});
const result = await service.isHealthy();
expect(result.database.status).toBe('up');
expect(result.database.details).toBeDefined();
expect(result.database.details.version).toBeDefined();
expect(result.database.details.connections).toBeDefined();
expect(result.database.details.performance).toBeDefined();
});
it('should return down status when database fails', async () => {
dataSource.query.mockRejectedValueOnce(
new Error(HEALTH_ERROR_MESSAGES.DATABASE_CONNECTION_FAILED),
);
const result = await service.isHealthy();
expect(result.database.status).toBe('down');
expect(result.database.error).toBe(
HEALTH_ERROR_MESSAGES.DATABASE_CONNECTION_FAILED,
);
});
it('should timeout after specified duration', async () => {
dataSource.query.mockImplementationOnce(
() =>
new Promise((resolve) =>
setTimeout(resolve, HEALTH_INDICATORS_TIMEOUT + 100),
),
);
const healthCheckPromise = service.isHealthy();
jest.advanceTimersByTime(HEALTH_INDICATORS_TIMEOUT + 1);
const result = await healthCheckPromise;
expect(result.database.status).toBe('down');
expect(result.database.error).toBe(HEALTH_ERROR_MESSAGES.DATABASE_TIMEOUT);
});
});

View File

@ -0,0 +1,137 @@
import { HealthIndicatorService } from '@nestjs/terminus';
import { Test, TestingModule } from '@nestjs/testing';
import { HEALTH_ERROR_MESSAGES } from 'src/engine/core-modules/health/constants/health-error-messages.constants';
import { HEALTH_INDICATORS_TIMEOUT } from 'src/engine/core-modules/health/constants/health-indicators-timeout.conts';
import { HealthCacheService } from 'src/engine/core-modules/health/health-cache.service';
import { MessageSyncHealthIndicator } from 'src/engine/core-modules/health/indicators/message-sync.health';
import { MessageChannelSyncStatus } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
describe('MessageSyncHealthIndicator', () => {
let service: MessageSyncHealthIndicator;
let healthCacheService: jest.Mocked<HealthCacheService>;
let healthIndicatorService: jest.Mocked<HealthIndicatorService>;
beforeEach(async () => {
healthCacheService = {
getMessageChannelSyncJobByStatusCounter: jest.fn(),
} as any;
healthIndicatorService = {
check: jest.fn().mockReturnValue({
up: jest.fn().mockImplementation((data) => ({
messageSync: { status: 'up', ...data },
})),
down: jest.fn().mockImplementation((error) => ({
messageSync: { status: 'down', error },
})),
}),
} as any;
const module: TestingModule = await Test.createTestingModule({
providers: [
MessageSyncHealthIndicator,
{
provide: HealthCacheService,
useValue: healthCacheService,
},
{
provide: HealthIndicatorService,
useValue: healthIndicatorService,
},
],
}).compile();
service = module.get<MessageSyncHealthIndicator>(
MessageSyncHealthIndicator,
);
jest.useFakeTimers();
});
afterEach(() => {
jest.useRealTimers();
});
it('should be defined', () => {
expect(service).toBeDefined();
});
it('should return up status when no jobs are present', async () => {
healthCacheService.getMessageChannelSyncJobByStatusCounter.mockResolvedValue(
{
[MessageChannelSyncStatus.NOT_SYNCED]: 0,
[MessageChannelSyncStatus.ONGOING]: 0,
[MessageChannelSyncStatus.ACTIVE]: 0,
[MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS]: 0,
[MessageChannelSyncStatus.FAILED_UNKNOWN]: 0,
},
);
const result = await service.isHealthy();
expect(result.messageSync.status).toBe('up');
expect(result.messageSync.details.totalJobs).toBe(0);
expect(result.messageSync.details.failedJobs).toBe(0);
expect(result.messageSync.details.failureRate).toBe(0);
});
it('should return up status when failure rate is below 20%', async () => {
healthCacheService.getMessageChannelSyncJobByStatusCounter.mockResolvedValue(
{
[MessageChannelSyncStatus.NOT_SYNCED]: 0,
[MessageChannelSyncStatus.ONGOING]: 2,
[MessageChannelSyncStatus.ACTIVE]: 8,
[MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS]: 0,
[MessageChannelSyncStatus.FAILED_UNKNOWN]: 1,
},
);
const result = await service.isHealthy();
expect(result.messageSync.status).toBe('up');
expect(result.messageSync.details.totalJobs).toBe(11);
expect(result.messageSync.details.failedJobs).toBe(1);
expect(result.messageSync.details.failureRate).toBe(9.09);
});
it('should return down status when failure rate is above 20%', async () => {
healthCacheService.getMessageChannelSyncJobByStatusCounter.mockResolvedValue(
{
[MessageChannelSyncStatus.NOT_SYNCED]: 0,
[MessageChannelSyncStatus.ONGOING]: 1,
[MessageChannelSyncStatus.ACTIVE]: 1,
[MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS]: 2,
[MessageChannelSyncStatus.FAILED_UNKNOWN]: 2,
},
);
const result = await service.isHealthy();
expect(result.messageSync.status).toBe('down');
expect(result.messageSync.error.error).toBe(
HEALTH_ERROR_MESSAGES.MESSAGE_SYNC_HIGH_FAILURE_RATE,
);
expect(result.messageSync.error.details).toBeDefined();
expect(result.messageSync.error.details.failureRate).toBe(33.33);
});
it('should timeout after specified duration', async () => {
healthCacheService.getMessageChannelSyncJobByStatusCounter.mockImplementationOnce(
() =>
new Promise((resolve) =>
setTimeout(resolve, HEALTH_INDICATORS_TIMEOUT + 100),
),
);
const healthCheckPromise = service.isHealthy();
jest.advanceTimersByTime(HEALTH_INDICATORS_TIMEOUT + 1);
const result = await healthCheckPromise;
expect(result.messageSync.status).toBe('down');
expect(result.messageSync.error).toBe(
HEALTH_ERROR_MESSAGES.MESSAGE_SYNC_TIMEOUT,
);
});
});

View File

@ -0,0 +1,133 @@
import { HealthIndicatorService } from '@nestjs/terminus';
import { Test, TestingModule } from '@nestjs/testing';
import { Redis } from 'ioredis';
import { HEALTH_ERROR_MESSAGES } from 'src/engine/core-modules/health/constants/health-error-messages.constants';
import { HEALTH_INDICATORS_TIMEOUT } from 'src/engine/core-modules/health/constants/health-indicators-timeout.conts';
import { RedisHealthIndicator } from 'src/engine/core-modules/health/indicators/redis.health';
import { RedisClientService } from 'src/engine/core-modules/redis-client/redis-client.service';
describe('RedisHealthIndicator', () => {
let service: RedisHealthIndicator;
let mockRedis: jest.Mocked<
Pick<Redis, 'ping' | 'info' | 'dbsize' | 'memory'>
>;
let healthIndicatorService: jest.Mocked<HealthIndicatorService>;
beforeEach(async () => {
mockRedis = {
ping: jest.fn(),
info: jest.fn(),
dbsize: jest.fn(),
memory: jest.fn(),
};
const mockRedisService = {
getClient: () => mockRedis,
} as unknown as RedisClientService;
healthIndicatorService = {
check: jest.fn().mockReturnValue({
up: jest.fn().mockImplementation((data) => ({
redis: { status: 'up', ...data },
})),
down: jest.fn().mockImplementation((error) => ({
redis: {
status: 'down',
error,
},
})),
}),
} as any;
const module: TestingModule = await Test.createTestingModule({
providers: [
RedisHealthIndicator,
{
provide: RedisClientService,
useValue: mockRedisService,
},
{
provide: HealthIndicatorService,
useValue: healthIndicatorService,
},
],
}).compile();
service = module.get<RedisHealthIndicator>(RedisHealthIndicator);
jest.useFakeTimers();
});
afterEach(() => {
jest.useRealTimers();
});
it('should be defined', () => {
expect(service).toBeDefined();
});
it('should return up status with details when redis responds', async () => {
// ai generated mock
mockRedis.info
.mockResolvedValueOnce('redis_version:7.0.0\r\n')
.mockResolvedValueOnce(
'used_memory_human:1.2G\nused_memory_peak_human:1.5G\nmem_fragmentation_ratio:1.5\n',
)
.mockResolvedValueOnce('connected_clients:5\n')
.mockResolvedValueOnce(
'total_connections_received:100\nkeyspace_hits:90\nkeyspace_misses:10\n',
);
const result = await service.isHealthy();
expect(result.redis.status).toBe('up');
expect(result.redis.details).toBeDefined();
expect(result.redis.details.version).toBe('7.0.0');
});
it('should return down status when redis fails', async () => {
mockRedis.ping.mockRejectedValueOnce(
new Error(HEALTH_ERROR_MESSAGES.REDIS_CONNECTION_FAILED),
);
const result = await service.isHealthy();
expect(result.redis.status).toBe('down');
expect(result.redis.error).toBe(
HEALTH_ERROR_MESSAGES.REDIS_CONNECTION_FAILED,
);
});
it('should timeout after specified duration', async () => {
mockRedis.ping.mockImplementationOnce(
() =>
new Promise((resolve) =>
setTimeout(resolve, HEALTH_INDICATORS_TIMEOUT + 100),
),
);
const healthCheckPromise = service.isHealthy();
jest.advanceTimersByTime(HEALTH_INDICATORS_TIMEOUT + 1);
const result = await healthCheckPromise;
expect(result.redis.status).toBe('down');
expect(result.redis.error).toBe(HEALTH_ERROR_MESSAGES.REDIS_TIMEOUT);
});
it('should handle partial failures in health details collection', async () => {
mockRedis.info
.mockResolvedValueOnce('redis_version:7.0.0') // info
.mockResolvedValueOnce('used_memory_human:1.2G') // memory
.mockResolvedValueOnce('connected_clients:5') // clients
.mockResolvedValueOnce('total_connections_received:100'); // stats
const result = await service.isHealthy();
expect(result.redis.status).toBe('up');
expect(result.redis.details).toBeDefined();
expect(result.redis.details.version).toBe('7.0.0');
});
});

View File

@ -0,0 +1,136 @@
import { HealthIndicatorService } from '@nestjs/terminus';
import { Test, TestingModule } from '@nestjs/testing';
import { Redis } from 'ioredis';
import { HEALTH_ERROR_MESSAGES } from 'src/engine/core-modules/health/constants/health-error-messages.constants';
import { HEALTH_INDICATORS_TIMEOUT } from 'src/engine/core-modules/health/constants/health-indicators-timeout.conts';
import { WorkerHealthIndicator } from 'src/engine/core-modules/health/indicators/worker.health';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { RedisClientService } from 'src/engine/core-modules/redis-client/redis-client.service';
const mockQueueInstance = {
getWorkers: jest.fn().mockResolvedValue([]),
close: jest.fn().mockResolvedValue(undefined),
getFailedCount: jest.fn().mockResolvedValue(0),
getCompletedCount: jest.fn().mockResolvedValue(0),
getWaitingCount: jest.fn().mockResolvedValue(0),
getActiveCount: jest.fn().mockResolvedValue(0),
getDelayedCount: jest.fn().mockResolvedValue(0),
getPrioritizedCount: jest.fn().mockResolvedValue(0),
};
jest.mock('bullmq', () => ({
Queue: jest.fn(() => mockQueueInstance),
}));
describe('WorkerHealthIndicator', () => {
let service: WorkerHealthIndicator;
let mockRedis: jest.Mocked<Pick<Redis, 'ping'>>;
let healthIndicatorService: jest.Mocked<HealthIndicatorService>;
beforeEach(async () => {
mockRedis = {
ping: jest.fn(),
};
const mockRedisService = {
getClient: () => mockRedis,
} as unknown as RedisClientService;
healthIndicatorService = {
check: jest.fn().mockReturnValue({
up: jest.fn().mockImplementation((data) => ({
worker: { status: 'up', ...data },
})),
down: jest.fn().mockImplementation((error) => ({
worker: { status: 'down', error },
})),
}),
} as any;
const module: TestingModule = await Test.createTestingModule({
providers: [
WorkerHealthIndicator,
{
provide: RedisClientService,
useValue: mockRedisService,
},
{
provide: HealthIndicatorService,
useValue: healthIndicatorService,
},
],
}).compile();
service = module.get<WorkerHealthIndicator>(WorkerHealthIndicator);
jest.useFakeTimers();
});
afterEach(() => {
jest.useRealTimers();
});
it('should be defined', () => {
expect(service).toBeDefined();
});
it('should return up status when workers are active', async () => {
mockQueueInstance.getWorkers.mockResolvedValue([{ id: 'worker1' }]);
const result = await service.isHealthy();
expect(result.worker.status).toBe('up');
expect('queues' in result.worker).toBe(true);
if ('queues' in result.worker) {
expect(result.worker.queues.length).toBeGreaterThan(0);
}
});
it('should return down status when no workers are active', async () => {
mockQueueInstance.getWorkers.mockResolvedValue([]);
const result = await service.isHealthy();
expect(result.worker.status).toBe('down');
expect('error' in result.worker).toBe(true);
if ('error' in result.worker) {
expect(result.worker.error).toBe(HEALTH_ERROR_MESSAGES.NO_ACTIVE_WORKERS);
}
});
it('should timeout after specified duration', async () => {
jest.useFakeTimers();
mockQueueInstance.getWorkers.mockImplementationOnce(
() =>
new Promise((resolve) =>
setTimeout(resolve, HEALTH_INDICATORS_TIMEOUT + 100),
),
);
const resultPromise = service.isHealthy();
jest.advanceTimersByTime(HEALTH_INDICATORS_TIMEOUT + 200);
const result = await resultPromise;
expect(result.worker.status).toBe('down');
expect('error' in result.worker).toBe(true);
if ('error' in result.worker) {
expect(result.worker.error).toBe(HEALTH_ERROR_MESSAGES.WORKER_TIMEOUT);
}
jest.useRealTimers();
});
it('should check all message queues', async () => {
mockQueueInstance.getWorkers.mockResolvedValue([{ id: 'worker1' }]);
await service.isHealthy();
expect(mockQueueInstance.getWorkers).toHaveBeenCalledTimes(
Object.keys(MessageQueue).length,
);
expect(mockQueueInstance.close).toHaveBeenCalledTimes(
Object.keys(MessageQueue).length,
);
});
});

View File

@ -0,0 +1,103 @@
import { Injectable } from '@nestjs/common';
import {
HealthIndicatorResult,
HealthIndicatorService,
} from '@nestjs/terminus';
import { InjectDataSource } from '@nestjs/typeorm';
import { DataSource } from 'typeorm';
import { HEALTH_ERROR_MESSAGES } from 'src/engine/core-modules/health/constants/health-error-messages.constants';
import { withHealthCheckTimeout } from 'src/engine/core-modules/health/utils/health-check-timeout.util';
@Injectable()
export class DatabaseHealthIndicator {
constructor(
@InjectDataSource('core')
private readonly dataSource: DataSource,
private readonly healthIndicatorService: HealthIndicatorService,
) {}
async isHealthy(): Promise<HealthIndicatorResult> {
const indicator = this.healthIndicatorService.check('database');
try {
const [
[versionResult],
[activeConnections],
[maxConnections],
[uptime],
[databaseSize],
tableStats,
[cacheHitRatio],
[deadlocks],
[slowQueries],
] = await withHealthCheckTimeout(
Promise.all([
this.dataSource.query('SELECT version()'),
this.dataSource.query(
'SELECT count(*) as count FROM pg_stat_activity',
),
this.dataSource.query('SHOW max_connections'),
this.dataSource.query(
'SELECT extract(epoch from current_timestamp - pg_postmaster_start_time()) as uptime',
),
this.dataSource.query(
'SELECT pg_size_pretty(pg_database_size(current_database())) as size',
),
this.dataSource.query(`
SELECT schemaname, relname, n_live_tup, n_dead_tup, last_vacuum, last_autovacuum
FROM pg_stat_user_tables
ORDER BY n_live_tup DESC
LIMIT 10
`),
this.dataSource.query(`
SELECT
sum(heap_blks_hit) * 100.0 / (sum(heap_blks_hit) + sum(heap_blks_read)) as ratio
FROM pg_statio_user_tables
`),
this.dataSource.query(
'SELECT deadlocks FROM pg_stat_database WHERE datname = current_database()',
),
this.dataSource.query(`
SELECT count(*) as count
FROM pg_stat_activity
WHERE state = 'active'
AND query_start < now() - interval '1 minute'
`),
]),
HEALTH_ERROR_MESSAGES.DATABASE_TIMEOUT,
);
return indicator.up({
details: {
version: versionResult.version,
connections: {
active: parseInt(activeConnections.count),
max: parseInt(maxConnections.max_connections),
utilizationPercent: Math.round(
(parseInt(activeConnections.count) /
parseInt(maxConnections.max_connections)) *
100,
),
},
uptime: Math.round(uptime.uptime / 3600) + ' hours',
databaseSize: databaseSize.size,
performance: {
cacheHitRatio: Math.round(parseFloat(cacheHitRatio.ratio)) + '%',
deadlocks: parseInt(deadlocks.deadlocks),
slowQueries: parseInt(slowQueries.count),
},
top10Tables: tableStats,
},
});
} catch (error) {
const errorMessage =
error.message === HEALTH_ERROR_MESSAGES.DATABASE_TIMEOUT
? HEALTH_ERROR_MESSAGES.DATABASE_TIMEOUT
: HEALTH_ERROR_MESSAGES.DATABASE_CONNECTION_FAILED;
return indicator.down(errorMessage);
}
}
}

View File

@ -0,0 +1,63 @@
import { Injectable } from '@nestjs/common';
import {
HealthIndicatorResult,
HealthIndicatorService,
} from '@nestjs/terminus';
import { HEALTH_ERROR_MESSAGES } from 'src/engine/core-modules/health/constants/health-error-messages.constants';
import { HealthCacheService } from 'src/engine/core-modules/health/health-cache.service';
import { withHealthCheckTimeout } from 'src/engine/core-modules/health/utils/health-check-timeout.util';
@Injectable()
export class MessageSyncHealthIndicator {
constructor(
private readonly healthIndicatorService: HealthIndicatorService,
private readonly healthCacheService: HealthCacheService,
) {}
async isHealthy(): Promise<HealthIndicatorResult> {
const indicator = this.healthIndicatorService.check('messageSync');
try {
const counters = await withHealthCheckTimeout(
this.healthCacheService.getMessageChannelSyncJobByStatusCounter(),
HEALTH_ERROR_MESSAGES.MESSAGE_SYNC_TIMEOUT,
);
const totalJobs = Object.values(counters).reduce(
(sum, count) => sum + (count || 0),
0,
);
const failedJobs = counters.FAILED_UNKNOWN || 0;
// + (counters.FAILED_INSUFFICIENT_PERMISSIONS || 0)
const failureRate =
totalJobs > 0
? Math.round((failedJobs / totalJobs) * 100 * 100) / 100
: 0;
const details = {
counters,
totalJobs,
failedJobs,
failureRate,
};
if (totalJobs === 0 || failureRate < 20) {
return indicator.up({ details });
}
return indicator.down({
error: HEALTH_ERROR_MESSAGES.MESSAGE_SYNC_HIGH_FAILURE_RATE,
details,
});
} catch (error) {
const errorMessage =
error.message === HEALTH_ERROR_MESSAGES.MESSAGE_SYNC_TIMEOUT
? HEALTH_ERROR_MESSAGES.MESSAGE_SYNC_TIMEOUT
: HEALTH_ERROR_MESSAGES.MESSAGE_SYNC_CHECK_FAILED;
return indicator.down(errorMessage);
}
}
}

View File

@ -0,0 +1,94 @@
import { Injectable } from '@nestjs/common';
import {
HealthIndicatorResult,
HealthIndicatorService,
} from '@nestjs/terminus';
import { HEALTH_ERROR_MESSAGES } from 'src/engine/core-modules/health/constants/health-error-messages.constants';
import { withHealthCheckTimeout } from 'src/engine/core-modules/health/utils/health-check-timeout.util';
import { RedisClientService } from 'src/engine/core-modules/redis-client/redis-client.service';
@Injectable()
export class RedisHealthIndicator {
constructor(
private readonly redisClient: RedisClientService,
private readonly healthIndicatorService: HealthIndicatorService,
) {}
async isHealthy(): Promise<HealthIndicatorResult> {
const indicator = this.healthIndicatorService.check('redis');
try {
const [info, memory, clients, stats] = await withHealthCheckTimeout(
Promise.all([
this.redisClient.getClient().info(),
this.redisClient.getClient().info('memory'),
this.redisClient.getClient().info('clients'),
this.redisClient.getClient().info('stats'),
]),
HEALTH_ERROR_MESSAGES.REDIS_TIMEOUT,
);
const parseInfo = (info: string) => {
const result: Record<string, string> = {};
info.split('\r\n').forEach((line) => {
const [key, value] = line.split(':');
if (key && value) {
result[key] = value;
}
});
return result;
};
const infoData = parseInfo(info);
const memoryData = parseInfo(memory);
const clientsData = parseInfo(clients);
const statsData = parseInfo(stats);
return indicator.up({
details: {
version: infoData.redis_version,
uptime:
Math.round(parseInt(infoData.uptime_in_seconds) / 3600) + ' hours',
memory: {
used: memoryData.used_memory_human,
peak: memoryData.used_memory_peak_human,
fragmentation: parseFloat(memoryData.mem_fragmentation_ratio),
},
connections: {
current: parseInt(clientsData.connected_clients),
total: parseInt(statsData.total_connections_received),
rejected: parseInt(statsData.rejected_connections),
},
performance: {
opsPerSecond: parseInt(statsData.instantaneous_ops_per_sec),
hitRate: statsData.keyspace_hits
? Math.round(
(parseInt(statsData.keyspace_hits) /
(parseInt(statsData.keyspace_hits) +
parseInt(statsData.keyspace_misses))) *
100,
) + '%'
: '0%',
evictedKeys: parseInt(statsData.evicted_keys),
expiredKeys: parseInt(statsData.expired_keys),
},
replication: {
role: infoData.role,
connectedSlaves: parseInt(infoData.connected_slaves || '0'),
},
},
});
} catch (error) {
const errorMessage =
error.message === HEALTH_ERROR_MESSAGES.REDIS_TIMEOUT
? HEALTH_ERROR_MESSAGES.REDIS_TIMEOUT
: HEALTH_ERROR_MESSAGES.REDIS_CONNECTION_FAILED;
return indicator.down(errorMessage);
}
}
}

View File

@ -0,0 +1,103 @@
import { Injectable } from '@nestjs/common';
import { HealthIndicatorService } from '@nestjs/terminus';
import { Queue } from 'bullmq';
import { HEALTH_ERROR_MESSAGES } from 'src/engine/core-modules/health/constants/health-error-messages.constants';
import { WorkerQueueHealth } from 'src/engine/core-modules/health/types/worker-queue-health.type';
import { withHealthCheckTimeout } from 'src/engine/core-modules/health/utils/health-check-timeout.util';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { RedisClientService } from 'src/engine/core-modules/redis-client/redis-client.service';
@Injectable()
export class WorkerHealthIndicator {
constructor(
private readonly redisClient: RedisClientService,
private readonly healthIndicatorService: HealthIndicatorService,
) {}
async isHealthy() {
const indicator = this.healthIndicatorService.check('worker');
try {
const workerStatus = await withHealthCheckTimeout(
this.checkWorkers(),
HEALTH_ERROR_MESSAGES.WORKER_TIMEOUT,
);
if (workerStatus.status === 'up') {
return indicator.up({
queues: workerStatus.queues,
});
}
return indicator.down(workerStatus.error);
} catch (error) {
const errorMessage =
error.message === HEALTH_ERROR_MESSAGES.WORKER_TIMEOUT
? HEALTH_ERROR_MESSAGES.WORKER_TIMEOUT
: HEALTH_ERROR_MESSAGES.WORKER_CHECK_FAILED;
return indicator.down(errorMessage);
}
}
private async checkWorkers() {
const redis = this.redisClient.getClient();
const queues = Object.values(MessageQueue);
const queueStatuses: WorkerQueueHealth[] = [];
for (const queueName of queues) {
const queue = new Queue(queueName, { connection: redis });
try {
const workers = await queue.getWorkers();
if (workers.length > 0) {
const [
failedCount,
completedCount,
waitingCount,
activeCount,
delayedCount,
prioritizedCount,
] = await Promise.all([
queue.getFailedCount(),
queue.getCompletedCount(),
queue.getWaitingCount(),
queue.getActiveCount(),
queue.getDelayedCount(),
queue.getPrioritizedCount(),
]);
queueStatuses.push({
name: queueName,
workers: workers.length,
metrics: {
failed: failedCount,
completed: completedCount,
waiting: waitingCount,
active: activeCount,
delayed: delayedCount,
prioritized: prioritizedCount,
},
});
}
await queue.close();
} catch (error) {
await queue.close();
}
}
const hasActiveWorkers = queueStatuses.some((q) => q.workers > 0);
return {
status: hasActiveWorkers ? 'up' : 'down',
error: hasActiveWorkers
? undefined
: HEALTH_ERROR_MESSAGES.NO_ACTIVE_WORKERS,
queues: queueStatuses,
};
}
}

View File

@ -1,5 +0,0 @@
import { MessageChannelSyncStatus } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
export type MessageChannelSyncJobByStatusCounter = {
[key in MessageChannelSyncStatus]?: number;
};

View File

@ -0,0 +1,19 @@
import { Field, ObjectType } from '@nestjs/graphql';
@ObjectType()
export class MessageChannelSyncJobByStatusCounter {
@Field(() => Number, { nullable: true })
NOT_SYNCED?: number;
@Field(() => Number, { nullable: true })
ONGOING?: number;
@Field(() => Number, { nullable: true })
ACTIVE?: number;
@Field(() => Number, { nullable: true })
FAILED_INSUFFICIENT_PERMISSIONS?: number;
@Field(() => Number, { nullable: true })
FAILED_UNKNOWN?: number;
}

View File

@ -0,0 +1,15 @@
import { Field, ObjectType } from '@nestjs/graphql';
import { WorkerQueueMetrics } from 'src/engine/core-modules/health/types/worker-queue-metrics.type';
@ObjectType()
export class WorkerQueueHealth {
@Field(() => String)
name: string;
@Field(() => Number)
workers: number;
@Field(() => WorkerQueueMetrics)
metrics: WorkerQueueMetrics;
}

View File

@ -0,0 +1,22 @@
import { Field, ObjectType } from '@nestjs/graphql';
@ObjectType()
export class WorkerQueueMetrics {
@Field(() => Number)
failed: number;
@Field(() => Number)
completed: number;
@Field(() => Number)
waiting: number;
@Field(() => Number)
active: number;
@Field(() => Number)
delayed: number;
@Field(() => Number)
prioritized: number;
}

View File

@ -0,0 +1,16 @@
import { HEALTH_INDICATORS_TIMEOUT } from 'src/engine/core-modules/health/constants/health-indicators-timeout.conts';
export const withHealthCheckTimeout = async <T>(
promise: Promise<T>,
errorMessage: string,
): Promise<T> => {
return Promise.race([
promise,
new Promise<T>((_, reject) =>
setTimeout(
() => reject(new Error(errorMessage)),
HEALTH_INDICATORS_TIMEOUT,
),
),
]);
};