Migrate workflow actions to executors (#10432)

Actions will now:
- receive the complete input
- get the step they want to execute by themself
- check that the type is the right one
- resolve variables

These all share a common executor interface.

It will allow for actions with a special execution process (forms, loop,
router) to have all required informations.

Main workflow executor should:
- find the right executor to call for current step
- store the output and context from step execution
- call next step index
This commit is contained in:
Thomas Trompette
2025-02-24 14:36:24 +01:00
committed by GitHub
parent 1f2c5c5fa5
commit 446924cf24
24 changed files with 320 additions and 155 deletions

View File

@ -22,6 +22,7 @@ import { FavoriteWorkspaceEntity } from 'src/modules/favorite/standard-objects/f
import { TimelineActivityWorkspaceEntity } from 'src/modules/timeline/standard-objects/timeline-activity.workspace-entity';
import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity';
import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity';
import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
import { WorkflowTrigger } from 'src/modules/workflow/workflow-trigger/types/workflow-trigger.type';
@ -32,13 +33,9 @@ export enum WorkflowRunStatus {
FAILED = 'FAILED',
}
type StepRunOutput = {
export type StepOutput = {
id: string;
outputs: {
attemptCount: number;
result: object | undefined;
error: string | undefined;
}[];
output: WorkflowExecutorOutput;
};
export type WorkflowRunOutput = {
@ -46,7 +43,7 @@ export type WorkflowRunOutput = {
trigger: WorkflowTrigger;
steps: WorkflowAction[];
};
stepsOutput?: Record<string, StepRunOutput>;
stepsOutput?: Record<string, WorkflowExecutorOutput>;
error?: string;
};

View File

@ -1,6 +1,6 @@
import { Injectable } from '@nestjs/common';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface';
import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface';
import {
WorkflowStepExecutorException,
@ -10,22 +10,22 @@ import { CodeWorkflowAction } from 'src/modules/workflow/workflow-executor/workf
import { SendEmailWorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/mail-sender/send-email.workflow-action';
import { CreateRecordWorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/create-record.workflow-action';
import { DeleteRecordWorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/delete-record.workflow-action';
import { FindRecordsWorflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/find-records.workflow-action';
import { FindRecordsWorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/find-records.workflow-action';
import { UpdateRecordWorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/update-record.workflow-action';
import { WorkflowActionType } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
@Injectable()
export class WorkflowActionFactory {
export class WorkflowExecutorFactory {
constructor(
private readonly codeWorkflowAction: CodeWorkflowAction,
private readonly sendEmailWorkflowAction: SendEmailWorkflowAction,
private readonly createRecordWorkflowAction: CreateRecordWorkflowAction,
private readonly updateRecordWorkflowAction: UpdateRecordWorkflowAction,
private readonly deleteRecordWorkflowAction: DeleteRecordWorkflowAction,
private readonly findRecordsWorflowAction: FindRecordsWorflowAction,
private readonly findRecordsWorkflowAction: FindRecordsWorkflowAction,
) {}
get(stepType: WorkflowActionType): WorkflowAction {
get(stepType: WorkflowActionType): WorkflowExecutor {
switch (stepType) {
case WorkflowActionType.CODE:
return this.codeWorkflowAction;
@ -38,7 +38,7 @@ export class WorkflowActionFactory {
case WorkflowActionType.DELETE_RECORD:
return this.deleteRecordWorkflowAction;
case WorkflowActionType.FIND_RECORDS:
return this.findRecordsWorflowAction;
return this.findRecordsWorkflowAction;
default:
throw new WorkflowStepExecutorException(
`Workflow step executor not found for step type '${stepType}'`,

View File

@ -1,5 +0,0 @@
import { WorkflowActionResult } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-result.type';
export interface WorkflowAction {
execute(workflowStepInput: unknown): Promise<WorkflowActionResult>;
}

View File

@ -0,0 +1,8 @@
import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input';
import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type';
export interface WorkflowExecutor {
execute(
workflowExecutorInput: WorkflowExecutorInput,
): Promise<WorkflowExecutorOutput>;
}

View File

@ -0,0 +1,9 @@
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
export type WorkflowExecutorInput = {
currentStepIndex: number;
steps: WorkflowAction[];
context: Record<string, unknown>;
workflowRunId: string;
attemptCount?: number;
};

View File

@ -0,0 +1,4 @@
export type WorkflowExecutorOutput = {
result?: object;
error?: string;
};

View File

@ -1,6 +1,6 @@
import { Injectable } from '@nestjs/common';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface';
import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface';
import { ServerlessFunctionService } from 'src/engine/metadata-modules/serverless-function/serverless-function.service';
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
@ -8,19 +8,38 @@ import {
WorkflowStepExecutorException,
WorkflowStepExecutorExceptionCode,
} from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception';
import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input';
import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type';
import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util';
import { isWorkflowCodeAction } from 'src/modules/workflow/workflow-executor/workflow-actions/code/guards/is-workflow-code-action.guard';
import { WorkflowCodeActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/code/types/workflow-code-action-input.type';
import { WorkflowActionResult } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-result.type';
@Injectable()
export class CodeWorkflowAction implements WorkflowAction {
export class CodeWorkflowAction implements WorkflowExecutor {
constructor(
private readonly serverlessFunctionService: ServerlessFunctionService,
private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory,
) {}
async execute(
workflowActionInput: WorkflowCodeActionInput,
): Promise<WorkflowActionResult> {
async execute({
currentStepIndex,
steps,
context,
}: WorkflowExecutorInput): Promise<WorkflowExecutorOutput> {
const step = steps[currentStepIndex];
if (!isWorkflowCodeAction(step)) {
throw new WorkflowStepExecutorException(
'Step is not a code action',
WorkflowStepExecutorExceptionCode.INVALID_STEP_TYPE,
);
}
const workflowActionInput = resolveInput(
step.settings.input,
context,
) as WorkflowCodeActionInput;
try {
const { workspaceId } = this.scopedWorkspaceContextFactory.create();
@ -40,7 +59,7 @@ export class CodeWorkflowAction implements WorkflowAction {
);
if (result.error) {
return { error: result.error };
return { error: result.error.errorMessage };
}
return { result: result.data || {} };

View File

@ -0,0 +1,11 @@
import {
WorkflowAction,
WorkflowActionType,
WorkflowCodeAction,
} from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
export const isWorkflowCodeAction = (
action: WorkflowAction,
): action is WorkflowCodeAction => {
return action.type === WorkflowActionType.CODE;
};

View File

@ -0,0 +1,11 @@
import {
WorkflowAction,
WorkflowActionType,
WorkflowSendEmailAction,
} from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
export const isWorkflowSendEmailAction = (
action: WorkflowAction,
): action is WorkflowSendEmailAction => {
return action.type === WorkflowActionType.SEND_EMAIL;
};

View File

@ -5,7 +5,7 @@ import { JSDOM } from 'jsdom';
import { isDefined, isValidUuid } from 'twenty-shared';
import { z } from 'zod';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface';
import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface';
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
@ -15,19 +15,22 @@ import {
WorkflowStepExecutorException,
WorkflowStepExecutorExceptionCode,
} from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception';
import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input';
import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type';
import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util';
import {
SendEmailActionException,
SendEmailActionExceptionCode,
} from 'src/modules/workflow/workflow-executor/workflow-actions/mail-sender/exceptions/send-email-action.exception';
import { isWorkflowSendEmailAction } from 'src/modules/workflow/workflow-executor/workflow-actions/mail-sender/guards/is-workflow-send-email-action.guard';
import { WorkflowSendEmailActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/mail-sender/types/workflow-send-email-action-input.type';
import { WorkflowActionResult } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-result.type';
export type WorkflowSendEmailStepOutputSchema = {
success: boolean;
};
@Injectable()
export class SendEmailWorkflowAction implements WorkflowAction {
export class SendEmailWorkflowAction implements WorkflowExecutor {
private readonly logger = new Logger(SendEmailWorkflowAction.name);
constructor(
private readonly gmailClientProvider: GmailClientProvider,
@ -79,12 +82,29 @@ export class SendEmailWorkflowAction implements WorkflowAction {
}
}
async execute(
workflowActionInput: WorkflowSendEmailActionInput,
): Promise<WorkflowActionResult> {
async execute({
currentStepIndex,
steps,
context,
}: WorkflowExecutorInput): Promise<WorkflowExecutorOutput> {
const step = steps[currentStepIndex];
if (!isWorkflowSendEmailAction(step)) {
throw new WorkflowStepExecutorException(
'Step is not a send email action',
WorkflowStepExecutorExceptionCode.INVALID_STEP_TYPE,
);
}
const emailProvider = await this.getEmailClient(
workflowActionInput.connectedAccountId,
step.settings.input.connectedAccountId,
);
const workflowActionInput = resolveInput(
step.settings.input,
context,
) as WorkflowSendEmailActionInput;
const { email, body, subject } = workflowActionInput;
const emailSchema = z.string().trim().email('Invalid email');

View File

@ -3,7 +3,7 @@ import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface';
import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
import { FieldActorSource } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type';
@ -11,15 +11,22 @@ import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadat
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter';
import {
WorkflowStepExecutorException,
WorkflowStepExecutorExceptionCode,
} from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception';
import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input';
import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type';
import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util';
import {
RecordCRUDActionException,
RecordCRUDActionExceptionCode,
} from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/exceptions/record-crud-action.exception';
import { isWorkflowCreateRecordAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/guards/is-workflow-create-record-action.guard';
import { WorkflowCreateRecordActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/types/workflow-record-crud-action-input.type';
import { WorkflowActionResult } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-result.type';
@Injectable()
export class CreateRecordWorkflowAction implements WorkflowAction {
export class CreateRecordWorkflowAction implements WorkflowExecutor {
constructor(
private readonly twentyORMManager: TwentyORMManager,
@InjectRepository(ObjectMetadataEntity, 'metadata')
@ -28,12 +35,19 @@ export class CreateRecordWorkflowAction implements WorkflowAction {
private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory,
) {}
async execute(
workflowActionInput: WorkflowCreateRecordActionInput,
): Promise<WorkflowActionResult> {
const repository = await this.twentyORMManager.getRepository(
workflowActionInput.objectName,
);
async execute({
currentStepIndex,
steps,
context,
}: WorkflowExecutorInput): Promise<WorkflowExecutorOutput> {
const step = steps[currentStepIndex];
if (!isWorkflowCreateRecordAction(step)) {
throw new WorkflowStepExecutorException(
'Step is not a create record action',
WorkflowStepExecutorExceptionCode.INVALID_STEP_TYPE,
);
}
const workspaceId = this.scopedWorkspaceContextFactory.create().workspaceId;
@ -44,6 +58,15 @@ export class CreateRecordWorkflowAction implements WorkflowAction {
);
}
const workflowActionInput = resolveInput(
step.settings.input,
context,
) as WorkflowCreateRecordActionInput;
const repository = await this.twentyORMManager.getRepository(
workflowActionInput.objectName,
);
const objectMetadata = await this.objectMetadataRepository.findOne({
where: {
nameSingular: workflowActionInput.objectName,

View File

@ -3,22 +3,29 @@ import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface';
import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter';
import {
WorkflowStepExecutorException,
WorkflowStepExecutorExceptionCode,
} from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception';
import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input';
import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type';
import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util';
import {
RecordCRUDActionException,
RecordCRUDActionExceptionCode,
} from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/exceptions/record-crud-action.exception';
import { isWorkflowDeleteRecordAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/guards/is-workflow-delete-record-action.guard';
import { WorkflowDeleteRecordActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/types/workflow-record-crud-action-input.type';
import { WorkflowActionResult } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-result.type';
@Injectable()
export class DeleteRecordWorkflowAction implements WorkflowAction {
export class DeleteRecordWorkflowAction implements WorkflowExecutor {
constructor(
private readonly twentyORMManager: TwentyORMManager,
@InjectRepository(ObjectMetadataEntity, 'metadata')
@ -27,9 +34,25 @@ export class DeleteRecordWorkflowAction implements WorkflowAction {
private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory,
) {}
async execute(
workflowActionInput: WorkflowDeleteRecordActionInput,
): Promise<WorkflowActionResult> {
async execute({
currentStepIndex,
steps,
context,
}: WorkflowExecutorInput): Promise<WorkflowExecutorOutput> {
const step = steps[currentStepIndex];
if (!isWorkflowDeleteRecordAction(step)) {
throw new WorkflowStepExecutorException(
'Step is not a delete record action',
WorkflowStepExecutorExceptionCode.INVALID_STEP_TYPE,
);
}
const workflowActionInput = resolveInput(
step.settings.input,
context,
) as WorkflowDeleteRecordActionInput;
const repository = await this.twentyORMManager.getRepository(
workflowActionInput.objectName,
);

View File

@ -8,7 +8,7 @@ import {
ObjectRecordOrderBy,
OrderByDirection,
} from 'src/engine/api/graphql/workspace-query-builder/interfaces/object-record.interface';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface';
import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface';
import { QUERY_MAX_RECORDS } from 'src/engine/api/graphql/graphql-query-runner/constants/query-max-records.constant';
import { GraphqlQueryParser } from 'src/engine/api/graphql/graphql-query-runner/graphql-query-parsers/graphql-query.parser';
@ -21,15 +21,22 @@ import { WorkspaceRepository } from 'src/engine/twenty-orm/repository/workspace.
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { formatResult } from 'src/engine/twenty-orm/utils/format-result.util';
import { WorkspaceCacheStorageService } from 'src/engine/workspace-cache-storage/workspace-cache-storage.service';
import {
WorkflowStepExecutorException,
WorkflowStepExecutorExceptionCode,
} from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception';
import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input';
import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type';
import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util';
import {
RecordCRUDActionException,
RecordCRUDActionExceptionCode,
} from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/exceptions/record-crud-action.exception';
import { isWorkflowFindRecordsAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/guards/is-workflow-find-records-action.guard';
import { WorkflowFindRecordsActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/types/workflow-record-crud-action-input.type';
import { WorkflowActionResult } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-result.type';
@Injectable()
export class FindRecordsWorflowAction implements WorkflowAction {
export class FindRecordsWorkflowAction implements WorkflowExecutor {
constructor(
private readonly twentyORMManager: TwentyORMManager,
private readonly workspaceCacheStorageService: WorkspaceCacheStorageService,
@ -37,12 +44,29 @@ export class FindRecordsWorflowAction implements WorkflowAction {
private readonly featureFlagService: FeatureFlagService,
) {}
async execute(
workflowActionInput: WorkflowFindRecordsActionInput,
): Promise<WorkflowActionResult> {
async execute({
currentStepIndex,
steps,
context,
}: WorkflowExecutorInput): Promise<WorkflowExecutorOutput> {
const step = steps[currentStepIndex];
if (!isWorkflowFindRecordsAction(step)) {
throw new WorkflowStepExecutorException(
'Step is not a find records action',
WorkflowStepExecutorExceptionCode.INVALID_STEP_TYPE,
);
}
const workflowActionInput = resolveInput(
step.settings.input,
context,
) as WorkflowFindRecordsActionInput;
const repository = await this.twentyORMManager.getRepository(
workflowActionInput.objectName,
);
const workspaceId = this.scopedWorkspaceContextFactory.create().workspaceId;
if (!workspaceId) {

View File

@ -0,0 +1,11 @@
import {
WorkflowAction,
WorkflowActionType,
WorkflowCreateRecordAction,
} from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
export const isWorkflowCreateRecordAction = (
action: WorkflowAction,
): action is WorkflowCreateRecordAction => {
return action.type === WorkflowActionType.CREATE_RECORD;
};

View File

@ -0,0 +1,11 @@
import {
WorkflowAction,
WorkflowActionType,
WorkflowDeleteRecordAction,
} from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
export const isWorkflowDeleteRecordAction = (
action: WorkflowAction,
): action is WorkflowDeleteRecordAction => {
return action.type === WorkflowActionType.DELETE_RECORD;
};

View File

@ -0,0 +1,11 @@
import {
WorkflowAction,
WorkflowActionType,
WorkflowFindRecordsAction,
} from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
export const isWorkflowFindRecordsAction = (
action: WorkflowAction,
): action is WorkflowFindRecordsAction => {
return action.type === WorkflowActionType.FIND_RECORDS;
};

View File

@ -0,0 +1,11 @@
import {
WorkflowAction,
WorkflowActionType,
WorkflowUpdateRecordAction,
} from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
export const isWorkflowUpdateRecordAction = (
action: WorkflowAction,
): action is WorkflowUpdateRecordAction => {
return action.type === WorkflowActionType.UPDATE_RECORD;
};

View File

@ -8,7 +8,7 @@ import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/s
import { WorkspaceCacheStorageModule } from 'src/engine/workspace-cache-storage/workspace-cache-storage.module';
import { CreateRecordWorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/create-record.workflow-action';
import { DeleteRecordWorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/delete-record.workflow-action';
import { FindRecordsWorflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/find-records.workflow-action';
import { FindRecordsWorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/find-records.workflow-action';
import { UpdateRecordWorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/update-record.workflow-action';
@Module({
@ -22,13 +22,13 @@ import { UpdateRecordWorkflowAction } from 'src/modules/workflow/workflow-execut
CreateRecordWorkflowAction,
UpdateRecordWorkflowAction,
DeleteRecordWorkflowAction,
FindRecordsWorflowAction,
FindRecordsWorkflowAction,
],
exports: [
CreateRecordWorkflowAction,
UpdateRecordWorkflowAction,
DeleteRecordWorkflowAction,
FindRecordsWorflowAction,
FindRecordsWorkflowAction,
],
})
export class RecordCRUDActionModule {}

View File

@ -3,7 +3,7 @@ import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface';
import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
@ -13,15 +13,22 @@ import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { formatData } from 'src/engine/twenty-orm/utils/format-data.util';
import { WorkspaceCacheStorageService } from 'src/engine/workspace-cache-storage/workspace-cache-storage.service';
import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter';
import {
WorkflowStepExecutorException,
WorkflowStepExecutorExceptionCode,
} from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception';
import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input';
import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type';
import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util';
import {
RecordCRUDActionException,
RecordCRUDActionExceptionCode,
} from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/exceptions/record-crud-action.exception';
import { isWorkflowUpdateRecordAction } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/guards/is-workflow-update-record-action.guard';
import { WorkflowUpdateRecordActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/types/workflow-record-crud-action-input.type';
import { WorkflowActionResult } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-result.type';
@Injectable()
export class UpdateRecordWorkflowAction implements WorkflowAction {
export class UpdateRecordWorkflowAction implements WorkflowExecutor {
constructor(
private readonly twentyORMManager: TwentyORMManager,
private readonly workspaceCacheStorageService: WorkspaceCacheStorageService,
@ -31,9 +38,25 @@ export class UpdateRecordWorkflowAction implements WorkflowAction {
private readonly workspaceEventEmitter: WorkspaceEventEmitter,
) {}
async execute(
workflowActionInput: WorkflowUpdateRecordActionInput,
): Promise<WorkflowActionResult> {
async execute({
currentStepIndex,
steps,
context,
}: WorkflowExecutorInput): Promise<WorkflowExecutorOutput> {
const step = steps[currentStepIndex];
if (!isWorkflowUpdateRecordAction(step)) {
throw new WorkflowStepExecutorException(
'Step is not an update record action',
WorkflowStepExecutorExceptionCode.INVALID_STEP_TYPE,
);
}
const workflowActionInput = resolveInput(
step.settings.input,
context,
) as WorkflowUpdateRecordActionInput;
const repository = await this.twentyORMManager.getRepository(
workflowActionInput.objectName,
);

View File

@ -1,10 +0,0 @@
type WorkflowActionError = {
errorType: string;
errorMessage: string;
stackTrace: string;
};
export type WorkflowActionResult = {
result?: object;
error?: WorkflowActionError;
};

View File

@ -2,7 +2,7 @@ import { Module } from '@nestjs/common';
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module';
import { WorkflowActionFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-action.factory';
import { WorkflowExecutorFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-executor.factory';
import { CodeActionModule } from 'src/modules/workflow/workflow-executor/workflow-actions/code/code-action.module';
import { SendEmailActionModule } from 'src/modules/workflow/workflow-executor/workflow-actions/mail-sender/send-email-action.module';
import { RecordCRUDActionModule } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/record-crud-action.module';
@ -20,7 +20,7 @@ import { WorkflowRunModule } from 'src/modules/workflow/workflow-runner/workflow
providers: [
WorkflowExecutorWorkspaceService,
ScopedWorkspaceContextFactory,
WorkflowActionFactory,
WorkflowExecutorFactory,
],
exports: [WorkflowExecutorWorkspaceService],
})

View File

@ -1,18 +1,20 @@
import { Injectable } from '@nestjs/common';
import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface';
import { BILLING_FEATURE_USED } from 'src/engine/core-modules/billing/constants/billing-feature-used.constant';
import { BillingMeterEventName } from 'src/engine/core-modules/billing/enums/billing-meter-event-names';
import { BillingUsageEvent } from 'src/engine/core-modules/billing/types/billing-usage-event.type';
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter';
import {
StepOutput,
WorkflowRunOutput,
WorkflowRunStatus,
} from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import { WorkflowActionFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-action.factory';
import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util';
import { WorkflowActionResult } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-result.type';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
import { WorkflowExecutorFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-executor.factory';
import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input';
import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type';
import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service';
const MAX_RETRIES_ON_FAILURE = 3;
@ -23,9 +25,9 @@ export type WorkflowExecutorState = {
};
@Injectable()
export class WorkflowExecutorWorkspaceService {
export class WorkflowExecutorWorkspaceService implements WorkflowExecutor {
constructor(
private readonly workflowActionFactory: WorkflowActionFactory,
private readonly workflowExecutorFactory: WorkflowExecutorFactory,
private readonly workspaceEventEmitter: WorkspaceEventEmitter,
private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory,
private readonly workflowRunWorkspaceService: WorkflowRunWorkspaceService,
@ -35,84 +37,55 @@ export class WorkflowExecutorWorkspaceService {
currentStepIndex,
steps,
context,
workflowExecutorState,
attemptCount = 1,
workflowRunId,
}: {
currentStepIndex: number;
steps: WorkflowAction[];
workflowExecutorState: WorkflowExecutorState;
context: Record<string, unknown>;
attemptCount?: number;
workflowRunId: string;
}): Promise<WorkflowExecutorState> {
}: WorkflowExecutorInput): Promise<WorkflowExecutorOutput> {
if (currentStepIndex >= steps.length) {
return { ...workflowExecutorState, status: WorkflowRunStatus.COMPLETED };
return {
result: {
success: true,
},
};
}
const step = steps[currentStepIndex];
const workflowAction = this.workflowActionFactory.get(step.type);
const workflowExecutor = this.workflowExecutorFactory.get(step.type);
const actionPayload = resolveInput(step.settings.input, context);
let result: WorkflowActionResult;
let actionOutput: WorkflowExecutorOutput;
try {
result = await workflowAction.execute(actionPayload);
actionOutput = await workflowExecutor.execute({
currentStepIndex,
steps,
context,
attemptCount,
workflowRunId,
});
} catch (error) {
result = {
error: {
errorType: error.name,
errorMessage: error.message,
stackTrace: error.stack,
},
actionOutput = {
error: error.message ?? 'Execution result error, no data or error',
};
}
const stepOutput = workflowExecutorState.stepsOutput?.[step.id];
const error =
result.error?.errorMessage ??
(result.result ? undefined : 'Execution result error, no data or error');
if (!error) {
if (!actionOutput.error) {
this.sendWorkflowNodeRunEvent();
}
const updatedStepOutput = {
const stepOutput: StepOutput = {
id: step.id,
outputs: [
...(stepOutput?.outputs ?? []),
{
attemptCount,
result: result.result,
error,
},
],
output: actionOutput,
};
const updatedStepsOutput = {
...workflowExecutorState.stepsOutput,
[step.id]: updatedStepOutput,
};
const updatedWorkflowExecutorState = {
...workflowExecutorState,
stepsOutput: updatedStepsOutput,
};
if (result.result) {
if (actionOutput.result) {
const updatedContext = {
...context,
[step.id]: result.result,
[step.id]: actionOutput.result,
};
await this.workflowRunWorkspaceService.saveWorkflowRunState({
workflowRunId,
output: {
stepsOutput: updatedStepsOutput,
},
stepOutput,
context: updatedContext,
});
@ -121,16 +94,13 @@ export class WorkflowExecutorWorkspaceService {
currentStepIndex: currentStepIndex + 1,
steps,
context: updatedContext,
workflowExecutorState: updatedWorkflowExecutorState,
});
}
if (step.settings.errorHandlingOptions.continueOnFailure.value) {
await this.workflowRunWorkspaceService.saveWorkflowRunState({
workflowRunId,
output: {
stepsOutput: updatedStepsOutput,
},
stepOutput,
context,
});
@ -139,7 +109,6 @@ export class WorkflowExecutorWorkspaceService {
currentStepIndex: currentStepIndex + 1,
steps,
context,
workflowExecutorState: updatedWorkflowExecutorState,
});
}
@ -152,23 +121,17 @@ export class WorkflowExecutorWorkspaceService {
currentStepIndex,
steps,
context,
workflowExecutorState: updatedWorkflowExecutorState,
attemptCount: attemptCount + 1,
});
}
await this.workflowRunWorkspaceService.saveWorkflowRunState({
workflowRunId,
output: {
stepsOutput: updatedStepsOutput,
},
stepOutput,
context,
});
return {
...updatedWorkflowExecutorState,
status: WorkflowRunStatus.FAILED,
};
return actionOutput;
}
private sendWorkflowNodeRunEvent() {

View File

@ -67,20 +67,17 @@ export class RunWorkflowJob {
await this.throttleExecution(workflowVersion.workflowId);
const { status } = await this.workflowExecutorWorkspaceService.execute({
const { error } = await this.workflowExecutorWorkspaceService.execute({
workflowRunId,
currentStepIndex: 0,
steps: workflowVersion.steps ?? [],
steps: workflowVersion.steps,
context,
workflowExecutorState: {
stepsOutput: {},
status: WorkflowRunStatus.RUNNING,
},
});
await this.workflowRunWorkspaceService.endWorkflowRun({
workflowRunId,
status,
status: error ? WorkflowRunStatus.FAILED : WorkflowRunStatus.COMPLETED,
error,
});
} catch (error) {
await this.workflowRunWorkspaceService.endWorkflowRun({

View File

@ -3,6 +3,7 @@ import { Injectable } from '@nestjs/common';
import { ActorMetadata } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import {
StepOutput,
WorkflowRunOutput,
WorkflowRunStatus,
WorkflowRunWorkspaceEntity,
@ -125,11 +126,11 @@ export class WorkflowRunWorkspaceService {
async saveWorkflowRunState({
workflowRunId,
output,
stepOutput,
context,
}: {
workflowRunId: string;
output: Pick<WorkflowRunOutput, 'error' | 'stepsOutput'>;
stepOutput: StepOutput;
context: Record<string, any>;
}) {
const workflowRunRepository =
@ -154,7 +155,10 @@ export class WorkflowRunWorkspaceService {
trigger: undefined,
steps: [],
},
...output,
stepsOutput: {
...(workflowRunToUpdate.output?.stepsOutput ?? {}),
[stepOutput.id]: stepOutput.output,
},
},
context,
});