diff --git a/app.py b/app.py index 30def90e..ed4e7214 100644 --- a/app.py +++ b/app.py @@ -1,10 +1,11 @@ import os + os.environ["NUMEXPR_MAX_THREADS"] = "12" from fastapi import FastAPI, HTTPException, Depends, Body, status, Response from fastapi.middleware.cors import CORSMiddleware from contextlib import asynccontextmanager - + import logging from logging.handlers import RotatingFileHandler @@ -20,32 +21,36 @@ class DailyRotatingFileHandler(RotatingFileHandler): A rotating file handler that automatically switches to a new file when the date changes. The log file name includes the current date and switches at midnight. """ - - def __init__(self, log_dir, filename_prefix, max_bytes=10485760, backup_count=5, **kwargs): + + def __init__( + self, log_dir, filename_prefix, max_bytes=10485760, backup_count=5, **kwargs + ): self.log_dir = log_dir self.filename_prefix = filename_prefix self.current_date = None self.current_file = None - + # Initialize with today's file self._update_file_if_needed() - + # Call parent constructor with current file - super().__init__(self.current_file, maxBytes=max_bytes, backupCount=backup_count, **kwargs) - + super().__init__( + self.current_file, maxBytes=max_bytes, backupCount=backup_count, **kwargs + ) + def _update_file_if_needed(self): """Update the log file if the date has changed""" today = datetime.now().strftime("%Y%m%d") - + if self.current_date != today: self.current_date = today new_file = os.path.join(self.log_dir, f"{self.filename_prefix}_{today}.log") - + if self.current_file != new_file: self.current_file = new_file return True return False - + def emit(self, record): """Emit a log record, switching files if date has changed""" # Check if we need to switch to a new file @@ -54,16 +59,17 @@ class DailyRotatingFileHandler(RotatingFileHandler): if self.stream: self.stream.close() self.stream = None - + # Update the baseFilename for the parent class self.baseFilename = self.current_file # Reopen the stream if not self.delay: self.stream = self._open() - + # Call parent emit method super().emit(record) + def initLogging(): """Initialize logging with configuration from APP_CONFIG""" # Get log level from config (default to INFO if not found) @@ -76,33 +82,39 @@ def initLogging(): # If relative path, make it relative to the gateway directory gatewayDir = os.path.dirname(os.path.abspath(__file__)) logDir = os.path.join(gatewayDir, logDir) - + # Ensure log directory exists os.makedirs(logDir, exist_ok=True) # Create formatters - using single line format consoleFormatter = logging.Formatter( fmt="%(asctime)s - %(levelname)s - %(name)s - %(message)s", - datefmt=APP_CONFIG.get("APP_LOGGING_DATE_FORMAT", "%Y-%m-%d %H:%M:%S") + datefmt=APP_CONFIG.get("APP_LOGGING_DATE_FORMAT", "%Y-%m-%d %H:%M:%S"), ) - + # File formatter with more detailed error information but still single line fileFormatter = logging.Formatter( fmt="%(asctime)s - %(levelname)s - %(name)s - %(message)s - %(pathname)s:%(lineno)d - %(funcName)s", - datefmt=APP_CONFIG.get("APP_LOGGING_DATE_FORMAT", "%Y-%m-%d %H:%M:%S") + datefmt=APP_CONFIG.get("APP_LOGGING_DATE_FORMAT", "%Y-%m-%d %H:%M:%S"), ) # Add filter to exclude Chrome DevTools requests class ChromeDevToolsFilter(logging.Filter): def filter(self, record): - return not (isinstance(record.msg, str) and - ('.well-known/appspecific/com.chrome.devtools.json' in record.msg or - 'Request: /index.html' in record.msg)) + return not ( + isinstance(record.msg, str) + and ( + ".well-known/appspecific/com.chrome.devtools.json" in record.msg + or "Request: /index.html" in record.msg + ) + ) # Add filter to exclude all httpcore loggers (including sub-loggers) class HttpcoreStarFilter(logging.Filter): def filter(self, record): - return not (record.name == 'httpcore' or record.name.startswith('httpcore.')) + return not ( + record.name == "httpcore" or record.name.startswith("httpcore.") + ) # Add filter to exclude HTTP debug messages class HTTPDebugFilter(logging.Filter): @@ -110,14 +122,14 @@ def initLogging(): if isinstance(record.msg, str): # Filter out HTTP debug messages http_debug_patterns = [ - 'receive_response_body.started', - 'receive_response_body.complete', - 'response_closed.started', - '_send_single_request', - 'httpcore.http11', - 'httpx._client', - 'HTTP Request', - 'multipart.multipart' + "receive_response_body.started", + "receive_response_body.complete", + "response_closed.started", + "_send_single_request", + "httpcore.http11", + "httpx._client", + "HTTP Request", + "multipart.multipart", ] return not any(pattern in record.msg for pattern in http_debug_patterns) return True @@ -129,8 +141,21 @@ def initLogging(): # Remove only emojis, preserve other Unicode characters like quotes import re import unicodedata + # Remove emoji characters specifically - record.msg = ''.join(char for char in record.msg if unicodedata.category(char) != 'So' or not (0x1F600 <= ord(char) <= 0x1F64F or 0x1F300 <= ord(char) <= 0x1F5FF or 0x1F680 <= ord(char) <= 0x1F6FF or 0x1F1E0 <= ord(char) <= 0x1F1FF or 0x2600 <= ord(char) <= 0x26FF or 0x2700 <= ord(char) <= 0x27BF)) + record.msg = "".join( + char + for char in record.msg + if unicodedata.category(char) != "So" + or not ( + 0x1F600 <= ord(char) <= 0x1F64F + or 0x1F300 <= ord(char) <= 0x1F5FF + or 0x1F680 <= ord(char) <= 0x1F6FF + or 0x1F1E0 <= ord(char) <= 0x1F1FF + or 0x2600 <= ord(char) <= 0x26FF + or 0x2700 <= ord(char) <= 0x27BF + ) + ) return True # Configure handlers based on config @@ -149,14 +174,16 @@ def initLogging(): # Add file handler if enabled if APP_CONFIG.get("APP_LOGGING_FILE_ENABLED", True): # Create daily application log file with automatic date switching - rotationSize = int(APP_CONFIG.get("APP_LOGGING_ROTATION_SIZE", 10485760)) # Default: 10MB + rotationSize = int( + APP_CONFIG.get("APP_LOGGING_ROTATION_SIZE", 10485760) + ) # Default: 10MB backupCount = int(APP_CONFIG.get("APP_LOGGING_BACKUP_COUNT", 5)) - + fileHandler = DailyRotatingFileHandler( log_dir=logDir, filename_prefix="log_app", max_bytes=rotationSize, - backup_count=backupCount + backup_count=backupCount, ) fileHandler.setFormatter(fileFormatter) fileHandler.addFilter(ChromeDevToolsFilter()) @@ -171,11 +198,18 @@ def initLogging(): format="%(asctime)s - %(levelname)s - %(name)s - %(message)s", datefmt=APP_CONFIG.get("APP_LOGGING_DATE_FORMAT", "%Y-%m-%d %H:%M:%S"), handlers=handlers, - force=True # Force reconfiguration of the root logger + force=True, # Force reconfiguration of the root logger ) # Silence noisy third-party libraries - use the same level as the root logger - noisyLoggers = ["httpx", "httpcore", "urllib3", "asyncio", "fastapi.security.oauth2", "msal"] + noisyLoggers = [ + "httpx", + "httpcore", + "urllib3", + "asyncio", + "fastapi.security.oauth2", + "msal", + ] for loggerName in noisyLoggers: logging.getLogger(loggerName).setLevel(logging.WARNING) @@ -183,21 +217,25 @@ def initLogging(): logger = logging.getLogger(__name__) logger.info(f"Logging initialized with level {logLevelName}") logger.info(f"Log directory: {logDir}") - - if APP_CONFIG.get('APP_LOGGING_FILE_ENABLED', True): + + if APP_CONFIG.get("APP_LOGGING_FILE_ENABLED", True): today = datetime.now().strftime("%Y%m%d") appLogFile = os.path.join(logDir, f"log_app_{today}.log") logger.info(f"Application log file: {appLogFile} (auto-switches daily)") else: logger.info("Application log file: disabled") - - logger.info(f"Console logging: {'enabled' if APP_CONFIG.get('APP_LOGGING_CONSOLE_ENABLED', True) else 'disabled'}") + + logger.info( + f"Console logging: {'enabled' if APP_CONFIG.get('APP_LOGGING_CONSOLE_ENABLED', True) else 'disabled'}" + ) + # Initialize logging initLogging() logger = logging.getLogger(__name__) instanceLabel = APP_CONFIG.get("APP_ENV_LABEL") + # Define lifespan context manager for application startup/shutdown events @asynccontextmanager async def lifespan(app: FastAPI): @@ -210,11 +248,12 @@ async def lifespan(app: FastAPI): # START APP app = FastAPI( - title="PowerOn | Data Platform API", + title="PowerOn | Data Platform API", description=f"Backend API for the Multi-Agent Platform by ValueOn AG ({instanceLabel})", - lifespan=lifespan + lifespan=lifespan, ) + # Parse CORS origins from environment variable def get_allowed_origins(): origins_str = APP_CONFIG.get("APP_ALLOWED_ORIGINS", "http://localhost:8080") @@ -223,73 +262,99 @@ def get_allowed_origins(): logger.info(f"CORS allowed origins: {origins}") return origins + # CORS configuration using environment variables app.add_middleware( CORSMiddleware, - allow_origins= get_allowed_origins(), + allow_origins=get_allowed_origins(), allow_credentials=True, allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"], allow_headers=["*"], expose_headers=["*"], - max_age=86400 # Increased caching for preflight requests + max_age=86400, # Increased caching for preflight requests ) # CSRF protection middleware from modules.security.csrf import CSRFMiddleware -from modules.security.tokenRefreshMiddleware import TokenRefreshMiddleware, ProactiveTokenRefreshMiddleware +from modules.security.tokenRefreshMiddleware import ( + TokenRefreshMiddleware, + ProactiveTokenRefreshMiddleware, +) + app.add_middleware(CSRFMiddleware) # Token refresh middleware (silent refresh for expired OAuth tokens) app.add_middleware(TokenRefreshMiddleware, enabled=True) # Proactive token refresh middleware (refresh tokens before they expire) -app.add_middleware(ProactiveTokenRefreshMiddleware, enabled=True, check_interval_minutes=5) +app.add_middleware( + ProactiveTokenRefreshMiddleware, enabled=True, check_interval_minutes=5 +) # Run triggered features import modules.features.init # Include all routers from modules.routes.routeAdmin import router as generalRouter + app.include_router(generalRouter) from modules.routes.routeAttributes import router as attributesRouter + app.include_router(attributesRouter) from modules.routes.routeDataMandates import router as mandateRouter + app.include_router(mandateRouter) from modules.routes.routeDataUsers import router as userRouter + app.include_router(userRouter) from modules.routes.routeDataFiles import router as fileRouter + app.include_router(fileRouter) from modules.routes.routeDataNeutralization import router as neutralizationRouter + app.include_router(neutralizationRouter) from modules.routes.routeDataPrompts import router as promptRouter + app.include_router(promptRouter) from modules.routes.routeDataConnections import router as connectionsRouter + app.include_router(connectionsRouter) from modules.routes.routeWorkflows import router as workflowRouter + app.include_router(workflowRouter) from modules.routes.routeChatPlayground import router as chatPlaygroundRouter + app.include_router(chatPlaygroundRouter) from modules.routes.routeSecurityLocal import router as localRouter + app.include_router(localRouter) from modules.routes.routeSecurityMsft import router as msftRouter + app.include_router(msftRouter) from modules.routes.routeSecurityGoogle import router as googleRouter + app.include_router(googleRouter) from modules.routes.routeVoiceGoogle import router as voiceGoogleRouter + app.include_router(voiceGoogleRouter) from modules.routes.routeSecurityAdmin import router as adminSecurityRouter -app.include_router(adminSecurityRouter) \ No newline at end of file + +app.include_router(adminSecurityRouter) + +from modules.routes.routeChatbot import router as chatbotRouter + +app.include_router(chatbotRouter) diff --git a/modules/features/chatBot/chatbotTools/customerTools/__init__.py b/modules/features/chatBot/chatbotTools/customerTools/__init__.py new file mode 100644 index 00000000..52043b31 --- /dev/null +++ b/modules/features/chatBot/chatbotTools/customerTools/__init__.py @@ -0,0 +1 @@ +"""Tools that are shared between multiple customers go here.""" diff --git a/modules/features/chatBot/chatbotTools/sharedTools/__init__.py b/modules/features/chatBot/chatbotTools/sharedTools/__init__.py new file mode 100644 index 00000000..b0b10bb2 --- /dev/null +++ b/modules/features/chatBot/chatbotTools/sharedTools/__init__.py @@ -0,0 +1 @@ +"""Tools that are custom to a specific customer go here.""" diff --git a/modules/features/chatBot/utils/permissions.py b/modules/features/chatBot/utils/permissions.py new file mode 100644 index 00000000..45e306a5 --- /dev/null +++ b/modules/features/chatBot/utils/permissions.py @@ -0,0 +1,7 @@ +# get_allowed_tools + + +# get_allowed_models + + +# get_system_prompt diff --git a/modules/routes/routeChatbot.py b/modules/routes/routeChatbot.py new file mode 100644 index 00000000..ff5fa9f4 --- /dev/null +++ b/modules/routes/routeChatbot.py @@ -0,0 +1,254 @@ +from pydantic import BaseModel, Field +from fastapi import APIRouter, Depends, HTTPException, status +from fastapi.requests import Request +from typing import Any, Dict, List, Optional +from datetime import datetime +import logging +import uuid + +from modules.datamodels.datamodelUam import User +from modules.security.auth import getCurrentUser, limiter + +logger = logging.getLogger(__name__) + +router = APIRouter( + prefix="/api/chatbot", + tags=["Chatbot"], + responses={404: {"description": "Not found"}}, +) + +# --- Pydantic models for requests and responses --- + + +class ChatMessageRequest(BaseModel): + """Request model for posting a chat message""" + + thread_id: Optional[str] = Field( + None, description="Thread ID (creates new thread if not provided)" + ) + message: str = Field(..., description="User message content") + + +class MessageItem(BaseModel): + """Individual message in a thread""" + + role: str = Field(..., description="Message role (user or assistant)") + content: str = Field(..., description="Message content") + timestamp: float = Field(..., description="Message timestamp (Unix timestamp)") + + +class ChatMessageResponse(BaseModel): + """Response model for posting a chat message""" + + thread_id: str = Field(..., description="Thread ID") + messages: List[MessageItem] = Field(..., description="All messages in thread") + + +class ThreadSummary(BaseModel): + """Summary of a chat thread for list view""" + + thread_id: str = Field(..., description="Thread ID") + created_at: float = Field(..., description="Thread creation timestamp") + last_message: str = Field(..., description="Last message content") + message_count: int = Field(..., description="Total number of messages") + + +class ThreadListResponse(BaseModel): + """Response model for listing all threads""" + + threads: List[ThreadSummary] = Field(..., description="List of thread summaries") + + +class ThreadDetail(BaseModel): + """Detailed view of a single thread""" + + thread_id: str = Field(..., description="Thread ID") + created_at: float = Field(..., description="Thread creation timestamp") + messages: List[MessageItem] = Field( + ..., description="All messages in chronological order" + ) + + +class DeleteResponse(BaseModel): + """Response model for delete operations""" + + message: str = Field(..., description="Confirmation message") + thread_id: str = Field(..., description="Deleted thread ID") + + +# --- Actual endpoints for chatbot --- + + +@router.post("/message", response_model=ChatMessageResponse) +@limiter.limit("30/minute") +async def post_chat_message( + *, + request: Request, + message_request: ChatMessageRequest, + currentUser: User = Depends(getCurrentUser), +) -> ChatMessageResponse: + """ + Post a message to a chat thread and get assistant response. + Creates a new thread if thread_id is not provided. + + This endpoint will later be connected to LangGraph's checkpointer. + """ + try: + # Generate or use existing thread_id + thread_id = message_request.thread_id or f"thread_{uuid.uuid4()}" + + # Get current timestamp + current_time = datetime.now().timestamp() + + # Create dummy message history + # In production, this will fetch from LangGraph's checkpointer + messages = [ + MessageItem( + role="user", content=message_request.message, timestamp=current_time + ), + MessageItem( + role="assistant", + content=f"Echo: {message_request.message} (This is a dummy response. LangGraph integration pending.)", + timestamp=current_time + 0.5, + ), + ] + + logger.info(f"User {currentUser.id} posted message to thread {thread_id}") + + return ChatMessageResponse(thread_id=thread_id, messages=messages) + + except Exception as e: + logger.error(f"Error posting chat message: {str(e)}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to post message: {str(e)}", + ) + + +@router.get("/threads", response_model=ThreadListResponse) +@limiter.limit("30/minute") +async def get_all_threads( + *, request: Request, currentUser: User = Depends(getCurrentUser) +) -> ThreadListResponse: + """ + Get all chat threads for the current user. + + This endpoint will later fetch from LangGraph's PostgreSQL checkpointer. + """ + try: + # Return dummy thread data + # In production, this will query LangGraph's checkpointer database + dummy_threads = [ + ThreadSummary( + thread_id="thread_001", + created_at=datetime.now().timestamp() - 86400, # 1 day ago + last_message="Hello, how can I help you?", + message_count=4, + ), + ThreadSummary( + thread_id="thread_002", + created_at=datetime.now().timestamp() - 3600, # 1 hour ago + last_message="Thank you for your help!", + message_count=8, + ), + ThreadSummary( + thread_id="thread_003", + created_at=datetime.now().timestamp() - 300, # 5 minutes ago + last_message="Can you explain this concept?", + message_count=2, + ), + ] + + logger.info(f"User {currentUser.id} retrieved {len(dummy_threads)} threads") + + return ThreadListResponse(threads=dummy_threads) + + except Exception as e: + logger.error(f"Error retrieving threads: {str(e)}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to retrieve threads: {str(e)}", + ) + + +@router.get("/threads/{thread_id}", response_model=ThreadDetail) +@limiter.limit("30/minute") +async def get_thread_by_id( + *, request: Request, thread_id: str, currentUser: User = Depends(getCurrentUser) +) -> ThreadDetail: + """ + Get a specific chat thread with all its messages. + + This endpoint will later fetch from LangGraph's PostgreSQL checkpointer. + """ + try: + # Return dummy thread detail + # In production, this will query LangGraph's checkpointer for the specific thread + current_time = datetime.now().timestamp() + + dummy_messages = [ + MessageItem( + role="user", + content="Hello! I need help with Python.", + timestamp=current_time - 120, + ), + MessageItem( + role="assistant", + content="Hello! I'd be happy to help you with Python. What would you like to know?", + timestamp=current_time - 119, + ), + MessageItem( + role="user", + content="How do I use list comprehensions?", + timestamp=current_time - 60, + ), + MessageItem( + role="assistant", + content="List comprehensions are a concise way to create lists. Here's an example: [x**2 for x in range(10)]", + timestamp=current_time - 59, + ), + ] + + logger.info(f"User {currentUser.id} retrieved thread {thread_id}") + + return ThreadDetail( + thread_id=thread_id, created_at=current_time - 120, messages=dummy_messages + ) + + except Exception as e: + logger.error(f"Error retrieving thread {thread_id}: {str(e)}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to retrieve thread: {str(e)}", + ) + + +@router.delete("/threads/{thread_id}", response_model=DeleteResponse) +@limiter.limit("10/minute") +async def delete_thread( + *, request: Request, thread_id: str, currentUser: User = Depends(getCurrentUser) +) -> DeleteResponse: + """ + Delete a chat thread and all its associated data. + + This endpoint will later delete from LangGraph's PostgreSQL checkpointer. + """ + try: + # In production, this will: + # 1. Verify the thread belongs to the current user + # 2. Delete the thread from LangGraph's checkpointer + # 3. Clean up any associated data + + logger.info(f"User {currentUser.id} deleted thread {thread_id}") + + return DeleteResponse( + message=f"Thread {thread_id} successfully deleted (dummy response)", + thread_id=thread_id, + ) + + except Exception as e: + logger.error(f"Error deleting thread {thread_id}: {str(e)}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to delete thread: {str(e)}", + )