8725 workflow avoid serverless function autosave errors (#8916)

See issue #8725 
- Build function asynchronously using a job
- prevent useless builds
- run promises simultaneously

Todo:
- fix outputSchema computing
This commit is contained in:
martmull
2024-12-06 11:13:12 +01:00
committed by GitHub
parent a8867fd090
commit 2b3b073570
9 changed files with 129 additions and 51 deletions

View File

@ -1,6 +1,5 @@
import { useApolloMetadataClient } from '@/object-metadata/hooks/useApolloMetadataClient'; import { useApolloMetadataClient } from '@/object-metadata/hooks/useApolloMetadataClient';
import { UPDATE_ONE_SERVERLESS_FUNCTION } from '@/settings/serverless-functions/graphql/mutations/updateOneServerlessFunction'; import { UPDATE_ONE_SERVERLESS_FUNCTION } from '@/settings/serverless-functions/graphql/mutations/updateOneServerlessFunction';
import { FIND_MANY_SERVERLESS_FUNCTIONS } from '@/settings/serverless-functions/graphql/queries/findManyServerlessFunctions';
import { FIND_ONE_SERVERLESS_FUNCTION_SOURCE_CODE } from '@/settings/serverless-functions/graphql/queries/findOneServerlessFunctionSourceCode'; import { FIND_ONE_SERVERLESS_FUNCTION_SOURCE_CODE } from '@/settings/serverless-functions/graphql/queries/findOneServerlessFunctionSourceCode';
import { useMutation } from '@apollo/client'; import { useMutation } from '@apollo/client';
import { getOperationName } from '@apollo/client/utilities'; import { getOperationName } from '@apollo/client/utilities';
@ -28,7 +27,6 @@ export const useUpdateOneServerlessFunction = () => {
}, },
awaitRefetchQueries: true, awaitRefetchQueries: true,
refetchQueries: [ refetchQueries: [
getOperationName(FIND_MANY_SERVERLESS_FUNCTIONS) ?? '',
getOperationName(FIND_ONE_SERVERLESS_FUNCTION_SOURCE_CODE) ?? '', getOperationName(FIND_ONE_SERVERLESS_FUNCTION_SOURCE_CODE) ?? '',
], ],
}); });

View File

@ -2,6 +2,7 @@ import { OnModuleDestroy } from '@nestjs/common';
import { JobsOptions, Queue, QueueOptions, Worker } from 'bullmq'; import { JobsOptions, Queue, QueueOptions, Worker } from 'bullmq';
import omitBy from 'lodash.omitby'; import omitBy from 'lodash.omitby';
import { v4 } from 'uuid';
import { import {
QueueCronJobOptions, QueueCronJobOptions,
@ -15,6 +16,8 @@ import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queu
export type BullMQDriverOptions = QueueOptions; export type BullMQDriverOptions = QueueOptions;
const V4_LENGTH = 36;
export class BullMQDriver implements MessageQueueDriver, OnModuleDestroy { export class BullMQDriver implements MessageQueueDriver, OnModuleDestroy {
private queueMap: Record<MessageQueue, Queue> = {} as Record< private queueMap: Record<MessageQueue, Queue> = {} as Record<
MessageQueue, MessageQueue,
@ -107,8 +110,22 @@ export class BullMQDriver implements MessageQueueDriver, OnModuleDestroy {
`Queue ${queueName} is not registered, make sure you have added it as a queue provider`, `Queue ${queueName} is not registered, make sure you have added it as a queue provider`,
); );
} }
// This ensures only one waiting job can be queued for a specific option.id
if (options?.id) {
const waitingJobs = await this.queueMap[queueName].getJobs(['waiting']);
const isJobAlreadyWaiting = waitingJobs.some(
(job) => job.id?.slice(0, -(V4_LENGTH + 1)) === options.id,
);
if (isJobAlreadyWaiting) {
return;
}
}
const queueOptions: JobsOptions = { const queueOptions: JobsOptions = {
jobId: options?.id, jobId: options?.id ? `${options.id}-${v4()}` : undefined, // We add V4() to id to make sure ids are uniques so we can add a waiting job when a job related with the same option.id is running
priority: options?.priority, priority: options?.priority,
attempts: 1 + (options?.retryLimit || 0), attempts: 1 + (options?.retryLimit || 0),
removeOnComplete: 100, removeOnComplete: 100,

View File

@ -98,6 +98,7 @@ export class PgBossDriver
? { ? {
...options, ...options,
singletonKey: options?.id, singletonKey: options?.id,
useSingletonQueue: true, // When used with singletonKey, ensures only one job can be queued. See https://logsnag.com/blog/deep-dive-into-background-jobs-with-pg-boss-and-typescript
} }
: {}, : {},
); );

View File

@ -54,6 +54,8 @@ import { compileTypescript } from 'src/engine/core-modules/serverless/drivers/ut
import { ENV_FILE_NAME } from 'src/engine/core-modules/serverless/drivers/constants/env-file-name'; import { ENV_FILE_NAME } from 'src/engine/core-modules/serverless/drivers/constants/env-file-name';
import { OUTDIR_FOLDER } from 'src/engine/core-modules/serverless/drivers/constants/outdir-folder'; import { OUTDIR_FOLDER } from 'src/engine/core-modules/serverless/drivers/constants/outdir-folder';
const UPDATE_FUNCTION_DURATION_TIMEOUT_IN_SECONDS = 30;
export interface LambdaDriverOptions extends LambdaClientConfig { export interface LambdaDriverOptions extends LambdaClientConfig {
fileStorageService: FileStorageService; fileStorageService: FileStorageService;
region: string; region: string;
@ -75,7 +77,7 @@ export class LambdaDriver implements ServerlessDriver {
private async waitFunctionUpdates( private async waitFunctionUpdates(
serverlessFunctionId: string, serverlessFunctionId: string,
maxWaitTime: number, maxWaitTime: number = UPDATE_FUNCTION_DURATION_TIMEOUT_IN_SECONDS,
) { ) {
const waitParams = { FunctionName: serverlessFunctionId }; const waitParams = { FunctionName: serverlessFunctionId };
@ -263,12 +265,12 @@ export class LambdaDriver implements ServerlessDriver {
updateConfigurationParams, updateConfigurationParams,
); );
await this.waitFunctionUpdates(serverlessFunction.id, 10); await this.waitFunctionUpdates(serverlessFunction.id);
await this.lambdaClient.send(updateConfigurationCommand); await this.lambdaClient.send(updateConfigurationCommand);
} }
await this.waitFunctionUpdates(serverlessFunction.id, 10); await this.waitFunctionUpdates(serverlessFunction.id);
} }
async publish(serverlessFunction: ServerlessFunctionEntity) { async publish(serverlessFunction: ServerlessFunctionEntity) {
@ -316,7 +318,9 @@ export class LambdaDriver implements ServerlessDriver {
? functionToExecute.id ? functionToExecute.id
: `${functionToExecute.id}:${computedVersion}`; : `${functionToExecute.id}:${computedVersion}`;
await this.waitFunctionUpdates(functionToExecute.id, 10); if (version === 'draft') {
await this.waitFunctionUpdates(functionToExecute.id);
}
const startTime = Date.now(); const startTime = Date.now();

View File

@ -12,14 +12,10 @@ export const getLayerDependencies = async (
layerVersion: number | 'latest', layerVersion: number | 'latest',
): Promise<LayerDependencies> => { ): Promise<LayerDependencies> => {
const lastVersionLayerDirName = getLayerDependenciesDirName(layerVersion); const lastVersionLayerDirName = getLayerDependenciesDirName(layerVersion);
const packageJson = await fs.readFile( const [packageJson, yarnLock] = await Promise.all([
join(lastVersionLayerDirName, 'package.json'), fs.readFile(join(lastVersionLayerDirName, 'package.json'), 'utf8'),
'utf8', fs.readFile(join(lastVersionLayerDirName, 'yarn.lock'), 'utf8'),
); ]);
const yarnLock = await fs.readFile(
join(lastVersionLayerDirName, 'yarn.lock'),
'utf8',
);
return { packageJson: JSON.parse(packageJson), yarnLock }; return { packageJson: JSON.parse(packageJson), yarnLock };
}; };

View File

@ -0,0 +1,52 @@
import { Scope } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
import { ServerlessService } from 'src/engine/core-modules/serverless/serverless.service';
import { ServerlessFunctionEntity } from 'src/engine/metadata-modules/serverless-function/serverless-function.entity';
import { isDefined } from 'src/utils/is-defined';
export type BuildServerlessFunctionBatchEvent = {
serverlessFunctions: {
serverlessFunctionId: string;
serverlessFunctionVersion: string;
}[];
workspaceId: string;
};
@Processor({
queueName: MessageQueue.serverlessFunctionQueue,
scope: Scope.REQUEST,
})
export class BuildServerlessFunctionJob {
constructor(
@InjectRepository(ServerlessFunctionEntity, 'metadata')
private readonly serverlessFunctionRepository: Repository<ServerlessFunctionEntity>,
private readonly serverlessService: ServerlessService,
) {}
@Process(BuildServerlessFunctionJob.name)
async handle(batchEvent: BuildServerlessFunctionBatchEvent): Promise<void> {
for (const {
serverlessFunctionId,
serverlessFunctionVersion,
} of batchEvent.serverlessFunctions) {
const serverlessFunction =
await this.serverlessFunctionRepository.findOneBy({
id: serverlessFunctionId,
workspaceId: batchEvent.workspaceId,
});
if (isDefined(serverlessFunction)) {
await this.serverlessService.build(
serverlessFunction,
serverlessFunctionVersion,
);
}
}
}
}

View File

@ -1,33 +0,0 @@
import { Scope } from '@nestjs/common';
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { ServerlessFunctionService } from 'src/engine/metadata-modules/serverless-function/serverless-function.service';
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
export type DeleteServerlessFunctionBatchEvent = {
ids: string[];
workspaceId: string;
};
@Processor({
queueName: MessageQueue.serverlessFunctionQueue,
scope: Scope.REQUEST,
})
export class DeleteServerlessFunctionJob {
constructor(
private readonly serverlessFunctionService: ServerlessFunctionService,
) {}
@Process(DeleteServerlessFunctionJob.name)
async handle(batchEvent: DeleteServerlessFunctionBatchEvent): Promise<void> {
await Promise.all(
batchEvent.ids.map((id) =>
this.serverlessFunctionService.deleteOneServerlessFunction(
id,
batchEvent.workspaceId,
),
),
);
}
}

View File

@ -11,6 +11,7 @@ import { ThrottlerModule } from 'src/engine/core-modules/throttler/throttler.mod
import { ServerlessFunctionEntity } from 'src/engine/metadata-modules/serverless-function/serverless-function.entity'; import { ServerlessFunctionEntity } from 'src/engine/metadata-modules/serverless-function/serverless-function.entity';
import { ServerlessFunctionResolver } from 'src/engine/metadata-modules/serverless-function/serverless-function.resolver'; import { ServerlessFunctionResolver } from 'src/engine/metadata-modules/serverless-function/serverless-function.resolver';
import { ServerlessFunctionService } from 'src/engine/metadata-modules/serverless-function/serverless-function.service'; import { ServerlessFunctionService } from 'src/engine/metadata-modules/serverless-function/serverless-function.service';
import { BuildServerlessFunctionJob } from 'src/engine/metadata-modules/serverless-function/jobs/build-serverless-function.job';
@Module({ @Module({
imports: [ imports: [
@ -21,7 +22,11 @@ import { ServerlessFunctionService } from 'src/engine/metadata-modules/serverles
ThrottlerModule, ThrottlerModule,
AnalyticsModule, AnalyticsModule,
], ],
providers: [ServerlessFunctionService, ServerlessFunctionResolver], providers: [
ServerlessFunctionService,
ServerlessFunctionResolver,
BuildServerlessFunctionJob,
],
exports: [ServerlessFunctionService], exports: [ServerlessFunctionService],
}) })
export class ServerlessFunctionModule {} export class ServerlessFunctionModule {}

View File

@ -32,6 +32,13 @@ import {
} from 'src/engine/metadata-modules/serverless-function/serverless-function.exception'; } from 'src/engine/metadata-modules/serverless-function/serverless-function.exception';
import { isDefined } from 'src/utils/is-defined'; import { isDefined } from 'src/utils/is-defined';
import { getLayerDependencies } from 'src/engine/core-modules/serverless/drivers/utils/get-last-layer-dependencies'; import { getLayerDependencies } from 'src/engine/core-modules/serverless/drivers/utils/get-last-layer-dependencies';
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import {
BuildServerlessFunctionBatchEvent,
BuildServerlessFunctionJob,
} from 'src/engine/metadata-modules/serverless-function/jobs/build-serverless-function.job';
@Injectable() @Injectable()
export class ServerlessFunctionService { export class ServerlessFunctionService {
@ -43,6 +50,8 @@ export class ServerlessFunctionService {
private readonly throttlerService: ThrottlerService, private readonly throttlerService: ThrottlerService,
private readonly environmentService: EnvironmentService, private readonly environmentService: EnvironmentService,
private readonly analyticsService: AnalyticsService, private readonly analyticsService: AnalyticsService,
@InjectMessageQueue(MessageQueue.serverlessFunctionQueue)
private readonly messageQueueService: MessageQueueService,
) {} ) {}
async findManyServerlessFunctions(where) { async findManyServerlessFunctions(where) {
@ -263,7 +272,11 @@ export class ServerlessFunctionService {
}); });
} }
await this.serverlessService.build(existingServerlessFunction, 'draft'); await this.buildServerlessFunction({
serverlessFunctionId: existingServerlessFunction.id,
serverlessFunctionVersion: 'draft',
workspaceId,
});
await this.serverlessFunctionRepository.update( await this.serverlessFunctionRepository.update(
existingServerlessFunction.id, existingServerlessFunction.id,
{ {
@ -330,7 +343,11 @@ export class ServerlessFunctionService {
}); });
} }
await this.serverlessService.build(createdServerlessFunction, 'draft'); await this.buildServerlessFunction({
serverlessFunctionId: createdServerlessFunction.id,
serverlessFunctionVersion: 'draft',
workspaceId,
});
return this.serverlessFunctionRepository.findOneBy({ return this.serverlessFunctionRepository.findOneBy({
id: createdServerlessFunction.id, id: createdServerlessFunction.id,
@ -351,4 +368,25 @@ export class ServerlessFunctionService {
); );
} }
} }
private async buildServerlessFunction({
serverlessFunctionId,
serverlessFunctionVersion,
workspaceId,
}: {
serverlessFunctionId: string;
serverlessFunctionVersion: string;
workspaceId: string;
}) {
await this.messageQueueService.add<BuildServerlessFunctionBatchEvent>(
BuildServerlessFunctionJob.name,
{
serverlessFunctions: [
{ serverlessFunctionId, serverlessFunctionVersion },
],
workspaceId,
},
{ id: `${serverlessFunctionId}-${serverlessFunctionVersion}` },
);
}
} }