3218 make the function fetchworkspacememberthreads idempotent (#3230)

* wip

* fetch only the messages which are not in the db

* fetch only the messages and threads which are not in the db

* fix bugs

* merge

* remove eslint-plugins-twenty

* get saved message thread ids and message ids at the same time
This commit is contained in:
bosiraphael
2024-01-04 13:36:37 +01:00
committed by GitHub
parent f36fa9aa14
commit 54c1d245ab

View File

@ -39,29 +39,14 @@ export class FetchWorkspaceMessagesService {
workspaceMemberId: string,
maxResults = 500,
): Promise<void> {
const dataSourceMetadata =
await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail(
const { workspaceDataSource, dataSourceMetadata, connectedAccount } =
await this.getDataSourceMetadataWorkspaceMetadataAndConnectedAccount(
workspaceId,
workspaceMemberId,
);
const workspaceDataSource =
await this.typeORMService.connectToDataSource(dataSourceMetadata);
if (!workspaceDataSource) {
throw new Error('No workspace data source found');
}
const connectedAccounts = await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'gmail' AND "accountOwnerId" = $1`,
[workspaceMemberId],
);
if (!connectedAccounts || connectedAccounts.length === 0) {
throw new Error('No connected account found');
}
const accessToken = connectedAccounts[0]?.accessToken;
const refreshToken = connectedAccounts[0]?.refreshToken;
const accessToken = connectedAccount.accessToken;
const refreshToken = connectedAccount.refreshToken;
if (!refreshToken) {
throw new Error('No refresh token found');
@ -80,11 +65,22 @@ export class FetchWorkspaceMessagesService {
return;
}
const { savedMessageIds, savedThreadIds } =
await this.getAllSavedMessagesIdsAndMessageThreadsIdsForConnectedAccount(
dataSourceMetadata,
workspaceDataSource,
connectedAccount.id,
);
const threadsToSave = threadsData.filter(
(thread) => thread.id && !savedThreadIds.includes(thread.id),
);
await this.saveMessageThreads(
threadsData,
threadsToSave,
dataSourceMetadata,
workspaceDataSource,
connectedAccounts[0].id,
connectedAccount.id,
);
const threadQueries: MessageOrThreadQuery[] = threadsData.map((thread) => ({
@ -101,7 +97,11 @@ export class FetchWorkspaceMessagesService {
.map((thread) => thread.messageIds)
.flat();
const messageQueries: MessageOrThreadQuery[] = messageIds.map(
const messageIdsToSave = messageIds.filter(
(messageId) => !savedMessageIds.includes(messageId),
);
const messageQueries: MessageOrThreadQuery[] = messageIdsToSave.map(
(messageId) => ({
uri: '/gmail/v1/users/me/messages/' + messageId + '?format=RAW',
}),
@ -224,4 +224,68 @@ export class FetchWorkspaceMessagesService {
});
}
}
async getAllSavedMessagesIdsAndMessageThreadsIdsForConnectedAccount(
dataSourceMetadata: DataSourceEntity,
workspaceDataSource: DataSource,
connectedAccountId: string,
): Promise<{
savedMessageIds: string[];
savedThreadIds: string[];
}> {
const messageIds: { messageId: string; messageThreadId: string }[] =
await workspaceDataSource?.query(
`SELECT message."externalId" AS "messageId",
"messageThread"."externalId" AS "messageThreadId"
FROM ${dataSourceMetadata.schema}."message" message
LEFT JOIN ${dataSourceMetadata.schema}."messageThread" "messageThread" ON message."messageThreadId" = "messageThread"."id"
LEFT JOIN ${dataSourceMetadata.schema}."messageChannel" ON "messageThread"."messageChannelId" = ${dataSourceMetadata.schema}."messageChannel"."id"
WHERE ${dataSourceMetadata.schema}."messageChannel"."connectedAccountId" = $1`,
[connectedAccountId],
);
return {
savedMessageIds: messageIds.map((message) => message.messageId),
savedThreadIds: [
...new Set(messageIds.map((message) => message.messageThreadId)),
],
};
}
async getDataSourceMetadataWorkspaceMetadataAndConnectedAccount(
workspaceId: string,
workspaceMemberId: string,
): Promise<{
dataSourceMetadata: DataSourceEntity;
workspaceDataSource: DataSource;
connectedAccount: any;
}> {
const dataSourceMetadata =
await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail(
workspaceId,
);
const workspaceDataSource = await this.typeORMService.connectToDataSource(
dataSourceMetadata,
);
if (!workspaceDataSource) {
throw new Error('No workspace data source found');
}
const connectedAccounts = await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."connectedAccount" WHERE "provider" = 'gmail' AND "accountOwnerId" = $1`,
[workspaceMemberId],
);
if (!connectedAccounts || connectedAccounts.length === 0) {
throw new Error('No connected account found');
}
return {
dataSourceMetadata,
workspaceDataSource,
connectedAccount: connectedAccounts[0],
};
}
}