From 6a5f9492d3d9b0dadfe5ac038a83c605e66d328c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Bosi?= <71827178+bosiraphael@users.noreply.github.com> Date: Thu, 19 Sep 2024 18:32:25 +0200 Subject: [PATCH] 7142 make messaging full message list fetch idempotent (#7148) - Add message deletion and thread cleaning during full message list fetch - Add thread cleaning during partial message list fetch - Delete provider from cache key --- .../calendar-channel-sync-status.service.ts | 6 ++-- .../message-channel-sync-status.service.ts | 6 ++-- ...-single-message-to-cache-for-import.job.ts | 2 +- .../jobs/messaging-clean-cache.ts | 2 +- .../messaging-import-manager.module.ts | 2 ++ ...ssaging-full-message-list-fetch.service.ts | 31 +++++++++++++++---- .../messaging-messages-import.service.ts | 4 +-- ...ging-partial-message-list-fetch.service.ts | 20 ++++++++---- 8 files changed, 51 insertions(+), 22 deletions(-) diff --git a/packages/twenty-server/src/modules/calendar/common/services/calendar-channel-sync-status.service.ts b/packages/twenty-server/src/modules/calendar/common/services/calendar-channel-sync-status.service.ts index c01a055de..4940c1db0 100644 --- a/packages/twenty-server/src/modules/calendar/common/services/calendar-channel-sync-status.service.ts +++ b/packages/twenty-server/src/modules/calendar/common/services/calendar-channel-sync-status.service.ts @@ -89,7 +89,7 @@ export class CalendarChannelSyncStatusService { for (const calendarChannelId of calendarChannelIds) { await this.cacheStorage.del( - `calendar-events-to-import:${workspaceId}:google-calendar:${calendarChannelId}`, + `calendar-events-to-import:${workspaceId}:${calendarChannelId}`, ); } @@ -191,7 +191,7 @@ export class CalendarChannelSyncStatusService { for (const calendarChannelId of calendarChannelIds) { await this.cacheStorage.del( - `calendar-events-to-import:${workspaceId}:google-calendar:${calendarChannelId}`, + `calendar-events-to-import:${workspaceId}:${calendarChannelId}`, ); } @@ -216,7 +216,7 @@ export class CalendarChannelSyncStatusService { for (const calendarChannelId of calendarChannelIds) { await this.cacheStorage.del( - `calendar-events-to-import:${workspaceId}:google-calendar:${calendarChannelId}`, + `calendar-events-to-import:${workspaceId}:${calendarChannelId}`, ); } await calendarChannelRepository.update(calendarChannelIds, { diff --git a/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts b/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts index 02d6045b8..19e6746ce 100644 --- a/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts +++ b/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts @@ -79,7 +79,7 @@ export class MessageChannelSyncStatusService { for (const messageChannelId of messageChannelIds) { await this.cacheStorage.del( - `messages-to-import:${workspaceId}:gmail:${messageChannelId}`, + `messages-to-import:${workspaceId}:${messageChannelId}`, ); } @@ -174,7 +174,7 @@ export class MessageChannelSyncStatusService { for (const messageChannelId of messageChannelIds) { await this.cacheStorage.del( - `messages-to-import:${workspaceId}:gmail:${messageChannelId}`, + `messages-to-import:${workspaceId}:${messageChannelId}`, ); } @@ -199,7 +199,7 @@ export class MessageChannelSyncStatusService { for (const messageChannelId of messageChannelIds) { await this.cacheStorage.del( - `messages-to-import:${workspaceId}:gmail:${messageChannelId}`, + `messages-to-import:${workspaceId}:${messageChannelId}`, ); } diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-add-single-message-to-cache-for-import.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-add-single-message-to-cache-for-import.job.ts index 7780ac0e0..79c5242b9 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-add-single-message-to-cache-for-import.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-add-single-message-to-cache-for-import.job.ts @@ -25,7 +25,7 @@ export class MessagingAddSingleMessageToCacheForImportJob { const { messageExternalId, messageChannelId, workspaceId } = data; await this.cacheStorage.setAdd( - `messages-to-import:${workspaceId}:gmail:${messageChannelId}`, + `messages-to-import:${workspaceId}:${messageChannelId}`, [messageExternalId], ); } diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-clean-cache.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-clean-cache.ts index 766ff4a94..edb32a3fd 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-clean-cache.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-clean-cache.ts @@ -28,7 +28,7 @@ export class MessagingCleanCacheJob { ); await this.cacheStorage.del( - `messages-to-import:${data.workspaceId}:gmail:${data.messageChannelId}`, + `messages-to-import:${data.workspaceId}:${data.messageChannelId}`, ); this.logger.log( diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts index b9042cd2f..c13c9485c 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts @@ -9,6 +9,7 @@ import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/works import { EmailAliasManagerModule } from 'src/modules/connected-account/email-alias-manager/email-alias-manager.module'; import { RefreshAccessTokenManagerModule } from 'src/modules/connected-account/refresh-access-token-manager/refresh-access-token-manager.module'; import { MessagingCommonModule } from 'src/modules/messaging/common/messaging-common.module'; +import { MessagingMessageCleanerModule } from 'src/modules/messaging/message-cleaner/messaging-message-cleaner.module'; import { MessagingSingleMessageImportCommand } from 'src/modules/messaging/message-import-manager/commands/messaging-single-message-import.command'; import { MessagingMessageListFetchCronCommand } from 'src/modules/messaging/message-import-manager/crons/commands/messaging-message-list-fetch.cron.command'; import { MessagingMessagesImportCronCommand } from 'src/modules/messaging/message-import-manager/crons/commands/messaging-messages-import.cron.command'; @@ -47,6 +48,7 @@ import { MessagingMonitoringModule } from 'src/modules/messaging/monitoring/mess FeatureFlagModule, MessageParticipantManagerModule, MessagingMonitoringModule, + MessagingMessageCleanerModule, ], providers: [ MessagingMessageListFetchCronCommand, diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts index fd10c3e20..8b9072bbe 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts @@ -1,6 +1,6 @@ import { Injectable } from '@nestjs/common'; -import { Any } from 'typeorm'; +import { In } from 'typeorm'; import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decorators/cache-storage.decorator'; import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service'; @@ -10,6 +10,7 @@ import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/s import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service'; import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { MessagingMessageCleanerService } from 'src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service'; import { MessageImportExceptionHandlerService, MessageImportSyncStep, @@ -25,6 +26,7 @@ export class MessagingFullMessageListFetchService { private readonly twentyORMManager: TwentyORMManager, private readonly messagingGetMessageListService: MessagingGetMessageListService, private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService, + private readonly messagingMessageCleanerService: MessagingMessageCleanerService, ) {} public async processMessageListFetch( @@ -51,7 +53,6 @@ export class MessagingFullMessageListFetchService { await messageChannelMessageAssociationRepository.find({ where: { messageChannelId: messageChannel.id, - messageExternalId: Any(messageExternalIds), }, }); @@ -61,17 +62,35 @@ export class MessagingFullMessageListFetchService { messageChannelMessageAssociation.messageExternalId, ); - const messageIdsToImport = messageExternalIds.filter( + const messageExternalIdsToImport = messageExternalIds.filter( (messageExternalId) => !existingMessageChannelMessageAssociationsExternalIds.includes( messageExternalId, ), ); - if (messageIdsToImport.length) { + const messageExternalIdsToDelete = + existingMessageChannelMessageAssociationsExternalIds.filter( + (existingMessageCMAExternalId) => + existingMessageCMAExternalId && + !messageExternalIds.includes(existingMessageCMAExternalId), + ); + + if (messageExternalIdsToDelete.length) { + await messageChannelMessageAssociationRepository.delete({ + messageChannelId: messageChannel.id, + messageExternalId: In(messageExternalIdsToDelete), + }); + + await this.messagingMessageCleanerService.cleanWorkspaceThreads( + workspaceId, + ); + } + + if (messageExternalIdsToImport.length) { await this.cacheStorage.setAdd( - `messages-to-import:${workspaceId}:gmail:${messageChannel.id}`, - messageIdsToImport, + `messages-to-import:${workspaceId}:${messageChannel.id}`, + messageExternalIdsToImport, ); } diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts index 2714f7dc8..601744100 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts @@ -114,7 +114,7 @@ export class MessagingMessagesImportService { ); messageIdsToFetch = await this.cacheStorage.setPop( - `messages-to-import:${workspaceId}:gmail:${messageChannel.id}`, + `messages-to-import:${workspaceId}:${messageChannel.id}`, MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE, ); @@ -186,7 +186,7 @@ export class MessagingMessagesImportService { ); } catch (error) { await this.cacheStorage.setAdd( - `messages-to-import:${workspaceId}:gmail:${messageChannel.id}`, + `messages-to-import:${workspaceId}:${messageChannel.id}`, messageIdsToFetch, ); diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts index b41219314..bdf138950 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts @@ -1,6 +1,6 @@ import { Injectable, Logger } from '@nestjs/common'; -import { Any } from 'typeorm'; +import { In } from 'typeorm'; import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decorators/cache-storage.decorator'; import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service'; @@ -10,6 +10,7 @@ import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/s import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service'; import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { MessagingMessageCleanerService } from 'src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service'; import { MessageImportExceptionHandlerService, MessageImportSyncStep, @@ -29,6 +30,7 @@ export class MessagingPartialMessageListFetchService { private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService, private readonly twentyORMManager: TwentyORMManager, private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService, + private readonly messagingMessageCleanerService: MessagingMessageCleanerService, ) {} public async processMessageListFetch( @@ -77,7 +79,7 @@ export class MessagingPartialMessageListFetchService { } await this.cacheStorage.setAdd( - `messages-to-import:${workspaceId}:gmail:${messageChannel.id}`, + `messages-to-import:${workspaceId}:${messageChannel.id}`, messageExternalIds, ); @@ -90,10 +92,16 @@ export class MessagingPartialMessageListFetchService { 'messageChannelMessageAssociation', ); - await messageChannelMessageAssociationRepository.delete({ - messageChannelId: messageChannel.id, - messageExternalId: Any(messageExternalIdsToDelete), - }); + if (messageExternalIdsToDelete.length) { + await messageChannelMessageAssociationRepository.delete({ + messageChannelId: messageChannel.id, + messageExternalId: In(messageExternalIdsToDelete), + }); + + await this.messagingMessageCleanerService.cleanWorkspaceThreads( + workspaceId, + ); + } this.logger.log( `Deleted ${messageExternalIdsToDelete.length} messages for workspace ${workspaceId} and account ${connectedAccount.id}`,