From 999974893c3ec9f0aa5e1f82670640121afc602d Mon Sep 17 00:00:00 2001 From: Weiko Date: Wed, 18 Sep 2024 14:01:55 +0200 Subject: [PATCH] Fix race condition with datasource creation (#7106) ## Context We currently have a race condition when dealing with datasource creation. This happen when multiple queries arrive at the same time (for example graphql dataloaders) and the datasource is not created yet. Since the datasource is stored in memory this can happen more often as well and they were all triggering the datasource creation at the same time. I'm trying to fix the issue with promise memoization. Now, instead of caching the datasource only, we also want to cache the promise of the datasource creation and make the creation itself synchronous. More info about promise memoization in this article for example: https://www.jonmellman.com/posts/promise-memoization Co-authored-by: Charles Bochet --- .../factories/workspace-datasource.factory.ts | 247 ++++++++++-------- 1 file changed, 134 insertions(+), 113 deletions(-) diff --git a/packages/twenty-server/src/engine/twenty-orm/factories/workspace-datasource.factory.ts b/packages/twenty-server/src/engine/twenty-orm/factories/workspace-datasource.factory.ts index 54c5e8ef6..771c63559 100644 --- a/packages/twenty-server/src/engine/twenty-orm/factories/workspace-datasource.factory.ts +++ b/packages/twenty-server/src/engine/twenty-orm/factories/workspace-datasource.factory.ts @@ -18,6 +18,7 @@ import { WorkspaceCacheStorageService } from 'src/engine/workspace-cache-storage export class WorkspaceDatasourceFactory { private readonly logger = new Logger(WorkspaceDatasourceFactory.name); private cacheManager = new CacheManager(); + private cachedDatasourcePromise: Record>; constructor( private readonly dataSourceService: DataSourceService, @@ -25,7 +26,9 @@ export class WorkspaceDatasourceFactory { private readonly workspaceCacheStorageService: WorkspaceCacheStorageService, private readonly workspaceMetadataCacheService: WorkspaceMetadataCacheService, private readonly entitySchemaFactory: EntitySchemaFactory, - ) {} + ) { + this.cachedDatasourcePromise = {}; + } public async create( workspaceId: string, @@ -37,121 +40,139 @@ export class WorkspaceDatasourceFactory { workspaceMetadataVersion, ); - const workspaceDataSource = await this.cacheManager.execute( - `${workspaceId}-${desiredWorkspaceMetadataVersion}`, - async () => { - this.logger.log( - `Creating workspace data source for workspace ${workspaceId} and metadata version ${desiredWorkspaceMetadataVersion}`, - ); - const cachedObjectMetadataMap = - await this.workspaceCacheStorageService.getObjectMetadataMap( - workspaceId, - desiredWorkspaceMetadataVersion, - ); + const cacheKey = `${workspaceId}-${desiredWorkspaceMetadataVersion}`; - if (!cachedObjectMetadataMap) { - await this.workspaceMetadataCacheService.recomputeMetadataCache( - workspaceId, - true, - ); - - throw new TwentyORMException( - `Object metadata map not found for workspace ${workspaceId}`, - TwentyORMExceptionCode.METADATA_COLLECTION_NOT_FOUND, - ); - } - - const dataSourceMetadata = - await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceId( - workspaceId, - ); - - if (!dataSourceMetadata) { - throw new TwentyORMException( - `Workspace Schema not found for workspace ${workspaceId}`, - TwentyORMExceptionCode.WORKSPACE_SCHEMA_NOT_FOUND, - ); - } - - const cachedEntitySchemaOptions = - await this.workspaceCacheStorageService.getORMEntitySchema( - workspaceId, - desiredWorkspaceMetadataVersion, - ); - - let cachedEntitySchemas: EntitySchema[]; - - if (cachedEntitySchemaOptions) { - cachedEntitySchemas = cachedEntitySchemaOptions.map( - (option) => new EntitySchema(option), - ); - } else { - const entitySchemas = await Promise.all( - Object.values(cachedObjectMetadataMap).map((objectMetadata) => - this.entitySchemaFactory.create( - workspaceId, - desiredWorkspaceMetadataVersion, - objectMetadata, - cachedObjectMetadataMap, - ), - ), - ); - - await this.workspaceCacheStorageService.setORMEntitySchema( - workspaceId, - desiredWorkspaceMetadataVersion, - entitySchemas.map((entitySchema) => entitySchema.options), - ); - - cachedEntitySchemas = entitySchemas; - } - - const workspaceDataSource = new WorkspaceDataSource( - { - workspaceId, - objectMetadataMap: cachedObjectMetadataMap, - }, - { - url: - dataSourceMetadata.url ?? - this.environmentService.get('PG_DATABASE_URL'), - type: 'postgres', - logging: this.environmentService.get('DEBUG_MODE') - ? ['query', 'error'] - : ['error'], - schema: dataSourceMetadata.schema, - entities: cachedEntitySchemas, - ssl: this.environmentService.get('PG_SSL_ALLOW_SELF_SIGNED') - ? { - rejectUnauthorized: false, - } - : undefined, - }, - ); - - await workspaceDataSource.initialize(); - - return workspaceDataSource; - }, - async (dataSource) => { - try { - await dataSource.destroy(); - } catch (error) { - // Ignore error if pool has already been destroyed which is a common race condition case - if (error.message === 'Called end on pool more than once') { - return; - } - - throw error; - } - }, - ); - - if (!workspaceDataSource) { - throw new Error('Workspace data source not found'); + if (cacheKey in this.cachedDatasourcePromise) { + return this.cachedDatasourcePromise[cacheKey]; } - return workspaceDataSource; + const creationPromise = (async (): Promise => { + try { + const result = await this.cacheManager.execute( + cacheKey as '`${string}-${string}`', + async () => { + this.logger.log( + `Creating workspace data source for workspace ${workspaceId} and metadata version ${desiredWorkspaceMetadataVersion}`, + ); + const cachedObjectMetadataMap = + await this.workspaceCacheStorageService.getObjectMetadataMap( + workspaceId, + desiredWorkspaceMetadataVersion, + ); + + if (!cachedObjectMetadataMap) { + await this.workspaceMetadataCacheService.recomputeMetadataCache( + workspaceId, + true, + ); + + throw new TwentyORMException( + `Object metadata map not found for workspace ${workspaceId}`, + TwentyORMExceptionCode.METADATA_COLLECTION_NOT_FOUND, + ); + } + + const dataSourceMetadata = + await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceId( + workspaceId, + ); + + if (!dataSourceMetadata) { + throw new TwentyORMException( + `Workspace Schema not found for workspace ${workspaceId}`, + TwentyORMExceptionCode.WORKSPACE_SCHEMA_NOT_FOUND, + ); + } + + const cachedEntitySchemaOptions = + await this.workspaceCacheStorageService.getORMEntitySchema( + workspaceId, + desiredWorkspaceMetadataVersion, + ); + + let cachedEntitySchemas: EntitySchema[]; + + if (cachedEntitySchemaOptions) { + cachedEntitySchemas = cachedEntitySchemaOptions.map( + (option) => new EntitySchema(option), + ); + } else { + const entitySchemas = await Promise.all( + Object.values(cachedObjectMetadataMap).map((objectMetadata) => + this.entitySchemaFactory.create( + workspaceId, + desiredWorkspaceMetadataVersion, + objectMetadata, + cachedObjectMetadataMap, + ), + ), + ); + + await this.workspaceCacheStorageService.setORMEntitySchema( + workspaceId, + desiredWorkspaceMetadataVersion, + entitySchemas.map((entitySchema) => entitySchema.options), + ); + + cachedEntitySchemas = entitySchemas; + } + + const workspaceDataSource = new WorkspaceDataSource( + { + workspaceId, + objectMetadataMap: cachedObjectMetadataMap, + }, + { + url: + dataSourceMetadata.url ?? + this.environmentService.get('PG_DATABASE_URL'), + type: 'postgres', + logging: this.environmentService.get('DEBUG_MODE') + ? ['query', 'error'] + : ['error'], + schema: dataSourceMetadata.schema, + entities: cachedEntitySchemas, + ssl: this.environmentService.get('PG_SSL_ALLOW_SELF_SIGNED') + ? { + rejectUnauthorized: false, + } + : undefined, + }, + ); + + await workspaceDataSource.initialize(); + + return workspaceDataSource; + }, + async (dataSource) => { + try { + await dataSource.destroy(); + } catch (error) { + // Ignore error if pool has already been destroyed which is a common race condition case + if (error.message === 'Called end on pool more than once') { + return; + } + + throw error; + } + }, + ); + + if (result === null) { + throw new Error( + `Failed to create WorkspaceDataSource for ${cacheKey}`, + ); + } + + return result; + } finally { + delete this.cachedDatasourcePromise[cacheKey]; + } + })(); + + this.cachedDatasourcePromise[cacheKey] = creationPromise; + + return creationPromise; } private async computeDesiredWorkspaceMetadataVersion(