6256 refactor messaging module to remove all provider specific code and put it inside the drivers folders (#6721)
Closes #6256 Closes #6257 + Create custom exceptions --------- Co-authored-by: Charles Bochet <charles@twenty.com>
This commit is contained in:
@ -0,0 +1,166 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
|
||||
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
|
||||
import { CALENDAR_THROTTLE_MAX_ATTEMPTS } from 'src/modules/calendar/calendar-event-import-manager/constants/calendar-throttle-max-attempts';
|
||||
import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service';
|
||||
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
|
||||
import {
|
||||
MessageImportDriverException,
|
||||
MessageImportDriverExceptionCode,
|
||||
} from 'src/modules/messaging/message-import-manager/drivers/exceptions/message-import-driver.exception';
|
||||
import {
|
||||
MessageImportException,
|
||||
MessageImportExceptionCode,
|
||||
} from 'src/modules/messaging/message-import-manager/exceptions/message-import.exception';
|
||||
|
||||
export enum MessageImportSyncStep {
|
||||
FULL_MESSAGE_LIST_FETCH = 'FULL_MESSAGE_LIST_FETCH',
|
||||
PARTIAL_MESSAGE_LIST_FETCH = 'PARTIAL_MESSAGE_LIST_FETCH',
|
||||
MESSAGES_IMPORT = 'MESSAGES_IMPORT',
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class MessageImportExceptionHandlerService {
|
||||
constructor(
|
||||
private readonly twentyORMManager: TwentyORMManager,
|
||||
private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService,
|
||||
) {}
|
||||
|
||||
public async handleDriverException(
|
||||
exception: MessageImportDriverException,
|
||||
syncStep: MessageImportSyncStep,
|
||||
messageChannel: Pick<
|
||||
MessageChannelWorkspaceEntity,
|
||||
'id' | 'throttleFailureCount'
|
||||
>,
|
||||
workspaceId: string,
|
||||
): Promise<void> {
|
||||
switch (exception.code) {
|
||||
case MessageImportDriverExceptionCode.NOT_FOUND:
|
||||
await this.handleNotFoundException(
|
||||
syncStep,
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
break;
|
||||
case MessageImportDriverExceptionCode.TEMPORARY_ERROR:
|
||||
await this.handleTemporaryException(
|
||||
syncStep,
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
break;
|
||||
case MessageImportDriverExceptionCode.INSUFFICIENT_PERMISSIONS:
|
||||
await this.handleInsufficientPermissionsException(
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
break;
|
||||
case MessageImportDriverExceptionCode.UNKNOWN:
|
||||
case MessageImportDriverExceptionCode.UNKNOWN_NETWORK_ERROR:
|
||||
await this.handleUnknownException(
|
||||
exception,
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
break;
|
||||
default:
|
||||
throw exception;
|
||||
}
|
||||
}
|
||||
|
||||
private async handleTemporaryException(
|
||||
syncStep: MessageImportSyncStep,
|
||||
messageChannel: Pick<
|
||||
MessageChannelWorkspaceEntity,
|
||||
'id' | 'throttleFailureCount'
|
||||
>,
|
||||
workspaceId: string,
|
||||
): Promise<void> {
|
||||
if (messageChannel.throttleFailureCount >= CALENDAR_THROTTLE_MAX_ATTEMPTS) {
|
||||
await this.messageChannelSyncStatusService.markAsFailedUnknownAndFlushMessagesToImport(
|
||||
messageChannel.id,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
const messageChannelRepository =
|
||||
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
|
||||
'messageChannel',
|
||||
);
|
||||
|
||||
await messageChannelRepository.increment(
|
||||
{
|
||||
id: messageChannel.id,
|
||||
},
|
||||
'throttleFailureCount',
|
||||
1,
|
||||
);
|
||||
|
||||
switch (syncStep) {
|
||||
case MessageImportSyncStep.FULL_MESSAGE_LIST_FETCH:
|
||||
await this.messageChannelSyncStatusService.scheduleFullMessageListFetch(
|
||||
messageChannel.id,
|
||||
);
|
||||
break;
|
||||
|
||||
case MessageImportSyncStep.PARTIAL_MESSAGE_LIST_FETCH:
|
||||
await this.messageChannelSyncStatusService.schedulePartialMessageListFetch(
|
||||
messageChannel.id,
|
||||
);
|
||||
break;
|
||||
|
||||
case MessageImportSyncStep.MESSAGES_IMPORT:
|
||||
await this.messageChannelSyncStatusService.scheduleMessagesImport(
|
||||
messageChannel.id,
|
||||
);
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
private async handleInsufficientPermissionsException(
|
||||
messageChannel: Pick<MessageChannelWorkspaceEntity, 'id'>,
|
||||
workspaceId: string,
|
||||
): Promise<void> {
|
||||
await this.messageChannelSyncStatusService.markAsFailedInsufficientPermissionsAndFlushMessagesToImport(
|
||||
messageChannel.id,
|
||||
workspaceId,
|
||||
);
|
||||
}
|
||||
|
||||
private async handleUnknownException(
|
||||
exception: MessageImportDriverException,
|
||||
messageChannel: Pick<MessageChannelWorkspaceEntity, 'id'>,
|
||||
workspaceId: string,
|
||||
): Promise<void> {
|
||||
await this.messageChannelSyncStatusService.markAsFailedUnknownAndFlushMessagesToImport(
|
||||
messageChannel.id,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
throw new MessageImportException(
|
||||
`Unknown error occurred while importing messages for message channel ${messageChannel.id} in workspace ${workspaceId}: ${exception.message}`,
|
||||
MessageImportExceptionCode.UNKNOWN,
|
||||
);
|
||||
}
|
||||
|
||||
private async handleNotFoundException(
|
||||
syncStep: MessageImportSyncStep,
|
||||
messageChannel: Pick<MessageChannelWorkspaceEntity, 'id'>,
|
||||
workspaceId: string,
|
||||
): Promise<void> {
|
||||
if (syncStep === MessageImportSyncStep.FULL_MESSAGE_LIST_FETCH) {
|
||||
return;
|
||||
}
|
||||
|
||||
await this.messageChannelSyncStatusService.resetAndScheduleFullMessageListFetch(
|
||||
messageChannel.id,
|
||||
workspaceId,
|
||||
);
|
||||
}
|
||||
}
|
||||
@ -1,334 +0,0 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
|
||||
import snakeCase from 'lodash.snakecase';
|
||||
|
||||
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
|
||||
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
|
||||
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
|
||||
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
|
||||
import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service';
|
||||
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
|
||||
import { MESSAGING_THROTTLE_MAX_ATTEMPTS } from 'src/modules/messaging/message-import-manager/constants/messaging-throttle-max-attempts';
|
||||
import { MessagingTelemetryService } from 'src/modules/messaging/monitoring/services/messaging-telemetry.service';
|
||||
|
||||
type SyncStep =
|
||||
| 'partial-message-list-fetch'
|
||||
| 'full-message-list-fetch'
|
||||
| 'messages-import';
|
||||
|
||||
export type GmailError = {
|
||||
code: number | string;
|
||||
reason: string;
|
||||
};
|
||||
|
||||
@Injectable()
|
||||
export class MessagingErrorHandlingService {
|
||||
constructor(
|
||||
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
|
||||
private readonly connectedAccountRepository: ConnectedAccountRepository,
|
||||
private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService,
|
||||
private readonly messagingTelemetryService: MessagingTelemetryService,
|
||||
private readonly twentyORMManager: TwentyORMManager,
|
||||
) {}
|
||||
|
||||
public async handleGmailError(
|
||||
error: GmailError,
|
||||
syncStep: SyncStep,
|
||||
messageChannel: MessageChannelWorkspaceEntity,
|
||||
workspaceId: string,
|
||||
): Promise<void> {
|
||||
const { code, reason } = error;
|
||||
|
||||
switch (code) {
|
||||
case 400:
|
||||
if (reason === 'invalid_grant') {
|
||||
await this.handleInsufficientPermissions(
|
||||
error,
|
||||
syncStep,
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
}
|
||||
if (reason === 'failedPrecondition') {
|
||||
await this.handleFailedPrecondition(
|
||||
error,
|
||||
syncStep,
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
} else {
|
||||
await this.handleUnknownError(
|
||||
error,
|
||||
syncStep,
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
}
|
||||
break;
|
||||
case 404:
|
||||
await this.handleNotFound(error, syncStep, messageChannel, workspaceId);
|
||||
break;
|
||||
|
||||
case 429:
|
||||
await this.handleRateLimitExceeded(
|
||||
error,
|
||||
syncStep,
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
break;
|
||||
|
||||
case 403:
|
||||
if (
|
||||
reason === 'rateLimitExceeded' ||
|
||||
reason === 'userRateLimitExceeded'
|
||||
) {
|
||||
await this.handleRateLimitExceeded(
|
||||
error,
|
||||
syncStep,
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
} else {
|
||||
await this.handleInsufficientPermissions(
|
||||
error,
|
||||
syncStep,
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
}
|
||||
break;
|
||||
|
||||
case 401:
|
||||
await this.handleInsufficientPermissions(
|
||||
error,
|
||||
syncStep,
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
break;
|
||||
case 500:
|
||||
if (reason === 'backendError') {
|
||||
await this.handleRateLimitExceeded(
|
||||
error,
|
||||
syncStep,
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
} else {
|
||||
await this.messagingChannelSyncStatusService.markAsFailedUnknownAndFlushMessagesToImport(
|
||||
messageChannel.id,
|
||||
workspaceId,
|
||||
);
|
||||
throw new Error(
|
||||
`Unhandled Gmail error code ${code} with reason ${reason}`,
|
||||
);
|
||||
}
|
||||
break;
|
||||
case 'ECONNRESET':
|
||||
case 'ENOTFOUND':
|
||||
case 'ECONNABORTED':
|
||||
case 'ETIMEDOUT':
|
||||
case 'ERR_NETWORK':
|
||||
// We are currently mixing up Gmail Error code (HTTP status) and axios error code (ECONNRESET)
|
||||
|
||||
// In case of a network error, we should retry the request
|
||||
await this.handleRateLimitExceeded(
|
||||
error,
|
||||
syncStep,
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
break;
|
||||
default:
|
||||
await this.messagingChannelSyncStatusService.markAsFailedUnknownAndFlushMessagesToImport(
|
||||
messageChannel.id,
|
||||
workspaceId,
|
||||
);
|
||||
throw new Error(
|
||||
`Unhandled Gmail error code ${code} with reason ${reason}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private async handleRateLimitExceeded(
|
||||
error: GmailError,
|
||||
syncStep: SyncStep,
|
||||
messageChannel: MessageChannelWorkspaceEntity,
|
||||
workspaceId: string,
|
||||
): Promise<void> {
|
||||
await this.messagingTelemetryService.track({
|
||||
eventName: `${snakeCase(syncStep)}.error.rate_limit_exceeded`,
|
||||
workspaceId,
|
||||
connectedAccountId: messageChannel.connectedAccountId,
|
||||
messageChannelId: messageChannel.id,
|
||||
message: `${error.code}: ${error.reason}`,
|
||||
});
|
||||
|
||||
await this.handleThrottle(syncStep, messageChannel, workspaceId);
|
||||
}
|
||||
|
||||
private async handleFailedPrecondition(
|
||||
error: GmailError,
|
||||
syncStep: SyncStep,
|
||||
messageChannel: MessageChannelWorkspaceEntity,
|
||||
workspaceId: string,
|
||||
): Promise<void> {
|
||||
await this.messagingTelemetryService.track({
|
||||
eventName: `${snakeCase(syncStep)}.error.failed_precondition`,
|
||||
workspaceId,
|
||||
connectedAccountId: messageChannel.connectedAccountId,
|
||||
messageChannelId: messageChannel.id,
|
||||
message: `${error.code}: ${error.reason}`,
|
||||
});
|
||||
|
||||
await this.handleThrottle(syncStep, messageChannel, workspaceId);
|
||||
}
|
||||
|
||||
private async handleInsufficientPermissions(
|
||||
error: GmailError,
|
||||
syncStep: SyncStep,
|
||||
messageChannel: MessageChannelWorkspaceEntity,
|
||||
workspaceId: string,
|
||||
): Promise<void> {
|
||||
await this.messagingTelemetryService.track({
|
||||
eventName: `${snakeCase(syncStep)}.error.insufficient_permissions`,
|
||||
workspaceId,
|
||||
connectedAccountId: messageChannel.connectedAccountId,
|
||||
messageChannelId: messageChannel.id,
|
||||
message: `${error.code}: ${error.reason}`,
|
||||
});
|
||||
|
||||
await this.messagingChannelSyncStatusService.markAsFailedInsufficientPermissionsAndFlushMessagesToImport(
|
||||
messageChannel.id,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
if (!messageChannel.connectedAccountId) {
|
||||
throw new Error(
|
||||
`Connected account ID is not defined for message channel ${messageChannel.id} in workspace ${workspaceId}`,
|
||||
);
|
||||
}
|
||||
|
||||
await this.connectedAccountRepository.updateAuthFailedAt(
|
||||
messageChannel.connectedAccountId,
|
||||
workspaceId,
|
||||
);
|
||||
}
|
||||
|
||||
private async handleNotFound(
|
||||
error: GmailError,
|
||||
syncStep: SyncStep,
|
||||
messageChannel: MessageChannelWorkspaceEntity,
|
||||
workspaceId: string,
|
||||
): Promise<void> {
|
||||
if (syncStep === 'messages-import') {
|
||||
return;
|
||||
}
|
||||
|
||||
await this.messagingTelemetryService.track({
|
||||
eventName: `${snakeCase(syncStep)}.error.not_found`,
|
||||
workspaceId,
|
||||
connectedAccountId: messageChannel.connectedAccountId,
|
||||
messageChannelId: messageChannel.id,
|
||||
message: `404: ${error.reason}`,
|
||||
});
|
||||
|
||||
await this.messagingChannelSyncStatusService.resetAndScheduleFullMessageListFetch(
|
||||
messageChannel.id,
|
||||
workspaceId,
|
||||
);
|
||||
}
|
||||
|
||||
private async handleThrottle(
|
||||
syncStep: SyncStep,
|
||||
messageChannel: MessageChannelWorkspaceEntity,
|
||||
workspaceId: string,
|
||||
): Promise<void> {
|
||||
if (
|
||||
messageChannel.throttleFailureCount >= MESSAGING_THROTTLE_MAX_ATTEMPTS
|
||||
) {
|
||||
await this.messagingChannelSyncStatusService.markAsFailedUnknownAndFlushMessagesToImport(
|
||||
messageChannel.id,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
await this.throttle(messageChannel, workspaceId);
|
||||
|
||||
switch (syncStep) {
|
||||
case 'full-message-list-fetch':
|
||||
await this.messagingChannelSyncStatusService.scheduleFullMessageListFetch(
|
||||
messageChannel.id,
|
||||
);
|
||||
break;
|
||||
|
||||
case 'partial-message-list-fetch':
|
||||
await this.messagingChannelSyncStatusService.schedulePartialMessageListFetch(
|
||||
messageChannel.id,
|
||||
);
|
||||
break;
|
||||
|
||||
case 'messages-import':
|
||||
await this.messagingChannelSyncStatusService.scheduleMessagesImport(
|
||||
messageChannel.id,
|
||||
);
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
private async throttle(
|
||||
messageChannel: MessageChannelWorkspaceEntity,
|
||||
workspaceId: string,
|
||||
): Promise<void> {
|
||||
const messageChannelRepository =
|
||||
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
|
||||
'messageChannel',
|
||||
);
|
||||
|
||||
await messageChannelRepository.increment(
|
||||
{
|
||||
id: messageChannel.id,
|
||||
},
|
||||
'throttleFailureCount',
|
||||
1,
|
||||
);
|
||||
|
||||
await this.messagingTelemetryService.track({
|
||||
eventName: 'message_channel.throttle',
|
||||
workspaceId,
|
||||
connectedAccountId: messageChannel.connectedAccountId,
|
||||
messageChannelId: messageChannel.id,
|
||||
message: `Increment throttle failure count to ${messageChannel.throttleFailureCount}`,
|
||||
});
|
||||
}
|
||||
|
||||
private async handleUnknownError(
|
||||
error: GmailError,
|
||||
syncStep: SyncStep,
|
||||
messageChannel: MessageChannelWorkspaceEntity,
|
||||
workspaceId: string,
|
||||
): Promise<void> {
|
||||
await this.messagingTelemetryService.track({
|
||||
eventName: `${snakeCase(syncStep)}.error.unknown`,
|
||||
workspaceId,
|
||||
connectedAccountId: messageChannel.connectedAccountId,
|
||||
messageChannelId: messageChannel.id,
|
||||
message: `${error.code}: ${error.reason}`,
|
||||
});
|
||||
|
||||
await this.messagingChannelSyncStatusService.markAsFailedUnknownAndFlushMessagesToImport(
|
||||
messageChannel.id,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
throw new Error(
|
||||
`Unhandled Gmail error code ${error.code} with reason ${error.reason}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
@ -1,39 +1,30 @@
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import { Injectable } from '@nestjs/common';
|
||||
|
||||
import { GaxiosResponse } from 'gaxios';
|
||||
import { gmail_v1 } from 'googleapis';
|
||||
import { Any, EntityManager } from 'typeorm';
|
||||
import { Any } from 'typeorm';
|
||||
|
||||
import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service';
|
||||
import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator';
|
||||
import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum';
|
||||
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
|
||||
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
|
||||
import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service';
|
||||
import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service';
|
||||
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity';
|
||||
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
|
||||
import { MESSAGING_GMAIL_EXCLUDED_CATEGORIES } from 'src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-excluded-categories';
|
||||
import { MESSAGING_GMAIL_USERS_MESSAGES_LIST_MAX_RESULT } from 'src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-users-messages-list-max-result.constant';
|
||||
import { MessagingGmailClientProvider } from 'src/modules/messaging/message-import-manager/drivers/gmail/providers/messaging-gmail-client.provider';
|
||||
import { computeGmailCategoryExcludeSearchFilter } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-gmail-category-excude-search-filter';
|
||||
import {
|
||||
GmailError,
|
||||
MessagingErrorHandlingService,
|
||||
} from 'src/modules/messaging/message-import-manager/services/messaging-error-handling.service';
|
||||
MessageImportExceptionHandlerService,
|
||||
MessageImportSyncStep,
|
||||
} from 'src/modules/messaging/message-import-manager/services/message-import-exception-handler.service';
|
||||
import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service';
|
||||
|
||||
@Injectable()
|
||||
export class MessagingFullMessageListFetchService {
|
||||
private readonly logger = new Logger(
|
||||
MessagingFullMessageListFetchService.name,
|
||||
);
|
||||
|
||||
constructor(
|
||||
private readonly gmailClientProvider: MessagingGmailClientProvider,
|
||||
@InjectCacheStorage(CacheStorageNamespace.ModuleMessaging)
|
||||
private readonly cacheStorage: CacheStorageService,
|
||||
private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService,
|
||||
private readonly gmailErrorHandlingService: MessagingErrorHandlingService,
|
||||
private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService,
|
||||
private readonly twentyORMManager: TwentyORMManager,
|
||||
private readonly messagingGetMessageListService: MessagingGetMessageListService,
|
||||
private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService,
|
||||
) {}
|
||||
|
||||
public async processMessageListFetch(
|
||||
@ -41,205 +32,78 @@ export class MessagingFullMessageListFetchService {
|
||||
connectedAccount: ConnectedAccountWorkspaceEntity,
|
||||
workspaceId: string,
|
||||
) {
|
||||
await this.messagingChannelSyncStatusService.markAsMessagesListFetchOngoing(
|
||||
messageChannel.id,
|
||||
);
|
||||
|
||||
const gmailClient: gmail_v1.Gmail =
|
||||
await this.gmailClientProvider.getGmailClient(connectedAccount);
|
||||
|
||||
const { error: gmailError } = await this.fetchAllMessageIdsAndStoreInCache(
|
||||
gmailClient,
|
||||
messageChannel.id,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
if (gmailError) {
|
||||
await this.gmailErrorHandlingService.handleGmailError(
|
||||
gmailError,
|
||||
'full-message-list-fetch',
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
try {
|
||||
await this.messageChannelSyncStatusService.markAsMessagesListFetchOngoing(
|
||||
messageChannel.id,
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
const messageChannelRepository =
|
||||
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
|
||||
'messageChannel',
|
||||
);
|
||||
|
||||
await messageChannelRepository.update(
|
||||
{
|
||||
id: messageChannel.id,
|
||||
},
|
||||
{
|
||||
throttleFailureCount: 0,
|
||||
syncStageStartedAt: null,
|
||||
},
|
||||
);
|
||||
|
||||
await this.messagingChannelSyncStatusService.scheduleMessagesImport(
|
||||
messageChannel.id,
|
||||
);
|
||||
}
|
||||
|
||||
private async fetchAllMessageIdsAndStoreInCache(
|
||||
gmailClient: gmail_v1.Gmail,
|
||||
messageChannelId: string,
|
||||
workspaceId: string,
|
||||
transactionManager?: EntityManager,
|
||||
): Promise<{ error?: GmailError }> {
|
||||
let pageToken: string | undefined;
|
||||
let fetchedMessageIdsCount = 0;
|
||||
let hasMoreMessages = true;
|
||||
let firstMessageExternalId: string | undefined;
|
||||
let response: GaxiosResponse<gmail_v1.Schema$ListMessagesResponse>;
|
||||
|
||||
while (hasMoreMessages) {
|
||||
try {
|
||||
response = await gmailClient.users.messages.list({
|
||||
userId: 'me',
|
||||
maxResults: MESSAGING_GMAIL_USERS_MESSAGES_LIST_MAX_RESULT,
|
||||
pageToken,
|
||||
q: computeGmailCategoryExcludeSearchFilter(
|
||||
MESSAGING_GMAIL_EXCLUDED_CATEGORIES,
|
||||
),
|
||||
});
|
||||
} catch (error) {
|
||||
return {
|
||||
error: {
|
||||
code: error.response?.status,
|
||||
reason: error.response?.data?.error,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
if (response.data?.messages) {
|
||||
const messageExternalIds = response.data.messages
|
||||
.filter((message): message is { id: string } => message.id != null)
|
||||
.map((message) => message.id);
|
||||
|
||||
if (!firstMessageExternalId) {
|
||||
firstMessageExternalId = messageExternalIds[0];
|
||||
}
|
||||
|
||||
const messageChannelMessageAssociationRepository =
|
||||
await this.twentyORMManager.getRepository<MessageChannelMessageAssociationWorkspaceEntity>(
|
||||
'messageChannelMessageAssociation',
|
||||
);
|
||||
|
||||
const existingMessageChannelMessageAssociations =
|
||||
await messageChannelMessageAssociationRepository.find(
|
||||
{
|
||||
where: {
|
||||
messageChannelId,
|
||||
messageExternalId: Any(messageExternalIds),
|
||||
},
|
||||
},
|
||||
transactionManager,
|
||||
);
|
||||
|
||||
const existingMessageChannelMessageAssociationsExternalIds =
|
||||
existingMessageChannelMessageAssociations.map(
|
||||
(messageChannelMessageAssociation) =>
|
||||
messageChannelMessageAssociation.messageExternalId,
|
||||
);
|
||||
|
||||
const messageIdsToImport = messageExternalIds.filter(
|
||||
(messageExternalId) =>
|
||||
!existingMessageChannelMessageAssociationsExternalIds.includes(
|
||||
messageExternalId,
|
||||
),
|
||||
const { messageExternalIds, nextSyncCursor } =
|
||||
await this.messagingGetMessageListService.getFullMessageList(
|
||||
connectedAccount,
|
||||
);
|
||||
|
||||
if (messageIdsToImport.length) {
|
||||
await this.cacheStorage.setAdd(
|
||||
`messages-to-import:${workspaceId}:gmail:${messageChannelId}`,
|
||||
messageIdsToImport,
|
||||
);
|
||||
}
|
||||
const messageChannelMessageAssociationRepository =
|
||||
await this.twentyORMManager.getRepository<MessageChannelMessageAssociationWorkspaceEntity>(
|
||||
'messageChannelMessageAssociation',
|
||||
);
|
||||
|
||||
fetchedMessageIdsCount += messageExternalIds.length;
|
||||
const existingMessageChannelMessageAssociations =
|
||||
await messageChannelMessageAssociationRepository.find({
|
||||
where: {
|
||||
messageChannelId: messageChannel.id,
|
||||
messageExternalId: Any(messageExternalIds),
|
||||
},
|
||||
});
|
||||
|
||||
const existingMessageChannelMessageAssociationsExternalIds =
|
||||
existingMessageChannelMessageAssociations.map(
|
||||
(messageChannelMessageAssociation) =>
|
||||
messageChannelMessageAssociation.messageExternalId,
|
||||
);
|
||||
|
||||
const messageIdsToImport = messageExternalIds.filter(
|
||||
(messageExternalId) =>
|
||||
!existingMessageChannelMessageAssociationsExternalIds.includes(
|
||||
messageExternalId,
|
||||
),
|
||||
);
|
||||
|
||||
if (messageIdsToImport.length) {
|
||||
await this.cacheStorage.setAdd(
|
||||
`messages-to-import:${workspaceId}:gmail:${messageChannel.id}`,
|
||||
messageIdsToImport,
|
||||
);
|
||||
}
|
||||
|
||||
pageToken = response.data.nextPageToken ?? undefined;
|
||||
hasMoreMessages = !!pageToken;
|
||||
}
|
||||
const messageChannelRepository =
|
||||
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
|
||||
'messageChannel',
|
||||
);
|
||||
|
||||
this.logger.log(
|
||||
`Added ${fetchedMessageIdsCount} messages ids from Gmail for messageChannel ${messageChannelId} in workspace ${workspaceId} and added to cache for import`,
|
||||
);
|
||||
|
||||
if (!firstMessageExternalId) {
|
||||
throw new Error(
|
||||
`No first message found for workspace ${workspaceId} and account ${messageChannelId}, can't update sync external id`,
|
||||
);
|
||||
}
|
||||
|
||||
await this.updateLastSyncCursor(
|
||||
gmailClient,
|
||||
messageChannelId,
|
||||
firstMessageExternalId,
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
);
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
private async updateLastSyncCursor(
|
||||
gmailClient: gmail_v1.Gmail,
|
||||
messageChannelId: string,
|
||||
firstMessageExternalId: string,
|
||||
workspaceId: string,
|
||||
transactionManager?: EntityManager,
|
||||
) {
|
||||
const firstMessageContent = await gmailClient.users.messages.get({
|
||||
userId: 'me',
|
||||
id: firstMessageExternalId,
|
||||
});
|
||||
|
||||
if (!firstMessageContent?.data) {
|
||||
throw new Error(
|
||||
`No first message content found for message ${firstMessageExternalId} in workspace ${workspaceId}`,
|
||||
);
|
||||
}
|
||||
|
||||
const historyId = firstMessageContent?.data?.historyId;
|
||||
|
||||
if (!historyId) {
|
||||
throw new Error(
|
||||
`No historyId found for message ${firstMessageExternalId} in workspace ${workspaceId}`,
|
||||
);
|
||||
}
|
||||
|
||||
const messageChannelRepository =
|
||||
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
|
||||
'messageChannel',
|
||||
);
|
||||
|
||||
const messageChannel = await messageChannelRepository.findOneOrFail(
|
||||
{
|
||||
where: {
|
||||
id: messageChannelId,
|
||||
},
|
||||
},
|
||||
transactionManager,
|
||||
);
|
||||
|
||||
const currentSyncCursor = messageChannel.syncCursor;
|
||||
|
||||
if (!currentSyncCursor || historyId > currentSyncCursor) {
|
||||
await messageChannelRepository.update(
|
||||
{
|
||||
id: messageChannel.id,
|
||||
},
|
||||
{
|
||||
syncCursor: historyId,
|
||||
throttleFailureCount: 0,
|
||||
syncStageStartedAt: null,
|
||||
syncCursor:
|
||||
!messageChannel.syncCursor ||
|
||||
nextSyncCursor > messageChannel.syncCursor
|
||||
? nextSyncCursor
|
||||
: messageChannel.syncCursor,
|
||||
},
|
||||
transactionManager,
|
||||
);
|
||||
|
||||
await this.messageChannelSyncStatusService.scheduleMessagesImport(
|
||||
messageChannel.id,
|
||||
);
|
||||
} catch (error) {
|
||||
await this.messageImportErrorHandlerService.handleDriverException(
|
||||
error,
|
||||
MessageImportSyncStep.FULL_MESSAGE_LIST_FETCH,
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,66 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
|
||||
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
|
||||
import { GmailGetMessageListService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service';
|
||||
import {
|
||||
MessageImportException,
|
||||
MessageImportExceptionCode,
|
||||
} from 'src/modules/messaging/message-import-manager/exceptions/message-import.exception';
|
||||
|
||||
export type GetFullMessageListResponse = {
|
||||
messageExternalIds: string[];
|
||||
nextSyncCursor: string;
|
||||
};
|
||||
|
||||
export type GetPartialMessageListResponse = {
|
||||
messageExternalIds: string[];
|
||||
messageExternalIdsToDelete: string[];
|
||||
nextSyncCursor: string;
|
||||
};
|
||||
|
||||
@Injectable()
|
||||
export class MessagingGetMessageListService {
|
||||
constructor(
|
||||
private readonly gmailGetMessageListService: GmailGetMessageListService,
|
||||
) {}
|
||||
|
||||
public async getFullMessageList(
|
||||
connectedAccount: Pick<
|
||||
ConnectedAccountWorkspaceEntity,
|
||||
'provider' | 'refreshToken' | 'id'
|
||||
>,
|
||||
): Promise<GetFullMessageListResponse> {
|
||||
switch (connectedAccount.provider) {
|
||||
case 'google':
|
||||
return this.gmailGetMessageListService.getFullMessageList(
|
||||
connectedAccount,
|
||||
);
|
||||
default:
|
||||
throw new MessageImportException(
|
||||
`Provider ${connectedAccount.provider} is not supported`,
|
||||
MessageImportExceptionCode.PROVIDER_NOT_SUPPORTED,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
public async getPartialMessageList(
|
||||
connectedAccount: Pick<
|
||||
ConnectedAccountWorkspaceEntity,
|
||||
'provider' | 'refreshToken' | 'id'
|
||||
>,
|
||||
syncCursor: string,
|
||||
): Promise<GetPartialMessageListResponse> {
|
||||
switch (connectedAccount.provider) {
|
||||
case 'google':
|
||||
return this.gmailGetMessageListService.getPartialMessageList(
|
||||
connectedAccount,
|
||||
syncCursor,
|
||||
);
|
||||
default:
|
||||
throw new MessageImportException(
|
||||
`Provider ${connectedAccount.provider} is not supported`,
|
||||
MessageImportExceptionCode.PROVIDER_NOT_SUPPORTED,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,46 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
|
||||
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
|
||||
import { GmailGetMessagesService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-messages.service';
|
||||
import {
|
||||
MessageImportException,
|
||||
MessageImportExceptionCode,
|
||||
} from 'src/modules/messaging/message-import-manager/exceptions/message-import.exception';
|
||||
import { MessageWithParticipants } from 'src/modules/messaging/message-import-manager/types/message';
|
||||
|
||||
export type GetMessagesResponse = MessageWithParticipants[];
|
||||
|
||||
@Injectable()
|
||||
export class MessagingGetMessagesService {
|
||||
constructor(
|
||||
private readonly gmailGetMessagesService: GmailGetMessagesService,
|
||||
) {}
|
||||
|
||||
public async getMessages(
|
||||
messageIds: string[],
|
||||
connectedAccount: Pick<
|
||||
ConnectedAccountWorkspaceEntity,
|
||||
| 'provider'
|
||||
| 'accessToken'
|
||||
| 'refreshToken'
|
||||
| 'id'
|
||||
| 'handle'
|
||||
| 'handleAliases'
|
||||
>,
|
||||
workspaceId: string,
|
||||
): Promise<GetMessagesResponse> {
|
||||
switch (connectedAccount.provider) {
|
||||
case 'google':
|
||||
return this.gmailGetMessagesService.getMessages(
|
||||
messageIds,
|
||||
connectedAccount,
|
||||
workspaceId,
|
||||
);
|
||||
default:
|
||||
throw new MessageImportException(
|
||||
`Provider ${connectedAccount.provider} is not supported`,
|
||||
MessageImportExceptionCode.PROVIDER_NOT_SUPPORTED,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -4,22 +4,17 @@ import { EntityManager } from 'typeorm';
|
||||
import { v4 } from 'uuid';
|
||||
|
||||
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
|
||||
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
|
||||
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity';
|
||||
import { MessageThreadWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-thread.workspace-entity';
|
||||
import { MessageWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message.workspace-entity';
|
||||
import { GmailMessage } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message';
|
||||
import { MessageWithParticipants } from 'src/modules/messaging/message-import-manager/types/message';
|
||||
|
||||
@Injectable()
|
||||
export class MessagingMessageService {
|
||||
constructor(private readonly twentyORMManager: TwentyORMManager) {}
|
||||
|
||||
public async saveMessagesWithinTransaction(
|
||||
messages: GmailMessage[],
|
||||
connectedAccount: Pick<
|
||||
ConnectedAccountWorkspaceEntity,
|
||||
'handle' | 'handleAliases'
|
||||
>,
|
||||
messages: MessageWithParticipants[],
|
||||
messageChannelId: string,
|
||||
transactionManager: EntityManager,
|
||||
): Promise<Map<string, string>> {
|
||||
@ -103,19 +98,13 @@ export class MessagingMessageService {
|
||||
|
||||
const newMessageId = v4();
|
||||
|
||||
const messageDirection =
|
||||
connectedAccount.handle === message.fromHandle ||
|
||||
connectedAccount.handleAliases?.includes(message.fromHandle)
|
||||
? 'outgoing'
|
||||
: 'incoming';
|
||||
|
||||
await messageRepository.insert(
|
||||
{
|
||||
id: newMessageId,
|
||||
headerMessageId: message.headerMessageId,
|
||||
subject: message.subject,
|
||||
receivedAt: new Date(parseInt(message.internalDate)),
|
||||
direction: messageDirection,
|
||||
receivedAt: message.receivedAt,
|
||||
direction: message.direction,
|
||||
text: message.text,
|
||||
messageThreadId: newOrExistingMessageThreadId,
|
||||
},
|
||||
|
||||
@ -10,17 +10,22 @@ import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
|
||||
import { BlocklistRepository } from 'src/modules/blocklist/repositories/blocklist.repository';
|
||||
import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity';
|
||||
import { EmailAliasManagerService } from 'src/modules/connected-account/email-alias-manager/services/email-alias-manager.service';
|
||||
import { RefreshAccessTokenExceptionCode } from 'src/modules/connected-account/refresh-access-token-manager/exceptions/refresh-access-token.exception';
|
||||
import { RefreshAccessTokenService } from 'src/modules/connected-account/refresh-access-token-manager/services/refresh-access-token.service';
|
||||
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
|
||||
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
|
||||
import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service';
|
||||
import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service';
|
||||
import {
|
||||
MessageChannelSyncStage,
|
||||
MessageChannelWorkspaceEntity,
|
||||
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
|
||||
import { MessageImportDriverExceptionCode } from 'src/modules/messaging/message-import-manager/drivers/exceptions/message-import-driver.exception';
|
||||
import { MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE } from 'src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-users-messages-get-batch-size.constant';
|
||||
import { MessagingGmailFetchMessagesByBatchesService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-messages-by-batches.service';
|
||||
import { MessagingErrorHandlingService } from 'src/modules/messaging/message-import-manager/services/messaging-error-handling.service';
|
||||
import { MessageImportExceptionCode } from 'src/modules/messaging/message-import-manager/exceptions/message-import.exception';
|
||||
import {
|
||||
MessageImportExceptionHandlerService,
|
||||
MessageImportSyncStep,
|
||||
} from 'src/modules/messaging/message-import-manager/services/message-import-exception-handler.service';
|
||||
import { MessagingGetMessagesService } from 'src/modules/messaging/message-import-manager/services/messaging-get-messages.service';
|
||||
import { MessagingSaveMessagesAndEnqueueContactCreationService } from 'src/modules/messaging/message-import-manager/services/messaging-save-messages-and-enqueue-contact-creation.service';
|
||||
import { filterEmails } from 'src/modules/messaging/message-import-manager/utils/filter-emails.util';
|
||||
import { MessagingTelemetryService } from 'src/modules/messaging/monitoring/services/messaging-telemetry.service';
|
||||
@ -30,21 +35,19 @@ export class MessagingMessagesImportService {
|
||||
private readonly logger = new Logger(MessagingMessagesImportService.name);
|
||||
|
||||
constructor(
|
||||
private readonly fetchMessagesByBatchesService: MessagingGmailFetchMessagesByBatchesService,
|
||||
@InjectCacheStorage(CacheStorageNamespace.ModuleMessaging)
|
||||
private readonly cacheStorage: CacheStorageService,
|
||||
private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService,
|
||||
private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService,
|
||||
private readonly saveMessagesAndEnqueueContactCreationService: MessagingSaveMessagesAndEnqueueContactCreationService,
|
||||
private readonly gmailErrorHandlingService: MessagingErrorHandlingService,
|
||||
private readonly refreshAccessTokenService: RefreshAccessTokenService,
|
||||
private readonly messagingTelemetryService: MessagingTelemetryService,
|
||||
@InjectObjectMetadataRepository(BlocklistWorkspaceEntity)
|
||||
private readonly blocklistRepository: BlocklistRepository,
|
||||
private readonly emailAliasManagerService: EmailAliasManagerService,
|
||||
private readonly isFeatureEnabledService: FeatureFlagService,
|
||||
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
|
||||
private readonly connectedAccountRepository: ConnectedAccountRepository,
|
||||
private readonly twentyORMManager: TwentyORMManager,
|
||||
private readonly messagingGetMessagesService: MessagingGetMessagesService,
|
||||
private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService,
|
||||
) {}
|
||||
|
||||
async processMessageBatchImport(
|
||||
@ -52,107 +55,95 @@ export class MessagingMessagesImportService {
|
||||
connectedAccount: ConnectedAccountWorkspaceEntity,
|
||||
workspaceId: string,
|
||||
) {
|
||||
if (
|
||||
messageChannel.syncStage !==
|
||||
MessageChannelSyncStage.MESSAGES_IMPORT_PENDING
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
await this.messagingTelemetryService.track({
|
||||
eventName: 'messages_import.started',
|
||||
workspaceId,
|
||||
connectedAccountId: messageChannel.connectedAccountId,
|
||||
messageChannelId: messageChannel.id,
|
||||
});
|
||||
|
||||
this.logger.log(
|
||||
`Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} starting...`,
|
||||
);
|
||||
|
||||
await this.messagingChannelSyncStatusService.markAsMessagesImportOngoing(
|
||||
messageChannel.id,
|
||||
);
|
||||
|
||||
let accessToken: string;
|
||||
let messageIdsToFetch: string[] = [];
|
||||
|
||||
try {
|
||||
accessToken =
|
||||
await this.refreshAccessTokenService.refreshAndSaveAccessToken(
|
||||
connectedAccount,
|
||||
workspaceId,
|
||||
);
|
||||
} catch (error) {
|
||||
if (
|
||||
messageChannel.syncStage !==
|
||||
MessageChannelSyncStage.MESSAGES_IMPORT_PENDING
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
await this.messagingTelemetryService.track({
|
||||
eventName: `refresh_token.error.insufficient_permissions`,
|
||||
eventName: 'messages_import.started',
|
||||
workspaceId,
|
||||
connectedAccountId: messageChannel.connectedAccountId,
|
||||
messageChannelId: messageChannel.id,
|
||||
message: `${error.code}: ${error.reason}`,
|
||||
});
|
||||
|
||||
await this.messagingChannelSyncStatusService.markAsFailedInsufficientPermissionsAndFlushMessagesToImport(
|
||||
this.logger.log(
|
||||
`Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} starting...`,
|
||||
);
|
||||
|
||||
await this.messageChannelSyncStatusService.markAsMessagesImportOngoing(
|
||||
messageChannel.id,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
await this.connectedAccountRepository.updateAuthFailedAt(
|
||||
messageChannel.connectedAccountId,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (
|
||||
await this.isFeatureEnabledService.isFeatureEnabled(
|
||||
FeatureFlagKey.IsMessagingAliasFetchingEnabled,
|
||||
workspaceId,
|
||||
)
|
||||
) {
|
||||
try {
|
||||
connectedAccount.accessToken =
|
||||
await this.refreshAccessTokenService.refreshAndSaveAccessToken(
|
||||
connectedAccount,
|
||||
workspaceId,
|
||||
);
|
||||
} catch (error) {
|
||||
switch (error.code) {
|
||||
case (RefreshAccessTokenExceptionCode.REFRESH_ACCESS_TOKEN_FAILED,
|
||||
RefreshAccessTokenExceptionCode.REFRESH_TOKEN_NOT_FOUND):
|
||||
await this.messagingTelemetryService.track({
|
||||
eventName: `refresh_token.error.insufficient_permissions`,
|
||||
workspaceId,
|
||||
connectedAccountId: messageChannel.connectedAccountId,
|
||||
messageChannelId: messageChannel.id,
|
||||
message: `${error.code}: ${error.reason}`,
|
||||
});
|
||||
throw {
|
||||
code: MessageImportDriverExceptionCode.INSUFFICIENT_PERMISSIONS,
|
||||
message: error.message,
|
||||
};
|
||||
case RefreshAccessTokenExceptionCode.PROVIDER_NOT_SUPPORTED:
|
||||
throw {
|
||||
code: MessageImportExceptionCode.PROVIDER_NOT_SUPPORTED,
|
||||
message: error.message,
|
||||
};
|
||||
default:
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
if (
|
||||
await this.isFeatureEnabledService.isFeatureEnabled(
|
||||
FeatureFlagKey.IsMessagingAliasFetchingEnabled,
|
||||
workspaceId,
|
||||
)
|
||||
) {
|
||||
await this.emailAliasManagerService.refreshHandleAliases(
|
||||
connectedAccount,
|
||||
workspaceId,
|
||||
);
|
||||
} catch (error) {
|
||||
await this.gmailErrorHandlingService.handleGmailError(
|
||||
{
|
||||
code: error.code,
|
||||
reason: error.message,
|
||||
},
|
||||
'messages-import',
|
||||
}
|
||||
|
||||
messageIdsToFetch = await this.cacheStorage.setPop(
|
||||
`messages-to-import:${workspaceId}:gmail:${messageChannel.id}`,
|
||||
MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE,
|
||||
);
|
||||
|
||||
if (!messageIdsToFetch?.length) {
|
||||
await this.messageChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch(
|
||||
messageChannel.id,
|
||||
);
|
||||
|
||||
return await this.trackMessageImportCompleted(
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
const messageIdsToFetch =
|
||||
(await this.cacheStorage.setPop(
|
||||
`messages-to-import:${workspaceId}:gmail:${messageChannel.id}`,
|
||||
MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE,
|
||||
)) ?? [];
|
||||
|
||||
if (!messageIdsToFetch?.length) {
|
||||
await this.messagingChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch(
|
||||
messageChannel.id,
|
||||
);
|
||||
|
||||
return await this.trackMessageImportCompleted(
|
||||
messageChannel,
|
||||
const allMessages = await this.messagingGetMessagesService.getMessages(
|
||||
messageIdsToFetch,
|
||||
connectedAccount,
|
||||
workspaceId,
|
||||
);
|
||||
}
|
||||
|
||||
try {
|
||||
const allMessages =
|
||||
await this.fetchMessagesByBatchesService.fetchAllMessages(
|
||||
messageIdsToFetch,
|
||||
accessToken,
|
||||
connectedAccount.id,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
const blocklist = await this.blocklistRepository.getByWorkspaceMemberId(
|
||||
connectedAccount.accountOwnerId,
|
||||
@ -175,11 +166,11 @@ export class MessagingMessagesImportService {
|
||||
if (
|
||||
messageIdsToFetch.length < MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE
|
||||
) {
|
||||
await this.messagingChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch(
|
||||
await this.messageChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch(
|
||||
messageChannel.id,
|
||||
);
|
||||
} else {
|
||||
await this.messagingChannelSyncStatusService.scheduleMessagesImport(
|
||||
await this.messageChannelSyncStatusService.scheduleMessagesImport(
|
||||
messageChannel.id,
|
||||
);
|
||||
}
|
||||
@ -204,30 +195,14 @@ export class MessagingMessagesImportService {
|
||||
workspaceId,
|
||||
);
|
||||
} catch (error) {
|
||||
this.logger.log(
|
||||
`Messaging import for messageId ${
|
||||
error.messageId
|
||||
}, workspace ${workspaceId} and connected account ${
|
||||
connectedAccount.id
|
||||
} failed with error: ${JSON.stringify(error)}`,
|
||||
);
|
||||
|
||||
await this.cacheStorage.setAdd(
|
||||
`messages-to-import:${workspaceId}:gmail:${messageChannel.id}`,
|
||||
messageIdsToFetch,
|
||||
);
|
||||
|
||||
if (error.code === undefined) {
|
||||
// This should never happen as all errors must be known
|
||||
throw error;
|
||||
}
|
||||
|
||||
await this.gmailErrorHandlingService.handleGmailError(
|
||||
{
|
||||
code: error.code,
|
||||
reason: error.errors?.[0]?.reason,
|
||||
},
|
||||
'messages-import',
|
||||
await this.messageImportErrorHandlerService.handleDriverException(
|
||||
error,
|
||||
MessageImportSyncStep.PARTIAL_MESSAGE_LIST_FETCH,
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
@ -1,6 +1,5 @@
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
|
||||
import { gmail_v1 } from 'googleapis';
|
||||
import { Any } from 'typeorm';
|
||||
|
||||
import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service';
|
||||
@ -8,13 +7,14 @@ import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decora
|
||||
import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum';
|
||||
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
|
||||
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
|
||||
import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service';
|
||||
import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service';
|
||||
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity';
|
||||
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
|
||||
import { MessagingGmailClientProvider } from 'src/modules/messaging/message-import-manager/drivers/gmail/providers/messaging-gmail-client.provider';
|
||||
import { MessagingGmailFetchMessageIdsToExcludeService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-messages-ids-to-exclude.service';
|
||||
import { MessagingGmailHistoryService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-history.service';
|
||||
import { MessagingErrorHandlingService } from 'src/modules/messaging/message-import-manager/services/messaging-error-handling.service';
|
||||
import {
|
||||
MessageImportExceptionHandlerService,
|
||||
MessageImportSyncStep,
|
||||
} from 'src/modules/messaging/message-import-manager/services/message-import-exception-handler.service';
|
||||
import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service';
|
||||
|
||||
@Injectable()
|
||||
export class MessagingPartialMessageListFetchService {
|
||||
@ -23,14 +23,12 @@ export class MessagingPartialMessageListFetchService {
|
||||
);
|
||||
|
||||
constructor(
|
||||
private readonly gmailClientProvider: MessagingGmailClientProvider,
|
||||
@InjectCacheStorage(CacheStorageNamespace.ModuleMessaging)
|
||||
private readonly cacheStorage: CacheStorageService,
|
||||
private readonly gmailErrorHandlingService: MessagingErrorHandlingService,
|
||||
private readonly gmailGetHistoryService: MessagingGmailHistoryService,
|
||||
private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService,
|
||||
private readonly gmailFetchMessageIdsToExcludeService: MessagingGmailFetchMessageIdsToExcludeService,
|
||||
private readonly messagingGetMessageListService: MessagingGetMessageListService,
|
||||
private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService,
|
||||
private readonly twentyORMManager: TwentyORMManager,
|
||||
private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService,
|
||||
) {}
|
||||
|
||||
public async processMessageListFetch(
|
||||
@ -38,129 +36,90 @@ export class MessagingPartialMessageListFetchService {
|
||||
connectedAccount: ConnectedAccountWorkspaceEntity,
|
||||
workspaceId: string,
|
||||
): Promise<void> {
|
||||
await this.messagingChannelSyncStatusService.markAsMessagesListFetchOngoing(
|
||||
messageChannel.id,
|
||||
);
|
||||
|
||||
const lastSyncHistoryId = messageChannel.syncCursor;
|
||||
|
||||
const gmailClient: gmail_v1.Gmail =
|
||||
await this.gmailClientProvider.getGmailClient(connectedAccount);
|
||||
|
||||
const { history, historyId, error } =
|
||||
await this.gmailGetHistoryService.getHistory(
|
||||
gmailClient,
|
||||
lastSyncHistoryId,
|
||||
);
|
||||
|
||||
if (error) {
|
||||
await this.gmailErrorHandlingService.handleGmailError(
|
||||
error,
|
||||
'partial-message-list-fetch',
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
const messageChannelRepository =
|
||||
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
|
||||
'messageChannel',
|
||||
);
|
||||
|
||||
await messageChannelRepository.update(
|
||||
{
|
||||
id: messageChannel.id,
|
||||
},
|
||||
{
|
||||
throttleFailureCount: 0,
|
||||
syncStageStartedAt: null,
|
||||
},
|
||||
);
|
||||
|
||||
if (!historyId) {
|
||||
throw new Error(
|
||||
`No historyId found for ${connectedAccount.id} in workspace ${workspaceId} in gmail history response.`,
|
||||
);
|
||||
}
|
||||
|
||||
if (historyId === lastSyncHistoryId || !history?.length) {
|
||||
this.logger.log(
|
||||
`Partial message list import done with history ${historyId} and nothing to update for workspace ${workspaceId} and account ${connectedAccount.id}`,
|
||||
);
|
||||
|
||||
await this.messagingChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch(
|
||||
try {
|
||||
await this.messageChannelSyncStatusService.markAsMessagesListFetchOngoing(
|
||||
messageChannel.id,
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
const { messagesAdded, messagesDeleted } =
|
||||
await this.gmailGetHistoryService.getMessageIdsFromHistory(history);
|
||||
|
||||
let messageIdsToFilter: string[] = [];
|
||||
|
||||
try {
|
||||
messageIdsToFilter =
|
||||
await this.gmailFetchMessageIdsToExcludeService.fetchEmailIdsToExcludeOrThrow(
|
||||
gmailClient,
|
||||
lastSyncHistoryId,
|
||||
const messageChannelRepository =
|
||||
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
|
||||
'messageChannel',
|
||||
);
|
||||
} catch (error) {
|
||||
await this.gmailErrorHandlingService.handleGmailError(
|
||||
error,
|
||||
'partial-message-list-fetch',
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
const messagesAddedFiltered = messagesAdded.filter(
|
||||
(messageId) => !messageIdsToFilter.includes(messageId),
|
||||
);
|
||||
|
||||
await this.cacheStorage.setAdd(
|
||||
`messages-to-import:${workspaceId}:gmail:${messageChannel.id}`,
|
||||
messagesAddedFiltered,
|
||||
);
|
||||
|
||||
this.logger.log(
|
||||
`Added ${messagesAddedFiltered.length} messages to import for workspace ${workspaceId} and account ${connectedAccount.id}`,
|
||||
);
|
||||
|
||||
const messageChannelMessageAssociationRepository =
|
||||
await this.twentyORMManager.getRepository<MessageChannelMessageAssociationWorkspaceEntity>(
|
||||
'messageChannelMessageAssociation',
|
||||
);
|
||||
|
||||
await messageChannelMessageAssociationRepository.delete({
|
||||
messageChannelId: messageChannel.id,
|
||||
messageExternalId: Any(messagesDeleted),
|
||||
});
|
||||
|
||||
this.logger.log(
|
||||
`Deleted ${messagesDeleted.length} messages for workspace ${workspaceId} and account ${connectedAccount.id}`,
|
||||
);
|
||||
|
||||
const currentSyncCursor = messageChannel.syncCursor;
|
||||
|
||||
if (!currentSyncCursor || historyId > currentSyncCursor) {
|
||||
await messageChannelRepository.update(
|
||||
{
|
||||
id: messageChannel.id,
|
||||
},
|
||||
{
|
||||
syncCursor: historyId,
|
||||
throttleFailureCount: 0,
|
||||
syncStageStartedAt: null,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
await this.messagingChannelSyncStatusService.scheduleMessagesImport(
|
||||
messageChannel.id,
|
||||
);
|
||||
const syncCursor = messageChannel.syncCursor;
|
||||
|
||||
const { messageExternalIds, messageExternalIdsToDelete, nextSyncCursor } =
|
||||
await this.messagingGetMessageListService.getPartialMessageList(
|
||||
connectedAccount,
|
||||
syncCursor,
|
||||
);
|
||||
|
||||
if (syncCursor === nextSyncCursor) {
|
||||
this.logger.log(
|
||||
`Partial message list import done with history ${syncCursor} and nothing to update for workspace ${workspaceId} and account ${connectedAccount.id}`,
|
||||
);
|
||||
|
||||
await this.messageChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch(
|
||||
messageChannel.id,
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
await this.cacheStorage.setAdd(
|
||||
`messages-to-import:${workspaceId}:gmail:${messageChannel.id}`,
|
||||
messageExternalIds,
|
||||
);
|
||||
|
||||
this.logger.log(
|
||||
`Added ${messageExternalIds.length} messages to import for workspace ${workspaceId} and account ${connectedAccount.id}`,
|
||||
);
|
||||
|
||||
const messageChannelMessageAssociationRepository =
|
||||
await this.twentyORMManager.getRepository<MessageChannelMessageAssociationWorkspaceEntity>(
|
||||
'messageChannelMessageAssociation',
|
||||
);
|
||||
|
||||
await messageChannelMessageAssociationRepository.delete({
|
||||
messageChannelId: messageChannel.id,
|
||||
messageExternalId: Any(messageExternalIdsToDelete),
|
||||
});
|
||||
|
||||
this.logger.log(
|
||||
`Deleted ${messageExternalIdsToDelete.length} messages for workspace ${workspaceId} and account ${connectedAccount.id}`,
|
||||
);
|
||||
|
||||
if (!syncCursor || nextSyncCursor > syncCursor) {
|
||||
await messageChannelRepository.update(
|
||||
{
|
||||
id: messageChannel.id,
|
||||
},
|
||||
{
|
||||
syncCursor: nextSyncCursor,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
await this.messageChannelSyncStatusService.scheduleMessagesImport(
|
||||
messageChannel.id,
|
||||
);
|
||||
} catch (error) {
|
||||
await this.messageImportErrorHandlerService.handleDriverException(
|
||||
error,
|
||||
MessageImportSyncStep.PARTIAL_MESSAGE_LIST_FETCH,
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -5,6 +5,7 @@ import { EntityManager } from 'typeorm';
|
||||
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
|
||||
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
|
||||
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
|
||||
import { FieldActorSource } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type';
|
||||
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
|
||||
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
|
||||
import {
|
||||
@ -16,15 +17,14 @@ import {
|
||||
MessageChannelWorkspaceEntity,
|
||||
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
|
||||
import {
|
||||
GmailMessage,
|
||||
Participant,
|
||||
ParticipantWithMessageId,
|
||||
} from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message';
|
||||
} from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message.type';
|
||||
import { MessagingMessageService } from 'src/modules/messaging/message-import-manager/services/messaging-message.service';
|
||||
import { MessageWithParticipants } from 'src/modules/messaging/message-import-manager/types/message';
|
||||
import { MessagingMessageParticipantService } from 'src/modules/messaging/message-participant-manager/services/messaging-message-participant.service';
|
||||
import { isGroupEmail } from 'src/utils/is-group-email';
|
||||
import { isWorkEmail } from 'src/utils/is-work-email';
|
||||
import { FieldActorSource } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type';
|
||||
|
||||
@Injectable()
|
||||
export class MessagingSaveMessagesAndEnqueueContactCreationService {
|
||||
@ -37,7 +37,7 @@ export class MessagingSaveMessagesAndEnqueueContactCreationService {
|
||||
) {}
|
||||
|
||||
async saveMessagesAndEnqueueContactCreationJob(
|
||||
messagesToSave: GmailMessage[],
|
||||
messagesToSave: MessageWithParticipants[],
|
||||
messageChannel: MessageChannelWorkspaceEntity,
|
||||
connectedAccount: ConnectedAccountWorkspaceEntity,
|
||||
workspaceId: string,
|
||||
@ -51,7 +51,6 @@ export class MessagingSaveMessagesAndEnqueueContactCreationService {
|
||||
const messageExternalIdsAndIdsMap =
|
||||
await this.messageService.saveMessagesWithinTransaction(
|
||||
messagesToSave,
|
||||
connectedAccount,
|
||||
messageChannel.id,
|
||||
transactionManager,
|
||||
);
|
||||
|
||||
Reference in New Issue
Block a user