@ -0,0 +1,32 @@
|
|||||||
|
import { Command, CommandRunner } from 'nest-commander';
|
||||||
|
|
||||||
|
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
|
||||||
|
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
|
||||||
|
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
|
||||||
|
import { MessagingOngoingStaleCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-ongoing-stale.cron.job';
|
||||||
|
|
||||||
|
const MESSAGING_ONGOING_STALE_CRON_PATTERN = '0 * * * *';
|
||||||
|
|
||||||
|
@Command({
|
||||||
|
name: 'cron:messaging:ongoing-stale',
|
||||||
|
description:
|
||||||
|
'Starts a cron job to check for stale ongoing message imports and put them back to pending',
|
||||||
|
})
|
||||||
|
export class MessagingOngoingStaleCronCommand extends CommandRunner {
|
||||||
|
constructor(
|
||||||
|
@InjectMessageQueue(MessageQueue.cronQueue)
|
||||||
|
private readonly messageQueueService: MessageQueueService,
|
||||||
|
) {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
async run(): Promise<void> {
|
||||||
|
await this.messageQueueService.addCron<undefined>(
|
||||||
|
MessagingOngoingStaleCronJob.name,
|
||||||
|
undefined,
|
||||||
|
{
|
||||||
|
repeat: { pattern: MESSAGING_ONGOING_STALE_CRON_PATTERN },
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1,4 +1,4 @@
|
|||||||
import { Inject, Injectable, Logger } from '@nestjs/common';
|
import { Logger } from '@nestjs/common';
|
||||||
import { InjectRepository } from '@nestjs/typeorm';
|
import { InjectRepository } from '@nestjs/typeorm';
|
||||||
|
|
||||||
import { Repository, In } from 'typeorm';
|
import { Repository, In } from 'typeorm';
|
||||||
|
|||||||
@ -0,0 +1,62 @@
|
|||||||
|
import { InjectRepository } from '@nestjs/typeorm';
|
||||||
|
|
||||||
|
import { Repository, In } from 'typeorm';
|
||||||
|
|
||||||
|
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
|
||||||
|
import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity';
|
||||||
|
import { EnvironmentService } from 'src/engine/integrations/environment/environment.service';
|
||||||
|
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
|
||||||
|
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
|
||||||
|
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
|
||||||
|
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
|
||||||
|
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
|
||||||
|
import {
|
||||||
|
MessagingOngoingStaleJobData,
|
||||||
|
MessagingOngoingStaleJob,
|
||||||
|
} from 'src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job';
|
||||||
|
|
||||||
|
@Processor(MessageQueue.cronQueue)
|
||||||
|
export class MessagingOngoingStaleCronJob {
|
||||||
|
constructor(
|
||||||
|
@InjectRepository(Workspace, 'core')
|
||||||
|
private readonly workspaceRepository: Repository<Workspace>,
|
||||||
|
@InjectRepository(DataSourceEntity, 'metadata')
|
||||||
|
private readonly dataSourceRepository: Repository<DataSourceEntity>,
|
||||||
|
private readonly environmentService: EnvironmentService,
|
||||||
|
@InjectMessageQueue(MessageQueue.messagingQueue)
|
||||||
|
private readonly messageQueueService: MessageQueueService,
|
||||||
|
) {}
|
||||||
|
|
||||||
|
@Process(MessagingOngoingStaleCronJob.name)
|
||||||
|
async handle(): Promise<void> {
|
||||||
|
const workspaceIds = (
|
||||||
|
await this.workspaceRepository.find({
|
||||||
|
where: this.environmentService.get('IS_BILLING_ENABLED')
|
||||||
|
? {
|
||||||
|
subscriptionStatus: In(['active', 'trialing', 'past_due']),
|
||||||
|
}
|
||||||
|
: {},
|
||||||
|
select: ['id'],
|
||||||
|
})
|
||||||
|
).map((workspace) => workspace.id);
|
||||||
|
|
||||||
|
const dataSources = await this.dataSourceRepository.find({
|
||||||
|
where: {
|
||||||
|
workspaceId: In(workspaceIds),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const workspaceIdsWithDataSources = new Set(
|
||||||
|
dataSources.map((dataSource) => dataSource.workspaceId),
|
||||||
|
);
|
||||||
|
|
||||||
|
for (const workspaceId of workspaceIdsWithDataSources) {
|
||||||
|
await this.messageQueueService.add<MessagingOngoingStaleJobData>(
|
||||||
|
MessagingOngoingStaleJob.name,
|
||||||
|
{
|
||||||
|
workspaceId,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,74 @@
|
|||||||
|
import { Logger, Scope } from '@nestjs/common';
|
||||||
|
|
||||||
|
import { In } from 'typeorm';
|
||||||
|
|
||||||
|
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
|
||||||
|
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
|
||||||
|
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
|
||||||
|
import {
|
||||||
|
MessageChannelSyncStage,
|
||||||
|
MessageChannelWorkspaceEntity,
|
||||||
|
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
|
||||||
|
import { isSyncStale } from 'src/modules/messaging/message-import-manager/utils/is-sync-stale.util';
|
||||||
|
import { WorkspaceRepository } from 'src/engine/twenty-orm/repository/workspace.repository';
|
||||||
|
import { InjectWorkspaceRepository } from 'src/engine/twenty-orm/decorators/inject-workspace-repository.decorator';
|
||||||
|
import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service';
|
||||||
|
|
||||||
|
export type MessagingOngoingStaleJobData = {
|
||||||
|
workspaceId: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
@Processor({
|
||||||
|
queueName: MessageQueue.messagingQueue,
|
||||||
|
scope: Scope.REQUEST,
|
||||||
|
})
|
||||||
|
export class MessagingOngoingStaleJob {
|
||||||
|
private readonly logger = new Logger(MessagingOngoingStaleJob.name);
|
||||||
|
constructor(
|
||||||
|
@InjectWorkspaceRepository(MessageChannelWorkspaceEntity)
|
||||||
|
private readonly messageChannelRepository: WorkspaceRepository<MessageChannelWorkspaceEntity>,
|
||||||
|
private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService,
|
||||||
|
) {}
|
||||||
|
|
||||||
|
@Process(MessagingOngoingStaleJob.name)
|
||||||
|
async handle(data: MessagingOngoingStaleJobData): Promise<void> {
|
||||||
|
const { workspaceId } = data;
|
||||||
|
|
||||||
|
const messageChannels = await this.messageChannelRepository.find({
|
||||||
|
where: {
|
||||||
|
syncStage: In([
|
||||||
|
MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING,
|
||||||
|
MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING,
|
||||||
|
]),
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
for (const messageChannel of messageChannels) {
|
||||||
|
if (
|
||||||
|
messageChannel.syncStageStartedAt &&
|
||||||
|
isSyncStale(messageChannel.syncStageStartedAt)
|
||||||
|
) {
|
||||||
|
this.logger.log(
|
||||||
|
`Sync for message channel ${messageChannel.id} and workspace ${workspaceId} is stale. Setting sync stage to MESSAGES_IMPORT_PENDING`,
|
||||||
|
);
|
||||||
|
|
||||||
|
switch (messageChannel.syncStage) {
|
||||||
|
case MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING:
|
||||||
|
await this.messagingChannelSyncStatusService.schedulePartialMessageListFetch(
|
||||||
|
messageChannel.id,
|
||||||
|
workspaceId,
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
case MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING:
|
||||||
|
await this.messagingChannelSyncStatusService.scheduleMessagesImport(
|
||||||
|
messageChannel.id,
|
||||||
|
workspaceId,
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -3,16 +3,21 @@ import { TypeOrmModule } from '@nestjs/typeorm';
|
|||||||
|
|
||||||
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
|
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
|
||||||
import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity';
|
import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity';
|
||||||
|
import { TwentyORMModule } from 'src/engine/twenty-orm/twenty-orm.module';
|
||||||
import { MessagingCommonModule } from 'src/modules/messaging/common/messaging-common.module';
|
import { MessagingCommonModule } from 'src/modules/messaging/common/messaging-common.module';
|
||||||
|
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
|
||||||
import { MessagingSingleMessageImportCommand } from 'src/modules/messaging/message-import-manager/commands/messaging-single-message-import.command';
|
import { MessagingSingleMessageImportCommand } from 'src/modules/messaging/message-import-manager/commands/messaging-single-message-import.command';
|
||||||
import { MessagingMessageListFetchCronCommand } from 'src/modules/messaging/message-import-manager/crons/commands/messaging-message-list-fetch.cron.command';
|
import { MessagingMessageListFetchCronCommand } from 'src/modules/messaging/message-import-manager/crons/commands/messaging-message-list-fetch.cron.command';
|
||||||
import { MessagingMessagesImportCronCommand } from 'src/modules/messaging/message-import-manager/crons/commands/messaging-messages-import.cron.command';
|
import { MessagingMessagesImportCronCommand } from 'src/modules/messaging/message-import-manager/crons/commands/messaging-messages-import.cron.command';
|
||||||
|
import { MessagingOngoingStaleCronCommand } from 'src/modules/messaging/message-import-manager/crons/commands/messaging-ongoing-stale.cron.command';
|
||||||
import { MessagingMessageListFetchCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job';
|
import { MessagingMessageListFetchCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job';
|
||||||
import { MessagingMessagesImportCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job';
|
import { MessagingMessagesImportCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job';
|
||||||
|
import { MessagingOngoingStaleCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-ongoing-stale.cron.job';
|
||||||
import { MessagingGmailDriverModule } from 'src/modules/messaging/message-import-manager/drivers/gmail/messaging-gmail-driver.module';
|
import { MessagingGmailDriverModule } from 'src/modules/messaging/message-import-manager/drivers/gmail/messaging-gmail-driver.module';
|
||||||
import { MessagingAddSingleMessageToCacheForImportJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-add-single-message-to-cache-for-import.job';
|
import { MessagingAddSingleMessageToCacheForImportJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-add-single-message-to-cache-for-import.job';
|
||||||
import { MessagingMessageListFetchJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job';
|
import { MessagingMessageListFetchJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job';
|
||||||
import { MessagingMessagesImportJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job';
|
import { MessagingMessagesImportJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job';
|
||||||
|
import { MessagingOngoingStaleJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job';
|
||||||
|
|
||||||
@Module({
|
@Module({
|
||||||
imports: [
|
imports: [
|
||||||
@ -20,15 +25,19 @@ import { MessagingMessagesImportJob } from 'src/modules/messaging/message-import
|
|||||||
MessagingCommonModule,
|
MessagingCommonModule,
|
||||||
TypeOrmModule.forFeature([Workspace], 'core'),
|
TypeOrmModule.forFeature([Workspace], 'core'),
|
||||||
TypeOrmModule.forFeature([DataSourceEntity], 'metadata'),
|
TypeOrmModule.forFeature([DataSourceEntity], 'metadata'),
|
||||||
|
TwentyORMModule.forFeature([MessageChannelWorkspaceEntity]),
|
||||||
],
|
],
|
||||||
providers: [
|
providers: [
|
||||||
MessagingMessageListFetchCronCommand,
|
MessagingMessageListFetchCronCommand,
|
||||||
MessagingMessagesImportCronCommand,
|
MessagingMessagesImportCronCommand,
|
||||||
|
MessagingOngoingStaleCronCommand,
|
||||||
MessagingSingleMessageImportCommand,
|
MessagingSingleMessageImportCommand,
|
||||||
MessagingMessageListFetchJob,
|
MessagingMessageListFetchJob,
|
||||||
MessagingMessagesImportJob,
|
MessagingMessagesImportJob,
|
||||||
|
MessagingOngoingStaleJob,
|
||||||
MessagingMessageListFetchCronJob,
|
MessagingMessageListFetchCronJob,
|
||||||
MessagingMessagesImportCronJob,
|
MessagingMessagesImportCronJob,
|
||||||
|
MessagingOngoingStaleCronJob,
|
||||||
MessagingAddSingleMessageToCacheForImportJob,
|
MessagingAddSingleMessageToCacheForImportJob,
|
||||||
],
|
],
|
||||||
exports: [],
|
exports: [],
|
||||||
|
|||||||
@ -0,0 +1,34 @@
|
|||||||
|
import { MESSAGING_IMPORT_ONGOING_SYNC_TIMEOUT } from 'src/modules/messaging/message-import-manager/constants/messaging-import-ongoing-sync-timeout.constant';
|
||||||
|
import { isSyncStale } from 'src/modules/messaging/message-import-manager/utils/is-sync-stale.util';
|
||||||
|
|
||||||
|
jest.useFakeTimers().setSystemTime(new Date('2024-01-01'));
|
||||||
|
|
||||||
|
describe('isSyncStale', () => {
|
||||||
|
it('should return true if sync is stale', () => {
|
||||||
|
const syncStageStartedAt = new Date(
|
||||||
|
Date.now() - MESSAGING_IMPORT_ONGOING_SYNC_TIMEOUT - 1,
|
||||||
|
).toISOString();
|
||||||
|
|
||||||
|
const result = isSyncStale(syncStageStartedAt);
|
||||||
|
|
||||||
|
expect(result).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return false if sync is not stale', () => {
|
||||||
|
const syncStageStartedAt = new Date(
|
||||||
|
Date.now() - MESSAGING_IMPORT_ONGOING_SYNC_TIMEOUT + 1,
|
||||||
|
).toISOString();
|
||||||
|
|
||||||
|
const result = isSyncStale(syncStageStartedAt);
|
||||||
|
|
||||||
|
expect(result).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return false if syncStageStartedAt is invalid', () => {
|
||||||
|
const syncStageStartedAt = 'invalid-date';
|
||||||
|
|
||||||
|
expect(() => {
|
||||||
|
isSyncStale(syncStageStartedAt);
|
||||||
|
}).toThrow('Invalid date format');
|
||||||
|
});
|
||||||
|
});
|
||||||
@ -0,0 +1,13 @@
|
|||||||
|
import { MESSAGING_IMPORT_ONGOING_SYNC_TIMEOUT } from 'src/modules/messaging/message-import-manager/constants/messaging-import-ongoing-sync-timeout.constant';
|
||||||
|
|
||||||
|
export const isSyncStale = (syncStageStartedAt: string): boolean => {
|
||||||
|
const syncStageStartedTime = new Date(syncStageStartedAt).getTime();
|
||||||
|
|
||||||
|
if (isNaN(syncStageStartedTime)) {
|
||||||
|
throw new Error('Invalid date format');
|
||||||
|
}
|
||||||
|
|
||||||
|
return (
|
||||||
|
Date.now() - syncStageStartedTime > MESSAGING_IMPORT_ONGOING_SYNC_TIMEOUT
|
||||||
|
);
|
||||||
|
};
|
||||||
Reference in New Issue
Block a user