[backend] rename repository services and replace repository modules by dynamicModule (#4536)

* rename database services to repository

* refactor more repositories

* more refactoring

* followup

* remove unused imports

* fix

* fix

* Fix calendar listener being called when flag is off

* remove folders
This commit is contained in:
Weiko
2024-03-18 16:26:23 +01:00
committed by GitHub
parent 2aa6bcdb70
commit 8fb1ab8933
79 changed files with 1080 additions and 776 deletions

View File

@ -1,12 +0,0 @@
import { Module } from '@nestjs/common';
import { CompanyService } from 'src/modules/messaging/repositories/company/company.service';
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
// TODO: Move outside of the messaging module
@Module({
imports: [WorkspaceDataSourceModule],
providers: [CompanyService],
exports: [CompanyService],
})
export class CompanyModule {}

View File

@ -1,61 +0,0 @@
import { Injectable } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
export type CompanyToCreate = {
id: string;
domainName: string;
name?: string;
city?: string;
};
// TODO: Move outside of the messaging module
@Injectable()
export class CompanyService {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
) {}
public async getExistingCompaniesByDomainNames(
domainNames: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<{ id: string; domainName: string }[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const existingCompanies =
await this.workspaceDataSourceService.executeRawQuery(
`SELECT id, "domainName" FROM ${dataSourceSchema}.company WHERE "domainName" = ANY($1)`,
[domainNames],
workspaceId,
transactionManager,
);
return existingCompanies;
}
public async createCompany(
workspaceId: string,
companyToCreate: CompanyToCreate,
transactionManager?: EntityManager,
): Promise<void> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`INSERT INTO ${dataSourceSchema}.company (id, "domainName", name, address)
VALUES ($1, $2, $3, $4)`,
[
companyToCreate.id,
companyToCreate.domainName,
companyToCreate.name ?? '',
companyToCreate.city ?? '',
],
workspaceId,
transactionManager,
);
}
}

View File

@ -7,7 +7,7 @@ import { MessageChannelMessageAssociationObjectMetadata } from 'src/modules/mess
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
@Injectable()
export class MessageChannelMessageAssociationService {
export class MessageChannelMessageAssociationRepository {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
) {}
@ -192,4 +192,30 @@ export class MessageChannelMessageAssociationService {
transactionManager,
);
}
public async insert(
messageChannelId: string,
messageId: string,
messageExternalId: string,
messageThreadId: string,
messageThreadExternalId: string,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`INSERT INTO ${dataSourceSchema}."messageChannelMessageAssociation" ("messageChannelId", "messageId", "messageExternalId", "messageThreadId", "messageThreadExternalId") VALUES ($1, $2, $3, $4, $5)`,
[
messageChannelId,
messageId,
messageExternalId,
messageThreadId,
messageThreadExternalId,
],
workspaceId,
transactionManager,
);
}
}

View File

@ -1,11 +0,0 @@
import { Module } from '@nestjs/common';
import { MessageChannelMessageAssociationService } from 'src/modules/messaging/repositories/message-channel-message-association/message-channel-message-association.service';
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
@Module({
imports: [WorkspaceDataSourceModule],
providers: [MessageChannelMessageAssociationService],
exports: [MessageChannelMessageAssociationService],
})
export class MessageChannelMessageAssociationModule {}

View File

@ -7,7 +7,7 @@ import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-obj
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
@Injectable()
export class MessageChannelService {
export class MessageChannelRepository {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
) {}

View File

@ -1,11 +0,0 @@
import { Module } from '@nestjs/common';
import { MessageChannelService } from 'src/modules/messaging/repositories/message-channel/message-channel.service';
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
@Module({
imports: [WorkspaceDataSourceModule],
providers: [MessageChannelService],
exports: [MessageChannelService],
})
export class MessageChannelModule {}

View File

@ -9,13 +9,11 @@ import {
ParticipantWithId,
ParticipantWithMessageId,
} from 'src/modules/messaging/types/gmail-message';
import { PersonService } from 'src/modules/person/repositories/person/person.service';
@Injectable()
export class MessageParticipantService {
export class MessageParticipantRepository {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
private readonly personService: PersonService,
) {}
public async getByHandles(
@ -195,45 +193,4 @@ export class MessageParticipantService {
transactionManager,
);
}
public async updateMessageParticipantsAfterPeopleCreation(
participants: ParticipantWithId[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
if (!participants) return;
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const handles = participants.map((participant) => participant.handle);
const participantPersonIds = await this.personService.getByEmails(
handles,
workspaceId,
transactionManager,
);
const messageParticipantsToUpdate = participants.map((participant) => [
participant.id,
participantPersonIds.find(
(e: { id: string; email: string }) => e.email === participant.handle,
)?.id,
]);
if (messageParticipantsToUpdate.length === 0) return;
const valuesString = messageParticipantsToUpdate
.map((_, index) => `($${index * 2 + 1}::uuid, $${index * 2 + 2}::uuid)`)
.join(', ');
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageParticipant" AS "messageParticipant" SET "personId" = "data"."personId"
FROM (VALUES ${valuesString}) AS "data"("id", "personId")
WHERE "messageParticipant"."id" = "data"."id"`,
messageParticipantsToUpdate.flat(),
workspaceId,
transactionManager,
);
}
}

View File

@ -1,12 +0,0 @@
import { Module } from '@nestjs/common';
import { MessageParticipantService } from 'src/modules/messaging/repositories/message-participant/message-participant.service';
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
import { PersonModule } from 'src/modules/person/repositories/person/person.module';
@Module({
imports: [WorkspaceDataSourceModule, PersonModule],
providers: [MessageParticipantService],
exports: [MessageParticipantService],
})
export class MessageParticipantModule {}

View File

@ -0,0 +1,67 @@
import { Injectable } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
@Injectable()
export class MessageThreadRepository {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
) {}
public async getOrphanThreadIdsPaginated(
limit: number,
offset: number,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<string[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const orphanThreads = await this.workspaceDataSourceService.executeRawQuery(
`SELECT mt.id
FROM ${dataSourceSchema}."messageThread" mt
LEFT JOIN ${dataSourceSchema}."message" m ON mt.id = m."messageThreadId"
WHERE m."messageThreadId" IS NULL
LIMIT $1 OFFSET $2`,
[limit, offset],
workspaceId,
transactionManager,
);
return orphanThreads.map(({ id }) => id);
}
public async deleteByIds(
messageThreadIds: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`DELETE FROM ${dataSourceSchema}."messageThread" WHERE id = ANY($1)`,
[messageThreadIds],
workspaceId,
transactionManager,
);
}
public async insert(
messageThreadId: string,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`INSERT INTO ${dataSourceSchema}."messageThread" (id) VALUES ($1)`,
[messageThreadId],
workspaceId,
transactionManager,
);
}
}

View File

@ -1,17 +0,0 @@
import { Module, forwardRef } from '@nestjs/common';
import { MessageChannelMessageAssociationModule } from 'src/modules/messaging/repositories/message-channel-message-association/message-channel-message-assocation.module';
import { MessageThreadService } from 'src/modules/messaging/repositories/message-thread/message-thread.service';
import { MessageModule } from 'src/modules/messaging/repositories/message/message.module';
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
@Module({
imports: [
WorkspaceDataSourceModule,
MessageChannelMessageAssociationModule,
forwardRef(() => MessageModule),
],
providers: [MessageThreadService],
exports: [MessageThreadService],
})
export class MessageThreadModule {}

View File

@ -1,105 +0,0 @@
import { Inject, Injectable, forwardRef } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { v4 } from 'uuid';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import { DataSourceEntity } from 'src/engine-metadata/data-source/data-source.entity';
import { MessageChannelMessageAssociationService } from 'src/modules/messaging/repositories/message-channel-message-association/message-channel-message-association.service';
import { MessageService } from 'src/modules/messaging/repositories/message/message.service';
@Injectable()
export class MessageThreadService {
constructor(
private readonly messageChannelMessageAssociationService: MessageChannelMessageAssociationService,
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
@Inject(forwardRef(() => MessageService))
private readonly messageService: MessageService,
) {}
public async getOrphanThreadIdsPaginated(
limit: number,
offset: number,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<string[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const orphanThreads = await this.workspaceDataSourceService.executeRawQuery(
`SELECT mt.id
FROM ${dataSourceSchema}."messageThread" mt
LEFT JOIN ${dataSourceSchema}."message" m ON mt.id = m."messageThreadId"
WHERE m."messageThreadId" IS NULL
LIMIT $1 OFFSET $2`,
[limit, offset],
workspaceId,
transactionManager,
);
return orphanThreads.map(({ id }) => id);
}
public async deleteByIds(
messageThreadIds: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`DELETE FROM ${dataSourceSchema}."messageThread" WHERE id = ANY($1)`,
[messageThreadIds],
workspaceId,
transactionManager,
);
}
public async saveMessageThreadOrReturnExistingMessageThread(
headerMessageId: string,
messageThreadExternalId: string,
dataSourceMetadata: DataSourceEntity,
workspaceId: string,
manager: EntityManager,
) {
// Check if message thread already exists via threadExternalId
const existingMessageChannelMessageAssociationByMessageThreadExternalId =
await this.messageChannelMessageAssociationService.getFirstByMessageThreadExternalId(
messageThreadExternalId,
workspaceId,
manager,
);
const existingMessageThread =
existingMessageChannelMessageAssociationByMessageThreadExternalId?.messageThreadId;
if (existingMessageThread) {
return Promise.resolve(existingMessageThread);
}
// Check if message thread already exists via existing message headerMessageId
const existingMessageWithSameHeaderMessageId =
await this.messageService.getFirstOrNullByHeaderMessageId(
headerMessageId,
workspaceId,
manager,
);
if (existingMessageWithSameHeaderMessageId) {
return Promise.resolve(
existingMessageWithSameHeaderMessageId.messageThreadId,
);
}
// If message thread does not exist, create new message thread
const newMessageThreadId = v4();
await manager.query(
`INSERT INTO ${dataSourceMetadata.schema}."messageThread" ("id") VALUES ($1)`,
[newMessageThreadId],
);
return Promise.resolve(newMessageThreadId);
}
}

View File

@ -0,0 +1,138 @@
import { Injectable } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import { MessageObjectMetadata } from 'src/modules/messaging/standard-objects/message.object-metadata';
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
@Injectable()
export class MessageRepository {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
) {}
public async getNonAssociatedMessageIdsPaginated(
limit: number,
offset: number,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<string[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const nonAssociatedMessages =
await this.workspaceDataSourceService.executeRawQuery(
`SELECT m.id FROM ${dataSourceSchema}."message" m
LEFT JOIN ${dataSourceSchema}."messageChannelMessageAssociation" mcma
ON m.id = mcma."messageId"
WHERE mcma.id IS NULL
LIMIT $1 OFFSET $2`,
[limit, offset],
workspaceId,
transactionManager,
);
return nonAssociatedMessages.map(({ id }) => id);
}
public async getFirstOrNullByHeaderMessageId(
headerMessageId: string,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<MessageObjectMetadata> | null> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const messages = await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."message" WHERE "headerMessageId" = $1 LIMIT 1`,
[headerMessageId],
workspaceId,
transactionManager,
);
if (!messages || messages.length === 0) {
return null;
}
return messages[0];
}
public async getByIds(
messageIds: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<MessageObjectMetadata>[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
return await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."message" WHERE "id" = ANY($1)`,
[messageIds],
workspaceId,
transactionManager,
);
}
public async deleteByIds(
messageIds: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`DELETE FROM ${dataSourceSchema}."message" WHERE "id" = ANY($1)`,
[messageIds],
workspaceId,
transactionManager,
);
}
public async getByMessageThreadIds(
messageThreadIds: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<MessageObjectMetadata>[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
return await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."message" WHERE "messageThreadId" = ANY($1)`,
[messageThreadIds],
workspaceId,
transactionManager,
);
}
public async insert(
id: string,
headerMessageId: string,
subject: string,
receivedAt: Date,
messageDirection: string,
messageThreadId: string,
text: string,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`INSERT INTO ${dataSourceSchema}."message" ("id", "headerMessageId", "subject", "receivedAt", "direction", "messageThreadId", "text") VALUES ($1, $2, $3, $4, $5, $6, $7)`,
[
id,
headerMessageId,
subject,
receivedAt,
messageDirection,
messageThreadId,
text,
],
workspaceId,
transactionManager,
);
}
}

View File

@ -1,23 +0,0 @@
import { Module, forwardRef } from '@nestjs/common';
import { MessageChannelModule } from 'src/modules/messaging/repositories/message-channel/message-channel.module';
import { MessageChannelMessageAssociationModule } from 'src/modules/messaging/repositories/message-channel-message-association/message-channel-message-assocation.module';
import { MessageParticipantModule } from 'src/modules/messaging/repositories/message-participant/message-participant.module';
import { MessageThreadModule } from 'src/modules/messaging/repositories/message-thread/message-thread.module';
import { MessageService } from 'src/modules/messaging/repositories/message/message.service';
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
import { CreateCompaniesAndContactsModule } from 'src/modules/connected-account/auto-companies-and-contacts-creation/create-company-and-contact/create-company-and-contact.module';
@Module({
imports: [
WorkspaceDataSourceModule,
forwardRef(() => MessageThreadModule),
MessageParticipantModule,
MessageChannelMessageAssociationModule,
MessageChannelModule,
CreateCompaniesAndContactsModule,
],
providers: [MessageService],
exports: [MessageService],
})
export class MessageModule {}

View File

@ -1,346 +0,0 @@
import { Inject, Injectable, Logger, forwardRef } from '@nestjs/common';
import { DataSource, EntityManager } from 'typeorm';
import { v4 } from 'uuid';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import { MessageObjectMetadata } from 'src/modules/messaging/standard-objects/message.object-metadata';
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
import { DataSourceEntity } from 'src/engine-metadata/data-source/data-source.entity';
import { GmailMessage } from 'src/modules/messaging/types/gmail-message';
import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata';
import { MessageChannelMessageAssociationService } from 'src/modules/messaging/repositories/message-channel-message-association/message-channel-message-association.service';
import { MessageThreadService } from 'src/modules/messaging/repositories/message-thread/message-thread.service';
import { MessageChannelService } from 'src/modules/messaging/repositories/message-channel/message-channel.service';
@Injectable()
export class MessageService {
private readonly logger = new Logger(MessageService.name);
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
private readonly messageChannelMessageAssociationService: MessageChannelMessageAssociationService,
@Inject(forwardRef(() => MessageThreadService))
private readonly messageThreadService: MessageThreadService,
private readonly messageChannelService: MessageChannelService,
) {}
public async getNonAssociatedMessageIdsPaginated(
limit: number,
offset: number,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<string[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const nonAssociatedMessages =
await this.workspaceDataSourceService.executeRawQuery(
`SELECT m.id FROM ${dataSourceSchema}."message" m
LEFT JOIN ${dataSourceSchema}."messageChannelMessageAssociation" mcma
ON m.id = mcma."messageId"
WHERE mcma.id IS NULL
LIMIT $1 OFFSET $2`,
[limit, offset],
workspaceId,
transactionManager,
);
return nonAssociatedMessages.map(({ id }) => id);
}
public async getFirstOrNullByHeaderMessageId(
headerMessageId: string,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<MessageObjectMetadata> | null> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const messages = await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."message" WHERE "headerMessageId" = $1 LIMIT 1`,
[headerMessageId],
workspaceId,
transactionManager,
);
if (!messages || messages.length === 0) {
return null;
}
return messages[0];
}
public async getByIds(
messageIds: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<MessageObjectMetadata>[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
return await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."message" WHERE "id" = ANY($1)`,
[messageIds],
workspaceId,
transactionManager,
);
}
public async deleteByIds(
messageIds: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`DELETE FROM ${dataSourceSchema}."message" WHERE "id" = ANY($1)`,
[messageIds],
workspaceId,
transactionManager,
);
}
public async getByMessageThreadIds(
messageThreadIds: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<MessageObjectMetadata>[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
return await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."message" WHERE "messageThreadId" = ANY($1)`,
[messageThreadIds],
workspaceId,
transactionManager,
);
}
public async saveMessages(
messages: GmailMessage[],
dataSourceMetadata: DataSourceEntity,
workspaceDataSource: DataSource,
connectedAccount: ObjectRecord<ConnectedAccountObjectMetadata>,
gmailMessageChannelId: string,
workspaceId: string,
): Promise<Map<string, string>> {
const messageExternalIdsAndIdsMap = new Map<string, string>();
try {
let keepImporting = true;
for (const message of messages) {
if (!keepImporting) {
break;
}
await workspaceDataSource?.transaction(
async (manager: EntityManager) => {
const gmailMessageChannel =
await this.messageChannelService.getByIds(
[gmailMessageChannelId],
workspaceId,
manager,
);
if (gmailMessageChannel.length === 0) {
this.logger.error(
`No message channel found for connected account ${connectedAccount.id} in workspace ${workspaceId} in saveMessages`,
);
keepImporting = false;
return;
}
const existingMessageChannelMessageAssociationsCount =
await this.messageChannelMessageAssociationService.countByMessageExternalIdsAndMessageChannelId(
[message.externalId],
gmailMessageChannelId,
workspaceId,
manager,
);
if (existingMessageChannelMessageAssociationsCount > 0) {
return;
}
// TODO: This does not handle all thread merging use cases and might create orphan threads.
const savedOrExistingMessageThreadId =
await this.messageThreadService.saveMessageThreadOrReturnExistingMessageThread(
message.headerMessageId,
message.messageThreadExternalId,
dataSourceMetadata,
workspaceId,
manager,
);
const savedOrExistingMessageId =
await this.saveMessageOrReturnExistingMessage(
message,
savedOrExistingMessageThreadId,
connectedAccount,
dataSourceMetadata,
workspaceId,
manager,
);
messageExternalIdsAndIdsMap.set(
message.externalId,
savedOrExistingMessageId,
);
await manager.query(
`INSERT INTO ${dataSourceMetadata.schema}."messageChannelMessageAssociation" ("messageChannelId", "messageId", "messageExternalId", "messageThreadId", "messageThreadExternalId") VALUES ($1, $2, $3, $4, $5)`,
[
gmailMessageChannelId,
savedOrExistingMessageId,
message.externalId,
savedOrExistingMessageThreadId,
message.messageThreadExternalId,
],
);
},
);
}
} catch (error) {
throw new Error(
`Error saving connected account ${connectedAccount.id} messages to workspace ${workspaceId}: ${error.message}`,
);
}
return messageExternalIdsAndIdsMap;
}
private async saveMessageOrReturnExistingMessage(
message: GmailMessage,
messageThreadId: string,
connectedAccount: ObjectRecord<ConnectedAccountObjectMetadata>,
dataSourceMetadata: DataSourceEntity,
workspaceId: string,
manager: EntityManager,
): Promise<string> {
const existingMessage = await this.getFirstOrNullByHeaderMessageId(
message.headerMessageId,
workspaceId,
);
const existingMessageId = existingMessage?.id;
if (existingMessageId) {
return Promise.resolve(existingMessageId);
}
const newMessageId = v4();
const messageDirection =
connectedAccount.handle === message.fromHandle ? 'outgoing' : 'incoming';
const receivedAt = new Date(parseInt(message.internalDate));
await manager.query(
`INSERT INTO ${dataSourceMetadata.schema}."message" ("id", "headerMessageId", "subject", "receivedAt", "direction", "messageThreadId", "text") VALUES ($1, $2, $3, $4, $5, $6, $7)`,
[
newMessageId,
message.headerMessageId,
message.subject,
receivedAt,
messageDirection,
messageThreadId,
message.text,
],
);
return Promise.resolve(newMessageId);
}
public async deleteMessages(
messagesDeletedMessageExternalIds: string[],
gmailMessageChannelId: string,
workspaceId: string,
) {
const workspaceDataSource =
await this.workspaceDataSourceService.connectToWorkspaceDataSource(
workspaceId,
);
await workspaceDataSource?.transaction(async (manager: EntityManager) => {
const messageChannelMessageAssociationsToDelete =
await this.messageChannelMessageAssociationService.getByMessageExternalIdsAndMessageChannelId(
messagesDeletedMessageExternalIds,
gmailMessageChannelId,
workspaceId,
manager,
);
const messageChannelMessageAssociationIdsToDeleteIds =
messageChannelMessageAssociationsToDelete.map(
(messageChannelMessageAssociationToDelete) =>
messageChannelMessageAssociationToDelete.id,
);
await this.messageChannelMessageAssociationService.deleteByIds(
messageChannelMessageAssociationIdsToDeleteIds,
workspaceId,
manager,
);
const messageIdsFromMessageChannelMessageAssociationsToDelete =
messageChannelMessageAssociationsToDelete.map(
(messageChannelMessageAssociationToDelete) =>
messageChannelMessageAssociationToDelete.messageId,
);
const messageChannelMessageAssociationByMessageIds =
await this.messageChannelMessageAssociationService.getByMessageIds(
messageIdsFromMessageChannelMessageAssociationsToDelete,
workspaceId,
manager,
);
const messageIdsFromMessageChannelMessageAssociationByMessageIds =
messageChannelMessageAssociationByMessageIds.map(
(messageChannelMessageAssociation) =>
messageChannelMessageAssociation.messageId,
);
const messageIdsToDelete =
messageIdsFromMessageChannelMessageAssociationsToDelete.filter(
(messageId) =>
!messageIdsFromMessageChannelMessageAssociationByMessageIds.includes(
messageId,
),
);
await this.deleteByIds(messageIdsToDelete, workspaceId, manager);
const messageThreadIdsFromMessageChannelMessageAssociationsToDelete =
messageChannelMessageAssociationsToDelete.map(
(messageChannelMessageAssociationToDelete) =>
messageChannelMessageAssociationToDelete.messageThreadId,
);
const messagesByThreadIds = await this.getByMessageThreadIds(
messageThreadIdsFromMessageChannelMessageAssociationsToDelete,
workspaceId,
manager,
);
const threadIdsToDelete =
messageThreadIdsFromMessageChannelMessageAssociationsToDelete.filter(
(threadId) =>
!messagesByThreadIds.find(
(message) => message.messageThreadId === threadId,
),
);
await this.messageThreadService.deleteByIds(
threadIdsToDelete,
workspaceId,
manager,
);
});
}
}