Add throttling on workflow execution (#9263)

We want to avoid infinite loops using workflows. Adding a throttler with
a limit of 10 executions / sec by default for each workflow.

We were not emitting events on workflow actions so loops could not
happen. Since throttler is there we can now and these.

Adding an error message so the user knows when it happens.
<img width="1284" alt="Capture d’écran 2024-12-27 à 17 05 20"
src="https://github.com/user-attachments/assets/dafa837b-5b4c-48be-8207-c90f5c71a236"
/>
This commit is contained in:
Thomas Trompette
2024-12-30 10:52:33 +01:00
committed by GitHub
parent ba2f55a627
commit c52a4924b9
10 changed files with 215 additions and 25 deletions

View File

@ -167,6 +167,7 @@ type StepRunOutput = {
export type WorkflowRunOutput = {
steps: Record<string, StepRunOutput>;
error?: string;
};
export type WorkflowRun = {

View File

@ -485,6 +485,13 @@ export class EnvironmentVariables {
@CastToPositiveNumber()
SERVERLESS_FUNCTION_EXEC_THROTTLE_TTL = 1000;
@CastToPositiveNumber()
WORKFLOW_EXEC_THROTTLE_LIMIT = 10;
// milliseconds
@CastToPositiveNumber()
WORKFLOW_EXEC_THROTTLE_TTL = 1000;
// SSL
@IsString()
@IsOptional()

View File

@ -46,6 +46,7 @@ type StepRunOutput = {
export type WorkflowRunOutput = {
steps: Record<string, StepRunOutput>;
error?: string;
};
@WorkspaceEntity({

View File

@ -1,14 +1,31 @@
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter';
import {
RecordCRUDActionException,
RecordCRUDActionExceptionCode,
} from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/exceptions/record-crud-action.exception';
import { WorkflowCreateRecordActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/types/workflow-record-crud-action-input.type';
import { WorkflowActionResult } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-result.type';
@Injectable()
export class CreateRecordWorkflowAction implements WorkflowAction {
constructor(private readonly twentyORMManager: TwentyORMManager) {}
constructor(
private readonly twentyORMManager: TwentyORMManager,
@InjectRepository(ObjectMetadataEntity, 'metadata')
private readonly objectMetadataRepository: Repository<ObjectMetadataEntity>,
private readonly workspaceEventEmitter: WorkspaceEventEmitter,
private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory,
) {}
async execute(
workflowActionInput: WorkflowCreateRecordActionInput,
@ -17,10 +34,47 @@ export class CreateRecordWorkflowAction implements WorkflowAction {
workflowActionInput.objectName,
);
const workspaceId = this.scopedWorkspaceContextFactory.create().workspaceId;
if (!workspaceId) {
throw new RecordCRUDActionException(
'Failed to create: Workspace ID is required',
RecordCRUDActionExceptionCode.INVALID_REQUEST,
);
}
const objectMetadata = await this.objectMetadataRepository.findOne({
where: {
nameSingular: workflowActionInput.objectName,
},
});
if (!objectMetadata) {
throw new RecordCRUDActionException(
'Failed to create: Object metadata not found',
RecordCRUDActionExceptionCode.INVALID_REQUEST,
);
}
const objectRecord = await repository.save(
workflowActionInput.objectRecord,
);
this.workspaceEventEmitter.emitDatabaseBatchEvent({
objectMetadataNameSingular: workflowActionInput.objectName,
action: DatabaseEventAction.CREATED,
events: [
{
recordId: objectRecord.id,
objectMetadata,
properties: {
after: objectRecord,
},
},
],
workspaceId,
});
return {
result: objectRecord,
};

View File

@ -1,8 +1,15 @@
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter';
import {
RecordCRUDActionException,
RecordCRUDActionExceptionCode,
@ -12,7 +19,13 @@ import { WorkflowActionResult } from 'src/modules/workflow/workflow-executor/wor
@Injectable()
export class DeleteRecordWorkflowAction implements WorkflowAction {
constructor(private readonly twentyORMManager: TwentyORMManager) {}
constructor(
private readonly twentyORMManager: TwentyORMManager,
@InjectRepository(ObjectMetadataEntity, 'metadata')
private readonly objectMetadataRepository: Repository<ObjectMetadataEntity>,
private readonly workspaceEventEmitter: WorkspaceEventEmitter,
private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory,
) {}
async execute(
workflowActionInput: WorkflowDeleteRecordActionInput,
@ -21,6 +34,28 @@ export class DeleteRecordWorkflowAction implements WorkflowAction {
workflowActionInput.objectName,
);
const workspaceId = this.scopedWorkspaceContextFactory.create().workspaceId;
if (!workspaceId) {
throw new RecordCRUDActionException(
'Failed to delete: Workspace ID is required',
RecordCRUDActionExceptionCode.INVALID_REQUEST,
);
}
const objectMetadata = await this.objectMetadataRepository.findOne({
where: {
nameSingular: workflowActionInput.objectName,
},
});
if (!objectMetadata) {
throw new RecordCRUDActionException(
'Failed to delete: Object metadata not found',
RecordCRUDActionExceptionCode.INVALID_REQUEST,
);
}
const objectRecord = await repository.findOne({
where: {
id: workflowActionInput.objectRecordId,
@ -34,8 +69,21 @@ export class DeleteRecordWorkflowAction implements WorkflowAction {
);
}
await repository.update(workflowActionInput.objectRecordId, {
deletedAt: new Date(),
await repository.softDelete(workflowActionInput.objectRecordId);
this.workspaceEventEmitter.emitDatabaseBatchEvent({
objectMetadataNameSingular: workflowActionInput.objectName,
action: DatabaseEventAction.DELETED,
events: [
{
recordId: objectRecord.id,
objectMetadata,
properties: {
before: objectRecord,
},
},
],
workspaceId,
});
return {

View File

@ -1,5 +1,8 @@
import { Module } from '@nestjs/common';
import { NestjsQueryTypeOrmModule } from '@ptc-org/nestjs-query-typeorm';
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
import { WorkspaceCacheStorageModule } from 'src/engine/workspace-cache-storage/workspace-cache-storage.module';
import { CreateRecordWorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/create-record.workflow-action';
@ -8,7 +11,10 @@ import { FindRecordsWorflowAction } from 'src/modules/workflow/workflow-executor
import { UpdateRecordWorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/update-record.workflow-action';
@Module({
imports: [WorkspaceCacheStorageModule],
imports: [
WorkspaceCacheStorageModule,
NestjsQueryTypeOrmModule.forFeature([ObjectMetadataEntity], 'metadata'),
],
providers: [
ScopedWorkspaceContextFactory,
CreateRecordWorkflowAction,

View File

@ -1,12 +1,18 @@
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
import { getObjectMetadataMapItemByNameSingular } from 'src/engine/metadata-modules/utils/get-object-metadata-map-item-by-name-singular.util';
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { formatData } from 'src/engine/twenty-orm/utils/format-data.util';
import { WorkspaceCacheStorageService } from 'src/engine/workspace-cache-storage/workspace-cache-storage.service';
import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter';
import {
RecordCRUDActionException,
RecordCRUDActionExceptionCode,
@ -20,6 +26,9 @@ export class UpdateRecordWorkflowAction implements WorkflowAction {
private readonly twentyORMManager: TwentyORMManager,
private readonly workspaceCacheStorageService: WorkspaceCacheStorageService,
private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory,
@InjectRepository(ObjectMetadataEntity, 'metadata')
private readonly objectMetadataRepository: Repository<ObjectMetadataEntity>,
private readonly workspaceEventEmitter: WorkspaceEventEmitter,
) {}
async execute(
@ -29,28 +38,41 @@ export class UpdateRecordWorkflowAction implements WorkflowAction {
workflowActionInput.objectName,
);
const objectRecord = await repository.findOne({
const workspaceId = this.scopedWorkspaceContextFactory.create().workspaceId;
if (!workspaceId) {
throw new RecordCRUDActionException(
'Failed to update: Workspace ID is required',
RecordCRUDActionExceptionCode.INVALID_REQUEST,
);
}
const objectMetadata = await this.objectMetadataRepository.findOne({
where: {
nameSingular: workflowActionInput.objectName,
},
});
if (!objectMetadata) {
throw new RecordCRUDActionException(
'Failed to update: Object metadata not found',
RecordCRUDActionExceptionCode.INVALID_REQUEST,
);
}
const previousObjectRecord = await repository.findOne({
where: {
id: workflowActionInput.objectRecordId,
},
});
if (!objectRecord) {
if (!previousObjectRecord) {
throw new RecordCRUDActionException(
`Failed to update: Record ${workflowActionInput.objectName} with id ${workflowActionInput.objectRecordId} not found`,
RecordCRUDActionExceptionCode.RECORD_NOT_FOUND,
);
}
const workspaceId = this.scopedWorkspaceContextFactory.create().workspaceId;
if (!workspaceId) {
throw new RecordCRUDActionException(
'Failed to read: Workspace ID is required',
RecordCRUDActionExceptionCode.INVALID_REQUEST,
);
}
const currentCacheVersion =
await this.workspaceCacheStorageService.getMetadataVersion(workspaceId);
@ -89,9 +111,7 @@ export class UpdateRecordWorkflowAction implements WorkflowAction {
if (workflowActionInput.fieldsToUpdate.length === 0) {
return {
result: {
...objectRecord,
},
result: previousObjectRecord,
};
}
@ -117,11 +137,29 @@ export class UpdateRecordWorkflowAction implements WorkflowAction {
...objectRecordFormatted,
});
const updatedObjectRecord = {
...previousObjectRecord,
...objectRecordWithFilteredFields,
};
this.workspaceEventEmitter.emitDatabaseBatchEvent({
objectMetadataNameSingular: workflowActionInput.objectName,
action: DatabaseEventAction.UPDATED,
events: [
{
recordId: previousObjectRecord.id,
objectMetadata,
properties: {
before: previousObjectRecord,
after: updatedObjectRecord,
},
},
],
workspaceId,
});
return {
result: {
...objectRecord,
...objectRecordWithFilteredFields,
},
result: updatedObjectRecord,
};
}
}

View File

@ -11,4 +11,5 @@ export enum WorkflowRunExceptionCode {
WORKFLOW_RUN_NOT_FOUND = 'WORKFLOW_RUN_NOT_FOUND',
INVALID_OPERATION = 'INVALID_OPERATION',
INVALID_INPUT = 'INVALID_INPUT',
WORKFLOW_RUN_LIMIT_REACHED = 'WORKFLOW_RUN_LIMIT_REACHED',
}

View File

@ -1,11 +1,17 @@
import { Scope } from '@nestjs/common';
import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service';
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { ThrottlerService } from 'src/engine/core-modules/throttler/throttler.service';
import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workspace-services/workflow-common.workspace-service';
import { WorkflowExecutorWorkspaceService } from 'src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service';
import {
WorkflowRunException,
WorkflowRunExceptionCode,
} from 'src/modules/workflow/workflow-runner/exceptions/workflow-run.exception';
import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-run.workspace-service';
export type RunWorkflowJobData = {
@ -21,6 +27,8 @@ export class RunWorkflowJob {
private readonly workflowCommonWorkspaceService: WorkflowCommonWorkspaceService,
private readonly workflowExecutorWorkspaceService: WorkflowExecutorWorkspaceService,
private readonly workflowRunWorkspaceService: WorkflowRunWorkspaceService,
private readonly throttlerService: ThrottlerService,
private readonly environmentService: EnvironmentService,
) {}
@Process(RunWorkflowJob.name)
@ -36,6 +44,8 @@ export class RunWorkflowJob {
workflowVersionId,
);
await this.throttleExecution(workflowVersion.workflowId, workflowRunId);
const { steps, status } =
await this.workflowExecutorWorkspaceService.execute({
currentStepIndex: 0,
@ -57,4 +67,27 @@ export class RunWorkflowJob {
},
);
}
private async throttleExecution(workflowId: string, workflowRunId: string) {
try {
await this.throttlerService.throttle(
`${workflowId}-workflow-execution`,
this.environmentService.get('WORKFLOW_EXEC_THROTTLE_LIMIT'),
this.environmentService.get('WORKFLOW_EXEC_THROTTLE_TTL'),
);
} catch (error) {
await this.workflowRunWorkspaceService.endWorkflowRun(
workflowRunId,
WorkflowRunStatus.FAILED,
{
steps: {},
error: 'Workflow execution rate limit exceeded',
},
);
throw new WorkflowRunException(
'Workflow execution rate limit exceeded',
WorkflowRunExceptionCode.WORKFLOW_RUN_LIMIT_REACHED,
);
}
}
}

View File

@ -1,13 +1,14 @@
import { Module } from '@nestjs/common';
import { ThrottlerModule } from 'src/engine/core-modules/throttler/throttler.module';
import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module';
import { WorkflowExecutorModule } from 'src/modules/workflow/workflow-executor/workflow-executor.module';
import { RunWorkflowJob } from 'src/modules/workflow/workflow-runner/jobs/run-workflow.job';
import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service';
import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-run.workspace-service';
import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service';
@Module({
imports: [WorkflowCommonModule, WorkflowExecutorModule],
imports: [WorkflowCommonModule, WorkflowExecutorModule, ThrottlerModule],
providers: [
WorkflowRunnerWorkspaceService,
WorkflowRunWorkspaceService,