diff --git a/packages/twenty-server/src/database/commands/upgrade-version-command/0-52/0-52-backfill-workflow-next-step-ids.command.ts b/packages/twenty-server/src/database/commands/upgrade-version-command/0-52/0-52-backfill-workflow-next-step-ids.command.ts index 9d9205188..5851644c5 100644 --- a/packages/twenty-server/src/database/commands/upgrade-version-command/0-52/0-52-backfill-workflow-next-step-ids.command.ts +++ b/packages/twenty-server/src/database/commands/upgrade-version-command/0-52/0-52-backfill-workflow-next-step-ids.command.ts @@ -49,7 +49,10 @@ export class BackfillWorkflowNextStepIdsCommand extends ActiveOrSuspendedWorkspa const workflowVersions = await workflowVersionRepository.find(); + this.logger.log(`Number of workflow versions: ${workflowVersions.length}`); + for (const workflowVersion of workflowVersions) { + this.logger.log(`Processing workflow version: ${workflowVersion.id}`); const updatedSteps: WorkflowVersionWorkspaceEntity['steps'] = []; const workflowSteps = workflowVersion.steps; @@ -58,59 +61,83 @@ export class BackfillWorkflowNextStepIdsCommand extends ActiveOrSuspendedWorkspa } // for each step, add the next step id which is the next index - for (let i = 0; i < workflowSteps.length; i++) { + for (let stepIndex = 0; stepIndex < workflowSteps.length; stepIndex++) { const updatedStep = { - ...workflowSteps[i], + ...workflowSteps[stepIndex], nextStepIds: - i < workflowSteps.length - 1 && workflowSteps[i + 1]?.id - ? [workflowSteps[i + 1].id] + stepIndex < workflowSteps.length - 1 && + workflowSteps[stepIndex + 1]?.id + ? [workflowSteps[stepIndex + 1].id] : undefined, }; updatedSteps.push(updatedStep); } - // update workflow run flows - const workflowRuns = await workflowRunRepository.find({ + // update workflow run flows by batch of 500 + const batchSize = 500; + const totalWorkflowRuns = await workflowRunRepository.count({ where: { workflowVersionId: workflowVersion.id, }, }); - const workflowRunsToUpdate: WorkflowRunWorkspaceEntity[] = []; + const totalBatches = Math.ceil(totalWorkflowRuns / batchSize); - for (const workflowRun of workflowRuns) { - const flow = workflowRun.output?.flow; + this.logger.log(`Total batches: ${totalBatches}`); - if (!flow?.steps) { - continue; - } + for (let batchIndex = 0; batchIndex < totalBatches; batchIndex++) { + const updatedWorkflowRuns: WorkflowRunWorkspaceEntity[] = []; - const updatedFlow = flow.steps.map((step) => { - const updatedStep = updatedSteps.find((s) => s.id === step.id); - - return { - ...step, - nextStepIds: updatedStep?.nextStepIds, - }; + this.logger.log(`Processing batch ${batchIndex + 1}/${totalBatches}`); + const workflowRuns = await workflowRunRepository.find({ + where: { + workflowVersionId: workflowVersion.id, + }, + take: batchSize, + skip: batchIndex * batchSize, + order: { + id: 'ASC', + }, }); - const updatedWorkflowRun: WorkflowRunWorkspaceEntity = { - ...workflowRun, - output: { - ...workflowRun.output, - flow: { - trigger: workflowRun.output?.flow?.trigger as WorkflowTrigger, - steps: updatedFlow, + for (const workflowRun of workflowRuns) { + const flow = workflowRun.output?.flow; + + if (!flow?.steps || flow.steps.length < 2) { + continue; + } + + const updatedStepsMap = new Map( + updatedSteps.map((step) => [step.id, step]), + ); + + const updatedFlow = flow.steps.map((step) => { + const updatedStep = updatedStepsMap.get(step.id); + + return { + ...step, + nextStepIds: updatedStep?.nextStepIds, + }; + }); + + const updatedWorkflowRun: WorkflowRunWorkspaceEntity = { + ...workflowRun, + output: { + ...workflowRun.output, + flow: { + trigger: workflowRun.output?.flow?.trigger as WorkflowTrigger, + steps: updatedFlow, + }, }, - }, - }; + }; - workflowRunsToUpdate.push(updatedWorkflowRun); + updatedWorkflowRuns.push(updatedWorkflowRun); + } + + await workflowRunRepository.save(updatedWorkflowRuns); } - await workflowRunRepository.save(workflowRunsToUpdate); - await workflowVersionRepository.save({ ...workflowVersion, steps: updatedSteps,