Improve typing definition (#6481)
- added typing for workflow-runner results - fix workflow typing - add a `workflow-action-runner` submodule that contains factories for action runners - added code-action-runner - simplified code
This commit is contained in:
@ -3,10 +3,10 @@ import { Module } from '@nestjs/common';
|
||||
import { WorkflowRunnerService } from 'src/modules/workflow/workflow-runner/workflow-runner.service';
|
||||
import { WorkflowRunnerJob } from 'src/modules/workflow/workflow-runner/workflow-runner.job';
|
||||
import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module';
|
||||
import { ServerlessFunctionModule } from 'src/engine/metadata-modules/serverless-function/serverless-function.module';
|
||||
import { WorkflowActionRunnerModule } from 'src/modules/workflow/workflow-action-runner/workflow-action-runner.module';
|
||||
|
||||
@Module({
|
||||
imports: [WorkflowCommonModule, ServerlessFunctionModule],
|
||||
imports: [WorkflowCommonModule, WorkflowActionRunnerModule],
|
||||
providers: [WorkflowRunnerService, WorkflowRunnerJob],
|
||||
exports: [WorkflowRunnerService],
|
||||
})
|
||||
|
||||
@ -1,21 +1,14 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
|
||||
import { ServerlessFunctionService } from 'src/engine/metadata-modules/serverless-function/serverless-function.service';
|
||||
import {
|
||||
WorkflowTriggerException,
|
||||
WorkflowTriggerExceptionCode,
|
||||
} from 'src/modules/workflow/workflow-trigger/workflow-trigger.exception';
|
||||
import {
|
||||
WorkflowAction,
|
||||
WorkflowActionType,
|
||||
} from 'src/modules/workflow/common/types/workflow-action.type';
|
||||
import { WorkflowAction } from 'src/modules/workflow/common/types/workflow-action.type';
|
||||
import { WorkflowActionRunnerFactory } from 'src/modules/workflow/workflow-action-runner/workflow-action-runner.factory';
|
||||
|
||||
const MAX_RETRIES_ON_FAILURE = 3;
|
||||
|
||||
@Injectable()
|
||||
export class WorkflowRunnerService {
|
||||
constructor(
|
||||
private readonly serverlessFunctionService: ServerlessFunctionService,
|
||||
private readonly workflowActionRunnerFactory: WorkflowActionRunnerFactory,
|
||||
) {}
|
||||
|
||||
async run({
|
||||
@ -33,51 +26,48 @@ export class WorkflowRunnerService {
|
||||
return payload;
|
||||
}
|
||||
|
||||
let result: object | undefined = undefined;
|
||||
const workflowActionRunner = this.workflowActionRunnerFactory.get(
|
||||
action.type,
|
||||
);
|
||||
|
||||
switch (action.type) {
|
||||
case WorkflowActionType.CODE: {
|
||||
const executionResult = await this.serverlessFunctionService.executeOne(
|
||||
action.settings.serverlessFunctionId,
|
||||
workspaceId,
|
||||
payload,
|
||||
);
|
||||
const result = await workflowActionRunner.execute({
|
||||
action,
|
||||
workspaceId,
|
||||
payload,
|
||||
});
|
||||
|
||||
if (executionResult.data) {
|
||||
result = executionResult.data;
|
||||
break;
|
||||
}
|
||||
if (!executionResult.error) {
|
||||
throw new Error('Execution result error, no data or error');
|
||||
}
|
||||
if (action.settings.errorHandlingOptions.continueOnFailure.value) {
|
||||
result = payload;
|
||||
break;
|
||||
} else if (
|
||||
action.settings.errorHandlingOptions.retryOnFailure.value &&
|
||||
attemptCount < MAX_RETRIES_ON_FAILURE
|
||||
) {
|
||||
return await this.run({
|
||||
action,
|
||||
workspaceId,
|
||||
payload,
|
||||
attemptCount: attemptCount + 1,
|
||||
});
|
||||
} else {
|
||||
return executionResult.error;
|
||||
}
|
||||
}
|
||||
default:
|
||||
throw new WorkflowTriggerException(
|
||||
`Unknown action type '${action.type}'`,
|
||||
WorkflowTriggerExceptionCode.INVALID_ACTION_TYPE,
|
||||
);
|
||||
if (result.data) {
|
||||
return await this.run({
|
||||
action: action.nextAction,
|
||||
workspaceId,
|
||||
payload: result.data,
|
||||
});
|
||||
}
|
||||
|
||||
return await this.run({
|
||||
action: action.nextAction,
|
||||
workspaceId,
|
||||
payload: result,
|
||||
});
|
||||
if (!result.error) {
|
||||
throw new Error('Execution result error, no data or error');
|
||||
}
|
||||
|
||||
if (action.settings.errorHandlingOptions.continueOnFailure.value) {
|
||||
return await this.run({
|
||||
action: action.nextAction,
|
||||
workspaceId,
|
||||
payload,
|
||||
});
|
||||
}
|
||||
|
||||
if (
|
||||
action.settings.errorHandlingOptions.retryOnFailure.value &&
|
||||
attemptCount < MAX_RETRIES_ON_FAILURE
|
||||
) {
|
||||
return await this.run({
|
||||
action,
|
||||
workspaceId,
|
||||
payload,
|
||||
attemptCount: attemptCount + 1,
|
||||
});
|
||||
}
|
||||
|
||||
return result.error;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user