Fix duplicated calendar events (#5209)

Fix duplicated calendar events when two workspace members participate to
the same event.
This commit is contained in:
bosiraphael
2024-04-29 15:23:40 +02:00
committed by GitHub
parent 9809298753
commit 6cafd25c97
3 changed files with 343 additions and 275 deletions

View File

@ -162,7 +162,6 @@ export class CalendarEventParticipantRepository {
public async updateCalendarEventParticipantsAndReturnNewOnes( public async updateCalendarEventParticipantsAndReturnNewOnes(
calendarEventParticipants: CalendarEventParticipant[], calendarEventParticipants: CalendarEventParticipant[],
iCalUIDCalendarEventIdMap: Map<string, string>,
workspaceId: string, workspaceId: string,
transactionManager?: EntityManager, transactionManager?: EntityManager,
): Promise<CalendarEventParticipant[]> { ): Promise<CalendarEventParticipant[]> {
@ -173,10 +172,10 @@ export class CalendarEventParticipantRepository {
const dataSourceSchema = const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId); this.workspaceDataSourceService.getSchemaName(workspaceId);
const calendarEventIds = Array.from(iCalUIDCalendarEventIdMap.values());
const existingCalendarEventParticipants = await this.getByCalendarEventIds( const existingCalendarEventParticipants = await this.getByCalendarEventIds(
calendarEventIds, calendarEventParticipants.map(
(calendarEventParticipant) => calendarEventParticipant.calendarEventId,
),
workspaceId, workspaceId,
transactionManager, transactionManager,
); );
@ -205,23 +204,17 @@ export class CalendarEventParticipantRepository {
transactionManager, transactionManager,
); );
const values = calendarEventParticipants.map(
(calendarEventParticipant) => ({
...calendarEventParticipant,
calendarEventId: iCalUIDCalendarEventIdMap.get(
calendarEventParticipant.iCalUID,
),
}),
);
const { flattenedValues, valuesString } = const { flattenedValues, valuesString } =
getFlattenedValuesAndValuesStringForBatchRawQuery(values, { getFlattenedValuesAndValuesStringForBatchRawQuery(
calendarEventId: 'uuid', calendarEventParticipants,
handle: 'text', {
displayName: 'text', calendarEventId: 'uuid',
isOrganizer: 'boolean', handle: 'text',
responseStatus: `${dataSourceSchema}."calendarEventParticipant_responsestatus_enum"`, displayName: 'text',
}); isOrganizer: 'boolean',
responseStatus: `${dataSourceSchema}."calendarEventParticipant_responsestatus_enum"`,
},
);
await this.workspaceDataSourceService.executeRawQuery( await this.workspaceDataSourceService.executeRawQuery(
`UPDATE ${dataSourceSchema}."calendarEventParticipant" AS "calendarEventParticipant" `UPDATE ${dataSourceSchema}."calendarEventParticipant" AS "calendarEventParticipant"

View File

@ -35,6 +35,26 @@ export class CalendarEventRepository {
); );
} }
public async getByICalUIDs(
iCalUIDs: string[],
workspaceId: string,
transactionManager?: EntityManager,
): Promise<ObjectRecord<CalendarEventObjectMetadata>[]> {
if (iCalUIDs.length === 0) {
return [];
}
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
return await this.workspaceDataSourceService.executeRawQuery(
`SELECT * FROM ${dataSourceSchema}."calendarEvent" WHERE "iCalUID" = ANY($1)`,
[iCalUIDs],
workspaceId,
transactionManager,
);
}
public async deleteByIds( public async deleteByIds(
calendarEventIds: string[], calendarEventIds: string[],
workspaceId: string, workspaceId: string,
@ -80,11 +100,11 @@ export class CalendarEventRepository {
} }
public async getICalUIDCalendarEventIdMap( public async getICalUIDCalendarEventIdMap(
calendarEventIds: string[], iCalUIDs: string[],
workspaceId: string, workspaceId: string,
transactionManager?: EntityManager, transactionManager?: EntityManager,
): Promise<Map<string, string>> { ): Promise<Map<string, string>> {
if (calendarEventIds.length === 0) { if (iCalUIDs.length === 0) {
return new Map(); return new Map();
} }
@ -97,8 +117,8 @@ export class CalendarEventRepository {
iCalUID: string; iCalUID: string;
}[] }[]
| undefined = await this.workspaceDataSourceService.executeRawQuery( | undefined = await this.workspaceDataSourceService.executeRawQuery(
`SELECT id, "iCalUID" FROM ${dataSourceSchema}."calendarEvent" WHERE "id" = ANY($1)`, `SELECT id, "iCalUID" FROM ${dataSourceSchema}."calendarEvent" WHERE "iCalUID" = ANY($1)`,
[calendarEventIds], [iCalUIDs],
workspaceId, workspaceId,
transactionManager, transactionManager,
); );

View File

@ -26,15 +26,15 @@ import { CalendarEventParticipantObjectMetadata } from 'src/modules/calendar/sta
import { BlocklistObjectMetadata } from 'src/modules/connected-account/standard-objects/blocklist.object-metadata'; import { BlocklistObjectMetadata } from 'src/modules/connected-account/standard-objects/blocklist.object-metadata';
import { CalendarEventCleanerService } from 'src/modules/calendar/services/calendar-event-cleaner/calendar-event-cleaner.service'; import { CalendarEventCleanerService } from 'src/modules/calendar/services/calendar-event-cleaner/calendar-event-cleaner.service';
import { CalendarEventParticipantService } from 'src/modules/calendar/services/calendar-event-participant/calendar-event-participant.service'; import { CalendarEventParticipantService } from 'src/modules/calendar/services/calendar-event-participant/calendar-event-participant.service';
import { CalendarEventParticipant } from 'src/modules/calendar/types/calendar-event'; import { CalendarEventWithParticipants } from 'src/modules/calendar/types/calendar-event';
import { filterOutBlocklistedEvents } from 'src/modules/calendar/utils/filter-out-blocklisted-events.util'; import { filterOutBlocklistedEvents } from 'src/modules/calendar/utils/filter-out-blocklisted-events.util';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
import { import {
CreateCompanyAndContactJobData, CreateCompanyAndContactJobData,
CreateCompanyAndContactJob, CreateCompanyAndContactJob,
} from 'src/modules/connected-account/auto-companies-and-contacts-creation/jobs/create-company-and-contact.job'; } from 'src/modules/connected-account/auto-companies-and-contacts-creation/jobs/create-company-and-contact.job';
import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service';
@Injectable() @Injectable()
export class GoogleCalendarSyncService { export class GoogleCalendarSyncService {
@ -102,70 +102,12 @@ export class GoogleCalendarSyncService {
const calendarChannelId = calendarChannel.id; const calendarChannelId = calendarChannel.id;
const googleCalendarClient = const { events, nextSyncToken } = await this.getEventsFromGoogleCalendar(
await this.googleCalendarClientProvider.getGoogleCalendarClient( refreshToken,
refreshToken, workspaceId,
); connectedAccountId,
emailOrDomainToReimport,
const isBlocklistEnabledFeatureFlag = syncToken,
await this.featureFlagRepository.findOneBy({
workspaceId,
key: FeatureFlagKeys.IsBlocklistEnabled,
value: true,
});
const isBlocklistEnabled =
isBlocklistEnabledFeatureFlag && isBlocklistEnabledFeatureFlag.value;
const blocklist = isBlocklistEnabled
? await this.blocklistRepository.getByWorkspaceMemberId(
workspaceMemberId,
workspaceId,
)
: [];
const blocklistedEmails = blocklist.map((blocklist) => blocklist.handle);
let startTime = Date.now();
let nextSyncToken: string | null | undefined;
let nextPageToken: string | undefined;
const events: calendarV3.Schema$Event[] = [];
let hasMoreEvents = true;
while (hasMoreEvents) {
const googleCalendarEvents = await googleCalendarClient.events.list({
calendarId: 'primary',
maxResults: 500,
syncToken: emailOrDomainToReimport ? undefined : syncToken,
pageToken: nextPageToken,
q: emailOrDomainToReimport,
showDeleted: true,
});
nextSyncToken = googleCalendarEvents.data.nextSyncToken;
nextPageToken = googleCalendarEvents.data.nextPageToken || undefined;
const { items } = googleCalendarEvents.data;
if (!items || items.length === 0) {
break;
}
events.push(...items);
if (!nextPageToken) {
hasMoreEvents = false;
}
}
let endTime = Date.now();
this.logger.log(
`google calendar sync for workspace ${workspaceId} and account ${connectedAccountId} getting events list in ${
endTime - startTime
}ms.`,
); );
if (!events || events?.length === 0) { if (!events || events?.length === 0) {
@ -176,7 +118,11 @@ export class GoogleCalendarSyncService {
return; return;
} }
let filteredEvents = filterOutBlocklistedEvents(events, blocklistedEmails); const blocklist = await this.getBlocklist(workspaceMemberId, workspaceId);
let filteredEvents = filterOutBlocklistedEvents(events, blocklist).filter(
(event) => event.status !== 'cancelled',
);
if (emailOrDomainToReimport) { if (emailOrDomainToReimport) {
// We still need to filter the events to only keep the ones that have the email or domain we want to reimport // We still need to filter the events to only keep the ones that have the email or domain we want to reimport
@ -190,13 +136,46 @@ export class GoogleCalendarSyncService {
); );
} }
const eventExternalIds = filteredEvents.map((event) => event.id as string); const cancelledEventExternalIds = filteredEvents
.filter((event) => event.status === 'cancelled')
.map((event) => event.id as string);
const iCalUIDCalendarEventIdMap =
await this.calendarEventRepository.getICalUIDCalendarEventIdMap(
filteredEvents.map((calendarEvent) => calendarEvent.iCalUID as string),
workspaceId,
);
const formattedEvents = filteredEvents.map((event) =>
formatGoogleCalendarEvent(event, iCalUIDCalendarEventIdMap),
);
// TODO: When we will be able to add unicity contraint on iCalUID, we will do a INSERT ON CONFLICT DO UPDATE
let startTime = Date.now();
const existingEvents = await this.calendarEventRepository.getByICalUIDs(
formattedEvents.map((event) => event.iCalUID),
workspaceId,
);
const existingEventsICalUIDs = existingEvents.map((event) => event.iCalUID);
let endTime = Date.now();
const eventsToSave = formattedEvents.filter(
(event) => !existingEventsICalUIDs.includes(event.iCalUID),
);
const eventsToUpdate = formattedEvents.filter((event) =>
existingEventsICalUIDs.includes(event.iCalUID),
);
startTime = Date.now(); startTime = Date.now();
const existingCalendarChannelEventAssociations = const existingCalendarChannelEventAssociations =
await this.calendarChannelEventAssociationRepository.getByEventExternalIdsAndCalendarChannelId( await this.calendarChannelEventAssociationRepository.getByEventExternalIdsAndCalendarChannelId(
eventExternalIds, formattedEvents.map((event) => event.externalId),
calendarChannelId, calendarChannelId,
workspaceId, workspaceId,
); );
@ -209,198 +188,58 @@ export class GoogleCalendarSyncService {
}ms.`, }ms.`,
); );
// TODO: When we will be able to add unicity contraint on iCalUID, we will do a INSERT ON CONFLICT DO UPDATE const calendarChannelEventAssociationsToSave = formattedEvents
.filter(
const existingEventExternalIds = (event) =>
existingCalendarChannelEventAssociations.map( !existingCalendarChannelEventAssociations.some(
(association) => association.eventExternalId, (association) => association.eventExternalId === event.id,
); ),
)
const existingEventsIds = existingCalendarChannelEventAssociations.map( .map((event) => ({
(association) => association.calendarEventId,
);
const iCalUIDCalendarEventIdMap =
await this.calendarEventRepository.getICalUIDCalendarEventIdMap(
existingEventsIds,
workspaceId,
);
const formattedEvents = filteredEvents
.filter((event) => event.status !== 'cancelled')
.map((event) =>
formatGoogleCalendarEvent(event, iCalUIDCalendarEventIdMap),
);
const eventsToSave = formattedEvents.filter(
(event) => !existingEventExternalIds.includes(event.externalId),
);
const eventsToUpdate = formattedEvents.filter((event) =>
existingEventExternalIds.includes(event.externalId),
);
const cancelledEventExternalIds = filteredEvents
.filter((event) => event.status === 'cancelled')
.map((event) => event.id as string);
const calendarChannelEventAssociationsToSave = eventsToSave.map(
(event) => ({
calendarEventId: event.id, calendarEventId: event.id,
eventExternalId: event.externalId, eventExternalId: event.externalId,
calendarChannelId, calendarChannelId,
}), }));
);
const participantsToSave = eventsToSave.flatMap( if (events.length > 0) {
(event) => event.participants, await this.saveGoogleCalendarEvents(
); eventsToSave,
eventsToUpdate,
calendarChannelEventAssociationsToSave,
connectedAccount,
calendarChannel,
workspaceId,
);
const participantsToUpdate = eventsToUpdate.flatMap( startTime = Date.now();
(event) => event.participants,
);
let newCalendarEventParticipants: CalendarEventParticipant[] = []; await this.calendarChannelEventAssociationRepository.deleteByEventExternalIdsAndCalendarChannelId(
cancelledEventExternalIds,
calendarChannelId,
workspaceId,
);
if (filteredEvents.length > 0) { endTime = Date.now();
const dataSourceMetadata =
await this.workspaceDataSourceService.connectToWorkspaceDataSource(
workspaceId,
);
try { this.logger.log(
dataSourceMetadata?.transaction(async (transactionManager) => { `google calendar sync for workspace ${workspaceId} and account ${connectedAccountId}: deleting calendar channel event associations in ${
startTime = Date.now(); endTime - startTime
}ms.`,
);
await this.calendarEventRepository.saveCalendarEvents( startTime = Date.now();
eventsToSave,
workspaceId,
transactionManager,
);
endTime = Date.now(); await this.calendarEventCleanerService.cleanWorkspaceCalendarEvents(
workspaceId,
);
this.logger.log( endTime = Date.now();
`google calendar sync for workspace ${workspaceId} and account ${connectedAccountId}: saving ${
eventsToSave.length
} events in ${endTime - startTime}ms.`,
);
startTime = Date.now(); this.logger.log(
`google calendar sync for workspace ${workspaceId} and account ${connectedAccountId}: cleaning calendar events in ${
await this.calendarEventRepository.updateCalendarEvents( endTime - startTime
eventsToUpdate, }ms.`,
workspaceId, );
transactionManager,
);
endTime = Date.now();
this.logger.log(
`google calendar sync for workspace ${workspaceId} and account ${connectedAccountId}: updating ${
eventsToUpdate.length
} events in ${endTime - startTime}ms.`,
);
startTime = Date.now();
await this.calendarChannelEventAssociationRepository.saveCalendarChannelEventAssociations(
calendarChannelEventAssociationsToSave,
workspaceId,
transactionManager,
);
endTime = Date.now();
this.logger.log(
`google calendar sync for workspace ${workspaceId} and account ${connectedAccountId}: saving calendar channel event associations in ${
endTime - startTime
}ms.`,
);
startTime = Date.now();
newCalendarEventParticipants =
await this.calendarEventParticipantsRepository.updateCalendarEventParticipantsAndReturnNewOnes(
participantsToUpdate,
iCalUIDCalendarEventIdMap,
workspaceId,
transactionManager,
);
endTime = Date.now();
participantsToSave.push(...newCalendarEventParticipants);
this.logger.log(
`google calendar sync for workspace ${workspaceId} and account ${connectedAccountId}: updating participants in ${
endTime - startTime
}ms.`,
);
startTime = Date.now();
await this.calendarEventParticipantsService.saveCalendarEventParticipants(
participantsToSave,
workspaceId,
transactionManager,
);
endTime = Date.now();
this.logger.log(
`google calendar sync for workspace ${workspaceId} and account ${connectedAccountId}: saving participants in ${
endTime - startTime
}ms.`,
);
startTime = Date.now();
await this.calendarChannelEventAssociationRepository.deleteByEventExternalIdsAndCalendarChannelId(
cancelledEventExternalIds,
calendarChannelId,
workspaceId,
transactionManager,
);
endTime = Date.now();
this.logger.log(
`google calendar sync for workspace ${workspaceId} and account ${connectedAccountId}: deleting calendar channel event associations in ${
endTime - startTime
}ms.`,
);
});
startTime = Date.now();
await this.calendarEventCleanerService.cleanWorkspaceCalendarEvents(
workspaceId,
);
endTime = Date.now();
this.logger.log(
`google calendar sync for workspace ${workspaceId} and account ${connectedAccountId}: cleaning calendar events in ${
endTime - startTime
}ms.`,
);
if (calendarChannel.isContactAutoCreationEnabled) {
await this.messageQueueService.add<CreateCompanyAndContactJobData>(
CreateCompanyAndContactJob.name,
{
workspaceId,
connectedAccountHandle: connectedAccount.handle,
contactsToCreate: participantsToSave,
},
);
}
} catch (error) {
this.logger.error(
`Error during google calendar sync for workspace ${workspaceId} and account ${connectedAccountId}: ${error.message}`,
);
}
} else { } else {
this.logger.log( this.logger.log(
`google calendar sync for workspace ${workspaceId} and account ${connectedAccountId} done with nothing to import.`, `google calendar sync for workspace ${workspaceId} and account ${connectedAccountId} done with nothing to import.`,
@ -435,4 +274,220 @@ export class GoogleCalendarSyncService {
}done.`, }done.`,
); );
} }
public async getBlocklist(workspaceMemberId: string, workspaceId: string) {
const isBlocklistEnabledFeatureFlag =
await this.featureFlagRepository.findOneBy({
workspaceId,
key: FeatureFlagKeys.IsBlocklistEnabled,
value: true,
});
const isBlocklistEnabled =
isBlocklistEnabledFeatureFlag && isBlocklistEnabledFeatureFlag.value;
const blocklist = isBlocklistEnabled
? await this.blocklistRepository.getByWorkspaceMemberId(
workspaceMemberId,
workspaceId,
)
: [];
return blocklist.map((blocklist) => blocklist.handle);
}
public async getEventsFromGoogleCalendar(
refreshToken: string,
workspaceId: string,
connectedAccountId: string,
emailOrDomainToReimport?: string,
syncToken?: string,
): Promise<{
events: calendarV3.Schema$Event[];
nextSyncToken: string | null | undefined;
}> {
const googleCalendarClient =
await this.googleCalendarClientProvider.getGoogleCalendarClient(
refreshToken,
);
const startTime = Date.now();
let nextSyncToken: string | null | undefined;
let nextPageToken: string | undefined;
const events: calendarV3.Schema$Event[] = [];
let hasMoreEvents = true;
while (hasMoreEvents) {
const googleCalendarEvents = await googleCalendarClient.events.list({
calendarId: 'primary',
maxResults: 500,
syncToken: emailOrDomainToReimport ? undefined : syncToken,
pageToken: nextPageToken,
q: emailOrDomainToReimport,
showDeleted: true,
});
nextSyncToken = googleCalendarEvents.data.nextSyncToken;
nextPageToken = googleCalendarEvents.data.nextPageToken || undefined;
const { items } = googleCalendarEvents.data;
if (!items || items.length === 0) {
break;
}
events.push(...items);
if (!nextPageToken) {
hasMoreEvents = false;
}
}
const endTime = Date.now();
this.logger.log(
`google calendar sync for workspace ${workspaceId} and account ${connectedAccountId} getting events list in ${
endTime - startTime
}ms.`,
);
return { events, nextSyncToken };
}
public async saveGoogleCalendarEvents(
eventsToSave: CalendarEventWithParticipants[],
eventsToUpdate: CalendarEventWithParticipants[],
calendarChannelEventAssociationsToSave: {
calendarEventId: string;
eventExternalId: string;
calendarChannelId: string;
}[],
connectedAccount: ConnectedAccountObjectMetadata,
calendarChannel: CalendarChannelObjectMetadata,
workspaceId: string,
): Promise<void> {
const dataSourceMetadata =
await this.workspaceDataSourceService.connectToWorkspaceDataSource(
workspaceId,
);
const participantsToSave = eventsToSave.flatMap(
(event) => event.participants,
);
const participantsToUpdate = eventsToUpdate.flatMap(
(event) => event.participants,
);
let startTime: number;
let endTime: number;
try {
dataSourceMetadata?.transaction(async (transactionManager) => {
startTime = Date.now();
await this.calendarEventRepository.saveCalendarEvents(
eventsToSave,
workspaceId,
transactionManager,
);
endTime = Date.now();
this.logger.log(
`google calendar sync for workspace ${workspaceId} and account ${
connectedAccount.id
}: saving ${eventsToSave.length} events in ${endTime - startTime}ms.`,
);
startTime = Date.now();
await this.calendarEventRepository.updateCalendarEvents(
eventsToUpdate,
workspaceId,
transactionManager,
);
endTime = Date.now();
this.logger.log(
`google calendar sync for workspace ${workspaceId} and account ${
connectedAccount.id
}: updating ${eventsToUpdate.length} events in ${
endTime - startTime
}ms.`,
);
startTime = Date.now();
await this.calendarChannelEventAssociationRepository.saveCalendarChannelEventAssociations(
calendarChannelEventAssociationsToSave,
workspaceId,
transactionManager,
);
endTime = Date.now();
this.logger.log(
`google calendar sync for workspace ${workspaceId} and account ${
connectedAccount.id
}: saving calendar channel event associations in ${
endTime - startTime
}ms.`,
);
startTime = Date.now();
const newCalendarEventParticipants =
await this.calendarEventParticipantsRepository.updateCalendarEventParticipantsAndReturnNewOnes(
participantsToUpdate,
workspaceId,
transactionManager,
);
endTime = Date.now();
participantsToSave.push(...newCalendarEventParticipants);
this.logger.log(
`google calendar sync for workspace ${workspaceId} and account ${
connectedAccount.id
}: updating participants in ${endTime - startTime}ms.`,
);
startTime = Date.now();
await this.calendarEventParticipantsService.saveCalendarEventParticipants(
participantsToSave,
workspaceId,
transactionManager,
);
endTime = Date.now();
this.logger.log(
`google calendar sync for workspace ${workspaceId} and account ${
connectedAccount.id
}: saving participants in ${endTime - startTime}ms.`,
);
});
if (calendarChannel.isContactAutoCreationEnabled) {
await this.messageQueueService.add<CreateCompanyAndContactJobData>(
CreateCompanyAndContactJob.name,
{
workspaceId,
connectedAccountHandle: connectedAccount.handle,
contactsToCreate: participantsToSave,
},
);
}
} catch (error) {
this.logger.error(
`Error during google calendar sync for workspace ${workspaceId} and account ${connectedAccount.id}: ${error.message}`,
);
}
}
} }