6181 workflows create a custom code executor (#6235)

Closes #6181

## Testing
- download Altair graphql dev tool https://altairgraphql.dev/#download
- create a file locally `test.ts` containing:
```
export const handler = async (event: object, context: object) => {
  return { test: 'toto', data: event['data'] };
}
```
- play those requests in Altair:
mutation UpsertFunction($file: Upload!) {
  upsertFunction(name: "toto", file: $file)
}

mutation ExecFunction {
  executeFunction(name:"toto", payload: {data: "titi"})
}
- it will run the local driver, add those env variable to test with
lambda driver
```
CUSTOM_CODE_ENGINE_DRIVER_TYPE=lambda
LAMBDA_REGION=eu-west-2
LAMBDA_ROLE=<ASK_ME>
```
This commit is contained in:
martmull
2024-07-17 17:53:01 +02:00
committed by GitHub
parent e6f6069fe7
commit 47ddc7be83
37 changed files with 2382 additions and 16 deletions

View File

@ -20,6 +20,7 @@ import { NodeEnvironment } from 'src/engine/integrations/environment/interfaces/
import { LLMChatModelDriver } from 'src/engine/integrations/llm-chat-model/interfaces/llm-chat-model.interface';
import { LLMTracingDriver } from 'src/engine/integrations/llm-tracing/interfaces/llm-tracing.interface';
import { ServerlessDriverType } from 'src/engine/integrations/serverless/serverless.interface';
import { assert } from 'src/utils/assert';
import { CastToStringArray } from 'src/engine/integrations/environment/decorators/cast-to-string-array.decorator';
import { ExceptionHandlerDriver } from 'src/engine/integrations/exception-handler/interfaces';
@ -204,6 +205,30 @@ export class EnvironmentVariables {
@ValidateIf((env) => env.AUTH_GOOGLE_ENABLED)
AUTH_GOOGLE_CALLBACK_URL: string;
// Custom Code Engine
@IsEnum(ServerlessDriverType)
@IsOptional()
SERVERLESS_TYPE: ServerlessDriverType = ServerlessDriverType.Local;
@ValidateIf((env) => env.SERVERLESS_TYPE === ServerlessDriverType.Lambda)
@IsAWSRegion()
SERVERLESS_LAMBDA_REGION: AwsRegion;
@ValidateIf((env) => env.SERVERLESS_TYPE === ServerlessDriverType.Lambda)
@IsString()
@IsOptional()
SERVERLESS_LAMBDA_ROLE: string;
@ValidateIf((env) => env.SERVERLESS_TYPE === ServerlessDriverType.Lambda)
@IsString()
@IsOptional()
SERVERLESS_LAMBDA_ACCESS_KEY_ID: string;
@ValidateIf((env) => env.SERVERLESS_TYPE === ServerlessDriverType.Lambda)
@IsString()
@IsOptional()
SERVERLESS_LAMBDA_SECRET_ACCESS_KEY: string;
// Storage
@IsEnum(StorageDriverType)
@IsOptional()

View File

@ -0,0 +1,11 @@
import { Readable } from 'stream';
export const readFileContent = async (stream: Readable): Promise<string> => {
const chunks: Buffer[] = [];
for await (const chunk of stream) {
chunks.push(Buffer.from(chunk));
}
return Buffer.concat(chunks).toString('utf8');
};

View File

@ -16,6 +16,10 @@ import { LLMChatModelModule } from 'src/engine/integrations/llm-chat-model/llm-c
import { llmChatModelModuleFactory } from 'src/engine/integrations/llm-chat-model/llm-chat-model.module-factory';
import { LLMTracingModule } from 'src/engine/integrations/llm-tracing/llm-tracing.module';
import { llmTracingModuleFactory } from 'src/engine/integrations/llm-tracing/llm-tracing.module-factory';
import { ServerlessModule } from 'src/engine/integrations/serverless/serverless.module';
import { serverlessModuleFactory } from 'src/engine/integrations/serverless/serverless-module.factory';
import { FileStorageService } from 'src/engine/integrations/file-storage/file-storage.service';
import { BuildDirectoryManagerService } from 'src/engine/integrations/serverless/drivers/services/build-directory-manager.service';
import { EnvironmentModule } from './environment/environment.module';
import { EnvironmentService } from './environment/environment.service';
@ -62,6 +66,14 @@ import { MessageQueueModule } from './message-queue/message-queue.module';
useFactory: llmTracingModuleFactory,
inject: [EnvironmentService],
}),
ServerlessModule.forRootAsync({
useFactory: serverlessModuleFactory,
inject: [
EnvironmentService,
FileStorageService,
BuildDirectoryManagerService,
],
}),
],
exports: [],
providers: [],

View File

@ -0,0 +1,33 @@
import { join } from 'path';
import { FileFolder } from 'src/engine/core-modules/file/interfaces/file-folder.interface';
import { ServerlessFunctionEntity } from 'src/engine/metadata-modules/serverless-function/serverless-function.entity';
import { SOURCE_FILE_NAME } from 'src/engine/integrations/serverless/drivers/constants/source-file-name';
import { readFileContent } from 'src/engine/integrations/file-storage/utils/read-file-content';
import { compileTypescript } from 'src/engine/integrations/serverless/drivers/utils/compile-typescript';
import { FileStorageService } from 'src/engine/integrations/file-storage/file-storage.service';
export class BaseServerlessDriver {
getFolderPath(serverlessFunction: ServerlessFunctionEntity) {
return join(
FileFolder.ServerlessFunction,
serverlessFunction.workspaceId,
serverlessFunction.id,
);
}
async getCompiledCode(
serverlessFunction: ServerlessFunctionEntity,
fileStorageService: FileStorageService,
) {
const folderPath = this.getFolderPath(serverlessFunction);
const fileStream = await fileStorageService.read({
folderPath,
filename: SOURCE_FILE_NAME,
});
const typescriptCode = await readFileContent(fileStream);
return compileTypescript(typescriptCode);
}
}

View File

@ -0,0 +1 @@
export const BUILD_FILE_NAME = 'build.js';

View File

@ -0,0 +1 @@
export const SOURCE_FILE_NAME = 'source.ts';

View File

@ -0,0 +1,9 @@
import { ServerlessFunctionEntity } from 'src/engine/metadata-modules/serverless-function/serverless-function.entity';
export interface ServerlessDriver {
build(serverlessFunction: ServerlessFunctionEntity): Promise<void>;
execute(
serverlessFunction: ServerlessFunctionEntity,
payload: object | undefined,
): Promise<object>;
}

View File

@ -0,0 +1,100 @@
import fs from 'fs';
import {
CreateFunctionCommand,
Lambda,
LambdaClientConfig,
InvokeCommand,
} from '@aws-sdk/client-lambda';
import { CreateFunctionCommandInput } from '@aws-sdk/client-lambda/dist-types/commands/CreateFunctionCommand';
import { ServerlessDriver } from 'src/engine/integrations/serverless/drivers/interfaces/serverless-driver.interface';
import { createZipFile } from 'src/engine/integrations/serverless/drivers/utils/create-zip-file';
import { ServerlessFunctionEntity } from 'src/engine/metadata-modules/serverless-function/serverless-function.entity';
import { FileStorageService } from 'src/engine/integrations/file-storage/file-storage.service';
import { BaseServerlessDriver } from 'src/engine/integrations/serverless/drivers/base-serverless.driver';
import { BuildDirectoryManagerService } from 'src/engine/integrations/serverless/drivers/services/build-directory-manager.service';
export interface LambdaDriverOptions extends LambdaClientConfig {
fileStorageService: FileStorageService;
buildDirectoryManagerService: BuildDirectoryManagerService;
region: string;
role: string;
}
export class LambdaDriver
extends BaseServerlessDriver
implements ServerlessDriver
{
private readonly lambdaClient: Lambda;
private readonly lambdaRole: string;
private readonly fileStorageService: FileStorageService;
private readonly buildDirectoryManagerService: BuildDirectoryManagerService;
constructor(options: LambdaDriverOptions) {
super();
const { region, role, ...lambdaOptions } = options;
this.lambdaClient = new Lambda({ ...lambdaOptions, region });
this.lambdaRole = role;
this.fileStorageService = options.fileStorageService;
this.buildDirectoryManagerService = options.buildDirectoryManagerService;
}
async build(serverlessFunction: ServerlessFunctionEntity) {
const javascriptCode = await this.getCompiledCode(
serverlessFunction,
this.fileStorageService,
);
const {
sourceTemporaryDir,
lambdaZipPath,
javascriptFilePath,
lambdaHandler,
} = await this.buildDirectoryManagerService.init();
await fs.promises.writeFile(javascriptFilePath, javascriptCode);
await createZipFile(sourceTemporaryDir, lambdaZipPath);
const params: CreateFunctionCommandInput = {
Code: {
ZipFile: await fs.promises.readFile(lambdaZipPath),
},
FunctionName: serverlessFunction.id,
Handler: lambdaHandler,
Role: this.lambdaRole,
Runtime: 'nodejs18.x',
Description: 'Lambda function to run user script',
Timeout: 900,
};
const command = new CreateFunctionCommand(params);
await this.lambdaClient.send(command);
await this.buildDirectoryManagerService.clean();
}
async execute(
functionToExecute: ServerlessFunctionEntity,
payload: object | undefined = undefined,
): Promise<object> {
const params = {
FunctionName: functionToExecute.id,
Payload: JSON.stringify(payload),
};
const command = new InvokeCommand(params);
const result = await this.lambdaClient.send(command);
if (!result.Payload) {
return {};
}
return JSON.parse(result.Payload.transformToString());
}
}

View File

@ -0,0 +1,94 @@
import { join } from 'path';
import { tmpdir } from 'os';
import { promises as fs } from 'fs';
import { fork } from 'child_process';
import { v4 } from 'uuid';
import { ServerlessDriver } from 'src/engine/integrations/serverless/drivers/interfaces/serverless-driver.interface';
import { FileStorageService } from 'src/engine/integrations/file-storage/file-storage.service';
import { readFileContent } from 'src/engine/integrations/file-storage/utils/read-file-content';
import { ServerlessFunctionEntity } from 'src/engine/metadata-modules/serverless-function/serverless-function.entity';
import { BUILD_FILE_NAME } from 'src/engine/integrations/serverless/drivers/constants/build-file-name';
import { BaseServerlessDriver } from 'src/engine/integrations/serverless/drivers/base-serverless.driver';
export interface LocalDriverOptions {
fileStorageService: FileStorageService;
}
export class LocalDriver
extends BaseServerlessDriver
implements ServerlessDriver
{
private readonly fileStorageService: FileStorageService;
constructor(options: LocalDriverOptions) {
super();
this.fileStorageService = options.fileStorageService;
}
async build(serverlessFunction: ServerlessFunctionEntity) {
const javascriptCode = await this.getCompiledCode(
serverlessFunction,
this.fileStorageService,
);
await this.fileStorageService.write({
file: javascriptCode,
name: BUILD_FILE_NAME,
mimeType: undefined,
folder: this.getFolderPath(serverlessFunction),
});
}
async execute(
serverlessFunction: ServerlessFunctionEntity,
payload: object | undefined = undefined,
): Promise<object> {
const fileStream = await this.fileStorageService.read({
folderPath: this.getFolderPath(serverlessFunction),
filename: BUILD_FILE_NAME,
});
const fileContent = await readFileContent(fileStream);
const tmpFilePath = join(tmpdir(), `${v4()}.js`);
const modifiedContent = `
process.on('message', async (message) => {
const { event, context } = message;
const result = await handler(event, context);
process.send(result);
});
${fileContent}
`;
await fs.writeFile(tmpFilePath, modifiedContent);
return await new Promise((resolve, reject) => {
const child = fork(tmpFilePath);
child.on('message', (message: object) => {
resolve(message);
child.kill();
fs.unlink(tmpFilePath);
});
child.on('error', (error) => {
reject(error);
child.kill();
fs.unlink(tmpFilePath);
});
child.on('exit', (code) => {
if (code && code !== 0) {
reject(new Error(`Child process exited with code ${code}`));
fs.unlink(tmpFilePath);
}
});
child.send({ event: payload });
});
}
}

View File

@ -0,0 +1,48 @@
import { Injectable } from '@nestjs/common';
import { join } from 'path';
import { tmpdir } from 'os';
import fs from 'fs';
import fsExtra from 'fs-extra';
import { v4 } from 'uuid';
const TEMPORARY_LAMBDA_FOLDER = 'twenty-build-lambda-temp-folder';
const TEMPORARY_LAMBDA_SOURCE_FOLDER = 'src';
const LAMBDA_ZIP_FILE_NAME = 'lambda.zip';
const LAMBDA_ENTRY_FILE_NAME = 'index.js';
@Injectable()
export class BuildDirectoryManagerService {
private temporaryDir = join(tmpdir(), `${TEMPORARY_LAMBDA_FOLDER}_${v4()}`);
private lambdaHandler = `${LAMBDA_ENTRY_FILE_NAME.split('.')[0]}.handler`;
async init() {
const sourceTemporaryDir = join(
this.temporaryDir,
TEMPORARY_LAMBDA_SOURCE_FOLDER,
);
const lambdaZipPath = join(this.temporaryDir, LAMBDA_ZIP_FILE_NAME);
const javascriptFilePath = join(sourceTemporaryDir, LAMBDA_ENTRY_FILE_NAME);
if (!fs.existsSync(this.temporaryDir)) {
await fs.promises.mkdir(this.temporaryDir);
await fs.promises.mkdir(sourceTemporaryDir);
} else {
await fsExtra.emptyDir(this.temporaryDir);
await fs.promises.mkdir(sourceTemporaryDir);
}
return {
sourceTemporaryDir,
lambdaZipPath,
javascriptFilePath,
lambdaHandler: this.lambdaHandler,
};
}
async clean() {
await fsExtra.emptyDir(this.temporaryDir);
await fs.promises.rmdir(this.temporaryDir);
}
}

View File

@ -0,0 +1,19 @@
import ts from 'typescript';
export const compileTypescript = (typescriptCode: string): string => {
const options: ts.CompilerOptions = {
module: ts.ModuleKind.CommonJS,
target: ts.ScriptTarget.ES2017,
moduleResolution: ts.ModuleResolutionKind.Node10,
esModuleInterop: true,
resolveJsonModule: true,
allowSyntheticDefaultImports: true,
types: ['node'],
};
const result = ts.transpileModule(typescriptCode, {
compilerOptions: options,
});
return result.outputText;
};

View File

@ -0,0 +1,21 @@
import fs from 'fs';
import { pipeline } from 'stream/promises';
import archiver from 'archiver';
export const createZipFile = async (
sourceDir: string,
outPath: string,
): Promise<void> => {
const output = fs.createWriteStream(outPath);
const archive = archiver('zip', {
zlib: { level: 9 }, // Compression level
});
const p = pipeline(archive, output);
archive.directory(sourceDir, false);
archive.finalize();
return p;
};

View File

@ -0,0 +1,59 @@
import { fromNodeProviderChain } from '@aws-sdk/credential-providers';
import {
ServerlessModuleOptions,
ServerlessDriverType,
} from 'src/engine/integrations/serverless/serverless.interface';
import { EnvironmentService } from 'src/engine/integrations/environment/environment.service';
import { FileStorageService } from 'src/engine/integrations/file-storage/file-storage.service';
import { BuildDirectoryManagerService } from 'src/engine/integrations/serverless/drivers/services/build-directory-manager.service';
export const serverlessModuleFactory = async (
environmentService: EnvironmentService,
fileStorageService: FileStorageService,
buildDirectoryManagerService: BuildDirectoryManagerService,
): Promise<ServerlessModuleOptions> => {
const driverType = environmentService.get('SERVERLESS_TYPE');
const options = { fileStorageService };
switch (driverType) {
case ServerlessDriverType.Local: {
return {
type: ServerlessDriverType.Local,
options,
};
}
case ServerlessDriverType.Lambda: {
const region = environmentService.get('SERVERLESS_LAMBDA_REGION');
const accessKeyId = environmentService.get(
'SERVERLESS_LAMBDA_ACCESS_KEY_ID',
);
const secretAccessKey = environmentService.get(
'SERVERLESS_LAMBDA_SECRET_ACCESS_KEY',
);
const role = environmentService.get('SERVERLESS_LAMBDA_ROLE');
return {
type: ServerlessDriverType.Lambda,
options: {
...options,
buildDirectoryManagerService,
credentials: accessKeyId
? {
accessKeyId,
secretAccessKey,
}
: fromNodeProviderChain({
clientConfig: { region },
}),
region: region ?? '',
role: role ?? '',
},
};
}
default:
throw new Error(
`Invalid serverless driver type (${driverType}), check your .env file`,
);
}
};

View File

@ -0,0 +1 @@
export const SERVERLESS_DRIVER = Symbol('SERVERLESS_DRIVER');

View File

@ -0,0 +1,30 @@
import { FactoryProvider, ModuleMetadata } from '@nestjs/common';
import { LocalDriverOptions } from 'src/engine/integrations/serverless/drivers/local.driver';
import { LambdaDriverOptions } from 'src/engine/integrations/serverless/drivers/lambda.driver';
export enum ServerlessDriverType {
Lambda = 'lambda',
Local = 'local',
}
export interface LocalDriverFactoryOptions {
type: ServerlessDriverType.Local;
options: LocalDriverOptions;
}
export interface LambdaDriverFactoryOptions {
type: ServerlessDriverType.Lambda;
options: LambdaDriverOptions;
}
export type ServerlessModuleOptions =
| LocalDriverFactoryOptions
| LambdaDriverFactoryOptions;
export type ServerlessModuleAsyncOptions = {
useFactory: (
...args: any[]
) => ServerlessModuleOptions | Promise<ServerlessModuleOptions>;
} & Pick<ModuleMetadata, 'imports'> &
Pick<FactoryProvider, 'inject'>;

View File

@ -0,0 +1,35 @@
import { DynamicModule, Global } from '@nestjs/common';
import {
ServerlessDriverType,
ServerlessModuleAsyncOptions,
} from 'src/engine/integrations/serverless/serverless.interface';
import { ServerlessService } from 'src/engine/integrations/serverless/serverless.service';
import { SERVERLESS_DRIVER } from 'src/engine/integrations/serverless/serverless.constants';
import { LocalDriver } from 'src/engine/integrations/serverless/drivers/local.driver';
import { LambdaDriver } from 'src/engine/integrations/serverless/drivers/lambda.driver';
import { BuildDirectoryManagerService } from 'src/engine/integrations/serverless/drivers/services/build-directory-manager.service';
@Global()
export class ServerlessModule {
static forRootAsync(options: ServerlessModuleAsyncOptions): DynamicModule {
const provider = {
provide: SERVERLESS_DRIVER,
useFactory: async (...args: any[]) => {
const config = await options.useFactory(...args);
return config?.type === ServerlessDriverType.Local
? new LocalDriver(config.options)
: new LambdaDriver(config.options);
},
inject: options.inject || [],
};
return {
module: ServerlessModule,
imports: options.imports || [],
providers: [ServerlessService, BuildDirectoryManagerService, provider],
exports: [ServerlessService],
};
}
}

View File

@ -0,0 +1,22 @@
import { Inject, Injectable } from '@nestjs/common';
import { ServerlessDriver } from 'src/engine/integrations/serverless/drivers/interfaces/serverless-driver.interface';
import { SERVERLESS_DRIVER } from 'src/engine/integrations/serverless/serverless.constants';
import { ServerlessFunctionEntity } from 'src/engine/metadata-modules/serverless-function/serverless-function.entity';
@Injectable()
export class ServerlessService implements ServerlessDriver {
constructor(@Inject(SERVERLESS_DRIVER) private driver: ServerlessDriver) {}
async build(serverlessFunction: ServerlessFunctionEntity): Promise<void> {
return this.driver.build(serverlessFunction);
}
async execute(
serverlessFunction: ServerlessFunctionEntity,
payload: object | undefined = undefined,
) {
return this.driver.execute(serverlessFunction, payload);
}
}