From 7a9ec55c5cdeece231af522ac0b64c4cb48da12d Mon Sep 17 00:00:00 2001 From: Guillim Date: Tue, 24 Jun 2025 15:32:16 +0200 Subject: [PATCH] switch datasourcing (#12825) This PR improves error handling in `handleDriverException` by adding a duck-typing check for `MessageImportDriverException` objects. It ensures that errors are correctly identified and processed even if they are received as plain objects rather than class instances. This change prevents missed exception handling and increases the robustness of the message import process. --- .../calendar-event-list-fetch.cron.job.ts | 27 +++++++-------- .../jobs/calendar-events-import.cron.job.ts | 27 ++++++--------- .../messaging-message-list-fetch.cron.job.ts | 34 +++++++------------ .../messaging-messages-import.cron.job.ts | 29 +++++++--------- 4 files changed, 48 insertions(+), 69 deletions(-) diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-event-list-fetch.cron.job.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-event-list-fetch.cron.job.ts index 687ffa23d..e88329b39 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-event-list-fetch.cron.job.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-event-list-fetch.cron.job.ts @@ -1,7 +1,7 @@ import { InjectRepository } from '@nestjs/typeorm'; import { WorkspaceActivationStatus } from 'twenty-shared/workspace'; -import { Any, Repository } from 'typeorm'; +import { Repository } from 'typeorm'; import { SentryCronMonitor } from 'src/engine/core-modules/cron/sentry-cron-monitor.decorator'; import { ExceptionHandlerService } from 'src/engine/core-modules/exception-handler/exception-handler.service'; @@ -12,6 +12,7 @@ import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queu import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; +import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; import { CalendarEventListFetchJob, CalendarEventListFetchJobData, @@ -31,6 +32,7 @@ export class CalendarEventListFetchCronJob { private readonly messageQueueService: MessageQueueService, private readonly twentyORMGlobalManager: TwentyORMGlobalManager, private readonly exceptionHandlerService: ExceptionHandlerService, + private readonly workspaceDataSourceService: WorkspaceDataSourceService, ) {} @Process(CalendarEventListFetchCronJob.name) @@ -45,23 +47,18 @@ export class CalendarEventListFetchCronJob { }, }); + const mainDataSource = + await this.workspaceDataSourceService.connectToMainDataSource(); + for (const activeWorkspace of activeWorkspaces) { try { - const calendarChannelRepository = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - activeWorkspace.id, - 'calendarChannel', - ); + const schemaName = this.workspaceDataSourceService.getSchemaName( + activeWorkspace.id, + ); - const calendarChannels = await calendarChannelRepository.find({ - where: { - isSyncEnabled: true, - syncStage: Any([ - CalendarChannelSyncStage.FULL_CALENDAR_EVENT_LIST_FETCH_PENDING, - CalendarChannelSyncStage.PARTIAL_CALENDAR_EVENT_LIST_FETCH_PENDING, - ]), - }, - }); + const calendarChannels = await mainDataSource.query( + `SELECT * FROM ${schemaName}."calendarChannel" WHERE "isSyncEnabled" = true AND "syncStage" IN ('${CalendarChannelSyncStage.FULL_CALENDAR_EVENT_LIST_FETCH_PENDING}', '${CalendarChannelSyncStage.PARTIAL_CALENDAR_EVENT_LIST_FETCH_PENDING}')`, + ); for (const calendarChannel of calendarChannels) { await this.messageQueueService.add( diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-events-import.cron.job.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-events-import.cron.job.ts index 8f2e0b6d4..7e7398228 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-events-import.cron.job.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-events-import.cron.job.ts @@ -1,7 +1,7 @@ import { InjectRepository } from '@nestjs/typeorm'; import { WorkspaceActivationStatus } from 'twenty-shared/workspace'; -import { Equal, Repository } from 'typeorm'; +import { Repository } from 'typeorm'; import { SentryCronMonitor } from 'src/engine/core-modules/cron/sentry-cron-monitor.decorator'; import { ExceptionHandlerService } from 'src/engine/core-modules/exception-handler/exception-handler.service'; @@ -11,7 +11,7 @@ import { Processor } from 'src/engine/core-modules/message-queue/decorators/proc import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; -import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; +import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; import { CalendarEventListFetchJobData } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-event-list-fetch.job'; import { CalendarEventsImportJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-events-import.job'; import { CalendarChannelSyncStage } from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity'; @@ -27,7 +27,7 @@ export class CalendarEventsImportCronJob { private readonly workspaceRepository: Repository, @InjectMessageQueue(MessageQueue.calendarQueue) private readonly messageQueueService: MessageQueueService, - private readonly twentyORMGlobalManager: TwentyORMGlobalManager, + private readonly workspaceDataSourceService: WorkspaceDataSourceService, private readonly exceptionHandlerService: ExceptionHandlerService, ) {} @@ -42,23 +42,18 @@ export class CalendarEventsImportCronJob { activationStatus: WorkspaceActivationStatus.ACTIVE, }, }); + const mainDataSource = + await this.workspaceDataSourceService.connectToMainDataSource(); for (const activeWorkspace of activeWorkspaces) { try { - const calendarChannelRepository = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - activeWorkspace.id, - 'calendarChannel', - ); + const schemaName = this.workspaceDataSourceService.getSchemaName( + activeWorkspace.id, + ); - const calendarChannels = await calendarChannelRepository.find({ - where: { - isSyncEnabled: true, - syncStage: Equal( - CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_PENDING, - ), - }, - }); + const calendarChannels = await mainDataSource.query( + `SELECT * FROM ${schemaName}."calendarChannel" WHERE "isSyncEnabled" = true AND "syncStage" = '${CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_PENDING}'`, + ); for (const calendarChannel of calendarChannels) { await this.messageQueueService.add( diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts index 2375aa209..1e5306d0e 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job.ts @@ -1,7 +1,7 @@ import { InjectRepository } from '@nestjs/typeorm'; import { WorkspaceActivationStatus } from 'twenty-shared/workspace'; -import { In, Repository } from 'typeorm'; +import { Repository } from 'typeorm'; import { SentryCronMonitor } from 'src/engine/core-modules/cron/sentry-cron-monitor.decorator'; import { ExceptionHandlerService } from 'src/engine/core-modules/exception-handler/exception-handler.service'; @@ -11,11 +11,8 @@ import { Processor } from 'src/engine/core-modules/message-queue/decorators/proc import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; -import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; -import { - MessageChannelSyncStage, - MessageChannelWorkspaceEntity, -} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; +import { MessageChannelSyncStage } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessagingMessageListFetchJob, MessagingMessageListFetchJobData, @@ -30,7 +27,7 @@ export class MessagingMessageListFetchCronJob { private readonly workspaceRepository: Repository, @InjectMessageQueue(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, - private readonly twentyORMGlobalManager: TwentyORMGlobalManager, + private readonly workspaceDataSourceService: WorkspaceDataSourceService, private readonly exceptionHandlerService: ExceptionHandlerService, ) {} @@ -46,23 +43,18 @@ export class MessagingMessageListFetchCronJob { }, }); + const mainDataSource = + await this.workspaceDataSourceService.connectToMainDataSource(); + for (const activeWorkspace of activeWorkspaces) { try { - const messageChannelRepository = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - activeWorkspace.id, - 'messageChannel', - ); + const schemaName = this.workspaceDataSourceService.getSchemaName( + activeWorkspace.id, + ); - const messageChannels = await messageChannelRepository.find({ - where: { - isSyncEnabled: true, - syncStage: In([ - MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING, - MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING, - ]), - }, - }); + const messageChannels = await mainDataSource.query( + `SELECT * FROM ${schemaName}."messageChannel" WHERE "isSyncEnabled" = true AND "syncStage" IN ('${MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING}', '${MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING}')`, + ); for (const messageChannel of messageChannels) { await this.messageQueueService.add( diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts index 71ac47a68..a7181a15f 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job.ts @@ -11,11 +11,8 @@ import { Processor } from 'src/engine/core-modules/message-queue/decorators/proc import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; -import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; -import { - MessageChannelSyncStage, - MessageChannelWorkspaceEntity, -} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; +import { MessageChannelSyncStage } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessagingMessagesImportJob, MessagingMessagesImportJobData, @@ -30,8 +27,8 @@ export class MessagingMessagesImportCronJob { private readonly workspaceRepository: Repository, @InjectMessageQueue(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, - private readonly twentyORMGlobalManager: TwentyORMGlobalManager, private readonly exceptionHandlerService: ExceptionHandlerService, + private readonly workspaceDataSourceService: WorkspaceDataSourceService, ) {} @Process(MessagingMessagesImportCronJob.name) @@ -46,20 +43,18 @@ export class MessagingMessagesImportCronJob { }, }); + const mainDataSource = + await this.workspaceDataSourceService.connectToMainDataSource(); + for (const activeWorkspace of activeWorkspaces) { try { - const messageChannelRepository = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - activeWorkspace.id, - 'messageChannel', - ); + const schemaName = this.workspaceDataSourceService.getSchemaName( + activeWorkspace.id, + ); - const messageChannels = await messageChannelRepository.find({ - where: { - isSyncEnabled: true, - syncStage: MessageChannelSyncStage.MESSAGES_IMPORT_PENDING, - }, - }); + const messageChannels = await mainDataSource.query( + `SELECT * FROM ${schemaName}."messageChannel" WHERE "isSyncEnabled" = true AND "syncStage" = '${MessageChannelSyncStage.MESSAGES_IMPORT_PENDING}'`, + ); for (const messageChannel of messageChannels) { await this.messageQueueService.add(