[messaging] Add messageParticipant matching once people emails are updated (#3887)

* poc nest event emitter

* add match message participant listener

* add workspacemember listener

* fix after review

* fix deep-equal
This commit is contained in:
Weiko
2024-02-08 17:42:33 +01:00
committed by GitHub
parent c53b593ea6
commit 99e2dd6899
19 changed files with 402 additions and 39 deletions

View File

@ -0,0 +1,47 @@
import { Injectable } from '@nestjs/common';
import { MessageQueueJob } from 'src/integrations/message-queue/interfaces/message-queue-job.interface';
import { MessageParticipantService } from 'src/workspace/messaging/message-participant/message-participant.service';
export type MatchMessageParticipantsJobData = {
workspaceId: string;
email: string;
personId?: string;
workspaceMemberId?: string;
};
@Injectable()
export class MatchMessageParticipantJob
implements MessageQueueJob<MatchMessageParticipantsJobData>
{
constructor(
private readonly messageParticipantService: MessageParticipantService,
) {}
async handle(data: MatchMessageParticipantsJobData): Promise<void> {
const { workspaceId, personId, workspaceMemberId, email } = data;
const messageParticipantsToUpdate =
await this.messageParticipantService.getByHandles([email], workspaceId);
const messageParticipantIdsToUpdate = messageParticipantsToUpdate.map(
(participant) => participant.id,
);
if (personId) {
await this.messageParticipantService.updateParticipantsPersonId(
messageParticipantIdsToUpdate,
personId,
workspaceId,
);
}
if (workspaceMemberId) {
await this.messageParticipantService.updateParticipantsWorkspaceMemberId(
messageParticipantIdsToUpdate,
workspaceMemberId,
workspaceId,
);
}
}
}

View File

@ -0,0 +1,63 @@
import { Inject, Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { ObjectRecordCreateEvent } from 'src/integrations/event-emitter/types/object-record-create.event';
import { ObjectRecordUpdateEvent } from 'src/integrations/event-emitter/types/object-record-update.event';
import { objectRecordChangedProperties as objectRecordUpdateEventChangedProperties } from 'src/integrations/event-emitter/utils/object-record-changed-properties.util';
import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service';
import {
MatchMessageParticipantJob,
MatchMessageParticipantsJobData,
} from 'src/workspace/messaging/jobs/match-message-participant.job';
import { PersonObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/person.object-metadata';
@Injectable()
export class MessagingPersonListener {
constructor(
@Inject(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
) {}
@OnEvent('person.created')
handleCreatedEvent(payload: ObjectRecordCreateEvent<PersonObjectMetadata>) {
if (payload.createdRecord.email === null) {
return;
}
this.messageQueueService.add<MatchMessageParticipantsJobData>(
MatchMessageParticipantJob.name,
{
workspaceId: payload.workspaceId,
email: payload.createdRecord.email,
personId: payload.createdRecord.id,
},
);
}
@OnEvent('person.updated')
handleUpdatedEvent(payload: ObjectRecordUpdateEvent<PersonObjectMetadata>) {
console.log(
objectRecordUpdateEventChangedProperties(
payload.previousRecord,
payload.updatedRecord,
),
);
if (
objectRecordUpdateEventChangedProperties(
payload.previousRecord,
payload.updatedRecord,
).includes('email')
) {
this.messageQueueService.add<MatchMessageParticipantsJobData>(
MatchMessageParticipantJob.name,
{
workspaceId: payload.workspaceId,
email: payload.updatedRecord.email,
personId: payload.updatedRecord.id,
},
);
}
}
}

View File

@ -0,0 +1,60 @@
import { Inject, Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { ObjectRecordCreateEvent } from 'src/integrations/event-emitter/types/object-record-create.event';
import { ObjectRecordUpdateEvent } from 'src/integrations/event-emitter/types/object-record-update.event';
import { objectRecordChangedProperties as objectRecordUpdateEventChangedProperties } from 'src/integrations/event-emitter/utils/object-record-changed-properties.util';
import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service';
import {
MatchMessageParticipantJob,
MatchMessageParticipantsJobData,
} from 'src/workspace/messaging/jobs/match-message-participant.job';
import { WorkspaceMemberObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/workspace-member.object-metadata';
@Injectable()
export class MessagingWorkspaceMemberListener {
constructor(
@Inject(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
) {}
@OnEvent('workspaceMember.created')
handleCreatedEvent(
payload: ObjectRecordCreateEvent<WorkspaceMemberObjectMetadata>,
) {
if (payload.createdRecord.userEmail === null) {
return;
}
this.messageQueueService.add<MatchMessageParticipantsJobData>(
MatchMessageParticipantJob.name,
{
workspaceId: payload.workspaceId,
email: payload.createdRecord.userEmail,
workspaceMemberId: payload.createdRecord.id,
},
);
}
@OnEvent('workspaceMember.updated')
handleUpdatedEvent(
payload: ObjectRecordUpdateEvent<WorkspaceMemberObjectMetadata>,
) {
if (
objectRecordUpdateEventChangedProperties(
payload.previousRecord,
payload.updatedRecord,
).includes('userEmail')
) {
this.messageQueueService.add<MatchMessageParticipantsJobData>(
MatchMessageParticipantJob.name,
{
workspaceId: payload.workspaceId,
email: payload.updatedRecord.userEmail,
workspaceMemberId: payload.updatedRecord.id,
},
);
}
}
}

View File

@ -0,0 +1,11 @@
import { Module } from '@nestjs/common';
import { MessageParticipantService } from 'src/workspace/messaging/message-participant/message-participant.service';
import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/workspace-datasource.module';
@Module({
imports: [WorkspaceDataSourceModule],
providers: [MessageParticipantService],
exports: [MessageParticipantService],
})
export class MessageParticipantModule {}

View File

@ -0,0 +1,64 @@
import { Injectable } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service';
import { MessageParticipantObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/message-participant.object-metadata';
import { ObjectRecord } from 'src/workspace/workspace-sync-metadata/types/object-record';
@Injectable()
export class MessageParticipantService {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
) {}
public async getByHandles(
handles: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<MessageParticipantObjectMetadata>[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
return await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."messageParticipant" WHERE "handle" = ANY($1)`,
[handles],
workspaceId,
transactionManager,
);
}
public async updateParticipantsPersonId(
participantIds: string[],
personId: string,
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageParticipant" SET "personId" = $1 WHERE "id" = ANY($2)`,
[personId, participantIds],
workspaceId,
transactionManager,
);
}
public async updateParticipantsWorkspaceMemberId(
participantIds: string[],
workspaceMemberId: string,
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageParticipant" SET "workspaceMemberId" = $1 WHERE "id" = ANY($2)`,
[workspaceMemberId, participantIds],
workspaceId,
transactionManager,
);
}
}

View File

@ -1,18 +1,21 @@
import { Module } from '@nestjs/common';
import { EnvironmentModule } from 'src/integrations/environment/environment.module';
import { ConnectedAccountModule } from 'src/workspace/messaging/connected-account/connected-account.module';
import { MessageChannelMessageAssociationModule } from 'src/workspace/messaging/message-channel-message-association/message-channel-message-assocation.module';
import { MessageChannelModule } from 'src/workspace/messaging/message-channel/message-channel.module';
import { MessageThreadModule } from 'src/workspace/messaging/message-thread/message-thread.module';
import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service';
import { EnvironmentModule } from 'src/integrations/environment/environment.module';
import { MessagingPersonListener } from 'src/workspace/messaging/listeners/messaging-person.listener';
import { MessageModule } from 'src/workspace/messaging/message/message.module';
import { GmailClientProvider } from 'src/workspace/messaging/providers/gmail/gmail-client.provider';
import { FetchMessagesByBatchesService } from 'src/workspace/messaging/services/fetch-messages-by-batches.service';
import { GmailFullSyncService } from 'src/workspace/messaging/services/gmail-full-sync.service';
import { GmailPartialSyncService } from 'src/workspace/messaging/services/gmail-partial-sync.service';
import { GmailRefreshAccessTokenService } from 'src/workspace/messaging/services/gmail-refresh-access-token.service';
import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service';
import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/workspace-datasource.module';
import { MessageParticipantModule } from 'src/workspace/messaging/message-participant/message-participant.module';
import { MessagingWorkspaceMemberListener } from 'src/workspace/messaging/listeners/messaging-workspace-member.listener';
@Module({
imports: [
@ -23,6 +26,7 @@ import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/wo
MessageChannelMessageAssociationModule,
MessageModule,
MessageThreadModule,
MessageParticipantModule,
],
providers: [
GmailFullSyncService,
@ -31,6 +35,8 @@ import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/wo
GmailRefreshAccessTokenService,
MessagingUtilsService,
GmailClientProvider,
MessagingPersonListener,
MessagingWorkspaceMemberListener,
],
exports: [
GmailPartialSyncService,
@ -39,4 +45,4 @@ import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/wo
MessagingUtilsService,
],
})
export class FetchWorkspaceMessagesModule {}
export class MessagingModule {}

View File

@ -4,7 +4,6 @@ import { gmail_v1 } from 'googleapis';
import { FetchMessagesByBatchesService } from 'src/workspace/messaging/services/fetch-messages-by-batches.service';
import { GmailClientProvider } from 'src/workspace/messaging/providers/gmail/gmail-client.provider';
import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service';
import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service';
import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants';
import {
@ -12,8 +11,9 @@ import {
GmailFullSyncJobData,
} from 'src/workspace/messaging/jobs/gmail-full-sync.job';
import { ConnectedAccountService } from 'src/workspace/messaging/connected-account/connected-account.service';
import { MessageChannelService } from 'src/workspace/messaging/message-channel/message-channel.service';
import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service';
import { MessageChannelService } from 'src/workspace/messaging/message-channel/message-channel.service';
import { MessagingUtilsService } from 'src/workspace/messaging/services/messaging-utils.service';
@Injectable()
export class GmailPartialSyncService {

View File

@ -3,8 +3,8 @@ import {
Inject,
Injectable,
InternalServerErrorException,
Logger,
} from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { IConnection } from 'src/utils/pagination/interfaces/connection.interface';
import {
@ -34,10 +34,12 @@ import {
CallWebhookJobsJobOperation,
} from 'src/workspace/workspace-query-runner/jobs/call-webhook-jobs.job';
import { parseResult } from 'src/workspace/workspace-query-runner/utils/parse-result.util';
import { ExceptionHandlerService } from 'src/integrations/exception-handler/exception-handler.service';
import { computeObjectTargetTable } from 'src/workspace/utils/compute-object-target-table.util';
import { ObjectRecordDeleteEvent } from 'src/integrations/event-emitter/types/object-record-delete.event';
import { ObjectRecordCreateEvent } from 'src/integrations/event-emitter/types/object-record-create.event';
import { ObjectRecordUpdateEvent } from 'src/integrations/event-emitter/types/object-record-update.event';
import { WorkspaceQueryRunnerOptions } from './interfaces/query-runner-optionts.interface';
import { WorkspaceQueryRunnerOptions } from './interfaces/query-runner-option.interface';
import {
PGGraphQLMutation,
PGGraphQLResult,
@ -45,14 +47,12 @@ import {
@Injectable()
export class WorkspaceQueryRunnerService {
private readonly logger = new Logger(WorkspaceQueryRunnerService.name);
constructor(
private readonly workspaceQueryBuilderFactory: WorkspaceQueryBuilderFactory,
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
@Inject(MessageQueue.webhookQueue)
private readonly messageQueueService: MessageQueueService,
private readonly exceptionHandlerService: ExceptionHandlerService,
private readonly eventEmitter: EventEmitter2,
) {}
async findMany<
@ -136,6 +136,13 @@ export class WorkspaceQueryRunnerService {
options,
);
parsedResults.forEach((record) => {
this.eventEmitter.emit(`${objectMetadataItem.nameSingular}.created`, {
workspaceId,
createdRecord: [this.removeNestedProperties(record)],
} satisfies ObjectRecordCreateEvent<any>);
});
return parsedResults;
}
@ -153,6 +160,12 @@ export class WorkspaceQueryRunnerService {
options: WorkspaceQueryRunnerOptions,
): Promise<Record | undefined> {
const { workspaceId, objectMetadataItem } = options;
const existingRecord = await this.findOne(
{ filter: { id: { eq: args.id } } } as FindOneResolverArgs,
options,
);
const query = await this.workspaceQueryBuilderFactory.updateOne(
args,
options,
@ -171,31 +184,11 @@ export class WorkspaceQueryRunnerService {
options,
);
return parsedResults?.[0];
}
async deleteOne<Record extends IRecord = IRecord>(
args: DeleteOneResolverArgs,
options: WorkspaceQueryRunnerOptions,
): Promise<Record | undefined> {
const { workspaceId, objectMetadataItem } = options;
const query = await this.workspaceQueryBuilderFactory.deleteOne(
args,
options,
);
const result = await this.execute(query, workspaceId);
const parsedResults = this.parseResult<PGGraphQLMutation<Record>>(
result,
objectMetadataItem,
'deleteFrom',
)?.records;
await this.triggerWebhooks<Record>(
parsedResults,
CallWebhookJobsJobOperation.delete,
options,
);
this.eventEmitter.emit(`${objectMetadataItem.nameSingular}.updated`, {
workspaceId,
previousRecord: this.removeNestedProperties(existingRecord as Record),
updatedRecord: this.removeNestedProperties(parsedResults?.[0]),
} satisfies ObjectRecordUpdateEvent<any>);
return parsedResults?.[0];
}
@ -209,6 +202,7 @@ export class WorkspaceQueryRunnerService {
args,
options,
);
const result = await this.execute(query, workspaceId);
const parsedResults = this.parseResult<PGGraphQLMutation<Record>>(
@ -252,9 +246,63 @@ export class WorkspaceQueryRunnerService {
options,
);
parsedResults.forEach((record) => {
this.eventEmitter.emit(`${objectMetadataItem.nameSingular}.deleted`, {
workspaceId,
deletedRecord: [this.removeNestedProperties(record)],
} satisfies ObjectRecordDeleteEvent<any>);
});
return parsedResults;
}
async deleteOne<Record extends IRecord = IRecord>(
args: DeleteOneResolverArgs,
options: WorkspaceQueryRunnerOptions,
): Promise<Record | undefined> {
const { workspaceId, objectMetadataItem } = options;
const query = await this.workspaceQueryBuilderFactory.deleteOne(
args,
options,
);
const result = await this.execute(query, workspaceId);
const parsedResults = this.parseResult<PGGraphQLMutation<Record>>(
result,
objectMetadataItem,
'deleteFrom',
)?.records;
await this.triggerWebhooks<Record>(
parsedResults,
CallWebhookJobsJobOperation.delete,
options,
);
this.eventEmitter.emit(`${objectMetadataItem.nameSingular}.deleted`, {
workspaceId,
deletedRecord: this.removeNestedProperties(parsedResults?.[0]),
} satisfies ObjectRecordDeleteEvent<any>);
return parsedResults?.[0];
}
private removeNestedProperties<Record extends IRecord = IRecord>(
record: Record,
) {
const sanitizedRecord = {};
for (const [key, value] of Object.entries(record)) {
if (value && typeof value === 'object' && value['edges']) {
continue;
}
sanitizedRecord[key] = value;
}
return sanitizedRecord;
}
async execute(
query: string,
workspaceId: string,

View File

@ -9,7 +9,7 @@ import {
import { Record as IRecord } from 'src/workspace/workspace-query-builder/interfaces/record.interface';
import { WorkspaceSchemaBuilderContext } from 'src/workspace/workspace-schema-builder/interfaces/workspace-schema-builder-context.interface';
import { WorkspaceResolverBuilderFactoryInterface } from 'src/workspace/workspace-resolver-builder/interfaces/workspace-resolver-builder-factory.interface';
import { WorkspaceQueryRunnerOptions } from 'src/workspace/workspace-query-runner/interfaces/query-runner-optionts.interface';
import { WorkspaceQueryRunnerOptions } from 'src/workspace/workspace-query-runner/interfaces/query-runner-option.interface';
import { WorkspaceQueryRunnerService } from 'src/workspace/workspace-query-runner/workspace-query-runner.service';
import { QuickActionsService } from 'src/core/quick-actions/quick-actions.service';

View File

@ -5,6 +5,7 @@ import { DataSourceModule } from 'src/metadata/data-source/data-source.module';
import { WorkspaceSchemaStorageModule } from 'src/workspace/workspace-schema-storage/workspace-schema-storage.module';
import { ObjectMetadataModule } from 'src/metadata/object-metadata/object-metadata.module';
import { ScalarsExplorerService } from 'src/workspace/services/scalars-explorer.service';
import { MessagingModule } from 'src/workspace/messaging/messaging.module';
import { WorkspaceFactory } from './workspace.factory';
@ -19,6 +20,7 @@ import { WorkspaceResolverBuilderModule } from './workspace-resolver-builder/wor
WorkspaceSchemaBuilderModule,
WorkspaceResolverBuilderModule,
WorkspaceSchemaStorageModule,
MessagingModule,
],
providers: [WorkspaceFactory, ScalarsExplorerService],
exports: [WorkspaceFactory],