5617 Create CalendarOngoingStaleCron Job (#6748)
Closes #5617 --------- Co-authored-by: Charles Bochet <charles@twenty.com>
This commit is contained in:
@ -10,9 +10,12 @@ import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/works
|
|||||||
import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity';
|
import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity';
|
||||||
import { CalendarEventCleanerModule } from 'src/modules/calendar/calendar-event-cleaner/calendar-event-cleaner.module';
|
import { CalendarEventCleanerModule } from 'src/modules/calendar/calendar-event-cleaner/calendar-event-cleaner.module';
|
||||||
import { CalendarEventListFetchCronCommand } from 'src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-event-list-fetch.cron.command';
|
import { CalendarEventListFetchCronCommand } from 'src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-event-list-fetch.cron.command';
|
||||||
|
import { CalendarOngoingStaleCronCommand } from 'src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-ongoing-stale.cron.command';
|
||||||
import { CalendarEventListFetchCronJob } from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-event-list-fetch.cron.job';
|
import { CalendarEventListFetchCronJob } from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-event-list-fetch.cron.job';
|
||||||
|
import { CalendarOngoingStaleCronJob } from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-ongoing-stale.cron.job';
|
||||||
import { GoogleCalendarDriverModule } from 'src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/google-calendar-driver.module';
|
import { GoogleCalendarDriverModule } from 'src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/google-calendar-driver.module';
|
||||||
import { CalendarEventListFetchJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-event-list-fetch.job';
|
import { CalendarEventListFetchJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-event-list-fetch.job';
|
||||||
|
import { CalendarOngoingStaleJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-ongoing-stale.job';
|
||||||
import { CalendarChannelSyncStatusService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-channel-sync-status.service';
|
import { CalendarChannelSyncStatusService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-channel-sync-status.service';
|
||||||
import { CalendarEventImportErrorHandlerService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-event-import-exception-handler.service';
|
import { CalendarEventImportErrorHandlerService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-event-import-exception-handler.service';
|
||||||
import { CalendarEventsImportService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service';
|
import { CalendarEventsImportService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service';
|
||||||
@ -51,6 +54,9 @@ import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/sta
|
|||||||
CalendarEventListFetchCronJob,
|
CalendarEventListFetchCronJob,
|
||||||
CalendarEventListFetchCronCommand,
|
CalendarEventListFetchCronCommand,
|
||||||
CalendarEventListFetchJob,
|
CalendarEventListFetchJob,
|
||||||
|
CalendarOngoingStaleCronJob,
|
||||||
|
CalendarOngoingStaleCronCommand,
|
||||||
|
CalendarOngoingStaleJob,
|
||||||
],
|
],
|
||||||
exports: [CalendarEventsImportService],
|
exports: [CalendarEventsImportService],
|
||||||
})
|
})
|
||||||
|
|||||||
@ -0,0 +1 @@
|
|||||||
|
export const CALENDAR_IMPORT_ONGOING_SYNC_TIMEOUT = 1000 * 60 * 60; // 1 hour
|
||||||
@ -0,0 +1,32 @@
|
|||||||
|
import { Command, CommandRunner } from 'nest-commander';
|
||||||
|
|
||||||
|
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
|
||||||
|
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
|
||||||
|
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
|
||||||
|
import { CalendarOngoingStaleCronJob } from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-ongoing-stale.cron.job';
|
||||||
|
|
||||||
|
const CALENDAR_ONGOING_STALE_CRON_PATTERN = '0 * * * *';
|
||||||
|
|
||||||
|
@Command({
|
||||||
|
name: 'cron:calendar:ongoing-stale',
|
||||||
|
description:
|
||||||
|
'Starts a cron job to check for stale ongoing calendar event imports and put them back to pending',
|
||||||
|
})
|
||||||
|
export class CalendarOngoingStaleCronCommand extends CommandRunner {
|
||||||
|
constructor(
|
||||||
|
@InjectMessageQueue(MessageQueue.cronQueue)
|
||||||
|
private readonly messageQueueService: MessageQueueService,
|
||||||
|
) {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
async run(): Promise<void> {
|
||||||
|
await this.messageQueueService.addCron<undefined>(
|
||||||
|
CalendarOngoingStaleCronJob.name,
|
||||||
|
undefined,
|
||||||
|
{
|
||||||
|
repeat: { pattern: CALENDAR_ONGOING_STALE_CRON_PATTERN },
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,55 @@
|
|||||||
|
import { InjectRepository } from '@nestjs/typeorm';
|
||||||
|
|
||||||
|
import { Repository } from 'typeorm';
|
||||||
|
|
||||||
|
import {
|
||||||
|
Workspace,
|
||||||
|
WorkspaceActivationStatus,
|
||||||
|
} from 'src/engine/core-modules/workspace/workspace.entity';
|
||||||
|
import { ExceptionHandlerService } from 'src/engine/integrations/exception-handler/exception-handler.service';
|
||||||
|
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
|
||||||
|
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
|
||||||
|
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
|
||||||
|
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
|
||||||
|
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
|
||||||
|
import {
|
||||||
|
CalendarOngoingStaleJob,
|
||||||
|
CalendarOngoingStaleJobData,
|
||||||
|
} from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-ongoing-stale.job';
|
||||||
|
|
||||||
|
@Processor(MessageQueue.cronQueue)
|
||||||
|
export class CalendarOngoingStaleCronJob {
|
||||||
|
constructor(
|
||||||
|
@InjectRepository(Workspace, 'core')
|
||||||
|
private readonly workspaceRepository: Repository<Workspace>,
|
||||||
|
@InjectMessageQueue(MessageQueue.calendarQueue)
|
||||||
|
private readonly messageQueueService: MessageQueueService,
|
||||||
|
private readonly exceptionHandlerService: ExceptionHandlerService,
|
||||||
|
) {}
|
||||||
|
|
||||||
|
@Process(CalendarOngoingStaleCronJob.name)
|
||||||
|
async handle(): Promise<void> {
|
||||||
|
const activeWorkspaces = await this.workspaceRepository.find({
|
||||||
|
where: {
|
||||||
|
activationStatus: WorkspaceActivationStatus.ACTIVE,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
for (const activeWorkspace of activeWorkspaces) {
|
||||||
|
try {
|
||||||
|
await this.messageQueueService.add<CalendarOngoingStaleJobData>(
|
||||||
|
CalendarOngoingStaleJob.name,
|
||||||
|
{
|
||||||
|
workspaceId: activeWorkspace.id,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
} catch (error) {
|
||||||
|
this.exceptionHandlerService.captureExceptions([error], {
|
||||||
|
user: {
|
||||||
|
workspaceId: activeWorkspace.id,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,78 @@
|
|||||||
|
import { Logger, Scope } from '@nestjs/common';
|
||||||
|
|
||||||
|
import { In } from 'typeorm';
|
||||||
|
|
||||||
|
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
|
||||||
|
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
|
||||||
|
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
|
||||||
|
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
|
||||||
|
import { CalendarChannelSyncStatusService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-channel-sync-status.service';
|
||||||
|
import { isSyncStale } from 'src/modules/calendar/calendar-event-import-manager/utils/is-sync-stale.util';
|
||||||
|
import {
|
||||||
|
CalendarChannelSyncStage,
|
||||||
|
CalendarChannelWorkspaceEntity,
|
||||||
|
} from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity';
|
||||||
|
|
||||||
|
export type CalendarOngoingStaleJobData = {
|
||||||
|
workspaceId: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
@Processor({
|
||||||
|
queueName: MessageQueue.messagingQueue,
|
||||||
|
scope: Scope.REQUEST,
|
||||||
|
})
|
||||||
|
export class CalendarOngoingStaleJob {
|
||||||
|
private readonly logger = new Logger(CalendarOngoingStaleJob.name);
|
||||||
|
constructor(
|
||||||
|
private readonly twentyORMManager: TwentyORMManager,
|
||||||
|
private readonly calendarChannelSyncStatusService: CalendarChannelSyncStatusService,
|
||||||
|
) {}
|
||||||
|
|
||||||
|
@Process(CalendarOngoingStaleJob.name)
|
||||||
|
async handle(data: CalendarOngoingStaleJobData): Promise<void> {
|
||||||
|
const { workspaceId } = data;
|
||||||
|
|
||||||
|
const calendarChannelRepository =
|
||||||
|
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
|
||||||
|
'calendarChannel',
|
||||||
|
);
|
||||||
|
|
||||||
|
const calendarChannels = await calendarChannelRepository.find({
|
||||||
|
where: {
|
||||||
|
syncStage: In([
|
||||||
|
CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_ONGOING,
|
||||||
|
CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_ONGOING,
|
||||||
|
]),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
for (const calendarChannel of calendarChannels) {
|
||||||
|
if (
|
||||||
|
calendarChannel.syncStageStartedAt &&
|
||||||
|
isSyncStale(calendarChannel.syncStageStartedAt)
|
||||||
|
) {
|
||||||
|
this.logger.log(
|
||||||
|
`Sync for calendar channel ${calendarChannel.id} and workspace ${workspaceId} is stale. Setting sync stage to pending`,
|
||||||
|
);
|
||||||
|
await this.calendarChannelSyncStatusService.resetSyncStageStartedAt(
|
||||||
|
calendarChannel.id,
|
||||||
|
);
|
||||||
|
|
||||||
|
switch (calendarChannel.syncStage) {
|
||||||
|
case CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_ONGOING:
|
||||||
|
await this.calendarChannelSyncStatusService.schedulePartialCalendarEventListFetch(
|
||||||
|
calendarChannel.id,
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
case CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_ONGOING:
|
||||||
|
await this.calendarChannelSyncStatusService.scheduleCalendarEventsImport(
|
||||||
|
calendarChannel.id,
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -87,6 +87,17 @@ export class CalendarChannelSyncStatusService {
|
|||||||
await this.scheduleFullCalendarEventListFetch(calendarChannelId);
|
await this.scheduleFullCalendarEventListFetch(calendarChannelId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public async resetSyncStageStartedAt(calendarChannelId: string) {
|
||||||
|
const calendarChannelRepository =
|
||||||
|
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
|
||||||
|
'calendarChannel',
|
||||||
|
);
|
||||||
|
|
||||||
|
await calendarChannelRepository.update(calendarChannelId, {
|
||||||
|
syncStageStartedAt: null,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
public async scheduleCalendarEventsImport(calendarChannelId: string) {
|
public async scheduleCalendarEventsImport(calendarChannelId: string) {
|
||||||
const calendarChannelRepository =
|
const calendarChannelRepository =
|
||||||
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
|
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
|
||||||
|
|||||||
@ -0,0 +1,13 @@
|
|||||||
|
import { CALENDAR_IMPORT_ONGOING_SYNC_TIMEOUT } from 'src/modules/calendar/calendar-event-import-manager/constants/calendar-import-ongoing-sync-timeout.constant';
|
||||||
|
|
||||||
|
export const isSyncStale = (syncStageStartedAt: string): boolean => {
|
||||||
|
const syncStageStartedTime = new Date(syncStageStartedAt).getTime();
|
||||||
|
|
||||||
|
if (isNaN(syncStageStartedTime)) {
|
||||||
|
throw new Error('Invalid date format');
|
||||||
|
}
|
||||||
|
|
||||||
|
return (
|
||||||
|
Date.now() - syncStageStartedTime > CALENDAR_IMPORT_ONGOING_SYNC_TIMEOUT
|
||||||
|
);
|
||||||
|
};
|
||||||
@ -93,6 +93,20 @@ export class MessageChannelSyncStatusService {
|
|||||||
await this.scheduleFullMessageListFetch(messageChannelId);
|
await this.scheduleFullMessageListFetch(messageChannelId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public async resetSyncStageStartedAt(messageChannelId: string) {
|
||||||
|
const messageChannelRepository =
|
||||||
|
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
|
||||||
|
'messageChannel',
|
||||||
|
);
|
||||||
|
|
||||||
|
await messageChannelRepository.update(
|
||||||
|
{ id: messageChannelId },
|
||||||
|
{
|
||||||
|
syncStageStartedAt: null,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
public async markAsMessagesListFetchOngoing(messageChannelId: string) {
|
public async markAsMessagesListFetchOngoing(messageChannelId: string) {
|
||||||
const messageChannelRepository =
|
const messageChannelRepository =
|
||||||
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
|
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
|
||||||
|
|||||||
@ -55,6 +55,10 @@ export class MessagingOngoingStaleJob {
|
|||||||
`Sync for message channel ${messageChannel.id} and workspace ${workspaceId} is stale. Setting sync stage to MESSAGES_IMPORT_PENDING`,
|
`Sync for message channel ${messageChannel.id} and workspace ${workspaceId} is stale. Setting sync stage to MESSAGES_IMPORT_PENDING`,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
await this.messagingChannelSyncStatusService.resetSyncStageStartedAt(
|
||||||
|
messageChannel.id,
|
||||||
|
);
|
||||||
|
|
||||||
switch (messageChannel.syncStage) {
|
switch (messageChannel.syncStage) {
|
||||||
case MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING:
|
case MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING:
|
||||||
await this.messageChannelSyncStatusService.schedulePartialMessageListFetch(
|
await this.messageChannelSyncStatusService.schedulePartialMessageListFetch(
|
||||||
|
|||||||
Reference in New Issue
Block a user