diff --git a/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts b/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts index 67acbbdf3..a378502bf 100644 --- a/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts +++ b/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts @@ -174,9 +174,12 @@ export class MessageChannelSyncStatusService { }); } - public async markAsFailedUnknownAndFlushMessagesToImport( + public async markAsFailedAndFlushMessagesToImport( messageChannelIds: string[], workspaceId: string, + syncStatus: + | MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS + | MessageChannelSyncStatus.FAILED_UNKNOWN, ) { if (!messageChannelIds.length) { return; @@ -195,41 +198,16 @@ export class MessageChannelSyncStatusService { await messageChannelRepository.update(messageChannelIds, { syncStage: MessageChannelSyncStage.FAILED, - syncStatus: MessageChannelSyncStatus.FAILED_UNKNOWN, + syncStatus: syncStatus, }); + const metricsKey = + syncStatus === MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS + ? MetricsKeys.MessageChannelSyncJobFailedInsufficientPermissions + : MetricsKeys.MessageChannelSyncJobFailedUnknown; + await this.metricsService.batchIncrementCounter({ - key: MetricsKeys.MessageChannelSyncJobFailedUnknown, - eventIds: messageChannelIds, - }); - } - - public async markAsFailedInsufficientPermissionsAndFlushMessagesToImport( - messageChannelIds: string[], - workspaceId: string, - ) { - if (!messageChannelIds.length) { - return; - } - - for (const messageChannelId of messageChannelIds) { - await this.cacheStorage.del( - `messages-to-import:${workspaceId}:${messageChannelId}`, - ); - } - - const messageChannelRepository = - await this.twentyORMManager.getRepository( - 'messageChannel', - ); - - await messageChannelRepository.update(messageChannelIds, { - syncStage: MessageChannelSyncStage.FAILED, - syncStatus: MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS, - }); - - await this.metricsService.batchIncrementCounter({ - key: MetricsKeys.MessageChannelSyncJobFailedInsufficientPermissions, + key: metricsKey, eventIds: messageChannelIds, }); diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service.ts index 66f0eeaba..6b04eaee6 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service.ts @@ -3,7 +3,10 @@ import { Injectable } from '@nestjs/common'; import { ExceptionHandlerService } from 'src/engine/core-modules/exception-handler/exception-handler.service'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service'; -import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { + MessageChannelSyncStatus, + MessageChannelWorkspaceEntity, +} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MESSAGING_THROTTLE_MAX_ATTEMPTS } from 'src/modules/messaging/message-import-manager/constants/messaging-throttle-max-attempts'; import { MessageImportDriverException, @@ -90,9 +93,10 @@ export class MessageImportExceptionHandlerService { if ( messageChannel.throttleFailureCount >= MESSAGING_THROTTLE_MAX_ATTEMPTS ) { - await this.messageChannelSyncStatusService.markAsFailedUnknownAndFlushMessagesToImport( + await this.messageChannelSyncStatusService.markAsFailedAndFlushMessagesToImport( [messageChannel.id], workspaceId, + MessageChannelSyncStatus.FAILED_UNKNOWN, ); throw new MessageImportException( `Unknown temporary error occurred multiple times while importing messages for message channel ${messageChannel.id} in workspace ${workspaceId}`, @@ -139,9 +143,10 @@ export class MessageImportExceptionHandlerService { messageChannel: Pick, workspaceId: string, ): Promise { - await this.messageChannelSyncStatusService.markAsFailedInsufficientPermissionsAndFlushMessagesToImport( + await this.messageChannelSyncStatusService.markAsFailedAndFlushMessagesToImport( [messageChannel.id], workspaceId, + MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS, ); } @@ -150,9 +155,10 @@ export class MessageImportExceptionHandlerService { messageChannel: Pick, workspaceId: string, ): Promise { - await this.messageChannelSyncStatusService.markAsFailedUnknownAndFlushMessagesToImport( + await this.messageChannelSyncStatusService.markAsFailedAndFlushMessagesToImport( [messageChannel.id], workspaceId, + MessageChannelSyncStatus.FAILED_UNKNOWN, ); this.exceptionHandlerService.captureExceptions([ @@ -170,9 +176,10 @@ export class MessageImportExceptionHandlerService { messageChannel: Pick, workspaceId: string, ): Promise { - await this.messageChannelSyncStatusService.markAsFailedUnknownAndFlushMessagesToImport( + await this.messageChannelSyncStatusService.markAsFailedAndFlushMessagesToImport( [messageChannel.id], workspaceId, + MessageChannelSyncStatus.FAILED_UNKNOWN, ); throw new MessageImportException(