import mimetypes from collections.abc import Mapping, Sequence from typing import Any import httpx from sqlalchemy import select from constants import AUDIO_EXTENSIONS, DOCUMENT_EXTENSIONS, IMAGE_EXTENSIONS, VIDEO_EXTENSIONS from core.file import File, FileBelongsTo, FileExtraConfig, FileTransferMethod, FileType from core.helper import ssrf_proxy from extensions.ext_database import db from models import MessageFile, ToolFile, UploadFile from models.enums import CreatedByRole def build_from_message_files( *, message_files: Sequence["MessageFile"], tenant_id: str, config: FileExtraConfig, ) -> Sequence[File]: results = [ build_from_message_file(message_file=file, tenant_id=tenant_id, config=config) for file in message_files if file.belongs_to != FileBelongsTo.ASSISTANT ] return results def build_from_message_file( *, message_file: "MessageFile", tenant_id: str, config: FileExtraConfig, ): mapping = { "transfer_method": message_file.transfer_method, "url": message_file.url, "id": message_file.id, "type": message_file.type, "upload_file_id": message_file.upload_file_id, } return build_from_mapping( mapping=mapping, tenant_id=tenant_id, user_id=message_file.created_by, role=CreatedByRole(message_file.created_by_role), config=config, ) def build_from_mapping( *, mapping: Mapping[str, Any], tenant_id: str, user_id: str, role: "CreatedByRole", config: FileExtraConfig, ): transfer_method = FileTransferMethod.value_of(mapping.get("transfer_method")) match transfer_method: case FileTransferMethod.REMOTE_URL: file = _build_from_remote_url( mapping=mapping, tenant_id=tenant_id, config=config, transfer_method=transfer_method, ) case FileTransferMethod.LOCAL_FILE: file = _build_from_local_file( mapping=mapping, tenant_id=tenant_id, user_id=user_id, role=role, config=config, transfer_method=transfer_method, ) case FileTransferMethod.TOOL_FILE: file = _build_from_tool_file( mapping=mapping, tenant_id=tenant_id, user_id=user_id, config=config, transfer_method=transfer_method, ) case _: raise ValueError(f"Invalid file transfer method: {transfer_method}") return file def build_from_mappings( *, mappings: Sequence[Mapping[str, Any]], config: FileExtraConfig | None, tenant_id: str, user_id: str, role: "CreatedByRole", ) -> Sequence[File]: if not config: return [] files = [ build_from_mapping( mapping=mapping, tenant_id=tenant_id, user_id=user_id, role=role, config=config, ) for mapping in mappings ] if ( # If image config is set. config.image_config # And the number of image files exceeds the maximum limit and sum(1 for _ in (filter(lambda x: x.type == FileType.IMAGE, files))) > config.image_config.number_limits ): raise ValueError(f"Number of image files exceeds the maximum limit {config.image_config.number_limits}") if config.number_limits and len(files) > config.number_limits: raise ValueError(f"Number of files exceeds the maximum limit {config.number_limits}") return files def _build_from_local_file( *, mapping: Mapping[str, Any], tenant_id: str, user_id: str, role: "CreatedByRole", config: FileExtraConfig, transfer_method: FileTransferMethod, ): # check if the upload file exists. file_type = FileType.value_of(mapping.get("type")) stmt = select(UploadFile).where( UploadFile.id == mapping.get("upload_file_id"), UploadFile.tenant_id == tenant_id, UploadFile.created_by == user_id, UploadFile.created_by_role == role, ) if file_type == FileType.IMAGE: stmt = stmt.where(UploadFile.extension.in_(IMAGE_EXTENSIONS)) elif file_type == FileType.VIDEO: stmt = stmt.where(UploadFile.extension.in_(VIDEO_EXTENSIONS)) elif file_type == FileType.AUDIO: stmt = stmt.where(UploadFile.extension.in_(AUDIO_EXTENSIONS)) elif file_type == FileType.DOCUMENT: stmt = stmt.where(UploadFile.extension.in_(DOCUMENT_EXTENSIONS)) row = db.session.scalar(stmt) if row is None: raise ValueError("Invalid upload file") file = File( id=mapping.get("id"), filename=row.name, extension="." + row.extension, mime_type=row.mime_type, tenant_id=tenant_id, type=file_type, transfer_method=transfer_method, remote_url=row.source_url, related_id=mapping.get("upload_file_id"), _extra_config=config, size=row.size, ) return file def _build_from_remote_url( *, mapping: Mapping[str, Any], tenant_id: str, config: FileExtraConfig, transfer_method: FileTransferMethod, ): url = mapping.get("url") if not url: raise ValueError("Invalid file url") mime_type = mimetypes.guess_type(url)[0] or "" file_size = -1 filename = url.split("/")[-1].split("?")[0] or "unknown_file" resp = ssrf_proxy.head(url, follow_redirects=True) if resp.status_code == httpx.codes.OK: if content_disposition := resp.headers.get("Content-Disposition"): filename = content_disposition.split("filename=")[-1].strip('"') file_size = int(resp.headers.get("Content-Length", file_size)) mime_type = mime_type or str(resp.headers.get("Content-Type", "")) # Determine file extension extension = mimetypes.guess_extension(mime_type) or "." + filename.split(".")[-1] if "." in filename else ".bin" if not mime_type: mime_type, _ = mimetypes.guess_type(url) file = File( id=mapping.get("id"), filename=filename, tenant_id=tenant_id, type=FileType.value_of(mapping.get("type")), transfer_method=transfer_method, remote_url=url, _extra_config=config, mime_type=mime_type, extension=extension, size=file_size, ) return file def _build_from_tool_file( *, mapping: Mapping[str, Any], tenant_id: str, user_id: str, config: FileExtraConfig, transfer_method: FileTransferMethod, ): tool_file = ( db.session.query(ToolFile) .filter( ToolFile.id == mapping.get("tool_file_id"), ToolFile.tenant_id == tenant_id, ToolFile.user_id == user_id, ) .first() ) if tool_file is None: raise ValueError(f"ToolFile {mapping.get('tool_file_id')} not found") path = tool_file.file_key if "." in path: extension = "." + path.split("/")[-1].split(".")[-1] else: extension = ".bin" file = File( id=mapping.get("id"), tenant_id=tenant_id, filename=tool_file.name, type=FileType.value_of(mapping.get("type")), transfer_method=transfer_method, remote_url=tool_file.original_url, related_id=tool_file.id, extension=extension, mime_type=tool_file.mimetype, size=tool_file.size, _extra_config=config, ) return file