4398 decouple contacts and companies creation from messages import (#4590)
* emit event * create queue and listener * filter participants with role 'from' * create job * Add job to job module * Refactoring * Refactor contact creation in CreateCompanyAndContactService * update job * wip * add getByHandlesWithoutPersonIdAndWorkspaceMemberId to calendar event attendee repository * refactoring * refactoring * Revert "refactoring" This reverts commit e5434f0b871e45447227aa8d55ba5af381c3ff1c. * fix nest imports * add await * fix contact creation condition * emit contact creation event after calendar-full-sync * add await * add missing transactionManager * calendar event attendees personId update is working * messageParticipant and calendarEventAttendee update is working as intended * rename module * fix lodash import * add test * update package.json
This commit is contained in:
@ -8,7 +8,7 @@ import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/st
|
||||
import { FetchMessagesByBatchesModule } from 'src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.module';
|
||||
import { GmailFullSyncService } from 'src/modules/messaging/services/gmail-full-sync/gmail-full-sync.service';
|
||||
import { MessagingProvidersModule } from 'src/modules/messaging/services/providers/messaging-providers.module';
|
||||
import { SaveMessagesAndCreateContactsModule } from 'src/modules/messaging/services/save-message-and-create-contact/save-message-and-create-contacts.module';
|
||||
import { SaveMessageAndEmitContactCreationEventModule } from 'src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.module';
|
||||
import { MessageChannelMessageAssociationObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel-message-association.object-metadata';
|
||||
import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel.object-metadata';
|
||||
|
||||
@ -22,7 +22,7 @@ import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-obj
|
||||
MessageChannelMessageAssociationObjectMetadata,
|
||||
BlocklistObjectMetadata,
|
||||
]),
|
||||
SaveMessagesAndCreateContactsModule,
|
||||
SaveMessageAndEmitContactCreationEventModule,
|
||||
TypeOrmModule.forFeature([FeatureFlagEntity], 'core'),
|
||||
],
|
||||
providers: [GmailFullSyncService],
|
||||
|
||||
@ -17,7 +17,7 @@ import { MessageChannelMessageAssociationRepository } from 'src/modules/messagin
|
||||
import { createQueriesFromMessageIds } from 'src/modules/messaging/utils/create-queries-from-message-ids.util';
|
||||
import { gmailSearchFilterExcludeEmails } from 'src/modules/messaging/utils/gmail-search-filter.util';
|
||||
import { BlocklistRepository } from 'src/modules/connected-account/repositories/blocklist.repository';
|
||||
import { SaveMessagesAndCreateContactsService } from 'src/modules/messaging/services/save-message-and-create-contact/save-messages-and-create-contacts.service';
|
||||
import { SaveMessageAndEmitContactCreationEventService } from 'src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.service';
|
||||
import {
|
||||
FeatureFlagEntity,
|
||||
FeatureFlagKeys,
|
||||
@ -47,7 +47,7 @@ export class GmailFullSyncService {
|
||||
private readonly messageChannelMessageAssociationRepository: MessageChannelMessageAssociationRepository,
|
||||
@InjectObjectMetadataRepository(BlocklistObjectMetadata)
|
||||
private readonly blocklistRepository: BlocklistRepository,
|
||||
private readonly saveMessagesAndCreateContactsService: SaveMessagesAndCreateContactsService,
|
||||
private readonly saveMessagesAndCreateContactsService: SaveMessageAndEmitContactCreationEventService,
|
||||
@InjectRepository(FeatureFlagEntity, 'core')
|
||||
private readonly featureFlagRepository: Repository<FeatureFlagEntity>,
|
||||
) {}
|
||||
|
||||
@ -9,7 +9,7 @@ import { FetchMessagesByBatchesModule } from 'src/modules/messaging/services/fet
|
||||
import { GmailPartialSyncService } from 'src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.service';
|
||||
import { MessageModule } from 'src/modules/messaging/services/message/message.module';
|
||||
import { MessagingProvidersModule } from 'src/modules/messaging/services/providers/messaging-providers.module';
|
||||
import { SaveMessagesAndCreateContactsModule } from 'src/modules/messaging/services/save-message-and-create-contact/save-message-and-create-contacts.module';
|
||||
import { SaveMessageAndEmitContactCreationEventModule } from 'src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.module';
|
||||
import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel.object-metadata';
|
||||
|
||||
@Module({
|
||||
@ -22,7 +22,7 @@ import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-obj
|
||||
BlocklistObjectMetadata,
|
||||
]),
|
||||
MessageModule,
|
||||
SaveMessagesAndCreateContactsModule,
|
||||
SaveMessageAndEmitContactCreationEventModule,
|
||||
TypeOrmModule.forFeature([FeatureFlagEntity], 'core'),
|
||||
],
|
||||
providers: [GmailPartialSyncService],
|
||||
|
||||
@ -18,7 +18,7 @@ import { createQueriesFromMessageIds } from 'src/modules/messaging/utils/create-
|
||||
import { GmailMessage } from 'src/modules/messaging/types/gmail-message';
|
||||
import { isPersonEmail } from 'src/modules/messaging/utils/is-person-email.util';
|
||||
import { BlocklistRepository } from 'src/modules/connected-account/repositories/blocklist.repository';
|
||||
import { SaveMessagesAndCreateContactsService } from 'src/modules/messaging/services/save-message-and-create-contact/save-messages-and-create-contacts.service';
|
||||
import { SaveMessageAndEmitContactCreationEventService } from 'src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.service';
|
||||
import {
|
||||
FeatureFlagEntity,
|
||||
FeatureFlagKeys,
|
||||
@ -45,7 +45,7 @@ export class GmailPartialSyncService {
|
||||
private readonly messageService: MessageService,
|
||||
@InjectObjectMetadataRepository(BlocklistObjectMetadata)
|
||||
private readonly blocklistRepository: BlocklistRepository,
|
||||
private readonly saveMessagesAndCreateContactsService: SaveMessagesAndCreateContactsService,
|
||||
private readonly saveMessagesAndCreateContactsService: SaveMessageAndEmitContactCreationEventService,
|
||||
@InjectRepository(FeatureFlagEntity, 'core')
|
||||
private readonly featureFlagRepository: Repository<FeatureFlagEntity>,
|
||||
) {}
|
||||
|
||||
@ -7,6 +7,7 @@ import { ParticipantWithId } from 'src/modules/messaging/types/gmail-message';
|
||||
import { PersonRepository } from 'src/modules/person/repositories/person.repository';
|
||||
import { PersonObjectMetadata } from 'src/modules/person/standard-objects/person.object-metadata';
|
||||
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
|
||||
import { getFlattenedValuesAndValuesStringForBatchRawQuery } from 'src/modules/calendar/utils/getFlattenedValuesAndValuesStringForBatchRawQuery.util';
|
||||
|
||||
@Injectable()
|
||||
export class MessageParticipantService {
|
||||
@ -34,24 +35,29 @@ export class MessageParticipantService {
|
||||
transactionManager,
|
||||
);
|
||||
|
||||
const messageParticipantsToUpdate = participants.map((participant) => [
|
||||
participant.id,
|
||||
participantPersonIds.find(
|
||||
const messageParticipantsToUpdate = participants.map((participant) => ({
|
||||
id: participant.id,
|
||||
personId: participantPersonIds.find(
|
||||
(e: { id: string; email: string }) => e.email === participant.handle,
|
||||
)?.id,
|
||||
]);
|
||||
}));
|
||||
|
||||
if (messageParticipantsToUpdate.length === 0) return;
|
||||
|
||||
const valuesString = messageParticipantsToUpdate
|
||||
.map((_, index) => `($${index * 2 + 1}::uuid, $${index * 2 + 2}::uuid)`)
|
||||
.join(', ');
|
||||
const { flattenedValues, valuesString } =
|
||||
getFlattenedValuesAndValuesStringForBatchRawQuery(
|
||||
messageParticipantsToUpdate,
|
||||
{
|
||||
id: 'uuid',
|
||||
personId: 'uuid',
|
||||
},
|
||||
);
|
||||
|
||||
await this.workspaceDataSourceService.executeRawQuery(
|
||||
`UPDATE ${dataSourceSchema}."messageParticipant" AS "messageParticipant" SET "personId" = "data"."personId"
|
||||
FROM (VALUES ${valuesString}) AS "data"("id", "personId")
|
||||
WHERE "messageParticipant"."id" = "data"."id"`,
|
||||
messageParticipantsToUpdate.flat(),
|
||||
flattenedValues,
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
);
|
||||
|
||||
@ -2,10 +2,10 @@ import { Module } from '@nestjs/common';
|
||||
|
||||
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
|
||||
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
|
||||
import { CreateCompaniesAndContactsModule } from 'src/modules/connected-account/auto-companies-and-contacts-creation/create-company-and-contact/create-company-and-contact.module';
|
||||
import { AutoCompaniesAndContactsCreationModule } from 'src/modules/connected-account/auto-companies-and-contacts-creation/auto-companies-and-contacts-creation.module';
|
||||
import { MessageParticipantModule } from 'src/modules/messaging/services/message-participant/message-participant.module';
|
||||
import { MessageModule } from 'src/modules/messaging/services/message/message.module';
|
||||
import { SaveMessagesAndCreateContactsService } from 'src/modules/messaging/services/save-message-and-create-contact/save-messages-and-create-contacts.service';
|
||||
import { SaveMessageAndEmitContactCreationEventService } from 'src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.service';
|
||||
import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel.object-metadata';
|
||||
import { MessageParticipantObjectMetadata } from 'src/modules/messaging/standard-objects/message-participant.object-metadata';
|
||||
|
||||
@ -16,11 +16,11 @@ import { MessageParticipantObjectMetadata } from 'src/modules/messaging/standard
|
||||
MessageChannelObjectMetadata,
|
||||
MessageParticipantObjectMetadata,
|
||||
]),
|
||||
CreateCompaniesAndContactsModule,
|
||||
AutoCompaniesAndContactsCreationModule,
|
||||
MessageParticipantModule,
|
||||
WorkspaceDataSourceModule,
|
||||
],
|
||||
providers: [SaveMessagesAndCreateContactsService],
|
||||
exports: [SaveMessagesAndCreateContactsService],
|
||||
providers: [SaveMessageAndEmitContactCreationEventService],
|
||||
exports: [SaveMessageAndEmitContactCreationEventService],
|
||||
})
|
||||
export class SaveMessagesAndCreateContactsModule {}
|
||||
export class SaveMessageAndEmitContactCreationEventModule {}
|
||||
@ -1,10 +1,8 @@
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
|
||||
import { EntityManager } from 'typeorm';
|
||||
import { EventEmitter2 } from '@nestjs/event-emitter';
|
||||
|
||||
import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
|
||||
import { MessageParticipantRepository } from 'src/modules/messaging/repositories/message-participant.repository';
|
||||
import { CreateCompanyAndContactService } from 'src/modules/connected-account/auto-companies-and-contacts-creation/create-company-and-contact/create-company-and-contact.service';
|
||||
import {
|
||||
GmailMessage,
|
||||
ParticipantWithMessageId,
|
||||
@ -16,12 +14,11 @@ import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repos
|
||||
import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel.object-metadata';
|
||||
import { MessageService } from 'src/modules/messaging/services/message/message.service';
|
||||
import { MessageParticipantObjectMetadata } from 'src/modules/messaging/standard-objects/message-participant.object-metadata';
|
||||
import { MessageParticipantService } from 'src/modules/messaging/services/message-participant/message-participant.service';
|
||||
|
||||
@Injectable()
|
||||
export class SaveMessagesAndCreateContactsService {
|
||||
export class SaveMessageAndEmitContactCreationEventService {
|
||||
private readonly logger = new Logger(
|
||||
SaveMessagesAndCreateContactsService.name,
|
||||
SaveMessageAndEmitContactCreationEventService.name,
|
||||
);
|
||||
|
||||
constructor(
|
||||
@ -30,8 +27,7 @@ export class SaveMessagesAndCreateContactsService {
|
||||
private readonly messageChannelRepository: MessageChannelRepository,
|
||||
@InjectObjectMetadataRepository(MessageParticipantObjectMetadata)
|
||||
private readonly messageParticipantRepository: MessageParticipantRepository,
|
||||
private readonly createCompaniesAndContactsService: CreateCompanyAndContactService,
|
||||
private readonly messageParticipantService: MessageParticipantService,
|
||||
private readonly eventEmitter: EventEmitter2,
|
||||
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
|
||||
) {}
|
||||
|
||||
@ -80,68 +76,28 @@ export class SaveMessagesAndCreateContactsService {
|
||||
return;
|
||||
}
|
||||
|
||||
const isContactAutoCreationEnabled =
|
||||
gmailMessageChannel.isContactAutoCreationEnabled;
|
||||
const participantsWithMessageId: (ParticipantWithMessageId & {
|
||||
shouldCreateContact: boolean;
|
||||
})[] = messagesToSave.flatMap((message) => {
|
||||
const messageId = messageExternalIdsAndIdsMap.get(message.externalId);
|
||||
|
||||
const participantsWithMessageId: ParticipantWithMessageId[] =
|
||||
messagesToSave.flatMap((message) => {
|
||||
const messageId = messageExternalIdsAndIdsMap.get(message.externalId);
|
||||
|
||||
return messageId
|
||||
? message.participants.map((participant) => ({
|
||||
...participant,
|
||||
messageId,
|
||||
}))
|
||||
: [];
|
||||
});
|
||||
|
||||
const contactsToCreate = messagesToSave
|
||||
.filter((message) => connectedAccount.handle === message.fromHandle)
|
||||
.flatMap((message) => message.participants);
|
||||
|
||||
if (isContactAutoCreationEnabled) {
|
||||
startTime = Date.now();
|
||||
|
||||
await workspaceDataSource?.transaction(
|
||||
async (transactionManager: EntityManager) => {
|
||||
await this.createCompaniesAndContactsService.createCompaniesAndContacts(
|
||||
connectedAccount.handle,
|
||||
contactsToCreate,
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
);
|
||||
},
|
||||
);
|
||||
|
||||
const handles = participantsWithMessageId.map(
|
||||
(participant) => participant.handle,
|
||||
);
|
||||
|
||||
const messageParticipantsWithoutPersonIdAndWorkspaceMemberId =
|
||||
await this.messageParticipantRepository.getByHandlesWithoutPersonIdAndWorkspaceMemberId(
|
||||
handles,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
await this.messageParticipantService.updateMessageParticipantsAfterPeopleCreation(
|
||||
messageParticipantsWithoutPersonIdAndWorkspaceMemberId,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
endTime = Date.now();
|
||||
|
||||
this.logger.log(
|
||||
`${jobName} creating companies and contacts for workspace ${workspaceId} and account ${
|
||||
connectedAccount.id
|
||||
} in ${endTime - startTime}ms`,
|
||||
);
|
||||
}
|
||||
return messageId
|
||||
? message.participants.map((participant) => ({
|
||||
...participant,
|
||||
messageId,
|
||||
shouldCreateContact:
|
||||
gmailMessageChannel.isContactAutoCreationEnabled &&
|
||||
message.participants.find((p) => p.role === 'from')?.handle ===
|
||||
connectedAccount.handle,
|
||||
}))
|
||||
: [];
|
||||
});
|
||||
|
||||
startTime = Date.now();
|
||||
|
||||
await this.tryToSaveMessageParticipantsOrDeleteMessagesIfError(
|
||||
participantsWithMessageId,
|
||||
gmailMessageChannelId,
|
||||
gmailMessageChannel,
|
||||
workspaceId,
|
||||
connectedAccount,
|
||||
jobName,
|
||||
@ -157,8 +113,10 @@ export class SaveMessagesAndCreateContactsService {
|
||||
}
|
||||
|
||||
private async tryToSaveMessageParticipantsOrDeleteMessagesIfError(
|
||||
participantsWithMessageId: ParticipantWithMessageId[],
|
||||
gmailMessageChannelId: string,
|
||||
participantsWithMessageId: (ParticipantWithMessageId & {
|
||||
shouldCreateContact: boolean;
|
||||
})[],
|
||||
gmailMessageChannel: ObjectRecord<MessageChannelObjectMetadata>,
|
||||
workspaceId: string,
|
||||
connectedAccount: ObjectRecord<ConnectedAccountObjectMetadata>,
|
||||
jobName?: string,
|
||||
@ -168,6 +126,18 @@ export class SaveMessagesAndCreateContactsService {
|
||||
participantsWithMessageId,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
if (gmailMessageChannel.isContactAutoCreationEnabled) {
|
||||
const contactsToCreate = participantsWithMessageId.filter(
|
||||
(participant) => participant.shouldCreateContact,
|
||||
);
|
||||
|
||||
this.eventEmitter.emit(`createContacts`, {
|
||||
workspaceId,
|
||||
connectedAccountHandle: connectedAccount.handle,
|
||||
contactsToCreate,
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error(
|
||||
`${jobName} error saving message participants for workspace ${workspaceId} and account ${connectedAccount.id}`,
|
||||
@ -180,7 +150,7 @@ export class SaveMessagesAndCreateContactsService {
|
||||
|
||||
await this.messageService.deleteMessages(
|
||||
messagesToDelete,
|
||||
gmailMessageChannelId,
|
||||
gmailMessageChannel.id,
|
||||
workspaceId,
|
||||
);
|
||||
}
|
||||
Reference in New Issue
Block a user