Fix queue setup (#3075)

fix bullmq setup
This commit is contained in:
Weiko
2023-12-19 17:12:22 +01:00
committed by GitHub
parent 5afcab4e78
commit 4637a92f09
10 changed files with 22 additions and 15 deletions

View File

@ -38,9 +38,13 @@ export class BullMQDriver implements MessageQueueDriver {
queueName: MessageQueue,
handler: ({ data, id }: { data: T; id: string }) => Promise<void>,
) {
const worker = new Worker(queueName, async (job) => {
await handler(job as { data: T; id: string });
});
const worker = new Worker(
queueName,
async (job) => {
await handler(job as { data: T; id: string });
},
this.options,
);
this.workerMap[queueName] = worker;
}

View File

@ -39,7 +39,12 @@ export class PgBossDriver implements MessageQueueDriver {
await this.pgBoss.send(
`${queueName}.${jobName}`,
data as object,
options ?? {},
options
? {
...options,
singletonKey: options?.id,
}
: {},
);
}
}

View File

@ -13,10 +13,7 @@ import {
import { PgBossDriver } from 'src/integrations/message-queue/drivers/pg-boss.driver';
import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service';
import { BullMQDriver } from 'src/integrations/message-queue/drivers/bullmq.driver';
import { FetchMessagesJob } from 'src/workspace/messaging/jobs/fetch-messages.job';
import { SyncDriver } from 'src/integrations/message-queue/drivers/sync.driver';
import { ModuleRef } from '@nestjs/core';
import { AppModule } from 'src/app.module';
import { JobsModule } from 'src/integrations/message-queue/jobs.module';
@Global()
@ -38,7 +35,9 @@ export class MessageQueueModule {
switch (config.type) {
case MessageQueueDriverType.PgBoss:
const boss = new PgBossDriver(config.options);
await boss.init();
return boss;
case MessageQueueDriverType.BullMQ:

View File

@ -37,7 +37,7 @@ export class DataSourceEntity {
})
objects: ObjectMetadataEntity[];
@Column({ nullable: false })
@Column({ nullable: false, type: 'uuid' })
workspaceId: string;
@CreateDateColumn()

View File

@ -95,7 +95,7 @@ export class FieldMetadataEntity<
@Column({ nullable: true, default: true })
isNullable: boolean;
@Column({ nullable: false })
@Column({ nullable: false, type: 'uuid' })
workspaceId: string;
@OneToOne(

View File

@ -64,7 +64,7 @@ export class ObjectMetadataEntity implements ObjectMetadataInterface {
@Column({ nullable: true })
imageIdentifierFieldMetadataId?: string;
@Column({ nullable: false })
@Column({ nullable: false, type: 'uuid' })
workspaceId: string;
@OneToMany(() => FieldMetadataEntity, (field) => field.object, {

View File

@ -40,7 +40,7 @@ export class RelationMetadataEntity implements RelationMetadataInterface {
@Column({ nullable: false, type: 'uuid' })
toFieldMetadataId: string;
@Column({ nullable: false })
@Column({ nullable: false, type: 'uuid' })
workspaceId: string;
@ManyToOne(

View File

@ -11,7 +11,7 @@ export class WorkspaceCacheVersionEntity {
@PrimaryGeneratedColumn('uuid')
id: string;
@Column({ unique: true })
@Column({ unique: true, nullable: false, type: 'uuid' })
workspaceId: string;
@Column()

View File

@ -79,7 +79,7 @@ export class WorkspaceMigrationEntity {
@Column({ nullable: true })
appliedAt?: Date;
@Column()
@Column({ nullable: false, type: 'uuid' })
workspaceId: string;
@CreateDateColumn()

View File

@ -4,8 +4,8 @@ import {
MessageQueueJob,
MessageQueueJobData,
} from 'src/integrations/message-queue/interfaces/message-queue-job.interface';
import { JobsModule } from 'src/integrations/message-queue/jobs.module';
import { JobsModule } from 'src/integrations/message-queue/jobs.module';
import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service';
import { getJobClassName } from 'src/integrations/message-queue/utils/get-job-class-name.util';
@ -28,4 +28,3 @@ async function bootstrap() {
}
}
bootstrap();