Refactor backend folder structure (#4505)

* Refactor backend folder structure

Co-authored-by: Charles Bochet <charles@twenty.com>

* fix tests

* fix

* move yoga hooks

---------

Co-authored-by: Charles Bochet <charles@twenty.com>
This commit is contained in:
Weiko
2024-03-15 18:37:09 +01:00
committed by GitHub
parent afb9b3e375
commit 2c09096edd
523 changed files with 1386 additions and 1856 deletions

View File

@ -0,0 +1 @@
export const fetchAllWorkspacesMessagesCronPattern = '*/10 * * * *';

View File

@ -0,0 +1,82 @@
import { Inject, Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository, In } from 'typeorm';
import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { ConnectedAccountService } from 'src/modules/connected-account/repositories/connected-account/connected-account.service';
import { Workspace } from 'src/engine/modules/workspace/workspace.entity';
import {
GmailPartialSyncJobData,
GmailPartialSyncJob,
} from 'src/modules/messaging/jobs/gmail-partial-sync.job';
import { DataSourceEntity } from 'src/engine-metadata/data-source/data-source.entity';
@Injectable()
export class FetchAllWorkspacesMessagesJob
implements MessageQueueJob<undefined>
{
private readonly logger = new Logger(FetchAllWorkspacesMessagesJob.name);
constructor(
@InjectRepository(Workspace, 'core')
private readonly workspaceRepository: Repository<Workspace>,
@InjectRepository(DataSourceEntity, 'metadata')
private readonly dataSourceRepository: Repository<DataSourceEntity>,
@Inject(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
private readonly connectedAccountService: ConnectedAccountService,
) {}
async handle(): Promise<void> {
const workspaceIds = (
await this.workspaceRepository.find({
where: {
subscriptionStatus: 'active',
},
select: ['id'],
})
).map((workspace) => workspace.id);
const dataSources = await this.dataSourceRepository.find({
where: {
workspaceId: In(workspaceIds),
},
});
const workspaceIdsWithDataSources = new Set(
dataSources.map((dataSource) => dataSource.workspaceId),
);
for (const workspaceId of workspaceIdsWithDataSources) {
await this.fetchWorkspaceMessages(workspaceId);
}
}
private async fetchWorkspaceMessages(workspaceId: string): Promise<void> {
try {
const connectedAccounts =
await this.connectedAccountService.getAll(workspaceId);
for (const connectedAccount of connectedAccounts) {
await this.messageQueueService.add<GmailPartialSyncJobData>(
GmailPartialSyncJob.name,
{
workspaceId,
connectedAccountId: connectedAccount.id,
},
);
}
} catch (error) {
this.logger.error(
`Error while fetching workspace messages for workspace ${workspaceId}`,
);
this.logger.error(error);
return;
}
}
}

View File

@ -0,0 +1,27 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { FeatureFlagEntity } from 'src/engine/modules/feature-flag/feature-flag.entity';
import { TypeORMModule } from 'src/database/typeorm/typeorm.module';
import { DataSourceModule } from 'src/engine-metadata/data-source/data-source.module';
import { GmailFullSyncCommand } from 'src/modules/messaging/commands/gmail-full-sync.command';
import { GmailPartialSyncCommand } from 'src/modules/messaging/commands/gmail-partial-sync.command';
import { ConnectedAccountModule } from 'src/modules/connected-account/repositories/connected-account/connected-account.module';
import { StartFetchAllWorkspacesMessagesCronCommand } from 'src/modules/messaging/commands/start-fetch-all-workspaces-messages.cron.command';
import { StopFetchAllWorkspacesMessagesCronCommand } from 'src/modules/messaging/commands/stop-fetch-all-workspaces-messages.cron.command';
@Module({
imports: [
DataSourceModule,
TypeORMModule,
TypeOrmModule.forFeature([FeatureFlagEntity], 'core'),
ConnectedAccountModule,
],
providers: [
GmailFullSyncCommand,
GmailPartialSyncCommand,
StartFetchAllWorkspacesMessagesCronCommand,
StopFetchAllWorkspacesMessagesCronCommand,
],
})
export class FetchWorkspaceMessagesCommandsModule {}

View File

@ -0,0 +1,65 @@
import { Inject } from '@nestjs/common';
import { Command, CommandRunner, Option } from 'nest-commander';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import {
GmailFullSyncJobData,
GmailFullSyncJob,
} from 'src/modules/messaging/jobs/gmail-full-sync.job';
import { ConnectedAccountService } from 'src/modules/connected-account/repositories/connected-account/connected-account.service';
interface GmailFullSyncOptions {
workspaceId: string;
}
@Command({
name: 'workspace:gmail-full-sync',
description: 'Fetch messages of all workspaceMembers in a workspace.',
})
export class GmailFullSyncCommand extends CommandRunner {
constructor(
@Inject(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
private readonly connectedAccountService: ConnectedAccountService,
) {
super();
}
async run(
_passedParam: string[],
options: GmailFullSyncOptions,
): Promise<void> {
await this.fetchWorkspaceMessages(options.workspaceId);
return;
}
@Option({
flags: '-w, --workspace-id [workspace_id]',
description: 'workspace id',
required: true,
})
parseWorkspaceId(value: string): string {
return value;
}
private async fetchWorkspaceMessages(workspaceId: string): Promise<void> {
const connectedAccounts =
await this.connectedAccountService.getAll(workspaceId);
for (const connectedAccount of connectedAccounts) {
await this.messageQueueService.add<GmailFullSyncJobData>(
GmailFullSyncJob.name,
{
workspaceId,
connectedAccountId: connectedAccount.id,
},
{
retryLimit: 2,
},
);
}
}
}

View File

@ -0,0 +1,62 @@
import { Inject } from '@nestjs/common';
import { Command, CommandRunner, Option } from 'nest-commander';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import {
GmailPartialSyncJob,
GmailPartialSyncJobData,
} from 'src/modules/messaging/jobs/gmail-partial-sync.job';
import { ConnectedAccountService } from 'src/modules/connected-account/repositories/connected-account/connected-account.service';
interface GmailPartialSyncOptions {
workspaceId: string;
}
@Command({
name: 'workspace:gmail-partial-sync',
description: 'Fetch messages of all workspaceMembers in a workspace.',
})
export class GmailPartialSyncCommand extends CommandRunner {
constructor(
@Inject(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
private readonly connectedAccountService: ConnectedAccountService,
) {
super();
}
async run(
_passedParam: string[],
options: GmailPartialSyncOptions,
): Promise<void> {
await this.fetchWorkspaceMessages(options.workspaceId);
return;
}
@Option({
flags: '-w, --workspace-id [workspace_id]',
description: 'workspace id',
required: true,
})
parseWorkspaceId(value: string): string {
return value;
}
private async fetchWorkspaceMessages(workspaceId: string): Promise<void> {
const connectedAccounts =
await this.connectedAccountService.getAll(workspaceId);
for (const connectedAccount of connectedAccounts) {
await this.messageQueueService.add<GmailPartialSyncJobData>(
GmailPartialSyncJob.name,
{
workspaceId,
connectedAccountId: connectedAccount.id,
},
);
}
}
}

View File

@ -0,0 +1,29 @@
import { Inject } from '@nestjs/common';
import { Command, CommandRunner } from 'nest-commander';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { fetchAllWorkspacesMessagesCronPattern } from 'src/modules/messaging/commands/crons/fetch-all-workspaces-messages.cron.pattern';
import { FetchAllWorkspacesMessagesJob } from 'src/modules/messaging/commands/crons/fetch-all-workspaces-messages.job';
@Command({
name: 'fetch-all-workspaces-messages:cron:start',
description: 'Starts a cron job to fetch all workspaces messages',
})
export class StartFetchAllWorkspacesMessagesCronCommand extends CommandRunner {
constructor(
@Inject(MessageQueue.cronQueue)
private readonly messageQueueService: MessageQueueService,
) {
super();
}
async run(): Promise<void> {
await this.messageQueueService.addCron<undefined>(
FetchAllWorkspacesMessagesJob.name,
undefined,
fetchAllWorkspacesMessagesCronPattern,
);
}
}

View File

@ -0,0 +1,28 @@
import { Inject } from '@nestjs/common';
import { Command, CommandRunner } from 'nest-commander';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { fetchAllWorkspacesMessagesCronPattern } from 'src/modules/messaging/commands/crons/fetch-all-workspaces-messages.cron.pattern';
import { FetchAllWorkspacesMessagesJob } from 'src/modules/messaging/commands/crons/fetch-all-workspaces-messages.job';
@Command({
name: 'fetch-all-workspaces-messages:cron:stop',
description: 'Stops the fetch all workspaces messages cron job',
})
export class StopFetchAllWorkspacesMessagesCronCommand extends CommandRunner {
constructor(
@Inject(MessageQueue.cronQueue)
private readonly messageQueueService: MessageQueueService,
) {
super();
}
async run(): Promise<void> {
await this.messageQueueService.removeCron(
FetchAllWorkspacesMessagesJob.name,
fetchAllWorkspacesMessagesCronPattern,
);
}
}

View File

@ -0,0 +1,67 @@
import { Injectable, Logger } from '@nestjs/common';
import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';
import { CreateCompanyAndContactService } from 'src/modules/connected-account/auto-companies-and-contacts-creation/create-company-and-contact/create-company-and-contact.service';
import { MessageChannelService } from 'src/modules/messaging/repositories/message-channel/message-channel.service';
import { MessageParticipantService } from 'src/modules/messaging/repositories/message-participant/message-participant.service';
export type CreateCompaniesAndContactsAfterSyncJobData = {
workspaceId: string;
messageChannelId: string;
};
@Injectable()
export class CreateCompaniesAndContactsAfterSyncJob
implements MessageQueueJob<CreateCompaniesAndContactsAfterSyncJobData>
{
private readonly logger = new Logger(
CreateCompaniesAndContactsAfterSyncJob.name,
);
constructor(
private readonly createCompaniesAndContactsService: CreateCompanyAndContactService,
private readonly messageChannelService: MessageChannelService,
private readonly messageParticipantService: MessageParticipantService,
) {}
async handle(
data: CreateCompaniesAndContactsAfterSyncJobData,
): Promise<void> {
this.logger.log(
`create contacts and companies after sync for workspace ${data.workspaceId} and messageChannel ${data.messageChannelId}`,
);
const { workspaceId, messageChannelId } = data;
const messageChannel = await this.messageChannelService.getByIds(
[messageChannelId],
workspaceId,
);
const { handle, isContactAutoCreationEnabled } = messageChannel[0];
if (!isContactAutoCreationEnabled) {
return;
}
const messageParticipantsWithoutPersonIdAndWorkspaceMemberId =
await this.messageParticipantService.getByMessageChannelIdWithoutPersonIdAndWorkspaceMemberIdAndMessageOutgoing(
messageChannelId,
workspaceId,
);
await this.createCompaniesAndContactsService.createCompaniesAndContacts(
handle,
messageParticipantsWithoutPersonIdAndWorkspaceMemberId,
workspaceId,
);
await this.messageParticipantService.updateMessageParticipantsAfterPeopleCreation(
messageParticipantsWithoutPersonIdAndWorkspaceMemberId,
workspaceId,
);
this.logger.log(
`create contacts and companies after sync for workspace ${data.workspaceId} and messageChannel ${data.messageChannelId} done`,
);
}
}

View File

@ -0,0 +1,40 @@
import { Injectable, Logger } from '@nestjs/common';
import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';
import { CalendarEventCleanerService } from 'src/modules/calendar/services/calendar-event-cleaner/calendar-event-cleaner.service';
export type DeleteConnectedAccountAssociatedCalendarDataJobData = {
workspaceId: string;
connectedAccountId: string;
};
@Injectable()
export class DeleteConnectedAccountAssociatedCalendarDataJob
implements
MessageQueueJob<DeleteConnectedAccountAssociatedCalendarDataJobData>
{
private readonly logger = new Logger(
DeleteConnectedAccountAssociatedCalendarDataJob.name,
);
constructor(
private readonly calendarEventCleanerService: CalendarEventCleanerService,
) {}
async handle(
data: DeleteConnectedAccountAssociatedCalendarDataJobData,
): Promise<void> {
this.logger.log(
`Deleting connected account ${data.connectedAccountId} associated calendar data in workspace ${data.workspaceId}`,
);
await this.calendarEventCleanerService.cleanWorkspaceCalendarEvents(
data.workspaceId,
);
this.logger.log(
`Deleted connected account ${data.connectedAccountId} associated calendar data in workspace ${data.workspaceId}`,
);
}
}

View File

@ -0,0 +1,36 @@
import { Injectable, Logger } from '@nestjs/common';
import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';
import { ThreadCleanerService } from 'src/modules/messaging/services/thread-cleaner/thread-cleaner.service';
export type DeleteConnectedAccountAssociatedMessagingDataJobData = {
workspaceId: string;
connectedAccountId: string;
};
@Injectable()
export class DeleteConnectedAccountAssociatedMessagingDataJob
implements
MessageQueueJob<DeleteConnectedAccountAssociatedMessagingDataJobData>
{
private readonly logger = new Logger(
DeleteConnectedAccountAssociatedMessagingDataJob.name,
);
constructor(private readonly threadCleanerService: ThreadCleanerService) {}
async handle(
data: DeleteConnectedAccountAssociatedMessagingDataJobData,
): Promise<void> {
this.logger.log(
`Deleting connected account ${data.connectedAccountId} associated messaging data in workspace ${data.workspaceId}`,
);
await this.threadCleanerService.cleanWorkspaceThreads(data.workspaceId);
this.logger.log(
`Deleted connected account ${data.connectedAccountId} associated messaging data in workspace ${data.workspaceId}`,
);
}
}

View File

@ -0,0 +1,50 @@
import { Injectable, Logger } from '@nestjs/common';
import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';
import { GoogleAPIsRefreshAccessTokenService } from 'src/modules/connected-account/services/google-apis-refresh-access-token.service';
import { GmailFullSyncService } from 'src/modules/messaging/services/gmail-full-sync.service';
export type GmailFullSyncJobData = {
workspaceId: string;
connectedAccountId: string;
nextPageToken?: string;
};
@Injectable()
export class GmailFullSyncJob implements MessageQueueJob<GmailFullSyncJobData> {
private readonly logger = new Logger(GmailFullSyncJob.name);
constructor(
private readonly googleAPIsRefreshAccessTokenService: GoogleAPIsRefreshAccessTokenService,
private readonly gmailFullSyncService: GmailFullSyncService,
) {}
async handle(data: GmailFullSyncJobData): Promise<void> {
this.logger.log(
`gmail full-sync for workspace ${data.workspaceId} and account ${
data.connectedAccountId
} ${data.nextPageToken ? `and ${data.nextPageToken} pageToken` : ''}`,
);
try {
await this.googleAPIsRefreshAccessTokenService.refreshAndSaveAccessToken(
data.workspaceId,
data.connectedAccountId,
);
} catch (e) {
this.logger.error(
`Error refreshing access token for connected account ${data.connectedAccountId} in workspace ${data.workspaceId}`,
e,
);
return;
}
await this.gmailFullSyncService.fetchConnectedAccountThreads(
data.workspaceId,
data.connectedAccountId,
data.nextPageToken,
);
}
}

View File

@ -0,0 +1,48 @@
import { Injectable, Logger } from '@nestjs/common';
import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';
import { GoogleAPIsRefreshAccessTokenService } from 'src/modules/connected-account/services/google-apis-refresh-access-token.service';
import { GmailPartialSyncService } from 'src/modules/messaging/services/gmail-partial-sync.service';
export type GmailPartialSyncJobData = {
workspaceId: string;
connectedAccountId: string;
};
@Injectable()
export class GmailPartialSyncJob
implements MessageQueueJob<GmailPartialSyncJobData>
{
private readonly logger = new Logger(GmailPartialSyncJob.name);
constructor(
private readonly googleAPIsRefreshAccessTokenService: GoogleAPIsRefreshAccessTokenService,
private readonly gmailPartialSyncService: GmailPartialSyncService,
) {}
async handle(data: GmailPartialSyncJobData): Promise<void> {
this.logger.log(
`gmail partial-sync for workspace ${data.workspaceId} and account ${data.connectedAccountId}`,
);
try {
await this.googleAPIsRefreshAccessTokenService.refreshAndSaveAccessToken(
data.workspaceId,
data.connectedAccountId,
);
} catch (e) {
this.logger.error(
`Error refreshing access token for connected account ${data.connectedAccountId} in workspace ${data.workspaceId}`,
e,
);
return;
}
await this.gmailPartialSyncService.fetchConnectedAccountThreads(
data.workspaceId,
data.connectedAccountId,
);
}
}

View File

@ -0,0 +1,47 @@
import { Injectable } from '@nestjs/common';
import { MessageQueueJob } from 'src/engine/integrations/message-queue/interfaces/message-queue-job.interface';
import { MessageParticipantService } from 'src/modules/messaging/repositories/message-participant/message-participant.service';
export type MatchMessageParticipantsJobData = {
workspaceId: string;
email: string;
personId?: string;
workspaceMemberId?: string;
};
@Injectable()
export class MatchMessageParticipantJob
implements MessageQueueJob<MatchMessageParticipantsJobData>
{
constructor(
private readonly messageParticipantService: MessageParticipantService,
) {}
async handle(data: MatchMessageParticipantsJobData): Promise<void> {
const { workspaceId, personId, workspaceMemberId, email } = data;
const messageParticipantsToUpdate =
await this.messageParticipantService.getByHandles([email], workspaceId);
const messageParticipantIdsToUpdate = messageParticipantsToUpdate.map(
(participant) => participant.id,
);
if (personId) {
await this.messageParticipantService.updateParticipantsPersonId(
messageParticipantIdsToUpdate,
personId,
workspaceId,
);
}
if (workspaceMemberId) {
await this.messageParticipantService.updateParticipantsWorkspaceMemberId(
messageParticipantIdsToUpdate,
workspaceMemberId,
workspaceId,
);
}
}
}

View File

@ -0,0 +1,46 @@
import { Injectable, Inject } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { ObjectRecordDeleteEvent } from 'src/engine/integrations/event-emitter/types/object-record-delete.event';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import {
DeleteConnectedAccountAssociatedCalendarDataJobData,
DeleteConnectedAccountAssociatedCalendarDataJob,
} from 'src/modules/messaging/jobs/delete-connected-account-associated-calendar-data.job';
import {
DeleteConnectedAccountAssociatedMessagingDataJobData,
DeleteConnectedAccountAssociatedMessagingDataJob,
} from 'src/modules/messaging/jobs/delete-connected-account-associated-messaging-data.job';
import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata';
@Injectable()
export class MessagingConnectedAccountListener {
constructor(
@Inject(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
@Inject(MessageQueue.calendarQueue)
private readonly calendarQueueService: MessageQueueService,
) {}
@OnEvent('connectedAccount.deleted')
handleDeletedEvent(
payload: ObjectRecordDeleteEvent<ConnectedAccountObjectMetadata>,
) {
this.messageQueueService.add<DeleteConnectedAccountAssociatedMessagingDataJobData>(
DeleteConnectedAccountAssociatedMessagingDataJob.name,
{
workspaceId: payload.workspaceId,
connectedAccountId: payload.deletedRecord.id,
},
);
this.calendarQueueService.add<DeleteConnectedAccountAssociatedCalendarDataJobData>(
DeleteConnectedAccountAssociatedCalendarDataJob.name,
{
workspaceId: payload.workspaceId,
connectedAccountId: payload.deletedRecord.id,
},
);
}
}

View File

@ -0,0 +1,41 @@
import { Inject, Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { ObjectRecordUpdateEvent } from 'src/engine/integrations/event-emitter/types/object-record-update.event';
import { objectRecordChangedProperties } from 'src/engine/integrations/event-emitter/utils/object-record-changed-properties.util';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import {
CreateCompaniesAndContactsAfterSyncJobData,
CreateCompaniesAndContactsAfterSyncJob,
} from 'src/modules/messaging/jobs/create-companies-and-contacts-after-sync.job';
import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel.object-metadata';
@Injectable()
export class MessagingMessageChannelListener {
constructor(
@Inject(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
) {}
@OnEvent('messageChannel.updated')
handleUpdatedEvent(
payload: ObjectRecordUpdateEvent<MessageChannelObjectMetadata>,
) {
if (
objectRecordChangedProperties(
payload.previousRecord,
payload.updatedRecord,
).includes('isContactAutoCreationEnabled') &&
payload.updatedRecord.isContactAutoCreationEnabled
) {
this.messageQueueService.add<CreateCompaniesAndContactsAfterSyncJobData>(
CreateCompaniesAndContactsAfterSyncJob.name,
{
workspaceId: payload.workspaceId,
messageChannelId: payload.updatedRecord.id,
},
);
}
}
}

View File

@ -0,0 +1,60 @@
import { Inject, Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { ObjectRecordCreateEvent } from 'src/engine/integrations/event-emitter/types/object-record-create.event';
import { ObjectRecordUpdateEvent } from 'src/engine/integrations/event-emitter/types/object-record-update.event';
import { objectRecordChangedProperties as objectRecordUpdateEventChangedProperties } from 'src/engine/integrations/event-emitter/utils/object-record-changed-properties.util';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import {
MatchMessageParticipantJob,
MatchMessageParticipantsJobData,
} from 'src/modules/messaging/jobs/match-message-participant.job';
import { PersonObjectMetadata } from 'src/modules/person/standard-objects/person.object-metadata';
@Injectable()
export class MessagingPersonListener {
constructor(
@Inject(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
) {}
@OnEvent('person.created')
async handleCreatedEvent(
payload: ObjectRecordCreateEvent<PersonObjectMetadata>,
) {
if (payload.createdRecord.email === null) {
return;
}
this.messageQueueService.add<MatchMessageParticipantsJobData>(
MatchMessageParticipantJob.name,
{
workspaceId: payload.workspaceId,
email: payload.createdRecord.email,
personId: payload.createdRecord.id,
},
);
}
@OnEvent('person.updated')
async handleUpdatedEvent(
payload: ObjectRecordUpdateEvent<PersonObjectMetadata>,
) {
if (
objectRecordUpdateEventChangedProperties(
payload.previousRecord,
payload.updatedRecord,
).includes('email')
) {
this.messageQueueService.add<MatchMessageParticipantsJobData>(
MatchMessageParticipantJob.name,
{
workspaceId: payload.workspaceId,
email: payload.updatedRecord.email,
personId: payload.updatedRecord.id,
},
);
}
}
}

View File

@ -0,0 +1,60 @@
import { Inject, Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { ObjectRecordCreateEvent } from 'src/engine/integrations/event-emitter/types/object-record-create.event';
import { ObjectRecordUpdateEvent } from 'src/engine/integrations/event-emitter/types/object-record-update.event';
import { objectRecordChangedProperties as objectRecordUpdateEventChangedProperties } from 'src/engine/integrations/event-emitter/utils/object-record-changed-properties.util';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import {
MatchMessageParticipantJob,
MatchMessageParticipantsJobData,
} from 'src/modules/messaging/jobs/match-message-participant.job';
import { WorkspaceMemberObjectMetadata } from 'src/modules/workspace-member/standard-objects/workspace-member.object-metadata';
@Injectable()
export class MessagingWorkspaceMemberListener {
constructor(
@Inject(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
) {}
@OnEvent('workspaceMember.created')
async handleCreatedEvent(
payload: ObjectRecordCreateEvent<WorkspaceMemberObjectMetadata>,
) {
if (payload.createdRecord.userEmail === null) {
return;
}
await this.messageQueueService.add<MatchMessageParticipantsJobData>(
MatchMessageParticipantJob.name,
{
workspaceId: payload.workspaceId,
email: payload.createdRecord.userEmail,
workspaceMemberId: payload.createdRecord.id,
},
);
}
@OnEvent('workspaceMember.updated')
async handleUpdatedEvent(
payload: ObjectRecordUpdateEvent<WorkspaceMemberObjectMetadata>,
) {
if (
objectRecordUpdateEventChangedProperties(
payload.previousRecord,
payload.updatedRecord,
).includes('userEmail')
) {
await this.messageQueueService.add<MatchMessageParticipantsJobData>(
MatchMessageParticipantJob.name,
{
workspaceId: payload.workspaceId,
email: payload.updatedRecord.userEmail,
workspaceMemberId: payload.updatedRecord.id,
},
);
}
}
}

View File

@ -0,0 +1,76 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { HttpModule } from '@nestjs/axios';
import { ConnectedAccountModule } from 'src/modules/connected-account/repositories/connected-account/connected-account.module';
import { MessageChannelMessageAssociationModule } from 'src/modules/messaging/repositories/message-channel-message-association/message-channel-message-assocation.module';
import { MessageChannelModule } from 'src/modules/messaging/repositories/message-channel/message-channel.module';
import { MessageThreadModule } from 'src/modules/messaging/repositories/message-thread/message-thread.module';
import { EnvironmentModule } from 'src/engine/integrations/environment/environment.module';
import { MessagingPersonListener } from 'src/modules/messaging/listeners/messaging-person.listener';
import { MessageModule } from 'src/modules/messaging/repositories/message/message.module';
import { GmailClientProvider } from 'src/modules/messaging/services/providers/gmail/gmail-client.provider';
import { CreateContactService } from 'src/modules/connected-account/auto-companies-and-contacts-creation/create-contact/create-contact.service';
import { CreateCompanyService } from 'src/modules/connected-account/auto-companies-and-contacts-creation/create-company/create-company.service';
import { FetchMessagesByBatchesService } from 'src/modules/messaging/services/fetch-messages-by-batches.service';
import { GmailFullSyncService } from 'src/modules/messaging/services/gmail-full-sync.service';
import { GmailPartialSyncService } from 'src/modules/messaging/services/gmail-partial-sync.service';
import { GoogleAPIsRefreshAccessTokenService } from 'src/modules/connected-account/services/google-apis-refresh-access-token.service';
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
import { MessageParticipantModule } from 'src/modules/messaging/repositories/message-participant/message-participant.module';
import { MessagingWorkspaceMemberListener } from 'src/modules/messaging/listeners/messaging-workspace-member.listener';
import { MessagingMessageChannelListener } from 'src/modules/messaging/listeners/messaging-message-channel.listener';
import { MessageService } from 'src/modules/messaging/repositories/message/message.service';
import { WorkspaceMemberModule } from 'src/modules/workspace-member/repositories/workspace-member/workspace-member.module';
import { FeatureFlagEntity } from 'src/engine/modules/feature-flag/feature-flag.entity';
import { CreateCompaniesAndContactsModule } from 'src/modules/connected-account/auto-companies-and-contacts-creation/create-company-and-contact/create-company-and-contact.module';
import { CompanyModule } from 'src/modules/messaging/repositories/company/company.module';
import { PersonModule } from 'src/modules/person/repositories/person/person.module';
import { SaveMessagesAndCreateContactsService } from 'src/modules/messaging/services/save-messages-and-create-contacts.service';
import { MessagingConnectedAccountListener } from 'src/modules/messaging/listeners/messaging-connected-account.listener';
import { BlocklistModule } from 'src/modules/connected-account/repositories/blocklist/blocklist.module';
import { FetchByBatchesService } from 'src/modules/messaging/services/fetch-by-batch.service';
@Module({
imports: [
EnvironmentModule,
WorkspaceDataSourceModule,
ConnectedAccountModule,
MessageChannelModule,
MessageChannelMessageAssociationModule,
MessageModule,
MessageThreadModule,
MessageParticipantModule,
CreateCompaniesAndContactsModule,
WorkspaceMemberModule,
TypeOrmModule.forFeature([FeatureFlagEntity], 'core'),
CompanyModule,
PersonModule,
BlocklistModule,
HttpModule.register({
baseURL: 'https://www.googleapis.com/batch/gmail/v1',
}),
],
providers: [
GmailFullSyncService,
GmailPartialSyncService,
FetchMessagesByBatchesService,
GoogleAPIsRefreshAccessTokenService,
GmailClientProvider,
CreateContactService,
CreateCompanyService,
MessagingPersonListener,
MessagingWorkspaceMemberListener,
MessagingMessageChannelListener,
MessageService,
SaveMessagesAndCreateContactsService,
MessagingConnectedAccountListener,
FetchByBatchesService,
],
exports: [
GmailPartialSyncService,
GmailFullSyncService,
GoogleAPIsRefreshAccessTokenService,
FetchByBatchesService,
],
})
export class MessagingModule {}

View File

@ -0,0 +1,94 @@
import {
BadRequestException,
ForbiddenException,
Injectable,
NotFoundException,
} from '@nestjs/common';
import { groupBy } from 'lodash';
import { WorkspacePreQueryHook } from 'src/engine/api/graphql/workspace-query-runner/workspace-pre-query-hook/interfaces/workspace-pre-query-hook.interface';
import { FindManyResolverArgs } from 'src/engine/api/graphql/workspace-resolver-builder/interfaces/workspace-resolvers-builder.interface';
import { MessageChannelMessageAssociationService } from 'src/modules/messaging/repositories/message-channel-message-association/message-channel-message-association.service';
import { MessageChannelService } from 'src/modules/messaging/repositories/message-channel/message-channel.service';
import { ConnectedAccountService } from 'src/modules/connected-account/repositories/connected-account/connected-account.service';
import { WorkspaceMemberService } from 'src/modules/workspace-member/repositories/workspace-member/workspace-member.service';
@Injectable()
export class MessageFindManyPreQueryHook implements WorkspacePreQueryHook {
constructor(
private readonly messageChannelMessageAssociationService: MessageChannelMessageAssociationService,
private readonly messageChannelService: MessageChannelService,
private readonly connectedAccountService: ConnectedAccountService,
private readonly workspaceMemberService: WorkspaceMemberService,
) {}
async execute(
userId: string,
workspaceId: string,
payload: FindManyResolverArgs,
): Promise<void> {
if (!payload?.filter?.messageThreadId?.eq) {
throw new BadRequestException('messageThreadId filter is required');
}
const messageChannelMessageAssociations =
await this.messageChannelMessageAssociationService.getByMessageThreadId(
payload?.filter?.messageThreadId?.eq,
workspaceId,
);
if (messageChannelMessageAssociations.length === 0) {
throw new NotFoundException();
}
await this.canAccessMessageThread(
userId,
workspaceId,
messageChannelMessageAssociations,
);
}
private async canAccessMessageThread(
userId: string,
workspaceId: string,
messageChannelMessageAssociations: any[],
) {
const messageChannels = await this.messageChannelService.getByIds(
messageChannelMessageAssociations.map(
(association) => association.messageChannelId,
),
workspaceId,
);
const messageChannelsGroupByVisibility = groupBy(
messageChannels,
(channel) => channel.visibility,
);
if (messageChannelsGroupByVisibility.share_everything) {
return;
}
const currentWorkspaceMember =
await this.workspaceMemberService.getByIdOrFail(userId, workspaceId);
const messageChannelsConnectedAccounts =
await this.connectedAccountService.getByIds(
messageChannels.map((channel) => channel.connectedAccountId),
workspaceId,
);
const messageChannelsWorkspaceMemberIds =
messageChannelsConnectedAccounts.map(
(connectedAccount) => connectedAccount.accountOwnerId,
);
if (messageChannelsWorkspaceMemberIds.includes(currentWorkspaceMember.id)) {
return;
}
throw new ForbiddenException();
}
}

View File

@ -0,0 +1,16 @@
/* eslint-disable @typescript-eslint/no-unused-vars */
import { BadRequestException, Injectable } from '@nestjs/common';
import { WorkspacePreQueryHook } from 'src/engine/api/graphql/workspace-query-runner/workspace-pre-query-hook/interfaces/workspace-pre-query-hook.interface';
import { FindOneResolverArgs } from 'src/engine/api/graphql/workspace-resolver-builder/interfaces/workspace-resolvers-builder.interface';
@Injectable()
export class MessageFindOnePreQueryHook implements WorkspacePreQueryHook {
async execute(
_userId: string,
_workspaceId: string,
_payload: FindOneResolverArgs,
): Promise<void> {
throw new BadRequestException('Method not implemented.');
}
}

View File

@ -0,0 +1,28 @@
import { Module } from '@nestjs/common';
import { MessageFindManyPreQueryHook } from 'src/modules/messaging/query-hooks/message/message-find-many.pre-query.hook';
import { MessageFindOnePreQueryHook } from 'src/modules/messaging/query-hooks/message/message-find-one.pre-query-hook';
import { ConnectedAccountModule } from 'src/modules/connected-account/repositories/connected-account/connected-account.module';
import { MessageChannelMessageAssociationModule } from 'src/modules/messaging/repositories/message-channel-message-association/message-channel-message-assocation.module';
import { MessageChannelModule } from 'src/modules/messaging/repositories/message-channel/message-channel.module';
import { WorkspaceMemberModule } from 'src/modules/workspace-member/repositories/workspace-member/workspace-member.module';
@Module({
imports: [
MessageChannelMessageAssociationModule,
MessageChannelModule,
ConnectedAccountModule,
WorkspaceMemberModule,
],
providers: [
{
provide: MessageFindOnePreQueryHook.name,
useClass: MessageFindOnePreQueryHook,
},
{
provide: MessageFindManyPreQueryHook.name,
useClass: MessageFindManyPreQueryHook,
},
],
})
export class MessagingQueryHookModule {}

View File

@ -0,0 +1,12 @@
import { Module } from '@nestjs/common';
import { CompanyService } from 'src/modules/messaging/repositories/company/company.service';
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
// TODO: Move outside of the messaging module
@Module({
imports: [WorkspaceDataSourceModule],
providers: [CompanyService],
exports: [CompanyService],
})
export class CompanyModule {}

View File

@ -0,0 +1,61 @@
import { Injectable } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
export type CompanyToCreate = {
id: string;
domainName: string;
name?: string;
city?: string;
};
// TODO: Move outside of the messaging module
@Injectable()
export class CompanyService {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
) {}
public async getExistingCompaniesByDomainNames(
domainNames: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<{ id: string; domainName: string }[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const existingCompanies =
await this.workspaceDataSourceService.executeRawQuery(
`SELECT id, "domainName" FROM ${dataSourceSchema}.company WHERE "domainName" = ANY($1)`,
[domainNames],
workspaceId,
transactionManager,
);
return existingCompanies;
}
public async createCompany(
workspaceId: string,
companyToCreate: CompanyToCreate,
transactionManager?: EntityManager,
): Promise<void> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`INSERT INTO ${dataSourceSchema}.company (id, "domainName", name, address)
VALUES ($1, $2, $3, $4)`,
[
companyToCreate.id,
companyToCreate.domainName,
companyToCreate.name ?? '',
companyToCreate.city ?? '',
],
workspaceId,
transactionManager,
);
}
}

View File

@ -0,0 +1,11 @@
import { Module } from '@nestjs/common';
import { MessageChannelMessageAssociationService } from 'src/modules/messaging/repositories/message-channel-message-association/message-channel-message-association.service';
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
@Module({
imports: [WorkspaceDataSourceModule],
providers: [MessageChannelMessageAssociationService],
exports: [MessageChannelMessageAssociationService],
})
export class MessageChannelMessageAssociationModule {}

View File

@ -0,0 +1,195 @@
import { Injectable } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import { MessageChannelMessageAssociationObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel-message-association.object-metadata';
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
@Injectable()
export class MessageChannelMessageAssociationService {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
) {}
public async getByMessageExternalIdsAndMessageChannelId(
messageExternalIds: string[],
messageChannelId: string,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<MessageChannelMessageAssociationObjectMetadata>[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
return await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."messageChannelMessageAssociation"
WHERE "messageExternalId" = ANY($1) AND "messageChannelId" = $2`,
[messageExternalIds, messageChannelId],
workspaceId,
transactionManager,
);
}
public async countByMessageExternalIdsAndMessageChannelId(
messageExternalIds: string[],
messageChannelId: string,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<number> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const result = await this.workspaceDataSourceService.executeRawQuery(
`SELECT COUNT(*) FROM ${dataSourceSchema}."messageChannelMessageAssociation"
WHERE "messageExternalId" = ANY($1) AND "messageChannelId" = $2`,
[messageExternalIds, messageChannelId],
workspaceId,
transactionManager,
);
return result[0]?.count;
}
public async deleteByMessageExternalIdsAndMessageChannelId(
messageExternalIds: string[],
messageChannelId: string,
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`DELETE FROM ${dataSourceSchema}."messageChannelMessageAssociation" WHERE "messageExternalId" = ANY($1) AND "messageChannelId" = $2`,
[messageExternalIds, messageChannelId],
workspaceId,
transactionManager,
);
}
public async getByMessageChannelIds(
messageChannelIds: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<MessageChannelMessageAssociationObjectMetadata>[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
return await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."messageChannelMessageAssociation"
WHERE "messageChannelId" = ANY($1)`,
[messageChannelIds],
workspaceId,
transactionManager,
);
}
public async deleteByMessageChannelIds(
messageChannelIds: string[],
workspaceId: string,
transactionManager?: EntityManager,
) {
if (messageChannelIds.length === 0) {
return;
}
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`DELETE FROM ${dataSourceSchema}."messageChannelMessageAssociation" WHERE "messageChannelId" = ANY($1)`,
[messageChannelIds],
workspaceId,
transactionManager,
);
}
public async deleteByIds(
ids: string[],
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`DELETE FROM ${dataSourceSchema}."messageChannelMessageAssociation" WHERE "id" = ANY($1)`,
[ids],
workspaceId,
transactionManager,
);
}
public async getByMessageThreadExternalIds(
messageThreadExternalIds: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<MessageChannelMessageAssociationObjectMetadata>[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
return await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."messageChannelMessageAssociation"
WHERE "messageThreadExternalId" = ANY($1)`,
[messageThreadExternalIds],
workspaceId,
transactionManager,
);
}
public async getFirstByMessageThreadExternalId(
messageThreadExternalId: string,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<MessageChannelMessageAssociationObjectMetadata> | null> {
const existingMessageChannelMessageAssociations =
await this.getByMessageThreadExternalIds(
[messageThreadExternalId],
workspaceId,
transactionManager,
);
if (
!existingMessageChannelMessageAssociations ||
existingMessageChannelMessageAssociations.length === 0
) {
return null;
}
return existingMessageChannelMessageAssociations[0];
}
public async getByMessageIds(
messageIds: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<MessageChannelMessageAssociationObjectMetadata>[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
return await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."messageChannelMessageAssociation"
WHERE "messageId" = ANY($1)`,
[messageIds],
workspaceId,
transactionManager,
);
}
public async getByMessageThreadId(
messageThreadId: string,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<MessageChannelMessageAssociationObjectMetadata>[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
return await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."messageChannelMessageAssociation"
WHERE "messageThreadId" = $1`,
[messageThreadId],
workspaceId,
transactionManager,
);
}
}

View File

@ -0,0 +1,11 @@
import { Module } from '@nestjs/common';
import { MessageChannelService } from 'src/modules/messaging/repositories/message-channel/message-channel.service';
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
@Module({
imports: [WorkspaceDataSourceModule],
providers: [MessageChannelService],
exports: [MessageChannelService],
})
export class MessageChannelModule {}

View File

@ -0,0 +1,88 @@
import { Injectable } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel.object-metadata';
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
@Injectable()
export class MessageChannelService {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
) {}
public async getByConnectedAccountId(
connectedAccountId: string,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<MessageChannelObjectMetadata>[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
return await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."messageChannel" WHERE "connectedAccountId" = $1 AND "type" = 'email' LIMIT 1`,
[connectedAccountId],
workspaceId,
transactionManager,
);
}
public async getFirstByConnectedAccountIdOrFail(
connectedAccountId: string,
workspaceId: string,
): Promise<ObjectRecord<MessageChannelObjectMetadata>> {
const messageChannel = await this.getFirstByConnectedAccountId(
connectedAccountId,
workspaceId,
);
if (!messageChannel) {
throw new Error(
`Message channel for connected account ${connectedAccountId} not found in workspace ${workspaceId}`,
);
}
return messageChannel;
}
public async getFirstByConnectedAccountId(
connectedAccountId: string,
workspaceId: string,
): Promise<ObjectRecord<MessageChannelObjectMetadata> | undefined> {
const messageChannels = await this.getByConnectedAccountId(
connectedAccountId,
workspaceId,
);
return messageChannels[0];
}
public async getIsContactAutoCreationEnabledByConnectedAccountIdOrFail(
connectedAccountId: string,
workspaceId: string,
): Promise<boolean> {
const messageChannel = await this.getFirstByConnectedAccountIdOrFail(
connectedAccountId,
workspaceId,
);
return messageChannel.isContactAutoCreationEnabled;
}
public async getByIds(
ids: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<MessageChannelObjectMetadata>[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
return await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."messageChannel" WHERE "id" = ANY($1)`,
[ids],
workspaceId,
transactionManager,
);
}
}

View File

@ -0,0 +1,12 @@
import { Module } from '@nestjs/common';
import { MessageParticipantService } from 'src/modules/messaging/repositories/message-participant/message-participant.service';
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
import { PersonModule } from 'src/modules/person/repositories/person/person.module';
@Module({
imports: [WorkspaceDataSourceModule, PersonModule],
providers: [MessageParticipantService],
exports: [MessageParticipantService],
})
export class MessageParticipantModule {}

View File

@ -0,0 +1,239 @@
import { Injectable } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import { MessageParticipantObjectMetadata } from 'src/modules/messaging/standard-objects/message-participant.object-metadata';
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
import {
ParticipantWithId,
ParticipantWithMessageId,
} from 'src/modules/messaging/types/gmail-message';
import { PersonService } from 'src/modules/person/repositories/person/person.service';
@Injectable()
export class MessageParticipantService {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
private readonly personService: PersonService,
) {}
public async getByHandles(
handles: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<MessageParticipantObjectMetadata>[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
return await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."messageParticipant" WHERE "handle" = ANY($1)`,
[handles],
workspaceId,
transactionManager,
);
}
public async updateParticipantsPersonId(
participantIds: string[],
personId: string,
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageParticipant" SET "personId" = $1 WHERE "id" = ANY($2)`,
[personId, participantIds],
workspaceId,
transactionManager,
);
}
public async updateParticipantsWorkspaceMemberId(
participantIds: string[],
workspaceMemberId: string,
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageParticipant" SET "workspaceMemberId" = $1 WHERE "id" = ANY($2)`,
[workspaceMemberId, participantIds],
workspaceId,
transactionManager,
);
}
public async getByMessageChannelIdWithoutPersonIdAndWorkspaceMemberIdAndMessageOutgoing(
messageChannelId: string,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ParticipantWithId[]> {
if (!messageChannelId || !workspaceId) {
return [];
}
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const messageParticipants: ParticipantWithId[] =
await this.workspaceDataSourceService.executeRawQuery(
`SELECT "messageParticipant".id,
"messageParticipant"."role",
"messageParticipant"."handle",
"messageParticipant"."displayName",
"messageParticipant"."personId",
"messageParticipant"."workspaceMemberId",
"messageParticipant"."messageId"
FROM ${dataSourceSchema}."messageParticipant" "messageParticipant"
LEFT JOIN ${dataSourceSchema}."message" ON "messageParticipant"."messageId" = ${dataSourceSchema}."message"."id"
LEFT JOIN ${dataSourceSchema}."messageChannelMessageAssociation" ON ${dataSourceSchema}."messageChannelMessageAssociation"."messageId" = ${dataSourceSchema}."message"."id"
WHERE ${dataSourceSchema}."messageChannelMessageAssociation"."messageChannelId" = $1
AND "messageParticipant"."personId" IS NULL
AND "messageParticipant"."workspaceMemberId" IS NULL
AND ${dataSourceSchema}."message"."direction" = 'outgoing'`,
[messageChannelId],
workspaceId,
transactionManager,
);
return messageParticipants;
}
public async getByHandlesWithoutPersonIdAndWorkspaceMemberId(
handles: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ParticipantWithId[]> {
if (!workspaceId) {
throw new Error('WorkspaceId is required');
}
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const messageParticipants: ParticipantWithId[] =
await this.workspaceDataSourceService.executeRawQuery(
`SELECT "messageParticipant".id,
"messageParticipant"."role",
"messageParticipant"."handle",
"messageParticipant"."displayName",
"messageParticipant"."personId",
"messageParticipant"."workspaceMemberId",
"messageParticipant"."messageId"
FROM ${dataSourceSchema}."messageParticipant" "messageParticipant"
WHERE "messageParticipant"."personId" IS NULL
AND "messageParticipant"."workspaceMemberId" IS NULL
AND "messageParticipant"."handle" = ANY($1)`,
[handles],
workspaceId,
transactionManager,
);
return messageParticipants;
}
public async saveMessageParticipants(
participants: ParticipantWithMessageId[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
if (!participants) return;
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const handles = participants.map((participant) => participant.handle);
const participantPersonIds =
await this.workspaceDataSourceService.executeRawQuery(
`SELECT id, email FROM ${dataSourceSchema}."person" WHERE "email" = ANY($1)`,
[handles],
workspaceId,
transactionManager,
);
const participantWorkspaceMemberIds =
await this.workspaceDataSourceService.executeRawQuery(
`SELECT "workspaceMember"."id", "connectedAccount"."handle" AS email FROM ${dataSourceSchema}."workspaceMember"
JOIN ${dataSourceSchema}."connectedAccount" ON ${dataSourceSchema}."workspaceMember"."id" = ${dataSourceSchema}."connectedAccount"."accountOwnerId"
WHERE ${dataSourceSchema}."connectedAccount"."handle" = ANY($1)`,
[handles],
workspaceId,
transactionManager,
);
const messageParticipantsToSave = participants.map((participant) => [
participant.messageId,
participant.role,
participant.handle,
participant.displayName,
participantPersonIds.find((e) => e.email === participant.handle)?.id,
participantWorkspaceMemberIds.find((e) => e.email === participant.handle)
?.id,
]);
const valuesString = messageParticipantsToSave
.map(
(_, index) =>
`($${index * 6 + 1}, $${index * 6 + 2}, $${index * 6 + 3}, $${
index * 6 + 4
}, $${index * 6 + 5}, $${index * 6 + 6})`,
)
.join(', ');
if (messageParticipantsToSave.length === 0) return;
await this.workspaceDataSourceService.executeRawQuery(
`INSERT INTO ${dataSourceSchema}."messageParticipant" ("messageId", "role", "handle", "displayName", "personId", "workspaceMemberId") VALUES ${valuesString}`,
messageParticipantsToSave.flat(),
workspaceId,
transactionManager,
);
}
public async updateMessageParticipantsAfterPeopleCreation(
participants: ParticipantWithId[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
if (!participants) return;
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const handles = participants.map((participant) => participant.handle);
const participantPersonIds = await this.personService.getByEmails(
handles,
workspaceId,
transactionManager,
);
const messageParticipantsToUpdate = participants.map((participant) => [
participant.id,
participantPersonIds.find(
(e: { id: string; email: string }) => e.email === participant.handle,
)?.id,
]);
if (messageParticipantsToUpdate.length === 0) return;
const valuesString = messageParticipantsToUpdate
.map((_, index) => `($${index * 2 + 1}::uuid, $${index * 2 + 2}::uuid)`)
.join(', ');
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageParticipant" AS "messageParticipant" SET "personId" = "data"."personId"
FROM (VALUES ${valuesString}) AS "data"("id", "personId")
WHERE "messageParticipant"."id" = "data"."id"`,
messageParticipantsToUpdate.flat(),
workspaceId,
transactionManager,
);
}
}

View File

@ -0,0 +1,17 @@
import { Module, forwardRef } from '@nestjs/common';
import { MessageChannelMessageAssociationModule } from 'src/modules/messaging/repositories/message-channel-message-association/message-channel-message-assocation.module';
import { MessageThreadService } from 'src/modules/messaging/repositories/message-thread/message-thread.service';
import { MessageModule } from 'src/modules/messaging/repositories/message/message.module';
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
@Module({
imports: [
WorkspaceDataSourceModule,
MessageChannelMessageAssociationModule,
forwardRef(() => MessageModule),
],
providers: [MessageThreadService],
exports: [MessageThreadService],
})
export class MessageThreadModule {}

View File

@ -0,0 +1,105 @@
import { Inject, Injectable, forwardRef } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { v4 } from 'uuid';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import { DataSourceEntity } from 'src/engine-metadata/data-source/data-source.entity';
import { MessageChannelMessageAssociationService } from 'src/modules/messaging/repositories/message-channel-message-association/message-channel-message-association.service';
import { MessageService } from 'src/modules/messaging/repositories/message/message.service';
@Injectable()
export class MessageThreadService {
constructor(
private readonly messageChannelMessageAssociationService: MessageChannelMessageAssociationService,
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
@Inject(forwardRef(() => MessageService))
private readonly messageService: MessageService,
) {}
public async getOrphanThreadIdsPaginated(
limit: number,
offset: number,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<string[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const orphanThreads = await this.workspaceDataSourceService.executeRawQuery(
`SELECT mt.id
FROM ${dataSourceSchema}."messageThread" mt
LEFT JOIN ${dataSourceSchema}."message" m ON mt.id = m."messageThreadId"
WHERE m."messageThreadId" IS NULL
LIMIT $1 OFFSET $2`,
[limit, offset],
workspaceId,
transactionManager,
);
return orphanThreads.map(({ id }) => id);
}
public async deleteByIds(
messageThreadIds: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`DELETE FROM ${dataSourceSchema}."messageThread" WHERE id = ANY($1)`,
[messageThreadIds],
workspaceId,
transactionManager,
);
}
public async saveMessageThreadOrReturnExistingMessageThread(
headerMessageId: string,
messageThreadExternalId: string,
dataSourceMetadata: DataSourceEntity,
workspaceId: string,
manager: EntityManager,
) {
// Check if message thread already exists via threadExternalId
const existingMessageChannelMessageAssociationByMessageThreadExternalId =
await this.messageChannelMessageAssociationService.getFirstByMessageThreadExternalId(
messageThreadExternalId,
workspaceId,
manager,
);
const existingMessageThread =
existingMessageChannelMessageAssociationByMessageThreadExternalId?.messageThreadId;
if (existingMessageThread) {
return Promise.resolve(existingMessageThread);
}
// Check if message thread already exists via existing message headerMessageId
const existingMessageWithSameHeaderMessageId =
await this.messageService.getFirstOrNullByHeaderMessageId(
headerMessageId,
workspaceId,
manager,
);
if (existingMessageWithSameHeaderMessageId) {
return Promise.resolve(
existingMessageWithSameHeaderMessageId.messageThreadId,
);
}
// If message thread does not exist, create new message thread
const newMessageThreadId = v4();
await manager.query(
`INSERT INTO ${dataSourceMetadata.schema}."messageThread" ("id") VALUES ($1)`,
[newMessageThreadId],
);
return Promise.resolve(newMessageThreadId);
}
}

View File

@ -0,0 +1,23 @@
import { Module, forwardRef } from '@nestjs/common';
import { MessageChannelModule } from 'src/modules/messaging/repositories/message-channel/message-channel.module';
import { MessageChannelMessageAssociationModule } from 'src/modules/messaging/repositories/message-channel-message-association/message-channel-message-assocation.module';
import { MessageParticipantModule } from 'src/modules/messaging/repositories/message-participant/message-participant.module';
import { MessageThreadModule } from 'src/modules/messaging/repositories/message-thread/message-thread.module';
import { MessageService } from 'src/modules/messaging/repositories/message/message.service';
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
import { CreateCompaniesAndContactsModule } from 'src/modules/connected-account/auto-companies-and-contacts-creation/create-company-and-contact/create-company-and-contact.module';
@Module({
imports: [
WorkspaceDataSourceModule,
forwardRef(() => MessageThreadModule),
MessageParticipantModule,
MessageChannelMessageAssociationModule,
MessageChannelModule,
CreateCompaniesAndContactsModule,
],
providers: [MessageService],
exports: [MessageService],
})
export class MessageModule {}

View File

@ -0,0 +1,346 @@
import { Inject, Injectable, Logger, forwardRef } from '@nestjs/common';
import { DataSource, EntityManager } from 'typeorm';
import { v4 } from 'uuid';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import { MessageObjectMetadata } from 'src/modules/messaging/standard-objects/message.object-metadata';
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
import { DataSourceEntity } from 'src/engine-metadata/data-source/data-source.entity';
import { GmailMessage } from 'src/modules/messaging/types/gmail-message';
import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata';
import { MessageChannelMessageAssociationService } from 'src/modules/messaging/repositories/message-channel-message-association/message-channel-message-association.service';
import { MessageThreadService } from 'src/modules/messaging/repositories/message-thread/message-thread.service';
import { MessageChannelService } from 'src/modules/messaging/repositories/message-channel/message-channel.service';
@Injectable()
export class MessageService {
private readonly logger = new Logger(MessageService.name);
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
private readonly messageChannelMessageAssociationService: MessageChannelMessageAssociationService,
@Inject(forwardRef(() => MessageThreadService))
private readonly messageThreadService: MessageThreadService,
private readonly messageChannelService: MessageChannelService,
) {}
public async getNonAssociatedMessageIdsPaginated(
limit: number,
offset: number,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<string[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const nonAssociatedMessages =
await this.workspaceDataSourceService.executeRawQuery(
`SELECT m.id FROM ${dataSourceSchema}."message" m
LEFT JOIN ${dataSourceSchema}."messageChannelMessageAssociation" mcma
ON m.id = mcma."messageId"
WHERE mcma.id IS NULL
LIMIT $1 OFFSET $2`,
[limit, offset],
workspaceId,
transactionManager,
);
return nonAssociatedMessages.map(({ id }) => id);
}
public async getFirstOrNullByHeaderMessageId(
headerMessageId: string,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<MessageObjectMetadata> | null> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const messages = await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."message" WHERE "headerMessageId" = $1 LIMIT 1`,
[headerMessageId],
workspaceId,
transactionManager,
);
if (!messages || messages.length === 0) {
return null;
}
return messages[0];
}
public async getByIds(
messageIds: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<MessageObjectMetadata>[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
return await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."message" WHERE "id" = ANY($1)`,
[messageIds],
workspaceId,
transactionManager,
);
}
public async deleteByIds(
messageIds: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`DELETE FROM ${dataSourceSchema}."message" WHERE "id" = ANY($1)`,
[messageIds],
workspaceId,
transactionManager,
);
}
public async getByMessageThreadIds(
messageThreadIds: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<MessageObjectMetadata>[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
return await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."message" WHERE "messageThreadId" = ANY($1)`,
[messageThreadIds],
workspaceId,
transactionManager,
);
}
public async saveMessages(
messages: GmailMessage[],
dataSourceMetadata: DataSourceEntity,
workspaceDataSource: DataSource,
connectedAccount: ObjectRecord<ConnectedAccountObjectMetadata>,
gmailMessageChannelId: string,
workspaceId: string,
): Promise<Map<string, string>> {
const messageExternalIdsAndIdsMap = new Map<string, string>();
try {
let keepImporting = true;
for (const message of messages) {
if (!keepImporting) {
break;
}
await workspaceDataSource?.transaction(
async (manager: EntityManager) => {
const gmailMessageChannel =
await this.messageChannelService.getByIds(
[gmailMessageChannelId],
workspaceId,
manager,
);
if (gmailMessageChannel.length === 0) {
this.logger.error(
`No message channel found for connected account ${connectedAccount.id} in workspace ${workspaceId} in saveMessages`,
);
keepImporting = false;
return;
}
const existingMessageChannelMessageAssociationsCount =
await this.messageChannelMessageAssociationService.countByMessageExternalIdsAndMessageChannelId(
[message.externalId],
gmailMessageChannelId,
workspaceId,
manager,
);
if (existingMessageChannelMessageAssociationsCount > 0) {
return;
}
// TODO: This does not handle all thread merging use cases and might create orphan threads.
const savedOrExistingMessageThreadId =
await this.messageThreadService.saveMessageThreadOrReturnExistingMessageThread(
message.headerMessageId,
message.messageThreadExternalId,
dataSourceMetadata,
workspaceId,
manager,
);
const savedOrExistingMessageId =
await this.saveMessageOrReturnExistingMessage(
message,
savedOrExistingMessageThreadId,
connectedAccount,
dataSourceMetadata,
workspaceId,
manager,
);
messageExternalIdsAndIdsMap.set(
message.externalId,
savedOrExistingMessageId,
);
await manager.query(
`INSERT INTO ${dataSourceMetadata.schema}."messageChannelMessageAssociation" ("messageChannelId", "messageId", "messageExternalId", "messageThreadId", "messageThreadExternalId") VALUES ($1, $2, $3, $4, $5)`,
[
gmailMessageChannelId,
savedOrExistingMessageId,
message.externalId,
savedOrExistingMessageThreadId,
message.messageThreadExternalId,
],
);
},
);
}
} catch (error) {
throw new Error(
`Error saving connected account ${connectedAccount.id} messages to workspace ${workspaceId}: ${error.message}`,
);
}
return messageExternalIdsAndIdsMap;
}
private async saveMessageOrReturnExistingMessage(
message: GmailMessage,
messageThreadId: string,
connectedAccount: ObjectRecord<ConnectedAccountObjectMetadata>,
dataSourceMetadata: DataSourceEntity,
workspaceId: string,
manager: EntityManager,
): Promise<string> {
const existingMessage = await this.getFirstOrNullByHeaderMessageId(
message.headerMessageId,
workspaceId,
);
const existingMessageId = existingMessage?.id;
if (existingMessageId) {
return Promise.resolve(existingMessageId);
}
const newMessageId = v4();
const messageDirection =
connectedAccount.handle === message.fromHandle ? 'outgoing' : 'incoming';
const receivedAt = new Date(parseInt(message.internalDate));
await manager.query(
`INSERT INTO ${dataSourceMetadata.schema}."message" ("id", "headerMessageId", "subject", "receivedAt", "direction", "messageThreadId", "text") VALUES ($1, $2, $3, $4, $5, $6, $7)`,
[
newMessageId,
message.headerMessageId,
message.subject,
receivedAt,
messageDirection,
messageThreadId,
message.text,
],
);
return Promise.resolve(newMessageId);
}
public async deleteMessages(
messagesDeletedMessageExternalIds: string[],
gmailMessageChannelId: string,
workspaceId: string,
) {
const workspaceDataSource =
await this.workspaceDataSourceService.connectToWorkspaceDataSource(
workspaceId,
);
await workspaceDataSource?.transaction(async (manager: EntityManager) => {
const messageChannelMessageAssociationsToDelete =
await this.messageChannelMessageAssociationService.getByMessageExternalIdsAndMessageChannelId(
messagesDeletedMessageExternalIds,
gmailMessageChannelId,
workspaceId,
manager,
);
const messageChannelMessageAssociationIdsToDeleteIds =
messageChannelMessageAssociationsToDelete.map(
(messageChannelMessageAssociationToDelete) =>
messageChannelMessageAssociationToDelete.id,
);
await this.messageChannelMessageAssociationService.deleteByIds(
messageChannelMessageAssociationIdsToDeleteIds,
workspaceId,
manager,
);
const messageIdsFromMessageChannelMessageAssociationsToDelete =
messageChannelMessageAssociationsToDelete.map(
(messageChannelMessageAssociationToDelete) =>
messageChannelMessageAssociationToDelete.messageId,
);
const messageChannelMessageAssociationByMessageIds =
await this.messageChannelMessageAssociationService.getByMessageIds(
messageIdsFromMessageChannelMessageAssociationsToDelete,
workspaceId,
manager,
);
const messageIdsFromMessageChannelMessageAssociationByMessageIds =
messageChannelMessageAssociationByMessageIds.map(
(messageChannelMessageAssociation) =>
messageChannelMessageAssociation.messageId,
);
const messageIdsToDelete =
messageIdsFromMessageChannelMessageAssociationsToDelete.filter(
(messageId) =>
!messageIdsFromMessageChannelMessageAssociationByMessageIds.includes(
messageId,
),
);
await this.deleteByIds(messageIdsToDelete, workspaceId, manager);
const messageThreadIdsFromMessageChannelMessageAssociationsToDelete =
messageChannelMessageAssociationsToDelete.map(
(messageChannelMessageAssociationToDelete) =>
messageChannelMessageAssociationToDelete.messageThreadId,
);
const messagesByThreadIds = await this.getByMessageThreadIds(
messageThreadIdsFromMessageChannelMessageAssociationsToDelete,
workspaceId,
manager,
);
const threadIdsToDelete =
messageThreadIdsFromMessageChannelMessageAssociationsToDelete.filter(
(threadId) =>
!messagesByThreadIds.find(
(message) => message.messageThreadId === threadId,
),
);
await this.messageThreadService.deleteByIds(
threadIdsToDelete,
workspaceId,
manager,
);
});
}
}

View File

@ -0,0 +1,128 @@
import { HttpService } from '@nestjs/axios';
import { Injectable } from '@nestjs/common';
import { AxiosResponse } from 'axios';
import { BatchQueries } from 'src/modules/messaging/types/batch-queries';
import { GmailMessageParsedResponse } from 'src/modules/messaging/types/gmail-message-parsed-response';
@Injectable()
export class FetchByBatchesService {
constructor(private readonly httpService: HttpService) {}
async fetchAllByBatches(
queries: BatchQueries,
accessToken: string,
boundary: string,
): Promise<AxiosResponse<any, any>[]> {
const batchLimit = 50;
let batchOffset = 0;
let batchResponses: AxiosResponse<any, any>[] = [];
while (batchOffset < queries.length) {
const batchResponse = await this.fetchBatch(
queries,
accessToken,
batchOffset,
batchLimit,
boundary,
);
batchResponses = batchResponses.concat(batchResponse);
batchOffset += batchLimit;
}
return batchResponses;
}
async fetchBatch(
queries: BatchQueries,
accessToken: string,
batchOffset: number,
batchLimit: number,
boundary: string,
): Promise<AxiosResponse<any, any>> {
const limitedQueries = queries.slice(batchOffset, batchOffset + batchLimit);
const response = await this.httpService.axiosRef.post(
'/',
this.createBatchBody(limitedQueries, boundary),
{
headers: {
'Content-Type': 'multipart/mixed; boundary=' + boundary,
Authorization: 'Bearer ' + accessToken,
},
},
);
return response;
}
createBatchBody(queries: BatchQueries, boundary: string): string {
let batchBody: string[] = [];
queries.forEach(function (call) {
const method = 'GET';
const uri = call.uri;
batchBody = batchBody.concat([
'--',
boundary,
'\r\n',
'Content-Type: application/http',
'\r\n\r\n',
method,
' ',
uri,
'\r\n\r\n',
]);
});
return batchBody.concat(['--', boundary, '--']).join('');
}
parseBatch(
responseCollection: AxiosResponse<any, any>,
): GmailMessageParsedResponse[] {
const responseItems: GmailMessageParsedResponse[] = [];
const boundary = this.getBatchSeparator(responseCollection);
const responseLines: string[] = responseCollection.data.split(
'--' + boundary,
);
responseLines.forEach(function (response) {
const startJson = response.indexOf('{');
const endJson = response.lastIndexOf('}');
if (startJson < 0 || endJson < 0) return;
const responseJson = response.substring(startJson, endJson + 1);
const item = JSON.parse(responseJson);
responseItems.push(item);
});
return responseItems;
}
getBatchSeparator(responseCollection: AxiosResponse<any, any>): string {
const headers = responseCollection.headers;
const contentType: string = headers['content-type'];
if (!contentType) return '';
const components = contentType.split('; ');
const boundary = components.find((item) => item.startsWith('boundary='));
return boundary?.replace('boundary=', '').trim() || '';
}
}

View File

@ -0,0 +1,157 @@
import { Injectable, Logger } from '@nestjs/common';
import { AxiosResponse } from 'axios';
import { simpleParser } from 'mailparser';
import planer from 'planer';
import { GmailMessage } from 'src/modules/messaging/types/gmail-message';
import { MessageQuery } from 'src/modules/messaging/types/message-or-thread-query';
import { GmailMessageParsedResponse } from 'src/modules/messaging/types/gmail-message-parsed-response';
import { FetchByBatchesService } from 'src/modules/messaging/services/fetch-by-batch.service';
import { formatAddressObjectAsParticipants } from 'src/modules/messaging/services/utils/format-address-object-as-participants.util';
@Injectable()
export class FetchMessagesByBatchesService {
private readonly logger = new Logger(FetchMessagesByBatchesService.name);
constructor(private readonly fetchByBatchesService: FetchByBatchesService) {}
async fetchAllMessages(
queries: MessageQuery[],
accessToken: string,
jobName?: string,
workspaceId?: string,
connectedAccountId?: string,
): Promise<{ messages: GmailMessage[]; errors: any[] }> {
let startTime = Date.now();
const batchResponses = await this.fetchByBatchesService.fetchAllByBatches(
queries,
accessToken,
'batch_gmail_messages',
);
let endTime = Date.now();
this.logger.log(
`${jobName} for workspace ${workspaceId} and account ${connectedAccountId} fetching ${
queries.length
} messages in ${endTime - startTime}ms`,
);
startTime = Date.now();
const formattedResponse =
await this.formatBatchResponsesAsGmailMessages(batchResponses);
endTime = Date.now();
this.logger.log(
`${jobName} for workspace ${workspaceId} and account ${connectedAccountId} formatting ${
queries.length
} messages in ${endTime - startTime}ms`,
);
return formattedResponse;
}
async formatBatchResponseAsGmailMessage(
responseCollection: AxiosResponse<any, any>,
): Promise<{ messages: GmailMessage[]; errors: any[] }> {
const parsedResponses = this.fetchByBatchesService.parseBatch(
responseCollection,
) as GmailMessageParsedResponse[];
const errors: any = [];
const formattedResponse = Promise.all(
parsedResponses.map(async (message: GmailMessageParsedResponse) => {
if (message.error) {
errors.push(message.error);
return;
}
const { historyId, id, threadId, internalDate, raw } = message;
const body = atob(raw?.replace(/-/g, '+').replace(/_/g, '/'));
try {
const parsed = await simpleParser(body, {
skipHtmlToText: true,
skipImageLinks: true,
skipTextToHtml: true,
maxHtmlLengthToParse: 0,
});
const { subject, messageId, from, to, cc, bcc, text, attachments } =
parsed;
if (!from) throw new Error('From value is missing');
const participants = [
...formatAddressObjectAsParticipants(from, 'from'),
...formatAddressObjectAsParticipants(to, 'to'),
...formatAddressObjectAsParticipants(cc, 'cc'),
...formatAddressObjectAsParticipants(bcc, 'bcc'),
];
let textWithoutReplyQuotations = text;
if (text)
try {
textWithoutReplyQuotations = planer.extractFrom(
text,
'text/plain',
);
} catch (error) {
console.log(
'Error while trying to remove reply quotations',
error,
);
}
const messageFromGmail: GmailMessage = {
historyId,
externalId: id,
headerMessageId: messageId || '',
subject: subject || '',
messageThreadExternalId: threadId,
internalDate,
fromHandle: from.value[0].address || '',
fromDisplayName: from.value[0].name || '',
participants,
text: textWithoutReplyQuotations || '',
attachments,
};
return messageFromGmail;
} catch (error) {
console.log('Error', error);
errors.push(error);
}
}),
);
const filteredMessages = (await formattedResponse).filter(
(message) => message,
) as GmailMessage[];
return { messages: filteredMessages, errors };
}
async formatBatchResponsesAsGmailMessages(
batchResponses: AxiosResponse<any, any>[],
): Promise<{ messages: GmailMessage[]; errors: any[] }> {
const messagesAndErrors = await Promise.all(
batchResponses.map(async (response) => {
return this.formatBatchResponseAsGmailMessage(response);
}),
);
const messages = messagesAndErrors.map((item) => item.messages).flat();
const errors = messagesAndErrors.map((item) => item.errors).flat();
return { messages, errors };
}
}

View File

@ -0,0 +1,258 @@
import { Inject, Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { FetchMessagesByBatchesService } from 'src/modules/messaging/services/fetch-messages-by-batches.service';
import { GmailClientProvider } from 'src/modules/messaging/services/providers/gmail/gmail-client.provider';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import {
GmailFullSyncJobData,
GmailFullSyncJob,
} from 'src/modules/messaging/jobs/gmail-full-sync.job';
import { ConnectedAccountService } from 'src/modules/connected-account/repositories/connected-account/connected-account.service';
import { MessageChannelService } from 'src/modules/messaging/repositories/message-channel/message-channel.service';
import { MessageChannelMessageAssociationService } from 'src/modules/messaging/repositories/message-channel-message-association/message-channel-message-association.service';
import { createQueriesFromMessageIds } from 'src/modules/messaging/utils/create-queries-from-message-ids.util';
import { gmailSearchFilterExcludeEmails } from 'src/modules/messaging/utils/gmail-search-filter.util';
import { BlocklistService } from 'src/modules/connected-account/repositories/blocklist/blocklist.service';
import { SaveMessagesAndCreateContactsService } from 'src/modules/messaging/services/save-messages-and-create-contacts.service';
import {
FeatureFlagEntity,
FeatureFlagKeys,
} from 'src/engine/modules/feature-flag/feature-flag.entity';
@Injectable()
export class GmailFullSyncService {
private readonly logger = new Logger(GmailFullSyncService.name);
constructor(
private readonly gmailClientProvider: GmailClientProvider,
private readonly fetchMessagesByBatchesService: FetchMessagesByBatchesService,
@Inject(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
private readonly connectedAccountService: ConnectedAccountService,
private readonly messageChannelService: MessageChannelService,
private readonly messageChannelMessageAssociationService: MessageChannelMessageAssociationService,
private readonly blocklistService: BlocklistService,
private readonly saveMessagesAndCreateContactsService: SaveMessagesAndCreateContactsService,
@InjectRepository(FeatureFlagEntity, 'core')
private readonly featureFlagRepository: Repository<FeatureFlagEntity>,
) {}
public async fetchConnectedAccountThreads(
workspaceId: string,
connectedAccountId: string,
nextPageToken?: string,
): Promise<void> {
const connectedAccount = await this.connectedAccountService.getById(
connectedAccountId,
workspaceId,
);
if (!connectedAccount) {
this.logger.error(
`Connected account ${connectedAccountId} not found in workspace ${workspaceId} during full-sync`,
);
return;
}
const accessToken = connectedAccount.accessToken;
const refreshToken = connectedAccount.refreshToken;
const workspaceMemberId = connectedAccount.accountOwnerId;
if (!refreshToken) {
throw new Error(
`No refresh token found for connected account ${connectedAccountId} in workspace ${workspaceId} during full-sync`,
);
}
const gmailMessageChannel =
await this.messageChannelService.getFirstByConnectedAccountId(
connectedAccountId,
workspaceId,
);
if (!gmailMessageChannel) {
this.logger.error(
`No message channel found for connected account ${connectedAccountId} in workspace ${workspaceId} during full-syn`,
);
return;
}
const gmailMessageChannelId = gmailMessageChannel.id;
const gmailClient =
await this.gmailClientProvider.getGmailClient(refreshToken);
const isBlocklistEnabledFeatureFlag =
await this.featureFlagRepository.findOneBy({
workspaceId,
key: FeatureFlagKeys.IsBlocklistEnabled,
value: true,
});
const isBlocklistEnabled =
isBlocklistEnabledFeatureFlag && isBlocklistEnabledFeatureFlag.value;
const blocklist = isBlocklistEnabled
? await this.blocklistService.getByWorkspaceMemberId(
workspaceMemberId,
workspaceId,
)
: [];
const blocklistedEmails = blocklist.map((blocklist) => blocklist.handle);
let startTime = Date.now();
const messages = await gmailClient.users.messages.list({
userId: 'me',
maxResults: 500,
pageToken: nextPageToken,
q: gmailSearchFilterExcludeEmails(blocklistedEmails),
});
let endTime = Date.now();
this.logger.log(
`gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId} getting messages list in ${
endTime - startTime
}ms.`,
);
const messagesData = messages.data.messages;
const messageExternalIds = messagesData
? messagesData.map((message) => message.id || '')
: [];
if (!messageExternalIds || messageExternalIds?.length === 0) {
this.logger.log(
`gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId} done with nothing to import.`,
);
return;
}
startTime = Date.now();
const existingMessageChannelMessageAssociations =
await this.messageChannelMessageAssociationService.getByMessageExternalIdsAndMessageChannelId(
messageExternalIds,
gmailMessageChannelId,
workspaceId,
);
endTime = Date.now();
this.logger.log(
`gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId}: getting existing message channel message associations in ${
endTime - startTime
}ms.`,
);
const existingMessageChannelMessageAssociationsExternalIds =
existingMessageChannelMessageAssociations.map(
(messageChannelMessageAssociation) =>
messageChannelMessageAssociation.messageExternalId,
);
const messagesToFetch = messageExternalIds.filter(
(messageExternalId) =>
!existingMessageChannelMessageAssociationsExternalIds.includes(
messageExternalId,
),
);
const messageQueries = createQueriesFromMessageIds(messagesToFetch);
startTime = Date.now();
const { messages: messagesToSave, errors } =
await this.fetchMessagesByBatchesService.fetchAllMessages(
messageQueries,
accessToken,
'gmail full-sync',
workspaceId,
connectedAccountId,
);
endTime = Date.now();
this.logger.log(
`gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId}: fetching all messages in ${
endTime - startTime
}ms.`,
);
if (messagesToSave.length > 0) {
await this.saveMessagesAndCreateContactsService.saveMessagesAndCreateContacts(
messagesToSave,
connectedAccount,
workspaceId,
gmailMessageChannelId,
'gmail full-sync',
);
} else {
this.logger.log(
`gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId} done with nothing to import.`,
);
}
if (errors.length) {
throw new Error(
`Error fetching messages for ${connectedAccountId} in workspace ${workspaceId} during full-sync`,
);
}
const lastModifiedMessageId = messagesToFetch[0];
const historyId = messagesToSave.find(
(message) => message.externalId === lastModifiedMessageId,
)?.historyId;
if (!historyId) {
throw new Error(
`No historyId found for ${connectedAccountId} in workspace ${workspaceId} during full-sync`,
);
}
startTime = Date.now();
await this.connectedAccountService.updateLastSyncHistoryIdIfHigher(
historyId,
connectedAccount.id,
workspaceId,
);
endTime = Date.now();
this.logger.log(
`gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId}: updating last sync history id in ${
endTime - startTime
}ms.`,
);
this.logger.log(
`gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId} ${
nextPageToken ? `and ${nextPageToken} pageToken` : ''
}done.`,
);
if (messages.data.nextPageToken) {
await this.messageQueueService.add<GmailFullSyncJobData>(
GmailFullSyncJob.name,
{
workspaceId,
connectedAccountId,
nextPageToken: messages.data.nextPageToken,
},
{
retryLimit: 2,
},
);
}
}
}

View File

@ -0,0 +1,427 @@
import { Inject, Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { gmail_v1 } from 'googleapis';
import { Repository } from 'typeorm';
import { FetchMessagesByBatchesService } from 'src/modules/messaging/services/fetch-messages-by-batches.service';
import { GmailClientProvider } from 'src/modules/messaging/services/providers/gmail/gmail-client.provider';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import {
GmailFullSyncJob,
GmailFullSyncJobData,
} from 'src/modules/messaging/jobs/gmail-full-sync.job';
import { ConnectedAccountService } from 'src/modules/connected-account/repositories/connected-account/connected-account.service';
import { MessageChannelService } from 'src/modules/messaging/repositories/message-channel/message-channel.service';
import { MessageService } from 'src/modules/messaging/repositories/message/message.service';
import { createQueriesFromMessageIds } from 'src/modules/messaging/utils/create-queries-from-message-ids.util';
import { GmailMessage } from 'src/modules/messaging/types/gmail-message';
import { isPersonEmail } from 'src/modules/messaging/utils/is-person-email.util';
import { BlocklistService } from 'src/modules/connected-account/repositories/blocklist/blocklist.service';
import { SaveMessagesAndCreateContactsService } from 'src/modules/messaging/services/save-messages-and-create-contacts.service';
import {
FeatureFlagEntity,
FeatureFlagKeys,
} from 'src/engine/modules/feature-flag/feature-flag.entity';
@Injectable()
export class GmailPartialSyncService {
private readonly logger = new Logger(GmailPartialSyncService.name);
constructor(
private readonly gmailClientProvider: GmailClientProvider,
private readonly fetchMessagesByBatchesService: FetchMessagesByBatchesService,
@Inject(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
private readonly connectedAccountService: ConnectedAccountService,
private readonly messageChannelService: MessageChannelService,
private readonly messageService: MessageService,
private readonly blocklistService: BlocklistService,
private readonly saveMessagesAndCreateContactsService: SaveMessagesAndCreateContactsService,
@InjectRepository(FeatureFlagEntity, 'core')
private readonly featureFlagRepository: Repository<FeatureFlagEntity>,
) {}
public async fetchConnectedAccountThreads(
workspaceId: string,
connectedAccountId: string,
maxResults = 500,
): Promise<void> {
const connectedAccount = await this.connectedAccountService.getById(
connectedAccountId,
workspaceId,
);
if (!connectedAccount) {
this.logger.error(
`Connected account ${connectedAccountId} not found in workspace ${workspaceId} during partial-sync`,
);
return;
}
const lastSyncHistoryId = connectedAccount.lastSyncHistoryId;
if (!lastSyncHistoryId) {
this.logger.log(
`gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId}: no lastSyncHistoryId, falling back to full sync.`,
);
await this.fallbackToFullSync(workspaceId, connectedAccountId);
return;
}
const accessToken = connectedAccount.accessToken;
const refreshToken = connectedAccount.refreshToken;
if (!refreshToken) {
throw new Error(
`No refresh token found for connected account ${connectedAccountId} in workspace ${workspaceId} during partial-sync`,
);
}
let startTime = Date.now();
const { history, historyId, error } = await this.getHistoryFromGmail(
refreshToken,
lastSyncHistoryId,
maxResults,
);
let endTime = Date.now();
this.logger.log(
`gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId} getting history in ${
endTime - startTime
}ms.`,
);
if (error && error.code === 404) {
this.logger.log(
`gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId}: invalid lastSyncHistoryId, falling back to full sync.`,
);
await this.connectedAccountService.deleteHistoryId(
connectedAccountId,
workspaceId,
);
await this.fallbackToFullSync(workspaceId, connectedAccountId);
return;
}
if (error && error.code === 429) {
this.logger.log(
`gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId}: Error 429: ${error.message}, partial sync will be retried later.`,
);
return;
}
if (error) {
throw new Error(
`Error getting history for ${connectedAccountId} in workspace ${workspaceId} during partial-sync:
${JSON.stringify(error)}`,
);
}
if (!historyId) {
throw new Error(
`No historyId found for ${connectedAccountId} in workspace ${workspaceId} during partial-sync`,
);
}
if (historyId === lastSyncHistoryId || !history?.length) {
this.logger.log(
`gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId} done with nothing to update.`,
);
return;
}
const gmailMessageChannel =
await this.messageChannelService.getFirstByConnectedAccountId(
connectedAccountId,
workspaceId,
);
if (!gmailMessageChannel) {
this.logger.error(
`No message channel found for connected account ${connectedAccountId} in workspace ${workspaceId} during partial-sync`,
);
return;
}
const gmailMessageChannelId = gmailMessageChannel.id;
const { messagesAdded, messagesDeleted } =
await this.getMessageIdsFromHistory(history);
const messageQueries = createQueriesFromMessageIds(messagesAdded);
const { messages, errors } =
await this.fetchMessagesByBatchesService.fetchAllMessages(
messageQueries,
accessToken,
'gmail partial-sync',
workspaceId,
connectedAccountId,
);
const isBlocklistEnabledFeatureFlag =
await this.featureFlagRepository.findOneBy({
workspaceId,
key: FeatureFlagKeys.IsBlocklistEnabled,
value: true,
});
const isBlocklistEnabled =
isBlocklistEnabledFeatureFlag && isBlocklistEnabledFeatureFlag.value;
const blocklist = isBlocklistEnabled
? await this.blocklistService.getByWorkspaceMemberId(
connectedAccount.accountOwnerId,
workspaceId,
)
: [];
const blocklistedEmails = blocklist.map((blocklist) => blocklist.handle);
const messagesToSave = messages.filter(
(message) =>
!this.shouldSkipImport(
connectedAccount.handle,
message,
blocklistedEmails,
),
);
if (messagesToSave.length !== 0) {
await this.saveMessagesAndCreateContactsService.saveMessagesAndCreateContacts(
messagesToSave,
connectedAccount,
workspaceId,
gmailMessageChannelId,
'gmail partial-sync',
);
}
if (messagesDeleted.length !== 0) {
startTime = Date.now();
await this.messageService.deleteMessages(
messagesDeleted,
gmailMessageChannelId,
workspaceId,
);
endTime = Date.now();
this.logger.log(
`gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId}: deleting messages in ${
endTime - startTime
}ms.`,
);
}
if (errors.length) {
this.logger.error(
`Error fetching messages for ${connectedAccountId} in workspace ${workspaceId} during partial-sync: ${JSON.stringify(
errors,
null,
2,
)}`,
);
const errorsCanBeIgnored = errors.every((error) => error.code === 404);
const errorsShouldBeRetried = errors.some((error) => error.code === 429);
if (errorsShouldBeRetried) {
return;
}
if (!errorsCanBeIgnored) {
throw new Error(
`Error fetching messages for ${connectedAccountId} in workspace ${workspaceId} during partial-sync`,
);
}
}
startTime = Date.now();
await this.connectedAccountService.updateLastSyncHistoryId(
historyId,
connectedAccount.id,
workspaceId,
);
endTime = Date.now();
this.logger.log(
`gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId} updating lastSyncHistoryId in ${
endTime - startTime
}ms.`,
);
this.logger.log(
`gmail partial-sync for workspace ${workspaceId} and account ${connectedAccountId} done.`,
);
}
private async getMessageIdsFromHistory(
history: gmail_v1.Schema$History[],
): Promise<{
messagesAdded: string[];
messagesDeleted: string[];
}> {
const { messagesAdded, messagesDeleted } = history.reduce(
(
acc: {
messagesAdded: string[];
messagesDeleted: string[];
},
history,
) => {
const messagesAdded = history.messagesAdded?.map(
(messageAdded) => messageAdded.message?.id || '',
);
const messagesDeleted = history.messagesDeleted?.map(
(messageDeleted) => messageDeleted.message?.id || '',
);
if (messagesAdded) acc.messagesAdded.push(...messagesAdded);
if (messagesDeleted) acc.messagesDeleted.push(...messagesDeleted);
return acc;
},
{ messagesAdded: [], messagesDeleted: [] },
);
const uniqueMessagesAdded = messagesAdded.filter(
(messageId) => !messagesDeleted.includes(messageId),
);
const uniqueMessagesDeleted = messagesDeleted.filter(
(messageId) => !messagesAdded.includes(messageId),
);
return {
messagesAdded: uniqueMessagesAdded,
messagesDeleted: uniqueMessagesDeleted,
};
}
private async getHistoryFromGmail(
refreshToken: string,
lastSyncHistoryId: string,
maxResults: number,
): Promise<{
history: gmail_v1.Schema$History[];
historyId?: string | null;
error?: {
code: number;
errors: {
domain: string;
reason: string;
message: string;
locationType?: string;
location?: string;
}[];
message: string;
};
}> {
const gmailClient =
await this.gmailClientProvider.getGmailClient(refreshToken);
const fullHistory: gmail_v1.Schema$History[] = [];
try {
const history = await gmailClient.users.history.list({
userId: 'me',
startHistoryId: lastSyncHistoryId,
historyTypes: ['messageAdded', 'messageDeleted'],
maxResults,
});
let nextPageToken = history?.data?.nextPageToken;
const historyId = history?.data?.historyId;
if (history?.data?.history) {
fullHistory.push(...history.data.history);
}
while (nextPageToken) {
const nextHistory = await gmailClient.users.history.list({
userId: 'me',
startHistoryId: lastSyncHistoryId,
historyTypes: ['messageAdded', 'messageDeleted'],
maxResults,
pageToken: nextPageToken,
});
nextPageToken = nextHistory?.data?.nextPageToken;
if (nextHistory?.data?.history) {
fullHistory.push(...nextHistory.data.history);
}
}
return { history: fullHistory, historyId };
} catch (error) {
const errorData = error?.response?.data?.error;
if (errorData) {
return { history: [], error: errorData };
}
throw error;
}
}
private async fallbackToFullSync(
workspaceId: string,
connectedAccountId: string,
) {
await this.messageQueueService.add<GmailFullSyncJobData>(
GmailFullSyncJob.name,
{ workspaceId, connectedAccountId },
{
retryLimit: 2,
},
);
}
private isHandleBlocked = (
selfHandle: string,
message: GmailMessage,
blocklistedEmails: string[],
): boolean => {
// If the message is received, check if the sender is in the blocklist
// If the message is sent, check if any of the recipients with role 'to' is in the blocklist
if (message.fromHandle === selfHandle) {
return message.participants.some(
(participant) =>
participant.role === 'to' &&
blocklistedEmails.includes(participant.handle),
);
}
return blocklistedEmails.includes(message.fromHandle);
};
private shouldSkipImport(
selfHandle: string,
message: GmailMessage,
blocklistedEmails: string[],
): boolean {
return (
!isPersonEmail(message.fromHandle) ||
this.isHandleBlocked(selfHandle, message, blocklistedEmails)
);
}
}

View File

@ -0,0 +1,40 @@
import { Injectable } from '@nestjs/common';
import { OAuth2Client } from 'google-auth-library';
import { gmail_v1, google } from 'googleapis';
import { EnvironmentService } from 'src/engine/integrations/environment/environment.service';
@Injectable()
export class GmailClientProvider {
constructor(private readonly environmentService: EnvironmentService) {}
public async getGmailClient(refreshToken: string): Promise<gmail_v1.Gmail> {
const oAuth2Client = await this.getOAuth2Client(refreshToken);
const gmailClient = google.gmail({
version: 'v1',
auth: oAuth2Client,
});
return gmailClient;
}
private async getOAuth2Client(refreshToken: string): Promise<OAuth2Client> {
const gmailClientId = this.environmentService.get('AUTH_GOOGLE_CLIENT_ID');
const gmailClientSecret = this.environmentService.get(
'AUTH_GOOGLE_CLIENT_SECRET',
);
const oAuth2Client = new google.auth.OAuth2(
gmailClientId,
gmailClientSecret,
);
oAuth2Client.setCredentials({
refresh_token: refreshToken,
});
return oAuth2Client;
}
}

View File

@ -0,0 +1,11 @@
import { Module } from '@nestjs/common';
import { EnvironmentModule } from 'src/engine/integrations/environment/environment.module';
import { GmailClientProvider } from 'src/modules/messaging/services/providers/gmail/gmail-client.provider';
@Module({
imports: [EnvironmentModule],
providers: [GmailClientProvider],
exports: [GmailClientProvider],
})
export class MessagingProvidersModule {}

View File

@ -0,0 +1,181 @@
import { Injectable, Logger } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { MessageChannelService } from 'src/modules/messaging/repositories/message-channel/message-channel.service';
import { MessageParticipantService } from 'src/modules/messaging/repositories/message-participant/message-participant.service';
import { MessageService } from 'src/modules/messaging/repositories/message/message.service';
import { CreateCompanyAndContactService } from 'src/modules/connected-account/auto-companies-and-contacts-creation/create-company-and-contact/create-company-and-contact.service';
import {
GmailMessage,
ParticipantWithMessageId,
} from 'src/modules/messaging/types/gmail-message';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata';
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
@Injectable()
export class SaveMessagesAndCreateContactsService {
private readonly logger = new Logger(
SaveMessagesAndCreateContactsService.name,
);
constructor(
private readonly messageService: MessageService,
private readonly messageChannelService: MessageChannelService,
private readonly createCompaniesAndContactsService: CreateCompanyAndContactService,
private readonly messageParticipantService: MessageParticipantService,
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
) {}
async saveMessagesAndCreateContacts(
messagesToSave: GmailMessage[],
connectedAccount: ObjectRecord<ConnectedAccountObjectMetadata>,
workspaceId: string,
gmailMessageChannelId: string,
jobName?: string,
) {
const { dataSource: workspaceDataSource, dataSourceMetadata } =
await this.workspaceDataSourceService.connectedToWorkspaceDataSourceAndReturnMetadata(
workspaceId,
);
let startTime = Date.now();
const messageExternalIdsAndIdsMap = await this.messageService.saveMessages(
messagesToSave,
dataSourceMetadata,
workspaceDataSource,
connectedAccount,
gmailMessageChannelId,
workspaceId,
);
let endTime = Date.now();
this.logger.log(
`${jobName} saving messages for workspace ${workspaceId} and account ${
connectedAccount.id
} in ${endTime - startTime}ms`,
);
const gmailMessageChannel =
await this.messageChannelService.getFirstByConnectedAccountId(
connectedAccount.id,
workspaceId,
);
if (!gmailMessageChannel) {
this.logger.error(
`No message channel found for connected account ${connectedAccount.id} in workspace ${workspaceId} in saveMessagesAndCreateContacts`,
);
return;
}
const isContactAutoCreationEnabled =
gmailMessageChannel.isContactAutoCreationEnabled;
const participantsWithMessageId: ParticipantWithMessageId[] =
messagesToSave.flatMap((message) => {
const messageId = messageExternalIdsAndIdsMap.get(message.externalId);
return messageId
? message.participants.map((participant) => ({
...participant,
messageId,
}))
: [];
});
const contactsToCreate = messagesToSave
.filter((message) => connectedAccount.handle === message.fromHandle)
.flatMap((message) => message.participants);
if (isContactAutoCreationEnabled) {
startTime = Date.now();
await workspaceDataSource?.transaction(
async (transactionManager: EntityManager) => {
await this.createCompaniesAndContactsService.createCompaniesAndContacts(
connectedAccount.handle,
contactsToCreate,
workspaceId,
transactionManager,
);
},
);
const handles = participantsWithMessageId.map(
(participant) => participant.handle,
);
const messageParticipantsWithoutPersonIdAndWorkspaceMemberId =
await this.messageParticipantService.getByHandlesWithoutPersonIdAndWorkspaceMemberId(
handles,
workspaceId,
);
await this.messageParticipantService.updateMessageParticipantsAfterPeopleCreation(
messageParticipantsWithoutPersonIdAndWorkspaceMemberId,
workspaceId,
);
endTime = Date.now();
this.logger.log(
`${jobName} creating companies and contacts for workspace ${workspaceId} and account ${
connectedAccount.id
} in ${endTime - startTime}ms`,
);
}
startTime = Date.now();
await this.tryToSaveMessageParticipantsOrDeleteMessagesIfError(
participantsWithMessageId,
gmailMessageChannelId,
workspaceId,
connectedAccount,
jobName,
);
endTime = Date.now();
this.logger.log(
`${jobName} saving message participants for workspace ${workspaceId} and account in ${
connectedAccount.id
} ${endTime - startTime}ms`,
);
}
private async tryToSaveMessageParticipantsOrDeleteMessagesIfError(
participantsWithMessageId: ParticipantWithMessageId[],
gmailMessageChannelId: string,
workspaceId: string,
connectedAccount: ObjectRecord<ConnectedAccountObjectMetadata>,
jobName?: string,
) {
try {
await this.messageParticipantService.saveMessageParticipants(
participantsWithMessageId,
workspaceId,
);
} catch (error) {
this.logger.error(
`${jobName} error saving message participants for workspace ${workspaceId} and account ${connectedAccount.id}`,
error,
);
const messagesToDelete = participantsWithMessageId.map(
(participant) => participant.messageId,
);
await this.messageService.deleteMessages(
messagesToDelete,
gmailMessageChannelId,
workspaceId,
);
}
}
}

View File

@ -0,0 +1,19 @@
import { Module } from '@nestjs/common';
import { TypeORMModule } from 'src/database/typeorm/typeorm.module';
import { DataSourceModule } from 'src/engine-metadata/data-source/data-source.module';
import { MessageThreadModule } from 'src/modules/messaging/repositories/message-thread/message-thread.module';
import { MessageModule } from 'src/modules/messaging/repositories/message/message.module';
import { ThreadCleanerService } from 'src/modules/messaging/services/thread-cleaner/thread-cleaner.service';
@Module({
imports: [
DataSourceModule,
TypeORMModule,
MessageThreadModule,
MessageModule,
],
providers: [ThreadCleanerService],
exports: [ThreadCleanerService],
})
export class ThreadCleanerModule {}

View File

@ -0,0 +1,37 @@
import { Injectable } from '@nestjs/common';
import { TypeORMService } from 'src/database/typeorm/typeorm.service';
import { DataSourceService } from 'src/engine-metadata/data-source/data-source.service';
import { MessageThreadService } from 'src/modules/messaging/repositories/message-thread/message-thread.service';
import { MessageService } from 'src/modules/messaging/repositories/message/message.service';
import { deleteUsingPagination } from 'src/modules/messaging/services/thread-cleaner/utils/delete-using-pagination.util';
@Injectable()
export class ThreadCleanerService {
constructor(
private readonly dataSourceService: DataSourceService,
private readonly typeORMService: TypeORMService,
private readonly messageService: MessageService,
private readonly messageThreadService: MessageThreadService,
) {}
public async cleanWorkspaceThreads(workspaceId: string) {
await deleteUsingPagination(
workspaceId,
500,
this.messageService.getNonAssociatedMessageIdsPaginated.bind(
this.messageService,
),
this.messageService.deleteByIds.bind(this.messageService),
);
await deleteUsingPagination(
workspaceId,
500,
this.messageThreadService.getOrphanThreadIdsPaginated.bind(
this.messageThreadService,
),
this.messageThreadService.deleteByIds.bind(this.messageThreadService),
);
}
}

View File

@ -0,0 +1,44 @@
import { deleteUsingPagination } from './delete-using-pagination.util';
describe('deleteUsingPagination', () => {
it('should delete items using pagination', async () => {
const workspaceId = 'workspace123';
const batchSize = 10;
const getterPaginated = jest
.fn()
.mockResolvedValueOnce(['id1', 'id2'])
.mockResolvedValueOnce([]);
const deleter = jest.fn();
const transactionManager = undefined;
await deleteUsingPagination(
workspaceId,
batchSize,
getterPaginated,
deleter,
transactionManager,
);
expect(getterPaginated).toHaveBeenNthCalledWith(
1,
batchSize,
0,
workspaceId,
transactionManager,
);
expect(getterPaginated).toHaveBeenNthCalledWith(
2,
batchSize,
batchSize,
workspaceId,
transactionManager,
);
expect(deleter).toHaveBeenNthCalledWith(
1,
['id1', 'id2'],
workspaceId,
transactionManager,
);
expect(deleter).toHaveBeenCalledTimes(1);
});
});

View File

@ -0,0 +1,38 @@
import { EntityManager } from 'typeorm';
export const deleteUsingPagination = async (
workspaceId: string,
batchSize: number,
getterPaginated: (
limit: number,
offset: number,
workspaceId: string,
transactionManager?: EntityManager,
) => Promise<string[]>,
deleter: (
ids: string[],
workspaceId: string,
transactionManager?: EntityManager,
) => Promise<void>,
transactionManager?: EntityManager,
) => {
let offset = 0;
let hasMoreData = true;
while (hasMoreData) {
const idsToDelete = await getterPaginated(
batchSize,
offset,
workspaceId,
transactionManager,
);
if (idsToDelete.length > 0) {
await deleter(idsToDelete, workspaceId, transactionManager);
} else {
hasMoreData = false;
}
offset += batchSize;
}
};

View File

@ -0,0 +1,37 @@
import { formatAddressObjectAsParticipants } from 'src/modules/messaging/services/utils/format-address-object-as-participants.util';
describe('formatAddressObjectAsParticipants', () => {
it('should format address object as participants', () => {
const addressObject = {
value: [
{ name: 'John Doe', address: 'john.doe @example.com' },
{ name: 'Jane Smith', address: 'jane.smith@example.com ' },
],
html: '',
text: '',
};
const result = formatAddressObjectAsParticipants(addressObject, 'from');
expect(result).toEqual([
{
role: 'from',
handle: 'john.doe@example.com',
displayName: 'John Doe',
},
{
role: 'from',
handle: 'jane.smith@example.com',
displayName: 'Jane Smith',
},
]);
});
it('should return an empty array if address object is undefined', () => {
const addressObject = undefined;
const result = formatAddressObjectAsParticipants(addressObject, 'to');
expect(result).toEqual([]);
});
});

View File

@ -0,0 +1,37 @@
import { AddressObject } from 'mailparser';
import { Participant } from 'src/modules/messaging/types/gmail-message';
const formatAddressObjectAsArray = (
addressObject: AddressObject | AddressObject[],
): AddressObject[] => {
return Array.isArray(addressObject) ? addressObject : [addressObject];
};
const removeSpacesAndLowerCase = (email: string): string => {
return email.replace(/\s/g, '').toLowerCase();
};
export const formatAddressObjectAsParticipants = (
addressObject: AddressObject | AddressObject[] | undefined,
role: 'from' | 'to' | 'cc' | 'bcc',
): Participant[] => {
if (!addressObject) return [];
const addressObjects = formatAddressObjectAsArray(addressObject);
const participants = addressObjects.map((addressObject) => {
const emailAdresses = addressObject.value;
return emailAdresses.map((emailAddress) => {
const { name, address } = emailAddress;
return {
role,
handle: address ? removeSpacesAndLowerCase(address) : '',
displayName: name || '',
};
});
});
return participants.flat();
};

View File

@ -0,0 +1,77 @@
import { FieldMetadataType } from 'src/engine-metadata/field-metadata/field-metadata.entity';
import { messageChannelMessageAssociationStandardFieldIds } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids';
import { standardObjectIds } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids';
import { FieldMetadata } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/field-metadata.decorator';
import { IsNullable } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/is-nullable.decorator';
import { IsSystem } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/is-system.decorator';
import { ObjectMetadata } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/object-metadata.decorator';
import { BaseObjectMetadata } from 'src/engine/workspace-manager/workspace-sync-metadata/standard-objects/base.object-metadata';
import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel.object-metadata';
import { MessageThreadObjectMetadata } from 'src/modules/messaging/standard-objects/message-thread.object-metadata';
import { MessageObjectMetadata } from 'src/modules/messaging/standard-objects/message.object-metadata';
@ObjectMetadata({
standardId: standardObjectIds.messageChannelMessageAssociation,
namePlural: 'messageChannelMessageAssociations',
labelSingular: 'Message Channel Message Association',
labelPlural: 'Message Channel Message Associations',
description: 'Message Synced with a Message Channel',
icon: 'IconMessage',
})
@IsSystem()
export class MessageChannelMessageAssociationObjectMetadata extends BaseObjectMetadata {
@FieldMetadata({
standardId: messageChannelMessageAssociationStandardFieldIds.messageChannel,
type: FieldMetadataType.RELATION,
label: 'Message Channel Id',
description: 'Message Channel Id',
icon: 'IconHash',
joinColumn: 'messageChannelId',
})
@IsNullable()
messageChannel: MessageChannelObjectMetadata;
@FieldMetadata({
standardId: messageChannelMessageAssociationStandardFieldIds.message,
type: FieldMetadataType.RELATION,
label: 'Message Id',
description: 'Message Id',
icon: 'IconHash',
joinColumn: 'messageId',
})
@IsNullable()
message: MessageObjectMetadata;
@FieldMetadata({
standardId:
messageChannelMessageAssociationStandardFieldIds.messageExternalId,
type: FieldMetadataType.TEXT,
label: 'Message External Id',
description: 'Message id from the messaging provider',
icon: 'IconHash',
})
@IsNullable()
messageExternalId: string;
@FieldMetadata({
standardId: messageChannelMessageAssociationStandardFieldIds.messageThread,
type: FieldMetadataType.RELATION,
label: 'Message Thread Id',
description: 'Message Thread Id',
icon: 'IconHash',
joinColumn: 'messageThreadId',
})
@IsNullable()
messageThread: MessageThreadObjectMetadata;
@FieldMetadata({
standardId:
messageChannelMessageAssociationStandardFieldIds.messageThreadExternalId,
type: FieldMetadataType.TEXT,
label: 'Thread External Id',
description: 'Thread id from the messaging provider',
icon: 'IconHash',
})
@IsNullable()
messageThreadExternalId: string;
}

View File

@ -0,0 +1,105 @@
import { FieldMetadataType } from 'src/engine-metadata/field-metadata/field-metadata.entity';
import {
RelationMetadataType,
RelationOnDeleteAction,
} from 'src/engine-metadata/relation-metadata/relation-metadata.entity';
import { messageChannelStandardFieldIds } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids';
import { standardObjectIds } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids';
import { FieldMetadata } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/field-metadata.decorator';
import { IsNullable } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/is-nullable.decorator';
import { IsSystem } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/is-system.decorator';
import { ObjectMetadata } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/object-metadata.decorator';
import { RelationMetadata } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/relation-metadata.decorator';
import { BaseObjectMetadata } from 'src/engine/workspace-manager/workspace-sync-metadata/standard-objects/base.object-metadata';
import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata';
import { MessageChannelMessageAssociationObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel-message-association.object-metadata';
@ObjectMetadata({
standardId: standardObjectIds.messageChannel,
namePlural: 'messageChannels',
labelSingular: 'Message Channel',
labelPlural: 'Message Channels',
description: 'Message Channels',
icon: 'IconMessage',
})
@IsSystem()
export class MessageChannelObjectMetadata extends BaseObjectMetadata {
@FieldMetadata({
standardId: messageChannelStandardFieldIds.visibility,
type: FieldMetadataType.SELECT,
label: 'Visibility',
description: 'Visibility',
icon: 'IconEyeglass',
options: [
{ value: 'metadata', label: 'Metadata', position: 0, color: 'green' },
{ value: 'subject', label: 'Subject', position: 1, color: 'blue' },
{
value: 'share_everything',
label: 'Share Everything',
position: 2,
color: 'orange',
},
],
defaultValue: { value: 'share_everything' },
})
visibility: string;
@FieldMetadata({
standardId: messageChannelStandardFieldIds.handle,
type: FieldMetadataType.TEXT,
label: 'Handle',
description: 'Handle',
icon: 'IconAt',
})
handle: string;
@FieldMetadata({
standardId: messageChannelStandardFieldIds.connectedAccount,
type: FieldMetadataType.RELATION,
label: 'Connected Account',
description: 'Connected Account',
icon: 'IconUserCircle',
joinColumn: 'connectedAccountId',
})
connectedAccount: ConnectedAccountObjectMetadata;
@FieldMetadata({
standardId: messageChannelStandardFieldIds.type,
type: FieldMetadataType.SELECT,
label: 'Type',
description: 'Channel Type',
icon: 'IconMessage',
options: [
{ value: 'email', label: 'Email', position: 0, color: 'green' },
{ value: 'sms', label: 'SMS', position: 1, color: 'blue' },
],
defaultValue: { value: 'email' },
})
type: string;
@FieldMetadata({
standardId: messageChannelStandardFieldIds.isContactAutoCreationEnabled,
type: FieldMetadataType.BOOLEAN,
label: 'Is Contact Auto Creation Enabled',
description: 'Is Contact Auto Creation Enabled',
icon: 'IconUserCircle',
defaultValue: { value: true },
})
isContactAutoCreationEnabled: boolean;
@FieldMetadata({
standardId:
messageChannelStandardFieldIds.messageChannelMessageAssociations,
type: FieldMetadataType.RELATION,
label: 'Message Channel Association',
description: 'Messages from the channel.',
icon: 'IconMessage',
})
@RelationMetadata({
type: RelationMetadataType.ONE_TO_MANY,
inverseSideTarget: () => MessageChannelMessageAssociationObjectMetadata,
onDelete: RelationOnDeleteAction.CASCADE,
})
@IsNullable()
messageChannelMessageAssociations: MessageChannelMessageAssociationObjectMetadata[];
}

View File

@ -0,0 +1,88 @@
import { FieldMetadataType } from 'src/engine-metadata/field-metadata/field-metadata.entity';
import { messageParticipantStandardFieldIds } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids';
import { standardObjectIds } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids';
import { FieldMetadata } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/field-metadata.decorator';
import { IsNullable } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/is-nullable.decorator';
import { IsSystem } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/is-system.decorator';
import { ObjectMetadata } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/object-metadata.decorator';
import { BaseObjectMetadata } from 'src/engine/workspace-manager/workspace-sync-metadata/standard-objects/base.object-metadata';
import { MessageObjectMetadata } from 'src/modules/messaging/standard-objects/message.object-metadata';
import { PersonObjectMetadata } from 'src/modules/person/standard-objects/person.object-metadata';
import { WorkspaceMemberObjectMetadata } from 'src/modules/workspace-member/standard-objects/workspace-member.object-metadata';
@ObjectMetadata({
standardId: standardObjectIds.messageParticipant,
namePlural: 'messageParticipants',
labelSingular: 'Message Participant',
labelPlural: 'Message Participants',
description: 'Message Participants',
icon: 'IconUserCircle',
})
@IsSystem()
export class MessageParticipantObjectMetadata extends BaseObjectMetadata {
@FieldMetadata({
standardId: messageParticipantStandardFieldIds.message,
type: FieldMetadataType.RELATION,
label: 'Message',
description: 'Message',
icon: 'IconMessage',
joinColumn: 'messageId',
})
message: MessageObjectMetadata;
@FieldMetadata({
standardId: messageParticipantStandardFieldIds.role,
type: FieldMetadataType.SELECT,
label: 'Role',
description: 'Role',
icon: 'IconAt',
options: [
{ value: 'from', label: 'From', position: 0, color: 'green' },
{ value: 'to', label: 'To', position: 1, color: 'blue' },
{ value: 'cc', label: 'Cc', position: 2, color: 'orange' },
{ value: 'bcc', label: 'Bcc', position: 3, color: 'red' },
],
defaultValue: { value: 'from' },
})
role: string;
@FieldMetadata({
standardId: messageParticipantStandardFieldIds.handle,
type: FieldMetadataType.TEXT,
label: 'Handle',
description: 'Handle',
icon: 'IconAt',
})
handle: string;
@FieldMetadata({
standardId: messageParticipantStandardFieldIds.displayName,
type: FieldMetadataType.TEXT,
label: 'Display Name',
description: 'Display Name',
icon: 'IconUser',
})
displayName: string;
@FieldMetadata({
standardId: messageParticipantStandardFieldIds.person,
type: FieldMetadataType.RELATION,
label: 'Person',
description: 'Person',
icon: 'IconUser',
joinColumn: 'personId',
})
@IsNullable()
person: PersonObjectMetadata;
@FieldMetadata({
standardId: messageParticipantStandardFieldIds.workspaceMember,
type: FieldMetadataType.RELATION,
label: 'Workspace Member',
description: 'Workspace member',
icon: 'IconCircleUser',
joinColumn: 'workspaceMemberId',
})
@IsNullable()
workspaceMember: WorkspaceMemberObjectMetadata;
}

View File

@ -0,0 +1,56 @@
import { FieldMetadataType } from 'src/engine-metadata/field-metadata/field-metadata.entity';
import {
RelationMetadataType,
RelationOnDeleteAction,
} from 'src/engine-metadata/relation-metadata/relation-metadata.entity';
import { messageThreadStandardFieldIds } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids';
import { standardObjectIds } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids';
import { FieldMetadata } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/field-metadata.decorator';
import { IsNullable } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/is-nullable.decorator';
import { IsSystem } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/is-system.decorator';
import { ObjectMetadata } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/object-metadata.decorator';
import { RelationMetadata } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/relation-metadata.decorator';
import { BaseObjectMetadata } from 'src/engine/workspace-manager/workspace-sync-metadata/standard-objects/base.object-metadata';
import { MessageChannelMessageAssociationObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel-message-association.object-metadata';
import { MessageObjectMetadata } from 'src/modules/messaging/standard-objects/message.object-metadata';
@ObjectMetadata({
standardId: standardObjectIds.messageThread,
namePlural: 'messageThreads',
labelSingular: 'Message Thread',
labelPlural: 'Message Threads',
description: 'Message Thread',
icon: 'IconMessage',
})
@IsSystem()
export class MessageThreadObjectMetadata extends BaseObjectMetadata {
@FieldMetadata({
standardId: messageThreadStandardFieldIds.messages,
type: FieldMetadataType.RELATION,
label: 'Messages',
description: 'Messages from the thread.',
icon: 'IconMessage',
})
@RelationMetadata({
type: RelationMetadataType.ONE_TO_MANY,
inverseSideTarget: () => MessageObjectMetadata,
onDelete: RelationOnDeleteAction.CASCADE,
})
@IsNullable()
messages: MessageObjectMetadata[];
@FieldMetadata({
standardId: messageThreadStandardFieldIds.messageChannelMessageAssociations,
type: FieldMetadataType.RELATION,
label: 'Message Channel Association',
description: 'Messages from the channel.',
icon: 'IconMessage',
})
@RelationMetadata({
type: RelationMetadataType.ONE_TO_MANY,
inverseSideTarget: () => MessageChannelMessageAssociationObjectMetadata,
onDelete: RelationOnDeleteAction.RESTRICT,
})
@IsNullable()
messageChannelMessageAssociations: MessageChannelMessageAssociationObjectMetadata[];
}

View File

@ -0,0 +1,120 @@
import { FieldMetadataType } from 'src/engine-metadata/field-metadata/field-metadata.entity';
import {
RelationMetadataType,
RelationOnDeleteAction,
} from 'src/engine-metadata/relation-metadata/relation-metadata.entity';
import { messageStandardFieldIds } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids';
import { standardObjectIds } from 'src/engine/workspace-manager/workspace-sync-metadata/constants/standard-object-ids';
import { FieldMetadata } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/field-metadata.decorator';
import { IsNullable } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/is-nullable.decorator';
import { IsSystem } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/is-system.decorator';
import { ObjectMetadata } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/object-metadata.decorator';
import { RelationMetadata } from 'src/engine/workspace-manager/workspace-sync-metadata/decorators/relation-metadata.decorator';
import { BaseObjectMetadata } from 'src/engine/workspace-manager/workspace-sync-metadata/standard-objects/base.object-metadata';
import { MessageChannelMessageAssociationObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel-message-association.object-metadata';
import { MessageParticipantObjectMetadata } from 'src/modules/messaging/standard-objects/message-participant.object-metadata';
import { MessageThreadObjectMetadata } from 'src/modules/messaging/standard-objects/message-thread.object-metadata';
@ObjectMetadata({
standardId: standardObjectIds.message,
namePlural: 'messages',
labelSingular: 'Message',
labelPlural: 'Messages',
description: 'Message',
icon: 'IconMessage',
})
@IsSystem()
export class MessageObjectMetadata extends BaseObjectMetadata {
@FieldMetadata({
standardId: messageStandardFieldIds.headerMessageId,
type: FieldMetadataType.TEXT,
label: 'Header message Id',
description: 'Message id from the message header',
icon: 'IconHash',
})
headerMessageId: string;
@FieldMetadata({
standardId: messageStandardFieldIds.messageThread,
type: FieldMetadataType.RELATION,
label: 'Message Thread Id',
description: 'Message Thread Id',
icon: 'IconHash',
joinColumn: 'messageThreadId',
})
@IsNullable()
messageThread: MessageThreadObjectMetadata;
@FieldMetadata({
standardId: messageStandardFieldIds.direction,
type: FieldMetadataType.SELECT,
label: 'Direction',
description: 'Message Direction',
icon: 'IconDirection',
options: [
{ value: 'incoming', label: 'Incoming', position: 0, color: 'green' },
{ value: 'outgoing', label: 'Outgoing', position: 1, color: 'blue' },
],
defaultValue: { value: 'incoming' },
})
direction: string;
@FieldMetadata({
standardId: messageStandardFieldIds.subject,
type: FieldMetadataType.TEXT,
label: 'Subject',
description: 'Subject',
icon: 'IconMessage',
})
subject: string;
@FieldMetadata({
standardId: messageStandardFieldIds.text,
type: FieldMetadataType.TEXT,
label: 'Text',
description: 'Text',
icon: 'IconMessage',
})
text: string;
@FieldMetadata({
standardId: messageStandardFieldIds.receivedAt,
type: FieldMetadataType.DATE_TIME,
label: 'Received At',
description: 'The date the message was received',
icon: 'IconCalendar',
})
@IsNullable()
receivedAt: string;
@FieldMetadata({
standardId: messageStandardFieldIds.messageParticipants,
type: FieldMetadataType.RELATION,
label: 'Message Participants',
description: 'Message Participants',
icon: 'IconUserCircle',
})
@RelationMetadata({
type: RelationMetadataType.ONE_TO_MANY,
inverseSideTarget: () => MessageParticipantObjectMetadata,
inverseSideFieldKey: 'message',
onDelete: RelationOnDeleteAction.CASCADE,
})
@IsNullable()
messageParticipants: MessageParticipantObjectMetadata[];
@FieldMetadata({
standardId: messageStandardFieldIds.messageChannelMessageAssociations,
type: FieldMetadataType.RELATION,
label: 'Message Channel Association',
description: 'Messages from the channel.',
icon: 'IconMessage',
})
@RelationMetadata({
type: RelationMetadataType.ONE_TO_MANY,
inverseSideTarget: () => MessageChannelMessageAssociationObjectMetadata,
onDelete: RelationOnDeleteAction.CASCADE,
})
@IsNullable()
messageChannelMessageAssociations: MessageChannelMessageAssociationObjectMetadata[];
}

View File

@ -0,0 +1,5 @@
type Query = {
uri: string;
};
export type BatchQueries = Query[];

View File

@ -0,0 +1,15 @@
export type GmailMessageParsedResponse = {
id: string;
threadId: string;
labelIds: string[];
snippet: string;
sizeEstimate: number;
raw: string;
historyId: string;
internalDate: string;
error?: {
code: number;
message: string;
status: string;
};
};

View File

@ -0,0 +1,27 @@
import { Attachment } from 'mailparser';
export type GmailMessage = {
historyId: string;
externalId: string;
headerMessageId: string;
subject: string;
messageThreadExternalId: string;
internalDate: string;
fromHandle: string;
fromDisplayName: string;
participants: Participant[];
text: string;
attachments: Attachment[];
};
export type Participant = {
role: 'from' | 'to' | 'cc' | 'bcc';
handle: string;
displayName: string;
};
export type ParticipantWithMessageId = Participant & { messageId: string };
export type ParticipantWithId = Participant & {
id: string;
};

View File

@ -0,0 +1,4 @@
export type GmailThread = {
id: string;
subject: string;
};

View File

@ -0,0 +1,3 @@
export type MessageQuery = {
uri: string;
};

View File

@ -0,0 +1,9 @@
import { MessageQuery } from 'src/modules/messaging/types/message-or-thread-query';
export const createQueriesFromMessageIds = (
messageExternalIds: string[],
): MessageQuery[] => {
return messageExternalIds.map((messageId) => ({
uri: '/gmail/v1/users/me/messages/' + messageId + '?format=RAW',
}));
};

View File

@ -0,0 +1,27 @@
import { Participant } from 'src/modules/messaging/types/gmail-message';
import { getDomainNameFromHandle } from 'src/modules/messaging/utils/get-domain-name-from-handle.util';
import { WorkspaceMemberObjectMetadata } from 'src/modules/workspace-member/standard-objects/workspace-member.object-metadata';
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
export function filterOutParticipantsFromCompanyOrWorkspace(
participants: Participant[],
selfHandle: string,
workspaceMembers: ObjectRecord<WorkspaceMemberObjectMetadata>[],
): Participant[] {
const selfDomainName = getDomainNameFromHandle(selfHandle);
const workspaceMembersMap = workspaceMembers.reduce(
(map, workspaceMember) => {
map[workspaceMember.userEmail] = true;
return map;
},
new Map<string, boolean>(),
);
return participants.filter(
(participant) =>
getDomainNameFromHandle(participant.handle) !== selfDomainName &&
!workspaceMembersMap[participant.handle],
);
}

View File

@ -0,0 +1,9 @@
import psl from 'psl';
import { capitalize } from 'src/utils/capitalize';
export const getCompanyNameFromDomainName = (domainName: string) => {
const { sld } = psl.parse(domainName);
return sld ? capitalize(sld) : '';
};

View File

@ -0,0 +1,9 @@
import psl from 'psl';
export const getDomainNameFromHandle = (handle: string): string => {
const wholeDomain = handle?.split('@')?.[1] || '';
const { domain } = psl.parse(wholeDomain);
return domain || '';
};

View File

@ -0,0 +1,18 @@
import { capitalize } from 'src/utils/capitalize';
export function getFirstNameAndLastNameFromHandleAndDisplayName(
handle: string,
displayName: string,
): { firstName: string; lastName: string } {
const firstName = displayName.split(' ')[0];
const lastName = displayName.split(' ')[1];
const contactFullNameFromHandle = handle.split('@')[0];
const firstNameFromHandle = contactFullNameFromHandle.split('.')[0];
const lastNameFromHandle = contactFullNameFromHandle.split('.')[1];
return {
firstName: capitalize(firstName || firstNameFromHandle || ''),
lastName: capitalize(lastName || lastNameFromHandle || ''),
};
}

View File

@ -0,0 +1,19 @@
import { uniq, uniqBy } from 'lodash';
import { Participant } from 'src/modules/messaging/types/gmail-message';
export function getUniqueParticipantsAndHandles(participants: Participant[]): {
uniqueParticipants: Participant[];
uniqueHandles: string[];
} {
if (participants.length === 0) {
return { uniqueParticipants: [], uniqueHandles: [] };
}
const uniqueHandles = uniq(
participants.map((participant) => participant.handle),
);
const uniqueParticipants = uniqBy(participants, 'handle');
return { uniqueParticipants, uniqueHandles };
}

View File

@ -0,0 +1,14 @@
export const gmailSearchFilterNonPersonalEmails =
'noreply|no-reply|do_not_reply|no.reply|accounts@|info@|admin@|contact@|hello@|support@|sales@|feedback@|service@|help@|mailer-daemon|notifications|digest|auto|apps|assign|comments|customer-success|enterprise|esign|express|forum|gc@|learn|mailer|marketing|messages|news|notification|payments|receipts|recrutement|security|service|support|team';
export const gmailSearchFilterExcludeEmails = (emails: string[]): string => {
if (emails.length === 0) {
return `from:-(${gmailSearchFilterNonPersonalEmails}`;
}
return `(in:inbox from:-(${gmailSearchFilterNonPersonalEmails}|${emails.join(
'|',
)})|(in:sent to:-(${gmailSearchFilterNonPersonalEmails}|${emails.join(
'|',
)}))`;
};

View File

@ -0,0 +1,8 @@
export const isPersonEmail = (email: string | undefined): boolean => {
if (!email) return false;
const nonPersonalPattern =
/noreply|no-reply|do_not_reply|no\.reply|^(accounts@|info@|admin@|contact@|hello@|support@|sales@|feedback@|service@|help@|mailer-daemon|notifications?|digest|auto|apps|assign|comments|customer-success|enterprise|esign|express|forum|gc@|learn|mailer|marketing|messages|news|notification|payments|receipts|recrutement|security|service|support|team)/;
return !nonPersonalPattern.test(email);
};