Refactor calendar to use new sync statuses and stages (#6141)

- Refactor calendar modules and some messaging modules to better
organize them by business rules and decouple them
- Work toward a common architecture for the different calendar providers
by introducing interfaces for the drivers
- Modify cron job to use the new sync statuses and stages
This commit is contained in:
bosiraphael
2024-07-08 17:01:06 +02:00
committed by GitHub
parent 1c3ea9b106
commit f458322303
69 changed files with 1300 additions and 884 deletions

View File

@ -6,14 +6,12 @@ import { AnalyticsModule } from 'src/engine/core-modules/analytics/analytics.mod
import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
import { AddPersonIdAndWorkspaceMemberIdModule } from 'src/modules/calendar-messaging-participant/services/add-person-id-and-workspace-member-id/add-person-id-and-workspace-member-id.module';
import { AddPersonIdAndWorkspaceMemberIdService } from 'src/modules/calendar-messaging-participant-manager/services/add-person-id-and-workspace-member-id/add-person-id-and-workspace-member-id.service';
import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service';
import { MessagingErrorHandlingService } from 'src/modules/messaging/common/services/messaging-error-handling.service';
import { MessagingFetchByBatchesService } from 'src/modules/messaging/common/services/messaging-fetch-by-batch.service';
import { MessagingMessageParticipantService } from 'src/modules/messaging/common/services/messaging-message-participant.service';
import { MessagingMessageThreadService } from 'src/modules/messaging/common/services/messaging-message-thread.service';
import { MessagingMessageService } from 'src/modules/messaging/common/services/messaging-message.service';
import { MessagingSaveMessagesAndEnqueueContactCreationService } from 'src/modules/messaging/common/services/messaging-save-messages-and-enqueue-contact-creation.service';
import { MessagingTelemetryService } from 'src/modules/messaging/common/services/messaging-telemetry.service';
import { MessageParticipantWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-participant.workspace-entity';
import { MessageThreadWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-thread.workspace-entity';
@ -34,26 +32,22 @@ import { PersonWorkspaceEntity } from 'src/modules/person/standard-objects/perso
MessageThreadWorkspaceEntity,
]),
TypeOrmModule.forFeature([FeatureFlagEntity], 'core'),
AddPersonIdAndWorkspaceMemberIdModule,
],
providers: [
MessagingMessageService,
MessagingMessageThreadService,
MessagingSaveMessagesAndEnqueueContactCreationService,
MessagingErrorHandlingService,
MessagingTelemetryService,
MessagingChannelSyncStatusService,
MessagingMessageParticipantService,
MessagingFetchByBatchesService,
AddPersonIdAndWorkspaceMemberIdService,
],
exports: [
MessagingMessageService,
MessagingMessageThreadService,
MessagingSaveMessagesAndEnqueueContactCreationService,
MessagingErrorHandlingService,
MessagingTelemetryService,
MessagingChannelSyncStatusService,
MessagingMessageParticipantService,
MessagingFetchByBatchesService,
],
})

View File

@ -61,8 +61,6 @@ export class MessagingChannelSyncStatusService {
`messages-to-import:${workspaceId}:gmail:${messageChannelId}`,
);
// TODO: remove nextPageToken from cache
await this.messageChannelRepository.resetSyncCursor(
messageChannelId,
workspaceId,

View File

@ -1,183 +0,0 @@
import { Injectable } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { EntityManager } from 'typeorm';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { PersonRepository } from 'src/modules/person/repositories/person.repository';
import { PersonWorkspaceEntity } from 'src/modules/person/standard-objects/person.workspace-entity';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import { getFlattenedValuesAndValuesStringForBatchRawQuery } from 'src/modules/calendar/calendar-event-import-manager/utils/get-flattened-values-and-values-string-for-batch-raw-query.util';
import { AddPersonIdAndWorkspaceMemberIdService } from 'src/modules/calendar-messaging-participant/services/add-person-id-and-workspace-member-id/add-person-id-and-workspace-member-id.service';
import { MessageParticipantRepository } from 'src/modules/messaging/common/repositories/message-participant.repository';
import { MessageParticipantWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-participant.workspace-entity';
import { ParticipantWithMessageId } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message';
// Todo: this is not the right place for this file. The code needs to be refactored in term of business modules with a precise scope.
// Putting it here to avoid circular dependencies for now.
@Injectable()
export class MessagingMessageParticipantService {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
@InjectObjectMetadataRepository(MessageParticipantWorkspaceEntity)
private readonly messageParticipantRepository: MessageParticipantRepository,
@InjectObjectMetadataRepository(PersonWorkspaceEntity)
private readonly personRepository: PersonRepository,
private readonly addPersonIdAndWorkspaceMemberIdService: AddPersonIdAndWorkspaceMemberIdService,
private readonly eventEmitter: EventEmitter2,
) {}
public async updateMessageParticipantsAfterPeopleCreation(
createdPeople: PersonWorkspaceEntity[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<MessageParticipantWorkspaceEntity[]> {
const participants = await this.messageParticipantRepository.getByHandles(
createdPeople.map((person) => person.email),
workspaceId,
transactionManager,
);
if (!participants) return [];
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const handles = participants.map((participant) => participant.handle);
const participantPersonIds = await this.personRepository.getByEmails(
handles,
workspaceId,
transactionManager,
);
const messageParticipantsToUpdate = participants.map((participant) => ({
id: participant.id,
personId: participantPersonIds.find(
(e: { id: string; email: string }) => e.email === participant.handle,
)?.id,
}));
if (messageParticipantsToUpdate.length === 0) return [];
const { flattenedValues, valuesString } =
getFlattenedValuesAndValuesStringForBatchRawQuery(
messageParticipantsToUpdate,
{
id: 'uuid',
personId: 'uuid',
},
);
return (
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageParticipant" AS "messageParticipant" SET "personId" = "data"."personId"
FROM (VALUES ${valuesString}) AS "data"("id", "personId")
WHERE "messageParticipant"."id" = "data"."id"
RETURNING *`,
flattenedValues,
workspaceId,
transactionManager,
)
).flat();
}
public async saveMessageParticipants(
participants: ParticipantWithMessageId[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<MessageParticipantWorkspaceEntity[]> {
if (!participants) return [];
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const messageParticipantsToSave =
await this.addPersonIdAndWorkspaceMemberIdService.addPersonIdAndWorkspaceMemberId(
participants,
workspaceId,
transactionManager,
);
const { flattenedValues, valuesString } =
getFlattenedValuesAndValuesStringForBatchRawQuery(
messageParticipantsToSave,
{
messageId: 'uuid',
role: `${dataSourceSchema}."messageParticipant_role_enum"`,
handle: 'text',
displayName: 'text',
personId: 'uuid',
workspaceMemberId: 'uuid',
},
);
if (messageParticipantsToSave.length === 0) return [];
return await this.workspaceDataSourceService.executeRawQuery(
`INSERT INTO ${dataSourceSchema}."messageParticipant" ("messageId", "role", "handle", "displayName", "personId", "workspaceMemberId") VALUES ${valuesString} RETURNING *`,
flattenedValues,
workspaceId,
transactionManager,
);
}
public async matchMessageParticipants(
workspaceId: string,
email: string,
personId?: string,
workspaceMemberId?: string,
) {
const messageParticipantsToUpdate =
await this.messageParticipantRepository.getByHandles(
[email],
workspaceId,
);
const messageParticipantIdsToUpdate = messageParticipantsToUpdate.map(
(participant) => participant.id,
);
if (personId) {
const updatedMessageParticipants =
await this.messageParticipantRepository.updateParticipantsPersonIdAndReturn(
messageParticipantIdsToUpdate,
personId,
workspaceId,
);
this.eventEmitter.emit(`messageParticipant.matched`, {
workspaceId,
workspaceMemberId: null,
messageParticipants: updatedMessageParticipants,
});
}
if (workspaceMemberId) {
await this.messageParticipantRepository.updateParticipantsWorkspaceMemberId(
messageParticipantIdsToUpdate,
workspaceMemberId,
workspaceId,
);
}
}
public async unmatchMessageParticipants(
workspaceId: string,
handle: string,
personId?: string,
workspaceMemberId?: string,
) {
if (personId) {
await this.messageParticipantRepository.removePersonIdByHandle(
handle,
workspaceId,
);
}
if (workspaceMemberId) {
await this.messageParticipantRepository.removeWorkspaceMemberIdByHandle(
handle,
workspaceId,
);
}
}
}

View File

@ -1,148 +0,0 @@
import { Injectable } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { InjectRepository } from '@nestjs/typeorm';
import { EntityManager, Repository } from 'typeorm';
import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import {
CreateCompanyAndContactJob,
CreateCompanyAndContactJobData,
} from 'src/modules/connected-account/auto-companies-and-contacts-creation/jobs/create-company-and-contact.job';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessagingMessageParticipantService } from 'src/modules/messaging/common/services/messaging-message-participant.service';
import { MessagingMessageService } from 'src/modules/messaging/common/services/messaging-message.service';
import {
MessageChannelContactAutoCreationPolicy,
MessageChannelWorkspaceEntity,
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MessageParticipantWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-participant.workspace-entity';
import {
GmailMessage,
Participant,
ParticipantWithMessageId,
} from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message';
import { isGroupEmail } from 'src/utils/is-group-email';
import { isWorkEmail } from 'src/utils/is-work-email';
@Injectable()
export class MessagingSaveMessagesAndEnqueueContactCreationService {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
@InjectMessageQueue(MessageQueue.contactCreationQueue)
private readonly messageQueueService: MessageQueueService,
private readonly messageService: MessagingMessageService,
private readonly messageParticipantService: MessagingMessageParticipantService,
@InjectRepository(FeatureFlagEntity, 'core')
private readonly featureFlagRepository: Repository<FeatureFlagEntity>,
private readonly eventEmitter: EventEmitter2,
) {}
async saveMessagesAndEnqueueContactCreationJob(
messagesToSave: GmailMessage[],
messageChannel: MessageChannelWorkspaceEntity,
connectedAccount: ConnectedAccountWorkspaceEntity,
workspaceId: string,
) {
const workspaceDataSource =
await this.workspaceDataSourceService.connectToWorkspaceDataSource(
workspaceId,
);
const emailAliases = connectedAccount.emailAliases?.split(',') || [];
let savedMessageParticipants: MessageParticipantWorkspaceEntity[] = [];
const participantsWithMessageId = await workspaceDataSource?.transaction(
async (transactionManager: EntityManager) => {
const messageExternalIdsAndIdsMap =
await this.messageService.saveMessagesWithinTransaction(
messagesToSave,
connectedAccount,
messageChannel.id,
workspaceId,
transactionManager,
);
const participantsWithMessageId: (ParticipantWithMessageId & {
shouldCreateContact: boolean;
})[] = messagesToSave.flatMap((message) => {
const messageId = messageExternalIdsAndIdsMap.get(message.externalId);
return messageId
? message.participants.map((participant: Participant) => {
const fromHandle =
message.participants.find((p) => p.role === 'from')?.handle ||
'';
const isMessageSentByConnectedAccount =
emailAliases.includes(fromHandle);
const isParticipantConnectedAccount = emailAliases.includes(
participant.handle,
);
const isExcludedByNonProfessionalEmails =
messageChannel.excludeNonProfessionalEmails &&
!isWorkEmail(participant.handle);
const isExcludedByGroupEmails =
messageChannel.excludeGroupEmails &&
isGroupEmail(participant.handle);
const shouldCreateContact =
!isParticipantConnectedAccount &&
!isExcludedByNonProfessionalEmails &&
!isExcludedByGroupEmails &&
(messageChannel.contactAutoCreationPolicy ===
MessageChannelContactAutoCreationPolicy.SENT_AND_RECEIVED ||
(messageChannel.contactAutoCreationPolicy ===
MessageChannelContactAutoCreationPolicy.SENT &&
isMessageSentByConnectedAccount));
return {
...participant,
messageId,
shouldCreateContact,
};
})
: [];
});
savedMessageParticipants =
await this.messageParticipantService.saveMessageParticipants(
participantsWithMessageId,
workspaceId,
transactionManager,
);
return participantsWithMessageId;
},
);
this.eventEmitter.emit(`messageParticipant.matched`, {
workspaceId,
workspaceMemberId: connectedAccount.accountOwnerId,
messageParticipants: savedMessageParticipants,
});
if (messageChannel.isContactAutoCreationEnabled) {
const contactsToCreate = participantsWithMessageId.filter(
(participant) => participant.shouldCreateContact,
);
await this.messageQueueService.add<CreateCompanyAndContactJobData>(
CreateCompanyAndContactJob.name,
{
workspaceId,
connectedAccount,
contactsToCreate,
},
);
}
}
}