diff --git a/packages/twenty-server/src/engine/core-modules/workflow/dtos/submit-form-step-input.dto.ts b/packages/twenty-server/src/engine/core-modules/workflow/dtos/submit-form-step-input.dto.ts new file mode 100644 index 000000000..212a09351 --- /dev/null +++ b/packages/twenty-server/src/engine/core-modules/workflow/dtos/submit-form-step-input.dto.ts @@ -0,0 +1,24 @@ +import { Field, InputType } from '@nestjs/graphql'; + +import graphqlTypeJson from 'graphql-type-json'; + +@InputType() +export class SubmitFormStepInput { + @Field(() => String, { + description: 'Workflow version ID', + nullable: false, + }) + stepId: string; + + @Field(() => String, { + description: 'Workflow run ID', + nullable: false, + }) + workflowRunId: string; + + @Field(() => graphqlTypeJson, { + description: 'Form response in JSON format', + nullable: false, + }) + response: JSON; +} diff --git a/packages/twenty-server/src/engine/core-modules/workflow/resolvers/workflow-version-step.resolver.ts b/packages/twenty-server/src/engine/core-modules/workflow/resolvers/workflow-version-step.resolver.ts index 15481cc8a..f6a010448 100644 --- a/packages/twenty-server/src/engine/core-modules/workflow/resolvers/workflow-version-step.resolver.ts +++ b/packages/twenty-server/src/engine/core-modules/workflow/resolvers/workflow-version-step.resolver.ts @@ -3,13 +3,14 @@ import { Args, Mutation, Resolver } from '@nestjs/graphql'; import { CreateWorkflowVersionStepInput } from 'src/engine/core-modules/workflow/dtos/create-workflow-version-step-input.dto'; import { DeleteWorkflowVersionStepInput } from 'src/engine/core-modules/workflow/dtos/delete-workflow-version-step-input.dto'; +import { SubmitFormStepInput } from 'src/engine/core-modules/workflow/dtos/submit-form-step-input.dto'; import { UpdateWorkflowVersionStepInput } from 'src/engine/core-modules/workflow/dtos/update-workflow-version-step-input.dto'; import { WorkflowActionDTO } from 'src/engine/core-modules/workflow/dtos/workflow-step.dto'; import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; import { AuthWorkspace } from 'src/engine/decorators/auth/auth-workspace.decorator'; import { UserAuthGuard } from 'src/engine/guards/user-auth.guard'; import { WorkspaceAuthGuard } from 'src/engine/guards/workspace-auth.guard'; -import { WorkflowVersionStepWorkspaceService } from 'src/modules/workflow/workflow-builder/workflow-version/workflow-step/workflow-version-step.workspace-service'; +import { WorkflowVersionStepWorkspaceService } from 'src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.workspace-service'; @Resolver() @UseGuards(WorkspaceAuthGuard, UserAuthGuard) @@ -56,4 +57,20 @@ export class WorkflowVersionStepResolver { stepId, }); } + + @Mutation(() => Boolean) + async submitFormStep( + @AuthWorkspace() { id: workspaceId }: Workspace, + @Args('input') + { stepId, workflowRunId, response }: SubmitFormStepInput, + ) { + await this.workflowVersionStepWorkspaceService.submitFormStep({ + workspaceId, + stepId, + workflowRunId, + response, + }); + + return true; + } } diff --git a/packages/twenty-server/src/modules/workflow/common/workspace-services/workflow-common.workspace-service.ts b/packages/twenty-server/src/modules/workflow/common/workspace-services/workflow-common.workspace-service.ts index 3adfcf8cf..f2419f15b 100644 --- a/packages/twenty-server/src/modules/workflow/common/workspace-services/workflow-common.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/common/workspace-services/workflow-common.workspace-service.ts @@ -1,16 +1,16 @@ import { Injectable } from '@nestjs/common'; +import { ServerlessFunctionService } from 'src/engine/metadata-modules/serverless-function/serverless-function.service'; +import { WorkspaceRepository } from 'src/engine/twenty-orm/repository/workspace.repository'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-event-listener.workspace-entity'; import { WorkflowRunWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; +import { WorkflowActionType } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; import { WorkflowTriggerException, WorkflowTriggerExceptionCode, } from 'src/modules/workflow/workflow-trigger/exceptions/workflow-trigger.exception'; -import { WorkspaceRepository } from 'src/engine/twenty-orm/repository/workspace.repository'; -import { WorkflowActionType } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; -import { ServerlessFunctionService } from 'src/engine/metadata-modules/serverless-function/serverless-function.service'; @Injectable() export class WorkflowCommonWorkspaceService { diff --git a/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-builder.module.ts b/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-builder.module.ts index 7bc33a261..dc9ce358a 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-builder.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-builder.module.ts @@ -4,7 +4,7 @@ import { NestjsQueryTypeOrmModule } from '@ptc-org/nestjs-query-typeorm'; import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity'; import { WorkflowSchemaModule } from 'src/modules/workflow/workflow-builder/workflow-schema/workflow-schema.module'; -import { WorkflowVersionStepModule } from 'src/modules/workflow/workflow-builder/workflow-version/workflow-step/workflow-version-step.module'; +import { WorkflowVersionStepModule } from 'src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.module'; import { WorkflowVersionModule } from 'src/modules/workflow/workflow-builder/workflow-version/workflow-version.module'; @Module({ diff --git a/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-version/workflow-step/workflow-version-step.module.ts b/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.module.ts similarity index 71% rename from packages/twenty-server/src/modules/workflow/workflow-builder/workflow-version/workflow-step/workflow-version-step.module.ts rename to packages/twenty-server/src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.module.ts index 494e21bab..6e4d92d4f 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-version/workflow-step/workflow-version-step.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.module.ts @@ -5,12 +5,16 @@ import { NestjsQueryTypeOrmModule } from '@ptc-org/nestjs-query-typeorm'; import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity'; import { ServerlessFunctionModule } from 'src/engine/metadata-modules/serverless-function/serverless-function.module'; import { WorkflowSchemaModule } from 'src/modules/workflow/workflow-builder/workflow-schema/workflow-schema.module'; -import { WorkflowVersionStepWorkspaceService } from 'src/modules/workflow/workflow-builder/workflow-version/workflow-step/workflow-version-step.workspace-service'; +import { WorkflowVersionStepWorkspaceService } from 'src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.workspace-service'; +import { WorkflowRunModule } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.module'; +import { WorkflowRunnerModule } from 'src/modules/workflow/workflow-runner/workflow-runner.module'; @Module({ imports: [ WorkflowSchemaModule, ServerlessFunctionModule, + WorkflowRunnerModule, + WorkflowRunModule, NestjsQueryTypeOrmModule.forFeature([ObjectMetadataEntity], 'metadata'), ], providers: [WorkflowVersionStepWorkspaceService], diff --git a/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-version/workflow-step/workflow-version-step.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.workspace-service.ts similarity index 88% rename from packages/twenty-server/src/modules/workflow/workflow-builder/workflow-version/workflow-step/workflow-version-step.workspace-service.ts rename to packages/twenty-server/src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.workspace-service.ts index a881c3850..b1e282a13 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-version/workflow-step/workflow-version-step.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.workspace-service.ts @@ -14,6 +14,7 @@ import { WorkflowVersionStepException, WorkflowVersionStepExceptionCode, } from 'src/modules/workflow/common/exceptions/workflow-version-step.exception'; +import { StepOutput } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; import { WorkflowSchemaWorkspaceService } from 'src/modules/workflow/workflow-builder/workflow-schema/workflow-schema.workspace-service'; import { BaseWorkflowActionSettings } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-settings.type'; @@ -21,6 +22,8 @@ import { WorkflowAction, WorkflowActionType, } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; +import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service'; +import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service'; const TRIGGER_STEP_ID = 'trigger'; @@ -44,6 +47,8 @@ export class WorkflowVersionStepWorkspaceService { private readonly serverlessFunctionService: ServerlessFunctionService, @InjectRepository(ObjectMetadataEntity, 'metadata') private readonly objectMetadataRepository: Repository, + private readonly workflowRunWorkspaceService: WorkflowRunWorkspaceService, + private readonly workflowRunnerWorkspaceService: WorkflowRunnerWorkspaceService, ) {} async createWorkflowVersionStep({ @@ -237,6 +242,58 @@ export class WorkflowVersionStepWorkspaceService { } } + async submitFormStep({ + workspaceId, + stepId, + workflowRunId, + response, + }: { + workspaceId: string; + stepId: string; + workflowRunId: string; + response: object; + }) { + const workflowRun = + await this.workflowRunWorkspaceService.getWorkflowRunOrFail( + workflowRunId, + ); + + const step = workflowRun.output?.flow?.steps?.find( + (step) => step.id === stepId, + ); + + if (!isDefined(step)) { + throw new WorkflowVersionStepException( + 'Step not found', + WorkflowVersionStepExceptionCode.NOT_FOUND, + ); + } + + const newStepOutput: StepOutput = { + id: stepId, + output: { + result: response, + }, + }; + + const updatedContext = { + ...workflowRun.context, + [stepId]: response, + }; + + await this.workflowRunWorkspaceService.saveWorkflowRunState({ + workflowRunId, + stepOutput: newStepOutput, + context: updatedContext, + }); + + await this.workflowRunnerWorkspaceService.resume({ + workspaceId, + workflowRunId, + lastExecutedStepId: stepId, + }); + } + private async enrichOutputSchema({ step, workspaceId, diff --git a/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-version/workflow-version.module.ts b/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-version/workflow-version.module.ts index 48de4d2e6..6fc67af17 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-version/workflow-version.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-version/workflow-version.module.ts @@ -5,8 +5,7 @@ import { NestjsQueryTypeOrmModule } from '@ptc-org/nestjs-query-typeorm'; import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity'; import { ServerlessFunctionModule } from 'src/engine/metadata-modules/serverless-function/serverless-function.module'; import { WorkflowSchemaModule } from 'src/modules/workflow/workflow-builder/workflow-schema/workflow-schema.module'; -import { WorkflowVersionStepModule } from 'src/modules/workflow/workflow-builder/workflow-version/workflow-step/workflow-version-step.module'; -import { WorkflowVersionStepWorkspaceService } from 'src/modules/workflow/workflow-builder/workflow-version/workflow-step/workflow-version-step.workspace-service'; +import { WorkflowVersionStepModule } from 'src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.module'; import { WorkflowVersionWorkspaceService } from 'src/modules/workflow/workflow-builder/workflow-version/workflow-version.workspace-service'; @Module({ @@ -14,15 +13,10 @@ import { WorkflowVersionWorkspaceService } from 'src/modules/workflow/workflow-b WorkflowVersionStepModule, WorkflowSchemaModule, ServerlessFunctionModule, + WorkflowVersionStepModule, NestjsQueryTypeOrmModule.forFeature([ObjectMetadataEntity], 'metadata'), ], - providers: [ - WorkflowVersionWorkspaceService, - WorkflowVersionStepWorkspaceService, - ], - exports: [ - WorkflowVersionWorkspaceService, - WorkflowVersionStepWorkspaceService, - ], + providers: [WorkflowVersionWorkspaceService], + exports: [WorkflowVersionWorkspaceService], }) export class WorkflowVersionModule {} diff --git a/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-version/workflow-version.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-version/workflow-version.workspace-service.ts index 945326a53..0e773d480 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-version/workflow-version.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-version/workflow-version.workspace-service.ts @@ -19,7 +19,7 @@ import { import { assertWorkflowVersionHasSteps } from 'src/modules/workflow/common/utils/assert-workflow-version-has-steps'; import { assertWorkflowVersionIsDraft } from 'src/modules/workflow/common/utils/assert-workflow-version-is-draft.util'; import { assertWorkflowVersionTriggerIsDefined } from 'src/modules/workflow/common/utils/assert-workflow-version-trigger-is-defined.util'; -import { WorkflowVersionStepWorkspaceService } from 'src/modules/workflow/workflow-builder/workflow-version/workflow-step/workflow-version-step.workspace-service'; +import { WorkflowVersionStepWorkspaceService } from 'src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.workspace-service'; import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; @Injectable() diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts index a44d8c74c..75ae7eb2c 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts @@ -5,8 +5,10 @@ import { Process } from 'src/engine/core-modules/message-queue/decorators/proces import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; import { ThrottlerService } from 'src/engine/core-modules/throttler/throttler.service'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workspace-services/workflow-common.workspace-service'; +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; import { WorkflowExecutorWorkspaceService } from 'src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service'; import { WorkflowRunException, @@ -16,9 +18,9 @@ import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runne export type RunWorkflowJobData = { workspaceId: string; - workflowVersionId: string; workflowRunId: string; - payload: object; + payload?: object; + lastExecutedStepId?: string; }; @Processor({ queueName: MessageQueue.workflowQueue, scope: Scope.REQUEST }) @@ -29,61 +31,27 @@ export class RunWorkflowJob { private readonly workflowRunWorkspaceService: WorkflowRunWorkspaceService, private readonly throttlerService: ThrottlerService, private readonly environmentService: EnvironmentService, + private readonly twentyORMManager: TwentyORMManager, ) {} @Process(RunWorkflowJob.name) async handle({ - workflowVersionId, workflowRunId, payload, + lastExecutedStepId, }: RunWorkflowJobData): Promise { - const context = { - trigger: payload, - }; - try { - const workflowVersion = - await this.workflowCommonWorkspaceService.getWorkflowVersionOrFail( - workflowVersionId, - ); - - if (!workflowVersion.trigger || !workflowVersion.steps) { - throw new WorkflowRunException( - 'Workflow version has no trigger or steps', - WorkflowRunExceptionCode.WORKFLOW_RUN_INVALID, - ); - } - - await this.workflowRunWorkspaceService.startWorkflowRun({ - workflowRunId, - context, - output: { - flow: { - trigger: workflowVersion.trigger, - steps: workflowVersion.steps, - }, - }, - }); - - await this.throttleExecution(workflowVersion.workflowId); - - const { error, pendingEvent } = - await this.workflowExecutorWorkspaceService.execute({ + if (lastExecutedStepId) { + await this.resumeWorkflowExecution({ workflowRunId, - currentStepIndex: 0, - steps: workflowVersion.steps, - context, + lastExecutedStepId, + }); + } else { + await this.startWorkflowExecution({ + workflowRunId, + payload: payload ?? {}, }); - - if (pendingEvent) { - return; } - - await this.workflowRunWorkspaceService.endWorkflowRun({ - workflowRunId, - status: error ? WorkflowRunStatus.FAILED : WorkflowRunStatus.COMPLETED, - error, - }); } catch (error) { await this.workflowRunWorkspaceService.endWorkflowRun({ workflowRunId, @@ -93,6 +61,123 @@ export class RunWorkflowJob { } } + private async startWorkflowExecution({ + workflowRunId, + payload, + }: { + workflowRunId: string; + payload: object; + }): Promise { + const context = { + trigger: payload, + }; + + const workflowRun = + await this.workflowRunWorkspaceService.getWorkflowRunOrFail( + workflowRunId, + ); + + const workflowVersion = + await this.workflowCommonWorkspaceService.getWorkflowVersionOrFail( + workflowRun.workflowVersionId, + ); + + if (!workflowVersion.trigger || !workflowVersion.steps) { + throw new WorkflowRunException( + 'Workflow version has no trigger or steps', + WorkflowRunExceptionCode.WORKFLOW_RUN_INVALID, + ); + } + + await this.workflowRunWorkspaceService.startWorkflowRun({ + workflowRunId, + context, + output: { + flow: { + trigger: workflowVersion.trigger, + steps: workflowVersion.steps, + }, + }, + }); + + await this.throttleExecution(workflowVersion.workflowId); + + await this.executeWorkflow({ + workflowRunId, + currentStepIndex: 0, + steps: workflowVersion.steps, + context, + }); + } + + private async resumeWorkflowExecution({ + workflowRunId, + lastExecutedStepId, + }: { + workflowRunId: string; + lastExecutedStepId: string; + }): Promise { + const workflowRun = + await this.workflowRunWorkspaceService.getWorkflowRunOrFail( + workflowRunId, + ); + + if (workflowRun.status !== WorkflowRunStatus.RUNNING) { + throw new WorkflowRunException( + 'Workflow is not running', + WorkflowRunExceptionCode.WORKFLOW_RUN_INVALID, + ); + } + + const lastExecutedStepIndex = workflowRun.output?.flow?.steps?.findIndex( + (step) => step.id === lastExecutedStepId, + ); + + if (lastExecutedStepIndex === undefined) { + throw new WorkflowRunException( + 'Last executed step not found', + WorkflowRunExceptionCode.INVALID_INPUT, + ); + } + + await this.executeWorkflow({ + workflowRunId, + currentStepIndex: lastExecutedStepIndex + 1, + steps: workflowRun.output?.flow?.steps ?? [], + context: workflowRun.context ?? {}, + }); + } + + private async executeWorkflow({ + workflowRunId, + currentStepIndex, + steps, + context, + }: { + workflowRunId: string; + currentStepIndex: number; + steps: WorkflowAction[]; + context: Record; + }) { + const { error, pendingEvent } = + await this.workflowExecutorWorkspaceService.execute({ + workflowRunId, + currentStepIndex, + steps, + context, + }); + + if (pendingEvent) { + return; + } + + await this.workflowRunWorkspaceService.endWorkflowRun({ + workflowRunId, + status: error ? WorkflowRunStatus.FAILED : WorkflowRunStatus.COMPLETED, + error, + }); + } + private async throttleExecution(workflowId: string) { try { await this.throttlerService.throttle( diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts index ce6103516..8ddb94086 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts @@ -163,4 +163,26 @@ export class WorkflowRunWorkspaceService { context, }); } + + async getWorkflowRunOrFail( + workflowRunId: string, + ): Promise { + const workflowRunRepository = + await this.twentyORMManager.getRepository( + 'workflowRun', + ); + + const workflowRun = await workflowRunRepository.findOne({ + where: { id: workflowRunId }, + }); + + if (!workflowRun) { + throw new WorkflowRunException( + 'Workflow run not found', + WorkflowRunExceptionCode.WORKFLOW_RUN_NOT_FOUND, + ); + } + + return workflowRun; + } } diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service.ts index db7d3f484..971ad2156 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service.ts @@ -45,7 +45,6 @@ export class WorkflowRunnerWorkspaceService { RunWorkflowJob.name, { workspaceId, - workflowVersionId, payload: payload, workflowRunId, }, @@ -53,4 +52,23 @@ export class WorkflowRunnerWorkspaceService { return { workflowRunId }; } + + async resume({ + workspaceId, + workflowRunId, + lastExecutedStepId, + }: { + workspaceId: string; + workflowRunId: string; + lastExecutedStepId: string; + }) { + await this.messageQueueService.add( + RunWorkflowJob.name, + { + workspaceId, + workflowRunId, + lastExecutedStepId, + }, + ); + } }