Add db event emitter in twenty orm (#13167)

## Context
Add an eventEmitter instance to twenty datasources so we can emit DB
events.
Add input and output formatting to twenty orm (formatData, formatResult)
Those 2 elements simplified existing logic when we interact with the
ORM, input will be formatted by the ORM so we can directly use
field-like structure instead of column-like. The output will be
formatted, for builder queries it will be in `result.generatedMaps`
where `result.raw` preserves the previous column-like structure.

Important change: We now have an authContext that we can pass when we
get a repository, this will be used for the different events emitted in
the ORM. We also removed the caching for repositories as it was not
scaling well and not necessary imho

Note: An upcoming PR should handle the onDelete: cascade behavior where
we send DESTROY events in cascade when there is an onDelete: CASCADE on
the FK.

---------

Co-authored-by: Charles Bochet <charles@twenty.com>
This commit is contained in:
Weiko
2025-07-17 18:07:28 +02:00
committed by GitHub
parent 4a3139c9e0
commit 2deac9448e
79 changed files with 1061 additions and 2016 deletions

View File

@ -55,8 +55,64 @@ describe('WorkspaceEntityManager', () => {
mockInternalContext = {
workspaceId: 'test-workspace-id',
objectMetadataMaps: {
idByNameSingular: {},
byId: {
'test-entity-id': {
id: 'test-entity-id',
nameSingular: 'test-entity',
namePlural: 'test-entities',
labelSingular: 'Test Entity',
labelPlural: 'Test Entities',
workspaceId: 'test-workspace-id',
icon: 'test-icon',
color: 'test-color',
isCustom: false,
isRemote: false,
isAuditLogged: false,
isSearchable: false,
isSystem: false,
isActive: true,
targetTableName: 'test_entity',
indexMetadatas: [],
fieldsById: {
'field-id': {
id: 'field-id',
type: 'TEXT',
name: 'fieldName',
label: 'Field Name',
objectMetadataId: 'test-entity-id',
isNullable: true,
isLabelSyncedWithName: false,
createdAt: new Date(),
updatedAt: new Date(),
},
},
fieldIdByName: { fieldName: 'field-id' },
fieldIdByJoinColumnName: {},
},
},
idByNameSingular: {
'test-entity': 'test-entity-id',
},
},
featureFlagsMap: {
IS_AIRTABLE_INTEGRATION_ENABLED: false,
IS_POSTGRESQL_INTEGRATION_ENABLED: false,
IS_STRIPE_INTEGRATION_ENABLED: false,
IS_UNIQUE_INDEXES_ENABLED: false,
IS_JSON_FILTER_ENABLED: false,
IS_AI_ENABLED: false,
IS_IMAP_SMTP_CALDAV_ENABLED: false,
IS_MORPH_RELATION_ENABLED: false,
IS_WORKFLOW_FILTERING_ENABLED: false,
IS_RELATION_CONNECT_ENABLED: false,
IS_WORKSPACE_API_KEY_WEBHOOK_GRAPHQL_ENABLED: false,
IS_FIELDS_PERMISSIONS_ENABLED: false,
},
eventEmitterService: {
emitMutationEvent: jest.fn(),
emitDatabaseBatchEvent: jest.fn(),
emitCustomBatchEvent: jest.fn(),
} as any,
} as WorkspaceInternalContext;
mockDataSource = {

View File

@ -8,6 +8,7 @@ import {
FindManyOptions,
FindOneOptions,
FindOptionsWhere,
In,
InsertResult,
ObjectId,
ObjectLiteral,
@ -32,6 +33,8 @@ import { InstanceChecker } from 'typeorm/util/InstanceChecker';
import { FeatureFlagMap } from 'src/engine/core-modules/feature-flag/interfaces/feature-flag-map.interface';
import { WorkspaceInternalContext } from 'src/engine/twenty-orm/interfaces/workspace-internal-context.interface';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
import { AuthContext } from 'src/engine/core-modules/auth/types/auth-context.type';
import {
PermissionsException,
PermissionsExceptionCode,
@ -51,6 +54,8 @@ import { WorkspaceSelectQueryBuilder } from 'src/engine/twenty-orm/repository/wo
import { WorkspaceRepository } from 'src/engine/twenty-orm/repository/workspace.repository';
import { computeRelationConnectQueryConfigs } from 'src/engine/twenty-orm/utils/compute-relation-connect-query-configs.util';
import { createSqlWhereTupleInClause } from 'src/engine/twenty-orm/utils/create-sql-where-tuple-in-clause.utils';
import { formatData } from 'src/engine/twenty-orm/utils/format-data.util';
import { formatResult } from 'src/engine/twenty-orm/utils/format-result.util';
import { getObjectMetadataFromEntityTarget } from 'src/engine/twenty-orm/utils/get-object-metadata-from-entity-target.util';
import { getRecordToConnectFields } from 'src/engine/twenty-orm/utils/get-record-to-connect-fields.util';
@ -85,22 +90,10 @@ export class WorkspaceEntityManager extends EntityManager {
shouldBypassPermissionChecks?: boolean;
roleId?: string;
},
authContext?: AuthContext,
): WorkspaceRepository<Entity> {
const dataSource = this.connection;
const repositoryKey = this.getRepositoryKey({
target,
dataSource,
roleId: permissionOptions?.roleId,
shouldBypassPermissionChecks:
permissionOptions?.shouldBypassPermissionChecks ?? false,
});
const repoFromMap = this.repositories.get(repositoryKey);
if (repoFromMap) {
return repoFromMap as WorkspaceRepository<Entity>;
}
let objectPermissions = {};
if (permissionOptions?.roleId) {
@ -128,10 +121,9 @@ export class WorkspaceEntityManager extends EntityManager {
this.queryRunner,
objectPermissions,
permissionOptions?.shouldBypassPermissionChecks,
authContext,
);
this.repositories.set(repositoryKey, newRepository);
return newRepository;
}
@ -360,32 +352,6 @@ export class WorkspaceEntityManager extends EntityManager {
.execute();
}
private getRepositoryKey({
target,
dataSource,
roleId,
shouldBypassPermissionChecks,
}: {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
target: EntityTarget<unknown>;
dataSource: WorkspaceDataSource;
shouldBypassPermissionChecks: boolean;
roleId?: string;
}) {
const repositoryPrefix = dataSource.getMetadata(target).name;
const roleIdSuffix = roleId ? `_${roleId}` : '';
const rolesPermissionsVersionSuffix = dataSource.rolesPermissionsVersion
? `_${dataSource.rolesPermissionsVersion}`
: '';
const featureFlagMapVersionSuffix = dataSource.featureFlagMapVersion
? `_${dataSource.featureFlagMapVersion}`
: '';
return shouldBypassPermissionChecks
? `${repositoryPrefix}_bypass${featureFlagMapVersionSuffix}`
: `${repositoryPrefix}${roleIdSuffix}${rolesPermissionsVersionSuffix}${featureFlagMapVersionSuffix}`;
}
validatePermissions<Entity extends ObjectLiteral>(
target: EntityTarget<Entity> | Entity,
operationType: OperationType,
@ -900,6 +866,13 @@ export class WorkspaceEntityManager extends EntityManager {
entityLike: DeepPartial<Entity>,
permissionOptions?: PermissionOptions,
): Promise<Entity | undefined> {
const objectMetadataItem = getObjectMetadataFromEntityTarget(
entityClass,
this.internalContext,
);
const formattedEntityLike = formatData(entityLike, objectMetadataItem);
const managerWithPermissionOptions = Object.assign(
Object.create(Object.getPrototypeOf(this)),
this,
@ -915,12 +888,16 @@ export class WorkspaceEntityManager extends EntityManager {
new PlainObjectToDatabaseEntityTransformer(managerWithPermissionOptions);
const transformedEntity =
await plainObjectToDatabaseEntityTransformer.transform(
entityLike,
formattedEntityLike,
metadata,
);
if (transformedEntity)
return this.merge(entityClass, transformedEntity, entityLike) as Entity;
return this.merge(
entityClass,
transformedEntity,
formattedEntityLike,
) as Entity;
return undefined;
}
@ -1075,17 +1052,81 @@ export class WorkspaceEntityManager extends EntityManager {
const queryRunnerForEntityPersistExecutor =
this.connection.createQueryRunnerForEntityPersistExecutor();
return new EntityPersistExecutor(
const isEntityArray = Array.isArray(entity);
const entityTarget =
target ?? (isEntityArray ? entity[0]?.constructor : entity.constructor);
const entityArray = isEntityArray ? entity : [entity];
const entityIds = entityArray.map((e) => (e as { id: string }).id);
const beforeUpdate = await this.find(
entityTarget,
{
where: { id: In(entityIds) },
},
permissionOptions,
);
const beforeUpdateMapById = beforeUpdate.reduce(
(acc, e: ObjectLiteral) => {
acc[e.id] = e;
return acc;
},
{} as Record<string, ObjectLiteral>,
);
const objectMetadataItem = getObjectMetadataFromEntityTarget(
entityTarget,
this.internalContext,
);
const formattedEntityOrEntities = formatData(
entityArray,
objectMetadataItem,
);
const result = await new EntityPersistExecutor(
this.connection,
queryRunnerForEntityPersistExecutor,
'save',
target,
entity as ObjectLiteral,
formattedEntityOrEntities as ObjectLiteral[],
options as SaveOptions | (SaveOptions & { reload: false }),
)
.execute()
.then(() => entity as Entity)
.then(() => formattedEntityOrEntities as Entity[])
.finally(() => queryRunnerForEntityPersistExecutor.release());
const resultArray = Array.isArray(result) ? result : [result];
const formattedResult = formatResult<Entity[]>(
resultArray,
objectMetadataItem,
this.internalContext.objectMetadataMaps,
);
for (const entity of formattedResult) {
const isUpdate = beforeUpdateMapById[entity.id];
if (isUpdate) {
await this.internalContext.eventEmitterService.emitMutationEvent({
action: DatabaseEventAction.UPDATED,
objectMetadataItem,
workspaceId: this.internalContext.workspaceId,
entities: [entity],
beforeEntities: beforeUpdateMapById[entity.id],
});
} else {
await this.internalContext.eventEmitterService.emitMutationEvent({
action: DatabaseEventAction.CREATED,
objectMetadataItem,
workspaceId: this.internalContext.workspaceId,
entities: [entity],
});
}
}
return isEntityArray ? formattedResult : formattedResult[0];
}
override remove<Entity>(
@ -1145,23 +1186,49 @@ export class WorkspaceEntityManager extends EntityManager {
? maybeOptionsOrMaybePermissionOptions
: entityOrMaybeOptions;
if (Array.isArray(entity) && entity.length === 0)
return Promise.resolve(entity);
const isEntityArray = Array.isArray(entity);
if (isEntityArray && entity.length === 0) return Promise.resolve(entity);
const queryRunnerForEntityPersistExecutor =
this.connection.createQueryRunnerForEntityPersistExecutor();
return new EntityPersistExecutor(
const entityTarget =
target ?? (isEntityArray ? entity[0]?.constructor : entity.constructor);
const objectMetadataItem = getObjectMetadataFromEntityTarget(
entityTarget,
this.internalContext,
);
const formattedEntity = formatData(entity, objectMetadataItem);
const result = new EntityPersistExecutor(
this.connection,
queryRunnerForEntityPersistExecutor,
'remove',
target as string | undefined,
entity as ObjectLiteral,
formattedEntity as ObjectLiteral,
options as RemoveOptions,
)
.execute()
.then(() => entity as Entity | Entity[])
.then(() => formattedEntity as Entity | Entity[])
.finally(() => queryRunnerForEntityPersistExecutor.release());
const formattedResult = formatResult<Entity[]>(
result,
objectMetadataItem,
this.internalContext.objectMetadataMaps,
);
await this.internalContext.eventEmitterService.emitMutationEvent({
action: DatabaseEventAction.DESTROYED,
objectMetadataItem,
workspaceId: this.internalContext.workspaceId,
entities: formattedResult,
});
return isEntityArray ? formattedResult : formattedResult[0];
}
override softRemove<Entity extends ObjectLiteral>(
@ -1237,17 +1304,43 @@ export class WorkspaceEntityManager extends EntityManager {
const queryRunnerForEntityPersistExecutor =
this.connection.createQueryRunnerForEntityPersistExecutor();
return new EntityPersistExecutor(
const isEntityArray = Array.isArray(entity);
const entityTarget =
target ?? (isEntityArray ? entity[0]?.constructor : entity.constructor);
const objectMetadataItem = getObjectMetadataFromEntityTarget(
entityTarget,
this.internalContext,
);
const formattedEntity = formatData(entity, objectMetadataItem);
const result = new EntityPersistExecutor(
this.connection,
queryRunnerForEntityPersistExecutor,
'soft-remove',
target,
entity as ObjectLiteral,
formattedEntity as ObjectLiteral,
options as SaveOptions,
)
.execute()
.then(() => entity as Entity)
.then(() => formattedEntity as Entity)
.finally(() => queryRunnerForEntityPersistExecutor.release());
const formattedResult = formatResult<Entity[]>(
result,
objectMetadataItem,
this.internalContext.objectMetadataMaps,
);
await this.internalContext.eventEmitterService.emitMutationEvent({
action: DatabaseEventAction.DELETED,
objectMetadataItem,
workspaceId: this.internalContext.workspaceId,
entities: formattedResult,
});
return isEntityArray ? formattedResult : formattedResult[0];
}
override recover<Entity>(
@ -1313,23 +1406,49 @@ export class WorkspaceEntityManager extends EntityManager {
: entityOrEntitiesOrMaybeOptions;
if (InstanceChecker.isEntitySchema(target)) target = target.options.name;
if (Array.isArray(entity) && entity.length === 0)
return Promise.resolve(entity);
const isEntityArray = Array.isArray(entity);
if (isEntityArray && entity.length === 0) return Promise.resolve(entity);
const queryRunnerForEntityPersistExecutor =
this.connection.createQueryRunnerForEntityPersistExecutor();
return new EntityPersistExecutor(
const entityTarget =
target ?? (isEntityArray ? entity[0]?.constructor : entity.constructor);
const objectMetadataItem = getObjectMetadataFromEntityTarget(
entityTarget,
this.internalContext,
);
const formattedEntity = formatData(entity, objectMetadataItem);
const result = new EntityPersistExecutor(
this.connection,
queryRunnerForEntityPersistExecutor,
'recover',
target,
entity as ObjectLiteral,
formattedEntity as ObjectLiteral,
options as SaveOptions,
)
.execute()
.then(() => entity as Entity)
.then(() => formattedEntity as Entity)
.finally(() => queryRunnerForEntityPersistExecutor.release());
const formattedResult = formatResult<Entity[]>(
result,
objectMetadataItem,
this.internalContext.objectMetadataMaps,
);
await this.internalContext.eventEmitterService.emitMutationEvent({
action: DatabaseEventAction.RESTORED,
objectMetadataItem,
workspaceId: this.internalContext.workspaceId,
entities: formattedResult,
});
return isEntityArray ? formattedResult : formattedResult[0];
}
// Forbidden methods