Add sync driver for queue messages (#3070)

* Add sync driver for queue messages

* rename moduleRef

* use switch instead
This commit is contained in:
Weiko
2023-12-19 13:30:40 +01:00
committed by GitHub
parent fff51a2d91
commit e799c84233
10 changed files with 83 additions and 23 deletions

View File

@ -148,7 +148,7 @@ export class EnvironmentService {
getMessageQueueDriverType(): MessageQueueDriverType {
return (
this.configService.get<MessageQueueDriverType>('MESSAGE_QUEUE_TYPE') ??
MessageQueueDriverType.PgBoss
MessageQueueDriverType.Sync
);
}

View File

@ -1,15 +1,16 @@
import { QueueJobOptions } from 'src/integrations/message-queue/drivers/interfaces/job-options.interface';
import { MessageQueueJobData } from 'src/integrations/message-queue/interfaces/message-queue-job.interface';
import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants';
export interface MessageQueueDriver {
add<T>(
add<T extends MessageQueueJobData>(
queueName: MessageQueue,
jobName: string,
data: T,
options?: QueueJobOptions,
): Promise<void>;
work<T>(
work<T extends MessageQueueJobData>(
queueName: MessageQueue,
handler: ({ data, id }: { data: T; id: string }) => Promise<void> | void,
);

View File

@ -0,0 +1,21 @@
import { ModuleRef } from "@nestjs/core";
import { QueueJobOptions } from "src/integrations/message-queue/drivers/interfaces/job-options.interface";
import { MessageQueueDriver } from "src/integrations/message-queue/drivers/interfaces/message-queue-driver.interface";
import { MessageQueueJob, MessageQueueJobData } from "src/integrations/message-queue/interfaces/message-queue-job.interface";
import { MessageQueue } from "src/integrations/message-queue/message-queue.constants";
import { MessageQueueModule } from "src/integrations/message-queue/message-queue.module";
import { getJobClassName } from "src/integrations/message-queue/utils/get-job-class-name.util";
import { QueueWorkerModule } from "src/queue-worker.module";
export class SyncDriver implements MessageQueueDriver {
constructor(private readonly jobsModuleRef: ModuleRef) {}
async add<T extends MessageQueueJobData>(_queueName: MessageQueue, jobName: string, data: T, _options?: QueueJobOptions | undefined): Promise<void> {
const jobClassName = getJobClassName(jobName);
const job: MessageQueueJob<MessageQueueJobData> = this.jobsModuleRef.get(jobClassName, { strict: true });
return await job.handle(data);
}
work<T>(queueName: MessageQueue, handler: ({ data, id }: { data: T; id: string; }) => void | Promise<void>) {
return;
}
}

View File

@ -6,6 +6,7 @@ import { PgBossDriverOptions } from 'src/integrations/message-queue/drivers/pg-b
export enum MessageQueueDriverType {
PgBoss = 'pg-boss',
BullMQ = 'bull-mq',
Sync = 'sync',
}
export interface PgBossDriverFactoryOptions {
@ -18,9 +19,15 @@ export interface BullMQDriverFactoryOptions {
options: BullMQDriverOptions;
}
export interface SyncDriverFactoryOptions {
type: MessageQueueDriverType.Sync;
options: Record<string, any>;
}
export type MessageQueueModuleOptions =
| PgBossDriverFactoryOptions
| BullMQDriverFactoryOptions;
| BullMQDriverFactoryOptions
| SyncDriverFactoryOptions;
export type MessageQueueModuleAsyncOptions = {
useFactory: (

View File

@ -0,0 +1,19 @@
import { Module } from "@nestjs/common";
import { ModuleRef } from "@nestjs/core";
import { FetchMessagesJob } from "src/workspace/messaging/jobs/fetch-messages.job";
@Module({
providers: [
{
provide: FetchMessagesJob.name,
useClass: FetchMessagesJob,
},
],
})
export class JobsModule {
static moduleRef: ModuleRef;
constructor(private moduleRef: ModuleRef) {
JobsModule.moduleRef = this.moduleRef;
}
}

View File

@ -15,6 +15,12 @@ export const messageQueueModuleFactory = async (
const driverType = environmentService.getMessageQueueDriverType();
switch (driverType) {
case MessageQueueDriverType.Sync: {
return {
type: MessageQueueDriverType.Sync,
options: {},
};
}
case MessageQueueDriverType.PgBoss: {
const connectionString = environmentService.getPGDatabaseUrl();

View File

@ -13,6 +13,11 @@ import {
import { PgBossDriver } from 'src/integrations/message-queue/drivers/pg-boss.driver';
import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service';
import { BullMQDriver } from 'src/integrations/message-queue/drivers/bullmq.driver';
import { FetchMessagesJob } from 'src/workspace/messaging/jobs/fetch-messages.job';
import { SyncDriver } from 'src/integrations/message-queue/drivers/sync.driver';
import { ModuleRef } from '@nestjs/core';
import { AppModule } from 'src/app.module';
import { JobsModule } from 'src/integrations/message-queue/jobs.module';
@Global()
export class MessageQueueModule {
@ -30,15 +35,18 @@ export class MessageQueueModule {
useFactory: async (...args: any[]) => {
const config = await options.useFactory(...args);
if (config.type === MessageQueueDriverType.PgBoss) {
const boss = new PgBossDriver(config.options);
switch (config.type) {
case MessageQueueDriverType.PgBoss:
const boss = new PgBossDriver(config.options);
await boss.init();
return boss;
await boss.init();
case MessageQueueDriverType.BullMQ:
return new BullMQDriver(config.options);
return boss;
default:
return new SyncDriver(JobsModule.moduleRef);
}
return new BullMQDriver(config.options);
},
inject: options.inject || [],
},
@ -46,7 +54,7 @@ export class MessageQueueModule {
return {
module: MessageQueueModule,
imports: options.imports || [],
imports: [JobsModule, ...(options.imports || [])],
providers,
exports: [MessageQueue.taskAssignedQueue, MessageQueue.messagingQueue],
};

View File

@ -0,0 +1,5 @@
export function getJobClassName(name: string): string {
const [, jobName] = name.split('.') ?? [];
return jobName ?? name;
}

View File

@ -4,6 +4,7 @@ import { EnvironmentModule } from 'src/integrations/environment/environment.modu
import { EnvironmentService } from 'src/integrations/environment/environment.service';
import { LoggerModule } from 'src/integrations/logger/logger.module';
import { loggerModuleFactory } from 'src/integrations/logger/logger.module-factory';
import { JobsModule } from 'src/integrations/message-queue/jobs.module';
import { MessageQueueModule } from 'src/integrations/message-queue/message-queue.module';
import { messageQueueModuleFactory } from 'src/integrations/message-queue/message-queue.module-factory';
import { FetchMessagesJob } from 'src/workspace/messaging/jobs/fetch-messages.job';
@ -19,12 +20,7 @@ import { FetchMessagesJob } from 'src/workspace/messaging/jobs/fetch-messages.jo
useFactory: messageQueueModuleFactory,
inject: [EnvironmentService],
}),
],
providers: [
{
provide: FetchMessagesJob.name,
useClass: FetchMessagesJob,
},
JobsModule,
],
})
export class QueueWorkerModule {}

View File

@ -4,9 +4,11 @@ import {
MessageQueueJob,
MessageQueueJobData,
} from 'src/integrations/message-queue/interfaces/message-queue-job.interface';
import { JobsModule } from 'src/integrations/message-queue/jobs.module';
import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service';
import { getJobClassName } from 'src/integrations/message-queue/utils/get-job-class-name.util';
import { QueueWorkerModule } from 'src/queue-worker.module';
async function bootstrap() {
@ -18,7 +20,7 @@ async function bootstrap() {
await messageQueueService.work(async (jobData: MessageQueueJobData) => {
const jobClassName = getJobClassName(jobData.name);
const job: MessageQueueJob<MessageQueueJobData> = app
.select(QueueWorkerModule)
.select(JobsModule)
.get(jobClassName, { strict: true });
await job.handle(jobData.data);
@ -27,8 +29,3 @@ async function bootstrap() {
}
bootstrap();
function getJobClassName(name: string): string {
const [, jobName] = name.split('.') ?? [];
return jobName ?? name;
}