Possiblity to reconnect on Failed unkown (#11576)
Possiblity to reconnect you account on Failed Unkown errors. This PR triggers a similar flow than the Failed errors for unsufficient permission already existing in twenty. Allows our users to force the synchro again even though they have a strange error. It's been asked by some customers since we have had a couple of issues in messaging lately Fixes https://github.com/twentyhq/twenty/issues/11411
This commit is contained in:
@ -174,9 +174,12 @@ export class MessageChannelSyncStatusService {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public async markAsFailedUnknownAndFlushMessagesToImport(
|
public async markAsFailedAndFlushMessagesToImport(
|
||||||
messageChannelIds: string[],
|
messageChannelIds: string[],
|
||||||
workspaceId: string,
|
workspaceId: string,
|
||||||
|
syncStatus:
|
||||||
|
| MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS
|
||||||
|
| MessageChannelSyncStatus.FAILED_UNKNOWN,
|
||||||
) {
|
) {
|
||||||
if (!messageChannelIds.length) {
|
if (!messageChannelIds.length) {
|
||||||
return;
|
return;
|
||||||
@ -195,41 +198,16 @@ export class MessageChannelSyncStatusService {
|
|||||||
|
|
||||||
await messageChannelRepository.update(messageChannelIds, {
|
await messageChannelRepository.update(messageChannelIds, {
|
||||||
syncStage: MessageChannelSyncStage.FAILED,
|
syncStage: MessageChannelSyncStage.FAILED,
|
||||||
syncStatus: MessageChannelSyncStatus.FAILED_UNKNOWN,
|
syncStatus: syncStatus,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
const metricsKey =
|
||||||
|
syncStatus === MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS
|
||||||
|
? MetricsKeys.MessageChannelSyncJobFailedInsufficientPermissions
|
||||||
|
: MetricsKeys.MessageChannelSyncJobFailedUnknown;
|
||||||
|
|
||||||
await this.metricsService.batchIncrementCounter({
|
await this.metricsService.batchIncrementCounter({
|
||||||
key: MetricsKeys.MessageChannelSyncJobFailedUnknown,
|
key: metricsKey,
|
||||||
eventIds: messageChannelIds,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
public async markAsFailedInsufficientPermissionsAndFlushMessagesToImport(
|
|
||||||
messageChannelIds: string[],
|
|
||||||
workspaceId: string,
|
|
||||||
) {
|
|
||||||
if (!messageChannelIds.length) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (const messageChannelId of messageChannelIds) {
|
|
||||||
await this.cacheStorage.del(
|
|
||||||
`messages-to-import:${workspaceId}:${messageChannelId}`,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
const messageChannelRepository =
|
|
||||||
await this.twentyORMManager.getRepository<MessageChannelWorkspaceEntity>(
|
|
||||||
'messageChannel',
|
|
||||||
);
|
|
||||||
|
|
||||||
await messageChannelRepository.update(messageChannelIds, {
|
|
||||||
syncStage: MessageChannelSyncStage.FAILED,
|
|
||||||
syncStatus: MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS,
|
|
||||||
});
|
|
||||||
|
|
||||||
await this.metricsService.batchIncrementCounter({
|
|
||||||
key: MetricsKeys.MessageChannelSyncJobFailedInsufficientPermissions,
|
|
||||||
eventIds: messageChannelIds,
|
eventIds: messageChannelIds,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@ -3,7 +3,10 @@ import { Injectable } from '@nestjs/common';
|
|||||||
import { ExceptionHandlerService } from 'src/engine/core-modules/exception-handler/exception-handler.service';
|
import { ExceptionHandlerService } from 'src/engine/core-modules/exception-handler/exception-handler.service';
|
||||||
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
|
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
|
||||||
import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service';
|
import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service';
|
||||||
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
|
import {
|
||||||
|
MessageChannelSyncStatus,
|
||||||
|
MessageChannelWorkspaceEntity,
|
||||||
|
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
|
||||||
import { MESSAGING_THROTTLE_MAX_ATTEMPTS } from 'src/modules/messaging/message-import-manager/constants/messaging-throttle-max-attempts';
|
import { MESSAGING_THROTTLE_MAX_ATTEMPTS } from 'src/modules/messaging/message-import-manager/constants/messaging-throttle-max-attempts';
|
||||||
import {
|
import {
|
||||||
MessageImportDriverException,
|
MessageImportDriverException,
|
||||||
@ -90,9 +93,10 @@ export class MessageImportExceptionHandlerService {
|
|||||||
if (
|
if (
|
||||||
messageChannel.throttleFailureCount >= MESSAGING_THROTTLE_MAX_ATTEMPTS
|
messageChannel.throttleFailureCount >= MESSAGING_THROTTLE_MAX_ATTEMPTS
|
||||||
) {
|
) {
|
||||||
await this.messageChannelSyncStatusService.markAsFailedUnknownAndFlushMessagesToImport(
|
await this.messageChannelSyncStatusService.markAsFailedAndFlushMessagesToImport(
|
||||||
[messageChannel.id],
|
[messageChannel.id],
|
||||||
workspaceId,
|
workspaceId,
|
||||||
|
MessageChannelSyncStatus.FAILED_UNKNOWN,
|
||||||
);
|
);
|
||||||
throw new MessageImportException(
|
throw new MessageImportException(
|
||||||
`Unknown temporary error occurred multiple times while importing messages for message channel ${messageChannel.id} in workspace ${workspaceId}`,
|
`Unknown temporary error occurred multiple times while importing messages for message channel ${messageChannel.id} in workspace ${workspaceId}`,
|
||||||
@ -139,9 +143,10 @@ export class MessageImportExceptionHandlerService {
|
|||||||
messageChannel: Pick<MessageChannelWorkspaceEntity, 'id'>,
|
messageChannel: Pick<MessageChannelWorkspaceEntity, 'id'>,
|
||||||
workspaceId: string,
|
workspaceId: string,
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
await this.messageChannelSyncStatusService.markAsFailedInsufficientPermissionsAndFlushMessagesToImport(
|
await this.messageChannelSyncStatusService.markAsFailedAndFlushMessagesToImport(
|
||||||
[messageChannel.id],
|
[messageChannel.id],
|
||||||
workspaceId,
|
workspaceId,
|
||||||
|
MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS,
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -150,9 +155,10 @@ export class MessageImportExceptionHandlerService {
|
|||||||
messageChannel: Pick<MessageChannelWorkspaceEntity, 'id'>,
|
messageChannel: Pick<MessageChannelWorkspaceEntity, 'id'>,
|
||||||
workspaceId: string,
|
workspaceId: string,
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
await this.messageChannelSyncStatusService.markAsFailedUnknownAndFlushMessagesToImport(
|
await this.messageChannelSyncStatusService.markAsFailedAndFlushMessagesToImport(
|
||||||
[messageChannel.id],
|
[messageChannel.id],
|
||||||
workspaceId,
|
workspaceId,
|
||||||
|
MessageChannelSyncStatus.FAILED_UNKNOWN,
|
||||||
);
|
);
|
||||||
|
|
||||||
this.exceptionHandlerService.captureExceptions([
|
this.exceptionHandlerService.captureExceptions([
|
||||||
@ -170,9 +176,10 @@ export class MessageImportExceptionHandlerService {
|
|||||||
messageChannel: Pick<MessageChannelWorkspaceEntity, 'id'>,
|
messageChannel: Pick<MessageChannelWorkspaceEntity, 'id'>,
|
||||||
workspaceId: string,
|
workspaceId: string,
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
await this.messageChannelSyncStatusService.markAsFailedUnknownAndFlushMessagesToImport(
|
await this.messageChannelSyncStatusService.markAsFailedAndFlushMessagesToImport(
|
||||||
[messageChannel.id],
|
[messageChannel.id],
|
||||||
workspaceId,
|
workspaceId,
|
||||||
|
MessageChannelSyncStatus.FAILED_UNKNOWN,
|
||||||
);
|
);
|
||||||
|
|
||||||
throw new MessageImportException(
|
throw new MessageImportException(
|
||||||
|
|||||||
Reference in New Issue
Block a user