diff --git a/packages/twenty-server/src/engine/core-modules/cache-lock/__tests__/cache-lock.service.spec.ts b/packages/twenty-server/src/engine/core-modules/cache-lock/__tests__/cache-lock.service.spec.ts new file mode 100644 index 000000000..34fccb744 --- /dev/null +++ b/packages/twenty-server/src/engine/core-modules/cache-lock/__tests__/cache-lock.service.spec.ts @@ -0,0 +1,108 @@ +import { Test, TestingModule } from '@nestjs/testing'; + +import { CacheLockService } from 'src/engine/core-modules/cache-lock/cache-lock.service'; +import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service'; +import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum'; + +describe('CacheLockService', () => { + let service: CacheLockService; + let cacheStorageService: jest.Mocked; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [ + CacheLockService, + { + provide: CacheStorageNamespace.EngineLock, + useValue: { + acquireLock: jest.fn(), + releaseLock: jest.fn(), + }, + }, + { + provide: CacheStorageService, + useValue: { + acquireLock: jest.fn(), + releaseLock: jest.fn(), + }, + }, + ], + }).compile(); + + service = module.get(CacheLockService); + cacheStorageService = module.get(CacheStorageNamespace.EngineLock); + jest.spyOn(service, 'delay').mockResolvedValue(undefined); + }); + + it('should be defined', () => { + expect(service).toBeDefined(); + }); + + it('should acquire the lock and execute the function', async () => { + cacheStorageService.acquireLock.mockResolvedValue(true); + cacheStorageService.releaseLock.mockResolvedValue(undefined); + + const fn = jest.fn().mockResolvedValue('success'); + + const ttl = 100; + + const result = await service.withLock(fn, 'key', { + ttl, + }); + + expect(result).toBe('success'); + expect(fn).toHaveBeenCalled(); + expect(cacheStorageService.acquireLock).toHaveBeenCalledTimes(1); + expect(cacheStorageService.acquireLock).toHaveBeenCalledWith('key', ttl); + expect(cacheStorageService.releaseLock).toHaveBeenCalledTimes(1); + expect(cacheStorageService.releaseLock).toHaveBeenCalledWith('key'); + }); + + it('should throw an error if lock cannot be acquired after max retries', async () => { + cacheStorageService.acquireLock.mockResolvedValue(false); + + const fn = jest.fn(); + const ms = 1; + const maxRetries = 3; + + await expect( + service.withLock(fn, 'key', { ms, maxRetries }), + ).rejects.toThrow('Failed to acquire lock for key: key'); + + expect(cacheStorageService.acquireLock).toHaveBeenCalledTimes(maxRetries); + expect(fn).not.toHaveBeenCalled(); + }); + + it('should retry before acquiring the lock', async () => { + const mockAcquireLock = cacheStorageService.acquireLock; + + mockAcquireLock + .mockResolvedValueOnce(false) + .mockResolvedValueOnce(false) + .mockResolvedValueOnce(true); + + const fn = jest.fn().mockResolvedValue('retried success'); + + const result = await service.withLock(fn, 'key', { + maxRetries: 5, + ms: 1, + }); + + expect(result).toBe('retried success'); + expect(fn).toHaveBeenCalledTimes(1); + expect(mockAcquireLock).toHaveBeenCalledTimes(3); + expect(cacheStorageService.releaseLock).toHaveBeenCalledWith('key'); + }); + + it('should release the lock even if the function throws', async () => { + cacheStorageService.acquireLock.mockResolvedValue(true); + cacheStorageService.releaseLock.mockResolvedValue(undefined); + + const fn = jest.fn().mockRejectedValue(new Error('fail')); + + await expect(service.withLock(fn, 'key')).rejects.toThrow('fail'); + + expect(fn).toHaveBeenCalled(); + expect(cacheStorageService.releaseLock).toHaveBeenCalledWith('key'); + }); +}); diff --git a/packages/twenty-server/src/engine/core-modules/cache-lock/cache-lock.module.ts b/packages/twenty-server/src/engine/core-modules/cache-lock/cache-lock.module.ts new file mode 100644 index 000000000..ac32dd0e6 --- /dev/null +++ b/packages/twenty-server/src/engine/core-modules/cache-lock/cache-lock.module.ts @@ -0,0 +1,10 @@ +import { Module } from '@nestjs/common'; + +import { CacheLockService } from 'src/engine/core-modules/cache-lock/cache-lock.service'; + +@Module({ + imports: [], + providers: [CacheLockService], + exports: [CacheLockService], +}) +export class CacheLockModule {} diff --git a/packages/twenty-server/src/engine/core-modules/cache-lock/cache-lock.service.ts b/packages/twenty-server/src/engine/core-modules/cache-lock/cache-lock.service.ts new file mode 100644 index 000000000..4e117cc38 --- /dev/null +++ b/packages/twenty-server/src/engine/core-modules/cache-lock/cache-lock.service.ts @@ -0,0 +1,47 @@ +import { Injectable } from '@nestjs/common'; + +import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decorators/cache-storage.decorator'; +import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum'; +import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service'; + +export type CacheLockOptions = { + ms?: number; + maxRetries?: number; + ttl?: number; +}; + +@Injectable() +export class CacheLockService { + constructor( + @InjectCacheStorage(CacheStorageNamespace.EngineLock) + private readonly cacheStorageService: CacheStorageService, + ) {} + + async delay(ms: number) { + return new Promise((res) => setTimeout(res, ms)); + } + + async withLock( + fn: () => Promise, + key: string, + options?: CacheLockOptions, + ): Promise { + const { ms = 50, maxRetries = 20, ttl = 500 } = options || {}; + + for (let attempt = 0; attempt < maxRetries; attempt++) { + const acquired = await this.cacheStorageService.acquireLock(key, ttl); + + if (acquired) { + try { + return await fn(); + } finally { + await this.cacheStorageService.releaseLock(key); + } + } + + await this.delay(ms); + } + + throw new Error(`Failed to acquire lock for key: ${key}`); + } +} diff --git a/packages/twenty-server/src/engine/core-modules/cache-storage/services/cache-storage.service.ts b/packages/twenty-server/src/engine/core-modules/cache-storage/services/cache-storage.service.ts index 70fe45a48..3b3b84824 100644 --- a/packages/twenty-server/src/engine/core-modules/cache-storage/services/cache-storage.service.ts +++ b/packages/twenty-server/src/engine/core-modules/cache-storage/services/cache-storage.service.ts @@ -122,6 +122,29 @@ export class CacheStorageService { } while (cursor !== 0); } + async acquireLock(key: string, ttl = 1000): Promise { + if (!this.isRedisCache()) { + throw new Error('acquireLock is only supported with Redis cache'); + } + + const redisClient = (this.cache as RedisCache).store.client; + + const result = await redisClient.set(this.getKey(key), 'lock', { + NX: true, + PX: ttl, + }); + + return result === 'OK'; + } + + async releaseLock(key: string): Promise { + if (!this.isRedisCache()) { + throw new Error('releaseLock is only supported with Redis cache'); + } + + await this.del(key); + } + private isRedisCache() { // eslint-disable-next-line @typescript-eslint/no-explicit-any return (this.cache.store as any)?.name === 'redis'; diff --git a/packages/twenty-server/src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum.ts b/packages/twenty-server/src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum.ts index b135a2822..983c5db47 100644 --- a/packages/twenty-server/src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum.ts +++ b/packages/twenty-server/src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum.ts @@ -3,5 +3,6 @@ export enum CacheStorageNamespace { ModuleCalendar = 'module:calendar', ModuleWorkflow = 'module:workflow', EngineWorkspace = 'engine:workspace', + EngineLock = 'engine:lock', EngineHealth = 'engine:health', } diff --git a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts index ebfa01d79..c85a55c3d 100644 --- a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts +++ b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts @@ -28,7 +28,7 @@ import { FavoriteWorkspaceEntity } from 'src/modules/favorite/standard-objects/f import { TimelineActivityWorkspaceEntity } from 'src/modules/timeline/standard-objects/timeline-activity.workspace-entity'; import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; -import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; +import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type'; import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; import { WorkflowTrigger } from 'src/modules/workflow/workflow-trigger/types/workflow-trigger.type'; import { WorkflowRunStepInfo } from 'src/modules/workflow/workflow-executor/types/workflow-run-step-info.type'; @@ -43,7 +43,7 @@ export enum WorkflowRunStatus { export type StepOutput = { id: string; - output: WorkflowExecutorOutput; + output: WorkflowActionOutput; }; export type WorkflowRunOutput = { @@ -51,7 +51,7 @@ export type WorkflowRunOutput = { trigger: WorkflowTrigger; steps: WorkflowAction[]; }; - stepsOutput?: Record; + stepsOutput?: Record; error?: string; }; diff --git a/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.workspace-service.ts index 0b3412999..ba3b198dd 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.workspace-service.ts @@ -336,16 +336,10 @@ export class WorkflowVersionStepWorkspaceService { }, }; - const updatedContext = { - ...workflowRun.context, - [stepId]: enrichedResponse, - }; - await this.workflowRunWorkspaceService.saveWorkflowRunState({ workspaceId, workflowRunId, stepOutput: newStepOutput, - context: updatedContext, stepStatus: StepStatus.SUCCESS, }); diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/factories/workflow-executor.factory.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/factories/workflow-action.factory.ts similarity index 94% rename from packages/twenty-server/src/modules/workflow/workflow-executor/factories/workflow-executor.factory.ts rename to packages/twenty-server/src/modules/workflow/workflow-executor/factories/workflow-action.factory.ts index f7ed39808..59f4b91a2 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/factories/workflow-executor.factory.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/factories/workflow-action.factory.ts @@ -1,6 +1,6 @@ import { Injectable } from '@nestjs/common'; -import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; import { WorkflowStepExecutorException, @@ -19,7 +19,7 @@ import { UpdateRecordWorkflowAction } from 'src/modules/workflow/workflow-execut import { WorkflowActionType } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; @Injectable() -export class WorkflowExecutorFactory { +export class WorkflowActionFactory { constructor( private readonly codeWorkflowAction: CodeWorkflowAction, private readonly sendEmailWorkflowAction: SendEmailWorkflowAction, @@ -33,7 +33,7 @@ export class WorkflowExecutorFactory { private readonly aiAgentWorkflowAction: AiAgentWorkflowAction, ) {} - get(stepType: WorkflowActionType): WorkflowExecutor { + get(stepType: WorkflowActionType): WorkflowAction { switch (stepType) { case WorkflowActionType.CODE: return this.codeWorkflowAction; diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/interfaces/workflow-action.interface.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/interfaces/workflow-action.interface.ts new file mode 100644 index 000000000..909462748 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/interfaces/workflow-action.interface.ts @@ -0,0 +1,8 @@ +import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input'; +import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type'; + +export interface WorkflowAction { + execute( + workflowActionInput: WorkflowActionInput, + ): Promise; +} diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface.ts deleted file mode 100644 index 53aa949a4..000000000 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface.ts +++ /dev/null @@ -1,8 +0,0 @@ -import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; -import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; - -export interface WorkflowExecutor { - execute( - workflowExecutorInput: WorkflowExecutorInput, - ): Promise; -} diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-action-input.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-action-input.ts new file mode 100644 index 000000000..c7cc9a699 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-action-input.ts @@ -0,0 +1,7 @@ +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; + +export type WorkflowActionInput = { + currentStepId: string; + steps: WorkflowAction[]; + context: Record; +}; diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-executor-output.type.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-action-output.type.ts similarity index 62% rename from packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-executor-output.type.ts rename to packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-action-output.type.ts index 5f8ffc213..46fd0fa21 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-executor-output.type.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-action-output.type.ts @@ -1,4 +1,4 @@ -export type WorkflowExecutorOutput = { +export type WorkflowActionOutput = { result?: object; error?: string; pendingEvent?: boolean; diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-executor-input.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-executor-input.ts index 84e061dba..ff44471f7 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-executor-input.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-executor-input.ts @@ -1,9 +1,12 @@ -import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; - export type WorkflowExecutorInput = { - currentStepId: string; - steps: WorkflowAction[]; - context: Record; + stepIds: string[]; workflowRunId: string; - attemptCount?: number; + workspaceId: string; +}; + +export type WorkflowBranchExecutorInput = { + stepId: string; + attemptCount?: number; + workflowRunId: string; + workspaceId: string; }; diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/utils/__tests__/can-execute-step.util.spec.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/utils/__tests__/can-execute-step.util.spec.ts new file mode 100644 index 000000000..4a565771a --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/utils/__tests__/can-execute-step.util.spec.ts @@ -0,0 +1,90 @@ +import { + WorkflowAction, + WorkflowActionType, +} from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; +import { canExecuteStep } from 'src/modules/workflow/workflow-executor/utils/can-execute-step.utils'; + +describe('canExecuteStep', () => { + const steps = [ + { + id: 'step-1', + type: WorkflowActionType.CODE, + settings: { + errorHandlingOptions: { + continueOnFailure: { value: false }, + retryOnFailure: { value: false }, + }, + }, + nextStepIds: ['step-3'], + }, + { + id: 'step-2', + type: WorkflowActionType.SEND_EMAIL, + settings: { + errorHandlingOptions: { + continueOnFailure: { value: false }, + retryOnFailure: { value: false }, + }, + }, + nextStepIds: ['step-3'], + }, + { + id: 'step-3', + type: WorkflowActionType.SEND_EMAIL, + settings: { + errorHandlingOptions: { + continueOnFailure: { value: false }, + retryOnFailure: { value: false }, + }, + }, + nextStepIds: [], + }, + ] as WorkflowAction[]; + + it('should return true if all parents succeeded', () => { + const context = { + trigger: 'trigger result', + 'step-1': 'step-1 result', + 'step-2': 'step-2 result', + }; + + const result = canExecuteStep({ context, steps, stepId: 'step-3' }); + + expect(result).toBe(true); + }); + + it('should return false if one parent is not succeeded', () => { + expect( + canExecuteStep({ + context: { + trigger: 'trigger result', + 'step-2': 'step-2 result', + }, + steps, + stepId: 'step-3', + }), + ).toBe(false); + + expect( + canExecuteStep({ + context: { + trigger: 'trigger result', + 'step-1': 'step-1 result', + }, + steps, + stepId: 'step-3', + }), + ).toBe(false); + + expect( + canExecuteStep({ + context: { + trigger: 'trigger result', + 'step-1': {}, + }, + steps, + stepId: 'step-3', + }), + ).toBe(false); + }); +}); diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/utils/can-execute-step.utils.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/utils/can-execute-step.utils.ts new file mode 100644 index 000000000..81f31eb7f --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/utils/can-execute-step.utils.ts @@ -0,0 +1,23 @@ +import { isDefined } from 'twenty-shared/utils'; + +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; + +export const canExecuteStep = ({ + context, + stepId, + steps, +}: { + steps: WorkflowAction[]; + context: Record; + stepId: string; +}) => { + const parentSteps = steps.filter( + (parentStep) => + isDefined(parentStep) && parentStep.nextStepIds?.includes(stepId), + ); + + // TODO use workflowRun.state to check if step status is not COMPLETED. Return false in this case + return parentSteps.every((parentStep) => + Object.keys(context).includes(parentStep.id), + ); +}; diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/ai-agent/ai-agent.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/ai-agent/ai-agent.workflow-action.ts index 480665006..fb235ce58 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/ai-agent/ai-agent.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/ai-agent/ai-agent.workflow-action.ts @@ -3,7 +3,7 @@ import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; -import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; import { AIBillingService } from 'src/engine/core-modules/ai/services/ai-billing.service'; import { AgentExecutionService } from 'src/engine/metadata-modules/agent/agent-execution.service'; @@ -16,13 +16,13 @@ import { WorkflowStepExecutorException, WorkflowStepExecutorExceptionCode, } from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception'; -import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; -import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; +import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input'; +import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type'; import { isWorkflowAiAgentAction } from './guards/is-workflow-ai-agent-action.guard'; @Injectable() -export class AiAgentWorkflowAction implements WorkflowExecutor { +export class AiAgentWorkflowAction implements WorkflowAction { constructor( private readonly agentExecutionService: AgentExecutionService, private readonly aiBillingService: AIBillingService, @@ -34,7 +34,7 @@ export class AiAgentWorkflowAction implements WorkflowExecutor { currentStepId, steps, context, - }: WorkflowExecutorInput): Promise { + }: WorkflowActionInput): Promise { const step = steps.find((step) => step.id === currentStepId); if (!step) { diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/code/code.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/code/code.workflow-action.ts index 2ee659004..0a0381b79 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/code/code.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/code/code.workflow-action.ts @@ -1,6 +1,6 @@ import { Injectable } from '@nestjs/common'; -import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; import { ServerlessFunctionService } from 'src/engine/metadata-modules/serverless-function/serverless-function.service'; import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; @@ -8,14 +8,14 @@ import { WorkflowStepExecutorException, WorkflowStepExecutorExceptionCode, } from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception'; -import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; -import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; +import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input'; +import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type'; import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util'; import { isWorkflowCodeAction } from 'src/modules/workflow/workflow-executor/workflow-actions/code/guards/is-workflow-code-action.guard'; import { WorkflowCodeActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/code/types/workflow-code-action-input.type'; @Injectable() -export class CodeWorkflowAction implements WorkflowExecutor { +export class CodeWorkflowAction implements WorkflowAction { constructor( private readonly serverlessFunctionService: ServerlessFunctionService, private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory, @@ -25,7 +25,7 @@ export class CodeWorkflowAction implements WorkflowExecutor { currentStepId, steps, context, - }: WorkflowExecutorInput): Promise { + }: WorkflowActionInput): Promise { const step = steps.find((step) => step.id === currentStepId); if (!step) { diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/filter/filter.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/filter/filter.workflow-action.ts index 8be64e4f6..00a603c9c 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/filter/filter.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/filter/filter.workflow-action.ts @@ -1,20 +1,20 @@ import { Injectable } from '@nestjs/common'; -import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; import { WorkflowStepExecutorException, WorkflowStepExecutorExceptionCode, } from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception'; -import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; -import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; +import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input'; +import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type'; import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util'; import { isWorkflowFilterAction } from 'src/modules/workflow/workflow-executor/workflow-actions/filter/guards/is-workflow-filter-action.guard'; import { evaluateFilterConditions } from 'src/modules/workflow/workflow-executor/workflow-actions/filter/utils/evaluate-filter-conditions.util'; @Injectable() -export class FilterWorkflowAction implements WorkflowExecutor { - async execute(input: WorkflowExecutorInput): Promise { +export class FilterWorkflowAction implements WorkflowAction { + async execute(input: WorkflowActionInput): Promise { const { currentStepId, steps, context } = input; const step = steps.find((step) => step.id === currentStepId); diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/form/form.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/form/form.workflow-action.ts index bed01343c..0a5f3d5f2 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/form/form.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/form/form.workflow-action.ts @@ -1,21 +1,21 @@ import { Injectable } from '@nestjs/common'; -import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; import { WorkflowStepExecutorException, WorkflowStepExecutorExceptionCode, } from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception'; -import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; -import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; +import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input'; +import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type'; import { isWorkflowFormAction } from 'src/modules/workflow/workflow-executor/workflow-actions/form/guards/is-workflow-form-action.guard'; @Injectable() -export class FormWorkflowAction implements WorkflowExecutor { +export class FormWorkflowAction implements WorkflowAction { async execute({ currentStepId, steps, - }: WorkflowExecutorInput): Promise { + }: WorkflowActionInput): Promise { const step = steps.find((step) => step.id === currentStepId); if (!step) { diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/http-request/http-request.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/http-request/http-request.workflow-action.ts index 181d03468..787ec14a8 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/http-request/http-request.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/http-request/http-request.workflow-action.ts @@ -3,26 +3,26 @@ import { Injectable } from '@nestjs/common'; import { isString } from '@sniptt/guards'; import axios, { AxiosRequestConfig } from 'axios'; -import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; import { WorkflowStepExecutorException, WorkflowStepExecutorExceptionCode, } from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception'; -import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; -import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; +import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input'; +import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type'; import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util'; import { isWorkflowHttpRequestAction } from './guards/is-workflow-http-request-action.guard'; import { WorkflowHttpRequestActionInput } from './types/workflow-http-request-action-input.type'; @Injectable() -export class HttpRequestWorkflowAction implements WorkflowExecutor { +export class HttpRequestWorkflowAction implements WorkflowAction { async execute({ currentStepId, steps, context, - }: WorkflowExecutorInput): Promise { + }: WorkflowActionInput): Promise { const step = steps.find((step) => step.id === currentStepId); if (!step) { diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/mail-sender/send-email.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/mail-sender/send-email.workflow-action.ts index a95b31d62..c6e264f87 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/mail-sender/send-email.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/mail-sender/send-email.workflow-action.ts @@ -5,7 +5,7 @@ import { JSDOM } from 'jsdom'; import { isDefined, isValidUuid } from 'twenty-shared/utils'; import { z } from 'zod'; -import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; @@ -15,8 +15,8 @@ import { WorkflowStepExecutorException, WorkflowStepExecutorExceptionCode, } from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception'; -import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; -import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; +import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input'; +import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type'; import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util'; import { SendEmailActionException, @@ -30,7 +30,7 @@ export type WorkflowSendEmailStepOutputSchema = { }; @Injectable() -export class SendEmailWorkflowAction implements WorkflowExecutor { +export class SendEmailWorkflowAction implements WorkflowAction { private readonly logger = new Logger(SendEmailWorkflowAction.name); constructor( private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory, @@ -78,7 +78,7 @@ export class SendEmailWorkflowAction implements WorkflowExecutor { currentStepId, steps, context, - }: WorkflowExecutorInput): Promise { + }: WorkflowActionInput): Promise { const step = steps.find((step) => step.id === currentStepId); if (!step) { diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/create-record.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/create-record.workflow-action.ts index 1f811508f..0e9c168c8 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/create-record.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/create-record.workflow-action.ts @@ -4,7 +4,7 @@ import { InjectRepository } from '@nestjs/typeorm'; import { isDefined } from 'class-validator'; import { Repository } from 'typeorm'; -import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action'; import { RecordPositionService } from 'src/engine/core-modules/record-position/services/record-position.service'; @@ -19,8 +19,8 @@ import { WorkflowStepExecutorException, WorkflowStepExecutorExceptionCode, } from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception'; -import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; -import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; +import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input'; +import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type'; import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util'; import { RecordCRUDActionException, @@ -30,7 +30,7 @@ import { isWorkflowCreateRecordAction } from 'src/modules/workflow/workflow-exec import { WorkflowCreateRecordActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/types/workflow-record-crud-action-input.type'; @Injectable() -export class CreateRecordWorkflowAction implements WorkflowExecutor { +export class CreateRecordWorkflowAction implements WorkflowAction { constructor( private readonly twentyORMGlobalManager: TwentyORMGlobalManager, @InjectRepository(ObjectMetadataEntity, 'core') @@ -46,7 +46,7 @@ export class CreateRecordWorkflowAction implements WorkflowExecutor { currentStepId, steps, context, - }: WorkflowExecutorInput): Promise { + }: WorkflowActionInput): Promise { const step = steps.find((step) => step.id === currentStepId); if (!step) { diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/delete-record.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/delete-record.workflow-action.ts index 904d651e0..401a07fa0 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/delete-record.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/delete-record.workflow-action.ts @@ -5,7 +5,7 @@ import { isDefined } from 'class-validator'; import { isValidUuid } from 'twenty-shared/utils'; import { Repository } from 'typeorm'; -import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action'; import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity'; @@ -16,8 +16,8 @@ import { WorkflowStepExecutorException, WorkflowStepExecutorExceptionCode, } from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception'; -import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; -import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; +import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input'; +import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type'; import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util'; import { RecordCRUDActionException, @@ -27,7 +27,7 @@ import { isWorkflowDeleteRecordAction } from 'src/modules/workflow/workflow-exec import { WorkflowDeleteRecordActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/types/workflow-record-crud-action-input.type'; @Injectable() -export class DeleteRecordWorkflowAction implements WorkflowExecutor { +export class DeleteRecordWorkflowAction implements WorkflowAction { constructor( private readonly twentyORMGlobalManager: TwentyORMGlobalManager, @InjectRepository(ObjectMetadataEntity, 'core') @@ -40,7 +40,7 @@ export class DeleteRecordWorkflowAction implements WorkflowExecutor { currentStepId, steps, context, - }: WorkflowExecutorInput): Promise { + }: WorkflowActionInput): Promise { const step = steps.find((step) => step.id === currentStepId); if (!step) { diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/find-records.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/find-records.workflow-action.ts index b5ab53354..11d07a034 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/find-records.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/find-records.workflow-action.ts @@ -9,7 +9,7 @@ import { ObjectRecordOrderBy, OrderByDirection, } from 'src/engine/api/graphql/workspace-query-builder/interfaces/object-record.interface'; -import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; import { GraphqlQueryParser } from 'src/engine/api/graphql/graphql-query-runner/graphql-query-parsers/graphql-query.parser'; import { ObjectMetadataItemWithFieldMaps } from 'src/engine/metadata-modules/types/object-metadata-item-with-field-maps'; @@ -23,8 +23,8 @@ import { WorkflowStepExecutorException, WorkflowStepExecutorExceptionCode, } from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception'; -import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; -import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; +import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input'; +import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type'; import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util'; import { RecordCRUDActionException, @@ -34,7 +34,7 @@ import { isWorkflowFindRecordsAction } from 'src/modules/workflow/workflow-execu import { WorkflowFindRecordsActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/types/workflow-record-crud-action-input.type'; @Injectable() -export class FindRecordsWorkflowAction implements WorkflowExecutor { +export class FindRecordsWorkflowAction implements WorkflowAction { constructor( private readonly twentyORMGlobalManager: TwentyORMGlobalManager, private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory, @@ -45,7 +45,7 @@ export class FindRecordsWorkflowAction implements WorkflowExecutor { currentStepId, steps, context, - }: WorkflowExecutorInput): Promise { + }: WorkflowActionInput): Promise { const step = steps.find((step) => step.id === currentStepId); if (!step) { diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/update-record.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/update-record.workflow-action.ts index eb5a9fee6..39ea5dd5b 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/update-record.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/update-record.workflow-action.ts @@ -5,7 +5,7 @@ import deepEqual from 'deep-equal'; import { isDefined, isValidUuid } from 'twenty-shared/utils'; import { Repository } from 'typeorm'; -import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action'; import { objectRecordChangedValues } from 'src/engine/core-modules/event-emitter/utils/object-record-changed-values'; @@ -20,8 +20,8 @@ import { WorkflowStepExecutorException, WorkflowStepExecutorExceptionCode, } from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception'; -import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; -import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; +import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input'; +import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type'; import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util'; import { RecordCRUDActionException, @@ -31,7 +31,7 @@ import { isWorkflowUpdateRecordAction } from 'src/modules/workflow/workflow-exec import { WorkflowUpdateRecordActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/types/workflow-record-crud-action-input.type'; @Injectable() -export class UpdateRecordWorkflowAction implements WorkflowExecutor { +export class UpdateRecordWorkflowAction implements WorkflowAction { constructor( private readonly twentyORMGlobalManager: TwentyORMGlobalManager, private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory, @@ -46,7 +46,7 @@ export class UpdateRecordWorkflowAction implements WorkflowExecutor { currentStepId, steps, context, - }: WorkflowExecutorInput): Promise { + }: WorkflowActionInput): Promise { const step = steps.find((step) => step.id === currentStepId); if (!step) { diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-executor.module.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-executor.module.ts index 4ae954445..ef6cd25db 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-executor.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-executor.module.ts @@ -4,7 +4,7 @@ import { BillingModule } from 'src/engine/core-modules/billing/billing.module'; import { FeatureFlagModule } from 'src/engine/core-modules/feature-flag/feature-flag.module'; import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module'; -import { WorkflowExecutorFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-executor.factory'; +import { WorkflowActionFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-action.factory'; import { AiAgentActionModule } from 'src/modules/workflow/workflow-executor/workflow-actions/ai-agent/ai-agent-action.module'; import { CodeActionModule } from 'src/modules/workflow/workflow-executor/workflow-actions/code/code-action.module'; import { FilterActionModule } from 'src/modules/workflow/workflow-executor/workflow-actions/filter/filter-action.module'; @@ -32,7 +32,7 @@ import { WorkflowRunModule } from 'src/modules/workflow/workflow-runner/workflow providers: [ WorkflowExecutorWorkspaceService, ScopedWorkspaceContextFactory, - WorkflowExecutorFactory, + WorkflowActionFactory, ], exports: [WorkflowExecutorWorkspaceService], }) diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/__tests__/workflow-executor.workspace-service.spec.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/__tests__/workflow-executor.workspace-service.spec.ts index f38520e67..ff1b75f3e 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/__tests__/workflow-executor.workspace-service.spec.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/__tests__/workflow-executor.workspace-service.spec.ts @@ -4,9 +4,8 @@ import { BILLING_FEATURE_USED } from 'src/engine/core-modules/billing/constants/ import { BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE } from 'src/engine/core-modules/billing/constants/billing-workflow-execution-error-message.constant'; import { BillingMeterEventName } from 'src/engine/core-modules/billing/enums/billing-meter-event-names'; import { BillingService } from 'src/engine/core-modules/billing/services/billing.service'; -import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter'; -import { WorkflowExecutorFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-executor.factory'; +import { WorkflowActionFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-action.factory'; import { WorkflowAction, WorkflowActionType, @@ -14,10 +13,26 @@ import { import { WorkflowExecutorWorkspaceService } from 'src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service'; import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service'; import { StepStatus } from 'src/modules/workflow/workflow-executor/types/workflow-run-step-info.type'; +import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; +import { canExecuteStep } from 'src/modules/workflow/workflow-executor/utils/can-execute-step.utils'; + +jest.mock( + 'src/modules/workflow/workflow-executor/utils/can-execute-step.utils', + () => { + const actual = jest.requireActual( + 'src/modules/workflow/workflow-executor/utils/can-execute-step.utils', + ); + + return { + ...actual, + canExecuteStep: jest.fn().mockReturnValue(true), // default behavior + }; + }, +); describe('WorkflowExecutorWorkspaceService', () => { let service: WorkflowExecutorWorkspaceService; - let workflowExecutorFactory: WorkflowExecutorFactory; + let workflowActionFactory: WorkflowActionFactory; let workspaceEventEmitter: WorkspaceEventEmitter; let workflowRunWorkspaceService: WorkflowRunWorkspaceService; @@ -29,22 +44,16 @@ describe('WorkflowExecutorWorkspaceService', () => { emitCustomBatchEvent: jest.fn(), }; - const mockScopedWorkspaceContext = { - workspaceId: 'workspace-id', - }; - - const mockScopedWorkspaceContextFactory = { - create: jest.fn().mockReturnValue(mockScopedWorkspaceContext), - }; - const mockWorkflowRunWorkspaceService = { - saveWorkflowRunState: jest.fn(), + endWorkflowRun: jest.fn(), updateWorkflowRunStepStatus: jest.fn(), + saveWorkflowRunState: jest.fn(), + getWorkflowRun: jest.fn(), }; const mockBillingService = { - isBillingEnabled: jest.fn(), - canBillMeteredProduct: jest.fn(), + isBillingEnabled: jest.fn().mockReturnValue(true), + canBillMeteredProduct: jest.fn().mockReturnValue(true), }; beforeEach(async () => { @@ -54,7 +63,7 @@ describe('WorkflowExecutorWorkspaceService', () => { providers: [ WorkflowExecutorWorkspaceService, { - provide: WorkflowExecutorFactory, + provide: WorkflowActionFactory, useValue: { get: jest.fn().mockReturnValue(mockWorkflowExecutor), }, @@ -63,10 +72,6 @@ describe('WorkflowExecutorWorkspaceService', () => { provide: WorkspaceEventEmitter, useValue: mockWorkspaceEventEmitter, }, - { - provide: ScopedWorkspaceContextFactory, - useValue: mockScopedWorkspaceContextFactory, - }, { provide: WorkflowRunWorkspaceService, useValue: mockWorkflowRunWorkspaceService, @@ -81,8 +86,8 @@ describe('WorkflowExecutorWorkspaceService', () => { service = module.get( WorkflowExecutorWorkspaceService, ); - workflowExecutorFactory = module.get( - WorkflowExecutorFactory, + workflowActionFactory = module.get( + WorkflowActionFactory, ); workspaceEventEmitter = module.get( WorkspaceEventEmitter, @@ -94,7 +99,8 @@ describe('WorkflowExecutorWorkspaceService', () => { describe('execute', () => { const mockWorkflowRunId = 'workflow-run-id'; - const mockContext = { data: 'some-data' }; + const mockWorkspaceId = 'workspace-id'; + const mockContext = { trigger: 'trigger-result' }; const mockSteps = [ { id: 'step-1', @@ -120,20 +126,9 @@ describe('WorkflowExecutorWorkspaceService', () => { }, ] as WorkflowAction[]; - it('should return success when all steps are completed', async () => { - // No steps to execute - const result = await service.execute({ - workflowRunId: mockWorkflowRunId, - currentStepId: 'step-2', - steps: mockSteps, - context: mockContext, - }); - - expect(result).toEqual({ - result: { - success: true, - }, - }); + mockWorkflowRunWorkspaceService.getWorkflowRun.mockReturnValue({ + output: { flow: { steps: mockSteps } }, + context: mockContext, }); it('should execute a step and continue to the next step on success', async () => { @@ -143,24 +138,22 @@ describe('WorkflowExecutorWorkspaceService', () => { mockWorkflowExecutor.execute.mockResolvedValueOnce(mockStepResult); - const result = await service.execute({ + await service.executeFromSteps({ workflowRunId: mockWorkflowRunId, + stepIds: ['step-1'], + workspaceId: mockWorkspaceId, + }); + + expect(workflowActionFactory.get).toHaveBeenCalledWith( + WorkflowActionType.CODE, + ); + + expect(mockWorkflowExecutor.execute).toHaveBeenCalledWith({ currentStepId: 'step-1', steps: mockSteps, context: mockContext, }); - // execute first step - expect(workflowExecutorFactory.get).toHaveBeenCalledWith( - WorkflowActionType.CODE, - ); - expect(mockWorkflowExecutor.execute).toHaveBeenCalledWith({ - workflowRunId: mockWorkflowRunId, - currentStepId: 'step-1', - steps: mockSteps, - context: mockContext, - attemptCount: 1, - }); expect(workspaceEventEmitter.emitCustomBatchEvent).toHaveBeenCalledWith( BILLING_FEATURE_USED, [ @@ -171,9 +164,11 @@ describe('WorkflowExecutorWorkspaceService', () => { ], 'workspace-id', ); + expect( workflowRunWorkspaceService.updateWorkflowRunStepStatus, ).toHaveBeenCalledTimes(2); + expect( workflowRunWorkspaceService.updateWorkflowRunStepStatus, ).toHaveBeenCalledWith({ @@ -182,9 +177,11 @@ describe('WorkflowExecutorWorkspaceService', () => { workspaceId: 'workspace-id', stepStatus: StepStatus.RUNNING, }); + expect( workflowRunWorkspaceService.saveWorkflowRunState, ).toHaveBeenCalledTimes(2); + expect( workflowRunWorkspaceService.saveWorkflowRunState, ).toHaveBeenCalledWith({ @@ -193,20 +190,12 @@ describe('WorkflowExecutorWorkspaceService', () => { id: 'step-1', output: mockStepResult, }, - context: { - data: 'some-data', - 'step-1': { - stepOutput: 'success', - }, - }, workspaceId: 'workspace-id', stepStatus: StepStatus.SUCCESS, }); - expect(result).toEqual({ result: { success: true } }); - // execute second step - expect(workflowExecutorFactory.get).toHaveBeenCalledWith( + expect(workflowActionFactory.get).toHaveBeenCalledWith( WorkflowActionType.SEND_EMAIL, ); }); @@ -216,20 +205,18 @@ describe('WorkflowExecutorWorkspaceService', () => { new Error('Step execution failed'), ); - const result = await service.execute({ + await service.executeFromSteps({ workflowRunId: mockWorkflowRunId, - currentStepId: 'step-1', - steps: mockSteps, - context: mockContext, + stepIds: ['step-1'], + workspaceId: mockWorkspaceId, }); - expect(result).toEqual({ - error: 'Step execution failed', - }); expect(workspaceEventEmitter.emitCustomBatchEvent).not.toHaveBeenCalled(); + expect( workflowRunWorkspaceService.updateWorkflowRunStepStatus, ).toHaveBeenCalledTimes(1); + expect( workflowRunWorkspaceService.updateWorkflowRunStepStatus, ).toHaveBeenCalledWith({ @@ -238,9 +225,11 @@ describe('WorkflowExecutorWorkspaceService', () => { workspaceId: 'workspace-id', stepStatus: StepStatus.RUNNING, }); + expect( workflowRunWorkspaceService.saveWorkflowRunState, ).toHaveBeenCalledTimes(1); + expect( workflowRunWorkspaceService.saveWorkflowRunState, ).toHaveBeenCalledWith({ @@ -251,7 +240,6 @@ describe('WorkflowExecutorWorkspaceService', () => { error: 'Step execution failed', }, }, - context: mockContext, workspaceId: 'workspace-id', stepStatus: StepStatus.FAILED, }); @@ -264,17 +252,16 @@ describe('WorkflowExecutorWorkspaceService', () => { mockWorkflowExecutor.execute.mockResolvedValueOnce(mockPendingEvent); - const result = await service.execute({ + await service.executeFromSteps({ workflowRunId: mockWorkflowRunId, - currentStepId: 'step-1', - steps: mockSteps, - context: mockContext, + stepIds: ['step-1'], + workspaceId: mockWorkspaceId, }); - expect(result).toEqual(mockPendingEvent); expect( workflowRunWorkspaceService.updateWorkflowRunStepStatus, ).toHaveBeenCalledTimes(1); + expect( workflowRunWorkspaceService.updateWorkflowRunStepStatus, ).toHaveBeenCalledWith({ @@ -283,9 +270,11 @@ describe('WorkflowExecutorWorkspaceService', () => { workspaceId: 'workspace-id', stepStatus: StepStatus.RUNNING, }); + expect( workflowRunWorkspaceService.saveWorkflowRunState, ).toHaveBeenCalledTimes(1); + expect( workflowRunWorkspaceService.saveWorkflowRunState, ).toHaveBeenCalledWith({ @@ -294,13 +283,12 @@ describe('WorkflowExecutorWorkspaceService', () => { id: 'step-1', output: mockPendingEvent, }, - context: mockContext, workspaceId: 'workspace-id', stepStatus: StepStatus.PENDING, }); // No recursive call to execute should happen - expect(workflowExecutorFactory.get).not.toHaveBeenCalledWith( + expect(workflowActionFactory.get).not.toHaveBeenCalledWith( WorkflowActionType.SEND_EMAIL, ); }); @@ -330,15 +318,19 @@ describe('WorkflowExecutorWorkspaceService', () => { }, ] as WorkflowAction[]; + mockWorkflowRunWorkspaceService.getWorkflowRun.mockReturnValueOnce({ + output: { flow: { steps: stepsWithContinueOnFailure } }, + context: mockContext, + }); + mockWorkflowExecutor.execute.mockResolvedValueOnce({ error: 'Step execution failed but continue', }); - const result = await service.execute({ + await service.executeFromSteps({ workflowRunId: mockWorkflowRunId, - currentStepId: 'step-1', - steps: stepsWithContinueOnFailure, - context: mockContext, + stepIds: ['step-1'], + workspaceId: mockWorkspaceId, }); expect( @@ -368,14 +360,12 @@ describe('WorkflowExecutorWorkspaceService', () => { error: 'Step execution failed but continue', }, }, - context: mockContext, workspaceId: 'workspace-id', stepStatus: StepStatus.FAILED, }); - expect(result).toEqual({ result: { success: true } }); // execute second step - expect(workflowExecutorFactory.get).toHaveBeenCalledWith( + expect(workflowActionFactory.get).toHaveBeenCalledWith( WorkflowActionType.SEND_EMAIL, ); }); @@ -394,122 +384,86 @@ describe('WorkflowExecutorWorkspaceService', () => { }, ] as WorkflowAction[]; - mockWorkflowExecutor.execute.mockResolvedValueOnce({ + mockWorkflowRunWorkspaceService.getWorkflowRun.mockReturnValue({ + output: { flow: { steps: stepsWithRetryOnFailure } }, + context: mockContext, + }); + + mockWorkflowExecutor.execute.mockResolvedValue({ error: 'Step execution failed, will retry', }); - await service.execute({ + await service.executeFromSteps({ workflowRunId: mockWorkflowRunId, - currentStepId: 'step-1', - steps: stepsWithRetryOnFailure, - context: mockContext, + stepIds: ['step-1'], + workspaceId: mockWorkspaceId, }); - // Should call execute again with increased attemptCount - expect(workflowExecutorFactory.get).toHaveBeenCalledWith( - WorkflowActionType.CODE, - ); - expect(workflowExecutorFactory.get).not.toHaveBeenCalledWith( + for (let attempt = 1; attempt <= 3; attempt++) { + expect(workflowActionFactory.get).toHaveBeenNthCalledWith( + attempt, + WorkflowActionType.CODE, + ); + } + + expect(workflowActionFactory.get).not.toHaveBeenCalledWith( WorkflowActionType.SEND_EMAIL, ); - expect(workflowExecutorFactory.get).toHaveBeenCalledTimes(2); - }); - - it('should stop retrying after MAX_RETRIES_ON_FAILURE', async () => { - const stepsWithRetryOnFailure = [ - { - id: 'step-1', - type: WorkflowActionType.CODE, - settings: { - errorHandlingOptions: { - continueOnFailure: { value: false }, - retryOnFailure: { value: true }, - }, - }, - }, - ] as WorkflowAction[]; - - const errorOutput = { - error: 'Step execution failed, max retries reached', - }; - - mockWorkflowExecutor.execute.mockResolvedValueOnce(errorOutput); - - const result = await service.execute({ - workflowRunId: mockWorkflowRunId, - currentStepId: 'step-1', - steps: stepsWithRetryOnFailure, - context: mockContext, - attemptCount: 3, // MAX_RETRIES_ON_FAILURE is 3 - }); - - // Should not retry anymore - expect(workflowExecutorFactory.get).toHaveBeenCalledTimes(1); - expect( - workflowRunWorkspaceService.updateWorkflowRunStepStatus, - ).toHaveBeenCalledTimes(1); - expect( - workflowRunWorkspaceService.updateWorkflowRunStepStatus, - ).toHaveBeenCalledWith({ - workflowRunId: mockWorkflowRunId, - stepId: 'step-1', - workspaceId: 'workspace-id', - stepStatus: StepStatus.RUNNING, - }); - expect( - workflowRunWorkspaceService.saveWorkflowRunState, - ).toHaveBeenCalledTimes(1); - expect( - workflowRunWorkspaceService.saveWorkflowRunState, - ).toHaveBeenCalledWith({ - workflowRunId: mockWorkflowRunId, - stepOutput: { - id: 'step-1', - output: errorOutput, - }, - context: mockContext, - workspaceId: 'workspace-id', - stepStatus: StepStatus.FAILED, - }); - expect(result).toEqual(errorOutput); }); it('should stop when billing validation fails', async () => { mockBillingService.isBillingEnabled.mockReturnValueOnce(true); mockBillingService.canBillMeteredProduct.mockReturnValueOnce(false); - const result = await service.execute({ + await service.executeFromSteps({ workflowRunId: mockWorkflowRunId, - currentStepId: 'step-1', - steps: mockSteps, - context: mockContext, + stepIds: ['step-1'], + workspaceId: mockWorkspaceId, }); - expect(workflowExecutorFactory.get).toHaveBeenCalledTimes(1); + expect(workflowActionFactory.get).toHaveBeenCalledTimes(0); + expect( workflowRunWorkspaceService.saveWorkflowRunState, ).toHaveBeenCalledTimes(1); - expect( - workflowRunWorkspaceService.updateWorkflowRunStepStatus, - ).not.toHaveBeenCalled(); + + expect(workflowRunWorkspaceService.endWorkflowRun).toHaveBeenCalledTimes( + 1, + ); + expect( workflowRunWorkspaceService.saveWorkflowRunState, ).toHaveBeenCalledWith({ workflowRunId: mockWorkflowRunId, + workspaceId: 'workspace-id', stepOutput: { id: 'step-1', output: { error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE, }, }, - context: mockContext, - workspaceId: 'workspace-id', stepStatus: StepStatus.FAILED, }); - expect(result).toEqual({ + + expect(workflowRunWorkspaceService.endWorkflowRun).toHaveBeenCalledWith({ + workflowRunId: mockWorkflowRunId, + workspaceId: 'workspace-id', + status: WorkflowRunStatus.FAILED, error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE, }); }); + + it('should return if step should not be executed', async () => { + (canExecuteStep as jest.Mock).mockReturnValueOnce(false); + + await service.executeFromSteps({ + workflowRunId: mockWorkflowRunId, + stepIds: ['step-1'], + workspaceId: mockWorkspaceId, + }); + + expect(workflowActionFactory.get).not.toHaveBeenCalled(); + }); }); describe('sendWorkflowNodeRunEvent', () => { diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts index 5eeaaae97..3c918b331 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts @@ -2,112 +2,103 @@ import { Injectable } from '@nestjs/common'; import { isDefined } from 'twenty-shared/utils'; -import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; - import { BILLING_FEATURE_USED } from 'src/engine/core-modules/billing/constants/billing-feature-used.constant'; import { BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE } from 'src/engine/core-modules/billing/constants/billing-workflow-execution-error-message.constant'; import { BillingMeterEventName } from 'src/engine/core-modules/billing/enums/billing-meter-event-names'; import { BillingProductKey } from 'src/engine/core-modules/billing/enums/billing-product-key.enum'; import { BillingService } from 'src/engine/core-modules/billing/services/billing.service'; import { BillingUsageEvent } from 'src/engine/core-modules/billing/types/billing-usage-event.type'; -import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter'; import { StepOutput, - WorkflowRunOutput, WorkflowRunStatus, } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; -import { WorkflowExecutorFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-executor.factory'; -import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; -import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; +import { WorkflowActionFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-action.factory'; +import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type'; import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service'; -import { - WorkflowTriggerException, - WorkflowTriggerExceptionCode, -} from 'src/modules/workflow/workflow-trigger/exceptions/workflow-trigger.exception'; import { StepStatus } from 'src/modules/workflow/workflow-executor/types/workflow-run-step-info.type'; +import { + WorkflowBranchExecutorInput, + WorkflowExecutorInput, +} from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; +import { canExecuteStep } from 'src/modules/workflow/workflow-executor/utils/can-execute-step.utils'; const MAX_RETRIES_ON_FAILURE = 3; -export type WorkflowExecutorState = { - stepsOutput: WorkflowRunOutput['stepsOutput']; - status: WorkflowRunStatus; -}; - @Injectable() -export class WorkflowExecutorWorkspaceService implements WorkflowExecutor { +export class WorkflowExecutorWorkspaceService { constructor( - private readonly workflowExecutorFactory: WorkflowExecutorFactory, + private readonly workflowActionFactory: WorkflowActionFactory, private readonly workspaceEventEmitter: WorkspaceEventEmitter, - private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory, private readonly workflowRunWorkspaceService: WorkflowRunWorkspaceService, private readonly billingService: BillingService, ) {} - async execute({ - currentStepId, - steps, - context, + async executeFromSteps({ + stepIds, + workflowRunId, + workspaceId, + }: WorkflowExecutorInput) { + await Promise.all( + stepIds.map(async (stepIdToExecute) => { + await this.executeFromStep({ + stepId: stepIdToExecute, + workflowRunId, + workspaceId, + }); + }), + ); + } + + private async executeFromStep({ + stepId, attemptCount = 1, workflowRunId, - }: WorkflowExecutorInput): Promise { - const step = steps.find((step) => step.id === currentStepId); + workspaceId, + }: WorkflowBranchExecutorInput) { + const workflowRunInfo = await this.getWorkflowRunInfoOrEndWorkflowRun({ + stepId: stepId, + workflowRunId, + workspaceId, + }); - if (!step) { - return { - error: 'Step not found', - }; + if (!isDefined(workflowRunInfo)) { + return; } - const workflowExecutor = this.workflowExecutorFactory.get(step.type); + const { stepToExecute, steps, context } = workflowRunInfo; - let actionOutput: WorkflowExecutorOutput; - - const { workspaceId } = this.scopedWorkspaceContextFactory.create(); - - if (!workspaceId) { - throw new WorkflowTriggerException( - 'No workspace id found', - WorkflowTriggerExceptionCode.INTERNAL_ERROR, - ); + if (!canExecuteStep({ stepId: stepToExecute.id, steps, context })) { + return; } - if ( - this.billingService.isBillingEnabled() && - !(await this.canBillWorkflowNodeExecution(workspaceId)) - ) { - const billingOutput = { - error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE, - }; - - await this.workflowRunWorkspaceService.saveWorkflowRunState({ - workspaceId, + const checkCanBillWorkflowNodeExecution = + await this.checkCanBillWorkflowNodeExecutionOrEndWorkflowRun({ + stepIdToExecute: stepToExecute.id, workflowRunId, - stepOutput: { - id: step.id, - output: billingOutput, - }, - context, - stepStatus: StepStatus.FAILED, + workspaceId, }); - return billingOutput; + if (!checkCanBillWorkflowNodeExecution) { + return; } + const workflowAction = this.workflowActionFactory.get(stepToExecute.type); + + let actionOutput: WorkflowActionOutput; + await this.workflowRunWorkspaceService.updateWorkflowRunStepStatus({ workflowRunId, - stepId: step.id, + stepId: stepToExecute.id, workspaceId, stepStatus: StepStatus.RUNNING, }); try { - actionOutput = await workflowExecutor.execute({ - currentStepId, + actionOutput = await workflowAction.execute({ + currentStepId: stepId, steps, context, - attemptCount, - workflowRunId, }); } catch (error) { actionOutput = { @@ -120,7 +111,7 @@ export class WorkflowExecutorWorkspaceService implements WorkflowExecutor { } const stepOutput: StepOutput = { - id: step.id, + id: stepToExecute.id, output: actionOutput, }; @@ -128,73 +119,145 @@ export class WorkflowExecutorWorkspaceService implements WorkflowExecutor { await this.workflowRunWorkspaceService.saveWorkflowRunState({ workflowRunId, stepOutput, - context, workspaceId, stepStatus: StepStatus.PENDING, }); - return actionOutput; + return; } const actionOutputSuccess = isDefined(actionOutput.result); const shouldContinue = actionOutputSuccess || - step.settings.errorHandlingOptions.continueOnFailure.value; + stepToExecute.settings.errorHandlingOptions.continueOnFailure.value; if (shouldContinue) { - const updatedContext = isDefined(actionOutput.result) - ? { - ...context, - [step.id]: actionOutput.result, - } - : context; - await this.workflowRunWorkspaceService.saveWorkflowRunState({ workflowRunId, stepOutput, - context: updatedContext, workspaceId, stepStatus: isDefined(actionOutput.result) ? StepStatus.SUCCESS : StepStatus.FAILED, }); - if (!isDefined(step.nextStepIds?.[0])) { - return actionOutput; + if ( + !isDefined(stepToExecute.nextStepIds) || + stepToExecute.nextStepIds.length === 0 + ) { + await this.workflowRunWorkspaceService.endWorkflowRun({ + workflowRunId, + workspaceId, + status: WorkflowRunStatus.COMPLETED, + }); + + return; } - // TODO: handle multiple next steps - return await this.execute({ + await this.executeFromSteps({ + stepIds: stepToExecute.nextStepIds, workflowRunId, - currentStepId: step.nextStepIds[0], - steps, - context: updatedContext, + workspaceId, }); + + return; } if ( - step.settings.errorHandlingOptions.retryOnFailure.value && + stepToExecute.settings.errorHandlingOptions.retryOnFailure.value && attemptCount < MAX_RETRIES_ON_FAILURE ) { - return await this.execute({ - workflowRunId, - currentStepId, - steps, - context, + await this.executeFromStep({ + stepId, attemptCount: attemptCount + 1, + workflowRunId, + workspaceId, }); + + return; } await this.workflowRunWorkspaceService.saveWorkflowRunState({ workflowRunId, stepOutput, - context, workspaceId, stepStatus: StepStatus.FAILED, }); - return actionOutput; + await this.workflowRunWorkspaceService.endWorkflowRun({ + workflowRunId, + workspaceId, + status: WorkflowRunStatus.FAILED, + error: stepOutput.output.error, + }); + } + + private async getWorkflowRunInfoOrEndWorkflowRun({ + stepId, + workflowRunId, + workspaceId, + }: { + stepId: string; + workflowRunId: string; + workspaceId: string; + }) { + const workflowRun = await this.workflowRunWorkspaceService.getWorkflowRun({ + workflowRunId, + workspaceId, + }); + + if (!isDefined(workflowRun)) { + await this.workflowRunWorkspaceService.endWorkflowRun({ + workflowRunId, + workspaceId, + status: WorkflowRunStatus.FAILED, + error: `WorkflowRun ${workflowRunId} not found`, + }); + + return; + } + + const steps = workflowRun.output?.flow.steps; + + const context = workflowRun.context; + + if (!isDefined(steps)) { + await this.workflowRunWorkspaceService.endWorkflowRun({ + workflowRunId, + workspaceId, + status: WorkflowRunStatus.FAILED, + error: 'Steps undefined', + }); + + return; + } + + if (!isDefined(context)) { + await this.workflowRunWorkspaceService.endWorkflowRun({ + workflowRunId, + workspaceId, + status: WorkflowRunStatus.FAILED, + error: 'Context not found', + }); + + return; + } + + const stepToExecute = steps.find((step) => step.id === stepId); + + if (!stepToExecute) { + await this.workflowRunWorkspaceService.endWorkflowRun({ + workflowRunId, + workspaceId, + status: WorkflowRunStatus.FAILED, + error: 'Step not found', + }); + + return; + } + + return { stepToExecute, steps, context }; } private sendWorkflowNodeRunEvent(workspaceId: string) { @@ -210,10 +273,45 @@ export class WorkflowExecutorWorkspaceService implements WorkflowExecutor { ); } - private async canBillWorkflowNodeExecution(workspaceId: string) { - return this.billingService.canBillMeteredProduct( - workspaceId, - BillingProductKey.WORKFLOW_NODE_EXECUTION, - ); + private async checkCanBillWorkflowNodeExecutionOrEndWorkflowRun({ + stepIdToExecute, + workflowRunId, + workspaceId, + }: { + stepIdToExecute: string; + workflowRunId: string; + workspaceId: string; + }) { + const canBillWorkflowNodeExecution = + !this.billingService.isBillingEnabled() || + (await this.billingService.canBillMeteredProduct( + workspaceId, + BillingProductKey.WORKFLOW_NODE_EXECUTION, + )); + + if (!canBillWorkflowNodeExecution) { + const billingOutput = { + error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE, + }; + + await this.workflowRunWorkspaceService.saveWorkflowRunState({ + workspaceId, + workflowRunId, + stepOutput: { + id: stepIdToExecute, + output: billingOutput, + }, + stepStatus: StepStatus.FAILED, + }); + + await this.workflowRunWorkspaceService.endWorkflowRun({ + workflowRunId, + workspaceId, + status: WorkflowRunStatus.FAILED, + error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE, + }); + } + + return canBillWorkflowNodeExecution; } } diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts index 5ce119bf4..c3cf95fe5 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts @@ -1,5 +1,7 @@ import { Scope } from '@nestjs/common'; +import { isDefined } from 'twenty-shared/utils'; + import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator'; import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; @@ -9,7 +11,6 @@ import { ThrottlerService } from 'src/engine/core-modules/throttler/throttler.se import { TwentyConfigService } from 'src/engine/core-modules/twenty-config/twenty-config.service'; import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workspace-services/workflow-common.workspace-service'; -import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; import { WorkflowExecutorWorkspaceService } from 'src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service'; import { WorkflowRunException, @@ -107,17 +108,6 @@ export class RunWorkflowJob { await this.workflowRunWorkspaceService.startWorkflowRun({ workflowRunId, workspaceId, - output: { - flow: { - trigger: workflowVersion.trigger, - steps: workflowVersion.steps, - }, - stepsOutput: { - trigger: { - result: triggerPayload, - }, - }, - }, payload: triggerPayload, }); @@ -125,13 +115,9 @@ export class RunWorkflowJob { const rootSteps = getRootSteps(workflowVersion.steps); - await this.executeWorkflow({ + await this.workflowExecutorWorkspaceService.executeFromSteps({ + stepIds: rootSteps.map((step) => step.id), workflowRunId, - currentStepId: rootSteps[0].id, - steps: workflowVersion.steps, - context: workflowRun.context ?? { - trigger: triggerPayload, - }, workspaceId, }); } @@ -169,9 +155,10 @@ export class RunWorkflowJob { ); } - const nextStepId = lastExecutedStep.nextStepIds?.[0]; - - if (!nextStepId) { + if ( + !isDefined(lastExecutedStep.nextStepIds) || + lastExecutedStep.nextStepIds.length === 0 + ) { await this.workflowRunWorkspaceService.endWorkflowRun({ workflowRunId, workspaceId, @@ -181,46 +168,10 @@ export class RunWorkflowJob { return; } - await this.executeWorkflow({ - workflowRunId, - currentStepId: nextStepId, - steps: workflowRun.output?.flow?.steps ?? [], - context: workflowRun.context ?? {}, - workspaceId, - }); - } - - private async executeWorkflow({ - workflowRunId, - currentStepId, - steps, - context, - workspaceId, - }: { - workflowRunId: string; - currentStepId: string; - steps: WorkflowAction[]; - // eslint-disable-next-line @typescript-eslint/no-explicit-any - context: Record; - workspaceId: string; - }) { - const { error, pendingEvent } = - await this.workflowExecutorWorkspaceService.execute({ - workflowRunId, - currentStepId, - steps, - context, - }); - - if (pendingEvent) { - return; - } - - await this.workflowRunWorkspaceService.endWorkflowRun({ + await this.workflowExecutorWorkspaceService.executeFromSteps({ + stepIds: lastExecutedStep.nextStepIds, workflowRunId, workspaceId, - status: error ? WorkflowRunStatus.FAILED : WorkflowRunStatus.COMPLETED, - error, }); } diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.module.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.module.ts index b0d6feb81..0dd770c7f 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.module.ts @@ -8,12 +8,14 @@ import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadat import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module'; import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service'; +import { CacheLockModule } from 'src/engine/core-modules/cache-lock/cache-lock.module'; @Module({ imports: [ WorkflowCommonModule, NestjsQueryTypeOrmModule.forFeature([ObjectMetadataEntity], 'core'), RecordPositionModule, + CacheLockModule, MetricsModule, ], providers: [WorkflowRunWorkspaceService, ScopedWorkspaceContextFactory], diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts index 775618827..1da904af4 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts @@ -4,6 +4,7 @@ import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; import { v4 } from 'uuid'; import { isDefined } from 'twenty-shared/utils'; +import { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity'; import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action'; import { objectRecordChangedValues } from 'src/engine/core-modules/event-emitter/utils/object-record-changed-values'; @@ -18,7 +19,6 @@ import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/worksp import { StepOutput, WorkflowRunState, - WorkflowRunOutput, WorkflowRunStatus, WorkflowRunWorkspaceEntity, } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; @@ -30,6 +30,7 @@ import { } from 'src/modules/workflow/workflow-runner/exceptions/workflow-run.exception'; import { StepStatus } from 'src/modules/workflow/workflow-executor/types/workflow-run-step-info.type'; import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; +import { CacheLockService } from 'src/engine/core-modules/cache-lock/cache-lock.service'; @Injectable() export class WorkflowRunWorkspaceService { @@ -42,6 +43,7 @@ export class WorkflowRunWorkspaceService { private readonly objectMetadataRepository: Repository, private readonly recordPositionService: RecordPositionService, private readonly metricsService: MetricsService, + private readonly cacheLockService: CacheLockService, ) {} async createWorkflowRun({ @@ -116,6 +118,8 @@ export class WorkflowRunWorkspaceService { workspaceId, }); + const initState = this.getInitState(workflowVersion); + const workflowRun = workflowRunRepository.create({ id: workflowRunId ?? v4(), name: `#${workflowRunCount + 1} - ${workflow.name}`, @@ -124,7 +128,11 @@ export class WorkflowRunWorkspaceService { workflowId: workflow.id, status, position, - state: this.getInitState(workflowVersion), + state: initState, + output: { + ...initState, + stepsOutput: {}, + }, context, }); @@ -133,35 +141,31 @@ export class WorkflowRunWorkspaceService { return workflowRun.id; } - async startWorkflowRun({ + async startWorkflowRun(params: { + workflowRunId: string; + workspaceId: string; + payload: object; + }) { + await this.cacheLockService.withLock( + async () => await this.startWorkflowRunWithoutLock(params), + params.workflowRunId, + ); + } + + private async startWorkflowRunWithoutLock({ workflowRunId, workspaceId, - output, payload, }: { workflowRunId: string; workspaceId: string; - output: WorkflowRunOutput; payload: object; }) { - const workflowRunRepository = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - workspaceId, - 'workflowRun', - { shouldBypassPermissionChecks: true }, - ); - - const workflowRunToUpdate = await workflowRunRepository.findOneBy({ - id: workflowRunId, + const workflowRunToUpdate = await this.getWorkflowRunOrFail({ + workflowRunId, + workspaceId, }); - if (!workflowRunToUpdate) { - throw new WorkflowRunException( - 'No workflow run to start', - WorkflowRunExceptionCode.WORKFLOW_RUN_NOT_FOUND, - ); - } - if ( workflowRunToUpdate.status !== WorkflowRunStatus.ENQUEUED && workflowRunToUpdate.status !== WorkflowRunStatus.NOT_STARTED @@ -175,29 +179,47 @@ export class WorkflowRunWorkspaceService { const partialUpdate = { status: WorkflowRunStatus.RUNNING, startedAt: new Date().toISOString(), - output, + output: { + ...workflowRunToUpdate.output, + stepsOutput: { + trigger: { + result: payload, + }, + }, + }, state: { ...workflowRunToUpdate.state, stepInfos: { ...workflowRunToUpdate.state?.stepInfos, trigger: { - ...workflowRunToUpdate.state?.stepInfos.trigger, status: StepStatus.SUCCESS, result: payload, }, }, }, + context: payload + ? { + trigger: payload, + } + : (workflowRunToUpdate.context ?? {}), }; - await workflowRunRepository.update(workflowRunToUpdate.id, partialUpdate); - - await this.emitWorkflowRunUpdatedEvent({ - workflowRunBefore: workflowRunToUpdate, - updatedFields: ['status', 'startedAt', 'output'], - }); + await this.updateWorkflowRun({ workflowRunId, workspaceId, partialUpdate }); } - async endWorkflowRun({ + async endWorkflowRun(params: { + workflowRunId: string; + workspaceId: string; + status: WorkflowRunStatus; + error?: string; + }) { + await this.cacheLockService.withLock( + async () => await this.endWorkflowRunWithoutLock(params), + params.workflowRunId, + ); + } + + private async endWorkflowRunWithoutLock({ workflowRunId, workspaceId, status, @@ -208,29 +230,16 @@ export class WorkflowRunWorkspaceService { status: WorkflowRunStatus; error?: string; }) { - const workflowRunRepository = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - workspaceId, - 'workflowRun', - { shouldBypassPermissionChecks: true }, - ); - - const workflowRunToUpdate = await workflowRunRepository.findOneBy({ - id: workflowRunId, + const workflowRunToUpdate = await this.getWorkflowRunOrFail({ + workflowRunId, + workspaceId, }); - if (!workflowRunToUpdate) { - throw new WorkflowRunException( - 'No workflow run to end', - WorkflowRunExceptionCode.WORKFLOW_RUN_NOT_FOUND, - ); - } - const partialUpdate = { status, endedAt: new Date().toISOString(), output: { - ...(workflowRunToUpdate.output ?? {}), + ...workflowRunToUpdate.output, error, }, state: { @@ -239,12 +248,7 @@ export class WorkflowRunWorkspaceService { }, }; - await workflowRunRepository.update(workflowRunToUpdate.id, partialUpdate); - - await this.emitWorkflowRunUpdatedEvent({ - workflowRunBefore: workflowRunToUpdate, - updatedFields: ['status', 'endedAt', 'output', 'state'], - }); + await this.updateWorkflowRun({ workflowRunId, workspaceId, partialUpdate }); await this.metricsService.incrementCounter({ key: @@ -255,7 +259,19 @@ export class WorkflowRunWorkspaceService { }); } - async updateWorkflowRunStepStatus({ + async updateWorkflowRunStepStatus(params: { + workflowRunId: string; + stepId: string; + workspaceId: string; + stepStatus: StepStatus; + }) { + await this.cacheLockService.withLock( + async () => await this.updateWorkflowRunStepStatusWithoutLock(params), + params.workflowRunId, + ); + } + + private async updateWorkflowRunStepStatusWithoutLock({ workflowRunId, workspaceId, stepId, @@ -266,24 +282,11 @@ export class WorkflowRunWorkspaceService { workspaceId: string; stepStatus: StepStatus; }) { - const workflowRunRepository = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - workspaceId, - 'workflowRun', - { shouldBypassPermissionChecks: true }, - ); - - const workflowRunToUpdate = await workflowRunRepository.findOneBy({ - id: workflowRunId, + const workflowRunToUpdate = await this.getWorkflowRunOrFail({ + workflowRunId, + workspaceId, }); - if (!workflowRunToUpdate) { - throw new WorkflowRunException( - 'No workflow run to save', - WorkflowRunExceptionCode.WORKFLOW_RUN_NOT_FOUND, - ); - } - const partialUpdate = { state: { ...workflowRunToUpdate.state, @@ -297,46 +300,37 @@ export class WorkflowRunWorkspaceService { }, }; - await workflowRunRepository.update(workflowRunId, partialUpdate); - - await this.emitWorkflowRunUpdatedEvent({ - workflowRunBefore: workflowRunToUpdate, - updatedFields: ['state'], - }); + await this.updateWorkflowRun({ workflowRunId, workspaceId, partialUpdate }); } - async saveWorkflowRunState({ + async saveWorkflowRunState(params: { + workflowRunId: string; + stepOutput: StepOutput; + workspaceId: string; + stepStatus: StepStatus; + }) { + await this.cacheLockService.withLock( + async () => await this.saveWorkflowRunStateWithoutLock(params), + params.workflowRunId, + ); + } + + private async saveWorkflowRunStateWithoutLock({ workflowRunId, stepOutput, workspaceId, - context, stepStatus, }: { workflowRunId: string; stepOutput: StepOutput; workspaceId: string; - // eslint-disable-next-line @typescript-eslint/no-explicit-any - context: Record; stepStatus: StepStatus; }) { - const workflowRunRepository = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - workspaceId, - 'workflowRun', - { shouldBypassPermissionChecks: true }, - ); - - const workflowRunToUpdate = await workflowRunRepository.findOneBy({ - id: workflowRunId, + const workflowRunToUpdate = await this.getWorkflowRunOrFail({ + workflowRunId, + workspaceId, }); - if (!workflowRunToUpdate) { - throw new WorkflowRunException( - 'No workflow run to save', - WorkflowRunExceptionCode.WORKFLOW_RUN_NOT_FOUND, - ); - } - const partialUpdate = { output: { flow: workflowRunToUpdate.output?.flow ?? { @@ -353,24 +347,38 @@ export class WorkflowRunWorkspaceService { stepInfos: { ...workflowRunToUpdate.state?.stepInfos, [stepOutput.id]: { + ...(workflowRunToUpdate.state?.stepInfos[stepOutput.id] || {}), result: stepOutput.output?.result, error: stepOutput.output?.error, status: stepStatus, }, }, }, - context, + ...(stepStatus === StepStatus.SUCCESS + ? { + context: { + ...workflowRunToUpdate.context, + [stepOutput.id]: stepOutput.output.result, + }, + } + : {}), }; - await workflowRunRepository.update(workflowRunId, partialUpdate); - - await this.emitWorkflowRunUpdatedEvent({ - workflowRunBefore: workflowRunToUpdate, - updatedFields: ['context', 'output', 'state'], - }); + await this.updateWorkflowRun({ workflowRunId, workspaceId, partialUpdate }); } - async updateWorkflowRunStep({ + async updateWorkflowRunStep(params: { + workflowRunId: string; + step: WorkflowAction; + workspaceId: string; + }) { + await this.cacheLockService.withLock( + async () => await this.updateWorkflowRunStepWithoutLock(params), + params.workflowRunId, + ); + } + + private async updateWorkflowRunStepWithoutLock({ workflowRunId, step, workspaceId, @@ -379,24 +387,11 @@ export class WorkflowRunWorkspaceService { step: WorkflowAction; workspaceId: string; }) { - const workflowRunRepository = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - workspaceId, - 'workflowRun', - { shouldBypassPermissionChecks: true }, - ); - - const workflowRunToUpdate = await workflowRunRepository.findOneBy({ - id: workflowRunId, + const workflowRunToUpdate = await this.getWorkflowRunOrFail({ + workflowRunId, + workspaceId, }); - if (!workflowRunToUpdate) { - throw new WorkflowRunException( - 'No workflow run to update', - WorkflowRunExceptionCode.WORKFLOW_RUN_NOT_FOUND, - ); - } - if ( workflowRunToUpdate.status === WorkflowRunStatus.COMPLETED || workflowRunToUpdate.status === WorkflowRunStatus.FAILED @@ -428,11 +423,25 @@ export class WorkflowRunWorkspaceService { }, }; - await workflowRunRepository.update(workflowRunToUpdate.id, partialUpdate); + await this.updateWorkflowRun({ workflowRunId, workspaceId, partialUpdate }); + } - await this.emitWorkflowRunUpdatedEvent({ - workflowRunBefore: workflowRunToUpdate, - updatedFields: ['output', 'state'], + async getWorkflowRun({ + workflowRunId, + workspaceId, + }: { + workflowRunId: string; + workspaceId: string; + }): Promise { + const workflowRunRepository = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + workspaceId, + 'workflowRun', + { shouldBypassPermissionChecks: true }, + ); + + return await workflowRunRepository.findOne({ + where: { id: workflowRunId }, }); } @@ -443,15 +452,9 @@ export class WorkflowRunWorkspaceService { workflowRunId: string; workspaceId: string; }): Promise { - const workflowRunRepository = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - workspaceId, - 'workflowRun', - { shouldBypassPermissionChecks: true }, - ); - - const workflowRun = await workflowRunRepository.findOne({ - where: { id: workflowRunId }, + const workflowRun = await this.getWorkflowRun({ + workflowRunId, + workspaceId, }); if (!workflowRun) { @@ -560,4 +563,43 @@ export class WorkflowRunWorkspaceService { }, }; } + + private async updateWorkflowRun({ + workflowRunId, + workspaceId, + partialUpdate, + }: { + workflowRunId: string; + workspaceId: string; + partialUpdate: QueryDeepPartialEntity; + }) { + const workflowRunRepository = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + workspaceId, + 'workflowRun', + { shouldBypassPermissionChecks: true }, + ); + + const workflowRunToUpdate = await workflowRunRepository.findOneBy({ + id: workflowRunId, + }); + + if (!workflowRunToUpdate) { + throw new WorkflowRunException( + `workflowRun ${workflowRunId} not found`, + WorkflowRunExceptionCode.WORKFLOW_RUN_NOT_FOUND, + ); + } + + await workflowRunRepository.update(workflowRunToUpdate.id, partialUpdate); + + const updatedFields = Object.keys(partialUpdate); + + if (updatedFields.length > 0) { + await this.emitWorkflowRunUpdatedEvent({ + workflowRunBefore: workflowRunToUpdate, + updatedFields: updatedFields, + }); + } + } }