Fix update context and stepOutput when step running (#13030)
add a function to only set stepStatus, and not context or output. Useful when setting step state to RUNNING
This commit is contained in:
@ -39,6 +39,7 @@ describe('WorkflowExecutorWorkspaceService', () => {
|
||||
|
||||
const mockWorkflowRunWorkspaceService = {
|
||||
saveWorkflowRunState: jest.fn(),
|
||||
updateWorkflowRunStepStatus: jest.fn(),
|
||||
};
|
||||
|
||||
const mockBillingService = {
|
||||
@ -171,26 +172,22 @@ describe('WorkflowExecutorWorkspaceService', () => {
|
||||
'workspace-id',
|
||||
);
|
||||
expect(
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
).toHaveBeenCalledTimes(4);
|
||||
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
|
||||
).toHaveBeenCalledTimes(2);
|
||||
expect(
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
|
||||
).toHaveBeenCalledWith({
|
||||
workflowRunId: mockWorkflowRunId,
|
||||
stepOutput: {
|
||||
id: 'step-1',
|
||||
output: {},
|
||||
},
|
||||
context: {
|
||||
data: 'some-data',
|
||||
},
|
||||
stepId: 'step-1',
|
||||
workspaceId: 'workspace-id',
|
||||
stepStatus: StepStatus.RUNNING,
|
||||
});
|
||||
|
||||
expect(
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
).toHaveBeenNthCalledWith(2, {
|
||||
).toHaveBeenCalledTimes(2);
|
||||
expect(
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
).toHaveBeenCalledWith({
|
||||
workflowRunId: mockWorkflowRunId,
|
||||
stepOutput: {
|
||||
id: 'step-1',
|
||||
@ -231,23 +228,22 @@ describe('WorkflowExecutorWorkspaceService', () => {
|
||||
});
|
||||
expect(workspaceEventEmitter.emitCustomBatchEvent).not.toHaveBeenCalled();
|
||||
expect(
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
).toHaveBeenCalledTimes(2);
|
||||
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
|
||||
).toHaveBeenCalledTimes(1);
|
||||
expect(
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
|
||||
).toHaveBeenCalledWith({
|
||||
workflowRunId: mockWorkflowRunId,
|
||||
stepOutput: {
|
||||
id: 'step-1',
|
||||
output: {},
|
||||
},
|
||||
context: mockContext,
|
||||
stepId: 'step-1',
|
||||
workspaceId: 'workspace-id',
|
||||
stepStatus: StepStatus.RUNNING,
|
||||
});
|
||||
expect(
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
).toHaveBeenNthCalledWith(2, {
|
||||
).toHaveBeenCalledTimes(1);
|
||||
expect(
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
).toHaveBeenCalledWith({
|
||||
workflowRunId: mockWorkflowRunId,
|
||||
stepOutput: {
|
||||
id: 'step-1',
|
||||
@ -277,23 +273,22 @@ describe('WorkflowExecutorWorkspaceService', () => {
|
||||
|
||||
expect(result).toEqual(mockPendingEvent);
|
||||
expect(
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
).toHaveBeenCalledTimes(2);
|
||||
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
|
||||
).toHaveBeenCalledTimes(1);
|
||||
expect(
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
|
||||
).toHaveBeenCalledWith({
|
||||
workflowRunId: mockWorkflowRunId,
|
||||
stepOutput: {
|
||||
id: 'step-1',
|
||||
output: {},
|
||||
},
|
||||
context: mockContext,
|
||||
stepId: 'step-1',
|
||||
workspaceId: 'workspace-id',
|
||||
stepStatus: StepStatus.RUNNING,
|
||||
});
|
||||
expect(
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
).toHaveBeenNthCalledWith(2, {
|
||||
).toHaveBeenCalledTimes(1);
|
||||
expect(
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
).toHaveBeenCalledWith({
|
||||
workflowRunId: mockWorkflowRunId,
|
||||
stepOutput: {
|
||||
id: 'step-1',
|
||||
@ -347,26 +342,25 @@ describe('WorkflowExecutorWorkspaceService', () => {
|
||||
});
|
||||
|
||||
expect(
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
).toHaveBeenCalledTimes(4);
|
||||
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
|
||||
).toHaveBeenCalledTimes(2);
|
||||
|
||||
// execute first step
|
||||
expect(
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
|
||||
).toHaveBeenCalledWith({
|
||||
workflowRunId: mockWorkflowRunId,
|
||||
stepOutput: {
|
||||
id: 'step-1',
|
||||
output: {},
|
||||
},
|
||||
context: mockContext,
|
||||
stepId: 'step-1',
|
||||
workspaceId: 'workspace-id',
|
||||
stepStatus: StepStatus.RUNNING,
|
||||
});
|
||||
|
||||
expect(
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
).toHaveBeenNthCalledWith(2, {
|
||||
).toHaveBeenCalledTimes(2);
|
||||
|
||||
expect(
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
).toHaveBeenCalledWith({
|
||||
workflowRunId: mockWorkflowRunId,
|
||||
stepOutput: {
|
||||
id: 'step-1',
|
||||
@ -452,23 +446,22 @@ describe('WorkflowExecutorWorkspaceService', () => {
|
||||
// Should not retry anymore
|
||||
expect(workflowExecutorFactory.get).toHaveBeenCalledTimes(1);
|
||||
expect(
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
).toHaveBeenCalledTimes(2);
|
||||
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
|
||||
).toHaveBeenCalledTimes(1);
|
||||
expect(
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
|
||||
).toHaveBeenCalledWith({
|
||||
workflowRunId: mockWorkflowRunId,
|
||||
stepOutput: {
|
||||
id: 'step-1',
|
||||
output: {},
|
||||
},
|
||||
context: mockContext,
|
||||
stepId: 'step-1',
|
||||
workspaceId: 'workspace-id',
|
||||
stepStatus: StepStatus.RUNNING,
|
||||
});
|
||||
expect(
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
).toHaveBeenNthCalledWith(2, {
|
||||
).toHaveBeenCalledTimes(1);
|
||||
expect(
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
).toHaveBeenCalledWith({
|
||||
workflowRunId: mockWorkflowRunId,
|
||||
stepOutput: {
|
||||
id: 'step-1',
|
||||
@ -496,6 +489,9 @@ describe('WorkflowExecutorWorkspaceService', () => {
|
||||
expect(
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
).toHaveBeenCalledTimes(1);
|
||||
expect(
|
||||
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
|
||||
).not.toHaveBeenCalled();
|
||||
expect(
|
||||
workflowRunWorkspaceService.saveWorkflowRunState,
|
||||
).toHaveBeenCalledWith({
|
||||
|
||||
@ -94,13 +94,9 @@ export class WorkflowExecutorWorkspaceService implements WorkflowExecutor {
|
||||
return billingOutput;
|
||||
}
|
||||
|
||||
await this.workflowRunWorkspaceService.saveWorkflowRunState({
|
||||
await this.workflowRunWorkspaceService.updateWorkflowRunStepStatus({
|
||||
workflowRunId,
|
||||
stepOutput: {
|
||||
id: step.id,
|
||||
output: {},
|
||||
},
|
||||
context,
|
||||
stepId: step.id,
|
||||
workspaceId,
|
||||
stepStatus: StepStatus.RUNNING,
|
||||
});
|
||||
|
||||
@ -255,6 +255,56 @@ export class WorkflowRunWorkspaceService {
|
||||
});
|
||||
}
|
||||
|
||||
async updateWorkflowRunStepStatus({
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
stepId,
|
||||
stepStatus,
|
||||
}: {
|
||||
workflowRunId: string;
|
||||
stepId: string;
|
||||
workspaceId: string;
|
||||
stepStatus: StepStatus;
|
||||
}) {
|
||||
const workflowRunRepository =
|
||||
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowRunWorkspaceEntity>(
|
||||
workspaceId,
|
||||
'workflowRun',
|
||||
{ shouldBypassPermissionChecks: true },
|
||||
);
|
||||
|
||||
const workflowRunToUpdate = await workflowRunRepository.findOneBy({
|
||||
id: workflowRunId,
|
||||
});
|
||||
|
||||
if (!workflowRunToUpdate) {
|
||||
throw new WorkflowRunException(
|
||||
'No workflow run to save',
|
||||
WorkflowRunExceptionCode.WORKFLOW_RUN_NOT_FOUND,
|
||||
);
|
||||
}
|
||||
|
||||
const partialUpdate = {
|
||||
state: {
|
||||
...workflowRunToUpdate.state,
|
||||
stepInfos: {
|
||||
...workflowRunToUpdate.state?.stepInfos,
|
||||
[stepId]: {
|
||||
...(workflowRunToUpdate.state?.stepInfos?.[stepId] || {}),
|
||||
status: stepStatus,
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
await workflowRunRepository.update(workflowRunId, partialUpdate);
|
||||
|
||||
await this.emitWorkflowRunUpdatedEvent({
|
||||
workflowRunBefore: workflowRunToUpdate,
|
||||
updatedFields: ['state'],
|
||||
});
|
||||
}
|
||||
|
||||
async saveWorkflowRunState({
|
||||
workflowRunId,
|
||||
stepOutput,
|
||||
|
||||
Reference in New Issue
Block a user