Migrate to a monorepo structure (#2909)
This commit is contained in:
@ -0,0 +1,62 @@
|
||||
import { Queue, QueueOptions, Worker } from 'bullmq';
|
||||
|
||||
import { QueueJobOptions } from 'src/integrations/message-queue/drivers/interfaces/job-options.interface';
|
||||
|
||||
import { MessageQueues } from 'src/integrations/message-queue/message-queue.constants';
|
||||
|
||||
import { MessageQueueDriver } from './interfaces/message-queue-driver.interface';
|
||||
|
||||
export type BullMQDriverOptions = QueueOptions;
|
||||
|
||||
export class BullMQDriver implements MessageQueueDriver {
|
||||
private queueMap: Record<MessageQueues, Queue> = {} as Record<
|
||||
MessageQueues,
|
||||
Queue
|
||||
>;
|
||||
private workerMap: Record<MessageQueues, Worker> = {} as Record<
|
||||
MessageQueues,
|
||||
Worker
|
||||
>;
|
||||
|
||||
constructor(private options: BullMQDriverOptions) {}
|
||||
|
||||
register(queueName: MessageQueues): void {
|
||||
this.queueMap[queueName] = new Queue(queueName, this.options);
|
||||
}
|
||||
|
||||
async stop() {
|
||||
const workers = Object.values(this.workerMap);
|
||||
const queues = Object.values(this.queueMap);
|
||||
|
||||
await Promise.all([
|
||||
...queues.map((q) => q.close()),
|
||||
...workers.map((w) => w.close()),
|
||||
]);
|
||||
}
|
||||
|
||||
async work<T>(
|
||||
queueName: MessageQueues,
|
||||
handler: ({ data, id }: { data: T; id: string }) => Promise<void>,
|
||||
) {
|
||||
const worker = new Worker(queueName, async (job) => {
|
||||
await handler(job as { data: T; id: string });
|
||||
});
|
||||
|
||||
this.workerMap[queueName] = worker;
|
||||
}
|
||||
|
||||
async add<T>(
|
||||
queueName: MessageQueues,
|
||||
data: T,
|
||||
options?: QueueJobOptions,
|
||||
): Promise<void> {
|
||||
if (!this.queueMap[queueName]) {
|
||||
throw new Error(
|
||||
`Queue ${queueName} is not registered, make sure you have added it as a queue provider`,
|
||||
);
|
||||
}
|
||||
await this.queueMap[queueName].add(options?.id || '', data, {
|
||||
priority: options?.priority,
|
||||
});
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,4 @@
|
||||
export interface QueueJobOptions {
|
||||
id?: string;
|
||||
priority?: number;
|
||||
}
|
||||
@ -0,0 +1,17 @@
|
||||
import { QueueJobOptions } from 'src/integrations/message-queue/drivers/interfaces/job-options.interface';
|
||||
|
||||
import { MessageQueues } from 'src/integrations/message-queue/message-queue.constants';
|
||||
|
||||
export interface MessageQueueDriver {
|
||||
add<T>(
|
||||
queueName: MessageQueues,
|
||||
data: T,
|
||||
options?: QueueJobOptions,
|
||||
): Promise<void>;
|
||||
work<T>(
|
||||
queueName: string,
|
||||
handler: ({ data, id }: { data: T; id: string }) => Promise<void> | void,
|
||||
);
|
||||
stop?(): Promise<void>;
|
||||
register?(queueName: MessageQueues): void;
|
||||
}
|
||||
@ -0,0 +1,38 @@
|
||||
import PgBoss from 'pg-boss';
|
||||
|
||||
import { QueueJobOptions } from 'src/integrations/message-queue/drivers/interfaces/job-options.interface';
|
||||
|
||||
import { MessageQueueDriver } from './interfaces/message-queue-driver.interface';
|
||||
|
||||
export type PgBossDriverOptions = PgBoss.ConstructorOptions;
|
||||
|
||||
export class PgBossDriver implements MessageQueueDriver {
|
||||
private pgBoss: PgBoss;
|
||||
|
||||
constructor(options: PgBossDriverOptions) {
|
||||
this.pgBoss = new PgBoss(options);
|
||||
}
|
||||
|
||||
async stop() {
|
||||
await this.pgBoss.stop();
|
||||
}
|
||||
|
||||
async init(): Promise<void> {
|
||||
await this.pgBoss.start();
|
||||
}
|
||||
|
||||
async work<T>(
|
||||
queueName: string,
|
||||
handler: ({ data, id }: { data: T; id: string }) => Promise<void>,
|
||||
) {
|
||||
return this.pgBoss.work(queueName, handler);
|
||||
}
|
||||
|
||||
async add<T>(
|
||||
queueName: string,
|
||||
data: T,
|
||||
options?: QueueJobOptions,
|
||||
): Promise<void> {
|
||||
await this.pgBoss.send(queueName, data as object, options ? options : {});
|
||||
}
|
||||
}
|
||||
@ -0,0 +1 @@
|
||||
export * from './message-queue.interface';
|
||||
@ -0,0 +1,30 @@
|
||||
import { FactoryProvider, ModuleMetadata } from '@nestjs/common';
|
||||
|
||||
import { BullMQDriverOptions } from 'src/integrations/message-queue/drivers/bullmq.driver';
|
||||
import { PgBossDriverOptions } from 'src/integrations/message-queue/drivers/pg-boss.driver';
|
||||
|
||||
export enum MessageQueueDriverType {
|
||||
PgBoss = 'pg-boss',
|
||||
BullMQ = 'bull-mq',
|
||||
}
|
||||
|
||||
export interface PgBossDriverFactoryOptions {
|
||||
type: MessageQueueDriverType.PgBoss;
|
||||
options: PgBossDriverOptions;
|
||||
}
|
||||
|
||||
export interface BullMQDriverFactoryOptions {
|
||||
type: MessageQueueDriverType.BullMQ;
|
||||
options: BullMQDriverOptions;
|
||||
}
|
||||
|
||||
export type MessageQueueModuleOptions =
|
||||
| PgBossDriverFactoryOptions
|
||||
| BullMQDriverFactoryOptions;
|
||||
|
||||
export type MessageQueueModuleAsyncOptions = {
|
||||
useFactory: (
|
||||
...args: any[]
|
||||
) => MessageQueueModuleOptions | Promise<MessageQueueModuleOptions>;
|
||||
} & Pick<ModuleMetadata, 'imports'> &
|
||||
Pick<FactoryProvider, 'inject'>;
|
||||
@ -0,0 +1,5 @@
|
||||
export const QUEUE_DRIVER = Symbol('QUEUE_DRIVER');
|
||||
|
||||
export enum MessageQueues {
|
||||
taskAssignedQueue = 'task-assigned-queue',
|
||||
}
|
||||
@ -0,0 +1,47 @@
|
||||
import { EnvironmentService } from 'src/integrations/environment/environment.service';
|
||||
import {
|
||||
MessageQueueDriverType,
|
||||
MessageQueueModuleOptions,
|
||||
} from 'src/integrations/message-queue/interfaces';
|
||||
|
||||
/**
|
||||
* MessageQueue Module factory
|
||||
* @param environment
|
||||
* @returns MessageQueueModuleOptions
|
||||
*/
|
||||
export const messageQueueModuleFactory = async (
|
||||
environmentService: EnvironmentService,
|
||||
): Promise<MessageQueueModuleOptions> => {
|
||||
const driverType = environmentService.getMessageQueueDriverType();
|
||||
|
||||
switch (driverType) {
|
||||
case MessageQueueDriverType.PgBoss: {
|
||||
const connectionString = environmentService.getPGDatabaseUrl();
|
||||
|
||||
return {
|
||||
type: MessageQueueDriverType.PgBoss,
|
||||
options: {
|
||||
connectionString,
|
||||
},
|
||||
};
|
||||
}
|
||||
case MessageQueueDriverType.BullMQ: {
|
||||
const host = environmentService.getRedisHost();
|
||||
const port = environmentService.getRedisPort();
|
||||
|
||||
return {
|
||||
type: MessageQueueDriverType.BullMQ,
|
||||
options: {
|
||||
connection: {
|
||||
host,
|
||||
port,
|
||||
},
|
||||
},
|
||||
};
|
||||
}
|
||||
default:
|
||||
throw new Error(
|
||||
`Invalid message queue driver type (${driverType}), check your .env file`,
|
||||
);
|
||||
}
|
||||
};
|
||||
@ -0,0 +1,57 @@
|
||||
import { DynamicModule, Global } from '@nestjs/common';
|
||||
|
||||
import { MessageQueueDriver } from 'src/integrations/message-queue/drivers/interfaces/message-queue-driver.interface';
|
||||
|
||||
import {
|
||||
MessageQueueDriverType,
|
||||
MessageQueueModuleAsyncOptions,
|
||||
} from 'src/integrations/message-queue/interfaces';
|
||||
import {
|
||||
QUEUE_DRIVER,
|
||||
MessageQueues,
|
||||
} from 'src/integrations/message-queue/message-queue.constants';
|
||||
import { PgBossDriver } from 'src/integrations/message-queue/drivers/pg-boss.driver';
|
||||
import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service';
|
||||
import { BullMQDriver } from 'src/integrations/message-queue/drivers/bullmq.driver';
|
||||
|
||||
@Global()
|
||||
export class MessageQueueModule {
|
||||
static forRoot(options: MessageQueueModuleAsyncOptions): DynamicModule {
|
||||
const providers = [
|
||||
{
|
||||
provide: MessageQueues.taskAssignedQueue,
|
||||
useFactory: (driver: MessageQueueDriver) => {
|
||||
return new MessageQueueService(
|
||||
driver,
|
||||
MessageQueues.taskAssignedQueue,
|
||||
);
|
||||
},
|
||||
inject: [QUEUE_DRIVER],
|
||||
},
|
||||
{
|
||||
provide: QUEUE_DRIVER,
|
||||
useFactory: async (...args: any[]) => {
|
||||
const config = await options.useFactory(...args);
|
||||
|
||||
if (config.type === MessageQueueDriverType.PgBoss) {
|
||||
const boss = new PgBossDriver(config.options);
|
||||
|
||||
await boss.init();
|
||||
|
||||
return boss;
|
||||
}
|
||||
|
||||
return new BullMQDriver(config.options);
|
||||
},
|
||||
inject: options.inject || [],
|
||||
},
|
||||
];
|
||||
|
||||
return {
|
||||
module: MessageQueueModule,
|
||||
imports: options.imports || [],
|
||||
providers,
|
||||
exports: [MessageQueues.taskAssignedQueue],
|
||||
};
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,46 @@
|
||||
import { Test, TestingModule } from '@nestjs/testing';
|
||||
|
||||
import { MessageQueueDriver } from 'src/integrations/message-queue/drivers/interfaces/message-queue-driver.interface';
|
||||
|
||||
import {
|
||||
QUEUE_DRIVER,
|
||||
MessageQueues,
|
||||
} from 'src/integrations/message-queue/message-queue.constants';
|
||||
import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service';
|
||||
|
||||
describe('MessageQueueTaskAssigned queue', () => {
|
||||
let service: MessageQueueService;
|
||||
|
||||
beforeEach(async () => {
|
||||
const module: TestingModule = await Test.createTestingModule({
|
||||
providers: [
|
||||
{
|
||||
provide: MessageQueues.taskAssignedQueue,
|
||||
useFactory: (driver: MessageQueueDriver) => {
|
||||
return new MessageQueueService(
|
||||
driver,
|
||||
MessageQueues.taskAssignedQueue,
|
||||
);
|
||||
},
|
||||
inject: [QUEUE_DRIVER],
|
||||
},
|
||||
{
|
||||
provide: QUEUE_DRIVER,
|
||||
useValue: {},
|
||||
},
|
||||
],
|
||||
}).compile();
|
||||
|
||||
service = module.get<MessageQueueService>(MessageQueues.taskAssignedQueue);
|
||||
});
|
||||
|
||||
it('should be defined', () => {
|
||||
expect(service).toBeDefined();
|
||||
});
|
||||
it('should contain the topic and driver', () => {
|
||||
expect(service).toEqual({
|
||||
driver: {},
|
||||
queueName: MessageQueues.taskAssignedQueue,
|
||||
});
|
||||
});
|
||||
});
|
||||
@ -0,0 +1,37 @@
|
||||
import { Inject, Injectable, OnModuleDestroy } from '@nestjs/common';
|
||||
|
||||
import { QueueJobOptions } from 'src/integrations/message-queue/drivers/interfaces/job-options.interface';
|
||||
import { MessageQueueDriver } from 'src/integrations/message-queue/drivers/interfaces/message-queue-driver.interface';
|
||||
|
||||
import {
|
||||
MessageQueues,
|
||||
QUEUE_DRIVER,
|
||||
} from 'src/integrations/message-queue/message-queue.constants';
|
||||
|
||||
@Injectable()
|
||||
export class MessageQueueService implements OnModuleDestroy {
|
||||
constructor(
|
||||
@Inject(QUEUE_DRIVER) protected driver: MessageQueueDriver,
|
||||
protected queueName: MessageQueues,
|
||||
) {
|
||||
if (typeof this.driver.register === 'function') {
|
||||
this.driver.register(queueName);
|
||||
}
|
||||
}
|
||||
|
||||
async onModuleDestroy() {
|
||||
if (typeof this.driver.stop === 'function') {
|
||||
await this.driver.stop();
|
||||
}
|
||||
}
|
||||
|
||||
add<T>(data: T, options?: QueueJobOptions): Promise<void> {
|
||||
return this.driver.add(this.queueName, data, options);
|
||||
}
|
||||
|
||||
work<T>(
|
||||
handler: ({ data, id }: { data: T; id: string }) => Promise<void> | void,
|
||||
) {
|
||||
return this.driver.work(this.queueName, handler);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user