Sync remote object (#4713)

* Sync objects

* Generate data for isRemote

* Add cache version update

* Add label identifier + fix field metadata input

---------

Co-authored-by: Thomas Trompette <thomast@twenty.com>
This commit is contained in:
Thomas Trompette
2024-03-29 18:23:58 +01:00
committed by GitHub
parent 7f3623239a
commit 1d351a29b8
22 changed files with 777 additions and 379 deletions

View File

@ -0,0 +1,21 @@
import { InputType, Field, ID } from '@nestjs/graphql';
import { IsEnum } from 'class-validator';
import { RemoteTableStatus } from 'src/engine/metadata-modules/remote-server/remote-table/dtos/remote-table.dto';
@InputType()
export class RemoteTableInput {
@Field(() => ID)
remoteServerId: string;
@Field(() => String)
name: string;
@IsEnum(RemoteTableStatus)
@Field(() => RemoteTableStatus)
status: RemoteTableStatus;
@Field(() => String)
schema: string;
}

View File

@ -0,0 +1,11 @@
import { Module } from '@nestjs/common';
import { RemotePostgresTableService } from 'src/engine/metadata-modules/remote-server/remote-table/remote-postgres-table/remote-postgres-table.service';
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
@Module({
imports: [WorkspaceDataSourceModule],
providers: [RemotePostgresTableService],
exports: [RemotePostgresTableService],
})
export class RemotePostgresTableModule {}

View File

@ -0,0 +1,106 @@
import { Injectable } from '@nestjs/common';
import { DataSource } from 'typeorm';
import {
RemoteServerEntity,
RemoteServerType,
} from 'src/engine/metadata-modules/remote-server/remote-server.entity';
import { RemoteTableStatus } from 'src/engine/metadata-modules/remote-server/remote-table/dtos/remote-table.dto';
import {
buildPostgresUrl,
EXCLUDED_POSTGRES_SCHEMAS,
} from 'src/engine/metadata-modules/remote-server/remote-table/remote-postgres-table/utils/remote-postgres-table.util';
import { EnvironmentService } from 'src/engine/integrations/environment/environment.service';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
@Injectable()
export class RemotePostgresTableService {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
private readonly environmentService: EnvironmentService,
) {}
public async findAvailableRemotePostgresTables(
workspaceId: string,
remoteServer: RemoteServerEntity<RemoteServerType>,
) {
const remotePostgresTables =
await this.fetchTablesFromRemotePostgresSchema(remoteServer);
const workspaceDataSource =
await this.workspaceDataSourceService.connectToWorkspaceDataSource(
workspaceId,
);
const currentForeignTableNames = (
await workspaceDataSource.query(
`SELECT foreign_table_name FROM information_schema.foreign_tables`,
)
).map((foreignTable) => foreignTable.foreign_table_name);
return remotePostgresTables.map((remoteTable) => ({
name: remoteTable.table_name,
schema: remoteTable.table_schema,
status: currentForeignTableNames.includes(remoteTable.table_name)
? RemoteTableStatus.SYNCED
: RemoteTableStatus.NOT_SYNCED,
}));
}
public async fetchPostgresTableColumnsSchema(
remoteServer: RemoteServerEntity<RemoteServerType>,
tableName: string,
tableSchema: string,
) {
const dataSource = new DataSource({
url: buildPostgresUrl(
this.environmentService.get('LOGIN_TOKEN_SECRET'),
remoteServer,
),
type: 'postgres',
logging: true,
});
await dataSource.initialize();
const columns = await dataSource.query(
`SELECT column_name, data_type, udt_name FROM information_schema.columns WHERE table_name = '${tableName}' AND table_schema = '${tableSchema}'`,
);
await dataSource.destroy();
return columns;
}
private async fetchTablesFromRemotePostgresSchema(
remoteServer: RemoteServerEntity<RemoteServerType>,
) {
const dataSource = new DataSource({
url: buildPostgresUrl(
this.environmentService.get('LOGIN_TOKEN_SECRET'),
remoteServer,
),
type: 'postgres',
logging: true,
});
await dataSource.initialize();
const schemaNames = await dataSource.query(
`SELECT schema_name FROM information_schema.schemata where schema_name not in ( ${EXCLUDED_POSTGRES_SCHEMAS.map(
(schema) => `'${schema}'`,
).join(', ')} ) order by schema_name limit 1`,
);
const remotePostgresTables = await dataSource.query(
`SELECT table_name, table_schema FROM information_schema.tables WHERE table_schema IN (${schemaNames
.map((schemaName) => `'${schemaName.schema_name}'`)
.join(', ')})`,
);
await dataSource.destroy();
return remotePostgresTables;
}
}

View File

@ -0,0 +1,65 @@
import { Repository } from 'typeorm/repository/Repository';
import { decryptText } from 'src/engine/core-modules/auth/auth.util';
import {
FeatureFlagEntity,
FeatureFlagKeys,
} from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { FieldMetadataType } from 'src/engine/metadata-modules/field-metadata/field-metadata.entity';
import {
RemoteServerEntity,
RemoteServerType,
} from 'src/engine/metadata-modules/remote-server/remote-server.entity';
export const EXCLUDED_POSTGRES_SCHEMAS = [
'information_schema',
'pg_catalog',
'pg_toast',
];
export const buildPostgresUrl = (
secretKey: string,
remoteServer: RemoteServerEntity<RemoteServerType>,
): string => {
const foreignDataWrapperOptions = remoteServer.foreignDataWrapperOptions;
const userMappingOptions = remoteServer.userMappingOptions;
const password = decryptText(userMappingOptions.password, secretKey);
const url = `postgres://${userMappingOptions.username}:${password}@${foreignDataWrapperOptions.host}:${foreignDataWrapperOptions.port}/${foreignDataWrapperOptions.dbname}`;
return url;
};
export const mapUdtNameToFieldType = (udtName: string): FieldMetadataType => {
switch (udtName) {
case 'uuid':
return FieldMetadataType.UUID;
case 'varchar':
return FieldMetadataType.TEXT;
case 'bool':
return FieldMetadataType.BOOLEAN;
case 'timestamp':
case 'timestamptz':
return FieldMetadataType.DATE_TIME;
default:
return FieldMetadataType.TEXT;
}
};
export const isPostgreSQLIntegrationEnabled = async (
featureFlagRepository: Repository<FeatureFlagEntity>,
workspaceId: string,
) => {
const featureFlag = await featureFlagRepository.findOneBy({
workspaceId,
key: FeatureFlagKeys.IsPostgreSQLIntegrationEnabled,
value: true,
});
const featureFlagEnabled = featureFlag && featureFlag.value;
if (!featureFlagEnabled) {
throw new Error('PostgreSQL integration is not enabled');
}
};

View File

@ -1,15 +1,27 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { DataSourceModule } from 'src/engine/metadata-modules/data-source/data-source.module';
import { FieldMetadataModule } from 'src/engine/metadata-modules/field-metadata/field-metadata.module';
import { ObjectMetadataModule } from 'src/engine/metadata-modules/object-metadata/object-metadata.module';
import { RemoteServerEntity } from 'src/engine/metadata-modules/remote-server/remote-server.entity';
import { RemotePostgresTableModule } from 'src/engine/metadata-modules/remote-server/remote-table/remote-postgres-table/remote-postgres-table.module';
import { RemoteTableResolver } from 'src/engine/metadata-modules/remote-server/remote-table/remote-table.resolver';
import { RemoteTableService } from 'src/engine/metadata-modules/remote-server/remote-table/remote-table.service';
import { WorkspaceCacheVersionModule } from 'src/engine/metadata-modules/workspace-cache-version/workspace-cache-version.module';
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
@Module({
imports: [
TypeOrmModule.forFeature([RemoteServerEntity], 'metadata'),
TypeOrmModule.forFeature([FeatureFlagEntity], 'core'),
WorkspaceDataSourceModule,
DataSourceModule,
ObjectMetadataModule,
FieldMetadataModule,
RemotePostgresTableModule,
WorkspaceCacheVersionModule,
],
providers: [RemoteTableService, RemoteTableResolver],
})

View File

@ -1,10 +1,11 @@
import { UseGuards } from '@nestjs/common';
import { Args, Query, Resolver } from '@nestjs/graphql';
import { Args, Mutation, Query, Resolver } from '@nestjs/graphql';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { AuthWorkspace } from 'src/engine/decorators/auth/auth-workspace.decorator';
import { JwtAuthGuard } from 'src/engine/guards/jwt.auth.guard';
import { RemoteServerIdInput } from 'src/engine/metadata-modules/remote-server/dtos/remote-server-id.input';
import { RemoteTableInput } from 'src/engine/metadata-modules/remote-server/remote-table/dtos/remote-table-input';
import { RemoteTableDTO } from 'src/engine/metadata-modules/remote-server/remote-table/dtos/remote-table.dto';
import { RemoteTableService } from 'src/engine/metadata-modules/remote-server/remote-table/remote-table.service';
@ -23,4 +24,15 @@ export class RemoteTableResolver {
workspaceId,
);
}
@Mutation(() => RemoteTableDTO)
async updateRemoteTableSyncStatus(
@Args('input') input: RemoteTableInput,
@AuthWorkspace() { id: workspaceId }: Workspace,
) {
return this.remoteTableService.updateRemoteTableSyncStatus(
input,
workspaceId,
);
}
}

View File

@ -8,12 +8,23 @@ import {
RemoteServerEntity,
} from 'src/engine/metadata-modules/remote-server/remote-server.entity';
import { RemoteTableStatus } from 'src/engine/metadata-modules/remote-server/remote-table/dtos/remote-table.dto';
import { EnvironmentService } from 'src/engine/integrations/environment/environment.service';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import {
EXCLUDED_POSTGRES_SCHEMAS,
buildPostgresUrl,
} from 'src/engine/metadata-modules/remote-server/remote-table/utils/remote-table-postgres.util';
isPostgreSQLIntegrationEnabled,
mapUdtNameToFieldType,
} from 'src/engine/metadata-modules/remote-server/remote-table/remote-postgres-table/utils/remote-postgres-table.util';
import { RemoteTableInput } from 'src/engine/metadata-modules/remote-server/remote-table/dtos/remote-table-input';
import { DataSourceService } from 'src/engine/metadata-modules/data-source/data-source.service';
import { ObjectMetadataService } from 'src/engine/metadata-modules/object-metadata/object-metadata.service';
import { CreateObjectInput } from 'src/engine/metadata-modules/object-metadata/dtos/create-object.input';
import { FieldMetadataService } from 'src/engine/metadata-modules/field-metadata/field-metadata.service';
import { CreateFieldInput } from 'src/engine/metadata-modules/field-metadata/dtos/create-field.input';
import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity';
import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { RemotePostgresTableService } from 'src/engine/metadata-modules/remote-server/remote-table/remote-postgres-table/remote-postgres-table.service';
import { snakeCase } from 'src/utils/snake-case';
import { capitalize } from 'src/utils/capitalize';
import { WorkspaceCacheVersionService } from 'src/engine/metadata-modules/workspace-cache-version/workspace-cache-version.service';
export class RemoteTableService {
constructor(
@ -21,8 +32,14 @@ export class RemoteTableService {
private readonly remoteServerRepository: Repository<
RemoteServerEntity<RemoteServerType>
>,
@InjectRepository(FeatureFlagEntity, 'core')
private readonly featureFlagRepository: Repository<FeatureFlagEntity>,
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
private readonly environmentService: EnvironmentService,
private readonly workspaceCacheVersionService: WorkspaceCacheVersionService,
private readonly dataSourceService: DataSourceService,
private readonly objectMetadataService: ObjectMetadataService,
private readonly fieldMetadataService: FieldMetadataService,
private readonly remotePostgresTableService: RemotePostgresTableService,
) {}
public async findAvailableRemoteTablesByServerId(
@ -42,7 +59,12 @@ export class RemoteTableService {
switch (remoteServer.foreignDataWrapperType) {
case RemoteServerType.POSTGRES_FDW:
return this.findAvailableRemotePostgresTables(
await isPostgreSQLIntegrationEnabled(
this.featureFlagRepository,
workspaceId,
);
return this.remotePostgresTableService.findAvailableRemotePostgresTables(
workspaceId,
remoteServer,
);
@ -51,62 +73,171 @@ export class RemoteTableService {
}
}
// TODO: may be moved into a separated postgres table service once we have more use cases
private async findAvailableRemotePostgresTables(
public async updateRemoteTableSyncStatus(
input: RemoteTableInput,
workspaceId: string,
remoteServer: RemoteServerEntity<RemoteServerType>,
) {
const remotePostgresTables =
await this.fetchTablesFromRemotePostgresSchema(remoteServer);
const remoteServer = await this.remoteServerRepository.findOne({
where: {
id: input.remoteServerId,
workspaceId,
},
});
if (!remoteServer) {
throw new NotFoundException('Remote server does not exist');
}
const dataSourcesMetatada =
await this.dataSourceService.getDataSourcesMetadataFromWorkspaceId(
workspaceId,
);
if (!dataSourcesMetatada) {
throw new NotFoundException('Workspace data source does not exist');
}
const workspaceDataSource =
await this.workspaceDataSourceService.connectToWorkspaceDataSource(
workspaceId,
);
const currentForeignTableNames = (
await workspaceDataSource.query(
`SELECT foreign_table_name FROM information_schema.foreign_tables`,
)
).map((foreignTable) => foreignTable.foreign_table_name);
switch (input.status) {
case RemoteTableStatus.SYNCED:
await this.buildForeignTableAndMetadata(
input,
remoteServer,
workspaceId,
workspaceDataSource,
dataSourcesMetatada[0],
);
break;
case RemoteTableStatus.NOT_SYNCED:
await this.removeForeignTableAndMetadata(
input,
workspaceId,
workspaceDataSource,
dataSourcesMetatada[0].schema,
);
break;
default:
throw new Error('Unsupported remote table status');
}
return remotePostgresTables.map((remoteTable) => ({
name: remoteTable.table_name,
schema: remoteTable.table_schema,
status: currentForeignTableNames.includes(remoteTable.table_name)
? RemoteTableStatus.SYNCED
: RemoteTableStatus.NOT_SYNCED,
}));
await this.workspaceCacheVersionService.incrementVersion(workspaceId);
return input;
}
private async fetchTablesFromRemotePostgresSchema(
private async buildForeignTableAndMetadata(
input: RemoteTableInput,
remoteServer: RemoteServerEntity<RemoteServerType>,
workspaceId: string,
workspaceDataSource: DataSource,
dataSourceMetadata: DataSourceEntity,
) {
const dataSource = new DataSource({
url: buildPostgresUrl(
this.environmentService.get('LOGIN_TOKEN_SECRET'),
remoteServer,
),
type: 'postgres',
logging: true,
});
const localSchema = dataSourceMetadata.schema;
await dataSource.initialize();
const schemaNames = await dataSource.query(
`SELECT schema_name FROM information_schema.schemata where schema_name not in ( ${EXCLUDED_POSTGRES_SCHEMAS.map(
(schema) => `'${schema}'`,
).join(', ')} ) order by schema_name limit 1`,
// TODO: Add strong typing for remote table columns. Will be done when we have another use case than Postgres
const remoteTableColumns = await this.fetchTableColumnsSchema(
remoteServer,
input.name,
input.schema,
);
const remotePostgresTables = await dataSource.query(
`SELECT table_name, table_schema FROM information_schema.tables WHERE table_schema IN (${schemaNames
.map((schemaName) => `'${schemaName.schema_name}'`)
.join(', ')})`,
const foreignTableColumns = remoteTableColumns
.map((column) => `"${column.column_name}" ${column.data_type}`)
.join(', ');
await workspaceDataSource.query(
`CREATE FOREIGN TABLE ${localSchema}."${input.name}Remote" (${foreignTableColumns}) SERVER "${remoteServer.foreignDataWrapperId}" OPTIONS (schema_name '${input.schema}', table_name '${input.name}')`,
);
await workspaceDataSource.query(
`COMMENT ON FOREIGN TABLE ${localSchema}."${input.name}Remote" IS e'@graphql({"primary_key_columns": ["id"], "totalCount": {"enabled": true}})'`,
);
await dataSource.destroy();
// Should be done in a transaction. To be discussed
const objectMetadata = await this.objectMetadataService.createOne({
nameSingular: `${input.name}Remote`,
namePlural: `${input.name}Remotes`,
labelSingular: `${capitalize(snakeCase(input.name)).replace(
/_/g,
' ',
)} remote`,
labelPlural: `${capitalize(snakeCase(input.name)).replace(
/_/g,
' ',
)} remotes`,
description: 'Remote table',
dataSourceId: dataSourceMetadata.id,
workspaceId: workspaceId,
icon: 'IconUser',
isRemote: true,
} as CreateObjectInput);
return remotePostgresTables;
for (const column of remoteTableColumns) {
const field = await this.fieldMetadataService.createOne({
name: column.column_name,
label: capitalize(snakeCase(column.column_name)).replace(/_/g, ' '),
description: 'Field of remote',
// TODO: function should work for other types than Postgres
type: mapUdtNameToFieldType(column.udt_name),
workspaceId: workspaceId,
objectMetadataId: objectMetadata.id,
isRemoteCreation: true,
isNullable: true,
} as CreateFieldInput);
if (column.column_name === 'id') {
await this.objectMetadataService.updateOne(objectMetadata.id, {
labelIdentifierFieldMetadataId: field.id,
});
}
}
}
private async removeForeignTableAndMetadata(
input: RemoteTableInput,
workspaceId: string,
workspaceDataSource: DataSource,
localSchema: string,
) {
const objectMetadata =
await this.objectMetadataService.findOneWithinWorkspace(workspaceId, {
where: { nameSingular: `${input.name}Remote` },
});
if (objectMetadata) {
await this.objectMetadataService.deleteOneObject(
{ id: objectMetadata.id },
workspaceId,
);
}
await workspaceDataSource.query(
`DROP FOREIGN TABLE ${localSchema}."${input.name}Remote"`,
);
}
private async fetchTableColumnsSchema(
remoteServer: RemoteServerEntity<RemoteServerType>,
tableName: string,
tableSchema: string,
) {
switch (remoteServer.foreignDataWrapperType) {
case RemoteServerType.POSTGRES_FDW:
await isPostgreSQLIntegrationEnabled(
this.featureFlagRepository,
remoteServer.workspaceId,
);
return this.remotePostgresTableService.fetchPostgresTableColumnsSchema(
remoteServer,
tableName,
tableSchema,
);
default:
throw new Error('Unsupported foreign data wrapper type');
}
}
}

View File

@ -1,25 +0,0 @@
import { decryptText } from 'src/engine/core-modules/auth/auth.util';
import {
RemoteServerEntity,
RemoteServerType,
} from 'src/engine/metadata-modules/remote-server/remote-server.entity';
export const EXCLUDED_POSTGRES_SCHEMAS = [
'information_schema',
'pg_catalog',
'pg_toast',
];
export const buildPostgresUrl = (
secretKey: string,
remoteServer: RemoteServerEntity<RemoteServerType>,
): string => {
const foreignDataWrapperOptions = remoteServer.foreignDataWrapperOptions;
const userMappingOptions = remoteServer.userMappingOptions;
const password = decryptText(userMappingOptions.password, secretKey);
const url = `postgres://${userMappingOptions.username}:${password}@${foreignDataWrapperOptions.host}:${foreignDataWrapperOptions.port}/${foreignDataWrapperOptions.dbname}`;
return url;
};