4489 timebox finish google calendar full sync (#4615)
* add lodash differenceWith * add awaits * update sync cursor is working * add logs * use isSyncEnabled information to enqueue jobs * add decorator InjectObjectMetadataRepository * fix gmail-full-sync
This commit is contained in:
@ -11,6 +11,8 @@ import {
|
||||
} from 'src/modules/calendar/jobs/google-calendar-full-sync.job';
|
||||
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
|
||||
import { ConnectedAccountObjectMetadata } from 'src/modules/connected-account/standard-objects/connected-account.object-metadata';
|
||||
import { CalendarChannelRepository } from 'src/modules/calendar/repositories/calendar-channel.repository';
|
||||
import { CalendarChannelObjectMetadata } from 'src/modules/calendar/standard-objects/calendar-channel.object-metadata';
|
||||
|
||||
interface GoogleCalendarFullSyncOptions {
|
||||
workspaceId: string;
|
||||
@ -27,6 +29,8 @@ export class GoogleCalendarFullSyncCommand extends CommandRunner {
|
||||
private readonly messageQueueService: MessageQueueService,
|
||||
@InjectObjectMetadataRepository(ConnectedAccountObjectMetadata)
|
||||
private readonly connectedAccountRepository: ConnectedAccountRepository,
|
||||
@InjectObjectMetadataRepository(CalendarChannelObjectMetadata)
|
||||
private readonly calendarChannelRepository: CalendarChannelRepository,
|
||||
) {
|
||||
super();
|
||||
}
|
||||
@ -54,6 +58,16 @@ export class GoogleCalendarFullSyncCommand extends CommandRunner {
|
||||
await this.connectedAccountRepository.getAll(workspaceId);
|
||||
|
||||
for (const connectedAccount of connectedAccounts) {
|
||||
const calendarChannel =
|
||||
await this.calendarChannelRepository.getFirstByConnectedAccountId(
|
||||
connectedAccount.id,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
if (!calendarChannel?.isSyncEnabled) {
|
||||
continue;
|
||||
}
|
||||
|
||||
await this.messageQueueService.add<GoogleCalendarFullSyncJobData>(
|
||||
GoogleCalendarFullSyncJob.name,
|
||||
{
|
||||
|
||||
@ -28,36 +28,18 @@ export class CalendarChannelRepository {
|
||||
);
|
||||
}
|
||||
|
||||
public async getFirstByConnectedAccountIdOrFail(
|
||||
public async getFirstByConnectedAccountId(
|
||||
connectedAccountId: string,
|
||||
workspaceId: string,
|
||||
): Promise<ObjectRecord<CalendarChannelObjectMetadata>> {
|
||||
): Promise<ObjectRecord<CalendarChannelObjectMetadata> | undefined> {
|
||||
const calendarChannels = await this.getByConnectedAccountId(
|
||||
connectedAccountId,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
if (!calendarChannels || calendarChannels.length === 0) {
|
||||
throw new Error(
|
||||
`No calendar channel found for connected account ${connectedAccountId} in workspace ${workspaceId}`,
|
||||
);
|
||||
}
|
||||
|
||||
return calendarChannels[0];
|
||||
}
|
||||
|
||||
public async getIsContactAutoCreationEnabledByConnectedAccountIdOrFail(
|
||||
connectedAccountId: string,
|
||||
workspaceId: string,
|
||||
): Promise<boolean> {
|
||||
const calendarChannel = await this.getFirstByConnectedAccountIdOrFail(
|
||||
connectedAccountId,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
return calendarChannel.isContactAutoCreationEnabled;
|
||||
}
|
||||
|
||||
public async getByIds(
|
||||
ids: string[],
|
||||
workspaceId: string,
|
||||
@ -73,4 +55,21 @@ export class CalendarChannelRepository {
|
||||
transactionManager,
|
||||
);
|
||||
}
|
||||
|
||||
public async updateSyncCursor(
|
||||
syncCursor: string,
|
||||
calendarChannelId: string,
|
||||
workspaceId: string,
|
||||
transactionManager?: EntityManager,
|
||||
): Promise<void> {
|
||||
const dataSourceSchema =
|
||||
this.workspaceDataSourceService.getSchemaName(workspaceId);
|
||||
|
||||
await this.workspaceDataSourceService.executeRawQuery(
|
||||
`UPDATE ${dataSourceSchema}."calendarChannel" SET "syncCursor" = $1 WHERE "id" = $2`,
|
||||
[syncCursor, calendarChannelId],
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
|
||||
import { EntityManager } from 'typeorm';
|
||||
import differenceWith from 'lodash.differencewith';
|
||||
|
||||
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
|
||||
import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metadata/types/object-record';
|
||||
@ -27,7 +28,7 @@ export class CalendarEventAttendeeRepository {
|
||||
this.workspaceDataSourceService.getSchemaName(workspaceId);
|
||||
|
||||
return await this.workspaceDataSourceService.executeRawQuery(
|
||||
`SELECT * FROM ${dataSourceSchema}."calendarEventAttendees" WHERE "id" = ANY($1)`,
|
||||
`SELECT * FROM ${dataSourceSchema}."calendarEventAttendee" WHERE "id" = ANY($1)`,
|
||||
[calendarEventAttendeeIds],
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
@ -47,7 +48,7 @@ export class CalendarEventAttendeeRepository {
|
||||
this.workspaceDataSourceService.getSchemaName(workspaceId);
|
||||
|
||||
return await this.workspaceDataSourceService.executeRawQuery(
|
||||
`SELECT * FROM ${dataSourceSchema}."calendarEventAttendees" WHERE "calendarEventId" = ANY($1)`,
|
||||
`SELECT * FROM ${dataSourceSchema}."calendarEventAttendee" WHERE "calendarEventId" = ANY($1)`,
|
||||
[calendarEventIds],
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
@ -67,7 +68,7 @@ export class CalendarEventAttendeeRepository {
|
||||
this.workspaceDataSourceService.getSchemaName(workspaceId);
|
||||
|
||||
await this.workspaceDataSourceService.executeRawQuery(
|
||||
`DELETE FROM ${dataSourceSchema}."calendarEventAttendees" WHERE "id" = ANY($1)`,
|
||||
`DELETE FROM ${dataSourceSchema}."calendarEventAttendee" WHERE "id" = ANY($1)`,
|
||||
[calendarEventAttendeeIds],
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
@ -119,6 +120,29 @@ export class CalendarEventAttendeeRepository {
|
||||
const dataSourceSchema =
|
||||
this.workspaceDataSourceService.getSchemaName(workspaceId);
|
||||
|
||||
const calendarEventIds = Array.from(iCalUIDCalendarEventIdMap.values());
|
||||
|
||||
const existingCalendarEventAttendees = await this.getByCalendarEventIds(
|
||||
calendarEventIds,
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
);
|
||||
|
||||
const calendarEventAttendeesToDelete = differenceWith(
|
||||
existingCalendarEventAttendees,
|
||||
calendarEventAttendees,
|
||||
(existingCalendarEventAttendee, calendarEventAttendee) =>
|
||||
existingCalendarEventAttendee.handle === calendarEventAttendee.handle,
|
||||
);
|
||||
|
||||
await this.deleteByIds(
|
||||
calendarEventAttendeesToDelete.map(
|
||||
(calendarEventAttendee) => calendarEventAttendee.id,
|
||||
),
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
);
|
||||
|
||||
const values = calendarEventAttendees.map((calendarEventAttendee) => ({
|
||||
...calendarEventAttendee,
|
||||
calendarEventId: iCalUIDCalendarEventIdMap.get(
|
||||
|
||||
@ -79,11 +79,15 @@ export class GoogleCalendarFullSyncService {
|
||||
}
|
||||
|
||||
const calendarChannel =
|
||||
await this.calendarChannelRepository.getFirstByConnectedAccountIdOrFail(
|
||||
await this.calendarChannelRepository.getFirstByConnectedAccountId(
|
||||
connectedAccountId,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
if (!calendarChannel) {
|
||||
return;
|
||||
}
|
||||
|
||||
const calendarChannelId = calendarChannel.id;
|
||||
|
||||
const googleCalendarClient =
|
||||
@ -109,6 +113,7 @@ export class GoogleCalendarFullSyncService {
|
||||
: [];
|
||||
|
||||
const blocklistedEmails = blocklist.map((blocklist) => blocklist.handle);
|
||||
|
||||
let startTime = Date.now();
|
||||
|
||||
const googleCalendarEvents = await googleCalendarClient.events.list({
|
||||
@ -206,38 +211,94 @@ export class GoogleCalendarFullSyncService {
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
dataSourceMetadata?.transaction(async (transactionManager) => {
|
||||
this.calendarEventRepository.saveCalendarEvents(
|
||||
eventsToSave,
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
);
|
||||
try {
|
||||
dataSourceMetadata?.transaction(async (transactionManager) => {
|
||||
startTime = Date.now();
|
||||
|
||||
this.calendarEventRepository.updateCalendarEvents(
|
||||
eventsToUpdate,
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
);
|
||||
await this.calendarEventRepository.saveCalendarEvents(
|
||||
eventsToSave,
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
);
|
||||
|
||||
this.calendarChannelEventAssociationRepository.saveCalendarChannelEventAssociations(
|
||||
calendarChannelEventAssociationsToSave,
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
);
|
||||
endTime = Date.now();
|
||||
|
||||
this.calendarEventAttendeesRepository.saveCalendarEventAttendees(
|
||||
attendeesToSave,
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
);
|
||||
this.logger.log(
|
||||
`google calendar full-sync for workspace ${workspaceId} and account ${connectedAccountId}: saving events in ${
|
||||
endTime - startTime
|
||||
}ms.`,
|
||||
);
|
||||
|
||||
this.calendarEventAttendeesRepository.updateCalendarEventAttendees(
|
||||
attendeesToUpdate,
|
||||
iCalUIDCalendarEventIdMap,
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
startTime = Date.now();
|
||||
|
||||
await this.calendarEventRepository.updateCalendarEvents(
|
||||
eventsToUpdate,
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
);
|
||||
|
||||
endTime = Date.now();
|
||||
|
||||
this.logger.log(
|
||||
`google calendar full-sync for workspace ${workspaceId} and account ${connectedAccountId}: updating events in ${
|
||||
endTime - startTime
|
||||
}ms.`,
|
||||
);
|
||||
|
||||
startTime = Date.now();
|
||||
|
||||
await this.calendarChannelEventAssociationRepository.saveCalendarChannelEventAssociations(
|
||||
calendarChannelEventAssociationsToSave,
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
);
|
||||
|
||||
endTime = Date.now();
|
||||
|
||||
this.logger.log(
|
||||
`google calendar full-sync for workspace ${workspaceId} and account ${connectedAccountId}: saving calendar channel event associations in ${
|
||||
endTime - startTime
|
||||
}ms.`,
|
||||
);
|
||||
|
||||
startTime = Date.now();
|
||||
|
||||
await this.calendarEventAttendeesRepository.saveCalendarEventAttendees(
|
||||
attendeesToSave,
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
);
|
||||
|
||||
endTime = Date.now();
|
||||
|
||||
this.logger.log(
|
||||
`google calendar full-sync for workspace ${workspaceId} and account ${connectedAccountId}: saving attendees in ${
|
||||
endTime - startTime
|
||||
}ms.`,
|
||||
);
|
||||
|
||||
startTime = Date.now();
|
||||
|
||||
await this.calendarEventAttendeesRepository.updateCalendarEventAttendees(
|
||||
attendeesToUpdate,
|
||||
iCalUIDCalendarEventIdMap,
|
||||
workspaceId,
|
||||
transactionManager,
|
||||
);
|
||||
|
||||
endTime = Date.now();
|
||||
|
||||
this.logger.log(
|
||||
`google calendar full-sync for workspace ${workspaceId} and account ${connectedAccountId}: updating attendees in ${
|
||||
endTime - startTime
|
||||
}ms.`,
|
||||
);
|
||||
});
|
||||
} catch (error) {
|
||||
this.logger.error(
|
||||
`Error during google calendar full-sync for workspace ${workspaceId} and account ${connectedAccountId}: ${error.message}`,
|
||||
);
|
||||
});
|
||||
}
|
||||
} else {
|
||||
this.logger.log(
|
||||
`google calendar full-sync for workspace ${workspaceId} and account ${connectedAccountId} done with nothing to import.`,
|
||||
@ -252,11 +313,11 @@ export class GoogleCalendarFullSyncService {
|
||||
|
||||
startTime = Date.now();
|
||||
|
||||
// await this.calendarChannelService.updateSyncCursor(
|
||||
// nextSyncToken,
|
||||
// connectedAccount.id,
|
||||
// workspaceId,
|
||||
// );
|
||||
await this.calendarChannelRepository.updateSyncCursor(
|
||||
nextSyncToken,
|
||||
calendarChannel.id,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
endTime = Date.now();
|
||||
|
||||
|
||||
@ -211,6 +211,8 @@ export class GmailFullSyncService {
|
||||
this.logger.log(
|
||||
`gmail full-sync for workspace ${workspaceId} and account ${connectedAccountId} done with nothing to import.`,
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (errors.length) {
|
||||
|
||||
Reference in New Issue
Block a user