6686 Add try catch on every cron job, and send exception to exceptionHandler (#6705)

Closes #6686

---------

Co-authored-by: Charles Bochet <charles@twenty.com>
This commit is contained in:
Raphaël Bosi
2024-08-22 18:23:05 +02:00
committed by GitHub
parent 1030ff43d8
commit 1eeeae8564
4 changed files with 101 additions and 60 deletions

View File

@ -6,6 +6,7 @@ import {
Workspace, Workspace,
WorkspaceActivationStatus, WorkspaceActivationStatus,
} from 'src/engine/core-modules/workspace/workspace.entity'; } from 'src/engine/core-modules/workspace/workspace.entity';
import { ExceptionHandlerService } from 'src/engine/integrations/exception-handler/exception-handler.service';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
@ -28,6 +29,7 @@ export class CalendarEventListFetchCronJob {
@InjectMessageQueue(MessageQueue.calendarQueue) @InjectMessageQueue(MessageQueue.calendarQueue)
private readonly messageQueueService: MessageQueueService, private readonly messageQueueService: MessageQueueService,
private readonly twentyORMGlobalManager: TwentyORMGlobalManager, private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
private readonly exceptionHandlerService: ExceptionHandlerService,
) {} ) {}
@Process(CalendarEventListFetchCronJob.name) @Process(CalendarEventListFetchCronJob.name)
@ -41,30 +43,38 @@ export class CalendarEventListFetchCronJob {
}); });
for (const activeWorkspace of activeWorkspaces) { for (const activeWorkspace of activeWorkspaces) {
const calendarChannelRepository = try {
await this.twentyORMGlobalManager.getRepositoryForWorkspace( const calendarChannelRepository =
activeWorkspace.id, await this.twentyORMGlobalManager.getRepositoryForWorkspace(
'calendarChannel', activeWorkspace.id,
); 'calendarChannel',
);
const calendarChannels = await calendarChannelRepository.find({ const calendarChannels = await calendarChannelRepository.find({
where: { where: {
isSyncEnabled: true, isSyncEnabled: true,
syncStage: Any([ syncStage: Any([
CalendarChannelSyncStage.FULL_CALENDAR_EVENT_LIST_FETCH_PENDING, CalendarChannelSyncStage.FULL_CALENDAR_EVENT_LIST_FETCH_PENDING,
CalendarChannelSyncStage.PARTIAL_CALENDAR_EVENT_LIST_FETCH_PENDING, CalendarChannelSyncStage.PARTIAL_CALENDAR_EVENT_LIST_FETCH_PENDING,
]), ]),
}, },
}); });
for (const calendarChannel of calendarChannels) { for (const calendarChannel of calendarChannels) {
await this.messageQueueService.add<CalendarEventsImportJobData>( await this.messageQueueService.add<CalendarEventsImportJobData>(
CalendarEventListFetchJob.name, CalendarEventListFetchJob.name,
{ {
calendarChannelId: calendarChannel.id, calendarChannelId: calendarChannel.id,
workspaceId: activeWorkspace.id,
},
);
}
} catch (error) {
this.exceptionHandlerService.captureExceptions([error], {
user: {
workspaceId: activeWorkspace.id, workspaceId: activeWorkspace.id,
}, },
); });
} }
} }

View File

@ -1,11 +1,12 @@
import { InjectRepository } from '@nestjs/typeorm'; import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm'; import { In, Repository } from 'typeorm';
import { import {
Workspace, Workspace,
WorkspaceActivationStatus, WorkspaceActivationStatus,
} from 'src/engine/core-modules/workspace/workspace.entity'; } from 'src/engine/core-modules/workspace/workspace.entity';
import { ExceptionHandlerService } from 'src/engine/integrations/exception-handler/exception-handler.service';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
@ -29,6 +30,7 @@ export class MessagingMessageListFetchCronJob {
@InjectMessageQueue(MessageQueue.messagingQueue) @InjectMessageQueue(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService, private readonly messageQueueService: MessageQueueService,
private readonly twentyORMGlobalManager: TwentyORMGlobalManager, private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
private readonly exceptionHandlerService: ExceptionHandlerService,
) {} ) {}
@Process(MessagingMessageListFetchCronJob.name) @Process(MessagingMessageListFetchCronJob.name)
@ -42,22 +44,24 @@ export class MessagingMessageListFetchCronJob {
}); });
for (const activeWorkspace of activeWorkspaces) { for (const activeWorkspace of activeWorkspaces) {
const messageChannelRepository = try {
await this.twentyORMGlobalManager.getRepositoryForWorkspace<MessageChannelWorkspaceEntity>( const messageChannelRepository =
activeWorkspace.id, await this.twentyORMGlobalManager.getRepositoryForWorkspace<MessageChannelWorkspaceEntity>(
'messageChannel', activeWorkspace.id,
); 'messageChannel',
);
const messageChannels = await messageChannelRepository.find(); const messageChannels = await messageChannelRepository.find({
where: {
isSyncEnabled: true,
syncStage: In([
MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING,
MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING,
]),
},
});
for (const messageChannel of messageChannels) { for (const messageChannel of messageChannels) {
if (
(messageChannel.isSyncEnabled &&
messageChannel.syncStage ===
MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING) ||
messageChannel.syncStage ===
MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING
) {
await this.messageQueueService.add<MessagingMessageListFetchJobData>( await this.messageQueueService.add<MessagingMessageListFetchJobData>(
MessagingMessageListFetchJob.name, MessagingMessageListFetchJob.name,
{ {
@ -66,6 +70,12 @@ export class MessagingMessageListFetchCronJob {
}, },
); );
} }
} catch (error) {
this.exceptionHandlerService.captureExceptions([error], {
user: {
workspaceId: activeWorkspace.id,
},
});
} }
} }

View File

@ -6,6 +6,7 @@ import {
Workspace, Workspace,
WorkspaceActivationStatus, WorkspaceActivationStatus,
} from 'src/engine/core-modules/workspace/workspace.entity'; } from 'src/engine/core-modules/workspace/workspace.entity';
import { ExceptionHandlerService } from 'src/engine/integrations/exception-handler/exception-handler.service';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
@ -29,11 +30,13 @@ export class MessagingMessagesImportCronJob {
@InjectMessageQueue(MessageQueue.messagingQueue) @InjectMessageQueue(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService, private readonly messageQueueService: MessageQueueService,
private readonly twentyORMGlobalManager: TwentyORMGlobalManager, private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
private readonly exceptionHandlerService: ExceptionHandlerService,
) {} ) {}
@Process(MessagingMessagesImportCronJob.name) @Process(MessagingMessagesImportCronJob.name)
async handle(): Promise<void> { async handle(): Promise<void> {
console.time('MessagingMessagesImportCronJob time'); console.time('MessagingMessagesImportCronJob time');
const activeWorkspaces = await this.workspaceRepository.find({ const activeWorkspaces = await this.workspaceRepository.find({
where: { where: {
activationStatus: WorkspaceActivationStatus.ACTIVE, activationStatus: WorkspaceActivationStatus.ACTIVE,
@ -41,27 +44,35 @@ export class MessagingMessagesImportCronJob {
}); });
for (const activeWorkspace of activeWorkspaces) { for (const activeWorkspace of activeWorkspaces) {
const messageChannelRepository = try {
await this.twentyORMGlobalManager.getRepositoryForWorkspace<MessageChannelWorkspaceEntity>( const messageChannelRepository =
activeWorkspace.id, await this.twentyORMGlobalManager.getRepositoryForWorkspace<MessageChannelWorkspaceEntity>(
'messageChannel', activeWorkspace.id,
); 'messageChannel',
);
const messageChannels = await messageChannelRepository.find({ const messageChannels = await messageChannelRepository.find({
where: { where: {
isSyncEnabled: true, isSyncEnabled: true,
syncStage: MessageChannelSyncStage.MESSAGES_IMPORT_PENDING, syncStage: MessageChannelSyncStage.MESSAGES_IMPORT_PENDING,
},
});
for (const messageChannel of messageChannels) {
await this.messageQueueService.add<MessagingMessagesImportJobData>(
MessagingMessagesImportJob.name,
{
workspaceId: activeWorkspace.id,
messageChannelId: messageChannel.id,
}, },
); });
for (const messageChannel of messageChannels) {
await this.messageQueueService.add<MessagingMessagesImportJobData>(
MessagingMessagesImportJob.name,
{
workspaceId: activeWorkspace.id,
messageChannelId: messageChannel.id,
},
);
}
} catch (error) {
this.exceptionHandlerService.captureExceptions([error], {
user: {
workspaceId: activeWorkspace.id,
},
});
} }
} }

View File

@ -6,6 +6,7 @@ import {
Workspace, Workspace,
WorkspaceActivationStatus, WorkspaceActivationStatus,
} from 'src/engine/core-modules/workspace/workspace.entity'; } from 'src/engine/core-modules/workspace/workspace.entity';
import { ExceptionHandlerService } from 'src/engine/integrations/exception-handler/exception-handler.service';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
@ -23,6 +24,7 @@ export class MessagingOngoingStaleCronJob {
private readonly workspaceRepository: Repository<Workspace>, private readonly workspaceRepository: Repository<Workspace>,
@InjectMessageQueue(MessageQueue.messagingQueue) @InjectMessageQueue(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService, private readonly messageQueueService: MessageQueueService,
private readonly exceptionHandlerService: ExceptionHandlerService,
) {} ) {}
@Process(MessagingOngoingStaleCronJob.name) @Process(MessagingOngoingStaleCronJob.name)
@ -34,12 +36,20 @@ export class MessagingOngoingStaleCronJob {
}); });
for (const activeWorkspace of activeWorkspaces) { for (const activeWorkspace of activeWorkspaces) {
await this.messageQueueService.add<MessagingOngoingStaleJobData>( try {
MessagingOngoingStaleJob.name, await this.messageQueueService.add<MessagingOngoingStaleJobData>(
{ MessagingOngoingStaleJob.name,
workspaceId: activeWorkspace.id, {
}, workspaceId: activeWorkspace.id,
); },
);
} catch (error) {
this.exceptionHandlerService.captureExceptions([error], {
user: {
workspaceId: activeWorkspace.id,
},
});
}
} }
} }
} }