microsoft sync failed (#10381)

This PR is supposed to solve an issue with the syncrhonisation of
messages, specifically with microsoft driver. Microsoft calls don't need
access_Token so refreshing toekns was not implemented.

However, microsoft rely on its client which calls its refresfh_token,
and I might have missed some underlying dependency from microsoft
impelemtation so I setup the access token process to refresh it

Needs a talk before to be merged

Fix : https://github.com/twentyhq/twenty/issues/10367

EDIT:
it was a problem with microsoft making refreshtoken expire (contrarily
to google) which needs to be handled.
This commit is contained in:
Guillim
2025-03-05 16:22:51 +01:00
committed by GitHub
parent d61f48d7ee
commit 55a45c50cc
18 changed files with 332 additions and 176 deletions

View File

@ -4,12 +4,20 @@ import { Process } from 'src/engine/core-modules/message-queue/decorators/proces
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { ConnectedAccountRefreshAccessTokenExceptionCode } from 'src/modules/connected-account/refresh-tokens-manager/exceptions/connected-account-refresh-tokens.exception';
import { ConnectedAccountRefreshTokensService } from 'src/modules/connected-account/refresh-tokens-manager/services/connected-account-refresh-tokens.service';
import { isThrottled } from 'src/modules/connected-account/utils/is-throttled';
import {
MessageChannelSyncStage,
MessageChannelWorkspaceEntity,
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MessageImportDriverExceptionCode } from 'src/modules/messaging/message-import-manager/drivers/exceptions/message-import-driver.exception';
import { MessageImportExceptionCode } from 'src/modules/messaging/message-import-manager/exceptions/message-import.exception';
import { MessagingFullMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service';
import {
MessageImportExceptionHandlerService,
MessageImportSyncStep,
} from 'src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service';
import { MessagingPartialMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service';
import { MessagingTelemetryService } from 'src/modules/messaging/monitoring/services/messaging-telemetry.service';
@ -30,6 +38,8 @@ export class MessagingMessageListFetchJob {
private readonly messagingPartialMessageListFetchService: MessagingPartialMessageListFetchService,
private readonly messagingTelemetryService: MessagingTelemetryService,
private readonly twentyORMManager: TwentyORMManager,
private readonly connectedAccountRefreshTokensService: ConnectedAccountRefreshTokensService,
private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService,
) {}
@Process(MessagingMessageListFetchJob.name)
@ -64,72 +74,112 @@ export class MessagingMessageListFetchJob {
return;
}
if (
isThrottled(
messageChannel.syncStageStartedAt,
messageChannel.throttleFailureCount,
)
) {
return;
}
try {
if (
isThrottled(
messageChannel.syncStageStartedAt,
messageChannel.throttleFailureCount,
)
) {
return;
}
switch (messageChannel.syncStage) {
case MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING:
this.logger.log(
`Fetching partial message list for workspace ${workspaceId} and messageChannelId ${messageChannel.id}`,
);
try {
messageChannel.connectedAccount.accessToken =
await this.connectedAccountRefreshTokensService.refreshAndSaveTokens(
messageChannel.connectedAccount,
workspaceId,
);
} catch (error) {
switch (error.code) {
case ConnectedAccountRefreshAccessTokenExceptionCode.REFRESH_ACCESS_TOKEN_FAILED:
case ConnectedAccountRefreshAccessTokenExceptionCode.REFRESH_TOKEN_NOT_FOUND:
await this.messagingTelemetryService.track({
eventName: `refresh_token.error.insufficient_permissions`,
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
messageChannelId: messageChannel.id,
message: `${error.code}: ${error.reason ?? ''}`,
});
throw {
code: MessageImportDriverExceptionCode.INSUFFICIENT_PERMISSIONS,
message: error.message,
};
case ConnectedAccountRefreshAccessTokenExceptionCode.PROVIDER_NOT_SUPPORTED:
throw {
code: MessageImportExceptionCode.PROVIDER_NOT_SUPPORTED,
message: error.message,
};
default:
throw error;
}
}
await this.messagingTelemetryService.track({
eventName: 'partial_message_list_fetch.started',
workspaceId,
connectedAccountId: messageChannel.connectedAccount.id,
messageChannelId: messageChannel.id,
});
switch (messageChannel.syncStage) {
case MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING:
this.logger.log(
`Fetching partial message list for workspace ${workspaceId} and messageChannelId ${messageChannel.id}`,
);
await this.messagingPartialMessageListFetchService.processMessageListFetch(
messageChannel,
messageChannel.connectedAccount,
workspaceId,
);
await this.messagingTelemetryService.track({
eventName: 'partial_message_list_fetch.started',
workspaceId,
connectedAccountId: messageChannel.connectedAccount.id,
messageChannelId: messageChannel.id,
});
await this.messagingTelemetryService.track({
eventName: 'partial_message_list_fetch.completed',
workspaceId,
connectedAccountId: messageChannel.connectedAccount.id,
messageChannelId: messageChannel.id,
});
await this.messagingPartialMessageListFetchService.processMessageListFetch(
messageChannel,
messageChannel.connectedAccount,
workspaceId,
);
break;
await this.messagingTelemetryService.track({
eventName: 'partial_message_list_fetch.completed',
workspaceId,
connectedAccountId: messageChannel.connectedAccount.id,
messageChannelId: messageChannel.id,
});
case MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING:
this.logger.log(
`Fetching full message list for workspace ${workspaceId} and account ${messageChannel.connectedAccount.id}`,
);
break;
await this.messagingTelemetryService.track({
eventName: 'full_message_list_fetch.started',
workspaceId,
connectedAccountId: messageChannel.connectedAccount.id,
messageChannelId: messageChannel.id,
});
case MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING:
this.logger.log(
`Fetching full message list for workspace ${workspaceId} and account ${messageChannel.connectedAccount.id}`,
);
await this.messagingFullMessageListFetchService.processMessageListFetch(
messageChannel,
messageChannel.connectedAccount,
workspaceId,
);
await this.messagingTelemetryService.track({
eventName: 'full_message_list_fetch.started',
workspaceId,
connectedAccountId: messageChannel.connectedAccount.id,
messageChannelId: messageChannel.id,
});
await this.messagingTelemetryService.track({
eventName: 'full_message_list_fetch.completed',
workspaceId,
connectedAccountId: messageChannel.connectedAccount.id,
messageChannelId: messageChannel.id,
});
await this.messagingFullMessageListFetchService.processMessageListFetch(
messageChannel,
messageChannel.connectedAccount,
workspaceId,
);
break;
await this.messagingTelemetryService.track({
eventName: 'full_message_list_fetch.completed',
workspaceId,
connectedAccountId: messageChannel.connectedAccount.id,
messageChannelId: messageChannel.id,
});
default:
break;
break;
default:
break;
}
} catch (error) {
await this.messageImportErrorHandlerService.handleDriverException(
error,
MessageImportSyncStep.FULL_OR_PARTIAL_MESSAGE_LIST_FETCH,
messageChannel,
workspaceId,
);
}
}
}

View File

@ -7,7 +7,7 @@ import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { DataSourceEntity } from 'src/engine/metadata-modules/data-source/data-source.entity';
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
import { EmailAliasManagerModule } from 'src/modules/connected-account/email-alias-manager/email-alias-manager.module';
import { RefreshAccessTokenManagerModule } from 'src/modules/connected-account/refresh-access-token-manager/refresh-access-token-manager.module';
import { RefreshTokensManagerModule } from 'src/modules/connected-account/refresh-tokens-manager/connected-account-refresh-tokens-manager.module';
import { MessagingCommonModule } from 'src/modules/messaging/common/messaging-common.module';
import { MessagingMessageCleanerModule } from 'src/modules/messaging/message-cleaner/messaging-message-cleaner.module';
import { MessagingSingleMessageImportCommand } from 'src/modules/messaging/message-import-manager/commands/messaging-single-message-import.command';
@ -25,11 +25,11 @@ import { MessagingMessageListFetchJob } from 'src/modules/messaging/message-impo
import { MessagingMessagesImportJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job';
import { MessagingOngoingStaleJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job';
import { MessagingMessageImportManagerMessageChannelListener } from 'src/modules/messaging/message-import-manager/listeners/messaging-import-manager-message-channel.listener';
import { MessageImportExceptionHandlerService } from 'src/modules/messaging/message-import-manager/services/message-import-exception-handler.service';
import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service';
import { MessagingFullMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service';
import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service';
import { MessagingGetMessagesService } from 'src/modules/messaging/message-import-manager/services/messaging-get-messages.service';
import { MessageImportExceptionHandlerService } from 'src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service';
import { MessagingMessageService } from 'src/modules/messaging/message-import-manager/services/messaging-message.service';
import { MessagingMessagesImportService } from 'src/modules/messaging/message-import-manager/services/messaging-messages-import.service';
import { MessagingPartialMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service';
@ -38,7 +38,7 @@ import { MessageParticipantManagerModule } from 'src/modules/messaging/message-p
import { MessagingMonitoringModule } from 'src/modules/messaging/monitoring/messaging-monitoring.module';
@Module({
imports: [
RefreshAccessTokenManagerModule,
RefreshTokensManagerModule,
WorkspaceDataSourceModule,
MessagingGmailDriverModule,
MessagingMicrosoftDriverModule,

View File

@ -11,12 +11,12 @@ import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/se
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MessagingMessageCleanerService } from 'src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service';
import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service';
import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service';
import {
MessageImportExceptionHandlerService,
MessageImportSyncStep,
} from 'src/modules/messaging/message-import-manager/services/message-import-exception-handler.service';
import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service';
import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service';
} from 'src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service';
@Injectable()
export class MessagingFullMessageListFetchService {
constructor(

View File

@ -16,6 +16,7 @@ import {
export enum MessageImportSyncStep {
FULL_MESSAGE_LIST_FETCH = 'FULL_MESSAGE_LIST_FETCH',
PARTIAL_MESSAGE_LIST_FETCH = 'PARTIAL_MESSAGE_LIST_FETCH',
FULL_OR_PARTIAL_MESSAGE_LIST_FETCH = 'FULL_OR_PARTIAL_MESSAGE_LIST_FETCH',
MESSAGES_IMPORT = 'MESSAGES_IMPORT',
}

View File

@ -8,8 +8,8 @@ import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { BlocklistRepository } from 'src/modules/blocklist/repositories/blocklist.repository';
import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity';
import { EmailAliasManagerService } from 'src/modules/connected-account/email-alias-manager/services/email-alias-manager.service';
import { RefreshAccessTokenExceptionCode } from 'src/modules/connected-account/refresh-access-token-manager/exceptions/refresh-access-token.exception';
import { RefreshAccessTokenService } from 'src/modules/connected-account/refresh-access-token-manager/services/refresh-access-token.service';
import { ConnectedAccountRefreshAccessTokenExceptionCode } from 'src/modules/connected-account/refresh-tokens-manager/exceptions/connected-account-refresh-tokens.exception';
import { ConnectedAccountRefreshTokensService } from 'src/modules/connected-account/refresh-tokens-manager/services/connected-account-refresh-tokens.service';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service';
import {
@ -19,11 +19,11 @@ import {
import { MessageImportDriverExceptionCode } from 'src/modules/messaging/message-import-manager/drivers/exceptions/message-import-driver.exception';
import { MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE } from 'src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-users-messages-get-batch-size.constant';
import { MessageImportExceptionCode } from 'src/modules/messaging/message-import-manager/exceptions/message-import.exception';
import { MessagingGetMessagesService } from 'src/modules/messaging/message-import-manager/services/messaging-get-messages.service';
import {
MessageImportExceptionHandlerService,
MessageImportSyncStep,
} from 'src/modules/messaging/message-import-manager/services/message-import-exception-handler.service';
import { MessagingGetMessagesService } from 'src/modules/messaging/message-import-manager/services/messaging-get-messages.service';
} from 'src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service';
import { MessagingSaveMessagesAndEnqueueContactCreationService } from 'src/modules/messaging/message-import-manager/services/messaging-save-messages-and-enqueue-contact-creation.service';
import { filterEmails } from 'src/modules/messaging/message-import-manager/utils/filter-emails.util';
import { MessagingTelemetryService } from 'src/modules/messaging/monitoring/services/messaging-telemetry.service';
@ -37,7 +37,7 @@ export class MessagingMessagesImportService {
private readonly cacheStorage: CacheStorageService,
private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService,
private readonly saveMessagesAndEnqueueContactCreationService: MessagingSaveMessagesAndEnqueueContactCreationService,
private readonly refreshAccessTokenService: RefreshAccessTokenService,
private readonly connectedAccountRefreshTokensService: ConnectedAccountRefreshTokensService,
private readonly messagingTelemetryService: MessagingTelemetryService,
@InjectObjectMetadataRepository(BlocklistWorkspaceEntity)
private readonly blocklistRepository: BlocklistRepository,
@ -79,14 +79,14 @@ export class MessagingMessagesImportService {
try {
connectedAccount.accessToken =
await this.refreshAccessTokenService.refreshAndSaveAccessToken(
await this.connectedAccountRefreshTokensService.refreshAndSaveTokens(
connectedAccount,
workspaceId,
);
} catch (error) {
switch (error.code) {
case RefreshAccessTokenExceptionCode.REFRESH_ACCESS_TOKEN_FAILED:
case RefreshAccessTokenExceptionCode.REFRESH_TOKEN_NOT_FOUND:
case ConnectedAccountRefreshAccessTokenExceptionCode.REFRESH_ACCESS_TOKEN_FAILED:
case ConnectedAccountRefreshAccessTokenExceptionCode.REFRESH_TOKEN_NOT_FOUND:
await this.messagingTelemetryService.track({
eventName: `refresh_token.error.insufficient_permissions`,
workspaceId,
@ -98,7 +98,7 @@ export class MessagingMessagesImportService {
code: MessageImportDriverExceptionCode.INSUFFICIENT_PERMISSIONS,
message: error.message,
};
case RefreshAccessTokenExceptionCode.PROVIDER_NOT_SUPPORTED:
case ConnectedAccountRefreshAccessTokenExceptionCode.PROVIDER_NOT_SUPPORTED:
throw {
code: MessageImportExceptionCode.PROVIDER_NOT_SUPPORTED,
message: error.message,

View File

@ -11,12 +11,12 @@ import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/se
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MessagingMessageCleanerService } from 'src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service';
import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service';
import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service';
import {
MessageImportExceptionHandlerService,
MessageImportSyncStep,
} from 'src/modules/messaging/message-import-manager/services/message-import-exception-handler.service';
import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service';
import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service';
} from 'src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service';
@Injectable()
export class MessagingPartialMessageListFetchService {