From e70e69cf9482692f658d67f259b601c8e5ffec07 Mon Sep 17 00:00:00 2001 From: Thomas Trompette Date: Mon, 10 Feb 2025 11:27:15 +0100 Subject: [PATCH] Store output step by step (#10101) - add context field - store it with the output after each step execution --- .../constants/standard-field-ids.ts | 1 + .../workflow-run.workspace-entity.ts | 10 +++ .../workflow-executor.module.ts | 2 + .../workflow-executor.workspace-service.ts | 74 ++++++++++++++----- .../workflow-runner/jobs/run-workflow.job.ts | 51 +++++++------ .../workflow-run/workflow-run.module.ts | 11 +++ .../workflow-run.workspace-service.ts | 67 +++++++++++++++-- .../workflow-runner/workflow-runner.module.ts | 9 +-- .../workflow-runner.workspace-service.ts | 8 +- 9 files changed, 171 insertions(+), 62 deletions(-) create mode 100644 packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.module.ts rename packages/twenty-server/src/modules/workflow/workflow-runner/{workspace-services => workflow-run}/workflow-run.workspace-service.ts (71%) diff --git a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts index 6cbff8dfe..d66b58726 100644 --- a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts +++ b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts @@ -479,6 +479,7 @@ export const WORKFLOW_RUN_STANDARD_FIELD_IDS = { position: '20202020-7802-4c40-ae89-1f506fe3365c', createdBy: '20202020-6007-401a-8aa5-e6f38581a6f3', output: '20202020-7be4-4db2-8ac6-3ff0d740843d', + context: '20202020-189c-478a-b867-d72feaf5926a', favorites: '20202020-4baf-4604-b899-2f7fcfbbf90d', timelineActivities: '20202020-af4d-4eb0-babc-eb960a45b356', }; diff --git a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts index 539a1b76c..f426e485b 100644 --- a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts +++ b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts @@ -140,6 +140,16 @@ export class WorkflowRunWorkspaceEntity extends BaseWorkspaceEntity { @WorkspaceIsNullable() output: WorkflowRunOutput | null; + @WorkspaceField({ + standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.context, + type: FieldMetadataType.RAW_JSON, + label: msg`Context`, + description: msg`Context`, + icon: 'IconHierarchy2', + }) + @WorkspaceIsNullable() + context: Record | null; + @WorkspaceField({ standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.position, type: FieldMetadataType.POSITION, diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-executor.module.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-executor.module.ts index 08dea7f33..e85d73862 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-executor.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-executor.module.ts @@ -7,6 +7,7 @@ import { CodeActionModule } from 'src/modules/workflow/workflow-executor/workflo import { SendEmailActionModule } from 'src/modules/workflow/workflow-executor/workflow-actions/mail-sender/send-email-action.module'; import { RecordCRUDActionModule } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/record-crud-action.module'; import { WorkflowExecutorWorkspaceService } from 'src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service'; +import { WorkflowRunModule } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.module'; @Module({ imports: [ @@ -14,6 +15,7 @@ import { WorkflowExecutorWorkspaceService } from 'src/modules/workflow/workflow- CodeActionModule, SendEmailActionModule, RecordCRUDActionModule, + WorkflowRunModule, ], providers: [ WorkflowExecutorWorkspaceService, diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts index 7481b5abd..0a12fbc65 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts @@ -13,6 +13,7 @@ import { WorkflowActionFactory } from 'src/modules/workflow/workflow-executor/fa import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util'; import { WorkflowActionResult } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-result.type'; import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; +import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service'; const MAX_RETRIES_ON_FAILURE = 3; @@ -28,23 +29,26 @@ export class WorkflowExecutorWorkspaceService { private readonly workflowActionFactory: WorkflowActionFactory, private readonly workspaceEventEmitter: WorkspaceEventEmitter, private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory, + private readonly workflowRunWorkspaceService: WorkflowRunWorkspaceService, ) {} async execute({ currentStepIndex, steps, context, - output, + workflowExecutorOutput, attemptCount = 1, + workflowRunId, }: { currentStepIndex: number; steps: WorkflowAction[]; - output: WorkflowExecutorOutput; + workflowExecutorOutput: WorkflowExecutorOutput; context: Record; attemptCount?: number; + workflowRunId: string; }): Promise { if (currentStepIndex >= steps.length) { - return { ...output, status: WorkflowRunStatus.COMPLETED }; + return { ...workflowExecutorOutput, status: WorkflowRunStatus.COMPLETED }; } const step = steps[currentStepIndex]; @@ -67,7 +71,7 @@ export class WorkflowExecutorWorkspaceService { }; } - const stepOutput = output.steps[step.id]; + const stepOutput = workflowExecutorOutput.steps[step.id]; const error = result.error?.errorMessage ?? @@ -91,32 +95,54 @@ export class WorkflowExecutorWorkspaceService { ], }; - const updatedOutput = { - ...output, - steps: { - ...output.steps, - [step.id]: updatedStepOutput, - }, + const updatedOutputSteps = { + ...workflowExecutorOutput.steps, + [step.id]: updatedStepOutput, + }; + + const updatedWorkflowExecutorOutput = { + ...workflowExecutorOutput, + steps: updatedOutputSteps, }; if (result.result) { + const updatedContext = { + ...context, + [step.id]: result.result, + }; + + await this.workflowRunWorkspaceService.saveWorkflowRunState({ + workflowRunId, + output: { + steps: updatedOutputSteps, + }, + context: updatedContext, + }); + return await this.execute({ + workflowRunId, currentStepIndex: currentStepIndex + 1, steps, - context: { - ...context, - [step.id]: result.result, - }, - output: updatedOutput, + context: updatedContext, + workflowExecutorOutput: updatedWorkflowExecutorOutput, }); } if (step.settings.errorHandlingOptions.continueOnFailure.value) { + await this.workflowRunWorkspaceService.saveWorkflowRunState({ + workflowRunId, + output: { + steps: updatedOutputSteps, + }, + context, + }); + return await this.execute({ + workflowRunId, currentStepIndex: currentStepIndex + 1, steps, context, - output: updatedOutput, + workflowExecutorOutput: updatedWorkflowExecutorOutput, }); } @@ -125,15 +151,27 @@ export class WorkflowExecutorWorkspaceService { attemptCount < MAX_RETRIES_ON_FAILURE ) { return await this.execute({ + workflowRunId, currentStepIndex, steps, context, - output: updatedOutput, + workflowExecutorOutput: updatedWorkflowExecutorOutput, attemptCount: attemptCount + 1, }); } - return { ...updatedOutput, status: WorkflowRunStatus.FAILED }; + await this.workflowRunWorkspaceService.saveWorkflowRunState({ + workflowRunId, + output: { + steps: updatedOutputSteps, + }, + context, + }); + + return { + ...updatedWorkflowExecutorOutput, + status: WorkflowRunStatus.FAILED, + }; } private sendWorkflowNodeRunEvent() { diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts index cb8fd1f52..b9b9a0ab0 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts @@ -12,7 +12,7 @@ import { WorkflowRunException, WorkflowRunExceptionCode, } from 'src/modules/workflow/workflow-runner/exceptions/workflow-run.exception'; -import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-run.workspace-service'; +import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service'; export type RunWorkflowJobData = { workspaceId: string; @@ -38,7 +38,14 @@ export class RunWorkflowJob { workflowRunId, payload, }: RunWorkflowJobData): Promise { - await this.workflowRunWorkspaceService.startWorkflowRun(workflowRunId); + const context = { + trigger: payload, + }; + + await this.workflowRunWorkspaceService.startWorkflowRun({ + workflowRunId, + context, + }); try { const workflowVersion = @@ -48,35 +55,27 @@ export class RunWorkflowJob { await this.throttleExecution(workflowVersion.workflowId); - const { steps, status } = - await this.workflowExecutorWorkspaceService.execute({ - currentStepIndex: 0, - steps: workflowVersion.steps || [], - context: { - trigger: payload, - }, - output: { - steps: {}, - status: WorkflowRunStatus.RUNNING, - }, - }); + const { status } = await this.workflowExecutorWorkspaceService.execute({ + workflowRunId, + currentStepIndex: 0, + steps: workflowVersion.steps || [], + context, + workflowExecutorOutput: { + steps: {}, + status: WorkflowRunStatus.RUNNING, + }, + }); - await this.workflowRunWorkspaceService.endWorkflowRun( + await this.workflowRunWorkspaceService.endWorkflowRun({ workflowRunId, status, - { - steps, - }, - ); + }); } catch (error) { - await this.workflowRunWorkspaceService.endWorkflowRun( + await this.workflowRunWorkspaceService.endWorkflowRun({ workflowRunId, - WorkflowRunStatus.FAILED, - { - steps: {}, - error: error.message, - }, - ); + status: WorkflowRunStatus.FAILED, + error: error.message, + }); } } diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.module.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.module.ts new file mode 100644 index 000000000..55a987bc9 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.module.ts @@ -0,0 +1,11 @@ +import { Module } from '@nestjs/common'; + +import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module'; +import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service'; + +@Module({ + imports: [WorkflowCommonModule], + providers: [WorkflowRunWorkspaceService], + exports: [WorkflowRunWorkspaceService], +}) +export class WorkflowRunModule {} diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workspace-services/workflow-run.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts similarity index 71% rename from packages/twenty-server/src/modules/workflow/workflow-runner/workspace-services/workflow-run.workspace-service.ts rename to packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts index 388c9aba7..93d09b67a 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workspace-services/workflow-run.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts @@ -20,7 +20,13 @@ export class WorkflowRunWorkspaceService { private readonly workflowCommonWorkspaceService: WorkflowCommonWorkspaceService, ) {} - async createWorkflowRun(workflowVersionId: string, createdBy: ActorMetadata) { + async createWorkflowRun({ + workflowVersionId, + createdBy, + }: { + workflowVersionId: string; + createdBy: ActorMetadata; + }) { const workflowRunRepository = await this.twentyORMManager.getRepository( 'workflowRun', @@ -42,7 +48,13 @@ export class WorkflowRunWorkspaceService { ).id; } - async startWorkflowRun(workflowRunId: string) { + async startWorkflowRun({ + workflowRunId, + context, + }: { + workflowRunId: string; + context: Record; + }) { const workflowRunRepository = await this.twentyORMManager.getRepository( 'workflowRun', @@ -69,14 +81,19 @@ export class WorkflowRunWorkspaceService { return workflowRunRepository.update(workflowRunToUpdate.id, { status: WorkflowRunStatus.RUNNING, startedAt: new Date().toISOString(), + context, }); } - async endWorkflowRun( - workflowRunId: string, - status: WorkflowRunStatus, - output: WorkflowRunOutput, - ) { + async endWorkflowRun({ + workflowRunId, + status, + error, + }: { + workflowRunId: string; + status: WorkflowRunStatus; + error?: string; + }) { const workflowRunRepository = await this.twentyORMManager.getRepository( 'workflowRun', @@ -102,8 +119,42 @@ export class WorkflowRunWorkspaceService { return workflowRunRepository.update(workflowRunToUpdate.id, { status, - output, endedAt: new Date().toISOString(), + output: { + ...(workflowRunToUpdate.output ?? {}), + error, + }, + }); + } + + async saveWorkflowRunState({ + workflowRunId, + output, + context, + }: { + workflowRunId: string; + output: WorkflowRunOutput; + context: Record; + }) { + const workflowRunRepository = + await this.twentyORMManager.getRepository( + 'workflowRun', + ); + + const workflowRunToUpdate = await workflowRunRepository.findOneBy({ + id: workflowRunId, + }); + + if (!workflowRunToUpdate) { + throw new WorkflowRunException( + 'No workflow run to save', + WorkflowRunExceptionCode.WORKFLOW_RUN_NOT_FOUND, + ); + } + + return workflowRunRepository.update(workflowRunId, { + output, + context, }); } } diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.module.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.module.ts index 5776d893c..bb83a12d1 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.module.ts @@ -5,7 +5,7 @@ import { ThrottlerModule } from 'src/engine/core-modules/throttler/throttler.mod import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module'; import { WorkflowExecutorModule } from 'src/modules/workflow/workflow-executor/workflow-executor.module'; import { RunWorkflowJob } from 'src/modules/workflow/workflow-runner/jobs/run-workflow.job'; -import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-run.workspace-service'; +import { WorkflowRunModule } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.module'; import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service'; @Module({ @@ -14,12 +14,9 @@ import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-ru WorkflowExecutorModule, ThrottlerModule, BillingModule, + WorkflowRunModule, ], - providers: [ - WorkflowRunnerWorkspaceService, - WorkflowRunWorkspaceService, - RunWorkflowJob, - ], + providers: [WorkflowRunnerWorkspaceService, RunWorkflowJob], exports: [WorkflowRunnerWorkspaceService], }) export class WorkflowRunnerModule {} diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service.ts index 3a4af236e..db7d3f484 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service.ts @@ -9,7 +9,7 @@ import { RunWorkflowJob, RunWorkflowJobData, } from 'src/modules/workflow/workflow-runner/jobs/run-workflow.job'; -import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-run.workspace-service'; +import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service'; @Injectable() export class WorkflowRunnerWorkspaceService { @@ -36,10 +36,10 @@ export class WorkflowRunnerWorkspaceService { ); } const workflowRunId = - await this.workflowRunWorkspaceService.createWorkflowRun( + await this.workflowRunWorkspaceService.createWorkflowRun({ workflowVersionId, - source, - ); + createdBy: source, + }); await this.messageQueueService.add( RunWorkflowJob.name,