set up metrics collecting with open telemetry (#11236)

Done :  
- move metrics and health cache services from health module to metrics
module
- refactor metrics counter from specific method to set up from enum keys
- add OpenTelemetry (Otel) instrumentation for metrics
- set up Otel SDK to send metrics to Otel collector

To do later : 
- implement Otel instrumentation for traces + plug Sentry on top
This commit is contained in:
Etienne
2025-03-28 08:45:24 +01:00
committed by GitHub
parent e9e33c4d29
commit 391392dd87
32 changed files with 575 additions and 297 deletions

View File

@ -30,9 +30,9 @@ import { FeatureFlag } from 'src/engine/core-modules/feature-flag/feature-flag.e
import { FeatureFlagModule } from 'src/engine/core-modules/feature-flag/feature-flag.module';
import { FileUploadModule } from 'src/engine/core-modules/file/file-upload/file-upload.module';
import { GuardRedirectModule } from 'src/engine/core-modules/guard-redirect/guard-redirect.module';
import { HealthModule } from 'src/engine/core-modules/health/health.module';
import { JwtModule } from 'src/engine/core-modules/jwt/jwt.module';
import { KeyValuePair } from 'src/engine/core-modules/key-value-pair/key-value-pair.entity';
import { MetricsModule } from 'src/engine/core-modules/metrics/metrics.module';
import { OnboardingModule } from 'src/engine/core-modules/onboarding/onboarding.module';
import { WorkspaceSSOModule } from 'src/engine/core-modules/sso/sso.module';
import { WorkspaceSSOIdentityProvider } from 'src/engine/core-modules/sso/workspace-sso-identity-provider.entity';
@ -90,7 +90,7 @@ import { JwtAuthStrategy } from './strategies/jwt.auth.strategy';
WorkspaceInvitationModule,
EmailVerificationModule,
GuardRedirectModule,
HealthModule,
MetricsModule,
PermissionsModule,
UserRoleModule,
],

View File

@ -7,13 +7,14 @@ import {
import { GqlExecutionContext } from '@nestjs/graphql';
import { CaptchaService } from 'src/engine/core-modules/captcha/captcha.service';
import { HealthCacheService } from 'src/engine/core-modules/health/health-cache.service';
import { MetricsService } from 'src/engine/core-modules/metrics/metrics.service';
import { MetricsKeys } from 'src/engine/core-modules/metrics/types/metrics-keys.type';
@Injectable()
export class CaptchaGuard implements CanActivate {
constructor(
private captchaService: CaptchaService,
private healthCacheService: HealthCacheService,
private metricsService: MetricsService,
) {}
async canActivate(context: ExecutionContext): Promise<boolean> {
@ -26,7 +27,10 @@ export class CaptchaGuard implements CanActivate {
if (result.success) {
return true;
} else {
await this.healthCacheService.updateInvalidCaptchaCache(token);
await this.metricsService.incrementCounter({
key: MetricsKeys.InvalidCaptcha,
eventId: token,
});
throw new BadRequestException(
'Invalid Captcha, please try another device',

View File

@ -54,61 +54,67 @@ export const ENVIRONMENT_VARIABLES_GROUP_METADATA: Record<
'By default, exceptions are sent to the logs. This should be enough for most self-hosting use-cases. For our cloud app we use Sentry.',
isHiddenOnLoad: true,
},
[EnvironmentVariablesGroup.Other]: {
[EnvironmentVariablesGroup.Metering]: {
position: 900,
description:
'By default, metrics are sent to the console. OpenTelemetry collector can be set up for self-hosting use-cases.',
isHiddenOnLoad: true,
},
[EnvironmentVariablesGroup.Other]: {
position: 1000,
description:
"The variables in this section are mostly used for internal purposes (running our Cloud offering), but shouldn't usually be required for a simple self-hosted instance",
isHiddenOnLoad: true,
},
[EnvironmentVariablesGroup.BillingConfig]: {
position: 1000,
position: 1100,
description:
'We use Stripe in our Cloud app to charge customers. Not relevant to Self-hosters.',
isHiddenOnLoad: true,
},
[EnvironmentVariablesGroup.CaptchaConfig]: {
position: 1100,
position: 1200,
description:
'This protects critical endpoints like login and signup with a captcha to prevent bot attacks. Likely unnecessary for self-hosting scenarios.',
isHiddenOnLoad: true,
},
[EnvironmentVariablesGroup.CloudflareConfig]: {
position: 1200,
position: 1300,
description: '',
isHiddenOnLoad: true,
},
[EnvironmentVariablesGroup.LLM]: {
position: 1300,
position: 1400,
description:
'Configure the LLM provider and model to use for the app. This is experimental and not linked to any public feature.',
isHiddenOnLoad: true,
},
[EnvironmentVariablesGroup.ServerlessConfig]: {
position: 1400,
position: 1500,
description:
'In our multi-tenant cloud app, we offload untrusted custom code from workflows to a serverless system (Lambda) for enhanced security and scalability. Self-hosters with a single tenant can typically ignore this configuration.',
isHiddenOnLoad: true,
},
[EnvironmentVariablesGroup.SSL]: {
position: 1500,
position: 1600,
description:
'Configure this if you want to setup SSL on your server or full end-to-end encryption. If you just want basic HTTPS, a simple setup like Cloudflare in flexible mode might be easier.',
isHiddenOnLoad: true,
},
[EnvironmentVariablesGroup.SupportChatConfig]: {
position: 1600,
position: 1700,
description:
'We use this to setup a small support chat on the bottom left. Currently powered by Front.',
isHiddenOnLoad: true,
},
[EnvironmentVariablesGroup.AnalyticsConfig]: {
position: 1700,
position: 1800,
description:
'Were running a test to perform analytics within the app. This will evolve.',
isHiddenOnLoad: true,
},
[EnvironmentVariablesGroup.TokensDuration]: {
position: 1800,
position: 1900,
description:
'These have been set to sensible default so you probably dont need to change them unless you have a specific use-case.',
isHiddenOnLoad: true,

View File

@ -0,0 +1,21 @@
import { Transform } from 'class-transformer';
import { MeterDriver } from 'src/engine/core-modules/metrics/types/meter-driver.type';
export const CastToMeterDriverArray = () =>
Transform(({ value }: { value: string }) => toMeterDriverArray(value));
const toMeterDriverArray = (value: string | undefined) => {
if (typeof value === 'string') {
const rawMeterDrivers = value.split(',').map((driver) => driver.trim());
const isInvalid = rawMeterDrivers.some(
(driver) => !Object.values(MeterDriver).includes(driver as MeterDriver),
);
if (!isInvalid) {
return rawMeterDrivers;
}
}
return undefined;
};

View File

@ -6,6 +6,7 @@ export enum EnvironmentVariablesGroup {
MicrosoftAuth = 'microsoft-auth',
EmailSettings = 'email-settings',
Logging = 'logging',
Metering = 'metering',
ExceptionHandler = 'exception-handler',
Other = 'other',
BillingConfig = 'billing-config',

View File

@ -25,6 +25,7 @@ import { LLMTracingDriver } from 'src/engine/core-modules/llm-tracing/interfaces
import { CaptchaDriverType } from 'src/engine/core-modules/captcha/interfaces';
import { CastToBoolean } from 'src/engine/core-modules/environment/decorators/cast-to-boolean.decorator';
import { CastToLogLevelArray } from 'src/engine/core-modules/environment/decorators/cast-to-log-level-array.decorator';
import { CastToMeterDriverArray } from 'src/engine/core-modules/environment/decorators/cast-to-meter-driver.decorator';
import { CastToPositiveNumber } from 'src/engine/core-modules/environment/decorators/cast-to-positive-number.decorator';
import { EnvironmentVariablesMetadata } from 'src/engine/core-modules/environment/decorators/environment-variables-metadata.decorator';
import { IsAWSRegion } from 'src/engine/core-modules/environment/decorators/is-aws-region.decorator';
@ -36,6 +37,7 @@ import { EnvironmentVariablesGroup } from 'src/engine/core-modules/environment/e
import { ExceptionHandlerDriver } from 'src/engine/core-modules/exception-handler/interfaces';
import { StorageDriverType } from 'src/engine/core-modules/file-storage/interfaces';
import { LoggerDriverType } from 'src/engine/core-modules/logger/interfaces';
import { MeterDriver } from 'src/engine/core-modules/metrics/types/meter-driver.type';
import { ServerlessDriverType } from 'src/engine/core-modules/serverless/serverless.interface';
export class EnvironmentVariables {
@ -585,6 +587,22 @@ export class EnvironmentVariables {
@IsOptional()
LOG_LEVELS: LogLevel[] = ['log', 'error', 'warn'];
@EnvironmentVariablesMetadata({
group: EnvironmentVariablesGroup.Metering,
description: 'Driver used for collect metrics (OpenTelemetry or Console)',
})
@CastToMeterDriverArray()
@IsOptional()
METER_DRIVER: MeterDriver[] = [MeterDriver.Console];
@EnvironmentVariablesMetadata({
group: EnvironmentVariablesGroup.Metering,
description: 'Endpoint URL for the OpenTelemetry collector',
})
@ValidateIf((env) => env.METER_DRIVER.includes(MeterDriver.OpenTelemetry))
@IsOptional()
OTLP_COLLECTOR_ENDPOINT_URL: string;
@EnvironmentVariablesMetadata({
group: EnvironmentVariablesGroup.ExceptionHandler,
description: 'Driver used for logging (only console for now)',

View File

@ -1,30 +0,0 @@
import { Test, TestingModule } from '@nestjs/testing';
import { MetricsController } from 'src/engine/core-modules/health/controllers/metrics.controller';
import { HealthCacheService } from 'src/engine/core-modules/health/health-cache.service';
describe('MetricsController', () => {
let metricsController: MetricsController;
beforeEach(async () => {
const testingModule: TestingModule = await Test.createTestingModule({
controllers: [MetricsController],
providers: [
{
provide: HealthCacheService,
useValue: {
getMessageChannelSyncJobByStatusCounter: jest.fn(),
getCalendarChannelSyncJobByStatusCounter: jest.fn(),
getInvalidCaptchaCounter: jest.fn(),
},
},
],
}).compile();
metricsController = testingModule.get<MetricsController>(MetricsController);
});
it('should be defined', () => {
expect(metricsController).toBeDefined();
});
});

View File

@ -1,28 +0,0 @@
import { Controller, Get } from '@nestjs/common';
import { HealthCacheService } from 'src/engine/core-modules/health/health-cache.service';
import { HealthCounterCacheKeys } from 'src/engine/core-modules/health/types/health-counter-cache-keys.type';
@Controller('metrics')
export class MetricsController {
constructor(private readonly healthCacheService: HealthCacheService) {}
@Get('/message-channel-sync-job-by-status-counter')
getMessageChannelSyncJobByStatusCounter() {
return this.healthCacheService.countChannelSyncJobByStatus(
HealthCounterCacheKeys.MessageChannelSyncJobByStatus,
);
}
@Get('/invalid-captcha-counter')
getInvalidCaptchaCounter() {
return this.healthCacheService.getInvalidCaptchaCounter();
}
@Get('/calendar-channel-sync-job-by-status-counter')
getCalendarChannelSyncJobByStatusCounter() {
return this.healthCacheService.countChannelSyncJobByStatus(
HealthCounterCacheKeys.CalendarEventSyncJobByStatus,
);
}
}

View File

@ -1,139 +0,0 @@
import { Injectable } from '@nestjs/common';
import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decorators/cache-storage.decorator';
import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service';
import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum';
import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service';
import { AccountSyncJobByStatusCounter } from 'src/engine/core-modules/health/types/account-sync-metrics.types';
import { HealthCounterCacheKeys } from 'src/engine/core-modules/health/types/health-counter-cache-keys.type';
import { CalendarChannelSyncStatus } from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity';
import { MessageChannelSyncStatus } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
const CACHE_BUCKET_DURATION_MS = 15000; // 15 seconds window for each cache bucket
@Injectable()
export class HealthCacheService {
private readonly healthMetricsTimeWindowInMinutes: number;
private readonly healthCacheTtl: number;
constructor(
@InjectCacheStorage(CacheStorageNamespace.EngineHealth)
private readonly cacheStorage: CacheStorageService,
private readonly environmentService: EnvironmentService,
) {
this.healthMetricsTimeWindowInMinutes = this.environmentService.get(
'HEALTH_METRICS_TIME_WINDOW_IN_MINUTES',
);
this.healthCacheTtl = this.healthMetricsTimeWindowInMinutes * 60000 * 2;
}
private getCacheBucketStartTimestamp(timestamp: number): number {
return (
Math.floor(timestamp / CACHE_BUCKET_DURATION_MS) *
CACHE_BUCKET_DURATION_MS
);
}
private getCacheKeyWithTimestamp(key: string, timestamp?: number): string {
const currentIntervalTimestamp =
timestamp ?? this.getCacheBucketStartTimestamp(Date.now());
return `${key}:${currentIntervalTimestamp}`;
}
private getLastCacheBucketStartTimestampsFromDate(
cacheBucketsCount: number,
date: number = Date.now(),
): number[] {
const currentIntervalTimestamp = this.getCacheBucketStartTimestamp(date);
return Array.from(
{ length: cacheBucketsCount },
(_, i) => currentIntervalTimestamp - i * CACHE_BUCKET_DURATION_MS,
);
}
async updateMessageOrCalendarChannelSyncJobByStatusCache(
key: HealthCounterCacheKeys,
status: MessageChannelSyncStatus | CalendarChannelSyncStatus,
messageChannelIds: string[],
) {
return await this.cacheStorage.setAdd(
this.getCacheKeyWithTimestamp(`${key}:${status}`),
messageChannelIds,
this.healthCacheTtl,
);
}
async countChannelSyncJobByStatus(
key:
| HealthCounterCacheKeys.MessageChannelSyncJobByStatus
| HealthCounterCacheKeys.CalendarEventSyncJobByStatus,
timeWindowInSeconds: number = this.healthMetricsTimeWindowInMinutes * 60,
): Promise<AccountSyncJobByStatusCounter> {
if ((timeWindowInSeconds * 1000) % CACHE_BUCKET_DURATION_MS !== 0) {
throw new Error(
`Time window must be divisible by ${CACHE_BUCKET_DURATION_MS}`,
);
}
const now = Date.now();
const countByStatus = {} as AccountSyncJobByStatusCounter;
const statuses =
key === HealthCounterCacheKeys.MessageChannelSyncJobByStatus
? Object.values(MessageChannelSyncStatus).filter(
(status) => status !== MessageChannelSyncStatus.ONGOING,
)
: Object.values(CalendarChannelSyncStatus).filter(
(status) => status !== CalendarChannelSyncStatus.ONGOING,
);
const cacheBuckets =
timeWindowInSeconds / (CACHE_BUCKET_DURATION_MS / 1000);
for (const status of statuses) {
const cacheKeys = this.computeTimeStampedCacheKeys(
`${key}:${status}`,
cacheBuckets,
now,
);
const channelIdsCount =
await this.cacheStorage.countAllSetMembers(cacheKeys);
countByStatus[status] = channelIdsCount;
}
return countByStatus;
}
computeTimeStampedCacheKeys(
key: string,
cacheBucketsCount: number,
date: number = Date.now(),
) {
return this.getLastCacheBucketStartTimestampsFromDate(
cacheBucketsCount,
date,
).map((timestamp) => this.getCacheKeyWithTimestamp(key, timestamp));
}
async updateInvalidCaptchaCache(captchaToken: string) {
return await this.cacheStorage.setAdd(
this.getCacheKeyWithTimestamp(HealthCounterCacheKeys.InvalidCaptcha),
[captchaToken],
this.healthCacheTtl,
);
}
async getInvalidCaptchaCounter(
timeWindowInSeconds: number = this.healthMetricsTimeWindowInMinutes * 60,
) {
return await this.cacheStorage.countAllSetMembers(
this.computeTimeStampedCacheKeys(
HealthCounterCacheKeys.InvalidCaptcha,
timeWindowInSeconds / (CACHE_BUCKET_DURATION_MS / 1000),
),
);
}
}

View File

@ -3,14 +3,12 @@ import { TerminusModule } from '@nestjs/terminus';
import { TypeOrmModule } from '@nestjs/typeorm';
import { HealthController } from 'src/engine/core-modules/health/controllers/health.controller';
import { MetricsController } from 'src/engine/core-modules/health/controllers/metrics.controller';
import { AppHealthIndicator } from 'src/engine/core-modules/health/indicators/app.health';
import { MetricsModule } from 'src/engine/core-modules/metrics/metrics.module';
import { RedisClientModule } from 'src/engine/core-modules/redis-client/redis-client.module';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { WorkspaceMigrationModule } from 'src/engine/metadata-modules/workspace-migration/workspace-migration.module';
import { HealthCacheService } from './health-cache.service';
import { ConnectedAccountHealth } from './indicators/connected-account.health';
import { DatabaseHealthIndicator } from './indicators/database.health';
import { RedisHealthIndicator } from './indicators/redis.health';
@ -21,10 +19,10 @@ import { WorkerHealthIndicator } from './indicators/worker.health';
RedisClientModule,
WorkspaceMigrationModule,
TypeOrmModule.forFeature([Workspace], 'core'),
MetricsModule,
],
controllers: [HealthController, MetricsController],
controllers: [HealthController],
providers: [
HealthCacheService,
DatabaseHealthIndicator,
RedisHealthIndicator,
WorkerHealthIndicator,
@ -32,7 +30,6 @@ import { WorkerHealthIndicator } from './indicators/worker.health';
AppHealthIndicator,
],
exports: [
HealthCacheService,
DatabaseHealthIndicator,
RedisHealthIndicator,
WorkerHealthIndicator,

View File

@ -4,19 +4,19 @@ import { Test, TestingModule } from '@nestjs/testing';
import { HEALTH_ERROR_MESSAGES } from 'src/engine/core-modules/health/constants/health-error-messages.constants';
import { HEALTH_INDICATORS_TIMEOUT } from 'src/engine/core-modules/health/constants/health-indicators-timeout.conts';
import { METRICS_FAILURE_RATE_THRESHOLD } from 'src/engine/core-modules/health/constants/metrics-failure-rate-threshold.const';
import { HealthCacheService } from 'src/engine/core-modules/health/health-cache.service';
import { ConnectedAccountHealth } from 'src/engine/core-modules/health/indicators/connected-account.health';
import { MetricsService } from 'src/engine/core-modules/metrics/metrics.service';
import { CalendarChannelSyncStatus } from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity';
import { MessageChannelSyncStatus } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
describe('ConnectedAccountHealth', () => {
let service: ConnectedAccountHealth;
let healthCacheService: jest.Mocked<HealthCacheService>;
let metricsService: jest.Mocked<MetricsService>;
let healthIndicatorService: jest.Mocked<HealthIndicatorService>;
beforeEach(async () => {
healthCacheService = {
countChannelSyncJobByStatus: jest.fn(),
metricsService = {
groupMetrics: jest.fn(),
} as any;
healthIndicatorService = {
@ -41,8 +41,8 @@ describe('ConnectedAccountHealth', () => {
providers: [
ConnectedAccountHealth,
{
provide: HealthCacheService,
useValue: healthCacheService,
provide: MetricsService,
useValue: metricsService,
},
{
provide: HealthIndicatorService,
@ -64,7 +64,7 @@ describe('ConnectedAccountHealth', () => {
describe('message sync health', () => {
it('should return up status when no message sync jobs are present', async () => {
healthCacheService.countChannelSyncJobByStatus
metricsService.groupMetrics
.mockResolvedValueOnce({
[MessageChannelSyncStatus.NOT_SYNCED]: 0,
[MessageChannelSyncStatus.ACTIVE]: 0,
@ -92,7 +92,7 @@ describe('ConnectedAccountHealth', () => {
});
it(`should return down status when message sync failure rate is above ${METRICS_FAILURE_RATE_THRESHOLD}%`, async () => {
healthCacheService.countChannelSyncJobByStatus
metricsService.groupMetrics
.mockResolvedValueOnce({
[MessageChannelSyncStatus.NOT_SYNCED]: 0,
[MessageChannelSyncStatus.ACTIVE]: 1,
@ -122,7 +122,7 @@ describe('ConnectedAccountHealth', () => {
describe('calendar sync health', () => {
it('should return up status when no calendar sync jobs are present', async () => {
healthCacheService.countChannelSyncJobByStatus
metricsService.groupMetrics
.mockResolvedValueOnce({
[MessageChannelSyncStatus.NOT_SYNCED]: 0,
[MessageChannelSyncStatus.ACTIVE]: 0,
@ -150,7 +150,7 @@ describe('ConnectedAccountHealth', () => {
});
it(`should return down status when calendar sync failure rate is above ${METRICS_FAILURE_RATE_THRESHOLD}%`, async () => {
healthCacheService.countChannelSyncJobByStatus
metricsService.groupMetrics
.mockResolvedValueOnce({
[MessageChannelSyncStatus.NOT_SYNCED]: 0,
[MessageChannelSyncStatus.ACTIVE]: 1,
@ -180,7 +180,7 @@ describe('ConnectedAccountHealth', () => {
describe('timeout handling', () => {
it('should handle message sync timeout', async () => {
healthCacheService.countChannelSyncJobByStatus
metricsService.groupMetrics
.mockResolvedValueOnce(
new Promise((resolve) =>
setTimeout(resolve, HEALTH_INDICATORS_TIMEOUT + 100),
@ -207,7 +207,7 @@ describe('ConnectedAccountHealth', () => {
});
it('should handle calendar sync timeout', async () => {
healthCacheService.countChannelSyncJobByStatus
metricsService.groupMetrics
.mockResolvedValueOnce({
[MessageChannelSyncStatus.NOT_SYNCED]: 0,
[MessageChannelSyncStatus.ACTIVE]: 1,
@ -236,7 +236,7 @@ describe('ConnectedAccountHealth', () => {
describe('combined health check', () => {
it('should return combined status with both checks healthy', async () => {
healthCacheService.countChannelSyncJobByStatus
metricsService.groupMetrics
.mockResolvedValueOnce({
[MessageChannelSyncStatus.NOT_SYNCED]: 0,
[MessageChannelSyncStatus.ACTIVE]: 8,
@ -256,7 +256,7 @@ describe('ConnectedAccountHealth', () => {
});
it('should return down status when both syncs fail', async () => {
healthCacheService.countChannelSyncJobByStatus
metricsService.groupMetrics
.mockResolvedValueOnce({
[MessageChannelSyncStatus.NOT_SYNCED]: 0,
[MessageChannelSyncStatus.ACTIVE]: 1,

View File

@ -6,15 +6,17 @@ import {
import { HEALTH_ERROR_MESSAGES } from 'src/engine/core-modules/health/constants/health-error-messages.constants';
import { METRICS_FAILURE_RATE_THRESHOLD } from 'src/engine/core-modules/health/constants/metrics-failure-rate-threshold.const';
import { HealthCacheService } from 'src/engine/core-modules/health/health-cache.service';
import { HealthCounterCacheKeys } from 'src/engine/core-modules/health/types/health-counter-cache-keys.type';
import { withHealthCheckTimeout } from 'src/engine/core-modules/health/utils/health-check-timeout.util';
import {
CALENDAR_SYNC_METRICS_BY_STATUS,
MESSAGE_SYNC_METRICS_BY_STATUS,
} from 'src/engine/core-modules/metrics/constants/account-sync-metrics-by-status.constant';
import { MetricsService } from 'src/engine/core-modules/metrics/metrics.service';
@Injectable()
export class ConnectedAccountHealth {
constructor(
private readonly healthIndicatorService: HealthIndicatorService,
private readonly healthCacheService: HealthCacheService,
private readonly metricsService: MetricsService,
) {}
private async checkMessageSyncHealth(): Promise<HealthIndicatorResult> {
@ -22,9 +24,7 @@ export class ConnectedAccountHealth {
try {
const counters = await withHealthCheckTimeout(
this.healthCacheService.countChannelSyncJobByStatus(
HealthCounterCacheKeys.MessageChannelSyncJobByStatus,
),
this.metricsService.groupMetrics(MESSAGE_SYNC_METRICS_BY_STATUS),
HEALTH_ERROR_MESSAGES.MESSAGE_SYNC_TIMEOUT,
);
@ -73,9 +73,7 @@ export class ConnectedAccountHealth {
try {
const counters = await withHealthCheckTimeout(
this.healthCacheService.countChannelSyncJobByStatus(
HealthCounterCacheKeys.CalendarEventSyncJobByStatus,
),
this.metricsService.groupMetrics(CALENDAR_SYNC_METRICS_BY_STATUS),
HEALTH_ERROR_MESSAGES.CALENDAR_SYNC_TIMEOUT,
);

View File

@ -1,5 +0,0 @@
export enum HealthCounterCacheKeys {
MessageChannelSyncJobByStatus = 'message-channel-sync-job-by-status',
InvalidCaptcha = 'invalid-captcha',
CalendarEventSyncJobByStatus = 'calendar-event-sync-job-by-status',
}

View File

@ -0,0 +1,31 @@
import { MetricsKeys } from 'src/engine/core-modules/metrics/types/metrics-keys.type';
export const MESSAGE_SYNC_METRICS_BY_STATUS = [
{
name: 'ACTIVE',
cacheKey: MetricsKeys.MessageChannelSyncJobActive,
},
{
name: 'FAILED_UNKNOWN',
cacheKey: MetricsKeys.MessageChannelSyncJobFailedUnknown,
},
{
name: 'FAILED_INSUFFICIENT_PERMISSIONS',
cacheKey: MetricsKeys.MessageChannelSyncJobFailedInsufficientPermissions,
},
];
export const CALENDAR_SYNC_METRICS_BY_STATUS = [
{
name: 'ACTIVE',
cacheKey: MetricsKeys.CalendarEventSyncJobActive,
},
{
name: 'FAILED_UNKNOWN',
cacheKey: MetricsKeys.CalendarEventSyncJobFailedUnknown,
},
{
name: 'FAILED_INSUFFICIENT_PERMISSIONS',
cacheKey: MetricsKeys.CalendarEventSyncJobFailedInsufficientPermissions,
},
];

View File

@ -0,0 +1,94 @@
import { Injectable } from '@nestjs/common';
import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decorators/cache-storage.decorator';
import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service';
import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum';
import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service';
import { MetricsKeys } from 'src/engine/core-modules/metrics/types/metrics-keys.type';
const CACHE_BUCKET_DURATION_MS = 15000; // 15 seconds window for each cache bucket
@Injectable()
export class MetricsCacheService {
private readonly healthMetricsTimeWindowInMinutes: number;
private readonly healthCacheTtl: number;
constructor(
@InjectCacheStorage(CacheStorageNamespace.EngineHealth)
private readonly cacheStorage: CacheStorageService,
private readonly environmentService: EnvironmentService,
) {
this.healthMetricsTimeWindowInMinutes = this.environmentService.get(
'HEALTH_METRICS_TIME_WINDOW_IN_MINUTES',
);
this.healthCacheTtl = this.healthMetricsTimeWindowInMinutes * 60000 * 2;
}
private getCacheBucketStartTimestamp(timestamp: number): number {
return (
Math.floor(timestamp / CACHE_BUCKET_DURATION_MS) *
CACHE_BUCKET_DURATION_MS
);
}
private getCacheKeyWithTimestamp(key: string, timestamp?: number): string {
const currentIntervalTimestamp =
timestamp ?? this.getCacheBucketStartTimestamp(Date.now());
return `${key}:${currentIntervalTimestamp}`;
}
private getLastCacheBucketStartTimestampsFromDate(
cacheBucketsCount: number,
date: number,
): number[] {
const currentIntervalTimestamp = this.getCacheBucketStartTimestamp(date);
return Array.from(
{ length: cacheBucketsCount },
(_, i) => currentIntervalTimestamp - i * CACHE_BUCKET_DURATION_MS,
);
}
async updateCounter(key: MetricsKeys, items: string[]) {
return await this.cacheStorage.setAdd(
this.getCacheKeyWithTimestamp(key),
items,
this.healthCacheTtl,
);
}
async computeCount({
key,
timeWindowInSeconds = this.healthMetricsTimeWindowInMinutes * 60,
date = Date.now(),
}: {
key: MetricsKeys;
timeWindowInSeconds?: number;
date?: number;
}): Promise<number> {
if ((timeWindowInSeconds * 1000) % CACHE_BUCKET_DURATION_MS !== 0) {
throw new Error(
`Time window must be divisible by ${CACHE_BUCKET_DURATION_MS}`,
);
}
const cacheBuckets =
timeWindowInSeconds / (CACHE_BUCKET_DURATION_MS / 1000);
const cacheKeys = this.computeTimeStampedCacheKeys(key, cacheBuckets, date);
return await this.cacheStorage.countAllSetMembers(cacheKeys);
}
computeTimeStampedCacheKeys(
key: string,
cacheBucketsCount: number,
date: number,
) {
return this.getLastCacheBucketStartTimestampsFromDate(
cacheBucketsCount,
date,
).map((timestamp) => this.getCacheKeyWithTimestamp(key, timestamp));
}
}

View File

@ -0,0 +1,10 @@
import { Module } from '@nestjs/common';
import { MetricsCacheService } from 'src/engine/core-modules/metrics/metrics-cache.service';
import { MetricsService } from 'src/engine/core-modules/metrics/metrics.service';
@Module({
providers: [MetricsService, MetricsCacheService],
exports: [MetricsService, MetricsCacheService],
})
export class MetricsModule {}

View File

@ -0,0 +1,70 @@
import { Injectable } from '@nestjs/common';
import { metrics } from '@opentelemetry/api';
import { MetricsCacheService } from 'src/engine/core-modules/metrics/metrics-cache.service';
import { MetricsKeys } from 'src/engine/core-modules/metrics/types/metrics-keys.type';
@Injectable()
export class MetricsService {
constructor(private readonly metricsCacheService: MetricsCacheService) {}
async incrementCounter({
key,
eventId,
shouldStoreInCache = true,
}: {
key: MetricsKeys;
eventId: string;
shouldStoreInCache?: boolean;
}) {
//TODO : Define meter name usage in monitoring
const meter = metrics.getMeter('twenty-server');
const counter = meter.createCounter(key);
counter.add(1);
if (shouldStoreInCache) {
this.metricsCacheService.updateCounter(key, [eventId]);
}
}
async batchIncrementCounter({
key,
eventIds,
shouldStoreInCache = true,
}: {
key: MetricsKeys;
eventIds: string[];
shouldStoreInCache?: boolean;
}) {
//TODO : Define meter name usage in monitoring
const meter = metrics.getMeter('twenty-server');
const counter = meter.createCounter(key);
counter.add(eventIds.length);
if (shouldStoreInCache) {
this.metricsCacheService.updateCounter(key, eventIds);
}
}
async groupMetrics(
metrics: { name: string; cacheKey: MetricsKeys }[],
): Promise<Record<string, number>> {
const groupedMetrics: Record<string, number> = {};
const date = Date.now();
for (const metric of metrics) {
const metricValue = await this.metricsCacheService.computeCount({
key: metric.cacheKey,
date,
});
groupedMetrics[metric.name] = metricValue;
}
return groupedMetrics;
}
}

View File

@ -0,0 +1,4 @@
export enum MeterDriver {
OpenTelemetry = 'opentelemetry',
Console = 'console',
}

View File

@ -0,0 +1,9 @@
export enum MetricsKeys {
MessageChannelSyncJobActive = 'message-channel-sync-job/active',
MessageChannelSyncJobFailedInsufficientPermissions = 'message-channel-sync-job/failed-insufficient-permissions',
MessageChannelSyncJobFailedUnknown = 'message-channel-sync-job/failed-unknown',
CalendarEventSyncJobActive = 'calendar-event-sync-job/active',
CalendarEventSyncJobFailedInsufficientPermissions = 'calendar-event-sync-job/failed-insufficient-permissions',
CalendarEventSyncJobFailedUnknown = 'calendar-event-sync-job/failed-unknown',
InvalidCaptcha = 'invalid-captcha',
}