From 2f7d8c76afda06fc0991c4799537f25dd730d9f4 Mon Sep 17 00:00:00 2001 From: martmull Date: Mon, 7 Jul 2025 22:50:34 +0200 Subject: [PATCH] 22 branches 2 (#13051) This PR is purely technical, it does produces any functional change to the user - add Lock mecanism to run steps concurrently - update `workflow-executor.workspace-service.ts` to handle multi branch workflow execution - stop passing `context` through steps, it causes race condition issue - refactor a little bit - simplify `workflow-run.workspace-service.ts` to prepare `output` and `context` removal - move workflowRun status computing from `run-workflow.job.ts` to `workflow-executor.workspace-service.ts` ## NOTA BENE When a code step depends of 2 parents like in this config (see image below) If the form is submitted before the "Code - 2s" step succeed, the branch merge "Form" step is launched twice. - once because form is submission Succeed resumes the workflow in an asynchronous job - the second time is when the asynchronous job is launched when "Code - 2s" is succeeded - the merge "Form" step makes the workflow waiting for response to trigger the resume in another job - during that time, the first resume job is launched, running the merge "Form" step again This issue only occurs with branch workflows. It will be solved by checking if the currentStepToExecute is already in a SUCCESS state or not image --- .../__tests__/cache-lock.service.spec.ts | 108 ++++++ .../cache-lock/cache-lock.module.ts | 10 + .../cache-lock/cache-lock.service.ts | 47 +++ .../services/cache-storage.service.ts | 23 ++ .../types/cache-storage-namespace.enum.ts | 1 + .../workflow-run.workspace-entity.ts | 6 +- ...workflow-version-step.workspace-service.ts | 6 - ....factory.ts => workflow-action.factory.ts} | 6 +- .../interfaces/workflow-action.interface.ts | 8 + .../interfaces/workflow-executor.interface.ts | 8 - .../types/workflow-action-input.ts | 7 + ...type.ts => workflow-action-output.type.ts} | 2 +- .../types/workflow-executor-input.ts | 15 +- .../__tests__/can-execute-step.util.spec.ts | 90 +++++ .../utils/can-execute-step.utils.ts | 23 ++ .../ai-agent/ai-agent.workflow-action.ts | 10 +- .../code/code.workflow-action.ts | 10 +- .../filter/filter.workflow-action.ts | 10 +- .../form/form.workflow-action.ts | 10 +- .../http-request.workflow-action.ts | 10 +- .../mail-sender/send-email.workflow-action.ts | 10 +- .../create-record.workflow-action.ts | 10 +- .../delete-record.workflow-action.ts | 10 +- .../find-records.workflow-action.ts | 10 +- .../update-record.workflow-action.ts | 10 +- .../workflow-executor.module.ts | 4 +- ...orkflow-executor.workspace-service.spec.ts | 274 +++++++--------- .../workflow-executor.workspace-service.ts | 284 ++++++++++------ .../workflow-runner/jobs/run-workflow.job.ts | 69 +--- .../workflow-run/workflow-run.module.ts | 2 + .../workflow-run.workspace-service.ts | 308 ++++++++++-------- 31 files changed, 877 insertions(+), 524 deletions(-) create mode 100644 packages/twenty-server/src/engine/core-modules/cache-lock/__tests__/cache-lock.service.spec.ts create mode 100644 packages/twenty-server/src/engine/core-modules/cache-lock/cache-lock.module.ts create mode 100644 packages/twenty-server/src/engine/core-modules/cache-lock/cache-lock.service.ts rename packages/twenty-server/src/modules/workflow/workflow-executor/factories/{workflow-executor.factory.ts => workflow-action.factory.ts} (94%) create mode 100644 packages/twenty-server/src/modules/workflow/workflow-executor/interfaces/workflow-action.interface.ts delete mode 100644 packages/twenty-server/src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-action-input.ts rename packages/twenty-server/src/modules/workflow/workflow-executor/types/{workflow-executor-output.type.ts => workflow-action-output.type.ts} (62%) create mode 100644 packages/twenty-server/src/modules/workflow/workflow-executor/utils/__tests__/can-execute-step.util.spec.ts create mode 100644 packages/twenty-server/src/modules/workflow/workflow-executor/utils/can-execute-step.utils.ts diff --git a/packages/twenty-server/src/engine/core-modules/cache-lock/__tests__/cache-lock.service.spec.ts b/packages/twenty-server/src/engine/core-modules/cache-lock/__tests__/cache-lock.service.spec.ts new file mode 100644 index 000000000..34fccb744 --- /dev/null +++ b/packages/twenty-server/src/engine/core-modules/cache-lock/__tests__/cache-lock.service.spec.ts @@ -0,0 +1,108 @@ +import { Test, TestingModule } from '@nestjs/testing'; + +import { CacheLockService } from 'src/engine/core-modules/cache-lock/cache-lock.service'; +import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service'; +import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum'; + +describe('CacheLockService', () => { + let service: CacheLockService; + let cacheStorageService: jest.Mocked; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [ + CacheLockService, + { + provide: CacheStorageNamespace.EngineLock, + useValue: { + acquireLock: jest.fn(), + releaseLock: jest.fn(), + }, + }, + { + provide: CacheStorageService, + useValue: { + acquireLock: jest.fn(), + releaseLock: jest.fn(), + }, + }, + ], + }).compile(); + + service = module.get(CacheLockService); + cacheStorageService = module.get(CacheStorageNamespace.EngineLock); + jest.spyOn(service, 'delay').mockResolvedValue(undefined); + }); + + it('should be defined', () => { + expect(service).toBeDefined(); + }); + + it('should acquire the lock and execute the function', async () => { + cacheStorageService.acquireLock.mockResolvedValue(true); + cacheStorageService.releaseLock.mockResolvedValue(undefined); + + const fn = jest.fn().mockResolvedValue('success'); + + const ttl = 100; + + const result = await service.withLock(fn, 'key', { + ttl, + }); + + expect(result).toBe('success'); + expect(fn).toHaveBeenCalled(); + expect(cacheStorageService.acquireLock).toHaveBeenCalledTimes(1); + expect(cacheStorageService.acquireLock).toHaveBeenCalledWith('key', ttl); + expect(cacheStorageService.releaseLock).toHaveBeenCalledTimes(1); + expect(cacheStorageService.releaseLock).toHaveBeenCalledWith('key'); + }); + + it('should throw an error if lock cannot be acquired after max retries', async () => { + cacheStorageService.acquireLock.mockResolvedValue(false); + + const fn = jest.fn(); + const ms = 1; + const maxRetries = 3; + + await expect( + service.withLock(fn, 'key', { ms, maxRetries }), + ).rejects.toThrow('Failed to acquire lock for key: key'); + + expect(cacheStorageService.acquireLock).toHaveBeenCalledTimes(maxRetries); + expect(fn).not.toHaveBeenCalled(); + }); + + it('should retry before acquiring the lock', async () => { + const mockAcquireLock = cacheStorageService.acquireLock; + + mockAcquireLock + .mockResolvedValueOnce(false) + .mockResolvedValueOnce(false) + .mockResolvedValueOnce(true); + + const fn = jest.fn().mockResolvedValue('retried success'); + + const result = await service.withLock(fn, 'key', { + maxRetries: 5, + ms: 1, + }); + + expect(result).toBe('retried success'); + expect(fn).toHaveBeenCalledTimes(1); + expect(mockAcquireLock).toHaveBeenCalledTimes(3); + expect(cacheStorageService.releaseLock).toHaveBeenCalledWith('key'); + }); + + it('should release the lock even if the function throws', async () => { + cacheStorageService.acquireLock.mockResolvedValue(true); + cacheStorageService.releaseLock.mockResolvedValue(undefined); + + const fn = jest.fn().mockRejectedValue(new Error('fail')); + + await expect(service.withLock(fn, 'key')).rejects.toThrow('fail'); + + expect(fn).toHaveBeenCalled(); + expect(cacheStorageService.releaseLock).toHaveBeenCalledWith('key'); + }); +}); diff --git a/packages/twenty-server/src/engine/core-modules/cache-lock/cache-lock.module.ts b/packages/twenty-server/src/engine/core-modules/cache-lock/cache-lock.module.ts new file mode 100644 index 000000000..ac32dd0e6 --- /dev/null +++ b/packages/twenty-server/src/engine/core-modules/cache-lock/cache-lock.module.ts @@ -0,0 +1,10 @@ +import { Module } from '@nestjs/common'; + +import { CacheLockService } from 'src/engine/core-modules/cache-lock/cache-lock.service'; + +@Module({ + imports: [], + providers: [CacheLockService], + exports: [CacheLockService], +}) +export class CacheLockModule {} diff --git a/packages/twenty-server/src/engine/core-modules/cache-lock/cache-lock.service.ts b/packages/twenty-server/src/engine/core-modules/cache-lock/cache-lock.service.ts new file mode 100644 index 000000000..4e117cc38 --- /dev/null +++ b/packages/twenty-server/src/engine/core-modules/cache-lock/cache-lock.service.ts @@ -0,0 +1,47 @@ +import { Injectable } from '@nestjs/common'; + +import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decorators/cache-storage.decorator'; +import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum'; +import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service'; + +export type CacheLockOptions = { + ms?: number; + maxRetries?: number; + ttl?: number; +}; + +@Injectable() +export class CacheLockService { + constructor( + @InjectCacheStorage(CacheStorageNamespace.EngineLock) + private readonly cacheStorageService: CacheStorageService, + ) {} + + async delay(ms: number) { + return new Promise((res) => setTimeout(res, ms)); + } + + async withLock( + fn: () => Promise, + key: string, + options?: CacheLockOptions, + ): Promise { + const { ms = 50, maxRetries = 20, ttl = 500 } = options || {}; + + for (let attempt = 0; attempt < maxRetries; attempt++) { + const acquired = await this.cacheStorageService.acquireLock(key, ttl); + + if (acquired) { + try { + return await fn(); + } finally { + await this.cacheStorageService.releaseLock(key); + } + } + + await this.delay(ms); + } + + throw new Error(`Failed to acquire lock for key: ${key}`); + } +} diff --git a/packages/twenty-server/src/engine/core-modules/cache-storage/services/cache-storage.service.ts b/packages/twenty-server/src/engine/core-modules/cache-storage/services/cache-storage.service.ts index 70fe45a48..3b3b84824 100644 --- a/packages/twenty-server/src/engine/core-modules/cache-storage/services/cache-storage.service.ts +++ b/packages/twenty-server/src/engine/core-modules/cache-storage/services/cache-storage.service.ts @@ -122,6 +122,29 @@ export class CacheStorageService { } while (cursor !== 0); } + async acquireLock(key: string, ttl = 1000): Promise { + if (!this.isRedisCache()) { + throw new Error('acquireLock is only supported with Redis cache'); + } + + const redisClient = (this.cache as RedisCache).store.client; + + const result = await redisClient.set(this.getKey(key), 'lock', { + NX: true, + PX: ttl, + }); + + return result === 'OK'; + } + + async releaseLock(key: string): Promise { + if (!this.isRedisCache()) { + throw new Error('releaseLock is only supported with Redis cache'); + } + + await this.del(key); + } + private isRedisCache() { // eslint-disable-next-line @typescript-eslint/no-explicit-any return (this.cache.store as any)?.name === 'redis'; diff --git a/packages/twenty-server/src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum.ts b/packages/twenty-server/src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum.ts index b135a2822..983c5db47 100644 --- a/packages/twenty-server/src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum.ts +++ b/packages/twenty-server/src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum.ts @@ -3,5 +3,6 @@ export enum CacheStorageNamespace { ModuleCalendar = 'module:calendar', ModuleWorkflow = 'module:workflow', EngineWorkspace = 'engine:workspace', + EngineLock = 'engine:lock', EngineHealth = 'engine:health', } diff --git a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts index ebfa01d79..c85a55c3d 100644 --- a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts +++ b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts @@ -28,7 +28,7 @@ import { FavoriteWorkspaceEntity } from 'src/modules/favorite/standard-objects/f import { TimelineActivityWorkspaceEntity } from 'src/modules/timeline/standard-objects/timeline-activity.workspace-entity'; import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; -import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; +import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type'; import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; import { WorkflowTrigger } from 'src/modules/workflow/workflow-trigger/types/workflow-trigger.type'; import { WorkflowRunStepInfo } from 'src/modules/workflow/workflow-executor/types/workflow-run-step-info.type'; @@ -43,7 +43,7 @@ export enum WorkflowRunStatus { export type StepOutput = { id: string; - output: WorkflowExecutorOutput; + output: WorkflowActionOutput; }; export type WorkflowRunOutput = { @@ -51,7 +51,7 @@ export type WorkflowRunOutput = { trigger: WorkflowTrigger; steps: WorkflowAction[]; }; - stepsOutput?: Record; + stepsOutput?: Record; error?: string; }; diff --git a/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.workspace-service.ts index 0b3412999..ba3b198dd 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.workspace-service.ts @@ -336,16 +336,10 @@ export class WorkflowVersionStepWorkspaceService { }, }; - const updatedContext = { - ...workflowRun.context, - [stepId]: enrichedResponse, - }; - await this.workflowRunWorkspaceService.saveWorkflowRunState({ workspaceId, workflowRunId, stepOutput: newStepOutput, - context: updatedContext, stepStatus: StepStatus.SUCCESS, }); diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/factories/workflow-executor.factory.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/factories/workflow-action.factory.ts similarity index 94% rename from packages/twenty-server/src/modules/workflow/workflow-executor/factories/workflow-executor.factory.ts rename to packages/twenty-server/src/modules/workflow/workflow-executor/factories/workflow-action.factory.ts index f7ed39808..59f4b91a2 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/factories/workflow-executor.factory.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/factories/workflow-action.factory.ts @@ -1,6 +1,6 @@ import { Injectable } from '@nestjs/common'; -import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; import { WorkflowStepExecutorException, @@ -19,7 +19,7 @@ import { UpdateRecordWorkflowAction } from 'src/modules/workflow/workflow-execut import { WorkflowActionType } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; @Injectable() -export class WorkflowExecutorFactory { +export class WorkflowActionFactory { constructor( private readonly codeWorkflowAction: CodeWorkflowAction, private readonly sendEmailWorkflowAction: SendEmailWorkflowAction, @@ -33,7 +33,7 @@ export class WorkflowExecutorFactory { private readonly aiAgentWorkflowAction: AiAgentWorkflowAction, ) {} - get(stepType: WorkflowActionType): WorkflowExecutor { + get(stepType: WorkflowActionType): WorkflowAction { switch (stepType) { case WorkflowActionType.CODE: return this.codeWorkflowAction; diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/interfaces/workflow-action.interface.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/interfaces/workflow-action.interface.ts new file mode 100644 index 000000000..909462748 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/interfaces/workflow-action.interface.ts @@ -0,0 +1,8 @@ +import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input'; +import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type'; + +export interface WorkflowAction { + execute( + workflowActionInput: WorkflowActionInput, + ): Promise; +} diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface.ts deleted file mode 100644 index 53aa949a4..000000000 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface.ts +++ /dev/null @@ -1,8 +0,0 @@ -import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; -import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; - -export interface WorkflowExecutor { - execute( - workflowExecutorInput: WorkflowExecutorInput, - ): Promise; -} diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-action-input.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-action-input.ts new file mode 100644 index 000000000..c7cc9a699 --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-action-input.ts @@ -0,0 +1,7 @@ +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; + +export type WorkflowActionInput = { + currentStepId: string; + steps: WorkflowAction[]; + context: Record; +}; diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-executor-output.type.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-action-output.type.ts similarity index 62% rename from packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-executor-output.type.ts rename to packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-action-output.type.ts index 5f8ffc213..46fd0fa21 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-executor-output.type.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-action-output.type.ts @@ -1,4 +1,4 @@ -export type WorkflowExecutorOutput = { +export type WorkflowActionOutput = { result?: object; error?: string; pendingEvent?: boolean; diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-executor-input.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-executor-input.ts index 84e061dba..ff44471f7 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-executor-input.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-executor-input.ts @@ -1,9 +1,12 @@ -import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; - export type WorkflowExecutorInput = { - currentStepId: string; - steps: WorkflowAction[]; - context: Record; + stepIds: string[]; workflowRunId: string; - attemptCount?: number; + workspaceId: string; +}; + +export type WorkflowBranchExecutorInput = { + stepId: string; + attemptCount?: number; + workflowRunId: string; + workspaceId: string; }; diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/utils/__tests__/can-execute-step.util.spec.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/utils/__tests__/can-execute-step.util.spec.ts new file mode 100644 index 000000000..4a565771a --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/utils/__tests__/can-execute-step.util.spec.ts @@ -0,0 +1,90 @@ +import { + WorkflowAction, + WorkflowActionType, +} from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; +import { canExecuteStep } from 'src/modules/workflow/workflow-executor/utils/can-execute-step.utils'; + +describe('canExecuteStep', () => { + const steps = [ + { + id: 'step-1', + type: WorkflowActionType.CODE, + settings: { + errorHandlingOptions: { + continueOnFailure: { value: false }, + retryOnFailure: { value: false }, + }, + }, + nextStepIds: ['step-3'], + }, + { + id: 'step-2', + type: WorkflowActionType.SEND_EMAIL, + settings: { + errorHandlingOptions: { + continueOnFailure: { value: false }, + retryOnFailure: { value: false }, + }, + }, + nextStepIds: ['step-3'], + }, + { + id: 'step-3', + type: WorkflowActionType.SEND_EMAIL, + settings: { + errorHandlingOptions: { + continueOnFailure: { value: false }, + retryOnFailure: { value: false }, + }, + }, + nextStepIds: [], + }, + ] as WorkflowAction[]; + + it('should return true if all parents succeeded', () => { + const context = { + trigger: 'trigger result', + 'step-1': 'step-1 result', + 'step-2': 'step-2 result', + }; + + const result = canExecuteStep({ context, steps, stepId: 'step-3' }); + + expect(result).toBe(true); + }); + + it('should return false if one parent is not succeeded', () => { + expect( + canExecuteStep({ + context: { + trigger: 'trigger result', + 'step-2': 'step-2 result', + }, + steps, + stepId: 'step-3', + }), + ).toBe(false); + + expect( + canExecuteStep({ + context: { + trigger: 'trigger result', + 'step-1': 'step-1 result', + }, + steps, + stepId: 'step-3', + }), + ).toBe(false); + + expect( + canExecuteStep({ + context: { + trigger: 'trigger result', + 'step-1': {}, + }, + steps, + stepId: 'step-3', + }), + ).toBe(false); + }); +}); diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/utils/can-execute-step.utils.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/utils/can-execute-step.utils.ts new file mode 100644 index 000000000..81f31eb7f --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/utils/can-execute-step.utils.ts @@ -0,0 +1,23 @@ +import { isDefined } from 'twenty-shared/utils'; + +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; + +export const canExecuteStep = ({ + context, + stepId, + steps, +}: { + steps: WorkflowAction[]; + context: Record; + stepId: string; +}) => { + const parentSteps = steps.filter( + (parentStep) => + isDefined(parentStep) && parentStep.nextStepIds?.includes(stepId), + ); + + // TODO use workflowRun.state to check if step status is not COMPLETED. Return false in this case + return parentSteps.every((parentStep) => + Object.keys(context).includes(parentStep.id), + ); +}; diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/ai-agent/ai-agent.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/ai-agent/ai-agent.workflow-action.ts index 480665006..fb235ce58 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/ai-agent/ai-agent.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/ai-agent/ai-agent.workflow-action.ts @@ -3,7 +3,7 @@ import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; -import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; import { AIBillingService } from 'src/engine/core-modules/ai/services/ai-billing.service'; import { AgentExecutionService } from 'src/engine/metadata-modules/agent/agent-execution.service'; @@ -16,13 +16,13 @@ import { WorkflowStepExecutorException, WorkflowStepExecutorExceptionCode, } from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception'; -import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; -import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; +import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input'; +import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type'; import { isWorkflowAiAgentAction } from './guards/is-workflow-ai-agent-action.guard'; @Injectable() -export class AiAgentWorkflowAction implements WorkflowExecutor { +export class AiAgentWorkflowAction implements WorkflowAction { constructor( private readonly agentExecutionService: AgentExecutionService, private readonly aiBillingService: AIBillingService, @@ -34,7 +34,7 @@ export class AiAgentWorkflowAction implements WorkflowExecutor { currentStepId, steps, context, - }: WorkflowExecutorInput): Promise { + }: WorkflowActionInput): Promise { const step = steps.find((step) => step.id === currentStepId); if (!step) { diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/code/code.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/code/code.workflow-action.ts index 2ee659004..0a0381b79 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/code/code.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/code/code.workflow-action.ts @@ -1,6 +1,6 @@ import { Injectable } from '@nestjs/common'; -import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; import { ServerlessFunctionService } from 'src/engine/metadata-modules/serverless-function/serverless-function.service'; import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; @@ -8,14 +8,14 @@ import { WorkflowStepExecutorException, WorkflowStepExecutorExceptionCode, } from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception'; -import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; -import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; +import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input'; +import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type'; import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util'; import { isWorkflowCodeAction } from 'src/modules/workflow/workflow-executor/workflow-actions/code/guards/is-workflow-code-action.guard'; import { WorkflowCodeActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/code/types/workflow-code-action-input.type'; @Injectable() -export class CodeWorkflowAction implements WorkflowExecutor { +export class CodeWorkflowAction implements WorkflowAction { constructor( private readonly serverlessFunctionService: ServerlessFunctionService, private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory, @@ -25,7 +25,7 @@ export class CodeWorkflowAction implements WorkflowExecutor { currentStepId, steps, context, - }: WorkflowExecutorInput): Promise { + }: WorkflowActionInput): Promise { const step = steps.find((step) => step.id === currentStepId); if (!step) { diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/filter/filter.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/filter/filter.workflow-action.ts index 8be64e4f6..00a603c9c 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/filter/filter.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/filter/filter.workflow-action.ts @@ -1,20 +1,20 @@ import { Injectable } from '@nestjs/common'; -import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; import { WorkflowStepExecutorException, WorkflowStepExecutorExceptionCode, } from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception'; -import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; -import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; +import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input'; +import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type'; import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util'; import { isWorkflowFilterAction } from 'src/modules/workflow/workflow-executor/workflow-actions/filter/guards/is-workflow-filter-action.guard'; import { evaluateFilterConditions } from 'src/modules/workflow/workflow-executor/workflow-actions/filter/utils/evaluate-filter-conditions.util'; @Injectable() -export class FilterWorkflowAction implements WorkflowExecutor { - async execute(input: WorkflowExecutorInput): Promise { +export class FilterWorkflowAction implements WorkflowAction { + async execute(input: WorkflowActionInput): Promise { const { currentStepId, steps, context } = input; const step = steps.find((step) => step.id === currentStepId); diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/form/form.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/form/form.workflow-action.ts index bed01343c..0a5f3d5f2 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/form/form.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/form/form.workflow-action.ts @@ -1,21 +1,21 @@ import { Injectable } from '@nestjs/common'; -import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; import { WorkflowStepExecutorException, WorkflowStepExecutorExceptionCode, } from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception'; -import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; -import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; +import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input'; +import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type'; import { isWorkflowFormAction } from 'src/modules/workflow/workflow-executor/workflow-actions/form/guards/is-workflow-form-action.guard'; @Injectable() -export class FormWorkflowAction implements WorkflowExecutor { +export class FormWorkflowAction implements WorkflowAction { async execute({ currentStepId, steps, - }: WorkflowExecutorInput): Promise { + }: WorkflowActionInput): Promise { const step = steps.find((step) => step.id === currentStepId); if (!step) { diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/http-request/http-request.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/http-request/http-request.workflow-action.ts index 181d03468..787ec14a8 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/http-request/http-request.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/http-request/http-request.workflow-action.ts @@ -3,26 +3,26 @@ import { Injectable } from '@nestjs/common'; import { isString } from '@sniptt/guards'; import axios, { AxiosRequestConfig } from 'axios'; -import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; import { WorkflowStepExecutorException, WorkflowStepExecutorExceptionCode, } from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception'; -import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; -import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; +import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input'; +import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type'; import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util'; import { isWorkflowHttpRequestAction } from './guards/is-workflow-http-request-action.guard'; import { WorkflowHttpRequestActionInput } from './types/workflow-http-request-action-input.type'; @Injectable() -export class HttpRequestWorkflowAction implements WorkflowExecutor { +export class HttpRequestWorkflowAction implements WorkflowAction { async execute({ currentStepId, steps, context, - }: WorkflowExecutorInput): Promise { + }: WorkflowActionInput): Promise { const step = steps.find((step) => step.id === currentStepId); if (!step) { diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/mail-sender/send-email.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/mail-sender/send-email.workflow-action.ts index a95b31d62..c6e264f87 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/mail-sender/send-email.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/mail-sender/send-email.workflow-action.ts @@ -5,7 +5,7 @@ import { JSDOM } from 'jsdom'; import { isDefined, isValidUuid } from 'twenty-shared/utils'; import { z } from 'zod'; -import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; @@ -15,8 +15,8 @@ import { WorkflowStepExecutorException, WorkflowStepExecutorExceptionCode, } from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception'; -import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; -import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; +import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input'; +import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type'; import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util'; import { SendEmailActionException, @@ -30,7 +30,7 @@ export type WorkflowSendEmailStepOutputSchema = { }; @Injectable() -export class SendEmailWorkflowAction implements WorkflowExecutor { +export class SendEmailWorkflowAction implements WorkflowAction { private readonly logger = new Logger(SendEmailWorkflowAction.name); constructor( private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory, @@ -78,7 +78,7 @@ export class SendEmailWorkflowAction implements WorkflowExecutor { currentStepId, steps, context, - }: WorkflowExecutorInput): Promise { + }: WorkflowActionInput): Promise { const step = steps.find((step) => step.id === currentStepId); if (!step) { diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/create-record.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/create-record.workflow-action.ts index 1f811508f..0e9c168c8 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/create-record.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/create-record.workflow-action.ts @@ -4,7 +4,7 @@ import { InjectRepository } from '@nestjs/typeorm'; import { isDefined } from 'class-validator'; import { Repository } from 'typeorm'; -import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action'; import { RecordPositionService } from 'src/engine/core-modules/record-position/services/record-position.service'; @@ -19,8 +19,8 @@ import { WorkflowStepExecutorException, WorkflowStepExecutorExceptionCode, } from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception'; -import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; -import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; +import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input'; +import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type'; import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util'; import { RecordCRUDActionException, @@ -30,7 +30,7 @@ import { isWorkflowCreateRecordAction } from 'src/modules/workflow/workflow-exec import { WorkflowCreateRecordActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/types/workflow-record-crud-action-input.type'; @Injectable() -export class CreateRecordWorkflowAction implements WorkflowExecutor { +export class CreateRecordWorkflowAction implements WorkflowAction { constructor( private readonly twentyORMGlobalManager: TwentyORMGlobalManager, @InjectRepository(ObjectMetadataEntity, 'core') @@ -46,7 +46,7 @@ export class CreateRecordWorkflowAction implements WorkflowExecutor { currentStepId, steps, context, - }: WorkflowExecutorInput): Promise { + }: WorkflowActionInput): Promise { const step = steps.find((step) => step.id === currentStepId); if (!step) { diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/delete-record.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/delete-record.workflow-action.ts index 904d651e0..401a07fa0 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/delete-record.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/delete-record.workflow-action.ts @@ -5,7 +5,7 @@ import { isDefined } from 'class-validator'; import { isValidUuid } from 'twenty-shared/utils'; import { Repository } from 'typeorm'; -import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action'; import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity'; @@ -16,8 +16,8 @@ import { WorkflowStepExecutorException, WorkflowStepExecutorExceptionCode, } from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception'; -import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; -import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; +import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input'; +import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type'; import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util'; import { RecordCRUDActionException, @@ -27,7 +27,7 @@ import { isWorkflowDeleteRecordAction } from 'src/modules/workflow/workflow-exec import { WorkflowDeleteRecordActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/types/workflow-record-crud-action-input.type'; @Injectable() -export class DeleteRecordWorkflowAction implements WorkflowExecutor { +export class DeleteRecordWorkflowAction implements WorkflowAction { constructor( private readonly twentyORMGlobalManager: TwentyORMGlobalManager, @InjectRepository(ObjectMetadataEntity, 'core') @@ -40,7 +40,7 @@ export class DeleteRecordWorkflowAction implements WorkflowExecutor { currentStepId, steps, context, - }: WorkflowExecutorInput): Promise { + }: WorkflowActionInput): Promise { const step = steps.find((step) => step.id === currentStepId); if (!step) { diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/find-records.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/find-records.workflow-action.ts index b5ab53354..11d07a034 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/find-records.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/find-records.workflow-action.ts @@ -9,7 +9,7 @@ import { ObjectRecordOrderBy, OrderByDirection, } from 'src/engine/api/graphql/workspace-query-builder/interfaces/object-record.interface'; -import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; import { GraphqlQueryParser } from 'src/engine/api/graphql/graphql-query-runner/graphql-query-parsers/graphql-query.parser'; import { ObjectMetadataItemWithFieldMaps } from 'src/engine/metadata-modules/types/object-metadata-item-with-field-maps'; @@ -23,8 +23,8 @@ import { WorkflowStepExecutorException, WorkflowStepExecutorExceptionCode, } from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception'; -import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; -import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; +import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input'; +import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type'; import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util'; import { RecordCRUDActionException, @@ -34,7 +34,7 @@ import { isWorkflowFindRecordsAction } from 'src/modules/workflow/workflow-execu import { WorkflowFindRecordsActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/types/workflow-record-crud-action-input.type'; @Injectable() -export class FindRecordsWorkflowAction implements WorkflowExecutor { +export class FindRecordsWorkflowAction implements WorkflowAction { constructor( private readonly twentyORMGlobalManager: TwentyORMGlobalManager, private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory, @@ -45,7 +45,7 @@ export class FindRecordsWorkflowAction implements WorkflowExecutor { currentStepId, steps, context, - }: WorkflowExecutorInput): Promise { + }: WorkflowActionInput): Promise { const step = steps.find((step) => step.id === currentStepId); if (!step) { diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/update-record.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/update-record.workflow-action.ts index eb5a9fee6..39ea5dd5b 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/update-record.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/record-crud/update-record.workflow-action.ts @@ -5,7 +5,7 @@ import deepEqual from 'deep-equal'; import { isDefined, isValidUuid } from 'twenty-shared/utils'; import { Repository } from 'typeorm'; -import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; +import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface'; import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action'; import { objectRecordChangedValues } from 'src/engine/core-modules/event-emitter/utils/object-record-changed-values'; @@ -20,8 +20,8 @@ import { WorkflowStepExecutorException, WorkflowStepExecutorExceptionCode, } from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception'; -import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; -import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; +import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input'; +import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type'; import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util'; import { RecordCRUDActionException, @@ -31,7 +31,7 @@ import { isWorkflowUpdateRecordAction } from 'src/modules/workflow/workflow-exec import { WorkflowUpdateRecordActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/types/workflow-record-crud-action-input.type'; @Injectable() -export class UpdateRecordWorkflowAction implements WorkflowExecutor { +export class UpdateRecordWorkflowAction implements WorkflowAction { constructor( private readonly twentyORMGlobalManager: TwentyORMGlobalManager, private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory, @@ -46,7 +46,7 @@ export class UpdateRecordWorkflowAction implements WorkflowExecutor { currentStepId, steps, context, - }: WorkflowExecutorInput): Promise { + }: WorkflowActionInput): Promise { const step = steps.find((step) => step.id === currentStepId); if (!step) { diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-executor.module.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-executor.module.ts index 4ae954445..ef6cd25db 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-executor.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-executor.module.ts @@ -4,7 +4,7 @@ import { BillingModule } from 'src/engine/core-modules/billing/billing.module'; import { FeatureFlagModule } from 'src/engine/core-modules/feature-flag/feature-flag.module'; import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module'; -import { WorkflowExecutorFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-executor.factory'; +import { WorkflowActionFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-action.factory'; import { AiAgentActionModule } from 'src/modules/workflow/workflow-executor/workflow-actions/ai-agent/ai-agent-action.module'; import { CodeActionModule } from 'src/modules/workflow/workflow-executor/workflow-actions/code/code-action.module'; import { FilterActionModule } from 'src/modules/workflow/workflow-executor/workflow-actions/filter/filter-action.module'; @@ -32,7 +32,7 @@ import { WorkflowRunModule } from 'src/modules/workflow/workflow-runner/workflow providers: [ WorkflowExecutorWorkspaceService, ScopedWorkspaceContextFactory, - WorkflowExecutorFactory, + WorkflowActionFactory, ], exports: [WorkflowExecutorWorkspaceService], }) diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/__tests__/workflow-executor.workspace-service.spec.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/__tests__/workflow-executor.workspace-service.spec.ts index f38520e67..ff1b75f3e 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/__tests__/workflow-executor.workspace-service.spec.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/__tests__/workflow-executor.workspace-service.spec.ts @@ -4,9 +4,8 @@ import { BILLING_FEATURE_USED } from 'src/engine/core-modules/billing/constants/ import { BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE } from 'src/engine/core-modules/billing/constants/billing-workflow-execution-error-message.constant'; import { BillingMeterEventName } from 'src/engine/core-modules/billing/enums/billing-meter-event-names'; import { BillingService } from 'src/engine/core-modules/billing/services/billing.service'; -import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter'; -import { WorkflowExecutorFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-executor.factory'; +import { WorkflowActionFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-action.factory'; import { WorkflowAction, WorkflowActionType, @@ -14,10 +13,26 @@ import { import { WorkflowExecutorWorkspaceService } from 'src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service'; import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service'; import { StepStatus } from 'src/modules/workflow/workflow-executor/types/workflow-run-step-info.type'; +import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; +import { canExecuteStep } from 'src/modules/workflow/workflow-executor/utils/can-execute-step.utils'; + +jest.mock( + 'src/modules/workflow/workflow-executor/utils/can-execute-step.utils', + () => { + const actual = jest.requireActual( + 'src/modules/workflow/workflow-executor/utils/can-execute-step.utils', + ); + + return { + ...actual, + canExecuteStep: jest.fn().mockReturnValue(true), // default behavior + }; + }, +); describe('WorkflowExecutorWorkspaceService', () => { let service: WorkflowExecutorWorkspaceService; - let workflowExecutorFactory: WorkflowExecutorFactory; + let workflowActionFactory: WorkflowActionFactory; let workspaceEventEmitter: WorkspaceEventEmitter; let workflowRunWorkspaceService: WorkflowRunWorkspaceService; @@ -29,22 +44,16 @@ describe('WorkflowExecutorWorkspaceService', () => { emitCustomBatchEvent: jest.fn(), }; - const mockScopedWorkspaceContext = { - workspaceId: 'workspace-id', - }; - - const mockScopedWorkspaceContextFactory = { - create: jest.fn().mockReturnValue(mockScopedWorkspaceContext), - }; - const mockWorkflowRunWorkspaceService = { - saveWorkflowRunState: jest.fn(), + endWorkflowRun: jest.fn(), updateWorkflowRunStepStatus: jest.fn(), + saveWorkflowRunState: jest.fn(), + getWorkflowRun: jest.fn(), }; const mockBillingService = { - isBillingEnabled: jest.fn(), - canBillMeteredProduct: jest.fn(), + isBillingEnabled: jest.fn().mockReturnValue(true), + canBillMeteredProduct: jest.fn().mockReturnValue(true), }; beforeEach(async () => { @@ -54,7 +63,7 @@ describe('WorkflowExecutorWorkspaceService', () => { providers: [ WorkflowExecutorWorkspaceService, { - provide: WorkflowExecutorFactory, + provide: WorkflowActionFactory, useValue: { get: jest.fn().mockReturnValue(mockWorkflowExecutor), }, @@ -63,10 +72,6 @@ describe('WorkflowExecutorWorkspaceService', () => { provide: WorkspaceEventEmitter, useValue: mockWorkspaceEventEmitter, }, - { - provide: ScopedWorkspaceContextFactory, - useValue: mockScopedWorkspaceContextFactory, - }, { provide: WorkflowRunWorkspaceService, useValue: mockWorkflowRunWorkspaceService, @@ -81,8 +86,8 @@ describe('WorkflowExecutorWorkspaceService', () => { service = module.get( WorkflowExecutorWorkspaceService, ); - workflowExecutorFactory = module.get( - WorkflowExecutorFactory, + workflowActionFactory = module.get( + WorkflowActionFactory, ); workspaceEventEmitter = module.get( WorkspaceEventEmitter, @@ -94,7 +99,8 @@ describe('WorkflowExecutorWorkspaceService', () => { describe('execute', () => { const mockWorkflowRunId = 'workflow-run-id'; - const mockContext = { data: 'some-data' }; + const mockWorkspaceId = 'workspace-id'; + const mockContext = { trigger: 'trigger-result' }; const mockSteps = [ { id: 'step-1', @@ -120,20 +126,9 @@ describe('WorkflowExecutorWorkspaceService', () => { }, ] as WorkflowAction[]; - it('should return success when all steps are completed', async () => { - // No steps to execute - const result = await service.execute({ - workflowRunId: mockWorkflowRunId, - currentStepId: 'step-2', - steps: mockSteps, - context: mockContext, - }); - - expect(result).toEqual({ - result: { - success: true, - }, - }); + mockWorkflowRunWorkspaceService.getWorkflowRun.mockReturnValue({ + output: { flow: { steps: mockSteps } }, + context: mockContext, }); it('should execute a step and continue to the next step on success', async () => { @@ -143,24 +138,22 @@ describe('WorkflowExecutorWorkspaceService', () => { mockWorkflowExecutor.execute.mockResolvedValueOnce(mockStepResult); - const result = await service.execute({ + await service.executeFromSteps({ workflowRunId: mockWorkflowRunId, + stepIds: ['step-1'], + workspaceId: mockWorkspaceId, + }); + + expect(workflowActionFactory.get).toHaveBeenCalledWith( + WorkflowActionType.CODE, + ); + + expect(mockWorkflowExecutor.execute).toHaveBeenCalledWith({ currentStepId: 'step-1', steps: mockSteps, context: mockContext, }); - // execute first step - expect(workflowExecutorFactory.get).toHaveBeenCalledWith( - WorkflowActionType.CODE, - ); - expect(mockWorkflowExecutor.execute).toHaveBeenCalledWith({ - workflowRunId: mockWorkflowRunId, - currentStepId: 'step-1', - steps: mockSteps, - context: mockContext, - attemptCount: 1, - }); expect(workspaceEventEmitter.emitCustomBatchEvent).toHaveBeenCalledWith( BILLING_FEATURE_USED, [ @@ -171,9 +164,11 @@ describe('WorkflowExecutorWorkspaceService', () => { ], 'workspace-id', ); + expect( workflowRunWorkspaceService.updateWorkflowRunStepStatus, ).toHaveBeenCalledTimes(2); + expect( workflowRunWorkspaceService.updateWorkflowRunStepStatus, ).toHaveBeenCalledWith({ @@ -182,9 +177,11 @@ describe('WorkflowExecutorWorkspaceService', () => { workspaceId: 'workspace-id', stepStatus: StepStatus.RUNNING, }); + expect( workflowRunWorkspaceService.saveWorkflowRunState, ).toHaveBeenCalledTimes(2); + expect( workflowRunWorkspaceService.saveWorkflowRunState, ).toHaveBeenCalledWith({ @@ -193,20 +190,12 @@ describe('WorkflowExecutorWorkspaceService', () => { id: 'step-1', output: mockStepResult, }, - context: { - data: 'some-data', - 'step-1': { - stepOutput: 'success', - }, - }, workspaceId: 'workspace-id', stepStatus: StepStatus.SUCCESS, }); - expect(result).toEqual({ result: { success: true } }); - // execute second step - expect(workflowExecutorFactory.get).toHaveBeenCalledWith( + expect(workflowActionFactory.get).toHaveBeenCalledWith( WorkflowActionType.SEND_EMAIL, ); }); @@ -216,20 +205,18 @@ describe('WorkflowExecutorWorkspaceService', () => { new Error('Step execution failed'), ); - const result = await service.execute({ + await service.executeFromSteps({ workflowRunId: mockWorkflowRunId, - currentStepId: 'step-1', - steps: mockSteps, - context: mockContext, + stepIds: ['step-1'], + workspaceId: mockWorkspaceId, }); - expect(result).toEqual({ - error: 'Step execution failed', - }); expect(workspaceEventEmitter.emitCustomBatchEvent).not.toHaveBeenCalled(); + expect( workflowRunWorkspaceService.updateWorkflowRunStepStatus, ).toHaveBeenCalledTimes(1); + expect( workflowRunWorkspaceService.updateWorkflowRunStepStatus, ).toHaveBeenCalledWith({ @@ -238,9 +225,11 @@ describe('WorkflowExecutorWorkspaceService', () => { workspaceId: 'workspace-id', stepStatus: StepStatus.RUNNING, }); + expect( workflowRunWorkspaceService.saveWorkflowRunState, ).toHaveBeenCalledTimes(1); + expect( workflowRunWorkspaceService.saveWorkflowRunState, ).toHaveBeenCalledWith({ @@ -251,7 +240,6 @@ describe('WorkflowExecutorWorkspaceService', () => { error: 'Step execution failed', }, }, - context: mockContext, workspaceId: 'workspace-id', stepStatus: StepStatus.FAILED, }); @@ -264,17 +252,16 @@ describe('WorkflowExecutorWorkspaceService', () => { mockWorkflowExecutor.execute.mockResolvedValueOnce(mockPendingEvent); - const result = await service.execute({ + await service.executeFromSteps({ workflowRunId: mockWorkflowRunId, - currentStepId: 'step-1', - steps: mockSteps, - context: mockContext, + stepIds: ['step-1'], + workspaceId: mockWorkspaceId, }); - expect(result).toEqual(mockPendingEvent); expect( workflowRunWorkspaceService.updateWorkflowRunStepStatus, ).toHaveBeenCalledTimes(1); + expect( workflowRunWorkspaceService.updateWorkflowRunStepStatus, ).toHaveBeenCalledWith({ @@ -283,9 +270,11 @@ describe('WorkflowExecutorWorkspaceService', () => { workspaceId: 'workspace-id', stepStatus: StepStatus.RUNNING, }); + expect( workflowRunWorkspaceService.saveWorkflowRunState, ).toHaveBeenCalledTimes(1); + expect( workflowRunWorkspaceService.saveWorkflowRunState, ).toHaveBeenCalledWith({ @@ -294,13 +283,12 @@ describe('WorkflowExecutorWorkspaceService', () => { id: 'step-1', output: mockPendingEvent, }, - context: mockContext, workspaceId: 'workspace-id', stepStatus: StepStatus.PENDING, }); // No recursive call to execute should happen - expect(workflowExecutorFactory.get).not.toHaveBeenCalledWith( + expect(workflowActionFactory.get).not.toHaveBeenCalledWith( WorkflowActionType.SEND_EMAIL, ); }); @@ -330,15 +318,19 @@ describe('WorkflowExecutorWorkspaceService', () => { }, ] as WorkflowAction[]; + mockWorkflowRunWorkspaceService.getWorkflowRun.mockReturnValueOnce({ + output: { flow: { steps: stepsWithContinueOnFailure } }, + context: mockContext, + }); + mockWorkflowExecutor.execute.mockResolvedValueOnce({ error: 'Step execution failed but continue', }); - const result = await service.execute({ + await service.executeFromSteps({ workflowRunId: mockWorkflowRunId, - currentStepId: 'step-1', - steps: stepsWithContinueOnFailure, - context: mockContext, + stepIds: ['step-1'], + workspaceId: mockWorkspaceId, }); expect( @@ -368,14 +360,12 @@ describe('WorkflowExecutorWorkspaceService', () => { error: 'Step execution failed but continue', }, }, - context: mockContext, workspaceId: 'workspace-id', stepStatus: StepStatus.FAILED, }); - expect(result).toEqual({ result: { success: true } }); // execute second step - expect(workflowExecutorFactory.get).toHaveBeenCalledWith( + expect(workflowActionFactory.get).toHaveBeenCalledWith( WorkflowActionType.SEND_EMAIL, ); }); @@ -394,122 +384,86 @@ describe('WorkflowExecutorWorkspaceService', () => { }, ] as WorkflowAction[]; - mockWorkflowExecutor.execute.mockResolvedValueOnce({ + mockWorkflowRunWorkspaceService.getWorkflowRun.mockReturnValue({ + output: { flow: { steps: stepsWithRetryOnFailure } }, + context: mockContext, + }); + + mockWorkflowExecutor.execute.mockResolvedValue({ error: 'Step execution failed, will retry', }); - await service.execute({ + await service.executeFromSteps({ workflowRunId: mockWorkflowRunId, - currentStepId: 'step-1', - steps: stepsWithRetryOnFailure, - context: mockContext, + stepIds: ['step-1'], + workspaceId: mockWorkspaceId, }); - // Should call execute again with increased attemptCount - expect(workflowExecutorFactory.get).toHaveBeenCalledWith( - WorkflowActionType.CODE, - ); - expect(workflowExecutorFactory.get).not.toHaveBeenCalledWith( + for (let attempt = 1; attempt <= 3; attempt++) { + expect(workflowActionFactory.get).toHaveBeenNthCalledWith( + attempt, + WorkflowActionType.CODE, + ); + } + + expect(workflowActionFactory.get).not.toHaveBeenCalledWith( WorkflowActionType.SEND_EMAIL, ); - expect(workflowExecutorFactory.get).toHaveBeenCalledTimes(2); - }); - - it('should stop retrying after MAX_RETRIES_ON_FAILURE', async () => { - const stepsWithRetryOnFailure = [ - { - id: 'step-1', - type: WorkflowActionType.CODE, - settings: { - errorHandlingOptions: { - continueOnFailure: { value: false }, - retryOnFailure: { value: true }, - }, - }, - }, - ] as WorkflowAction[]; - - const errorOutput = { - error: 'Step execution failed, max retries reached', - }; - - mockWorkflowExecutor.execute.mockResolvedValueOnce(errorOutput); - - const result = await service.execute({ - workflowRunId: mockWorkflowRunId, - currentStepId: 'step-1', - steps: stepsWithRetryOnFailure, - context: mockContext, - attemptCount: 3, // MAX_RETRIES_ON_FAILURE is 3 - }); - - // Should not retry anymore - expect(workflowExecutorFactory.get).toHaveBeenCalledTimes(1); - expect( - workflowRunWorkspaceService.updateWorkflowRunStepStatus, - ).toHaveBeenCalledTimes(1); - expect( - workflowRunWorkspaceService.updateWorkflowRunStepStatus, - ).toHaveBeenCalledWith({ - workflowRunId: mockWorkflowRunId, - stepId: 'step-1', - workspaceId: 'workspace-id', - stepStatus: StepStatus.RUNNING, - }); - expect( - workflowRunWorkspaceService.saveWorkflowRunState, - ).toHaveBeenCalledTimes(1); - expect( - workflowRunWorkspaceService.saveWorkflowRunState, - ).toHaveBeenCalledWith({ - workflowRunId: mockWorkflowRunId, - stepOutput: { - id: 'step-1', - output: errorOutput, - }, - context: mockContext, - workspaceId: 'workspace-id', - stepStatus: StepStatus.FAILED, - }); - expect(result).toEqual(errorOutput); }); it('should stop when billing validation fails', async () => { mockBillingService.isBillingEnabled.mockReturnValueOnce(true); mockBillingService.canBillMeteredProduct.mockReturnValueOnce(false); - const result = await service.execute({ + await service.executeFromSteps({ workflowRunId: mockWorkflowRunId, - currentStepId: 'step-1', - steps: mockSteps, - context: mockContext, + stepIds: ['step-1'], + workspaceId: mockWorkspaceId, }); - expect(workflowExecutorFactory.get).toHaveBeenCalledTimes(1); + expect(workflowActionFactory.get).toHaveBeenCalledTimes(0); + expect( workflowRunWorkspaceService.saveWorkflowRunState, ).toHaveBeenCalledTimes(1); - expect( - workflowRunWorkspaceService.updateWorkflowRunStepStatus, - ).not.toHaveBeenCalled(); + + expect(workflowRunWorkspaceService.endWorkflowRun).toHaveBeenCalledTimes( + 1, + ); + expect( workflowRunWorkspaceService.saveWorkflowRunState, ).toHaveBeenCalledWith({ workflowRunId: mockWorkflowRunId, + workspaceId: 'workspace-id', stepOutput: { id: 'step-1', output: { error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE, }, }, - context: mockContext, - workspaceId: 'workspace-id', stepStatus: StepStatus.FAILED, }); - expect(result).toEqual({ + + expect(workflowRunWorkspaceService.endWorkflowRun).toHaveBeenCalledWith({ + workflowRunId: mockWorkflowRunId, + workspaceId: 'workspace-id', + status: WorkflowRunStatus.FAILED, error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE, }); }); + + it('should return if step should not be executed', async () => { + (canExecuteStep as jest.Mock).mockReturnValueOnce(false); + + await service.executeFromSteps({ + workflowRunId: mockWorkflowRunId, + stepIds: ['step-1'], + workspaceId: mockWorkspaceId, + }); + + expect(workflowActionFactory.get).not.toHaveBeenCalled(); + }); }); describe('sendWorkflowNodeRunEvent', () => { diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts index 5eeaaae97..3c918b331 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts @@ -2,112 +2,103 @@ import { Injectable } from '@nestjs/common'; import { isDefined } from 'twenty-shared/utils'; -import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface'; - import { BILLING_FEATURE_USED } from 'src/engine/core-modules/billing/constants/billing-feature-used.constant'; import { BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE } from 'src/engine/core-modules/billing/constants/billing-workflow-execution-error-message.constant'; import { BillingMeterEventName } from 'src/engine/core-modules/billing/enums/billing-meter-event-names'; import { BillingProductKey } from 'src/engine/core-modules/billing/enums/billing-product-key.enum'; import { BillingService } from 'src/engine/core-modules/billing/services/billing.service'; import { BillingUsageEvent } from 'src/engine/core-modules/billing/types/billing-usage-event.type'; -import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter'; import { StepOutput, - WorkflowRunOutput, WorkflowRunStatus, } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; -import { WorkflowExecutorFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-executor.factory'; -import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; -import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; +import { WorkflowActionFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-action.factory'; +import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type'; import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service'; -import { - WorkflowTriggerException, - WorkflowTriggerExceptionCode, -} from 'src/modules/workflow/workflow-trigger/exceptions/workflow-trigger.exception'; import { StepStatus } from 'src/modules/workflow/workflow-executor/types/workflow-run-step-info.type'; +import { + WorkflowBranchExecutorInput, + WorkflowExecutorInput, +} from 'src/modules/workflow/workflow-executor/types/workflow-executor-input'; +import { canExecuteStep } from 'src/modules/workflow/workflow-executor/utils/can-execute-step.utils'; const MAX_RETRIES_ON_FAILURE = 3; -export type WorkflowExecutorState = { - stepsOutput: WorkflowRunOutput['stepsOutput']; - status: WorkflowRunStatus; -}; - @Injectable() -export class WorkflowExecutorWorkspaceService implements WorkflowExecutor { +export class WorkflowExecutorWorkspaceService { constructor( - private readonly workflowExecutorFactory: WorkflowExecutorFactory, + private readonly workflowActionFactory: WorkflowActionFactory, private readonly workspaceEventEmitter: WorkspaceEventEmitter, - private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory, private readonly workflowRunWorkspaceService: WorkflowRunWorkspaceService, private readonly billingService: BillingService, ) {} - async execute({ - currentStepId, - steps, - context, + async executeFromSteps({ + stepIds, + workflowRunId, + workspaceId, + }: WorkflowExecutorInput) { + await Promise.all( + stepIds.map(async (stepIdToExecute) => { + await this.executeFromStep({ + stepId: stepIdToExecute, + workflowRunId, + workspaceId, + }); + }), + ); + } + + private async executeFromStep({ + stepId, attemptCount = 1, workflowRunId, - }: WorkflowExecutorInput): Promise { - const step = steps.find((step) => step.id === currentStepId); + workspaceId, + }: WorkflowBranchExecutorInput) { + const workflowRunInfo = await this.getWorkflowRunInfoOrEndWorkflowRun({ + stepId: stepId, + workflowRunId, + workspaceId, + }); - if (!step) { - return { - error: 'Step not found', - }; + if (!isDefined(workflowRunInfo)) { + return; } - const workflowExecutor = this.workflowExecutorFactory.get(step.type); + const { stepToExecute, steps, context } = workflowRunInfo; - let actionOutput: WorkflowExecutorOutput; - - const { workspaceId } = this.scopedWorkspaceContextFactory.create(); - - if (!workspaceId) { - throw new WorkflowTriggerException( - 'No workspace id found', - WorkflowTriggerExceptionCode.INTERNAL_ERROR, - ); + if (!canExecuteStep({ stepId: stepToExecute.id, steps, context })) { + return; } - if ( - this.billingService.isBillingEnabled() && - !(await this.canBillWorkflowNodeExecution(workspaceId)) - ) { - const billingOutput = { - error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE, - }; - - await this.workflowRunWorkspaceService.saveWorkflowRunState({ - workspaceId, + const checkCanBillWorkflowNodeExecution = + await this.checkCanBillWorkflowNodeExecutionOrEndWorkflowRun({ + stepIdToExecute: stepToExecute.id, workflowRunId, - stepOutput: { - id: step.id, - output: billingOutput, - }, - context, - stepStatus: StepStatus.FAILED, + workspaceId, }); - return billingOutput; + if (!checkCanBillWorkflowNodeExecution) { + return; } + const workflowAction = this.workflowActionFactory.get(stepToExecute.type); + + let actionOutput: WorkflowActionOutput; + await this.workflowRunWorkspaceService.updateWorkflowRunStepStatus({ workflowRunId, - stepId: step.id, + stepId: stepToExecute.id, workspaceId, stepStatus: StepStatus.RUNNING, }); try { - actionOutput = await workflowExecutor.execute({ - currentStepId, + actionOutput = await workflowAction.execute({ + currentStepId: stepId, steps, context, - attemptCount, - workflowRunId, }); } catch (error) { actionOutput = { @@ -120,7 +111,7 @@ export class WorkflowExecutorWorkspaceService implements WorkflowExecutor { } const stepOutput: StepOutput = { - id: step.id, + id: stepToExecute.id, output: actionOutput, }; @@ -128,73 +119,145 @@ export class WorkflowExecutorWorkspaceService implements WorkflowExecutor { await this.workflowRunWorkspaceService.saveWorkflowRunState({ workflowRunId, stepOutput, - context, workspaceId, stepStatus: StepStatus.PENDING, }); - return actionOutput; + return; } const actionOutputSuccess = isDefined(actionOutput.result); const shouldContinue = actionOutputSuccess || - step.settings.errorHandlingOptions.continueOnFailure.value; + stepToExecute.settings.errorHandlingOptions.continueOnFailure.value; if (shouldContinue) { - const updatedContext = isDefined(actionOutput.result) - ? { - ...context, - [step.id]: actionOutput.result, - } - : context; - await this.workflowRunWorkspaceService.saveWorkflowRunState({ workflowRunId, stepOutput, - context: updatedContext, workspaceId, stepStatus: isDefined(actionOutput.result) ? StepStatus.SUCCESS : StepStatus.FAILED, }); - if (!isDefined(step.nextStepIds?.[0])) { - return actionOutput; + if ( + !isDefined(stepToExecute.nextStepIds) || + stepToExecute.nextStepIds.length === 0 + ) { + await this.workflowRunWorkspaceService.endWorkflowRun({ + workflowRunId, + workspaceId, + status: WorkflowRunStatus.COMPLETED, + }); + + return; } - // TODO: handle multiple next steps - return await this.execute({ + await this.executeFromSteps({ + stepIds: stepToExecute.nextStepIds, workflowRunId, - currentStepId: step.nextStepIds[0], - steps, - context: updatedContext, + workspaceId, }); + + return; } if ( - step.settings.errorHandlingOptions.retryOnFailure.value && + stepToExecute.settings.errorHandlingOptions.retryOnFailure.value && attemptCount < MAX_RETRIES_ON_FAILURE ) { - return await this.execute({ - workflowRunId, - currentStepId, - steps, - context, + await this.executeFromStep({ + stepId, attemptCount: attemptCount + 1, + workflowRunId, + workspaceId, }); + + return; } await this.workflowRunWorkspaceService.saveWorkflowRunState({ workflowRunId, stepOutput, - context, workspaceId, stepStatus: StepStatus.FAILED, }); - return actionOutput; + await this.workflowRunWorkspaceService.endWorkflowRun({ + workflowRunId, + workspaceId, + status: WorkflowRunStatus.FAILED, + error: stepOutput.output.error, + }); + } + + private async getWorkflowRunInfoOrEndWorkflowRun({ + stepId, + workflowRunId, + workspaceId, + }: { + stepId: string; + workflowRunId: string; + workspaceId: string; + }) { + const workflowRun = await this.workflowRunWorkspaceService.getWorkflowRun({ + workflowRunId, + workspaceId, + }); + + if (!isDefined(workflowRun)) { + await this.workflowRunWorkspaceService.endWorkflowRun({ + workflowRunId, + workspaceId, + status: WorkflowRunStatus.FAILED, + error: `WorkflowRun ${workflowRunId} not found`, + }); + + return; + } + + const steps = workflowRun.output?.flow.steps; + + const context = workflowRun.context; + + if (!isDefined(steps)) { + await this.workflowRunWorkspaceService.endWorkflowRun({ + workflowRunId, + workspaceId, + status: WorkflowRunStatus.FAILED, + error: 'Steps undefined', + }); + + return; + } + + if (!isDefined(context)) { + await this.workflowRunWorkspaceService.endWorkflowRun({ + workflowRunId, + workspaceId, + status: WorkflowRunStatus.FAILED, + error: 'Context not found', + }); + + return; + } + + const stepToExecute = steps.find((step) => step.id === stepId); + + if (!stepToExecute) { + await this.workflowRunWorkspaceService.endWorkflowRun({ + workflowRunId, + workspaceId, + status: WorkflowRunStatus.FAILED, + error: 'Step not found', + }); + + return; + } + + return { stepToExecute, steps, context }; } private sendWorkflowNodeRunEvent(workspaceId: string) { @@ -210,10 +273,45 @@ export class WorkflowExecutorWorkspaceService implements WorkflowExecutor { ); } - private async canBillWorkflowNodeExecution(workspaceId: string) { - return this.billingService.canBillMeteredProduct( - workspaceId, - BillingProductKey.WORKFLOW_NODE_EXECUTION, - ); + private async checkCanBillWorkflowNodeExecutionOrEndWorkflowRun({ + stepIdToExecute, + workflowRunId, + workspaceId, + }: { + stepIdToExecute: string; + workflowRunId: string; + workspaceId: string; + }) { + const canBillWorkflowNodeExecution = + !this.billingService.isBillingEnabled() || + (await this.billingService.canBillMeteredProduct( + workspaceId, + BillingProductKey.WORKFLOW_NODE_EXECUTION, + )); + + if (!canBillWorkflowNodeExecution) { + const billingOutput = { + error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE, + }; + + await this.workflowRunWorkspaceService.saveWorkflowRunState({ + workspaceId, + workflowRunId, + stepOutput: { + id: stepIdToExecute, + output: billingOutput, + }, + stepStatus: StepStatus.FAILED, + }); + + await this.workflowRunWorkspaceService.endWorkflowRun({ + workflowRunId, + workspaceId, + status: WorkflowRunStatus.FAILED, + error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE, + }); + } + + return canBillWorkflowNodeExecution; } } diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts index 5ce119bf4..c3cf95fe5 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts @@ -1,5 +1,7 @@ import { Scope } from '@nestjs/common'; +import { isDefined } from 'twenty-shared/utils'; + import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator'; import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; @@ -9,7 +11,6 @@ import { ThrottlerService } from 'src/engine/core-modules/throttler/throttler.se import { TwentyConfigService } from 'src/engine/core-modules/twenty-config/twenty-config.service'; import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workspace-services/workflow-common.workspace-service'; -import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; import { WorkflowExecutorWorkspaceService } from 'src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service'; import { WorkflowRunException, @@ -107,17 +108,6 @@ export class RunWorkflowJob { await this.workflowRunWorkspaceService.startWorkflowRun({ workflowRunId, workspaceId, - output: { - flow: { - trigger: workflowVersion.trigger, - steps: workflowVersion.steps, - }, - stepsOutput: { - trigger: { - result: triggerPayload, - }, - }, - }, payload: triggerPayload, }); @@ -125,13 +115,9 @@ export class RunWorkflowJob { const rootSteps = getRootSteps(workflowVersion.steps); - await this.executeWorkflow({ + await this.workflowExecutorWorkspaceService.executeFromSteps({ + stepIds: rootSteps.map((step) => step.id), workflowRunId, - currentStepId: rootSteps[0].id, - steps: workflowVersion.steps, - context: workflowRun.context ?? { - trigger: triggerPayload, - }, workspaceId, }); } @@ -169,9 +155,10 @@ export class RunWorkflowJob { ); } - const nextStepId = lastExecutedStep.nextStepIds?.[0]; - - if (!nextStepId) { + if ( + !isDefined(lastExecutedStep.nextStepIds) || + lastExecutedStep.nextStepIds.length === 0 + ) { await this.workflowRunWorkspaceService.endWorkflowRun({ workflowRunId, workspaceId, @@ -181,46 +168,10 @@ export class RunWorkflowJob { return; } - await this.executeWorkflow({ - workflowRunId, - currentStepId: nextStepId, - steps: workflowRun.output?.flow?.steps ?? [], - context: workflowRun.context ?? {}, - workspaceId, - }); - } - - private async executeWorkflow({ - workflowRunId, - currentStepId, - steps, - context, - workspaceId, - }: { - workflowRunId: string; - currentStepId: string; - steps: WorkflowAction[]; - // eslint-disable-next-line @typescript-eslint/no-explicit-any - context: Record; - workspaceId: string; - }) { - const { error, pendingEvent } = - await this.workflowExecutorWorkspaceService.execute({ - workflowRunId, - currentStepId, - steps, - context, - }); - - if (pendingEvent) { - return; - } - - await this.workflowRunWorkspaceService.endWorkflowRun({ + await this.workflowExecutorWorkspaceService.executeFromSteps({ + stepIds: lastExecutedStep.nextStepIds, workflowRunId, workspaceId, - status: error ? WorkflowRunStatus.FAILED : WorkflowRunStatus.COMPLETED, - error, }); } diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.module.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.module.ts index b0d6feb81..0dd770c7f 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.module.ts @@ -8,12 +8,14 @@ import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadat import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module'; import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service'; +import { CacheLockModule } from 'src/engine/core-modules/cache-lock/cache-lock.module'; @Module({ imports: [ WorkflowCommonModule, NestjsQueryTypeOrmModule.forFeature([ObjectMetadataEntity], 'core'), RecordPositionModule, + CacheLockModule, MetricsModule, ], providers: [WorkflowRunWorkspaceService, ScopedWorkspaceContextFactory], diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts index 775618827..1da904af4 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts @@ -4,6 +4,7 @@ import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; import { v4 } from 'uuid'; import { isDefined } from 'twenty-shared/utils'; +import { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity'; import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action'; import { objectRecordChangedValues } from 'src/engine/core-modules/event-emitter/utils/object-record-changed-values'; @@ -18,7 +19,6 @@ import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/worksp import { StepOutput, WorkflowRunState, - WorkflowRunOutput, WorkflowRunStatus, WorkflowRunWorkspaceEntity, } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; @@ -30,6 +30,7 @@ import { } from 'src/modules/workflow/workflow-runner/exceptions/workflow-run.exception'; import { StepStatus } from 'src/modules/workflow/workflow-executor/types/workflow-run-step-info.type'; import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; +import { CacheLockService } from 'src/engine/core-modules/cache-lock/cache-lock.service'; @Injectable() export class WorkflowRunWorkspaceService { @@ -42,6 +43,7 @@ export class WorkflowRunWorkspaceService { private readonly objectMetadataRepository: Repository, private readonly recordPositionService: RecordPositionService, private readonly metricsService: MetricsService, + private readonly cacheLockService: CacheLockService, ) {} async createWorkflowRun({ @@ -116,6 +118,8 @@ export class WorkflowRunWorkspaceService { workspaceId, }); + const initState = this.getInitState(workflowVersion); + const workflowRun = workflowRunRepository.create({ id: workflowRunId ?? v4(), name: `#${workflowRunCount + 1} - ${workflow.name}`, @@ -124,7 +128,11 @@ export class WorkflowRunWorkspaceService { workflowId: workflow.id, status, position, - state: this.getInitState(workflowVersion), + state: initState, + output: { + ...initState, + stepsOutput: {}, + }, context, }); @@ -133,35 +141,31 @@ export class WorkflowRunWorkspaceService { return workflowRun.id; } - async startWorkflowRun({ + async startWorkflowRun(params: { + workflowRunId: string; + workspaceId: string; + payload: object; + }) { + await this.cacheLockService.withLock( + async () => await this.startWorkflowRunWithoutLock(params), + params.workflowRunId, + ); + } + + private async startWorkflowRunWithoutLock({ workflowRunId, workspaceId, - output, payload, }: { workflowRunId: string; workspaceId: string; - output: WorkflowRunOutput; payload: object; }) { - const workflowRunRepository = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - workspaceId, - 'workflowRun', - { shouldBypassPermissionChecks: true }, - ); - - const workflowRunToUpdate = await workflowRunRepository.findOneBy({ - id: workflowRunId, + const workflowRunToUpdate = await this.getWorkflowRunOrFail({ + workflowRunId, + workspaceId, }); - if (!workflowRunToUpdate) { - throw new WorkflowRunException( - 'No workflow run to start', - WorkflowRunExceptionCode.WORKFLOW_RUN_NOT_FOUND, - ); - } - if ( workflowRunToUpdate.status !== WorkflowRunStatus.ENQUEUED && workflowRunToUpdate.status !== WorkflowRunStatus.NOT_STARTED @@ -175,29 +179,47 @@ export class WorkflowRunWorkspaceService { const partialUpdate = { status: WorkflowRunStatus.RUNNING, startedAt: new Date().toISOString(), - output, + output: { + ...workflowRunToUpdate.output, + stepsOutput: { + trigger: { + result: payload, + }, + }, + }, state: { ...workflowRunToUpdate.state, stepInfos: { ...workflowRunToUpdate.state?.stepInfos, trigger: { - ...workflowRunToUpdate.state?.stepInfos.trigger, status: StepStatus.SUCCESS, result: payload, }, }, }, + context: payload + ? { + trigger: payload, + } + : (workflowRunToUpdate.context ?? {}), }; - await workflowRunRepository.update(workflowRunToUpdate.id, partialUpdate); - - await this.emitWorkflowRunUpdatedEvent({ - workflowRunBefore: workflowRunToUpdate, - updatedFields: ['status', 'startedAt', 'output'], - }); + await this.updateWorkflowRun({ workflowRunId, workspaceId, partialUpdate }); } - async endWorkflowRun({ + async endWorkflowRun(params: { + workflowRunId: string; + workspaceId: string; + status: WorkflowRunStatus; + error?: string; + }) { + await this.cacheLockService.withLock( + async () => await this.endWorkflowRunWithoutLock(params), + params.workflowRunId, + ); + } + + private async endWorkflowRunWithoutLock({ workflowRunId, workspaceId, status, @@ -208,29 +230,16 @@ export class WorkflowRunWorkspaceService { status: WorkflowRunStatus; error?: string; }) { - const workflowRunRepository = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - workspaceId, - 'workflowRun', - { shouldBypassPermissionChecks: true }, - ); - - const workflowRunToUpdate = await workflowRunRepository.findOneBy({ - id: workflowRunId, + const workflowRunToUpdate = await this.getWorkflowRunOrFail({ + workflowRunId, + workspaceId, }); - if (!workflowRunToUpdate) { - throw new WorkflowRunException( - 'No workflow run to end', - WorkflowRunExceptionCode.WORKFLOW_RUN_NOT_FOUND, - ); - } - const partialUpdate = { status, endedAt: new Date().toISOString(), output: { - ...(workflowRunToUpdate.output ?? {}), + ...workflowRunToUpdate.output, error, }, state: { @@ -239,12 +248,7 @@ export class WorkflowRunWorkspaceService { }, }; - await workflowRunRepository.update(workflowRunToUpdate.id, partialUpdate); - - await this.emitWorkflowRunUpdatedEvent({ - workflowRunBefore: workflowRunToUpdate, - updatedFields: ['status', 'endedAt', 'output', 'state'], - }); + await this.updateWorkflowRun({ workflowRunId, workspaceId, partialUpdate }); await this.metricsService.incrementCounter({ key: @@ -255,7 +259,19 @@ export class WorkflowRunWorkspaceService { }); } - async updateWorkflowRunStepStatus({ + async updateWorkflowRunStepStatus(params: { + workflowRunId: string; + stepId: string; + workspaceId: string; + stepStatus: StepStatus; + }) { + await this.cacheLockService.withLock( + async () => await this.updateWorkflowRunStepStatusWithoutLock(params), + params.workflowRunId, + ); + } + + private async updateWorkflowRunStepStatusWithoutLock({ workflowRunId, workspaceId, stepId, @@ -266,24 +282,11 @@ export class WorkflowRunWorkspaceService { workspaceId: string; stepStatus: StepStatus; }) { - const workflowRunRepository = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - workspaceId, - 'workflowRun', - { shouldBypassPermissionChecks: true }, - ); - - const workflowRunToUpdate = await workflowRunRepository.findOneBy({ - id: workflowRunId, + const workflowRunToUpdate = await this.getWorkflowRunOrFail({ + workflowRunId, + workspaceId, }); - if (!workflowRunToUpdate) { - throw new WorkflowRunException( - 'No workflow run to save', - WorkflowRunExceptionCode.WORKFLOW_RUN_NOT_FOUND, - ); - } - const partialUpdate = { state: { ...workflowRunToUpdate.state, @@ -297,46 +300,37 @@ export class WorkflowRunWorkspaceService { }, }; - await workflowRunRepository.update(workflowRunId, partialUpdate); - - await this.emitWorkflowRunUpdatedEvent({ - workflowRunBefore: workflowRunToUpdate, - updatedFields: ['state'], - }); + await this.updateWorkflowRun({ workflowRunId, workspaceId, partialUpdate }); } - async saveWorkflowRunState({ + async saveWorkflowRunState(params: { + workflowRunId: string; + stepOutput: StepOutput; + workspaceId: string; + stepStatus: StepStatus; + }) { + await this.cacheLockService.withLock( + async () => await this.saveWorkflowRunStateWithoutLock(params), + params.workflowRunId, + ); + } + + private async saveWorkflowRunStateWithoutLock({ workflowRunId, stepOutput, workspaceId, - context, stepStatus, }: { workflowRunId: string; stepOutput: StepOutput; workspaceId: string; - // eslint-disable-next-line @typescript-eslint/no-explicit-any - context: Record; stepStatus: StepStatus; }) { - const workflowRunRepository = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - workspaceId, - 'workflowRun', - { shouldBypassPermissionChecks: true }, - ); - - const workflowRunToUpdate = await workflowRunRepository.findOneBy({ - id: workflowRunId, + const workflowRunToUpdate = await this.getWorkflowRunOrFail({ + workflowRunId, + workspaceId, }); - if (!workflowRunToUpdate) { - throw new WorkflowRunException( - 'No workflow run to save', - WorkflowRunExceptionCode.WORKFLOW_RUN_NOT_FOUND, - ); - } - const partialUpdate = { output: { flow: workflowRunToUpdate.output?.flow ?? { @@ -353,24 +347,38 @@ export class WorkflowRunWorkspaceService { stepInfos: { ...workflowRunToUpdate.state?.stepInfos, [stepOutput.id]: { + ...(workflowRunToUpdate.state?.stepInfos[stepOutput.id] || {}), result: stepOutput.output?.result, error: stepOutput.output?.error, status: stepStatus, }, }, }, - context, + ...(stepStatus === StepStatus.SUCCESS + ? { + context: { + ...workflowRunToUpdate.context, + [stepOutput.id]: stepOutput.output.result, + }, + } + : {}), }; - await workflowRunRepository.update(workflowRunId, partialUpdate); - - await this.emitWorkflowRunUpdatedEvent({ - workflowRunBefore: workflowRunToUpdate, - updatedFields: ['context', 'output', 'state'], - }); + await this.updateWorkflowRun({ workflowRunId, workspaceId, partialUpdate }); } - async updateWorkflowRunStep({ + async updateWorkflowRunStep(params: { + workflowRunId: string; + step: WorkflowAction; + workspaceId: string; + }) { + await this.cacheLockService.withLock( + async () => await this.updateWorkflowRunStepWithoutLock(params), + params.workflowRunId, + ); + } + + private async updateWorkflowRunStepWithoutLock({ workflowRunId, step, workspaceId, @@ -379,24 +387,11 @@ export class WorkflowRunWorkspaceService { step: WorkflowAction; workspaceId: string; }) { - const workflowRunRepository = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - workspaceId, - 'workflowRun', - { shouldBypassPermissionChecks: true }, - ); - - const workflowRunToUpdate = await workflowRunRepository.findOneBy({ - id: workflowRunId, + const workflowRunToUpdate = await this.getWorkflowRunOrFail({ + workflowRunId, + workspaceId, }); - if (!workflowRunToUpdate) { - throw new WorkflowRunException( - 'No workflow run to update', - WorkflowRunExceptionCode.WORKFLOW_RUN_NOT_FOUND, - ); - } - if ( workflowRunToUpdate.status === WorkflowRunStatus.COMPLETED || workflowRunToUpdate.status === WorkflowRunStatus.FAILED @@ -428,11 +423,25 @@ export class WorkflowRunWorkspaceService { }, }; - await workflowRunRepository.update(workflowRunToUpdate.id, partialUpdate); + await this.updateWorkflowRun({ workflowRunId, workspaceId, partialUpdate }); + } - await this.emitWorkflowRunUpdatedEvent({ - workflowRunBefore: workflowRunToUpdate, - updatedFields: ['output', 'state'], + async getWorkflowRun({ + workflowRunId, + workspaceId, + }: { + workflowRunId: string; + workspaceId: string; + }): Promise { + const workflowRunRepository = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + workspaceId, + 'workflowRun', + { shouldBypassPermissionChecks: true }, + ); + + return await workflowRunRepository.findOne({ + where: { id: workflowRunId }, }); } @@ -443,15 +452,9 @@ export class WorkflowRunWorkspaceService { workflowRunId: string; workspaceId: string; }): Promise { - const workflowRunRepository = - await this.twentyORMGlobalManager.getRepositoryForWorkspace( - workspaceId, - 'workflowRun', - { shouldBypassPermissionChecks: true }, - ); - - const workflowRun = await workflowRunRepository.findOne({ - where: { id: workflowRunId }, + const workflowRun = await this.getWorkflowRun({ + workflowRunId, + workspaceId, }); if (!workflowRun) { @@ -560,4 +563,43 @@ export class WorkflowRunWorkspaceService { }, }; } + + private async updateWorkflowRun({ + workflowRunId, + workspaceId, + partialUpdate, + }: { + workflowRunId: string; + workspaceId: string; + partialUpdate: QueryDeepPartialEntity; + }) { + const workflowRunRepository = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + workspaceId, + 'workflowRun', + { shouldBypassPermissionChecks: true }, + ); + + const workflowRunToUpdate = await workflowRunRepository.findOneBy({ + id: workflowRunId, + }); + + if (!workflowRunToUpdate) { + throw new WorkflowRunException( + `workflowRun ${workflowRunId} not found`, + WorkflowRunExceptionCode.WORKFLOW_RUN_NOT_FOUND, + ); + } + + await workflowRunRepository.update(workflowRunToUpdate.id, partialUpdate); + + const updatedFields = Object.keys(partialUpdate); + + if (updatedFields.length > 0) { + await this.emitWorkflowRunUpdatedEvent({ + workflowRunBefore: workflowRunToUpdate, + updatedFields: updatedFields, + }); + } + } }