diff --git a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity.ts b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity.ts index af8ea106b..77fb9a307 100644 --- a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity.ts +++ b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity.ts @@ -1,19 +1,20 @@ import { msg } from '@lingui/core/macro'; -import { Relation } from 'typeorm'; import { FieldMetadataType } from 'twenty-shared/types'; +import { Relation } from 'typeorm'; import { RelationType } from 'src/engine/metadata-modules/field-metadata/interfaces/relation-type.interface'; +import { RelationOnDeleteAction } from 'src/engine/metadata-modules/relation-metadata/relation-metadata.entity'; import { BaseWorkspaceEntity } from 'src/engine/twenty-orm/base.workspace-entity'; import { WorkspaceEntity } from 'src/engine/twenty-orm/decorators/workspace-entity.decorator'; +import { WorkspaceField } from 'src/engine/twenty-orm/decorators/workspace-field.decorator'; import { WorkspaceIsSystem } from 'src/engine/twenty-orm/decorators/workspace-is-system.decorator'; +import { WorkspaceJoinColumn } from 'src/engine/twenty-orm/decorators/workspace-join-column.decorator'; +import { WorkspaceRelation } from 'src/engine/twenty-orm/decorators/workspace-relation.decorator'; +import { WORKFLOW_AUTOMATED_TRIGGER_STANDARD_FIELD_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids'; import { STANDARD_OBJECT_ICONS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-icons'; import { STANDARD_OBJECT_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids'; -import { WORKFLOW_AUTOMATED_TRIGGER_STANDARD_FIELD_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids'; -import { WorkspaceRelation } from 'src/engine/twenty-orm/decorators/workspace-relation.decorator'; -import { RelationOnDeleteAction } from 'src/engine/metadata-modules/relation-metadata/relation-metadata.entity'; -import { WorkspaceJoinColumn } from 'src/engine/twenty-orm/decorators/workspace-join-column.decorator'; -import { WorkspaceField } from 'src/engine/twenty-orm/decorators/workspace-field.decorator'; +import { AutomatedTriggerSettings } from 'src/modules/workflow/workflow-trigger/automated-trigger/constants/automated-trigger-settings'; import { WorkflowWorkspaceEntity } from './workflow.workspace-entity'; @@ -22,11 +23,6 @@ export enum AutomatedTriggerType { CRON = 'CRON', } -export type AutomatedTriggerSettings = { - pattern?: string; - eventName?: string; -}; - @WorkspaceEntity({ standardId: STANDARD_OBJECT_IDS.workflowAutomatedTrigger, namePlural: 'workflowAutomatedTriggers', diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/automated-trigger.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/automated-trigger.workspace-service.ts index a0b60e5cc..e67a25c07 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/automated-trigger.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/automated-trigger.workspace-service.ts @@ -3,10 +3,10 @@ import { Injectable } from '@nestjs/common'; import { WorkspaceEntityManager } from 'src/engine/twenty-orm/entity-manager/workspace-entity-manager'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { - AutomatedTriggerSettings, AutomatedTriggerType, WorkflowAutomatedTriggerWorkspaceEntity, } from 'src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity'; +import { AutomatedTriggerSettings } from 'src/modules/workflow/workflow-trigger/automated-trigger/constants/automated-trigger-settings'; @Injectable() export class AutomatedTriggerWorkspaceService { diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/constants/automated-trigger-settings.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/constants/automated-trigger-settings.ts new file mode 100644 index 000000000..e60fb71e9 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/constants/automated-trigger-settings.ts @@ -0,0 +1,19 @@ +export type BaseDatabaseEventTriggerSettings = { + eventName: string; +}; + +export type DatabaseEventTriggerSettings = + | BaseDatabaseEventTriggerSettings + | UpdateEventTriggerSettings; + +export type UpdateEventTriggerSettings = BaseDatabaseEventTriggerSettings & { + fields: string[]; +}; + +export type CronTriggerSettings = { + pattern: string; +}; + +export type AutomatedTriggerSettings = + | DatabaseEventTriggerSettings + | CronTriggerSettings; diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/crons/jobs/cron-trigger.cron.job.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/crons/jobs/cron-trigger.cron.job.ts index b69f1bead..8cf1727a2 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/crons/jobs/cron-trigger.cron.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/crons/jobs/cron-trigger.cron.job.ts @@ -1,26 +1,27 @@ import { InjectRepository } from '@nestjs/typeorm'; +import { isDefined } from 'twenty-shared/utils'; import { WorkspaceActivationStatus } from 'twenty-shared/workspace'; import { Repository } from 'typeorm'; -import { isDefined } from 'twenty-shared/utils'; -import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; -import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator'; -import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator'; import { SentryCronMonitor } from 'src/engine/core-modules/cron/sentry-cron-monitor.decorator'; +import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator'; +import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator'; +import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; +import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; import { AutomatedTriggerType, WorkflowAutomatedTriggerWorkspaceEntity, } from 'src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity'; -import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; -import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; -import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator'; +import { CronTriggerSettings } from 'src/modules/workflow/workflow-trigger/automated-trigger/constants/automated-trigger-settings'; +import { shouldRunNow } from 'src/modules/workflow/workflow-trigger/automated-trigger/crons/utils/should-run-now.utils'; import { WorkflowTriggerJob, WorkflowTriggerJobData, } from 'src/modules/workflow/workflow-trigger/jobs/workflow-trigger.job'; -import { shouldRunNow } from 'src/modules/workflow/workflow-trigger/automated-trigger/crons/utils/should-run-now.utils'; export const CRON_TRIGGER_CRON_PATTERN = '* * * * *'; @@ -58,11 +59,14 @@ export class CronTriggerCronJob { }); for (const workflowAutomatedCronTrigger of workflowAutomatedCronTriggers) { - if (!isDefined(workflowAutomatedCronTrigger.settings.pattern)) { + const settings = + workflowAutomatedCronTrigger.settings as CronTriggerSettings; + + if (!isDefined(settings.pattern)) { continue; } - if (!shouldRunNow(workflowAutomatedCronTrigger.settings.pattern, now)) { + if (!shouldRunNow(settings.pattern, now)) { continue; } diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/listeners/__tests__/database-event-trigger.listener.spec.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/listeners/__tests__/database-event-trigger.listener.spec.ts new file mode 100644 index 000000000..1d8886cdc --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/listeners/__tests__/database-event-trigger.listener.spec.ts @@ -0,0 +1,370 @@ +import { Test, TestingModule } from '@nestjs/testing'; + +import { FeatureFlagService } from 'src/engine/core-modules/feature-flag/services/feature-flag.service'; +import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; +import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; +import { AutomatedTriggerType } from 'src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity'; +import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workspace-services/workflow-common.workspace-service'; +import { DatabaseEventTriggerListener } from 'src/modules/workflow/workflow-trigger/automated-trigger/listeners/database-event-trigger.listener'; +import { WorkflowTriggerJob } from 'src/modules/workflow/workflow-trigger/jobs/workflow-trigger.job'; + +describe('DatabaseEventTriggerListener', () => { + let listener: DatabaseEventTriggerListener; + let twentyORMGlobalManager: jest.Mocked; + let messageQueueService: jest.Mocked; + let featureFlagService: jest.Mocked; + + const mockRepository = { + find: jest.fn(), + }; + + beforeEach(async () => { + twentyORMGlobalManager = { + getRepositoryForWorkspace: jest.fn().mockResolvedValue(mockRepository), + } as any; + + messageQueueService = { + add: jest.fn(), + } as any; + + featureFlagService = { + isFeatureEnabled: jest.fn().mockResolvedValue(true), + } as any; + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + DatabaseEventTriggerListener, + { + provide: TwentyORMGlobalManager, + useValue: twentyORMGlobalManager, + }, + { + provide: MessageQueueService, + useValue: messageQueueService, + }, + { + provide: FeatureFlagService, + useValue: featureFlagService, + }, + { + provide: 'MESSAGE_QUEUE_workflow-queue', + useValue: messageQueueService, + }, + { + provide: WorkflowCommonWorkspaceService, + useValue: { + getWorkflowById: jest.fn(), + getObjectMetadataItemWithFieldsMaps: jest.fn().mockResolvedValue({ + objectMetadataMaps: { + byId: { + 'test-object-metadata': { + nameSingular: 'testObject', + namePlural: 'testObjects', + }, + }, + }, + objectMetadataItemWithFieldsMaps: { + fieldsByJoinColumnName: {}, + }, + }), + }, + }, + ], + }).compile(); + + listener = module.get( + DatabaseEventTriggerListener, + ); + }); + + describe('handleObjectRecordUpdateEvent', () => { + const workspaceId = 'test-workspace'; + const databaseEventName = 'testEvent'; + const workflowId = 'test-workflow'; + + const mockPayload = { + workspaceId, + name: databaseEventName, + events: [ + { + recordId: 'test-record', + objectMetadata: { + id: 'test-object-metadata', + workspaceId, + nameSingular: 'testObject', + namePlural: 'testObjects', + labelSingular: 'Test Object', + labelPlural: 'Test Objects', + description: 'Test object for testing', + targetTableName: 'test_objects', + isSystem: false, + isCustom: false, + isActive: true, + isRemote: false, + isAuditLogged: true, + isSearchable: true, + createdAt: new Date(), + updatedAt: new Date(), + fields: [], + relationships: [], + fromRelations: [], + toRelations: [], + indexMetadatas: [], + }, + properties: { + updatedFields: ['field1', 'field2'], + before: { field1: 'old', field2: 'old' }, + after: { field1: 'new', field2: 'new' }, + }, + }, + ], + }; + + const mockEventListeners = [ + { + type: AutomatedTriggerType.DATABASE_EVENT, + workflowId, + settings: { + eventName: databaseEventName, + fields: ['field1', 'field3'], + }, + }, + ]; + + it('should trigger workflow when fields are specified and match updated fields', async () => { + mockRepository.find.mockResolvedValue(mockEventListeners); + + await listener.handleObjectRecordUpdateEvent(mockPayload); + + expect(messageQueueService.add).toHaveBeenCalledWith( + WorkflowTriggerJob.name, + { + workspaceId, + workflowId, + payload: mockPayload.events[0], + }, + { retryLimit: 3 }, + ); + }); + + it('should trigger workflow when no fields are specified', async () => { + mockRepository.find.mockResolvedValue([ + { + ...mockEventListeners[0], + settings: { + eventName: databaseEventName, + fields: undefined, + }, + }, + ]); + + await listener.handleObjectRecordUpdateEvent(mockPayload); + + expect(messageQueueService.add).toHaveBeenCalled(); + }); + + it('should trigger workflow when fields array is empty', async () => { + mockRepository.find.mockResolvedValue([ + { + ...mockEventListeners[0], + settings: { + eventName: databaseEventName, + fields: [], + }, + }, + ]); + + await listener.handleObjectRecordUpdateEvent(mockPayload); + + expect(messageQueueService.add).toHaveBeenCalled(); + }); + + it('should not trigger workflow when fields are specified but none match updated fields', async () => { + mockRepository.find.mockResolvedValue([ + { + ...mockEventListeners[0], + settings: { + eventName: databaseEventName, + fields: ['field3', 'field4'], + }, + }, + ]); + + await listener.handleObjectRecordUpdateEvent(mockPayload); + + expect(messageQueueService.add).not.toHaveBeenCalled(); + }); + + it('should handle create events correctly', async () => { + const createPayload = { + ...mockPayload, + name: 'createEvent', + events: [ + { + ...mockPayload.events[0], + properties: { + after: { field1: 'new', field2: 'new' }, + }, + }, + ], + }; + + mockRepository.find.mockResolvedValue([ + { + type: AutomatedTriggerType.DATABASE_EVENT, + workflowId, + settings: { + eventName: 'createEvent', + }, + }, + ]); + + await listener.handleObjectRecordCreateEvent(createPayload); + + expect(messageQueueService.add).toHaveBeenCalledWith( + WorkflowTriggerJob.name, + { + workspaceId, + workflowId, + payload: createPayload.events[0], + }, + { retryLimit: 3 }, + ); + }); + + it('should handle delete events correctly', async () => { + const deletePayload = { + ...mockPayload, + name: 'deleteEvent', + events: [ + { + ...mockPayload.events[0], + properties: { + before: { field1: 'old', field2: 'old' }, + }, + }, + ], + }; + + mockRepository.find.mockResolvedValue([ + { + type: AutomatedTriggerType.DATABASE_EVENT, + workflowId, + settings: { + eventName: 'deleteEvent', + }, + }, + ]); + + await listener.handleObjectRecordDeleteEvent(deletePayload); + + expect(messageQueueService.add).toHaveBeenCalledWith( + WorkflowTriggerJob.name, + { + workspaceId, + workflowId, + payload: deletePayload.events[0], + }, + { retryLimit: 3 }, + ); + }); + + it('should handle destroy events correctly', async () => { + const destroyPayload = { + ...mockPayload, + name: 'destroyEvent', + events: [ + { + ...mockPayload.events[0], + properties: { + before: { field1: 'old', field2: 'old' }, + }, + }, + ], + }; + + mockRepository.find.mockResolvedValue([ + { + type: AutomatedTriggerType.DATABASE_EVENT, + workflowId, + settings: { + eventName: 'destroyEvent', + }, + }, + ]); + + await listener.handleObjectRecordDestroyEvent(destroyPayload); + + expect(messageQueueService.add).toHaveBeenCalledWith( + WorkflowTriggerJob.name, + { + workspaceId, + workflowId, + payload: destroyPayload.events[0], + }, + { retryLimit: 3 }, + ); + }); + + it('should ignore events when feature flag is disabled', async () => { + featureFlagService.isFeatureEnabled.mockResolvedValueOnce(false); + + await listener.handleObjectRecordUpdateEvent(mockPayload); + + expect(messageQueueService.add).not.toHaveBeenCalled(); + }); + + it('should handle multiple events in a batch', async () => { + const batchPayload = { + ...mockPayload, + events: [ + mockPayload.events[0], + { + ...mockPayload.events[0], + recordId: 'test-record-2', + properties: { + updatedFields: ['field1'], + before: { field1: 'old' }, + after: { field1: 'new' }, + }, + }, + ], + }; + + mockRepository.find.mockResolvedValue([ + { + type: AutomatedTriggerType.DATABASE_EVENT, + workflowId, + settings: { + eventName: databaseEventName, + fields: ['field1'], + }, + }, + ]); + + await listener.handleObjectRecordUpdateEvent(batchPayload); + + expect(messageQueueService.add).toHaveBeenCalledTimes(2); + expect(messageQueueService.add).toHaveBeenNthCalledWith( + 1, + WorkflowTriggerJob.name, + { + workspaceId, + workflowId, + payload: batchPayload.events[0], + }, + { retryLimit: 3 }, + ); + expect(messageQueueService.add).toHaveBeenNthCalledWith( + 2, + WorkflowTriggerJob.name, + { + workspaceId, + workflowId, + payload: batchPayload.events[1], + }, + { retryLimit: 3 }, + ); + }); + }); +}); diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/listeners/database-event-trigger.listener.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/listeners/database-event-trigger.listener.ts index e3ae63f16..755f164e8 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/listeners/database-event-trigger.listener.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/listeners/database-event-trigger.listener.ts @@ -1,12 +1,14 @@ import { Injectable, Logger } from '@nestjs/common'; import { isDefined } from 'twenty-shared/utils'; +import { Raw } from 'typeorm'; import { OnDatabaseBatchEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-batch-event.decorator'; import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action'; import { ObjectRecordCreateEvent } from 'src/engine/core-modules/event-emitter/types/object-record-create.event'; import { ObjectRecordDeleteEvent } from 'src/engine/core-modules/event-emitter/types/object-record-delete.event'; import { ObjectRecordDestroyEvent } from 'src/engine/core-modules/event-emitter/types/object-record-destroy.event'; +import { ObjectRecordNonDestructiveEvent } from 'src/engine/core-modules/event-emitter/types/object-record-non-destructive-event'; import { ObjectRecordUpdateEvent } from 'src/engine/core-modules/event-emitter/types/object-record-update.event'; import { FeatureFlagKey } from 'src/engine/core-modules/feature-flag/enums/feature-flag-key.enum'; import { FeatureFlagService } from 'src/engine/core-modules/feature-flag/services/feature-flag.service'; @@ -15,16 +17,16 @@ import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queu import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type'; -import { - WorkflowTriggerJob, - WorkflowTriggerJobData, -} from 'src/modules/workflow/workflow-trigger/jobs/workflow-trigger.job'; -import { ObjectRecordNonDestructiveEvent } from 'src/engine/core-modules/event-emitter/types/object-record-non-destructive-event'; -import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workspace-services/workflow-common.workspace-service'; import { AutomatedTriggerType, WorkflowAutomatedTriggerWorkspaceEntity, } from 'src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity'; +import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workspace-services/workflow-common.workspace-service'; +import { UpdateEventTriggerSettings } from 'src/modules/workflow/workflow-trigger/automated-trigger/constants/automated-trigger-settings'; +import { + WorkflowTriggerJob, + WorkflowTriggerJobData, +} from 'src/modules/workflow/workflow-trigger/jobs/workflow-trigger.job'; @Injectable() export class DatabaseEventTriggerListener { @@ -49,7 +51,10 @@ export class DatabaseEventTriggerListener { const clonedPayload = structuredClone(payload); await this.enrichCreatedEvent(clonedPayload); - await this.handleEvent(clonedPayload); + await this.handleEvent({ + payload: clonedPayload, + action: DatabaseEventAction.CREATED, + }); } @OnDatabaseBatchEvent('*', DatabaseEventAction.UPDATED) @@ -63,7 +68,11 @@ export class DatabaseEventTriggerListener { const clonedPayload = structuredClone(payload); await this.enrichUpdatedEvent(clonedPayload); - await this.handleEvent(clonedPayload); + + await this.handleEvent({ + payload: clonedPayload, + action: DatabaseEventAction.UPDATED, + }); } @OnDatabaseBatchEvent('*', DatabaseEventAction.DELETED) @@ -77,7 +86,10 @@ export class DatabaseEventTriggerListener { const clonedPayload = structuredClone(payload); await this.enrichDeletedEvent(clonedPayload); - await this.handleEvent(clonedPayload); + await this.handleEvent({ + payload: clonedPayload, + action: DatabaseEventAction.DELETED, + }); } @OnDatabaseBatchEvent('*', DatabaseEventAction.DESTROYED) @@ -91,7 +103,10 @@ export class DatabaseEventTriggerListener { const clonedPayload = structuredClone(payload); await this.enrichDestroyedEvent(clonedPayload); - await this.handleEvent(clonedPayload); + await this.handleEvent({ + payload: clonedPayload, + action: DatabaseEventAction.DESTROYED, + }); } private async enrichCreatedEvent( @@ -224,37 +239,79 @@ export class DatabaseEventTriggerListener { return !isWorkflowEnabled; } - private async handleEvent( - payload: WorkspaceEventBatch, - ) { + private async handleEvent({ + payload, + action, + }: { + payload: WorkspaceEventBatch; + action: DatabaseEventAction; + }) { const workspaceId = payload.workspaceId; const databaseEventName = payload.name; + const automatedTriggerTableName = 'workflowAutomatedTrigger'; const workflowAutomatedTriggerRepository = await this.twentyORMGlobalManager.getRepositoryForWorkspace( workspaceId, - 'workflowAutomatedTrigger', + automatedTriggerTableName, ); const eventListeners = await workflowAutomatedTriggerRepository.find({ where: { type: AutomatedTriggerType.DATABASE_EVENT, - settings: { eventName: databaseEventName }, + settings: Raw( + () => + `"${automatedTriggerTableName}"."settings"->>'eventName' = :eventName`, + { eventName: databaseEventName }, + ), }, }); for (const eventListener of eventListeners) { for (const eventPayload of payload.events) { - await this.messageQueueService.add( - WorkflowTriggerJob.name, - { - workspaceId, - workflowId: eventListener.workflowId, - payload: eventPayload, - }, - { retryLimit: 3 }, - ); + const shouldTriggerJob = this.shouldTriggerJob({ + eventPayload, + eventListener, + action, + }); + + if (shouldTriggerJob) { + await this.messageQueueService.add( + WorkflowTriggerJob.name, + { + workspaceId, + workflowId: eventListener.workflowId, + payload: eventPayload, + }, + { retryLimit: 3 }, + ); + } } } } + + private shouldTriggerJob({ + eventPayload, + eventListener, + action, + }: { + eventPayload: ObjectRecordNonDestructiveEvent; + eventListener: WorkflowAutomatedTriggerWorkspaceEntity; + action: DatabaseEventAction; + }) { + if (action === DatabaseEventAction.UPDATED) { + const settings = eventListener.settings as UpdateEventTriggerSettings; + const updateEventPayload = eventPayload as ObjectRecordUpdateEvent; + + return ( + !settings.fields || + settings.fields.length === 0 || + settings.fields.some((field) => + updateEventPayload?.properties?.updatedFields?.includes(field), + ) + ); + } + + return true; + } } diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/workspace-services/workflow-trigger.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/workspace-services/workflow-trigger.workspace-service.ts index 3607a5b9c..4264bdfe4 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-trigger/workspace-services/workflow-trigger.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/workspace-services/workflow-trigger.workspace-service.ts @@ -23,6 +23,7 @@ import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-ru import { WORKFLOW_VERSION_STATUS_UPDATED } from 'src/modules/workflow/workflow-status/constants/workflow-version-status-updated.constants'; import { WorkflowVersionStatusUpdate } from 'src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job'; import { AutomatedTriggerWorkspaceService } from 'src/modules/workflow/workflow-trigger/automated-trigger/automated-trigger.workspace-service'; +import { DatabaseEventTriggerSettings } from 'src/modules/workflow/workflow-trigger/automated-trigger/constants/automated-trigger-settings'; import { WorkflowTriggerException, WorkflowTriggerExceptionCode, @@ -329,12 +330,13 @@ export class WorkflowTriggerWorkspaceService { case WorkflowTriggerType.WEBHOOK: return; case WorkflowTriggerType.DATABASE_EVENT: { - const eventName = workflowVersion.trigger.settings.eventName; + const settings = workflowVersion.trigger + .settings as DatabaseEventTriggerSettings; await this.automatedTriggerWorkspaceService.addAutomatedTrigger({ workflowId: workflowVersion.workflowId, type: AutomatedTriggerType.DATABASE_EVENT, - settings: { eventName }, + settings, manager, });