Migrate to workspace services (#6628)

As title
This commit is contained in:
Thomas Trompette
2024-08-14 18:46:36 +02:00
committed by GitHub
parent c63c18aef1
commit 6927f46e1c
12 changed files with 85 additions and 87 deletions

View File

@ -3,10 +3,10 @@ import { Module } from '@nestjs/common';
import { WorkflowTriggerResolver } from 'src/engine/core-modules/workflow/workflow-trigger.resolver'; import { WorkflowTriggerResolver } from 'src/engine/core-modules/workflow/workflow-trigger.resolver';
import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module'; import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module';
import { WorkflowRunnerModule } from 'src/modules/workflow/workflow-runner/workflow-runner.module'; import { WorkflowRunnerModule } from 'src/modules/workflow/workflow-runner/workflow-runner.module';
import { WorkflowTriggerService } from 'src/modules/workflow/workflow-trigger/workflow-trigger.service'; import { WorkflowTriggerWorkspaceService } from 'src/modules/workflow/workflow-trigger/workflow-trigger.workspace-service';
@Module({ @Module({
imports: [WorkflowCommonModule, WorkflowRunnerModule], imports: [WorkflowCommonModule, WorkflowRunnerModule],
providers: [WorkflowTriggerService, WorkflowTriggerResolver], providers: [WorkflowTriggerWorkspaceService, WorkflowTriggerResolver],
}) })
export class WorkflowTriggerCoreModule {} export class WorkflowTriggerCoreModule {}

View File

@ -4,26 +4,22 @@ import { Args, Mutation, Resolver } from '@nestjs/graphql';
import { RunWorkflowVersionInput } from 'src/engine/core-modules/workflow/dtos/run-workflow-version-input.dto'; import { RunWorkflowVersionInput } from 'src/engine/core-modules/workflow/dtos/run-workflow-version-input.dto';
import { WorkflowTriggerResultDTO } from 'src/engine/core-modules/workflow/dtos/workflow-trigger-result.dto'; import { WorkflowTriggerResultDTO } from 'src/engine/core-modules/workflow/dtos/workflow-trigger-result.dto';
import { workflowTriggerGraphqlApiExceptionHandler } from 'src/engine/core-modules/workflow/utils/workflow-trigger-graphql-api-exception-handler.util'; import { workflowTriggerGraphqlApiExceptionHandler } from 'src/engine/core-modules/workflow/utils/workflow-trigger-graphql-api-exception-handler.util';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { AuthWorkspace } from 'src/engine/decorators/auth/auth-workspace.decorator';
import { JwtAuthGuard } from 'src/engine/guards/jwt.auth.guard'; import { JwtAuthGuard } from 'src/engine/guards/jwt.auth.guard';
import { WorkflowTriggerService } from 'src/modules/workflow/workflow-trigger/workflow-trigger.service'; import { WorkflowTriggerWorkspaceService } from 'src/modules/workflow/workflow-trigger/workflow-trigger.workspace-service';
@UseGuards(JwtAuthGuard) @UseGuards(JwtAuthGuard)
@Resolver() @Resolver()
export class WorkflowTriggerResolver { export class WorkflowTriggerResolver {
constructor( constructor(
private readonly workflowTriggerService: WorkflowTriggerService, private readonly workflowTriggerWorkspaceService: WorkflowTriggerWorkspaceService,
) {} ) {}
@Mutation(() => Boolean) @Mutation(() => Boolean)
async enableWorkflowTrigger( async enableWorkflowTrigger(
@AuthWorkspace() { id: workspaceId }: Workspace,
@Args('workflowVersionId') workflowVersionId: string, @Args('workflowVersionId') workflowVersionId: string,
) { ) {
try { try {
return await this.workflowTriggerService.enableWorkflowTrigger( return await this.workflowTriggerWorkspaceService.enableWorkflowTrigger(
workspaceId,
workflowVersionId, workflowVersionId,
); );
} catch (error) { } catch (error) {
@ -33,13 +29,11 @@ export class WorkflowTriggerResolver {
@Mutation(() => WorkflowTriggerResultDTO) @Mutation(() => WorkflowTriggerResultDTO)
async runWorkflowVersion( async runWorkflowVersion(
@AuthWorkspace() { id: workspaceId }: Workspace,
@Args('input') { workflowVersionId, payload }: RunWorkflowVersionInput, @Args('input') { workflowVersionId, payload }: RunWorkflowVersionInput,
) { ) {
try { try {
return { return {
result: await this.workflowTriggerService.runWorkflowVersion( result: await this.workflowTriggerWorkspaceService.runWorkflowVersion(
workspaceId,
workflowVersionId, workflowVersionId,
payload ?? {}, payload ?? {},
), ),

View File

@ -1,9 +1,9 @@
import { Module } from '@nestjs/common'; import { Module } from '@nestjs/common';
import { WorkflowCommonService } from 'src/modules/workflow/common/workflow-common.services'; import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workflow-common.workspace-service';
@Module({ @Module({
providers: [WorkflowCommonService], providers: [WorkflowCommonWorkspaceService],
exports: [WorkflowCommonService], exports: [WorkflowCommonWorkspaceService],
}) })
export class WorkflowCommonModule {} export class WorkflowCommonModule {}

View File

@ -1,6 +1,6 @@
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity';
import { WorkflowTrigger } from 'src/modules/workflow/common/types/workflow-trigger.type'; import { WorkflowTrigger } from 'src/modules/workflow/common/types/workflow-trigger.type';
import { import {
@ -9,22 +9,16 @@ import {
} from 'src/modules/workflow/workflow-trigger/workflow-trigger.exception'; } from 'src/modules/workflow/workflow-trigger/workflow-trigger.exception';
@Injectable() @Injectable()
export class WorkflowCommonService { export class WorkflowCommonWorkspaceService {
constructor( constructor(private readonly twentyORMManager: TwentyORMManager) {}
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
) {}
async getWorkflowVersion( async getWorkflowVersion(workflowVersionId: string): Promise<
workspaceId: string,
workflowVersionId: string,
): Promise<
Omit<WorkflowVersionWorkspaceEntity, 'trigger'> & { Omit<WorkflowVersionWorkspaceEntity, 'trigger'> & {
trigger: WorkflowTrigger; trigger: WorkflowTrigger;
} }
> { > {
const workflowVersionRepository = const workflowVersionRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowVersionWorkspaceEntity>( await this.twentyORMManager.getRepository<WorkflowVersionWorkspaceEntity>(
workspaceId,
'workflowVersion', 'workflowVersion',
); );

View File

@ -0,0 +1,12 @@
import { CustomException } from 'src/utils/custom-exception';
export class WorkflowActionRunnerException extends CustomException {
code: WorkflowActionRunnerExceptionCode;
constructor(message: string, code: WorkflowActionRunnerExceptionCode) {
super(message, code);
}
}
export enum WorkflowActionRunnerExceptionCode {
SCOPED_WORKSPACE_NOT_FOUND = 'SCOPED_WORKSPACE_NOT_FOUND',
}

View File

@ -1,14 +1,12 @@
import { WorkflowResult } from 'src/modules/workflow/common/types/workflow-result.type';
import { WorkflowAction } from 'src/modules/workflow/common/types/workflow-action.type'; import { WorkflowAction } from 'src/modules/workflow/common/types/workflow-action.type';
import { WorkflowResult } from 'src/modules/workflow/common/types/workflow-result.type';
export interface WorkflowActionRunner { export interface WorkflowActionRunner {
execute({ execute({
action, action,
workspaceId,
payload, payload,
}: { }: {
action: WorkflowAction; action: WorkflowAction;
workspaceId: string;
payload?: object; payload?: object;
}): Promise<WorkflowResult>; }): Promise<WorkflowResult>;
} }

View File

@ -1,12 +1,17 @@
import { Module } from '@nestjs/common'; import { Module } from '@nestjs/common';
import { ServerlessFunctionModule } from 'src/engine/metadata-modules/serverless-function/serverless-function.module';
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
import { WorkflowActionRunnerFactory } from 'src/modules/workflow/workflow-action-runner/workflow-action-runner.factory'; import { WorkflowActionRunnerFactory } from 'src/modules/workflow/workflow-action-runner/workflow-action-runner.factory';
import { CodeWorkflowActionRunner } from 'src/modules/workflow/workflow-action-runner/workflow-action-runners/code-workflow-action-runner'; import { CodeWorkflowActionRunner } from 'src/modules/workflow/workflow-action-runner/workflow-action-runners/code-workflow-action-runner';
import { ServerlessFunctionModule } from 'src/engine/metadata-modules/serverless-function/serverless-function.module';
@Module({ @Module({
imports: [ServerlessFunctionModule], imports: [ServerlessFunctionModule],
providers: [WorkflowActionRunnerFactory, CodeWorkflowActionRunner], providers: [
WorkflowActionRunnerFactory,
CodeWorkflowActionRunner,
ScopedWorkspaceContextFactory,
],
exports: [WorkflowActionRunnerFactory], exports: [WorkflowActionRunnerFactory],
}) })
export class WorkflowActionRunnerModule {} export class WorkflowActionRunnerModule {}

View File

@ -1,25 +1,38 @@
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { WorkflowActionRunner } from 'src/modules/workflow/workflow-action-runner/workflow-action-runner.interface';
import { WorkflowAction } from 'src/modules/workflow/common/types/workflow-action.type';
import { ServerlessFunctionService } from 'src/engine/metadata-modules/serverless-function/serverless-function.service'; import { ServerlessFunctionService } from 'src/engine/metadata-modules/serverless-function/serverless-function.service';
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
import { WorkflowAction } from 'src/modules/workflow/common/types/workflow-action.type';
import { WorkflowResult } from 'src/modules/workflow/common/types/workflow-result.type'; import { WorkflowResult } from 'src/modules/workflow/common/types/workflow-result.type';
import {
WorkflowActionRunnerException,
WorkflowActionRunnerExceptionCode,
} from 'src/modules/workflow/workflow-action-runner/workflow-action-runner.exception';
import { WorkflowActionRunner } from 'src/modules/workflow/workflow-action-runner/workflow-action-runner.interface';
@Injectable() @Injectable()
export class CodeWorkflowActionRunner implements WorkflowActionRunner { export class CodeWorkflowActionRunner implements WorkflowActionRunner {
constructor( constructor(
private readonly serverlessFunctionService: ServerlessFunctionService, private readonly serverlessFunctionService: ServerlessFunctionService,
private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory,
) {} ) {}
async execute({ async execute({
action, action,
workspaceId,
payload, payload,
}: { }: {
action: WorkflowAction; action: WorkflowAction;
workspaceId: string;
payload?: object; payload?: object;
}): Promise<WorkflowResult> { }): Promise<WorkflowResult> {
const { workspaceId } = this.scopedWorkspaceContextFactory.create();
if (!workspaceId) {
throw new WorkflowActionRunnerException(
'Scoped workspace not found',
WorkflowActionRunnerExceptionCode.SCOPED_WORKSPACE_NOT_FOUND,
);
}
const result = await this.serverlessFunctionService.executeOne( const result = await this.serverlessFunctionService.executeOne(
action.settings.serverlessFunctionId, action.settings.serverlessFunctionId,
workspaceId, workspaceId,

View File

@ -4,8 +4,8 @@ import { Process } from 'src/engine/integrations/message-queue/decorators/proces
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import { WorkflowCommonService } from 'src/modules/workflow/common/workflow-common.services'; import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workflow-common.workspace-service';
import { WorkflowRunnerService } from 'src/modules/workflow/workflow-runner/workflow-runner.service'; import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-runner.workspace-service';
import { WorkflowStatusWorkspaceService } from 'src/modules/workflow/workflow-status/workflow-status.workspace-service'; import { WorkflowStatusWorkspaceService } from 'src/modules/workflow/workflow-status/workflow-status.workspace-service';
export type RunWorkflowJobData = { export type RunWorkflowJobData = {
@ -18,29 +18,27 @@ export type RunWorkflowJobData = {
@Processor({ queueName: MessageQueue.workflowQueue, scope: Scope.REQUEST }) @Processor({ queueName: MessageQueue.workflowQueue, scope: Scope.REQUEST })
export class WorkflowRunnerJob { export class WorkflowRunnerJob {
constructor( constructor(
private readonly workflowCommonService: WorkflowCommonService, private readonly workflowCommonWorkspaceService: WorkflowCommonWorkspaceService,
private readonly workflowRunnerService: WorkflowRunnerService, private readonly workflowRunnerWorkspaceService: WorkflowRunnerWorkspaceService,
private readonly workflowStatusWorkspaceService: WorkflowStatusWorkspaceService, private readonly workflowStatusWorkspaceService: WorkflowStatusWorkspaceService,
) {} ) {}
@Process(WorkflowRunnerJob.name) @Process(WorkflowRunnerJob.name)
async handle({ async handle({
workspaceId,
workflowVersionId, workflowVersionId,
workflowRunId, workflowRunId,
payload, payload,
}: RunWorkflowJobData): Promise<void> { }: RunWorkflowJobData): Promise<void> {
await this.workflowStatusWorkspaceService.startWorkflowRun(workflowRunId); await this.workflowStatusWorkspaceService.startWorkflowRun(workflowRunId);
const workflowVersion = await this.workflowCommonService.getWorkflowVersion( const workflowVersion =
workspaceId, await this.workflowCommonWorkspaceService.getWorkflowVersion(
workflowVersionId, workflowVersionId,
); );
try { try {
await this.workflowRunnerService.run({ await this.workflowRunnerWorkspaceService.run({
action: workflowVersion.trigger.nextAction, action: workflowVersion.trigger.nextAction,
workspaceId,
payload, payload,
}); });

View File

@ -3,7 +3,7 @@ import { Module } from '@nestjs/common';
import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module'; import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module';
import { WorkflowActionRunnerModule } from 'src/modules/workflow/workflow-action-runner/workflow-action-runner.module'; import { WorkflowActionRunnerModule } from 'src/modules/workflow/workflow-action-runner/workflow-action-runner.module';
import { WorkflowRunnerJob } from 'src/modules/workflow/workflow-runner/workflow-runner.job'; import { WorkflowRunnerJob } from 'src/modules/workflow/workflow-runner/workflow-runner.job';
import { WorkflowRunnerService } from 'src/modules/workflow/workflow-runner/workflow-runner.service'; import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-runner.workspace-service';
import { WorkflowStatusModule } from 'src/modules/workflow/workflow-status/workflow-status.module'; import { WorkflowStatusModule } from 'src/modules/workflow/workflow-status/workflow-status.module';
@Module({ @Module({
@ -12,7 +12,7 @@ import { WorkflowStatusModule } from 'src/modules/workflow/workflow-status/workf
WorkflowActionRunnerModule, WorkflowActionRunnerModule,
WorkflowStatusModule, WorkflowStatusModule,
], ],
providers: [WorkflowRunnerService, WorkflowRunnerJob], providers: [WorkflowRunnerWorkspaceService, WorkflowRunnerJob],
exports: [WorkflowRunnerService], exports: [WorkflowRunnerWorkspaceService],
}) })
export class WorkflowRunnerModule {} export class WorkflowRunnerModule {}

View File

@ -15,19 +15,17 @@ export type WorkflowRunOutput = {
}; };
@Injectable() @Injectable()
export class WorkflowRunnerService { export class WorkflowRunnerWorkspaceService {
constructor( constructor(
private readonly workflowActionRunnerFactory: WorkflowActionRunnerFactory, private readonly workflowActionRunnerFactory: WorkflowActionRunnerFactory,
) {} ) {}
async run({ async run({
action, action,
workspaceId,
payload, payload,
attemptCount = 1, attemptCount = 1,
}: { }: {
action?: WorkflowAction; action?: WorkflowAction;
workspaceId: string;
payload?: object; payload?: object;
attemptCount?: number; attemptCount?: number;
}): Promise<WorkflowRunOutput> { }): Promise<WorkflowRunOutput> {
@ -43,14 +41,12 @@ export class WorkflowRunnerService {
const result = await workflowActionRunner.execute({ const result = await workflowActionRunner.execute({
action, action,
workspaceId,
payload, payload,
}); });
if (result.data) { if (result.data) {
return await this.run({ return await this.run({
action: action.nextAction, action: action.nextAction,
workspaceId,
payload: result.data, payload: result.data,
}); });
} }
@ -65,7 +61,6 @@ export class WorkflowRunnerService {
if (action.settings.errorHandlingOptions.continueOnFailure.value) { if (action.settings.errorHandlingOptions.continueOnFailure.value) {
return await this.run({ return await this.run({
action: action.nextAction, action: action.nextAction,
workspaceId,
payload, payload,
}); });
} }
@ -76,7 +71,6 @@ export class WorkflowRunnerService {
) { ) {
return await this.run({ return await this.run({
action, action,
workspaceId,
payload, payload,
attemptCount: attemptCount + 1, attemptCount: attemptCount + 1,
}); });

View File

@ -1,41 +1,36 @@
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-event-listener.workspace-entity'; import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-event-listener.workspace-entity';
import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity';
import { import {
WorkflowDatabaseEventTrigger, WorkflowDatabaseEventTrigger,
WorkflowTriggerType, WorkflowTriggerType,
} from 'src/modules/workflow/common/types/workflow-trigger.type'; } from 'src/modules/workflow/common/types/workflow-trigger.type';
import { WorkflowCommonService } from 'src/modules/workflow/common/workflow-common.services'; import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workflow-common.workspace-service';
import { WorkflowRunnerService } from 'src/modules/workflow/workflow-runner/workflow-runner.service'; import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-runner.workspace-service';
import { import {
WorkflowTriggerException, WorkflowTriggerException,
WorkflowTriggerExceptionCode, WorkflowTriggerExceptionCode,
} from 'src/modules/workflow/workflow-trigger/workflow-trigger.exception'; } from 'src/modules/workflow/workflow-trigger/workflow-trigger.exception';
@Injectable() @Injectable()
export class WorkflowTriggerService { export class WorkflowTriggerWorkspaceService {
constructor( constructor(
private readonly twentyORMGlobalManager: TwentyORMGlobalManager, private readonly twentyORMManager: TwentyORMManager,
private readonly workflowCommonService: WorkflowCommonService, private readonly workflowCommonWorkspaceService: WorkflowCommonWorkspaceService,
private readonly workflowRunnerService: WorkflowRunnerService, private readonly workflowRunnerWorkspaceService: WorkflowRunnerWorkspaceService,
) {} ) {}
async runWorkflowVersion( async runWorkflowVersion(workflowVersionId: string, payload: object) {
workspaceId: string, const workflowVersion =
workflowVersionId: string, await this.workflowCommonWorkspaceService.getWorkflowVersion(
payload: object, workflowVersionId,
) { );
const workflowVersion = await this.workflowCommonService.getWorkflowVersion(
workspaceId,
workflowVersionId,
);
try { try {
return await this.workflowRunnerService.run({ return await this.workflowRunnerWorkspaceService.run({
action: workflowVersion.trigger.nextAction, action: workflowVersion.trigger.nextAction,
workspaceId,
payload, payload,
}); });
} catch (error) { } catch (error) {
@ -46,16 +41,15 @@ export class WorkflowTriggerService {
} }
} }
async enableWorkflowTrigger(workspaceId: string, workflowVersionId: string) { async enableWorkflowTrigger(workflowVersionId: string) {
const workflowVersion = await this.workflowCommonService.getWorkflowVersion( const workflowVersion =
workspaceId, await this.workflowCommonWorkspaceService.getWorkflowVersion(
workflowVersionId, workflowVersionId,
); );
switch (workflowVersion.trigger.type) { switch (workflowVersion.trigger.type) {
case WorkflowTriggerType.DATABASE_EVENT: case WorkflowTriggerType.DATABASE_EVENT:
await this.upsertEventListenerAndPublishVersion( await this.upsertEventListenerAndPublishVersion(
workspaceId,
workflowVersion.workflowId, workflowVersion.workflowId,
workflowVersionId, workflowVersionId,
workflowVersion.trigger, workflowVersion.trigger,
@ -69,7 +63,6 @@ export class WorkflowTriggerService {
} }
private async upsertEventListenerAndPublishVersion( private async upsertEventListenerAndPublishVersion(
workspaceId: string,
workflowId: string, workflowId: string,
workflowVersionId: string, workflowVersionId: string,
trigger: WorkflowDatabaseEventTrigger, trigger: WorkflowDatabaseEventTrigger,
@ -84,8 +77,7 @@ export class WorkflowTriggerService {
} }
const workflowEventListenerRepository = const workflowEventListenerRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowEventListenerWorkspaceEntity>( await this.twentyORMManager.getRepository<WorkflowEventListenerWorkspaceEntity>(
workspaceId,
'workflowEventListener', 'workflowEventListener',
); );
@ -94,12 +86,10 @@ export class WorkflowTriggerService {
eventName, eventName,
}); });
const workspaceDataSource = const workspaceDataSource = await this.twentyORMManager.getDatasource();
await this.twentyORMGlobalManager.getDataSourceForWorkspace(workspaceId);
const workflowRepository = const workflowRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowWorkspaceEntity>( await this.twentyORMManager.getRepository<WorkflowWorkspaceEntity>(
workspaceId,
'workflow', 'workflow',
); );