6430 Part 1: remove all raw queries from the messaging and calendar modules (#6572)

Part 1 of #6430
- Remove all repositories which contained raw queries in `messaging`
module
- Replace them using `twentyORMManager`
This commit is contained in:
Raphaël Bosi
2024-08-13 19:40:50 +02:00
committed by GitHub
parent 40bbee8d9f
commit d1c278d6b2
27 changed files with 379 additions and 1115 deletions

View File

@ -30,7 +30,6 @@ import { WorkspaceManagerModule } from 'src/engine/workspace-manager/workspace-m
import { CalendarChannelWorkspaceEntity } from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity';
import { ConnectedAccountModule } from 'src/modules/connected-account/connected-account.module';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { AuthResolver } from './auth.resolver';
@ -51,7 +50,6 @@ import { JwtAuthStrategy } from './strategies/jwt.auth.strategy';
),
ObjectMetadataRepositoryModule.forFeature([
ConnectedAccountWorkspaceEntity,
MessageChannelWorkspaceEntity,
]),
HttpModule,
UserWorkspaceModule,

View File

@ -23,8 +23,8 @@ import {
ConnectedAccountProvider,
ConnectedAccountWorkspaceEntity,
} from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import {
MessageChannelSyncStage,
MessageChannelSyncStatus,
MessageChannelType,
MessageChannelVisibility,
@ -47,8 +47,6 @@ export class GoogleAPIsService {
private readonly environmentService: EnvironmentService,
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
private readonly connectedAccountRepository: ConnectedAccountRepository,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
private readonly accountsToReconnectService: AccountsToReconnectService,
) {}
@ -88,6 +86,11 @@ export class GoogleAPIsService {
'calendarChannel',
);
const messageChannelRepository =
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
'messageChannel',
);
const workspaceDataSource = await this.twentyORMManager.getDatasource();
await workspaceDataSource.transaction(async (manager: EntityManager) => {
@ -105,7 +108,7 @@ export class GoogleAPIsService {
manager,
);
await this.messageChannelRepository.create(
await messageChannelRepository.save(
{
id: v4(),
connectedAccountId: newOrExistingConnectedAccountId,
@ -115,7 +118,7 @@ export class GoogleAPIsService {
messageVisibility || MessageChannelVisibility.SHARE_EVERYTHING,
syncStatus: MessageChannelSyncStatus.ONGOING,
},
workspaceId,
{},
manager,
);
@ -159,20 +162,27 @@ export class GoogleAPIsService {
newOrExistingConnectedAccountId,
);
await this.messageChannelRepository.resetSync(
newOrExistingConnectedAccountId,
workspaceId,
await messageChannelRepository.update(
{
connectedAccountId: newOrExistingConnectedAccountId,
},
{
syncStage: MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING,
syncStatus: null,
syncCursor: '',
syncStageStartedAt: null,
},
manager,
);
}
});
if (this.environmentService.get('MESSAGING_PROVIDER_GMAIL_ENABLED')) {
const messageChannels =
await this.messageChannelRepository.getByConnectedAccountId(
newOrExistingConnectedAccountId,
workspaceId,
);
const messageChannels = await messageChannelRepository.find({
where: {
connectedAccountId: newOrExistingConnectedAccountId,
},
});
for (const messageChannel of messageChannels) {
await this.messageQueueService.add<MessagingMessageListFetchJobData>(

View File

@ -1,10 +1,5 @@
import { BlocklistRepository } from 'src/modules/blocklist/repositories/blocklist.repository';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/common/repositories/message-channel-message-association.repository';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import { MessageParticipantRepository } from 'src/modules/messaging/common/repositories/message-participant.repository';
import { MessageThreadRepository } from 'src/modules/messaging/common/repositories/message-thread.repository';
import { MessageRepository } from 'src/modules/messaging/common/repositories/message.repository';
import { AuditLogRepository } from 'src/modules/timeline/repositiories/audit-log.repository';
import { TimelineActivityRepository } from 'src/modules/timeline/repositiories/timeline-activity.repository';
import { WorkspaceMemberRepository } from 'src/modules/workspace-member/repositories/workspace-member.repository';
@ -13,12 +8,6 @@ export const metadataToRepositoryMapping = {
AuditLogWorkspaceEntity: AuditLogRepository,
BlocklistWorkspaceEntity: BlocklistRepository,
ConnectedAccountWorkspaceEntity: ConnectedAccountRepository,
MessageChannelMessageAssociationWorkspaceEntity:
MessageChannelMessageAssociationRepository,
MessageChannelWorkspaceEntity: MessageChannelRepository,
MessageWorkspaceEntity: MessageRepository,
MessageParticipantWorkspaceEntity: MessageParticipantRepository,
MessageThreadWorkspaceEntity: MessageThreadRepository,
TimelineActivityWorkspaceEntity: TimelineActivityRepository,
WorkspaceMemberWorkspaceEntity: WorkspaceMemberRepository,
};

View File

@ -157,7 +157,7 @@ export class CreateCompanyService {
};
}
private async createCompanyMap(companies: CompanyWorkspaceEntity[]) {
private createCompanyMap(companies: CompanyWorkspaceEntity[]) {
return companies.reduce(
(acc, company) => {
if (!company.domainName) {

View File

@ -1,15 +1,15 @@
import { Logger, Scope } from '@nestjs/common';
import { Any } from 'typeorm';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { BlocklistRepository } from 'src/modules/blocklist/repositories/blocklist.repository';
import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity';
import { MessageChannelMessageAssociationRepository } from 'src/modules/messaging/common/repositories/message-channel-message-association.repository';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MessagingMessageCleanerService } from 'src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service';
export type BlocklistItemDeleteMessagesJobData = {
@ -25,15 +25,10 @@ export class BlocklistItemDeleteMessagesJob {
private readonly logger = new Logger(BlocklistItemDeleteMessagesJob.name);
constructor(
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
@InjectObjectMetadataRepository(
MessageChannelMessageAssociationWorkspaceEntity,
)
private readonly messageChannelMessageAssociationRepository: MessageChannelMessageAssociationRepository,
@InjectObjectMetadataRepository(BlocklistWorkspaceEntity)
private readonly blocklistRepository: BlocklistRepository,
private readonly threadCleanerService: MessagingMessageCleanerService,
private readonly twentyORMManager: TwentyORMManager,
) {}
@Process(BlocklistItemDeleteMessagesJob.name)
@ -65,24 +60,26 @@ export class BlocklistItemDeleteMessagesJob {
);
}
const messageChannels =
await this.messageChannelRepository.getIdsByWorkspaceMemberId(
workspaceMemberId,
workspaceId,
const messageChannelMessageAssociationRepository =
await this.twentyORMManager.getRepository<MessageChannelMessageAssociationWorkspaceEntity>(
'messageChannelMessageAssociation',
);
const messageChannelIds = messageChannels.map(({ id }) => id);
const rolesToDelete: ('from' | 'to')[] = ['from', 'to'];
for (const messageChannelId of messageChannelIds) {
await this.messageChannelMessageAssociationRepository.deleteByMessageParticipantHandleAndMessageChannelIdAndRoles(
handle,
messageChannelId,
rolesToDelete,
workspaceId,
);
}
await messageChannelMessageAssociationRepository.delete({
messageChannel: {
connectedAccount: {
accountOwnerId: workspaceMemberId,
},
},
message: {
messageParticipants: {
handle,
role: Any(rolesToDelete),
},
},
});
await this.threadCleanerService.cleanWorkspaceThreads(workspaceId);

View File

@ -8,6 +8,7 @@ import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decora
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
@ -15,7 +16,6 @@ import {
BlocklistItemDeleteMessagesJob,
BlocklistItemDeleteMessagesJobData,
} from 'src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
@ -27,8 +27,7 @@ export class MessagingBlocklistListener {
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
private readonly connectedAccountRepository: ConnectedAccountRepository,
private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
private readonly twentyORMManager: TwentyORMManager,
) {}
@OnEvent('blocklist.created')
@ -61,14 +60,19 @@ export class MessagingBlocklistListener {
return;
}
const messageChannel =
await this.messageChannelRepository.getByConnectedAccountId(
connectedAccount[0].id,
workspaceId,
const messageChannelRepository =
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
'messageChannel',
);
const messageChannel = await messageChannelRepository.findOneOrFail({
where: {
connectedAccountId: connectedAccount[0].id,
},
});
await this.messagingChannelSyncStatusService.resetAndScheduleFullMessageListFetch(
messageChannel[0].id,
messageChannel.id,
workspaceId,
);
}
@ -98,14 +102,19 @@ export class MessagingBlocklistListener {
return;
}
const messageChannel =
await this.messageChannelRepository.getByConnectedAccountId(
connectedAccount[0].id,
workspaceId,
const messageChannelRepository =
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
'messageChannel',
);
const messageChannel = await messageChannelRepository.findOneOrFail({
where: {
connectedAccountId: connectedAccount[0].id,
},
});
await this.messagingChannelSyncStatusService.resetAndScheduleFullMessageListFetch(
messageChannel[0].id,
messageChannel.id,
workspaceId,
);
}

View File

@ -2,22 +2,13 @@ import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
import { ConnectedAccountModule } from 'src/modules/connected-account/connected-account.module';
import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service';
import { MessageParticipantWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-participant.workspace-entity';
import { MessageThreadWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-thread.workspace-entity';
import { MessageWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message.workspace-entity';
@Module({
imports: [
WorkspaceDataSourceModule,
ObjectMetadataRepositoryModule.forFeature([
MessageParticipantWorkspaceEntity,
MessageWorkspaceEntity,
MessageThreadWorkspaceEntity,
]),
TypeOrmModule.forFeature([FeatureFlagEntity], 'core'),
ConnectedAccountModule,
],

View File

@ -1,11 +1,12 @@
import { ForbiddenException } from '@nestjs/common';
import groupBy from 'lodash.groupby';
import { Any } from 'typeorm';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { WorkspaceMemberRepository } from 'src/modules/workspace-member/repositories/workspace-member.repository';
import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity';
@ -13,12 +14,11 @@ import { isDefined } from 'src/utils/is-defined';
export class CanAccessMessageThreadService {
constructor(
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelService: MessageChannelRepository,
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
private readonly connectedAccountRepository: ConnectedAccountRepository,
@InjectObjectMetadataRepository(WorkspaceMemberWorkspaceEntity)
private readonly workspaceMemberRepository: WorkspaceMemberRepository,
private readonly twentyORMManager: TwentyORMManager,
) {}
public async canAccessMessageThread(
@ -26,12 +26,19 @@ export class CanAccessMessageThreadService {
workspaceId: string,
messageChannelMessageAssociations: any[],
) {
const messageChannels = await this.messageChannelService.getByIds(
messageChannelMessageAssociations.map(
(association) => association.messageChannelId,
),
workspaceId,
);
const messageChannelRepository =
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
'messageChannel',
);
const messageChannels = await messageChannelRepository.find({
where: {
id: Any(
messageChannelMessageAssociations.map(
(association) => association.messageChannelId,
),
),
},
});
const messageChannelsGroupByVisibility = groupBy(
messageChannels,

View File

@ -1,19 +1,15 @@
import { Module } from '@nestjs/common';
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { CanAccessMessageThreadService } from 'src/modules/messaging/common/query-hooks/message/can-access-message-thread.service';
import { MessageFindManyPreQueryHook } from 'src/modules/messaging/common/query-hooks/message/message-find-many.pre-query.hook';
import { MessageFindOnePreQueryHook } from 'src/modules/messaging/common/query-hooks/message/message-find-one.pre-query-hook';
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity';
@Module({
imports: [
ObjectMetadataRepositoryModule.forFeature([
MessageChannelMessageAssociationWorkspaceEntity,
MessageChannelWorkspaceEntity,
ConnectedAccountWorkspaceEntity,
WorkspaceMemberWorkspaceEntity,
]),

View File

@ -1,87 +0,0 @@
import { Injectable } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
@Injectable()
export class MessageChannelMessageAssociationRepository {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
) {}
public async deleteByMessageParticipantHandleAndMessageChannelIdAndRoles(
messageParticipantHandle: string,
messageChannelId: string,
rolesToDelete: ('from' | 'to' | 'cc' | 'bcc')[],
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const isHandleDomain = messageParticipantHandle.startsWith('@');
const messageChannel =
await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."messageChannel"
WHERE "id" = $1`,
[messageChannelId],
workspaceId,
transactionManager,
);
const messageChannelHandle = messageChannel[0].handle;
const messageChannelMessageAssociationIdsToDelete =
await this.workspaceDataSourceService.executeRawQuery(
`SELECT "messageChannelMessageAssociation".id
FROM ${dataSourceSchema}."messageChannelMessageAssociation" "messageChannelMessageAssociation"
JOIN ${dataSourceSchema}."message" ON "messageChannelMessageAssociation"."messageId" = ${dataSourceSchema}."message"."id"
JOIN ${dataSourceSchema}."messageParticipant" "messageParticipant" ON ${dataSourceSchema}."message"."id" = "messageParticipant"."messageId"
WHERE "messageParticipant"."handle" != $1
AND "messageParticipant"."handle" ${isHandleDomain ? '~*' : '='} $2
AND "messageParticipant"."role" = ANY($3)
AND "messageChannelMessageAssociation"."messageChannelId" = $4`,
[
messageChannelHandle,
isHandleDomain
? // eslint-disable-next-line no-useless-escape
`.+@(.+\.)?${messageParticipantHandle.slice(1)}`
: messageParticipantHandle,
rolesToDelete,
messageChannelId,
],
workspaceId,
transactionManager,
);
const messageChannelMessageAssociationIdsToDeleteArray =
messageChannelMessageAssociationIdsToDelete.map(
(messageChannelMessageAssociation: { id: string }) =>
messageChannelMessageAssociation.id,
);
await this.deleteByIds(
messageChannelMessageAssociationIdsToDeleteArray,
workspaceId,
transactionManager,
);
}
public async deleteByIds(
ids: string[],
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`DELETE FROM ${dataSourceSchema}."messageChannelMessageAssociation" WHERE "id" = ANY($1)`,
[ids],
workspaceId,
transactionManager,
);
}
}

View File

@ -1,320 +0,0 @@
import { Injectable } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import {
MessageChannelWorkspaceEntity,
MessageChannelSyncStatus,
MessageChannelSyncStage,
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
@Injectable()
export class MessageChannelRepository {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
) {}
public async create(
messageChannel: Pick<
MessageChannelWorkspaceEntity,
| 'id'
| 'connectedAccountId'
| 'type'
| 'handle'
| 'visibility'
| 'syncStatus'
>,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`INSERT INTO ${dataSourceSchema}."messageChannel" ("id", "connectedAccountId", "type", "handle", "visibility", "syncStatus")
VALUES ($1, $2, $3, $4, $5, $6)`,
[
messageChannel.id,
messageChannel.connectedAccountId,
messageChannel.type,
messageChannel.handle,
messageChannel.visibility,
messageChannel.syncStatus,
],
workspaceId,
transactionManager,
);
}
public async resetSync(
connectedAccountId: string,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageChannel"
SET "syncStatus" = NULL,
"syncStage" = '${MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING}',
"syncCursor" = '',
"syncStageStartedAt" = NULL
WHERE "connectedAccountId" = $1`,
[connectedAccountId],
workspaceId,
transactionManager,
);
}
public async getAll(
workspaceId: string,
transactionManager?: EntityManager,
): Promise<MessageChannelWorkspaceEntity[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
return await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."messageChannel"`,
[],
workspaceId,
transactionManager,
);
}
public async getByConnectedAccountId(
connectedAccountId: string,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<MessageChannelWorkspaceEntity[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
return await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."messageChannel" WHERE "connectedAccountId" = $1 AND "type" = 'email' LIMIT 1`,
[connectedAccountId],
workspaceId,
transactionManager,
);
}
public async getFirstByConnectedAccountIdOrFail(
connectedAccountId: string,
workspaceId: string,
): Promise<MessageChannelWorkspaceEntity> {
const messageChannel = await this.getFirstByConnectedAccountId(
connectedAccountId,
workspaceId,
);
if (!messageChannel) {
throw new Error(
`Message channel for connected account ${connectedAccountId} not found in workspace ${workspaceId}`,
);
}
return messageChannel;
}
public async getFirstByConnectedAccountId(
connectedAccountId: string,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<MessageChannelWorkspaceEntity | undefined> {
const messageChannels = await this.getByConnectedAccountId(
connectedAccountId,
workspaceId,
transactionManager,
);
return messageChannels[0];
}
public async getByIds(
ids: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<MessageChannelWorkspaceEntity[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
return await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."messageChannel" WHERE "id" = ANY($1)`,
[ids],
workspaceId,
transactionManager,
);
}
public async getById(
id: string,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<MessageChannelWorkspaceEntity> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const messageChannels =
await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."messageChannel" WHERE "id" = $1`,
[id],
workspaceId,
transactionManager,
);
return messageChannels[0];
}
public async getIdsByWorkspaceMemberId(
workspaceMemberId: string,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<MessageChannelWorkspaceEntity[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const messageChannelIds =
await this.workspaceDataSourceService.executeRawQuery(
`SELECT "messageChannel".id FROM ${dataSourceSchema}."messageChannel" "messageChannel"
JOIN ${dataSourceSchema}."connectedAccount" ON "messageChannel"."connectedAccountId" = ${dataSourceSchema}."connectedAccount"."id"
WHERE ${dataSourceSchema}."connectedAccount"."accountOwnerId" = $1`,
[workspaceMemberId],
workspaceId,
transactionManager,
);
return messageChannelIds;
}
public async updateSyncStatus(
id: string,
syncStatus: MessageChannelSyncStatus,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const needsToUpdateSyncedAt =
syncStatus === MessageChannelSyncStatus.ACTIVE;
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageChannel" SET "syncStatus" = $1 ${
needsToUpdateSyncedAt ? `, "syncedAt" = NOW()` : ''
} WHERE "id" = $2`,
[syncStatus, id],
workspaceId,
transactionManager,
);
}
public async updateSyncStage(
id: string,
syncStage: MessageChannelSyncStage,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const needsToUpdateSyncStageStartedAt =
syncStage === MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING ||
syncStage === MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING;
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageChannel" SET "syncStage" = $1 ${
needsToUpdateSyncStageStartedAt ? `, "syncStageStartedAt" = NOW()` : ''
} WHERE "id" = $2`,
[syncStage, id],
workspaceId,
transactionManager,
);
}
public async resetSyncStageStartedAt(
id: string,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageChannel" SET "syncStageStartedAt" = NULL WHERE "id" = $1`,
[id],
workspaceId,
transactionManager,
);
}
public async updateLastSyncCursorIfHigher(
id: string,
syncCursor: string,
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageChannel" SET "syncCursor" = $1
WHERE "id" = $2
AND ("syncCursor" < $1 OR "syncCursor" = '')`,
[syncCursor, id],
workspaceId,
transactionManager,
);
}
public async resetSyncCursor(
id: string,
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageChannel" SET "syncCursor" = ''
WHERE "id" = $1`,
[id],
workspaceId,
transactionManager,
);
}
public async incrementThrottleFailureCount(
id: string,
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageChannel" SET "throttleFailureCount" = "throttleFailureCount" + 1
WHERE "id" = $1`,
[id],
workspaceId,
transactionManager,
);
}
public async resetThrottleFailureCount(
id: string,
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageChannel" SET "throttleFailureCount" = 0
WHERE "id" = $1`,
[id],
workspaceId,
transactionManager,
);
}
}

View File

@ -1,209 +0,0 @@
import { Injectable } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import { MessageParticipantWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-participant.workspace-entity';
import { ParticipantWithId } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message';
@Injectable()
export class MessageParticipantRepository {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
) {}
public async getByHandles(
handles: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<MessageParticipantWorkspaceEntity[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
return await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."messageParticipant" WHERE "handle" = ANY($1)`,
[handles],
workspaceId,
transactionManager,
);
}
public async updateParticipantsPersonId(
participantIds: string[],
personId: string,
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageParticipant" SET "personId" = $1 WHERE "id" = ANY($2)`,
[personId, participantIds],
workspaceId,
transactionManager,
);
}
public async updateParticipantsPersonIdAndReturn(
participantIds: string[],
personId: string,
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
return await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageParticipant" SET "personId" = $1 WHERE "id" = ANY($2) RETURNING *`,
[personId, participantIds],
workspaceId,
transactionManager,
);
}
public async updateParticipantsWorkspaceMemberId(
participantIds: string[],
workspaceMemberId: string,
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageParticipant" SET "workspaceMemberId" = $1 WHERE "id" = ANY($2)`,
[workspaceMemberId, participantIds],
workspaceId,
transactionManager,
);
}
public async removePersonIdByHandle(
handle: string,
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageParticipant" SET "personId" = NULL WHERE "handle" = $1`,
[handle],
workspaceId,
transactionManager,
);
}
public async removeWorkspaceMemberIdByHandle(
handle: string,
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageParticipant" SET "workspaceMemberId" = NULL WHERE "handle" = $1`,
[handle],
workspaceId,
transactionManager,
);
}
public async getByMessageChannelIdWithoutPersonIdAndWorkspaceMemberIdAndMessageOutgoing(
messageChannelId: string,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ParticipantWithId[]> {
if (!messageChannelId || !workspaceId) {
return [];
}
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const messageParticipants: ParticipantWithId[] =
await this.workspaceDataSourceService.executeRawQuery(
`SELECT "messageParticipant".id,
"messageParticipant"."role",
"messageParticipant"."handle",
"messageParticipant"."displayName",
"messageParticipant"."personId",
"messageParticipant"."workspaceMemberId",
"messageParticipant"."messageId"
FROM ${dataSourceSchema}."messageParticipant" "messageParticipant"
LEFT JOIN ${dataSourceSchema}."message" ON "messageParticipant"."messageId" = ${dataSourceSchema}."message"."id"
LEFT JOIN ${dataSourceSchema}."messageChannelMessageAssociation" ON ${dataSourceSchema}."messageChannelMessageAssociation"."messageId" = ${dataSourceSchema}."message"."id"
WHERE ${dataSourceSchema}."messageChannelMessageAssociation"."messageChannelId" = $1
AND "messageParticipant"."personId" IS NULL
AND "messageParticipant"."workspaceMemberId" IS NULL
AND ${dataSourceSchema}."message"."direction" = 'outgoing'`,
[messageChannelId],
workspaceId,
transactionManager,
);
return messageParticipants;
}
public async getByMessageChannelIdWithoutPersonIdAndWorkspaceMemberId(
messageChannelId: string,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ParticipantWithId[]> {
if (!messageChannelId || !workspaceId) {
return [];
}
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const messageParticipants: ParticipantWithId[] =
await this.workspaceDataSourceService.executeRawQuery(
`SELECT "messageParticipant".id,
"messageParticipant"."role",
"messageParticipant"."handle",
"messageParticipant"."displayName",
"messageParticipant"."personId",
"messageParticipant"."workspaceMemberId",
"messageParticipant"."messageId"
FROM ${dataSourceSchema}."messageParticipant" "messageParticipant"
LEFT JOIN ${dataSourceSchema}."message" ON "messageParticipant"."messageId" = ${dataSourceSchema}."message"."id"
LEFT JOIN ${dataSourceSchema}."messageChannelMessageAssociation" ON ${dataSourceSchema}."messageChannelMessageAssociation"."messageId" = ${dataSourceSchema}."message"."id"
WHERE ${dataSourceSchema}."messageChannelMessageAssociation"."messageChannelId" = $1
AND "messageParticipant"."personId" IS NULL
AND "messageParticipant"."workspaceMemberId" IS NULL`,
[messageChannelId],
workspaceId,
transactionManager,
);
return messageParticipants;
}
public async getWithoutPersonIdAndWorkspaceMemberId(
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ParticipantWithId[]> {
if (!workspaceId) {
throw new Error('WorkspaceId is required');
}
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const messageParticipants: ParticipantWithId[] =
await this.workspaceDataSourceService.executeRawQuery(
`SELECT "messageParticipant".*
FROM ${dataSourceSchema}."messageParticipant" "messageParticipant"
WHERE "messageParticipant"."personId" IS NULL
AND "messageParticipant"."workspaceMemberId" IS NULL`,
[],
workspaceId,
transactionManager,
);
return messageParticipants;
}
}

View File

@ -1,67 +0,0 @@
import { Injectable } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
@Injectable()
export class MessageThreadRepository {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
) {}
public async getOrphanThreadIdsPaginated(
limit: number,
offset: number,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<string[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const orphanThreads = await this.workspaceDataSourceService.executeRawQuery(
`SELECT mt.id
FROM ${dataSourceSchema}."messageThread" mt
LEFT JOIN ${dataSourceSchema}."message" m ON mt.id = m."messageThreadId"
WHERE m."messageThreadId" IS NULL
LIMIT $1 OFFSET $2`,
[limit, offset],
workspaceId,
transactionManager,
);
return orphanThreads.map(({ id }) => id);
}
public async deleteByIds(
messageThreadIds: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`DELETE FROM ${dataSourceSchema}."messageThread" WHERE id = ANY($1)`,
[messageThreadIds],
workspaceId,
transactionManager,
);
}
public async insert(
messageThreadId: string,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`INSERT INTO ${dataSourceSchema}."messageThread" (id) VALUES ($1)`,
[messageThreadId],
workspaceId,
transactionManager,
);
}
}

View File

@ -1,137 +0,0 @@
import { Injectable } from '@nestjs/common';
import { EntityManager } from 'typeorm';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import { MessageWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message.workspace-entity';
@Injectable()
export class MessageRepository {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
) {}
public async getNonAssociatedMessageIdsPaginated(
limit: number,
offset: number,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<string[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const nonAssociatedMessages =
await this.workspaceDataSourceService.executeRawQuery(
`SELECT m.id FROM ${dataSourceSchema}."message" m
LEFT JOIN ${dataSourceSchema}."messageChannelMessageAssociation" mcma
ON m.id = mcma."messageId"
WHERE mcma.id IS NULL
LIMIT $1 OFFSET $2`,
[limit, offset],
workspaceId,
transactionManager,
);
return nonAssociatedMessages.map(({ id }) => id);
}
public async getFirstOrNullByHeaderMessageId(
headerMessageId: string,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<MessageWorkspaceEntity | null> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const messages = await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."message" WHERE "headerMessageId" = $1 LIMIT 1`,
[headerMessageId],
workspaceId,
transactionManager,
);
if (!messages || messages.length === 0) {
return null;
}
return messages[0];
}
public async getByIds(
messageIds: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<MessageWorkspaceEntity[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
return await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."message" WHERE "id" = ANY($1)`,
[messageIds],
workspaceId,
transactionManager,
);
}
public async deleteByIds(
messageIds: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`DELETE FROM ${dataSourceSchema}."message" WHERE "id" = ANY($1)`,
[messageIds],
workspaceId,
transactionManager,
);
}
public async getByMessageThreadIds(
messageThreadIds: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<MessageWorkspaceEntity[]> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
return await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."message" WHERE "messageThreadId" = ANY($1)`,
[messageThreadIds],
workspaceId,
transactionManager,
);
}
public async insert(
id: string,
headerMessageId: string,
subject: string,
receivedAt: Date,
messageDirection: string,
messageThreadId: string,
text: string,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`INSERT INTO ${dataSourceSchema}."message" ("id", "headerMessageId", "subject", "receivedAt", "direction", "messageThreadId", "text") VALUES ($1, $2, $3, $4, $5, $6, $7)`,
[
id,
headerMessageId,
subject,
receivedAt,
messageDirection,
messageThreadId,
text,
],
workspaceId,
transactionManager,
);
}
}

View File

@ -3,11 +3,9 @@ import { Injectable } from '@nestjs/common';
import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service';
import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator';
import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { AccountsToReconnectService } from 'src/modules/connected-account/services/accounts-to-reconnect.service';
import { AccountsToReconnectKeys } from 'src/modules/connected-account/types/accounts-to-reconnect-key-value.type';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import {
MessageChannelSyncStage,
MessageChannelSyncStatus,
@ -17,44 +15,57 @@ import {
@Injectable()
export class MessagingChannelSyncStatusService {
constructor(
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
@InjectCacheStorage(CacheStorageNamespace.Messaging)
private readonly cacheStorage: CacheStorageService,
private readonly twentyORMManager: TwentyORMManager,
private readonly accountsToReconnectService: AccountsToReconnectService,
) {}
public async scheduleFullMessageListFetch(
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncStage(
messageChannelId,
MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING,
workspaceId,
public async scheduleFullMessageListFetch(messageChannelId: string) {
const messageChannelRepository =
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
'messageChannel',
);
await messageChannelRepository.update(
{
id: messageChannelId,
},
{
syncStage: MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING,
},
);
}
public async schedulePartialMessageListFetch(
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncStage(
messageChannelId,
MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING,
workspaceId,
public async schedulePartialMessageListFetch(messageChannelId: string) {
const messageChannelRepository =
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
'messageChannel',
);
await messageChannelRepository.update(
{
id: messageChannelId,
},
{
syncStage: MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING,
},
);
}
public async scheduleMessagesImport(
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncStage(
messageChannelId,
MessageChannelSyncStage.MESSAGES_IMPORT_PENDING,
workspaceId,
public async scheduleMessagesImport(messageChannelId: string) {
const messageChannelRepository =
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
'messageChannel',
);
await messageChannelRepository.update(
{
id: messageChannelId,
},
{
syncStage: MessageChannelSyncStage.MESSAGES_IMPORT_PENDING,
},
);
}
@ -66,62 +77,75 @@ export class MessagingChannelSyncStatusService {
`messages-to-import:${workspaceId}:gmail:${messageChannelId}`,
);
await this.messageChannelRepository.resetSyncCursor(
messageChannelId,
workspaceId,
const messageChannelRepository =
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
'messageChannel',
);
await messageChannelRepository.update(
{
id: messageChannelId,
},
{
syncCursor: '',
syncStageStartedAt: null,
throttleFailureCount: 0,
},
);
await this.messageChannelRepository.resetSyncStageStartedAt(
messageChannelId,
workspaceId,
);
await this.messageChannelRepository.resetThrottleFailureCount(
messageChannelId,
workspaceId,
);
await this.scheduleFullMessageListFetch(messageChannelId, workspaceId);
await this.scheduleFullMessageListFetch(messageChannelId);
}
public async markAsMessagesListFetchOngoing(
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncStage(
messageChannelId,
MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING,
workspaceId,
);
public async markAsMessagesListFetchOngoing(messageChannelId: string) {
const messageChannelRepository =
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
'messageChannel',
);
await this.messageChannelRepository.updateSyncStatus(
messageChannelId,
MessageChannelSyncStatus.ONGOING,
workspaceId,
await messageChannelRepository.update(
{
id: messageChannelId,
},
{
syncStage: MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING,
syncStatus: MessageChannelSyncStatus.ONGOING,
},
);
}
public async markAsCompletedAndSchedulePartialMessageListFetch(
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncStatus(
messageChannelId,
MessageChannelSyncStatus.ACTIVE,
workspaceId,
const messageChannelRepository =
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
'messageChannel',
);
await messageChannelRepository.update(
{
id: messageChannelId,
},
{
syncStatus: MessageChannelSyncStatus.ACTIVE,
},
);
await this.schedulePartialMessageListFetch(messageChannelId, workspaceId);
await this.schedulePartialMessageListFetch(messageChannelId);
}
public async markAsMessagesImportOngoing(
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncStage(
messageChannelId,
MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING,
workspaceId,
public async markAsMessagesImportOngoing(messageChannelId: string) {
const messageChannelRepository =
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
'messageChannel',
);
await messageChannelRepository.update(
{
id: messageChannelId,
},
{
syncStage: MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING,
},
);
}
@ -133,16 +157,19 @@ export class MessagingChannelSyncStatusService {
`messages-to-import:${workspaceId}:gmail:${messageChannelId}`,
);
await this.messageChannelRepository.updateSyncStage(
messageChannelId,
MessageChannelSyncStage.FAILED,
workspaceId,
);
const messageChannelRepository =
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
'messageChannel',
);
await this.messageChannelRepository.updateSyncStatus(
messageChannelId,
MessageChannelSyncStatus.FAILED_UNKNOWN,
workspaceId,
await messageChannelRepository.update(
{
id: messageChannelId,
},
{
syncStage: MessageChannelSyncStage.FAILED,
syncStatus: MessageChannelSyncStatus.FAILED_UNKNOWN,
},
);
}
@ -154,16 +181,19 @@ export class MessagingChannelSyncStatusService {
`messages-to-import:${workspaceId}:gmail:${messageChannelId}`,
);
await this.messageChannelRepository.updateSyncStage(
messageChannelId,
MessageChannelSyncStage.FAILED,
workspaceId,
);
const messageChannelRepository =
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
'messageChannel',
);
await this.messageChannelRepository.updateSyncStatus(
messageChannelId,
MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS,
workspaceId,
await messageChannelRepository.update(
{
id: messageChannelId,
},
{
syncStage: MessageChannelSyncStage.FAILED,
syncStatus: MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS,
},
);
await this.addToAccountsToReconnect(messageChannelId, workspaceId);

View File

@ -1,4 +1,3 @@
import { Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
@ -12,8 +11,7 @@ import { Process } from 'src/engine/integrations/message-queue/decorators/proces
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import {
MessageChannelSyncStage,
MessageChannelWorkspaceEntity,
@ -25,15 +23,12 @@ import {
@Processor(MessageQueue.cronQueue)
export class MessagingMessageListFetchCronJob {
private readonly logger = new Logger(MessagingMessageListFetchCronJob.name);
constructor(
@InjectRepository(Workspace, 'core')
private readonly workspaceRepository: Repository<Workspace>,
@InjectMessageQueue(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
) {}
@Process(MessagingMessageListFetchCronJob.name)
@ -45,9 +40,15 @@ export class MessagingMessageListFetchCronJob {
});
for (const activeWorkspace of activeWorkspaces) {
const messageChannels = await this.messageChannelRepository.getAll(
activeWorkspace.id,
);
const messageChannelRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<MessageChannelWorkspaceEntity>(
activeWorkspace.id,
'messageChannel',
);
const messageChannels = await messageChannelRepository.find({
select: ['id'],
});
for (const messageChannel of messageChannels) {
if (

View File

@ -12,8 +12,7 @@ import { Process } from 'src/engine/integrations/message-queue/decorators/proces
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import {
MessageChannelSyncStage,
MessageChannelWorkspaceEntity,
@ -32,8 +31,7 @@ export class MessagingMessagesImportCronJob {
private readonly workspaceRepository: Repository<Workspace>,
@InjectMessageQueue(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
) {}
@Process(MessagingMessagesImportCronJob.name)
@ -45,9 +43,15 @@ export class MessagingMessagesImportCronJob {
});
for (const activeWorkspace of activeWorkspaces) {
const messageChannels = await this.messageChannelRepository.getAll(
activeWorkspace.id,
);
const messageChannelRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<MessageChannelWorkspaceEntity>(
activeWorkspace.id,
'messageChannel',
);
const messageChannels = await messageChannelRepository.find({
select: ['id', 'isSyncEnabled', 'syncStage'],
});
for (const messageChannel of messageChannels) {
if (

View File

@ -12,8 +12,6 @@ import { EmailAliasManagerModule } from 'src/modules/connected-account/email-ali
import { OAuth2ClientManagerModule } from 'src/modules/connected-account/oauth2-client-manager/oauth2-client-manager.module';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessagingCommonModule } from 'src/modules/messaging/common/messaging-common.module';
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MessagingGmailClientProvider } from 'src/modules/messaging/message-import-manager/drivers/gmail/providers/messaging-gmail-client.provider';
import { MessagingGmailFetchByBatchesService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-by-batch.service';
import { MessagingGmailFetchMessagesByBatchesService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-messages-by-batches.service';
@ -29,8 +27,6 @@ import { MessageParticipantManagerModule } from 'src/modules/messaging/message-p
EnvironmentModule,
ObjectMetadataRepositoryModule.forFeature([
ConnectedAccountWorkspaceEntity,
MessageChannelWorkspaceEntity,
MessageChannelMessageAssociationWorkspaceEntity,
BlocklistWorkspaceEntity,
]),
MessagingCommonModule,

View File

@ -4,10 +4,10 @@ import { Process } from 'src/engine/integrations/message-queue/decorators/proces
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { isThrottled } from 'src/modules/connected-account/utils/is-throttled';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import {
MessageChannelSyncStage,
MessageChannelWorkspaceEntity,
@ -33,9 +33,8 @@ export class MessagingMessageListFetchJob {
private readonly messagingPartialMessageListFetchService: MessagingPartialMessageListFetchService,
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
private readonly connectedAccountRepository: ConnectedAccountRepository,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
private readonly messagingTelemetryService: MessagingTelemetryService,
private readonly twentyORMManager: TwentyORMManager,
) {}
@Process(MessagingMessageListFetchJob.name)
@ -48,10 +47,16 @@ export class MessagingMessageListFetchJob {
workspaceId,
});
const messageChannel = await this.messageChannelRepository.getById(
messageChannelId,
workspaceId,
);
const messageChannelRepository =
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
'messageChannel',
);
const messageChannel = await messageChannelRepository.findOne({
where: {
id: messageChannelId,
},
});
if (!messageChannel) {
await this.messagingTelemetryService.track({

View File

@ -4,10 +4,10 @@ import { Process } from 'src/engine/integrations/message-queue/decorators/proces
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { isThrottled } from 'src/modules/connected-account/utils/is-throttled';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import {
MessageChannelSyncStage,
MessageChannelWorkspaceEntity,
@ -29,9 +29,8 @@ export class MessagingMessagesImportJob {
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
private readonly connectedAccountRepository: ConnectedAccountRepository,
private readonly gmailFetchMessageContentFromCacheService: MessagingMessagesImportService,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
private readonly messagingTelemetryService: MessagingTelemetryService,
private readonly twentyORMManager: TwentyORMManager,
) {}
@Process(MessagingMessagesImportJob.name)
@ -44,10 +43,16 @@ export class MessagingMessagesImportJob {
messageChannelId,
});
const messageChannel = await this.messageChannelRepository.getById(
messageChannelId,
workspaceId,
);
const messageChannelRepository =
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
'messageChannel',
);
const messageChannel = await messageChannelRepository.findOne({
where: {
id: messageChannelId,
},
});
if (!messageChannel) {
await this.messagingTelemetryService.track({

View File

@ -59,13 +59,11 @@ export class MessagingOngoingStaleJob {
case MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING:
await this.messagingChannelSyncStatusService.schedulePartialMessageListFetch(
messageChannel.id,
workspaceId,
);
break;
case MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING:
await this.messagingChannelSyncStatusService.scheduleMessagesImport(
messageChannel.id,
workspaceId,
);
break;
default:

View File

@ -3,9 +3,9 @@ import { Injectable } from '@nestjs/common';
import snakeCase from 'lodash.snakecase';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MESSAGING_THROTTLE_MAX_ATTEMPTS } from 'src/modules/messaging/message-import-manager/constants/messaging-throttle-max-attempts';
@ -28,8 +28,7 @@ export class MessagingErrorHandlingService {
private readonly connectedAccountRepository: ConnectedAccountRepository,
private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService,
private readonly messagingTelemetryService: MessagingTelemetryService,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
private readonly twentyORMManager: TwentyORMManager,
) {}
public async handleGmailError(
@ -263,21 +262,18 @@ export class MessagingErrorHandlingService {
case 'full-message-list-fetch':
await this.messagingChannelSyncStatusService.scheduleFullMessageListFetch(
messageChannel.id,
workspaceId,
);
break;
case 'partial-message-list-fetch':
await this.messagingChannelSyncStatusService.schedulePartialMessageListFetch(
messageChannel.id,
workspaceId,
);
break;
case 'messages-import':
await this.messagingChannelSyncStatusService.scheduleMessagesImport(
messageChannel.id,
workspaceId,
);
break;
@ -290,9 +286,17 @@ export class MessagingErrorHandlingService {
messageChannel: MessageChannelWorkspaceEntity,
workspaceId: string,
): Promise<void> {
await this.messageChannelRepository.incrementThrottleFailureCount(
messageChannel.id,
workspaceId,
const messageChannelRepository =
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
'messageChannel',
);
await messageChannelRepository.increment(
{
id: messageChannel.id,
},
'throttleFailureCount',
1,
);
await this.messagingTelemetryService.track({

View File

@ -7,10 +7,8 @@ import { Any, EntityManager } from 'typeorm';
import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service';
import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator';
import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service';
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
@ -31,8 +29,6 @@ export class MessagingFullMessageListFetchService {
constructor(
private readonly gmailClientProvider: MessagingGmailClientProvider,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
@InjectCacheStorage(CacheStorageNamespace.Messaging)
private readonly cacheStorage: CacheStorageService,
private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService,
@ -47,7 +43,6 @@ export class MessagingFullMessageListFetchService {
) {
await this.messagingChannelSyncStatusService.markAsMessagesListFetchOngoing(
messageChannel.id,
workspaceId,
);
const gmailClient: gmail_v1.Gmail =
@ -70,19 +65,23 @@ export class MessagingFullMessageListFetchService {
return;
}
await this.messageChannelRepository.resetThrottleFailureCount(
messageChannel.id,
workspaceId,
);
const messageChannelRepository =
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
'messageChannel',
);
await this.messageChannelRepository.resetSyncStageStartedAt(
messageChannel.id,
workspaceId,
await messageChannelRepository.update(
{
id: messageChannel.id,
},
{
throttleFailureCount: 0,
syncStageStartedAt: null,
},
);
await this.messagingChannelSyncStatusService.scheduleMessagesImport(
messageChannel.id,
workspaceId,
);
}
@ -216,11 +215,32 @@ export class MessagingFullMessageListFetchService {
);
}
await this.messageChannelRepository.updateLastSyncCursorIfHigher(
messageChannelId,
historyId,
workspaceId,
const messageChannelRepository =
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
'messageChannel',
);
const messageChannel = await messageChannelRepository.findOneOrFail(
{
where: {
id: messageChannelId,
},
},
transactionManager,
);
const currentSyncCursor = messageChannel.syncCursor;
if (!currentSyncCursor || historyId > currentSyncCursor) {
await messageChannelRepository.update(
{
id: messageChannel.id,
},
{
syncCursor: historyId,
},
transactionManager,
);
}
}
}

View File

@ -6,13 +6,13 @@ import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache
import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator';
import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { BlocklistRepository } from 'src/modules/blocklist/repositories/blocklist.repository';
import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity';
import { EmailAliasManagerService } from 'src/modules/connected-account/email-alias-manager/services/email-alias-manager.service';
import { RefreshAccessTokenService } from 'src/modules/connected-account/refresh-access-token-manager/services/refresh-access-token.service';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service';
import {
MessageChannelSyncStage,
@ -40,12 +40,11 @@ export class MessagingMessagesImportService {
private readonly messagingTelemetryService: MessagingTelemetryService,
@InjectObjectMetadataRepository(BlocklistWorkspaceEntity)
private readonly blocklistRepository: BlocklistRepository,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
private readonly emailAliasManagerService: EmailAliasManagerService,
private readonly isFeatureEnabledService: IsFeatureEnabledService,
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
private readonly connectedAccountRepository: ConnectedAccountRepository,
private readonly twentyORMManager: TwentyORMManager,
) {}
async processMessageBatchImport(
@ -73,7 +72,6 @@ export class MessagingMessagesImportService {
await this.messagingChannelSyncStatusService.markAsMessagesImportOngoing(
messageChannel.id,
workspaceId,
);
let accessToken: string;
@ -139,7 +137,6 @@ export class MessagingMessagesImportService {
if (!messageIdsToFetch?.length) {
await this.messagingChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch(
messageChannel.id,
workspaceId,
);
return await this.trackMessageImportCompleted(
@ -180,23 +177,26 @@ export class MessagingMessagesImportService {
) {
await this.messagingChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch(
messageChannel.id,
workspaceId,
);
} else {
await this.messagingChannelSyncStatusService.scheduleMessagesImport(
messageChannel.id,
workspaceId,
);
}
await this.messageChannelRepository.resetThrottleFailureCount(
messageChannel.id,
workspaceId,
);
const messageChannelRepository =
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
'messageChannel',
);
await this.messageChannelRepository.resetSyncStageStartedAt(
messageChannel.id,
workspaceId,
await messageChannelRepository.update(
{
id: messageChannel.id,
},
{
throttleFailureCount: 0,
syncStageStartedAt: null,
},
);
return await this.trackMessageImportCompleted(

View File

@ -6,10 +6,8 @@ import { Any } from 'typeorm';
import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service';
import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator';
import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service';
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
@ -26,8 +24,6 @@ export class MessagingPartialMessageListFetchService {
constructor(
private readonly gmailClientProvider: MessagingGmailClientProvider,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
@InjectCacheStorage(CacheStorageNamespace.Messaging)
private readonly cacheStorage: CacheStorageService,
private readonly gmailErrorHandlingService: MessagingErrorHandlingService,
@ -44,7 +40,6 @@ export class MessagingPartialMessageListFetchService {
): Promise<void> {
await this.messagingChannelSyncStatusService.markAsMessagesListFetchOngoing(
messageChannel.id,
workspaceId,
);
const lastSyncHistoryId = messageChannel.syncCursor;
@ -69,14 +64,19 @@ export class MessagingPartialMessageListFetchService {
return;
}
await this.messageChannelRepository.resetThrottleFailureCount(
messageChannel.id,
workspaceId,
);
const messageChannelRepository =
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
'messageChannel',
);
await this.messageChannelRepository.resetSyncStageStartedAt(
messageChannel.id,
workspaceId,
await messageChannelRepository.update(
{
id: messageChannel.id,
},
{
throttleFailureCount: 0,
syncStageStartedAt: null,
},
);
if (!historyId) {
@ -92,7 +92,6 @@ export class MessagingPartialMessageListFetchService {
await this.messagingChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch(
messageChannel.id,
workspaceId,
);
return;
@ -147,15 +146,21 @@ export class MessagingPartialMessageListFetchService {
`Deleted ${messagesDeleted.length} messages for workspace ${workspaceId} and account ${connectedAccount.id}`,
);
await this.messageChannelRepository.updateLastSyncCursorIfHigher(
messageChannel.id,
historyId,
workspaceId,
);
const currentSyncCursor = messageChannel.syncCursor;
if (!currentSyncCursor || historyId > currentSyncCursor) {
await messageChannelRepository.update(
{
id: messageChannel.id,
},
{
syncCursor: historyId,
},
);
}
await this.messagingChannelSyncStatusService.scheduleMessagesImport(
messageChannel.id,
workspaceId,
);
}
}

View File

@ -1,15 +1,16 @@
import { Logger } from '@nestjs/common';
import { Any, IsNull } from 'typeorm';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { FieldActorSource } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { CreateCompanyAndContactService } from 'src/modules/contact-creation-manager/services/create-company-and-contact.service';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import { MessageParticipantRepository } from 'src/modules/messaging/common/repositories/message-participant.repository';
import {
MessageChannelContactAutoCreationPolicy,
MessageChannelWorkspaceEntity,
@ -28,12 +29,9 @@ export class MessagingCreateCompanyAndContactAfterSyncJob {
);
constructor(
private readonly createCompanyAndContactService: CreateCompanyAndContactService,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelService: MessageChannelRepository,
@InjectObjectMetadataRepository(MessageParticipantWorkspaceEntity)
private readonly messageParticipantRepository: MessageParticipantRepository,
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
private readonly connectedAccountRepository: ConnectedAccountRepository,
private readonly twentyORMManager: TwentyORMManager,
) {}
@Process(MessagingCreateCompanyAndContactAfterSyncJob.name)
@ -45,12 +43,18 @@ export class MessagingCreateCompanyAndContactAfterSyncJob {
);
const { workspaceId, messageChannelId } = data;
const messageChannel = await this.messageChannelService.getByIds(
[messageChannelId],
workspaceId,
);
const messageChannelRepository =
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
'messageChannel',
);
const { contactAutoCreationPolicy, connectedAccountId } = messageChannel[0];
const messageChannel = await messageChannelRepository.findOneOrFail({
where: {
id: messageChannelId,
},
});
const { contactAutoCreationPolicy, connectedAccountId } = messageChannel;
if (
contactAutoCreationPolicy === MessageChannelContactAutoCreationPolicy.NONE
@ -69,17 +73,29 @@ export class MessagingCreateCompanyAndContactAfterSyncJob {
);
}
const contactsToCreate =
const messageParticipantRepository =
await this.twentyORMManager.getRepository<MessageParticipantWorkspaceEntity>(
'messageParticipant',
);
const directionFilter =
contactAutoCreationPolicy ===
MessageChannelContactAutoCreationPolicy.SENT_AND_RECEIVED
? await this.messageParticipantRepository.getByMessageChannelIdWithoutPersonIdAndWorkspaceMemberId(
? Any(['incoming', 'outgoing'])
: 'outgoing';
const contactsToCreate = await messageParticipantRepository.find({
where: {
message: {
messageChannelMessageAssociations: {
messageChannelId,
workspaceId,
)
: await this.messageParticipantRepository.getByMessageChannelIdWithoutPersonIdAndWorkspaceMemberIdAndMessageOutgoing(
messageChannelId,
workspaceId,
);
},
direction: directionFilter,
},
personId: IsNull(),
workspaceMemberId: IsNull(),
},
});
await this.createCompanyAndContactService.createCompaniesAndContactsAndUpdateParticipants(
connectedAccount,

View File

@ -11,8 +11,7 @@ import {
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MessagingTelemetryService } from 'src/modules/messaging/monitoring/services/messaging-telemetry.service';
@ -25,9 +24,8 @@ export class MessagingMessageChannelSyncStatusMonitoringCronJob {
constructor(
@InjectRepository(Workspace, 'core')
private readonly workspaceRepository: Repository<Workspace>,
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
private readonly messageChannelRepository: MessageChannelRepository,
private readonly messagingTelemetryService: MessagingTelemetryService,
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
) {}
@Process(MessagingMessageChannelSyncStatusMonitoringCronJob.name)
@ -46,9 +44,14 @@ export class MessagingMessageChannelSyncStatusMonitoringCronJob {
});
for (const activeWorkspace of activeWorkspaces) {
const messageChannels = await this.messageChannelRepository.getAll(
activeWorkspace.id,
);
const messageChannelRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<MessageChannelWorkspaceEntity>(
'messageChannel',
activeWorkspace.id,
);
const messageChannels = await messageChannelRepository.find({
select: ['id', 'syncStatus', 'connectedAccountId'],
});
for (const messageChannel of messageChannels) {
if (!messageChannel.syncStatus) {