better logging errors in messages (#12136)
This commit is contained in:
@ -83,12 +83,6 @@ export class BlocklistItemDeleteCalendarEventsJob {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
this.logger.log(
|
|
||||||
`Deleting calendar events from ${handles.join(
|
|
||||||
', ',
|
|
||||||
)} in workspace ${workspaceId} for workspace member ${workspaceMemberId}`,
|
|
||||||
);
|
|
||||||
|
|
||||||
const calendarChannels = await calendarChannelRepository.find({
|
const calendarChannels = await calendarChannelRepository.find({
|
||||||
select: {
|
select: {
|
||||||
id: true,
|
id: true,
|
||||||
@ -145,12 +139,6 @@ export class BlocklistItemDeleteCalendarEventsJob {
|
|||||||
calendarEventsAssociationsToDelete.map(({ id }) => id),
|
calendarEventsAssociationsToDelete.map(({ id }) => id),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.logger.log(
|
|
||||||
`Deleted calendar events from handle ${handles.join(
|
|
||||||
', ',
|
|
||||||
)} in workspace ${workspaceId} for workspace member ${workspaceMemberId}`,
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
await this.calendarEventCleanerService.cleanWorkspaceCalendarEvents(
|
await this.calendarEventCleanerService.cleanWorkspaceCalendarEvents(
|
||||||
|
|||||||
@ -1,5 +1,3 @@
|
|||||||
import { Logger } from '@nestjs/common';
|
|
||||||
|
|
||||||
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
|
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
|
||||||
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
|
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
|
||||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||||
@ -12,10 +10,6 @@ export type DeleteConnectedAccountAssociatedCalendarDataJobData = {
|
|||||||
|
|
||||||
@Processor(MessageQueue.calendarQueue)
|
@Processor(MessageQueue.calendarQueue)
|
||||||
export class DeleteConnectedAccountAssociatedCalendarDataJob {
|
export class DeleteConnectedAccountAssociatedCalendarDataJob {
|
||||||
private readonly logger = new Logger(
|
|
||||||
DeleteConnectedAccountAssociatedCalendarDataJob.name,
|
|
||||||
);
|
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private readonly calendarEventCleanerService: CalendarEventCleanerService,
|
private readonly calendarEventCleanerService: CalendarEventCleanerService,
|
||||||
) {}
|
) {}
|
||||||
@ -24,16 +18,8 @@ export class DeleteConnectedAccountAssociatedCalendarDataJob {
|
|||||||
async handle(
|
async handle(
|
||||||
data: DeleteConnectedAccountAssociatedCalendarDataJobData,
|
data: DeleteConnectedAccountAssociatedCalendarDataJobData,
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
this.logger.log(
|
|
||||||
`Deleting connected account ${data.connectedAccountId} associated calendar data in workspace ${data.workspaceId}`,
|
|
||||||
);
|
|
||||||
|
|
||||||
await this.calendarEventCleanerService.cleanWorkspaceCalendarEvents(
|
await this.calendarEventCleanerService.cleanWorkspaceCalendarEvents(
|
||||||
data.workspaceId,
|
data.workspaceId,
|
||||||
);
|
);
|
||||||
|
|
||||||
this.logger.log(
|
|
||||||
`Deleted connected account ${data.connectedAccountId} associated calendar data in workspace ${data.workspaceId}`,
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,7 +1,7 @@
|
|||||||
import { InjectRepository } from '@nestjs/typeorm';
|
import { InjectRepository } from '@nestjs/typeorm';
|
||||||
|
|
||||||
import { Repository } from 'typeorm';
|
|
||||||
import { WorkspaceActivationStatus } from 'twenty-shared/workspace';
|
import { WorkspaceActivationStatus } from 'twenty-shared/workspace';
|
||||||
|
import { Repository } from 'typeorm';
|
||||||
|
|
||||||
import { SentryCronMonitor } from 'src/engine/core-modules/cron/sentry-cron-monitor.decorator';
|
import { SentryCronMonitor } from 'src/engine/core-modules/cron/sentry-cron-monitor.decorator';
|
||||||
import { ExceptionHandlerService } from 'src/engine/core-modules/exception-handler/exception-handler.service';
|
import { ExceptionHandlerService } from 'src/engine/core-modules/exception-handler/exception-handler.service';
|
||||||
|
|||||||
@ -1,4 +1,4 @@
|
|||||||
import { Logger, Scope } from '@nestjs/common';
|
import { Scope } from '@nestjs/common';
|
||||||
|
|
||||||
import { In } from 'typeorm';
|
import { In } from 'typeorm';
|
||||||
|
|
||||||
@ -22,16 +22,13 @@ export type CalendarOngoingStaleJobData = {
|
|||||||
scope: Scope.REQUEST,
|
scope: Scope.REQUEST,
|
||||||
})
|
})
|
||||||
export class CalendarOngoingStaleJob {
|
export class CalendarOngoingStaleJob {
|
||||||
private readonly logger = new Logger(CalendarOngoingStaleJob.name);
|
|
||||||
constructor(
|
constructor(
|
||||||
private readonly twentyORMManager: TwentyORMManager,
|
private readonly twentyORMManager: TwentyORMManager,
|
||||||
private readonly calendarChannelSyncStatusService: CalendarChannelSyncStatusService,
|
private readonly calendarChannelSyncStatusService: CalendarChannelSyncStatusService,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
@Process(CalendarOngoingStaleJob.name)
|
@Process(CalendarOngoingStaleJob.name)
|
||||||
async handle(data: CalendarOngoingStaleJobData): Promise<void> {
|
async handle(): Promise<void> {
|
||||||
const { workspaceId } = data;
|
|
||||||
|
|
||||||
const calendarChannelRepository =
|
const calendarChannelRepository =
|
||||||
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
|
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
|
||||||
'calendarChannel',
|
'calendarChannel',
|
||||||
@ -51,9 +48,6 @@ export class CalendarOngoingStaleJob {
|
|||||||
calendarChannel.syncStageStartedAt &&
|
calendarChannel.syncStageStartedAt &&
|
||||||
isSyncStale(calendarChannel.syncStageStartedAt)
|
isSyncStale(calendarChannel.syncStageStartedAt)
|
||||||
) {
|
) {
|
||||||
this.logger.log(
|
|
||||||
`Sync for calendar channel ${calendarChannel.id} and workspace ${workspaceId} is stale. Setting sync stage to pending`,
|
|
||||||
);
|
|
||||||
await this.calendarChannelSyncStatusService.resetSyncStageStartedAt([
|
await this.calendarChannelSyncStatusService.resetSyncStageStartedAt([
|
||||||
calendarChannel.id,
|
calendarChannel.id,
|
||||||
]);
|
]);
|
||||||
|
|||||||
@ -1,15 +1,15 @@
|
|||||||
import { Logger, Scope } from '@nestjs/common';
|
import { Scope } from '@nestjs/common';
|
||||||
|
|
||||||
import { IsNull } from 'typeorm';
|
import { IsNull } from 'typeorm';
|
||||||
|
|
||||||
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
|
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
|
||||||
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
|
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
|
||||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||||
|
import { FieldActorSource } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type';
|
||||||
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
|
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
|
||||||
import { CalendarChannelWorkspaceEntity } from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity';
|
import { CalendarChannelWorkspaceEntity } from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity';
|
||||||
import { CalendarEventParticipantWorkspaceEntity } from 'src/modules/calendar/common/standard-objects/calendar-event-participant.workspace-entity';
|
import { CalendarEventParticipantWorkspaceEntity } from 'src/modules/calendar/common/standard-objects/calendar-event-participant.workspace-entity';
|
||||||
import { CreateCompanyAndContactService } from 'src/modules/contact-creation-manager/services/create-company-and-contact.service';
|
import { CreateCompanyAndContactService } from 'src/modules/contact-creation-manager/services/create-company-and-contact.service';
|
||||||
import { FieldActorSource } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type';
|
|
||||||
|
|
||||||
export type CalendarCreateCompanyAndContactAfterSyncJobData = {
|
export type CalendarCreateCompanyAndContactAfterSyncJobData = {
|
||||||
workspaceId: string;
|
workspaceId: string;
|
||||||
@ -21,9 +21,6 @@ export type CalendarCreateCompanyAndContactAfterSyncJobData = {
|
|||||||
scope: Scope.REQUEST,
|
scope: Scope.REQUEST,
|
||||||
})
|
})
|
||||||
export class CalendarCreateCompanyAndContactAfterSyncJob {
|
export class CalendarCreateCompanyAndContactAfterSyncJob {
|
||||||
private readonly logger = new Logger(
|
|
||||||
CalendarCreateCompanyAndContactAfterSyncJob.name,
|
|
||||||
);
|
|
||||||
constructor(
|
constructor(
|
||||||
private readonly twentyORMManager: TwentyORMManager,
|
private readonly twentyORMManager: TwentyORMManager,
|
||||||
private readonly createCompanyAndContactService: CreateCompanyAndContactService,
|
private readonly createCompanyAndContactService: CreateCompanyAndContactService,
|
||||||
@ -33,9 +30,6 @@ export class CalendarCreateCompanyAndContactAfterSyncJob {
|
|||||||
async handle(
|
async handle(
|
||||||
data: CalendarCreateCompanyAndContactAfterSyncJobData,
|
data: CalendarCreateCompanyAndContactAfterSyncJobData,
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
this.logger.log(
|
|
||||||
`create contacts and companies after sync for workspace ${data.workspaceId} and calendarChannel ${data.calendarChannelId}`,
|
|
||||||
);
|
|
||||||
const { workspaceId, calendarChannelId } = data;
|
const { workspaceId, calendarChannelId } = data;
|
||||||
|
|
||||||
const calendarChannelRepository =
|
const calendarChannelRepository =
|
||||||
@ -99,9 +93,5 @@ export class CalendarCreateCompanyAndContactAfterSyncJob {
|
|||||||
workspaceId,
|
workspaceId,
|
||||||
FieldActorSource.CALENDAR,
|
FieldActorSource.CALENDAR,
|
||||||
);
|
);
|
||||||
|
|
||||||
this.logger.log(
|
|
||||||
`create contacts and companies after sync for workspace ${data.workspaceId} and calendarChannel ${data.calendarChannelId} done`,
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,4 +1,4 @@
|
|||||||
import { Logger, Scope } from '@nestjs/common';
|
import { Scope } from '@nestjs/common';
|
||||||
|
|
||||||
import { And, Any, ILike, In, Not, Or } from 'typeorm';
|
import { And, Any, ILike, In, Not, Or } from 'typeorm';
|
||||||
|
|
||||||
@ -22,8 +22,6 @@ export type BlocklistItemDeleteMessagesJobData = WorkspaceEventBatch<
|
|||||||
scope: Scope.REQUEST,
|
scope: Scope.REQUEST,
|
||||||
})
|
})
|
||||||
export class BlocklistItemDeleteMessagesJob {
|
export class BlocklistItemDeleteMessagesJob {
|
||||||
private readonly logger = new Logger(BlocklistItemDeleteMessagesJob.name);
|
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private readonly threadCleanerService: MessagingMessageCleanerService,
|
private readonly threadCleanerService: MessagingMessageCleanerService,
|
||||||
private readonly twentyORMManager: TwentyORMManager,
|
private readonly twentyORMManager: TwentyORMManager,
|
||||||
@ -81,12 +79,6 @@ export class BlocklistItemDeleteMessagesJob {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
this.logger.log(
|
|
||||||
`Deleting messages from ${handles.join(
|
|
||||||
', ',
|
|
||||||
)} in workspace ${workspaceId} for workspace member ${workspaceMemberId}`,
|
|
||||||
);
|
|
||||||
|
|
||||||
const rolesToDelete: ('from' | 'to')[] = ['from', 'to'];
|
const rolesToDelete: ('from' | 'to')[] = ['from', 'to'];
|
||||||
|
|
||||||
const messageChannels = await messageChannelRepository.find({
|
const messageChannels = await messageChannelRepository.find({
|
||||||
@ -146,12 +138,6 @@ export class BlocklistItemDeleteMessagesJob {
|
|||||||
messageChannelMessageAssociationsToDelete.map(({ id }) => id),
|
messageChannelMessageAssociationsToDelete.map(({ id }) => id),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.logger.log(
|
|
||||||
`Deleted messages from handle ${handles.join(
|
|
||||||
', ',
|
|
||||||
)} in workspace ${workspaceId} for workspace member ${workspaceMemberId}`,
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
await this.threadCleanerService.cleanWorkspaceThreads(workspaceId);
|
await this.threadCleanerService.cleanWorkspaceThreads(workspaceId);
|
||||||
|
|||||||
@ -1,9 +1,9 @@
|
|||||||
import { Logger, Scope } from '@nestjs/common';
|
import { Logger, Scope } from '@nestjs/common';
|
||||||
|
|
||||||
import { MessagingMessageCleanerService } from 'src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service';
|
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
|
||||||
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
|
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
|
||||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||||
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
|
import { MessagingMessageCleanerService } from 'src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service';
|
||||||
|
|
||||||
export type MessagingConnectedAccountDeletionCleanupJobData = {
|
export type MessagingConnectedAccountDeletionCleanupJobData = {
|
||||||
workspaceId: string;
|
workspaceId: string;
|
||||||
@ -27,14 +27,6 @@ export class MessagingConnectedAccountDeletionCleanupJob {
|
|||||||
async handle(
|
async handle(
|
||||||
data: MessagingConnectedAccountDeletionCleanupJobData,
|
data: MessagingConnectedAccountDeletionCleanupJobData,
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
this.logger.log(
|
|
||||||
`Deleting connected account ${data.connectedAccountId} associated messaging data in workspace ${data.workspaceId}`,
|
|
||||||
);
|
|
||||||
|
|
||||||
await this.messageCleanerService.cleanWorkspaceThreads(data.workspaceId);
|
await this.messageCleanerService.cleanWorkspaceThreads(data.workspaceId);
|
||||||
|
|
||||||
this.logger.log(
|
|
||||||
`Deleted connected account ${data.connectedAccountId} associated messaging data in workspace ${data.workspaceId}`,
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,4 +1,4 @@
|
|||||||
import { Injectable, Logger } from '@nestjs/common';
|
import { Injectable } from '@nestjs/common';
|
||||||
|
|
||||||
import { AxiosResponse } from 'axios';
|
import { AxiosResponse } from 'axios';
|
||||||
import { gmail_v1 as gmailV1 } from 'googleapis';
|
import { gmail_v1 as gmailV1 } from 'googleapis';
|
||||||
@ -12,8 +12,6 @@ import { MessageWithParticipants } from 'src/modules/messaging/message-import-ma
|
|||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class GmailGetMessagesService {
|
export class GmailGetMessagesService {
|
||||||
private readonly logger = new Logger(GmailGetMessagesService.name);
|
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private readonly fetchByBatchesService: GmailFetchByBatchService,
|
private readonly fetchByBatchesService: GmailFetchByBatchService,
|
||||||
private readonly gmailHandleErrorService: GmailHandleErrorService,
|
private readonly gmailHandleErrorService: GmailHandleErrorService,
|
||||||
@ -25,25 +23,13 @@ export class GmailGetMessagesService {
|
|||||||
ConnectedAccountWorkspaceEntity,
|
ConnectedAccountWorkspaceEntity,
|
||||||
'accessToken' | 'refreshToken' | 'id' | 'handle' | 'handleAliases'
|
'accessToken' | 'refreshToken' | 'id' | 'handle' | 'handleAliases'
|
||||||
>,
|
>,
|
||||||
workspaceId: string,
|
|
||||||
): Promise<MessageWithParticipants[]> {
|
): Promise<MessageWithParticipants[]> {
|
||||||
let startTime = Date.now();
|
|
||||||
|
|
||||||
const { messageIdsByBatch, batchResponses } =
|
const { messageIdsByBatch, batchResponses } =
|
||||||
await this.fetchByBatchesService.fetchAllByBatches(
|
await this.fetchByBatchesService.fetchAllByBatches(
|
||||||
messageIds,
|
messageIds,
|
||||||
connectedAccount.accessToken,
|
connectedAccount.accessToken,
|
||||||
'batch_gmail_messages',
|
'batch_gmail_messages',
|
||||||
);
|
);
|
||||||
let endTime = Date.now();
|
|
||||||
|
|
||||||
this.logger.log(
|
|
||||||
`Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} fetching ${
|
|
||||||
messageIds.length
|
|
||||||
} messages in ${endTime - startTime}ms`,
|
|
||||||
);
|
|
||||||
|
|
||||||
startTime = Date.now();
|
|
||||||
|
|
||||||
const messages = batchResponses.flatMap((response, index) => {
|
const messages = batchResponses.flatMap((response, index) => {
|
||||||
return this.formatBatchResponseAsMessage(
|
return this.formatBatchResponseAsMessage(
|
||||||
@ -53,14 +39,6 @@ export class GmailGetMessagesService {
|
|||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
endTime = Date.now();
|
|
||||||
|
|
||||||
this.logger.log(
|
|
||||||
`Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} formatting ${
|
|
||||||
messageIds.length
|
|
||||||
} messages in ${endTime - startTime}ms`,
|
|
||||||
);
|
|
||||||
|
|
||||||
return messages;
|
return messages;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -1,4 +1,4 @@
|
|||||||
import { Injectable } from '@nestjs/common';
|
import { Injectable, Logger } from '@nestjs/common';
|
||||||
|
|
||||||
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
|
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
|
||||||
import { MicrosoftImportDriverException } from 'src/modules/messaging/message-import-manager/drivers/microsoft/exceptions/microsoft-import-driver.exception';
|
import { MicrosoftImportDriverException } from 'src/modules/messaging/message-import-manager/drivers/microsoft/exceptions/microsoft-import-driver.exception';
|
||||||
@ -8,6 +8,7 @@ import { isMicrosoftClientTemporaryError } from 'src/modules/messaging/message-i
|
|||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class MicrosoftFetchByBatchService {
|
export class MicrosoftFetchByBatchService {
|
||||||
|
private readonly logger = new Logger(MicrosoftFetchByBatchService.name);
|
||||||
constructor(
|
constructor(
|
||||||
private readonly microsoftClientProvider: MicrosoftClientProvider,
|
private readonly microsoftClientProvider: MicrosoftClientProvider,
|
||||||
) {}
|
) {}
|
||||||
@ -56,8 +57,18 @@ export class MicrosoftFetchByBatchService {
|
|||||||
typeof error.body === 'string' &&
|
typeof error.body === 'string' &&
|
||||||
isMicrosoftClientTemporaryError(error.body)
|
isMicrosoftClientTemporaryError(error.body)
|
||||||
) {
|
) {
|
||||||
|
// TODO: remove this log once we catch better the error codes
|
||||||
|
this.logger.error(
|
||||||
|
`Error temporary (${error.code}) fetching messages for account ${connectedAccount.id.slice(0, 8)}`,
|
||||||
|
);
|
||||||
|
this.logger.log(error);
|
||||||
throw new MicrosoftImportDriverException(error.body, error.code, 429);
|
throw new MicrosoftImportDriverException(error.body, error.code, 429);
|
||||||
} else {
|
} else {
|
||||||
|
// TODO: remove this log once we catch better the error codes
|
||||||
|
this.logger.error(
|
||||||
|
`Error unknown (${error.code}) fetching messages for account ${connectedAccount.id.slice(0, 8)}`,
|
||||||
|
);
|
||||||
|
this.logger.log(error);
|
||||||
throw error;
|
throw error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -51,7 +51,6 @@ xdescribe('Microsoft dev tests : get messages service', () => {
|
|||||||
const result = await service.getMessages(
|
const result = await service.getMessages(
|
||||||
mockMessageIds,
|
mockMessageIds,
|
||||||
mockConnectedAccount,
|
mockConnectedAccount,
|
||||||
'workspace-1',
|
|
||||||
);
|
);
|
||||||
|
|
||||||
expect(result).toHaveLength(1);
|
expect(result).toHaveLength(1);
|
||||||
|
|||||||
@ -28,10 +28,7 @@ export class MicrosoftGetMessagesService {
|
|||||||
async getMessages(
|
async getMessages(
|
||||||
messageIds: string[],
|
messageIds: string[],
|
||||||
connectedAccount: ConnectedAccountType,
|
connectedAccount: ConnectedAccountType,
|
||||||
workspaceId: string,
|
|
||||||
): Promise<MessageWithParticipants[]> {
|
): Promise<MessageWithParticipants[]> {
|
||||||
const startTime = Date.now();
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const { batchResponses } =
|
const { batchResponses } =
|
||||||
await this.microsoftFetchByBatchService.fetchAllByBatches(
|
await this.microsoftFetchByBatchService.fetchAllByBatches(
|
||||||
@ -44,14 +41,6 @@ export class MicrosoftGetMessagesService {
|
|||||||
connectedAccount,
|
connectedAccount,
|
||||||
);
|
);
|
||||||
|
|
||||||
const endTime = Date.now();
|
|
||||||
|
|
||||||
this.logger.log(
|
|
||||||
`Messaging import for workspace ${workspaceId} and account ${
|
|
||||||
connectedAccount.id
|
|
||||||
} fetched ${messages.length} messages in ${endTime - startTime}ms`,
|
|
||||||
);
|
|
||||||
|
|
||||||
return messages;
|
return messages;
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
this.microsoftHandleErrorService.handleMicrosoftMessageFetchError(error);
|
this.microsoftHandleErrorService.handleMicrosoftMessageFetchError(error);
|
||||||
|
|||||||
@ -1,3 +1,6 @@
|
|||||||
export const isMicrosoftClientTemporaryError = (body: string): boolean => {
|
export const isMicrosoftClientTemporaryError = (body: string): boolean => {
|
||||||
return body.includes('Unexpected token < in JSON at position');
|
return (
|
||||||
|
body.includes('Unexpected token < in JSON at position') ||
|
||||||
|
body.includes('ApplicationThrottled 429 error')
|
||||||
|
);
|
||||||
};
|
};
|
||||||
|
|||||||
@ -1,5 +1,3 @@
|
|||||||
import { Logger } from '@nestjs/common';
|
|
||||||
|
|
||||||
import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decorators/cache-storage.decorator';
|
import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decorators/cache-storage.decorator';
|
||||||
import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service';
|
import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service';
|
||||||
import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum';
|
import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum';
|
||||||
@ -14,8 +12,6 @@ export type MessagingCleanCacheJobData = {
|
|||||||
|
|
||||||
@Processor(MessageQueue.messagingQueue)
|
@Processor(MessageQueue.messagingQueue)
|
||||||
export class MessagingCleanCacheJob {
|
export class MessagingCleanCacheJob {
|
||||||
private readonly logger = new Logger(MessagingCleanCacheJob.name);
|
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
@InjectCacheStorage(CacheStorageNamespace.ModuleMessaging)
|
@InjectCacheStorage(CacheStorageNamespace.ModuleMessaging)
|
||||||
private readonly cacheStorage: CacheStorageService,
|
private readonly cacheStorage: CacheStorageService,
|
||||||
@ -23,16 +19,8 @@ export class MessagingCleanCacheJob {
|
|||||||
|
|
||||||
@Process(MessagingCleanCacheJob.name)
|
@Process(MessagingCleanCacheJob.name)
|
||||||
async handle(data: MessagingCleanCacheJobData): Promise<void> {
|
async handle(data: MessagingCleanCacheJobData): Promise<void> {
|
||||||
this.logger.log(
|
|
||||||
`Deleting message channel ${data.messageChannelId} associated cache in workspace ${data.workspaceId}`,
|
|
||||||
);
|
|
||||||
|
|
||||||
await this.cacheStorage.del(
|
await this.cacheStorage.del(
|
||||||
`messages-to-import:${data.workspaceId}:${data.messageChannelId}`,
|
`messages-to-import:${data.workspaceId}:${data.messageChannelId}`,
|
||||||
);
|
);
|
||||||
|
|
||||||
this.logger.log(
|
|
||||||
`Deleted message channel ${data.messageChannelId} associated cache in workspace ${data.workspaceId}`,
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,4 +1,4 @@
|
|||||||
import { Logger, Scope } from '@nestjs/common';
|
import { Scope } from '@nestjs/common';
|
||||||
|
|
||||||
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
|
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
|
||||||
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
|
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
|
||||||
@ -31,8 +31,6 @@ export type MessagingMessageListFetchJobData = {
|
|||||||
scope: Scope.REQUEST,
|
scope: Scope.REQUEST,
|
||||||
})
|
})
|
||||||
export class MessagingMessageListFetchJob {
|
export class MessagingMessageListFetchJob {
|
||||||
private readonly logger = new Logger(MessagingMessageListFetchJob.name);
|
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private readonly messagingFullMessageListFetchService: MessagingFullMessageListFetchService,
|
private readonly messagingFullMessageListFetchService: MessagingFullMessageListFetchService,
|
||||||
private readonly messagingPartialMessageListFetchService: MessagingPartialMessageListFetchService,
|
private readonly messagingPartialMessageListFetchService: MessagingPartialMessageListFetchService,
|
||||||
@ -117,10 +115,6 @@ export class MessagingMessageListFetchJob {
|
|||||||
|
|
||||||
switch (messageChannel.syncStage) {
|
switch (messageChannel.syncStage) {
|
||||||
case MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING:
|
case MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING:
|
||||||
this.logger.log(
|
|
||||||
`Fetching partial message list for workspace ${workspaceId} and messageChannelId ${messageChannel.id}`,
|
|
||||||
);
|
|
||||||
|
|
||||||
await this.messagingMonitoringService.track({
|
await this.messagingMonitoringService.track({
|
||||||
eventName: 'partial_message_list_fetch.started',
|
eventName: 'partial_message_list_fetch.started',
|
||||||
workspaceId,
|
workspaceId,
|
||||||
@ -144,10 +138,6 @@ export class MessagingMessageListFetchJob {
|
|||||||
break;
|
break;
|
||||||
|
|
||||||
case MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING:
|
case MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING:
|
||||||
this.logger.log(
|
|
||||||
`Fetching full message list for workspace ${workspaceId} and account ${messageChannel.connectedAccount.id}`,
|
|
||||||
);
|
|
||||||
|
|
||||||
await this.messagingMonitoringService.track({
|
await this.messagingMonitoringService.track({
|
||||||
eventName: 'full_message_list_fetch.started',
|
eventName: 'full_message_list_fetch.started',
|
||||||
workspaceId,
|
workspaceId,
|
||||||
|
|||||||
@ -31,20 +31,17 @@ export class MessagingGetMessagesService {
|
|||||||
| 'handle'
|
| 'handle'
|
||||||
| 'handleAliases'
|
| 'handleAliases'
|
||||||
>,
|
>,
|
||||||
workspaceId: string,
|
|
||||||
): Promise<GetMessagesResponse> {
|
): Promise<GetMessagesResponse> {
|
||||||
switch (connectedAccount.provider) {
|
switch (connectedAccount.provider) {
|
||||||
case ConnectedAccountProvider.GOOGLE:
|
case ConnectedAccountProvider.GOOGLE:
|
||||||
return this.gmailGetMessagesService.getMessages(
|
return this.gmailGetMessagesService.getMessages(
|
||||||
messageIds,
|
messageIds,
|
||||||
connectedAccount,
|
connectedAccount,
|
||||||
workspaceId,
|
|
||||||
);
|
);
|
||||||
case ConnectedAccountProvider.MICROSOFT:
|
case ConnectedAccountProvider.MICROSOFT:
|
||||||
return this.microsoftGetMessagesService.getMessages(
|
return this.microsoftGetMessagesService.getMessages(
|
||||||
messageIds,
|
messageIds,
|
||||||
connectedAccount,
|
connectedAccount,
|
||||||
workspaceId,
|
|
||||||
);
|
);
|
||||||
default:
|
default:
|
||||||
throw new MessageImportException(
|
throw new MessageImportException(
|
||||||
|
|||||||
@ -214,7 +214,6 @@ describe('MessagingMessagesImportService', () => {
|
|||||||
expect(messagingGetMessagesService.getMessages).toHaveBeenCalledWith(
|
expect(messagingGetMessagesService.getMessages).toHaveBeenCalledWith(
|
||||||
['message-id-1', 'message-id-2'],
|
['message-id-1', 'message-id-2'],
|
||||||
mockConnectedAccount,
|
mockConnectedAccount,
|
||||||
workspaceId,
|
|
||||||
);
|
);
|
||||||
expect(
|
expect(
|
||||||
saveMessagesService.saveMessagesAndEnqueueContactCreation,
|
saveMessagesService.saveMessagesAndEnqueueContactCreation,
|
||||||
|
|||||||
@ -68,10 +68,6 @@ export class MessagingMessagesImportService {
|
|||||||
messageChannelId: messageChannel.id,
|
messageChannelId: messageChannel.id,
|
||||||
});
|
});
|
||||||
|
|
||||||
this.logger.log(
|
|
||||||
`Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} starting...`,
|
|
||||||
);
|
|
||||||
|
|
||||||
await this.messageChannelSyncStatusService.markAsMessagesImportOngoing([
|
await this.messageChannelSyncStatusService.markAsMessagesImportOngoing([
|
||||||
messageChannel.id,
|
messageChannel.id,
|
||||||
]);
|
]);
|
||||||
@ -103,6 +99,10 @@ export class MessagingMessagesImportService {
|
|||||||
message: error.message,
|
message: error.message,
|
||||||
};
|
};
|
||||||
default:
|
default:
|
||||||
|
this.logger.error(
|
||||||
|
`Error (${error.code}) refreshing access token for account ${connectedAccount.id}`,
|
||||||
|
);
|
||||||
|
this.logger.log(error);
|
||||||
throw error;
|
throw error;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -130,7 +130,6 @@ export class MessagingMessagesImportService {
|
|||||||
const allMessages = await this.messagingGetMessagesService.getMessages(
|
const allMessages = await this.messagingGetMessagesService.getMessages(
|
||||||
messageIdsToFetch,
|
messageIdsToFetch,
|
||||||
connectedAccount,
|
connectedAccount,
|
||||||
workspaceId,
|
|
||||||
);
|
);
|
||||||
|
|
||||||
const blocklist = await this.blocklistRepository.getByWorkspaceMemberId(
|
const blocklist = await this.blocklistRepository.getByWorkspaceMemberId(
|
||||||
@ -184,6 +183,10 @@ export class MessagingMessagesImportService {
|
|||||||
workspaceId,
|
workspaceId,
|
||||||
);
|
);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
|
// TODO: remove this log once we catch better the error codes
|
||||||
|
this.logger.error(
|
||||||
|
`Error (${error.code}) importing messages for workspace ${workspaceId.slice(0, 8)} and account ${connectedAccount.id.slice(0, 8)}: ${error.message} - ${error.body}`,
|
||||||
|
);
|
||||||
await this.cacheStorage.setAdd(
|
await this.cacheStorage.setAdd(
|
||||||
`messages-to-import:${workspaceId}:${messageChannel.id}`,
|
`messages-to-import:${workspaceId}:${messageChannel.id}`,
|
||||||
messageIdsToFetch,
|
messageIdsToFetch,
|
||||||
|
|||||||
@ -1,4 +1,4 @@
|
|||||||
import { Injectable, Logger } from '@nestjs/common';
|
import { Injectable } from '@nestjs/common';
|
||||||
|
|
||||||
import { In } from 'typeorm';
|
import { In } from 'typeorm';
|
||||||
|
|
||||||
@ -20,10 +20,6 @@ import {
|
|||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class MessagingPartialMessageListFetchService {
|
export class MessagingPartialMessageListFetchService {
|
||||||
private readonly logger = new Logger(
|
|
||||||
MessagingPartialMessageListFetchService.name,
|
|
||||||
);
|
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
@InjectCacheStorage(CacheStorageNamespace.ModuleMessaging)
|
@InjectCacheStorage(CacheStorageNamespace.ModuleMessaging)
|
||||||
private readonly cacheStorage: CacheStorageService,
|
private readonly cacheStorage: CacheStorageService,
|
||||||
@ -79,9 +75,6 @@ export class MessagingPartialMessageListFetchService {
|
|||||||
);
|
);
|
||||||
|
|
||||||
if (isPartialImportFinished) {
|
if (isPartialImportFinished) {
|
||||||
this.logger.log(
|
|
||||||
`Partial message list import done on message channel ${messageChannel.id} in folder ${folderId} for workspace ${workspaceId} and account ${connectedAccount.id}`,
|
|
||||||
);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -90,10 +83,6 @@ export class MessagingPartialMessageListFetchService {
|
|||||||
messageExternalIds,
|
messageExternalIds,
|
||||||
);
|
);
|
||||||
|
|
||||||
this.logger.log(
|
|
||||||
`Added ${messageExternalIds.length} messages to import for workspace ${workspaceId} and account ${connectedAccount.id}`,
|
|
||||||
);
|
|
||||||
|
|
||||||
const messageChannelMessageAssociationRepository =
|
const messageChannelMessageAssociationRepository =
|
||||||
await this.twentyORMManager.getRepository<MessageChannelMessageAssociationWorkspaceEntity>(
|
await this.twentyORMManager.getRepository<MessageChannelMessageAssociationWorkspaceEntity>(
|
||||||
'messageChannelMessageAssociation',
|
'messageChannelMessageAssociation',
|
||||||
@ -110,10 +99,6 @@ export class MessagingPartialMessageListFetchService {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.logger.log(
|
|
||||||
`Deleted ${messageExternalIdsToDelete.length} messages for workspace ${workspaceId} and account ${connectedAccount.id}`,
|
|
||||||
);
|
|
||||||
|
|
||||||
await this.messagingCursorService.updateCursor(
|
await this.messagingCursorService.updateCursor(
|
||||||
messageChannel,
|
messageChannel,
|
||||||
nextSyncCursor,
|
nextSyncCursor,
|
||||||
@ -130,10 +115,6 @@ export class MessagingPartialMessageListFetchService {
|
|||||||
);
|
);
|
||||||
|
|
||||||
if (isPartialImportFinishedForAllFolders) {
|
if (isPartialImportFinishedForAllFolders) {
|
||||||
this.logger.log(
|
|
||||||
`Partial message list import done on message channel ${messageChannel.id} entirely for workspace ${workspaceId} and account ${connectedAccount.id}`,
|
|
||||||
);
|
|
||||||
|
|
||||||
await this.messageChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch(
|
await this.messageChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch(
|
||||||
[messageChannel.id],
|
[messageChannel.id],
|
||||||
);
|
);
|
||||||
|
|||||||
@ -38,8 +38,6 @@ export class MessagingMessageChannelSyncStatusMonitoringCronJob {
|
|||||||
MESSAGING_MESSAGE_CHANNEL_SYNC_STATUS_MONITORING_CRON_PATTERN,
|
MESSAGING_MESSAGE_CHANNEL_SYNC_STATUS_MONITORING_CRON_PATTERN,
|
||||||
)
|
)
|
||||||
async handle(): Promise<void> {
|
async handle(): Promise<void> {
|
||||||
this.logger.log('Starting message channel sync status monitoring...');
|
|
||||||
|
|
||||||
await this.messagingMonitoringService.track({
|
await this.messagingMonitoringService.track({
|
||||||
eventName: 'message_channel.monitoring.sync_status.start',
|
eventName: 'message_channel.monitoring.sync_status.start',
|
||||||
message: 'Starting message channel sync status monitoring',
|
message: 'Starting message channel sync status monitoring',
|
||||||
|
|||||||
Reference in New Issue
Block a user