Start using next step ids (#11683)

- update workflow executor
- update next step ids on step creation/deletion
- use these in workflow run
- use these in variables
This commit is contained in:
Thomas Trompette
2025-04-29 16:29:25 +02:00
committed by GitHub
parent 19f46a0091
commit d8b2e1fb34
28 changed files with 325 additions and 105 deletions

View File

@ -9,4 +9,5 @@ export class WorkflowStepExecutorException extends CustomException {
export enum WorkflowStepExecutorExceptionCode {
SCOPED_WORKSPACE_NOT_FOUND = 'SCOPED_WORKSPACE_NOT_FOUND',
INVALID_STEP_TYPE = 'INVALID_STEP_TYPE',
STEP_NOT_FOUND = 'STEP_NOT_FOUND',
}

View File

@ -1,7 +1,7 @@
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
export type WorkflowExecutorInput = {
currentStepIndex: number;
currentStepId: string;
steps: WorkflowAction[];
context: Record<string, unknown>;
workflowRunId: string;

View File

@ -22,11 +22,18 @@ export class CodeWorkflowAction implements WorkflowExecutor {
) {}
async execute({
currentStepIndex,
currentStepId,
steps,
context,
}: WorkflowExecutorInput): Promise<WorkflowExecutorOutput> {
const step = steps[currentStepIndex];
const step = steps.find((step) => step.id === currentStepId);
if (!step) {
throw new WorkflowStepExecutorException(
'Step not found',
WorkflowStepExecutorExceptionCode.STEP_NOT_FOUND,
);
}
if (!isWorkflowCodeAction(step)) {
throw new WorkflowStepExecutorException(

View File

@ -13,10 +13,17 @@ import { isWorkflowFormAction } from 'src/modules/workflow/workflow-executor/wor
@Injectable()
export class FormWorkflowAction implements WorkflowExecutor {
async execute({
currentStepIndex,
currentStepId,
steps,
}: WorkflowExecutorInput): Promise<WorkflowExecutorOutput> {
const step = steps[currentStepIndex];
const step = steps.find((step) => step.id === currentStepId);
if (!step) {
throw new WorkflowStepExecutorException(
'Step not found',
WorkflowStepExecutorExceptionCode.STEP_NOT_FOUND,
);
}
if (!isWorkflowFormAction(step)) {
throw new WorkflowStepExecutorException(

View File

@ -75,11 +75,18 @@ export class SendEmailWorkflowAction implements WorkflowExecutor {
}
async execute({
currentStepIndex,
currentStepId,
steps,
context,
}: WorkflowExecutorInput): Promise<WorkflowExecutorOutput> {
const step = steps[currentStepIndex];
const step = steps.find((step) => step.id === currentStepId);
if (!step) {
throw new WorkflowStepExecutorException(
'Step not found',
WorkflowStepExecutorExceptionCode.STEP_NOT_FOUND,
);
}
if (!isWorkflowSendEmailAction(step)) {
throw new WorkflowStepExecutorException(

View File

@ -42,12 +42,18 @@ export class CreateRecordWorkflowAction implements WorkflowExecutor {
) {}
async execute({
currentStepIndex,
currentStepId,
steps,
context,
}: WorkflowExecutorInput): Promise<WorkflowExecutorOutput> {
const step = steps[currentStepIndex];
const step = steps.find((step) => step.id === currentStepId);
if (!step) {
throw new WorkflowStepExecutorException(
'Step not found',
WorkflowStepExecutorExceptionCode.STEP_NOT_FOUND,
);
}
if (!isWorkflowCreateRecordAction(step)) {
throw new WorkflowStepExecutorException(
'Step is not a create record action',

View File

@ -35,12 +35,18 @@ export class DeleteRecordWorkflowAction implements WorkflowExecutor {
) {}
async execute({
currentStepIndex,
currentStepId,
steps,
context,
}: WorkflowExecutorInput): Promise<WorkflowExecutorOutput> {
const step = steps[currentStepIndex];
const step = steps.find((step) => step.id === currentStepId);
if (!step) {
throw new WorkflowStepExecutorException(
'Step not found',
WorkflowStepExecutorExceptionCode.STEP_NOT_FOUND,
);
}
if (!isWorkflowDeleteRecordAction(step)) {
throw new WorkflowStepExecutorException(
'Step is not a delete record action',

View File

@ -45,11 +45,18 @@ export class FindRecordsWorkflowAction implements WorkflowExecutor {
) {}
async execute({
currentStepIndex,
currentStepId,
steps,
context,
}: WorkflowExecutorInput): Promise<WorkflowExecutorOutput> {
const step = steps[currentStepIndex];
const step = steps.find((step) => step.id === currentStepId);
if (!step) {
throw new WorkflowStepExecutorException(
'Step not found',
WorkflowStepExecutorExceptionCode.STEP_NOT_FOUND,
);
}
if (!isWorkflowFindRecordsAction(step)) {
throw new WorkflowStepExecutorException(

View File

@ -42,11 +42,18 @@ export class UpdateRecordWorkflowAction implements WorkflowExecutor {
) {}
async execute({
currentStepIndex,
currentStepId,
steps,
context,
}: WorkflowExecutorInput): Promise<WorkflowExecutorOutput> {
const step = steps[currentStepIndex];
const step = steps.find((step) => step.id === currentStepId);
if (!step) {
throw new WorkflowStepExecutorException(
'Step not found',
WorkflowStepExecutorExceptionCode.STEP_NOT_FOUND,
);
}
if (!isWorkflowUpdateRecordAction(step)) {
throw new WorkflowStepExecutorException(

View File

@ -107,6 +107,7 @@ describe('WorkflowExecutorWorkspaceService', () => {
retryOnFailure: { value: false },
},
},
nextStepIds: ['step-2'],
},
{
id: 'step-2',
@ -117,6 +118,7 @@ describe('WorkflowExecutorWorkspaceService', () => {
retryOnFailure: { value: false },
},
},
nextStepIds: [],
},
] as WorkflowAction[];
@ -124,7 +126,7 @@ describe('WorkflowExecutorWorkspaceService', () => {
// No steps to execute
const result = await service.execute({
workflowRunId: mockWorkflowRunId,
currentStepIndex: 2,
currentStepId: 'step-2',
steps: mockSteps,
context: mockContext,
});
@ -145,7 +147,7 @@ describe('WorkflowExecutorWorkspaceService', () => {
const result = await service.execute({
workflowRunId: mockWorkflowRunId,
currentStepIndex: 0,
currentStepId: 'step-1',
steps: mockSteps,
context: mockContext,
});
@ -156,7 +158,7 @@ describe('WorkflowExecutorWorkspaceService', () => {
);
expect(mockWorkflowExecutor.execute).toHaveBeenCalledWith({
workflowRunId: mockWorkflowRunId,
currentStepIndex: 0,
currentStepId: 'step-1',
steps: mockSteps,
context: mockContext,
attemptCount: 1,
@ -199,7 +201,7 @@ describe('WorkflowExecutorWorkspaceService', () => {
const result = await service.execute({
workflowRunId: mockWorkflowRunId,
currentStepIndex: 0,
currentStepId: 'step-1',
steps: mockSteps,
context: mockContext,
});
@ -231,7 +233,7 @@ describe('WorkflowExecutorWorkspaceService', () => {
const result = await service.execute({
workflowRunId: mockWorkflowRunId,
currentStepIndex: 0,
currentStepId: 'step-1',
steps: mockSteps,
context: mockContext,
});
@ -265,6 +267,7 @@ describe('WorkflowExecutorWorkspaceService', () => {
retryOnFailure: { value: false },
},
},
nextStepIds: ['step-2'],
},
{
id: 'step-2',
@ -284,7 +287,7 @@ describe('WorkflowExecutorWorkspaceService', () => {
const result = await service.execute({
workflowRunId: mockWorkflowRunId,
currentStepIndex: 0,
currentStepId: 'step-1',
steps: stepsWithContinueOnFailure,
context: mockContext,
});
@ -330,7 +333,7 @@ describe('WorkflowExecutorWorkspaceService', () => {
await service.execute({
workflowRunId: mockWorkflowRunId,
currentStepIndex: 0,
currentStepId: 'step-1',
steps: stepsWithRetryOnFailure,
context: mockContext,
});
@ -367,7 +370,7 @@ describe('WorkflowExecutorWorkspaceService', () => {
const result = await service.execute({
workflowRunId: mockWorkflowRunId,
currentStepIndex: 0,
currentStepId: 'step-1',
steps: stepsWithRetryOnFailure,
context: mockContext,
attemptCount: 3, // MAX_RETRIES_ON_FAILURE is 3
@ -394,7 +397,7 @@ describe('WorkflowExecutorWorkspaceService', () => {
const result = await service.execute({
workflowRunId: mockWorkflowRunId,
currentStepIndex: 0,
currentStepId: 'step-1',
steps: mockSteps,
context: mockContext,
});

View File

@ -1,5 +1,7 @@
import { Injectable } from '@nestjs/common';
import { isDefined } from 'twenty-shared/utils';
import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface';
import { BILLING_FEATURE_USED } from 'src/engine/core-modules/billing/constants/billing-feature-used.constant';
@ -38,22 +40,20 @@ export class WorkflowExecutorWorkspaceService implements WorkflowExecutor {
) {}
async execute({
currentStepIndex,
currentStepId,
steps,
context,
attemptCount = 1,
workflowRunId,
}: WorkflowExecutorInput): Promise<WorkflowExecutorOutput> {
if (currentStepIndex >= steps.length) {
const step = steps.find((step) => step.id === currentStepId);
if (!step) {
return {
result: {
success: true,
},
error: 'Step not found',
};
}
const step = steps[currentStepIndex];
const workflowExecutor = this.workflowExecutorFactory.get(step.type);
let actionOutput: WorkflowExecutorOutput;
@ -80,7 +80,7 @@ export class WorkflowExecutorWorkspaceService implements WorkflowExecutor {
try {
actionOutput = await workflowExecutor.execute({
currentStepIndex,
currentStepId,
steps,
context,
attemptCount,
@ -111,11 +111,17 @@ export class WorkflowExecutorWorkspaceService implements WorkflowExecutor {
return actionOutput;
}
if (actionOutput.result) {
const updatedContext = {
...context,
[step.id]: actionOutput.result,
};
const shouldContinue =
isDefined(actionOutput.result) ||
step.settings.errorHandlingOptions.continueOnFailure.value;
if (shouldContinue) {
const updatedContext = isDefined(actionOutput.result)
? {
...context,
[step.id]: actionOutput.result,
}
: context;
await this.workflowRunWorkspaceService.saveWorkflowRunState({
workflowRunId,
@ -123,36 +129,26 @@ export class WorkflowExecutorWorkspaceService implements WorkflowExecutor {
context: updatedContext,
});
if (!isDefined(step.nextStepIds?.[0])) {
return actionOutput;
}
// TODO: handle multiple next steps
return await this.execute({
workflowRunId,
currentStepIndex: currentStepIndex + 1,
currentStepId: step.nextStepIds[0],
steps,
context: updatedContext,
});
}
if (step.settings.errorHandlingOptions.continueOnFailure.value) {
await this.workflowRunWorkspaceService.saveWorkflowRunState({
workflowRunId,
stepOutput,
context,
});
return await this.execute({
workflowRunId,
currentStepIndex: currentStepIndex + 1,
steps,
context,
});
}
if (
step.settings.errorHandlingOptions.retryOnFailure.value &&
attemptCount < MAX_RETRIES_ON_FAILURE
) {
return await this.execute({
workflowRunId,
currentStepIndex,
currentStepId,
steps,
context,
attemptCount: attemptCount + 1,

View File

@ -5,7 +5,6 @@ import { Processor } from 'src/engine/core-modules/message-queue/decorators/proc
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { ThrottlerService } from 'src/engine/core-modules/throttler/throttler.service';
import { TwentyConfigService } from 'src/engine/core-modules/twenty-config/twenty-config.service';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workspace-services/workflow-common.workspace-service';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
@ -31,7 +30,6 @@ export class RunWorkflowJob {
private readonly workflowRunWorkspaceService: WorkflowRunWorkspaceService,
private readonly throttlerService: ThrottlerService,
private readonly twentyConfigService: TwentyConfigService,
private readonly twentyORMManager: TwentyORMManager,
) {}
@Process(RunWorkflowJob.name)
@ -109,7 +107,7 @@ export class RunWorkflowJob {
await this.executeWorkflow({
workflowRunId,
currentStepIndex: 0,
currentStepId: workflowVersion.steps[0].id,
steps: workflowVersion.steps,
context,
});
@ -134,20 +132,31 @@ export class RunWorkflowJob {
);
}
const lastExecutedStepIndex = workflowRun.output?.flow?.steps?.findIndex(
const lastExecutedStep = workflowRun.output?.flow?.steps?.find(
(step) => step.id === lastExecutedStepId,
);
if (lastExecutedStepIndex === undefined) {
if (!lastExecutedStep) {
throw new WorkflowRunException(
'Last executed step not found',
WorkflowRunExceptionCode.INVALID_INPUT,
);
}
const nextStepId = lastExecutedStep.nextStepIds?.[0];
if (!nextStepId) {
await this.workflowRunWorkspaceService.endWorkflowRun({
workflowRunId,
status: WorkflowRunStatus.COMPLETED,
});
return;
}
await this.executeWorkflow({
workflowRunId,
currentStepIndex: lastExecutedStepIndex + 1,
currentStepId: nextStepId,
steps: workflowRun.output?.flow?.steps ?? [],
context: workflowRun.context ?? {},
});
@ -155,19 +164,19 @@ export class RunWorkflowJob {
private async executeWorkflow({
workflowRunId,
currentStepIndex,
currentStepId,
steps,
context,
}: {
workflowRunId: string;
currentStepIndex: number;
currentStepId: string;
steps: WorkflowAction[];
context: Record<string, any>;
}) {
const { error, pendingEvent } =
await this.workflowExecutorWorkspaceService.execute({
workflowRunId,
currentStepIndex,
currentStepId,
steps,
context,
});