8839 workflow follow up code step (#8856)
- add readonly mode - fix falsy stepOutput computation
This commit is contained in:
@ -245,10 +245,12 @@ export class WorkflowVersionStepWorkspaceService {
|
||||
workspaceId,
|
||||
workflowVersionId,
|
||||
step,
|
||||
shouldUpdateStepOutput,
|
||||
}: {
|
||||
workspaceId: string;
|
||||
workflowVersionId: string;
|
||||
step: WorkflowAction;
|
||||
shouldUpdateStepOutput: boolean;
|
||||
}): Promise<WorkflowAction> {
|
||||
const workflowVersionRepository =
|
||||
await this.twentyORMManager.getRepository<WorkflowVersionWorkspaceEntity>(
|
||||
@ -275,10 +277,12 @@ export class WorkflowVersionStepWorkspaceService {
|
||||
);
|
||||
}
|
||||
|
||||
const enrichedNewStep = await this.enrichOutputSchema({
|
||||
step,
|
||||
workspaceId,
|
||||
});
|
||||
const enrichedNewStep = shouldUpdateStepOutput
|
||||
? await this.enrichOutputSchema({
|
||||
step,
|
||||
workspaceId,
|
||||
})
|
||||
: step;
|
||||
|
||||
const updatedSteps = workflowVersion.steps.map((existingStep) => {
|
||||
if (existingStep.id === step.id) {
|
||||
|
||||
@ -226,6 +226,7 @@ export class WorkflowBuilderWorkspaceService {
|
||||
|
||||
const inputSchema =
|
||||
codeIntrospectionService.getFunctionInputSchema(sourceCode);
|
||||
|
||||
const fakeFunctionInput =
|
||||
codeIntrospectionService.generateInputData(inputSchema);
|
||||
|
||||
@ -233,7 +234,7 @@ export class WorkflowBuilderWorkspaceService {
|
||||
await serverlessFunctionService.executeOneServerlessFunction(
|
||||
serverlessFunctionId,
|
||||
workspaceId,
|
||||
fakeFunctionInput,
|
||||
Object.values(fakeFunctionInput)?.[0] || {},
|
||||
serverlessFunctionVersion,
|
||||
);
|
||||
|
||||
|
||||
@ -35,7 +35,8 @@ export class CodeWorkflowAction implements WorkflowAction {
|
||||
await this.serverlessFunctionService.executeOneServerlessFunction(
|
||||
workflowActionInput.serverlessFunctionId,
|
||||
workspaceId,
|
||||
workflowActionInput.serverlessFunctionInput,
|
||||
Object.values(workflowActionInput.serverlessFunctionInput)?.[0] || {},
|
||||
workflowActionInput.serverlessFunctionVersion,
|
||||
);
|
||||
|
||||
if (result.error) {
|
||||
|
||||
@ -8,6 +8,7 @@ import {
|
||||
WorkflowVersionBatchEvent,
|
||||
WorkflowVersionEventType,
|
||||
} from 'src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job';
|
||||
import { ServerlessFunctionService } from 'src/engine/metadata-modules/serverless-function/serverless-function.service';
|
||||
|
||||
describe('WorkflowStatusesUpdate', () => {
|
||||
let job: WorkflowStatusesUpdateJob;
|
||||
@ -21,6 +22,10 @@ describe('WorkflowStatusesUpdate', () => {
|
||||
getRepository: jest.fn().mockResolvedValue(mockWorkflowRepository),
|
||||
};
|
||||
|
||||
const mockServerlessFunctionService = {
|
||||
publishOneServerlessFunction: jest.fn(),
|
||||
};
|
||||
|
||||
beforeEach(async () => {
|
||||
const module: TestingModule = await Test.createTestingModule({
|
||||
providers: [
|
||||
@ -29,6 +34,10 @@ describe('WorkflowStatusesUpdate', () => {
|
||||
provide: TwentyORMManager,
|
||||
useValue: mockTwentyORMManager,
|
||||
},
|
||||
{
|
||||
provide: ServerlessFunctionService,
|
||||
useValue: mockServerlessFunctionService,
|
||||
},
|
||||
],
|
||||
}).compile();
|
||||
|
||||
@ -94,6 +103,7 @@ describe('WorkflowStatusesUpdate', () => {
|
||||
statusUpdates: [
|
||||
{
|
||||
workflowId: '1',
|
||||
workflowVersionId: '1',
|
||||
previousStatus: WorkflowVersionStatus.ACTIVE,
|
||||
newStatus: WorkflowVersionStatus.ACTIVE,
|
||||
},
|
||||
@ -108,7 +118,7 @@ describe('WorkflowStatusesUpdate', () => {
|
||||
|
||||
await job.handle(event);
|
||||
|
||||
expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1);
|
||||
expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(2);
|
||||
expect(mockWorkflowRepository.update).toHaveBeenCalledTimes(0);
|
||||
});
|
||||
|
||||
@ -119,6 +129,7 @@ describe('WorkflowStatusesUpdate', () => {
|
||||
statusUpdates: [
|
||||
{
|
||||
workflowId: '1',
|
||||
workflowVersionId: '1',
|
||||
previousStatus: WorkflowVersionStatus.ACTIVE,
|
||||
newStatus: WorkflowVersionStatus.DRAFT,
|
||||
},
|
||||
@ -133,7 +144,7 @@ describe('WorkflowStatusesUpdate', () => {
|
||||
|
||||
await job.handle(event);
|
||||
|
||||
expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1);
|
||||
expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(2);
|
||||
expect(mockWorkflowRepository.update).toHaveBeenCalledTimes(0);
|
||||
});
|
||||
|
||||
@ -144,6 +155,7 @@ describe('WorkflowStatusesUpdate', () => {
|
||||
statusUpdates: [
|
||||
{
|
||||
workflowId: '1',
|
||||
workflowVersionId: '1',
|
||||
previousStatus: WorkflowVersionStatus.DEACTIVATED,
|
||||
newStatus: WorkflowVersionStatus.ACTIVE,
|
||||
},
|
||||
@ -158,7 +170,7 @@ describe('WorkflowStatusesUpdate', () => {
|
||||
|
||||
await job.handle(event);
|
||||
|
||||
expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1);
|
||||
expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(2);
|
||||
expect(mockWorkflowRepository.update).toHaveBeenCalledWith(
|
||||
{ id: '1' },
|
||||
{ statuses: [WorkflowStatus.ACTIVE] },
|
||||
@ -172,6 +184,7 @@ describe('WorkflowStatusesUpdate', () => {
|
||||
statusUpdates: [
|
||||
{
|
||||
workflowId: '1',
|
||||
workflowVersionId: '1',
|
||||
previousStatus: WorkflowVersionStatus.ACTIVE,
|
||||
newStatus: WorkflowVersionStatus.DEACTIVATED,
|
||||
},
|
||||
@ -186,7 +199,7 @@ describe('WorkflowStatusesUpdate', () => {
|
||||
|
||||
await job.handle(event);
|
||||
|
||||
expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1);
|
||||
expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(2);
|
||||
expect(mockWorkflowRepository.update).toHaveBeenCalledWith(
|
||||
{ id: '1' },
|
||||
{ statuses: [WorkflowStatus.DEACTIVATED] },
|
||||
@ -200,6 +213,7 @@ describe('WorkflowStatusesUpdate', () => {
|
||||
statusUpdates: [
|
||||
{
|
||||
workflowId: '1',
|
||||
workflowVersionId: '1',
|
||||
previousStatus: WorkflowVersionStatus.DRAFT,
|
||||
newStatus: WorkflowVersionStatus.ACTIVE,
|
||||
},
|
||||
@ -214,7 +228,7 @@ describe('WorkflowStatusesUpdate', () => {
|
||||
|
||||
await job.handle(event);
|
||||
|
||||
expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1);
|
||||
expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(2);
|
||||
expect(mockWorkflowRepository.update).toHaveBeenCalledWith(
|
||||
{ id: '1' },
|
||||
{ statuses: [WorkflowStatus.ACTIVE] },
|
||||
|
||||
@ -4,11 +4,21 @@ import { Process } from 'src/engine/core-modules/message-queue/decorators/proces
|
||||
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
|
||||
import { WorkflowVersionStatus } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity';
|
||||
import {
|
||||
WorkflowVersionStatus,
|
||||
WorkflowVersionWorkspaceEntity,
|
||||
} from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity';
|
||||
import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity';
|
||||
import { getStatusCombinationFromArray } from 'src/modules/workflow/workflow-status/utils/get-status-combination-from-array.util';
|
||||
import { getStatusCombinationFromUpdate } from 'src/modules/workflow/workflow-status/utils/get-status-combination-from-update.util';
|
||||
import { getWorkflowStatusesFromCombination } from 'src/modules/workflow/workflow-status/utils/get-statuses-from-combination.util';
|
||||
import { ServerlessFunctionService } from 'src/engine/metadata-modules/serverless-function/serverless-function.service';
|
||||
import {
|
||||
WorkflowAction,
|
||||
WorkflowActionType,
|
||||
} from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
|
||||
import { isDefined } from 'src/utils/is-defined';
|
||||
import { WorkspaceRepository } from 'src/engine/twenty-orm/repository/workspace.repository';
|
||||
|
||||
export enum WorkflowVersionEventType {
|
||||
CREATE = 'CREATE',
|
||||
@ -32,6 +42,7 @@ export type WorkflowVersionBatchCreateEvent = {
|
||||
|
||||
export type WorkflowVersionStatusUpdate = {
|
||||
workflowId: string;
|
||||
workflowVersionId: string;
|
||||
previousStatus: WorkflowVersionStatus;
|
||||
newStatus: WorkflowVersionStatus;
|
||||
};
|
||||
@ -48,7 +59,10 @@ export type WorkflowVersionBatchDelete = {
|
||||
|
||||
@Processor({ queueName: MessageQueue.workflowQueue, scope: Scope.REQUEST })
|
||||
export class WorkflowStatusesUpdateJob {
|
||||
constructor(private readonly twentyORMManager: TwentyORMManager) {}
|
||||
constructor(
|
||||
private readonly twentyORMManager: TwentyORMManager,
|
||||
private readonly serverlessFunctionService: ServerlessFunctionService,
|
||||
) {}
|
||||
|
||||
@Process(WorkflowStatusesUpdateJob.name)
|
||||
async handle(event: WorkflowVersionBatchEvent): Promise<void> {
|
||||
@ -63,7 +77,10 @@ export class WorkflowStatusesUpdateJob {
|
||||
case WorkflowVersionEventType.STATUS_UPDATE:
|
||||
await Promise.all(
|
||||
event.statusUpdates.map((statusUpdate) =>
|
||||
this.handleWorkflowVersionStatusUpdated(statusUpdate),
|
||||
this.handleWorkflowVersionStatusUpdated(
|
||||
statusUpdate,
|
||||
event.workspaceId,
|
||||
),
|
||||
),
|
||||
);
|
||||
break;
|
||||
@ -119,20 +136,92 @@ export class WorkflowStatusesUpdateJob {
|
||||
);
|
||||
}
|
||||
|
||||
private async handlePublishServerlessFunction({
|
||||
statusUpdate,
|
||||
workspaceId,
|
||||
workflowVersion,
|
||||
workflowVersionRepository,
|
||||
}: {
|
||||
statusUpdate: WorkflowVersionStatusUpdate;
|
||||
workspaceId: string;
|
||||
workflowVersion: WorkflowVersionWorkspaceEntity;
|
||||
workflowVersionRepository: WorkspaceRepository<WorkflowVersionWorkspaceEntity>;
|
||||
}) {
|
||||
const shouldComputeNewSteps =
|
||||
statusUpdate.newStatus === WorkflowVersionStatus.ACTIVE &&
|
||||
isDefined(workflowVersion.steps) &&
|
||||
workflowVersion.steps.filter(
|
||||
(step) => step.type === WorkflowActionType.CODE,
|
||||
).length > 0;
|
||||
|
||||
if (shouldComputeNewSteps) {
|
||||
const newSteps: WorkflowAction[] = [];
|
||||
|
||||
for (const step of workflowVersion.steps || []) {
|
||||
const newStep = { ...step };
|
||||
|
||||
if (step.type === WorkflowActionType.CODE) {
|
||||
let serverlessFunction;
|
||||
|
||||
try {
|
||||
serverlessFunction =
|
||||
await this.serverlessFunctionService.publishOneServerlessFunction(
|
||||
step.settings.input.serverlessFunctionId,
|
||||
workspaceId,
|
||||
);
|
||||
} catch (e) {
|
||||
serverlessFunction = null;
|
||||
}
|
||||
|
||||
if (serverlessFunction) {
|
||||
const newStepSettings = { ...step.settings };
|
||||
|
||||
newStepSettings.input.serverlessFunctionVersion =
|
||||
serverlessFunction.latestVersion;
|
||||
|
||||
newStep.settings = newStepSettings;
|
||||
}
|
||||
}
|
||||
newSteps.push(newStep);
|
||||
}
|
||||
|
||||
await workflowVersionRepository.update(statusUpdate.workflowVersionId, {
|
||||
steps: newSteps,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private async handleWorkflowVersionStatusUpdated(
|
||||
statusUpdate: WorkflowVersionStatusUpdate,
|
||||
workspaceId: string,
|
||||
): Promise<void> {
|
||||
const workflowRepository =
|
||||
await this.twentyORMManager.getRepository<WorkflowWorkspaceEntity>(
|
||||
'workflow',
|
||||
);
|
||||
|
||||
const workflowVersionRepository =
|
||||
await this.twentyORMManager.getRepository<WorkflowVersionWorkspaceEntity>(
|
||||
'workflowVersion',
|
||||
);
|
||||
|
||||
const workflow = await workflowRepository.findOneOrFail({
|
||||
where: {
|
||||
id: statusUpdate.workflowId,
|
||||
},
|
||||
});
|
||||
|
||||
const workflowVersion = await workflowVersionRepository.findOneOrFail({
|
||||
where: { id: statusUpdate.workflowVersionId },
|
||||
});
|
||||
|
||||
await this.handlePublishServerlessFunction({
|
||||
workflowVersion,
|
||||
workflowVersionRepository,
|
||||
workspaceId,
|
||||
statusUpdate,
|
||||
});
|
||||
|
||||
const currentWorkflowStatusCombination = getStatusCombinationFromArray(
|
||||
workflow.statuses || [],
|
||||
);
|
||||
|
||||
@ -2,8 +2,10 @@ import { Module } from '@nestjs/common';
|
||||
|
||||
import { WorkflowStatusesUpdateJob } from 'src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job';
|
||||
import { WorkflowVersionStatusListener } from 'src/modules/workflow/workflow-status/listeners/workflow-version-status.listener';
|
||||
import { ServerlessFunctionModule } from 'src/engine/metadata-modules/serverless-function/serverless-function.module';
|
||||
|
||||
@Module({
|
||||
imports: [ServerlessFunctionModule],
|
||||
providers: [WorkflowStatusesUpdateJob, WorkflowVersionStatusListener],
|
||||
})
|
||||
export class WorkflowStatusModule {}
|
||||
|
||||
@ -28,6 +28,7 @@ import { assertNever } from 'src/utils/assert';
|
||||
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
|
||||
import { WORKFLOW_VERSION_STATUS_UPDATED } from 'src/modules/workflow/workflow-status/constants/workflow-version-status-updated.constants';
|
||||
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
|
||||
import { WorkflowVersionStatusUpdate } from 'src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job';
|
||||
|
||||
@Injectable()
|
||||
export class WorkflowTriggerWorkspaceService {
|
||||
@ -387,11 +388,12 @@ export class WorkflowTriggerWorkspaceService {
|
||||
workspaceId,
|
||||
});
|
||||
|
||||
this.workspaceEventEmitter.emitCustomBatchEvent(
|
||||
this.workspaceEventEmitter.emitCustomBatchEvent<WorkflowVersionStatusUpdate>(
|
||||
WORKFLOW_VERSION_STATUS_UPDATED,
|
||||
[
|
||||
{
|
||||
workflowId: workflowVersion.workflowId,
|
||||
workflowVersionId: workflowVersion.id,
|
||||
previousStatus: workflowVersion.status,
|
||||
newStatus,
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user