@ -14,4 +14,5 @@ export enum MessageImportDriverExceptionCode {
|
||||
UNKNOWN = 'UNKNOWN',
|
||||
UNKNOWN_NETWORK_ERROR = 'UNKNOWN_NETWORK_ERROR',
|
||||
NO_NEXT_SYNC_CURSOR = 'NO_NEXT_SYNC_CURSOR',
|
||||
SYNC_CURSOR_ERROR = 'SYNC_CURSOR_ERROR',
|
||||
}
|
||||
|
||||
@ -32,7 +32,7 @@ export class GmailGetMessageListService {
|
||||
public async getFullMessageList(
|
||||
connectedAccount: Pick<
|
||||
ConnectedAccountWorkspaceEntity,
|
||||
'provider' | 'refreshToken' | 'id'
|
||||
'provider' | 'refreshToken' | 'id' | 'handle'
|
||||
>,
|
||||
): Promise<GetFullMessageListResponse> {
|
||||
const gmailClient =
|
||||
|
||||
@ -10,6 +10,7 @@ import { MicrosoftGetMessageListService } from './microsoft-get-message-list.ser
|
||||
import { MicrosoftHandleErrorService } from './microsoft-handle-error.service';
|
||||
|
||||
const refreshToken = 'replace-with-your-refresh-token';
|
||||
const syncCursor = 'replace-with-your-sync-cursor';
|
||||
|
||||
xdescribe('Microsoft dev tests : get message list service', () => {
|
||||
let service: MicrosoftGetMessageListService;
|
||||
@ -54,4 +55,27 @@ xdescribe('Microsoft dev tests : get message list service', () => {
|
||||
service.getFullMessageList(mockConnectedAccountUnvalid),
|
||||
).rejects.toThrowError('Access token is undefined or empty');
|
||||
});
|
||||
|
||||
it('Should fetch and return partial message list successfully', async () => {
|
||||
const result = await service.getPartialMessageList(
|
||||
mockConnectedAccount,
|
||||
syncCursor,
|
||||
);
|
||||
|
||||
expect(result.nextSyncCursor).toBeTruthy();
|
||||
});
|
||||
|
||||
it('Should fail partial message if syncCursor is invalid', async () => {
|
||||
await expect(
|
||||
service.getPartialMessageList(mockConnectedAccount, 'invalid-syncCursor'),
|
||||
).rejects.toThrowError(
|
||||
/Resource not found for the segment|Badly formed content/g,
|
||||
);
|
||||
});
|
||||
|
||||
it('Should fail partial message if syncCursor is missing', async () => {
|
||||
await expect(
|
||||
service.getPartialMessageList(mockConnectedAccount, ''),
|
||||
).rejects.toThrowError(/Missing SyncCursor/g);
|
||||
});
|
||||
});
|
||||
|
||||
@ -7,8 +7,15 @@ import {
|
||||
} from '@microsoft/microsoft-graph-client';
|
||||
|
||||
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
|
||||
import {
|
||||
MessageImportDriverException,
|
||||
MessageImportDriverExceptionCode,
|
||||
} from 'src/modules/messaging/message-import-manager/drivers/exceptions/message-import-driver.exception';
|
||||
import { MicrosoftClientProvider } from 'src/modules/messaging/message-import-manager/drivers/microsoft/providers/microsoft-client.provider';
|
||||
import { GetFullMessageListResponse } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service';
|
||||
import {
|
||||
GetFullMessageListResponse,
|
||||
GetPartialMessageListResponse,
|
||||
} from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service';
|
||||
|
||||
// Microsoft API limit is 1000 messages per request on this endpoint
|
||||
const MESSAGING_MICROSOFT_USERS_MESSAGES_LIST_MAX_RESULT = 1000;
|
||||
@ -54,4 +61,54 @@ export class MicrosoftGetMessageListService {
|
||||
nextSyncCursor: pageIterator.getDeltaLink() || '',
|
||||
};
|
||||
}
|
||||
|
||||
public async getPartialMessageList(
|
||||
connectedAccount: Pick<
|
||||
ConnectedAccountWorkspaceEntity,
|
||||
'provider' | 'refreshToken' | 'id'
|
||||
>,
|
||||
syncCursor: string,
|
||||
): Promise<GetPartialMessageListResponse> {
|
||||
// important: otherwise tries to get the full message list
|
||||
if (!syncCursor) {
|
||||
throw new MessageImportDriverException(
|
||||
'Missing SyncCursor',
|
||||
MessageImportDriverExceptionCode.SYNC_CURSOR_ERROR,
|
||||
);
|
||||
}
|
||||
|
||||
const messageExternalIds: string[] = [];
|
||||
const messageExternalIdsToDelete: string[] = [];
|
||||
|
||||
const microsoftClient =
|
||||
await this.microsoftClientProvider.getMicrosoftClient(connectedAccount);
|
||||
|
||||
const response: PageCollection = await microsoftClient
|
||||
.api(syncCursor)
|
||||
.version('beta')
|
||||
.headers({
|
||||
Prefer: `odata.maxpagesize=${MESSAGING_MICROSOFT_USERS_MESSAGES_LIST_MAX_RESULT}, IdType="ImmutableId"`,
|
||||
})
|
||||
.get();
|
||||
|
||||
const callback: PageIteratorCallback = (data) => {
|
||||
if (data['@removed']) {
|
||||
messageExternalIdsToDelete.push(data.id);
|
||||
} else {
|
||||
messageExternalIds.push(data.id);
|
||||
}
|
||||
|
||||
return true;
|
||||
};
|
||||
|
||||
const pageIterator = new PageIterator(microsoftClient, response, callback);
|
||||
|
||||
await pageIterator.iterate();
|
||||
|
||||
return {
|
||||
messageExternalIds,
|
||||
messageExternalIdsToDelete,
|
||||
nextSyncCursor: pageIterator.getDeltaLink() || '',
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@ -64,6 +64,13 @@ export class MessageImportExceptionHandlerService {
|
||||
workspaceId,
|
||||
);
|
||||
break;
|
||||
case MessageImportDriverExceptionCode.SYNC_CURSOR_ERROR:
|
||||
await this.handlePermanentException(
|
||||
exception,
|
||||
messageChannel,
|
||||
workspaceId,
|
||||
);
|
||||
break;
|
||||
default:
|
||||
throw exception;
|
||||
}
|
||||
@ -149,6 +156,22 @@ export class MessageImportExceptionHandlerService {
|
||||
);
|
||||
}
|
||||
|
||||
private async handlePermanentException(
|
||||
exception: MessageImportDriverException,
|
||||
messageChannel: Pick<MessageChannelWorkspaceEntity, 'id'>,
|
||||
workspaceId: string,
|
||||
): Promise<void> {
|
||||
await this.messageChannelSyncStatusService.markAsFailedUnknownAndFlushMessagesToImport(
|
||||
[messageChannel.id],
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
throw new MessageImportException(
|
||||
`Permanent error occurred while importing messages for message channel ${messageChannel.id} in workspace ${workspaceId}: ${exception.message}`,
|
||||
MessageImportExceptionCode.UNKNOWN,
|
||||
);
|
||||
}
|
||||
|
||||
private async handleNotFoundException(
|
||||
syncStep: MessageImportSyncStep,
|
||||
messageChannel: Pick<MessageChannelWorkspaceEntity, 'id'>,
|
||||
|
||||
@ -29,7 +29,7 @@ export class MessagingGetMessageListService {
|
||||
public async getFullMessageList(
|
||||
connectedAccount: Pick<
|
||||
ConnectedAccountWorkspaceEntity,
|
||||
'provider' | 'refreshToken' | 'id'
|
||||
'provider' | 'refreshToken' | 'id' | 'handle'
|
||||
>,
|
||||
): Promise<GetFullMessageListResponse> {
|
||||
switch (connectedAccount.provider) {
|
||||
@ -63,11 +63,10 @@ export class MessagingGetMessageListService {
|
||||
syncCursor,
|
||||
);
|
||||
case 'microsoft':
|
||||
return {
|
||||
messageExternalIds: [],
|
||||
messageExternalIdsToDelete: [],
|
||||
nextSyncCursor: '',
|
||||
};
|
||||
return this.microsoftGetMessageListService.getPartialMessageList(
|
||||
connectedAccount,
|
||||
syncCursor,
|
||||
);
|
||||
default:
|
||||
throw new MessageImportException(
|
||||
`Provider ${connectedAccount.provider} is not supported`,
|
||||
|
||||
@ -58,13 +58,14 @@ export class MessagingMessageService {
|
||||
});
|
||||
|
||||
if (existingMessage) {
|
||||
await messageChannelMessageAssociationRepository.insert(
|
||||
await messageChannelMessageAssociationRepository.upsert(
|
||||
{
|
||||
messageChannelId,
|
||||
messageId: existingMessage.id,
|
||||
messageExternalId: message.externalId,
|
||||
messageThreadExternalId: message.messageThreadExternalId,
|
||||
},
|
||||
['messageChannelId', 'messageExternalId'],
|
||||
transactionManager,
|
||||
);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user