13303 workflow clean workflowruns to keep max 1000 workflowruns per workflow (#13353)
- add a cron to remove COMPLETED and FAILED workflowRuns if there is more that 1000 per workflow - update workflowRun naming computing
This commit is contained in:
@ -0,0 +1,34 @@
|
||||
import { Command, CommandRunner } from 'nest-commander';
|
||||
|
||||
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
|
||||
import {
|
||||
CLEAN_WORKFLOW_RUN_CRON_PATTERN,
|
||||
CleanWorkflowRunsJob,
|
||||
} from 'src/modules/workflow/workflow-runner/workflow-run-queue/cron/jobs/clean-workflow-runs.cron.job';
|
||||
|
||||
@Command({
|
||||
name: 'cron:workflow:clean-workflow-runs',
|
||||
description: 'Clean workflow runs',
|
||||
})
|
||||
export class CronCleanWorkflowRunsCommand extends CommandRunner {
|
||||
constructor(
|
||||
@InjectMessageQueue(MessageQueue.cronQueue)
|
||||
private readonly messageQueueService: MessageQueueService,
|
||||
) {
|
||||
super();
|
||||
}
|
||||
|
||||
async run(): Promise<void> {
|
||||
await this.messageQueueService.addCron({
|
||||
jobName: CleanWorkflowRunsJob.name,
|
||||
data: undefined,
|
||||
options: {
|
||||
repeat: {
|
||||
pattern: CLEAN_WORKFLOW_RUN_CRON_PATTERN,
|
||||
},
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,82 @@
|
||||
import { Logger } from '@nestjs/common';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
|
||||
import { WorkspaceActivationStatus } from 'twenty-shared/workspace';
|
||||
import { Repository } from 'typeorm';
|
||||
|
||||
import { SentryCronMonitor } from 'src/engine/core-modules/cron/sentry-cron-monitor.decorator';
|
||||
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
|
||||
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
|
||||
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
|
||||
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
|
||||
import {
|
||||
WorkflowRunStatus,
|
||||
WorkflowRunWorkspaceEntity,
|
||||
} from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
|
||||
|
||||
export const CLEAN_WORKFLOW_RUN_CRON_PATTERN = '0 0 * * *';
|
||||
|
||||
const NUMBER_OF_WORKFLOW_RUNS_TO_KEEP = 1000;
|
||||
|
||||
@Processor(MessageQueue.cronQueue)
|
||||
export class CleanWorkflowRunsJob {
|
||||
private readonly logger = new Logger(CleanWorkflowRunsJob.name);
|
||||
|
||||
constructor(
|
||||
@InjectRepository(Workspace, 'core')
|
||||
private readonly workspaceRepository: Repository<Workspace>,
|
||||
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
|
||||
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
|
||||
) {}
|
||||
|
||||
@Process(CleanWorkflowRunsJob.name)
|
||||
@SentryCronMonitor(CleanWorkflowRunsJob.name, CLEAN_WORKFLOW_RUN_CRON_PATTERN)
|
||||
async handle() {
|
||||
const activeWorkspaces = await this.workspaceRepository.find({
|
||||
where: {
|
||||
activationStatus: WorkspaceActivationStatus.ACTIVE,
|
||||
},
|
||||
});
|
||||
|
||||
const mainDataSource =
|
||||
await this.workspaceDataSourceService.connectToMainDataSource();
|
||||
|
||||
for (const activeWorkspace of activeWorkspaces) {
|
||||
const schemaName = this.workspaceDataSourceService.getSchemaName(
|
||||
activeWorkspace.id,
|
||||
);
|
||||
|
||||
const workflowRunsToDelete = await mainDataSource.query(
|
||||
`
|
||||
WITH ranked_runs AS (
|
||||
SELECT id,
|
||||
ROW_NUMBER() OVER (
|
||||
PARTITION BY "workflowId"
|
||||
ORDER BY "createdAt" DESC
|
||||
) AS rn
|
||||
FROM ${schemaName}."workflowRun"
|
||||
WHERE status IN ('${WorkflowRunStatus.COMPLETED}', '${WorkflowRunStatus.FAILED}')
|
||||
)
|
||||
SELECT id, rn FROM ranked_runs WHERE rn > ${NUMBER_OF_WORKFLOW_RUNS_TO_KEEP};
|
||||
`,
|
||||
);
|
||||
|
||||
const workflowRunRepository =
|
||||
await this.twentyORMGlobalManager.getRepositoryForWorkspace(
|
||||
activeWorkspace.id,
|
||||
WorkflowRunWorkspaceEntity,
|
||||
{ shouldBypassPermissionChecks: true },
|
||||
);
|
||||
|
||||
for (const workflowRunToDelete of workflowRunsToDelete) {
|
||||
await workflowRunRepository.delete(workflowRunToDelete.id);
|
||||
}
|
||||
|
||||
this.logger.log(
|
||||
`Deleted ${workflowRunsToDelete.length} workflow runs for workspace ${activeWorkspace.id} (schema ${schemaName})`,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -9,6 +9,8 @@ import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/works
|
||||
import { CronWorkflowRunEnqueueCommand } from 'src/modules/workflow/workflow-runner/workflow-run-queue/cron/command/cron-workflow-run-enqueue.cron.command';
|
||||
import { WorkflowRunEnqueueJob } from 'src/modules/workflow/workflow-runner/workflow-run-queue/cron/jobs/workflow-run-enqueue.cron.job';
|
||||
import { WorkflowRunQueueWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run-queue/workspace-services/workflow-run-queue.workspace-service';
|
||||
import { CleanWorkflowRunsJob } from 'src/modules/workflow/workflow-runner/workflow-run-queue/cron/jobs/clean-workflow-runs.cron.job';
|
||||
import { CronCleanWorkflowRunsCommand } from 'src/modules/workflow/workflow-runner/workflow-run-queue/cron/command/cron-clean-workflow-runs.cron.command';
|
||||
|
||||
@Module({
|
||||
imports: [
|
||||
@ -21,7 +23,9 @@ import { WorkflowRunQueueWorkspaceService } from 'src/modules/workflow/workflow-
|
||||
providers: [
|
||||
WorkflowRunQueueWorkspaceService,
|
||||
WorkflowRunEnqueueJob,
|
||||
CleanWorkflowRunsJob,
|
||||
CronWorkflowRunEnqueueCommand,
|
||||
CronCleanWorkflowRunsCommand,
|
||||
],
|
||||
exports: [WorkflowRunQueueWorkspaceService],
|
||||
})
|
||||
|
||||
@ -92,12 +92,6 @@ export class WorkflowRunWorkspaceService {
|
||||
);
|
||||
}
|
||||
|
||||
const workflowRunCount = await workflowRunRepository.count({
|
||||
where: {
|
||||
workflowId: workflow.id,
|
||||
},
|
||||
});
|
||||
|
||||
const position = await this.recordPositionService.buildRecordPosition({
|
||||
value: 'first',
|
||||
objectMetadata: {
|
||||
@ -109,6 +103,19 @@ export class WorkflowRunWorkspaceService {
|
||||
|
||||
const initState = this.getInitState(workflowVersion, triggerPayload);
|
||||
|
||||
const lastWorkflowRun = await workflowRunRepository.findOne({
|
||||
where: {
|
||||
workflowId: workflow.id,
|
||||
},
|
||||
order: { createdAt: 'desc' },
|
||||
});
|
||||
|
||||
const workflowRunCountMatch = lastWorkflowRun?.name.match(/#(\d+)/);
|
||||
|
||||
const workflowRunCount = workflowRunCountMatch
|
||||
? parseInt(workflowRunCountMatch[1], 10)
|
||||
: 0;
|
||||
|
||||
const workflowRun = workflowRunRepository.create({
|
||||
id: workflowRunId ?? v4(),
|
||||
name: `#${workflowRunCount + 1} - ${workflow.name}`,
|
||||
|
||||
Reference in New Issue
Block a user