service-llm-private/app.py
2026-02-06 10:27:06 +01:00

684 lines
24 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Private-LLM Service - FastAPI Web App
Provides AI model endpoints for OCR and Vision processing via Ollama.
Models exposed:
- poweron-ocr-general (deepseek)
- poweron-vision-general (qwen2.5)
- poweron-vision-deep (granite3.2)
"""
import os
import sys
import base64
import hmac
import json
import math
import re
import logging
import time
from collections import defaultdict
from typing import Optional, List, Dict, Any
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException, Depends, Header, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, Field
import httpx
# PDF Support
try:
    import fitz  # PyMuPDF
except ImportError:
    # PyMuPDF is optional: the service still starts, the PDF helpers refuse to run.
    PDF_SUPPORT = False
    print("WARNUNG: PyMuPDF nicht installiert. PDF-Support deaktiviert.")
    print("Installieren mit: pip install pymupdf")
else:
    PDF_SUPPORT = True

# Configure logging (timestamp - level - logger name - message)
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# ============================================================================
# Configuration
# ============================================================================
def _loadConfig() -> Dict[str, Any]:
    """Build the service configuration.

    Precedence (lowest to highest): built-in defaults, ``config.ini`` located
    next to this file (``KEY=value`` lines, ``#`` comments), environment
    variables of the same names.

    Integer settings that cannot be parsed are reported with a warning and the
    previously known value is kept, so one bad value neither aborts parsing of
    the remaining file nor crashes the service at import time.

    Returns:
        Dict with the keys ``apiKey``, ``ollamaUrl``, ``authUsername``,
        ``authPassword``, ``secretKey``, ``rateLimitRequestsPerMinute`` and
        ``rateLimitBurstSize``.
    """
    config: Dict[str, Any] = {
        "apiKey": None,
        "ollamaUrl": "http://localhost:11434",
        "authUsername": "poweron",
        "authPassword": "poweron",
        "secretKey": "poweron-secret-key-change-in-production",
        "rateLimitRequestsPerMinute": 60,
        "rateLimitBurstSize": 10,
    }
    # External setting name (file / environment) -> internal config key
    keyMap = {
        "PRIVATE_LLM_API_KEY": "apiKey",
        "OLLAMA_URL": "ollamaUrl",
        "AUTH_USERNAME": "authUsername",
        "AUTH_PASSWORD": "authPassword",
        "SECRET_KEY": "secretKey",
        "RATE_LIMIT_REQUESTS_PER_MINUTE": "rateLimitRequestsPerMinute",
        "RATE_LIMIT_BURST_SIZE": "rateLimitBurstSize",
    }
    integerKeys = {"rateLimitRequestsPerMinute", "rateLimitBurstSize"}

    def _apply(externalKey: str, rawValue: str, source: str) -> None:
        """Store one setting, converting integer settings and skipping unknown keys."""
        internalKey = keyMap.get(externalKey)
        if internalKey is None:
            return
        if internalKey not in integerKeys:
            config[internalKey] = rawValue
            return
        try:
            config[internalKey] = int(rawValue)
        except (TypeError, ValueError):
            logger.warning(
                f"Ignoring invalid integer for {externalKey} from {source}: {rawValue!r}"
            )

    configPath = os.path.join(os.path.dirname(__file__), "config.ini")
    if os.path.exists(configPath):
        try:
            with open(configPath, "r", encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if not line or line.startswith("#") or "=" not in line:
                        continue
                    key, value = line.split("=", 1)
                    _apply(key.strip(), value.strip(), "config.ini")
        except (OSError, UnicodeError) as e:
            logger.warning(f"Error loading config.ini: {e}")

    # Override with environment variables if set (an empty value still overrides)
    for externalKey in keyMap:
        envValue = os.environ.get(externalKey)
        if envValue is not None:
            _apply(externalKey, envValue, "environment")
    return config


CONFIG = _loadConfig()
# ============================================================================
# Rate Limiting (Token Bucket Algorithm)
# ============================================================================
class RateLimiter:
    """
    Token bucket rate limiter with per-API-key tracking.

    Each API key gets its own bucket. Tokens are added at a constant rate
    (requestsPerMinute / 60 per second) up to a maximum burst size. A
    non-positive requestsPerMinute means buckets are never refilled.
    """

    # Window reported to clients as "resetSeconds" and used as the retry hint
    # when the bucket can never refill.
    _RESET_SECONDS = 60

    def __init__(self, requestsPerMinute: int = 60, burstSize: int = 10):
        self.requestsPerMinute = requestsPerMinute
        self.burstSize = burstSize
        # Clamp at zero: a negative rate would otherwise drain buckets over time.
        self.tokensPerSecond = max(0.0, requestsPerMinute / 60.0)
        # Track tokens and last update time per API key
        # Format: {apiKey: {"tokens": float, "lastUpdate": float}}
        self._buckets: Dict[str, Dict[str, float]] = defaultdict(
            lambda: {"tokens": burstSize, "lastUpdate": time.time()}
        )

    def _refillTokens(self, bucket: Dict[str, float]) -> None:
        """Refill tokens based on elapsed time, capped at the burst size."""
        now = time.time()
        # time.time() is wall-clock time and may step backwards (NTP, manual
        # change); never let a negative interval remove tokens.
        elapsed = max(0.0, now - bucket["lastUpdate"])
        bucket["tokens"] = min(
            self.burstSize,
            bucket["tokens"] + elapsed * self.tokensPerSecond
        )
        bucket["lastUpdate"] = now

    def isAllowed(self, apiKey: str) -> tuple[bool, Dict[str, Any]]:
        """
        Check if a request is allowed and consume a token if so.

        Returns:
            Tuple of (allowed, info). info always has "remaining", "limit" and
            "resetSeconds"; when the request is denied it also has
            "retryAfter" (seconds until the next token, rounded to 0.1).
        """
        bucket = self._buckets[apiKey]
        self._refillTokens(bucket)
        if bucket["tokens"] >= 1.0:
            bucket["tokens"] -= 1.0
            return True, {
                "remaining": int(bucket["tokens"]),
                "limit": self.requestsPerMinute,
                "resetSeconds": self._RESET_SECONDS
            }
        if self.tokensPerSecond > 0:
            # Calculate when the next token will be available
            retryAfter = (1.0 - bucket["tokens"]) / self.tokensPerSecond
        else:
            # No refill configured: avoid dividing by zero, give a finite hint.
            retryAfter = float(self._RESET_SECONDS)
        return False, {
            "remaining": 0,
            "limit": self.requestsPerMinute,
            "retryAfter": round(retryAfter, 1),
            "resetSeconds": self._RESET_SECONDS
        }

    def cleanup(self, maxAgeSeconds: int = 3600) -> int:
        """Remove buckets idle for more than maxAgeSeconds; return how many were removed."""
        now = time.time()
        # Collect first: the dict must not change size while it is iterated.
        staleKeys = [
            key for key, bucket in self._buckets.items()
            if now - bucket["lastUpdate"] > maxAgeSeconds
        ]
        for key in staleKeys:
            del self._buckets[key]
        return len(staleKeys)
# Global rate limiter instance
# Process-wide limiter shared by all requests, sized from the loaded configuration.
rateLimiter = RateLimiter(
    burstSize=CONFIG["rateLimitBurstSize"],
    requestsPerMinute=CONFIG["rateLimitRequestsPerMinute"],
)
# Model mapping: external name -> internal Ollama model name
# Production models (optimized for 32GB RAM server):
# - deepseek-ocr: 3.34B params, 8K context, ~6.7GB RAM
# - qwen2.5vl:7b: 8.29B params, 125K context, ~6GB RAM
# - granite3.2-vision: 2B params, 16K context, ~2.4GB RAM
MODEL_MAPPING = {
    "poweron-ocr-general": "deepseek-ocr",
    "poweron-vision-general": "qwen2.5vl:7b",
    "poweron-vision-deep": "granite3.2-vision",
}
# Inverse lookup table: internal Ollama model name -> external name
INTERNAL_TO_EXTERNAL = {
    internalName: externalName
    for externalName, internalName in MODEL_MAPPING.items()
}
# ============================================================================
# Request/Response Models
# ============================================================================
class AnalyzeRequest(BaseModel):
    """Request model for document analysis."""

    # Optional in the schema; the analyze route requires it for vision models.
    imageBase64: Optional[str] = Field(None, description="Base64 encoded image")
    # Required: no default is supplied.
    prompt: str = Field(..., description="Analysis prompt")
    # External model name; defaults to the general vision model.
    modelName: str = Field("poweron-vision-general", description="Model to use")
class AnalyzeResponse(BaseModel):
    """Response model for document analysis."""

    # Required flag; the three remaining fields are optional and default to None.
    success: bool = Field(..., description="Whether the analysis was successful")
    data: Optional[Dict[str, Any]] = Field(None, description="Extracted data")
    rawResponse: Optional[str] = Field(None, description="Raw model response")
    error: Optional[str] = Field(None, description="Error message if failed")
class PdfExtractRequest(BaseModel):
    """Request model for PDF extraction."""

    # Required PDF payload.
    pdfBase64: str = Field(..., description="Base64 encoded PDF")
    # When omitted (None) the extract route renders several pages instead of one.
    page: Optional[int] = Field(None, description="Specific page number (1-indexed)")
class ModelInfo(BaseModel):
    """Model information."""

    # All four fields are required (no defaults).
    name: str = Field(..., description="External model name")
    internalName: str = Field(..., description="Internal Ollama model name")
    isVision: bool = Field(..., description="Whether it's a vision model")
    pricePerCall: float = Field(..., description="Price per call in CHF")
class HealthResponse(BaseModel):
    """Health check response."""

    # Overall service state and service identifier.
    status: str
    service: str
    # Whether PyMuPDF could be imported at startup.
    pdfSupport: bool
    # Whether the Ollama backend answered the probe.
    ollamaConnected: bool
class OllamaStatusResponse(BaseModel):
    """Ollama status response."""

    # Only "connected" is required; everything else defaults to None.
    connected: bool
    # Model listing fields.
    models: Optional[List[str]] = None
    visionModels: Optional[List[str]] = None
    totalModels: Optional[int] = None
    # Failure description.
    error: Optional[str] = None
# ============================================================================
# PDF Helper Functions
# ============================================================================
def _extractImagesFromPdf(pdfBytes: bytes, maxPages: int = 5) -> List[Dict[str, Any]]:
    """Render the first pages of a PDF as base64-encoded PNG images.

    Args:
        pdfBytes: Raw PDF file content.
        maxPages: Upper bound on the number of pages rendered, starting at page 1.

    Returns:
        One dict per rendered page with the keys "page" (1-indexed),
        "base64" (PNG data), "width" and "height" (from the rendered pixmap).

    Raises:
        Exception: If PyMuPDF is not installed; errors raised by PyMuPDF
            while opening or rendering propagate unchanged.
    """
    if not PDF_SUPPORT:
        raise Exception("PDF-Support nicht verfügbar. Bitte PyMuPDF installieren.")
    doc = fitz.open(stream=pdfBytes, filetype="pdf")
    try:
        mat = fitz.Matrix(2.0, 2.0)  # 2x Zoom for better quality
        images = []
        for pageNum in range(min(len(doc), maxPages)):
            pix = doc[pageNum].get_pixmap(matrix=mat)
            imgBase64 = base64.b64encode(pix.tobytes("png")).decode("utf-8")
            images.append({
                "page": pageNum + 1,
                "base64": imgBase64,
                "width": pix.width,
                "height": pix.height
            })
        return images
    finally:
        # Release the document even when rendering a page fails.
        doc.close()
def _renderPdfPageAsImage(pdfBytes: bytes, pageNum: int = 0, zoom: float = 2.0) -> Dict[str, Any]:
    """Render a single PDF page as a base64-encoded PNG image.

    Args:
        pdfBytes: Raw PDF file content.
        pageNum: 0-indexed page to render. Values outside the document are
            clamped to the first / last page.
        zoom: Scale factor applied to both axes when rendering.

    Returns:
        Dict with the keys "base64" (PNG data), "width", "height",
        "page" (1-indexed page actually rendered) and "totalPages".

    Raises:
        Exception: If PyMuPDF is not installed or the document has no pages;
            errors raised by PyMuPDF propagate unchanged.
    """
    if not PDF_SUPPORT:
        raise Exception("PDF-Support nicht verfügbar.")
    doc = fitz.open(stream=pdfBytes, filetype="pdf")
    try:
        totalPages = len(doc)
        if totalPages == 0:
            raise Exception("PDF enthält keine Seiten.")
        # Clamp on both sides so the reported page number always matches the
        # page that was rendered (a negative index would otherwise report
        # "page": 0 or less).
        pageNum = max(0, min(pageNum, totalPages - 1))
        pix = doc[pageNum].get_pixmap(matrix=fitz.Matrix(zoom, zoom))
        imgBase64 = base64.b64encode(pix.tobytes("png")).decode("utf-8")
        return {
            "base64": imgBase64,
            "width": pix.width,
            "height": pix.height,
            "page": pageNum + 1,
            "totalPages": totalPages
        }
    finally:
        # Release the document even when rendering fails.
        doc.close()
# ============================================================================
# Model Helper Functions
# ============================================================================
def _isVisionModel(modelName: str) -> bool:
    """Tell whether a model name contains one of the vision-model markers (case-insensitive)."""
    if not modelName:
        return False
    lowered = modelName.lower()
    for marker in ("vision", "vl", "llava", "bakllava", "granite"):
        if marker in lowered:
            return True
    return False
def _getInternalModelName(externalName: str) -> str:
    """Translate an external model name to its Ollama name; unknown names pass through unchanged."""
    try:
        return MODEL_MAPPING[externalName]
    except KeyError:
        return externalName
def _getExternalModelName(internalName: str) -> str:
    """Translate an Ollama model name to its external name; unknown names pass through unchanged."""
    try:
        return INTERNAL_TO_EXTERNAL[internalName]
    except KeyError:
        return internalName
# ============================================================================
# Authentication & Rate Limiting
# ============================================================================
async def _verifyApiKey(xApiKey: Optional[str] = Header(None, alias="X-API-Key")) -> str:
    """Verify the API key from the X-API-Key header and return it for rate limiting.

    Returns:
        The presented key, or the literal "dev-mode" when no key is configured.

    Raises:
        HTTPException: 401 when a key is configured and the header is missing
            or does not match.
    """
    expectedKey = CONFIG["apiKey"]
    if not expectedKey:
        # No API key configured, allow all requests (development mode)
        logger.warning("No API key configured - running in development mode")
        return "dev-mode"
    if not xApiKey:
        raise HTTPException(status_code=401, detail="API key required")
    # Constant-time comparison so response timing does not reveal how many
    # leading characters of a guessed key are correct. Compare bytes because
    # compare_digest rejects non-ASCII str arguments.
    if not hmac.compare_digest(xApiKey.encode("utf-8"), expectedKey.encode("utf-8")):
        raise HTTPException(status_code=401, detail="Invalid API key")
    return xApiKey
async def _checkRateLimit(apiKey: str = Depends(_verifyApiKey)) -> str:
    """Check rate limit for the authenticated API key.

    Returns:
        The API key, when a token was available and has been consumed.

    Raises:
        HTTPException: 429 with a structured detail body and rate-limit
            headers when the key's bucket is empty.
    """
    allowed, info = rateLimiter.isAllowed(apiKey)
    if allowed:
        return apiKey
    # Retry-After carries whole seconds. Round up (and never advertise 0):
    # truncating a 0.9s wait to "0" would invite an immediate retry that is
    # rejected again.
    retryAfterSeconds = max(1, math.ceil(info["retryAfter"]))
    raise HTTPException(
        status_code=429,
        detail={
            "error": "Rate limit exceeded",
            "message": f"Too many requests. Please retry after {info['retryAfter']} seconds.",
            "retryAfter": info["retryAfter"],
            "limit": info["limit"],
            "remaining": info["remaining"]
        },
        headers={
            "Retry-After": str(retryAfterSeconds),
            "X-RateLimit-Limit": str(info["limit"]),
            "X-RateLimit-Remaining": str(info["remaining"]),
            "X-RateLimit-Reset": str(info["resetSeconds"])
        }
    )
# ============================================================================
# Application Lifecycle
# ============================================================================
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Log the effective configuration on startup and a notice on shutdown."""
    apiKeyState = "Yes" if CONFIG["apiKey"] else "No (development mode)"
    pdfState = "Enabled" if PDF_SUPPORT else "Disabled"
    logger.info("Private-LLM Service starting up...")
    logger.info(f"Ollama URL: {CONFIG['ollamaUrl']}")
    logger.info(f"API Key configured: {apiKeyState}")
    logger.info(f"PDF Support: {pdfState}")
    # Control returns here when the application shuts down.
    yield
    logger.info("Private-LLM Service shutting down...")
# ============================================================================
# FastAPI Application
# ============================================================================
# The ASGI application; startup/shutdown logging is wired in through `lifespan`.
app = FastAPI(
    lifespan=lifespan,
    title="PowerOn Private-LLM Service",
    description="AI model endpoints for OCR and Vision processing",
    version="1.0.0",
)
# CORS Configuration - Allow gateway instances
# Local development origins: both loopback spellings on the three gateway ports.
ALLOWED_ORIGINS = [
    f"http://{host}:{port}"
    for host in ("localhost", "127.0.0.1")
    for port in (8000, 8080, 5000)
]
# Add production origins
PRODUCTION_PATTERNS = [
    "poweron.swiss",
    "poweron-center.net",
]
# Build full origins list with https variants: the bare domain first, then
# each well-known subdomain, per production domain.
for pattern in PRODUCTION_PATTERNS:
    ALLOWED_ORIGINS += [
        f"https://{prefix}{pattern}"
        for prefix in ("", "www.", "api.", "gateway.", "app.", "nyla.", "playground.")
    ]
# Allow all subdomains via regex in middleware
app.add_middleware(
    CORSMiddleware,
    # Explicit origins plus a regex covering any https subdomain of the production domains
    allow_origins=ALLOWED_ORIGINS,
    allow_origin_regex=r"https://.*\.(poweron\.swiss|poweron-center\.net)",
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["*"],
    expose_headers=["*"],
    # Preflight cache lifetime in seconds (24 hours)
    max_age=86400,
)
# Static files and templates (for web UI)
templates = Jinja2Templates(directory="templates")
app.mount("/static", StaticFiles(directory="static"), name="static")
# ============================================================================
# API Routes
# ============================================================================
@app.get("/api/health", response_model=HealthResponse, tags=["System"])
async def _healthCheck():
    """Health check endpoint."""
    # The service itself is reported as "ok" regardless; Ollama reachability
    # is reported separately via ollamaConnected.
    ollamaConnected = False
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.get(f"{CONFIG['ollamaUrl']}/api/tags")
        ollamaConnected = response.status_code == 200
    except Exception as e:
        # Best-effort probe: never fail the health endpoint, but leave a
        # trace of why Ollama was considered unreachable.
        logger.debug(f"Ollama health probe failed: {e}")
    return HealthResponse(
        status="ok",
        service="private-llm",
        pdfSupport=PDF_SUPPORT,
        ollamaConnected=ollamaConnected
    )
@app.get("/api/models", response_model=List[ModelInfo], tags=["Models"])
async def _listModels(authenticated: bool = Depends(_verifyApiKey)):
    """List available models with pricing."""
    catalogue = []
    for externalName, internalName in MODEL_MAPPING.items():
        visionCapable = _isVisionModel(internalName)
        # CHF pricing: vision models cost more per call than non-vision ones
        if visionCapable:
            price = 0.10
        else:
            price = 0.01
        entry = ModelInfo(
            name=externalName,
            internalName=internalName,
            isVision=visionCapable,
            pricePerCall=price,
        )
        catalogue.append(entry)
    return catalogue
@app.get("/api/ollama/status", response_model=OllamaStatusResponse, tags=["System"])
async def _ollamaStatus(authenticated: bool = Depends(_verifyApiKey)):
    """Check Ollama connection status and list available models."""
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(f"{CONFIG['ollamaUrl']}/api/tags")
        if response.status_code == 200:
            payload = response.json()
            # Entries without a "name" are listed as empty strings
            modelNames = []
            for entry in payload.get("models", []):
                modelNames.append(entry.get("name", ""))
            return OllamaStatusResponse(
                connected=True,
                models=modelNames,
                visionModels=list(filter(_isVisionModel, modelNames)),
                totalModels=len(modelNames)
            )
        return OllamaStatusResponse(
            connected=False,
            error=f"Ollama responded with status {response.status_code}"
        )
    except httpx.ConnectError:
        failure = "Keine Verbindung zu Ollama. Ist Ollama gestartet?"
    except Exception as e:
        failure = str(e)
    # Both failure paths report the same shape, differing only in the message
    return OllamaStatusResponse(
        connected=False,
        error=failure
    )
def _extractJsonPayload(responseText: str) -> Dict[str, Any]:
    """Return the JSON object embedded in the model output, or wrap the plain text."""
    # Greedy match: from the first "{" to the last "}" in the text
    jsonMatch = re.search(r"\{[\s\S]*\}", responseText)
    if jsonMatch:
        try:
            return json.loads(jsonMatch.group())
        except json.JSONDecodeError:
            pass
    # Wrap plain text response in JSON object
    return {"response": responseText.strip()}


@app.post("/api/analyze", response_model=AnalyzeResponse, tags=["AI"])
async def _analyzeDocument(
    request: AnalyzeRequest,
    apiKey: str = Depends(_checkRateLimit)
):
    """
    Analyze a document with AI Vision API.
    Supports both vision models (with images) and text models (without images).
    """
    # Error contract: validation problems and non-200 Ollama answers are raised
    # as HTTPException; timeouts, connection failures and unexpected errors
    # are returned as AnalyzeResponse(success=False, ...).
    try:
        # Get internal model name
        internalModelName = _getInternalModelName(request.modelName)
        isVision = _isVisionModel(internalModelName)
        # Validate request
        if isVision and not request.imageBase64:
            raise HTTPException(
                status_code=400,
                detail="Kein Bild übermittelt (erforderlich für Vision-Modelle)"
            )
        if not request.prompt:
            raise HTTPException(status_code=400, detail="Kein Prompt übermittelt")
        # Model-specific context lengths (actual model limits)
        modelContextLengths = {
            "deepseek-ocr": 8192,  # 8K context
            "qwen2.5vl:7b": 32768,  # Use 32K (model supports 125K but RAM limited)
            "granite3.2-vision": 16000,  # 16K context
        }
        numCtx = modelContextLengths.get(internalModelName, 8192)
        # Build request body with model-specific context window
        requestBody = {
            "model": internalModelName,
            "prompt": request.prompt,
            "stream": False,
            "options": {
                "num_ctx": numCtx
            }
        }
        if request.imageBase64:
            requestBody["images"] = [request.imageBase64]
        # Call Ollama API
        async with httpx.AsyncClient(timeout=3600.0) as client:  # 60 min timeout
            response = await client.post(
                f"{CONFIG['ollamaUrl']}/api/generate",
                json=requestBody
            )
        if response.status_code == 404:
            raise HTTPException(
                status_code=404,
                detail=f'Modell "{internalModelName}" nicht gefunden. Bitte installieren mit: ollama pull {internalModelName}'
            )
        if response.status_code != 200:
            raise HTTPException(
                status_code=response.status_code,
                detail=f"Ollama API Fehler: {response.status_code} - {response.text[:200]}"
            )
        responseText = response.json().get("response", "")
        # Try to extract JSON from response (falls back to wrapping the text)
        return AnalyzeResponse(
            success=True,
            data=_extractJsonPayload(responseText),
            rawResponse=responseText
        )
    except httpx.TimeoutException:
        return AnalyzeResponse(
            success=False,
            error="Zeitüberschreitung bei der Ollama API"
        )
    except httpx.ConnectError:
        return AnalyzeResponse(
            success=False,
            error="Verbindung zu Ollama fehlgeschlagen. Ist Ollama gestartet?"
        )
    except HTTPException:
        raise
    except Exception as e:
        # logger.exception keeps the traceback, which a plain error message loses.
        logger.exception(f"Error analyzing document: {e}")
        return AnalyzeResponse(
            success=False,
            error=f"Unerwarteter Fehler: {str(e)}"
        )
@app.post("/api/pdf/extract", tags=["PDF"])
async def _extractPdfImages(
    request: PdfExtractRequest,
    authenticated: bool = Depends(_verifyApiKey)
):
    """Extract images from a PDF."""
    # Status codes: 501 without PyMuPDF, 400 for a page number below 1 or
    # undecodable base64 input, 500 for failures while processing the PDF.
    if not PDF_SUPPORT:
        raise HTTPException(
            status_code=501,
            detail="PDF-Support nicht verfügbar. Bitte PyMuPDF installieren: pip install pymupdf"
        )
    # Pages are 1-indexed; 0 or a negative number would turn into a negative
    # index after the "- 1" conversion below.
    if request.page is not None and request.page < 1:
        raise HTTPException(
            status_code=400,
            detail="Ungültige Seitennummer: Seiten sind 1-indiziert (page >= 1)"
        )
    # Decode outside the processing try-block so a malformed payload is
    # reported as a client error (400) rather than a server error (500).
    try:
        pdfBytes = base64.b64decode(request.pdfBase64)
    except (ValueError, TypeError) as e:
        raise HTTPException(
            status_code=400,
            detail=f"Ungültige Base64-Daten: {str(e)}"
        ) from e
    try:
        if request.page is not None:
            # Extract single page
            result = _renderPdfPageAsImage(pdfBytes, request.page - 1)
            return {"success": True, "image": result}
        # Extract all pages (max 5)
        images = _extractImagesFromPdf(pdfBytes, maxPages=5)
        return {
            "success": True,
            "images": images,
            "totalExtracted": len(images)
        }
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"PDF-Verarbeitungsfehler: {str(e)}"
        ) from e
# ============================================================================
# Web UI Routes (Optional - for direct browser access)
# ============================================================================
@app.get("/", response_class=HTMLResponse, tags=["Web UI"])
async def _index(request: Request):
    """Main page with document scanner UI."""
    # The request object is the only value passed into the template context
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
@app.get("/login", response_class=HTMLResponse, tags=["Web UI"])
async def _loginPage(request: Request):
    """Login page."""
    # The request object is the only value passed into the template context
    context = {"request": request}
    return templates.TemplateResponse("login.html", context)
# ============================================================================
# Main
# ============================================================================
if __name__ == "__main__":
    # Direct execution: print a start banner, then serve on all interfaces, port 5000.
    import uvicorn

    separator = "=" * 60
    print("\n" + separator)
    print(" Private-LLM Service - KI-Dokumentenanalyse")
    print(" Powered by PowerOn")
    print(separator)
    print(f"\n Server läuft auf: http://localhost:5000")
    print(f" API Docs: http://localhost:5000/docs")
    print(f" Ollama URL: {CONFIG['ollamaUrl']}")
    print("\n Drücke Ctrl+C zum Beenden")
    print(separator + "\n")
    uvicorn.run(app, host="0.0.0.0", port=5000)