22 branches 2 (#13051)

This PR is purely technical, it does produces any functional change to
the user

- add Lock mecanism to run steps concurrently
- update `workflow-executor.workspace-service.ts` to handle multi branch
workflow execution
  - stop passing `context` through steps, it causes race condition issue
  - refactor a little bit
- simplify `workflow-run.workspace-service.ts` to prepare `output` and
`context` removal
- move workflowRun status computing from `run-workflow.job.ts` to
`workflow-executor.workspace-service.ts`

## NOTA BENE
When a code step depends of 2 parents like in this config (see image
below)

If the form is submitted before the "Code - 2s" step succeed, the branch
merge "Form" step is launched twice.
- once because form is submission Succeed resumes the workflow in an
asynchronous job
- the second time is when the asynchronous job is launched when "Code -
2s" is succeeded
- the merge "Form" step makes the workflow waiting for response to
trigger the resume in another job
- during that time, the first resume job is launched, running the merge
"Form" step again

This issue only occurs with branch workflows. It will be solved by
checking if the currentStepToExecute is already in a SUCCESS state or
not

<img width="505" alt="image"
src="https://github.com/user-attachments/assets/b73839a1-16fe-45e1-a0d9-3efa26ab4f8b"
/>
This commit is contained in:
martmull
2025-07-07 22:50:34 +02:00
committed by GitHub
parent 51d02c13bf
commit 2f7d8c76af
31 changed files with 877 additions and 524 deletions

View File

@ -0,0 +1,108 @@
import { Test, TestingModule } from '@nestjs/testing';
import { CacheLockService } from 'src/engine/core-modules/cache-lock/cache-lock.service';
import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service';
import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum';
describe('CacheLockService', () => {
let service: CacheLockService;
let cacheStorageService: jest.Mocked<CacheStorageService>;
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [
CacheLockService,
{
provide: CacheStorageNamespace.EngineLock,
useValue: {
acquireLock: jest.fn(),
releaseLock: jest.fn(),
},
},
{
provide: CacheStorageService,
useValue: {
acquireLock: jest.fn(),
releaseLock: jest.fn(),
},
},
],
}).compile();
service = module.get<CacheLockService>(CacheLockService);
cacheStorageService = module.get(CacheStorageNamespace.EngineLock);
jest.spyOn(service, 'delay').mockResolvedValue(undefined);
});
it('should be defined', () => {
expect(service).toBeDefined();
});
it('should acquire the lock and execute the function', async () => {
cacheStorageService.acquireLock.mockResolvedValue(true);
cacheStorageService.releaseLock.mockResolvedValue(undefined);
const fn = jest.fn().mockResolvedValue('success');
const ttl = 100;
const result = await service.withLock(fn, 'key', {
ttl,
});
expect(result).toBe('success');
expect(fn).toHaveBeenCalled();
expect(cacheStorageService.acquireLock).toHaveBeenCalledTimes(1);
expect(cacheStorageService.acquireLock).toHaveBeenCalledWith('key', ttl);
expect(cacheStorageService.releaseLock).toHaveBeenCalledTimes(1);
expect(cacheStorageService.releaseLock).toHaveBeenCalledWith('key');
});
it('should throw an error if lock cannot be acquired after max retries', async () => {
cacheStorageService.acquireLock.mockResolvedValue(false);
const fn = jest.fn();
const ms = 1;
const maxRetries = 3;
await expect(
service.withLock(fn, 'key', { ms, maxRetries }),
).rejects.toThrow('Failed to acquire lock for key: key');
expect(cacheStorageService.acquireLock).toHaveBeenCalledTimes(maxRetries);
expect(fn).not.toHaveBeenCalled();
});
it('should retry before acquiring the lock', async () => {
const mockAcquireLock = cacheStorageService.acquireLock;
mockAcquireLock
.mockResolvedValueOnce(false)
.mockResolvedValueOnce(false)
.mockResolvedValueOnce(true);
const fn = jest.fn().mockResolvedValue('retried success');
const result = await service.withLock(fn, 'key', {
maxRetries: 5,
ms: 1,
});
expect(result).toBe('retried success');
expect(fn).toHaveBeenCalledTimes(1);
expect(mockAcquireLock).toHaveBeenCalledTimes(3);
expect(cacheStorageService.releaseLock).toHaveBeenCalledWith('key');
});
it('should release the lock even if the function throws', async () => {
cacheStorageService.acquireLock.mockResolvedValue(true);
cacheStorageService.releaseLock.mockResolvedValue(undefined);
const fn = jest.fn().mockRejectedValue(new Error('fail'));
await expect(service.withLock(fn, 'key')).rejects.toThrow('fail');
expect(fn).toHaveBeenCalled();
expect(cacheStorageService.releaseLock).toHaveBeenCalledWith('key');
});
});

View File

@ -0,0 +1,10 @@
import { Module } from '@nestjs/common';
import { CacheLockService } from 'src/engine/core-modules/cache-lock/cache-lock.service';
@Module({
imports: [],
providers: [CacheLockService],
exports: [CacheLockService],
})
export class CacheLockModule {}

View File

@ -0,0 +1,47 @@
import { Injectable } from '@nestjs/common';
import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decorators/cache-storage.decorator';
import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum';
import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service';
export type CacheLockOptions = {
ms?: number;
maxRetries?: number;
ttl?: number;
};
@Injectable()
export class CacheLockService {
constructor(
@InjectCacheStorage(CacheStorageNamespace.EngineLock)
private readonly cacheStorageService: CacheStorageService,
) {}
async delay(ms: number) {
return new Promise((res) => setTimeout(res, ms));
}
async withLock<T>(
fn: () => Promise<T>,
key: string,
options?: CacheLockOptions,
): Promise<T> {
const { ms = 50, maxRetries = 20, ttl = 500 } = options || {};
for (let attempt = 0; attempt < maxRetries; attempt++) {
const acquired = await this.cacheStorageService.acquireLock(key, ttl);
if (acquired) {
try {
return await fn();
} finally {
await this.cacheStorageService.releaseLock(key);
}
}
await this.delay(ms);
}
throw new Error(`Failed to acquire lock for key: ${key}`);
}
}

View File

@ -122,6 +122,29 @@ export class CacheStorageService {
} while (cursor !== 0);
}
async acquireLock(key: string, ttl = 1000): Promise<boolean> {
if (!this.isRedisCache()) {
throw new Error('acquireLock is only supported with Redis cache');
}
const redisClient = (this.cache as RedisCache).store.client;
const result = await redisClient.set(this.getKey(key), 'lock', {
NX: true,
PX: ttl,
});
return result === 'OK';
}
async releaseLock(key: string): Promise<void> {
if (!this.isRedisCache()) {
throw new Error('releaseLock is only supported with Redis cache');
}
await this.del(key);
}
private isRedisCache() {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
return (this.cache.store as any)?.name === 'redis';

View File

@ -3,5 +3,6 @@ export enum CacheStorageNamespace {
ModuleCalendar = 'module:calendar',
ModuleWorkflow = 'module:workflow',
EngineWorkspace = 'engine:workspace',
EngineLock = 'engine:lock',
EngineHealth = 'engine:health',
}

View File

@ -28,7 +28,7 @@ import { FavoriteWorkspaceEntity } from 'src/modules/favorite/standard-objects/f
import { TimelineActivityWorkspaceEntity } from 'src/modules/timeline/standard-objects/timeline-activity.workspace-entity';
import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity';
import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity';
import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type';
import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
import { WorkflowTrigger } from 'src/modules/workflow/workflow-trigger/types/workflow-trigger.type';
import { WorkflowRunStepInfo } from 'src/modules/workflow/workflow-executor/types/workflow-run-step-info.type';
@ -43,7 +43,7 @@ export enum WorkflowRunStatus {
export type StepOutput = {
id: string;
output: WorkflowExecutorOutput;
output: WorkflowActionOutput;
};
export type WorkflowRunOutput = {
@ -51,7 +51,7 @@ export type WorkflowRunOutput = {
trigger: WorkflowTrigger;
steps: WorkflowAction[];
};
stepsOutput?: Record<string, WorkflowExecutorOutput>;
stepsOutput?: Record<string, WorkflowActionOutput>;
error?: string;
};

View File

@ -336,16 +336,10 @@ export class WorkflowVersionStepWorkspaceService {
},
};
const updatedContext = {
...workflowRun.context,
[stepId]: enrichedResponse,
};
await this.workflowRunWorkspaceService.saveWorkflowRunState({
workspaceId,
workflowRunId,
stepOutput: newStepOutput,
context: updatedContext,
stepStatus: StepStatus.SUCCESS,
});

View File

@ -1,6 +1,6 @@
import { Injectable } from '@nestjs/common';
import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface';
import {
WorkflowStepExecutorException,
@ -19,7 +19,7 @@ import { UpdateRecordWorkflowAction } from 'src/modules/workflow/workflow-execut
import { WorkflowActionType } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
@Injectable()
export class WorkflowExecutorFactory {
export class WorkflowActionFactory {
constructor(
private readonly codeWorkflowAction: CodeWorkflowAction,
private readonly sendEmailWorkflowAction: SendEmailWorkflowAction,
@ -33,7 +33,7 @@ export class WorkflowExecutorFactory {
private readonly aiAgentWorkflowAction: AiAgentWorkflowAction,
) {}
get(stepType: WorkflowActionType): WorkflowExecutor {
get(stepType: WorkflowActionType): WorkflowAction {
switch (stepType) {
case WorkflowActionType.CODE:
return this.codeWorkflowAction;

View File

@ -0,0 +1,8 @@
import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input';
import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type';
export interface WorkflowAction {
execute(
workflowActionInput: WorkflowActionInput,
): Promise<WorkflowActionOutput>;
}

View File

@ -1,8 +0,0 @@
import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input';
import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type';
export interface WorkflowExecutor {
execute(
workflowExecutorInput: WorkflowExecutorInput,
): Promise<WorkflowExecutorOutput>;
}

View File

@ -0,0 +1,7 @@
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
export type WorkflowActionInput = {
currentStepId: string;
steps: WorkflowAction[];
context: Record<string, unknown>;
};

View File

@ -1,4 +1,4 @@
export type WorkflowExecutorOutput = {
export type WorkflowActionOutput = {
result?: object;
error?: string;
pendingEvent?: boolean;

View File

@ -1,9 +1,12 @@
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
export type WorkflowExecutorInput = {
currentStepId: string;
steps: WorkflowAction[];
context: Record<string, unknown>;
stepIds: string[];
workflowRunId: string;
attemptCount?: number;
workspaceId: string;
};
export type WorkflowBranchExecutorInput = {
stepId: string;
attemptCount?: number;
workflowRunId: string;
workspaceId: string;
};

View File

@ -0,0 +1,90 @@
import {
WorkflowAction,
WorkflowActionType,
} from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
import { canExecuteStep } from 'src/modules/workflow/workflow-executor/utils/can-execute-step.utils';
describe('canExecuteStep', () => {
const steps = [
{
id: 'step-1',
type: WorkflowActionType.CODE,
settings: {
errorHandlingOptions: {
continueOnFailure: { value: false },
retryOnFailure: { value: false },
},
},
nextStepIds: ['step-3'],
},
{
id: 'step-2',
type: WorkflowActionType.SEND_EMAIL,
settings: {
errorHandlingOptions: {
continueOnFailure: { value: false },
retryOnFailure: { value: false },
},
},
nextStepIds: ['step-3'],
},
{
id: 'step-3',
type: WorkflowActionType.SEND_EMAIL,
settings: {
errorHandlingOptions: {
continueOnFailure: { value: false },
retryOnFailure: { value: false },
},
},
nextStepIds: [],
},
] as WorkflowAction[];
it('should return true if all parents succeeded', () => {
const context = {
trigger: 'trigger result',
'step-1': 'step-1 result',
'step-2': 'step-2 result',
};
const result = canExecuteStep({ context, steps, stepId: 'step-3' });
expect(result).toBe(true);
});
it('should return false if one parent is not succeeded', () => {
expect(
canExecuteStep({
context: {
trigger: 'trigger result',
'step-2': 'step-2 result',
},
steps,
stepId: 'step-3',
}),
).toBe(false);
expect(
canExecuteStep({
context: {
trigger: 'trigger result',
'step-1': 'step-1 result',
},
steps,
stepId: 'step-3',
}),
).toBe(false);
expect(
canExecuteStep({
context: {
trigger: 'trigger result',
'step-1': {},
},
steps,
stepId: 'step-3',
}),
).toBe(false);
});
});

View File

@ -0,0 +1,23 @@
import { isDefined } from 'twenty-shared/utils';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
export const canExecuteStep = ({
context,
stepId,
steps,
}: {
steps: WorkflowAction[];
context: Record<string, unknown>;
stepId: string;
}) => {
const parentSteps = steps.filter(
(parentStep) =>
isDefined(parentStep) && parentStep.nextStepIds?.includes(stepId),
);
// TODO use workflowRun.state to check if step status is not COMPLETED. Return false in this case
return parentSteps.every((parentStep) =>
Object.keys(context).includes(parentStep.id),
);
};

View File

@ -3,7 +3,7 @@ import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface';
import { AIBillingService } from 'src/engine/core-modules/ai/services/ai-billing.service';
import { AgentExecutionService } from 'src/engine/metadata-modules/agent/agent-execution.service';
@ -16,13 +16,13 @@ import {
WorkflowStepExecutorException,
WorkflowStepExecutorExceptionCode,
} from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception';
import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input';
import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type';
import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input';
import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type';
import { isWorkflowAiAgentAction } from './guards/is-workflow-ai-agent-action.guard';
@Injectable()
export class AiAgentWorkflowAction implements WorkflowExecutor {
export class AiAgentWorkflowAction implements WorkflowAction {
constructor(
private readonly agentExecutionService: AgentExecutionService,
private readonly aiBillingService: AIBillingService,
@ -34,7 +34,7 @@ export class AiAgentWorkflowAction implements WorkflowExecutor {
currentStepId,
steps,
context,
}: WorkflowExecutorInput): Promise<WorkflowExecutorOutput> {
}: WorkflowActionInput): Promise<WorkflowActionOutput> {
const step = steps.find((step) => step.id === currentStepId);
if (!step) {

View File

@ -1,6 +1,6 @@
import { Injectable } from '@nestjs/common';
import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface';
import { ServerlessFunctionService } from 'src/engine/metadata-modules/serverless-function/serverless-function.service';
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
@ -8,14 +8,14 @@ import {
WorkflowStepExecutorException,
WorkflowStepExecutorExceptionCode,
} from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception';
import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input';
import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type';
import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input';
import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type';
import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util';
import { isWorkflowCodeAction } from 'src/modules/workflow/workflow-executor/workflow-actions/code/guards/is-workflow-code-action.guard';
import { WorkflowCodeActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/code/types/workflow-code-action-input.type';
@Injectable()
export class CodeWorkflowAction implements WorkflowExecutor {
export class CodeWorkflowAction implements WorkflowAction {
constructor(
private readonly serverlessFunctionService: ServerlessFunctionService,
private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory,
@ -25,7 +25,7 @@ export class CodeWorkflowAction implements WorkflowExecutor {
currentStepId,
steps,
context,
}: WorkflowExecutorInput): Promise<WorkflowExecutorOutput> {
}: WorkflowActionInput): Promise<WorkflowActionOutput> {
const step = steps.find((step) => step.id === currentStepId);
if (!step) {

View File

@ -1,20 +1,20 @@
import { Injectable } from '@nestjs/common';
import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface';
import {
WorkflowStepExecutorException,
WorkflowStepExecutorExceptionCode,
} from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception';
import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input';
import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type';
import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input';
import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type';
import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util';
import { isWorkflowFilterAction } from 'src/modules/workflow/workflow-executor/workflow-actions/filter/guards/is-workflow-filter-action.guard';
import { evaluateFilterConditions } from 'src/modules/workflow/workflow-executor/workflow-actions/filter/utils/evaluate-filter-conditions.util';
@Injectable()
export class FilterWorkflowAction implements WorkflowExecutor {
async execute(input: WorkflowExecutorInput): Promise<WorkflowExecutorOutput> {
export class FilterWorkflowAction implements WorkflowAction {
async execute(input: WorkflowActionInput): Promise<WorkflowActionOutput> {
const { currentStepId, steps, context } = input;
const step = steps.find((step) => step.id === currentStepId);

View File

@ -1,21 +1,21 @@
import { Injectable } from '@nestjs/common';
import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface';
import {
WorkflowStepExecutorException,
WorkflowStepExecutorExceptionCode,
} from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception';
import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input';
import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type';
import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input';
import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type';
import { isWorkflowFormAction } from 'src/modules/workflow/workflow-executor/workflow-actions/form/guards/is-workflow-form-action.guard';
@Injectable()
export class FormWorkflowAction implements WorkflowExecutor {
export class FormWorkflowAction implements WorkflowAction {
async execute({
currentStepId,
steps,
}: WorkflowExecutorInput): Promise<WorkflowExecutorOutput> {
}: WorkflowActionInput): Promise<WorkflowActionOutput> {
const step = steps.find((step) => step.id === currentStepId);
if (!step) {

View File

@ -3,26 +3,26 @@ import { Injectable } from '@nestjs/common';
import { isString } from '@sniptt/guards';
import axios, { AxiosRequestConfig } from 'axios';
import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface';
import {
WorkflowStepExecutorException,
WorkflowStepExecutorExceptionCode,
} from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception';
import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input';
import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type';
import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input';
import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type';
import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util';
import { isWorkflowHttpRequestAction } from './guards/is-workflow-http-request-action.guard';
import { WorkflowHttpRequestActionInput } from './types/workflow-http-request-action-input.type';
@Injectable()
export class HttpRequestWorkflowAction implements WorkflowExecutor {
export class HttpRequestWorkflowAction implements WorkflowAction {
async execute({
currentStepId,
steps,
context,
}: WorkflowExecutorInput): Promise<WorkflowExecutorOutput> {
}: WorkflowActionInput): Promise<WorkflowActionOutput> {
const step = steps.find((step) => step.id === currentStepId);
if (!step) {

View File

@ -5,7 +5,7 @@ import { JSDOM } from 'jsdom';
import { isDefined, isValidUuid } from 'twenty-shared/utils';
import { z } from 'zod';
import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface';
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
@ -15,8 +15,8 @@ import {
WorkflowStepExecutorException,
WorkflowStepExecutorExceptionCode,
} from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception';
import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input';
import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type';
import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input';
import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type';
import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util';
import {
SendEmailActionException,
@ -30,7 +30,7 @@ export type WorkflowSendEmailStepOutputSchema = {
};
@Injectable()
export class SendEmailWorkflowAction implements WorkflowExecutor {
export class SendEmailWorkflowAction implements WorkflowAction {
private readonly logger = new Logger(SendEmailWorkflowAction.name);
constructor(
private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory,
@ -78,7 +78,7 @@ export class SendEmailWorkflowAction implements WorkflowExecutor {
currentStepId,
steps,
context,
}: WorkflowExecutorInput): Promise<WorkflowExecutorOutput> {
}: WorkflowActionInput): Promise<WorkflowActionOutput> {
const step = steps.find((step) => step.id === currentStepId);
if (!step) {

View File

@ -4,7 +4,7 @@ import { InjectRepository } from '@nestjs/typeorm';
import { isDefined } from 'class-validator';
import { Repository } from 'typeorm';
import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
import { RecordPositionService } from 'src/engine/core-modules/record-position/services/record-position.service';
@ -19,8 +19,8 @@ import {
WorkflowStepExecutorException,
WorkflowStepExecutorExceptionCode,
} from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception';
import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input';
import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type';
import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input';
import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type';
import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util';
import {
RecordCRUDActionException,
@ -30,7 +30,7 @@ import { isWorkflowCreateRecordAction } from 'src/modules/workflow/workflow-exec
import { WorkflowCreateRecordActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/types/workflow-record-crud-action-input.type';
@Injectable()
export class CreateRecordWorkflowAction implements WorkflowExecutor {
export class CreateRecordWorkflowAction implements WorkflowAction {
constructor(
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
@InjectRepository(ObjectMetadataEntity, 'core')
@ -46,7 +46,7 @@ export class CreateRecordWorkflowAction implements WorkflowExecutor {
currentStepId,
steps,
context,
}: WorkflowExecutorInput): Promise<WorkflowExecutorOutput> {
}: WorkflowActionInput): Promise<WorkflowActionOutput> {
const step = steps.find((step) => step.id === currentStepId);
if (!step) {

View File

@ -5,7 +5,7 @@ import { isDefined } from 'class-validator';
import { isValidUuid } from 'twenty-shared/utils';
import { Repository } from 'typeorm';
import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
@ -16,8 +16,8 @@ import {
WorkflowStepExecutorException,
WorkflowStepExecutorExceptionCode,
} from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception';
import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input';
import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type';
import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input';
import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type';
import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util';
import {
RecordCRUDActionException,
@ -27,7 +27,7 @@ import { isWorkflowDeleteRecordAction } from 'src/modules/workflow/workflow-exec
import { WorkflowDeleteRecordActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/types/workflow-record-crud-action-input.type';
@Injectable()
export class DeleteRecordWorkflowAction implements WorkflowExecutor {
export class DeleteRecordWorkflowAction implements WorkflowAction {
constructor(
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
@InjectRepository(ObjectMetadataEntity, 'core')
@ -40,7 +40,7 @@ export class DeleteRecordWorkflowAction implements WorkflowExecutor {
currentStepId,
steps,
context,
}: WorkflowExecutorInput): Promise<WorkflowExecutorOutput> {
}: WorkflowActionInput): Promise<WorkflowActionOutput> {
const step = steps.find((step) => step.id === currentStepId);
if (!step) {

View File

@ -9,7 +9,7 @@ import {
ObjectRecordOrderBy,
OrderByDirection,
} from 'src/engine/api/graphql/workspace-query-builder/interfaces/object-record.interface';
import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface';
import { GraphqlQueryParser } from 'src/engine/api/graphql/graphql-query-runner/graphql-query-parsers/graphql-query.parser';
import { ObjectMetadataItemWithFieldMaps } from 'src/engine/metadata-modules/types/object-metadata-item-with-field-maps';
@ -23,8 +23,8 @@ import {
WorkflowStepExecutorException,
WorkflowStepExecutorExceptionCode,
} from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception';
import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input';
import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type';
import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input';
import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type';
import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util';
import {
RecordCRUDActionException,
@ -34,7 +34,7 @@ import { isWorkflowFindRecordsAction } from 'src/modules/workflow/workflow-execu
import { WorkflowFindRecordsActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/types/workflow-record-crud-action-input.type';
@Injectable()
export class FindRecordsWorkflowAction implements WorkflowExecutor {
export class FindRecordsWorkflowAction implements WorkflowAction {
constructor(
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory,
@ -45,7 +45,7 @@ export class FindRecordsWorkflowAction implements WorkflowExecutor {
currentStepId,
steps,
context,
}: WorkflowExecutorInput): Promise<WorkflowExecutorOutput> {
}: WorkflowActionInput): Promise<WorkflowActionOutput> {
const step = steps.find((step) => step.id === currentStepId);
if (!step) {

View File

@ -5,7 +5,7 @@ import deepEqual from 'deep-equal';
import { isDefined, isValidUuid } from 'twenty-shared/utils';
import { Repository } from 'typeorm';
import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/interfaces/workflow-action.interface';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
import { objectRecordChangedValues } from 'src/engine/core-modules/event-emitter/utils/object-record-changed-values';
@ -20,8 +20,8 @@ import {
WorkflowStepExecutorException,
WorkflowStepExecutorExceptionCode,
} from 'src/modules/workflow/workflow-executor/exceptions/workflow-step-executor.exception';
import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input';
import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type';
import { WorkflowActionInput } from 'src/modules/workflow/workflow-executor/types/workflow-action-input';
import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type';
import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util';
import {
RecordCRUDActionException,
@ -31,7 +31,7 @@ import { isWorkflowUpdateRecordAction } from 'src/modules/workflow/workflow-exec
import { WorkflowUpdateRecordActionInput } from 'src/modules/workflow/workflow-executor/workflow-actions/record-crud/types/workflow-record-crud-action-input.type';
@Injectable()
export class UpdateRecordWorkflowAction implements WorkflowExecutor {
export class UpdateRecordWorkflowAction implements WorkflowAction {
constructor(
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory,
@ -46,7 +46,7 @@ export class UpdateRecordWorkflowAction implements WorkflowExecutor {
currentStepId,
steps,
context,
}: WorkflowExecutorInput): Promise<WorkflowExecutorOutput> {
}: WorkflowActionInput): Promise<WorkflowActionOutput> {
const step = steps.find((step) => step.id === currentStepId);
if (!step) {

View File

@ -4,7 +4,7 @@ import { BillingModule } from 'src/engine/core-modules/billing/billing.module';
import { FeatureFlagModule } from 'src/engine/core-modules/feature-flag/feature-flag.module';
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module';
import { WorkflowExecutorFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-executor.factory';
import { WorkflowActionFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-action.factory';
import { AiAgentActionModule } from 'src/modules/workflow/workflow-executor/workflow-actions/ai-agent/ai-agent-action.module';
import { CodeActionModule } from 'src/modules/workflow/workflow-executor/workflow-actions/code/code-action.module';
import { FilterActionModule } from 'src/modules/workflow/workflow-executor/workflow-actions/filter/filter-action.module';
@ -32,7 +32,7 @@ import { WorkflowRunModule } from 'src/modules/workflow/workflow-runner/workflow
providers: [
WorkflowExecutorWorkspaceService,
ScopedWorkspaceContextFactory,
WorkflowExecutorFactory,
WorkflowActionFactory,
],
exports: [WorkflowExecutorWorkspaceService],
})

View File

@ -4,9 +4,8 @@ import { BILLING_FEATURE_USED } from 'src/engine/core-modules/billing/constants/
import { BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE } from 'src/engine/core-modules/billing/constants/billing-workflow-execution-error-message.constant';
import { BillingMeterEventName } from 'src/engine/core-modules/billing/enums/billing-meter-event-names';
import { BillingService } from 'src/engine/core-modules/billing/services/billing.service';
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter';
import { WorkflowExecutorFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-executor.factory';
import { WorkflowActionFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-action.factory';
import {
WorkflowAction,
WorkflowActionType,
@ -14,10 +13,26 @@ import {
import { WorkflowExecutorWorkspaceService } from 'src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service';
import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service';
import { StepStatus } from 'src/modules/workflow/workflow-executor/types/workflow-run-step-info.type';
import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import { canExecuteStep } from 'src/modules/workflow/workflow-executor/utils/can-execute-step.utils';
jest.mock(
'src/modules/workflow/workflow-executor/utils/can-execute-step.utils',
() => {
const actual = jest.requireActual(
'src/modules/workflow/workflow-executor/utils/can-execute-step.utils',
);
return {
...actual,
canExecuteStep: jest.fn().mockReturnValue(true), // default behavior
};
},
);
describe('WorkflowExecutorWorkspaceService', () => {
let service: WorkflowExecutorWorkspaceService;
let workflowExecutorFactory: WorkflowExecutorFactory;
let workflowActionFactory: WorkflowActionFactory;
let workspaceEventEmitter: WorkspaceEventEmitter;
let workflowRunWorkspaceService: WorkflowRunWorkspaceService;
@ -29,22 +44,16 @@ describe('WorkflowExecutorWorkspaceService', () => {
emitCustomBatchEvent: jest.fn(),
};
const mockScopedWorkspaceContext = {
workspaceId: 'workspace-id',
};
const mockScopedWorkspaceContextFactory = {
create: jest.fn().mockReturnValue(mockScopedWorkspaceContext),
};
const mockWorkflowRunWorkspaceService = {
saveWorkflowRunState: jest.fn(),
endWorkflowRun: jest.fn(),
updateWorkflowRunStepStatus: jest.fn(),
saveWorkflowRunState: jest.fn(),
getWorkflowRun: jest.fn(),
};
const mockBillingService = {
isBillingEnabled: jest.fn(),
canBillMeteredProduct: jest.fn(),
isBillingEnabled: jest.fn().mockReturnValue(true),
canBillMeteredProduct: jest.fn().mockReturnValue(true),
};
beforeEach(async () => {
@ -54,7 +63,7 @@ describe('WorkflowExecutorWorkspaceService', () => {
providers: [
WorkflowExecutorWorkspaceService,
{
provide: WorkflowExecutorFactory,
provide: WorkflowActionFactory,
useValue: {
get: jest.fn().mockReturnValue(mockWorkflowExecutor),
},
@ -63,10 +72,6 @@ describe('WorkflowExecutorWorkspaceService', () => {
provide: WorkspaceEventEmitter,
useValue: mockWorkspaceEventEmitter,
},
{
provide: ScopedWorkspaceContextFactory,
useValue: mockScopedWorkspaceContextFactory,
},
{
provide: WorkflowRunWorkspaceService,
useValue: mockWorkflowRunWorkspaceService,
@ -81,8 +86,8 @@ describe('WorkflowExecutorWorkspaceService', () => {
service = module.get<WorkflowExecutorWorkspaceService>(
WorkflowExecutorWorkspaceService,
);
workflowExecutorFactory = module.get<WorkflowExecutorFactory>(
WorkflowExecutorFactory,
workflowActionFactory = module.get<WorkflowActionFactory>(
WorkflowActionFactory,
);
workspaceEventEmitter = module.get<WorkspaceEventEmitter>(
WorkspaceEventEmitter,
@ -94,7 +99,8 @@ describe('WorkflowExecutorWorkspaceService', () => {
describe('execute', () => {
const mockWorkflowRunId = 'workflow-run-id';
const mockContext = { data: 'some-data' };
const mockWorkspaceId = 'workspace-id';
const mockContext = { trigger: 'trigger-result' };
const mockSteps = [
{
id: 'step-1',
@ -120,20 +126,9 @@ describe('WorkflowExecutorWorkspaceService', () => {
},
] as WorkflowAction[];
it('should return success when all steps are completed', async () => {
// No steps to execute
const result = await service.execute({
workflowRunId: mockWorkflowRunId,
currentStepId: 'step-2',
steps: mockSteps,
context: mockContext,
});
expect(result).toEqual({
result: {
success: true,
},
});
mockWorkflowRunWorkspaceService.getWorkflowRun.mockReturnValue({
output: { flow: { steps: mockSteps } },
context: mockContext,
});
it('should execute a step and continue to the next step on success', async () => {
@ -143,24 +138,22 @@ describe('WorkflowExecutorWorkspaceService', () => {
mockWorkflowExecutor.execute.mockResolvedValueOnce(mockStepResult);
const result = await service.execute({
await service.executeFromSteps({
workflowRunId: mockWorkflowRunId,
stepIds: ['step-1'],
workspaceId: mockWorkspaceId,
});
expect(workflowActionFactory.get).toHaveBeenCalledWith(
WorkflowActionType.CODE,
);
expect(mockWorkflowExecutor.execute).toHaveBeenCalledWith({
currentStepId: 'step-1',
steps: mockSteps,
context: mockContext,
});
// execute first step
expect(workflowExecutorFactory.get).toHaveBeenCalledWith(
WorkflowActionType.CODE,
);
expect(mockWorkflowExecutor.execute).toHaveBeenCalledWith({
workflowRunId: mockWorkflowRunId,
currentStepId: 'step-1',
steps: mockSteps,
context: mockContext,
attemptCount: 1,
});
expect(workspaceEventEmitter.emitCustomBatchEvent).toHaveBeenCalledWith(
BILLING_FEATURE_USED,
[
@ -171,9 +164,11 @@ describe('WorkflowExecutorWorkspaceService', () => {
],
'workspace-id',
);
expect(
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
).toHaveBeenCalledTimes(2);
expect(
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
).toHaveBeenCalledWith({
@ -182,9 +177,11 @@ describe('WorkflowExecutorWorkspaceService', () => {
workspaceId: 'workspace-id',
stepStatus: StepStatus.RUNNING,
});
expect(
workflowRunWorkspaceService.saveWorkflowRunState,
).toHaveBeenCalledTimes(2);
expect(
workflowRunWorkspaceService.saveWorkflowRunState,
).toHaveBeenCalledWith({
@ -193,20 +190,12 @@ describe('WorkflowExecutorWorkspaceService', () => {
id: 'step-1',
output: mockStepResult,
},
context: {
data: 'some-data',
'step-1': {
stepOutput: 'success',
},
},
workspaceId: 'workspace-id',
stepStatus: StepStatus.SUCCESS,
});
expect(result).toEqual({ result: { success: true } });
// execute second step
expect(workflowExecutorFactory.get).toHaveBeenCalledWith(
expect(workflowActionFactory.get).toHaveBeenCalledWith(
WorkflowActionType.SEND_EMAIL,
);
});
@ -216,20 +205,18 @@ describe('WorkflowExecutorWorkspaceService', () => {
new Error('Step execution failed'),
);
const result = await service.execute({
await service.executeFromSteps({
workflowRunId: mockWorkflowRunId,
currentStepId: 'step-1',
steps: mockSteps,
context: mockContext,
stepIds: ['step-1'],
workspaceId: mockWorkspaceId,
});
expect(result).toEqual({
error: 'Step execution failed',
});
expect(workspaceEventEmitter.emitCustomBatchEvent).not.toHaveBeenCalled();
expect(
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
).toHaveBeenCalledTimes(1);
expect(
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
).toHaveBeenCalledWith({
@ -238,9 +225,11 @@ describe('WorkflowExecutorWorkspaceService', () => {
workspaceId: 'workspace-id',
stepStatus: StepStatus.RUNNING,
});
expect(
workflowRunWorkspaceService.saveWorkflowRunState,
).toHaveBeenCalledTimes(1);
expect(
workflowRunWorkspaceService.saveWorkflowRunState,
).toHaveBeenCalledWith({
@ -251,7 +240,6 @@ describe('WorkflowExecutorWorkspaceService', () => {
error: 'Step execution failed',
},
},
context: mockContext,
workspaceId: 'workspace-id',
stepStatus: StepStatus.FAILED,
});
@ -264,17 +252,16 @@ describe('WorkflowExecutorWorkspaceService', () => {
mockWorkflowExecutor.execute.mockResolvedValueOnce(mockPendingEvent);
const result = await service.execute({
await service.executeFromSteps({
workflowRunId: mockWorkflowRunId,
currentStepId: 'step-1',
steps: mockSteps,
context: mockContext,
stepIds: ['step-1'],
workspaceId: mockWorkspaceId,
});
expect(result).toEqual(mockPendingEvent);
expect(
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
).toHaveBeenCalledTimes(1);
expect(
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
).toHaveBeenCalledWith({
@ -283,9 +270,11 @@ describe('WorkflowExecutorWorkspaceService', () => {
workspaceId: 'workspace-id',
stepStatus: StepStatus.RUNNING,
});
expect(
workflowRunWorkspaceService.saveWorkflowRunState,
).toHaveBeenCalledTimes(1);
expect(
workflowRunWorkspaceService.saveWorkflowRunState,
).toHaveBeenCalledWith({
@ -294,13 +283,12 @@ describe('WorkflowExecutorWorkspaceService', () => {
id: 'step-1',
output: mockPendingEvent,
},
context: mockContext,
workspaceId: 'workspace-id',
stepStatus: StepStatus.PENDING,
});
// No recursive call to execute should happen
expect(workflowExecutorFactory.get).not.toHaveBeenCalledWith(
expect(workflowActionFactory.get).not.toHaveBeenCalledWith(
WorkflowActionType.SEND_EMAIL,
);
});
@ -330,15 +318,19 @@ describe('WorkflowExecutorWorkspaceService', () => {
},
] as WorkflowAction[];
mockWorkflowRunWorkspaceService.getWorkflowRun.mockReturnValueOnce({
output: { flow: { steps: stepsWithContinueOnFailure } },
context: mockContext,
});
mockWorkflowExecutor.execute.mockResolvedValueOnce({
error: 'Step execution failed but continue',
});
const result = await service.execute({
await service.executeFromSteps({
workflowRunId: mockWorkflowRunId,
currentStepId: 'step-1',
steps: stepsWithContinueOnFailure,
context: mockContext,
stepIds: ['step-1'],
workspaceId: mockWorkspaceId,
});
expect(
@ -368,14 +360,12 @@ describe('WorkflowExecutorWorkspaceService', () => {
error: 'Step execution failed but continue',
},
},
context: mockContext,
workspaceId: 'workspace-id',
stepStatus: StepStatus.FAILED,
});
expect(result).toEqual({ result: { success: true } });
// execute second step
expect(workflowExecutorFactory.get).toHaveBeenCalledWith(
expect(workflowActionFactory.get).toHaveBeenCalledWith(
WorkflowActionType.SEND_EMAIL,
);
});
@ -394,122 +384,86 @@ describe('WorkflowExecutorWorkspaceService', () => {
},
] as WorkflowAction[];
mockWorkflowExecutor.execute.mockResolvedValueOnce({
mockWorkflowRunWorkspaceService.getWorkflowRun.mockReturnValue({
output: { flow: { steps: stepsWithRetryOnFailure } },
context: mockContext,
});
mockWorkflowExecutor.execute.mockResolvedValue({
error: 'Step execution failed, will retry',
});
await service.execute({
await service.executeFromSteps({
workflowRunId: mockWorkflowRunId,
currentStepId: 'step-1',
steps: stepsWithRetryOnFailure,
context: mockContext,
stepIds: ['step-1'],
workspaceId: mockWorkspaceId,
});
// Should call execute again with increased attemptCount
expect(workflowExecutorFactory.get).toHaveBeenCalledWith(
WorkflowActionType.CODE,
);
expect(workflowExecutorFactory.get).not.toHaveBeenCalledWith(
for (let attempt = 1; attempt <= 3; attempt++) {
expect(workflowActionFactory.get).toHaveBeenNthCalledWith(
attempt,
WorkflowActionType.CODE,
);
}
expect(workflowActionFactory.get).not.toHaveBeenCalledWith(
WorkflowActionType.SEND_EMAIL,
);
expect(workflowExecutorFactory.get).toHaveBeenCalledTimes(2);
});
it('should stop retrying after MAX_RETRIES_ON_FAILURE', async () => {
const stepsWithRetryOnFailure = [
{
id: 'step-1',
type: WorkflowActionType.CODE,
settings: {
errorHandlingOptions: {
continueOnFailure: { value: false },
retryOnFailure: { value: true },
},
},
},
] as WorkflowAction[];
const errorOutput = {
error: 'Step execution failed, max retries reached',
};
mockWorkflowExecutor.execute.mockResolvedValueOnce(errorOutput);
const result = await service.execute({
workflowRunId: mockWorkflowRunId,
currentStepId: 'step-1',
steps: stepsWithRetryOnFailure,
context: mockContext,
attemptCount: 3, // MAX_RETRIES_ON_FAILURE is 3
});
// Should not retry anymore
expect(workflowExecutorFactory.get).toHaveBeenCalledTimes(1);
expect(
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
).toHaveBeenCalledTimes(1);
expect(
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
).toHaveBeenCalledWith({
workflowRunId: mockWorkflowRunId,
stepId: 'step-1',
workspaceId: 'workspace-id',
stepStatus: StepStatus.RUNNING,
});
expect(
workflowRunWorkspaceService.saveWorkflowRunState,
).toHaveBeenCalledTimes(1);
expect(
workflowRunWorkspaceService.saveWorkflowRunState,
).toHaveBeenCalledWith({
workflowRunId: mockWorkflowRunId,
stepOutput: {
id: 'step-1',
output: errorOutput,
},
context: mockContext,
workspaceId: 'workspace-id',
stepStatus: StepStatus.FAILED,
});
expect(result).toEqual(errorOutput);
});
it('should stop when billing validation fails', async () => {
mockBillingService.isBillingEnabled.mockReturnValueOnce(true);
mockBillingService.canBillMeteredProduct.mockReturnValueOnce(false);
const result = await service.execute({
await service.executeFromSteps({
workflowRunId: mockWorkflowRunId,
currentStepId: 'step-1',
steps: mockSteps,
context: mockContext,
stepIds: ['step-1'],
workspaceId: mockWorkspaceId,
});
expect(workflowExecutorFactory.get).toHaveBeenCalledTimes(1);
expect(workflowActionFactory.get).toHaveBeenCalledTimes(0);
expect(
workflowRunWorkspaceService.saveWorkflowRunState,
).toHaveBeenCalledTimes(1);
expect(
workflowRunWorkspaceService.updateWorkflowRunStepStatus,
).not.toHaveBeenCalled();
expect(workflowRunWorkspaceService.endWorkflowRun).toHaveBeenCalledTimes(
1,
);
expect(
workflowRunWorkspaceService.saveWorkflowRunState,
).toHaveBeenCalledWith({
workflowRunId: mockWorkflowRunId,
workspaceId: 'workspace-id',
stepOutput: {
id: 'step-1',
output: {
error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE,
},
},
context: mockContext,
workspaceId: 'workspace-id',
stepStatus: StepStatus.FAILED,
});
expect(result).toEqual({
expect(workflowRunWorkspaceService.endWorkflowRun).toHaveBeenCalledWith({
workflowRunId: mockWorkflowRunId,
workspaceId: 'workspace-id',
status: WorkflowRunStatus.FAILED,
error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE,
});
});
it('should return if step should not be executed', async () => {
(canExecuteStep as jest.Mock).mockReturnValueOnce(false);
await service.executeFromSteps({
workflowRunId: mockWorkflowRunId,
stepIds: ['step-1'],
workspaceId: mockWorkspaceId,
});
expect(workflowActionFactory.get).not.toHaveBeenCalled();
});
});
describe('sendWorkflowNodeRunEvent', () => {

View File

@ -2,112 +2,103 @@ import { Injectable } from '@nestjs/common';
import { isDefined } from 'twenty-shared/utils';
import { WorkflowExecutor } from 'src/modules/workflow/workflow-executor/interfaces/workflow-executor.interface';
import { BILLING_FEATURE_USED } from 'src/engine/core-modules/billing/constants/billing-feature-used.constant';
import { BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE } from 'src/engine/core-modules/billing/constants/billing-workflow-execution-error-message.constant';
import { BillingMeterEventName } from 'src/engine/core-modules/billing/enums/billing-meter-event-names';
import { BillingProductKey } from 'src/engine/core-modules/billing/enums/billing-product-key.enum';
import { BillingService } from 'src/engine/core-modules/billing/services/billing.service';
import { BillingUsageEvent } from 'src/engine/core-modules/billing/types/billing-usage-event.type';
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter';
import {
StepOutput,
WorkflowRunOutput,
WorkflowRunStatus,
} from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import { WorkflowExecutorFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-executor.factory';
import { WorkflowExecutorInput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-input';
import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type';
import { WorkflowActionFactory } from 'src/modules/workflow/workflow-executor/factories/workflow-action.factory';
import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type';
import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service';
import {
WorkflowTriggerException,
WorkflowTriggerExceptionCode,
} from 'src/modules/workflow/workflow-trigger/exceptions/workflow-trigger.exception';
import { StepStatus } from 'src/modules/workflow/workflow-executor/types/workflow-run-step-info.type';
import {
WorkflowBranchExecutorInput,
WorkflowExecutorInput,
} from 'src/modules/workflow/workflow-executor/types/workflow-executor-input';
import { canExecuteStep } from 'src/modules/workflow/workflow-executor/utils/can-execute-step.utils';
const MAX_RETRIES_ON_FAILURE = 3;
export type WorkflowExecutorState = {
stepsOutput: WorkflowRunOutput['stepsOutput'];
status: WorkflowRunStatus;
};
@Injectable()
export class WorkflowExecutorWorkspaceService implements WorkflowExecutor {
export class WorkflowExecutorWorkspaceService {
constructor(
private readonly workflowExecutorFactory: WorkflowExecutorFactory,
private readonly workflowActionFactory: WorkflowActionFactory,
private readonly workspaceEventEmitter: WorkspaceEventEmitter,
private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory,
private readonly workflowRunWorkspaceService: WorkflowRunWorkspaceService,
private readonly billingService: BillingService,
) {}
async execute({
currentStepId,
steps,
context,
async executeFromSteps({
stepIds,
workflowRunId,
workspaceId,
}: WorkflowExecutorInput) {
await Promise.all(
stepIds.map(async (stepIdToExecute) => {
await this.executeFromStep({
stepId: stepIdToExecute,
workflowRunId,
workspaceId,
});
}),
);
}
private async executeFromStep({
stepId,
attemptCount = 1,
workflowRunId,
}: WorkflowExecutorInput): Promise<WorkflowExecutorOutput> {
const step = steps.find((step) => step.id === currentStepId);
workspaceId,
}: WorkflowBranchExecutorInput) {
const workflowRunInfo = await this.getWorkflowRunInfoOrEndWorkflowRun({
stepId: stepId,
workflowRunId,
workspaceId,
});
if (!step) {
return {
error: 'Step not found',
};
if (!isDefined(workflowRunInfo)) {
return;
}
const workflowExecutor = this.workflowExecutorFactory.get(step.type);
const { stepToExecute, steps, context } = workflowRunInfo;
let actionOutput: WorkflowExecutorOutput;
const { workspaceId } = this.scopedWorkspaceContextFactory.create();
if (!workspaceId) {
throw new WorkflowTriggerException(
'No workspace id found',
WorkflowTriggerExceptionCode.INTERNAL_ERROR,
);
if (!canExecuteStep({ stepId: stepToExecute.id, steps, context })) {
return;
}
if (
this.billingService.isBillingEnabled() &&
!(await this.canBillWorkflowNodeExecution(workspaceId))
) {
const billingOutput = {
error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE,
};
await this.workflowRunWorkspaceService.saveWorkflowRunState({
workspaceId,
const checkCanBillWorkflowNodeExecution =
await this.checkCanBillWorkflowNodeExecutionOrEndWorkflowRun({
stepIdToExecute: stepToExecute.id,
workflowRunId,
stepOutput: {
id: step.id,
output: billingOutput,
},
context,
stepStatus: StepStatus.FAILED,
workspaceId,
});
return billingOutput;
if (!checkCanBillWorkflowNodeExecution) {
return;
}
const workflowAction = this.workflowActionFactory.get(stepToExecute.type);
let actionOutput: WorkflowActionOutput;
await this.workflowRunWorkspaceService.updateWorkflowRunStepStatus({
workflowRunId,
stepId: step.id,
stepId: stepToExecute.id,
workspaceId,
stepStatus: StepStatus.RUNNING,
});
try {
actionOutput = await workflowExecutor.execute({
currentStepId,
actionOutput = await workflowAction.execute({
currentStepId: stepId,
steps,
context,
attemptCount,
workflowRunId,
});
} catch (error) {
actionOutput = {
@ -120,7 +111,7 @@ export class WorkflowExecutorWorkspaceService implements WorkflowExecutor {
}
const stepOutput: StepOutput = {
id: step.id,
id: stepToExecute.id,
output: actionOutput,
};
@ -128,73 +119,145 @@ export class WorkflowExecutorWorkspaceService implements WorkflowExecutor {
await this.workflowRunWorkspaceService.saveWorkflowRunState({
workflowRunId,
stepOutput,
context,
workspaceId,
stepStatus: StepStatus.PENDING,
});
return actionOutput;
return;
}
const actionOutputSuccess = isDefined(actionOutput.result);
const shouldContinue =
actionOutputSuccess ||
step.settings.errorHandlingOptions.continueOnFailure.value;
stepToExecute.settings.errorHandlingOptions.continueOnFailure.value;
if (shouldContinue) {
const updatedContext = isDefined(actionOutput.result)
? {
...context,
[step.id]: actionOutput.result,
}
: context;
await this.workflowRunWorkspaceService.saveWorkflowRunState({
workflowRunId,
stepOutput,
context: updatedContext,
workspaceId,
stepStatus: isDefined(actionOutput.result)
? StepStatus.SUCCESS
: StepStatus.FAILED,
});
if (!isDefined(step.nextStepIds?.[0])) {
return actionOutput;
if (
!isDefined(stepToExecute.nextStepIds) ||
stepToExecute.nextStepIds.length === 0
) {
await this.workflowRunWorkspaceService.endWorkflowRun({
workflowRunId,
workspaceId,
status: WorkflowRunStatus.COMPLETED,
});
return;
}
// TODO: handle multiple next steps
return await this.execute({
await this.executeFromSteps({
stepIds: stepToExecute.nextStepIds,
workflowRunId,
currentStepId: step.nextStepIds[0],
steps,
context: updatedContext,
workspaceId,
});
return;
}
if (
step.settings.errorHandlingOptions.retryOnFailure.value &&
stepToExecute.settings.errorHandlingOptions.retryOnFailure.value &&
attemptCount < MAX_RETRIES_ON_FAILURE
) {
return await this.execute({
workflowRunId,
currentStepId,
steps,
context,
await this.executeFromStep({
stepId,
attemptCount: attemptCount + 1,
workflowRunId,
workspaceId,
});
return;
}
await this.workflowRunWorkspaceService.saveWorkflowRunState({
workflowRunId,
stepOutput,
context,
workspaceId,
stepStatus: StepStatus.FAILED,
});
return actionOutput;
await this.workflowRunWorkspaceService.endWorkflowRun({
workflowRunId,
workspaceId,
status: WorkflowRunStatus.FAILED,
error: stepOutput.output.error,
});
}
private async getWorkflowRunInfoOrEndWorkflowRun({
stepId,
workflowRunId,
workspaceId,
}: {
stepId: string;
workflowRunId: string;
workspaceId: string;
}) {
const workflowRun = await this.workflowRunWorkspaceService.getWorkflowRun({
workflowRunId,
workspaceId,
});
if (!isDefined(workflowRun)) {
await this.workflowRunWorkspaceService.endWorkflowRun({
workflowRunId,
workspaceId,
status: WorkflowRunStatus.FAILED,
error: `WorkflowRun ${workflowRunId} not found`,
});
return;
}
const steps = workflowRun.output?.flow.steps;
const context = workflowRun.context;
if (!isDefined(steps)) {
await this.workflowRunWorkspaceService.endWorkflowRun({
workflowRunId,
workspaceId,
status: WorkflowRunStatus.FAILED,
error: 'Steps undefined',
});
return;
}
if (!isDefined(context)) {
await this.workflowRunWorkspaceService.endWorkflowRun({
workflowRunId,
workspaceId,
status: WorkflowRunStatus.FAILED,
error: 'Context not found',
});
return;
}
const stepToExecute = steps.find((step) => step.id === stepId);
if (!stepToExecute) {
await this.workflowRunWorkspaceService.endWorkflowRun({
workflowRunId,
workspaceId,
status: WorkflowRunStatus.FAILED,
error: 'Step not found',
});
return;
}
return { stepToExecute, steps, context };
}
private sendWorkflowNodeRunEvent(workspaceId: string) {
@ -210,10 +273,45 @@ export class WorkflowExecutorWorkspaceService implements WorkflowExecutor {
);
}
private async canBillWorkflowNodeExecution(workspaceId: string) {
return this.billingService.canBillMeteredProduct(
workspaceId,
BillingProductKey.WORKFLOW_NODE_EXECUTION,
);
private async checkCanBillWorkflowNodeExecutionOrEndWorkflowRun({
stepIdToExecute,
workflowRunId,
workspaceId,
}: {
stepIdToExecute: string;
workflowRunId: string;
workspaceId: string;
}) {
const canBillWorkflowNodeExecution =
!this.billingService.isBillingEnabled() ||
(await this.billingService.canBillMeteredProduct(
workspaceId,
BillingProductKey.WORKFLOW_NODE_EXECUTION,
));
if (!canBillWorkflowNodeExecution) {
const billingOutput = {
error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE,
};
await this.workflowRunWorkspaceService.saveWorkflowRunState({
workspaceId,
workflowRunId,
stepOutput: {
id: stepIdToExecute,
output: billingOutput,
},
stepStatus: StepStatus.FAILED,
});
await this.workflowRunWorkspaceService.endWorkflowRun({
workflowRunId,
workspaceId,
status: WorkflowRunStatus.FAILED,
error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE,
});
}
return canBillWorkflowNodeExecution;
}
}

View File

@ -1,5 +1,7 @@
import { Scope } from '@nestjs/common';
import { isDefined } from 'twenty-shared/utils';
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
@ -9,7 +11,6 @@ import { ThrottlerService } from 'src/engine/core-modules/throttler/throttler.se
import { TwentyConfigService } from 'src/engine/core-modules/twenty-config/twenty-config.service';
import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workspace-services/workflow-common.workspace-service';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
import { WorkflowExecutorWorkspaceService } from 'src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service';
import {
WorkflowRunException,
@ -107,17 +108,6 @@ export class RunWorkflowJob {
await this.workflowRunWorkspaceService.startWorkflowRun({
workflowRunId,
workspaceId,
output: {
flow: {
trigger: workflowVersion.trigger,
steps: workflowVersion.steps,
},
stepsOutput: {
trigger: {
result: triggerPayload,
},
},
},
payload: triggerPayload,
});
@ -125,13 +115,9 @@ export class RunWorkflowJob {
const rootSteps = getRootSteps(workflowVersion.steps);
await this.executeWorkflow({
await this.workflowExecutorWorkspaceService.executeFromSteps({
stepIds: rootSteps.map((step) => step.id),
workflowRunId,
currentStepId: rootSteps[0].id,
steps: workflowVersion.steps,
context: workflowRun.context ?? {
trigger: triggerPayload,
},
workspaceId,
});
}
@ -169,9 +155,10 @@ export class RunWorkflowJob {
);
}
const nextStepId = lastExecutedStep.nextStepIds?.[0];
if (!nextStepId) {
if (
!isDefined(lastExecutedStep.nextStepIds) ||
lastExecutedStep.nextStepIds.length === 0
) {
await this.workflowRunWorkspaceService.endWorkflowRun({
workflowRunId,
workspaceId,
@ -181,46 +168,10 @@ export class RunWorkflowJob {
return;
}
await this.executeWorkflow({
workflowRunId,
currentStepId: nextStepId,
steps: workflowRun.output?.flow?.steps ?? [],
context: workflowRun.context ?? {},
workspaceId,
});
}
private async executeWorkflow({
workflowRunId,
currentStepId,
steps,
context,
workspaceId,
}: {
workflowRunId: string;
currentStepId: string;
steps: WorkflowAction[];
// eslint-disable-next-line @typescript-eslint/no-explicit-any
context: Record<string, any>;
workspaceId: string;
}) {
const { error, pendingEvent } =
await this.workflowExecutorWorkspaceService.execute({
workflowRunId,
currentStepId,
steps,
context,
});
if (pendingEvent) {
return;
}
await this.workflowRunWorkspaceService.endWorkflowRun({
await this.workflowExecutorWorkspaceService.executeFromSteps({
stepIds: lastExecutedStep.nextStepIds,
workflowRunId,
workspaceId,
status: error ? WorkflowRunStatus.FAILED : WorkflowRunStatus.COMPLETED,
error,
});
}

View File

@ -8,12 +8,14 @@ import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadat
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module';
import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service';
import { CacheLockModule } from 'src/engine/core-modules/cache-lock/cache-lock.module';
@Module({
imports: [
WorkflowCommonModule,
NestjsQueryTypeOrmModule.forFeature([ObjectMetadataEntity], 'core'),
RecordPositionModule,
CacheLockModule,
MetricsModule,
],
providers: [WorkflowRunWorkspaceService, ScopedWorkspaceContextFactory],

View File

@ -4,6 +4,7 @@ import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { v4 } from 'uuid';
import { isDefined } from 'twenty-shared/utils';
import { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
import { objectRecordChangedValues } from 'src/engine/core-modules/event-emitter/utils/object-record-changed-values';
@ -18,7 +19,6 @@ import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/worksp
import {
StepOutput,
WorkflowRunState,
WorkflowRunOutput,
WorkflowRunStatus,
WorkflowRunWorkspaceEntity,
} from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
@ -30,6 +30,7 @@ import {
} from 'src/modules/workflow/workflow-runner/exceptions/workflow-run.exception';
import { StepStatus } from 'src/modules/workflow/workflow-executor/types/workflow-run-step-info.type';
import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity';
import { CacheLockService } from 'src/engine/core-modules/cache-lock/cache-lock.service';
@Injectable()
export class WorkflowRunWorkspaceService {
@ -42,6 +43,7 @@ export class WorkflowRunWorkspaceService {
private readonly objectMetadataRepository: Repository<ObjectMetadataEntity>,
private readonly recordPositionService: RecordPositionService,
private readonly metricsService: MetricsService,
private readonly cacheLockService: CacheLockService,
) {}
async createWorkflowRun({
@ -116,6 +118,8 @@ export class WorkflowRunWorkspaceService {
workspaceId,
});
const initState = this.getInitState(workflowVersion);
const workflowRun = workflowRunRepository.create({
id: workflowRunId ?? v4(),
name: `#${workflowRunCount + 1} - ${workflow.name}`,
@ -124,7 +128,11 @@ export class WorkflowRunWorkspaceService {
workflowId: workflow.id,
status,
position,
state: this.getInitState(workflowVersion),
state: initState,
output: {
...initState,
stepsOutput: {},
},
context,
});
@ -133,35 +141,31 @@ export class WorkflowRunWorkspaceService {
return workflowRun.id;
}
async startWorkflowRun({
async startWorkflowRun(params: {
workflowRunId: string;
workspaceId: string;
payload: object;
}) {
await this.cacheLockService.withLock(
async () => await this.startWorkflowRunWithoutLock(params),
params.workflowRunId,
);
}
private async startWorkflowRunWithoutLock({
workflowRunId,
workspaceId,
output,
payload,
}: {
workflowRunId: string;
workspaceId: string;
output: WorkflowRunOutput;
payload: object;
}) {
const workflowRunRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowRunWorkspaceEntity>(
workspaceId,
'workflowRun',
{ shouldBypassPermissionChecks: true },
);
const workflowRunToUpdate = await workflowRunRepository.findOneBy({
id: workflowRunId,
const workflowRunToUpdate = await this.getWorkflowRunOrFail({
workflowRunId,
workspaceId,
});
if (!workflowRunToUpdate) {
throw new WorkflowRunException(
'No workflow run to start',
WorkflowRunExceptionCode.WORKFLOW_RUN_NOT_FOUND,
);
}
if (
workflowRunToUpdate.status !== WorkflowRunStatus.ENQUEUED &&
workflowRunToUpdate.status !== WorkflowRunStatus.NOT_STARTED
@ -175,29 +179,47 @@ export class WorkflowRunWorkspaceService {
const partialUpdate = {
status: WorkflowRunStatus.RUNNING,
startedAt: new Date().toISOString(),
output,
output: {
...workflowRunToUpdate.output,
stepsOutput: {
trigger: {
result: payload,
},
},
},
state: {
...workflowRunToUpdate.state,
stepInfos: {
...workflowRunToUpdate.state?.stepInfos,
trigger: {
...workflowRunToUpdate.state?.stepInfos.trigger,
status: StepStatus.SUCCESS,
result: payload,
},
},
},
context: payload
? {
trigger: payload,
}
: (workflowRunToUpdate.context ?? {}),
};
await workflowRunRepository.update(workflowRunToUpdate.id, partialUpdate);
await this.emitWorkflowRunUpdatedEvent({
workflowRunBefore: workflowRunToUpdate,
updatedFields: ['status', 'startedAt', 'output'],
});
await this.updateWorkflowRun({ workflowRunId, workspaceId, partialUpdate });
}
async endWorkflowRun({
async endWorkflowRun(params: {
workflowRunId: string;
workspaceId: string;
status: WorkflowRunStatus;
error?: string;
}) {
await this.cacheLockService.withLock(
async () => await this.endWorkflowRunWithoutLock(params),
params.workflowRunId,
);
}
private async endWorkflowRunWithoutLock({
workflowRunId,
workspaceId,
status,
@ -208,29 +230,16 @@ export class WorkflowRunWorkspaceService {
status: WorkflowRunStatus;
error?: string;
}) {
const workflowRunRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowRunWorkspaceEntity>(
workspaceId,
'workflowRun',
{ shouldBypassPermissionChecks: true },
);
const workflowRunToUpdate = await workflowRunRepository.findOneBy({
id: workflowRunId,
const workflowRunToUpdate = await this.getWorkflowRunOrFail({
workflowRunId,
workspaceId,
});
if (!workflowRunToUpdate) {
throw new WorkflowRunException(
'No workflow run to end',
WorkflowRunExceptionCode.WORKFLOW_RUN_NOT_FOUND,
);
}
const partialUpdate = {
status,
endedAt: new Date().toISOString(),
output: {
...(workflowRunToUpdate.output ?? {}),
...workflowRunToUpdate.output,
error,
},
state: {
@ -239,12 +248,7 @@ export class WorkflowRunWorkspaceService {
},
};
await workflowRunRepository.update(workflowRunToUpdate.id, partialUpdate);
await this.emitWorkflowRunUpdatedEvent({
workflowRunBefore: workflowRunToUpdate,
updatedFields: ['status', 'endedAt', 'output', 'state'],
});
await this.updateWorkflowRun({ workflowRunId, workspaceId, partialUpdate });
await this.metricsService.incrementCounter({
key:
@ -255,7 +259,19 @@ export class WorkflowRunWorkspaceService {
});
}
async updateWorkflowRunStepStatus({
async updateWorkflowRunStepStatus(params: {
workflowRunId: string;
stepId: string;
workspaceId: string;
stepStatus: StepStatus;
}) {
await this.cacheLockService.withLock(
async () => await this.updateWorkflowRunStepStatusWithoutLock(params),
params.workflowRunId,
);
}
private async updateWorkflowRunStepStatusWithoutLock({
workflowRunId,
workspaceId,
stepId,
@ -266,24 +282,11 @@ export class WorkflowRunWorkspaceService {
workspaceId: string;
stepStatus: StepStatus;
}) {
const workflowRunRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowRunWorkspaceEntity>(
workspaceId,
'workflowRun',
{ shouldBypassPermissionChecks: true },
);
const workflowRunToUpdate = await workflowRunRepository.findOneBy({
id: workflowRunId,
const workflowRunToUpdate = await this.getWorkflowRunOrFail({
workflowRunId,
workspaceId,
});
if (!workflowRunToUpdate) {
throw new WorkflowRunException(
'No workflow run to save',
WorkflowRunExceptionCode.WORKFLOW_RUN_NOT_FOUND,
);
}
const partialUpdate = {
state: {
...workflowRunToUpdate.state,
@ -297,46 +300,37 @@ export class WorkflowRunWorkspaceService {
},
};
await workflowRunRepository.update(workflowRunId, partialUpdate);
await this.emitWorkflowRunUpdatedEvent({
workflowRunBefore: workflowRunToUpdate,
updatedFields: ['state'],
});
await this.updateWorkflowRun({ workflowRunId, workspaceId, partialUpdate });
}
async saveWorkflowRunState({
async saveWorkflowRunState(params: {
workflowRunId: string;
stepOutput: StepOutput;
workspaceId: string;
stepStatus: StepStatus;
}) {
await this.cacheLockService.withLock(
async () => await this.saveWorkflowRunStateWithoutLock(params),
params.workflowRunId,
);
}
private async saveWorkflowRunStateWithoutLock({
workflowRunId,
stepOutput,
workspaceId,
context,
stepStatus,
}: {
workflowRunId: string;
stepOutput: StepOutput;
workspaceId: string;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
context: Record<string, any>;
stepStatus: StepStatus;
}) {
const workflowRunRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowRunWorkspaceEntity>(
workspaceId,
'workflowRun',
{ shouldBypassPermissionChecks: true },
);
const workflowRunToUpdate = await workflowRunRepository.findOneBy({
id: workflowRunId,
const workflowRunToUpdate = await this.getWorkflowRunOrFail({
workflowRunId,
workspaceId,
});
if (!workflowRunToUpdate) {
throw new WorkflowRunException(
'No workflow run to save',
WorkflowRunExceptionCode.WORKFLOW_RUN_NOT_FOUND,
);
}
const partialUpdate = {
output: {
flow: workflowRunToUpdate.output?.flow ?? {
@ -353,24 +347,38 @@ export class WorkflowRunWorkspaceService {
stepInfos: {
...workflowRunToUpdate.state?.stepInfos,
[stepOutput.id]: {
...(workflowRunToUpdate.state?.stepInfos[stepOutput.id] || {}),
result: stepOutput.output?.result,
error: stepOutput.output?.error,
status: stepStatus,
},
},
},
context,
...(stepStatus === StepStatus.SUCCESS
? {
context: {
...workflowRunToUpdate.context,
[stepOutput.id]: stepOutput.output.result,
},
}
: {}),
};
await workflowRunRepository.update(workflowRunId, partialUpdate);
await this.emitWorkflowRunUpdatedEvent({
workflowRunBefore: workflowRunToUpdate,
updatedFields: ['context', 'output', 'state'],
});
await this.updateWorkflowRun({ workflowRunId, workspaceId, partialUpdate });
}
async updateWorkflowRunStep({
async updateWorkflowRunStep(params: {
workflowRunId: string;
step: WorkflowAction;
workspaceId: string;
}) {
await this.cacheLockService.withLock(
async () => await this.updateWorkflowRunStepWithoutLock(params),
params.workflowRunId,
);
}
private async updateWorkflowRunStepWithoutLock({
workflowRunId,
step,
workspaceId,
@ -379,24 +387,11 @@ export class WorkflowRunWorkspaceService {
step: WorkflowAction;
workspaceId: string;
}) {
const workflowRunRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowRunWorkspaceEntity>(
workspaceId,
'workflowRun',
{ shouldBypassPermissionChecks: true },
);
const workflowRunToUpdate = await workflowRunRepository.findOneBy({
id: workflowRunId,
const workflowRunToUpdate = await this.getWorkflowRunOrFail({
workflowRunId,
workspaceId,
});
if (!workflowRunToUpdate) {
throw new WorkflowRunException(
'No workflow run to update',
WorkflowRunExceptionCode.WORKFLOW_RUN_NOT_FOUND,
);
}
if (
workflowRunToUpdate.status === WorkflowRunStatus.COMPLETED ||
workflowRunToUpdate.status === WorkflowRunStatus.FAILED
@ -428,11 +423,25 @@ export class WorkflowRunWorkspaceService {
},
};
await workflowRunRepository.update(workflowRunToUpdate.id, partialUpdate);
await this.updateWorkflowRun({ workflowRunId, workspaceId, partialUpdate });
}
await this.emitWorkflowRunUpdatedEvent({
workflowRunBefore: workflowRunToUpdate,
updatedFields: ['output', 'state'],
async getWorkflowRun({
workflowRunId,
workspaceId,
}: {
workflowRunId: string;
workspaceId: string;
}): Promise<WorkflowRunWorkspaceEntity | null> {
const workflowRunRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowRunWorkspaceEntity>(
workspaceId,
'workflowRun',
{ shouldBypassPermissionChecks: true },
);
return await workflowRunRepository.findOne({
where: { id: workflowRunId },
});
}
@ -443,15 +452,9 @@ export class WorkflowRunWorkspaceService {
workflowRunId: string;
workspaceId: string;
}): Promise<WorkflowRunWorkspaceEntity> {
const workflowRunRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowRunWorkspaceEntity>(
workspaceId,
'workflowRun',
{ shouldBypassPermissionChecks: true },
);
const workflowRun = await workflowRunRepository.findOne({
where: { id: workflowRunId },
const workflowRun = await this.getWorkflowRun({
workflowRunId,
workspaceId,
});
if (!workflowRun) {
@ -560,4 +563,43 @@ export class WorkflowRunWorkspaceService {
},
};
}
private async updateWorkflowRun({
workflowRunId,
workspaceId,
partialUpdate,
}: {
workflowRunId: string;
workspaceId: string;
partialUpdate: QueryDeepPartialEntity<WorkflowRunWorkspaceEntity>;
}) {
const workflowRunRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowRunWorkspaceEntity>(
workspaceId,
'workflowRun',
{ shouldBypassPermissionChecks: true },
);
const workflowRunToUpdate = await workflowRunRepository.findOneBy({
id: workflowRunId,
});
if (!workflowRunToUpdate) {
throw new WorkflowRunException(
`workflowRun ${workflowRunId} not found`,
WorkflowRunExceptionCode.WORKFLOW_RUN_NOT_FOUND,
);
}
await workflowRunRepository.update(workflowRunToUpdate.id, partialUpdate);
const updatedFields = Object.keys(partialUpdate);
if (updatedFields.length > 0) {
await this.emitWorkflowRunUpdatedEvent({
workflowRunBefore: workflowRunToUpdate,
updatedFields: updatedFields,
});
}
}
}