Fix: query runner orm (#6397)
Fix WorkspaceQueryRunner events using TwentyORM Fix #6057 --------- Co-authored-by: Charles Bochet <charles@twenty.com>
This commit is contained in:
@ -50,6 +50,7 @@ import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decora
|
|||||||
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
|
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
|
||||||
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
|
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
|
||||||
import { assertMutationNotOnRemoteObject } from 'src/engine/metadata-modules/object-metadata/utils/assert-mutation-not-on-remote-object.util';
|
import { assertMutationNotOnRemoteObject } from 'src/engine/metadata-modules/object-metadata/utils/assert-mutation-not-on-remote-object.util';
|
||||||
|
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
|
||||||
import { computeObjectTargetTable } from 'src/engine/utils/compute-object-target-table.util';
|
import { computeObjectTargetTable } from 'src/engine/utils/compute-object-target-table.util';
|
||||||
import { isQueryTimeoutError } from 'src/engine/utils/query-timeout.util';
|
import { isQueryTimeoutError } from 'src/engine/utils/query-timeout.util';
|
||||||
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
|
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
|
||||||
@ -70,6 +71,7 @@ export class WorkspaceQueryRunnerService {
|
|||||||
private readonly logger = new Logger(WorkspaceQueryRunnerService.name);
|
private readonly logger = new Logger(WorkspaceQueryRunnerService.name);
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
|
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
|
||||||
private readonly workspaceQueryBuilderFactory: WorkspaceQueryBuilderFactory,
|
private readonly workspaceQueryBuilderFactory: WorkspaceQueryBuilderFactory,
|
||||||
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
|
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
|
||||||
private readonly queryRunnerArgsFactory: QueryRunnerArgsFactory,
|
private readonly queryRunnerArgsFactory: QueryRunnerArgsFactory,
|
||||||
@ -370,14 +372,22 @@ export class WorkspaceQueryRunnerService {
|
|||||||
options: WorkspaceQueryRunnerOptions,
|
options: WorkspaceQueryRunnerOptions,
|
||||||
): Promise<Record | undefined> {
|
): Promise<Record | undefined> {
|
||||||
const { workspaceId, userId, objectMetadataItem } = options;
|
const { workspaceId, userId, objectMetadataItem } = options;
|
||||||
|
const repository =
|
||||||
|
await this.twentyORMGlobalManager.getRepositoryForWorkspace(
|
||||||
|
workspaceId,
|
||||||
|
objectMetadataItem.nameSingular,
|
||||||
|
);
|
||||||
|
|
||||||
assertMutationNotOnRemoteObject(objectMetadataItem);
|
assertMutationNotOnRemoteObject(objectMetadataItem);
|
||||||
assertIsValidUuid(args.id);
|
assertIsValidUuid(args.id);
|
||||||
|
|
||||||
const existingRecord = await this.findOne(
|
const existingRecord = await repository.findOne({
|
||||||
{ filter: { id: { eq: args.id } } } as FindOneResolverArgs,
|
where: { id: args.id },
|
||||||
options,
|
});
|
||||||
);
|
|
||||||
|
if (!existingRecord) {
|
||||||
|
throw new NotFoundError(`Object with id ${args.id} not found`);
|
||||||
|
}
|
||||||
|
|
||||||
const query = await this.workspaceQueryBuilderFactory.updateOne(
|
const query = await this.workspaceQueryBuilderFactory.updateOne(
|
||||||
args,
|
args,
|
||||||
@ -412,7 +422,7 @@ export class WorkspaceQueryRunnerService {
|
|||||||
name: `${objectMetadataItem.nameSingular}.updated`,
|
name: `${objectMetadataItem.nameSingular}.updated`,
|
||||||
workspaceId,
|
workspaceId,
|
||||||
userId,
|
userId,
|
||||||
recordId: (existingRecord as Record).id,
|
recordId: existingRecord.id,
|
||||||
objectMetadata: objectMetadataItem,
|
objectMetadata: objectMetadataItem,
|
||||||
properties: {
|
properties: {
|
||||||
before: this.removeNestedProperties(existingRecord as Record),
|
before: this.removeNestedProperties(existingRecord as Record),
|
||||||
@ -428,10 +438,21 @@ export class WorkspaceQueryRunnerService {
|
|||||||
options: WorkspaceQueryRunnerOptions,
|
options: WorkspaceQueryRunnerOptions,
|
||||||
): Promise<Record[] | undefined> {
|
): Promise<Record[] | undefined> {
|
||||||
const { userId, workspaceId, objectMetadataItem } = options;
|
const { userId, workspaceId, objectMetadataItem } = options;
|
||||||
|
const repository =
|
||||||
|
await this.twentyORMGlobalManager.getRepositoryForWorkspace(
|
||||||
|
workspaceId,
|
||||||
|
objectMetadataItem.nameSingular,
|
||||||
|
);
|
||||||
|
|
||||||
assertMutationNotOnRemoteObject(objectMetadataItem);
|
assertMutationNotOnRemoteObject(objectMetadataItem);
|
||||||
args.filter?.id?.in?.forEach((id) => assertIsValidUuid(id));
|
args.filter?.id?.in?.forEach((id) => assertIsValidUuid(id));
|
||||||
|
|
||||||
|
const existingRecords = await repository.find({
|
||||||
|
where: { id: { in: args.filter?.id?.in } },
|
||||||
|
});
|
||||||
|
const mappedRecords = new Map(
|
||||||
|
existingRecords.map((record) => [record.id, record]),
|
||||||
|
);
|
||||||
const maximumRecordAffected = this.environmentService.get(
|
const maximumRecordAffected = this.environmentService.get(
|
||||||
'MUTATION_MAXIMUM_AFFECTED_RECORDS',
|
'MUTATION_MAXIMUM_AFFECTED_RECORDS',
|
||||||
);
|
);
|
||||||
@ -464,11 +485,29 @@ export class WorkspaceQueryRunnerService {
|
|||||||
options,
|
options,
|
||||||
);
|
);
|
||||||
|
|
||||||
// TODO: check - NO EVENT SENT?
|
parsedResults.forEach((record) => {
|
||||||
// OK I spent 2 hours trying to implement before/after diff and
|
const existingRecord = mappedRecords.get(record.id);
|
||||||
// figured out why it hasn't been implement
|
|
||||||
// Doing a findMany in that context is very hard as long as we don't
|
if (!existingRecord) {
|
||||||
// have a proper ORM. Let's come back to this once we do (target end of April 24?)
|
this.logger.warn(
|
||||||
|
`Record with id ${record.id} not found in the database`,
|
||||||
|
);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.eventEmitter.emit(`${objectMetadataItem.nameSingular}.updated`, {
|
||||||
|
name: `${objectMetadataItem.nameSingular}.updated`,
|
||||||
|
workspaceId,
|
||||||
|
userId,
|
||||||
|
recordId: existingRecord.id,
|
||||||
|
objectMetadata: objectMetadataItem,
|
||||||
|
properties: {
|
||||||
|
before: this.removeNestedProperties(existingRecord as Record),
|
||||||
|
after: this.removeNestedProperties(record),
|
||||||
|
},
|
||||||
|
} satisfies ObjectRecordUpdateEvent<any>);
|
||||||
|
});
|
||||||
|
|
||||||
return parsedResults;
|
return parsedResults;
|
||||||
}
|
}
|
||||||
@ -524,7 +563,7 @@ export class WorkspaceQueryRunnerService {
|
|||||||
recordId: record.id,
|
recordId: record.id,
|
||||||
objectMetadata: objectMetadataItem,
|
objectMetadata: objectMetadataItem,
|
||||||
properties: {
|
properties: {
|
||||||
before: [this.removeNestedProperties(record)],
|
before: this.removeNestedProperties(record),
|
||||||
},
|
},
|
||||||
} satisfies ObjectRecordDeleteEvent<any>);
|
} satisfies ObjectRecordDeleteEvent<any>);
|
||||||
});
|
});
|
||||||
@ -537,6 +576,11 @@ export class WorkspaceQueryRunnerService {
|
|||||||
options: WorkspaceQueryRunnerOptions,
|
options: WorkspaceQueryRunnerOptions,
|
||||||
): Promise<Record | undefined> {
|
): Promise<Record | undefined> {
|
||||||
const { workspaceId, userId, objectMetadataItem } = options;
|
const { workspaceId, userId, objectMetadataItem } = options;
|
||||||
|
const repository =
|
||||||
|
await this.twentyORMGlobalManager.getRepositoryForWorkspace(
|
||||||
|
workspaceId,
|
||||||
|
objectMetadataItem.nameSingular,
|
||||||
|
);
|
||||||
|
|
||||||
assertMutationNotOnRemoteObject(objectMetadataItem);
|
assertMutationNotOnRemoteObject(objectMetadataItem);
|
||||||
assertIsValidUuid(args.id);
|
assertIsValidUuid(args.id);
|
||||||
@ -546,19 +590,9 @@ export class WorkspaceQueryRunnerService {
|
|||||||
options,
|
options,
|
||||||
);
|
);
|
||||||
|
|
||||||
// TODO START: remove this awful patch and use our upcoming custom ORM is developed
|
const existingRecord = await repository.findOne({
|
||||||
const deletedWorkspaceMember = await this.handleDeleteWorkspaceMember(
|
where: { id: args.id },
|
||||||
args.id,
|
});
|
||||||
workspaceId,
|
|
||||||
objectMetadataItem,
|
|
||||||
);
|
|
||||||
|
|
||||||
const deletedBlocklistItem = await this.handleDeleteBlocklistItem(
|
|
||||||
args.id,
|
|
||||||
workspaceId,
|
|
||||||
objectMetadataItem,
|
|
||||||
);
|
|
||||||
// TODO END
|
|
||||||
|
|
||||||
await this.workspaceQueryHookService.executePreQueryHooks(
|
await this.workspaceQueryHookService.executePreQueryHooks(
|
||||||
userId,
|
userId,
|
||||||
@ -592,8 +626,7 @@ export class WorkspaceQueryRunnerService {
|
|||||||
objectMetadata: objectMetadataItem,
|
objectMetadata: objectMetadataItem,
|
||||||
properties: {
|
properties: {
|
||||||
before: {
|
before: {
|
||||||
...(deletedWorkspaceMember ?? {}),
|
...(existingRecord ?? {}),
|
||||||
...(deletedBlocklistItem ?? {}),
|
|
||||||
...this.removeNestedProperties(parsedResults?.[0]),
|
...this.removeNestedProperties(parsedResults?.[0]),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -608,6 +641,7 @@ export class WorkspaceQueryRunnerService {
|
|||||||
if (!record) {
|
if (!record) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const sanitizedRecord = {};
|
const sanitizedRecord = {};
|
||||||
|
|
||||||
for (const [key, value] of Object.entries(record)) {
|
for (const [key, value] of Object.entries(record)) {
|
||||||
@ -615,6 +649,10 @@ export class WorkspaceQueryRunnerService {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (key === '__typename') {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
sanitizedRecord[key] = value;
|
sanitizedRecord[key] = value;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -28,23 +28,15 @@ export class WorkspaceDatasourceFactory {
|
|||||||
workspaceId: string,
|
workspaceId: string,
|
||||||
cacheVersion: string | null,
|
cacheVersion: string | null,
|
||||||
): Promise<WorkspaceDataSource> {
|
): Promise<WorkspaceDataSource> {
|
||||||
let dataSourceCacheVersion: string;
|
cacheVersion ??=
|
||||||
|
await this.workspaceCacheVersionService.getVersion(workspaceId);
|
||||||
|
|
||||||
if (cacheVersion) {
|
if (!cacheVersion) {
|
||||||
dataSourceCacheVersion = cacheVersion;
|
throw new Error('Cache version not found');
|
||||||
} else {
|
|
||||||
const cacheVersionFromDatabase =
|
|
||||||
await this.workspaceCacheVersionService.getVersion(workspaceId);
|
|
||||||
|
|
||||||
if (!cacheVersionFromDatabase) {
|
|
||||||
throw new Error('Cache version not found');
|
|
||||||
}
|
|
||||||
|
|
||||||
dataSourceCacheVersion = cacheVersionFromDatabase;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const workspaceDataSource = await workspaceDataSourceCacheInstance.execute(
|
const workspaceDataSource = await workspaceDataSourceCacheInstance.execute(
|
||||||
`${workspaceId}-${dataSourceCacheVersion}`,
|
`${workspaceId}-${cacheVersion}`,
|
||||||
async () => {
|
async () => {
|
||||||
const dataSourceMetadata =
|
const dataSourceMetadata =
|
||||||
await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceId(
|
await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceId(
|
||||||
@ -58,7 +50,7 @@ export class WorkspaceDatasourceFactory {
|
|||||||
const latestCacheVersion =
|
const latestCacheVersion =
|
||||||
await this.workspaceCacheVersionService.getVersion(workspaceId);
|
await this.workspaceCacheVersionService.getVersion(workspaceId);
|
||||||
|
|
||||||
if (latestCacheVersion !== dataSourceCacheVersion) {
|
if (latestCacheVersion !== cacheVersion) {
|
||||||
throw new Error('Cache version mismatch');
|
throw new Error('Cache version mismatch');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user