From 2b3b073570809267a8be9c2651c2516226981ae8 Mon Sep 17 00:00:00 2001 From: martmull Date: Fri, 6 Dec 2024 11:13:12 +0100 Subject: [PATCH] 8725 workflow avoid serverless function autosave errors (#8916) See issue #8725 - Build function asynchronously using a job - prevent useless builds - run promises simultaneously Todo: - fix outputSchema computing --- .../hooks/useUpdateOneServerlessFunction.ts | 2 - .../message-queue/drivers/bullmq.driver.ts | 19 ++++++- .../message-queue/drivers/pg-boss.driver.ts | 1 + .../serverless/drivers/lambda.driver.ts | 12 +++-- .../utils/get-last-layer-dependencies.ts | 12 ++--- .../jobs/build-serverless-function.job.ts | 52 +++++++++++++++++++ .../jobs/delete-serverless-function.job.ts | 33 ------------ .../serverless-function.module.ts | 7 ++- .../serverless-function.service.ts | 42 ++++++++++++++- 9 files changed, 129 insertions(+), 51 deletions(-) create mode 100644 packages/twenty-server/src/engine/metadata-modules/serverless-function/jobs/build-serverless-function.job.ts delete mode 100644 packages/twenty-server/src/engine/metadata-modules/serverless-function/jobs/delete-serverless-function.job.ts diff --git a/packages/twenty-front/src/modules/settings/serverless-functions/hooks/useUpdateOneServerlessFunction.ts b/packages/twenty-front/src/modules/settings/serverless-functions/hooks/useUpdateOneServerlessFunction.ts index f13936083..3de1b4bac 100644 --- a/packages/twenty-front/src/modules/settings/serverless-functions/hooks/useUpdateOneServerlessFunction.ts +++ b/packages/twenty-front/src/modules/settings/serverless-functions/hooks/useUpdateOneServerlessFunction.ts @@ -1,6 +1,5 @@ import { useApolloMetadataClient } from '@/object-metadata/hooks/useApolloMetadataClient'; import { UPDATE_ONE_SERVERLESS_FUNCTION } from '@/settings/serverless-functions/graphql/mutations/updateOneServerlessFunction'; -import { FIND_MANY_SERVERLESS_FUNCTIONS } from '@/settings/serverless-functions/graphql/queries/findManyServerlessFunctions'; import { FIND_ONE_SERVERLESS_FUNCTION_SOURCE_CODE } from '@/settings/serverless-functions/graphql/queries/findOneServerlessFunctionSourceCode'; import { useMutation } from '@apollo/client'; import { getOperationName } from '@apollo/client/utilities'; @@ -28,7 +27,6 @@ export const useUpdateOneServerlessFunction = () => { }, awaitRefetchQueries: true, refetchQueries: [ - getOperationName(FIND_MANY_SERVERLESS_FUNCTIONS) ?? '', getOperationName(FIND_ONE_SERVERLESS_FUNCTION_SOURCE_CODE) ?? '', ], }); diff --git a/packages/twenty-server/src/engine/core-modules/message-queue/drivers/bullmq.driver.ts b/packages/twenty-server/src/engine/core-modules/message-queue/drivers/bullmq.driver.ts index 8525330b5..565ff4828 100644 --- a/packages/twenty-server/src/engine/core-modules/message-queue/drivers/bullmq.driver.ts +++ b/packages/twenty-server/src/engine/core-modules/message-queue/drivers/bullmq.driver.ts @@ -2,6 +2,7 @@ import { OnModuleDestroy } from '@nestjs/common'; import { JobsOptions, Queue, QueueOptions, Worker } from 'bullmq'; import omitBy from 'lodash.omitby'; +import { v4 } from 'uuid'; import { QueueCronJobOptions, @@ -15,6 +16,8 @@ import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queu export type BullMQDriverOptions = QueueOptions; +const V4_LENGTH = 36; + export class BullMQDriver implements MessageQueueDriver, OnModuleDestroy { private queueMap: Record = {} as Record< MessageQueue, @@ -107,8 +110,22 @@ export class BullMQDriver implements MessageQueueDriver, OnModuleDestroy { `Queue ${queueName} is not registered, make sure you have added it as a queue provider`, ); } + + // This ensures only one waiting job can be queued for a specific option.id + if (options?.id) { + const waitingJobs = await this.queueMap[queueName].getJobs(['waiting']); + + const isJobAlreadyWaiting = waitingJobs.some( + (job) => job.id?.slice(0, -(V4_LENGTH + 1)) === options.id, + ); + + if (isJobAlreadyWaiting) { + return; + } + } + const queueOptions: JobsOptions = { - jobId: options?.id, + jobId: options?.id ? `${options.id}-${v4()}` : undefined, // We add V4() to id to make sure ids are uniques so we can add a waiting job when a job related with the same option.id is running priority: options?.priority, attempts: 1 + (options?.retryLimit || 0), removeOnComplete: 100, diff --git a/packages/twenty-server/src/engine/core-modules/message-queue/drivers/pg-boss.driver.ts b/packages/twenty-server/src/engine/core-modules/message-queue/drivers/pg-boss.driver.ts index 341fbc348..1c08c030a 100644 --- a/packages/twenty-server/src/engine/core-modules/message-queue/drivers/pg-boss.driver.ts +++ b/packages/twenty-server/src/engine/core-modules/message-queue/drivers/pg-boss.driver.ts @@ -98,6 +98,7 @@ export class PgBossDriver ? { ...options, singletonKey: options?.id, + useSingletonQueue: true, // When used with singletonKey, ensures only one job can be queued. See https://logsnag.com/blog/deep-dive-into-background-jobs-with-pg-boss-and-typescript } : {}, ); diff --git a/packages/twenty-server/src/engine/core-modules/serverless/drivers/lambda.driver.ts b/packages/twenty-server/src/engine/core-modules/serverless/drivers/lambda.driver.ts index 23af22fec..3823999a5 100644 --- a/packages/twenty-server/src/engine/core-modules/serverless/drivers/lambda.driver.ts +++ b/packages/twenty-server/src/engine/core-modules/serverless/drivers/lambda.driver.ts @@ -54,6 +54,8 @@ import { compileTypescript } from 'src/engine/core-modules/serverless/drivers/ut import { ENV_FILE_NAME } from 'src/engine/core-modules/serverless/drivers/constants/env-file-name'; import { OUTDIR_FOLDER } from 'src/engine/core-modules/serverless/drivers/constants/outdir-folder'; +const UPDATE_FUNCTION_DURATION_TIMEOUT_IN_SECONDS = 30; + export interface LambdaDriverOptions extends LambdaClientConfig { fileStorageService: FileStorageService; region: string; @@ -75,7 +77,7 @@ export class LambdaDriver implements ServerlessDriver { private async waitFunctionUpdates( serverlessFunctionId: string, - maxWaitTime: number, + maxWaitTime: number = UPDATE_FUNCTION_DURATION_TIMEOUT_IN_SECONDS, ) { const waitParams = { FunctionName: serverlessFunctionId }; @@ -263,12 +265,12 @@ export class LambdaDriver implements ServerlessDriver { updateConfigurationParams, ); - await this.waitFunctionUpdates(serverlessFunction.id, 10); + await this.waitFunctionUpdates(serverlessFunction.id); await this.lambdaClient.send(updateConfigurationCommand); } - await this.waitFunctionUpdates(serverlessFunction.id, 10); + await this.waitFunctionUpdates(serverlessFunction.id); } async publish(serverlessFunction: ServerlessFunctionEntity) { @@ -316,7 +318,9 @@ export class LambdaDriver implements ServerlessDriver { ? functionToExecute.id : `${functionToExecute.id}:${computedVersion}`; - await this.waitFunctionUpdates(functionToExecute.id, 10); + if (version === 'draft') { + await this.waitFunctionUpdates(functionToExecute.id); + } const startTime = Date.now(); diff --git a/packages/twenty-server/src/engine/core-modules/serverless/drivers/utils/get-last-layer-dependencies.ts b/packages/twenty-server/src/engine/core-modules/serverless/drivers/utils/get-last-layer-dependencies.ts index 35a72d2fc..bfd6a3503 100644 --- a/packages/twenty-server/src/engine/core-modules/serverless/drivers/utils/get-last-layer-dependencies.ts +++ b/packages/twenty-server/src/engine/core-modules/serverless/drivers/utils/get-last-layer-dependencies.ts @@ -12,14 +12,10 @@ export const getLayerDependencies = async ( layerVersion: number | 'latest', ): Promise => { const lastVersionLayerDirName = getLayerDependenciesDirName(layerVersion); - const packageJson = await fs.readFile( - join(lastVersionLayerDirName, 'package.json'), - 'utf8', - ); - const yarnLock = await fs.readFile( - join(lastVersionLayerDirName, 'yarn.lock'), - 'utf8', - ); + const [packageJson, yarnLock] = await Promise.all([ + fs.readFile(join(lastVersionLayerDirName, 'package.json'), 'utf8'), + fs.readFile(join(lastVersionLayerDirName, 'yarn.lock'), 'utf8'), + ]); return { packageJson: JSON.parse(packageJson), yarnLock }; }; diff --git a/packages/twenty-server/src/engine/metadata-modules/serverless-function/jobs/build-serverless-function.job.ts b/packages/twenty-server/src/engine/metadata-modules/serverless-function/jobs/build-serverless-function.job.ts new file mode 100644 index 000000000..1522a3371 --- /dev/null +++ b/packages/twenty-server/src/engine/metadata-modules/serverless-function/jobs/build-serverless-function.job.ts @@ -0,0 +1,52 @@ +import { Scope } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; + +import { Repository } from 'typeorm'; + +import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; +import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator'; +import { ServerlessService } from 'src/engine/core-modules/serverless/serverless.service'; +import { ServerlessFunctionEntity } from 'src/engine/metadata-modules/serverless-function/serverless-function.entity'; +import { isDefined } from 'src/utils/is-defined'; + +export type BuildServerlessFunctionBatchEvent = { + serverlessFunctions: { + serverlessFunctionId: string; + serverlessFunctionVersion: string; + }[]; + workspaceId: string; +}; + +@Processor({ + queueName: MessageQueue.serverlessFunctionQueue, + scope: Scope.REQUEST, +}) +export class BuildServerlessFunctionJob { + constructor( + @InjectRepository(ServerlessFunctionEntity, 'metadata') + private readonly serverlessFunctionRepository: Repository, + private readonly serverlessService: ServerlessService, + ) {} + + @Process(BuildServerlessFunctionJob.name) + async handle(batchEvent: BuildServerlessFunctionBatchEvent): Promise { + for (const { + serverlessFunctionId, + serverlessFunctionVersion, + } of batchEvent.serverlessFunctions) { + const serverlessFunction = + await this.serverlessFunctionRepository.findOneBy({ + id: serverlessFunctionId, + workspaceId: batchEvent.workspaceId, + }); + + if (isDefined(serverlessFunction)) { + await this.serverlessService.build( + serverlessFunction, + serverlessFunctionVersion, + ); + } + } + } +} diff --git a/packages/twenty-server/src/engine/metadata-modules/serverless-function/jobs/delete-serverless-function.job.ts b/packages/twenty-server/src/engine/metadata-modules/serverless-function/jobs/delete-serverless-function.job.ts deleted file mode 100644 index 2edca6105..000000000 --- a/packages/twenty-server/src/engine/metadata-modules/serverless-function/jobs/delete-serverless-function.job.ts +++ /dev/null @@ -1,33 +0,0 @@ -import { Scope } from '@nestjs/common'; - -import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator'; -import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; -import { ServerlessFunctionService } from 'src/engine/metadata-modules/serverless-function/serverless-function.service'; -import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator'; - -export type DeleteServerlessFunctionBatchEvent = { - ids: string[]; - workspaceId: string; -}; - -@Processor({ - queueName: MessageQueue.serverlessFunctionQueue, - scope: Scope.REQUEST, -}) -export class DeleteServerlessFunctionJob { - constructor( - private readonly serverlessFunctionService: ServerlessFunctionService, - ) {} - - @Process(DeleteServerlessFunctionJob.name) - async handle(batchEvent: DeleteServerlessFunctionBatchEvent): Promise { - await Promise.all( - batchEvent.ids.map((id) => - this.serverlessFunctionService.deleteOneServerlessFunction( - id, - batchEvent.workspaceId, - ), - ), - ); - } -} diff --git a/packages/twenty-server/src/engine/metadata-modules/serverless-function/serverless-function.module.ts b/packages/twenty-server/src/engine/metadata-modules/serverless-function/serverless-function.module.ts index d289f9c08..01a3d1f80 100644 --- a/packages/twenty-server/src/engine/metadata-modules/serverless-function/serverless-function.module.ts +++ b/packages/twenty-server/src/engine/metadata-modules/serverless-function/serverless-function.module.ts @@ -11,6 +11,7 @@ import { ThrottlerModule } from 'src/engine/core-modules/throttler/throttler.mod import { ServerlessFunctionEntity } from 'src/engine/metadata-modules/serverless-function/serverless-function.entity'; import { ServerlessFunctionResolver } from 'src/engine/metadata-modules/serverless-function/serverless-function.resolver'; import { ServerlessFunctionService } from 'src/engine/metadata-modules/serverless-function/serverless-function.service'; +import { BuildServerlessFunctionJob } from 'src/engine/metadata-modules/serverless-function/jobs/build-serverless-function.job'; @Module({ imports: [ @@ -21,7 +22,11 @@ import { ServerlessFunctionService } from 'src/engine/metadata-modules/serverles ThrottlerModule, AnalyticsModule, ], - providers: [ServerlessFunctionService, ServerlessFunctionResolver], + providers: [ + ServerlessFunctionService, + ServerlessFunctionResolver, + BuildServerlessFunctionJob, + ], exports: [ServerlessFunctionService], }) export class ServerlessFunctionModule {} diff --git a/packages/twenty-server/src/engine/metadata-modules/serverless-function/serverless-function.service.ts b/packages/twenty-server/src/engine/metadata-modules/serverless-function/serverless-function.service.ts index c5beda571..016f1100e 100644 --- a/packages/twenty-server/src/engine/metadata-modules/serverless-function/serverless-function.service.ts +++ b/packages/twenty-server/src/engine/metadata-modules/serverless-function/serverless-function.service.ts @@ -32,6 +32,13 @@ import { } from 'src/engine/metadata-modules/serverless-function/serverless-function.exception'; import { isDefined } from 'src/utils/is-defined'; import { getLayerDependencies } from 'src/engine/core-modules/serverless/drivers/utils/get-last-layer-dependencies'; +import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator'; +import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; +import { + BuildServerlessFunctionBatchEvent, + BuildServerlessFunctionJob, +} from 'src/engine/metadata-modules/serverless-function/jobs/build-serverless-function.job'; @Injectable() export class ServerlessFunctionService { @@ -43,6 +50,8 @@ export class ServerlessFunctionService { private readonly throttlerService: ThrottlerService, private readonly environmentService: EnvironmentService, private readonly analyticsService: AnalyticsService, + @InjectMessageQueue(MessageQueue.serverlessFunctionQueue) + private readonly messageQueueService: MessageQueueService, ) {} async findManyServerlessFunctions(where) { @@ -263,7 +272,11 @@ export class ServerlessFunctionService { }); } - await this.serverlessService.build(existingServerlessFunction, 'draft'); + await this.buildServerlessFunction({ + serverlessFunctionId: existingServerlessFunction.id, + serverlessFunctionVersion: 'draft', + workspaceId, + }); await this.serverlessFunctionRepository.update( existingServerlessFunction.id, { @@ -330,7 +343,11 @@ export class ServerlessFunctionService { }); } - await this.serverlessService.build(createdServerlessFunction, 'draft'); + await this.buildServerlessFunction({ + serverlessFunctionId: createdServerlessFunction.id, + serverlessFunctionVersion: 'draft', + workspaceId, + }); return this.serverlessFunctionRepository.findOneBy({ id: createdServerlessFunction.id, @@ -351,4 +368,25 @@ export class ServerlessFunctionService { ); } } + + private async buildServerlessFunction({ + serverlessFunctionId, + serverlessFunctionVersion, + workspaceId, + }: { + serverlessFunctionId: string; + serverlessFunctionVersion: string; + workspaceId: string; + }) { + await this.messageQueueService.add( + BuildServerlessFunctionJob.name, + { + serverlessFunctions: [ + { serverlessFunctionId, serverlessFunctionVersion }, + ], + workspaceId, + }, + { id: `${serverlessFunctionId}-${serverlessFunctionVersion}` }, + ); + } }