7142 make messaging full message list fetch idempotent (#7148)

- Add message deletion and thread cleaning during full message list
fetch
- Add thread cleaning  during partial message list fetch
- Delete provider from cache key
This commit is contained in:
Raphaël Bosi
2024-09-19 18:32:25 +02:00
committed by GitHub
parent 8964d26d5b
commit 6a5f9492d3
8 changed files with 51 additions and 22 deletions

View File

@ -89,7 +89,7 @@ export class CalendarChannelSyncStatusService {
for (const calendarChannelId of calendarChannelIds) { for (const calendarChannelId of calendarChannelIds) {
await this.cacheStorage.del( await this.cacheStorage.del(
`calendar-events-to-import:${workspaceId}:google-calendar:${calendarChannelId}`, `calendar-events-to-import:${workspaceId}:${calendarChannelId}`,
); );
} }
@ -191,7 +191,7 @@ export class CalendarChannelSyncStatusService {
for (const calendarChannelId of calendarChannelIds) { for (const calendarChannelId of calendarChannelIds) {
await this.cacheStorage.del( await this.cacheStorage.del(
`calendar-events-to-import:${workspaceId}:google-calendar:${calendarChannelId}`, `calendar-events-to-import:${workspaceId}:${calendarChannelId}`,
); );
} }
@ -216,7 +216,7 @@ export class CalendarChannelSyncStatusService {
for (const calendarChannelId of calendarChannelIds) { for (const calendarChannelId of calendarChannelIds) {
await this.cacheStorage.del( await this.cacheStorage.del(
`calendar-events-to-import:${workspaceId}:google-calendar:${calendarChannelId}`, `calendar-events-to-import:${workspaceId}:${calendarChannelId}`,
); );
} }
await calendarChannelRepository.update(calendarChannelIds, { await calendarChannelRepository.update(calendarChannelIds, {

View File

@ -79,7 +79,7 @@ export class MessageChannelSyncStatusService {
for (const messageChannelId of messageChannelIds) { for (const messageChannelId of messageChannelIds) {
await this.cacheStorage.del( await this.cacheStorage.del(
`messages-to-import:${workspaceId}:gmail:${messageChannelId}`, `messages-to-import:${workspaceId}:${messageChannelId}`,
); );
} }
@ -174,7 +174,7 @@ export class MessageChannelSyncStatusService {
for (const messageChannelId of messageChannelIds) { for (const messageChannelId of messageChannelIds) {
await this.cacheStorage.del( await this.cacheStorage.del(
`messages-to-import:${workspaceId}:gmail:${messageChannelId}`, `messages-to-import:${workspaceId}:${messageChannelId}`,
); );
} }
@ -199,7 +199,7 @@ export class MessageChannelSyncStatusService {
for (const messageChannelId of messageChannelIds) { for (const messageChannelId of messageChannelIds) {
await this.cacheStorage.del( await this.cacheStorage.del(
`messages-to-import:${workspaceId}:gmail:${messageChannelId}`, `messages-to-import:${workspaceId}:${messageChannelId}`,
); );
} }

View File

@ -25,7 +25,7 @@ export class MessagingAddSingleMessageToCacheForImportJob {
const { messageExternalId, messageChannelId, workspaceId } = data; const { messageExternalId, messageChannelId, workspaceId } = data;
await this.cacheStorage.setAdd( await this.cacheStorage.setAdd(
`messages-to-import:${workspaceId}:gmail:${messageChannelId}`, `messages-to-import:${workspaceId}:${messageChannelId}`,
[messageExternalId], [messageExternalId],
); );
} }

View File

@ -28,7 +28,7 @@ export class MessagingCleanCacheJob {
); );
await this.cacheStorage.del( await this.cacheStorage.del(
`messages-to-import:${data.workspaceId}:gmail:${data.messageChannelId}`, `messages-to-import:${data.workspaceId}:${data.messageChannelId}`,
); );
this.logger.log( this.logger.log(

View File

@ -9,6 +9,7 @@ import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/works
import { EmailAliasManagerModule } from 'src/modules/connected-account/email-alias-manager/email-alias-manager.module'; import { EmailAliasManagerModule } from 'src/modules/connected-account/email-alias-manager/email-alias-manager.module';
import { RefreshAccessTokenManagerModule } from 'src/modules/connected-account/refresh-access-token-manager/refresh-access-token-manager.module'; import { RefreshAccessTokenManagerModule } from 'src/modules/connected-account/refresh-access-token-manager/refresh-access-token-manager.module';
import { MessagingCommonModule } from 'src/modules/messaging/common/messaging-common.module'; import { MessagingCommonModule } from 'src/modules/messaging/common/messaging-common.module';
import { MessagingMessageCleanerModule } from 'src/modules/messaging/message-cleaner/messaging-message-cleaner.module';
import { MessagingSingleMessageImportCommand } from 'src/modules/messaging/message-import-manager/commands/messaging-single-message-import.command'; import { MessagingSingleMessageImportCommand } from 'src/modules/messaging/message-import-manager/commands/messaging-single-message-import.command';
import { MessagingMessageListFetchCronCommand } from 'src/modules/messaging/message-import-manager/crons/commands/messaging-message-list-fetch.cron.command'; import { MessagingMessageListFetchCronCommand } from 'src/modules/messaging/message-import-manager/crons/commands/messaging-message-list-fetch.cron.command';
import { MessagingMessagesImportCronCommand } from 'src/modules/messaging/message-import-manager/crons/commands/messaging-messages-import.cron.command'; import { MessagingMessagesImportCronCommand } from 'src/modules/messaging/message-import-manager/crons/commands/messaging-messages-import.cron.command';
@ -47,6 +48,7 @@ import { MessagingMonitoringModule } from 'src/modules/messaging/monitoring/mess
FeatureFlagModule, FeatureFlagModule,
MessageParticipantManagerModule, MessageParticipantManagerModule,
MessagingMonitoringModule, MessagingMonitoringModule,
MessagingMessageCleanerModule,
], ],
providers: [ providers: [
MessagingMessageListFetchCronCommand, MessagingMessageListFetchCronCommand,

View File

@ -1,6 +1,6 @@
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { Any } from 'typeorm'; import { In } from 'typeorm';
import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decorators/cache-storage.decorator'; import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decorators/cache-storage.decorator';
import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service'; import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service';
@ -10,6 +10,7 @@ import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/s
import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service'; import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service';
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MessagingMessageCleanerService } from 'src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service';
import { import {
MessageImportExceptionHandlerService, MessageImportExceptionHandlerService,
MessageImportSyncStep, MessageImportSyncStep,
@ -25,6 +26,7 @@ export class MessagingFullMessageListFetchService {
private readonly twentyORMManager: TwentyORMManager, private readonly twentyORMManager: TwentyORMManager,
private readonly messagingGetMessageListService: MessagingGetMessageListService, private readonly messagingGetMessageListService: MessagingGetMessageListService,
private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService, private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService,
private readonly messagingMessageCleanerService: MessagingMessageCleanerService,
) {} ) {}
public async processMessageListFetch( public async processMessageListFetch(
@ -51,7 +53,6 @@ export class MessagingFullMessageListFetchService {
await messageChannelMessageAssociationRepository.find({ await messageChannelMessageAssociationRepository.find({
where: { where: {
messageChannelId: messageChannel.id, messageChannelId: messageChannel.id,
messageExternalId: Any(messageExternalIds),
}, },
}); });
@ -61,17 +62,35 @@ export class MessagingFullMessageListFetchService {
messageChannelMessageAssociation.messageExternalId, messageChannelMessageAssociation.messageExternalId,
); );
const messageIdsToImport = messageExternalIds.filter( const messageExternalIdsToImport = messageExternalIds.filter(
(messageExternalId) => (messageExternalId) =>
!existingMessageChannelMessageAssociationsExternalIds.includes( !existingMessageChannelMessageAssociationsExternalIds.includes(
messageExternalId, messageExternalId,
), ),
); );
if (messageIdsToImport.length) { const messageExternalIdsToDelete =
existingMessageChannelMessageAssociationsExternalIds.filter(
(existingMessageCMAExternalId) =>
existingMessageCMAExternalId &&
!messageExternalIds.includes(existingMessageCMAExternalId),
);
if (messageExternalIdsToDelete.length) {
await messageChannelMessageAssociationRepository.delete({
messageChannelId: messageChannel.id,
messageExternalId: In(messageExternalIdsToDelete),
});
await this.messagingMessageCleanerService.cleanWorkspaceThreads(
workspaceId,
);
}
if (messageExternalIdsToImport.length) {
await this.cacheStorage.setAdd( await this.cacheStorage.setAdd(
`messages-to-import:${workspaceId}:gmail:${messageChannel.id}`, `messages-to-import:${workspaceId}:${messageChannel.id}`,
messageIdsToImport, messageExternalIdsToImport,
); );
} }

View File

@ -114,7 +114,7 @@ export class MessagingMessagesImportService {
); );
messageIdsToFetch = await this.cacheStorage.setPop( messageIdsToFetch = await this.cacheStorage.setPop(
`messages-to-import:${workspaceId}:gmail:${messageChannel.id}`, `messages-to-import:${workspaceId}:${messageChannel.id}`,
MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE, MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE,
); );
@ -186,7 +186,7 @@ export class MessagingMessagesImportService {
); );
} catch (error) { } catch (error) {
await this.cacheStorage.setAdd( await this.cacheStorage.setAdd(
`messages-to-import:${workspaceId}:gmail:${messageChannel.id}`, `messages-to-import:${workspaceId}:${messageChannel.id}`,
messageIdsToFetch, messageIdsToFetch,
); );

View File

@ -1,6 +1,6 @@
import { Injectable, Logger } from '@nestjs/common'; import { Injectable, Logger } from '@nestjs/common';
import { Any } from 'typeorm'; import { In } from 'typeorm';
import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decorators/cache-storage.decorator'; import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decorators/cache-storage.decorator';
import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service'; import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service';
@ -10,6 +10,7 @@ import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/s
import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service'; import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service';
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MessagingMessageCleanerService } from 'src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service';
import { import {
MessageImportExceptionHandlerService, MessageImportExceptionHandlerService,
MessageImportSyncStep, MessageImportSyncStep,
@ -29,6 +30,7 @@ export class MessagingPartialMessageListFetchService {
private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService, private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService,
private readonly twentyORMManager: TwentyORMManager, private readonly twentyORMManager: TwentyORMManager,
private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService, private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService,
private readonly messagingMessageCleanerService: MessagingMessageCleanerService,
) {} ) {}
public async processMessageListFetch( public async processMessageListFetch(
@ -77,7 +79,7 @@ export class MessagingPartialMessageListFetchService {
} }
await this.cacheStorage.setAdd( await this.cacheStorage.setAdd(
`messages-to-import:${workspaceId}:gmail:${messageChannel.id}`, `messages-to-import:${workspaceId}:${messageChannel.id}`,
messageExternalIds, messageExternalIds,
); );
@ -90,10 +92,16 @@ export class MessagingPartialMessageListFetchService {
'messageChannelMessageAssociation', 'messageChannelMessageAssociation',
); );
await messageChannelMessageAssociationRepository.delete({ if (messageExternalIdsToDelete.length) {
messageChannelId: messageChannel.id, await messageChannelMessageAssociationRepository.delete({
messageExternalId: Any(messageExternalIdsToDelete), messageChannelId: messageChannel.id,
}); messageExternalId: In(messageExternalIdsToDelete),
});
await this.messagingMessageCleanerService.cleanWorkspaceThreads(
workspaceId,
);
}
this.logger.log( this.logger.log(
`Deleted ${messageExternalIdsToDelete.length} messages for workspace ${workspaceId} and account ${connectedAccount.id}`, `Deleted ${messageExternalIdsToDelete.length} messages for workspace ${workspaceId} and account ${connectedAccount.id}`,