866 refactor cron trigger only one cron each minutes triggers all cron triggers (#11809)

<img width="1123" alt="image"
src="https://github.com/user-attachments/assets/75447922-81dd-4cfc-805d-f511f73cc778"
/>
This commit is contained in:
martmull
2025-04-30 17:08:47 +02:00
committed by GitHub
parent 357649db95
commit 849a35955a
25 changed files with 595 additions and 116 deletions

View File

@ -77,6 +77,7 @@
"bytes": "^3.1.2",
"class-transformer": "^0.5.1",
"clsx": "^2.1.1",
"cron-parser": "^5.1.1",
"cron-validate": "^1.4.5",
"cross-env": "^7.0.3",
"css-loader": "^7.1.2",

View File

@ -0,0 +1,72 @@
import { InjectRepository } from '@nestjs/typeorm';
import { Command } from 'nest-commander';
import { Repository } from 'typeorm';
import {
ActiveOrSuspendedWorkspacesMigrationCommandRunner,
RunOnWorkspaceArgs,
} from 'src/database/commands/command-runners/active-or-suspended-workspaces-migration.command-runner';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import {
AutomatedTriggerType,
WorkflowAutomatedTriggerWorkspaceEntity,
} from 'src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity';
import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-event-listener.workspace-entity';
@Command({
name: 'upgrade:0-53:migrate-workflow-event-listeners-to-automated-triggers',
description: 'Migrate workflow event listeners to automated triggers',
})
export class MigrateWorkflowEventListenersToAutomatedTriggersCommand extends ActiveOrSuspendedWorkspacesMigrationCommandRunner {
constructor(
@InjectRepository(Workspace, 'core')
protected readonly workspaceRepository: Repository<Workspace>,
protected readonly twentyORMGlobalManager: TwentyORMGlobalManager,
) {
super(workspaceRepository, twentyORMGlobalManager);
}
override async runOnWorkspace({
index,
total,
workspaceId,
}: RunOnWorkspaceArgs): Promise<void> {
this.logger.log(
`Running command for workspace ${workspaceId} ${index + 1}/${total}`,
);
const workflowEventListenerRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowEventListenerWorkspaceEntity>(
workspaceId,
'workflowEventListener',
);
const workflowAutomatedTriggerRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowAutomatedTriggerWorkspaceEntity>(
workspaceId,
'workflowAutomatedTrigger',
);
const workflowEventListeners = await workflowEventListenerRepository.find();
await workflowAutomatedTriggerRepository.delete({
type: AutomatedTriggerType.DATABASE_EVENT,
});
for (const eventListener of workflowEventListeners) {
const { eventName, ...rest } = eventListener;
await workflowAutomatedTriggerRepository.save({
...rest,
type: AutomatedTriggerType.DATABASE_EVENT,
settings: { eventName },
});
}
this.logger.log(
`Migrated ${workflowEventListeners.length} workflow event listeners to automated triggers`,
);
}
}

View File

@ -0,0 +1,16 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
import { MigrateWorkflowEventListenersToAutomatedTriggersCommand } from 'src/database/commands/upgrade-version-command/0-53/0-53-migrate-workflow-event-listeners-to-automated-triggers.command';
@Module({
imports: [
TypeOrmModule.forFeature([Workspace], 'core'),
WorkspaceDataSourceModule,
],
providers: [MigrateWorkflowEventListenersToAutomatedTriggersCommand],
exports: [MigrateWorkflowEventListenersToAutomatedTriggersCommand],
})
export class V0_53_UpgradeVersionCommandModule {}

View File

@ -6,6 +6,7 @@ import { V0_44_UpgradeVersionCommandModule } from 'src/database/commands/upgrade
import { V0_50_UpgradeVersionCommandModule } from 'src/database/commands/upgrade-version-command/0-50/0-50-upgrade-version-command.module';
import { V0_51_UpgradeVersionCommandModule } from 'src/database/commands/upgrade-version-command/0-51/0-51-upgrade-version-command.module';
import { V0_52_UpgradeVersionCommandModule } from 'src/database/commands/upgrade-version-command/0-52/0-52-upgrade-version-command.module';
import { V0_53_UpgradeVersionCommandModule } from 'src/database/commands/upgrade-version-command/0-53/0-53-upgrade-version-command.module';
import { UpgradeCommand } from 'src/database/commands/upgrade-version-command/upgrade.command';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { WorkspaceSyncMetadataModule } from 'src/engine/workspace-manager/workspace-sync-metadata/workspace-sync-metadata.module';
@ -18,6 +19,7 @@ import { WorkspaceSyncMetadataModule } from 'src/engine/workspace-manager/worksp
V0_50_UpgradeVersionCommandModule,
V0_51_UpgradeVersionCommandModule,
V0_52_UpgradeVersionCommandModule,
V0_53_UpgradeVersionCommandModule,
WorkspaceSyncMetadataModule,
],
providers: [UpgradeCommand],

View File

@ -23,6 +23,7 @@ import { TwentyConfigService } from 'src/engine/core-modules/twenty-config/twent
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { SyncWorkspaceMetadataCommand } from 'src/engine/workspace-manager/workspace-sync-metadata/commands/sync-workspace-metadata.command';
import { MigrateWorkflowEventListenersToAutomatedTriggersCommand } from 'src/database/commands/upgrade-version-command/0-53/0-53-migrate-workflow-event-listeners-to-automated-triggers.command';
type VersionCommands = {
beforeSyncMetadata: ActiveOrSuspendedWorkspacesMigrationCommandRunner[];
@ -60,6 +61,9 @@ export class UpgradeCommand extends UpgradeCommandRunner {
// 0.52 Commands
protected readonly upgradeDateAndDateTimeFieldsSettingsJsonCommand: UpgradeDateAndDateTimeFieldsSettingsJsonCommand,
protected readonly migrateRelationsToFieldMetadataCommand: MigrateRelationsToFieldMetadataCommand,
// 0.53 Commands
protected readonly migrateWorkflowEventListenersToAutomatedTriggersCommand: MigrateWorkflowEventListenersToAutomatedTriggersCommand,
) {
super(
workspaceRepository,
@ -93,7 +97,7 @@ export class UpgradeCommand extends UpgradeCommandRunner {
afterSyncMetadata: [],
};
const commands_051: VersionCommands = {
const _commands_051: VersionCommands = {
beforeSyncMetadata: [this.upgradeCreatedByEnumCommand],
afterSyncMetadata: [],
};
@ -106,7 +110,14 @@ export class UpgradeCommand extends UpgradeCommandRunner {
afterSyncMetadata: [],
};
this.commands = commands_051;
const commands_053: VersionCommands = {
beforeSyncMetadata: [],
afterSyncMetadata: [
this.migrateWorkflowEventListenersToAutomatedTriggersCommand,
],
};
this.commands = commands_053;
}
override async runBeforeSyncMetadata(args: RunOnWorkspaceArgs) {

View File

@ -458,6 +458,12 @@ export const WORKFLOW_EVENT_LISTENER_STANDARD_FIELD_IDS = {
workflow: '20202020-4082-4641-8569-dc08d5365002',
};
export const WORKFLOW_AUTOMATED_TRIGGER_STANDARD_FIELD_IDS = {
type: '20202020-3319-4234-a34c-3f92c1ab56e7',
settings: '20202020-3319-4234-a34c-bac8f903de12',
workflow: '20202020-3319-4234-a34c-8e1a4d2f7c03',
};
export const WORKFLOW_STANDARD_FIELD_IDS = {
name: '20202020-b3d3-478f-acc0-5d901e725b20',
lastPublishedVersionId: '20202020-326a-4fba-8639-3456c0a169e8',
@ -466,6 +472,7 @@ export const WORKFLOW_STANDARD_FIELD_IDS = {
versions: '20202020-9432-416e-8f3c-27ee3153d099',
runs: '20202020-759b-4340-b58b-e73595c4df4f',
eventListeners: '20202020-0229-4c66-832e-035c67579a38',
automatedTriggers: '20202020-3319-4234-a34c-117ecad2b8a9',
favorites: '20202020-c554-4c41-be7a-cf9cd4b0d512',
timelineActivities: '20202020-906e-486a-a798-131a5f081faf',
createdBy: '20202020-6007-401a-8aa5-e6f48581a6f3',

View File

@ -38,5 +38,6 @@ export const STANDARD_OBJECT_ICONS = {
workflowEventListener: 'IconSettingsAutomation',
workflowRun: 'IconHistoryToggle',
workflowVersion: 'IconVersions',
workflowAutomatedTrigger: 'IconSettingsAutomation',
workspaceMember: 'IconUserCircle',
};

View File

@ -49,4 +49,5 @@ export const STANDARD_OBJECT_IDS = {
workflowRun: '20202020-4e28-4e95-a9d7-6c00874f843c',
workflowVersion: '20202020-d65d-4ab9-9344-d77bfb376a3d',
workspaceMember: '20202020-3319-4234-a34c-82d5c0e881a6',
workflowAutomatedTrigger: '20202020-3319-4234-a34c-7f3b9d2e4d1f',
};

View File

@ -36,6 +36,7 @@ import { WorkflowRunWorkspaceEntity } from 'src/modules/workflow/common/standard
import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity';
import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity';
import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity';
import { WorkflowAutomatedTriggerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity';
// TODO: Maybe we should automate this with the DiscoverService of Nest.JS
export const standardObjectMetadataDefinitions = [
@ -64,6 +65,7 @@ export const standardObjectMetadataDefinitions = [
WorkflowEventListenerWorkspaceEntity,
WorkflowVersionWorkspaceEntity,
WorkflowRunWorkspaceEntity,
WorkflowAutomatedTriggerWorkspaceEntity,
WorkspaceMemberWorkspaceEntity,
MessageThreadWorkspaceEntity,
MessageWorkspaceEntity,

View File

@ -0,0 +1,84 @@
import { msg } from '@lingui/core/macro';
import { Relation } from 'typeorm';
import { FieldMetadataType } from 'twenty-shared/types';
import { RelationType } from 'src/engine/metadata-modules/field-metadata/interfaces/relation-type.interface';
import { BaseWorkspaceEntity } from 'src/engine/twenty-orm/base.workspace-entity';
import { WorkspaceEntity } from 'src/engine/twenty-orm/decorators/workspace-entity.decorator';
import { WorkspaceIsSystem } from 'src/engine/twenty-orm/decorators/workspace-is-system.decorator';
import { STANDARD_OBJECT_ICONS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-icons';
import { STANDARD_OBJECT_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids';
import { WORKFLOW_AUTOMATED_TRIGGER_STANDARD_FIELD_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids';
import { WorkspaceRelation } from 'src/engine/twenty-orm/decorators/workspace-relation.decorator';
import { RelationOnDeleteAction } from 'src/engine/metadata-modules/relation-metadata/relation-metadata.entity';
import { WorkspaceJoinColumn } from 'src/engine/twenty-orm/decorators/workspace-join-column.decorator';
import { WorkspaceField } from 'src/engine/twenty-orm/decorators/workspace-field.decorator';
import { WorkflowWorkspaceEntity } from './workflow.workspace-entity';
export enum AutomatedTriggerType {
DATABASE_EVENT = 'DATABASE_EVENT',
CRON = 'CRON',
}
export type AutomatedTriggerSettings = {
pattern?: string;
eventName?: string;
};
@WorkspaceEntity({
standardId: STANDARD_OBJECT_IDS.workflowAutomatedTrigger,
namePlural: 'workflowAutomatedTriggers',
labelSingular: msg`WorkflowAutomatedTrigger`,
labelPlural: msg`WorkflowAutomatedTriggers`,
description: msg`A workflow automated trigger`,
icon: STANDARD_OBJECT_ICONS.workflowAutomatedTrigger,
})
@WorkspaceIsSystem()
export class WorkflowAutomatedTriggerWorkspaceEntity extends BaseWorkspaceEntity {
@WorkspaceField({
standardId: WORKFLOW_AUTOMATED_TRIGGER_STANDARD_FIELD_IDS.type,
type: FieldMetadataType.SELECT,
label: msg`Automated Trigger Type`,
description: msg`The workflow automated trigger type`,
options: [
{
value: AutomatedTriggerType.DATABASE_EVENT,
label: 'Database Event',
position: 0,
color: 'green',
},
{
value: AutomatedTriggerType.CRON,
label: 'Cron',
position: 1,
color: 'blue',
},
],
})
type: AutomatedTriggerType;
@WorkspaceField({
standardId: WORKFLOW_AUTOMATED_TRIGGER_STANDARD_FIELD_IDS.settings,
type: FieldMetadataType.RAW_JSON,
label: msg`Settings`,
description: msg`The workflow automated trigger settings`,
})
settings: AutomatedTriggerSettings;
@WorkspaceRelation({
standardId: WORKFLOW_AUTOMATED_TRIGGER_STANDARD_FIELD_IDS.workflow,
type: RelationType.MANY_TO_ONE,
label: msg`Workflow`,
description: msg`WorkflowAutomatedTrigger workflow`,
icon: 'IconSettingsAutomation',
inverseSideTarget: () => WorkflowWorkspaceEntity,
inverseSideFieldKey: 'automatedTriggers',
onDelete: RelationOnDeleteAction.CASCADE,
})
workflow: Relation<WorkflowWorkspaceEntity>;
@WorkspaceJoinColumn('workflow')
workflowId: string;
}

View File

@ -21,6 +21,7 @@ import { TimelineActivityWorkspaceEntity } from 'src/modules/timeline/standard-o
import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-event-listener.workspace-entity';
import { WorkflowRunWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity';
import { WorkflowAutomatedTriggerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity';
export enum WorkflowStatus {
DRAFT = 'DRAFT',
@ -135,6 +136,17 @@ export class WorkflowWorkspaceEntity extends BaseWorkspaceEntity {
@WorkspaceIsSystem()
eventListeners: Relation<WorkflowEventListenerWorkspaceEntity[]>;
@WorkspaceRelation({
standardId: WORKFLOW_STANDARD_FIELD_IDS.automatedTriggers,
type: RelationType.ONE_TO_MANY,
label: msg`Automated Triggers`,
description: msg`Workflow automated triggers linked to the workflow.`,
inverseSideTarget: () => WorkflowAutomatedTriggerWorkspaceEntity,
onDelete: RelationOnDeleteAction.CASCADE,
})
@WorkspaceIsSystem()
automatedTriggers: Relation<WorkflowAutomatedTriggerWorkspaceEntity[]>;
@WorkspaceRelation({
standardId: WORKFLOW_STANDARD_FIELD_IDS.favorites,
type: RelationType.ONE_TO_MANY,

View File

@ -19,6 +19,7 @@ import {
WorkflowTriggerException,
WorkflowTriggerExceptionCode,
} from 'src/modules/workflow/workflow-trigger/exceptions/workflow-trigger.exception';
import { WorkflowAutomatedTriggerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity';
@Injectable()
export class WorkflowCommonWorkspaceService {
@ -141,11 +142,20 @@ export class WorkflowCommonWorkspaceService {
'workflowEventListener',
);
const workflowAutomatedTriggerRepository =
await this.twentyORMManager.getRepository<WorkflowAutomatedTriggerWorkspaceEntity>(
'workflowAutomatedTrigger',
);
workflowIds.forEach((workflowId) => {
workflowEventListenerRepository.softDelete({
workflowId,
});
workflowAutomatedTriggerRepository.softDelete({
workflowId,
});
workflowRunRepository.softDelete({
workflowId,
});

View File

@ -0,0 +1,21 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { AutomatedTriggerWorkspaceService } from 'src/modules/workflow/workflow-trigger/automated-trigger/automated-trigger.workspace-service';
import { FeatureFlagModule } from 'src/engine/core-modules/feature-flag/feature-flag.module';
import { DatabaseEventTriggerListener } from 'src/modules/workflow/workflow-trigger/automated-trigger/listeners/database-event-trigger.listener';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { CronTriggerCronCommand } from 'src/modules/workflow/workflow-trigger/automated-trigger/crons/commands/cron-trigger.cron.command';
import { CronTriggerCronJob } from 'src/modules/workflow/workflow-trigger/automated-trigger/crons/jobs/cron-trigger.cron.job';
@Module({
imports: [FeatureFlagModule, TypeOrmModule.forFeature([Workspace], 'core')],
providers: [
AutomatedTriggerWorkspaceService,
DatabaseEventTriggerListener,
CronTriggerCronJob,
CronTriggerCronCommand,
],
exports: [AutomatedTriggerWorkspaceService],
})
export class AutomatedTriggerModule {}

View File

@ -0,0 +1,94 @@
import { Injectable } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-event-listener.workspace-entity';
import {
AutomatedTriggerType,
AutomatedTriggerSettings,
WorkflowAutomatedTriggerWorkspaceEntity,
} from 'src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity';
@Injectable()
export class AutomatedTriggerWorkspaceService {
constructor(private readonly twentyORMManager: TwentyORMManager) {}
async addAutomatedTrigger({
workflowId,
manager,
type,
settings,
}: {
workflowId: string;
manager: EntityManager;
type: AutomatedTriggerType;
settings: AutomatedTriggerSettings;
}) {
if (type === AutomatedTriggerType.DATABASE_EVENT) {
// Todo: remove workflowEventListenerRepository updates when data are migrated to workflowAutomatedTrigger
const workflowEventListenerRepository =
await this.twentyORMManager.getRepository<WorkflowEventListenerWorkspaceEntity>(
'workflowEventListener',
);
const workflowEventListener = workflowEventListenerRepository.create({
workflowId,
eventName: settings.eventName,
});
await workflowEventListenerRepository.save(
workflowEventListener,
{},
manager,
);
// end-Todo
}
const workflowAutomatedTriggerRepository =
await this.twentyORMManager.getRepository<WorkflowAutomatedTriggerWorkspaceEntity>(
'workflowAutomatedTrigger',
);
const workflowAutomatedTrigger = workflowAutomatedTriggerRepository.create({
type,
settings,
workflowId,
});
await workflowAutomatedTriggerRepository.save(
workflowAutomatedTrigger,
{},
manager,
);
}
async deleteAutomatedTrigger({
workflowId,
manager,
}: {
workflowId: string;
manager: EntityManager;
}) {
// Todo: remove workflowEventListenerRepository updates when data are migrated to workflowAutomatedTrigger
const workflowEventListenerRepository =
await this.twentyORMManager.getRepository<WorkflowEventListenerWorkspaceEntity>(
'workflowEventListener',
);
await workflowEventListenerRepository.delete(
{
workflowId,
},
manager,
);
// end-Todo
const workflowAutomatedTriggerRepository =
await this.twentyORMManager.getRepository<WorkflowAutomatedTriggerWorkspaceEntity>(
'workflowAutomatedTrigger',
);
await workflowAutomatedTriggerRepository.delete({ workflowId }, manager);
}
}

View File

@ -0,0 +1,34 @@
import { Command, CommandRunner } from 'nest-commander';
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import {
CRON_TRIGGER_CRON_PATTERN,
CronTriggerCronJob,
} from 'src/modules/workflow/workflow-trigger/automated-trigger/crons/jobs/cron-trigger.cron.job';
@Command({
name: 'cron:workflow:automated-cron-trigger',
description: 'Starts a cron job to trigger cron triggered workflows',
})
export class CronTriggerCronCommand extends CommandRunner {
constructor(
@InjectMessageQueue(MessageQueue.cronQueue)
private readonly messageQueueService: MessageQueueService,
) {
super();
}
async run(): Promise<void> {
await this.messageQueueService.addCron<undefined>({
jobName: CronTriggerCronJob.name,
data: undefined,
options: {
repeat: {
pattern: CRON_TRIGGER_CRON_PATTERN,
},
},
});
}
}

View File

@ -0,0 +1,81 @@
import { InjectRepository } from '@nestjs/typeorm';
import { WorkspaceActivationStatus } from 'twenty-shared/workspace';
import { Repository } from 'typeorm';
import { isDefined } from 'twenty-shared/utils';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
import { SentryCronMonitor } from 'src/engine/core-modules/cron/sentry-cron-monitor.decorator';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import {
AutomatedTriggerType,
WorkflowAutomatedTriggerWorkspaceEntity,
} from 'src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import {
WorkflowTriggerJob,
WorkflowTriggerJobData,
} from 'src/modules/workflow/workflow-trigger/jobs/workflow-trigger.job';
import { shouldRunNow } from 'src/modules/workflow/workflow-trigger/automated-trigger/crons/utils/should-run-now.utils';
export const CRON_TRIGGER_CRON_PATTERN = '* * * * *';
@Processor(MessageQueue.cronQueue)
export class CronTriggerCronJob {
constructor(
@InjectRepository(Workspace, 'core')
private readonly workspaceRepository: Repository<Workspace>,
@InjectMessageQueue(MessageQueue.workflowQueue)
private readonly messageQueueService: MessageQueueService,
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
) {}
@Process(CronTriggerCronJob.name)
@SentryCronMonitor(CronTriggerCronJob.name, CRON_TRIGGER_CRON_PATTERN)
async handle() {
const activeWorkspaces = await this.workspaceRepository.find({
where: {
activationStatus: WorkspaceActivationStatus.ACTIVE,
},
});
const now = new Date();
for (const activeWorkspace of activeWorkspaces) {
const workflowAutomatedTriggerRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowAutomatedTriggerWorkspaceEntity>(
activeWorkspace.id,
'workflowAutomatedTrigger',
);
const workflowAutomatedCronTriggers =
await workflowAutomatedTriggerRepository.find({
where: { type: AutomatedTriggerType.CRON },
});
for (const workflowAutomatedCronTrigger of workflowAutomatedCronTriggers) {
if (!isDefined(workflowAutomatedCronTrigger.settings.pattern)) {
continue;
}
if (!shouldRunNow(workflowAutomatedCronTrigger.settings.pattern, now)) {
continue;
}
await this.messageQueueService.add<WorkflowTriggerJobData>(
WorkflowTriggerJob.name,
{
workspaceId: activeWorkspace.id,
workflowId: workflowAutomatedCronTrigger.workflowId,
payload: {},
},
{ retryLimit: 3 },
);
}
}
}
}

View File

@ -0,0 +1,20 @@
import { CronExpressionParser } from 'cron-parser';
export const shouldRunNow = (
pattern: string,
now: Date,
rootCronIntervalMs = 60_000,
) => {
try {
const interval = CronExpressionParser.parse(pattern, {
currentDate: now,
});
const prevTriggerDate = interval.prev();
const diff = Math.abs(prevTriggerDate.getTime() - now.getTime());
return diff < rootCronIntervalMs;
} catch {
return false;
}
};

View File

@ -0,0 +1,47 @@
import { shouldRunNow } from 'src/modules/workflow/workflow-trigger/automated-trigger/crons/utils/should-run-now.utils';
const getNowDate = (hour: string) => {
return new Date(`2025-01-01T${hour}.100Z`);
};
describe('shouldRunNow', () => {
it('returns true when now matches cron pattern */1 * * * *', () => {
const cron = '*/1 * * * *';
expect(shouldRunNow(cron, getNowDate('10:00:00'))).toBe(true);
});
it('returns true with a 50s root cron delay', () => {
const cron = '*/1 * * * *';
expect(shouldRunNow(cron, getNowDate('10:00:50'))).toBe(true);
});
it('returns true 5 times in a row for a */5 pattern', () => {
const cron = '*/5 * * * *'; // every 5 minutes
expect(shouldRunNow(cron, getNowDate('09:59:00'))).toBe(false);
expect(shouldRunNow(cron, getNowDate('10:00:00'))).toBe(true);
expect(shouldRunNow(cron, getNowDate('10:01:00'))).toBe(false);
expect(shouldRunNow(cron, getNowDate('10:02:00'))).toBe(false);
expect(shouldRunNow(cron, getNowDate('10:03:00'))).toBe(false);
expect(shouldRunNow(cron, getNowDate('10:04:00'))).toBe(false);
expect(shouldRunNow(cron, getNowDate('10:05:00'))).toBe(true);
expect(shouldRunNow(cron, getNowDate('10:06:00'))).toBe(false);
});
it('returns false for invalid cron pattern', () => {
const cron = 'invalid-cron';
expect(shouldRunNow(cron, getNowDate('10:00:00'))).toBe(false);
});
it('returns false if the next run is outside the interval window (2 minutes)', () => {
const cron = '*/10 * * * *'; // every 10 minutes
const interval2min = 2 * 60_000;
expect(shouldRunNow(cron, getNowDate('10:06:00'), interval2min)).toBe(
false,
);
});
});

View File

@ -13,11 +13,11 @@ import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queu
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';
import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-event-listener.workspace-entity';
import {
WorkflowTriggerJob,
WorkflowTriggerJobData,
} from 'src/modules/workflow/workflow-trigger/jobs/workflow-trigger.job';
import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-event-listener.workspace-entity';
@Injectable()
export class DatabaseEventTriggerListener {
@ -67,9 +67,9 @@ export class DatabaseEventTriggerListener {
>,
) {
const workspaceId = payload.workspaceId;
const eventName = payload.name;
const databaseEventName = payload.name;
if (!workspaceId || !eventName) {
if (!workspaceId || !databaseEventName) {
this.logger.error(
`Missing workspaceId or eventName in payload ${JSON.stringify(
payload,
@ -89,19 +89,43 @@ export class DatabaseEventTriggerListener {
return;
}
// Todo: uncomment that when data are migrated to workflowAutomatedTrigger
/*
const workflowAutomatedTriggerRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowAutomatedTriggerWorkspaceEntity>(
workspaceId,
'workflowAutomatedTrigger',
);
const eventListeners = await workflowAutomatedTriggerRepository.find({
where: {
type: AutomatedTriggerType.DATABASE_EVENT,
settings: { eventName: databaseEventName },
},
});
*/
// end Todo
// Todo: remove that when data are migrated to workflowAutomatedTrigger
const workflowEventListenerRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowEventListenerWorkspaceEntity>(
workspaceId,
'workflowEventListener',
);
const eventListeners = await workflowEventListenerRepository.find({
where: {
eventName,
},
const oldEventListeners = await workflowEventListenerRepository.find({
where: { eventName: databaseEventName },
});
for (const eventListener of eventListeners) {
// end Todo
// Todo: uncomment that when data are migrated to workflowAutomatedTrigger
//for (const eventListener of eventListeners) {
// end Todo
// Todo: remove that when data are migrated to workflowAutomatedTrigger
for (const eventListener of oldEventListeners) {
// end Todo
for (const eventPayload of payload.events) {
this.messageQueueService.add<WorkflowTriggerJobData>(
WorkflowTriggerJob.name,

View File

@ -1,12 +0,0 @@
import { Module } from '@nestjs/common';
import { FeatureFlagModule } from 'src/engine/core-modules/feature-flag/feature-flag.module';
import { DatabaseEventTriggerService } from 'src/modules/workflow/workflow-trigger/database-event-trigger/database-event-trigger.service';
import { DatabaseEventTriggerListener } from 'src/modules/workflow/workflow-trigger/database-event-trigger/listeners/database-event-trigger.listener';
@Module({
imports: [FeatureFlagModule],
providers: [DatabaseEventTriggerService, DatabaseEventTriggerListener],
exports: [DatabaseEventTriggerService],
})
export class DatabaseEventTriggerModule {}

View File

@ -1,50 +0,0 @@
import { Injectable } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-event-listener.workspace-entity';
import { WorkflowDatabaseEventTrigger } from 'src/modules/workflow/workflow-trigger/types/workflow-trigger.type';
@Injectable()
export class DatabaseEventTriggerService {
constructor(private readonly twentyORMManager: TwentyORMManager) {}
async createEventListener(
workflowId: string,
trigger: WorkflowDatabaseEventTrigger,
manager: EntityManager,
) {
const eventName = trigger.settings.eventName;
const workflowEventListenerRepository =
await this.twentyORMManager.getRepository<WorkflowEventListenerWorkspaceEntity>(
'workflowEventListener',
);
const workflowEventListener = await workflowEventListenerRepository.create({
workflowId,
eventName,
});
await workflowEventListenerRepository.save(
workflowEventListener,
{},
manager,
);
}
async deleteEventListener(workflowId: string, manager: EntityManager) {
const workflowEventListenerRepository =
await this.twentyORMManager.getRepository<WorkflowEventListenerWorkspaceEntity>(
'workflowEventListener',
);
await workflowEventListenerRepository.delete(
{
workflowId,
},
manager,
);
}
}

View File

@ -5,16 +5,16 @@ import { NestjsQueryTypeOrmModule } from '@ptc-org/nestjs-query-typeorm';
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module';
import { WorkflowRunnerModule } from 'src/modules/workflow/workflow-runner/workflow-runner.module';
import { DatabaseEventTriggerModule } from 'src/modules/workflow/workflow-trigger/database-event-trigger/database-event-trigger.module';
import { WorkflowTriggerJob } from 'src/modules/workflow/workflow-trigger/jobs/workflow-trigger.job';
import { WorkflowTriggerWorkspaceService } from 'src/modules/workflow/workflow-trigger/workspace-services/workflow-trigger.workspace-service';
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
import { AutomatedTriggerModule } from 'src/modules/workflow/workflow-trigger/automated-trigger/automated-trigger.module';
@Module({
imports: [
WorkflowCommonModule,
WorkflowRunnerModule,
DatabaseEventTriggerModule,
AutomatedTriggerModule,
NestjsQueryTypeOrmModule.forFeature([ObjectMetadataEntity], 'metadata'),
],
providers: [

View File

@ -4,9 +4,6 @@ import { InjectRepository } from '@nestjs/typeorm';
import { EntityManager, Repository } from 'typeorm';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { ActorMetadata } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type';
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
@ -23,19 +20,16 @@ import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/work
import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service';
import { WORKFLOW_VERSION_STATUS_UPDATED } from 'src/modules/workflow/workflow-status/constants/workflow-version-status-updated.constants';
import { WorkflowVersionStatusUpdate } from 'src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job';
import { DatabaseEventTriggerService } from 'src/modules/workflow/workflow-trigger/database-event-trigger/database-event-trigger.service';
import {
WorkflowTriggerException,
WorkflowTriggerExceptionCode,
} from 'src/modules/workflow/workflow-trigger/exceptions/workflow-trigger.exception';
import {
WorkflowTriggerJob,
WorkflowTriggerJobData,
} from 'src/modules/workflow/workflow-trigger/jobs/workflow-trigger.job';
import { WorkflowTriggerType } from 'src/modules/workflow/workflow-trigger/types/workflow-trigger.type';
import { assertVersionCanBeActivated } from 'src/modules/workflow/workflow-trigger/utils/assert-version-can-be-activated.util';
import { computeCronPatternFromSchedule } from 'src/modules/workflow/workflow-trigger/utils/compute-cron-pattern-from-schedule';
import { assertNever } from 'src/utils/assert';
import { AutomatedTriggerWorkspaceService } from 'src/modules/workflow/workflow-trigger/automated-trigger/automated-trigger.workspace-service';
import { AutomatedTriggerType } from 'src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity';
@Injectable()
export class WorkflowTriggerWorkspaceService {
@ -44,12 +38,10 @@ export class WorkflowTriggerWorkspaceService {
private readonly workflowCommonWorkspaceService: WorkflowCommonWorkspaceService,
private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory,
private readonly workflowRunnerWorkspaceService: WorkflowRunnerWorkspaceService,
private readonly databaseEventTriggerService: DatabaseEventTriggerService,
private readonly automatedTriggerWorkspaceService: AutomatedTriggerWorkspaceService,
private readonly workspaceEventEmitter: WorkspaceEventEmitter,
@InjectRepository(ObjectMetadataEntity, 'metadata')
private readonly objectMetadataRepository: Repository<ObjectMetadataEntity>,
@InjectMessageQueue(MessageQueue.workflowQueue)
private readonly messageQueueService: MessageQueueService,
) {}
private getWorkspaceId() {
@ -332,33 +324,29 @@ export class WorkflowTriggerWorkspaceService {
assertWorkflowVersionTriggerIsDefined(workflowVersion);
switch (workflowVersion.trigger.type) {
case WorkflowTriggerType.DATABASE_EVENT:
await this.databaseEventTriggerService.createEventListener(
workflowVersion.workflowId,
workflowVersion.trigger,
manager,
);
return;
case WorkflowTriggerType.MANUAL:
case WorkflowTriggerType.WEBHOOK:
return;
case WorkflowTriggerType.DATABASE_EVENT: {
const eventName = workflowVersion.trigger.settings.eventName;
await this.automatedTriggerWorkspaceService.addAutomatedTrigger({
workflowId: workflowVersion.workflowId,
type: AutomatedTriggerType.DATABASE_EVENT,
settings: { eventName },
manager,
});
return;
}
case WorkflowTriggerType.CRON: {
const pattern = computeCronPatternFromSchedule(workflowVersion.trigger);
await this.messageQueueService.addCron<WorkflowTriggerJobData>({
jobName: WorkflowTriggerJob.name,
jobId: workflowVersion.workflowId,
data: {
workspaceId: this.getWorkspaceId(),
workflowId: workflowVersion.workflowId,
payload: {},
},
options: {
repeat: {
pattern,
},
},
await this.automatedTriggerWorkspaceService.addAutomatedTrigger({
workflowId: workflowVersion.workflowId,
type: AutomatedTriggerType.CRON,
settings: { pattern },
manager,
});
return;
@ -377,21 +365,15 @@ export class WorkflowTriggerWorkspaceService {
switch (workflowVersion.trigger.type) {
case WorkflowTriggerType.DATABASE_EVENT:
await this.databaseEventTriggerService.deleteEventListener(
workflowVersion.workflowId,
case WorkflowTriggerType.CRON:
await this.automatedTriggerWorkspaceService.deleteAutomatedTrigger({
workflowId: workflowVersion.workflowId,
manager,
);
});
return;
case WorkflowTriggerType.MANUAL:
case WorkflowTriggerType.WEBHOOK:
return;
case WorkflowTriggerType.CRON:
await this.messageQueueService.removeCron({
jobName: WorkflowTriggerJob.name,
jobId: workflowVersion.workflowId,
});
return;
default:
assertNever(workflowVersion.trigger);

View File

@ -65,6 +65,7 @@ yarn command:prod cron:calendar:calendar-event-list-fetch
yarn command:prod cron:calendar:calendar-events-import
yarn command:prod cron:messaging:ongoing-stale
yarn command:prod cron:calendar:ongoing-stale
yarn command:prod cron:workflow:automated-cron-trigger
```
## For Outlook and Outlook Calendar (Microsoft 365)
@ -135,6 +136,7 @@ yarn command:prod cron:calendar:calendar-event-list-fetch
yarn command:prod cron:calendar:calendar-events-import
yarn command:prod cron:messaging:ongoing-stale
yarn command:prod cron:calendar:ongoing-stale
yarn command:prod cron:workflow:automated-cron-trigger
```
# Setup Environment Variables

View File

@ -30213,6 +30213,15 @@ __metadata:
languageName: node
linkType: hard
"cron-parser@npm:^5.1.1":
version: 5.1.1
resolution: "cron-parser@npm:5.1.1"
dependencies:
luxon: "npm:^3.6.1"
checksum: 10c0/6e0a62833111974884407eb9e2c57f4d6abdddf302f9e43097363c57a1e32bd950f17fb4f14d857ce44bf73a6e5822918c9e5951a4b274c2eb02d5ec03e34b15
languageName: node
linkType: hard
"cron-validate@npm:^1.4.5":
version: 1.4.5
resolution: "cron-validate@npm:1.4.5"
@ -42069,6 +42078,13 @@ __metadata:
languageName: node
linkType: hard
"luxon@npm:^3.6.1":
version: 3.6.1
resolution: "luxon@npm:3.6.1"
checksum: 10c0/906d57a9dc4d1de9383f2e9223e378c298607c1b4d17b6657b836a3cd120feb1c1de3b5d06d846a3417e1ca764de8476e8c23b3cd4083b5cdb870adcb06a99d5
languageName: node
linkType: hard
"luxon@npm:~3.3.0":
version: 3.3.0
resolution: "luxon@npm:3.3.0"
@ -55407,6 +55423,7 @@ __metadata:
class-transformer: "npm:^0.5.1"
clsx: "npm:^2.1.1"
concurrently: "npm:^8.2.2"
cron-parser: "npm:^5.1.1"
cron-validate: "npm:^1.4.5"
cross-env: "npm:^7.0.3"
cross-var: "npm:^1.1.0"