Compare distant tables schema with remote tables schema (#5413)
Closes #4532 and part of #5062
This commit is contained in:
@ -126,7 +126,7 @@ export class RemoteServerService<T extends RemoteServerType> {
|
||||
}
|
||||
|
||||
const currentRemoteTablesForServer =
|
||||
await this.remoteTableService.findCurrentRemoteTablesByServerId({
|
||||
await this.remoteTableService.findRemoteTablesByServerId({
|
||||
remoteServerId: remoteServer.id,
|
||||
workspaceId,
|
||||
});
|
||||
|
||||
@ -23,10 +23,10 @@ export class DistantTableService {
|
||||
>,
|
||||
) {}
|
||||
|
||||
public async fetchDistantTableColumns(
|
||||
public getDistantTableColumns(
|
||||
remoteServer: RemoteServerEntity<RemoteServerType>,
|
||||
tableName: string,
|
||||
): Promise<DistantTableColumn[]> {
|
||||
): DistantTableColumn[] {
|
||||
if (!remoteServer.availableTables) {
|
||||
throw new BadRequestException(
|
||||
'Remote server available tables are not defined',
|
||||
@ -36,15 +36,16 @@ export class DistantTableService {
|
||||
return remoteServer.availableTables[tableName];
|
||||
}
|
||||
|
||||
public async fetchDistantTableNames(
|
||||
public async fetchDistantTables(
|
||||
remoteServer: RemoteServerEntity<RemoteServerType>,
|
||||
workspaceId: string,
|
||||
): Promise<string[]> {
|
||||
const availableTables =
|
||||
remoteServer.availableTables ??
|
||||
(await this.createAvailableTables(remoteServer, workspaceId));
|
||||
refreshData?: boolean,
|
||||
): Promise<DistantTables> {
|
||||
if (!refreshData && remoteServer.availableTables) {
|
||||
return remoteServer.availableTables;
|
||||
}
|
||||
|
||||
return Object.keys(availableTables);
|
||||
return await this.createAvailableTables(remoteServer, workspaceId);
|
||||
}
|
||||
|
||||
private async createAvailableTables(
|
||||
|
||||
@ -0,0 +1,17 @@
|
||||
import { InputType, ID, Field } from '@nestjs/graphql';
|
||||
|
||||
import { IDField } from '@ptc-org/nestjs-query-graphql';
|
||||
import { IsOptional } from 'class-validator';
|
||||
|
||||
@InputType()
|
||||
export class FindManyRemoteTablesInput {
|
||||
@IDField(() => ID, { description: 'The id of the remote server.' })
|
||||
id!: string;
|
||||
|
||||
@IsOptional()
|
||||
@Field(() => Boolean, {
|
||||
description: 'Indicates if data from distant tables should be refreshed.',
|
||||
nullable: true,
|
||||
})
|
||||
refreshData?: boolean;
|
||||
}
|
||||
@ -1,7 +1,7 @@
|
||||
import { ObjectType, Field, registerEnumType } from '@nestjs/graphql';
|
||||
|
||||
import { IDField } from '@ptc-org/nestjs-query-graphql';
|
||||
import { IsEnum, IsOptional } from 'class-validator';
|
||||
import { IsOptional } from 'class-validator';
|
||||
|
||||
import { UUIDScalarType } from 'src/engine/api/graphql/workspace-schema-builder/graphql-types/scalars';
|
||||
|
||||
@ -10,11 +10,23 @@ export enum RemoteTableStatus {
|
||||
NOT_SYNCED = 'NOT_SYNCED',
|
||||
}
|
||||
|
||||
export enum TableUpdate {
|
||||
TABLE_DELETED = 'TABLE_DELETED',
|
||||
COLUMNS_DELETED = 'COLUMN_DELETED',
|
||||
COLUMNS_ADDED = 'COLUMN_ADDED',
|
||||
COLUMNS_TYPE_CHANGED = 'COLUMN_TYPE_CHANGED',
|
||||
}
|
||||
|
||||
registerEnumType(RemoteTableStatus, {
|
||||
name: 'RemoteTableStatus',
|
||||
description: 'Status of the table',
|
||||
});
|
||||
|
||||
registerEnumType(TableUpdate, {
|
||||
name: 'TableUpdate',
|
||||
description: 'Schema update on a table',
|
||||
});
|
||||
|
||||
@ObjectType('RemoteTable')
|
||||
export class RemoteTableDTO {
|
||||
@IDField(() => UUIDScalarType, { nullable: true })
|
||||
@ -23,11 +35,14 @@ export class RemoteTableDTO {
|
||||
@Field(() => String)
|
||||
name: string;
|
||||
|
||||
@IsEnum(RemoteTableStatus)
|
||||
@Field(() => RemoteTableStatus)
|
||||
status: RemoteTableStatus;
|
||||
|
||||
@IsOptional()
|
||||
@Field(() => String, { nullable: true })
|
||||
schema?: string;
|
||||
|
||||
@IsOptional()
|
||||
@Field(() => [TableUpdate], { nullable: true })
|
||||
schemaPendingUpdates?: [TableUpdate];
|
||||
}
|
||||
|
||||
@ -4,7 +4,7 @@ import { Args, Mutation, Query, Resolver } from '@nestjs/graphql';
|
||||
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
|
||||
import { AuthWorkspace } from 'src/engine/decorators/auth/auth-workspace.decorator';
|
||||
import { JwtAuthGuard } from 'src/engine/guards/jwt.auth.guard';
|
||||
import { RemoteServerIdInput } from 'src/engine/metadata-modules/remote-server/dtos/remote-server-id.input';
|
||||
import { FindManyRemoteTablesInput } from 'src/engine/metadata-modules/remote-server/remote-table/dtos/find-many-remote-tables-input';
|
||||
import { RemoteTableInput } from 'src/engine/metadata-modules/remote-server/remote-table/dtos/remote-table-input';
|
||||
import { RemoteTableDTO } from 'src/engine/metadata-modules/remote-server/remote-table/dtos/remote-table.dto';
|
||||
import { RemoteTableService } from 'src/engine/metadata-modules/remote-server/remote-table/remote-table.service';
|
||||
@ -16,12 +16,13 @@ export class RemoteTableResolver {
|
||||
|
||||
@Query(() => [RemoteTableDTO])
|
||||
async findAvailableRemoteTablesByServerId(
|
||||
@Args('input') { id }: RemoteServerIdInput,
|
||||
@Args('input') input: FindManyRemoteTablesInput,
|
||||
@AuthWorkspace() { id: workspaceId }: Workspace,
|
||||
) {
|
||||
return this.remoteTableService.findAvailableRemoteTablesByServerId(
|
||||
id,
|
||||
return this.remoteTableService.findDistantTablesByServerId(
|
||||
input.id,
|
||||
workspaceId,
|
||||
input.refreshData,
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@ -8,7 +8,10 @@ import {
|
||||
RemoteServerType,
|
||||
RemoteServerEntity,
|
||||
} from 'src/engine/metadata-modules/remote-server/remote-server.entity';
|
||||
import { RemoteTableStatus } from 'src/engine/metadata-modules/remote-server/remote-table/dtos/remote-table.dto';
|
||||
import {
|
||||
RemoteTableStatus,
|
||||
TableUpdate,
|
||||
} from 'src/engine/metadata-modules/remote-server/remote-table/dtos/remote-table.dto';
|
||||
import {
|
||||
mapUdtNameToFieldType,
|
||||
mapUdtNameToFieldSettings,
|
||||
@ -36,6 +39,8 @@ import { RemoteTableEntity } from 'src/engine/metadata-modules/remote-server/rem
|
||||
import { getRemoteTableLocalName } from 'src/engine/metadata-modules/remote-server/remote-table/utils/get-remote-table-local-name.util';
|
||||
import { DistantTableService } from 'src/engine/metadata-modules/remote-server/remote-table/distant-table/distant-table.service';
|
||||
import { DistantTableColumn } from 'src/engine/metadata-modules/remote-server/remote-table/distant-table/types/distant-table-column';
|
||||
import { DistantTables } from 'src/engine/metadata-modules/remote-server/remote-table/distant-table/types/distant-table';
|
||||
import { RemoteTableSchemaColumn } from 'src/engine/metadata-modules/remote-server/remote-table/types/remote-table-schema-column';
|
||||
|
||||
export class RemoteTableService {
|
||||
private readonly logger = new Logger(RemoteTableService.name);
|
||||
@ -57,9 +62,10 @@ export class RemoteTableService {
|
||||
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
|
||||
) {}
|
||||
|
||||
public async findAvailableRemoteTablesByServerId(
|
||||
public async findDistantTablesByServerId(
|
||||
id: string,
|
||||
workspaceId: string,
|
||||
refreshData?: boolean,
|
||||
) {
|
||||
const remoteServer = await this.remoteServerRepository.findOne({
|
||||
where: {
|
||||
@ -72,7 +78,7 @@ export class RemoteTableService {
|
||||
throw new NotFoundException('Remote server does not exist');
|
||||
}
|
||||
|
||||
const currentRemoteTables = await this.findCurrentRemoteTablesByServerId({
|
||||
const currentRemoteTables = await this.findRemoteTablesByServerId({
|
||||
remoteServerId: id,
|
||||
workspaceId,
|
||||
});
|
||||
@ -81,22 +87,113 @@ export class RemoteTableService {
|
||||
(remoteTable) => remoteTable.distantTableName,
|
||||
);
|
||||
|
||||
const distantTableNames =
|
||||
await this.distantTableService.fetchDistantTableNames(
|
||||
remoteServer,
|
||||
workspaceId,
|
||||
const distantTables = await this.distantTableService.fetchDistantTables(
|
||||
remoteServer,
|
||||
workspaceId,
|
||||
refreshData,
|
||||
);
|
||||
|
||||
if (!refreshData || currentRemoteTables.length === 0) {
|
||||
const distantTablesWithStatus = Object.keys(distantTables).map(
|
||||
(tableName) => ({
|
||||
name: tableName,
|
||||
schema: remoteServer.schema,
|
||||
status: currentRemoteTableDistantNames.includes(tableName)
|
||||
? RemoteTableStatus.SYNCED
|
||||
: RemoteTableStatus.NOT_SYNCED,
|
||||
}),
|
||||
);
|
||||
|
||||
return distantTableNames.map((tableName) => ({
|
||||
name: tableName,
|
||||
schema: remoteServer.schema,
|
||||
status: currentRemoteTableDistantNames.includes(tableName)
|
||||
? RemoteTableStatus.SYNCED
|
||||
: RemoteTableStatus.NOT_SYNCED,
|
||||
}));
|
||||
return distantTablesWithStatus;
|
||||
}
|
||||
|
||||
const schemaPendingUpdates =
|
||||
await this.getSchemaUpdatesBetweenRemoteAndDistantTables({
|
||||
workspaceId,
|
||||
remoteTables: currentRemoteTables,
|
||||
distantTables,
|
||||
});
|
||||
|
||||
const distantTablesWithUpdates = Object.keys(distantTables).map(
|
||||
(tableName) => ({
|
||||
name: tableName,
|
||||
schema: remoteServer.schema,
|
||||
status: currentRemoteTableDistantNames.includes(tableName)
|
||||
? RemoteTableStatus.SYNCED
|
||||
: RemoteTableStatus.NOT_SYNCED,
|
||||
schemaPendingUpdates: schemaPendingUpdates[tableName],
|
||||
}),
|
||||
);
|
||||
|
||||
const deletedTables = Object.entries(schemaPendingUpdates)
|
||||
.filter(([_tableName, updates]) =>
|
||||
updates.includes(TableUpdate.TABLE_DELETED),
|
||||
)
|
||||
.map(([tableName, updates]) => ({
|
||||
name: tableName,
|
||||
schema: remoteServer.schema,
|
||||
status: RemoteTableStatus.SYNCED,
|
||||
schemaPendingUpdates: updates,
|
||||
}));
|
||||
|
||||
return distantTablesWithUpdates.concat(deletedTables);
|
||||
}
|
||||
|
||||
public async findCurrentRemoteTablesByServerId({
|
||||
private async getSchemaUpdatesBetweenRemoteAndDistantTables({
|
||||
workspaceId,
|
||||
remoteTables,
|
||||
distantTables,
|
||||
}: {
|
||||
workspaceId: string;
|
||||
remoteTables: RemoteTableEntity[];
|
||||
distantTables: DistantTables;
|
||||
}): Promise<{ [tablename: string]: TableUpdate[] }> {
|
||||
const updates = {};
|
||||
|
||||
for (const remoteTable of remoteTables) {
|
||||
const distantTable = distantTables[remoteTable.distantTableName];
|
||||
const tableName = remoteTable.distantTableName;
|
||||
|
||||
if (!distantTable) {
|
||||
updates[tableName] = [TableUpdate.TABLE_DELETED];
|
||||
continue;
|
||||
}
|
||||
|
||||
const distantColumnNames = new Set(
|
||||
distantTable.map((column) => column.columnName),
|
||||
);
|
||||
const localColumnNames = new Set(
|
||||
(
|
||||
await this.fetchTableColumns(workspaceId, remoteTable.localTableName)
|
||||
).map((column) => column.column_name),
|
||||
);
|
||||
|
||||
const columnsAdded = [...distantColumnNames].filter(
|
||||
(columnName) => !localColumnNames.has(columnName),
|
||||
);
|
||||
|
||||
const columnsDeleted = [...localColumnNames].filter(
|
||||
(columnName) => !distantColumnNames.has(columnName),
|
||||
);
|
||||
|
||||
if (columnsAdded.length > 0) {
|
||||
updates[tableName] = [
|
||||
...(updates[tableName] || []),
|
||||
TableUpdate.COLUMNS_ADDED,
|
||||
];
|
||||
}
|
||||
if (columnsDeleted.length > 0) {
|
||||
updates[tableName] = [
|
||||
...(updates[tableName] || []),
|
||||
TableUpdate.COLUMNS_DELETED,
|
||||
];
|
||||
}
|
||||
}
|
||||
|
||||
return updates;
|
||||
}
|
||||
|
||||
public async findRemoteTablesByServerId({
|
||||
remoteServerId,
|
||||
workspaceId,
|
||||
}: {
|
||||
@ -156,11 +253,10 @@ export class RemoteTableService {
|
||||
remoteServerId: remoteServer.id,
|
||||
});
|
||||
|
||||
const distantTableColumns =
|
||||
await this.distantTableService.fetchDistantTableColumns(
|
||||
remoteServer,
|
||||
input.name,
|
||||
);
|
||||
const distantTableColumns = this.distantTableService.getDistantTableColumns(
|
||||
remoteServer,
|
||||
input.name,
|
||||
);
|
||||
|
||||
// We only support remote tables with an id column for now.
|
||||
const distantTableIdColumn = distantTableColumns.find(
|
||||
@ -332,6 +428,25 @@ export class RemoteTableService {
|
||||
).map((foreignTable) => foreignTable.foreign_table_name);
|
||||
}
|
||||
|
||||
private async fetchTableColumns(
|
||||
workspaceId: string,
|
||||
tableName: string,
|
||||
): Promise<RemoteTableSchemaColumn[]> {
|
||||
const workspaceDataSource =
|
||||
await this.workspaceDataSourceService.connectToWorkspaceDataSource(
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
const schemaName =
|
||||
this.workspaceDataSourceService.getSchemaName(workspaceId);
|
||||
|
||||
return await workspaceDataSource.query(
|
||||
`SELECT column_name, data_type
|
||||
FROM information_schema.columns
|
||||
WHERE table_schema = '${schemaName}' AND table_name = '${tableName}'`,
|
||||
);
|
||||
}
|
||||
|
||||
private async createForeignTable(
|
||||
workspaceId: string,
|
||||
localTableName: string,
|
||||
|
||||
@ -0,0 +1,4 @@
|
||||
export type RemoteTableSchemaColumn = {
|
||||
column_name: string;
|
||||
data_type: string;
|
||||
};
|
||||
Reference in New Issue
Block a user