[messaging] Add message deletion during partial sync (#4972)
## Context - Rename the remaining V2 services. - Delete messages in the DB when the Gmail history tells us they have been deleted. I removed the logic that stored those ids in a cache: it was overkill, since we don't need to query Gmail again and can use the ids directly. The strategy is to delete the message channel message association for the current channel, not the message or the thread, since those can still be linked to other channels. However, we will need to call the threadCleaner service on the workspace to remove orphan threads and non-associated messages. Note: deletion during a full sync is trickier because we need the full list of message ids to compare against the DB and make sure we don't over-delete. Currently, to save memory, we don't keep a variable holding all ids, as we flush it after each page. An easier solution would be to wipe everything before each full sync, but that is probably not great for the user experience if they are manipulating messages at that moment, since a full sync can happen without user intervention (e.g. if a partial sync fails because the historyId was invalidated by Google).
This commit is contained in:
@ -0,0 +1,33 @@
|
||||
import { Module } from '@nestjs/common';
|
||||
import { TypeOrmModule } from '@nestjs/typeorm';
|
||||
|
||||
import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity';
|
||||
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
|
||||
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
|
||||
import { BlocklistObjectMetadata } from 'src/modules/connected-account/standard-objects/blocklist.object-metadata';
|
||||
import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata';
|
||||
import { FetchMessagesByBatchesModule } from 'src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.module';
|
||||
import { GmailPartialSyncV2Service as GmailPartialSyncService } from 'src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.service';
|
||||
import { MessageModule } from 'src/modules/messaging/services/message/message.module';
|
||||
import { MessagingProvidersModule } from 'src/modules/messaging/services/providers/messaging-providers.module';
|
||||
import { SaveMessageAndEmitContactCreationEventModule } from 'src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.module';
|
||||
import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel.object-metadata';
|
||||
|
||||
/**
 * NestJS module that provides and exports the Gmail partial-sync service.
 *
 * It imports the messaging provider, batch-fetching, message and
 * save-message modules, registers the connected-account, message-channel and
 * blocklist object-metadata repositories, exposes the `FeatureFlagEntity`
 * repository on the `core` connection, and imports the workspace data source
 * module.
 *
 * NOTE(review): the exported service also injects a repository for
 * `MessageChannelMessageAssociationObjectMetadata`, which is not part of the
 * `forFeature` list below — confirm it is made available by one of the
 * imported modules.
 */
@Module({
  imports: [
    MessagingProvidersModule,
    FetchMessagesByBatchesModule,
    ObjectMetadataRepositoryModule.forFeature([
      ConnectedAccountObjectMetadata,
      MessageChannelObjectMetadata,
      BlocklistObjectMetadata,
    ]),
    MessageModule,
    SaveMessageAndEmitContactCreationEventModule,
    TypeOrmModule.forFeature([FeatureFlagEntity], 'core'),
    WorkspaceDataSourceModule,
  ],
  providers: [GmailPartialSyncService],
  exports: [GmailPartialSyncService],
})
export class GmailPartialSyncModule {}
|
||||
@ -0,0 +1,346 @@
|
||||
import { Inject, Injectable, Logger } from '@nestjs/common';
|
||||
|
||||
import { gmail_v1 } from 'googleapis';
|
||||
import { EntityManager } from 'typeorm';
|
||||
|
||||
import { GmailClientProvider } from 'src/modules/messaging/services/providers/gmail/gmail-client.provider';
|
||||
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
|
||||
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
|
||||
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
|
||||
import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
|
||||
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
|
||||
import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata';
|
||||
import {
|
||||
MessageChannelObjectMetadata,
|
||||
MessageChannelSyncStatus,
|
||||
} from 'src/modules/messaging/standard-objects/message-channel.object-metadata';
|
||||
import { GMAIL_USERS_HISTORY_MAX_RESULT } from 'src/modules/messaging/constants/gmail-users-history-max-result.constant';
|
||||
import { GmailError } from 'src/modules/messaging/types/gmail-error';
|
||||
import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service';
|
||||
import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator';
|
||||
import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum';
|
||||
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
|
||||
import {
|
||||
GmailFullSyncJob,
|
||||
GmailFullSyncJobData,
|
||||
} from 'src/modules/messaging/jobs/gmail-full-sync.job';
|
||||
import { MessageChannelMessageAssociationObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel-message-association.object-metadata';
|
||||
import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/repositories/message-channel-message-association.repository';
|
||||
|
||||
/**
 * Incremental ("partial") Gmail synchronization for a connected account.
 *
 * Starting from the history id stored as the message channel's sync cursor,
 * the service reads the Gmail history, queues the ids of added messages in
 * the messaging cache for a later import, deletes the channel associations of
 * messages Gmail reports as deleted, and advances the cursor. When the cursor
 * is missing or rejected by Gmail (404), a full-sync job is queued instead.
 */
@Injectable()
export class GmailPartialSyncV2Service {
  private readonly logger = new Logger(GmailPartialSyncV2Service.name);

  /**
   * @param gmailClientProvider builds a Gmail client from a refresh token
   * @param messageQueueService messaging queue used to schedule full-sync jobs
   * @param connectedAccountRepository looks up the connected account
   * @param messageChannelRepository reads the channel and updates its sync
   *   status and sync cursor
   * @param cacheStorage messaging cache receiving the ids of messages to import
   * @param workspaceDataSourceService gives access to the workspace data
   *   source used to open the transaction
   * @param messageChannelMessageAssociationRepository deletes the associations
   *   of messages removed from Gmail
   */
  constructor(
    private readonly gmailClientProvider: GmailClientProvider,
    @Inject(MessageQueue.messagingQueue)
    private readonly messageQueueService: MessageQueueService,
    @InjectObjectMetadataRepository(ConnectedAccountObjectMetadata)
    private readonly connectedAccountRepository: ConnectedAccountRepository,
    @InjectObjectMetadataRepository(MessageChannelObjectMetadata)
    private readonly messageChannelRepository: MessageChannelRepository,
    @InjectCacheStorage(CacheStorageNamespace.Messaging)
    private readonly cacheStorage: CacheStorageService,
    private readonly workspaceDataSourceService: WorkspaceDataSourceService,
    @InjectObjectMetadataRepository(
      MessageChannelMessageAssociationObjectMetadata,
    )
    private readonly messageChannelMessageAssociationRepository: MessageChannelMessageAssociationRepository,
  ) {}

  /**
   * Runs one partial sync for the given connected account.
   *
   * Returns without doing anything (after logging) when the connected account
   * or its message channel cannot be found, or when the channel's sync status
   * is not `SUCCEEDED`. Otherwise the channel is marked `ONGOING`, the sync
   * runs inside a workspace transaction, and the channel ends up `PENDING`.
   * On any failure the channel is marked `FAILED` and an error is thrown.
   *
   * @param workspaceId workspace owning the connected account
   * @param connectedAccountId connected account to synchronize
   * @throws Error when the account has no refresh token, when the workspace
   *   data source is unavailable, or when fetching/applying the Gmail history
   *   fails
   */
  public async fetchConnectedAccountThreads(
    workspaceId: string,
    connectedAccountId: string,
  ): Promise<void> {
    const connectedAccount = await this.connectedAccountRepository.getById(
      connectedAccountId,
      workspaceId,
    );

    if (!connectedAccount) {
      this.logger.error(
        `Connected account ${connectedAccountId} not found in workspace ${workspaceId}`,
      );

      return;
    }

    const refreshToken = connectedAccount.refreshToken;

    if (!refreshToken) {
      throw new Error(
        `No refresh token found for connected account ${connectedAccountId} in workspace ${workspaceId}`,
      );
    }

    const gmailMessageChannel =
      await this.messageChannelRepository.getFirstByConnectedAccountId(
        connectedAccountId,
        workspaceId,
      );

    if (!gmailMessageChannel) {
      this.logger.error(
        `No message channel found for connected account ${connectedAccountId} in workspace ${workspaceId}`,
      );

      return;
    }

    if (gmailMessageChannel.syncStatus !== MessageChannelSyncStatus.SUCCEEDED) {
      this.logger.log(
        `Messaging import for workspace ${workspaceId} and account ${connectedAccountId} is locked, import will be retried later.`,
      );

      return;
    }

    // Committed outside the transaction so that the status acts as a lock
    // for the guard above while this sync is running.
    await this.messageChannelRepository.updateSyncStatus(
      gmailMessageChannel.id,
      MessageChannelSyncStatus.ONGOING,
      workspaceId,
    );

    // Everything after the ONGOING update is guarded: whatever fails, the
    // channel must not stay ONGOING, otherwise the guard above would skip
    // every subsequent sync.
    try {
      const workspaceDataSource =
        await this.workspaceDataSourceService.connectToWorkspaceDataSource(
          workspaceId,
        );

      if (!workspaceDataSource) {
        throw new Error(
          `No workspace data source found for workspace ${workspaceId}`,
        );
      }

      // The callback resolves to true when a full sync must be scheduled.
      const shouldFallbackToFullSync = await workspaceDataSource.transaction(
        async (transactionManager: EntityManager): Promise<boolean> => {
          const lastSyncHistoryId = gmailMessageChannel.syncCursor;

          if (!lastSyncHistoryId) {
            this.logger.log(
              `No lastSyncHistoryId for workspace ${workspaceId} and account ${connectedAccountId}, falling back to full sync.`,
            );

            await this.messageChannelRepository.updateSyncStatus(
              gmailMessageChannel.id,
              MessageChannelSyncStatus.PENDING,
              workspaceId,
              transactionManager,
            );

            return true;
          }

          const gmailClient: gmail_v1.Gmail =
            await this.gmailClientProvider.getGmailClient(refreshToken);

          const {
            history,
            historyId,
            error: historyError,
          } = await this.getHistoryFromGmail(gmailClient, lastSyncHistoryId);

          if (historyError?.code === 404) {
            this.logger.log(
              `404: Invalid lastSyncHistoryId: ${lastSyncHistoryId} for workspace ${workspaceId} and account ${connectedAccountId}, falling back to full sync.`,
            );

            await this.messageChannelRepository.resetSyncCursor(
              gmailMessageChannel.id,
              workspaceId,
              transactionManager,
            );

            await this.messageChannelRepository.updateSyncStatus(
              gmailMessageChannel.id,
              MessageChannelSyncStatus.PENDING,
              workspaceId,
              transactionManager,
            );

            return true;
          }

          if (historyError?.code === 429) {
            this.logger.log(
              `429: rate limit reached for workspace ${workspaceId} and account ${connectedAccountId}: ${historyError.message}, import will be retried later.`,
            );

            await this.messageChannelRepository.updateSyncStatus(
              gmailMessageChannel.id,
              MessageChannelSyncStatus.PENDING,
              workspaceId,
              transactionManager,
            );

            return false;
          }

          if (historyError) {
            throw new Error(
              `Error fetching messages for ${connectedAccountId} in workspace ${workspaceId}: ${historyError.message}`,
            );
          }

          if (!historyId) {
            throw new Error(
              `No historyId found for ${connectedAccountId} in workspace ${workspaceId} in gmail history response.`,
            );
          }

          if (historyId === lastSyncHistoryId || !history?.length) {
            this.logger.log(
              `Messaging import done with history ${historyId} and nothing to update for workspace ${workspaceId} and account ${connectedAccountId}`,
            );

            // Goes through the transaction manager like every other status
            // update performed inside this callback.
            await this.messageChannelRepository.updateSyncStatus(
              gmailMessageChannel.id,
              MessageChannelSyncStatus.PENDING,
              workspaceId,
              transactionManager,
            );

            return false;
          }

          const { messagesAdded, messagesDeleted } =
            await this.getMessageIdsFromHistory(history);

          // Both lists can be empty once add/delete pairs cancel out; skip
          // the cache and database round-trips in that case.
          if (messagesAdded.length > 0) {
            await this.cacheStorage.setAdd(
              `messages-to-import:${workspaceId}:gmail:${gmailMessageChannel.id}`,
              messagesAdded,
            );
          }

          if (messagesDeleted.length > 0) {
            await this.messageChannelMessageAssociationRepository.deleteByMessageExternalIdsAndMessageChannelId(
              messagesDeleted,
              gmailMessageChannel.id,
              workspaceId,
              transactionManager,
            );
          }

          await this.messageChannelRepository.updateLastSyncCursorIfHigher(
            gmailMessageChannel.id,
            historyId,
            workspaceId,
            transactionManager,
          );

          await this.messageChannelRepository.updateSyncStatus(
            gmailMessageChannel.id,
            MessageChannelSyncStatus.PENDING,
            workspaceId,
            transactionManager,
          );

          return false;
        },
      );

      // Queue the full sync only once the transaction has committed, so the
      // job cannot start before the PENDING status and cursor reset are
      // visible, and is never queued for a transaction that rolled back.
      if (shouldFallbackToFullSync) {
        await this.fallbackToFullSync(workspaceId, connectedAccountId);
      }
    } catch (error) {
      await this.messageChannelRepository.updateSyncStatus(
        gmailMessageChannel.id,
        MessageChannelSyncStatus.FAILED,
        workspaceId,
      );

      const reason = error instanceof Error ? error.message : String(error);

      throw new Error(
        `Error fetching messages for ${connectedAccountId} in workspace ${workspaceId}: ${reason}`,
      );
    }
  }

  /**
   * Extracts the ids of added and deleted messages from Gmail history records.
   *
   * Ids that appear as both added and deleted in the given history cancel out
   * and are returned in neither list. Entries without a message id are
   * ignored, and each id is returned at most once, in order of first
   * appearance.
   *
   * @param history history records returned by Gmail
   * @returns the message ids to import and the message ids to delete
   */
  private async getMessageIdsFromHistory(
    history: gmail_v1.Schema$History[],
  ): Promise<{
    messagesAdded: string[];
    messagesDeleted: string[];
  }> {
    const addedIds = new Set<string>();
    const deletedIds = new Set<string>();

    for (const record of history) {
      for (const added of record.messagesAdded ?? []) {
        const messageId = added.message?.id;

        if (messageId) addedIds.add(messageId);
      }

      for (const deleted of record.messagesDeleted ?? []) {
        const messageId = deleted.message?.id;

        if (messageId) deletedIds.add(messageId);
      }
    }

    return {
      messagesAdded: Array.from(addedIds).filter(
        (messageId) => !deletedIds.has(messageId),
      ),
      messagesDeleted: Array.from(deletedIds).filter(
        (messageId) => !addedIds.has(messageId),
      ),
    };
  }

  /**
   * Fetches every page of `messageAdded` / `messageDeleted` history records
   * that follow `lastSyncHistoryId`.
   *
   * When a request fails with an error payload in the response body, that
   * payload is returned in `error` together with an empty history and the
   * unchanged `lastSyncHistoryId`; any other failure is rethrown.
   *
   * @param gmailClient authenticated Gmail client
   * @param lastSyncHistoryId history id to start listing from
   * @returns the accumulated history and the history id of the last page
   *   fetched, or the Gmail error payload
   */
  private async getHistoryFromGmail(
    gmailClient: gmail_v1.Gmail,
    lastSyncHistoryId: string,
  ): Promise<{
    history: gmail_v1.Schema$History[];
    historyId?: string | null;
    error?: GmailError;
  }> {
    const fullHistory: gmail_v1.Schema$History[] = [];
    let pageToken: string | undefined;
    let nextHistoryId: string | undefined;

    do {
      try {
        const response = await gmailClient.users.history.list({
          userId: 'me',
          maxResults: GMAIL_USERS_HISTORY_MAX_RESULT,
          pageToken,
          startHistoryId: lastSyncHistoryId,
          historyTypes: ['messageAdded', 'messageDeleted'],
        });

        nextHistoryId = response?.data?.historyId ?? undefined;

        if (response?.data?.history) {
          fullHistory.push(...response.data.history);
        }

        pageToken = response?.data?.nextPageToken ?? undefined;
      } catch (error) {
        const errorData = error?.response?.data?.error;

        if (errorData) {
          return {
            history: [],
            error: errorData,
            historyId: lastSyncHistoryId,
          };
        }

        throw error;
      }
    } while (pageToken);

    return { history: fullHistory, historyId: nextHistoryId };
  }

  /**
   * Queues a Gmail full-sync job for the connected account.
   *
   * @param workspaceId workspace owning the connected account
   * @param connectedAccountId connected account to fully synchronize
   */
  private async fallbackToFullSync(
    workspaceId: string,
    connectedAccountId: string,
  ): Promise<void> {
    await this.messageQueueService.add<GmailFullSyncJobData>(
      GmailFullSyncJob.name,
      { workspaceId, connectedAccountId },
    );
  }
}
|
||||
Reference in New Issue
Block a user