From 96035f0ccf7f7010dd6e615f3a6bf61c74dc92b4 Mon Sep 17 00:00:00 2001
From: Etienne <45695613+etiennejouan@users.noreply.github.com>
Date: Fri, 7 Mar 2025 14:41:46 +0100
Subject: [PATCH] fix redis concurrency issue in health metrics + remove
ongoing status count (#10717)
### Context
For calendar and message sync job health monitoring, we used to
increment a counter in redis cache which could lead to concurrency
issue.
### Solution
- Update to a set structure in place of counter + use sAdd redis method
which is atomic
- Each minute another counter was incremented on a new cache key ->
Update to a 15s window
- Remove ONGOING status not needed. We only need status at job end (or
fail).
### Potential improvements
- Check for cache key existence before fetching data to avoid useless
call to redis ?
closes https://github.com/twentyhq/twenty/issues/10070
---
...ngsAdminHealthAccountSyncCountersTable.tsx | 4 +-
.../services/cache-storage.service.ts | 37 ++-
.../core-modules/captcha/captcha.guard.ts | 2 +-
.../environment/environment-variables.ts | 2 +-
.../health/controllers/metrics.controller.ts | 11 +-
.../health/health-cache.service.ts | 223 +++++++-----------
.../connected-account.health.spec.ts | 121 ++++------
.../indicators/connected-account.health.ts | 9 +-
.../types/account-sync-metrics.types.ts | 3 -
.../calendar-channel-sync-status.service.ts | 21 +-
.../message-channel-sync-status.service.ts | 21 +-
11 files changed, 203 insertions(+), 251 deletions(-)
diff --git a/packages/twenty-front/src/modules/settings/admin-panel/health-status/components/SettingsAdminHealthAccountSyncCountersTable.tsx b/packages/twenty-front/src/modules/settings/admin-panel/health-status/components/SettingsAdminHealthAccountSyncCountersTable.tsx
index b356c4553..f8831b427 100644
--- a/packages/twenty-front/src/modules/settings/admin-panel/health-status/components/SettingsAdminHealthAccountSyncCountersTable.tsx
+++ b/packages/twenty-front/src/modules/settings/admin-panel/health-status/components/SettingsAdminHealthAccountSyncCountersTable.tsx
@@ -34,8 +34,8 @@ export const SettingsAdminHealthAccountSyncCountersTable = ({
{details.counters.NOT_SYNCED}
- Sync Ongoing
- {details.counters.ONGOING}
+ Active Sync
+ {details.counters.ACTIVE}
Total Jobs
diff --git a/packages/twenty-server/src/engine/core-modules/cache-storage/services/cache-storage.service.ts b/packages/twenty-server/src/engine/core-modules/cache-storage/services/cache-storage.service.ts
index 7b132c7fb..9f3f4dfd9 100644
--- a/packages/twenty-server/src/engine/core-modules/cache-storage/services/cache-storage.service.ts
+++ b/packages/twenty-server/src/engine/core-modules/cache-storage/services/cache-storage.service.ts
@@ -25,25 +25,42 @@ export class CacheStorageService {
return this.cache.del(`${this.namespace}:${key}`);
}
- async setAdd(key: string, value: string[]) {
+ async setAdd(key: string, value: string[], ttl?: number) {
if (value.length === 0) {
return;
}
+
if (this.isRedisCache()) {
- return (this.cache as RedisCache).store.client.sAdd(
+ await (this.cache as RedisCache).store.client.sAdd(
`${this.namespace}:${key}`,
value,
);
+
+ if (ttl) {
+ await (this.cache as RedisCache).store.client.expire(
+ `${this.namespace}:${key}`,
+ ttl / 1000,
+ );
+ }
+
+ return;
}
+
this.get(key).then((res: string[]) => {
if (res) {
- this.set(key, [...res, ...value]);
+ this.set(key, [...res, ...value], ttl);
} else {
- this.set(key, value);
+ this.set(key, value, ttl);
}
});
}
+ async countAllSetMembers(cacheKeys: string[]) {
+ return (
+ await Promise.all(cacheKeys.map((key) => this.getSetLength(key) || 0))
+ ).reduce((acc, setLength) => acc + setLength, 0);
+ }
+
async setPop(key: string, size = 1) {
if (this.isRedisCache()) {
return (this.cache as RedisCache).store.client.sPop(
@@ -63,6 +80,18 @@ export class CacheStorageService {
});
}
+ async getSetLength(key: string) {
+ if (this.isRedisCache()) {
+ return await (this.cache as RedisCache).store.client.sCard(
+ `${this.namespace}:${key}`,
+ );
+ }
+
+ return this.get(key).then((res: string[]) => {
+ return res.length;
+ });
+ }
+
async flush() {
return this.cache.reset();
}
diff --git a/packages/twenty-server/src/engine/core-modules/captcha/captcha.guard.ts b/packages/twenty-server/src/engine/core-modules/captcha/captcha.guard.ts
index 7653dcf9e..283be0d7b 100644
--- a/packages/twenty-server/src/engine/core-modules/captcha/captcha.guard.ts
+++ b/packages/twenty-server/src/engine/core-modules/captcha/captcha.guard.ts
@@ -26,7 +26,7 @@ export class CaptchaGuard implements CanActivate {
if (result.success) {
return true;
} else {
- await this.healthCacheService.incrementInvalidCaptchaCounter();
+ await this.healthCacheService.updateInvalidCaptchaCache(token);
throw new BadRequestException(
'Invalid Captcha, please try another device',
diff --git a/packages/twenty-server/src/engine/core-modules/environment/environment-variables.ts b/packages/twenty-server/src/engine/core-modules/environment/environment-variables.ts
index ec1f963d2..186f31351 100644
--- a/packages/twenty-server/src/engine/core-modules/environment/environment-variables.ts
+++ b/packages/twenty-server/src/engine/core-modules/environment/environment-variables.ts
@@ -959,7 +959,7 @@ export class EnvironmentVariables {
@IsNumber()
@CastToPositiveNumber()
@IsOptional()
- HEALTH_MONITORING_TIME_WINDOW_IN_MINUTES = 5;
+ HEALTH_METRICS_TIME_WINDOW_IN_MINUTES = 5;
@EnvironmentVariablesMetadata({
group: EnvironmentVariablesGroup.Other,
diff --git a/packages/twenty-server/src/engine/core-modules/health/controllers/metrics.controller.ts b/packages/twenty-server/src/engine/core-modules/health/controllers/metrics.controller.ts
index b29db82f4..50d2cb2d3 100644
--- a/packages/twenty-server/src/engine/core-modules/health/controllers/metrics.controller.ts
+++ b/packages/twenty-server/src/engine/core-modules/health/controllers/metrics.controller.ts
@@ -1,14 +1,17 @@
import { Controller, Get } from '@nestjs/common';
import { HealthCacheService } from 'src/engine/core-modules/health/health-cache.service';
+import { HealthCounterCacheKeys } from 'src/engine/core-modules/health/types/health-counter-cache-keys.type';
-@Controller('metricsz')
+@Controller('metrics')
export class MetricsController {
constructor(private readonly healthCacheService: HealthCacheService) {}
@Get('/message-channel-sync-job-by-status-counter')
getMessageChannelSyncJobByStatusCounter() {
- return this.healthCacheService.getMessageChannelSyncJobByStatusCounter();
+ return this.healthCacheService.countChannelSyncJobByStatus(
+ HealthCounterCacheKeys.MessageChannelSyncJobByStatus,
+ );
}
@Get('/invalid-captcha-counter')
@@ -18,6 +21,8 @@ export class MetricsController {
@Get('/calendar-channel-sync-job-by-status-counter')
getCalendarChannelSyncJobByStatusCounter() {
- return this.healthCacheService.getCalendarChannelSyncJobByStatusCounter();
+ return this.healthCacheService.countChannelSyncJobByStatus(
+ HealthCounterCacheKeys.CalendarEventSyncJobByStatus,
+ );
}
}
diff --git a/packages/twenty-server/src/engine/core-modules/health/health-cache.service.ts b/packages/twenty-server/src/engine/core-modules/health/health-cache.service.ts
index 923bbb584..ed8779822 100644
--- a/packages/twenty-server/src/engine/core-modules/health/health-cache.service.ts
+++ b/packages/twenty-server/src/engine/core-modules/health/health-cache.service.ts
@@ -9,9 +9,11 @@ import { HealthCounterCacheKeys } from 'src/engine/core-modules/health/types/hea
import { CalendarChannelSyncStatus } from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity';
import { MessageChannelSyncStatus } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
+const CACHE_BUCKET_DURATION_MS = 15000; // 15 seconds window for each cache bucket
+
@Injectable()
export class HealthCacheService {
- private readonly healthMonitoringTimeWindowInMinutes: number;
+ private readonly healthMetricsTimeWindowInMinutes: number;
private readonly healthCacheTtl: number;
constructor(
@@ -19,168 +21,119 @@ export class HealthCacheService {
private readonly cacheStorage: CacheStorageService,
private readonly environmentService: EnvironmentService,
) {
- this.healthMonitoringTimeWindowInMinutes = this.environmentService.get(
- 'HEALTH_MONITORING_TIME_WINDOW_IN_MINUTES',
+ this.healthMetricsTimeWindowInMinutes = this.environmentService.get(
+ 'HEALTH_METRICS_TIME_WINDOW_IN_MINUTES',
+ );
+ this.healthCacheTtl = this.healthMetricsTimeWindowInMinutes * 60000 * 2;
+ }
+
+ private getCacheBucketStartTimestamp(timestamp: number): number {
+ return (
+ Math.floor(timestamp / CACHE_BUCKET_DURATION_MS) *
+ CACHE_BUCKET_DURATION_MS
);
- this.healthCacheTtl = this.healthMonitoringTimeWindowInMinutes * 60000 * 2;
}
private getCacheKeyWithTimestamp(key: string, timestamp?: number): string {
- const minuteTimestamp = timestamp ?? Math.floor(Date.now() / 60000) * 60000;
+ const currentIntervalTimestamp =
+ timestamp ?? this.getCacheBucketStartTimestamp(Date.now());
- return `${key}:${minuteTimestamp}`;
+ return `${key}:${currentIntervalTimestamp}`;
}
- private getLastXMinutesTimestamps(minutes: number): number[] {
- const currentMinuteTimestamp = Math.floor(Date.now() / 60000) * 60000;
+ private getLastCacheBucketStartTimestampsFromDate(
+ cacheBucketsCount: number,
+ date: number = Date.now(),
+ ): number[] {
+ const currentIntervalTimestamp = this.getCacheBucketStartTimestamp(date);
return Array.from(
- { length: minutes },
- (_, i) => currentMinuteTimestamp - i * 60000,
+ { length: cacheBucketsCount },
+ (_, i) => currentIntervalTimestamp - i * CACHE_BUCKET_DURATION_MS,
);
}
- async incrementMessageChannelSyncJobByStatusCounter(
- status: MessageChannelSyncStatus,
- increment: number,
+ async updateMessageOrCalendarChannelSyncJobByStatusCache(
+ key: HealthCounterCacheKeys,
+ status: MessageChannelSyncStatus | CalendarChannelSyncStatus,
+ messageChannelIds: string[],
) {
- const cacheKey = this.getCacheKeyWithTimestamp(
- HealthCounterCacheKeys.MessageChannelSyncJobByStatus,
- );
-
- const currentCounter =
- await this.cacheStorage.get(cacheKey);
-
- const updatedCounter = {
- ...(currentCounter || {}),
- [status]: (currentCounter?.[status] || 0) + increment,
- };
-
- return await this.cacheStorage.set(
- cacheKey,
- updatedCounter,
+ return await this.cacheStorage.setAdd(
+ this.getCacheKeyWithTimestamp(`${key}:${status}`),
+ messageChannelIds,
this.healthCacheTtl,
);
}
- async getMessageChannelSyncJobByStatusCounter() {
- const cacheKeys = this.getLastXMinutesTimestamps(
- this.healthMonitoringTimeWindowInMinutes,
- ).map((timestamp) =>
- this.getCacheKeyWithTimestamp(
- HealthCounterCacheKeys.MessageChannelSyncJobByStatus,
- timestamp,
- ),
- );
-
- const aggregatedCounter = Object.fromEntries(
- Object.values(MessageChannelSyncStatus).map((status) => [status, 0]),
- );
-
- for (const key of cacheKeys) {
- const counter =
- await this.cacheStorage.get(key);
-
- if (!counter) continue;
-
- for (const [status, count] of Object.entries(counter) as [
- MessageChannelSyncStatus,
- number,
- ][]) {
- aggregatedCounter[status] += count;
- }
+ async countChannelSyncJobByStatus(
+ key:
+ | HealthCounterCacheKeys.MessageChannelSyncJobByStatus
+ | HealthCounterCacheKeys.CalendarEventSyncJobByStatus,
+ timeWindowInSeconds: number = this.healthMetricsTimeWindowInMinutes * 60,
+ ): Promise {
+ if ((timeWindowInSeconds * 1000) % CACHE_BUCKET_DURATION_MS !== 0) {
+ throw new Error(
+ `Time window must be divisible by ${CACHE_BUCKET_DURATION_MS}`,
+ );
}
- return aggregatedCounter;
+ const now = Date.now();
+ const countByStatus = {} as AccountSyncJobByStatusCounter;
+ const statuses =
+ key === HealthCounterCacheKeys.MessageChannelSyncJobByStatus
+ ? Object.values(MessageChannelSyncStatus).filter(
+ (status) => status !== MessageChannelSyncStatus.ONGOING,
+ )
+ : Object.values(CalendarChannelSyncStatus).filter(
+ (status) => status !== CalendarChannelSyncStatus.ONGOING,
+ );
+
+ const cacheBuckets =
+ timeWindowInSeconds / (CACHE_BUCKET_DURATION_MS / 1000);
+
+ for (const status of statuses) {
+ const cacheKeys = this.computeTimeStampedCacheKeys(
+ `${key}:${status}`,
+ cacheBuckets,
+ now,
+ );
+
+ const channelIdsCount =
+ await this.cacheStorage.countAllSetMembers(cacheKeys);
+
+ countByStatus[status] = channelIdsCount;
+ }
+
+ return countByStatus;
}
- async incrementInvalidCaptchaCounter() {
- const cacheKey = this.getCacheKeyWithTimestamp(
- HealthCounterCacheKeys.InvalidCaptcha,
- );
+ computeTimeStampedCacheKeys(
+ key: string,
+ cacheBucketsCount: number,
+ date: number = Date.now(),
+ ) {
+ return this.getLastCacheBucketStartTimestampsFromDate(
+ cacheBucketsCount,
+ date,
+ ).map((timestamp) => this.getCacheKeyWithTimestamp(key, timestamp));
+ }
- const currentCounter = await this.cacheStorage.get(cacheKey);
- const updatedCounter = (currentCounter || 0) + 1;
-
- return await this.cacheStorage.set(
- cacheKey,
- updatedCounter,
+ async updateInvalidCaptchaCache(captchaToken: string) {
+ return await this.cacheStorage.setAdd(
+ this.getCacheKeyWithTimestamp(HealthCounterCacheKeys.InvalidCaptcha),
+ [captchaToken],
this.healthCacheTtl,
);
}
- async getInvalidCaptchaCounter() {
- const cacheKeys = this.getLastXMinutesTimestamps(
- this.healthMonitoringTimeWindowInMinutes,
- ).map((timestamp) =>
- this.getCacheKeyWithTimestamp(
+ async getInvalidCaptchaCounter(
+ timeWindowInSeconds: number = this.healthMetricsTimeWindowInMinutes * 60,
+ ) {
+ return await this.cacheStorage.countAllSetMembers(
+ this.computeTimeStampedCacheKeys(
HealthCounterCacheKeys.InvalidCaptcha,
- timestamp,
+ timeWindowInSeconds / (CACHE_BUCKET_DURATION_MS / 1000),
),
);
-
- let aggregatedCounter = 0;
-
- for (const key of cacheKeys) {
- const counter = await this.cacheStorage.get(key);
-
- aggregatedCounter += counter || 0;
- }
-
- return aggregatedCounter;
- }
-
- async incrementCalendarChannelSyncJobByStatusCounter(
- status: CalendarChannelSyncStatus,
- increment: number,
- ) {
- const cacheKey = this.getCacheKeyWithTimestamp(
- HealthCounterCacheKeys.CalendarEventSyncJobByStatus,
- );
-
- const currentCounter =
- await this.cacheStorage.get(cacheKey);
-
- const updatedCounter = {
- ...(currentCounter || {}),
- [status]: (currentCounter?.[status] || 0) + increment,
- };
-
- return await this.cacheStorage.set(
- cacheKey,
- updatedCounter,
- this.healthCacheTtl,
- );
- }
-
- async getCalendarChannelSyncJobByStatusCounter() {
- const cacheKeys = this.getLastXMinutesTimestamps(
- this.healthMonitoringTimeWindowInMinutes,
- ).map((timestamp) =>
- this.getCacheKeyWithTimestamp(
- HealthCounterCacheKeys.CalendarEventSyncJobByStatus,
- timestamp,
- ),
- );
-
- const aggregatedCounter = Object.fromEntries(
- Object.values(CalendarChannelSyncStatus).map((status) => [status, 0]),
- );
-
- for (const key of cacheKeys) {
- const counter =
- await this.cacheStorage.get(key);
-
- if (!counter) continue;
-
- for (const [status, count] of Object.entries(counter) as [
- CalendarChannelSyncStatus,
- number,
- ][]) {
- aggregatedCounter[status] += count;
- }
- }
-
- return aggregatedCounter;
}
}
diff --git a/packages/twenty-server/src/engine/core-modules/health/indicators/__tests__/connected-account.health.spec.ts b/packages/twenty-server/src/engine/core-modules/health/indicators/__tests__/connected-account.health.spec.ts
index ff394b20d..b920c6719 100644
--- a/packages/twenty-server/src/engine/core-modules/health/indicators/__tests__/connected-account.health.spec.ts
+++ b/packages/twenty-server/src/engine/core-modules/health/indicators/__tests__/connected-account.health.spec.ts
@@ -16,8 +16,7 @@ describe('ConnectedAccountHealth', () => {
beforeEach(async () => {
healthCacheService = {
- getMessageChannelSyncJobByStatusCounter: jest.fn(),
- getCalendarChannelSyncJobByStatusCounter: jest.fn(),
+ countChannelSyncJobByStatus: jest.fn(),
} as any;
healthIndicatorService = {
@@ -65,22 +64,17 @@ describe('ConnectedAccountHealth', () => {
describe('message sync health', () => {
it('should return up status when no message sync jobs are present', async () => {
- healthCacheService.getMessageChannelSyncJobByStatusCounter.mockResolvedValue(
- {
+ healthCacheService.countChannelSyncJobByStatus
+ .mockResolvedValueOnce({
[MessageChannelSyncStatus.NOT_SYNCED]: 0,
- [MessageChannelSyncStatus.ONGOING]: 0,
[MessageChannelSyncStatus.ACTIVE]: 0,
[MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS]: 0,
[MessageChannelSyncStatus.FAILED_UNKNOWN]: 0,
- },
- );
-
- healthCacheService.getCalendarChannelSyncJobByStatusCounter.mockResolvedValue(
- {
+ })
+ .mockResolvedValueOnce({
[CalendarChannelSyncStatus.NOT_SYNCED]: 0,
[CalendarChannelSyncStatus.ACTIVE]: 0,
- },
- );
+ });
const result = await service.isHealthy();
@@ -98,22 +92,17 @@ describe('ConnectedAccountHealth', () => {
});
it(`should return down status when message sync failure rate is above ${METRICS_FAILURE_RATE_THRESHOLD}%`, async () => {
- healthCacheService.getMessageChannelSyncJobByStatusCounter.mockResolvedValue(
- {
+ healthCacheService.countChannelSyncJobByStatus
+ .mockResolvedValueOnce({
[MessageChannelSyncStatus.NOT_SYNCED]: 0,
- [MessageChannelSyncStatus.ONGOING]: 1,
[MessageChannelSyncStatus.ACTIVE]: 1,
[MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS]: 2,
[MessageChannelSyncStatus.FAILED_UNKNOWN]: 2,
- },
- );
-
- healthCacheService.getCalendarChannelSyncJobByStatusCounter.mockResolvedValue(
- {
+ })
+ .mockResolvedValueOnce({
[CalendarChannelSyncStatus.NOT_SYNCED]: 0,
[CalendarChannelSyncStatus.ACTIVE]: 1,
- },
- );
+ });
const result = await service.isHealthy();
@@ -127,28 +116,23 @@ describe('ConnectedAccountHealth', () => {
);
expect(
result.connectedAccount.details.messageSync.details.failureRate,
- ).toBe(33.33);
+ ).toBe(40);
});
});
describe('calendar sync health', () => {
it('should return up status when no calendar sync jobs are present', async () => {
- healthCacheService.getMessageChannelSyncJobByStatusCounter.mockResolvedValue(
- {
+ healthCacheService.countChannelSyncJobByStatus
+ .mockResolvedValueOnce({
[MessageChannelSyncStatus.NOT_SYNCED]: 0,
[MessageChannelSyncStatus.ACTIVE]: 0,
- },
- );
-
- healthCacheService.getCalendarChannelSyncJobByStatusCounter.mockResolvedValue(
- {
+ })
+ .mockResolvedValueOnce({
[CalendarChannelSyncStatus.NOT_SYNCED]: 0,
- [CalendarChannelSyncStatus.ONGOING]: 0,
[CalendarChannelSyncStatus.ACTIVE]: 0,
[CalendarChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS]: 0,
[CalendarChannelSyncStatus.FAILED_UNKNOWN]: 0,
- },
- );
+ });
const result = await service.isHealthy();
@@ -166,22 +150,17 @@ describe('ConnectedAccountHealth', () => {
});
it(`should return down status when calendar sync failure rate is above ${METRICS_FAILURE_RATE_THRESHOLD}%`, async () => {
- healthCacheService.getMessageChannelSyncJobByStatusCounter.mockResolvedValue(
- {
+ healthCacheService.countChannelSyncJobByStatus
+ .mockResolvedValueOnce({
[MessageChannelSyncStatus.NOT_SYNCED]: 0,
[MessageChannelSyncStatus.ACTIVE]: 1,
- },
- );
-
- healthCacheService.getCalendarChannelSyncJobByStatusCounter.mockResolvedValue(
- {
+ })
+ .mockResolvedValueOnce({
[CalendarChannelSyncStatus.NOT_SYNCED]: 0,
- [CalendarChannelSyncStatus.ONGOING]: 1,
[CalendarChannelSyncStatus.ACTIVE]: 1,
[CalendarChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS]: 2,
[CalendarChannelSyncStatus.FAILED_UNKNOWN]: 2,
- },
- );
+ });
const result = await service.isHealthy();
@@ -195,25 +174,22 @@ describe('ConnectedAccountHealth', () => {
);
expect(
result.connectedAccount.details.calendarSync.details.failureRate,
- ).toBe(33.33);
+ ).toBe(40);
});
});
describe('timeout handling', () => {
it('should handle message sync timeout', async () => {
- healthCacheService.getMessageChannelSyncJobByStatusCounter.mockImplementationOnce(
- () =>
+ healthCacheService.countChannelSyncJobByStatus
+ .mockResolvedValueOnce(
new Promise((resolve) =>
setTimeout(resolve, HEALTH_INDICATORS_TIMEOUT + 100),
),
- );
-
- healthCacheService.getCalendarChannelSyncJobByStatusCounter.mockResolvedValue(
- {
+ )
+ .mockResolvedValueOnce({
[CalendarChannelSyncStatus.NOT_SYNCED]: 0,
[CalendarChannelSyncStatus.ACTIVE]: 1,
- },
- );
+ });
const healthCheckPromise = service.isHealthy();
@@ -231,19 +207,16 @@ describe('ConnectedAccountHealth', () => {
});
it('should handle calendar sync timeout', async () => {
- healthCacheService.getMessageChannelSyncJobByStatusCounter.mockResolvedValue(
- {
+ healthCacheService.countChannelSyncJobByStatus
+ .mockResolvedValueOnce({
[MessageChannelSyncStatus.NOT_SYNCED]: 0,
[MessageChannelSyncStatus.ACTIVE]: 1,
- },
- );
-
- healthCacheService.getCalendarChannelSyncJobByStatusCounter.mockImplementationOnce(
- () =>
+ })
+ .mockResolvedValueOnce(
new Promise((resolve) =>
setTimeout(resolve, HEALTH_INDICATORS_TIMEOUT + 100),
),
- );
+ );
const healthCheckPromise = service.isHealthy();
@@ -263,21 +236,17 @@ describe('ConnectedAccountHealth', () => {
describe('combined health check', () => {
it('should return combined status with both checks healthy', async () => {
- healthCacheService.getMessageChannelSyncJobByStatusCounter.mockResolvedValue(
- {
+ healthCacheService.countChannelSyncJobByStatus
+ .mockResolvedValueOnce({
[MessageChannelSyncStatus.NOT_SYNCED]: 0,
[MessageChannelSyncStatus.ACTIVE]: 8,
[MessageChannelSyncStatus.FAILED_UNKNOWN]: 1,
- },
- );
-
- healthCacheService.getCalendarChannelSyncJobByStatusCounter.mockResolvedValue(
- {
+ })
+ .mockResolvedValueOnce({
[CalendarChannelSyncStatus.NOT_SYNCED]: 0,
[CalendarChannelSyncStatus.ACTIVE]: 8,
[CalendarChannelSyncStatus.FAILED_UNKNOWN]: 1,
- },
- );
+ });
const result = await service.isHealthy();
@@ -287,21 +256,17 @@ describe('ConnectedAccountHealth', () => {
});
it('should return down status when both syncs fail', async () => {
- healthCacheService.getMessageChannelSyncJobByStatusCounter.mockResolvedValue(
- {
+ healthCacheService.countChannelSyncJobByStatus
+ .mockResolvedValueOnce({
[MessageChannelSyncStatus.NOT_SYNCED]: 0,
[MessageChannelSyncStatus.ACTIVE]: 1,
[MessageChannelSyncStatus.FAILED_UNKNOWN]: 2,
- },
- );
-
- healthCacheService.getCalendarChannelSyncJobByStatusCounter.mockResolvedValue(
- {
+ })
+ .mockResolvedValueOnce({
[CalendarChannelSyncStatus.NOT_SYNCED]: 0,
[CalendarChannelSyncStatus.ACTIVE]: 1,
[CalendarChannelSyncStatus.FAILED_UNKNOWN]: 2,
- },
- );
+ });
const result = await service.isHealthy();
diff --git a/packages/twenty-server/src/engine/core-modules/health/indicators/connected-account.health.ts b/packages/twenty-server/src/engine/core-modules/health/indicators/connected-account.health.ts
index f0b9d31de..96c3d6dca 100644
--- a/packages/twenty-server/src/engine/core-modules/health/indicators/connected-account.health.ts
+++ b/packages/twenty-server/src/engine/core-modules/health/indicators/connected-account.health.ts
@@ -7,6 +7,7 @@ import {
import { HEALTH_ERROR_MESSAGES } from 'src/engine/core-modules/health/constants/health-error-messages.constants';
import { METRICS_FAILURE_RATE_THRESHOLD } from 'src/engine/core-modules/health/constants/metrics-failure-rate-threshold.const';
import { HealthCacheService } from 'src/engine/core-modules/health/health-cache.service';
+import { HealthCounterCacheKeys } from 'src/engine/core-modules/health/types/health-counter-cache-keys.type';
import { withHealthCheckTimeout } from 'src/engine/core-modules/health/utils/health-check-timeout.util';
@Injectable()
@@ -21,7 +22,9 @@ export class ConnectedAccountHealth {
try {
const counters = await withHealthCheckTimeout(
- this.healthCacheService.getMessageChannelSyncJobByStatusCounter(),
+ this.healthCacheService.countChannelSyncJobByStatus(
+ HealthCounterCacheKeys.MessageChannelSyncJobByStatus,
+ ),
HEALTH_ERROR_MESSAGES.MESSAGE_SYNC_TIMEOUT,
);
@@ -70,7 +73,9 @@ export class ConnectedAccountHealth {
try {
const counters = await withHealthCheckTimeout(
- this.healthCacheService.getCalendarChannelSyncJobByStatusCounter(),
+ this.healthCacheService.countChannelSyncJobByStatus(
+ HealthCounterCacheKeys.CalendarEventSyncJobByStatus,
+ ),
HEALTH_ERROR_MESSAGES.CALENDAR_SYNC_TIMEOUT,
);
diff --git a/packages/twenty-server/src/engine/core-modules/health/types/account-sync-metrics.types.ts b/packages/twenty-server/src/engine/core-modules/health/types/account-sync-metrics.types.ts
index 506516b43..f3d2f4cae 100644
--- a/packages/twenty-server/src/engine/core-modules/health/types/account-sync-metrics.types.ts
+++ b/packages/twenty-server/src/engine/core-modules/health/types/account-sync-metrics.types.ts
@@ -5,9 +5,6 @@ export class AccountSyncJobByStatusCounter {
@Field(() => Number, { nullable: true })
NOT_SYNCED?: number;
- @Field(() => Number, { nullable: true })
- ONGOING?: number;
-
@Field(() => Number, { nullable: true })
ACTIVE?: number;
diff --git a/packages/twenty-server/src/modules/calendar/common/services/calendar-channel-sync-status.service.ts b/packages/twenty-server/src/modules/calendar/common/services/calendar-channel-sync-status.service.ts
index ba2c47b2a..46f0c181d 100644
--- a/packages/twenty-server/src/modules/calendar/common/services/calendar-channel-sync-status.service.ts
+++ b/packages/twenty-server/src/modules/calendar/common/services/calendar-channel-sync-status.service.ts
@@ -6,6 +6,7 @@ import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decora
import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service';
import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum';
import { HealthCacheService } from 'src/engine/core-modules/health/health-cache.service';
+import { HealthCounterCacheKeys } from 'src/engine/core-modules/health/types/health-counter-cache-keys.type';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import {
CalendarChannelSyncStage,
@@ -79,11 +80,6 @@ export class CalendarChannelSyncStatusService {
syncStatus: CalendarChannelSyncStatus.ONGOING,
syncStageStartedAt: new Date().toISOString(),
});
-
- await this.healthCacheService.incrementCalendarChannelSyncJobByStatusCounter(
- CalendarChannelSyncStatus.ONGOING,
- calendarChannelIds.length,
- );
}
public async resetAndScheduleFullCalendarEventListFetch(
@@ -183,9 +179,10 @@ export class CalendarChannelSyncStatusService {
await this.schedulePartialCalendarEventListFetch(calendarChannelIds);
- await this.healthCacheService.incrementCalendarChannelSyncJobByStatusCounter(
+ await this.healthCacheService.updateMessageOrCalendarChannelSyncJobByStatusCache(
+ HealthCounterCacheKeys.CalendarEventSyncJobByStatus,
CalendarChannelSyncStatus.ACTIVE,
- calendarChannelIds.length,
+ calendarChannelIds,
);
}
@@ -213,9 +210,10 @@ export class CalendarChannelSyncStatusService {
syncStage: CalendarChannelSyncStage.FAILED,
});
- await this.healthCacheService.incrementCalendarChannelSyncJobByStatusCounter(
+ await this.healthCacheService.updateMessageOrCalendarChannelSyncJobByStatusCache(
+ HealthCounterCacheKeys.CalendarEventSyncJobByStatus,
CalendarChannelSyncStatus.FAILED_UNKNOWN,
- calendarChannelIds.length,
+ calendarChannelIds,
);
}
@@ -268,9 +266,10 @@ export class CalendarChannelSyncStatusService {
workspaceId,
);
- await this.healthCacheService.incrementCalendarChannelSyncJobByStatusCounter(
+ await this.healthCacheService.updateMessageOrCalendarChannelSyncJobByStatusCache(
+ HealthCounterCacheKeys.CalendarEventSyncJobByStatus,
CalendarChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS,
- calendarChannelIds.length,
+ calendarChannelIds,
);
}
diff --git a/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts b/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts
index 1a6a01230..0a06eed84 100644
--- a/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts
+++ b/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts
@@ -6,6 +6,7 @@ import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decora
import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service';
import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum';
import { HealthCacheService } from 'src/engine/core-modules/health/health-cache.service';
+import { HealthCounterCacheKeys } from 'src/engine/core-modules/health/types/health-counter-cache-keys.type';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { AccountsToReconnectService } from 'src/modules/connected-account/services/accounts-to-reconnect.service';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
@@ -129,11 +130,6 @@ export class MessageChannelSyncStatusService {
syncStatus: MessageChannelSyncStatus.ONGOING,
syncStageStartedAt: new Date().toISOString(),
});
-
- await this.healthCacheService.incrementMessageChannelSyncJobByStatusCounter(
- MessageChannelSyncStatus.ONGOING,
- messageChannelIds.length,
- );
}
public async markAsCompletedAndSchedulePartialMessageListFetch(
@@ -156,9 +152,10 @@ export class MessageChannelSyncStatusService {
syncedAt: new Date().toISOString(),
});
- await this.healthCacheService.incrementMessageChannelSyncJobByStatusCounter(
+ await this.healthCacheService.updateMessageOrCalendarChannelSyncJobByStatusCache(
+ HealthCounterCacheKeys.MessageChannelSyncJobByStatus,
MessageChannelSyncStatus.ACTIVE,
- messageChannelIds.length,
+ messageChannelIds,
);
}
@@ -202,9 +199,10 @@ export class MessageChannelSyncStatusService {
syncStatus: MessageChannelSyncStatus.FAILED_UNKNOWN,
});
- await this.healthCacheService.incrementMessageChannelSyncJobByStatusCounter(
+ await this.healthCacheService.updateMessageOrCalendarChannelSyncJobByStatusCache(
+ HealthCounterCacheKeys.MessageChannelSyncJobByStatus,
MessageChannelSyncStatus.FAILED_UNKNOWN,
- messageChannelIds.length,
+ messageChannelIds,
);
}
@@ -232,9 +230,10 @@ export class MessageChannelSyncStatusService {
syncStatus: MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS,
});
- await this.healthCacheService.incrementMessageChannelSyncJobByStatusCounter(
+ await this.healthCacheService.updateMessageOrCalendarChannelSyncJobByStatusCache(
+ HealthCounterCacheKeys.MessageChannelSyncJobByStatus,
MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS,
- messageChannelIds.length,
+ messageChannelIds,
);
const connectedAccountRepository =