diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-event-list-fetch.cron.job.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-event-list-fetch.cron.job.ts index 687ffa23d..e88329b39 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-event-list-fetch.cron.job.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-event-list-fetch.cron.job.ts @@ -1,7 +1,7 @@ import { InjectRepository } from '@nestjs/typeorm'; import { WorkspaceActivationStatus } from 'twenty-shared/workspace'; -import { Any, Repository } from 'typeorm'; +import { Repository } from 'typeorm'; import { SentryCronMonitor } from 'src/engine/core-modules/cron/sentry-cron-monitor.decorator'; import { ExceptionHandlerService } from 'src/engine/core-modules/exception-handler/exception-handler.service'; @@ -12,6 +12,7 @@ import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queu import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; +import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; import { CalendarEventListFetchJob, CalendarEventListFetchJobData, @@ -31,6 +32,7 @@ export class CalendarEventListFetchCronJob { private readonly messageQueueService: MessageQueueService, private readonly twentyORMGlobalManager: TwentyORMGlobalManager, private readonly exceptionHandlerService: ExceptionHandlerService, + private readonly workspaceDataSourceService: WorkspaceDataSourceService, ) {} @Process(CalendarEventListFetchCronJob.name) @@ -45,23 +47,18 @@ export class CalendarEventListFetchCronJob { }, }); + const mainDataSource = + await this.workspaceDataSourceService.connectToMainDataSource(); + for (const activeWorkspace of activeWorkspaces) { try { - const calendarChannelRepository = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - activeWorkspace.id, - 'calendarChannel', - ); + const schemaName = this.workspaceDataSourceService.getSchemaName( + activeWorkspace.id, + ); - const calendarChannels = await calendarChannelRepository.find({ - where: { - isSyncEnabled: true, - syncStage: Any([ - CalendarChannelSyncStage.FULL_CALENDAR_EVENT_LIST_FETCH_PENDING, - CalendarChannelSyncStage.PARTIAL_CALENDAR_EVENT_LIST_FETCH_PENDING, - ]), - }, - }); + const calendarChannels = await mainDataSource.query( + `SELECT * FROM ${schemaName}."calendarChannel" WHERE "isSyncEnabled" = true AND "syncStage" IN ('${CalendarChannelSyncStage.FULL_CALENDAR_EVENT_LIST_FETCH_PENDING}', '${CalendarChannelSyncStage.PARTIAL_CALENDAR_EVENT_LIST_FETCH_PENDING}')`, + ); for (const calendarChannel of calendarChannels) { await this.messageQueueService.add( diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-events-import.cron.job.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-events-import.cron.job.ts index 8f2e0b6d4..7e7398228 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-events-import.cron.job.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-events-import.cron.job.ts @@ -1,7 +1,7 @@ import { InjectRepository } from '@nestjs/typeorm'; import { WorkspaceActivationStatus } from 'twenty-shared/workspace'; -import { Equal, Repository } from 'typeorm'; +import { Repository } from 'typeorm'; import { SentryCronMonitor } from 'src/engine/core-modules/cron/sentry-cron-monitor.decorator'; import { ExceptionHandlerService } from 'src/engine/core-modules/exception-handler/exception-handler.service'; @@ -11,7 +11,7 @@ import { Processor } from 'src/engine/core-modules/message-queue/decorators/proc import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; -import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; +import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; import { CalendarEventListFetchJobData } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-event-list-fetch.job'; import { CalendarEventsImportJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-events-import.job'; import { CalendarChannelSyncStage } from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity'; @@ -27,7 +27,7 @@ export class CalendarEventsImportCronJob { private readonly workspaceRepository: Repository, @InjectMessageQueue(MessageQueue.calendarQueue) private readonly messageQueueService: MessageQueueService, - private readonly twentyORMGlobalManager: TwentyORMGlobalManager, + private readonly workspaceDataSourceService: WorkspaceDataSourceService, private readonly exceptionHandlerService: ExceptionHandlerService, ) {} @@ -42,23 +42,18 @@ export class CalendarEventsImportCronJob { activationStatus: WorkspaceActivationStatus.ACTIVE, }, }); + const mainDataSource = + await this.workspaceDataSourceService.connectToMainDataSource(); for (const activeWorkspace of activeWorkspaces) { try { - const calendarChannelRepository = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - activeWorkspace.id, - 'calendarChannel', - ); + const schemaName = this.workspaceDataSourceService.getSchemaName( + activeWorkspace.id, + ); - const calendarChannels = await calendarChannelRepository.find({ - where: { - isSyncEnabled: true, - syncStage: Equal( - CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_PENDING, - ), - }, - }); + const calendarChannels = await mainDataSource.query( + `SELECT * FROM ${schemaName}."calendarChannel" WHERE "isSyncEnabled" = true AND "syncStage" = '${CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_PENDING}'`, + ); for (const calendarChannel of calendarChannels) { await this.messageQueueService.add( diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts index 2375aa209..1e5306d0e 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts @@ -1,7 +1,7 @@ import { InjectRepository } from '@nestjs/typeorm'; import { WorkspaceActivationStatus } from 'twenty-shared/workspace'; -import { In, Repository } from 'typeorm'; +import { Repository } from 'typeorm'; import { SentryCronMonitor } from 'src/engine/core-modules/cron/sentry-cron-monitor.decorator'; import { ExceptionHandlerService } from 'src/engine/core-modules/exception-handler/exception-handler.service'; @@ -11,11 +11,8 @@ import { Processor } from 'src/engine/core-modules/message-queue/decorators/proc import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; -import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; -import { - MessageChannelSyncStage, - MessageChannelWorkspaceEntity, -} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; +import { MessageChannelSyncStage } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessagingMessageListFetchJob, MessagingMessageListFetchJobData, @@ -30,7 +27,7 @@ export class MessagingMessageListFetchCronJob { private readonly workspaceRepository: Repository, @InjectMessageQueue(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, - private readonly twentyORMGlobalManager: TwentyORMGlobalManager, + private readonly workspaceDataSourceService: WorkspaceDataSourceService, private readonly exceptionHandlerService: ExceptionHandlerService, ) {} @@ -46,23 +43,18 @@ export class MessagingMessageListFetchCronJob { }, }); + const mainDataSource = + await this.workspaceDataSourceService.connectToMainDataSource(); + for (const activeWorkspace of activeWorkspaces) { try { - const messageChannelRepository = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - activeWorkspace.id, - 'messageChannel', - ); + const schemaName = this.workspaceDataSourceService.getSchemaName( + activeWorkspace.id, + ); - const messageChannels = await messageChannelRepository.find({ - where: { - isSyncEnabled: true, - syncStage: In([ - MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING, - MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING, - ]), - }, - }); + const messageChannels = await mainDataSource.query( + `SELECT * FROM ${schemaName}."messageChannel" WHERE "isSyncEnabled" = true AND "syncStage" IN ('${MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING}', '${MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING}')`, + ); for (const messageChannel of messageChannels) { await this.messageQueueService.add( diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts index 71ac47a68..a7181a15f 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts @@ -11,11 +11,8 @@ import { Processor } from 'src/engine/core-modules/message-queue/decorators/proc import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; -import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; -import { - MessageChannelSyncStage, - MessageChannelWorkspaceEntity, -} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; +import { MessageChannelSyncStage } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessagingMessagesImportJob, MessagingMessagesImportJobData, @@ -30,8 +27,8 @@ export class MessagingMessagesImportCronJob { private readonly workspaceRepository: Repository, @InjectMessageQueue(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, - private readonly twentyORMGlobalManager: TwentyORMGlobalManager, private readonly exceptionHandlerService: ExceptionHandlerService, + private readonly workspaceDataSourceService: WorkspaceDataSourceService, ) {} @Process(MessagingMessagesImportCronJob.name) @@ -46,20 +43,18 @@ export class MessagingMessagesImportCronJob { }, }); + const mainDataSource = + await this.workspaceDataSourceService.connectToMainDataSource(); + for (const activeWorkspace of activeWorkspaces) { try { - const messageChannelRepository = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - activeWorkspace.id, - 'messageChannel', - ); + const schemaName = this.workspaceDataSourceService.getSchemaName( + activeWorkspace.id, + ); - const messageChannels = await messageChannelRepository.find({ - where: { - isSyncEnabled: true, - syncStage: MessageChannelSyncStage.MESSAGES_IMPORT_PENDING, - }, - }); + const messageChannels = await mainDataSource.query( + `SELECT * FROM ${schemaName}."messageChannel" WHERE "isSyncEnabled" = true AND "syncStage" = '${MessageChannelSyncStage.MESSAGES_IMPORT_PENDING}'`, + ); for (const messageChannel of messageChannels) { await this.messageQueueService.add(