From c52a4924b9a1b6826478dc4e698e5cd4b5de7eaa Mon Sep 17 00:00:00 2001 From: Thomas Trompette Date: Mon, 30 Dec 2024 10:52:33 +0100 Subject: [PATCH] Add throttling on workflow execution (#9263) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We want to avoid infinite loops using workflows. Adding a throttler with a limit of 10 executions / sec by default for each workflow. We were not emitting events on workflow actions so loops could not happen. Since throttler is there we can now and these. Adding an error message so the user knows when it happens. Capture d’écran 2024-12-27 à 17 05 20 --- .../src/modules/workflow/types/Workflow.ts | 1 + .../environment/environment-variables.ts | 7 ++ .../workflow-run.workspace-entity.ts | 1 + .../create-record.workflow-action.ts | 56 +++++++++++++- .../delete-record.workflow-action.ts | 54 +++++++++++++- .../record-crud/record-crud-action.module.ts | 8 +- .../update-record.workflow-action.ts | 74 ++++++++++++++----- .../exceptions/workflow-run.exception.ts | 1 + .../workflow-runner/jobs/run-workflow.job.ts | 33 +++++++++ .../workflow-runner/workflow-runner.module.ts | 5 +- 10 files changed, 215 insertions(+), 25 deletions(-) diff --git a/packages/twenty-front/src/modules/workflow/types/Workflow.ts b/packages/twenty-front/src/modules/workflow/types/Workflow.ts index 71af62edb..e7c2dfa15 100644 --- a/packages/twenty-front/src/modules/workflow/types/Workflow.ts +++ b/packages/twenty-front/src/modules/workflow/types/Workflow.ts @@ -167,6 +167,7 @@ type StepRunOutput = { export type WorkflowRunOutput = { steps: Record; + error?: string; }; export type WorkflowRun = { diff --git a/packages/twenty-server/src/engine/core-modules/environment/environment-variables.ts b/packages/twenty-server/src/engine/core-modules/environment/environment-variables.ts index 687c94c00..96b46927d 100644 --- a/packages/twenty-server/src/engine/core-modules/environment/environment-variables.ts +++ b/packages/twenty-server/src/engine/core-modules/environment/environment-variables.ts @@ -485,6 +485,13 @@ export class EnvironmentVariables { @CastToPositiveNumber() SERVERLESS_FUNCTION_EXEC_THROTTLE_TTL = 1000; + @CastToPositiveNumber() + WORKFLOW_EXEC_THROTTLE_LIMIT = 10; + + // milliseconds + @CastToPositiveNumber() + WORKFLOW_EXEC_THROTTLE_TTL = 1000; + // SSL @IsString() @IsOptional() diff --git a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts index 537a85364..f14716ee9 100644 --- a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts +++ b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts @@ -46,6 +46,7 @@ type StepRunOutput = { export type WorkflowRunOutput = { steps: Record; + error?: string; }; @WorkspaceEntity({ diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/create-record.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/create-record.workflow-action.ts index 1945fd70b..e3eb3e4c9 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/create-record.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/create-record.workflow-action.ts @@ -1,14 +1,31 @@ import { Injectable } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; + +import { Repository } from 'typeorm'; import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; +import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action'; +import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity'; +import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter'; +import { + RecordCRUDActionException, + RecordCRUDActionExceptionCode, +} from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/exceptions/record-crud-action.exception'; import { WorkflowCreateRecordActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/types/workflow-record-crud-action-input.type'; import { WorkflowActionResult } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-result.type'; @Injectable() export class CreateRecordWorkflowAction implements WorkflowAction { - constructor(private readonly twentyORMManager: TwentyORMManager) {} + constructor( + private readonly twentyORMManager: TwentyORMManager, + @InjectRepository(ObjectMetadataEntity, 'metadata') + private readonly objectMetadataRepository: Repository, + private readonly workspaceEventEmitter: WorkspaceEventEmitter, + private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory, + ) {} async execute( workflowActionInput: WorkflowCreateRecordActionInput, @@ -17,10 +34,47 @@ export class CreateRecordWorkflowAction implements WorkflowAction { workflowActionInput.objectName, ); + const workspaceId = this.scopedWorkspaceContextFactory.create().workspaceId; + + if (!workspaceId) { + throw new RecordCRUDActionException( + 'Failed to create: Workspace ID is required', + RecordCRUDActionExceptionCode.INVALID_REQUEST, + ); + } + + const objectMetadata = await this.objectMetadataRepository.findOne({ + where: { + nameSingular: workflowActionInput.objectName, + }, + }); + + if (!objectMetadata) { + throw new RecordCRUDActionException( + 'Failed to create: Object metadata not found', + RecordCRUDActionExceptionCode.INVALID_REQUEST, + ); + } + const objectRecord = await repository.save( workflowActionInput.objectRecord, ); + this.workspaceEventEmitter.emitDatabaseBatchEvent({ + objectMetadataNameSingular: workflowActionInput.objectName, + action: DatabaseEventAction.CREATED, + events: [ + { + recordId: objectRecord.id, + objectMetadata, + properties: { + after: objectRecord, + }, + }, + ], + workspaceId, + }); + return { result: objectRecord, }; diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/delete-record.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/delete-record.workflow-action.ts index ecfa32955..29fa9c3fd 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/delete-record.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/delete-record.workflow-action.ts @@ -1,8 +1,15 @@ import { Injectable } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; + +import { Repository } from 'typeorm'; import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; +import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action'; +import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity'; +import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter'; import { RecordCRUDActionException, RecordCRUDActionExceptionCode, @@ -12,7 +19,13 @@ import { WorkflowActionResult } from 'src/modules/workflow/workflow-executor/wor @Injectable() export class DeleteRecordWorkflowAction implements WorkflowAction { - constructor(private readonly twentyORMManager: TwentyORMManager) {} + constructor( + private readonly twentyORMManager: TwentyORMManager, + @InjectRepository(ObjectMetadataEntity, 'metadata') + private readonly objectMetadataRepository: Repository, + private readonly workspaceEventEmitter: WorkspaceEventEmitter, + private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory, + ) {} async execute( workflowActionInput: WorkflowDeleteRecordActionInput, @@ -21,6 +34,28 @@ export class DeleteRecordWorkflowAction implements WorkflowAction { workflowActionInput.objectName, ); + const workspaceId = this.scopedWorkspaceContextFactory.create().workspaceId; + + if (!workspaceId) { + throw new RecordCRUDActionException( + 'Failed to delete: Workspace ID is required', + RecordCRUDActionExceptionCode.INVALID_REQUEST, + ); + } + + const objectMetadata = await this.objectMetadataRepository.findOne({ + where: { + nameSingular: workflowActionInput.objectName, + }, + }); + + if (!objectMetadata) { + throw new RecordCRUDActionException( + 'Failed to delete: Object metadata not found', + RecordCRUDActionExceptionCode.INVALID_REQUEST, + ); + } + const objectRecord = await repository.findOne({ where: { id: workflowActionInput.objectRecordId, @@ -34,8 +69,21 @@ export class DeleteRecordWorkflowAction implements WorkflowAction { ); } - await repository.update(workflowActionInput.objectRecordId, { - deletedAt: new Date(), + await repository.softDelete(workflowActionInput.objectRecordId); + + this.workspaceEventEmitter.emitDatabaseBatchEvent({ + objectMetadataNameSingular: workflowActionInput.objectName, + action: DatabaseEventAction.DELETED, + events: [ + { + recordId: objectRecord.id, + objectMetadata, + properties: { + before: objectRecord, + }, + }, + ], + workspaceId, }); return { diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/record-crud-action.module.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/record-crud-action.module.ts index fb3e6d78c..6702859d0 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/record-crud-action.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/record-crud-action.module.ts @@ -1,5 +1,8 @@ import { Module } from '@nestjs/common'; +import { NestjsQueryTypeOrmModule } from '@ptc-org/nestjs-query-typeorm'; + +import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity'; import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; import { WorkspaceCacheStorageModule } from 'src/engine/workspace-cache-storage/workspace-cache-storage.module'; import { CreateRecordWorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/create-record.workflow-action'; @@ -8,7 +11,10 @@ import { FindRecordsWorflowAction } from 'src/modules/workflow/workflow-executor import { UpdateRecordWorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/update-record.workflow-action'; @Module({ - imports: [WorkspaceCacheStorageModule], + imports: [ + WorkspaceCacheStorageModule, + NestjsQueryTypeOrmModule.forFeature([ObjectMetadataEntity], 'metadata'), + ], providers: [ ScopedWorkspaceContextFactory, CreateRecordWorkflowAction, diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/update-record.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/update-record.workflow-action.ts index e6f47c430..35798f651 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/update-record.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/update-record.workflow-action.ts @@ -1,12 +1,18 @@ import { Injectable } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; + +import { Repository } from 'typeorm'; import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; +import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action'; +import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity'; import { getObjectMetadataMapItemByNameSingular } from 'src/engine/metadata-modules/utils/get-object-metadata-map-item-by-name-singular.util'; import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { formatData } from 'src/engine/twenty-orm/utils/format-data.util'; import { WorkspaceCacheStorageService } from 'src/engine/workspace-cache-storage/workspace-cache-storage.service'; +import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter'; import { RecordCRUDActionException, RecordCRUDActionExceptionCode, @@ -20,6 +26,9 @@ export class UpdateRecordWorkflowAction implements WorkflowAction { private readonly twentyORMManager: TwentyORMManager, private readonly workspaceCacheStorageService: WorkspaceCacheStorageService, private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory, + @InjectRepository(ObjectMetadataEntity, 'metadata') + private readonly objectMetadataRepository: Repository, + private readonly workspaceEventEmitter: WorkspaceEventEmitter, ) {} async execute( @@ -29,28 +38,41 @@ export class UpdateRecordWorkflowAction implements WorkflowAction { workflowActionInput.objectName, ); - const objectRecord = await repository.findOne({ + const workspaceId = this.scopedWorkspaceContextFactory.create().workspaceId; + + if (!workspaceId) { + throw new RecordCRUDActionException( + 'Failed to update: Workspace ID is required', + RecordCRUDActionExceptionCode.INVALID_REQUEST, + ); + } + + const objectMetadata = await this.objectMetadataRepository.findOne({ + where: { + nameSingular: workflowActionInput.objectName, + }, + }); + + if (!objectMetadata) { + throw new RecordCRUDActionException( + 'Failed to update: Object metadata not found', + RecordCRUDActionExceptionCode.INVALID_REQUEST, + ); + } + + const previousObjectRecord = await repository.findOne({ where: { id: workflowActionInput.objectRecordId, }, }); - if (!objectRecord) { + if (!previousObjectRecord) { throw new RecordCRUDActionException( `Failed to update: Record ${workflowActionInput.objectName} with id ${workflowActionInput.objectRecordId} not found`, RecordCRUDActionExceptionCode.RECORD_NOT_FOUND, ); } - const workspaceId = this.scopedWorkspaceContextFactory.create().workspaceId; - - if (!workspaceId) { - throw new RecordCRUDActionException( - 'Failed to read: Workspace ID is required', - RecordCRUDActionExceptionCode.INVALID_REQUEST, - ); - } - const currentCacheVersion = await this.workspaceCacheStorageService.getMetadataVersion(workspaceId); @@ -89,9 +111,7 @@ export class UpdateRecordWorkflowAction implements WorkflowAction { if (workflowActionInput.fieldsToUpdate.length === 0) { return { - result: { - ...objectRecord, - }, + result: previousObjectRecord, }; } @@ -117,11 +137,29 @@ export class UpdateRecordWorkflowAction implements WorkflowAction { ...objectRecordFormatted, }); + const updatedObjectRecord = { + ...previousObjectRecord, + ...objectRecordWithFilteredFields, + }; + + this.workspaceEventEmitter.emitDatabaseBatchEvent({ + objectMetadataNameSingular: workflowActionInput.objectName, + action: DatabaseEventAction.UPDATED, + events: [ + { + recordId: previousObjectRecord.id, + objectMetadata, + properties: { + before: previousObjectRecord, + after: updatedObjectRecord, + }, + }, + ], + workspaceId, + }); + return { - result: { - ...objectRecord, - ...objectRecordWithFilteredFields, - }, + result: updatedObjectRecord, }; } } diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/exceptions/workflow-run.exception.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/exceptions/workflow-run.exception.ts index 02f50928d..9ed0bfe59 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/exceptions/workflow-run.exception.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/exceptions/workflow-run.exception.ts @@ -11,4 +11,5 @@ export enum WorkflowRunExceptionCode { WORKFLOW_RUN_NOT_FOUND = 'WORKFLOW_RUN_NOT_FOUND', INVALID_OPERATION = 'INVALID_OPERATION', INVALID_INPUT = 'INVALID_INPUT', + WORKFLOW_RUN_LIMIT_REACHED = 'WORKFLOW_RUN_LIMIT_REACHED', } diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts index 644595df6..f6069bf3f 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts @@ -1,11 +1,17 @@ import { Scope } from '@nestjs/common'; +import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service'; import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator'; import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; +import { ThrottlerService } from 'src/engine/core-modules/throttler/throttler.service'; import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workspace-services/workflow-common.workspace-service'; import { WorkflowExecutorWorkspaceService } from 'src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service'; +import { + WorkflowRunException, + WorkflowRunExceptionCode, +} from 'src/modules/workflow/workflow-runner/exceptions/workflow-run.exception'; import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-run.workspace-service'; export type RunWorkflowJobData = { @@ -21,6 +27,8 @@ export class RunWorkflowJob { private readonly workflowCommonWorkspaceService: WorkflowCommonWorkspaceService, private readonly workflowExecutorWorkspaceService: WorkflowExecutorWorkspaceService, private readonly workflowRunWorkspaceService: WorkflowRunWorkspaceService, + private readonly throttlerService: ThrottlerService, + private readonly environmentService: EnvironmentService, ) {} @Process(RunWorkflowJob.name) @@ -36,6 +44,8 @@ export class RunWorkflowJob { workflowVersionId, ); + await this.throttleExecution(workflowVersion.workflowId, workflowRunId); + const { steps, status } = await this.workflowExecutorWorkspaceService.execute({ currentStepIndex: 0, @@ -57,4 +67,27 @@ export class RunWorkflowJob { }, ); } + + private async throttleExecution(workflowId: string, workflowRunId: string) { + try { + await this.throttlerService.throttle( + `${workflowId}-workflow-execution`, + this.environmentService.get('WORKFLOW_EXEC_THROTTLE_LIMIT'), + this.environmentService.get('WORKFLOW_EXEC_THROTTLE_TTL'), + ); + } catch (error) { + await this.workflowRunWorkspaceService.endWorkflowRun( + workflowRunId, + WorkflowRunStatus.FAILED, + { + steps: {}, + error: 'Workflow execution rate limit exceeded', + }, + ); + throw new WorkflowRunException( + 'Workflow execution rate limit exceeded', + WorkflowRunExceptionCode.WORKFLOW_RUN_LIMIT_REACHED, + ); + } + } } diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.module.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.module.ts index 72871ae83..19eda68e5 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.module.ts @@ -1,13 +1,14 @@ import { Module } from '@nestjs/common'; +import { ThrottlerModule } from 'src/engine/core-modules/throttler/throttler.module'; import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module'; import { WorkflowExecutorModule } from 'src/modules/workflow/workflow-executor/workflow-executor.module'; import { RunWorkflowJob } from 'src/modules/workflow/workflow-runner/jobs/run-workflow.job'; -import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service'; import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-run.workspace-service'; +import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service'; @Module({ - imports: [WorkflowCommonModule, WorkflowExecutorModule], + imports: [WorkflowCommonModule, WorkflowExecutorModule, ThrottlerModule], providers: [ WorkflowRunnerWorkspaceService, WorkflowRunWorkspaceService,