diff --git a/packages/twenty-front/src/modules/workflow/types/Workflow.ts b/packages/twenty-front/src/modules/workflow/types/Workflow.ts index 71af62edb..e7c2dfa15 100644 --- a/packages/twenty-front/src/modules/workflow/types/Workflow.ts +++ b/packages/twenty-front/src/modules/workflow/types/Workflow.ts @@ -167,6 +167,7 @@ type StepRunOutput = { export type WorkflowRunOutput = { steps: Record; + error?: string; }; export type WorkflowRun = { diff --git a/packages/twenty-server/src/engine/core-modules/environment/environment-variables.ts b/packages/twenty-server/src/engine/core-modules/environment/environment-variables.ts index 687c94c00..96b46927d 100644 --- a/packages/twenty-server/src/engine/core-modules/environment/environment-variables.ts +++ b/packages/twenty-server/src/engine/core-modules/environment/environment-variables.ts @@ -485,6 +485,13 @@ export class EnvironmentVariables { @CastToPositiveNumber() SERVERLESS_FUNCTION_EXEC_THROTTLE_TTL = 1000; + @CastToPositiveNumber() + WORKFLOW_EXEC_THROTTLE_LIMIT = 10; + + // milliseconds + @CastToPositiveNumber() + WORKFLOW_EXEC_THROTTLE_TTL = 1000; + // SSL @IsString() @IsOptional() diff --git a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts index 537a85364..f14716ee9 100644 --- a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts +++ b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts @@ -46,6 +46,7 @@ type StepRunOutput = { export type WorkflowRunOutput = { steps: Record; + error?: string; }; @WorkspaceEntity({ diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/create-record.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/create-record.workflow-action.ts index 1945fd70b..e3eb3e4c9 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/create-record.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/create-record.workflow-action.ts @@ -1,14 +1,31 @@ import { Injectable } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; + +import { Repository } from 'typeorm'; import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; +import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action'; +import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity'; +import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter'; +import { + RecordCRUDActionException, + RecordCRUDActionExceptionCode, +} from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/exceptions/record-crud-action.exception'; import { WorkflowCreateRecordActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/types/workflow-record-crud-action-input.type'; import { WorkflowActionResult } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-result.type'; @Injectable() export class CreateRecordWorkflowAction implements WorkflowAction { - constructor(private readonly twentyORMManager: TwentyORMManager) {} + constructor( + private readonly twentyORMManager: TwentyORMManager, + @InjectRepository(ObjectMetadataEntity, 'metadata') + private readonly objectMetadataRepository: Repository, + private readonly workspaceEventEmitter: WorkspaceEventEmitter, + private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory, + ) {} async execute( workflowActionInput: WorkflowCreateRecordActionInput, @@ -17,10 +34,47 @@ export class CreateRecordWorkflowAction implements WorkflowAction { workflowActionInput.objectName, ); + const workspaceId = this.scopedWorkspaceContextFactory.create().workspaceId; + + if (!workspaceId) { + throw new RecordCRUDActionException( + 'Failed to create: Workspace ID is required', + RecordCRUDActionExceptionCode.INVALID_REQUEST, + ); + } + + const objectMetadata = await this.objectMetadataRepository.findOne({ + where: { + nameSingular: workflowActionInput.objectName, + }, + }); + + if (!objectMetadata) { + throw new RecordCRUDActionException( + 'Failed to create: Object metadata not found', + RecordCRUDActionExceptionCode.INVALID_REQUEST, + ); + } + const objectRecord = await repository.save( workflowActionInput.objectRecord, ); + this.workspaceEventEmitter.emitDatabaseBatchEvent({ + objectMetadataNameSingular: workflowActionInput.objectName, + action: DatabaseEventAction.CREATED, + events: [ + { + recordId: objectRecord.id, + objectMetadata, + properties: { + after: objectRecord, + }, + }, + ], + workspaceId, + }); + return { result: objectRecord, }; diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/delete-record.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/delete-record.workflow-action.ts index ecfa32955..29fa9c3fd 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/delete-record.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/delete-record.workflow-action.ts @@ -1,8 +1,15 @@ import { Injectable } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; + +import { Repository } from 'typeorm'; import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; +import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action'; +import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity'; +import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter'; import { RecordCRUDActionException, RecordCRUDActionExceptionCode, @@ -12,7 +19,13 @@ import { WorkflowActionResult } from 'src/modules/workflow/workflow-executor/wor @Injectable() export class DeleteRecordWorkflowAction implements WorkflowAction { - constructor(private readonly twentyORMManager: TwentyORMManager) {} + constructor( + private readonly twentyORMManager: TwentyORMManager, + @InjectRepository(ObjectMetadataEntity, 'metadata') + private readonly objectMetadataRepository: Repository, + private readonly workspaceEventEmitter: WorkspaceEventEmitter, + private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory, + ) {} async execute( workflowActionInput: WorkflowDeleteRecordActionInput, @@ -21,6 +34,28 @@ export class DeleteRecordWorkflowAction implements WorkflowAction { workflowActionInput.objectName, ); + const workspaceId = this.scopedWorkspaceContextFactory.create().workspaceId; + + if (!workspaceId) { + throw new RecordCRUDActionException( + 'Failed to delete: Workspace ID is required', + RecordCRUDActionExceptionCode.INVALID_REQUEST, + ); + } + + const objectMetadata = await this.objectMetadataRepository.findOne({ + where: { + nameSingular: workflowActionInput.objectName, + }, + }); + + if (!objectMetadata) { + throw new RecordCRUDActionException( + 'Failed to delete: Object metadata not found', + RecordCRUDActionExceptionCode.INVALID_REQUEST, + ); + } + const objectRecord = await repository.findOne({ where: { id: workflowActionInput.objectRecordId, @@ -34,8 +69,21 @@ export class DeleteRecordWorkflowAction implements WorkflowAction { ); } - await repository.update(workflowActionInput.objectRecordId, { - deletedAt: new Date(), + await repository.softDelete(workflowActionInput.objectRecordId); + + this.workspaceEventEmitter.emitDatabaseBatchEvent({ + objectMetadataNameSingular: workflowActionInput.objectName, + action: DatabaseEventAction.DELETED, + events: [ + { + recordId: objectRecord.id, + objectMetadata, + properties: { + before: objectRecord, + }, + }, + ], + workspaceId, }); return { diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/record-crud-action.module.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/record-crud-action.module.ts index fb3e6d78c..6702859d0 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/record-crud-action.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/record-crud-action.module.ts @@ -1,5 +1,8 @@ import { Module } from '@nestjs/common'; +import { NestjsQueryTypeOrmModule } from '@ptc-org/nestjs-query-typeorm'; + +import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity'; import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; import { WorkspaceCacheStorageModule } from 'src/engine/workspace-cache-storage/workspace-cache-storage.module'; import { CreateRecordWorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/create-record.workflow-action'; @@ -8,7 +11,10 @@ import { FindRecordsWorflowAction } from 'src/modules/workflow/workflow-executor import { UpdateRecordWorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/update-record.workflow-action'; @Module({ - imports: [WorkspaceCacheStorageModule], + imports: [ + WorkspaceCacheStorageModule, + NestjsQueryTypeOrmModule.forFeature([ObjectMetadataEntity], 'metadata'), + ], providers: [ ScopedWorkspaceContextFactory, CreateRecordWorkflowAction, diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/update-record.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/update-record.workflow-action.ts index e6f47c430..35798f651 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/update-record.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/update-record.workflow-action.ts @@ -1,12 +1,18 @@ import { Injectable } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; + +import { Repository } from 'typeorm'; import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; +import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action'; +import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity'; import { getObjectMetadataMapItemByNameSingular } from 'src/engine/metadata-modules/utils/get-object-metadata-map-item-by-name-singular.util'; import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { formatData } from 'src/engine/twenty-orm/utils/format-data.util'; import { WorkspaceCacheStorageService } from 'src/engine/workspace-cache-storage/workspace-cache-storage.service'; +import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter'; import { RecordCRUDActionException, RecordCRUDActionExceptionCode, @@ -20,6 +26,9 @@ export class UpdateRecordWorkflowAction implements WorkflowAction { private readonly twentyORMManager: TwentyORMManager, private readonly workspaceCacheStorageService: WorkspaceCacheStorageService, private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory, + @InjectRepository(ObjectMetadataEntity, 'metadata') + private readonly objectMetadataRepository: Repository, + private readonly workspaceEventEmitter: WorkspaceEventEmitter, ) {} async execute( @@ -29,28 +38,41 @@ export class UpdateRecordWorkflowAction implements WorkflowAction { workflowActionInput.objectName, ); - const objectRecord = await repository.findOne({ + const workspaceId = this.scopedWorkspaceContextFactory.create().workspaceId; + + if (!workspaceId) { + throw new RecordCRUDActionException( + 'Failed to update: Workspace ID is required', + RecordCRUDActionExceptionCode.INVALID_REQUEST, + ); + } + + const objectMetadata = await this.objectMetadataRepository.findOne({ + where: { + nameSingular: workflowActionInput.objectName, + }, + }); + + if (!objectMetadata) { + throw new RecordCRUDActionException( + 'Failed to update: Object metadata not found', + RecordCRUDActionExceptionCode.INVALID_REQUEST, + ); + } + + const previousObjectRecord = await repository.findOne({ where: { id: workflowActionInput.objectRecordId, }, }); - if (!objectRecord) { + if (!previousObjectRecord) { throw new RecordCRUDActionException( `Failed to update: Record ${workflowActionInput.objectName} with id ${workflowActionInput.objectRecordId} not found`, RecordCRUDActionExceptionCode.RECORD_NOT_FOUND, ); } - const workspaceId = this.scopedWorkspaceContextFactory.create().workspaceId; - - if (!workspaceId) { - throw new RecordCRUDActionException( - 'Failed to read: Workspace ID is required', - RecordCRUDActionExceptionCode.INVALID_REQUEST, - ); - } - const currentCacheVersion = await this.workspaceCacheStorageService.getMetadataVersion(workspaceId); @@ -89,9 +111,7 @@ export class UpdateRecordWorkflowAction implements WorkflowAction { if (workflowActionInput.fieldsToUpdate.length === 0) { return { - result: { - ...objectRecord, - }, + result: previousObjectRecord, }; } @@ -117,11 +137,29 @@ export class UpdateRecordWorkflowAction implements WorkflowAction { ...objectRecordFormatted, }); + const updatedObjectRecord = { + ...previousObjectRecord, + ...objectRecordWithFilteredFields, + }; + + this.workspaceEventEmitter.emitDatabaseBatchEvent({ + objectMetadataNameSingular: workflowActionInput.objectName, + action: DatabaseEventAction.UPDATED, + events: [ + { + recordId: previousObjectRecord.id, + objectMetadata, + properties: { + before: previousObjectRecord, + after: updatedObjectRecord, + }, + }, + ], + workspaceId, + }); + return { - result: { - ...objectRecord, - ...objectRecordWithFilteredFields, - }, + result: updatedObjectRecord, }; } } diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/exceptions/workflow-run.exception.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/exceptions/workflow-run.exception.ts index 02f50928d..9ed0bfe59 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/exceptions/workflow-run.exception.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/exceptions/workflow-run.exception.ts @@ -11,4 +11,5 @@ export enum WorkflowRunExceptionCode { WORKFLOW_RUN_NOT_FOUND = 'WORKFLOW_RUN_NOT_FOUND', INVALID_OPERATION = 'INVALID_OPERATION', INVALID_INPUT = 'INVALID_INPUT', + WORKFLOW_RUN_LIMIT_REACHED = 'WORKFLOW_RUN_LIMIT_REACHED', } diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts index 644595df6..f6069bf3f 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts @@ -1,11 +1,17 @@ import { Scope } from '@nestjs/common'; +import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service'; import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator'; import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; +import { ThrottlerService } from 'src/engine/core-modules/throttler/throttler.service'; import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workspace-services/workflow-common.workspace-service'; import { WorkflowExecutorWorkspaceService } from 'src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service'; +import { + WorkflowRunException, + WorkflowRunExceptionCode, +} from 'src/modules/workflow/workflow-runner/exceptions/workflow-run.exception'; import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-run.workspace-service'; export type RunWorkflowJobData = { @@ -21,6 +27,8 @@ export class RunWorkflowJob { private readonly workflowCommonWorkspaceService: WorkflowCommonWorkspaceService, private readonly workflowExecutorWorkspaceService: WorkflowExecutorWorkspaceService, private readonly workflowRunWorkspaceService: WorkflowRunWorkspaceService, + private readonly throttlerService: ThrottlerService, + private readonly environmentService: EnvironmentService, ) {} @Process(RunWorkflowJob.name) @@ -36,6 +44,8 @@ export class RunWorkflowJob { workflowVersionId, ); + await this.throttleExecution(workflowVersion.workflowId, workflowRunId); + const { steps, status } = await this.workflowExecutorWorkspaceService.execute({ currentStepIndex: 0, @@ -57,4 +67,27 @@ export class RunWorkflowJob { }, ); } + + private async throttleExecution(workflowId: string, workflowRunId: string) { + try { + await this.throttlerService.throttle( + `${workflowId}-workflow-execution`, + this.environmentService.get('WORKFLOW_EXEC_THROTTLE_LIMIT'), + this.environmentService.get('WORKFLOW_EXEC_THROTTLE_TTL'), + ); + } catch (error) { + await this.workflowRunWorkspaceService.endWorkflowRun( + workflowRunId, + WorkflowRunStatus.FAILED, + { + steps: {}, + error: 'Workflow execution rate limit exceeded', + }, + ); + throw new WorkflowRunException( + 'Workflow execution rate limit exceeded', + WorkflowRunExceptionCode.WORKFLOW_RUN_LIMIT_REACHED, + ); + } + } } diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.module.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.module.ts index 72871ae83..19eda68e5 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.module.ts @@ -1,13 +1,14 @@ import { Module } from '@nestjs/common'; +import { ThrottlerModule } from 'src/engine/core-modules/throttler/throttler.module'; import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module'; import { WorkflowExecutorModule } from 'src/modules/workflow/workflow-executor/workflow-executor.module'; import { RunWorkflowJob } from 'src/modules/workflow/workflow-runner/jobs/run-workflow.job'; -import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service'; import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-run.workspace-service'; +import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service'; @Module({ - imports: [WorkflowCommonModule, WorkflowExecutorModule], + imports: [WorkflowCommonModule, WorkflowExecutorModule, ThrottlerModule], providers: [ WorkflowRunnerWorkspaceService, WorkflowRunWorkspaceService,