Trigger workflow run manually (#6696)

Fix https://github.com/twentyhq/twenty/issues/6669

- create a commun function `startWorkflowRun` that both create the run
object and the job for executing the workflow
- use it in both the `workflowEventJob` and the `runWorkflowVersion`
endpoint

Bonus:
- use filtering for exceptions instead of a util. It avoids doing a try
catch in all endpoint
This commit is contained in:
Thomas Trompette
2024-08-21 17:41:26 +02:00
committed by GitHub
parent da5dfb7a5b
commit 663acd56e4
43 changed files with 452 additions and 316 deletions

View File

@ -5,8 +5,8 @@ import { Processor } from 'src/engine/integrations/message-queue/decorators/proc
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workflow-common.workspace-service';
import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-runner.workspace-service';
import { WorkflowStatusWorkspaceService } from 'src/modules/workflow/workflow-status/workflow-status.workspace-service';
import { WorkflowExecutorWorkspaceService } from 'src/modules/workflow/workflow-executor/workflow-executor.workspace-service';
import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service';
export type RunWorkflowJobData = {
workspaceId: string;
@ -16,20 +16,20 @@ export type RunWorkflowJobData = {
};
@Processor({ queueName: MessageQueue.workflowQueue, scope: Scope.REQUEST })
export class WorkflowRunnerJob {
export class RunWorkflowJob {
constructor(
private readonly workflowCommonWorkspaceService: WorkflowCommonWorkspaceService,
private readonly workflowRunnerWorkspaceService: WorkflowRunnerWorkspaceService,
private readonly workflowStatusWorkspaceService: WorkflowStatusWorkspaceService,
private readonly workflowExecutorWorkspaceService: WorkflowExecutorWorkspaceService,
private readonly workflowRunWorkspaceService: WorkflowRunWorkspaceService,
) {}
@Process(WorkflowRunnerJob.name)
@Process(RunWorkflowJob.name)
async handle({
workflowVersionId,
workflowRunId,
payload,
}: RunWorkflowJobData): Promise<void> {
await this.workflowStatusWorkspaceService.startWorkflowRun(workflowRunId);
await this.workflowRunWorkspaceService.startWorkflowRun(workflowRunId);
const workflowVersion =
await this.workflowCommonWorkspaceService.getWorkflowVersion(
@ -37,17 +37,17 @@ export class WorkflowRunnerJob {
);
try {
await this.workflowRunnerWorkspaceService.run({
await this.workflowExecutorWorkspaceService.execute({
action: workflowVersion.trigger.nextAction,
payload,
});
await this.workflowStatusWorkspaceService.endWorkflowRun(
await this.workflowRunWorkspaceService.endWorkflowRun(
workflowRunId,
WorkflowRunStatus.COMPLETED,
);
} catch (error) {
await this.workflowStatusWorkspaceService.endWorkflowRun(
await this.workflowRunWorkspaceService.endWorkflowRun(
workflowRunId,
WorkflowRunStatus.FAILED,
);

View File

@ -0,0 +1,13 @@
import { CustomException } from 'src/utils/custom-exception';
export class WorkflowRunException extends CustomException {
code: WorkflowRunExceptionCode;
constructor(message: string, code: WorkflowRunExceptionCode) {
super(message, code);
}
}
export enum WorkflowRunExceptionCode {
WORKFLOW_RUN_NOT_FOUND = 'WORKFLOW_RUN_NOT_FOUND',
INVALID_OPERATION = 'INVALID_OPERATION',
}

View File

@ -0,0 +1,9 @@
import { Module } from '@nestjs/common';
import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service';
@Module({
providers: [WorkflowRunWorkspaceService],
exports: [WorkflowRunWorkspaceService],
})
export class WorkflowRunModule {}

View File

@ -0,0 +1,92 @@
import { Injectable } from '@nestjs/common';
import { ActorMetadata } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import {
WorkflowRunStatus,
WorkflowRunWorkspaceEntity,
} from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import {
WorkflowRunException,
WorkflowRunExceptionCode,
} from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.exception';
@Injectable()
export class WorkflowRunWorkspaceService {
constructor(private readonly twentyORMManager: TwentyORMManager) {}
async createWorkflowRun(workflowVersionId: string, createdBy: ActorMetadata) {
const workflowRunRepository =
await this.twentyORMManager.getRepository<WorkflowRunWorkspaceEntity>(
'workflowRun',
);
return (
await workflowRunRepository.save({
workflowVersionId,
createdBy,
status: WorkflowRunStatus.NOT_STARTED,
})
).id;
}
async startWorkflowRun(workflowRunId: string) {
const workflowRunRepository =
await this.twentyORMManager.getRepository<WorkflowRunWorkspaceEntity>(
'workflowRun',
);
const workflowRunToUpdate = await workflowRunRepository.findOneBy({
id: workflowRunId,
});
if (!workflowRunToUpdate) {
throw new WorkflowRunException(
'No workflow run to start',
WorkflowRunExceptionCode.WORKFLOW_RUN_NOT_FOUND,
);
}
if (workflowRunToUpdate.status !== WorkflowRunStatus.NOT_STARTED) {
throw new WorkflowRunException(
'Workflow run already started',
WorkflowRunExceptionCode.INVALID_OPERATION,
);
}
return workflowRunRepository.update(workflowRunToUpdate.id, {
status: WorkflowRunStatus.RUNNING,
startedAt: new Date().toISOString(),
});
}
async endWorkflowRun(workflowRunId: string, status: WorkflowRunStatus) {
const workflowRunRepository =
await this.twentyORMManager.getRepository<WorkflowRunWorkspaceEntity>(
'workflowRun',
);
const workflowRunToUpdate = await workflowRunRepository.findOneBy({
id: workflowRunId,
});
if (!workflowRunToUpdate) {
throw new WorkflowRunException(
'No workflow run to end',
WorkflowRunExceptionCode.WORKFLOW_RUN_NOT_FOUND,
);
}
if (workflowRunToUpdate.status !== WorkflowRunStatus.RUNNING) {
throw new WorkflowRunException(
'Workflow cannot be ended as it is not running',
WorkflowRunExceptionCode.INVALID_OPERATION,
);
}
return workflowRunRepository.update(workflowRunToUpdate.id, {
status,
endedAt: new Date().toISOString(),
});
}
}

View File

@ -1,11 +0,0 @@
import { CustomException } from 'src/utils/custom-exception';
export class WorkflowRunnerException extends CustomException {
constructor(message: string, code: string) {
super(message, code);
}
}
export enum WorkflowRunnerExceptionCode {
WORKFLOW_FAILED = 'WORKFLOW_FAILED',
}

View File

@ -1,18 +1,14 @@
import { Module } from '@nestjs/common';
import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module';
import { WorkflowActionRunnerModule } from 'src/modules/workflow/workflow-action-runner/workflow-action-runner.module';
import { WorkflowRunnerJob } from 'src/modules/workflow/workflow-runner/workflow-runner.job';
import { WorkflowExecutorModule } from 'src/modules/workflow/workflow-executor/workflow-executor.module';
import { RunWorkflowJob } from 'src/modules/workflow/workflow-runner/jobs/run-workflow.job';
import { WorkflowRunModule } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.module';
import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-runner.workspace-service';
import { WorkflowStatusModule } from 'src/modules/workflow/workflow-status/workflow-status.module';
@Module({
imports: [
WorkflowCommonModule,
WorkflowActionRunnerModule,
WorkflowStatusModule,
],
providers: [WorkflowRunnerWorkspaceService, WorkflowRunnerJob],
imports: [WorkflowRunModule, WorkflowCommonModule, WorkflowExecutorModule],
providers: [WorkflowRunnerWorkspaceService, RunWorkflowJob],
exports: [WorkflowRunnerWorkspaceService],
})
export class WorkflowRunnerModule {}

View File

@ -1,84 +1,45 @@
import { Injectable } from '@nestjs/common';
import { WorkflowAction } from 'src/modules/workflow/common/types/workflow-action.type';
import { WorkflowActionRunnerFactory } from 'src/modules/workflow/workflow-action-runner/workflow-action-runner.factory';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { ActorMetadata } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type';
import {
WorkflowRunnerException,
WorkflowRunnerExceptionCode,
} from 'src/modules/workflow/workflow-runner/workflow-runner.exception';
const MAX_RETRIES_ON_FAILURE = 3;
export type WorkflowRunOutput = {
data?: object;
error?: object;
};
RunWorkflowJob,
RunWorkflowJobData,
} from 'src/modules/workflow/workflow-runner/jobs/run-workflow.job';
import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service';
@Injectable()
export class WorkflowRunnerWorkspaceService {
constructor(
private readonly workflowActionRunnerFactory: WorkflowActionRunnerFactory,
private readonly workflowRunWorkspaceService: WorkflowRunWorkspaceService,
@InjectMessageQueue(MessageQueue.workflowQueue)
private readonly messageQueueService: MessageQueueService,
) {}
async run({
action,
payload,
attemptCount = 1,
}: {
action?: WorkflowAction;
payload?: object;
attemptCount?: number;
}): Promise<WorkflowRunOutput> {
if (!action) {
return {
data: payload,
};
}
const workflowActionRunner = this.workflowActionRunnerFactory.get(
action.type,
);
const result = await workflowActionRunner.execute({
action,
payload,
});
if (result.data) {
return await this.run({
action: action.nextAction,
payload: result.data,
});
}
if (!result.error) {
throw new WorkflowRunnerException(
'Execution result error, no data or error',
WorkflowRunnerExceptionCode.WORKFLOW_FAILED,
async run(
workspaceId: string,
workflowVersionId: string,
payload: object,
source: ActorMetadata,
) {
const workflowRunId =
await this.workflowRunWorkspaceService.createWorkflowRun(
workflowVersionId,
source,
);
}
if (action.settings.errorHandlingOptions.continueOnFailure.value) {
return await this.run({
action: action.nextAction,
payload,
});
}
if (
action.settings.errorHandlingOptions.retryOnFailure.value &&
attemptCount < MAX_RETRIES_ON_FAILURE
) {
return await this.run({
action,
payload,
attemptCount: attemptCount + 1,
});
}
throw new WorkflowRunnerException(
`Workflow failed: ${result.error}`,
WorkflowRunnerExceptionCode.WORKFLOW_FAILED,
await this.messageQueueService.add<RunWorkflowJobData>(
RunWorkflowJob.name,
{
workspaceId,
workflowVersionId,
payload: payload,
workflowRunId,
},
);
return { workflowRunId };
}
}