5620 implement throttle logic for message and calendar sync (#5718)
Closes #5620 and improve messages filters
This commit is contained in:
@ -22,8 +22,8 @@ import TabItem from '@theme/TabItem';
|
|||||||
Twenty offers integrations with Gmail and Google Calendar. To enable these features, you need to connect to register the following recurring jobs:
|
Twenty offers integrations with Gmail and Google Calendar. To enable these features, you need to connect to register the following recurring jobs:
|
||||||
```
|
```
|
||||||
# from your worker container
|
# from your worker container
|
||||||
yarn command:prod cron:messaging:gmail-messages-import
|
yarn command:prod cron:messaging:messages-import
|
||||||
yarn command:prod cron:messaging:gmail-message-list-fetch
|
yarn command:prod cron:messaging:message-list-fetch
|
||||||
```
|
```
|
||||||
|
|
||||||
# Setup Environment Variables
|
# Setup Environment Variables
|
||||||
|
|||||||
@ -68,7 +68,7 @@ export class GoogleAPIRefreshAccessTokenService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
await this.messagingTelemetryService.track({
|
await this.messagingTelemetryService.track({
|
||||||
eventName: `refresh-token.error.insufficient_permissions`,
|
eventName: `refresh_token.error.insufficient_permissions`,
|
||||||
workspaceId,
|
workspaceId,
|
||||||
connectedAccountId: messageChannel.connectedAccountId,
|
connectedAccountId: messageChannel.connectedAccountId,
|
||||||
messageChannelId: messageChannel.id,
|
messageChannelId: messageChannel.id,
|
||||||
|
|||||||
@ -0,0 +1 @@
|
|||||||
|
export const MESSAGING_THROTTLE_DURATION = 1000 * 60 * 1; // 1 minute
|
||||||
@ -0,0 +1 @@
|
|||||||
|
export const MESSAGING_THROTTLE_MAX_ATTEMPTS = 4;
|
||||||
@ -240,4 +240,39 @@ export class MessageChannelRepository {
|
|||||||
transactionManager,
|
transactionManager,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public async updateThrottlePauseUntilAndIncrementThrottleFailureCount(
|
||||||
|
id: string,
|
||||||
|
throttleDurationMs: number,
|
||||||
|
workspaceId: string,
|
||||||
|
transactionManager?: EntityManager,
|
||||||
|
) {
|
||||||
|
const dataSourceSchema =
|
||||||
|
this.workspaceDataSourceService.getSchemaName(workspaceId);
|
||||||
|
|
||||||
|
await this.workspaceDataSourceService.executeRawQuery(
|
||||||
|
`UPDATE ${dataSourceSchema}."messageChannel" SET "throttlePauseUntil" = NOW() + ($1 || ' milliseconds')::interval, "throttleFailureCount" = "throttleFailureCount" + 1
|
||||||
|
WHERE "id" = $2`,
|
||||||
|
[throttleDurationMs, id],
|
||||||
|
workspaceId,
|
||||||
|
transactionManager,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async resetThrottlePauseUntilAndThrottleFailureCount(
|
||||||
|
id: string,
|
||||||
|
workspaceId: string,
|
||||||
|
transactionManager?: EntityManager,
|
||||||
|
) {
|
||||||
|
const dataSourceSchema =
|
||||||
|
this.workspaceDataSourceService.getSchemaName(workspaceId);
|
||||||
|
|
||||||
|
await this.workspaceDataSourceService.executeRawQuery(
|
||||||
|
`UPDATE ${dataSourceSchema}."messageChannel" SET "throttlePauseUntil" = NULL, "throttleFailureCount" = 0
|
||||||
|
WHERE "id" = $1`,
|
||||||
|
[id],
|
||||||
|
workspaceId,
|
||||||
|
transactionManager,
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -9,6 +9,9 @@ import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/s
|
|||||||
import { MessagingTelemetryService } from 'src/modules/messaging/common/services/messaging-telemetry.service';
|
import { MessagingTelemetryService } from 'src/modules/messaging/common/services/messaging-telemetry.service';
|
||||||
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
|
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
|
||||||
import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service';
|
import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service';
|
||||||
|
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
|
||||||
|
import { MESSAGING_THROTTLE_DURATION } from 'src/modules/messaging/common/constants/messaging-throttle-duration';
|
||||||
|
import { MESSAGING_THROTTLE_MAX_ATTEMPTS } from 'src/modules/messaging/common/constants/messaging-throttle-max-attempts';
|
||||||
|
|
||||||
type SyncStep =
|
type SyncStep =
|
||||||
| 'partial-message-list-fetch'
|
| 'partial-message-list-fetch'
|
||||||
@ -27,6 +30,8 @@ export class MessagingErrorHandlingService {
|
|||||||
private readonly connectedAccountRepository: ConnectedAccountRepository,
|
private readonly connectedAccountRepository: ConnectedAccountRepository,
|
||||||
private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService,
|
private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService,
|
||||||
private readonly messagingTelemetryService: MessagingTelemetryService,
|
private readonly messagingTelemetryService: MessagingTelemetryService,
|
||||||
|
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
|
||||||
|
private readonly messageChannelRepository: MessageChannelRepository,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
public async handleGmailError(
|
public async handleGmailError(
|
||||||
@ -100,7 +105,7 @@ export class MessagingErrorHandlingService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public async handleRateLimitExceeded(
|
private async handleRateLimitExceeded(
|
||||||
error: GmailError,
|
error: GmailError,
|
||||||
syncStep: SyncStep,
|
syncStep: SyncStep,
|
||||||
messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
|
messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
|
||||||
@ -114,6 +119,19 @@ export class MessagingErrorHandlingService {
|
|||||||
message: `${error.code}: ${error.reason}`,
|
message: `${error.code}: ${error.reason}`,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if (
|
||||||
|
messageChannel.throttleFailureCount >= MESSAGING_THROTTLE_MAX_ATTEMPTS
|
||||||
|
) {
|
||||||
|
await this.messagingChannelSyncStatusService.markAsFailedUnknownAndFlushMessagesToImport(
|
||||||
|
messageChannel.id,
|
||||||
|
workspaceId,
|
||||||
|
);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
await this.throttle(messageChannel, workspaceId);
|
||||||
|
|
||||||
switch (syncStep) {
|
switch (syncStep) {
|
||||||
case 'full-message-list-fetch':
|
case 'full-message-list-fetch':
|
||||||
await this.messagingChannelSyncStatusService.scheduleFullMessageListFetch(
|
await this.messagingChannelSyncStatusService.scheduleFullMessageListFetch(
|
||||||
@ -141,7 +159,7 @@ export class MessagingErrorHandlingService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public async handleInsufficientPermissions(
|
private async handleInsufficientPermissions(
|
||||||
error: GmailError,
|
error: GmailError,
|
||||||
syncStep: SyncStep,
|
syncStep: SyncStep,
|
||||||
messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
|
messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
|
||||||
@ -166,7 +184,7 @@ export class MessagingErrorHandlingService {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async handleNotFound(
|
private async handleNotFound(
|
||||||
error: GmailError,
|
error: GmailError,
|
||||||
syncStep: SyncStep,
|
syncStep: SyncStep,
|
||||||
messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
|
messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
|
||||||
@ -189,4 +207,27 @@ export class MessagingErrorHandlingService {
|
|||||||
workspaceId,
|
workspaceId,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private async throttle(
|
||||||
|
messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
|
||||||
|
workspaceId: string,
|
||||||
|
): Promise<void> {
|
||||||
|
const throttleDuration =
|
||||||
|
MESSAGING_THROTTLE_DURATION *
|
||||||
|
Math.pow(2, messageChannel.throttleFailureCount);
|
||||||
|
|
||||||
|
await this.messageChannelRepository.updateThrottlePauseUntilAndIncrementThrottleFailureCount(
|
||||||
|
messageChannel.id,
|
||||||
|
throttleDuration,
|
||||||
|
workspaceId,
|
||||||
|
);
|
||||||
|
|
||||||
|
await this.messagingTelemetryService.track({
|
||||||
|
eventName: 'message_channel.throttle',
|
||||||
|
workspaceId,
|
||||||
|
connectedAccountId: messageChannel.connectedAccountId,
|
||||||
|
messageChannelId: messageChannel.id,
|
||||||
|
message: `Throttling for ${throttleDuration}ms`,
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,4 +2,5 @@ export const MESSAGING_GMAIL_EXCLUDED_CATEGORIES = [
|
|||||||
'promotions',
|
'promotions',
|
||||||
'social',
|
'social',
|
||||||
'forums',
|
'forums',
|
||||||
|
'updates',
|
||||||
];
|
];
|
||||||
|
|||||||
@ -77,6 +77,11 @@ export class MessagingGmailFullMessageListFetchService {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
await this.messageChannelRepository.resetThrottlePauseUntilAndThrottleFailureCount(
|
||||||
|
messageChannel.id,
|
||||||
|
workspaceId,
|
||||||
|
);
|
||||||
|
|
||||||
await this.messagingChannelSyncStatusService.scheduleMessagesImport(
|
await this.messagingChannelSyncStatusService.scheduleMessagesImport(
|
||||||
messageChannel.id,
|
messageChannel.id,
|
||||||
workspaceId,
|
workspaceId,
|
||||||
|
|||||||
@ -21,6 +21,7 @@ import { MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE } from 'src/modules/messa
|
|||||||
import { MessagingGmailFetchMessagesByBatchesService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-messages-by-batches.service';
|
import { MessagingGmailFetchMessagesByBatchesService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-fetch-messages-by-batches.service';
|
||||||
import { MessagingErrorHandlingService } from 'src/modules/messaging/common/services/messaging-error-handling.service';
|
import { MessagingErrorHandlingService } from 'src/modules/messaging/common/services/messaging-error-handling.service';
|
||||||
import { MessagingSaveMessagesAndEnqueueContactCreationService } from 'src/modules/messaging/common/services/messaging-save-messages-and-enqueue-contact-creation.service';
|
import { MessagingSaveMessagesAndEnqueueContactCreationService } from 'src/modules/messaging/common/services/messaging-save-messages-and-enqueue-contact-creation.service';
|
||||||
|
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
|
||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class MessagingGmailMessagesImportService {
|
export class MessagingGmailMessagesImportService {
|
||||||
@ -39,6 +40,8 @@ export class MessagingGmailMessagesImportService {
|
|||||||
private readonly messagingTelemetryService: MessagingTelemetryService,
|
private readonly messagingTelemetryService: MessagingTelemetryService,
|
||||||
@InjectObjectMetadataRepository(BlocklistWorkspaceEntity)
|
@InjectObjectMetadataRepository(BlocklistWorkspaceEntity)
|
||||||
private readonly blocklistRepository: BlocklistRepository,
|
private readonly blocklistRepository: BlocklistRepository,
|
||||||
|
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
|
||||||
|
private readonly messageChannelRepository: MessageChannelRepository,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
async processMessageBatchImport(
|
async processMessageBatchImport(
|
||||||
@ -134,6 +137,11 @@ export class MessagingGmailMessagesImportService {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
await this.messageChannelRepository.resetThrottlePauseUntilAndThrottleFailureCount(
|
||||||
|
messageChannel.id,
|
||||||
|
workspaceId,
|
||||||
|
);
|
||||||
|
|
||||||
return await this.trackMessageImportCompleted(
|
return await this.trackMessageImportCompleted(
|
||||||
messageChannel,
|
messageChannel,
|
||||||
workspaceId,
|
workspaceId,
|
||||||
|
|||||||
@ -74,6 +74,11 @@ export class MessagingGmailPartialMessageListFetchService {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
await this.messageChannelRepository.resetThrottlePauseUntilAndThrottleFailureCount(
|
||||||
|
messageChannel.id,
|
||||||
|
workspaceId,
|
||||||
|
);
|
||||||
|
|
||||||
if (!historyId) {
|
if (!historyId) {
|
||||||
throw new Error(
|
throw new Error(
|
||||||
`No historyId found for ${connectedAccount.id} in workspace ${workspaceId} in gmail history response.`,
|
`No historyId found for ${connectedAccount.id} in workspace ${workspaceId} in gmail history response.`,
|
||||||
|
|||||||
@ -75,6 +75,13 @@ export class MessagingMessageListFetchJob
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (
|
||||||
|
messageChannel.throttlePauseUntil &&
|
||||||
|
messageChannel.throttlePauseUntil > new Date()
|
||||||
|
) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
switch (messageChannel.syncSubStatus) {
|
switch (messageChannel.syncSubStatus) {
|
||||||
case MessageChannelSyncSubStatus.PARTIAL_MESSAGE_LIST_FETCH_PENDING:
|
case MessageChannelSyncSubStatus.PARTIAL_MESSAGE_LIST_FETCH_PENDING:
|
||||||
this.logger.log(
|
this.logger.log(
|
||||||
@ -85,6 +92,7 @@ export class MessagingMessageListFetchJob
|
|||||||
eventName: 'partial_message_list_fetch.started',
|
eventName: 'partial_message_list_fetch.started',
|
||||||
workspaceId,
|
workspaceId,
|
||||||
connectedAccountId,
|
connectedAccountId,
|
||||||
|
messageChannelId: messageChannel.id,
|
||||||
});
|
});
|
||||||
|
|
||||||
await this.gmailPartialMessageListFetchV2Service.processMessageListFetch(
|
await this.gmailPartialMessageListFetchV2Service.processMessageListFetch(
|
||||||
@ -111,6 +119,7 @@ export class MessagingMessageListFetchJob
|
|||||||
eventName: 'full_message_list_fetch.started',
|
eventName: 'full_message_list_fetch.started',
|
||||||
workspaceId,
|
workspaceId,
|
||||||
connectedAccountId,
|
connectedAccountId,
|
||||||
|
messageChannelId: messageChannel.id,
|
||||||
});
|
});
|
||||||
|
|
||||||
await this.gmailFullMessageListFetchService.processMessageListFetch(
|
await this.gmailFullMessageListFetchService.processMessageListFetch(
|
||||||
|
|||||||
@ -45,6 +45,13 @@ export class MessagingMessagesImportJob
|
|||||||
messageChannelId: messageChannel.id,
|
messageChannelId: messageChannel.id,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if (
|
||||||
|
messageChannel.throttlePauseUntil &&
|
||||||
|
messageChannel.throttlePauseUntil > new Date()
|
||||||
|
) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
const connectedAccount =
|
const connectedAccount =
|
||||||
await this.connectedAccountRepository.getConnectedAccountOrThrow(
|
await this.connectedAccountRepository.getConnectedAccountOrThrow(
|
||||||
workspaceId,
|
workspaceId,
|
||||||
|
|||||||
@ -38,7 +38,7 @@ const filterOutIcsAttachments = (messages: GmailMessage[]) => {
|
|||||||
|
|
||||||
const isPersonEmail = (email: string): boolean => {
|
const isPersonEmail = (email: string): boolean => {
|
||||||
const nonPersonalPattern =
|
const nonPersonalPattern =
|
||||||
/noreply|no-reply|do_not_reply|no\.reply|^(info@|contact@|hello@|support@|feedback@|service@|help@|invites@|invite@|welcome@|alerts@|team@)/;
|
/noreply|no-reply|do_not_reply|no\.reply|^(info@|contact@|hello@|support@|feedback@|service@|help@|invites@|invite@|welcome@|alerts@|team@|notifications@|notification@|news@)/;
|
||||||
|
|
||||||
return !nonPersonalPattern.test(email);
|
return !nonPersonalPattern.test(email);
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user