diff --git a/packages/twenty-server/src/database/commands/upgrade-version-command/1-1/1-1-migrate-workflow-run-state.command.ts b/packages/twenty-server/src/database/commands/upgrade-version-command/1-1/1-1-migrate-workflow-run-state.command.ts new file mode 100644 index 000000000..76f1f9bab --- /dev/null +++ b/packages/twenty-server/src/database/commands/upgrade-version-command/1-1/1-1-migrate-workflow-run-state.command.ts @@ -0,0 +1,157 @@ +import { InjectRepository } from '@nestjs/typeorm'; + +import { MoreThan, Repository } from 'typeorm'; +import { Command, Option } from 'nest-commander'; +import { isDefined } from 'twenty-shared/utils'; + +import { + ActiveOrSuspendedWorkspacesMigrationCommandRunner, + RunOnWorkspaceArgs, +} from 'src/database/commands/command-runners/active-or-suspended-workspaces-migration.command-runner'; +import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; +import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; +import { + WorkflowRunState, + WorkflowRunOutput, + WorkflowRunWorkspaceEntity, +} from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity'; +import { + StepStatus, + WorkflowRunStepInfo, +} from 'src/modules/workflow/workflow-executor/types/workflow-run-step-info.type'; + +const DEFAULT_CHUNK_SIZE = 500; + +@Command({ + name: 'upgrade:1-1:migrate-workflow-run-state', + description: 'Migrate state column in workflow run records', +}) +export class MigrateWorkflowRunStatesCommand extends ActiveOrSuspendedWorkspacesMigrationCommandRunner { + private afterDate: string | undefined; + private chunkSize = DEFAULT_CHUNK_SIZE; + + constructor( + @InjectRepository(Workspace, 'core') + protected readonly workspaceRepository: Repository, + protected readonly twentyORMGlobalManager: TwentyORMGlobalManager, + ) { + super(workspaceRepository, twentyORMGlobalManager); + } + + @Option({ + flags: '--after-date [after_date]', + description: 'Only select records after this date (YYYY-MM-DD).', + required: false, + }) + parseAfterDate(val: string): string | undefined { + const date = new Date(val); + + if (isNaN(date.getTime())) { + throw new Error(`Invalid date format: ${val}`); + } + + const afterDate = date.toISOString(); + + this.afterDate = afterDate; + + return afterDate; + } + + @Option({ + flags: '--chunk-size [chunk_size]', + description: + 'Split workflowRuns into chunks for each workspaces (default 500)', + required: false, + }) + parseChunkSize(val: number): number { + if (isNaN(val) || val <= 0) { + throw new Error(`Invalid chunk size: ${val}. Should be greater than 0`); + } + + this.chunkSize = val; + + return this.chunkSize; + } + + override async runOnWorkspace({ + workspaceId, + }: RunOnWorkspaceArgs): Promise { + const workflowRunRepository = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + workspaceId, + 'workflowRun', + { shouldBypassPermissionChecks: true }, + ); + + const workflowRunCount = await workflowRunRepository.count(); + + const chunkCount = Math.ceil(workflowRunCount / this.chunkSize); + + this.logger.log( + `Migrate ${workflowRunCount} workflowRun state in ${chunkCount} chunks of size ${this.chunkSize}`, + ); + + for (let offset = 0; offset < chunkCount; offset += 1) { + this.logger.log(`- Proceeding chunk ${offset + 1}/${chunkCount}`); + + const findOption = isDefined(this.afterDate) + ? { where: { startedAt: MoreThan(this.afterDate) } } + : {}; + + const workflowRuns = await workflowRunRepository.find({ + ...findOption, + skip: offset * this.chunkSize, + take: this.chunkSize, + }); + + for (const workflowRun of workflowRuns) { + const output = workflowRun.output; + + if (!isDefined(output)) { + continue; + } + + const state = this.buildRunStateFromOutput(output); + + await workflowRunRepository.update(workflowRun.id, { + state, + }); + } + } + } + + private buildRunStateFromOutput(output: WorkflowRunOutput): WorkflowRunState { + const stepInfos: Record = Object.fromEntries( + output.flow.steps.map((step) => { + const stepOutput = output.stepsOutput?.[step.id]; + const status = stepOutput?.pendingEvent + ? StepStatus.PENDING + : stepOutput?.error + ? StepStatus.FAILED + : stepOutput?.result + ? StepStatus.SUCCESS + : StepStatus.NOT_STARTED; + + return [ + step.id, + { + result: stepOutput?.result, + error: stepOutput?.error, + status, + }, + ]; + }), + ); + + stepInfos['trigger'] = { + result: output?.stepsOutput?.trigger?.result, + status: StepStatus.SUCCESS, + }; + + return { + flow: output?.flow, + workflowRunError: output?.error, + stepInfos, + }; + } +} diff --git a/packages/twenty-server/src/database/commands/upgrade-version-command/1-1/1-1-upgrade-version-command.module.ts b/packages/twenty-server/src/database/commands/upgrade-version-command/1-1/1-1-upgrade-version-command.module.ts index df7db703c..d7699d0fd 100644 --- a/packages/twenty-server/src/database/commands/upgrade-version-command/1-1/1-1-upgrade-version-command.module.ts +++ b/packages/twenty-server/src/database/commands/upgrade-version-command/1-1/1-1-upgrade-version-command.module.ts @@ -14,6 +14,7 @@ import { WorkspaceMetadataVersionModule } from 'src/engine/metadata-modules/work import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module'; import { WorkspaceHealthModule } from 'src/engine/workspace-manager/workspace-health/workspace-health.module'; import { WorkspaceMigrationRunnerModule } from 'src/engine/workspace-manager/workspace-migration-runner/workspace-migration-runner.module'; +import { MigrateWorkflowRunStatesCommand } from 'src/database/commands/upgrade-version-command/1-1/1-1-migrate-workflow-run-state.command'; @Module({ imports: [ @@ -37,10 +38,12 @@ import { WorkspaceMigrationRunnerModule } from 'src/engine/workspace-manager/wor providers: [ FixUpdateStandardFieldsIsLabelSyncedWithName, FixSchemaArrayTypeCommand, + MigrateWorkflowRunStatesCommand, ], exports: [ FixUpdateStandardFieldsIsLabelSyncedWithName, FixSchemaArrayTypeCommand, + MigrateWorkflowRunStatesCommand, ], }) export class V1_1_UpgradeVersionCommandModule {} diff --git a/packages/twenty-server/src/database/commands/upgrade-version-command/upgrade.command.ts b/packages/twenty-server/src/database/commands/upgrade-version-command/upgrade.command.ts index d887ad51e..75116b556 100644 --- a/packages/twenty-server/src/database/commands/upgrade-version-command/upgrade.command.ts +++ b/packages/twenty-server/src/database/commands/upgrade-version-command/upgrade.command.ts @@ -29,6 +29,7 @@ import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; import { SyncWorkspaceMetadataCommand } from 'src/engine/workspace-manager/workspace-sync-metadata/commands/sync-workspace-metadata.command'; import { compareVersionMajorAndMinor } from 'src/utils/version/compare-version-minor-and-major'; +import { MigrateWorkflowRunStatesCommand } from 'src/database/commands/upgrade-version-command/1-1/1-1-migrate-workflow-run-state.command'; const execPromise = promisify(exec); @@ -144,6 +145,7 @@ export class UpgradeCommand extends UpgradeCommandRunner { protected readonly fixUpdateStandardFieldsIsLabelSyncedWithNameCommand: FixUpdateStandardFieldsIsLabelSyncedWithName, // 1.2 Commands + protected readonly migrateWorkflowRunStatesCommand: MigrateWorkflowRunStatesCommand, protected readonly addEnqueuedStatusToWorkflowRunCommand: AddEnqueuedStatusToWorkflowRunCommand, ) { super( @@ -195,7 +197,7 @@ export class UpgradeCommand extends UpgradeCommandRunner { const commands_120: VersionCommands = { beforeSyncMetadata: [this.addEnqueuedStatusToWorkflowRunCommand], - afterSyncMetadata: [], + afterSyncMetadata: [this.migrateWorkflowRunStatesCommand], }; this.allCommands = { diff --git a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts index f85526d4a..1ef67a125 100644 --- a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts +++ b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts @@ -486,6 +486,7 @@ export const WORKFLOW_RUN_STANDARD_FIELD_IDS = { createdBy: '20202020-6007-401a-8aa5-e6f38581a6f3', output: '20202020-7be4-4db2-8ac6-3ff0d740843d', context: '20202020-189c-478a-b867-d72feaf5926a', + state: '20202020-611f-45f3-9cde-d64927e8ec57', favorites: '20202020-4baf-4604-b899-2f7fcfbbf90d', timelineActivities: '20202020-af4d-4eb0-babc-eb960a45b356', searchVector: '20202020-0b91-4ded-b1ac-cbd5efa58cb9', diff --git a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts index 42e0fd487..ebfa01d79 100644 --- a/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts +++ b/packages/twenty-server/src/modules/workflow/common/standard-objects/workflow-run.workspace-entity.ts @@ -31,6 +31,7 @@ import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-ob import { WorkflowExecutorOutput } from 'src/modules/workflow/workflow-executor/types/workflow-executor-output.type'; import { WorkflowAction } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; import { WorkflowTrigger } from 'src/modules/workflow/workflow-trigger/types/workflow-trigger.type'; +import { WorkflowRunStepInfo } from 'src/modules/workflow/workflow-executor/types/workflow-run-step-info.type'; export enum WorkflowRunStatus { NOT_STARTED = 'NOT_STARTED', @@ -54,6 +55,15 @@ export type WorkflowRunOutput = { error?: string; }; +export type WorkflowRunState = { + flow: { + trigger: WorkflowTrigger; + steps: WorkflowAction[]; + }; + stepInfos: Record; + workflowRunError?: string; +}; + const NAME_FIELD_NAME = 'name'; export const SEARCH_FIELDS_FOR_WORKFLOW_RUNS: FieldTypeAndNameMetadata[] = [ @@ -172,6 +182,16 @@ export class WorkflowRunWorkspaceEntity extends BaseWorkspaceEntity { // eslint-disable-next-line @typescript-eslint/no-explicit-any context: Record | null; + @WorkspaceField({ + standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.state, + type: FieldMetadataType.RAW_JSON, + label: msg`State`, + description: msg`State of the workflow run`, + icon: 'IconHierarchy2', + }) + @WorkspaceIsNullable() + state: WorkflowRunState | null; + @WorkspaceField({ standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.position, type: FieldMetadataType.POSITION, diff --git a/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.workspace-service.ts index a21f3dae2..1550fe59a 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-step/workflow-version-step.workspace-service.ts @@ -30,6 +30,7 @@ import { } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service'; import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service'; +import { StepStatus } from 'src/modules/workflow/workflow-executor/types/workflow-run-step-info.type'; const TRIGGER_STEP_ID = 'trigger'; @@ -334,6 +335,7 @@ export class WorkflowVersionStepWorkspaceService { workflowRunId, stepOutput: newStepOutput, context: updatedContext, + stepStatus: StepStatus.SUCCESS, }); await this.workflowRunnerWorkspaceService.resume({ diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-run-step-info.type.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-run-step-info.type.ts new file mode 100644 index 000000000..d4dbcfade --- /dev/null +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/types/workflow-run-step-info.type.ts @@ -0,0 +1,13 @@ +export enum StepStatus { + NOT_STARTED = 'NOT_STARTED', + RUNNING = 'RUNNING', + SUCCESS = 'SUCCESS', + FAILED = 'FAILED', + PENDING = 'PENDING', +} + +export type WorkflowRunStepInfo = { + result?: object; + error?: string; + status: StepStatus; +}; diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/__tests__/workflow-executor.workspace-service.spec.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/__tests__/workflow-executor.workspace-service.spec.ts index 86f356051..77860c82a 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/__tests__/workflow-executor.workspace-service.spec.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/__tests__/workflow-executor.workspace-service.spec.ts @@ -13,6 +13,7 @@ import { } from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; import { WorkflowExecutorWorkspaceService } from 'src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service'; import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service'; +import { StepStatus } from 'src/modules/workflow/workflow-executor/types/workflow-run-step-info.type'; describe('WorkflowExecutorWorkspaceService', () => { let service: WorkflowExecutorWorkspaceService; @@ -169,9 +170,27 @@ describe('WorkflowExecutorWorkspaceService', () => { ], 'workspace-id', ); + expect( + workflowRunWorkspaceService.saveWorkflowRunState, + ).toHaveBeenCalledTimes(4); expect( workflowRunWorkspaceService.saveWorkflowRunState, ).toHaveBeenCalledWith({ + workflowRunId: mockWorkflowRunId, + stepOutput: { + id: 'step-1', + output: {}, + }, + context: { + data: 'some-data', + }, + workspaceId: 'workspace-id', + stepStatus: StepStatus.RUNNING, + }); + + expect( + workflowRunWorkspaceService.saveWorkflowRunState, + ).toHaveBeenNthCalledWith(2, { workflowRunId: mockWorkflowRunId, stepOutput: { id: 'step-1', @@ -179,10 +198,14 @@ describe('WorkflowExecutorWorkspaceService', () => { }, context: { data: 'some-data', - 'step-1': { stepOutput: 'success' }, + 'step-1': { + stepOutput: 'success', + }, }, workspaceId: 'workspace-id', + stepStatus: StepStatus.SUCCESS, }); + expect(result).toEqual({ result: { success: true } }); // execute second step @@ -207,9 +230,24 @@ describe('WorkflowExecutorWorkspaceService', () => { error: 'Step execution failed', }); expect(workspaceEventEmitter.emitCustomBatchEvent).not.toHaveBeenCalled(); + expect( + workflowRunWorkspaceService.saveWorkflowRunState, + ).toHaveBeenCalledTimes(2); expect( workflowRunWorkspaceService.saveWorkflowRunState, ).toHaveBeenCalledWith({ + workflowRunId: mockWorkflowRunId, + stepOutput: { + id: 'step-1', + output: {}, + }, + context: mockContext, + workspaceId: 'workspace-id', + stepStatus: StepStatus.RUNNING, + }); + expect( + workflowRunWorkspaceService.saveWorkflowRunState, + ).toHaveBeenNthCalledWith(2, { workflowRunId: mockWorkflowRunId, stepOutput: { id: 'step-1', @@ -219,6 +257,7 @@ describe('WorkflowExecutorWorkspaceService', () => { }, context: mockContext, workspaceId: 'workspace-id', + stepStatus: StepStatus.FAILED, }); }); @@ -237,9 +276,24 @@ describe('WorkflowExecutorWorkspaceService', () => { }); expect(result).toEqual(mockPendingEvent); + expect( + workflowRunWorkspaceService.saveWorkflowRunState, + ).toHaveBeenCalledTimes(2); expect( workflowRunWorkspaceService.saveWorkflowRunState, ).toHaveBeenCalledWith({ + workflowRunId: mockWorkflowRunId, + stepOutput: { + id: 'step-1', + output: {}, + }, + context: mockContext, + workspaceId: 'workspace-id', + stepStatus: StepStatus.RUNNING, + }); + expect( + workflowRunWorkspaceService.saveWorkflowRunState, + ).toHaveBeenNthCalledWith(2, { workflowRunId: mockWorkflowRunId, stepOutput: { id: 'step-1', @@ -247,6 +301,7 @@ describe('WorkflowExecutorWorkspaceService', () => { }, context: mockContext, workspaceId: 'workspace-id', + stepStatus: StepStatus.PENDING, }); // No recursive call to execute should happen @@ -291,10 +346,27 @@ describe('WorkflowExecutorWorkspaceService', () => { context: mockContext, }); + expect( + workflowRunWorkspaceService.saveWorkflowRunState, + ).toHaveBeenCalledTimes(4); + // execute first step expect( workflowRunWorkspaceService.saveWorkflowRunState, ).toHaveBeenCalledWith({ + workflowRunId: mockWorkflowRunId, + stepOutput: { + id: 'step-1', + output: {}, + }, + context: mockContext, + workspaceId: 'workspace-id', + stepStatus: StepStatus.RUNNING, + }); + + expect( + workflowRunWorkspaceService.saveWorkflowRunState, + ).toHaveBeenNthCalledWith(2, { workflowRunId: mockWorkflowRunId, stepOutput: { id: 'step-1', @@ -304,6 +376,7 @@ describe('WorkflowExecutorWorkspaceService', () => { }, context: mockContext, workspaceId: 'workspace-id', + stepStatus: StepStatus.FAILED, }); expect(result).toEqual({ result: { success: true } }); @@ -378,9 +451,24 @@ describe('WorkflowExecutorWorkspaceService', () => { // Should not retry anymore expect(workflowExecutorFactory.get).toHaveBeenCalledTimes(1); + expect( + workflowRunWorkspaceService.saveWorkflowRunState, + ).toHaveBeenCalledTimes(2); expect( workflowRunWorkspaceService.saveWorkflowRunState, ).toHaveBeenCalledWith({ + workflowRunId: mockWorkflowRunId, + stepOutput: { + id: 'step-1', + output: {}, + }, + context: mockContext, + workspaceId: 'workspace-id', + stepStatus: StepStatus.RUNNING, + }); + expect( + workflowRunWorkspaceService.saveWorkflowRunState, + ).toHaveBeenNthCalledWith(2, { workflowRunId: mockWorkflowRunId, stepOutput: { id: 'step-1', @@ -388,6 +476,7 @@ describe('WorkflowExecutorWorkspaceService', () => { }, context: mockContext, workspaceId: 'workspace-id', + stepStatus: StepStatus.FAILED, }); expect(result).toEqual(errorOutput); }); @@ -404,6 +493,9 @@ describe('WorkflowExecutorWorkspaceService', () => { }); expect(workflowExecutorFactory.get).toHaveBeenCalledTimes(1); + expect( + workflowRunWorkspaceService.saveWorkflowRunState, + ).toHaveBeenCalledTimes(1); expect( workflowRunWorkspaceService.saveWorkflowRunState, ).toHaveBeenCalledWith({ @@ -416,6 +508,7 @@ describe('WorkflowExecutorWorkspaceService', () => { }, context: mockContext, workspaceId: 'workspace-id', + stepStatus: StepStatus.FAILED, }); expect(result).toEqual({ error: BILLING_WORKFLOW_EXECUTION_ERROR_MESSAGE, diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts index ef5e781dd..96f555633 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts @@ -25,6 +25,7 @@ import { WorkflowTriggerException, WorkflowTriggerExceptionCode, } from 'src/modules/workflow/workflow-trigger/exceptions/workflow-trigger.exception'; +import { StepStatus } from 'src/modules/workflow/workflow-executor/types/workflow-run-step-info.type'; const MAX_RETRIES_ON_FAILURE = 3; @@ -87,11 +88,23 @@ export class WorkflowExecutorWorkspaceService implements WorkflowExecutor { output: billingOutput, }, context, + stepStatus: StepStatus.FAILED, }); return billingOutput; } + await this.workflowRunWorkspaceService.saveWorkflowRunState({ + workflowRunId, + stepOutput: { + id: step.id, + output: {}, + }, + context, + workspaceId, + stepStatus: StepStatus.RUNNING, + }); + try { actionOutput = await workflowExecutor.execute({ currentStepId, @@ -121,13 +134,16 @@ export class WorkflowExecutorWorkspaceService implements WorkflowExecutor { stepOutput, context, workspaceId, + stepStatus: StepStatus.PENDING, }); return actionOutput; } + const actionOutputSuccess = isDefined(actionOutput.result); + const shouldContinue = - isDefined(actionOutput.result) || + actionOutputSuccess || step.settings.errorHandlingOptions.continueOnFailure.value; if (shouldContinue) { @@ -143,6 +159,9 @@ export class WorkflowExecutorWorkspaceService implements WorkflowExecutor { stepOutput, context: updatedContext, workspaceId, + stepStatus: isDefined(actionOutput.result) + ? StepStatus.SUCCESS + : StepStatus.FAILED, }); if (!isDefined(step.nextStepIds?.[0])) { @@ -176,6 +195,7 @@ export class WorkflowExecutorWorkspaceService implements WorkflowExecutor { stepOutput, context, workspaceId, + stepStatus: StepStatus.FAILED, }); return actionOutput; diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts index e5115e21e..5ce119bf4 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/jobs/run-workflow.job.ts @@ -118,6 +118,7 @@ export class RunWorkflowJob { }, }, }, + payload: triggerPayload, }); await this.throttleExecution(workflowVersion.workflowId); diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts index 3f2ae8d52..fe5f363d5 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts @@ -3,6 +3,7 @@ import { InjectRepository } from '@nestjs/typeorm'; import { Repository } from 'typeorm'; import { v4 } from 'uuid'; +import { isDefined } from 'twenty-shared/utils'; import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action'; import { objectRecordChangedValues } from 'src/engine/core-modules/event-emitter/utils/object-record-changed-values'; @@ -16,6 +17,7 @@ import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global. import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter'; import { StepOutput, + WorkflowRunState, WorkflowRunOutput, WorkflowRunStatus, WorkflowRunWorkspaceEntity, @@ -26,6 +28,8 @@ import { WorkflowRunException, WorkflowRunExceptionCode, } from 'src/modules/workflow/workflow-runner/exceptions/workflow-run.exception'; +import { StepStatus } from 'src/modules/workflow/workflow-executor/types/workflow-run-step-info.type'; +import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; @Injectable() export class WorkflowRunWorkspaceService { @@ -120,6 +124,7 @@ export class WorkflowRunWorkspaceService { workflowId: workflow.id, status, position, + state: this.getInitState(workflowVersion), context, }); @@ -132,10 +137,12 @@ export class WorkflowRunWorkspaceService { workflowRunId, workspaceId, output, + payload, }: { workflowRunId: string; workspaceId: string; output: WorkflowRunOutput; + payload: object; }) { const workflowRunRepository = await this.twentyORMGlobalManager.getRepositoryForWorkspace( @@ -169,6 +176,17 @@ export class WorkflowRunWorkspaceService { status: WorkflowRunStatus.RUNNING, startedAt: new Date().toISOString(), output, + state: { + ...workflowRunToUpdate.state, + stepInfos: { + ...workflowRunToUpdate.state?.stepInfos, + trigger: { + ...workflowRunToUpdate.state?.stepInfos.trigger, + status: StepStatus.SUCCESS, + result: payload, + }, + }, + }, }; await workflowRunRepository.update(workflowRunToUpdate.id, partialUpdate); @@ -215,13 +233,17 @@ export class WorkflowRunWorkspaceService { ...(workflowRunToUpdate.output ?? {}), error, }, + state: { + ...workflowRunToUpdate.state, + workflowRunError: error, + }, }; await workflowRunRepository.update(workflowRunToUpdate.id, partialUpdate); await this.emitWorkflowRunUpdatedEvent({ workflowRunBefore: workflowRunToUpdate, - updatedFields: ['status', 'endedAt', 'output'], + updatedFields: ['status', 'endedAt', 'output', 'state'], }); await this.metricsService.incrementCounter({ @@ -238,12 +260,14 @@ export class WorkflowRunWorkspaceService { stepOutput, workspaceId, context, + stepStatus, }: { workflowRunId: string; stepOutput: StepOutput; workspaceId: string; // eslint-disable-next-line @typescript-eslint/no-explicit-any context: Record; + stepStatus: StepStatus; }) { const workflowRunRepository = await this.twentyORMGlobalManager.getRepositoryForWorkspace( @@ -274,6 +298,17 @@ export class WorkflowRunWorkspaceService { [stepOutput.id]: stepOutput.output, }, }, + state: { + ...workflowRunToUpdate.state, + stepInfos: { + ...workflowRunToUpdate.state?.stepInfos, + [stepOutput.id]: { + result: stepOutput.output?.result, + error: stepOutput.output?.error, + status: stepStatus, + }, + }, + }, context, }; @@ -281,7 +316,7 @@ export class WorkflowRunWorkspaceService { await this.emitWorkflowRunUpdatedEvent({ workflowRunBefore: workflowRunToUpdate, - updatedFields: ['context', 'output'], + updatedFields: ['context', 'output', 'state'], }); } @@ -334,13 +369,20 @@ export class WorkflowRunWorkspaceService { steps: updatedSteps, }, }, + state: { + ...workflowRunToUpdate.state, + flow: { + ...(workflowRunToUpdate.state?.flow ?? {}), + steps: updatedSteps, + }, + }, }; await workflowRunRepository.update(workflowRunToUpdate.id, partialUpdate); await this.emitWorkflowRunUpdatedEvent({ workflowRunBefore: workflowRunToUpdate, - updatedFields: ['output'], + updatedFields: ['output', 'state'], }); } @@ -441,4 +483,31 @@ export class WorkflowRunWorkspaceService { workspaceId, }); } + + private getInitState( + workflowVersion: WorkflowVersionWorkspaceEntity, + ): WorkflowRunState | undefined { + if ( + !isDefined(workflowVersion.trigger) || + !isDefined(workflowVersion.steps) + ) { + return undefined; + } + + return { + flow: { + trigger: workflowVersion.trigger, + steps: workflowVersion.steps, + }, + stepInfos: { + trigger: { status: StepStatus.NOT_STARTED }, + ...Object.fromEntries( + workflowVersion.steps.map((step) => [ + step.id, + { status: StepStatus.NOT_STARTED }, + ]), + ), + }, + }; + } }