From db2935b877be4b054dca67b729f1749cb5904ec4 Mon Sep 17 00:00:00 2001 From: Weiko Date: Mon, 15 Apr 2024 17:33:27 +0200 Subject: [PATCH] [message-queue] Add job auto-removal (#4973) ## Context With the addition of cronjobs, the app is building a lot of jobs and stores them indefinitely. There is no real point to keep all of them in the queue once they have been processed (completed or failed) so we are adding a new default option to the bull-mq driver ## Implementation See bull-mq JobsOption doc ```typescript /** * If true, removes the job when it successfully completes * When given a number, it specifies the maximum amount of * jobs to keep, or you can provide an object specifying max * age and/or count to keep. It overrides whatever setting is used in the worker. * Default behavior is to keep the job in the completed set. */ removeOnComplete?: boolean | number | KeepJobs; /** * If true, removes the job when it fails after all attempts. * When given a number, it specifies the maximum amount of * jobs to keep, or you can provide an object specifying max * age and/or count to keep. It overrides whatever setting is used in the worker. * Default behavior is to keep the job in the failed set. */ removeOnFail?: boolean | number | KeepJobs; ``` removeOnFail should be a bit higher since they are the ones we are most likely looking at when needed. --- .../message-queue/drivers/bullmq.driver.ts | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/packages/twenty-server/src/engine/integrations/message-queue/drivers/bullmq.driver.ts b/packages/twenty-server/src/engine/integrations/message-queue/drivers/bullmq.driver.ts index be282e726..fc78eaabb 100644 --- a/packages/twenty-server/src/engine/integrations/message-queue/drivers/bullmq.driver.ts +++ b/packages/twenty-server/src/engine/integrations/message-queue/drivers/bullmq.driver.ts @@ -1,4 +1,4 @@ -import { Queue, QueueOptions, Worker } from 'bullmq'; +import { JobsOptions, Queue, QueueOptions, Worker } from 'bullmq'; import { QueueCronJobOptions, @@ -63,10 +63,12 @@ export class BullMQDriver implements MessageQueueDriver { `Queue ${queueName} is not registered, make sure you have added it as a queue provider`, ); } - const queueOptions = { + const queueOptions: JobsOptions = { jobId: options?.id, priority: options?.priority, repeat: options?.repeat, + removeOnComplete: 100, + removeOnFail: 500, }; await this.queueMap[queueName].add(jobName, data, queueOptions); @@ -93,10 +95,12 @@ export class BullMQDriver implements MessageQueueDriver { `Queue ${queueName} is not registered, make sure you have added it as a queue provider`, ); } - const queueOptions = { + const queueOptions: JobsOptions = { jobId: options?.id, priority: options?.priority, attempts: 1 + (options?.retryLimit || 0), + removeOnComplete: 100, + removeOnFail: 500, }; await this.queueMap[queueName].add(jobName, data, queueOptions);