Health status worker metrics improvements (#10442)

Co-authored-by: Félix Malfait <felix.malfait@gmail.com>
nitin
2025-03-04 12:47:12 +05:30
committed by GitHub
parent 41db10daff
commit 327f0cd370
27 changed files with 1468 additions and 312 deletions

View File

@@ -12,12 +12,10 @@ import { RedisClientService } from 'src/engine/core-modules/redis-client/redis-c
const mockQueueInstance = {
getWorkers: jest.fn().mockResolvedValue([]),
close: jest.fn().mockResolvedValue(undefined),
getFailedCount: jest.fn().mockResolvedValue(0),
getCompletedCount: jest.fn().mockResolvedValue(0),
getMetrics: jest.fn().mockResolvedValue({ count: 0, data: [] }),
getWaitingCount: jest.fn().mockResolvedValue(0),
getActiveCount: jest.fn().mockResolvedValue(0),
getDelayedCount: jest.fn().mockResolvedValue(0),
getPrioritizedCount: jest.fn().mockResolvedValue(0),
};
jest.mock('bullmq', () => ({
@@ -28,6 +26,7 @@ describe('WorkerHealthIndicator', () => {
let service: WorkerHealthIndicator;
let mockRedis: jest.Mocked<Pick<Redis, 'ping'>>;
let healthIndicatorService: jest.Mocked<HealthIndicatorService>;
let loggerSpy: jest.SpyInstance;
beforeEach(async () => {
mockRedis = {
@@ -64,11 +63,23 @@ describe('WorkerHealthIndicator', () => {
}).compile();
service = module.get<WorkerHealthIndicator>(WorkerHealthIndicator);
loggerSpy = jest
.spyOn(service['logger'], 'error')
.mockImplementation(() => {});
jest.useFakeTimers();
// Reset mocks to their default success state before each test
mockQueueInstance.getWorkers.mockResolvedValue([]);
mockQueueInstance.getMetrics.mockResolvedValue({ count: 0, data: [] });
mockQueueInstance.getWaitingCount.mockResolvedValue(0);
mockQueueInstance.getActiveCount.mockResolvedValue(0);
mockQueueInstance.getDelayedCount.mockResolvedValue(0);
});
afterEach(() => {
jest.useRealTimers();
jest.clearAllMocks();
});
it('should be defined', () => {
@@ -133,4 +144,204 @@ describe('WorkerHealthIndicator', () => {
Object.keys(MessageQueue).length,
);
});
it('should return down status when failure rate exceeds threshold', async () => {
mockQueueInstance.getWorkers.mockResolvedValue([{ id: 'worker1' }]);
mockQueueInstance.getMetrics.mockImplementation((type) => {
if (type === 'failed') {
return Promise.resolve({ count: 600 });
}
if (type === 'completed') {
return Promise.resolve({ count: 400 });
}
return Promise.resolve({ count: 0 });
});
const result = await service.isHealthy();
expect(result.worker.status).toBe('up');
expect('queues' in result.worker).toBe(true);
if ('queues' in result.worker) {
expect(result.worker.queues[0].status).toBe('down');
expect(result.worker.queues[0].metrics).toEqual({
failed: 600,
completed: 400,
waiting: 0,
active: 0,
delayed: 0,
failureRate: 60,
});
}
});
it('should return complete metrics for active workers', async () => {
mockQueueInstance.getWorkers.mockResolvedValue([{ id: 'worker1' }]);
mockQueueInstance.getMetrics.mockImplementation((type) => {
if (type === 'failed') {
return Promise.resolve({ count: 10 });
}
if (type === 'completed') {
return Promise.resolve({ count: 90 });
}
return Promise.resolve({ count: 0 });
});
mockQueueInstance.getWaitingCount.mockResolvedValue(5);
mockQueueInstance.getActiveCount.mockResolvedValue(2);
mockQueueInstance.getDelayedCount.mockResolvedValue(1);
const result = await service.isHealthy();
expect(result.worker.status).toBe('up');
expect('queues' in result.worker).toBe(true);
if ('queues' in result.worker) {
expect(result.worker.queues[0].metrics).toEqual({
failed: 10,
completed: 90,
waiting: 5,
active: 2,
delayed: 1,
failureRate: 10,
});
}
});
it('should handle queue errors gracefully', async () => {
mockQueueInstance.getWorkers.mockRejectedValue(new Error('Queue error'));
mockQueueInstance.getMetrics.mockRejectedValue(new Error('Queue error'));
mockQueueInstance.getWaitingCount.mockRejectedValue(
new Error('Queue error'),
);
mockQueueInstance.getActiveCount.mockRejectedValue(
new Error('Queue error'),
);
mockQueueInstance.getDelayedCount.mockRejectedValue(
new Error('Queue error'),
);
const result = await service.isHealthy();
expect(result.worker.status).toBe('down');
expect('error' in result.worker).toBe(true);
if ('error' in result.worker) {
expect(result.worker.error).toBe(HEALTH_ERROR_MESSAGES.NO_ACTIVE_WORKERS);
}
expect(loggerSpy).toHaveBeenCalled();
Object.values(MessageQueue).forEach((queueName) => {
expect(loggerSpy).toHaveBeenCalledWith(
`Error getting queue details for ${queueName}: Queue error`,
);
expect(loggerSpy).toHaveBeenCalledWith(
`Error checking worker for queue ${queueName}: Queue error`,
);
});
});
describe('getQueueDetails', () => {
beforeEach(() => {
// Reset mocks to clean state before each test in this describe block
mockQueueInstance.getWorkers.mockResolvedValue([{ id: 'worker1' }]);
mockQueueInstance.getMetrics.mockResolvedValue({ count: 0, data: [] });
});
it('should return metrics with time series data when pointsNeeded is provided', async () => {
const pointsNeeded = 60;
mockQueueInstance.getMetrics.mockImplementation((type) => {
if (type === 'failed') {
return Promise.resolve({
count: 10,
data: Array(pointsNeeded).fill(10 / pointsNeeded),
});
}
if (type === 'completed') {
return Promise.resolve({
count: 90,
data: Array(pointsNeeded).fill(90 / pointsNeeded),
});
}
return Promise.resolve({ count: 0, data: [] });
});
const result = await service.getQueueDetails(
MessageQueue.messagingQueue,
{
pointsNeeded,
},
);
expect(result).toBeDefined();
expect(result?.metrics).toMatchObject({
failed: 10,
completed: 90,
failedData: expect.any(Array),
completedData: expect.any(Array),
});
expect(result?.metrics.failedData).toHaveLength(pointsNeeded);
expect(result?.metrics.completedData).toHaveLength(pointsNeeded);
});
it('should handle invalid metrics data gracefully', async () => {
const invalidData = ['invalid', null, undefined, '1', 2];
mockQueueInstance.getMetrics.mockResolvedValue({
count: 0,
data: invalidData,
});
const result = await service.getQueueDetails(
MessageQueue.messagingQueue,
{
pointsNeeded: 5,
},
);
expect(result).toBeDefined();
expect(result?.metrics.failedData).toEqual([NaN, 0, NaN, 1, 2]);
expect(result?.metrics.completedData).toEqual([NaN, 0, NaN, 1, 2]);
});
it('should calculate correct failure rate with time series data', async () => {
mockQueueInstance.getMetrics.mockImplementation((type) => {
if (type === 'failed') {
return Promise.resolve({ count: 600, data: Array(10).fill(60) });
}
if (type === 'completed') {
return Promise.resolve({ count: 400, data: Array(10).fill(40) });
}
return Promise.resolve({ count: 0, data: [] });
});
const result = await service.getQueueDetails(
MessageQueue.messagingQueue,
{
pointsNeeded: 10,
},
);
expect(result).toBeDefined();
expect(result?.metrics).toMatchObject({
failed: 600,
completed: 400,
failureRate: 60,
});
});
it('should handle queue errors gracefully', async () => {
mockQueueInstance.getWorkers.mockRejectedValue(new Error('Queue error'));
mockQueueInstance.getMetrics.mockRejectedValue(new Error('Queue error'));
await expect(
service.getQueueDetails(MessageQueue.messagingQueue),
).rejects.toThrow('Queue error');
expect(loggerSpy).toHaveBeenCalledWith(
`Error getting queue details for ${MessageQueue.messagingQueue}: Queue error`,
);
});
});
});
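
For reference, the failure-rate arithmetic these tests assert can be sketched standalone; the helper below is illustrative only, not part of the commit:

// Hypothetical helper mirroring the calculation the tests above expect.
const computeFailureRate = (failed: number, completed: number): number => {
  const total = failed + completed;

  // One decimal place, matching the toFixed(1) rounding in the indicator.
  return total > 0 ? Number(((failed / total) * 100).toFixed(1)) : 0;
};

computeFailureRate(600, 400); // 60 -> above the threshold, queue reports 'down'
computeFailureRate(10, 90); // 10 -> below the threshold, queue stays 'up'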

View File

@@ -1,9 +1,13 @@
import { Injectable } from '@nestjs/common';
import { HealthIndicatorService } from '@nestjs/terminus';
import { Injectable, Logger } from '@nestjs/common';
import {
HealthIndicatorResult,
HealthIndicatorService,
} from '@nestjs/terminus';
import { Queue } from 'bullmq';
import { HEALTH_ERROR_MESSAGES } from 'src/engine/core-modules/health/constants/health-error-messages.constants';
import { METRICS_FAILURE_RATE_THRESHOLD } from 'src/engine/core-modules/health/constants/metrics-failure-rate-threshold.const';
import { WorkerQueueHealth } from 'src/engine/core-modules/health/types/worker-queue-health.type';
import { withHealthCheckTimeout } from 'src/engine/core-modules/health/utils/health-check-timeout.util';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
@@ -11,12 +15,14 @@ import { RedisClientService } from 'src/engine/core-modules/redis-c
@Injectable()
export class WorkerHealthIndicator {
private readonly logger = new Logger(WorkerHealthIndicator.name);
constructor(
private readonly redisClient: RedisClientService,
private readonly healthIndicatorService: HealthIndicatorService,
) {}
async isHealthy() {
async isHealthy(): Promise<HealthIndicatorResult> {
const indicator = this.healthIndicatorService.check('worker');
try {
@@ -42,51 +48,106 @@ export class WorkerHealthIndicator {
}
}
private async checkWorkers() {
async getQueueDetails(
queueName: MessageQueue,
options?: {
pointsNeeded?: number;
},
): Promise<WorkerQueueHealth | null> {
const redis = this.redisClient.getClient();
const queue = new Queue(queueName, { connection: redis });
try {
const workers = await queue.getWorkers();
if (workers.length > 0) {
const metricsParams = options?.pointsNeeded
? [0, options.pointsNeeded - 1]
: [];
const [
failedMetrics,
completedMetrics,
waitingCount,
activeCount,
delayedCount,
] = await Promise.all([
queue.getMetrics('failed', ...metricsParams),
queue.getMetrics('completed', ...metricsParams),
queue.getWaitingCount(),
queue.getActiveCount(),
queue.getDelayedCount(),
]);
const failedCount = options?.pointsNeeded
? this.calculateMetricsSum(failedMetrics.data)
: failedMetrics.count;
const completedCount = options?.pointsNeeded
? this.calculateMetricsSum(completedMetrics.data)
: completedMetrics.count;
const totalJobs = failedCount + completedCount;
const failureRate =
totalJobs > 0
? Number(((failedCount / totalJobs) * 100).toFixed(1))
: 0;
return {
queueName,
workers: workers.length,
status: failureRate > METRICS_FAILURE_RATE_THRESHOLD ? 'down' : 'up',
metrics: {
failed: failedCount,
completed: completedCount,
waiting: waitingCount,
active: activeCount,
delayed: delayedCount,
failureRate,
...(options?.pointsNeeded && {
failedData: failedMetrics.data.map(Number),
completedData: completedMetrics.data.map(Number),
}),
},
};
}
return null;
} catch (error) {
this.logger.error(
`Error getting queue details for ${queueName}: ${error.message}`,
);
throw error;
} finally {
await queue.close();
}
}
private calculateMetricsSum(data: string[] | number[]): number {
const sum = data.reduce((sum: number, value: string | number) => {
const numericValue = Number(value);
return sum + (isNaN(numericValue) ? 0 : numericValue);
}, 0);
return Math.round(Number(sum));
}
private async checkWorkers() {
const queues = Object.values(MessageQueue);
const queueStatuses: WorkerQueueHealth[] = [];
for (const queueName of queues) {
const queue = new Queue(queueName, { connection: redis });
try {
const workers = await queue.getWorkers();
const queueDetails = await this.getQueueDetails(queueName);
if (workers.length > 0) {
const [
failedCount,
completedCount,
waitingCount,
activeCount,
delayedCount,
prioritizedCount,
] = await Promise.all([
queue.getFailedCount(),
queue.getCompletedCount(),
queue.getWaitingCount(),
queue.getActiveCount(),
queue.getDelayedCount(),
queue.getPrioritizedCount(),
]);
queueStatuses.push({
queueName: queueName,
workers: workers.length,
metrics: {
failed: failedCount,
completed: completedCount,
waiting: waitingCount,
active: activeCount,
delayed: delayedCount,
prioritized: prioritizedCount,
},
});
if (queueDetails) {
queueStatuses.push(queueDetails);
}
await queue.close();
} catch (error) {
await queue.close();
this.logger.error(
`Error checking worker for queue ${queueName}: ${error.message}`,
);
}
}
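
A minimal usage sketch of the new getQueueDetails API; the caller shown here is hypothetical and assumes an injected WorkerHealthIndicator instance (workerHealthIndicator) with MessageQueue in scope:

// Hypothetical caller: request the last 60 metric points for one queue.
const details = await workerHealthIndicator.getQueueDetails(
  MessageQueue.messagingQueue,
  { pointsNeeded: 60 },
);

if (details) {
  // failedData and completedData are populated only when pointsNeeded is passed.
  // Note: calculateMetricsSum treats non-numeric entries as 0 when summing,
  // while failedData/completedData keep the raw Number() coercion (hence the
  // NaN entries asserted in the invalid-data test above).
  console.log(
    `${details.queueName}: ${details.metrics.failureRate}% failures (${details.status})`,
  );
}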

View File

@@ -7,6 +7,9 @@ export class WorkerQueueHealth {
@Field(() => String)
queueName: string;
@Field(() => String)
status: string;
@Field(() => Number)
workers: number;

View File

@@ -18,5 +18,11 @@ export class WorkerQueueMetrics {
delayed: number;
@Field(() => Number)
prioritized: number;
failureRate: number;
@Field(() => [Number], { nullable: true })
failedData?: number[];
@Field(() => [Number], { nullable: true })
completedData?: number[];
}
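
Putting the two type changes together, a queue entry returned by getQueueDetails now has roughly this shape; the values below are illustrative, with the field set taken from the types above:

// Illustrative payload only (note: prioritized is gone, failureRate is new).
const exampleQueueHealth = {
  queueName: MessageQueue.messagingQueue,
  status: 'up', // 'down' once failureRate exceeds METRICS_FAILURE_RATE_THRESHOLD
  workers: 1,
  metrics: {
    failed: 10,
    completed: 90,
    waiting: 5,
    active: 2,
    delayed: 1,
    failureRate: 10, // percentage, rounded to one decimal place
    failedData: [0, 1, 0], // present only when pointsNeeded was requested
    completedData: [9, 8, 10],
  },
};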