Add workflow runner (#6458)

- add workflow runner module
- add an endpoint to trigger a workflow via api
- improve error handling for serverless functions

## Testing
- create 2 serverless functions
- create a workflow
- create this workflow Version
```
{
  "type": "MANUAL",
  "input": {"b": "bb"},
  "nextAction": {
    "name": "step_1",
    "displayName": "Code",
    "type": "CODE",
    "valid": true,
    "settings": {
      "serverlessFunctionId": "Serverless function 1 Id",
      "errorHandlingOptions": {
        "retryOnFailure": {
          "value": false
        },
        "continueOnFailure": {
          "value": false
        }
      }
    },
    "nextAction": {
      "name": "step_1",
      "displayName": "Code",
      "type": "CODE",
      "valid": true,
      "settings": {
        "serverlessFunctionId": "Serverless function 1 Id",
        "errorHandlingOptions": {
          "retryOnFailure": {
            "value": false
          },
          "continueOnFailure": {
            "value": false
          }
        }
      },
      "nextAction": {
        "name": "step_1",
        "displayName": "Code",
        "type": "CODE",
        "valid": true,
        "settings": {
          "serverlessFunctionId": "Serverless function 2 Id",
          "errorHandlingOptions": {
            "retryOnFailure": {
              "value": false
            },
            "continueOnFailure": {
              "value": false
            }
          }
        }
      }
    }
  }
}

`
``
- call 
```
mutation Trigger {
  triggerWorkflow(workflowVersionId: "WORKFLOW_VERSION_ID") {
    result
  }
}
```
- try when errors are injected in serverless function
This commit is contained in:
martmull
2024-07-31 12:48:33 +02:00
committed by GitHub
parent b8496d22b6
commit 6b4c79ff0d
42 changed files with 639 additions and 150 deletions

View File

@ -13,7 +13,7 @@ import { WorkspaceJoinColumn } from 'src/engine/twenty-orm/decorators/workspace-
import { WorkspaceRelation } from 'src/engine/twenty-orm/decorators/workspace-relation.decorator';
import { WORKFLOW_EVENT_LISTENER_STANDARD_FIELD_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids';
import { STANDARD_OBJECT_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids';
import { WorkflowWorkspaceEntity } from 'src/modules/workflow/standard-objects/workflow.workspace-entity';
import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity';
@WorkspaceEntity({
standardId: STANDARD_OBJECT_IDS.workflowEventListener,

View File

@ -13,21 +13,8 @@ import { WorkspaceJoinColumn } from 'src/engine/twenty-orm/decorators/workspace-
import { WorkspaceRelation } from 'src/engine/twenty-orm/decorators/workspace-relation.decorator';
import { WORKFLOW_VERSION_STANDARD_FIELD_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids';
import { STANDARD_OBJECT_IDS } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids';
import { WorkflowWorkspaceEntity } from 'src/modules/workflow/standard-objects/workflow.workspace-entity';
export enum WorkflowTriggerType {
DATABASE_EVENT = 'DATABASE_EVENT',
}
export type WorkflowDatabaseEventTrigger = {
type: WorkflowTriggerType.DATABASE_EVENT;
settings: {
eventName: string;
triggerName: string;
};
};
export type WorkflowTrigger = WorkflowDatabaseEventTrigger;
import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity';
import { WorkflowTrigger } from 'src/modules/workflow/common/types/workflow-trigger.type';
@WorkspaceEntity({
standardId: STANDARD_OBJECT_IDS.workflowVersion,
@ -60,7 +47,7 @@ export class WorkflowVersionWorkspaceEntity extends BaseWorkspaceEntity {
icon: 'IconPlayerPlay',
})
@WorkspaceIsNullable()
trigger: JSON | null;
trigger: WorkflowTrigger | null;
// Relations
@WorkspaceRelation({

View File

@ -19,8 +19,8 @@ import { ActivityTargetWorkspaceEntity } from 'src/modules/activity/standard-obj
import { AttachmentWorkspaceEntity } from 'src/modules/attachment/standard-objects/attachment.workspace-entity';
import { FavoriteWorkspaceEntity } from 'src/modules/favorite/standard-objects/favorite.workspace-entity';
import { TimelineActivityWorkspaceEntity } from 'src/modules/timeline/standard-objects/timeline-activity.workspace-entity';
import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/standard-objects/workflow-event-listener.workspace-entity';
import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/standard-objects/workflow-version.workspace-entity';
import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity';
import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-event-listener.workspace-entity';
@WorkspaceEntity({
standardId: STANDARD_OBJECT_IDS.workflow,

View File

@ -0,0 +1,14 @@
import { WorkflowSettingsType } from 'src/modules/workflow/common/types/workflow-settings.type';
export enum WorkflowActionType {
CODE = 'CODE',
}
export type WorkflowAction = {
name: string;
displayName: string;
type: WorkflowActionType;
valid: boolean;
settings: WorkflowSettingsType;
nextAction?: WorkflowAction;
};

View File

@ -0,0 +1,14 @@
export type WorkflowCodeSettingsType = {
serverlessFunctionId: string;
};
export type WorkflowSettingsType = {
errorHandlingOptions: {
retryOnFailure: {
value: boolean;
};
continueOnFailure: {
value: boolean;
};
};
} & WorkflowCodeSettingsType;

View File

@ -0,0 +1,28 @@
import { WorkflowAction } from 'src/modules/workflow/common/types/workflow-action.type';
export enum WorkflowTriggerType {
DATABASE_EVENT = 'DATABASE_EVENT',
MANUAL = 'MANUAL',
}
type BaseTrigger = {
type: WorkflowTriggerType;
input?: object;
nextAction?: WorkflowAction;
};
export type WorkflowDatabaseEventTrigger = BaseTrigger & {
type: WorkflowTriggerType.DATABASE_EVENT;
settings: {
eventName: string;
triggerName: string;
};
};
type WorkflowManualTrigger = BaseTrigger & {
type: WorkflowTriggerType.MANUAL;
};
export type WorkflowTrigger =
| WorkflowDatabaseEventTrigger
| WorkflowManualTrigger;

View File

@ -0,0 +1,9 @@
import { Global, Module } from '@nestjs/common';
import { WorkflowCommonService } from 'src/modules/workflow/common/workflow-common.services';
@Module({
providers: [WorkflowCommonService],
exports: [WorkflowCommonService],
})
export class WorkflowCommonModule {}

View File

@ -0,0 +1,53 @@
import { Injectable } from '@nestjs/common';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity';
import {
WorkflowTriggerException,
WorkflowTriggerExceptionCode,
} from 'src/modules/workflow/workflow-trigger/workflow-trigger.exception';
import { WorkflowTrigger } from 'src/modules/workflow/common/types/workflow-trigger.type';
@Injectable()
export class WorkflowCommonService {
constructor(
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
) {}
async getWorkflowVersion(
workspaceId: string,
workflowVersionId: string,
): Promise<
Omit<WorkflowVersionWorkspaceEntity, 'trigger'> & {
trigger: WorkflowTrigger;
}
> {
const workflowVersionRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowVersionWorkspaceEntity>(
workspaceId,
'workflowVersion',
);
const workflowVersion = await workflowVersionRepository.findOne({
where: {
id: workflowVersionId,
},
});
if (!workflowVersion) {
throw new WorkflowTriggerException(
'Workflow version not found',
WorkflowTriggerExceptionCode.INVALID_INPUT,
);
}
if (!workflowVersion.trigger || !workflowVersion.trigger?.type) {
throw new WorkflowTriggerException(
'Workflow version does not contains trigger',
WorkflowTriggerExceptionCode.INVALID_WORKFLOW_VERSION,
);
}
return { ...workflowVersion, trigger: workflowVersion.trigger };
}
}

View File

@ -0,0 +1,32 @@
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
import { WorkflowRunnerService } from 'src/modules/workflow/workflow-runner/workflow-runner.service';
import { WorkflowCommonService } from 'src/modules/workflow/common/workflow-common.services';
type RunWorkflowJobData = { workspaceId: string; workflowVersionId: string };
@Processor(MessageQueue.workflowQueue)
export class WorkflowRunnerJob {
constructor(
private readonly workflowCommonService: WorkflowCommonService,
private readonly workflowRunnerService: WorkflowRunnerService,
) {}
@Process(WorkflowRunnerJob.name)
async handle({
workspaceId,
workflowVersionId,
}: RunWorkflowJobData): Promise<void> {
const workflowVersion = await this.workflowCommonService.getWorkflowVersion(
workspaceId,
workflowVersionId,
);
await this.workflowRunnerService.run({
action: workflowVersion.trigger.nextAction,
workspaceId,
payload: workflowVersion.trigger.input,
});
}
}

View File

@ -0,0 +1,13 @@
import { Module } from '@nestjs/common';
import { WorkflowRunnerService } from 'src/modules/workflow/workflow-runner/workflow-runner.service';
import { WorkflowRunnerJob } from 'src/modules/workflow/workflow-runner/workflow-runner.job';
import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module';
import { ServerlessFunctionModule } from 'src/engine/metadata-modules/serverless-function/serverless-function.module';
@Module({
imports: [WorkflowCommonModule, ServerlessFunctionModule],
providers: [WorkflowRunnerService, WorkflowRunnerJob],
exports: [WorkflowRunnerService],
})
export class WorkflowRunnerModule {}

View File

@ -0,0 +1,82 @@
import { Injectable } from '@nestjs/common';
import { ServerlessFunctionService } from 'src/engine/metadata-modules/serverless-function/serverless-function.service';
import {
WorkflowTriggerException,
WorkflowTriggerExceptionCode,
} from 'src/modules/workflow/workflow-trigger/workflow-trigger.exception';
import {
WorkflowAction,
WorkflowActionType,
} from 'src/modules/workflow/common/types/workflow-action.type';
const MAX_RETRIES_ON_FAILURE = 3;
@Injectable()
export class WorkflowRunnerService {
constructor(
private readonly serverlessFunctionService: ServerlessFunctionService,
) {}
async run({
action,
workspaceId,
payload,
attemptCount = 1,
}: {
action?: WorkflowAction;
workspaceId: string;
payload?: object;
attemptCount?: number;
}) {
if (!action) {
return payload;
}
let result: object | undefined = undefined;
switch (action.type) {
case WorkflowActionType.CODE: {
const executionResult = await this.serverlessFunctionService.executeOne(
action.settings.serverlessFunctionId,
workspaceId,
payload,
);
if (executionResult.data) {
result = executionResult.data;
}
if (!executionResult.error) {
throw new Error('Execution result error, no data or error');
}
if (action.settings.errorHandlingOptions.continueOnFailure.value) {
result = payload;
break;
} else if (
action.settings.errorHandlingOptions.retryOnFailure.value &&
attemptCount < MAX_RETRIES_ON_FAILURE
) {
return await this.run({
action,
workspaceId,
payload,
attemptCount: attemptCount + 1,
});
} else {
return executionResult.error;
}
}
default:
throw new WorkflowTriggerException(
`Unknown action type '${action.type}'`,
WorkflowTriggerExceptionCode.INVALID_ACTION_TYPE,
);
}
return await this.run({
action: action.nextAction,
workspaceId,
payload: result,
});
}
}

View File

@ -11,4 +11,5 @@ export enum WorkflowTriggerExceptionCode {
INVALID_INPUT = 'INVALID_INPUT',
INVALID_WORKFLOW_TRIGGER = 'INVALID_WORKFLOW_TRIGGER',
INVALID_WORKFLOW_VERSION = 'INVALID_WORKFLOW_VERSION',
INVALID_ACTION_TYPE = 'INVALID_ACTION_TYPE',
}

View File

@ -1,59 +1,51 @@
import { Injectable } from '@nestjs/common';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/standard-objects/workflow-event-listener.workspace-entity';
import {
WorkflowDatabaseEventTrigger,
WorkflowTrigger,
WorkflowTriggerType,
WorkflowVersionWorkspaceEntity,
} from 'src/modules/workflow/standard-objects/workflow-version.workspace-entity';
import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-event-listener.workspace-entity';
import {
WorkflowTriggerException,
WorkflowTriggerExceptionCode,
} from 'src/modules/workflow/workflow-trigger/workflow-trigger.exception';
import {
WorkflowDatabaseEventTrigger,
WorkflowTriggerType,
} from 'src/modules/workflow/common/types/workflow-trigger.type';
import { WorkflowCommonService } from 'src/modules/workflow/common/workflow-common.services';
import { WorkflowRunnerService } from 'src/modules/workflow/workflow-runner/workflow-runner.service';
@Injectable()
export class WorkflowTriggerService {
constructor(
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
private readonly workflowCommonService: WorkflowCommonService,
private readonly workflowRunnerService: WorkflowRunnerService,
) {}
async enableWorkflowTrigger(workspaceId: string, workflowVersionId: string) {
const workflowVersionRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowVersionWorkspaceEntity>(
workspaceId,
'workflowVersion',
);
async runWorkflow(workspaceId: string, workflowVersionId: string) {
const workflowVersion = await this.workflowCommonService.getWorkflowVersion(
workspaceId,
workflowVersionId,
);
const workflowVersion = await workflowVersionRepository.findOne({
where: {
id: workflowVersionId,
},
return await this.workflowRunnerService.run({
action: workflowVersion.trigger.nextAction,
workspaceId,
payload: workflowVersion.trigger.input,
});
}
if (!workflowVersion) {
throw new WorkflowTriggerException(
'Workflow version not found',
WorkflowTriggerExceptionCode.INVALID_INPUT,
);
}
async enableWorkflowTrigger(workspaceId: string, workflowVersionId: string) {
const workflowVersion = await this.workflowCommonService.getWorkflowVersion(
workspaceId,
workflowVersionId,
);
const trigger = workflowVersion.trigger as unknown as WorkflowTrigger;
if (!trigger || !trigger?.type) {
throw new WorkflowTriggerException(
'Workflow version does not contains trigger',
WorkflowTriggerExceptionCode.INVALID_WORKFLOW_VERSION,
);
}
switch (trigger.type) {
switch (workflowVersion.trigger.type) {
case WorkflowTriggerType.DATABASE_EVENT:
await this.upsertWorkflowEventListener(
workspaceId,
workflowVersion.workflowId,
trigger,
workflowVersion.trigger,
);
break;
default:

View File

@ -0,0 +1,8 @@
import { Module } from '@nestjs/common';
import { WorkflowRunnerModule } from 'src/modules/workflow/workflow-runner/workflow-runner.module';
@Module({
imports: [WorkflowRunnerModule],
})
export class WorkflowModule {}