Add output to workflow run (#7276)

Example of output stored for following workflow:

<img width="244" alt="Capture d’écran 2024-09-27 à 11 18 06"
src="https://github.com/user-attachments/assets/722bfa96-2dd1-41f7-ab87-d39584ac9efc">

Output:

```
{"steps": [
  {"type": "CODE", "result": {"email": "test@twenty.com"}}, 
  {"type": "SEND_EMAIL", "result": {"success": true}}
]}
```
This commit is contained in:
Thomas Trompette
2024-09-30 18:45:44 +02:00
committed by GitHub
parent 06d4ba92e5
commit ca027d6772
5 changed files with 88 additions and 35 deletions

View File

@ -415,6 +415,7 @@ export const WORKFLOW_RUN_STANDARD_FIELD_IDS = {
endedAt: '20202020-e1c1-4b6b-bbbd-b2beaf2e159e', endedAt: '20202020-e1c1-4b6b-bbbd-b2beaf2e159e',
status: '20202020-6b3e-4f9c-8c2b-2e5b8e6d6f3b', status: '20202020-6b3e-4f9c-8c2b-2e5b8e6d6f3b',
createdBy: '20202020-6007-401a-8aa5-e6f38581a6f3', createdBy: '20202020-6007-401a-8aa5-e6f38581a6f3',
output: '20202020-7be4-4db2-8ac6-3ff0d740843d',
}; };
export const WORKFLOW_VERSION_STANDARD_FIELD_IDS = { export const WORKFLOW_VERSION_STANDARD_FIELD_IDS = {

View File

@ -27,6 +27,17 @@ export enum WorkflowRunStatus {
FAILED = 'FAILED', FAILED = 'FAILED',
} }
export type WorkflowRunOutput = {
steps: {
id: string;
name: string;
type: string;
attemptCount: number;
result: object | undefined;
error: string | undefined;
}[];
};
@WorkspaceEntity({ @WorkspaceEntity({
standardId: STANDARD_OBJECT_IDS.workflowRun, standardId: STANDARD_OBJECT_IDS.workflowRun,
namePlural: 'workflowRuns', namePlural: 'workflowRuns',
@ -108,6 +119,15 @@ export class WorkflowRunWorkspaceEntity extends BaseWorkspaceEntity {
}) })
createdBy: ActorMetadata; createdBy: ActorMetadata;
@WorkspaceField({
standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.output,
type: FieldMetadataType.RAW_JSON,
label: 'Output',
description: 'Json object to provide output of the workflow run',
})
@WorkspaceIsNullable()
output: WorkflowRunOutput | null;
// Relations // Relations
@WorkspaceRelation({ @WorkspaceRelation({
standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.workflowVersion, standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.workflowVersion,

View File

@ -1,17 +1,17 @@
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { WorkflowStep } from 'src/modules/workflow/workflow-executor/types/workflow-action.type';
import { import {
WorkflowExecutorException, WorkflowRunOutput,
WorkflowExecutorExceptionCode, WorkflowRunStatus,
} from 'src/modules/workflow/workflow-executor/exceptions/workflow-executor.exception'; } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import { WorkflowActionFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-action.factory'; import { WorkflowActionFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-action.factory';
import { WorkflowStep } from 'src/modules/workflow/workflow-executor/types/workflow-action.type';
const MAX_RETRIES_ON_FAILURE = 3; const MAX_RETRIES_ON_FAILURE = 3;
export type WorkflowExecutionOutput = { export type WorkflowExecutorOutput = {
result?: object; steps: WorkflowRunOutput['steps'];
error?: object; status: WorkflowRunStatus;
}; };
@Injectable() @Injectable()
@ -22,17 +22,17 @@ export class WorkflowExecutorWorkspaceService {
currentStepIndex, currentStepIndex,
steps, steps,
payload, payload,
output,
attemptCount = 1, attemptCount = 1,
}: { }: {
currentStepIndex: number; currentStepIndex: number;
steps: WorkflowStep[]; steps: WorkflowStep[];
output: WorkflowExecutorOutput;
payload?: object; payload?: object;
attemptCount?: number; attemptCount?: number;
}): Promise<WorkflowExecutionOutput> { }): Promise<WorkflowExecutorOutput> {
if (currentStepIndex >= steps.length) { if (currentStepIndex >= steps.length) {
return { return { ...output, status: WorkflowRunStatus.COMPLETED };
result: payload,
};
} }
const step = steps[currentStepIndex]; const step = steps[currentStepIndex];
@ -44,19 +44,47 @@ export class WorkflowExecutorWorkspaceService {
payload, payload,
}); });
const baseStepOutput = {
id: step.id,
name: step.name,
type: step.type,
attemptCount,
};
const updatedOutput = {
...output,
steps: [
...output.steps,
{
...baseStepOutput,
result: result.result,
error: result.error?.errorMessage,
},
],
};
if (result.result) { if (result.result) {
return await this.execute({ return await this.execute({
currentStepIndex: currentStepIndex + 1, currentStepIndex: currentStepIndex + 1,
steps, steps,
payload: result.result, payload: result.result,
output: updatedOutput,
}); });
} }
if (!result.error) { if (!result.error) {
throw new WorkflowExecutorException( return {
'Execution result error, no data or error', ...output,
WorkflowExecutorExceptionCode.WORKFLOW_FAILED, steps: [
); ...output.steps,
{
...baseStepOutput,
result: undefined,
error: 'Execution result error, no data or error',
},
],
status: WorkflowRunStatus.FAILED,
};
} }
if (step.settings.errorHandlingOptions.continueOnFailure.value) { if (step.settings.errorHandlingOptions.continueOnFailure.value) {
@ -64,6 +92,7 @@ export class WorkflowExecutorWorkspaceService {
currentStepIndex: currentStepIndex + 1, currentStepIndex: currentStepIndex + 1,
steps, steps,
payload, payload,
output: updatedOutput,
}); });
} }
@ -75,13 +104,11 @@ export class WorkflowExecutorWorkspaceService {
currentStepIndex, currentStepIndex,
steps, steps,
payload, payload,
output: updatedOutput,
attemptCount: attemptCount + 1, attemptCount: attemptCount + 1,
}); });
} }
throw new WorkflowExecutorException( return { ...updatedOutput, status: WorkflowRunStatus.FAILED };
`Workflow failed: ${result.error}`,
WorkflowExecutorExceptionCode.WORKFLOW_FAILED,
);
} }
} }

View File

@ -3,8 +3,8 @@ import { Scope } from '@nestjs/common';
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator'; import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator'; import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workspace-services/workflow-common.workspace-service';
import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workspace-services/workflow-common.workspace-service';
import { WorkflowExecutorWorkspaceService } from 'src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service'; import { WorkflowExecutorWorkspaceService } from 'src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service';
import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-run.workspace-service'; import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-run.workspace-service';
@ -36,24 +36,23 @@ export class RunWorkflowJob {
workflowVersionId, workflowVersionId,
); );
try { const { steps, status } =
await this.workflowExecutorWorkspaceService.execute({ await this.workflowExecutorWorkspaceService.execute({
currentStepIndex: 0, currentStepIndex: 0,
steps: workflowVersion.steps || [], steps: workflowVersion.steps || [],
payload, payload,
output: {
steps: [],
status: WorkflowRunStatus.RUNNING,
},
}); });
await this.workflowRunWorkspaceService.endWorkflowRun( await this.workflowRunWorkspaceService.endWorkflowRun(
workflowRunId, workflowRunId,
WorkflowRunStatus.COMPLETED, status,
); {
} catch (error) { steps,
await this.workflowRunWorkspaceService.endWorkflowRun( },
workflowRunId, );
WorkflowRunStatus.FAILED,
);
throw error;
}
} }
} }

View File

@ -2,11 +2,12 @@ import { Injectable } from '@nestjs/common';
import { ActorMetadata } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type'; import { ActorMetadata } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workspace-services/workflow-common.workspace-service';
import { import {
WorkflowRunOutput,
WorkflowRunStatus, WorkflowRunStatus,
WorkflowRunWorkspaceEntity, WorkflowRunWorkspaceEntity,
} from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workspace-services/workflow-common.workspace-service';
import { import {
WorkflowRunException, WorkflowRunException,
WorkflowRunExceptionCode, WorkflowRunExceptionCode,
@ -70,7 +71,11 @@ export class WorkflowRunWorkspaceService {
}); });
} }
async endWorkflowRun(workflowRunId: string, status: WorkflowRunStatus) { async endWorkflowRun(
workflowRunId: string,
status: WorkflowRunStatus,
output: WorkflowRunOutput,
) {
const workflowRunRepository = const workflowRunRepository =
await this.twentyORMManager.getRepository<WorkflowRunWorkspaceEntity>( await this.twentyORMManager.getRepository<WorkflowRunWorkspaceEntity>(
'workflowRun', 'workflowRun',
@ -96,6 +101,7 @@ export class WorkflowRunWorkspaceService {
return workflowRunRepository.update(workflowRunToUpdate.id, { return workflowRunRepository.update(workflowRunToUpdate.id, {
status, status,
output,
endedAt: new Date().toISOString(), endedAt: new Date().toISOString(),
}); });
} }