Add metrics to workflows (#12829)
- ensure each trigger is working properly - check throttle does not happen too often - keep an eye on the completed/failed proportion
This commit is contained in:
@ -6,4 +6,11 @@ export enum MetricsKeys {
|
||||
CalendarEventSyncJobFailedInsufficientPermissions = 'calendar-event-sync-job/failed-insufficient-permissions',
|
||||
CalendarEventSyncJobFailedUnknown = 'calendar-event-sync-job/failed-unknown',
|
||||
InvalidCaptcha = 'invalid-captcha',
|
||||
WorkflowRunStartedDatabaseEventTrigger = 'workflow-run/started/database-event-trigger',
|
||||
WorkflowRunStartedCronTrigger = 'workflow-run/started/cron-trigger',
|
||||
WorkflowRunStartedWebhookTrigger = 'workflow-run/started/webhook-trigger',
|
||||
WorkflowRunStartedManualTrigger = 'workflow-run/started/manual-trigger',
|
||||
WorkflowRunCompleted = 'workflow-run/completed',
|
||||
WorkflowRunFailed = 'workflow-run/failed',
|
||||
WorkflowRunFailedThrottled = 'workflow-run/failed/throttled',
|
||||
}
|
||||
|
||||
@ -3,6 +3,8 @@ import { Scope } from '@nestjs/common';
|
||||
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
|
||||
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { MetricsService } from 'src/engine/core-modules/metrics/metrics.service';
|
||||
import { MetricsKeys } from 'src/engine/core-modules/metrics/types/metrics-keys.type';
|
||||
import { ThrottlerService } from 'src/engine/core-modules/throttler/throttler.service';
|
||||
import { TwentyConfigService } from 'src/engine/core-modules/twenty-config/twenty-config.service';
|
||||
import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
|
||||
@ -13,8 +15,9 @@ import {
|
||||
WorkflowRunException,
|
||||
WorkflowRunExceptionCode,
|
||||
} from 'src/modules/workflow/workflow-runner/exceptions/workflow-run.exception';
|
||||
import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service';
|
||||
import { getRootSteps } from 'src/modules/workflow/workflow-runner/utils/getRootSteps.utils';
|
||||
import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service';
|
||||
import { WorkflowTriggerType } from 'src/modules/workflow/workflow-trigger/types/workflow-trigger.type';
|
||||
|
||||
export type RunWorkflowJobData = {
|
||||
workspaceId: string;
|
||||
@ -31,6 +34,7 @@ export class RunWorkflowJob {
|
||||
private readonly workflowRunWorkspaceService: WorkflowRunWorkspaceService,
|
||||
private readonly throttlerService: ThrottlerService,
|
||||
private readonly twentyConfigService: TwentyConfigService,
|
||||
private readonly metricsService: MetricsService,
|
||||
) {}
|
||||
|
||||
@Process(RunWorkflowJob.name)
|
||||
@ -96,6 +100,11 @@ export class RunWorkflowJob {
|
||||
);
|
||||
}
|
||||
|
||||
await this.incrementTriggerMetrics({
|
||||
workflowRunId,
|
||||
triggerType: workflowVersion.trigger.type,
|
||||
});
|
||||
|
||||
await this.workflowRunWorkspaceService.startWorkflowRun({
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
@ -222,10 +231,47 @@ export class RunWorkflowJob {
|
||||
this.twentyConfigService.get('WORKFLOW_EXEC_THROTTLE_TTL'),
|
||||
);
|
||||
} catch (error) {
|
||||
await this.metricsService.incrementCounter({
|
||||
key: MetricsKeys.WorkflowRunFailedThrottled,
|
||||
eventId: workflowId,
|
||||
});
|
||||
|
||||
throw new WorkflowRunException(
|
||||
'Workflow execution rate limit exceeded',
|
||||
WorkflowRunExceptionCode.WORKFLOW_RUN_LIMIT_REACHED,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private async incrementTriggerMetrics({
|
||||
workflowRunId,
|
||||
triggerType,
|
||||
}: {
|
||||
workflowRunId: string;
|
||||
triggerType: string;
|
||||
}) {
|
||||
let key: MetricsKeys;
|
||||
|
||||
switch (triggerType) {
|
||||
case WorkflowTriggerType.DATABASE_EVENT:
|
||||
key = MetricsKeys.WorkflowRunStartedDatabaseEventTrigger;
|
||||
break;
|
||||
case WorkflowTriggerType.CRON:
|
||||
key = MetricsKeys.WorkflowRunStartedCronTrigger;
|
||||
break;
|
||||
case WorkflowTriggerType.WEBHOOK:
|
||||
key = MetricsKeys.WorkflowRunStartedWebhookTrigger;
|
||||
break;
|
||||
case WorkflowTriggerType.MANUAL:
|
||||
key = MetricsKeys.WorkflowRunStartedManualTrigger;
|
||||
break;
|
||||
default:
|
||||
throw new Error('Invalid trigger type');
|
||||
}
|
||||
|
||||
await this.metricsService.incrementCounter({
|
||||
key,
|
||||
eventId: workflowRunId,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@ -2,6 +2,7 @@ import { Module } from '@nestjs/common';
|
||||
|
||||
import { NestjsQueryTypeOrmModule } from '@ptc-org/nestjs-query-typeorm';
|
||||
|
||||
import { MetricsModule } from 'src/engine/core-modules/metrics/metrics.module';
|
||||
import { RecordPositionModule } from 'src/engine/core-modules/record-position/record-position.module';
|
||||
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
|
||||
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
|
||||
@ -13,6 +14,7 @@ import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runne
|
||||
WorkflowCommonModule,
|
||||
NestjsQueryTypeOrmModule.forFeature([ObjectMetadataEntity], 'core'),
|
||||
RecordPositionModule,
|
||||
MetricsModule,
|
||||
],
|
||||
providers: [WorkflowRunWorkspaceService, ScopedWorkspaceContextFactory],
|
||||
exports: [WorkflowRunWorkspaceService],
|
||||
|
||||
@ -6,6 +6,8 @@ import { v4 } from 'uuid';
|
||||
|
||||
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
|
||||
import { objectRecordChangedValues } from 'src/engine/core-modules/event-emitter/utils/object-record-changed-values';
|
||||
import { MetricsService } from 'src/engine/core-modules/metrics/metrics.service';
|
||||
import { MetricsKeys } from 'src/engine/core-modules/metrics/types/metrics-keys.type';
|
||||
import { RecordPositionService } from 'src/engine/core-modules/record-position/services/record-position.service';
|
||||
import { ActorMetadata } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type';
|
||||
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
|
||||
@ -35,6 +37,7 @@ export class WorkflowRunWorkspaceService {
|
||||
@InjectRepository(ObjectMetadataEntity, 'core')
|
||||
private readonly objectMetadataRepository: Repository<ObjectMetadataEntity>,
|
||||
private readonly recordPositionService: RecordPositionService,
|
||||
private readonly metricsService: MetricsService,
|
||||
) {}
|
||||
|
||||
async createWorkflowRun({
|
||||
@ -215,6 +218,14 @@ export class WorkflowRunWorkspaceService {
|
||||
workflowRunBefore: workflowRunToUpdate,
|
||||
updatedFields: ['status', 'endedAt', 'output'],
|
||||
});
|
||||
|
||||
await this.metricsService.incrementCounter({
|
||||
key:
|
||||
status === WorkflowRunStatus.COMPLETED
|
||||
? MetricsKeys.WorkflowRunCompleted
|
||||
: MetricsKeys.WorkflowRunFailed,
|
||||
eventId: workflowRunId,
|
||||
});
|
||||
}
|
||||
|
||||
async saveWorkflowRunState({
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
import { Module } from '@nestjs/common';
|
||||
|
||||
import { BillingModule } from 'src/engine/core-modules/billing/billing.module';
|
||||
import { MetricsModule } from 'src/engine/core-modules/metrics/metrics.module';
|
||||
import { ThrottlerModule } from 'src/engine/core-modules/throttler/throttler.module';
|
||||
import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module';
|
||||
import { WorkflowExecutorModule } from 'src/modules/workflow/workflow-executor/workflow-executor.module';
|
||||
@ -15,6 +16,7 @@ import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-ru
|
||||
ThrottlerModule,
|
||||
BillingModule,
|
||||
WorkflowRunModule,
|
||||
MetricsModule,
|
||||
],
|
||||
providers: [WorkflowRunnerWorkspaceService, RunWorkflowJob],
|
||||
exports: [WorkflowRunnerWorkspaceService],
|
||||
|
||||
Reference in New Issue
Block a user