diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/mail-sender/exceptions/send-email-action.exception.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/mail-sender/exceptions/send-email-action.exception.ts index 18eb09486..346749425 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/mail-sender/exceptions/send-email-action.exception.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/mail-sender/exceptions/send-email-action.exception.ts @@ -10,4 +10,5 @@ export class SendEmailActionException extends CustomException { export enum SendEmailActionExceptionCode { PROVIDER_NOT_SUPPORTED = 'PROVIDER_NOT_SUPPORTED', CONNECTED_ACCOUNT_NOT_FOUND = 'CONNECTED_ACCOUNT_NOT_FOUND', + INVALID_EMAIL = 'INVALID_EMAIL', } diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/mail-sender/send-email.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/mail-sender/send-email.workflow-action.ts index 917d0766c..01003b30f 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/mail-sender/send-email.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/mail-sender/send-email.workflow-action.ts @@ -80,47 +80,44 @@ export class SendEmailWorkflowAction implements WorkflowAction { ); const { email, body, subject } = workflowActionInput; - try { - const emailSchema = z.string().trim().email('Invalid email'); + const emailSchema = z.string().trim().email('Invalid email'); - const result = emailSchema.safeParse(email); + const result = emailSchema.safeParse(email); - if (!result.success) { - this.logger.warn(`Email '${email}' invalid`); - - return { result: { success: false } }; - } - - const window = new JSDOM('').window; - const purify = DOMPurify(window); - const safeBody = purify.sanitize(body || ''); - const safeSubject = purify.sanitize(subject || ''); - - const message = [ - `To: ${email}`, - `Subject: ${safeSubject || ''}`, - 'MIME-Version: 1.0', - 'Content-Type: text/plain; charset="UTF-8"', - '', - safeBody, - ].join('\n'); - - const encodedMessage = Buffer.from(message).toString('base64'); - - await emailProvider.users.messages.send({ - userId: 'me', - requestBody: { - raw: encodedMessage, - }, - }); - - this.logger.log(`Email sent successfully`); - - return { - result: { success: true } satisfies WorkflowSendEmailStepOutputSchema, - }; - } catch (error) { - return { error }; + if (!result.success) { + throw new SendEmailActionException( + `Email '${email}' invalid`, + SendEmailActionExceptionCode.INVALID_EMAIL, + ); } + + const window = new JSDOM('').window; + const purify = DOMPurify(window); + const safeBody = purify.sanitize(body || ''); + const safeSubject = purify.sanitize(subject || ''); + + const message = [ + `To: ${email}`, + `Subject: ${safeSubject || ''}`, + 'MIME-Version: 1.0', + 'Content-Type: text/plain; charset="UTF-8"', + '', + safeBody, + ].join('\n'); + + const encodedMessage = Buffer.from(message).toString('base64'); + + await emailProvider.users.messages.send({ + userId: 'me', + requestBody: { + raw: encodedMessage, + }, + }); + + this.logger.log(`Email sent successfully`); + + return { + result: { success: true } satisfies WorkflowSendEmailStepOutputSchema, + }; } } diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts index a64f8bd6a..bc2655d7a 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts @@ -1,4 +1,4 @@ -import { Injectable } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; import { WorkflowRunOutput, @@ -6,6 +6,7 @@ import { } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; import { WorkflowActionFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-action.factory'; import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util'; +import { WorkflowActionResult } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-result.type'; import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; const MAX_RETRIES_ON_FAILURE = 3; @@ -17,6 +18,7 @@ export type WorkflowExecutorOutput = { @Injectable() export class WorkflowExecutorWorkspaceService { + private readonly logger = new Logger(WorkflowExecutorWorkspaceService.name); constructor(private readonly workflowActionFactory: WorkflowActionFactory) {} async execute({ @@ -42,7 +44,19 @@ export class WorkflowExecutorWorkspaceService { const actionPayload = resolveInput(step.settings.input, context); - const result = await workflowAction.execute(actionPayload); + let result: WorkflowActionResult; + + try { + result = await workflowAction.execute(actionPayload); + } catch (error) { + result = { + error: { + errorType: error.name, + errorMessage: error.message, + stackTrace: error.stack, + }, + }; + } const stepOutput = output.steps[step.id]; diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts index f6069bf3f..cb8fd1f52 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts @@ -1,4 +1,4 @@ -import { Scope } from '@nestjs/common'; +import { Logger, Scope } from '@nestjs/common'; import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service'; import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator'; @@ -23,6 +23,7 @@ export type RunWorkflowJobData = { @Processor({ queueName: MessageQueue.workflowQueue, scope: Scope.REQUEST }) export class RunWorkflowJob { + private readonly logger = new Logger(RunWorkflowJob.name); constructor( private readonly workflowCommonWorkspaceService: WorkflowCommonWorkspaceService, private readonly workflowExecutorWorkspaceService: WorkflowExecutorWorkspaceService, @@ -39,41 +40,33 @@ export class RunWorkflowJob { }: RunWorkflowJobData): Promise { await this.workflowRunWorkspaceService.startWorkflowRun(workflowRunId); - const workflowVersion = - await this.workflowCommonWorkspaceService.getWorkflowVersionOrFail( - workflowVersionId, - ); - - await this.throttleExecution(workflowVersion.workflowId, workflowRunId); - - const { steps, status } = - await this.workflowExecutorWorkspaceService.execute({ - currentStepIndex: 0, - steps: workflowVersion.steps || [], - context: { - trigger: payload, - }, - output: { - steps: {}, - status: WorkflowRunStatus.RUNNING, - }, - }); - - await this.workflowRunWorkspaceService.endWorkflowRun( - workflowRunId, - status, - { - steps, - }, - ); - } - - private async throttleExecution(workflowId: string, workflowRunId: string) { try { - await this.throttlerService.throttle( - `${workflowId}-workflow-execution`, - this.environmentService.get('WORKFLOW_EXEC_THROTTLE_LIMIT'), - this.environmentService.get('WORKFLOW_EXEC_THROTTLE_TTL'), + const workflowVersion = + await this.workflowCommonWorkspaceService.getWorkflowVersionOrFail( + workflowVersionId, + ); + + await this.throttleExecution(workflowVersion.workflowId); + + const { steps, status } = + await this.workflowExecutorWorkspaceService.execute({ + currentStepIndex: 0, + steps: workflowVersion.steps || [], + context: { + trigger: payload, + }, + output: { + steps: {}, + status: WorkflowRunStatus.RUNNING, + }, + }); + + await this.workflowRunWorkspaceService.endWorkflowRun( + workflowRunId, + status, + { + steps, + }, ); } catch (error) { await this.workflowRunWorkspaceService.endWorkflowRun( @@ -81,9 +74,20 @@ export class RunWorkflowJob { WorkflowRunStatus.FAILED, { steps: {}, - error: 'Workflow execution rate limit exceeded', + error: error.message, }, ); + } + } + + private async throttleExecution(workflowId: string) { + try { + await this.throttlerService.throttle( + `${workflowId}-workflow-execution`, + this.environmentService.get('WORKFLOW_EXEC_THROTTLE_LIMIT'), + this.environmentService.get('WORKFLOW_EXEC_THROTTLE_TTL'), + ); + } catch (error) { throw new WorkflowRunException( 'Workflow execution rate limit exceeded', WorkflowRunExceptionCode.WORKFLOW_RUN_LIMIT_REACHED,