diff --git a/packages/twenty-server/src/engine/integrations/message-queue/message-queue.explorer.ts b/packages/twenty-server/src/engine/integrations/message-queue/message-queue.explorer.ts index 05f859917..30e73f6bb 100644 --- a/packages/twenty-server/src/engine/integrations/message-queue/message-queue.explorer.ts +++ b/packages/twenty-server/src/engine/integrations/message-queue/message-queue.explorer.ts @@ -25,7 +25,6 @@ import { MessageQueueMetadataAccessor } from './message-queue-metadata.accessor' interface ProcessorGroup { instance: object; host: Module; - processorName: string; processMethodNames: string[]; isRequestScoped: boolean; } @@ -78,7 +77,6 @@ export class MessageQueueExplorer implements OnModuleInit { (acc, wrapper) => { const { instance, metatype } = wrapper; const methodNames = this.metadataScanner.getAllMethodNames(instance); - const processorName = wrapper.name; const { queueName } = this.metadataAccessor.getProcessorMetadata( instance.constructor || metatype, @@ -111,7 +109,6 @@ export class MessageQueueExplorer implements OnModuleInit { acc[queueName].push({ instance, host: wrapper.host, - processorName, processMethodNames, isRequestScoped: !wrapper.isDependencyTreeStatic(), }); @@ -140,11 +137,7 @@ export class MessageQueueExplorer implements OnModuleInit { ) { queue.work(async (job) => { for (const processorGroup of processorGroupCollection) { - const { processorName } = processorGroup; - - if (job.name === processorName) { - await this.handleProcessor(processorGroup, job); - } + await this.handleProcessor(processorGroup, job); } }, options); } @@ -153,16 +146,21 @@ export class MessageQueueExplorer implements OnModuleInit { { instance, host, processMethodNames, isRequestScoped }: ProcessorGroup, job: MessageQueueJob, ) { - const processMetadataCollection = new Map( - processMethodNames.map((name) => { + const filteredProcessMethodNames = processMethodNames.filter( + (processMethodName) => { const metadata = this.metadataAccessor.getProcessMetadata( - instance[name], + instance[processMethodName], ); - return [name, metadata]; - }), + return metadata && job.name === metadata.jobName; + }, ); + // Return early if no matching methods found + if (filteredProcessMethodNames.length === 0) { + return; + } + if (isRequestScoped) { const contextId = createContextId(); @@ -187,29 +185,31 @@ export class MessageQueueExplorer implements OnModuleInit { await this.invokeProcessMethods( contextInstance, - processMetadataCollection, + filteredProcessMethodNames, job, ); } else { - await this.invokeProcessMethods(instance, processMetadataCollection, job); + await this.invokeProcessMethods( + instance, + filteredProcessMethodNames, + job, + ); } } private async invokeProcessMethods( instance: object, - processMetadataCollection: Map, + processMethodNames: string[], job: MessageQueueJob, ) { - for (const [methodName, metadata] of processMetadataCollection) { - if (job.name === metadata?.jobName) { - try { - await instance[methodName].call(instance, job.data); - } catch (err) { - if (!shouldFilterException(err)) { - this.exceptionHandlerService.captureExceptions([err]); - } - throw err; + for (const processMethodName of processMethodNames) { + try { + await instance[processMethodName].call(instance, job.data); + } catch (err) { + if (!shouldFilterException(err)) { + this.exceptionHandlerService.captureExceptions([err]); } + throw err; } } } diff --git a/packages/twenty-server/src/engine/twenty-orm/factories/scoped-workspace-datasource.factory.ts b/packages/twenty-server/src/engine/twenty-orm/factories/scoped-workspace-datasource.factory.ts index e040f6ddc..096be2194 100644 --- a/packages/twenty-server/src/engine/twenty-orm/factories/scoped-workspace-datasource.factory.ts +++ b/packages/twenty-server/src/engine/twenty-orm/factories/scoped-workspace-datasource.factory.ts @@ -1,4 +1,4 @@ -import { Inject, Injectable, Scope } from '@nestjs/common'; +import { Inject, Injectable, Optional, Scope } from '@nestjs/common'; import { REQUEST } from '@nestjs/core'; import { EntitySchema } from 'typeorm'; @@ -8,7 +8,9 @@ import { WorkspaceDatasourceFactory } from 'src/engine/twenty-orm/factories/work @Injectable({ scope: Scope.REQUEST }) export class ScopedWorkspaceDatasourceFactory { constructor( - @Inject(REQUEST) private readonly request: Request, + @Optional() + @Inject(REQUEST) + private readonly request: Request | null, private readonly workspaceDataSourceFactory: WorkspaceDatasourceFactory, ) {} diff --git a/packages/twenty-server/src/engine/twenty-orm/twenty-orm.manager.ts b/packages/twenty-server/src/engine/twenty-orm/twenty-orm.manager.ts index c2e12ea45..b4fc8e292 100644 --- a/packages/twenty-server/src/engine/twenty-orm/twenty-orm.manager.ts +++ b/packages/twenty-server/src/engine/twenty-orm/twenty-orm.manager.ts @@ -1,9 +1,8 @@ -import { Injectable, Type } from '@nestjs/common'; +import { Injectable, Optional, Type } from '@nestjs/common'; import { ObjectLiteral } from 'typeorm'; import { EntitySchemaFactory } from 'src/engine/twenty-orm/factories/entity-schema.factory'; -import { InjectWorkspaceDatasource } from 'src/engine/twenty-orm/decorators/inject-workspace-datasource.decorator'; import { WorkspaceDataSource } from 'src/engine/twenty-orm/datasource/workspace.datasource'; import { WorkspaceRepository } from 'src/engine/twenty-orm/repository/workspace.repository'; import { WorkspaceDatasourceFactory } from 'src/engine/twenty-orm/factories/workspace-datasource.factory'; @@ -12,8 +11,8 @@ import { ObjectLiteralStorage } from 'src/engine/twenty-orm/storage/object-liter @Injectable() export class TwentyORMManager { constructor( - @InjectWorkspaceDatasource() - private readonly workspaceDataSource: WorkspaceDataSource, + @Optional() + private readonly workspaceDataSource: WorkspaceDataSource | null, private readonly entitySchemaFactory: EntitySchemaFactory, private readonly workspaceDataSourceFactory: WorkspaceDatasourceFactory, ) {} @@ -23,6 +22,10 @@ export class TwentyORMManager { ): WorkspaceRepository { const entitySchema = this.entitySchemaFactory.create(entityClass); + if (!this.workspaceDataSource) { + throw new Error('Workspace data source not found'); + } + return this.workspaceDataSource.getRepository(entitySchema); }