4738 add listeners on person creation and workspacemember creation to update participants (#4854)

Closes #4738

- Added the logic to unmatch a participant when the email of a person or
a workspace member is updated
This commit is contained in:
bosiraphael
2024-04-08 17:03:42 +02:00
committed by GitHub
parent 5019b5febc
commit 038b2c0efc
12 changed files with 406 additions and 79 deletions

View File

@ -13,8 +13,8 @@ import { TypeORMModule } from 'src/database/typeorm/typeorm.module';
import { EmailSenderJob } from 'src/engine/integrations/email/email-sender.job';
import { UserModule } from 'src/engine/core-modules/user/user.module';
import { EnvironmentModule } from 'src/engine/integrations/environment/environment.module';
import { MatchParticipantJob } from 'src/modules/connected-account/jobs/match-participant.job';
import { GmailPartialSyncCronJob } from 'src/modules/messaging/jobs/crons/gmail-partial-sync.cron.job';
import { MatchMessageParticipantJob } from 'src/modules/messaging/jobs/match-message-participant.job';
import { CreateCompaniesAndContactsAfterSyncJob } from 'src/modules/messaging/jobs/create-companies-and-contacts-after-sync.job';
import { AutoCompaniesAndContactsCreationModule } from 'src/modules/connected-account/auto-companies-and-contacts-creation/auto-companies-and-contacts-creation.module';
import { DataSeedDemoWorkspaceModule } from 'src/database/commands/data-seed-demo-workspace/data-seed-demo-workspace.module';
@ -38,7 +38,6 @@ import { GoogleAPIRefreshAccessTokenModule } from 'src/modules/connected-account
import { MessageParticipantModule } from 'src/modules/messaging/services/message-participant/message-participant.module';
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata';
import { MessageParticipantObjectMetadata } from 'src/modules/messaging/standard-objects/message-participant.object-metadata';
import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel.object-metadata';
import { SaveEventToDbJob } from 'src/engine/api/graphql/workspace-query-runner/jobs/save-event-to-db.job';
import { CreateCompanyAndContactJob } from 'src/modules/connected-account/auto-companies-and-contacts-creation/jobs/create-company-and-contact.job';
@ -50,6 +49,8 @@ import { FetchAllMessagesFromCacheCronJob } from 'src/modules/messaging/jobs/cro
import { GmailFullSyncV2Job } from 'src/modules/messaging/jobs/gmail-full-sync-v2.job';
import { GmailPartialSyncV2Job } from 'src/modules/messaging/jobs/gmail-partial-sync-v2.job';
import { GmailPartialSyncV2Module } from 'src/modules/messaging/services/gmail-partial-sync-v2/gmail-partial-sync-v2.module';
import { CalendarEventParticipantModule } from 'src/modules/calendar/services/calendar-event-participant/calendar-event-participant.module';
import { UnmatchParticipantJob } from 'src/modules/connected-account/jobs/unmatch-participant.job';
@Module({
imports: [
@ -75,13 +76,13 @@ import { GmailPartialSyncV2Module } from 'src/modules/messaging/services/gmail-p
MessageParticipantModule,
ObjectMetadataRepositoryModule.forFeature([
ConnectedAccountObjectMetadata,
MessageParticipantObjectMetadata,
MessageChannelObjectMetadata,
EventObjectMetadata,
]),
GmailFullSynV2Module,
GmailFetchMessageContentFromCacheModule,
GmailPartialSyncV2Module,
CalendarEventParticipantModule,
],
providers: [
{
@ -106,8 +107,12 @@ import { GmailPartialSyncV2Module } from 'src/modules/messaging/services/gmail-p
useClass: GmailPartialSyncCronJob,
},
{
provide: MatchMessageParticipantJob.name,
useClass: MatchMessageParticipantJob,
provide: MatchParticipantJob.name,
useClass: MatchParticipantJob,
},
{
provide: UnmatchParticipantJob.name,
useClass: UnmatchParticipantJob,
},
{
provide: CreateCompaniesAndContactsAfterSyncJob.name,

View File

@ -18,6 +18,88 @@ export class CalendarEventParticipantRepository {
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
) {}
public async getByHandles(
handles: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<CalendarEventParticipantObjectMetadata>[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
return await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."calendarEventParticipant" WHERE "handle" = ANY($1)`,
[handles],
workspaceId,
transactionManager,
);
}
public async updateParticipantsPersonId(
participantIds: string[],
personId: string,
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."calendarEventParticipant" SET "personId" = $1 WHERE "id" = ANY($2)`,
[personId, participantIds],
workspaceId,
transactionManager,
);
}
public async updateParticipantsWorkspaceMemberId(
participantIds: string[],
workspaceMemberId: string,
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."calendarEventParticipant" SET "workspaceMemberId" = $1 WHERE "id" = ANY($2)`,
[workspaceMemberId, participantIds],
workspaceId,
transactionManager,
);
}
public async removePersonIdByHandle(
handle: string,
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."calendarEventParticipant" SET "personId" = NULL WHERE "handle" = $1`,
[handle],
workspaceId,
transactionManager,
);
}
public async removeWorkspaceMemberIdByHandle(
handle: string,
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."calendarEventParticipant" SET "workspaceMemberId" = NULL WHERE "handle" = $1`,
[handle],
workspaceId,
transactionManager,
);
}
public async getByIds(
calendarEventParticipantIds: string[],
workspaceId: string,

View File

@ -12,11 +12,15 @@ import {
CalendarEventParticipantWithId,
} from 'src/modules/calendar/types/calendar-event';
import { AddPersonIdAndWorkspaceMemberIdService } from 'src/modules/connected-account/services/add-person-id-and-workspace-member-id/add-person-id-and-workspace-member-id.service';
import { CalendarEventParticipantRepository } from 'src/modules/calendar/repositories/calendar-event-participant.repository';
import { CalendarEventParticipantObjectMetadata } from 'src/modules/calendar/standard-objects/calendar-event-participant.object-metadata';
@Injectable()
export class CalendarEventParticipantService {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
@InjectObjectMetadataRepository(CalendarEventParticipantObjectMetadata)
private readonly calendarEventParticipantRepository: CalendarEventParticipantRepository,
@InjectObjectMetadataRepository(PersonObjectMetadata)
private readonly personRepository: PersonRepository,
private readonly addPersonIdAndWorkspaceMemberIdService: AddPersonIdAndWorkspaceMemberIdService,
@ -110,4 +114,55 @@ export class CalendarEventParticipantService {
transactionManager,
);
}
public async matchCalendarEventParticipants(
workspaceId: string,
email: string,
personId?: string,
workspaceMemberId?: string,
) {
const calendarEventParticipantsToUpdate =
await this.calendarEventParticipantRepository.getByHandles(
[email],
workspaceId,
);
const calendarEventParticipantIdsToUpdate =
calendarEventParticipantsToUpdate.map((participant) => participant.id);
if (personId) {
await this.calendarEventParticipantRepository.updateParticipantsPersonId(
calendarEventParticipantIdsToUpdate,
personId,
workspaceId,
);
}
if (workspaceMemberId) {
await this.calendarEventParticipantRepository.updateParticipantsWorkspaceMemberId(
calendarEventParticipantIdsToUpdate,
workspaceMemberId,
workspaceId,
);
}
}
public async unmatchCalendarEventParticipants(
workspaceId: string,
handle: string,
personId?: string,
workspaceMemberId?: string,
) {
if (personId) {
await this.calendarEventParticipantRepository.removePersonIdByHandle(
handle,
workspaceId,
);
}
if (workspaceMemberId) {
await this.calendarEventParticipantRepository.removeWorkspaceMemberIdByHandle(
handle,
workspaceId,
);
}
}
}

View File

@ -0,0 +1,60 @@
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';
import {
FeatureFlagEntity,
FeatureFlagKeys,
} from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { CalendarEventParticipantService } from 'src/modules/calendar/services/calendar-event-participant/calendar-event-participant.service';
import { MessageParticipantService } from 'src/modules/messaging/services/message-participant/message-participant.service';
export type MatchParticipantJobData = {
workspaceId: string;
email: string;
personId?: string;
workspaceMemberId?: string;
};
@Injectable()
export class MatchParticipantJob
implements MessageQueueJob<MatchParticipantJobData>
{
constructor(
private readonly messageParticipantService: MessageParticipantService,
private readonly calendarEventParticipantService: CalendarEventParticipantService,
@InjectRepository(FeatureFlagEntity, 'core')
private readonly featureFlagRepository: Repository<FeatureFlagEntity>,
) {}
async handle(data: MatchParticipantJobData): Promise<void> {
const { workspaceId, email, personId, workspaceMemberId } = data;
await this.messageParticipantService.matchMessageParticipants(
workspaceId,
email,
personId,
workspaceMemberId,
);
const isCalendarEnabled = await this.featureFlagRepository.findOneBy({
workspaceId,
key: FeatureFlagKeys.IsCalendarEnabled,
value: true,
});
if (!isCalendarEnabled || !isCalendarEnabled.value) {
return;
}
await this.calendarEventParticipantService.matchCalendarEventParticipants(
workspaceId,
email,
personId,
workspaceMemberId,
);
}
}

View File

@ -0,0 +1,60 @@
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';
import {
FeatureFlagEntity,
FeatureFlagKeys,
} from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { CalendarEventParticipantService } from 'src/modules/calendar/services/calendar-event-participant/calendar-event-participant.service';
import { MessageParticipantService } from 'src/modules/messaging/services/message-participant/message-participant.service';
export type UnmatchParticipantJobData = {
workspaceId: string;
email: string;
personId?: string;
workspaceMemberId?: string;
};
@Injectable()
export class UnmatchParticipantJob
implements MessageQueueJob<UnmatchParticipantJobData>
{
constructor(
private readonly messageParticipantService: MessageParticipantService,
private readonly calendarEventParticipantService: CalendarEventParticipantService,
@InjectRepository(FeatureFlagEntity, 'core')
private readonly featureFlagRepository: Repository<FeatureFlagEntity>,
) {}
async handle(data: UnmatchParticipantJobData): Promise<void> {
const { workspaceId, email, personId, workspaceMemberId } = data;
await this.messageParticipantService.unmatchMessageParticipants(
workspaceId,
email,
personId,
workspaceMemberId,
);
const isCalendarEnabled = await this.featureFlagRepository.findOneBy({
workspaceId,
key: FeatureFlagKeys.IsCalendarEnabled,
value: true,
});
if (!isCalendarEnabled || !isCalendarEnabled.value) {
return;
}
await this.calendarEventParticipantService.unmatchCalendarEventParticipants(
workspaceId,
email,
personId,
workspaceMemberId,
);
}
}

View File

@ -7,13 +7,17 @@ import { objectRecordChangedProperties as objectRecordUpdateEventChangedProperti
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import {
MatchMessageParticipantJob,
MatchMessageParticipantsJobData,
} from 'src/modules/messaging/jobs/match-message-participant.job';
MatchParticipantJob,
MatchParticipantJobData,
} from 'src/modules/connected-account/jobs/match-participant.job';
import {
UnmatchParticipantJobData,
UnmatchParticipantJob,
} from 'src/modules/connected-account/jobs/unmatch-participant.job';
import { PersonObjectMetadata } from 'src/modules/person/standard-objects/person.object-metadata';
@Injectable()
export class MessagingPersonListener {
export class ParticipantPersonListener {
constructor(
@Inject(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
@ -27,8 +31,8 @@ export class MessagingPersonListener {
return;
}
this.messageQueueService.add<MatchMessageParticipantsJobData>(
MatchMessageParticipantJob.name,
await this.messageQueueService.add<MatchParticipantJobData>(
MatchParticipantJob.name,
{
workspaceId: payload.workspaceId,
email: payload.details.after.email,
@ -47,8 +51,17 @@ export class MessagingPersonListener {
payload.details.after,
).includes('email')
) {
this.messageQueueService.add<MatchMessageParticipantsJobData>(
MatchMessageParticipantJob.name,
await this.messageQueueService.add<UnmatchParticipantJobData>(
UnmatchParticipantJob.name,
{
workspaceId: payload.workspaceId,
email: payload.details.before.email,
personId: payload.recordId,
},
);
await this.messageQueueService.add<MatchParticipantJobData>(
MatchParticipantJob.name,
{
workspaceId: payload.workspaceId,
email: payload.details.after.email,

View File

@ -7,13 +7,17 @@ import { objectRecordChangedProperties as objectRecordUpdateEventChangedProperti
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import {
MatchMessageParticipantJob,
MatchMessageParticipantsJobData,
} from 'src/modules/messaging/jobs/match-message-participant.job';
MatchParticipantJob,
MatchParticipantJobData,
} from 'src/modules/connected-account/jobs/match-participant.job';
import {
UnmatchParticipantJobData,
UnmatchParticipantJob,
} from 'src/modules/connected-account/jobs/unmatch-participant.job';
import { WorkspaceMemberObjectMetadata } from 'src/modules/workspace-member/standard-objects/workspace-member.object-metadata';
@Injectable()
export class MessagingWorkspaceMemberListener {
export class ParticipantWorkspaceMemberListener {
constructor(
@Inject(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
@ -27,8 +31,8 @@ export class MessagingWorkspaceMemberListener {
return;
}
await this.messageQueueService.add<MatchMessageParticipantsJobData>(
MatchMessageParticipantJob.name,
await this.messageQueueService.add<MatchParticipantJobData>(
MatchParticipantJob.name,
{
workspaceId: payload.workspaceId,
email: payload.details.after.userEmail,
@ -47,8 +51,17 @@ export class MessagingWorkspaceMemberListener {
payload.details.after,
).includes('userEmail')
) {
await this.messageQueueService.add<MatchMessageParticipantsJobData>(
MatchMessageParticipantJob.name,
await this.messageQueueService.add<UnmatchParticipantJobData>(
UnmatchParticipantJob.name,
{
workspaceId: payload.workspaceId,
email: payload.details.before.userEmail,
personId: payload.recordId,
},
);
await this.messageQueueService.add<MatchParticipantJobData>(
MatchParticipantJob.name,
{
workspaceId: payload.workspaceId,
email: payload.details.after.userEmail,

View File

@ -1,53 +0,0 @@
import { Injectable } from '@nestjs/common';
import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { MessageParticipantRepository } from 'src/modules/messaging/repositories/message-participant.repository';
import { MessageParticipantObjectMetadata } from 'src/modules/messaging/standard-objects/message-participant.object-metadata';
export type MatchMessageParticipantsJobData = {
workspaceId: string;
email: string;
personId?: string;
workspaceMemberId?: string;
};
@Injectable()
export class MatchMessageParticipantJob
implements MessageQueueJob<MatchMessageParticipantsJobData>
{
constructor(
@InjectObjectMetadataRepository(MessageParticipantObjectMetadata)
private readonly messageParticipantRepository: MessageParticipantRepository,
) {}
async handle(data: MatchMessageParticipantsJobData): Promise<void> {
const { workspaceId, personId, workspaceMemberId, email } = data;
const messageParticipantsToUpdate =
await this.messageParticipantRepository.getByHandles(
[email],
workspaceId,
);
const messageParticipantIdsToUpdate = messageParticipantsToUpdate.map(
(participant) => participant.id,
);
if (personId) {
await this.messageParticipantRepository.updateParticipantsPersonId(
messageParticipantIdsToUpdate,
personId,
workspaceId,
);
}
if (workspaceMemberId) {
await this.messageParticipantRepository.updateParticipantsWorkspaceMemberId(
messageParticipantIdsToUpdate,
workspaceMemberId,
workspaceId,
);
}
}
}

View File

@ -1,8 +1,8 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { MessagingPersonListener } from 'src/modules/messaging/listeners/messaging-person.listener';
import { MessagingWorkspaceMemberListener } from 'src/modules/messaging/listeners/messaging-workspace-member.listener';
import { ParticipantPersonListener } from 'src/modules/connected-account/listeners/participant-person.listener';
import { ParticipantWorkspaceMemberListener } from 'src/modules/connected-account/listeners/participant-workspace-member.listener';
import { MessagingMessageChannelListener } from 'src/modules/messaging/listeners/messaging-message-channel.listener';
import { MessagingConnectedAccountListener } from 'src/modules/messaging/listeners/messaging-connected-account.listener';
import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity';
@ -10,8 +10,8 @@ import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-
@Module({
imports: [TypeOrmModule.forFeature([FeatureFlagEntity], 'core')],
providers: [
MessagingPersonListener,
MessagingWorkspaceMemberListener,
ParticipantPersonListener,
ParticipantWorkspaceMemberListener,
MessagingMessageChannelListener,
MessagingConnectedAccountListener,
],

View File

@ -63,6 +63,38 @@ export class MessageParticipantRepository {
);
}
public async removePersonIdByHandle(
handle: string,
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageParticipant" SET "personId" = NULL WHERE "handle" = $1`,
[handle],
workspaceId,
transactionManager,
);
}
public async removeWorkspaceMemberIdByHandle(
handle: string,
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageParticipant" SET "workspaceMemberId" = NULL WHERE "handle" = $1`,
[handle],
workspaceId,
transactionManager,
);
}
public async getByMessageChannelIdWithoutPersonIdAndWorkspaceMemberIdAndMessageOutgoing(
messageChannelId: string,
workspaceId: string,

View File

@ -4,12 +4,16 @@ import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repos
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
import { AddPersonIdAndWorkspaceMemberIdModule } from 'src/modules/connected-account/services/add-person-id-and-workspace-member-id/add-person-id-and-workspace-member-id.module';
import { MessageParticipantService } from 'src/modules/messaging/services/message-participant/message-participant.service';
import { MessageParticipantObjectMetadata } from 'src/modules/messaging/standard-objects/message-participant.object-metadata';
import { PersonObjectMetadata } from 'src/modules/person/standard-objects/person.object-metadata';
@Module({
imports: [
WorkspaceDataSourceModule,
ObjectMetadataRepositoryModule.forFeature([PersonObjectMetadata]),
ObjectMetadataRepositoryModule.forFeature([
PersonObjectMetadata,
MessageParticipantObjectMetadata,
]),
AddPersonIdAndWorkspaceMemberIdModule,
],
providers: [MessageParticipantService],

View File

@ -12,11 +12,15 @@ import { PersonObjectMetadata } from 'src/modules/person/standard-objects/person
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import { getFlattenedValuesAndValuesStringForBatchRawQuery } from 'src/modules/calendar/utils/getFlattenedValuesAndValuesStringForBatchRawQuery.util';
import { AddPersonIdAndWorkspaceMemberIdService } from 'src/modules/connected-account/services/add-person-id-and-workspace-member-id/add-person-id-and-workspace-member-id.service';
import { MessageParticipantRepository } from 'src/modules/messaging/repositories/message-participant.repository';
import { MessageParticipantObjectMetadata } from 'src/modules/messaging/standard-objects/message-participant.object-metadata';
@Injectable()
export class MessageParticipantService {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
@InjectObjectMetadataRepository(MessageParticipantObjectMetadata)
private readonly messageParticipantRepository: MessageParticipantRepository,
@InjectObjectMetadataRepository(PersonObjectMetadata)
private readonly personRepository: PersonRepository,
private readonly addPersonIdAndWorkspaceMemberIdService: AddPersonIdAndWorkspaceMemberIdService,
@ -107,4 +111,56 @@ export class MessageParticipantService {
transactionManager,
);
}
public async matchMessageParticipants(
workspaceId: string,
email: string,
personId?: string,
workspaceMemberId?: string,
) {
const messageParticipantsToUpdate =
await this.messageParticipantRepository.getByHandles(
[email],
workspaceId,
);
const messageParticipantIdsToUpdate = messageParticipantsToUpdate.map(
(participant) => participant.id,
);
if (personId) {
await this.messageParticipantRepository.updateParticipantsPersonId(
messageParticipantIdsToUpdate,
personId,
workspaceId,
);
}
if (workspaceMemberId) {
await this.messageParticipantRepository.updateParticipantsWorkspaceMemberId(
messageParticipantIdsToUpdate,
workspaceMemberId,
workspaceId,
);
}
}
public async unmatchMessageParticipants(
workspaceId: string,
handle: string,
personId?: string,
workspaceMemberId?: string,
) {
if (personId) {
await this.messageParticipantRepository.removePersonIdByHandle(
handle,
workspaceId,
);
}
if (workspaceMemberId) {
await this.messageParticipantRepository.removeWorkspaceMemberIdByHandle(
handle,
workspaceId,
);
}
}
}