From cd66ea74a2d73a7143274e5a08e5ebace9f0a93f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Bosi?= <71827178+bosiraphael@users.noreply.github.com> Date: Sat, 31 Aug 2024 16:38:47 +0200 Subject: [PATCH] 6657 Refactor and fix blocklist (#6803) Closes #6657 - Fix listeners - Refactor jobs to take array of events - Fix calendar events and messages deletion --------- Co-authored-by: Charles Bochet --- .../modules/workspace/types/FeatureFlagKey.ts | 1 - .../__stories__/RecordIndexPage.stories.tsx | 6 +- .../settings/accounts/SettingsAccounts.tsx | 5 +- .../typeorm-seeds/core/feature-flags.ts | 15 -- .../listeners/entity-events-to-db.listener.ts | 12 +- ...pis-oauth-exchange-code-for-token.guard.ts | 33 +-- .../google-apis-oauth-request-code.guard.ts | 32 +-- .../google-apis-oauth-common.auth.strategy.ts | 7 +- ...h-exchange-code-for-token.auth.strategy.ts | 3 +- .../enums/feature-flag-key.enum.ts | 3 - .../is-email-blocklisted.util.spec.ts | 32 +-- .../utils/is-email-blocklisted.util.ts | 6 +- .../calendar-blocklist-manager.module.ts | 4 +- ...ocklist-item-delete-calendar-events.job.ts | 163 ++++++++---- .../blocklist-reimport-calendar-events.job.ts | 63 +++-- .../listeners/calendar-blocklist.listener.ts | 58 +--- .../calendar-event-import-manager.module.ts | 4 +- .../jobs/calendar-ongoing-stale.job.ts | 10 +- ...-event-import-exception-handler.service.ts | 16 +- .../calendar-events-import.service.ts | 15 +- .../utils/filter-events.util.ts | 5 +- .../filter-out-blocklisted-events.util.ts | 4 +- .../src/modules/calendar/calendar.module.ts | 2 + .../calendar/common/calendar-common.module.ts | 18 ++ .../calendar-channel-sync-status.service.ts | 186 ++++++++----- ...ging-blocklist-item-delete-messages.job.ts | 151 ++++++++--- ...ssaging-blocklist-reimport-messages.job.ts | 63 +++++ .../listeners/messaging-blocklist.listener.ts | 115 ++------ .../messaging-blocklist-manager.module.ts | 7 +- .../message-channel-sync-status.service.ts | 249 ++++++++++-------- .../messaging-message-cleaner.service.ts | 126 +++++---- .../jobs/messaging-ongoing-stale.job.ts | 10 +- ...essage-import-exception-handler.service.ts | 20 +- ...ssaging-full-message-list-fetch.service.ts | 6 +- .../messaging-messages-import.service.ts | 32 +-- ...ging-partial-message-list-fetch.service.ts | 8 +- .../utils/filter-emails.util.ts | 8 +- 37 files changed, 799 insertions(+), 699 deletions(-) create mode 100644 packages/twenty-server/src/modules/calendar/common/calendar-common.module.ts rename packages/twenty-server/src/modules/calendar/{calendar-event-import-manager => common}/services/calendar-channel-sync-status.service.ts (56%) create mode 100644 packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-reimport-messages.job.ts diff --git a/packages/twenty-front/src/modules/workspace/types/FeatureFlagKey.ts b/packages/twenty-front/src/modules/workspace/types/FeatureFlagKey.ts index e1dd39384..49f3c8dc8 100644 --- a/packages/twenty-front/src/modules/workspace/types/FeatureFlagKey.ts +++ b/packages/twenty-front/src/modules/workspace/types/FeatureFlagKey.ts @@ -1,5 +1,4 @@ export type FeatureFlagKey = - | 'IS_BLOCKLIST_ENABLED' | 'IS_EVENT_OBJECT_ENABLED' | 'IS_AIRTABLE_INTEGRATION_ENABLED' | 'IS_POSTGRESQL_INTEGRATION_ENABLED' diff --git a/packages/twenty-front/src/pages/object-record/__stories__/RecordIndexPage.stories.tsx b/packages/twenty-front/src/pages/object-record/__stories__/RecordIndexPage.stories.tsx index 465b931c6..76b559961 100644 --- a/packages/twenty-front/src/pages/object-record/__stories__/RecordIndexPage.stories.tsx +++ b/packages/twenty-front/src/pages/object-record/__stories__/RecordIndexPage.stories.tsx @@ -32,9 +32,7 @@ export const Default: Story = { play: async ({ canvasElement }) => { const canvas = within(canvasElement); - await canvas.findByText('People'); - await canvas.findAllByText('Companies'); - await canvas.findByText('Opportunities'); - await canvas.findByText('My Customs'); + await canvas.findByText('People', undefined, { timeout: 3000 }); + await canvas.findByText('Linkedin'); }, }; diff --git a/packages/twenty-front/src/pages/settings/accounts/SettingsAccounts.tsx b/packages/twenty-front/src/pages/settings/accounts/SettingsAccounts.tsx index c9fa96c6c..2a6763c8f 100644 --- a/packages/twenty-front/src/pages/settings/accounts/SettingsAccounts.tsx +++ b/packages/twenty-front/src/pages/settings/accounts/SettingsAccounts.tsx @@ -14,7 +14,6 @@ import { SettingsAccountsSettingsSection } from '@/settings/accounts/components/ import { SettingsPageContainer } from '@/settings/components/SettingsPageContainer'; import { SubMenuTopBarContainer } from '@/ui/layout/page/SubMenuTopBarContainer'; import { Section } from '@/ui/layout/section/components/Section'; -import { useIsFeatureEnabled } from '@/workspace/hooks/useIsFeatureEnabled'; export const SettingsAccounts = () => { const currentWorkspaceMember = useRecoilValue(currentWorkspaceMemberState); @@ -33,8 +32,6 @@ export const SettingsAccounts = () => { recordGqlFields: generateDepthOneRecordGqlFields({ objectMetadataItem }), }); - const isBlocklistEnabled = useIsFeatureEnabled('IS_BLOCKLIST_ENABLED'); - return ( @@ -52,7 +49,7 @@ export const SettingsAccounts = () => { loading={loading} /> - {isBlocklistEnabled && } + )} diff --git a/packages/twenty-server/src/database/typeorm-seeds/core/feature-flags.ts b/packages/twenty-server/src/database/typeorm-seeds/core/feature-flags.ts index dd0be3fa8..94fa13540 100644 --- a/packages/twenty-server/src/database/typeorm-seeds/core/feature-flags.ts +++ b/packages/twenty-server/src/database/typeorm-seeds/core/feature-flags.ts @@ -15,11 +15,6 @@ export const seedFeatureFlags = async ( .into(`${schemaName}.${tableName}`, ['key', 'workspaceId', 'value']) .orIgnore() .values([ - { - key: FeatureFlagKey.IsBlocklistEnabled, - workspaceId: workspaceId, - value: true, - }, { key: FeatureFlagKey.IsAirtableIntegrationEnabled, workspaceId: workspaceId, @@ -40,16 +35,6 @@ export const seedFeatureFlags = async ( workspaceId: workspaceId, value: true, }, - { - key: FeatureFlagKey.IsMessagingAliasFetchingEnabled, - workspaceId: workspaceId, - value: true, - }, - { - key: FeatureFlagKey.IsGoogleCalendarSyncV2Enabled, - workspaceId: workspaceId, - value: true, - }, { key: FeatureFlagKey.IsFunctionSettingsEnabled, workspaceId: workspaceId, diff --git a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/listeners/entity-events-to-db.listener.ts b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/listeners/entity-events-to-db.listener.ts index 1804901ac..48e091720 100644 --- a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/listeners/entity-events-to-db.listener.ts +++ b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/listeners/entity-events-to-db.listener.ts @@ -50,16 +50,22 @@ export class EntityEventsToDbListener { } private async handle(payload: WorkspaceEventBatch) { - payload.events = payload.events.filter( + const filteredEvents = payload.events.filter( (event) => event.objectMetadata?.isAuditLogged, ); await this.messageQueueService.add< WorkspaceEventBatch - >(CreateAuditLogFromInternalEvent.name, payload); + >(CreateAuditLogFromInternalEvent.name, { + ...payload, + events: filteredEvents, + }); await this.messageQueueService.add< WorkspaceEventBatch - >(UpsertTimelineActivityFromInternalEvent.name, payload); + >(UpsertTimelineActivityFromInternalEvent.name, { + ...payload, + events: filteredEvents, + }); } } diff --git a/packages/twenty-server/src/engine/core-modules/auth/guards/google-apis-oauth-exchange-code-for-token.guard.ts b/packages/twenty-server/src/engine/core-modules/auth/guards/google-apis-oauth-exchange-code-for-token.guard.ts index e60c41b64..5ef0fcf1d 100644 --- a/packages/twenty-server/src/engine/core-modules/auth/guards/google-apis-oauth-exchange-code-for-token.guard.ts +++ b/packages/twenty-server/src/engine/core-modules/auth/guards/google-apis-oauth-exchange-code-for-token.guard.ts @@ -1,33 +1,19 @@ import { ExecutionContext, Injectable } from '@nestjs/common'; import { AuthGuard } from '@nestjs/passport'; -import { InjectRepository } from '@nestjs/typeorm'; - -import { Repository } from 'typeorm'; import { AuthException, AuthExceptionCode, } from 'src/engine/core-modules/auth/auth.exception'; -import { TokenService } from 'src/engine/core-modules/auth/services/token.service'; -import { - GoogleAPIScopeConfig, - GoogleAPIsOauthExchangeCodeForTokenStrategy, -} from 'src/engine/core-modules/auth/strategies/google-apis-oauth-exchange-code-for-token.auth.strategy'; +import { GoogleAPIsOauthExchangeCodeForTokenStrategy } from 'src/engine/core-modules/auth/strategies/google-apis-oauth-exchange-code-for-token.auth.strategy'; import { setRequestExtraParams } from 'src/engine/core-modules/auth/utils/google-apis-set-request-extra-params.util'; -import { FeatureFlagKey } from 'src/engine/core-modules/feature-flag/enums/feature-flag-key.enum'; -import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity'; import { EnvironmentService } from 'src/engine/integrations/environment/environment.service'; @Injectable() export class GoogleAPIsOauthExchangeCodeForTokenGuard extends AuthGuard( 'google-apis', ) { - constructor( - private readonly environmentService: EnvironmentService, - private readonly tokenService: TokenService, - @InjectRepository(FeatureFlagEntity, 'core') - private readonly featureFlagRepository: Repository, - ) { + constructor(private readonly environmentService: EnvironmentService) { super(); } @@ -45,22 +31,9 @@ export class GoogleAPIsOauthExchangeCodeForTokenGuard extends AuthGuard( ); } - const { workspaceId } = await this.tokenService.verifyTransientToken( - state.transientToken, - ); - - const scopeConfig: GoogleAPIScopeConfig = { - isMessagingAliasFetchingEnabled: - !!(await this.featureFlagRepository.findOneBy({ - workspaceId, - key: FeatureFlagKey.IsMessagingAliasFetchingEnabled, - value: true, - })), - }; - new GoogleAPIsOauthExchangeCodeForTokenStrategy( this.environmentService, - scopeConfig, + {}, ); setRequestExtraParams(request, { diff --git a/packages/twenty-server/src/engine/core-modules/auth/guards/google-apis-oauth-request-code.guard.ts b/packages/twenty-server/src/engine/core-modules/auth/guards/google-apis-oauth-request-code.guard.ts index 1d70446b1..fadeb2b1b 100644 --- a/packages/twenty-server/src/engine/core-modules/auth/guards/google-apis-oauth-request-code.guard.ts +++ b/packages/twenty-server/src/engine/core-modules/auth/guards/google-apis-oauth-request-code.guard.ts @@ -1,29 +1,17 @@ import { ExecutionContext, Injectable } from '@nestjs/common'; import { AuthGuard } from '@nestjs/passport'; -import { InjectRepository } from '@nestjs/typeorm'; - -import { Repository } from 'typeorm'; import { AuthException, AuthExceptionCode, } from 'src/engine/core-modules/auth/auth.exception'; -import { TokenService } from 'src/engine/core-modules/auth/services/token.service'; -import { GoogleAPIScopeConfig } from 'src/engine/core-modules/auth/strategies/google-apis-oauth-exchange-code-for-token.auth.strategy'; import { GoogleAPIsOauthRequestCodeStrategy } from 'src/engine/core-modules/auth/strategies/google-apis-oauth-request-code.auth.strategy'; import { setRequestExtraParams } from 'src/engine/core-modules/auth/utils/google-apis-set-request-extra-params.util'; -import { FeatureFlagKey } from 'src/engine/core-modules/feature-flag/enums/feature-flag-key.enum'; -import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity'; import { EnvironmentService } from 'src/engine/integrations/environment/environment.service'; @Injectable() export class GoogleAPIsOauthRequestCodeGuard extends AuthGuard('google-apis') { - constructor( - private readonly environmentService: EnvironmentService, - private readonly tokenService: TokenService, - @InjectRepository(FeatureFlagEntity, 'core') - private readonly featureFlagRepository: Repository, - ) { + constructor(private readonly environmentService: EnvironmentService) { super({ prompt: 'select_account', }); @@ -42,23 +30,7 @@ export class GoogleAPIsOauthRequestCodeGuard extends AuthGuard('google-apis') { ); } - const { workspaceId } = await this.tokenService.verifyTransientToken( - request.query.transientToken, - ); - - const scopeConfig: GoogleAPIScopeConfig = { - isMessagingAliasFetchingEnabled: - !!(await this.featureFlagRepository.findOneBy({ - workspaceId, - key: FeatureFlagKey.IsMessagingAliasFetchingEnabled, - value: true, - })), - }; - - new GoogleAPIsOauthRequestCodeStrategy( - this.environmentService, - scopeConfig, - ); + new GoogleAPIsOauthRequestCodeStrategy(this.environmentService, {}); setRequestExtraParams(request, { transientToken: request.query.transientToken, redirectLocation: request.query.redirectLocation, diff --git a/packages/twenty-server/src/engine/core-modules/auth/strategies/google-apis-oauth-common.auth.strategy.ts b/packages/twenty-server/src/engine/core-modules/auth/strategies/google-apis-oauth-common.auth.strategy.ts index 99bd05cab..64b69bf35 100644 --- a/packages/twenty-server/src/engine/core-modules/auth/strategies/google-apis-oauth-common.auth.strategy.ts +++ b/packages/twenty-server/src/engine/core-modules/auth/strategies/google-apis-oauth-common.auth.strategy.ts @@ -1,5 +1,5 @@ -import { PassportStrategy } from '@nestjs/passport'; import { Injectable } from '@nestjs/common'; +import { PassportStrategy } from '@nestjs/passport'; import { Strategy } from 'passport-google-oauth20'; @@ -24,12 +24,9 @@ export class GoogleAPIsOauthCommonStrategy extends PassportStrategy( 'profile', 'https://www.googleapis.com/auth/gmail.readonly', 'https://www.googleapis.com/auth/calendar.events', + 'https://www.googleapis.com/auth/profile.emails.read', ]; - if (scopeConfig?.isMessagingAliasFetchingEnabled) { - scopes.push('https://www.googleapis.com/auth/profile.emails.read'); - } - super({ clientID: environmentService.get('AUTH_GOOGLE_CLIENT_ID'), clientSecret: environmentService.get('AUTH_GOOGLE_CLIENT_SECRET'), diff --git a/packages/twenty-server/src/engine/core-modules/auth/strategies/google-apis-oauth-exchange-code-for-token.auth.strategy.ts b/packages/twenty-server/src/engine/core-modules/auth/strategies/google-apis-oauth-exchange-code-for-token.auth.strategy.ts index 047a7f55f..4d5dfe5c2 100644 --- a/packages/twenty-server/src/engine/core-modules/auth/strategies/google-apis-oauth-exchange-code-for-token.auth.strategy.ts +++ b/packages/twenty-server/src/engine/core-modules/auth/strategies/google-apis-oauth-exchange-code-for-token.auth.strategy.ts @@ -3,12 +3,11 @@ import { Injectable } from '@nestjs/common'; import { VerifyCallback } from 'passport-google-oauth20'; import { GoogleAPIsOauthCommonStrategy } from 'src/engine/core-modules/auth/strategies/google-apis-oauth-common.auth.strategy'; -import { EnvironmentService } from 'src/engine/integrations/environment/environment.service'; import { GoogleAPIsRequest } from 'src/engine/core-modules/auth/types/google-api-request.type'; +import { EnvironmentService } from 'src/engine/integrations/environment/environment.service'; export type GoogleAPIScopeConfig = { isCalendarEnabled?: boolean; - isMessagingAliasFetchingEnabled?: boolean; }; @Injectable() diff --git a/packages/twenty-server/src/engine/core-modules/feature-flag/enums/feature-flag-key.enum.ts b/packages/twenty-server/src/engine/core-modules/feature-flag/enums/feature-flag-key.enum.ts index ae1035253..82c5eb775 100644 --- a/packages/twenty-server/src/engine/core-modules/feature-flag/enums/feature-flag-key.enum.ts +++ b/packages/twenty-server/src/engine/core-modules/feature-flag/enums/feature-flag-key.enum.ts @@ -1,12 +1,9 @@ export enum FeatureFlagKey { - IsBlocklistEnabled = 'IS_BLOCKLIST_ENABLED', IsEventObjectEnabled = 'IS_EVENT_OBJECT_ENABLED', IsAirtableIntegrationEnabled = 'IS_AIRTABLE_INTEGRATION_ENABLED', IsPostgreSQLIntegrationEnabled = 'IS_POSTGRESQL_INTEGRATION_ENABLED', IsStripeIntegrationEnabled = 'IS_STRIPE_INTEGRATION_ENABLED', IsCopilotEnabled = 'IS_COPILOT_ENABLED', - IsMessagingAliasFetchingEnabled = 'IS_MESSAGING_ALIAS_FETCHING_ENABLED', - IsGoogleCalendarSyncV2Enabled = 'IS_GOOGLE_CALENDAR_SYNC_V2_ENABLED', IsFreeAccessEnabled = 'IS_FREE_ACCESS_ENABLED', IsFunctionSettingsEnabled = 'IS_FUNCTION_SETTINGS_ENABLED', IsWorkflowEnabled = 'IS_WORKFLOW_ENABLED', diff --git a/packages/twenty-server/src/modules/blocklist/utils/__tests__/is-email-blocklisted.util.spec.ts b/packages/twenty-server/src/modules/blocklist/utils/__tests__/is-email-blocklisted.util.spec.ts index 4f33e4d37..9f8c35ee7 100644 --- a/packages/twenty-server/src/modules/blocklist/utils/__tests__/is-email-blocklisted.util.spec.ts +++ b/packages/twenty-server/src/modules/blocklist/utils/__tests__/is-email-blocklisted.util.spec.ts @@ -2,67 +2,67 @@ import { isEmailBlocklisted } from 'src/modules/blocklist/utils/is-email-blockli describe('isEmailBlocklisted', () => { it('should return true if email is blocklisted', () => { - const channelHandle = 'abc@example.com'; + const channelHandles = ['abc@example.com']; const email = 'hello@twenty.com'; const blocklist = ['hello@twenty.com', 'hey@twenty.com']; - const result = isEmailBlocklisted(channelHandle, email, blocklist); + const result = isEmailBlocklisted(channelHandles, email, blocklist); expect(result).toBe(true); }); it('should return false if email is not blocklisted', () => { - const channelHandle = 'abc@example.com'; + const channelHandles = ['abc@example.com']; const email = 'hello@twenty.com'; const blocklist = ['hey@example.com']; - const result = isEmailBlocklisted(channelHandle, email, blocklist); + const result = isEmailBlocklisted(channelHandles, email, blocklist); expect(result).toBe(false); }); it('should return false if email is null', () => { - const channelHandle = 'abc@twenty.com'; + const channelHandles = ['abc@twenty.com']; const email = null; const blocklist = ['@example.com']; - const result = isEmailBlocklisted(channelHandle, email, blocklist); + const result = isEmailBlocklisted(channelHandles, email, blocklist); expect(result).toBe(false); }); it('should return true for subdomains', () => { - const channelHandle = 'abc@example.com'; + const channelHandles = ['abc@example.com']; const email = 'hello@twenty.twenty.com'; const blocklist = ['@twenty.com']; - const result = isEmailBlocklisted(channelHandle, email, blocklist); + const result = isEmailBlocklisted(channelHandles, email, blocklist); expect(result).toBe(true); }); it('should return false for domains which end with blocklisted domain but are not subdomains', () => { - const channelHandle = 'abc@example.com'; + const channelHandles = ['abc@example.com']; const email = 'hello@twentytwenty.com'; const blocklist = ['@twenty.com']; - const result = isEmailBlocklisted(channelHandle, email, blocklist); + const result = isEmailBlocklisted(channelHandles, email, blocklist); expect(result).toBe(false); }); it('should return false if email is undefined', () => { - const channelHandle = 'abc@example.com'; + const channelHandles = ['abc@example.com']; const email = undefined; const blocklist = ['@twenty.com']; - const result = isEmailBlocklisted(channelHandle, email, blocklist); + const result = isEmailBlocklisted(channelHandles, email, blocklist); expect(result).toBe(false); }); it('should return true if email ends with blocklisted domain', () => { - const channelHandle = 'abc@example.com'; + const channelHandles = ['abc@example.com']; const email = 'hello@twenty.com'; const blocklist = ['@twenty.com']; - const result = isEmailBlocklisted(channelHandle, email, blocklist); + const result = isEmailBlocklisted(channelHandles, email, blocklist); expect(result).toBe(true); }); it('should return false if email is same as channel handle', () => { - const channelHandle = 'hello@twenty.com'; + const channelHandles = ['hello@twenty.com']; const email = 'hello@twenty.com'; const blocklist = ['@twenty.com']; - const result = isEmailBlocklisted(channelHandle, email, blocklist); + const result = isEmailBlocklisted(channelHandles, email, blocklist); expect(result).toBe(false); }); diff --git a/packages/twenty-server/src/modules/blocklist/utils/is-email-blocklisted.util.ts b/packages/twenty-server/src/modules/blocklist/utils/is-email-blocklisted.util.ts index 1ba64ee19..642df70ac 100644 --- a/packages/twenty-server/src/modules/blocklist/utils/is-email-blocklisted.util.ts +++ b/packages/twenty-server/src/modules/blocklist/utils/is-email-blocklisted.util.ts @@ -1,11 +1,9 @@ -// TODO: Move inside blocklist module - export const isEmailBlocklisted = ( - channelHandle: string, + channelHandle: string[], email: string | null | undefined, blocklist: string[], ): boolean => { - if (!email || email === channelHandle) { + if (!email || channelHandle.includes(email)) { return false; } diff --git a/packages/twenty-server/src/modules/calendar/blocklist-manager/calendar-blocklist-manager.module.ts b/packages/twenty-server/src/modules/calendar/blocklist-manager/calendar-blocklist-manager.module.ts index 57497f1d2..8f1224d5a 100644 --- a/packages/twenty-server/src/modules/calendar/blocklist-manager/calendar-blocklist-manager.module.ts +++ b/packages/twenty-server/src/modules/calendar/blocklist-manager/calendar-blocklist-manager.module.ts @@ -4,10 +4,10 @@ import { BlocklistItemDeleteCalendarEventsJob } from 'src/modules/calendar/block import { BlocklistReimportCalendarEventsJob } from 'src/modules/calendar/blocklist-manager/jobs/blocklist-reimport-calendar-events.job'; import { CalendarBlocklistListener } from 'src/modules/calendar/blocklist-manager/listeners/calendar-blocklist.listener'; import { CalendarEventCleanerModule } from 'src/modules/calendar/calendar-event-cleaner/calendar-event-cleaner.module'; -import { CalendarEventImportManagerModule } from 'src/modules/calendar/calendar-event-import-manager/calendar-event-import-manager.module'; +import { CalendarCommonModule } from 'src/modules/calendar/common/calendar-common.module'; @Module({ - imports: [CalendarEventCleanerModule, CalendarEventImportManagerModule], + imports: [CalendarEventCleanerModule, CalendarCommonModule], providers: [ CalendarBlocklistListener, BlocklistItemDeleteCalendarEventsJob, diff --git a/packages/twenty-server/src/modules/calendar/blocklist-manager/jobs/blocklist-item-delete-calendar-events.job.ts b/packages/twenty-server/src/modules/calendar/blocklist-manager/jobs/blocklist-item-delete-calendar-events.job.ts index 9f9777328..be172d0b1 100644 --- a/packages/twenty-server/src/modules/calendar/blocklist-manager/jobs/blocklist-item-delete-calendar-events.job.ts +++ b/packages/twenty-server/src/modules/calendar/blocklist-manager/jobs/blocklist-item-delete-calendar-events.job.ts @@ -1,20 +1,21 @@ import { Logger, Scope } from '@nestjs/common'; -import { Any, ILike } from 'typeorm'; +import { And, Any, ILike, In, Not, Or } from 'typeorm'; +import { ObjectRecordCreateEvent } from 'src/engine/integrations/event-emitter/types/object-record-create.event'; import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; -import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; -import { BlocklistRepository } from 'src/modules/blocklist/repositories/blocklist.repository'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity'; import { CalendarEventCleanerService } from 'src/modules/calendar/calendar-event-cleaner/services/calendar-event-cleaner.service'; +import { CalendarChannelEventAssociationWorkspaceEntity } from 'src/modules/calendar/common/standard-objects/calendar-channel-event-association.workspace-entity'; +import { CalendarChannelWorkspaceEntity } from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity'; -export type BlocklistItemDeleteCalendarEventsJobData = { - workspaceId: string; - blocklistItemId: string; -}; +export type BlocklistItemDeleteCalendarEventsJobData = WorkspaceEventBatch< + ObjectRecordCreateEvent +>; @Processor({ queueName: MessageQueue.calendarQueue, @@ -27,77 +28,133 @@ export class BlocklistItemDeleteCalendarEventsJob { constructor( private readonly twentyORMManager: TwentyORMManager, - @InjectObjectMetadataRepository(BlocklistWorkspaceEntity) - private readonly blocklistRepository: BlocklistRepository, private readonly calendarEventCleanerService: CalendarEventCleanerService, ) {} @Process(BlocklistItemDeleteCalendarEventsJob.name) async handle(data: BlocklistItemDeleteCalendarEventsJobData): Promise { - const { workspaceId, blocklistItemId } = data; + const workspaceId = data.workspaceId; - const blocklistItem = await this.blocklistRepository.getById( - blocklistItemId, - workspaceId, + const blocklistItemIds = data.events.map( + (eventPayload) => eventPayload.recordId, ); - if (!blocklistItem) { - this.logger.log( - `Blocklist item with id ${blocklistItemId} not found in workspace ${workspaceId}`, + const blocklistRepository = + await this.twentyORMManager.getRepository( + 'blocklist', ); - return; - } - - const { handle, workspaceMemberId } = blocklistItem; - - this.logger.log( - `Deleting calendar events from ${handle} in workspace ${workspaceId} for workspace member ${workspaceMemberId}`, - ); - - if (!workspaceMemberId) { - throw new Error( - `Workspace member ID is undefined for blocklist item ${blocklistItemId} in workspace ${workspaceId}`, - ); - } - - const calendarChannelRepository = - await this.twentyORMManager.getRepository('calendarChannel'); - - const calendarChannels = await calendarChannelRepository.find({ + const blocklist = await blocklistRepository.find({ where: { - connectedAccount: { - accountOwnerId: workspaceMemberId, - }, + id: Any(blocklistItemIds), }, }); - const calendarChannelIds = calendarChannels.map(({ id }) => id); + const handlesToDeleteByWorkspaceMemberIdMap = blocklist.reduce( + (acc, blocklistItem) => { + const { handle, workspaceMemberId } = blocklistItem; - const isHandleDomain = handle.startsWith('@'); + if (!acc.has(workspaceMemberId)) { + acc.set(workspaceMemberId, []); + } + + acc.get(workspaceMemberId)?.push(handle); + + return acc; + }, + new Map(), + ); + + const calendarChannelRepository = + await this.twentyORMManager.getRepository( + 'calendarChannel', + ); const calendarChannelEventAssociationRepository = - await this.twentyORMManager.getRepository( + await this.twentyORMManager.getRepository( 'calendarChannelEventAssociation', ); - await calendarChannelEventAssociationRepository.delete({ - calendarEvent: { - calendarEventParticipants: { - handle: isHandleDomain ? ILike(`%${handle}`) : handle, + for (const workspaceMemberId of handlesToDeleteByWorkspaceMemberIdMap.keys()) { + const handles = + handlesToDeleteByWorkspaceMemberIdMap.get(workspaceMemberId); + + if (!handles) { + continue; + } + + this.logger.log( + `Deleting calendar events from ${handles.join( + ', ', + )} in workspace ${workspaceId} for workspace member ${workspaceMemberId}`, + ); + + const calendarChannels = await calendarChannelRepository.find({ + select: { + id: true, + handle: true, + connectedAccount: { + handleAliases: true, + }, }, - calendarChannelEventAssociations: { - calendarChannelId: Any(calendarChannelIds), + where: { + connectedAccount: { + accountOwnerId: workspaceMemberId, + }, }, - }, - }); + relations: ['connectedAccount'], + }); + + for (const calendarChannel of calendarChannels) { + const calendarChannelHandles = [calendarChannel.handle]; + + if (calendarChannel.connectedAccount.handleAliases) { + calendarChannelHandles.push( + ...calendarChannel.connectedAccount.handleAliases.split(','), + ); + } + + const handleConditions = handles.map((handle) => { + const isHandleDomain = handle.startsWith('@'); + + return isHandleDomain + ? { + handle: And( + Or(ILike(`%${handle}`), ILike(`%.${handle.slice(1)}`)), + Not(In(calendarChannelHandles)), + ), + } + : { handle }; + }); + + const calendarEventsAssociationsToDelete = + await calendarChannelEventAssociationRepository.find({ + where: { + calendarChannelId: calendarChannel.id, + calendarEvent: { + calendarEventParticipants: handleConditions, + }, + }, + }); + + if (calendarEventsAssociationsToDelete.length === 0) { + continue; + } + + await calendarChannelEventAssociationRepository.delete( + calendarEventsAssociationsToDelete.map(({ id }) => id), + ); + } + + this.logger.log( + `Deleted calendar events from handle ${handles.join( + ', ', + )} in workspace ${workspaceId} for workspace member ${workspaceMemberId}`, + ); + } await this.calendarEventCleanerService.cleanWorkspaceCalendarEvents( workspaceId, ); - - this.logger.log( - `Deleted calendar events from handle ${handle} in workspace ${workspaceId} for workspace member ${workspaceMemberId}`, - ); } } diff --git a/packages/twenty-server/src/modules/calendar/blocklist-manager/jobs/blocklist-reimport-calendar-events.job.ts b/packages/twenty-server/src/modules/calendar/blocklist-manager/jobs/blocklist-reimport-calendar-events.job.ts index a9501ad2f..494136ab2 100644 --- a/packages/twenty-server/src/modules/calendar/blocklist-manager/jobs/blocklist-reimport-calendar-events.job.ts +++ b/packages/twenty-server/src/modules/calendar/blocklist-manager/jobs/blocklist-reimport-calendar-events.job.ts @@ -1,23 +1,23 @@ import { Scope } from '@nestjs/common'; -import { Any } from 'typeorm'; +import { Not } from 'typeorm'; +import { ObjectRecordDeleteEvent } from 'src/engine/integrations/event-emitter/types/object-record-delete.event'; import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; -import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; +import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity'; +import { CalendarChannelSyncStatusService } from 'src/modules/calendar/common/services/calendar-channel-sync-status.service'; import { CalendarChannelSyncStage, CalendarChannelWorkspaceEntity, } from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity'; -import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; -import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; -export type BlocklistReimportCalendarEventsJobData = { - workspaceId: string; - workspaceMemberId: string; -}; +export type BlocklistReimportCalendarEventsJobData = WorkspaceEventBatch< + ObjectRecordDeleteEvent +>; @Processor({ queueName: MessageQueue.calendarQueue, @@ -26,39 +26,38 @@ export type BlocklistReimportCalendarEventsJobData = { export class BlocklistReimportCalendarEventsJob { constructor( private readonly twentyORMManager: TwentyORMManager, - @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity) - private readonly connectedAccountRepository: ConnectedAccountRepository, + private readonly calendarChannelSyncStatusService: CalendarChannelSyncStatusService, ) {} @Process(BlocklistReimportCalendarEventsJob.name) async handle(data: BlocklistReimportCalendarEventsJobData): Promise { - const { workspaceId, workspaceMemberId } = data; - - const connectedAccounts = - await this.connectedAccountRepository.getAllByWorkspaceMemberId( - workspaceMemberId, - workspaceId, - ); - - if (!connectedAccounts || connectedAccounts.length === 0) { - return; - } + const workspaceId = data.workspaceId; const calendarChannelRepository = await this.twentyORMManager.getRepository( 'calendarChannel', ); - await calendarChannelRepository.update( - { - connectedAccountId: Any( - connectedAccounts.map((connectedAccount) => connectedAccount.id), - ), - }, - { - syncStage: - CalendarChannelSyncStage.FULL_CALENDAR_EVENT_LIST_FETCH_PENDING, - }, - ); + for (const eventPayload of data.events) { + const workspaceMemberId = + eventPayload.properties.before.workspaceMemberId; + + const calendarChannels = await calendarChannelRepository.find({ + select: ['id'], + where: { + connectedAccount: { + accountOwnerId: workspaceMemberId, + }, + syncStage: Not( + CalendarChannelSyncStage.FULL_CALENDAR_EVENT_LIST_FETCH_PENDING, + ), + }, + }); + + await this.calendarChannelSyncStatusService.resetAndScheduleFullCalendarEventListFetch( + calendarChannels.map((calendarChannel) => calendarChannel.id), + workspaceId, + ); + } } } diff --git a/packages/twenty-server/src/modules/calendar/blocklist-manager/listeners/calendar-blocklist.listener.ts b/packages/twenty-server/src/modules/calendar/blocklist-manager/listeners/calendar-blocklist.listener.ts index 7a2b7e0e8..9010dc185 100644 --- a/packages/twenty-server/src/modules/calendar/blocklist-manager/listeners/calendar-blocklist.listener.ts +++ b/packages/twenty-server/src/modules/calendar/blocklist-manager/listeners/calendar-blocklist.listener.ts @@ -31,16 +31,9 @@ export class CalendarBlocklistListener { ObjectRecordCreateEvent >, ) { - await Promise.all( - payload.events.map((eventPayload) => - this.messageQueueService.add( - BlocklistItemDeleteCalendarEventsJob.name, - { - workspaceId: payload.workspaceId, - blocklistItemId: eventPayload.recordId, - }, - ), - ), + await this.messageQueueService.add( + BlocklistItemDeleteCalendarEventsJob.name, + payload, ); } @@ -50,17 +43,9 @@ export class CalendarBlocklistListener { ObjectRecordDeleteEvent >, ) { - await Promise.all( - payload.events.map((eventPayload) => - this.messageQueueService.add( - BlocklistReimportCalendarEventsJob.name, - { - workspaceId: payload.workspaceId, - workspaceMemberId: - eventPayload.properties.before.workspaceMember.id, - }, - ), - ), + await this.messageQueueService.add( + BlocklistReimportCalendarEventsJob.name, + payload, ); } @@ -70,31 +55,14 @@ export class CalendarBlocklistListener { ObjectRecordUpdateEvent >, ) { - await Promise.all( - payload.events.reduce((acc: Promise[], eventPayload) => { - acc.push( - this.messageQueueService.add( - BlocklistItemDeleteCalendarEventsJob.name, - { - workspaceId: payload.workspaceId, - blocklistItemId: eventPayload.recordId, - }, - ), - ); + await this.messageQueueService.add( + BlocklistItemDeleteCalendarEventsJob.name, + payload, + ); - acc.push( - this.messageQueueService.add( - BlocklistReimportCalendarEventsJob.name, - { - workspaceId: payload.workspaceId, - workspaceMemberId: - eventPayload.properties.after.workspaceMember.id, - }, - ), - ); - - return acc; - }, []), + await this.messageQueueService.add( + BlocklistReimportCalendarEventsJob.name, + payload, ); } } diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/calendar-event-import-manager.module.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/calendar-event-import-manager.module.ts index 1e2ff015a..0e472d465 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/calendar-event-import-manager.module.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/calendar-event-import-manager.module.ts @@ -16,12 +16,13 @@ import { CalendarOngoingStaleCronJob } from 'src/modules/calendar/calendar-event import { GoogleCalendarDriverModule } from 'src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/google-calendar-driver.module'; import { CalendarEventListFetchJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-event-list-fetch.job'; import { CalendarOngoingStaleJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-ongoing-stale.job'; -import { CalendarChannelSyncStatusService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-channel-sync-status.service'; import { CalendarEventImportErrorHandlerService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-event-import-exception-handler.service'; import { CalendarEventsImportService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service'; import { CalendarGetCalendarEventsService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-get-events.service'; import { CalendarSaveEventsService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-save-events.service'; import { CalendarEventParticipantManagerModule } from 'src/modules/calendar/calendar-event-participant-manager/calendar-event-participant-manager.module'; +import { CalendarCommonModule } from 'src/modules/calendar/common/calendar-common.module'; +import { CalendarChannelSyncStatusService } from 'src/modules/calendar/common/services/calendar-channel-sync-status.service'; import { ConnectedAccountModule } from 'src/modules/connected-account/connected-account.module'; import { RefreshAccessTokenManagerModule } from 'src/modules/connected-account/refresh-access-token-manager/refresh-access-token-manager.module'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; @@ -44,6 +45,7 @@ import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/sta RefreshAccessTokenManagerModule, CalendarEventParticipantManagerModule, ConnectedAccountModule, + CalendarCommonModule, ], providers: [ CalendarChannelSyncStatusService, diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/jobs/calendar-ongoing-stale.job.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/jobs/calendar-ongoing-stale.job.ts index bcf5805d1..ee579d44f 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/jobs/calendar-ongoing-stale.job.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/jobs/calendar-ongoing-stale.job.ts @@ -6,8 +6,8 @@ import { Process } from 'src/engine/integrations/message-queue/decorators/proces import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; -import { CalendarChannelSyncStatusService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-channel-sync-status.service'; import { isSyncStale } from 'src/modules/calendar/calendar-event-import-manager/utils/is-sync-stale.util'; +import { CalendarChannelSyncStatusService } from 'src/modules/calendar/common/services/calendar-channel-sync-status.service'; import { CalendarChannelSyncStage, CalendarChannelWorkspaceEntity, @@ -54,19 +54,19 @@ export class CalendarOngoingStaleJob { this.logger.log( `Sync for calendar channel ${calendarChannel.id} and workspace ${workspaceId} is stale. Setting sync stage to pending`, ); - await this.calendarChannelSyncStatusService.resetSyncStageStartedAt( + await this.calendarChannelSyncStatusService.resetSyncStageStartedAt([ calendarChannel.id, - ); + ]); switch (calendarChannel.syncStage) { case CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_ONGOING: await this.calendarChannelSyncStatusService.schedulePartialCalendarEventListFetch( - calendarChannel.id, + [calendarChannel.id], ); break; case CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_ONGOING: await this.calendarChannelSyncStatusService.scheduleCalendarEventsImport( - calendarChannel.id, + [calendarChannel.id], ); break; default: diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-event-import-exception-handler.service.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-event-import-exception-handler.service.ts index 8f1de59aa..0bb40aa6b 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-event-import-exception-handler.service.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-event-import-exception-handler.service.ts @@ -10,7 +10,7 @@ import { CalendarEventImportException, CalendarEventImportExceptionCode, } from 'src/modules/calendar/calendar-event-import-manager/exceptions/calendar-event-import.exception'; -import { CalendarChannelSyncStatusService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-channel-sync-status.service'; +import { CalendarChannelSyncStatusService } from 'src/modules/calendar/common/services/calendar-channel-sync-status.service'; import { CalendarChannelWorkspaceEntity } from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity'; export enum CalendarEventImportSyncStep { @@ -81,7 +81,7 @@ export class CalendarEventImportErrorHandlerService { calendarChannel.throttleFailureCount >= CALENDAR_THROTTLE_MAX_ATTEMPTS ) { await this.calendarChannelSyncStatusService.markAsFailedUnknownAndFlushCalendarEventsToImport( - calendarChannel.id, + [calendarChannel.id], workspaceId, ); @@ -104,19 +104,19 @@ export class CalendarEventImportErrorHandlerService { switch (syncStep) { case CalendarEventImportSyncStep.FULL_CALENDAR_EVENT_LIST_FETCH: await this.calendarChannelSyncStatusService.scheduleFullCalendarEventListFetch( - calendarChannel.id, + [calendarChannel.id], ); break; case CalendarEventImportSyncStep.PARTIAL_CALENDAR_EVENT_LIST_FETCH: await this.calendarChannelSyncStatusService.schedulePartialCalendarEventListFetch( - calendarChannel.id, + [calendarChannel.id], ); break; case CalendarEventImportSyncStep.CALENDAR_EVENTS_IMPORT: await this.calendarChannelSyncStatusService.scheduleCalendarEventsImport( - calendarChannel.id, + [calendarChannel.id], ); break; @@ -130,7 +130,7 @@ export class CalendarEventImportErrorHandlerService { workspaceId: string, ): Promise { await this.calendarChannelSyncStatusService.markAsFailedInsufficientPermissionsAndFlushCalendarEventsToImport( - calendarChannel.id, + [calendarChannel.id], workspaceId, ); } @@ -141,7 +141,7 @@ export class CalendarEventImportErrorHandlerService { workspaceId: string, ): Promise { await this.calendarChannelSyncStatusService.markAsFailedUnknownAndFlushCalendarEventsToImport( - calendarChannel.id, + [calendarChannel.id], workspaceId, ); @@ -163,7 +163,7 @@ export class CalendarEventImportErrorHandlerService { } await this.calendarChannelSyncStatusService.resetAndScheduleFullCalendarEventListFetch( - calendarChannel.id, + [calendarChannel.id], workspaceId, ); } diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service.ts index d2b7a6181..0dd215b88 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service.ts @@ -7,7 +7,6 @@ import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { BlocklistRepository } from 'src/modules/blocklist/repositories/blocklist.repository'; import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity'; import { CalendarEventCleanerService } from 'src/modules/calendar/calendar-event-cleaner/services/calendar-event-cleaner.service'; -import { CalendarChannelSyncStatusService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-channel-sync-status.service'; import { CalendarEventImportErrorHandlerService, CalendarEventImportSyncStep, @@ -18,6 +17,7 @@ import { } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-get-events.service'; import { CalendarSaveEventsService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-save-events.service'; import { filterEventsAndReturnCancelledEvents } from 'src/modules/calendar/calendar-event-import-manager/utils/filter-events.util'; +import { CalendarChannelSyncStatusService } from 'src/modules/calendar/common/services/calendar-channel-sync-status.service'; import { CalendarChannelEventAssociationWorkspaceEntity } from 'src/modules/calendar/common/standard-objects/calendar-channel-event-association.workspace-entity'; import { CalendarChannelSyncStage, @@ -50,7 +50,7 @@ export class CalendarEventsImportService { : CalendarEventImportSyncStep.PARTIAL_CALENDAR_EVENT_LIST_FETCH; await this.calendarChannelSyncStatusService.markAsCalendarEventListFetchOngoing( - calendarChannel.id, + [calendarChannel.id], ); let calendarEvents: GetCalendarEventsResponse['calendarEvents'] = []; let nextSyncCursor: GetCalendarEventsResponse['nextSyncCursor'] = ''; @@ -81,7 +81,7 @@ export class CalendarEventsImportService { ); await this.calendarChannelSyncStatusService.schedulePartialCalendarEventListFetch( - calendarChannel.id, + [calendarChannel.id], ); } @@ -92,7 +92,10 @@ export class CalendarEventsImportService { const { filteredEvents, cancelledEvents } = filterEventsAndReturnCancelledEvents( - calendarChannel, + [ + calendarChannel.handle, + ...connectedAccount.handleAliases.split(','), + ], calendarEvents, blocklist.map((blocklist) => blocklist.handle), ); @@ -133,8 +136,8 @@ export class CalendarEventsImportService { }, ); - await this.calendarChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch( - calendarChannel.id, + await this.calendarChannelSyncStatusService.markAsCompletedAndSchedulePartialCalendarEventListFetch( + [calendarChannel.id], ); } catch (error) { await this.calendarEventImportErrorHandlerService.handleDriverException( diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/utils/filter-events.util.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/utils/filter-events.util.ts index b3e7e0d1d..2cc28f3a0 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/utils/filter-events.util.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/utils/filter-events.util.ts @@ -1,9 +1,8 @@ import { filterOutBlocklistedEvents } from 'src/modules/calendar/calendar-event-import-manager/utils/filter-out-blocklisted-events.util'; -import { CalendarChannelWorkspaceEntity } from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity'; import { CalendarEventWithParticipants } from 'src/modules/calendar/common/types/calendar-event'; export const filterEventsAndReturnCancelledEvents = ( - calendarChannel: Pick, + calendarChannelHandles: string[], events: CalendarEventWithParticipants[], blocklist: string[], ): { @@ -11,7 +10,7 @@ export const filterEventsAndReturnCancelledEvents = ( cancelledEvents: CalendarEventWithParticipants[]; } => { const filteredEvents = filterOutBlocklistedEvents( - calendarChannel.handle, + calendarChannelHandles, events, blocklist, ); diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/utils/filter-out-blocklisted-events.util.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/utils/filter-out-blocklisted-events.util.ts index c850f653d..d6a82a978 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/utils/filter-out-blocklisted-events.util.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/utils/filter-out-blocklisted-events.util.ts @@ -2,7 +2,7 @@ import { isEmailBlocklisted } from 'src/modules/blocklist/utils/is-email-blockli import { CalendarEventWithParticipants } from 'src/modules/calendar/common/types/calendar-event'; export const filterOutBlocklistedEvents = ( - calendarChannelHandle: string, + calendarChannelHandles: string[], events: CalendarEventWithParticipants[], blocklist: string[], ) => { @@ -13,7 +13,7 @@ export const filterOutBlocklistedEvents = ( return event.participants.every( (attendee) => - !isEmailBlocklisted(calendarChannelHandle, attendee.handle, blocklist), + !isEmailBlocklisted(calendarChannelHandles, attendee.handle, blocklist), ); }); }; diff --git a/packages/twenty-server/src/modules/calendar/calendar.module.ts b/packages/twenty-server/src/modules/calendar/calendar.module.ts index ebcd340e4..de5759c18 100644 --- a/packages/twenty-server/src/modules/calendar/calendar.module.ts +++ b/packages/twenty-server/src/modules/calendar/calendar.module.ts @@ -4,6 +4,7 @@ import { CalendarBlocklistManagerModule } from 'src/modules/calendar/blocklist-m import { CalendarEventCleanerModule } from 'src/modules/calendar/calendar-event-cleaner/calendar-event-cleaner.module'; import { CalendarEventImportManagerModule } from 'src/modules/calendar/calendar-event-import-manager/calendar-event-import-manager.module'; import { CalendarEventParticipantManagerModule } from 'src/modules/calendar/calendar-event-participant-manager/calendar-event-participant-manager.module'; +import { CalendarCommonModule } from 'src/modules/calendar/common/calendar-common.module'; @Module({ imports: [ @@ -11,6 +12,7 @@ import { CalendarEventParticipantManagerModule } from 'src/modules/calendar/cale CalendarEventCleanerModule, CalendarEventImportManagerModule, CalendarEventParticipantManagerModule, + CalendarCommonModule, ], providers: [], exports: [], diff --git a/packages/twenty-server/src/modules/calendar/common/calendar-common.module.ts b/packages/twenty-server/src/modules/calendar/common/calendar-common.module.ts new file mode 100644 index 000000000..9227a3520 --- /dev/null +++ b/packages/twenty-server/src/modules/calendar/common/calendar-common.module.ts @@ -0,0 +1,18 @@ +import { Module } from '@nestjs/common'; +import { TypeOrmModule } from '@nestjs/typeorm'; + +import { FeatureFlagEntity } from 'src/engine/core-modules/feature-flag/feature-flag.entity'; +import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module'; +import { CalendarChannelSyncStatusService } from 'src/modules/calendar/common/services/calendar-channel-sync-status.service'; +import { ConnectedAccountModule } from 'src/modules/connected-account/connected-account.module'; + +@Module({ + imports: [ + WorkspaceDataSourceModule, + TypeOrmModule.forFeature([FeatureFlagEntity], 'core'), + ConnectedAccountModule, + ], + providers: [CalendarChannelSyncStatusService], + exports: [CalendarChannelSyncStatusService], +}) +export class CalendarCommonModule {} diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-channel-sync-status.service.ts b/packages/twenty-server/src/modules/calendar/common/services/calendar-channel-sync-status.service.ts similarity index 56% rename from packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-channel-sync-status.service.ts rename to packages/twenty-server/src/modules/calendar/common/services/calendar-channel-sync-status.service.ts index 01345eebb..dfa8ac279 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-channel-sync-status.service.ts +++ b/packages/twenty-server/src/modules/calendar/common/services/calendar-channel-sync-status.service.ts @@ -1,13 +1,11 @@ import { Injectable } from '@nestjs/common'; +import { Any } from 'typeorm'; + import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service'; import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator'; import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; -import { - CalendarEventImportException, - CalendarEventImportExceptionCode, -} from 'src/modules/calendar/calendar-event-import-manager/exceptions/calendar-event-import.exception'; import { CalendarChannelSyncStage, CalendarChannelSyncStatus, @@ -26,39 +24,55 @@ export class CalendarChannelSyncStatusService { private readonly accountsToReconnectService: AccountsToReconnectService, ) {} - public async scheduleFullCalendarEventListFetch(calendarChannelId: string) { + public async scheduleFullCalendarEventListFetch( + calendarChannelIds: string[], + ) { + if (!calendarChannelIds.length) { + return; + } + const calendarChannelRepository = await this.twentyORMManager.getRepository( 'calendarChannel', ); - await calendarChannelRepository.update(calendarChannelId, { + await calendarChannelRepository.update(calendarChannelIds, { syncStage: CalendarChannelSyncStage.FULL_CALENDAR_EVENT_LIST_FETCH_PENDING, }); } public async schedulePartialCalendarEventListFetch( - calendarChannelId: string, + calendarChannelIds: string[], ) { + if (!calendarChannelIds.length) { + return; + } + const calendarChannelRepository = await this.twentyORMManager.getRepository( 'calendarChannel', ); - await calendarChannelRepository.update(calendarChannelId, { + await calendarChannelRepository.update(calendarChannelIds, { syncStage: CalendarChannelSyncStage.PARTIAL_CALENDAR_EVENT_LIST_FETCH_PENDING, }); } - public async markAsCalendarEventListFetchOngoing(calendarChannelId: string) { + public async markAsCalendarEventListFetchOngoing( + calendarChannelIds: string[], + ) { + if (!calendarChannelIds.length) { + return; + } + const calendarChannelRepository = await this.twentyORMManager.getRepository( 'calendarChannel', ); - await calendarChannelRepository.update(calendarChannelId, { + await calendarChannelRepository.update(calendarChannelIds, { syncStage: CalendarChannelSyncStage.CALENDAR_EVENT_LIST_FETCH_ONGOING, syncStatus: CalendarChannelSyncStatus.ONGOING, syncStageStartedAt: new Date().toISOString(), @@ -66,58 +80,92 @@ export class CalendarChannelSyncStatusService { } public async resetAndScheduleFullCalendarEventListFetch( - calendarChannelId: string, + calendarChannelIds: string[], workspaceId: string, ) { - await this.cacheStorage.del( - `calendar-events-to-import:${workspaceId}:google-calendar:${calendarChannelId}`, - ); + if (!calendarChannelIds.length) { + return; + } + + for (const calendarChannelId of calendarChannelIds) { + await this.cacheStorage.del( + `calendar-events-to-import:${workspaceId}:google-calendar:${calendarChannelId}`, + ); + } const calendarChannelRepository = await this.twentyORMManager.getRepository( 'calendarChannel', ); - await calendarChannelRepository.update(calendarChannelId, { + await calendarChannelRepository.update(calendarChannelIds, { syncCursor: '', syncStageStartedAt: null, throttleFailureCount: 0, }); - await this.scheduleFullCalendarEventListFetch(calendarChannelId); + await this.scheduleFullCalendarEventListFetch(calendarChannelIds); } - public async resetSyncStageStartedAt(calendarChannelId: string) { + public async resetSyncStageStartedAt(calendarChannelIds: string[]) { + if (!calendarChannelIds.length) { + return; + } + const calendarChannelRepository = await this.twentyORMManager.getRepository( 'calendarChannel', ); - await calendarChannelRepository.update(calendarChannelId, { + await calendarChannelRepository.update(calendarChannelIds, { syncStageStartedAt: null, }); } - public async scheduleCalendarEventsImport(calendarChannelId: string) { + public async scheduleCalendarEventsImport(calendarChannelIds: string[]) { + if (!calendarChannelIds.length) { + return; + } + const calendarChannelRepository = await this.twentyORMManager.getRepository( 'calendarChannel', ); - await calendarChannelRepository.update(calendarChannelId, { + await calendarChannelRepository.update(calendarChannelIds, { syncStage: CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_PENDING, }); } - public async markAsCompletedAndSchedulePartialMessageListFetch( - calendarChannelId: string, - ) { + public async markAsCalendarEventsImportOngoing(calendarChannelIds: string[]) { + if (!calendarChannelIds.length) { + return; + } + const calendarChannelRepository = await this.twentyORMManager.getRepository( 'calendarChannel', ); - await calendarChannelRepository.update(calendarChannelId, { + await calendarChannelRepository.update(calendarChannelIds, { + syncStage: CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_ONGOING, + syncStatus: CalendarChannelSyncStatus.ONGOING, + }); + } + + public async markAsCompletedAndSchedulePartialCalendarEventListFetch( + calendarChannelIds: string[], + ) { + if (!calendarChannelIds.length) { + return; + } + + const calendarChannelRepository = + await this.twentyORMManager.getRepository( + 'calendarChannel', + ); + + await calendarChannelRepository.update(calendarChannelIds, { syncStage: CalendarChannelSyncStage.PARTIAL_CALENDAR_EVENT_LIST_FETCH_PENDING, syncStatus: CalendarChannelSyncStatus.ACTIVE, @@ -125,42 +173,53 @@ export class CalendarChannelSyncStatusService { syncStageStartedAt: null, }); - await this.schedulePartialCalendarEventListFetch(calendarChannelId); + await this.schedulePartialCalendarEventListFetch(calendarChannelIds); } public async markAsFailedUnknownAndFlushCalendarEventsToImport( - calendarChannelId: string, + calendarChannelIds: string[], workspaceId: string, ) { + if (!calendarChannelIds.length) { + return; + } + const calendarChannelRepository = await this.twentyORMManager.getRepository( 'calendarChannel', ); - await this.cacheStorage.del( - `calendar-events-to-import:${workspaceId}:google-calendar:${calendarChannelId}`, - ); + for (const calendarChannelId of calendarChannelIds) { + await this.cacheStorage.del( + `calendar-events-to-import:${workspaceId}:google-calendar:${calendarChannelId}`, + ); + } - await calendarChannelRepository.update(calendarChannelId, { + await calendarChannelRepository.update(calendarChannelIds, { syncStatus: CalendarChannelSyncStatus.FAILED_UNKNOWN, syncStage: CalendarChannelSyncStage.FAILED, }); } public async markAsFailedInsufficientPermissionsAndFlushCalendarEventsToImport( - calendarChannelId: string, + calendarChannelIds: string[], workspaceId: string, ) { + if (!calendarChannelIds.length) { + return; + } + const calendarChannelRepository = await this.twentyORMManager.getRepository( 'calendarChannel', ); - await this.cacheStorage.del( - `calendar-events-to-import:${workspaceId}:google-calendar:${calendarChannelId}`, - ); - - await calendarChannelRepository.update(calendarChannelId, { + for (const calendarChannelId of calendarChannelIds) { + await this.cacheStorage.del( + `calendar-events-to-import:${workspaceId}:google-calendar:${calendarChannelId}`, + ); + } + await calendarChannelRepository.update(calendarChannelIds, { syncStatus: CalendarChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS, syncStage: CalendarChannelSyncStage.FAILED, }); @@ -170,41 +229,44 @@ export class CalendarChannelSyncStatusService { 'connectedAccount', ); - const calendarChannel = await calendarChannelRepository.findOne({ - where: { id: calendarChannelId }, + const calendarChannels = await calendarChannelRepository.find({ + select: ['id', 'connectedAccountId'], + where: { id: Any(calendarChannelIds) }, }); - if (!calendarChannel) { - throw new CalendarEventImportException( - `Calendar channel ${calendarChannelId} not found in workspace ${workspaceId}`, - CalendarEventImportExceptionCode.CALENDAR_CHANNEL_NOT_FOUND, - ); - } - - const connectedAccountId = calendarChannel.connectedAccountId; + const connectedAccountIds = calendarChannels.map( + (calendarChannel) => calendarChannel.connectedAccountId, + ); await connectedAccountRepository.update( - { id: connectedAccountId }, + { id: Any(connectedAccountIds) }, { authFailedAt: new Date(), }, ); - await this.addToAccountsToReconnect(calendarChannelId, workspaceId); + await this.addToAccountsToReconnect( + calendarChannels.map((calendarChannel) => calendarChannel.id), + workspaceId, + ); } private async addToAccountsToReconnect( - calendarChannelId: string, + calendarChannelIds: string[], workspaceId: string, ) { + if (!calendarChannelIds.length) { + return; + } + const calendarChannelRepository = await this.twentyORMManager.getRepository( 'calendarChannel', ); - const calendarChannel = await calendarChannelRepository.findOne({ + const calendarChannels = await calendarChannelRepository.find({ where: { - id: calendarChannelId, + id: Any(calendarChannelIds), }, relations: { connectedAccount: { @@ -213,18 +275,16 @@ export class CalendarChannelSyncStatusService { }, }); - if (!calendarChannel) { - return; + for (const calendarChannel of calendarChannels) { + const userId = calendarChannel.connectedAccount.accountOwner.userId; + const connectedAccountId = calendarChannel.connectedAccount.id; + + await this.accountsToReconnectService.addAccountToReconnectByKey( + AccountsToReconnectKeys.ACCOUNTS_TO_RECONNECT_INSUFFICIENT_PERMISSIONS, + userId, + workspaceId, + connectedAccountId, + ); } - - const userId = calendarChannel.connectedAccount.accountOwner.userId; - const connectedAccountId = calendarChannel.connectedAccount.id; - - await this.accountsToReconnectService.addAccountToReconnectByKey( - AccountsToReconnectKeys.ACCOUNTS_TO_RECONNECT_INSUFFICIENT_PERMISSIONS, - userId, - workspaceId, - connectedAccountId, - ); } } diff --git a/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job.ts b/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job.ts index 3c7d37204..b9396eb83 100644 --- a/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job.ts +++ b/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job.ts @@ -1,21 +1,21 @@ import { Logger, Scope } from '@nestjs/common'; -import { Any } from 'typeorm'; +import { And, Any, ILike, In, Not, Or } from 'typeorm'; +import { ObjectRecordCreateEvent } from 'src/engine/integrations/event-emitter/types/object-record-create.event'; import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; -import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; -import { BlocklistRepository } from 'src/modules/blocklist/repositories/blocklist.repository'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity'; import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity'; +import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessagingMessageCleanerService } from 'src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service'; -export type BlocklistItemDeleteMessagesJobData = { - workspaceId: string; - blocklistItemId: string; -}; +export type BlocklistItemDeleteMessagesJobData = WorkspaceEventBatch< + ObjectRecordCreateEvent +>; @Processor({ queueName: MessageQueue.messagingQueue, @@ -25,66 +25,135 @@ export class BlocklistItemDeleteMessagesJob { private readonly logger = new Logger(BlocklistItemDeleteMessagesJob.name); constructor( - @InjectObjectMetadataRepository(BlocklistWorkspaceEntity) - private readonly blocklistRepository: BlocklistRepository, private readonly threadCleanerService: MessagingMessageCleanerService, private readonly twentyORMManager: TwentyORMManager, ) {} @Process(BlocklistItemDeleteMessagesJob.name) async handle(data: BlocklistItemDeleteMessagesJobData): Promise { - const { workspaceId, blocklistItemId } = data; + const workspaceId = data.workspaceId; - const blocklistItem = await this.blocklistRepository.getById( - blocklistItemId, - workspaceId, + const blocklistItemIds = data.events.map( + (eventPayload) => eventPayload.recordId, ); - if (!blocklistItem) { - this.logger.log( - `Blocklist item with id ${blocklistItemId} not found in workspace ${workspaceId}`, + const blocklistRepository = + await this.twentyORMManager.getRepository( + 'blocklist', ); - return; - } + const blocklist = await blocklistRepository.find({ + where: { + id: Any(blocklistItemIds), + }, + }); - const { handle, workspaceMemberId } = blocklistItem; + const handlesToDeleteByWorkspaceMemberIdMap = blocklist.reduce( + (acc, blocklistItem) => { + const { handle, workspaceMemberId } = blocklistItem; - this.logger.log( - `Deleting messages from ${handle} in workspace ${workspaceId} for workspace member ${workspaceMemberId}`, + if (!acc.has(workspaceMemberId)) { + acc.set(workspaceMemberId, []); + } + + acc.get(workspaceMemberId)?.push(handle); + + return acc; + }, + new Map(), ); - if (!workspaceMemberId) { - throw new Error( - `Workspace member ID is not defined for blocklist item ${blocklistItemId} in workspace ${workspaceId}`, + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', ); - } const messageChannelMessageAssociationRepository = await this.twentyORMManager.getRepository( 'messageChannelMessageAssociation', ); - const rolesToDelete: ('from' | 'to')[] = ['from', 'to']; + for (const workspaceMemberId of handlesToDeleteByWorkspaceMemberIdMap.keys()) { + const handles = + handlesToDeleteByWorkspaceMemberIdMap.get(workspaceMemberId); - await messageChannelMessageAssociationRepository.delete({ - messageChannel: { - connectedAccount: { - accountOwnerId: workspaceMemberId, + if (!handles) { + continue; + } + + this.logger.log( + `Deleting messages from ${handles.join( + ', ', + )} in workspace ${workspaceId} for workspace member ${workspaceMemberId}`, + ); + + const rolesToDelete: ('from' | 'to')[] = ['from', 'to']; + + const messageChannels = await messageChannelRepository.find({ + select: { + id: true, + handle: true, + connectedAccount: { + handleAliases: true, + }, }, - }, - message: { - messageParticipants: { - handle, - role: Any(rolesToDelete), + where: { + connectedAccount: { + accountOwnerId: workspaceMemberId, + }, }, - }, - }); + relations: ['connectedAccount'], + }); + + for (const messageChannel of messageChannels) { + const messageChannelHandles = [messageChannel.handle]; + + if (messageChannel.connectedAccount.handleAliases) { + messageChannelHandles.push( + ...messageChannel.connectedAccount.handleAliases.split(','), + ); + } + + const handleConditions = handles.map((handle) => { + const isHandleDomain = handle.startsWith('@'); + + return isHandleDomain + ? { + handle: And( + Or(ILike(`%${handle}`), ILike(`%.${handle.slice(1)}`)), + Not(In(messageChannelHandles)), + ), + role: In(rolesToDelete), + } + : { handle, role: In(rolesToDelete) }; + }); + + const messageChannelMessageAssociationsToDelete = + await messageChannelMessageAssociationRepository.find({ + where: { + messageChannelId: messageChannel.id, + message: { + messageParticipants: handleConditions, + }, + }, + }); + + if (messageChannelMessageAssociationsToDelete.length === 0) { + continue; + } + + await messageChannelMessageAssociationRepository.delete( + messageChannelMessageAssociationsToDelete.map(({ id }) => id), + ); + } + + this.logger.log( + `Deleted messages from handle ${handles.join( + ', ', + )} in workspace ${workspaceId} for workspace member ${workspaceMemberId}`, + ); + } await this.threadCleanerService.cleanWorkspaceThreads(workspaceId); - - this.logger.log( - `Deleted messages from handle ${handle} in workspace ${workspaceId} for workspace member ${workspaceMemberId}`, - ); } } diff --git a/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-reimport-messages.job.ts b/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-reimport-messages.job.ts new file mode 100644 index 000000000..0d2a76794 --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-reimport-messages.job.ts @@ -0,0 +1,63 @@ +import { Scope } from '@nestjs/common'; + +import { Not } from 'typeorm'; + +import { ObjectRecordDeleteEvent } from 'src/engine/integrations/event-emitter/types/object-record-delete.event'; +import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator'; +import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; +import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity'; +import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service'; +import { + MessageChannelSyncStage, + MessageChannelWorkspaceEntity, +} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; + +export type BlocklistReimportMessagesJobData = WorkspaceEventBatch< + ObjectRecordDeleteEvent +>; + +@Processor({ + queueName: MessageQueue.messagingQueue, + scope: Scope.REQUEST, +}) +export class BlocklistReimportMessagesJob { + constructor( + private readonly twentyORMManager: TwentyORMManager, + private readonly messagingChannelSyncStatusService: MessageChannelSyncStatusService, + ) {} + + @Process(BlocklistReimportMessagesJob.name) + async handle(data: BlocklistReimportMessagesJobData): Promise { + const workspaceId = data.workspaceId; + + const messageChannelRepository = + await this.twentyORMManager.getRepository( + 'messageChannel', + ); + + for (const eventPayload of data.events) { + const workspaceMemberId = + eventPayload.properties.before.workspaceMemberId; + + const messageChannels = await messageChannelRepository.find({ + select: ['id'], + where: { + connectedAccount: { + accountOwnerId: workspaceMemberId, + }, + syncStage: Not( + MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING, + ), + }, + }); + + await this.messagingChannelSyncStatusService.resetAndScheduleFullMessageListFetch( + messageChannels.map((messageChannel) => messageChannel.id), + workspaceId, + ); + } + } +} diff --git a/packages/twenty-server/src/modules/messaging/blocklist-manager/listeners/messaging-blocklist.listener.ts b/packages/twenty-server/src/modules/messaging/blocklist-manager/listeners/messaging-blocklist.listener.ts index 4335cf29d..db2ded2d4 100644 --- a/packages/twenty-server/src/modules/messaging/blocklist-manager/listeners/messaging-blocklist.listener.ts +++ b/packages/twenty-server/src/modules/messaging/blocklist-manager/listeners/messaging-blocklist.listener.ts @@ -1,4 +1,4 @@ -import { Injectable } from '@nestjs/common'; +import { Injectable, Scope } from '@nestjs/common'; import { OnEvent } from '@nestjs/event-emitter'; import { ObjectRecordCreateEvent } from 'src/engine/integrations/event-emitter/types/object-record-create.event'; @@ -7,28 +7,22 @@ import { ObjectRecordUpdateEvent } from 'src/engine/integrations/event-emitter/t import { InjectMessageQueue } from 'src/engine/integrations/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/integrations/message-queue/services/message-queue.service'; -import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; -import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type'; import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity'; -import { ConnectedAccountRepository } from 'src/modules/connected-account/repositories/connected-account.repository'; -import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; import { BlocklistItemDeleteMessagesJob, BlocklistItemDeleteMessagesJobData, } from 'src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job'; -import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service'; -import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { + BlocklistReimportMessagesJob, + BlocklistReimportMessagesJobData, +} from 'src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-reimport-messages.job'; -@Injectable() +@Injectable({ scope: Scope.REQUEST }) export class MessagingBlocklistListener { constructor( @InjectMessageQueue(MessageQueue.messagingQueue) private readonly messageQueueService: MessageQueueService, - @InjectObjectMetadataRepository(ConnectedAccountWorkspaceEntity) - private readonly connectedAccountRepository: ConnectedAccountRepository, - private readonly messagingChannelSyncStatusService: MessageChannelSyncStatusService, - private readonly twentyORMManager: TwentyORMManager, ) {} @OnEvent('blocklist.created') @@ -37,17 +31,9 @@ export class MessagingBlocklistListener { ObjectRecordCreateEvent >, ) { - await Promise.all( - payload.events.map((eventPayload) => - // TODO: modify to pass an array of blocklist items - this.messageQueueService.add( - BlocklistItemDeleteMessagesJob.name, - { - workspaceId: payload.workspaceId, - blocklistItemId: eventPayload.recordId, - }, - ), - ), + await this.messageQueueService.add( + BlocklistItemDeleteMessagesJob.name, + payload, ); } @@ -57,38 +43,10 @@ export class MessagingBlocklistListener { ObjectRecordDeleteEvent >, ) { - const workspaceId = payload.workspaceId; - - for (const eventPayload of payload.events) { - const workspaceMemberId = - eventPayload.properties.before.workspaceMember.id; - - const connectedAccount = - await this.connectedAccountRepository.getAllByWorkspaceMemberId( - workspaceMemberId, - workspaceId, - ); - - if (!connectedAccount || connectedAccount.length === 0) { - return; - } - - const messageChannelRepository = - await this.twentyORMManager.getRepository( - 'messageChannel', - ); - - const messageChannel = await messageChannelRepository.findOneOrFail({ - where: { - connectedAccountId: connectedAccount[0].id, - }, - }); - - await this.messagingChannelSyncStatusService.resetAndScheduleFullMessageListFetch( - messageChannel.id, - workspaceId, - ); - } + await this.messageQueueService.add( + BlocklistReimportMessagesJob.name, + payload, + ); } @OnEvent('blocklist.updated') @@ -97,45 +55,14 @@ export class MessagingBlocklistListener { ObjectRecordUpdateEvent >, ) { - const workspaceId = payload.workspaceId; + await this.messageQueueService.add( + BlocklistItemDeleteMessagesJob.name, + payload, + ); - for (const eventPayload of payload.events) { - const workspaceMemberId = - eventPayload.properties.before.workspaceMember.id; - - await this.messageQueueService.add( - BlocklistItemDeleteMessagesJob.name, - { - workspaceId, - blocklistItemId: eventPayload.recordId, - }, - ); - - const connectedAccount = - await this.connectedAccountRepository.getAllByWorkspaceMemberId( - workspaceMemberId, - workspaceId, - ); - - if (!connectedAccount || connectedAccount.length === 0) { - continue; - } - - const messageChannelRepository = - await this.twentyORMManager.getRepository( - 'messageChannel', - ); - - const messageChannel = await messageChannelRepository.findOneOrFail({ - where: { - connectedAccountId: connectedAccount[0].id, - }, - }); - - await this.messagingChannelSyncStatusService.resetAndScheduleFullMessageListFetch( - messageChannel.id, - workspaceId, - ); - } + await this.messageQueueService.add( + BlocklistReimportMessagesJob.name, + payload, + ); } } diff --git a/packages/twenty-server/src/modules/messaging/blocklist-manager/messaging-blocklist-manager.module.ts b/packages/twenty-server/src/modules/messaging/blocklist-manager/messaging-blocklist-manager.module.ts index 0ed9ad1fe..716f67b88 100644 --- a/packages/twenty-server/src/modules/messaging/blocklist-manager/messaging-blocklist-manager.module.ts +++ b/packages/twenty-server/src/modules/messaging/blocklist-manager/messaging-blocklist-manager.module.ts @@ -1,6 +1,7 @@ import { Module } from '@nestjs/common'; import { BlocklistItemDeleteMessagesJob } from 'src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-item-delete-messages.job'; +import { BlocklistReimportMessagesJob } from 'src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-reimport-messages.job'; import { MessagingBlocklistListener } from 'src/modules/messaging/blocklist-manager/listeners/messaging-blocklist.listener'; import { MessagingCommonModule } from 'src/modules/messaging/common/messaging-common.module'; import { MessagingMessageCleanerModule } from 'src/modules/messaging/message-cleaner/messaging-message-cleaner.module'; @@ -9,10 +10,8 @@ import { MessagingMessageCleanerModule } from 'src/modules/messaging/message-cle imports: [MessagingCommonModule, MessagingMessageCleanerModule], providers: [ MessagingBlocklistListener, - { - provide: BlocklistItemDeleteMessagesJob.name, - useClass: BlocklistItemDeleteMessagesJob, - }, + BlocklistItemDeleteMessagesJob, + BlocklistReimportMessagesJob, ], exports: [], }) diff --git a/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts b/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts index 6be3ad36a..bc29575c5 100644 --- a/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts +++ b/packages/twenty-server/src/modules/messaging/common/services/message-channel-sync-status.service.ts @@ -1,5 +1,7 @@ import { Injectable } from '@nestjs/common'; +import { Any } from 'typeorm'; + import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service'; import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator'; import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum'; @@ -12,10 +14,6 @@ import { MessageChannelSyncStatus, MessageChannelWorkspaceEntity, } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; -import { - MessageImportException, - MessageImportExceptionCode, -} from 'src/modules/messaging/message-import-manager/exceptions/message-import.exception'; @Injectable() export class MessageChannelSyncStatusService { @@ -26,216 +24,235 @@ export class MessageChannelSyncStatusService { private readonly accountsToReconnectService: AccountsToReconnectService, ) {} - public async scheduleFullMessageListFetch(messageChannelId: string) { + public async scheduleFullMessageListFetch(messageChannelIds: string[]) { + if (!messageChannelIds.length) { + return; + } + const messageChannelRepository = await this.twentyORMManager.getRepository( 'messageChannel', ); - await messageChannelRepository.update( - { id: messageChannelId }, - { - syncStage: MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING, - }, - ); + await messageChannelRepository.update(messageChannelIds, { + syncStage: MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING, + }); } - public async schedulePartialMessageListFetch(messageChannelId: string) { + public async schedulePartialMessageListFetch(messageChannelIds: string[]) { + if (!messageChannelIds.length) { + return; + } + const messageChannelRepository = await this.twentyORMManager.getRepository( 'messageChannel', ); - await messageChannelRepository.update( - { id: messageChannelId }, - { - syncStage: MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING, - }, - ); + await messageChannelRepository.update(messageChannelIds, { + syncStage: MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING, + }); } - public async scheduleMessagesImport(messageChannelId: string) { + public async scheduleMessagesImport(messageChannelIds: string[]) { + if (!messageChannelIds.length) { + return; + } + const messageChannelRepository = await this.twentyORMManager.getRepository( 'messageChannel', ); - await messageChannelRepository.update( - { id: messageChannelId }, - { - syncStage: MessageChannelSyncStage.MESSAGES_IMPORT_PENDING, - }, - ); + await messageChannelRepository.update(messageChannelIds, { + syncStage: MessageChannelSyncStage.MESSAGES_IMPORT_PENDING, + }); } public async resetAndScheduleFullMessageListFetch( - messageChannelId: string, + messageChannelIds: string[], workspaceId: string, ) { - await this.cacheStorage.del( - `messages-to-import:${workspaceId}:gmail:${messageChannelId}`, - ); + if (!messageChannelIds.length) { + return; + } + + for (const messageChannelId of messageChannelIds) { + await this.cacheStorage.del( + `messages-to-import:${workspaceId}:gmail:${messageChannelId}`, + ); + } const messageChannelRepository = await this.twentyORMManager.getRepository( 'messageChannel', ); - await messageChannelRepository.update( - { id: messageChannelId }, - { - syncCursor: '', - syncStageStartedAt: null, - throttleFailureCount: 0, - }, - ); + await messageChannelRepository.update(messageChannelIds, { + syncCursor: '', + syncStageStartedAt: null, + throttleFailureCount: 0, + }); - await this.scheduleFullMessageListFetch(messageChannelId); + await this.scheduleFullMessageListFetch(messageChannelIds); } - public async resetSyncStageStartedAt(messageChannelId: string) { + public async resetSyncStageStartedAt(messageChannelIds: string[]) { + if (!messageChannelIds.length) { + return; + } + const messageChannelRepository = await this.twentyORMManager.getRepository( 'messageChannel', ); - await messageChannelRepository.update( - { id: messageChannelId }, - { - syncStageStartedAt: null, - }, - ); + await messageChannelRepository.update(messageChannelIds, { + syncStageStartedAt: null, + }); } - public async markAsMessagesListFetchOngoing(messageChannelId: string) { + public async markAsMessagesListFetchOngoing(messageChannelIds: string[]) { + if (!messageChannelIds.length) { + return; + } + const messageChannelRepository = await this.twentyORMManager.getRepository( 'messageChannel', ); - await messageChannelRepository.update( - { id: messageChannelId }, - { - syncStage: MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING, - syncStatus: MessageChannelSyncStatus.ONGOING, - }, - ); + await messageChannelRepository.update(messageChannelIds, { + syncStage: MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING, + syncStatus: MessageChannelSyncStatus.ONGOING, + }); } public async markAsCompletedAndSchedulePartialMessageListFetch( - messageChannelId: string, + messageChannelIds: string[], ) { + if (!messageChannelIds.length) { + return; + } + const messageChannelRepository = await this.twentyORMManager.getRepository( 'messageChannel', ); - await messageChannelRepository.update( - { id: messageChannelId }, - { - syncStatus: MessageChannelSyncStatus.ACTIVE, - }, - ); + await messageChannelRepository.update(messageChannelIds, { + syncStatus: MessageChannelSyncStatus.ACTIVE, + }); - await this.schedulePartialMessageListFetch(messageChannelId); + await this.schedulePartialMessageListFetch(messageChannelIds); } - public async markAsMessagesImportOngoing(messageChannelId: string) { + public async markAsMessagesImportOngoing(messageChannelIds: string[]) { + if (!messageChannelIds.length) { + return; + } + const messageChannelRepository = await this.twentyORMManager.getRepository( 'messageChannel', ); - await messageChannelRepository.update( - { id: messageChannelId }, - { - syncStage: MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING, - }, - ); + await messageChannelRepository.update(messageChannelIds, { + syncStage: MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING, + }); } public async markAsFailedUnknownAndFlushMessagesToImport( - messageChannelId: string, + messageChannelIds: string[], workspaceId: string, ) { - await this.cacheStorage.del( - `messages-to-import:${workspaceId}:gmail:${messageChannelId}`, - ); + if (!messageChannelIds.length) { + return; + } + + for (const messageChannelId of messageChannelIds) { + await this.cacheStorage.del( + `messages-to-import:${workspaceId}:gmail:${messageChannelId}`, + ); + } const messageChannelRepository = await this.twentyORMManager.getRepository( 'messageChannel', ); - await messageChannelRepository.update( - { id: messageChannelId }, - { - syncStage: MessageChannelSyncStage.FAILED, - syncStatus: MessageChannelSyncStatus.FAILED_UNKNOWN, - }, - ); + await messageChannelRepository.update(messageChannelIds, { + syncStage: MessageChannelSyncStage.FAILED, + syncStatus: MessageChannelSyncStatus.FAILED_UNKNOWN, + }); } public async markAsFailedInsufficientPermissionsAndFlushMessagesToImport( - messageChannelId: string, + messageChannelIds: string[], workspaceId: string, ) { - await this.cacheStorage.del( - `messages-to-import:${workspaceId}:gmail:${messageChannelId}`, - ); + if (!messageChannelIds.length) { + return; + } + + for (const messageChannelId of messageChannelIds) { + await this.cacheStorage.del( + `messages-to-import:${workspaceId}:gmail:${messageChannelId}`, + ); + } const messageChannelRepository = await this.twentyORMManager.getRepository( 'messageChannel', ); - await messageChannelRepository.update( - { id: messageChannelId }, - { - syncStage: MessageChannelSyncStage.FAILED, - syncStatus: MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS, - }, - ); + await messageChannelRepository.update(messageChannelIds, { + syncStage: MessageChannelSyncStage.FAILED, + syncStatus: MessageChannelSyncStatus.FAILED_INSUFFICIENT_PERMISSIONS, + }); const connectedAccountRepository = await this.twentyORMManager.getRepository( 'connectedAccount', ); - const messageChannel = await messageChannelRepository.findOne({ - where: { id: messageChannelId }, + const messageChannels = await messageChannelRepository.find({ + select: ['id', 'connectedAccountId'], + where: { id: Any(messageChannelIds) }, }); - if (!messageChannel) { - throw new MessageImportException( - `Message channel ${messageChannelId} not found in workspace ${workspaceId}`, - MessageImportExceptionCode.MESSAGE_CHANNEL_NOT_FOUND, - ); - } - - const connectedAccountId = messageChannel.connectedAccountId; + const connectedAccountIds = messageChannels.map( + (messageChannel) => messageChannel.connectedAccountId, + ); await connectedAccountRepository.update( - { id: connectedAccountId }, + { id: Any(connectedAccountIds) }, { authFailedAt: new Date(), }, ); - await this.addToAccountsToReconnect(messageChannelId, workspaceId); + await this.addToAccountsToReconnect( + messageChannels.map((messageChannel) => messageChannel.id), + workspaceId, + ); } private async addToAccountsToReconnect( - messageChannelId: string, + messageChannelIds: string[], workspaceId: string, ) { + if (!messageChannelIds.length) { + return; + } + const messageChannelRepository = await this.twentyORMManager.getRepository( 'messageChannel', ); - const messageChannel = await messageChannelRepository.findOne({ - where: { id: messageChannelId }, + const messageChannels = await messageChannelRepository.find({ + where: { id: Any(messageChannelIds) }, relations: { connectedAccount: { accountOwner: true, @@ -243,18 +260,16 @@ export class MessageChannelSyncStatusService { }, }); - if (!messageChannel) { - return; + for (const messageChannel of messageChannels) { + const userId = messageChannel.connectedAccount.accountOwner.userId; + const connectedAccountId = messageChannel.connectedAccount.id; + + await this.accountsToReconnectService.addAccountToReconnectByKey( + AccountsToReconnectKeys.ACCOUNTS_TO_RECONNECT_INSUFFICIENT_PERMISSIONS, + userId, + workspaceId, + connectedAccountId, + ); } - - const userId = messageChannel.connectedAccount.accountOwner.userId; - const connectedAccountId = messageChannel.connectedAccount.id; - - await this.accountsToReconnectService.addAccountToReconnectByKey( - AccountsToReconnectKeys.ACCOUNTS_TO_RECONNECT_INSUFFICIENT_PERMISSIONS, - userId, - workspaceId, - connectedAccountId, - ); } } diff --git a/packages/twenty-server/src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service.ts b/packages/twenty-server/src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service.ts index bb9524b41..751504050 100644 --- a/packages/twenty-server/src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-cleaner/services/messaging-message-cleaner.service.ts @@ -1,6 +1,6 @@ import { Injectable } from '@nestjs/common'; -import { EntityManager } from 'typeorm'; +import { EntityManager, IsNull } from 'typeorm'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { MessageThreadWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-thread.workspace-entity'; @@ -22,67 +22,77 @@ export class MessagingMessageCleanerService { 'message', ); - await deleteUsingPagination( - workspaceId, - 500, - async ( - limit: number, - offset: number, - workspaceId: string, - transactionManager?: EntityManager, - ) => { - const nonAssociatedMessages = await messageRepository.find( - { - where: { - messageChannelMessageAssociations: [], + const workspaceDataSource = await this.twentyORMManager.getDatasource(); + + await workspaceDataSource.transaction(async (transactionManager) => { + await deleteUsingPagination( + workspaceId, + 500, + async ( + limit: number, + offset: number, + workspaceId: string, + transactionManager: EntityManager, + ) => { + const nonAssociatedMessages = await messageRepository.find( + { + where: { + messageChannelMessageAssociations: { + id: IsNull(), + }, + }, + take: limit, + skip: offset, + relations: ['messageChannelMessageAssociations'], }, - take: limit, - skip: offset, - relations: ['messageChannelMessageAssociations'], - }, - transactionManager, - ); + transactionManager, + ); - return nonAssociatedMessages.map(({ id }) => id); - }, - async ( - ids: string[], - workspaceId: string, - transactionManager?: EntityManager, - ) => { - await messageRepository.delete(ids, transactionManager); - }, - ); + return nonAssociatedMessages.map(({ id }) => id); + }, + async ( + ids: string[], + workspaceId: string, + transactionManager?: EntityManager, + ) => { + await messageRepository.delete(ids, transactionManager); + }, + transactionManager, + ); - await deleteUsingPagination( - workspaceId, - 500, - async ( - limit: number, - offset: number, - workspaceId: string, - transactionManager?: EntityManager, - ) => { - const orphanThreads = await messageThreadRepository.find( - { - where: { - messages: [], + await deleteUsingPagination( + workspaceId, + 500, + async ( + limit: number, + offset: number, + workspaceId: string, + transactionManager?: EntityManager, + ) => { + const orphanThreads = await messageThreadRepository.find( + { + where: { + messages: { + id: IsNull(), + }, + }, + take: limit, + skip: offset, }, - take: limit, - skip: offset, - }, - transactionManager, - ); + transactionManager, + ); - return orphanThreads.map(({ id }) => id); - }, - async ( - ids: string[], - workspaceId: string, - transactionManager?: EntityManager, - ) => { - await messageThreadRepository.delete(ids, transactionManager); - }, - ); + return orphanThreads.map(({ id }) => id); + }, + async ( + ids: string[], + workspaceId: string, + transactionManager?: EntityManager, + ) => { + await messageThreadRepository.delete(ids, transactionManager); + }, + transactionManager, + ); + }); } } diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts index 20d23e71a..6f5166d26 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job.ts @@ -55,20 +55,20 @@ export class MessagingOngoingStaleJob { `Sync for message channel ${messageChannel.id} and workspace ${workspaceId} is stale. Setting sync stage to MESSAGES_IMPORT_PENDING`, ); - await this.messageChannelSyncStatusService.resetSyncStageStartedAt( + await this.messageChannelSyncStatusService.resetSyncStageStartedAt([ messageChannel.id, - ); + ]); switch (messageChannel.syncStage) { case MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING: await this.messageChannelSyncStatusService.schedulePartialMessageListFetch( - messageChannel.id, + [messageChannel.id], ); break; case MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING: - await this.messageChannelSyncStatusService.scheduleMessagesImport( + await this.messageChannelSyncStatusService.scheduleMessagesImport([ messageChannel.id, - ); + ]); break; default: break; diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/message-import-exception-handler.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/message-import-exception-handler.service.ts index 55cff5f16..6e0dd24fb 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/message-import-exception-handler.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/message-import-exception-handler.service.ts @@ -79,7 +79,7 @@ export class MessageImportExceptionHandlerService { ): Promise { if (messageChannel.throttleFailureCount >= CALENDAR_THROTTLE_MAX_ATTEMPTS) { await this.messageChannelSyncStatusService.markAsFailedUnknownAndFlushMessagesToImport( - messageChannel.id, + [messageChannel.id], workspaceId, ); @@ -92,9 +92,7 @@ export class MessageImportExceptionHandlerService { ); await messageChannelRepository.increment( - { - id: messageChannel.id, - }, + { id: messageChannel.id }, 'throttleFailureCount', 1, ); @@ -102,20 +100,20 @@ export class MessageImportExceptionHandlerService { switch (syncStep) { case MessageImportSyncStep.FULL_MESSAGE_LIST_FETCH: await this.messageChannelSyncStatusService.scheduleFullMessageListFetch( - messageChannel.id, + [messageChannel.id], ); break; case MessageImportSyncStep.PARTIAL_MESSAGE_LIST_FETCH: await this.messageChannelSyncStatusService.schedulePartialMessageListFetch( - messageChannel.id, + [messageChannel.id], ); break; case MessageImportSyncStep.MESSAGES_IMPORT: - await this.messageChannelSyncStatusService.scheduleMessagesImport( + await this.messageChannelSyncStatusService.scheduleMessagesImport([ messageChannel.id, - ); + ]); break; default: @@ -128,7 +126,7 @@ export class MessageImportExceptionHandlerService { workspaceId: string, ): Promise { await this.messageChannelSyncStatusService.markAsFailedInsufficientPermissionsAndFlushMessagesToImport( - messageChannel.id, + [messageChannel.id], workspaceId, ); } @@ -139,7 +137,7 @@ export class MessageImportExceptionHandlerService { workspaceId: string, ): Promise { await this.messageChannelSyncStatusService.markAsFailedUnknownAndFlushMessagesToImport( - messageChannel.id, + [messageChannel.id], workspaceId, ); @@ -159,7 +157,7 @@ export class MessageImportExceptionHandlerService { } await this.messageChannelSyncStatusService.resetAndScheduleFullMessageListFetch( - messageChannel.id, + [messageChannel.id], workspaceId, ); } diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts index fbb0d3919..549e90ad0 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service.ts @@ -34,7 +34,7 @@ export class MessagingFullMessageListFetchService { ) { try { await this.messageChannelSyncStatusService.markAsMessagesListFetchOngoing( - messageChannel.id, + [messageChannel.id], ); const { messageExternalIds, nextSyncCursor } = @@ -95,9 +95,9 @@ export class MessagingFullMessageListFetchService { }, ); - await this.messageChannelSyncStatusService.scheduleMessagesImport( + await this.messageChannelSyncStatusService.scheduleMessagesImport([ messageChannel.id, - ); + ]); } catch (error) { await this.messageImportErrorHandlerService.handleDriverException( error, diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts index dcb590d08..d13e80f03 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-messages-import.service.ts @@ -1,7 +1,5 @@ import { Injectable, Logger } from '@nestjs/common'; -import { FeatureFlagKey } from 'src/engine/core-modules/feature-flag/enums/feature-flag-key.enum'; -import { FeatureFlagService } from 'src/engine/core-modules/feature-flag/services/feature-flag.service'; import { CacheStorageService } from 'src/engine/integrations/cache-storage/cache-storage.service'; import { InjectCacheStorage } from 'src/engine/integrations/cache-storage/decorators/cache-storage.decorator'; import { CacheStorageNamespace } from 'src/engine/integrations/cache-storage/types/cache-storage-namespace.enum'; @@ -44,7 +42,6 @@ export class MessagingMessagesImportService { @InjectObjectMetadataRepository(BlocklistWorkspaceEntity) private readonly blocklistRepository: BlocklistRepository, private readonly emailAliasManagerService: EmailAliasManagerService, - private readonly isFeatureEnabledService: FeatureFlagService, private readonly twentyORMManager: TwentyORMManager, private readonly messagingGetMessagesService: MessagingGetMessagesService, private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService, @@ -76,9 +73,9 @@ export class MessagingMessagesImportService { `Messaging import for workspace ${workspaceId} and account ${connectedAccount.id} starting...`, ); - await this.messageChannelSyncStatusService.markAsMessagesImportOngoing( + await this.messageChannelSyncStatusService.markAsMessagesImportOngoing([ messageChannel.id, - ); + ]); try { connectedAccount.accessToken = @@ -111,17 +108,10 @@ export class MessagingMessagesImportService { } } - if ( - await this.isFeatureEnabledService.isFeatureEnabled( - FeatureFlagKey.IsMessagingAliasFetchingEnabled, - workspaceId, - ) - ) { - await this.emailAliasManagerService.refreshHandleAliases( - connectedAccount, - workspaceId, - ); - } + await this.emailAliasManagerService.refreshHandleAliases( + connectedAccount, + workspaceId, + ); messageIdsToFetch = await this.cacheStorage.setPop( `messages-to-import:${workspaceId}:gmail:${messageChannel.id}`, @@ -130,7 +120,7 @@ export class MessagingMessagesImportService { if (!messageIdsToFetch?.length) { await this.messageChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch( - messageChannel.id, + [messageChannel.id], ); return await this.trackMessageImportCompleted( @@ -151,7 +141,7 @@ export class MessagingMessagesImportService { ); const messagesToSave = filterEmails( - messageChannel.handle, + [messageChannel.handle, ...connectedAccount.handleAliases.split(',')], allMessages, blocklist.map((blocklistItem) => blocklistItem.handle), ); @@ -167,12 +157,12 @@ export class MessagingMessagesImportService { messageIdsToFetch.length < MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE ) { await this.messageChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch( - messageChannel.id, + [messageChannel.id], ); } else { - await this.messageChannelSyncStatusService.scheduleMessagesImport( + await this.messageChannelSyncStatusService.scheduleMessagesImport([ messageChannel.id, - ); + ]); } const messageChannelRepository = diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts index e1ec39c77..d7bded110 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service.ts @@ -38,7 +38,7 @@ export class MessagingPartialMessageListFetchService { ): Promise { try { await this.messageChannelSyncStatusService.markAsMessagesListFetchOngoing( - messageChannel.id, + [messageChannel.id], ); const messageChannelRepository = @@ -70,7 +70,7 @@ export class MessagingPartialMessageListFetchService { ); await this.messageChannelSyncStatusService.markAsCompletedAndSchedulePartialMessageListFetch( - messageChannel.id, + [messageChannel.id], ); return; @@ -110,9 +110,9 @@ export class MessagingPartialMessageListFetchService { ); } - await this.messageChannelSyncStatusService.scheduleMessagesImport( + await this.messageChannelSyncStatusService.scheduleMessagesImport([ messageChannel.id, - ); + ]); } catch (error) { await this.messageImportErrorHandlerService.handleDriverException( error, diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/utils/filter-emails.util.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/utils/filter-emails.util.ts index 6641ae0a6..4278fab5c 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/utils/filter-emails.util.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/utils/filter-emails.util.ts @@ -3,19 +3,19 @@ import { MessageWithParticipants } from 'src/modules/messaging/message-import-ma // Todo: refactor this into several utils export const filterEmails = ( - messageChannelHandle: string, + messageChannelHandles: string[], messages: MessageWithParticipants[], blocklist: string[], ) => { return filterOutBlocklistedMessages( - messageChannelHandle, + messageChannelHandles, filterOutIcsAttachments(messages), blocklist, ); }; const filterOutBlocklistedMessages = ( - messageChannelHandle: string, + messageChannelHandles: string[], messages: MessageWithParticipants[], blocklist: string[], ) => { @@ -27,7 +27,7 @@ const filterOutBlocklistedMessages = ( return message.participants.every( (participant) => !isEmailBlocklisted( - messageChannelHandle, + messageChannelHandles, participant.handle, blocklist, ),