From 446924cf247f3eecd545a1b8b8cfb9d3b04f536e Mon Sep 17 00:00:00 2001 From: Thomas Trompette Date: Mon, 24 Feb 2025 14:36:24 +0100 Subject: [PATCH] Migrate workflow actions to executors (#10432) Actions will now: - receive the complete input - get the step they want to execute by themself - check that the type is the right one - resolve variables These all share a common executor interface. It will allow for actions with a special execution process (forms, loop, router) to have all required informations. Main workflow executor should: - find the right executor to call for current step - store the output and context from step execution - call next step index --- .../workflow-run.workspace-entity.ts | 11 +- ...actory.ts => workflow-executor.factory.ts} | 12 +- .../interfaces/workflow-action.interface.ts | 5 - .../interfaces/workflow-executor.interface.ts | 8 ++ .../types/workflow-executor-input.ts | 9 ++ .../types/workflow-executor-output.type.ts | 4 + .../code/code.workflow-action.ts | 33 ++++-- .../guards/is-workflow-code-action.guard.ts | 11 ++ .../is-workflow-send-email-action.guard.ts | 11 ++ .../mail-sender/send-email.workflow-action.ts | 34 ++++-- .../create-record.workflow-action.ts | 41 +++++-- .../delete-record.workflow-action.ts | 35 +++++- .../find-records.workflow-action.ts | 36 +++++- .../is-workflow-create-record-action.guard.ts | 11 ++ .../is-workflow-delete-record-action.guard.ts | 11 ++ .../is-workflow-find-records-action.guard.ts | 11 ++ .../is-workflow-update-record-action.guard.ts | 11 ++ .../record-crud/record-crud-action.module.ts | 6 +- .../update-record.workflow-action.ts | 35 +++++- .../types/workflow-action-result.type.ts | 10 -- .../workflow-executor.module.ts | 4 +- .../workflow-executor.workspace-service.ts | 105 ++++++------------ .../workflow-runner/jobs/run-workflow.job.ts | 11 +- .../workflow-run.workspace-service.ts | 10 +- 24 files changed, 320 insertions(+), 155 deletions(-) rename packages/twenty-server/src/modules/workflow/workflow-executor/factories/{workflow-action.factory.ts => workflow-executor.factory.ts} (82%) delete mode 100644 packages/twenty-server/src/modules/workflow/workflow-executor/interfaces/workflow-action.interface.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-executor-input.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-executor-output.type.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/code/guards/is-workflow-code-action.guard.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/mail-sender/guards/is-workflow-send-email-action.guard.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/guards/is-workflow-create-record-action.guard.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/guards/is-workflow-delete-record-action.guard.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/guards/is-workflow-find-records-action.guard.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/guards/is-workflow-update-record-action.guard.ts delete mode 100644 packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-result.type.ts diff --git a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts index 2b3242d02..f26455819 100644 --- a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts +++ b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts @@ -22,6 +22,7 @@ import { FavoriteWorkspaceEntity } from 'src/modules/favorite/standard-objects/f import { TimelineActivityWorkspaceEntity } from 'src/modules/timeline/standard-objects/timeline-activity.workspace-entity'; import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; +import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; import { WorkflowTrigger } from 'src/modules/workflow/workflow-trigger/types/workflow-trigger.type'; @@ -32,13 +33,9 @@ export enum WorkflowRunStatus { FAILED = 'FAILED', } -type StepRunOutput = { +export type StepOutput = { id: string; - outputs: { - attemptCount: number; - result: object | undefined; - error: string | undefined; - }[]; + output: WorkflowExecutorOutput; }; export type WorkflowRunOutput = { @@ -46,7 +43,7 @@ export type WorkflowRunOutput = { trigger: WorkflowTrigger; steps: WorkflowAction[]; }; - stepsOutput?: Record; + stepsOutput?: Record; error?: string; }; diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/factories/workflow-action.factory.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/factories/workflow-executor.factory.ts similarity index 82% rename from packages/twenty-server/src/modules/workflow/workflow-executor/factories/workflow-action.factory.ts rename to packages/twenty-server/src/modules/workflow/workflow-executor/factories/workflow-executor.factory.ts index 5b98cd3a5..843e28823 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/factories/workflow-action.factory.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/factories/workflow-executor.factory.ts @@ -1,6 +1,6 @@ import { Injectable } from '@nestjs/common'; -import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; +import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; import { WorkflowStepExecutorException, @@ -10,22 +10,22 @@ import { CodeWorkflowAction } from 'src/modules/workflow/workflow-executor/workf import { SendEmailWorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/mail-sender/send-email.workflow-action'; import { CreateRecordWorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/create-record.workflow-action'; import { DeleteRecordWorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/delete-record.workflow-action'; -import { FindRecordsWorflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/find-records.workflow-action'; +import { FindRecordsWorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/find-records.workflow-action'; import { UpdateRecordWorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/update-record.workflow-action'; import { WorkflowActionType } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; @Injectable() -export class WorkflowActionFactory { +export class WorkflowExecutorFactory { constructor( private readonly codeWorkflowAction: CodeWorkflowAction, private readonly sendEmailWorkflowAction: SendEmailWorkflowAction, private readonly createRecordWorkflowAction: CreateRecordWorkflowAction, private readonly updateRecordWorkflowAction: UpdateRecordWorkflowAction, private readonly deleteRecordWorkflowAction: DeleteRecordWorkflowAction, - private readonly findRecordsWorflowAction: FindRecordsWorflowAction, + private readonly findRecordsWorkflowAction: FindRecordsWorkflowAction, ) {} - get(stepType: WorkflowActionType): WorkflowAction { + get(stepType: WorkflowActionType): WorkflowExecutor { switch (stepType) { case WorkflowActionType.CODE: return this.codeWorkflowAction; @@ -38,7 +38,7 @@ export class WorkflowActionFactory { case WorkflowActionType.DELETE_RECORD: return this.deleteRecordWorkflowAction; case WorkflowActionType.FIND_RECORDS: - return this.findRecordsWorflowAction; + return this.findRecordsWorkflowAction; default: throw new WorkflowStepExecutorException( `Workflow step executor not found for step type '${stepType}'`, diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/interfaces/workflow-action.interface.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/interfaces/workflow-action.interface.ts deleted file mode 100644 index e86aa5d26..000000000 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/interfaces/workflow-action.interface.ts +++ /dev/null @@ -1,5 +0,0 @@ -import { WorkflowActionResult } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-result.type'; - -export interface WorkflowAction { - execute(workflowStepInput: unknown): Promise; -} diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface.ts new file mode 100644 index 000000000..53aa949a4 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface.ts @@ -0,0 +1,8 @@ +import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; +import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; + +export interface WorkflowExecutor { + execute( + workflowExecutorInput: WorkflowExecutorInput, + ): Promise; +} diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-executor-input.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-executor-input.ts new file mode 100644 index 000000000..29d8e959a --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-executor-input.ts @@ -0,0 +1,9 @@ +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; + +export type WorkflowExecutorInput = { + currentStepIndex: number; + steps: WorkflowAction[]; + context: Record; + workflowRunId: string; + attemptCount?: number; +}; diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-executor-output.type.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-executor-output.type.ts new file mode 100644 index 000000000..65ef4ad08 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-executor-output.type.ts @@ -0,0 +1,4 @@ +export type WorkflowExecutorOutput = { + result?: object; + error?: string; +}; diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/code/code.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/code/code.workflow-action.ts index 8809d1cf8..4e67ddddc 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/code/code.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/code/code.workflow-action.ts @@ -1,6 +1,6 @@ import { Injectable } from '@nestjs/common'; -import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; +import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; import { ServerlessFunctionService } from 'src/engine/metadata-modules/serverless-function/serverless-function.service'; import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; @@ -8,19 +8,38 @@ import { WorkflowStepExecutorException, WorkflowStepExecutorExceptionCode, } from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception'; +import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; +import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; +import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util'; +import { isWorkflowCodeAction } from 'src/modules/workflow/workflow-executor/workflow-actions/code/guards/is-workflow-code-action.guard'; import { WorkflowCodeActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/code/types/workflow-code-action-input.type'; -import { WorkflowActionResult } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-result.type'; @Injectable() -export class CodeWorkflowAction implements WorkflowAction { +export class CodeWorkflowAction implements WorkflowExecutor { constructor( private readonly serverlessFunctionService: ServerlessFunctionService, private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory, ) {} - async execute( - workflowActionInput: WorkflowCodeActionInput, - ): Promise { + async execute({ + currentStepIndex, + steps, + context, + }: WorkflowExecutorInput): Promise { + const step = steps[currentStepIndex]; + + if (!isWorkflowCodeAction(step)) { + throw new WorkflowStepExecutorException( + 'Step is not a code action', + WorkflowStepExecutorExceptionCode.INVALID_STEP_TYPE, + ); + } + + const workflowActionInput = resolveInput( + step.settings.input, + context, + ) as WorkflowCodeActionInput; + try { const { workspaceId } = this.scopedWorkspaceContextFactory.create(); @@ -40,7 +59,7 @@ export class CodeWorkflowAction implements WorkflowAction { ); if (result.error) { - return { error: result.error }; + return { error: result.error.errorMessage }; } return { result: result.data || {} }; diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/code/guards/is-workflow-code-action.guard.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/code/guards/is-workflow-code-action.guard.ts new file mode 100644 index 000000000..f063724dc --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/code/guards/is-workflow-code-action.guard.ts @@ -0,0 +1,11 @@ +import { + WorkflowAction, + WorkflowActionType, + WorkflowCodeAction, +} from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; + +export const isWorkflowCodeAction = ( + action: WorkflowAction, +): action is WorkflowCodeAction => { + return action.type === WorkflowActionType.CODE; +}; diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/mail-sender/guards/is-workflow-send-email-action.guard.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/mail-sender/guards/is-workflow-send-email-action.guard.ts new file mode 100644 index 000000000..5fc999904 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/mail-sender/guards/is-workflow-send-email-action.guard.ts @@ -0,0 +1,11 @@ +import { + WorkflowAction, + WorkflowActionType, + WorkflowSendEmailAction, +} from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; + +export const isWorkflowSendEmailAction = ( + action: WorkflowAction, +): action is WorkflowSendEmailAction => { + return action.type === WorkflowActionType.SEND_EMAIL; +}; diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/mail-sender/send-email.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/mail-sender/send-email.workflow-action.ts index 2ba4e9442..a016cee4a 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/mail-sender/send-email.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/mail-sender/send-email.workflow-action.ts @@ -5,7 +5,7 @@ import { JSDOM } from 'jsdom'; import { isDefined, isValidUuid } from 'twenty-shared'; import { z } from 'zod'; -import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; +import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; @@ -15,19 +15,22 @@ import { WorkflowStepExecutorException, WorkflowStepExecutorExceptionCode, } from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception'; +import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; +import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; +import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util'; import { SendEmailActionException, SendEmailActionExceptionCode, } from 'src/modules/workflow/workflow-executor/workflow-actions/mail-sender/exceptions/send-email-action.exception'; +import { isWorkflowSendEmailAction } from 'src/modules/workflow/workflow-executor/workflow-actions/mail-sender/guards/is-workflow-send-email-action.guard'; import { WorkflowSendEmailActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/mail-sender/types/workflow-send-email-action-input.type'; -import { WorkflowActionResult } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-result.type'; export type WorkflowSendEmailStepOutputSchema = { success: boolean; }; @Injectable() -export class SendEmailWorkflowAction implements WorkflowAction { +export class SendEmailWorkflowAction implements WorkflowExecutor { private readonly logger = new Logger(SendEmailWorkflowAction.name); constructor( private readonly gmailClientProvider: GmailClientProvider, @@ -79,12 +82,29 @@ export class SendEmailWorkflowAction implements WorkflowAction { } } - async execute( - workflowActionInput: WorkflowSendEmailActionInput, - ): Promise { + async execute({ + currentStepIndex, + steps, + context, + }: WorkflowExecutorInput): Promise { + const step = steps[currentStepIndex]; + + if (!isWorkflowSendEmailAction(step)) { + throw new WorkflowStepExecutorException( + 'Step is not a send email action', + WorkflowStepExecutorExceptionCode.INVALID_STEP_TYPE, + ); + } + const emailProvider = await this.getEmailClient( - workflowActionInput.connectedAccountId, + step.settings.input.connectedAccountId, ); + + const workflowActionInput = resolveInput( + step.settings.input, + context, + ) as WorkflowSendEmailActionInput; + const { email, body, subject } = workflowActionInput; const emailSchema = z.string().trim().email('Invalid email'); diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/create-record.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/create-record.workflow-action.ts index 2d580eebd..06489ed06 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/create-record.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/create-record.workflow-action.ts @@ -3,7 +3,7 @@ import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; -import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; +import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action'; import { FieldActorSource } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type'; @@ -11,15 +11,22 @@ import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadat import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter'; +import { + WorkflowStepExecutorException, + WorkflowStepExecutorExceptionCode, +} from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception'; +import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; +import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; +import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util'; import { RecordCRUDActionException, RecordCRUDActionExceptionCode, } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/exceptions/record-crud-action.exception'; +import { isWorkflowCreateRecordAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/guards/is-workflow-create-record-action.guard'; import { WorkflowCreateRecordActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/types/workflow-record-crud-action-input.type'; -import { WorkflowActionResult } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-result.type'; @Injectable() -export class CreateRecordWorkflowAction implements WorkflowAction { +export class CreateRecordWorkflowAction implements WorkflowExecutor { constructor( private readonly twentyORMManager: TwentyORMManager, @InjectRepository(ObjectMetadataEntity, 'metadata') @@ -28,12 +35,19 @@ export class CreateRecordWorkflowAction implements WorkflowAction { private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory, ) {} - async execute( - workflowActionInput: WorkflowCreateRecordActionInput, - ): Promise { - const repository = await this.twentyORMManager.getRepository( - workflowActionInput.objectName, - ); + async execute({ + currentStepIndex, + steps, + context, + }: WorkflowExecutorInput): Promise { + const step = steps[currentStepIndex]; + + if (!isWorkflowCreateRecordAction(step)) { + throw new WorkflowStepExecutorException( + 'Step is not a create record action', + WorkflowStepExecutorExceptionCode.INVALID_STEP_TYPE, + ); + } const workspaceId = this.scopedWorkspaceContextFactory.create().workspaceId; @@ -44,6 +58,15 @@ export class CreateRecordWorkflowAction implements WorkflowAction { ); } + const workflowActionInput = resolveInput( + step.settings.input, + context, + ) as WorkflowCreateRecordActionInput; + + const repository = await this.twentyORMManager.getRepository( + workflowActionInput.objectName, + ); + const objectMetadata = await this.objectMetadataRepository.findOne({ where: { nameSingular: workflowActionInput.objectName, diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/delete-record.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/delete-record.workflow-action.ts index 29fa9c3fd..1eb3db79c 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/delete-record.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/delete-record.workflow-action.ts @@ -3,22 +3,29 @@ import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; -import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; +import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action'; import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity'; import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter'; +import { + WorkflowStepExecutorException, + WorkflowStepExecutorExceptionCode, +} from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception'; +import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; +import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; +import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util'; import { RecordCRUDActionException, RecordCRUDActionExceptionCode, } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/exceptions/record-crud-action.exception'; +import { isWorkflowDeleteRecordAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/guards/is-workflow-delete-record-action.guard'; import { WorkflowDeleteRecordActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/types/workflow-record-crud-action-input.type'; -import { WorkflowActionResult } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-result.type'; @Injectable() -export class DeleteRecordWorkflowAction implements WorkflowAction { +export class DeleteRecordWorkflowAction implements WorkflowExecutor { constructor( private readonly twentyORMManager: TwentyORMManager, @InjectRepository(ObjectMetadataEntity, 'metadata') @@ -27,9 +34,25 @@ export class DeleteRecordWorkflowAction implements WorkflowAction { private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory, ) {} - async execute( - workflowActionInput: WorkflowDeleteRecordActionInput, - ): Promise { + async execute({ + currentStepIndex, + steps, + context, + }: WorkflowExecutorInput): Promise { + const step = steps[currentStepIndex]; + + if (!isWorkflowDeleteRecordAction(step)) { + throw new WorkflowStepExecutorException( + 'Step is not a delete record action', + WorkflowStepExecutorExceptionCode.INVALID_STEP_TYPE, + ); + } + + const workflowActionInput = resolveInput( + step.settings.input, + context, + ) as WorkflowDeleteRecordActionInput; + const repository = await this.twentyORMManager.getRepository( workflowActionInput.objectName, ); diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/find-records.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/find-records.workflow-action.ts index b5499c51b..bb460cb4f 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/find-records.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/find-records.workflow-action.ts @@ -8,7 +8,7 @@ import { ObjectRecordOrderBy, OrderByDirection, } from 'src/engine/api/graphql/workspace-query-builder/interfaces/object-record.interface'; -import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; +import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; import { QUERY_MAX_RECORDS } from 'src/engine/api/graphql/graphql-query-runner/constants/query-max-records.constant'; import { GraphqlQueryParser } from 'src/engine/api/graphql/graphql-query-runner/graphql-query-parsers/graphql-query.parser'; @@ -21,15 +21,22 @@ import { WorkspaceRepository } from 'src/engine/twenty-orm/repository/workspace. import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { formatResult } from 'src/engine/twenty-orm/utils/format-result.util'; import { WorkspaceCacheStorageService } from 'src/engine/workspace-cache-storage/workspace-cache-storage.service'; +import { + WorkflowStepExecutorException, + WorkflowStepExecutorExceptionCode, +} from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception'; +import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; +import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; +import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util'; import { RecordCRUDActionException, RecordCRUDActionExceptionCode, } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/exceptions/record-crud-action.exception'; +import { isWorkflowFindRecordsAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/guards/is-workflow-find-records-action.guard'; import { WorkflowFindRecordsActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/types/workflow-record-crud-action-input.type'; -import { WorkflowActionResult } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-result.type'; @Injectable() -export class FindRecordsWorflowAction implements WorkflowAction { +export class FindRecordsWorkflowAction implements WorkflowExecutor { constructor( private readonly twentyORMManager: TwentyORMManager, private readonly workspaceCacheStorageService: WorkspaceCacheStorageService, @@ -37,12 +44,29 @@ export class FindRecordsWorflowAction implements WorkflowAction { private readonly featureFlagService: FeatureFlagService, ) {} - async execute( - workflowActionInput: WorkflowFindRecordsActionInput, - ): Promise { + async execute({ + currentStepIndex, + steps, + context, + }: WorkflowExecutorInput): Promise { + const step = steps[currentStepIndex]; + + if (!isWorkflowFindRecordsAction(step)) { + throw new WorkflowStepExecutorException( + 'Step is not a find records action', + WorkflowStepExecutorExceptionCode.INVALID_STEP_TYPE, + ); + } + + const workflowActionInput = resolveInput( + step.settings.input, + context, + ) as WorkflowFindRecordsActionInput; + const repository = await this.twentyORMManager.getRepository( workflowActionInput.objectName, ); + const workspaceId = this.scopedWorkspaceContextFactory.create().workspaceId; if (!workspaceId) { diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/guards/is-workflow-create-record-action.guard.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/guards/is-workflow-create-record-action.guard.ts new file mode 100644 index 000000000..a9e388fdb --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/guards/is-workflow-create-record-action.guard.ts @@ -0,0 +1,11 @@ +import { + WorkflowAction, + WorkflowActionType, + WorkflowCreateRecordAction, +} from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; + +export const isWorkflowCreateRecordAction = ( + action: WorkflowAction, +): action is WorkflowCreateRecordAction => { + return action.type === WorkflowActionType.CREATE_RECORD; +}; diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/guards/is-workflow-delete-record-action.guard.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/guards/is-workflow-delete-record-action.guard.ts new file mode 100644 index 000000000..5b088c00d --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/guards/is-workflow-delete-record-action.guard.ts @@ -0,0 +1,11 @@ +import { + WorkflowAction, + WorkflowActionType, + WorkflowDeleteRecordAction, +} from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; + +export const isWorkflowDeleteRecordAction = ( + action: WorkflowAction, +): action is WorkflowDeleteRecordAction => { + return action.type === WorkflowActionType.DELETE_RECORD; +}; diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/guards/is-workflow-find-records-action.guard.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/guards/is-workflow-find-records-action.guard.ts new file mode 100644 index 000000000..a71b6e336 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/guards/is-workflow-find-records-action.guard.ts @@ -0,0 +1,11 @@ +import { + WorkflowAction, + WorkflowActionType, + WorkflowFindRecordsAction, +} from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; + +export const isWorkflowFindRecordsAction = ( + action: WorkflowAction, +): action is WorkflowFindRecordsAction => { + return action.type === WorkflowActionType.FIND_RECORDS; +}; diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/guards/is-workflow-update-record-action.guard.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/guards/is-workflow-update-record-action.guard.ts new file mode 100644 index 000000000..9f7102168 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/guards/is-workflow-update-record-action.guard.ts @@ -0,0 +1,11 @@ +import { + WorkflowAction, + WorkflowActionType, + WorkflowUpdateRecordAction, +} from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; + +export const isWorkflowUpdateRecordAction = ( + action: WorkflowAction, +): action is WorkflowUpdateRecordAction => { + return action.type === WorkflowActionType.UPDATE_RECORD; +}; diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/record-crud-action.module.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/record-crud-action.module.ts index 5a1a86418..01532f0dc 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/record-crud-action.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/record-crud-action.module.ts @@ -8,7 +8,7 @@ import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/s import { WorkspaceCacheStorageModule } from 'src/engine/workspace-cache-storage/workspace-cache-storage.module'; import { CreateRecordWorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/create-record.workflow-action'; import { DeleteRecordWorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/delete-record.workflow-action'; -import { FindRecordsWorflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/find-records.workflow-action'; +import { FindRecordsWorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/find-records.workflow-action'; import { UpdateRecordWorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/update-record.workflow-action'; @Module({ @@ -22,13 +22,13 @@ import { UpdateRecordWorkflowAction } from 'src/modules/workflow/workflow-execut CreateRecordWorkflowAction, UpdateRecordWorkflowAction, DeleteRecordWorkflowAction, - FindRecordsWorflowAction, + FindRecordsWorkflowAction, ], exports: [ CreateRecordWorkflowAction, UpdateRecordWorkflowAction, DeleteRecordWorkflowAction, - FindRecordsWorflowAction, + FindRecordsWorkflowAction, ], }) export class RecordCRUDActionModule {} diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/update-record.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/update-record.workflow-action.ts index 35798f651..4a41c749a 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/update-record.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/update-record.workflow-action.ts @@ -3,7 +3,7 @@ import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; -import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; +import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action'; import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity'; @@ -13,15 +13,22 @@ import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { formatData } from 'src/engine/twenty-orm/utils/format-data.util'; import { WorkspaceCacheStorageService } from 'src/engine/workspace-cache-storage/workspace-cache-storage.service'; import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter'; +import { + WorkflowStepExecutorException, + WorkflowStepExecutorExceptionCode, +} from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception'; +import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; +import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; +import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util'; import { RecordCRUDActionException, RecordCRUDActionExceptionCode, } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/exceptions/record-crud-action.exception'; +import { isWorkflowUpdateRecordAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/guards/is-workflow-update-record-action.guard'; import { WorkflowUpdateRecordActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/types/workflow-record-crud-action-input.type'; -import { WorkflowActionResult } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-result.type'; @Injectable() -export class UpdateRecordWorkflowAction implements WorkflowAction { +export class UpdateRecordWorkflowAction implements WorkflowExecutor { constructor( private readonly twentyORMManager: TwentyORMManager, private readonly workspaceCacheStorageService: WorkspaceCacheStorageService, @@ -31,9 +38,25 @@ export class UpdateRecordWorkflowAction implements WorkflowAction { private readonly workspaceEventEmitter: WorkspaceEventEmitter, ) {} - async execute( - workflowActionInput: WorkflowUpdateRecordActionInput, - ): Promise { + async execute({ + currentStepIndex, + steps, + context, + }: WorkflowExecutorInput): Promise { + const step = steps[currentStepIndex]; + + if (!isWorkflowUpdateRecordAction(step)) { + throw new WorkflowStepExecutorException( + 'Step is not an update record action', + WorkflowStepExecutorExceptionCode.INVALID_STEP_TYPE, + ); + } + + const workflowActionInput = resolveInput( + step.settings.input, + context, + ) as WorkflowUpdateRecordActionInput; + const repository = await this.twentyORMManager.getRepository( workflowActionInput.objectName, ); diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-result.type.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-result.type.ts deleted file mode 100644 index b856dfb75..000000000 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-result.type.ts +++ /dev/null @@ -1,10 +0,0 @@ -type WorkflowActionError = { - errorType: string; - errorMessage: string; - stackTrace: string; -}; - -export type WorkflowActionResult = { - result?: object; - error?: WorkflowActionError; -}; diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-executor.module.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-executor.module.ts index e85d73862..c88f67162 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-executor.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-executor.module.ts @@ -2,7 +2,7 @@ import { Module } from '@nestjs/common'; import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module'; -import { WorkflowActionFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-action.factory'; +import { WorkflowExecutorFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-executor.factory'; import { CodeActionModule } from 'src/modules/workflow/workflow-executor/workflow-actions/code/code-action.module'; import { SendEmailActionModule } from 'src/modules/workflow/workflow-executor/workflow-actions/mail-sender/send-email-action.module'; import { RecordCRUDActionModule } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/record-crud-action.module'; @@ -20,7 +20,7 @@ import { WorkflowRunModule } from 'src/modules/workflow/workflow-runner/workflow providers: [ WorkflowExecutorWorkspaceService, ScopedWorkspaceContextFactory, - WorkflowActionFactory, + WorkflowExecutorFactory, ], exports: [WorkflowExecutorWorkspaceService], }) diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts index 50e3117b9..d491ea9d0 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts @@ -1,18 +1,20 @@ import { Injectable } from '@nestjs/common'; +import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; + import { BILLING_FEATURE_USED } from 'src/engine/core-modules/billing/constants/billing-feature-used.constant'; import { BillingMeterEventName } from 'src/engine/core-modules/billing/enums/billing-meter-event-names'; import { BillingUsageEvent } from 'src/engine/core-modules/billing/types/billing-usage-event.type'; import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter'; import { + StepOutput, WorkflowRunOutput, WorkflowRunStatus, } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; -import { WorkflowActionFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-action.factory'; -import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util'; -import { WorkflowActionResult } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-result.type'; -import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; +import { WorkflowExecutorFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-executor.factory'; +import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; +import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service'; const MAX_RETRIES_ON_FAILURE = 3; @@ -23,9 +25,9 @@ export type WorkflowExecutorState = { }; @Injectable() -export class WorkflowExecutorWorkspaceService { +export class WorkflowExecutorWorkspaceService implements WorkflowExecutor { constructor( - private readonly workflowActionFactory: WorkflowActionFactory, + private readonly workflowExecutorFactory: WorkflowExecutorFactory, private readonly workspaceEventEmitter: WorkspaceEventEmitter, private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory, private readonly workflowRunWorkspaceService: WorkflowRunWorkspaceService, @@ -35,84 +37,55 @@ export class WorkflowExecutorWorkspaceService { currentStepIndex, steps, context, - workflowExecutorState, attemptCount = 1, workflowRunId, - }: { - currentStepIndex: number; - steps: WorkflowAction[]; - workflowExecutorState: WorkflowExecutorState; - context: Record; - attemptCount?: number; - workflowRunId: string; - }): Promise { + }: WorkflowExecutorInput): Promise { if (currentStepIndex >= steps.length) { - return { ...workflowExecutorState, status: WorkflowRunStatus.COMPLETED }; + return { + result: { + success: true, + }, + }; } const step = steps[currentStepIndex]; - const workflowAction = this.workflowActionFactory.get(step.type); + const workflowExecutor = this.workflowExecutorFactory.get(step.type); - const actionPayload = resolveInput(step.settings.input, context); - - let result: WorkflowActionResult; + let actionOutput: WorkflowExecutorOutput; try { - result = await workflowAction.execute(actionPayload); + actionOutput = await workflowExecutor.execute({ + currentStepIndex, + steps, + context, + attemptCount, + workflowRunId, + }); } catch (error) { - result = { - error: { - errorType: error.name, - errorMessage: error.message, - stackTrace: error.stack, - }, + actionOutput = { + error: error.message ?? 'Execution result error, no data or error', }; } - const stepOutput = workflowExecutorState.stepsOutput?.[step.id]; - - const error = - result.error?.errorMessage ?? - (result.result ? undefined : 'Execution result error, no data or error'); - - if (!error) { + if (!actionOutput.error) { this.sendWorkflowNodeRunEvent(); } - const updatedStepOutput = { + const stepOutput: StepOutput = { id: step.id, - outputs: [ - ...(stepOutput?.outputs ?? []), - { - attemptCount, - result: result.result, - error, - }, - ], + output: actionOutput, }; - const updatedStepsOutput = { - ...workflowExecutorState.stepsOutput, - [step.id]: updatedStepOutput, - }; - - const updatedWorkflowExecutorState = { - ...workflowExecutorState, - stepsOutput: updatedStepsOutput, - }; - - if (result.result) { + if (actionOutput.result) { const updatedContext = { ...context, - [step.id]: result.result, + [step.id]: actionOutput.result, }; await this.workflowRunWorkspaceService.saveWorkflowRunState({ workflowRunId, - output: { - stepsOutput: updatedStepsOutput, - }, + stepOutput, context: updatedContext, }); @@ -121,16 +94,13 @@ export class WorkflowExecutorWorkspaceService { currentStepIndex: currentStepIndex + 1, steps, context: updatedContext, - workflowExecutorState: updatedWorkflowExecutorState, }); } if (step.settings.errorHandlingOptions.continueOnFailure.value) { await this.workflowRunWorkspaceService.saveWorkflowRunState({ workflowRunId, - output: { - stepsOutput: updatedStepsOutput, - }, + stepOutput, context, }); @@ -139,7 +109,6 @@ export class WorkflowExecutorWorkspaceService { currentStepIndex: currentStepIndex + 1, steps, context, - workflowExecutorState: updatedWorkflowExecutorState, }); } @@ -152,23 +121,17 @@ export class WorkflowExecutorWorkspaceService { currentStepIndex, steps, context, - workflowExecutorState: updatedWorkflowExecutorState, attemptCount: attemptCount + 1, }); } await this.workflowRunWorkspaceService.saveWorkflowRunState({ workflowRunId, - output: { - stepsOutput: updatedStepsOutput, - }, + stepOutput, context, }); - return { - ...updatedWorkflowExecutorState, - status: WorkflowRunStatus.FAILED, - }; + return actionOutput; } private sendWorkflowNodeRunEvent() { diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts index cafa53602..960e1ae90 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts @@ -67,20 +67,17 @@ export class RunWorkflowJob { await this.throttleExecution(workflowVersion.workflowId); - const { status } = await this.workflowExecutorWorkspaceService.execute({ + const { error } = await this.workflowExecutorWorkspaceService.execute({ workflowRunId, currentStepIndex: 0, - steps: workflowVersion.steps ?? [], + steps: workflowVersion.steps, context, - workflowExecutorState: { - stepsOutput: {}, - status: WorkflowRunStatus.RUNNING, - }, }); await this.workflowRunWorkspaceService.endWorkflowRun({ workflowRunId, - status, + status: error ? WorkflowRunStatus.FAILED : WorkflowRunStatus.COMPLETED, + error, }); } catch (error) { await this.workflowRunWorkspaceService.endWorkflowRun({ diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts index 0845db6be..ce6103516 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts @@ -3,6 +3,7 @@ import { Injectable } from '@nestjs/common'; import { ActorMetadata } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { + StepOutput, WorkflowRunOutput, WorkflowRunStatus, WorkflowRunWorkspaceEntity, @@ -125,11 +126,11 @@ export class WorkflowRunWorkspaceService { async saveWorkflowRunState({ workflowRunId, - output, + stepOutput, context, }: { workflowRunId: string; - output: Pick; + stepOutput: StepOutput; context: Record; }) { const workflowRunRepository = @@ -154,7 +155,10 @@ export class WorkflowRunWorkspaceService { trigger: undefined, steps: [], }, - ...output, + stepsOutput: { + ...(workflowRunToUpdate.output?.stepsOutput ?? {}), + [stepOutput.id]: stepOutput.output, + }, }, context, });