Store output step by step (#10101)

- add context field
- store it with the output after each step execution
This commit is contained in:
Thomas Trompette
2025-02-10 11:27:15 +01:00
committed by GitHub
parent c908a687fb
commit e70e69cf94
9 changed files with 171 additions and 62 deletions

View File

@ -479,6 +479,7 @@ export const WORKFLOW_RUN_STANDARD_FIELD_IDS = {
position: '20202020-7802-4c40-ae89-1f506fe3365c', position: '20202020-7802-4c40-ae89-1f506fe3365c',
createdBy: '20202020-6007-401a-8aa5-e6f38581a6f3', createdBy: '20202020-6007-401a-8aa5-e6f38581a6f3',
output: '20202020-7be4-4db2-8ac6-3ff0d740843d', output: '20202020-7be4-4db2-8ac6-3ff0d740843d',
context: '20202020-189c-478a-b867-d72feaf5926a',
favorites: '20202020-4baf-4604-b899-2f7fcfbbf90d', favorites: '20202020-4baf-4604-b899-2f7fcfbbf90d',
timelineActivities: '20202020-af4d-4eb0-babc-eb960a45b356', timelineActivities: '20202020-af4d-4eb0-babc-eb960a45b356',
}; };

View File

@ -140,6 +140,16 @@ export class WorkflowRunWorkspaceEntity extends BaseWorkspaceEntity {
@WorkspaceIsNullable() @WorkspaceIsNullable()
output: WorkflowRunOutput | null; output: WorkflowRunOutput | null;
@WorkspaceField({
standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.context,
type: FieldMetadataType.RAW_JSON,
label: msg`Context`,
description: msg`Context`,
icon: 'IconHierarchy2',
})
@WorkspaceIsNullable()
context: Record<string, any> | null;
@WorkspaceField({ @WorkspaceField({
standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.position, standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.position,
type: FieldMetadataType.POSITION, type: FieldMetadataType.POSITION,

View File

@ -7,6 +7,7 @@ import { CodeActionModule } from 'src/modules/workflow/workflow-executor/workflo
import { SendEmailActionModule } from 'src/modules/workflow/workflow-executor/workflow-actions/mail-sender/send-email-action.module'; import { SendEmailActionModule } from 'src/modules/workflow/workflow-executor/workflow-actions/mail-sender/send-email-action.module';
import { RecordCRUDActionModule } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/record-crud-action.module'; import { RecordCRUDActionModule } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/record-crud-action.module';
import { WorkflowExecutorWorkspaceService } from 'src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service'; import { WorkflowExecutorWorkspaceService } from 'src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service';
import { WorkflowRunModule } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.module';
@Module({ @Module({
imports: [ imports: [
@ -14,6 +15,7 @@ import { WorkflowExecutorWorkspaceService } from 'src/modules/workflow/workflow-
CodeActionModule, CodeActionModule,
SendEmailActionModule, SendEmailActionModule,
RecordCRUDActionModule, RecordCRUDActionModule,
WorkflowRunModule,
], ],
providers: [ providers: [
WorkflowExecutorWorkspaceService, WorkflowExecutorWorkspaceService,

View File

@ -13,6 +13,7 @@ import { WorkflowActionFactory } from 'src/modules/workflow/workflow-executor/fa
import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util'; import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util';
import { WorkflowActionResult } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-result.type'; import { WorkflowActionResult } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-result.type';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service';
const MAX_RETRIES_ON_FAILURE = 3; const MAX_RETRIES_ON_FAILURE = 3;
@ -28,23 +29,26 @@ export class WorkflowExecutorWorkspaceService {
private readonly workflowActionFactory: WorkflowActionFactory, private readonly workflowActionFactory: WorkflowActionFactory,
private readonly workspaceEventEmitter: WorkspaceEventEmitter, private readonly workspaceEventEmitter: WorkspaceEventEmitter,
private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory, private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory,
private readonly workflowRunWorkspaceService: WorkflowRunWorkspaceService,
) {} ) {}
async execute({ async execute({
currentStepIndex, currentStepIndex,
steps, steps,
context, context,
output, workflowExecutorOutput,
attemptCount = 1, attemptCount = 1,
workflowRunId,
}: { }: {
currentStepIndex: number; currentStepIndex: number;
steps: WorkflowAction[]; steps: WorkflowAction[];
output: WorkflowExecutorOutput; workflowExecutorOutput: WorkflowExecutorOutput;
context: Record<string, unknown>; context: Record<string, unknown>;
attemptCount?: number; attemptCount?: number;
workflowRunId: string;
}): Promise<WorkflowExecutorOutput> { }): Promise<WorkflowExecutorOutput> {
if (currentStepIndex >= steps.length) { if (currentStepIndex >= steps.length) {
return { ...output, status: WorkflowRunStatus.COMPLETED }; return { ...workflowExecutorOutput, status: WorkflowRunStatus.COMPLETED };
} }
const step = steps[currentStepIndex]; const step = steps[currentStepIndex];
@ -67,7 +71,7 @@ export class WorkflowExecutorWorkspaceService {
}; };
} }
const stepOutput = output.steps[step.id]; const stepOutput = workflowExecutorOutput.steps[step.id];
const error = const error =
result.error?.errorMessage ?? result.error?.errorMessage ??
@ -91,32 +95,54 @@ export class WorkflowExecutorWorkspaceService {
], ],
}; };
const updatedOutput = { const updatedOutputSteps = {
...output, ...workflowExecutorOutput.steps,
steps: { [step.id]: updatedStepOutput,
...output.steps, };
[step.id]: updatedStepOutput,
}, const updatedWorkflowExecutorOutput = {
...workflowExecutorOutput,
steps: updatedOutputSteps,
}; };
if (result.result) { if (result.result) {
const updatedContext = {
...context,
[step.id]: result.result,
};
await this.workflowRunWorkspaceService.saveWorkflowRunState({
workflowRunId,
output: {
steps: updatedOutputSteps,
},
context: updatedContext,
});
return await this.execute({ return await this.execute({
workflowRunId,
currentStepIndex: currentStepIndex + 1, currentStepIndex: currentStepIndex + 1,
steps, steps,
context: { context: updatedContext,
...context, workflowExecutorOutput: updatedWorkflowExecutorOutput,
[step.id]: result.result,
},
output: updatedOutput,
}); });
} }
if (step.settings.errorHandlingOptions.continueOnFailure.value) { if (step.settings.errorHandlingOptions.continueOnFailure.value) {
await this.workflowRunWorkspaceService.saveWorkflowRunState({
workflowRunId,
output: {
steps: updatedOutputSteps,
},
context,
});
return await this.execute({ return await this.execute({
workflowRunId,
currentStepIndex: currentStepIndex + 1, currentStepIndex: currentStepIndex + 1,
steps, steps,
context, context,
output: updatedOutput, workflowExecutorOutput: updatedWorkflowExecutorOutput,
}); });
} }
@ -125,15 +151,27 @@ export class WorkflowExecutorWorkspaceService {
attemptCount < MAX_RETRIES_ON_FAILURE attemptCount < MAX_RETRIES_ON_FAILURE
) { ) {
return await this.execute({ return await this.execute({
workflowRunId,
currentStepIndex, currentStepIndex,
steps, steps,
context, context,
output: updatedOutput, workflowExecutorOutput: updatedWorkflowExecutorOutput,
attemptCount: attemptCount + 1, attemptCount: attemptCount + 1,
}); });
} }
return { ...updatedOutput, status: WorkflowRunStatus.FAILED }; await this.workflowRunWorkspaceService.saveWorkflowRunState({
workflowRunId,
output: {
steps: updatedOutputSteps,
},
context,
});
return {
...updatedWorkflowExecutorOutput,
status: WorkflowRunStatus.FAILED,
};
} }
private sendWorkflowNodeRunEvent() { private sendWorkflowNodeRunEvent() {

View File

@ -12,7 +12,7 @@ import {
WorkflowRunException, WorkflowRunException,
WorkflowRunExceptionCode, WorkflowRunExceptionCode,
} from 'src/modules/workflow/workflow-runner/exceptions/workflow-run.exception'; } from 'src/modules/workflow/workflow-runner/exceptions/workflow-run.exception';
import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-run.workspace-service'; import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service';
export type RunWorkflowJobData = { export type RunWorkflowJobData = {
workspaceId: string; workspaceId: string;
@ -38,7 +38,14 @@ export class RunWorkflowJob {
workflowRunId, workflowRunId,
payload, payload,
}: RunWorkflowJobData): Promise<void> { }: RunWorkflowJobData): Promise<void> {
await this.workflowRunWorkspaceService.startWorkflowRun(workflowRunId); const context = {
trigger: payload,
};
await this.workflowRunWorkspaceService.startWorkflowRun({
workflowRunId,
context,
});
try { try {
const workflowVersion = const workflowVersion =
@ -48,35 +55,27 @@ export class RunWorkflowJob {
await this.throttleExecution(workflowVersion.workflowId); await this.throttleExecution(workflowVersion.workflowId);
const { steps, status } = const { status } = await this.workflowExecutorWorkspaceService.execute({
await this.workflowExecutorWorkspaceService.execute({ workflowRunId,
currentStepIndex: 0, currentStepIndex: 0,
steps: workflowVersion.steps || [], steps: workflowVersion.steps || [],
context: { context,
trigger: payload, workflowExecutorOutput: {
}, steps: {},
output: { status: WorkflowRunStatus.RUNNING,
steps: {}, },
status: WorkflowRunStatus.RUNNING, });
},
});
await this.workflowRunWorkspaceService.endWorkflowRun( await this.workflowRunWorkspaceService.endWorkflowRun({
workflowRunId, workflowRunId,
status, status,
{ });
steps,
},
);
} catch (error) { } catch (error) {
await this.workflowRunWorkspaceService.endWorkflowRun( await this.workflowRunWorkspaceService.endWorkflowRun({
workflowRunId, workflowRunId,
WorkflowRunStatus.FAILED, status: WorkflowRunStatus.FAILED,
{ error: error.message,
steps: {}, });
error: error.message,
},
);
} }
} }

View File

@ -0,0 +1,11 @@
import { Module } from '@nestjs/common';
import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module';
import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service';
@Module({
imports: [WorkflowCommonModule],
providers: [WorkflowRunWorkspaceService],
exports: [WorkflowRunWorkspaceService],
})
export class WorkflowRunModule {}

View File

@ -20,7 +20,13 @@ export class WorkflowRunWorkspaceService {
private readonly workflowCommonWorkspaceService: WorkflowCommonWorkspaceService, private readonly workflowCommonWorkspaceService: WorkflowCommonWorkspaceService,
) {} ) {}
async createWorkflowRun(workflowVersionId: string, createdBy: ActorMetadata) { async createWorkflowRun({
workflowVersionId,
createdBy,
}: {
workflowVersionId: string;
createdBy: ActorMetadata;
}) {
const workflowRunRepository = const workflowRunRepository =
await this.twentyORMManager.getRepository<WorkflowRunWorkspaceEntity>( await this.twentyORMManager.getRepository<WorkflowRunWorkspaceEntity>(
'workflowRun', 'workflowRun',
@ -42,7 +48,13 @@ export class WorkflowRunWorkspaceService {
).id; ).id;
} }
async startWorkflowRun(workflowRunId: string) { async startWorkflowRun({
workflowRunId,
context,
}: {
workflowRunId: string;
context: Record<string, any>;
}) {
const workflowRunRepository = const workflowRunRepository =
await this.twentyORMManager.getRepository<WorkflowRunWorkspaceEntity>( await this.twentyORMManager.getRepository<WorkflowRunWorkspaceEntity>(
'workflowRun', 'workflowRun',
@ -69,14 +81,19 @@ export class WorkflowRunWorkspaceService {
return workflowRunRepository.update(workflowRunToUpdate.id, { return workflowRunRepository.update(workflowRunToUpdate.id, {
status: WorkflowRunStatus.RUNNING, status: WorkflowRunStatus.RUNNING,
startedAt: new Date().toISOString(), startedAt: new Date().toISOString(),
context,
}); });
} }
async endWorkflowRun( async endWorkflowRun({
workflowRunId: string, workflowRunId,
status: WorkflowRunStatus, status,
output: WorkflowRunOutput, error,
) { }: {
workflowRunId: string;
status: WorkflowRunStatus;
error?: string;
}) {
const workflowRunRepository = const workflowRunRepository =
await this.twentyORMManager.getRepository<WorkflowRunWorkspaceEntity>( await this.twentyORMManager.getRepository<WorkflowRunWorkspaceEntity>(
'workflowRun', 'workflowRun',
@ -102,8 +119,42 @@ export class WorkflowRunWorkspaceService {
return workflowRunRepository.update(workflowRunToUpdate.id, { return workflowRunRepository.update(workflowRunToUpdate.id, {
status, status,
output,
endedAt: new Date().toISOString(), endedAt: new Date().toISOString(),
output: {
...(workflowRunToUpdate.output ?? {}),
error,
},
});
}
async saveWorkflowRunState({
workflowRunId,
output,
context,
}: {
workflowRunId: string;
output: WorkflowRunOutput;
context: Record<string, any>;
}) {
const workflowRunRepository =
await this.twentyORMManager.getRepository<WorkflowRunWorkspaceEntity>(
'workflowRun',
);
const workflowRunToUpdate = await workflowRunRepository.findOneBy({
id: workflowRunId,
});
if (!workflowRunToUpdate) {
throw new WorkflowRunException(
'No workflow run to save',
WorkflowRunExceptionCode.WORKFLOW_RUN_NOT_FOUND,
);
}
return workflowRunRepository.update(workflowRunId, {
output,
context,
}); });
} }
} }

View File

@ -5,7 +5,7 @@ import { ThrottlerModule } from 'src/engine/core-modules/throttler/throttler.mod
import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module'; import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module';
import { WorkflowExecutorModule } from 'src/modules/workflow/workflow-executor/workflow-executor.module'; import { WorkflowExecutorModule } from 'src/modules/workflow/workflow-executor/workflow-executor.module';
import { RunWorkflowJob } from 'src/modules/workflow/workflow-runner/jobs/run-workflow.job'; import { RunWorkflowJob } from 'src/modules/workflow/workflow-runner/jobs/run-workflow.job';
import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-run.workspace-service'; import { WorkflowRunModule } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.module';
import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service'; import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service';
@Module({ @Module({
@ -14,12 +14,9 @@ import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-ru
WorkflowExecutorModule, WorkflowExecutorModule,
ThrottlerModule, ThrottlerModule,
BillingModule, BillingModule,
WorkflowRunModule,
], ],
providers: [ providers: [WorkflowRunnerWorkspaceService, RunWorkflowJob],
WorkflowRunnerWorkspaceService,
WorkflowRunWorkspaceService,
RunWorkflowJob,
],
exports: [WorkflowRunnerWorkspaceService], exports: [WorkflowRunnerWorkspaceService],
}) })
export class WorkflowRunnerModule {} export class WorkflowRunnerModule {}

View File

@ -9,7 +9,7 @@ import {
RunWorkflowJob, RunWorkflowJob,
RunWorkflowJobData, RunWorkflowJobData,
} from 'src/modules/workflow/workflow-runner/jobs/run-workflow.job'; } from 'src/modules/workflow/workflow-runner/jobs/run-workflow.job';
import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-run.workspace-service'; import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service';
@Injectable() @Injectable()
export class WorkflowRunnerWorkspaceService { export class WorkflowRunnerWorkspaceService {
@ -36,10 +36,10 @@ export class WorkflowRunnerWorkspaceService {
); );
} }
const workflowRunId = const workflowRunId =
await this.workflowRunWorkspaceService.createWorkflowRun( await this.workflowRunWorkspaceService.createWorkflowRun({
workflowVersionId, workflowVersionId,
source, createdBy: source,
); });
await this.messageQueueService.add<RunWorkflowJobData>( await this.messageQueueService.add<RunWorkflowJobData>(
RunWorkflowJob.name, RunWorkflowJob.name,