13227 workflow wrong completed workflowrun state when multiple branches (#13344)

This commit is contained in:
martmull
2025-07-22 17:51:03 +02:00
committed by GitHub
parent dc617177a9
commit 01805cc71c
13 changed files with 444 additions and 466 deletions

View File

@ -6,6 +6,7 @@ export const getNodeVariantFromStepRunStatus = (
): WorkflowDiagramNodeVariant => {
switch (runStatus) {
case 'SUCCESS':
case 'STOPPED':
return 'success';
case 'FAILED':
return 'failure';

View File

@ -21,7 +21,6 @@ import {
WorkflowVersionStepException,
WorkflowVersionStepExceptionCode,
} from 'src/modules/workflow/common/exceptions/workflow-version-step.exception';
import { StepOutput } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity';
import { assertWorkflowVersionIsDraft } from 'src/modules/workflow/common/utils/assert-workflow-version-is-draft.util';
import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workspace-services/workflow-common.workspace-service';
@ -329,18 +328,14 @@ export class WorkflowVersionStepWorkspaceService {
response,
});
const newStepOutput: StepOutput = {
id: stepId,
output: {
await this.workflowRunWorkspaceService.updateWorkflowRunStepInfo({
stepId,
stepInfo: {
status: StepStatus.SUCCESS,
result: enrichedResponse,
},
};
await this.workflowRunWorkspaceService.saveWorkflowRunState({
workspaceId,
workflowRunId,
stepOutput: newStepOutput,
stepStatus: StepStatus.SUCCESS,
});
await this.workflowRunnerWorkspaceService.resume({

View File

@ -2,6 +2,7 @@ export type WorkflowExecutorInput = {
stepIds: string[];
workflowRunId: string;
workspaceId: string;
shouldComputeWorkflowRunStatus?: boolean;
};
export type WorkflowBranchExecutorInput = {

View File

@ -5,6 +5,7 @@ import {
WorkflowActionType,
} from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
import { canExecuteStep } from 'src/modules/workflow/workflow-executor/utils/can-execute-step.util';
import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
describe('canExecuteStep', () => {
const steps = [
@ -56,7 +57,12 @@ describe('canExecuteStep', () => {
},
};
const result = canExecuteStep({ stepInfos, steps, stepId: 'step-3' });
const result = canExecuteStep({
stepInfos,
steps,
stepId: 'step-3',
workflowRunStatus: WorkflowRunStatus.RUNNING,
});
expect(result).toBe(true);
});
@ -77,6 +83,7 @@ describe('canExecuteStep', () => {
},
steps,
stepId: 'step-3',
workflowRunStatus: WorkflowRunStatus.RUNNING,
}),
).toBe(false);
@ -95,6 +102,7 @@ describe('canExecuteStep', () => {
},
steps,
stepId: 'step-3',
workflowRunStatus: WorkflowRunStatus.RUNNING,
}),
).toBe(false);
@ -113,6 +121,7 @@ describe('canExecuteStep', () => {
},
steps,
stepId: 'step-3',
workflowRunStatus: WorkflowRunStatus.RUNNING,
}),
).toBe(false);
});
@ -133,6 +142,7 @@ describe('canExecuteStep', () => {
},
steps,
stepId: 'step-3',
workflowRunStatus: WorkflowRunStatus.RUNNING,
}),
).toBe(false);
@ -151,6 +161,7 @@ describe('canExecuteStep', () => {
},
steps,
stepId: 'step-3',
workflowRunStatus: WorkflowRunStatus.RUNNING,
}),
).toBe(false);
@ -169,6 +180,7 @@ describe('canExecuteStep', () => {
},
steps,
stepId: 'step-3',
workflowRunStatus: WorkflowRunStatus.RUNNING,
}),
).toBe(false);
@ -187,7 +199,38 @@ describe('canExecuteStep', () => {
},
steps,
stepId: 'step-3',
workflowRunStatus: WorkflowRunStatus.RUNNING,
}),
).toBe(false);
});
it('should return false if workflowRun is not RUNNING', () => {
const stepInfos = {
'step-1': {
status: StepStatus.SUCCESS,
},
'step-2': {
status: StepStatus.SUCCESS,
},
'step-3': {
status: StepStatus.NOT_STARTED,
},
};
for (const workflowRunStatus of [
WorkflowRunStatus.FAILED,
WorkflowRunStatus.ENQUEUED,
WorkflowRunStatus.COMPLETED,
WorkflowRunStatus.NOT_STARTED,
]) {
const result = canExecuteStep({
stepInfos,
steps,
stepId: 'step-3',
workflowRunStatus,
});
expect(result).toBe(false);
}
});
});

View File

@ -0,0 +1,30 @@
import { StepStatus } from 'twenty-shared/workflow';
import { workflowShouldFail } from 'src/modules/workflow/workflow-executor/utils/workflow-should-fail.util';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
describe('workflowShouldFail', () => {
it('should return true if a failed step exists', () => {
const steps = [
{
id: 'step-1',
} as WorkflowAction,
];
const stepInfos = { 'step-1': { status: StepStatus.FAILED } };
expect(workflowShouldFail({ steps, stepInfos })).toBeTruthy();
});
it('should return false if no failed step exists', () => {
const steps = [
{
id: 'step-1',
} as WorkflowAction,
];
const stepInfos = { 'step-1': { status: StepStatus.SUCCESS } };
expect(workflowShouldFail({ steps, stepInfos })).toBeFalsy();
});
});

View File

@ -0,0 +1,79 @@
import { StepStatus } from 'twenty-shared/workflow';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
import { workflowShouldKeepRunning } from 'src/modules/workflow/workflow-executor/utils/workflow-should-keep-running.util';
describe('workflowShouldKeepRunning', () => {
describe('should return true if', () => {
it('running or pending step exists', () => {
for (const testStatus of [StepStatus.PENDING, StepStatus.RUNNING]) {
const steps = [
{
id: 'step-1',
} as WorkflowAction,
];
const stepInfos = { 'step-1': { status: testStatus } };
expect(workflowShouldKeepRunning({ steps, stepInfos })).toBeTruthy();
}
});
it('success step with not started executable children exists', () => {
const steps = [
{
id: 'step-1',
nextStepIds: ['step-2'],
} as WorkflowAction,
{
id: 'step-2',
} as WorkflowAction,
];
const stepInfos = {
'step-1': { status: StepStatus.SUCCESS },
'step-2': { status: StepStatus.NOT_STARTED },
};
expect(workflowShouldKeepRunning({ steps, stepInfos })).toBeTruthy();
});
});
describe('should return false', () => {
it('workflow run only have success steps', () => {
const steps = [
{
id: 'step-1',
} as WorkflowAction,
];
const stepInfos = { 'step-1': { status: StepStatus.SUCCESS } };
expect(workflowShouldKeepRunning({ steps, stepInfos })).toBeFalsy();
});
it('success step with not executable not started children exists', () => {
const steps = [
{
id: 'step-1',
nextStepIds: ['step-3'],
} as WorkflowAction,
{
id: 'step-2',
nextStepIds: ['step-3'],
} as WorkflowAction,
{
id: 'step-3',
} as WorkflowAction,
];
const stepInfos = {
'step-1': { status: StepStatus.SUCCESS },
'step-2': { status: StepStatus.FAILED },
'step-3': { status: StepStatus.NOT_STARTED },
};
expect(workflowShouldKeepRunning({ steps, stepInfos })).toBeFalsy();
});
});
});

View File

@ -2,16 +2,23 @@ import { isDefined } from 'twenty-shared/utils';
import { StepStatus, WorkflowRunStepInfos } from 'twenty-shared/workflow';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
export const canExecuteStep = ({
stepId,
steps,
stepInfos,
workflowRunStatus,
}: {
steps: WorkflowAction[];
stepInfos: WorkflowRunStepInfos;
stepId: string;
workflowRunStatus: WorkflowRunStatus;
}) => {
if (workflowRunStatus !== WorkflowRunStatus.RUNNING) {
return false;
}
if (
isDefined(stepInfos[stepId]?.status) &&
stepInfos[stepId].status !== StepStatus.NOT_STARTED

View File

@ -0,0 +1,17 @@
import { StepStatus, WorkflowRunStepInfos } from 'twenty-shared/workflow';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
export const workflowShouldFail = ({
stepInfos,
steps,
}: {
stepInfos: WorkflowRunStepInfos;
steps: WorkflowAction[];
}) => {
const failedSteps = steps.filter(
(step) => stepInfos[step.id]?.status === StepStatus.FAILED,
);
return failedSteps.length > 0;
};

View File

@ -0,0 +1,38 @@
import { StepStatus, WorkflowRunStepInfos } from 'twenty-shared/workflow';
import { canExecuteStep } from 'src/modules/workflow/workflow-executor/utils/can-execute-step.util';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
export const workflowShouldKeepRunning = ({
stepInfos,
steps,
}: {
stepInfos: WorkflowRunStepInfos;
steps: WorkflowAction[];
}) => {
const runningOrPendingStepExists = steps.some((step) =>
[StepStatus.PENDING, StepStatus.RUNNING].includes(
stepInfos[step.id]?.status,
),
);
const successStepWithNotStartedExecutableChildren = steps.some(
(step) =>
stepInfos[step.id]?.status === StepStatus.SUCCESS &&
(step.nextStepIds ?? []).some(
(nextStepId) =>
stepInfos[nextStepId]?.status === StepStatus.NOT_STARTED &&
canExecuteStep({
stepId: nextStepId,
steps,
stepInfos,
workflowRunStatus: WorkflowRunStatus.RUNNING,
}),
),
);
return (
runningOrPendingStepExists || successStepWithNotStartedExecutableChildren
);
};

View File

@ -14,7 +14,6 @@ import {
} from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
import { WorkflowExecutorWorkspaceService } from 'src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service';
import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service';
import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import { canExecuteStep } from 'src/modules/workflow/workflow-executor/utils/can-execute-step.util';
jest.mock(
@ -47,9 +46,8 @@ describe('WorkflowExecutorWorkspaceService', () => {
const mockWorkflowRunWorkspaceService = {
endWorkflowRun: jest.fn(),
updateWorkflowRunStepStatus: jest.fn(),
saveWorkflowRunState: jest.fn(),
getWorkflowRun: jest.fn(),
updateWorkflowRunStepInfo: jest.fn(),
getWorkflowRunOrFail: jest.fn(),
};
const mockBillingService = {
@ -125,11 +123,14 @@ describe('WorkflowExecutorWorkspaceService', () => {
nextStepIds: [],
},
] as WorkflowAction[];
const mockStepInfos = {
trigger: { result: {}, status: StepStatus.SUCCESS },
'step-1': { status: StepStatus.NOT_STARTED },
'step-2': { status: StepStatus.NOT_STARTED },
};
mockWorkflowRunWorkspaceService.getWorkflowRun.mockReturnValue({
mockWorkflowRunWorkspaceService.getWorkflowRunOrFail.mockReturnValue({
state: { flow: { steps: mockSteps }, stepInfos: mockStepInfos },
});
@ -168,32 +169,30 @@ describe('WorkflowExecutorWorkspaceService', () => {
);
expect(
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
).toHaveBeenCalledTimes(2);
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
).toHaveBeenCalledTimes(4);
expect(
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
).toHaveBeenCalledWith({
workflowRunId: mockWorkflowRunId,
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
).toHaveBeenNthCalledWith(1, {
stepId: 'step-1',
stepInfo: {
status: StepStatus.RUNNING,
},
workflowRunId: mockWorkflowRunId,
workspaceId: 'workspace-id',
stepStatus: StepStatus.RUNNING,
});
expect(
workflowRunWorkspaceService.saveWorkflowRunState,
).toHaveBeenCalledTimes(2);
expect(
workflowRunWorkspaceService.saveWorkflowRunState,
).toHaveBeenCalledWith({
workflowRunId: mockWorkflowRunId,
stepOutput: {
id: 'step-1',
output: mockStepResult,
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
).toHaveBeenNthCalledWith(2, {
stepId: 'step-1',
stepInfo: {
...mockStepResult,
status: StepStatus.SUCCESS,
},
workflowRunId: mockWorkflowRunId,
workspaceId: 'workspace-id',
stepStatus: StepStatus.SUCCESS,
});
// execute second step
@ -216,34 +215,30 @@ describe('WorkflowExecutorWorkspaceService', () => {
expect(workspaceEventEmitter.emitCustomBatchEvent).not.toHaveBeenCalled();
expect(
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
).toHaveBeenCalledTimes(1);
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
).toHaveBeenCalledTimes(2);
expect(
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
).toHaveBeenCalledWith({
workflowRunId: mockWorkflowRunId,
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
).toHaveBeenNthCalledWith(1, {
stepId: 'step-1',
stepInfo: {
status: StepStatus.RUNNING,
},
workflowRunId: mockWorkflowRunId,
workspaceId: 'workspace-id',
stepStatus: StepStatus.RUNNING,
});
expect(
workflowRunWorkspaceService.saveWorkflowRunState,
).toHaveBeenCalledTimes(1);
expect(
workflowRunWorkspaceService.saveWorkflowRunState,
).toHaveBeenCalledWith({
workflowRunId: mockWorkflowRunId,
stepOutput: {
id: 'step-1',
output: {
error: 'Step execution failed',
},
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
).toHaveBeenNthCalledWith(2, {
stepId: 'step-1',
stepInfo: {
error: 'Step execution failed',
status: StepStatus.FAILED,
},
workflowRunId: mockWorkflowRunId,
workspaceId: 'workspace-id',
stepStatus: StepStatus.FAILED,
});
});
@ -261,32 +256,29 @@ describe('WorkflowExecutorWorkspaceService', () => {
});
expect(
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
).toHaveBeenCalledTimes(1);
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
).toHaveBeenCalledTimes(2);
expect(
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
).toHaveBeenCalledWith({
workflowRunId: mockWorkflowRunId,
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
).toHaveBeenNthCalledWith(1, {
stepId: 'step-1',
stepInfo: {
status: StepStatus.RUNNING,
},
workflowRunId: mockWorkflowRunId,
workspaceId: 'workspace-id',
stepStatus: StepStatus.RUNNING,
});
expect(
workflowRunWorkspaceService.saveWorkflowRunState,
).toHaveBeenCalledTimes(1);
expect(
workflowRunWorkspaceService.saveWorkflowRunState,
).toHaveBeenCalledWith({
workflowRunId: mockWorkflowRunId,
stepOutput: {
id: 'step-1',
output: mockPendingEvent,
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
).toHaveBeenNthCalledWith(2, {
stepId: 'step-1',
stepInfo: {
status: StepStatus.PENDING,
},
workflowRunId: mockWorkflowRunId,
workspaceId: 'workspace-id',
stepStatus: StepStatus.PENDING,
});
// No recursive call to execute should happen
@ -295,128 +287,6 @@ describe('WorkflowExecutorWorkspaceService', () => {
);
});
it('should continue to next step if continueOnFailure is true', async () => {
const stepsWithContinueOnFailure = [
{
id: 'step-1',
type: WorkflowActionType.CODE,
settings: {
errorHandlingOptions: {
continueOnFailure: { value: true },
retryOnFailure: { value: false },
},
},
nextStepIds: ['step-2'],
},
{
id: 'step-2',
type: WorkflowActionType.SEND_EMAIL,
settings: {
errorHandlingOptions: {
continueOnFailure: { value: false },
retryOnFailure: { value: false },
},
},
},
] as WorkflowAction[];
mockWorkflowRunWorkspaceService.getWorkflowRun.mockReturnValueOnce({
state: {
flow: { steps: stepsWithContinueOnFailure },
stepInfos: mockStepInfos,
},
});
mockWorkflowExecutor.execute.mockResolvedValueOnce({
error: 'Step execution failed but continue',
});
await service.executeFromSteps({
workflowRunId: mockWorkflowRunId,
stepIds: ['step-1'],
workspaceId: mockWorkspaceId,
});
expect(
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
).toHaveBeenCalledTimes(2);
expect(
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
).toHaveBeenCalledWith({
workflowRunId: mockWorkflowRunId,
stepId: 'step-1',
workspaceId: 'workspace-id',
stepStatus: StepStatus.RUNNING,
});
expect(
workflowRunWorkspaceService.saveWorkflowRunState,
).toHaveBeenCalledTimes(2);
expect(
workflowRunWorkspaceService.saveWorkflowRunState,
).toHaveBeenCalledWith({
workflowRunId: mockWorkflowRunId,
stepOutput: {
id: 'step-1',
output: {
error: 'Step execution failed but continue',
},
},
workspaceId: 'workspace-id',
stepStatus: StepStatus.FAILED,
});
// execute second step
expect(workflowActionFactory.get).toHaveBeenCalledWith(
WorkflowActionType.SEND_EMAIL,
);
});
it('should retry on failure if retryOnFailure is true', async () => {
const stepsWithRetryOnFailure = [
{
id: 'step-1',
type: WorkflowActionType.CODE,
settings: {
errorHandlingOptions: {
continueOnFailure: { value: false },
retryOnFailure: { value: true },
},
},
},
] as WorkflowAction[];
mockWorkflowRunWorkspaceService.getWorkflowRun.mockReturnValue({
state: {
flow: { steps: stepsWithRetryOnFailure },
stepInfos: mockStepInfos,
},
});
mockWorkflowExecutor.execute.mockResolvedValue({
error: 'Step execution failed, will retry',
});
await service.executeFromSteps({
workflowRunId: mockWorkflowRunId,
stepIds: ['step-1'],
workspaceId: mockWorkspaceId,
});
for (let attempt = 1; attempt <= 3; attempt++) {
expect(workflowActionFactory.get).toHaveBeenNthCalledWith(
attempt,
WorkflowActionType.CODE,
);
}
expect(workflowActionFactory.get).not.toHaveBeenCalledWith(
WorkflowActionType.SEND_EMAIL,
);
});
it('should stop when billing validation fails', async () => {
mockBillingService.isBillingEnabled.mockReturnValueOnce(true);
mockBillingService.canBillMeteredProduct.mockReturnValueOnce(false);
@ -430,7 +300,7 @@ describe('WorkflowExecutorWorkspaceService', () => {
expect(workflowActionFactory.get).toHaveBeenCalledTimes(0);
expect(
workflowRunWorkspaceService.saveWorkflowRunState,
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
).toHaveBeenCalledTimes(1);
expect(workflowRunWorkspaceService.endWorkflowRun).toHaveBeenCalledTimes(
@ -438,24 +308,15 @@ describe('WorkflowExecutorWorkspaceService', () => {
);
expect(
workflowRunWorkspaceService.saveWorkflowRunState,
workflowRunWorkspaceService.updateWorkflowRunStepInfo,
).toHaveBeenCalledWith({
workflowRunId: mockWorkflowRunId,
workspaceId: 'workspace-id',
stepOutput: {
id: 'step-1',
output: {
error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE,
},
stepId: 'step-1',
stepInfo: {
error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE,
status: StepStatus.FAILED,
},
stepStatus: StepStatus.FAILED,
});
expect(workflowRunWorkspaceService.endWorkflowRun).toHaveBeenCalledWith({
workflowRunId: mockWorkflowRunId,
workspaceId: 'workspace-id',
status: WorkflowRunStatus.FAILED,
error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE,
});
});

View File

@ -2,18 +2,15 @@ import { Injectable } from '@nestjs/common';
import { isDefined } from 'twenty-shared/utils';
import { getWorkflowRunContext, StepStatus } from 'twenty-shared/workflow';
import { WorkflowRunStepInfo } from 'twenty-shared/src/workflow/types/WorkflowRunStateStepInfos';
import { BILLING_FEATURE_USED } from 'src/engine/core-modules/billing/constants/billing-feature-used.constant';
import { BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE } from 'src/engine/core-modules/billing/constants/billing-workflow-execution-error-message.constant';
import { BillingMeterEventName } from 'src/engine/core-modules/billing/enums/billing-meter-event-names';
import { BillingProductKey } from 'src/engine/core-modules/billing/enums/billing-product-key.enum';
import { BillingService } from 'src/engine/core-modules/billing/services/billing.service';
import { BillingUsageEvent } from 'src/engine/core-modules/billing/types/billing-usage-event.type';
import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter';
import {
StepOutput,
WorkflowRunStatus,
} from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import { WorkflowActionFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-action.factory';
import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type';
import {
@ -22,8 +19,9 @@ import {
} from 'src/modules/workflow/workflow-executor/types/workflow-executor-input';
import { canExecuteStep } from 'src/modules/workflow/workflow-executor/utils/can-execute-step.util';
import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service';
const MAX_RETRIES_ON_FAILURE = 3;
import { workflowShouldKeepRunning } from 'src/modules/workflow/workflow-executor/utils/workflow-should-keep-running.util';
import { workflowShouldFail } from 'src/modules/workflow/workflow-executor/utils/workflow-should-fail.util';
import { BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE } from 'src/engine/core-modules/billing/constants/billing-workflow-execution-error-message.constant';
@Injectable()
export class WorkflowExecutorWorkspaceService {
@ -38,6 +36,7 @@ export class WorkflowExecutorWorkspaceService {
stepIds,
workflowRunId,
workspaceId,
shouldComputeWorkflowRunStatus = true,
}: WorkflowExecutorInput) {
await Promise.all(
stepIds.map(async (stepIdToExecute) => {
@ -48,187 +47,27 @@ export class WorkflowExecutorWorkspaceService {
});
}),
);
if (shouldComputeWorkflowRunStatus) {
await this.computeWorkflowRunStatus({
workflowRunId,
workspaceId,
});
}
}
private async executeFromStep({
stepId,
attemptCount = 1,
workflowRunId,
workspaceId,
}: WorkflowBranchExecutorInput) {
const workflowRunInfo = await this.getWorkflowRunInfoOrEndWorkflowRun({
stepId,
workflowRunId,
workspaceId,
});
if (!isDefined(workflowRunInfo)) {
return;
}
const { stepToExecute, steps, stepInfos } = workflowRunInfo;
if (!canExecuteStep({ stepId, steps, stepInfos })) {
return;
}
const checkCanBillWorkflowNodeExecution =
await this.checkCanBillWorkflowNodeExecutionOrEndWorkflowRun({
stepIdToExecute: stepToExecute.id,
const workflowRun =
await this.workflowRunWorkspaceService.getWorkflowRunOrFail({
workflowRunId,
workspaceId,
});
if (!checkCanBillWorkflowNodeExecution) {
return;
}
const workflowAction = this.workflowActionFactory.get(stepToExecute.type);
let actionOutput: WorkflowActionOutput;
await this.workflowRunWorkspaceService.updateWorkflowRunStepStatus({
workflowRunId,
stepId: stepToExecute.id,
workspaceId,
stepStatus: StepStatus.RUNNING,
});
try {
actionOutput = await workflowAction.execute({
currentStepId: stepId,
steps,
context: getWorkflowRunContext(stepInfos),
});
} catch (error) {
actionOutput = {
error: error.message ?? 'Execution result error, no data or error',
};
}
if (!actionOutput.error) {
this.sendWorkflowNodeRunEvent(workspaceId);
}
const stepOutput: StepOutput = {
id: stepToExecute.id,
output: actionOutput,
};
if (actionOutput.pendingEvent) {
await this.workflowRunWorkspaceService.saveWorkflowRunState({
workflowRunId,
stepOutput,
workspaceId,
stepStatus: StepStatus.PENDING,
});
return;
}
const actionOutputSuccess = isDefined(actionOutput.result);
const isValidActionOutput =
actionOutputSuccess ||
stepToExecute.settings.errorHandlingOptions.continueOnFailure.value;
if (isValidActionOutput) {
await this.workflowRunWorkspaceService.saveWorkflowRunState({
workflowRunId,
stepOutput,
workspaceId,
stepStatus: isDefined(actionOutput.result)
? StepStatus.SUCCESS
: StepStatus.FAILED,
});
if (
!isDefined(stepToExecute.nextStepIds) ||
stepToExecute.nextStepIds.length === 0 ||
actionOutput.shouldEndWorkflowRun === true
) {
await this.workflowRunWorkspaceService.endWorkflowRun({
workflowRunId,
workspaceId,
status: WorkflowRunStatus.COMPLETED,
});
return;
}
await this.executeFromSteps({
stepIds: stepToExecute.nextStepIds,
workflowRunId,
workspaceId,
});
return;
}
if (
stepToExecute.settings.errorHandlingOptions.retryOnFailure.value &&
attemptCount < MAX_RETRIES_ON_FAILURE
) {
await this.executeFromStep({
stepId,
attemptCount: attemptCount + 1,
workflowRunId,
workspaceId,
});
return;
}
await this.workflowRunWorkspaceService.saveWorkflowRunState({
workflowRunId,
stepOutput,
workspaceId,
stepStatus: StepStatus.FAILED,
});
await this.workflowRunWorkspaceService.endWorkflowRun({
workflowRunId,
workspaceId,
status: WorkflowRunStatus.FAILED,
error: stepOutput.output.error,
});
}
private async getWorkflowRunInfoOrEndWorkflowRun({
stepId,
workflowRunId,
workspaceId,
}: {
stepId: string;
workflowRunId: string;
workspaceId: string;
}) {
const workflowRun = await this.workflowRunWorkspaceService.getWorkflowRun({
workflowRunId,
workspaceId,
});
if (!isDefined(workflowRun)) {
await this.workflowRunWorkspaceService.endWorkflowRun({
workflowRunId,
workspaceId,
status: WorkflowRunStatus.FAILED,
error: `WorkflowRun ${workflowRunId} not found`,
});
return;
}
if (!isDefined(workflowRun?.state)) {
await this.workflowRunWorkspaceService.endWorkflowRun({
workflowRunId,
workspaceId,
status: WorkflowRunStatus.FAILED,
error: `WorkflowRun ${workflowRunId} doesn't have any state`,
});
return;
}
const stepInfos = workflowRun.state.stepInfos;
const steps = workflowRun.state.flow.steps;
@ -245,11 +84,142 @@ export class WorkflowExecutorWorkspaceService {
return;
}
return {
stepToExecute,
steps,
stepInfos: workflowRun.state.stepInfos,
};
if (
!canExecuteStep({
stepId,
steps,
stepInfos,
workflowRunStatus: workflowRun.status,
})
) {
return;
}
let actionOutput: WorkflowActionOutput;
if (await this.canBillWorkflowNodeExecution(workspaceId)) {
const workflowAction = this.workflowActionFactory.get(stepToExecute.type);
await this.workflowRunWorkspaceService.updateWorkflowRunStepInfo({
stepId,
stepInfo: {
status: StepStatus.RUNNING,
},
workflowRunId,
workspaceId,
});
try {
actionOutput = await workflowAction.execute({
currentStepId: stepId,
steps,
context: getWorkflowRunContext(stepInfos),
});
} catch (error) {
actionOutput = {
error: error.message ?? 'Execution result error, no data or error',
};
}
} else {
actionOutput = {
error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE,
};
}
const isPendingEvent = actionOutput.pendingEvent;
const isSuccess = isDefined(actionOutput.result);
const isError = isDefined(actionOutput.error);
const isStopped = actionOutput.shouldEndWorkflowRun;
if (!isError) {
this.sendWorkflowNodeRunEvent(workspaceId);
}
let stepInfo: WorkflowRunStepInfo;
if (isPendingEvent) {
stepInfo = {
status: StepStatus.PENDING,
};
} else if (isStopped) {
stepInfo = {
status: StepStatus.STOPPED,
result: actionOutput?.result,
};
} else if (isSuccess) {
stepInfo = {
status: StepStatus.SUCCESS,
result: actionOutput?.result,
};
} else {
stepInfo = {
status: StepStatus.FAILED,
error: actionOutput?.error,
};
}
await this.workflowRunWorkspaceService.updateWorkflowRunStepInfo({
stepId,
stepInfo,
workflowRunId,
workspaceId,
});
if (
isSuccess &&
!isStopped &&
isDefined(stepToExecute.nextStepIds) &&
stepToExecute.nextStepIds.length > 0
) {
await this.executeFromSteps({
stepIds: stepToExecute.nextStepIds,
workflowRunId,
workspaceId,
shouldComputeWorkflowRunStatus: false,
});
}
}
private async computeWorkflowRunStatus({
workflowRunId,
workspaceId,
}: {
workflowRunId: string;
workspaceId: string;
}) {
const workflowRun =
await this.workflowRunWorkspaceService.getWorkflowRunOrFail({
workflowRunId,
workspaceId,
});
const stepInfos = workflowRun.state.stepInfos;
const steps = workflowRun.state.flow.steps;
if (workflowShouldKeepRunning({ stepInfos, steps })) {
return;
}
if (workflowShouldFail({ stepInfos, steps })) {
await this.workflowRunWorkspaceService.endWorkflowRun({
workflowRunId,
workspaceId,
status: WorkflowRunStatus.FAILED,
error: 'WorkflowRun failed',
});
return;
}
await this.workflowRunWorkspaceService.endWorkflowRun({
workflowRunId,
workspaceId,
status: WorkflowRunStatus.COMPLETED,
});
}
private sendWorkflowNodeRunEvent(workspaceId: string) {
@ -265,45 +235,13 @@ export class WorkflowExecutorWorkspaceService {
);
}
private async checkCanBillWorkflowNodeExecutionOrEndWorkflowRun({
stepIdToExecute,
workflowRunId,
workspaceId,
}: {
stepIdToExecute: string;
workflowRunId: string;
workspaceId: string;
}) {
const canBillWorkflowNodeExecution =
private async canBillWorkflowNodeExecution(workspaceId: string) {
return (
!this.billingService.isBillingEnabled() ||
(await this.billingService.canBillMeteredProduct(
workspaceId,
BillingProductKey.WORKFLOW_NODE_EXECUTION,
));
if (!canBillWorkflowNodeExecution) {
const billingOutput = {
error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE,
};
await this.workflowRunWorkspaceService.saveWorkflowRunState({
workspaceId,
workflowRunId,
stepOutput: {
id: stepIdToExecute,
output: billingOutput,
},
stepStatus: StepStatus.FAILED,
});
await this.workflowRunWorkspaceService.endWorkflowRun({
workflowRunId,
workspaceId,
status: WorkflowRunStatus.FAILED,
error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE,
});
}
return canBillWorkflowNodeExecution;
))
);
}
}

View File

@ -4,6 +4,7 @@ import { isDefined } from 'twenty-shared/utils';
import { StepStatus } from 'twenty-shared/workflow';
import { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity';
import { v4 } from 'uuid';
import { WorkflowRunStepInfo } from 'twenty-shared/src/workflow/types/WorkflowRunStateStepInfos';
import { WithLock } from 'src/engine/core-modules/cache-lock/with-lock.decorator';
import { MetricsService } from 'src/engine/core-modules/metrics/metrics.service';
@ -13,7 +14,6 @@ import { ActorMetadata } from 'src/engine/metadata-modules/field-metadata/compos
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import {
StepOutput,
WorkflowRunState,
WorkflowRunStatus,
WorkflowRunWorkspaceEntity,
@ -205,16 +205,16 @@ export class WorkflowRunWorkspaceService {
}
@WithLock('workflowRunId')
async updateWorkflowRunStepStatus({
async updateWorkflowRunStepInfo({
stepId,
stepInfo,
workflowRunId,
workspaceId,
stepId,
stepStatus,
}: {
workflowRunId: string;
stepId: string;
stepInfo: WorkflowRunStepInfo;
workflowRunId: string;
workspaceId: string;
stepStatus: StepStatus;
}) {
const workflowRunToUpdate = await this.getWorkflowRunOrFail({
workflowRunId,
@ -227,43 +227,10 @@ export class WorkflowRunWorkspaceService {
stepInfos: {
...workflowRunToUpdate.state?.stepInfos,
[stepId]: {
...(workflowRunToUpdate.state?.stepInfos?.[stepId] || {}),
status: stepStatus,
},
},
},
};
await this.updateWorkflowRun({ workflowRunId, workspaceId, partialUpdate });
}
@WithLock('workflowRunId')
async saveWorkflowRunState({
workflowRunId,
stepOutput,
workspaceId,
stepStatus,
}: {
workflowRunId: string;
stepOutput: StepOutput;
workspaceId: string;
stepStatus: StepStatus;
}) {
const workflowRunToUpdate = await this.getWorkflowRunOrFail({
workflowRunId,
workspaceId,
});
const partialUpdate = {
state: {
...workflowRunToUpdate.state,
stepInfos: {
...workflowRunToUpdate.state?.stepInfos,
[stepOutput.id]: {
...(workflowRunToUpdate.state?.stepInfos[stepOutput.id] || {}),
result: stepOutput.output?.result,
error: stepOutput.output?.error,
status: stepStatus,
...(workflowRunToUpdate.state?.stepInfos[stepId] || {}),
result: stepInfo?.result,
error: stepInfo?.error,
status: stepInfo.status,
},
},
},

View File

@ -2,6 +2,7 @@ export enum StepStatus {
NOT_STARTED = 'NOT_STARTED',
RUNNING = 'RUNNING',
SUCCESS = 'SUCCESS',
STOPPED = 'STOPPED',
FAILED = 'FAILED',
PENDING = 'PENDING',
}