Fix object metadata collection not found (#11306)
## Context This fix ensures that even if a datasource creation promise throws and is cached, subsequent requests won't return that cached exception. Also adding a TTL on MetadataObjectMetadataOngoingCachingLock, this is not something that should stay in the cache forever and could potentially unlock some race conditions (the origin of the issue is probably due to performances where the lock is not removed as it should be after metadata computation and caching)
This commit is contained in:
@ -13,18 +13,14 @@ import {
|
||||
TwentyORMExceptionCode,
|
||||
} from 'src/engine/twenty-orm/exceptions/twenty-orm.exception';
|
||||
import { EntitySchemaFactory } from 'src/engine/twenty-orm/factories/entity-schema.factory';
|
||||
import { CacheManager } from 'src/engine/twenty-orm/storage/cache-manager.storage';
|
||||
import { PromiseMemoizer } from 'src/engine/twenty-orm/storage/promise-memoizer.storage';
|
||||
import { CacheKey } from 'src/engine/twenty-orm/storage/types/cache-key.type';
|
||||
import { WorkspaceCacheStorageService } from 'src/engine/workspace-cache-storage/workspace-cache-storage.service';
|
||||
|
||||
@Injectable()
|
||||
export class WorkspaceDatasourceFactory {
|
||||
private readonly logger = new Logger(WorkspaceDatasourceFactory.name);
|
||||
private cacheManager = new CacheManager<WorkspaceDataSource>();
|
||||
private cachedDataSourcePromise: Record<
|
||||
CacheKey,
|
||||
Promise<WorkspaceDataSource>
|
||||
>;
|
||||
private promiseMemoizer = new PromiseMemoizer<WorkspaceDataSource>();
|
||||
|
||||
constructor(
|
||||
private readonly dataSourceService: DataSourceService,
|
||||
@ -32,9 +28,7 @@ export class WorkspaceDatasourceFactory {
|
||||
private readonly workspaceCacheStorageService: WorkspaceCacheStorageService,
|
||||
private readonly workspaceMetadataCacheService: WorkspaceMetadataCacheService,
|
||||
private readonly entitySchemaFactory: EntitySchemaFactory,
|
||||
) {
|
||||
this.cachedDataSourcePromise = {};
|
||||
}
|
||||
) {}
|
||||
|
||||
public async create(
|
||||
workspaceId: string,
|
||||
@ -59,142 +53,116 @@ export class WorkspaceDatasourceFactory {
|
||||
|
||||
const cacheKey: CacheKey = `${workspaceId}-${cachedWorkspaceMetadataVersion}`;
|
||||
|
||||
if (cacheKey in this.cachedDataSourcePromise) {
|
||||
return this.cachedDataSourcePromise[cacheKey];
|
||||
}
|
||||
|
||||
const creationPromise = (async (): Promise<WorkspaceDataSource> => {
|
||||
try {
|
||||
const result = await this.cacheManager.execute(
|
||||
cacheKey,
|
||||
async () => {
|
||||
const dataSourceMetadata =
|
||||
await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceId(
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
if (!dataSourceMetadata) {
|
||||
throw new TwentyORMException(
|
||||
`Workspace Schema not found for workspace ${workspaceId}`,
|
||||
TwentyORMExceptionCode.WORKSPACE_SCHEMA_NOT_FOUND,
|
||||
);
|
||||
}
|
||||
|
||||
const cachedEntitySchemaOptions =
|
||||
await this.workspaceCacheStorageService.getORMEntitySchema(
|
||||
workspaceId,
|
||||
cachedWorkspaceMetadataVersion,
|
||||
);
|
||||
|
||||
let cachedEntitySchemas: EntitySchema[];
|
||||
|
||||
const cachedObjectMetadataMaps =
|
||||
await this.workspaceCacheStorageService.getObjectMetadataMaps(
|
||||
workspaceId,
|
||||
cachedWorkspaceMetadataVersion,
|
||||
);
|
||||
|
||||
if (!cachedObjectMetadataMaps) {
|
||||
throw new TwentyORMException(
|
||||
`Object metadata collection not found for workspace ${workspaceId}`,
|
||||
TwentyORMExceptionCode.METADATA_COLLECTION_NOT_FOUND,
|
||||
);
|
||||
}
|
||||
|
||||
if (cachedEntitySchemaOptions) {
|
||||
cachedEntitySchemas = cachedEntitySchemaOptions.map(
|
||||
(option) => new EntitySchema(option),
|
||||
);
|
||||
} else {
|
||||
const entitySchemas = await Promise.all(
|
||||
Object.values(cachedObjectMetadataMaps.byId).map(
|
||||
(objectMetadata) =>
|
||||
this.entitySchemaFactory.create(
|
||||
workspaceId,
|
||||
cachedWorkspaceMetadataVersion,
|
||||
objectMetadata,
|
||||
cachedObjectMetadataMaps,
|
||||
),
|
||||
),
|
||||
);
|
||||
|
||||
await this.workspaceCacheStorageService.setORMEntitySchema(
|
||||
workspaceId,
|
||||
cachedWorkspaceMetadataVersion,
|
||||
entitySchemas.map((entitySchema) => entitySchema.options),
|
||||
);
|
||||
|
||||
cachedEntitySchemas = entitySchemas;
|
||||
}
|
||||
|
||||
const workspaceDataSource = new WorkspaceDataSource(
|
||||
{
|
||||
workspaceId,
|
||||
objectMetadataMaps: cachedObjectMetadataMaps,
|
||||
},
|
||||
{
|
||||
url:
|
||||
dataSourceMetadata.url ??
|
||||
this.environmentService.get('PG_DATABASE_URL'),
|
||||
type: 'postgres',
|
||||
logging:
|
||||
this.environmentService.get('NODE_ENV') ===
|
||||
NodeEnvironment.development
|
||||
? ['query', 'error']
|
||||
: ['error'],
|
||||
schema: dataSourceMetadata.schema,
|
||||
entities: cachedEntitySchemas,
|
||||
ssl: this.environmentService.get('PG_SSL_ALLOW_SELF_SIGNED')
|
||||
? {
|
||||
rejectUnauthorized: false,
|
||||
}
|
||||
: undefined,
|
||||
},
|
||||
const workspaceDataSource =
|
||||
await this.promiseMemoizer.memoizePromiseAndExecute(
|
||||
cacheKey,
|
||||
async () => {
|
||||
const dataSourceMetadata =
|
||||
await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceId(
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
await workspaceDataSource.initialize();
|
||||
if (!dataSourceMetadata) {
|
||||
throw new TwentyORMException(
|
||||
`Workspace Schema not found for workspace ${workspaceId}`,
|
||||
TwentyORMExceptionCode.WORKSPACE_SCHEMA_NOT_FOUND,
|
||||
);
|
||||
}
|
||||
|
||||
return workspaceDataSource;
|
||||
},
|
||||
async (dataSource) => {
|
||||
try {
|
||||
await dataSource.destroy();
|
||||
} catch (error) {
|
||||
// Ignore error if pool has already been destroyed which is a common race condition case
|
||||
if (error.message === 'Called end on pool more than once') {
|
||||
return;
|
||||
}
|
||||
const cachedEntitySchemaOptions =
|
||||
await this.workspaceCacheStorageService.getORMEntitySchema(
|
||||
workspaceId,
|
||||
cachedWorkspaceMetadataVersion,
|
||||
);
|
||||
|
||||
throw error;
|
||||
}
|
||||
},
|
||||
);
|
||||
let cachedEntitySchemas: EntitySchema[];
|
||||
|
||||
if (result === null) {
|
||||
throw new Error(
|
||||
`Failed to create WorkspaceDataSource for ${cacheKey}`,
|
||||
const cachedObjectMetadataMaps =
|
||||
await this.workspaceCacheStorageService.getObjectMetadataMaps(
|
||||
workspaceId,
|
||||
cachedWorkspaceMetadataVersion,
|
||||
);
|
||||
|
||||
if (!cachedObjectMetadataMaps) {
|
||||
throw new TwentyORMException(
|
||||
`Object metadata collection not found for workspace ${workspaceId}`,
|
||||
TwentyORMExceptionCode.METADATA_COLLECTION_NOT_FOUND,
|
||||
);
|
||||
}
|
||||
|
||||
if (cachedEntitySchemaOptions) {
|
||||
cachedEntitySchemas = cachedEntitySchemaOptions.map(
|
||||
(option) => new EntitySchema(option),
|
||||
);
|
||||
} else {
|
||||
const entitySchemas = await Promise.all(
|
||||
Object.values(cachedObjectMetadataMaps.byId).map(
|
||||
(objectMetadata) =>
|
||||
this.entitySchemaFactory.create(
|
||||
workspaceId,
|
||||
cachedWorkspaceMetadataVersion,
|
||||
objectMetadata,
|
||||
cachedObjectMetadataMaps,
|
||||
),
|
||||
),
|
||||
);
|
||||
|
||||
await this.workspaceCacheStorageService.setORMEntitySchema(
|
||||
workspaceId,
|
||||
cachedWorkspaceMetadataVersion,
|
||||
entitySchemas.map((entitySchema) => entitySchema.options),
|
||||
);
|
||||
|
||||
cachedEntitySchemas = entitySchemas;
|
||||
}
|
||||
|
||||
const workspaceDataSource = new WorkspaceDataSource(
|
||||
{
|
||||
workspaceId,
|
||||
objectMetadataMaps: cachedObjectMetadataMaps,
|
||||
},
|
||||
{
|
||||
url:
|
||||
dataSourceMetadata.url ??
|
||||
this.environmentService.get('PG_DATABASE_URL'),
|
||||
type: 'postgres',
|
||||
logging:
|
||||
this.environmentService.get('NODE_ENV') ===
|
||||
NodeEnvironment.development
|
||||
? ['query', 'error']
|
||||
: ['error'],
|
||||
schema: dataSourceMetadata.schema,
|
||||
entities: cachedEntitySchemas,
|
||||
ssl: this.environmentService.get('PG_SSL_ALLOW_SELF_SIGNED')
|
||||
? {
|
||||
rejectUnauthorized: false,
|
||||
}
|
||||
: undefined,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
return result;
|
||||
} finally {
|
||||
delete this.cachedDataSourcePromise[cacheKey];
|
||||
}
|
||||
})();
|
||||
await workspaceDataSource.initialize();
|
||||
|
||||
this.cachedDataSourcePromise[cacheKey] = creationPromise;
|
||||
return workspaceDataSource;
|
||||
},
|
||||
async (dataSource) => {
|
||||
try {
|
||||
await dataSource.destroy();
|
||||
} catch (error) {
|
||||
// Ignore error if pool has already been destroyed which is a common race condition case
|
||||
if (error.message === 'Called end on pool more than once') {
|
||||
return;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
return creationPromise;
|
||||
}
|
||||
|
||||
public async destroy(workspaceId: string): Promise<void> {
|
||||
const cacheKeys = (
|
||||
Object.keys(this.cachedDataSourcePromise) as CacheKey[]
|
||||
).filter((key) => key.startsWith(`${workspaceId}`));
|
||||
|
||||
for (const cacheKey of cacheKeys) {
|
||||
await this.cacheManager.clearKey(cacheKey);
|
||||
if (!workspaceDataSource) {
|
||||
throw new Error(`Failed to create WorkspaceDataSource for ${cacheKey}`);
|
||||
}
|
||||
|
||||
return workspaceDataSource;
|
||||
}
|
||||
|
||||
private async getWorkspaceMetadataVersionFromCache(
|
||||
@ -232,4 +200,10 @@ export class WorkspaceDatasourceFactory {
|
||||
|
||||
return latestWorkspaceMetadataVersion;
|
||||
}
|
||||
|
||||
public async destroy(workspaceId: string) {
|
||||
await this.promiseMemoizer.clearKeys(`${workspaceId}-`, (dataSource) => {
|
||||
dataSource.destroy();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user