Refactor messaging refresh access token (#6034)
- Put error handling outside of `refreshAndSaveAccessToken` - return after failing to refresh access token in `processMessageBatchImport` - remove unnecessary token refresh in `processMessageListFetch`
This commit is contained in:
@ -6,10 +6,6 @@ import { EnvironmentService } from 'src/engine/integrations/environment/environm
|
|||||||
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
|
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
|
||||||
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
|
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
|
||||||
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
|
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
|
||||||
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
|
|
||||||
import { MessagingTelemetryService } from 'src/modules/messaging/common/services/messaging-telemetry.service';
|
|
||||||
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
|
|
||||||
import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service';
|
|
||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class GoogleAPIRefreshAccessTokenService {
|
export class GoogleAPIRefreshAccessTokenService {
|
||||||
@ -17,16 +13,12 @@ export class GoogleAPIRefreshAccessTokenService {
|
|||||||
private readonly environmentService: EnvironmentService,
|
private readonly environmentService: EnvironmentService,
|
||||||
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
|
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
|
||||||
private readonly connectedAccountRepository: ConnectedAccountRepository,
|
private readonly connectedAccountRepository: ConnectedAccountRepository,
|
||||||
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
|
|
||||||
private readonly messageChannelRepository: MessageChannelRepository,
|
|
||||||
private readonly messagingTelemetryService: MessagingTelemetryService,
|
|
||||||
private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService,
|
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
async refreshAndSaveAccessToken(
|
async refreshAndSaveAccessToken(
|
||||||
workspaceId: string,
|
workspaceId: string,
|
||||||
connectedAccountId: string,
|
connectedAccountId: string,
|
||||||
): Promise<void> {
|
): Promise<string> {
|
||||||
const connectedAccount = await this.connectedAccountRepository.getById(
|
const connectedAccount = await this.connectedAccountRepository.getById(
|
||||||
connectedAccountId,
|
connectedAccountId,
|
||||||
workspaceId,
|
workspaceId,
|
||||||
@ -46,51 +38,15 @@ export class GoogleAPIRefreshAccessTokenService {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
const accessToken = await this.refreshAccessToken(refreshToken);
|
||||||
const accessToken = await this.refreshAccessToken(refreshToken);
|
|
||||||
|
|
||||||
await this.connectedAccountRepository.updateAccessToken(
|
await this.connectedAccountRepository.updateAccessToken(
|
||||||
accessToken,
|
accessToken,
|
||||||
connectedAccountId,
|
connectedAccountId,
|
||||||
workspaceId,
|
workspaceId,
|
||||||
);
|
);
|
||||||
} catch (error) {
|
|
||||||
const messageChannel =
|
|
||||||
await this.messageChannelRepository.getFirstByConnectedAccountId(
|
|
||||||
connectedAccountId,
|
|
||||||
workspaceId,
|
|
||||||
);
|
|
||||||
|
|
||||||
if (!messageChannel) {
|
return accessToken;
|
||||||
throw new Error(
|
|
||||||
`No message channel found for connected account ${connectedAccountId} in workspace ${workspaceId}`,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
await this.messagingTelemetryService.track({
|
|
||||||
eventName: `refresh_token.error.insufficient_permissions`,
|
|
||||||
workspaceId,
|
|
||||||
connectedAccountId: messageChannel.connectedAccountId,
|
|
||||||
messageChannelId: messageChannel.id,
|
|
||||||
message: `${error.code}: ${error.reason}`,
|
|
||||||
});
|
|
||||||
|
|
||||||
await this.messagingChannelSyncStatusService.markAsFailedInsufficientPermissionsAndFlushMessagesToImport(
|
|
||||||
messageChannel.id,
|
|
||||||
workspaceId,
|
|
||||||
);
|
|
||||||
|
|
||||||
if (!messageChannel.connectedAccountId) {
|
|
||||||
throw new Error(
|
|
||||||
`No connected account ID found for message channel ${messageChannel.id} in workspace ${workspaceId}`,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
await this.connectedAccountRepository.updateAuthFailedAt(
|
|
||||||
messageChannel.connectedAccountId,
|
|
||||||
workspaceId,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async refreshAccessToken(refreshToken: string): Promise<string> {
|
async refreshAccessToken(refreshToken: string): Promise<string> {
|
||||||
|
|||||||
@ -9,9 +9,6 @@ import { assert, assertNotNull } from 'src/utils/assert';
|
|||||||
import { GmailMessage } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message';
|
import { GmailMessage } from 'src/modules/messaging/message-import-manager/drivers/gmail/types/gmail-message';
|
||||||
import { formatAddressObjectAsParticipants } from 'src/modules/messaging/message-import-manager/utils/format-address-object-as-participants.util';
|
import { formatAddressObjectAsParticipants } from 'src/modules/messaging/message-import-manager/utils/format-address-object-as-participants.util';
|
||||||
import { MessagingFetchByBatchesService } from 'src/modules/messaging/common/services/messaging-fetch-by-batch.service';
|
import { MessagingFetchByBatchesService } from 'src/modules/messaging/common/services/messaging-fetch-by-batch.service';
|
||||||
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
|
|
||||||
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
|
|
||||||
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
|
|
||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class MessagingGmailFetchMessagesByBatchesService {
|
export class MessagingGmailFetchMessagesByBatchesService {
|
||||||
@ -21,30 +18,16 @@ export class MessagingGmailFetchMessagesByBatchesService {
|
|||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
private readonly fetchByBatchesService: MessagingFetchByBatchesService,
|
private readonly fetchByBatchesService: MessagingFetchByBatchesService,
|
||||||
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
|
|
||||||
private readonly connectedAccountRepository: ConnectedAccountRepository,
|
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
async fetchAllMessages(
|
async fetchAllMessages(
|
||||||
messageIds: string[],
|
messageIds: string[],
|
||||||
|
accessToken: string,
|
||||||
connectedAccountId: string,
|
connectedAccountId: string,
|
||||||
workspaceId: string,
|
workspaceId: string,
|
||||||
): Promise<GmailMessage[]> {
|
): Promise<GmailMessage[]> {
|
||||||
let startTime = Date.now();
|
let startTime = Date.now();
|
||||||
|
|
||||||
const connectedAccount = await this.connectedAccountRepository.getById(
|
|
||||||
connectedAccountId,
|
|
||||||
workspaceId,
|
|
||||||
);
|
|
||||||
|
|
||||||
if (!connectedAccount) {
|
|
||||||
throw new Error(
|
|
||||||
`Connected account ${connectedAccountId} not found in workspace ${workspaceId}`,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
const accessToken = connectedAccount.accessToken;
|
|
||||||
|
|
||||||
const { messageIdsByBatch, batchResponses } =
|
const { messageIdsByBatch, batchResponses } =
|
||||||
await this.fetchByBatchesService.fetchAllByBatches(
|
await this.fetchByBatchesService.fetchAllByBatches(
|
||||||
messageIds,
|
messageIds,
|
||||||
|
|||||||
@ -22,8 +22,6 @@ import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/
|
|||||||
import { MessagingGmailClientProvider } from 'src/modules/messaging/message-import-manager/drivers/gmail/providers/messaging-gmail-client.provider';
|
import { MessagingGmailClientProvider } from 'src/modules/messaging/message-import-manager/drivers/gmail/providers/messaging-gmail-client.provider';
|
||||||
import { MESSAGING_GMAIL_USERS_MESSAGES_LIST_MAX_RESULT } from 'src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-users-messages-list-max-result.constant';
|
import { MESSAGING_GMAIL_USERS_MESSAGES_LIST_MAX_RESULT } from 'src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-users-messages-list-max-result.constant';
|
||||||
import { MESSAGING_GMAIL_EXCLUDED_CATEGORIES } from 'src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-excluded-categories';
|
import { MESSAGING_GMAIL_EXCLUDED_CATEGORIES } from 'src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-excluded-categories';
|
||||||
import { GoogleAPIRefreshAccessTokenService } from 'src/modules/connected-account/services/google-api-refresh-access-token/google-api-refresh-access-token.service';
|
|
||||||
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
|
|
||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class MessagingGmailFullMessageListFetchService {
|
export class MessagingGmailFullMessageListFetchService {
|
||||||
@ -41,11 +39,8 @@ export class MessagingGmailFullMessageListFetchService {
|
|||||||
MessageChannelMessageAssociationWorkspaceEntity,
|
MessageChannelMessageAssociationWorkspaceEntity,
|
||||||
)
|
)
|
||||||
private readonly messageChannelMessageAssociationRepository: MessageChannelMessageAssociationRepository,
|
private readonly messageChannelMessageAssociationRepository: MessageChannelMessageAssociationRepository,
|
||||||
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
|
|
||||||
private readonly connectedAccountRepository: ConnectedAccountRepository,
|
|
||||||
private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService,
|
private readonly messagingChannelSyncStatusService: MessagingChannelSyncStatusService,
|
||||||
private readonly gmailErrorHandlingService: MessagingErrorHandlingService,
|
private readonly gmailErrorHandlingService: MessagingErrorHandlingService,
|
||||||
private readonly googleAPIsRefreshAccessTokenService: GoogleAPIRefreshAccessTokenService,
|
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
public async processMessageListFetch(
|
public async processMessageListFetch(
|
||||||
@ -58,26 +53,9 @@ export class MessagingGmailFullMessageListFetchService {
|
|||||||
workspaceId,
|
workspaceId,
|
||||||
);
|
);
|
||||||
|
|
||||||
await this.googleAPIsRefreshAccessTokenService.refreshAndSaveAccessToken(
|
|
||||||
workspaceId,
|
|
||||||
connectedAccount.id,
|
|
||||||
);
|
|
||||||
|
|
||||||
const refreshedConnectedAccount =
|
|
||||||
await this.connectedAccountRepository.getById(
|
|
||||||
connectedAccount.id,
|
|
||||||
workspaceId,
|
|
||||||
);
|
|
||||||
|
|
||||||
if (!refreshedConnectedAccount) {
|
|
||||||
throw new Error(
|
|
||||||
`Connected account ${connectedAccount.id} not found in workspace ${workspaceId}`,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
const gmailClient: gmail_v1.Gmail =
|
const gmailClient: gmail_v1.Gmail =
|
||||||
await this.gmailClientProvider.getGmailClient(
|
await this.gmailClientProvider.getGmailClient(
|
||||||
refreshedConnectedAccount.refreshToken,
|
connectedAccount.refreshToken,
|
||||||
);
|
);
|
||||||
|
|
||||||
const { error: gmailError } =
|
const { error: gmailError } =
|
||||||
|
|||||||
@ -20,6 +20,7 @@ import { MessagingGmailFetchMessagesByBatchesService } from 'src/modules/messagi
|
|||||||
import { MessagingErrorHandlingService } from 'src/modules/messaging/common/services/messaging-error-handling.service';
|
import { MessagingErrorHandlingService } from 'src/modules/messaging/common/services/messaging-error-handling.service';
|
||||||
import { MessagingSaveMessagesAndEnqueueContactCreationService } from 'src/modules/messaging/common/services/messaging-save-messages-and-enqueue-contact-creation.service';
|
import { MessagingSaveMessagesAndEnqueueContactCreationService } from 'src/modules/messaging/common/services/messaging-save-messages-and-enqueue-contact-creation.service';
|
||||||
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
|
import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository';
|
||||||
|
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
|
||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class MessagingGmailMessagesImportService {
|
export class MessagingGmailMessagesImportService {
|
||||||
@ -40,6 +41,8 @@ export class MessagingGmailMessagesImportService {
|
|||||||
private readonly blocklistRepository: BlocklistRepository,
|
private readonly blocklistRepository: BlocklistRepository,
|
||||||
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
|
@InjectObjectMetadataRepository(MessageChannelWorkspaceEntity)
|
||||||
private readonly messageChannelRepository: MessageChannelRepository,
|
private readonly messageChannelRepository: MessageChannelRepository,
|
||||||
|
@InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity)
|
||||||
|
private readonly connectedAccountRepository: ConnectedAccountRepository,
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
async processMessageBatchImport(
|
async processMessageBatchImport(
|
||||||
@ -70,10 +73,35 @@ export class MessagingGmailMessagesImportService {
|
|||||||
workspaceId,
|
workspaceId,
|
||||||
);
|
);
|
||||||
|
|
||||||
await this.googleAPIsRefreshAccessTokenService.refreshAndSaveAccessToken(
|
let accessToken: string;
|
||||||
workspaceId,
|
|
||||||
connectedAccount.id,
|
try {
|
||||||
);
|
accessToken =
|
||||||
|
await this.googleAPIsRefreshAccessTokenService.refreshAndSaveAccessToken(
|
||||||
|
workspaceId,
|
||||||
|
connectedAccount.id,
|
||||||
|
);
|
||||||
|
} catch (error) {
|
||||||
|
await this.messagingTelemetryService.track({
|
||||||
|
eventName: `refresh_token.error.insufficient_permissions`,
|
||||||
|
workspaceId,
|
||||||
|
connectedAccountId: messageChannel.connectedAccountId,
|
||||||
|
messageChannelId: messageChannel.id,
|
||||||
|
message: `${error.code}: ${error.reason}`,
|
||||||
|
});
|
||||||
|
|
||||||
|
await this.messagingChannelSyncStatusService.markAsFailedInsufficientPermissionsAndFlushMessagesToImport(
|
||||||
|
messageChannel.id,
|
||||||
|
workspaceId,
|
||||||
|
);
|
||||||
|
|
||||||
|
await this.connectedAccountRepository.updateAuthFailedAt(
|
||||||
|
messageChannel.connectedAccountId,
|
||||||
|
workspaceId,
|
||||||
|
);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
const messageIdsToFetch =
|
const messageIdsToFetch =
|
||||||
(await this.cacheStorage.setPop(
|
(await this.cacheStorage.setPop(
|
||||||
@ -97,6 +125,7 @@ export class MessagingGmailMessagesImportService {
|
|||||||
const allMessages =
|
const allMessages =
|
||||||
await this.fetchMessagesByBatchesService.fetchAllMessages(
|
await this.fetchMessagesByBatchesService.fetchAllMessages(
|
||||||
messageIdsToFetch,
|
messageIdsToFetch,
|
||||||
|
accessToken,
|
||||||
connectedAccount.id,
|
connectedAccount.id,
|
||||||
workspaceId,
|
workspaceId,
|
||||||
);
|
);
|
||||||
|
|||||||
Reference in New Issue
Block a user