Add submit form step endpoint (#10538)

- add endpoint to submit form step
- update context and output of workflow run
- resume workflow execution
This commit is contained in:
Thomas Trompette
2025-02-28 10:03:45 +01:00
committed by GitHub
parent 15d3751b73
commit a29c26c8d7
11 changed files with 285 additions and 64 deletions

View File

@ -0,0 +1,24 @@
import { Field, InputType } from '@nestjs/graphql';
import graphqlTypeJson from 'graphql-type-json';
@InputType()
export class SubmitFormStepInput {
@Field(() => String, {
description: 'Workflow version ID',
nullable: false,
})
stepId: string;
@Field(() => String, {
description: 'Workflow run ID',
nullable: false,
})
workflowRunId: string;
@Field(() => graphqlTypeJson, {
description: 'Form response in JSON format',
nullable: false,
})
response: JSON;
}

View File

@ -3,13 +3,14 @@ import { Args, Mutation, Resolver } from '@nestjs/graphql';
import { CreateWorkflowVersionStepInput } from 'src/engine/core-modules/workflow/dtos/create-workflow-version-step-input.dto';
import { DeleteWorkflowVersionStepInput } from 'src/engine/core-modules/workflow/dtos/delete-workflow-version-step-input.dto';
import { SubmitFormStepInput } from 'src/engine/core-modules/workflow/dtos/submit-form-step-input.dto';
import { UpdateWorkflowVersionStepInput } from 'src/engine/core-modules/workflow/dtos/update-workflow-version-step-input.dto';
import { WorkflowActionDTO } from 'src/engine/core-modules/workflow/dtos/workflow-step.dto';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { AuthWorkspace } from 'src/engine/decorators/auth/auth-workspace.decorator';
import { UserAuthGuard } from 'src/engine/guards/user-auth.guard';
import { WorkspaceAuthGuard } from 'src/engine/guards/workspace-auth.guard';
import { WorkflowVersionStepWorkspaceService } from 'src/modules/workflow/workflow-builder/workflow-version/workflow-step/workflow-version-step.workspace-service';
import { WorkflowVersionStepWorkspaceService } from 'src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.workspace-service';
@Resolver()
@UseGuards(WorkspaceAuthGuard, UserAuthGuard)
@ -56,4 +57,20 @@ export class WorkflowVersionStepResolver {
stepId,
});
}
@Mutation(() => Boolean)
async submitFormStep(
@AuthWorkspace() { id: workspaceId }: Workspace,
@Args('input')
{ stepId, workflowRunId, response }: SubmitFormStepInput,
) {
await this.workflowVersionStepWorkspaceService.submitFormStep({
workspaceId,
stepId,
workflowRunId,
response,
});
return true;
}
}

View File

@ -1,16 +1,16 @@
import { Injectable } from '@nestjs/common';
import { ServerlessFunctionService } from 'src/engine/metadata-modules/serverless-function/serverless-function.service';
import { WorkspaceRepository } from 'src/engine/twenty-orm/repository/workspace.repository';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-event-listener.workspace-entity';
import { WorkflowRunWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity';
import { WorkflowActionType } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
import {
WorkflowTriggerException,
WorkflowTriggerExceptionCode,
} from 'src/modules/workflow/workflow-trigger/exceptions/workflow-trigger.exception';
import { WorkspaceRepository } from 'src/engine/twenty-orm/repository/workspace.repository';
import { WorkflowActionType } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
import { ServerlessFunctionService } from 'src/engine/metadata-modules/serverless-function/serverless-function.service';
@Injectable()
export class WorkflowCommonWorkspaceService {

View File

@ -4,7 +4,7 @@ import { NestjsQueryTypeOrmModule } from '@ptc-org/nestjs-query-typeorm';
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
import { WorkflowSchemaModule } from 'src/modules/workflow/workflow-builder/workflow-schema/workflow-schema.module';
import { WorkflowVersionStepModule } from 'src/modules/workflow/workflow-builder/workflow-version/workflow-step/workflow-version-step.module';
import { WorkflowVersionStepModule } from 'src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.module';
import { WorkflowVersionModule } from 'src/modules/workflow/workflow-builder/workflow-version/workflow-version.module';
@Module({

View File

@ -5,12 +5,16 @@ import { NestjsQueryTypeOrmModule } from '@ptc-org/nestjs-query-typeorm';
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
import { ServerlessFunctionModule } from 'src/engine/metadata-modules/serverless-function/serverless-function.module';
import { WorkflowSchemaModule } from 'src/modules/workflow/workflow-builder/workflow-schema/workflow-schema.module';
import { WorkflowVersionStepWorkspaceService } from 'src/modules/workflow/workflow-builder/workflow-version/workflow-step/workflow-version-step.workspace-service';
import { WorkflowVersionStepWorkspaceService } from 'src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.workspace-service';
import { WorkflowRunModule } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.module';
import { WorkflowRunnerModule } from 'src/modules/workflow/workflow-runner/workflow-runner.module';
@Module({
imports: [
WorkflowSchemaModule,
ServerlessFunctionModule,
WorkflowRunnerModule,
WorkflowRunModule,
NestjsQueryTypeOrmModule.forFeature([ObjectMetadataEntity], 'metadata'),
],
providers: [WorkflowVersionStepWorkspaceService],

View File

@ -14,6 +14,7 @@ import {
WorkflowVersionStepException,
WorkflowVersionStepExceptionCode,
} from 'src/modules/workflow/common/exceptions/workflow-version-step.exception';
import { StepOutput } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity';
import { WorkflowSchemaWorkspaceService } from 'src/modules/workflow/workflow-builder/workflow-schema/workflow-schema.workspace-service';
import { BaseWorkflowActionSettings } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-settings.type';
@ -21,6 +22,8 @@ import {
WorkflowAction,
WorkflowActionType,
} from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service';
import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service';
const TRIGGER_STEP_ID = 'trigger';
@ -44,6 +47,8 @@ export class WorkflowVersionStepWorkspaceService {
private readonly serverlessFunctionService: ServerlessFunctionService,
@InjectRepository(ObjectMetadataEntity, 'metadata')
private readonly objectMetadataRepository: Repository<ObjectMetadataEntity>,
private readonly workflowRunWorkspaceService: WorkflowRunWorkspaceService,
private readonly workflowRunnerWorkspaceService: WorkflowRunnerWorkspaceService,
) {}
async createWorkflowVersionStep({
@ -237,6 +242,58 @@ export class WorkflowVersionStepWorkspaceService {
}
}
async submitFormStep({
workspaceId,
stepId,
workflowRunId,
response,
}: {
workspaceId: string;
stepId: string;
workflowRunId: string;
response: object;
}) {
const workflowRun =
await this.workflowRunWorkspaceService.getWorkflowRunOrFail(
workflowRunId,
);
const step = workflowRun.output?.flow?.steps?.find(
(step) => step.id === stepId,
);
if (!isDefined(step)) {
throw new WorkflowVersionStepException(
'Step not found',
WorkflowVersionStepExceptionCode.NOT_FOUND,
);
}
const newStepOutput: StepOutput = {
id: stepId,
output: {
result: response,
},
};
const updatedContext = {
...workflowRun.context,
[stepId]: response,
};
await this.workflowRunWorkspaceService.saveWorkflowRunState({
workflowRunId,
stepOutput: newStepOutput,
context: updatedContext,
});
await this.workflowRunnerWorkspaceService.resume({
workspaceId,
workflowRunId,
lastExecutedStepId: stepId,
});
}
private async enrichOutputSchema({
step,
workspaceId,

View File

@ -5,8 +5,7 @@ import { NestjsQueryTypeOrmModule } from '@ptc-org/nestjs-query-typeorm';
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
import { ServerlessFunctionModule } from 'src/engine/metadata-modules/serverless-function/serverless-function.module';
import { WorkflowSchemaModule } from 'src/modules/workflow/workflow-builder/workflow-schema/workflow-schema.module';
import { WorkflowVersionStepModule } from 'src/modules/workflow/workflow-builder/workflow-version/workflow-step/workflow-version-step.module';
import { WorkflowVersionStepWorkspaceService } from 'src/modules/workflow/workflow-builder/workflow-version/workflow-step/workflow-version-step.workspace-service';
import { WorkflowVersionStepModule } from 'src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.module';
import { WorkflowVersionWorkspaceService } from 'src/modules/workflow/workflow-builder/workflow-version/workflow-version.workspace-service';
@Module({
@ -14,15 +13,10 @@ import { WorkflowVersionWorkspaceService } from 'src/modules/workflow/workflow-b
WorkflowVersionStepModule,
WorkflowSchemaModule,
ServerlessFunctionModule,
WorkflowVersionStepModule,
NestjsQueryTypeOrmModule.forFeature([ObjectMetadataEntity], 'metadata'),
],
providers: [
WorkflowVersionWorkspaceService,
WorkflowVersionStepWorkspaceService,
],
exports: [
WorkflowVersionWorkspaceService,
WorkflowVersionStepWorkspaceService,
],
providers: [WorkflowVersionWorkspaceService],
exports: [WorkflowVersionWorkspaceService],
})
export class WorkflowVersionModule {}

View File

@ -19,7 +19,7 @@ import {
import { assertWorkflowVersionHasSteps } from 'src/modules/workflow/common/utils/assert-workflow-version-has-steps';
import { assertWorkflowVersionIsDraft } from 'src/modules/workflow/common/utils/assert-workflow-version-is-draft.util';
import { assertWorkflowVersionTriggerIsDefined } from 'src/modules/workflow/common/utils/assert-workflow-version-trigger-is-defined.util';
import { WorkflowVersionStepWorkspaceService } from 'src/modules/workflow/workflow-builder/workflow-version/workflow-step/workflow-version-step.workspace-service';
import { WorkflowVersionStepWorkspaceService } from 'src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.workspace-service';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
@Injectable()

View File

@ -5,8 +5,10 @@ import { Process } from 'src/engine/core-modules/message-queue/decorators/proces
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { ThrottlerService } from 'src/engine/core-modules/throttler/throttler.service';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workspace-services/workflow-common.workspace-service';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
import { WorkflowExecutorWorkspaceService } from 'src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service';
import {
WorkflowRunException,
@ -16,9 +18,9 @@ import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runne
export type RunWorkflowJobData = {
workspaceId: string;
workflowVersionId: string;
workflowRunId: string;
payload: object;
payload?: object;
lastExecutedStepId?: string;
};
@Processor({ queueName: MessageQueue.workflowQueue, scope: Scope.REQUEST })
@ -29,61 +31,27 @@ export class RunWorkflowJob {
private readonly workflowRunWorkspaceService: WorkflowRunWorkspaceService,
private readonly throttlerService: ThrottlerService,
private readonly environmentService: EnvironmentService,
private readonly twentyORMManager: TwentyORMManager,
) {}
@Process(RunWorkflowJob.name)
async handle({
workflowVersionId,
workflowRunId,
payload,
lastExecutedStepId,
}: RunWorkflowJobData): Promise<void> {
const context = {
trigger: payload,
};
try {
const workflowVersion =
await this.workflowCommonWorkspaceService.getWorkflowVersionOrFail(
workflowVersionId,
);
if (!workflowVersion.trigger || !workflowVersion.steps) {
throw new WorkflowRunException(
'Workflow version has no trigger or steps',
WorkflowRunExceptionCode.WORKFLOW_RUN_INVALID,
);
}
await this.workflowRunWorkspaceService.startWorkflowRun({
workflowRunId,
context,
output: {
flow: {
trigger: workflowVersion.trigger,
steps: workflowVersion.steps,
},
},
});
await this.throttleExecution(workflowVersion.workflowId);
const { error, pendingEvent } =
await this.workflowExecutorWorkspaceService.execute({
if (lastExecutedStepId) {
await this.resumeWorkflowExecution({
workflowRunId,
currentStepIndex: 0,
steps: workflowVersion.steps,
context,
lastExecutedStepId,
});
} else {
await this.startWorkflowExecution({
workflowRunId,
payload: payload ?? {},
});
if (pendingEvent) {
return;
}
await this.workflowRunWorkspaceService.endWorkflowRun({
workflowRunId,
status: error ? WorkflowRunStatus.FAILED : WorkflowRunStatus.COMPLETED,
error,
});
} catch (error) {
await this.workflowRunWorkspaceService.endWorkflowRun({
workflowRunId,
@ -93,6 +61,123 @@ export class RunWorkflowJob {
}
}
private async startWorkflowExecution({
workflowRunId,
payload,
}: {
workflowRunId: string;
payload: object;
}): Promise<void> {
const context = {
trigger: payload,
};
const workflowRun =
await this.workflowRunWorkspaceService.getWorkflowRunOrFail(
workflowRunId,
);
const workflowVersion =
await this.workflowCommonWorkspaceService.getWorkflowVersionOrFail(
workflowRun.workflowVersionId,
);
if (!workflowVersion.trigger || !workflowVersion.steps) {
throw new WorkflowRunException(
'Workflow version has no trigger or steps',
WorkflowRunExceptionCode.WORKFLOW_RUN_INVALID,
);
}
await this.workflowRunWorkspaceService.startWorkflowRun({
workflowRunId,
context,
output: {
flow: {
trigger: workflowVersion.trigger,
steps: workflowVersion.steps,
},
},
});
await this.throttleExecution(workflowVersion.workflowId);
await this.executeWorkflow({
workflowRunId,
currentStepIndex: 0,
steps: workflowVersion.steps,
context,
});
}
private async resumeWorkflowExecution({
workflowRunId,
lastExecutedStepId,
}: {
workflowRunId: string;
lastExecutedStepId: string;
}): Promise<void> {
const workflowRun =
await this.workflowRunWorkspaceService.getWorkflowRunOrFail(
workflowRunId,
);
if (workflowRun.status !== WorkflowRunStatus.RUNNING) {
throw new WorkflowRunException(
'Workflow is not running',
WorkflowRunExceptionCode.WORKFLOW_RUN_INVALID,
);
}
const lastExecutedStepIndex = workflowRun.output?.flow?.steps?.findIndex(
(step) => step.id === lastExecutedStepId,
);
if (lastExecutedStepIndex === undefined) {
throw new WorkflowRunException(
'Last executed step not found',
WorkflowRunExceptionCode.INVALID_INPUT,
);
}
await this.executeWorkflow({
workflowRunId,
currentStepIndex: lastExecutedStepIndex + 1,
steps: workflowRun.output?.flow?.steps ?? [],
context: workflowRun.context ?? {},
});
}
private async executeWorkflow({
workflowRunId,
currentStepIndex,
steps,
context,
}: {
workflowRunId: string;
currentStepIndex: number;
steps: WorkflowAction[];
context: Record<string, any>;
}) {
const { error, pendingEvent } =
await this.workflowExecutorWorkspaceService.execute({
workflowRunId,
currentStepIndex,
steps,
context,
});
if (pendingEvent) {
return;
}
await this.workflowRunWorkspaceService.endWorkflowRun({
workflowRunId,
status: error ? WorkflowRunStatus.FAILED : WorkflowRunStatus.COMPLETED,
error,
});
}
private async throttleExecution(workflowId: string) {
try {
await this.throttlerService.throttle(

View File

@ -163,4 +163,26 @@ export class WorkflowRunWorkspaceService {
context,
});
}
async getWorkflowRunOrFail(
workflowRunId: string,
): Promise<WorkflowRunWorkspaceEntity> {
const workflowRunRepository =
await this.twentyORMManager.getRepository<WorkflowRunWorkspaceEntity>(
'workflowRun',
);
const workflowRun = await workflowRunRepository.findOne({
where: { id: workflowRunId },
});
if (!workflowRun) {
throw new WorkflowRunException(
'Workflow run not found',
WorkflowRunExceptionCode.WORKFLOW_RUN_NOT_FOUND,
);
}
return workflowRun;
}
}

View File

@ -45,7 +45,6 @@ export class WorkflowRunnerWorkspaceService {
RunWorkflowJob.name,
{
workspaceId,
workflowVersionId,
payload: payload,
workflowRunId,
},
@ -53,4 +52,23 @@ export class WorkflowRunnerWorkspaceService {
return { workflowRunId };
}
async resume({
workspaceId,
workflowRunId,
lastExecutedStepId,
}: {
workspaceId: string;
workflowRunId: string;
lastExecutedStepId: string;
}) {
await this.messageQueueService.add<RunWorkflowJobData>(
RunWorkflowJob.name,
{
workspaceId,
workflowRunId,
lastExecutedStepId,
},
);
}
}