fix redis concurrency issue in health metrics + remove ongoing status count (#10717)

### Context
For calendar and message sync job health monitoring, we used to
increment a counter in redis cache which could lead to concurrency
issue.

### Solution
- Update to a set structure in place of counter + use sAdd redis method
which is atomic
- Each minute another counter was incremented on a new cache key ->
Update to a 15s window
- Remove ONGOING status not needed. We only need status at job end (or
fail).


###  Potential improvements
- Check for cache key existence before fetching data to avoid useless
call to redis ?

closes https://github.com/twentyhq/twenty/issues/10070
This commit is contained in:
Etienne
2025-03-07 14:41:46 +01:00
committed by GitHub
parent 02a085df4f
commit 96035f0ccf
11 changed files with 203 additions and 251 deletions

View File

@ -34,8 +34,8 @@ export const SettingsAdminHealthAccountSyncCountersTable = ({
<TableCell align="right">{details.counters.NOT_SYNCED}</TableCell> <TableCell align="right">{details.counters.NOT_SYNCED}</TableCell>
</TableRow> </TableRow>
<TableRow> <TableRow>
<TableCell>Sync Ongoing</TableCell> <TableCell>Active Sync</TableCell>
<TableCell align="right">{details.counters.ONGOING}</TableCell> <TableCell align="right">{details.counters.ACTIVE}</TableCell>
</TableRow> </TableRow>
<TableRow> <TableRow>
<TableCell>Total Jobs</TableCell> <TableCell>Total Jobs</TableCell>

View File

@ -25,25 +25,42 @@ export class CacheStorageService {
return this.cache.del(`${this.namespace}:${key}`); return this.cache.del(`${this.namespace}:${key}`);
} }
async setAdd(key: string, value: string[]) { async setAdd(key: string, value: string[], ttl?: number) {
if (value.length === 0) { if (value.length === 0) {
return; return;
} }
if (this.isRedisCache()) { if (this.isRedisCache()) {
return (this.cache as RedisCache).store.client.sAdd( await (this.cache as RedisCache).store.client.sAdd(
`${this.namespace}:${key}`, `${this.namespace}:${key}`,
value, value,
); );
if (ttl) {
await (this.cache as RedisCache).store.client.expire(
`${this.namespace}:${key}`,
ttl / 1000,
);
}
return;
} }
this.get(key).then((res: string[]) => { this.get(key).then((res: string[]) => {
if (res) { if (res) {
this.set(key, [...res, ...value]); this.set(key, [...res, ...value], ttl);
} else { } else {
this.set(key, value); this.set(key, value, ttl);
} }
}); });
} }
async countAllSetMembers(cacheKeys: string[]) {
return (
await Promise.all(cacheKeys.map((key) => this.getSetLength(key) || 0))
).reduce((acc, setLength) => acc + setLength, 0);
}
async setPop(key: string, size = 1) { async setPop(key: string, size = 1) {
if (this.isRedisCache()) { if (this.isRedisCache()) {
return (this.cache as RedisCache).store.client.sPop( return (this.cache as RedisCache).store.client.sPop(
@ -63,6 +80,18 @@ export class CacheStorageService {
}); });
} }
async getSetLength(key: string) {
if (this.isRedisCache()) {
return await (this.cache as RedisCache).store.client.sCard(
`${this.namespace}:${key}`,
);
}
return this.get(key).then((res: string[]) => {
return res.length;
});
}
async flush() { async flush() {
return this.cache.reset(); return this.cache.reset();
} }

View File

@ -26,7 +26,7 @@ export class CaptchaGuard implements CanActivate {
if (result.success) { if (result.success) {
return true; return true;
} else { } else {
await this.healthCacheService.incrementInvalidCaptchaCounter(); await this.healthCacheService.updateInvalidCaptchaCache(token);
throw new BadRequestException( throw new BadRequestException(
'Invalid Captcha, please try another device', 'Invalid Captcha, please try another device',

View File

@ -959,7 +959,7 @@ export class EnvironmentVariables {
@IsNumber() @IsNumber()
@CastToPositiveNumber() @CastToPositiveNumber()
@IsOptional() @IsOptional()
HEALTH_MONITORING_TIME_WINDOW_IN_MINUTES = 5; HEALTH_METRICS_TIME_WINDOW_IN_MINUTES = 5;
@EnvironmentVariablesMetadata({ @EnvironmentVariablesMetadata({
group: EnvironmentVariablesGroup.Other, group: EnvironmentVariablesGroup.Other,

View File

@ -1,14 +1,17 @@
import { Controller, Get } from '@nestjs/common'; import { Controller, Get } from '@nestjs/common';
import { HealthCacheService } from 'src/engine/core-modules/health/health-cache.service'; import { HealthCacheService } from 'src/engine/core-modules/health/health-cache.service';
import { HealthCounterCacheKeys } from 'src/engine/core-modules/health/types/health-counter-cache-keys.type';
@Controller('metricsz') @Controller('metrics')
export class MetricsController { export class MetricsController {
constructor(private readonly healthCacheService: HealthCacheService) {} constructor(private readonly healthCacheService: HealthCacheService) {}
@Get('/message-channel-sync-job-by-status-counter') @Get('/message-channel-sync-job-by-status-counter')
getMessageChannelSyncJobByStatusCounter() { getMessageChannelSyncJobByStatusCounter() {
return this.healthCacheService.getMessageChannelSyncJobByStatusCounter(); return this.healthCacheService.countChannelSyncJobByStatus(
HealthCounterCacheKeys.MessageChannelSyncJobByStatus,
);
} }
@Get('/invalid-captcha-counter') @Get('/invalid-captcha-counter')
@ -18,6 +21,8 @@ export class MetricsController {
@Get('/calendar-channel-sync-job-by-status-counter') @Get('/calendar-channel-sync-job-by-status-counter')
getCalendarChannelSyncJobByStatusCounter() { getCalendarChannelSyncJobByStatusCounter() {
return this.healthCacheService.getCalendarChannelSyncJobByStatusCounter(); return this.healthCacheService.countChannelSyncJobByStatus(
HealthCounterCacheKeys.CalendarEventSyncJobByStatus,
);
} }
} }

View File

@ -9,9 +9,11 @@ import { HealthCounterCacheKeys } from 'src/engine/core-modules/health/types/hea
import { CalendarChannelSyncStatus } from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity'; import { CalendarChannelSyncStatus } from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity';
import { MessageChannelSyncStatus } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessageChannelSyncStatus } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
const CACHE_BUCKET_DURATION_MS = 15000; // 15 seconds window for each cache bucket
@Injectable() @Injectable()
export class HealthCacheService { export class HealthCacheService {
private readonly healthMonitoringTimeWindowInMinutes: number; private readonly healthMetricsTimeWindowInMinutes: number;
private readonly healthCacheTtl: number; private readonly healthCacheTtl: number;
constructor( constructor(
@ -19,168 +21,119 @@ export class HealthCacheService {
private readonly cacheStorage: CacheStorageService, private readonly cacheStorage: CacheStorageService,
private readonly environmentService: EnvironmentService, private readonly environmentService: EnvironmentService,
) { ) {
this.healthMonitoringTimeWindowInMinutes = this.environmentService.get( this.healthMetricsTimeWindowInMinutes = this.environmentService.get(
'HEALTH_MONITORING_TIME_WINDOW_IN_MINUTES', 'HEALTH_METRICS_TIME_WINDOW_IN_MINUTES',
);
this.healthCacheTtl = this.healthMetricsTimeWindowInMinutes * 60000 * 2;
}
private getCacheBucketStartTimestamp(timestamp: number): number {
return (
Math.floor(timestamp / CACHE_BUCKET_DURATION_MS) *
CACHE_BUCKET_DURATION_MS
); );
this.healthCacheTtl = this.healthMonitoringTimeWindowInMinutes * 60000 * 2;
} }
private getCacheKeyWithTimestamp(key: string, timestamp?: number): string { private getCacheKeyWithTimestamp(key: string, timestamp?: number): string {
const minuteTimestamp = timestamp ?? Math.floor(Date.now() / 60000) * 60000; const currentIntervalTimestamp =
timestamp ?? this.getCacheBucketStartTimestamp(Date.now());
return `${key}:${minuteTimestamp}`; return `${key}:${currentIntervalTimestamp}`;
} }
private getLastXMinutesTimestamps(minutes: number): number[] { private getLastCacheBucketStartTimestampsFromDate(
const currentMinuteTimestamp = Math.floor(Date.now() / 60000) * 60000; cacheBucketsCount: number,
date: number = Date.now(),
): number[] {
const currentIntervalTimestamp = this.getCacheBucketStartTimestamp(date);
return Array.from( return Array.from(
{ length: minutes }, { length: cacheBucketsCount },
(_, i) => currentMinuteTimestamp - i * 60000, (_, i) => currentIntervalTimestamp - i * CACHE_BUCKET_DURATION_MS,
); );
} }
async incrementMessageChannelSyncJobByStatusCounter( async updateMessageOrCalendarChannelSyncJobByStatusCache(
status: MessageChannelSyncStatus, key: HealthCounterCacheKeys,
increment: number, status: MessageChannelSyncStatus | CalendarChannelSyncStatus,
messageChannelIds: string[],
) { ) {
const cacheKey = this.getCacheKeyWithTimestamp( return await this.cacheStorage.setAdd(
HealthCounterCacheKeys.MessageChannelSyncJobByStatus, this.getCacheKeyWithTimestamp(`${key}:${status}`),
); messageChannelIds,
const currentCounter =
await this.cacheStorage.get<AccountSyncJobByStatusCounter>(cacheKey);
const updatedCounter = {
...(currentCounter || {}),
[status]: (currentCounter?.[status] || 0) + increment,
};
return await this.cacheStorage.set(
cacheKey,
updatedCounter,
this.healthCacheTtl, this.healthCacheTtl,
); );
} }
async getMessageChannelSyncJobByStatusCounter() { async countChannelSyncJobByStatus(
const cacheKeys = this.getLastXMinutesTimestamps( key:
this.healthMonitoringTimeWindowInMinutes, | HealthCounterCacheKeys.MessageChannelSyncJobByStatus
).map((timestamp) => | HealthCounterCacheKeys.CalendarEventSyncJobByStatus,
this.getCacheKeyWithTimestamp( timeWindowInSeconds: number = this.healthMetricsTimeWindowInMinutes * 60,
HealthCounterCacheKeys.MessageChannelSyncJobByStatus, ): Promise<AccountSyncJobByStatusCounter> {
timestamp, if ((timeWindowInSeconds * 1000) % CACHE_BUCKET_DURATION_MS !== 0) {
), throw new Error(
); `Time window must be divisible by ${CACHE_BUCKET_DURATION_MS}`,
);
const aggregatedCounter = Object.fromEntries(
Object.values(MessageChannelSyncStatus).map((status) => [status, 0]),
);
for (const key of cacheKeys) {
const counter =
await this.cacheStorage.get<AccountSyncJobByStatusCounter>(key);
if (!counter) continue;
for (const [status, count] of Object.entries(counter) as [
MessageChannelSyncStatus,
number,
][]) {
aggregatedCounter[status] += count;
}
} }
return aggregatedCounter; const now = Date.now();
const countByStatus = {} as AccountSyncJobByStatusCounter;
const statuses =
key === HealthCounterCacheKeys.MessageChannelSyncJobByStatus
? Object.values(MessageChannelSyncStatus).filter(
(status) => status !== MessageChannelSyncStatus.ONGOING,
)
: Object.values(CalendarChannelSyncStatus).filter(
(status) => status !== CalendarChannelSyncStatus.ONGOING,
);
const cacheBuckets =
timeWindowInSeconds / (CACHE_BUCKET_DURATION_MS / 1000);
for (const status of statuses) {
const cacheKeys = this.computeTimeStampedCacheKeys(
`${key}:${status}`,
cacheBuckets,
now,
);
const channelIdsCount =
await this.cacheStorage.countAllSetMembers(cacheKeys);
countByStatus[status] = channelIdsCount;
}
return countByStatus;
} }
async incrementInvalidCaptchaCounter() { computeTimeStampedCacheKeys(
const cacheKey = this.getCacheKeyWithTimestamp( key: string,
HealthCounterCacheKeys.InvalidCaptcha, cacheBucketsCount: number,
); date: number = Date.now(),
) {
return this.getLastCacheBucketStartTimestampsFromDate(
cacheBucketsCount,
date,
).map((timestamp) => this.getCacheKeyWithTimestamp(key, timestamp));
}
const currentCounter = await this.cacheStorage.get<number>(cacheKey); async updateInvalidCaptchaCache(captchaToken: string) {
const updatedCounter = (currentCounter || 0) + 1; return await this.cacheStorage.setAdd(
this.getCacheKeyWithTimestamp(HealthCounterCacheKeys.InvalidCaptcha),
return await this.cacheStorage.set( [captchaToken],
cacheKey,
updatedCounter,
this.healthCacheTtl, this.healthCacheTtl,
); );
} }
async getInvalidCaptchaCounter() { async getInvalidCaptchaCounter(
const cacheKeys = this.getLastXMinutesTimestamps( timeWindowInSeconds: number = this.healthMetricsTimeWindowInMinutes * 60,
this.healthMonitoringTimeWindowInMinutes, ) {
).map((timestamp) => return await this.cacheStorage.countAllSetMembers(
this.getCacheKeyWithTimestamp( this.computeTimeStampedCacheKeys(
HealthCounterCacheKeys.InvalidCaptcha, HealthCounterCacheKeys.InvalidCaptcha,
timestamp, timeWindowInSeconds / (CACHE_BUCKET_DURATION_MS / 1000),
), ),
); );
let aggregatedCounter = 0;
for (const key of cacheKeys) {
const counter = await this.cacheStorage.get<number>(key);
aggregatedCounter += counter || 0;
}
return aggregatedCounter;
}
async incrementCalendarChannelSyncJobByStatusCounter(
status: CalendarChannelSyncStatus,
increment: number,
) {
const cacheKey = this.getCacheKeyWithTimestamp(
HealthCounterCacheKeys.CalendarEventSyncJobByStatus,
);
const currentCounter =
await this.cacheStorage.get<AccountSyncJobByStatusCounter>(cacheKey);
const updatedCounter = {
...(currentCounter || {}),
[status]: (currentCounter?.[status] || 0) + increment,
};
return await this.cacheStorage.set(
cacheKey,
updatedCounter,
this.healthCacheTtl,
);
}
async getCalendarChannelSyncJobByStatusCounter() {
const cacheKeys = this.getLastXMinutesTimestamps(
this.healthMonitoringTimeWindowInMinutes,
).map((timestamp) =>
this.getCacheKeyWithTimestamp(
HealthCounterCacheKeys.CalendarEventSyncJobByStatus,
timestamp,
),
);
const aggregatedCounter = Object.fromEntries(
Object.values(CalendarChannelSyncStatus).map((status) => [status, 0]),
);
for (const key of cacheKeys) {
const counter =
await this.cacheStorage.get<AccountSyncJobByStatusCounter>(key);
if (!counter) continue;
for (const [status, count] of Object.entries(counter) as [
CalendarChannelSyncStatus,
number,
][]) {
aggregatedCounter[status] += count;
}
}
return aggregatedCounter;
} }
} }

View File

@ -16,8 +16,7 @@ describe('ConnectedAccountHealth', () => {
beforeEach(async () => { beforeEach(async () => {
healthCacheService = { healthCacheService = {
getMessageChannelSyncJobByStatusCounter: jest.fn(), countChannelSyncJobByStatus: jest.fn(),
getCalendarChannelSyncJobByStatusCounter: jest.fn(),
} as any; } as any;
healthIndicatorService = { healthIndicatorService = {
@ -65,22 +64,17 @@ describe('ConnectedAccountHealth', () => {
describe('message sync health', () => { describe('message sync health', () => {
it('should return up status when no message sync jobs are present', async () => { it('should return up status when no message sync jobs are present', async () => {
healthCacheService.getMessageChannelSyncJobByStatusCounter.mockResolvedValue( healthCacheService.countChannelSyncJobByStatus
{ .mockResolvedValueOnce({
[MessageChannelSyncStatus.NOT_SYNCED]: 0, [MessageChannelSyncStatus.NOT_SYNCED]: 0,
[MessageChannelSyncStatus.ONGOING]: 0,
[MessageChannelSyncStatus.ACTIVE]: 0, [MessageChannelSyncStatus.ACTIVE]: 0,
[MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS]: 0, [MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS]: 0,
[MessageChannelSyncStatus.FAILED_UNKNOWN]: 0, [MessageChannelSyncStatus.FAILED_UNKNOWN]: 0,
}, })
); .mockResolvedValueOnce({
healthCacheService.getCalendarChannelSyncJobByStatusCounter.mockResolvedValue(
{
[CalendarChannelSyncStatus.NOT_SYNCED]: 0, [CalendarChannelSyncStatus.NOT_SYNCED]: 0,
[CalendarChannelSyncStatus.ACTIVE]: 0, [CalendarChannelSyncStatus.ACTIVE]: 0,
}, });
);
const result = await service.isHealthy(); const result = await service.isHealthy();
@ -98,22 +92,17 @@ describe('ConnectedAccountHealth', () => {
}); });
it(`should return down status when message sync failure rate is above ${METRICS_FAILURE_RATE_THRESHOLD}%`, async () => { it(`should return down status when message sync failure rate is above ${METRICS_FAILURE_RATE_THRESHOLD}%`, async () => {
healthCacheService.getMessageChannelSyncJobByStatusCounter.mockResolvedValue( healthCacheService.countChannelSyncJobByStatus
{ .mockResolvedValueOnce({
[MessageChannelSyncStatus.NOT_SYNCED]: 0, [MessageChannelSyncStatus.NOT_SYNCED]: 0,
[MessageChannelSyncStatus.ONGOING]: 1,
[MessageChannelSyncStatus.ACTIVE]: 1, [MessageChannelSyncStatus.ACTIVE]: 1,
[MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS]: 2, [MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS]: 2,
[MessageChannelSyncStatus.FAILED_UNKNOWN]: 2, [MessageChannelSyncStatus.FAILED_UNKNOWN]: 2,
}, })
); .mockResolvedValueOnce({
healthCacheService.getCalendarChannelSyncJobByStatusCounter.mockResolvedValue(
{
[CalendarChannelSyncStatus.NOT_SYNCED]: 0, [CalendarChannelSyncStatus.NOT_SYNCED]: 0,
[CalendarChannelSyncStatus.ACTIVE]: 1, [CalendarChannelSyncStatus.ACTIVE]: 1,
}, });
);
const result = await service.isHealthy(); const result = await service.isHealthy();
@ -127,28 +116,23 @@ describe('ConnectedAccountHealth', () => {
); );
expect( expect(
result.connectedAccount.details.messageSync.details.failureRate, result.connectedAccount.details.messageSync.details.failureRate,
).toBe(33.33); ).toBe(40);
}); });
}); });
describe('calendar sync health', () => { describe('calendar sync health', () => {
it('should return up status when no calendar sync jobs are present', async () => { it('should return up status when no calendar sync jobs are present', async () => {
healthCacheService.getMessageChannelSyncJobByStatusCounter.mockResolvedValue( healthCacheService.countChannelSyncJobByStatus
{ .mockResolvedValueOnce({
[MessageChannelSyncStatus.NOT_SYNCED]: 0, [MessageChannelSyncStatus.NOT_SYNCED]: 0,
[MessageChannelSyncStatus.ACTIVE]: 0, [MessageChannelSyncStatus.ACTIVE]: 0,
}, })
); .mockResolvedValueOnce({
healthCacheService.getCalendarChannelSyncJobByStatusCounter.mockResolvedValue(
{
[CalendarChannelSyncStatus.NOT_SYNCED]: 0, [CalendarChannelSyncStatus.NOT_SYNCED]: 0,
[CalendarChannelSyncStatus.ONGOING]: 0,
[CalendarChannelSyncStatus.ACTIVE]: 0, [CalendarChannelSyncStatus.ACTIVE]: 0,
[CalendarChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS]: 0, [CalendarChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS]: 0,
[CalendarChannelSyncStatus.FAILED_UNKNOWN]: 0, [CalendarChannelSyncStatus.FAILED_UNKNOWN]: 0,
}, });
);
const result = await service.isHealthy(); const result = await service.isHealthy();
@ -166,22 +150,17 @@ describe('ConnectedAccountHealth', () => {
}); });
it(`should return down status when calendar sync failure rate is above ${METRICS_FAILURE_RATE_THRESHOLD}%`, async () => { it(`should return down status when calendar sync failure rate is above ${METRICS_FAILURE_RATE_THRESHOLD}%`, async () => {
healthCacheService.getMessageChannelSyncJobByStatusCounter.mockResolvedValue( healthCacheService.countChannelSyncJobByStatus
{ .mockResolvedValueOnce({
[MessageChannelSyncStatus.NOT_SYNCED]: 0, [MessageChannelSyncStatus.NOT_SYNCED]: 0,
[MessageChannelSyncStatus.ACTIVE]: 1, [MessageChannelSyncStatus.ACTIVE]: 1,
}, })
); .mockResolvedValueOnce({
healthCacheService.getCalendarChannelSyncJobByStatusCounter.mockResolvedValue(
{
[CalendarChannelSyncStatus.NOT_SYNCED]: 0, [CalendarChannelSyncStatus.NOT_SYNCED]: 0,
[CalendarChannelSyncStatus.ONGOING]: 1,
[CalendarChannelSyncStatus.ACTIVE]: 1, [CalendarChannelSyncStatus.ACTIVE]: 1,
[CalendarChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS]: 2, [CalendarChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS]: 2,
[CalendarChannelSyncStatus.FAILED_UNKNOWN]: 2, [CalendarChannelSyncStatus.FAILED_UNKNOWN]: 2,
}, });
);
const result = await service.isHealthy(); const result = await service.isHealthy();
@ -195,25 +174,22 @@ describe('ConnectedAccountHealth', () => {
); );
expect( expect(
result.connectedAccount.details.calendarSync.details.failureRate, result.connectedAccount.details.calendarSync.details.failureRate,
).toBe(33.33); ).toBe(40);
}); });
}); });
describe('timeout handling', () => { describe('timeout handling', () => {
it('should handle message sync timeout', async () => { it('should handle message sync timeout', async () => {
healthCacheService.getMessageChannelSyncJobByStatusCounter.mockImplementationOnce( healthCacheService.countChannelSyncJobByStatus
() => .mockResolvedValueOnce(
new Promise((resolve) => new Promise((resolve) =>
setTimeout(resolve, HEALTH_INDICATORS_TIMEOUT + 100), setTimeout(resolve, HEALTH_INDICATORS_TIMEOUT + 100),
), ),
); )
.mockResolvedValueOnce({
healthCacheService.getCalendarChannelSyncJobByStatusCounter.mockResolvedValue(
{
[CalendarChannelSyncStatus.NOT_SYNCED]: 0, [CalendarChannelSyncStatus.NOT_SYNCED]: 0,
[CalendarChannelSyncStatus.ACTIVE]: 1, [CalendarChannelSyncStatus.ACTIVE]: 1,
}, });
);
const healthCheckPromise = service.isHealthy(); const healthCheckPromise = service.isHealthy();
@ -231,19 +207,16 @@ describe('ConnectedAccountHealth', () => {
}); });
it('should handle calendar sync timeout', async () => { it('should handle calendar sync timeout', async () => {
healthCacheService.getMessageChannelSyncJobByStatusCounter.mockResolvedValue( healthCacheService.countChannelSyncJobByStatus
{ .mockResolvedValueOnce({
[MessageChannelSyncStatus.NOT_SYNCED]: 0, [MessageChannelSyncStatus.NOT_SYNCED]: 0,
[MessageChannelSyncStatus.ACTIVE]: 1, [MessageChannelSyncStatus.ACTIVE]: 1,
}, })
); .mockResolvedValueOnce(
healthCacheService.getCalendarChannelSyncJobByStatusCounter.mockImplementationOnce(
() =>
new Promise((resolve) => new Promise((resolve) =>
setTimeout(resolve, HEALTH_INDICATORS_TIMEOUT + 100), setTimeout(resolve, HEALTH_INDICATORS_TIMEOUT + 100),
), ),
); );
const healthCheckPromise = service.isHealthy(); const healthCheckPromise = service.isHealthy();
@ -263,21 +236,17 @@ describe('ConnectedAccountHealth', () => {
describe('combined health check', () => { describe('combined health check', () => {
it('should return combined status with both checks healthy', async () => { it('should return combined status with both checks healthy', async () => {
healthCacheService.getMessageChannelSyncJobByStatusCounter.mockResolvedValue( healthCacheService.countChannelSyncJobByStatus
{ .mockResolvedValueOnce({
[MessageChannelSyncStatus.NOT_SYNCED]: 0, [MessageChannelSyncStatus.NOT_SYNCED]: 0,
[MessageChannelSyncStatus.ACTIVE]: 8, [MessageChannelSyncStatus.ACTIVE]: 8,
[MessageChannelSyncStatus.FAILED_UNKNOWN]: 1, [MessageChannelSyncStatus.FAILED_UNKNOWN]: 1,
}, })
); .mockResolvedValueOnce({
healthCacheService.getCalendarChannelSyncJobByStatusCounter.mockResolvedValue(
{
[CalendarChannelSyncStatus.NOT_SYNCED]: 0, [CalendarChannelSyncStatus.NOT_SYNCED]: 0,
[CalendarChannelSyncStatus.ACTIVE]: 8, [CalendarChannelSyncStatus.ACTIVE]: 8,
[CalendarChannelSyncStatus.FAILED_UNKNOWN]: 1, [CalendarChannelSyncStatus.FAILED_UNKNOWN]: 1,
}, });
);
const result = await service.isHealthy(); const result = await service.isHealthy();
@ -287,21 +256,17 @@ describe('ConnectedAccountHealth', () => {
}); });
it('should return down status when both syncs fail', async () => { it('should return down status when both syncs fail', async () => {
healthCacheService.getMessageChannelSyncJobByStatusCounter.mockResolvedValue( healthCacheService.countChannelSyncJobByStatus
{ .mockResolvedValueOnce({
[MessageChannelSyncStatus.NOT_SYNCED]: 0, [MessageChannelSyncStatus.NOT_SYNCED]: 0,
[MessageChannelSyncStatus.ACTIVE]: 1, [MessageChannelSyncStatus.ACTIVE]: 1,
[MessageChannelSyncStatus.FAILED_UNKNOWN]: 2, [MessageChannelSyncStatus.FAILED_UNKNOWN]: 2,
}, })
); .mockResolvedValueOnce({
healthCacheService.getCalendarChannelSyncJobByStatusCounter.mockResolvedValue(
{
[CalendarChannelSyncStatus.NOT_SYNCED]: 0, [CalendarChannelSyncStatus.NOT_SYNCED]: 0,
[CalendarChannelSyncStatus.ACTIVE]: 1, [CalendarChannelSyncStatus.ACTIVE]: 1,
[CalendarChannelSyncStatus.FAILED_UNKNOWN]: 2, [CalendarChannelSyncStatus.FAILED_UNKNOWN]: 2,
}, });
);
const result = await service.isHealthy(); const result = await service.isHealthy();

View File

@ -7,6 +7,7 @@ import {
import { HEALTH_ERROR_MESSAGES } from 'src/engine/core-modules/health/constants/health-error-messages.constants'; import { HEALTH_ERROR_MESSAGES } from 'src/engine/core-modules/health/constants/health-error-messages.constants';
import { METRICS_FAILURE_RATE_THRESHOLD } from 'src/engine/core-modules/health/constants/metrics-failure-rate-threshold.const'; import { METRICS_FAILURE_RATE_THRESHOLD } from 'src/engine/core-modules/health/constants/metrics-failure-rate-threshold.const';
import { HealthCacheService } from 'src/engine/core-modules/health/health-cache.service'; import { HealthCacheService } from 'src/engine/core-modules/health/health-cache.service';
import { HealthCounterCacheKeys } from 'src/engine/core-modules/health/types/health-counter-cache-keys.type';
import { withHealthCheckTimeout } from 'src/engine/core-modules/health/utils/health-check-timeout.util'; import { withHealthCheckTimeout } from 'src/engine/core-modules/health/utils/health-check-timeout.util';
@Injectable() @Injectable()
@ -21,7 +22,9 @@ export class ConnectedAccountHealth {
try { try {
const counters = await withHealthCheckTimeout( const counters = await withHealthCheckTimeout(
this.healthCacheService.getMessageChannelSyncJobByStatusCounter(), this.healthCacheService.countChannelSyncJobByStatus(
HealthCounterCacheKeys.MessageChannelSyncJobByStatus,
),
HEALTH_ERROR_MESSAGES.MESSAGE_SYNC_TIMEOUT, HEALTH_ERROR_MESSAGES.MESSAGE_SYNC_TIMEOUT,
); );
@ -70,7 +73,9 @@ export class ConnectedAccountHealth {
try { try {
const counters = await withHealthCheckTimeout( const counters = await withHealthCheckTimeout(
this.healthCacheService.getCalendarChannelSyncJobByStatusCounter(), this.healthCacheService.countChannelSyncJobByStatus(
HealthCounterCacheKeys.CalendarEventSyncJobByStatus,
),
HEALTH_ERROR_MESSAGES.CALENDAR_SYNC_TIMEOUT, HEALTH_ERROR_MESSAGES.CALENDAR_SYNC_TIMEOUT,
); );

View File

@ -5,9 +5,6 @@ export class AccountSyncJobByStatusCounter {
@Field(() => Number, { nullable: true }) @Field(() => Number, { nullable: true })
NOT_SYNCED?: number; NOT_SYNCED?: number;
@Field(() => Number, { nullable: true })
ONGOING?: number;
@Field(() => Number, { nullable: true }) @Field(() => Number, { nullable: true })
ACTIVE?: number; ACTIVE?: number;

View File

@ -6,6 +6,7 @@ import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decora
import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service'; import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service';
import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum'; import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum';
import { HealthCacheService } from 'src/engine/core-modules/health/health-cache.service'; import { HealthCacheService } from 'src/engine/core-modules/health/health-cache.service';
import { HealthCounterCacheKeys } from 'src/engine/core-modules/health/types/health-counter-cache-keys.type';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { import {
CalendarChannelSyncStage, CalendarChannelSyncStage,
@ -79,11 +80,6 @@ export class CalendarChannelSyncStatusService {
syncStatus: CalendarChannelSyncStatus.ONGOING, syncStatus: CalendarChannelSyncStatus.ONGOING,
syncStageStartedAt: new Date().toISOString(), syncStageStartedAt: new Date().toISOString(),
}); });
await this.healthCacheService.incrementCalendarChannelSyncJobByStatusCounter(
CalendarChannelSyncStatus.ONGOING,
calendarChannelIds.length,
);
} }
public async resetAndScheduleFullCalendarEventListFetch( public async resetAndScheduleFullCalendarEventListFetch(
@ -183,9 +179,10 @@ export class CalendarChannelSyncStatusService {
await this.schedulePartialCalendarEventListFetch(calendarChannelIds); await this.schedulePartialCalendarEventListFetch(calendarChannelIds);
await this.healthCacheService.incrementCalendarChannelSyncJobByStatusCounter( await this.healthCacheService.updateMessageOrCalendarChannelSyncJobByStatusCache(
HealthCounterCacheKeys.CalendarEventSyncJobByStatus,
CalendarChannelSyncStatus.ACTIVE, CalendarChannelSyncStatus.ACTIVE,
calendarChannelIds.length, calendarChannelIds,
); );
} }
@ -213,9 +210,10 @@ export class CalendarChannelSyncStatusService {
syncStage: CalendarChannelSyncStage.FAILED, syncStage: CalendarChannelSyncStage.FAILED,
}); });
await this.healthCacheService.incrementCalendarChannelSyncJobByStatusCounter( await this.healthCacheService.updateMessageOrCalendarChannelSyncJobByStatusCache(
HealthCounterCacheKeys.CalendarEventSyncJobByStatus,
CalendarChannelSyncStatus.FAILED_UNKNOWN, CalendarChannelSyncStatus.FAILED_UNKNOWN,
calendarChannelIds.length, calendarChannelIds,
); );
} }
@ -268,9 +266,10 @@ export class CalendarChannelSyncStatusService {
workspaceId, workspaceId,
); );
await this.healthCacheService.incrementCalendarChannelSyncJobByStatusCounter( await this.healthCacheService.updateMessageOrCalendarChannelSyncJobByStatusCache(
HealthCounterCacheKeys.CalendarEventSyncJobByStatus,
CalendarChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS, CalendarChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS,
calendarChannelIds.length, calendarChannelIds,
); );
} }

View File

@ -6,6 +6,7 @@ import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decora
import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service'; import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service';
import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum'; import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum';
import { HealthCacheService } from 'src/engine/core-modules/health/health-cache.service'; import { HealthCacheService } from 'src/engine/core-modules/health/health-cache.service';
import { HealthCounterCacheKeys } from 'src/engine/core-modules/health/types/health-counter-cache-keys.type';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { AccountsToReconnectService } from 'src/modules/connected-account/services/accounts-to-reconnect.service'; import { AccountsToReconnectService } from 'src/modules/connected-account/services/accounts-to-reconnect.service';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
@ -129,11 +130,6 @@ export class MessageChannelSyncStatusService {
syncStatus: MessageChannelSyncStatus.ONGOING, syncStatus: MessageChannelSyncStatus.ONGOING,
syncStageStartedAt: new Date().toISOString(), syncStageStartedAt: new Date().toISOString(),
}); });
await this.healthCacheService.incrementMessageChannelSyncJobByStatusCounter(
MessageChannelSyncStatus.ONGOING,
messageChannelIds.length,
);
} }
public async markAsCompletedAndSchedulePartialMessageListFetch( public async markAsCompletedAndSchedulePartialMessageListFetch(
@ -156,9 +152,10 @@ export class MessageChannelSyncStatusService {
syncedAt: new Date().toISOString(), syncedAt: new Date().toISOString(),
}); });
await this.healthCacheService.incrementMessageChannelSyncJobByStatusCounter( await this.healthCacheService.updateMessageOrCalendarChannelSyncJobByStatusCache(
HealthCounterCacheKeys.MessageChannelSyncJobByStatus,
MessageChannelSyncStatus.ACTIVE, MessageChannelSyncStatus.ACTIVE,
messageChannelIds.length, messageChannelIds,
); );
} }
@ -202,9 +199,10 @@ export class MessageChannelSyncStatusService {
syncStatus: MessageChannelSyncStatus.FAILED_UNKNOWN, syncStatus: MessageChannelSyncStatus.FAILED_UNKNOWN,
}); });
await this.healthCacheService.incrementMessageChannelSyncJobByStatusCounter( await this.healthCacheService.updateMessageOrCalendarChannelSyncJobByStatusCache(
HealthCounterCacheKeys.MessageChannelSyncJobByStatus,
MessageChannelSyncStatus.FAILED_UNKNOWN, MessageChannelSyncStatus.FAILED_UNKNOWN,
messageChannelIds.length, messageChannelIds,
); );
} }
@ -232,9 +230,10 @@ export class MessageChannelSyncStatusService {
syncStatus: MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS, syncStatus: MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS,
}); });
await this.healthCacheService.incrementMessageChannelSyncJobByStatusCounter( await this.healthCacheService.updateMessageOrCalendarChannelSyncJobByStatusCache(
HealthCounterCacheKeys.MessageChannelSyncJobByStatus,
MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS, MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS,
messageChannelIds.length, messageChannelIds,
); );
const connectedAccountRepository = const connectedAccountRepository =