diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run-queue/cron/command/cron-clean-workflow-runs.cron.command.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run-queue/cron/command/cron-clean-workflow-runs.cron.command.ts new file mode 100644 index 000000000..6aee33d45 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run-queue/cron/command/cron-clean-workflow-runs.cron.command.ts @@ -0,0 +1,34 @@ +import { Command, CommandRunner } from 'nest-commander'; + +import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator'; +import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; +import { + CLEAN_WORKFLOW_RUN_CRON_PATTERN, + CleanWorkflowRunsJob, +} from 'src/modules/workflow/workflow-runner/workflow-run-queue/cron/jobs/clean-workflow-runs.cron.job'; + +@Command({ + name: 'cron:workflow:clean-workflow-runs', + description: 'Clean workflow runs', +}) +export class CronCleanWorkflowRunsCommand extends CommandRunner { + constructor( + @InjectMessageQueue(MessageQueue.cronQueue) + private readonly messageQueueService: MessageQueueService, + ) { + super(); + } + + async run(): Promise { + await this.messageQueueService.addCron({ + jobName: CleanWorkflowRunsJob.name, + data: undefined, + options: { + repeat: { + pattern: CLEAN_WORKFLOW_RUN_CRON_PATTERN, + }, + }, + }); + } +} diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run-queue/cron/jobs/clean-workflow-runs.cron.job.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run-queue/cron/jobs/clean-workflow-runs.cron.job.ts new file mode 100644 index 000000000..6ce51695d --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run-queue/cron/jobs/clean-workflow-runs.cron.job.ts @@ -0,0 +1,82 @@ +import { Logger } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; + +import { WorkspaceActivationStatus } from 'twenty-shared/workspace'; +import { Repository } from 'typeorm'; + +import { SentryCronMonitor } from 'src/engine/core-modules/cron/sentry-cron-monitor.decorator'; +import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator'; +import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; +import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; +import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; +import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service'; +import { + WorkflowRunStatus, + WorkflowRunWorkspaceEntity, +} from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; + +export const CLEAN_WORKFLOW_RUN_CRON_PATTERN = '0 0 * * *'; + +const NUMBER_OF_WORKFLOW_RUNS_TO_KEEP = 1000; + +@Processor(MessageQueue.cronQueue) +export class CleanWorkflowRunsJob { + private readonly logger = new Logger(CleanWorkflowRunsJob.name); + + constructor( + @InjectRepository(Workspace, 'core') + private readonly workspaceRepository: Repository, + private readonly workspaceDataSourceService: WorkspaceDataSourceService, + private readonly twentyORMGlobalManager: TwentyORMGlobalManager, + ) {} + + @Process(CleanWorkflowRunsJob.name) + @SentryCronMonitor(CleanWorkflowRunsJob.name, CLEAN_WORKFLOW_RUN_CRON_PATTERN) + async handle() { + const activeWorkspaces = await this.workspaceRepository.find({ + where: { + activationStatus: WorkspaceActivationStatus.ACTIVE, + }, + }); + + const mainDataSource = + await this.workspaceDataSourceService.connectToMainDataSource(); + + for (const activeWorkspace of activeWorkspaces) { + const schemaName = this.workspaceDataSourceService.getSchemaName( + activeWorkspace.id, + ); + + const workflowRunsToDelete = await mainDataSource.query( + ` + WITH ranked_runs AS ( + SELECT id, + ROW_NUMBER() OVER ( + PARTITION BY "workflowId" + ORDER BY "createdAt" DESC + ) AS rn + FROM ${schemaName}."workflowRun" + WHERE status IN ('${WorkflowRunStatus.COMPLETED}', '${WorkflowRunStatus.FAILED}') + ) + SELECT id, rn FROM ranked_runs WHERE rn > ${NUMBER_OF_WORKFLOW_RUNS_TO_KEEP}; + `, + ); + + const workflowRunRepository = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + activeWorkspace.id, + WorkflowRunWorkspaceEntity, + { shouldBypassPermissionChecks: true }, + ); + + for (const workflowRunToDelete of workflowRunsToDelete) { + await workflowRunRepository.delete(workflowRunToDelete.id); + } + + this.logger.log( + `Deleted ${workflowRunsToDelete.length} workflow runs for workspace ${activeWorkspace.id} (schema ${schemaName})`, + ); + } + } +} diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run-queue/workflow-run-queue.module.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run-queue/workflow-run-queue.module.ts index 0191f420b..298f0fa4e 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run-queue/workflow-run-queue.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run-queue/workflow-run-queue.module.ts @@ -9,6 +9,8 @@ import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/works import { CronWorkflowRunEnqueueCommand } from 'src/modules/workflow/workflow-runner/workflow-run-queue/cron/command/cron-workflow-run-enqueue.cron.command'; import { WorkflowRunEnqueueJob } from 'src/modules/workflow/workflow-runner/workflow-run-queue/cron/jobs/workflow-run-enqueue.cron.job'; import { WorkflowRunQueueWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run-queue/workspace-services/workflow-run-queue.workspace-service'; +import { CleanWorkflowRunsJob } from 'src/modules/workflow/workflow-runner/workflow-run-queue/cron/jobs/clean-workflow-runs.cron.job'; +import { CronCleanWorkflowRunsCommand } from 'src/modules/workflow/workflow-runner/workflow-run-queue/cron/command/cron-clean-workflow-runs.cron.command'; @Module({ imports: [ @@ -21,7 +23,9 @@ import { WorkflowRunQueueWorkspaceService } from 'src/modules/workflow/workflow- providers: [ WorkflowRunQueueWorkspaceService, WorkflowRunEnqueueJob, + CleanWorkflowRunsJob, CronWorkflowRunEnqueueCommand, + CronCleanWorkflowRunsCommand, ], exports: [WorkflowRunQueueWorkspaceService], }) diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts index 2833844b5..fd9b9bc34 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts @@ -92,12 +92,6 @@ export class WorkflowRunWorkspaceService { ); } - const workflowRunCount = await workflowRunRepository.count({ - where: { - workflowId: workflow.id, - }, - }); - const position = await this.recordPositionService.buildRecordPosition({ value: 'first', objectMetadata: { @@ -109,6 +103,19 @@ export class WorkflowRunWorkspaceService { const initState = this.getInitState(workflowVersion, triggerPayload); + const lastWorkflowRun = await workflowRunRepository.findOne({ + where: { + workflowId: workflow.id, + }, + order: { createdAt: 'desc' }, + }); + + const workflowRunCountMatch = lastWorkflowRun?.name.match(/#(\d+)/); + + const workflowRunCount = workflowRunCountMatch + ? parseInt(workflowRunCountMatch[1], 10) + : 0; + const workflowRun = workflowRunRepository.create({ id: workflowRunId ?? v4(), name: `#${workflowRunCount + 1} - ${workflow.name}`,