diff --git a/packages/twenty-server/src/engine/twenty-orm/factories/workspace-datasource.factory.ts b/packages/twenty-server/src/engine/twenty-orm/factories/workspace-datasource.factory.ts index 54c5e8ef6..771c63559 100644 --- a/packages/twenty-server/src/engine/twenty-orm/factories/workspace-datasource.factory.ts +++ b/packages/twenty-server/src/engine/twenty-orm/factories/workspace-datasource.factory.ts @@ -18,6 +18,7 @@ import { WorkspaceCacheStorageService } from 'src/engine/workspace-cache-storage export class WorkspaceDatasourceFactory { private readonly logger = new Logger(WorkspaceDatasourceFactory.name); private cacheManager = new CacheManager(); + private cachedDatasourcePromise: Record>; constructor( private readonly dataSourceService: DataSourceService, @@ -25,7 +26,9 @@ export class WorkspaceDatasourceFactory { private readonly workspaceCacheStorageService: WorkspaceCacheStorageService, private readonly workspaceMetadataCacheService: WorkspaceMetadataCacheService, private readonly entitySchemaFactory: EntitySchemaFactory, - ) {} + ) { + this.cachedDatasourcePromise = {}; + } public async create( workspaceId: string, @@ -37,121 +40,139 @@ export class WorkspaceDatasourceFactory { workspaceMetadataVersion, ); - const workspaceDataSource = await this.cacheManager.execute( - `${workspaceId}-${desiredWorkspaceMetadataVersion}`, - async () => { - this.logger.log( - `Creating workspace data source for workspace ${workspaceId} and metadata version ${desiredWorkspaceMetadataVersion}`, - ); - const cachedObjectMetadataMap = - await this.workspaceCacheStorageService.getObjectMetadataMap( - workspaceId, - desiredWorkspaceMetadataVersion, - ); + const cacheKey = `${workspaceId}-${desiredWorkspaceMetadataVersion}`; - if (!cachedObjectMetadataMap) { - await this.workspaceMetadataCacheService.recomputeMetadataCache( - workspaceId, - true, - ); - - throw new TwentyORMException( - `Object metadata map not found for workspace ${workspaceId}`, - TwentyORMExceptionCode.METADATA_COLLECTION_NOT_FOUND, - ); - } - - const dataSourceMetadata = - await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceId( - workspaceId, - ); - - if (!dataSourceMetadata) { - throw new TwentyORMException( - `Workspace Schema not found for workspace ${workspaceId}`, - TwentyORMExceptionCode.WORKSPACE_SCHEMA_NOT_FOUND, - ); - } - - const cachedEntitySchemaOptions = - await this.workspaceCacheStorageService.getORMEntitySchema( - workspaceId, - desiredWorkspaceMetadataVersion, - ); - - let cachedEntitySchemas: EntitySchema[]; - - if (cachedEntitySchemaOptions) { - cachedEntitySchemas = cachedEntitySchemaOptions.map( - (option) => new EntitySchema(option), - ); - } else { - const entitySchemas = await Promise.all( - Object.values(cachedObjectMetadataMap).map((objectMetadata) => - this.entitySchemaFactory.create( - workspaceId, - desiredWorkspaceMetadataVersion, - objectMetadata, - cachedObjectMetadataMap, - ), - ), - ); - - await this.workspaceCacheStorageService.setORMEntitySchema( - workspaceId, - desiredWorkspaceMetadataVersion, - entitySchemas.map((entitySchema) => entitySchema.options), - ); - - cachedEntitySchemas = entitySchemas; - } - - const workspaceDataSource = new WorkspaceDataSource( - { - workspaceId, - objectMetadataMap: cachedObjectMetadataMap, - }, - { - url: - dataSourceMetadata.url ?? - this.environmentService.get('PG_DATABASE_URL'), - type: 'postgres', - logging: this.environmentService.get('DEBUG_MODE') - ? ['query', 'error'] - : ['error'], - schema: dataSourceMetadata.schema, - entities: cachedEntitySchemas, - ssl: this.environmentService.get('PG_SSL_ALLOW_SELF_SIGNED') - ? { - rejectUnauthorized: false, - } - : undefined, - }, - ); - - await workspaceDataSource.initialize(); - - return workspaceDataSource; - }, - async (dataSource) => { - try { - await dataSource.destroy(); - } catch (error) { - // Ignore error if pool has already been destroyed which is a common race condition case - if (error.message === 'Called end on pool more than once') { - return; - } - - throw error; - } - }, - ); - - if (!workspaceDataSource) { - throw new Error('Workspace data source not found'); + if (cacheKey in this.cachedDatasourcePromise) { + return this.cachedDatasourcePromise[cacheKey]; } - return workspaceDataSource; + const creationPromise = (async (): Promise => { + try { + const result = await this.cacheManager.execute( + cacheKey as '`${string}-${string}`', + async () => { + this.logger.log( + `Creating workspace data source for workspace ${workspaceId} and metadata version ${desiredWorkspaceMetadataVersion}`, + ); + const cachedObjectMetadataMap = + await this.workspaceCacheStorageService.getObjectMetadataMap( + workspaceId, + desiredWorkspaceMetadataVersion, + ); + + if (!cachedObjectMetadataMap) { + await this.workspaceMetadataCacheService.recomputeMetadataCache( + workspaceId, + true, + ); + + throw new TwentyORMException( + `Object metadata map not found for workspace ${workspaceId}`, + TwentyORMExceptionCode.METADATA_COLLECTION_NOT_FOUND, + ); + } + + const dataSourceMetadata = + await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceId( + workspaceId, + ); + + if (!dataSourceMetadata) { + throw new TwentyORMException( + `Workspace Schema not found for workspace ${workspaceId}`, + TwentyORMExceptionCode.WORKSPACE_SCHEMA_NOT_FOUND, + ); + } + + const cachedEntitySchemaOptions = + await this.workspaceCacheStorageService.getORMEntitySchema( + workspaceId, + desiredWorkspaceMetadataVersion, + ); + + let cachedEntitySchemas: EntitySchema[]; + + if (cachedEntitySchemaOptions) { + cachedEntitySchemas = cachedEntitySchemaOptions.map( + (option) => new EntitySchema(option), + ); + } else { + const entitySchemas = await Promise.all( + Object.values(cachedObjectMetadataMap).map((objectMetadata) => + this.entitySchemaFactory.create( + workspaceId, + desiredWorkspaceMetadataVersion, + objectMetadata, + cachedObjectMetadataMap, + ), + ), + ); + + await this.workspaceCacheStorageService.setORMEntitySchema( + workspaceId, + desiredWorkspaceMetadataVersion, + entitySchemas.map((entitySchema) => entitySchema.options), + ); + + cachedEntitySchemas = entitySchemas; + } + + const workspaceDataSource = new WorkspaceDataSource( + { + workspaceId, + objectMetadataMap: cachedObjectMetadataMap, + }, + { + url: + dataSourceMetadata.url ?? + this.environmentService.get('PG_DATABASE_URL'), + type: 'postgres', + logging: this.environmentService.get('DEBUG_MODE') + ? ['query', 'error'] + : ['error'], + schema: dataSourceMetadata.schema, + entities: cachedEntitySchemas, + ssl: this.environmentService.get('PG_SSL_ALLOW_SELF_SIGNED') + ? { + rejectUnauthorized: false, + } + : undefined, + }, + ); + + await workspaceDataSource.initialize(); + + return workspaceDataSource; + }, + async (dataSource) => { + try { + await dataSource.destroy(); + } catch (error) { + // Ignore error if pool has already been destroyed which is a common race condition case + if (error.message === 'Called end on pool more than once') { + return; + } + + throw error; + } + }, + ); + + if (result === null) { + throw new Error( + `Failed to create WorkspaceDataSource for ${cacheKey}`, + ); + } + + return result; + } finally { + delete this.cachedDatasourcePromise[cacheKey]; + } + })(); + + this.cachedDatasourcePromise[cacheKey] = creationPromise; + + return creationPromise; } private async computeDesiredWorkspaceMetadataVersion(