22 branches 3 (#13181)

This PR does not produce any functional changes for our users. It
prepares the branches for workflows by:

- decommissioning `output` and `context` fields or `workflowRun` records
and use newly created `state` field from front-end and back-end
- use `stepStatus` computed by `back-end` in `front-end`
- add utils and types in `twenty-shared/workflow` (not completed, a
follow-up is scheduled
https://github.com/twentyhq/core-team-issues/issues/1211)
- add concurrency to `workflowQueue` message queue to avoid weird branch
execution when using forms in workflow branches
- add a WithLock decorator for better dev experience of
`CacheLockService.withLock` usage

Here is an example of such a workflow running (front branch display is
not yet done that's why it looks ugly) ->
https://discord.com/channels/1130383047699738754/1258024460238192691/1392897615171158098
This commit is contained in:
martmull
2025-07-16 11:16:04 +02:00
committed by GitHub
parent bf6330469b
commit 47386e92a3
47 changed files with 5124 additions and 6380 deletions

View File

@ -3,6 +3,7 @@ import { InjectRepository } from '@nestjs/typeorm';
import { MoreThan, Repository } from 'typeorm';
import { Command, Option } from 'nest-commander';
import { isDefined } from 'twenty-shared/utils';
import { StepStatus, WorkflowRunStepInfos } from 'twenty-shared/workflow';
import {
ActiveOrSuspendedWorkspacesMigrationCommandRunner,
@ -15,10 +16,6 @@ import {
WorkflowRunOutput,
WorkflowRunWorkspaceEntity,
} from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import {
StepStatus,
WorkflowRunStepInfo,
} from 'src/modules/workflow/workflow-executor/types/workflow-run-step-info.type';
const DEFAULT_CHUNK_SIZE = 500;
@ -121,7 +118,7 @@ export class MigrateWorkflowRunStatesCommand extends ActiveOrSuspendedWorkspaces
}
private buildRunStateFromOutput(output: WorkflowRunOutput): WorkflowRunState {
const stepInfos: Record<string, WorkflowRunStepInfo> = Object.fromEntries(
const stepInfos: WorkflowRunStepInfos = Object.fromEntries(
output.flow.steps.map((step) => {
const stepOutput = output.stepsOutput?.[step.id];
const status = stepOutput?.pendingEvent

View File

@ -0,0 +1,48 @@
import { Inject } from '@nestjs/common';
import {
CacheLockOptions,
CacheLockService,
} from 'src/engine/core-modules/cache-lock/cache-lock.service';
export const WithLock = (
lockKeyParamPath: string,
options?: CacheLockOptions,
): MethodDecorator => {
const injectCacheLockService = Inject(CacheLockService);
return function (target, propertyKey, descriptor: PropertyDescriptor) {
injectCacheLockService(target, 'cacheLockService');
const originalMethod = descriptor.value;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
descriptor.value = async function (...args: any[]) {
const self = this as { cacheLockService: CacheLockService };
if (!self.cacheLockService) {
throw new Error('cacheLockService not available on instance');
}
if (typeof args[0] !== 'object') {
throw new Error(
`You must use one object parameter to use @WithLock decorator. Received ${args}`,
);
}
const key = args[0][lockKeyParamPath];
if (typeof key !== 'string') {
throw new Error(
`Could not resolve lock key from path "${lockKeyParamPath}" on first argument`,
);
}
return await self.cacheLockService.withLock(
() => originalMethod.apply(self, args),
key,
options,
);
};
};
};

View File

@ -1,21 +1,11 @@
import { SetMetadata } from '@nestjs/common';
import { isString } from '@nestjs/common/utils/shared.utils';
import { PROCESS_METADATA } from 'src/engine/core-modules/message-queue/message-queue.constants';
export interface MessageQueueProcessOptions {
jobName: string;
concurrency?: number;
}
export function Process(jobName: string): MethodDecorator;
export function Process(options: MessageQueueProcessOptions): MethodDecorator;
export function Process(
nameOrOptions: string | MessageQueueProcessOptions,
): MethodDecorator {
const options = isString(nameOrOptions)
? { jobName: nameOrOptions }
: nameOrOptions;
return SetMetadata(PROCESS_METADATA, options || {});
export function Process(jobName: string): MethodDecorator {
return SetMetadata(PROCESS_METADATA, { jobName });
}

View File

@ -1,12 +1,9 @@
import { Scope, SetMetadata } from '@nestjs/common';
import { SCOPE_OPTIONS_METADATA } from '@nestjs/common/constants';
import { MessageQueueWorkerOptions } from 'src/engine/core-modules/message-queue/interfaces/message-queue-worker-options.interface';
import {
MessageQueue,
PROCESSOR_METADATA,
WORKER_METADATA,
} from 'src/engine/core-modules/message-queue/message-queue.constants';
export interface MessageQueueProcessorOptions {
@ -24,16 +21,7 @@ export interface MessageQueueProcessorOptions {
* Represents a worker that is able to process jobs from the queue.
* @param queueName name of the queue to process
*/
export function Processor(queueName: string): ClassDecorator;
/**
* Represents a worker that is able to process jobs from the queue.
* @param queueName name of the queue to process
* @param workerOptions additional worker options
*/
export function Processor(
queueName: string,
workerOptions: MessageQueueWorkerOptions,
): ClassDecorator;
export function Processor(queueName: MessageQueue): ClassDecorator;
/**
* Represents a worker that is able to process jobs from the queue.
* @param processorOptions processor options
@ -41,21 +29,11 @@ export function Processor(
export function Processor(
processorOptions: MessageQueueProcessorOptions,
): ClassDecorator;
/**
* Represents a worker that is able to process jobs from the queue.
* @param processorOptions processor options (Nest-specific)
* @param workerOptions additional Bull worker options
*/
export function Processor(
processorOptions: MessageQueueProcessorOptions,
workerOptions: MessageQueueWorkerOptions,
): ClassDecorator;
export function Processor(
queueNameOrOptions?: string | MessageQueueProcessorOptions,
maybeWorkerOptions?: MessageQueueWorkerOptions,
queueNameOrOptions: string | MessageQueueProcessorOptions,
): ClassDecorator {
const options =
queueNameOrOptions && typeof queueNameOrOptions === 'object'
typeof queueNameOrOptions === 'object'
? queueNameOrOptions
: { queueName: queueNameOrOptions };
@ -63,7 +41,5 @@ export function Processor(
return (target: Function) => {
SetMetadata(SCOPE_OPTIONS_METADATA, options)(target);
SetMetadata(PROCESSOR_METADATA, options)(target);
maybeWorkerOptions &&
SetMetadata(WORKER_METADATA, maybeWorkerOptions)(target);
};
}

View File

@ -1,6 +1,5 @@
export const PROCESSOR_METADATA = Symbol('message-queue:processor_metadata');
export const PROCESS_METADATA = Symbol('message-queue:process_metadata');
export const WORKER_METADATA = Symbol('bullmq:worker_metadata');
export const QUEUE_DRIVER = Symbol('message-queue:queue_driver');
export enum MessageQueue {
@ -13,11 +12,8 @@ export enum MessageQueue {
contactCreationQueue = 'contact-creation-queue',
billingQueue = 'billing-queue',
workspaceQueue = 'workspace-queue',
recordPositionBackfillQueue = 'record-position-backfill-queue',
entityEventsToDbQueue = 'entity-events-to-db-queue',
testQueue = 'test-queue',
workflowQueue = 'workflow-queue',
serverlessFunctionQueue = 'serverless-function-queue',
deleteCascadeQueue = 'delete-cascade-queue',
subscriptionsQueue = 'subscriptions-queue',
}

View File

@ -1,5 +1,6 @@
import { msg } from '@lingui/core/macro';
import { FieldMetadataType } from 'twenty-shared/types';
import { WorkflowRunStepInfos } from 'twenty-shared/workflow';
import { RelationOnDeleteAction } from 'src/engine/metadata-modules/field-metadata/interfaces/relation-on-delete-action.interface';
import { RelationType } from 'src/engine/metadata-modules/field-metadata/interfaces/relation-type.interface';
@ -29,7 +30,6 @@ import { TimelineActivityWorkspaceEntity } from 'src/modules/timeline/standard-o
import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity';
import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity';
import { WorkflowActionOutput } from 'src/modules/workflow/workflow-executor/types/workflow-action-output.type';
import { WorkflowRunStepInfo } from 'src/modules/workflow/workflow-executor/types/workflow-run-step-info.type';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
import { WorkflowTrigger } from 'src/modules/workflow/workflow-trigger/types/workflow-trigger.type';
@ -60,7 +60,7 @@ export type WorkflowRunState = {
trigger: WorkflowTrigger;
steps: WorkflowAction[];
};
stepInfos: Record<string, WorkflowRunStepInfo>;
stepInfos: WorkflowRunStepInfos;
workflowRunError?: string;
};

View File

@ -6,6 +6,7 @@ import { FieldMetadataType } from 'twenty-shared/types';
import { isDefined, isValidUuid } from 'twenty-shared/utils';
import { Repository } from 'typeorm';
import { v4 } from 'uuid';
import { StepStatus } from 'twenty-shared/workflow';
import { BASE_TYPESCRIPT_PROJECT_INPUT_SCHEMA } from 'src/engine/core-modules/serverless/drivers/constants/base-typescript-project-input-schema';
import { CreateWorkflowVersionStepInput } from 'src/engine/core-modules/workflow/dtos/create-workflow-version-step-input.dto';
@ -27,7 +28,6 @@ import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/work
import { WorkflowSchemaWorkspaceService } from 'src/modules/workflow/workflow-builder/workflow-schema/workflow-schema.workspace-service';
import { insertStep } from 'src/modules/workflow/workflow-builder/workflow-step/utils/insert-step';
import { removeStep } from 'src/modules/workflow/workflow-builder/workflow-step/utils/remove-step';
import { StepStatus } from 'src/modules/workflow/workflow-executor/types/workflow-run-step-info.type';
import { BaseWorkflowActionSettings } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action-settings.type';
import {
WorkflowAction,
@ -302,7 +302,7 @@ export class WorkflowVersionStepWorkspaceService {
workspaceId,
});
const step = workflowRun.output?.flow?.steps?.find(
const step = workflowRun.state?.flow?.steps?.find(
(step) => step.id === stepId,
);

View File

@ -1,13 +0,0 @@
export enum StepStatus {
NOT_STARTED = 'NOT_STARTED',
RUNNING = 'RUNNING',
SUCCESS = 'SUCCESS',
FAILED = 'FAILED',
PENDING = 'PENDING',
}
export type WorkflowRunStepInfo = {
result?: object;
error?: string;
status: StepStatus;
};

View File

@ -1,8 +1,10 @@
import { StepStatus } from 'twenty-shared/workflow';
import {
WorkflowAction,
WorkflowActionType,
} from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
import { canExecuteStep } from 'src/modules/workflow/workflow-executor/utils/can-execute-step.utils';
import { canExecuteStep } from 'src/modules/workflow/workflow-executor/utils/can-execute-step.util';
describe('canExecuteStep', () => {
const steps = [
@ -42,13 +44,19 @@ describe('canExecuteStep', () => {
] as WorkflowAction[];
it('should return true if all parents succeeded', () => {
const context = {
trigger: 'trigger result',
'step-1': 'step-1 result',
'step-2': 'step-2 result',
const stepInfos = {
'step-1': {
status: StepStatus.SUCCESS,
},
'step-2': {
status: StepStatus.SUCCESS,
},
'step-3': {
status: StepStatus.NOT_STARTED,
},
};
const result = canExecuteStep({ context, steps, stepId: 'step-3' });
const result = canExecuteStep({ stepInfos, steps, stepId: 'step-3' });
expect(result).toBe(true);
});
@ -56,9 +64,16 @@ describe('canExecuteStep', () => {
it('should return false if one parent is not succeeded', () => {
expect(
canExecuteStep({
context: {
trigger: 'trigger result',
'step-2': 'step-2 result',
stepInfos: {
'step-1': {
status: StepStatus.NOT_STARTED,
},
'step-2': {
status: StepStatus.SUCCESS,
},
'step-3': {
status: StepStatus.NOT_STARTED,
},
},
steps,
stepId: 'step-3',
@ -67,9 +82,16 @@ describe('canExecuteStep', () => {
expect(
canExecuteStep({
context: {
trigger: 'trigger result',
'step-1': 'step-1 result',
stepInfos: {
'step-1': {
status: StepStatus.SUCCESS,
},
'step-2': {
status: StepStatus.NOT_STARTED,
},
'step-3': {
status: StepStatus.NOT_STARTED,
},
},
steps,
stepId: 'step-3',
@ -78,9 +100,90 @@ describe('canExecuteStep', () => {
expect(
canExecuteStep({
context: {
trigger: 'trigger result',
'step-1': {},
stepInfos: {
'step-1': {
status: StepStatus.NOT_STARTED,
},
'step-2': {
status: StepStatus.NOT_STARTED,
},
'step-3': {
status: StepStatus.NOT_STARTED,
},
},
steps,
stepId: 'step-3',
}),
).toBe(false);
});
it('should return false if step has already ran', () => {
expect(
canExecuteStep({
stepInfos: {
'step-1': {
status: StepStatus.SUCCESS,
},
'step-2': {
status: StepStatus.SUCCESS,
},
'step-3': {
status: StepStatus.SUCCESS,
},
},
steps,
stepId: 'step-3',
}),
).toBe(false);
expect(
canExecuteStep({
stepInfos: {
'step-1': {
status: StepStatus.SUCCESS,
},
'step-2': {
status: StepStatus.SUCCESS,
},
'step-3': {
status: StepStatus.PENDING,
},
},
steps,
stepId: 'step-3',
}),
).toBe(false);
expect(
canExecuteStep({
stepInfos: {
'step-1': {
status: StepStatus.SUCCESS,
},
'step-2': {
status: StepStatus.SUCCESS,
},
'step-3': {
status: StepStatus.FAILED,
},
},
steps,
stepId: 'step-3',
}),
).toBe(false);
expect(
canExecuteStep({
stepInfos: {
'step-1': {
status: StepStatus.SUCCESS,
},
'step-2': {
status: StepStatus.SUCCESS,
},
'step-3': {
status: StepStatus.RUNNING,
},
},
steps,
stepId: 'step-3',

View File

@ -1,23 +1,30 @@
import { isDefined } from 'twenty-shared/utils';
import { StepStatus, WorkflowRunStepInfos } from 'twenty-shared/workflow';
import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
export const canExecuteStep = ({
context,
stepId,
steps,
stepInfos,
}: {
steps: WorkflowAction[];
context: Record<string, unknown>;
stepInfos: WorkflowRunStepInfos;
stepId: string;
}) => {
if (
isDefined(stepInfos[stepId]?.status) &&
stepInfos[stepId].status !== StepStatus.NOT_STARTED
) {
return false;
}
const parentSteps = steps.filter(
(parentStep) =>
isDefined(parentStep) && parentStep.nextStepIds?.includes(stepId),
);
// TODO use workflowRun.state to check if step status is not COMPLETED. Return false in this case
return parentSteps.every((parentStep) =>
Object.keys(context).includes(parentStep.id),
return parentSteps.every(
(parentStep) => stepInfos[parentStep.id]?.status === StepStatus.SUCCESS,
);
};

View File

@ -1,5 +1,7 @@
import { Test, TestingModule } from '@nestjs/testing';
import { getWorkflowRunContext, StepStatus } from 'twenty-shared/workflow';
import { BILLING_FEATURE_USED } from 'src/engine/core-modules/billing/constants/billing-feature-used.constant';
import { BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE } from 'src/engine/core-modules/billing/constants/billing-workflow-execution-error-message.constant';
import { BillingMeterEventName } from 'src/engine/core-modules/billing/enums/billing-meter-event-names';
@ -12,15 +14,14 @@ import {
} from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
import { WorkflowExecutorWorkspaceService } from 'src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service';
import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service';
import { StepStatus } from 'src/modules/workflow/workflow-executor/types/workflow-run-step-info.type';
import { WorkflowRunStatus } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
import { canExecuteStep } from 'src/modules/workflow/workflow-executor/utils/can-execute-step.utils';
import { canExecuteStep } from 'src/modules/workflow/workflow-executor/utils/can-execute-step.util';
jest.mock(
'src/modules/workflow/workflow-executor/utils/can-execute-step.utils',
'src/modules/workflow/workflow-executor/utils/can-execute-step.util',
() => {
const actual = jest.requireActual(
'src/modules/workflow/workflow-executor/utils/can-execute-step.utils',
'src/modules/workflow/workflow-executor/utils/can-execute-step.util',
);
return {
@ -100,7 +101,6 @@ describe('WorkflowExecutorWorkspaceService', () => {
describe('execute', () => {
const mockWorkflowRunId = 'workflow-run-id';
const mockWorkspaceId = 'workspace-id';
const mockContext = { trigger: 'trigger-result' };
const mockSteps = [
{
id: 'step-1',
@ -125,10 +125,12 @@ describe('WorkflowExecutorWorkspaceService', () => {
nextStepIds: [],
},
] as WorkflowAction[];
const mockStepInfos = {
trigger: { result: {}, status: StepStatus.SUCCESS },
};
mockWorkflowRunWorkspaceService.getWorkflowRun.mockReturnValue({
output: { flow: { steps: mockSteps } },
context: mockContext,
state: { flow: { steps: mockSteps }, stepInfos: mockStepInfos },
});
it('should execute a step and continue to the next step on success', async () => {
@ -151,7 +153,7 @@ describe('WorkflowExecutorWorkspaceService', () => {
expect(mockWorkflowExecutor.execute).toHaveBeenCalledWith({
currentStepId: 'step-1',
steps: mockSteps,
context: mockContext,
context: getWorkflowRunContext(mockStepInfos),
});
expect(workspaceEventEmitter.emitCustomBatchEvent).toHaveBeenCalledWith(
@ -319,8 +321,10 @@ describe('WorkflowExecutorWorkspaceService', () => {
] as WorkflowAction[];
mockWorkflowRunWorkspaceService.getWorkflowRun.mockReturnValueOnce({
output: { flow: { steps: stepsWithContinueOnFailure } },
context: mockContext,
state: {
flow: { steps: stepsWithContinueOnFailure },
stepInfos: mockStepInfos,
},
});
mockWorkflowExecutor.execute.mockResolvedValueOnce({
@ -385,8 +389,10 @@ describe('WorkflowExecutorWorkspaceService', () => {
] as WorkflowAction[];
mockWorkflowRunWorkspaceService.getWorkflowRun.mockReturnValue({
output: { flow: { steps: stepsWithRetryOnFailure } },
context: mockContext,
state: {
flow: { steps: stepsWithRetryOnFailure },
stepInfos: mockStepInfos,
},
});
mockWorkflowExecutor.execute.mockResolvedValue({

View File

@ -1,6 +1,7 @@
import { Injectable } from '@nestjs/common';
import { isDefined } from 'twenty-shared/utils';
import { getWorkflowRunContext, StepStatus } from 'twenty-shared/workflow';
import { BILLING_FEATURE_USED } from 'src/engine/core-modules/billing/constants/billing-feature-used.constant';
import { BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE } from 'src/engine/core-modules/billing/constants/billing-workflow-execution-error-message.constant';
@ -19,8 +20,7 @@ import {
WorkflowBranchExecutorInput,
WorkflowExecutorInput,
} from 'src/modules/workflow/workflow-executor/types/workflow-executor-input';
import { StepStatus } from 'src/modules/workflow/workflow-executor/types/workflow-run-step-info.type';
import { canExecuteStep } from 'src/modules/workflow/workflow-executor/utils/can-execute-step.utils';
import { canExecuteStep } from 'src/modules/workflow/workflow-executor/utils/can-execute-step.util';
import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service';
const MAX_RETRIES_ON_FAILURE = 3;
@ -57,7 +57,7 @@ export class WorkflowExecutorWorkspaceService {
workspaceId,
}: WorkflowBranchExecutorInput) {
const workflowRunInfo = await this.getWorkflowRunInfoOrEndWorkflowRun({
stepId: stepId,
stepId,
workflowRunId,
workspaceId,
});
@ -66,9 +66,9 @@ export class WorkflowExecutorWorkspaceService {
return;
}
const { stepToExecute, steps, context } = workflowRunInfo;
const { stepToExecute, steps, stepInfos } = workflowRunInfo;
if (!canExecuteStep({ stepId: stepToExecute.id, steps, context })) {
if (!canExecuteStep({ stepId, steps, stepInfos })) {
return;
}
@ -98,7 +98,7 @@ export class WorkflowExecutorWorkspaceService {
actionOutput = await workflowAction.execute({
currentStepId: stepId,
steps,
context,
context: getWorkflowRunContext(stepInfos),
});
} catch (error) {
actionOutput = {
@ -219,31 +219,18 @@ export class WorkflowExecutorWorkspaceService {
return;
}
const steps = workflowRun.output?.flow.steps;
const context = workflowRun.context;
if (!isDefined(steps)) {
if (!isDefined(workflowRun?.state)) {
await this.workflowRunWorkspaceService.endWorkflowRun({
workflowRunId,
workspaceId,
status: WorkflowRunStatus.FAILED,
error: 'Steps undefined',
error: `WorkflowRun ${workflowRunId} doesn't have any state`,
});
return;
}
if (!isDefined(context)) {
await this.workflowRunWorkspaceService.endWorkflowRun({
workflowRunId,
workspaceId,
status: WorkflowRunStatus.FAILED,
error: 'Context not found',
});
return;
}
const steps = workflowRun.state.flow.steps;
const stepToExecute = steps.find((step) => step.id === stepId);
@ -258,7 +245,11 @@ export class WorkflowExecutorWorkspaceService {
return;
}
return { stepToExecute, steps, context };
return {
stepToExecute,
steps,
stepInfos: workflowRun.state.stepInfos,
};
}
private sendWorkflowNodeRunEvent(workspaceId: string) {

View File

@ -144,7 +144,7 @@ export class RunWorkflowJob {
);
}
const lastExecutedStep = workflowRun.output?.flow?.steps?.find(
const lastExecutedStep = workflowRun.state?.flow?.steps?.find(
(step) => step.id === lastExecutedStepId,
);

View File

@ -5,6 +5,7 @@ import { Repository } from 'typeorm';
import { v4 } from 'uuid';
import { isDefined } from 'twenty-shared/utils';
import { QueryDeepPartialEntity } from 'typeorm/query-builder/QueryPartialEntity';
import { StepStatus } from 'twenty-shared/workflow';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
import { objectRecordChangedValues } from 'src/engine/core-modules/event-emitter/utils/object-record-changed-values';
@ -28,9 +29,8 @@ import {
WorkflowRunException,
WorkflowRunExceptionCode,
} from 'src/modules/workflow/workflow-runner/exceptions/workflow-run.exception';
import { StepStatus } from 'src/modules/workflow/workflow-executor/types/workflow-run-step-info.type';
import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity';
import { CacheLockService } from 'src/engine/core-modules/cache-lock/cache-lock.service';
import { WithLock } from 'src/engine/core-modules/cache-lock/with-lock.decorator';
@Injectable()
export class WorkflowRunWorkspaceService {
@ -43,7 +43,6 @@ export class WorkflowRunWorkspaceService {
private readonly objectMetadataRepository: Repository<ObjectMetadataEntity>,
private readonly recordPositionService: RecordPositionService,
private readonly metricsService: MetricsService,
private readonly cacheLockService: CacheLockService,
) {}
async createWorkflowRun({
@ -141,18 +140,8 @@ export class WorkflowRunWorkspaceService {
return workflowRun.id;
}
async startWorkflowRun(params: {
workflowRunId: string;
workspaceId: string;
payload: object;
}) {
await this.cacheLockService.withLock(
async () => await this.startWorkflowRunWithoutLock(params),
params.workflowRunId,
);
}
private async startWorkflowRunWithoutLock({
@WithLock('workflowRunId')
async startWorkflowRun({
workflowRunId,
workspaceId,
payload,
@ -207,19 +196,8 @@ export class WorkflowRunWorkspaceService {
await this.updateWorkflowRun({ workflowRunId, workspaceId, partialUpdate });
}
async endWorkflowRun(params: {
workflowRunId: string;
workspaceId: string;
status: WorkflowRunStatus;
error?: string;
}) {
await this.cacheLockService.withLock(
async () => await this.endWorkflowRunWithoutLock(params),
params.workflowRunId,
);
}
private async endWorkflowRunWithoutLock({
@WithLock('workflowRunId')
async endWorkflowRun({
workflowRunId,
workspaceId,
status,
@ -259,19 +237,8 @@ export class WorkflowRunWorkspaceService {
});
}
async updateWorkflowRunStepStatus(params: {
workflowRunId: string;
stepId: string;
workspaceId: string;
stepStatus: StepStatus;
}) {
await this.cacheLockService.withLock(
async () => await this.updateWorkflowRunStepStatusWithoutLock(params),
params.workflowRunId,
);
}
private async updateWorkflowRunStepStatusWithoutLock({
@WithLock('workflowRunId')
async updateWorkflowRunStepStatus({
workflowRunId,
workspaceId,
stepId,
@ -303,19 +270,8 @@ export class WorkflowRunWorkspaceService {
await this.updateWorkflowRun({ workflowRunId, workspaceId, partialUpdate });
}
async saveWorkflowRunState(params: {
workflowRunId: string;
stepOutput: StepOutput;
workspaceId: string;
stepStatus: StepStatus;
}) {
await this.cacheLockService.withLock(
async () => await this.saveWorkflowRunStateWithoutLock(params),
params.workflowRunId,
);
}
private async saveWorkflowRunStateWithoutLock({
@WithLock('workflowRunId')
async saveWorkflowRunState({
workflowRunId,
stepOutput,
workspaceId,
@ -367,18 +323,8 @@ export class WorkflowRunWorkspaceService {
await this.updateWorkflowRun({ workflowRunId, workspaceId, partialUpdate });
}
async updateWorkflowRunStep(params: {
workflowRunId: string;
step: WorkflowAction;
workspaceId: string;
}) {
await this.cacheLockService.withLock(
async () => await this.updateWorkflowRunStepWithoutLock(params),
params.workflowRunId,
);
}
private async updateWorkflowRunStepWithoutLock({
@WithLock('workflowRunId')
async updateWorkflowRunStep({
workflowRunId,
step,
workspaceId,