# gateway/modules/services/serviceAi/subUtilities.py
import logging
from typing import Dict, Any, List, Optional, Tuple, Union
from modules.datamodels.datamodelAi import ModelCapabilities, AiCallOptions
logger = logging.getLogger(__name__)
class SubUtilities:
    """Utility functions for text processing, debugging, and helper operations.

    Stateless helpers for the AI service layer: trace/debug file writers,
    token-budget estimation, prompt/placeholder reduction, and model-capability
    lookups. All cross-service access goes through the injected service center.
    """

    def __init__(self, services):
        """Initialize utilities service.

        Args:
            services: Service center instance for accessing other services
                (used for configuration lookups, e.g. ``services.utils.configGet``).
        """
        self.services = services
    def _writeTraceLog(self, contextText: str, data: Any) -> None:
        """Append raw, untruncated data to the central trace log file.

        Best-effort: every failure is swallowed so tracing can never break the
        caller. Writes only when the module logger is configured at DEBUG or
        below.

        Args:
            contextText: Short label describing where the data came from.
            data: Payload to record; dicts/lists are pretty-printed as JSON,
                everything else is written via ``str()``.
        """
        try:
            import os
            import json
            from datetime import datetime, UTC
            # Only write if logger is in debug mode.
            # NOTE(review): this reads logger.level, not getEffectiveLevel();
            # a logger left at NOTSET (0) passes this check — confirm intended.
            if logger.level > logging.DEBUG:
                return
            # Get log directory from configuration via service center if possible;
            # fall back to "./" on any config error.
            logDir = None
            try:
                logDir = self.services.utils.configGet("APP_LOGGING_LOG_DIR", "./")
            except Exception:
                pass
            if not logDir:
                logDir = "./"
            if not os.path.isabs(logDir):
                # Make it relative to the gateway directory (four levels up from
                # .../modules/services/serviceAi/subUtilities.py).
                gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
                logDir = os.path.join(gatewayDir, logDir)
            os.makedirs(logDir, exist_ok=True)
            traceFile = os.path.join(logDir, "log_trace.log")
            # Millisecond-precision UTC timestamp ([:-3] trims microseconds to ms).
            timestamp = datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
            traceEntry = f"[{timestamp}] {contextText}\n" + ("=" * 80) + "\n"
            if data is None:
                traceEntry += "No data provided\n"
            else:
                # Prefer exact text; if dict/list, pretty print JSON.
                try:
                    if isinstance(data, (dict, list)):
                        traceEntry += f"JSON Data:\n{json.dumps(data, indent=2, ensure_ascii=False)}\n"
                    else:
                        text = str(data)
                        traceEntry += f"Text Data:\n{text}\n"
                except Exception:
                    # str()/dumps failed (e.g. non-serializable) — last-ditch repr.
                    traceEntry += f"Data (fallback): {str(data)}\n"
            traceEntry += ("=" * 80) + "\n\n"
            # Append so concurrent/sequential calls accumulate in one file.
            with open(traceFile, "a", encoding="utf-8") as f:
                f.write(traceEntry)
        except Exception:
            # Swallow to avoid recursive logging issues
            pass
def _writeAiResponseDebug(self, label: str, content: str, partIndex: int = 1, modelName: str = None, continuation: bool = None) -> None:
"""Persist raw AI response parts for debugging under test-chat/ai - only if debug enabled."""
try:
# Check if debug logging is enabled
debug_enabled = self.services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False)
if not debug_enabled:
return
import os
from datetime import datetime, UTC
# Base dir: gateway/test-chat/ai (go up 4 levels from this file)
# .../gateway/modules/services/serviceAi/subUtilities.py -> up to gateway root
gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
outDir = os.path.join(gatewayDir, 'test-chat', 'ai')
os.makedirs(outDir, exist_ok=True)
ts = datetime.now(UTC).strftime('%Y%m%d-%H%M%S-%f')[:-3]
suffix = []
if partIndex is not None:
suffix.append(f"part{partIndex}")
if continuation is not None:
suffix.append(f"cont_{str(continuation).lower()}")
if modelName:
safeModel = ''.join(c if c.isalnum() or c in ('-', '_') else '-' for c in modelName)
suffix.append(safeModel)
suffixStr = ('_' + '_'.join(suffix)) if suffix else ''
fname = f"{ts}_{label}{suffixStr}.txt"
fpath = os.path.join(outDir, fname)
with open(fpath, 'w', encoding='utf-8') as f:
f.write(content or '')
except Exception:
# Do not raise; best-effort debug write
pass
def _exceedsTokenLimit(self, text: str, model: ModelCapabilities, safety_margin: float) -> bool:
"""
Check if text exceeds model token limit with safety margin.
"""
# Simple character-based estimation (4 chars per token)
estimated_tokens = len(text) // 4
max_tokens = int(model.maxTokens * (1 - safety_margin))
return estimated_tokens > max_tokens
def _reduceText(self, text: str, reduction_factor: float) -> str:
"""
Reduce text size by the specified factor.
"""
if reduction_factor >= 1.0:
return text
target_length = int(len(text) * reduction_factor)
return text[:target_length] + "... [reduced]"
def _extractTextFromContentParts(self, extracted_content) -> str:
"""
Extract text content from ExtractionService ContentPart objects.
"""
if not extracted_content or not hasattr(extracted_content, 'parts'):
return ""
text_parts = []
for part in extracted_content.parts:
if hasattr(part, 'typeGroup') and part.typeGroup in ['text', 'table', 'structure']:
if hasattr(part, 'data') and part.data:
text_parts.append(part.data)
return "\n\n".join(text_parts)
def _buildPromptWithPlaceholders(self, prompt: str, placeholders: Optional[Dict[str, str]]) -> str:
"""
Build full prompt by replacing placeholders with their content.
Uses the new {{KEY:placeholder}} format.
"""
if not placeholders:
return prompt
full_prompt = prompt
for placeholder, content in placeholders.items():
# Replace both old format {{placeholder}} and new format {{KEY:placeholder}}
full_prompt = full_prompt.replace(f"{{{{{placeholder}}}}}", content)
full_prompt = full_prompt.replace(f"{{{{KEY:{placeholder}}}}}", content)
return full_prompt
def _reducePlanningPrompt(
self,
full_prompt: str,
placeholders: Optional[Dict[str, str]],
model: ModelCapabilities,
options: AiCallOptions
) -> str:
"""
Reduce planning prompt size by summarizing placeholders while preserving prompt structure.
"""
if not placeholders:
return self._reduceText(full_prompt, 0.7)
# Reduce placeholders while preserving prompt
reduced_placeholders = {}
for placeholder, content in placeholders.items():
if len(content) > 1000: # Only reduce long content
reduction_factor = 0.7
reduced_content = self._reduceText(content, reduction_factor)
reduced_placeholders[placeholder] = reduced_content
else:
reduced_placeholders[placeholder] = content
return self._buildPromptWithPlaceholders(full_prompt, reduced_placeholders)
def _reduceTextPrompt(
self,
prompt: str,
context: str,
model: ModelCapabilities,
options: AiCallOptions
) -> str:
"""
Reduce text prompt size using typeGroup-aware chunking and merging.
"""
max_size = int(model.maxTokens * (1 - options.safetyMargin))
if options.compressPrompt:
# Reduce both prompt and context
target_size = max_size
current_size = len(prompt) + len(context)
reduction_factor = (target_size * 0.7) / current_size
if reduction_factor < 1.0:
prompt = self._reduceText(prompt, reduction_factor)
context = self._reduceText(context, reduction_factor)
else:
# Only reduce context, preserve prompt integrity
max_context_size = max_size - len(prompt)
if len(context) > max_context_size:
reduction_factor = max_context_size / len(context)
context = self._reduceText(context, reduction_factor)
return prompt + "\n\n" + context if context else prompt
async def _compressContent(self, content: str, targetSize: int, contentType: str) -> str:
"""Compress content to target size."""
if len(content.encode("utf-8")) <= targetSize:
return content
try:
compressionPrompt = f"""
Komprimiere den folgenden {contentType} auf maximal {targetSize} Zeichen,
behalte aber alle wichtigen Informationen bei:
{content}
Gib nur den komprimierten Inhalt zurück, ohne zusätzliche Erklärungen.
"""
# Service must not call connectors directly; use simple truncation fallback here
data = content.encode("utf-8")
return data[:targetSize].decode("utf-8", errors="ignore") + "... [truncated]"
except Exception as e:
logger.warning(f"AI compression failed, using truncation: {str(e)}")
return content[:targetSize] + "... [truncated]"
    def _getModelCapabilitiesForContent(self, prompt: str, documents: Optional[List], options: AiCallOptions) -> Dict[str, int]:
        """Pick size limits for chunking based on the best available model.

        Estimates the total content size (prompt bytes + declared document
        sizes), scans the registered ``aiModels`` for the largest model whose
        capabilities match ``options.operationType``, then derives chunking
        limits from that model's context length.

        Returns:
            Dict with ``maxContextBytes``, ``textChunkSize`` and
            ``imageChunkSize`` (all int byte counts).
        """
        # Estimate total content size in bytes.
        prompt_size = len(prompt.encode('utf-8'))
        document_size = 0
        if documents:
            # Rough estimate: trust each document's declared fileSize (0 if absent).
            for doc in documents:
                document_size += getattr(doc, 'fileSize', 0) or 0
        total_size = prompt_size + document_size
        # Use AiObjects to select the best model for this content size.
        # We simulate the model selection by scanning the available models.
        from modules.interfaces.interfaceAiObjects import aiModels
        # Find the best model for this content size and operation.
        best_model = None
        best_context_length = 0
        for model_name, model_info in aiModels.items():
            context_length = model_info.get("contextLength", 0)
            # Skip models with no declared context length.
            if context_length == 0:
                continue
            # Filter by required capability for the requested operation type.
            capabilities = model_info.get("capabilities", [])
            from modules.datamodels.datamodelAi import OperationType
            # NOTE(review): the final elif also rejects image/search-capable
            # models that lack "text_generation" — confirm that is intended
            # for non-text operations.
            if options.operationType == OperationType.IMAGE_ANALYSIS and "image_analysis" not in capabilities:
                continue
            elif options.operationType == OperationType.IMAGE_GENERATION and "image_generation" not in capabilities:
                continue
            elif options.operationType == OperationType.WEB_RESEARCH and "web_search" not in capabilities:
                continue
            elif "text_generation" not in capabilities:
                continue
            # Prefer models that can handle the content without chunking,
            # but allow chunking if needed.
            # NOTE(review): context_length is in tokens while total_size is in
            # bytes — this comparison mixes units (≈4 bytes/token); verify.
            if context_length >= total_size * 0.8:  # 80% of content size
                if context_length > best_context_length:
                    best_model = model_info
                    best_context_length = context_length
            elif best_model is None:  # Fallback to largest available model
                if context_length > best_context_length:
                    best_model = model_info
                    best_context_length = context_length
        # Fallback to a reasonable default if no model matched.
        if best_model is None:
            best_model = {
                "contextLength": 128000,  # GPT-4o default
                "llmName": "gpt-4o"
            }
        # Calculate appropriate sizes.
        # Convert tokens to bytes (rough estimate: 1 token ≈ 4 characters).
        context_length_bytes = int(best_model["contextLength"] * 4)
        max_context_bytes = int(context_length_bytes * 0.9)  # 90% of context length
        text_chunk_size = int(max_context_bytes * 0.7)  # 70% of max context for text chunks
        image_chunk_size = int(max_context_bytes * 0.8)  # 80% of max context for image chunks
        logger.debug(f"Selected model: {best_model.get('llmName', 'unknown')} with context length: {best_model['contextLength']}")
        logger.debug(f"Content size: {total_size} bytes, Max context: {max_context_bytes} bytes")
        logger.debug(f"Text chunk size: {text_chunk_size} bytes, Image chunk size: {image_chunk_size} bytes")
        return {
            "maxContextBytes": max_context_bytes,
            "textChunkSize": text_chunk_size,
            "imageChunkSize": image_chunk_size
        }
def _getModelsForOperation(self, operation_type: str, options: AiCallOptions) -> List[ModelCapabilities]:
"""
Get models capable of handling the specific operation with capability filtering.
"""
# Use the actual AI objects model selection instead of hardcoded default
if hasattr(self, 'aiObjects') and self.aiObjects:
# Let AiObjects handle the model selection
return []
else:
# Fallback to default model if AiObjects not available
default_model = ModelCapabilities(
name="default",
maxTokens=4000,
capabilities=["text", "reasoning"] if operation_type == "planning" else ["text"],
costPerToken=0.001,
processingTime=1.0,
isAvailable=True
)
return [default_model]