From e799c84233d5fe6318d4b5060bdb5b69608950c9 Mon Sep 17 00:00:00 2001 From: Weiko Date: Tue, 19 Dec 2023 13:30:40 +0100 Subject: [PATCH] Add sync driver for queue messages (#3070) * Add sync driver for queue messages * rename moduleRef * use switch instead --- .../environment/environment.service.ts | 2 +- .../message-queue-driver.interface.ts | 5 +++-- .../message-queue/drivers/sync.driver.ts | 21 ++++++++++++++++++ .../interfaces/message-queue.interface.ts | 9 +++++++- .../integrations/message-queue/jobs.module.ts | 19 ++++++++++++++++ .../message-queue.module-factory.ts | 6 +++++ .../message-queue/message-queue.module.ts | 22 +++++++++++++------ .../utils/get-job-class-name.util.ts | 5 +++++ .../twenty-server/src/queue-worker.module.ts | 8 ++----- packages/twenty-server/src/queue-worker.ts | 9 +++----- 10 files changed, 83 insertions(+), 23 deletions(-) create mode 100644 packages/twenty-server/src/integrations/message-queue/drivers/sync.driver.ts create mode 100644 packages/twenty-server/src/integrations/message-queue/jobs.module.ts create mode 100644 packages/twenty-server/src/integrations/message-queue/utils/get-job-class-name.util.ts diff --git a/packages/twenty-server/src/integrations/environment/environment.service.ts b/packages/twenty-server/src/integrations/environment/environment.service.ts index ce3a3f2f3..8f31defd8 100644 --- a/packages/twenty-server/src/integrations/environment/environment.service.ts +++ b/packages/twenty-server/src/integrations/environment/environment.service.ts @@ -148,7 +148,7 @@ export class EnvironmentService { getMessageQueueDriverType(): MessageQueueDriverType { return ( this.configService.get('MESSAGE_QUEUE_TYPE') ?? - MessageQueueDriverType.PgBoss + MessageQueueDriverType.Sync ); } diff --git a/packages/twenty-server/src/integrations/message-queue/drivers/interfaces/message-queue-driver.interface.ts b/packages/twenty-server/src/integrations/message-queue/drivers/interfaces/message-queue-driver.interface.ts index 59c21c36d..0f710de8b 100644 --- a/packages/twenty-server/src/integrations/message-queue/drivers/interfaces/message-queue-driver.interface.ts +++ b/packages/twenty-server/src/integrations/message-queue/drivers/interfaces/message-queue-driver.interface.ts @@ -1,15 +1,16 @@ import { QueueJobOptions } from 'src/integrations/message-queue/drivers/interfaces/job-options.interface'; +import { MessageQueueJobData } from 'src/integrations/message-queue/interfaces/message-queue-job.interface'; import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants'; export interface MessageQueueDriver { - add( + add( queueName: MessageQueue, jobName: string, data: T, options?: QueueJobOptions, ): Promise; - work( + work( queueName: MessageQueue, handler: ({ data, id }: { data: T; id: string }) => Promise | void, ); diff --git a/packages/twenty-server/src/integrations/message-queue/drivers/sync.driver.ts b/packages/twenty-server/src/integrations/message-queue/drivers/sync.driver.ts new file mode 100644 index 000000000..c81e47290 --- /dev/null +++ b/packages/twenty-server/src/integrations/message-queue/drivers/sync.driver.ts @@ -0,0 +1,21 @@ +import { ModuleRef } from "@nestjs/core"; +import { QueueJobOptions } from "src/integrations/message-queue/drivers/interfaces/job-options.interface"; +import { MessageQueueDriver } from "src/integrations/message-queue/drivers/interfaces/message-queue-driver.interface"; +import { MessageQueueJob, MessageQueueJobData } from "src/integrations/message-queue/interfaces/message-queue-job.interface"; +import { MessageQueue } from "src/integrations/message-queue/message-queue.constants"; +import { MessageQueueModule } from "src/integrations/message-queue/message-queue.module"; +import { getJobClassName } from "src/integrations/message-queue/utils/get-job-class-name.util"; +import { QueueWorkerModule } from "src/queue-worker.module"; + +export class SyncDriver implements MessageQueueDriver { + constructor(private readonly jobsModuleRef: ModuleRef) {} + async add(_queueName: MessageQueue, jobName: string, data: T, _options?: QueueJobOptions | undefined): Promise { + const jobClassName = getJobClassName(jobName); + const job: MessageQueueJob = this.jobsModuleRef.get(jobClassName, { strict: true }); + + return await job.handle(data); + } + work(queueName: MessageQueue, handler: ({ data, id }: { data: T; id: string; }) => void | Promise) { + return; + } +} \ No newline at end of file diff --git a/packages/twenty-server/src/integrations/message-queue/interfaces/message-queue.interface.ts b/packages/twenty-server/src/integrations/message-queue/interfaces/message-queue.interface.ts index 4ebe536db..91d910f6a 100644 --- a/packages/twenty-server/src/integrations/message-queue/interfaces/message-queue.interface.ts +++ b/packages/twenty-server/src/integrations/message-queue/interfaces/message-queue.interface.ts @@ -6,6 +6,7 @@ import { PgBossDriverOptions } from 'src/integrations/message-queue/drivers/pg-b export enum MessageQueueDriverType { PgBoss = 'pg-boss', BullMQ = 'bull-mq', + Sync = 'sync', } export interface PgBossDriverFactoryOptions { @@ -18,9 +19,15 @@ export interface BullMQDriverFactoryOptions { options: BullMQDriverOptions; } +export interface SyncDriverFactoryOptions { + type: MessageQueueDriverType.Sync; + options: Record; +} + export type MessageQueueModuleOptions = | PgBossDriverFactoryOptions - | BullMQDriverFactoryOptions; + | BullMQDriverFactoryOptions + | SyncDriverFactoryOptions; export type MessageQueueModuleAsyncOptions = { useFactory: ( diff --git a/packages/twenty-server/src/integrations/message-queue/jobs.module.ts b/packages/twenty-server/src/integrations/message-queue/jobs.module.ts new file mode 100644 index 000000000..4e7d727d6 --- /dev/null +++ b/packages/twenty-server/src/integrations/message-queue/jobs.module.ts @@ -0,0 +1,19 @@ +import { Module } from "@nestjs/common"; +import { ModuleRef } from "@nestjs/core"; +import { FetchMessagesJob } from "src/workspace/messaging/jobs/fetch-messages.job"; + +@Module({ + providers: [ + { + provide: FetchMessagesJob.name, + useClass: FetchMessagesJob, + }, + ], +}) +export class JobsModule { + static moduleRef: ModuleRef; + + constructor(private moduleRef: ModuleRef) { + JobsModule.moduleRef = this.moduleRef; + } +} diff --git a/packages/twenty-server/src/integrations/message-queue/message-queue.module-factory.ts b/packages/twenty-server/src/integrations/message-queue/message-queue.module-factory.ts index d98d38a89..689368342 100644 --- a/packages/twenty-server/src/integrations/message-queue/message-queue.module-factory.ts +++ b/packages/twenty-server/src/integrations/message-queue/message-queue.module-factory.ts @@ -15,6 +15,12 @@ export const messageQueueModuleFactory = async ( const driverType = environmentService.getMessageQueueDriverType(); switch (driverType) { + case MessageQueueDriverType.Sync: { + return { + type: MessageQueueDriverType.Sync, + options: {}, + }; + } case MessageQueueDriverType.PgBoss: { const connectionString = environmentService.getPGDatabaseUrl(); diff --git a/packages/twenty-server/src/integrations/message-queue/message-queue.module.ts b/packages/twenty-server/src/integrations/message-queue/message-queue.module.ts index 5a7f9c74a..80afdea7e 100644 --- a/packages/twenty-server/src/integrations/message-queue/message-queue.module.ts +++ b/packages/twenty-server/src/integrations/message-queue/message-queue.module.ts @@ -13,6 +13,11 @@ import { import { PgBossDriver } from 'src/integrations/message-queue/drivers/pg-boss.driver'; import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service'; import { BullMQDriver } from 'src/integrations/message-queue/drivers/bullmq.driver'; +import { FetchMessagesJob } from 'src/workspace/messaging/jobs/fetch-messages.job'; +import { SyncDriver } from 'src/integrations/message-queue/drivers/sync.driver'; +import { ModuleRef } from '@nestjs/core'; +import { AppModule } from 'src/app.module'; +import { JobsModule } from 'src/integrations/message-queue/jobs.module'; @Global() export class MessageQueueModule { @@ -30,15 +35,18 @@ export class MessageQueueModule { useFactory: async (...args: any[]) => { const config = await options.useFactory(...args); - if (config.type === MessageQueueDriverType.PgBoss) { - const boss = new PgBossDriver(config.options); + switch (config.type) { + case MessageQueueDriverType.PgBoss: + const boss = new PgBossDriver(config.options); + await boss.init(); + return boss; - await boss.init(); + case MessageQueueDriverType.BullMQ: + return new BullMQDriver(config.options); - return boss; + default: + return new SyncDriver(JobsModule.moduleRef); } - - return new BullMQDriver(config.options); }, inject: options.inject || [], }, @@ -46,7 +54,7 @@ export class MessageQueueModule { return { module: MessageQueueModule, - imports: options.imports || [], + imports: [JobsModule, ...(options.imports || [])], providers, exports: [MessageQueue.taskAssignedQueue, MessageQueue.messagingQueue], }; diff --git a/packages/twenty-server/src/integrations/message-queue/utils/get-job-class-name.util.ts b/packages/twenty-server/src/integrations/message-queue/utils/get-job-class-name.util.ts new file mode 100644 index 000000000..a5fb66c62 --- /dev/null +++ b/packages/twenty-server/src/integrations/message-queue/utils/get-job-class-name.util.ts @@ -0,0 +1,5 @@ +export function getJobClassName(name: string): string { + const [, jobName] = name.split('.') ?? []; + + return jobName ?? name; +} \ No newline at end of file diff --git a/packages/twenty-server/src/queue-worker.module.ts b/packages/twenty-server/src/queue-worker.module.ts index 6c7018e48..35f18d29d 100644 --- a/packages/twenty-server/src/queue-worker.module.ts +++ b/packages/twenty-server/src/queue-worker.module.ts @@ -4,6 +4,7 @@ import { EnvironmentModule } from 'src/integrations/environment/environment.modu import { EnvironmentService } from 'src/integrations/environment/environment.service'; import { LoggerModule } from 'src/integrations/logger/logger.module'; import { loggerModuleFactory } from 'src/integrations/logger/logger.module-factory'; +import { JobsModule } from 'src/integrations/message-queue/jobs.module'; import { MessageQueueModule } from 'src/integrations/message-queue/message-queue.module'; import { messageQueueModuleFactory } from 'src/integrations/message-queue/message-queue.module-factory'; import { FetchMessagesJob } from 'src/workspace/messaging/jobs/fetch-messages.job'; @@ -19,12 +20,7 @@ import { FetchMessagesJob } from 'src/workspace/messaging/jobs/fetch-messages.jo useFactory: messageQueueModuleFactory, inject: [EnvironmentService], }), - ], - providers: [ - { - provide: FetchMessagesJob.name, - useClass: FetchMessagesJob, - }, + JobsModule, ], }) export class QueueWorkerModule {} diff --git a/packages/twenty-server/src/queue-worker.ts b/packages/twenty-server/src/queue-worker.ts index 50e1e69ea..4d327c29b 100644 --- a/packages/twenty-server/src/queue-worker.ts +++ b/packages/twenty-server/src/queue-worker.ts @@ -4,9 +4,11 @@ import { MessageQueueJob, MessageQueueJobData, } from 'src/integrations/message-queue/interfaces/message-queue-job.interface'; +import { JobsModule } from 'src/integrations/message-queue/jobs.module'; import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service'; +import { getJobClassName } from 'src/integrations/message-queue/utils/get-job-class-name.util'; import { QueueWorkerModule } from 'src/queue-worker.module'; async function bootstrap() { @@ -18,7 +20,7 @@ async function bootstrap() { await messageQueueService.work(async (jobData: MessageQueueJobData) => { const jobClassName = getJobClassName(jobData.name); const job: MessageQueueJob = app - .select(QueueWorkerModule) + .select(JobsModule) .get(jobClassName, { strict: true }); await job.handle(jobData.data); @@ -27,8 +29,3 @@ async function bootstrap() { } bootstrap(); -function getJobClassName(name: string): string { - const [, jobName] = name.split('.') ?? []; - - return jobName ?? name; -}