service-llm-private/config.py
ValueOn AG 1f5d8e923b Refactor: extract routes and config from app.py into separate modules
Move all API routes, OpenAI-compatible routes, web UI routes, shared config, models, rate limiter, and auth logic into dedicated files (config.py, routeApi.py, routeOpenAi.py, routeWeb.py). app.py now serves as a clean entry point.

Made-with: Cursor
2026-03-30 14:49:35 +02:00

435 lines
14 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Shared configuration, models, helpers, and auth for the Private-LLM service."""
import base64
import hmac
import json
import logging
import math
import os
import re
import time
import uuid
from collections import defaultdict
from typing import Any, Dict, List, Optional

from fastapi import Depends, Header, HTTPException
from pydantic import BaseModel, Field
# Optional PDF backend: PyMuPDF is imported under the name ``fitz`` when installed.
try:
    import fitz  # PyMuPDF
except ImportError:
    # The PDF helper functions check this flag and refuse to run without it.
    PDF_SUPPORT = False
else:
    PDF_SUPPORT = True

# Module-level logger used by the configuration loader and the auth helpers.
logger = logging.getLogger(__name__)
# ============================================================================
# Configuration
# ============================================================================
# Every supported setting as
# (name in config.ini / environment, key in the config dict, is integer).
_CONFIG_SETTINGS = (
    ("PRIVATE_LLM_API_KEY", "apiKey", False),
    ("CURSOR_API_KEY", "cursorApiKey", False),
    ("OLLAMA_URL", "ollamaUrl", False),
    ("AUTH_USERNAME", "authUsername", False),
    ("AUTH_PASSWORD", "authPassword", False),
    ("SECRET_KEY", "secretKey", False),
    ("RATE_LIMIT_REQUESTS_PER_MINUTE", "rateLimitRequestsPerMinute", True),
    ("RATE_LIMIT_BURST_SIZE", "rateLimitBurstSize", True),
)


def _coerceSetting(name: str, rawValue: str, isInteger: bool, fallback: Any) -> Any:
    """Convert one raw setting value, keeping *fallback* if an integer is malformed."""
    if not isInteger:
        return rawValue
    try:
        return int(rawValue)
    except ValueError:
        logger.warning("Ignoring invalid integer for %s: %r", name, rawValue)
        return fallback


def _loadConfig() -> Dict[str, Any]:
    """Load the service configuration.

    Precedence, lowest to highest: built-in defaults, the ``config.ini`` file
    located next to this module, process environment variables.

    ``config.ini`` is read as plain ``KEY = value`` lines. Blank lines, lines
    starting with ``#``, lines without ``=`` and unknown keys are skipped.
    A malformed integer (in the file or in the environment) is logged and
    ignored, so it neither discards the settings that follow it nor aborts
    the import of this module.

    Returns:
        Dict with the keys ``apiKey``, ``cursorApiKey``, ``ollamaUrl``,
        ``authUsername``, ``authPassword``, ``secretKey``,
        ``rateLimitRequestsPerMinute`` and ``rateLimitBurstSize``.
    """
    # NOTE(review): the default credentials and secret key below look like
    # placeholders; confirm they are overridden in every real deployment.
    config: Dict[str, Any] = {
        "apiKey": None,
        "cursorApiKey": None,
        "ollamaUrl": "http://localhost:11434",
        "authUsername": "poweron",
        "authPassword": "poweron",
        "secretKey": "poweron-secret-key-change-in-production",
        "rateLimitRequestsPerMinute": 60,
        "rateLimitBurstSize": 10,
    }
    settings = {name: (key, isInteger) for name, key, isInteger in _CONFIG_SETTINGS}

    configPath = os.path.join(os.path.dirname(__file__), "config.ini")
    try:
        # Explicit encoding so parsing does not depend on the platform locale.
        with open(configPath, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith("#"):
                    continue
                name, separator, value = line.partition("=")
                name = name.strip()
                if not separator or name not in settings:
                    continue
                key, isInteger = settings[name]
                config[key] = _coerceSetting(name, value.strip(), isInteger, config[key])
    except FileNotFoundError:
        pass  # config.ini is optional
    except (OSError, UnicodeError) as e:
        # Best effort: keep whatever was parsed before the failure.
        logger.warning("Error loading config.ini: %s", e)

    # Environment variables win over both the defaults and config.ini.
    for name, key, isInteger in _CONFIG_SETTINGS:
        if name in os.environ:
            config[key] = _coerceSetting(name, os.environ[name], isInteger, config[key])
    return config


# Configuration is resolved once, at import time.
CONFIG = _loadConfig()
# ============================================================================
# Rate Limiting (Token Bucket Algorithm)
# ============================================================================
class RateLimiter:
    """Token bucket rate limiter with per-API-key tracking.

    Each key owns a bucket that starts with ``burstSize`` tokens and refills
    continuously at ``requestsPerMinute / 60`` tokens per second, never
    exceeding ``burstSize``. Every allowed request consumes one token.

    Bucket timestamps come from ``time.monotonic()`` so that wall-clock
    adjustments cannot drain or refill buckets.
    """

    # Reported as ``resetSeconds``; also used as ``retryAfter`` when the
    # configured rate is not positive and the bucket therefore never refills.
    _WINDOW_SECONDS = 60

    def __init__(self, requestsPerMinute: int = 60, burstSize: int = 10):
        """Create a limiter.

        Args:
            requestsPerMinute: Sustained refill rate, in tokens per minute.
            burstSize: Bucket capacity and initial token count per key.
        """
        self.requestsPerMinute = requestsPerMinute
        self.burstSize = burstSize
        self.tokensPerSecond = requestsPerMinute / 60.0
        # Buckets are created lazily, full, on first access of a key.
        self._buckets: Dict[str, Dict[str, float]] = defaultdict(
            lambda: {"tokens": burstSize, "lastUpdate": time.monotonic()}
        )

    def _refillTokens(self, bucket: Dict[str, float]) -> None:
        """Credit *bucket* with the tokens earned since its last update."""
        now = time.monotonic()
        elapsed = now - bucket["lastUpdate"]
        bucket["tokens"] = min(
            self.burstSize,
            bucket["tokens"] + elapsed * self.tokensPerSecond,
        )
        bucket["lastUpdate"] = now

    def isAllowed(self, apiKey: str) -> tuple[bool, Dict[str, Any]]:
        """Try to consume one token for *apiKey*.

        Returns:
            ``(allowed, info)``. ``info`` always holds ``remaining``,
            ``limit`` and ``resetSeconds``; when the request is rejected it
            also holds ``retryAfter`` (seconds, rounded to one decimal).
        """
        bucket = self._buckets[apiKey]
        self._refillTokens(bucket)
        if bucket["tokens"] >= 1.0:
            bucket["tokens"] -= 1.0
            return True, {
                "remaining": int(bucket["tokens"]),
                "limit": self.requestsPerMinute,
                "resetSeconds": self._WINDOW_SECONDS,
            }
        if self.tokensPerSecond > 0:
            retryAfter = (1.0 - bucket["tokens"]) / self.tokensPerSecond
        else:
            # A non-positive rate never refills; avoid dividing by zero and
            # report the nominal window instead.
            retryAfter = float(self._WINDOW_SECONDS)
        return False, {
            "remaining": 0,
            "limit": self.requestsPerMinute,
            "retryAfter": round(retryAfter, 1),
            "resetSeconds": self._WINDOW_SECONDS,
        }

    def cleanup(self, maxAgeSeconds: int = 3600) -> int:
        """Drop buckets not touched for more than *maxAgeSeconds*.

        Returns:
            The number of buckets removed.
        """
        now = time.monotonic()
        staleKeys = [
            key for key, bucket in self._buckets.items()
            if now - bucket["lastUpdate"] > maxAgeSeconds
        ]
        for key in staleKeys:
            del self._buckets[key]
        return len(staleKeys)
# Module-wide limiter instance, sized by the rate-limit settings in CONFIG.
rateLimiter = RateLimiter(
    burstSize=CONFIG["rateLimitBurstSize"],
    requestsPerMinute=CONFIG["rateLimitRequestsPerMinute"],
)
# ============================================================================
# Model Mapping
# ============================================================================
# External (public) model alias -> internal model name.
MODEL_MAPPING = dict(
    [
        ("poweron-text-general", "qwen2.5:7b"),
        ("poweron-vision-general", "qwen2.5vl:7b"),
        ("poweron-vision-deep", "granite3.2-vision"),
    ]
)

# Reverse lookup: internal model name -> external alias.
INTERNAL_TO_EXTERNAL = dict(
    (internalName, externalName)
    for externalName, internalName in MODEL_MAPPING.items()
)
# ============================================================================
# Request/Response Models
# ============================================================================
# Analysis request: an optional base64-encoded image, a required prompt and
# the model to use (defaults to the external alias "poweron-vision-general").
# Documented with comments rather than a docstring so the generated schema
# description is not altered.
class AnalyzeRequest(BaseModel):
    imageBase64: Optional[str] = Field(default=None, description="Base64 encoded image")
    prompt: str = Field(description="Analysis prompt")
    modelName: str = Field(default="poweron-vision-general", description="Model to use")
# Analysis result: a mandatory success flag plus optional extracted data,
# raw model output and error message (all three default to None).
class AnalyzeResponse(BaseModel):
    success: bool = Field(description="Whether the analysis was successful")
    data: Optional[Dict[str, Any]] = Field(default=None, description="Extracted data")
    rawResponse: Optional[str] = Field(default=None, description="Raw model response")
    error: Optional[str] = Field(default=None, description="Error message if failed")
# PDF extraction request: the base64-encoded document and an optional
# 1-indexed page number (None when no specific page is requested).
class PdfExtractRequest(BaseModel):
    pdfBase64: str = Field(description="Base64 encoded PDF")
    page: Optional[int] = Field(default=None, description="Specific page number (1-indexed)")
# Description of one model: external and internal names, whether it is a
# vision model, and its per-call price in CHF. All fields are required.
class ModelInfo(BaseModel):
    name: str = Field(description="External model name")
    internalName: str = Field(description="Internal Ollama model name")
    isVision: bool = Field(description="Whether it's a vision model")
    pricePerCall: float = Field(description="Price per call in CHF")
# Health report: status and service strings plus two booleans for PDF
# support and Ollama connectivity. All fields are required.
class HealthResponse(BaseModel):
    status: str
    service: str
    pdfSupport: bool
    ollamaConnected: bool
# Ollama status report: only `connected` is required; the model lists, the
# model count and the error text are optional and default to None.
class OllamaStatusResponse(BaseModel):
    connected: bool
    models: Optional[List[str]] = None
    visionModels: Optional[List[str]] = None
    totalModels: Optional[int] = None
    error: Optional[str] = None
# One model entry in the OpenAI-style model list.
# `object` defaults to "model"; `ownedBy` defaults to "poweron" and carries
# the alias `owned_by`.
# NOTE(review): no populate-by-name configuration is visible here, so
# pydantic may only accept the alias `owned_by` on input - confirm against
# the pydantic version in use.
class OpenAiModelInfo(BaseModel):
    id: str
    object: str = "model"
    created: int
    ownedBy: str = Field(default="poweron", alias="owned_by")
# OpenAI-style list wrapper: `object` defaults to "list" and `data` holds
# the OpenAiModelInfo entries.
class OpenAiModelsResponse(BaseModel):
    object: str = "list"
    data: List[OpenAiModelInfo]
# A single chat message: a role string and free-form content.
# `content` is typed Any; _contentToText in this module normalizes str,
# list and dict shapes into plain text.
class OpenAiChatMessage(BaseModel):
    role: str
    content: Any
# Chat completion request: model name and message list are required;
# `stream` defaults to False, `maxTokens` (alias `max_tokens`) and
# `temperature` default to None.
class OpenAiChatCompletionRequest(BaseModel):
    model: str
    messages: List[OpenAiChatMessage]
    stream: Optional[bool] = False
    maxTokens: Optional[int] = Field(default=None, alias="max_tokens")
    temperature: Optional[float] = None
# One completion choice: its index, the produced message, and a finish
# reason that defaults to "stop" (alias `finish_reason`).
class OpenAiChatCompletionChoice(BaseModel):
    index: int
    message: OpenAiChatMessage
    finishReason: str = Field(default="stop", alias="finish_reason")
# Token usage counters; each defaults to 0 and carries a snake_case alias
# (`prompt_tokens`, `completion_tokens`, `total_tokens`).
# NOTE(review): without populate-by-name configuration, values passed under
# the camelCase attribute names may be ignored on construction - confirm
# against how callers build this model.
class OpenAiChatCompletionUsage(BaseModel):
    promptTokens: int = Field(default=0, alias="prompt_tokens")
    completionTokens: int = Field(default=0, alias="completion_tokens")
    totalTokens: int = Field(default=0, alias="total_tokens")
# Chat completion response envelope: `object` defaults to "chat.completion";
# id, creation timestamp, model name, choices and usage are required.
class OpenAiChatCompletionResponse(BaseModel):
    id: str
    object: str = "chat.completion"
    created: int
    model: str
    choices: List[OpenAiChatCompletionChoice]
    usage: OpenAiChatCompletionUsage
# ============================================================================
# Helper Functions
# ============================================================================
def _isVisionModel(modelName: str) -> bool:
    """Guess from its name whether a model accepts images.

    The check is a case-insensitive substring match against a fixed list of
    markers. An empty or missing name is never a vision model.
    """
    # NOTE(review): "vl" and "granite" are broad substrings and may also match
    # text-only models - confirm this is intended.
    if modelName:
        lowered = modelName.lower()
        for marker in ("vision", "vl", "llava", "bakllava", "granite"):
            if marker in lowered:
                return True
    return False
def _getInternalModelName(externalName: str) -> str:
    """Translate an external model alias into its internal model name.

    Names that have no entry in MODEL_MAPPING are returned unchanged.
    """
    try:
        return MODEL_MAPPING[externalName]
    except KeyError:
        return externalName
def _getExternalModelName(internalName: str) -> str:
    """Translate an internal model name back into its external alias.

    Names that have no entry in INTERNAL_TO_EXTERNAL are returned unchanged.
    """
    try:
        return INTERNAL_TO_EXTERNAL[internalName]
    except KeyError:
        return internalName
def _contentToText(content: Any) -> str:
    """Flatten OpenAI-style message content into a plain string.

    * ``None`` becomes ``""``; a string is returned as-is.
    * A list contributes its string items and the string ``"text"`` value of
      its dict items; whitespace-only pieces are dropped and the remainder is
      joined with newlines.
    * A dict with a string ``"text"`` value yields that value.
    * Anything else is converted with ``str()``.
    """
    if content is None:
        return ""
    if isinstance(content, str):
        return content

    def _pieceText(piece: Any) -> Optional[str]:
        # Text carried by one list item, or None when it carries none.
        if isinstance(piece, str):
            return piece
        if isinstance(piece, dict):
            candidate = piece.get("text")
            if isinstance(candidate, str):
                return candidate
        return None

    if isinstance(content, list):
        pieces = (_pieceText(item) for item in content)
        return "\n".join(
            piece for piece in pieces if piece is not None and piece.strip()
        )
    if isinstance(content, dict):
        wholeText = content.get("text")
        if isinstance(wholeText, str):
            return wholeText
    return str(content)
def _messagesToPrompt(messages: List[OpenAiChatMessage]) -> str:
    """Render chat messages as one prompt string for Ollama generate.

    Each message with non-empty text becomes ``"<role>: <text>"``. The lines
    are separated by blank lines and followed by a trailing ``"assistant:"``
    cue. Returns ``""`` when no message carries text.
    """
    rendered = [
        f"{message.role}: {text}"
        for message in messages
        if (text := _contentToText(message.content).strip())
    ]
    if rendered:
        return "\n\n".join(rendered + ["assistant:"])
    return ""
# ============================================================================
# PDF Helper Functions
# ============================================================================
def _extractImagesFromPdf(pdfBytes: bytes, maxPages: int = 5) -> List[Dict[str, Any]]:
    """Render the leading pages of a PDF as base64-encoded PNG images.

    Args:
        pdfBytes: Raw content of the PDF file.
        maxPages: Upper bound on the number of pages rendered, counted from
            the first page.

    Returns:
        One dict per rendered page with the keys ``page`` (1-indexed),
        ``base64``, ``width`` and ``height``.

    Raises:
        RuntimeError: If PyMuPDF is not installed.
    """
    if not PDF_SUPPORT:
        raise RuntimeError("PDF-Support nicht verfügbar. Bitte PyMuPDF installieren.")
    doc = fitz.open(stream=pdfBytes, filetype="pdf")
    try:
        # Same 2x scaling matrix for every page, so build it once.
        mat = fitz.Matrix(2.0, 2.0)
        images = []
        for pageNum in range(min(len(doc), maxPages)):
            pix = doc[pageNum].get_pixmap(matrix=mat)
            images.append({
                "page": pageNum + 1,
                "base64": base64.b64encode(pix.tobytes("png")).decode("utf-8"),
                "width": pix.width,
                "height": pix.height,
            })
        return images
    finally:
        # Release the document even when rendering a page fails.
        doc.close()
def _renderPdfPageAsImage(pdfBytes: bytes, pageNum: int = 0, zoom: float = 2.0) -> Dict[str, Any]:
    """Render a single PDF page as a base64-encoded PNG image.

    Args:
        pdfBytes: Raw content of the PDF file.
        pageNum: 0-based page index. Out-of-range values are clamped to the
            first or last page, so the reported page always matches the page
            that was rendered.
        zoom: Scale factor applied to both axes.

    Returns:
        Dict with the keys ``base64``, ``width``, ``height``, ``page``
        (1-indexed) and ``totalPages``.

    Raises:
        RuntimeError: If PyMuPDF is not installed.
        ValueError: If the document contains no pages.
    """
    if not PDF_SUPPORT:
        raise RuntimeError("PDF-Support nicht verfügbar.")
    doc = fitz.open(stream=pdfBytes, filetype="pdf")
    try:
        totalPages = len(doc)
        if totalPages == 0:
            raise ValueError("PDF contains no pages")
        # Clamp at both ends: a negative index would otherwise render a page
        # counted from the end while reporting a page number of 0 or below.
        pageNum = max(0, min(pageNum, totalPages - 1))
        pix = doc[pageNum].get_pixmap(matrix=fitz.Matrix(zoom, zoom))
        return {
            "base64": base64.b64encode(pix.tobytes("png")).decode("utf-8"),
            "width": pix.width,
            "height": pix.height,
            "page": pageNum + 1,
            "totalPages": totalPages,
        }
    finally:
        # Release the document even when rendering fails.
        doc.close()
# ============================================================================
# Authentication Dependencies
# ============================================================================
async def _verifyApiKey(xApiKey: Optional[str] = Header(None, alias="X-API-Key")) -> str:
    """Verify the ``X-API-Key`` header and return the key for rate limiting.

    When no API key is configured the check is skipped and the fixed
    identifier ``"dev-mode"`` is returned.

    Raises:
        HTTPException: 401 if the header is missing or does not match the
            configured key.
    """
    expectedApiKey = CONFIG["apiKey"]
    if not expectedApiKey:
        logger.warning("No API key configured - running in development mode")
        return "dev-mode"
    if not xApiKey:
        raise HTTPException(status_code=401, detail="API key required")
    # Constant-time comparison so response timing does not leak how much of
    # the key matched; bytes are compared because compare_digest rejects
    # non-ASCII str arguments.
    if not hmac.compare_digest(xApiKey.encode("utf-8"), expectedApiKey.encode("utf-8")):
        raise HTTPException(status_code=401, detail="Invalid API key")
    return xApiKey
async def _verifyCursorApiKey(authorization: Optional[str] = Header(None)) -> str:
    """Verify the Bearer token for the Cursor OpenAI-compatible endpoints.

    The ``Bearer`` scheme name is matched case-insensitively.

    Returns:
        The token that was presented.

    Raises:
        HTTPException: 503 if no Cursor API key is configured; 401 if the
            header is missing, is not a Bearer credential, or carries the
            wrong token.
    """
    expectedApiKey = CONFIG.get("cursorApiKey")
    if not expectedApiKey:
        raise HTTPException(
            status_code=503,
            detail="Cursor API key not configured on server"
        )
    if not authorization:
        raise HTTPException(status_code=401, detail="Authorization header required")
    scheme, separator, credentials = authorization.partition(" ")
    if not separator or scheme.lower() != "bearer":
        raise HTTPException(status_code=401, detail="Bearer token required")
    providedApiKey = credentials.strip()
    # Constant-time comparison so response timing does not leak how much of
    # the token matched; bytes are compared because compare_digest rejects
    # non-ASCII str arguments.
    if not hmac.compare_digest(providedApiKey.encode("utf-8"), expectedApiKey.encode("utf-8")):
        raise HTTPException(status_code=401, detail="Invalid API key")
    return providedApiKey
async def _checkRateLimit(apiKey: str = Depends(_verifyApiKey)) -> str:
    """Apply the shared rate limiter to the authenticated API key.

    Returns:
        The API key, unchanged, when the request is within the limit.

    Raises:
        HTTPException: 429 with rate-limit details in the body and in the
            ``Retry-After`` / ``X-RateLimit-*`` headers when the limit is
            exceeded.
    """
    allowed, info = rateLimiter.isAllowed(apiKey)
    if allowed:
        return apiKey
    # Retry-After carries whole seconds. Round up (minimum 1) so a wait of
    # under one second is not sent as "0", which would invite an immediate
    # retry that is still rate limited.
    retryAfterSeconds = max(1, math.ceil(info["retryAfter"]))
    raise HTTPException(
        status_code=429,
        detail={
            "error": "Rate limit exceeded",
            "message": f"Too many requests. Please retry after {info['retryAfter']} seconds.",
            "retryAfter": info["retryAfter"],
            "limit": info["limit"],
            "remaining": info["remaining"]
        },
        headers={
            "Retry-After": str(retryAfterSeconds),
            "X-RateLimit-Limit": str(info["limit"]),
            "X-RateLimit-Remaining": str(info["remaining"]),
            "X-RateLimit-Reset": str(info["resetSeconds"])
        }
    )