diff --git a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts index 1fa9a7f41..890db119e 100644 --- a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts +++ b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts @@ -415,6 +415,7 @@ export const WORKFLOW_RUN_STANDARD_FIELD_IDS = { endedAt: '20202020-e1c1-4b6b-bbbd-b2beaf2e159e', status: '20202020-6b3e-4f9c-8c2b-2e5b8e6d6f3b', createdBy: '20202020-6007-401a-8aa5-e6f38581a6f3', + output: '20202020-7be4-4db2-8ac6-3ff0d740843d', }; export const WORKFLOW_VERSION_STANDARD_FIELD_IDS = { diff --git a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts index c38cf4b5a..388d5f129 100644 --- a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts +++ b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts @@ -27,6 +27,17 @@ export enum WorkflowRunStatus { FAILED = 'FAILED', } +export type WorkflowRunOutput = { + steps: { + id: string; + name: string; + type: string; + attemptCount: number; + result: object | undefined; + error: string | undefined; + }[]; +}; + @WorkspaceEntity({ standardId: STANDARD_OBJECT_IDS.workflowRun, namePlural: 'workflowRuns', @@ -108,6 +119,15 @@ export class WorkflowRunWorkspaceEntity extends BaseWorkspaceEntity { }) createdBy: ActorMetadata; + @WorkspaceField({ + standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.output, + type: FieldMetadataType.RAW_JSON, + label: 'Output', + description: 'Json object to provide output of the workflow run', + }) + @WorkspaceIsNullable() + output: WorkflowRunOutput | null; + // Relations @WorkspaceRelation({ standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.workflowVersion, diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts index 593543047..c50684f87 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts @@ -1,17 +1,17 @@ import { Injectable } from '@nestjs/common'; -import { WorkflowStep } from 'src/modules/workflow/workflow-executor/types/workflow-action.type'; import { - WorkflowExecutorException, - WorkflowExecutorExceptionCode, -} from 'src/modules/workflow/workflow-executor/exceptions/workflow-executor.exception'; + WorkflowRunOutput, + WorkflowRunStatus, +} from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; import { WorkflowActionFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-action.factory'; +import { WorkflowStep } from 'src/modules/workflow/workflow-executor/types/workflow-action.type'; const MAX_RETRIES_ON_FAILURE = 3; -export type WorkflowExecutionOutput = { - result?: object; - error?: object; +export type WorkflowExecutorOutput = { + steps: WorkflowRunOutput['steps']; + status: WorkflowRunStatus; }; @Injectable() @@ -22,17 +22,17 @@ export class WorkflowExecutorWorkspaceService { currentStepIndex, steps, payload, + output, attemptCount = 1, }: { currentStepIndex: number; steps: WorkflowStep[]; + output: WorkflowExecutorOutput; payload?: object; attemptCount?: number; - }): Promise { + }): Promise { if (currentStepIndex >= steps.length) { - return { - result: payload, - }; + return { ...output, status: WorkflowRunStatus.COMPLETED }; } const step = steps[currentStepIndex]; @@ -44,19 +44,47 @@ export class WorkflowExecutorWorkspaceService { payload, }); + const baseStepOutput = { + id: step.id, + name: step.name, + type: step.type, + attemptCount, + }; + + const updatedOutput = { + ...output, + steps: [ + ...output.steps, + { + ...baseStepOutput, + result: result.result, + error: result.error?.errorMessage, + }, + ], + }; + if (result.result) { return await this.execute({ currentStepIndex: currentStepIndex + 1, steps, payload: result.result, + output: updatedOutput, }); } if (!result.error) { - throw new WorkflowExecutorException( - 'Execution result error, no data or error', - WorkflowExecutorExceptionCode.WORKFLOW_FAILED, - ); + return { + ...output, + steps: [ + ...output.steps, + { + ...baseStepOutput, + result: undefined, + error: 'Execution result error, no data or error', + }, + ], + status: WorkflowRunStatus.FAILED, + }; } if (step.settings.errorHandlingOptions.continueOnFailure.value) { @@ -64,6 +92,7 @@ export class WorkflowExecutorWorkspaceService { currentStepIndex: currentStepIndex + 1, steps, payload, + output: updatedOutput, }); } @@ -75,13 +104,11 @@ export class WorkflowExecutorWorkspaceService { currentStepIndex, steps, payload, + output: updatedOutput, attemptCount: attemptCount + 1, }); } - throw new WorkflowExecutorException( - `Workflow failed: ${result.error}`, - WorkflowExecutorExceptionCode.WORKFLOW_FAILED, - ); + return { ...updatedOutput, status: WorkflowRunStatus.FAILED }; } } diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts index 0ca2a3107..5a79462a3 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts @@ -3,8 +3,8 @@ import { Scope } from '@nestjs/common'; import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator'; import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; -import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workspace-services/workflow-common.workspace-service'; import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; +import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workspace-services/workflow-common.workspace-service'; import { WorkflowExecutorWorkspaceService } from 'src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service'; import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-run.workspace-service'; @@ -36,24 +36,23 @@ export class RunWorkflowJob { workflowVersionId, ); - try { + const { steps, status } = await this.workflowExecutorWorkspaceService.execute({ currentStepIndex: 0, steps: workflowVersion.steps || [], payload, + output: { + steps: [], + status: WorkflowRunStatus.RUNNING, + }, }); - await this.workflowRunWorkspaceService.endWorkflowRun( - workflowRunId, - WorkflowRunStatus.COMPLETED, - ); - } catch (error) { - await this.workflowRunWorkspaceService.endWorkflowRun( - workflowRunId, - WorkflowRunStatus.FAILED, - ); - - throw error; - } + await this.workflowRunWorkspaceService.endWorkflowRun( + workflowRunId, + status, + { + steps, + }, + ); } } diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workspace-services/workflow-run.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workspace-services/workflow-run.workspace-service.ts index 00162c595..2f3aca255 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workspace-services/workflow-run.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workspace-services/workflow-run.workspace-service.ts @@ -2,11 +2,12 @@ import { Injectable } from '@nestjs/common'; import { ActorMetadata } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; -import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workspace-services/workflow-common.workspace-service'; import { + WorkflowRunOutput, WorkflowRunStatus, WorkflowRunWorkspaceEntity, } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; +import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workspace-services/workflow-common.workspace-service'; import { WorkflowRunException, WorkflowRunExceptionCode, @@ -70,7 +71,11 @@ export class WorkflowRunWorkspaceService { }); } - async endWorkflowRun(workflowRunId: string, status: WorkflowRunStatus) { + async endWorkflowRun( + workflowRunId: string, + status: WorkflowRunStatus, + output: WorkflowRunOutput, + ) { const workflowRunRepository = await this.twentyORMManager.getRepository( 'workflowRun', @@ -96,6 +101,7 @@ export class WorkflowRunWorkspaceService { return workflowRunRepository.update(workflowRunToUpdate.id, { status, + output, endedAt: new Date().toISOString(), }); }