# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Shared configuration, models, helpers, and auth for the Private-LLM service."""

import base64
import hmac
import json
import logging
import math
import os
import re
import time
import uuid
from collections import defaultdict
from typing import Any, Dict, List, Optional

from fastapi import Depends, Header, HTTPException
from pydantic import BaseModel, Field

# PDF support is optional: PyMuPDF is only required by the PDF helpers below.
try:
    import fitz  # PyMuPDF
    PDF_SUPPORT = True
except ImportError:
    PDF_SUPPORT = False

logger = logging.getLogger(__name__)


# ============================================================================
# Configuration
# ============================================================================

# Maps a setting name (used both as config.ini key and as environment
# variable name) to the CONFIG key it fills and the converter for its value.
_CONFIG_SETTINGS: Dict[str, tuple] = {
    "PRIVATE_LLM_API_KEY": ("apiKey", str),
    "CURSOR_API_KEY": ("cursorApiKey", str),
    "OLLAMA_URL": ("ollamaUrl", str),
    "AUTH_USERNAME": ("authUsername", str),
    "AUTH_PASSWORD": ("authPassword", str),
    "SECRET_KEY": ("secretKey", str),
    "RATE_LIMIT_REQUESTS_PER_MINUTE": ("rateLimitRequestsPerMinute", int),
    "RATE_LIMIT_BURST_SIZE": ("rateLimitBurstSize", int),
}


def _loadConfig() -> Dict[str, Any]:
    """Load configuration from config.ini, then apply environment overrides.

    Precedence (lowest to highest): built-in defaults, ``config.ini`` located
    next to this module, environment variables of the same names.

    The file format is one ``KEY=value`` pair per line; blank lines, lines
    starting with ``#``, lines without ``=`` and unknown keys are ignored.
    A non-integer value for an integer setting in the file is skipped with a
    warning so that the remaining lines are still applied.

    Returns:
        The merged configuration dictionary.

    Raises:
        ValueError: If an integer setting supplied through the environment
            is not a valid integer.
    """
    configPath = os.path.join(os.path.dirname(__file__), "config.ini")

    # NOTE(review): the credential and secret defaults below are placeholders
    # and should be overridden in any real deployment.
    config: Dict[str, Any] = {
        "apiKey": None,
        "cursorApiKey": None,
        "ollamaUrl": "http://localhost:11434",
        "authUsername": "poweron",
        "authPassword": "poweron",
        "secretKey": "poweron-secret-key-change-in-production",
        "rateLimitRequestsPerMinute": 60,
        "rateLimitBurstSize": 10,
    }

    if os.path.exists(configPath):
        try:
            with open(configPath, "r") as f:
                for line in f:
                    line = line.strip()
                    if not line or line.startswith("#") or "=" not in line:
                        continue
                    key, value = (part.strip() for part in line.split("=", 1))
                    setting = _CONFIG_SETTINGS.get(key)
                    if setting is None:
                        continue
                    configKey, convert = setting
                    try:
                        config[configKey] = convert(value)
                    except ValueError:
                        # Keep the previous value; one bad number must not
                        # discard every setting that follows it in the file.
                        logger.warning(
                            "Ignoring invalid value for %s in config.ini", key
                        )
        except (OSError, ValueError) as e:
            # ValueError also covers UnicodeDecodeError from a mis-encoded file.
            logger.warning("Error loading config.ini: %s", e)

    # Environment variables take precedence over the file.
    for envName, (configKey, convert) in _CONFIG_SETTINGS.items():
        if envName in os.environ:
            config[configKey] = convert(os.environ[envName])

    return config


CONFIG = _loadConfig()


# ============================================================================
# Rate Limiting (Token Bucket Algorithm)
# ============================================================================

class RateLimiter:
    """Token bucket rate limiter with per-API-key tracking.

    Each key owns a bucket holding up to ``burstSize`` tokens that refills at
    ``requestsPerMinute / 60`` tokens per second. One request costs one token.
    """

    def __init__(self, requestsPerMinute: int = 60, burstSize: int = 10):
        """Create a limiter.

        Args:
            requestsPerMinute: Sustained refill rate, in requests per minute.
            burstSize: Bucket capacity, i.e. the initial and maximum number
                of tokens per key.
        """
        self.requestsPerMinute = requestsPerMinute
        self.burstSize = burstSize
        self.tokensPerSecond = requestsPerMinute / 60.0
        # A bucket is created full on first access to an unseen key.
        self._buckets: Dict[str, Dict[str, float]] = defaultdict(
            lambda: {"tokens": float(burstSize), "lastUpdate": time.time()}
        )

    def _refillTokens(self, bucket: Dict[str, float]) -> None:
        """Add the tokens earned since the bucket's last update, capped at burstSize."""
        now = time.time()
        # time.time() can step backwards (clock adjustment); never let that
        # remove tokens from the bucket.
        elapsed = max(0.0, now - bucket["lastUpdate"])
        bucket["tokens"] = min(
            self.burstSize,
            bucket["tokens"] + elapsed * self.tokensPerSecond
        )
        bucket["lastUpdate"] = now

    def isAllowed(self, apiKey: str) -> tuple[bool, Dict[str, Any]]:
        """Consume one token for ``apiKey`` if one is available.

        Args:
            apiKey: Identifier whose bucket is charged.

        Returns:
            ``(allowed, info)``. ``info`` always contains ``remaining``,
            ``limit`` and ``resetSeconds``; when the request is rejected it
            additionally contains ``retryAfter`` (seconds, one decimal).
        """
        bucket = self._buckets[apiKey]
        self._refillTokens(bucket)

        if bucket["tokens"] >= 1.0:
            bucket["tokens"] -= 1.0
            return True, {
                "remaining": int(bucket["tokens"]),
                "limit": self.requestsPerMinute,
                "resetSeconds": 60
            }

        if self.tokensPerSecond > 0:
            retryAfter = (1.0 - bucket["tokens"]) / self.tokensPerSecond
        else:
            # A non-positive rate never refills; avoid dividing by zero and
            # report the reset window instead.
            retryAfter = 60.0
        return False, {
            "remaining": 0,
            "limit": self.requestsPerMinute,
            "retryAfter": round(retryAfter, 1),
            "resetSeconds": 60
        }

    def cleanup(self, maxAgeSeconds: int = 3600) -> int:
        """Drop buckets not touched for more than ``maxAgeSeconds``.

        Returns:
            The number of buckets removed.
        """
        now = time.time()
        staleKeys = [
            key for key, bucket in self._buckets.items()
            if now - bucket["lastUpdate"] > maxAgeSeconds
        ]
        for key in staleKeys:
            del self._buckets[key]
        return len(staleKeys)


rateLimiter = RateLimiter(
    requestsPerMinute=CONFIG["rateLimitRequestsPerMinute"],
    burstSize=CONFIG["rateLimitBurstSize"]
)


# ============================================================================
# Model Mapping
# ============================================================================

# External (public) model name -> internal Ollama model name.
MODEL_MAPPING = {
    "poweron-text-general": "qwen2.5:7b",
    "poweron-vision-general": "qwen2.5vl:7b",
    "poweron-vision-deep": "granite3.2-vision",
}

# Reverse lookup: internal Ollama model name -> external model name.
INTERNAL_TO_EXTERNAL = {v: k for k, v in MODEL_MAPPING.items()}


# ============================================================================
# Request/Response Models
# ============================================================================
# The models are documented with comments rather than docstrings so that the
# generated schema descriptions stay exactly as they were.

# Request body for an analysis call: a prompt plus an optional base64 image.
class AnalyzeRequest(BaseModel):
    imageBase64: Optional[str] = Field(default=None, description="Base64 encoded image")
    prompt: str = Field(description="Analysis prompt")
    modelName: str = Field(default="poweron-vision-general", description="Model to use")


# Result of an analysis call.
class AnalyzeResponse(BaseModel):
    success: bool = Field(description="Whether the analysis was successful")
    data: Optional[Dict[str, Any]] = Field(default=None, description="Extracted data")
    rawResponse: Optional[str] = Field(default=None, description="Raw model response")
    error: Optional[str] = Field(default=None, description="Error message if failed")


# Request body for PDF extraction.
class PdfExtractRequest(BaseModel):
    pdfBase64: str = Field(description="Base64 encoded PDF")
    page: Optional[int] = Field(default=None, description="Specific page number (1-indexed)")


# Description of one available model.
class ModelInfo(BaseModel):
    name: str = Field(description="External model name")
    internalName: str = Field(description="Internal Ollama model name")
    isVision: bool = Field(description="Whether it's a vision model")
    pricePerCall: float = Field(description="Price per call in CHF")


# Service health report.
class HealthResponse(BaseModel):
    status: str
    service: str
    pdfSupport: bool
    ollamaConnected: bool


# Ollama connectivity report with the models it exposes.
class OllamaStatusResponse(BaseModel):
    connected: bool
    models: Optional[List[str]] = None
    visionModels: Optional[List[str]] = None
    totalModels: Optional[int] = None
    error: Optional[str] = None


# OpenAI-compatible schemas. Fields declared with ``alias=`` use the OpenAI
# snake_case name on the wire.
# NOTE(review): no population-by-field-name setting is visible here, so these
# fields are presumably populated through their alias - confirm at call sites.
class OpenAiModelInfo(BaseModel):
    id: str
    object: str = "model"
    created: int
    ownedBy: str = Field(default="poweron", alias="owned_by")


class OpenAiModelsResponse(BaseModel):
    object: str = "list"
    data: List[OpenAiModelInfo]


class OpenAiChatMessage(BaseModel):
    role: str
    # Plain string, list of parts, or dict; normalized by _contentToText.
    content: Any


class OpenAiChatCompletionRequest(BaseModel):
    model: str
    messages: List[OpenAiChatMessage]
    stream: Optional[bool] = False
    maxTokens: Optional[int] = Field(default=None, alias="max_tokens")
    temperature: Optional[float] = None


class OpenAiChatCompletionChoice(BaseModel):
    index: int
    message: OpenAiChatMessage
    finishReason: str = Field(default="stop", alias="finish_reason")


class OpenAiChatCompletionUsage(BaseModel):
    promptTokens: int = Field(default=0, alias="prompt_tokens")
    completionTokens: int = Field(default=0, alias="completion_tokens")
    totalTokens: int = Field(default=0, alias="total_tokens")


class OpenAiChatCompletionResponse(BaseModel):
    id: str
    object: str = "chat.completion"
    created: int
    model: str
    choices: List[OpenAiChatCompletionChoice]
    usage: OpenAiChatCompletionUsage


# ============================================================================
# Helper Functions
# ============================================================================

def _isVisionModel(modelName: str) -> bool:
    """Return True if ``modelName`` contains a known vision-model marker.

    The check is a case-insensitive substring match; an empty or ``None``
    name is never a vision model.
    """
    if not modelName:
        return False
    modelLower = modelName.lower()
    visionIndicators = ("vision", "vl", "llava", "bakllava", "granite")
    return any(indicator in modelLower for indicator in visionIndicators)


def _getInternalModelName(externalName: str) -> str:
    """Map an external model name to its Ollama name; unknown names pass through."""
    return MODEL_MAPPING.get(externalName, externalName)


def _getExternalModelName(internalName: str) -> str:
    """Map an Ollama model name to its external name; unknown names pass through."""
    return INTERNAL_TO_EXTERNAL.get(internalName, internalName)


def _contentToText(content: Any) -> str:
    """Normalize OpenAI message content into plain text.

    Args:
        content: ``None``, a string, a list of parts (strings or dicts with a
            string ``"text"`` entry), or a single dict with a ``"text"`` entry.

    Returns:
        The text. For a list, the non-blank parts joined with newlines;
        parts without usable text are skipped. Any other value is returned
        as ``str(content)``.
    """
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        textParts = []
        for part in content:
            if isinstance(part, str):
                textParts.append(part)
            elif isinstance(part, dict):
                partText = part.get("text")
                if isinstance(partText, str):
                    textParts.append(partText)
        return "\n".join(part for part in textParts if part.strip())
    if isinstance(content, dict):
        contentText = content.get("text")
        if isinstance(contentText, str):
            return contentText
    return str(content)


def _messagesToPrompt(messages: List[OpenAiChatMessage]) -> str:
    """Convert OpenAI chat messages to a single prompt for Ollama generate.

    Each message with non-empty text becomes ``"<role>: <text>"``; the
    entries are separated by blank lines and followed by a trailing
    ``"assistant:"`` cue. Returns ``""`` when no message carries text.
    """
    promptLines = []
    for message in messages:
        normalizedText = _contentToText(message.content).strip()
        if normalizedText:
            promptLines.append(f"{message.role}: {normalizedText}")

    if not promptLines:
        return ""

    promptLines.append("assistant:")
    return "\n\n".join(promptLines)


# ============================================================================
# PDF Helper Functions
# ============================================================================

def _pageToPngInfo(page: Any, zoom: float) -> Dict[str, Any]:
    """Render one PyMuPDF page to PNG and return its base64 data and pixel size."""
    pix = page.get_pixmap(matrix=fitz.Matrix(zoom, zoom))
    return {
        "base64": base64.b64encode(pix.tobytes("png")).decode("utf-8"),
        "width": pix.width,
        "height": pix.height,
    }


def _extractImagesFromPdf(
    pdfBytes: bytes, maxPages: int = 5, zoom: float = 2.0
) -> List[Dict[str, Any]]:
    """Render the first pages of a PDF as base64-encoded PNG images.

    Args:
        pdfBytes: Raw PDF file content.
        maxPages: Maximum number of pages to render, starting at page 1.
        zoom: Scale factor applied to both axes when rendering.

    Returns:
        One dict per rendered page with the keys ``page`` (1-indexed),
        ``base64``, ``width`` and ``height``.

    Raises:
        Exception: If PyMuPDF is not installed.
    """
    if not PDF_SUPPORT:
        raise Exception("PDF-Support nicht verfügbar. Bitte PyMuPDF installieren.")

    images = []
    doc = fitz.open(stream=pdfBytes, filetype="pdf")
    try:
        numPages = min(len(doc), maxPages)
        for pageNum in range(numPages):
            images.append({"page": pageNum + 1, **_pageToPngInfo(doc[pageNum], zoom)})
    finally:
        # Release the document even when rendering a page fails.
        doc.close()
    return images


def _renderPdfPageAsImage(pdfBytes: bytes, pageNum: int = 0, zoom: float = 2.0) -> Dict[str, Any]:
    """Render a single PDF page as a base64-encoded PNG image.

    Args:
        pdfBytes: Raw PDF file content.
        pageNum: Zero-based page index. Values outside the document are
            clamped to the first or last page.
        zoom: Scale factor applied to both axes when rendering.

    Returns:
        A dict with the keys ``base64``, ``width``, ``height``, ``page``
        (1-indexed page actually rendered) and ``totalPages``.

    Raises:
        Exception: If PyMuPDF is not installed.
        ValueError: If the document contains no pages.
    """
    if not PDF_SUPPORT:
        raise Exception("PDF-Support nicht verfügbar.")

    doc = fitz.open(stream=pdfBytes, filetype="pdf")
    try:
        totalPages = len(doc)
        if totalPages == 0:
            raise ValueError("PDF contains no pages")
        # Clamp both ends so the reported page always matches the rendered one.
        pageNum = max(0, min(pageNum, totalPages - 1))
        return {
            **_pageToPngInfo(doc[pageNum], zoom),
            "page": pageNum + 1,
            "totalPages": totalPages,
        }
    finally:
        # Release the document even when rendering fails.
        doc.close()


# ============================================================================
# Authentication Dependencies
# ============================================================================

def _secureEquals(provided: str, expected: str) -> bool:
    """Compare two secrets in constant time to avoid leaking them via timing."""
    # Compare bytes: compare_digest rejects non-ASCII str arguments.
    return hmac.compare_digest(provided.encode("utf-8"), expected.encode("utf-8"))


async def _verifyApiKey(xApiKey: Optional[str] = Header(None, alias="X-API-Key")) -> str:
    """Verify the API key from header and return it for rate limiting.

    When no API key is configured the check is skipped and the fixed
    identifier ``"dev-mode"`` is returned.

    Raises:
        HTTPException: 401 if the header is missing or the key is wrong.
    """
    if not CONFIG["apiKey"]:
        logger.warning("No API key configured - running in development mode")
        return "dev-mode"

    if not xApiKey:
        raise HTTPException(status_code=401, detail="API key required")

    if not _secureEquals(xApiKey, CONFIG["apiKey"]):
        raise HTTPException(status_code=401, detail="Invalid API key")

    return xApiKey


async def _verifyCursorApiKey(authorization: Optional[str] = Header(None)) -> str:
    """Verify Bearer token for Cursor OpenAI-compatible endpoints.

    Returns:
        The token that was presented.

    Raises:
        HTTPException: 503 if no Cursor API key is configured; 401 if the
            header is missing, is not a Bearer credential, or the token is
            wrong.
    """
    expectedApiKey = CONFIG.get("cursorApiKey")
    if not expectedApiKey:
        raise HTTPException(
            status_code=503,
            detail="Cursor API key not configured on server"
        )

    if not authorization:
        raise HTTPException(status_code=401, detail="Authorization header required")

    # HTTP authentication scheme names are case-insensitive.
    scheme, separator, token = authorization.partition(" ")
    if not separator or scheme.lower() != "bearer":
        raise HTTPException(status_code=401, detail="Bearer token required")

    providedApiKey = token.strip()
    if not _secureEquals(providedApiKey, expectedApiKey):
        raise HTTPException(status_code=401, detail="Invalid API key")

    return providedApiKey


async def _checkRateLimit(apiKey: str = Depends(_verifyApiKey)) -> str:
    """Check rate limit for the authenticated API key.

    Returns:
        The API key, so dependants can reuse it.

    Raises:
        HTTPException: 429 with rate-limit details and headers when the
            key has no tokens left.
    """
    allowed, info = rateLimiter.isAllowed(apiKey)

    if not allowed:
        # Retry-After carries whole seconds; round up so clients are never
        # told to retry before a token is actually available.
        retryAfterSeconds = max(1, math.ceil(info["retryAfter"]))
        raise HTTPException(
            status_code=429,
            detail={
                "error": "Rate limit exceeded",
                "message": f"Too many requests. Please retry after {info['retryAfter']} seconds.",
                "retryAfter": info["retryAfter"],
                "limit": info["limit"],
                "remaining": info["remaining"]
            },
            headers={
                "Retry-After": str(retryAfterSeconds),
                "X-RateLimit-Limit": str(info["limit"]),
                "X-RateLimit-Remaining": str(info["remaining"]),
                "X-RateLimit-Reset": str(info["resetSeconds"])
            }
        )

    return apiKey