Add flow to run output (#10220)

We need the version trigger and steps to be stored in the output. We
should not rely on the version itself because some run are made on draft
versions. Which means versions could be edited afterwards.
This commit is contained in:
Thomas Trompette
2025-02-14 15:20:27 +01:00
committed by GitHub
parent 5fe101545d
commit 017280384a
5 changed files with 61 additions and 38 deletions

View File

@ -22,6 +22,8 @@ import { FavoriteWorkspaceEntity } from 'src/modules/favorite/standard-objects/f
import { TimelineActivityWorkspaceEntity } from 'src/modules/timeline/standard-objects/timeline-activity.workspace-entity';
import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity';
import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
import { WorkflowTrigger } from 'src/modules/workflow/workflow-trigger/types/workflow-trigger.type';
export enum WorkflowRunStatus {
NOT_STARTED = 'NOT_STARTED',
@ -32,8 +34,6 @@ export enum WorkflowRunStatus {
type StepRunOutput = {
id: string;
name: string;
type: string;
outputs: {
attemptCount: number;
result: object | undefined;
@ -42,7 +42,11 @@ type StepRunOutput = {
};
export type WorkflowRunOutput = {
steps: Record<string, StepRunOutput>;
flow: {
trigger: WorkflowTrigger;
steps: WorkflowAction[];
};
stepsOutput: Record<string, StepRunOutput>;
error?: string;
};

View File

@ -1,4 +1,4 @@
import { Injectable, Logger } from '@nestjs/common';
import { Injectable } from '@nestjs/common';
import { BILLING_FEATURE_USED } from 'src/engine/core-modules/billing/constants/billing-feature-used.constant';
import { BillingMeterEventName } from 'src/engine/core-modules/billing/enums/billing-meter-event-names';
@ -17,14 +17,13 @@ import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runne
const MAX_RETRIES_ON_FAILURE = 3;
export type WorkflowExecutorOutput = {
steps: WorkflowRunOutput['steps'];
export type WorkflowExecutorState = {
stepsOutput: WorkflowRunOutput['stepsOutput'];
status: WorkflowRunStatus;
};
@Injectable()
export class WorkflowExecutorWorkspaceService {
private readonly logger = new Logger(WorkflowExecutorWorkspaceService.name);
constructor(
private readonly workflowActionFactory: WorkflowActionFactory,
private readonly workspaceEventEmitter: WorkspaceEventEmitter,
@ -36,19 +35,19 @@ export class WorkflowExecutorWorkspaceService {
currentStepIndex,
steps,
context,
workflowExecutorOutput,
workflowExecutorState,
attemptCount = 1,
workflowRunId,
}: {
currentStepIndex: number;
steps: WorkflowAction[];
workflowExecutorOutput: WorkflowExecutorOutput;
workflowExecutorState: WorkflowExecutorState;
context: Record<string, unknown>;
attemptCount?: number;
workflowRunId: string;
}): Promise<WorkflowExecutorOutput> {
}): Promise<WorkflowExecutorState> {
if (currentStepIndex >= steps.length) {
return { ...workflowExecutorOutput, status: WorkflowRunStatus.COMPLETED };
return { ...workflowExecutorState, status: WorkflowRunStatus.COMPLETED };
}
const step = steps[currentStepIndex];
@ -71,7 +70,7 @@ export class WorkflowExecutorWorkspaceService {
};
}
const stepOutput = workflowExecutorOutput.steps[step.id];
const stepOutput = workflowExecutorState.stepsOutput[step.id];
const error =
result.error?.errorMessage ??
@ -83,8 +82,6 @@ export class WorkflowExecutorWorkspaceService {
const updatedStepOutput = {
id: step.id,
name: step.name,
type: step.type,
outputs: [
...(stepOutput?.outputs ?? []),
{
@ -95,14 +92,14 @@ export class WorkflowExecutorWorkspaceService {
],
};
const updatedOutputSteps = {
...workflowExecutorOutput.steps,
const updatedStepsOutput = {
...workflowExecutorState.stepsOutput,
[step.id]: updatedStepOutput,
};
const updatedWorkflowExecutorOutput = {
...workflowExecutorOutput,
steps: updatedOutputSteps,
const updatedWorkflowExecutorState = {
...workflowExecutorState,
stepsOutput: updatedStepsOutput,
};
if (result.result) {
@ -114,7 +111,7 @@ export class WorkflowExecutorWorkspaceService {
await this.workflowRunWorkspaceService.saveWorkflowRunState({
workflowRunId,
output: {
steps: updatedOutputSteps,
stepsOutput: updatedStepsOutput,
},
context: updatedContext,
});
@ -124,7 +121,7 @@ export class WorkflowExecutorWorkspaceService {
currentStepIndex: currentStepIndex + 1,
steps,
context: updatedContext,
workflowExecutorOutput: updatedWorkflowExecutorOutput,
workflowExecutorState: updatedWorkflowExecutorState,
});
}
@ -132,7 +129,7 @@ export class WorkflowExecutorWorkspaceService {
await this.workflowRunWorkspaceService.saveWorkflowRunState({
workflowRunId,
output: {
steps: updatedOutputSteps,
stepsOutput: updatedStepsOutput,
},
context,
});
@ -142,7 +139,7 @@ export class WorkflowExecutorWorkspaceService {
currentStepIndex: currentStepIndex + 1,
steps,
context,
workflowExecutorOutput: updatedWorkflowExecutorOutput,
workflowExecutorState: updatedWorkflowExecutorState,
});
}
@ -155,7 +152,7 @@ export class WorkflowExecutorWorkspaceService {
currentStepIndex,
steps,
context,
workflowExecutorOutput: updatedWorkflowExecutorOutput,
workflowExecutorState: updatedWorkflowExecutorState,
attemptCount: attemptCount + 1,
});
}
@ -163,13 +160,13 @@ export class WorkflowExecutorWorkspaceService {
await this.workflowRunWorkspaceService.saveWorkflowRunState({
workflowRunId,
output: {
steps: updatedOutputSteps,
stepsOutput: updatedStepsOutput,
},
context,
});
return {
...updatedWorkflowExecutorOutput,
...updatedWorkflowExecutorState,
status: WorkflowRunStatus.FAILED,
};
}

View File

@ -11,4 +11,5 @@ export enum WorkflowRunExceptionCode {
INVALID_OPERATION = 'INVALID_OPERATION',
INVALID_INPUT = 'INVALID_INPUT',
WORKFLOW_RUN_LIMIT_REACHED = 'WORKFLOW_RUN_LIMIT_REACHED',
WORKFLOW_RUN_INVALID = 'WORKFLOW_RUN_INVALID',
}

View File

@ -1,4 +1,4 @@
import { Logger, Scope } from '@nestjs/common';
import { Scope } from '@nestjs/common';
import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service';
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
@ -23,7 +23,6 @@ export type RunWorkflowJobData = {
@Processor({ queueName: MessageQueue.workflowQueue, scope: Scope.REQUEST })
export class RunWorkflowJob {
private readonly logger = new Logger(RunWorkflowJob.name);
constructor(
private readonly workflowCommonWorkspaceService: WorkflowCommonWorkspaceService,
private readonly workflowExecutorWorkspaceService: WorkflowExecutorWorkspaceService,
@ -42,26 +41,39 @@ export class RunWorkflowJob {
trigger: payload,
};
await this.workflowRunWorkspaceService.startWorkflowRun({
workflowRunId,
context,
});
try {
const workflowVersion =
await this.workflowCommonWorkspaceService.getWorkflowVersionOrFail(
workflowVersionId,
);
if (!workflowVersion.trigger || !workflowVersion.steps) {
throw new WorkflowRunException(
'Workflow version has no trigger or steps',
WorkflowRunExceptionCode.WORKFLOW_RUN_INVALID,
);
}
await this.workflowRunWorkspaceService.startWorkflowRun({
workflowRunId,
context,
output: {
flow: {
trigger: workflowVersion.trigger,
steps: workflowVersion.steps,
},
},
});
await this.throttleExecution(workflowVersion.workflowId);
const { status } = await this.workflowExecutorWorkspaceService.execute({
workflowRunId,
currentStepIndex: 0,
steps: workflowVersion.steps || [],
steps: workflowVersion.steps ?? [],
context,
workflowExecutorOutput: {
steps: {},
workflowExecutorState: {
stepsOutput: {},
status: WorkflowRunStatus.RUNNING,
},
});

View File

@ -51,9 +51,11 @@ export class WorkflowRunWorkspaceService {
async startWorkflowRun({
workflowRunId,
context,
output,
}: {
workflowRunId: string;
context: Record<string, any>;
output: Pick<WorkflowRunOutput, 'flow'>;
}) {
const workflowRunRepository =
await this.twentyORMManager.getRepository<WorkflowRunWorkspaceEntity>(
@ -82,6 +84,7 @@ export class WorkflowRunWorkspaceService {
status: WorkflowRunStatus.RUNNING,
startedAt: new Date().toISOString(),
context,
output,
});
}
@ -133,7 +136,7 @@ export class WorkflowRunWorkspaceService {
context,
}: {
workflowRunId: string;
output: WorkflowRunOutput;
output: Pick<WorkflowRunOutput, 'error' | 'stepsOutput'>;
context: Record<string, any>;
}) {
const workflowRunRepository =
@ -153,7 +156,13 @@ export class WorkflowRunWorkspaceService {
}
return workflowRunRepository.update(workflowRunId, {
output,
output: {
flow: workflowRunToUpdate.output?.flow ?? {
trigger: undefined,
steps: [],
},
...output,
},
context,
});
}