diff --git a/packages/twenty-front/src/modules/workflow/validation-schemas/workflowSchema.ts b/packages/twenty-front/src/modules/workflow/validation-schemas/workflowSchema.ts index 0b92edfe0..15460be41 100644 --- a/packages/twenty-front/src/modules/workflow/validation-schemas/workflowSchema.ts +++ b/packages/twenty-front/src/modules/workflow/validation-schemas/workflowSchema.ts @@ -309,6 +309,7 @@ export const workflowRunStatusSchema = z.enum([ 'RUNNING', 'COMPLETED', 'FAILED', + 'ENQUEUED', ]); export const workflowRunSchema = z diff --git a/packages/twenty-front/src/modules/workflow/workflow-diagram/utils/getWorkflowRunStatusTagProps.ts b/packages/twenty-front/src/modules/workflow/workflow-diagram/utils/getWorkflowRunStatusTagProps.ts index 938dd8ed5..9e1aeba1c 100644 --- a/packages/twenty-front/src/modules/workflow/workflow-diagram/utils/getWorkflowRunStatusTagProps.ts +++ b/packages/twenty-front/src/modules/workflow/workflow-diagram/utils/getWorkflowRunStatusTagProps.ts @@ -27,6 +27,13 @@ export const getWorkflowRunStatusTagProps = ({ }; } + if (workflowRunStatus === 'ENQUEUED') { + return { + color: 'blue', + text: 'Enqueued', + }; + } + return { color: 'red', text: 'Failed', diff --git a/packages/twenty-server/src/database/commands/upgrade-version-command/1-2/1-2-add-enqueued-status-to-workflow-run.command.ts b/packages/twenty-server/src/database/commands/upgrade-version-command/1-2/1-2-add-enqueued-status-to-workflow-run.command.ts new file mode 100644 index 000000000..c7ff94853 --- /dev/null +++ b/packages/twenty-server/src/database/commands/upgrade-version-command/1-2/1-2-add-enqueued-status-to-workflow-run.command.ts @@ -0,0 +1,112 @@ +import { InjectRepository } from '@nestjs/typeorm'; + +import { Command } from 'nest-commander'; +import { Repository } from 'typeorm'; + +import { + ActiveOrSuspendedWorkspacesMigrationCommandRunner, + RunOnWorkspaceArgs, +} from 'src/database/commands/command-runners/active-or-suspended-workspaces-migration.command-runner'; +import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; +import { FieldMetadataEntity } from 'src/engine/metadata-modules/field-metadata/field-metadata.entity'; +import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; +import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; +import { WORKFLOW_RUN_STANDARD_FIELD_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids'; +import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; + +@Command({ + name: 'upgrade:1-2:add-enqueued-status-to-workflow-run', + description: 'Add enqueued status to workflow run', +}) +export class AddEnqueuedStatusToWorkflowRunCommand extends ActiveOrSuspendedWorkspacesMigrationCommandRunner { + constructor( + @InjectRepository(Workspace, 'core') + protected readonly workspaceRepository: Repository, + protected readonly twentyORMGlobalManager: TwentyORMGlobalManager, + @InjectRepository(FieldMetadataEntity, 'core') + private readonly fieldMetadataRepository: Repository, + private readonly workspaceDataSourceService: WorkspaceDataSourceService, + ) { + super(workspaceRepository, twentyORMGlobalManager); + } + + override async runOnWorkspace({ + workspaceId, + options, + }: RunOnWorkspaceArgs): Promise { + this.logger.log( + `Adding enqueued status to workflow run for workspace ${workspaceId}`, + ); + + const workflowRunStatusFieldMetadata = + await this.fieldMetadataRepository.findOne({ + where: { + standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.status, + }, + }); + + if (!workflowRunStatusFieldMetadata) { + this.logger.error( + `Workflow run status field metadata not found for workspace ${workspaceId}`, + ); + + return; + } + + const workflowRunStatusFieldMetadataOptions = + workflowRunStatusFieldMetadata.options; + + // check if enqueued status is already in the field metadata options + if ( + workflowRunStatusFieldMetadataOptions.some( + (option) => option.value === WorkflowRunStatus.ENQUEUED, + ) + ) { + this.logger.log( + `Workflow run status field metadata options already contain enqueued status for workspace ${workspaceId}`, + ); + } else if (options.dryRun) { + this.logger.log( + `Would add enqueued status to workflow run status field metadata for workspace ${workspaceId}`, + ); + } else { + workflowRunStatusFieldMetadataOptions.push({ + value: WorkflowRunStatus.ENQUEUED, + label: 'Enqueued', + position: 4, + color: 'blue', + }); + + await this.fieldMetadataRepository.save(workflowRunStatusFieldMetadata); + + this.logger.log( + `Enqueued status added to workflow run status field metadata for workspace ${workspaceId}`, + ); + } + + const schemaName = + this.workspaceDataSourceService.getSchemaName(workspaceId); + + const mainDataSource = + await this.workspaceDataSourceService.connectToMainDataSource(); + + if (options.dryRun) { + this.logger.log( + `Would try to add enqueued status to workflow run status enum for workspace ${workspaceId}`, + ); + } else { + try { + await mainDataSource.query( + `ALTER TYPE ${schemaName}."workflowRun_status_enum" ADD VALUE 'ENQUEUED'`, + ); + this.logger.log( + `Enqueued status added to workflow run status enum for workspace ${workspaceId}`, + ); + } catch (error) { + this.logger.error( + `Error adding enqueued status to workflow run status enum for workspace ${workspaceId}: ${error}`, + ); + } + } + } +} diff --git a/packages/twenty-server/src/database/commands/upgrade-version-command/1-2/1-2-upgrade-version-command.module.ts b/packages/twenty-server/src/database/commands/upgrade-version-command/1-2/1-2-upgrade-version-command.module.ts new file mode 100644 index 000000000..76f2aee63 --- /dev/null +++ b/packages/twenty-server/src/database/commands/upgrade-version-command/1-2/1-2-upgrade-version-command.module.ts @@ -0,0 +1,17 @@ +import { Module } from '@nestjs/common'; +import { TypeOrmModule } from '@nestjs/typeorm'; + +import { AddEnqueuedStatusToWorkflowRunCommand } from 'src/database/commands/upgrade-version-command/1-2/1-2-add-enqueued-status-to-workflow-run.command'; +import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; +import { FieldMetadataEntity } from 'src/engine/metadata-modules/field-metadata/field-metadata.entity'; +import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module'; + +@Module({ + imports: [ + TypeOrmModule.forFeature([Workspace, FieldMetadataEntity], 'core'), + WorkspaceDataSourceModule, + ], + providers: [AddEnqueuedStatusToWorkflowRunCommand], + exports: [AddEnqueuedStatusToWorkflowRunCommand], +}) +export class V1_2_UpgradeVersionCommandModule {} diff --git a/packages/twenty-server/src/database/commands/upgrade-version-command/upgrade-version-command.module.ts b/packages/twenty-server/src/database/commands/upgrade-version-command/upgrade-version-command.module.ts index b33066fd4..16175e72f 100644 --- a/packages/twenty-server/src/database/commands/upgrade-version-command/upgrade-version-command.module.ts +++ b/packages/twenty-server/src/database/commands/upgrade-version-command/upgrade-version-command.module.ts @@ -4,6 +4,7 @@ import { TypeOrmModule } from '@nestjs/typeorm'; import { V0_54_UpgradeVersionCommandModule } from 'src/database/commands/upgrade-version-command/0-54/0-54-upgrade-version-command.module'; import { V0_55_UpgradeVersionCommandModule } from 'src/database/commands/upgrade-version-command/0-55/0-55-upgrade-version-command.module'; import { V1_1_UpgradeVersionCommandModule } from 'src/database/commands/upgrade-version-command/1-1/1-1-upgrade-version-command.module'; +import { V1_2_UpgradeVersionCommandModule } from 'src/database/commands/upgrade-version-command/1-2/1-2-upgrade-version-command.module'; import { DatabaseMigrationService, UpgradeCommand, @@ -17,6 +18,7 @@ import { WorkspaceSyncMetadataModule } from 'src/engine/workspace-manager/worksp V0_54_UpgradeVersionCommandModule, V0_55_UpgradeVersionCommandModule, V1_1_UpgradeVersionCommandModule, + V1_2_UpgradeVersionCommandModule, WorkspaceSyncMetadataModule, ], providers: [DatabaseMigrationService, UpgradeCommand], diff --git a/packages/twenty-server/src/database/commands/upgrade-version-command/upgrade.command.ts b/packages/twenty-server/src/database/commands/upgrade-version-command/upgrade.command.ts index 05c91f08c..d887ad51e 100644 --- a/packages/twenty-server/src/database/commands/upgrade-version-command/upgrade.command.ts +++ b/packages/twenty-server/src/database/commands/upgrade-version-command/upgrade.command.ts @@ -23,6 +23,7 @@ import { MigrateDefaultAvatarUrlToUserWorkspaceCommand } from 'src/database/comm import { DeduplicateIndexedFieldsCommand } from 'src/database/commands/upgrade-version-command/0-55/0-55-deduplicate-indexed-fields.command'; import { FixSchemaArrayTypeCommand } from 'src/database/commands/upgrade-version-command/1-1/1-1-fix-schema-array-type.command'; import { FixUpdateStandardFieldsIsLabelSyncedWithName } from 'src/database/commands/upgrade-version-command/1-1/1-1-fix-update-standard-field-is-label-synced-with-name.command'; +import { AddEnqueuedStatusToWorkflowRunCommand } from 'src/database/commands/upgrade-version-command/1-2/1-2-add-enqueued-status-to-workflow-run.command'; import { TwentyConfigService } from 'src/engine/core-modules/twenty-config/twenty-config.service'; import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; @@ -141,6 +142,9 @@ export class UpgradeCommand extends UpgradeCommandRunner { // 1.1 Commands protected readonly fixSchemaArrayTypeCommand: FixSchemaArrayTypeCommand, protected readonly fixUpdateStandardFieldsIsLabelSyncedWithNameCommand: FixUpdateStandardFieldsIsLabelSyncedWithName, + + // 1.2 Commands + protected readonly addEnqueuedStatusToWorkflowRunCommand: AddEnqueuedStatusToWorkflowRunCommand, ) { super( workspaceRepository, @@ -189,6 +193,11 @@ export class UpgradeCommand extends UpgradeCommandRunner { afterSyncMetadata: [], }; + const commands_120: VersionCommands = { + beforeSyncMetadata: [this.addEnqueuedStatusToWorkflowRunCommand], + afterSyncMetadata: [], + }; + this.allCommands = { '0.53.0': commands_053, '0.54.0': commands_054, @@ -196,6 +205,7 @@ export class UpgradeCommand extends UpgradeCommandRunner { '0.60.0': commands_060, '1.0.0': commands_100, '1.1.0': commands_110, + '1.2.0': commands_120, }; } diff --git a/packages/twenty-server/src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum.ts b/packages/twenty-server/src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum.ts index 00a87a08a..b135a2822 100644 --- a/packages/twenty-server/src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum.ts +++ b/packages/twenty-server/src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum.ts @@ -1,6 +1,7 @@ export enum CacheStorageNamespace { ModuleMessaging = 'module:messaging', ModuleCalendar = 'module:calendar', + ModuleWorkflow = 'module:workflow', EngineWorkspace = 'engine:workspace', EngineHealth = 'engine:health', } diff --git a/packages/twenty-server/src/engine/core-modules/metrics/types/metrics-keys.type.ts b/packages/twenty-server/src/engine/core-modules/metrics/types/metrics-keys.type.ts index 179742b04..0e9d18fac 100644 --- a/packages/twenty-server/src/engine/core-modules/metrics/types/metrics-keys.type.ts +++ b/packages/twenty-server/src/engine/core-modules/metrics/types/metrics-keys.type.ts @@ -20,4 +20,5 @@ export enum MetricsKeys { WorkflowRunCompleted = 'workflow-run/completed', WorkflowRunFailed = 'workflow-run/failed', WorkflowRunFailedThrottled = 'workflow-run/failed/throttled', + WorkflowRunFailedToEnqueue = 'workflow-run/failed/to-enqueue', } diff --git a/packages/twenty-server/src/engine/core-modules/throttler/throttler.exception.ts b/packages/twenty-server/src/engine/core-modules/throttler/throttler.exception.ts index 08cec767b..ea94c5d2e 100644 --- a/packages/twenty-server/src/engine/core-modules/throttler/throttler.exception.ts +++ b/packages/twenty-server/src/engine/core-modules/throttler/throttler.exception.ts @@ -7,5 +7,5 @@ export class ThrottlerException extends CustomException { } export enum ThrottlerExceptionCode { - TOO_MANY_REQUESTS = 'TOO_MANY_REQUESTS', + LIMIT_REACHED = 'LIMIT_REACHED', } diff --git a/packages/twenty-server/src/engine/core-modules/throttler/throttler.service.ts b/packages/twenty-server/src/engine/core-modules/throttler/throttler.service.ts index 925ffc848..8e698fa86 100644 --- a/packages/twenty-server/src/engine/core-modules/throttler/throttler.service.ts +++ b/packages/twenty-server/src/engine/core-modules/throttler/throttler.service.ts @@ -20,8 +20,8 @@ export class ThrottlerService { if (currentCount >= limit) { throw new ThrottlerException( - 'Too many requests', - ThrottlerExceptionCode.TOO_MANY_REQUESTS, + 'Limit reached', + ThrottlerExceptionCode.LIMIT_REACHED, ); } diff --git a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts index 321239ab9..60372b8e3 100644 --- a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts +++ b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts @@ -30,6 +30,7 @@ export enum WorkflowRunStatus { RUNNING = 'RUNNING', COMPLETED = 'COMPLETED', FAILED = 'FAILED', + ENQUEUED = 'ENQUEUED', } export type StepOutput = { @@ -117,6 +118,12 @@ export class WorkflowRunWorkspaceEntity extends BaseWorkspaceEntity { position: 3, color: 'red', }, + { + value: WorkflowRunStatus.ENQUEUED, + label: 'Enqueued', + position: 4, + color: 'blue', + }, ], defaultValue: "'NOT_STARTED'", }) diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts index 8ae8edb3f..e5115e21e 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts @@ -15,14 +15,14 @@ import { WorkflowRunException, WorkflowRunExceptionCode, } from 'src/modules/workflow/workflow-runner/exceptions/workflow-run.exception'; -import { getRootSteps } from 'src/modules/workflow/workflow-runner/utils/getRootSteps.utils'; +import { getRootSteps } from 'src/modules/workflow/workflow-runner/utils/get-root-steps.utils'; +import { WorkflowRunQueueWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run-queue/workspace-services/workflow-run-queue.workspace-service'; import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service'; import { WorkflowTriggerType } from 'src/modules/workflow/workflow-trigger/types/workflow-trigger.type'; export type RunWorkflowJobData = { workspaceId: string; workflowRunId: string; - payload?: object; lastExecutedStepId?: string; }; @@ -35,12 +35,12 @@ export class RunWorkflowJob { private readonly throttlerService: ThrottlerService, private readonly twentyConfigService: TwentyConfigService, private readonly metricsService: MetricsService, + private readonly workflowRunQueueWorkspaceService: WorkflowRunQueueWorkspaceService, ) {} @Process(RunWorkflowJob.name) async handle({ workflowRunId, - payload, lastExecutedStepId, workspaceId, }: RunWorkflowJobData): Promise { @@ -55,7 +55,6 @@ export class RunWorkflowJob { await this.startWorkflowExecution({ workflowRunId, workspaceId, - payload: payload ?? {}, }); } } catch (error) { @@ -65,22 +64,20 @@ export class RunWorkflowJob { status: WorkflowRunStatus.FAILED, error: error.message, }); + } finally { + await this.workflowRunQueueWorkspaceService.decreaseWorkflowRunQueuedCount( + workspaceId, + ); } } private async startWorkflowExecution({ workflowRunId, workspaceId, - payload, }: { workflowRunId: string; workspaceId: string; - payload: object; }): Promise { - const context = { - trigger: payload, - }; - const workflowRun = await this.workflowRunWorkspaceService.getWorkflowRunOrFail({ workflowRunId, @@ -105,10 +102,11 @@ export class RunWorkflowJob { triggerType: workflowVersion.trigger.type, }); + const triggerPayload = workflowRun.context?.trigger ?? {}; + await this.workflowRunWorkspaceService.startWorkflowRun({ workflowRunId, workspaceId, - context, output: { flow: { trigger: workflowVersion.trigger, @@ -116,7 +114,7 @@ export class RunWorkflowJob { }, stepsOutput: { trigger: { - result: payload, + result: triggerPayload, }, }, }, @@ -130,7 +128,9 @@ export class RunWorkflowJob { workflowRunId, currentStepId: rootSteps[0].id, steps: workflowVersion.steps, - context, + context: workflowRun.context ?? { + trigger: triggerPayload, + }, workspaceId, }); } diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/utils/__tests__/getRootSteps.utils.spec.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/utils/__tests__/get-root-steps.utils.spec.ts similarity index 98% rename from packages/twenty-server/src/modules/workflow/workflow-runner/utils/__tests__/getRootSteps.utils.spec.ts rename to packages/twenty-server/src/modules/workflow/workflow-runner/utils/__tests__/get-root-steps.utils.spec.ts index b08c1b7de..abc80114d 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/utils/__tests__/getRootSteps.utils.spec.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/utils/__tests__/get-root-steps.utils.spec.ts @@ -1,5 +1,5 @@ -import { getRootSteps } from 'src/modules/workflow/workflow-runner/utils/getRootSteps.utils'; import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; +import { getRootSteps } from 'src/modules/workflow/workflow-runner/utils/get-root-steps.utils'; describe('getRootSteps', () => { it('should return the root steps', () => { diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/utils/getRootSteps.utils.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/utils/get-root-steps.utils.ts similarity index 100% rename from packages/twenty-server/src/modules/workflow/workflow-runner/utils/getRootSteps.utils.ts rename to packages/twenty-server/src/modules/workflow/workflow-runner/utils/get-root-steps.utils.ts diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run-queue/cron/command/cron-workflow-run-enqueue.cron.command.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run-queue/cron/command/cron-workflow-run-enqueue.cron.command.ts new file mode 100644 index 000000000..0a3fcd144 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run-queue/cron/command/cron-workflow-run-enqueue.cron.command.ts @@ -0,0 +1,34 @@ +import { Command, CommandRunner } from 'nest-commander'; + +import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator'; +import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; +import { + WORKFLOW_RUN_ENQUEUE_CRON_PATTERN, + WorkflowRunEnqueueJob, +} from 'src/modules/workflow/workflow-runner/workflow-run-queue/cron/jobs/workflow-run-enqueue.cron.job'; + +@Command({ + name: 'cron:workflow:enqueue-awaiting-workflow-run', + description: 'Enqueues awaiting workflow runs', +}) +export class CronWorkflowRunEnqueueCommand extends CommandRunner { + constructor( + @InjectMessageQueue(MessageQueue.cronQueue) + private readonly messageQueueService: MessageQueueService, + ) { + super(); + } + + async run(): Promise { + await this.messageQueueService.addCron({ + jobName: WorkflowRunEnqueueJob.name, + data: undefined, + options: { + repeat: { + pattern: WORKFLOW_RUN_ENQUEUE_CRON_PATTERN, + }, + }, + }); + } +} diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run-queue/cron/jobs/workflow-run-enqueue.cron.job.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run-queue/cron/jobs/workflow-run-enqueue.cron.job.ts new file mode 100644 index 000000000..3577d8dbb --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run-queue/cron/jobs/workflow-run-enqueue.cron.job.ts @@ -0,0 +1,139 @@ +import { Logger } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; + +import { WorkspaceActivationStatus } from 'twenty-shared/workspace'; +import { Repository } from 'typeorm'; + +import { SentryCronMonitor } from 'src/engine/core-modules/cron/sentry-cron-monitor.decorator'; +import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator'; +import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator'; +import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; +import { MetricsService } from 'src/engine/core-modules/metrics/metrics.service'; +import { MetricsKeys } from 'src/engine/core-modules/metrics/types/metrics-keys.type'; +import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; +import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; +import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; +import { + WorkflowRunStatus, + WorkflowRunWorkspaceEntity, +} from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; +import { + RunWorkflowJob, + RunWorkflowJobData, +} from 'src/modules/workflow/workflow-runner/jobs/run-workflow.job'; +import { WorkflowRunQueueWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run-queue/workspace-services/workflow-run-queue.workspace-service'; + +export const WORKFLOW_RUN_ENQUEUE_CRON_PATTERN = '* * * * *'; + +@Processor(MessageQueue.cronQueue) +export class WorkflowRunEnqueueJob { + private readonly logger = new Logger(WorkflowRunEnqueueJob.name); + + constructor( + @InjectRepository(Workspace, 'core') + private readonly workspaceRepository: Repository, + private readonly workflowRunQueueWorkspaceService: WorkflowRunQueueWorkspaceService, + private readonly workspaceDataSourceService: WorkspaceDataSourceService, + @InjectMessageQueue(MessageQueue.workflowQueue) + private readonly messageQueueService: MessageQueueService, + private readonly metricsService: MetricsService, + private readonly twentyORMGlobalManager: TwentyORMGlobalManager, + ) {} + + @Process(WorkflowRunEnqueueJob.name) + @SentryCronMonitor( + WorkflowRunEnqueueJob.name, + WORKFLOW_RUN_ENQUEUE_CRON_PATTERN, + ) + async handle() { + const activeWorkspaces = await this.workspaceRepository.find({ + where: { + activationStatus: WorkspaceActivationStatus.ACTIVE, + }, + }); + + const mainDataSource = + await this.workspaceDataSourceService.connectToMainDataSource(); + + for (const activeWorkspace of activeWorkspaces) { + let enqueuedWorkflowRunCount = 0; + + try { + const remainingWorkflowRunCount = + await this.workflowRunQueueWorkspaceService.getRemainingRunsToEnqueueCount( + activeWorkspace.id, + ); + + if (remainingWorkflowRunCount <= 0) { + continue; + } + + const schemaName = this.workspaceDataSourceService.getSchemaName( + activeWorkspace.id, + ); + + // Using raw query to avoid storing repository in cache + const workflowRuns = await mainDataSource.query( + `SELECT * FROM ${schemaName}."workflowRun" WHERE status = '${WorkflowRunStatus.NOT_STARTED}' ORDER BY "createdAt" ASC`, + ); + + const workflowRunsToEnqueueCount = Math.min( + remainingWorkflowRunCount, + workflowRuns.length, + ); + + if (workflowRunsToEnqueueCount <= 0) { + continue; + } + + const workflowRunRepository = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + activeWorkspace.id, + WorkflowRunWorkspaceEntity, + { shouldBypassPermissionChecks: true }, + ); + + for ( + let runIndex = 0; + runIndex < workflowRunsToEnqueueCount; + runIndex++ + ) { + const workflowRunId = workflowRuns[runIndex].id; + + await this.messageQueueService.add( + RunWorkflowJob.name, + { + workflowRunId, + workspaceId: activeWorkspace.id, + }, + ); + + await workflowRunRepository.update(workflowRunId, { + status: WorkflowRunStatus.ENQUEUED, + }); + + enqueuedWorkflowRunCount++; + } + } catch (error) { + this.logger.error( + `Error enqueuing workflow runs for workspace ${activeWorkspace.id}`, + error, + ); + + this.metricsService.incrementCounter({ + key: MetricsKeys.WorkflowRunFailedToEnqueue, + eventId: activeWorkspace.id, + }); + } finally { + if (enqueuedWorkflowRunCount > 0) { + await this.workflowRunQueueWorkspaceService.increaseWorkflowRunQueuedCount( + activeWorkspace.id, + enqueuedWorkflowRunCount, + ); + } + } + } + } +} diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run-queue/utils/get-cache-workflow-run-count-key.util.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run-queue/utils/get-cache-workflow-run-count-key.util.ts new file mode 100644 index 000000000..8dec189c8 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run-queue/utils/get-cache-workflow-run-count-key.util.ts @@ -0,0 +1,3 @@ +export const getWorkflowRunQueuedCountCacheKey = ( + workspaceId: string, +): string => `workflow-run-queued-count:${workspaceId}`; diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run-queue/workflow-run-queue.module.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run-queue/workflow-run-queue.module.ts new file mode 100644 index 000000000..0191f420b --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run-queue/workflow-run-queue.module.ts @@ -0,0 +1,28 @@ +import { Module } from '@nestjs/common'; +import { TypeOrmModule } from '@nestjs/typeorm'; + +import { CacheStorageModule } from 'src/engine/core-modules/cache-storage/cache-storage.module'; +import { MessageQueueModule } from 'src/engine/core-modules/message-queue/message-queue.module'; +import { MetricsModule } from 'src/engine/core-modules/metrics/metrics.module'; +import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; +import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module'; +import { CronWorkflowRunEnqueueCommand } from 'src/modules/workflow/workflow-runner/workflow-run-queue/cron/command/cron-workflow-run-enqueue.cron.command'; +import { WorkflowRunEnqueueJob } from 'src/modules/workflow/workflow-runner/workflow-run-queue/cron/jobs/workflow-run-enqueue.cron.job'; +import { WorkflowRunQueueWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run-queue/workspace-services/workflow-run-queue.workspace-service'; + +@Module({ + imports: [ + CacheStorageModule, + TypeOrmModule.forFeature([Workspace], 'core'), + MessageQueueModule, + WorkspaceDataSourceModule, + MetricsModule, + ], + providers: [ + WorkflowRunQueueWorkspaceService, + WorkflowRunEnqueueJob, + CronWorkflowRunEnqueueCommand, + ], + exports: [WorkflowRunQueueWorkspaceService], +}) +export class WorkflowRunQueueModule {} diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run-queue/workspace-services/workflow-run-queue.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run-queue/workspace-services/workflow-run-queue.workspace-service.ts new file mode 100644 index 000000000..d3e358bd3 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run-queue/workspace-services/workflow-run-queue.workspace-service.ts @@ -0,0 +1,59 @@ +import { Injectable } from '@nestjs/common'; + +import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decorators/cache-storage.decorator'; +import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service'; +import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum'; +import { getWorkflowRunQueuedCountCacheKey } from 'src/modules/workflow/workflow-runner/workflow-run-queue/utils/get-cache-workflow-run-count-key.util'; + +@Injectable() +export class WorkflowRunQueueWorkspaceService { + private readonly WORKFLOW_RUN_QUEUE_THROTTLE_LIMIT = 100; + + constructor( + @InjectCacheStorage(CacheStorageNamespace.ModuleWorkflow) + private readonly cacheStorage: CacheStorageService, + ) {} + + async increaseWorkflowRunQueuedCount( + workspaceId: string, + newlyEnqueuedCount = 1, + ): Promise { + const currentCount = + await this.getCurrentWorkflowRunQueuedCount(workspaceId); + + await this.cacheStorage.set( + getWorkflowRunQueuedCountCacheKey(workspaceId), + currentCount + newlyEnqueuedCount, + ); + } + + async decreaseWorkflowRunQueuedCount( + workspaceId: string, + removedFromQueueCount = 1, + ): Promise { + const currentCount = + await this.getCurrentWorkflowRunQueuedCount(workspaceId); + + await this.cacheStorage.set( + getWorkflowRunQueuedCountCacheKey(workspaceId), + currentCount - removedFromQueueCount, + ); + } + + async getRemainingRunsToEnqueueCount(workspaceId: string): Promise { + const currentCount = + await this.getCurrentWorkflowRunQueuedCount(workspaceId); + + return this.WORKFLOW_RUN_QUEUE_THROTTLE_LIMIT - currentCount; + } + + private async getCurrentWorkflowRunQueuedCount( + workspaceId: string, + ): Promise { + const key = getWorkflowRunQueuedCountCacheKey(workspaceId); + + const currentCount = (await this.cacheStorage.get(key)) ?? 0; + + return Math.max(0, currentCount); + } +} diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts index 5e3ed7df0..3f2ae8d52 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts @@ -44,10 +44,15 @@ export class WorkflowRunWorkspaceService { workflowVersionId, createdBy, workflowRunId, + context, + status, }: { workflowVersionId: string; createdBy: ActorMetadata; workflowRunId?: string; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + context: Record; + status: WorkflowRunStatus.NOT_STARTED | WorkflowRunStatus.ENQUEUED; }) { const workspaceId = this.scopedWorkspaceContextFactory.create()?.workspaceId; @@ -113,8 +118,9 @@ export class WorkflowRunWorkspaceService { workflowVersionId, createdBy, workflowId: workflow.id, - status: WorkflowRunStatus.NOT_STARTED, + status, position, + context, }); await workflowRunRepository.insert(workflowRun); @@ -125,13 +131,10 @@ export class WorkflowRunWorkspaceService { async startWorkflowRun({ workflowRunId, workspaceId, - context, output, }: { workflowRunId: string; workspaceId: string; - // eslint-disable-next-line @typescript-eslint/no-explicit-any - context: Record; output: WorkflowRunOutput; }) { const workflowRunRepository = @@ -152,7 +155,10 @@ export class WorkflowRunWorkspaceService { ); } - if (workflowRunToUpdate.status !== WorkflowRunStatus.NOT_STARTED) { + if ( + workflowRunToUpdate.status !== WorkflowRunStatus.ENQUEUED && + workflowRunToUpdate.status !== WorkflowRunStatus.NOT_STARTED + ) { throw new WorkflowRunException( 'Workflow run already started', WorkflowRunExceptionCode.INVALID_OPERATION, @@ -162,7 +168,6 @@ export class WorkflowRunWorkspaceService { const partialUpdate = { status: WorkflowRunStatus.RUNNING, startedAt: new Date().toISOString(), - context, output, }; @@ -170,7 +175,7 @@ export class WorkflowRunWorkspaceService { await this.emitWorkflowRunUpdatedEvent({ workflowRunBefore: workflowRunToUpdate, - updatedFields: ['status', 'startedAt', 'context', 'output'], + updatedFields: ['status', 'startedAt', 'output'], }); } diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.module.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.module.ts index 9238ac18b..5803fed61 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.module.ts @@ -6,6 +6,7 @@ import { ThrottlerModule } from 'src/engine/core-modules/throttler/throttler.mod import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module'; import { WorkflowExecutorModule } from 'src/modules/workflow/workflow-executor/workflow-executor.module'; import { RunWorkflowJob } from 'src/modules/workflow/workflow-runner/jobs/run-workflow.job'; +import { WorkflowRunQueueModule } from 'src/modules/workflow/workflow-runner/workflow-run-queue/workflow-run-queue.module'; import { WorkflowRunModule } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.module'; import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service'; @@ -17,6 +18,7 @@ import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-ru BillingModule, WorkflowRunModule, MetricsModule, + WorkflowRunQueueModule, ], providers: [WorkflowRunnerWorkspaceService, RunWorkflowJob], exports: [WorkflowRunnerWorkspaceService], diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service.ts index 3e5cd991b..57c0b6e69 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service.ts @@ -5,17 +5,23 @@ import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decora import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; import { ActorMetadata } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type'; +import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; +import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workspace-services/workflow-common.workspace-service'; import { RunWorkflowJob, RunWorkflowJobData, } from 'src/modules/workflow/workflow-runner/jobs/run-workflow.job'; +import { WorkflowRunQueueWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run-queue/workspace-services/workflow-run-queue.workspace-service'; import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service'; +import { WorkflowTriggerType } from 'src/modules/workflow/workflow-trigger/types/workflow-trigger.type'; @Injectable() export class WorkflowRunnerWorkspaceService { private readonly logger = new Logger(WorkflowRunnerWorkspaceService.name); constructor( private readonly workflowRunWorkspaceService: WorkflowRunWorkspaceService, + private readonly workflowCommonWorkspaceService: WorkflowCommonWorkspaceService, + private readonly workflowRunQueueWorkspaceService: WorkflowRunQueueWorkspaceService, @InjectMessageQueue(MessageQueue.workflowQueue) private readonly messageQueueService: MessageQueueService, private readonly billingUsageService: BillingUsageService, @@ -42,21 +48,41 @@ export class WorkflowRunnerWorkspaceService { 'Cannot execute billed function, there is no subscription for this workspace', ); } + + const workflowVersion = + await this.workflowCommonWorkspaceService.getWorkflowVersionOrFail({ + workspaceId, + workflowVersionId, + }); + + const remainingRunsToEnqueueCount = + await this.workflowRunQueueWorkspaceService.getRemainingRunsToEnqueueCount( + workspaceId, + ); + + const isQueueLimitReached = remainingRunsToEnqueueCount <= 0; + + const isManualTrigger = + workflowVersion.trigger?.type === WorkflowTriggerType.MANUAL; + + const shouldEnqueueWorkflowRun = isManualTrigger || !isQueueLimitReached; + const workflowRunId = await this.workflowRunWorkspaceService.createWorkflowRun({ workflowVersionId, workflowRunId: initialWorkflowRunId, createdBy: source, + status: shouldEnqueueWorkflowRun + ? WorkflowRunStatus.ENQUEUED + : WorkflowRunStatus.NOT_STARTED, + context: { + trigger: payload, + }, }); - await this.messageQueueService.add( - RunWorkflowJob.name, - { - workspaceId, - payload: payload, - workflowRunId, - }, - ); + if (shouldEnqueueWorkflowRun) { + await this.enqueueWorkflowRun(workspaceId, workflowRunId); + } return { workflowRunId }; } @@ -70,6 +96,18 @@ export class WorkflowRunnerWorkspaceService { workflowRunId: string; lastExecutedStepId: string; }) { + await this.enqueueWorkflowRun( + workspaceId, + workflowRunId, + lastExecutedStepId, + ); + } + + private async enqueueWorkflowRun( + workspaceId: string, + workflowRunId: string, + lastExecutedStepId?: string, + ) { await this.messageQueueService.add( RunWorkflowJob.name, { @@ -78,5 +116,8 @@ export class WorkflowRunnerWorkspaceService { lastExecutedStepId, }, ); + await this.workflowRunQueueWorkspaceService.increaseWorkflowRunQueuedCount( + workspaceId, + ); } }