Add fields to database event settings (#12331)

Backend part of https://github.com/twentyhq/core-team-issues/issues/928

- Add fields to database event settings
- If not set, match all automated triggers with the right event name
- If set, event needs at least one updated field listened to be treated
This commit is contained in:
Thomas Trompette
2025-05-28 14:58:35 +02:00
committed by GitHub
parent ee00e2319e
commit 1144e210c5
7 changed files with 496 additions and 48 deletions

View File

@ -1,19 +1,20 @@
import { msg } from '@lingui/core/macro';
import { Relation } from 'typeorm';
import { FieldMetadataType } from 'twenty-shared/types';
import { Relation } from 'typeorm';
import { RelationType } from 'src/engine/metadata-modules/field-metadata/interfaces/relation-type.interface';
import { RelationOnDeleteAction } from 'src/engine/metadata-modules/relation-metadata/relation-metadata.entity';
import { BaseWorkspaceEntity } from 'src/engine/twenty-orm/base.workspace-entity';
import { WorkspaceEntity } from 'src/engine/twenty-orm/decorators/workspace-entity.decorator';
import { WorkspaceField } from 'src/engine/twenty-orm/decorators/workspace-field.decorator';
import { WorkspaceIsSystem } from 'src/engine/twenty-orm/decorators/workspace-is-system.decorator';
import { WorkspaceJoinColumn } from 'src/engine/twenty-orm/decorators/workspace-join-column.decorator';
import { WorkspaceRelation } from 'src/engine/twenty-orm/decorators/workspace-relation.decorator';
import { WORKFLOW_AUTOMATED_TRIGGER_STANDARD_FIELD_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids';
import { STANDARD_OBJECT_ICONS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-icons';
import { STANDARD_OBJECT_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids';
import { WORKFLOW_AUTOMATED_TRIGGER_STANDARD_FIELD_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids';
import { WorkspaceRelation } from 'src/engine/twenty-orm/decorators/workspace-relation.decorator';
import { RelationOnDeleteAction } from 'src/engine/metadata-modules/relation-metadata/relation-metadata.entity';
import { WorkspaceJoinColumn } from 'src/engine/twenty-orm/decorators/workspace-join-column.decorator';
import { WorkspaceField } from 'src/engine/twenty-orm/decorators/workspace-field.decorator';
import { AutomatedTriggerSettings } from 'src/modules/workflow/workflow-trigger/automated-trigger/constants/automated-trigger-settings';
import { WorkflowWorkspaceEntity } from './workflow.workspace-entity';
@ -22,11 +23,6 @@ export enum AutomatedTriggerType {
CRON = 'CRON',
}
export type AutomatedTriggerSettings = {
pattern?: string;
eventName?: string;
};
@WorkspaceEntity({
standardId: STANDARD_OBJECT_IDS.workflowAutomatedTrigger,
namePlural: 'workflowAutomatedTriggers',

View File

@ -3,10 +3,10 @@ import { Injectable } from '@nestjs/common';
import { WorkspaceEntityManager } from 'src/engine/twenty-orm/entity-manager/workspace-entity-manager';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import {
AutomatedTriggerSettings,
AutomatedTriggerType,
WorkflowAutomatedTriggerWorkspaceEntity,
} from 'src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity';
import { AutomatedTriggerSettings } from 'src/modules/workflow/workflow-trigger/automated-trigger/constants/automated-trigger-settings';
@Injectable()
export class AutomatedTriggerWorkspaceService {

View File

@ -0,0 +1,19 @@
export type BaseDatabaseEventTriggerSettings = {
eventName: string;
};
export type DatabaseEventTriggerSettings =
| BaseDatabaseEventTriggerSettings
| UpdateEventTriggerSettings;
export type UpdateEventTriggerSettings = BaseDatabaseEventTriggerSettings & {
fields: string[];
};
export type CronTriggerSettings = {
pattern: string;
};
export type AutomatedTriggerSettings =
| DatabaseEventTriggerSettings
| CronTriggerSettings;

View File

@ -1,26 +1,27 @@
import { InjectRepository } from '@nestjs/typeorm';
import { isDefined } from 'twenty-shared/utils';
import { WorkspaceActivationStatus } from 'twenty-shared/workspace';
import { Repository } from 'typeorm';
import { isDefined } from 'twenty-shared/utils';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
import { SentryCronMonitor } from 'src/engine/core-modules/cron/sentry-cron-monitor.decorator';
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import {
AutomatedTriggerType,
WorkflowAutomatedTriggerWorkspaceEntity,
} from 'src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { CronTriggerSettings } from 'src/modules/workflow/workflow-trigger/automated-trigger/constants/automated-trigger-settings';
import { shouldRunNow } from 'src/modules/workflow/workflow-trigger/automated-trigger/crons/utils/should-run-now.utils';
import {
WorkflowTriggerJob,
WorkflowTriggerJobData,
} from 'src/modules/workflow/workflow-trigger/jobs/workflow-trigger.job';
import { shouldRunNow } from 'src/modules/workflow/workflow-trigger/automated-trigger/crons/utils/should-run-now.utils';
export const CRON_TRIGGER_CRON_PATTERN = '* * * * *';
@ -58,11 +59,14 @@ export class CronTriggerCronJob {
});
for (const workflowAutomatedCronTrigger of workflowAutomatedCronTriggers) {
if (!isDefined(workflowAutomatedCronTrigger.settings.pattern)) {
const settings =
workflowAutomatedCronTrigger.settings as CronTriggerSettings;
if (!isDefined(settings.pattern)) {
continue;
}
if (!shouldRunNow(workflowAutomatedCronTrigger.settings.pattern, now)) {
if (!shouldRunNow(settings.pattern, now)) {
continue;
}

View File

@ -0,0 +1,370 @@
import { Test, TestingModule } from '@nestjs/testing';
import { FeatureFlagService } from 'src/engine/core-modules/feature-flag/services/feature-flag.service';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { AutomatedTriggerType } from 'src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity';
import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workspace-services/workflow-common.workspace-service';
import { DatabaseEventTriggerListener } from 'src/modules/workflow/workflow-trigger/automated-trigger/listeners/database-event-trigger.listener';
import { WorkflowTriggerJob } from 'src/modules/workflow/workflow-trigger/jobs/workflow-trigger.job';
describe('DatabaseEventTriggerListener', () => {
let listener: DatabaseEventTriggerListener;
let twentyORMGlobalManager: jest.Mocked<TwentyORMGlobalManager>;
let messageQueueService: jest.Mocked<MessageQueueService>;
let featureFlagService: jest.Mocked<FeatureFlagService>;
const mockRepository = {
find: jest.fn(),
};
beforeEach(async () => {
twentyORMGlobalManager = {
getRepositoryForWorkspace: jest.fn().mockResolvedValue(mockRepository),
} as any;
messageQueueService = {
add: jest.fn(),
} as any;
featureFlagService = {
isFeatureEnabled: jest.fn().mockResolvedValue(true),
} as any;
const module: TestingModule = await Test.createTestingModule({
providers: [
DatabaseEventTriggerListener,
{
provide: TwentyORMGlobalManager,
useValue: twentyORMGlobalManager,
},
{
provide: MessageQueueService,
useValue: messageQueueService,
},
{
provide: FeatureFlagService,
useValue: featureFlagService,
},
{
provide: 'MESSAGE_QUEUE_workflow-queue',
useValue: messageQueueService,
},
{
provide: WorkflowCommonWorkspaceService,
useValue: {
getWorkflowById: jest.fn(),
getObjectMetadataItemWithFieldsMaps: jest.fn().mockResolvedValue({
objectMetadataMaps: {
byId: {
'test-object-metadata': {
nameSingular: 'testObject',
namePlural: 'testObjects',
},
},
},
objectMetadataItemWithFieldsMaps: {
fieldsByJoinColumnName: {},
},
}),
},
},
],
}).compile();
listener = module.get<DatabaseEventTriggerListener>(
DatabaseEventTriggerListener,
);
});
describe('handleObjectRecordUpdateEvent', () => {
const workspaceId = 'test-workspace';
const databaseEventName = 'testEvent';
const workflowId = 'test-workflow';
const mockPayload = {
workspaceId,
name: databaseEventName,
events: [
{
recordId: 'test-record',
objectMetadata: {
id: 'test-object-metadata',
workspaceId,
nameSingular: 'testObject',
namePlural: 'testObjects',
labelSingular: 'Test Object',
labelPlural: 'Test Objects',
description: 'Test object for testing',
targetTableName: 'test_objects',
isSystem: false,
isCustom: false,
isActive: true,
isRemote: false,
isAuditLogged: true,
isSearchable: true,
createdAt: new Date(),
updatedAt: new Date(),
fields: [],
relationships: [],
fromRelations: [],
toRelations: [],
indexMetadatas: [],
},
properties: {
updatedFields: ['field1', 'field2'],
before: { field1: 'old', field2: 'old' },
after: { field1: 'new', field2: 'new' },
},
},
],
};
const mockEventListeners = [
{
type: AutomatedTriggerType.DATABASE_EVENT,
workflowId,
settings: {
eventName: databaseEventName,
fields: ['field1', 'field3'],
},
},
];
it('should trigger workflow when fields are specified and match updated fields', async () => {
mockRepository.find.mockResolvedValue(mockEventListeners);
await listener.handleObjectRecordUpdateEvent(mockPayload);
expect(messageQueueService.add).toHaveBeenCalledWith(
WorkflowTriggerJob.name,
{
workspaceId,
workflowId,
payload: mockPayload.events[0],
},
{ retryLimit: 3 },
);
});
it('should trigger workflow when no fields are specified', async () => {
mockRepository.find.mockResolvedValue([
{
...mockEventListeners[0],
settings: {
eventName: databaseEventName,
fields: undefined,
},
},
]);
await listener.handleObjectRecordUpdateEvent(mockPayload);
expect(messageQueueService.add).toHaveBeenCalled();
});
it('should trigger workflow when fields array is empty', async () => {
mockRepository.find.mockResolvedValue([
{
...mockEventListeners[0],
settings: {
eventName: databaseEventName,
fields: [],
},
},
]);
await listener.handleObjectRecordUpdateEvent(mockPayload);
expect(messageQueueService.add).toHaveBeenCalled();
});
it('should not trigger workflow when fields are specified but none match updated fields', async () => {
mockRepository.find.mockResolvedValue([
{
...mockEventListeners[0],
settings: {
eventName: databaseEventName,
fields: ['field3', 'field4'],
},
},
]);
await listener.handleObjectRecordUpdateEvent(mockPayload);
expect(messageQueueService.add).not.toHaveBeenCalled();
});
it('should handle create events correctly', async () => {
const createPayload = {
...mockPayload,
name: 'createEvent',
events: [
{
...mockPayload.events[0],
properties: {
after: { field1: 'new', field2: 'new' },
},
},
],
};
mockRepository.find.mockResolvedValue([
{
type: AutomatedTriggerType.DATABASE_EVENT,
workflowId,
settings: {
eventName: 'createEvent',
},
},
]);
await listener.handleObjectRecordCreateEvent(createPayload);
expect(messageQueueService.add).toHaveBeenCalledWith(
WorkflowTriggerJob.name,
{
workspaceId,
workflowId,
payload: createPayload.events[0],
},
{ retryLimit: 3 },
);
});
it('should handle delete events correctly', async () => {
const deletePayload = {
...mockPayload,
name: 'deleteEvent',
events: [
{
...mockPayload.events[0],
properties: {
before: { field1: 'old', field2: 'old' },
},
},
],
};
mockRepository.find.mockResolvedValue([
{
type: AutomatedTriggerType.DATABASE_EVENT,
workflowId,
settings: {
eventName: 'deleteEvent',
},
},
]);
await listener.handleObjectRecordDeleteEvent(deletePayload);
expect(messageQueueService.add).toHaveBeenCalledWith(
WorkflowTriggerJob.name,
{
workspaceId,
workflowId,
payload: deletePayload.events[0],
},
{ retryLimit: 3 },
);
});
it('should handle destroy events correctly', async () => {
const destroyPayload = {
...mockPayload,
name: 'destroyEvent',
events: [
{
...mockPayload.events[0],
properties: {
before: { field1: 'old', field2: 'old' },
},
},
],
};
mockRepository.find.mockResolvedValue([
{
type: AutomatedTriggerType.DATABASE_EVENT,
workflowId,
settings: {
eventName: 'destroyEvent',
},
},
]);
await listener.handleObjectRecordDestroyEvent(destroyPayload);
expect(messageQueueService.add).toHaveBeenCalledWith(
WorkflowTriggerJob.name,
{
workspaceId,
workflowId,
payload: destroyPayload.events[0],
},
{ retryLimit: 3 },
);
});
it('should ignore events when feature flag is disabled', async () => {
featureFlagService.isFeatureEnabled.mockResolvedValueOnce(false);
await listener.handleObjectRecordUpdateEvent(mockPayload);
expect(messageQueueService.add).not.toHaveBeenCalled();
});
it('should handle multiple events in a batch', async () => {
const batchPayload = {
...mockPayload,
events: [
mockPayload.events[0],
{
...mockPayload.events[0],
recordId: 'test-record-2',
properties: {
updatedFields: ['field1'],
before: { field1: 'old' },
after: { field1: 'new' },
},
},
],
};
mockRepository.find.mockResolvedValue([
{
type: AutomatedTriggerType.DATABASE_EVENT,
workflowId,
settings: {
eventName: databaseEventName,
fields: ['field1'],
},
},
]);
await listener.handleObjectRecordUpdateEvent(batchPayload);
expect(messageQueueService.add).toHaveBeenCalledTimes(2);
expect(messageQueueService.add).toHaveBeenNthCalledWith(
1,
WorkflowTriggerJob.name,
{
workspaceId,
workflowId,
payload: batchPayload.events[0],
},
{ retryLimit: 3 },
);
expect(messageQueueService.add).toHaveBeenNthCalledWith(
2,
WorkflowTriggerJob.name,
{
workspaceId,
workflowId,
payload: batchPayload.events[1],
},
{ retryLimit: 3 },
);
});
});
});

View File

@ -1,12 +1,14 @@
import { Injectable, Logger } from '@nestjs/common';
import { isDefined } from 'twenty-shared/utils';
import { Raw } from 'typeorm';
import { OnDatabaseBatchEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-batch-event.decorator';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
import { ObjectRecordCreateEvent } from 'src/engine/core-modules/event-emitter/types/object-record-create.event';
import { ObjectRecordDeleteEvent } from 'src/engine/core-modules/event-emitter/types/object-record-delete.event';
import { ObjectRecordDestroyEvent } from 'src/engine/core-modules/event-emitter/types/object-record-destroy.event';
import { ObjectRecordNonDestructiveEvent } from 'src/engine/core-modules/event-emitter/types/object-record-non-destructive-event';
import { ObjectRecordUpdateEvent } from 'src/engine/core-modules/event-emitter/types/object-record-update.event';
import { FeatureFlagKey } from 'src/engine/core-modules/feature-flag/enums/feature-flag-key.enum';
import { FeatureFlagService } from 'src/engine/core-modules/feature-flag/services/feature-flag.service';
@ -15,16 +17,16 @@ import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queu
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';
import {
WorkflowTriggerJob,
WorkflowTriggerJobData,
} from 'src/modules/workflow/workflow-trigger/jobs/workflow-trigger.job';
import { ObjectRecordNonDestructiveEvent } from 'src/engine/core-modules/event-emitter/types/object-record-non-destructive-event';
import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workspace-services/workflow-common.workspace-service';
import {
AutomatedTriggerType,
WorkflowAutomatedTriggerWorkspaceEntity,
} from 'src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity';
import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workspace-services/workflow-common.workspace-service';
import { UpdateEventTriggerSettings } from 'src/modules/workflow/workflow-trigger/automated-trigger/constants/automated-trigger-settings';
import {
WorkflowTriggerJob,
WorkflowTriggerJobData,
} from 'src/modules/workflow/workflow-trigger/jobs/workflow-trigger.job';
@Injectable()
export class DatabaseEventTriggerListener {
@ -49,7 +51,10 @@ export class DatabaseEventTriggerListener {
const clonedPayload = structuredClone(payload);
await this.enrichCreatedEvent(clonedPayload);
await this.handleEvent(clonedPayload);
await this.handleEvent({
payload: clonedPayload,
action: DatabaseEventAction.CREATED,
});
}
@OnDatabaseBatchEvent('*', DatabaseEventAction.UPDATED)
@ -63,7 +68,11 @@ export class DatabaseEventTriggerListener {
const clonedPayload = structuredClone(payload);
await this.enrichUpdatedEvent(clonedPayload);
await this.handleEvent(clonedPayload);
await this.handleEvent({
payload: clonedPayload,
action: DatabaseEventAction.UPDATED,
});
}
@OnDatabaseBatchEvent('*', DatabaseEventAction.DELETED)
@ -77,7 +86,10 @@ export class DatabaseEventTriggerListener {
const clonedPayload = structuredClone(payload);
await this.enrichDeletedEvent(clonedPayload);
await this.handleEvent(clonedPayload);
await this.handleEvent({
payload: clonedPayload,
action: DatabaseEventAction.DELETED,
});
}
@OnDatabaseBatchEvent('*', DatabaseEventAction.DESTROYED)
@ -91,7 +103,10 @@ export class DatabaseEventTriggerListener {
const clonedPayload = structuredClone(payload);
await this.enrichDestroyedEvent(clonedPayload);
await this.handleEvent(clonedPayload);
await this.handleEvent({
payload: clonedPayload,
action: DatabaseEventAction.DESTROYED,
});
}
private async enrichCreatedEvent(
@ -224,37 +239,79 @@ export class DatabaseEventTriggerListener {
return !isWorkflowEnabled;
}
private async handleEvent(
payload: WorkspaceEventBatch<ObjectRecordNonDestructiveEvent>,
) {
private async handleEvent({
payload,
action,
}: {
payload: WorkspaceEventBatch<ObjectRecordNonDestructiveEvent>;
action: DatabaseEventAction;
}) {
const workspaceId = payload.workspaceId;
const databaseEventName = payload.name;
const automatedTriggerTableName = 'workflowAutomatedTrigger';
const workflowAutomatedTriggerRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowAutomatedTriggerWorkspaceEntity>(
workspaceId,
'workflowAutomatedTrigger',
automatedTriggerTableName,
);
const eventListeners = await workflowAutomatedTriggerRepository.find({
where: {
type: AutomatedTriggerType.DATABASE_EVENT,
settings: { eventName: databaseEventName },
settings: Raw(
() =>
`"${automatedTriggerTableName}"."settings"->>'eventName' = :eventName`,
{ eventName: databaseEventName },
),
},
});
for (const eventListener of eventListeners) {
for (const eventPayload of payload.events) {
await this.messageQueueService.add<WorkflowTriggerJobData>(
WorkflowTriggerJob.name,
{
workspaceId,
workflowId: eventListener.workflowId,
payload: eventPayload,
},
{ retryLimit: 3 },
);
const shouldTriggerJob = this.shouldTriggerJob({
eventPayload,
eventListener,
action,
});
if (shouldTriggerJob) {
await this.messageQueueService.add<WorkflowTriggerJobData>(
WorkflowTriggerJob.name,
{
workspaceId,
workflowId: eventListener.workflowId,
payload: eventPayload,
},
{ retryLimit: 3 },
);
}
}
}
}
private shouldTriggerJob({
eventPayload,
eventListener,
action,
}: {
eventPayload: ObjectRecordNonDestructiveEvent;
eventListener: WorkflowAutomatedTriggerWorkspaceEntity;
action: DatabaseEventAction;
}) {
if (action === DatabaseEventAction.UPDATED) {
const settings = eventListener.settings as UpdateEventTriggerSettings;
const updateEventPayload = eventPayload as ObjectRecordUpdateEvent;
return (
!settings.fields ||
settings.fields.length === 0 ||
settings.fields.some((field) =>
updateEventPayload?.properties?.updatedFields?.includes(field),
)
);
}
return true;
}
}

View File

@ -23,6 +23,7 @@ import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-ru
import { WORKFLOW_VERSION_STATUS_UPDATED } from 'src/modules/workflow/workflow-status/constants/workflow-version-status-updated.constants';
import { WorkflowVersionStatusUpdate } from 'src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job';
import { AutomatedTriggerWorkspaceService } from 'src/modules/workflow/workflow-trigger/automated-trigger/automated-trigger.workspace-service';
import { DatabaseEventTriggerSettings } from 'src/modules/workflow/workflow-trigger/automated-trigger/constants/automated-trigger-settings';
import {
WorkflowTriggerException,
WorkflowTriggerExceptionCode,
@ -329,12 +330,13 @@ export class WorkflowTriggerWorkspaceService {
case WorkflowTriggerType.WEBHOOK:
return;
case WorkflowTriggerType.DATABASE_EVENT: {
const eventName = workflowVersion.trigger.settings.eventName;
const settings = workflowVersion.trigger
.settings as DatabaseEventTriggerSettings;
await this.automatedTriggerWorkspaceService.addAutomatedTrigger({
workflowId: workflowVersion.workflowId,
type: AutomatedTriggerType.DATABASE_EVENT,
settings: { eventName },
settings,
manager,
});