Refactor: extract routes and config from app.py into separate modules

Move all API routes, OpenAI-compatible routes, web UI routes, shared config, models, rate limiter, and auth logic into dedicated files (config.py, routeApi.py, routeOpenAi.py, routeWeb.py). app.py now serves as a clean entry point.

Made-with: Cursor
This commit is contained in:
ValueOn AG 2026-03-30 14:49:35 +02:00
parent 7ca59120dc
commit 1f5d8e923b
5 changed files with 886 additions and 843 deletions

859
app.py
View file

@ -10,36 +10,15 @@ Models exposed:
- poweron-vision-deep (granite3.2)
"""
import os
import sys
import base64
import json
import re
import logging
import time
import uuid
from collections import defaultdict
from typing import Optional, List, Dict, Any
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Depends, Header, Request
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel, Field
import httpx
# PDF Support
try:
import fitz # PyMuPDF
PDF_SUPPORT = True
except ImportError:
PDF_SUPPORT = False
print("WARNUNG: PyMuPDF nicht installiert. PDF-Support deaktiviert.")
print("Installieren mit: pip install pymupdf")
from config import CONFIG
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
@ -47,444 +26,6 @@ logging.basicConfig(
)
logger = logging.getLogger(__name__)
# ============================================================================
# Configuration
# ============================================================================
def _loadConfig() -> Dict[str, Any]:
    """Load the service configuration from ``config.ini`` and the environment.

    Built-in defaults are overlaid first by ``KEY=VALUE`` lines read from a
    ``config.ini`` file located next to this module (blank lines and lines
    starting with ``#`` are skipped, unknown keys are ignored), and then by
    environment variables of the same names, which take precedence.

    A malformed integer in ``config.ini`` is logged and skipped so that the
    remaining lines of the file are still applied.

    Returns:
        Dict with the keys ``apiKey``, ``cursorApiKey``, ``ollamaUrl``,
        ``authUsername``, ``authPassword``, ``secretKey``,
        ``rateLimitRequestsPerMinute`` and ``rateLimitBurstSize``.

    Raises:
        ValueError: If a rate-limit environment variable is not an integer.
    """
    configPath = os.path.join(os.path.dirname(__file__), "config.ini")
    config = {
        "apiKey": None,
        "cursorApiKey": None,
        "ollamaUrl": "http://localhost:11434",
        "authUsername": "poweron",
        "authPassword": "poweron",
        "secretKey": "poweron-secret-key-change-in-production",
        "rateLimitRequestsPerMinute": 60,
        "rateLimitBurstSize": 10,
    }

    # External setting name -> (key in the returned dict, value converter)
    settings = {
        "PRIVATE_LLM_API_KEY": ("apiKey", str),
        "CURSOR_API_KEY": ("cursorApiKey", str),
        "OLLAMA_URL": ("ollamaUrl", str),
        "AUTH_USERNAME": ("authUsername", str),
        "AUTH_PASSWORD": ("authPassword", str),
        "SECRET_KEY": ("secretKey", str),
        "RATE_LIMIT_REQUESTS_PER_MINUTE": ("rateLimitRequestsPerMinute", int),
        "RATE_LIMIT_BURST_SIZE": ("rateLimitBurstSize", int),
    }

    if os.path.exists(configPath):
        try:
            with open(configPath, "r", encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if not line or line.startswith("#") or "=" not in line:
                        continue
                    # Split on the first "=" only so values may contain "=".
                    key, value = (part.strip() for part in line.split("=", 1))
                    if key not in settings:
                        continue
                    configKey, convert = settings[key]
                    try:
                        config[configKey] = convert(value)
                    except ValueError:
                        # Keep the previous value and continue with the next
                        # line instead of abandoning the rest of the file.
                        logger.warning(f"Ignoring invalid value for {key} in config.ini")
        except Exception as e:
            logger.warning(f"Error loading config.ini: {e}")

    # Override with environment variables if set
    for envName, (configKey, convert) in settings.items():
        envValue = os.environ.get(envName)
        if envValue is not None:
            config[configKey] = convert(envValue)

    return config


# Loaded once at import time.
CONFIG = _loadConfig()
# ============================================================================
# Rate Limiting (Token Bucket Algorithm)
# ============================================================================
class RateLimiter:
    """
    Token bucket rate limiter with per-API-key tracking.

    Each API key gets its own bucket. Tokens are added at a constant rate
    (requestsPerMinute / 60 per second) up to a maximum burst size.
    Timestamps come from the monotonic clock so that wall-clock adjustments
    cannot drain or refill a bucket.
    """

    def __init__(self, requestsPerMinute: int = 60, burstSize: int = 10):
        """Create a limiter refilling ``requestsPerMinute`` tokens per minute, capped at ``burstSize``."""
        self.requestsPerMinute = requestsPerMinute
        self.burstSize = burstSize
        self.tokensPerSecond = requestsPerMinute / 60.0

        # Track tokens and last update time per API key
        # Format: {apiKey: {"tokens": float, "lastUpdate": float}}
        self._buckets: Dict[str, Dict[str, float]] = defaultdict(
            lambda: {"tokens": burstSize, "lastUpdate": time.monotonic()}
        )

    def _refillTokens(self, bucket: Dict[str, float]) -> None:
        """Refill tokens based on elapsed time."""
        now = time.monotonic()
        # Never let a timestamp lying in the future subtract tokens.
        elapsed = max(0.0, now - bucket["lastUpdate"])
        bucket["tokens"] = min(
            self.burstSize,
            bucket["tokens"] + elapsed * self.tokensPerSecond
        )
        bucket["lastUpdate"] = now

    def isAllowed(self, apiKey: str) -> tuple[bool, Dict[str, Any]]:
        """
        Check if a request is allowed and consume a token if so.

        Returns:
            Tuple of (allowed, info). ``info`` always carries ``remaining``,
            ``limit`` and ``resetSeconds``; when the request is denied it
            additionally carries ``retryAfter`` (seconds, one decimal).
        """
        bucket = self._buckets[apiKey]
        self._refillTokens(bucket)

        if bucket["tokens"] >= 1.0:
            bucket["tokens"] -= 1.0
            return True, {
                "remaining": int(bucket["tokens"]),
                "limit": self.requestsPerMinute,
                "resetSeconds": 60
            }

        if self.tokensPerSecond > 0:
            # Time until the bucket holds one full token again.
            retryAfter = (1.0 - bucket["tokens"]) / self.tokensPerSecond
        else:
            # No refill configured: avoid dividing by zero and report the
            # advertised reset window instead.
            retryAfter = 60.0
        return False, {
            "remaining": 0,
            "limit": self.requestsPerMinute,
            "retryAfter": round(retryAfter, 1),
            "resetSeconds": 60
        }

    def cleanup(self, maxAgeSeconds: int = 3600) -> int:
        """Remove buckets idle for more than ``maxAgeSeconds``; return how many were removed."""
        now = time.monotonic()
        staleKeys = [
            key for key, bucket in self._buckets.items()
            if now - bucket["lastUpdate"] > maxAgeSeconds
        ]
        for key in staleKeys:
            del self._buckets[key]
        return len(staleKeys)
# Global rate limiter instance
# Created once at import time and sized from the loaded CONFIG values.
rateLimiter = RateLimiter(
    requestsPerMinute=CONFIG["rateLimitRequestsPerMinute"],
    burstSize=CONFIG["rateLimitBurstSize"]
)
# Model mapping: external (public) name -> internal Ollama model name.
# Production models (optimized for 32GB RAM server):
#   - qwen2.5:7b:        7.6B params,  128K context, ~4.7GB RAM (Text)
#   - qwen2.5vl:7b:      8.29B params, 125K context, ~6GB RAM   (Vision)
#   - granite3.2-vision: 2B params,    16K context,  ~2.4GB RAM (Vision)
MODEL_MAPPING = {
    "poweron-text-general": "qwen2.5:7b",
    "poweron-vision-general": "qwen2.5vl:7b",
    "poweron-vision-deep": "granite3.2-vision",
}

# Reverse mapping (internal name -> external name) for lookups
INTERNAL_TO_EXTERNAL = {
    internalName: externalName
    for externalName, internalName in MODEL_MAPPING.items()
}
# ============================================================================
# Request/Response Models
# ============================================================================
class AnalyzeRequest(BaseModel):
    """Request body for a document analysis call."""

    # Optional image payload; the prompt is the only required field.
    imageBase64: Optional[str] = Field(None, description="Base64 encoded image")
    prompt: str = Field(..., description="Analysis prompt")
    modelName: str = Field("poweron-vision-general", description="Model to use")
class AnalyzeResponse(BaseModel):
    """Result envelope returned by the document analysis call."""

    # Only `success` is required; the remaining fields default to None.
    success: bool = Field(..., description="Whether the analysis was successful")
    data: Optional[Dict[str, Any]] = Field(None, description="Extracted data")
    rawResponse: Optional[str] = Field(None, description="Raw model response")
    error: Optional[str] = Field(None, description="Error message if failed")
class PdfExtractRequest(BaseModel):
    """Request body for PDF page extraction."""

    pdfBase64: str = Field(..., description="Base64 encoded PDF")
    # When omitted, the caller asks for multiple pages instead of a single one.
    page: Optional[int] = Field(None, description="Specific page number (1-indexed)")
class ModelInfo(BaseModel):
    """Description of one exposed model, including its per-call price."""

    # All four fields are required.
    name: str = Field(..., description="External model name")
    internalName: str = Field(..., description="Internal Ollama model name")
    isVision: bool = Field(..., description="Whether it's a vision model")
    pricePerCall: float = Field(..., description="Price per call in CHF")
class HealthResponse(BaseModel):
    """Payload of the health check endpoint."""

    # Service state and identity
    status: str
    service: str
    # Capability / connectivity flags
    pdfSupport: bool
    ollamaConnected: bool
class OllamaStatusResponse(BaseModel):
    """Payload of the Ollama status endpoint."""

    # Only `connected` is required; everything else is optional detail.
    connected: bool
    models: Optional[List[str]] = None
    visionModels: Optional[List[str]] = None
    totalModels: Optional[int] = None
    error: Optional[str] = None
class OpenAiModelInfo(BaseModel):
    """OpenAI-compatible model object (one entry of the models list)."""

    id: str
    object: str = "model"
    created: int
    # Exposed under the snake_case alias "owned_by".
    ownedBy: str = Field("poweron", alias="owned_by")
class OpenAiModelsResponse(BaseModel):
    """OpenAI-compatible envelope for the list of models."""

    object: str = "list"
    # One entry per exposed model
    data: List[OpenAiModelInfo]
class OpenAiChatMessage(BaseModel):
    """OpenAI-compatible chat message (role plus content)."""

    role: str
    # Typed as Any: content is not restricted to a plain string.
    content: Any
class OpenAiChatCompletionRequest(BaseModel):
    """OpenAI-compatible request body for a chat completion."""

    model: str
    messages: List[OpenAiChatMessage]
    stream: Optional[bool] = False
    # Read from the snake_case alias "max_tokens".
    maxTokens: Optional[int] = Field(None, alias="max_tokens")
    temperature: Optional[float] = None
class OpenAiChatCompletionChoice(BaseModel):
    """OpenAI-compatible completion choice (one generated message)."""

    index: int
    message: OpenAiChatMessage
    # Exposed under the snake_case alias "finish_reason".
    finishReason: str = Field("stop", alias="finish_reason")
class OpenAiChatCompletionUsage(BaseModel):
    """OpenAI-compatible token usage counters."""

    # Every counter defaults to 0 and carries a snake_case alias.
    # NOTE(review): no populate-by-name config is declared here, so these
    # fields are presumably only settable via their aliases - confirm
    # against the Pydantic version in use.
    promptTokens: int = Field(0, alias="prompt_tokens")
    completionTokens: int = Field(0, alias="completion_tokens")
    totalTokens: int = Field(0, alias="total_tokens")
class OpenAiChatCompletionResponse(BaseModel):
    """OpenAI-compatible response body for a chat completion."""

    # Identification
    id: str
    object: str = "chat.completion"
    created: int
    model: str
    # Generated content and token accounting
    choices: List[OpenAiChatCompletionChoice]
    usage: OpenAiChatCompletionUsage
# ============================================================================
# PDF Helper Functions
# ============================================================================
def _extractImagesFromPdf(pdfBytes: bytes, maxPages: int = 5) -> List[Dict[str, Any]]:
    """Render the leading pages of a PDF as base64-encoded PNG images.

    Args:
        pdfBytes: Raw bytes of the PDF document.
        maxPages: Upper bound on the number of pages rendered.

    Returns:
        One dict per rendered page with the keys ``page`` (1-indexed),
        ``base64`` (PNG data), ``width`` and ``height``.

    Raises:
        Exception: If PDF support (PyMuPDF) is not available.
    """
    if not PDF_SUPPORT:
        raise Exception("PDF-Support nicht verfügbar. Bitte PyMuPDF installieren.")

    doc = fitz.open(stream=pdfBytes, filetype="pdf")
    try:
        mat = fitz.Matrix(2.0, 2.0)  # 2x zoom for better quality
        images = []
        for pageNum in range(min(len(doc), maxPages)):
            pix = doc[pageNum].get_pixmap(matrix=mat)
            images.append({
                "page": pageNum + 1,
                "base64": base64.b64encode(pix.tobytes("png")).decode("utf-8"),
                "width": pix.width,
                "height": pix.height
            })
        return images
    finally:
        # Release the document even when rendering a page fails.
        doc.close()
def _renderPdfPageAsImage(pdfBytes: bytes, pageNum: int = 0, zoom: float = 2.0) -> Dict[str, Any]:
    """Render a single PDF page as a base64-encoded PNG image.

    Args:
        pdfBytes: Raw bytes of the PDF document.
        pageNum: 0-indexed page to render. Values outside the document are
            clamped to the first or last page.
        zoom: Scale factor applied on both axes.

    Returns:
        Dict with the keys ``base64``, ``width``, ``height``, ``page``
        (1-indexed page actually rendered) and ``totalPages``.

    Raises:
        Exception: If PDF support (PyMuPDF) is not available.
    """
    if not PDF_SUPPORT:
        raise Exception("PDF-Support nicht verfügbar.")

    doc = fitz.open(stream=pdfBytes, filetype="pdf")
    try:
        totalPages = len(doc)
        # Clamp both ends so the reported page number is never below 1.
        pageNum = max(0, min(pageNum, totalPages - 1))
        pix = doc[pageNum].get_pixmap(matrix=fitz.Matrix(zoom, zoom))
        return {
            "base64": base64.b64encode(pix.tobytes("png")).decode("utf-8"),
            "width": pix.width,
            "height": pix.height,
            "page": pageNum + 1,
            "totalPages": totalPages
        }
    finally:
        # Release the document even when rendering fails.
        doc.close()
# ============================================================================
# Model Helper Functions
# ============================================================================
def _isVisionModel(modelName: str) -> bool:
    """Return True when the model name contains a known vision marker.

    The match is a case-insensitive substring test; an empty or missing
    name is never a vision model.
    """
    if not modelName:
        return False

    lowered = modelName.lower()
    for marker in ("vision", "vl", "llava", "bakllava", "granite"):
        if marker in lowered:
            return True
    return False
def _getInternalModelName(externalName: str) -> str:
    """Translate an external model name to its internal Ollama name.

    Names without a mapping entry are returned unchanged.
    """
    if externalName in MODEL_MAPPING:
        return MODEL_MAPPING[externalName]
    return externalName
def _getExternalModelName(internalName: str) -> str:
    """Translate an internal Ollama model name to its external name.

    Names without a reverse-mapping entry are returned unchanged.
    """
    if internalName in INTERNAL_TO_EXTERNAL:
        return INTERNAL_TO_EXTERNAL[internalName]
    return internalName
def _contentToText(content: Any) -> str:
    """Flatten OpenAI message content into plain text.

    ``None`` becomes an empty string and strings pass through unchanged.
    A list contributes its string items and the string ``"text"`` values of
    its dict items, joined by newlines with blank fragments dropped. A dict
    yields its string ``"text"`` value. Anything else is converted with
    ``str()``.
    """
    if content is None:
        return ""
    if isinstance(content, str):
        return content

    if isinstance(content, dict):
        text = content.get("text")
        return text if isinstance(text, str) else str(content)

    if not isinstance(content, list):
        return str(content)

    fragments = []
    for item in content:
        if isinstance(item, str):
            candidate = item
        elif isinstance(item, dict):
            candidate = item.get("text")
        else:
            candidate = None
        # Keep only real strings that are not blank.
        if isinstance(candidate, str) and candidate.strip():
            fragments.append(candidate)
    return "\n".join(fragments)
def _messagesToPrompt(messages: List[OpenAiChatMessage]) -> str:
    """Flatten OpenAI chat messages into one prompt for Ollama generate.

    Each message with non-blank text becomes a ``"<role>: <text>"`` section.
    Sections are separated by blank lines and followed by a trailing
    ``"assistant:"`` cue. Returns an empty string when no message carries
    text.
    """
    sections = []
    for message in messages:
        text = _contentToText(message.content).strip()
        if text:
            sections.append(f"{message.role}: {text}")

    if sections:
        sections.append("assistant:")
        return "\n\n".join(sections)
    return ""
# ============================================================================
# Authentication & Rate Limiting
# ============================================================================
async def _verifyApiKey(xApiKey: Optional[str] = Header(None, alias="X-API-Key")) -> str:
    """Verify the API key from the X-API-Key header and return it for rate limiting.

    Returns:
        The provided key, or the literal ``"dev-mode"`` when no API key is
        configured on the server.

    Raises:
        HTTPException: 401 when the header is missing or the key does not match.
    """
    import hmac  # function-scope import keeps this block self-contained

    expectedApiKey = CONFIG["apiKey"]
    if not expectedApiKey:
        # No API key configured, allow all requests (development mode)
        logger.warning("No API key configured - running in development mode")
        return "dev-mode"

    if not xApiKey:
        raise HTTPException(status_code=401, detail="API key required")

    # Constant-time comparison so response timing does not leak how much of
    # the key matched; bytes are compared so non-ASCII input cannot raise.
    if not hmac.compare_digest(xApiKey.encode("utf-8"), expectedApiKey.encode("utf-8")):
        raise HTTPException(status_code=401, detail="Invalid API key")

    return xApiKey
async def _verifyCursorApiKey(authorization: Optional[str] = Header(None)) -> str:
    """Verify the Bearer token for the Cursor OpenAI-compatible endpoints.

    Returns:
        The token taken from the ``Authorization`` header.

    Raises:
        HTTPException: 503 when no Cursor API key is configured on the
            server; 401 when the header is missing, is not a Bearer token,
            or the token does not match.
    """
    import hmac  # function-scope import keeps this block self-contained

    expectedApiKey = CONFIG.get("cursorApiKey")
    if not expectedApiKey:
        raise HTTPException(
            status_code=503,
            detail="Cursor API key not configured on server"
        )

    if not authorization:
        raise HTTPException(status_code=401, detail="Authorization header required")

    if not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Bearer token required")

    providedApiKey = authorization[len("Bearer "):].strip()
    # Constant-time comparison so response timing does not leak how much of
    # the key matched; bytes are compared so non-ASCII input cannot raise.
    if not hmac.compare_digest(providedApiKey.encode("utf-8"), expectedApiKey.encode("utf-8")):
        raise HTTPException(status_code=401, detail="Invalid API key")

    return providedApiKey
async def _checkRateLimit(apiKey: str = Depends(_verifyApiKey)) -> str:
    """Enforce the rate limit for the authenticated API key.

    Returns:
        The API key, when the request is within the limit.

    Raises:
        HTTPException: 429 with retry details in the body and in the
            ``Retry-After`` / ``X-RateLimit-*`` headers when the limit is
            exceeded.
    """
    allowed, info = rateLimiter.isAllowed(apiKey)
    if allowed:
        return apiKey

    retryAfter = info["retryAfter"]
    errorDetail = {
        "error": "Rate limit exceeded",
        "message": f"Too many requests. Please retry after {retryAfter} seconds.",
        "retryAfter": retryAfter,
        "limit": info["limit"],
        "remaining": info["remaining"]
    }
    rateHeaders = {
        "Retry-After": str(int(retryAfter)),
        "X-RateLimit-Limit": str(info["limit"]),
        "X-RateLimit-Remaining": str(info["remaining"]),
        "X-RateLimit-Reset": str(info["resetSeconds"])
    }
    raise HTTPException(status_code=429, detail=errorDetail, headers=rateHeaders)
# ============================================================================
# Application Lifecycle
@ -496,10 +37,12 @@ async def lifespan(app: FastAPI):
logger.info("Private-LLM Service starting up...")
logger.info(f"Ollama URL: {CONFIG['ollamaUrl']}")
logger.info(f"API Key configured: {'Yes' if CONFIG['apiKey'] else 'No (development mode)'}")
from config import PDF_SUPPORT
logger.info(f"PDF Support: {'Enabled' if PDF_SUPPORT else 'Disabled'}")
yield
logger.info("Private-LLM Service shutting down...")
# ============================================================================
# FastAPI Application
# ============================================================================
@ -511,7 +54,7 @@ app = FastAPI(
lifespan=lifespan,
)
# CORS Configuration - Allow gateway instances
# CORS Configuration
ALLOWED_ORIGINS = [
"http://localhost:8000",
"http://localhost:8080",
@ -521,13 +64,11 @@ ALLOWED_ORIGINS = [
"http://127.0.0.1:5000",
]
# Add production origins
PRODUCTION_PATTERNS = [
"poweron.swiss",
"poweron-center.net",
]
# Build full origins list with https variants
for pattern in PRODUCTION_PATTERNS:
ALLOWED_ORIGINS.extend([
f"https://{pattern}",
@ -539,7 +80,6 @@ for pattern in PRODUCTION_PATTERNS:
f"https://playground.{pattern}",
])
# Allow all subdomains via regex in middleware
app.add_middleware(
CORSMiddleware,
allow_origins=ALLOWED_ORIGINS,
@ -551,390 +91,23 @@ app.add_middleware(
max_age=86400,
)
# Static files and templates (for web UI)
# Static files (for web UI)
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")
# ============================================================================
# API Routes
# Route Registration
# ============================================================================
@app.get("/api/health", response_model=HealthResponse, tags=["System"])
async def _healthCheck():
    """Health check endpoint.

    Probes the Ollama ``/api/tags`` endpoint with a 5 second timeout. The
    service itself always reports ``status="ok"``; the probe result is only
    reflected in ``ollamaConnected``.
    """
    ollamaConnected = False
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.get(f"{CONFIG['ollamaUrl']}/api/tags")
        ollamaConnected = response.status_code == 200
    except Exception as e:
        # Best-effort probe: a failure only means "not connected", but leave
        # a trace so that connectivity problems can be diagnosed.
        logger.debug(f"Ollama health probe failed: {e}")

    return HealthResponse(
        status="ok",
        service="private-llm",
        pdfSupport=PDF_SUPPORT,
        ollamaConnected=ollamaConnected
    )
from routeApi import router as apiRouter
app.include_router(apiRouter)
@app.get("/api/models", response_model=List[ModelInfo], tags=["Models"])
async def _listModels(authenticated: bool = Depends(_verifyApiKey)):
    """List available models with pricing.

    One entry is returned per mapping entry; vision models are priced at
    0.10 CHF per call and all other models at 0.01 CHF.
    """
    def _describe(externalName, internalName):
        # Pricing depends solely on whether the internal model is a vision model.
        vision = _isVisionModel(internalName)
        return ModelInfo(
            name=externalName,
            internalName=internalName,
            isVision=vision,
            pricePerCall=0.10 if vision else 0.01
        )

    return [
        _describe(externalName, internalName)
        for externalName, internalName in MODEL_MAPPING.items()
    ]
from routeOpenAi import router as openAiRouter
app.include_router(openAiRouter)
from routeWeb import router as webRouter
app.include_router(webRouter)
@app.get("/v1/models", response_model=OpenAiModelsResponse, tags=["OpenAI Compatible"])
async def _listOpenAiModels(cursorApiKey: str = Depends(_verifyCursorApiKey)):
    """OpenAI-compatible models endpoint for Cursor.

    Every external model name is listed once; all entries share the same
    ``created`` timestamp, taken at request time.
    """
    createdAt = int(time.time())
    return OpenAiModelsResponse(
        data=[
            OpenAiModelInfo(id=externalName, created=createdAt)
            for externalName in MODEL_MAPPING
        ]
    )
@app.post(
    "/v1/chat/completions",
    response_model=OpenAiChatCompletionResponse,
    tags=["OpenAI Compatible"]
)
async def _openAiChatCompletions(
    request: OpenAiChatCompletionRequest,
    cursorApiKey: str = Depends(_verifyCursorApiKey)
):
    """OpenAI-compatible chat completions endpoint for Cursor.

    The chat messages are flattened into one prompt and sent to the Ollama
    ``/api/generate`` endpoint (non-streaming, 8192-token context).

    Raises:
        HTTPException: 400 for streaming requests, empty prompts or vision
            models; 429 when the rate limit is exceeded; 404 when Ollama
            does not know the model; 503/504 on upstream connect/timeout
            errors; otherwise the upstream status code for non-200 replies.
    """
    if request.stream:
        raise HTTPException(
            status_code=400,
            detail="Streaming is not supported by this endpoint"
        )

    # Cursor keys get their own bucket namespace in the shared limiter.
    allowed, info = rateLimiter.isAllowed(f"cursor:{cursorApiKey}")
    if not allowed:
        raise HTTPException(
            status_code=429,
            detail={
                "error": "Rate limit exceeded",
                "message": f"Too many requests. Please retry after {info['retryAfter']} seconds.",
                "retryAfter": info["retryAfter"],
                "limit": info["limit"],
                "remaining": info["remaining"]
            },
            headers={
                "Retry-After": str(int(info["retryAfter"])),
                "X-RateLimit-Limit": str(info["limit"]),
                "X-RateLimit-Remaining": str(info["remaining"]),
                "X-RateLimit-Reset": str(info["resetSeconds"])
            }
        )

    promptText = _messagesToPrompt(request.messages).strip()
    if not promptText:
        raise HTTPException(status_code=400, detail="messages must contain text content")

    internalModelName = _getInternalModelName(request.model)
    if _isVisionModel(internalModelName):
        raise HTTPException(
            status_code=400,
            detail="Vision models are not supported on /v1/chat/completions"
        )

    requestOptions = {
        "num_ctx": 8192
    }
    if request.temperature is not None:
        requestOptions["temperature"] = request.temperature
    if request.maxTokens is not None:
        requestOptions["num_predict"] = request.maxTokens

    requestBody = {
        "model": internalModelName,
        "prompt": promptText,
        "stream": False,
        "options": requestOptions
    }

    try:
        async with httpx.AsyncClient(timeout=3600.0) as client:
            response = await client.post(
                f"{CONFIG['ollamaUrl']}/api/generate",
                json=requestBody
            )

        if response.status_code == 404:
            raise HTTPException(
                status_code=404,
                detail=f'Model "{request.model}" not found'
            )

        if response.status_code != 200:
            raise HTTPException(
                status_code=response.status_code,
                detail=f"Ollama API error: {response.status_code} - {response.text[:200]}"
            )

        responseData = response.json()
        responseText = responseData.get("response", "").strip()
        promptEvalCount = int(responseData.get("prompt_eval_count", 0))
        evalCount = int(responseData.get("eval_count", 0))

        return OpenAiChatCompletionResponse(
            id=f"chatcmpl-{uuid.uuid4().hex}",
            created=int(time.time()),
            model=request.model,
            choices=[
                OpenAiChatCompletionChoice(
                    index=0,
                    message=OpenAiChatMessage(role="assistant", content=responseText)
                )
            ],
            # The usage fields declare aliases, so they are populated through
            # the alias names; field-name keywords are not accepted unless the
            # model enables population by name, which would leave every
            # counter at its default of 0.
            usage=OpenAiChatCompletionUsage(
                prompt_tokens=promptEvalCount,
                completion_tokens=evalCount,
                total_tokens=promptEvalCount + evalCount
            )
        )
    except httpx.TimeoutException:
        raise HTTPException(status_code=504, detail="Upstream timeout (Ollama)")
    except httpx.ConnectError:
        raise HTTPException(status_code=503, detail="Cannot connect to Ollama upstream")
@app.get("/api/ollama/status", response_model=OllamaStatusResponse, tags=["System"])
async def _ollamaStatus():
    """Report whether Ollama is reachable and which models it offers.

    Failures are never raised; they are reported as ``connected=False``
    together with an error message.
    """
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.get(f"{CONFIG['ollamaUrl']}/api/tags")

        if response.status_code == 200:
            payload = response.json()
            modelNames = [entry.get("name", "") for entry in payload.get("models", [])]
            return OllamaStatusResponse(
                connected=True,
                models=modelNames,
                visionModels=[name for name in modelNames if _isVisionModel(name)],
                totalModels=len(modelNames)
            )

        return OllamaStatusResponse(
            connected=False,
            error=f"Ollama responded with status {response.status_code}"
        )
    except httpx.ConnectError:
        return OllamaStatusResponse(
            connected=False,
            error="Keine Verbindung zu Ollama. Ist Ollama gestartet?"
        )
    except Exception as exc:
        return OllamaStatusResponse(
            connected=False,
            error=str(exc)
        )
@app.post("/api/analyze", response_model=AnalyzeResponse, tags=["AI"])
async def _analyzeDocument(
    request: AnalyzeRequest,
    xApiKey: Optional[str] = Header(None, alias="X-API-Key")
):
    """
    Analyze a document with AI Vision API.

    Supports both vision models (with images) and text models (without images).

    Authentication:
    - Gateway calls: Must include X-API-Key header
    - Test UI calls: No auth required (same-origin)

    Rate limiting is applied when API key is provided.

    Returns:
        AnalyzeResponse. Upstream timeouts, connection failures and
        unexpected errors are reported as ``success=False`` with an error
        message rather than raised.

    Raises:
        HTTPException: 401 for a wrong API key, 429 when rate limited, 400
            for a missing image (vision models) or prompt, 404 when Ollama
            does not know the model, otherwise the upstream status code for
            non-200 replies.
    """
    # Apply rate limiting only for authenticated requests (Gateway)
    if xApiKey:
        # The key is only validated when a server-side key is configured.
        if CONFIG["apiKey"] and xApiKey != CONFIG["apiKey"]:
            raise HTTPException(status_code=401, detail="Invalid API key")

        # Check rate limit for authenticated requests
        allowed, info = rateLimiter.isAllowed(xApiKey)
        if not allowed:
            raise HTTPException(
                status_code=429,
                detail=f"Rate limit exceeded. Retry after {info['retryAfter']} seconds.",
                headers={"Retry-After": str(int(info["retryAfter"]) + 1)},
            )

    try:
        # Get internal model name
        internalModelName = _getInternalModelName(request.modelName)
        isVision = _isVisionModel(internalModelName)

        # Validate request
        if isVision and not request.imageBase64:
            raise HTTPException(
                status_code=400,
                detail="Kein Bild übermittelt (erforderlich für Vision-Modelle)"
            )

        if not request.prompt:
            raise HTTPException(status_code=400, detail="Kein Prompt übermittelt")

        # Model-specific context lengths (reduced for RAM constraints)
        # Server has 31GB RAM + 22GB GPU - vision models need smaller context
        modelContextLengths = {
            "qwen2.5:7b": 8192,  # Text model - 8K context
            "qwen2.5vl:7b": 4096,  # Vision model - 4K context (images use lots of RAM)
            "granite3.2-vision": 4096,  # Vision model - 4K context
            "granite3.2-vision:latest": 4096,
            "deepseek-ocr": 4096,  # OCR model - 4K context
            "deepseek-ocr:latest": 4096,
        }
        # Models missing from the table fall back to a 4K context.
        numCtx = modelContextLengths.get(internalModelName, 4096)

        # Build request body with model-specific context window
        requestBody = {
            "model": internalModelName,
            "prompt": request.prompt,
            "stream": False,
            "options": {
                "num_ctx": numCtx
            }
        }

        # The image is attached whenever one was sent, even for text models.
        if request.imageBase64:
            requestBody["images"] = [request.imageBase64]

        # Call Ollama API
        async with httpx.AsyncClient(timeout=3600.0) as client:  # 60 min timeout
            response = await client.post(
                f"{CONFIG['ollamaUrl']}/api/generate",
                json=requestBody
            )

        if response.status_code == 404:
            raise HTTPException(
                status_code=404,
                detail=f'Modell "{internalModelName}" nicht gefunden. Bitte installieren mit: ollama pull {internalModelName}'
            )

        if response.status_code != 200:
            raise HTTPException(
                status_code=response.status_code,
                detail=f"Ollama API Fehler: {response.status_code} - {response.text[:200]}"
            )

        responseData = response.json()
        responseText = responseData.get("response", "")

        # Try to extract JSON from response
        # (greedy match: from the first "{" to the last "}" in the text)
        extractedData = None
        jsonMatch = re.search(r"\{[\s\S]*\}", responseText)
        if jsonMatch:
            try:
                extractedData = json.loads(jsonMatch.group())
            except json.JSONDecodeError:
                extractedData = None

        # Wrap plain text response in JSON object
        if extractedData is None:
            extractedData = {"response": responseText.strip()}

        return AnalyzeResponse(
            success=True,
            data=extractedData,
            rawResponse=responseText
        )
    except httpx.TimeoutException:
        return AnalyzeResponse(
            success=False,
            error="Zeitüberschreitung bei der Ollama API"
        )
    except httpx.ConnectError:
        return AnalyzeResponse(
            success=False,
            error="Verbindung zu Ollama fehlgeschlagen. Ist Ollama gestartet?"
        )
    except HTTPException:
        # Let deliberate HTTP errors raised above propagate unchanged.
        raise
    except Exception as e:
        logger.error(f"Error analyzing document: {e}")
        return AnalyzeResponse(
            success=False,
            error=f"Unerwarteter Fehler: {str(e)}"
        )
@app.post("/api/pdf/extract", tags=["PDF"])
async def _extractPdfImages(request: PdfExtractRequest):
    """
    Render PDF pages as images.

    No API key required - this endpoint is for local test UI only,
    not used by gateway (gateway sends images directly).

    With ``page`` set, only that (1-indexed) page is rendered; otherwise up
    to the first five pages are returned.

    Raises:
        HTTPException: 501 when PDF support is unavailable, 500 when the
            PDF cannot be processed.
    """
    if not PDF_SUPPORT:
        raise HTTPException(
            status_code=501,
            detail="PDF-Support nicht verfügbar. Bitte PyMuPDF installieren: pip install pymupdf"
        )

    try:
        pdfBytes = base64.b64decode(request.pdfBase64)

        if request.page is None:
            # No page requested: render the leading pages (max 5)
            pageImages = _extractImagesFromPdf(pdfBytes, maxPages=5)
            return {
                "success": True,
                "images": pageImages,
                "totalExtracted": len(pageImages)
            }

        # Single page requested: convert from 1-indexed to 0-indexed
        singleImage = _renderPdfPageAsImage(pdfBytes, request.page - 1)
        return {"success": True, "image": singleImage}
    except Exception as exc:
        raise HTTPException(
            status_code=500,
            detail=f"PDF-Verarbeitungsfehler: {str(exc)}"
        )
# ============================================================================
# Web UI Routes (Optional - for direct browser access)
# ============================================================================
@app.get("/", response_class=HTMLResponse, tags=["Web UI"])
async def _index(request: Request):
    """Serve the main page (document scanner UI) from ``index.html``."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
@app.get("/login", response_class=HTMLResponse, tags=["Web UI"])
async def _loginPage(request: Request):
    """Serve the login page from ``login.html``."""
    context = {"request": request}
    return templates.TemplateResponse("login.html", context)
@app.get("/logout", response_class=HTMLResponse, tags=["Web UI"])
async def _logout(request: Request):
    """Log out by redirecting (HTTP 302) to the login page."""
    from starlette.responses import RedirectResponse

    redirect = RedirectResponse(url="/login", status_code=302)
    return redirect
# ============================================================================
# Main
@ -942,7 +115,7 @@ async def _logout(request: Request):
if __name__ == "__main__":
import uvicorn
print("\n" + "=" * 60)
print(" Private-LLM Service - KI-Dokumentenanalyse")
print(" Powered by PowerOn")
@ -952,5 +125,5 @@ if __name__ == "__main__":
print(f" Ollama URL: {CONFIG['ollamaUrl']}")
print("\n Drücke Ctrl+C zum Beenden")
print("=" * 60 + "\n")
uvicorn.run(app, host="0.0.0.0", port=5000)

435
config.py Normal file
View file

@ -0,0 +1,435 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Shared configuration, models, helpers, and auth for the Private-LLM service."""
import os
import base64
import json
import re
import logging
import time
import uuid
from collections import defaultdict
from typing import Optional, List, Dict, Any
from fastapi import HTTPException, Header, Depends
from pydantic import BaseModel, Field
# PDF Support
try:
import fitz # PyMuPDF
PDF_SUPPORT = True
except ImportError:
PDF_SUPPORT = False
logger = logging.getLogger(__name__)
# ============================================================================
# Configuration
# ============================================================================
def _loadConfig() -> Dict[str, Any]:
    """Load the service configuration from ``config.ini`` and the environment.

    Built-in defaults are overlaid first by ``KEY=VALUE`` lines read from a
    ``config.ini`` file located next to this module (blank lines and lines
    starting with ``#`` are skipped, unknown keys are ignored), and then by
    environment variables of the same names, which take precedence.

    A malformed integer in ``config.ini`` is logged and skipped so that the
    remaining lines of the file are still applied.

    Returns:
        Dict with the keys ``apiKey``, ``cursorApiKey``, ``ollamaUrl``,
        ``authUsername``, ``authPassword``, ``secretKey``,
        ``rateLimitRequestsPerMinute`` and ``rateLimitBurstSize``.

    Raises:
        ValueError: If a rate-limit environment variable is not an integer.
    """
    configPath = os.path.join(os.path.dirname(__file__), "config.ini")
    config = {
        "apiKey": None,
        "cursorApiKey": None,
        "ollamaUrl": "http://localhost:11434",
        "authUsername": "poweron",
        "authPassword": "poweron",
        "secretKey": "poweron-secret-key-change-in-production",
        "rateLimitRequestsPerMinute": 60,
        "rateLimitBurstSize": 10,
    }

    # External setting name -> (key in the returned dict, value converter)
    settings = {
        "PRIVATE_LLM_API_KEY": ("apiKey", str),
        "CURSOR_API_KEY": ("cursorApiKey", str),
        "OLLAMA_URL": ("ollamaUrl", str),
        "AUTH_USERNAME": ("authUsername", str),
        "AUTH_PASSWORD": ("authPassword", str),
        "SECRET_KEY": ("secretKey", str),
        "RATE_LIMIT_REQUESTS_PER_MINUTE": ("rateLimitRequestsPerMinute", int),
        "RATE_LIMIT_BURST_SIZE": ("rateLimitBurstSize", int),
    }

    if os.path.exists(configPath):
        try:
            with open(configPath, "r", encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if not line or line.startswith("#") or "=" not in line:
                        continue
                    # Split on the first "=" only so values may contain "=".
                    key, value = (part.strip() for part in line.split("=", 1))
                    if key not in settings:
                        continue
                    configKey, convert = settings[key]
                    try:
                        config[configKey] = convert(value)
                    except ValueError:
                        # Keep the previous value and continue with the next
                        # line instead of abandoning the rest of the file.
                        logger.warning(f"Ignoring invalid value for {key} in config.ini")
        except Exception as e:
            logger.warning(f"Error loading config.ini: {e}")

    # Environment variables take precedence over config.ini
    for envName, (configKey, convert) in settings.items():
        envValue = os.environ.get(envName)
        if envValue is not None:
            config[configKey] = convert(envValue)

    return config


# Loaded once at import time.
CONFIG = _loadConfig()
# ============================================================================
# Rate Limiting (Token Bucket Algorithm)
# ============================================================================
class RateLimiter:
    """Token bucket rate limiter with per-API-key tracking.

    Each API key gets its own bucket, refilled at ``requestsPerMinute / 60``
    tokens per second up to ``burstSize``. Timestamps come from the
    monotonic clock so that wall-clock adjustments cannot drain or refill a
    bucket.
    """

    def __init__(self, requestsPerMinute: int = 60, burstSize: int = 10):
        """Create a limiter refilling ``requestsPerMinute`` tokens per minute, capped at ``burstSize``."""
        self.requestsPerMinute = requestsPerMinute
        self.burstSize = burstSize
        self.tokensPerSecond = requestsPerMinute / 60.0
        # {apiKey: {"tokens": float, "lastUpdate": float}}; new keys start full.
        self._buckets: Dict[str, Dict[str, float]] = defaultdict(
            lambda: {"tokens": burstSize, "lastUpdate": time.monotonic()}
        )

    def _refillTokens(self, bucket: Dict[str, float]) -> None:
        """Add the tokens earned since the bucket's last update."""
        now = time.monotonic()
        # Never let a timestamp lying in the future subtract tokens.
        elapsed = max(0.0, now - bucket["lastUpdate"])
        bucket["tokens"] = min(
            self.burstSize,
            bucket["tokens"] + elapsed * self.tokensPerSecond
        )
        bucket["lastUpdate"] = now

    def isAllowed(self, apiKey: str) -> tuple[bool, Dict[str, Any]]:
        """Check whether a request is allowed, consuming one token if so.

        Returns:
            Tuple of (allowed, info). ``info`` always carries ``remaining``,
            ``limit`` and ``resetSeconds``; when the request is denied it
            additionally carries ``retryAfter`` (seconds, one decimal).
        """
        bucket = self._buckets[apiKey]
        self._refillTokens(bucket)

        if bucket["tokens"] >= 1.0:
            bucket["tokens"] -= 1.0
            return True, {
                "remaining": int(bucket["tokens"]),
                "limit": self.requestsPerMinute,
                "resetSeconds": 60
            }

        if self.tokensPerSecond > 0:
            # Time until the bucket holds one full token again.
            retryAfter = (1.0 - bucket["tokens"]) / self.tokensPerSecond
        else:
            # No refill configured: avoid dividing by zero and report the
            # advertised reset window instead.
            retryAfter = 60.0
        return False, {
            "remaining": 0,
            "limit": self.requestsPerMinute,
            "retryAfter": round(retryAfter, 1),
            "resetSeconds": 60
        }

    def cleanup(self, maxAgeSeconds: int = 3600) -> int:
        """Remove buckets idle for more than ``maxAgeSeconds``; return how many were removed."""
        now = time.monotonic()
        staleKeys = [
            key for key, bucket in self._buckets.items()
            if now - bucket["lastUpdate"] > maxAgeSeconds
        ]
        for key in staleKeys:
            del self._buckets[key]
        return len(staleKeys)
# Shared limiter instance for the whole process, sized from the loaded
# configuration (sustained rate and burst capacity).
rateLimiter = RateLimiter(
requestsPerMinute=CONFIG["rateLimitRequestsPerMinute"],
burstSize=CONFIG["rateLimitBurstSize"]
)
# ============================================================================
# Model Mapping
# ============================================================================
# External (public) model names mapped to the model names used internally.
MODEL_MAPPING = {
    "poweron-text-general": "qwen2.5:7b",
    "poweron-vision-general": "qwen2.5vl:7b",
    "poweron-vision-deep": "granite3.2-vision",
}
# Reverse lookup: internal model name -> external model name.
INTERNAL_TO_EXTERNAL = {
    internalName: externalName
    for externalName, internalName in MODEL_MAPPING.items()
}
# ============================================================================
# Request/Response Models
# ============================================================================
class AnalyzeRequest(BaseModel):
    # Request body for an analysis call: the prompt, the model to run it on
    # and an optional Base64 image (None when omitted).
    imageBase64: Optional[str] = Field(default=None, description="Base64 encoded image")
    prompt: str = Field(description="Analysis prompt")
    # Defaults to the external name "poweron-vision-general".
    modelName: str = Field(default="poweron-vision-general", description="Model to use")
class AnalyzeResponse(BaseModel):
    # Result envelope of an analysis call. Only `success` is required; the
    # other fields default to None.
    success: bool = Field(description="Whether the analysis was successful")
    data: Optional[Dict[str, Any]] = Field(default=None, description="Extracted data")
    rawResponse: Optional[str] = Field(default=None, description="Raw model response")
    error: Optional[str] = Field(default=None, description="Error message if failed")
class PdfExtractRequest(BaseModel):
    # Request body for PDF extraction: the Base64 PDF plus an optional
    # 1-indexed page number (None when no specific page is requested).
    pdfBase64: str = Field(description="Base64 encoded PDF")
    page: Optional[int] = Field(default=None, description="Specific page number (1-indexed)")
class ModelInfo(BaseModel):
    # Description of one exposed model; all four fields are required.
    name: str = Field(description="External model name")
    internalName: str = Field(description="Internal Ollama model name")
    isVision: bool = Field(description="Whether it's a vision model")
    pricePerCall: float = Field(description="Price per call in CHF")
class HealthResponse(BaseModel):
    # Health report: service status/name plus two capability flags
    # (PDF support available, Ollama reachable). All fields are required.
    status: str
    service: str
    pdfSupport: bool
    ollamaConnected: bool
class OllamaStatusResponse(BaseModel):
    # Ollama connection report. Only `connected` is required; the model
    # lists, the count and the error message default to None.
    connected: bool
    models: Optional[List[str]] = None
    visionModels: Optional[List[str]] = None
    totalModels: Optional[int] = None
    error: Optional[str] = None
class OpenAiModelInfo(BaseModel):
    # One entry of an OpenAI-style model list.
    id: str
    object: str = "model"
    created: int
    # Exposed under the alias "owned_by"; defaults to "poweron".
    ownedBy: str = Field(default="poweron", alias="owned_by")
class OpenAiModelsResponse(BaseModel):
    # OpenAI-style list envelope: a fixed "list" marker plus the model entries.
    object: str = "list"
    data: List[OpenAiModelInfo]
class OpenAiChatMessage(BaseModel):
    # A single chat message. `content` is typed Any, so no particular shape
    # is enforced by this model.
    role: str
    content: Any
class OpenAiChatCompletionRequest(BaseModel):
    # Request body of an OpenAI-style chat completion.
    model: str
    messages: List[OpenAiChatMessage]
    stream: Optional[bool] = False
    # Populated from the "max_tokens" key via the field alias.
    maxTokens: Optional[int] = Field(default=None, alias="max_tokens")
    temperature: Optional[float] = None
class OpenAiChatCompletionChoice(BaseModel):
    # One completion choice: its index, the message and a finish reason.
    index: int
    message: OpenAiChatMessage
    # Exposed under the alias "finish_reason"; defaults to "stop".
    finishReason: str = Field(default="stop", alias="finish_reason")
class OpenAiChatCompletionUsage(BaseModel):
    # Token usage counters, each defaulting to 0 and exposed under a
    # snake_case alias.
    # NOTE(review): no populate-by-field-name config is declared here, so
    # constructors presumably have to pass the alias names (prompt_tokens,
    # completion_tokens, total_tokens) for the values to be picked up - confirm.
    promptTokens: int = Field(default=0, alias="prompt_tokens")
    completionTokens: int = Field(default=0, alias="completion_tokens")
    totalTokens: int = Field(default=0, alias="total_tokens")
class OpenAiChatCompletionResponse(BaseModel):
    # OpenAI-style chat completion envelope: identifier, fixed object marker,
    # creation timestamp, model name, the choices and the usage counters.
    id: str
    object: str = "chat.completion"
    created: int
    model: str
    choices: List[OpenAiChatCompletionChoice]
    usage: OpenAiChatCompletionUsage
# ============================================================================
# Helper Functions
# ============================================================================
def _isVisionModel(modelName: str) -> bool:
if not modelName:
return False
modelLower = modelName.lower()
visionIndicators = ["vision", "vl", "llava", "bakllava", "granite"]
return any(indicator in modelLower for indicator in visionIndicators)
def _getInternalModelName(externalName: str) -> str:
    """Translate an external model name to its internal name.

    Names without an entry in MODEL_MAPPING are returned unchanged.
    """
    if externalName in MODEL_MAPPING:
        return MODEL_MAPPING[externalName]
    return externalName
def _getExternalModelName(internalName: str) -> str:
    """Translate an internal model name back to its external name.

    Names without an entry in INTERNAL_TO_EXTERNAL are returned unchanged.
    """
    if internalName in INTERNAL_TO_EXTERNAL:
        return INTERNAL_TO_EXTERNAL[internalName]
    return internalName
def _contentToText(content: Any) -> str:
"""Normalize OpenAI message content into plain text."""
if content is None:
return ""
if isinstance(content, str):
return content
if isinstance(content, list):
textParts = []
for part in content:
if isinstance(part, str):
textParts.append(part)
continue
if isinstance(part, dict):
partText = part.get("text")
if isinstance(partText, str):
textParts.append(partText)
return "\n".join([part for part in textParts if part.strip()])
if isinstance(content, dict):
contentText = content.get("text")
if isinstance(contentText, str):
return contentText
return str(content)
def _messagesToPrompt(messages: List[OpenAiChatMessage]) -> str:
    """Render chat messages as one "role: text" prompt string.

    Messages whose content is empty after normalization are skipped. When
    at least one message remains, a trailing "assistant:" cue is appended;
    the parts are separated by blank lines. Returns "" if nothing remains.
    """
    renderedTurns = []
    for entry in messages:
        body = _contentToText(entry.content).strip()
        if body:
            renderedTurns.append(f"{entry.role}: {body}")
    if not renderedTurns:
        return ""
    return "\n\n".join(renderedTurns + ["assistant:"])
# ============================================================================
# PDF Helper Functions
# ============================================================================
def _extractImagesFromPdf(pdfBytes: bytes, maxPages: int = 5) -> List[Dict[str, Any]]:
    """Render the first pages of a PDF to Base64-encoded PNG images.

    Args:
        pdfBytes: Raw PDF file content.
        maxPages: Upper bound on the number of pages rendered.

    Returns:
        One dict per rendered page with the keys "page" (1-indexed),
        "base64", "width" and "height".

    Raises:
        RuntimeError: If PDF support is not available.
    """
    if not PDF_SUPPORT:
        raise RuntimeError("PDF-Support nicht verfügbar. Bitte PyMuPDF installieren.")
    images = []
    doc = fitz.open(stream=pdfBytes, filetype="pdf")
    try:
        numPages = min(len(doc), maxPages)
        # 2x zoom in both directions; the matrix is identical for every page.
        mat = fitz.Matrix(2.0, 2.0)
        for pageNum in range(numPages):
            pix = doc[pageNum].get_pixmap(matrix=mat)
            imgBase64 = base64.b64encode(pix.tobytes("png")).decode("utf-8")
            images.append({
                "page": pageNum + 1,
                "base64": imgBase64,
                "width": pix.width,
                "height": pix.height
            })
    finally:
        # Release the document even when rendering a page fails.
        doc.close()
    return images
def _renderPdfPageAsImage(pdfBytes: bytes, pageNum: int = 0, zoom: float = 2.0) -> Dict[str, Any]:
    """Render a single PDF page to a Base64-encoded PNG image.

    Args:
        pdfBytes: Raw PDF file content.
        pageNum: 0-indexed page to render. Values outside the document are
            clamped to the first/last page.
        zoom: Scale factor applied in both directions.

    Returns:
        A dict with "base64", "width", "height", "page" (1-indexed page that
        was actually rendered) and "totalPages".

    Raises:
        RuntimeError: If PDF support is not available.
        ValueError: If the document contains no pages.
    """
    if not PDF_SUPPORT:
        raise RuntimeError("PDF-Support nicht verfügbar.")
    doc = fitz.open(stream=pdfBytes, filetype="pdf")
    try:
        totalPages = len(doc)
        if totalPages == 0:
            raise ValueError("PDF enthält keine Seiten.")
        # Clamp on both sides: a negative index would otherwise select a page
        # from the end and report a page number <= 0.
        pageNum = max(0, min(pageNum, totalPages - 1))
        pix = doc[pageNum].get_pixmap(matrix=fitz.Matrix(zoom, zoom))
        imgBase64 = base64.b64encode(pix.tobytes("png")).decode("utf-8")
        return {
            "base64": imgBase64,
            "width": pix.width,
            "height": pix.height,
            "page": pageNum + 1,
            "totalPages": totalPages
        }
    finally:
        # Release the document even when rendering fails.
        doc.close()
# ============================================================================
# Authentication Dependencies
# ============================================================================
async def _verifyApiKey(xApiKey: Optional[str] = Header(None, alias="X-API-Key")) -> str:
    """Verify the API key from the X-API-Key header.

    Returns:
        The validated key (used as the rate-limiting identity), or the
        literal "dev-mode" when no API key is configured.

    Raises:
        HTTPException: 401 when the header is missing or does not match.
    """
    import hmac  # function-scope import keeps this block self-contained

    expectedApiKey = CONFIG["apiKey"]
    if not expectedApiKey:
        logger.warning("No API key configured - running in development mode")
        return "dev-mode"
    if not xApiKey:
        raise HTTPException(status_code=401, detail="API key required")
    # Constant-time comparison so response timing does not reveal how many
    # leading characters of a guessed key were correct. Bytes are compared
    # because compare_digest rejects non-ASCII str values.
    if not hmac.compare_digest(
        xApiKey.encode("utf-8"), str(expectedApiKey).encode("utf-8")
    ):
        raise HTTPException(status_code=401, detail="Invalid API key")
    return xApiKey
async def _verifyCursorApiKey(authorization: Optional[str] = Header(None)) -> str:
    """Verify the Bearer token for the Cursor OpenAI-compatible endpoints.

    Returns:
        The validated token.

    Raises:
        HTTPException: 503 when no Cursor API key is configured; 401 when the
            Authorization header is missing, is not a Bearer credential or
            carries the wrong token.
    """
    import hmac  # function-scope import keeps this block self-contained

    expectedApiKey = CONFIG.get("cursorApiKey")
    if not expectedApiKey:
        raise HTTPException(
            status_code=503,
            detail="Cursor API key not configured on server"
        )
    if not authorization:
        raise HTTPException(status_code=401, detail="Authorization header required")
    # Authentication scheme names are case-insensitive, so "bearer" and
    # "BEARER" are accepted as well as "Bearer".
    scheme, separator, token = authorization.partition(" ")
    if not separator or scheme.lower() != "bearer":
        raise HTTPException(status_code=401, detail="Bearer token required")
    providedApiKey = token.strip()
    # Constant-time comparison to avoid leaking key prefixes through timing.
    if not hmac.compare_digest(
        providedApiKey.encode("utf-8"), str(expectedApiKey).encode("utf-8")
    ):
        raise HTTPException(status_code=401, detail="Invalid API key")
    return providedApiKey
async def _checkRateLimit(apiKey: str = Depends(_verifyApiKey)) -> str:
    """Check the rate limit for the authenticated API key.

    Returns:
        The API key, unchanged, when the request is within the limit.

    Raises:
        HTTPException: 429 with rate-limit details and headers when the
            key has no tokens left.
    """
    import math  # function-scope import keeps this block self-contained

    allowed, info = rateLimiter.isAllowed(apiKey)
    if allowed:
        return apiKey
    # Retry-After takes whole seconds. Round up (and never send 0), otherwise
    # a wait of e.g. 0.4 s is announced as "retry immediately".
    retryAfterSeconds = max(1, math.ceil(info["retryAfter"]))
    raise HTTPException(
        status_code=429,
        detail={
            "error": "Rate limit exceeded",
            "message": f"Too many requests. Please retry after {info['retryAfter']} seconds.",
            "retryAfter": info["retryAfter"],
            "limit": info["limit"],
            "remaining": info["remaining"]
        },
        headers={
            "Retry-After": str(retryAfterSeconds),
            "X-RateLimit-Limit": str(info["limit"]),
            "X-RateLimit-Remaining": str(info["remaining"]),
            "X-RateLimit-Reset": str(info["resetSeconds"])
        }
    )

256
routeApi.py Normal file
View file

@ -0,0 +1,256 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""API routes for Private-LLM: health, models, analyze, PDF extract, Ollama status."""
import base64
import json
import re
import logging
from typing import Optional, List
import httpx
from fastapi import APIRouter, HTTPException, Depends, Header
from config import (
CONFIG, MODEL_MAPPING, PDF_SUPPORT,
rateLimiter,
_isVisionModel, _getInternalModelName,
_extractImagesFromPdf, _renderPdfPageAsImage,
_verifyApiKey,
AnalyzeRequest, AnalyzeResponse,
PdfExtractRequest, ModelInfo,
HealthResponse, OllamaStatusResponse,
)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Router that collects the endpoints declared below; "API" is its default tag.
router = APIRouter(tags=["API"])
@router.get("/api/health", response_model=HealthResponse, tags=["System"])
async def _healthCheck():
    """Health check endpoint.

    Always reports status "ok" for this service and additionally probes
    Ollama's /api/tags endpoint (5 s timeout) to fill `ollamaConnected`.
    """
    ollamaConnected = False
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            response = await client.get(f"{CONFIG['ollamaUrl']}/api/tags")
            ollamaConnected = response.status_code == 200
    except Exception as e:
        # Best-effort probe: any failure simply means "not connected", but
        # leave a trace so connectivity problems can be diagnosed.
        logger.debug("Ollama health probe failed: %s", e)
    return HealthResponse(
        status="ok",
        service="private-llm",
        pdfSupport=PDF_SUPPORT,
        ollamaConnected=ollamaConnected
    )
@router.get("/api/models", response_model=List[ModelInfo], tags=["Models"])
async def _listModels(authenticated: bool = Depends(_verifyApiKey)):
    """List available models with pricing.

    Every entry of MODEL_MAPPING is reported; vision models are priced at
    0.10 per call, all others at 0.01.
    """
    def _describe(publicName: str, ollamaName: str) -> ModelInfo:
        # Pricing depends only on whether the internal model is a vision model.
        visionFlag = _isVisionModel(ollamaName)
        return ModelInfo(
            name=publicName,
            internalName=ollamaName,
            isVision=visionFlag,
            pricePerCall=0.10 if visionFlag else 0.01
        )

    return [
        _describe(publicName, ollamaName)
        for publicName, ollamaName in MODEL_MAPPING.items()
    ]
@router.get("/api/ollama/status", response_model=OllamaStatusResponse, tags=["System"])
async def _ollamaStatus():
    """Check Ollama connection status and list available models.

    Queries Ollama's /api/tags endpoint (10 s timeout). Failures are not
    raised; they are reported as `connected=False` with an error message.
    """
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            tagsResponse = await client.get(f"{CONFIG['ollamaUrl']}/api/tags")
            if tagsResponse.status_code == 200:
                payload = tagsResponse.json()
                installedNames = []
                for entry in payload.get("models", []):
                    installedNames.append(entry.get("name", ""))
                visionNames = list(filter(_isVisionModel, installedNames))
                return OllamaStatusResponse(
                    connected=True,
                    models=installedNames,
                    visionModels=visionNames,
                    totalModels=len(installedNames)
                )
            # Ollama answered, but not with a success status.
            return OllamaStatusResponse(
                connected=False,
                error=f"Ollama responded with status {tagsResponse.status_code}"
            )
    except httpx.ConnectError:
        return OllamaStatusResponse(
            connected=False,
            error="Keine Verbindung zu Ollama. Ist Ollama gestartet?"
        )
    except Exception as exc:
        return OllamaStatusResponse(
            connected=False,
            error=str(exc)
        )
@router.post("/api/analyze", response_model=AnalyzeResponse, tags=["AI"])
async def _analyzeDocument(
    request: AnalyzeRequest,
    xApiKey: Optional[str] = Header(None, alias="X-API-Key")
):
    """
    Analyze a document with AI Vision API.
    Supports both vision models (with images) and text models (without images).
    Authentication:
    - Gateway calls: Must include X-API-Key header
    - Test UI calls: No auth required (same-origin)
    Rate limiting is applied when API key is provided.

    Upstream timeouts, connection failures and unexpected errors are reported
    as a response with success=False; validation problems and Ollama error
    statuses are raised as HTTP errors.
    """
    import hmac  # function-scope import keeps this block self-contained

    # NOTE(review): requests without an X-API-Key header skip both the key
    # check and the rate limit (intended for the same-origin test UI per the
    # docstring) - confirm the endpoint is not reachable unauthenticated from
    # outside.
    if xApiKey:
        expectedApiKey = CONFIG["apiKey"]
        # Constant-time comparison to avoid leaking key prefixes via timing.
        if expectedApiKey and not hmac.compare_digest(
            xApiKey.encode("utf-8"), str(expectedApiKey).encode("utf-8")
        ):
            raise HTTPException(status_code=401, detail="Invalid API key")
        allowed, info = rateLimiter.isAllowed(xApiKey)
        if not allowed:
            raise HTTPException(
                status_code=429,
                detail=f"Rate limit exceeded. Retry after {info['retryAfter']} seconds.",
                headers={"Retry-After": str(int(info["retryAfter"]) + 1)},
            )
    try:
        internalModelName = _getInternalModelName(request.modelName)
        isVision = _isVisionModel(internalModelName)
        if isVision and not request.imageBase64:
            raise HTTPException(
                status_code=400,
                detail="Kein Bild übermittelt (erforderlich für Vision-Modelle)"
            )
        if not request.prompt:
            raise HTTPException(status_code=400, detail="Kein Prompt übermittelt")
        # Server has 31GB RAM + 22GB GPU - vision models need smaller context
        modelContextLengths = {
            "qwen2.5:7b": 8192,
            "qwen2.5vl:7b": 4096,
            "granite3.2-vision": 4096,
            "granite3.2-vision:latest": 4096,
            "deepseek-ocr": 4096,
            "deepseek-ocr:latest": 4096,
        }
        # Models not listed above fall back to a 4096-token context.
        numCtx = modelContextLengths.get(internalModelName, 4096)
        requestBody = {
            "model": internalModelName,
            "prompt": request.prompt,
            "stream": False,
            "options": {
                "num_ctx": numCtx
            }
        }
        if request.imageBase64:
            requestBody["images"] = [request.imageBase64]
        async with httpx.AsyncClient(timeout=3600.0) as client:
            response = await client.post(
                f"{CONFIG['ollamaUrl']}/api/generate",
                json=requestBody
            )
            if response.status_code == 404:
                raise HTTPException(
                    status_code=404,
                    detail=f'Modell "{internalModelName}" nicht gefunden. Bitte installieren mit: ollama pull {internalModelName}'
                )
            if response.status_code != 200:
                raise HTTPException(
                    status_code=response.status_code,
                    detail=f"Ollama API Fehler: {response.status_code} - {response.text[:200]}"
                )
            responseData = response.json()
            responseText = responseData.get("response", "")
            # Take the span from the first "{" to the last "}" and try to
            # parse it as JSON; otherwise wrap the plain text.
            extractedData = None
            jsonMatch = re.search(r"\{[\s\S]*\}", responseText)
            if jsonMatch:
                try:
                    extractedData = json.loads(jsonMatch.group())
                except json.JSONDecodeError:
                    extractedData = None
            if extractedData is None:
                extractedData = {"response": responseText.strip()}
            return AnalyzeResponse(
                success=True,
                data=extractedData,
                rawResponse=responseText
            )
    except httpx.TimeoutException:
        return AnalyzeResponse(
            success=False,
            error="Zeitüberschreitung bei der Ollama API"
        )
    except httpx.ConnectError:
        return AnalyzeResponse(
            success=False,
            error="Verbindung zu Ollama fehlgeschlagen. Ist Ollama gestartet?"
        )
    except HTTPException:
        # Deliberate HTTP errors raised above must reach the client unchanged.
        raise
    except Exception as e:
        # logger.exception records the traceback, which logger.error dropped.
        logger.exception("Error analyzing document: %s", e)
        return AnalyzeResponse(
            success=False,
            error=f"Unerwarteter Fehler: {str(e)}"
        )
@router.post("/api/pdf/extract", tags=["PDF"])
async def _extractPdfImages(request: PdfExtractRequest):
    """
    Extract images from a PDF.
    No API key required - this endpoint is for local test UI only,
    not used by gateway (gateway sends images directly).

    With `page` set, that single page (1-indexed) is rendered; otherwise up
    to the first five pages are rendered.

    Raises:
        HTTPException: 501 without PDF support, 400 for an invalid page
            number or invalid Base64 input, 500 when PDF processing fails.
    """
    if not PDF_SUPPORT:
        raise HTTPException(
            status_code=501,
            detail="PDF-Support nicht verfügbar. Bitte PyMuPDF installieren: pip install pymupdf"
        )
    # Client errors are checked outside the generic handler below, which
    # would otherwise turn them into a 500. page is 1-indexed, so 0 or a
    # negative value is invalid rather than "count from the end".
    if request.page is not None and request.page < 1:
        raise HTTPException(
            status_code=400,
            detail="Ungültige Seitenzahl: page muss >= 1 sein"
        )
    try:
        pdfBytes = base64.b64decode(request.pdfBase64)
    except ValueError as e:  # binascii.Error is a ValueError subclass
        raise HTTPException(
            status_code=400,
            detail=f"Ungültige Base64-Daten: {str(e)}"
        ) from e
    try:
        if request.page is not None:
            result = _renderPdfPageAsImage(pdfBytes, request.page - 1)
            return {"success": True, "image": result}
        images = _extractImagesFromPdf(pdfBytes, maxPages=5)
        return {
            "success": True,
            "images": images,
            "totalExtracted": len(images)
        }
    except Exception as e:
        logger.exception("PDF processing failed: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"PDF-Verarbeitungsfehler: {str(e)}"
        ) from e

145
routeOpenAi.py Normal file
View file

@ -0,0 +1,145 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""OpenAI-compatible routes for Cursor integration (/v1/models, /v1/chat/completions)."""
import time
import uuid
import logging
import httpx
from fastapi import APIRouter, HTTPException, Depends
from config import (
CONFIG, MODEL_MAPPING,
rateLimiter,
_isVisionModel, _getInternalModelName, _messagesToPrompt,
_verifyCursorApiKey,
OpenAiChatCompletionRequest, OpenAiChatCompletionResponse,
OpenAiChatCompletionChoice, OpenAiChatCompletionUsage,
OpenAiChatMessage, OpenAiModelInfo, OpenAiModelsResponse,
)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Router that collects the endpoints declared below, tagged "OpenAI Compatible".
router = APIRouter(tags=["OpenAI Compatible"])
@router.get("/v1/models", response_model=OpenAiModelsResponse)
async def _listOpenAiModels(cursorApiKey: str = Depends(_verifyCursorApiKey)):
    """OpenAI-compatible models endpoint for Cursor.

    Lists every external model name from MODEL_MAPPING; all entries share
    the current time as their creation timestamp.
    """
    timestamp = int(time.time())
    entries = [
        OpenAiModelInfo(id=publicName, created=timestamp)
        for publicName in MODEL_MAPPING
    ]
    return OpenAiModelsResponse(data=entries)
@router.post(
    "/v1/chat/completions",
    response_model=OpenAiChatCompletionResponse,
)
async def _openAiChatCompletions(
    request: OpenAiChatCompletionRequest,
    cursorApiKey: str = Depends(_verifyCursorApiKey)
):
    """OpenAI-compatible chat completions endpoint for Cursor.

    Flattens the chat messages into a single prompt, forwards it to Ollama's
    /api/generate endpoint and wraps the answer in an OpenAI-style response.

    Raises:
        HTTPException: 400 for streaming requests, empty prompts or vision
            models; 429 when rate limited; 404 when the model is unknown to
            Ollama; Ollama's own status for other upstream errors; 504 on
            upstream timeout; 503 when Ollama is unreachable.
    """
    import math  # function-scope import keeps this block self-contained

    if request.stream:
        raise HTTPException(
            status_code=400,
            detail="Streaming is not supported by this endpoint"
        )
    # Cursor keys get their own "cursor:" namespace in the shared limiter.
    allowed, info = rateLimiter.isAllowed(f"cursor:{cursorApiKey}")
    if not allowed:
        # Retry-After takes whole seconds. Round up (and never send 0),
        # otherwise a short wait is announced as "retry immediately".
        retryAfterSeconds = max(1, math.ceil(info["retryAfter"]))
        raise HTTPException(
            status_code=429,
            detail={
                "error": "Rate limit exceeded",
                "message": f"Too many requests. Please retry after {info['retryAfter']} seconds.",
                "retryAfter": info["retryAfter"],
                "limit": info["limit"],
                "remaining": info["remaining"]
            },
            headers={
                "Retry-After": str(retryAfterSeconds),
                "X-RateLimit-Limit": str(info["limit"]),
                "X-RateLimit-Remaining": str(info["remaining"]),
                "X-RateLimit-Reset": str(info["resetSeconds"])
            }
        )
    promptText = _messagesToPrompt(request.messages).strip()
    if not promptText:
        raise HTTPException(status_code=400, detail="messages must contain text content")
    internalModelName = _getInternalModelName(request.model)
    if _isVisionModel(internalModelName):
        raise HTTPException(
            status_code=400,
            detail="Vision models are not supported on /v1/chat/completions"
        )
    requestOptions = {
        "num_ctx": 8192
    }
    if request.temperature is not None:
        requestOptions["temperature"] = request.temperature
    if request.maxTokens is not None:
        requestOptions["num_predict"] = request.maxTokens
    requestBody = {
        "model": internalModelName,
        "prompt": promptText,
        "stream": False,
        "options": requestOptions
    }
    try:
        async with httpx.AsyncClient(timeout=3600.0) as client:
            response = await client.post(
                f"{CONFIG['ollamaUrl']}/api/generate",
                json=requestBody
            )
            if response.status_code == 404:
                raise HTTPException(
                    status_code=404,
                    detail=f'Model "{request.model}" not found'
                )
            if response.status_code != 200:
                raise HTTPException(
                    status_code=response.status_code,
                    detail=f"Ollama API error: {response.status_code} - {response.text[:200]}"
                )
            responseData = response.json()
            responseText = responseData.get("response", "").strip()
            # "or 0" also covers an explicit null in the upstream payload.
            promptEvalCount = int(responseData.get("prompt_eval_count") or 0)
            evalCount = int(responseData.get("eval_count") or 0)
            return OpenAiChatCompletionResponse(
                id=f"chatcmpl-{uuid.uuid4().hex}",
                created=int(time.time()),
                model=request.model,
                choices=[
                    OpenAiChatCompletionChoice(
                        index=0,
                        message=OpenAiChatMessage(role="assistant", content=responseText)
                    )
                ],
                # The usage fields are declared with aliases and the model
                # enables no population by field name, so they must be set
                # through the alias names; field-name keywords are ignored
                # and every counter would stay at its default of 0.
                usage=OpenAiChatCompletionUsage(
                    prompt_tokens=promptEvalCount,
                    completion_tokens=evalCount,
                    total_tokens=promptEvalCount + evalCount
                )
            )
    except httpx.TimeoutException:
        raise HTTPException(status_code=504, detail="Upstream timeout (Ollama)")
    except httpx.ConnectError:
        raise HTTPException(status_code=503, detail="Cannot connect to Ollama upstream")

34
routeWeb.py Normal file
View file

@ -0,0 +1,34 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Web UI routes for the Private-LLM test interface."""
import logging
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from starlette.responses import RedirectResponse
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Jinja2 template loader reading from the "templates" directory (a relative path).
templates = Jinja2Templates(directory="templates")
# Router that collects the page routes declared below, tagged "Web UI".
router = APIRouter(tags=["Web UI"])
@router.get("/", response_class=HTMLResponse)
async def _index(request: Request):
    """Serve the main page (document scanner UI) from index.html."""
    pageContext = {"request": request}
    return templates.TemplateResponse("index.html", pageContext)
@router.get("/login", response_class=HTMLResponse)
async def _loginPage(request: Request):
    """Serve the login page from login.html."""
    pageContext = {"request": request}
    return templates.TemplateResponse("login.html", pageContext)
@router.get("/logout", response_class=HTMLResponse)
async def _logout(request: Request):
    """Log out by redirecting (HTTP 302) to the login page."""
    loginRedirect = RedirectResponse(url="/login", status_code=302)
    return loginRedirect