[Messaging] Fix duplicate messageChannelMessage (#3616)

* [Messaging] Fix duplicate channelMessageChannel

* add messageChannelMessage check before querying gmail

* rename messageChannelMessage to messageChannelMessageAssociation
This commit is contained in:
Weiko
2024-01-25 14:15:57 +01:00
committed by GitHub
parent 7845e04f6b
commit 6d997edabb
9 changed files with 87 additions and 34 deletions

View File

@ -57,12 +57,33 @@ export class GmailFullSyncService {
? messagesData.map((message) => message.id || '')
: [];
if (!messagesData || messagesData?.length === 0) {
if (!messageExternalIds || messageExternalIds?.length === 0) {
return;
}
const existingMessageChannelMessageAssociations =
await this.utils.getMessageChannelMessageAssociations(
messageExternalIds,
gmailMessageChannelId,
dataSourceMetadata,
workspaceDataSource,
);
const existingMessageChannelMessageAssociationsExternalIds =
existingMessageChannelMessageAssociations.map(
(messageChannelMessageAssociation) =>
messageChannelMessageAssociation.messageExternalId,
);
const messagesToFetch = messageExternalIds.filter(
(messageExternalId) =>
!existingMessageChannelMessageAssociationsExternalIds.includes(
messageExternalId,
),
);
const messageQueries =
this.utils.createQueriesFromMessageIds(messageExternalIds);
this.utils.createQueriesFromMessageIds(messagesToFetch);
const { messages: messagesToSave, errors } =
await this.fetchMessagesByBatchesService.fetchAllMessages(
@ -84,7 +105,7 @@ export class GmailFullSyncService {
if (errors.length) throw new Error('Error fetching messages');
const lastModifiedMessageId = messagesData[0].id;
const lastModifiedMessageId = messagesToFetch[0];
const historyId = messagesToSave.find(
(message) => message.externalId === lastModifiedMessageId,

View File

@ -125,7 +125,7 @@ export class GmailPartialSyncService {
const gmailMessageChannelId = gmailMessageChannel[0].id;
const { messagesAdded, messagesDeleted } =
await this.getMessageIdsAndThreadIdsFromHistory(history);
await this.getMessageIdsFromHistory(history);
const messageQueries =
this.utils.createQueriesFromMessageIds(messagesAdded);
@ -144,7 +144,7 @@ export class GmailPartialSyncService {
gmailMessageChannelId,
);
await this.utils.deleteMessageChannelMessages(
await this.utils.deleteMessageChannelMessageAssociations(
messagesDeleted,
gmailMessageChannelId,
dataSourceMetadata,
@ -161,7 +161,7 @@ export class GmailPartialSyncService {
);
}
private async getMessageIdsAndThreadIdsFromHistory(
private async getMessageIdsFromHistory(
history: gmail_v1.Schema$ListHistoryResponse,
): Promise<{
messagesAdded: string[];
@ -193,9 +193,17 @@ export class GmailPartialSyncService {
{ messagesAdded: [], messagesDeleted: [] },
);
const uniqueMessagesAdded = messagesAdded.filter(
(messageId) => !messagesDeleted.includes(messageId),
);
const uniqueMessagesDeleted = messagesDeleted.filter(
(messageId) => !messagesAdded.includes(messageId),
);
return {
messagesAdded,
messagesDeleted,
messagesAdded: uniqueMessagesAdded,
messagesDeleted: uniqueMessagesDeleted,
};
}
}

View File

@ -36,6 +36,16 @@ export class MessagingUtilsService {
) {
for (const message of messages) {
await workspaceDataSource?.transaction(async (manager) => {
const existingMessageChannelMessageAssociations = await manager.query(
`SELECT COUNT(*) FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation"
WHERE "messageExternalId" = $1 AND "messageChannelId" = $2`,
[message.externalId, gmailMessageChannelId],
);
if (existingMessageChannelMessageAssociations[0]?.count > 0) {
return;
}
const savedOrExistingMessageThreadId =
await this.saveMessageThreadOrReturnExistingMessageThread(
message.messageThreadExternalId,
@ -53,7 +63,7 @@ export class MessagingUtilsService {
);
await manager.query(
`INSERT INTO ${dataSourceMetadata.schema}."messageChannelMessage" ("messageChannelId", "messageId", "messageExternalId", "messageThreadId", "messageThreadExternalId") VALUES ($1, $2, $3, $4, $5)`,
`INSERT INTO ${dataSourceMetadata.schema}."messageChannelMessageAssociation" ("messageChannelId", "messageId", "messageExternalId", "messageThreadId", "messageThreadExternalId") VALUES ($1, $2, $3, $4, $5)`,
[
gmailMessageChannelId,
savedOrExistingMessageId,
@ -120,7 +130,7 @@ export class MessagingUtilsService {
workspaceDataSource: DataSource,
) {
const existingMessageThreads = await workspaceDataSource?.query(
`SELECT "messageChannelMessage"."messageThreadId" FROM ${dataSourceMetadata.schema}."messageChannelMessage" WHERE "messageThreadExternalId" = $1 LIMIT 1`,
`SELECT "messageChannelMessageAssociation"."messageThreadId" FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation" WHERE "messageThreadExternalId" = $1 LIMIT 1`,
[messageThreadExternalId],
);
@ -180,14 +190,14 @@ export class MessagingUtilsService {
}
}
public async deleteMessageChannelMessages(
public async deleteMessageChannelMessageAssociations(
messageExternalIds: string[],
connectedAccountId: string,
dataSourceMetadata: DataSourceEntity,
workspaceDataSource: DataSource,
) {
await workspaceDataSource?.query(
`DELETE FROM ${dataSourceMetadata.schema}."messageChannelMessage" WHERE "messageExternalId" = ANY($1) AND "messageChannelId" = $2`,
`DELETE FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation" WHERE "messageExternalId" = ANY($1) AND "messageChannelId" = $2`,
[messageExternalIds, connectedAccountId],
);
}
@ -265,4 +275,20 @@ export class MessagingUtilsService {
[historyId, connectedAccountId],
);
}
public async getMessageChannelMessageAssociations(
messageExternalIds: string[],
gmailMessageChannelId: string,
dataSourceMetadata: DataSourceEntity,
workspaceDataSource: DataSource,
) {
const existingMessageChannelMessageAssociation =
await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."messageChannelMessageAssociation"
WHERE "messageExternalId" = ANY($1) AND "messageChannelId" = $2`,
[messageExternalIds, gmailMessageChannelId],
);
return existingMessageChannelMessageAssociation;
}
}