From ce55b20faaea96b2803a8accf1af965204d9c841 Mon Sep 17 00:00:00 2001 From: martmull Date: Tue, 24 Jun 2025 15:29:00 +0200 Subject: [PATCH] Use main dataSource to query CRON jobs (#12830) As title --- .../automated-trigger.module.ts | 2 ++ .../crons/jobs/cron-trigger.cron.job.ts | 27 +++++++++---------- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/automated-trigger.module.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/automated-trigger.module.ts index fa2b83cb4..dbff9374c 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/automated-trigger.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/automated-trigger.module.ts @@ -7,11 +7,13 @@ import { AutomatedTriggerWorkspaceService } from 'src/modules/workflow/workflow- import { CronTriggerCronCommand } from 'src/modules/workflow/workflow-trigger/automated-trigger/crons/commands/cron-trigger.cron.command'; import { CronTriggerCronJob } from 'src/modules/workflow/workflow-trigger/automated-trigger/crons/jobs/cron-trigger.cron.job'; import { DatabaseEventTriggerListener } from 'src/modules/workflow/workflow-trigger/automated-trigger/listeners/database-event-trigger.listener'; +import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module'; @Module({ imports: [ TypeOrmModule.forFeature([Workspace], 'core'), WorkflowCommonModule, + WorkspaceDataSourceModule, ], providers: [ AutomatedTriggerWorkspaceService, diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/crons/jobs/cron-trigger.cron.job.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/crons/jobs/cron-trigger.cron.job.ts index 300d082a7..fad9137f9 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/crons/jobs/cron-trigger.cron.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/automated-trigger/crons/jobs/cron-trigger.cron.job.ts @@ -12,28 +12,25 @@ import { Processor } from 'src/engine/core-modules/message-queue/decorators/proc import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; -import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; -import { - AutomatedTriggerType, - WorkflowAutomatedTriggerWorkspaceEntity, -} from 'src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity'; +import { AutomatedTriggerType } from 'src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity'; import { CronTriggerSettings } from 'src/modules/workflow/workflow-trigger/automated-trigger/constants/automated-trigger-settings'; import { shouldRunNow } from 'src/modules/workflow/workflow-trigger/automated-trigger/crons/utils/should-run-now.utils'; import { WorkflowTriggerJob, WorkflowTriggerJobData, } from 'src/modules/workflow/workflow-trigger/jobs/workflow-trigger.job'; +import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; export const CRON_TRIGGER_CRON_PATTERN = '* * * * *'; @Processor(MessageQueue.cronQueue) export class CronTriggerCronJob { constructor( + private readonly workspaceDataSourceService: WorkspaceDataSourceService, @InjectRepository(Workspace, 'core') private readonly workspaceRepository: Repository, @InjectMessageQueue(MessageQueue.workflowQueue) private readonly messageQueueService: MessageQueueService, - private readonly twentyORMGlobalManager: TwentyORMGlobalManager, private readonly exceptionHandlerService: ExceptionHandlerService, ) {} @@ -48,18 +45,18 @@ export class CronTriggerCronJob { const now = new Date(); + const mainDataSource = + await this.workspaceDataSourceService.connectToMainDataSource(); + for (const activeWorkspace of activeWorkspaces) { try { - const workflowAutomatedTriggerRepository = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - activeWorkspace.id, - 'workflowAutomatedTrigger', - ); + const schemaName = this.workspaceDataSourceService.getSchemaName( + activeWorkspace.id, + ); - const workflowAutomatedCronTriggers = - await workflowAutomatedTriggerRepository.find({ - where: { type: AutomatedTriggerType.CRON }, - }); + const workflowAutomatedCronTriggers = await mainDataSource.query( + `SELECT * FROM ${schemaName}."workflowAutomatedTrigger" WHERE type = '${AutomatedTriggerType.CRON}'`, + ); for (const workflowAutomatedCronTrigger of workflowAutomatedCronTriggers) { const settings =