Add a limit to workflow queue per workspace (#12908)

- new status `ENQUEUED` added. With a command to backfill
- counter in cache per workspace, managed by a new service
[workflow-run-queue.workspace-service.ts](https://github.com/twentyhq/twenty/compare/tt-improve-workflow-run-queueing?expand=1#diff-1e2de2a48cd482a3bd7e8dedf1150a19d0b200afbd9282181a24ecddddb56927)
- cron added that will run every minute to look for not started
workflows

Here is the new flow:
- When executing a workflow, we check if the queue is not full. If not,
run is created as `ENQUEUED` and the run workflow job is triggered as
usual. If full, create the run as NOT_STARTED and do not trigger the job
- Cron will look for NOT_STARTED workflows and queue some if there is
some place again
- Only MANUAL and Form submit skip the queue limit
This commit is contained in:
Thomas Trompette
2025-06-30 14:27:57 +02:00
committed by GitHub
parent fcb869fdd9
commit 8272e5dfd0
22 changed files with 501 additions and 32 deletions

View File

@ -309,6 +309,7 @@ export const workflowRunStatusSchema = z.enum([
'RUNNING', 'RUNNING',
'COMPLETED', 'COMPLETED',
'FAILED', 'FAILED',
'ENQUEUED',
]); ]);
export const workflowRunSchema = z export const workflowRunSchema = z

View File

@ -27,6 +27,13 @@ export const getWorkflowRunStatusTagProps = ({
}; };
} }
if (workflowRunStatus === 'ENQUEUED') {
return {
color: 'blue',
text: 'Enqueued',
};
}
return { return {
color: 'red', color: 'red',
text: 'Failed', text: 'Failed',

View File

@ -0,0 +1,112 @@
import { InjectRepository } from '@nestjs/typeorm';
import { Command } from 'nest-commander';
import { Repository } from 'typeorm';
import {
ActiveOrSuspendedWorkspacesMigrationCommandRunner,
RunOnWorkspaceArgs,
} from 'src/database/commands/command-runners/active-or-suspended-workspaces-migration.command-runner';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { FieldMetadataEntity } from 'src/engine/metadata-modules/field-metadata/field-metadata.entity';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import { WORKFLOW_RUN_STANDARD_FIELD_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids';
import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
@Command({
name: 'upgrade:1-2:add-enqueued-status-to-workflow-run',
description: 'Add enqueued status to workflow run',
})
export class AddEnqueuedStatusToWorkflowRunCommand extends ActiveOrSuspendedWorkspacesMigrationCommandRunner {
constructor(
@InjectRepository(Workspace, 'core')
protected readonly workspaceRepository: Repository<Workspace>,
protected readonly twentyORMGlobalManager: TwentyORMGlobalManager,
@InjectRepository(FieldMetadataEntity, 'core')
private readonly fieldMetadataRepository: Repository<FieldMetadataEntity>,
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
) {
super(workspaceRepository, twentyORMGlobalManager);
}
override async runOnWorkspace({
workspaceId,
options,
}: RunOnWorkspaceArgs): Promise<void> {
this.logger.log(
`Adding enqueued status to workflow run for workspace ${workspaceId}`,
);
const workflowRunStatusFieldMetadata =
await this.fieldMetadataRepository.findOne({
where: {
standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.status,
},
});
if (!workflowRunStatusFieldMetadata) {
this.logger.error(
`Workflow run status field metadata not found for workspace ${workspaceId}`,
);
return;
}
const workflowRunStatusFieldMetadataOptions =
workflowRunStatusFieldMetadata.options;
// check if enqueued status is already in the field metadata options
if (
workflowRunStatusFieldMetadataOptions.some(
(option) => option.value === WorkflowRunStatus.ENQUEUED,
)
) {
this.logger.log(
`Workflow run status field metadata options already contain enqueued status for workspace ${workspaceId}`,
);
} else if (options.dryRun) {
this.logger.log(
`Would add enqueued status to workflow run status field metadata for workspace ${workspaceId}`,
);
} else {
workflowRunStatusFieldMetadataOptions.push({
value: WorkflowRunStatus.ENQUEUED,
label: 'Enqueued',
position: 4,
color: 'blue',
});
await this.fieldMetadataRepository.save(workflowRunStatusFieldMetadata);
this.logger.log(
`Enqueued status added to workflow run status field metadata for workspace ${workspaceId}`,
);
}
const schemaName =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const mainDataSource =
await this.workspaceDataSourceService.connectToMainDataSource();
if (options.dryRun) {
this.logger.log(
`Would try to add enqueued status to workflow run status enum for workspace ${workspaceId}`,
);
} else {
try {
await mainDataSource.query(
`ALTER TYPE ${schemaName}."workflowRun_status_enum" ADD VALUE 'ENQUEUED'`,
);
this.logger.log(
`Enqueued status added to workflow run status enum for workspace ${workspaceId}`,
);
} catch (error) {
this.logger.error(
`Error adding enqueued status to workflow run status enum for workspace ${workspaceId}: ${error}`,
);
}
}
}
}

View File

@ -0,0 +1,17 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { AddEnqueuedStatusToWorkflowRunCommand } from 'src/database/commands/upgrade-version-command/1-2/1-2-add-enqueued-status-to-workflow-run.command';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { FieldMetadataEntity } from 'src/engine/metadata-modules/field-metadata/field-metadata.entity';
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
@Module({
imports: [
TypeOrmModule.forFeature([Workspace, FieldMetadataEntity], 'core'),
WorkspaceDataSourceModule,
],
providers: [AddEnqueuedStatusToWorkflowRunCommand],
exports: [AddEnqueuedStatusToWorkflowRunCommand],
})
export class V1_2_UpgradeVersionCommandModule {}

View File

@ -4,6 +4,7 @@ import { TypeOrmModule } from '@nestjs/typeorm';
import { V0_54_UpgradeVersionCommandModule } from 'src/database/commands/upgrade-version-command/0-54/0-54-upgrade-version-command.module'; import { V0_54_UpgradeVersionCommandModule } from 'src/database/commands/upgrade-version-command/0-54/0-54-upgrade-version-command.module';
import { V0_55_UpgradeVersionCommandModule } from 'src/database/commands/upgrade-version-command/0-55/0-55-upgrade-version-command.module'; import { V0_55_UpgradeVersionCommandModule } from 'src/database/commands/upgrade-version-command/0-55/0-55-upgrade-version-command.module';
import { V1_1_UpgradeVersionCommandModule } from 'src/database/commands/upgrade-version-command/1-1/1-1-upgrade-version-command.module'; import { V1_1_UpgradeVersionCommandModule } from 'src/database/commands/upgrade-version-command/1-1/1-1-upgrade-version-command.module';
import { V1_2_UpgradeVersionCommandModule } from 'src/database/commands/upgrade-version-command/1-2/1-2-upgrade-version-command.module';
import { import {
DatabaseMigrationService, DatabaseMigrationService,
UpgradeCommand, UpgradeCommand,
@ -17,6 +18,7 @@ import { WorkspaceSyncMetadataModule } from 'src/engine/workspace-manager/worksp
V0_54_UpgradeVersionCommandModule, V0_54_UpgradeVersionCommandModule,
V0_55_UpgradeVersionCommandModule, V0_55_UpgradeVersionCommandModule,
V1_1_UpgradeVersionCommandModule, V1_1_UpgradeVersionCommandModule,
V1_2_UpgradeVersionCommandModule,
WorkspaceSyncMetadataModule, WorkspaceSyncMetadataModule,
], ],
providers: [DatabaseMigrationService, UpgradeCommand], providers: [DatabaseMigrationService, UpgradeCommand],

View File

@ -23,6 +23,7 @@ import { MigrateDefaultAvatarUrlToUserWorkspaceCommand } from 'src/database/comm
import { DeduplicateIndexedFieldsCommand } from 'src/database/commands/upgrade-version-command/0-55/0-55-deduplicate-indexed-fields.command'; import { DeduplicateIndexedFieldsCommand } from 'src/database/commands/upgrade-version-command/0-55/0-55-deduplicate-indexed-fields.command';
import { FixSchemaArrayTypeCommand } from 'src/database/commands/upgrade-version-command/1-1/1-1-fix-schema-array-type.command'; import { FixSchemaArrayTypeCommand } from 'src/database/commands/upgrade-version-command/1-1/1-1-fix-schema-array-type.command';
import { FixUpdateStandardFieldsIsLabelSyncedWithName } from 'src/database/commands/upgrade-version-command/1-1/1-1-fix-update-standard-field-is-label-synced-with-name.command'; import { FixUpdateStandardFieldsIsLabelSyncedWithName } from 'src/database/commands/upgrade-version-command/1-1/1-1-fix-update-standard-field-is-label-synced-with-name.command';
import { AddEnqueuedStatusToWorkflowRunCommand } from 'src/database/commands/upgrade-version-command/1-2/1-2-add-enqueued-status-to-workflow-run.command';
import { TwentyConfigService } from 'src/engine/core-modules/twenty-config/twenty-config.service'; import { TwentyConfigService } from 'src/engine/core-modules/twenty-config/twenty-config.service';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
@ -141,6 +142,9 @@ export class UpgradeCommand extends UpgradeCommandRunner {
// 1.1 Commands // 1.1 Commands
protected readonly fixSchemaArrayTypeCommand: FixSchemaArrayTypeCommand, protected readonly fixSchemaArrayTypeCommand: FixSchemaArrayTypeCommand,
protected readonly fixUpdateStandardFieldsIsLabelSyncedWithNameCommand: FixUpdateStandardFieldsIsLabelSyncedWithName, protected readonly fixUpdateStandardFieldsIsLabelSyncedWithNameCommand: FixUpdateStandardFieldsIsLabelSyncedWithName,
// 1.2 Commands
protected readonly addEnqueuedStatusToWorkflowRunCommand: AddEnqueuedStatusToWorkflowRunCommand,
) { ) {
super( super(
workspaceRepository, workspaceRepository,
@ -189,6 +193,11 @@ export class UpgradeCommand extends UpgradeCommandRunner {
afterSyncMetadata: [], afterSyncMetadata: [],
}; };
const commands_120: VersionCommands = {
beforeSyncMetadata: [this.addEnqueuedStatusToWorkflowRunCommand],
afterSyncMetadata: [],
};
this.allCommands = { this.allCommands = {
'0.53.0': commands_053, '0.53.0': commands_053,
'0.54.0': commands_054, '0.54.0': commands_054,
@ -196,6 +205,7 @@ export class UpgradeCommand extends UpgradeCommandRunner {
'0.60.0': commands_060, '0.60.0': commands_060,
'1.0.0': commands_100, '1.0.0': commands_100,
'1.1.0': commands_110, '1.1.0': commands_110,
'1.2.0': commands_120,
}; };
} }

View File

@ -1,6 +1,7 @@
export enum CacheStorageNamespace { export enum CacheStorageNamespace {
ModuleMessaging = 'module:messaging', ModuleMessaging = 'module:messaging',
ModuleCalendar = 'module:calendar', ModuleCalendar = 'module:calendar',
ModuleWorkflow = 'module:workflow',
EngineWorkspace = 'engine:workspace', EngineWorkspace = 'engine:workspace',
EngineHealth = 'engine:health', EngineHealth = 'engine:health',
} }

View File

@ -20,4 +20,5 @@ export enum MetricsKeys {
WorkflowRunCompleted = 'workflow-run/completed', WorkflowRunCompleted = 'workflow-run/completed',
WorkflowRunFailed = 'workflow-run/failed', WorkflowRunFailed = 'workflow-run/failed',
WorkflowRunFailedThrottled = 'workflow-run/failed/throttled', WorkflowRunFailedThrottled = 'workflow-run/failed/throttled',
WorkflowRunFailedToEnqueue = 'workflow-run/failed/to-enqueue',
} }

View File

@ -7,5 +7,5 @@ export class ThrottlerException extends CustomException {
} }
export enum ThrottlerExceptionCode { export enum ThrottlerExceptionCode {
TOO_MANY_REQUESTS = 'TOO_MANY_REQUESTS', LIMIT_REACHED = 'LIMIT_REACHED',
} }

View File

@ -20,8 +20,8 @@ export class ThrottlerService {
if (currentCount >= limit) { if (currentCount >= limit) {
throw new ThrottlerException( throw new ThrottlerException(
'Too many requests', 'Limit reached',
ThrottlerExceptionCode.TOO_MANY_REQUESTS, ThrottlerExceptionCode.LIMIT_REACHED,
); );
} }

View File

@ -30,6 +30,7 @@ export enum WorkflowRunStatus {
RUNNING = 'RUNNING', RUNNING = 'RUNNING',
COMPLETED = 'COMPLETED', COMPLETED = 'COMPLETED',
FAILED = 'FAILED', FAILED = 'FAILED',
ENQUEUED = 'ENQUEUED',
} }
export type StepOutput = { export type StepOutput = {
@ -117,6 +118,12 @@ export class WorkflowRunWorkspaceEntity extends BaseWorkspaceEntity {
position: 3, position: 3,
color: 'red', color: 'red',
}, },
{
value: WorkflowRunStatus.ENQUEUED,
label: 'Enqueued',
position: 4,
color: 'blue',
},
], ],
defaultValue: "'NOT_STARTED'", defaultValue: "'NOT_STARTED'",
}) })

View File

@ -15,14 +15,14 @@ import {
WorkflowRunException, WorkflowRunException,
WorkflowRunExceptionCode, WorkflowRunExceptionCode,
} from 'src/modules/workflow/workflow-runner/exceptions/workflow-run.exception'; } from 'src/modules/workflow/workflow-runner/exceptions/workflow-run.exception';
import { getRootSteps } from 'src/modules/workflow/workflow-runner/utils/getRootSteps.utils'; import { getRootSteps } from 'src/modules/workflow/workflow-runner/utils/get-root-steps.utils';
import { WorkflowRunQueueWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run-queue/workspace-services/workflow-run-queue.workspace-service';
import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service'; import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service';
import { WorkflowTriggerType } from 'src/modules/workflow/workflow-trigger/types/workflow-trigger.type'; import { WorkflowTriggerType } from 'src/modules/workflow/workflow-trigger/types/workflow-trigger.type';
export type RunWorkflowJobData = { export type RunWorkflowJobData = {
workspaceId: string; workspaceId: string;
workflowRunId: string; workflowRunId: string;
payload?: object;
lastExecutedStepId?: string; lastExecutedStepId?: string;
}; };
@ -35,12 +35,12 @@ export class RunWorkflowJob {
private readonly throttlerService: ThrottlerService, private readonly throttlerService: ThrottlerService,
private readonly twentyConfigService: TwentyConfigService, private readonly twentyConfigService: TwentyConfigService,
private readonly metricsService: MetricsService, private readonly metricsService: MetricsService,
private readonly workflowRunQueueWorkspaceService: WorkflowRunQueueWorkspaceService,
) {} ) {}
@Process(RunWorkflowJob.name) @Process(RunWorkflowJob.name)
async handle({ async handle({
workflowRunId, workflowRunId,
payload,
lastExecutedStepId, lastExecutedStepId,
workspaceId, workspaceId,
}: RunWorkflowJobData): Promise<void> { }: RunWorkflowJobData): Promise<void> {
@ -55,7 +55,6 @@ export class RunWorkflowJob {
await this.startWorkflowExecution({ await this.startWorkflowExecution({
workflowRunId, workflowRunId,
workspaceId, workspaceId,
payload: payload ?? {},
}); });
} }
} catch (error) { } catch (error) {
@ -65,22 +64,20 @@ export class RunWorkflowJob {
status: WorkflowRunStatus.FAILED, status: WorkflowRunStatus.FAILED,
error: error.message, error: error.message,
}); });
} finally {
await this.workflowRunQueueWorkspaceService.decreaseWorkflowRunQueuedCount(
workspaceId,
);
} }
} }
private async startWorkflowExecution({ private async startWorkflowExecution({
workflowRunId, workflowRunId,
workspaceId, workspaceId,
payload,
}: { }: {
workflowRunId: string; workflowRunId: string;
workspaceId: string; workspaceId: string;
payload: object;
}): Promise<void> { }): Promise<void> {
const context = {
trigger: payload,
};
const workflowRun = const workflowRun =
await this.workflowRunWorkspaceService.getWorkflowRunOrFail({ await this.workflowRunWorkspaceService.getWorkflowRunOrFail({
workflowRunId, workflowRunId,
@ -105,10 +102,11 @@ export class RunWorkflowJob {
triggerType: workflowVersion.trigger.type, triggerType: workflowVersion.trigger.type,
}); });
const triggerPayload = workflowRun.context?.trigger ?? {};
await this.workflowRunWorkspaceService.startWorkflowRun({ await this.workflowRunWorkspaceService.startWorkflowRun({
workflowRunId, workflowRunId,
workspaceId, workspaceId,
context,
output: { output: {
flow: { flow: {
trigger: workflowVersion.trigger, trigger: workflowVersion.trigger,
@ -116,7 +114,7 @@ export class RunWorkflowJob {
}, },
stepsOutput: { stepsOutput: {
trigger: { trigger: {
result: payload, result: triggerPayload,
}, },
}, },
}, },
@ -130,7 +128,9 @@ export class RunWorkflowJob {
workflowRunId, workflowRunId,
currentStepId: rootSteps[0].id, currentStepId: rootSteps[0].id,
steps: workflowVersion.steps, steps: workflowVersion.steps,
context, context: workflowRun.context ?? {
trigger: triggerPayload,
},
workspaceId, workspaceId,
}); });
} }

View File

@ -1,5 +1,5 @@
import { getRootSteps } from 'src/modules/workflow/workflow-runner/utils/getRootSteps.utils';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
import { getRootSteps } from 'src/modules/workflow/workflow-runner/utils/get-root-steps.utils';
describe('getRootSteps', () => { describe('getRootSteps', () => {
it('should return the root steps', () => { it('should return the root steps', () => {

View File

@ -0,0 +1,34 @@
import { Command, CommandRunner } from 'nest-commander';
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import {
WORKFLOW_RUN_ENQUEUE_CRON_PATTERN,
WorkflowRunEnqueueJob,
} from 'src/modules/workflow/workflow-runner/workflow-run-queue/cron/jobs/workflow-run-enqueue.cron.job';
@Command({
name: 'cron:workflow:enqueue-awaiting-workflow-run',
description: 'Enqueues awaiting workflow runs',
})
export class CronWorkflowRunEnqueueCommand extends CommandRunner {
constructor(
@InjectMessageQueue(MessageQueue.cronQueue)
private readonly messageQueueService: MessageQueueService,
) {
super();
}
async run(): Promise<void> {
await this.messageQueueService.addCron({
jobName: WorkflowRunEnqueueJob.name,
data: undefined,
options: {
repeat: {
pattern: WORKFLOW_RUN_ENQUEUE_CRON_PATTERN,
},
},
});
}
}

View File

@ -0,0 +1,139 @@
import { Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { WorkspaceActivationStatus } from 'twenty-shared/workspace';
import { Repository } from 'typeorm';
import { SentryCronMonitor } from 'src/engine/core-modules/cron/sentry-cron-monitor.decorator';
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { MetricsService } from 'src/engine/core-modules/metrics/metrics.service';
import { MetricsKeys } from 'src/engine/core-modules/metrics/types/metrics-keys.type';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import {
WorkflowRunStatus,
WorkflowRunWorkspaceEntity,
} from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import {
RunWorkflowJob,
RunWorkflowJobData,
} from 'src/modules/workflow/workflow-runner/jobs/run-workflow.job';
import { WorkflowRunQueueWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run-queue/workspace-services/workflow-run-queue.workspace-service';
export const WORKFLOW_RUN_ENQUEUE_CRON_PATTERN = '* * * * *';
@Processor(MessageQueue.cronQueue)
export class WorkflowRunEnqueueJob {
private readonly logger = new Logger(WorkflowRunEnqueueJob.name);
constructor(
@InjectRepository(Workspace, 'core')
private readonly workspaceRepository: Repository<Workspace>,
private readonly workflowRunQueueWorkspaceService: WorkflowRunQueueWorkspaceService,
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
@InjectMessageQueue(MessageQueue.workflowQueue)
private readonly messageQueueService: MessageQueueService,
private readonly metricsService: MetricsService,
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
) {}
@Process(WorkflowRunEnqueueJob.name)
@SentryCronMonitor(
WorkflowRunEnqueueJob.name,
WORKFLOW_RUN_ENQUEUE_CRON_PATTERN,
)
async handle() {
const activeWorkspaces = await this.workspaceRepository.find({
where: {
activationStatus: WorkspaceActivationStatus.ACTIVE,
},
});
const mainDataSource =
await this.workspaceDataSourceService.connectToMainDataSource();
for (const activeWorkspace of activeWorkspaces) {
let enqueuedWorkflowRunCount = 0;
try {
const remainingWorkflowRunCount =
await this.workflowRunQueueWorkspaceService.getRemainingRunsToEnqueueCount(
activeWorkspace.id,
);
if (remainingWorkflowRunCount <= 0) {
continue;
}
const schemaName = this.workspaceDataSourceService.getSchemaName(
activeWorkspace.id,
);
// Using raw query to avoid storing repository in cache
const workflowRuns = await mainDataSource.query(
`SELECT * FROM ${schemaName}."workflowRun" WHERE status = '${WorkflowRunStatus.NOT_STARTED}' ORDER BY "createdAt" ASC`,
);
const workflowRunsToEnqueueCount = Math.min(
remainingWorkflowRunCount,
workflowRuns.length,
);
if (workflowRunsToEnqueueCount <= 0) {
continue;
}
const workflowRunRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace(
activeWorkspace.id,
WorkflowRunWorkspaceEntity,
{ shouldBypassPermissionChecks: true },
);
for (
let runIndex = 0;
runIndex < workflowRunsToEnqueueCount;
runIndex++
) {
const workflowRunId = workflowRuns[runIndex].id;
await this.messageQueueService.add<RunWorkflowJobData>(
RunWorkflowJob.name,
{
workflowRunId,
workspaceId: activeWorkspace.id,
},
);
await workflowRunRepository.update(workflowRunId, {
status: WorkflowRunStatus.ENQUEUED,
});
enqueuedWorkflowRunCount++;
}
} catch (error) {
this.logger.error(
`Error enqueuing workflow runs for workspace ${activeWorkspace.id}`,
error,
);
this.metricsService.incrementCounter({
key: MetricsKeys.WorkflowRunFailedToEnqueue,
eventId: activeWorkspace.id,
});
} finally {
if (enqueuedWorkflowRunCount > 0) {
await this.workflowRunQueueWorkspaceService.increaseWorkflowRunQueuedCount(
activeWorkspace.id,
enqueuedWorkflowRunCount,
);
}
}
}
}
}

View File

@ -0,0 +1,3 @@
export const getWorkflowRunQueuedCountCacheKey = (
workspaceId: string,
): string => `workflow-run-queued-count:${workspaceId}`;

View File

@ -0,0 +1,28 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { CacheStorageModule } from 'src/engine/core-modules/cache-storage/cache-storage.module';
import { MessageQueueModule } from 'src/engine/core-modules/message-queue/message-queue.module';
import { MetricsModule } from 'src/engine/core-modules/metrics/metrics.module';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
import { CronWorkflowRunEnqueueCommand } from 'src/modules/workflow/workflow-runner/workflow-run-queue/cron/command/cron-workflow-run-enqueue.cron.command';
import { WorkflowRunEnqueueJob } from 'src/modules/workflow/workflow-runner/workflow-run-queue/cron/jobs/workflow-run-enqueue.cron.job';
import { WorkflowRunQueueWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run-queue/workspace-services/workflow-run-queue.workspace-service';
@Module({
imports: [
CacheStorageModule,
TypeOrmModule.forFeature([Workspace], 'core'),
MessageQueueModule,
WorkspaceDataSourceModule,
MetricsModule,
],
providers: [
WorkflowRunQueueWorkspaceService,
WorkflowRunEnqueueJob,
CronWorkflowRunEnqueueCommand,
],
exports: [WorkflowRunQueueWorkspaceService],
})
export class WorkflowRunQueueModule {}

View File

@ -0,0 +1,59 @@
import { Injectable } from '@nestjs/common';
import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decorators/cache-storage.decorator';
import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service';
import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum';
import { getWorkflowRunQueuedCountCacheKey } from 'src/modules/workflow/workflow-runner/workflow-run-queue/utils/get-cache-workflow-run-count-key.util';
@Injectable()
export class WorkflowRunQueueWorkspaceService {
private readonly WORKFLOW_RUN_QUEUE_THROTTLE_LIMIT = 100;
constructor(
@InjectCacheStorage(CacheStorageNamespace.ModuleWorkflow)
private readonly cacheStorage: CacheStorageService,
) {}
async increaseWorkflowRunQueuedCount(
workspaceId: string,
newlyEnqueuedCount = 1,
): Promise<void> {
const currentCount =
await this.getCurrentWorkflowRunQueuedCount(workspaceId);
await this.cacheStorage.set(
getWorkflowRunQueuedCountCacheKey(workspaceId),
currentCount + newlyEnqueuedCount,
);
}
async decreaseWorkflowRunQueuedCount(
workspaceId: string,
removedFromQueueCount = 1,
): Promise<void> {
const currentCount =
await this.getCurrentWorkflowRunQueuedCount(workspaceId);
await this.cacheStorage.set(
getWorkflowRunQueuedCountCacheKey(workspaceId),
currentCount - removedFromQueueCount,
);
}
async getRemainingRunsToEnqueueCount(workspaceId: string): Promise<number> {
const currentCount =
await this.getCurrentWorkflowRunQueuedCount(workspaceId);
return this.WORKFLOW_RUN_QUEUE_THROTTLE_LIMIT - currentCount;
}
private async getCurrentWorkflowRunQueuedCount(
workspaceId: string,
): Promise<number> {
const key = getWorkflowRunQueuedCountCacheKey(workspaceId);
const currentCount = (await this.cacheStorage.get<number>(key)) ?? 0;
return Math.max(0, currentCount);
}
}

View File

@ -44,10 +44,15 @@ export class WorkflowRunWorkspaceService {
workflowVersionId, workflowVersionId,
createdBy, createdBy,
workflowRunId, workflowRunId,
context,
status,
}: { }: {
workflowVersionId: string; workflowVersionId: string;
createdBy: ActorMetadata; createdBy: ActorMetadata;
workflowRunId?: string; workflowRunId?: string;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
context: Record<string, any>;
status: WorkflowRunStatus.NOT_STARTED | WorkflowRunStatus.ENQUEUED;
}) { }) {
const workspaceId = const workspaceId =
this.scopedWorkspaceContextFactory.create()?.workspaceId; this.scopedWorkspaceContextFactory.create()?.workspaceId;
@ -113,8 +118,9 @@ export class WorkflowRunWorkspaceService {
workflowVersionId, workflowVersionId,
createdBy, createdBy,
workflowId: workflow.id, workflowId: workflow.id,
status: WorkflowRunStatus.NOT_STARTED, status,
position, position,
context,
}); });
await workflowRunRepository.insert(workflowRun); await workflowRunRepository.insert(workflowRun);
@ -125,13 +131,10 @@ export class WorkflowRunWorkspaceService {
async startWorkflowRun({ async startWorkflowRun({
workflowRunId, workflowRunId,
workspaceId, workspaceId,
context,
output, output,
}: { }: {
workflowRunId: string; workflowRunId: string;
workspaceId: string; workspaceId: string;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
context: Record<string, any>;
output: WorkflowRunOutput; output: WorkflowRunOutput;
}) { }) {
const workflowRunRepository = const workflowRunRepository =
@ -152,7 +155,10 @@ export class WorkflowRunWorkspaceService {
); );
} }
if (workflowRunToUpdate.status !== WorkflowRunStatus.NOT_STARTED) { if (
workflowRunToUpdate.status !== WorkflowRunStatus.ENQUEUED &&
workflowRunToUpdate.status !== WorkflowRunStatus.NOT_STARTED
) {
throw new WorkflowRunException( throw new WorkflowRunException(
'Workflow run already started', 'Workflow run already started',
WorkflowRunExceptionCode.INVALID_OPERATION, WorkflowRunExceptionCode.INVALID_OPERATION,
@ -162,7 +168,6 @@ export class WorkflowRunWorkspaceService {
const partialUpdate = { const partialUpdate = {
status: WorkflowRunStatus.RUNNING, status: WorkflowRunStatus.RUNNING,
startedAt: new Date().toISOString(), startedAt: new Date().toISOString(),
context,
output, output,
}; };
@ -170,7 +175,7 @@ export class WorkflowRunWorkspaceService {
await this.emitWorkflowRunUpdatedEvent({ await this.emitWorkflowRunUpdatedEvent({
workflowRunBefore: workflowRunToUpdate, workflowRunBefore: workflowRunToUpdate,
updatedFields: ['status', 'startedAt', 'context', 'output'], updatedFields: ['status', 'startedAt', 'output'],
}); });
} }

View File

@ -6,6 +6,7 @@ import { ThrottlerModule } from 'src/engine/core-modules/throttler/throttler.mod
import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module'; import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module';
import { WorkflowExecutorModule } from 'src/modules/workflow/workflow-executor/workflow-executor.module'; import { WorkflowExecutorModule } from 'src/modules/workflow/workflow-executor/workflow-executor.module';
import { RunWorkflowJob } from 'src/modules/workflow/workflow-runner/jobs/run-workflow.job'; import { RunWorkflowJob } from 'src/modules/workflow/workflow-runner/jobs/run-workflow.job';
import { WorkflowRunQueueModule } from 'src/modules/workflow/workflow-runner/workflow-run-queue/workflow-run-queue.module';
import { WorkflowRunModule } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.module'; import { WorkflowRunModule } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.module';
import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service'; import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service';
@ -17,6 +18,7 @@ import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-ru
BillingModule, BillingModule,
WorkflowRunModule, WorkflowRunModule,
MetricsModule, MetricsModule,
WorkflowRunQueueModule,
], ],
providers: [WorkflowRunnerWorkspaceService, RunWorkflowJob], providers: [WorkflowRunnerWorkspaceService, RunWorkflowJob],
exports: [WorkflowRunnerWorkspaceService], exports: [WorkflowRunnerWorkspaceService],

View File

@ -5,17 +5,23 @@ import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decora
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { ActorMetadata } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type'; import { ActorMetadata } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type';
import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workspace-services/workflow-common.workspace-service';
import { import {
RunWorkflowJob, RunWorkflowJob,
RunWorkflowJobData, RunWorkflowJobData,
} from 'src/modules/workflow/workflow-runner/jobs/run-workflow.job'; } from 'src/modules/workflow/workflow-runner/jobs/run-workflow.job';
import { WorkflowRunQueueWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run-queue/workspace-services/workflow-run-queue.workspace-service';
import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service'; import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service';
import { WorkflowTriggerType } from 'src/modules/workflow/workflow-trigger/types/workflow-trigger.type';
@Injectable() @Injectable()
export class WorkflowRunnerWorkspaceService { export class WorkflowRunnerWorkspaceService {
private readonly logger = new Logger(WorkflowRunnerWorkspaceService.name); private readonly logger = new Logger(WorkflowRunnerWorkspaceService.name);
constructor( constructor(
private readonly workflowRunWorkspaceService: WorkflowRunWorkspaceService, private readonly workflowRunWorkspaceService: WorkflowRunWorkspaceService,
private readonly workflowCommonWorkspaceService: WorkflowCommonWorkspaceService,
private readonly workflowRunQueueWorkspaceService: WorkflowRunQueueWorkspaceService,
@InjectMessageQueue(MessageQueue.workflowQueue) @InjectMessageQueue(MessageQueue.workflowQueue)
private readonly messageQueueService: MessageQueueService, private readonly messageQueueService: MessageQueueService,
private readonly billingUsageService: BillingUsageService, private readonly billingUsageService: BillingUsageService,
@ -42,21 +48,41 @@ export class WorkflowRunnerWorkspaceService {
'Cannot execute billed function, there is no subscription for this workspace', 'Cannot execute billed function, there is no subscription for this workspace',
); );
} }
const workflowVersion =
await this.workflowCommonWorkspaceService.getWorkflowVersionOrFail({
workspaceId,
workflowVersionId,
});
const remainingRunsToEnqueueCount =
await this.workflowRunQueueWorkspaceService.getRemainingRunsToEnqueueCount(
workspaceId,
);
const isQueueLimitReached = remainingRunsToEnqueueCount <= 0;
const isManualTrigger =
workflowVersion.trigger?.type === WorkflowTriggerType.MANUAL;
const shouldEnqueueWorkflowRun = isManualTrigger || !isQueueLimitReached;
const workflowRunId = const workflowRunId =
await this.workflowRunWorkspaceService.createWorkflowRun({ await this.workflowRunWorkspaceService.createWorkflowRun({
workflowVersionId, workflowVersionId,
workflowRunId: initialWorkflowRunId, workflowRunId: initialWorkflowRunId,
createdBy: source, createdBy: source,
status: shouldEnqueueWorkflowRun
? WorkflowRunStatus.ENQUEUED
: WorkflowRunStatus.NOT_STARTED,
context: {
trigger: payload,
},
}); });
await this.messageQueueService.add<RunWorkflowJobData>( if (shouldEnqueueWorkflowRun) {
RunWorkflowJob.name, await this.enqueueWorkflowRun(workspaceId, workflowRunId);
{ }
workspaceId,
payload: payload,
workflowRunId,
},
);
return { workflowRunId }; return { workflowRunId };
} }
@ -70,6 +96,18 @@ export class WorkflowRunnerWorkspaceService {
workflowRunId: string; workflowRunId: string;
lastExecutedStepId: string; lastExecutedStepId: string;
}) { }) {
await this.enqueueWorkflowRun(
workspaceId,
workflowRunId,
lastExecutedStepId,
);
}
private async enqueueWorkflowRun(
workspaceId: string,
workflowRunId: string,
lastExecutedStepId?: string,
) {
await this.messageQueueService.add<RunWorkflowJobData>( await this.messageQueueService.add<RunWorkflowJobData>(
RunWorkflowJob.name, RunWorkflowJob.name,
{ {
@ -78,5 +116,8 @@ export class WorkflowRunnerWorkspaceService {
lastExecutedStepId, lastExecutedStepId,
}, },
); );
await this.workflowRunQueueWorkspaceService.increaseWorkflowRunQueuedCount(
workspaceId,
);
} }
} }