diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/calendar-event-import-manager.module.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/calendar-event-import-manager.module.ts index 1d3a931dd..a4b0fc43a 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/calendar-event-import-manager.module.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/calendar-event-import-manager.module.ts @@ -14,7 +14,7 @@ import { CalendarEventListFetchCronJob } from 'src/modules/calendar/calendar-eve import { GoogleCalendarDriverModule } from 'src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/google-calendar-driver.module'; import { CalendarEventListFetchJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-event-list-fetch.job'; import { CalendarChannelSyncStatusService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-channel-sync-status.service'; -import { CalendarEventImportErrorHandlerService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-event-import-error-handling.service'; +import { CalendarEventImportErrorHandlerService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-event-import-exception-handler.service'; import { CalendarEventsImportService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service'; import { CalendarGetCalendarEventsService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-get-events.service'; import { CalendarSaveEventsService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-save-events.service'; diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/exceptions/calendar-event-import-driver.exception.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/exceptions/calendar-event-import-driver.exception.ts new file mode 100644 index 000000000..47968eb9d --- /dev/null +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/exceptions/calendar-event-import-driver.exception.ts @@ -0,0 +1,16 @@ +import { CustomException } from 'src/utils/custom-exception'; + +export class CalendarEventImportDriverException extends CustomException { + code: CalendarEventImportDriverExceptionCode; + constructor(message: string, code: CalendarEventImportDriverExceptionCode) { + super(message, code); + } +} + +export enum CalendarEventImportDriverExceptionCode { + NOT_FOUND = 'NOT_FOUND', + TEMPORARY_ERROR = 'TEMPORARY_ERROR', + INSUFFICIENT_PERMISSIONS = 'INSUFFICIENT_PERMISSIONS', + UNKNOWN = 'UNKNOWN', + UNKNOWN_NETWORK_ERROR = 'UNKNOWN_NETWORK_ERROR', +} diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/services/google-calendar-get-events.service.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/services/google-calendar-get-events.service.ts index 20f3ec637..a140b8bfb 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/services/google-calendar-get-events.service.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/services/google-calendar-get-events.service.ts @@ -4,7 +4,6 @@ import { GaxiosError } from 'gaxios'; import { calendar_v3 as calendarV3 } from 'googleapis'; import { GoogleCalendarClientProvider } from 'src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/providers/google-calendar.provider'; -import { GoogleCalendarError } from 'src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/types/google-calendar-error.type'; import { formatGoogleCalendarEvents } from 'src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/utils/format-google-calendar-event.util'; import { parseGaxiosError } from 'src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/utils/parse-gaxios-error.util'; import { parseGoogleCalendarError } from 'src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/utils/parse-google-calendar-error.util'; @@ -92,7 +91,7 @@ export class GoogleCalendarGetEventsService { throw parseGaxiosError(error); } if (error.response?.status !== 410) { - const googleCalendarError: GoogleCalendarError = { + const googleCalendarError = { code: error.response?.status, reason: error.response?.data?.error?.errors?.[0].reason || diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/types/google-calendar-error.type.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/types/google-calendar-error.type.ts deleted file mode 100644 index b007431ad..000000000 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/types/google-calendar-error.type.ts +++ /dev/null @@ -1,5 +0,0 @@ -export type GoogleCalendarError = { - code?: number; - reason: string; - message: string; -}; diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/utils/parse-gaxios-error.util.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/utils/parse-gaxios-error.util.ts index 4245ff82f..33f9a5a09 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/utils/parse-gaxios-error.util.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/utils/parse-gaxios-error.util.ts @@ -1,11 +1,13 @@ import { GaxiosError } from 'gaxios'; import { - CalendarEventError, - CalendarEventErrorCode, -} from 'src/modules/calendar/calendar-event-import-manager/types/calendar-event-error.type'; + CalendarEventImportDriverException, + CalendarEventImportDriverExceptionCode, +} from 'src/modules/calendar/calendar-event-import-manager/drivers/exceptions/calendar-event-import-driver.exception'; -export const parseGaxiosError = (error: GaxiosError): CalendarEventError => { +export const parseGaxiosError = ( + error: GaxiosError, +): CalendarEventImportDriverException => { const { code } = error; switch (code) { @@ -14,15 +16,15 @@ export const parseGaxiosError = (error: GaxiosError): CalendarEventError => { case 'ECONNABORTED': case 'ETIMEDOUT': case 'ERR_NETWORK': - return { - code: CalendarEventErrorCode.TEMPORARY_ERROR, - message: error.message, - }; + return new CalendarEventImportDriverException( + error.message, + CalendarEventImportDriverExceptionCode.TEMPORARY_ERROR, + ); default: - return { - code: CalendarEventErrorCode.UNKNOWN, - message: error.message, - }; + return new CalendarEventImportDriverException( + error.message, + CalendarEventImportDriverExceptionCode.UNKNOWN_NETWORK_ERROR, + ); } }; diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/utils/parse-google-calendar-error.util.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/utils/parse-google-calendar-error.util.ts index 9e5667c62..f7ddd8e2f 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/utils/parse-google-calendar-error.util.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/utils/parse-google-calendar-error.util.ts @@ -1,86 +1,87 @@ -import { GoogleCalendarError } from 'src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/types/google-calendar-error.type'; import { - CalendarEventError, - CalendarEventErrorCode, -} from 'src/modules/calendar/calendar-event-import-manager/types/calendar-event-error.type'; + CalendarEventImportDriverException, + CalendarEventImportDriverExceptionCode, +} from 'src/modules/calendar/calendar-event-import-manager/drivers/exceptions/calendar-event-import-driver.exception'; -export const parseGoogleCalendarError = ( - error: GoogleCalendarError, -): CalendarEventError => { +export const parseGoogleCalendarError = (error: { + code?: number; + reason: string; + message: string; +}): CalendarEventImportDriverException => { const { code, reason, message } = error; switch (code) { case 400: if (reason === 'invalid_grant') { - return { - code: CalendarEventErrorCode.INSUFFICIENT_PERMISSIONS, + return new CalendarEventImportDriverException( message, - }; + CalendarEventImportDriverExceptionCode.INSUFFICIENT_PERMISSIONS, + ); } if (reason === 'failedPrecondition') { - return { - code: CalendarEventErrorCode.TEMPORARY_ERROR, + return new CalendarEventImportDriverException( message, - }; + CalendarEventImportDriverExceptionCode.TEMPORARY_ERROR, + ); } - return { - code: CalendarEventErrorCode.UNKNOWN, + return new CalendarEventImportDriverException( message, - }; + CalendarEventImportDriverExceptionCode.UNKNOWN, + ); case 404: - return { - code: CalendarEventErrorCode.NOT_FOUND, + return new CalendarEventImportDriverException( message, - }; + CalendarEventImportDriverExceptionCode.NOT_FOUND, + ); case 429: - return { - code: CalendarEventErrorCode.TEMPORARY_ERROR, + return new CalendarEventImportDriverException( message, - }; + CalendarEventImportDriverExceptionCode.TEMPORARY_ERROR, + ); case 403: if ( reason === 'rateLimitExceeded' || reason === 'userRateLimitExceeded' ) { - return { - code: CalendarEventErrorCode.TEMPORARY_ERROR, + return new CalendarEventImportDriverException( message, - }; + CalendarEventImportDriverExceptionCode.TEMPORARY_ERROR, + ); } else { - return { - code: CalendarEventErrorCode.INSUFFICIENT_PERMISSIONS, + return new CalendarEventImportDriverException( message, - }; + CalendarEventImportDriverExceptionCode.INSUFFICIENT_PERMISSIONS, + ); } case 401: - return { - code: CalendarEventErrorCode.INSUFFICIENT_PERMISSIONS, + return new CalendarEventImportDriverException( message, - }; + CalendarEventImportDriverExceptionCode.INSUFFICIENT_PERMISSIONS, + ); case 500: if (reason === 'backendError') { - return { - code: CalendarEventErrorCode.TEMPORARY_ERROR, + return new CalendarEventImportDriverException( message, - }; + CalendarEventImportDriverExceptionCode.TEMPORARY_ERROR, + ); } else { - return { - code: CalendarEventErrorCode.UNKNOWN, + return new CalendarEventImportDriverException( message, - }; + CalendarEventImportDriverExceptionCode.UNKNOWN, + ); } default: break; } - return { - code: CalendarEventErrorCode.UNKNOWN, + return new CalendarEventImportDriverException( message, - }; + CalendarEventImportDriverExceptionCode.UNKNOWN, + ); }; diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/exceptions/calendar-event-import.exception.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/exceptions/calendar-event-import.exception.ts new file mode 100644 index 000000000..1af518109 --- /dev/null +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/exceptions/calendar-event-import.exception.ts @@ -0,0 +1,14 @@ +import { CustomException } from 'src/utils/custom-exception'; + +export class CalendarEventImportException extends CustomException { + code: CalendarEventImportExceptionCode; + constructor(message: string, code: CalendarEventImportExceptionCode) { + super(message, code); + } +} + +export enum CalendarEventImportExceptionCode { + PROVIDER_NOT_SUPPORTED = 'PROVIDER_NOT_SUPPORTED', + UNKNOWN = 'UNKNOWN', + CALENDAR_CHANNEL_NOT_FOUND = 'CALENDAR_CHANNEL_NOT_FOUND', +} diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-channel-sync-status.service.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-channel-sync-status.service.ts index 21ccdaafc..0355dde32 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-channel-sync-status.service.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-channel-sync-status.service.ts @@ -4,12 +4,17 @@ import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator'; import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { + CalendarEventImportException, + CalendarEventImportExceptionCode, +} from 'src/modules/calendar/calendar-event-import-manager/exceptions/calendar-event-import.exception'; import { CalendarChannelSyncStage, CalendarChannelSyncStatus, CalendarChannelWorkspaceEntity, } from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity'; import { AccountsToReconnectService } from 'src/modules/connected-account/services/accounts-to-reconnect.service'; +import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { AccountsToReconnectKeys } from 'src/modules/connected-account/types/accounts-to-reconnect-key-value.type'; @Injectable() @@ -161,6 +166,31 @@ export class CalendarChannelSyncStatusService { syncStage: CalendarChannelSyncStage.FAILED, }); + const connectedAccountRepository = + await this.twentyORMManager.getRepository( + 'connectedAccount', + ); + + const calendarChannel = await calendarChannelRepository.findOne({ + where: { id: calendarChannelId }, + }); + + if (!calendarChannel) { + throw new CalendarEventImportException( + `Calendar channel ${calendarChannelId} not found in workspace ${workspaceId}`, + CalendarEventImportExceptionCode.CALENDAR_CHANNEL_NOT_FOUND, + ); + } + + const connectedAccountId = calendarChannel.connectedAccountId; + + await connectedAccountRepository.update( + { id: connectedAccountId }, + { + authFailedAt: new Date(), + }, + ); + await this.addToAccountsToReconnect(calendarChannelId, workspaceId); } diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-event-import-error-handling.service.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-event-import-exception-handler.service.ts similarity index 70% rename from packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-event-import-error-handling.service.ts rename to packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-event-import-exception-handler.service.ts index c6fbd93ec..8f1de59aa 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-event-import-error-handling.service.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-event-import-exception-handler.service.ts @@ -2,8 +2,15 @@ import { Injectable } from '@nestjs/common'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { CALENDAR_THROTTLE_MAX_ATTEMPTS } from 'src/modules/calendar/calendar-event-import-manager/constants/calendar-throttle-max-attempts'; +import { + CalendarEventImportDriverException, + CalendarEventImportDriverExceptionCode, +} from 'src/modules/calendar/calendar-event-import-manager/drivers/exceptions/calendar-event-import-driver.exception'; +import { + CalendarEventImportException, + CalendarEventImportExceptionCode, +} from 'src/modules/calendar/calendar-event-import-manager/exceptions/calendar-event-import.exception'; import { CalendarChannelSyncStatusService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-channel-sync-status.service'; -import { CalendarEventError } from 'src/modules/calendar/calendar-event-import-manager/types/calendar-event-error.type'; import { CalendarChannelWorkspaceEntity } from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity'; export enum CalendarEventImportSyncStep { @@ -19,8 +26,8 @@ export class CalendarEventImportErrorHandlerService { private readonly calendarChannelSyncStatusService: CalendarChannelSyncStatusService, ) {} - public async handleError( - error: CalendarEventError, + public async handleDriverException( + exception: CalendarEventImportDriverException, syncStep: CalendarEventImportSyncStep, calendarChannel: Pick< CalendarChannelWorkspaceEntity, @@ -28,26 +35,41 @@ export class CalendarEventImportErrorHandlerService { >, workspaceId: string, ): Promise { - switch (error.code) { - case 'NOT_FOUND': - await this.handleNotFoundError(syncStep, calendarChannel, workspaceId); - break; - case 'TEMPORARY_ERROR': - await this.handleTemporaryError(syncStep, calendarChannel, workspaceId); - break; - case 'INSUFFICIENT_PERMISSIONS': - await this.handleInsufficientPermissionsError( + switch (exception.code) { + case CalendarEventImportDriverExceptionCode.NOT_FOUND: + await this.handleNotFoundException( + syncStep, calendarChannel, workspaceId, ); break; - case 'UNKNOWN': - await this.handleUnknownError(error, calendarChannel, workspaceId); + case CalendarEventImportDriverExceptionCode.TEMPORARY_ERROR: + await this.handleTemporaryException( + syncStep, + calendarChannel, + workspaceId, + ); break; + case CalendarEventImportDriverExceptionCode.INSUFFICIENT_PERMISSIONS: + await this.handleInsufficientPermissionsException( + calendarChannel, + workspaceId, + ); + break; + case CalendarEventImportDriverExceptionCode.UNKNOWN: + case CalendarEventImportDriverExceptionCode.UNKNOWN_NETWORK_ERROR: + await this.handleUnknownException( + exception, + calendarChannel, + workspaceId, + ); + break; + default: + throw exception; } } - private async handleTemporaryError( + private async handleTemporaryException( syncStep: CalendarEventImportSyncStep, calendarChannel: Pick< CalendarChannelWorkspaceEntity, @@ -103,7 +125,7 @@ export class CalendarEventImportErrorHandlerService { } } - private async handleInsufficientPermissionsError( + private async handleInsufficientPermissionsException( calendarChannel: Pick, workspaceId: string, ): Promise { @@ -113,8 +135,8 @@ export class CalendarEventImportErrorHandlerService { ); } - private async handleUnknownError( - error: CalendarEventError, + private async handleUnknownException( + exception: CalendarEventImportDriverException, calendarChannel: Pick, workspaceId: string, ): Promise { @@ -123,12 +145,13 @@ export class CalendarEventImportErrorHandlerService { workspaceId, ); - throw new Error( - `Unknown error occurred while importing calendar events for calendar channel ${calendarChannel.id} in workspace ${workspaceId}: ${error.message}`, + throw new CalendarEventImportException( + `Unknown error occurred while importing calendar events for calendar channel ${calendarChannel.id} in workspace ${workspaceId}: ${exception.message}`, + CalendarEventImportExceptionCode.UNKNOWN, ); } - private async handleNotFoundError( + private async handleNotFoundException( syncStep: CalendarEventImportSyncStep, calendarChannel: Pick, workspaceId: string, diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service.ts index d2dd7c90a..d2b7a6181 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service.ts @@ -11,7 +11,7 @@ import { CalendarChannelSyncStatusService } from 'src/modules/calendar/calendar- import { CalendarEventImportErrorHandlerService, CalendarEventImportSyncStep, -} from 'src/modules/calendar/calendar-event-import-manager/services/calendar-event-import-error-handling.service'; +} from 'src/modules/calendar/calendar-event-import-manager/services/calendar-event-import-exception-handler.service'; import { CalendarGetCalendarEventsService, GetCalendarEventsResponse, @@ -64,23 +64,66 @@ export class CalendarEventsImportService { calendarEvents = getCalendarEventsResponse.calendarEvents; nextSyncCursor = getCalendarEventsResponse.nextSyncCursor; - } catch (error) { - await this.calendarEventImportErrorHandlerService.handleError( - error, - syncStep, - calendarChannel, + + const calendarChannelRepository = + await this.twentyORMManager.getRepository( + 'calendarChannel', + ); + + if (!calendarEvents || calendarEvents?.length === 0) { + await calendarChannelRepository.update( + { + id: calendarChannel.id, + }, + { + syncCursor: nextSyncCursor, + }, + ); + + await this.calendarChannelSyncStatusService.schedulePartialCalendarEventListFetch( + calendarChannel.id, + ); + } + + const blocklist = await this.blocklistRepository.getByWorkspaceMemberId( + connectedAccount.accountOwnerId, workspaceId, ); - return; - } + const { filteredEvents, cancelledEvents } = + filterEventsAndReturnCancelledEvents( + calendarChannel, + calendarEvents, + blocklist.map((blocklist) => blocklist.handle), + ); - const calendarChannelRepository = - await this.twentyORMManager.getRepository( - 'calendarChannel', + const cancelledEventExternalIds = cancelledEvents.map( + (event) => event.externalId, + ); + + await this.calendarSaveEventsService.saveCalendarEventsAndEnqueueContactCreationJob( + filteredEvents, + calendarChannel, + connectedAccount, + workspaceId, + ); + + const calendarChannelEventAssociationRepository = + await this.twentyORMManager.getRepository( + 'calendarChannelEventAssociation', + ); + + await calendarChannelEventAssociationRepository.delete({ + eventExternalId: Any(cancelledEventExternalIds), + calendarChannel: { + id: calendarChannel.id, + }, + }); + + await this.calendarEventCleanerService.cleanWorkspaceCalendarEvents( + workspaceId, ); - if (!calendarEvents || calendarEvents?.length === 0) { await calendarChannelRepository.update( { id: calendarChannel.id, @@ -90,61 +133,16 @@ export class CalendarEventsImportService { }, ); - await this.calendarChannelSyncStatusService.schedulePartialCalendarEventListFetch( + await this.calendarChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch( calendarChannel.id, ); - } - - const blocklist = await this.blocklistRepository.getByWorkspaceMemberId( - connectedAccount.accountOwnerId, - workspaceId, - ); - - const { filteredEvents, cancelledEvents } = - filterEventsAndReturnCancelledEvents( + } catch (error) { + await this.calendarEventImportErrorHandlerService.handleDriverException( + error, + syncStep, calendarChannel, - calendarEvents, - blocklist.map((blocklist) => blocklist.handle), + workspaceId, ); - - const cancelledEventExternalIds = cancelledEvents.map( - (event) => event.externalId, - ); - - await this.calendarSaveEventsService.saveCalendarEventsAndEnqueueContactCreationJob( - filteredEvents, - calendarChannel, - connectedAccount, - workspaceId, - ); - - const calendarChannelEventAssociationRepository = - await this.twentyORMManager.getRepository( - 'calendarChannelEventAssociation', - ); - - await calendarChannelEventAssociationRepository.delete({ - eventExternalId: Any(cancelledEventExternalIds), - calendarChannel: { - id: calendarChannel.id, - }, - }); - - await this.calendarEventCleanerService.cleanWorkspaceCalendarEvents( - workspaceId, - ); - - await calendarChannelRepository.update( - { - id: calendarChannel.id, - }, - { - syncCursor: nextSyncCursor, - }, - ); - - await this.calendarChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch( - calendarChannel.id, - ); + } } } diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-get-events.service.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-get-events.service.ts index d9fa12110..8bee58bd4 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-get-events.service.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-get-events.service.ts @@ -1,6 +1,10 @@ import { Injectable } from '@nestjs/common'; import { GoogleCalendarGetEventsService as GoogleCalendarGetCalendarEventsService } from 'src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/services/google-calendar-get-events.service'; +import { + CalendarEventImportException, + CalendarEventImportExceptionCode, +} from 'src/modules/calendar/calendar-event-import-manager/exceptions/calendar-event-import.exception'; import { CalendarEventWithParticipants } from 'src/modules/calendar/common/types/calendar-event'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; @@ -29,8 +33,9 @@ export class CalendarGetCalendarEventsService { syncCursor, ); default: - throw new Error( - `Provider ${connectedAccount.provider} is not supported.`, + throw new CalendarEventImportException( + `Provider ${connectedAccount.provider} is not supported`, + CalendarEventImportExceptionCode.PROVIDER_NOT_SUPPORTED, ); } } diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/types/calendar-event-error.type.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/types/calendar-event-error.type.ts deleted file mode 100644 index 7bf1b56f8..000000000 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/types/calendar-event-error.type.ts +++ /dev/null @@ -1,11 +0,0 @@ -export enum CalendarEventErrorCode { - NOT_FOUND = 'NOT_FOUND', - TEMPORARY_ERROR = 'TEMPORARY_ERROR', - INSUFFICIENT_PERMISSIONS = 'INSUFFICIENT_PERMISSIONS', - UNKNOWN = 'UNKNOWN', -} - -export interface CalendarEventError { - message: string; - code: CalendarEventErrorCode; -} diff --git a/packages/twenty-server/src/modules/connected-account/refresh-access-token-manager/exceptions/refresh-access-token.exception.ts b/packages/twenty-server/src/modules/connected-account/refresh-access-token-manager/exceptions/refresh-access-token.exception.ts new file mode 100644 index 000000000..1f87487e5 --- /dev/null +++ b/packages/twenty-server/src/modules/connected-account/refresh-access-token-manager/exceptions/refresh-access-token.exception.ts @@ -0,0 +1,14 @@ +import { CustomException } from 'src/utils/custom-exception'; + +export class RefreshAccessTokenException extends CustomException { + code: RefreshAccessTokenExceptionCode; + constructor(message: string, code: RefreshAccessTokenExceptionCode) { + super(message, code); + } +} + +export enum RefreshAccessTokenExceptionCode { + REFRESH_TOKEN_NOT_FOUND = 'REFRESH_TOKEN_NOT_FOUND', + REFRESH_ACCESS_TOKEN_FAILED = 'REFRESH_ACCESS_TOKEN_FAILED', + PROVIDER_NOT_SUPPORTED = 'PROVIDER_NOT_SUPPORTED', +} diff --git a/packages/twenty-server/src/modules/connected-account/refresh-access-token-manager/services/refresh-access-token.service.ts b/packages/twenty-server/src/modules/connected-account/refresh-access-token-manager/services/refresh-access-token.service.ts index 7f381750f..3326cd4ba 100644 --- a/packages/twenty-server/src/modules/connected-account/refresh-access-token-manager/services/refresh-access-token.service.ts +++ b/packages/twenty-server/src/modules/connected-account/refresh-access-token-manager/services/refresh-access-token.service.ts @@ -2,6 +2,10 @@ import { Injectable } from '@nestjs/common'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/refresh-access-token-manager/drivers/google/services/google-api-refresh-access-token.service'; +import { + RefreshAccessTokenException, + RefreshAccessTokenExceptionCode, +} from 'src/modules/connected-account/refresh-access-token-manager/exceptions/refresh-access-token.exception'; import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; @@ -20,20 +24,25 @@ export class RefreshAccessTokenService { const refreshToken = connectedAccount.refreshToken; if (!refreshToken) { - throw new Error( + throw new RefreshAccessTokenException( `No refresh token found for connected account ${connectedAccount.id} in workspace ${workspaceId}`, + RefreshAccessTokenExceptionCode.REFRESH_TOKEN_NOT_FOUND, ); } - const accessToken = await this.refreshAccessToken( - connectedAccount, - refreshToken, - ); - await this.connectedAccountRepository.updateAccessToken( - accessToken, - connectedAccount.id, - workspaceId, - ); + let accessToken: string; + + try { + accessToken = await this.refreshAccessToken( + connectedAccount, + refreshToken, + ); + } catch (error) { + throw new RefreshAccessTokenException( + `Error refreshing access token for connected account ${connectedAccount.id} in workspace ${workspaceId}: ${error.message}`, + RefreshAccessTokenExceptionCode.REFRESH_ACCESS_TOKEN_FAILED, + ); + } await this.connectedAccountRepository.updateAccessToken( accessToken, @@ -54,8 +63,9 @@ export class RefreshAccessTokenService { refreshToken, ); default: - throw new Error( - `Provider ${connectedAccount.provider} is not supported.`, + throw new RefreshAccessTokenException( + `Provider ${connectedAccount.provider} is not supported`, + RefreshAccessTokenExceptionCode.PROVIDER_NOT_SUPPORTED, ); } } diff --git a/packages/twenty-server/src/modules/match-participant/match-participant.service.ts b/packages/twenty-server/src/modules/match-participant/match-participant.service.ts index c5cd06f30..509bbfe6b 100644 --- a/packages/twenty-server/src/modules/match-participant/match-participant.service.ts +++ b/packages/twenty-server/src/modules/match-participant/match-participant.service.ts @@ -1,6 +1,6 @@ import { Injectable } from '@nestjs/common'; -import { Any } from 'typeorm'; +import { Any, EntityManager } from 'typeorm'; import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; @@ -39,7 +39,7 @@ export class MatchParticipantService< public async matchParticipants( participants: ParticipantWorkspaceEntity[], objectMetadataName: 'messageParticipant' | 'calendarEventParticipant', - transactionManager?: any, + transactionManager?: EntityManager, ) { const participantRepository = await this.getParticipantRepository(objectMetadataName); diff --git a/packages/twenty-server/src/modules/messaging/blocklist-manager/listeners/messaging-blocklist.listener.ts b/packages/twenty-server/src/modules/messaging/blocklist-manager/listeners/messaging-blocklist.listener.ts index c34bbe192..4335cf29d 100644 --- a/packages/twenty-server/src/modules/messaging/blocklist-manager/listeners/messaging-blocklist.listener.ts +++ b/packages/twenty-server/src/modules/messaging/blocklist-manager/listeners/messaging-blocklist.listener.ts @@ -17,7 +17,7 @@ import { BlocklistItemDeleteMessagesJob, BlocklistItemDeleteMessagesJobData, } from 'src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job'; -import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; +import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; @Injectable() @@ -27,7 +27,7 @@ export class MessagingBlocklistListener { private readonly messageQueueService: MessageQueueService, @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity) private readonly connectedAccountRepository: ConnectedAccountRepository, - private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService, + private readonly messagingChannelSyncStatusService: MessageChannelSyncStatusService, private readonly twentyORMManager: TwentyORMManager, ) {} diff --git a/packages/twenty-server/src/modules/messaging/common/messaging-common.module.ts b/packages/twenty-server/src/modules/messaging/common/messaging-common.module.ts index 8f5701524..d923d7655 100644 --- a/packages/twenty-server/src/modules/messaging/common/messaging-common.module.ts +++ b/packages/twenty-server/src/modules/messaging/common/messaging-common.module.ts @@ -4,7 +4,7 @@ import { TypeOrmModule } from '@nestjs/typeorm'; import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity'; import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module'; import { ConnectedAccountModule } from 'src/modules/connected-account/connected-account.module'; -import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; +import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service'; @Module({ imports: [ @@ -12,7 +12,7 @@ import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/ TypeOrmModule.forFeature([FeatureFlagEntity], 'core'), ConnectedAccountModule, ], - providers: [MessagingChannelSyncStatusService], - exports: [MessagingChannelSyncStatusService], + providers: [MessageChannelSyncStatusService], + exports: [MessageChannelSyncStatusService], }) export class MessagingCommonModule {} diff --git a/packages/twenty-server/src/modules/messaging/common/services/messaging-channel-sync-status.service.ts b/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts similarity index 82% rename from packages/twenty-server/src/modules/messaging/common/services/messaging-channel-sync-status.service.ts rename to packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts index 43025a021..9c6eeb188 100644 --- a/packages/twenty-server/src/modules/messaging/common/services/messaging-channel-sync-status.service.ts +++ b/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts @@ -5,15 +5,20 @@ import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decora import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { AccountsToReconnectService } from 'src/modules/connected-account/services/accounts-to-reconnect.service'; +import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { AccountsToReconnectKeys } from 'src/modules/connected-account/types/accounts-to-reconnect-key-value.type'; import { MessageChannelSyncStage, MessageChannelSyncStatus, MessageChannelWorkspaceEntity, } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { + MessageImportException, + MessageImportExceptionCode, +} from 'src/modules/messaging/message-import-manager/exceptions/message-import.exception'; @Injectable() -export class MessagingChannelSyncStatusService { +export class MessageChannelSyncStatusService { constructor( @InjectCacheStorage(CacheStorageNamespace.ModuleMessaging) private readonly cacheStorage: CacheStorageService, @@ -28,9 +33,7 @@ export class MessagingChannelSyncStatusService { ); await messageChannelRepository.update( - { - id: messageChannelId, - }, + { id: messageChannelId }, { syncStage: MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING, }, @@ -44,9 +47,7 @@ export class MessagingChannelSyncStatusService { ); await messageChannelRepository.update( - { - id: messageChannelId, - }, + { id: messageChannelId }, { syncStage: MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING, }, @@ -60,9 +61,7 @@ export class MessagingChannelSyncStatusService { ); await messageChannelRepository.update( - { - id: messageChannelId, - }, + { id: messageChannelId }, { syncStage: MessageChannelSyncStage.MESSAGES_IMPORT_PENDING, }, @@ -83,9 +82,7 @@ export class MessagingChannelSyncStatusService { ); await messageChannelRepository.update( - { - id: messageChannelId, - }, + { id: messageChannelId }, { syncCursor: '', syncStageStartedAt: null, @@ -103,9 +100,7 @@ export class MessagingChannelSyncStatusService { ); await messageChannelRepository.update( - { - id: messageChannelId, - }, + { id: messageChannelId }, { syncStage: MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING, syncStatus: MessageChannelSyncStatus.ONGOING, @@ -122,9 +117,7 @@ export class MessagingChannelSyncStatusService { ); await messageChannelRepository.update( - { - id: messageChannelId, - }, + { id: messageChannelId }, { syncStatus: MessageChannelSyncStatus.ACTIVE, }, @@ -140,9 +133,7 @@ export class MessagingChannelSyncStatusService { ); await messageChannelRepository.update( - { - id: messageChannelId, - }, + { id: messageChannelId }, { syncStage: MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING, }, @@ -163,9 +154,7 @@ export class MessagingChannelSyncStatusService { ); await messageChannelRepository.update( - { - id: messageChannelId, - }, + { id: messageChannelId }, { syncStage: MessageChannelSyncStage.FAILED, syncStatus: MessageChannelSyncStatus.FAILED_UNKNOWN, @@ -187,15 +176,38 @@ export class MessagingChannelSyncStatusService { ); await messageChannelRepository.update( - { - id: messageChannelId, - }, + { id: messageChannelId }, { syncStage: MessageChannelSyncStage.FAILED, syncStatus: MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS, }, ); + const connectedAccountRepository = + await this.twentyORMManager.getRepository( + 'connectedAccount', + ); + + const messageChannel = await messageChannelRepository.findOne({ + where: { id: messageChannelId }, + }); + + if (!messageChannel) { + throw new MessageImportException( + `Message channel ${messageChannelId} not found in workspace ${workspaceId}`, + MessageImportExceptionCode.MESSAGE_CHANNEL_NOT_FOUND, + ); + } + + const connectedAccountId = messageChannel.connectedAccountId; + + await connectedAccountRepository.update( + { id: connectedAccountId }, + { + authFailedAt: new Date(), + }, + ); + await this.addToAccountsToReconnect(messageChannelId, workspaceId); } @@ -209,9 +221,7 @@ export class MessagingChannelSyncStatusService { ); const messageChannel = await messageChannelRepository.findOne({ - where: { - id: messageChannelId, - }, + where: { id: messageChannelId }, relations: { connectedAccount: { accountOwner: true, diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/exceptions/message-import-driver.exception.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/exceptions/message-import-driver.exception.ts new file mode 100644 index 000000000..600b66853 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/exceptions/message-import-driver.exception.ts @@ -0,0 +1,17 @@ +import { CustomException } from 'src/utils/custom-exception'; + +export class MessageImportDriverException extends CustomException { + code: MessageImportDriverExceptionCode; + constructor(message: string, code: MessageImportDriverExceptionCode) { + super(message, code); + } +} + +export enum MessageImportDriverExceptionCode { + NOT_FOUND = 'NOT_FOUND', + TEMPORARY_ERROR = 'TEMPORARY_ERROR', + INSUFFICIENT_PERMISSIONS = 'INSUFFICIENT_PERMISSIONS', + UNKNOWN = 'UNKNOWN', + UNKNOWN_NETWORK_ERROR = 'UNKNOWN_NETWORK_ERROR', + NO_NEXT_SYNC_CURSOR = 'NO_NEXT_SYNC_CURSOR', +} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/messaging-gmail-driver.module.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/messaging-gmail-driver.module.ts index ac5bf9ea6..8229618a2 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/messaging-gmail-driver.module.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/messaging-gmail-driver.module.ts @@ -12,11 +12,12 @@ import { EmailAliasManagerModule } from 'src/modules/connected-account/email-ali import { OAuth2ClientManagerModule } from 'src/modules/connected-account/oauth2-client-manager/oauth2-client-manager.module'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { MessagingCommonModule } from 'src/modules/messaging/common/messaging-common.module'; -import { MessagingGmailClientProvider } from 'src/modules/messaging/message-import-manager/drivers/gmail/providers/messaging-gmail-client.provider'; -import { MessagingGmailFetchByBatchesService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-by-batch.service'; -import { MessagingGmailFetchMessagesByBatchesService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-messages-by-batches.service'; -import { MessagingGmailFetchMessageIdsToExcludeService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-messages-ids-to-exclude.service'; -import { MessagingGmailHistoryService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-history.service'; +import { GmailClientProvider } from 'src/modules/messaging/message-import-manager/drivers/gmail/providers/gmail-client.provider'; +import { GmailFetchByBatchService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-fetch-by-batch.service'; +import { GmailGetHistoryService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-history.service'; +import { GmailGetMessageListService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service'; +import { GmailGetMessagesService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-messages.service'; +import { GmailHandleErrorService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-handle-error.service'; import { MessageParticipantManagerModule } from 'src/modules/messaging/message-participant-manager/message-participant-manager.module'; @Module({ @@ -38,18 +39,13 @@ import { MessageParticipantManagerModule } from 'src/modules/messaging/message-p MessageParticipantManagerModule, ], providers: [ - MessagingGmailClientProvider, - MessagingGmailHistoryService, - MessagingGmailFetchByBatchesService, - MessagingGmailFetchMessagesByBatchesService, - MessagingGmailFetchMessageIdsToExcludeService, - ], - exports: [ - MessagingGmailClientProvider, - MessagingGmailHistoryService, - MessagingGmailFetchByBatchesService, - MessagingGmailFetchMessagesByBatchesService, - MessagingGmailFetchMessageIdsToExcludeService, + GmailClientProvider, + GmailGetHistoryService, + GmailFetchByBatchService, + GmailGetMessagesService, + GmailGetMessageListService, + GmailHandleErrorService, ], + exports: [GmailGetMessagesService, GmailGetMessageListService], }) export class MessagingGmailDriverModule {} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/providers/messaging-gmail-client.provider.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/providers/gmail-client.provider.ts similarity index 95% rename from packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/providers/messaging-gmail-client.provider.ts rename to packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/providers/gmail-client.provider.ts index 834fedca4..47b74a77c 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/providers/messaging-gmail-client.provider.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/providers/gmail-client.provider.ts @@ -6,7 +6,7 @@ import { OAuth2ClientManagerService } from 'src/modules/connected-account/oauth2 import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; @Injectable() -export class MessagingGmailClientProvider { +export class GmailClientProvider { constructor( private readonly oAuth2ClientManagerService: OAuth2ClientManagerService, ) {} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-by-batch.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-fetch-by-batch.service.ts similarity index 96% rename from packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-by-batch.service.ts rename to packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-fetch-by-batch.service.ts index 13720769b..66c7c5e02 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-by-batch.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-fetch-by-batch.service.ts @@ -3,12 +3,12 @@ import { Injectable } from '@nestjs/common'; import { AxiosResponse } from 'axios'; -import { GmailMessageParsedResponse } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message-parsed-response'; +import { GmailMessageParsedResponse } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message-parsed-response.type'; +import { createQueriesFromMessageIds } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/create-queries-from-message-ids.util'; import { BatchQueries } from 'src/modules/messaging/message-import-manager/types/batch-queries'; -import { createQueriesFromMessageIds } from 'src/modules/messaging/message-import-manager/utils/create-queries-from-message-ids.util'; @Injectable() -export class MessagingGmailFetchByBatchesService { +export class GmailFetchByBatchService { constructor(private readonly httpService: HttpService) {} async fetchAllByBatches( diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-history.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-history.service.ts similarity index 80% rename from packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-history.service.ts rename to packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-history.service.ts index df799bc91..4e03a104c 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-history.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-history.service.ts @@ -1,14 +1,15 @@ import { Injectable } from '@nestjs/common'; -import { GaxiosResponse } from 'gaxios'; import { gmail_v1 } from 'googleapis'; import { MESSAGING_GMAIL_USERS_HISTORY_MAX_RESULT } from 'src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-users-history-max-result.constant'; -import { GmailError } from 'src/modules/messaging/message-import-manager/services/messaging-error-handling.service'; +import { GmailHandleErrorService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-handle-error.service'; @Injectable() -export class MessagingGmailHistoryService { - constructor() {} +export class GmailGetHistoryService { + constructor( + private readonly gmailHandleErrorService: GmailHandleErrorService, + ) {} public async getHistory( gmailClient: gmail_v1.Gmail, @@ -18,34 +19,33 @@ export class MessagingGmailHistoryService { ): Promise<{ history: gmail_v1.Schema$History[]; historyId?: string | null; - error?: GmailError; }> { const fullHistory: gmail_v1.Schema$History[] = []; let pageToken: string | undefined; let hasMoreMessages = true; let nextHistoryId: string | undefined; - let response: GaxiosResponse; while (hasMoreMessages) { - try { - response = await gmailClient.users.history.list({ + const response = await gmailClient.users.history + .list({ userId: 'me', maxResults: MESSAGING_GMAIL_USERS_HISTORY_MAX_RESULT, pageToken, startHistoryId: lastSyncHistoryId, historyTypes: historyTypes || ['messageAdded', 'messageDeleted'], labelId, + }) + .catch((error) => { + this.gmailHandleErrorService.handleError(error); + + return { + data: { + history: [], + historyId: lastSyncHistoryId, + nextPageToken: undefined, + }, + }; }); - } catch (error) { - return { - history: [], - error: { - code: error.response?.status, - reason: error.response?.data?.error, - }, - historyId: lastSyncHistoryId, - }; - } nextHistoryId = response?.data?.historyId ?? undefined; diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service.ts new file mode 100644 index 000000000..03509497e --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service.ts @@ -0,0 +1,168 @@ +import { Injectable } from '@nestjs/common'; + +import { gmail_v1 as gmailV1 } from 'googleapis'; + +import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; +import { + MessageImportDriverException, + MessageImportDriverExceptionCode, +} from 'src/modules/messaging/message-import-manager/drivers/exceptions/message-import-driver.exception'; +import { MESSAGING_GMAIL_EXCLUDED_CATEGORIES } from 'src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-excluded-categories'; +import { MESSAGING_GMAIL_USERS_MESSAGES_LIST_MAX_RESULT } from 'src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-users-messages-list-max-result.constant'; +import { GmailClientProvider } from 'src/modules/messaging/message-import-manager/drivers/gmail/providers/gmail-client.provider'; +import { GmailGetHistoryService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-history.service'; +import { GmailHandleErrorService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-handle-error.service'; +import { computeGmailCategoryExcludeSearchFilter } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-gmail-category-excude-search-filter.util'; +import { computeGmailCategoryLabelId } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-gmail-category-label-id.util'; +import { + GetFullMessageListResponse, + GetPartialMessageListResponse, +} from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service'; +import { assertNotNull } from 'src/utils/assert'; + +@Injectable() +export class GmailGetMessageListService { + constructor( + private readonly gmailClientProvider: GmailClientProvider, + private readonly gmailGetHistoryService: GmailGetHistoryService, + private readonly gmailHandleErrorService: GmailHandleErrorService, + ) {} + + public async getFullMessageList( + connectedAccount: Pick< + ConnectedAccountWorkspaceEntity, + 'provider' | 'refreshToken' | 'id' + >, + ): Promise { + const gmailClient = + await this.gmailClientProvider.getGmailClient(connectedAccount); + + let pageToken: string | undefined; + let hasMoreMessages = true; + let firstMessageExternalId: string | undefined; + const messageExternalIds: string[] = []; + + while (hasMoreMessages) { + const messageList = await gmailClient.users.messages + .list({ + userId: 'me', + maxResults: MESSAGING_GMAIL_USERS_MESSAGES_LIST_MAX_RESULT, + pageToken, + q: computeGmailCategoryExcludeSearchFilter( + MESSAGING_GMAIL_EXCLUDED_CATEGORIES, + ), + }) + .catch((error) => { + this.gmailHandleErrorService.handleError(error); + + return { + data: { + messages: [], + nextPageToken: undefined, + }, + }; + }); + + pageToken = messageList.data.nextPageToken ?? undefined; + hasMoreMessages = !!pageToken; + + const { messages } = messageList.data; + + if (!messages || messages.length === 0) { + break; + } + + if (!firstMessageExternalId) { + firstMessageExternalId = messageList.data.messages?.[0].id ?? undefined; + } + + messageExternalIds.push(...messages.map((message) => message.id)); + } + + const firstMessageContent = await gmailClient.users.messages + .get({ + userId: 'me', + id: firstMessageExternalId, + }) + .catch((error) => { + this.gmailHandleErrorService.handleError(error); + }); + + const nextSyncCursor = firstMessageContent?.data?.historyId; + + if (!nextSyncCursor) { + throw new MessageImportDriverException( + `No historyId found for message ${firstMessageExternalId} for connected account ${connectedAccount.id}`, + MessageImportDriverExceptionCode.NO_NEXT_SYNC_CURSOR, + ); + } + + return { messageExternalIds, nextSyncCursor }; + } + + public async getPartialMessageList( + connectedAccount: Pick< + ConnectedAccountWorkspaceEntity, + 'provider' | 'refreshToken' | 'id' + >, + syncCursor: string, + ): Promise { + const gmailClient = + await this.gmailClientProvider.getGmailClient(connectedAccount); + + const { history, historyId: nextSyncCursor } = + await this.gmailGetHistoryService.getHistory(gmailClient, syncCursor); + + const { messagesAdded, messagesDeleted } = + await this.gmailGetHistoryService.getMessageIdsFromHistory(history); + + const messageIdsToFilter = await this.getEmailIdsFromExcludedCategories( + gmailClient, + syncCursor, + ); + + const messagesAddedFiltered = messagesAdded.filter( + (messageId) => !messageIdsToFilter.includes(messageId), + ); + + if (!nextSyncCursor) { + throw new MessageImportDriverException( + `No nextSyncCursor found for connected account ${connectedAccount.id}`, + MessageImportDriverExceptionCode.NO_NEXT_SYNC_CURSOR, + ); + } + + return { + messageExternalIds: messagesAddedFiltered, + messageExternalIdsToDelete: messagesDeleted, + nextSyncCursor, + }; + } + + private async getEmailIdsFromExcludedCategories( + gmailClient: gmailV1.Gmail, + lastSyncHistoryId: string, + ): Promise { + const emailIds: string[] = []; + + for (const category of MESSAGING_GMAIL_EXCLUDED_CATEGORIES) { + const { history } = await this.gmailGetHistoryService.getHistory( + gmailClient, + lastSyncHistoryId, + ['messageAdded'], + computeGmailCategoryLabelId(category), + ); + + const emailIdsFromCategory = history + .map((history) => history.messagesAdded) + .flat() + .map((message) => message?.message?.id) + .filter((id) => id) + .filter(assertNotNull); + + emailIds.push(...emailIdsFromCategory); + } + + return emailIds; + } +} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-messages.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-messages.service.ts new file mode 100644 index 000000000..98358a889 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-messages.service.ts @@ -0,0 +1,98 @@ +import { Injectable, Logger } from '@nestjs/common'; + +import { AxiosResponse } from 'axios'; +import { gmail_v1 as gmailV1 } from 'googleapis'; + +import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; +import { GmailFetchByBatchService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-fetch-by-batch.service'; +import { GmailHandleErrorService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-handle-error.service'; +import { parseAndFormatGmailMessage } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/parse-and-format-gmail-message.util'; +import { MessageWithParticipants } from 'src/modules/messaging/message-import-manager/types/message'; +import { isDefined } from 'src/utils/is-defined'; + +@Injectable() +export class GmailGetMessagesService { + private readonly logger = new Logger(GmailGetMessagesService.name); + + constructor( + private readonly fetchByBatchesService: GmailFetchByBatchService, + private readonly gmailHandleErrorService: GmailHandleErrorService, + ) {} + + async getMessages( + messageIds: string[], + connectedAccount: Pick< + ConnectedAccountWorkspaceEntity, + 'accessToken' | 'refreshToken' | 'id' | 'handle' | 'handleAliases' + >, + workspaceId: string, + ): Promise { + let startTime = Date.now(); + + const { messageIdsByBatch, batchResponses } = + await this.fetchByBatchesService.fetchAllByBatches( + messageIds, + connectedAccount.accessToken, + 'batch_gmail_messages', + ); + let endTime = Date.now(); + + this.logger.log( + `Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} fetching ${ + messageIds.length + } messages in ${endTime - startTime}ms`, + ); + + startTime = Date.now(); + + const messages = batchResponses.flatMap((response, index) => { + return this.formatBatchResponseAsMessage( + messageIdsByBatch[index], + response, + connectedAccount, + ); + }); + + endTime = Date.now(); + + this.logger.log( + `Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} formatting ${ + messageIds.length + } messages in ${endTime - startTime}ms`, + ); + + return messages; + } + + private formatBatchResponseAsMessage( + messageIds: string[], + responseCollection: AxiosResponse, + connectedAccount: Pick< + ConnectedAccountWorkspaceEntity, + 'handle' | 'handleAliases' + >, + ): MessageWithParticipants[] { + const parsedResponses = + this.fetchByBatchesService.parseBatch(responseCollection); + + const messages = parsedResponses.map((response, index) => { + if ('error' in response) { + if (response.error.code === 404) { + return null; + } + + this.gmailHandleErrorService.handleError( + response.error, + messageIds[index], + ); + } + + return parseAndFormatGmailMessage( + response as gmailV1.Schema$Message, + connectedAccount, + ); + }); + + return messages.filter(isDefined); + } +} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-handle-error.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-handle-error.service.ts new file mode 100644 index 000000000..cdffcaf54 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-handle-error.service.ts @@ -0,0 +1,33 @@ +import { Injectable } from '@nestjs/common'; + +import { parseGaxiosError } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/parse-gaxios-error.util'; +import { parseGmailError } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/parse-gmail-error.util'; + +@Injectable() +export class GmailHandleErrorService { + constructor() {} + + public handleError(error: any, messageExternalId?: string): void { + if ( + error.code && + [ + 'ECONNRESET', + 'ENOTFOUND', + 'ECONNABORTED', + 'ETIMEDOUT', + 'ERR_NETWORK', + ].includes(error.code) + ) { + throw parseGaxiosError(error); + } + if (error.response?.status !== 410) { + const gmailError = { + code: error.response?.status, + reason: `${error.response?.data?.error?.errors?.[0].reason || error.response?.data?.error || ''}`, + message: `${error.response?.data?.error?.errors?.[0].message || error.response?.data?.error_description || ''}${messageExternalId ? ` for message with externalId: ${messageExternalId}` : ''}`, + }; + + throw parseGmailError(gmailError); + } + } +} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-messages-by-batches.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-messages-by-batches.service.ts deleted file mode 100644 index 96c1e4143..000000000 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-messages-by-batches.service.ts +++ /dev/null @@ -1,263 +0,0 @@ -import { Injectable, Logger } from '@nestjs/common'; - -import addressparser from 'addressparser'; -import { AxiosResponse } from 'axios'; -import { gmail_v1 } from 'googleapis'; -import planer from 'planer'; - -import { MessagingGmailFetchByBatchesService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-by-batch.service'; -import { GmailMessage } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message'; -import { formatAddressObjectAsParticipants } from 'src/modules/messaging/message-import-manager/utils/format-address-object-as-participants.util'; -import { assert, assertNotNull } from 'src/utils/assert'; - -@Injectable() -export class MessagingGmailFetchMessagesByBatchesService { - private readonly logger = new Logger( - MessagingGmailFetchMessagesByBatchesService.name, - ); - - constructor( - private readonly fetchByBatchesService: MessagingGmailFetchByBatchesService, - ) {} - - async fetchAllMessages( - messageIds: string[], - accessToken: string, - connectedAccountId: string, - workspaceId: string, - ): Promise { - let startTime = Date.now(); - - const { messageIdsByBatch, batchResponses } = - await this.fetchByBatchesService.fetchAllByBatches( - messageIds, - accessToken, - 'batch_gmail_messages', - ); - let endTime = Date.now(); - - this.logger.log( - `Messaging import for workspace ${workspaceId} and account ${connectedAccountId} fetching ${ - messageIds.length - } messages in ${endTime - startTime}ms`, - ); - - startTime = Date.now(); - - const formattedResponse = this.formatBatchResponsesAsGmailMessages( - messageIdsByBatch, - batchResponses, - workspaceId, - connectedAccountId, - ); - - endTime = Date.now(); - - this.logger.log( - `Messaging import for workspace ${workspaceId} and account ${connectedAccountId} formatting ${ - messageIds.length - } messages in ${endTime - startTime}ms`, - ); - - return formattedResponse; - } - - private formatBatchResponseAsGmailMessage( - messageIds: string[], - responseCollection: AxiosResponse, - workspaceId: string, - connectedAccountId: string, - ): GmailMessage[] { - const parsedResponses = - this.fetchByBatchesService.parseBatch(responseCollection); - - const sanitizeString = (str: string) => { - return str.replace(/\0/g, ''); - }; - - const formattedResponse = parsedResponses.map((response, index) => { - if ('error' in response) { - if (response.error.code === 404) { - return null; - } - - throw { ...response.error, messageId: messageIds[index] }; - } - - const { - historyId, - id, - threadId, - internalDate, - subject, - from, - to, - cc, - bcc, - headerMessageId, - text, - attachments, - deliveredTo, - } = this.parseGmailMessage(response); - - if (!from) { - this.logger.log( - `From value is missing while importing message #${id} in workspace ${workspaceId} and account ${connectedAccountId}`, - ); - - return null; - } - - if (!to && !deliveredTo && !bcc && !cc) { - this.logger.log( - `To, Delivered-To, Bcc or Cc value is missing while importing message #${id} in workspace ${workspaceId} and account ${connectedAccountId}`, - ); - - return null; - } - - if (!headerMessageId) { - this.logger.log( - `Message-ID is missing while importing message #${id} in workspace ${workspaceId} and account ${connectedAccountId}`, - ); - - return null; - } - - if (!threadId) { - this.logger.log( - `Thread Id is missing while importing message #${id} in workspace ${workspaceId} and account ${connectedAccountId}`, - ); - - return null; - } - - const participants = [ - ...formatAddressObjectAsParticipants(from, 'from'), - ...formatAddressObjectAsParticipants(to ?? deliveredTo, 'to'), - ...formatAddressObjectAsParticipants(cc, 'cc'), - ...formatAddressObjectAsParticipants(bcc, 'bcc'), - ]; - - let textWithoutReplyQuotations = text; - - if (text) { - textWithoutReplyQuotations = planer.extractFrom(text, 'text/plain'); - } - - const messageFromGmail: GmailMessage = { - historyId, - externalId: id, - headerMessageId, - subject: subject || '', - messageThreadExternalId: threadId, - internalDate, - fromHandle: from[0].address || '', - fromDisplayName: from[0].name || '', - participants, - text: sanitizeString(textWithoutReplyQuotations || ''), - attachments, - }; - - return messageFromGmail; - }); - - const filteredMessages = formattedResponse.filter((message) => - assertNotNull(message), - ) as GmailMessage[]; - - return filteredMessages; - } - - private formatBatchResponsesAsGmailMessages( - messageIdsByBatch: string[][], - batchResponses: AxiosResponse[], - workspaceId: string, - connectedAccountId: string, - ): GmailMessage[] { - const messageBatches = batchResponses.map((response, index) => { - return this.formatBatchResponseAsGmailMessage( - messageIdsByBatch[index], - response, - workspaceId, - connectedAccountId, - ); - }); - - return messageBatches.flat(); - } - - private parseGmailMessage(message: gmail_v1.Schema$Message) { - const subject = this.getPropertyFromHeaders(message, 'Subject'); - const rawFrom = this.getPropertyFromHeaders(message, 'From'); - const rawTo = this.getPropertyFromHeaders(message, 'To'); - const rawDeliveredTo = this.getPropertyFromHeaders(message, 'Delivered-To'); - const rawCc = this.getPropertyFromHeaders(message, 'Cc'); - const rawBcc = this.getPropertyFromHeaders(message, 'Bcc'); - const messageId = this.getPropertyFromHeaders(message, 'Message-ID'); - const id = message.id; - const threadId = message.threadId; - const historyId = message.historyId; - const internalDate = message.internalDate; - - assert(id, 'ID is missing'); - assert(historyId, 'History-ID is missing'); - assert(internalDate, 'Internal date is missing'); - - const bodyData = this.getBodyData(message); - const text = bodyData ? Buffer.from(bodyData, 'base64').toString() : ''; - - const attachments = this.getAttachmentData(message); - - return { - id, - headerMessageId: messageId, - threadId, - historyId, - internalDate, - subject, - from: rawFrom ? addressparser(rawFrom) : undefined, - deliveredTo: rawDeliveredTo ? addressparser(rawDeliveredTo) : undefined, - to: rawTo ? addressparser(rawTo) : undefined, - cc: rawCc ? addressparser(rawCc) : undefined, - bcc: rawBcc ? addressparser(rawBcc) : undefined, - text, - attachments, - }; - } - - private getBodyData(message: gmail_v1.Schema$Message) { - const firstPart = message.payload?.parts?.[0]; - - if (firstPart?.mimeType === 'text/plain') { - return firstPart?.body?.data; - } - - return firstPart?.parts?.find((part) => part.mimeType === 'text/plain') - ?.body?.data; - } - - private getAttachmentData(message: gmail_v1.Schema$Message) { - return ( - message.payload?.parts - ?.filter((part) => part.filename && part.body?.attachmentId) - .map((part) => ({ - filename: part.filename || '', - id: part.body?.attachmentId || '', - mimeType: part.mimeType || '', - size: part.body?.size || 0, - })) || [] - ); - } - - private getPropertyFromHeaders( - message: gmail_v1.Schema$Message, - property: string, - ) { - const header = message.payload?.headers?.find( - (header) => header.name?.toLowerCase() === property.toLowerCase(), - ); - - return header?.value; - } -} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-messages-ids-to-exclude.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-messages-ids-to-exclude.service.ts deleted file mode 100644 index 266553695..000000000 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-messages-ids-to-exclude.service.ts +++ /dev/null @@ -1,46 +0,0 @@ -import { Injectable } from '@nestjs/common'; - -import { gmail_v1 } from 'googleapis'; - -import { MESSAGING_GMAIL_EXCLUDED_CATEGORIES } from 'src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-excluded-categories'; -import { MessagingGmailHistoryService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-history.service'; -import { computeGmailCategoryLabelId } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-gmail-category-label-id'; -import { assertNotNull } from 'src/utils/assert'; - -@Injectable() -export class MessagingGmailFetchMessageIdsToExcludeService { - constructor( - private readonly gmailGetHistoryService: MessagingGmailHistoryService, - ) {} - - public async fetchEmailIdsToExcludeOrThrow( - gmailClient: gmail_v1.Gmail, - lastSyncHistoryId: string, - ): Promise { - const emailIds: string[] = []; - - for (const category of MESSAGING_GMAIL_EXCLUDED_CATEGORIES) { - const { history, error } = await this.gmailGetHistoryService.getHistory( - gmailClient, - lastSyncHistoryId, - ['messageAdded'], - computeGmailCategoryLabelId(category), - ); - - if (error) { - throw error; - } - - const emailIdsFromCategory = history - .map((history) => history.messagesAdded) - .flat() - .map((message) => message?.message?.id) - .filter((id) => id) - .filter(assertNotNull); - - emailIds.push(...emailIdsFromCategory); - } - - return emailIds; - } -} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message-parsed-response.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message-parsed-response.type.ts similarity index 100% rename from packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message-parsed-response.ts rename to packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message-parsed-response.type.ts diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message.type.ts similarity index 100% rename from packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message.ts rename to packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message.type.ts diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/__tests__/compute-gmail-category-excude-search-filter.spec.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/__tests__/compute-gmail-category-excude-search-filter.spec.ts index 776ab3356..7f1635d2a 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/__tests__/compute-gmail-category-excude-search-filter.spec.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/__tests__/compute-gmail-category-excude-search-filter.spec.ts @@ -1,4 +1,4 @@ -import { computeGmailCategoryExcludeSearchFilter } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-gmail-category-excude-search-filter'; +import { computeGmailCategoryExcludeSearchFilter } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-gmail-category-excude-search-filter.util'; describe('computeGmailCategoryExcludeSearchFilter', () => { it('should return correct exclude search filter with empty category array', () => { diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/__tests__/compute-gmail-category-label-id.spec.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/__tests__/compute-gmail-category-label-id.spec.ts index 5cacd83c9..91d663494 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/__tests__/compute-gmail-category-label-id.spec.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/__tests__/compute-gmail-category-label-id.spec.ts @@ -1,4 +1,4 @@ -import { computeGmailCategoryLabelId } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-gmail-category-label-id'; +import { computeGmailCategoryLabelId } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-gmail-category-label-id.util'; describe('computeGmailCategoryLabelId', () => { it('should return correct category label id', () => { diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-gmail-category-excude-search-filter.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-gmail-category-excude-search-filter.util.ts similarity index 100% rename from packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-gmail-category-excude-search-filter.ts rename to packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-gmail-category-excude-search-filter.util.ts diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-gmail-category-label-id.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-gmail-category-label-id.util.ts similarity index 100% rename from packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-gmail-category-label-id.ts rename to packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-gmail-category-label-id.util.ts diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-message-direction.util.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-message-direction.util.ts new file mode 100644 index 000000000..77cf13fbf --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-message-direction.util.ts @@ -0,0 +1,13 @@ +import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; + +export const computeMessageDirection = ( + fromHandle: string, + connectedAccount: Pick< + ConnectedAccountWorkspaceEntity, + 'handle' | 'handleAliases' + >, +): 'outgoing' | 'incoming' => + connectedAccount.handle === fromHandle || + connectedAccount.handleAliases?.includes(fromHandle) + ? 'outgoing' + : 'incoming'; diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/utils/create-queries-from-message-ids.util.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/create-queries-from-message-ids.util.ts similarity index 100% rename from packages/twenty-server/src/modules/messaging/message-import-manager/utils/create-queries-from-message-ids.util.ts rename to packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/create-queries-from-message-ids.util.ts diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/get-attachment-data.util.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/get-attachment-data.util.ts new file mode 100644 index 000000000..9f35813fb --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/get-attachment-data.util.ts @@ -0,0 +1,14 @@ +import { gmail_v1 as gmailV1 } from 'googleapis'; + +export const getAttachmentData = (message: gmailV1.Schema$Message) => { + return ( + message.payload?.parts + ?.filter((part) => part.filename && part.body?.attachmentId) + .map((part) => ({ + filename: part.filename ?? '', + id: part.body?.attachmentId ?? '', + mimeType: part.mimeType ?? '', + size: part.body?.size ?? 0, + })) ?? [] + ); +}; diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/get-body-data.util.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/get-body-data.util.ts new file mode 100644 index 000000000..ec9d07c9d --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/get-body-data.util.ts @@ -0,0 +1,12 @@ +import { gmail_v1 as gmailV1 } from 'googleapis'; + +export const getBodyData = (message: gmailV1.Schema$Message) => { + const firstPart = message.payload?.parts?.[0]; + + if (firstPart?.mimeType === 'text/plain') { + return firstPart?.body?.data; + } + + return firstPart?.parts?.find((part) => part.mimeType === 'text/plain')?.body + ?.data; +}; diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/get-property-from-headers.util.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/get-property-from-headers.util.ts new file mode 100644 index 000000000..5d54288b9 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/get-property-from-headers.util.ts @@ -0,0 +1,12 @@ +import { gmail_v1 as gmailV1 } from 'googleapis'; + +export const getPropertyFromHeaders = ( + message: gmailV1.Schema$Message, + property: string, +) => { + const header = message.payload?.headers?.find( + (header) => header.name?.toLowerCase() === property.toLowerCase(), + ); + + return header?.value; +}; diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/parse-and-format-gmail-message.util.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/parse-and-format-gmail-message.util.ts new file mode 100644 index 000000000..f46f69eea --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/parse-and-format-gmail-message.util.ts @@ -0,0 +1,64 @@ +import { gmail_v1 as gmailV1 } from 'googleapis'; +import planer from 'planer'; + +import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; +import { computeMessageDirection } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-message-direction.util'; +import { parseGmailMessage } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/parse-gmail-message.util'; +import { sanitizeString } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/sanitize-string.util'; +import { MessageWithParticipants } from 'src/modules/messaging/message-import-manager/types/message'; +import { formatAddressObjectAsParticipants } from 'src/modules/messaging/message-import-manager/utils/format-address-object-as-participants.util'; + +export const parseAndFormatGmailMessage = ( + message: gmailV1.Schema$Message, + connectedAccount: Pick< + ConnectedAccountWorkspaceEntity, + 'handle' | 'handleAliases' + >, +): MessageWithParticipants | null => { + const { + id, + threadId, + internalDate, + subject, + from, + to, + cc, + bcc, + headerMessageId, + text, + attachments, + deliveredTo, + } = parseGmailMessage(message); + + if ( + !from || + (!to && !deliveredTo && !bcc && !cc) || + !headerMessageId || + !threadId + ) { + return null; + } + + const participants = [ + ...formatAddressObjectAsParticipants(from, 'from'), + ...formatAddressObjectAsParticipants(to ?? deliveredTo, 'to'), + ...formatAddressObjectAsParticipants(cc, 'cc'), + ...formatAddressObjectAsParticipants(bcc, 'bcc'), + ]; + + const textWithoutReplyQuotations = text + ? planer.extractFrom(text, 'text/plain') + : ''; + + return { + externalId: id, + headerMessageId, + subject: subject || '', + messageThreadExternalId: threadId, + receivedAt: new Date(parseInt(internalDate)), + direction: computeMessageDirection(from[0].address || '', connectedAccount), + participants, + text: sanitizeString(textWithoutReplyQuotations), + attachments, + }; +}; diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/parse-gaxios-error.util.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/parse-gaxios-error.util.ts new file mode 100644 index 000000000..221423ae3 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/parse-gaxios-error.util.ts @@ -0,0 +1,30 @@ +import { GaxiosError } from 'gaxios'; + +import { + MessageImportDriverException, + MessageImportDriverExceptionCode, +} from 'src/modules/messaging/message-import-manager/drivers/exceptions/message-import-driver.exception'; + +export const parseGaxiosError = ( + error: GaxiosError, +): MessageImportDriverException => { + const { code } = error; + + switch (code) { + case 'ECONNRESET': + case 'ENOTFOUND': + case 'ECONNABORTED': + case 'ETIMEDOUT': + case 'ERR_NETWORK': + return new MessageImportDriverException( + error.message, + MessageImportDriverExceptionCode.TEMPORARY_ERROR, + ); + + default: + return new MessageImportDriverException( + error.message, + MessageImportDriverExceptionCode.UNKNOWN, + ); + } +}; diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/parse-gmail-error.util.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/parse-gmail-error.util.ts new file mode 100644 index 000000000..ed52d8b11 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/parse-gmail-error.util.ts @@ -0,0 +1,88 @@ +import { + MessageImportDriverException, + MessageImportDriverExceptionCode, +} from 'src/modules/messaging/message-import-manager/drivers/exceptions/message-import-driver.exception'; + +export const parseGmailError = (error: { + code?: number; + reason: string; + message: string; +}): MessageImportDriverException => { + const { code, reason, message } = error; + + switch (code) { + case 400: + if (reason === 'invalid_grant') { + return new MessageImportDriverException( + message, + MessageImportDriverExceptionCode.INSUFFICIENT_PERMISSIONS, + ); + } + if (reason === 'failedPrecondition') { + return new MessageImportDriverException( + message, + MessageImportDriverExceptionCode.TEMPORARY_ERROR, + ); + } + + return new MessageImportDriverException( + message, + MessageImportDriverExceptionCode.UNKNOWN, + ); + + case 404: + return new MessageImportDriverException( + message, + MessageImportDriverExceptionCode.NOT_FOUND, + ); + + case 429: + return new MessageImportDriverException( + message, + MessageImportDriverExceptionCode.TEMPORARY_ERROR, + ); + + case 403: + if ( + reason === 'rateLimitExceeded' || + reason === 'userRateLimitExceeded' + ) { + return new MessageImportDriverException( + message, + MessageImportDriverExceptionCode.TEMPORARY_ERROR, + ); + } else { + return new MessageImportDriverException( + message, + MessageImportDriverExceptionCode.INSUFFICIENT_PERMISSIONS, + ); + } + + case 401: + return new MessageImportDriverException( + message, + MessageImportDriverExceptionCode.INSUFFICIENT_PERMISSIONS, + ); + + case 500: + if (reason === 'backendError') { + return new MessageImportDriverException( + message, + MessageImportDriverExceptionCode.TEMPORARY_ERROR, + ); + } else { + return new MessageImportDriverException( + message, + MessageImportDriverExceptionCode.UNKNOWN, + ); + } + + default: + break; + } + + return new MessageImportDriverException( + message, + MessageImportDriverExceptionCode.UNKNOWN, + ); +}; diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/parse-gmail-message.util.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/parse-gmail-message.util.ts new file mode 100644 index 000000000..1ba116744 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/parse-gmail-message.util.ts @@ -0,0 +1,47 @@ +import assert from 'assert'; + +import addressparser from 'addressparser'; +import { gmail_v1 } from 'googleapis'; + +import { getAttachmentData } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/get-attachment-data.util'; +import { getBodyData } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/get-body-data.util'; +import { getPropertyFromHeaders } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/get-property-from-headers.util'; + +export const parseGmailMessage = (message: gmail_v1.Schema$Message) => { + const subject = getPropertyFromHeaders(message, 'Subject'); + const rawFrom = getPropertyFromHeaders(message, 'From'); + const rawTo = getPropertyFromHeaders(message, 'To'); + const rawDeliveredTo = getPropertyFromHeaders(message, 'Delivered-To'); + const rawCc = getPropertyFromHeaders(message, 'Cc'); + const rawBcc = getPropertyFromHeaders(message, 'Bcc'); + const messageId = getPropertyFromHeaders(message, 'Message-ID'); + const id = message.id; + const threadId = message.threadId; + const historyId = message.historyId; + const internalDate = message.internalDate; + + assert(id, 'ID is missing'); + assert(historyId, 'History-ID is missing'); + assert(internalDate, 'Internal date is missing'); + + const bodyData = getBodyData(message); + const text = bodyData ? Buffer.from(bodyData, 'base64').toString() : ''; + + const attachments = getAttachmentData(message); + + return { + id, + headerMessageId: messageId, + threadId, + historyId, + internalDate, + subject, + from: rawFrom ? addressparser(rawFrom) : undefined, + deliveredTo: rawDeliveredTo ? addressparser(rawDeliveredTo) : undefined, + to: rawTo ? addressparser(rawTo) : undefined, + cc: rawCc ? addressparser(rawCc) : undefined, + bcc: rawBcc ? addressparser(rawBcc) : undefined, + text, + attachments, + }; +}; diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/sanitize-string.util.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/sanitize-string.util.ts new file mode 100644 index 000000000..b158fb888 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/sanitize-string.util.ts @@ -0,0 +1,3 @@ +export const sanitizeString = (str: string) => { + return str.replace(/\0/g, ''); +}; diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/exceptions/message-import.exception.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/exceptions/message-import.exception.ts new file mode 100644 index 000000000..1363ddca7 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/exceptions/message-import.exception.ts @@ -0,0 +1,14 @@ +import { CustomException } from 'src/utils/custom-exception'; + +export class MessageImportException extends CustomException { + code: MessageImportExceptionCode; + constructor(message: string, code: MessageImportExceptionCode) { + super(message, code); + } +} + +export enum MessageImportExceptionCode { + UNKNOWN = 'UNKNOWN', + PROVIDER_NOT_SUPPORTED = 'PROVIDER_NOT_SUPPORTED', + MESSAGE_CHANNEL_NOT_FOUND = 'MESSAGE_CHANNEL_NOT_FOUND', +} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts index fc0d37413..a51bd35a0 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts @@ -12,6 +12,7 @@ import { MessageChannelSyncStage, MessageChannelWorkspaceEntity, } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { MessageImportExceptionHandlerService } from 'src/modules/messaging/message-import-manager/services/message-import-exception-handler.service'; import { MessagingFullMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service'; import { MessagingPartialMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service'; import { MessagingTelemetryService } from 'src/modules/messaging/monitoring/services/messaging-telemetry.service'; @@ -35,6 +36,7 @@ export class MessagingMessageListFetchJob { private readonly connectedAccountRepository: ConnectedAccountRepository, private readonly messagingTelemetryService: MessagingTelemetryService, private readonly twentyORMManager: TwentyORMManager, + private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService, ) {} @Process(MessagingMessageListFetchJob.name) diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts index c35243f8e..f62dc3a1b 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts @@ -12,6 +12,7 @@ import { MessageChannelSyncStage, MessageChannelWorkspaceEntity, } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { MessageImportExceptionHandlerService } from 'src/modules/messaging/message-import-manager/services/message-import-exception-handler.service'; import { MessagingMessagesImportService } from 'src/modules/messaging/message-import-manager/services/messaging-messages-import.service'; import { MessagingTelemetryService } from 'src/modules/messaging/monitoring/services/messaging-telemetry.service'; @@ -28,9 +29,10 @@ export class MessagingMessagesImportJob { constructor( @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity) private readonly connectedAccountRepository: ConnectedAccountRepository, - private readonly gmailFetchMessageContentFromCacheService: MessagingMessagesImportService, + private readonly messagingMessagesImportService: MessagingMessagesImportService, private readonly messagingTelemetryService: MessagingTelemetryService, private readonly twentyORMManager: TwentyORMManager, + private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService, ) {} @Process(MessagingMessagesImportJob.name) @@ -92,7 +94,7 @@ export class MessagingMessagesImportJob { return; } - await this.gmailFetchMessageContentFromCacheService.processMessageBatchImport( + await this.messagingMessagesImportService.processMessageBatchImport( messageChannel, connectedAccount, workspaceId, diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts index a2839cdd9..0388005c9 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts @@ -6,7 +6,7 @@ import { Process } from 'src/engine/integrations/message-queue/decorators/proces import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; -import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; +import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service'; import { MessageChannelSyncStage, MessageChannelWorkspaceEntity, @@ -25,7 +25,7 @@ export class MessagingOngoingStaleJob { private readonly logger = new Logger(MessagingOngoingStaleJob.name); constructor( private readonly twentyORMManager: TwentyORMManager, - private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService, + private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService, ) {} @Process(MessagingOngoingStaleJob.name) @@ -57,12 +57,12 @@ export class MessagingOngoingStaleJob { switch (messageChannel.syncStage) { case MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING: - await this.messagingChannelSyncStatusService.schedulePartialMessageListFetch( + await this.messageChannelSyncStatusService.schedulePartialMessageListFetch( messageChannel.id, ); break; case MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING: - await this.messagingChannelSyncStatusService.scheduleMessagesImport( + await this.messageChannelSyncStatusService.scheduleMessagesImport( messageChannel.id, ); break; diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts index d54fe13b5..b9042cd2f 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/messaging-import-manager.module.ts @@ -23,8 +23,10 @@ import { MessagingMessageListFetchJob } from 'src/modules/messaging/message-impo import { MessagingMessagesImportJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job'; import { MessagingOngoingStaleJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job'; import { MessagingMessageImportManagerMessageChannelListener } from 'src/modules/messaging/message-import-manager/listeners/messaging-import-manager-message-channel.listener'; -import { MessagingErrorHandlingService } from 'src/modules/messaging/message-import-manager/services/messaging-error-handling.service'; +import { MessageImportExceptionHandlerService } from 'src/modules/messaging/message-import-manager/services/message-import-exception-handler.service'; import { MessagingFullMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service'; +import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service'; +import { MessagingGetMessagesService } from 'src/modules/messaging/message-import-manager/services/messaging-get-messages.service'; import { MessagingMessageService } from 'src/modules/messaging/message-import-manager/services/messaging-message.service'; import { MessagingMessagesImportService } from 'src/modules/messaging/message-import-manager/services/messaging-messages-import.service'; import { MessagingPartialMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service'; @@ -61,11 +63,13 @@ import { MessagingMonitoringModule } from 'src/modules/messaging/monitoring/mess MessagingMessageImportManagerMessageChannelListener, MessagingCleanCacheJob, MessagingMessageService, - MessagingErrorHandlingService, MessagingPartialMessageListFetchService, MessagingFullMessageListFetchService, MessagingMessagesImportService, MessagingSaveMessagesAndEnqueueContactCreationService, + MessagingGetMessageListService, + MessagingGetMessagesService, + MessageImportExceptionHandlerService, ], exports: [], }) diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/message-import-exception-handler.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/message-import-exception-handler.service.ts new file mode 100644 index 000000000..55cff5f16 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/message-import-exception-handler.service.ts @@ -0,0 +1,166 @@ +import { Injectable } from '@nestjs/common'; + +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { CALENDAR_THROTTLE_MAX_ATTEMPTS } from 'src/modules/calendar/calendar-event-import-manager/constants/calendar-throttle-max-attempts'; +import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service'; +import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { + MessageImportDriverException, + MessageImportDriverExceptionCode, +} from 'src/modules/messaging/message-import-manager/drivers/exceptions/message-import-driver.exception'; +import { + MessageImportException, + MessageImportExceptionCode, +} from 'src/modules/messaging/message-import-manager/exceptions/message-import.exception'; + +export enum MessageImportSyncStep { + FULL_MESSAGE_LIST_FETCH = 'FULL_MESSAGE_LIST_FETCH', + PARTIAL_MESSAGE_LIST_FETCH = 'PARTIAL_MESSAGE_LIST_FETCH', + MESSAGES_IMPORT = 'MESSAGES_IMPORT', +} + +@Injectable() +export class MessageImportExceptionHandlerService { + constructor( + private readonly twentyORMManager: TwentyORMManager, + private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService, + ) {} + + public async handleDriverException( + exception: MessageImportDriverException, + syncStep: MessageImportSyncStep, + messageChannel: Pick< + MessageChannelWorkspaceEntity, + 'id' | 'throttleFailureCount' + >, + workspaceId: string, + ): Promise { + switch (exception.code) { + case MessageImportDriverExceptionCode.NOT_FOUND: + await this.handleNotFoundException( + syncStep, + messageChannel, + workspaceId, + ); + break; + case MessageImportDriverExceptionCode.TEMPORARY_ERROR: + await this.handleTemporaryException( + syncStep, + messageChannel, + workspaceId, + ); + break; + case MessageImportDriverExceptionCode.INSUFFICIENT_PERMISSIONS: + await this.handleInsufficientPermissionsException( + messageChannel, + workspaceId, + ); + break; + case MessageImportDriverExceptionCode.UNKNOWN: + case MessageImportDriverExceptionCode.UNKNOWN_NETWORK_ERROR: + await this.handleUnknownException( + exception, + messageChannel, + workspaceId, + ); + break; + default: + throw exception; + } + } + + private async handleTemporaryException( + syncStep: MessageImportSyncStep, + messageChannel: Pick< + MessageChannelWorkspaceEntity, + 'id' | 'throttleFailureCount' + >, + workspaceId: string, + ): Promise { + if (messageChannel.throttleFailureCount >= CALENDAR_THROTTLE_MAX_ATTEMPTS) { + await this.messageChannelSyncStatusService.markAsFailedUnknownAndFlushMessagesToImport( + messageChannel.id, + workspaceId, + ); + + return; + } + + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); + + await messageChannelRepository.increment( + { + id: messageChannel.id, + }, + 'throttleFailureCount', + 1, + ); + + switch (syncStep) { + case MessageImportSyncStep.FULL_MESSAGE_LIST_FETCH: + await this.messageChannelSyncStatusService.scheduleFullMessageListFetch( + messageChannel.id, + ); + break; + + case MessageImportSyncStep.PARTIAL_MESSAGE_LIST_FETCH: + await this.messageChannelSyncStatusService.schedulePartialMessageListFetch( + messageChannel.id, + ); + break; + + case MessageImportSyncStep.MESSAGES_IMPORT: + await this.messageChannelSyncStatusService.scheduleMessagesImport( + messageChannel.id, + ); + break; + + default: + break; + } + } + + private async handleInsufficientPermissionsException( + messageChannel: Pick, + workspaceId: string, + ): Promise { + await this.messageChannelSyncStatusService.markAsFailedInsufficientPermissionsAndFlushMessagesToImport( + messageChannel.id, + workspaceId, + ); + } + + private async handleUnknownException( + exception: MessageImportDriverException, + messageChannel: Pick, + workspaceId: string, + ): Promise { + await this.messageChannelSyncStatusService.markAsFailedUnknownAndFlushMessagesToImport( + messageChannel.id, + workspaceId, + ); + + throw new MessageImportException( + `Unknown error occurred while importing messages for message channel ${messageChannel.id} in workspace ${workspaceId}: ${exception.message}`, + MessageImportExceptionCode.UNKNOWN, + ); + } + + private async handleNotFoundException( + syncStep: MessageImportSyncStep, + messageChannel: Pick, + workspaceId: string, + ): Promise { + if (syncStep === MessageImportSyncStep.FULL_MESSAGE_LIST_FETCH) { + return; + } + + await this.messageChannelSyncStatusService.resetAndScheduleFullMessageListFetch( + messageChannel.id, + workspaceId, + ); + } +} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-error-handling.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-error-handling.service.ts deleted file mode 100644 index 2a6570961..000000000 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-error-handling.service.ts +++ /dev/null @@ -1,334 +0,0 @@ -import { Injectable } from '@nestjs/common'; - -import snakeCase from 'lodash.snakecase'; - -import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; -import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; -import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; -import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; -import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; -import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; -import { MESSAGING_THROTTLE_MAX_ATTEMPTS } from 'src/modules/messaging/message-import-manager/constants/messaging-throttle-max-attempts'; -import { MessagingTelemetryService } from 'src/modules/messaging/monitoring/services/messaging-telemetry.service'; - -type SyncStep = - | 'partial-message-list-fetch' - | 'full-message-list-fetch' - | 'messages-import'; - -export type GmailError = { - code: number | string; - reason: string; -}; - -@Injectable() -export class MessagingErrorHandlingService { - constructor( - @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity) - private readonly connectedAccountRepository: ConnectedAccountRepository, - private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService, - private readonly messagingTelemetryService: MessagingTelemetryService, - private readonly twentyORMManager: TwentyORMManager, - ) {} - - public async handleGmailError( - error: GmailError, - syncStep: SyncStep, - messageChannel: MessageChannelWorkspaceEntity, - workspaceId: string, - ): Promise { - const { code, reason } = error; - - switch (code) { - case 400: - if (reason === 'invalid_grant') { - await this.handleInsufficientPermissions( - error, - syncStep, - messageChannel, - workspaceId, - ); - } - if (reason === 'failedPrecondition') { - await this.handleFailedPrecondition( - error, - syncStep, - messageChannel, - workspaceId, - ); - } else { - await this.handleUnknownError( - error, - syncStep, - messageChannel, - workspaceId, - ); - } - break; - case 404: - await this.handleNotFound(error, syncStep, messageChannel, workspaceId); - break; - - case 429: - await this.handleRateLimitExceeded( - error, - syncStep, - messageChannel, - workspaceId, - ); - break; - - case 403: - if ( - reason === 'rateLimitExceeded' || - reason === 'userRateLimitExceeded' - ) { - await this.handleRateLimitExceeded( - error, - syncStep, - messageChannel, - workspaceId, - ); - } else { - await this.handleInsufficientPermissions( - error, - syncStep, - messageChannel, - workspaceId, - ); - } - break; - - case 401: - await this.handleInsufficientPermissions( - error, - syncStep, - messageChannel, - workspaceId, - ); - break; - case 500: - if (reason === 'backendError') { - await this.handleRateLimitExceeded( - error, - syncStep, - messageChannel, - workspaceId, - ); - } else { - await this.messagingChannelSyncStatusService.markAsFailedUnknownAndFlushMessagesToImport( - messageChannel.id, - workspaceId, - ); - throw new Error( - `Unhandled Gmail error code ${code} with reason ${reason}`, - ); - } - break; - case 'ECONNRESET': - case 'ENOTFOUND': - case 'ECONNABORTED': - case 'ETIMEDOUT': - case 'ERR_NETWORK': - // We are currently mixing up Gmail Error code (HTTP status) and axios error code (ECONNRESET) - - // In case of a network error, we should retry the request - await this.handleRateLimitExceeded( - error, - syncStep, - messageChannel, - workspaceId, - ); - break; - default: - await this.messagingChannelSyncStatusService.markAsFailedUnknownAndFlushMessagesToImport( - messageChannel.id, - workspaceId, - ); - throw new Error( - `Unhandled Gmail error code ${code} with reason ${reason}`, - ); - } - } - - private async handleRateLimitExceeded( - error: GmailError, - syncStep: SyncStep, - messageChannel: MessageChannelWorkspaceEntity, - workspaceId: string, - ): Promise { - await this.messagingTelemetryService.track({ - eventName: `${snakeCase(syncStep)}.error.rate_limit_exceeded`, - workspaceId, - connectedAccountId: messageChannel.connectedAccountId, - messageChannelId: messageChannel.id, - message: `${error.code}: ${error.reason}`, - }); - - await this.handleThrottle(syncStep, messageChannel, workspaceId); - } - - private async handleFailedPrecondition( - error: GmailError, - syncStep: SyncStep, - messageChannel: MessageChannelWorkspaceEntity, - workspaceId: string, - ): Promise { - await this.messagingTelemetryService.track({ - eventName: `${snakeCase(syncStep)}.error.failed_precondition`, - workspaceId, - connectedAccountId: messageChannel.connectedAccountId, - messageChannelId: messageChannel.id, - message: `${error.code}: ${error.reason}`, - }); - - await this.handleThrottle(syncStep, messageChannel, workspaceId); - } - - private async handleInsufficientPermissions( - error: GmailError, - syncStep: SyncStep, - messageChannel: MessageChannelWorkspaceEntity, - workspaceId: string, - ): Promise { - await this.messagingTelemetryService.track({ - eventName: `${snakeCase(syncStep)}.error.insufficient_permissions`, - workspaceId, - connectedAccountId: messageChannel.connectedAccountId, - messageChannelId: messageChannel.id, - message: `${error.code}: ${error.reason}`, - }); - - await this.messagingChannelSyncStatusService.markAsFailedInsufficientPermissionsAndFlushMessagesToImport( - messageChannel.id, - workspaceId, - ); - - if (!messageChannel.connectedAccountId) { - throw new Error( - `Connected account ID is not defined for message channel ${messageChannel.id} in workspace ${workspaceId}`, - ); - } - - await this.connectedAccountRepository.updateAuthFailedAt( - messageChannel.connectedAccountId, - workspaceId, - ); - } - - private async handleNotFound( - error: GmailError, - syncStep: SyncStep, - messageChannel: MessageChannelWorkspaceEntity, - workspaceId: string, - ): Promise { - if (syncStep === 'messages-import') { - return; - } - - await this.messagingTelemetryService.track({ - eventName: `${snakeCase(syncStep)}.error.not_found`, - workspaceId, - connectedAccountId: messageChannel.connectedAccountId, - messageChannelId: messageChannel.id, - message: `404: ${error.reason}`, - }); - - await this.messagingChannelSyncStatusService.resetAndScheduleFullMessageListFetch( - messageChannel.id, - workspaceId, - ); - } - - private async handleThrottle( - syncStep: SyncStep, - messageChannel: MessageChannelWorkspaceEntity, - workspaceId: string, - ): Promise { - if ( - messageChannel.throttleFailureCount >= MESSAGING_THROTTLE_MAX_ATTEMPTS - ) { - await this.messagingChannelSyncStatusService.markAsFailedUnknownAndFlushMessagesToImport( - messageChannel.id, - workspaceId, - ); - - return; - } - - await this.throttle(messageChannel, workspaceId); - - switch (syncStep) { - case 'full-message-list-fetch': - await this.messagingChannelSyncStatusService.scheduleFullMessageListFetch( - messageChannel.id, - ); - break; - - case 'partial-message-list-fetch': - await this.messagingChannelSyncStatusService.schedulePartialMessageListFetch( - messageChannel.id, - ); - break; - - case 'messages-import': - await this.messagingChannelSyncStatusService.scheduleMessagesImport( - messageChannel.id, - ); - break; - - default: - break; - } - } - - private async throttle( - messageChannel: MessageChannelWorkspaceEntity, - workspaceId: string, - ): Promise { - const messageChannelRepository = - await this.twentyORMManager.getRepository( - 'messageChannel', - ); - - await messageChannelRepository.increment( - { - id: messageChannel.id, - }, - 'throttleFailureCount', - 1, - ); - - await this.messagingTelemetryService.track({ - eventName: 'message_channel.throttle', - workspaceId, - connectedAccountId: messageChannel.connectedAccountId, - messageChannelId: messageChannel.id, - message: `Increment throttle failure count to ${messageChannel.throttleFailureCount}`, - }); - } - - private async handleUnknownError( - error: GmailError, - syncStep: SyncStep, - messageChannel: MessageChannelWorkspaceEntity, - workspaceId: string, - ): Promise { - await this.messagingTelemetryService.track({ - eventName: `${snakeCase(syncStep)}.error.unknown`, - workspaceId, - connectedAccountId: messageChannel.connectedAccountId, - messageChannelId: messageChannel.id, - message: `${error.code}: ${error.reason}`, - }); - - await this.messagingChannelSyncStatusService.markAsFailedUnknownAndFlushMessagesToImport( - messageChannel.id, - workspaceId, - ); - - throw new Error( - `Unhandled Gmail error code ${error.code} with reason ${error.reason}`, - ); - } -} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts index eca0aaedf..fbb0d3919 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts @@ -1,39 +1,30 @@ -import { Injectable, Logger } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; -import { GaxiosResponse } from 'gaxios'; -import { gmail_v1 } from 'googleapis'; -import { Any, EntityManager } from 'typeorm'; +import { Any } from 'typeorm'; import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service'; import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator'; import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; -import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; +import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service'; import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; -import { MESSAGING_GMAIL_EXCLUDED_CATEGORIES } from 'src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-excluded-categories'; -import { MESSAGING_GMAIL_USERS_MESSAGES_LIST_MAX_RESULT } from 'src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-users-messages-list-max-result.constant'; -import { MessagingGmailClientProvider } from 'src/modules/messaging/message-import-manager/drivers/gmail/providers/messaging-gmail-client.provider'; -import { computeGmailCategoryExcludeSearchFilter } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-gmail-category-excude-search-filter'; import { - GmailError, - MessagingErrorHandlingService, -} from 'src/modules/messaging/message-import-manager/services/messaging-error-handling.service'; + MessageImportExceptionHandlerService, + MessageImportSyncStep, +} from 'src/modules/messaging/message-import-manager/services/message-import-exception-handler.service'; +import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service'; @Injectable() export class MessagingFullMessageListFetchService { - private readonly logger = new Logger( - MessagingFullMessageListFetchService.name, - ); - constructor( - private readonly gmailClientProvider: MessagingGmailClientProvider, @InjectCacheStorage(CacheStorageNamespace.ModuleMessaging) private readonly cacheStorage: CacheStorageService, - private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService, - private readonly gmailErrorHandlingService: MessagingErrorHandlingService, + private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService, private readonly twentyORMManager: TwentyORMManager, + private readonly messagingGetMessageListService: MessagingGetMessageListService, + private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService, ) {} public async processMessageListFetch( @@ -41,205 +32,78 @@ export class MessagingFullMessageListFetchService { connectedAccount: ConnectedAccountWorkspaceEntity, workspaceId: string, ) { - await this.messagingChannelSyncStatusService.markAsMessagesListFetchOngoing( - messageChannel.id, - ); - - const gmailClient: gmail_v1.Gmail = - await this.gmailClientProvider.getGmailClient(connectedAccount); - - const { error: gmailError } = await this.fetchAllMessageIdsAndStoreInCache( - gmailClient, - messageChannel.id, - workspaceId, - ); - - if (gmailError) { - await this.gmailErrorHandlingService.handleGmailError( - gmailError, - 'full-message-list-fetch', - messageChannel, - workspaceId, + try { + await this.messageChannelSyncStatusService.markAsMessagesListFetchOngoing( + messageChannel.id, ); - return; - } - - const messageChannelRepository = - await this.twentyORMManager.getRepository( - 'messageChannel', - ); - - await messageChannelRepository.update( - { - id: messageChannel.id, - }, - { - throttleFailureCount: 0, - syncStageStartedAt: null, - }, - ); - - await this.messagingChannelSyncStatusService.scheduleMessagesImport( - messageChannel.id, - ); - } - - private async fetchAllMessageIdsAndStoreInCache( - gmailClient: gmail_v1.Gmail, - messageChannelId: string, - workspaceId: string, - transactionManager?: EntityManager, - ): Promise<{ error?: GmailError }> { - let pageToken: string | undefined; - let fetchedMessageIdsCount = 0; - let hasMoreMessages = true; - let firstMessageExternalId: string | undefined; - let response: GaxiosResponse; - - while (hasMoreMessages) { - try { - response = await gmailClient.users.messages.list({ - userId: 'me', - maxResults: MESSAGING_GMAIL_USERS_MESSAGES_LIST_MAX_RESULT, - pageToken, - q: computeGmailCategoryExcludeSearchFilter( - MESSAGING_GMAIL_EXCLUDED_CATEGORIES, - ), - }); - } catch (error) { - return { - error: { - code: error.response?.status, - reason: error.response?.data?.error, - }, - }; - } - - if (response.data?.messages) { - const messageExternalIds = response.data.messages - .filter((message): message is { id: string } => message.id != null) - .map((message) => message.id); - - if (!firstMessageExternalId) { - firstMessageExternalId = messageExternalIds[0]; - } - - const messageChannelMessageAssociationRepository = - await this.twentyORMManager.getRepository( - 'messageChannelMessageAssociation', - ); - - const existingMessageChannelMessageAssociations = - await messageChannelMessageAssociationRepository.find( - { - where: { - messageChannelId, - messageExternalId: Any(messageExternalIds), - }, - }, - transactionManager, - ); - - const existingMessageChannelMessageAssociationsExternalIds = - existingMessageChannelMessageAssociations.map( - (messageChannelMessageAssociation) => - messageChannelMessageAssociation.messageExternalId, - ); - - const messageIdsToImport = messageExternalIds.filter( - (messageExternalId) => - !existingMessageChannelMessageAssociationsExternalIds.includes( - messageExternalId, - ), + const { messageExternalIds, nextSyncCursor } = + await this.messagingGetMessageListService.getFullMessageList( + connectedAccount, ); - if (messageIdsToImport.length) { - await this.cacheStorage.setAdd( - `messages-to-import:${workspaceId}:gmail:${messageChannelId}`, - messageIdsToImport, - ); - } + const messageChannelMessageAssociationRepository = + await this.twentyORMManager.getRepository( + 'messageChannelMessageAssociation', + ); - fetchedMessageIdsCount += messageExternalIds.length; + const existingMessageChannelMessageAssociations = + await messageChannelMessageAssociationRepository.find({ + where: { + messageChannelId: messageChannel.id, + messageExternalId: Any(messageExternalIds), + }, + }); + + const existingMessageChannelMessageAssociationsExternalIds = + existingMessageChannelMessageAssociations.map( + (messageChannelMessageAssociation) => + messageChannelMessageAssociation.messageExternalId, + ); + + const messageIdsToImport = messageExternalIds.filter( + (messageExternalId) => + !existingMessageChannelMessageAssociationsExternalIds.includes( + messageExternalId, + ), + ); + + if (messageIdsToImport.length) { + await this.cacheStorage.setAdd( + `messages-to-import:${workspaceId}:gmail:${messageChannel.id}`, + messageIdsToImport, + ); } - pageToken = response.data.nextPageToken ?? undefined; - hasMoreMessages = !!pageToken; - } + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); - this.logger.log( - `Added ${fetchedMessageIdsCount} messages ids from Gmail for messageChannel ${messageChannelId} in workspace ${workspaceId} and added to cache for import`, - ); - - if (!firstMessageExternalId) { - throw new Error( - `No first message found for workspace ${workspaceId} and account ${messageChannelId}, can't update sync external id`, - ); - } - - await this.updateLastSyncCursor( - gmailClient, - messageChannelId, - firstMessageExternalId, - workspaceId, - transactionManager, - ); - - return {}; - } - - private async updateLastSyncCursor( - gmailClient: gmail_v1.Gmail, - messageChannelId: string, - firstMessageExternalId: string, - workspaceId: string, - transactionManager?: EntityManager, - ) { - const firstMessageContent = await gmailClient.users.messages.get({ - userId: 'me', - id: firstMessageExternalId, - }); - - if (!firstMessageContent?.data) { - throw new Error( - `No first message content found for message ${firstMessageExternalId} in workspace ${workspaceId}`, - ); - } - - const historyId = firstMessageContent?.data?.historyId; - - if (!historyId) { - throw new Error( - `No historyId found for message ${firstMessageExternalId} in workspace ${workspaceId}`, - ); - } - - const messageChannelRepository = - await this.twentyORMManager.getRepository( - 'messageChannel', - ); - - const messageChannel = await messageChannelRepository.findOneOrFail( - { - where: { - id: messageChannelId, - }, - }, - transactionManager, - ); - - const currentSyncCursor = messageChannel.syncCursor; - - if (!currentSyncCursor || historyId > currentSyncCursor) { await messageChannelRepository.update( { id: messageChannel.id, }, { - syncCursor: historyId, + throttleFailureCount: 0, + syncStageStartedAt: null, + syncCursor: + !messageChannel.syncCursor || + nextSyncCursor > messageChannel.syncCursor + ? nextSyncCursor + : messageChannel.syncCursor, }, - transactionManager, + ); + + await this.messageChannelSyncStatusService.scheduleMessagesImport( + messageChannel.id, + ); + } catch (error) { + await this.messageImportErrorHandlerService.handleDriverException( + error, + MessageImportSyncStep.FULL_MESSAGE_LIST_FETCH, + messageChannel, + workspaceId, ); } } diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-get-message-list.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-get-message-list.service.ts new file mode 100644 index 000000000..eca12dc5d --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-get-message-list.service.ts @@ -0,0 +1,66 @@ +import { Injectable } from '@nestjs/common'; + +import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; +import { GmailGetMessageListService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service'; +import { + MessageImportException, + MessageImportExceptionCode, +} from 'src/modules/messaging/message-import-manager/exceptions/message-import.exception'; + +export type GetFullMessageListResponse = { + messageExternalIds: string[]; + nextSyncCursor: string; +}; + +export type GetPartialMessageListResponse = { + messageExternalIds: string[]; + messageExternalIdsToDelete: string[]; + nextSyncCursor: string; +}; + +@Injectable() +export class MessagingGetMessageListService { + constructor( + private readonly gmailGetMessageListService: GmailGetMessageListService, + ) {} + + public async getFullMessageList( + connectedAccount: Pick< + ConnectedAccountWorkspaceEntity, + 'provider' | 'refreshToken' | 'id' + >, + ): Promise { + switch (connectedAccount.provider) { + case 'google': + return this.gmailGetMessageListService.getFullMessageList( + connectedAccount, + ); + default: + throw new MessageImportException( + `Provider ${connectedAccount.provider} is not supported`, + MessageImportExceptionCode.PROVIDER_NOT_SUPPORTED, + ); + } + } + + public async getPartialMessageList( + connectedAccount: Pick< + ConnectedAccountWorkspaceEntity, + 'provider' | 'refreshToken' | 'id' + >, + syncCursor: string, + ): Promise { + switch (connectedAccount.provider) { + case 'google': + return this.gmailGetMessageListService.getPartialMessageList( + connectedAccount, + syncCursor, + ); + default: + throw new MessageImportException( + `Provider ${connectedAccount.provider} is not supported`, + MessageImportExceptionCode.PROVIDER_NOT_SUPPORTED, + ); + } + } +} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-get-messages.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-get-messages.service.ts new file mode 100644 index 000000000..bbd2e980d --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-get-messages.service.ts @@ -0,0 +1,46 @@ +import { Injectable } from '@nestjs/common'; + +import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; +import { GmailGetMessagesService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-messages.service'; +import { + MessageImportException, + MessageImportExceptionCode, +} from 'src/modules/messaging/message-import-manager/exceptions/message-import.exception'; +import { MessageWithParticipants } from 'src/modules/messaging/message-import-manager/types/message'; + +export type GetMessagesResponse = MessageWithParticipants[]; + +@Injectable() +export class MessagingGetMessagesService { + constructor( + private readonly gmailGetMessagesService: GmailGetMessagesService, + ) {} + + public async getMessages( + messageIds: string[], + connectedAccount: Pick< + ConnectedAccountWorkspaceEntity, + | 'provider' + | 'accessToken' + | 'refreshToken' + | 'id' + | 'handle' + | 'handleAliases' + >, + workspaceId: string, + ): Promise { + switch (connectedAccount.provider) { + case 'google': + return this.gmailGetMessagesService.getMessages( + messageIds, + connectedAccount, + workspaceId, + ); + default: + throw new MessageImportException( + `Provider ${connectedAccount.provider} is not supported`, + MessageImportExceptionCode.PROVIDER_NOT_SUPPORTED, + ); + } + } +} diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-message.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-message.service.ts index 4a8468878..a69b92448 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-message.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-message.service.ts @@ -4,22 +4,17 @@ import { EntityManager } from 'typeorm'; import { v4 } from 'uuid'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; -import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; import { MessageThreadWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-thread.workspace-entity'; import { MessageWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message.workspace-entity'; -import { GmailMessage } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message'; +import { MessageWithParticipants } from 'src/modules/messaging/message-import-manager/types/message'; @Injectable() export class MessagingMessageService { constructor(private readonly twentyORMManager: TwentyORMManager) {} public async saveMessagesWithinTransaction( - messages: GmailMessage[], - connectedAccount: Pick< - ConnectedAccountWorkspaceEntity, - 'handle' | 'handleAliases' - >, + messages: MessageWithParticipants[], messageChannelId: string, transactionManager: EntityManager, ): Promise> { @@ -103,19 +98,13 @@ export class MessagingMessageService { const newMessageId = v4(); - const messageDirection = - connectedAccount.handle === message.fromHandle || - connectedAccount.handleAliases?.includes(message.fromHandle) - ? 'outgoing' - : 'incoming'; - await messageRepository.insert( { id: newMessageId, headerMessageId: message.headerMessageId, subject: message.subject, - receivedAt: new Date(parseInt(message.internalDate)), - direction: messageDirection, + receivedAt: message.receivedAt, + direction: message.direction, text: message.text, messageThreadId: newOrExistingMessageThreadId, }, diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts index ace2f6a8c..dcb590d08 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts @@ -10,17 +10,22 @@ import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { BlocklistRepository } from 'src/modules/blocklist/repositories/blocklist.repository'; import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity'; import { EmailAliasManagerService } from 'src/modules/connected-account/email-alias-manager/services/email-alias-manager.service'; +import { RefreshAccessTokenExceptionCode } from 'src/modules/connected-account/refresh-access-token-manager/exceptions/refresh-access-token.exception'; import { RefreshAccessTokenService } from 'src/modules/connected-account/refresh-access-token-manager/services/refresh-access-token.service'; -import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; -import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; +import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service'; import { MessageChannelSyncStage, MessageChannelWorkspaceEntity, } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { MessageImportDriverExceptionCode } from 'src/modules/messaging/message-import-manager/drivers/exceptions/message-import-driver.exception'; import { MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE } from 'src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-users-messages-get-batch-size.constant'; -import { MessagingGmailFetchMessagesByBatchesService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-messages-by-batches.service'; -import { MessagingErrorHandlingService } from 'src/modules/messaging/message-import-manager/services/messaging-error-handling.service'; +import { MessageImportExceptionCode } from 'src/modules/messaging/message-import-manager/exceptions/message-import.exception'; +import { + MessageImportExceptionHandlerService, + MessageImportSyncStep, +} from 'src/modules/messaging/message-import-manager/services/message-import-exception-handler.service'; +import { MessagingGetMessagesService } from 'src/modules/messaging/message-import-manager/services/messaging-get-messages.service'; import { MessagingSaveMessagesAndEnqueueContactCreationService } from 'src/modules/messaging/message-import-manager/services/messaging-save-messages-and-enqueue-contact-creation.service'; import { filterEmails } from 'src/modules/messaging/message-import-manager/utils/filter-emails.util'; import { MessagingTelemetryService } from 'src/modules/messaging/monitoring/services/messaging-telemetry.service'; @@ -30,21 +35,19 @@ export class MessagingMessagesImportService { private readonly logger = new Logger(MessagingMessagesImportService.name); constructor( - private readonly fetchMessagesByBatchesService: MessagingGmailFetchMessagesByBatchesService, @InjectCacheStorage(CacheStorageNamespace.ModuleMessaging) private readonly cacheStorage: CacheStorageService, - private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService, + private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService, private readonly saveMessagesAndEnqueueContactCreationService: MessagingSaveMessagesAndEnqueueContactCreationService, - private readonly gmailErrorHandlingService: MessagingErrorHandlingService, private readonly refreshAccessTokenService: RefreshAccessTokenService, private readonly messagingTelemetryService: MessagingTelemetryService, @InjectObjectMetadataRepository(BlocklistWorkspaceEntity) private readonly blocklistRepository: BlocklistRepository, private readonly emailAliasManagerService: EmailAliasManagerService, private readonly isFeatureEnabledService: FeatureFlagService, - @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity) - private readonly connectedAccountRepository: ConnectedAccountRepository, private readonly twentyORMManager: TwentyORMManager, + private readonly messagingGetMessagesService: MessagingGetMessagesService, + private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService, ) {} async processMessageBatchImport( @@ -52,107 +55,95 @@ export class MessagingMessagesImportService { connectedAccount: ConnectedAccountWorkspaceEntity, workspaceId: string, ) { - if ( - messageChannel.syncStage !== - MessageChannelSyncStage.MESSAGES_IMPORT_PENDING - ) { - return; - } - - await this.messagingTelemetryService.track({ - eventName: 'messages_import.started', - workspaceId, - connectedAccountId: messageChannel.connectedAccountId, - messageChannelId: messageChannel.id, - }); - - this.logger.log( - `Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} starting...`, - ); - - await this.messagingChannelSyncStatusService.markAsMessagesImportOngoing( - messageChannel.id, - ); - - let accessToken: string; + let messageIdsToFetch: string[] = []; try { - accessToken = - await this.refreshAccessTokenService.refreshAndSaveAccessToken( - connectedAccount, - workspaceId, - ); - } catch (error) { + if ( + messageChannel.syncStage !== + MessageChannelSyncStage.MESSAGES_IMPORT_PENDING + ) { + return; + } + await this.messagingTelemetryService.track({ - eventName: `refresh_token.error.insufficient_permissions`, + eventName: 'messages_import.started', workspaceId, connectedAccountId: messageChannel.connectedAccountId, messageChannelId: messageChannel.id, - message: `${error.code}: ${error.reason}`, }); - await this.messagingChannelSyncStatusService.markAsFailedInsufficientPermissionsAndFlushMessagesToImport( + this.logger.log( + `Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} starting...`, + ); + + await this.messageChannelSyncStatusService.markAsMessagesImportOngoing( messageChannel.id, - workspaceId, ); - await this.connectedAccountRepository.updateAuthFailedAt( - messageChannel.connectedAccountId, - workspaceId, - ); - - return; - } - - if ( - await this.isFeatureEnabledService.isFeatureEnabled( - FeatureFlagKey.IsMessagingAliasFetchingEnabled, - workspaceId, - ) - ) { try { + connectedAccount.accessToken = + await this.refreshAccessTokenService.refreshAndSaveAccessToken( + connectedAccount, + workspaceId, + ); + } catch (error) { + switch (error.code) { + case (RefreshAccessTokenExceptionCode.REFRESH_ACCESS_TOKEN_FAILED, + RefreshAccessTokenExceptionCode.REFRESH_TOKEN_NOT_FOUND): + await this.messagingTelemetryService.track({ + eventName: `refresh_token.error.insufficient_permissions`, + workspaceId, + connectedAccountId: messageChannel.connectedAccountId, + messageChannelId: messageChannel.id, + message: `${error.code}: ${error.reason}`, + }); + throw { + code: MessageImportDriverExceptionCode.INSUFFICIENT_PERMISSIONS, + message: error.message, + }; + case RefreshAccessTokenExceptionCode.PROVIDER_NOT_SUPPORTED: + throw { + code: MessageImportExceptionCode.PROVIDER_NOT_SUPPORTED, + message: error.message, + }; + default: + throw error; + } + } + + if ( + await this.isFeatureEnabledService.isFeatureEnabled( + FeatureFlagKey.IsMessagingAliasFetchingEnabled, + workspaceId, + ) + ) { await this.emailAliasManagerService.refreshHandleAliases( connectedAccount, workspaceId, ); - } catch (error) { - await this.gmailErrorHandlingService.handleGmailError( - { - code: error.code, - reason: error.message, - }, - 'messages-import', + } + + messageIdsToFetch = await this.cacheStorage.setPop( + `messages-to-import:${workspaceId}:gmail:${messageChannel.id}`, + MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE, + ); + + if (!messageIdsToFetch?.length) { + await this.messageChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch( + messageChannel.id, + ); + + return await this.trackMessageImportCompleted( messageChannel, workspaceId, ); } - } - const messageIdsToFetch = - (await this.cacheStorage.setPop( - `messages-to-import:${workspaceId}:gmail:${messageChannel.id}`, - MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE, - )) ?? []; - - if (!messageIdsToFetch?.length) { - await this.messagingChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch( - messageChannel.id, - ); - - return await this.trackMessageImportCompleted( - messageChannel, + const allMessages = await this.messagingGetMessagesService.getMessages( + messageIdsToFetch, + connectedAccount, workspaceId, ); - } - - try { - const allMessages = - await this.fetchMessagesByBatchesService.fetchAllMessages( - messageIdsToFetch, - accessToken, - connectedAccount.id, - workspaceId, - ); const blocklist = await this.blocklistRepository.getByWorkspaceMemberId( connectedAccount.accountOwnerId, @@ -175,11 +166,11 @@ export class MessagingMessagesImportService { if ( messageIdsToFetch.length < MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE ) { - await this.messagingChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch( + await this.messageChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch( messageChannel.id, ); } else { - await this.messagingChannelSyncStatusService.scheduleMessagesImport( + await this.messageChannelSyncStatusService.scheduleMessagesImport( messageChannel.id, ); } @@ -204,30 +195,14 @@ export class MessagingMessagesImportService { workspaceId, ); } catch (error) { - this.logger.log( - `Messaging import for messageId ${ - error.messageId - }, workspace ${workspaceId} and connected account ${ - connectedAccount.id - } failed with error: ${JSON.stringify(error)}`, - ); - await this.cacheStorage.setAdd( `messages-to-import:${workspaceId}:gmail:${messageChannel.id}`, messageIdsToFetch, ); - if (error.code === undefined) { - // This should never happen as all errors must be known - throw error; - } - - await this.gmailErrorHandlingService.handleGmailError( - { - code: error.code, - reason: error.errors?.[0]?.reason, - }, - 'messages-import', + await this.messageImportErrorHandlerService.handleDriverException( + error, + MessageImportSyncStep.PARTIAL_MESSAGE_LIST_FETCH, messageChannel, workspaceId, ); diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts index 05189ded1..e1ec39c77 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts @@ -1,6 +1,5 @@ import { Injectable, Logger } from '@nestjs/common'; -import { gmail_v1 } from 'googleapis'; import { Any } from 'typeorm'; import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service'; @@ -8,13 +7,14 @@ import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decora import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; -import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; +import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service'; import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; -import { MessagingGmailClientProvider } from 'src/modules/messaging/message-import-manager/drivers/gmail/providers/messaging-gmail-client.provider'; -import { MessagingGmailFetchMessageIdsToExcludeService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-messages-ids-to-exclude.service'; -import { MessagingGmailHistoryService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-history.service'; -import { MessagingErrorHandlingService } from 'src/modules/messaging/message-import-manager/services/messaging-error-handling.service'; +import { + MessageImportExceptionHandlerService, + MessageImportSyncStep, +} from 'src/modules/messaging/message-import-manager/services/message-import-exception-handler.service'; +import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service'; @Injectable() export class MessagingPartialMessageListFetchService { @@ -23,14 +23,12 @@ export class MessagingPartialMessageListFetchService { ); constructor( - private readonly gmailClientProvider: MessagingGmailClientProvider, @InjectCacheStorage(CacheStorageNamespace.ModuleMessaging) private readonly cacheStorage: CacheStorageService, - private readonly gmailErrorHandlingService: MessagingErrorHandlingService, - private readonly gmailGetHistoryService: MessagingGmailHistoryService, - private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService, - private readonly gmailFetchMessageIdsToExcludeService: MessagingGmailFetchMessageIdsToExcludeService, + private readonly messagingGetMessageListService: MessagingGetMessageListService, + private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService, private readonly twentyORMManager: TwentyORMManager, + private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService, ) {} public async processMessageListFetch( @@ -38,129 +36,90 @@ export class MessagingPartialMessageListFetchService { connectedAccount: ConnectedAccountWorkspaceEntity, workspaceId: string, ): Promise { - await this.messagingChannelSyncStatusService.markAsMessagesListFetchOngoing( - messageChannel.id, - ); - - const lastSyncHistoryId = messageChannel.syncCursor; - - const gmailClient: gmail_v1.Gmail = - await this.gmailClientProvider.getGmailClient(connectedAccount); - - const { history, historyId, error } = - await this.gmailGetHistoryService.getHistory( - gmailClient, - lastSyncHistoryId, - ); - - if (error) { - await this.gmailErrorHandlingService.handleGmailError( - error, - 'partial-message-list-fetch', - messageChannel, - workspaceId, - ); - - return; - } - - const messageChannelRepository = - await this.twentyORMManager.getRepository( - 'messageChannel', - ); - - await messageChannelRepository.update( - { - id: messageChannel.id, - }, - { - throttleFailureCount: 0, - syncStageStartedAt: null, - }, - ); - - if (!historyId) { - throw new Error( - `No historyId found for ${connectedAccount.id} in workspace ${workspaceId} in gmail history response.`, - ); - } - - if (historyId === lastSyncHistoryId || !history?.length) { - this.logger.log( - `Partial message list import done with history ${historyId} and nothing to update for workspace ${workspaceId} and account ${connectedAccount.id}`, - ); - - await this.messagingChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch( + try { + await this.messageChannelSyncStatusService.markAsMessagesListFetchOngoing( messageChannel.id, ); - return; - } - - const { messagesAdded, messagesDeleted } = - await this.gmailGetHistoryService.getMessageIdsFromHistory(history); - - let messageIdsToFilter: string[] = []; - - try { - messageIdsToFilter = - await this.gmailFetchMessageIdsToExcludeService.fetchEmailIdsToExcludeOrThrow( - gmailClient, - lastSyncHistoryId, + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', ); - } catch (error) { - await this.gmailErrorHandlingService.handleGmailError( - error, - 'partial-message-list-fetch', - messageChannel, - workspaceId, - ); - return; - } - - const messagesAddedFiltered = messagesAdded.filter( - (messageId) => !messageIdsToFilter.includes(messageId), - ); - - await this.cacheStorage.setAdd( - `messages-to-import:${workspaceId}:gmail:${messageChannel.id}`, - messagesAddedFiltered, - ); - - this.logger.log( - `Added ${messagesAddedFiltered.length} messages to import for workspace ${workspaceId} and account ${connectedAccount.id}`, - ); - - const messageChannelMessageAssociationRepository = - await this.twentyORMManager.getRepository( - 'messageChannelMessageAssociation', - ); - - await messageChannelMessageAssociationRepository.delete({ - messageChannelId: messageChannel.id, - messageExternalId: Any(messagesDeleted), - }); - - this.logger.log( - `Deleted ${messagesDeleted.length} messages for workspace ${workspaceId} and account ${connectedAccount.id}`, - ); - - const currentSyncCursor = messageChannel.syncCursor; - - if (!currentSyncCursor || historyId > currentSyncCursor) { await messageChannelRepository.update( { id: messageChannel.id, }, { - syncCursor: historyId, + throttleFailureCount: 0, + syncStageStartedAt: null, }, ); - } - await this.messagingChannelSyncStatusService.scheduleMessagesImport( - messageChannel.id, - ); + const syncCursor = messageChannel.syncCursor; + + const { messageExternalIds, messageExternalIdsToDelete, nextSyncCursor } = + await this.messagingGetMessageListService.getPartialMessageList( + connectedAccount, + syncCursor, + ); + + if (syncCursor === nextSyncCursor) { + this.logger.log( + `Partial message list import done with history ${syncCursor} and nothing to update for workspace ${workspaceId} and account ${connectedAccount.id}`, + ); + + await this.messageChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch( + messageChannel.id, + ); + + return; + } + + await this.cacheStorage.setAdd( + `messages-to-import:${workspaceId}:gmail:${messageChannel.id}`, + messageExternalIds, + ); + + this.logger.log( + `Added ${messageExternalIds.length} messages to import for workspace ${workspaceId} and account ${connectedAccount.id}`, + ); + + const messageChannelMessageAssociationRepository = + await this.twentyORMManager.getRepository( + 'messageChannelMessageAssociation', + ); + + await messageChannelMessageAssociationRepository.delete({ + messageChannelId: messageChannel.id, + messageExternalId: Any(messageExternalIdsToDelete), + }); + + this.logger.log( + `Deleted ${messageExternalIdsToDelete.length} messages for workspace ${workspaceId} and account ${connectedAccount.id}`, + ); + + if (!syncCursor || nextSyncCursor > syncCursor) { + await messageChannelRepository.update( + { + id: messageChannel.id, + }, + { + syncCursor: nextSyncCursor, + }, + ); + } + + await this.messageChannelSyncStatusService.scheduleMessagesImport( + messageChannel.id, + ); + } catch (error) { + await this.messageImportErrorHandlerService.handleDriverException( + error, + MessageImportSyncStep.PARTIAL_MESSAGE_LIST_FETCH, + messageChannel, + workspaceId, + ); + } } } diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-save-messages-and-enqueue-contact-creation.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-save-messages-and-enqueue-contact-creation.service.ts index c1a8bf3b4..79f93b3e0 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-save-messages-and-enqueue-contact-creation.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-save-messages-and-enqueue-contact-creation.service.ts @@ -5,6 +5,7 @@ import { EntityManager } from 'typeorm'; import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; +import { FieldActorSource } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { @@ -16,15 +17,14 @@ import { MessageChannelWorkspaceEntity, } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { - GmailMessage, Participant, ParticipantWithMessageId, -} from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message'; +} from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message.type'; import { MessagingMessageService } from 'src/modules/messaging/message-import-manager/services/messaging-message.service'; +import { MessageWithParticipants } from 'src/modules/messaging/message-import-manager/types/message'; import { MessagingMessageParticipantService } from 'src/modules/messaging/message-participant-manager/services/messaging-message-participant.service'; import { isGroupEmail } from 'src/utils/is-group-email'; import { isWorkEmail } from 'src/utils/is-work-email'; -import { FieldActorSource } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type'; @Injectable() export class MessagingSaveMessagesAndEnqueueContactCreationService { @@ -37,7 +37,7 @@ export class MessagingSaveMessagesAndEnqueueContactCreationService { ) {} async saveMessagesAndEnqueueContactCreationJob( - messagesToSave: GmailMessage[], + messagesToSave: MessageWithParticipants[], messageChannel: MessageChannelWorkspaceEntity, connectedAccount: ConnectedAccountWorkspaceEntity, workspaceId: string, @@ -51,7 +51,6 @@ export class MessagingSaveMessagesAndEnqueueContactCreationService { const messageExternalIdsAndIdsMap = await this.messageService.saveMessagesWithinTransaction( messagesToSave, - connectedAccount, messageChannel.id, transactionManager, ); diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/types/message.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/types/message.ts new file mode 100644 index 000000000..a58f67b47 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/types/message.ts @@ -0,0 +1,36 @@ +import { MessageParticipantWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-participant.workspace-entity'; +import { MessageWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message.workspace-entity'; + +export type Message = Omit< + MessageWorkspaceEntity, + | 'createdAt' + | 'updatedAt' + | 'messageChannelMessageAssociations' + | 'messageParticipants' + | 'messageThread' + | 'messageThreadId' + | 'id' +> & { + attachments: { + filename: string; + }[]; + externalId: string; + messageThreadExternalId: string; +}; + +export type MessageParticipant = Omit< + MessageParticipantWorkspaceEntity, + | 'id' + | 'createdAt' + | 'updatedAt' + | 'personId' + | 'workspaceMemberId' + | 'person' + | 'workspaceMember' + | 'message' + | 'messageId' +>; + +export type MessageWithParticipants = Message & { + participants: MessageParticipant[]; +}; diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/utils/filter-emails.util.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/utils/filter-emails.util.ts index 1992eb45f..6641ae0a6 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/utils/filter-emails.util.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/utils/filter-emails.util.ts @@ -1,10 +1,10 @@ import { isEmailBlocklisted } from 'src/modules/blocklist/utils/is-email-blocklisted.util'; -import { GmailMessage } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message'; +import { MessageWithParticipants } from 'src/modules/messaging/message-import-manager/types/message'; // Todo: refactor this into several utils export const filterEmails = ( messageChannelHandle: string, - messages: GmailMessage[], + messages: MessageWithParticipants[], blocklist: string[], ) => { return filterOutBlocklistedMessages( @@ -16,7 +16,7 @@ export const filterEmails = ( const filterOutBlocklistedMessages = ( messageChannelHandle: string, - messages: GmailMessage[], + messages: MessageWithParticipants[], blocklist: string[], ) => { return messages.filter((message) => { @@ -35,7 +35,7 @@ const filterOutBlocklistedMessages = ( }); }; -const filterOutIcsAttachments = (messages: GmailMessage[]) => { +const filterOutIcsAttachments = (messages: MessageWithParticipants[]) => { return messages.filter((message) => { if (!message.attachments) { return true; diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/utils/format-address-object-as-participants.util.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/utils/format-address-object-as-participants.util.ts index a6b9e9d26..03bed9460 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/utils/format-address-object-as-participants.util.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/utils/format-address-object-as-participants.util.ts @@ -1,6 +1,6 @@ import addressparser from 'addressparser'; -import { Participant } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message'; +import { Participant } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message.type'; const formatAddressObjectAsArray = ( addressObject: addressparser.EmailAddress | addressparser.EmailAddress[], diff --git a/packages/twenty-server/src/modules/messaging/message-participant-manager/services/messaging-message-participant.service.ts b/packages/twenty-server/src/modules/messaging/message-participant-manager/services/messaging-message-participant.service.ts index 6f6ccccdd..4ccf40c39 100644 --- a/packages/twenty-server/src/modules/messaging/message-participant-manager/services/messaging-message-participant.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-participant-manager/services/messaging-message-participant.service.ts @@ -5,7 +5,7 @@ import { EntityManager } from 'typeorm'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { MatchParticipantService } from 'src/modules/match-participant/match-participant.service'; import { MessageParticipantWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-participant.workspace-entity'; -import { ParticipantWithMessageId } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message'; +import { ParticipantWithMessageId } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message.type'; @Injectable() export class MessagingMessageParticipantService {