Fix next step ids backfill command (#11769)

Twenty prod DB has been exported for testing.

Main updates:
- do not process workflow runs with less than 2 steps. Nothing to do.
- update runs by batches of 500. Will avoid java heap space issue
- add more logs
This commit is contained in:
Thomas Trompette
2025-04-28 16:24:15 +02:00
committed by GitHub
parent 0881e35d64
commit b182a676d7

View File

@ -49,7 +49,10 @@ export class BackfillWorkflowNextStepIdsCommand extends ActiveOrSuspendedWorkspa
const workflowVersions = await workflowVersionRepository.find(); const workflowVersions = await workflowVersionRepository.find();
this.logger.log(`Number of workflow versions: ${workflowVersions.length}`);
for (const workflowVersion of workflowVersions) { for (const workflowVersion of workflowVersions) {
this.logger.log(`Processing workflow version: ${workflowVersion.id}`);
const updatedSteps: WorkflowVersionWorkspaceEntity['steps'] = []; const updatedSteps: WorkflowVersionWorkspaceEntity['steps'] = [];
const workflowSteps = workflowVersion.steps; const workflowSteps = workflowVersion.steps;
@ -58,59 +61,83 @@ export class BackfillWorkflowNextStepIdsCommand extends ActiveOrSuspendedWorkspa
} }
// for each step, add the next step id which is the next index // for each step, add the next step id which is the next index
for (let i = 0; i < workflowSteps.length; i++) { for (let stepIndex = 0; stepIndex < workflowSteps.length; stepIndex++) {
const updatedStep = { const updatedStep = {
...workflowSteps[i], ...workflowSteps[stepIndex],
nextStepIds: nextStepIds:
i < workflowSteps.length - 1 && workflowSteps[i + 1]?.id stepIndex < workflowSteps.length - 1 &&
? [workflowSteps[i + 1].id] workflowSteps[stepIndex + 1]?.id
? [workflowSteps[stepIndex + 1].id]
: undefined, : undefined,
}; };
updatedSteps.push(updatedStep); updatedSteps.push(updatedStep);
} }
// update workflow run flows // update workflow run flows by batch of 500
const workflowRuns = await workflowRunRepository.find({ const batchSize = 500;
const totalWorkflowRuns = await workflowRunRepository.count({
where: { where: {
workflowVersionId: workflowVersion.id, workflowVersionId: workflowVersion.id,
}, },
}); });
const workflowRunsToUpdate: WorkflowRunWorkspaceEntity[] = []; const totalBatches = Math.ceil(totalWorkflowRuns / batchSize);
for (const workflowRun of workflowRuns) { this.logger.log(`Total batches: ${totalBatches}`);
const flow = workflowRun.output?.flow;
if (!flow?.steps) { for (let batchIndex = 0; batchIndex < totalBatches; batchIndex++) {
continue; const updatedWorkflowRuns: WorkflowRunWorkspaceEntity[] = [];
}
const updatedFlow = flow.steps.map((step) => { this.logger.log(`Processing batch ${batchIndex + 1}/${totalBatches}`);
const updatedStep = updatedSteps.find((s) => s.id === step.id); const workflowRuns = await workflowRunRepository.find({
where: {
return { workflowVersionId: workflowVersion.id,
...step, },
nextStepIds: updatedStep?.nextStepIds, take: batchSize,
}; skip: batchIndex * batchSize,
order: {
id: 'ASC',
},
}); });
const updatedWorkflowRun: WorkflowRunWorkspaceEntity = { for (const workflowRun of workflowRuns) {
...workflowRun, const flow = workflowRun.output?.flow;
output: {
...workflowRun.output, if (!flow?.steps || flow.steps.length < 2) {
flow: { continue;
trigger: workflowRun.output?.flow?.trigger as WorkflowTrigger, }
steps: updatedFlow,
const updatedStepsMap = new Map(
updatedSteps.map((step) => [step.id, step]),
);
const updatedFlow = flow.steps.map((step) => {
const updatedStep = updatedStepsMap.get(step.id);
return {
...step,
nextStepIds: updatedStep?.nextStepIds,
};
});
const updatedWorkflowRun: WorkflowRunWorkspaceEntity = {
...workflowRun,
output: {
...workflowRun.output,
flow: {
trigger: workflowRun.output?.flow?.trigger as WorkflowTrigger,
steps: updatedFlow,
},
}, },
}, };
};
workflowRunsToUpdate.push(updatedWorkflowRun); updatedWorkflowRuns.push(updatedWorkflowRun);
}
await workflowRunRepository.save(updatedWorkflowRuns);
} }
await workflowRunRepository.save(workflowRunsToUpdate);
await workflowVersionRepository.save({ await workflowVersionRepository.save({
...workflowVersion, ...workflowVersion,
steps: updatedSteps, steps: updatedSteps,