Improve messaging/calendar create contact performance (#5314)
In this PR, I'm refactoring the way we associate messageParticipant post person/company creation. Instead of looking a all person without participant, we are passing the one that were just created. Also, I'm making sure the message and messageParticipant creation transaction is commited before creating person/company creation (and then messageParticipant association)
This commit is contained in:
@ -212,7 +212,7 @@ export class CalendarEventParticipantRepository {
|
||||
handle: 'text',
|
||||
displayName: 'text',
|
||||
isOrganizer: 'boolean',
|
||||
responseStatus: `${dataSourceSchema}."calendarEventParticipant_responsestatus_enum"`,
|
||||
responseStatus: `${dataSourceSchema}."calendarEventParticipant_responseStatus_enum"`,
|
||||
},
|
||||
);
|
||||
|
||||
|
||||
@ -11,6 +11,7 @@ import { CalendarEventParticipant } from 'src/modules/calendar/types/calendar-ev
|
||||
import { CalendarEventParticipantRepository } from 'src/modules/calendar/repositories/calendar-event-participant.repository';
|
||||
import { CalendarEventParticipantObjectMetadata } from 'src/modules/calendar/standard-objects/calendar-event-participant.object-metadata';
|
||||
import { AddPersonIdAndWorkspaceMemberIdService } from 'src/modules/calendar-messaging-participant/services/add-person-id-and-workspace-member-id/add-person-id-and-workspace-member-id.service';
|
||||
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
|
||||
|
||||
@Injectable()
|
||||
export class CalendarEventParticipantService {
|
||||
@ -24,11 +25,13 @@ export class CalendarEventParticipantService {
|
||||
) {}
|
||||
|
||||
public async updateCalendarEventParticipantsAfterPeopleCreation(
|
||||
createdPeople: ObjectRecord<PersonObjectMetadata>[],
|
||||
workspaceId: string,
|
||||
transactionManager?: EntityManager,
|
||||
): Promise<void> {
|
||||
const participants =
|
||||
await this.calendarEventParticipantRepository.getWithoutPersonIdAndWorkspaceMemberId(
|
||||
await this.calendarEventParticipantRepository.getByHandles(
|
||||
createdPeople.map((person) => person.email),
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
@ -102,7 +105,7 @@ export class CalendarEventParticipantService {
|
||||
handle: 'text',
|
||||
displayName: 'text',
|
||||
isOrganizer: 'boolean',
|
||||
responseStatus: `${dataSourceSchema}."calendarEventParticipant_responsestatus_enum"`,
|
||||
responseStatus: `${dataSourceSchema}."calendarEventParticipant_responseStatus_enum"`,
|
||||
personId: 'uuid',
|
||||
workspaceMemberId: 'uuid',
|
||||
},
|
||||
|
||||
@ -7,6 +7,7 @@ import { PersonRepository } from 'src/modules/person/repositories/person.reposit
|
||||
import { getFirstNameAndLastNameFromHandleAndDisplayName } from 'src/modules/calendar-messaging-participant/utils/get-first-name-and-last-name-from-handle-and-display-name.util';
|
||||
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
|
||||
import { PersonObjectMetadata } from 'src/modules/person/standard-objects/person.object-metadata';
|
||||
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
|
||||
|
||||
type ContactToCreate = {
|
||||
handle: string;
|
||||
@ -50,16 +51,16 @@ export class CreateContactService {
|
||||
});
|
||||
}
|
||||
|
||||
public async createContacts(
|
||||
public async createPeople(
|
||||
contactsToCreate: ContactToCreate[],
|
||||
workspaceId: string,
|
||||
transactionManager?: EntityManager,
|
||||
): Promise<void> {
|
||||
if (contactsToCreate.length === 0) return;
|
||||
): Promise<ObjectRecord<PersonObjectMetadata>[]> {
|
||||
if (contactsToCreate.length === 0) return [];
|
||||
|
||||
const formattedContacts = this.formatContacts(contactsToCreate);
|
||||
|
||||
await this.personRepository.createPeople(
|
||||
return await this.personRepository.createPeople(
|
||||
formattedContacts,
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
|
||||
@ -20,6 +20,7 @@ import { MessageParticipantService } from 'src/modules/messaging/services/messag
|
||||
import { CalendarEventParticipantService } from 'src/modules/calendar/services/calendar-event-participant/calendar-event-participant.service';
|
||||
import { filterOutContactsFromCompanyOrWorkspace } from 'src/modules/connected-account/auto-companies-and-contacts-creation/utils/filter-out-contacts-from-company-or-workspace.util';
|
||||
import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity';
|
||||
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
|
||||
|
||||
@Injectable()
|
||||
export class CreateCompanyAndContactService {
|
||||
@ -37,14 +38,14 @@ export class CreateCompanyAndContactService {
|
||||
private readonly featureFlagRepository: Repository<FeatureFlagEntity>,
|
||||
) {}
|
||||
|
||||
async createCompaniesAndContacts(
|
||||
async createCompaniesAndPeople(
|
||||
connectedAccountHandle: string,
|
||||
contactsToCreate: Contacts,
|
||||
workspaceId: string,
|
||||
transactionManager?: EntityManager,
|
||||
) {
|
||||
): Promise<ObjectRecord<PersonObjectMetadata>[]> {
|
||||
if (!contactsToCreate || contactsToCreate.length === 0) {
|
||||
return;
|
||||
return [];
|
||||
}
|
||||
|
||||
// TODO: This is a feature that may be implemented in the future
|
||||
@ -68,7 +69,7 @@ export class CreateCompanyAndContactService {
|
||||
);
|
||||
|
||||
if (uniqueHandles.length === 0) {
|
||||
return;
|
||||
return [];
|
||||
}
|
||||
|
||||
const alreadyCreatedContacts = await this.personRepository.getByEmails(
|
||||
@ -120,7 +121,7 @@ export class CreateCompanyAndContactService {
|
||||
: undefined,
|
||||
}));
|
||||
|
||||
await this.createContactService.createContacts(
|
||||
return await this.createContactService.createPeople(
|
||||
formattedContactsToCreate,
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
@ -139,7 +140,7 @@ export class CreateCompanyAndContactService {
|
||||
|
||||
await workspaceDataSource?.transaction(
|
||||
async (transactionManager: EntityManager) => {
|
||||
await this.createCompaniesAndContacts(
|
||||
const createdPeople = await this.createCompaniesAndPeople(
|
||||
connectedAccountHandle,
|
||||
contactsToCreate,
|
||||
workspaceId,
|
||||
@ -147,11 +148,13 @@ export class CreateCompanyAndContactService {
|
||||
);
|
||||
|
||||
await this.messageParticipantService.updateMessageParticipantsAfterPeopleCreation(
|
||||
createdPeople,
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
);
|
||||
|
||||
await this.calendarEventParticipantService.updateCalendarEventParticipantsAfterPeopleCreation(
|
||||
createdPeople,
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
);
|
||||
|
||||
@ -5,8 +5,8 @@ import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/works
|
||||
import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata';
|
||||
import { FetchMessagesByBatchesModule } from 'src/modules/messaging/services/fetch-messages-by-batches/fetch-messages-by-batches.module';
|
||||
import { GmailFetchMessageContentFromCacheService } from 'src/modules/messaging/services/gmail-fetch-message-content-from-cache/gmail-fetch-message-content-from-cache.service';
|
||||
import { MessageParticipantModule } from 'src/modules/messaging/services/message-participant/message-participant.module';
|
||||
import { MessageModule } from 'src/modules/messaging/services/message/message.module';
|
||||
import { SaveMessageAndEmitContactCreationEventModule } from 'src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.module';
|
||||
import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel.object-metadata';
|
||||
|
||||
@Module({
|
||||
@ -16,9 +16,10 @@ import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-obj
|
||||
ConnectedAccountObjectMetadata,
|
||||
MessageChannelObjectMetadata,
|
||||
]),
|
||||
SaveMessageAndEmitContactCreationEventModule,
|
||||
MessageModule,
|
||||
WorkspaceDataSourceModule,
|
||||
MessageModule,
|
||||
MessageParticipantModule,
|
||||
],
|
||||
providers: [GmailFetchMessageContentFromCacheService],
|
||||
exports: [GmailFetchMessageContentFromCacheService],
|
||||
|
||||
@ -17,7 +17,6 @@ import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/typ
|
||||
import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service';
|
||||
import { GMAIL_USERS_MESSAGES_GET_BATCH_SIZE } from 'src/modules/messaging/constants/gmail-users-messages-get-batch-size.constant';
|
||||
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
|
||||
import { SaveMessageAndEmitContactCreationEventService } from 'src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.service';
|
||||
import {
|
||||
GmailFullSyncJobData,
|
||||
GmailFullSyncJob,
|
||||
@ -25,6 +24,13 @@ import {
|
||||
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
|
||||
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
|
||||
import { GMAIL_ONGOING_SYNC_TIMEOUT } from 'src/modules/messaging/constants/gmail-ongoing-sync-timeout.constant';
|
||||
import { MessageParticipantService } from 'src/modules/messaging/services/message-participant/message-participant.service';
|
||||
import { MessageService } from 'src/modules/messaging/services/message/message.service';
|
||||
import { ParticipantWithMessageId } from 'src/modules/messaging/types/gmail-message';
|
||||
import {
|
||||
CreateCompanyAndContactJobData,
|
||||
CreateCompanyAndContactJob,
|
||||
} from 'src/modules/connected-account/auto-companies-and-contacts-creation/jobs/create-company-and-contact.job';
|
||||
|
||||
@Injectable()
|
||||
export class GmailFetchMessageContentFromCacheService {
|
||||
@ -38,12 +44,13 @@ export class GmailFetchMessageContentFromCacheService {
|
||||
private readonly connectedAccountRepository: ConnectedAccountRepository,
|
||||
@InjectObjectMetadataRepository(MessageChannelObjectMetadata)
|
||||
private readonly messageChannelRepository: MessageChannelRepository,
|
||||
private readonly saveMessageAndEmitContactCreationEventService: SaveMessageAndEmitContactCreationEventService,
|
||||
@InjectCacheStorage(CacheStorageNamespace.Messaging)
|
||||
private readonly cacheStorage: CacheStorageService,
|
||||
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
|
||||
@Inject(MessageQueue.messagingQueue)
|
||||
private readonly messageQueueService: MessageQueueService,
|
||||
private readonly messageService: MessageService,
|
||||
private readonly messageParticipantService: MessageParticipantService,
|
||||
) {}
|
||||
|
||||
async fetchMessageContentFromCache(
|
||||
@ -164,89 +171,131 @@ export class GmailFetchMessageContentFromCacheService {
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
await workspaceDataSource
|
||||
?.transaction(async (transactionManager: EntityManager) => {
|
||||
const messageQueries = createQueriesFromMessageIds(messageIdsToFetch);
|
||||
const messageQueries = createQueriesFromMessageIds(messageIdsToFetch);
|
||||
|
||||
const { messages: messagesToSave, errors } =
|
||||
await this.fetchMessagesByBatchesService.fetchAllMessages(
|
||||
messageQueries,
|
||||
accessToken,
|
||||
try {
|
||||
const { messages: messagesToSave, errors } =
|
||||
await this.fetchMessagesByBatchesService.fetchAllMessages(
|
||||
messageQueries,
|
||||
accessToken,
|
||||
workspaceId,
|
||||
connectedAccountId,
|
||||
);
|
||||
|
||||
const participantsWithMessageId = await workspaceDataSource?.transaction(
|
||||
async (transactionManager: EntityManager) => {
|
||||
if (!messagesToSave.length) {
|
||||
await this.messageChannelRepository.updateSyncStatus(
|
||||
gmailMessageChannelId,
|
||||
MessageChannelSyncStatus.PENDING,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
return [];
|
||||
}
|
||||
|
||||
if (errors.length) {
|
||||
const errorsCanBeIgnored = errors.every(
|
||||
(error) => error.code === 404,
|
||||
);
|
||||
|
||||
if (!errorsCanBeIgnored) {
|
||||
throw new Error(
|
||||
`Error fetching messages for ${connectedAccountId} in workspace ${workspaceId}: ${JSON.stringify(
|
||||
errors,
|
||||
null,
|
||||
2,
|
||||
)}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
const messageExternalIdsAndIdsMap =
|
||||
await this.messageService.saveMessagesWithinTransaction(
|
||||
messagesToSave,
|
||||
connectedAccount,
|
||||
gmailMessageChannel.id,
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
);
|
||||
|
||||
const participantsWithMessageId: (ParticipantWithMessageId & {
|
||||
shouldCreateContact: boolean;
|
||||
})[] = messagesToSave.flatMap((message) => {
|
||||
const messageId = messageExternalIdsAndIdsMap.get(
|
||||
message.externalId,
|
||||
);
|
||||
|
||||
return messageId
|
||||
? message.participants.map((participant) => ({
|
||||
...participant,
|
||||
messageId,
|
||||
shouldCreateContact:
|
||||
gmailMessageChannel.isContactAutoCreationEnabled &&
|
||||
message.participants.find((p) => p.role === 'from')
|
||||
?.handle === connectedAccount.handle,
|
||||
}))
|
||||
: [];
|
||||
});
|
||||
|
||||
await this.messageParticipantService.saveMessageParticipants(
|
||||
participantsWithMessageId,
|
||||
workspaceId,
|
||||
connectedAccountId,
|
||||
transactionManager,
|
||||
);
|
||||
|
||||
if (!messagesToSave.length) {
|
||||
await this.messageChannelRepository.updateSyncStatus(
|
||||
gmailMessageChannelId,
|
||||
MessageChannelSyncStatus.PENDING,
|
||||
workspaceId,
|
||||
);
|
||||
if (messageIdsToFetch.length < GMAIL_USERS_MESSAGES_GET_BATCH_SIZE) {
|
||||
await this.messageChannelRepository.updateSyncStatus(
|
||||
gmailMessageChannelId,
|
||||
MessageChannelSyncStatus.SUCCEEDED,
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
this.logger.log(
|
||||
`Messaging import for workspace ${workspaceId} and account ${connectedAccountId} done with no more messages to import.`,
|
||||
);
|
||||
} else {
|
||||
await this.messageChannelRepository.updateSyncStatus(
|
||||
gmailMessageChannelId,
|
||||
MessageChannelSyncStatus.PENDING,
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
);
|
||||
|
||||
if (errors.length) {
|
||||
const errorsCanBeIgnored = errors.every(
|
||||
(error) => error.code === 404,
|
||||
);
|
||||
|
||||
if (!errorsCanBeIgnored) {
|
||||
throw new Error(
|
||||
`Error fetching messages for ${connectedAccountId} in workspace ${workspaceId}: ${JSON.stringify(
|
||||
errors,
|
||||
null,
|
||||
2,
|
||||
)}`,
|
||||
this.logger.log(
|
||||
`Messaging import for workspace ${workspaceId} and account ${connectedAccountId} done with more messages to import.`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
await this.saveMessageAndEmitContactCreationEventService.saveMessagesAndEmitContactCreationEventWithinTransaction(
|
||||
messagesToSave,
|
||||
connectedAccount,
|
||||
workspaceId,
|
||||
gmailMessageChannel,
|
||||
transactionManager,
|
||||
return participantsWithMessageId;
|
||||
},
|
||||
);
|
||||
|
||||
if (gmailMessageChannel.isContactAutoCreationEnabled) {
|
||||
const contactsToCreate = participantsWithMessageId.filter(
|
||||
(participant) => participant.shouldCreateContact,
|
||||
);
|
||||
|
||||
if (messageIdsToFetch.length < GMAIL_USERS_MESSAGES_GET_BATCH_SIZE) {
|
||||
await this.messageChannelRepository.updateSyncStatus(
|
||||
gmailMessageChannelId,
|
||||
MessageChannelSyncStatus.SUCCEEDED,
|
||||
await this.messageQueueService.add<CreateCompanyAndContactJobData>(
|
||||
CreateCompanyAndContactJob.name,
|
||||
{
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
);
|
||||
|
||||
this.logger.log(
|
||||
`Messaging import for workspace ${workspaceId} and account ${connectedAccountId} done with no more messages to import.`,
|
||||
);
|
||||
} else {
|
||||
await this.messageChannelRepository.updateSyncStatus(
|
||||
gmailMessageChannelId,
|
||||
MessageChannelSyncStatus.PENDING,
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
);
|
||||
|
||||
this.logger.log(
|
||||
`Messaging import for workspace ${workspaceId} and account ${connectedAccountId} done with more messages to import.`,
|
||||
);
|
||||
}
|
||||
})
|
||||
.catch(async (error) => {
|
||||
await this.cacheStorage.setAdd(
|
||||
`messages-to-import:${workspaceId}:gmail:${gmailMessageChannelId}`,
|
||||
messageIdsToFetch,
|
||||
connectedAccountHandle: connectedAccount.handle,
|
||||
contactsToCreate,
|
||||
},
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
await this.cacheStorage.setAdd(
|
||||
`messages-to-import:${workspaceId}:gmail:${gmailMessageChannelId}`,
|
||||
messageIdsToFetch,
|
||||
);
|
||||
|
||||
if (error?.message?.code === 429) {
|
||||
this.logger.error(
|
||||
`Error fetching messages for ${connectedAccountId} in workspace ${workspaceId}: Resource has been exhausted, locking for ${GMAIL_ONGOING_SYNC_TIMEOUT}ms...`,
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
if (error?.message?.code === 429) {
|
||||
this.logger.error(
|
||||
`Error fetching messages for ${connectedAccountId} in workspace ${workspaceId}: Resource has been exhausted, locking for ${GMAIL_ONGOING_SYNC_TIMEOUT}ms...`,
|
||||
);
|
||||
|
||||
await this.messageChannelRepository.updateSyncStatus(
|
||||
gmailMessageChannelId,
|
||||
@ -257,7 +306,8 @@ export class GmailFetchMessageContentFromCacheService {
|
||||
throw new Error(
|
||||
`Error fetching messages for ${connectedAccountId} in workspace ${workspaceId}: ${error.message}`,
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async fallbackToFullSync(
|
||||
|
||||
@ -10,7 +10,6 @@ import { FetchMessagesByBatchesModule } from 'src/modules/messaging/services/fet
|
||||
import { GmailPartialSyncV2Service as GmailPartialSyncService } from 'src/modules/messaging/services/gmail-partial-sync/gmail-partial-sync.service';
|
||||
import { MessageModule } from 'src/modules/messaging/services/message/message.module';
|
||||
import { MessagingProvidersModule } from 'src/modules/messaging/services/providers/messaging-providers.module';
|
||||
import { SaveMessageAndEmitContactCreationEventModule } from 'src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.module';
|
||||
import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel.object-metadata';
|
||||
|
||||
@Module({
|
||||
@ -23,7 +22,6 @@ import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-obj
|
||||
BlocklistObjectMetadata,
|
||||
]),
|
||||
MessageModule,
|
||||
SaveMessageAndEmitContactCreationEventModule,
|
||||
TypeOrmModule.forFeature([FeatureFlagEntity], 'core'),
|
||||
WorkspaceDataSourceModule,
|
||||
],
|
||||
|
||||
@ -11,6 +11,7 @@ import { getFlattenedValuesAndValuesStringForBatchRawQuery } from 'src/modules/c
|
||||
import { MessageParticipantRepository } from 'src/modules/messaging/repositories/message-participant.repository';
|
||||
import { MessageParticipantObjectMetadata } from 'src/modules/messaging/standard-objects/message-participant.object-metadata';
|
||||
import { AddPersonIdAndWorkspaceMemberIdService } from 'src/modules/calendar-messaging-participant/services/add-person-id-and-workspace-member-id/add-person-id-and-workspace-member-id.service';
|
||||
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
|
||||
|
||||
@Injectable()
|
||||
export class MessageParticipantService {
|
||||
@ -24,13 +25,15 @@ export class MessageParticipantService {
|
||||
) {}
|
||||
|
||||
public async updateMessageParticipantsAfterPeopleCreation(
|
||||
createdPeople: ObjectRecord<PersonObjectMetadata>[],
|
||||
workspaceId: string,
|
||||
transactionManager?: EntityManager,
|
||||
): Promise<void> {
|
||||
const participants =
|
||||
await this.messageParticipantRepository.getWithoutPersonIdAndWorkspaceMemberId(
|
||||
workspaceId,
|
||||
);
|
||||
const participants = await this.messageParticipantRepository.getByHandles(
|
||||
createdPeople.map((person) => person.email),
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
);
|
||||
|
||||
if (!participants) return;
|
||||
|
||||
|
||||
@ -1,22 +0,0 @@
|
||||
import { Module } from '@nestjs/common';
|
||||
|
||||
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
|
||||
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
|
||||
import { AutoCompaniesAndContactsCreationModule } from 'src/modules/connected-account/auto-companies-and-contacts-creation/auto-companies-and-contacts-creation.module';
|
||||
import { MessageParticipantModule } from 'src/modules/messaging/services/message-participant/message-participant.module';
|
||||
import { MessageModule } from 'src/modules/messaging/services/message/message.module';
|
||||
import { SaveMessageAndEmitContactCreationEventService } from 'src/modules/messaging/services/save-message-and-emit-contact-creation-event/save-message-and-emit-contact-creation-event.service';
|
||||
import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel.object-metadata';
|
||||
|
||||
@Module({
|
||||
imports: [
|
||||
MessageModule,
|
||||
ObjectMetadataRepositoryModule.forFeature([MessageChannelObjectMetadata]),
|
||||
AutoCompaniesAndContactsCreationModule,
|
||||
MessageParticipantModule,
|
||||
WorkspaceDataSourceModule,
|
||||
],
|
||||
providers: [SaveMessageAndEmitContactCreationEventService],
|
||||
exports: [SaveMessageAndEmitContactCreationEventService],
|
||||
})
|
||||
export class SaveMessageAndEmitContactCreationEventModule {}
|
||||
@ -1,219 +0,0 @@
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
|
||||
import { EntityManager } from 'typeorm';
|
||||
|
||||
import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
|
||||
import {
|
||||
GmailMessage,
|
||||
ParticipantWithMessageId,
|
||||
} from 'src/modules/messaging/types/gmail-message';
|
||||
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
|
||||
import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata';
|
||||
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
|
||||
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
|
||||
import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel.object-metadata';
|
||||
import { MessageService } from 'src/modules/messaging/services/message/message.service';
|
||||
import { MessageParticipantService } from 'src/modules/messaging/services/message-participant/message-participant.service';
|
||||
import {
|
||||
CreateCompanyAndContactJobData,
|
||||
CreateCompanyAndContactJob,
|
||||
} from 'src/modules/connected-account/auto-companies-and-contacts-creation/jobs/create-company-and-contact.job';
|
||||
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
|
||||
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
|
||||
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
|
||||
|
||||
@Injectable()
|
||||
export class SaveMessageAndEmitContactCreationEventService {
|
||||
private readonly logger = new Logger(
|
||||
SaveMessageAndEmitContactCreationEventService.name,
|
||||
);
|
||||
|
||||
constructor(
|
||||
private readonly messageService: MessageService,
|
||||
@InjectObjectMetadataRepository(MessageChannelObjectMetadata)
|
||||
private readonly messageChannelRepository: MessageChannelRepository,
|
||||
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
|
||||
private readonly messageParticipantService: MessageParticipantService,
|
||||
@InjectMessageQueue(MessageQueue.emailQueue)
|
||||
private readonly messageQueueService: MessageQueueService,
|
||||
) {}
|
||||
|
||||
public async saveMessagesAndEmitContactCreationEventWithinTransaction(
|
||||
messagesToSave: GmailMessage[],
|
||||
connectedAccount: ObjectRecord<ConnectedAccountObjectMetadata>,
|
||||
workspaceId: string,
|
||||
gmailMessageChannel: ObjectRecord<MessageChannelObjectMetadata>,
|
||||
transactionManager: EntityManager,
|
||||
) {
|
||||
const messageExternalIdsAndIdsMap =
|
||||
await this.messageService.saveMessagesWithinTransaction(
|
||||
messagesToSave,
|
||||
connectedAccount,
|
||||
gmailMessageChannel.id,
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
);
|
||||
|
||||
const participantsWithMessageId: (ParticipantWithMessageId & {
|
||||
shouldCreateContact: boolean;
|
||||
})[] = messagesToSave.flatMap((message) => {
|
||||
const messageId = messageExternalIdsAndIdsMap.get(message.externalId);
|
||||
|
||||
return messageId
|
||||
? message.participants.map((participant) => ({
|
||||
...participant,
|
||||
messageId,
|
||||
shouldCreateContact:
|
||||
gmailMessageChannel.isContactAutoCreationEnabled &&
|
||||
message.participants.find((p) => p.role === 'from')?.handle ===
|
||||
connectedAccount.handle,
|
||||
}))
|
||||
: [];
|
||||
});
|
||||
|
||||
await this.messageParticipantService.saveMessageParticipants(
|
||||
participantsWithMessageId,
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
);
|
||||
|
||||
if (gmailMessageChannel.isContactAutoCreationEnabled) {
|
||||
const contactsToCreate = participantsWithMessageId.filter(
|
||||
(participant) => participant.shouldCreateContact,
|
||||
);
|
||||
|
||||
await this.messageQueueService.add<CreateCompanyAndContactJobData>(
|
||||
CreateCompanyAndContactJob.name,
|
||||
{
|
||||
workspaceId,
|
||||
connectedAccountHandle: connectedAccount.handle,
|
||||
contactsToCreate,
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async saveMessagesAndEmitContactCreation(
|
||||
messagesToSave: GmailMessage[],
|
||||
connectedAccount: ObjectRecord<ConnectedAccountObjectMetadata>,
|
||||
workspaceId: string,
|
||||
gmailMessageChannelId: string,
|
||||
) {
|
||||
const { dataSource: workspaceDataSource } =
|
||||
await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata(
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
let startTime = Date.now();
|
||||
|
||||
const messageExternalIdsAndIdsMap = await this.messageService.saveMessages(
|
||||
messagesToSave,
|
||||
workspaceDataSource,
|
||||
connectedAccount,
|
||||
gmailMessageChannelId,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
let endTime = Date.now();
|
||||
|
||||
this.logger.log(
|
||||
`Saving messages for workspace ${workspaceId} and account ${
|
||||
connectedAccount.id
|
||||
} in ${endTime - startTime}ms`,
|
||||
);
|
||||
|
||||
const gmailMessageChannel =
|
||||
await this.messageChannelRepository.getFirstByConnectedAccountId(
|
||||
connectedAccount.id,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
if (!gmailMessageChannel) {
|
||||
this.logger.error(
|
||||
`No message channel found for connected account ${connectedAccount.id} in workspace ${workspaceId} in saveMessagesAndCreateContacts`,
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
const participantsWithMessageId: (ParticipantWithMessageId & {
|
||||
shouldCreateContact: boolean;
|
||||
})[] = messagesToSave.flatMap((message) => {
|
||||
const messageId = messageExternalIdsAndIdsMap.get(message.externalId);
|
||||
|
||||
return messageId
|
||||
? message.participants.map((participant) => ({
|
||||
...participant,
|
||||
messageId,
|
||||
shouldCreateContact:
|
||||
gmailMessageChannel.isContactAutoCreationEnabled &&
|
||||
message.participants.find((p) => p.role === 'from')?.handle ===
|
||||
connectedAccount.handle,
|
||||
}))
|
||||
: [];
|
||||
});
|
||||
|
||||
startTime = Date.now();
|
||||
|
||||
await this.tryToSaveMessageParticipantsOrDeleteMessagesIfError(
|
||||
participantsWithMessageId,
|
||||
gmailMessageChannel,
|
||||
workspaceId,
|
||||
connectedAccount,
|
||||
);
|
||||
|
||||
endTime = Date.now();
|
||||
|
||||
this.logger.log(
|
||||
`Saving message participants for workspace ${workspaceId} and account in ${
|
||||
connectedAccount.id
|
||||
} ${endTime - startTime}ms`,
|
||||
);
|
||||
}
|
||||
|
||||
private async tryToSaveMessageParticipantsOrDeleteMessagesIfError(
|
||||
participantsWithMessageId: (ParticipantWithMessageId & {
|
||||
shouldCreateContact: boolean;
|
||||
})[],
|
||||
gmailMessageChannel: ObjectRecord<MessageChannelObjectMetadata>,
|
||||
workspaceId: string,
|
||||
connectedAccount: ObjectRecord<ConnectedAccountObjectMetadata>,
|
||||
) {
|
||||
try {
|
||||
await this.messageParticipantService.saveMessageParticipants(
|
||||
participantsWithMessageId,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
if (gmailMessageChannel.isContactAutoCreationEnabled) {
|
||||
const contactsToCreate = participantsWithMessageId.filter(
|
||||
(participant) => participant.shouldCreateContact,
|
||||
);
|
||||
|
||||
await this.messageQueueService.add<CreateCompanyAndContactJobData>(
|
||||
CreateCompanyAndContactJob.name,
|
||||
{
|
||||
workspaceId,
|
||||
connectedAccountHandle: connectedAccount.handle,
|
||||
contactsToCreate,
|
||||
},
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error(
|
||||
`Error saving message participants for workspace ${workspaceId} and account ${connectedAccount.id}`,
|
||||
error,
|
||||
);
|
||||
|
||||
const messagesToDelete = participantsWithMessageId.map(
|
||||
(participant) => participant.messageId,
|
||||
);
|
||||
|
||||
await this.messageService.deleteMessages(
|
||||
messagesToDelete,
|
||||
gmailMessageChannel.id,
|
||||
workspaceId,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -56,7 +56,7 @@ export class PersonRepository {
|
||||
}[],
|
||||
workspaceId: string,
|
||||
transactionManager?: EntityManager,
|
||||
): Promise<void> {
|
||||
): Promise<ObjectRecord<PersonObjectMetadata>[]> {
|
||||
const dataSourceSchema =
|
||||
this.workspaceDataSourceService.getSchemaName(workspaceId);
|
||||
|
||||
@ -81,7 +81,7 @@ export class PersonRepository {
|
||||
});
|
||||
|
||||
return await this.workspaceDataSourceService.executeRawQuery(
|
||||
`INSERT INTO ${dataSourceSchema}.person (id, email, "nameFirstName", "nameLastName", "companyId", "position") VALUES ${valuesString}`,
|
||||
`INSERT INTO ${dataSourceSchema}.person (id, email, "nameFirstName", "nameLastName", "companyId", "position") VALUES ${valuesString} RETURNING *`,
|
||||
flattenedValues,
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
|
||||
Reference in New Issue
Block a user