diff --git a/packages/twenty-server/src/modules/calendar/repositories/calendar-event-participant.repository.ts b/packages/twenty-server/src/modules/calendar/repositories/calendar-event-participant.repository.ts index a5c0d332c..bbcb96759 100644 --- a/packages/twenty-server/src/modules/calendar/repositories/calendar-event-participant.repository.ts +++ b/packages/twenty-server/src/modules/calendar/repositories/calendar-event-participant.repository.ts @@ -162,7 +162,6 @@ export class CalendarEventParticipantRepository { public async updateCalendarEventParticipantsAndReturnNewOnes( calendarEventParticipants: CalendarEventParticipant[], - iCalUIDCalendarEventIdMap: Map, workspaceId: string, transactionManager?: EntityManager, ): Promise { @@ -173,10 +172,10 @@ export class CalendarEventParticipantRepository { const dataSourceSchema = this.workspaceDataSourceService.getSchemaName(workspaceId); - const calendarEventIds = Array.from(iCalUIDCalendarEventIdMap.values()); - const existingCalendarEventParticipants = await this.getByCalendarEventIds( - calendarEventIds, + calendarEventParticipants.map( + (calendarEventParticipant) => calendarEventParticipant.calendarEventId, + ), workspaceId, transactionManager, ); @@ -205,23 +204,17 @@ export class CalendarEventParticipantRepository { transactionManager, ); - const values = calendarEventParticipants.map( - (calendarEventParticipant) => ({ - ...calendarEventParticipant, - calendarEventId: iCalUIDCalendarEventIdMap.get( - calendarEventParticipant.iCalUID, - ), - }), - ); - const { flattenedValues, valuesString } = - getFlattenedValuesAndValuesStringForBatchRawQuery(values, { - calendarEventId: 'uuid', - handle: 'text', - displayName: 'text', - isOrganizer: 'boolean', - responseStatus: `${dataSourceSchema}."calendarEventParticipant_responsestatus_enum"`, - }); + getFlattenedValuesAndValuesStringForBatchRawQuery( + calendarEventParticipants, + { + calendarEventId: 'uuid', + handle: 'text', + displayName: 'text', + isOrganizer: 'boolean', + responseStatus: `${dataSourceSchema}."calendarEventParticipant_responsestatus_enum"`, + }, + ); await this.workspaceDataSourceService.executeRawQuery( `UPDATE ${dataSourceSchema}."calendarEventParticipant" AS "calendarEventParticipant" diff --git a/packages/twenty-server/src/modules/calendar/repositories/calendar-event.repository.ts b/packages/twenty-server/src/modules/calendar/repositories/calendar-event.repository.ts index cefee02fc..4ac314e84 100644 --- a/packages/twenty-server/src/modules/calendar/repositories/calendar-event.repository.ts +++ b/packages/twenty-server/src/modules/calendar/repositories/calendar-event.repository.ts @@ -35,6 +35,26 @@ export class CalendarEventRepository { ); } + public async getByICalUIDs( + iCalUIDs: string[], + workspaceId: string, + transactionManager?: EntityManager, + ): Promise[]> { + if (iCalUIDs.length === 0) { + return []; + } + + const dataSourceSchema = + this.workspaceDataSourceService.getSchemaName(workspaceId); + + return await this.workspaceDataSourceService.executeRawQuery( + `SELECT * FROM ${dataSourceSchema}."calendarEvent" WHERE "iCalUID" = ANY($1)`, + [iCalUIDs], + workspaceId, + transactionManager, + ); + } + public async deleteByIds( calendarEventIds: string[], workspaceId: string, @@ -80,11 +100,11 @@ export class CalendarEventRepository { } public async getICalUIDCalendarEventIdMap( - calendarEventIds: string[], + iCalUIDs: string[], workspaceId: string, transactionManager?: EntityManager, ): Promise> { - if (calendarEventIds.length === 0) { + if (iCalUIDs.length === 0) { return new Map(); } @@ -97,8 +117,8 @@ export class CalendarEventRepository { iCalUID: string; }[] | undefined = await this.workspaceDataSourceService.executeRawQuery( - `SELECT id, "iCalUID" FROM ${dataSourceSchema}."calendarEvent" WHERE "id" = ANY($1)`, - [calendarEventIds], + `SELECT id, "iCalUID" FROM ${dataSourceSchema}."calendarEvent" WHERE "iCalUID" = ANY($1)`, + [iCalUIDs], workspaceId, transactionManager, ); diff --git a/packages/twenty-server/src/modules/calendar/services/google-calendar-sync/google-calendar-sync.service.ts b/packages/twenty-server/src/modules/calendar/services/google-calendar-sync/google-calendar-sync.service.ts index c1d62f4a0..1ef42e210 100644 --- a/packages/twenty-server/src/modules/calendar/services/google-calendar-sync/google-calendar-sync.service.ts +++ b/packages/twenty-server/src/modules/calendar/services/google-calendar-sync/google-calendar-sync.service.ts @@ -26,15 +26,15 @@ import { CalendarEventParticipantObjectMetadata } from 'src/modules/calendar/sta import { BlocklistObjectMetadata } from 'src/modules/connected-account/standard-objects/blocklist.object-metadata'; import { CalendarEventCleanerService } from 'src/modules/calendar/services/calendar-event-cleaner/calendar-event-cleaner.service'; import { CalendarEventParticipantService } from 'src/modules/calendar/services/calendar-event-participant/calendar-event-participant.service'; -import { CalendarEventParticipant } from 'src/modules/calendar/types/calendar-event'; +import { CalendarEventWithParticipants } from 'src/modules/calendar/types/calendar-event'; import { filterOutBlocklistedEvents } from 'src/modules/calendar/utils/filter-out-blocklisted-events.util'; +import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; import { CreateCompanyAndContactJobData, CreateCompanyAndContactJob, } from 'src/modules/connected-account/auto-companies-and-contacts-creation/jobs/create-company-and-contact.job'; -import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; -import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; -import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; @Injectable() export class GoogleCalendarSyncService { @@ -102,70 +102,12 @@ export class GoogleCalendarSyncService { const calendarChannelId = calendarChannel.id; - const googleCalendarClient = - await this.googleCalendarClientProvider.getGoogleCalendarClient( - refreshToken, - ); - - const isBlocklistEnabledFeatureFlag = - await this.featureFlagRepository.findOneBy({ - workspaceId, - key: FeatureFlagKeys.IsBlocklistEnabled, - value: true, - }); - - const isBlocklistEnabled = - isBlocklistEnabledFeatureFlag && isBlocklistEnabledFeatureFlag.value; - - const blocklist = isBlocklistEnabled - ? await this.blocklistRepository.getByWorkspaceMemberId( - workspaceMemberId, - workspaceId, - ) - : []; - - const blocklistedEmails = blocklist.map((blocklist) => blocklist.handle); - - let startTime = Date.now(); - - let nextSyncToken: string | null | undefined; - let nextPageToken: string | undefined; - const events: calendarV3.Schema$Event[] = []; - - let hasMoreEvents = true; - - while (hasMoreEvents) { - const googleCalendarEvents = await googleCalendarClient.events.list({ - calendarId: 'primary', - maxResults: 500, - syncToken: emailOrDomainToReimport ? undefined : syncToken, - pageToken: nextPageToken, - q: emailOrDomainToReimport, - showDeleted: true, - }); - - nextSyncToken = googleCalendarEvents.data.nextSyncToken; - nextPageToken = googleCalendarEvents.data.nextPageToken || undefined; - - const { items } = googleCalendarEvents.data; - - if (!items || items.length === 0) { - break; - } - - events.push(...items); - - if (!nextPageToken) { - hasMoreEvents = false; - } - } - - let endTime = Date.now(); - - this.logger.log( - `google calendar sync for workspace ${workspaceId} and account ${connectedAccountId} getting events list in ${ - endTime - startTime - }ms.`, + const { events, nextSyncToken } = await this.getEventsFromGoogleCalendar( + refreshToken, + workspaceId, + connectedAccountId, + emailOrDomainToReimport, + syncToken, ); if (!events || events?.length === 0) { @@ -176,7 +118,11 @@ export class GoogleCalendarSyncService { return; } - let filteredEvents = filterOutBlocklistedEvents(events, blocklistedEmails); + const blocklist = await this.getBlocklist(workspaceMemberId, workspaceId); + + let filteredEvents = filterOutBlocklistedEvents(events, blocklist).filter( + (event) => event.status !== 'cancelled', + ); if (emailOrDomainToReimport) { // We still need to filter the events to only keep the ones that have the email or domain we want to reimport @@ -190,13 +136,46 @@ export class GoogleCalendarSyncService { ); } - const eventExternalIds = filteredEvents.map((event) => event.id as string); + const cancelledEventExternalIds = filteredEvents + .filter((event) => event.status === 'cancelled') + .map((event) => event.id as string); + + const iCalUIDCalendarEventIdMap = + await this.calendarEventRepository.getICalUIDCalendarEventIdMap( + filteredEvents.map((calendarEvent) => calendarEvent.iCalUID as string), + workspaceId, + ); + + const formattedEvents = filteredEvents.map((event) => + formatGoogleCalendarEvent(event, iCalUIDCalendarEventIdMap), + ); + + // TODO: When we will be able to add unicity contraint on iCalUID, we will do a INSERT ON CONFLICT DO UPDATE + + let startTime = Date.now(); + + const existingEvents = await this.calendarEventRepository.getByICalUIDs( + formattedEvents.map((event) => event.iCalUID), + workspaceId, + ); + + const existingEventsICalUIDs = existingEvents.map((event) => event.iCalUID); + + let endTime = Date.now(); + + const eventsToSave = formattedEvents.filter( + (event) => !existingEventsICalUIDs.includes(event.iCalUID), + ); + + const eventsToUpdate = formattedEvents.filter((event) => + existingEventsICalUIDs.includes(event.iCalUID), + ); startTime = Date.now(); const existingCalendarChannelEventAssociations = await this.calendarChannelEventAssociationRepository.getByEventExternalIdsAndCalendarChannelId( - eventExternalIds, + formattedEvents.map((event) => event.externalId), calendarChannelId, workspaceId, ); @@ -209,198 +188,58 @@ export class GoogleCalendarSyncService { }ms.`, ); - // TODO: When we will be able to add unicity contraint on iCalUID, we will do a INSERT ON CONFLICT DO UPDATE - - const existingEventExternalIds = - existingCalendarChannelEventAssociations.map( - (association) => association.eventExternalId, - ); - - const existingEventsIds = existingCalendarChannelEventAssociations.map( - (association) => association.calendarEventId, - ); - - const iCalUIDCalendarEventIdMap = - await this.calendarEventRepository.getICalUIDCalendarEventIdMap( - existingEventsIds, - workspaceId, - ); - - const formattedEvents = filteredEvents - .filter((event) => event.status !== 'cancelled') - .map((event) => - formatGoogleCalendarEvent(event, iCalUIDCalendarEventIdMap), - ); - - const eventsToSave = formattedEvents.filter( - (event) => !existingEventExternalIds.includes(event.externalId), - ); - - const eventsToUpdate = formattedEvents.filter((event) => - existingEventExternalIds.includes(event.externalId), - ); - - const cancelledEventExternalIds = filteredEvents - .filter((event) => event.status === 'cancelled') - .map((event) => event.id as string); - - const calendarChannelEventAssociationsToSave = eventsToSave.map( - (event) => ({ + const calendarChannelEventAssociationsToSave = formattedEvents + .filter( + (event) => + !existingCalendarChannelEventAssociations.some( + (association) => association.eventExternalId === event.id, + ), + ) + .map((event) => ({ calendarEventId: event.id, eventExternalId: event.externalId, calendarChannelId, - }), - ); + })); - const participantsToSave = eventsToSave.flatMap( - (event) => event.participants, - ); + if (events.length > 0) { + await this.saveGoogleCalendarEvents( + eventsToSave, + eventsToUpdate, + calendarChannelEventAssociationsToSave, + connectedAccount, + calendarChannel, + workspaceId, + ); - const participantsToUpdate = eventsToUpdate.flatMap( - (event) => event.participants, - ); + startTime = Date.now(); - let newCalendarEventParticipants: CalendarEventParticipant[] = []; + await this.calendarChannelEventAssociationRepository.deleteByEventExternalIdsAndCalendarChannelId( + cancelledEventExternalIds, + calendarChannelId, + workspaceId, + ); - if (filteredEvents.length > 0) { - const dataSourceMetadata = - await this.workspaceDataSourceService.connectToWorkspaceDataSource( - workspaceId, - ); + endTime = Date.now(); - try { - dataSourceMetadata?.transaction(async (transactionManager) => { - startTime = Date.now(); + this.logger.log( + `google calendar sync for workspace ${workspaceId} and account ${connectedAccountId}: deleting calendar channel event associations in ${ + endTime - startTime + }ms.`, + ); - await this.calendarEventRepository.saveCalendarEvents( - eventsToSave, - workspaceId, - transactionManager, - ); + startTime = Date.now(); - endTime = Date.now(); + await this.calendarEventCleanerService.cleanWorkspaceCalendarEvents( + workspaceId, + ); - this.logger.log( - `google calendar sync for workspace ${workspaceId} and account ${connectedAccountId}: saving ${ - eventsToSave.length - } events in ${endTime - startTime}ms.`, - ); + endTime = Date.now(); - startTime = Date.now(); - - await this.calendarEventRepository.updateCalendarEvents( - eventsToUpdate, - workspaceId, - transactionManager, - ); - - endTime = Date.now(); - - this.logger.log( - `google calendar sync for workspace ${workspaceId} and account ${connectedAccountId}: updating ${ - eventsToUpdate.length - } events in ${endTime - startTime}ms.`, - ); - - startTime = Date.now(); - - await this.calendarChannelEventAssociationRepository.saveCalendarChannelEventAssociations( - calendarChannelEventAssociationsToSave, - workspaceId, - transactionManager, - ); - - endTime = Date.now(); - - this.logger.log( - `google calendar sync for workspace ${workspaceId} and account ${connectedAccountId}: saving calendar channel event associations in ${ - endTime - startTime - }ms.`, - ); - - startTime = Date.now(); - - newCalendarEventParticipants = - await this.calendarEventParticipantsRepository.updateCalendarEventParticipantsAndReturnNewOnes( - participantsToUpdate, - iCalUIDCalendarEventIdMap, - workspaceId, - transactionManager, - ); - - endTime = Date.now(); - - participantsToSave.push(...newCalendarEventParticipants); - - this.logger.log( - `google calendar sync for workspace ${workspaceId} and account ${connectedAccountId}: updating participants in ${ - endTime - startTime - }ms.`, - ); - - startTime = Date.now(); - - await this.calendarEventParticipantsService.saveCalendarEventParticipants( - participantsToSave, - workspaceId, - transactionManager, - ); - - endTime = Date.now(); - - this.logger.log( - `google calendar sync for workspace ${workspaceId} and account ${connectedAccountId}: saving participants in ${ - endTime - startTime - }ms.`, - ); - - startTime = Date.now(); - - await this.calendarChannelEventAssociationRepository.deleteByEventExternalIdsAndCalendarChannelId( - cancelledEventExternalIds, - calendarChannelId, - workspaceId, - transactionManager, - ); - - endTime = Date.now(); - - this.logger.log( - `google calendar sync for workspace ${workspaceId} and account ${connectedAccountId}: deleting calendar channel event associations in ${ - endTime - startTime - }ms.`, - ); - }); - - startTime = Date.now(); - - await this.calendarEventCleanerService.cleanWorkspaceCalendarEvents( - workspaceId, - ); - - endTime = Date.now(); - - this.logger.log( - `google calendar sync for workspace ${workspaceId} and account ${connectedAccountId}: cleaning calendar events in ${ - endTime - startTime - }ms.`, - ); - - if (calendarChannel.isContactAutoCreationEnabled) { - await this.messageQueueService.add( - CreateCompanyAndContactJob.name, - { - workspaceId, - connectedAccountHandle: connectedAccount.handle, - contactsToCreate: participantsToSave, - }, - ); - } - } catch (error) { - this.logger.error( - `Error during google calendar sync for workspace ${workspaceId} and account ${connectedAccountId}: ${error.message}`, - ); - } + this.logger.log( + `google calendar sync for workspace ${workspaceId} and account ${connectedAccountId}: cleaning calendar events in ${ + endTime - startTime + }ms.`, + ); } else { this.logger.log( `google calendar sync for workspace ${workspaceId} and account ${connectedAccountId} done with nothing to import.`, @@ -435,4 +274,220 @@ export class GoogleCalendarSyncService { }done.`, ); } + + public async getBlocklist(workspaceMemberId: string, workspaceId: string) { + const isBlocklistEnabledFeatureFlag = + await this.featureFlagRepository.findOneBy({ + workspaceId, + key: FeatureFlagKeys.IsBlocklistEnabled, + value: true, + }); + + const isBlocklistEnabled = + isBlocklistEnabledFeatureFlag && isBlocklistEnabledFeatureFlag.value; + + const blocklist = isBlocklistEnabled + ? await this.blocklistRepository.getByWorkspaceMemberId( + workspaceMemberId, + workspaceId, + ) + : []; + + return blocklist.map((blocklist) => blocklist.handle); + } + + public async getEventsFromGoogleCalendar( + refreshToken: string, + workspaceId: string, + connectedAccountId: string, + emailOrDomainToReimport?: string, + syncToken?: string, + ): Promise<{ + events: calendarV3.Schema$Event[]; + nextSyncToken: string | null | undefined; + }> { + const googleCalendarClient = + await this.googleCalendarClientProvider.getGoogleCalendarClient( + refreshToken, + ); + + const startTime = Date.now(); + + let nextSyncToken: string | null | undefined; + let nextPageToken: string | undefined; + const events: calendarV3.Schema$Event[] = []; + + let hasMoreEvents = true; + + while (hasMoreEvents) { + const googleCalendarEvents = await googleCalendarClient.events.list({ + calendarId: 'primary', + maxResults: 500, + syncToken: emailOrDomainToReimport ? undefined : syncToken, + pageToken: nextPageToken, + q: emailOrDomainToReimport, + showDeleted: true, + }); + + nextSyncToken = googleCalendarEvents.data.nextSyncToken; + nextPageToken = googleCalendarEvents.data.nextPageToken || undefined; + + const { items } = googleCalendarEvents.data; + + if (!items || items.length === 0) { + break; + } + + events.push(...items); + + if (!nextPageToken) { + hasMoreEvents = false; + } + } + + const endTime = Date.now(); + + this.logger.log( + `google calendar sync for workspace ${workspaceId} and account ${connectedAccountId} getting events list in ${ + endTime - startTime + }ms.`, + ); + + return { events, nextSyncToken }; + } + + public async saveGoogleCalendarEvents( + eventsToSave: CalendarEventWithParticipants[], + eventsToUpdate: CalendarEventWithParticipants[], + calendarChannelEventAssociationsToSave: { + calendarEventId: string; + eventExternalId: string; + calendarChannelId: string; + }[], + connectedAccount: ConnectedAccountObjectMetadata, + calendarChannel: CalendarChannelObjectMetadata, + workspaceId: string, + ): Promise { + const dataSourceMetadata = + await this.workspaceDataSourceService.connectToWorkspaceDataSource( + workspaceId, + ); + + const participantsToSave = eventsToSave.flatMap( + (event) => event.participants, + ); + + const participantsToUpdate = eventsToUpdate.flatMap( + (event) => event.participants, + ); + + let startTime: number; + let endTime: number; + + try { + dataSourceMetadata?.transaction(async (transactionManager) => { + startTime = Date.now(); + + await this.calendarEventRepository.saveCalendarEvents( + eventsToSave, + workspaceId, + transactionManager, + ); + + endTime = Date.now(); + + this.logger.log( + `google calendar sync for workspace ${workspaceId} and account ${ + connectedAccount.id + }: saving ${eventsToSave.length} events in ${endTime - startTime}ms.`, + ); + + startTime = Date.now(); + + await this.calendarEventRepository.updateCalendarEvents( + eventsToUpdate, + workspaceId, + transactionManager, + ); + + endTime = Date.now(); + + this.logger.log( + `google calendar sync for workspace ${workspaceId} and account ${ + connectedAccount.id + }: updating ${eventsToUpdate.length} events in ${ + endTime - startTime + }ms.`, + ); + + startTime = Date.now(); + + await this.calendarChannelEventAssociationRepository.saveCalendarChannelEventAssociations( + calendarChannelEventAssociationsToSave, + workspaceId, + transactionManager, + ); + + endTime = Date.now(); + + this.logger.log( + `google calendar sync for workspace ${workspaceId} and account ${ + connectedAccount.id + }: saving calendar channel event associations in ${ + endTime - startTime + }ms.`, + ); + + startTime = Date.now(); + + const newCalendarEventParticipants = + await this.calendarEventParticipantsRepository.updateCalendarEventParticipantsAndReturnNewOnes( + participantsToUpdate, + workspaceId, + transactionManager, + ); + + endTime = Date.now(); + + participantsToSave.push(...newCalendarEventParticipants); + + this.logger.log( + `google calendar sync for workspace ${workspaceId} and account ${ + connectedAccount.id + }: updating participants in ${endTime - startTime}ms.`, + ); + + startTime = Date.now(); + + await this.calendarEventParticipantsService.saveCalendarEventParticipants( + participantsToSave, + workspaceId, + transactionManager, + ); + + endTime = Date.now(); + + this.logger.log( + `google calendar sync for workspace ${workspaceId} and account ${ + connectedAccount.id + }: saving participants in ${endTime - startTime}ms.`, + ); + }); + + if (calendarChannel.isContactAutoCreationEnabled) { + await this.messageQueueService.add( + CreateCompanyAndContactJob.name, + { + workspaceId, + connectedAccountHandle: connectedAccount.handle, + contactsToCreate: participantsToSave, + }, + ); + } + } catch (error) { + this.logger.error( + `Error during google calendar sync for workspace ${workspaceId} and account ${connectedAccount.id}: ${error.message}`, + ); + } + } }