From 361446d79ce0b50691d2a82744beae51f4aca1c1 Mon Sep 17 00:00:00 2001 From: martmull Date: Tue, 9 Jan 2024 12:23:45 +0100 Subject: [PATCH] Add cron mechanism (#3318) * Add cron to message queue interfaces * Add command to launch cron job * Add command to stop cron job * Update clean inactive workspaces job * Isolate cron mechanism * Code review returns * Remove useless object.assign * Add MessageQueuCronJobData interface * Rename cron job utils * Fix typing --- .../message-queue/drivers/bullmq.driver.ts | 40 +++++++++++++++++-- .../message-queue-driver.interface.ts | 8 ++++ .../message-queue/drivers/pg-boss.driver.ts | 24 +++++++++++ .../message-queue/drivers/sync.driver.ts | 28 ++++++++++++- .../interfaces/message-queue-job.interface.ts | 6 +++ .../integrations/message-queue/jobs.module.ts | 2 + .../message-queue/message-queue.constants.ts | 1 + .../message-queue/message-queue.module.ts | 8 +--- .../services/message-queue.service.ts | 13 ++++++ 9 files changed, 119 insertions(+), 11 deletions(-) diff --git a/packages/twenty-server/src/integrations/message-queue/drivers/bullmq.driver.ts b/packages/twenty-server/src/integrations/message-queue/drivers/bullmq.driver.ts index 56313e6e9..f1106b760 100644 --- a/packages/twenty-server/src/integrations/message-queue/drivers/bullmq.driver.ts +++ b/packages/twenty-server/src/integrations/message-queue/drivers/bullmq.driver.ts @@ -49,6 +49,39 @@ export class BullMQDriver implements MessageQueueDriver { this.workerMap[queueName] = worker; } + async addCron( + queueName: MessageQueue, + jobName: string, + data: T, + pattern: string, + options?: QueueJobOptions, + ): Promise { + if (!this.queueMap[queueName]) { + throw new Error( + `Queue ${queueName} is not registered, make sure you have added it as a queue provider`, + ); + } + const queueOptions = { + jobId: options?.id, + priority: options?.priority, + repeat: { + pattern, + }, + }; + + await this.queueMap[queueName].add(jobName, data, queueOptions); + } + + async removeCron( + queueName: MessageQueue, + jobName: string, + pattern: string, + ): Promise { + await this.queueMap[queueName].removeRepeatable(jobName, { + pattern, + }); + } + async add( queueName: MessageQueue, jobName: string, @@ -60,9 +93,8 @@ export class BullMQDriver implements MessageQueueDriver { `Queue ${queueName} is not registered, make sure you have added it as a queue provider`, ); } - await this.queueMap[queueName].add(jobName, data, { - jobId: options?.id, - priority: options?.priority, - }); + const queueOptions = { jobId: options?.id, priority: options?.priority }; + + await this.queueMap[queueName].add(jobName, data, queueOptions); } } diff --git a/packages/twenty-server/src/integrations/message-queue/drivers/interfaces/message-queue-driver.interface.ts b/packages/twenty-server/src/integrations/message-queue/drivers/interfaces/message-queue-driver.interface.ts index 0f710de8b..a0136672a 100644 --- a/packages/twenty-server/src/integrations/message-queue/drivers/interfaces/message-queue-driver.interface.ts +++ b/packages/twenty-server/src/integrations/message-queue/drivers/interfaces/message-queue-driver.interface.ts @@ -14,6 +14,14 @@ export interface MessageQueueDriver { queueName: MessageQueue, handler: ({ data, id }: { data: T; id: string }) => Promise | void, ); + addCron( + queueName: MessageQueue, + jobName: string, + data: T, + pattern: string, + options?: QueueJobOptions, + ); + removeCron(queueName: MessageQueue, jobName: string, pattern?: string); stop?(): Promise; register?(queueName: MessageQueue): void; } diff --git a/packages/twenty-server/src/integrations/message-queue/drivers/pg-boss.driver.ts b/packages/twenty-server/src/integrations/message-queue/drivers/pg-boss.driver.ts index f74649bf0..fc306b6f4 100644 --- a/packages/twenty-server/src/integrations/message-queue/drivers/pg-boss.driver.ts +++ b/packages/twenty-server/src/integrations/message-queue/drivers/pg-boss.driver.ts @@ -30,6 +30,30 @@ export class PgBossDriver implements MessageQueueDriver { return this.pgBoss.work(`${queueName}.*`, handler); } + async addCron( + queueName: MessageQueue, + jobName: string, + data: T, + pattern: string, + options?: QueueJobOptions, + ): Promise { + await this.pgBoss.schedule( + `${queueName}.${jobName}`, + pattern, + data as object, + options + ? { + ...options, + singletonKey: options?.id, + } + : {}, + ); + } + + async removeCron(queueName: MessageQueue, jobName: string): Promise { + await this.pgBoss.unschedule(`${queueName}.${jobName}`); + } + async add( queueName: MessageQueue, jobName: string, diff --git a/packages/twenty-server/src/integrations/message-queue/drivers/sync.driver.ts b/packages/twenty-server/src/integrations/message-queue/drivers/sync.driver.ts index 9c0bb934f..03515a103 100644 --- a/packages/twenty-server/src/integrations/message-queue/drivers/sync.driver.ts +++ b/packages/twenty-server/src/integrations/message-queue/drivers/sync.driver.ts @@ -1,7 +1,9 @@ import { ModuleRef } from '@nestjs/core'; +import { Logger } from '@nestjs/common'; import { MessageQueueDriver } from 'src/integrations/message-queue/drivers/interfaces/message-queue-driver.interface'; import { + MessageQueueCronJobData, MessageQueueJob, MessageQueueJobData, } from 'src/integrations/message-queue/interfaces/message-queue-job.interface'; @@ -10,6 +12,7 @@ import { MessageQueue } from 'src/integrations/message-queue/message-queue.const import { getJobClassName } from 'src/integrations/message-queue/utils/get-job-class-name.util'; export class SyncDriver implements MessageQueueDriver { + private readonly logger = new Logger(SyncDriver.name); constructor(private readonly jobsModuleRef: ModuleRef) {} async add( @@ -23,7 +26,30 @@ export class SyncDriver implements MessageQueueDriver { { strict: true }, ); - return await job.handle(data); + await job.handle(data); + } + + async addCron( + _queueName: MessageQueue, + jobName: string, + data: T, + pattern: string, + ): Promise { + this.logger.log(`Running '${pattern}' cron job with SyncDriver`); + + const jobClassName = getJobClassName(jobName); + const job: MessageQueueCronJobData = + this.jobsModuleRef.get(jobClassName, { + strict: true, + }); + + await job.handle(data); + } + + async removeCron(_queueName: MessageQueue, jobName: string) { + this.logger.log(`Removing '${jobName}' cron job with SyncDriver`); + + return; } work() { diff --git a/packages/twenty-server/src/integrations/message-queue/interfaces/message-queue-job.interface.ts b/packages/twenty-server/src/integrations/message-queue/interfaces/message-queue-job.interface.ts index 9328cb1bd..539c2d2ff 100644 --- a/packages/twenty-server/src/integrations/message-queue/interfaces/message-queue-job.interface.ts +++ b/packages/twenty-server/src/integrations/message-queue/interfaces/message-queue-job.interface.ts @@ -2,6 +2,12 @@ export interface MessageQueueJob { handle(data: T): Promise | void; } +export interface MessageQueueCronJobData< + T extends MessageQueueJobData | undefined, +> { + handle(data: T): Promise | void; +} + export interface MessageQueueJobData { [key: string]: any; } diff --git a/packages/twenty-server/src/integrations/message-queue/jobs.module.ts b/packages/twenty-server/src/integrations/message-queue/jobs.module.ts index 10ce3c776..345c4a418 100644 --- a/packages/twenty-server/src/integrations/message-queue/jobs.module.ts +++ b/packages/twenty-server/src/integrations/message-queue/jobs.module.ts @@ -8,6 +8,7 @@ import { CallWebhookJob } from 'src/workspace/workspace-query-runner/jobs/call-w import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/workspace-datasource.module'; import { ObjectMetadataModule } from 'src/metadata/object-metadata/object-metadata.module'; import { DataSourceModule } from 'src/metadata/data-source/data-source.module'; +import { TypeORMModule } from 'src/database/typeorm/typeorm.module'; import { FetchWorkspaceMessagesModule } from 'src/workspace/messaging/services/fetch-workspace-messages.module'; @Module({ @@ -16,6 +17,7 @@ import { FetchWorkspaceMessagesModule } from 'src/workspace/messaging/services/f ObjectMetadataModule, DataSourceModule, HttpModule, + TypeORMModule, FetchWorkspaceMessagesModule, ], providers: [ diff --git a/packages/twenty-server/src/integrations/message-queue/message-queue.constants.ts b/packages/twenty-server/src/integrations/message-queue/message-queue.constants.ts index 47d2bad68..925407e98 100644 --- a/packages/twenty-server/src/integrations/message-queue/message-queue.constants.ts +++ b/packages/twenty-server/src/integrations/message-queue/message-queue.constants.ts @@ -4,4 +4,5 @@ export enum MessageQueue { taskAssignedQueue = 'task-assigned-queue', messagingQueue = 'messaging-queue', webhookQueue = 'webhook-queue', + cronQueue = 'cron-queue', } diff --git a/packages/twenty-server/src/integrations/message-queue/message-queue.module.ts b/packages/twenty-server/src/integrations/message-queue/message-queue.module.ts index dfbdcb1ee..29f86895b 100644 --- a/packages/twenty-server/src/integrations/message-queue/message-queue.module.ts +++ b/packages/twenty-server/src/integrations/message-queue/message-queue.module.ts @@ -7,8 +7,8 @@ import { MessageQueueModuleAsyncOptions, } from 'src/integrations/message-queue/interfaces'; import { - QUEUE_DRIVER, MessageQueue, + QUEUE_DRIVER, } from 'src/integrations/message-queue/message-queue.constants'; import { PgBossDriver } from 'src/integrations/message-queue/drivers/pg-boss.driver'; import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service'; @@ -55,11 +55,7 @@ export class MessageQueueModule { module: MessageQueueModule, imports: [JobsModule, ...(options.imports || [])], providers, - exports: [ - MessageQueue.taskAssignedQueue, - MessageQueue.messagingQueue, - MessageQueue.webhookQueue, - ], + exports: Object.values(MessageQueue), }; } } diff --git a/packages/twenty-server/src/integrations/message-queue/services/message-queue.service.ts b/packages/twenty-server/src/integrations/message-queue/services/message-queue.service.ts index 4c5402f5c..44c2d5121 100644 --- a/packages/twenty-server/src/integrations/message-queue/services/message-queue.service.ts +++ b/packages/twenty-server/src/integrations/message-queue/services/message-queue.service.ts @@ -34,6 +34,19 @@ export class MessageQueueService implements OnModuleDestroy { return this.driver.add(this.queueName, jobName, data, options); } + addCron( + jobName: string, + data: T, + pattern: string, + options?: QueueJobOptions, + ): Promise { + return this.driver.addCron(this.queueName, jobName, data, pattern, options); + } + + removeCron(jobName: string, pattern: string): Promise { + return this.driver.removeCron(this.queueName, jobName, pattern); + } + work( handler: ({ data, id }: { data: T; id: string }) => Promise | void, ) {