gateway/modules/aicore/aicorePluginPrivateLlm.py
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
AI Connector for PowerOn Private-LLM Service.
Connects to the private-llm service running on-premise with Ollama backend.
Provides OCR and Vision capabilities via local AI models.
Models:
- poweron-ocr-general: Text extraction and OCR (deepseek backend)
- poweron-vision-general: General vision tasks (qwen2.5vl backend)
- poweron-vision-deep: Deep vision analysis (granite3.2 backend)
Pricing (CHF per call):
- Text models: CHF 0.010
- Vision models: CHF 0.100
"""
import logging
import httpx
import time
from typing import List, Optional, Dict, Any
from fastapi import HTTPException
from modules.shared.configuration import APP_CONFIG
from .aicoreBase import BaseConnectorAi
from modules.datamodels.datamodelAi import (
    AiModel,
    PriorityEnum,
    ProcessingModeEnum,
    OperationTypeEnum,
    AiModelCall,
    AiModelResponse,
    createOperationTypeRatings,
)
# Configure logger
logger = logging.getLogger(__name__)
# Pricing constants (CHF)
PRICE_TEXT_PER_CALL = 0.01 # CHF 0.010 per text model call
PRICE_VISION_PER_CALL = 0.10 # CHF 0.100 per vision model call
# Private-LLM service URL (fixed; not configurable via environment variables)
PRIVATE_LLM_BASE_URL = "https://llm.poweron.swiss:8000"


def _loadConfigData():
    """Load configuration data for the Private-LLM connector."""
    return {
        "apiKey": APP_CONFIG.get("Connector_AiPrivateLlm_API_SECRET"),
        "baseUrl": PRIVATE_LLM_BASE_URL,
    }


class AiPrivateLlm(BaseConnectorAi):
    """Connector for communication with the PowerOn Private-LLM Service."""

    def __init__(self):
        super().__init__()
        # Load configuration
        self.config = _loadConfigData()
        self.apiKey = self.config["apiKey"]
        self.baseUrl = self.config["baseUrl"]
        # HTTP client for API calls.
        # Timeout set to 3600 seconds (60 minutes) for large model processing.
        headers = {"Content-Type": "application/json"}
        if self.apiKey:
            headers["X-API-Key"] = self.apiKey
        self.httpClient = httpx.AsyncClient(
            timeout=3600.0,
            headers=headers
        )
        # Cache for service availability check
        self._serviceAvailable: Optional[bool] = None
        self._ollamaConnected: Optional[bool] = None  # cached separately so cache hits report it accurately
        self._availableOllamaModels: Optional[List[str]] = None
        self._lastAvailabilityCheck: float = 0.0
        self._availabilityCacheTtl: float = 60.0  # 60 seconds cache
        logger.info(f"Private-LLM Connector initialized (URL: {self.baseUrl})")

    def getConnectorType(self) -> str:
        """Get the connector type identifier."""
        return "privatellm"

    def _checkServiceAvailability(self) -> Dict[str, Any]:
        """
        Check if the Private-LLM service is available and which Ollama models are installed.

        Uses caching to avoid excessive health checks.

        Returns:
            Dict with 'serviceAvailable', 'ollamaConnected', 'availableModels'
        """
        currentTime = time.time()
        # Return cached result if still valid
        if (self._serviceAvailable is not None and
                currentTime - self._lastAvailabilityCheck < self._availabilityCacheTtl):
            return {
                "serviceAvailable": self._serviceAvailable,
                "ollamaConnected": bool(self._ollamaConnected),
                "availableModels": self._availableOllamaModels or []
            }
        # Perform availability check
        try:
            # Use a synchronous client for this blocking check
            with httpx.Client(timeout=5.0) as client:
                headers = {"Content-Type": "application/json"}
                if self.apiKey:
                    headers["X-API-Key"] = self.apiKey
                # Check health endpoint
                healthResponse = client.get(
                    f"{self.baseUrl}/api/health",
                    headers=headers
                )
                if healthResponse.status_code != 200:
                    logger.warning(f"Private-LLM service not available: HTTP {healthResponse.status_code}")
                    self._serviceAvailable = False
                    self._ollamaConnected = False
                    self._availableOllamaModels = []
                    self._lastAvailabilityCheck = currentTime
                    return {"serviceAvailable": False, "ollamaConnected": False, "availableModels": []}
                healthData = healthResponse.json()
                ollamaConnected = healthData.get("ollamaConnected", False)
                if not ollamaConnected:
                    logger.warning("Private-LLM service available but Ollama not connected")
                    self._serviceAvailable = True
                    self._ollamaConnected = False
                    self._availableOllamaModels = []
                    self._lastAvailabilityCheck = currentTime
                    return {"serviceAvailable": True, "ollamaConnected": False, "availableModels": []}
                # Check Ollama status for available models
                statusResponse = client.get(
                    f"{self.baseUrl}/api/ollama/status",
                    headers=headers
                )
                if statusResponse.status_code == 200:
                    statusData = statusResponse.json()
                    self._availableOllamaModels = statusData.get("models", [])
                else:
                    self._availableOllamaModels = []
                self._serviceAvailable = True
                self._ollamaConnected = True
                self._lastAvailabilityCheck = currentTime
                logger.info(f"Private-LLM availability check: service=OK, ollama=OK, models={len(self._availableOllamaModels)}")
                return {
                    "serviceAvailable": True,
                    "ollamaConnected": True,
                    "availableModels": self._availableOllamaModels
                }
        except httpx.ConnectError:
            logger.warning(f"Private-LLM service not reachable at {self.baseUrl}")
            self._serviceAvailable = False
            self._ollamaConnected = False
            self._availableOllamaModels = []
            self._lastAvailabilityCheck = currentTime
            return {"serviceAvailable": False, "ollamaConnected": False, "availableModels": []}
        except Exception as e:
            logger.warning(f"Error checking Private-LLM availability: {e}")
            self._serviceAvailable = False
            self._ollamaConnected = False
            self._availableOllamaModels = []
            self._lastAvailabilityCheck = currentTime
            return {"serviceAvailable": False, "ollamaConnected": False, "availableModels": []}

    def _isModelAvailableInOllama(self, ollamaModelName: str, availableModels: List[str]) -> bool:
        """
        Check if a model is available in Ollama.

        Handles model name variations (with/without tags).
        """
        if not availableModels:
            return False
        # Direct match
        if ollamaModelName in availableModels:
            return True
        # Check without tag (e.g., "qwen2.5vl:72b" -> "qwen2.5vl")
        baseModelName = ollamaModelName.split(":")[0]
        for availModel in availableModels:
            availBase = availModel.split(":")[0]
            if baseModelName == availBase:
                return True
        return False
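
    # Example of the base-name fallback above (note that it deliberately treats
    # any installed tag of a model family as a match):
    #   _isModelAvailableInOllama("qwen2.5vl:7b", ["qwen2.5vl:3b"])       -> True
    #   _isModelAvailableInOllama("deepseek-ocr", ["granite3.2-vision"])  -> False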

    def getModels(self) -> List[AiModel]:
        """
        Get all available Private-LLM models.

        Checks service availability and returns only models that are actually available
        in the connected Ollama instance. Returns an empty list if the service is not reachable.
        """
        # Check service availability
        availability = self._checkServiceAvailability()
        if not availability["serviceAvailable"]:
            logger.warning("Private-LLM service not available - no models returned")
            return []
        if not availability["ollamaConnected"]:
            logger.warning("Private-LLM service available but Ollama not connected - no models returned")
            return []
        availableOllamaModels = availability.get("availableModels", [])
        # Define all models with their Ollama backend names.
        # Actual model specs (for a 32GB RAM server):
        # - deepseek-ocr: 3.34B params, 8K context, ~6.7GB RAM
        # - qwen2.5vl:7b: 8.29B params, 125K context, ~6GB RAM
        # - granite3.2-vision: 2B params, 16K context, ~2.4GB RAM
        modelDefinitions = [
            # OCR Text Model (deepseek-ocr: 3.34B, 8K context)
            {
                "model": AiModel(
                    name="poweron-ocr-general",
                    displayName="PowerOn OCR General",
                    connectorType="privatellm",
                    apiUrl=f"{self.baseUrl}/api/analyze",
                    temperature=0.1,
                    maxTokens=4096,
                    contextLength=8192,  # deepseek-ocr actual context: 8K
                    costPer1kTokensInput=0.0,  # Flat-rate pricing
                    costPer1kTokensOutput=0.0,  # Flat-rate pricing
                    speedRating=8,  # Fast due to smaller model
                    qualityRating=8,
                    functionCall=self.callAiText,
                    priority=PriorityEnum.COST,
                    processingMode=ProcessingModeEnum.BASIC,
                    operationTypes=createOperationTypeRatings(
                        (OperationTypeEnum.DATA_EXTRACT, 9),
                        (OperationTypeEnum.DATA_ANALYSE, 7),
                    ),
                    version="deepseek-ocr",
                    calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: PRICE_TEXT_PER_CALL
                ),
                "ollamaModel": "deepseek-ocr"
            },
            # Vision General Model (qwen2.5vl:7b: 8.29B, 125K context)
            {
                "model": AiModel(
                    name="poweron-vision-general",
                    displayName="PowerOn Vision General",
                    connectorType="privatellm",
                    apiUrl=f"{self.baseUrl}/api/analyze",
                    temperature=0.2,
                    maxTokens=8192,
                    contextLength=125000,  # qwen2.5vl:7b actual context: 125K
                    costPer1kTokensInput=0.0,  # Flat-rate pricing
                    costPer1kTokensOutput=0.0,  # Flat-rate pricing
                    speedRating=7,
                    qualityRating=9,
                    functionCall=self.callAiVision,
                    priority=PriorityEnum.BALANCED,
                    processingMode=ProcessingModeEnum.ADVANCED,
                    operationTypes=createOperationTypeRatings(
                        (OperationTypeEnum.IMAGE_ANALYSE, 9),
                        (OperationTypeEnum.DATA_EXTRACT, 8),
                    ),
                    version="qwen2.5vl:7b",
                    calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: PRICE_VISION_PER_CALL
                ),
                "ollamaModel": "qwen2.5vl:7b"
            },
            # Vision Deep Model (granite3.2-vision: 2B, 16K context)
            {
                "model": AiModel(
                    name="poweron-vision-deep",
                    displayName="PowerOn Vision Deep",
                    connectorType="privatellm",
                    apiUrl=f"{self.baseUrl}/api/analyze",
                    temperature=0.1,
                    maxTokens=4096,
                    contextLength=16000,  # granite3.2-vision actual context: 16K
                    costPer1kTokensInput=0.0,  # Flat-rate pricing
                    costPer1kTokensOutput=0.0,  # Flat-rate pricing
                    speedRating=9,  # Fast due to small 2B model
                    qualityRating=8,  # Good for document understanding
                    functionCall=self.callAiVision,
                    priority=PriorityEnum.QUALITY,
                    processingMode=ProcessingModeEnum.DETAILED,
                    operationTypes=createOperationTypeRatings(
                        (OperationTypeEnum.IMAGE_ANALYSE, 9),
                        (OperationTypeEnum.DATA_EXTRACT, 9),
                        (OperationTypeEnum.DATA_ANALYSE, 8),
                    ),
                    version="granite3.2-vision",
                    calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: PRICE_VISION_PER_CALL
                ),
                "ollamaModel": "granite3.2-vision"
            },
        ]
        # Filter models by Ollama availability
        availableModels = []
        unavailableModels = []
        for modelDef in modelDefinitions:
            ollamaModelName = modelDef["ollamaModel"]
            if self._isModelAvailableInOllama(ollamaModelName, availableOllamaModels):
                availableModels.append(modelDef["model"])
            else:
                unavailableModels.append(modelDef["model"].name)
        if unavailableModels:
            logger.warning(
                f"Private-LLM: {len(unavailableModels)} models not available in Ollama: {', '.join(unavailableModels)}. "
                f"Install with: ollama pull <model-name>"
            )
        if availableModels:
            logger.info(f"Private-LLM: {len(availableModels)} models available")
        else:
            logger.warning("Private-LLM: No models available. Check the Ollama installation.")
        return availableModels
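
    # Example filtering outcome (hypothetical): if Ollama reports only
    # ["deepseek-ocr"], getModels() returns just poweron-ocr-general and the
    # warning above names poweron-vision-general and poweron-vision-deep.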

    async def callAiText(self, modelCall: AiModelCall) -> AiModelResponse:
        """
        Call the Private-LLM API for text-based analysis.

        Args:
            modelCall: AiModelCall with messages

        Returns:
            AiModelResponse with content and metadata
        """
        try:
            messages = modelCall.messages
            model = modelCall.model
            # Extract prompt from messages
            prompt = ""
            for msg in messages:
                content = msg.get("content", "")
                if isinstance(content, str):
                    prompt += content + "\n"
                elif isinstance(content, list):
                    for part in content:
                        if isinstance(part, dict) and part.get("type") == "text":
                            prompt += part.get("text", "") + "\n"
            payload = {
                "modelName": model.name,
                "prompt": prompt.strip(),
                "imageBase64": None
            }
            logger.debug(f"Calling Private-LLM text API with model {model.name}")
            response = await self.httpClient.post(
                model.apiUrl,
                json=payload
            )
            if response.status_code != 200:
                errorMessage = f"Private-LLM API error: {response.status_code} - {response.text}"
                logger.error(errorMessage)
                raise HTTPException(status_code=500, detail=errorMessage)
            responseJson = response.json()
            if not responseJson.get("success", False):
                errorMsg = responseJson.get("error", "Unknown error")
                logger.error(f"Private-LLM returned error: {errorMsg}")
                return AiModelResponse(
                    content="",
                    success=False,
                    error=errorMsg
                )
            # Extract content from response
            data = responseJson.get("data", {})
            rawResponse = responseJson.get("rawResponse", "")
            # Prefer rawResponse for full content, fall back to data
            content = rawResponse if rawResponse else str(data.get("response", data))
            return AiModelResponse(
                content=content,
                success=True,
                modelId=model.name,
                metadata={"data": data}
            )
        except HTTPException:
            raise
        except Exception as e:
            logger.error(f"Error calling Private-LLM text API: {str(e)}")
            raise HTTPException(status_code=500, detail=f"Error calling Private-LLM API: {str(e)}")
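
    # Request/response shape assumed by callAiText and callAiVision, inferred
    # from the payload construction and the .get() calls (a sketch of the
    # /api/analyze contract, not an authoritative spec):
    #   POST {baseUrl}/api/analyze
    #     {"modelName": "poweron-ocr-general", "prompt": "...", "imageBase64": null}
    #   success -> {"success": true, "data": {...}, "rawResponse": "..."}
    #   failure -> {"success": false, "error": "..."}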

    async def callAiVision(self, modelCall: AiModelCall) -> AiModelResponse:
        """
        Call the Private-LLM API for vision-based analysis.

        Args:
            modelCall: AiModelCall with messages containing image data

        Returns:
            AiModelResponse with analysis content
        """
        try:
            messages = modelCall.messages
            model = modelCall.model
            # Extract prompt and image from messages
            prompt = ""
            imageBase64 = None
            for msg in messages:
                content = msg.get("content", "")
                if isinstance(content, str):
                    prompt += content + "\n"
                elif isinstance(content, list):
                    for part in content:
                        if isinstance(part, dict):
                            if part.get("type") == "text":
                                prompt += part.get("text", "") + "\n"
                            elif part.get("type") == "image_url":
                                imageUrl = part.get("image_url", {}).get("url", "")
                                # Extract base64 from data URL
                                if imageUrl.startswith("data:"):
                                    # Format: data:image/png;base64,<base64data>
                                    parts = imageUrl.split(",", 1)
                                    if len(parts) == 2:
                                        imageBase64 = parts[1]
                                else:
                                    imageBase64 = imageUrl
            if not imageBase64:
                logger.warning("No image provided for vision model call")
            payload = {
                "modelName": model.name,
                "prompt": prompt.strip(),
                "imageBase64": imageBase64
            }
            logger.debug(f"Calling Private-LLM vision API with model {model.name}")
            response = await self.httpClient.post(
                model.apiUrl,
                json=payload
            )
            if response.status_code != 200:
                errorMessage = f"Private-LLM API error: {response.status_code} - {response.text}"
                logger.error(errorMessage)
                raise HTTPException(status_code=500, detail=errorMessage)
            responseJson = response.json()
            if not responseJson.get("success", False):
                errorMsg = responseJson.get("error", "Unknown error")
                logger.error(f"Private-LLM returned error: {errorMsg}")
                return AiModelResponse(
                    content="",
                    success=False,
                    error=errorMsg
                )
            # Extract content from response
            data = responseJson.get("data", {})
            rawResponse = responseJson.get("rawResponse", "")
            # Prefer rawResponse for full content
            content = rawResponse if rawResponse else str(data.get("response", data))
            return AiModelResponse(
                content=content,
                success=True,
                modelId=model.name,
                metadata={"data": data}
            )
        except HTTPException:
            raise
        except Exception as e:
            logger.error(f"Error calling Private-LLM vision API: {str(e)}", exc_info=True)
            return AiModelResponse(
                content="",
                success=False,
                error=f"Error during vision analysis: {str(e)}"
            )
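
# Minimal usage sketch (assumptions: AiModelCall can be constructed with a
# `model` and an OpenAI-style `messages` list, and the caller runs inside an
# event loop; the exact constructor fields live in
# modules.datamodels.datamodelAi, so treat this as illustrative only):
#
#   connector = AiPrivateLlm()
#   models = connector.getModels()  # empty if the service or Ollama is down
#   if models:
#       call = AiModelCall(
#           model=models[0],
#           messages=[{"role": "user", "content": "Extract all text from the document."}],
#       )
#       response = await models[0].functionCall(call)  # -> AiModelResponse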