* Merge messages and threads * rename messageChannelSync to messageChannelMessage * add merge logic * remove deprecated methods * restore enqueue GmailFullSyncJob after connectedAccount creation
This commit is contained in:
@ -30,6 +30,19 @@ export class GmailFullSyncService {
|
||||
throw new Error('No refresh token found');
|
||||
}
|
||||
|
||||
const gmailMessageChannel = await workspaceDataSource?.query(
|
||||
`SELECT * FROM ${dataSourceMetadata.schema}."messageChannel" WHERE "connectedAccountId" = $1 AND "type" = 'email' LIMIT 1`,
|
||||
[connectedAccountId],
|
||||
);
|
||||
|
||||
if (!gmailMessageChannel.length) {
|
||||
throw new Error(
|
||||
`No gmail message channel found for connected account ${connectedAccountId}`,
|
||||
);
|
||||
}
|
||||
|
||||
const gmailMessageChannelId = gmailMessageChannel[0].id;
|
||||
|
||||
const gmailClient =
|
||||
await this.gmailClientProvider.getGmailClient(refreshToken);
|
||||
|
||||
@ -48,20 +61,8 @@ export class GmailFullSyncService {
|
||||
return;
|
||||
}
|
||||
|
||||
const { savedMessageIds, savedThreadIds } =
|
||||
await this.utils.getSavedMessageIdsAndThreadIds(
|
||||
messageExternalIds,
|
||||
connectedAccountId,
|
||||
dataSourceMetadata,
|
||||
workspaceDataSource,
|
||||
);
|
||||
|
||||
const messageIdsToSave = messageExternalIds.filter(
|
||||
(messageId) => !savedMessageIds.includes(messageId),
|
||||
);
|
||||
|
||||
const messageQueries =
|
||||
this.utils.createQueriesFromMessageIds(messageIdsToSave);
|
||||
this.utils.createQueriesFromMessageIds(messageExternalIds);
|
||||
|
||||
const { messages: messagesToSave, errors } =
|
||||
await this.fetchMessagesByBatchesService.fetchAllMessages(
|
||||
@ -69,32 +70,20 @@ export class GmailFullSyncService {
|
||||
accessToken,
|
||||
);
|
||||
|
||||
const threads = this.utils.getThreadsFromMessages(messagesToSave);
|
||||
|
||||
const threadsToSave = threads.filter(
|
||||
(threadId) => !savedThreadIds.includes(threadId.id),
|
||||
);
|
||||
|
||||
await this.utils.saveMessageThreads(
|
||||
threadsToSave,
|
||||
dataSourceMetadata,
|
||||
workspaceDataSource,
|
||||
connectedAccount.id,
|
||||
);
|
||||
if (messagesToSave.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
await this.utils.saveMessages(
|
||||
messagesToSave,
|
||||
dataSourceMetadata,
|
||||
workspaceDataSource,
|
||||
connectedAccount,
|
||||
gmailMessageChannelId,
|
||||
);
|
||||
|
||||
if (errors.length) throw new Error('Error fetching messages');
|
||||
|
||||
if (messagesToSave.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const lastModifiedMessageId = messagesData[0].id;
|
||||
|
||||
const historyId = messagesToSave.find(
|
||||
|
||||
@ -111,40 +111,24 @@ export class GmailPartialSyncService {
|
||||
return;
|
||||
}
|
||||
|
||||
const gmailMessageChannel = await workspaceDataSource?.query(
|
||||
`SELECT * FROM ${dataSourceMetadata.schema}."messageChannel" WHERE "connectedAccountId" = $1 AND "type" = 'email' LIMIT 1`,
|
||||
[connectedAccountId],
|
||||
);
|
||||
|
||||
if (!gmailMessageChannel.length) {
|
||||
throw new Error(
|
||||
`No gmail message channel found for connected account ${connectedAccountId}`,
|
||||
);
|
||||
}
|
||||
|
||||
const gmailMessageChannelId = gmailMessageChannel[0].id;
|
||||
|
||||
const { messagesAdded, messagesDeleted } =
|
||||
await this.getMessageIdsAndThreadIdsFromHistory(history);
|
||||
|
||||
const {
|
||||
savedMessageIds: messagesAddedAlreadySaved,
|
||||
savedThreadIds: threadsAddedAlreadySaved,
|
||||
} = await this.utils.getSavedMessageIdsAndThreadIds(
|
||||
messagesAdded,
|
||||
connectedAccountId,
|
||||
dataSourceMetadata,
|
||||
workspaceDataSource,
|
||||
);
|
||||
|
||||
const messageExternalIdsToSave = messagesAdded.filter(
|
||||
(messageId) =>
|
||||
!messagesAddedAlreadySaved.includes(messageId) &&
|
||||
!messagesDeleted.includes(messageId),
|
||||
);
|
||||
|
||||
const { savedMessageIds: messagesDeletedAlreadySaved } =
|
||||
await this.utils.getSavedMessageIdsAndThreadIds(
|
||||
messagesDeleted,
|
||||
connectedAccountId,
|
||||
dataSourceMetadata,
|
||||
workspaceDataSource,
|
||||
);
|
||||
|
||||
const messageExternalIdsToDelete = messagesDeleted.filter((messageId) =>
|
||||
messagesDeletedAlreadySaved.includes(messageId),
|
||||
);
|
||||
|
||||
const messageQueries = this.utils.createQueriesFromMessageIds(
|
||||
messageExternalIdsToSave,
|
||||
);
|
||||
const messageQueries =
|
||||
this.utils.createQueriesFromMessageIds(messagesAdded);
|
||||
|
||||
const { messages: messagesToSave, errors } =
|
||||
await this.fetchMessagesByBatchesService.fetchAllMessages(
|
||||
@ -152,35 +136,17 @@ export class GmailPartialSyncService {
|
||||
accessToken,
|
||||
);
|
||||
|
||||
const threads = this.utils.getThreadsFromMessages(messagesToSave);
|
||||
|
||||
const threadsToSave = threads.filter(
|
||||
(thread) => !threadsAddedAlreadySaved.includes(thread.id),
|
||||
);
|
||||
|
||||
await this.utils.saveMessageThreads(
|
||||
threadsToSave,
|
||||
dataSourceMetadata,
|
||||
workspaceDataSource,
|
||||
connectedAccount.id,
|
||||
);
|
||||
|
||||
await this.utils.saveMessages(
|
||||
messagesToSave,
|
||||
dataSourceMetadata,
|
||||
workspaceDataSource,
|
||||
connectedAccount,
|
||||
gmailMessageChannelId,
|
||||
);
|
||||
|
||||
await this.utils.deleteMessages(
|
||||
messageExternalIdsToDelete,
|
||||
dataSourceMetadata,
|
||||
workspaceDataSource,
|
||||
);
|
||||
|
||||
await this.utils.deleteEmptyThreads(
|
||||
await this.utils.deleteMessageChannelMessages(
|
||||
messagesDeleted,
|
||||
connectedAccountId,
|
||||
gmailMessageChannelId,
|
||||
dataSourceMetadata,
|
||||
workspaceDataSource,
|
||||
);
|
||||
|
||||
@ -31,7 +31,7 @@ export class GmailRefreshAccessTokenService {
|
||||
}
|
||||
|
||||
const connectedAccounts = await workspaceDataSource?.query(
|
||||
`SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'gmail' AND "id" = $1`,
|
||||
`SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'google' AND "id" = $1`,
|
||||
[connectedAccountId],
|
||||
);
|
||||
|
||||
|
||||
@ -10,7 +10,6 @@ import {
|
||||
GmailMessage,
|
||||
Participant,
|
||||
} from 'src/workspace/messaging/types/gmailMessage';
|
||||
import { GmailThread } from 'src/workspace/messaging/types/gmailThread';
|
||||
import { MessageQuery } from 'src/workspace/messaging/types/messageOrThreadQuery';
|
||||
|
||||
@Injectable()
|
||||
@ -28,137 +27,129 @@ export class MessagingUtilsService {
|
||||
}));
|
||||
}
|
||||
|
||||
public getThreadsFromMessages(messages: GmailMessage[]): GmailThread[] {
|
||||
return messages.reduce((acc, message) => {
|
||||
if (message.externalId === message.messageThreadExternalId) {
|
||||
acc.push({
|
||||
id: message.messageThreadExternalId,
|
||||
subject: message.subject,
|
||||
});
|
||||
}
|
||||
|
||||
return acc;
|
||||
}, [] as GmailThread[]);
|
||||
}
|
||||
|
||||
public async saveMessageThreads(
|
||||
threads: GmailThread[],
|
||||
dataSourceMetadata: DataSourceEntity,
|
||||
workspaceDataSource: DataSource,
|
||||
connectedAccountId: string,
|
||||
) {
|
||||
const messageChannel = await workspaceDataSource?.query(
|
||||
`SELECT * FROM ${dataSourceMetadata.schema}."messageChannel" WHERE "connectedAccountId" = $1`,
|
||||
[connectedAccountId],
|
||||
);
|
||||
|
||||
if (!messageChannel.length) {
|
||||
throw new Error('No message channel found for this connected account');
|
||||
}
|
||||
|
||||
for (const thread of threads) {
|
||||
await workspaceDataSource?.query(
|
||||
`INSERT INTO ${dataSourceMetadata.schema}."messageThread" ("externalId", "subject", "messageChannelId", "visibility") VALUES ($1, $2, $3, $4)`,
|
||||
[thread.id, thread.subject, messageChannel[0].id, 'default'],
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public async saveMessages(
|
||||
messages: GmailMessage[],
|
||||
dataSourceMetadata: DataSourceEntity,
|
||||
workspaceDataSource: DataSource,
|
||||
connectedAccount,
|
||||
gmailMessageChannelId: string,
|
||||
) {
|
||||
for (const message of messages) {
|
||||
const {
|
||||
externalId,
|
||||
headerMessageId,
|
||||
subject,
|
||||
messageThreadExternalId,
|
||||
internalDate,
|
||||
fromHandle,
|
||||
fromDisplayName,
|
||||
participants,
|
||||
text,
|
||||
} = message;
|
||||
|
||||
const receivedAt = new Date(parseInt(internalDate));
|
||||
|
||||
const messageThread = await workspaceDataSource?.query(
|
||||
`SELECT * FROM ${dataSourceMetadata.schema}."messageThread" WHERE "externalId" = $1`,
|
||||
[messageThreadExternalId],
|
||||
);
|
||||
|
||||
const messageId = v4();
|
||||
|
||||
const person = await workspaceDataSource?.query(
|
||||
`SELECT * FROM ${dataSourceMetadata.schema}."person" WHERE "email" = $1`,
|
||||
[fromHandle],
|
||||
);
|
||||
|
||||
const personId = person[0]?.id;
|
||||
|
||||
const workspaceMember = await workspaceDataSource?.query(
|
||||
`SELECT "workspaceMember"."id" FROM ${dataSourceMetadata.schema}."workspaceMember"
|
||||
JOIN ${dataSourceMetadata.schema}."connectedAccount" ON ${dataSourceMetadata.schema}."workspaceMember"."id" = ${dataSourceMetadata.schema}."connectedAccount"."accountOwnerId"
|
||||
WHERE ${dataSourceMetadata.schema}."connectedAccount"."handle" = $1`,
|
||||
[fromHandle],
|
||||
);
|
||||
|
||||
const workspaceMemberId = workspaceMember[0]?.id;
|
||||
|
||||
const messageDirection =
|
||||
connectedAccount.handle === fromHandle ? 'outgoing' : 'incoming';
|
||||
|
||||
await workspaceDataSource?.transaction(async (manager) => {
|
||||
await manager.query(
|
||||
`INSERT INTO ${dataSourceMetadata.schema}."message" ("id", "externalId", "headerMessageId", "subject", "receivedAt", "messageThreadId", "direction", "body") VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
|
||||
[
|
||||
messageId,
|
||||
externalId,
|
||||
headerMessageId,
|
||||
subject,
|
||||
receivedAt,
|
||||
messageThread[0]?.id,
|
||||
messageDirection,
|
||||
text,
|
||||
],
|
||||
);
|
||||
const savedOrExistingMessageThreadId =
|
||||
await this.saveMessageThreadOrReturnExistingMessageThread(
|
||||
message.messageThreadExternalId,
|
||||
dataSourceMetadata,
|
||||
workspaceDataSource,
|
||||
);
|
||||
|
||||
const savedOrExistingMessageId =
|
||||
await this.saveMessageOrReturnExistingMessage(
|
||||
message,
|
||||
savedOrExistingMessageThreadId,
|
||||
connectedAccount,
|
||||
dataSourceMetadata,
|
||||
manager,
|
||||
);
|
||||
|
||||
await manager.query(
|
||||
`INSERT INTO ${dataSourceMetadata.schema}."messageParticipant" ("messageId", "role", "handle", "displayName", "personId", "workspaceMemberId") VALUES ($1, $2, $3, $4, $5, $6)`,
|
||||
`INSERT INTO ${dataSourceMetadata.schema}."messageChannelMessage" ("messageChannelId", "messageId", "messageExternalId", "messageThreadId", "messageThreadExternalId") VALUES ($1, $2, $3, $4, $5)`,
|
||||
[
|
||||
messageId,
|
||||
'from',
|
||||
fromHandle,
|
||||
fromDisplayName,
|
||||
personId,
|
||||
workspaceMemberId,
|
||||
gmailMessageChannelId,
|
||||
savedOrExistingMessageId,
|
||||
message.externalId,
|
||||
savedOrExistingMessageThreadId,
|
||||
message.messageThreadExternalId,
|
||||
],
|
||||
);
|
||||
|
||||
await this.saveMessageParticipants(
|
||||
participants,
|
||||
dataSourceMetadata,
|
||||
messageId,
|
||||
manager,
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public async saveMessageParticipants(
|
||||
participants: Participant[],
|
||||
private async saveMessageOrReturnExistingMessage(
|
||||
message: GmailMessage,
|
||||
messageThreadId: string,
|
||||
connectedAccount,
|
||||
dataSourceMetadata: DataSourceEntity,
|
||||
manager: EntityManager,
|
||||
): Promise<string> {
|
||||
const existingMessages = await manager.query(
|
||||
`SELECT "message"."id" FROM ${dataSourceMetadata.schema}."message" WHERE ${dataSourceMetadata.schema}."message"."headerMessageId" = $1 LIMIT 1`,
|
||||
[message.headerMessageId],
|
||||
);
|
||||
const existingMessageId: string = existingMessages[0]?.id;
|
||||
|
||||
if (existingMessageId) {
|
||||
return Promise.resolve(existingMessageId);
|
||||
}
|
||||
|
||||
const newMessageId = v4();
|
||||
|
||||
const messageDirection =
|
||||
connectedAccount.handle === message.fromHandle ? 'outgoing' : 'incoming';
|
||||
|
||||
const receivedAt = new Date(parseInt(message.internalDate));
|
||||
|
||||
await manager.query(
|
||||
`INSERT INTO ${dataSourceMetadata.schema}."message" ("id", "headerMessageId", "subject", "receivedAt", "direction", "messageThreadId", "body") VALUES ($1, $2, $3, $4, $5, $6, $7)`,
|
||||
[
|
||||
newMessageId,
|
||||
message.headerMessageId,
|
||||
message.subject,
|
||||
receivedAt,
|
||||
messageDirection,
|
||||
messageThreadId,
|
||||
message.text,
|
||||
],
|
||||
);
|
||||
|
||||
await this.saveMessageParticipants(
|
||||
message.participants,
|
||||
newMessageId,
|
||||
dataSourceMetadata,
|
||||
manager,
|
||||
);
|
||||
|
||||
return Promise.resolve(newMessageId);
|
||||
}
|
||||
|
||||
private async saveMessageThreadOrReturnExistingMessageThread(
|
||||
messageThreadExternalId: string,
|
||||
dataSourceMetadata: DataSourceEntity,
|
||||
workspaceDataSource: DataSource,
|
||||
) {
|
||||
const existingMessageThreads = await workspaceDataSource?.query(
|
||||
`SELECT "messageChannelMessage"."messageThreadId" FROM ${dataSourceMetadata.schema}."messageChannelMessage" WHERE "messageThreadExternalId" = $1 LIMIT 1`,
|
||||
[messageThreadExternalId],
|
||||
);
|
||||
|
||||
const existingMessageThread = existingMessageThreads[0]?.messageThreadId;
|
||||
|
||||
if (existingMessageThread) {
|
||||
return Promise.resolve(existingMessageThread);
|
||||
}
|
||||
|
||||
const newMessageThreadId = v4();
|
||||
|
||||
await workspaceDataSource?.query(
|
||||
`INSERT INTO ${dataSourceMetadata.schema}."messageThread" ("id") VALUES ($1)`,
|
||||
[newMessageThreadId],
|
||||
);
|
||||
|
||||
return Promise.resolve(newMessageThreadId);
|
||||
}
|
||||
|
||||
private async saveMessageParticipants(
|
||||
participants: Participant[],
|
||||
messageId: string,
|
||||
dataSourceMetadata: DataSourceEntity,
|
||||
manager: EntityManager,
|
||||
): Promise<void> {
|
||||
if (!participants) return;
|
||||
|
||||
for (const participant of participants) {
|
||||
const participantPerson = await manager.query(
|
||||
`SELECT * FROM ${dataSourceMetadata.schema}."person" WHERE "email" = $1`,
|
||||
`SELECT "person"."id" FROM ${dataSourceMetadata.schema}."person" WHERE "email" = $1 LIMIT 1`,
|
||||
[participant.handle],
|
||||
);
|
||||
|
||||
@ -167,7 +158,8 @@ export class MessagingUtilsService {
|
||||
const workspaceMember = await manager.query(
|
||||
`SELECT "workspaceMember"."id" FROM ${dataSourceMetadata.schema}."workspaceMember"
|
||||
JOIN ${dataSourceMetadata.schema}."connectedAccount" ON ${dataSourceMetadata.schema}."workspaceMember"."id" = ${dataSourceMetadata.schema}."connectedAccount"."accountOwnerId"
|
||||
WHERE ${dataSourceMetadata.schema}."connectedAccount"."handle" = $1`,
|
||||
WHERE ${dataSourceMetadata.schema}."connectedAccount"."handle" = $1
|
||||
LIMIT 1`,
|
||||
[participant.handle],
|
||||
);
|
||||
|
||||
@ -187,41 +179,16 @@ export class MessagingUtilsService {
|
||||
}
|
||||
}
|
||||
|
||||
public async getSavedMessageIdsAndThreadIds(
|
||||
messageEternalIds: string[],
|
||||
public async deleteMessageChannelMessages(
|
||||
messageExternalIds: string[],
|
||||
connectedAccountId: string,
|
||||
dataSourceMetadata: DataSourceEntity,
|
||||
workspaceDataSource: DataSource,
|
||||
): Promise<{
|
||||
savedMessageIds: string[];
|
||||
savedThreadIds: string[];
|
||||
}> {
|
||||
const messageIdsInDatabase: {
|
||||
messageExternalId: string;
|
||||
messageThreadExternalId: string;
|
||||
}[] = await workspaceDataSource?.query(
|
||||
`SELECT message."externalId" AS "messageExternalId",
|
||||
"messageThread"."externalId" AS "messageThreadExternalId"
|
||||
FROM ${dataSourceMetadata.schema}."message" message
|
||||
LEFT JOIN ${dataSourceMetadata.schema}."messageThread" "messageThread" ON message."messageThreadId" = "messageThread"."id"
|
||||
LEFT JOIN ${dataSourceMetadata.schema}."messageChannel" ON "messageThread"."messageChannelId" = ${dataSourceMetadata.schema}."messageChannel"."id"
|
||||
WHERE ${dataSourceMetadata.schema}."messageChannel"."connectedAccountId" = $1
|
||||
AND message."externalId" = ANY($2)`,
|
||||
[connectedAccountId, messageEternalIds],
|
||||
) {
|
||||
await workspaceDataSource?.query(
|
||||
`DELETE FROM ${dataSourceMetadata.schema}."messageChannelMessage" WHERE "messageExternalId" = ANY($1) AND "messageChannelId" = $2`,
|
||||
[messageExternalIds, connectedAccountId],
|
||||
);
|
||||
|
||||
return {
|
||||
savedMessageIds: messageIdsInDatabase.map(
|
||||
(message) => message.messageExternalId,
|
||||
),
|
||||
savedThreadIds: [
|
||||
...new Set(
|
||||
messageIdsInDatabase.map(
|
||||
(message) => message.messageThreadExternalId,
|
||||
),
|
||||
),
|
||||
],
|
||||
};
|
||||
}
|
||||
|
||||
public async getConnectedAccountsFromWorkspaceId(
|
||||
@ -240,7 +207,7 @@ export class MessagingUtilsService {
|
||||
}
|
||||
|
||||
const connectedAccounts = await workspaceDataSource?.query(
|
||||
`SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'gmail'`,
|
||||
`SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'google'`,
|
||||
);
|
||||
|
||||
if (!connectedAccounts || connectedAccounts.length === 0) {
|
||||
@ -271,7 +238,7 @@ export class MessagingUtilsService {
|
||||
}
|
||||
|
||||
const connectedAccounts = await workspaceDataSource?.query(
|
||||
`SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'gmail' AND "id" = $1`,
|
||||
`SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'google' AND "id" = $1`,
|
||||
[connectedAccountId],
|
||||
);
|
||||
|
||||
@ -297,50 +264,4 @@ export class MessagingUtilsService {
|
||||
[historyId, connectedAccountId],
|
||||
);
|
||||
}
|
||||
|
||||
public async deleteMessages(
|
||||
messageIds: string[],
|
||||
dataSourceMetadata: DataSourceEntity,
|
||||
workspaceDataSource: DataSource,
|
||||
) {
|
||||
if (!messageIds || messageIds.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
await workspaceDataSource?.query(
|
||||
`DELETE FROM ${dataSourceMetadata.schema}."message" WHERE "externalId" = ANY($1)`,
|
||||
[messageIds],
|
||||
);
|
||||
}
|
||||
|
||||
public async deleteEmptyThreads(
|
||||
messageIds: string[],
|
||||
connectedAccountId: string,
|
||||
dataSourceMetadata: DataSourceEntity,
|
||||
workspaceDataSource: DataSource,
|
||||
) {
|
||||
const messageThreadsToDelete = await workspaceDataSource?.query(
|
||||
`SELECT "messageThread"."id" FROM ${dataSourceMetadata.schema}."messageThread" "messageThread"
|
||||
LEFT JOIN ${dataSourceMetadata.schema}."message" message ON "messageThread"."id" = message."messageThreadId"
|
||||
LEFT JOIN ${dataSourceMetadata.schema}."messageChannel" ON "messageThread"."messageChannelId" = ${dataSourceMetadata.schema}."messageChannel"."id"
|
||||
WHERE "messageThread"."externalId" = ANY($1)
|
||||
AND ${dataSourceMetadata.schema}."messageChannel"."connectedAccountId" = $2
|
||||
GROUP BY "messageThread"."id"
|
||||
HAVING COUNT(message."id") = 0`,
|
||||
[messageIds, connectedAccountId],
|
||||
);
|
||||
|
||||
if (!messageThreadsToDelete || messageThreadsToDelete.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const messageThreadIdsToDelete = messageThreadsToDelete.map(
|
||||
(messageThread) => messageThread.id,
|
||||
);
|
||||
|
||||
await workspaceDataSource?.query(
|
||||
`DELETE FROM ${dataSourceMetadata.schema}."messageThread" WHERE "id" = ANY($1)`,
|
||||
[messageThreadIdsToDelete],
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user