From 55a45c50cc0c2e4e1e83d22dee3c79cd8f5a73ca Mon Sep 17 00:00:00 2001 From: Guillim Date: Wed, 5 Mar 2025 16:22:51 +0100 Subject: [PATCH] microsoft sync failed (#10381) This PR is supposed to solve an issue with the syncrhonisation of messages, specifically with microsoft driver. Microsoft calls don't need access_Token so refreshing toekns was not implemented. However, microsoft rely on its client which calls its refresfh_token, and I might have missed some underlying dependency from microsoft impelemtation so I setup the access token process to refresh it Needs a talk before to be merged Fix : https://github.com/twentyhq/twenty/issues/10367 EDIT: it was a problem with microsoft making refreshtoken expire (contrarily to google) which needs to be handled. --- .../calendar-event-import-manager.module.ts | 4 +- .../refresh-access-token-manager.module.ts | 11 -- .../services/refresh-access-token.service.ts | 83 --------- ...d-account-refresh-tokens-manager.module.ts | 15 ++ .../google-api-refresh-access-token.module.ts | 2 +- ...google-api-refresh-access-token.service.ts | 22 ++- ...crosoft-api-refresh-access-token.module.ts | 11 ++ .../microsoft-api-refresh-tokens.service.ts | 53 ++++++ ...ected-account-refresh-tokens.exception.ts} | 9 +- ...onnected-account-refresh-tokens.service.ts | 96 +++++++++++ .../jobs/messaging-message-list-fetch.job.ts | 160 ++++++++++++------ .../messaging-import-manager.module.ts | 6 +- ...ssaging-full-message-list-fetch.service.ts | 6 +- ...aging-import-exception-handler.service.ts} | 1 + .../messaging-messages-import.service.ts | 18 +- ...ging-partial-message-list-fetch.service.ts | 6 +- .../src/utils/assertUnreachable.ts | 3 + packages/twenty-shared/src/utils/index.ts | 2 + 18 files changed, 332 insertions(+), 176 deletions(-) delete mode 100644 packages/twenty-server/src/modules/connected-account/refresh-access-token-manager/refresh-access-token-manager.module.ts delete mode 100644 packages/twenty-server/src/modules/connected-account/refresh-access-token-manager/services/refresh-access-token.service.ts create mode 100644 packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/connected-account-refresh-tokens-manager.module.ts rename packages/twenty-server/src/modules/connected-account/{refresh-access-token-manager => refresh-tokens-manager}/drivers/google/google-api-refresh-access-token.module.ts (77%) rename packages/twenty-server/src/modules/connected-account/{refresh-access-token-manager => refresh-tokens-manager}/drivers/google/services/google-api-refresh-access-token.service.ts (58%) create mode 100644 packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/drivers/microsoft/microsoft-api-refresh-access-token.module.ts create mode 100644 packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/drivers/microsoft/services/microsoft-api-refresh-tokens.service.ts rename packages/twenty-server/src/modules/connected-account/{refresh-access-token-manager/exceptions/refresh-access-token.exception.ts => refresh-tokens-manager/exceptions/connected-account-refresh-tokens.exception.ts} (52%) create mode 100644 packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/services/connected-account-refresh-tokens.service.ts rename packages/twenty-server/src/modules/messaging/message-import-manager/services/{message-import-exception-handler.service.ts => messaging-import-exception-handler.service.ts} (98%) create mode 100644 packages/twenty-shared/src/utils/assertUnreachable.ts diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/calendar-event-import-manager.module.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/calendar-event-import-manager.module.ts index 0f667762b..40af5bdaa 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/calendar-event-import-manager.module.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/calendar-event-import-manager.module.ts @@ -30,7 +30,7 @@ import { CalendarEventParticipantManagerModule } from 'src/modules/calendar/cale import { CalendarCommonModule } from 'src/modules/calendar/common/calendar-common.module'; import { CalendarChannelSyncStatusService } from 'src/modules/calendar/common/services/calendar-channel-sync-status.service'; import { ConnectedAccountModule } from 'src/modules/connected-account/connected-account.module'; -import { RefreshAccessTokenManagerModule } from 'src/modules/connected-account/refresh-access-token-manager/refresh-access-token-manager.module'; +import { RefreshTokensManagerModule } from 'src/modules/connected-account/refresh-tokens-manager/connected-account-refresh-tokens-manager.module'; import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity'; @Module({ @@ -47,7 +47,7 @@ import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/sta GoogleCalendarDriverModule, MicrosoftCalendarDriverModule, BillingModule, - RefreshAccessTokenManagerModule, + RefreshTokensManagerModule, ConnectedAccountModule, CalendarCommonModule, HealthModule, diff --git a/packages/twenty-server/src/modules/connected-account/refresh-access-token-manager/refresh-access-token-manager.module.ts b/packages/twenty-server/src/modules/connected-account/refresh-access-token-manager/refresh-access-token-manager.module.ts deleted file mode 100644 index e8eb170b5..000000000 --- a/packages/twenty-server/src/modules/connected-account/refresh-access-token-manager/refresh-access-token-manager.module.ts +++ /dev/null @@ -1,11 +0,0 @@ -import { Module } from '@nestjs/common'; - -import { GoogleAPIRefreshAccessTokenModule } from 'src/modules/connected-account/refresh-access-token-manager/drivers/google/google-api-refresh-access-token.module'; -import { RefreshAccessTokenService } from 'src/modules/connected-account/refresh-access-token-manager/services/refresh-access-token.service'; - -@Module({ - imports: [GoogleAPIRefreshAccessTokenModule], - providers: [RefreshAccessTokenService], - exports: [RefreshAccessTokenService], -}) -export class RefreshAccessTokenManagerModule {} diff --git a/packages/twenty-server/src/modules/connected-account/refresh-access-token-manager/services/refresh-access-token.service.ts b/packages/twenty-server/src/modules/connected-account/refresh-access-token-manager/services/refresh-access-token.service.ts deleted file mode 100644 index 1e2cc7306..000000000 --- a/packages/twenty-server/src/modules/connected-account/refresh-access-token-manager/services/refresh-access-token.service.ts +++ /dev/null @@ -1,83 +0,0 @@ -import { Injectable } from '@nestjs/common'; - -import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; -import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/refresh-access-token-manager/drivers/google/services/google-api-refresh-access-token.service'; -import { - RefreshAccessTokenException, - RefreshAccessTokenExceptionCode, -} from 'src/modules/connected-account/refresh-access-token-manager/exceptions/refresh-access-token.exception'; -import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; - -@Injectable() -export class RefreshAccessTokenService { - constructor( - private readonly googleAPIRefreshAccessTokenService: GoogleAPIRefreshAccessTokenService, - private readonly twentyORMManager: TwentyORMManager, - ) {} - - async refreshAndSaveAccessToken( - connectedAccount: ConnectedAccountWorkspaceEntity, - workspaceId: string, - ): Promise { - const refreshToken = connectedAccount.refreshToken; - let accessToken: string; - - if (!refreshToken) { - throw new RefreshAccessTokenException( - `No refresh token found for connected account ${connectedAccount.id} in workspace ${workspaceId}`, - RefreshAccessTokenExceptionCode.REFRESH_TOKEN_NOT_FOUND, - ); - } - - switch (connectedAccount.provider) { - case 'microsoft': - return ''; - case 'google': { - try { - accessToken = await this.refreshAccessToken( - connectedAccount, - refreshToken, - ); - } catch (error) { - throw new RefreshAccessTokenException( - `Error refreshing access token for connected account ${connectedAccount.id} in workspace ${workspaceId}: ${error.message}`, - RefreshAccessTokenExceptionCode.REFRESH_ACCESS_TOKEN_FAILED, - ); - } - - const connectedAccountRepository = - await this.twentyORMManager.getRepository( - 'connectedAccount', - ); - - await connectedAccountRepository.update( - { id: connectedAccount.id }, - { - accessToken, - }, - ); - - return accessToken; - } - default: - throw new Error('Provider not supported for access token refresh'); - } - } - - async refreshAccessToken( - connectedAccount: ConnectedAccountWorkspaceEntity, - refreshToken: string, - ): Promise { - switch (connectedAccount.provider) { - case 'google': - return this.googleAPIRefreshAccessTokenService.refreshAccessToken( - refreshToken, - ); - default: - throw new RefreshAccessTokenException( - `Provider ${connectedAccount.provider} is not supported`, - RefreshAccessTokenExceptionCode.PROVIDER_NOT_SUPPORTED, - ); - } - } -} diff --git a/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/connected-account-refresh-tokens-manager.module.ts b/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/connected-account-refresh-tokens-manager.module.ts new file mode 100644 index 000000000..8165f89d8 --- /dev/null +++ b/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/connected-account-refresh-tokens-manager.module.ts @@ -0,0 +1,15 @@ +import { Module } from '@nestjs/common'; + +import { GoogleAPIRefreshAccessTokenModule } from 'src/modules/connected-account/refresh-tokens-manager/drivers/google/google-api-refresh-access-token.module'; +import { MicrosoftAPIRefreshAccessTokenModule } from 'src/modules/connected-account/refresh-tokens-manager/drivers/microsoft/microsoft-api-refresh-access-token.module'; +import { ConnectedAccountRefreshTokensService } from 'src/modules/connected-account/refresh-tokens-manager/services/connected-account-refresh-tokens.service'; + +@Module({ + imports: [ + GoogleAPIRefreshAccessTokenModule, + MicrosoftAPIRefreshAccessTokenModule, + ], + providers: [ConnectedAccountRefreshTokensService], + exports: [ConnectedAccountRefreshTokensService], +}) +export class RefreshTokensManagerModule {} diff --git a/packages/twenty-server/src/modules/connected-account/refresh-access-token-manager/drivers/google/google-api-refresh-access-token.module.ts b/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/drivers/google/google-api-refresh-access-token.module.ts similarity index 77% rename from packages/twenty-server/src/modules/connected-account/refresh-access-token-manager/drivers/google/google-api-refresh-access-token.module.ts rename to packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/drivers/google/google-api-refresh-access-token.module.ts index 5c8308ec7..7bf8a7753 100644 --- a/packages/twenty-server/src/modules/connected-account/refresh-access-token-manager/drivers/google/google-api-refresh-access-token.module.ts +++ b/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/drivers/google/google-api-refresh-access-token.module.ts @@ -1,6 +1,6 @@ import { Module } from '@nestjs/common'; -import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/refresh-access-token-manager/drivers/google/services/google-api-refresh-access-token.service'; +import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/refresh-tokens-manager/drivers/google/services/google-api-refresh-access-token.service'; import { MessagingCommonModule } from 'src/modules/messaging/common/messaging-common.module'; @Module({ diff --git a/packages/twenty-server/src/modules/connected-account/refresh-access-token-manager/drivers/google/services/google-api-refresh-access-token.service.ts b/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/drivers/google/services/google-api-refresh-access-token.service.ts similarity index 58% rename from packages/twenty-server/src/modules/connected-account/refresh-access-token-manager/drivers/google/services/google-api-refresh-access-token.service.ts rename to packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/drivers/google/services/google-api-refresh-access-token.service.ts index 0afb78d71..7de5f4b00 100644 --- a/packages/twenty-server/src/modules/connected-account/refresh-access-token-manager/drivers/google/services/google-api-refresh-access-token.service.ts +++ b/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/drivers/google/services/google-api-refresh-access-token.service.ts @@ -1,15 +1,27 @@ import { Injectable } from '@nestjs/common'; import axios from 'axios'; +import { z } from 'zod'; import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service'; +export type GoogleTokens = { + accessToken: string; +}; + +interface GoogleRefreshTokenResponse { + access_token: string; + id_token?: string; + token_type?: string; + expires_in?: number; + scope?: string; +} @Injectable() export class GoogleAPIRefreshAccessTokenService { constructor(private readonly environmentService: EnvironmentService) {} - async refreshAccessToken(refreshToken: string): Promise { - const response = await axios.post( + async refreshAccessToken(refreshToken: string): Promise { + const response = await axios.post( 'https://oauth2.googleapis.com/token', { client_id: this.environmentService.get('AUTH_GOOGLE_CLIENT_ID'), @@ -24,6 +36,10 @@ export class GoogleAPIRefreshAccessTokenService { }, ); - return response.data.access_token; + z.string().parse(response.data.access_token); + + return { + accessToken: response.data.access_token, + }; } } diff --git a/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/drivers/microsoft/microsoft-api-refresh-access-token.module.ts b/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/drivers/microsoft/microsoft-api-refresh-access-token.module.ts new file mode 100644 index 000000000..241aa8435 --- /dev/null +++ b/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/drivers/microsoft/microsoft-api-refresh-access-token.module.ts @@ -0,0 +1,11 @@ +import { Module } from '@nestjs/common'; + +import { EnvironmentModule } from 'src/engine/core-modules/environment/environment.module'; +import { MicrosoftAPIRefreshAccessTokenService } from 'src/modules/connected-account/refresh-tokens-manager/drivers/microsoft/services/microsoft-api-refresh-tokens.service'; + +@Module({ + imports: [EnvironmentModule], + providers: [MicrosoftAPIRefreshAccessTokenService], + exports: [MicrosoftAPIRefreshAccessTokenService], +}) +export class MicrosoftAPIRefreshAccessTokenModule {} diff --git a/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/drivers/microsoft/services/microsoft-api-refresh-tokens.service.ts b/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/drivers/microsoft/services/microsoft-api-refresh-tokens.service.ts new file mode 100644 index 000000000..0d7116127 --- /dev/null +++ b/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/drivers/microsoft/services/microsoft-api-refresh-tokens.service.ts @@ -0,0 +1,53 @@ +import { Injectable } from '@nestjs/common'; + +import axios from 'axios'; +import { z } from 'zod'; + +import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service'; + +export type MicrosoftTokens = { + accessToken: string; + refreshToken: string; +}; + +interface MicrosoftRefreshTokenResponse { + access_token: string; + refresh_token: string; + scope: string; + token_type: string; + expires_in: number; + id_token?: string; +} +@Injectable() +export class MicrosoftAPIRefreshAccessTokenService { + constructor(private readonly environmentService: EnvironmentService) {} + + async refreshTokens(refreshToken: string): Promise { + const response = await axios.post( + 'https://login.microsoftonline.com/common/oauth2/v2.0/token', + new URLSearchParams({ + client_id: this.environmentService.get('AUTH_MICROSOFT_CLIENT_ID'), + client_secret: this.environmentService.get( + 'AUTH_MICROSOFT_CLIENT_SECRET', + ), + refresh_token: refreshToken, + grant_type: 'refresh_token', + }), + { + headers: { + 'Content-Type': 'application/x-www-form-urlencoded', + }, + }, + ); + + z.object({ + access_token: z.string(), + refresh_token: z.string(), + }).parse(response.data); + + return { + accessToken: response.data.access_token, + refreshToken: response.data.refresh_token, + }; + } +} diff --git a/packages/twenty-server/src/modules/connected-account/refresh-access-token-manager/exceptions/refresh-access-token.exception.ts b/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/exceptions/connected-account-refresh-tokens.exception.ts similarity index 52% rename from packages/twenty-server/src/modules/connected-account/refresh-access-token-manager/exceptions/refresh-access-token.exception.ts rename to packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/exceptions/connected-account-refresh-tokens.exception.ts index 39f943398..67ca06c7e 100644 --- a/packages/twenty-server/src/modules/connected-account/refresh-access-token-manager/exceptions/refresh-access-token.exception.ts +++ b/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/exceptions/connected-account-refresh-tokens.exception.ts @@ -1,12 +1,15 @@ import { CustomException } from 'src/utils/custom-exception'; -export class RefreshAccessTokenException extends CustomException { - constructor(message: string, code: RefreshAccessTokenExceptionCode) { +export class ConnectedAccountRefreshAccessTokenException extends CustomException { + constructor( + message: string, + code: ConnectedAccountRefreshAccessTokenExceptionCode, + ) { super(message, code); } } -export enum RefreshAccessTokenExceptionCode { +export enum ConnectedAccountRefreshAccessTokenExceptionCode { REFRESH_TOKEN_NOT_FOUND = 'REFRESH_TOKEN_NOT_FOUND', REFRESH_ACCESS_TOKEN_FAILED = 'REFRESH_ACCESS_TOKEN_FAILED', PROVIDER_NOT_SUPPORTED = 'PROVIDER_NOT_SUPPORTED', diff --git a/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/services/connected-account-refresh-tokens.service.ts b/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/services/connected-account-refresh-tokens.service.ts new file mode 100644 index 000000000..128d09fc5 --- /dev/null +++ b/packages/twenty-server/src/modules/connected-account/refresh-tokens-manager/services/connected-account-refresh-tokens.service.ts @@ -0,0 +1,96 @@ +import { Injectable } from '@nestjs/common'; + +import { assertUnreachable, ConnectedAccountProvider } from 'twenty-shared'; + +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { + GoogleAPIRefreshAccessTokenService, + GoogleTokens, +} from 'src/modules/connected-account/refresh-tokens-manager/drivers/google/services/google-api-refresh-access-token.service'; +import { + MicrosoftAPIRefreshAccessTokenService, + MicrosoftTokens, +} from 'src/modules/connected-account/refresh-tokens-manager/drivers/microsoft/services/microsoft-api-refresh-tokens.service'; +import { + ConnectedAccountRefreshAccessTokenException, + ConnectedAccountRefreshAccessTokenExceptionCode, +} from 'src/modules/connected-account/refresh-tokens-manager/exceptions/connected-account-refresh-tokens.exception'; +import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; + +export type ConnectedAccountTokens = GoogleTokens | MicrosoftTokens; + +@Injectable() +export class ConnectedAccountRefreshTokensService { + constructor( + private readonly googleAPIRefreshAccessTokenService: GoogleAPIRefreshAccessTokenService, + private readonly microsoftAPIRefreshAccessTokenService: MicrosoftAPIRefreshAccessTokenService, + private readonly twentyORMManager: TwentyORMManager, + ) {} + + async refreshAndSaveTokens( + connectedAccount: ConnectedAccountWorkspaceEntity, + workspaceId: string, + ): Promise { + const refreshToken = connectedAccount.refreshToken; + + if (!refreshToken) { + throw new ConnectedAccountRefreshAccessTokenException( + `No refresh token found for connected account ${connectedAccount.id} in workspace ${workspaceId}`, + ConnectedAccountRefreshAccessTokenExceptionCode.REFRESH_TOKEN_NOT_FOUND, + ); + } + + const connectedAccountTokens = await this.refreshTokens( + connectedAccount, + refreshToken, + workspaceId, + ); + + try { + const connectedAccountRepository = + await this.twentyORMManager.getRepository( + 'connectedAccount', + ); + + await connectedAccountRepository.update( + { id: connectedAccount.id }, + connectedAccountTokens, + ); + } catch (error) { + throw new Error( + `Error saving the new tokens for connected account ${connectedAccount.id} in workspace ${workspaceId}: ${error.message} `, + ); + } + + return connectedAccountTokens.accessToken; + } + + async refreshTokens( + connectedAccount: ConnectedAccountWorkspaceEntity, + refreshToken: string, + workspaceId: string, + ): Promise { + try { + switch (connectedAccount.provider) { + case ConnectedAccountProvider.GOOGLE: + return this.googleAPIRefreshAccessTokenService.refreshAccessToken( + refreshToken, + ); + case ConnectedAccountProvider.MICROSOFT: + return this.microsoftAPIRefreshAccessTokenService.refreshTokens( + refreshToken, + ); + default: + return assertUnreachable( + connectedAccount.provider, + `Provider ${connectedAccount.provider} not supported`, + ); + } + } catch (error) { + throw new ConnectedAccountRefreshAccessTokenException( + `Error refreshing tokens for connected account ${connectedAccount.id} in workspace ${workspaceId}: ${error.message} ${error?.response?.data?.error_description}`, + ConnectedAccountRefreshAccessTokenExceptionCode.REFRESH_ACCESS_TOKEN_FAILED, + ); + } + } +} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts index 1a8aaf4e6..746562daf 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts @@ -4,12 +4,20 @@ import { Process } from 'src/engine/core-modules/message-queue/decorators/proces import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { ConnectedAccountRefreshAccessTokenExceptionCode } from 'src/modules/connected-account/refresh-tokens-manager/exceptions/connected-account-refresh-tokens.exception'; +import { ConnectedAccountRefreshTokensService } from 'src/modules/connected-account/refresh-tokens-manager/services/connected-account-refresh-tokens.service'; import { isThrottled } from 'src/modules/connected-account/utils/is-throttled'; import { MessageChannelSyncStage, MessageChannelWorkspaceEntity, } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { MessageImportDriverExceptionCode } from 'src/modules/messaging/message-import-manager/drivers/exceptions/message-import-driver.exception'; +import { MessageImportExceptionCode } from 'src/modules/messaging/message-import-manager/exceptions/message-import.exception'; import { MessagingFullMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service'; +import { + MessageImportExceptionHandlerService, + MessageImportSyncStep, +} from 'src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service'; import { MessagingPartialMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service'; import { MessagingTelemetryService } from 'src/modules/messaging/monitoring/services/messaging-telemetry.service'; @@ -30,6 +38,8 @@ export class MessagingMessageListFetchJob { private readonly messagingPartialMessageListFetchService: MessagingPartialMessageListFetchService, private readonly messagingTelemetryService: MessagingTelemetryService, private readonly twentyORMManager: TwentyORMManager, + private readonly connectedAccountRefreshTokensService: ConnectedAccountRefreshTokensService, + private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService, ) {} @Process(MessagingMessageListFetchJob.name) @@ -64,72 +74,112 @@ export class MessagingMessageListFetchJob { return; } - if ( - isThrottled( - messageChannel.syncStageStartedAt, - messageChannel.throttleFailureCount, - ) - ) { - return; - } + try { + if ( + isThrottled( + messageChannel.syncStageStartedAt, + messageChannel.throttleFailureCount, + ) + ) { + return; + } - switch (messageChannel.syncStage) { - case MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING: - this.logger.log( - `Fetching partial message list for workspace ${workspaceId} and messageChannelId ${messageChannel.id}`, - ); + try { + messageChannel.connectedAccount.accessToken = + await this.connectedAccountRefreshTokensService.refreshAndSaveTokens( + messageChannel.connectedAccount, + workspaceId, + ); + } catch (error) { + switch (error.code) { + case ConnectedAccountRefreshAccessTokenExceptionCode.REFRESH_ACCESS_TOKEN_FAILED: + case ConnectedAccountRefreshAccessTokenExceptionCode.REFRESH_TOKEN_NOT_FOUND: + await this.messagingTelemetryService.track({ + eventName: `refresh_token.error.insufficient_permissions`, + workspaceId, + connectedAccountId: messageChannel.connectedAccountId, + messageChannelId: messageChannel.id, + message: `${error.code}: ${error.reason ?? ''}`, + }); + throw { + code: MessageImportDriverExceptionCode.INSUFFICIENT_PERMISSIONS, + message: error.message, + }; + case ConnectedAccountRefreshAccessTokenExceptionCode.PROVIDER_NOT_SUPPORTED: + throw { + code: MessageImportExceptionCode.PROVIDER_NOT_SUPPORTED, + message: error.message, + }; + default: + throw error; + } + } - await this.messagingTelemetryService.track({ - eventName: 'partial_message_list_fetch.started', - workspaceId, - connectedAccountId: messageChannel.connectedAccount.id, - messageChannelId: messageChannel.id, - }); + switch (messageChannel.syncStage) { + case MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING: + this.logger.log( + `Fetching partial message list for workspace ${workspaceId} and messageChannelId ${messageChannel.id}`, + ); - await this.messagingPartialMessageListFetchService.processMessageListFetch( - messageChannel, - messageChannel.connectedAccount, - workspaceId, - ); + await this.messagingTelemetryService.track({ + eventName: 'partial_message_list_fetch.started', + workspaceId, + connectedAccountId: messageChannel.connectedAccount.id, + messageChannelId: messageChannel.id, + }); - await this.messagingTelemetryService.track({ - eventName: 'partial_message_list_fetch.completed', - workspaceId, - connectedAccountId: messageChannel.connectedAccount.id, - messageChannelId: messageChannel.id, - }); + await this.messagingPartialMessageListFetchService.processMessageListFetch( + messageChannel, + messageChannel.connectedAccount, + workspaceId, + ); - break; + await this.messagingTelemetryService.track({ + eventName: 'partial_message_list_fetch.completed', + workspaceId, + connectedAccountId: messageChannel.connectedAccount.id, + messageChannelId: messageChannel.id, + }); - case MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING: - this.logger.log( - `Fetching full message list for workspace ${workspaceId} and account ${messageChannel.connectedAccount.id}`, - ); + break; - await this.messagingTelemetryService.track({ - eventName: 'full_message_list_fetch.started', - workspaceId, - connectedAccountId: messageChannel.connectedAccount.id, - messageChannelId: messageChannel.id, - }); + case MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING: + this.logger.log( + `Fetching full message list for workspace ${workspaceId} and account ${messageChannel.connectedAccount.id}`, + ); - await this.messagingFullMessageListFetchService.processMessageListFetch( - messageChannel, - messageChannel.connectedAccount, - workspaceId, - ); + await this.messagingTelemetryService.track({ + eventName: 'full_message_list_fetch.started', + workspaceId, + connectedAccountId: messageChannel.connectedAccount.id, + messageChannelId: messageChannel.id, + }); - await this.messagingTelemetryService.track({ - eventName: 'full_message_list_fetch.completed', - workspaceId, - connectedAccountId: messageChannel.connectedAccount.id, - messageChannelId: messageChannel.id, - }); + await this.messagingFullMessageListFetchService.processMessageListFetch( + messageChannel, + messageChannel.connectedAccount, + workspaceId, + ); - break; + await this.messagingTelemetryService.track({ + eventName: 'full_message_list_fetch.completed', + workspaceId, + connectedAccountId: messageChannel.connectedAccount.id, + messageChannelId: messageChannel.id, + }); - default: - break; + break; + + default: + break; + } + } catch (error) { + await this.messageImportErrorHandlerService.handleDriverException( + error, + MessageImportSyncStep.FULL_OR_PARTIAL_MESSAGE_LIST_FETCH, + messageChannel, + workspaceId, + ); } } } diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts index 6e9ee2ee2..2048a182a 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts @@ -7,7 +7,7 @@ import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity'; import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module'; import { EmailAliasManagerModule } from 'src/modules/connected-account/email-alias-manager/email-alias-manager.module'; -import { RefreshAccessTokenManagerModule } from 'src/modules/connected-account/refresh-access-token-manager/refresh-access-token-manager.module'; +import { RefreshTokensManagerModule } from 'src/modules/connected-account/refresh-tokens-manager/connected-account-refresh-tokens-manager.module'; import { MessagingCommonModule } from 'src/modules/messaging/common/messaging-common.module'; import { MessagingMessageCleanerModule } from 'src/modules/messaging/message-cleaner/messaging-message-cleaner.module'; import { MessagingSingleMessageImportCommand } from 'src/modules/messaging/message-import-manager/commands/messaging-single-message-import.command'; @@ -25,11 +25,11 @@ import { MessagingMessageListFetchJob } from 'src/modules/messaging/message-impo import { MessagingMessagesImportJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job'; import { MessagingOngoingStaleJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job'; import { MessagingMessageImportManagerMessageChannelListener } from 'src/modules/messaging/message-import-manager/listeners/messaging-import-manager-message-channel.listener'; -import { MessageImportExceptionHandlerService } from 'src/modules/messaging/message-import-manager/services/message-import-exception-handler.service'; import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service'; import { MessagingFullMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service'; import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service'; import { MessagingGetMessagesService } from 'src/modules/messaging/message-import-manager/services/messaging-get-messages.service'; +import { MessageImportExceptionHandlerService } from 'src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service'; import { MessagingMessageService } from 'src/modules/messaging/message-import-manager/services/messaging-message.service'; import { MessagingMessagesImportService } from 'src/modules/messaging/message-import-manager/services/messaging-messages-import.service'; import { MessagingPartialMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service'; @@ -38,7 +38,7 @@ import { MessageParticipantManagerModule } from 'src/modules/messaging/message-p import { MessagingMonitoringModule } from 'src/modules/messaging/monitoring/messaging-monitoring.module'; @Module({ imports: [ - RefreshAccessTokenManagerModule, + RefreshTokensManagerModule, WorkspaceDataSourceModule, MessagingGmailDriverModule, MessagingMicrosoftDriverModule, diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts index 7c343078e..b8d78fcfa 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts @@ -11,12 +11,12 @@ import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/se import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessagingMessageCleanerService } from 'src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service'; +import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service'; +import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service'; import { MessageImportExceptionHandlerService, MessageImportSyncStep, -} from 'src/modules/messaging/message-import-manager/services/message-import-exception-handler.service'; -import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service'; -import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service'; +} from 'src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service'; @Injectable() export class MessagingFullMessageListFetchService { constructor( diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/message-import-exception-handler.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service.ts similarity index 98% rename from packages/twenty-server/src/modules/messaging/message-import-manager/services/message-import-exception-handler.service.ts rename to packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service.ts index d942e8f2b..61aa79d74 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/message-import-exception-handler.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service.ts @@ -16,6 +16,7 @@ import { export enum MessageImportSyncStep { FULL_MESSAGE_LIST_FETCH = 'FULL_MESSAGE_LIST_FETCH', PARTIAL_MESSAGE_LIST_FETCH = 'PARTIAL_MESSAGE_LIST_FETCH', + FULL_OR_PARTIAL_MESSAGE_LIST_FETCH = 'FULL_OR_PARTIAL_MESSAGE_LIST_FETCH', MESSAGES_IMPORT = 'MESSAGES_IMPORT', } diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts index 6cdb47aca..f21e0d247 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts @@ -8,8 +8,8 @@ import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { BlocklistRepository } from 'src/modules/blocklist/repositories/blocklist.repository'; import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity'; import { EmailAliasManagerService } from 'src/modules/connected-account/email-alias-manager/services/email-alias-manager.service'; -import { RefreshAccessTokenExceptionCode } from 'src/modules/connected-account/refresh-access-token-manager/exceptions/refresh-access-token.exception'; -import { RefreshAccessTokenService } from 'src/modules/connected-account/refresh-access-token-manager/services/refresh-access-token.service'; +import { ConnectedAccountRefreshAccessTokenExceptionCode } from 'src/modules/connected-account/refresh-tokens-manager/exceptions/connected-account-refresh-tokens.exception'; +import { ConnectedAccountRefreshTokensService } from 'src/modules/connected-account/refresh-tokens-manager/services/connected-account-refresh-tokens.service'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service'; import { @@ -19,11 +19,11 @@ import { import { MessageImportDriverExceptionCode } from 'src/modules/messaging/message-import-manager/drivers/exceptions/message-import-driver.exception'; import { MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE } from 'src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-users-messages-get-batch-size.constant'; import { MessageImportExceptionCode } from 'src/modules/messaging/message-import-manager/exceptions/message-import.exception'; +import { MessagingGetMessagesService } from 'src/modules/messaging/message-import-manager/services/messaging-get-messages.service'; import { MessageImportExceptionHandlerService, MessageImportSyncStep, -} from 'src/modules/messaging/message-import-manager/services/message-import-exception-handler.service'; -import { MessagingGetMessagesService } from 'src/modules/messaging/message-import-manager/services/messaging-get-messages.service'; +} from 'src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service'; import { MessagingSaveMessagesAndEnqueueContactCreationService } from 'src/modules/messaging/message-import-manager/services/messaging-save-messages-and-enqueue-contact-creation.service'; import { filterEmails } from 'src/modules/messaging/message-import-manager/utils/filter-emails.util'; import { MessagingTelemetryService } from 'src/modules/messaging/monitoring/services/messaging-telemetry.service'; @@ -37,7 +37,7 @@ export class MessagingMessagesImportService { private readonly cacheStorage: CacheStorageService, private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService, private readonly saveMessagesAndEnqueueContactCreationService: MessagingSaveMessagesAndEnqueueContactCreationService, - private readonly refreshAccessTokenService: RefreshAccessTokenService, + private readonly connectedAccountRefreshTokensService: ConnectedAccountRefreshTokensService, private readonly messagingTelemetryService: MessagingTelemetryService, @InjectObjectMetadataRepository(BlocklistWorkspaceEntity) private readonly blocklistRepository: BlocklistRepository, @@ -79,14 +79,14 @@ export class MessagingMessagesImportService { try { connectedAccount.accessToken = - await this.refreshAccessTokenService.refreshAndSaveAccessToken( + await this.connectedAccountRefreshTokensService.refreshAndSaveTokens( connectedAccount, workspaceId, ); } catch (error) { switch (error.code) { - case RefreshAccessTokenExceptionCode.REFRESH_ACCESS_TOKEN_FAILED: - case RefreshAccessTokenExceptionCode.REFRESH_TOKEN_NOT_FOUND: + case ConnectedAccountRefreshAccessTokenExceptionCode.REFRESH_ACCESS_TOKEN_FAILED: + case ConnectedAccountRefreshAccessTokenExceptionCode.REFRESH_TOKEN_NOT_FOUND: await this.messagingTelemetryService.track({ eventName: `refresh_token.error.insufficient_permissions`, workspaceId, @@ -98,7 +98,7 @@ export class MessagingMessagesImportService { code: MessageImportDriverExceptionCode.INSUFFICIENT_PERMISSIONS, message: error.message, }; - case RefreshAccessTokenExceptionCode.PROVIDER_NOT_SUPPORTED: + case ConnectedAccountRefreshAccessTokenExceptionCode.PROVIDER_NOT_SUPPORTED: throw { code: MessageImportExceptionCode.PROVIDER_NOT_SUPPORTED, message: error.message, diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts index ea75a01ce..e5291dcd5 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts @@ -11,12 +11,12 @@ import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/se import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessagingMessageCleanerService } from 'src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service'; +import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service'; +import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service'; import { MessageImportExceptionHandlerService, MessageImportSyncStep, -} from 'src/modules/messaging/message-import-manager/services/message-import-exception-handler.service'; -import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service'; -import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service'; +} from 'src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service'; @Injectable() export class MessagingPartialMessageListFetchService { diff --git a/packages/twenty-shared/src/utils/assertUnreachable.ts b/packages/twenty-shared/src/utils/assertUnreachable.ts new file mode 100644 index 000000000..042337aee --- /dev/null +++ b/packages/twenty-shared/src/utils/assertUnreachable.ts @@ -0,0 +1,3 @@ +export const assertUnreachable = (x: never, errorMessage?: string): never => { + throw new Error(errorMessage ?? "Didn't expect to get here."); +}; diff --git a/packages/twenty-shared/src/utils/index.ts b/packages/twenty-shared/src/utils/index.ts index 78cc26d34..f7747df2b 100644 --- a/packages/twenty-shared/src/utils/index.ts +++ b/packages/twenty-shared/src/utils/index.ts @@ -1,5 +1,7 @@ +export * from './assertUnreachable'; export * from './fieldMetadata'; export * from './image'; export * from './strings'; export * from './url'; export * from './validation'; +