platform-core/modules/aicore/aicorePluginPrivateLlm.py
ValueOn AG d61e29bcac
Some checks failed
Deploy Plattform-Core (Int) / test (push) Failing after 24s
Deploy Plattform-Core (Int) / deploy (push) Has been skipped
fixes private model and udb scoping sources
2026-06-03 09:37:03 +02:00

604 lines
25 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
AI Connector for PowerOn Private-LLM Service.
Connects to the private-llm service running on-premise with Ollama backend.
Provides OCR and Vision capabilities via local AI models.
Models (current — L4 24 GB):
- poweron-text-general: Text (qwen2.5:7b); NEUTRALIZATION_TEXT + data/plan ops
- poweron-vision-general: Vision (qwen2.5vl:7b); IMAGE_ANALYSE + NEUTRALIZATION_IMAGE
- poweron-vision-deep: Vision (granite3.2-vision); IMAGE_ANALYSE + NEUTRALIZATION_IMAGE
Models (next-gen — RTX PRO 6000 96 GB, auto-activated when pulled in Ollama):
- poweron-text-reasoning: Reasoning (deepseek-r1:70b); complex logic, math, planning
- poweron-vision-multimodal: Vision (llama4:scout); multimodal, long-context documents
- poweron-embed: Embedding (nomic-embed-text); local RAG embedding
Pricing: byte-based (~per-token via bytes/4), configured via the PRICE_* constants below.
"""
import logging
import httpx
import time
from typing import List, Optional, Dict, Any
from fastapi import HTTPException
from modules.shared.configuration import APP_CONFIG
from .aicoreBase import BaseConnectorAi, RateLimitExceededException
from modules.datamodels.datamodelAi import (
AiModel,
PriorityEnum,
ProcessingModeEnum,
OperationTypeEnum,
AiModelCall,
AiModelResponse,
createOperationTypeRatings
)
# Module-level logger shared by everything in this connector
logger = logging.getLogger(__name__)

# Price table in CHF per 1k tokens. Billing is byte-based: 4 bytes are
# counted as roughly one token (bytes / 4 / 1000 = thousands of tokens).
PRICE_INPUT_PER_1K = 0.0075
PRICE_OUTPUT_PER_1K = 0.0375
PRICE_EMBED_PER_1K = 0.0005


def _calcPrivatePriceCHF(processingTime, bytesSent, bytesReceived):
    """Return the CHF price of a text/vision/reasoning call from its byte counts.

    ``processingTime`` is accepted for signature compatibility but does not
    influence the price.
    """
    inputKiloTokens = bytesSent / 4 / 1000
    outputKiloTokens = bytesReceived / 4 / 1000
    inputCost = inputKiloTokens * PRICE_INPUT_PER_1K
    outputCost = outputKiloTokens * PRICE_OUTPUT_PER_1K
    return inputCost + outputCost


def _calcPrivateEmbedPriceCHF(processingTime, bytesSent, bytesReceived):
    """Return the CHF price of an embedding call; only the bytes sent are billed.

    ``processingTime`` and ``bytesReceived`` are accepted for signature
    compatibility but do not influence the price.
    """
    inputKiloTokens = bytesSent / 4 / 1000
    return inputKiloTokens * PRICE_EMBED_PER_1K
# Private-LLM service URL (fixed; not configurable via environment variables)
PRIVATE_LLM_BASE_URL = "https://llm.poweron.swiss:8000"
def _loadConfigData():
    """Assemble the connector settings.

    Returns:
        Dict with 'apiKey' (value of the APP_CONFIG entry
        ``Connector_AiPrivateLlm_API_SECRET``) and 'baseUrl' (the fixed
        ``PRIVATE_LLM_BASE_URL``).
    """
    apiSecret = APP_CONFIG.get("Connector_AiPrivateLlm_API_SECRET")
    settings = dict(apiKey=apiSecret, baseUrl=PRIVATE_LLM_BASE_URL)
    return settings
class AiPrivateLlm(BaseConnectorAi):
"""Connector for communication with the PowerOn Private-LLM Service."""
def __init__(self):
    """Load the configuration, create the shared async HTTP client and reset the availability cache."""
    super().__init__()

    # Connector settings (API secret + fixed service URL)
    settings = _loadConfigData()
    self.config = settings
    self.apiKey = settings["apiKey"]
    self.baseUrl = settings["baseUrl"]

    # The API key header is only sent when a key is configured.
    defaultHeaders = {"Content-Type": "application/json"}
    if self.apiKey:
        defaultHeaders["X-API-Key"] = self.apiKey

    # One long-lived async client for all model calls. The timeout is
    # 3600 seconds (60 minutes) to accommodate large model processing.
    self.httpClient = httpx.AsyncClient(timeout=3600.0, headers=defaultHeaders)

    # Availability cache: nothing known yet, results are kept for 60 seconds.
    self._serviceAvailable: Optional[bool] = None
    self._availableOllamaModels: Optional[List[str]] = None
    self._lastAvailabilityCheck: float = 0
    self._availabilityCacheTtl: float = 60.0

    logger.info(f"Private-LLM Connector initialized (URL: {self.baseUrl})")
def getConnectorType(self) -> str:
    """Return the connector type identifier string (``privatellm``)."""
    connectorType = "privatellm"
    return connectorType
def _checkServiceAvailability(self) -> Dict[str, Any]:
    """
    Probe the Private-LLM service and report which Ollama models it lists.

    The outcome (including failures) is cached on the instance for
    ``self._availabilityCacheTtl`` seconds so repeated calls do not trigger
    repeated health checks. The probe itself uses a short-lived synchronous
    ``httpx.Client`` with a 5 second timeout, so it blocks the caller.

    Returns:
        Dict with the keys
        'serviceAvailable' (the health endpoint answered HTTP 200),
        'ollamaConnected' (the health payload reported Ollama as connected) and
        'availableModels' (the "models" value of the Ollama status endpoint;
        an empty list whenever it could not be obtained).
    """
    currentTime = time.time()

    def remember(serviceOk: bool, ollamaOk: bool, models: List[str]) -> Dict[str, Any]:
        # Store the outcome (successes and failures alike) so that calls
        # within the TTL are answered without another HTTP round trip.
        self._serviceAvailable = serviceOk
        self._ollamaConnected = ollamaOk
        self._availableOllamaModels = models
        self._lastAvailabilityCheck = currentTime
        return {
            "serviceAvailable": serviceOk,
            "ollamaConnected": ollamaOk,
            "availableModels": models,
        }

    # Return cached result if still valid
    cacheAge = currentTime - self._lastAvailabilityCheck
    if self._serviceAvailable is not None and cacheAge < self._availabilityCacheTtl:
        return {
            "serviceAvailable": self._serviceAvailable,
            # Report the Ollama state that was actually observed. Falling back
            # to the service flag covers instances whose cache fields were
            # populated without `_ollamaConnected` being set.
            "ollamaConnected": getattr(self, "_ollamaConnected", self._serviceAvailable),
            "availableModels": self._availableOllamaModels or [],
        }

    # Perform availability check
    try:
        # Use synchronous client for blocking check during initialization
        with httpx.Client(timeout=5.0) as client:
            headers = {"Content-Type": "application/json"}
            if self.apiKey:
                headers["X-API-Key"] = self.apiKey

            # Check health endpoint
            healthResponse = client.get(f"{self.baseUrl}/api/health", headers=headers)
            if healthResponse.status_code != 200:
                logger.warning(f"Private-LLM service not available: HTTP {healthResponse.status_code}")
                return remember(False, False, [])

            healthData = healthResponse.json()
            if not healthData.get("ollamaConnected", False):
                logger.warning("Private-LLM service available but Ollama not connected")
                return remember(True, False, [])

            # Check Ollama status for available models
            statusResponse = client.get(f"{self.baseUrl}/api/ollama/status", headers=headers)
            if statusResponse.status_code == 200:
                # `or []` also covers an explicit null value for "models".
                models = statusResponse.json().get("models") or []
            else:
                models = []

            logger.info(f"Private-LLM availability check: service=OK, ollama=OK, models={len(models)}")
            return remember(True, True, models)
    except httpx.ConnectError:
        logger.warning(f"Private-LLM service not reachable at {self.baseUrl}")
        return remember(False, False, [])
    except Exception as e:
        # Best-effort probe: any other failure is treated as "not available".
        logger.warning(f"Error checking Private-LLM availability: {e}")
        return remember(False, False, [])
def _isModelAvailableInOllama(self, ollamaModelName: str, availableModels: List[str]) -> bool:
    """
    Tell whether ``ollamaModelName`` is among the models reported by Ollama.

    An exact name match wins; otherwise the part before the first ':' of the
    requested name is compared with the same part of every reported name, so
    names that differ only in their tag (e.g. "qwen2.5vl:72b" vs. "qwen2.5vl")
    count as a match.
    """
    # Nothing reported -> nothing can match
    if not availableModels:
        return False

    # Exact name (including tag)
    if ollamaModelName in availableModels:
        return True

    # Tag-insensitive comparison
    wantedBase = ollamaModelName.split(":")[0]
    return any(reported.split(":")[0] == wantedBase for reported in availableModels)
def getModels(self) -> List[AiModel]:
    """
    Return the Private-LLM models that can currently be used.

    The service is probed first; if it is unreachable or reports that Ollama
    is not connected, an empty list is returned. Otherwise every model of the
    catalog below is kept only if its Ollama backend model is reported as
    installed.
    """
    availability = self._checkServiceAvailability()

    # Guard clauses: without service or without Ollama there is nothing to offer.
    if not availability["serviceAvailable"]:
        logger.warning("Private-LLM service not available - no models returned")
        return []
    if not availability["ollamaConnected"]:
        logger.warning("Private-LLM service available but Ollama not connected - no models returned")
        return []

    availableOllamaModels = availability.get("availableModels", [])

    # All chat-style models share the analyze endpoint.
    analyzeUrl = f"{self.baseUrl}/api/analyze"

    # Catalog of (Ollama backend model, connector model) pairs.
    # Sizing notes from the original author (31GB RAM + 22GB GPU server),
    # context sizes reduced to fit in available RAM:
    # - qwen2.5:7b: 7.6B params, ~4.7GB RAM (Text) - 8K context
    # - qwen2.5vl:7b: 8.29B params, ~6GB RAM (Vision) - 4K context
    # - granite3.2-vision: 2B params, ~2.4GB RAM (Vision) - 4K context
    # - deepseek-ocr: ~6.7GB RAM (OCR) - 4K context (no catalog entry below uses it)
    catalog = [
        # Text model (qwen2.5:7b: 7.6B)
        (
            "qwen2.5:7b",
            AiModel(
                name="poweron-text-general",
                displayName="PowerOn Text General",
                connectorType="privatellm",
                apiUrl=analyzeUrl,
                temperature=0.1,
                maxTokens=4096,
                contextLength=8192,  # reduced for RAM constraints
                costPer1kTokensInput=PRICE_INPUT_PER_1K,
                costPer1kTokensOutput=PRICE_OUTPUT_PER_1K,
                speedRating=8,  # fast and efficient
                qualityRating=9,  # high quality text model
                functionCall=self.callAiText,
                priority=PriorityEnum.COST,
                processingMode=ProcessingModeEnum.BASIC,
                operationTypes=createOperationTypeRatings(
                    (OperationTypeEnum.PLAN, 7),
                    (OperationTypeEnum.DATA_ANALYSE, 8),
                    (OperationTypeEnum.DATA_GENERATE, 8),
                    (OperationTypeEnum.DATA_EXTRACT, 8),
                    (OperationTypeEnum.NEUTRALIZATION_TEXT, 9),
                    # The agent loop (workspace etc.) selects models by
                    # OperationTypeEnum.AGENT for streaming.
                    (OperationTypeEnum.AGENT, 8),
                ),
                version="qwen2.5:7b",
                calculatepriceCHF=_calcPrivatePriceCHF,
            ),
        ),
        # General vision model (qwen2.5vl:7b: 8.29B)
        (
            "qwen2.5vl:7b",
            AiModel(
                name="poweron-vision-general",
                displayName="PowerOn Vision General",
                connectorType="privatellm",
                apiUrl=analyzeUrl,
                temperature=0.2,
                maxTokens=2048,
                contextLength=4096,  # reduced for RAM constraints (vision needs more)
                costPer1kTokensInput=PRICE_INPUT_PER_1K,
                costPer1kTokensOutput=PRICE_OUTPUT_PER_1K,
                speedRating=7,
                qualityRating=9,
                functionCall=self.callAiVision,
                priority=PriorityEnum.BALANCED,
                processingMode=ProcessingModeEnum.ADVANCED,
                operationTypes=createOperationTypeRatings(
                    (OperationTypeEnum.IMAGE_ANALYSE, 9),
                    (OperationTypeEnum.NEUTRALIZATION_IMAGE, 9),
                ),
                version="qwen2.5vl:7b",
                calculatepriceCHF=_calcPrivatePriceCHF,
            ),
        ),
        # Deep vision model (granite3.2-vision: 2B)
        (
            "granite3.2-vision",
            AiModel(
                name="poweron-vision-deep",
                displayName="PowerOn Vision Deep",
                connectorType="privatellm",
                apiUrl=analyzeUrl,
                temperature=0.1,
                maxTokens=2048,
                contextLength=4096,  # reduced for RAM constraints
                costPer1kTokensInput=PRICE_INPUT_PER_1K,
                costPer1kTokensOutput=PRICE_OUTPUT_PER_1K,
                speedRating=9,  # fast due to small 2B model
                qualityRating=8,  # good for document understanding
                functionCall=self.callAiVision,
                priority=PriorityEnum.QUALITY,
                processingMode=ProcessingModeEnum.DETAILED,
                operationTypes=createOperationTypeRatings(
                    (OperationTypeEnum.IMAGE_ANALYSE, 9),
                    (OperationTypeEnum.NEUTRALIZATION_IMAGE, 9),
                ),
                version="granite3.2-vision",
                calculatepriceCHF=_calcPrivatePriceCHF,
            ),
        ),
        # --- Next-gen models (become active once available in Ollama) ---
        # Reasoning model (deepseek-r1:70b - chain-of-thought, math, logic)
        (
            "deepseek-r1:70b",
            AiModel(
                name="poweron-text-reasoning",
                displayName="PowerOn Reasoning",
                connectorType="privatellm",
                apiUrl=analyzeUrl,
                temperature=0.1,
                maxTokens=8192,
                contextLength=65536,
                costPer1kTokensInput=PRICE_INPUT_PER_1K,
                costPer1kTokensOutput=PRICE_OUTPUT_PER_1K,
                speedRating=5,
                qualityRating=10,
                functionCall=self.callAiText,
                priority=PriorityEnum.QUALITY,
                processingMode=ProcessingModeEnum.DETAILED,
                operationTypes=createOperationTypeRatings(
                    (OperationTypeEnum.PLAN, 10),
                    (OperationTypeEnum.DATA_ANALYSE, 10),
                    (OperationTypeEnum.DATA_GENERATE, 9),
                    (OperationTypeEnum.DATA_EXTRACT, 9),
                    (OperationTypeEnum.NEUTRALIZATION_TEXT, 10),
                    (OperationTypeEnum.AGENT, 9),
                ),
                version="deepseek-r1:70b",
                calculatepriceCHF=_calcPrivatePriceCHF,
            ),
        ),
        # Multimodal vision model (llama4:scout - native vision, long context)
        (
            "llama4:scout",
            AiModel(
                name="poweron-vision-multimodal",
                displayName="PowerOn Vision Multimodal",
                connectorType="privatellm",
                apiUrl=analyzeUrl,
                temperature=0.2,
                maxTokens=4096,
                contextLength=131072,
                costPer1kTokensInput=PRICE_INPUT_PER_1K,
                costPer1kTokensOutput=PRICE_OUTPUT_PER_1K,
                speedRating=7,
                qualityRating=10,
                functionCall=self.callAiVision,
                priority=PriorityEnum.QUALITY,
                processingMode=ProcessingModeEnum.DETAILED,
                operationTypes=createOperationTypeRatings(
                    (OperationTypeEnum.IMAGE_ANALYSE, 10),
                    (OperationTypeEnum.NEUTRALIZATION_IMAGE, 10),
                ),
                version="llama4:scout",
                calculatepriceCHF=_calcPrivatePriceCHF,
            ),
        ),
        # Local embedding model (nomic-embed-text - replaces OpenAI text-embedding-3-small)
        # NOTE(review): this entry targets /v1/embeddings but is wired to
        # callAiText, which posts a modelName/prompt/imageBase64 payload -
        # confirm the endpoint accepts that or that embeddings are routed elsewhere.
        (
            "nomic-embed-text",
            AiModel(
                name="poweron-embed",
                displayName="PowerOn Embedding",
                connectorType="privatellm",
                apiUrl=f"{self.baseUrl}/v1/embeddings",
                temperature=0.0,
                maxTokens=0,
                contextLength=8192,
                costPer1kTokensInput=PRICE_EMBED_PER_1K,
                costPer1kTokensOutput=0.0,
                speedRating=10,
                qualityRating=8,
                functionCall=self.callAiText,
                priority=PriorityEnum.COST,
                processingMode=ProcessingModeEnum.BASIC,
                operationTypes=createOperationTypeRatings(
                    (OperationTypeEnum.EMBEDDING, 9),
                ),
                version="nomic-embed-text",
                calculatepriceCHF=_calcPrivateEmbedPriceCHF,
            ),
        ),
    ]

    # Split the catalog into usable models and names of missing ones.
    usableModels = []
    missingNames = []
    for ollamaTag, candidate in catalog:
        if self._isModelAvailableInOllama(ollamaTag, availableOllamaModels):
            usableModels.append(candidate)
        else:
            missingNames.append(candidate.name)

    if missingNames:
        logger.info(
            f"Private-LLM: {len(missingNames)} models not available in Ollama: {', '.join(missingNames)}. "
            f"Install with: ollama pull <model-name>"
        )

    if not usableModels:
        logger.warning("Private-LLM: No models available. Check Ollama installation.")
    else:
        logger.info(f"Private-LLM: {len(usableModels)} models available")

    return usableModels
async def callAiText(self, modelCall: AiModelCall) -> AiModelResponse:
    """
    Call the Private-LLM API for text-based analysis.

    The text of all messages (plain string contents and the "text" parts of
    list contents) is joined with newlines into one prompt and POSTed to
    ``model.apiUrl`` together with the model name.

    Args:
        modelCall: AiModelCall providing ``messages`` and ``model``.

    Returns:
        AiModelResponse with the model output (``rawResponse`` if present,
        otherwise the stringified ``data``) on success, or a response with
        ``success=False`` and the service's error text when the service
        answers HTTP 200 but reports a failure.

    Raises:
        RateLimitExceededException: the service answered HTTP 429.
        HTTPException: status 500 for any other non-200 answer or any
            unexpected error while calling the service.
    """
    try:
        messages = modelCall.messages
        model = modelCall.model

        # Collect the text fragments first and join once at the end.
        promptParts = []
        for msg in messages:
            content = msg.get("content", "")
            if isinstance(content, str):
                promptParts.append(content)
            elif isinstance(content, list):
                promptParts.extend(
                    part.get("text", "")
                    for part in content
                    if isinstance(part, dict) and part.get("type") == "text"
                )

        payload = {
            "modelName": model.name,
            "prompt": "\n".join(promptParts).strip(),
            "imageBase64": None
        }
        logger.debug(f"Calling Private-LLM text API with model {model.name}")
        response = await self.httpClient.post(
            model.apiUrl,
            json=payload
        )

        if response.status_code != 200:
            errorMessage = f"Private-LLM API error: {response.status_code} - {response.text}"
            if response.status_code == 429:
                logger.warning(errorMessage)
                raise RateLimitExceededException(errorMessage)
            logger.error(errorMessage)
            raise HTTPException(status_code=500, detail=errorMessage)

        responseJson = response.json()
        if not responseJson.get("success", False):
            errorMsg = responseJson.get("error", "Unknown error")
            logger.error(f"Private-LLM returned error: {errorMsg}")
            return AiModelResponse(
                content="",
                success=False,
                error=errorMsg
            )

        # Extract content from response. A missing or null "data" is treated
        # as an empty dict so the fallback below cannot fail on it.
        data = responseJson.get("data") or {}
        rawResponse = responseJson.get("rawResponse", "")
        # Prefer rawResponse for full content, fall back to data
        if rawResponse:
            content = rawResponse
        elif isinstance(data, dict):
            content = str(data.get("response", data))
        else:
            content = str(data)

        return AiModelResponse(
            content=content,
            success=True,
            modelId=model.name,
            metadata={"data": data}
        )
    except (HTTPException, RateLimitExceededException):
        # Deliberately raised above: must reach the caller unchanged. Without
        # this clause the rate-limit signal would be caught by the generic
        # handler below and turned into a plain HTTP 500.
        raise
    except Exception as e:
        logger.error(f"Error calling Private-LLM text API: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error calling Private-LLM API: {str(e)}") from e
async def callAiVision(self, modelCall: AiModelCall) -> AiModelResponse:
    """
    Call the Private-LLM API for vision-based analysis.

    The text of all messages is joined with newlines into one prompt. Image
    parts (``type == "image_url"``) supply the image: for ``data:`` URLs the
    part after the first comma is used, any other value is passed through
    as-is. Only one image is sent; if several are present the last one wins.

    Args:
        modelCall: AiModelCall with ``messages`` (containing image data) and ``model``.

    Returns:
        AiModelResponse with the analysis content on success. A response with
        ``success=False`` is returned when the service reports a failure or
        when an unexpected error occurs while calling it.

    Raises:
        RateLimitExceededException: the service answered HTTP 429.
        HTTPException: status 500 for any other non-200 answer.
    """
    try:
        messages = modelCall.messages
        model = modelCall.model

        # Extract prompt and image from messages
        promptParts = []
        imageBase64 = None
        for msg in messages:
            content = msg.get("content", "")
            if isinstance(content, str):
                promptParts.append(content)
                continue
            if not isinstance(content, list):
                continue
            for part in content:
                if not isinstance(part, dict):
                    continue
                partType = part.get("type")
                if partType == "text":
                    promptParts.append(part.get("text", ""))
                elif partType == "image_url":
                    imageUrl = part.get("image_url", {}).get("url", "")
                    if imageUrl.startswith("data:"):
                        # Format: data:image/png;base64,<base64data>
                        _, separator, encoded = imageUrl.partition(",")
                        if separator:
                            imageBase64 = encoded
                    else:
                        imageBase64 = imageUrl

        if not imageBase64:
            logger.warning("No image provided for vision model call")

        payload = {
            "modelName": model.name,
            "prompt": "\n".join(promptParts).strip(),
            "imageBase64": imageBase64
        }
        logger.debug(f"Calling Private-LLM vision API with model {model.name}")
        response = await self.httpClient.post(
            model.apiUrl,
            json=payload
        )

        if response.status_code != 200:
            errorMessage = f"Private-LLM API error: {response.status_code} - {response.text}"
            if response.status_code == 429:
                logger.warning(errorMessage)
                raise RateLimitExceededException(errorMessage)
            logger.error(errorMessage)
            raise HTTPException(status_code=500, detail=errorMessage)

        responseJson = response.json()
        if not responseJson.get("success", False):
            errorMsg = responseJson.get("error", "Unknown error")
            logger.error(f"Private-LLM returned error: {errorMsg}")
            return AiModelResponse(
                content="",
                success=False,
                error=errorMsg
            )

        # Extract content from response. A missing or null "data" is treated
        # as an empty dict so the fallback below cannot fail on it.
        data = responseJson.get("data") or {}
        rawResponse = responseJson.get("rawResponse", "")
        # Prefer rawResponse for full content
        if rawResponse:
            content = rawResponse
        elif isinstance(data, dict):
            content = str(data.get("response", data))
        else:
            content = str(data)

        return AiModelResponse(
            content=content,
            success=True,
            modelId=model.name,
            metadata={"data": data}
        )
    except (HTTPException, RateLimitExceededException):
        # Deliberately raised above: must reach the caller unchanged. Without
        # this clause the rate-limit signal would be swallowed by the generic
        # handler below and reported as an ordinary failed response.
        raise
    except Exception as e:
        logger.error(f"Error calling Private-LLM vision API: {str(e)}", exc_info=True)
        return AiModelResponse(
            content="",
            success=False,
            error=f"Error during vision analysis: {str(e)}"
        )