Add file support to agent chat (#13187)
https://github.com/user-attachments/assets/911d5d8d-cc2e-4c18-9f93-2663d84ff9ef --------- Co-authored-by: Raphaël Bosi <71827178+bosiraphael@users.noreply.github.com> Co-authored-by: neo773 <62795688+neo773@users.noreply.github.com> Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com> Co-authored-by: Félix Malfait <felix.malfait@gmail.com> Co-authored-by: Félix Malfait <felix@twenty.com> Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions <github-actions@twenty.com> Co-authored-by: MD Readul Islam <99027968+readul-islam@users.noreply.github.com> Co-authored-by: readul-islam <developer.readul@gamil.com> Co-authored-by: Thomas des Francs <tdesfrancs@gmail.com> Co-authored-by: Guillim <guillim@users.noreply.github.com> Co-authored-by: Lucas Bordeau <bordeau.lucas@gmail.com>
This commit is contained in:
@ -0,0 +1,32 @@
|
||||
import { Command, CommandRunner } from 'nest-commander';
|
||||
|
||||
import {
|
||||
CLEANUP_ORPHANED_FILES_CRON_PATTERN,
|
||||
CleanupOrphanedFilesCronJob,
|
||||
} from 'src/engine/core-modules/file/crons/jobs/cleanup-orphaned-files.cron.job';
|
||||
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
|
||||
|
||||
@Command({
|
||||
name: 'cron:file:cleanup-orphaned-files',
|
||||
description: 'Starts a cron job to clean up orphaned files (no messageId)',
|
||||
})
|
||||
export class CleanupOrphanedFilesCronCommand extends CommandRunner {
|
||||
constructor(
|
||||
@InjectMessageQueue(MessageQueue.cronQueue)
|
||||
private readonly messageQueueService: MessageQueueService,
|
||||
) {
|
||||
super();
|
||||
}
|
||||
|
||||
async run(): Promise<void> {
|
||||
await this.messageQueueService.addCron<undefined>({
|
||||
jobName: CleanupOrphanedFilesCronJob.name,
|
||||
data: undefined,
|
||||
options: {
|
||||
repeat: { pattern: CLEANUP_ORPHANED_FILES_CRON_PATTERN },
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,69 @@
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
|
||||
import { WorkspaceActivationStatus } from 'twenty-shared/workspace';
|
||||
import { IsNull, LessThan, Repository } from 'typeorm';
|
||||
|
||||
import { SentryCronMonitor } from 'src/engine/core-modules/cron/sentry-cron-monitor.decorator';
|
||||
import { ExceptionHandlerService } from 'src/engine/core-modules/exception-handler/exception-handler.service';
|
||||
import { FileEntity } from 'src/engine/core-modules/file/entities/file.entity';
|
||||
import { FileMetadataService } from 'src/engine/core-modules/file/services/file-metadata.service';
|
||||
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
|
||||
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
|
||||
|
||||
export const CLEANUP_ORPHANED_FILES_CRON_PATTERN = '0 2 * * *';
|
||||
|
||||
@Processor(MessageQueue.cronQueue)
|
||||
export class CleanupOrphanedFilesCronJob {
|
||||
constructor(
|
||||
@InjectRepository(Workspace, 'core')
|
||||
private readonly workspaceRepository: Repository<Workspace>,
|
||||
@InjectRepository(FileEntity, 'core')
|
||||
private readonly fileRepository: Repository<FileEntity>,
|
||||
private readonly fileMetadataService: FileMetadataService,
|
||||
private readonly exceptionHandlerService: ExceptionHandlerService,
|
||||
) {}
|
||||
|
||||
@Process(CleanupOrphanedFilesCronJob.name)
|
||||
@SentryCronMonitor(
|
||||
CleanupOrphanedFilesCronJob.name,
|
||||
CLEANUP_ORPHANED_FILES_CRON_PATTERN,
|
||||
)
|
||||
async handle(): Promise<void> {
|
||||
const activeWorkspaces = await this.workspaceRepository.find({
|
||||
where: {
|
||||
activationStatus: WorkspaceActivationStatus.ACTIVE,
|
||||
},
|
||||
select: ['id'],
|
||||
});
|
||||
|
||||
if (activeWorkspaces.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const oneHourAgo = new Date(Date.now() - 60 * 60 * 1000);
|
||||
|
||||
const orphanedFiles = await this.fileRepository.find({
|
||||
select: ['id', 'workspaceId', 'fullPath'],
|
||||
where: {
|
||||
messageId: IsNull(),
|
||||
createdAt: LessThan(oneHourAgo),
|
||||
},
|
||||
});
|
||||
|
||||
if (orphanedFiles.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (const file of orphanedFiles) {
|
||||
await this.fileMetadataService
|
||||
.deleteFileById(file.id, file.workspaceId)
|
||||
.catch((error) => {
|
||||
throw new Error(
|
||||
`[${CleanupOrphanedFilesCronJob.name}] Cannot delete orphaned file - ${file.fullPath}: ${error.message}`,
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,25 @@
|
||||
import { Field, ID, ObjectType } from '@nestjs/graphql';
|
||||
|
||||
@ObjectType('File')
|
||||
export class FileDTO {
|
||||
@Field(() => ID)
|
||||
id: string;
|
||||
|
||||
@Field()
|
||||
name: string;
|
||||
|
||||
@Field()
|
||||
fullPath: string;
|
||||
|
||||
@Field()
|
||||
size: number;
|
||||
|
||||
@Field()
|
||||
type: string;
|
||||
|
||||
@Field(() => ID, { nullable: true })
|
||||
messageId?: string;
|
||||
|
||||
@Field()
|
||||
createdAt: Date;
|
||||
}
|
||||
@ -0,0 +1,53 @@
|
||||
import {
|
||||
Column,
|
||||
CreateDateColumn,
|
||||
Entity,
|
||||
Index,
|
||||
JoinColumn,
|
||||
ManyToOne,
|
||||
PrimaryGeneratedColumn,
|
||||
Relation,
|
||||
} from 'typeorm';
|
||||
|
||||
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
|
||||
import { AgentChatMessageEntity } from 'src/engine/metadata-modules/agent/agent-chat-message.entity';
|
||||
|
||||
@Entity('file')
|
||||
@Index('IDX_FILE_WORKSPACE_ID', ['workspaceId'])
|
||||
export class FileEntity {
|
||||
@PrimaryGeneratedColumn('uuid')
|
||||
id: string;
|
||||
|
||||
@Column({ nullable: false })
|
||||
name: string;
|
||||
|
||||
@Column({ nullable: false })
|
||||
fullPath: string;
|
||||
|
||||
@Column({ nullable: false, type: 'bigint' })
|
||||
size: number;
|
||||
|
||||
@Column({ nullable: false })
|
||||
type: string;
|
||||
|
||||
@Column({ nullable: false, type: 'uuid' })
|
||||
workspaceId: string;
|
||||
|
||||
@ManyToOne(() => Workspace, {
|
||||
onDelete: 'CASCADE',
|
||||
})
|
||||
@JoinColumn({ name: 'workspaceId' })
|
||||
workspace: Relation<Workspace>;
|
||||
|
||||
@Column({ nullable: true, type: 'uuid' })
|
||||
messageId: string;
|
||||
|
||||
@ManyToOne(() => AgentChatMessageEntity, {
|
||||
onDelete: 'CASCADE',
|
||||
})
|
||||
@JoinColumn({ name: 'messageId' })
|
||||
message: Relation<AgentChatMessageEntity>;
|
||||
|
||||
@CreateDateColumn({ type: 'timestamptz' })
|
||||
createdAt: Date;
|
||||
}
|
||||
@ -1,4 +1,6 @@
|
||||
import { HttpModule } from '@nestjs/axios';
|
||||
import { Module } from '@nestjs/common';
|
||||
import { TypeOrmModule } from '@nestjs/typeorm';
|
||||
|
||||
import { FilePathGuard } from 'src/engine/core-modules/file/guards/file-path-guard';
|
||||
import { FileDeletionJob } from 'src/engine/core-modules/file/jobs/file-deletion.job';
|
||||
@ -6,21 +8,37 @@ import { FileWorkspaceFolderDeletionJob } from 'src/engine/core-modules/file/job
|
||||
import { FileAttachmentListener } from 'src/engine/core-modules/file/listeners/file-attachment.listener';
|
||||
import { FileWorkspaceMemberListener } from 'src/engine/core-modules/file/listeners/file-workspace-member.listener';
|
||||
import { JwtModule } from 'src/engine/core-modules/jwt/jwt.module';
|
||||
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
|
||||
|
||||
import { FileController } from './controllers/file.controller';
|
||||
import { CleanupOrphanedFilesCronCommand } from './crons/commands/cleanup-orphaned-files.cron.command';
|
||||
import { CleanupOrphanedFilesCronJob } from './crons/jobs/cleanup-orphaned-files.cron.job';
|
||||
import { FileEntity } from './entities/file.entity';
|
||||
import { FileUploadService } from './file-upload/services/file-upload.service';
|
||||
import { FileResolver } from './resolvers/file.resolver';
|
||||
import { FileMetadataService } from './services/file-metadata.service';
|
||||
import { FileService } from './services/file.service';
|
||||
|
||||
@Module({
|
||||
imports: [JwtModule],
|
||||
imports: [
|
||||
JwtModule,
|
||||
TypeOrmModule.forFeature([FileEntity, Workspace], 'core'),
|
||||
HttpModule,
|
||||
],
|
||||
providers: [
|
||||
FileService,
|
||||
FileMetadataService,
|
||||
FileResolver,
|
||||
FilePathGuard,
|
||||
FileAttachmentListener,
|
||||
FileWorkspaceMemberListener,
|
||||
FileWorkspaceFolderDeletionJob,
|
||||
FileDeletionJob,
|
||||
CleanupOrphanedFilesCronJob,
|
||||
CleanupOrphanedFilesCronCommand,
|
||||
FileUploadService,
|
||||
],
|
||||
exports: [FileService],
|
||||
exports: [FileService, FileMetadataService, CleanupOrphanedFilesCronCommand],
|
||||
controllers: [FileController],
|
||||
})
|
||||
export class FileModule {}
|
||||
|
||||
@ -8,6 +8,7 @@ export enum FileFolder {
|
||||
Attachment = 'attachment',
|
||||
PersonPicture = 'person-picture',
|
||||
ServerlessFunction = 'serverless-function',
|
||||
File = 'file',
|
||||
}
|
||||
|
||||
registerEnumType(FileFolder, {
|
||||
@ -34,6 +35,9 @@ export const fileFolderConfigs: Record<FileFolder, FileFolderConfig> = {
|
||||
[FileFolder.ServerlessFunction]: {
|
||||
ignoreExpirationToken: false,
|
||||
},
|
||||
[FileFolder.File]: {
|
||||
ignoreExpirationToken: false,
|
||||
},
|
||||
};
|
||||
|
||||
export type AllowedFolders = KebabCase<keyof typeof FileFolder>;
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
import { UnrecoverableError } from 'bullmq';
|
||||
|
||||
import { FileService } from 'src/engine/core-modules/file/services/file.service';
|
||||
import { extractFolderPathAndFilename } from 'src/engine/core-modules/file/utils/extract-folderpath-and-filename.utils';
|
||||
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
|
||||
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
@ -18,8 +19,7 @@ export class FileDeletionJob {
|
||||
async handle(data: FileDeletionJobData): Promise<void> {
|
||||
const { workspaceId, fullPath } = data;
|
||||
|
||||
const folderPath = fullPath.split('/').slice(0, -1).join('/');
|
||||
const filename = fullPath.split('/').pop();
|
||||
const { folderPath, filename } = extractFolderPathAndFilename(fullPath);
|
||||
|
||||
if (!filename) {
|
||||
throw new UnrecoverableError(
|
||||
|
||||
@ -0,0 +1,55 @@
|
||||
import { UseFilters, UseGuards, UsePipes } from '@nestjs/common';
|
||||
import { Args, Mutation, Resolver } from '@nestjs/graphql';
|
||||
|
||||
import { FileUpload, GraphQLUpload } from 'graphql-upload';
|
||||
|
||||
import { FileDTO } from 'src/engine/core-modules/file/dtos/file.dto';
|
||||
import { FileMetadataService } from 'src/engine/core-modules/file/services/file-metadata.service';
|
||||
import { PreventNestToAutoLogGraphqlErrorsFilter } from 'src/engine/core-modules/graphql/filters/prevent-nest-to-auto-log-graphql-errors.filter';
|
||||
import { ResolverValidationPipe } from 'src/engine/core-modules/graphql/pipes/resolver-validation.pipe';
|
||||
import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity';
|
||||
import { AuthWorkspace } from 'src/engine/decorators/auth/auth-workspace.decorator';
|
||||
import { WorkspaceAuthGuard } from 'src/engine/guards/workspace-auth.guard';
|
||||
import { streamToBuffer } from 'src/utils/stream-to-buffer';
|
||||
|
||||
@UseGuards(WorkspaceAuthGuard)
|
||||
@UsePipes(ResolverValidationPipe)
|
||||
@UseFilters(PreventNestToAutoLogGraphqlErrorsFilter)
|
||||
@Resolver()
|
||||
export class FileResolver {
|
||||
constructor(private readonly fileMetadataService: FileMetadataService) {}
|
||||
|
||||
@Mutation(() => FileDTO)
|
||||
async createFile(
|
||||
@AuthWorkspace() { id: workspaceId }: Workspace,
|
||||
@Args({ name: 'file', type: () => GraphQLUpload })
|
||||
{ createReadStream, filename, mimetype }: FileUpload,
|
||||
): Promise<FileDTO> {
|
||||
const stream = createReadStream();
|
||||
const buffer = await streamToBuffer(stream);
|
||||
|
||||
return this.fileMetadataService.createFile({
|
||||
file: buffer,
|
||||
filename,
|
||||
mimeType: mimetype,
|
||||
workspaceId,
|
||||
});
|
||||
}
|
||||
|
||||
@Mutation(() => FileDTO)
|
||||
async deleteFile(
|
||||
@AuthWorkspace() { id: workspaceId }: Workspace,
|
||||
@Args('fileId') fileId: string,
|
||||
): Promise<FileDTO> {
|
||||
const deletedFile = await this.fileMetadataService.deleteFileById(
|
||||
fileId,
|
||||
workspaceId,
|
||||
);
|
||||
|
||||
if (!deletedFile) {
|
||||
throw new Error(`File with id ${fileId} not found`);
|
||||
}
|
||||
|
||||
return deletedFile;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,94 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
|
||||
import { Repository } from 'typeorm';
|
||||
|
||||
import { FileFolder } from 'src/engine/core-modules/file/interfaces/file-folder.interface';
|
||||
|
||||
import { FileStorageService } from 'src/engine/core-modules/file-storage/file-storage.service';
|
||||
import { FileDTO } from 'src/engine/core-modules/file/dtos/file.dto';
|
||||
import { FileEntity } from 'src/engine/core-modules/file/entities/file.entity';
|
||||
import { FileUploadService } from 'src/engine/core-modules/file/file-upload/services/file-upload.service';
|
||||
import { extractFolderPathAndFilename } from 'src/engine/core-modules/file/utils/extract-folderpath-and-filename.utils';
|
||||
|
||||
import { FileService } from './file.service';
|
||||
|
||||
@Injectable()
|
||||
export class FileMetadataService {
|
||||
constructor(
|
||||
@InjectRepository(FileEntity, 'core')
|
||||
private readonly fileRepository: Repository<FileEntity>,
|
||||
private readonly fileService: FileService,
|
||||
private readonly fileStorageService: FileStorageService,
|
||||
private readonly fileUploadService: FileUploadService,
|
||||
) {}
|
||||
|
||||
async createFile({
|
||||
file,
|
||||
filename,
|
||||
mimeType,
|
||||
workspaceId,
|
||||
}: {
|
||||
file: Buffer;
|
||||
filename: string;
|
||||
mimeType: string;
|
||||
workspaceId: string;
|
||||
}): Promise<FileDTO> {
|
||||
const { files } = await this.fileUploadService.uploadFile({
|
||||
file,
|
||||
filename,
|
||||
mimeType,
|
||||
fileFolder: FileFolder.File,
|
||||
workspaceId,
|
||||
});
|
||||
|
||||
if (!files.length) {
|
||||
throw new Error('Failed to upload file');
|
||||
}
|
||||
|
||||
const createdFile = this.fileRepository.create({
|
||||
name: filename,
|
||||
fullPath: files[0].path,
|
||||
size: file.length,
|
||||
type: mimeType,
|
||||
workspaceId,
|
||||
});
|
||||
|
||||
const savedFile = await this.fileRepository.save(createdFile);
|
||||
|
||||
return savedFile;
|
||||
}
|
||||
|
||||
async deleteFileById(
|
||||
id: string,
|
||||
workspaceId: string,
|
||||
): Promise<FileDTO | null> {
|
||||
const file = await this.fileRepository.findOne({
|
||||
where: { id, workspaceId },
|
||||
});
|
||||
|
||||
if (!file) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const { folderPath, filename } = extractFolderPathAndFilename(
|
||||
file.fullPath,
|
||||
);
|
||||
|
||||
try {
|
||||
if (file.fullPath) {
|
||||
await this.fileService.deleteFile({
|
||||
folderPath,
|
||||
filename,
|
||||
workspaceId,
|
||||
});
|
||||
}
|
||||
|
||||
await this.fileRepository.delete(file.id);
|
||||
|
||||
return file;
|
||||
} catch (error) {
|
||||
throw new Error(`Failed to delete file ${id}: ${error.message}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -3,18 +3,18 @@ import { Injectable } from '@nestjs/common';
|
||||
import { basename, dirname, extname } from 'path';
|
||||
import { Stream } from 'stream';
|
||||
|
||||
import { v4 as uuidV4 } from 'uuid';
|
||||
import { buildSignedPath } from 'twenty-shared/utils';
|
||||
import { isNonEmptyString } from '@sniptt/guards';
|
||||
import { buildSignedPath } from 'twenty-shared/utils';
|
||||
import { v4 as uuidV4 } from 'uuid';
|
||||
|
||||
import { FileStorageService } from 'src/engine/core-modules/file-storage/file-storage.service';
|
||||
import { JwtWrapperService } from 'src/engine/core-modules/jwt/services/jwt-wrapper.service';
|
||||
import { TwentyConfigService } from 'src/engine/core-modules/twenty-config/twenty-config.service';
|
||||
import { extractFilenameFromPath } from 'src/engine/core-modules/file/utils/extract-file-id-from-path.utils';
|
||||
import {
|
||||
FileTokenJwtPayload,
|
||||
JwtTokenTypeEnum,
|
||||
} from 'src/engine/core-modules/auth/types/auth-context.type';
|
||||
import { FileStorageService } from 'src/engine/core-modules/file-storage/file-storage.service';
|
||||
import { extractFolderPathAndFilename } from 'src/engine/core-modules/file/utils/extract-folderpath-and-filename.utils';
|
||||
import { JwtWrapperService } from 'src/engine/core-modules/jwt/services/jwt-wrapper.service';
|
||||
import { TwentyConfigService } from 'src/engine/core-modules/twenty-config/twenty-config.service';
|
||||
|
||||
@Injectable()
|
||||
export class FileService {
|
||||
@ -45,7 +45,7 @@ export class FileService {
|
||||
return buildSignedPath({
|
||||
path: url,
|
||||
token: this.encodeFileToken({
|
||||
filename: extractFilenameFromPath(url),
|
||||
filename: extractFolderPathAndFilename(url).filename,
|
||||
workspaceId,
|
||||
}),
|
||||
});
|
||||
|
||||
@ -1,37 +0,0 @@
|
||||
import { extractFilenameFromPath } from 'src/engine/core-modules/file/utils/extract-file-id-from-path.utils';
|
||||
|
||||
describe('extractFileIdFromPath', () => {
|
||||
it('should return the last segment of a normal path', () => {
|
||||
const result = extractFilenameFromPath('uploads/files/1234.txt');
|
||||
|
||||
expect(result).toBe('1234.txt');
|
||||
});
|
||||
|
||||
it('should return the last segment when there is no slash', () => {
|
||||
const result = extractFilenameFromPath('file.txt');
|
||||
|
||||
expect(result).toBe('file.txt');
|
||||
});
|
||||
|
||||
it('should throw when empty path', () => {
|
||||
expect(() => extractFilenameFromPath('')).toThrow(
|
||||
new Error('Cannot extract id from empty path'),
|
||||
);
|
||||
});
|
||||
|
||||
it('should throw when empty filename', () => {
|
||||
const folderPath = 'uploads/files/';
|
||||
|
||||
expect(() => extractFilenameFromPath(folderPath)).toThrow(
|
||||
new Error(`Cannot extract id from folder path '${folderPath}'`),
|
||||
);
|
||||
});
|
||||
|
||||
it('should throw when empty filename absolute path', () => {
|
||||
const folderPath = '/a/b/c/';
|
||||
|
||||
expect(() => extractFilenameFromPath(folderPath)).toThrow(
|
||||
new Error(`Cannot extract id from folder path '${folderPath}'`),
|
||||
);
|
||||
});
|
||||
});
|
||||
@ -1,17 +0,0 @@
|
||||
import { basename } from 'path';
|
||||
|
||||
import { isNonEmptyString } from '@sniptt/guards';
|
||||
|
||||
export const extractFilenameFromPath = (path: string) => {
|
||||
if (path.endsWith('/')) {
|
||||
throw new Error(`Cannot extract id from folder path '${path}'`);
|
||||
}
|
||||
|
||||
const fileId = basename(path);
|
||||
|
||||
if (!isNonEmptyString(fileId)) {
|
||||
throw new Error(`Cannot extract id from empty path`);
|
||||
}
|
||||
|
||||
return fileId;
|
||||
};
|
||||
@ -0,0 +1,13 @@
|
||||
export function extractFolderPathAndFilename(fullPath: string): {
|
||||
folderPath: string;
|
||||
filename: string;
|
||||
} {
|
||||
if (!fullPath || typeof fullPath !== 'string') {
|
||||
throw new Error('Invalid fullPath provided');
|
||||
}
|
||||
const parts = fullPath.split('/');
|
||||
const filename = parts.pop() || '';
|
||||
const folderPath = parts.join('/');
|
||||
|
||||
return { folderPath, filename };
|
||||
}
|
||||
@ -5,10 +5,12 @@ import {
|
||||
Index,
|
||||
JoinColumn,
|
||||
ManyToOne,
|
||||
OneToMany,
|
||||
PrimaryGeneratedColumn,
|
||||
Relation,
|
||||
} from 'typeorm';
|
||||
|
||||
import { FileEntity } from 'src/engine/core-modules/file/entities/file.entity';
|
||||
import { AgentChatThreadEntity } from 'src/engine/metadata-modules/agent/agent-chat-thread.entity';
|
||||
|
||||
export enum AgentChatMessageRole {
|
||||
@ -37,6 +39,9 @@ export class AgentChatMessageEntity {
|
||||
@Column('text')
|
||||
content: string;
|
||||
|
||||
@OneToMany(() => FileEntity, (file) => file.message)
|
||||
files: Relation<FileEntity[]>;
|
||||
|
||||
@CreateDateColumn()
|
||||
createdAt: Date;
|
||||
}
|
||||
|
||||
@ -58,7 +58,7 @@ export class AgentChatController {
|
||||
@Post('stream')
|
||||
async streamAgentChat(
|
||||
@Body()
|
||||
body: { threadId: string; userMessage: string },
|
||||
body: { threadId: string; userMessage: string; fileIds?: string[] },
|
||||
@AuthUserWorkspaceId() userWorkspaceId: string,
|
||||
@Res() res: Response,
|
||||
) {
|
||||
@ -67,6 +67,7 @@ export class AgentChatController {
|
||||
threadId: body.threadId,
|
||||
userMessage: body.userMessage,
|
||||
userWorkspaceId,
|
||||
fileIds: body.fileIds || [],
|
||||
res,
|
||||
});
|
||||
} catch (error) {
|
||||
|
||||
@ -3,6 +3,7 @@ import { InjectRepository } from '@nestjs/typeorm';
|
||||
|
||||
import { Repository } from 'typeorm';
|
||||
|
||||
import { FileEntity } from 'src/engine/core-modules/file/entities/file.entity';
|
||||
import {
|
||||
AgentChatMessageEntity,
|
||||
AgentChatMessageRole,
|
||||
@ -20,6 +21,8 @@ export class AgentChatService {
|
||||
private readonly threadRepository: Repository<AgentChatThreadEntity>,
|
||||
@InjectRepository(AgentChatMessageEntity, 'core')
|
||||
private readonly messageRepository: Repository<AgentChatMessageEntity>,
|
||||
@InjectRepository(FileEntity, 'core')
|
||||
private readonly fileRepository: Repository<FileEntity>,
|
||||
) {}
|
||||
|
||||
async createThread(agentId: string, userWorkspaceId: string) {
|
||||
@ -45,10 +48,12 @@ export class AgentChatService {
|
||||
threadId,
|
||||
role,
|
||||
content,
|
||||
fileIds,
|
||||
}: {
|
||||
threadId: string;
|
||||
role: AgentChatMessageRole;
|
||||
content: string;
|
||||
fileIds?: string[];
|
||||
}) {
|
||||
const message = this.messageRepository.create({
|
||||
threadId,
|
||||
@ -56,7 +61,17 @@ export class AgentChatService {
|
||||
content,
|
||||
});
|
||||
|
||||
return this.messageRepository.save(message);
|
||||
const savedMessage = await this.messageRepository.save(message);
|
||||
|
||||
if (fileIds && fileIds.length > 0) {
|
||||
for (const fileId of fileIds) {
|
||||
await this.fileRepository.update(fileId, {
|
||||
messageId: savedMessage.id,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return savedMessage;
|
||||
}
|
||||
|
||||
async getMessagesForThread(threadId: string, userWorkspaceId: string) {
|
||||
@ -77,6 +92,7 @@ export class AgentChatService {
|
||||
return this.messageRepository.find({
|
||||
where: { threadId },
|
||||
order: { createdAt: 'ASC' },
|
||||
relations: ['files'],
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,16 +1,30 @@
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
|
||||
import { Readable } from 'stream';
|
||||
|
||||
import { createAnthropic } from '@ai-sdk/anthropic';
|
||||
import { createOpenAI } from '@ai-sdk/openai';
|
||||
import { CoreMessage, generateObject, generateText, streamText } from 'ai';
|
||||
import { Repository } from 'typeorm';
|
||||
import {
|
||||
CoreMessage,
|
||||
CoreUserMessage,
|
||||
FilePart,
|
||||
generateObject,
|
||||
generateText,
|
||||
ImagePart,
|
||||
streamText,
|
||||
TextPart,
|
||||
} from 'ai';
|
||||
import { In, Repository } from 'typeorm';
|
||||
|
||||
import {
|
||||
ModelId,
|
||||
ModelProvider,
|
||||
} from 'src/engine/core-modules/ai/constants/ai-models.const';
|
||||
import { AiModelRegistryService } from 'src/engine/core-modules/ai/services/ai-model-registry.service';
|
||||
import { FileEntity } from 'src/engine/core-modules/file/entities/file.entity';
|
||||
import { FileService } from 'src/engine/core-modules/file/services/file.service';
|
||||
import { extractFolderPathAndFilename } from 'src/engine/core-modules/file/utils/extract-folderpath-and-filename.utils';
|
||||
import { TwentyConfigService } from 'src/engine/core-modules/twenty-config/twenty-config.service';
|
||||
import {
|
||||
AgentChatMessageEntity,
|
||||
@ -22,6 +36,7 @@ import { AGENT_SYSTEM_PROMPTS } from 'src/engine/metadata-modules/agent/constant
|
||||
import { convertOutputSchemaToZod } from 'src/engine/metadata-modules/agent/utils/convert-output-schema-to-zod';
|
||||
import { OutputSchema } from 'src/modules/workflow/workflow-builder/workflow-schema/types/output-schema.type';
|
||||
import { resolveInput } from 'src/modules/workflow/workflow-executor/utils/variable-resolver.util';
|
||||
import { streamToBuffer } from 'src/utils/stream-to-buffer';
|
||||
|
||||
import { AgentEntity } from './agent.entity';
|
||||
import { AgentException, AgentExceptionCode } from './agent.exception';
|
||||
@ -45,9 +60,12 @@ export class AgentExecutionService {
|
||||
constructor(
|
||||
private readonly twentyConfigService: TwentyConfigService,
|
||||
private readonly agentToolService: AgentToolService,
|
||||
private readonly fileService: FileService,
|
||||
private readonly aiModelRegistryService: AiModelRegistryService,
|
||||
@InjectRepository(AgentEntity, 'core')
|
||||
private readonly agentRepository: Repository<AgentEntity>,
|
||||
@InjectRepository(FileEntity, 'core')
|
||||
private readonly fileRepository: Repository<FileEntity>,
|
||||
) {}
|
||||
|
||||
getModel = (modelId: ModelId, provider: ModelProvider) => {
|
||||
@ -58,9 +76,7 @@ export class AgentExecutionService {
|
||||
apiKey: this.twentyConfigService.get('OPENAI_COMPATIBLE_API_KEY'),
|
||||
});
|
||||
|
||||
return OpenAIProvider(
|
||||
this.aiModelRegistryService.getEffectiveModelConfig(modelId).modelId,
|
||||
);
|
||||
return OpenAIProvider(modelId);
|
||||
}
|
||||
case ModelProvider.OPENAI: {
|
||||
const OpenAIProvider = createOpenAI({
|
||||
@ -167,14 +183,73 @@ export class AgentExecutionService {
|
||||
}
|
||||
}
|
||||
|
||||
private async buildUserMessageWithFiles(
|
||||
userMessage: string,
|
||||
fileIds?: string[],
|
||||
): Promise<CoreUserMessage> {
|
||||
if (!fileIds || fileIds.length === 0) {
|
||||
return { role: AgentChatMessageRole.USER, content: userMessage };
|
||||
}
|
||||
|
||||
const files = await this.fileRepository.find({
|
||||
where: {
|
||||
id: In(fileIds),
|
||||
},
|
||||
});
|
||||
|
||||
const textPart: TextPart = {
|
||||
type: 'text',
|
||||
text: userMessage,
|
||||
};
|
||||
|
||||
const fileParts = await Promise.all(
|
||||
files.map((file) => this.createFilePart(file)),
|
||||
);
|
||||
|
||||
return {
|
||||
role: AgentChatMessageRole.USER,
|
||||
content: [textPart, ...fileParts],
|
||||
};
|
||||
}
|
||||
|
||||
private async createFilePart(
|
||||
file: FileEntity,
|
||||
): Promise<ImagePart | FilePart> {
|
||||
const { folderPath, filename } = extractFolderPathAndFilename(
|
||||
file.fullPath,
|
||||
);
|
||||
const fileStream = await this.fileService.getFileStream(
|
||||
folderPath,
|
||||
filename,
|
||||
file.workspaceId,
|
||||
);
|
||||
const fileBuffer = await streamToBuffer(fileStream as Readable);
|
||||
|
||||
if (file.type.startsWith('image')) {
|
||||
return {
|
||||
type: 'image',
|
||||
image: fileBuffer,
|
||||
mimeType: file.type,
|
||||
};
|
||||
} else {
|
||||
return {
|
||||
type: 'file',
|
||||
data: fileBuffer,
|
||||
mimeType: file.type,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
async streamChatResponse({
|
||||
agentId,
|
||||
userMessage,
|
||||
messages,
|
||||
fileIds,
|
||||
}: {
|
||||
agentId: string;
|
||||
userMessage: string;
|
||||
messages: AgentChatMessageEntity[];
|
||||
fileIds?: string[];
|
||||
}) {
|
||||
const agent = await this.agentRepository.findOneOrFail({
|
||||
where: { id: agentId },
|
||||
@ -185,10 +260,12 @@ export class AgentExecutionService {
|
||||
content,
|
||||
}));
|
||||
|
||||
llmMessages.push({
|
||||
role: AgentChatMessageRole.USER,
|
||||
content: userMessage,
|
||||
});
|
||||
const userMessageWithFiles = await this.buildUserMessageWithFiles(
|
||||
userMessage,
|
||||
fileIds,
|
||||
);
|
||||
|
||||
llmMessages.push(userMessageWithFiles);
|
||||
|
||||
const aiRequestConfig = await this.prepareAIRequestConfig({
|
||||
system: `${AGENT_SYSTEM_PROMPTS.AGENT_CHAT}\n\n${agent.prompt}`,
|
||||
|
||||
@ -17,6 +17,7 @@ export type StreamAgentChatOptions = {
|
||||
threadId: string;
|
||||
userMessage: string;
|
||||
userWorkspaceId: string;
|
||||
fileIds?: string[];
|
||||
res: Response;
|
||||
};
|
||||
|
||||
@ -41,6 +42,7 @@ export class AgentStreamingService {
|
||||
threadId,
|
||||
userMessage,
|
||||
userWorkspaceId,
|
||||
fileIds = [],
|
||||
res,
|
||||
}: StreamAgentChatOptions) {
|
||||
try {
|
||||
@ -59,12 +61,6 @@ export class AgentStreamingService {
|
||||
);
|
||||
}
|
||||
|
||||
await this.agentChatService.addMessage({
|
||||
threadId,
|
||||
role: AgentChatMessageRole.USER,
|
||||
content: userMessage,
|
||||
});
|
||||
|
||||
this.setupStreamingHeaders(res);
|
||||
|
||||
const { fullStream } =
|
||||
@ -72,6 +68,7 @@ export class AgentStreamingService {
|
||||
agentId: thread.agent.id,
|
||||
userMessage,
|
||||
messages: thread.messages,
|
||||
fileIds,
|
||||
});
|
||||
|
||||
let aiResponse = '';
|
||||
@ -92,6 +89,20 @@ export class AgentStreamingService {
|
||||
});
|
||||
break;
|
||||
case 'error':
|
||||
{
|
||||
const errorMessage =
|
||||
chunk.error &&
|
||||
typeof chunk.error === 'object' &&
|
||||
'message' in chunk.error
|
||||
? chunk.error.message
|
||||
: 'Something went wrong. Please try again.';
|
||||
|
||||
this.sendStreamEvent(res, {
|
||||
type: 'error',
|
||||
message: errorMessage as string,
|
||||
});
|
||||
res.end();
|
||||
}
|
||||
this.logger.error(`Stream error: ${JSON.stringify(chunk)}`);
|
||||
break;
|
||||
default:
|
||||
@ -100,6 +111,19 @@ export class AgentStreamingService {
|
||||
}
|
||||
}
|
||||
|
||||
if (!aiResponse) {
|
||||
res.end();
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
await this.agentChatService.addMessage({
|
||||
threadId,
|
||||
role: AgentChatMessageRole.USER,
|
||||
content: userMessage,
|
||||
fileIds,
|
||||
});
|
||||
|
||||
await this.agentChatService.addMessage({
|
||||
threadId,
|
||||
role: AgentChatMessageRole.ASSISTANT,
|
||||
|
||||
@ -5,6 +5,9 @@ import { AiModule } from 'src/engine/core-modules/ai/ai.module';
|
||||
import { AuditModule } from 'src/engine/core-modules/audit/audit.module';
|
||||
import { TokenModule } from 'src/engine/core-modules/auth/token/token.module';
|
||||
import { FeatureFlagModule } from 'src/engine/core-modules/feature-flag/feature-flag.module';
|
||||
import { FileEntity } from 'src/engine/core-modules/file/entities/file.entity';
|
||||
import { FileUploadModule } from 'src/engine/core-modules/file/file-upload/file-upload.module';
|
||||
import { FileModule } from 'src/engine/core-modules/file/file.module';
|
||||
import { ThrottlerModule } from 'src/engine/core-modules/throttler/throttler.module';
|
||||
import { UserWorkspace } from 'src/engine/core-modules/user-workspace/user-workspace.entity';
|
||||
import { AgentChatController } from 'src/engine/metadata-modules/agent/agent-chat.controller';
|
||||
@ -33,6 +36,7 @@ import { AgentService } from './agent.service';
|
||||
RoleTargetsEntity,
|
||||
AgentChatMessageEntity,
|
||||
AgentChatThreadEntity,
|
||||
FileEntity,
|
||||
UserWorkspace,
|
||||
],
|
||||
'core',
|
||||
@ -41,6 +45,8 @@ import { AgentService } from './agent.service';
|
||||
ThrottlerModule,
|
||||
AuditModule,
|
||||
FeatureFlagModule,
|
||||
FileUploadModule,
|
||||
FileModule,
|
||||
ObjectMetadataModule,
|
||||
WorkspacePermissionsCacheModule,
|
||||
WorkspaceCacheStorageModule,
|
||||
|
||||
@ -1,5 +1,7 @@
|
||||
import { Field, ID, ObjectType } from '@nestjs/graphql';
|
||||
|
||||
import { FileDTO } from 'src/engine/core-modules/file/dtos/file.dto';
|
||||
|
||||
@ObjectType('AgentChatMessage')
|
||||
export class AgentChatMessageDTO {
|
||||
@Field(() => ID)
|
||||
@ -14,6 +16,9 @@ export class AgentChatMessageDTO {
|
||||
@Field()
|
||||
content: string;
|
||||
|
||||
@Field(() => [FileDTO], { nullable: true })
|
||||
files?: FileDTO[];
|
||||
|
||||
@Field()
|
||||
createdAt: Date;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user