From 92576aec0f9edc7cd1ab527c08b27386aaf660ce Mon Sep 17 00:00:00 2001 From: martmull Date: Fri, 4 Jul 2025 10:09:54 +0200 Subject: [PATCH] Fix update context and stepOutput when step running (#13030) add a function to only set stepStatus, and not context or output. Useful when setting step state to RUNNING --- ...orkflow-executor.workspace-service.spec.ts | 94 +++++++++---------- .../workflow-executor.workspace-service.ts | 8 +- .../workflow-run.workspace-service.ts | 50 ++++++++++ 3 files changed, 97 insertions(+), 55 deletions(-) diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/__tests__/workflow-executor.workspace-service.spec.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/__tests__/workflow-executor.workspace-service.spec.ts index 77860c82a..f38520e67 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/__tests__/workflow-executor.workspace-service.spec.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/__tests__/workflow-executor.workspace-service.spec.ts @@ -39,6 +39,7 @@ describe('WorkflowExecutorWorkspaceService', () => { const mockWorkflowRunWorkspaceService = { saveWorkflowRunState: jest.fn(), + updateWorkflowRunStepStatus: jest.fn(), }; const mockBillingService = { @@ -171,26 +172,22 @@ describe('WorkflowExecutorWorkspaceService', () => { 'workspace-id', ); expect( - workflowRunWorkspaceService.saveWorkflowRunState, - ).toHaveBeenCalledTimes(4); + workflowRunWorkspaceService.updateWorkflowRunStepStatus, + ).toHaveBeenCalledTimes(2); expect( - workflowRunWorkspaceService.saveWorkflowRunState, + workflowRunWorkspaceService.updateWorkflowRunStepStatus, ).toHaveBeenCalledWith({ workflowRunId: mockWorkflowRunId, - stepOutput: { - id: 'step-1', - output: {}, - }, - context: { - data: 'some-data', - }, + stepId: 'step-1', workspaceId: 'workspace-id', stepStatus: StepStatus.RUNNING, }); - expect( workflowRunWorkspaceService.saveWorkflowRunState, - ).toHaveBeenNthCalledWith(2, { + ).toHaveBeenCalledTimes(2); + expect( + workflowRunWorkspaceService.saveWorkflowRunState, + ).toHaveBeenCalledWith({ workflowRunId: mockWorkflowRunId, stepOutput: { id: 'step-1', @@ -231,23 +228,22 @@ describe('WorkflowExecutorWorkspaceService', () => { }); expect(workspaceEventEmitter.emitCustomBatchEvent).not.toHaveBeenCalled(); expect( - workflowRunWorkspaceService.saveWorkflowRunState, - ).toHaveBeenCalledTimes(2); + workflowRunWorkspaceService.updateWorkflowRunStepStatus, + ).toHaveBeenCalledTimes(1); expect( - workflowRunWorkspaceService.saveWorkflowRunState, + workflowRunWorkspaceService.updateWorkflowRunStepStatus, ).toHaveBeenCalledWith({ workflowRunId: mockWorkflowRunId, - stepOutput: { - id: 'step-1', - output: {}, - }, - context: mockContext, + stepId: 'step-1', workspaceId: 'workspace-id', stepStatus: StepStatus.RUNNING, }); expect( workflowRunWorkspaceService.saveWorkflowRunState, - ).toHaveBeenNthCalledWith(2, { + ).toHaveBeenCalledTimes(1); + expect( + workflowRunWorkspaceService.saveWorkflowRunState, + ).toHaveBeenCalledWith({ workflowRunId: mockWorkflowRunId, stepOutput: { id: 'step-1', @@ -277,23 +273,22 @@ describe('WorkflowExecutorWorkspaceService', () => { expect(result).toEqual(mockPendingEvent); expect( - workflowRunWorkspaceService.saveWorkflowRunState, - ).toHaveBeenCalledTimes(2); + workflowRunWorkspaceService.updateWorkflowRunStepStatus, + ).toHaveBeenCalledTimes(1); expect( - workflowRunWorkspaceService.saveWorkflowRunState, + workflowRunWorkspaceService.updateWorkflowRunStepStatus, ).toHaveBeenCalledWith({ workflowRunId: mockWorkflowRunId, - stepOutput: { - id: 'step-1', - output: {}, - }, - context: mockContext, + stepId: 'step-1', workspaceId: 'workspace-id', stepStatus: StepStatus.RUNNING, }); expect( workflowRunWorkspaceService.saveWorkflowRunState, - ).toHaveBeenNthCalledWith(2, { + ).toHaveBeenCalledTimes(1); + expect( + workflowRunWorkspaceService.saveWorkflowRunState, + ).toHaveBeenCalledWith({ workflowRunId: mockWorkflowRunId, stepOutput: { id: 'step-1', @@ -347,26 +342,25 @@ describe('WorkflowExecutorWorkspaceService', () => { }); expect( - workflowRunWorkspaceService.saveWorkflowRunState, - ).toHaveBeenCalledTimes(4); + workflowRunWorkspaceService.updateWorkflowRunStepStatus, + ).toHaveBeenCalledTimes(2); - // execute first step expect( - workflowRunWorkspaceService.saveWorkflowRunState, + workflowRunWorkspaceService.updateWorkflowRunStepStatus, ).toHaveBeenCalledWith({ workflowRunId: mockWorkflowRunId, - stepOutput: { - id: 'step-1', - output: {}, - }, - context: mockContext, + stepId: 'step-1', workspaceId: 'workspace-id', stepStatus: StepStatus.RUNNING, }); expect( workflowRunWorkspaceService.saveWorkflowRunState, - ).toHaveBeenNthCalledWith(2, { + ).toHaveBeenCalledTimes(2); + + expect( + workflowRunWorkspaceService.saveWorkflowRunState, + ).toHaveBeenCalledWith({ workflowRunId: mockWorkflowRunId, stepOutput: { id: 'step-1', @@ -452,23 +446,22 @@ describe('WorkflowExecutorWorkspaceService', () => { // Should not retry anymore expect(workflowExecutorFactory.get).toHaveBeenCalledTimes(1); expect( - workflowRunWorkspaceService.saveWorkflowRunState, - ).toHaveBeenCalledTimes(2); + workflowRunWorkspaceService.updateWorkflowRunStepStatus, + ).toHaveBeenCalledTimes(1); expect( - workflowRunWorkspaceService.saveWorkflowRunState, + workflowRunWorkspaceService.updateWorkflowRunStepStatus, ).toHaveBeenCalledWith({ workflowRunId: mockWorkflowRunId, - stepOutput: { - id: 'step-1', - output: {}, - }, - context: mockContext, + stepId: 'step-1', workspaceId: 'workspace-id', stepStatus: StepStatus.RUNNING, }); expect( workflowRunWorkspaceService.saveWorkflowRunState, - ).toHaveBeenNthCalledWith(2, { + ).toHaveBeenCalledTimes(1); + expect( + workflowRunWorkspaceService.saveWorkflowRunState, + ).toHaveBeenCalledWith({ workflowRunId: mockWorkflowRunId, stepOutput: { id: 'step-1', @@ -496,6 +489,9 @@ describe('WorkflowExecutorWorkspaceService', () => { expect( workflowRunWorkspaceService.saveWorkflowRunState, ).toHaveBeenCalledTimes(1); + expect( + workflowRunWorkspaceService.updateWorkflowRunStepStatus, + ).not.toHaveBeenCalled(); expect( workflowRunWorkspaceService.saveWorkflowRunState, ).toHaveBeenCalledWith({ diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts index 96f555633..5eeaaae97 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts @@ -94,13 +94,9 @@ export class WorkflowExecutorWorkspaceService implements WorkflowExecutor { return billingOutput; } - await this.workflowRunWorkspaceService.saveWorkflowRunState({ + await this.workflowRunWorkspaceService.updateWorkflowRunStepStatus({ workflowRunId, - stepOutput: { - id: step.id, - output: {}, - }, - context, + stepId: step.id, workspaceId, stepStatus: StepStatus.RUNNING, }); diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts index fe5f363d5..775618827 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts @@ -255,6 +255,56 @@ export class WorkflowRunWorkspaceService { }); } + async updateWorkflowRunStepStatus({ + workflowRunId, + workspaceId, + stepId, + stepStatus, + }: { + workflowRunId: string; + stepId: string; + workspaceId: string; + stepStatus: StepStatus; + }) { + const workflowRunRepository = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + workspaceId, + 'workflowRun', + { shouldBypassPermissionChecks: true }, + ); + + const workflowRunToUpdate = await workflowRunRepository.findOneBy({ + id: workflowRunId, + }); + + if (!workflowRunToUpdate) { + throw new WorkflowRunException( + 'No workflow run to save', + WorkflowRunExceptionCode.WORKFLOW_RUN_NOT_FOUND, + ); + } + + const partialUpdate = { + state: { + ...workflowRunToUpdate.state, + stepInfos: { + ...workflowRunToUpdate.state?.stepInfos, + [stepId]: { + ...(workflowRunToUpdate.state?.stepInfos?.[stepId] || {}), + status: stepStatus, + }, + }, + }, + }; + + await workflowRunRepository.update(workflowRunId, partialUpdate); + + await this.emitWorkflowRunUpdatedEvent({ + workflowRunBefore: workflowRunToUpdate, + updatedFields: ['state'], + }); + } + async saveWorkflowRunState({ workflowRunId, stepOutput,