[messaging] add more details in exceptions (#4256)

[messaging] add more logs in exceptions
This commit is contained in:
Weiko
2024-03-04 14:05:01 +01:00
committed by GitHub
parent f990b68f0e
commit 3c63584ef8
3 changed files with 75 additions and 57 deletions

View File

@ -12,7 +12,10 @@ export class DataSourceService {
private readonly dataSourceMetadataRepository: Repository<DataSourceEntity>,
) {}
async createDataSourceMetadata(workspaceId: string, workspaceSchema: string) {
async createDataSourceMetadata(
workspaceId: string,
workspaceSchema: string,
): Promise<DataSourceEntity> {
// TODO: Double check if this is the correct way to do this
const dataSource = await this.dataSourceMetadataRepository.findOne({
where: { workspaceId },
@ -30,25 +33,29 @@ export class DataSourceService {
async getManyDataSourceMetadata(
options: FindManyOptions<DataSourceEntity> = {},
) {
): Promise<DataSourceEntity[]> {
return this.dataSourceMetadataRepository.find(options);
}
async getDataSourcesMetadataFromWorkspaceId(workspaceId: string) {
async getDataSourcesMetadataFromWorkspaceId(
workspaceId: string,
): Promise<DataSourceEntity[]> {
return this.dataSourceMetadataRepository.find({
where: { workspaceId },
order: { createdAt: 'DESC' },
});
}
async getLastDataSourceMetadataFromWorkspaceIdOrFail(workspaceId: string) {
async getLastDataSourceMetadataFromWorkspaceIdOrFail(
workspaceId: string,
): Promise<DataSourceEntity> {
return this.dataSourceMetadataRepository.findOneOrFail({
where: { workspaceId },
order: { createdAt: 'DESC' },
});
}
async delete(workspaceId: string) {
async delete(workspaceId: string): Promise<void> {
await this.dataSourceMetadataRepository.delete({ workspaceId });
}
}

View File

@ -124,54 +124,62 @@ export class MessageService {
): Promise<Map<string, string>> {
const messageExternalIdsAndIdsMap = new Map<string, string>();
for (const message of messages) {
await workspaceDataSource?.transaction(async (manager: EntityManager) => {
const existingMessageChannelMessageAssociationsCount =
await this.messageChannelMessageAssociationService.countByMessageExternalIdsAndMessageChannelId(
[message.externalId],
gmailMessageChannelId,
workspaceId,
manager,
);
try {
for (const message of messages) {
await workspaceDataSource?.transaction(
async (manager: EntityManager) => {
const existingMessageChannelMessageAssociationsCount =
await this.messageChannelMessageAssociationService.countByMessageExternalIdsAndMessageChannelId(
[message.externalId],
gmailMessageChannelId,
workspaceId,
manager,
);
if (existingMessageChannelMessageAssociationsCount > 0) {
return;
}
if (existingMessageChannelMessageAssociationsCount > 0) {
return;
}
const savedOrExistingMessageThreadId =
await this.messageThreadService.saveMessageThreadOrReturnExistingMessageThread(
message.messageThreadExternalId,
dataSourceMetadata,
workspaceId,
manager,
);
const savedOrExistingMessageThreadId =
await this.messageThreadService.saveMessageThreadOrReturnExistingMessageThread(
message.messageThreadExternalId,
dataSourceMetadata,
workspaceId,
manager,
);
const savedOrExistingMessageId =
await this.saveMessageOrReturnExistingMessage(
message,
savedOrExistingMessageThreadId,
connectedAccount,
dataSourceMetadata,
workspaceId,
manager,
);
const savedOrExistingMessageId =
await this.saveMessageOrReturnExistingMessage(
message,
savedOrExistingMessageThreadId,
connectedAccount,
dataSourceMetadata,
workspaceId,
manager,
);
messageExternalIdsAndIdsMap.set(
message.externalId,
savedOrExistingMessageId,
messageExternalIdsAndIdsMap.set(
message.externalId,
savedOrExistingMessageId,
);
await manager.query(
`INSERT INTO ${dataSourceMetadata.schema}."messageChannelMessageAssociation" ("messageChannelId", "messageId", "messageExternalId", "messageThreadId", "messageThreadExternalId") VALUES ($1, $2, $3, $4, $5)`,
[
gmailMessageChannelId,
savedOrExistingMessageId,
message.externalId,
savedOrExistingMessageThreadId,
message.messageThreadExternalId,
],
);
},
);
await manager.query(
`INSERT INTO ${dataSourceMetadata.schema}."messageChannelMessageAssociation" ("messageChannelId", "messageId", "messageExternalId", "messageThreadId", "messageThreadExternalId") VALUES ($1, $2, $3, $4, $5)`,
[
gmailMessageChannelId,
savedOrExistingMessageId,
message.externalId,
savedOrExistingMessageThreadId,
message.messageThreadExternalId,
],
);
});
}
} catch (error) {
throw new Error(
`Error saving connected account ${connectedAccount.id} messages to workspace ${workspaceId}: ${error.message}`,
);
}
return messageExternalIdsAndIdsMap;

View File

@ -4,6 +4,7 @@ import { DataSource, EntityManager } from 'typeorm';
import { DataSourceService } from 'src/metadata/data-source/data-source.service';
import { TypeORMService } from 'src/database/typeorm/typeorm.service';
import { DataSourceEntity } from 'src/metadata/data-source/data-source.entity';
@Injectable()
export class WorkspaceDataSourceService {
@ -30,7 +31,7 @@ export class WorkspaceDataSourceService {
public async connectedToWorkspaceDataSourceAndReturnMetadata(
workspaceId: string,
) {
): Promise<{ dataSource: DataSource; dataSourceMetadata: DataSourceEntity }> {
const dataSourceMetadata =
await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail(
workspaceId,
@ -114,16 +115,18 @@ export class WorkspaceDataSourceService {
workspaceId: string,
transactionManager?: EntityManager,
): Promise<any> {
if (transactionManager) {
return await transactionManager.query(query, parameters);
}
const workspaceDataSource =
await this.connectToWorkspaceDataSource(workspaceId);
try {
if (transactionManager) {
return await transactionManager.query(query, parameters);
}
const workspaceDataSource =
await this.connectToWorkspaceDataSource(workspaceId);
if (workspaceDataSource) {
return await workspaceDataSource.query(query, parameters);
} catch (error) {
throw new Error(
`Error executing raw query for workspace ${workspaceId}: ${error.message}`,
);
}
throw new Error('No data source found or transaction manager provided');
}
}