From 017280384ab5a007b9cf2069c5a09bd3e19a02ac Mon Sep 17 00:00:00 2001 From: Thomas Trompette Date: Fri, 14 Feb 2025 15:20:27 +0100 Subject: [PATCH] Add flow to run output (#10220) We need the version trigger and steps to be stored in the output. We should not rely on the version itself because some run are made on draft versions. Which means versions could be edited afterwards. --- .../workflow-run.workspace-entity.ts | 10 +++-- .../workflow-executor.workspace-service.ts | 43 +++++++++---------- .../exceptions/workflow-run.exception.ts | 1 + .../workflow-runner/jobs/run-workflow.job.ts | 32 +++++++++----- .../workflow-run.workspace-service.ts | 13 +++++- 5 files changed, 61 insertions(+), 38 deletions(-) diff --git a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts index f426e485b..dd558c076 100644 --- a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts +++ b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts @@ -22,6 +22,8 @@ import { FavoriteWorkspaceEntity } from 'src/modules/favorite/standard-objects/f import { TimelineActivityWorkspaceEntity } from 'src/modules/timeline/standard-objects/timeline-activity.workspace-entity'; import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; +import { WorkflowTrigger } from 'src/modules/workflow/workflow-trigger/types/workflow-trigger.type'; export enum WorkflowRunStatus { NOT_STARTED = 'NOT_STARTED', @@ -32,8 +34,6 @@ export enum WorkflowRunStatus { type StepRunOutput = { id: string; - name: string; - type: string; outputs: { attemptCount: number; result: object | undefined; @@ -42,7 +42,11 @@ type StepRunOutput = { }; export type WorkflowRunOutput = { - steps: Record; + flow: { + trigger: WorkflowTrigger; + steps: WorkflowAction[]; + }; + stepsOutput: Record; error?: string; }; diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts index 0a12fbc65..0f96f07d5 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts @@ -1,4 +1,4 @@ -import { Injectable, Logger } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; import { BILLING_FEATURE_USED } from 'src/engine/core-modules/billing/constants/billing-feature-used.constant'; import { BillingMeterEventName } from 'src/engine/core-modules/billing/enums/billing-meter-event-names'; @@ -17,14 +17,13 @@ import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runne const MAX_RETRIES_ON_FAILURE = 3; -export type WorkflowExecutorOutput = { - steps: WorkflowRunOutput['steps']; +export type WorkflowExecutorState = { + stepsOutput: WorkflowRunOutput['stepsOutput']; status: WorkflowRunStatus; }; @Injectable() export class WorkflowExecutorWorkspaceService { - private readonly logger = new Logger(WorkflowExecutorWorkspaceService.name); constructor( private readonly workflowActionFactory: WorkflowActionFactory, private readonly workspaceEventEmitter: WorkspaceEventEmitter, @@ -36,19 +35,19 @@ export class WorkflowExecutorWorkspaceService { currentStepIndex, steps, context, - workflowExecutorOutput, + workflowExecutorState, attemptCount = 1, workflowRunId, }: { currentStepIndex: number; steps: WorkflowAction[]; - workflowExecutorOutput: WorkflowExecutorOutput; + workflowExecutorState: WorkflowExecutorState; context: Record; attemptCount?: number; workflowRunId: string; - }): Promise { + }): Promise { if (currentStepIndex >= steps.length) { - return { ...workflowExecutorOutput, status: WorkflowRunStatus.COMPLETED }; + return { ...workflowExecutorState, status: WorkflowRunStatus.COMPLETED }; } const step = steps[currentStepIndex]; @@ -71,7 +70,7 @@ export class WorkflowExecutorWorkspaceService { }; } - const stepOutput = workflowExecutorOutput.steps[step.id]; + const stepOutput = workflowExecutorState.stepsOutput[step.id]; const error = result.error?.errorMessage ?? @@ -83,8 +82,6 @@ export class WorkflowExecutorWorkspaceService { const updatedStepOutput = { id: step.id, - name: step.name, - type: step.type, outputs: [ ...(stepOutput?.outputs ?? []), { @@ -95,14 +92,14 @@ export class WorkflowExecutorWorkspaceService { ], }; - const updatedOutputSteps = { - ...workflowExecutorOutput.steps, + const updatedStepsOutput = { + ...workflowExecutorState.stepsOutput, [step.id]: updatedStepOutput, }; - const updatedWorkflowExecutorOutput = { - ...workflowExecutorOutput, - steps: updatedOutputSteps, + const updatedWorkflowExecutorState = { + ...workflowExecutorState, + stepsOutput: updatedStepsOutput, }; if (result.result) { @@ -114,7 +111,7 @@ export class WorkflowExecutorWorkspaceService { await this.workflowRunWorkspaceService.saveWorkflowRunState({ workflowRunId, output: { - steps: updatedOutputSteps, + stepsOutput: updatedStepsOutput, }, context: updatedContext, }); @@ -124,7 +121,7 @@ export class WorkflowExecutorWorkspaceService { currentStepIndex: currentStepIndex + 1, steps, context: updatedContext, - workflowExecutorOutput: updatedWorkflowExecutorOutput, + workflowExecutorState: updatedWorkflowExecutorState, }); } @@ -132,7 +129,7 @@ export class WorkflowExecutorWorkspaceService { await this.workflowRunWorkspaceService.saveWorkflowRunState({ workflowRunId, output: { - steps: updatedOutputSteps, + stepsOutput: updatedStepsOutput, }, context, }); @@ -142,7 +139,7 @@ export class WorkflowExecutorWorkspaceService { currentStepIndex: currentStepIndex + 1, steps, context, - workflowExecutorOutput: updatedWorkflowExecutorOutput, + workflowExecutorState: updatedWorkflowExecutorState, }); } @@ -155,7 +152,7 @@ export class WorkflowExecutorWorkspaceService { currentStepIndex, steps, context, - workflowExecutorOutput: updatedWorkflowExecutorOutput, + workflowExecutorState: updatedWorkflowExecutorState, attemptCount: attemptCount + 1, }); } @@ -163,13 +160,13 @@ export class WorkflowExecutorWorkspaceService { await this.workflowRunWorkspaceService.saveWorkflowRunState({ workflowRunId, output: { - steps: updatedOutputSteps, + stepsOutput: updatedStepsOutput, }, context, }); return { - ...updatedWorkflowExecutorOutput, + ...updatedWorkflowExecutorState, status: WorkflowRunStatus.FAILED, }; } diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/exceptions/workflow-run.exception.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/exceptions/workflow-run.exception.ts index f47781bc1..ecace5fb8 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/exceptions/workflow-run.exception.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/exceptions/workflow-run.exception.ts @@ -11,4 +11,5 @@ export enum WorkflowRunExceptionCode { INVALID_OPERATION = 'INVALID_OPERATION', INVALID_INPUT = 'INVALID_INPUT', WORKFLOW_RUN_LIMIT_REACHED = 'WORKFLOW_RUN_LIMIT_REACHED', + WORKFLOW_RUN_INVALID = 'WORKFLOW_RUN_INVALID', } diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts index b9b9a0ab0..cafa53602 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts @@ -1,4 +1,4 @@ -import { Logger, Scope } from '@nestjs/common'; +import { Scope } from '@nestjs/common'; import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service'; import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator'; @@ -23,7 +23,6 @@ export type RunWorkflowJobData = { @Processor({ queueName: MessageQueue.workflowQueue, scope: Scope.REQUEST }) export class RunWorkflowJob { - private readonly logger = new Logger(RunWorkflowJob.name); constructor( private readonly workflowCommonWorkspaceService: WorkflowCommonWorkspaceService, private readonly workflowExecutorWorkspaceService: WorkflowExecutorWorkspaceService, @@ -42,26 +41,39 @@ export class RunWorkflowJob { trigger: payload, }; - await this.workflowRunWorkspaceService.startWorkflowRun({ - workflowRunId, - context, - }); - try { const workflowVersion = await this.workflowCommonWorkspaceService.getWorkflowVersionOrFail( workflowVersionId, ); + if (!workflowVersion.trigger || !workflowVersion.steps) { + throw new WorkflowRunException( + 'Workflow version has no trigger or steps', + WorkflowRunExceptionCode.WORKFLOW_RUN_INVALID, + ); + } + + await this.workflowRunWorkspaceService.startWorkflowRun({ + workflowRunId, + context, + output: { + flow: { + trigger: workflowVersion.trigger, + steps: workflowVersion.steps, + }, + }, + }); + await this.throttleExecution(workflowVersion.workflowId); const { status } = await this.workflowExecutorWorkspaceService.execute({ workflowRunId, currentStepIndex: 0, - steps: workflowVersion.steps || [], + steps: workflowVersion.steps ?? [], context, - workflowExecutorOutput: { - steps: {}, + workflowExecutorState: { + stepsOutput: {}, status: WorkflowRunStatus.RUNNING, }, }); diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts index 93d09b67a..f1eb0791c 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts @@ -51,9 +51,11 @@ export class WorkflowRunWorkspaceService { async startWorkflowRun({ workflowRunId, context, + output, }: { workflowRunId: string; context: Record; + output: Pick; }) { const workflowRunRepository = await this.twentyORMManager.getRepository( @@ -82,6 +84,7 @@ export class WorkflowRunWorkspaceService { status: WorkflowRunStatus.RUNNING, startedAt: new Date().toISOString(), context, + output, }); } @@ -133,7 +136,7 @@ export class WorkflowRunWorkspaceService { context, }: { workflowRunId: string; - output: WorkflowRunOutput; + output: Pick; context: Record; }) { const workflowRunRepository = @@ -153,7 +156,13 @@ export class WorkflowRunWorkspaceService { } return workflowRunRepository.update(workflowRunId, { - output, + output: { + flow: workflowRunToUpdate.output?.flow ?? { + trigger: undefined, + steps: [], + }, + ...output, + }, context, }); }