diff --git a/packages/twenty-server/src/engine/core-modules/cache-storage/services/cache-storage.service.ts b/packages/twenty-server/src/engine/core-modules/cache-storage/services/cache-storage.service.ts index 9f3f4dfd9..302afad98 100644 --- a/packages/twenty-server/src/engine/core-modules/cache-storage/services/cache-storage.service.ts +++ b/packages/twenty-server/src/engine/core-modules/cache-storage/services/cache-storage.service.ts @@ -1,6 +1,7 @@ import { CACHE_MANAGER, Cache } from '@nestjs/cache-manager'; import { Inject, Injectable } from '@nestjs/common'; +import { Milliseconds } from 'cache-manager'; import { RedisCache } from 'cache-manager-redis-yet'; import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum'; @@ -17,7 +18,7 @@ export class CacheStorageService { return this.cache.get(`${this.namespace}:${key}`); } - async set(key: string, value: T, ttl?: number) { + async set(key: string, value: T, ttl?: Milliseconds) { return this.cache.set(`${this.namespace}:${key}`, value, ttl); } @@ -25,7 +26,7 @@ export class CacheStorageService { return this.cache.del(`${this.namespace}:${key}`); } - async setAdd(key: string, value: string[], ttl?: number) { + async setAdd(key: string, value: string[], ttl?: Milliseconds) { if (value.length === 0) { return; } diff --git a/packages/twenty-server/src/engine/twenty-orm/factories/workspace-datasource.factory.ts b/packages/twenty-server/src/engine/twenty-orm/factories/workspace-datasource.factory.ts index d4a779a84..2260994fe 100644 --- a/packages/twenty-server/src/engine/twenty-orm/factories/workspace-datasource.factory.ts +++ b/packages/twenty-server/src/engine/twenty-orm/factories/workspace-datasource.factory.ts @@ -13,18 +13,14 @@ import { TwentyORMExceptionCode, } from 'src/engine/twenty-orm/exceptions/twenty-orm.exception'; import { EntitySchemaFactory } from 'src/engine/twenty-orm/factories/entity-schema.factory'; -import { CacheManager } from 'src/engine/twenty-orm/storage/cache-manager.storage'; +import { PromiseMemoizer } from 'src/engine/twenty-orm/storage/promise-memoizer.storage'; import { CacheKey } from 'src/engine/twenty-orm/storage/types/cache-key.type'; import { WorkspaceCacheStorageService } from 'src/engine/workspace-cache-storage/workspace-cache-storage.service'; @Injectable() export class WorkspaceDatasourceFactory { private readonly logger = new Logger(WorkspaceDatasourceFactory.name); - private cacheManager = new CacheManager(); - private cachedDataSourcePromise: Record< - CacheKey, - Promise - >; + private promiseMemoizer = new PromiseMemoizer(); constructor( private readonly dataSourceService: DataSourceService, @@ -32,9 +28,7 @@ export class WorkspaceDatasourceFactory { private readonly workspaceCacheStorageService: WorkspaceCacheStorageService, private readonly workspaceMetadataCacheService: WorkspaceMetadataCacheService, private readonly entitySchemaFactory: EntitySchemaFactory, - ) { - this.cachedDataSourcePromise = {}; - } + ) {} public async create( workspaceId: string, @@ -59,142 +53,116 @@ export class WorkspaceDatasourceFactory { const cacheKey: CacheKey = `${workspaceId}-${cachedWorkspaceMetadataVersion}`; - if (cacheKey in this.cachedDataSourcePromise) { - return this.cachedDataSourcePromise[cacheKey]; - } - - const creationPromise = (async (): Promise => { - try { - const result = await this.cacheManager.execute( - cacheKey, - async () => { - const dataSourceMetadata = - await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceId( - workspaceId, - ); - - if (!dataSourceMetadata) { - throw new TwentyORMException( - `Workspace Schema not found for workspace ${workspaceId}`, - TwentyORMExceptionCode.WORKSPACE_SCHEMA_NOT_FOUND, - ); - } - - const cachedEntitySchemaOptions = - await this.workspaceCacheStorageService.getORMEntitySchema( - workspaceId, - cachedWorkspaceMetadataVersion, - ); - - let cachedEntitySchemas: EntitySchema[]; - - const cachedObjectMetadataMaps = - await this.workspaceCacheStorageService.getObjectMetadataMaps( - workspaceId, - cachedWorkspaceMetadataVersion, - ); - - if (!cachedObjectMetadataMaps) { - throw new TwentyORMException( - `Object metadata collection not found for workspace ${workspaceId}`, - TwentyORMExceptionCode.METADATA_COLLECTION_NOT_FOUND, - ); - } - - if (cachedEntitySchemaOptions) { - cachedEntitySchemas = cachedEntitySchemaOptions.map( - (option) => new EntitySchema(option), - ); - } else { - const entitySchemas = await Promise.all( - Object.values(cachedObjectMetadataMaps.byId).map( - (objectMetadata) => - this.entitySchemaFactory.create( - workspaceId, - cachedWorkspaceMetadataVersion, - objectMetadata, - cachedObjectMetadataMaps, - ), - ), - ); - - await this.workspaceCacheStorageService.setORMEntitySchema( - workspaceId, - cachedWorkspaceMetadataVersion, - entitySchemas.map((entitySchema) => entitySchema.options), - ); - - cachedEntitySchemas = entitySchemas; - } - - const workspaceDataSource = new WorkspaceDataSource( - { - workspaceId, - objectMetadataMaps: cachedObjectMetadataMaps, - }, - { - url: - dataSourceMetadata.url ?? - this.environmentService.get('PG_DATABASE_URL'), - type: 'postgres', - logging: - this.environmentService.get('NODE_ENV') === - NodeEnvironment.development - ? ['query', 'error'] - : ['error'], - schema: dataSourceMetadata.schema, - entities: cachedEntitySchemas, - ssl: this.environmentService.get('PG_SSL_ALLOW_SELF_SIGNED') - ? { - rejectUnauthorized: false, - } - : undefined, - }, + const workspaceDataSource = + await this.promiseMemoizer.memoizePromiseAndExecute( + cacheKey, + async () => { + const dataSourceMetadata = + await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceId( + workspaceId, ); - await workspaceDataSource.initialize(); + if (!dataSourceMetadata) { + throw new TwentyORMException( + `Workspace Schema not found for workspace ${workspaceId}`, + TwentyORMExceptionCode.WORKSPACE_SCHEMA_NOT_FOUND, + ); + } - return workspaceDataSource; - }, - async (dataSource) => { - try { - await dataSource.destroy(); - } catch (error) { - // Ignore error if pool has already been destroyed which is a common race condition case - if (error.message === 'Called end on pool more than once') { - return; - } + const cachedEntitySchemaOptions = + await this.workspaceCacheStorageService.getORMEntitySchema( + workspaceId, + cachedWorkspaceMetadataVersion, + ); - throw error; - } - }, - ); + let cachedEntitySchemas: EntitySchema[]; - if (result === null) { - throw new Error( - `Failed to create WorkspaceDataSource for ${cacheKey}`, + const cachedObjectMetadataMaps = + await this.workspaceCacheStorageService.getObjectMetadataMaps( + workspaceId, + cachedWorkspaceMetadataVersion, + ); + + if (!cachedObjectMetadataMaps) { + throw new TwentyORMException( + `Object metadata collection not found for workspace ${workspaceId}`, + TwentyORMExceptionCode.METADATA_COLLECTION_NOT_FOUND, + ); + } + + if (cachedEntitySchemaOptions) { + cachedEntitySchemas = cachedEntitySchemaOptions.map( + (option) => new EntitySchema(option), + ); + } else { + const entitySchemas = await Promise.all( + Object.values(cachedObjectMetadataMaps.byId).map( + (objectMetadata) => + this.entitySchemaFactory.create( + workspaceId, + cachedWorkspaceMetadataVersion, + objectMetadata, + cachedObjectMetadataMaps, + ), + ), + ); + + await this.workspaceCacheStorageService.setORMEntitySchema( + workspaceId, + cachedWorkspaceMetadataVersion, + entitySchemas.map((entitySchema) => entitySchema.options), + ); + + cachedEntitySchemas = entitySchemas; + } + + const workspaceDataSource = new WorkspaceDataSource( + { + workspaceId, + objectMetadataMaps: cachedObjectMetadataMaps, + }, + { + url: + dataSourceMetadata.url ?? + this.environmentService.get('PG_DATABASE_URL'), + type: 'postgres', + logging: + this.environmentService.get('NODE_ENV') === + NodeEnvironment.development + ? ['query', 'error'] + : ['error'], + schema: dataSourceMetadata.schema, + entities: cachedEntitySchemas, + ssl: this.environmentService.get('PG_SSL_ALLOW_SELF_SIGNED') + ? { + rejectUnauthorized: false, + } + : undefined, + }, ); - } - return result; - } finally { - delete this.cachedDataSourcePromise[cacheKey]; - } - })(); + await workspaceDataSource.initialize(); - this.cachedDataSourcePromise[cacheKey] = creationPromise; + return workspaceDataSource; + }, + async (dataSource) => { + try { + await dataSource.destroy(); + } catch (error) { + // Ignore error if pool has already been destroyed which is a common race condition case + if (error.message === 'Called end on pool more than once') { + return; + } + throw error; + } + }, + ); - return creationPromise; - } - - public async destroy(workspaceId: string): Promise { - const cacheKeys = ( - Object.keys(this.cachedDataSourcePromise) as CacheKey[] - ).filter((key) => key.startsWith(`${workspaceId}`)); - - for (const cacheKey of cacheKeys) { - await this.cacheManager.clearKey(cacheKey); + if (!workspaceDataSource) { + throw new Error(`Failed to create WorkspaceDataSource for ${cacheKey}`); } + + return workspaceDataSource; } private async getWorkspaceMetadataVersionFromCache( @@ -232,4 +200,10 @@ export class WorkspaceDatasourceFactory { return latestWorkspaceMetadataVersion; } + + public async destroy(workspaceId: string) { + await this.promiseMemoizer.clearKeys(`${workspaceId}-`, (dataSource) => { + dataSource.destroy(); + }); + } } diff --git a/packages/twenty-server/src/engine/twenty-orm/storage/__tests__/promise-memoizer.storage.spec.ts b/packages/twenty-server/src/engine/twenty-orm/storage/__tests__/promise-memoizer.storage.spec.ts new file mode 100644 index 000000000..a151b3e73 --- /dev/null +++ b/packages/twenty-server/src/engine/twenty-orm/storage/__tests__/promise-memoizer.storage.spec.ts @@ -0,0 +1,182 @@ +import { PromiseMemoizer } from 'src/engine/twenty-orm/storage/promise-memoizer.storage'; + +describe('PromiseMemoizer', () => { + let memoizer: PromiseMemoizer; + let originalDateNow: () => number; + const mockFactory = jest.fn(); + const mockOnDelete = jest.fn(); + const TTL_MS = 1000; // 1 second TTL for testing + + beforeAll(() => { + // Store the original Date.now function + originalDateNow = Date.now; + }); + + afterAll(() => { + // Restore the original Date.now function + global.Date.now = originalDateNow; + }); + + beforeEach(() => { + jest.clearAllMocks(); + + // Start with a fixed timestamp + const currentTimestamp = 1000; + + global.Date.now = jest.fn(() => currentTimestamp); + + memoizer = new PromiseMemoizer(TTL_MS); + }); + + describe('memoizePromiseAndExecute', () => { + it('should execute factory and cache result', async () => { + mockFactory.mockResolvedValue('test-value'); + + const result = await memoizer.memoizePromiseAndExecute( + 'test-key-1', + mockFactory, + ); + + expect(result).toBe('test-value'); + expect(mockFactory).toHaveBeenCalledTimes(1); + }); + + it('should return cached value within TTL', async () => { + mockFactory.mockResolvedValue('test-value'); + + await memoizer.memoizePromiseAndExecute('test-key-1', mockFactory); + + // Move time forward but still within TTL + const currentTime = Date.now(); + + jest + .spyOn(global.Date, 'now') + .mockImplementation(() => currentTime + TTL_MS / 2); + + const result = await memoizer.memoizePromiseAndExecute( + 'test-key-1', + mockFactory, + ); + + expect(result).toBe('test-value'); + expect(mockFactory).toHaveBeenCalledTimes(1); + }); + + it('should re-execute factory after TTL expires', async () => { + mockFactory.mockResolvedValue('test-value'); + + await memoizer.memoizePromiseAndExecute('test-key-1', mockFactory); + + // Move time beyond TTL + const currentTime = Date.now(); + + jest + .spyOn(global.Date, 'now') + .mockImplementation(() => currentTime + TTL_MS + 100); + + const result = await memoizer.memoizePromiseAndExecute( + 'test-key-1', + mockFactory, + ); + + expect(result).toBe('test-value'); + expect(mockFactory).toHaveBeenCalledTimes(2); + }); + + it('should handle null values', async () => { + mockFactory.mockResolvedValue(null); + + const result = await memoizer.memoizePromiseAndExecute( + 'test-key-1', + mockFactory, + ); + + expect(result).toBeNull(); + }); + + it('should deduplicate concurrent requests', async () => { + let resolveFactory: (value: string) => void; + const factoryPromise = new Promise((resolve) => { + resolveFactory = resolve; + }); + + mockFactory.mockImplementation(() => factoryPromise); + + const promise1 = memoizer.memoizePromiseAndExecute( + 'test-key-1', + mockFactory, + ); + const promise2 = memoizer.memoizePromiseAndExecute( + 'test-key-1', + mockFactory, + ); + + resolveFactory!('test-value'); + + const [result1, result2] = await Promise.all([promise1, promise2]); + + expect(result1).toBe('test-value'); + expect(result2).toBe('test-value'); + expect(mockFactory).toHaveBeenCalledTimes(1); + }); + }); + + describe('clearKey', () => { + it('should clear specific key and call onDelete', async () => { + mockFactory.mockResolvedValue('test-value'); + await memoizer.memoizePromiseAndExecute('test-key-1', mockFactory); + + await memoizer.clearKey('test-key-1', mockOnDelete); + + const result = await memoizer.memoizePromiseAndExecute( + 'test-key-1', + mockFactory, + ); + + expect(result).toBe('test-value'); + expect(mockOnDelete).toHaveBeenCalledWith('test-value'); + expect(mockFactory).toHaveBeenCalledTimes(2); + }); + + it('should handle non-existent key', async () => { + await memoizer.clearKey('non-existent-key-1', mockOnDelete); + expect(mockOnDelete).not.toHaveBeenCalled(); + }); + }); + + describe('clearKeys', () => { + it('should clear all keys with matching prefix', async () => { + mockFactory.mockResolvedValue('test-value'); + await memoizer.memoizePromiseAndExecute('prefix-key-1', mockFactory); + await memoizer.memoizePromiseAndExecute('prefix-key-2', mockFactory); + await memoizer.memoizePromiseAndExecute('other-key-1', mockFactory); + + mockFactory.mockClear(); + await memoizer.clearKeys('prefix-key', mockOnDelete); + + await memoizer.memoizePromiseAndExecute('prefix-key-1', mockFactory); + await memoizer.memoizePromiseAndExecute('prefix-key-2', mockFactory); + await memoizer.memoizePromiseAndExecute('other-key-1', mockFactory); + + expect(mockFactory).toHaveBeenCalledTimes(2); // Only prefix keys should be re-executed + expect(mockOnDelete).toHaveBeenCalledTimes(2); // Only prefix keys should trigger onDelete + }); + }); + + describe('clearAll', () => { + it('should clear all cached values', async () => { + mockFactory.mockResolvedValue('test-value'); + await memoizer.memoizePromiseAndExecute('key-1-1', mockFactory); + await memoizer.memoizePromiseAndExecute('key-1-2', mockFactory); + + mockFactory.mockClear(); + await memoizer.clearAll(mockOnDelete); + + await memoizer.memoizePromiseAndExecute('key-1-1', mockFactory); + await memoizer.memoizePromiseAndExecute('key-1-2', mockFactory); + + expect(mockOnDelete).toHaveBeenCalledTimes(2); + expect(mockFactory).toHaveBeenCalledTimes(2); + }); + }); +}); diff --git a/packages/twenty-server/src/engine/twenty-orm/storage/cache-manager.storage.ts b/packages/twenty-server/src/engine/twenty-orm/storage/cache-manager.storage.ts deleted file mode 100644 index d5d0bb405..000000000 --- a/packages/twenty-server/src/engine/twenty-orm/storage/cache-manager.storage.ts +++ /dev/null @@ -1,67 +0,0 @@ -import { isDefined } from 'twenty-shared/utils'; - -import { CacheKey } from 'src/engine/twenty-orm/storage/types/cache-key.type'; - -type AsyncFactoryCallback = () => Promise; - -export class CacheManager { - private cache = new Map(); - - async execute( - cacheKey: CacheKey, - factory: AsyncFactoryCallback, - onDelete?: (value: T) => Promise | void, - ): Promise { - const [workspaceId] = cacheKey.split('-'); - - const cachedValue = this.cache.get(cacheKey); - - if (isDefined(cachedValue)) { - return cachedValue; - } - - for (const key of this.cache.keys()) { - if (key.startsWith(`${workspaceId}-`)) { - const cachedValue = this.cache.get(key); - - if (cachedValue) { - await onDelete?.(cachedValue); - } - this.cache.delete(key); - } - } - - const value = await factory(); - - if (!value) { - return null; - } - - this.cache.set(cacheKey, value); - - return value; - } - - async clearKey( - cacheKey: CacheKey, - onDelete?: (value: T) => Promise | void, - ): Promise { - const cachedValue = this.cache.get(cacheKey); - - if (isDefined(cachedValue)) { - await onDelete?.(cachedValue); - this.cache.delete(cacheKey); - } - // TODO: remove this once we have debug on prod - // eslint-disable-next-line no-console - console.log('Datasource cache size: ', this.cache.size); - } - - async clear(onDelete?: (value: T) => Promise | void): Promise { - for (const value of this.cache.values()) { - await onDelete?.(value); - this.cache.delete(value as any); - } - this.cache.clear(); - } -} diff --git a/packages/twenty-server/src/engine/twenty-orm/storage/promise-memoizer.storage.ts b/packages/twenty-server/src/engine/twenty-orm/storage/promise-memoizer.storage.ts new file mode 100644 index 000000000..61bc465cb --- /dev/null +++ b/packages/twenty-server/src/engine/twenty-orm/storage/promise-memoizer.storage.ts @@ -0,0 +1,104 @@ +import { Milliseconds } from 'cache-manager'; +import { isDefined } from 'twenty-shared/utils'; + +import { CacheKey } from 'src/engine/twenty-orm/storage/types/cache-key.type'; + +type AsyncFactoryCallback = () => Promise; + +const ONE_HOUR_IN_MS = 3600_000; + +export class PromiseMemoizer { + private cache = new Map(); + private pending = new Map>(); + private ttlMs: number; + + constructor(ttlMs: Milliseconds = ONE_HOUR_IN_MS) { + this.ttlMs = ttlMs; + } + + async memoizePromiseAndExecute( + cacheKey: CacheKey, + factory: AsyncFactoryCallback, + onDelete?: (value: T) => Promise | void, + ): Promise { + const now = Date.now(); + + await this.clearExpiredKeys(onDelete); + + const cachedEntry = this.cache.get(cacheKey); + + if (cachedEntry) { + return cachedEntry.value; + } + + const existingPromise = this.pending.get(cacheKey); + + if (existingPromise) { + return existingPromise; + } + + // eslint-disable-next-line no-console + console.log( + `Computing new Datasource for cacheKey: ${cacheKey} out of ${this.cache.size}`, + ); + + const newPromise = (async () => { + try { + const value = await factory(); + + if (value) { + this.cache.set(cacheKey, { value, ttl: now + this.ttlMs }); + } + + return value; + } finally { + this.pending.delete(cacheKey); + } + })(); + + this.pending.set(cacheKey, newPromise); + + return newPromise; + } + + async clearExpiredKeys(onDelete?: (value: T) => Promise | void) { + const now = Date.now(); + + for (const [cacheKey, cachedEntry] of this.cache.entries()) { + if (cachedEntry.ttl < now) { + await this.clearKey(cacheKey, onDelete); + } + } + } + + async clearKey( + cacheKey: CacheKey, + onDelete?: (value: T) => Promise | void, + ): Promise { + const cachedValue = this.cache.get(cacheKey); + + if (isDefined(cachedValue)) { + await onDelete?.(cachedValue.value); + } + this.cache.delete(cacheKey); + } + + async clearKeys( + cacheKeyPrefix: CacheKey, + onDelete?: (value: T) => Promise | void, + ): Promise { + for (const cacheKey of [...this.cache.keys()]) { + if (cacheKey.startsWith(cacheKeyPrefix)) { + await this.clearKey(cacheKey, onDelete); + } + } + } + + async clearAll(onDelete?: (value: T) => Promise | void): Promise { + for (const [, entry] of this.cache.entries()) { + await onDelete?.(entry.value); + } + + this.cache.clear(); + } +} diff --git a/packages/twenty-server/src/engine/workspace-cache-storage/workspace-cache-storage.service.ts b/packages/twenty-server/src/engine/workspace-cache-storage/workspace-cache-storage.service.ts index 3a8e16646..3108ee446 100644 --- a/packages/twenty-server/src/engine/workspace-cache-storage/workspace-cache-storage.service.ts +++ b/packages/twenty-server/src/engine/workspace-cache-storage/workspace-cache-storage.service.ts @@ -73,7 +73,7 @@ export class WorkspaceCacheStorageService { return this.cacheStorageService.set( `${WorkspaceCacheKeys.MetadataObjectMetadataOngoingCachingLock}:${workspaceId}:${metadataVersion}`, true, - TTL_INFINITE, + 1_000 * 60, // 1 minute ); }