Refactor sync sub status and throttle (#5734)

- Rename syncSubStatus to syncStage
- Rename ongoingSyncStartedAt to syncStageStartedAt
- Remove throttlePauseUntil from db and compute it with
syncStageStartedAt and throttleFailureCount
This commit is contained in:
bosiraphael
2024-06-04 16:52:57 +02:00
committed by GitHub
parent ce1469cf0c
commit 234e062232
14 changed files with 148 additions and 110 deletions

View File

@ -15,7 +15,6 @@ import { WorkspaceIsSystem } from 'src/engine/twenty-orm/decorators/workspace-is
import { WorkspaceIsNotAuditLogged } from 'src/engine/twenty-orm/decorators/workspace-is-not-audit-logged.decorator';
import { WorkspaceField } from 'src/engine/twenty-orm/decorators/workspace-field.decorator';
import { WorkspaceRelation } from 'src/engine/twenty-orm/decorators/workspace-relation.decorator';
import { WorkspaceIsNullable } from 'src/engine/twenty-orm/decorators/workspace-is-nullable.decorator';
export enum CalendarChannelVisibility {
METADATA = 'METADATA',
@ -97,16 +96,6 @@ export class CalendarChannelWorkspaceEntity extends BaseWorkspaceEntity {
})
syncCursor: string;
@WorkspaceField({
standardId: CALENDAR_CHANNEL_STANDARD_FIELD_IDS.throttlePauseUntil,
type: FieldMetadataType.DATE_TIME,
label: 'Throttle Pause Until',
description: 'Throttle Pause Until',
icon: 'IconPlayerPause',
})
@WorkspaceIsNullable()
throttlePauseUntil: Date;
@WorkspaceField({
standardId: CALENDAR_CHANNEL_STANDARD_FIELD_IDS.throttleFailureCount,
type: FieldMetadataType.NUMBER,

View File

@ -7,7 +7,7 @@ import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metada
import {
MessageChannelWorkspaceEntity,
MessageChannelSyncStatus,
MessageChannelSyncSubStatus,
MessageChannelSyncStage,
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
@Injectable()
@ -51,7 +51,7 @@ export class MessageChannelRepository {
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageChannel" SET "syncStatus" = NULL, "syncCursor" = '', "ongoingSyncStartedAt" = NULL
`UPDATE ${dataSourceSchema}."messageChannel" SET "syncStatus" = NULL, "syncCursor" = '', "syncStageStartedAt" = NULL
WHERE "connectedAccountId" = $1`,
[connectedAccountId],
workspaceId,
@ -169,18 +169,11 @@ export class MessageChannelRepository {
this.workspaceDataSourceService.getSchemaName(workspaceId);
const needsToUpdateSyncedAt =
syncStatus === MessageChannelSyncStatus.SUCCEEDED;
const needsToUpdateOngoingSyncStartedAt =
syncStatus === MessageChannelSyncStatus.ONGOING;
syncStatus === MessageChannelSyncStatus.COMPLETED;
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageChannel" SET "syncStatus" = $1 ${
needsToUpdateSyncedAt ? `, "syncedAt" = NOW()` : ''
} ${
needsToUpdateOngoingSyncStartedAt
? `, "ongoingSyncStartedAt" = NOW()`
: `, "ongoingSyncStartedAt" = NULL`
} WHERE "id" = $2`,
[syncStatus, id],
workspaceId,
@ -188,9 +181,31 @@ export class MessageChannelRepository {
);
}
public async updateSyncSubStatus(
public async updateSyncStage(
id: string,
syncStage: MessageChannelSyncStage,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const needsToUpdateSyncStageStartedAt =
syncStage === MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING ||
syncStage === MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING;
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageChannel" SET "syncStage" = $1 ${
needsToUpdateSyncStageStartedAt ? `, "syncStageStartedAt" = NOW()` : ''
} WHERE "id" = $2`,
[syncStage, id],
workspaceId,
transactionManager,
);
}
public async resetSyncStageStartedAt(
id: string,
syncSubStatus: MessageChannelSyncSubStatus,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
@ -198,8 +213,8 @@ export class MessageChannelRepository {
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageChannel" SET "syncSubStatus" = $1 WHERE "id" = $2`,
[syncSubStatus, id],
`UPDATE ${dataSourceSchema}."messageChannel" SET "syncStageStartedAt" = NULL WHERE "id" = $1`,
[id],
workspaceId,
transactionManager,
);
@ -241,9 +256,8 @@ export class MessageChannelRepository {
);
}
public async updateThrottlePauseUntilAndIncrementThrottleFailureCount(
public async incrementThrottleFailureCount(
id: string,
throttleDurationMs: number,
workspaceId: string,
transactionManager?: EntityManager,
) {
@ -251,15 +265,15 @@ export class MessageChannelRepository {
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageChannel" SET "throttlePauseUntil" = NOW() + ($1 || ' milliseconds')::interval, "throttleFailureCount" = "throttleFailureCount" + 1
WHERE "id" = $2`,
[throttleDurationMs, id],
`UPDATE ${dataSourceSchema}."messageChannel" SET "throttleFailureCount" = "throttleFailureCount" + 1
WHERE "id" = $1`,
[id],
workspaceId,
transactionManager,
);
}
public async resetThrottlePauseUntilAndThrottleFailureCount(
public async resetThrottleFailureCount(
id: string,
workspaceId: string,
transactionManager?: EntityManager,
@ -268,7 +282,7 @@ export class MessageChannelRepository {
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageChannel" SET "throttlePauseUntil" = NULL, "throttleFailureCount" = 0
`UPDATE ${dataSourceSchema}."messageChannel" SET "throttleFailureCount" = 0
WHERE "id" = $1`,
[id],
workspaceId,

View File

@ -7,7 +7,7 @@ import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repos
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import {
MessageChannelWorkspaceEntity,
MessageChannelSyncSubStatus,
MessageChannelSyncStage,
MessageChannelSyncStatus,
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
@ -24,9 +24,9 @@ export class MessagingChannelSyncStatusService {
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncSubStatus(
await this.messageChannelRepository.updateSyncStage(
messageChannelId,
MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING,
MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING,
workspaceId,
);
}
@ -35,9 +35,9 @@ export class MessagingChannelSyncStatusService {
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncSubStatus(
await this.messageChannelRepository.updateSyncStage(
messageChannelId,
MessageChannelSyncSubStatus.PARTIAL_MESSAGE_LIST_FETCH_PENDING,
MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING,
workspaceId,
);
}
@ -46,9 +46,9 @@ export class MessagingChannelSyncStatusService {
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncSubStatus(
await this.messageChannelRepository.updateSyncStage(
messageChannelId,
MessageChannelSyncSubStatus.MESSAGES_IMPORT_PENDING,
MessageChannelSyncStage.MESSAGES_IMPORT_PENDING,
workspaceId,
);
}
@ -68,6 +68,16 @@ export class MessagingChannelSyncStatusService {
workspaceId,
);
await this.messageChannelRepository.resetSyncStageStartedAt(
messageChannelId,
workspaceId,
);
await this.messageChannelRepository.resetThrottleFailureCount(
messageChannelId,
workspaceId,
);
await this.scheduleFullMessageListFetch(messageChannelId, workspaceId);
}
@ -75,9 +85,9 @@ export class MessagingChannelSyncStatusService {
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncSubStatus(
await this.messageChannelRepository.updateSyncStage(
messageChannelId,
MessageChannelSyncSubStatus.MESSAGE_LIST_FETCH_ONGOING,
MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING,
workspaceId,
);
@ -105,9 +115,9 @@ export class MessagingChannelSyncStatusService {
messageChannelId: string,
workspaceId: string,
) {
await this.messageChannelRepository.updateSyncSubStatus(
await this.messageChannelRepository.updateSyncStage(
messageChannelId,
MessageChannelSyncSubStatus.MESSAGES_IMPORT_ONGOING,
MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING,
workspaceId,
);
}
@ -120,9 +130,9 @@ export class MessagingChannelSyncStatusService {
`messages-to-import:${workspaceId}:gmail:${messageChannelId}`,
);
await this.messageChannelRepository.updateSyncSubStatus(
await this.messageChannelRepository.updateSyncStage(
messageChannelId,
MessageChannelSyncSubStatus.FAILED,
MessageChannelSyncStage.FAILED,
workspaceId,
);
@ -141,9 +151,9 @@ export class MessagingChannelSyncStatusService {
`messages-to-import:${workspaceId}:gmail:${messageChannelId}`,
);
await this.messageChannelRepository.updateSyncSubStatus(
await this.messageChannelRepository.updateSyncStage(
messageChannelId,
MessageChannelSyncSubStatus.FAILED,
MessageChannelSyncStage.FAILED,
workspaceId,
);

View File

@ -10,7 +10,6 @@ import { MessagingTelemetryService } from 'src/modules/messaging/common/services
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service';
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import { MESSAGING_THROTTLE_DURATION } from 'src/modules/messaging/common/constants/messaging-throttle-duration';
import { MESSAGING_THROTTLE_MAX_ATTEMPTS } from 'src/modules/messaging/common/constants/messaging-throttle-max-attempts';
type SyncStep =
@ -212,13 +211,8 @@ export class MessagingErrorHandlingService {
messageChannel: ObjectRecord<MessageChannelWorkspaceEntity>,
workspaceId: string,
): Promise<void> {
const throttleDuration =
MESSAGING_THROTTLE_DURATION *
Math.pow(2, messageChannel.throttleFailureCount);
await this.messageChannelRepository.updateThrottlePauseUntilAndIncrementThrottleFailureCount(
await this.messageChannelRepository.incrementThrottleFailureCount(
messageChannel.id,
throttleDuration,
workspaceId,
);
@ -227,7 +221,7 @@ export class MessagingErrorHandlingService {
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
messageChannelId: messageChannel.id,
message: `Throttling for ${throttleDuration}ms`,
message: `Increment throttle failure count to ${messageChannel.throttleFailureCount}`,
});
}
}

View File

@ -31,7 +31,7 @@ export enum MessageChannelSyncStatus {
FAILED_UNKNOWN = 'FAILED_UNKNOWN',
}
export enum MessageChannelSyncSubStatus {
export enum MessageChannelSyncStage {
FULL_MESSAGE_LIST_FETCH_PENDING = 'FULL_MESSAGE_LIST_FETCH_PENDING',
PARTIAL_MESSAGE_LIST_FETCH_PENDING = 'PARTIAL_MESSAGE_LIST_FETCH_PENDING',
MESSAGE_LIST_FETCH_ONGOING = 'MESSAGE_LIST_FETCH_ONGOING',
@ -227,72 +227,62 @@ export class MessageChannelWorkspaceEntity extends BaseWorkspaceEntity {
syncStatus: MessageChannelSyncStatus;
@WorkspaceField({
standardId: MESSAGE_CHANNEL_STANDARD_FIELD_IDS.syncSubStatus,
standardId: MESSAGE_CHANNEL_STANDARD_FIELD_IDS.syncStage,
type: FieldMetadataType.SELECT,
label: 'Sync sub status',
description: 'Sync sub status',
label: 'Sync stage',
description: 'Sync stage',
icon: 'IconStatusChange',
options: [
{
value: MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING,
value: MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING,
label: 'Full messages list fetch pending',
position: 0,
color: 'blue',
},
{
value: MessageChannelSyncSubStatus.PARTIAL_MESSAGE_LIST_FETCH_PENDING,
value: MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING,
label: 'Partial messages list fetch pending',
position: 1,
color: 'blue',
},
{
value: MessageChannelSyncSubStatus.MESSAGE_LIST_FETCH_ONGOING,
value: MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING,
label: 'Messages list fetch ongoing',
position: 2,
color: 'orange',
},
{
value: MessageChannelSyncSubStatus.MESSAGES_IMPORT_PENDING,
value: MessageChannelSyncStage.MESSAGES_IMPORT_PENDING,
label: 'Messages import pending',
position: 3,
color: 'blue',
},
{
value: MessageChannelSyncSubStatus.MESSAGES_IMPORT_ONGOING,
value: MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING,
label: 'Messages import ongoing',
position: 4,
color: 'orange',
},
{
value: MessageChannelSyncSubStatus.FAILED,
value: MessageChannelSyncStage.FAILED,
label: 'Failed',
position: 5,
color: 'red',
},
],
defaultValue: `'${MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING}'`,
defaultValue: `'${MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING}'`,
})
syncSubStatus: MessageChannelSyncSubStatus;
syncStage: MessageChannelSyncStage;
@WorkspaceField({
standardId: MESSAGE_CHANNEL_STANDARD_FIELD_IDS.ongoingSyncStartedAt,
standardId: MESSAGE_CHANNEL_STANDARD_FIELD_IDS.syncStageStartedAt,
type: FieldMetadataType.DATE_TIME,
label: 'Ongoing sync started at',
description: 'Ongoing sync started at',
label: 'Sync stage started at',
description: 'Sync stage started at',
icon: 'IconHistory',
})
@WorkspaceIsNullable()
ongoingSyncStartedAt: string;
@WorkspaceField({
standardId: MESSAGE_CHANNEL_STANDARD_FIELD_IDS.throttlePauseUntil,
type: FieldMetadataType.DATE_TIME,
label: 'Throttle Pause Until',
description: 'Throttle Pause Until',
icon: 'IconPlayerPause',
})
@WorkspaceIsNullable()
throttlePauseUntil: Date;
syncStageStartedAt: string;
@WorkspaceField({
standardId: MESSAGE_CHANNEL_STANDARD_FIELD_IDS.throttleFailureCount,

View File

@ -77,7 +77,12 @@ export class MessagingGmailFullMessageListFetchService {
return;
}
await this.messageChannelRepository.resetThrottlePauseUntilAndThrottleFailureCount(
await this.messageChannelRepository.resetThrottleFailureCount(
messageChannel.id,
workspaceId,
);
await this.messageChannelRepository.resetSyncStageStartedAt(
messageChannel.id,
workspaceId,
);

View File

@ -12,7 +12,7 @@ import { BlocklistRepository } from 'src/modules/connected-account/repositories/
import { MessagingTelemetryService } from 'src/modules/messaging/common/services/messaging-telemetry.service';
import {
MessageChannelWorkspaceEntity,
MessageChannelSyncSubStatus,
MessageChannelSyncStage,
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { createQueriesFromMessageIds } from 'src/modules/messaging/message-import-manager/utils/create-queries-from-message-ids.util';
import { filterEmails } from 'src/modules/messaging/message-import-manager/utils/filter-emails.util';
@ -50,8 +50,8 @@ export class MessagingGmailMessagesImportService {
workspaceId: string,
) {
if (
messageChannel.syncSubStatus !==
MessageChannelSyncSubStatus.MESSAGES_IMPORT_PENDING
messageChannel.syncStage !==
MessageChannelSyncStage.MESSAGES_IMPORT_PENDING
) {
return;
}
@ -137,7 +137,12 @@ export class MessagingGmailMessagesImportService {
);
}
await this.messageChannelRepository.resetThrottlePauseUntilAndThrottleFailureCount(
await this.messageChannelRepository.resetThrottleFailureCount(
messageChannel.id,
workspaceId,
);
await this.messageChannelRepository.resetSyncStageStartedAt(
messageChannel.id,
workspaceId,
);

View File

@ -74,7 +74,12 @@ export class MessagingGmailPartialMessageListFetchService {
return;
}
await this.messageChannelRepository.resetThrottlePauseUntilAndThrottleFailureCount(
await this.messageChannelRepository.resetThrottleFailureCount(
messageChannel.id,
workspaceId,
);
await this.messageChannelRepository.resetSyncStageStartedAt(
messageChannel.id,
workspaceId,
);

View File

@ -0,0 +1,25 @@
import { MESSAGING_THROTTLE_DURATION } from 'src/modules/messaging/common/constants/messaging-throttle-duration';
export const isThrottled = (
syncStageStartedAt: string | null,
throttleFailureCount: number,
): boolean => {
if (!syncStageStartedAt) {
return false;
}
return (
computeThrottlePauseUntil(syncStageStartedAt, throttleFailureCount) >
new Date()
);
};
const computeThrottlePauseUntil = (
syncStageStartedAt: string,
throttleFailureCount: number,
): Date => {
return new Date(
new Date(syncStageStartedAt).getTime() +
MESSAGING_THROTTLE_DURATION * Math.pow(2, throttleFailureCount - 1),
);
};

View File

@ -8,11 +8,12 @@ import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/s
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
import { MessagingTelemetryService } from 'src/modules/messaging/common/services/messaging-telemetry.service';
import {
MessageChannelSyncSubStatus,
MessageChannelSyncStage,
MessageChannelWorkspaceEntity,
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MessagingGmailFullMessageListFetchService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-full-message-list-fetch.service';
import { MessagingGmailPartialMessageListFetchService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-partial-message-list-fetch.service';
import { isThrottled } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/is-throttled';
export type MessagingMessageListFetchJobData = {
workspaceId: string;
@ -76,14 +77,16 @@ export class MessagingMessageListFetchJob
}
if (
messageChannel.throttlePauseUntil &&
messageChannel.throttlePauseUntil > new Date()
isThrottled(
messageChannel.syncStageStartedAt,
messageChannel.throttleFailureCount,
)
) {
return;
}
switch (messageChannel.syncSubStatus) {
case MessageChannelSyncSubStatus.PARTIAL_MESSAGE_LIST_FETCH_PENDING:
switch (messageChannel.syncStage) {
case MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING:
this.logger.log(
`Fetching partial message list for workspace ${workspaceId} and account ${connectedAccount.id}`,
);
@ -110,7 +113,7 @@ export class MessagingMessageListFetchJob
break;
case MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING:
case MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING:
this.logger.log(
`Fetching full message list for workspace ${workspaceId} and account ${connectedAccount.id}`,
);

View File

@ -9,6 +9,7 @@ import { MessageChannelRepository } from 'src/modules/messaging/common/repositor
import { MessagingTelemetryService } from 'src/modules/messaging/common/services/messaging-telemetry.service';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MessagingGmailMessagesImportService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-messages-import.service';
import { isThrottled } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/is-throttled';
export type MessagingMessagesImportJobData = {
workspaceId: string;
@ -46,8 +47,10 @@ export class MessagingMessagesImportJob
});
if (
messageChannel.throttlePauseUntil &&
messageChannel.throttlePauseUntil > new Date()
isThrottled(
messageChannel.syncStageStartedAt,
messageChannel.throttleFailureCount,
)
) {
continue;
}