6658 workflows add a first twenty piece email sender (#6965)

This commit is contained in:
martmull
2024-09-12 11:00:25 +02:00
committed by GitHub
parent f8e5b333d9
commit 3190f4a87b
397 changed files with 1143 additions and 1037 deletions

View File

@ -0,0 +1,8 @@
import { Inject } from '@nestjs/common';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { getQueueToken } from 'src/engine/core-modules/message-queue/utils/get-queue-token.util';
export const InjectMessageQueue = (queueName: MessageQueue) => {
return Inject(getQueueToken(queueName));
};

View File

@ -0,0 +1,21 @@
import { SetMetadata } from '@nestjs/common';
import { isString } from '@nestjs/common/utils/shared.utils';
import { PROCESS_METADATA } from 'src/engine/core-modules/message-queue/message-queue.constants';
export interface MessageQueueProcessOptions {
jobName: string;
concurrency?: number;
}
export function Process(jobName: string): MethodDecorator;
export function Process(options: MessageQueueProcessOptions): MethodDecorator;
export function Process(
nameOrOptions: string | MessageQueueProcessOptions,
): MethodDecorator {
const options = isString(nameOrOptions)
? { jobName: nameOrOptions }
: nameOrOptions;
return SetMetadata(PROCESS_METADATA, options || {});
}

View File

@ -0,0 +1,69 @@
import { Scope, SetMetadata } from '@nestjs/common';
import { SCOPE_OPTIONS_METADATA } from '@nestjs/common/constants';
import { MessageQueueWorkerOptions } from 'src/engine/core-modules/message-queue/interfaces/message-queue-worker-options.interface';
import {
MessageQueue,
PROCESSOR_METADATA,
WORKER_METADATA,
} from 'src/engine/core-modules/message-queue/message-queue.constants';
export interface MessageQueueProcessorOptions {
/**
* Specifies the name of the queue to subscribe to.
*/
queueName: MessageQueue;
/**
* Specifies the lifetime of an injected Processor.
*/
scope?: Scope;
}
/**
* Represents a worker that is able to process jobs from the queue.
* @param queueName name of the queue to process
*/
export function Processor(queueName: string): ClassDecorator;
/**
* Represents a worker that is able to process jobs from the queue.
* @param queueName name of the queue to process
* @param workerOptions additional worker options
*/
export function Processor(
queueName: string,
workerOptions: MessageQueueWorkerOptions,
): ClassDecorator;
/**
* Represents a worker that is able to process jobs from the queue.
* @param processorOptions processor options
*/
export function Processor(
processorOptions: MessageQueueProcessorOptions,
): ClassDecorator;
/**
* Represents a worker that is able to process jobs from the queue.
* @param processorOptions processor options (Nest-specific)
* @param workerOptions additional Bull worker options
*/
export function Processor(
processorOptions: MessageQueueProcessorOptions,
workerOptions: MessageQueueWorkerOptions,
): ClassDecorator;
export function Processor(
queueNameOrOptions?: string | MessageQueueProcessorOptions,
maybeWorkerOptions?: MessageQueueWorkerOptions,
): ClassDecorator {
const options =
queueNameOrOptions && typeof queueNameOrOptions === 'object'
? queueNameOrOptions
: { queueName: queueNameOrOptions };
// eslint-disable-next-line @typescript-eslint/ban-types
return (target: Function) => {
SetMetadata(SCOPE_OPTIONS_METADATA, options)(target);
SetMetadata(PROCESSOR_METADATA, options)(target);
maybeWorkerOptions &&
SetMetadata(WORKER_METADATA, maybeWorkerOptions)(target);
};
}

View File

@ -0,0 +1,120 @@
import { OnModuleDestroy } from '@nestjs/common';
import omitBy from 'lodash.omitby';
import { JobsOptions, Queue, QueueOptions, Worker } from 'bullmq';
import {
QueueCronJobOptions,
QueueJobOptions,
} from 'src/engine/core-modules/message-queue/drivers/interfaces/job-options.interface';
import { MessageQueueJob } from 'src/engine/core-modules/message-queue/interfaces/message-queue-job.interface';
import { MessageQueueWorkerOptions } from 'src/engine/core-modules/message-queue/interfaces/message-queue-worker-options.interface';
import { MessageQueueDriver } from 'src/engine/core-modules/message-queue/drivers/interfaces/message-queue-driver.interface';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
export type BullMQDriverOptions = QueueOptions;
export class BullMQDriver implements MessageQueueDriver, OnModuleDestroy {
private queueMap: Record<MessageQueue, Queue> = {} as Record<
MessageQueue,
Queue
>;
private workerMap: Record<MessageQueue, Worker> = {} as Record<
MessageQueue,
Worker
>;
constructor(private options: BullMQDriverOptions) {}
register(queueName: MessageQueue): void {
this.queueMap[queueName] = new Queue(queueName, this.options);
}
async onModuleDestroy() {
const workers = Object.values(this.workerMap);
const queues = Object.values(this.queueMap);
await Promise.all([
...queues.map((q) => q.close()),
...workers.map((w) => w.close()),
]);
}
async work<T>(
queueName: MessageQueue,
handler: (job: MessageQueueJob<T>) => Promise<void>,
options?: MessageQueueWorkerOptions,
) {
const worker = new Worker(
queueName,
async (job) => {
// TODO: Correctly support for job.id
await handler({ data: job.data, id: job.id ?? '', name: job.name });
},
omitBy(
{
...this.options,
concurrency: options?.concurrency,
},
(value) => value === undefined,
),
);
this.workerMap[queueName] = worker;
}
async addCron<T>(
queueName: MessageQueue,
jobName: string,
data: T,
options?: QueueCronJobOptions,
): Promise<void> {
if (!this.queueMap[queueName]) {
throw new Error(
`Queue ${queueName} is not registered, make sure you have added it as a queue provider`,
);
}
const queueOptions: JobsOptions = {
jobId: options?.id,
priority: options?.priority,
repeat: options?.repeat,
removeOnComplete: 100,
removeOnFail: 500,
};
await this.queueMap[queueName].add(jobName, data, queueOptions);
}
async removeCron(
queueName: MessageQueue,
jobName: string,
pattern: string,
): Promise<void> {
await this.queueMap[queueName].removeRepeatable(jobName, {
pattern,
});
}
async add<T>(
queueName: MessageQueue,
jobName: string,
data: T,
options?: QueueJobOptions,
): Promise<void> {
if (!this.queueMap[queueName]) {
throw new Error(
`Queue ${queueName} is not registered, make sure you have added it as a queue provider`,
);
}
const queueOptions: JobsOptions = {
jobId: options?.id,
priority: options?.priority,
attempts: 1 + (options?.retryLimit || 0),
removeOnComplete: 100,
removeOnFail: 500,
};
await this.queueMap[queueName].add(jobName, data, queueOptions);
}
}

View File

@ -0,0 +1,13 @@
export interface QueueJobOptions {
id?: string;
priority?: number;
retryLimit?: number;
}
export interface QueueCronJobOptions extends QueueJobOptions {
repeat?: {
every?: number;
pattern?: string;
limit?: number;
};
}

View File

@ -0,0 +1,30 @@
import {
QueueCronJobOptions,
QueueJobOptions,
} from 'src/engine/core-modules/message-queue/drivers/interfaces/job-options.interface';
import { MessageQueueJobData } from 'src/engine/core-modules/message-queue/interfaces/message-queue-job.interface';
import { MessageQueueWorkerOptions } from 'src/engine/core-modules/message-queue/interfaces/message-queue-worker-options.interface';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
export interface MessageQueueDriver {
add<T extends MessageQueueJobData>(
queueName: MessageQueue,
jobName: string,
data: T,
options?: QueueJobOptions,
): Promise<void>;
work<T extends MessageQueueJobData>(
queueName: MessageQueue,
handler: ({ data, id }: { data: T; id: string }) => Promise<void> | void,
options?: MessageQueueWorkerOptions,
);
addCron<T extends MessageQueueJobData | undefined>(
queueName: MessageQueue,
jobName: string,
data: T,
options?: QueueCronJobOptions,
);
removeCron(queueName: MessageQueue, jobName: string, pattern?: string);
register?(queueName: MessageQueue): void;
}

View File

@ -0,0 +1,105 @@
import { OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import PgBoss from 'pg-boss';
import {
QueueCronJobOptions,
QueueJobOptions,
} from 'src/engine/core-modules/message-queue/drivers/interfaces/job-options.interface';
import { MessageQueueJob } from 'src/engine/core-modules/message-queue/interfaces/message-queue-job.interface';
import { MessageQueueWorkerOptions } from 'src/engine/core-modules/message-queue/interfaces/message-queue-worker-options.interface';
import { MessageQueueDriver } from 'src/engine/core-modules/message-queue/drivers/interfaces/message-queue-driver.interface';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
export type PgBossDriverOptions = PgBoss.ConstructorOptions;
const DEFAULT_PG_BOSS_CRON_PATTERN_WHEN_NOT_PROVIDED = '*/1 * * * *';
export class PgBossDriver
implements MessageQueueDriver, OnModuleInit, OnModuleDestroy
{
private pgBoss: PgBoss;
constructor(options: PgBossDriverOptions) {
this.pgBoss = new PgBoss(options);
}
async onModuleInit() {
await this.pgBoss.start();
}
async onModuleDestroy() {
await this.pgBoss.stop();
}
async work<T>(
queueName: string,
handler: (job: MessageQueueJob<T>) => Promise<void>,
options?: MessageQueueWorkerOptions,
) {
return this.pgBoss.work<T>(
`${queueName}.*`,
options?.concurrency
? {
teamConcurrency: options.concurrency,
}
: {},
async (job) => {
// PGBoss work with wildcard job name
const jobName = job.name.split('.')?.[1];
if (!jobName) {
throw new Error('Job name could not be splited from the job.');
}
await handler({
data: job.data,
id: job.id,
name: jobName,
});
},
);
}
async addCron<T>(
queueName: MessageQueue,
jobName: string,
data: T,
options?: QueueCronJobOptions,
): Promise<void> {
await this.pgBoss.schedule(
`${queueName}.${jobName}`,
options?.repeat?.pattern ??
DEFAULT_PG_BOSS_CRON_PATTERN_WHEN_NOT_PROVIDED,
data as object,
options
? {
singletonKey: options?.id,
}
: {},
);
}
async removeCron(queueName: MessageQueue, jobName: string): Promise<void> {
await this.pgBoss.unschedule(`${queueName}.${jobName}`);
}
async add<T>(
queueName: MessageQueue,
jobName: string,
data: T,
options?: QueueJobOptions,
): Promise<void> {
await this.pgBoss.send(
`${queueName}.${jobName}`,
data as object,
options
? {
...options,
singletonKey: options?.id,
}
: {},
);
}
}

View File

@ -0,0 +1,65 @@
import { Logger } from '@nestjs/common';
import {
MessageQueueJobData,
MessageQueueJob,
} from 'src/engine/core-modules/message-queue/interfaces/message-queue-job.interface';
import { MessageQueueDriver } from 'src/engine/core-modules/message-queue/drivers/interfaces/message-queue-driver.interface';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
export class SyncDriver implements MessageQueueDriver {
private readonly logger = new Logger(SyncDriver.name);
private workersMap: {
[queueName: string]: (job: MessageQueueJob) => Promise<void> | void;
} = {};
constructor() {}
async add<T extends MessageQueueJobData>(
queueName: MessageQueue,
jobName: string,
data: T,
): Promise<void> {
await this.processJob(queueName, { id: '', name: jobName, data });
}
async addCron<T extends MessageQueueJobData | undefined>(
queueName: MessageQueue,
jobName: string,
data: T,
): Promise<void> {
this.logger.log(`Running cron job with SyncDriver`);
await this.processJob(queueName, {
id: '',
name: jobName,
// TODO: Fix this type issue
data: data as any,
});
}
async removeCron(queueName: MessageQueue) {
this.logger.log(`Removing '${queueName}' cron job with SyncDriver`);
}
work<T extends MessageQueueJobData>(
queueName: MessageQueue,
handler: (job: MessageQueueJob<T>) => Promise<void> | void,
) {
this.logger.log(`Registering handler for queue: ${queueName}`);
this.workersMap[queueName] = handler;
}
async processJob<T extends MessageQueueJobData>(
queueName: string,
job: MessageQueueJob<T>,
) {
const worker = this.workersMap[queueName];
if (worker) {
await worker(job);
} else {
this.logger.error(`No handler found for job: ${queueName}`);
}
}
}

View File

@ -0,0 +1 @@
export * from 'src/engine/core-modules/message-queue/interfaces/message-queue-module-options.interface';

View File

@ -0,0 +1,15 @@
export interface MessageQueueJob<T = any> {
id: string;
name: string;
data: T;
}
export interface MessageQueueCronJobData<
T extends MessageQueueJobData | undefined,
> {
handle(data: T): Promise<void> | void;
}
export interface MessageQueueJobData {
[key: string]: any;
}

View File

@ -0,0 +1,28 @@
import { BullMQDriverOptions } from 'src/engine/core-modules/message-queue/drivers/bullmq.driver';
import { PgBossDriverOptions } from 'src/engine/core-modules/message-queue/drivers/pg-boss.driver';
export enum MessageQueueDriverType {
PgBoss = 'pg-boss',
BullMQ = 'bull-mq',
Sync = 'sync',
}
export interface PgBossDriverFactoryOptions {
type: MessageQueueDriverType.PgBoss;
options: PgBossDriverOptions;
}
export interface BullMQDriverFactoryOptions {
type: MessageQueueDriverType.BullMQ;
options: BullMQDriverOptions;
}
export interface SyncDriverFactoryOptions {
type: MessageQueueDriverType.Sync;
options: Record<string, any>;
}
export type MessageQueueModuleOptions =
| PgBossDriverFactoryOptions
| BullMQDriverFactoryOptions
| SyncDriverFactoryOptions;

View File

@ -0,0 +1,3 @@
export interface MessageQueueWorkerOptions {
concurrency?: number;
}

View File

@ -0,0 +1,66 @@
import { Module } from '@nestjs/common';
import { ModuleRef } from '@nestjs/core';
import { TypeOrmModule } from '@nestjs/typeorm';
import { DataSeedDemoWorkspaceModule } from 'src/database/commands/data-seed-demo-workspace/data-seed-demo-workspace.module';
import { DataSeedDemoWorkspaceJob } from 'src/database/commands/data-seed-demo-workspace/jobs/data-seed-demo-workspace.job';
import { TypeORMModule } from 'src/database/typeorm/typeorm.module';
import { WorkspaceQueryRunnerJobModule } from 'src/engine/api/graphql/workspace-query-runner/jobs/workspace-query-runner-job.module';
import { BillingModule } from 'src/engine/core-modules/billing/billing.module';
import { UpdateSubscriptionJob } from 'src/engine/core-modules/billing/jobs/update-subscription.job';
import { StripeModule } from 'src/engine/core-modules/billing/stripe/stripe.module';
import { UserWorkspaceModule } from 'src/engine/core-modules/user-workspace/user-workspace.module';
import { UserModule } from 'src/engine/core-modules/user/user.module';
import { HandleWorkspaceMemberDeletedJob } from 'src/engine/core-modules/workspace/handle-workspace-member-deleted.job';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { WorkspaceModule } from 'src/engine/core-modules/workspace/workspace.module';
import { EmailSenderJob } from 'src/engine/core-modules/email/email-sender.job';
import { EmailModule } from 'src/engine/core-modules/email/email.module';
import { DataSourceModule } from 'src/engine/metadata-modules/data-source/data-source.module';
import { ObjectMetadataModule } from 'src/engine/metadata-modules/object-metadata/object-metadata.module';
import { CleanInactiveWorkspaceJob } from 'src/engine/workspace-manager/workspace-cleaner/crons/clean-inactive-workspace.job';
import { CalendarEventParticipantManagerModule } from 'src/modules/calendar/calendar-event-participant-manager/calendar-event-participant-manager.module';
import { CalendarModule } from 'src/modules/calendar/calendar.module';
import { AutoCompaniesAndContactsCreationJobModule } from 'src/modules/contact-creation-manager/jobs/auto-companies-and-contacts-creation-job.module';
import { MessagingModule } from 'src/modules/messaging/messaging.module';
import { TimelineJobModule } from 'src/modules/timeline/jobs/timeline-job.module';
import { TimelineActivityModule } from 'src/modules/timeline/timeline-activity.module';
import { WorkflowModule } from 'src/modules/workflow/workflow.module';
@Module({
imports: [
TypeOrmModule.forFeature([Workspace], 'core'),
DataSourceModule,
ObjectMetadataModule,
TypeORMModule,
UserModule,
EmailModule,
DataSeedDemoWorkspaceModule,
BillingModule,
UserWorkspaceModule,
WorkspaceModule,
MessagingModule,
CalendarModule,
CalendarEventParticipantManagerModule,
TimelineActivityModule,
StripeModule,
WorkspaceQueryRunnerJobModule,
AutoCompaniesAndContactsCreationJobModule,
TimelineJobModule,
WorkflowModule,
],
providers: [
CleanInactiveWorkspaceJob,
EmailSenderJob,
DataSeedDemoWorkspaceJob,
UpdateSubscriptionJob,
HandleWorkspaceMemberDeletedJob,
],
})
export class JobsModule {
static moduleRef: ModuleRef;
constructor(private moduleRef: ModuleRef) {
JobsModule.moduleRef = this.moduleRef;
}
}

View File

@ -0,0 +1,124 @@
import {
DynamicModule,
Global,
Logger,
Module,
Provider,
} from '@nestjs/common';
import { MessageQueueDriver } from 'src/engine/core-modules/message-queue/drivers/interfaces/message-queue-driver.interface';
import { MessageQueueDriverType } from 'src/engine/core-modules/message-queue/interfaces';
import {
MessageQueue,
QUEUE_DRIVER,
} from 'src/engine/core-modules/message-queue/message-queue.constants';
import { PgBossDriver } from 'src/engine/core-modules/message-queue/drivers/pg-boss.driver';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { BullMQDriver } from 'src/engine/core-modules/message-queue/drivers/bullmq.driver';
import { SyncDriver } from 'src/engine/core-modules/message-queue/drivers/sync.driver';
import { getQueueToken } from 'src/engine/core-modules/message-queue/utils/get-queue-token.util';
import {
ASYNC_OPTIONS_TYPE,
ConfigurableModuleClass,
OPTIONS_TYPE,
} from 'src/engine/core-modules/message-queue/message-queue.module-definition';
@Global()
@Module({})
export class MessageQueueCoreModule extends ConfigurableModuleClass {
private static readonly logger = new Logger(MessageQueueCoreModule.name);
static register(options: typeof OPTIONS_TYPE): DynamicModule {
const dynamicModule = super.register(options);
const driverProvider: Provider = {
provide: QUEUE_DRIVER,
useFactory: () => {
return this.createDriver(options);
},
};
const queueProviders = this.createQueueProviders();
return {
...dynamicModule,
providers: [
...(dynamicModule.providers ?? []),
driverProvider,
...queueProviders,
],
exports: [
...(dynamicModule.exports ?? []),
...Object.values(MessageQueue).map((queueName) =>
getQueueToken(queueName),
),
],
};
}
static registerAsync(options: typeof ASYNC_OPTIONS_TYPE): DynamicModule {
const dynamicModule = super.registerAsync(options);
const driverProvider: Provider = {
provide: QUEUE_DRIVER,
useFactory: async (...args: any[]) => {
if (options.useFactory) {
const config = await options.useFactory(...args);
return this.createDriver(config);
}
throw new Error('useFactory is not defined');
},
inject: options.inject || [],
};
const queueProviders = MessageQueueCoreModule.createQueueProviders();
return {
...dynamicModule,
providers: [
...(dynamicModule.providers ?? []),
driverProvider,
...queueProviders,
],
exports: [
...(dynamicModule.exports ?? []),
...Object.values(MessageQueue).map((queueName) =>
getQueueToken(queueName),
),
],
};
}
static async createDriver({ type, options }: typeof OPTIONS_TYPE) {
switch (type) {
case MessageQueueDriverType.PgBoss: {
return new PgBossDriver(options);
}
case MessageQueueDriverType.BullMQ: {
return new BullMQDriver(options);
}
case MessageQueueDriverType.Sync: {
return new SyncDriver();
}
default: {
this.logger.warn(
`Unsupported message queue driver type: ${type}. Using SyncDriver by default.`,
);
return new SyncDriver();
}
}
}
static createQueueProviders(): Provider[] {
return Object.values(MessageQueue).map((queueName) => ({
provide: getQueueToken(queueName),
useFactory: (driver: MessageQueueDriver) => {
return new MessageQueueService(driver, queueName);
},
inject: [QUEUE_DRIVER],
}));
}
}

View File

@ -0,0 +1,45 @@
/* eslint-disable @typescript-eslint/ban-types */
import { Injectable, Type } from '@nestjs/common';
import { Reflector } from '@nestjs/core';
import { MessageQueueProcessOptions } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
import { MessageQueueProcessorOptions } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import {
PROCESSOR_METADATA,
PROCESS_METADATA,
} from 'src/engine/core-modules/message-queue/message-queue.constants';
@Injectable()
export class MessageQueueMetadataAccessor {
constructor(private readonly reflector: Reflector) {}
isProcessor(target: Type | Function): boolean {
if (!target) {
return false;
}
return !!this.reflector.get(PROCESSOR_METADATA, target);
}
isProcess(target: Type | Function): boolean {
if (!target) {
return false;
}
return !!this.reflector.get(PROCESS_METADATA, target);
}
getProcessorMetadata(
target: Type | Function,
): MessageQueueProcessorOptions | undefined {
return this.reflector.get(PROCESSOR_METADATA, target);
}
getProcessMetadata(
target: Type | Function,
): MessageQueueProcessOptions | undefined {
const metadata = this.reflector.get(PROCESS_METADATA, target);
return metadata;
}
}

View File

@ -0,0 +1,20 @@
export const PROCESSOR_METADATA = Symbol('message-queue:processor_metadata');
export const PROCESS_METADATA = Symbol('message-queue:process_metadata');
export const WORKER_METADATA = Symbol('bullmq:worker_metadata');
export const QUEUE_DRIVER = Symbol('message-queue:queue_driver');
export enum MessageQueue {
taskAssignedQueue = 'task-assigned-queue',
messagingQueue = 'messaging-queue',
webhookQueue = 'webhook-queue',
cronQueue = 'cron-queue',
emailQueue = 'email-queue',
calendarQueue = 'calendar-queue',
contactCreationQueue = 'contact-creation-queue',
billingQueue = 'billing-queue',
workspaceQueue = 'workspace-queue',
recordPositionBackfillQueue = 'record-position-backfill-queue',
entityEventsToDbQueue = 'entity-events-to-db-queue',
testQueue = 'test-queue',
workflowQueue = 'workflow-queue',
}

View File

@ -0,0 +1,215 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import {
DiscoveryService,
MetadataScanner,
ModuleRef,
createContextId,
} from '@nestjs/core';
import { Module } from '@nestjs/core/injector/module';
import { Injector } from '@nestjs/core/injector/injector';
import { InstanceWrapper } from '@nestjs/core/injector/instance-wrapper';
import { MessageQueueWorkerOptions } from 'src/engine/core-modules/message-queue/interfaces/message-queue-worker-options.interface';
import {
MessageQueueJob,
MessageQueueJobData,
} from 'src/engine/core-modules/message-queue/interfaces/message-queue-job.interface';
import { MessageQueueMetadataAccessor } from 'src/engine/core-modules/message-queue/message-queue-metadata.accessor';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { getQueueToken } from 'src/engine/core-modules/message-queue/utils/get-queue-token.util';
import { ExceptionHandlerService } from 'src/engine/core-modules/exception-handler/exception-handler.service';
import { shouldFilterException } from 'src/engine/utils/global-exception-handler.util';
interface ProcessorGroup {
instance: object;
host: Module;
processMethodNames: string[];
isRequestScoped: boolean;
}
@Injectable()
export class MessageQueueExplorer implements OnModuleInit {
private readonly logger = new Logger('MessageQueueModule');
private readonly injector = new Injector();
constructor(
private readonly moduleRef: ModuleRef,
private readonly discoveryService: DiscoveryService,
private readonly metadataAccessor: MessageQueueMetadataAccessor,
private readonly metadataScanner: MetadataScanner,
private readonly exceptionHandlerService: ExceptionHandlerService,
) {}
onModuleInit() {
this.explore();
}
explore() {
const processors = this.discoveryService
.getProviders()
.filter((wrapper) =>
this.metadataAccessor.isProcessor(
!wrapper.metatype || wrapper.inject
? wrapper.instance?.constructor
: wrapper.metatype,
),
);
const groupedProcessors = this.groupProcessorsByQueueName(processors);
for (const [queueName, processorGroupCollection] of Object.entries(
groupedProcessors,
)) {
const queueToken = getQueueToken(queueName);
const messageQueueService = this.getQueueService(queueToken);
this.handleProcessorGroupCollection(
processorGroupCollection,
messageQueueService,
);
}
}
private groupProcessorsByQueueName(processors: InstanceWrapper[]) {
return processors.reduce(
(acc, wrapper) => {
const { instance, metatype } = wrapper;
const methodNames = this.metadataScanner.getAllMethodNames(instance);
const { queueName } =
this.metadataAccessor.getProcessorMetadata(
instance.constructor || metatype,
) ?? {};
const processMethodNames = methodNames.filter((name) =>
this.metadataAccessor.isProcess(instance[name]),
);
if (!queueName) {
this.logger.error(
`Processor ${wrapper.name} is missing queue name metadata`,
);
return acc;
}
if (!wrapper.host) {
this.logger.error(
`Processor ${wrapper.name} is missing host metadata`,
);
return acc;
}
if (!acc[queueName]) {
acc[queueName] = [];
}
acc[queueName].push({
instance,
host: wrapper.host,
processMethodNames,
isRequestScoped: !wrapper.isDependencyTreeStatic(),
});
return acc;
},
{} as Record<string, ProcessorGroup[]>,
);
}
private getQueueService(queueToken: string): MessageQueueService {
try {
return this.moduleRef.get<MessageQueueService>(queueToken, {
strict: false,
});
} catch (err) {
this.logger.error(`No queue found for token ${queueToken}`);
throw err;
}
}
private async handleProcessorGroupCollection(
processorGroupCollection: ProcessorGroup[],
queue: MessageQueueService,
options?: MessageQueueWorkerOptions,
) {
queue.work(async (job) => {
for (const processorGroup of processorGroupCollection) {
await this.handleProcessor(processorGroup, job);
}
}, options);
}
private async handleProcessor(
{ instance, host, processMethodNames, isRequestScoped }: ProcessorGroup,
job: MessageQueueJob<MessageQueueJobData>,
) {
const filteredProcessMethodNames = processMethodNames.filter(
(processMethodName) => {
const metadata = this.metadataAccessor.getProcessMetadata(
instance[processMethodName],
);
return metadata && job.name === metadata.jobName;
},
);
// Return early if no matching methods found
if (filteredProcessMethodNames.length === 0) {
return;
}
if (isRequestScoped) {
const contextId = createContextId();
if (this.moduleRef.registerRequestByContextId) {
this.moduleRef.registerRequestByContextId(
{
// Add workspaceId to the request object
req: {
workspaceId: job.data?.workspaceId,
},
},
contextId,
);
}
const contextInstance = await this.injector.loadPerContext(
instance,
host,
host.providers,
contextId,
);
await this.invokeProcessMethods(
contextInstance,
filteredProcessMethodNames,
job,
);
} else {
await this.invokeProcessMethods(
instance,
filteredProcessMethodNames,
job,
);
}
}
private async invokeProcessMethods(
instance: object,
processMethodNames: string[],
job: MessageQueueJob<MessageQueueJobData>,
) {
for (const processMethodName of processMethodNames) {
try {
await instance[processMethodName].call(instance, job.data);
} catch (err) {
if (!shouldFilterException(err)) {
this.exceptionHandlerService.captureExceptions([err]);
}
throw err;
}
}
}
}

View File

@ -0,0 +1,20 @@
import { ConfigurableModuleBuilder } from '@nestjs/common';
import { MessageQueueModuleOptions } from 'src/engine/core-modules/message-queue/interfaces';
export const {
ConfigurableModuleClass,
OPTIONS_TYPE,
ASYNC_OPTIONS_TYPE,
MODULE_OPTIONS_TOKEN,
} = new ConfigurableModuleBuilder<MessageQueueModuleOptions>()
.setExtras(
{
isGlobal: true,
},
(definition, extras) => ({
...definition,
global: extras.isGlobal,
}),
)
.build();

View File

@ -0,0 +1,57 @@
import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service';
import {
MessageQueueDriverType,
MessageQueueModuleOptions,
} from 'src/engine/core-modules/message-queue/interfaces';
/**
* MessageQueue Module factory
* @returns MessageQueueModuleOptions
* @param environmentService
*/
export const messageQueueModuleFactory = async (
environmentService: EnvironmentService,
): Promise<MessageQueueModuleOptions> => {
const driverType = environmentService.get('MESSAGE_QUEUE_TYPE');
switch (driverType) {
case MessageQueueDriverType.Sync: {
return {
type: MessageQueueDriverType.Sync,
options: {},
};
}
case MessageQueueDriverType.PgBoss: {
const connectionString = environmentService.get('PG_DATABASE_URL');
return {
type: MessageQueueDriverType.PgBoss,
options: {
connectionString,
},
};
}
case MessageQueueDriverType.BullMQ: {
const host = environmentService.get('REDIS_HOST');
const port = environmentService.get('REDIS_PORT');
const username = environmentService.get('REDIS_USERNAME');
const password = environmentService.get('REDIS_PASSWORD');
return {
type: MessageQueueDriverType.BullMQ,
options: {
connection: {
host,
port,
username,
password,
},
},
};
}
default:
throw new Error(
`Invalid message queue driver type (${driverType}), check your .env file`,
);
}
};

View File

@ -0,0 +1,36 @@
import { DynamicModule, Global, Module } from '@nestjs/common';
import { DiscoveryModule } from '@nestjs/core';
import { MessageQueueCoreModule } from 'src/engine/core-modules/message-queue/message-queue-core.module';
import { MessageQueueMetadataAccessor } from 'src/engine/core-modules/message-queue/message-queue-metadata.accessor';
import { MessageQueueExplorer } from 'src/engine/core-modules/message-queue/message-queue.explorer';
import {
ASYNC_OPTIONS_TYPE,
OPTIONS_TYPE,
} from 'src/engine/core-modules/message-queue/message-queue.module-definition';
@Global()
@Module({})
export class MessageQueueModule {
static register(options: typeof OPTIONS_TYPE): DynamicModule {
return {
module: MessageQueueModule,
imports: [MessageQueueCoreModule.register(options)],
};
}
static registerExplorer(): DynamicModule {
return {
module: MessageQueueModule,
imports: [DiscoveryModule],
providers: [MessageQueueExplorer, MessageQueueMetadataAccessor],
};
}
static registerAsync(options: typeof ASYNC_OPTIONS_TYPE): DynamicModule {
return {
module: MessageQueueModule,
imports: [MessageQueueCoreModule.registerAsync(options)],
};
}
}

View File

@ -0,0 +1,46 @@
import { Test, TestingModule } from '@nestjs/testing';
import { MessageQueueDriver } from 'src/engine/core-modules/message-queue/drivers/interfaces/message-queue-driver.interface';
import {
QUEUE_DRIVER,
MessageQueue,
} from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
describe('MessageQueueTaskAssigned queue', () => {
let service: MessageQueueService;
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [
{
provide: MessageQueue.taskAssignedQueue,
useFactory: (driver: MessageQueueDriver) => {
return new MessageQueueService(
driver,
MessageQueue.taskAssignedQueue,
);
},
inject: [QUEUE_DRIVER],
},
{
provide: QUEUE_DRIVER,
useValue: {},
},
],
}).compile();
service = module.get<MessageQueueService>(MessageQueue.taskAssignedQueue);
});
it('should be defined', () => {
expect(service).toBeDefined();
});
it('should contain the topic and driver', () => {
expect(service).toEqual({
driver: {},
queueName: MessageQueue.taskAssignedQueue,
});
});
});

View File

@ -0,0 +1,56 @@
import { Inject, Injectable } from '@nestjs/common';
import {
QueueCronJobOptions,
QueueJobOptions,
} from 'src/engine/core-modules/message-queue/drivers/interfaces/job-options.interface';
import { MessageQueueDriver } from 'src/engine/core-modules/message-queue/drivers/interfaces/message-queue-driver.interface';
import {
MessageQueueJobData,
MessageQueueJob,
} from 'src/engine/core-modules/message-queue/interfaces/message-queue-job.interface';
import { MessageQueueWorkerOptions } from 'src/engine/core-modules/message-queue/interfaces/message-queue-worker-options.interface';
import {
MessageQueue,
QUEUE_DRIVER,
} from 'src/engine/core-modules/message-queue/message-queue.constants';
@Injectable()
export class MessageQueueService {
constructor(
@Inject(QUEUE_DRIVER) protected driver: MessageQueueDriver,
protected queueName: MessageQueue,
) {
if (typeof this.driver.register === 'function') {
this.driver.register(queueName);
}
}
add<T extends MessageQueueJobData>(
jobName: string,
data: T,
options?: QueueJobOptions,
): Promise<void> {
return this.driver.add(this.queueName, jobName, data, options);
}
addCron<T extends MessageQueueJobData | undefined>(
jobName: string,
data: T,
options?: QueueCronJobOptions,
): Promise<void> {
return this.driver.addCron(this.queueName, jobName, data, options);
}
removeCron(jobName: string, pattern: string): Promise<void> {
return this.driver.removeCron(this.queueName, jobName, pattern);
}
work<T extends MessageQueueJobData>(
handler: (job: MessageQueueJob<T>) => Promise<void> | void,
options?: MessageQueueWorkerOptions,
) {
return this.driver.work(this.queueName, handler, options);
}
}

View File

@ -0,0 +1,2 @@
export const getQueueToken = (queueName: string) =>
`MESSAGE_QUEUE_${queueName}`;