Refactor migration runner within transaction (#12941)
Modifying the data-model can sometimes fail in the middle of your operation, due to the way we handle both metadata update and schema migration separately, a field can be created while the associated column creation failed (same for object/table and such). This is also an issue because WorkspaceMigrations are then stored as FAILED can never really recovered by themselves so the schema is broken and we can't update the models anymore. This PR adds a executeMigrationFromPendingMigrationsWithinTransaction method where we can (and must) pass a queryRunner executing a transaction, which should come from the metadata services so that if anything during metadata update OR schema update fails, it rolls back everything (this also mean a workspaceMigration should never stay in a failed state now). This also fixes some issues with migration not running in the correct order due to having the same timestamp and having to do some weird logic to fix that. This is a first step and fix before working on a much more reliable solution in the upcoming weeks where we will refactor the way we interact with the data model. --------- Co-authored-by: Charles Bochet <charlesBochet@users.noreply.github.com>
This commit is contained in:
@ -32,6 +32,54 @@ export class WorkspaceMigrationRunnerService {
|
||||
private readonly workspaceMigrationColumnService: WorkspaceMigrationColumnService,
|
||||
) {}
|
||||
|
||||
public async executeMigrationFromPendingMigrationsWithinTransaction(
|
||||
workspaceId: string,
|
||||
transactionQueryRunner: QueryRunner,
|
||||
): Promise<WorkspaceMigrationTableAction[]> {
|
||||
const pendingMigrations =
|
||||
await this.workspaceMigrationService.getPendingMigrations(
|
||||
workspaceId,
|
||||
transactionQueryRunner,
|
||||
);
|
||||
|
||||
if (pendingMigrations.length === 0) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const migrationActionsWithParent = pendingMigrations.flatMap(
|
||||
(pendingMigration) =>
|
||||
(pendingMigration.migrations || []).map((tableAction) => ({
|
||||
tableAction,
|
||||
parentMigrationId: pendingMigration.id,
|
||||
})),
|
||||
);
|
||||
|
||||
const schemaName =
|
||||
this.workspaceDataSourceService.getSchemaName(workspaceId);
|
||||
|
||||
await transactionQueryRunner.query(
|
||||
`SET LOCAL search_path TO ${schemaName}`,
|
||||
);
|
||||
|
||||
for (const {
|
||||
tableAction,
|
||||
parentMigrationId,
|
||||
} of migrationActionsWithParent) {
|
||||
await this.handleTableChanges(
|
||||
transactionQueryRunner as PostgresQueryRunner,
|
||||
schemaName,
|
||||
tableAction,
|
||||
);
|
||||
|
||||
await transactionQueryRunner.query(
|
||||
`UPDATE "core"."workspaceMigration" SET "appliedAt" = NOW() WHERE "id" = $1 AND "workspaceId" = $2`,
|
||||
[parentMigrationId, workspaceId],
|
||||
);
|
||||
}
|
||||
|
||||
return migrationActionsWithParent.map((item) => item.tableAction);
|
||||
}
|
||||
|
||||
public async executeMigrationFromPendingMigrations(
|
||||
workspaceId: string,
|
||||
): Promise<WorkspaceMigrationTableAction[]> {
|
||||
@ -42,58 +90,32 @@ export class WorkspaceMigrationRunnerService {
|
||||
throw new Error('Main data source not found');
|
||||
}
|
||||
|
||||
const pendingMigrations =
|
||||
await this.workspaceMigrationService.getPendingMigrations(workspaceId);
|
||||
|
||||
if (pendingMigrations.length === 0) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const flattenedPendingMigrations: WorkspaceMigrationTableAction[] =
|
||||
pendingMigrations.reduce((acc, pendingMigration) => {
|
||||
return [...acc, ...pendingMigration.migrations];
|
||||
}, []);
|
||||
|
||||
const queryRunner =
|
||||
mainDataSource.createQueryRunner() as PostgresQueryRunner;
|
||||
|
||||
await queryRunner.connect();
|
||||
await queryRunner.startTransaction();
|
||||
|
||||
const schemaName =
|
||||
this.workspaceDataSourceService.getSchemaName(workspaceId);
|
||||
|
||||
await queryRunner.query(`SET LOCAL search_path TO ${schemaName}`);
|
||||
|
||||
try {
|
||||
// Loop over each migration and create or update the table
|
||||
for (const migration of flattenedPendingMigrations) {
|
||||
await this.handleTableChanges(queryRunner, schemaName, migration);
|
||||
}
|
||||
const result =
|
||||
await this.executeMigrationFromPendingMigrationsWithinTransaction(
|
||||
workspaceId,
|
||||
queryRunner,
|
||||
);
|
||||
|
||||
await queryRunner.commitTransaction();
|
||||
|
||||
return result;
|
||||
} catch (error) {
|
||||
this.logger.error(
|
||||
`Error executing migration: ${error.message}`,
|
||||
error.stack,
|
||||
);
|
||||
|
||||
await queryRunner.rollbackTransaction();
|
||||
throw error;
|
||||
} finally {
|
||||
await queryRunner.release();
|
||||
}
|
||||
|
||||
// Update appliedAt date for each migration
|
||||
// TODO: Should be done after the migration is successful
|
||||
for (const pendingMigration of pendingMigrations) {
|
||||
await this.workspaceMigrationService.setAppliedAtForMigration(
|
||||
workspaceId,
|
||||
pendingMigration,
|
||||
);
|
||||
}
|
||||
|
||||
return flattenedPendingMigrations;
|
||||
}
|
||||
|
||||
private async handleTableChanges(
|
||||
|
||||
Reference in New Issue
Block a user