# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Private-LLM Service - FastAPI Web App

Provides AI model endpoints for text and vision processing via Ollama.

Models exposed (see MODEL_MAPPING):
- poweron-text-general (qwen2.5:7b)
- poweron-vision-general (qwen2.5vl:7b)
- poweron-vision-deep (granite3.2-vision)
"""

import os
import sys
import base64
import hmac
import json
import re
import logging
import time
from collections import defaultdict
from typing import Optional, List, Dict, Any
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException, Depends, Header, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, Field
import httpx

# PDF Support
try:
    import fitz  # PyMuPDF
    PDF_SUPPORT = True
except ImportError:
    PDF_SUPPORT = False
    print("WARNUNG: PyMuPDF nicht installiert. PDF-Support deaktiviert.")
    print("Installieren mit: pip install pymupdf")

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)


# ============================================================================
# Configuration
# ============================================================================

# Setting name (as used in config.ini and in the environment) -> CONFIG key.
_CONFIG_FIELDS = {
    "PRIVATE_LLM_API_KEY": "apiKey",
    "OLLAMA_URL": "ollamaUrl",
    "AUTH_USERNAME": "authUsername",
    "AUTH_PASSWORD": "authPassword",
    "SECRET_KEY": "secretKey",
    "RATE_LIMIT_REQUESTS_PER_MINUTE": "rateLimitRequestsPerMinute",
    "RATE_LIMIT_BURST_SIZE": "rateLimitBurstSize",
}

# CONFIG keys whose values are converted with int().
_INTEGER_CONFIG_FIELDS = frozenset({"rateLimitRequestsPerMinute", "rateLimitBurstSize"})


def _applySetting(config: Dict[str, Any], settingName: str, rawValue: str) -> None:
    """Store one raw setting in *config*, converting integer settings.

    Unknown setting names are ignored. An integer setting whose value cannot
    be parsed is logged and skipped, so the previously stored value is kept.
    """
    configKey = _CONFIG_FIELDS.get(settingName)
    if configKey is None:
        return
    if configKey not in _INTEGER_CONFIG_FIELDS:
        config[configKey] = rawValue
        return
    try:
        config[configKey] = int(rawValue)
    except ValueError:
        logger.warning(
            "Ignoring non-integer value %r for %s; keeping %r",
            rawValue, settingName, config[configKey],
        )


def _loadConfig() -> Dict[str, Any]:
    """Load configuration from config.ini, then apply environment overrides.

    config.ini is looked up next to this file and read as ``KEY = value``
    lines; blank lines and lines starting with ``#`` are skipped. Environment
    variables with the same names take precedence over the file.

    Returns:
        Dict with the keys apiKey, ollamaUrl, authUsername, authPassword,
        secretKey, rateLimitRequestsPerMinute and rateLimitBurstSize.
    """
    configPath = os.path.join(os.path.dirname(__file__), "config.ini")
    config: Dict[str, Any] = {
        "apiKey": None,
        "ollamaUrl": "http://localhost:11434",
        "authUsername": "poweron",
        "authPassword": "poweron",
        "secretKey": "poweron-secret-key-change-in-production",
        "rateLimitRequestsPerMinute": 60,
        "rateLimitBurstSize": 10,
    }

    try:
        with open(configPath, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                key, value = line.split("=", 1)
                # Each setting is applied on its own so that one bad value
                # does not discard the remaining lines of the file.
                _applySetting(config, key.strip(), value.strip())
    except FileNotFoundError:
        pass  # config.ini is optional; defaults and environment still apply
    except (OSError, UnicodeError) as e:
        logger.warning("Error loading config.ini: %s", e)

    # Override with environment variables if set
    for settingName in _CONFIG_FIELDS:
        envValue = os.environ.get(settingName)
        if envValue is not None:
            _applySetting(config, settingName, envValue)

    return config


CONFIG = _loadConfig()


# ============================================================================
# Rate Limiting (Token Bucket Algorithm)
# ============================================================================

class RateLimiter:
    """
    Token bucket rate limiter with per-API-key tracking.

    Each API key gets its own bucket. Tokens are added at a constant rate
    (requestsPerMinute / 60 per second) up to a maximum burst size.
    """

    def __init__(self, requestsPerMinute: int = 60, burstSize: int = 10):
        """Create a limiter.

        Args:
            requestsPerMinute: Sustained refill rate; must be positive.
            burstSize: Maximum number of tokens a bucket can hold.

        Raises:
            ValueError: If requestsPerMinute is not positive (the refill rate
                is used as a divisor when computing the retry delay).
        """
        if requestsPerMinute <= 0:
            raise ValueError("requestsPerMinute must be positive")
        self.requestsPerMinute = requestsPerMinute
        self.burstSize = burstSize
        self.tokensPerSecond = requestsPerMinute / 60.0
        # Track tokens and last update time per API key
        # Format: {apiKey: {"tokens": float, "lastUpdate": float}}
        # lastUpdate is a time.monotonic() reading, so wall-clock adjustments
        # can never produce a negative elapsed time.
        self._buckets: Dict[str, Dict[str, float]] = defaultdict(
            lambda: {"tokens": burstSize, "lastUpdate": time.monotonic()}
        )

    def _refillTokens(self, bucket: Dict[str, float]) -> None:
        """Refill tokens based on elapsed time, capped at the burst size."""
        now = time.monotonic()
        elapsed = now - bucket["lastUpdate"]
        bucket["tokens"] = min(
            self.burstSize,
            bucket["tokens"] + elapsed * self.tokensPerSecond
        )
        bucket["lastUpdate"] = now

    def isAllowed(self, apiKey: str) -> tuple[bool, Dict[str, Any]]:
        """
        Check if a request is allowed and consume a token if so.

        Returns:
            Tuple of (allowed, info). info always contains "remaining",
            "limit" and "resetSeconds"; when the request is denied it also
            contains "retryAfter" (seconds until the next token).
        """
        bucket = self._buckets[apiKey]
        self._refillTokens(bucket)

        if bucket["tokens"] >= 1.0:
            bucket["tokens"] -= 1.0
            return True, {
                "remaining": int(bucket["tokens"]),
                "limit": self.requestsPerMinute,
                "resetSeconds": 60
            }

        # Calculate when the next token will be available
        retryAfter = (1.0 - bucket["tokens"]) / self.tokensPerSecond
        return False, {
            "remaining": 0,
            "limit": self.requestsPerMinute,
            "retryAfter": round(retryAfter, 1),
            "resetSeconds": 60
        }

    def cleanup(self, maxAgeSeconds: int = 3600) -> int:
        """Remove stale buckets to prevent memory growth.

        Returns:
            Number of buckets removed.
        """
        now = time.monotonic()
        staleKeys = [
            key for key, bucket in self._buckets.items()
            if now - bucket["lastUpdate"] > maxAgeSeconds
        ]
        for key in staleKeys:
            del self._buckets[key]
        return len(staleKeys)


# Global rate limiter instance
rateLimiter = RateLimiter(
    requestsPerMinute=CONFIG["rateLimitRequestsPerMinute"],
    burstSize=CONFIG["rateLimitBurstSize"]
)

# Model mapping: external name -> internal Ollama model name
# Production models (optimized for 32GB RAM server):
# - qwen2.5:7b: 7.6B params, 128K context, ~4.7GB RAM (Text)
# - qwen2.5vl:7b: 8.29B params, 125K context, ~6GB RAM (Vision)
# - granite3.2-vision: 2B params, 16K context, ~2.4GB RAM (Vision)
MODEL_MAPPING = {
    "poweron-text-general": "qwen2.5:7b",
    "poweron-vision-general": "qwen2.5vl:7b",
    "poweron-vision-deep": "granite3.2-vision",
}

# Reverse mapping for lookups
INTERNAL_TO_EXTERNAL = {v: k for k, v in MODEL_MAPPING.items()}

# Model-specific context lengths (reduced for RAM constraints)
# Server has 31GB RAM + 22GB GPU - vision models need smaller context
MODEL_CONTEXT_LENGTHS = {
    "qwen2.5:7b": 8192,  # Text model - 8K context
    "qwen2.5vl:7b": 4096,  # Vision model - 4K context (images use lots of RAM)
    "granite3.2-vision": 4096,  # Vision model - 4K context
    "granite3.2-vision:latest": 4096,
    "deepseek-ocr": 4096,  # OCR model - 4K context
    "deepseek-ocr:latest": 4096,
}

# Context length used for models missing from MODEL_CONTEXT_LENGTHS.
DEFAULT_CONTEXT_LENGTH = 4096


# ============================================================================
# Request/Response Models
# ============================================================================

class AnalyzeRequest(BaseModel):
    """Request model for document analysis."""
    imageBase64: Optional[str] = Field(default=None, description="Base64 encoded image")
    prompt: str = Field(description="Analysis prompt")
    modelName: str = Field(default="poweron-vision-general", description="Model to use")


class AnalyzeResponse(BaseModel):
    """Response model for document analysis."""
    success: bool = Field(description="Whether the analysis was successful")
    data: Optional[Dict[str, Any]] = Field(default=None, description="Extracted data")
    rawResponse: Optional[str] = Field(default=None, description="Raw model response")
    error: Optional[str] = Field(default=None, description="Error message if failed")


class PdfExtractRequest(BaseModel):
    """Request model for PDF extraction."""
    pdfBase64: str = Field(description="Base64 encoded PDF")
    page: Optional[int] = Field(default=None, description="Specific page number (1-indexed)")


class ModelInfo(BaseModel):
    """Model information."""
    name: str = Field(description="External model name")
    internalName: str = Field(description="Internal Ollama model name")
    isVision: bool = Field(description="Whether it's a vision model")
    pricePerCall: float = Field(description="Price per call in CHF")


class HealthResponse(BaseModel):
    """Health check response."""
    status: str
    service: str
    pdfSupport: bool
    ollamaConnected: bool


class OllamaStatusResponse(BaseModel):
    """Ollama status response."""
    connected: bool
    models: Optional[List[str]] = None
    visionModels: Optional[List[str]] = None
    totalModels: Optional[int] = None
    error: Optional[str] = None


# ============================================================================
# PDF Helper Functions
# ============================================================================

def _renderPage(page: Any, zoom: float) -> Dict[str, Any]:
    """Render one PyMuPDF page to PNG and return base64 data plus pixel size."""
    pix = page.get_pixmap(matrix=fitz.Matrix(zoom, zoom))
    return {
        "base64": base64.b64encode(pix.tobytes("png")).decode("utf-8"),
        "width": pix.width,
        "height": pix.height,
    }


def _extractImagesFromPdf(pdfBytes: bytes, maxPages: int = 5) -> List[Dict[str, Any]]:
    """Render the first pages of a PDF as PNG images.

    Args:
        pdfBytes: Raw PDF file content.
        maxPages: Upper bound on the number of pages rendered.

    Returns:
        One dict per rendered page with the keys "page" (1-indexed),
        "base64", "width" and "height".

    Raises:
        RuntimeError: If PyMuPDF is not installed.
    """
    if not PDF_SUPPORT:
        raise RuntimeError("PDF-Support nicht verfügbar. Bitte PyMuPDF installieren.")

    doc = fitz.open(stream=pdfBytes, filetype="pdf")
    try:
        numPages = min(len(doc), maxPages)
        # 2x zoom for better quality
        return [
            {"page": pageNum + 1, **_renderPage(doc[pageNum], 2.0)}
            for pageNum in range(numPages)
        ]
    finally:
        # Release the document even when rendering a page fails.
        doc.close()


def _renderPdfPageAsImage(pdfBytes: bytes, pageNum: int = 0, zoom: float = 2.0) -> Dict[str, Any]:
    """Render a single PDF page as a PNG image.

    Args:
        pdfBytes: Raw PDF file content.
        pageNum: 0-indexed page; values outside the document are clamped to
            the first or last page.
        zoom: Scale factor applied in both directions.

    Returns:
        Dict with the keys "base64", "width", "height", "page" (1-indexed
        page actually rendered) and "totalPages".

    Raises:
        RuntimeError: If PyMuPDF is not installed.
        ValueError: If the document contains no pages.
    """
    if not PDF_SUPPORT:
        raise RuntimeError("PDF-Support nicht verfügbar.")

    doc = fitz.open(stream=pdfBytes, filetype="pdf")
    try:
        totalPages = len(doc)
        if totalPages == 0:
            raise ValueError("PDF enthält keine Seiten.")
        # Clamp on both sides: a negative index would otherwise select a page
        # counted from the end and report a page number below 1.
        pageNum = max(0, min(pageNum, totalPages - 1))
        return {
            **_renderPage(doc[pageNum], zoom),
            "page": pageNum + 1,
            "totalPages": totalPages,
        }
    finally:
        doc.close()


# ============================================================================
# Model Helper Functions
# ============================================================================

def _isVisionModel(modelName: str) -> bool:
    """Check if a model is a vision model based on naming conventions.

    The check is a case-insensitive substring match, so any name containing
    one of the indicators (for example "granite") counts as a vision model.
    """
    if not modelName:
        return False
    modelLower = modelName.lower()
    visionIndicators = ["vision", "vl", "llava", "bakllava", "granite"]
    return any(indicator in modelLower for indicator in visionIndicators)


def _getInternalModelName(externalName: str) -> str:
    """Get the internal Ollama model name; unknown names pass through unchanged."""
    return MODEL_MAPPING.get(externalName, externalName)


def _getExternalModelName(internalName: str) -> str:
    """Get the external model name; unknown names pass through unchanged."""
    return INTERNAL_TO_EXTERNAL.get(internalName, internalName)


# ============================================================================
# Authentication & Rate Limiting
# ============================================================================

# Rate-limit bucket shared by all requests while no API key is configured.
_DEV_MODE_KEY = "dev-mode"


def _apiKeyMatches(providedKey: str) -> bool:
    """Compare a client-supplied key with the configured one in constant time."""
    expectedKey = CONFIG["apiKey"] or ""
    return hmac.compare_digest(
        providedKey.encode("utf-8"), expectedKey.encode("utf-8")
    )


def _rateLimitHeaders(info: Dict[str, Any]) -> Dict[str, str]:
    """Build the response headers describing a denied rate-limit check."""
    return {
        "Retry-After": str(int(info["retryAfter"])),
        "X-RateLimit-Limit": str(info["limit"]),
        "X-RateLimit-Remaining": str(info["remaining"]),
        "X-RateLimit-Reset": str(info["resetSeconds"])
    }


async def _verifyApiKey(xApiKey: Optional[str] = Header(None, alias="X-API-Key")) -> str:
    """Verify the API key from header and return it for rate limiting.

    Raises:
        HTTPException: 401 when a key is configured and the header is missing
            or does not match.
    """
    if not CONFIG["apiKey"]:
        # No API key configured, allow all requests (development mode)
        logger.warning("No API key configured - running in development mode")
        return _DEV_MODE_KEY

    if not xApiKey:
        raise HTTPException(status_code=401, detail="API key required")

    if not _apiKeyMatches(xApiKey):
        raise HTTPException(status_code=401, detail="Invalid API key")

    return xApiKey


async def _checkRateLimit(apiKey: str = Depends(_verifyApiKey)) -> str:
    """Check rate limit for the authenticated API key.

    Raises:
        HTTPException: 429 with retry information when no token is available.
    """
    allowed, info = rateLimiter.isAllowed(apiKey)

    if not allowed:
        raise HTTPException(
            status_code=429,
            detail={
                "error": "Rate limit exceeded",
                "message": f"Too many requests. Please retry after {info['retryAfter']} seconds.",
                "retryAfter": info["retryAfter"],
                "limit": info["limit"],
                "remaining": info["remaining"]
            },
            headers=_rateLimitHeaders(info)
        )

    return apiKey


# ============================================================================
# Application Lifecycle
# ============================================================================

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan handler: logs the effective setup on start/stop."""
    logger.info("Private-LLM Service starting up...")
    logger.info("Ollama URL: %s", CONFIG["ollamaUrl"])
    logger.info("API Key configured: %s", "Yes" if CONFIG["apiKey"] else "No (development mode)")
    logger.info("PDF Support: %s", "Enabled" if PDF_SUPPORT else "Disabled")
    yield
    logger.info("Private-LLM Service shutting down...")


# ============================================================================
# FastAPI Application
# ============================================================================

app = FastAPI(
    title="PowerOn Private-LLM Service",
    description="AI model endpoints for OCR and Vision processing",
    version="1.0.0",
    lifespan=lifespan,
)

# CORS Configuration - Allow gateway instances
ALLOWED_ORIGINS = [
    "http://localhost:8000",
    "http://localhost:8080",
    "http://localhost:5000",
    "http://127.0.0.1:8000",
    "http://127.0.0.1:8080",
    "http://127.0.0.1:5000",
]

# Add production origins
PRODUCTION_PATTERNS = [
    "poweron.swiss",
    "poweron-center.net",
]

# Host prefixes registered explicitly for every production domain.
_PRODUCTION_HOST_PREFIXES = ("", "www.", "api.", "gateway.", "app.", "nyla.", "playground.")

# Build full origins list with https variants
for pattern in PRODUCTION_PATTERNS:
    ALLOWED_ORIGINS.extend(
        f"https://{prefix}{pattern}" for prefix in _PRODUCTION_HOST_PREFIXES
    )

# Allow all subdomains via regex in middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=ALLOWED_ORIGINS,
    allow_origin_regex=r"https://.*\.(poweron\.swiss|poweron-center\.net)",
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["*"],
    expose_headers=["*"],
    max_age=86400,
)

# Static files and templates (for web UI)
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")


# ============================================================================
# API Routes
# ============================================================================

@app.get("/api/health", response_model=HealthResponse, tags=["System"])
async def _healthCheck():
    """Health check endpoint.

    Always answers with status "ok"; ollamaConnected reports whether Ollama's
    /api/tags endpoint returned HTTP 200 within five seconds.
    """
    ollamaConnected = False
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.get(f"{CONFIG['ollamaUrl']}/api/tags")
            ollamaConnected = response.status_code == 200
    except Exception as e:
        # Best-effort probe: a failure is reported as "not connected".
        logger.debug("Ollama health probe failed: %s", e)

    return HealthResponse(
        status="ok",
        service="private-llm",
        pdfSupport=PDF_SUPPORT,
        ollamaConnected=ollamaConnected
    )


@app.get("/api/models", response_model=List[ModelInfo], tags=["Models"])
async def _listModels(authenticated: str = Depends(_verifyApiKey)):
    """List available models with pricing."""
    models = []
    for externalName, internalName in MODEL_MAPPING.items():
        isVision = _isVisionModel(internalName)
        pricePerCall = 0.10 if isVision else 0.01  # CHF pricing
        models.append(ModelInfo(
            name=externalName,
            internalName=internalName,
            isVision=isVision,
            pricePerCall=pricePerCall
        ))
    return models


@app.get("/api/ollama/status", response_model=OllamaStatusResponse, tags=["System"])
async def _ollamaStatus():
    """Check Ollama connection status and list available models."""
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(f"{CONFIG['ollamaUrl']}/api/tags")

        if response.status_code != 200:
            return OllamaStatusResponse(
                connected=False,
                error=f"Ollama responded with status {response.status_code}"
            )

        data = response.json()
        models = [m.get("name", "") for m in data.get("models", [])]
        visionModels = [m for m in models if _isVisionModel(m)]

        return OllamaStatusResponse(
            connected=True,
            models=models,
            visionModels=visionModels,
            totalModels=len(models)
        )
    except httpx.ConnectError:
        return OllamaStatusResponse(
            connected=False,
            error="Keine Verbindung zu Ollama. Ist Ollama gestartet?"
        )
    except Exception as e:
        return OllamaStatusResponse(
            connected=False,
            error=str(e)
        )


# Greedy match from the first "{" to the last "}" in the model output.
_JSON_OBJECT_PATTERN = re.compile(r"\{[\s\S]*\}")


def _extractJsonPayload(responseText: str) -> Dict[str, Any]:
    """Return the JSON object embedded in *responseText*.

    When no parseable object is found, the stripped text is wrapped as
    {"response": ...} instead.
    """
    jsonMatch = _JSON_OBJECT_PATTERN.search(responseText)
    if jsonMatch:
        try:
            return json.loads(jsonMatch.group())
        except json.JSONDecodeError:
            pass
    # Wrap plain text response in JSON object
    return {"response": responseText.strip()}


def _enforceAnalyzeRateLimit(xApiKey: str) -> None:
    """Authenticate a keyed /api/analyze call and consume a rate-limit token.

    Raises:
        HTTPException: 401 for a wrong key, 429 when the bucket is empty.
    """
    if CONFIG["apiKey"]:
        if not _apiKeyMatches(xApiKey):
            raise HTTPException(status_code=401, detail="Invalid API key")
        bucketKey = xApiKey
    else:
        # Without a configured key the header is unverified client input;
        # share one bucket instead of creating one per arbitrary value.
        bucketKey = _DEV_MODE_KEY

    allowed, info = rateLimiter.isAllowed(bucketKey)
    if not allowed:
        raise HTTPException(
            status_code=429,
            detail=f"Rate limit exceeded. Retry after {info['retryAfter']} seconds.",
            headers=_rateLimitHeaders(info)
        )


@app.post("/api/analyze", response_model=AnalyzeResponse, tags=["AI"])
async def _analyzeDocument(
    request: AnalyzeRequest,
    xApiKey: Optional[str] = Header(None, alias="X-API-Key")
):
    """
    Analyze a document with AI Vision API.

    Supports both vision models (with images) and text models (without images).

    Authentication:
    - Gateway calls: Must include X-API-Key header
    - Test UI calls: No auth required (same-origin)

    Rate limiting is applied when API key is provided.

    NOTE(review): a request without an X-API-Key header is neither
    authenticated nor rate limited here - confirm this endpoint is not
    reachable from untrusted networks.

    Raises:
        HTTPException: 401 (wrong key), 429 (rate limit), 400 (missing image
            or prompt), 404 (model unknown to Ollama), or Ollama's own status
            code for other non-200 answers.
    """
    # Apply rate limiting only for authenticated requests (Gateway)
    if xApiKey:
        _enforceAnalyzeRateLimit(xApiKey)

    try:
        # Get internal model name
        internalModelName = _getInternalModelName(request.modelName)
        isVision = _isVisionModel(internalModelName)

        # Validate request
        if isVision and not request.imageBase64:
            raise HTTPException(
                status_code=400,
                detail="Kein Bild übermittelt (erforderlich für Vision-Modelle)"
            )

        if not request.prompt:
            raise HTTPException(status_code=400, detail="Kein Prompt übermittelt")

        numCtx = MODEL_CONTEXT_LENGTHS.get(internalModelName, DEFAULT_CONTEXT_LENGTH)

        # Build request body with model-specific context window
        requestBody = {
            "model": internalModelName,
            "prompt": request.prompt,
            "stream": False,
            "options": {
                "num_ctx": numCtx
            }
        }

        if request.imageBase64:
            requestBody["images"] = [request.imageBase64]

        # Call Ollama API
        async with httpx.AsyncClient(timeout=3600.0) as client:  # 60 min timeout
            response = await client.post(
                f"{CONFIG['ollamaUrl']}/api/generate",
                json=requestBody
            )

        if response.status_code == 404:
            raise HTTPException(
                status_code=404,
                detail=f'Modell "{internalModelName}" nicht gefunden. Bitte installieren mit: ollama pull {internalModelName}'
            )

        if response.status_code != 200:
            raise HTTPException(
                status_code=response.status_code,
                detail=f"Ollama API Fehler: {response.status_code} - {response.text[:200]}"
            )

        responseData = response.json()
        responseText = responseData.get("response", "")

        return AnalyzeResponse(
            success=True,
            data=_extractJsonPayload(responseText),
            rawResponse=responseText
        )

    except httpx.TimeoutException:
        return AnalyzeResponse(
            success=False,
            error="Zeitüberschreitung bei der Ollama API"
        )
    except httpx.ConnectError:
        return AnalyzeResponse(
            success=False,
            error="Verbindung zu Ollama fehlgeschlagen. Ist Ollama gestartet?"
        )
    except HTTPException:
        raise
    except Exception as e:
        # logger.exception keeps the traceback for diagnosis.
        logger.exception("Error analyzing document: %s", e)
        return AnalyzeResponse(
            success=False,
            error=f"Unerwarteter Fehler: {str(e)}"
        )


@app.post("/api/pdf/extract", tags=["PDF"])
async def _extractPdfImages(request: PdfExtractRequest):
    """
    Extract images from a PDF.

    No API key required - this endpoint is for local test UI only,
    not used by gateway (gateway sends images directly).

    Raises:
        HTTPException: 501 without PyMuPDF, 400 for invalid base64 or a page
            number below 1, 500 when the PDF cannot be processed.
    """
    if not PDF_SUPPORT:
        raise HTTPException(
            status_code=501,
            detail="PDF-Support nicht verfügbar. Bitte PyMuPDF installieren: pip install pymupdf"
        )

    # Input validation happens outside the processing try-block so that
    # client errors are not converted into a 500.
    try:
        pdfBytes = base64.b64decode(request.pdfBase64)
    except ValueError as e:  # binascii.Error is a ValueError subclass
        raise HTTPException(
            status_code=400,
            detail=f"Ungültige Base64-Daten: {e}"
        ) from e

    if request.page is not None and request.page < 1:
        raise HTTPException(
            status_code=400,
            detail="Seitenzahl muss mindestens 1 sein"
        )

    try:
        if request.page is not None:
            # Extract single page
            result = _renderPdfPageAsImage(pdfBytes, request.page - 1)
            return {"success": True, "image": result}

        # Extract all pages (max 5)
        images = _extractImagesFromPdf(pdfBytes, maxPages=5)
        return {
            "success": True,
            "images": images,
            "totalExtracted": len(images)
        }
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"PDF-Verarbeitungsfehler: {str(e)}"
        ) from e


# ============================================================================
# Web UI Routes (Optional - for direct browser access)
# ============================================================================

@app.get("/", response_class=HTMLResponse, tags=["Web UI"])
async def _index(request: Request):
    """Main page with document scanner UI."""
    return templates.TemplateResponse("index.html", {"request": request})


@app.get("/login", response_class=HTMLResponse, tags=["Web UI"])
async def _loginPage(request: Request):
    """Login page."""
    return templates.TemplateResponse("login.html", {"request": request})


@app.get("/logout", response_class=HTMLResponse, tags=["Web UI"])
async def _logout(request: Request):
    """Logout - redirect to login page."""
    return RedirectResponse(url="/login", status_code=302)


# ============================================================================
# Main
# ============================================================================

if __name__ == "__main__":
    import uvicorn

    print("\n" + "=" * 60)
    print(" Private-LLM Service - KI-Dokumentenanalyse")
    print(" Powered by PowerOn")
    print("=" * 60)
    print("\n Server läuft auf: http://localhost:5000")
    print(" API Docs: http://localhost:5000/docs")
    print(f" Ollama URL: {CONFIG['ollamaUrl']}")
    print("\n Drücke Ctrl+C zum Beenden")
    print("=" * 60 + "\n")

    uvicorn.run(app, host="0.0.0.0", port=5000)