diff --git a/packages/twenty-server/src/metadata/data-source/data-source.service.ts b/packages/twenty-server/src/metadata/data-source/data-source.service.ts index 3b644f11f..226b5a9ca 100644 --- a/packages/twenty-server/src/metadata/data-source/data-source.service.ts +++ b/packages/twenty-server/src/metadata/data-source/data-source.service.ts @@ -12,7 +12,10 @@ export class DataSourceService { private readonly dataSourceMetadataRepository: Repository, ) {} - async createDataSourceMetadata(workspaceId: string, workspaceSchema: string) { + async createDataSourceMetadata( + workspaceId: string, + workspaceSchema: string, + ): Promise { // TODO: Double check if this is the correct way to do this const dataSource = await this.dataSourceMetadataRepository.findOne({ where: { workspaceId }, @@ -30,25 +33,29 @@ export class DataSourceService { async getManyDataSourceMetadata( options: FindManyOptions = {}, - ) { + ): Promise { return this.dataSourceMetadataRepository.find(options); } - async getDataSourcesMetadataFromWorkspaceId(workspaceId: string) { + async getDataSourcesMetadataFromWorkspaceId( + workspaceId: string, + ): Promise { return this.dataSourceMetadataRepository.find({ where: { workspaceId }, order: { createdAt: 'DESC' }, }); } - async getLastDataSourceMetadataFromWorkspaceIdOrFail(workspaceId: string) { + async getLastDataSourceMetadataFromWorkspaceIdOrFail( + workspaceId: string, + ): Promise { return this.dataSourceMetadataRepository.findOneOrFail({ where: { workspaceId }, order: { createdAt: 'DESC' }, }); } - async delete(workspaceId: string) { + async delete(workspaceId: string): Promise { await this.dataSourceMetadataRepository.delete({ workspaceId }); } } diff --git a/packages/twenty-server/src/workspace/messaging/repositories/message/message.service.ts b/packages/twenty-server/src/workspace/messaging/repositories/message/message.service.ts index 7fb2a6694..45002250c 100644 --- a/packages/twenty-server/src/workspace/messaging/repositories/message/message.service.ts +++ b/packages/twenty-server/src/workspace/messaging/repositories/message/message.service.ts @@ -124,54 +124,62 @@ export class MessageService { ): Promise> { const messageExternalIdsAndIdsMap = new Map(); - for (const message of messages) { - await workspaceDataSource?.transaction(async (manager: EntityManager) => { - const existingMessageChannelMessageAssociationsCount = - await this.messageChannelMessageAssociationService.countByMessageExternalIdsAndMessageChannelId( - [message.externalId], - gmailMessageChannelId, - workspaceId, - manager, - ); + try { + for (const message of messages) { + await workspaceDataSource?.transaction( + async (manager: EntityManager) => { + const existingMessageChannelMessageAssociationsCount = + await this.messageChannelMessageAssociationService.countByMessageExternalIdsAndMessageChannelId( + [message.externalId], + gmailMessageChannelId, + workspaceId, + manager, + ); - if (existingMessageChannelMessageAssociationsCount > 0) { - return; - } + if (existingMessageChannelMessageAssociationsCount > 0) { + return; + } - const savedOrExistingMessageThreadId = - await this.messageThreadService.saveMessageThreadOrReturnExistingMessageThread( - message.messageThreadExternalId, - dataSourceMetadata, - workspaceId, - manager, - ); + const savedOrExistingMessageThreadId = + await this.messageThreadService.saveMessageThreadOrReturnExistingMessageThread( + message.messageThreadExternalId, + dataSourceMetadata, + workspaceId, + manager, + ); - const savedOrExistingMessageId = - await this.saveMessageOrReturnExistingMessage( - message, - savedOrExistingMessageThreadId, - connectedAccount, - dataSourceMetadata, - workspaceId, - manager, - ); + const savedOrExistingMessageId = + await this.saveMessageOrReturnExistingMessage( + message, + savedOrExistingMessageThreadId, + connectedAccount, + dataSourceMetadata, + workspaceId, + manager, + ); - messageExternalIdsAndIdsMap.set( - message.externalId, - savedOrExistingMessageId, + messageExternalIdsAndIdsMap.set( + message.externalId, + savedOrExistingMessageId, + ); + + await manager.query( + `INSERT INTO ${dataSourceMetadata.schema}."messageChannelMessageAssociation" ("messageChannelId", "messageId", "messageExternalId", "messageThreadId", "messageThreadExternalId") VALUES ($1, $2, $3, $4, $5)`, + [ + gmailMessageChannelId, + savedOrExistingMessageId, + message.externalId, + savedOrExistingMessageThreadId, + message.messageThreadExternalId, + ], + ); + }, ); - - await manager.query( - `INSERT INTO ${dataSourceMetadata.schema}."messageChannelMessageAssociation" ("messageChannelId", "messageId", "messageExternalId", "messageThreadId", "messageThreadExternalId") VALUES ($1, $2, $3, $4, $5)`, - [ - gmailMessageChannelId, - savedOrExistingMessageId, - message.externalId, - savedOrExistingMessageThreadId, - message.messageThreadExternalId, - ], - ); - }); + } + } catch (error) { + throw new Error( + `Error saving connected account ${connectedAccount.id} messages to workspace ${workspaceId}: ${error.message}`, + ); } return messageExternalIdsAndIdsMap; diff --git a/packages/twenty-server/src/workspace/workspace-datasource/workspace-datasource.service.ts b/packages/twenty-server/src/workspace/workspace-datasource/workspace-datasource.service.ts index 350ec645a..aad5315ad 100644 --- a/packages/twenty-server/src/workspace/workspace-datasource/workspace-datasource.service.ts +++ b/packages/twenty-server/src/workspace/workspace-datasource/workspace-datasource.service.ts @@ -4,6 +4,7 @@ import { DataSource, EntityManager } from 'typeorm'; import { DataSourceService } from 'src/metadata/data-source/data-source.service'; import { TypeORMService } from 'src/database/typeorm/typeorm.service'; +import { DataSourceEntity } from 'src/metadata/data-source/data-source.entity'; @Injectable() export class WorkspaceDataSourceService { @@ -30,7 +31,7 @@ export class WorkspaceDataSourceService { public async connectedToWorkspaceDataSourceAndReturnMetadata( workspaceId: string, - ) { + ): Promise<{ dataSource: DataSource; dataSourceMetadata: DataSourceEntity }> { const dataSourceMetadata = await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail( workspaceId, @@ -114,16 +115,18 @@ export class WorkspaceDataSourceService { workspaceId: string, transactionManager?: EntityManager, ): Promise { - if (transactionManager) { - return await transactionManager.query(query, parameters); - } - const workspaceDataSource = - await this.connectToWorkspaceDataSource(workspaceId); + try { + if (transactionManager) { + return await transactionManager.query(query, parameters); + } + const workspaceDataSource = + await this.connectToWorkspaceDataSource(workspaceId); - if (workspaceDataSource) { return await workspaceDataSource.query(query, parameters); + } catch (error) { + throw new Error( + `Error executing raw query for workspace ${workspaceId}: ${error.message}`, + ); } - - throw new Error('No data source found or transaction manager provided'); } }