Remove useless columns (#13312)
This commit is contained in:
@ -95,11 +95,12 @@ export class MigrateWorkflowRunStatesCommand extends ActiveOrSuspendedWorkspaces
|
||||
? { where: { startedAt: MoreThan(this.afterDate) } }
|
||||
: {};
|
||||
|
||||
const workflowRuns = await workflowRunRepository.find({
|
||||
const workflowRuns = (await workflowRunRepository.find({
|
||||
...findOption,
|
||||
skip: offset * this.chunkSize,
|
||||
take: this.chunkSize,
|
||||
});
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
})) as any[]; // We type as any as workflowRun output has been removed since 1.1.0 release
|
||||
|
||||
for (const workflowRun of workflowRuns) {
|
||||
const output = workflowRun.output;
|
||||
|
||||
@ -0,0 +1,39 @@
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
|
||||
import { Command } from 'nest-commander';
|
||||
import { IsNull, Repository } from 'typeorm';
|
||||
|
||||
import {
|
||||
ActiveOrSuspendedWorkspacesMigrationCommandRunner,
|
||||
RunOnWorkspaceArgs,
|
||||
} from 'src/database/commands/command-runners/active-or-suspended-workspaces-migration.command-runner';
|
||||
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
|
||||
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
|
||||
import { WorkflowRunWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-run.workspace-entity';
|
||||
|
||||
@Command({
|
||||
name: 'migrate:1-2:remove-workflow-runs-without-state',
|
||||
description: 'Remove workflow runs without state.',
|
||||
})
|
||||
export class RemoveWorkflowRunsWithoutState extends ActiveOrSuspendedWorkspacesMigrationCommandRunner {
|
||||
constructor(
|
||||
@InjectRepository(Workspace, 'core')
|
||||
protected readonly workspaceRepository: Repository<Workspace>,
|
||||
protected readonly twentyORMGlobalManager: TwentyORMGlobalManager,
|
||||
) {
|
||||
super(workspaceRepository, twentyORMGlobalManager);
|
||||
}
|
||||
|
||||
override async runOnWorkspace({
|
||||
workspaceId,
|
||||
}: RunOnWorkspaceArgs): Promise<void> {
|
||||
const workflowRunRepository =
|
||||
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WorkflowRunWorkspaceEntity>(
|
||||
workspaceId,
|
||||
'workflowRun',
|
||||
{ shouldBypassPermissionChecks: true },
|
||||
);
|
||||
|
||||
await workflowRunRepository.delete({ state: IsNull() });
|
||||
}
|
||||
}
|
||||
@ -1,8 +1,12 @@
|
||||
import { Module } from '@nestjs/common';
|
||||
import { TypeOrmModule } from '@nestjs/typeorm';
|
||||
|
||||
import { RemoveWorkflowRunsWithoutState } from 'src/database/commands/upgrade-version-command/1-2/1-2-remove-workflow-runs-without-state.command';
|
||||
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
|
||||
|
||||
@Module({
|
||||
imports: [],
|
||||
providers: [],
|
||||
exports: [],
|
||||
imports: [TypeOrmModule.forFeature([Workspace], 'core')],
|
||||
providers: [RemoveWorkflowRunsWithoutState],
|
||||
exports: [RemoveWorkflowRunsWithoutState],
|
||||
})
|
||||
export class V1_2_UpgradeVersionCommandModule {}
|
||||
|
||||
@ -31,6 +31,7 @@ import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
|
||||
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
|
||||
import { SyncWorkspaceMetadataCommand } from 'src/engine/workspace-manager/workspace-sync-metadata/commands/sync-workspace-metadata.command';
|
||||
import { compareVersionMajorAndMinor } from 'src/utils/version/compare-version-minor-and-major';
|
||||
import { RemoveWorkflowRunsWithoutState } from 'src/database/commands/upgrade-version-command/1-2/1-2-remove-workflow-runs-without-state.command';
|
||||
|
||||
const execPromise = promisify(exec);
|
||||
|
||||
@ -149,6 +150,7 @@ export class UpgradeCommand extends UpgradeCommandRunner {
|
||||
protected readonly addEnqueuedStatusToWorkflowRunCommand: AddEnqueuedStatusToWorkflowRunCommand,
|
||||
|
||||
// 1.2 Commands
|
||||
protected readonly removeWorkflowRunsWithoutState: RemoveWorkflowRunsWithoutState,
|
||||
|
||||
// 1.3 Commands
|
||||
) {
|
||||
@ -202,7 +204,7 @@ export class UpgradeCommand extends UpgradeCommandRunner {
|
||||
};
|
||||
|
||||
const commands_120: VersionCommands = {
|
||||
beforeSyncMetadata: [],
|
||||
beforeSyncMetadata: [this.removeWorkflowRunsWithoutState],
|
||||
afterSyncMetadata: [],
|
||||
};
|
||||
|
||||
|
||||
@ -161,27 +161,6 @@ export class WorkflowRunWorkspaceEntity extends BaseWorkspaceEntity {
|
||||
})
|
||||
createdBy: ActorMetadata;
|
||||
|
||||
@WorkspaceField({
|
||||
standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.output,
|
||||
type: FieldMetadataType.RAW_JSON,
|
||||
label: msg`Output`,
|
||||
description: msg`Json object to provide output of the workflow run`,
|
||||
icon: 'IconText',
|
||||
})
|
||||
@WorkspaceIsNullable()
|
||||
output: WorkflowRunOutput | null;
|
||||
|
||||
@WorkspaceField({
|
||||
standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.context,
|
||||
type: FieldMetadataType.RAW_JSON,
|
||||
label: msg`Context`,
|
||||
description: msg`Context`,
|
||||
icon: 'IconHierarchy2',
|
||||
})
|
||||
@WorkspaceIsNullable()
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
context: Record<string, any> | null;
|
||||
|
||||
@WorkspaceField({
|
||||
standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.state,
|
||||
type: FieldMetadataType.RAW_JSON,
|
||||
@ -189,8 +168,7 @@ export class WorkflowRunWorkspaceEntity extends BaseWorkspaceEntity {
|
||||
description: msg`State of the workflow run`,
|
||||
icon: 'IconHierarchy2',
|
||||
})
|
||||
@WorkspaceIsNullable()
|
||||
state: WorkflowRunState | null;
|
||||
state: WorkflowRunState;
|
||||
|
||||
@WorkspaceField({
|
||||
standardId: WORKFLOW_RUN_STANDARD_FIELD_IDS.position,
|
||||
|
||||
@ -103,12 +103,9 @@ export class RunWorkflowJob {
|
||||
triggerType: workflowVersion.trigger.type,
|
||||
});
|
||||
|
||||
const triggerPayload = workflowRun.context?.trigger ?? {};
|
||||
|
||||
await this.workflowRunWorkspaceService.startWorkflowRun({
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
payload: triggerPayload,
|
||||
});
|
||||
|
||||
await this.throttleExecution(workflowVersion.workflowId);
|
||||
|
||||
@ -40,15 +40,14 @@ export class WorkflowRunWorkspaceService {
|
||||
workflowVersionId,
|
||||
createdBy,
|
||||
workflowRunId,
|
||||
context,
|
||||
status,
|
||||
triggerPayload,
|
||||
}: {
|
||||
workflowVersionId: string;
|
||||
createdBy: ActorMetadata;
|
||||
workflowRunId?: string;
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
context: Record<string, any>;
|
||||
status: WorkflowRunStatus.NOT_STARTED | WorkflowRunStatus.ENQUEUED;
|
||||
triggerPayload: object;
|
||||
}) {
|
||||
const workspaceId =
|
||||
this.scopedWorkspaceContextFactory.create()?.workspaceId;
|
||||
@ -108,7 +107,7 @@ export class WorkflowRunWorkspaceService {
|
||||
workspaceId,
|
||||
});
|
||||
|
||||
const initState = this.getInitState(workflowVersion);
|
||||
const initState = this.getInitState(workflowVersion, triggerPayload);
|
||||
|
||||
const workflowRun = workflowRunRepository.create({
|
||||
id: workflowRunId ?? v4(),
|
||||
@ -119,11 +118,6 @@ export class WorkflowRunWorkspaceService {
|
||||
status,
|
||||
position,
|
||||
state: initState,
|
||||
output: {
|
||||
...initState,
|
||||
stepsOutput: {},
|
||||
},
|
||||
context,
|
||||
});
|
||||
|
||||
await workflowRunRepository.insert(workflowRun);
|
||||
@ -135,11 +129,9 @@ export class WorkflowRunWorkspaceService {
|
||||
async startWorkflowRun({
|
||||
workflowRunId,
|
||||
workspaceId,
|
||||
payload,
|
||||
}: {
|
||||
workflowRunId: string;
|
||||
workspaceId: string;
|
||||
payload: object;
|
||||
}) {
|
||||
const workflowRunToUpdate = await this.getWorkflowRunOrFail({
|
||||
workflowRunId,
|
||||
@ -159,29 +151,17 @@ export class WorkflowRunWorkspaceService {
|
||||
const partialUpdate = {
|
||||
status: WorkflowRunStatus.RUNNING,
|
||||
startedAt: new Date().toISOString(),
|
||||
output: {
|
||||
...workflowRunToUpdate.output,
|
||||
stepsOutput: {
|
||||
trigger: {
|
||||
result: payload,
|
||||
},
|
||||
},
|
||||
},
|
||||
state: {
|
||||
...workflowRunToUpdate.state,
|
||||
stepInfos: {
|
||||
...workflowRunToUpdate.state?.stepInfos,
|
||||
trigger: {
|
||||
result: {},
|
||||
...workflowRunToUpdate.state?.stepInfos.trigger,
|
||||
status: StepStatus.SUCCESS,
|
||||
result: payload,
|
||||
},
|
||||
},
|
||||
},
|
||||
context: payload
|
||||
? {
|
||||
trigger: payload,
|
||||
}
|
||||
: (workflowRunToUpdate.context ?? {}),
|
||||
};
|
||||
|
||||
await this.updateWorkflowRun({ workflowRunId, workspaceId, partialUpdate });
|
||||
@ -207,10 +187,6 @@ export class WorkflowRunWorkspaceService {
|
||||
const partialUpdate = {
|
||||
status,
|
||||
endedAt: new Date().toISOString(),
|
||||
output: {
|
||||
...workflowRunToUpdate.output,
|
||||
error,
|
||||
},
|
||||
state: {
|
||||
...workflowRunToUpdate.state,
|
||||
workflowRunError: error,
|
||||
@ -279,16 +255,6 @@ export class WorkflowRunWorkspaceService {
|
||||
});
|
||||
|
||||
const partialUpdate = {
|
||||
output: {
|
||||
flow: workflowRunToUpdate.output?.flow ?? {
|
||||
trigger: undefined,
|
||||
steps: [],
|
||||
},
|
||||
stepsOutput: {
|
||||
...(workflowRunToUpdate.output?.stepsOutput ?? {}),
|
||||
[stepOutput.id]: stepOutput.output,
|
||||
},
|
||||
},
|
||||
state: {
|
||||
...workflowRunToUpdate.state,
|
||||
stepInfos: {
|
||||
@ -301,14 +267,6 @@ export class WorkflowRunWorkspaceService {
|
||||
},
|
||||
},
|
||||
},
|
||||
...(stepStatus === StepStatus.SUCCESS
|
||||
? {
|
||||
context: {
|
||||
...workflowRunToUpdate.context,
|
||||
[stepOutput.id]: stepOutput.output.result,
|
||||
},
|
||||
}
|
||||
: {}),
|
||||
};
|
||||
|
||||
await this.updateWorkflowRun({ workflowRunId, workspaceId, partialUpdate });
|
||||
@ -339,18 +297,11 @@ export class WorkflowRunWorkspaceService {
|
||||
);
|
||||
}
|
||||
|
||||
const updatedSteps = workflowRunToUpdate.output?.flow?.steps?.map(
|
||||
const updatedSteps = workflowRunToUpdate.state?.flow?.steps?.map(
|
||||
(existingStep) => (step.id === existingStep.id ? step : existingStep),
|
||||
);
|
||||
|
||||
const partialUpdate = {
|
||||
output: {
|
||||
...(workflowRunToUpdate.output ?? {}),
|
||||
flow: {
|
||||
...(workflowRunToUpdate.output?.flow ?? {}),
|
||||
steps: updatedSteps,
|
||||
},
|
||||
},
|
||||
state: {
|
||||
...workflowRunToUpdate.state,
|
||||
flow: {
|
||||
@ -406,6 +357,7 @@ export class WorkflowRunWorkspaceService {
|
||||
|
||||
private getInitState(
|
||||
workflowVersion: WorkflowVersionWorkspaceEntity,
|
||||
triggerPayload: object,
|
||||
): WorkflowRunState | undefined {
|
||||
if (
|
||||
!isDefined(workflowVersion.trigger) ||
|
||||
@ -420,7 +372,7 @@ export class WorkflowRunWorkspaceService {
|
||||
steps: workflowVersion.steps,
|
||||
},
|
||||
stepInfos: {
|
||||
trigger: { status: StepStatus.NOT_STARTED },
|
||||
trigger: { status: StepStatus.NOT_STARTED, result: triggerPayload },
|
||||
...Object.fromEntries(
|
||||
workflowVersion.steps.map((step) => [
|
||||
step.id,
|
||||
|
||||
@ -75,9 +75,7 @@ export class WorkflowRunnerWorkspaceService {
|
||||
status: shouldEnqueueWorkflowRun
|
||||
? WorkflowRunStatus.ENQUEUED
|
||||
: WorkflowRunStatus.NOT_STARTED,
|
||||
context: {
|
||||
trigger: payload,
|
||||
},
|
||||
triggerPayload: payload,
|
||||
});
|
||||
|
||||
if (shouldEnqueueWorkflowRun) {
|
||||
|
||||
Reference in New Issue
Block a user