o365 calendar sync (#8044)

Implemented:

* Account Connect
* Calendar sync via delta ids then requesting single events


I think I would split the messaging part into a second pr - that's a
step more complex then the calendar :)

---------

Co-authored-by: bosiraphael <raphael.bosi@gmail.com>
This commit is contained in:
brendanlaschke
2024-11-07 18:13:22 +01:00
committed by GitHub
parent 83f3963bfb
commit f9c076df31
50 changed files with 1417 additions and 118 deletions

View File

@ -10,14 +10,19 @@ import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/works
import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity';
import { CalendarEventCleanerModule } from 'src/modules/calendar/calendar-event-cleaner/calendar-event-cleaner.module';
import { CalendarEventListFetchCronCommand } from 'src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-event-list-fetch.cron.command';
import { CalendarEventsImportCronCommand } from 'src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-import.cron.command';
import { CalendarOngoingStaleCronCommand } from 'src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-ongoing-stale.cron.command';
import { CalendarEventListFetchCronJob } from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-event-list-fetch.cron.job';
import { CalendarEventsImportCronJob } from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-events-import.cron.job';
import { CalendarOngoingStaleCronJob } from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-ongoing-stale.cron.job';
import { GoogleCalendarDriverModule } from 'src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/google-calendar-driver.module';
import { MicrosoftCalendarDriverModule } from 'src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/microsoft-calendar-driver.module';
import { CalendarEventListFetchJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-event-list-fetch.job';
import { CalendarEventsImportJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-events-import.job';
import { CalendarOngoingStaleJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-ongoing-stale.job';
import { CalendarEventImportErrorHandlerService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-event-import-exception-handler.service';
import { CalendarEventsImportService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service';
import { CalendarFetchEventsService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-fetch-events.service';
import { CalendarGetCalendarEventsService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-get-events.service';
import { CalendarSaveEventsService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-save-events.service';
import { CalendarEventParticipantManagerModule } from 'src/modules/calendar/calendar-event-participant-manager/calendar-event-participant-manager.module';
@ -39,6 +44,7 @@ import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/sta
WorkspaceDataSourceModule,
CalendarEventCleanerModule,
GoogleCalendarDriverModule,
MicrosoftCalendarDriverModule,
BillingModule,
RefreshAccessTokenManagerModule,
CalendarEventParticipantManagerModule,
@ -48,16 +54,20 @@ import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/sta
providers: [
CalendarChannelSyncStatusService,
CalendarEventsImportService,
CalendarFetchEventsService,
CalendarEventImportErrorHandlerService,
CalendarGetCalendarEventsService,
CalendarSaveEventsService,
CalendarEventListFetchCronJob,
CalendarEventListFetchCronCommand,
CalendarEventListFetchJob,
CalendarEventsImportCronJob,
CalendarEventsImportCronCommand,
CalendarEventsImportJob,
CalendarOngoingStaleCronJob,
CalendarOngoingStaleCronCommand,
CalendarOngoingStaleJob,
],
exports: [CalendarEventsImportService],
exports: [CalendarEventsImportService, CalendarFetchEventsService],
})
export class CalendarEventImportManagerModule {}

View File

@ -0,0 +1 @@
export const CALENDAR_EVENT_IMPORT_BATCH_SIZE = 100;

View File

@ -3,10 +3,8 @@ import { Command, CommandRunner } from 'nest-commander';
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import {
CALENDAR_EVENTS_IMPORT_CRON_PATTERN,
CalendarEventListFetchCronJob,
} from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-event-list-fetch.cron.job';
import { CalendarEventListFetchCronJob } from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-event-list-fetch.cron.job';
import { CALENDAR_EVENTS_IMPORT_CRON_PATTERN } from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-events-import.cron.job';
@Command({
name: 'cron:calendar:calendar-event-list-fetch',

View File

@ -0,0 +1,32 @@
import { Command, CommandRunner } from 'nest-commander';
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import {
CALENDAR_EVENTS_IMPORT_CRON_PATTERN,
CalendarEventsImportCronJob,
} from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-events-import.cron.job';
@Command({
name: 'cron:calendar:calendar-events-import',
description: 'Starts a cron job to import the calendar events',
})
export class CalendarEventsImportCronCommand extends CommandRunner {
constructor(
@InjectMessageQueue(MessageQueue.cronQueue)
private readonly messageQueueService: MessageQueueService,
) {
super();
}
async run(): Promise<void> {
await this.messageQueueService.addCron<undefined>(
CalendarEventsImportCronJob.name,
undefined,
{
repeat: { pattern: CALENDAR_EVENTS_IMPORT_CRON_PATTERN },
},
);
}
}

View File

@ -16,11 +16,11 @@ import {
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import {
CalendarEventListFetchJob,
CalendarEventsImportJobData,
CalendarEventListFetchJobData,
} from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-event-list-fetch.job';
import { CalendarChannelSyncStage } from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity';
export const CALENDAR_EVENTS_IMPORT_CRON_PATTERN = '*/5 * * * *';
export const CALENDAR_EVENT_LIST_FETCH_CRON_PATTERN = '*/5 * * * *';
@Processor({
queueName: MessageQueue.cronQueue,
@ -38,7 +38,7 @@ export class CalendarEventListFetchCronJob {
@Process(CalendarEventListFetchCronJob.name)
@SentryCronMonitor(
CalendarEventListFetchCronJob.name,
CALENDAR_EVENTS_IMPORT_CRON_PATTERN,
CALENDAR_EVENT_LIST_FETCH_CRON_PATTERN,
)
async handle(): Promise<void> {
console.time('CalendarEventListFetchCronJob time');
@ -68,7 +68,7 @@ export class CalendarEventListFetchCronJob {
});
for (const calendarChannel of calendarChannels) {
await this.messageQueueService.add<CalendarEventsImportJobData>(
await this.messageQueueService.add<CalendarEventListFetchJobData>(
CalendarEventListFetchJob.name,
{
calendarChannelId: calendarChannel.id,

View File

@ -0,0 +1,87 @@
import { InjectRepository } from '@nestjs/typeorm';
import { Equal, Repository } from 'typeorm';
import { SentryCronMonitor } from 'src/engine/core-modules/cron/sentry-cron-monitor.decorator';
import { ExceptionHandlerService } from 'src/engine/core-modules/exception-handler/exception-handler.service';
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import {
Workspace,
WorkspaceActivationStatus,
} from 'src/engine/core-modules/workspace/workspace.entity';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { CalendarEventListFetchJobData } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-event-list-fetch.job';
import { CalendarEventsImportJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-events-import.job';
import { CalendarChannelSyncStage } from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity';
export const CALENDAR_EVENTS_IMPORT_CRON_PATTERN = '*/1 * * * *';
@Processor({
queueName: MessageQueue.cronQueue,
})
export class CalendarEventsImportCronJob {
constructor(
@InjectRepository(Workspace, 'core')
private readonly workspaceRepository: Repository<Workspace>,
@InjectMessageQueue(MessageQueue.calendarQueue)
private readonly messageQueueService: MessageQueueService,
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
private readonly exceptionHandlerService: ExceptionHandlerService,
) {}
@Process(CalendarEventsImportCronJob.name)
@SentryCronMonitor(
CalendarEventsImportCronJob.name,
CALENDAR_EVENTS_IMPORT_CRON_PATTERN,
)
async handle(): Promise<void> {
console.time('CalendarEventsImportCronJob time');
const activeWorkspaces = await this.workspaceRepository.find({
where: {
activationStatus: WorkspaceActivationStatus.ACTIVE,
},
});
for (const activeWorkspace of activeWorkspaces) {
try {
const calendarChannelRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace(
activeWorkspace.id,
'calendarChannel',
);
const calendarChannels = await calendarChannelRepository.find({
where: {
isSyncEnabled: true,
syncStage: Equal(
CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_PENDING,
),
},
});
for (const calendarChannel of calendarChannels) {
await this.messageQueueService.add<CalendarEventListFetchJobData>(
CalendarEventsImportJob.name,
{
calendarChannelId: calendarChannel.id,
workspaceId: activeWorkspace.id,
},
);
}
} catch (error) {
this.exceptionHandlerService.captureExceptions([error], {
user: {
workspaceId: activeWorkspace.id,
},
});
}
}
console.timeEnd('CalendarEventsImportCronJob time');
}
}

View File

@ -72,6 +72,7 @@ export class GoogleCalendarGetEventsService {
}
return {
fullEvents: true,
calendarEvents: formatGoogleCalendarEvents(events),
nextSyncCursor: nextSyncToken || '',
};

View File

@ -0,0 +1,20 @@
import { Module } from '@nestjs/common';
import { EnvironmentModule } from 'src/engine/core-modules/environment/environment.module';
import { MicrosoftCalendarGetEventsService } from 'src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/services/microsoft-calendar-get-events.service';
import { MicrosoftCalendarImportEventsService } from 'src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/services/microsoft-calendar-import-events.service';
import { MicrosoftOAuth2ClientManagerService } from 'src/modules/connected-account/oauth2-client-manager/drivers/microsoft/microsoft-oauth2-client-manager.service';
@Module({
imports: [EnvironmentModule],
providers: [
MicrosoftCalendarGetEventsService,
MicrosoftCalendarImportEventsService,
MicrosoftOAuth2ClientManagerService,
],
exports: [
MicrosoftCalendarGetEventsService,
MicrosoftCalendarImportEventsService,
],
})
export class MicrosoftCalendarDriverModule {}

View File

@ -0,0 +1,62 @@
import { Injectable } from '@nestjs/common';
import {
PageCollection,
PageIterator,
PageIteratorCallback,
} from '@microsoft/microsoft-graph-client';
import { parseMicrosoftCalendarError } from 'src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/utils/parse-microsoft-calendar-error.util';
import { GetCalendarEventsResponse } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-get-events.service';
import { MicrosoftOAuth2ClientManagerService } from 'src/modules/connected-account/oauth2-client-manager/drivers/microsoft/microsoft-oauth2-client-manager.service';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
@Injectable()
export class MicrosoftCalendarGetEventsService {
constructor(
private readonly microsoftOAuth2ClientManagerService: MicrosoftOAuth2ClientManagerService,
) {}
public async getCalendarEvents(
connectedAccount: Pick<
ConnectedAccountWorkspaceEntity,
'provider' | 'refreshToken' | 'id'
>,
syncCursor?: string,
): Promise<GetCalendarEventsResponse> {
try {
const microsoftClient =
await this.microsoftOAuth2ClientManagerService.getOAuth2Client(
connectedAccount.refreshToken,
);
const eventIds: string[] = [];
const response: PageCollection = await microsoftClient
.api(syncCursor || '/me/calendar/events/delta')
.version('beta')
.get();
const callback: PageIteratorCallback = (data) => {
eventIds.push(data.id);
return true;
};
const pageIterator = new PageIterator(
microsoftClient,
response,
callback,
);
await pageIterator.iterate();
return {
fullEvents: false,
calendarEventIds: eventIds,
nextSyncCursor: pageIterator.getDeltaLink() || '',
};
} catch (error) {
throw parseMicrosoftCalendarError(error);
}
}
}

View File

@ -0,0 +1,45 @@
import { Injectable } from '@nestjs/common';
import { Event } from '@microsoft/microsoft-graph-types';
import { formatMicrosoftCalendarEvents } from 'src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/utils/format-microsoft-calendar-event.util';
import { parseMicrosoftCalendarError } from 'src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/utils/parse-microsoft-calendar-error.util';
import { CalendarEventWithParticipants } from 'src/modules/calendar/common/types/calendar-event';
import { MicrosoftOAuth2ClientManagerService } from 'src/modules/connected-account/oauth2-client-manager/drivers/microsoft/microsoft-oauth2-client-manager.service';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
@Injectable()
export class MicrosoftCalendarImportEventsService {
constructor(
private readonly microsoftOAuth2ClientManagerService: MicrosoftOAuth2ClientManagerService,
) {}
public async getCalendarEvents(
connectedAccount: Pick<
ConnectedAccountWorkspaceEntity,
'provider' | 'refreshToken' | 'id'
>,
changedEventIds: string[],
): Promise<CalendarEventWithParticipants[]> {
try {
const microsoftClient =
await this.microsoftOAuth2ClientManagerService.getOAuth2Client(
connectedAccount.refreshToken,
);
const events: Event[] = [];
for (const changedEventId of changedEventIds) {
const event = await microsoftClient
.api(`/me/calendar/events/${changedEventId}`)
.get();
events.push(event);
}
return formatMicrosoftCalendarEvents(events);
} catch (error) {
throw parseMicrosoftCalendarError(error);
}
}
}

View File

@ -0,0 +1,60 @@
import {
Event,
NullableOption,
ResponseType,
} from '@microsoft/microsoft-graph-types';
import { CalendarEventParticipantResponseStatus } from 'src/modules/calendar/common/standard-objects/calendar-event-participant.workspace-entity';
import { CalendarEventWithParticipants } from 'src/modules/calendar/common/types/calendar-event';
export const formatMicrosoftCalendarEvents = (
events: Event[],
): CalendarEventWithParticipants[] => {
return events.map(formatMicrosoftCalendarEvent);
};
const formatMicrosoftCalendarEvent = (
event: Event,
): CalendarEventWithParticipants => {
const formatResponseStatus = (
status: NullableOption<ResponseType> | undefined,
) => {
switch (status) {
case 'accepted':
case 'organizer':
return CalendarEventParticipantResponseStatus.ACCEPTED;
case 'declined':
return CalendarEventParticipantResponseStatus.DECLINED;
case 'tentativelyAccepted':
return CalendarEventParticipantResponseStatus.TENTATIVE;
default:
return CalendarEventParticipantResponseStatus.NEEDS_ACTION;
}
};
return {
title: event.subject ?? '',
isCanceled: !!event.isCancelled,
isFullDay: !!event.isAllDay,
startsAt: event.start?.dateTime ?? null,
endsAt: event.end?.dateTime ?? null,
externalId: event.id ?? '',
externalCreatedAt: event.createdDateTime ?? null,
externalUpdatedAt: event.lastModifiedDateTime ?? null,
description: event.body?.content ?? '',
location: event.location?.displayName ?? '',
iCalUID: event.iCalUId ?? '',
conferenceSolution: event.onlineMeetingProvider ?? '',
conferenceLinkLabel: event.onlineMeeting?.joinUrl ?? '',
conferenceLinkUrl: event.onlineMeeting?.joinUrl ?? '',
recurringEventExternalId: event.id ?? '',
participants:
event.attendees?.map((attendee) => ({
handle: attendee.emailAddress?.address ?? '',
displayName: attendee.emailAddress?.name ?? '',
isOrganizer: attendee.status?.response === 'organizer',
responseStatus: formatResponseStatus(attendee.status?.response),
})) ?? [],
status: '',
};
};

View File

@ -0,0 +1,65 @@
import { GraphError } from '@microsoft/microsoft-graph-client';
import {
CalendarEventImportDriverException,
CalendarEventImportDriverExceptionCode,
} from 'src/modules/calendar/calendar-event-import-manager/drivers/exceptions/calendar-event-import-driver.exception';
export const parseMicrosoftCalendarError = (
error: GraphError,
): CalendarEventImportDriverException => {
const { statusCode, message } = error;
switch (statusCode) {
case 400:
return new CalendarEventImportDriverException(
message,
CalendarEventImportDriverExceptionCode.UNKNOWN,
);
case 404:
if (
message ==
'The mailbox is either inactive, soft-deleted, or is hosted on-premise.'
) {
return new CalendarEventImportDriverException(
message,
CalendarEventImportDriverExceptionCode.INSUFFICIENT_PERMISSIONS,
);
}
return new CalendarEventImportDriverException(
message,
CalendarEventImportDriverExceptionCode.NOT_FOUND,
);
case 429:
return new CalendarEventImportDriverException(
message,
CalendarEventImportDriverExceptionCode.TEMPORARY_ERROR,
);
case 403:
return new CalendarEventImportDriverException(
message,
CalendarEventImportDriverExceptionCode.INSUFFICIENT_PERMISSIONS,
);
case 401:
return new CalendarEventImportDriverException(
message,
CalendarEventImportDriverExceptionCode.INSUFFICIENT_PERMISSIONS,
);
case 500:
return new CalendarEventImportDriverException(
message,
CalendarEventImportDriverExceptionCode.UNKNOWN,
);
default:
return new CalendarEventImportDriverException(
message,
CalendarEventImportDriverExceptionCode.UNKNOWN,
);
}
};

View File

@ -4,14 +4,14 @@ import { Process } from 'src/engine/core-modules/message-queue/decorators/proces
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { CalendarEventsImportService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service';
import { CalendarFetchEventsService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-fetch-events.service';
import {
CalendarChannelSyncStage,
CalendarChannelWorkspaceEntity,
} from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity';
import { isThrottled } from 'src/modules/connected-account/utils/is-throttled';
export type CalendarEventsImportJobData = {
export type CalendarEventListFetchJobData = {
calendarChannelId: string;
workspaceId: string;
};
@ -23,11 +23,11 @@ export type CalendarEventsImportJobData = {
export class CalendarEventListFetchJob {
constructor(
private readonly twentyORMManager: TwentyORMManager,
private readonly calendarEventsImportService: CalendarEventsImportService,
private readonly calendarFetchEventsService: CalendarFetchEventsService,
) {}
@Process(CalendarEventListFetchJob.name)
async handle(data: CalendarEventsImportJobData): Promise<void> {
async handle(data: CalendarEventListFetchJobData): Promise<void> {
console.time('CalendarEventListFetchJob time');
const { workspaceId, calendarChannelId } = data;
@ -65,7 +65,7 @@ export class CalendarEventListFetchJob {
syncStageStartedAt: null,
});
await this.calendarEventsImportService.processCalendarEventsImport(
await this.calendarFetchEventsService.fetchCalendarEvents(
calendarChannel,
calendarChannel.connectedAccount,
workspaceId,
@ -73,7 +73,7 @@ export class CalendarEventListFetchJob {
break;
case CalendarChannelSyncStage.PARTIAL_CALENDAR_EVENT_LIST_FETCH_PENDING:
await this.calendarEventsImportService.processCalendarEventsImport(
await this.calendarFetchEventsService.fetchCalendarEvents(
calendarChannel,
calendarChannel.connectedAccount,
workspaceId,

View File

@ -0,0 +1,75 @@
import { Scope } from '@nestjs/common';
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { CalendarEventsImportService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service';
import {
CalendarChannelSyncStage,
CalendarChannelWorkspaceEntity,
} from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity';
import { isThrottled } from 'src/modules/connected-account/utils/is-throttled';
export type CalendarEventsImportJobData = {
calendarChannelId: string;
workspaceId: string;
};
@Processor({
queueName: MessageQueue.calendarQueue,
scope: Scope.REQUEST,
})
export class CalendarEventsImportJob {
constructor(
private readonly calendarEventsImportService: CalendarEventsImportService,
private readonly twentyORMManager: TwentyORMManager,
) {}
@Process(CalendarEventsImportJob.name)
async handle(data: CalendarEventsImportJobData): Promise<void> {
console.time('CalendarEventsImportJob time');
const { calendarChannelId, workspaceId } = data;
const calendarChannelRepository =
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
'calendarChannel',
);
const calendarChannel = await calendarChannelRepository.findOne({
where: {
id: calendarChannelId,
isSyncEnabled: true,
},
relations: ['connectedAccount'],
});
if (!calendarChannel?.isSyncEnabled) {
return;
}
if (
isThrottled(
calendarChannel.syncStageStartedAt,
calendarChannel.throttleFailureCount,
)
) {
return;
}
if (
calendarChannel.syncStage !==
CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_PENDING
) {
return;
}
await this.calendarEventsImportService.processCalendarEventsImport(
calendarChannel,
calendarChannel.connectedAccount,
workspaceId,
);
console.timeEnd('CalendarEventsImportJob time');
}
}

View File

@ -2,84 +2,86 @@ import { Injectable } from '@nestjs/common';
import { Any } from 'typeorm';
import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decorators/cache-storage.decorator';
import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service';
import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { BlocklistRepository } from 'src/modules/blocklist/repositories/blocklist.repository';
import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity';
import { CalendarEventCleanerService } from 'src/modules/calendar/calendar-event-cleaner/services/calendar-event-cleaner.service';
import { CALENDAR_EVENT_IMPORT_BATCH_SIZE } from 'src/modules/calendar/calendar-event-import-manager/constants/calendar-event-import-batch-size';
import { MicrosoftCalendarImportEventsService } from 'src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/services/microsoft-calendar-import-events.service';
import {
CalendarEventImportErrorHandlerService,
CalendarEventImportSyncStep,
} from 'src/modules/calendar/calendar-event-import-manager/services/calendar-event-import-exception-handler.service';
import {
CalendarGetCalendarEventsService,
GetCalendarEventsResponse,
} from 'src/modules/calendar/calendar-event-import-manager/services/calendar-get-events.service';
import { CalendarSaveEventsService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-save-events.service';
import { filterEventsAndReturnCancelledEvents } from 'src/modules/calendar/calendar-event-import-manager/utils/filter-events.util';
import { CalendarChannelSyncStatusService } from 'src/modules/calendar/common/services/calendar-channel-sync-status.service';
import { CalendarChannelEventAssociationWorkspaceEntity } from 'src/modules/calendar/common/standard-objects/calendar-channel-event-association.workspace-entity';
import {
CalendarChannelSyncStage,
CalendarChannelWorkspaceEntity,
} from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity';
import { CalendarChannelWorkspaceEntity } from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity';
import { CalendarEventWithParticipants } from 'src/modules/calendar/common/types/calendar-event';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
@Injectable()
export class CalendarEventsImportService {
constructor(
@InjectCacheStorage(CacheStorageNamespace.ModuleCalendar)
private readonly cacheStorage: CacheStorageService,
private readonly twentyORMManager: TwentyORMManager,
@InjectObjectMetadataRepository(BlocklistWorkspaceEntity)
private readonly blocklistRepository: BlocklistRepository,
private readonly calendarEventCleanerService: CalendarEventCleanerService,
private readonly calendarChannelSyncStatusService: CalendarChannelSyncStatusService,
private readonly getCalendarEventsService: CalendarGetCalendarEventsService,
private readonly calendarSaveEventsService: CalendarSaveEventsService,
private readonly calendarEventImportErrorHandlerService: CalendarEventImportErrorHandlerService,
private readonly microsoftCalendarImportEventService: MicrosoftCalendarImportEventsService,
) {}
public async processCalendarEventsImport(
calendarChannel: CalendarChannelWorkspaceEntity,
connectedAccount: ConnectedAccountWorkspaceEntity,
workspaceId: string,
fetchedCalendarEvents?: CalendarEventWithParticipants[],
): Promise<void> {
const syncStep =
calendarChannel.syncStage ===
CalendarChannelSyncStage.FULL_CALENDAR_EVENT_LIST_FETCH_PENDING
? CalendarEventImportSyncStep.FULL_CALENDAR_EVENT_LIST_FETCH
: CalendarEventImportSyncStep.PARTIAL_CALENDAR_EVENT_LIST_FETCH;
await this.calendarChannelSyncStatusService.markAsCalendarEventListFetchOngoing(
await this.calendarChannelSyncStatusService.markAsCalendarEventsImportOngoing(
[calendarChannel.id],
);
let calendarEvents: GetCalendarEventsResponse['calendarEvents'] = [];
let nextSyncCursor: GetCalendarEventsResponse['nextSyncCursor'] = '';
let calendarEvents: CalendarEventWithParticipants[] = [];
try {
const getCalendarEventsResponse =
await this.getCalendarEventsService.getCalendarEvents(
connectedAccount,
calendarChannel.syncCursor,
if (fetchedCalendarEvents) {
calendarEvents = fetchedCalendarEvents;
} else {
const eventIdsToFetch: string[] = await this.cacheStorage.setPop(
`calendar-events-to-import:${workspaceId}:${calendarChannel.id}`,
CALENDAR_EVENT_IMPORT_BATCH_SIZE,
);
calendarEvents = getCalendarEventsResponse.calendarEvents;
nextSyncCursor = getCalendarEventsResponse.nextSyncCursor;
if (!eventIdsToFetch || eventIdsToFetch.length === 0) {
await this.calendarChannelSyncStatusService.markAsCompletedAndSchedulePartialCalendarEventListFetch(
[calendarChannel.id],
);
const calendarChannelRepository =
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
'calendarChannel',
);
return;
}
switch (connectedAccount.provider) {
case 'microsoft':
calendarEvents =
await this.microsoftCalendarImportEventService.getCalendarEvents(
connectedAccount,
eventIdsToFetch,
);
break;
default:
break;
}
}
if (!calendarEvents || calendarEvents?.length === 0) {
await calendarChannelRepository.update(
{
id: calendarChannel.id,
},
{
syncCursor: nextSyncCursor,
},
);
await this.calendarChannelSyncStatusService.schedulePartialCalendarEventListFetch(
[calendarChannel.id],
);
@ -127,22 +129,13 @@ export class CalendarEventsImportService {
workspaceId,
);
await calendarChannelRepository.update(
{
id: calendarChannel.id,
},
{
syncCursor: nextSyncCursor,
},
);
await this.calendarChannelSyncStatusService.markAsCompletedAndSchedulePartialCalendarEventListFetch(
[calendarChannel.id],
);
} catch (error) {
await this.calendarEventImportErrorHandlerService.handleDriverException(
error,
syncStep,
CalendarEventImportSyncStep.CALENDAR_EVENTS_IMPORT,
calendarChannel,
workspaceId,
);

View File

@ -0,0 +1,129 @@
import { Injectable } from '@nestjs/common';
import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decorators/cache-storage.decorator';
import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service';
import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import {
CalendarEventImportDriverException,
CalendarEventImportDriverExceptionCode,
} from 'src/modules/calendar/calendar-event-import-manager/drivers/exceptions/calendar-event-import-driver.exception';
import {
CalendarEventImportErrorHandlerService,
CalendarEventImportSyncStep,
} from 'src/modules/calendar/calendar-event-import-manager/services/calendar-event-import-exception-handler.service';
import { CalendarEventsImportService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service';
import { CalendarGetCalendarEventsService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-get-events.service';
import { CalendarChannelSyncStatusService } from 'src/modules/calendar/common/services/calendar-channel-sync-status.service';
import {
CalendarChannelSyncStage,
CalendarChannelWorkspaceEntity,
} from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
@Injectable()
export class CalendarFetchEventsService {
constructor(
@InjectCacheStorage(CacheStorageNamespace.ModuleCalendar)
private readonly cacheStorage: CacheStorageService,
private readonly twentyORMManager: TwentyORMManager,
private readonly calendarChannelSyncStatusService: CalendarChannelSyncStatusService,
private readonly getCalendarEventsService: CalendarGetCalendarEventsService,
private readonly calendarEventImportErrorHandlerService: CalendarEventImportErrorHandlerService,
private readonly calendarEventsImportService: CalendarEventsImportService,
) {}
public async fetchCalendarEvents(
calendarChannel: CalendarChannelWorkspaceEntity,
connectedAccount: ConnectedAccountWorkspaceEntity,
workspaceId: string,
): Promise<void> {
const syncStep =
calendarChannel.syncStage ===
CalendarChannelSyncStage.FULL_CALENDAR_EVENT_LIST_FETCH_PENDING
? CalendarEventImportSyncStep.FULL_CALENDAR_EVENT_LIST_FETCH
: CalendarEventImportSyncStep.PARTIAL_CALENDAR_EVENT_LIST_FETCH;
await this.calendarChannelSyncStatusService.markAsCalendarEventListFetchOngoing(
[calendarChannel.id],
);
try {
const getCalendarEventsResponse =
await this.getCalendarEventsService.getCalendarEvents(
connectedAccount,
calendarChannel.syncCursor,
);
const hasFullEvents = getCalendarEventsResponse.fullEvents;
const calendarEvents = hasFullEvents
? getCalendarEventsResponse.calendarEvents
: null;
const calendarEventIds = getCalendarEventsResponse.calendarEventIds;
const nextSyncCursor = getCalendarEventsResponse.nextSyncCursor;
const calendarChannelRepository =
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
'calendarChannel',
);
if (!calendarEvents || calendarEvents?.length === 0) {
await calendarChannelRepository.update(
{
id: calendarChannel.id,
},
{
syncCursor: nextSyncCursor,
},
);
await this.calendarChannelSyncStatusService.schedulePartialCalendarEventListFetch(
[calendarChannel.id],
);
}
await calendarChannelRepository.update(
{
id: calendarChannel.id,
},
{
syncCursor: nextSyncCursor,
},
);
if (hasFullEvents && calendarEvents) {
// Event Import already done
await this.calendarEventsImportService.processCalendarEventsImport(
calendarChannel,
connectedAccount,
workspaceId,
calendarEvents,
);
} else if (!hasFullEvents && calendarEventIds) {
// Event Import still needed
await this.cacheStorage.setAdd(
`calendar-events-to-import:${workspaceId}:${calendarChannel.id}`,
calendarEventIds,
);
await this.calendarChannelSyncStatusService.scheduleCalendarEventsImport(
[calendarChannel.id],
);
} else {
throw new CalendarEventImportDriverException(
"Expected 'calendarEvents' or 'calendarEventIds' to be present",
CalendarEventImportDriverExceptionCode.UNKNOWN,
);
}
} catch (error) {
await this.calendarEventImportErrorHandlerService.handleDriverException(
error,
syncStep,
calendarChannel,
workspaceId,
);
}
}
}

View File

@ -1,6 +1,7 @@
import { Injectable } from '@nestjs/common';
import { GoogleCalendarGetEventsService as GoogleCalendarGetCalendarEventsService } from 'src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/services/google-calendar-get-events.service';
import { GoogleCalendarGetEventsService } from 'src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/services/google-calendar-get-events.service';
import { MicrosoftCalendarGetEventsService } from 'src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/services/microsoft-calendar-get-events.service';
import {
CalendarEventImportException,
CalendarEventImportExceptionCode,
@ -9,14 +10,17 @@ import { CalendarEventWithParticipants } from 'src/modules/calendar/common/types
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
export type GetCalendarEventsResponse = {
calendarEvents: CalendarEventWithParticipants[];
fullEvents: boolean;
calendarEvents?: CalendarEventWithParticipants[];
calendarEventIds?: string[];
nextSyncCursor: string;
};
@Injectable()
export class CalendarGetCalendarEventsService {
constructor(
private readonly googleCalendarGetCalendarEventsService: GoogleCalendarGetCalendarEventsService,
private readonly googleCalendarGetEventsService: GoogleCalendarGetEventsService,
private readonly microsoftCalendarGetEventsService: MicrosoftCalendarGetEventsService,
) {}
public async getCalendarEvents(
@ -28,7 +32,12 @@ export class CalendarGetCalendarEventsService {
): Promise<GetCalendarEventsResponse> {
switch (connectedAccount.provider) {
case 'google':
return this.googleCalendarGetCalendarEventsService.getCalendarEvents(
return this.googleCalendarGetEventsService.getCalendarEvents(
connectedAccount,
syncCursor,
);
case 'microsoft':
return this.microsoftCalendarGetEventsService.getCalendarEvents(
connectedAccount,
syncCursor,
);

View File

@ -23,7 +23,7 @@ export const filterEventsAndReturnCancelledEvents = (
},
event,
) => {
if (event.status === 'cancelled') {
if (event.isCanceled) {
acc.cancelledEvents.push(event);
} else {
acc.filteredEvents.push(event);

View File

@ -0,0 +1,62 @@
import { Injectable } from '@nestjs/common';
import {
AuthProvider,
AuthProviderCallback,
Client,
} from '@microsoft/microsoft-graph-client';
import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service';
@Injectable()
export class MicrosoftOAuth2ClientManagerService {
constructor(private readonly environmentService: EnvironmentService) {}
public async getOAuth2Client(refreshToken: string): Promise<Client> {
const authProvider: AuthProvider = async (
callback: AuthProviderCallback,
) => {
try {
const tenantId = this.environmentService.get(
'AUTH_MICROSOFT_TENANT_ID',
);
const urlData = new URLSearchParams();
urlData.append(
'client_id',
this.environmentService.get('AUTH_MICROSOFT_CLIENT_ID'),
);
urlData.append('scope', 'https://graph.microsoft.com/.default');
urlData.append('refresh_token', refreshToken);
urlData.append(
'client_secret',
this.environmentService.get('AUTH_MICROSOFT_CLIENT_SECRET'),
);
urlData.append('grant_type', 'refresh_token');
const res = await fetch(
`https://login.microsoftonline.com/${tenantId}/oauth2/v2.0/token`,
{
method: 'POST',
body: urlData,
},
);
const data = await res.json();
callback(null, data.access_token);
} catch (error) {
callback(error, null);
}
};
const client = Client.init({
defaultVersion: 'v1.0',
debugLogging: false,
authProvider: authProvider,
});
return client;
}
}

View File

@ -1,11 +1,16 @@
import { Module } from '@nestjs/common';
import { GoogleOAuth2ClientManagerService } from 'src/modules/connected-account/oauth2-client-manager/drivers/google/google-oauth2-client-manager.service';
import { MicrosoftOAuth2ClientManagerService } from 'src/modules/connected-account/oauth2-client-manager/drivers/microsoft/microsoft-oauth2-client-manager.service';
import { OAuth2ClientManagerService } from 'src/modules/connected-account/oauth2-client-manager/services/oauth2-client-manager.service';
@Module({
imports: [],
providers: [OAuth2ClientManagerService, GoogleOAuth2ClientManagerService],
exports: [OAuth2ClientManagerService],
providers: [
OAuth2ClientManagerService,
GoogleOAuth2ClientManagerService,
MicrosoftOAuth2ClientManagerService,
],
exports: [OAuth2ClientManagerService, MicrosoftOAuth2ClientManagerService],
})
export class OAuth2ClientManagerModule {}

View File

@ -22,6 +22,7 @@ import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/sta
export enum ConnectedAccountProvider {
GOOGLE = 'google',
MICROSOFT = 'microsoft',
}
@WorkspaceEntity({

View File

@ -35,6 +35,12 @@ export class MessagingGetMessageListService {
return this.gmailGetMessageListService.getFullMessageList(
connectedAccount,
);
case 'microsoft':
// TODO: Placeholder
return {
messageExternalIds: [],
nextSyncCursor: '',
};
default:
throw new MessageImportException(
`Provider ${connectedAccount.provider} is not supported`,
@ -56,6 +62,12 @@ export class MessagingGetMessageListService {
connectedAccount,
syncCursor,
);
case 'microsoft':
return {
messageExternalIds: [],
messageExternalIdsToDelete: [],
nextSyncCursor: '',
};
default:
throw new MessageImportException(
`Provider ${connectedAccount.provider} is not supported`,