from typing import Optional, Union from core.app.entities.app_invoke_entities import InvokeFrom from extensions.ext_database import db from libs.infinite_scroll_pagination import InfiniteScrollPagination from models.account import Account from models.model import App, EndUser from models.web import PinnedConversation from services.conversation_service import ConversationService class WebConversationService: @classmethod def pagination_by_last_id( cls, app_model: App, user: Optional[Union[Account, EndUser]], last_id: Optional[str], limit: int, invoke_from: InvokeFrom, pinned: Optional[bool] = None, sort_by="-updated_at", ) -> InfiniteScrollPagination: include_ids = None exclude_ids = None if pinned is not None: pinned_conversations = ( db.session.query(PinnedConversation) .filter( PinnedConversation.app_id == app_model.id, PinnedConversation.created_by_role == ("account" if isinstance(user, Account) else "end_user"), PinnedConversation.created_by == user.id, ) .order_by(PinnedConversation.created_at.desc()) .all() ) pinned_conversation_ids = [pc.conversation_id for pc in pinned_conversations] if pinned: include_ids = pinned_conversation_ids else: exclude_ids = pinned_conversation_ids return ConversationService.pagination_by_last_id( app_model=app_model, user=user, last_id=last_id, limit=limit, invoke_from=invoke_from, include_ids=include_ids, exclude_ids=exclude_ids, sort_by=sort_by, ) @classmethod def pin(cls, app_model: App, conversation_id: str, user: Optional[Union[Account, EndUser]]): pinned_conversation = ( db.session.query(PinnedConversation) .filter( PinnedConversation.app_id == app_model.id, PinnedConversation.conversation_id == conversation_id, PinnedConversation.created_by_role == ("account" if isinstance(user, Account) else "end_user"), PinnedConversation.created_by == user.id, ) .first() ) if pinned_conversation: return conversation = ConversationService.get_conversation( app_model=app_model, conversation_id=conversation_id, user=user ) pinned_conversation = PinnedConversation( app_id=app_model.id, conversation_id=conversation.id, created_by_role="account" if isinstance(user, Account) else "end_user", created_by=user.id, ) db.session.add(pinned_conversation) db.session.commit() @classmethod def unpin(cls, app_model: App, conversation_id: str, user: Optional[Union[Account, EndUser]]): pinned_conversation = ( db.session.query(PinnedConversation) .filter( PinnedConversation.app_id == app_model.id, PinnedConversation.conversation_id == conversation_id, PinnedConversation.created_by_role == ("account" if isinstance(user, Account) else "end_user"), PinnedConversation.created_by == user.id, ) .first() ) if not pinned_conversation: return db.session.delete(pinned_conversation) db.session.commit()