feat: IMAP Driver Integration (#12576)

### Added IMAP integration 

This PR adds support for connecting email accounts via IMAP protocol,
allowing users to sync their emails without OAuth.

#### DB Changes:
- Added customConnectionParams and connectionType fields to
ConnectedAccountWorkspaceEntity

#### UI:
- Added settings pages for creating and editing IMAP connections with
proper validation and connection testing.
- Implemented reconnection flows for handling permission issues.

#### Backend:
- Built ImapConnectionModule with corresponding resolver and service for
managing IMAP connections.
- Created MessagingIMAPDriverModule to handle IMAP client operations,
message fetching/parsing, and error handling.

#### Dependencies:
Integrated `imapflow` and `mailparser` libraries with their type
definitions to handle the IMAP protocol communication.

---------

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Co-authored-by: Félix Malfait <felix.malfait@gmail.com>
Co-authored-by: Félix Malfait <felix@twenty.com>
This commit is contained in:
neo773
2025-06-30 01:02:15 +05:30
committed by GitHub
parent 3c5595e4ff
commit 7c8d362772
80 changed files with 3588 additions and 113 deletions

View File

@ -96,6 +96,7 @@ describe('ClientConfigController', () => {
isGoogleMessagingEnabled: false,
isGoogleCalendarEnabled: false,
isConfigVariablesInDbEnabled: false,
isIMAPMessagingEnabled: false,
calendarBookingPageId: undefined,
};

View File

@ -177,6 +177,9 @@ export class ClientConfig {
@Field(() => Boolean)
isConfigVariablesInDbEnabled: boolean;
@Field(() => Boolean)
isIMAPMessagingEnabled: boolean;
@Field(() => String, { nullable: true })
calendarBookingPageId?: string;
}

View File

@ -137,6 +137,9 @@ export class ClientConfigService {
isConfigVariablesInDbEnabled: this.twentyConfigService.get(
'IS_CONFIG_VARIABLES_IN_DB_ENABLED',
),
isIMAPMessagingEnabled: this.twentyConfigService.get(
'MESSAGING_PROVIDER_IMAP_ENABLED',
),
calendarBookingPageId: this.twentyConfigService.get(
'CALENDAR_BOOKING_PAGE_ID',
),

View File

@ -23,6 +23,7 @@ import { FeatureFlagModule } from 'src/engine/core-modules/feature-flag/feature-
import { FileStorageModule } from 'src/engine/core-modules/file-storage/file-storage.module';
import { FileStorageService } from 'src/engine/core-modules/file-storage/file-storage.service';
import { HealthModule } from 'src/engine/core-modules/health/health.module';
import { ImapSmtpCaldavModule } from 'src/engine/core-modules/imap-smtp-caldav-connection/imap-smtp-caldav-connection.module';
import { LabModule } from 'src/engine/core-modules/lab/lab.module';
import { LoggerModule } from 'src/engine/core-modules/logger/logger.module';
import { loggerModuleFactory } from 'src/engine/core-modules/logger/logger.module-factory';
@ -83,6 +84,7 @@ import { FileModule } from './file/file.module';
RedisClientModule,
WorkspaceQueryRunnerModule,
SubscriptionsModule,
ImapSmtpCaldavModule,
FileStorageModule.forRoot(),
LoggerModule.forRootAsync({
useFactory: loggerModuleFactory,
@ -125,6 +127,7 @@ import { FileModule } from './file/file.module';
WorkspaceModule,
WorkspaceInvitationModule,
WorkspaceSSOModule,
ImapSmtpCaldavModule,
],
})
export class CoreEngineModule {}

View File

@ -12,6 +12,15 @@ export type PublicFeatureFlag = {
};
export const PUBLIC_FEATURE_FLAGS: PublicFeatureFlag[] = [
{
key: FeatureFlagKey.IS_IMAP_ENABLED,
metadata: {
label: 'IMAP',
description:
'Easily add email accounts from any provider that supports IMAP (and soon, send emails with SMTP)',
imagePath: 'https://twenty.com/images/lab/is-imap-enabled.png',
},
},
...(process.env.CLOUDFLARE_API_KEY
? [
// {

View File

@ -5,4 +5,5 @@ export enum FeatureFlagKey {
IS_UNIQUE_INDEXES_ENABLED = 'IS_UNIQUE_INDEXES_ENABLED',
IS_JSON_FILTER_ENABLED = 'IS_JSON_FILTER_ENABLED',
IS_AI_ENABLED = 'IS_AI_ENABLED',
IS_IMAP_ENABLED = 'IS_IMAP_ENABLED',
}

View File

@ -0,0 +1,23 @@
import { Field, ObjectType } from '@nestjs/graphql';
import { ConnectedAccountProvider } from 'twenty-shared/types';
import { ImapSmtpCaldavConnectionParameters } from 'src/engine/core-modules/imap-smtp-caldav-connection/dtos/imap-smtp-caldav-connection.dto';
@ObjectType()
export class ConnectedImapSmtpCaldavAccount {
@Field(() => String)
id: string;
@Field(() => String)
handle: string;
@Field(() => String)
provider: ConnectedAccountProvider;
@Field(() => String)
accountOwnerId: string;
@Field(() => ImapSmtpCaldavConnectionParameters, { nullable: true })
connectionParameters: ImapSmtpCaldavConnectionParameters | null;
}

View File

@ -0,0 +1,7 @@
import { Field, ObjectType } from '@nestjs/graphql';
@ObjectType()
export class ImapSmtpCaldavConnectionSuccess {
@Field(() => Boolean)
success: boolean;
}

View File

@ -0,0 +1,60 @@
import { Field, InputType, ObjectType } from '@nestjs/graphql';
@InputType()
export class AccountType {
@Field(() => String)
type: 'IMAP' | 'SMTP' | 'CALDAV';
}
@InputType()
export class ConnectionParameters {
@Field(() => String)
host: string;
@Field(() => Number)
port: number;
@Field(() => String)
username: string;
/**
* Note: This field is stored in plain text in the database.
* While encrypting it could provide an extra layer of defense, we have decided not to,
* as database access implies a broader compromise. For context, see discussion in PR #12576.
*/
@Field(() => String)
password: string;
@Field(() => Boolean, { nullable: true })
secure?: boolean;
}
@ObjectType()
export class ConnectionParametersOutput {
@Field(() => String)
host: string;
@Field(() => Number)
port: number;
@Field(() => String)
username: string;
@Field(() => String)
password: string;
@Field(() => Boolean, { nullable: true })
secure?: boolean;
}
@ObjectType()
export class ImapSmtpCaldavConnectionParameters {
@Field(() => ConnectionParametersOutput, { nullable: true })
IMAP?: ConnectionParametersOutput;
@Field(() => ConnectionParametersOutput, { nullable: true })
SMTP?: ConnectionParametersOutput;
@Field(() => ConnectionParametersOutput, { nullable: true })
CALDAV?: ConnectionParametersOutput;
}

View File

@ -0,0 +1,32 @@
import { Module } from '@nestjs/common';
import { FeatureFlagModule } from 'src/engine/core-modules/feature-flag/feature-flag.module';
import { ImapSmtpCaldavValidatorModule } from 'src/engine/core-modules/imap-smtp-caldav-connection/services/imap-smtp-caldav-connection-validator.module';
import { MessageQueueModule } from 'src/engine/core-modules/message-queue/message-queue.module';
import { PermissionsModule } from 'src/engine/metadata-modules/permissions/permissions.module';
import { TwentyORMModule } from 'src/engine/twenty-orm/twenty-orm.module';
import { ConnectedAccountModule } from 'src/modules/connected-account/connected-account.module';
import { IMAPAPIsModule } from 'src/modules/connected-account/imap-api/imap-apis.module';
import { MessagingIMAPDriverModule } from 'src/modules/messaging/message-import-manager/drivers/imap/messaging-imap-driver.module';
import { MessagingImportManagerModule } from 'src/modules/messaging/message-import-manager/messaging-import-manager.module';
import { ImapSmtpCaldavResolver } from './imap-smtp-caldav-connection.resolver';
import { ImapSmtpCaldavService } from './services/imap-smtp-caldav-connection.service';
@Module({
imports: [
ConnectedAccountModule,
MessagingIMAPDriverModule,
IMAPAPIsModule,
MessagingImportManagerModule,
MessageQueueModule,
TwentyORMModule,
FeatureFlagModule,
ImapSmtpCaldavValidatorModule,
PermissionsModule,
],
providers: [ImapSmtpCaldavResolver, ImapSmtpCaldavService],
exports: [ImapSmtpCaldavService],
})
export class ImapSmtpCaldavModule {}

View File

@ -0,0 +1,139 @@
import { UseFilters, UseGuards, UsePipes } from '@nestjs/common';
import { Args, Mutation, Query, Resolver } from '@nestjs/graphql';
import { ConnectedAccountProvider } from 'twenty-shared/types';
import { AuthGraphqlApiExceptionFilter } from 'src/engine/core-modules/auth/filters/auth-graphql-api-exception.filter';
import { FeatureFlagKey } from 'src/engine/core-modules/feature-flag/enums/feature-flag-key.enum';
import { FeatureFlagService } from 'src/engine/core-modules/feature-flag/services/feature-flag.service';
import { ResolverValidationPipe } from 'src/engine/core-modules/graphql/pipes/resolver-validation.pipe';
import { UserInputError } from 'src/engine/core-modules/graphql/utils/graphql-errors.util';
import { ConnectedImapSmtpCaldavAccount } from 'src/engine/core-modules/imap-smtp-caldav-connection/dtos/imap-smtp-caldav-connected-account.dto';
import { ImapSmtpCaldavConnectionSuccess } from 'src/engine/core-modules/imap-smtp-caldav-connection/dtos/imap-smtp-caldav-connection-success.dto';
import {
AccountType,
ConnectionParameters,
} from 'src/engine/core-modules/imap-smtp-caldav-connection/dtos/imap-smtp-caldav-connection.dto';
import { ImapSmtpCaldavValidatorService } from 'src/engine/core-modules/imap-smtp-caldav-connection/services/imap-smtp-caldav-connection-validator.service';
import { ImapSmtpCaldavService } from 'src/engine/core-modules/imap-smtp-caldav-connection/services/imap-smtp-caldav-connection.service';
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
import { AuthWorkspace } from 'src/engine/decorators/auth/auth-workspace.decorator';
import { SettingsPermissionsGuard } from 'src/engine/guards/settings-permissions.guard';
import { WorkspaceAuthGuard } from 'src/engine/guards/workspace-auth.guard';
import { SettingPermissionType } from 'src/engine/metadata-modules/permissions/constants/setting-permission-type.constants';
import { PermissionsGraphqlApiExceptionFilter } from 'src/engine/metadata-modules/permissions/utils/permissions-graphql-api-exception.filter';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { ImapSmtpCalDavAPIService } from 'src/modules/connected-account/services/imap-smtp-caldav-apis.service';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
@Resolver()
@UsePipes(ResolverValidationPipe)
@UseFilters(AuthGraphqlApiExceptionFilter, PermissionsGraphqlApiExceptionFilter)
@UseGuards(SettingsPermissionsGuard(SettingPermissionType.WORKSPACE))
export class ImapSmtpCaldavResolver {
constructor(
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
private readonly ImapSmtpCaldavConnectionService: ImapSmtpCaldavService,
private readonly imapSmtpCaldavApisService: ImapSmtpCalDavAPIService,
private readonly featureFlagService: FeatureFlagService,
private readonly mailConnectionValidatorService: ImapSmtpCaldavValidatorService,
) {}
private async checkIfFeatureEnabled(
workspaceId: string,
accountType: AccountType,
): Promise<void> {
if (accountType.type === 'IMAP') {
const isImapEnabled = await this.featureFlagService.isFeatureEnabled(
FeatureFlagKey.IS_IMAP_ENABLED,
workspaceId,
);
if (!isImapEnabled) {
throw new UserInputError(
'IMAP feature is not enabled for this workspace',
);
}
}
if (accountType.type === 'SMTP') {
throw new UserInputError(
'SMTP feature is not enabled for this workspace',
);
}
if (accountType.type === 'CALDAV') {
throw new UserInputError(
'CALDAV feature is not enabled for this workspace',
);
}
}
@Query(() => ConnectedImapSmtpCaldavAccount)
@UseGuards(WorkspaceAuthGuard)
async getConnectedImapSmtpCaldavAccount(
@Args('id') id: string,
@AuthWorkspace() workspace: Workspace,
): Promise<ConnectedImapSmtpCaldavAccount> {
const connectedAccountRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<ConnectedAccountWorkspaceEntity>(
workspace.id,
'connectedAccount',
);
const connectedAccount = await connectedAccountRepository.findOne({
where: { id, provider: ConnectedAccountProvider.IMAP_SMTP_CALDAV },
});
if (!connectedAccount) {
throw new UserInputError(
`Connected mail account with ID ${id} not found`,
);
}
return {
id: connectedAccount.id,
handle: connectedAccount.handle,
provider: connectedAccount.provider,
connectionParameters: connectedAccount.connectionParameters,
accountOwnerId: connectedAccount.accountOwnerId,
};
}
@Mutation(() => ImapSmtpCaldavConnectionSuccess)
@UseGuards(WorkspaceAuthGuard)
async saveImapSmtpCaldav(
@Args('accountOwnerId') accountOwnerId: string,
@Args('handle') handle: string,
@Args('accountType') accountType: AccountType,
@Args('connectionParameters')
connectionParameters: ConnectionParameters,
@AuthWorkspace() workspace: Workspace,
@Args('id', { nullable: true }) id?: string,
): Promise<ImapSmtpCaldavConnectionSuccess> {
await this.checkIfFeatureEnabled(workspace.id, accountType);
const validatedParams =
this.mailConnectionValidatorService.validateProtocolConnectionParams(
connectionParameters,
);
await this.ImapSmtpCaldavConnectionService.testImapSmtpCaldav(
validatedParams,
accountType.type,
);
await this.imapSmtpCaldavApisService.setupConnectedAccount({
handle,
workspaceMemberId: accountOwnerId,
workspaceId: workspace.id,
connectionParams: validatedParams,
accountType: accountType.type,
connectedAccountId: id,
});
return {
success: true,
};
}
}

View File

@ -0,0 +1,9 @@
import { Module } from '@nestjs/common';
import { ImapSmtpCaldavValidatorService } from './imap-smtp-caldav-connection-validator.service';
@Module({
providers: [ImapSmtpCaldavValidatorService],
exports: [ImapSmtpCaldavValidatorService],
})
export class ImapSmtpCaldavValidatorModule {}

View File

@ -0,0 +1,41 @@
import { Injectable } from '@nestjs/common';
import { z } from 'zod';
import { UserInputError } from 'src/engine/core-modules/graphql/utils/graphql-errors.util';
import { ConnectionParameters } from 'src/engine/core-modules/imap-smtp-caldav-connection/types/imap-smtp-caldav-connection.type';
@Injectable()
export class ImapSmtpCaldavValidatorService {
private readonly protocolConnectionSchema = z.object({
host: z.string().min(1, 'Host is required'),
port: z.number().int().positive('Port must be a positive number'),
username: z.string().min(1, 'Username is required'),
password: z.string().min(1, 'Password is required'),
secure: z.boolean().optional(),
});
validateProtocolConnectionParams(
params: ConnectionParameters,
): ConnectionParameters {
if (!params) {
throw new UserInputError('Protocol connection parameters are required');
}
try {
return this.protocolConnectionSchema.parse(params);
} catch (error) {
if (error instanceof z.ZodError) {
const errorMessages = error.errors
.map((err) => `${err.path.join('.')}: ${err.message}`)
.join(', ');
throw new UserInputError(
`Protocol connection validation failed: ${errorMessages}`,
);
}
throw new UserInputError('Protocol connection validation failed');
}
}
}

View File

@ -0,0 +1,129 @@
import { Injectable, Logger } from '@nestjs/common';
import { ImapFlow } from 'imapflow';
import { ConnectedAccountProvider } from 'twenty-shared/types';
import { UserInputError } from 'src/engine/core-modules/graphql/utils/graphql-errors.util';
import {
AccountType,
ConnectionParameters,
} from 'src/engine/core-modules/imap-smtp-caldav-connection/types/imap-smtp-caldav-connection.type';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
@Injectable()
export class ImapSmtpCaldavService {
private readonly logger = new Logger(ImapSmtpCaldavService.name);
constructor(
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
) {}
async testImapConnection(params: ConnectionParameters): Promise<boolean> {
if (!params.host || !params.username || !params.password) {
throw new UserInputError('Missing required IMAP connection parameters');
}
const client = new ImapFlow({
host: params.host,
port: params.port,
secure: params.secure ?? true,
auth: {
user: params.username,
pass: params.password,
},
logger: false,
tls: {
rejectUnauthorized: false,
},
});
try {
await client.connect();
const mailboxes = await client.list();
this.logger.log(
`IMAP connection successful. Found ${mailboxes.length} mailboxes.`,
);
return true;
} catch (error) {
this.logger.error(
`IMAP connection failed: ${error.message}`,
error.stack,
);
if (error.authenticationFailed) {
throw new UserInputError(
'IMAP authentication failed. Please check your credentials.',
);
}
if (error.code === 'ECONNREFUSED') {
throw new UserInputError(
`IMAP connection refused. Please verify server and port.`,
);
}
throw new UserInputError(`IMAP connection failed: ${error.message}`);
} finally {
if (client.authenticated) {
await client.logout();
}
}
}
async testSmtpConnection(params: ConnectionParameters): Promise<boolean> {
this.logger.log('SMTP connection testing not yet implemented', params);
return true;
}
async testCaldavConnection(params: ConnectionParameters): Promise<boolean> {
this.logger.log('CALDAV connection testing not yet implemented', params);
return true;
}
async testImapSmtpCaldav(
params: ConnectionParameters,
accountType: AccountType,
): Promise<boolean> {
if (accountType === 'IMAP') {
return this.testImapConnection(params);
}
if (accountType === 'SMTP') {
return this.testSmtpConnection(params);
}
if (accountType === 'CALDAV') {
return this.testCaldavConnection(params);
}
throw new UserInputError(
'Invalid account type. Must be one of: IMAP, SMTP, CALDAV',
);
}
async getImapSmtpCaldav(
workspaceId: string,
connectionId: string,
): Promise<ConnectedAccountWorkspaceEntity | null> {
const connectedAccountRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<ConnectedAccountWorkspaceEntity>(
workspaceId,
'connectedAccount',
);
const connectedAccount = await connectedAccountRepository.findOne({
where: {
id: connectionId,
provider: ConnectedAccountProvider.IMAP_SMTP_CALDAV,
},
});
return connectedAccount;
}
}

View File

@ -0,0 +1,16 @@
export type ConnectionParameters = {
host: string;
port: number;
username: string;
password: string;
secure?: boolean;
};
export type AccountType = 'IMAP' | 'SMTP' | 'CALDAV';
export type ImapSmtpCaldavParams = {
handle: string;
IMAP?: ConnectionParameters;
SMTP?: ConnectionParameters;
CALDAV?: ConnectionParameters;
};

View File

@ -143,6 +143,13 @@ export class ConfigVariables {
})
MESSAGING_PROVIDER_GMAIL_ENABLED = false;
@ConfigVariablesMetadata({
group: ConfigVariablesGroup.Other,
description: 'Enable or disable the IMAP messaging integration',
type: ConfigVariableType.BOOLEAN,
})
MESSAGING_PROVIDER_IMAP_ENABLED = false;
@ConfigVariablesMetadata({
group: ConfigVariablesGroup.MicrosoftAuth,
description: 'Enable or disable Microsoft authentication',

View File

@ -40,6 +40,11 @@ export const seedFeatureFlags = async (
workspaceId: workspaceId,
value: true,
},
{
key: FeatureFlagKey.IS_IMAP_ENABLED,
workspaceId: workspaceId,
value: true,
},
])
.execute();
};

View File

@ -155,6 +155,7 @@ export const CONNECTED_ACCOUNT_STANDARD_FIELD_IDS = {
calendarChannels: '20202020-af4a-47bb-99ec-51911c1d3977',
handleAliases: '20202020-8a3d-46be-814f-6228af16c47b',
scopes: '20202020-8a3d-46be-814f-6228af16c47c',
connectionParameters: '20202020-a1b2-46be-814f-6228af16c481',
};
export const EVENT_STANDARD_FIELD_IDS = {

View File

@ -1,7 +1,7 @@
import { Injectable } from '@nestjs/common';
import { assertUnreachable } from 'twenty-shared/utils';
import { ConnectedAccountProvider } from 'twenty-shared/types';
import { assertUnreachable } from 'twenty-shared/utils';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { GoogleEmailAliasManagerService } from 'src/modules/connected-account/email-alias-manager/drivers/google/google-email-alias-manager.service';
@ -34,6 +34,10 @@ export class EmailAliasManagerService {
connectedAccount,
);
break;
case ConnectedAccountProvider.IMAP_SMTP_CALDAV:
// IMAP Protocol does not support email aliases
handleAliases = [];
break;
default:
assertUnreachable(
connectedAccount.provider,

View File

@ -0,0 +1,24 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { FeatureFlagModule } from 'src/engine/core-modules/feature-flag/feature-flag.module';
import { MessageQueueModule } from 'src/engine/core-modules/message-queue/message-queue.module';
import { TwentyConfigModule } from 'src/engine/core-modules/twenty-config/twenty-config.module';
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
import { TwentyORMModule } from 'src/engine/twenty-orm/twenty-orm.module';
import { WorkspaceEventEmitterModule } from 'src/engine/workspace-event-emitter/workspace-event-emitter.module';
import { ImapSmtpCalDavAPIService } from 'src/modules/connected-account/services/imap-smtp-caldav-apis.service';
@Module({
imports: [
TypeOrmModule.forFeature([ObjectMetadataEntity], 'core'),
MessageQueueModule,
WorkspaceEventEmitterModule,
TwentyConfigModule,
TwentyORMModule,
FeatureFlagModule,
],
providers: [ImapSmtpCalDavAPIService],
exports: [ImapSmtpCalDavAPIService],
})
export class IMAPAPIsModule {}

View File

@ -85,6 +85,11 @@ export class ConnectedAccountRefreshTokensService {
return await this.microsoftAPIRefreshAccessTokenService.refreshTokens(
refreshToken,
);
case ConnectedAccountProvider.IMAP_SMTP_CALDAV:
throw new ConnectedAccountRefreshAccessTokenException(
`Token refresh is not supported for IMAP provider for connected account ${connectedAccount.id} in workspace ${workspaceId}`,
ConnectedAccountRefreshAccessTokenExceptionCode.REFRESH_ACCESS_TOKEN_FAILED,
);
default:
return assertUnreachable(
connectedAccount.provider,

View File

@ -0,0 +1,248 @@
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { ConnectedAccountProvider } from 'twenty-shared/types';
import { Repository } from 'typeorm';
import { v4 } from 'uuid';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
import { FeatureFlagService } from 'src/engine/core-modules/feature-flag/services/feature-flag.service';
import {
AccountType,
ConnectionParameters,
} from 'src/engine/core-modules/imap-smtp-caldav-connection/types/imap-smtp-caldav-connection.type';
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { TwentyConfigService } from 'src/engine/core-modules/twenty-config/twenty-config.service';
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import {
MessageChannelSyncStage,
MessageChannelSyncStatus,
MessageChannelType,
MessageChannelWorkspaceEntity,
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import {
MessagingMessageListFetchJob,
MessagingMessageListFetchJobData,
} from 'src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job';
@Injectable()
export class ImapSmtpCalDavAPIService {
constructor(
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
@InjectMessageQueue(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
private readonly twentyConfigService: TwentyConfigService,
private readonly workspaceEventEmitter: WorkspaceEventEmitter,
@InjectRepository(ObjectMetadataEntity, 'core')
private readonly objectMetadataRepository: Repository<ObjectMetadataEntity>,
private readonly featureFlagService: FeatureFlagService,
) {}
async setupConnectedAccount(input: {
handle: string;
workspaceMemberId: string;
workspaceId: string;
accountType: AccountType;
connectionParams: ConnectionParameters;
connectedAccountId?: string;
}) {
const {
handle,
workspaceId,
workspaceMemberId,
connectionParams,
connectedAccountId,
} = input;
const connectedAccountRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<ConnectedAccountWorkspaceEntity>(
workspaceId,
'connectedAccount',
);
const connectedAccount = connectedAccountId
? await connectedAccountRepository.findOne({
where: { id: connectedAccountId },
})
: await connectedAccountRepository.findOne({
where: { handle, accountOwnerId: workspaceMemberId },
});
const existingAccountId = connectedAccount?.id;
const newOrExistingConnectedAccountId =
existingAccountId ?? connectedAccountId ?? v4();
const messageChannelRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<MessageChannelWorkspaceEntity>(
workspaceId,
'messageChannel',
);
const workspaceDataSource =
await this.twentyORMGlobalManager.getDataSourceForWorkspace({
workspaceId,
});
await workspaceDataSource.transaction(async () => {
if (!existingAccountId) {
const newConnectedAccount = await connectedAccountRepository.save(
{
id: newOrExistingConnectedAccountId,
handle,
provider: ConnectedAccountProvider.IMAP_SMTP_CALDAV,
connectionParameters: {
[input.accountType]: connectionParams,
},
accountOwnerId: workspaceMemberId,
},
{},
);
const connectedAccountMetadata =
await this.objectMetadataRepository.findOneOrFail({
where: { nameSingular: 'connectedAccount', workspaceId },
});
this.workspaceEventEmitter.emitDatabaseBatchEvent({
objectMetadataNameSingular: 'connectedAccount',
action: DatabaseEventAction.CREATED,
events: [
{
recordId: newConnectedAccount.id,
objectMetadata: connectedAccountMetadata,
properties: {
after: newConnectedAccount,
},
},
],
workspaceId,
});
const newMessageChannel = await messageChannelRepository.save(
{
id: v4(),
connectedAccountId: newOrExistingConnectedAccountId,
type: MessageChannelType.EMAIL,
handle,
syncStatus: MessageChannelSyncStatus.ONGOING,
},
{},
);
const messageChannelMetadata =
await this.objectMetadataRepository.findOneOrFail({
where: { nameSingular: 'messageChannel', workspaceId },
});
this.workspaceEventEmitter.emitDatabaseBatchEvent({
objectMetadataNameSingular: 'messageChannel',
action: DatabaseEventAction.CREATED,
events: [
{
recordId: newMessageChannel.id,
objectMetadata: messageChannelMetadata,
properties: {
after: newMessageChannel,
},
},
],
workspaceId,
});
} else {
const updatedConnectedAccount = await connectedAccountRepository.update(
{
id: newOrExistingConnectedAccountId,
},
{
connectionParameters: {
...connectedAccount.connectionParameters,
[input.accountType]: connectionParams,
},
},
);
const connectedAccountMetadata =
await this.objectMetadataRepository.findOneOrFail({
where: { nameSingular: 'connectedAccount', workspaceId },
});
this.workspaceEventEmitter.emitDatabaseBatchEvent({
objectMetadataNameSingular: 'connectedAccount',
action: DatabaseEventAction.UPDATED,
events: [
{
recordId: newOrExistingConnectedAccountId,
objectMetadata: connectedAccountMetadata,
properties: {
before: connectedAccount,
after: {
...connectedAccount,
...updatedConnectedAccount.raw[0],
},
},
},
],
workspaceId,
});
const messageChannels = await messageChannelRepository.find({
where: { connectedAccountId: newOrExistingConnectedAccountId },
});
const messageChannelUpdates = await messageChannelRepository.update(
{
connectedAccountId: newOrExistingConnectedAccountId,
},
{
syncStage: MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING,
syncStatus: null,
syncCursor: '',
syncStageStartedAt: null,
},
);
const messageChannelMetadata =
await this.objectMetadataRepository.findOneOrFail({
where: { nameSingular: 'messageChannel', workspaceId },
});
this.workspaceEventEmitter.emitDatabaseBatchEvent({
objectMetadataNameSingular: 'messageChannel',
action: DatabaseEventAction.UPDATED,
events: messageChannels.map((messageChannel) => ({
recordId: messageChannel.id,
objectMetadata: messageChannelMetadata,
properties: {
before: messageChannel,
after: { ...messageChannel, ...messageChannelUpdates.raw[0] },
},
})),
workspaceId,
});
}
});
if (this.twentyConfigService.get('MESSAGING_PROVIDER_IMAP_ENABLED')) {
const messageChannels = await messageChannelRepository.find({
where: {
connectedAccountId: newOrExistingConnectedAccountId,
},
});
for (const messageChannel of messageChannels) {
await this.messageQueueService.add<MessagingMessageListFetchJobData>(
MessagingMessageListFetchJob.name,
{
workspaceId,
messageChannelId: messageChannel.id,
},
);
}
}
}
}

View File

@ -8,6 +8,7 @@ import { RelationOnDeleteAction } from 'src/engine/metadata-modules/field-metada
import { RelationType } from 'src/engine/metadata-modules/field-metadata/interfaces/relation-type.interface';
import { Relation } from 'src/engine/workspace-manager/workspace-sync-metadata/interfaces/relation.interface';
import { ImapSmtpCaldavParams } from 'src/engine/core-modules/imap-smtp-caldav-connection/types/imap-smtp-caldav-connection.type';
import { BaseWorkspaceEntity } from 'src/engine/twenty-orm/base.workspace-entity';
import { WorkspaceEntity } from 'src/engine/twenty-orm/decorators/workspace-entity.decorator';
import { WorkspaceField } from 'src/engine/twenty-orm/decorators/workspace-field.decorator';
@ -107,6 +108,16 @@ export class ConnectedAccountWorkspaceEntity extends BaseWorkspaceEntity {
@WorkspaceIsNullable()
scopes: string[] | null;
@WorkspaceField({
standardId: CONNECTED_ACCOUNT_STANDARD_FIELD_IDS.connectionParameters,
type: FieldMetadataType.RAW_JSON,
label: msg`Custom Connection Parameters`,
description: msg`JSON object containing custom connection parameters`,
icon: 'IconSettings',
})
@WorkspaceIsNullable()
connectionParameters: ImapSmtpCaldavParams | null;
@WorkspaceRelation({
standardId: CONNECTED_ACCOUNT_STANDARD_FIELD_IDS.accountOwner,
type: RelationType.MANY_TO_ONE,

View File

@ -5,9 +5,9 @@ import planer from 'planer';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { computeMessageDirection } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-message-direction.util';
import { parseGmailMessage } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/parse-gmail-message.util';
import { sanitizeString } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/sanitize-string.util';
import { MessageWithParticipants } from 'src/modules/messaging/message-import-manager/types/message';
import { formatAddressObjectAsParticipants } from 'src/modules/messaging/message-import-manager/utils/format-address-object-as-participants.util';
import { sanitizeString } from 'src/modules/messaging/message-import-manager/utils/sanitize-string.util';
export const parseAndFormatGmailMessage = (
message: gmailV1.Schema$Message,

View File

@ -0,0 +1,47 @@
import { HttpModule } from '@nestjs/axios';
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { FeatureFlag } from 'src/engine/core-modules/feature-flag/feature-flag.entity';
import { FeatureFlagModule } from 'src/engine/core-modules/feature-flag/feature-flag.module';
import { ObjectMetadataRepositoryModule } from 'src/engine/object-metadata-repository/object-metadata-repository.module';
import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/workspace-datasource.module';
import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity';
import { EmailAliasManagerModule } from 'src/modules/connected-account/email-alias-manager/email-alias-manager.module';
import { MessagingCommonModule } from 'src/modules/messaging/common/messaging-common.module';
import { ImapClientProvider } from 'src/modules/messaging/message-import-manager/drivers/imap/providers/imap-client.provider';
import { ImapFetchByBatchService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-fetch-by-batch.service';
import { ImapGetMessageListService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-get-message-list.service';
import { ImapGetMessagesService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-get-messages.service';
import { ImapHandleErrorService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-handle-error.service';
import { ImapMessageLocatorService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-message-locator.service';
import { ImapMessageProcessorService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-message-processor.service';
import { MessageParticipantManagerModule } from 'src/modules/messaging/message-participant-manager/message-participant-manager.module';
@Module({
imports: [
HttpModule,
ObjectMetadataRepositoryModule.forFeature([BlocklistWorkspaceEntity]),
MessagingCommonModule,
TypeOrmModule.forFeature([FeatureFlag], 'core'),
EmailAliasManagerModule,
FeatureFlagModule,
WorkspaceDataSourceModule,
MessageParticipantManagerModule,
],
providers: [
ImapClientProvider,
ImapFetchByBatchService,
ImapGetMessagesService,
ImapGetMessageListService,
ImapHandleErrorService,
ImapMessageLocatorService,
ImapMessageProcessorService,
],
exports: [
ImapGetMessagesService,
ImapGetMessageListService,
ImapClientProvider,
],
})
export class MessagingIMAPDriverModule {}

View File

@ -0,0 +1,110 @@
import { Injectable, Logger } from '@nestjs/common';
import { ImapFlow } from 'imapflow';
import { ConnectedAccountProvider } from 'twenty-shared/types';
import { isDefined } from 'twenty-shared/utils';
import { ImapSmtpCaldavParams } from 'src/engine/core-modules/imap-smtp-caldav-connection/types/imap-smtp-caldav-connection.type';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
interface ImapClientInstance {
client: ImapFlow;
isReady: boolean;
}
@Injectable()
export class ImapClientProvider {
private readonly logger = new Logger(ImapClientProvider.name);
private readonly clientInstances = new Map<string, ImapClientInstance>();
constructor() {}
async getClient(
connectedAccount: Pick<
ConnectedAccountWorkspaceEntity,
'id' | 'provider' | 'connectionParameters' | 'handle'
>,
): Promise<ImapFlow> {
const cacheKey = `${connectedAccount.id}`;
if (this.clientInstances.has(cacheKey)) {
const instance = this.clientInstances.get(cacheKey);
if (instance?.isReady) {
return instance.client;
}
}
if (
connectedAccount.provider !== ConnectedAccountProvider.IMAP_SMTP_CALDAV ||
!isDefined(connectedAccount.connectionParameters?.IMAP)
) {
throw new Error('Connected account is not an IMAP provider');
}
const connectionParameters: ImapSmtpCaldavParams =
(connectedAccount.connectionParameters as unknown as ImapSmtpCaldavParams) ||
{};
const client = new ImapFlow({
host: connectionParameters.IMAP?.host || '',
port: connectionParameters.IMAP?.port || 993,
secure: connectionParameters.IMAP?.secure,
auth: {
user: connectedAccount.handle,
pass: connectionParameters.IMAP?.password || '',
},
logger: false,
tls: {
rejectUnauthorized: false,
},
});
try {
await client.connect();
this.logger.log(
`Connected to IMAP server for ${connectionParameters.handle}`,
);
try {
const mailboxes = await client.list();
this.logger.log(
`Available mailboxes for ${connectionParameters.handle}: ${mailboxes.map((m) => m.path).join(', ')}`,
);
} catch (error) {
this.logger.warn(`Failed to list mailboxes: ${error.message}`);
}
this.clientInstances.set(cacheKey, {
client,
isReady: true,
});
return client;
} catch (error) {
this.logger.error(
`Failed to connect to IMAP server: ${error.message}`,
error.stack,
);
throw error;
}
}
async closeClient(connectedAccountId: string): Promise<void> {
const cacheKey = `${connectedAccountId}`;
const instance = this.clientInstances.get(cacheKey);
if (instance?.isReady) {
try {
await instance.client.logout();
this.logger.log('Closed IMAP client');
} catch (error) {
this.logger.error(`Error closing IMAP client: ${error.message}`);
} finally {
this.clientInstances.delete(cacheKey);
}
}
}
}

View File

@ -0,0 +1,147 @@
import { Injectable, Logger } from '@nestjs/common';
import { ImapFlow } from 'imapflow';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { ImapClientProvider } from 'src/modules/messaging/message-import-manager/drivers/imap/providers/imap-client.provider';
import {
ImapMessageLocatorService,
MessageLocation,
} from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-message-locator.service';
import {
ImapMessageProcessorService,
MessageFetchResult,
} from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-message-processor.service';
type ConnectedAccount = Pick<
ConnectedAccountWorkspaceEntity,
'id' | 'provider' | 'handle' | 'handleAliases' | 'connectionParameters'
>;
type FetchAllResult = {
messageIdsByBatch: string[][];
batchResults: MessageFetchResult[][];
};
@Injectable()
export class ImapFetchByBatchService {
private readonly logger = new Logger(ImapFetchByBatchService.name);
private static readonly RETRY_ATTEMPTS = 2;
private static readonly RETRY_DELAY_MS = 1000;
private static readonly BATCH_LIMIT = 20;
constructor(
private readonly imapClientProvider: ImapClientProvider,
private readonly imapMessageLocatorService: ImapMessageLocatorService,
private readonly imapMessageProcessorService: ImapMessageProcessorService,
) {}
async fetchAllByBatches(
messageIds: string[],
connectedAccount: ConnectedAccount,
): Promise<FetchAllResult> {
const batchResults: MessageFetchResult[][] = [];
const messageIdsByBatch: string[][] = [];
this.logger.log(
`Starting optimized batch fetch for ${messageIds.length} messages`,
);
let client: ImapFlow | null = null;
try {
client = await this.imapClientProvider.getClient(connectedAccount);
const messageLocations =
await this.imapMessageLocatorService.locateAllMessages(
messageIds,
client,
);
const batches = this.chunkArray(
messageIds,
ImapFetchByBatchService.BATCH_LIMIT,
);
let processedCount = 0;
for (const batch of batches) {
const batchResult = await this.fetchBatchWithRetry(
batch,
messageLocations,
client,
);
batchResults.push(batchResult);
messageIdsByBatch.push(batch);
processedCount += batch.length;
this.logger.log(
`Fetched ${processedCount}/${messageIds.length} messages`,
);
}
return { messageIdsByBatch, batchResults };
} finally {
if (client) {
await this.imapClientProvider.closeClient(connectedAccount.id);
}
}
}
private async fetchBatchWithRetry(
messageIds: string[],
messageLocations: Map<string, MessageLocation>,
client: ImapFlow,
attempt = 1,
): Promise<MessageFetchResult[]> {
try {
return await this.imapMessageProcessorService.processMessagesByIds(
messageIds,
messageLocations,
client,
);
} catch (error) {
if (attempt < ImapFetchByBatchService.RETRY_ATTEMPTS) {
const delay = ImapFetchByBatchService.RETRY_DELAY_MS * attempt;
this.logger.warn(
`Batch fetch attempt ${attempt} failed, retrying in ${delay}ms: ${error.message}`,
);
await this.delay(delay);
return this.fetchBatchWithRetry(
messageIds,
messageLocations,
client,
attempt + 1,
);
}
this.logger.error(
`Batch fetch failed after ${ImapFetchByBatchService.RETRY_ATTEMPTS} attempts: ${error.message}`,
);
return this.imapMessageProcessorService.createErrorResults(
messageIds,
error as Error,
);
}
}
private chunkArray<T>(array: T[], chunkSize: number): T[][] {
const chunks: T[][] = [];
for (let i = 0; i < array.length; i += chunkSize) {
chunks.push(array.slice(i, i + chunkSize));
}
return chunks;
}
private delay(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
}

View File

@ -0,0 +1,201 @@
import { Injectable, Logger } from '@nestjs/common';
import { ImapFlow } from 'imapflow';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { ImapClientProvider } from 'src/modules/messaging/message-import-manager/drivers/imap/providers/imap-client.provider';
import { ImapHandleErrorService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-handle-error.service';
import { findSentMailbox } from 'src/modules/messaging/message-import-manager/drivers/imap/utils/find-sent-mailbox.util';
import { GetFullMessageListResponse } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service';
@Injectable()
export class ImapGetMessageListService {
private readonly logger = new Logger(ImapGetMessageListService.name);
constructor(
private readonly imapClientProvider: ImapClientProvider,
private readonly imapHandleErrorService: ImapHandleErrorService,
) {}
async getFullMessageList(
connectedAccount: Pick<
ConnectedAccountWorkspaceEntity,
'id' | 'provider' | 'connectionParameters' | 'handle'
>,
): Promise<GetFullMessageListResponse> {
try {
const client = await this.imapClientProvider.getClient(connectedAccount);
const mailboxes = ['INBOX'];
const sentFolder = await findSentMailbox(client, this.logger);
if (sentFolder) {
mailboxes.push(sentFolder);
}
let allMessages: { id: string; date: string }[] = [];
for (const mailbox of mailboxes) {
try {
const messages = await this.getMessagesFromMailbox(client, mailbox);
allMessages = [...allMessages, ...messages];
this.logger.log(
`Fetched ${messages.length} messages from ${mailbox}`,
);
} catch (error) {
this.logger.warn(
`Error fetching from mailbox ${mailbox}: ${error.message}. Continuing with other mailboxes.`,
);
}
}
allMessages.sort(
(a, b) => new Date(b.date).getTime() - new Date(a.date).getTime(),
);
const messageExternalIds = allMessages.map((message) => message.id);
const nextSyncCursor =
allMessages.length > 0 ? allMessages[allMessages.length - 1].date : '';
return {
messageExternalIds,
nextSyncCursor,
};
} catch (error) {
this.logger.error(
`Error getting message list: ${error.message}`,
error.stack,
);
this.imapHandleErrorService.handleImapMessageListFetchError(error);
return { messageExternalIds: [], nextSyncCursor: '' };
} finally {
await this.imapClientProvider.closeClient(connectedAccount.id);
}
}
async getPartialMessageList(
connectedAccount: Pick<
ConnectedAccountWorkspaceEntity,
'id' | 'provider' | 'connectionParameters' | 'handle'
>,
syncCursor?: string,
): Promise<{ messageExternalIds: string[]; nextSyncCursor: string }> {
try {
const client = await this.imapClientProvider.getClient(connectedAccount);
const mailboxes = ['INBOX'];
const sentFolder = await findSentMailbox(client, this.logger);
if (sentFolder) {
mailboxes.push(sentFolder);
}
let allMessages: { id: string; date: string }[] = [];
for (const mailbox of mailboxes) {
try {
const messages = await this.getMessagesFromMailbox(
client,
mailbox,
syncCursor,
);
allMessages = [...allMessages, ...messages];
this.logger.log(
`Fetched ${messages.length} messages from ${mailbox}`,
);
} catch (error) {
this.logger.warn(
`Error fetching from mailbox ${mailbox}: ${error.message}. Continuing with other mailboxes.`,
);
}
}
allMessages.sort(
(a, b) => new Date(b.date).getTime() - new Date(a.date).getTime(),
);
const messageExternalIds = allMessages.map((message) => message.id);
const nextSyncCursor =
allMessages.length > 0
? allMessages[allMessages.length - 1].date
: syncCursor || '';
return {
messageExternalIds,
nextSyncCursor,
};
} catch (error) {
this.logger.error(
`Error getting message list: ${error.message}`,
error.stack,
);
this.imapHandleErrorService.handleImapMessageListFetchError(error);
return { messageExternalIds: [], nextSyncCursor: syncCursor || '' };
} finally {
await this.imapClientProvider.closeClient(connectedAccount.id);
}
}
private async getMessagesFromMailbox(
client: ImapFlow,
mailbox: string,
cursor?: string,
): Promise<{ id: string; date: string }[]> {
let lock;
try {
lock = await client.getMailboxLock(mailbox);
let searchOptions = {};
if (cursor) {
searchOptions = {
since: new Date(cursor),
};
}
const messages: { id: string; date: string }[] = [];
for await (const message of client.fetch(searchOptions, {
envelope: true,
})) {
if (message.envelope?.messageId) {
const messageDate = message.envelope.date
? new Date(message.envelope.date)
: new Date();
const validDate = isNaN(messageDate.getTime())
? new Date()
: messageDate;
messages.push({
id: message.envelope.messageId,
date: validDate.toISOString(),
});
}
}
return messages;
} catch (error) {
this.logger.error(
`Error fetching from mailbox ${mailbox}: ${error.message}`,
error.stack,
);
return [];
} finally {
if (lock) {
lock.release();
}
}
}
}

View File

@ -0,0 +1,218 @@
import { Injectable, Logger } from '@nestjs/common';
import { AddressObject, ParsedMail } from 'mailparser';
// @ts-expect-error legacy noImplicitAny
import planer from 'planer';
import { isDefined } from 'twenty-shared/utils';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { computeMessageDirection } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/compute-message-direction.util';
import { ImapFetchByBatchService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-fetch-by-batch.service';
import { MessageFetchResult } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-message-processor.service';
import { EmailAddress } from 'src/modules/messaging/message-import-manager/types/email-address';
import { MessageWithParticipants } from 'src/modules/messaging/message-import-manager/types/message';
import { formatAddressObjectAsParticipants } from 'src/modules/messaging/message-import-manager/utils/format-address-object-as-participants.util';
import { sanitizeString } from 'src/modules/messaging/message-import-manager/utils/sanitize-string.util';
type AddressType = 'from' | 'to' | 'cc' | 'bcc';
@Injectable()
export class ImapGetMessagesService {
private readonly logger = new Logger(ImapGetMessagesService.name);
constructor(private readonly fetchByBatchService: ImapFetchByBatchService) {}
async getMessages(
messageIds: string[],
connectedAccount: Pick<
ConnectedAccountWorkspaceEntity,
'id' | 'provider' | 'handle' | 'handleAliases' | 'connectionParameters'
>,
): Promise<MessageWithParticipants[]> {
if (!messageIds.length) {
return [];
}
const { messageIdsByBatch, batchResults } =
await this.fetchByBatchService.fetchAllByBatches(
messageIds,
connectedAccount,
);
this.logger.log(`IMAP fetch completed`);
const messages = batchResults.flatMap((batchResult, index) => {
return this.formatBatchResultAsMessages(
messageIdsByBatch[index],
batchResult,
connectedAccount,
);
});
return messages;
}
private formatBatchResultAsMessages(
messageIds: string[],
batchResults: MessageFetchResult[],
connectedAccount: Pick<
ConnectedAccountWorkspaceEntity,
'handle' | 'handleAliases'
>,
): MessageWithParticipants[] {
const messages = batchResults.map((result) => {
if (!result.parsed) {
this.logger.debug(
`Message ${result.messageId} could not be parsed - likely not found in current mailboxes`,
);
return undefined;
}
return this.createMessageFromParsedMail(
result.parsed,
result.messageId,
connectedAccount,
);
});
return messages.filter(isDefined);
}
private createMessageFromParsedMail(
parsed: ParsedMail,
messageId: string,
connectedAccount: Pick<
ConnectedAccountWorkspaceEntity,
'handle' | 'handleAliases'
>,
): MessageWithParticipants {
const participants = this.extractAllParticipants(parsed);
const attachments = this.extractAttachments(parsed);
const threadId = this.extractThreadId(parsed);
const fromAddresses = this.extractAddresses(
parsed.from as AddressObject | undefined,
'from',
);
const fromHandle = fromAddresses.length > 0 ? fromAddresses[0].address : '';
const textWithoutReplyQuotations = parsed.text
? planer.extractFrom(parsed.text, 'text/plain')
: '';
const direction = computeMessageDirection(fromHandle, connectedAccount);
const text = sanitizeString(textWithoutReplyQuotations);
return {
externalId: messageId,
messageThreadExternalId: threadId || messageId,
headerMessageId: parsed.messageId || messageId,
subject: parsed.subject || '',
text: text,
receivedAt: parsed.date || new Date(),
direction: direction,
attachments,
participants,
};
}
private extractThreadId(parsed: ParsedMail): string | null {
const { messageId, references, inReplyTo } = parsed;
if (references && Array.isArray(references) && references.length > 0) {
const threadRoot = references[0].trim();
if (threadRoot && threadRoot.length > 0) {
return this.normalizeMessageId(threadRoot);
}
}
if (inReplyTo) {
const cleanInReplyTo =
typeof inReplyTo === 'string'
? inReplyTo.trim()
: String(inReplyTo).trim();
if (cleanInReplyTo && cleanInReplyTo.length > 0) {
return this.normalizeMessageId(cleanInReplyTo);
}
}
if (messageId) {
return this.normalizeMessageId(messageId);
}
const timestamp = Date.now();
const randomSuffix = Math.random().toString(36).substring(2, 11);
return `thread-${timestamp}-${randomSuffix}`;
}
private normalizeMessageId(messageId: string): string {
const trimmedMessageId = messageId.trim();
if (
trimmedMessageId.includes('@') &&
!trimmedMessageId.startsWith('<') &&
!trimmedMessageId.endsWith('>')
) {
return `<${trimmedMessageId}>`;
}
return trimmedMessageId;
}
private extractAllParticipants(parsed: ParsedMail) {
const fromAddresses = this.extractAddresses(
parsed.from as AddressObject | undefined,
'from',
);
const toAddresses = this.extractAddresses(
parsed.to as AddressObject | undefined,
'to',
);
const ccAddresses = this.extractAddresses(
parsed.cc as AddressObject | undefined,
'cc',
);
const bccAddresses = this.extractAddresses(
parsed.bcc as AddressObject | undefined,
'bcc',
);
return [
...formatAddressObjectAsParticipants(fromAddresses, 'from'),
...formatAddressObjectAsParticipants(toAddresses, 'to'),
...formatAddressObjectAsParticipants(ccAddresses, 'cc'),
...formatAddressObjectAsParticipants(bccAddresses, 'bcc'),
];
}
private extractAddresses(
addressObject: AddressObject | undefined,
_type: AddressType,
): EmailAddress[] {
const addresses: EmailAddress[] = [];
if (addressObject && 'value' in addressObject) {
for (const addr of addressObject.value) {
if (addr.address) {
addresses.push({
address: addr.address,
name: addr.name || '',
});
}
}
}
return addresses;
}
private extractAttachments(parsed: ParsedMail) {
return (parsed.attachments || []).map((attachment) => ({
filename: attachment.filename || 'unnamed-attachment',
}));
}
}

View File

@ -0,0 +1,107 @@
import { Injectable, Logger } from '@nestjs/common';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter';
import {
MessageChannelSyncStatus,
MessageChannelWorkspaceEntity,
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { parseImapError } from 'src/modules/messaging/message-import-manager/drivers/imap/utils/parse-imap-error.util';
import { parseImapMessageListFetchError } from 'src/modules/messaging/message-import-manager/drivers/imap/utils/parse-imap-message-list-fetch-error.util';
import { parseImapMessagesImportError } from 'src/modules/messaging/message-import-manager/drivers/imap/utils/parse-imap-messages-import-error.util';
@Injectable()
export class ImapHandleErrorService {
private readonly logger = new Logger(ImapHandleErrorService.name);
constructor(
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
private readonly workspaceEventEmitter: WorkspaceEventEmitter,
) {}
async handleError(
error: Error,
workspaceId: string,
messageChannelId: string,
): Promise<void> {
this.logger.error(
`IMAP error for message channel ${messageChannelId}: ${error.message}`,
error.stack,
);
try {
const messageChannelRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<MessageChannelWorkspaceEntity>(
workspaceId,
'messageChannel',
);
const messageChannel = await messageChannelRepository.findOneOrFail({
where: { id: messageChannelId },
});
await messageChannelRepository.update(
{ id: messageChannelId },
{
syncStatus: MessageChannelSyncStatus.FAILED_UNKNOWN,
},
);
const dataSource =
await this.twentyORMGlobalManager.getDataSourceForWorkspace({
workspaceId,
});
const messageChannelMetadata = await dataSource
.getRepository(ObjectMetadataEntity)
.findOneOrFail({
where: { nameSingular: 'messageChannel', workspaceId },
});
this.workspaceEventEmitter.emitDatabaseBatchEvent({
objectMetadataNameSingular: 'messageChannel',
action: DatabaseEventAction.UPDATED,
events: [
{
recordId: messageChannelId,
objectMetadata: messageChannelMetadata,
properties: {
before: { syncStatus: messageChannel.syncStatus },
after: { syncStatus: MessageChannelSyncStatus.FAILED_UNKNOWN },
},
},
],
workspaceId,
});
} catch (handleErrorError) {
this.logger.error(
`Error handling IMAP error: ${handleErrorError.message}`,
handleErrorError.stack,
);
}
}
public handleImapMessageListFetchError(error: Error): void {
const imapError = parseImapError(error);
if (imapError) {
throw imapError;
}
throw parseImapMessageListFetchError(error);
}
public handleImapMessagesImportError(
error: Error,
messageExternalId: string,
): void {
const imapError = parseImapError(error);
if (imapError) {
throw imapError;
}
throw parseImapMessagesImportError(error, messageExternalId);
}
}

View File

@ -0,0 +1,112 @@
import { Injectable, Logger } from '@nestjs/common';
import { ImapFlow } from 'imapflow';
import { findSentMailbox } from 'src/modules/messaging/message-import-manager/drivers/imap/utils/find-sent-mailbox.util';
export type MessageLocation = {
messageId: string;
sequence: number;
mailbox: string;
};
@Injectable()
export class ImapMessageLocatorService {
private readonly logger = new Logger(ImapMessageLocatorService.name);
private static readonly IMAP_SEARCH_BATCH_SIZE = 50;
async locateAllMessages(
messageIds: string[],
client: ImapFlow,
): Promise<Map<string, MessageLocation>> {
const locations = new Map<string, MessageLocation>();
const mailboxes = await this.getMailboxesToSearch(client);
for (const mailbox of mailboxes) {
try {
const lock = await client.getMailboxLock(mailbox);
try {
const searchBatches = this.chunkArray(
messageIds.filter((id) => !locations.has(id)),
ImapMessageLocatorService.IMAP_SEARCH_BATCH_SIZE,
);
for (const batch of searchBatches) {
await this.locateMessagesInMailbox(
batch,
mailbox,
client,
locations,
);
}
} finally {
lock.release();
}
} catch (error) {
this.logger.warn(
`Error searching mailbox ${mailbox}: ${error.message}`,
);
}
}
return locations;
}
private async locateMessagesInMailbox(
messageIds: string[],
mailbox: string,
client: ImapFlow,
locations: Map<string, MessageLocation>,
): Promise<void> {
try {
const orConditions = messageIds.map((id) => ({
header: { 'message-id': id },
}));
const searchResults = await client.search({ or: orConditions });
if (searchResults.length === 0) return;
const fetchResults = client.fetch(
searchResults.map((r) => r.toString()).join(','),
{ envelope: true },
);
for await (const message of fetchResults) {
const messageId = message.envelope?.messageId;
if (messageId && messageIds.includes(messageId)) {
locations.set(messageId, {
messageId,
sequence: message.seq,
mailbox,
});
}
}
} catch (error) {
this.logger.debug(`Batch search failed in ${mailbox}: ${error.message}`);
}
}
private async getMailboxesToSearch(client: ImapFlow): Promise<string[]> {
const mailboxes = ['INBOX'];
const sentFolder = await findSentMailbox(client, this.logger);
if (sentFolder) {
mailboxes.push(sentFolder);
}
return mailboxes;
}
private chunkArray<T>(array: T[], chunkSize: number): T[][] {
const chunks: T[][] = [];
for (let i = 0; i < array.length; i += chunkSize) {
chunks.push(array.slice(i, i + chunkSize));
}
return chunks;
}
}

View File

@ -0,0 +1,238 @@
import { Injectable, Logger } from '@nestjs/common';
import { FetchMessageObject, ImapFlow } from 'imapflow';
import { ParsedMail, simpleParser } from 'mailparser';
import { ImapHandleErrorService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-handle-error.service';
import { MessageLocation } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-message-locator.service';
export type MessageFetchResult = {
messageId: string;
parsed: ParsedMail | null;
processingTimeMs?: number;
};
@Injectable()
export class ImapMessageProcessorService {
private readonly logger = new Logger(ImapMessageProcessorService.name);
constructor(
private readonly imapHandleErrorService: ImapHandleErrorService,
) {}
async processMessagesByIds(
messageIds: string[],
messageLocations: Map<string, MessageLocation>,
client: ImapFlow,
): Promise<MessageFetchResult[]> {
if (!messageIds.length) {
return [];
}
const results: MessageFetchResult[] = [];
const messagesByMailbox = new Map<string, MessageLocation[]>();
const notFoundIds: string[] = [];
for (const messageId of messageIds) {
const location = messageLocations.get(messageId);
if (location) {
const locations = messagesByMailbox.get(location.mailbox) || [];
locations.push(location);
messagesByMailbox.set(location.mailbox, locations);
} else {
notFoundIds.push(messageId);
}
}
const fetchPromises = Array.from(messagesByMailbox.entries()).map(
([mailbox, locations]) =>
this.fetchMessagesFromMailbox(locations, client, mailbox),
);
const mailboxResults = await Promise.allSettled(fetchPromises);
for (const result of mailboxResults) {
if (result.status === 'fulfilled') {
results.push(...result.value);
} else {
this.logger.error(`Mailbox batch fetch failed: ${result.reason}`);
}
}
for (const messageId of notFoundIds) {
results.push({
messageId,
parsed: null,
processingTimeMs: 0,
});
}
return results;
}
private async fetchMessagesFromMailbox(
messageLocations: MessageLocation[],
client: ImapFlow,
mailbox: string,
): Promise<MessageFetchResult[]> {
if (!messageLocations.length) return [];
try {
const lock = await client.getMailboxLock(mailbox);
try {
return await this.fetchMessagesWithSequences(messageLocations, client);
} finally {
lock.release();
}
} catch (error) {
this.logger.error(
`Failed to fetch messages from mailbox ${mailbox}: ${error.message}`,
);
return messageLocations.map((location) =>
this.createErrorResult(location.messageId, error as Error, Date.now()),
);
}
}
private async fetchMessagesWithSequences(
messageLocations: MessageLocation[],
client: ImapFlow,
): Promise<MessageFetchResult[]> {
const startTime = Date.now();
const results: MessageFetchResult[] = [];
try {
const sequences = messageLocations.map((loc) => loc.sequence.toString());
const sequenceSet = sequences.join(',');
const fetchResults = client.fetch(sequenceSet, {
source: true,
envelope: true,
});
const messagesData = new Map<number, FetchMessageObject>();
for await (const message of fetchResults) {
messagesData.set(message.seq, message);
}
for (const location of messageLocations) {
const messageData = messagesData.get(location.sequence);
if (messageData) {
const result = await this.processMessageData(
location.messageId,
messageData,
startTime,
);
results.push(result);
} else {
results.push({
messageId: location.messageId,
parsed: null,
processingTimeMs: Date.now() - startTime,
});
}
}
} catch (error) {
this.logger.error(`Batch fetch failed: ${error.message}`);
return messageLocations.map((location) =>
this.createErrorResult(location.messageId, error as Error, startTime),
);
}
return results;
}
private async processMessageData(
messageId: string,
messageData: FetchMessageObject,
startTime: number,
): Promise<MessageFetchResult> {
try {
const rawContent = messageData.source?.toString() || '';
if (!rawContent) {
this.logger.debug(`No source content for message ${messageId}`);
return {
messageId,
parsed: null,
processingTimeMs: Date.now() - startTime,
};
}
const parsed = await this.parseMessage(rawContent, messageId);
const processingTime = Date.now() - startTime;
this.logger.debug(
`Processed message ${messageId} in ${processingTime}ms`,
);
return {
messageId,
parsed,
processingTimeMs: processingTime,
};
} catch (error) {
return this.createErrorResult(messageId, error as Error, startTime);
}
}
private async parseMessage(
rawContent: string,
messageId: string,
): Promise<ParsedMail> {
try {
return await simpleParser(rawContent);
} catch (error) {
this.logger.error(
`Failed to parse message ${messageId}: ${error.message}`,
);
throw error;
}
}
createErrorResult(
messageId: string,
error: Error,
startTime: number,
): MessageFetchResult {
const processingTime = Date.now() - startTime;
this.logger.error(`Failed to fetch message ${messageId}: ${error.message}`);
this.imapHandleErrorService.handleImapMessagesImportError(error, messageId);
return {
messageId,
parsed: null,
processingTimeMs: processingTime,
};
}
createErrorResults(messageIds: string[], error: Error): MessageFetchResult[] {
return messageIds.map((messageId) => {
this.logger.error(
`Failed to fetch message ${messageId}: ${error.message}`,
);
this.imapHandleErrorService.handleImapMessagesImportError(
error,
messageId,
);
return {
messageId,
parsed: null,
};
});
}
}

View File

@ -0,0 +1,11 @@
export interface ImapFlowError extends Error {
code?: string;
serverResponseCode?: string;
responseText?: string;
responseStatus?: string;
executedCommand?: string;
authenticationFailed?: boolean;
response?: string;
syscall?: string;
errno?: number;
}

View File

@ -0,0 +1,59 @@
import { Logger } from '@nestjs/common';
import { ImapFlow } from 'imapflow';
/**
* Find sent folder using IMAP special-use flags
*
* This function uses IMAP special-use extension (RFC 6154) to identify
* the sent folder by looking for the \Sent flag rather than relying on
* folder names which can vary across providers and locales.
*
* Falls back to regex-based detection if special-use flags are not available.
* The regex pattern is inspired by imapsync's comprehensive folder mapping.
*/
export async function findSentMailbox(
client: ImapFlow,
logger: Logger,
): Promise<string | null> {
try {
const list = await client.list();
logger.debug(
`Available folders: ${list.map((item) => item.path).join(', ')}`,
);
for (const folder of list) {
if (folder.specialUse && folder.specialUse.includes('\\Sent')) {
logger.log(`Found sent folder via special-use flag: ${folder.path}`);
return folder.path;
}
}
// Fallback: comprehensive regex pattern for legacy IMAP servers
// Source: https://imapsync.lamiral.info/FAQ.d/FAQ.Folders_Mapping.txt
// Based on imapsync's regextrans2 examples (originally "Sent|Sent Messages|Gesendet")
// Extended with additional common localizations for broader provider/language support
const sentFolderPattern =
/^(.*\/)?(sent|sent[\s_-]?(items|mail|messages|elements)?|envoy[éê]s?|[ée]l[ée]ments[\s_-]?envoy[éê]s|gesendet|gesendete[\s_-]?elemente|enviados?|elementos[\s_-]?enviados|itens[\s_-]?enviados|posta[\s_-]?inviata|inviati|보낸편지함|\[gmail\]\/sent[\s_-]?mail)$/i;
const availableFolders = list.map((item) => item.path);
for (const folder of availableFolders) {
if (sentFolderPattern.test(folder)) {
logger.log(`Found sent folder via pattern match: ${folder}`);
return folder;
}
}
logger.warn('No sent folder found. Only inbox messages will be imported.');
return null;
} catch (error) {
logger.warn(`Error listing mailboxes: ${error.message}`);
return null;
}
}

View File

@ -0,0 +1,5 @@
import { ImapFlowError } from 'src/modules/messaging/message-import-manager/drivers/imap/types/imap-error.type';
export const isImapFlowError = (error: Error): error is ImapFlowError => {
return error !== undefined && error !== null;
};

View File

@ -0,0 +1,63 @@
import {
MessageImportDriverException,
MessageImportDriverExceptionCode,
} from 'src/modules/messaging/message-import-manager/drivers/exceptions/message-import-driver.exception';
import { isImapFlowError } from 'src/modules/messaging/message-import-manager/drivers/imap/utils/is-imap-flow-error.util';
export const parseImapError = (
error: Error,
): MessageImportDriverException | null => {
if (!error) {
return null;
}
if (!isImapFlowError(error)) {
return null;
}
if (error.code === 'ECONNREFUSED' || error.message === 'Failed to connect') {
return new MessageImportDriverException(
`IMAP connection error: ${error.message}`,
MessageImportDriverExceptionCode.UNKNOWN_NETWORK_ERROR,
);
}
if (error.serverResponseCode) {
if (error.serverResponseCode === 'AUTHENTICATIONFAILED') {
return new MessageImportDriverException(
`IMAP authentication error: ${error.responseText || error.message}`,
MessageImportDriverExceptionCode.INSUFFICIENT_PERMISSIONS,
);
}
if (error.serverResponseCode === 'NONEXISTENT') {
return new MessageImportDriverException(
`IMAP mailbox not found: ${error.responseText || error.message}`,
MessageImportDriverExceptionCode.NOT_FOUND,
);
}
}
if (error.authenticationFailed === true) {
return new MessageImportDriverException(
`IMAP authentication error: ${error.responseText || error.message}`,
MessageImportDriverExceptionCode.INSUFFICIENT_PERMISSIONS,
);
}
if (error.message === 'Command failed' && error.responseText) {
if (error.responseText.includes('Resource temporarily unavailable')) {
return new MessageImportDriverException(
`IMAP temporary error: ${error.responseText}`,
MessageImportDriverExceptionCode.TEMPORARY_ERROR,
);
}
return new MessageImportDriverException(
`IMAP command failed: ${error.responseText}`,
MessageImportDriverExceptionCode.UNKNOWN,
);
}
return null;
};

View File

@ -0,0 +1,63 @@
import {
MessageImportDriverException,
MessageImportDriverExceptionCode,
} from 'src/modules/messaging/message-import-manager/drivers/exceptions/message-import-driver.exception';
import { isImapFlowError } from 'src/modules/messaging/message-import-manager/drivers/imap/utils/is-imap-flow-error.util';
export const parseImapMessageListFetchError = (
error: Error,
): MessageImportDriverException => {
if (!error) {
return new MessageImportDriverException(
'Unknown IMAP message list fetch error: No error provided',
MessageImportDriverExceptionCode.UNKNOWN,
);
}
const errorMessage = error.message || '';
if (!isImapFlowError(error)) {
return new MessageImportDriverException(
`Unknown IMAP message list fetch error: ${errorMessage}`,
MessageImportDriverExceptionCode.UNKNOWN,
);
}
if (error.responseText) {
if (
error.responseText.includes('Invalid search') ||
error.responseText.includes('invalid sequence set')
) {
return new MessageImportDriverException(
`IMAP sync cursor error: ${error.responseText}`,
MessageImportDriverExceptionCode.SYNC_CURSOR_ERROR,
);
}
if (error.responseText.includes('No matching messages')) {
return new MessageImportDriverException(
'No messages found for next sync cursor',
MessageImportDriverExceptionCode.NO_NEXT_SYNC_CURSOR,
);
}
}
if (errorMessage.includes('Invalid sequence set')) {
return new MessageImportDriverException(
`IMAP sync cursor error: ${errorMessage}`,
MessageImportDriverExceptionCode.SYNC_CURSOR_ERROR,
);
}
if (errorMessage.includes('No messages found')) {
return new MessageImportDriverException(
'No messages found for next sync cursor',
MessageImportDriverExceptionCode.NO_NEXT_SYNC_CURSOR,
);
}
return new MessageImportDriverException(
`Unknown IMAP message list fetch error: ${errorMessage}`,
MessageImportDriverExceptionCode.UNKNOWN,
);
};

View File

@ -0,0 +1,71 @@
import {
MessageImportDriverException,
MessageImportDriverExceptionCode,
} from 'src/modules/messaging/message-import-manager/drivers/exceptions/message-import-driver.exception';
import { isImapFlowError } from 'src/modules/messaging/message-import-manager/drivers/imap/utils/is-imap-flow-error.util';
export const parseImapMessagesImportError = (
error: Error,
messageExternalId: string,
): MessageImportDriverException => {
if (!error) {
return new MessageImportDriverException(
`Unknown IMAP message import error for message ${messageExternalId}: No error provided`,
MessageImportDriverExceptionCode.UNKNOWN,
);
}
const errorMessage = error.message || '';
if (!isImapFlowError(error)) {
return new MessageImportDriverException(
`Unknown IMAP message import error for message ${messageExternalId}: ${errorMessage}`,
MessageImportDriverExceptionCode.UNKNOWN,
);
}
if (error.responseText) {
if (error.responseText.includes('No such message')) {
return new MessageImportDriverException(
`IMAP message not found: ${messageExternalId}`,
MessageImportDriverExceptionCode.NOT_FOUND,
);
}
if (error.responseText.includes('expunged')) {
return new MessageImportDriverException(
`IMAP message no longer exists (expunged): ${messageExternalId}`,
MessageImportDriverExceptionCode.NOT_FOUND,
);
}
if (error.responseText.includes('message size exceeds')) {
return new MessageImportDriverException(
`IMAP message fetch error for message ${messageExternalId}: ${error.responseText}`,
MessageImportDriverExceptionCode.TEMPORARY_ERROR,
);
}
}
if (
errorMessage.includes('Message not found') ||
errorMessage.includes('Invalid sequence set')
) {
return new MessageImportDriverException(
`IMAP message not found: ${messageExternalId}`,
MessageImportDriverExceptionCode.NOT_FOUND,
);
}
if (errorMessage.includes('Failed to fetch message')) {
return new MessageImportDriverException(
`IMAP message fetch error for message ${messageExternalId}: ${errorMessage}`,
MessageImportDriverExceptionCode.TEMPORARY_ERROR,
);
}
return new MessageImportDriverException(
`Unknown IMAP message import error for message ${messageExternalId}: ${errorMessage}`,
MessageImportDriverExceptionCode.UNKNOWN,
);
};

View File

@ -1,20 +1,17 @@
//
import { Scope } from '@nestjs/common';
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { ConnectedAccountRefreshAccessTokenExceptionCode } from 'src/modules/connected-account/refresh-tokens-manager/exceptions/connected-account-refresh-tokens.exception';
import { ConnectedAccountRefreshTokensService } from 'src/modules/connected-account/refresh-tokens-manager/services/connected-account-refresh-tokens.service';
import { isThrottled } from 'src/modules/connected-account/utils/is-throttled';
import {
MessageChannelSyncStage,
MessageChannelWorkspaceEntity,
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import {
MessageImportDriverException,
MessageImportDriverExceptionCode,
} from 'src/modules/messaging/message-import-manager/drivers/exceptions/message-import-driver.exception';
import { MessagingAccountAuthenticationService } from 'src/modules/messaging/message-import-manager/services/messaging-account-authentication.service';
import { MessagingFullMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service';
import {
MessageImportExceptionHandlerService,
@ -40,6 +37,7 @@ export class MessagingMessageListFetchJob {
private readonly twentyORMManager: TwentyORMManager,
private readonly connectedAccountRefreshTokensService: ConnectedAccountRefreshTokensService,
private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService,
private readonly messagingAccountAuthenticationService: MessagingAccountAuthenticationService,
) {}
@Process(MessagingMessageListFetchJob.name)
@ -84,41 +82,10 @@ export class MessagingMessageListFetchJob {
return;
}
try {
messageChannel.connectedAccount.accessToken =
await this.connectedAccountRefreshTokensService.refreshAndSaveTokens(
messageChannel.connectedAccount,
workspaceId,
);
} catch (error) {
switch (error.code) {
case ConnectedAccountRefreshAccessTokenExceptionCode.TEMPORARY_NETWORK_ERROR:
throw new MessageImportDriverException(
error.message,
MessageImportDriverExceptionCode.TEMPORARY_ERROR,
);
case ConnectedAccountRefreshAccessTokenExceptionCode.REFRESH_ACCESS_TOKEN_FAILED:
case ConnectedAccountRefreshAccessTokenExceptionCode.REFRESH_TOKEN_NOT_FOUND:
await this.messagingMonitoringService.track({
eventName: `refresh_token.error.insufficient_permissions`,
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
messageChannelId: messageChannel.id,
message: `${error.code}: ${error.reason ?? ''}`,
});
throw new MessageImportDriverException(
error.message,
MessageImportDriverExceptionCode.INSUFFICIENT_PERMISSIONS,
);
case ConnectedAccountRefreshAccessTokenExceptionCode.PROVIDER_NOT_SUPPORTED:
throw new MessageImportDriverException(
error.message,
MessageImportDriverExceptionCode.PROVIDER_NOT_SUPPORTED,
);
default:
throw error;
}
}
await this.messagingAccountAuthenticationService.validateAndPrepareAuthentication(
messageChannel,
workspaceId,
);
switch (messageChannel.syncStage) {
case MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING:

View File

@ -20,6 +20,7 @@ import { MessagingMessageListFetchCronJob } from 'src/modules/messaging/message-
import { MessagingMessagesImportCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-messages-import.cron.job';
import { MessagingOngoingStaleCronJob } from 'src/modules/messaging/message-import-manager/crons/jobs/messaging-ongoing-stale.cron.job';
import { MessagingGmailDriverModule } from 'src/modules/messaging/message-import-manager/drivers/gmail/messaging-gmail-driver.module';
import { MessagingIMAPDriverModule } from 'src/modules/messaging/message-import-manager/drivers/imap/messaging-imap-driver.module';
import { MessagingMicrosoftDriverModule } from 'src/modules/messaging/message-import-manager/drivers/microsoft/messaging-microsoft-driver.module';
import { MessagingAddSingleMessageToCacheForImportJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-add-single-message-to-cache-for-import.job';
import { MessagingCleanCacheJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-clean-cache';
@ -27,6 +28,7 @@ import { MessagingMessageListFetchJob } from 'src/modules/messaging/message-impo
import { MessagingMessagesImportJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job';
import { MessagingOngoingStaleJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-ongoing-stale.job';
import { MessagingMessageImportManagerMessageChannelListener } from 'src/modules/messaging/message-import-manager/listeners/messaging-import-manager-message-channel.listener';
import { MessagingAccountAuthenticationService } from 'src/modules/messaging/message-import-manager/services/messaging-account-authentication.service';
import { MessagingCursorService } from 'src/modules/messaging/message-import-manager/services/messaging-cursor.service';
import { MessagingFullMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service';
import { MessagingGetMessageListService } from 'src/modules/messaging/message-import-manager/services/messaging-get-message-list.service';
@ -45,6 +47,7 @@ import { MessagingMonitoringModule } from 'src/modules/messaging/monitoring/mess
WorkspaceDataSourceModule,
MessagingGmailDriverModule,
MessagingMicrosoftDriverModule,
MessagingIMAPDriverModule,
MessagingCommonModule,
TypeOrmModule.forFeature(
[Workspace, DataSourceEntity, ObjectMetadataEntity],
@ -82,6 +85,7 @@ import { MessagingMonitoringModule } from 'src/modules/messaging/monitoring/mess
MessageImportExceptionHandlerService,
MessagingCursorService,
MessagingSendMessageService,
MessagingAccountAuthenticationService,
],
exports: [
MessagingSendMessageService,

View File

@ -0,0 +1,155 @@
import { Injectable } from '@nestjs/common';
import { isDefined } from 'class-validator';
import { ConnectedAccountProvider } from 'twenty-shared/types';
import { ConnectedAccountRefreshAccessTokenExceptionCode } from 'src/modules/connected-account/refresh-tokens-manager/exceptions/connected-account-refresh-tokens.exception';
import { ConnectedAccountRefreshTokensService } from 'src/modules/connected-account/refresh-tokens-manager/services/connected-account-refresh-tokens.service';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import {
MessageImportDriverException,
MessageImportDriverExceptionCode,
} from 'src/modules/messaging/message-import-manager/drivers/exceptions/message-import-driver.exception';
import { MessagingMonitoringService } from 'src/modules/messaging/monitoring/services/messaging-monitoring.service';
@Injectable()
export class MessagingAccountAuthenticationService {
constructor(
private readonly connectedAccountRefreshTokensService: ConnectedAccountRefreshTokensService,
private readonly messagingMonitoringService: MessagingMonitoringService,
) {}
async validateAndPrepareAuthentication(
messageChannel: MessageChannelWorkspaceEntity,
workspaceId: string,
): Promise<void> {
if (
messageChannel.connectedAccount.provider ===
ConnectedAccountProvider.IMAP_SMTP_CALDAV
) {
await this.validateImapCredentials(messageChannel, workspaceId);
return;
}
await this.refreshAccessTokenForNonImapProvider(
messageChannel.connectedAccount,
workspaceId,
messageChannel.id,
messageChannel.connectedAccountId,
);
}
async validateConnectedAccountAuthentication(
connectedAccount: ConnectedAccountWorkspaceEntity,
workspaceId: string,
messageChannelId: string,
): Promise<void> {
if (
connectedAccount.provider === ConnectedAccountProvider.IMAP_SMTP_CALDAV &&
isDefined(connectedAccount.connectionParameters?.IMAP)
) {
await this.validateImapCredentialsForConnectedAccount(
connectedAccount,
workspaceId,
messageChannelId,
);
return;
}
await this.refreshAccessTokenForNonImapProvider(
connectedAccount,
workspaceId,
messageChannelId,
connectedAccount.id,
);
}
private async validateImapCredentialsForConnectedAccount(
connectedAccount: ConnectedAccountWorkspaceEntity,
workspaceId: string,
messageChannelId: string,
): Promise<void> {
if (!connectedAccount.connectionParameters) {
await this.messagingMonitoringService.track({
eventName: 'messages_import.error.missing_imap_credentials',
workspaceId,
connectedAccountId: connectedAccount.id,
messageChannelId,
});
throw {
code: MessageImportDriverExceptionCode.INSUFFICIENT_PERMISSIONS,
message: 'Missing IMAP credentials in connectionParameters',
};
}
}
private async validateImapCredentials(
messageChannel: MessageChannelWorkspaceEntity,
workspaceId: string,
): Promise<void> {
if (
!isDefined(messageChannel.connectedAccount.connectionParameters?.IMAP)
) {
await this.messagingMonitoringService.track({
eventName: 'message_list_fetch_job.error.missing_imap_credentials',
workspaceId,
connectedAccountId: messageChannel.connectedAccount.id,
messageChannelId: messageChannel.id,
});
throw {
code: MessageImportDriverExceptionCode.INSUFFICIENT_PERMISSIONS,
message: 'Missing IMAP credentials in connectionParameters',
};
}
}
private async refreshAccessTokenForNonImapProvider(
connectedAccount: ConnectedAccountWorkspaceEntity,
workspaceId: string,
messageChannelId: string,
connectedAccountId: string,
): Promise<string> {
try {
const accessToken =
await this.connectedAccountRefreshTokensService.refreshAndSaveTokens(
connectedAccount,
workspaceId,
);
return accessToken;
} catch (error) {
switch (error.code) {
case ConnectedAccountRefreshAccessTokenExceptionCode.TEMPORARY_NETWORK_ERROR:
throw new MessageImportDriverException(
error.message,
MessageImportDriverExceptionCode.TEMPORARY_ERROR,
);
case ConnectedAccountRefreshAccessTokenExceptionCode.REFRESH_ACCESS_TOKEN_FAILED:
case ConnectedAccountRefreshAccessTokenExceptionCode.REFRESH_TOKEN_NOT_FOUND:
await this.messagingMonitoringService.track({
eventName: `refresh_token.error.insufficient_permissions`,
workspaceId,
connectedAccountId,
messageChannelId,
message: `${error.code}: ${error.reason ?? ''}`,
});
throw new MessageImportDriverException(
error.message,
MessageImportDriverExceptionCode.INSUFFICIENT_PERMISSIONS,
);
case ConnectedAccountRefreshAccessTokenExceptionCode.PROVIDER_NOT_SUPPORTED:
throw new MessageImportDriverException(
error.message,
MessageImportDriverExceptionCode.PROVIDER_NOT_SUPPORTED,
);
default:
throw error;
}
}
}
}

View File

@ -6,6 +6,7 @@ import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MessageFolderWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-folder.workspace-entity';
import { GmailGetMessageListService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-message-list.service';
import { ImapGetMessageListService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-get-message-list.service';
import { MicrosoftGetMessageListService } from 'src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-message-list.service';
import {
MessageImportException,
@ -40,6 +41,7 @@ export class MessagingGetMessageListService {
constructor(
private readonly gmailGetMessageListService: GmailGetMessageListService,
private readonly microsoftGetMessageListService: MicrosoftGetMessageListService,
private readonly imapGetMessageListService: ImapGetMessageListService,
private readonly messagingCursorService: MessagingCursorService,
private readonly twentyORMManager: TwentyORMManager,
) {}
@ -78,6 +80,19 @@ export class MessagingGetMessageListService {
folders,
);
}
case ConnectedAccountProvider.IMAP_SMTP_CALDAV: {
const fullMessageList =
await this.imapGetMessageListService.getFullMessageList(
messageChannel.connectedAccount,
);
return [
{
...fullMessageList,
folderId: undefined,
},
];
}
default:
throw new MessageImportException(
`Provider ${messageChannel.connectedAccount.provider} is not supported`,
@ -105,6 +120,23 @@ export class MessagingGetMessageListService {
messageChannel.connectedAccount,
messageChannel,
);
case ConnectedAccountProvider.IMAP_SMTP_CALDAV: {
const messageList =
await this.imapGetMessageListService.getPartialMessageList(
messageChannel.connectedAccount,
messageChannel.syncCursor,
);
return [
{
messageExternalIds: messageList.messageExternalIds,
messageExternalIdsToDelete: [],
previousSyncCursor: messageChannel.syncCursor || '',
nextSyncCursor: messageList.nextSyncCursor || '',
folderId: undefined,
},
];
}
default:
throw new MessageImportException(
`Provider ${messageChannel.connectedAccount.provider} is not supported`,

View File

@ -4,6 +4,7 @@ import { ConnectedAccountProvider } from 'twenty-shared/types';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { GmailGetMessagesService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/gmail-get-messages.service';
import { ImapGetMessagesService } from 'src/modules/messaging/message-import-manager/drivers/imap/services/imap-get-messages.service';
import { MicrosoftGetMessagesService } from 'src/modules/messaging/message-import-manager/drivers/microsoft/services/microsoft-get-messages.service';
import {
MessageImportException,
@ -18,6 +19,7 @@ export class MessagingGetMessagesService {
constructor(
private readonly gmailGetMessagesService: GmailGetMessagesService,
private readonly microsoftGetMessagesService: MicrosoftGetMessagesService,
private readonly imapGetMessagesService: ImapGetMessagesService,
) {}
public async getMessages(
@ -30,6 +32,8 @@ export class MessagingGetMessagesService {
| 'id'
| 'handle'
| 'handleAliases'
| 'accountOwnerId'
| 'connectionParameters'
>,
): Promise<GetMessagesResponse> {
switch (connectedAccount.provider) {
@ -43,6 +47,11 @@ export class MessagingGetMessagesService {
messageIds,
connectedAccount,
);
case ConnectedAccountProvider.IMAP_SMTP_CALDAV:
return this.imapGetMessagesService.getMessages(
messageIds,
connectedAccount,
);
default:
throw new MessageImportException(
`Provider ${connectedAccount.provider} is not supported`,

View File

@ -16,11 +16,13 @@ import {
MessageChannelWorkspaceEntity,
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE } from 'src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-users-messages-get-batch-size.constant';
import { MessagingAccountAuthenticationService } from 'src/modules/messaging/message-import-manager/services/messaging-account-authentication.service';
import { MessagingGetMessagesService } from 'src/modules/messaging/message-import-manager/services/messaging-get-messages.service';
import { MessageImportExceptionHandlerService } from 'src/modules/messaging/message-import-manager/services/messaging-import-exception-handler.service';
import { MessagingMessagesImportService } from 'src/modules/messaging/message-import-manager/services/messaging-messages-import.service';
import { MessagingSaveMessagesAndEnqueueContactCreationService } from 'src/modules/messaging/message-import-manager/services/messaging-save-messages-and-enqueue-contact-creation.service';
import { MessagingMonitoringService } from 'src/modules/messaging/monitoring/services/messaging-monitoring.service';
describe('MessagingMessagesImportService', () => {
let service: MessagingMessagesImportService;
let messageChannelSyncStatusService: MessageChannelSyncStatusService;
@ -139,6 +141,10 @@ describe('MessagingMessagesImportService', () => {
handleDriverException: jest.fn().mockResolvedValue(undefined),
},
},
{
provide: MessagingAccountAuthenticationService,
useClass: MessagingAccountAuthenticationService,
},
];
const module: TestingModule = await Test.createTestingModule({
providers: [

View File

@ -1,3 +1,4 @@
//
import { Injectable, Logger } from '@nestjs/common';
import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decorators/cache-storage.decorator';
@ -8,7 +9,6 @@ import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { BlocklistRepository } from 'src/modules/blocklist/repositories/blocklist.repository';
import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity';
import { EmailAliasManagerService } from 'src/modules/connected-account/email-alias-manager/services/email-alias-manager.service';
import { ConnectedAccountRefreshAccessTokenExceptionCode } from 'src/modules/connected-account/refresh-tokens-manager/exceptions/connected-account-refresh-tokens.exception';
import { ConnectedAccountRefreshTokensService } from 'src/modules/connected-account/refresh-tokens-manager/services/connected-account-refresh-tokens.service';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service';
@ -16,11 +16,8 @@ import {
MessageChannelSyncStage,
MessageChannelWorkspaceEntity,
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import {
MessageImportDriverException,
MessageImportDriverExceptionCode,
} from 'src/modules/messaging/message-import-manager/drivers/exceptions/message-import-driver.exception';
import { MESSAGING_GMAIL_USERS_MESSAGES_GET_BATCH_SIZE } from 'src/modules/messaging/message-import-manager/drivers/gmail/constants/messaging-gmail-users-messages-get-batch-size.constant';
import { MessagingAccountAuthenticationService } from 'src/modules/messaging/message-import-manager/services/messaging-account-authentication.service';
import { MessagingGetMessagesService } from 'src/modules/messaging/message-import-manager/services/messaging-get-messages.service';
import {
MessageImportExceptionHandlerService,
@ -46,6 +43,7 @@ export class MessagingMessagesImportService {
private readonly twentyORMManager: TwentyORMManager,
private readonly messagingGetMessagesService: MessagingGetMessagesService,
private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService,
private readonly messagingAccountAuthenticationService: MessagingAccountAuthenticationService,
) {}
async processMessageBatchImport(
@ -74,45 +72,11 @@ export class MessagingMessagesImportService {
messageChannel.id,
]);
try {
connectedAccount.accessToken =
await this.connectedAccountRefreshTokensService.refreshAndSaveTokens(
connectedAccount,
workspaceId,
);
} catch (error) {
switch (error.code) {
case ConnectedAccountRefreshAccessTokenExceptionCode.TEMPORARY_NETWORK_ERROR:
throw new MessageImportDriverException(
error.message,
MessageImportDriverExceptionCode.TEMPORARY_ERROR,
);
case ConnectedAccountRefreshAccessTokenExceptionCode.REFRESH_ACCESS_TOKEN_FAILED:
case ConnectedAccountRefreshAccessTokenExceptionCode.REFRESH_TOKEN_NOT_FOUND:
await this.messagingMonitoringService.track({
eventName: `refresh_token.error.insufficient_permissions`,
workspaceId,
connectedAccountId: messageChannel.connectedAccountId,
messageChannelId: messageChannel.id,
message: `${error.code}: ${error.reason}`,
});
throw new MessageImportDriverException(
error.message,
MessageImportDriverExceptionCode.INSUFFICIENT_PERMISSIONS,
);
case ConnectedAccountRefreshAccessTokenExceptionCode.PROVIDER_NOT_SUPPORTED:
throw new MessageImportDriverException(
error.message,
MessageImportDriverExceptionCode.PROVIDER_NOT_SUPPORTED,
);
default:
this.logger.error(
`Error (${error.code}) refreshing access token for account ${connectedAccount.id}`,
);
this.logger.log(error);
throw error;
}
}
await this.messagingAccountAuthenticationService.validateConnectedAccountAuthentication(
connectedAccount,
workspaceId,
messageChannel.id,
);
await this.emailAliasManagerService.refreshHandleAliases(
connectedAccount,

View File

@ -1,13 +1,13 @@
import { Injectable } from '@nestjs/common';
import { z } from 'zod';
import { assertUnreachable, isDefined } from 'twenty-shared/utils';
import { ConnectedAccountProvider } from 'twenty-shared/types';
import { assertUnreachable, isDefined } from 'twenty-shared/utils';
import { z } from 'zod';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { GmailClientProvider } from 'src/modules/messaging/message-import-manager/drivers/gmail/providers/gmail-client.provider';
import { MicrosoftClientProvider } from 'src/modules/messaging/message-import-manager/drivers/microsoft/providers/microsoft-client.provider';
import { OAuth2ClientProvider } from 'src/modules/messaging/message-import-manager/drivers/gmail/providers/oauth2-client.provider';
import { MicrosoftClientProvider } from 'src/modules/messaging/message-import-manager/drivers/microsoft/providers/microsoft-client.provider';
import { mimeEncode } from 'src/modules/messaging/message-import-manager/utils/mime-encode.util';
interface SendMessageInput {
@ -93,6 +93,9 @@ export class MessagingSendMessageService {
await microsoftClient.api(`/me/messages/${response.id}/send`).post({});
break;
}
case ConnectedAccountProvider.IMAP_SMTP_CALDAV: {
throw new Error('IMAP provider does not support sending messages');
}
default:
assertUnreachable(
connectedAccount.provider,

View File

@ -1,3 +1,6 @@
/**
* Removes null characters (\0) from a string to prevent unexpected errors
*/
export const sanitizeString = (str: string) => {
return str.replace(/\0/g, '');
};