Introduce remote table entity (#4994)
We will require remote table entity to map distant table name and local foreign table name. Introducing the entity: - new source of truth to know if a table is sync or not - created synchronously at the same time as metadata and foreign table Adding a few more changes: - exception rather than errors so the user can see these - `pluralize` library that will allow to stop adding `Remote` suffix on names --------- Co-authored-by: Thomas Trompette <thomast@twenty.com>
This commit is contained in:
@ -1,14 +1,16 @@
|
||||
import { ObjectType } from '@nestjs/graphql';
|
||||
|
||||
import {
|
||||
Column,
|
||||
CreateDateColumn,
|
||||
Entity,
|
||||
Generated,
|
||||
OneToMany,
|
||||
PrimaryGeneratedColumn,
|
||||
Relation,
|
||||
UpdateDateColumn,
|
||||
} from 'typeorm';
|
||||
|
||||
import { RemoteTableEntity } from 'src/engine/metadata-modules/remote-server/remote-table/remote-table.entity';
|
||||
|
||||
export enum RemoteServerType {
|
||||
POSTGRES_FDW = 'postgres_fdw',
|
||||
}
|
||||
@ -30,7 +32,6 @@ export type UserMappingOptions = {
|
||||
};
|
||||
|
||||
@Entity('remoteServer')
|
||||
@ObjectType('RemoteServer')
|
||||
export class RemoteServerEntity<T extends RemoteServerType> {
|
||||
@PrimaryGeneratedColumn('uuid')
|
||||
id: string;
|
||||
@ -51,6 +52,11 @@ export class RemoteServerEntity<T extends RemoteServerType> {
|
||||
@Column({ nullable: false, type: 'uuid' })
|
||||
workspaceId: string;
|
||||
|
||||
@OneToMany(() => RemoteTableEntity, (table) => table.server, {
|
||||
cascade: true,
|
||||
})
|
||||
tables: Relation<RemoteTableEntity[]>;
|
||||
|
||||
@CreateDateColumn({ type: 'timestamptz' })
|
||||
createdAt: Date;
|
||||
|
||||
|
||||
@ -113,24 +113,10 @@ export class RemoteServerService<T extends RemoteServerType> {
|
||||
});
|
||||
|
||||
if (!remoteServer) {
|
||||
throw new NotFoundException('Object does not exist');
|
||||
throw new NotFoundException('Remote server does not exist');
|
||||
}
|
||||
|
||||
const foreignTablesToRemove =
|
||||
await this.remoteTableService.fetchForeignTableNamesWithinWorkspace(
|
||||
workspaceId,
|
||||
remoteServer.foreignDataWrapperId,
|
||||
);
|
||||
|
||||
if (foreignTablesToRemove.length) {
|
||||
for (const foreignTableName of foreignTablesToRemove) {
|
||||
await this.remoteTableService.removeForeignTableAndMetadata(
|
||||
foreignTableName,
|
||||
workspaceId,
|
||||
remoteServer,
|
||||
);
|
||||
}
|
||||
}
|
||||
await this.remoteTableService.unsyncAll(workspaceId, remoteServer);
|
||||
|
||||
return this.metadataDataSource.transaction(
|
||||
async (entityManager: EntityManager) => {
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
import { ObjectType, Field, registerEnumType } from '@nestjs/graphql';
|
||||
|
||||
import { IsEnum } from 'class-validator';
|
||||
import { PrimaryGeneratedColumn } from 'typeorm';
|
||||
|
||||
export enum RemoteTableStatus {
|
||||
SYNCED = 'SYNCED',
|
||||
@ -14,6 +15,9 @@ registerEnumType(RemoteTableStatus, {
|
||||
|
||||
@ObjectType('RemoteTable')
|
||||
export class RemoteTableDTO {
|
||||
@PrimaryGeneratedColumn('uuid')
|
||||
id: string;
|
||||
|
||||
@Field(() => String)
|
||||
name: string;
|
||||
|
||||
|
||||
@ -0,0 +1,45 @@
|
||||
import {
|
||||
Entity,
|
||||
PrimaryGeneratedColumn,
|
||||
Column,
|
||||
CreateDateColumn,
|
||||
UpdateDateColumn,
|
||||
JoinColumn,
|
||||
ManyToOne,
|
||||
Relation,
|
||||
} from 'typeorm';
|
||||
|
||||
import {
|
||||
RemoteServerEntity,
|
||||
RemoteServerType,
|
||||
} from 'src/engine/metadata-modules/remote-server/remote-server.entity';
|
||||
|
||||
@Entity('remoteTable')
|
||||
export class RemoteTableEntity {
|
||||
@PrimaryGeneratedColumn('uuid')
|
||||
id: string;
|
||||
|
||||
@Column({ nullable: false })
|
||||
distantTableName: string;
|
||||
|
||||
@Column({ nullable: false })
|
||||
localTableName: string;
|
||||
|
||||
@Column({ nullable: false, type: 'uuid' })
|
||||
workspaceId: string;
|
||||
|
||||
@Column({ nullable: false, type: 'uuid' })
|
||||
remoteServerId: string;
|
||||
|
||||
@ManyToOne(() => RemoteServerEntity, (server) => server.tables, {
|
||||
onDelete: 'CASCADE',
|
||||
})
|
||||
@JoinColumn({ name: 'remoteServerId' })
|
||||
server: Relation<RemoteServerEntity<RemoteServerType>>;
|
||||
|
||||
@CreateDateColumn({ type: 'timestamptz' })
|
||||
createdAt: Date;
|
||||
|
||||
@UpdateDateColumn({ type: 'timestamptz' })
|
||||
updatedAt: Date;
|
||||
}
|
||||
@ -7,6 +7,7 @@ import { FieldMetadataModule } from 'src/engine/metadata-modules/field-metadata/
|
||||
import { ObjectMetadataModule } from 'src/engine/metadata-modules/object-metadata/object-metadata.module';
|
||||
import { RemoteServerEntity } from 'src/engine/metadata-modules/remote-server/remote-server.entity';
|
||||
import { RemotePostgresTableModule } from 'src/engine/metadata-modules/remote-server/remote-table/remote-postgres-table/remote-postgres-table.module';
|
||||
import { RemoteTableEntity } from 'src/engine/metadata-modules/remote-server/remote-table/remote-table.entity';
|
||||
import { RemoteTableResolver } from 'src/engine/metadata-modules/remote-server/remote-table/remote-table.resolver';
|
||||
import { RemoteTableService } from 'src/engine/metadata-modules/remote-server/remote-table/remote-table.service';
|
||||
import { WorkspaceCacheVersionModule } from 'src/engine/metadata-modules/workspace-cache-version/workspace-cache-version.module';
|
||||
@ -16,7 +17,10 @@ import { WorkspaceMigrationRunnerModule } from 'src/engine/workspace-manager/wor
|
||||
|
||||
@Module({
|
||||
imports: [
|
||||
TypeOrmModule.forFeature([RemoteServerEntity], 'metadata'),
|
||||
TypeOrmModule.forFeature(
|
||||
[RemoteServerEntity, RemoteTableEntity],
|
||||
'metadata',
|
||||
),
|
||||
TypeOrmModule.forFeature([FeatureFlagEntity], 'core'),
|
||||
DataSourceModule,
|
||||
ObjectMetadataModule,
|
||||
|
||||
@ -1,16 +1,14 @@
|
||||
import { NotFoundException } from '@nestjs/common';
|
||||
import { BadRequestException, NotFoundException } from '@nestjs/common';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
|
||||
import { Repository } from 'typeorm';
|
||||
import { plural } from 'pluralize';
|
||||
|
||||
import {
|
||||
RemoteServerType,
|
||||
RemoteServerEntity,
|
||||
} from 'src/engine/metadata-modules/remote-server/remote-server.entity';
|
||||
import {
|
||||
RemoteTableDTO,
|
||||
RemoteTableStatus,
|
||||
} from 'src/engine/metadata-modules/remote-server/remote-table/dtos/remote-table.dto';
|
||||
import { RemoteTableStatus } from 'src/engine/metadata-modules/remote-server/remote-table/dtos/remote-table.dto';
|
||||
import {
|
||||
isPostgreSQLIntegrationEnabled,
|
||||
mapUdtNameToFieldType,
|
||||
@ -26,7 +24,6 @@ import { RemotePostgresTableService } from 'src/engine/metadata-modules/remote-s
|
||||
import { WorkspaceCacheVersionService } from 'src/engine/metadata-modules/workspace-cache-version/workspace-cache-version.service';
|
||||
import { camelCase } from 'src/utils/camel-case';
|
||||
import { camelToTitleCase } from 'src/utils/camel-to-title-case';
|
||||
import { getRemoteTableLocalName } from 'src/engine/metadata-modules/remote-server/remote-table/utils/get-remote-table-local-name.util';
|
||||
import { WorkspaceMigrationService } from 'src/engine/metadata-modules/workspace-migration/workspace-migration.service';
|
||||
import { WorkspaceMigrationRunnerService } from 'src/engine/workspace-manager/workspace-migration-runner/workspace-migration-runner.service';
|
||||
import { generateMigrationName } from 'src/engine/metadata-modules/workspace-migration/utils/generate-migration-name.util';
|
||||
@ -38,9 +35,13 @@ import {
|
||||
import { RemoteTableColumn } from 'src/engine/metadata-modules/remote-server/remote-table/types/remote-table-column';
|
||||
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
|
||||
import { RemoteTable } from 'src/engine/metadata-modules/remote-server/remote-table/types/remote-table';
|
||||
import { RemoteTableEntity } from 'src/engine/metadata-modules/remote-server/remote-table/remote-table.entity';
|
||||
import { getRemoteTableLocalName } from 'src/engine/metadata-modules/remote-server/remote-table/utils/get-remote-table-local-name.util';
|
||||
|
||||
export class RemoteTableService {
|
||||
constructor(
|
||||
@InjectRepository(RemoteTableEntity, 'metadata')
|
||||
private readonly remoteTableRepository: Repository<RemoteTableEntity>,
|
||||
@InjectRepository(RemoteServerEntity, 'metadata')
|
||||
private readonly remoteServerRepository: Repository<
|
||||
RemoteServerEntity<RemoteServerType>
|
||||
@ -72,27 +73,34 @@ export class RemoteTableService {
|
||||
throw new NotFoundException('Remote server does not exist');
|
||||
}
|
||||
|
||||
const currentForeignTableNames =
|
||||
await this.fetchForeignTableNamesWithinWorkspace(
|
||||
workspaceId,
|
||||
remoteServer.foreignDataWrapperId,
|
||||
);
|
||||
const currentRemoteTableDistantNames = (
|
||||
await this.remoteTableRepository.find({
|
||||
where: {
|
||||
remoteServerId: id,
|
||||
workspaceId,
|
||||
},
|
||||
})
|
||||
).map((remoteTable) => remoteTable.distantTableName);
|
||||
|
||||
const tableInRemoteSchema =
|
||||
const tablesInRemoteSchema =
|
||||
await this.fetchTablesFromRemoteSchema(remoteServer);
|
||||
|
||||
return tableInRemoteSchema.map((remoteTable) => ({
|
||||
return tablesInRemoteSchema.map((remoteTable) => ({
|
||||
name: remoteTable.tableName,
|
||||
schema: remoteTable.tableSchema,
|
||||
status: currentForeignTableNames.includes(
|
||||
getRemoteTableLocalName(remoteTable.tableName),
|
||||
)
|
||||
status: currentRemoteTableDistantNames.includes(remoteTable.tableName)
|
||||
? RemoteTableStatus.SYNCED
|
||||
: RemoteTableStatus.NOT_SYNCED,
|
||||
}));
|
||||
}
|
||||
|
||||
public async syncRemoteTable(input: RemoteTableInput, workspaceId: string) {
|
||||
if (!input.schema) {
|
||||
throw new BadRequestException(
|
||||
'Schema is required for syncing remote table',
|
||||
);
|
||||
}
|
||||
|
||||
const remoteServer = await this.remoteServerRepository.findOne({
|
||||
where: {
|
||||
id: input.remoteServerId,
|
||||
@ -104,13 +112,70 @@ export class RemoteTableService {
|
||||
throw new NotFoundException('Remote server does not exist');
|
||||
}
|
||||
|
||||
const remoteTable = await this.createForeignTableAndMetadata(
|
||||
input,
|
||||
remoteServer,
|
||||
const currentRemoteTableWithSameDistantName =
|
||||
await this.remoteTableRepository.findOne({
|
||||
where: {
|
||||
distantTableName: input.name,
|
||||
remoteServerId: remoteServer.id,
|
||||
workspaceId,
|
||||
},
|
||||
});
|
||||
|
||||
if (currentRemoteTableWithSameDistantName) {
|
||||
throw new BadRequestException('Remote table already exists');
|
||||
}
|
||||
|
||||
const dataSourceMetatada =
|
||||
await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail(
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
const localTableName = getRemoteTableLocalName(input.name);
|
||||
|
||||
await this.validateTableNameDoesNotExists(
|
||||
localTableName,
|
||||
workspaceId,
|
||||
dataSourceMetatada.schema,
|
||||
);
|
||||
|
||||
return remoteTable;
|
||||
const remoteTableEntity = this.remoteTableRepository.create({
|
||||
distantTableName: input.name,
|
||||
localTableName,
|
||||
workspaceId,
|
||||
remoteServerId: remoteServer.id,
|
||||
});
|
||||
|
||||
const remoteTableColumns = await this.fetchTableColumnsSchema(
|
||||
remoteServer,
|
||||
input.name,
|
||||
input.schema,
|
||||
);
|
||||
|
||||
await this.createForeignTable(
|
||||
workspaceId,
|
||||
localTableName,
|
||||
input,
|
||||
remoteServer,
|
||||
remoteTableColumns,
|
||||
);
|
||||
|
||||
await this.createRemoteTableMetadata(
|
||||
workspaceId,
|
||||
localTableName,
|
||||
remoteTableColumns,
|
||||
dataSourceMetatada.id,
|
||||
);
|
||||
|
||||
await this.remoteTableRepository.save(remoteTableEntity);
|
||||
|
||||
await this.workspaceCacheVersionService.incrementVersion(workspaceId);
|
||||
|
||||
return {
|
||||
id: remoteTableEntity.id,
|
||||
name: input.name,
|
||||
schema: input.schema,
|
||||
status: RemoteTableStatus.SYNCED,
|
||||
};
|
||||
}
|
||||
|
||||
public async unsyncRemoteTable(input: RemoteTableInput, workspaceId: string) {
|
||||
@ -125,13 +190,19 @@ export class RemoteTableService {
|
||||
throw new NotFoundException('Remote server does not exist');
|
||||
}
|
||||
|
||||
const remoteTableLocalName = getRemoteTableLocalName(input.name);
|
||||
const remoteTable = await this.remoteTableRepository.findOne({
|
||||
where: {
|
||||
distantTableName: input.name,
|
||||
remoteServerId: remoteServer.id,
|
||||
workspaceId,
|
||||
},
|
||||
});
|
||||
|
||||
await this.removeForeignTableAndMetadata(
|
||||
remoteTableLocalName,
|
||||
workspaceId,
|
||||
remoteServer,
|
||||
);
|
||||
if (!remoteTable) {
|
||||
throw new NotFoundException('Remote table does not exist');
|
||||
}
|
||||
|
||||
await this.unsyncOne(workspaceId, remoteTable, remoteServer);
|
||||
|
||||
return {
|
||||
name: input.name,
|
||||
@ -140,7 +211,131 @@ export class RemoteTableService {
|
||||
};
|
||||
}
|
||||
|
||||
public async fetchForeignTableNamesWithinWorkspace(
|
||||
public async unsyncAll(
|
||||
workspaceId: string,
|
||||
remoteServer: RemoteServerEntity<RemoteServerType>,
|
||||
) {
|
||||
const remoteTables = await this.remoteTableRepository.find({
|
||||
where: {
|
||||
remoteServerId: remoteServer.id,
|
||||
workspaceId,
|
||||
},
|
||||
});
|
||||
|
||||
for (const remoteTable of remoteTables) {
|
||||
await this.unsyncOne(workspaceId, remoteTable, remoteServer);
|
||||
}
|
||||
}
|
||||
|
||||
private async unsyncOne(
|
||||
workspaceId: string,
|
||||
remoteTable: RemoteTableEntity,
|
||||
remoteServer: RemoteServerEntity<RemoteServerType>,
|
||||
) {
|
||||
const currentForeignTableNames =
|
||||
await this.fetchForeignTableNamesWithinWorkspace(
|
||||
workspaceId,
|
||||
remoteServer.foreignDataWrapperId,
|
||||
);
|
||||
|
||||
if (!currentForeignTableNames.includes(remoteTable.localTableName)) {
|
||||
throw new NotFoundException('Foreign table does not exist');
|
||||
}
|
||||
|
||||
const objectMetadata =
|
||||
await this.objectMetadataService.findOneWithinWorkspace(workspaceId, {
|
||||
where: { nameSingular: remoteTable.localTableName },
|
||||
});
|
||||
|
||||
if (objectMetadata) {
|
||||
await this.objectMetadataService.deleteOneObject(
|
||||
{ id: objectMetadata.id },
|
||||
workspaceId,
|
||||
);
|
||||
}
|
||||
|
||||
await this.workspaceMigrationService.createCustomMigration(
|
||||
generateMigrationName(`drop-foreign-table-${remoteTable.localTableName}`),
|
||||
workspaceId,
|
||||
[
|
||||
{
|
||||
name: remoteTable.localTableName,
|
||||
action: WorkspaceMigrationTableActionType.DROP_FOREIGN_TABLE,
|
||||
},
|
||||
],
|
||||
);
|
||||
|
||||
await this.workspaceMigrationRunnerService.executeMigrationFromPendingMigrations(
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
await this.remoteTableRepository.delete(remoteTable.id);
|
||||
|
||||
await this.workspaceCacheVersionService.incrementVersion(workspaceId);
|
||||
}
|
||||
|
||||
private async fetchTableColumnsSchema(
|
||||
remoteServer: RemoteServerEntity<RemoteServerType>,
|
||||
tableName: string,
|
||||
tableSchema: string,
|
||||
): Promise<RemoteTableColumn[]> {
|
||||
switch (remoteServer.foreignDataWrapperType) {
|
||||
case RemoteServerType.POSTGRES_FDW:
|
||||
await isPostgreSQLIntegrationEnabled(
|
||||
this.featureFlagRepository,
|
||||
remoteServer.workspaceId,
|
||||
);
|
||||
|
||||
return this.remotePostgresTableService.fetchPostgresTableColumnsSchema(
|
||||
remoteServer,
|
||||
tableName,
|
||||
tableSchema,
|
||||
);
|
||||
default:
|
||||
throw new BadRequestException('Unsupported foreign data wrapper type');
|
||||
}
|
||||
}
|
||||
|
||||
private async fetchTablesFromRemoteSchema(
|
||||
remoteServer: RemoteServerEntity<RemoteServerType>,
|
||||
): Promise<RemoteTable[]> {
|
||||
switch (remoteServer.foreignDataWrapperType) {
|
||||
case RemoteServerType.POSTGRES_FDW:
|
||||
await isPostgreSQLIntegrationEnabled(
|
||||
this.featureFlagRepository,
|
||||
remoteServer.workspaceId,
|
||||
);
|
||||
|
||||
return this.remotePostgresTableService.fetchTablesFromRemotePostgresSchema(
|
||||
remoteServer,
|
||||
);
|
||||
default:
|
||||
throw new BadRequestException('Unsupported foreign data wrapper type');
|
||||
}
|
||||
}
|
||||
|
||||
private async validateTableNameDoesNotExists(
|
||||
tableName: string,
|
||||
workspaceId: string,
|
||||
workspaceSchemaName: string,
|
||||
) {
|
||||
const workspaceDataSource =
|
||||
await this.workspaceDataSourceService.connectToWorkspaceDataSource(
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
const numberOfTablesWithSameName = +(
|
||||
await workspaceDataSource.query(
|
||||
`SELECT count(table_name) FROM information_schema.tables WHERE table_name LIKE '${tableName}' AND table_schema IN ('core', 'metadata', '${workspaceSchemaName}')`,
|
||||
)
|
||||
)[0].count;
|
||||
|
||||
if (numberOfTablesWithSameName > 0) {
|
||||
throw new BadRequestException('Table name is not available.');
|
||||
}
|
||||
}
|
||||
|
||||
private async fetchForeignTableNamesWithinWorkspace(
|
||||
workspaceId: string,
|
||||
foreignDataWrapperId: string,
|
||||
): Promise<string[]> {
|
||||
@ -156,129 +351,79 @@ export class RemoteTableService {
|
||||
).map((foreignTable) => foreignTable.foreign_table_name);
|
||||
}
|
||||
|
||||
public async removeForeignTableAndMetadata(
|
||||
remoteTableLocalName: string,
|
||||
private async createForeignTable(
|
||||
workspaceId: string,
|
||||
localTableName: string,
|
||||
remoteTableInput: RemoteTableInput,
|
||||
remoteServer: RemoteServerEntity<RemoteServerType>,
|
||||
remoteTableColumns: RemoteTableColumn[],
|
||||
) {
|
||||
const currentForeignTableNames =
|
||||
await this.fetchForeignTableNamesWithinWorkspace(
|
||||
workspaceId,
|
||||
remoteServer.foreignDataWrapperId,
|
||||
);
|
||||
|
||||
if (!currentForeignTableNames.includes(remoteTableLocalName)) {
|
||||
throw new Error('Remote table does not exist');
|
||||
}
|
||||
|
||||
const objectMetadata =
|
||||
await this.objectMetadataService.findOneWithinWorkspace(workspaceId, {
|
||||
where: { nameSingular: remoteTableLocalName },
|
||||
});
|
||||
|
||||
if (objectMetadata) {
|
||||
await this.objectMetadataService.deleteOneObject(
|
||||
{ id: objectMetadata.id },
|
||||
workspaceId,
|
||||
if (!remoteTableInput.schema) {
|
||||
throw new BadRequestException(
|
||||
'Schema is required for creating foreign table',
|
||||
);
|
||||
}
|
||||
|
||||
await this.workspaceMigrationService.createCustomMigration(
|
||||
generateMigrationName(`drop-foreign-table-${remoteTableLocalName}`),
|
||||
workspaceId,
|
||||
[
|
||||
{
|
||||
name: remoteTableLocalName,
|
||||
action: WorkspaceMigrationTableActionType.DROP_FOREIGN_TABLE,
|
||||
},
|
||||
],
|
||||
);
|
||||
const workspaceMigration =
|
||||
await this.workspaceMigrationService.createCustomMigration(
|
||||
generateMigrationName(`create-foreign-table-${localTableName}`),
|
||||
workspaceId,
|
||||
[
|
||||
{
|
||||
name: localTableName,
|
||||
action: WorkspaceMigrationTableActionType.CREATE_FOREIGN_TABLE,
|
||||
foreignTable: {
|
||||
columns: remoteTableColumns.map(
|
||||
(column) =>
|
||||
({
|
||||
columnName: column.columnName,
|
||||
columnType: column.dataType,
|
||||
}) satisfies WorkspaceMigrationColumnDefinition,
|
||||
),
|
||||
referencedTableName: remoteTableInput.name,
|
||||
referencedTableSchema: remoteTableInput.schema,
|
||||
foreignDataWrapperId: remoteServer.foreignDataWrapperId,
|
||||
} satisfies WorkspaceMigrationForeignTable,
|
||||
},
|
||||
],
|
||||
);
|
||||
|
||||
await this.workspaceMigrationRunnerService.executeMigrationFromPendingMigrations(
|
||||
workspaceId,
|
||||
);
|
||||
// TODO: This should be done in a transaction. Waiting for a global refactoring of transaction management.
|
||||
try {
|
||||
await this.workspaceMigrationRunnerService.executeMigrationFromPendingMigrations(
|
||||
workspaceId,
|
||||
);
|
||||
} catch (exception) {
|
||||
this.workspaceMigrationService.deleteById(workspaceMigration.id);
|
||||
|
||||
await this.workspaceCacheVersionService.incrementVersion(workspaceId);
|
||||
throw new BadRequestException(
|
||||
'Could not create foreign table. Please check if the table already exists.',
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private async createForeignTableAndMetadata(
|
||||
input: RemoteTableInput,
|
||||
remoteServer: RemoteServerEntity<RemoteServerType>,
|
||||
private async createRemoteTableMetadata(
|
||||
workspaceId: string,
|
||||
): Promise<RemoteTableDTO> {
|
||||
if (!input.schema) {
|
||||
throw new Error('Schema is required for syncing remote table');
|
||||
}
|
||||
|
||||
const currentForeignTableNames =
|
||||
await this.fetchForeignTableNamesWithinWorkspace(
|
||||
workspaceId,
|
||||
remoteServer.foreignDataWrapperId,
|
||||
);
|
||||
|
||||
if (
|
||||
currentForeignTableNames.includes(getRemoteTableLocalName(input.name))
|
||||
) {
|
||||
throw new Error('Remote table already exists');
|
||||
}
|
||||
|
||||
const remoteTableColumns = await this.fetchTableColumnsSchema(
|
||||
remoteServer,
|
||||
input.name,
|
||||
input.schema,
|
||||
);
|
||||
|
||||
const remoteTableLocalName = getRemoteTableLocalName(input.name);
|
||||
const remoteTableLabel = camelToTitleCase(remoteTableLocalName);
|
||||
|
||||
localTableName: string,
|
||||
remoteTableColumns: RemoteTableColumn[],
|
||||
dataSourceMetadataId: string,
|
||||
) {
|
||||
// We only support remote tables with an id column for now.
|
||||
const remoteTableIdColumn = remoteTableColumns.filter(
|
||||
(column) => column.columnName === 'id',
|
||||
)?.[0];
|
||||
|
||||
if (!remoteTableIdColumn) {
|
||||
throw new Error('Remote table must have an id column');
|
||||
throw new BadRequestException('Remote table must have an id column');
|
||||
}
|
||||
|
||||
const dataSourceMetatada =
|
||||
await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail(
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
await this.workspaceMigrationService.createCustomMigration(
|
||||
generateMigrationName(`create-foreign-table-${remoteTableLocalName}`),
|
||||
workspaceId,
|
||||
[
|
||||
{
|
||||
name: remoteTableLocalName,
|
||||
action: WorkspaceMigrationTableActionType.CREATE_FOREIGN_TABLE,
|
||||
foreignTable: {
|
||||
columns: remoteTableColumns.map(
|
||||
(column) =>
|
||||
({
|
||||
columnName: column.columnName,
|
||||
columnType: column.dataType,
|
||||
}) satisfies WorkspaceMigrationColumnDefinition,
|
||||
),
|
||||
referencedTableName: input.name,
|
||||
referencedTableSchema: input.schema,
|
||||
foreignDataWrapperId: remoteServer.foreignDataWrapperId,
|
||||
} satisfies WorkspaceMigrationForeignTable,
|
||||
},
|
||||
],
|
||||
);
|
||||
|
||||
await this.workspaceMigrationRunnerService.executeMigrationFromPendingMigrations(
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
const objectMetadata = await this.objectMetadataService.createOne({
|
||||
nameSingular: remoteTableLocalName,
|
||||
namePlural: `${remoteTableLocalName}s`,
|
||||
labelSingular: remoteTableLabel,
|
||||
labelPlural: `${remoteTableLabel}s`,
|
||||
nameSingular: localTableName,
|
||||
namePlural: plural(localTableName),
|
||||
labelSingular: camelToTitleCase(camelCase(localTableName)),
|
||||
labelPlural: camelToTitleCase(plural(camelCase(localTableName))),
|
||||
description: 'Remote table',
|
||||
dataSourceId: dataSourceMetatada.id,
|
||||
dataSourceId: dataSourceMetadataId,
|
||||
workspaceId: workspaceId,
|
||||
icon: 'IconPlug',
|
||||
isRemote: true,
|
||||
@ -305,53 +450,5 @@ export class RemoteTableService {
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
await this.workspaceCacheVersionService.incrementVersion(workspaceId);
|
||||
|
||||
return {
|
||||
name: input.name,
|
||||
schema: input.schema,
|
||||
status: RemoteTableStatus.SYNCED,
|
||||
};
|
||||
}
|
||||
|
||||
private async fetchTableColumnsSchema(
|
||||
remoteServer: RemoteServerEntity<RemoteServerType>,
|
||||
tableName: string,
|
||||
tableSchema: string,
|
||||
): Promise<RemoteTableColumn[]> {
|
||||
switch (remoteServer.foreignDataWrapperType) {
|
||||
case RemoteServerType.POSTGRES_FDW:
|
||||
await isPostgreSQLIntegrationEnabled(
|
||||
this.featureFlagRepository,
|
||||
remoteServer.workspaceId,
|
||||
);
|
||||
|
||||
return this.remotePostgresTableService.fetchPostgresTableColumnsSchema(
|
||||
remoteServer,
|
||||
tableName,
|
||||
tableSchema,
|
||||
);
|
||||
default:
|
||||
throw new Error('Unsupported foreign data wrapper type');
|
||||
}
|
||||
}
|
||||
|
||||
private async fetchTablesFromRemoteSchema(
|
||||
remoteServer: RemoteServerEntity<RemoteServerType>,
|
||||
): Promise<RemoteTable[]> {
|
||||
switch (remoteServer.foreignDataWrapperType) {
|
||||
case RemoteServerType.POSTGRES_FDW:
|
||||
await isPostgreSQLIntegrationEnabled(
|
||||
this.featureFlagRepository,
|
||||
remoteServer.workspaceId,
|
||||
);
|
||||
|
||||
return this.remotePostgresTableService.fetchTablesFromRemotePostgresSchema(
|
||||
remoteServer,
|
||||
);
|
||||
default:
|
||||
throw new Error('Unsupported foreign data wrapper type');
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,4 +1,6 @@
|
||||
import { singular } from 'pluralize';
|
||||
|
||||
import { camelCase } from 'src/utils/camel-case';
|
||||
|
||||
export const getRemoteTableLocalName = (distantTableName: string) =>
|
||||
`${camelCase(distantTableName)}Remote`;
|
||||
singular(camelCase(distantTableName));
|
||||
|
||||
@ -61,7 +61,7 @@ export class WorkspaceMigrationService {
|
||||
workspaceId: string,
|
||||
migrations: WorkspaceMigrationTableAction[],
|
||||
) {
|
||||
await this.workspaceMigrationRepository.save({
|
||||
return this.workspaceMigrationRepository.save({
|
||||
name,
|
||||
migrations,
|
||||
workspaceId,
|
||||
@ -69,7 +69,11 @@ export class WorkspaceMigrationService {
|
||||
});
|
||||
}
|
||||
|
||||
public async delete(workspaceId: string) {
|
||||
public async deleteAllWithinWorkspace(workspaceId: string) {
|
||||
await this.workspaceMigrationRepository.delete({ workspaceId });
|
||||
}
|
||||
|
||||
public async deleteById(id: string) {
|
||||
await this.workspaceMigrationRepository.delete({ id });
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user