250 lines
8 KiB
Python
250 lines
8 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from fastapi.requests import Request
|
|
from fastapi.responses import StreamingResponse
|
|
from typing import Any, Dict, List, Optional
|
|
from datetime import datetime
|
|
import logging
|
|
import uuid
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
|
|
from modules.features.chatBot.database import get_async_db_session
|
|
from modules.features.chatBot.service import (
|
|
get_or_create_thread_for_user,
|
|
)
|
|
from modules.datamodels.datamodelUam import User
|
|
from modules.datamodels.datamodelChatbot import (
|
|
ChatMessageRequest,
|
|
MessageItem,
|
|
ChatMessageResponse,
|
|
ThreadSummary,
|
|
ThreadListResponse,
|
|
ThreadDetail,
|
|
DeleteResponse,
|
|
)
|
|
from modules.security.auth import getCurrentUser, limiter
|
|
from modules.features.chatBot import service as chat_service
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Router that groups every chatbot endpoint under the /api/chatbot prefix.
router = APIRouter(
    prefix="/api/chatbot",
    tags=["Chatbot"],
    responses={404: {"description": "Not found"}},
)
|
|
|
|
|
|
# --- Actual endpoints for chatbot ---
|
|
|
|
|
|
@router.post("/message/stream")
@limiter.limit("30/minute")
async def post_chat_message_stream(
    *,
    request: Request,
    message_request: ChatMessageRequest,
    currentUser: User = Depends(getCurrentUser),
    session: AsyncSession = Depends(get_async_db_session),
) -> StreamingResponse:
    """
    Post a message to a chat thread with streaming progress updates.

    Creates a new thread if thread_id is not provided.

    Returns Server-Sent Events (SSE) stream with status updates and final response.

    Args:
        request: Incoming HTTP request. Not read in the body; presumably
            required by the ``limiter.limit`` decorator.
        message_request: Payload carrying ``message`` and ``thread_id``.
        currentUser: Authenticated user resolved by ``getCurrentUser``.
        session: Async DB session handed to the thread lookup helper.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream`` that
        wraps ``chat_service.post_message_stream``.

    Raises:
        HTTPException: Re-raised unchanged when a helper already raised one;
            403 when a ``ValueError`` is raised (same mapping as the
            non-streaming ``/message`` endpoint); 500 for any other error.
    """
    try:
        # The first 100 characters of the message are passed as the thread name.
        thread_id = await get_or_create_thread_for_user(
            thread_id=message_request.thread_id,
            user=currentUser,
            session=session,
            thread_name=message_request.message[:100],
            refresh_date_updated=True,
        )

        logger.info(
            f"User {currentUser.id} posted streaming message to thread {thread_id}"
        )

        # NOTE(review): if post_message_stream is an async generator, errors
        # raised while the stream is consumed happen after this handler has
        # returned and are therefore not converted by the handlers below.
        return StreamingResponse(
            chat_service.post_message_stream(
                thread_id=thread_id,
                message=message_request.message,
                user=currentUser,
            ),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
            },
        )

    except HTTPException:
        # Keep the status code chosen by the helper instead of masking it as 500.
        raise
    except ValueError as e:
        # Mirror the /message endpoint, which reports ValueError as 403.
        logger.error(f"Permission error: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=str(e) or "Permission denied",
        ) from e
    except Exception as e:
        logger.error(
            f"Error posting chat message: {type(e).__name__}: {str(e)}", exc_info=True
        )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to post message: {type(e).__name__}: {str(e) or 'No error message provided'}",
        ) from e
|
|
|
|
|
|
@router.post("/message", response_model=ChatMessageResponse)
@limiter.limit("30/minute")
async def post_chat_message(
    *,
    request: Request,
    message_request: ChatMessageRequest,
    currentUser: User = Depends(getCurrentUser),
    session: AsyncSession = Depends(get_async_db_session),
) -> ChatMessageResponse:
    """
    Post a message to a chat thread and get assistant response (non-streaming).

    Creates a new thread if thread_id is not provided.

    For streaming updates, use the /message/stream endpoint instead.

    Args:
        request: Incoming HTTP request. Not read in the body; presumably
            required by the ``limiter.limit`` decorator.
        message_request: Payload carrying ``message`` and ``thread_id``.
        currentUser: Authenticated user resolved by ``getCurrentUser``.
        session: Async DB session handed to the thread lookup helper.

    Returns:
        Whatever ``chat_service.post_message`` returns for the resolved thread.

    Raises:
        HTTPException: Re-raised unchanged when a helper already raised one;
            403 when a ``ValueError`` is raised; 500 for any other error.
    """
    try:
        # The first 100 characters of the message are passed as the thread name.
        thread_id = await get_or_create_thread_for_user(
            thread_id=message_request.thread_id,
            user=currentUser,
            session=session,
            thread_name=message_request.message[:100],
            refresh_date_updated=True,
        )

        logger.info(f"User {currentUser.id} posted message to thread {thread_id}")

        response = await chat_service.post_message(
            thread_id=thread_id,
            message=message_request.message,
            user=currentUser,
        )

        return response

    except HTTPException:
        # Keep the status code chosen by the helper instead of masking it as 500.
        raise
    except ValueError as e:
        logger.error(f"Permission error: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=str(e) or "Permission denied",
        ) from e
    except Exception as e:
        logger.error(
            f"Error posting chat message: {type(e).__name__}: {str(e)}", exc_info=True
        )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to post message: {type(e).__name__}: {str(e) or 'No error message provided'}",
        ) from e
|
|
|
|
|
|
@router.get("/threads", response_model=ThreadListResponse)
@limiter.limit("30/minute")
async def get_all_threads(
    *,
    request: Request,
    currentUser: User = Depends(getCurrentUser),
    session: AsyncSession = Depends(get_async_db_session),
) -> ThreadListResponse:
    """
    Get all chat threads for the current user.

    Args:
        request: Incoming HTTP request. Not read in the body; presumably
            required by the ``limiter.limit`` decorator.
        currentUser: Authenticated user resolved by ``getCurrentUser``.
        session: Async DB session passed to the service layer.

    Returns:
        A ``ThreadListResponse`` wrapping the result of
        ``chat_service.get_all_threads_for_user``.

    Raises:
        HTTPException: Re-raised unchanged when the service already raised
            one; 500 for any other error.
    """
    try:
        # Get all threads for the current user
        threads = await chat_service.get_all_threads_for_user(
            user=currentUser, session=session
        )

        logger.info(f"User {currentUser.id} retrieved {len(threads)} threads")

        return ThreadListResponse(threads=threads)

    except HTTPException:
        # Keep the status code chosen by the service instead of masking it as 500.
        raise
    except Exception as e:
        logger.error(
            f"Error retrieving threads: {type(e).__name__}: {str(e)}", exc_info=True
        )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to retrieve threads: {type(e).__name__}: {str(e) or 'No error message provided'}",
        ) from e
|
|
|
|
|
|
@router.get("/threads/{thread_id}", response_model=ThreadDetail)
@limiter.limit("30/minute")
async def get_thread_by_id(
    *,
    request: Request,
    thread_id: str,
    currentUser: User = Depends(getCurrentUser),
    session: AsyncSession = Depends(get_async_db_session),
) -> ThreadDetail:
    """
    Get a specific chat thread with all its messages from LangGraph checkpointer.

    Args:
        request: Incoming HTTP request. Not read in the body; presumably
            required by the ``limiter.limit`` decorator.
        thread_id: Identifier of the thread, taken from the URL path.
        currentUser: Authenticated user resolved by ``getCurrentUser``.
        session: Async DB session passed to the service layer.

    Returns:
        The result of ``chat_service.get_thread_detail_for_user``.

    Raises:
        HTTPException: Re-raised unchanged when the service already raised
            one; 404 when a ``ValueError`` is raised; 403 when a
            ``PermissionError`` is raised; 500 for any other error.
    """
    try:
        thread_detail = await chat_service.get_thread_detail_for_user(
            thread_id=thread_id,
            user=currentUser,
            session=session,
        )

        logger.info(f"User {currentUser.id} retrieved thread {thread_id}")
        return thread_detail

    except HTTPException:
        # Keep the status code chosen by the service instead of masking it as 500.
        raise
    except ValueError as e:
        logger.error(f"Thread not found: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=str(e) or "Thread not found",
        ) from e
    except PermissionError as e:
        logger.error(f"Permission denied: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=str(e) or "Permission denied",
        ) from e
    except Exception as e:
        logger.error(
            f"Error retrieving thread {thread_id}: {type(e).__name__}: {str(e)}",
            exc_info=True,
        )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to retrieve thread: {type(e).__name__}: {str(e) or 'No error message provided'}",
        ) from e
|
|
|
|
|
|
@router.delete("/threads/{thread_id}", response_model=DeleteResponse)
@limiter.limit("10/minute")
async def delete_thread(
    *,
    request: Request,
    thread_id: str,
    currentUser: User = Depends(getCurrentUser),
) -> DeleteResponse:
    """
    Acknowledge deletion of a chat thread (placeholder implementation).

    Nothing is removed yet: the handler only logs the request and returns a
    dummy confirmation. The real implementation is expected to verify that the
    thread belongs to the current user, delete it from LangGraph's PostgreSQL
    checkpointer and clean up any associated data.

    Raises:
        HTTPException: 500 if logging or building the response fails.
    """
    try:
        logger.info(f"User {currentUser.id} deleted thread {thread_id}")

        confirmation = f"Thread {thread_id} successfully deleted (dummy response)"
        outcome = DeleteResponse(message=confirmation, thread_id=thread_id)
        return outcome

    except Exception as e:
        log_line = f"Error deleting thread {thread_id}: {type(e).__name__}: {str(e)}"
        logger.error(log_line, exc_info=True)

        failure_detail = f"Failed to delete thread: {type(e).__name__}: {str(e) or 'No error message provided'}"
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=failure_detail,
        )
|