6255 move services from messaging common module into the correct module and refactor them (#6409)

Closes #6255 

- Move files from `messaging/common` into the correct module
- Remove common module between calendar and messaging
`calendar-messaging-participant-manager`
- Update and fix massaging and calendar participant matching
- Create `MatchParticipantModule`

---------

Co-authored-by: Charles Bochet <charles@twenty.com>
This commit is contained in:
bosiraphael
2024-07-27 12:29:02 +02:00
committed by GitHub
parent 3060eb4e1e
commit d0db3b765f
42 changed files with 398 additions and 540 deletions

View File

@ -0,0 +1,330 @@
import { Injectable } from '@nestjs/common';
import snakeCase from 'lodash.snakecase';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MESSAGING_THROTTLE_MAX_ATTEMPTS } from 'src/modules/messaging/message-import-manager/constants/messaging-throttle-max-attempts';
import { MessagingTelemetryService } from 'src/modules/messaging/monitoring/services/messaging-telemetry.service';
type SyncStep =
| 'partial-message-list-fetch'
| 'full-message-list-fetch'
| 'messages-import';
export type GmailError = {
code: number | string;
reason: string;
};
@Injectable()
export class MessagingErrorHandlingService {
constructor(
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
private readonly connectedAccountRepository: ConnectedAccountRepository,
private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService,
private readonly messagingTelemetryService: MessagingTelemetryService,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
) {}
public async handleGmailError(
error: GmailError,
syncStep: SyncStep,
messageChannel: MessageChannelWorkspaceEntity,
workspaceId: string,
): Promise<void> {
const { code, reason } = error;
switch (code) {
case 400:
if (reason === 'invalid_grant') {
await this.handleInsufficientPermissions(
error,
syncStep,
messageChannel,
workspaceId,
);
}
if (reason === 'failedPrecondition') {
await this.handleFailedPrecondition(
error,
syncStep,
messageChannel,
workspaceId,
);
} else {
await this.handleUnknownError(
error,
syncStep,
messageChannel,
workspaceId,
);
}
break;
case 404:
await this.handleNotFound(error, syncStep, messageChannel, workspaceId);
break;
case 429:
await this.handleRateLimitExceeded(
error,
syncStep,
messageChannel,
workspaceId,
);
break;
case 403:
if (
reason === 'rateLimitExceeded' ||
reason === 'userRateLimitExceeded'
) {
await this.handleRateLimitExceeded(
error,
syncStep,
messageChannel,
workspaceId,
);
} else {
await this.handleInsufficientPermissions(
error,
syncStep,
messageChannel,
workspaceId,
);
}
break;
case 401:
await this.handleInsufficientPermissions(
error,
syncStep,
messageChannel,
workspaceId,
);
break;
case 500:
if (reason === 'backendError') {
await this.handleRateLimitExceeded(
error,
syncStep,
messageChannel,
workspaceId,
);
} else {
await this.messagingChannelSyncStatusService.markAsFailedUnknownAndFlushMessagesToImport(
messageChannel.id,
workspaceId,
);
throw new Error(
`Unhandled Gmail error code ${code} with reason ${reason}`,
);
}
break;
case 'ECONNRESET':
case 'ENOTFOUND':
case 'ECONNABORTED':
case 'ETIMEDOUT':
case 'ERR_NETWORK':
// We are currently mixing up Gmail Error code (HTTP status) and axios error code (ECONNRESET)
// In case of a network error, we should retry the request
await this.handleRateLimitExceeded(
error,
syncStep,
messageChannel,
workspaceId,
);
break;
default:
await this.messagingChannelSyncStatusService.markAsFailedUnknownAndFlushMessagesToImport(
messageChannel.id,
workspaceId,
);
throw new Error(
`Unhandled Gmail error code ${code} with reason ${reason}`,
);
}
}
private async handleRateLimitExceeded(
error: GmailError,
syncStep: SyncStep,
messageChannel: MessageChannelWorkspaceEntity,
workspaceId: string,
): Promise<void> {
await this.messagingTelemetryService.track({
eventName: `${snakeCase(syncStep)}.error.rate_limit_exceeded`,
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
messageChannelId: messageChannel.id,
message: `${error.code}: ${error.reason}`,
});
await this.handleThrottle(syncStep, messageChannel, workspaceId);
}
private async handleFailedPrecondition(
error: GmailError,
syncStep: SyncStep,
messageChannel: MessageChannelWorkspaceEntity,
workspaceId: string,
): Promise<void> {
await this.messagingTelemetryService.track({
eventName: `${snakeCase(syncStep)}.error.failed_precondition`,
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
messageChannelId: messageChannel.id,
message: `${error.code}: ${error.reason}`,
});
await this.handleThrottle(syncStep, messageChannel, workspaceId);
}
private async handleInsufficientPermissions(
error: GmailError,
syncStep: SyncStep,
messageChannel: MessageChannelWorkspaceEntity,
workspaceId: string,
): Promise<void> {
await this.messagingTelemetryService.track({
eventName: `${snakeCase(syncStep)}.error.insufficient_permissions`,
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
messageChannelId: messageChannel.id,
message: `${error.code}: ${error.reason}`,
});
await this.messagingChannelSyncStatusService.markAsFailedInsufficientPermissionsAndFlushMessagesToImport(
messageChannel.id,
workspaceId,
);
if (!messageChannel.connectedAccountId) {
throw new Error(
`Connected account ID is not defined for message channel ${messageChannel.id} in workspace ${workspaceId}`,
);
}
await this.connectedAccountRepository.updateAuthFailedAt(
messageChannel.connectedAccountId,
workspaceId,
);
}
private async handleNotFound(
error: GmailError,
syncStep: SyncStep,
messageChannel: MessageChannelWorkspaceEntity,
workspaceId: string,
): Promise<void> {
if (syncStep === 'messages-import') {
return;
}
await this.messagingTelemetryService.track({
eventName: `${snakeCase(syncStep)}.error.not_found`,
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
messageChannelId: messageChannel.id,
message: `404: ${error.reason}`,
});
await this.messagingChannelSyncStatusService.resetAndScheduleFullMessageListFetch(
messageChannel.id,
workspaceId,
);
}
private async handleThrottle(
syncStep: SyncStep,
messageChannel: MessageChannelWorkspaceEntity,
workspaceId: string,
): Promise<void> {
if (
messageChannel.throttleFailureCount >= MESSAGING_THROTTLE_MAX_ATTEMPTS
) {
await this.messagingChannelSyncStatusService.markAsFailedUnknownAndFlushMessagesToImport(
messageChannel.id,
workspaceId,
);
return;
}
await this.throttle(messageChannel, workspaceId);
switch (syncStep) {
case 'full-message-list-fetch':
await this.messagingChannelSyncStatusService.scheduleFullMessageListFetch(
messageChannel.id,
workspaceId,
);
break;
case 'partial-message-list-fetch':
await this.messagingChannelSyncStatusService.schedulePartialMessageListFetch(
messageChannel.id,
workspaceId,
);
break;
case 'messages-import':
await this.messagingChannelSyncStatusService.scheduleMessagesImport(
messageChannel.id,
workspaceId,
);
break;
default:
break;
}
}
private async throttle(
messageChannel: MessageChannelWorkspaceEntity,
workspaceId: string,
): Promise<void> {
await this.messageChannelRepository.incrementThrottleFailureCount(
messageChannel.id,
workspaceId,
);
await this.messagingTelemetryService.track({
eventName: 'message_channel.throttle',
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
messageChannelId: messageChannel.id,
message: `Increment throttle failure count to ${messageChannel.throttleFailureCount}`,
});
}
private async handleUnknownError(
error: GmailError,
syncStep: SyncStep,
messageChannel: MessageChannelWorkspaceEntity,
workspaceId: string,
): Promise<void> {
await this.messagingTelemetryService.track({
eventName: `${snakeCase(syncStep)}.error.unknown`,
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
messageChannelId: messageChannel.id,
message: `${error.code}: ${error.reason}`,
});
await this.messagingChannelSyncStatusService.markAsFailedUnknownAndFlushMessagesToImport(
messageChannel.id,
workspaceId,
);
throw new Error(
`Unhandled Gmail error code ${error.code} with reason ${error.reason}`,
);
}
}

View File

@ -0,0 +1,221 @@
import { Injectable, Logger } from '@nestjs/common';
import { GaxiosResponse } from 'gaxios';
import { gmail_v1 } from 'googleapis';
import { EntityManager } from 'typeorm';
import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service';
import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator';
import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/common/repositories/message-channel-message-association.repository';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service';
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MESSAGING_GMAIL_EXCLUDED_CATEGORIES } from 'src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-excluded-categories';
import { MESSAGING_GMAIL_USERS_MESSAGES_LIST_MAX_RESULT } from 'src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-users-messages-list-max-result.constant';
import { MessagingGmailClientProvider } from 'src/modules/messaging/message-import-manager/drivers/gmail/providers/messaging-gmail-client.provider';
import { computeGmailCategoryExcludeSearchFilter } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-gmail-category-excude-search-filter';
import {
GmailError,
MessagingErrorHandlingService,
} from 'src/modules/messaging/message-import-manager/services/messaging-error-handling.service';
@Injectable()
export class MessagingFullMessageListFetchService {
private readonly logger = new Logger(
MessagingFullMessageListFetchService.name,
);
constructor(
private readonly gmailClientProvider: MessagingGmailClientProvider,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
@InjectCacheStorage(CacheStorageNamespace.Messaging)
private readonly cacheStorage: CacheStorageService,
@InjectObjectMetadataRepository(
MessageChannelMessageAssociationWorkspaceEntity,
)
private readonly messageChannelMessageAssociationRepository: MessageChannelMessageAssociationRepository,
private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService,
private readonly gmailErrorHandlingService: MessagingErrorHandlingService,
) {}
public async processMessageListFetch(
messageChannel: MessageChannelWorkspaceEntity,
connectedAccount: ConnectedAccountWorkspaceEntity,
workspaceId: string,
) {
await this.messagingChannelSyncStatusService.markAsMessagesListFetchOngoing(
messageChannel.id,
workspaceId,
);
const gmailClient: gmail_v1.Gmail =
await this.gmailClientProvider.getGmailClient(connectedAccount);
const { error: gmailError } = await this.fetchAllMessageIdsAndStoreInCache(
gmailClient,
messageChannel.id,
workspaceId,
);
if (gmailError) {
await this.gmailErrorHandlingService.handleGmailError(
gmailError,
'full-message-list-fetch',
messageChannel,
workspaceId,
);
return;
}
await this.messageChannelRepository.resetThrottleFailureCount(
messageChannel.id,
workspaceId,
);
await this.messageChannelRepository.resetSyncStageStartedAt(
messageChannel.id,
workspaceId,
);
await this.messagingChannelSyncStatusService.scheduleMessagesImport(
messageChannel.id,
workspaceId,
);
}
private async fetchAllMessageIdsAndStoreInCache(
gmailClient: gmail_v1.Gmail,
messageChannelId: string,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<{ error?: GmailError }> {
let pageToken: string | undefined;
let fetchedMessageIdsCount = 0;
let hasMoreMessages = true;
let firstMessageExternalId: string | undefined;
let response: GaxiosResponse<gmail_v1.Schema$ListMessagesResponse>;
while (hasMoreMessages) {
try {
response = await gmailClient.users.messages.list({
userId: 'me',
maxResults: MESSAGING_GMAIL_USERS_MESSAGES_LIST_MAX_RESULT,
pageToken,
q: computeGmailCategoryExcludeSearchFilter(
MESSAGING_GMAIL_EXCLUDED_CATEGORIES,
),
});
} catch (error) {
return {
error: {
code: error.response?.status,
reason: error.response?.data?.error,
},
};
}
if (response.data?.messages) {
const messageExternalIds = response.data.messages
.filter((message): message is { id: string } => message.id != null)
.map((message) => message.id);
if (!firstMessageExternalId) {
firstMessageExternalId = messageExternalIds[0];
}
const existingMessageChannelMessageAssociations =
await this.messageChannelMessageAssociationRepository.getByMessageExternalIdsAndMessageChannelId(
messageExternalIds,
messageChannelId,
workspaceId,
transactionManager,
);
const existingMessageChannelMessageAssociationsExternalIds =
existingMessageChannelMessageAssociations.map(
(messageChannelMessageAssociation) =>
messageChannelMessageAssociation.messageExternalId,
);
const messageIdsToImport = messageExternalIds.filter(
(messageExternalId) =>
!existingMessageChannelMessageAssociationsExternalIds.includes(
messageExternalId,
),
);
if (messageIdsToImport.length) {
await this.cacheStorage.setAdd(
`messages-to-import:${workspaceId}:gmail:${messageChannelId}`,
messageIdsToImport,
);
}
fetchedMessageIdsCount += messageExternalIds.length;
}
pageToken = response.data.nextPageToken ?? undefined;
hasMoreMessages = !!pageToken;
}
this.logger.log(
`Added ${fetchedMessageIdsCount} messages ids from Gmail for messageChannel ${messageChannelId} in workspace ${workspaceId} and added to cache for import`,
);
if (!firstMessageExternalId) {
throw new Error(
`No first message found for workspace ${workspaceId} and account ${messageChannelId}, can't update sync external id`,
);
}
await this.updateLastSyncCursor(
gmailClient,
messageChannelId,
firstMessageExternalId,
workspaceId,
transactionManager,
);
return {};
}
private async updateLastSyncCursor(
gmailClient: gmail_v1.Gmail,
messageChannelId: string,
firstMessageExternalId: string,
workspaceId: string,
transactionManager?: EntityManager,
) {
const firstMessageContent = await gmailClient.users.messages.get({
userId: 'me',
id: firstMessageExternalId,
});
if (!firstMessageContent?.data) {
throw new Error(
`No first message content found for message ${firstMessageExternalId} in workspace ${workspaceId}`,
);
}
const historyId = firstMessageContent?.data?.historyId;
if (!historyId) {
throw new Error(
`No historyId found for message ${firstMessageExternalId} in workspace ${workspaceId}`,
);
}
await this.messageChannelRepository.updateLastSyncCursorIfHigher(
messageChannelId,
historyId,
workspaceId,
transactionManager,
);
}
}

View File

@ -0,0 +1,73 @@
import { Injectable } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { v4 } from 'uuid';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/common/repositories/message-channel-message-association.repository';
import { MessageThreadRepository } from 'src/modules/messaging/common/repositories/message-thread.repository';
import { MessageRepository } from 'src/modules/messaging/common/repositories/message.repository';
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity';
import { MessageThreadWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-thread.workspace-entity';
import { MessageWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message.workspace-entity';
@Injectable()
export class MessagingMessageThreadService {
constructor(
@InjectObjectMetadataRepository(
MessageChannelMessageAssociationWorkspaceEntity,
)
private readonly messageChannelMessageAssociationRepository: MessageChannelMessageAssociationRepository,
@InjectObjectMetadataRepository(MessageWorkspaceEntity)
private readonly messageRepository: MessageRepository,
@InjectObjectMetadataRepository(MessageThreadWorkspaceEntity)
private readonly messageThreadRepository: MessageThreadRepository,
) {}
public async saveMessageThreadOrReturnExistingMessageThread(
headerMessageId: string,
messageThreadExternalId: string,
workspaceId: string,
manager: EntityManager,
) {
// Check if message thread already exists via threadExternalId
const existingMessageChannelMessageAssociationByMessageThreadExternalId =
await this.messageChannelMessageAssociationRepository.getFirstByMessageThreadExternalId(
messageThreadExternalId,
workspaceId,
manager,
);
const existingMessageThread =
existingMessageChannelMessageAssociationByMessageThreadExternalId?.messageThreadId;
if (existingMessageThread) {
return Promise.resolve(existingMessageThread);
}
// Check if message thread already exists via existing message headerMessageId
const existingMessageWithSameHeaderMessageId =
await this.messageRepository.getFirstOrNullByHeaderMessageId(
headerMessageId,
workspaceId,
manager,
);
if (existingMessageWithSameHeaderMessageId) {
return Promise.resolve(
existingMessageWithSameHeaderMessageId.messageThreadId,
);
}
// If message thread does not exist, create new message thread
const newMessageThreadId = v4();
await this.messageThreadRepository.insert(
newMessageThreadId,
workspaceId,
manager,
);
return Promise.resolve(newMessageThreadId);
}
}

View File

@ -0,0 +1,233 @@
import { Injectable } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { v4 } from 'uuid';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/common/repositories/message-channel-message-association.repository';
import { MessageThreadRepository } from 'src/modules/messaging/common/repositories/message-thread.repository';
import { MessageRepository } from 'src/modules/messaging/common/repositories/message.repository';
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity';
import { MessageThreadWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-thread.workspace-entity';
import { MessageWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message.workspace-entity';
import { GmailMessage } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message';
import { MessagingMessageThreadService } from 'src/modules/messaging/message-import-manager/services/messaging-message-thread.service';
@Injectable()
export class MessagingMessageService {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
@InjectObjectMetadataRepository(
MessageChannelMessageAssociationWorkspaceEntity,
)
private readonly messageChannelMessageAssociationRepository: MessageChannelMessageAssociationRepository,
@InjectObjectMetadataRepository(MessageWorkspaceEntity)
private readonly messageRepository: MessageRepository,
@InjectObjectMetadataRepository(MessageThreadWorkspaceEntity)
private readonly messageThreadRepository: MessageThreadRepository,
private readonly messageThreadService: MessagingMessageThreadService,
) {}
public async saveMessagesWithinTransaction(
messages: GmailMessage[],
connectedAccount: ConnectedAccountWorkspaceEntity,
gmailMessageChannelId: string,
workspaceId: string,
transactionManager: EntityManager,
): Promise<Map<string, string>> {
const messageExternalIdsAndIdsMap = new Map<string, string>();
for (const message of messages) {
const existingMessageChannelMessageAssociationsCount =
await this.messageChannelMessageAssociationRepository.countByMessageExternalIdsAndMessageChannelId(
[message.externalId],
gmailMessageChannelId,
workspaceId,
transactionManager,
);
if (existingMessageChannelMessageAssociationsCount > 0) {
continue;
}
// TODO: This does not handle all thread merging use cases and might create orphan threads.
const savedOrExistingMessageThreadId =
await this.messageThreadService.saveMessageThreadOrReturnExistingMessageThread(
message.headerMessageId,
message.messageThreadExternalId,
workspaceId,
transactionManager,
);
if (!savedOrExistingMessageThreadId) {
throw new Error(
`No message thread found for message ${message.headerMessageId} in workspace ${workspaceId} in saveMessages`,
);
}
const savedOrExistingMessageId =
await this.saveMessageOrReturnExistingMessage(
message,
savedOrExistingMessageThreadId,
connectedAccount,
workspaceId,
transactionManager,
);
messageExternalIdsAndIdsMap.set(
message.externalId,
savedOrExistingMessageId,
);
await this.messageChannelMessageAssociationRepository.insert(
gmailMessageChannelId,
savedOrExistingMessageId,
message.externalId,
savedOrExistingMessageThreadId,
message.messageThreadExternalId,
workspaceId,
transactionManager,
);
}
return messageExternalIdsAndIdsMap;
}
private async saveMessageOrReturnExistingMessage(
message: GmailMessage,
messageThreadId: string,
connectedAccount: ConnectedAccountWorkspaceEntity,
workspaceId: string,
manager: EntityManager,
): Promise<string> {
const existingMessage =
await this.messageRepository.getFirstOrNullByHeaderMessageId(
message.headerMessageId,
workspaceId,
);
const existingMessageId = existingMessage?.id;
if (existingMessageId) {
return Promise.resolve(existingMessageId);
}
const newMessageId = v4();
const messageDirection =
connectedAccount.handle === message.fromHandle ||
connectedAccount.handleAliases?.includes(message.fromHandle)
? 'outgoing'
: 'incoming';
const receivedAt = new Date(parseInt(message.internalDate));
await this.messageRepository.insert(
newMessageId,
message.headerMessageId,
message.subject,
receivedAt,
messageDirection,
messageThreadId,
message.text,
workspaceId,
manager,
);
return Promise.resolve(newMessageId);
}
public async deleteMessages(
messagesDeletedMessageExternalIds: string[],
gmailMessageChannelId: string,
workspaceId: string,
) {
const workspaceDataSource =
await this.workspaceDataSourceService.connectToWorkspaceDataSource(
workspaceId,
);
await workspaceDataSource?.transaction(async (manager: EntityManager) => {
const messageChannelMessageAssociationsToDelete =
await this.messageChannelMessageAssociationRepository.getByMessageExternalIdsAndMessageChannelId(
messagesDeletedMessageExternalIds,
gmailMessageChannelId,
workspaceId,
manager,
);
const messageChannelMessageAssociationIdsToDeleteIds =
messageChannelMessageAssociationsToDelete.map(
(messageChannelMessageAssociationToDelete) =>
messageChannelMessageAssociationToDelete.id,
);
await this.messageChannelMessageAssociationRepository.deleteByIds(
messageChannelMessageAssociationIdsToDeleteIds,
workspaceId,
manager,
);
const messageIdsFromMessageChannelMessageAssociationsToDelete =
messageChannelMessageAssociationsToDelete.map(
(messageChannelMessageAssociationToDelete) =>
messageChannelMessageAssociationToDelete.messageId,
);
const messageChannelMessageAssociationByMessageIds =
await this.messageChannelMessageAssociationRepository.getByMessageIds(
messageIdsFromMessageChannelMessageAssociationsToDelete,
workspaceId,
manager,
);
const messageIdsFromMessageChannelMessageAssociationByMessageIds =
messageChannelMessageAssociationByMessageIds.map(
(messageChannelMessageAssociation) =>
messageChannelMessageAssociation.messageId,
);
const messageIdsToDelete =
messageIdsFromMessageChannelMessageAssociationsToDelete.filter(
(messageId) =>
!messageIdsFromMessageChannelMessageAssociationByMessageIds.includes(
messageId,
),
);
await this.messageRepository.deleteByIds(
messageIdsToDelete,
workspaceId,
manager,
);
const messageThreadIdsFromMessageChannelMessageAssociationsToDelete =
messageChannelMessageAssociationsToDelete.map(
(messageChannelMessageAssociationToDelete) =>
messageChannelMessageAssociationToDelete.messageThreadId,
);
const messagesByThreadIds =
await this.messageRepository.getByMessageThreadIds(
messageThreadIdsFromMessageChannelMessageAssociationsToDelete,
workspaceId,
manager,
);
const threadIdsToDelete =
messageThreadIdsFromMessageChannelMessageAssociationsToDelete.filter(
(threadId) =>
!messagesByThreadIds.find(
(message) => message.messageThreadId === threadId,
),
);
await this.messageThreadRepository.deleteByIds(
threadIdsToDelete,
workspaceId,
manager,
);
});
}
}

View File

@ -0,0 +1,253 @@
import { Injectable, Logger } from '@nestjs/common';
import { FeatureFlagKeys } from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { IsFeatureEnabledService } from 'src/engine/core-modules/feature-flag/services/is-feature-enabled.service';
import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service';
import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator';
import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { BlocklistRepository } from 'src/modules/blocklist/repositories/blocklist.repository';
import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity';
import { EmailAliasManagerService } from 'src/modules/connected-account/email-alias-manager/services/email-alias-manager.service';
import { RefreshAccessTokenService } from 'src/modules/connected-account/refresh-access-token-manager/services/refresh-access-token.service';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service';
import {
MessageChannelSyncStage,
MessageChannelWorkspaceEntity,
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE } from 'src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-users-messages-get-batch-size.constant';
import { MessagingGmailFetchMessagesByBatchesService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-messages-by-batches.service';
import { MessagingErrorHandlingService } from 'src/modules/messaging/message-import-manager/services/messaging-error-handling.service';
import { MessagingSaveMessagesAndEnqueueContactCreationService } from 'src/modules/messaging/message-import-manager/services/messaging-save-messages-and-enqueue-contact-creation.service';
import { filterEmails } from 'src/modules/messaging/message-import-manager/utils/filter-emails.util';
import { MessagingTelemetryService } from 'src/modules/messaging/monitoring/services/messaging-telemetry.service';
@Injectable()
export class MessagingMessagesImportService {
private readonly logger = new Logger(MessagingMessagesImportService.name);
constructor(
private readonly fetchMessagesByBatchesService: MessagingGmailFetchMessagesByBatchesService,
@InjectCacheStorage(CacheStorageNamespace.Messaging)
private readonly cacheStorage: CacheStorageService,
private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService,
private readonly saveMessagesAndEnqueueContactCreationService: MessagingSaveMessagesAndEnqueueContactCreationService,
private readonly gmailErrorHandlingService: MessagingErrorHandlingService,
private readonly refreshAccessTokenService: RefreshAccessTokenService,
private readonly messagingTelemetryService: MessagingTelemetryService,
@InjectObjectMetadataRepository(BlocklistWorkspaceEntity)
private readonly blocklistRepository: BlocklistRepository,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
private readonly emailAliasManagerService: EmailAliasManagerService,
private readonly isFeatureEnabledService: IsFeatureEnabledService,
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
private readonly connectedAccountRepository: ConnectedAccountRepository,
) {}
async processMessageBatchImport(
messageChannel: MessageChannelWorkspaceEntity,
connectedAccount: ConnectedAccountWorkspaceEntity,
workspaceId: string,
) {
if (
messageChannel.syncStage !==
MessageChannelSyncStage.MESSAGES_IMPORT_PENDING
) {
return;
}
await this.messagingTelemetryService.track({
eventName: 'messages_import.started',
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
messageChannelId: messageChannel.id,
});
this.logger.log(
`Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} starting...`,
);
await this.messagingChannelSyncStatusService.markAsMessagesImportOngoing(
messageChannel.id,
workspaceId,
);
let accessToken: string;
try {
accessToken =
await this.refreshAccessTokenService.refreshAndSaveAccessToken(
connectedAccount,
workspaceId,
);
} catch (error) {
await this.messagingTelemetryService.track({
eventName: `refresh_token.error.insufficient_permissions`,
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
messageChannelId: messageChannel.id,
message: `${error.code}: ${error.reason}`,
});
await this.messagingChannelSyncStatusService.markAsFailedInsufficientPermissionsAndFlushMessagesToImport(
messageChannel.id,
workspaceId,
);
await this.connectedAccountRepository.updateAuthFailedAt(
messageChannel.connectedAccountId,
workspaceId,
);
return;
}
if (
await this.isFeatureEnabledService.isFeatureEnabled(
FeatureFlagKeys.IsMessagingAliasFetchingEnabled,
workspaceId,
)
) {
try {
await this.emailAliasManagerService.refreshHandleAliases(
connectedAccount,
workspaceId,
);
} catch (error) {
await this.gmailErrorHandlingService.handleGmailError(
{
code: error.code,
reason: error.message,
},
'messages-import',
messageChannel,
workspaceId,
);
}
}
const messageIdsToFetch =
(await this.cacheStorage.setPop(
`messages-to-import:${workspaceId}:gmail:${messageChannel.id}`,
MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE,
)) ?? [];
if (!messageIdsToFetch?.length) {
await this.messagingChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch(
messageChannel.id,
workspaceId,
);
return await this.trackMessageImportCompleted(
messageChannel,
workspaceId,
);
}
try {
const allMessages =
await this.fetchMessagesByBatchesService.fetchAllMessages(
messageIdsToFetch,
accessToken,
connectedAccount.id,
workspaceId,
);
const blocklist = await this.blocklistRepository.getByWorkspaceMemberId(
connectedAccount.accountOwnerId,
workspaceId,
);
const messagesToSave = filterEmails(
messageChannel.handle,
allMessages,
blocklist.map((blocklistItem) => blocklistItem.handle),
);
await this.saveMessagesAndEnqueueContactCreationService.saveMessagesAndEnqueueContactCreationJob(
messagesToSave,
messageChannel,
connectedAccount,
workspaceId,
);
if (
messageIdsToFetch.length < MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE
) {
await this.messagingChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch(
messageChannel.id,
workspaceId,
);
} else {
await this.messagingChannelSyncStatusService.scheduleMessagesImport(
messageChannel.id,
workspaceId,
);
}
await this.messageChannelRepository.resetThrottleFailureCount(
messageChannel.id,
workspaceId,
);
await this.messageChannelRepository.resetSyncStageStartedAt(
messageChannel.id,
workspaceId,
);
return await this.trackMessageImportCompleted(
messageChannel,
workspaceId,
);
} catch (error) {
this.logger.log(
`Messaging import for messageId ${
error.messageId
}, workspace ${workspaceId} and connected account ${
connectedAccount.id
} failed with error: ${JSON.stringify(error)}`,
);
await this.cacheStorage.setAdd(
`messages-to-import:${workspaceId}:gmail:${messageChannel.id}`,
messageIdsToFetch,
);
if (error.code === undefined) {
// This should never happen as all errors must be known
throw error;
}
await this.gmailErrorHandlingService.handleGmailError(
{
code: error.code,
reason: error.errors?.[0]?.reason,
},
'messages-import',
messageChannel,
workspaceId,
);
return await this.trackMessageImportCompleted(
messageChannel,
workspaceId,
);
}
}
private async trackMessageImportCompleted(
messageChannel: MessageChannelWorkspaceEntity,
workspaceId: string,
) {
await this.messagingTelemetryService.track({
eventName: 'messages_import.completed',
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
messageChannelId: messageChannel.id,
});
}
}

View File

@ -0,0 +1,159 @@
import { Injectable, Logger } from '@nestjs/common';
import { gmail_v1 } from 'googleapis';
import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service';
import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator';
import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/common/repositories/message-channel-message-association.repository';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service';
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MessagingGmailClientProvider } from 'src/modules/messaging/message-import-manager/drivers/gmail/providers/messaging-gmail-client.provider';
import { MessagingGmailFetchMessageIdsToExcludeService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-messages-ids-to-exclude.service';
import { MessagingGmailHistoryService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-history.service';
import { MessagingErrorHandlingService } from 'src/modules/messaging/message-import-manager/services/messaging-error-handling.service';
@Injectable()
export class MessagingPartialMessageListFetchService {
private readonly logger = new Logger(
MessagingPartialMessageListFetchService.name,
);
constructor(
private readonly gmailClientProvider: MessagingGmailClientProvider,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
@InjectCacheStorage(CacheStorageNamespace.Messaging)
private readonly cacheStorage: CacheStorageService,
@InjectObjectMetadataRepository(
MessageChannelMessageAssociationWorkspaceEntity,
)
private readonly messageChannelMessageAssociationRepository: MessageChannelMessageAssociationRepository,
private readonly gmailErrorHandlingService: MessagingErrorHandlingService,
private readonly gmailGetHistoryService: MessagingGmailHistoryService,
private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService,
private readonly gmailFetchMessageIdsToExcludeService: MessagingGmailFetchMessageIdsToExcludeService,
) {}
public async processMessageListFetch(
messageChannel: MessageChannelWorkspaceEntity,
connectedAccount: ConnectedAccountWorkspaceEntity,
workspaceId: string,
): Promise<void> {
await this.messagingChannelSyncStatusService.markAsMessagesListFetchOngoing(
messageChannel.id,
workspaceId,
);
const lastSyncHistoryId = messageChannel.syncCursor;
const gmailClient: gmail_v1.Gmail =
await this.gmailClientProvider.getGmailClient(connectedAccount);
const { history, historyId, error } =
await this.gmailGetHistoryService.getHistory(
gmailClient,
lastSyncHistoryId,
);
if (error) {
await this.gmailErrorHandlingService.handleGmailError(
error,
'partial-message-list-fetch',
messageChannel,
workspaceId,
);
return;
}
await this.messageChannelRepository.resetThrottleFailureCount(
messageChannel.id,
workspaceId,
);
await this.messageChannelRepository.resetSyncStageStartedAt(
messageChannel.id,
workspaceId,
);
if (!historyId) {
throw new Error(
`No historyId found for ${connectedAccount.id} in workspace ${workspaceId} in gmail history response.`,
);
}
if (historyId === lastSyncHistoryId || !history?.length) {
this.logger.log(
`Partial message list import done with history ${historyId} and nothing to update for workspace ${workspaceId} and account ${connectedAccount.id}`,
);
await this.messagingChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch(
messageChannel.id,
workspaceId,
);
return;
}
const { messagesAdded, messagesDeleted } =
await this.gmailGetHistoryService.getMessageIdsFromHistory(history);
let messageIdsToFilter: string[] = [];
try {
messageIdsToFilter =
await this.gmailFetchMessageIdsToExcludeService.fetchEmailIdsToExcludeOrThrow(
gmailClient,
lastSyncHistoryId,
);
} catch (error) {
await this.gmailErrorHandlingService.handleGmailError(
error,
'partial-message-list-fetch',
messageChannel,
workspaceId,
);
return;
}
const messagesAddedFiltered = messagesAdded.filter(
(messageId) => !messageIdsToFilter.includes(messageId),
);
await this.cacheStorage.setAdd(
`messages-to-import:${workspaceId}:gmail:${messageChannel.id}`,
messagesAddedFiltered,
);
this.logger.log(
`Added ${messagesAddedFiltered.length} messages to import for workspace ${workspaceId} and account ${connectedAccount.id}`,
);
await this.messageChannelMessageAssociationRepository.deleteByMessageExternalIdsAndMessageChannelId(
messagesDeleted,
messageChannel.id,
workspaceId,
);
this.logger.log(
`Deleted ${messagesDeleted.length} messages for workspace ${workspaceId} and account ${connectedAccount.id}`,
);
await this.messageChannelRepository.updateLastSyncCursorIfHigher(
messageChannel.id,
historyId,
workspaceId,
);
await this.messagingChannelSyncStatusService.scheduleMessagesImport(
messageChannel.id,
workspaceId,
);
}
}

View File

@ -0,0 +1,129 @@
import { Injectable } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import {
CreateCompanyAndContactJob,
CreateCompanyAndContactJobData,
} from 'src/modules/contact-creation-manager/jobs/create-company-and-contact.job';
import {
MessageChannelContactAutoCreationPolicy,
MessageChannelWorkspaceEntity,
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import {
GmailMessage,
Participant,
ParticipantWithMessageId,
} from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message';
import { MessagingMessageService } from 'src/modules/messaging/message-import-manager/services/messaging-message.service';
import { MessagingMessageParticipantService } from 'src/modules/messaging/message-participant-manager/services/messaging-message-participant.service';
import { isGroupEmail } from 'src/utils/is-group-email';
import { isWorkEmail } from 'src/utils/is-work-email';
@Injectable()
export class MessagingSaveMessagesAndEnqueueContactCreationService {
constructor(
@InjectMessageQueue(MessageQueue.contactCreationQueue)
private readonly messageQueueService: MessageQueueService,
private readonly messageService: MessagingMessageService,
private readonly messageParticipantService: MessagingMessageParticipantService,
private readonly twentyORMManager: TwentyORMManager,
) {}
async saveMessagesAndEnqueueContactCreationJob(
messagesToSave: GmailMessage[],
messageChannel: MessageChannelWorkspaceEntity,
connectedAccount: ConnectedAccountWorkspaceEntity,
workspaceId: string,
) {
const handleAliases = connectedAccount.handleAliases?.split(',') || [];
const workspaceDataSource = await this.twentyORMManager.getDatasource();
const participantsWithMessageId = await workspaceDataSource?.transaction(
async (transactionManager: EntityManager) => {
const messageExternalIdsAndIdsMap =
await this.messageService.saveMessagesWithinTransaction(
messagesToSave,
connectedAccount,
messageChannel.id,
workspaceId,
transactionManager,
);
const participantsWithMessageId: (ParticipantWithMessageId & {
shouldCreateContact: boolean;
})[] = messagesToSave.flatMap((message) => {
const messageId = messageExternalIdsAndIdsMap.get(message.externalId);
return messageId
? message.participants.map((participant: Participant) => {
const fromHandle =
message.participants.find((p) => p.role === 'from')?.handle ||
'';
const isMessageSentByConnectedAccount =
handleAliases.includes(fromHandle) ||
fromHandle === connectedAccount.handle;
const isParticipantConnectedAccount =
handleAliases.includes(participant.handle) ||
participant.handle === connectedAccount.handle;
const isExcludedByNonProfessionalEmails =
messageChannel.excludeNonProfessionalEmails &&
!isWorkEmail(participant.handle);
const isExcludedByGroupEmails =
messageChannel.excludeGroupEmails &&
isGroupEmail(participant.handle);
const shouldCreateContact =
!isParticipantConnectedAccount &&
!isExcludedByNonProfessionalEmails &&
!isExcludedByGroupEmails &&
(messageChannel.contactAutoCreationPolicy ===
MessageChannelContactAutoCreationPolicy.SENT_AND_RECEIVED ||
(messageChannel.contactAutoCreationPolicy ===
MessageChannelContactAutoCreationPolicy.SENT &&
isMessageSentByConnectedAccount));
return {
...participant,
messageId,
shouldCreateContact,
};
})
: [];
});
await this.messageParticipantService.saveMessageParticipants(
participantsWithMessageId,
transactionManager,
);
return participantsWithMessageId;
},
);
if (messageChannel.isContactAutoCreationEnabled) {
const contactsToCreate = participantsWithMessageId.filter(
(participant) => participant.shouldCreateContact,
);
await this.messageQueueService.add<CreateCompanyAndContactJobData>(
CreateCompanyAndContactJob.name,
{
workspaceId,
connectedAccount,
contactsToCreate,
},
);
}
}
}