[messaging] remove v2 feature flag (#4845)

## Context
We are now removing Messaging V2 feature flag to use it everywhere.

## Implementation
- renaming FetchWorkspaceMessagesCommandsModule to
MessagingCommandModule to make it more generic since it it hosts all
commands related to the messaging module
- creating a crons folder inside commands and jobs crons should be named
with xxx.cron.command.ts instead of xxx.command.ts. Same for jobs, jobs
should be named with xxx.cron.job.ts. In a future PR we should make sure
those CronJobs implement a CronJob interface since it's a bit different
(a CronJob does not contain a payload compared to a Job)
- Cron commands have been renamed to "cron:$module:command" so
`fetch-all-workspaces-messages-from-cache:cron:start` has been renamed
to `cron:messaging:gmail-fetch-messages-from-cache`. Also having to
create a command to stop the cron is a bit painful to maintain so I
removed them for now, this can be easily done manually with pg-boss or
bull-mq
- Removing full-sync and partial-sync commands as they were there for
testing only, we might put them back at some point but we will have to
adapt the code anyway.
- Feature flag has been removed from the MessageChannel standard object
to make sure those new columns are created during the next sync-metadata
This commit is contained in:
Weiko
2024-04-05 16:59:48 +02:00
committed by GitHub
parent e0918c89c1
commit f8da8f9805
25 changed files with 55 additions and 1185 deletions

View File

@ -1,31 +0,0 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
import { BlocklistObjectMetadata } from 'src/modules/connected-account/standard-objects/blocklist.object-metadata';
import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata';
import { FetchMessagesByBatchesModule } from 'src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.module';
import { GmailFullSyncService } from 'src/modules/messaging/services/gmail-full-sync/gmail-full-sync.service';
import { MessagingProvidersModule } from 'src/modules/messaging/services/providers/messaging-providers.module';
import { SaveMessageAndEmitContactCreationEventModule } from 'src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.module';
import { MessageChannelMessageAssociationObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel-message-association.object-metadata';
import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel.object-metadata';
@Module({
imports: [
MessagingProvidersModule,
FetchMessagesByBatchesModule,
ObjectMetadataRepositoryModule.forFeature([
ConnectedAccountObjectMetadata,
MessageChannelObjectMetadata,
MessageChannelMessageAssociationObjectMetadata,
BlocklistObjectMetadata,
]),
SaveMessageAndEmitContactCreationEventModule,
TypeOrmModule.forFeature([FeatureFlagEntity], 'core'),
],
providers: [GmailFullSyncService],
exports: [GmailFullSyncService],
})
export class GmailFullSyncModule {}

View File

@ -1,269 +0,0 @@
import { Inject, Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { FetchMessagesByBatchesService } from 'src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.service';
import { GmailClientProvider } from 'src/modules/messaging/services/providers/gmail/gmail-client.provider';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import {
GmailFullSyncJobData,
GmailFullSyncJob,
} from 'src/modules/messaging/jobs/gmail-full-sync.job';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/repositories/message-channel-message-association.repository';
import { createQueriesFromMessageIds } from 'src/modules/messaging/utils/create-queries-from-message-ids.util';
import { gmailSearchFilterExcludeEmails } from 'src/modules/messaging/utils/gmail-search-filter.util';
import { BlocklistRepository } from 'src/modules/connected-account/repositories/blocklist.repository';
import { SaveMessageAndEmitContactCreationEventService } from 'src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.service';
import {
FeatureFlagEntity,
FeatureFlagKeys,
} from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata';
import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel.object-metadata';
import { MessageChannelMessageAssociationObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel-message-association.object-metadata';
import { BlocklistObjectMetadata } from 'src/modules/connected-account/standard-objects/blocklist.object-metadata';
@Injectable()
export class GmailFullSyncService {
private readonly logger = new Logger(GmailFullSyncService.name);
constructor(
private readonly gmailClientProvider: GmailClientProvider,
private readonly fetchMessagesByBatchesService: FetchMessagesByBatchesService,
@Inject(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
@InjectObjectMetadataRepository(ConnectedAccountObjectMetadata)
private readonly connectedAccountRepository: ConnectedAccountRepository,
@InjectObjectMetadataRepository(MessageChannelObjectMetadata)
private readonly messageChannelRepository: MessageChannelRepository,
@InjectObjectMetadataRepository(
MessageChannelMessageAssociationObjectMetadata,
)
private readonly messageChannelMessageAssociationRepository: MessageChannelMessageAssociationRepository,
@InjectObjectMetadataRepository(BlocklistObjectMetadata)
private readonly blocklistRepository: BlocklistRepository,
private readonly saveMessagesAndEmitContactCreationEventService: SaveMessageAndEmitContactCreationEventService,
@InjectRepository(FeatureFlagEntity, 'core')
private readonly featureFlagRepository: Repository<FeatureFlagEntity>,
) {}
public async fetchConnectedAccountThreads(
workspaceId: string,
connectedAccountId: string,
nextPageToken?: string,
): Promise<void> {
const connectedAccount = await this.connectedAccountRepository.getById(
connectedAccountId,
workspaceId,
);
if (!connectedAccount) {
this.logger.error(
`Connected account ${connectedAccountId} not found in workspace ${workspaceId} during full-sync`,
);
return;
}
const accessToken = connectedAccount.accessToken;
const refreshToken = connectedAccount.refreshToken;
const workspaceMemberId = connectedAccount.accountOwnerId;
if (!refreshToken) {
throw new Error(
`No refresh token found for connected account ${connectedAccountId} in workspace ${workspaceId} during full-sync`,
);
}
const gmailMessageChannel =
await this.messageChannelRepository.getFirstByConnectedAccountId(
connectedAccountId,
workspaceId,
);
if (!gmailMessageChannel) {
this.logger.error(
`No message channel found for connected account ${connectedAccountId} in workspace ${workspaceId} during full-syn`,
);
return;
}
const gmailMessageChannelId = gmailMessageChannel.id;
const gmailClient =
await this.gmailClientProvider.getGmailClient(refreshToken);
const isBlocklistEnabledFeatureFlag =
await this.featureFlagRepository.findOneBy({
workspaceId,
key: FeatureFlagKeys.IsBlocklistEnabled,
value: true,
});
const isBlocklistEnabled =
isBlocklistEnabledFeatureFlag && isBlocklistEnabledFeatureFlag.value;
const blocklist = isBlocklistEnabled
? await this.blocklistRepository.getByWorkspaceMemberId(
workspaceMemberId,
workspaceId,
)
: [];
const blocklistedEmails = blocklist.map((blocklist) => blocklist.handle);
let startTime = Date.now();
const messages = await gmailClient.users.messages.list({
userId: 'me',
maxResults: 500,
pageToken: nextPageToken,
q: gmailSearchFilterExcludeEmails(blocklistedEmails),
});
let endTime = Date.now();
this.logger.log(
`gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId} getting messages list in ${
endTime - startTime
}ms.`,
);
const messagesData = messages.data.messages;
const messageExternalIds = messagesData
? messagesData.map((message) => message.id || '')
: [];
if (!messageExternalIds || messageExternalIds?.length === 0) {
this.logger.log(
`gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId} done with nothing to import.`,
);
return;
}
startTime = Date.now();
const existingMessageChannelMessageAssociations =
await this.messageChannelMessageAssociationRepository.getByMessageExternalIdsAndMessageChannelId(
messageExternalIds,
gmailMessageChannelId,
workspaceId,
);
endTime = Date.now();
this.logger.log(
`gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId}: getting existing message channel message associations in ${
endTime - startTime
}ms.`,
);
const existingMessageChannelMessageAssociationsExternalIds =
existingMessageChannelMessageAssociations.map(
(messageChannelMessageAssociation) =>
messageChannelMessageAssociation.messageExternalId,
);
const messagesToFetch = messageExternalIds.filter(
(messageExternalId) =>
!existingMessageChannelMessageAssociationsExternalIds.includes(
messageExternalId,
),
);
const messageQueries = createQueriesFromMessageIds(messagesToFetch);
startTime = Date.now();
const { messages: messagesToSave, errors } =
await this.fetchMessagesByBatchesService.fetchAllMessages(
messageQueries,
accessToken,
workspaceId,
connectedAccountId,
);
endTime = Date.now();
this.logger.log(
`gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId}: fetching all messages in ${
endTime - startTime
}ms.`,
);
if (messagesToSave.length > 0) {
await this.saveMessagesAndEmitContactCreationEventService.saveMessagesAndEmitContactCreation(
messagesToSave,
connectedAccount,
workspaceId,
gmailMessageChannelId,
);
} else {
this.logger.log(
`gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId} done with nothing to import.`,
);
return;
}
if (errors.length) {
throw new Error(
`Error fetching messages for ${connectedAccountId} in workspace ${workspaceId} during full-sync`,
);
}
const lastModifiedMessageId = messagesToFetch[0];
const historyId = messagesToSave.find(
(message) => message.externalId === lastModifiedMessageId,
)?.historyId;
if (!historyId) {
throw new Error(
`No historyId found for ${connectedAccountId} in workspace ${workspaceId} during full-sync`,
);
}
startTime = Date.now();
await this.connectedAccountRepository.updateLastSyncHistoryIdIfHigher(
historyId,
connectedAccount.id,
workspaceId,
);
endTime = Date.now();
this.logger.log(
`gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId}: updating last sync history id in ${
endTime - startTime
}ms.`,
);
this.logger.log(
`gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId} ${
nextPageToken ? `and ${nextPageToken} pageToken` : ''
}done.`,
);
if (messages.data.nextPageToken) {
await this.messageQueueService.add<GmailFullSyncJobData>(
GmailFullSyncJob.name,
{
workspaceId,
connectedAccountId,
nextPageToken: messages.data.nextPageToken,
},
{
retryLimit: 2,
},
);
}
}
}