diff --git a/packages/twenty-server/src/engine/core-modules/twenty-config/config-variables.ts b/packages/twenty-server/src/engine/core-modules/twenty-config/config-variables.ts index 8ae97de52..0cf26149d 100644 --- a/packages/twenty-server/src/engine/core-modules/twenty-config/config-variables.ts +++ b/packages/twenty-server/src/engine/core-modules/twenty-config/config-variables.ts @@ -755,6 +755,44 @@ export class ConfigVariables { @IsOptional() PG_SSL_ALLOW_SELF_SIGNED = false; + @ConfigVariablesMetadata({ + group: ConfigVariablesGroup.ServerConfig, + description: 'Enable pg connection pool sharing across tenants', + isEnvOnly: true, + type: ConfigVariableType.BOOLEAN, + }) + @IsOptional() + PG_ENABLE_POOL_SHARING = true; + + @ConfigVariablesMetadata({ + group: ConfigVariablesGroup.ServerConfig, + description: 'Maximum number of clients in pg connection pool', + isEnvOnly: true, + type: ConfigVariableType.NUMBER, + }) + @CastToPositiveNumber() + @IsOptional() + PG_POOL_MAX_CONNECTIONS = 10; + + @ConfigVariablesMetadata({ + group: ConfigVariablesGroup.ServerConfig, + description: 'Idle timeout in milliseconds for pg connection pool clients', + isEnvOnly: true, + type: ConfigVariableType.NUMBER, + }) + @CastToPositiveNumber() + @IsOptional() + PG_POOL_IDLE_TIMEOUT_MS = 600000; + + @ConfigVariablesMetadata({ + group: ConfigVariablesGroup.ServerConfig, + description: 'Allow idle pg connection pool clients to exit', + isEnvOnly: true, + type: ConfigVariableType.BOOLEAN, + }) + @IsOptional() + PG_POOL_ALLOW_EXIT_ON_IDLE = true; + @ConfigVariablesMetadata({ group: ConfigVariablesGroup.ServerConfig, description: 'Enable configuration variables to be stored in the database', diff --git a/packages/twenty-server/src/engine/core-modules/twenty-config/drivers/database-config.driver.ts b/packages/twenty-server/src/engine/core-modules/twenty-config/drivers/database-config.driver.ts index 4d58c344f..86a945cf1 100644 --- a/packages/twenty-server/src/engine/core-modules/twenty-config/drivers/database-config.driver.ts +++ b/packages/twenty-server/src/engine/core-modules/twenty-config/drivers/database-config.driver.ts @@ -40,7 +40,7 @@ export class DatabaseConfigDriver const loadedCount = await this.loadAllConfigVarsFromDb(); this.logger.log( - `[INIT] Config variables loaded: ${loadedCount} values found, ${this.allPossibleConfigKeys.length - loadedCount} missing`, + `[INIT] Config variables loaded: ${loadedCount} values found in DB, ${this.allPossibleConfigKeys.length - loadedCount} falling to env vars/defaults`, ); } catch (error) { this.logger.error( diff --git a/packages/twenty-server/src/engine/twenty-orm/factories/workspace-datasource.factory.ts b/packages/twenty-server/src/engine/twenty-orm/factories/workspace-datasource.factory.ts index 32145ab54..9a07903a4 100644 --- a/packages/twenty-server/src/engine/twenty-orm/factories/workspace-datasource.factory.ts +++ b/packages/twenty-server/src/engine/twenty-orm/factories/workspace-datasource.factory.ts @@ -58,6 +58,30 @@ export class WorkspaceDatasourceFactory { private readonly workspaceFeatureFlagsMapCacheService: WorkspaceFeatureFlagsMapCacheService, ) {} + private async safelyDestroyDataSource( + dataSource: WorkspaceDataSource, + ): Promise { + try { + await dataSource.destroy(); + } catch (error) { + // Ignore known race-condition errors to prevent noise during shutdown + if ( + error.message === 'Called end on pool more than once' || + error.message?.includes( + 'pool is draining and cannot accommodate new clients', + ) + ) { + this.logger.debug( + `Ignoring pool error during cleanup: ${error.message}`, + ); + + return; + } + + throw error; + } + } + public async create( workspaceId: string, workspaceMetadataVersion: number | null, @@ -182,9 +206,8 @@ export class WorkspaceDatasourceFactory { extra: { query_timeout: 10000, // https://node-postgres.com/apis/pool - // TypeORM doesn't allow sharing connection pools bet - // So for now we keep a small pool open for longer - // for each workspace. + // TypeORM doesn't allow sharing connection pools between data sources + // So we keep a small pool open for longer if connection pooling patch isn't enabled idleTimeoutMillis: ONE_HOUR_IN_MS, max: 4, allowExitOnIdle: true, @@ -201,15 +224,7 @@ export class WorkspaceDatasourceFactory { return workspaceDataSource; }, async (dataSource) => { - try { - await dataSource.destroy(); - } catch (error) { - // Ignore error if pool has already been destroyed which is a common race condition case - if (error.message === 'Called end on pool more than once') { - return; - } - throw error; - } + await this.safelyDestroyDataSource(dataSource); }, ); @@ -359,8 +374,18 @@ export class WorkspaceDatasourceFactory { } public async destroy(workspaceId: string) { - await this.promiseMemoizer.clearKeys(`${workspaceId}-`, (dataSource) => { - dataSource.destroy(); - }); + try { + await this.promiseMemoizer.clearKeys( + `${workspaceId}-`, + async (dataSource) => { + await this.safelyDestroyDataSource(dataSource); + }, + ); + } catch (error) { + // Log and swallow any errors during cleanup to prevent crashes + this.logger.warn( + `Error cleaning up datasources for workspace ${workspaceId}: ${error.message}`, + ); + } } } diff --git a/packages/twenty-server/src/engine/twenty-orm/pg-shared-pool/__tests__/pg-shared-pool.service.spec.ts b/packages/twenty-server/src/engine/twenty-orm/pg-shared-pool/__tests__/pg-shared-pool.service.spec.ts new file mode 100644 index 000000000..1864111f7 --- /dev/null +++ b/packages/twenty-server/src/engine/twenty-orm/pg-shared-pool/__tests__/pg-shared-pool.service.spec.ts @@ -0,0 +1,317 @@ +import { Logger, LogLevel } from '@nestjs/common'; +import { Test, TestingModule } from '@nestjs/testing'; + +import { Pool } from 'pg'; + +import { TwentyConfigService } from 'src/engine/core-modules/twenty-config/twenty-config.service'; +import { PgPoolSharedService } from 'src/engine/twenty-orm/pg-shared-pool/pg-shared-pool.service'; + +type ConfigKey = + | 'PG_ENABLE_POOL_SHARING' + | 'PG_POOL_MAX_CONNECTIONS' + | 'LOG_LEVELS'; + +type ConfigValue = boolean | number | LogLevel[] | string; + +interface PoolWithEndTracker extends Pool { + __hasEnded?: boolean; +} + +jest.mock('pg', () => { + const mockPool = jest.fn().mockImplementation(() => ({ + on: jest.fn(), + end: jest.fn().mockImplementation((callback) => { + if (callback) callback(); + + return Promise.resolve(); + }), + _clients: [], + _idle: [], + _pendingQueue: { length: 0 }, + })); + + mockPool.prototype = { + on: jest.fn(), + end: jest.fn(), + }; + + return { + Pool: mockPool, + }; +}); + +describe('PgPoolSharedService', () => { + let service: PgPoolSharedService; + let configService: TwentyConfigService; + let mockLogger: Partial; + + const configValues: Record = { + PG_ENABLE_POOL_SHARING: true, + PG_POOL_MAX_CONNECTIONS: 10, + LOG_LEVELS: ['error', 'warn'], + }; + + beforeEach(async () => { + mockLogger = { + log: jest.fn(), + debug: jest.fn(), + warn: jest.fn(), + error: jest.fn(), + }; + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + PgPoolSharedService, + { + provide: TwentyConfigService, + useValue: { + get: jest + .fn() + .mockImplementation( + (key: string) => configValues[key as ConfigKey], + ), + }, + }, + ], + }).compile(); + + service = module.get(PgPoolSharedService); + configService = module.get(TwentyConfigService); + + Object.defineProperty(service, 'logger', { + value: mockLogger, + writable: true, + }); + }); + + afterEach(() => { + if (service && typeof service.unpatchForTesting === 'function') { + service.unpatchForTesting(); + } + jest.clearAllMocks(); + }); + + it('should be defined', () => { + expect(service).toBeDefined(); + }); + + describe('initialize', () => { + it('should initialize and patch pg Pool when enabled', () => { + service.initialize(); + + expect(mockLogger.log).toHaveBeenCalledWith( + expect.stringContaining( + 'Pool sharing will use max 10 connections per pool', + ), + ); + expect(mockLogger.log).toHaveBeenCalledWith( + expect.stringContaining('Pg pool sharing initialized'), + ); + }); + + it('should not initialize when pool sharing is disabled', () => { + jest.spyOn(configService, 'get').mockImplementation((key: string) => { + if (key === 'PG_ENABLE_POOL_SHARING') return false; + + return configValues[key as ConfigKey]; + }); + + service.initialize(); + + expect(mockLogger.log).toHaveBeenCalledWith( + 'Pg pool sharing is disabled by configuration', + ); + expect(mockLogger.log).not.toHaveBeenCalledWith( + expect.stringContaining('Pg pool sharing initialized'), + ); + }); + + it('should not initialize twice', () => { + service.initialize(); + jest.clearAllMocks(); + + service.initialize(); + + expect(mockLogger.debug).toHaveBeenCalledWith( + 'Pg pool sharing already initialized, skipping', + ); + }); + }); + + describe('pool sharing functionality', () => { + beforeEach(() => { + service.initialize(); + }); + + afterEach(() => { + jest.clearAllMocks(); + }); + + it('should reuse pools with identical connection parameters', () => { + const pool1 = new Pool({ + host: 'localhost', + port: 5432, + database: 'testdb', + user: 'testuser', + }); + + const pool2 = new Pool({ + host: 'localhost', + port: 5432, + database: 'testdb', + user: 'testuser', + }); + + expect(pool1).toBe(pool2); + + const poolsMap = service.getPoolsMapForTesting(); + + expect(poolsMap?.size).toBe(1); + }); + + it('should create separate pools for different connection parameters', () => { + const pool1 = new Pool({ + host: 'localhost', + port: 5432, + database: 'db1', + user: 'user1', + }); + + const pool2 = new Pool({ + host: 'localhost', + port: 5432, + database: 'db2', + user: 'user1', + }); + + expect(pool1).not.toBe(pool2); + + const poolsMap = service.getPoolsMapForTesting(); + + expect(poolsMap?.size).toBe(2); + }); + + it('should remove pools from cache when they are ended', async () => { + const pool = new Pool({ + host: 'localhost', + database: 'testdb', + }); + + const poolsMapBefore = service.getPoolsMapForTesting(); + + expect(poolsMapBefore?.size).toBe(1); + + await pool.end(); + + const poolsMapAfter = service.getPoolsMapForTesting(); + + expect(poolsMapAfter?.size).toBe(0); + + expect(mockLogger.log).toHaveBeenCalledWith( + expect.stringContaining('pg Pool for key'), + ); + expect(mockLogger.log).toHaveBeenCalledWith( + expect.stringContaining('has been closed'), + ); + }); + + it('should handle calling end() multiple times on the same pool', async () => { + const pool = new Pool({ + host: 'localhost', + database: 'testdb', + }); + + await pool.end(); + + expect(service.getPoolsMapForTesting()?.size).toBe(0); + + expect((pool as PoolWithEndTracker).__hasEnded).toBe(true); + + await pool.end(); + + expect(mockLogger.debug).toHaveBeenCalledWith( + expect.stringContaining('Ignoring duplicate end() call'), + ); + + const closeMessageCalls = (mockLogger.log as jest.Mock).mock.calls.filter( + (call: any[]) => call[0].includes('has been closed'), + ); + + expect(closeMessageCalls.length).toBe(1); + expect(mockLogger.log).toHaveBeenCalledWith( + expect.stringContaining('has been closed'), + ); + }); + }); + + describe('debug logging', () => { + it('should enable debug logging when debug log level is set', async () => { + jest + .spyOn(configService, 'get') + .mockImplementation((key: string): ConfigValue => { + if (key === 'LOG_LEVELS') return ['debug', 'error', 'warn']; + + return configValues[key as ConfigKey]; + }); + + const module = await Test.createTestingModule({ + providers: [ + PgPoolSharedService, + { + provide: TwentyConfigService, + useValue: configService, + }, + ], + }).compile(); + + const debugService = module.get(PgPoolSharedService); + + Object.defineProperty(debugService, 'logger', { + value: mockLogger, + writable: true, + }); + + const spyInterval = jest.spyOn(global, 'setInterval'); + + debugService.initialize(); + + expect(spyInterval).toHaveBeenCalled(); + + expect(mockLogger.debug).toHaveBeenCalledWith( + expect.stringContaining('Pool statistics logging enabled'), + ); + }); + }); + + describe('logPoolStats', () => { + it('should log pool statistics correctly', () => { + service.initialize(); + + new Pool({ host: 'localhost', database: 'testdb' }); + + jest.clearAllMocks(); + + service.logPoolStats(); + + expect(mockLogger.debug).toHaveBeenCalledWith( + '=== PostgreSQL Connection Pool Stats ===', + ); + expect(mockLogger.debug).toHaveBeenCalledWith( + expect.stringContaining('Total pools: 1'), + ); + }); + }); + + describe('onApplicationShutdown', () => { + it('should clear interval on shutdown', () => { + const clearIntervalSpy = jest.spyOn(global, 'clearInterval'); + + service['logStatsInterval'] = setInterval(() => {}, 1000); + + service.onApplicationShutdown(); + + expect(clearIntervalSpy).toHaveBeenCalled(); + expect(service['logStatsInterval']).toBeNull(); + }); + }); +}); diff --git a/packages/twenty-server/src/engine/twenty-orm/pg-shared-pool/pg-shared-pool.module.ts b/packages/twenty-server/src/engine/twenty-orm/pg-shared-pool/pg-shared-pool.module.ts new file mode 100644 index 000000000..9dafab619 --- /dev/null +++ b/packages/twenty-server/src/engine/twenty-orm/pg-shared-pool/pg-shared-pool.module.ts @@ -0,0 +1,36 @@ +import { + Global, + Module, + OnApplicationShutdown, + OnModuleInit, +} from '@nestjs/common'; + +import { TwentyConfigModule } from 'src/engine/core-modules/twenty-config/twenty-config.module'; +import { PgPoolSharedService } from 'src/engine/twenty-orm/pg-shared-pool/pg-shared-pool.service'; + +/** + * Module that initializes the shared pg pool at application bootstrap + */ +@Global() +@Module({ + imports: [TwentyConfigModule], + providers: [PgPoolSharedService], + exports: [PgPoolSharedService], +}) +export class PgPoolSharedModule implements OnModuleInit, OnApplicationShutdown { + constructor(private readonly pgPoolSharedService: PgPoolSharedService) {} + + /** + * Initialize the pool sharing service when the module is initialized + */ + async onModuleInit() { + await this.pgPoolSharedService.initialize(); + } + + /** + * Clean up any resources when the application shuts down + */ + async onApplicationShutdown() { + await this.pgPoolSharedService.onApplicationShutdown(); + } +} diff --git a/packages/twenty-server/src/engine/twenty-orm/pg-shared-pool/pg-shared-pool.service.ts b/packages/twenty-server/src/engine/twenty-orm/pg-shared-pool/pg-shared-pool.service.ts new file mode 100644 index 000000000..1c303d88c --- /dev/null +++ b/packages/twenty-server/src/engine/twenty-orm/pg-shared-pool/pg-shared-pool.service.ts @@ -0,0 +1,565 @@ +import { Injectable, Logger } from '@nestjs/common'; + +import pg, { Pool, PoolConfig } from 'pg'; +import { isDefined } from 'twenty-shared/utils'; + +import { TwentyConfigService } from 'src/engine/core-modules/twenty-config/twenty-config.service'; + +interface PgWithPatchSymbol { + Pool: typeof Pool; + [key: symbol]: boolean; +} + +interface SSLConfig { + rejectUnauthorized?: boolean; + [key: string]: unknown; +} + +interface PoolWithEndTracker extends Pool { + __hasEnded?: boolean; +} + +interface ExtendedPoolConfig extends PoolConfig { + extra?: { + allowExitOnIdle?: boolean; + idleTimeoutMillis?: number; + [key: string]: unknown; + }; +} + +interface PoolInternalStats { + _clients?: Array; + _idle?: Array; + _pendingQueue?: { + length: number; + }; +} + +/** + * Service that manages shared pg connection pools across tenants. + * It patches the pg.Pool constructor to return cached instances for + * identical connection parameters. + */ +@Injectable() +export class PgPoolSharedService { + private readonly logger = new Logger('PgPoolSharedService'); + private initialized = false; + private readonly PATCH_SYMBOL = Symbol.for('@@twenty/pg-shared-pool'); + private isDebugEnabled = false; + private logStatsInterval: NodeJS.Timeout | null = null; + + // Internal pool cache - exposed for testing only + private poolsMap = new Map(); + + private capturedOriginalPoolValue: typeof Pool | null = null; + private originalPgPoolDescriptor: PropertyDescriptor | undefined = undefined; + + constructor(private readonly configService: TwentyConfigService) { + this.detectDebugMode(); + } + + private detectDebugMode(): void { + const logLevels = this.configService.get('LOG_LEVELS'); + + this.isDebugEnabled = + Array.isArray(logLevels) && + logLevels.some((level) => level === 'debug' || level === 'verbose'); + } + + /** + * Provides access to the internal pools map for testing purposes + */ + getPoolsMapForTesting(): Map | null { + if (!this.initialized) { + return null; + } + + return this.poolsMap; + } + + /** + * Applies the pg.Pool patch to enable connection pool sharing. + * Safe to call multiple times - will only apply the patch once. + */ + async initialize(): Promise { + if (this.initialized) { + this.logger.debug('Pg pool sharing already initialized, skipping'); + + return; + } + + if (!this.isPoolSharingEnabled()) { + this.logger.log('Pg pool sharing is disabled by configuration'); + + return; + } + + this.logPoolConfiguration(); + this.patchPgPool(); + + if (!this.isPatchSuccessful()) { + this.logger.error( + 'Failed to patch pg.Pool. PgPoolSharedService will not be active.', + ); + this.initialized = false; + + return; + } + + this.initialized = true; + this.logger.log( + 'Pg pool sharing initialized - pools will be shared across tenants', + ); + + if (this.isDebugEnabled) { + this.startPoolStatsLogging(); + } + } + + private isPoolSharingEnabled(): boolean { + return !!this.configService.get('PG_ENABLE_POOL_SHARING'); + } + + private logPoolConfiguration(): void { + const maxConnections = this.configService.get('PG_POOL_MAX_CONNECTIONS'); + const idleTimeoutMs = this.configService.get('PG_POOL_IDLE_TIMEOUT_MS'); + const allowExitOnIdle = this.configService.get( + 'PG_POOL_ALLOW_EXIT_ON_IDLE', + ); + + this.logger.log( + `Pool sharing will use max ${maxConnections} connections per pool with ${idleTimeoutMs}ms idle timeout and allowExitOnIdle=${allowExitOnIdle}`, + ); + } + + private isPatchSuccessful(): boolean { + const pgWithSymbol = pg as PgWithPatchSymbol; + + return !!pgWithSymbol[this.PATCH_SYMBOL]; + } + + /** + * Stops the periodic logging of pool statistics. + * Call this during application shutdown. + */ + async onApplicationShutdown(): Promise { + this.stopStatsLogging(); + await this.closeAllPools(); + } + + private stopStatsLogging(): void { + if (!this.logStatsInterval) { + return; + } + + clearInterval(this.logStatsInterval); + this.logStatsInterval = null; + } + + private async closeAllPools(): Promise { + if (this.poolsMap.size === 0) { + return; + } + + const closePromises: Promise[] = []; + + for (const [key, pool] of this.poolsMap.entries()) { + closePromises.push( + pool + .end() + .catch((err) => { + if (err?.message !== 'Called end on pool more than once') { + this.logger.debug( + `Pool[${key}] error during shutdown: ${err.message}`, + ); + } + }) + .then(() => { + this.logger.debug( + `Pool[${key}] closed during application shutdown`, + ); + }), + ); + } + + this.logger.debug('Attempting to close all pg pools...'); + await Promise.allSettled(closePromises); + this.logger.debug('All pg pools closure attempts completed.'); + } + + /** + * Logs detailed statistics about all connection pools + */ + logPoolStats(): void { + if (!this.initialized || this.poolsMap.size === 0) { + this.logger.debug('No active pg pools to log stats for'); + + return; + } + + let totalActive = 0; + let totalIdle = 0; + let totalPoolSize = 0; + let totalQueueSize = 0; + + this.logger.debug('=== PostgreSQL Connection Pool Stats ==='); + + for (const [key, pool] of this.poolsMap.entries()) { + const stats = this.collectPoolStats(key, pool); + + totalActive += stats.active; + totalIdle += stats.idle; + totalPoolSize += stats.poolSize; + totalQueueSize += stats.queueSize; + } + + this.logTotalStats(totalActive, totalIdle, totalPoolSize, totalQueueSize); + } + + private collectPoolStats(key: string, pool: Pool) { + const poolStats = pool as PoolInternalStats; + + const active = + (poolStats._clients?.length || 0) - (poolStats._idle?.length || 0); + const idle = poolStats._idle?.length || 0; + const poolSize = poolStats._clients?.length || 0; + const queueSize = poolStats._pendingQueue?.length || 0; + + this.logger.debug( + `Pool [${key}]: active=${active}, idle=${idle}, size=${poolSize}, queue=${queueSize}`, + ); + + return { active, idle, poolSize, queueSize }; + } + + private logTotalStats( + totalActive: number, + totalIdle: number, + totalPoolSize: number, + totalQueueSize: number, + ): void { + this.logger.debug( + `Total pools: ${this.poolsMap.size}, active=${totalActive}, idle=${totalIdle}, ` + + `total connections=${totalPoolSize}, queued requests=${totalQueueSize}`, + ); + this.logger.debug('========================================='); + } + + /** + * Starts periodically logging pool statistics if debug is enabled + */ + private startPoolStatsLogging(): void { + this.logPoolStats(); + + this.logStatsInterval = setInterval(() => { + this.logPoolStats(); + }, 30000); + + this.logger.debug('Pool statistics logging enabled (30s interval)'); + } + + /** + * Patches the pg module's Pool constructor to provide shared instances + * across all tenant workspaces. + */ + private patchPgPool(): void { + const pgWithSymbol = pg as PgWithPatchSymbol; + + if (this.isAlreadyPatched(pgWithSymbol)) { + return; + } + + if (!this.captureOriginalPool(pgWithSymbol)) { + return; + } + + this.applyPatchWithSharedPool(pgWithSymbol); + } + + private isAlreadyPatched(pgWithSymbol: PgWithPatchSymbol): boolean { + if (pgWithSymbol[this.PATCH_SYMBOL]) { + this.logger.debug( + 'pg.Pool is already patched. Skipping patch operation for this instance.', + ); + + return true; + } + + return false; + } + + private captureOriginalPool(pgWithSymbol: PgWithPatchSymbol): boolean { + this.originalPgPoolDescriptor = Object.getOwnPropertyDescriptor( + pgWithSymbol, + 'Pool', + ); + + if ( + !this.originalPgPoolDescriptor || + typeof this.originalPgPoolDescriptor.value !== 'function' + ) { + this.logger.error( + 'Could not get original pg.Pool constructor or descriptor is invalid. Aborting patch.', + ); + + return false; + } + + this.capturedOriginalPoolValue = this.originalPgPoolDescriptor + .value as typeof Pool; + + return true; + } + + private applyPatchWithSharedPool(pgWithSymbol: PgWithPatchSymbol): void { + const OriginalPool = this.capturedOriginalPoolValue as typeof Pool; + const SharedPool = this.createSharedPoolConstructor(OriginalPool); + + // Preserve prototype chain for instanceof checks + SharedPool.prototype = Object.create(OriginalPool.prototype); + SharedPool.prototype.constructor = SharedPool; + + // Replace the original Pool with our patched version + Object.defineProperty(pgWithSymbol, 'Pool', { + value: SharedPool as unknown as typeof Pool, + writable: true, + configurable: true, + enumerable: this.originalPgPoolDescriptor?.enumerable, + }); + + pgWithSymbol[this.PATCH_SYMBOL] = true; + this.logger.log('pg.Pool patched successfully by this service instance.'); + } + + private createSharedPoolConstructor(OriginalPool: typeof Pool) { + const maxConnections = this.configService.get('PG_POOL_MAX_CONNECTIONS'); + const idleTimeoutMs = this.configService.get('PG_POOL_IDLE_TIMEOUT_MS'); + const allowExitOnIdle = this.configService.get( + 'PG_POOL_ALLOW_EXIT_ON_IDLE', + ); + + // Store references to service functions/properties that we need in our constructor + const buildPoolKey = this.buildPoolKey.bind(this); + const poolsMap = this.poolsMap; + const logger = this.logger; + const isDebugEnabled = this.isDebugEnabled; + const setupPoolEvents = this.setupPoolEvents.bind(this); + const replacePoolEndMethod = this.replacePoolEndMethod.bind(this); + + // Define a proper constructor function that can be used with "new" + function SharedPool(this: Pool, config?: PoolConfig): Pool { + // When called as a function (without new), make sure to return a new instance + if (!(this instanceof SharedPool)) { + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-expect-error We know this works at runtime + return new SharedPool(config); + } + + const poolConfig = config + ? ({ ...config } as ExtendedPoolConfig) + : ({} as ExtendedPoolConfig); + + if (maxConnections) { + poolConfig.max = maxConnections; + } + + if (idleTimeoutMs) { + poolConfig.idleTimeoutMillis = idleTimeoutMs; + } + + if (!poolConfig.extra) { + poolConfig.extra = {}; + } + poolConfig.extra.allowExitOnIdle = allowExitOnIdle; + + const key = buildPoolKey(poolConfig); + const existing = poolsMap.get(key); + + if (existing) { + if (isDebugEnabled) { + logger.debug(`Reusing existing pg Pool for key "${key}"`); + } + + return existing; + } + + const pool = new OriginalPool(poolConfig); + + poolsMap.set(key, pool); + + logger.log( + `Created new shared pg Pool for key "${key}" with ${poolConfig.max ?? 'default'} max connections and ${poolConfig.idleTimeoutMillis ?? 'default'} ms idle timeout. Total pools: ${poolsMap.size}`, + ); + + if (isDebugEnabled) { + setupPoolEvents(pool, key); + } + + replacePoolEndMethod(pool, key); + + return pool; + } + + return SharedPool; + } + + private setupPoolEvents(pool: Pool, key: string): void { + pool.on('connect', () => { + this.logger.debug(`Pool[${key}]: New connection established`); + }); + + pool.on('acquire', () => { + this.logger.debug(`Pool[${key}]: Client acquired from pool`); + }); + + pool.on('remove', () => { + this.logger.debug(`Pool[${key}]: Connection removed from pool`); + }); + + pool.on('error', (err) => { + this.logger.warn(`Pool[${key}]: Connection error: ${err.message}`); + }); + } + + private replacePoolEndMethod(pool: Pool, key: string): void { + const originalEnd = pool.end.bind(pool) as { + (callback?: (err?: Error) => void): void; + }; + + (pool as PoolWithEndTracker).end = ( + callback?: (err?: Error) => void, + ): Promise => { + if ((pool as PoolWithEndTracker).__hasEnded) { + if (callback) { + callback(); + } + + this.logger.debug(`Ignoring duplicate end() call for pool "${key}"`); + + return Promise.resolve(); + } + + // Mark this pool as ended to prevent subsequent calls + (pool as PoolWithEndTracker).__hasEnded = true; + this.poolsMap.delete(key); + + this.logger.log( + `pg Pool for key "${key}" has been closed. Remaining pools: ${this.poolsMap.size}`, + ); + + return new Promise((resolve, reject) => { + originalEnd((err) => { + if (err) { + // If error is about duplicate end, suppress it + if (err.message === 'Called end on pool more than once') { + if (callback) callback(); + resolve(); + + return; + } + + if (callback) callback(err); + reject(err); + + return; + } + + if (callback) callback(); + resolve(); + }); + }); + }; + } + + /** + * Builds a unique key for a pool configuration to identify identical connections + */ + private buildPoolKey(config: PoolConfig = {}): string { + // We identify pools only by parameters that open a *physical* connection. + // `search_path`/schema is not included because it is changed at session level. + const { + host = 'localhost', + port = 5432, + user = 'postgres', + database = '', + ssl, + } = config; + + // Note: SSL object can contain certificates, so only stringify relevant + // properties that influence connection reuse. + const sslKey = isDefined(ssl) + ? typeof ssl === 'object' + ? JSON.stringify({ + rejectUnauthorized: (ssl as SSLConfig).rejectUnauthorized, + }) + : String(ssl) + : 'no-ssl'; + + return [host, port, user, database, sslKey].join('|'); + } + + /** + * Resets the pg.Pool patch and clears service state. For testing purposes only. + */ + public unpatchForTesting(): void { + this.logger.debug('Attempting to unpatch pg.Pool for testing...'); + const pgWithSymbol = pg as PgWithPatchSymbol; + + if (!pgWithSymbol[this.PATCH_SYMBOL]) { + this.logger.debug( + 'pg.Pool was not patched by this instance or PATCH_SYMBOL not found, no unpatch needed from this instance.', + ); + this.resetStateForTesting(); + + return; + } + + this.restoreOriginalPool(pgWithSymbol); + delete pgWithSymbol[this.PATCH_SYMBOL]; + this.logger.debug('PATCH_SYMBOL removed from pg module.'); + this.resetStateForTesting(); + } + + private restoreOriginalPool(pgWithSymbol: PgWithPatchSymbol): void { + if (this.originalPgPoolDescriptor) { + Object.defineProperty( + pgWithSymbol, + 'Pool', + this.originalPgPoolDescriptor, + ); + this.logger.debug('pg.Pool restored using original property descriptor.'); + + return; + } + + if (this.capturedOriginalPoolValue) { + // Fallback if descriptor wasn't captured + pgWithSymbol.Pool = this.capturedOriginalPoolValue; + this.logger.warn( + 'pg.Pool restored using captured value (descriptor method preferred).', + ); + + return; + } + + this.logger.error( + 'Cannot unpatch pg.Pool: no original Pool reference or descriptor was captured by this instance.', + ); + } + + private resetStateForTesting(): void { + this.initialized = false; + this.poolsMap.clear(); + this.capturedOriginalPoolValue = null; + this.originalPgPoolDescriptor = undefined; + + if (this.logStatsInterval) { + clearInterval(this.logStatsInterval); + this.logStatsInterval = null; + } + + this.logger.debug( + 'Service instance state (initialized, poolsMap, captured originals, timers) reset for testing.', + ); + } +} diff --git a/packages/twenty-server/src/engine/twenty-orm/twenty-orm.module.ts b/packages/twenty-server/src/engine/twenty-orm/twenty-orm.module.ts index d0a08e3c4..8a7124a02 100644 --- a/packages/twenty-server/src/engine/twenty-orm/twenty-orm.module.ts +++ b/packages/twenty-server/src/engine/twenty-orm/twenty-orm.module.ts @@ -2,6 +2,7 @@ import { Global, Module } from '@nestjs/common'; import { TypeOrmModule } from '@nestjs/typeorm'; import { FeatureFlagModule } from 'src/engine/core-modules/feature-flag/feature-flag.module'; +import { TwentyConfigModule } from 'src/engine/core-modules/twenty-config/twenty-config.module'; import { DataSourceModule } from 'src/engine/metadata-modules/data-source/data-source.module'; import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity'; import { PermissionsModule } from 'src/engine/metadata-modules/permissions/permissions.module'; @@ -15,6 +16,8 @@ import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global. import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { WorkspaceCacheStorageModule } from 'src/engine/workspace-cache-storage/workspace-cache-storage.module'; +import { PgPoolSharedModule } from './pg-shared-pool/pg-shared-pool.module'; + @Global() @Module({ imports: [ @@ -29,12 +32,19 @@ import { WorkspaceCacheStorageModule } from 'src/engine/workspace-cache-storage/ WorkspaceFeatureFlagsMapCacheModule, WorkspacePermissionsCacheModule, FeatureFlagModule, + TwentyConfigModule, + PgPoolSharedModule, ], providers: [ ...entitySchemaFactories, TwentyORMManager, TwentyORMGlobalManager, ], - exports: [EntitySchemaFactory, TwentyORMManager, TwentyORMGlobalManager], + exports: [ + EntitySchemaFactory, + TwentyORMManager, + TwentyORMGlobalManager, + PgPoolSharedModule, + ], }) export class TwentyORMModule {}