"""HTTP endpoints for the chatbot feature: posting messages and managing threads."""

import logging
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.requests import Request
from fastapi.responses import StreamingResponse
from sqlalchemy.ext.asyncio import AsyncSession

from modules.features.chatBot.database import get_async_db_session
from modules.features.chatBot.service import (
    get_or_create_thread_for_user,
)
from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelChatbot import (
    ChatMessageRequest,
    MessageItem,
    ChatMessageResponse,
    ThreadSummary,
    ThreadListResponse,
    ThreadDetail,
    RenameThreadRequest,
    DeleteResponse,
)
from modules.security.auth import getCurrentUser, limiter
from modules.features.chatBot import service as chat_service

logger = logging.getLogger(__name__)

router = APIRouter(
    prefix="/api/chatbot",
    tags=["Chatbot"],
    responses={404: {"description": "Not found"}},
)


def _internal_error(
    log_prefix: str, detail_prefix: str, exc: Exception
) -> HTTPException:
    """Log an unexpected failure and build the matching HTTP 500 error.

    Args:
        log_prefix: Text placed before the exception type/message in the log.
        detail_prefix: Text placed before the exception type/message in the
            response ``detail``.
        exc: The exception that was caught.

    Returns:
        An ``HTTPException`` with status 500 for the caller to raise.
    """
    error_type = type(exc).__name__
    # Pass the exception itself so the traceback is logged regardless of the
    # caller's current exception state.
    logger.error("%s: %s: %s", log_prefix, error_type, exc, exc_info=exc)
    # NOTE(review): the detail exposes the internal exception type and message
    # to the client. Kept unchanged for backward compatibility; consider
    # returning a generic message instead.
    return HTTPException(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        detail=(
            f"{detail_prefix}: {error_type}: "
            f"{str(exc) or 'No error message provided'}"
        ),
    )


# --- Actual endpoints for chatbot ---
# NOTE(review): the otherwise unused ``request`` parameter on every endpoint is
# presumably required by the ``limiter.limit`` decorator - confirm before
# removing it.


@router.post("/message/stream")
@limiter.limit("30/minute")
async def post_chat_message_stream(
    *,
    request: Request,
    message_request: ChatMessageRequest,
    currentUser: User = Depends(getCurrentUser),
    session: AsyncSession = Depends(get_async_db_session),
) -> StreamingResponse:
    """
    Post a message to a chat thread with streaming progress updates.
    Creates a new thread if thread_id is not provided.

    Returns Server-Sent Events (SSE) stream with status updates and final response.

    Raises:
        HTTPException: 403 when thread resolution raises ``ValueError``
            (same mapping as the non-streaming endpoint), 500 on any other
            unexpected error.
    """
    try:
        # The first 100 characters of the message are passed as the thread name.
        thread_id = await get_or_create_thread_for_user(
            thread_id=message_request.thread_id,
            user=currentUser,
            session=session,
            thread_name=message_request.message[:100],
            refresh_date_updated=True,
        )

        logger.info(
            "User %s posted streaming message to thread %s",
            currentUser.id,
            thread_id,
        )

        return StreamingResponse(
            chat_service.post_message_stream(
                thread_id=thread_id,
                message=message_request.message,
                user=currentUser,
            ),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
            },
        )
    except HTTPException:
        # Preserve deliberate HTTP errors instead of rewriting them as 500s.
        raise
    except ValueError as e:
        # Keep in line with post_chat_message, which maps ValueError to 403.
        logger.error("Permission error: %s", e, exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=str(e) or "Permission denied",
        ) from e
    except Exception as e:
        raise _internal_error(
            "Error posting chat message", "Failed to post message", e
        ) from e


@router.post("/message", response_model=ChatMessageResponse)
@limiter.limit("30/minute")
async def post_chat_message(
    *,
    request: Request,
    message_request: ChatMessageRequest,
    currentUser: User = Depends(getCurrentUser),
    session: AsyncSession = Depends(get_async_db_session),
) -> ChatMessageResponse:
    """
    Post a message to a chat thread and get assistant response (non-streaming).
    Creates a new thread if thread_id is not provided.

    For streaming updates, use the /message/stream endpoint instead.

    Raises:
        HTTPException: 403 when a ``ValueError`` is raised, 500 on any other
            unexpected error.
    """
    try:
        # The first 100 characters of the message are passed as the thread name.
        thread_id = await get_or_create_thread_for_user(
            thread_id=message_request.thread_id,
            user=currentUser,
            session=session,
            thread_name=message_request.message[:100],
            refresh_date_updated=True,
        )

        logger.info("User %s posted message to thread %s", currentUser.id, thread_id)

        return await chat_service.post_message(
            thread_id=thread_id,
            message=message_request.message,
            user=currentUser,
        )
    except HTTPException:
        # Preserve deliberate HTTP errors instead of rewriting them as 500s.
        raise
    except ValueError as e:
        logger.error("Permission error: %s", e, exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=str(e) or "Permission denied",
        ) from e
    except Exception as e:
        raise _internal_error(
            "Error posting chat message", "Failed to post message", e
        ) from e


@router.get("/threads", response_model=ThreadListResponse)
@limiter.limit("30/minute")
async def get_all_threads(
    *,
    request: Request,
    currentUser: User = Depends(getCurrentUser),
    session: AsyncSession = Depends(get_async_db_session),
) -> ThreadListResponse:
    """
    Get all chat threads for the current user.

    Raises:
        HTTPException: 500 on any unexpected error.
    """
    try:
        threads = await chat_service.get_all_threads_for_user(
            user=currentUser, session=session
        )

        logger.info("User %s retrieved %s threads", currentUser.id, len(threads))

        return ThreadListResponse(threads=threads)
    except HTTPException:
        # Preserve deliberate HTTP errors instead of rewriting them as 500s.
        raise
    except Exception as e:
        raise _internal_error(
            "Error retrieving threads", "Failed to retrieve threads", e
        ) from e


@router.get("/threads/{thread_id}", response_model=ThreadDetail)
@limiter.limit("30/minute")
async def get_thread_by_id(
    *,
    request: Request,
    thread_id: str,
    currentUser: User = Depends(getCurrentUser),
    session: AsyncSession = Depends(get_async_db_session),
) -> ThreadDetail:
    """
    Get a specific chat thread with all its messages from LangGraph checkpointer.

    Raises:
        HTTPException: 404 on ``ValueError``, 403 on ``PermissionError``,
            500 on any other unexpected error.
    """
    try:
        thread_detail = await chat_service.get_thread_detail_for_user(
            thread_id=thread_id,
            user=currentUser,
            session=session,
        )

        logger.info("User %s retrieved thread %s", currentUser.id, thread_id)

        return thread_detail
    except HTTPException:
        # Preserve deliberate HTTP errors instead of rewriting them as 500s.
        raise
    except ValueError as e:
        logger.error("Thread not found: %s", e, exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=str(e) or "Thread not found",
        ) from e
    except PermissionError as e:
        logger.error("Permission denied: %s", e, exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=str(e) or "Permission denied",
        ) from e
    except Exception as e:
        raise _internal_error(
            f"Error retrieving thread {thread_id}", "Failed to retrieve thread", e
        ) from e


@router.patch("/threads/{thread_id}", response_model=DeleteResponse)
@limiter.limit("30/minute")
async def rename_thread(
    *,
    request: Request,
    thread_id: str,
    rename_request: RenameThreadRequest,
    currentUser: User = Depends(getCurrentUser),
    session: AsyncSession = Depends(get_async_db_session),
) -> DeleteResponse:
    """
    Rename a chat thread.

    Raises:
        HTTPException: 404 on ``ValueError``, 403 on ``PermissionError``,
            500 on any other unexpected error.
    """
    try:
        await chat_service.update_thread_name(
            thread_id=thread_id,
            user=currentUser,
            new_thread_name=rename_request.new_name,
            session=session,
        )

        logger.info(
            "User %s renamed thread %s to '%s'",
            currentUser.id,
            thread_id,
            rename_request.new_name,
        )

        return DeleteResponse(
            message=f"Thread {thread_id} successfully renamed",
            thread_id=thread_id,
        )
    except HTTPException:
        # Preserve deliberate HTTP errors instead of rewriting them as 500s.
        raise
    except ValueError as e:
        logger.error("Thread not found or permission denied: %s", e, exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=str(e) or "Thread not found or permission denied",
        ) from e
    except PermissionError as e:
        # Same mapping as get_thread_by_id and delete_thread.
        logger.error("Permission denied: %s", e, exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=str(e) or "Permission denied",
        ) from e
    except Exception as e:
        raise _internal_error(
            f"Error renaming thread {thread_id}", "Failed to rename thread", e
        ) from e


@router.delete("/threads/{thread_id}", response_model=DeleteResponse)
@limiter.limit("10/minute")
async def delete_thread(
    *,
    request: Request,
    thread_id: str,
    currentUser: User = Depends(getCurrentUser),
    session: AsyncSession = Depends(get_async_db_session),
) -> DeleteResponse:
    """
    Delete a chat thread and all its associated data from both LangGraph and database.

    Raises:
        HTTPException: 404 on ``ValueError``, 403 on ``PermissionError``,
            500 on any other unexpected error.
    """
    try:
        await chat_service.delete_thread_for_user(
            thread_id=thread_id,
            user=currentUser,
            session=session,
        )

        logger.info("User %s deleted thread %s", currentUser.id, thread_id)

        return DeleteResponse(
            message=f"Thread {thread_id} successfully deleted",
            thread_id=thread_id,
        )
    except HTTPException:
        # Preserve deliberate HTTP errors instead of rewriting them as 500s.
        raise
    except ValueError as e:
        logger.error("Thread not found or permission denied: %s", e, exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=str(e) or "Thread not found or permission denied",
        ) from e
    except PermissionError as e:
        logger.error("Permission denied: %s", e, exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=str(e) or "Permission denied",
        ) from e
    except Exception as e:
        raise _internal_error(
            f"Error deleting thread {thread_id}", "Failed to delete thread", e
        ) from e