From 01805cc71c05f4360a00d18a4f937bea41b41623 Mon Sep 17 00:00:00 2001 From: martmull Date: Tue, 22 Jul 2025 17:51:03 +0200 Subject: [PATCH] 13227 workflow wrong completed workflowrun state when multiple branches (#13344) --- .../utils/getNodeVariantFromStepRunStatus.ts | 1 + ...workflow-version-step.workspace-service.ts | 13 +- .../types/workflow-executor-input.ts | 1 + .../__tests__/can-execute-step.util.spec.ts | 45 ++- .../workflow-should-fail.util.spec.ts | 30 ++ .../workflow-should-keep-running.util.spec.ts | 79 ++++ .../utils/can-execute-step.util.ts | 7 + .../utils/workflow-should-fail.util.ts | 17 + .../workflow-should-keep-running.util.ts | 38 ++ ...orkflow-executor.workspace-service.spec.ts | 251 +++--------- .../workflow-executor.workspace-service.ts | 374 ++++++++---------- .../workflow-run.workspace-service.ts | 53 +-- .../types/WorkflowRunStateStepInfos.ts | 1 + 13 files changed, 444 insertions(+), 466 deletions(-) create mode 100644 packages/twenty-server/src/modules/workflow/workflow-executor/utils/__tests__/workflow-should-fail.util.spec.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-executor/utils/__tests__/workflow-should-keep-running.util.spec.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-executor/utils/workflow-should-fail.util.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-executor/utils/workflow-should-keep-running.util.ts diff --git a/packages/twenty-front/src/modules/workflow/workflow-diagram/utils/getNodeVariantFromStepRunStatus.ts b/packages/twenty-front/src/modules/workflow/workflow-diagram/utils/getNodeVariantFromStepRunStatus.ts index 15e6dbf65..01718a167 100644 --- a/packages/twenty-front/src/modules/workflow/workflow-diagram/utils/getNodeVariantFromStepRunStatus.ts +++ b/packages/twenty-front/src/modules/workflow/workflow-diagram/utils/getNodeVariantFromStepRunStatus.ts @@ -6,6 +6,7 @@ export const getNodeVariantFromStepRunStatus = ( ): WorkflowDiagramNodeVariant => { switch (runStatus) { case 'SUCCESS': + case 'STOPPED': return 'success'; case 'FAILED': return 'failure'; diff --git a/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.workspace-service.ts index 215d75119..0aafd3653 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.workspace-service.ts @@ -21,7 +21,6 @@ import { WorkflowVersionStepException, WorkflowVersionStepExceptionCode, } from 'src/modules/workflow/common/exceptions/workflow-version-step.exception'; -import { StepOutput } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; import { assertWorkflowVersionIsDraft } from 'src/modules/workflow/common/utils/assert-workflow-version-is-draft.util'; import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workspace-services/workflow-common.workspace-service'; @@ -329,18 +328,14 @@ export class WorkflowVersionStepWorkspaceService { response, }); - const newStepOutput: StepOutput = { - id: stepId, - output: { + await this.workflowRunWorkspaceService.updateWorkflowRunStepInfo({ + stepId, + stepInfo: { + status: StepStatus.SUCCESS, result: enrichedResponse, }, - }; - - await this.workflowRunWorkspaceService.saveWorkflowRunState({ workspaceId, workflowRunId, - stepOutput: newStepOutput, - stepStatus: StepStatus.SUCCESS, }); await this.workflowRunnerWorkspaceService.resume({ diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-executor-input.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-executor-input.ts index ff44471f7..3721c8ecb 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-executor-input.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-executor-input.ts @@ -2,6 +2,7 @@ export type WorkflowExecutorInput = { stepIds: string[]; workflowRunId: string; workspaceId: string; + shouldComputeWorkflowRunStatus?: boolean; }; export type WorkflowBranchExecutorInput = { diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/utils/__tests__/can-execute-step.util.spec.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/utils/__tests__/can-execute-step.util.spec.ts index b50dda8d1..6d0b62a03 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/utils/__tests__/can-execute-step.util.spec.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/utils/__tests__/can-execute-step.util.spec.ts @@ -5,6 +5,7 @@ import { WorkflowActionType, } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; import { canExecuteStep } from 'src/modules/workflow/workflow-executor/utils/can-execute-step.util'; +import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; describe('canExecuteStep', () => { const steps = [ @@ -56,7 +57,12 @@ describe('canExecuteStep', () => { }, }; - const result = canExecuteStep({ stepInfos, steps, stepId: 'step-3' }); + const result = canExecuteStep({ + stepInfos, + steps, + stepId: 'step-3', + workflowRunStatus: WorkflowRunStatus.RUNNING, + }); expect(result).toBe(true); }); @@ -77,6 +83,7 @@ describe('canExecuteStep', () => { }, steps, stepId: 'step-3', + workflowRunStatus: WorkflowRunStatus.RUNNING, }), ).toBe(false); @@ -95,6 +102,7 @@ describe('canExecuteStep', () => { }, steps, stepId: 'step-3', + workflowRunStatus: WorkflowRunStatus.RUNNING, }), ).toBe(false); @@ -113,6 +121,7 @@ describe('canExecuteStep', () => { }, steps, stepId: 'step-3', + workflowRunStatus: WorkflowRunStatus.RUNNING, }), ).toBe(false); }); @@ -133,6 +142,7 @@ describe('canExecuteStep', () => { }, steps, stepId: 'step-3', + workflowRunStatus: WorkflowRunStatus.RUNNING, }), ).toBe(false); @@ -151,6 +161,7 @@ describe('canExecuteStep', () => { }, steps, stepId: 'step-3', + workflowRunStatus: WorkflowRunStatus.RUNNING, }), ).toBe(false); @@ -169,6 +180,7 @@ describe('canExecuteStep', () => { }, steps, stepId: 'step-3', + workflowRunStatus: WorkflowRunStatus.RUNNING, }), ).toBe(false); @@ -187,7 +199,38 @@ describe('canExecuteStep', () => { }, steps, stepId: 'step-3', + workflowRunStatus: WorkflowRunStatus.RUNNING, }), ).toBe(false); }); + + it('should return false if workflowRun is not RUNNING', () => { + const stepInfos = { + 'step-1': { + status: StepStatus.SUCCESS, + }, + 'step-2': { + status: StepStatus.SUCCESS, + }, + 'step-3': { + status: StepStatus.NOT_STARTED, + }, + }; + + for (const workflowRunStatus of [ + WorkflowRunStatus.FAILED, + WorkflowRunStatus.ENQUEUED, + WorkflowRunStatus.COMPLETED, + WorkflowRunStatus.NOT_STARTED, + ]) { + const result = canExecuteStep({ + stepInfos, + steps, + stepId: 'step-3', + workflowRunStatus, + }); + + expect(result).toBe(false); + } + }); }); diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/utils/__tests__/workflow-should-fail.util.spec.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/utils/__tests__/workflow-should-fail.util.spec.ts new file mode 100644 index 000000000..2cfa2fc61 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/utils/__tests__/workflow-should-fail.util.spec.ts @@ -0,0 +1,30 @@ +import { StepStatus } from 'twenty-shared/workflow'; + +import { workflowShouldFail } from 'src/modules/workflow/workflow-executor/utils/workflow-should-fail.util'; +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; + +describe('workflowShouldFail', () => { + it('should return true if a failed step exists', () => { + const steps = [ + { + id: 'step-1', + } as WorkflowAction, + ]; + + const stepInfos = { 'step-1': { status: StepStatus.FAILED } }; + + expect(workflowShouldFail({ steps, stepInfos })).toBeTruthy(); + }); + + it('should return false if no failed step exists', () => { + const steps = [ + { + id: 'step-1', + } as WorkflowAction, + ]; + + const stepInfos = { 'step-1': { status: StepStatus.SUCCESS } }; + + expect(workflowShouldFail({ steps, stepInfos })).toBeFalsy(); + }); +}); diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/utils/__tests__/workflow-should-keep-running.util.spec.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/utils/__tests__/workflow-should-keep-running.util.spec.ts new file mode 100644 index 000000000..58f9c0a62 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/utils/__tests__/workflow-should-keep-running.util.spec.ts @@ -0,0 +1,79 @@ +import { StepStatus } from 'twenty-shared/workflow'; + +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; +import { workflowShouldKeepRunning } from 'src/modules/workflow/workflow-executor/utils/workflow-should-keep-running.util'; + +describe('workflowShouldKeepRunning', () => { + describe('should return true if', () => { + it('running or pending step exists', () => { + for (const testStatus of [StepStatus.PENDING, StepStatus.RUNNING]) { + const steps = [ + { + id: 'step-1', + } as WorkflowAction, + ]; + + const stepInfos = { 'step-1': { status: testStatus } }; + + expect(workflowShouldKeepRunning({ steps, stepInfos })).toBeTruthy(); + } + }); + + it('success step with not started executable children exists', () => { + const steps = [ + { + id: 'step-1', + nextStepIds: ['step-2'], + } as WorkflowAction, + { + id: 'step-2', + } as WorkflowAction, + ]; + + const stepInfos = { + 'step-1': { status: StepStatus.SUCCESS }, + 'step-2': { status: StepStatus.NOT_STARTED }, + }; + + expect(workflowShouldKeepRunning({ steps, stepInfos })).toBeTruthy(); + }); + }); + + describe('should return false', () => { + it('workflow run only have success steps', () => { + const steps = [ + { + id: 'step-1', + } as WorkflowAction, + ]; + + const stepInfos = { 'step-1': { status: StepStatus.SUCCESS } }; + + expect(workflowShouldKeepRunning({ steps, stepInfos })).toBeFalsy(); + }); + + it('success step with not executable not started children exists', () => { + const steps = [ + { + id: 'step-1', + nextStepIds: ['step-3'], + } as WorkflowAction, + { + id: 'step-2', + nextStepIds: ['step-3'], + } as WorkflowAction, + { + id: 'step-3', + } as WorkflowAction, + ]; + + const stepInfos = { + 'step-1': { status: StepStatus.SUCCESS }, + 'step-2': { status: StepStatus.FAILED }, + 'step-3': { status: StepStatus.NOT_STARTED }, + }; + + expect(workflowShouldKeepRunning({ steps, stepInfos })).toBeFalsy(); + }); + }); +}); diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/utils/can-execute-step.util.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/utils/can-execute-step.util.ts index 1f7a71e9b..f8875aafb 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/utils/can-execute-step.util.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/utils/can-execute-step.util.ts @@ -2,16 +2,23 @@ import { isDefined } from 'twenty-shared/utils'; import { StepStatus, WorkflowRunStepInfos } from 'twenty-shared/workflow'; import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; +import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; export const canExecuteStep = ({ stepId, steps, stepInfos, + workflowRunStatus, }: { steps: WorkflowAction[]; stepInfos: WorkflowRunStepInfos; stepId: string; + workflowRunStatus: WorkflowRunStatus; }) => { + if (workflowRunStatus !== WorkflowRunStatus.RUNNING) { + return false; + } + if ( isDefined(stepInfos[stepId]?.status) && stepInfos[stepId].status !== StepStatus.NOT_STARTED diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/utils/workflow-should-fail.util.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/utils/workflow-should-fail.util.ts new file mode 100644 index 000000000..d85f41ee5 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/utils/workflow-should-fail.util.ts @@ -0,0 +1,17 @@ +import { StepStatus, WorkflowRunStepInfos } from 'twenty-shared/workflow'; + +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; + +export const workflowShouldFail = ({ + stepInfos, + steps, +}: { + stepInfos: WorkflowRunStepInfos; + steps: WorkflowAction[]; +}) => { + const failedSteps = steps.filter( + (step) => stepInfos[step.id]?.status === StepStatus.FAILED, + ); + + return failedSteps.length > 0; +}; diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/utils/workflow-should-keep-running.util.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/utils/workflow-should-keep-running.util.ts new file mode 100644 index 000000000..9c9dd54fb --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/utils/workflow-should-keep-running.util.ts @@ -0,0 +1,38 @@ +import { StepStatus, WorkflowRunStepInfos } from 'twenty-shared/workflow'; + +import { canExecuteStep } from 'src/modules/workflow/workflow-executor/utils/can-execute-step.util'; +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; +import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; + +export const workflowShouldKeepRunning = ({ + stepInfos, + steps, +}: { + stepInfos: WorkflowRunStepInfos; + steps: WorkflowAction[]; +}) => { + const runningOrPendingStepExists = steps.some((step) => + [StepStatus.PENDING, StepStatus.RUNNING].includes( + stepInfos[step.id]?.status, + ), + ); + + const successStepWithNotStartedExecutableChildren = steps.some( + (step) => + stepInfos[step.id]?.status === StepStatus.SUCCESS && + (step.nextStepIds ?? []).some( + (nextStepId) => + stepInfos[nextStepId]?.status === StepStatus.NOT_STARTED && + canExecuteStep({ + stepId: nextStepId, + steps, + stepInfos, + workflowRunStatus: WorkflowRunStatus.RUNNING, + }), + ), + ); + + return ( + runningOrPendingStepExists || successStepWithNotStartedExecutableChildren + ); +}; diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/__tests__/workflow-executor.workspace-service.spec.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/__tests__/workflow-executor.workspace-service.spec.ts index 5ccb7daaa..2829813c4 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/__tests__/workflow-executor.workspace-service.spec.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/__tests__/workflow-executor.workspace-service.spec.ts @@ -14,7 +14,6 @@ import { } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; import { WorkflowExecutorWorkspaceService } from 'src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service'; import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service'; -import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; import { canExecuteStep } from 'src/modules/workflow/workflow-executor/utils/can-execute-step.util'; jest.mock( @@ -47,9 +46,8 @@ describe('WorkflowExecutorWorkspaceService', () => { const mockWorkflowRunWorkspaceService = { endWorkflowRun: jest.fn(), - updateWorkflowRunStepStatus: jest.fn(), - saveWorkflowRunState: jest.fn(), - getWorkflowRun: jest.fn(), + updateWorkflowRunStepInfo: jest.fn(), + getWorkflowRunOrFail: jest.fn(), }; const mockBillingService = { @@ -125,11 +123,14 @@ describe('WorkflowExecutorWorkspaceService', () => { nextStepIds: [], }, ] as WorkflowAction[]; + const mockStepInfos = { trigger: { result: {}, status: StepStatus.SUCCESS }, + 'step-1': { status: StepStatus.NOT_STARTED }, + 'step-2': { status: StepStatus.NOT_STARTED }, }; - mockWorkflowRunWorkspaceService.getWorkflowRun.mockReturnValue({ + mockWorkflowRunWorkspaceService.getWorkflowRunOrFail.mockReturnValue({ state: { flow: { steps: mockSteps }, stepInfos: mockStepInfos }, }); @@ -168,32 +169,30 @@ describe('WorkflowExecutorWorkspaceService', () => { ); expect( - workflowRunWorkspaceService.updateWorkflowRunStepStatus, - ).toHaveBeenCalledTimes(2); + workflowRunWorkspaceService.updateWorkflowRunStepInfo, + ).toHaveBeenCalledTimes(4); expect( - workflowRunWorkspaceService.updateWorkflowRunStepStatus, - ).toHaveBeenCalledWith({ - workflowRunId: mockWorkflowRunId, + workflowRunWorkspaceService.updateWorkflowRunStepInfo, + ).toHaveBeenNthCalledWith(1, { stepId: 'step-1', + stepInfo: { + status: StepStatus.RUNNING, + }, + workflowRunId: mockWorkflowRunId, workspaceId: 'workspace-id', - stepStatus: StepStatus.RUNNING, }); expect( - workflowRunWorkspaceService.saveWorkflowRunState, - ).toHaveBeenCalledTimes(2); - - expect( - workflowRunWorkspaceService.saveWorkflowRunState, - ).toHaveBeenCalledWith({ - workflowRunId: mockWorkflowRunId, - stepOutput: { - id: 'step-1', - output: mockStepResult, + workflowRunWorkspaceService.updateWorkflowRunStepInfo, + ).toHaveBeenNthCalledWith(2, { + stepId: 'step-1', + stepInfo: { + ...mockStepResult, + status: StepStatus.SUCCESS, }, + workflowRunId: mockWorkflowRunId, workspaceId: 'workspace-id', - stepStatus: StepStatus.SUCCESS, }); // execute second step @@ -216,34 +215,30 @@ describe('WorkflowExecutorWorkspaceService', () => { expect(workspaceEventEmitter.emitCustomBatchEvent).not.toHaveBeenCalled(); expect( - workflowRunWorkspaceService.updateWorkflowRunStepStatus, - ).toHaveBeenCalledTimes(1); + workflowRunWorkspaceService.updateWorkflowRunStepInfo, + ).toHaveBeenCalledTimes(2); expect( - workflowRunWorkspaceService.updateWorkflowRunStepStatus, - ).toHaveBeenCalledWith({ - workflowRunId: mockWorkflowRunId, + workflowRunWorkspaceService.updateWorkflowRunStepInfo, + ).toHaveBeenNthCalledWith(1, { stepId: 'step-1', + stepInfo: { + status: StepStatus.RUNNING, + }, + workflowRunId: mockWorkflowRunId, workspaceId: 'workspace-id', - stepStatus: StepStatus.RUNNING, }); expect( - workflowRunWorkspaceService.saveWorkflowRunState, - ).toHaveBeenCalledTimes(1); - - expect( - workflowRunWorkspaceService.saveWorkflowRunState, - ).toHaveBeenCalledWith({ - workflowRunId: mockWorkflowRunId, - stepOutput: { - id: 'step-1', - output: { - error: 'Step execution failed', - }, + workflowRunWorkspaceService.updateWorkflowRunStepInfo, + ).toHaveBeenNthCalledWith(2, { + stepId: 'step-1', + stepInfo: { + error: 'Step execution failed', + status: StepStatus.FAILED, }, + workflowRunId: mockWorkflowRunId, workspaceId: 'workspace-id', - stepStatus: StepStatus.FAILED, }); }); @@ -261,32 +256,29 @@ describe('WorkflowExecutorWorkspaceService', () => { }); expect( - workflowRunWorkspaceService.updateWorkflowRunStepStatus, - ).toHaveBeenCalledTimes(1); + workflowRunWorkspaceService.updateWorkflowRunStepInfo, + ).toHaveBeenCalledTimes(2); expect( - workflowRunWorkspaceService.updateWorkflowRunStepStatus, - ).toHaveBeenCalledWith({ - workflowRunId: mockWorkflowRunId, + workflowRunWorkspaceService.updateWorkflowRunStepInfo, + ).toHaveBeenNthCalledWith(1, { stepId: 'step-1', + stepInfo: { + status: StepStatus.RUNNING, + }, + workflowRunId: mockWorkflowRunId, workspaceId: 'workspace-id', - stepStatus: StepStatus.RUNNING, }); expect( - workflowRunWorkspaceService.saveWorkflowRunState, - ).toHaveBeenCalledTimes(1); - - expect( - workflowRunWorkspaceService.saveWorkflowRunState, - ).toHaveBeenCalledWith({ - workflowRunId: mockWorkflowRunId, - stepOutput: { - id: 'step-1', - output: mockPendingEvent, + workflowRunWorkspaceService.updateWorkflowRunStepInfo, + ).toHaveBeenNthCalledWith(2, { + stepId: 'step-1', + stepInfo: { + status: StepStatus.PENDING, }, + workflowRunId: mockWorkflowRunId, workspaceId: 'workspace-id', - stepStatus: StepStatus.PENDING, }); // No recursive call to execute should happen @@ -295,128 +287,6 @@ describe('WorkflowExecutorWorkspaceService', () => { ); }); - it('should continue to next step if continueOnFailure is true', async () => { - const stepsWithContinueOnFailure = [ - { - id: 'step-1', - type: WorkflowActionType.CODE, - settings: { - errorHandlingOptions: { - continueOnFailure: { value: true }, - retryOnFailure: { value: false }, - }, - }, - nextStepIds: ['step-2'], - }, - { - id: 'step-2', - type: WorkflowActionType.SEND_EMAIL, - settings: { - errorHandlingOptions: { - continueOnFailure: { value: false }, - retryOnFailure: { value: false }, - }, - }, - }, - ] as WorkflowAction[]; - - mockWorkflowRunWorkspaceService.getWorkflowRun.mockReturnValueOnce({ - state: { - flow: { steps: stepsWithContinueOnFailure }, - stepInfos: mockStepInfos, - }, - }); - - mockWorkflowExecutor.execute.mockResolvedValueOnce({ - error: 'Step execution failed but continue', - }); - - await service.executeFromSteps({ - workflowRunId: mockWorkflowRunId, - stepIds: ['step-1'], - workspaceId: mockWorkspaceId, - }); - - expect( - workflowRunWorkspaceService.updateWorkflowRunStepStatus, - ).toHaveBeenCalledTimes(2); - - expect( - workflowRunWorkspaceService.updateWorkflowRunStepStatus, - ).toHaveBeenCalledWith({ - workflowRunId: mockWorkflowRunId, - stepId: 'step-1', - workspaceId: 'workspace-id', - stepStatus: StepStatus.RUNNING, - }); - - expect( - workflowRunWorkspaceService.saveWorkflowRunState, - ).toHaveBeenCalledTimes(2); - - expect( - workflowRunWorkspaceService.saveWorkflowRunState, - ).toHaveBeenCalledWith({ - workflowRunId: mockWorkflowRunId, - stepOutput: { - id: 'step-1', - output: { - error: 'Step execution failed but continue', - }, - }, - workspaceId: 'workspace-id', - stepStatus: StepStatus.FAILED, - }); - - // execute second step - expect(workflowActionFactory.get).toHaveBeenCalledWith( - WorkflowActionType.SEND_EMAIL, - ); - }); - - it('should retry on failure if retryOnFailure is true', async () => { - const stepsWithRetryOnFailure = [ - { - id: 'step-1', - type: WorkflowActionType.CODE, - settings: { - errorHandlingOptions: { - continueOnFailure: { value: false }, - retryOnFailure: { value: true }, - }, - }, - }, - ] as WorkflowAction[]; - - mockWorkflowRunWorkspaceService.getWorkflowRun.mockReturnValue({ - state: { - flow: { steps: stepsWithRetryOnFailure }, - stepInfos: mockStepInfos, - }, - }); - - mockWorkflowExecutor.execute.mockResolvedValue({ - error: 'Step execution failed, will retry', - }); - - await service.executeFromSteps({ - workflowRunId: mockWorkflowRunId, - stepIds: ['step-1'], - workspaceId: mockWorkspaceId, - }); - - for (let attempt = 1; attempt <= 3; attempt++) { - expect(workflowActionFactory.get).toHaveBeenNthCalledWith( - attempt, - WorkflowActionType.CODE, - ); - } - - expect(workflowActionFactory.get).not.toHaveBeenCalledWith( - WorkflowActionType.SEND_EMAIL, - ); - }); - it('should stop when billing validation fails', async () => { mockBillingService.isBillingEnabled.mockReturnValueOnce(true); mockBillingService.canBillMeteredProduct.mockReturnValueOnce(false); @@ -430,7 +300,7 @@ describe('WorkflowExecutorWorkspaceService', () => { expect(workflowActionFactory.get).toHaveBeenCalledTimes(0); expect( - workflowRunWorkspaceService.saveWorkflowRunState, + workflowRunWorkspaceService.updateWorkflowRunStepInfo, ).toHaveBeenCalledTimes(1); expect(workflowRunWorkspaceService.endWorkflowRun).toHaveBeenCalledTimes( @@ -438,24 +308,15 @@ describe('WorkflowExecutorWorkspaceService', () => { ); expect( - workflowRunWorkspaceService.saveWorkflowRunState, + workflowRunWorkspaceService.updateWorkflowRunStepInfo, ).toHaveBeenCalledWith({ - workflowRunId: mockWorkflowRunId, - workspaceId: 'workspace-id', - stepOutput: { - id: 'step-1', - output: { - error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE, - }, + stepId: 'step-1', + stepInfo: { + error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE, + status: StepStatus.FAILED, }, - stepStatus: StepStatus.FAILED, - }); - - expect(workflowRunWorkspaceService.endWorkflowRun).toHaveBeenCalledWith({ workflowRunId: mockWorkflowRunId, workspaceId: 'workspace-id', - status: WorkflowRunStatus.FAILED, - error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE, }); }); diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts index 7ca405d72..7fe0e3d0f 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts @@ -2,18 +2,15 @@ import { Injectable } from '@nestjs/common'; import { isDefined } from 'twenty-shared/utils'; import { getWorkflowRunContext, StepStatus } from 'twenty-shared/workflow'; +import { WorkflowRunStepInfo } from 'twenty-shared/src/workflow/types/WorkflowRunStateStepInfos'; import { BILLING_FEATURE_USED } from 'src/engine/core-modules/billing/constants/billing-feature-used.constant'; -import { BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE } from 'src/engine/core-modules/billing/constants/billing-workflow-execution-error-message.constant'; import { BillingMeterEventName } from 'src/engine/core-modules/billing/enums/billing-meter-event-names'; import { BillingProductKey } from 'src/engine/core-modules/billing/enums/billing-product-key.enum'; import { BillingService } from 'src/engine/core-modules/billing/services/billing.service'; import { BillingUsageEvent } from 'src/engine/core-modules/billing/types/billing-usage-event.type'; import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter'; -import { - StepOutput, - WorkflowRunStatus, -} from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; +import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; import { WorkflowActionFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-action.factory'; import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type'; import { @@ -22,8 +19,9 @@ import { } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; import { canExecuteStep } from 'src/modules/workflow/workflow-executor/utils/can-execute-step.util'; import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service'; - -const MAX_RETRIES_ON_FAILURE = 3; +import { workflowShouldKeepRunning } from 'src/modules/workflow/workflow-executor/utils/workflow-should-keep-running.util'; +import { workflowShouldFail } from 'src/modules/workflow/workflow-executor/utils/workflow-should-fail.util'; +import { BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE } from 'src/engine/core-modules/billing/constants/billing-workflow-execution-error-message.constant'; @Injectable() export class WorkflowExecutorWorkspaceService { @@ -38,6 +36,7 @@ export class WorkflowExecutorWorkspaceService { stepIds, workflowRunId, workspaceId, + shouldComputeWorkflowRunStatus = true, }: WorkflowExecutorInput) { await Promise.all( stepIds.map(async (stepIdToExecute) => { @@ -48,187 +47,27 @@ export class WorkflowExecutorWorkspaceService { }); }), ); + + if (shouldComputeWorkflowRunStatus) { + await this.computeWorkflowRunStatus({ + workflowRunId, + workspaceId, + }); + } } private async executeFromStep({ stepId, - attemptCount = 1, workflowRunId, workspaceId, }: WorkflowBranchExecutorInput) { - const workflowRunInfo = await this.getWorkflowRunInfoOrEndWorkflowRun({ - stepId, - workflowRunId, - workspaceId, - }); - - if (!isDefined(workflowRunInfo)) { - return; - } - - const { stepToExecute, steps, stepInfos } = workflowRunInfo; - - if (!canExecuteStep({ stepId, steps, stepInfos })) { - return; - } - - const checkCanBillWorkflowNodeExecution = - await this.checkCanBillWorkflowNodeExecutionOrEndWorkflowRun({ - stepIdToExecute: stepToExecute.id, + const workflowRun = + await this.workflowRunWorkspaceService.getWorkflowRunOrFail({ workflowRunId, workspaceId, }); - if (!checkCanBillWorkflowNodeExecution) { - return; - } - - const workflowAction = this.workflowActionFactory.get(stepToExecute.type); - - let actionOutput: WorkflowActionOutput; - - await this.workflowRunWorkspaceService.updateWorkflowRunStepStatus({ - workflowRunId, - stepId: stepToExecute.id, - workspaceId, - stepStatus: StepStatus.RUNNING, - }); - - try { - actionOutput = await workflowAction.execute({ - currentStepId: stepId, - steps, - context: getWorkflowRunContext(stepInfos), - }); - } catch (error) { - actionOutput = { - error: error.message ?? 'Execution result error, no data or error', - }; - } - - if (!actionOutput.error) { - this.sendWorkflowNodeRunEvent(workspaceId); - } - - const stepOutput: StepOutput = { - id: stepToExecute.id, - output: actionOutput, - }; - - if (actionOutput.pendingEvent) { - await this.workflowRunWorkspaceService.saveWorkflowRunState({ - workflowRunId, - stepOutput, - workspaceId, - stepStatus: StepStatus.PENDING, - }); - - return; - } - - const actionOutputSuccess = isDefined(actionOutput.result); - - const isValidActionOutput = - actionOutputSuccess || - stepToExecute.settings.errorHandlingOptions.continueOnFailure.value; - - if (isValidActionOutput) { - await this.workflowRunWorkspaceService.saveWorkflowRunState({ - workflowRunId, - stepOutput, - workspaceId, - stepStatus: isDefined(actionOutput.result) - ? StepStatus.SUCCESS - : StepStatus.FAILED, - }); - - if ( - !isDefined(stepToExecute.nextStepIds) || - stepToExecute.nextStepIds.length === 0 || - actionOutput.shouldEndWorkflowRun === true - ) { - await this.workflowRunWorkspaceService.endWorkflowRun({ - workflowRunId, - workspaceId, - status: WorkflowRunStatus.COMPLETED, - }); - - return; - } - - await this.executeFromSteps({ - stepIds: stepToExecute.nextStepIds, - workflowRunId, - workspaceId, - }); - - return; - } - - if ( - stepToExecute.settings.errorHandlingOptions.retryOnFailure.value && - attemptCount < MAX_RETRIES_ON_FAILURE - ) { - await this.executeFromStep({ - stepId, - attemptCount: attemptCount + 1, - workflowRunId, - workspaceId, - }); - - return; - } - - await this.workflowRunWorkspaceService.saveWorkflowRunState({ - workflowRunId, - stepOutput, - workspaceId, - stepStatus: StepStatus.FAILED, - }); - - await this.workflowRunWorkspaceService.endWorkflowRun({ - workflowRunId, - workspaceId, - status: WorkflowRunStatus.FAILED, - error: stepOutput.output.error, - }); - } - - private async getWorkflowRunInfoOrEndWorkflowRun({ - stepId, - workflowRunId, - workspaceId, - }: { - stepId: string; - workflowRunId: string; - workspaceId: string; - }) { - const workflowRun = await this.workflowRunWorkspaceService.getWorkflowRun({ - workflowRunId, - workspaceId, - }); - - if (!isDefined(workflowRun)) { - await this.workflowRunWorkspaceService.endWorkflowRun({ - workflowRunId, - workspaceId, - status: WorkflowRunStatus.FAILED, - error: `WorkflowRun ${workflowRunId} not found`, - }); - - return; - } - - if (!isDefined(workflowRun?.state)) { - await this.workflowRunWorkspaceService.endWorkflowRun({ - workflowRunId, - workspaceId, - status: WorkflowRunStatus.FAILED, - error: `WorkflowRun ${workflowRunId} doesn't have any state`, - }); - - return; - } + const stepInfos = workflowRun.state.stepInfos; const steps = workflowRun.state.flow.steps; @@ -245,11 +84,142 @@ export class WorkflowExecutorWorkspaceService { return; } - return { - stepToExecute, - steps, - stepInfos: workflowRun.state.stepInfos, - }; + if ( + !canExecuteStep({ + stepId, + steps, + stepInfos, + workflowRunStatus: workflowRun.status, + }) + ) { + return; + } + + let actionOutput: WorkflowActionOutput; + + if (await this.canBillWorkflowNodeExecution(workspaceId)) { + const workflowAction = this.workflowActionFactory.get(stepToExecute.type); + + await this.workflowRunWorkspaceService.updateWorkflowRunStepInfo({ + stepId, + stepInfo: { + status: StepStatus.RUNNING, + }, + workflowRunId, + workspaceId, + }); + + try { + actionOutput = await workflowAction.execute({ + currentStepId: stepId, + steps, + context: getWorkflowRunContext(stepInfos), + }); + } catch (error) { + actionOutput = { + error: error.message ?? 'Execution result error, no data or error', + }; + } + } else { + actionOutput = { + error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE, + }; + } + + const isPendingEvent = actionOutput.pendingEvent; + + const isSuccess = isDefined(actionOutput.result); + + const isError = isDefined(actionOutput.error); + + const isStopped = actionOutput.shouldEndWorkflowRun; + + if (!isError) { + this.sendWorkflowNodeRunEvent(workspaceId); + } + + let stepInfo: WorkflowRunStepInfo; + + if (isPendingEvent) { + stepInfo = { + status: StepStatus.PENDING, + }; + } else if (isStopped) { + stepInfo = { + status: StepStatus.STOPPED, + result: actionOutput?.result, + }; + } else if (isSuccess) { + stepInfo = { + status: StepStatus.SUCCESS, + result: actionOutput?.result, + }; + } else { + stepInfo = { + status: StepStatus.FAILED, + error: actionOutput?.error, + }; + } + + await this.workflowRunWorkspaceService.updateWorkflowRunStepInfo({ + stepId, + stepInfo, + workflowRunId, + workspaceId, + }); + + if ( + isSuccess && + !isStopped && + isDefined(stepToExecute.nextStepIds) && + stepToExecute.nextStepIds.length > 0 + ) { + await this.executeFromSteps({ + stepIds: stepToExecute.nextStepIds, + workflowRunId, + workspaceId, + shouldComputeWorkflowRunStatus: false, + }); + } + } + + private async computeWorkflowRunStatus({ + workflowRunId, + workspaceId, + }: { + workflowRunId: string; + workspaceId: string; + }) { + const workflowRun = + await this.workflowRunWorkspaceService.getWorkflowRunOrFail({ + workflowRunId, + workspaceId, + }); + + const stepInfos = workflowRun.state.stepInfos; + + const steps = workflowRun.state.flow.steps; + + if (workflowShouldKeepRunning({ stepInfos, steps })) { + return; + } + + if (workflowShouldFail({ stepInfos, steps })) { + await this.workflowRunWorkspaceService.endWorkflowRun({ + workflowRunId, + workspaceId, + status: WorkflowRunStatus.FAILED, + error: 'WorkflowRun failed', + }); + + return; + } + + await this.workflowRunWorkspaceService.endWorkflowRun({ + workflowRunId, + workspaceId, + status: WorkflowRunStatus.COMPLETED, + }); } private sendWorkflowNodeRunEvent(workspaceId: string) { @@ -265,45 +235,13 @@ export class WorkflowExecutorWorkspaceService { ); } - private async checkCanBillWorkflowNodeExecutionOrEndWorkflowRun({ - stepIdToExecute, - workflowRunId, - workspaceId, - }: { - stepIdToExecute: string; - workflowRunId: string; - workspaceId: string; - }) { - const canBillWorkflowNodeExecution = + private async canBillWorkflowNodeExecution(workspaceId: string) { + return ( !this.billingService.isBillingEnabled() || (await this.billingService.canBillMeteredProduct( workspaceId, BillingProductKey.WORKFLOW_NODE_EXECUTION, - )); - - if (!canBillWorkflowNodeExecution) { - const billingOutput = { - error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE, - }; - - await this.workflowRunWorkspaceService.saveWorkflowRunState({ - workspaceId, - workflowRunId, - stepOutput: { - id: stepIdToExecute, - output: billingOutput, - }, - stepStatus: StepStatus.FAILED, - }); - - await this.workflowRunWorkspaceService.endWorkflowRun({ - workflowRunId, - workspaceId, - status: WorkflowRunStatus.FAILED, - error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE, - }); - } - - return canBillWorkflowNodeExecution; + )) + ); } } diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts index 0b57e7a0e..2833844b5 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts @@ -4,6 +4,7 @@ import { isDefined } from 'twenty-shared/utils'; import { StepStatus } from 'twenty-shared/workflow'; import { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity'; import { v4 } from 'uuid'; +import { WorkflowRunStepInfo } from 'twenty-shared/src/workflow/types/WorkflowRunStateStepInfos'; import { WithLock } from 'src/engine/core-modules/cache-lock/with-lock.decorator'; import { MetricsService } from 'src/engine/core-modules/metrics/metrics.service'; @@ -13,7 +14,6 @@ import { ActorMetadata } from 'src/engine/metadata-modules/field-metadata/compos import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; import { - StepOutput, WorkflowRunState, WorkflowRunStatus, WorkflowRunWorkspaceEntity, @@ -205,16 +205,16 @@ export class WorkflowRunWorkspaceService { } @WithLock('workflowRunId') - async updateWorkflowRunStepStatus({ + async updateWorkflowRunStepInfo({ + stepId, + stepInfo, workflowRunId, workspaceId, - stepId, - stepStatus, }: { - workflowRunId: string; stepId: string; + stepInfo: WorkflowRunStepInfo; + workflowRunId: string; workspaceId: string; - stepStatus: StepStatus; }) { const workflowRunToUpdate = await this.getWorkflowRunOrFail({ workflowRunId, @@ -227,43 +227,10 @@ export class WorkflowRunWorkspaceService { stepInfos: { ...workflowRunToUpdate.state?.stepInfos, [stepId]: { - ...(workflowRunToUpdate.state?.stepInfos?.[stepId] || {}), - status: stepStatus, - }, - }, - }, - }; - - await this.updateWorkflowRun({ workflowRunId, workspaceId, partialUpdate }); - } - - @WithLock('workflowRunId') - async saveWorkflowRunState({ - workflowRunId, - stepOutput, - workspaceId, - stepStatus, - }: { - workflowRunId: string; - stepOutput: StepOutput; - workspaceId: string; - stepStatus: StepStatus; - }) { - const workflowRunToUpdate = await this.getWorkflowRunOrFail({ - workflowRunId, - workspaceId, - }); - - const partialUpdate = { - state: { - ...workflowRunToUpdate.state, - stepInfos: { - ...workflowRunToUpdate.state?.stepInfos, - [stepOutput.id]: { - ...(workflowRunToUpdate.state?.stepInfos[stepOutput.id] || {}), - result: stepOutput.output?.result, - error: stepOutput.output?.error, - status: stepStatus, + ...(workflowRunToUpdate.state?.stepInfos[stepId] || {}), + result: stepInfo?.result, + error: stepInfo?.error, + status: stepInfo.status, }, }, }, diff --git a/packages/twenty-shared/src/workflow/types/WorkflowRunStateStepInfos.ts b/packages/twenty-shared/src/workflow/types/WorkflowRunStateStepInfos.ts index 2719211be..fcb1d6ea9 100644 --- a/packages/twenty-shared/src/workflow/types/WorkflowRunStateStepInfos.ts +++ b/packages/twenty-shared/src/workflow/types/WorkflowRunStateStepInfos.ts @@ -2,6 +2,7 @@ export enum StepStatus { NOT_STARTED = 'NOT_STARTED', RUNNING = 'RUNNING', SUCCESS = 'SUCCESS', + STOPPED = 'STOPPED', FAILED = 'FAILED', PENDING = 'PENDING', }