Add workflow run entity (#6622)

- create a workflow run every time a workflow is triggered in
not_started status. This status will be helpful later for once workflows
will be scheduled
- update run status once workflow starts running
- complete status once the workflow finished running
- add a failed status if an error occurs
This commit is contained in:
Thomas Trompette
2024-08-14 18:27:32 +02:00
committed by GitHub
parent 121794e3c0
commit 9e7714e627
17 changed files with 390 additions and 45 deletions

View File

@ -1,14 +1,18 @@
import { Logger } from '@nestjs/common';
import { Scope } from '@nestjs/common';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { FieldActorSource } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity';
import {
RunWorkflowJobData,
WorkflowRunnerJob,
} from 'src/modules/workflow/workflow-runner/workflow-runner.job';
import { WorkflowStatusWorkspaceService } from 'src/modules/workflow/workflow-status/workflow-status.workspace-service';
export type WorkflowEventTriggerJobData = {
workspaceId: string;
@ -16,21 +20,44 @@ export type WorkflowEventTriggerJobData = {
payload: object;
};
@Processor(MessageQueue.workflowQueue)
@Processor({ queueName: MessageQueue.workflowQueue, scope: Scope.REQUEST })
export class WorkflowEventTriggerJob {
private readonly logger = new Logger(WorkflowEventTriggerJob.name);
constructor(
@InjectMessageQueue(MessageQueue.workflowQueue)
private readonly messageQueueService: MessageQueueService,
private readonly twentyORMManager: TwentyORMManager,
private readonly workflowStatusWorkspaceService: WorkflowStatusWorkspaceService,
) {}
@Process(WorkflowEventTriggerJob.name)
async handle(data: WorkflowEventTriggerJobData): Promise<void> {
const workflowRepository =
await this.twentyORMManager.getRepository<WorkflowWorkspaceEntity>(
'workflow',
);
const workflow = await workflowRepository.findOneByOrFail({
id: data.workflowId,
});
if (!workflow.publishedVersionId) {
throw new Error('Workflow has no published version');
}
const workflowRunId =
await this.workflowStatusWorkspaceService.createWorkflowRun(
workflow.publishedVersionId,
{
source: FieldActorSource.WORKFLOW,
name: workflow.name,
},
);
this.messageQueueService.add<RunWorkflowJobData>(WorkflowRunnerJob.name, {
workspaceId: data.workspaceId,
workflowId: data.workflowId,
workflowVersionId: workflow.publishedVersionId,
payload: data.payload,
workflowRunId,
});
}
}

View File

@ -1,10 +1,11 @@
import { Module } from '@nestjs/common';
import { WorkflowRunnerModule } from 'src/modules/workflow/workflow-runner/workflow-runner.module';
import { WorkflowStatusModule } from 'src/modules/workflow/workflow-status/workflow-status.module';
import { WorkflowEventTriggerJob } from 'src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job';
@Module({
imports: [WorkflowRunnerModule],
imports: [WorkflowRunnerModule, WorkflowStatusModule],
providers: [WorkflowEventTriggerJob],
})
export class WorkflowTriggerJobModule {}

View File

@ -12,4 +12,5 @@ export enum WorkflowTriggerExceptionCode {
INVALID_WORKFLOW_TRIGGER = 'INVALID_WORKFLOW_TRIGGER',
INVALID_WORKFLOW_VERSION = 'INVALID_WORKFLOW_VERSION',
INVALID_ACTION_TYPE = 'INVALID_ACTION_TYPE',
INTERNAL_ERROR = 'INTERNAL_ERROR',
}

View File

@ -32,11 +32,18 @@ export class WorkflowTriggerService {
workflowVersionId,
);
return await this.workflowRunnerService.run({
action: workflowVersion.trigger.nextAction,
workspaceId,
payload,
});
try {
return await this.workflowRunnerService.run({
action: workflowVersion.trigger.nextAction,
workspaceId,
payload,
});
} catch (error) {
throw new WorkflowTriggerException(
`Error running workflow version ${error}`,
WorkflowTriggerExceptionCode.INTERNAL_ERROR,
);
}
}
async enableWorkflowTrigger(workspaceId: string, workflowVersionId: string) {