diff --git a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts index f426e485b..dd558c076 100644 --- a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts +++ b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts @@ -22,6 +22,8 @@ import { FavoriteWorkspaceEntity } from 'src/modules/favorite/standard-objects/f import { TimelineActivityWorkspaceEntity } from 'src/modules/timeline/standard-objects/timeline-activity.workspace-entity'; import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; +import { WorkflowTrigger } from 'src/modules/workflow/workflow-trigger/types/workflow-trigger.type'; export enum WorkflowRunStatus { NOT_STARTED = 'NOT_STARTED', @@ -32,8 +34,6 @@ export enum WorkflowRunStatus { type StepRunOutput = { id: string; - name: string; - type: string; outputs: { attemptCount: number; result: object | undefined; @@ -42,7 +42,11 @@ type StepRunOutput = { }; export type WorkflowRunOutput = { - steps: Record; + flow: { + trigger: WorkflowTrigger; + steps: WorkflowAction[]; + }; + stepsOutput: Record; error?: string; }; diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts index 0a12fbc65..0f96f07d5 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts @@ -1,4 +1,4 @@ -import { Injectable, Logger } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; import { BILLING_FEATURE_USED } from 'src/engine/core-modules/billing/constants/billing-feature-used.constant'; import { BillingMeterEventName } from 'src/engine/core-modules/billing/enums/billing-meter-event-names'; @@ -17,14 +17,13 @@ import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runne const MAX_RETRIES_ON_FAILURE = 3; -export type WorkflowExecutorOutput = { - steps: WorkflowRunOutput['steps']; +export type WorkflowExecutorState = { + stepsOutput: WorkflowRunOutput['stepsOutput']; status: WorkflowRunStatus; }; @Injectable() export class WorkflowExecutorWorkspaceService { - private readonly logger = new Logger(WorkflowExecutorWorkspaceService.name); constructor( private readonly workflowActionFactory: WorkflowActionFactory, private readonly workspaceEventEmitter: WorkspaceEventEmitter, @@ -36,19 +35,19 @@ export class WorkflowExecutorWorkspaceService { currentStepIndex, steps, context, - workflowExecutorOutput, + workflowExecutorState, attemptCount = 1, workflowRunId, }: { currentStepIndex: number; steps: WorkflowAction[]; - workflowExecutorOutput: WorkflowExecutorOutput; + workflowExecutorState: WorkflowExecutorState; context: Record; attemptCount?: number; workflowRunId: string; - }): Promise { + }): Promise { if (currentStepIndex >= steps.length) { - return { ...workflowExecutorOutput, status: WorkflowRunStatus.COMPLETED }; + return { ...workflowExecutorState, status: WorkflowRunStatus.COMPLETED }; } const step = steps[currentStepIndex]; @@ -71,7 +70,7 @@ export class WorkflowExecutorWorkspaceService { }; } - const stepOutput = workflowExecutorOutput.steps[step.id]; + const stepOutput = workflowExecutorState.stepsOutput[step.id]; const error = result.error?.errorMessage ?? @@ -83,8 +82,6 @@ export class WorkflowExecutorWorkspaceService { const updatedStepOutput = { id: step.id, - name: step.name, - type: step.type, outputs: [ ...(stepOutput?.outputs ?? []), { @@ -95,14 +92,14 @@ export class WorkflowExecutorWorkspaceService { ], }; - const updatedOutputSteps = { - ...workflowExecutorOutput.steps, + const updatedStepsOutput = { + ...workflowExecutorState.stepsOutput, [step.id]: updatedStepOutput, }; - const updatedWorkflowExecutorOutput = { - ...workflowExecutorOutput, - steps: updatedOutputSteps, + const updatedWorkflowExecutorState = { + ...workflowExecutorState, + stepsOutput: updatedStepsOutput, }; if (result.result) { @@ -114,7 +111,7 @@ export class WorkflowExecutorWorkspaceService { await this.workflowRunWorkspaceService.saveWorkflowRunState({ workflowRunId, output: { - steps: updatedOutputSteps, + stepsOutput: updatedStepsOutput, }, context: updatedContext, }); @@ -124,7 +121,7 @@ export class WorkflowExecutorWorkspaceService { currentStepIndex: currentStepIndex + 1, steps, context: updatedContext, - workflowExecutorOutput: updatedWorkflowExecutorOutput, + workflowExecutorState: updatedWorkflowExecutorState, }); } @@ -132,7 +129,7 @@ export class WorkflowExecutorWorkspaceService { await this.workflowRunWorkspaceService.saveWorkflowRunState({ workflowRunId, output: { - steps: updatedOutputSteps, + stepsOutput: updatedStepsOutput, }, context, }); @@ -142,7 +139,7 @@ export class WorkflowExecutorWorkspaceService { currentStepIndex: currentStepIndex + 1, steps, context, - workflowExecutorOutput: updatedWorkflowExecutorOutput, + workflowExecutorState: updatedWorkflowExecutorState, }); } @@ -155,7 +152,7 @@ export class WorkflowExecutorWorkspaceService { currentStepIndex, steps, context, - workflowExecutorOutput: updatedWorkflowExecutorOutput, + workflowExecutorState: updatedWorkflowExecutorState, attemptCount: attemptCount + 1, }); } @@ -163,13 +160,13 @@ export class WorkflowExecutorWorkspaceService { await this.workflowRunWorkspaceService.saveWorkflowRunState({ workflowRunId, output: { - steps: updatedOutputSteps, + stepsOutput: updatedStepsOutput, }, context, }); return { - ...updatedWorkflowExecutorOutput, + ...updatedWorkflowExecutorState, status: WorkflowRunStatus.FAILED, }; } diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/exceptions/workflow-run.exception.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/exceptions/workflow-run.exception.ts index f47781bc1..ecace5fb8 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/exceptions/workflow-run.exception.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/exceptions/workflow-run.exception.ts @@ -11,4 +11,5 @@ export enum WorkflowRunExceptionCode { INVALID_OPERATION = 'INVALID_OPERATION', INVALID_INPUT = 'INVALID_INPUT', WORKFLOW_RUN_LIMIT_REACHED = 'WORKFLOW_RUN_LIMIT_REACHED', + WORKFLOW_RUN_INVALID = 'WORKFLOW_RUN_INVALID', } diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts index b9b9a0ab0..cafa53602 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts @@ -1,4 +1,4 @@ -import { Logger, Scope } from '@nestjs/common'; +import { Scope } from '@nestjs/common'; import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service'; import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator'; @@ -23,7 +23,6 @@ export type RunWorkflowJobData = { @Processor({ queueName: MessageQueue.workflowQueue, scope: Scope.REQUEST }) export class RunWorkflowJob { - private readonly logger = new Logger(RunWorkflowJob.name); constructor( private readonly workflowCommonWorkspaceService: WorkflowCommonWorkspaceService, private readonly workflowExecutorWorkspaceService: WorkflowExecutorWorkspaceService, @@ -42,26 +41,39 @@ export class RunWorkflowJob { trigger: payload, }; - await this.workflowRunWorkspaceService.startWorkflowRun({ - workflowRunId, - context, - }); - try { const workflowVersion = await this.workflowCommonWorkspaceService.getWorkflowVersionOrFail( workflowVersionId, ); + if (!workflowVersion.trigger || !workflowVersion.steps) { + throw new WorkflowRunException( + 'Workflow version has no trigger or steps', + WorkflowRunExceptionCode.WORKFLOW_RUN_INVALID, + ); + } + + await this.workflowRunWorkspaceService.startWorkflowRun({ + workflowRunId, + context, + output: { + flow: { + trigger: workflowVersion.trigger, + steps: workflowVersion.steps, + }, + }, + }); + await this.throttleExecution(workflowVersion.workflowId); const { status } = await this.workflowExecutorWorkspaceService.execute({ workflowRunId, currentStepIndex: 0, - steps: workflowVersion.steps || [], + steps: workflowVersion.steps ?? [], context, - workflowExecutorOutput: { - steps: {}, + workflowExecutorState: { + stepsOutput: {}, status: WorkflowRunStatus.RUNNING, }, }); diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts index 93d09b67a..f1eb0791c 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts @@ -51,9 +51,11 @@ export class WorkflowRunWorkspaceService { async startWorkflowRun({ workflowRunId, context, + output, }: { workflowRunId: string; context: Record; + output: Pick; }) { const workflowRunRepository = await this.twentyORMManager.getRepository( @@ -82,6 +84,7 @@ export class WorkflowRunWorkspaceService { status: WorkflowRunStatus.RUNNING, startedAt: new Date().toISOString(), context, + output, }); } @@ -133,7 +136,7 @@ export class WorkflowRunWorkspaceService { context, }: { workflowRunId: string; - output: WorkflowRunOutput; + output: Pick; context: Record; }) { const workflowRunRepository = @@ -153,7 +156,13 @@ export class WorkflowRunWorkspaceService { } return workflowRunRepository.update(workflowRunId, { - output, + output: { + flow: workflowRunToUpdate.output?.flow ?? { + trigger: undefined, + steps: [], + }, + ...output, + }, context, }); }