gateway/modules/routes/routeChatbot.py
2025-10-09 15:30:15 +02:00

536 lines
17 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.requests import Request
from fastapi.responses import StreamingResponse
from typing import Any, Dict, List, Optional
from datetime import datetime
import logging
import uuid
from sqlalchemy.ext.asyncio import AsyncSession
from modules.features.chatBot.database import get_async_db_session
from modules.features.chatBot.service import (
get_or_create_thread_for_user,
)
from modules.datamodels.datamodelUam import User, UserPrivilege
from modules.datamodels.datamodelChatbot import (
ChatMessageRequest,
MessageItem,
ChatMessageResponse,
ThreadSummary,
ThreadListResponse,
ThreadDetail,
RenameThreadRequest,
DeleteResponse,
ToolListResponse,
ToolInfo,
GrantToolRequest,
GrantToolResponse,
RevokeToolRequest,
RevokeToolResponse,
UpdateToolRequest,
UpdateToolResponse,
)
from modules.security.auth import getCurrentUser, limiter
from modules.features.chatBot import service as chat_service
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Router collecting every chatbot endpoint defined in this module.
# All routes registered on it share the "/api/chatbot" prefix and the
# "Chatbot" tag, and declare a 404 "Not found" response.
router = APIRouter(
    prefix="/api/chatbot",
    tags=["Chatbot"],
    responses={404: {"description": "Not found"}},
)
# --- Actual endpoints for chatbot ---
@router.post("/message/stream")
@limiter.limit("30/minute")
async def post_chat_message_stream(
    *,
    request: Request,
    message_request: ChatMessageRequest,
    currentUser: User = Depends(getCurrentUser),
    session: AsyncSession = Depends(get_async_db_session),
) -> StreamingResponse:
    """
    Post a message to a chat thread with streaming progress updates.

    Creates a new thread if thread_id is not provided.
    Returns Server-Sent Events (SSE) stream with status updates and final response.

    Args:
        request: Incoming request. Not read in the body; presumably required
            by the ``limiter.limit`` decorator (confirm against the limiter).
        message_request: Payload carrying ``message`` and ``thread_id``.
        currentUser: Authenticated user resolved by ``getCurrentUser``.
        session: Async database session passed to the thread helper.

    Returns:
        A ``text/event-stream`` ``StreamingResponse`` wrapping
        ``chat_service.post_message_stream(...)``.

    Raises:
        HTTPException: 403 when resolving the thread raises ``ValueError``
            (same mapping as the non-streaming ``/message`` endpoint), 500 for
            any other unexpected error. An ``HTTPException`` raised by the
            called code is propagated unchanged.
    """
    try:
        # The first 100 characters of the message are used as the thread name
        # when a new thread has to be created.
        thread_id = await get_or_create_thread_for_user(
            thread_id=message_request.thread_id,
            user=currentUser,
            session=session,
            thread_name=message_request.message[:100],
            refresh_date_updated=True,
        )
        logger.info(
            f"User {currentUser.id} posted streaming message to thread {thread_id}"
        )
        # NOTE(review): the stream content is presumably consumed lazily after
        # this handler returns, so errors raised while streaming would not be
        # caught by the handlers below - confirm against the service.
        return StreamingResponse(
            chat_service.post_message_stream(
                thread_id=thread_id,
                message=message_request.message,
                user=currentUser,
            ),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
            },
        )
    except HTTPException:
        # Keep deliberate HTTP errors intact instead of rewrapping them as 500.
        raise
    except ValueError as e:
        # Mirrors the non-streaming endpoint, which reports ValueError as 403.
        logger.error(f"Permission error: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=str(e) or "Permission denied",
        ) from e
    except Exception as e:
        logger.error(
            f"Error posting chat message: {type(e).__name__}: {str(e)}", exc_info=True
        )
        # NOTE(review): the detail exposes the exception type and message to
        # the client; kept unchanged for compatibility with existing callers.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to post message: {type(e).__name__}: {str(e) or 'No error message provided'}",
        ) from e
@router.post("/message", response_model=ChatMessageResponse)
@limiter.limit("30/minute")
async def post_chat_message(
    *,
    request: Request,
    message_request: ChatMessageRequest,
    currentUser: User = Depends(getCurrentUser),
    session: AsyncSession = Depends(get_async_db_session),
) -> ChatMessageResponse:
    """
    Post a message to a chat thread and get assistant response (non-streaming).

    Creates a new thread if thread_id is not provided.
    For streaming updates, use the /message/stream endpoint instead.

    Args:
        request: Incoming request. Not read in the body; presumably required
            by the ``limiter.limit`` decorator (confirm against the limiter).
        message_request: Payload carrying ``message`` and ``thread_id``.
        currentUser: Authenticated user resolved by ``getCurrentUser``.
        session: Async database session passed to the thread helper.

    Returns:
        The value returned by ``chat_service.post_message``.

    Raises:
        HTTPException: 403 when a ``ValueError`` is raised, 500 for any other
            unexpected error. An ``HTTPException`` raised by the called code
            is propagated unchanged.
    """
    try:
        # The first 100 characters of the message are used as the thread name
        # when a new thread has to be created.
        thread_id = await get_or_create_thread_for_user(
            thread_id=message_request.thread_id,
            user=currentUser,
            session=session,
            thread_name=message_request.message[:100],
            refresh_date_updated=True,
        )
        logger.info(f"User {currentUser.id} posted message to thread {thread_id}")
        response = await chat_service.post_message(
            thread_id=thread_id,
            message=message_request.message,
            user=currentUser,
        )
        return response
    except HTTPException:
        # Keep deliberate HTTP errors intact instead of rewrapping them as 500.
        raise
    except ValueError as e:
        logger.error(f"Permission error: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=str(e) or "Permission denied",
        ) from e
    except Exception as e:
        logger.error(
            f"Error posting chat message: {type(e).__name__}: {str(e)}", exc_info=True
        )
        # NOTE(review): the detail exposes the exception type and message to
        # the client; kept unchanged for compatibility with existing callers.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to post message: {type(e).__name__}: {str(e) or 'No error message provided'}",
        ) from e
@router.get("/threads", response_model=ThreadListResponse)
@limiter.limit("30/minute")
async def get_all_threads(
    *,
    request: Request,
    currentUser: User = Depends(getCurrentUser),
    session: AsyncSession = Depends(get_async_db_session),
) -> ThreadListResponse:
    """
    Get all chat threads for the current user.

    Args:
        request: Incoming request. Not read in the body; presumably required
            by the ``limiter.limit`` decorator (confirm against the limiter).
        currentUser: Authenticated user resolved by ``getCurrentUser``.
        session: Async database session passed to the service layer.

    Returns:
        A ``ThreadListResponse`` wrapping the threads returned by
        ``chat_service.get_all_threads_for_user``.

    Raises:
        HTTPException: 500 for any unexpected error. An ``HTTPException``
            raised by the called code is propagated unchanged.
    """
    try:
        threads = await chat_service.get_all_threads_for_user(
            user=currentUser, session=session
        )
        logger.info(f"User {currentUser.id} retrieved {len(threads)} threads")
        return ThreadListResponse(threads=threads)
    except HTTPException:
        # Keep deliberate HTTP errors intact instead of rewrapping them as 500.
        raise
    except Exception as e:
        logger.error(
            f"Error retrieving threads: {type(e).__name__}: {str(e)}", exc_info=True
        )
        # NOTE(review): the detail exposes the exception type and message to
        # the client; kept unchanged for compatibility with existing callers.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to retrieve threads: {type(e).__name__}: {str(e) or 'No error message provided'}",
        ) from e
@router.get("/threads/{thread_id}", response_model=ThreadDetail)
@limiter.limit("30/minute")
async def get_thread_by_id(
    *,
    request: Request,
    thread_id: str,
    currentUser: User = Depends(getCurrentUser),
    session: AsyncSession = Depends(get_async_db_session),
) -> ThreadDetail:
    """
    Get a specific chat thread with all its messages from LangGraph checkpointer.

    Args:
        request: Incoming request. Not read in the body; presumably required
            by the ``limiter.limit`` decorator (confirm against the limiter).
        thread_id: Identifier of the thread, taken from the URL path.
        currentUser: Authenticated user resolved by ``getCurrentUser``.
        session: Async database session passed to the service layer.

    Returns:
        The value returned by ``chat_service.get_thread_detail_for_user``.

    Raises:
        HTTPException: 404 when the service raises ``ValueError``, 403 when it
            raises ``PermissionError``, 500 for any other unexpected error.
            An ``HTTPException`` raised by the called code is propagated
            unchanged.
    """
    try:
        thread_detail = await chat_service.get_thread_detail_for_user(
            thread_id=thread_id,
            user=currentUser,
            session=session,
        )
        logger.info(f"User {currentUser.id} retrieved thread {thread_id}")
        return thread_detail
    except HTTPException:
        # Keep deliberate HTTP errors intact instead of rewrapping them as 500.
        raise
    except ValueError as e:
        logger.error(f"Thread not found: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=str(e) or "Thread not found",
        ) from e
    except PermissionError as e:
        logger.error(f"Permission denied: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=str(e) or "Permission denied",
        ) from e
    except Exception as e:
        logger.error(
            f"Error retrieving thread {thread_id}: {type(e).__name__}: {str(e)}",
            exc_info=True,
        )
        # NOTE(review): the detail exposes the exception type and message to
        # the client; kept unchanged for compatibility with existing callers.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to retrieve thread: {type(e).__name__}: {str(e) or 'No error message provided'}",
        ) from e
@router.patch("/threads/{thread_id}", response_model=DeleteResponse)
@limiter.limit("30/minute")
async def rename_thread(
    *,
    request: Request,
    thread_id: str,
    rename_request: RenameThreadRequest,
    currentUser: User = Depends(getCurrentUser),
    session: AsyncSession = Depends(get_async_db_session),
) -> DeleteResponse:
    """
    Rename a chat thread.

    Args:
        request: Incoming request. Not read in the body; presumably required
            by the ``limiter.limit`` decorator (confirm against the limiter).
        thread_id: Identifier of the thread, taken from the URL path.
        rename_request: Payload carrying the new name in ``new_name``.
        currentUser: Authenticated user resolved by ``getCurrentUser``.
        session: Async database session passed to the service layer.

    Returns:
        A ``DeleteResponse`` (reused here as a generic confirmation payload)
        containing a success message and the thread id.

    Raises:
        HTTPException: 404 when the service raises ``ValueError``, 403 when it
            raises ``PermissionError`` (same mapping as the get/delete thread
            endpoints), 500 for any other unexpected error. An
            ``HTTPException`` raised by the called code is propagated
            unchanged.
    """
    try:
        await chat_service.update_thread_name(
            thread_id=thread_id,
            user=currentUser,
            new_thread_name=rename_request.new_name,
            session=session,
        )
        logger.info(
            f"User {currentUser.id} renamed thread {thread_id} to '{rename_request.new_name}'"
        )
        return DeleteResponse(
            message=f"Thread {thread_id} successfully renamed",
            thread_id=thread_id,
        )
    except HTTPException:
        # Keep deliberate HTTP errors intact instead of rewrapping them as 500.
        raise
    except ValueError as e:
        logger.error(f"Thread not found or permission denied: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=str(e) or "Thread not found or permission denied",
        ) from e
    except PermissionError as e:
        # Without this handler a PermissionError would surface as a 500,
        # unlike the sibling thread endpoints that report it as 403.
        logger.error(f"Permission denied: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=str(e) or "Permission denied",
        ) from e
    except Exception as e:
        logger.error(
            f"Error renaming thread {thread_id}: {type(e).__name__}: {str(e)}",
            exc_info=True,
        )
        # NOTE(review): the detail exposes the exception type and message to
        # the client; kept unchanged for compatibility with existing callers.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to rename thread: {type(e).__name__}: {str(e) or 'No error message provided'}",
        ) from e
@router.delete("/threads/{thread_id}", response_model=DeleteResponse)
@limiter.limit("10/minute")
async def delete_thread(
    *,
    request: Request,
    thread_id: str,
    currentUser: User = Depends(getCurrentUser),
    session: AsyncSession = Depends(get_async_db_session),
) -> DeleteResponse:
    """
    Delete a chat thread and all its associated data from both LangGraph and database.

    Args:
        request: Incoming request. Not read in the body; presumably required
            by the ``limiter.limit`` decorator (confirm against the limiter).
        thread_id: Identifier of the thread, taken from the URL path.
        currentUser: Authenticated user resolved by ``getCurrentUser``.
        session: Async database session passed to the service layer.

    Returns:
        A ``DeleteResponse`` containing a success message and the thread id.

    Raises:
        HTTPException: 404 when the service raises ``ValueError``, 403 when it
            raises ``PermissionError``, 500 for any other unexpected error.
            An ``HTTPException`` raised by the called code is propagated
            unchanged.
    """
    try:
        await chat_service.delete_thread_for_user(
            thread_id=thread_id,
            user=currentUser,
            session=session,
        )
        logger.info(f"User {currentUser.id} deleted thread {thread_id}")
        return DeleteResponse(
            message=f"Thread {thread_id} successfully deleted",
            thread_id=thread_id,
        )
    except HTTPException:
        # Keep deliberate HTTP errors intact instead of rewrapping them as 500.
        raise
    except ValueError as e:
        logger.error(f"Thread not found or permission denied: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=str(e) or "Thread not found or permission denied",
        ) from e
    except PermissionError as e:
        logger.error(f"Permission denied: {str(e)}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=str(e) or "Permission denied",
        ) from e
    except Exception as e:
        logger.error(
            f"Error deleting thread {thread_id}: {type(e).__name__}: {str(e)}",
            exc_info=True,
        )
        # NOTE(review): the detail exposes the exception type and message to
        # the client; kept unchanged for compatibility with existing callers.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to delete thread: {type(e).__name__}: {str(e) or 'No error message provided'}",
        ) from e
# Tool Management Endpoints
@router.get("/tools", response_model=ToolListResponse)
@limiter.limit("30/minute")
async def get_all_tools(
    *,
    request: Request,
    currentUser: User = Depends(getCurrentUser),
    session: AsyncSession = Depends(get_async_db_session),
) -> ToolListResponse:
    """
    List every chatbot tool returned by the service layer.

    Callers whose privilege is not SYSADMIN are rejected with a 403.
    Unexpected failures are logged and reported as a 500.
    """
    try:
        # Administrative gate: anything other than SYSADMIN is refused.
        lacks_admin_rights = currentUser.privilege != UserPrivilege.SYSADMIN
        if lacks_admin_rights:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Only system administrators can view tools",
            )

        # Each entry from the service is expanded into a ToolInfo model.
        raw_tools = await chat_service.get_all_tools(session=session)
        tool_infos = [ToolInfo(**raw_tool) for raw_tool in raw_tools]

        logger.info(f"User {currentUser.id} retrieved {len(tool_infos)} tools")
        listing = ToolListResponse(tools=tool_infos)
        return listing
    except HTTPException:
        # The 403 raised above must reach the client untouched.
        raise
    except Exception as exc:
        error_kind = type(exc).__name__
        error_text = str(exc)
        logger.error(
            f"Error retrieving tools: {error_kind}: {error_text}", exc_info=True
        )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to retrieve tools: {error_kind}: {error_text or 'No error message provided'}",
        )
@router.post("/tools/grant", response_model=GrantToolResponse)
@limiter.limit("10/minute")
async def grant_tool_to_user(
    *,
    request: Request,
    grant_request: GrantToolRequest,
    currentUser: User = Depends(getCurrentUser),
    session: AsyncSession = Depends(get_async_db_session),
) -> GrantToolResponse:
    """
    Give a user access to a tool.

    Callers whose privilege is not SYSADMIN are rejected with a 403.
    A ValueError from the service is reported as 400; any other unexpected
    failure is logged and reported as a 500.
    """
    try:
        # Administrative gate: anything other than SYSADMIN is refused.
        lacks_admin_rights = currentUser.privilege != UserPrivilege.SYSADMIN
        if lacks_admin_rights:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Only system administrators can grant tools",
            )

        await chat_service.grant_tool_to_user(
            user_id=grant_request.user_id,
            tool_id=grant_request.tool_id,
            session=session,
        )

        logger.info(
            f"User {currentUser.id} granted tool {grant_request.tool_id} to user {grant_request.user_id}"
        )
        confirmation = GrantToolResponse(
            message=f"Tool successfully granted to user {grant_request.user_id}",
            user_id=grant_request.user_id,
            tool_id=grant_request.tool_id,
        )
        return confirmation
    except HTTPException:
        # The 403 raised above must reach the client untouched.
        raise
    except ValueError as exc:
        error_text = str(exc)
        logger.error(f"Validation error: {error_text}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=error_text or "Invalid request",
        )
    except Exception as exc:
        error_kind = type(exc).__name__
        error_text = str(exc)
        logger.error(
            f"Error granting tool: {error_kind}: {error_text}", exc_info=True
        )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to grant tool: {error_kind}: {error_text or 'No error message provided'}",
        )
@router.delete("/tools/revoke", response_model=RevokeToolResponse)
@limiter.limit("10/minute")
async def revoke_tool_from_user(
    *,
    request: Request,
    revoke_request: RevokeToolRequest,
    currentUser: User = Depends(getCurrentUser),
    session: AsyncSession = Depends(get_async_db_session),
) -> RevokeToolResponse:
    """
    Remove a user's access to a tool.

    Callers whose privilege is not SYSADMIN are rejected with a 403.
    A ValueError from the service is reported as 400; any other unexpected
    failure is logged and reported as a 500.
    """
    try:
        # Administrative gate: anything other than SYSADMIN is refused.
        lacks_admin_rights = currentUser.privilege != UserPrivilege.SYSADMIN
        if lacks_admin_rights:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Only system administrators can revoke tools",
            )

        await chat_service.revoke_tool_from_user(
            user_id=revoke_request.user_id,
            tool_id=revoke_request.tool_id,
            session=session,
        )

        logger.info(
            f"User {currentUser.id} revoked tool {revoke_request.tool_id} from user {revoke_request.user_id}"
        )
        confirmation = RevokeToolResponse(
            message=f"Tool successfully revoked from user {revoke_request.user_id}",
            user_id=revoke_request.user_id,
            tool_id=revoke_request.tool_id,
        )
        return confirmation
    except HTTPException:
        # The 403 raised above must reach the client untouched.
        raise
    except ValueError as exc:
        error_text = str(exc)
        logger.error(f"Validation error: {error_text}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=error_text or "Invalid request",
        )
    except Exception as exc:
        error_kind = type(exc).__name__
        error_text = str(exc)
        logger.error(
            f"Error revoking tool: {error_kind}: {error_text}", exc_info=True
        )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to revoke tool: {error_kind}: {error_text or 'No error message provided'}",
        )
@router.patch("/tools/{tool_id}", response_model=UpdateToolResponse)
@limiter.limit("10/minute")
async def update_tool(
    *,
    request: Request,
    tool_id: str,
    update_request: UpdateToolRequest,
    currentUser: User = Depends(getCurrentUser),
    session: AsyncSession = Depends(get_async_db_session),
) -> UpdateToolResponse:
    """
    Change the label and/or description of a tool.

    Callers whose privilege is not SYSADMIN are rejected with a 403.
    A ValueError from the service is reported as 400; any other unexpected
    failure is logged and reported as a 500.
    """
    try:
        # Administrative gate: anything other than SYSADMIN is refused.
        lacks_admin_rights = currentUser.privilege != UserPrivilege.SYSADMIN
        if lacks_admin_rights:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Only system administrators can update tools",
            )

        # The service's return value is echoed back as `updated_fields`.
        changed_fields = await chat_service.update_tool(
            tool_id=tool_id,
            label=update_request.label,
            description=update_request.description,
            session=session,
        )

        logger.info(
            f"User {currentUser.id} updated tool {tool_id}, fields: {changed_fields}"
        )
        confirmation = UpdateToolResponse(
            message="Tool successfully updated",
            tool_id=tool_id,
            updated_fields=changed_fields,
        )
        return confirmation
    except HTTPException:
        # The 403 raised above must reach the client untouched.
        raise
    except ValueError as exc:
        error_text = str(exc)
        logger.error(f"Validation error: {error_text}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=error_text or "Invalid request",
        )
    except Exception as exc:
        error_kind = type(exc).__name__
        error_text = str(exc)
        logger.error(
            f"Error updating tool: {error_kind}: {error_text}", exc_info=True
        )
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to update tool: {error_kind}: {error_text or 'No error message provided'}",
        )