Fix workflow run output empty (#9616)
- catch error on action execution. We will log the error and return it in the step - catch error on workflow run - remove the catch in the action. All actions should simply throw and let the executor do the job <img width="1512" alt="Capture d’écran 2025-01-14 à 17 35 53" src="https://github.com/user-attachments/assets/dcf79567-a309-45f1-a640-c50b7ac4769b" />
This commit is contained in:
@ -10,4 +10,5 @@ export class SendEmailActionException extends CustomException {
|
||||
export enum SendEmailActionExceptionCode {
|
||||
PROVIDER_NOT_SUPPORTED = 'PROVIDER_NOT_SUPPORTED',
|
||||
CONNECTED_ACCOUNT_NOT_FOUND = 'CONNECTED_ACCOUNT_NOT_FOUND',
|
||||
INVALID_EMAIL = 'INVALID_EMAIL',
|
||||
}
|
||||
|
||||
@ -80,47 +80,44 @@ export class SendEmailWorkflowAction implements WorkflowAction {
|
||||
);
|
||||
const { email, body, subject } = workflowActionInput;
|
||||
|
||||
try {
|
||||
const emailSchema = z.string().trim().email('Invalid email');
|
||||
const emailSchema = z.string().trim().email('Invalid email');
|
||||
|
||||
const result = emailSchema.safeParse(email);
|
||||
const result = emailSchema.safeParse(email);
|
||||
|
||||
if (!result.success) {
|
||||
this.logger.warn(`Email '${email}' invalid`);
|
||||
|
||||
return { result: { success: false } };
|
||||
}
|
||||
|
||||
const window = new JSDOM('').window;
|
||||
const purify = DOMPurify(window);
|
||||
const safeBody = purify.sanitize(body || '');
|
||||
const safeSubject = purify.sanitize(subject || '');
|
||||
|
||||
const message = [
|
||||
`To: ${email}`,
|
||||
`Subject: ${safeSubject || ''}`,
|
||||
'MIME-Version: 1.0',
|
||||
'Content-Type: text/plain; charset="UTF-8"',
|
||||
'',
|
||||
safeBody,
|
||||
].join('\n');
|
||||
|
||||
const encodedMessage = Buffer.from(message).toString('base64');
|
||||
|
||||
await emailProvider.users.messages.send({
|
||||
userId: 'me',
|
||||
requestBody: {
|
||||
raw: encodedMessage,
|
||||
},
|
||||
});
|
||||
|
||||
this.logger.log(`Email sent successfully`);
|
||||
|
||||
return {
|
||||
result: { success: true } satisfies WorkflowSendEmailStepOutputSchema,
|
||||
};
|
||||
} catch (error) {
|
||||
return { error };
|
||||
if (!result.success) {
|
||||
throw new SendEmailActionException(
|
||||
`Email '${email}' invalid`,
|
||||
SendEmailActionExceptionCode.INVALID_EMAIL,
|
||||
);
|
||||
}
|
||||
|
||||
const window = new JSDOM('').window;
|
||||
const purify = DOMPurify(window);
|
||||
const safeBody = purify.sanitize(body || '');
|
||||
const safeSubject = purify.sanitize(subject || '');
|
||||
|
||||
const message = [
|
||||
`To: ${email}`,
|
||||
`Subject: ${safeSubject || ''}`,
|
||||
'MIME-Version: 1.0',
|
||||
'Content-Type: text/plain; charset="UTF-8"',
|
||||
'',
|
||||
safeBody,
|
||||
].join('\n');
|
||||
|
||||
const encodedMessage = Buffer.from(message).toString('base64');
|
||||
|
||||
await emailProvider.users.messages.send({
|
||||
userId: 'me',
|
||||
requestBody: {
|
||||
raw: encodedMessage,
|
||||
},
|
||||
});
|
||||
|
||||
this.logger.log(`Email sent successfully`);
|
||||
|
||||
return {
|
||||
result: { success: true } satisfies WorkflowSendEmailStepOutputSchema,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
|
||||
import {
|
||||
WorkflowRunOutput,
|
||||
@ -6,6 +6,7 @@ import {
|
||||
} from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
|
||||
import { WorkflowActionFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-action.factory';
|
||||
import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util';
|
||||
import { WorkflowActionResult } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-result.type';
|
||||
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
|
||||
|
||||
const MAX_RETRIES_ON_FAILURE = 3;
|
||||
@ -17,6 +18,7 @@ export type WorkflowExecutorOutput = {
|
||||
|
||||
@Injectable()
|
||||
export class WorkflowExecutorWorkspaceService {
|
||||
private readonly logger = new Logger(WorkflowExecutorWorkspaceService.name);
|
||||
constructor(private readonly workflowActionFactory: WorkflowActionFactory) {}
|
||||
|
||||
async execute({
|
||||
@ -42,7 +44,19 @@ export class WorkflowExecutorWorkspaceService {
|
||||
|
||||
const actionPayload = resolveInput(step.settings.input, context);
|
||||
|
||||
const result = await workflowAction.execute(actionPayload);
|
||||
let result: WorkflowActionResult;
|
||||
|
||||
try {
|
||||
result = await workflowAction.execute(actionPayload);
|
||||
} catch (error) {
|
||||
result = {
|
||||
error: {
|
||||
errorType: error.name,
|
||||
errorMessage: error.message,
|
||||
stackTrace: error.stack,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
const stepOutput = output.steps[step.id];
|
||||
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
import { Scope } from '@nestjs/common';
|
||||
import { Logger, Scope } from '@nestjs/common';
|
||||
|
||||
import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service';
|
||||
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
|
||||
@ -23,6 +23,7 @@ export type RunWorkflowJobData = {
|
||||
|
||||
@Processor({ queueName: MessageQueue.workflowQueue, scope: Scope.REQUEST })
|
||||
export class RunWorkflowJob {
|
||||
private readonly logger = new Logger(RunWorkflowJob.name);
|
||||
constructor(
|
||||
private readonly workflowCommonWorkspaceService: WorkflowCommonWorkspaceService,
|
||||
private readonly workflowExecutorWorkspaceService: WorkflowExecutorWorkspaceService,
|
||||
@ -39,41 +40,33 @@ export class RunWorkflowJob {
|
||||
}: RunWorkflowJobData): Promise<void> {
|
||||
await this.workflowRunWorkspaceService.startWorkflowRun(workflowRunId);
|
||||
|
||||
const workflowVersion =
|
||||
await this.workflowCommonWorkspaceService.getWorkflowVersionOrFail(
|
||||
workflowVersionId,
|
||||
);
|
||||
|
||||
await this.throttleExecution(workflowVersion.workflowId, workflowRunId);
|
||||
|
||||
const { steps, status } =
|
||||
await this.workflowExecutorWorkspaceService.execute({
|
||||
currentStepIndex: 0,
|
||||
steps: workflowVersion.steps || [],
|
||||
context: {
|
||||
trigger: payload,
|
||||
},
|
||||
output: {
|
||||
steps: {},
|
||||
status: WorkflowRunStatus.RUNNING,
|
||||
},
|
||||
});
|
||||
|
||||
await this.workflowRunWorkspaceService.endWorkflowRun(
|
||||
workflowRunId,
|
||||
status,
|
||||
{
|
||||
steps,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
private async throttleExecution(workflowId: string, workflowRunId: string) {
|
||||
try {
|
||||
await this.throttlerService.throttle(
|
||||
`${workflowId}-workflow-execution`,
|
||||
this.environmentService.get('WORKFLOW_EXEC_THROTTLE_LIMIT'),
|
||||
this.environmentService.get('WORKFLOW_EXEC_THROTTLE_TTL'),
|
||||
const workflowVersion =
|
||||
await this.workflowCommonWorkspaceService.getWorkflowVersionOrFail(
|
||||
workflowVersionId,
|
||||
);
|
||||
|
||||
await this.throttleExecution(workflowVersion.workflowId);
|
||||
|
||||
const { steps, status } =
|
||||
await this.workflowExecutorWorkspaceService.execute({
|
||||
currentStepIndex: 0,
|
||||
steps: workflowVersion.steps || [],
|
||||
context: {
|
||||
trigger: payload,
|
||||
},
|
||||
output: {
|
||||
steps: {},
|
||||
status: WorkflowRunStatus.RUNNING,
|
||||
},
|
||||
});
|
||||
|
||||
await this.workflowRunWorkspaceService.endWorkflowRun(
|
||||
workflowRunId,
|
||||
status,
|
||||
{
|
||||
steps,
|
||||
},
|
||||
);
|
||||
} catch (error) {
|
||||
await this.workflowRunWorkspaceService.endWorkflowRun(
|
||||
@ -81,9 +74,20 @@ export class RunWorkflowJob {
|
||||
WorkflowRunStatus.FAILED,
|
||||
{
|
||||
steps: {},
|
||||
error: 'Workflow execution rate limit exceeded',
|
||||
error: error.message,
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private async throttleExecution(workflowId: string) {
|
||||
try {
|
||||
await this.throttlerService.throttle(
|
||||
`${workflowId}-workflow-execution`,
|
||||
this.environmentService.get('WORKFLOW_EXEC_THROTTLE_LIMIT'),
|
||||
this.environmentService.get('WORKFLOW_EXEC_THROTTLE_TTL'),
|
||||
);
|
||||
} catch (error) {
|
||||
throw new WorkflowRunException(
|
||||
'Workflow execution rate limit exceeded',
|
||||
WorkflowRunExceptionCode.WORKFLOW_RUN_LIMIT_REACHED,
|
||||
|
||||
Reference in New Issue
Block a user