diff --git a/packages/twenty-front/src/generated-metadata/graphql.ts b/packages/twenty-front/src/generated-metadata/graphql.ts index 54cd29a02..f3f9645e0 100644 --- a/packages/twenty-front/src/generated-metadata/graphql.ts +++ b/packages/twenty-front/src/generated-metadata/graphql.ts @@ -1571,6 +1571,8 @@ export type UpdateServerlessFunctionInput = { export type UpdateWorkflowVersionStepInput = { /** Step to update in JSON format */ step: Scalars['JSON']['input']; + /** Boolean to check if we need to update stepOutput */ + shouldUpdateStepOutput?: InputMaybe; /** Workflow version ID */ workflowVersionId: Scalars['String']['input']; }; @@ -2249,4 +2251,4 @@ export const UpdateOneServerlessFunctionDocument = {"kind":"Document","definitio export const FindManyAvailablePackagesDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"query","name":{"kind":"Name","value":"FindManyAvailablePackages"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"input"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"ServerlessFunctionIdInput"}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"getAvailablePackages"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"input"},"value":{"kind":"Variable","name":{"kind":"Name","value":"input"}}}]}]}}]} as unknown as DocumentNode; export const GetManyServerlessFunctionsDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"query","name":{"kind":"Name","value":"GetManyServerlessFunctions"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"findManyServerlessFunctions"},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"FragmentSpread","name":{"kind":"Name","value":"ServerlessFunctionFields"}}]}}]}},{"kind":"FragmentDefinition","name":{"kind":"Name","value":"ServerlessFunctionFields"},"typeCondition":{"kind":"NamedType","name":{"kind":"Name","value":"ServerlessFunction"}},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"name"}},{"kind":"Field","name":{"kind":"Name","value":"description"}},{"kind":"Field","name":{"kind":"Name","value":"runtime"}},{"kind":"Field","name":{"kind":"Name","value":"syncStatus"}},{"kind":"Field","name":{"kind":"Name","value":"latestVersion"}},{"kind":"Field","name":{"kind":"Name","value":"latestVersionInputSchema"}},{"kind":"Field","name":{"kind":"Name","value":"publishedVersions"}},{"kind":"Field","name":{"kind":"Name","value":"createdAt"}},{"kind":"Field","name":{"kind":"Name","value":"updatedAt"}}]}}]} as unknown as DocumentNode; export const GetOneServerlessFunctionDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"query","name":{"kind":"Name","value":"GetOneServerlessFunction"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"input"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"ServerlessFunctionIdInput"}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"findOneServerlessFunction"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"input"},"value":{"kind":"Variable","name":{"kind":"Name","value":"input"}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"FragmentSpread","name":{"kind":"Name","value":"ServerlessFunctionFields"}}]}}]}},{"kind":"FragmentDefinition","name":{"kind":"Name","value":"ServerlessFunctionFields"},"typeCondition":{"kind":"NamedType","name":{"kind":"Name","value":"ServerlessFunction"}},"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"id"}},{"kind":"Field","name":{"kind":"Name","value":"name"}},{"kind":"Field","name":{"kind":"Name","value":"description"}},{"kind":"Field","name":{"kind":"Name","value":"runtime"}},{"kind":"Field","name":{"kind":"Name","value":"syncStatus"}},{"kind":"Field","name":{"kind":"Name","value":"latestVersion"}},{"kind":"Field","name":{"kind":"Name","value":"latestVersionInputSchema"}},{"kind":"Field","name":{"kind":"Name","value":"publishedVersions"}},{"kind":"Field","name":{"kind":"Name","value":"createdAt"}},{"kind":"Field","name":{"kind":"Name","value":"updatedAt"}}]}}]} as unknown as DocumentNode; -export const FindOneServerlessFunctionSourceCodeDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"query","name":{"kind":"Name","value":"FindOneServerlessFunctionSourceCode"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"input"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"GetServerlessFunctionSourceCodeInput"}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"getServerlessFunctionSourceCode"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"input"},"value":{"kind":"Variable","name":{"kind":"Name","value":"input"}}}]}]}}]} as unknown as DocumentNode; \ No newline at end of file +export const FindOneServerlessFunctionSourceCodeDocument = {"kind":"Document","definitions":[{"kind":"OperationDefinition","operation":"query","name":{"kind":"Name","value":"FindOneServerlessFunctionSourceCode"},"variableDefinitions":[{"kind":"VariableDefinition","variable":{"kind":"Variable","name":{"kind":"Name","value":"input"}},"type":{"kind":"NonNullType","type":{"kind":"NamedType","name":{"kind":"Name","value":"GetServerlessFunctionSourceCodeInput"}}}}],"selectionSet":{"kind":"SelectionSet","selections":[{"kind":"Field","name":{"kind":"Name","value":"getServerlessFunctionSourceCode"},"arguments":[{"kind":"Argument","name":{"kind":"Name","value":"input"},"value":{"kind":"Variable","name":{"kind":"Name","value":"input"}}}]}]}}]} as unknown as DocumentNode; diff --git a/packages/twenty-front/src/generated/graphql.tsx b/packages/twenty-front/src/generated/graphql.tsx index ef7b67fbc..798f5582e 100644 --- a/packages/twenty-front/src/generated/graphql.tsx +++ b/packages/twenty-front/src/generated/graphql.tsx @@ -1279,6 +1279,8 @@ export type UpdateServerlessFunctionInput = { export type UpdateWorkflowVersionStepInput = { /** Step to update in JSON format */ step: Scalars['JSON']; + /** Boolean to check if we need to update stepOutput */ + shouldUpdateStepOutput?: InputMaybe; /** Workflow version ID */ workflowVersionId: Scalars['String']; }; @@ -4416,4 +4418,4 @@ export function useGetWorkspaceFromInviteHashLazyQuery(baseOptions?: Apollo.Lazy } export type GetWorkspaceFromInviteHashQueryHookResult = ReturnType; export type GetWorkspaceFromInviteHashLazyQueryHookResult = ReturnType; -export type GetWorkspaceFromInviteHashQueryResult = Apollo.QueryResult; \ No newline at end of file +export type GetWorkspaceFromInviteHashQueryResult = Apollo.QueryResult; diff --git a/packages/twenty-front/src/modules/settings/serverless-functions/hooks/useUpdateOneServerlessFunction.ts b/packages/twenty-front/src/modules/settings/serverless-functions/hooks/useUpdateOneServerlessFunction.ts index 61c0a52c4..f13936083 100644 --- a/packages/twenty-front/src/modules/settings/serverless-functions/hooks/useUpdateOneServerlessFunction.ts +++ b/packages/twenty-front/src/modules/settings/serverless-functions/hooks/useUpdateOneServerlessFunction.ts @@ -1,6 +1,7 @@ import { useApolloMetadataClient } from '@/object-metadata/hooks/useApolloMetadataClient'; import { UPDATE_ONE_SERVERLESS_FUNCTION } from '@/settings/serverless-functions/graphql/mutations/updateOneServerlessFunction'; import { FIND_MANY_SERVERLESS_FUNCTIONS } from '@/settings/serverless-functions/graphql/queries/findManyServerlessFunctions'; +import { FIND_ONE_SERVERLESS_FUNCTION_SOURCE_CODE } from '@/settings/serverless-functions/graphql/queries/findOneServerlessFunctionSourceCode'; import { useMutation } from '@apollo/client'; import { getOperationName } from '@apollo/client/utilities'; import { @@ -26,7 +27,10 @@ export const useUpdateOneServerlessFunction = () => { input, }, awaitRefetchQueries: true, - refetchQueries: [getOperationName(FIND_MANY_SERVERLESS_FUNCTIONS) ?? ''], + refetchQueries: [ + getOperationName(FIND_MANY_SERVERLESS_FUNCTIONS) ?? '', + getOperationName(FIND_ONE_SERVERLESS_FUNCTION_SOURCE_CODE) ?? '', + ], }); }; diff --git a/packages/twenty-front/src/modules/workflow/components/RecordShowPageWorkflowHeader.tsx b/packages/twenty-front/src/modules/workflow/components/RecordShowPageWorkflowHeader.tsx index d6b9f263a..c9c601e85 100644 --- a/packages/twenty-front/src/modules/workflow/components/RecordShowPageWorkflowHeader.tsx +++ b/packages/twenty-front/src/modules/workflow/components/RecordShowPageWorkflowHeader.tsx @@ -15,7 +15,6 @@ import { IconTrash, isDefined, } from 'twenty-ui'; -import { capitalize } from '~/utils/string/capitalize'; import { assertWorkflowWithCurrentVersionIsDefined } from '../utils/assertWorkflowWithCurrentVersionIsDefined'; export const RecordShowPageWorkflowHeader = ({ @@ -73,17 +72,6 @@ export const RecordShowPageWorkflowHeader = ({ workflowVersionId: workflowWithCurrentVersion.currentVersion.id, workflowName: workflowWithCurrentVersion.name, }); - - enqueueSnackBar('', { - variant: SnackBarVariant.Success, - title: `${capitalize(workflowWithCurrentVersion.name)} starting...`, - icon: ( - - ), - }); }} /> diff --git a/packages/twenty-front/src/modules/workflow/hooks/useUpdateStep.ts b/packages/twenty-front/src/modules/workflow/hooks/useUpdateStep.ts index f78409e12..994ff1ade 100644 --- a/packages/twenty-front/src/modules/workflow/hooks/useUpdateStep.ts +++ b/packages/twenty-front/src/modules/workflow/hooks/useUpdateStep.ts @@ -14,7 +14,10 @@ export const useUpdateStep = ({ const { getUpdatableWorkflowVersion } = useGetUpdatableWorkflowVersion(); const { updateWorkflowVersionStep } = useUpdateWorkflowVersionStep(); - const updateStep = async (updatedStep: T) => { + const updateStep = async ( + updatedStep: T, + shouldUpdateStepOutput = true, + ) => { if (!isDefined(workflow.currentVersion)) { throw new Error('Can not update an undefined workflow version.'); } @@ -23,6 +26,7 @@ export const useUpdateStep = ({ await updateWorkflowVersionStep({ workflowVersionId: workflowVersion.id, step: updatedStep, + shouldUpdateStepOutput, }); }; diff --git a/packages/twenty-front/src/modules/workflow/workflow-actions/components/WorkflowEditActionFormServerlessFunction.tsx b/packages/twenty-front/src/modules/workflow/workflow-actions/components/WorkflowEditActionFormServerlessFunction.tsx index 13c78c283..265ae6980 100644 --- a/packages/twenty-front/src/modules/workflow/workflow-actions/components/WorkflowEditActionFormServerlessFunction.tsx +++ b/packages/twenty-front/src/modules/workflow/workflow-actions/components/WorkflowEditActionFormServerlessFunction.tsx @@ -5,7 +5,7 @@ import { mergeDefaultFunctionInputAndFunctionInput } from '@/workflow/utils/merg import { setNestedValue } from '@/workflow/utils/setNestedValue'; import { useTheme } from '@emotion/react'; import styled from '@emotion/styled'; -import { Fragment, ReactNode, useState } from 'react'; +import { Fragment, ReactNode, useEffect, useState } from 'react'; import { CodeEditor, HorizontalSeparator, @@ -63,7 +63,10 @@ type WorkflowEditActionFormServerlessFunctionProps = { } | { readonly?: false; - onActionUpdate: (action: WorkflowCodeAction) => void; + onActionUpdate: ( + action: WorkflowCodeAction, + shouldUpdateStepOutput?: boolean, + ) => void; }; }; @@ -88,9 +91,16 @@ export const WorkflowEditActionFormServerlessFunction = ({ id: serverlessFunctionId, }); + const [functionInput, setFunctionInput] = + useState( + action.settings.input.serverlessFunctionInput, + ); + const { formValues, setFormValues, loading } = useServerlessFunctionUpdateFormState(serverlessFunctionId); + const headerTitle = action.name || 'Code - Serverless Function'; + const save = async () => { try { await updateOneServerlessFunction({ @@ -112,6 +122,9 @@ export const WorkflowEditActionFormServerlessFunction = ({ const handleSave = usePreventOverlapCallback(save, 1000); const onCodeChange = async (value: string) => { + if (actionOptions.readonly === true) { + return; + } setFormValues((prevState) => ({ ...prevState, code: { ...prevState.code, [INDEX_FILE_PATH]: value }, @@ -121,6 +134,9 @@ export const WorkflowEditActionFormServerlessFunction = ({ }; const updateFunctionInputSchema = async () => { + if (actionOptions.readonly === true) { + return; + } const sourceCode = formValues.code?.[INDEX_FILE_PATH]; if (!isDefined(sourceCode)) { return; @@ -141,27 +157,25 @@ export const WorkflowEditActionFormServerlessFunction = ({ 100, ); - const [functionInput, setFunctionInput] = - useState( - action.settings.input.serverlessFunctionInput, - ); - const updateFunctionInput = useDebouncedCallback( - async (newFunctionInput: object) => { + async (newFunctionInput: object, shouldUpdateStepOutput = true) => { if (actionOptions.readonly === true) { return; } - actionOptions.onActionUpdate({ - ...action, - settings: { - ...action.settings, - input: { - ...action.settings.input, - serverlessFunctionInput: newFunctionInput, + actionOptions.onActionUpdate( + { + ...action, + settings: { + ...action.settings, + input: { + ...action.settings.input, + serverlessFunctionInput: newFunctionInput, + }, }, }, - }); + shouldUpdateStepOutput, + ); }, 1_000, ); @@ -171,7 +185,7 @@ export const WorkflowEditActionFormServerlessFunction = ({ setFunctionInput(updatedFunctionInput); - await updateFunctionInput(updatedFunctionInput); + await updateFunctionInput(updatedFunctionInput, false); }; const renderFields = ( @@ -230,10 +244,6 @@ export const WorkflowEditActionFormServerlessFunction = ({ }); }; - const headerTitle = isDefined(action.name) - ? action.name - : 'Code - Serverless Function'; - const handleEditorDidMount = async ( editor: editor.IStandaloneCodeEditor, monaco: Monaco, @@ -252,10 +262,13 @@ export const WorkflowEditActionFormServerlessFunction = ({ return; } - actionOptions?.onActionUpdate({ - ...action, - ...actionUpdate, - }); + actionOptions?.onActionUpdate( + { + ...action, + ...actionUpdate, + }, + false, + ); }; const checkWorkflowUpdatable = async () => { @@ -265,6 +278,10 @@ export const WorkflowEditActionFormServerlessFunction = ({ await getUpdatableWorkflowVersion(workflow); }; + useEffect(() => { + setFunctionInput(action.settings.input.serverlessFunctionInput); + }, [action]); + return ( !loading && ( {renderFields(functionInput)} diff --git a/packages/twenty-server/src/engine/core-modules/serverless/drivers/lambda.driver.ts b/packages/twenty-server/src/engine/core-modules/serverless/drivers/lambda.driver.ts index 5364a30d3..23af22fec 100644 --- a/packages/twenty-server/src/engine/core-modules/serverless/drivers/lambda.driver.ts +++ b/packages/twenty-server/src/engine/core-modules/serverless/drivers/lambda.driver.ts @@ -319,6 +319,7 @@ export class LambdaDriver implements ServerlessDriver { await this.waitFunctionUpdates(functionToExecute.id, 10); const startTime = Date.now(); + const params: InvokeCommandInput = { FunctionName: functionName, Payload: JSON.stringify(payload), diff --git a/packages/twenty-server/src/engine/core-modules/workflow/dtos/update-workflow-version-step-input.dto.ts b/packages/twenty-server/src/engine/core-modules/workflow/dtos/update-workflow-version-step-input.dto.ts index af8fb778f..0ffc5a66c 100644 --- a/packages/twenty-server/src/engine/core-modules/workflow/dtos/update-workflow-version-step-input.dto.ts +++ b/packages/twenty-server/src/engine/core-modules/workflow/dtos/update-workflow-version-step-input.dto.ts @@ -17,4 +17,11 @@ export class UpdateWorkflowVersionStepInput { nullable: false, }) step: WorkflowAction; + + @Field(() => Boolean, { + description: 'Boolean to check if we need to update stepOutput', + nullable: true, + defaultValue: true, + }) + shouldUpdateStepOutput: boolean; } diff --git a/packages/twenty-server/src/engine/core-modules/workflow/resolvers/workflow-version-step.resolver.ts b/packages/twenty-server/src/engine/core-modules/workflow/resolvers/workflow-version-step.resolver.ts index d37407983..1002f87e7 100644 --- a/packages/twenty-server/src/engine/core-modules/workflow/resolvers/workflow-version-step.resolver.ts +++ b/packages/twenty-server/src/engine/core-modules/workflow/resolvers/workflow-version-step.resolver.ts @@ -36,12 +36,18 @@ export class WorkflowVersionStepResolver { @Mutation(() => WorkflowActionDTO) async updateWorkflowVersionStep( @AuthWorkspace() { id: workspaceId }: Workspace, - @Args('input') { step, workflowVersionId }: UpdateWorkflowVersionStepInput, + @Args('input') + { + step, + workflowVersionId, + shouldUpdateStepOutput, + }: UpdateWorkflowVersionStepInput, ): Promise { return this.workflowVersionStepWorkspaceService.updateWorkflowVersionStep({ workspaceId, workflowVersionId, step, + shouldUpdateStepOutput, }); } diff --git a/packages/twenty-server/src/engine/workspace-event-emitter/workspace-event-emitter.ts b/packages/twenty-server/src/engine/workspace-event-emitter/workspace-event-emitter.ts index 669aeccbd..f8d83794c 100644 --- a/packages/twenty-server/src/engine/workspace-event-emitter/workspace-event-emitter.ts +++ b/packages/twenty-server/src/engine/workspace-event-emitter/workspace-event-emitter.ts @@ -45,9 +45,9 @@ export class WorkspaceEventEmitter { }); } - public emitCustomBatchEvent( + public emitCustomBatchEvent( eventName: CustomEventName, - events: object[], + events: T[], workspaceId: string, ) { if (!events.length) { diff --git a/packages/twenty-server/src/modules/workflow/common/workspace-services/workflow-version-step.workspace-service.ts b/packages/twenty-server/src/modules/workflow/common/workspace-services/workflow-version-step.workspace-service.ts index 2f5511525..6d3d21d82 100644 --- a/packages/twenty-server/src/modules/workflow/common/workspace-services/workflow-version-step.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/common/workspace-services/workflow-version-step.workspace-service.ts @@ -245,10 +245,12 @@ export class WorkflowVersionStepWorkspaceService { workspaceId, workflowVersionId, step, + shouldUpdateStepOutput, }: { workspaceId: string; workflowVersionId: string; step: WorkflowAction; + shouldUpdateStepOutput: boolean; }): Promise { const workflowVersionRepository = await this.twentyORMManager.getRepository( @@ -275,10 +277,12 @@ export class WorkflowVersionStepWorkspaceService { ); } - const enrichedNewStep = await this.enrichOutputSchema({ - step, - workspaceId, - }); + const enrichedNewStep = shouldUpdateStepOutput + ? await this.enrichOutputSchema({ + step, + workspaceId, + }) + : step; const updatedSteps = workflowVersion.steps.map((existingStep) => { if (existingStep.id === step.id) { diff --git a/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-builder.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-builder.workspace-service.ts index 0b213deb0..fdaec14b0 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-builder.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-builder/workflow-builder.workspace-service.ts @@ -226,6 +226,7 @@ export class WorkflowBuilderWorkspaceService { const inputSchema = codeIntrospectionService.getFunctionInputSchema(sourceCode); + const fakeFunctionInput = codeIntrospectionService.generateInputData(inputSchema); @@ -233,7 +234,7 @@ export class WorkflowBuilderWorkspaceService { await serverlessFunctionService.executeOneServerlessFunction( serverlessFunctionId, workspaceId, - fakeFunctionInput, + Object.values(fakeFunctionInput)?.[0] || {}, serverlessFunctionVersion, ); diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/code/code.workflow-action.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/code/code.workflow-action.ts index af4081f52..cd9421477 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/code/code.workflow-action.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workflow-actions/code/code.workflow-action.ts @@ -35,7 +35,8 @@ export class CodeWorkflowAction implements WorkflowAction { await this.serverlessFunctionService.executeOneServerlessFunction( workflowActionInput.serverlessFunctionId, workspaceId, - workflowActionInput.serverlessFunctionInput, + Object.values(workflowActionInput.serverlessFunctionInput)?.[0] || {}, + workflowActionInput.serverlessFunctionVersion, ); if (result.error) { diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/jobs/__tests__/workflow-statuses-update.job.spec.ts b/packages/twenty-server/src/modules/workflow/workflow-status/jobs/__tests__/workflow-statuses-update.job.spec.ts index bb370fa85..5db63d40c 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-status/jobs/__tests__/workflow-statuses-update.job.spec.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-status/jobs/__tests__/workflow-statuses-update.job.spec.ts @@ -8,6 +8,7 @@ import { WorkflowVersionBatchEvent, WorkflowVersionEventType, } from 'src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job'; +import { ServerlessFunctionService } from 'src/engine/metadata-modules/serverless-function/serverless-function.service'; describe('WorkflowStatusesUpdate', () => { let job: WorkflowStatusesUpdateJob; @@ -21,6 +22,10 @@ describe('WorkflowStatusesUpdate', () => { getRepository: jest.fn().mockResolvedValue(mockWorkflowRepository), }; + const mockServerlessFunctionService = { + publishOneServerlessFunction: jest.fn(), + }; + beforeEach(async () => { const module: TestingModule = await Test.createTestingModule({ providers: [ @@ -29,6 +34,10 @@ describe('WorkflowStatusesUpdate', () => { provide: TwentyORMManager, useValue: mockTwentyORMManager, }, + { + provide: ServerlessFunctionService, + useValue: mockServerlessFunctionService, + }, ], }).compile(); @@ -94,6 +103,7 @@ describe('WorkflowStatusesUpdate', () => { statusUpdates: [ { workflowId: '1', + workflowVersionId: '1', previousStatus: WorkflowVersionStatus.ACTIVE, newStatus: WorkflowVersionStatus.ACTIVE, }, @@ -108,7 +118,7 @@ describe('WorkflowStatusesUpdate', () => { await job.handle(event); - expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1); + expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(2); expect(mockWorkflowRepository.update).toHaveBeenCalledTimes(0); }); @@ -119,6 +129,7 @@ describe('WorkflowStatusesUpdate', () => { statusUpdates: [ { workflowId: '1', + workflowVersionId: '1', previousStatus: WorkflowVersionStatus.ACTIVE, newStatus: WorkflowVersionStatus.DRAFT, }, @@ -133,7 +144,7 @@ describe('WorkflowStatusesUpdate', () => { await job.handle(event); - expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1); + expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(2); expect(mockWorkflowRepository.update).toHaveBeenCalledTimes(0); }); @@ -144,6 +155,7 @@ describe('WorkflowStatusesUpdate', () => { statusUpdates: [ { workflowId: '1', + workflowVersionId: '1', previousStatus: WorkflowVersionStatus.DEACTIVATED, newStatus: WorkflowVersionStatus.ACTIVE, }, @@ -158,7 +170,7 @@ describe('WorkflowStatusesUpdate', () => { await job.handle(event); - expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1); + expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(2); expect(mockWorkflowRepository.update).toHaveBeenCalledWith( { id: '1' }, { statuses: [WorkflowStatus.ACTIVE] }, @@ -172,6 +184,7 @@ describe('WorkflowStatusesUpdate', () => { statusUpdates: [ { workflowId: '1', + workflowVersionId: '1', previousStatus: WorkflowVersionStatus.ACTIVE, newStatus: WorkflowVersionStatus.DEACTIVATED, }, @@ -186,7 +199,7 @@ describe('WorkflowStatusesUpdate', () => { await job.handle(event); - expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1); + expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(2); expect(mockWorkflowRepository.update).toHaveBeenCalledWith( { id: '1' }, { statuses: [WorkflowStatus.DEACTIVATED] }, @@ -200,6 +213,7 @@ describe('WorkflowStatusesUpdate', () => { statusUpdates: [ { workflowId: '1', + workflowVersionId: '1', previousStatus: WorkflowVersionStatus.DRAFT, newStatus: WorkflowVersionStatus.ACTIVE, }, @@ -214,7 +228,7 @@ describe('WorkflowStatusesUpdate', () => { await job.handle(event); - expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1); + expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(2); expect(mockWorkflowRepository.update).toHaveBeenCalledWith( { id: '1' }, { statuses: [WorkflowStatus.ACTIVE] }, diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job.ts b/packages/twenty-server/src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job.ts index 4f4ce5886..365b3e16b 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job.ts @@ -4,11 +4,21 @@ import { Process } from 'src/engine/core-modules/message-queue/decorators/proces import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; -import { WorkflowVersionStatus } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; +import { + WorkflowVersionStatus, + WorkflowVersionWorkspaceEntity, +} from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; import { getStatusCombinationFromArray } from 'src/modules/workflow/workflow-status/utils/get-status-combination-from-array.util'; import { getStatusCombinationFromUpdate } from 'src/modules/workflow/workflow-status/utils/get-status-combination-from-update.util'; import { getWorkflowStatusesFromCombination } from 'src/modules/workflow/workflow-status/utils/get-statuses-from-combination.util'; +import { ServerlessFunctionService } from 'src/engine/metadata-modules/serverless-function/serverless-function.service'; +import { + WorkflowAction, + WorkflowActionType, +} from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type'; +import { isDefined } from 'src/utils/is-defined'; +import { WorkspaceRepository } from 'src/engine/twenty-orm/repository/workspace.repository'; export enum WorkflowVersionEventType { CREATE = 'CREATE', @@ -32,6 +42,7 @@ export type WorkflowVersionBatchCreateEvent = { export type WorkflowVersionStatusUpdate = { workflowId: string; + workflowVersionId: string; previousStatus: WorkflowVersionStatus; newStatus: WorkflowVersionStatus; }; @@ -48,7 +59,10 @@ export type WorkflowVersionBatchDelete = { @Processor({ queueName: MessageQueue.workflowQueue, scope: Scope.REQUEST }) export class WorkflowStatusesUpdateJob { - constructor(private readonly twentyORMManager: TwentyORMManager) {} + constructor( + private readonly twentyORMManager: TwentyORMManager, + private readonly serverlessFunctionService: ServerlessFunctionService, + ) {} @Process(WorkflowStatusesUpdateJob.name) async handle(event: WorkflowVersionBatchEvent): Promise { @@ -63,7 +77,10 @@ export class WorkflowStatusesUpdateJob { case WorkflowVersionEventType.STATUS_UPDATE: await Promise.all( event.statusUpdates.map((statusUpdate) => - this.handleWorkflowVersionStatusUpdated(statusUpdate), + this.handleWorkflowVersionStatusUpdated( + statusUpdate, + event.workspaceId, + ), ), ); break; @@ -119,20 +136,92 @@ export class WorkflowStatusesUpdateJob { ); } + private async handlePublishServerlessFunction({ + statusUpdate, + workspaceId, + workflowVersion, + workflowVersionRepository, + }: { + statusUpdate: WorkflowVersionStatusUpdate; + workspaceId: string; + workflowVersion: WorkflowVersionWorkspaceEntity; + workflowVersionRepository: WorkspaceRepository; + }) { + const shouldComputeNewSteps = + statusUpdate.newStatus === WorkflowVersionStatus.ACTIVE && + isDefined(workflowVersion.steps) && + workflowVersion.steps.filter( + (step) => step.type === WorkflowActionType.CODE, + ).length > 0; + + if (shouldComputeNewSteps) { + const newSteps: WorkflowAction[] = []; + + for (const step of workflowVersion.steps || []) { + const newStep = { ...step }; + + if (step.type === WorkflowActionType.CODE) { + let serverlessFunction; + + try { + serverlessFunction = + await this.serverlessFunctionService.publishOneServerlessFunction( + step.settings.input.serverlessFunctionId, + workspaceId, + ); + } catch (e) { + serverlessFunction = null; + } + + if (serverlessFunction) { + const newStepSettings = { ...step.settings }; + + newStepSettings.input.serverlessFunctionVersion = + serverlessFunction.latestVersion; + + newStep.settings = newStepSettings; + } + } + newSteps.push(newStep); + } + + await workflowVersionRepository.update(statusUpdate.workflowVersionId, { + steps: newSteps, + }); + } + } + private async handleWorkflowVersionStatusUpdated( statusUpdate: WorkflowVersionStatusUpdate, + workspaceId: string, ): Promise { const workflowRepository = await this.twentyORMManager.getRepository( 'workflow', ); + const workflowVersionRepository = + await this.twentyORMManager.getRepository( + 'workflowVersion', + ); + const workflow = await workflowRepository.findOneOrFail({ where: { id: statusUpdate.workflowId, }, }); + const workflowVersion = await workflowVersionRepository.findOneOrFail({ + where: { id: statusUpdate.workflowVersionId }, + }); + + await this.handlePublishServerlessFunction({ + workflowVersion, + workflowVersionRepository, + workspaceId, + statusUpdate, + }); + const currentWorkflowStatusCombination = getStatusCombinationFromArray( workflow.statuses || [], ); diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts index 57c69f530..f7a4a841e 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts @@ -2,8 +2,10 @@ import { Module } from '@nestjs/common'; import { WorkflowStatusesUpdateJob } from 'src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job'; import { WorkflowVersionStatusListener } from 'src/modules/workflow/workflow-status/listeners/workflow-version-status.listener'; +import { ServerlessFunctionModule } from 'src/engine/metadata-modules/serverless-function/serverless-function.module'; @Module({ + imports: [ServerlessFunctionModule], providers: [WorkflowStatusesUpdateJob, WorkflowVersionStatusListener], }) export class WorkflowStatusModule {} diff --git a/packages/twenty-server/src/modules/workflow/workflow-trigger/workspace-services/workflow-trigger.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-trigger/workspace-services/workflow-trigger.workspace-service.ts index 7f19f639c..41ef133b9 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-trigger/workspace-services/workflow-trigger.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-trigger/workspace-services/workflow-trigger.workspace-service.ts @@ -28,6 +28,7 @@ import { assertNever } from 'src/utils/assert'; import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action'; import { WORKFLOW_VERSION_STATUS_UPDATED } from 'src/modules/workflow/workflow-status/constants/workflow-version-status-updated.constants'; import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity'; +import { WorkflowVersionStatusUpdate } from 'src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job'; @Injectable() export class WorkflowTriggerWorkspaceService { @@ -387,11 +388,12 @@ export class WorkflowTriggerWorkspaceService { workspaceId, }); - this.workspaceEventEmitter.emitCustomBatchEvent( + this.workspaceEventEmitter.emitCustomBatchEvent( WORKFLOW_VERSION_STATUS_UPDATED, [ { workflowId: workflowVersion.workflowId, + workflowVersionId: workflowVersion.id, previousStatus: workflowVersion.status, newStatus, },