22 branches data migration (#13006)

This PR does not produce any functional change

First step of the workflow branch feature

- add gather `workflowRun.output` and `workflowRun.context` into one
column `workflowRun.runContext`
- add a command to fill `runContext` from `output` and `context` in
existing records
- maintain `runContext` up to date during workflow runs
This commit is contained in:
martmull
2025-07-03 10:18:33 +02:00
committed by GitHub
parent 29dbd3fc25
commit 51cb35a27a
11 changed files with 387 additions and 6 deletions

View File

@ -0,0 +1,157 @@
import { InjectRepository } from '@nestjs/typeorm';
import { MoreThan, Repository } from 'typeorm';
import { Command, Option } from 'nest-commander';
import { isDefined } from 'twenty-shared/utils';
import {
ActiveOrSuspendedWorkspacesMigrationCommandRunner,
RunOnWorkspaceArgs,
} from 'src/database/commands/command-runners/active-or-suspended-workspaces-migration.command-runner';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import {
WorkflowRunState,
WorkflowRunOutput,
WorkflowRunWorkspaceEntity,
} from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import {
StepStatus,
WorkflowRunStepInfo,
} from 'src/modules/workflow/workflow-executor/types/workflow-run-step-info.type';
const DEFAULT_CHUNK_SIZE = 500;
@Command({
name: 'upgrade:1-1:migrate-workflow-run-state',
description: 'Migrate state column in workflow run records',
})
export class MigrateWorkflowRunStatesCommand extends ActiveOrSuspendedWorkspacesMigrationCommandRunner {
private afterDate: string | undefined;
private chunkSize = DEFAULT_CHUNK_SIZE;
constructor(
@InjectRepository(Workspace, 'core')
protected readonly workspaceRepository: Repository<Workspace>,
protected readonly twentyORMGlobalManager: TwentyORMGlobalManager,
) {
super(workspaceRepository, twentyORMGlobalManager);
}
@Option({
flags: '--after-date [after_date]',
description: 'Only select records after this date (YYYY-MM-DD).',
required: false,
})
parseAfterDate(val: string): string | undefined {
const date = new Date(val);
if (isNaN(date.getTime())) {
throw new Error(`Invalid date format: ${val}`);
}
const afterDate = date.toISOString();
this.afterDate = afterDate;
return afterDate;
}
@Option({
flags: '--chunk-size [chunk_size]',
description:
'Split workflowRuns into chunks for each workspaces (default 500)',
required: false,
})
parseChunkSize(val: number): number {
if (isNaN(val) || val <= 0) {
throw new Error(`Invalid chunk size: ${val}. Should be greater than 0`);
}
this.chunkSize = val;
return this.chunkSize;
}
override async runOnWorkspace({
workspaceId,
}: RunOnWorkspaceArgs): Promise<void> {
const workflowRunRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowRunWorkspaceEntity>(
workspaceId,
'workflowRun',
{ shouldBypassPermissionChecks: true },
);
const workflowRunCount = await workflowRunRepository.count();
const chunkCount = Math.ceil(workflowRunCount / this.chunkSize);
this.logger.log(
`Migrate ${workflowRunCount} workflowRun state in ${chunkCount} chunks of size ${this.chunkSize}`,
);
for (let offset = 0; offset < chunkCount; offset += 1) {
this.logger.log(`- Proceeding chunk ${offset + 1}/${chunkCount}`);
const findOption = isDefined(this.afterDate)
? { where: { startedAt: MoreThan(this.afterDate) } }
: {};
const workflowRuns = await workflowRunRepository.find({
...findOption,
skip: offset * this.chunkSize,
take: this.chunkSize,
});
for (const workflowRun of workflowRuns) {
const output = workflowRun.output;
if (!isDefined(output)) {
continue;
}
const state = this.buildRunStateFromOutput(output);
await workflowRunRepository.update(workflowRun.id, {
state,
});
}
}
}
private buildRunStateFromOutput(output: WorkflowRunOutput): WorkflowRunState {
const stepInfos: Record<string, WorkflowRunStepInfo> = Object.fromEntries(
output.flow.steps.map((step) => {
const stepOutput = output.stepsOutput?.[step.id];
const status = stepOutput?.pendingEvent
? StepStatus.PENDING
: stepOutput?.error
? StepStatus.FAILED
: stepOutput?.result
? StepStatus.SUCCESS
: StepStatus.NOT_STARTED;
return [
step.id,
{
result: stepOutput?.result,
error: stepOutput?.error,
status,
},
];
}),
);
stepInfos['trigger'] = {
result: output?.stepsOutput?.trigger?.result,
status: StepStatus.SUCCESS,
};
return {
flow: output?.flow,
workflowRunError: output?.error,
stepInfos,
};
}
}

View File

@ -14,6 +14,7 @@ import { WorkspaceMetadataVersionModule } from 'src/engine/metadata-modules/work
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
import { WorkspaceHealthModule } from 'src/engine/workspace-manager/workspace-health/workspace-health.module';
import { WorkspaceMigrationRunnerModule } from 'src/engine/workspace-manager/workspace-migration-runner/workspace-migration-runner.module';
import { MigrateWorkflowRunStatesCommand } from 'src/database/commands/upgrade-version-command/1-1/1-1-migrate-workflow-run-state.command';
@Module({
imports: [
@ -37,10 +38,12 @@ import { WorkspaceMigrationRunnerModule } from 'src/engine/workspace-manager/wor
providers: [
FixUpdateStandardFieldsIsLabelSyncedWithName,
FixSchemaArrayTypeCommand,
MigrateWorkflowRunStatesCommand,
],
exports: [
FixUpdateStandardFieldsIsLabelSyncedWithName,
FixSchemaArrayTypeCommand,
MigrateWorkflowRunStatesCommand,
],
})
export class V1_1_UpgradeVersionCommandModule {}

View File

@ -29,6 +29,7 @@ import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { SyncWorkspaceMetadataCommand } from 'src/engine/workspace-manager/workspace-sync-metadata/commands/sync-workspace-metadata.command';
import { compareVersionMajorAndMinor } from 'src/utils/version/compare-version-minor-and-major';
import { MigrateWorkflowRunStatesCommand } from 'src/database/commands/upgrade-version-command/1-1/1-1-migrate-workflow-run-state.command';
const execPromise = promisify(exec);
@ -144,6 +145,7 @@ export class UpgradeCommand extends UpgradeCommandRunner {
protected readonly fixUpdateStandardFieldsIsLabelSyncedWithNameCommand: FixUpdateStandardFieldsIsLabelSyncedWithName,
// 1.2 Commands
protected readonly migrateWorkflowRunStatesCommand: MigrateWorkflowRunStatesCommand,
protected readonly addEnqueuedStatusToWorkflowRunCommand: AddEnqueuedStatusToWorkflowRunCommand,
) {
super(
@ -195,7 +197,7 @@ export class UpgradeCommand extends UpgradeCommandRunner {
const commands_120: VersionCommands = {
beforeSyncMetadata: [this.addEnqueuedStatusToWorkflowRunCommand],
afterSyncMetadata: [],
afterSyncMetadata: [this.migrateWorkflowRunStatesCommand],
};
this.allCommands = {