335 workflow implement workflow cron triggers backend (#9988)

[Backend side] Add cron triggers to workflow
Closes https://github.com/twentyhq/core-team-issues/issues/335
This commit is contained in:
martmull
2025-02-05 12:02:49 +01:00
committed by GitHub
parent 074cc113ac
commit 736b845c98
46 changed files with 419 additions and 253 deletions

View File

@ -1,8 +1,8 @@
import { OnModuleDestroy } from '@nestjs/common';
import { JobsOptions, Queue, QueueOptions, Worker } from 'bullmq';
import omitBy from 'lodash.omitby';
import { v4 } from 'uuid';
import { isDefined } from 'twenty-shared';
import {
QueueCronJobOptions,
@ -13,6 +13,7 @@ import { MessageQueueJob } from 'src/engine/core-modules/message-queue/interface
import { MessageQueueWorkerOptions } from 'src/engine/core-modules/message-queue/interfaces/message-queue-worker-options.interface';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { getJobKey } from 'src/engine/core-modules/message-queue/utils/get-job-key.util';
export type BullMQDriverOptions = QueueOptions;
@ -49,54 +50,72 @@ export class BullMQDriver implements MessageQueueDriver, OnModuleDestroy {
handler: (job: MessageQueueJob<T>) => Promise<void>,
options?: MessageQueueWorkerOptions,
) {
const worker = new Worker(
const workerOptions = isDefined(options?.concurrency)
? {
...this.options,
concurrency: options.concurrency,
}
: this.options;
this.workerMap[queueName] = new Worker(
queueName,
async (job) => {
// TODO: Correctly support for job.id
await handler({ data: job.data, id: job.id ?? '', name: job.name });
},
omitBy(
{
...this.options,
concurrency: options?.concurrency,
},
(value) => value === undefined,
),
workerOptions,
);
this.workerMap[queueName] = worker;
}
async addCron<T>(
queueName: MessageQueue,
jobName: string,
data: T,
options?: QueueCronJobOptions,
): Promise<void> {
async addCron<T>({
queueName,
jobName,
data,
options,
jobId,
}: {
queueName: MessageQueue;
jobName: string;
data: T;
options: QueueCronJobOptions;
jobId?: string;
}): Promise<void> {
if (!this.queueMap[queueName]) {
throw new Error(
`Queue ${queueName} is not registered, make sure you have added it as a queue provider`,
);
}
const queueOptions: JobsOptions = {
jobId: options?.id,
priority: options?.priority,
repeat: options?.repeat,
removeOnComplete: true,
removeOnFail: 100,
};
await this.queueMap[queueName].add(jobName, data, queueOptions);
await this.queueMap[queueName].upsertJobScheduler(
getJobKey({ jobName, jobId }),
options?.repeat,
{
name: jobName,
data,
opts: queueOptions,
},
);
}
async removeCron(
queueName: MessageQueue,
jobName: string,
pattern: string,
): Promise<void> {
await this.queueMap[queueName].removeRepeatable(jobName, {
pattern,
});
async removeCron({
queueName,
jobName,
jobId,
}: {
queueName: MessageQueue;
jobName: string;
jobId?: string;
}): Promise<void> {
await this.queueMap[queueName].removeJobScheduler(
getJobKey({ jobName, jobId }),
);
}
async add<T>(

View File

@ -5,7 +5,7 @@ export interface QueueJobOptions {
}
export interface QueueCronJobOptions extends QueueJobOptions {
repeat?: {
repeat: {
every?: number;
pattern?: string;
limit?: number;

View File

@ -19,12 +19,27 @@ export interface MessageQueueDriver {
handler: ({ data, id }: { data: T; id: string }) => Promise<void> | void,
options?: MessageQueueWorkerOptions,
);
addCron<T extends MessageQueueJobData | undefined>(
queueName: MessageQueue,
jobName: string,
data: T,
options?: QueueCronJobOptions,
);
removeCron(queueName: MessageQueue, jobName: string, pattern?: string);
addCron<T extends MessageQueueJobData | undefined>({
queueName,
jobName,
data,
options,
jobId,
}: {
queueName: MessageQueue;
jobName: string;
data: T;
options: QueueCronJobOptions;
jobId?: string;
});
removeCron({
queueName,
jobName,
jobId,
}: {
queueName: MessageQueue;
jobName: string;
jobId?: string;
});
register?(queueName: MessageQueue): void;
}

View File

@ -11,6 +11,7 @@ import { MessageQueueWorkerOptions } from 'src/engine/core-modules/message-queue
import { MessageQueueDriver } from 'src/engine/core-modules/message-queue/drivers/interfaces/message-queue-driver.interface';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { getJobKey } from 'src/engine/core-modules/message-queue/utils/get-job-key.util';
export type PgBossDriverOptions = PgBoss.ConstructorOptions;
@ -62,27 +63,40 @@ export class PgBossDriver
);
}
async addCron<T>(
queueName: MessageQueue,
jobName: string,
data: T,
options?: QueueCronJobOptions,
): Promise<void> {
async addCron<T>({
queueName,
jobName,
data,
options,
jobId,
}: {
queueName: MessageQueue;
jobName: string;
data: T;
options: QueueCronJobOptions;
jobId?: string;
}): Promise<void> {
const name = `${queueName}.${getJobKey({ jobName, jobId })}`;
await this.pgBoss.schedule(
`${queueName}.${jobName}`,
options?.repeat?.pattern ??
DEFAULT_PG_BOSS_CRON_PATTERN_WHEN_NOT_PROVIDED,
name,
options.repeat.pattern ?? DEFAULT_PG_BOSS_CRON_PATTERN_WHEN_NOT_PROVIDED,
data as object,
options
? {
singletonKey: options?.id,
}
: {},
);
}
async removeCron(queueName: MessageQueue, jobName: string): Promise<void> {
await this.pgBoss.unschedule(`${queueName}.${jobName}`);
async removeCron({
queueName,
jobName,
jobId,
}: {
queueName: MessageQueue;
jobName: string;
jobId?: string;
}): Promise<void> {
const name = `${queueName}.${getJobKey({ jobName, jobId })}`;
await this.pgBoss.unschedule(name);
}
async add<T>(

View File

@ -24,11 +24,15 @@ export class SyncDriver implements MessageQueueDriver {
await this.processJob(queueName, { id: '', name: jobName, data });
}
async addCron<T extends MessageQueueJobData | undefined>(
queueName: MessageQueue,
jobName: string,
data: T,
): Promise<void> {
async addCron<T extends MessageQueueJobData | undefined>({
queueName,
jobName,
data,
}: {
queueName: MessageQueue;
jobName: string;
data: T;
}): Promise<void> {
this.logger.log(`Running cron job with SyncDriver`);
await this.processJob(queueName, {
id: '',
@ -38,7 +42,7 @@ export class SyncDriver implements MessageQueueDriver {
});
}
async removeCron(queueName: MessageQueue) {
async removeCron({ queueName }: { queueName: MessageQueue }) {
this.logger.log(`Removing '${queueName}' cron job with SyncDriver`);
}

View File

@ -35,16 +35,38 @@ export class MessageQueueService {
return this.driver.add(this.queueName, jobName, data, options);
}
addCron<T extends MessageQueueJobData | undefined>(
jobName: string,
data: T,
options?: QueueCronJobOptions,
): Promise<void> {
return this.driver.addCron(this.queueName, jobName, data, options);
addCron<T extends MessageQueueJobData | undefined>({
jobName,
data,
options,
jobId,
}: {
jobName: string;
data: T;
options: QueueCronJobOptions;
jobId?: string;
}): Promise<void> {
return this.driver.addCron({
queueName: this.queueName,
jobName,
data,
options,
jobId,
});
}
removeCron(jobName: string, pattern: string): Promise<void> {
return this.driver.removeCron(this.queueName, jobName, pattern);
removeCron({
jobName,
jobId,
}: {
jobName: string;
jobId?: string;
}): Promise<void> {
return this.driver.removeCron({
queueName: this.queueName,
jobName,
jobId,
});
}
work<T extends MessageQueueJobData>(

View File

@ -0,0 +1,9 @@
export const getJobKey = ({
jobName,
jobId,
}: {
jobName: string;
jobId?: string;
}) => {
return `${jobName}${jobId ? `.${jobId}` : ''}`;
};