Provide a wrapper to execute command on workspace with easier devXP (#10391)
Proposal: - Add a method in ActiveWorkspaceCommand to loop over workspace safely (add counter, add try / catch, provide datasource with fresh cache, destroy datasource => as we do always do it) Also in this PR: - make sure we clear all dataSources (and not only the one on metadata version in RAM)
This commit is contained in:
@ -13,6 +13,7 @@ import {
|
||||
import { BillingCustomer } from 'src/engine/core-modules/billing/entities/billing-customer.entity';
|
||||
import { StripeSubscriptionService } from 'src/engine/core-modules/billing/stripe/services/stripe-subscription.service';
|
||||
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
|
||||
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
|
||||
|
||||
interface SyncCustomerDataCommandOptions
|
||||
extends ActiveWorkspacesCommandOptions {}
|
||||
@ -28,8 +29,9 @@ export class BillingSyncCustomerDataCommand extends ActiveWorkspacesCommandRunne
|
||||
private readonly stripeSubscriptionService: StripeSubscriptionService,
|
||||
@InjectRepository(BillingCustomer, 'core')
|
||||
protected readonly billingCustomerRepository: Repository<BillingCustomer>,
|
||||
protected readonly twentyORMGlobalManager: TwentyORMGlobalManager,
|
||||
) {
|
||||
super(workspaceRepository);
|
||||
super(workspaceRepository, twentyORMGlobalManager);
|
||||
}
|
||||
|
||||
async executeActiveWorkspacesCommand(
|
||||
|
||||
@ -14,13 +14,17 @@ import {
|
||||
} from 'src/engine/twenty-orm/exceptions/twenty-orm.exception';
|
||||
import { EntitySchemaFactory } from 'src/engine/twenty-orm/factories/entity-schema.factory';
|
||||
import { CacheManager } from 'src/engine/twenty-orm/storage/cache-manager.storage';
|
||||
import { CacheKey } from 'src/engine/twenty-orm/storage/types/cache-key.type';
|
||||
import { WorkspaceCacheStorageService } from 'src/engine/workspace-cache-storage/workspace-cache-storage.service';
|
||||
|
||||
@Injectable()
|
||||
export class WorkspaceDatasourceFactory {
|
||||
private readonly logger = new Logger(WorkspaceDatasourceFactory.name);
|
||||
private cacheManager = new CacheManager<WorkspaceDataSource>();
|
||||
private cachedDatasourcePromise: Record<string, Promise<WorkspaceDataSource>>;
|
||||
private cachedDataSourcePromise: Record<
|
||||
CacheKey,
|
||||
Promise<WorkspaceDataSource>
|
||||
>;
|
||||
|
||||
constructor(
|
||||
private readonly dataSourceService: DataSourceService,
|
||||
@ -29,7 +33,7 @@ export class WorkspaceDatasourceFactory {
|
||||
private readonly workspaceMetadataCacheService: WorkspaceMetadataCacheService,
|
||||
private readonly entitySchemaFactory: EntitySchemaFactory,
|
||||
) {
|
||||
this.cachedDatasourcePromise = {};
|
||||
this.cachedDataSourcePromise = {};
|
||||
}
|
||||
|
||||
public async create(
|
||||
@ -53,16 +57,16 @@ export class WorkspaceDatasourceFactory {
|
||||
);
|
||||
}
|
||||
|
||||
const cacheKey = `${workspaceId}-${cachedWorkspaceMetadataVersion}`;
|
||||
const cacheKey: CacheKey = `${workspaceId}-${cachedWorkspaceMetadataVersion}`;
|
||||
|
||||
if (cacheKey in this.cachedDatasourcePromise) {
|
||||
return this.cachedDatasourcePromise[cacheKey];
|
||||
if (cacheKey in this.cachedDataSourcePromise) {
|
||||
return this.cachedDataSourcePromise[cacheKey];
|
||||
}
|
||||
|
||||
const creationPromise = (async (): Promise<WorkspaceDataSource> => {
|
||||
try {
|
||||
const result = await this.cacheManager.execute(
|
||||
cacheKey as '`${string}-${string}`',
|
||||
cacheKey,
|
||||
async () => {
|
||||
this.logger.log(
|
||||
`Creating workspace data source for workspace ${workspaceId} and metadata version ${cachedWorkspaceMetadataVersion}`,
|
||||
@ -178,22 +182,23 @@ export class WorkspaceDatasourceFactory {
|
||||
|
||||
return result;
|
||||
} finally {
|
||||
delete this.cachedDatasourcePromise[cacheKey];
|
||||
delete this.cachedDataSourcePromise[cacheKey];
|
||||
}
|
||||
})();
|
||||
|
||||
this.cachedDatasourcePromise[cacheKey] = creationPromise;
|
||||
this.cachedDataSourcePromise[cacheKey] = creationPromise;
|
||||
|
||||
return creationPromise;
|
||||
}
|
||||
|
||||
public async destroy(workspaceId: string): Promise<void> {
|
||||
const cachedWorkspaceMetadataVersion =
|
||||
await this.workspaceCacheStorageService.getMetadataVersion(workspaceId);
|
||||
const cacheKeys = (
|
||||
Object.keys(this.cachedDataSourcePromise) as CacheKey[]
|
||||
).filter((key) => key.startsWith(`${workspaceId}`));
|
||||
|
||||
await this.cacheManager.clearKey(
|
||||
`${workspaceId}-${cachedWorkspaceMetadataVersion}`,
|
||||
);
|
||||
for (const cacheKey of cacheKeys) {
|
||||
await this.cacheManager.clearKey(cacheKey);
|
||||
}
|
||||
}
|
||||
|
||||
private async getWorkspaceMetadataVersionFromCache(
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
import { isDefined } from 'twenty-shared';
|
||||
|
||||
type CacheKey = `${string}-${string}`;
|
||||
import { CacheKey } from 'src/engine/twenty-orm/storage/types/cache-key.type';
|
||||
|
||||
type AsyncFactoryCallback<T> = () => Promise<T | null>;
|
||||
|
||||
@ -52,6 +52,9 @@ export class CacheManager<T> {
|
||||
await onDelete?.(cachedValue);
|
||||
this.cache.delete(cacheKey);
|
||||
}
|
||||
// TODO: remove this once we have debug on prod
|
||||
// eslint-disable-next-line no-console
|
||||
console.log('Datasource cache size: ', this.cache.size);
|
||||
}
|
||||
|
||||
async clear(onDelete?: (value: T) => Promise<void> | void): Promise<void> {
|
||||
|
||||
@ -0,0 +1 @@
|
||||
export type CacheKey = `${string}-${string}`;
|
||||
@ -50,8 +50,15 @@ export class TwentyORMGlobalManager {
|
||||
return repository;
|
||||
}
|
||||
|
||||
async getDataSourceForWorkspace(workspaceId: string) {
|
||||
return await this.workspaceDataSourceFactory.create(workspaceId, null);
|
||||
async getDataSourceForWorkspace(
|
||||
workspaceId: string,
|
||||
failOnMetadataCacheMiss = true,
|
||||
) {
|
||||
return await this.workspaceDataSourceFactory.create(
|
||||
workspaceId,
|
||||
null,
|
||||
failOnMetadataCacheMiss,
|
||||
);
|
||||
}
|
||||
|
||||
async destroyDataSourceForWorkspace(workspaceId: string) {
|
||||
|
||||
@ -6,6 +6,7 @@ import { Repository } from 'typeorm';
|
||||
import { ActiveWorkspacesCommandRunner } from 'src/database/commands/active-workspaces.command';
|
||||
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
|
||||
import { DataSourceService } from 'src/engine/metadata-modules/data-source/data-source.service';
|
||||
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
|
||||
import { WorkspaceHealthService } from 'src/engine/workspace-manager/workspace-health/workspace-health.service';
|
||||
import { WorkspaceSyncMetadataService } from 'src/engine/workspace-manager/workspace-sync-metadata/workspace-sync-metadata.service';
|
||||
|
||||
@ -30,8 +31,9 @@ export class SyncWorkspaceMetadataCommand extends ActiveWorkspacesCommandRunner
|
||||
private readonly workspaceHealthService: WorkspaceHealthService,
|
||||
private readonly dataSourceService: DataSourceService,
|
||||
private readonly syncWorkspaceLoggerService: SyncWorkspaceLoggerService,
|
||||
protected readonly twentyORMGlobalManager: TwentyORMGlobalManager,
|
||||
) {
|
||||
super(workspaceRepository);
|
||||
super(workspaceRepository, twentyORMGlobalManager);
|
||||
}
|
||||
|
||||
async executeActiveWorkspacesCommand(
|
||||
|
||||
Reference in New Issue
Block a user