feat: Enhancements to MessageQueue Module with Decorators (#5657)
### Overview
This PR introduces significant enhancements to the MessageQueue module
by integrating `@Processor`, `@Process`, and `@InjectMessageQueue`
decorators. These changes streamline the process of defining and
managing queue processors and job handlers, and also allow for
request-scoped handlers, improving compatibility with services that rely
on scoped providers like TwentyORM repositories.
### Key Features
1. **Decorator-based Job Handling**: Use `@Processor` and `@Process`
decorators to define job handlers declaratively.
2. **Request Scope Support**: Job handlers can be scoped per request,
enhancing integration with request-scoped services.
### Usage
#### Defining Processors and Job Handlers
The `@Processor` decorator is used to define a class that processes jobs
for a specific queue. The `@Process` decorator is applied to methods
within this class to define specific job handlers.
##### Example 1: Specific Job Handlers
```typescript
import { Processor, Process, InjectMessageQueue } from 'src/engine/integrations/message-queue';
@Processor('taskQueue')
export class TaskProcessor {
@Process('taskA')
async handleTaskA(job: { id: string, data: any }) {
console.log(`Handling task A with data:`, job.data);
// Logic for task A
}
@Process('taskB')
async handleTaskB(job: { id: string, data: any }) {
console.log(`Handling task B with data:`, job.data);
// Logic for task B
}
}
```
In the example above, `TaskProcessor` is responsible for processing jobs
in the `taskQueue`. The `handleTaskA` method will only be called for
jobs with the name `taskA`, while `handleTaskB` will be called for
`taskB` jobs.
##### Example 2: General Job Handler
```typescript
import { Processor, Process, InjectMessageQueue } from 'src/engine/integrations/message-queue';
@Processor('generalQueue')
export class GeneralProcessor {
@Process()
async handleAnyJob(job: { id: string, name: string, data: any }) {
console.log(`Handling job ${job.name} with data:`, job.data);
// Logic for any job
}
}
```
In this example, `GeneralProcessor` handles all jobs in the
`generalQueue`, regardless of the job name. The `handleAnyJob` method
will be invoked for every job added to the `generalQueue`.
#### Adding Jobs to a Queue
You can use the `@InjectMessageQueue` decorator to inject a queue into a
service and add jobs to it.
##### Example:
```typescript
import { Injectable } from '@nestjs/common';
import { InjectMessageQueue, MessageQueue } from 'src/engine/integrations/message-queue';
@Injectable()
export class TaskService {
constructor(
@InjectMessageQueue('taskQueue') private readonly taskQueue: MessageQueue,
) {}
async addTaskA(data: any) {
await this.taskQueue.add('taskA', data);
}
async addTaskB(data: any) {
await this.taskQueue.add('taskB', data);
}
}
```
In this example, `TaskService` adds jobs to the `taskQueue`. The
`addTaskA` and `addTaskB` methods add jobs named `taskA` and `taskB`,
respectively, to the queue.
#### Using Scoped Job Handlers
To utilize request-scoped job handlers, specify the scope in the
`@Processor` decorator. This is particularly useful for services that
use scoped repositories like those in TwentyORM.
##### Example:
```typescript
import { Processor, Process, InjectMessageQueue, Scope } from 'src/engine/integrations/message-queue';
@Processor({ name: 'scopedQueue', scope: Scope.REQUEST })
export class ScopedTaskProcessor {
@Process('scopedTask')
async handleScopedTask(job: { id: string, data: any }) {
console.log(`Handling scoped task with data:`, job.data);
// Logic for scoped task, which might use request-scoped services
}
}
```
Here, the `ScopedTaskProcessor` is associated with `scopedQueue` and
operates with request scope. This setup is essential when the job
handler relies on services that need to be instantiated per request,
such as scoped repositories.
### Migration Notes
- **Decorators**: Refactor job handlers to use `@Processor` and
`@Process` decorators.
- **Request Scope**: Utilize the scope option in `@Processor` if your
job handlers depend on request-scoped services.
Fix #5628
---------
Co-authored-by: Weiko <corentin@twenty.com>
This commit is contained in:
@ -0,0 +1,209 @@
|
||||
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
|
||||
import {
|
||||
DiscoveryService,
|
||||
MetadataScanner,
|
||||
ModuleRef,
|
||||
createContextId,
|
||||
} from '@nestjs/core';
|
||||
import { Module } from '@nestjs/core/injector/module';
|
||||
import { Injector } from '@nestjs/core/injector/injector';
|
||||
import { InstanceWrapper } from '@nestjs/core/injector/instance-wrapper';
|
||||
|
||||
import { MessageQueueWorkerOptions } from 'src/engine/integrations/message-queue/interfaces/message-queue-worker-options.interface';
|
||||
import {
|
||||
MessageQueueJob,
|
||||
MessageQueueJobData,
|
||||
} from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';
|
||||
|
||||
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
|
||||
import { getQueueToken } from 'src/engine/integrations/message-queue/utils/get-queue-token.util';
|
||||
import { ExceptionHandlerService } from 'src/engine/integrations/exception-handler/exception-handler.service';
|
||||
import { shouldFilterException } from 'src/engine/utils/global-exception-handler.util';
|
||||
|
||||
import { MessageQueueMetadataAccessor } from './message-queue-metadata.accessor';
|
||||
|
||||
interface ProcessorGroup {
|
||||
instance: object;
|
||||
host: Module;
|
||||
processMethodNames: string[];
|
||||
isRequestScoped: boolean;
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class MessageQueueExplorer implements OnModuleInit {
|
||||
private readonly logger = new Logger('MessageQueueModule');
|
||||
private readonly injector = new Injector();
|
||||
|
||||
constructor(
|
||||
private readonly moduleRef: ModuleRef,
|
||||
private readonly discoveryService: DiscoveryService,
|
||||
private readonly metadataAccessor: MessageQueueMetadataAccessor,
|
||||
private readonly metadataScanner: MetadataScanner,
|
||||
private readonly exceptionHandlerService: ExceptionHandlerService,
|
||||
) {}
|
||||
|
||||
onModuleInit() {
|
||||
this.explore();
|
||||
}
|
||||
|
||||
explore() {
|
||||
const processors = this.discoveryService
|
||||
.getProviders()
|
||||
.filter((wrapper) =>
|
||||
this.metadataAccessor.isProcessor(
|
||||
!wrapper.metatype || wrapper.inject
|
||||
? wrapper.instance?.constructor
|
||||
: wrapper.metatype,
|
||||
),
|
||||
);
|
||||
|
||||
const groupedProcessors = this.groupProcessorsByQueueName(processors);
|
||||
|
||||
for (const [queueName, processorGroupCollection] of Object.entries(
|
||||
groupedProcessors,
|
||||
)) {
|
||||
const queueToken = getQueueToken(queueName);
|
||||
const messageQueueService = this.getQueueService(queueToken);
|
||||
|
||||
this.handleProcessorGroupCollection(
|
||||
processorGroupCollection,
|
||||
messageQueueService,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private groupProcessorsByQueueName(processors: InstanceWrapper[]) {
|
||||
return processors.reduce(
|
||||
(acc, wrapper) => {
|
||||
const { instance, metatype } = wrapper;
|
||||
const methodNames = this.metadataScanner.getAllMethodNames(instance);
|
||||
const { queueName } =
|
||||
this.metadataAccessor.getProcessorMetadata(
|
||||
instance.constructor || metatype,
|
||||
) ?? {};
|
||||
|
||||
const processMethodNames = methodNames.filter((name) =>
|
||||
this.metadataAccessor.isProcess(instance[name]),
|
||||
);
|
||||
|
||||
if (!queueName) {
|
||||
this.logger.error(
|
||||
`Processor ${wrapper.name} is missing queue name metadata`,
|
||||
);
|
||||
|
||||
return acc;
|
||||
}
|
||||
|
||||
if (!wrapper.host) {
|
||||
this.logger.error(
|
||||
`Processor ${wrapper.name} is missing host metadata`,
|
||||
);
|
||||
|
||||
return acc;
|
||||
}
|
||||
|
||||
if (!acc[queueName]) {
|
||||
acc[queueName] = [];
|
||||
}
|
||||
|
||||
acc[queueName].push({
|
||||
instance,
|
||||
host: wrapper.host,
|
||||
processMethodNames,
|
||||
isRequestScoped: !wrapper.isDependencyTreeStatic(),
|
||||
});
|
||||
|
||||
return acc;
|
||||
},
|
||||
{} as Record<string, ProcessorGroup[]>,
|
||||
);
|
||||
}
|
||||
|
||||
private getQueueService(queueToken: string): MessageQueueService {
|
||||
try {
|
||||
return this.moduleRef.get<MessageQueueService>(queueToken, {
|
||||
strict: false,
|
||||
});
|
||||
} catch (err) {
|
||||
this.logger.error(`No queue found for token ${queueToken}`);
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
private async handleProcessorGroupCollection(
|
||||
processorGroupCollection: ProcessorGroup[],
|
||||
queue: MessageQueueService,
|
||||
options?: MessageQueueWorkerOptions,
|
||||
) {
|
||||
queue.work(async (job) => {
|
||||
for (const processorGroup of processorGroupCollection) {
|
||||
await this.handleProcessor(processorGroup, job);
|
||||
}
|
||||
}, options);
|
||||
}
|
||||
|
||||
private async handleProcessor(
|
||||
{ instance, host, processMethodNames, isRequestScoped }: ProcessorGroup,
|
||||
job: MessageQueueJob<MessageQueueJobData>,
|
||||
) {
|
||||
const processMetadataCollection = new Map(
|
||||
processMethodNames.map((name) => {
|
||||
const metadata = this.metadataAccessor.getProcessMetadata(
|
||||
instance[name],
|
||||
);
|
||||
|
||||
return [name, metadata];
|
||||
}),
|
||||
);
|
||||
|
||||
if (isRequestScoped) {
|
||||
const contextId = createContextId();
|
||||
|
||||
if (this.moduleRef.registerRequestByContextId) {
|
||||
this.moduleRef.registerRequestByContextId(
|
||||
{
|
||||
// Add workspaceId to the request object
|
||||
req: {
|
||||
workspaceId: job.data.workspaceId,
|
||||
},
|
||||
},
|
||||
contextId,
|
||||
);
|
||||
}
|
||||
|
||||
const contextInstance = await this.injector.loadPerContext(
|
||||
instance,
|
||||
host,
|
||||
host.providers,
|
||||
contextId,
|
||||
);
|
||||
|
||||
await this.invokeProcessMethods(
|
||||
contextInstance,
|
||||
processMetadataCollection,
|
||||
job,
|
||||
);
|
||||
} else {
|
||||
await this.invokeProcessMethods(instance, processMetadataCollection, job);
|
||||
}
|
||||
}
|
||||
|
||||
private async invokeProcessMethods(
|
||||
instance: object,
|
||||
processMetadataCollection: Map<string, any>,
|
||||
job: MessageQueueJob<MessageQueueJobData>,
|
||||
) {
|
||||
for (const [methodName, metadata] of processMetadataCollection) {
|
||||
if (job.name === metadata?.jobName) {
|
||||
try {
|
||||
await instance[methodName].call(instance, job.data);
|
||||
} catch (err) {
|
||||
if (!shouldFilterException(err)) {
|
||||
this.exceptionHandlerService.captureExceptions([err]);
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user