Use main dataSource to query CRON jobs (#12830)

As title
This commit is contained in:
martmull
2025-06-24 15:29:00 +02:00
committed by GitHub
parent 251a19b87d
commit ce55b20faa
2 changed files with 14 additions and 15 deletions

View File

@ -7,11 +7,13 @@ import { AutomatedTriggerWorkspaceService } from 'src/modules/workflow/workflow-
import { CronTriggerCronCommand } from 'src/modules/workflow/workflow-trigger/automated-trigger/crons/commands/cron-trigger.cron.command'; import { CronTriggerCronCommand } from 'src/modules/workflow/workflow-trigger/automated-trigger/crons/commands/cron-trigger.cron.command';
import { CronTriggerCronJob } from 'src/modules/workflow/workflow-trigger/automated-trigger/crons/jobs/cron-trigger.cron.job'; import { CronTriggerCronJob } from 'src/modules/workflow/workflow-trigger/automated-trigger/crons/jobs/cron-trigger.cron.job';
import { DatabaseEventTriggerListener } from 'src/modules/workflow/workflow-trigger/automated-trigger/listeners/database-event-trigger.listener'; import { DatabaseEventTriggerListener } from 'src/modules/workflow/workflow-trigger/automated-trigger/listeners/database-event-trigger.listener';
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
@Module({ @Module({
imports: [ imports: [
TypeOrmModule.forFeature([Workspace], 'core'), TypeOrmModule.forFeature([Workspace], 'core'),
WorkflowCommonModule, WorkflowCommonModule,
WorkspaceDataSourceModule,
], ],
providers: [ providers: [
AutomatedTriggerWorkspaceService, AutomatedTriggerWorkspaceService,

View File

@ -12,28 +12,25 @@ import { Processor } from 'src/engine/core-modules/message-queue/decorators/proc
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; import { AutomatedTriggerType } from 'src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity';
import {
AutomatedTriggerType,
WorkflowAutomatedTriggerWorkspaceEntity,
} from 'src/modules/workflow/common/standard-objects/workflow-automated-trigger.workspace-entity';
import { CronTriggerSettings } from 'src/modules/workflow/workflow-trigger/automated-trigger/constants/automated-trigger-settings'; import { CronTriggerSettings } from 'src/modules/workflow/workflow-trigger/automated-trigger/constants/automated-trigger-settings';
import { shouldRunNow } from 'src/modules/workflow/workflow-trigger/automated-trigger/crons/utils/should-run-now.utils'; import { shouldRunNow } from 'src/modules/workflow/workflow-trigger/automated-trigger/crons/utils/should-run-now.utils';
import { import {
WorkflowTriggerJob, WorkflowTriggerJob,
WorkflowTriggerJobData, WorkflowTriggerJobData,
} from 'src/modules/workflow/workflow-trigger/jobs/workflow-trigger.job'; } from 'src/modules/workflow/workflow-trigger/jobs/workflow-trigger.job';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
export const CRON_TRIGGER_CRON_PATTERN = '* * * * *'; export const CRON_TRIGGER_CRON_PATTERN = '* * * * *';
@Processor(MessageQueue.cronQueue) @Processor(MessageQueue.cronQueue)
export class CronTriggerCronJob { export class CronTriggerCronJob {
constructor( constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
@InjectRepository(Workspace, 'core') @InjectRepository(Workspace, 'core')
private readonly workspaceRepository: Repository<Workspace>, private readonly workspaceRepository: Repository<Workspace>,
@InjectMessageQueue(MessageQueue.workflowQueue) @InjectMessageQueue(MessageQueue.workflowQueue)
private readonly messageQueueService: MessageQueueService, private readonly messageQueueService: MessageQueueService,
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
private readonly exceptionHandlerService: ExceptionHandlerService, private readonly exceptionHandlerService: ExceptionHandlerService,
) {} ) {}
@ -48,18 +45,18 @@ export class CronTriggerCronJob {
const now = new Date(); const now = new Date();
const mainDataSource =
await this.workspaceDataSourceService.connectToMainDataSource();
for (const activeWorkspace of activeWorkspaces) { for (const activeWorkspace of activeWorkspaces) {
try { try {
const workflowAutomatedTriggerRepository = const schemaName = this.workspaceDataSourceService.getSchemaName(
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowAutomatedTriggerWorkspaceEntity>( activeWorkspace.id,
activeWorkspace.id, );
'workflowAutomatedTrigger',
);
const workflowAutomatedCronTriggers = const workflowAutomatedCronTriggers = await mainDataSource.query(
await workflowAutomatedTriggerRepository.find({ `SELECT * FROM ${schemaName}."workflowAutomatedTrigger" WHERE type = '${AutomatedTriggerType.CRON}'`,
where: { type: AutomatedTriggerType.CRON }, );
});
for (const workflowAutomatedCronTrigger of workflowAutomatedCronTriggers) { for (const workflowAutomatedCronTrigger of workflowAutomatedCronTriggers) {
const settings = const settings =