5951 create a command to trigger the import of a single message (#5962)

Closes #5951

---------

Co-authored-by: Charles Bochet <charles@twenty.com>
This commit is contained in:
bosiraphael
2024-06-24 18:01:22 +02:00
committed by GitHub
parent f3701281e9
commit a001bf1514
6 changed files with 214 additions and 96 deletions

View File

@ -5,25 +5,31 @@ import { AxiosResponse } from 'axios';
import { GmailMessageParsedResponse } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message-parsed-response'; import { GmailMessageParsedResponse } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message-parsed-response';
import { BatchQueries } from 'src/modules/messaging/message-import-manager/types/batch-queries'; import { BatchQueries } from 'src/modules/messaging/message-import-manager/types/batch-queries';
import { createQueriesFromMessageIds } from 'src/modules/messaging/message-import-manager/utils/create-queries-from-message-ids.util';
@Injectable() @Injectable()
export class MessagingFetchByBatchesService { export class MessagingFetchByBatchesService {
constructor(private readonly httpService: HttpService) {} constructor(private readonly httpService: HttpService) {}
async fetchAllByBatches( async fetchAllByBatches(
queries: BatchQueries, messageIds: string[],
accessToken: string, accessToken: string,
boundary: string, boundary: string,
): Promise<AxiosResponse<any, any>[]> { ): Promise<{
messageIdsByBatch: string[][];
batchResponses: AxiosResponse<any, any>[];
}> {
const batchLimit = 50; const batchLimit = 50;
let batchOffset = 0; let batchOffset = 0;
let batchResponses: AxiosResponse<any, any>[] = []; let batchResponses: AxiosResponse<any, any>[] = [];
while (batchOffset < queries.length) { const messageIdsByBatch: string[][] = [];
while (batchOffset < messageIds.length) {
const batchResponse = await this.fetchBatch( const batchResponse = await this.fetchBatch(
queries, messageIds,
accessToken, accessToken,
batchOffset, batchOffset,
batchLimit, batchLimit,
@ -32,19 +38,25 @@ export class MessagingFetchByBatchesService {
batchResponses = batchResponses.concat(batchResponse); batchResponses = batchResponses.concat(batchResponse);
messageIdsByBatch.push(
messageIds.slice(batchOffset, batchOffset + batchLimit),
);
batchOffset += batchLimit; batchOffset += batchLimit;
} }
return batchResponses; return { messageIdsByBatch, batchResponses };
} }
async fetchBatch( async fetchBatch(
queries: BatchQueries, messageIds: string[],
accessToken: string, accessToken: string,
batchOffset: number, batchOffset: number,
batchLimit: number, batchLimit: number,
boundary: string, boundary: string,
): Promise<AxiosResponse<any, any>> { ): Promise<AxiosResponse<any, any>> {
const queries = createQueriesFromMessageIds(messageIds);
const limitedQueries = queries.slice(batchOffset, batchOffset + batchLimit); const limitedQueries = queries.slice(batchOffset, batchOffset + batchLimit);
const response = await this.httpService.axiosRef.post( const response = await this.httpService.axiosRef.post(

View File

@ -0,0 +1,69 @@
import { Command, CommandRunner, Option } from 'nest-commander';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import {
MessagingAddSingleMessageToCacheForImportJob,
MessagingAddSingleMessageToCacheForImportJobData,
} from 'src/modules/messaging/message-import-manager/jobs/messaging-add-single-message-to-cache-for-import.job';
type MessagingSingleMessageImportCommandOptions = {
messageExternalId: string;
messageChannelId: string;
workspaceId: string;
};
@Command({
name: 'messaging:single-message-import',
description: 'Enqueue a job to schedule the import of a single message',
})
export class MessagingSingleMessageImportCommand extends CommandRunner {
constructor(
@InjectMessageQueue(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
) {
super();
}
async run(
_passedParam: string[],
options: MessagingSingleMessageImportCommandOptions,
): Promise<void> {
await this.messageQueueService.add<MessagingAddSingleMessageToCacheForImportJobData>(
MessagingAddSingleMessageToCacheForImportJob.name,
{
messageExternalId: options.messageExternalId,
messageChannelId: options.messageChannelId,
workspaceId: options.workspaceId,
},
);
}
@Option({
flags: '-m, --message-external-id [message_external_id]',
description: 'Message external ID',
required: true,
})
parseMessageId(value: string): string {
return value;
}
@Option({
flags: '-M, --message-channel-id [message_channel_id]',
description: 'Message channel ID',
required: true,
})
parseMessageChannelId(value: string): string {
return value;
}
@Option({
flags: '-w, --workspace-id [workspace_id]',
description: 'Workspace ID',
required: true,
})
parseWorkspaceId(value: string): string {
return value;
}
}

View File

@ -7,7 +7,6 @@ import { gmail_v1 } from 'googleapis';
import { assert, assertNotNull } from 'src/utils/assert'; import { assert, assertNotNull } from 'src/utils/assert';
import { GmailMessage } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message'; import { GmailMessage } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message';
import { MessageQuery } from 'src/modules/messaging/message-import-manager/types/message-or-thread-query';
import { formatAddressObjectAsParticipants } from 'src/modules/messaging/message-import-manager/utils/format-address-object-as-participants.util'; import { formatAddressObjectAsParticipants } from 'src/modules/messaging/message-import-manager/utils/format-address-object-as-participants.util';
import { MessagingFetchByBatchesService } from 'src/modules/messaging/common/services/messaging-fetch-by-batch.service'; import { MessagingFetchByBatchesService } from 'src/modules/messaging/common/services/messaging-fetch-by-batch.service';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
@ -27,7 +26,7 @@ export class MessagingGmailFetchMessagesByBatchesService {
) {} ) {}
async fetchAllMessages( async fetchAllMessages(
queries: MessageQuery[], messageIds: string[],
connectedAccountId: string, connectedAccountId: string,
workspaceId: string, workspaceId: string,
): Promise<GmailMessage[]> { ): Promise<GmailMessage[]> {
@ -46,22 +45,24 @@ export class MessagingGmailFetchMessagesByBatchesService {
const accessToken = connectedAccount.accessToken; const accessToken = connectedAccount.accessToken;
const batchResponses = await this.fetchByBatchesService.fetchAllByBatches( const { messageIdsByBatch, batchResponses } =
queries, await this.fetchByBatchesService.fetchAllByBatches(
accessToken, messageIds,
'batch_gmail_messages', accessToken,
); 'batch_gmail_messages',
);
let endTime = Date.now(); let endTime = Date.now();
this.logger.log( this.logger.log(
`Messaging import for workspace ${workspaceId} and account ${connectedAccountId} fetching ${ `Messaging import for workspace ${workspaceId} and account ${connectedAccountId} fetching ${
queries.length messageIds.length
} messages in ${endTime - startTime}ms`, } messages in ${endTime - startTime}ms`,
); );
startTime = Date.now(); startTime = Date.now();
const formattedResponse = this.formatBatchResponsesAsGmailMessages( const formattedResponse = this.formatBatchResponsesAsGmailMessages(
messageIdsByBatch,
batchResponses, batchResponses,
workspaceId, workspaceId,
connectedAccountId, connectedAccountId,
@ -71,7 +72,7 @@ export class MessagingGmailFetchMessagesByBatchesService {
this.logger.log( this.logger.log(
`Messaging import for workspace ${workspaceId} and account ${connectedAccountId} formatting ${ `Messaging import for workspace ${workspaceId} and account ${connectedAccountId} formatting ${
queries.length messageIds.length
} messages in ${endTime - startTime}ms`, } messages in ${endTime - startTime}ms`,
); );
@ -79,6 +80,7 @@ export class MessagingGmailFetchMessagesByBatchesService {
} }
private formatBatchResponseAsGmailMessage( private formatBatchResponseAsGmailMessage(
messageIds: string[],
responseCollection: AxiosResponse<any, any>, responseCollection: AxiosResponse<any, any>,
workspaceId: string, workspaceId: string,
connectedAccountId: string, connectedAccountId: string,
@ -90,94 +92,92 @@ export class MessagingGmailFetchMessagesByBatchesService {
return str.replace(/\0/g, ''); return str.replace(/\0/g, '');
}; };
const formattedResponse = parsedResponses.map( const formattedResponse = parsedResponses.map((response, index) => {
(response): GmailMessage | null => { if ('error' in response) {
if ('error' in response) { if (response.error.code === 404) {
if (response.error.code === 404) {
return null;
}
throw response.error;
}
const {
historyId,
id,
threadId,
internalDate,
subject,
from,
to,
cc,
bcc,
headerMessageId,
text,
attachments,
deliveredTo,
} = this.parseGmailMessage(response);
if (!from) {
this.logger.log(
`From value is missing while importing message #${id} in workspace ${workspaceId} and account ${connectedAccountId}`,
);
return null; return null;
} }
if (!to && !deliveredTo && !bcc && !cc) { throw { ...response.error, messageId: messageIds[index] };
this.logger.log( }
`To, Delivered-To, Bcc or Cc value is missing while importing message #${id} in workspace ${workspaceId} and account ${connectedAccountId}`,
);
return null; const {
} historyId,
id,
threadId,
internalDate,
subject,
from,
to,
cc,
bcc,
headerMessageId,
text,
attachments,
deliveredTo,
} = this.parseGmailMessage(response);
if (!headerMessageId) { if (!from) {
this.logger.log( this.logger.log(
`Message-ID is missing while importing message #${id} in workspace ${workspaceId} and account ${connectedAccountId}`, `From value is missing while importing message #${id} in workspace ${workspaceId} and account ${connectedAccountId}`,
); );
return null; return null;
} }
if (!threadId) { if (!to && !deliveredTo && !bcc && !cc) {
this.logger.log( this.logger.log(
`Thread Id is missing while importing message #${id} in workspace ${workspaceId} and account ${connectedAccountId}`, `To, Delivered-To, Bcc or Cc value is missing while importing message #${id} in workspace ${workspaceId} and account ${connectedAccountId}`,
); );
return null; return null;
} }
const participants = [ if (!headerMessageId) {
...formatAddressObjectAsParticipants(from, 'from'), this.logger.log(
...formatAddressObjectAsParticipants(to ?? deliveredTo, 'to'), `Message-ID is missing while importing message #${id} in workspace ${workspaceId} and account ${connectedAccountId}`,
...formatAddressObjectAsParticipants(cc, 'cc'), );
...formatAddressObjectAsParticipants(bcc, 'bcc'),
];
let textWithoutReplyQuotations = text; return null;
}
if (text) { if (!threadId) {
textWithoutReplyQuotations = planer.extractFrom(text, 'text/plain'); this.logger.log(
} `Thread Id is missing while importing message #${id} in workspace ${workspaceId} and account ${connectedAccountId}`,
);
const messageFromGmail: GmailMessage = { return null;
historyId, }
externalId: id,
headerMessageId,
subject: subject || '',
messageThreadExternalId: threadId,
internalDate,
fromHandle: from[0].address || '',
fromDisplayName: from[0].name || '',
participants,
text: sanitizeString(textWithoutReplyQuotations || ''),
attachments,
};
return messageFromGmail; const participants = [
}, ...formatAddressObjectAsParticipants(from, 'from'),
); ...formatAddressObjectAsParticipants(to ?? deliveredTo, 'to'),
...formatAddressObjectAsParticipants(cc, 'cc'),
...formatAddressObjectAsParticipants(bcc, 'bcc'),
];
let textWithoutReplyQuotations = text;
if (text) {
textWithoutReplyQuotations = planer.extractFrom(text, 'text/plain');
}
const messageFromGmail: GmailMessage = {
historyId,
externalId: id,
headerMessageId,
subject: subject || '',
messageThreadExternalId: threadId,
internalDate,
fromHandle: from[0].address || '',
fromDisplayName: from[0].name || '',
participants,
text: sanitizeString(textWithoutReplyQuotations || ''),
attachments,
};
return messageFromGmail;
});
const filteredMessages = formattedResponse.filter((message) => const filteredMessages = formattedResponse.filter((message) =>
assertNotNull(message), assertNotNull(message),
@ -187,12 +187,14 @@ export class MessagingGmailFetchMessagesByBatchesService {
} }
private formatBatchResponsesAsGmailMessages( private formatBatchResponsesAsGmailMessages(
messageIdsByBatch: string[][],
batchResponses: AxiosResponse<any, any>[], batchResponses: AxiosResponse<any, any>[],
workspaceId: string, workspaceId: string,
connectedAccountId: string, connectedAccountId: string,
): GmailMessage[] { ): GmailMessage[] {
const messageBatches = batchResponses.map((response) => { const messageBatches = batchResponses.map((response, index) => {
return this.formatBatchResponseAsGmailMessage( return this.formatBatchResponseAsGmailMessage(
messageIdsByBatch[index],
response, response,
workspaceId, workspaceId,
connectedAccountId, connectedAccountId,

View File

@ -14,7 +14,6 @@ import {
MessageChannelWorkspaceEntity, MessageChannelWorkspaceEntity,
MessageChannelSyncStage, MessageChannelSyncStage,
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { createQueriesFromMessageIds } from 'src/modules/messaging/message-import-manager/utils/create-queries-from-message-ids.util';
import { filterEmails } from 'src/modules/messaging/message-import-manager/utils/filter-emails.util'; import { filterEmails } from 'src/modules/messaging/message-import-manager/utils/filter-emails.util';
import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service';
import { MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE } from 'src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-users-messages-get-batch-size.constant'; import { MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE } from 'src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-users-messages-get-batch-size.constant';
@ -95,12 +94,10 @@ export class MessagingGmailMessagesImportService {
); );
} }
const messageQueries = createQueriesFromMessageIds(messageIdsToFetch);
try { try {
const allMessages = const allMessages =
await this.fetchMessagesByBatchesService.fetchAllMessages( await this.fetchMessagesByBatchesService.fetchAllMessages(
messageQueries, messageIdsToFetch,
connectedAccount.id, connectedAccount.id,
workspaceId, workspaceId,
); );
@ -153,7 +150,9 @@ export class MessagingGmailMessagesImportService {
); );
} catch (error) { } catch (error) {
this.logger.log( this.logger.log(
`Messaging import for workspace ${workspaceId} and connected account ${ `Messaging import for messageId ${
error.messageId
}, workspace ${workspaceId} and connected account ${
connectedAccount.id connectedAccount.id
} failed with error: ${JSON.stringify(error)}`, } failed with error: ${JSON.stringify(error)}`,
); );

View File

@ -0,0 +1,32 @@
import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service';
import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator';
import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
export type MessagingAddSingleMessageToCacheForImportJobData = {
messageExternalId: string;
messageChannelId: string;
workspaceId: string;
};
@Processor(MessageQueue.messagingQueue)
export class MessagingAddSingleMessageToCacheForImportJob {
constructor(
@InjectCacheStorage(CacheStorageNamespace.Messaging)
private readonly cacheStorage: CacheStorageService,
) {}
@Process(MessagingAddSingleMessageToCacheForImportJob.name)
async handle(
data: MessagingAddSingleMessageToCacheForImportJobData,
): Promise<void> {
const { messageExternalId, messageChannelId, workspaceId } = data;
await this.cacheStorage.setAdd(
`messages-to-import:${workspaceId}:gmail:${messageChannelId}`,
[messageExternalId],
);
}
}

View File

@ -4,11 +4,13 @@ import { TypeOrmModule } from '@nestjs/typeorm';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity'; import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity';
import { MessagingCommonModule } from 'src/modules/messaging/common/messaging-common.module'; import { MessagingCommonModule } from 'src/modules/messaging/common/messaging-common.module';
import { MessagingSingleMessageImportCommand } from 'src/modules/messaging/message-import-manager/commands/messaging-single-message-import.command';
import { MessagingMessageListFetchCronCommand } from 'src/modules/messaging/message-import-manager/crons/commands/messaging-message-list-fetch.cron.command'; import { MessagingMessageListFetchCronCommand } from 'src/modules/messaging/message-import-manager/crons/commands/messaging-message-list-fetch.cron.command';
import { MessagingMessagesImportCronCommand } from 'src/modules/messaging/message-import-manager/crons/commands/messaging-messages-import.cron.command'; import { MessagingMessagesImportCronCommand } from 'src/modules/messaging/message-import-manager/crons/commands/messaging-messages-import.cron.command';
import { MessagingMessageListFetchCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job'; import { MessagingMessageListFetchCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-message-list-fetch.cron.job';
import { MessagingMessagesImportCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job'; import { MessagingMessagesImportCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job';
import { MessagingGmailDriverModule } from 'src/modules/messaging/message-import-manager/drivers/gmail/messaging-gmail-driver.module'; import { MessagingGmailDriverModule } from 'src/modules/messaging/message-import-manager/drivers/gmail/messaging-gmail-driver.module';
import { MessagingAddSingleMessageToCacheForImportJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-add-single-message-to-cache-for-import.job';
import { MessagingMessageListFetchJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job'; import { MessagingMessageListFetchJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job';
import { MessagingMessagesImportJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job'; import { MessagingMessagesImportJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job';
@ -22,10 +24,12 @@ import { MessagingMessagesImportJob } from 'src/modules/messaging/message-import
providers: [ providers: [
MessagingMessageListFetchCronCommand, MessagingMessageListFetchCronCommand,
MessagingMessagesImportCronCommand, MessagingMessagesImportCronCommand,
MessagingSingleMessageImportCommand,
MessagingMessageListFetchJob, MessagingMessageListFetchJob,
MessagingMessagesImportJob, MessagingMessagesImportJob,
MessagingMessageListFetchCronJob, MessagingMessageListFetchCronJob,
MessagingMessagesImportCronJob, MessagingMessagesImportCronJob,
MessagingAddSingleMessageToCacheForImportJob,
], ],
exports: [], exports: [],
}) })