[calendar/messaging] fix connected account auth failed should skip sync (#4920)

- AuthFailedAt is set when a refreshToken is not valid and an
accessToken can't be generated, meaning it will need a manual action
from the user to provide a new refresh token.
- Calendar/messaging jobs should not be executed if authFailedAt is not
null.
This commit is contained in:
Weiko
2024-04-11 17:57:48 +02:00
committed by GitHub
parent 8853408264
commit fc56775c2a
12 changed files with 342 additions and 208 deletions

View File

@ -22,6 +22,10 @@ import { SignUpService } from 'src/engine/core-modules/auth/services/sign-up.ser
import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { FileUploadModule } from 'src/engine/core-modules/file/file-upload/file-upload.module';
import { AppTokenService } from 'src/engine/core-modules/app-token/services/app-token.service';
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata';
import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel.object-metadata';
import { CalendarChannelObjectMetadata } from 'src/modules/calendar/standard-objects/calendar-channel.object-metadata';
import { AuthResolver } from './auth.resolver';
@ -51,6 +55,11 @@ const jwtModule = JwtModule.registerAsync({
[Workspace, User, AppToken, FeatureFlagEntity],
'core',
),
ObjectMetadataRepositoryModule.forFeature([
ConnectedAccountObjectMetadata,
MessageChannelObjectMetadata,
CalendarChannelObjectMetadata,
]),
HttpModule,
UserWorkspaceModule,
],

View File

@ -56,7 +56,7 @@ export class GoogleAPIsAuthController {
throw new Error('Workspace not found');
}
await this.googleAPIsService.saveOrUpdateConnectedAccount({
await this.googleAPIsService.refreshGoogleRefreshToken({
handle: email,
workspaceMemberId: workspaceMemberId,
workspaceId: workspaceId,

View File

@ -1,31 +0,0 @@
import { ArgsType, Field } from '@nestjs/graphql';
import { IsNotEmpty, IsString } from 'class-validator';
@ArgsType()
export class SaveOrUpdateConnectedAccountInput {
@Field(() => String)
@IsNotEmpty()
@IsString()
handle: string;
@Field(() => String)
@IsNotEmpty()
@IsString()
workspaceMemberId: string;
@Field(() => String)
@IsNotEmpty()
@IsString()
workspaceId: string;
@Field(() => String)
@IsNotEmpty()
@IsString()
accessToken: string;
@Field(() => String)
@IsNotEmpty()
@IsString()
refreshToken: string;
}

View File

@ -1,26 +0,0 @@
import { ArgsType, Field } from '@nestjs/graphql';
import { IsNotEmpty, IsString } from 'class-validator';
@ArgsType()
export class UpdateConnectedAccountInput {
@Field(() => String)
@IsNotEmpty()
@IsString()
workspaceId: string;
@Field(() => String)
@IsNotEmpty()
@IsString()
accessToken: string;
@Field(() => String)
@IsNotEmpty()
@IsString()
refreshToken: string;
@Field(() => String)
@IsNotEmpty()
@IsString()
connectedAccountId: string;
}

View File

@ -2,11 +2,10 @@ import { Inject, Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { v4 } from 'uuid';
import { Repository } from 'typeorm';
import { EntityManager, Repository } from 'typeorm';
import { DataSourceService } from 'src/engine/metadata-modules/data-source/data-source.service';
import { TypeORMService } from 'src/database/typeorm/typeorm.service';
import { SaveOrUpdateConnectedAccountInput } from 'src/engine/core-modules/auth/dto/save-connected-account';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import {
@ -22,7 +21,23 @@ import {
GmailFullSyncV2Job,
GmailFullSyncV2JobData,
} from 'src/modules/messaging/jobs/gmail-full-sync-v2.job';
import { UpdateConnectedAccountInput } from 'src/engine/core-modules/auth/dto/update-connected-account';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import {
ConnectedAccountObjectMetadata,
ConnectedAccountProvider,
} from 'src/modules/connected-account/standard-objects/connected-account.object-metadata';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
import {
MessageChannelObjectMetadata,
MessageChannelType,
MessageChannelVisibility,
} from 'src/modules/messaging/standard-objects/message-channel.object-metadata';
import { MessageChannelRepository } from 'src/modules/messaging/repositories/message-channel.repository';
import {
CalendarChannelObjectMetadata,
CalendarChannelVisibility,
} from 'src/modules/calendar/standard-objects/calendar-channel.object-metadata';
import { CalendarChannelRepository } from 'src/modules/calendar/repositories/calendar-channel.repository';
@Injectable()
export class GoogleAPIsService {
@ -36,15 +51,22 @@ export class GoogleAPIsService {
private readonly environmentService: EnvironmentService,
@InjectRepository(FeatureFlagEntity, 'core')
private readonly featureFlagRepository: Repository<FeatureFlagEntity>,
@InjectObjectMetadataRepository(ConnectedAccountObjectMetadata)
private readonly connectedAccountRepository: ConnectedAccountRepository,
@InjectObjectMetadataRepository(MessageChannelObjectMetadata)
private readonly messageChannelRepository: MessageChannelRepository,
@InjectObjectMetadataRepository(CalendarChannelObjectMetadata)
private readonly calendarChannelRepository: CalendarChannelRepository,
) {}
providerName = 'google';
async saveOrUpdateConnectedAccount(
saveOrUpdateConnectedAccountInput: SaveOrUpdateConnectedAccountInput,
) {
const { handle, workspaceId, workspaceMemberId } =
saveOrUpdateConnectedAccountInput;
async refreshGoogleRefreshToken(input: {
handle: string;
workspaceMemberId: string;
workspaceId: string;
accessToken: string;
refreshToken: string;
}) {
const { handle, workspaceId, workspaceMemberId } = input;
const dataSourceMetadata =
await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail(
@ -54,151 +76,124 @@ export class GoogleAPIsService {
const workspaceDataSource =
await this.typeORMService.connectToDataSource(dataSourceMetadata);
const connectedAccount = await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "handle" = $1 AND "provider" = $2 AND "accountOwnerId" = $3`,
[handle, this.providerName, workspaceMemberId],
);
if (connectedAccount.length > 0) {
await this.updateConnectedAccount({
...saveOrUpdateConnectedAccountInput,
connectedAccountId: connectedAccount[0].id,
});
} else {
await this.saveConnectedAccount(saveOrUpdateConnectedAccountInput);
}
}
async saveConnectedAccount(
saveConnectedAccountInput: SaveOrUpdateConnectedAccountInput,
) {
const {
handle,
workspaceId,
accessToken,
refreshToken,
workspaceMemberId,
} = saveConnectedAccountInput;
const dataSourceMetadata =
await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail(
workspaceId,
);
const workspaceDataSource =
await this.typeORMService.connectToDataSource(dataSourceMetadata);
const connectedAccountId = v4();
const IsCalendarEnabled = await this.featureFlagRepository.findOneBy({
const isCalendarEnabledFlag = await this.featureFlagRepository.findOneBy({
workspaceId,
key: FeatureFlagKeys.IsCalendarEnabled,
value: true,
});
await workspaceDataSource?.transaction(async (manager) => {
await manager.query(
`INSERT INTO ${dataSourceMetadata.schema}."connectedAccount" ("id", "handle", "provider", "accessToken", "refreshToken", "accountOwnerId") VALUES ($1, $2, $3, $4, $5, $6)`,
[
connectedAccountId,
const isCalendarEnabled =
this.environmentService.get('CALENDAR_PROVIDER_GOOGLE_ENABLED') &&
!!isCalendarEnabledFlag;
await workspaceDataSource?.transaction(async (manager: EntityManager) => {
const connectedAccounts =
await this.connectedAccountRepository.getAllByHandleAndWorkspaceMemberId(
handle,
this.providerName,
accessToken,
refreshToken,
workspaceMemberId,
],
);
if (this.environmentService.get('MESSAGING_PROVIDER_GMAIL_ENABLED')) {
await manager.query(
`INSERT INTO ${dataSourceMetadata.schema}."messageChannel" ("visibility", "handle", "connectedAccountId", "type") VALUES ($1, $2, $3, $4)`,
['share_everything', handle, connectedAccountId, 'email'],
workspaceId,
manager,
);
}
if (
this.environmentService.get('CALENDAR_PROVIDER_GOOGLE_ENABLED') &&
IsCalendarEnabled
) {
await manager.query(
`INSERT INTO ${dataSourceMetadata.schema}."calendarChannel" ("visibility", "handle", "connectedAccountId") VALUES ($1, $2, $3)`,
['SHARE_EVERYTHING', handle, connectedAccountId],
if (!connectedAccounts || connectedAccounts?.length === 0) {
const newConnectedAccountId = v4();
await this.connectedAccountRepository.create(
{
id: newConnectedAccountId,
handle,
provider: ConnectedAccountProvider.GOOGLE,
accessToken: input.accessToken,
refreshToken: input.refreshToken,
accountOwnerId: workspaceMemberId,
},
workspaceId,
manager,
);
await this.messageChannelRepository.create(
{
id: v4(),
connectedAccountId: newConnectedAccountId,
type: MessageChannelType.EMAIL,
handle,
visibility: MessageChannelVisibility.SHARE_EVERYTHING,
},
workspaceId,
manager,
);
if (isCalendarEnabled) {
await this.calendarChannelRepository.create(
{
id: v4(),
connectedAccountId: newConnectedAccountId,
handle,
visibility: CalendarChannelVisibility.SHARE_EVERYTHING,
},
workspaceId,
manager,
);
}
await this.enqueueSyncJobs(
newConnectedAccountId,
workspaceId,
isCalendarEnabled,
);
} else {
await this.connectedAccountRepository.updateAccessTokenAndRefreshToken(
input.accessToken,
input.refreshToken,
connectedAccounts[0].id,
workspaceId,
manager,
);
await this.messageChannelRepository.resetSync(
connectedAccounts[0].id,
workspaceId,
manager,
);
await this.enqueueSyncJobs(
connectedAccounts[0].id,
workspaceId,
isCalendarEnabled,
);
}
});
}
private async enqueueSyncJobs(
connectedAccountId: string,
workspaceId: string,
isCalendarEnabled: boolean,
) {
if (this.environmentService.get('MESSAGING_PROVIDER_GMAIL_ENABLED')) {
await this.enqueueGmailFullSyncJob(workspaceId, connectedAccountId);
await this.messageQueueService.add<GmailFullSyncV2JobData>(
GmailFullSyncV2Job.name,
{
workspaceId,
connectedAccountId,
},
);
}
if (
this.environmentService.get('CALENDAR_PROVIDER_GOOGLE_ENABLED') &&
IsCalendarEnabled
isCalendarEnabled
) {
await this.enqueueGoogleCalendarSyncJob(workspaceId, connectedAccountId);
}
return;
}
async updateConnectedAccount(
updateConnectedAccountInput: UpdateConnectedAccountInput,
) {
const { workspaceId, accessToken, refreshToken, connectedAccountId } =
updateConnectedAccountInput;
const dataSourceMetadata =
await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail(
workspaceId,
await this.calendarQueueService.add<GoogleCalendarSyncJobData>(
GoogleCalendarSyncJob.name,
{
workspaceId,
connectedAccountId,
},
{
retryLimit: 2,
},
);
const workspaceDataSource =
await this.typeORMService.connectToDataSource(dataSourceMetadata);
await workspaceDataSource?.transaction(async (manager) => {
await manager.query(
`UPDATE ${dataSourceMetadata.schema}."connectedAccount" SET "accessToken" = $1, "refreshToken" = $2, "authFailedAt" = NULL WHERE "id" = $3`,
[accessToken, refreshToken, connectedAccountId],
);
});
if (this.environmentService.get('MESSAGING_PROVIDER_GMAIL_ENABLED')) {
await this.enqueueGmailFullSyncJob(workspaceId, connectedAccountId);
}
if (this.environmentService.get('CALENDAR_PROVIDER_GOOGLE_ENABLED')) {
await this.enqueueGoogleCalendarSyncJob(workspaceId, connectedAccountId);
}
return;
}
async enqueueGmailFullSyncJob(
workspaceId: string,
connectedAccountId: string,
) {
await this.messageQueueService.add<GmailFullSyncV2JobData>(
GmailFullSyncV2Job.name,
{
workspaceId,
connectedAccountId,
},
);
}
async enqueueGoogleCalendarSyncJob(
workspaceId: string,
connectedAccountId: string,
) {
await this.calendarQueueService.add<GoogleCalendarSyncJobData>(
GoogleCalendarSyncJob.name,
{
workspaceId,
connectedAccountId,
},
{
retryLimit: 2,
},
);
}
}

View File

@ -27,6 +27,30 @@ export class CalendarChannelRepository {
);
}
public async create(
calendarChannel: Pick<
ObjectRecord<CalendarChannelObjectMetadata>,
'id' | 'connectedAccountId' | 'handle' | 'visibility'
>,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`INSERT INTO ${dataSourceSchema}."calendarChannel" (id, "connectedAccountId", "handle", "visibility") VALUES ($1, $2, $3, $4)`,
[
calendarChannel.id,
calendarChannel.connectedAccountId,
calendarChannel.handle,
calendarChannel.visibility,
],
workspaceId,
transactionManager,
);
}
public async getByConnectedAccountId(
connectedAccountId: string,
workspaceId: string,

View File

@ -43,6 +43,75 @@ export class ConnectedAccountRepository {
);
}
public async getAllByHandleAndWorkspaceMemberId(
handle: string,
workspaceMemberId: string,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<ConnectedAccountObjectMetadata>[] | undefined> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const connectedAccounts =
await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."connectedAccount" WHERE "handle" = $1 AND "accountOwnerId" = $2 LIMIT 1`,
[handle, workspaceMemberId],
workspaceId,
transactionManager,
);
return connectedAccounts;
}
public async create(
connectedAccount: Pick<
ObjectRecord<ConnectedAccountObjectMetadata>,
| 'id'
| 'handle'
| 'provider'
| 'accessToken'
| 'refreshToken'
| 'accountOwnerId'
>,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<ConnectedAccountObjectMetadata>> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
return await this.workspaceDataSourceService.executeRawQuery(
`INSERT INTO ${dataSourceSchema}."connectedAccount" ("id", "handle", "provider", "accessToken", "refreshToken", "accountOwnerId") VALUES ($1, $2, $3, $4, $5, $6)`,
[
connectedAccount.id,
connectedAccount.handle,
connectedAccount.provider,
connectedAccount.accessToken,
connectedAccount.refreshToken,
connectedAccount.accountOwnerId,
],
workspaceId,
transactionManager,
);
}
public async updateAccessTokenAndRefreshToken(
accessToken: string,
refreshToken: string,
connectedAccountId: string,
workspaceId: string,
transactionManager?: EntityManager,
) {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."connectedAccount" SET "accessToken" = $1, "refreshToken" = $2 WHERE "id" = $3`,
[accessToken, refreshToken, connectedAccountId],
workspaceId,
transactionManager,
);
}
public async getById(
connectedAccountId: string,
workspaceId: string,

View File

@ -3,7 +3,6 @@ import { Injectable } from '@nestjs/common';
import axios from 'axios';
import { EnvironmentService } from 'src/engine/integrations/environment/environment.service';
import { ExceptionHandlerService } from 'src/engine/integrations/exception-handler/exception-handler.service';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository';
import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata';
@ -14,7 +13,6 @@ export class GoogleAPIRefreshAccessTokenService {
private readonly environmentService: EnvironmentService,
@InjectObjectMetadataRepository(ConnectedAccountObjectMetadata)
private readonly connectedAccountRepository: ConnectedAccountRepository,
private readonly exceptionHandlerService: ExceptionHandlerService,
) {}
async refreshAndSaveAccessToken(
@ -32,6 +30,12 @@ export class GoogleAPIRefreshAccessTokenService {
);
}
if (connectedAccount.authFailedAt) {
throw new Error(
`Skipping refresh of access token for connected account ${connectedAccountId} in workspace ${workspaceId} because auth already failed, a new refresh token is needed`,
);
}
const refreshToken = connectedAccount.refreshToken;
if (!refreshToken) {
@ -82,11 +86,7 @@ export class GoogleAPIRefreshAccessTokenService {
connectedAccountId,
workspaceId,
);
this.exceptionHandlerService.captureExceptions([error], {
user: {
workspaceId,
},
});
throw new Error(`Error refreshing access token: ${error.message}`);
}
}

View File

@ -17,6 +17,10 @@ import { CalendarChannelObjectMetadata } from 'src/modules/calendar/standard-obj
import { MessageChannelObjectMetadata } from 'src/modules/messaging/standard-objects/message-channel.object-metadata';
import { WorkspaceMemberObjectMetadata } from 'src/modules/workspace-member/standard-objects/workspace-member.object-metadata';
export enum ConnectedAccountProvider {
GOOGLE = 'google',
}
@ObjectMetadata({
standardId: standardObjectIds.connectedAccount,
namePlural: 'connectedAccounts',
@ -43,7 +47,7 @@ export class ConnectedAccountObjectMetadata extends BaseObjectMetadata {
description: 'The account provider',
icon: 'IconSettings',
})
provider: string;
provider: ConnectedAccountProvider; // field metadata should be a SELECT
@FieldMetadata({
standardId: connectedAccountStandardFieldIds.accessToken,

View File

@ -15,6 +15,49 @@ export class MessageChannelRepository {
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
) {}
public async create(
messageChannel: Pick<
ObjectRecord<MessageChannelObjectMetadata>,
'id' | 'connectedAccountId' | 'type' | 'handle' | 'visibility'
>,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`INSERT INTO ${dataSourceSchema}."messageChannel" ("id", "connectedAccountId", "type", "handle", "visibility")
VALUES ($1, $2, $3, $4, $5)`,
[
messageChannel.id,
messageChannel.connectedAccountId,
messageChannel.type,
messageChannel.handle,
messageChannel.visibility,
],
workspaceId,
transactionManager,
);
}
public async resetSync(
connectedAccountId: string,
workspaceId: string,
transactionManager?: EntityManager,
): Promise<void> {
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."messageChannel" SET "syncStatus" = NULL, "syncCursor" = '', "ongoingSyncStartedAt" = NULL
WHERE "connectedAccountId" = $1`,
[connectedAccountId],
workspaceId,
transactionManager,
);
}
public async getAll(
workspaceId: string,
transactionManager?: EntityManager,

View File

@ -63,6 +63,14 @@ export class GmailFetchMessageContentFromCacheService {
return;
}
if (connectedAccount.authFailedAt) {
this.logger.error(
`Connected account ${connectedAccountId} in workspace ${workspaceId} is in a failed state. Skipping...`,
);
return;
}
const accessToken = connectedAccount.accessToken;
const refreshToken = connectedAccount.refreshToken;
@ -233,6 +241,14 @@ export class GmailFetchMessageContentFromCacheService {
messageIdsToFetch,
);
if (error?.message?.code === 429) {
this.logger.error(
`Error fetching messages for ${connectedAccountId} in workspace ${workspaceId}: Resource has been exhausted, locking for ${GMAIL_ONGOING_SYNC_TIMEOUT}ms...`,
);
return;
}
await this.messageChannelRepository.updateSyncStatus(
gmailMessageChannelId,
MessageChannelSyncStatus.FAILED,

View File

@ -21,6 +21,17 @@ export enum MessageChannelSyncStatus {
FAILED = 'FAILED',
}
export enum MessageChannelVisibility {
METADATA = 'metadata',
SUBJECT = 'subject',
SHARE_EVERYTHING = 'share_everything',
}
export enum MessageChannelType {
EMAIL = 'email',
SMS = 'sms',
}
@ObjectMetadata({
standardId: standardObjectIds.messageChannel,
namePlural: 'messageChannels',
@ -38,16 +49,26 @@ export class MessageChannelObjectMetadata extends BaseObjectMetadata {
description: 'Visibility',
icon: 'IconEyeglass',
options: [
{ value: 'metadata', label: 'Metadata', position: 0, color: 'green' },
{ value: 'subject', label: 'Subject', position: 1, color: 'blue' },
{
value: 'share_everything',
value: MessageChannelVisibility.METADATA,
label: 'Metadata',
position: 0,
color: 'green',
},
{
value: MessageChannelVisibility.SUBJECT,
label: 'Subject',
position: 1,
color: 'blue',
},
{
value: MessageChannelVisibility.SHARE_EVERYTHING,
label: 'Share Everything',
position: 2,
color: 'orange',
},
],
defaultValue: "'share_everything'",
defaultValue: MessageChannelVisibility.SHARE_EVERYTHING,
})
visibility: string;
@ -77,10 +98,20 @@ export class MessageChannelObjectMetadata extends BaseObjectMetadata {
description: 'Channel Type',
icon: 'IconMessage',
options: [
{ value: 'email', label: 'Email', position: 0, color: 'green' },
{ value: 'sms', label: 'SMS', position: 1, color: 'blue' },
{
value: MessageChannelType.EMAIL,
label: 'Email',
position: 0,
color: 'green',
},
{
value: MessageChannelType.SMS,
label: 'SMS',
position: 1,
color: 'blue',
},
],
defaultValue: "'email'",
defaultValue: MessageChannelType.EMAIL,
})
type: string;