From 849a35955a8dc332ac94659f2df27ca1f3ba82fd Mon Sep 17 00:00:00 2001 From: martmull Date: Wed, 30 Apr 2025 17:08:47 +0200 Subject: [PATCH] 866 refactor cron trigger only one cron each minutes triggers all cron triggers (#11809) image --- package.json | 1 + ...listeners-to-automated-triggers.command.ts | 72 ++++++++++++++ .../0-53-upgrade-version-command.module.ts | 16 ++++ .../upgrade-version-command.module.ts | 2 + .../upgrade.command.ts | 15 ++- .../constants/standard-field-ids.ts | 7 ++ .../constants/standard-object-icons.ts | 1 + .../constants/standard-object-ids.ts | 1 + .../standard-objects/index.ts | 2 + ...flow-automated-trigger.workspace-entity.ts | 84 +++++++++++++++++ .../workflow.workspace-entity.ts | 12 +++ .../workflow-common.workspace-service.ts | 10 ++ .../automated-trigger.module.ts | 21 +++++ .../automated-trigger.workspace-service.ts | 94 +++++++++++++++++++ .../commands/cron-trigger.cron.command.ts | 34 +++++++ .../crons/jobs/cron-trigger.cron.job.ts | 81 ++++++++++++++++ .../crons/utils/should-run-now.utils.ts | 20 ++++ .../utils/tests/should-run-now.utils.spec.ts | 47 ++++++++++ .../database-event-trigger.listener.ts | 40 ++++++-- .../database-event-trigger.module.ts | 12 --- .../database-event-trigger.service.ts | 50 ---------- .../workflow-trigger.module.ts | 4 +- .../workflow-trigger.workspace-service.ts | 66 +++++-------- .../content/developers/self-hosting/setup.mdx | 2 + yarn.lock | 17 ++++ 25 files changed, 595 insertions(+), 116 deletions(-) create mode 100644 packages/twenty-server/src/database/commands/upgrade-version-command/0-53/0-53-migrate-workflow-event-listeners-to-automated-triggers.command.ts create mode 100644 packages/twenty-server/src/database/commands/upgrade-version-command/0-53/0-53-upgrade-version-command.module.ts create mode 100644 packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/automated-trigger.module.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/automated-trigger.workspace-service.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/crons/commands/cron-trigger.cron.command.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/crons/jobs/cron-trigger.cron.job.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/crons/utils/should-run-now.utils.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/crons/utils/tests/should-run-now.utils.spec.ts rename packages/twenty-server/src/modules/workflow/workflow-trigger/{database-event-trigger => automated-trigger}/listeners/database-event-trigger.listener.ts (78%) delete mode 100644 packages/twenty-server/src/modules/workflow/workflow-trigger/database-event-trigger/database-event-trigger.module.ts delete mode 100644 packages/twenty-server/src/modules/workflow/workflow-trigger/database-event-trigger/database-event-trigger.service.ts diff --git a/package.json b/package.json index 465008015..fb4070851 100644 --- a/package.json +++ b/package.json @@ -77,6 +77,7 @@ "bytes": "^3.1.2", "class-transformer": "^0.5.1", "clsx": "^2.1.1", + "cron-parser": "^5.1.1", "cron-validate": "^1.4.5", "cross-env": "^7.0.3", "css-loader": "^7.1.2", diff --git a/packages/twenty-server/src/database/commands/upgrade-version-command/0-53/0-53-migrate-workflow-event-listeners-to-automated-triggers.command.ts b/packages/twenty-server/src/database/commands/upgrade-version-command/0-53/0-53-migrate-workflow-event-listeners-to-automated-triggers.command.ts new file mode 100644 index 000000000..8c8c2cf4d --- /dev/null +++ b/packages/twenty-server/src/database/commands/upgrade-version-command/0-53/0-53-migrate-workflow-event-listeners-to-automated-triggers.command.ts @@ -0,0 +1,72 @@ +import { InjectRepository } from '@nestjs/typeorm'; + +import { Command } from 'nest-commander'; +import { Repository } from 'typeorm'; + +import { + ActiveOrSuspendedWorkspacesMigrationCommandRunner, + RunOnWorkspaceArgs, +} from 'src/database/commands/command-runners/active-or-suspended-workspaces-migration.command-runner'; +import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; +import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; +import { + AutomatedTriggerType, + WorkflowAutomatedTriggerWorkspaceEntity, +} from 'src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity'; +import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-event-listener.workspace-entity'; + +@Command({ + name: 'upgrade:0-53:migrate-workflow-event-listeners-to-automated-triggers', + description: 'Migrate workflow event listeners to automated triggers', +}) +export class MigrateWorkflowEventListenersToAutomatedTriggersCommand extends ActiveOrSuspendedWorkspacesMigrationCommandRunner { + constructor( + @InjectRepository(Workspace, 'core') + protected readonly workspaceRepository: Repository, + protected readonly twentyORMGlobalManager: TwentyORMGlobalManager, + ) { + super(workspaceRepository, twentyORMGlobalManager); + } + + override async runOnWorkspace({ + index, + total, + workspaceId, + }: RunOnWorkspaceArgs): Promise { + this.logger.log( + `Running command for workspace ${workspaceId} ${index + 1}/${total}`, + ); + + const workflowEventListenerRepository = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + workspaceId, + 'workflowEventListener', + ); + + const workflowAutomatedTriggerRepository = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + workspaceId, + 'workflowAutomatedTrigger', + ); + + const workflowEventListeners = await workflowEventListenerRepository.find(); + + await workflowAutomatedTriggerRepository.delete({ + type: AutomatedTriggerType.DATABASE_EVENT, + }); + + for (const eventListener of workflowEventListeners) { + const { eventName, ...rest } = eventListener; + + await workflowAutomatedTriggerRepository.save({ + ...rest, + type: AutomatedTriggerType.DATABASE_EVENT, + settings: { eventName }, + }); + } + + this.logger.log( + `Migrated ${workflowEventListeners.length} workflow event listeners to automated triggers`, + ); + } +} diff --git a/packages/twenty-server/src/database/commands/upgrade-version-command/0-53/0-53-upgrade-version-command.module.ts b/packages/twenty-server/src/database/commands/upgrade-version-command/0-53/0-53-upgrade-version-command.module.ts new file mode 100644 index 000000000..4fc7fb438 --- /dev/null +++ b/packages/twenty-server/src/database/commands/upgrade-version-command/0-53/0-53-upgrade-version-command.module.ts @@ -0,0 +1,16 @@ +import { Module } from '@nestjs/common'; +import { TypeOrmModule } from '@nestjs/typeorm'; + +import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; +import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module'; +import { MigrateWorkflowEventListenersToAutomatedTriggersCommand } from 'src/database/commands/upgrade-version-command/0-53/0-53-migrate-workflow-event-listeners-to-automated-triggers.command'; + +@Module({ + imports: [ + TypeOrmModule.forFeature([Workspace], 'core'), + WorkspaceDataSourceModule, + ], + providers: [MigrateWorkflowEventListenersToAutomatedTriggersCommand], + exports: [MigrateWorkflowEventListenersToAutomatedTriggersCommand], +}) +export class V0_53_UpgradeVersionCommandModule {} diff --git a/packages/twenty-server/src/database/commands/upgrade-version-command/upgrade-version-command.module.ts b/packages/twenty-server/src/database/commands/upgrade-version-command/upgrade-version-command.module.ts index c23a37e21..21ae5aae8 100644 --- a/packages/twenty-server/src/database/commands/upgrade-version-command/upgrade-version-command.module.ts +++ b/packages/twenty-server/src/database/commands/upgrade-version-command/upgrade-version-command.module.ts @@ -6,6 +6,7 @@ import { V0_44_UpgradeVersionCommandModule } from 'src/database/commands/upgrade import { V0_50_UpgradeVersionCommandModule } from 'src/database/commands/upgrade-version-command/0-50/0-50-upgrade-version-command.module'; import { V0_51_UpgradeVersionCommandModule } from 'src/database/commands/upgrade-version-command/0-51/0-51-upgrade-version-command.module'; import { V0_52_UpgradeVersionCommandModule } from 'src/database/commands/upgrade-version-command/0-52/0-52-upgrade-version-command.module'; +import { V0_53_UpgradeVersionCommandModule } from 'src/database/commands/upgrade-version-command/0-53/0-53-upgrade-version-command.module'; import { UpgradeCommand } from 'src/database/commands/upgrade-version-command/upgrade.command'; import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; import { WorkspaceSyncMetadataModule } from 'src/engine/workspace-manager/workspace-sync-metadata/workspace-sync-metadata.module'; @@ -18,6 +19,7 @@ import { WorkspaceSyncMetadataModule } from 'src/engine/workspace-manager/worksp V0_50_UpgradeVersionCommandModule, V0_51_UpgradeVersionCommandModule, V0_52_UpgradeVersionCommandModule, + V0_53_UpgradeVersionCommandModule, WorkspaceSyncMetadataModule, ], providers: [UpgradeCommand], diff --git a/packages/twenty-server/src/database/commands/upgrade-version-command/upgrade.command.ts b/packages/twenty-server/src/database/commands/upgrade-version-command/upgrade.command.ts index 7e2da8ac6..59deff412 100644 --- a/packages/twenty-server/src/database/commands/upgrade-version-command/upgrade.command.ts +++ b/packages/twenty-server/src/database/commands/upgrade-version-command/upgrade.command.ts @@ -23,6 +23,7 @@ import { TwentyConfigService } from 'src/engine/core-modules/twenty-config/twent import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; import { SyncWorkspaceMetadataCommand } from 'src/engine/workspace-manager/workspace-sync-metadata/commands/sync-workspace-metadata.command'; +import { MigrateWorkflowEventListenersToAutomatedTriggersCommand } from 'src/database/commands/upgrade-version-command/0-53/0-53-migrate-workflow-event-listeners-to-automated-triggers.command'; type VersionCommands = { beforeSyncMetadata: ActiveOrSuspendedWorkspacesMigrationCommandRunner[]; @@ -60,6 +61,9 @@ export class UpgradeCommand extends UpgradeCommandRunner { // 0.52 Commands protected readonly upgradeDateAndDateTimeFieldsSettingsJsonCommand: UpgradeDateAndDateTimeFieldsSettingsJsonCommand, protected readonly migrateRelationsToFieldMetadataCommand: MigrateRelationsToFieldMetadataCommand, + + // 0.53 Commands + protected readonly migrateWorkflowEventListenersToAutomatedTriggersCommand: MigrateWorkflowEventListenersToAutomatedTriggersCommand, ) { super( workspaceRepository, @@ -93,7 +97,7 @@ export class UpgradeCommand extends UpgradeCommandRunner { afterSyncMetadata: [], }; - const commands_051: VersionCommands = { + const _commands_051: VersionCommands = { beforeSyncMetadata: [this.upgradeCreatedByEnumCommand], afterSyncMetadata: [], }; @@ -106,7 +110,14 @@ export class UpgradeCommand extends UpgradeCommandRunner { afterSyncMetadata: [], }; - this.commands = commands_051; + const commands_053: VersionCommands = { + beforeSyncMetadata: [], + afterSyncMetadata: [ + this.migrateWorkflowEventListenersToAutomatedTriggersCommand, + ], + }; + + this.commands = commands_053; } override async runBeforeSyncMetadata(args: RunOnWorkspaceArgs) { diff --git a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts index 809ffa523..ef8427729 100644 --- a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts +++ b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts @@ -458,6 +458,12 @@ export const WORKFLOW_EVENT_LISTENER_STANDARD_FIELD_IDS = { workflow: '20202020-4082-4641-8569-dc08d5365002', }; +export const WORKFLOW_AUTOMATED_TRIGGER_STANDARD_FIELD_IDS = { + type: '20202020-3319-4234-a34c-3f92c1ab56e7', + settings: '20202020-3319-4234-a34c-bac8f903de12', + workflow: '20202020-3319-4234-a34c-8e1a4d2f7c03', +}; + export const WORKFLOW_STANDARD_FIELD_IDS = { name: '20202020-b3d3-478f-acc0-5d901e725b20', lastPublishedVersionId: '20202020-326a-4fba-8639-3456c0a169e8', @@ -466,6 +472,7 @@ export const WORKFLOW_STANDARD_FIELD_IDS = { versions: '20202020-9432-416e-8f3c-27ee3153d099', runs: '20202020-759b-4340-b58b-e73595c4df4f', eventListeners: '20202020-0229-4c66-832e-035c67579a38', + automatedTriggers: '20202020-3319-4234-a34c-117ecad2b8a9', favorites: '20202020-c554-4c41-be7a-cf9cd4b0d512', timelineActivities: '20202020-906e-486a-a798-131a5f081faf', createdBy: '20202020-6007-401a-8aa5-e6f48581a6f3', diff --git a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-icons.ts b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-icons.ts index f02f178c5..3df2319a2 100644 --- a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-icons.ts +++ b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-icons.ts @@ -38,5 +38,6 @@ export const STANDARD_OBJECT_ICONS = { workflowEventListener: 'IconSettingsAutomation', workflowRun: 'IconHistoryToggle', workflowVersion: 'IconVersions', + workflowAutomatedTrigger: 'IconSettingsAutomation', workspaceMember: 'IconUserCircle', }; diff --git a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids.ts b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids.ts index c5ad0f69d..056ce5d1d 100644 --- a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids.ts +++ b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids.ts @@ -49,4 +49,5 @@ export const STANDARD_OBJECT_IDS = { workflowRun: '20202020-4e28-4e95-a9d7-6c00874f843c', workflowVersion: '20202020-d65d-4ab9-9344-d77bfb376a3d', workspaceMember: '20202020-3319-4234-a34c-82d5c0e881a6', + workflowAutomatedTrigger: '20202020-3319-4234-a34c-7f3b9d2e4d1f', }; diff --git a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/standard-objects/index.ts b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/standard-objects/index.ts index e447739e6..b162cfffd 100644 --- a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/standard-objects/index.ts +++ b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/standard-objects/index.ts @@ -36,6 +36,7 @@ import { WorkflowRunWorkspaceEntity } from 'src/modules/workflow/common/standard import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity'; +import { WorkflowAutomatedTriggerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity'; // TODO: Maybe we should automate this with the DiscoverService of Nest.JS export const standardObjectMetadataDefinitions = [ @@ -64,6 +65,7 @@ export const standardObjectMetadataDefinitions = [ WorkflowEventListenerWorkspaceEntity, WorkflowVersionWorkspaceEntity, WorkflowRunWorkspaceEntity, + WorkflowAutomatedTriggerWorkspaceEntity, WorkspaceMemberWorkspaceEntity, MessageThreadWorkspaceEntity, MessageWorkspaceEntity, diff --git a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity.ts b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity.ts new file mode 100644 index 000000000..af8ea106b --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity.ts @@ -0,0 +1,84 @@ +import { msg } from '@lingui/core/macro'; +import { Relation } from 'typeorm'; +import { FieldMetadataType } from 'twenty-shared/types'; + +import { RelationType } from 'src/engine/metadata-modules/field-metadata/interfaces/relation-type.interface'; + +import { BaseWorkspaceEntity } from 'src/engine/twenty-orm/base.workspace-entity'; +import { WorkspaceEntity } from 'src/engine/twenty-orm/decorators/workspace-entity.decorator'; +import { WorkspaceIsSystem } from 'src/engine/twenty-orm/decorators/workspace-is-system.decorator'; +import { STANDARD_OBJECT_ICONS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-icons'; +import { STANDARD_OBJECT_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids'; +import { WORKFLOW_AUTOMATED_TRIGGER_STANDARD_FIELD_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids'; +import { WorkspaceRelation } from 'src/engine/twenty-orm/decorators/workspace-relation.decorator'; +import { RelationOnDeleteAction } from 'src/engine/metadata-modules/relation-metadata/relation-metadata.entity'; +import { WorkspaceJoinColumn } from 'src/engine/twenty-orm/decorators/workspace-join-column.decorator'; +import { WorkspaceField } from 'src/engine/twenty-orm/decorators/workspace-field.decorator'; + +import { WorkflowWorkspaceEntity } from './workflow.workspace-entity'; + +export enum AutomatedTriggerType { + DATABASE_EVENT = 'DATABASE_EVENT', + CRON = 'CRON', +} + +export type AutomatedTriggerSettings = { + pattern?: string; + eventName?: string; +}; + +@WorkspaceEntity({ + standardId: STANDARD_OBJECT_IDS.workflowAutomatedTrigger, + namePlural: 'workflowAutomatedTriggers', + labelSingular: msg`WorkflowAutomatedTrigger`, + labelPlural: msg`WorkflowAutomatedTriggers`, + description: msg`A workflow automated trigger`, + icon: STANDARD_OBJECT_ICONS.workflowAutomatedTrigger, +}) +@WorkspaceIsSystem() +export class WorkflowAutomatedTriggerWorkspaceEntity extends BaseWorkspaceEntity { + @WorkspaceField({ + standardId: WORKFLOW_AUTOMATED_TRIGGER_STANDARD_FIELD_IDS.type, + type: FieldMetadataType.SELECT, + label: msg`Automated Trigger Type`, + description: msg`The workflow automated trigger type`, + options: [ + { + value: AutomatedTriggerType.DATABASE_EVENT, + label: 'Database Event', + position: 0, + color: 'green', + }, + { + value: AutomatedTriggerType.CRON, + label: 'Cron', + position: 1, + color: 'blue', + }, + ], + }) + type: AutomatedTriggerType; + + @WorkspaceField({ + standardId: WORKFLOW_AUTOMATED_TRIGGER_STANDARD_FIELD_IDS.settings, + type: FieldMetadataType.RAW_JSON, + label: msg`Settings`, + description: msg`The workflow automated trigger settings`, + }) + settings: AutomatedTriggerSettings; + + @WorkspaceRelation({ + standardId: WORKFLOW_AUTOMATED_TRIGGER_STANDARD_FIELD_IDS.workflow, + type: RelationType.MANY_TO_ONE, + label: msg`Workflow`, + description: msg`WorkflowAutomatedTrigger workflow`, + icon: 'IconSettingsAutomation', + inverseSideTarget: () => WorkflowWorkspaceEntity, + inverseSideFieldKey: 'automatedTriggers', + onDelete: RelationOnDeleteAction.CASCADE, + }) + workflow: Relation; + + @WorkspaceJoinColumn('workflow') + workflowId: string; +} diff --git a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow.workspace-entity.ts b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow.workspace-entity.ts index 3ceccbff4..87e09ce37 100644 --- a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow.workspace-entity.ts +++ b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow.workspace-entity.ts @@ -21,6 +21,7 @@ import { TimelineActivityWorkspaceEntity } from 'src/modules/timeline/standard-o import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-event-listener.workspace-entity'; import { WorkflowRunWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; +import { WorkflowAutomatedTriggerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity'; export enum WorkflowStatus { DRAFT = 'DRAFT', @@ -135,6 +136,17 @@ export class WorkflowWorkspaceEntity extends BaseWorkspaceEntity { @WorkspaceIsSystem() eventListeners: Relation; + @WorkspaceRelation({ + standardId: WORKFLOW_STANDARD_FIELD_IDS.automatedTriggers, + type: RelationType.ONE_TO_MANY, + label: msg`Automated Triggers`, + description: msg`Workflow automated triggers linked to the workflow.`, + inverseSideTarget: () => WorkflowAutomatedTriggerWorkspaceEntity, + onDelete: RelationOnDeleteAction.CASCADE, + }) + @WorkspaceIsSystem() + automatedTriggers: Relation; + @WorkspaceRelation({ standardId: WORKFLOW_STANDARD_FIELD_IDS.favorites, type: RelationType.ONE_TO_MANY, diff --git a/packages/twenty-server/src/modules/workflow/common/workspace-services/workflow-common.workspace-service.ts b/packages/twenty-server/src/modules/workflow/common/workspace-services/workflow-common.workspace-service.ts index 665e085a9..00c2c8f64 100644 --- a/packages/twenty-server/src/modules/workflow/common/workspace-services/workflow-common.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/common/workspace-services/workflow-common.workspace-service.ts @@ -19,6 +19,7 @@ import { WorkflowTriggerException, WorkflowTriggerExceptionCode, } from 'src/modules/workflow/workflow-trigger/exceptions/workflow-trigger.exception'; +import { WorkflowAutomatedTriggerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity'; @Injectable() export class WorkflowCommonWorkspaceService { @@ -141,11 +142,20 @@ export class WorkflowCommonWorkspaceService { 'workflowEventListener', ); + const workflowAutomatedTriggerRepository = + await this.twentyORMManager.getRepository( + 'workflowAutomatedTrigger', + ); + workflowIds.forEach((workflowId) => { workflowEventListenerRepository.softDelete({ workflowId, }); + workflowAutomatedTriggerRepository.softDelete({ + workflowId, + }); + workflowRunRepository.softDelete({ workflowId, }); diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/automated-trigger.module.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/automated-trigger.module.ts new file mode 100644 index 000000000..ac4ffc5f4 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/automated-trigger.module.ts @@ -0,0 +1,21 @@ +import { Module } from '@nestjs/common'; +import { TypeOrmModule } from '@nestjs/typeorm'; + +import { AutomatedTriggerWorkspaceService } from 'src/modules/workflow/workflow-trigger/automated-trigger/automated-trigger.workspace-service'; +import { FeatureFlagModule } from 'src/engine/core-modules/feature-flag/feature-flag.module'; +import { DatabaseEventTriggerListener } from 'src/modules/workflow/workflow-trigger/automated-trigger/listeners/database-event-trigger.listener'; +import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; +import { CronTriggerCronCommand } from 'src/modules/workflow/workflow-trigger/automated-trigger/crons/commands/cron-trigger.cron.command'; +import { CronTriggerCronJob } from 'src/modules/workflow/workflow-trigger/automated-trigger/crons/jobs/cron-trigger.cron.job'; + +@Module({ + imports: [FeatureFlagModule, TypeOrmModule.forFeature([Workspace], 'core')], + providers: [ + AutomatedTriggerWorkspaceService, + DatabaseEventTriggerListener, + CronTriggerCronJob, + CronTriggerCronCommand, + ], + exports: [AutomatedTriggerWorkspaceService], +}) +export class AutomatedTriggerModule {} diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/automated-trigger.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/automated-trigger.workspace-service.ts new file mode 100644 index 000000000..a4f9f48b9 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/automated-trigger.workspace-service.ts @@ -0,0 +1,94 @@ +import { Injectable } from '@nestjs/common'; + +import { EntityManager } from 'typeorm'; + +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-event-listener.workspace-entity'; +import { + AutomatedTriggerType, + AutomatedTriggerSettings, + WorkflowAutomatedTriggerWorkspaceEntity, +} from 'src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity'; + +@Injectable() +export class AutomatedTriggerWorkspaceService { + constructor(private readonly twentyORMManager: TwentyORMManager) {} + + async addAutomatedTrigger({ + workflowId, + manager, + type, + settings, + }: { + workflowId: string; + manager: EntityManager; + type: AutomatedTriggerType; + settings: AutomatedTriggerSettings; + }) { + if (type === AutomatedTriggerType.DATABASE_EVENT) { + // Todo: remove workflowEventListenerRepository updates when data are migrated to workflowAutomatedTrigger + const workflowEventListenerRepository = + await this.twentyORMManager.getRepository( + 'workflowEventListener', + ); + + const workflowEventListener = workflowEventListenerRepository.create({ + workflowId, + eventName: settings.eventName, + }); + + await workflowEventListenerRepository.save( + workflowEventListener, + {}, + manager, + ); + // end-Todo + } + + const workflowAutomatedTriggerRepository = + await this.twentyORMManager.getRepository( + 'workflowAutomatedTrigger', + ); + + const workflowAutomatedTrigger = workflowAutomatedTriggerRepository.create({ + type, + settings, + workflowId, + }); + + await workflowAutomatedTriggerRepository.save( + workflowAutomatedTrigger, + {}, + manager, + ); + } + + async deleteAutomatedTrigger({ + workflowId, + manager, + }: { + workflowId: string; + manager: EntityManager; + }) { + // Todo: remove workflowEventListenerRepository updates when data are migrated to workflowAutomatedTrigger + const workflowEventListenerRepository = + await this.twentyORMManager.getRepository( + 'workflowEventListener', + ); + + await workflowEventListenerRepository.delete( + { + workflowId, + }, + manager, + ); + // end-Todo + + const workflowAutomatedTriggerRepository = + await this.twentyORMManager.getRepository( + 'workflowAutomatedTrigger', + ); + + await workflowAutomatedTriggerRepository.delete({ workflowId }, manager); + } +} diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/crons/commands/cron-trigger.cron.command.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/crons/commands/cron-trigger.cron.command.ts new file mode 100644 index 000000000..aa00ffd53 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/crons/commands/cron-trigger.cron.command.ts @@ -0,0 +1,34 @@ +import { Command, CommandRunner } from 'nest-commander'; + +import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator'; +import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; +import { + CRON_TRIGGER_CRON_PATTERN, + CronTriggerCronJob, +} from 'src/modules/workflow/workflow-trigger/automated-trigger/crons/jobs/cron-trigger.cron.job'; + +@Command({ + name: 'cron:workflow:automated-cron-trigger', + description: 'Starts a cron job to trigger cron triggered workflows', +}) +export class CronTriggerCronCommand extends CommandRunner { + constructor( + @InjectMessageQueue(MessageQueue.cronQueue) + private readonly messageQueueService: MessageQueueService, + ) { + super(); + } + + async run(): Promise { + await this.messageQueueService.addCron({ + jobName: CronTriggerCronJob.name, + data: undefined, + options: { + repeat: { + pattern: CRON_TRIGGER_CRON_PATTERN, + }, + }, + }); + } +} diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/crons/jobs/cron-trigger.cron.job.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/crons/jobs/cron-trigger.cron.job.ts new file mode 100644 index 000000000..b69f1bead --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/crons/jobs/cron-trigger.cron.job.ts @@ -0,0 +1,81 @@ +import { InjectRepository } from '@nestjs/typeorm'; + +import { WorkspaceActivationStatus } from 'twenty-shared/workspace'; +import { Repository } from 'typeorm'; +import { isDefined } from 'twenty-shared/utils'; + +import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; +import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator'; +import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator'; +import { SentryCronMonitor } from 'src/engine/core-modules/cron/sentry-cron-monitor.decorator'; +import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; +import { + AutomatedTriggerType, + WorkflowAutomatedTriggerWorkspaceEntity, +} from 'src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity'; +import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; +import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; +import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator'; +import { + WorkflowTriggerJob, + WorkflowTriggerJobData, +} from 'src/modules/workflow/workflow-trigger/jobs/workflow-trigger.job'; +import { shouldRunNow } from 'src/modules/workflow/workflow-trigger/automated-trigger/crons/utils/should-run-now.utils'; + +export const CRON_TRIGGER_CRON_PATTERN = '* * * * *'; + +@Processor(MessageQueue.cronQueue) +export class CronTriggerCronJob { + constructor( + @InjectRepository(Workspace, 'core') + private readonly workspaceRepository: Repository, + @InjectMessageQueue(MessageQueue.workflowQueue) + private readonly messageQueueService: MessageQueueService, + private readonly twentyORMGlobalManager: TwentyORMGlobalManager, + ) {} + + @Process(CronTriggerCronJob.name) + @SentryCronMonitor(CronTriggerCronJob.name, CRON_TRIGGER_CRON_PATTERN) + async handle() { + const activeWorkspaces = await this.workspaceRepository.find({ + where: { + activationStatus: WorkspaceActivationStatus.ACTIVE, + }, + }); + + const now = new Date(); + + for (const activeWorkspace of activeWorkspaces) { + const workflowAutomatedTriggerRepository = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + activeWorkspace.id, + 'workflowAutomatedTrigger', + ); + + const workflowAutomatedCronTriggers = + await workflowAutomatedTriggerRepository.find({ + where: { type: AutomatedTriggerType.CRON }, + }); + + for (const workflowAutomatedCronTrigger of workflowAutomatedCronTriggers) { + if (!isDefined(workflowAutomatedCronTrigger.settings.pattern)) { + continue; + } + + if (!shouldRunNow(workflowAutomatedCronTrigger.settings.pattern, now)) { + continue; + } + + await this.messageQueueService.add( + WorkflowTriggerJob.name, + { + workspaceId: activeWorkspace.id, + workflowId: workflowAutomatedCronTrigger.workflowId, + payload: {}, + }, + { retryLimit: 3 }, + ); + } + } + } +} diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/crons/utils/should-run-now.utils.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/crons/utils/should-run-now.utils.ts new file mode 100644 index 000000000..960567820 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/crons/utils/should-run-now.utils.ts @@ -0,0 +1,20 @@ +import { CronExpressionParser } from 'cron-parser'; + +export const shouldRunNow = ( + pattern: string, + now: Date, + rootCronIntervalMs = 60_000, +) => { + try { + const interval = CronExpressionParser.parse(pattern, { + currentDate: now, + }); + + const prevTriggerDate = interval.prev(); + const diff = Math.abs(prevTriggerDate.getTime() - now.getTime()); + + return diff < rootCronIntervalMs; + } catch { + return false; + } +}; diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/crons/utils/tests/should-run-now.utils.spec.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/crons/utils/tests/should-run-now.utils.spec.ts new file mode 100644 index 000000000..a22296d07 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/crons/utils/tests/should-run-now.utils.spec.ts @@ -0,0 +1,47 @@ +import { shouldRunNow } from 'src/modules/workflow/workflow-trigger/automated-trigger/crons/utils/should-run-now.utils'; + +const getNowDate = (hour: string) => { + return new Date(`2025-01-01T${hour}.100Z`); +}; + +describe('shouldRunNow', () => { + it('returns true when now matches cron pattern */1 * * * *', () => { + const cron = '*/1 * * * *'; + + expect(shouldRunNow(cron, getNowDate('10:00:00'))).toBe(true); + }); + + it('returns true with a 50s root cron delay', () => { + const cron = '*/1 * * * *'; + + expect(shouldRunNow(cron, getNowDate('10:00:50'))).toBe(true); + }); + + it('returns true 5 times in a row for a */5 pattern', () => { + const cron = '*/5 * * * *'; // every 5 minutes + + expect(shouldRunNow(cron, getNowDate('09:59:00'))).toBe(false); + expect(shouldRunNow(cron, getNowDate('10:00:00'))).toBe(true); + expect(shouldRunNow(cron, getNowDate('10:01:00'))).toBe(false); + expect(shouldRunNow(cron, getNowDate('10:02:00'))).toBe(false); + expect(shouldRunNow(cron, getNowDate('10:03:00'))).toBe(false); + expect(shouldRunNow(cron, getNowDate('10:04:00'))).toBe(false); + expect(shouldRunNow(cron, getNowDate('10:05:00'))).toBe(true); + expect(shouldRunNow(cron, getNowDate('10:06:00'))).toBe(false); + }); + + it('returns false for invalid cron pattern', () => { + const cron = 'invalid-cron'; + + expect(shouldRunNow(cron, getNowDate('10:00:00'))).toBe(false); + }); + + it('returns false if the next run is outside the interval window (2 minutes)', () => { + const cron = '*/10 * * * *'; // every 10 minutes + const interval2min = 2 * 60_000; + + expect(shouldRunNow(cron, getNowDate('10:06:00'), interval2min)).toBe( + false, + ); + }); +}); diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/database-event-trigger/listeners/database-event-trigger.listener.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/listeners/database-event-trigger.listener.ts similarity index 78% rename from packages/twenty-server/src/modules/workflow/workflow-trigger/database-event-trigger/listeners/database-event-trigger.listener.ts rename to packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/listeners/database-event-trigger.listener.ts index 36f78a194..7d6406dbf 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-trigger/database-event-trigger/listeners/database-event-trigger.listener.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/listeners/database-event-trigger.listener.ts @@ -13,11 +13,11 @@ import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queu import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type'; -import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-event-listener.workspace-entity'; import { WorkflowTriggerJob, WorkflowTriggerJobData, } from 'src/modules/workflow/workflow-trigger/jobs/workflow-trigger.job'; +import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-event-listener.workspace-entity'; @Injectable() export class DatabaseEventTriggerListener { @@ -67,9 +67,9 @@ export class DatabaseEventTriggerListener { >, ) { const workspaceId = payload.workspaceId; - const eventName = payload.name; + const databaseEventName = payload.name; - if (!workspaceId || !eventName) { + if (!workspaceId || !databaseEventName) { this.logger.error( `Missing workspaceId or eventName in payload ${JSON.stringify( payload, @@ -89,19 +89,43 @@ export class DatabaseEventTriggerListener { return; } + // Todo: uncomment that when data are migrated to workflowAutomatedTrigger + /* + const workflowAutomatedTriggerRepository = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + workspaceId, + 'workflowAutomatedTrigger', + ); + + const eventListeners = await workflowAutomatedTriggerRepository.find({ + where: { + type: AutomatedTriggerType.DATABASE_EVENT, + settings: { eventName: databaseEventName }, + }, + }); + */ + // end Todo + + // Todo: remove that when data are migrated to workflowAutomatedTrigger const workflowEventListenerRepository = await this.twentyORMGlobalManager.getRepositoryForWorkspace( workspaceId, 'workflowEventListener', ); - const eventListeners = await workflowEventListenerRepository.find({ - where: { - eventName, - }, + const oldEventListeners = await workflowEventListenerRepository.find({ + where: { eventName: databaseEventName }, }); - for (const eventListener of eventListeners) { + // end Todo + + // Todo: uncomment that when data are migrated to workflowAutomatedTrigger + //for (const eventListener of eventListeners) { + // end Todo + + // Todo: remove that when data are migrated to workflowAutomatedTrigger + for (const eventListener of oldEventListeners) { + // end Todo for (const eventPayload of payload.events) { this.messageQueueService.add( WorkflowTriggerJob.name, diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/database-event-trigger/database-event-trigger.module.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/database-event-trigger/database-event-trigger.module.ts deleted file mode 100644 index 8d69223b5..000000000 --- a/packages/twenty-server/src/modules/workflow/workflow-trigger/database-event-trigger/database-event-trigger.module.ts +++ /dev/null @@ -1,12 +0,0 @@ -import { Module } from '@nestjs/common'; - -import { FeatureFlagModule } from 'src/engine/core-modules/feature-flag/feature-flag.module'; -import { DatabaseEventTriggerService } from 'src/modules/workflow/workflow-trigger/database-event-trigger/database-event-trigger.service'; -import { DatabaseEventTriggerListener } from 'src/modules/workflow/workflow-trigger/database-event-trigger/listeners/database-event-trigger.listener'; - -@Module({ - imports: [FeatureFlagModule], - providers: [DatabaseEventTriggerService, DatabaseEventTriggerListener], - exports: [DatabaseEventTriggerService], -}) -export class DatabaseEventTriggerModule {} diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/database-event-trigger/database-event-trigger.service.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/database-event-trigger/database-event-trigger.service.ts deleted file mode 100644 index da8affbb6..000000000 --- a/packages/twenty-server/src/modules/workflow/workflow-trigger/database-event-trigger/database-event-trigger.service.ts +++ /dev/null @@ -1,50 +0,0 @@ -import { Injectable } from '@nestjs/common'; - -import { EntityManager } from 'typeorm'; - -import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; -import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-event-listener.workspace-entity'; -import { WorkflowDatabaseEventTrigger } from 'src/modules/workflow/workflow-trigger/types/workflow-trigger.type'; - -@Injectable() -export class DatabaseEventTriggerService { - constructor(private readonly twentyORMManager: TwentyORMManager) {} - - async createEventListener( - workflowId: string, - trigger: WorkflowDatabaseEventTrigger, - manager: EntityManager, - ) { - const eventName = trigger.settings.eventName; - - const workflowEventListenerRepository = - await this.twentyORMManager.getRepository( - 'workflowEventListener', - ); - - const workflowEventListener = await workflowEventListenerRepository.create({ - workflowId, - eventName, - }); - - await workflowEventListenerRepository.save( - workflowEventListener, - {}, - manager, - ); - } - - async deleteEventListener(workflowId: string, manager: EntityManager) { - const workflowEventListenerRepository = - await this.twentyORMManager.getRepository( - 'workflowEventListener', - ); - - await workflowEventListenerRepository.delete( - { - workflowId, - }, - manager, - ); - } -} diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.module.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.module.ts index 103f677de..a76c92dc7 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/workflow-trigger.module.ts @@ -5,16 +5,16 @@ import { NestjsQueryTypeOrmModule } from '@ptc-org/nestjs-query-typeorm'; import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module'; import { WorkflowRunnerModule } from 'src/modules/workflow/workflow-runner/workflow-runner.module'; -import { DatabaseEventTriggerModule } from 'src/modules/workflow/workflow-trigger/database-event-trigger/database-event-trigger.module'; import { WorkflowTriggerJob } from 'src/modules/workflow/workflow-trigger/jobs/workflow-trigger.job'; import { WorkflowTriggerWorkspaceService } from 'src/modules/workflow/workflow-trigger/workspace-services/workflow-trigger.workspace-service'; import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity'; +import { AutomatedTriggerModule } from 'src/modules/workflow/workflow-trigger/automated-trigger/automated-trigger.module'; @Module({ imports: [ WorkflowCommonModule, WorkflowRunnerModule, - DatabaseEventTriggerModule, + AutomatedTriggerModule, NestjsQueryTypeOrmModule.forFeature([ObjectMetadataEntity], 'metadata'), ], providers: [ diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/workspace-services/workflow-trigger.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/workspace-services/workflow-trigger.workspace-service.ts index 30b6768f5..a3b25bc93 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-trigger/workspace-services/workflow-trigger.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/workspace-services/workflow-trigger.workspace-service.ts @@ -4,9 +4,6 @@ import { InjectRepository } from '@nestjs/typeorm'; import { EntityManager, Repository } from 'typeorm'; import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action'; -import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator'; -import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; -import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; import { ActorMetadata } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type'; import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity'; import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; @@ -23,19 +20,16 @@ import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/work import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service'; import { WORKFLOW_VERSION_STATUS_UPDATED } from 'src/modules/workflow/workflow-status/constants/workflow-version-status-updated.constants'; import { WorkflowVersionStatusUpdate } from 'src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job'; -import { DatabaseEventTriggerService } from 'src/modules/workflow/workflow-trigger/database-event-trigger/database-event-trigger.service'; import { WorkflowTriggerException, WorkflowTriggerExceptionCode, } from 'src/modules/workflow/workflow-trigger/exceptions/workflow-trigger.exception'; -import { - WorkflowTriggerJob, - WorkflowTriggerJobData, -} from 'src/modules/workflow/workflow-trigger/jobs/workflow-trigger.job'; import { WorkflowTriggerType } from 'src/modules/workflow/workflow-trigger/types/workflow-trigger.type'; import { assertVersionCanBeActivated } from 'src/modules/workflow/workflow-trigger/utils/assert-version-can-be-activated.util'; import { computeCronPatternFromSchedule } from 'src/modules/workflow/workflow-trigger/utils/compute-cron-pattern-from-schedule'; import { assertNever } from 'src/utils/assert'; +import { AutomatedTriggerWorkspaceService } from 'src/modules/workflow/workflow-trigger/automated-trigger/automated-trigger.workspace-service'; +import { AutomatedTriggerType } from 'src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity'; @Injectable() export class WorkflowTriggerWorkspaceService { @@ -44,12 +38,10 @@ export class WorkflowTriggerWorkspaceService { private readonly workflowCommonWorkspaceService: WorkflowCommonWorkspaceService, private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory, private readonly workflowRunnerWorkspaceService: WorkflowRunnerWorkspaceService, - private readonly databaseEventTriggerService: DatabaseEventTriggerService, + private readonly automatedTriggerWorkspaceService: AutomatedTriggerWorkspaceService, private readonly workspaceEventEmitter: WorkspaceEventEmitter, @InjectRepository(ObjectMetadataEntity, 'metadata') private readonly objectMetadataRepository: Repository, - @InjectMessageQueue(MessageQueue.workflowQueue) - private readonly messageQueueService: MessageQueueService, ) {} private getWorkspaceId() { @@ -332,33 +324,29 @@ export class WorkflowTriggerWorkspaceService { assertWorkflowVersionTriggerIsDefined(workflowVersion); switch (workflowVersion.trigger.type) { - case WorkflowTriggerType.DATABASE_EVENT: - await this.databaseEventTriggerService.createEventListener( - workflowVersion.workflowId, - workflowVersion.trigger, - manager, - ); - - return; case WorkflowTriggerType.MANUAL: case WorkflowTriggerType.WEBHOOK: return; + case WorkflowTriggerType.DATABASE_EVENT: { + const eventName = workflowVersion.trigger.settings.eventName; + + await this.automatedTriggerWorkspaceService.addAutomatedTrigger({ + workflowId: workflowVersion.workflowId, + type: AutomatedTriggerType.DATABASE_EVENT, + settings: { eventName }, + manager, + }); + + return; + } case WorkflowTriggerType.CRON: { const pattern = computeCronPatternFromSchedule(workflowVersion.trigger); - await this.messageQueueService.addCron({ - jobName: WorkflowTriggerJob.name, - jobId: workflowVersion.workflowId, - data: { - workspaceId: this.getWorkspaceId(), - workflowId: workflowVersion.workflowId, - payload: {}, - }, - options: { - repeat: { - pattern, - }, - }, + await this.automatedTriggerWorkspaceService.addAutomatedTrigger({ + workflowId: workflowVersion.workflowId, + type: AutomatedTriggerType.CRON, + settings: { pattern }, + manager, }); return; @@ -377,21 +365,15 @@ export class WorkflowTriggerWorkspaceService { switch (workflowVersion.trigger.type) { case WorkflowTriggerType.DATABASE_EVENT: - await this.databaseEventTriggerService.deleteEventListener( - workflowVersion.workflowId, + case WorkflowTriggerType.CRON: + await this.automatedTriggerWorkspaceService.deleteAutomatedTrigger({ + workflowId: workflowVersion.workflowId, manager, - ); + }); return; case WorkflowTriggerType.MANUAL: case WorkflowTriggerType.WEBHOOK: - return; - case WorkflowTriggerType.CRON: - await this.messageQueueService.removeCron({ - jobName: WorkflowTriggerJob.name, - jobId: workflowVersion.workflowId, - }); - return; default: assertNever(workflowVersion.trigger); diff --git a/packages/twenty-website/src/content/developers/self-hosting/setup.mdx b/packages/twenty-website/src/content/developers/self-hosting/setup.mdx index 42126169b..1d6f05580 100644 --- a/packages/twenty-website/src/content/developers/self-hosting/setup.mdx +++ b/packages/twenty-website/src/content/developers/self-hosting/setup.mdx @@ -65,6 +65,7 @@ yarn command:prod cron:calendar:calendar-event-list-fetch yarn command:prod cron:calendar:calendar-events-import yarn command:prod cron:messaging:ongoing-stale yarn command:prod cron:calendar:ongoing-stale +yarn command:prod cron:workflow:automated-cron-trigger ``` ## For Outlook and Outlook Calendar (Microsoft 365) @@ -135,6 +136,7 @@ yarn command:prod cron:calendar:calendar-event-list-fetch yarn command:prod cron:calendar:calendar-events-import yarn command:prod cron:messaging:ongoing-stale yarn command:prod cron:calendar:ongoing-stale +yarn command:prod cron:workflow:automated-cron-trigger ``` # Setup Environment Variables diff --git a/yarn.lock b/yarn.lock index 36e6a027b..3b4ffef3a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -30213,6 +30213,15 @@ __metadata: languageName: node linkType: hard +"cron-parser@npm:^5.1.1": + version: 5.1.1 + resolution: "cron-parser@npm:5.1.1" + dependencies: + luxon: "npm:^3.6.1" + checksum: 10c0/6e0a62833111974884407eb9e2c57f4d6abdddf302f9e43097363c57a1e32bd950f17fb4f14d857ce44bf73a6e5822918c9e5951a4b274c2eb02d5ec03e34b15 + languageName: node + linkType: hard + "cron-validate@npm:^1.4.5": version: 1.4.5 resolution: "cron-validate@npm:1.4.5" @@ -42069,6 +42078,13 @@ __metadata: languageName: node linkType: hard +"luxon@npm:^3.6.1": + version: 3.6.1 + resolution: "luxon@npm:3.6.1" + checksum: 10c0/906d57a9dc4d1de9383f2e9223e378c298607c1b4d17b6657b836a3cd120feb1c1de3b5d06d846a3417e1ca764de8476e8c23b3cd4083b5cdb870adcb06a99d5 + languageName: node + linkType: hard + "luxon@npm:~3.3.0": version: 3.3.0 resolution: "luxon@npm:3.3.0" @@ -55407,6 +55423,7 @@ __metadata: class-transformer: "npm:^0.5.1" clsx: "npm:^2.1.1" concurrently: "npm:^8.2.2" + cron-parser: "npm:^5.1.1" cron-validate: "npm:^1.4.5" cross-env: "npm:^7.0.3" cross-var: "npm:^1.1.0"