switch datasourcing (#12825)

This PR improves error handling in `handleDriverException` by adding a
duck-typing check for `MessageImportDriverException` objects. It ensures
that errors are correctly identified and processed even if they are
received as plain objects rather than class instances. This change
prevents missed exception handling and increases the robustness of the
message import process.
This commit is contained in:
Guillim
2025-06-24 15:32:16 +02:00
committed by GitHub
parent b063510c79
commit 7a9ec55c5c
4 changed files with 48 additions and 69 deletions

View File

@ -1,7 +1,7 @@
import { InjectRepository } from '@nestjs/typeorm'; import { InjectRepository } from '@nestjs/typeorm';
import { WorkspaceActivationStatus } from 'twenty-shared/workspace'; import { WorkspaceActivationStatus } from 'twenty-shared/workspace';
import { Any, Repository } from 'typeorm'; import { Repository } from 'typeorm';
import { SentryCronMonitor } from 'src/engine/core-modules/cron/sentry-cron-monitor.decorator'; import { SentryCronMonitor } from 'src/engine/core-modules/cron/sentry-cron-monitor.decorator';
import { ExceptionHandlerService } from 'src/engine/core-modules/exception-handler/exception-handler.service'; import { ExceptionHandlerService } from 'src/engine/core-modules/exception-handler/exception-handler.service';
@ -12,6 +12,7 @@ import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queu
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import { import {
CalendarEventListFetchJob, CalendarEventListFetchJob,
CalendarEventListFetchJobData, CalendarEventListFetchJobData,
@ -31,6 +32,7 @@ export class CalendarEventListFetchCronJob {
private readonly messageQueueService: MessageQueueService, private readonly messageQueueService: MessageQueueService,
private readonly twentyORMGlobalManager: TwentyORMGlobalManager, private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
private readonly exceptionHandlerService: ExceptionHandlerService, private readonly exceptionHandlerService: ExceptionHandlerService,
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
) {} ) {}
@Process(CalendarEventListFetchCronJob.name) @Process(CalendarEventListFetchCronJob.name)
@ -45,23 +47,18 @@ export class CalendarEventListFetchCronJob {
}, },
}); });
const mainDataSource =
await this.workspaceDataSourceService.connectToMainDataSource();
for (const activeWorkspace of activeWorkspaces) { for (const activeWorkspace of activeWorkspaces) {
try { try {
const calendarChannelRepository = const schemaName = this.workspaceDataSourceService.getSchemaName(
await this.twentyORMGlobalManager.getRepositoryForWorkspace( activeWorkspace.id,
activeWorkspace.id, );
'calendarChannel',
);
const calendarChannels = await calendarChannelRepository.find({ const calendarChannels = await mainDataSource.query(
where: { `SELECT * FROM ${schemaName}."calendarChannel" WHERE "isSyncEnabled" = true AND "syncStage" IN ('${CalendarChannelSyncStage.FULL_CALENDAR_EVENT_LIST_FETCH_PENDING}', '${CalendarChannelSyncStage.PARTIAL_CALENDAR_EVENT_LIST_FETCH_PENDING}')`,
isSyncEnabled: true, );
syncStage: Any([
CalendarChannelSyncStage.FULL_CALENDAR_EVENT_LIST_FETCH_PENDING,
CalendarChannelSyncStage.PARTIAL_CALENDAR_EVENT_LIST_FETCH_PENDING,
]),
},
});
for (const calendarChannel of calendarChannels) { for (const calendarChannel of calendarChannels) {
await this.messageQueueService.add<CalendarEventListFetchJobData>( await this.messageQueueService.add<CalendarEventListFetchJobData>(

View File

@ -1,7 +1,7 @@
import { InjectRepository } from '@nestjs/typeorm'; import { InjectRepository } from '@nestjs/typeorm';
import { WorkspaceActivationStatus } from 'twenty-shared/workspace'; import { WorkspaceActivationStatus } from 'twenty-shared/workspace';
import { Equal, Repository } from 'typeorm'; import { Repository } from 'typeorm';
import { SentryCronMonitor } from 'src/engine/core-modules/cron/sentry-cron-monitor.decorator'; import { SentryCronMonitor } from 'src/engine/core-modules/cron/sentry-cron-monitor.decorator';
import { ExceptionHandlerService } from 'src/engine/core-modules/exception-handler/exception-handler.service'; import { ExceptionHandlerService } from 'src/engine/core-modules/exception-handler/exception-handler.service';
@ -11,7 +11,7 @@ import { Processor } from 'src/engine/core-modules/message-queue/decorators/proc
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import { CalendarEventListFetchJobData } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-event-list-fetch.job'; import { CalendarEventListFetchJobData } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-event-list-fetch.job';
import { CalendarEventsImportJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-events-import.job'; import { CalendarEventsImportJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-events-import.job';
import { CalendarChannelSyncStage } from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity'; import { CalendarChannelSyncStage } from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity';
@ -27,7 +27,7 @@ export class CalendarEventsImportCronJob {
private readonly workspaceRepository: Repository<Workspace>, private readonly workspaceRepository: Repository<Workspace>,
@InjectMessageQueue(MessageQueue.calendarQueue) @InjectMessageQueue(MessageQueue.calendarQueue)
private readonly messageQueueService: MessageQueueService, private readonly messageQueueService: MessageQueueService,
private readonly twentyORMGlobalManager: TwentyORMGlobalManager, private readonly workspaceDataSourceService: WorkspaceDataSourceService,
private readonly exceptionHandlerService: ExceptionHandlerService, private readonly exceptionHandlerService: ExceptionHandlerService,
) {} ) {}
@ -42,23 +42,18 @@ export class CalendarEventsImportCronJob {
activationStatus: WorkspaceActivationStatus.ACTIVE, activationStatus: WorkspaceActivationStatus.ACTIVE,
}, },
}); });
const mainDataSource =
await this.workspaceDataSourceService.connectToMainDataSource();
for (const activeWorkspace of activeWorkspaces) { for (const activeWorkspace of activeWorkspaces) {
try { try {
const calendarChannelRepository = const schemaName = this.workspaceDataSourceService.getSchemaName(
await this.twentyORMGlobalManager.getRepositoryForWorkspace( activeWorkspace.id,
activeWorkspace.id, );
'calendarChannel',
);
const calendarChannels = await calendarChannelRepository.find({ const calendarChannels = await mainDataSource.query(
where: { `SELECT * FROM ${schemaName}."calendarChannel" WHERE "isSyncEnabled" = true AND "syncStage" = '${CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_PENDING}'`,
isSyncEnabled: true, );
syncStage: Equal(
CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_PENDING,
),
},
});
for (const calendarChannel of calendarChannels) { for (const calendarChannel of calendarChannels) {
await this.messageQueueService.add<CalendarEventListFetchJobData>( await this.messageQueueService.add<CalendarEventListFetchJobData>(

View File

@ -1,7 +1,7 @@
import { InjectRepository } from '@nestjs/typeorm'; import { InjectRepository } from '@nestjs/typeorm';
import { WorkspaceActivationStatus } from 'twenty-shared/workspace'; import { WorkspaceActivationStatus } from 'twenty-shared/workspace';
import { In, Repository } from 'typeorm'; import { Repository } from 'typeorm';
import { SentryCronMonitor } from 'src/engine/core-modules/cron/sentry-cron-monitor.decorator'; import { SentryCronMonitor } from 'src/engine/core-modules/cron/sentry-cron-monitor.decorator';
import { ExceptionHandlerService } from 'src/engine/core-modules/exception-handler/exception-handler.service'; import { ExceptionHandlerService } from 'src/engine/core-modules/exception-handler/exception-handler.service';
@ -11,11 +11,8 @@ import { Processor } from 'src/engine/core-modules/message-queue/decorators/proc
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import { import { MessageChannelSyncStage } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
MessageChannelSyncStage,
MessageChannelWorkspaceEntity,
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { import {
MessagingMessageListFetchJob, MessagingMessageListFetchJob,
MessagingMessageListFetchJobData, MessagingMessageListFetchJobData,
@ -30,7 +27,7 @@ export class MessagingMessageListFetchCronJob {
private readonly workspaceRepository: Repository<Workspace>, private readonly workspaceRepository: Repository<Workspace>,
@InjectMessageQueue(MessageQueue.messagingQueue) @InjectMessageQueue(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService, private readonly messageQueueService: MessageQueueService,
private readonly twentyORMGlobalManager: TwentyORMGlobalManager, private readonly workspaceDataSourceService: WorkspaceDataSourceService,
private readonly exceptionHandlerService: ExceptionHandlerService, private readonly exceptionHandlerService: ExceptionHandlerService,
) {} ) {}
@ -46,23 +43,18 @@ export class MessagingMessageListFetchCronJob {
}, },
}); });
const mainDataSource =
await this.workspaceDataSourceService.connectToMainDataSource();
for (const activeWorkspace of activeWorkspaces) { for (const activeWorkspace of activeWorkspaces) {
try { try {
const messageChannelRepository = const schemaName = this.workspaceDataSourceService.getSchemaName(
await this.twentyORMGlobalManager.getRepositoryForWorkspace<MessageChannelWorkspaceEntity>( activeWorkspace.id,
activeWorkspace.id, );
'messageChannel',
);
const messageChannels = await messageChannelRepository.find({ const messageChannels = await mainDataSource.query(
where: { `SELECT * FROM ${schemaName}."messageChannel" WHERE "isSyncEnabled" = true AND "syncStage" IN ('${MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING}', '${MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING}')`,
isSyncEnabled: true, );
syncStage: In([
MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING,
MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING,
]),
},
});
for (const messageChannel of messageChannels) { for (const messageChannel of messageChannels) {
await this.messageQueueService.add<MessagingMessageListFetchJobData>( await this.messageQueueService.add<MessagingMessageListFetchJobData>(

View File

@ -11,11 +11,8 @@ import { Processor } from 'src/engine/core-modules/message-queue/decorators/proc
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import { import { MessageChannelSyncStage } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
MessageChannelSyncStage,
MessageChannelWorkspaceEntity,
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { import {
MessagingMessagesImportJob, MessagingMessagesImportJob,
MessagingMessagesImportJobData, MessagingMessagesImportJobData,
@ -30,8 +27,8 @@ export class MessagingMessagesImportCronJob {
private readonly workspaceRepository: Repository<Workspace>, private readonly workspaceRepository: Repository<Workspace>,
@InjectMessageQueue(MessageQueue.messagingQueue) @InjectMessageQueue(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService, private readonly messageQueueService: MessageQueueService,
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
private readonly exceptionHandlerService: ExceptionHandlerService, private readonly exceptionHandlerService: ExceptionHandlerService,
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
) {} ) {}
@Process(MessagingMessagesImportCronJob.name) @Process(MessagingMessagesImportCronJob.name)
@ -46,20 +43,18 @@ export class MessagingMessagesImportCronJob {
}, },
}); });
const mainDataSource =
await this.workspaceDataSourceService.connectToMainDataSource();
for (const activeWorkspace of activeWorkspaces) { for (const activeWorkspace of activeWorkspaces) {
try { try {
const messageChannelRepository = const schemaName = this.workspaceDataSourceService.getSchemaName(
await this.twentyORMGlobalManager.getRepositoryForWorkspace<MessageChannelWorkspaceEntity>( activeWorkspace.id,
activeWorkspace.id, );
'messageChannel',
);
const messageChannels = await messageChannelRepository.find({ const messageChannels = await mainDataSource.query(
where: { `SELECT * FROM ${schemaName}."messageChannel" WHERE "isSyncEnabled" = true AND "syncStage" = '${MessageChannelSyncStage.MESSAGES_IMPORT_PENDING}'`,
isSyncEnabled: true, );
syncStage: MessageChannelSyncStage.MESSAGES_IMPORT_PENDING,
},
});
for (const messageChannel of messageChannels) { for (const messageChannel of messageChannels) {
await this.messageQueueService.add<MessagingMessagesImportJobData>( await this.messageQueueService.add<MessagingMessagesImportJobData>(