Add relations in database event trigger output data (#11820)
## Done - add relations in dropdown variables - add relations in worklfow run inputs - use objectMetadataMaps in workflow folder ## To do - does not work with rest api calls, will be fixed after https://github.com/twentyhq/twenty/pull/11349 is merged - waiting for crud action relation fields https://github.com/twentyhq/core-team-issues/issues/509
This commit is contained in:
@ -7,9 +7,14 @@ import { DatabaseEventTriggerListener } from 'src/modules/workflow/workflow-trig
|
||||
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
|
||||
import { CronTriggerCronCommand } from 'src/modules/workflow/workflow-trigger/automated-trigger/crons/commands/cron-trigger.cron.command';
|
||||
import { CronTriggerCronJob } from 'src/modules/workflow/workflow-trigger/automated-trigger/crons/jobs/cron-trigger.cron.job';
|
||||
import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module';
|
||||
|
||||
@Module({
|
||||
imports: [FeatureFlagModule, TypeOrmModule.forFeature([Workspace], 'core')],
|
||||
imports: [
|
||||
FeatureFlagModule,
|
||||
TypeOrmModule.forFeature([Workspace], 'core'),
|
||||
WorkflowCommonModule,
|
||||
],
|
||||
providers: [
|
||||
AutomatedTriggerWorkspaceService,
|
||||
DatabaseEventTriggerListener,
|
||||
|
||||
@ -1,5 +1,7 @@
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
|
||||
import { isDefined } from 'twenty-shared/utils';
|
||||
|
||||
import { OnDatabaseBatchEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-batch-event.decorator';
|
||||
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
|
||||
import { ObjectRecordCreateEvent } from 'src/engine/core-modules/event-emitter/types/object-record-create.event';
|
||||
@ -17,6 +19,8 @@ import {
|
||||
WorkflowTriggerJob,
|
||||
WorkflowTriggerJobData,
|
||||
} from 'src/modules/workflow/workflow-trigger/jobs/workflow-trigger.job';
|
||||
import { ObjectRecordNonDestructiveEvent } from 'src/engine/core-modules/event-emitter/types/object-record-non-destructive-event';
|
||||
import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workspace-services/workflow-common.workspace-service';
|
||||
import {
|
||||
AutomatedTriggerType,
|
||||
WorkflowAutomatedTriggerWorkspaceEntity,
|
||||
@ -31,43 +35,172 @@ export class DatabaseEventTriggerListener {
|
||||
@InjectMessageQueue(MessageQueue.workflowQueue)
|
||||
private readonly messageQueueService: MessageQueueService,
|
||||
private readonly isFeatureFlagEnabledService: FeatureFlagService,
|
||||
private readonly workflowCommonWorkspaceService: WorkflowCommonWorkspaceService,
|
||||
) {}
|
||||
|
||||
@OnDatabaseBatchEvent('*', DatabaseEventAction.CREATED)
|
||||
async handleObjectRecordCreateEvent(
|
||||
payload: WorkspaceEventBatch<ObjectRecordCreateEvent>,
|
||||
) {
|
||||
await this.handleEvent(payload);
|
||||
if (await this.shouldIgnoreEvent(payload)) {
|
||||
return;
|
||||
}
|
||||
|
||||
const clonedPayload = structuredClone(payload);
|
||||
|
||||
await this.enrichCreatedEvent(clonedPayload);
|
||||
await this.handleEvent(clonedPayload);
|
||||
}
|
||||
|
||||
@OnDatabaseBatchEvent('*', DatabaseEventAction.UPDATED)
|
||||
async handleObjectRecordUpdateEvent(
|
||||
payload: WorkspaceEventBatch<ObjectRecordUpdateEvent>,
|
||||
) {
|
||||
await this.handleEvent(payload);
|
||||
if (await this.shouldIgnoreEvent(payload)) {
|
||||
return;
|
||||
}
|
||||
|
||||
const clonedPayload = structuredClone(payload);
|
||||
|
||||
await this.enrichUpdatedEvent(clonedPayload);
|
||||
await this.handleEvent(clonedPayload);
|
||||
}
|
||||
|
||||
@OnDatabaseBatchEvent('*', DatabaseEventAction.DELETED)
|
||||
async handleObjectRecordDeleteEvent(
|
||||
payload: WorkspaceEventBatch<ObjectRecordDeleteEvent>,
|
||||
) {
|
||||
await this.handleEvent(payload);
|
||||
if (await this.shouldIgnoreEvent(payload)) {
|
||||
return;
|
||||
}
|
||||
|
||||
const clonedPayload = structuredClone(payload);
|
||||
|
||||
await this.enrichDeletedEvent(clonedPayload);
|
||||
await this.handleEvent(clonedPayload);
|
||||
}
|
||||
|
||||
@OnDatabaseBatchEvent('*', DatabaseEventAction.DESTROYED)
|
||||
async handleObjectRecordDestroyEvent(
|
||||
payload: WorkspaceEventBatch<ObjectRecordDestroyEvent>,
|
||||
) {
|
||||
await this.handleEvent(payload);
|
||||
if (await this.shouldIgnoreEvent(payload)) {
|
||||
return;
|
||||
}
|
||||
|
||||
const clonedPayload = structuredClone(payload);
|
||||
|
||||
await this.enrichDestroyedEvent(clonedPayload);
|
||||
await this.handleEvent(clonedPayload);
|
||||
}
|
||||
|
||||
private async handleEvent(
|
||||
payload: WorkspaceEventBatch<
|
||||
| ObjectRecordCreateEvent
|
||||
| ObjectRecordUpdateEvent
|
||||
| ObjectRecordDeleteEvent
|
||||
| ObjectRecordDestroyEvent
|
||||
>,
|
||||
private async enrichCreatedEvent(
|
||||
payload: WorkspaceEventBatch<ObjectRecordCreateEvent>,
|
||||
) {
|
||||
const workspaceId = payload.workspaceId;
|
||||
|
||||
for (const event of payload.events) {
|
||||
await this.enrichRecord({
|
||||
event,
|
||||
record: event.properties.after,
|
||||
workspaceId,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private async enrichUpdatedEvent(
|
||||
payload: WorkspaceEventBatch<ObjectRecordUpdateEvent>,
|
||||
) {
|
||||
const workspaceId = payload.workspaceId;
|
||||
|
||||
for (const event of payload.events) {
|
||||
await this.enrichRecord({
|
||||
event,
|
||||
record: event.properties.before,
|
||||
workspaceId,
|
||||
});
|
||||
await this.enrichRecord({
|
||||
event,
|
||||
record: event.properties.after,
|
||||
workspaceId,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private async enrichDeletedEvent(
|
||||
payload: WorkspaceEventBatch<ObjectRecordDeleteEvent>,
|
||||
) {
|
||||
const workspaceId = payload.workspaceId;
|
||||
|
||||
for (const event of payload.events) {
|
||||
await this.enrichRecord({
|
||||
event,
|
||||
record: event.properties.before,
|
||||
workspaceId,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private async enrichDestroyedEvent(
|
||||
payload: WorkspaceEventBatch<ObjectRecordDestroyEvent>,
|
||||
) {
|
||||
const workspaceId = payload.workspaceId;
|
||||
|
||||
for (const event of payload.events) {
|
||||
await this.enrichRecord({
|
||||
event,
|
||||
record: event.properties.before,
|
||||
workspaceId,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private async enrichRecord({
|
||||
event,
|
||||
record,
|
||||
workspaceId,
|
||||
}: {
|
||||
event: ObjectRecordNonDestructiveEvent;
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
record: Record<string, any>;
|
||||
workspaceId: string;
|
||||
}) {
|
||||
const { objectMetadataMaps, objectMetadataItemWithFieldsMaps } =
|
||||
await this.workflowCommonWorkspaceService.getObjectMetadataItemWithFieldsMaps(
|
||||
event.objectMetadata.nameSingular,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
const fieldsByJoinColumnName =
|
||||
objectMetadataItemWithFieldsMaps.fieldsByJoinColumnName;
|
||||
|
||||
for (const [joinColumn, joinField] of Object.entries(
|
||||
fieldsByJoinColumnName,
|
||||
)) {
|
||||
const joinRecordId = record[joinColumn];
|
||||
const relatedObjectMetadataId = joinField.relationTargetObjectMetadataId;
|
||||
|
||||
if (!isDefined(relatedObjectMetadataId)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const relatedObjectMetadataNameSingular =
|
||||
objectMetadataMaps.byId[relatedObjectMetadataId].nameSingular;
|
||||
|
||||
const relatedObjectRepository =
|
||||
await this.twentyORMGlobalManager.getRepositoryForWorkspace(
|
||||
workspaceId,
|
||||
relatedObjectMetadataNameSingular,
|
||||
);
|
||||
|
||||
record[joinField.name] = await relatedObjectRepository.findOne({
|
||||
where: { id: joinRecordId },
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private async shouldIgnoreEvent(
|
||||
payload: WorkspaceEventBatch<ObjectRecordNonDestructiveEvent>,
|
||||
) {
|
||||
const workspaceId = payload.workspaceId;
|
||||
const databaseEventName = payload.name;
|
||||
@ -79,7 +212,7 @@ export class DatabaseEventTriggerListener {
|
||||
)}`,
|
||||
);
|
||||
|
||||
return;
|
||||
return true;
|
||||
}
|
||||
|
||||
const isWorkflowEnabled =
|
||||
@ -88,9 +221,14 @@ export class DatabaseEventTriggerListener {
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
if (!isWorkflowEnabled) {
|
||||
return;
|
||||
}
|
||||
return !isWorkflowEnabled;
|
||||
}
|
||||
|
||||
private async handleEvent(
|
||||
payload: WorkspaceEventBatch<ObjectRecordNonDestructiveEvent>,
|
||||
) {
|
||||
const workspaceId = payload.workspaceId;
|
||||
const databaseEventName = payload.name;
|
||||
|
||||
const workflowAutomatedTriggerRepository =
|
||||
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowAutomatedTriggerWorkspaceEntity>(
|
||||
@ -107,7 +245,7 @@ export class DatabaseEventTriggerListener {
|
||||
|
||||
for (const eventListener of eventListeners) {
|
||||
for (const eventPayload of payload.events) {
|
||||
this.messageQueueService.add<WorkflowTriggerJobData>(
|
||||
await this.messageQueueService.add<WorkflowTriggerJobData>(
|
||||
WorkflowTriggerJob.name,
|
||||
{
|
||||
workspaceId,
|
||||
|
||||
Reference in New Issue
Block a user