import logging

from typing import Dict, Any, List, Optional, Tuple, Union

from modules.datamodels.datamodelAi import ModelCapabilities, AiCallOptions

logger = logging.getLogger(__name__)

class SubUtilities:
    """Utility functions for text processing, debugging, and helper operations."""

    def __init__(self, services):
        """Initialize utilities service.

        Args:
            services: Service center instance for accessing other services
        """
        self.services = services

    def _writeTraceLog(self, contextText: str, data: Any) -> None:
        """Write raw data to the central trace log file without truncation."""
        try:
            import os
            import json
            from datetime import datetime, UTC

            # Only write when debug logging is enabled for this logger
            if not logger.isEnabledFor(logging.DEBUG):
                return

            # Get the log directory from configuration via the service center if possible
            logDir = None
            try:
                logDir = self.services.utils.configGet("APP_LOGGING_LOG_DIR", "./")
            except Exception:
                pass
            if not logDir:
                logDir = "./"
            if not os.path.isabs(logDir):
                # Make it relative to the gateway directory
                gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
                logDir = os.path.join(gatewayDir, logDir)
            os.makedirs(logDir, exist_ok=True)

            traceFile = os.path.join(logDir, "log_trace.log")
            timestamp = datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]

            traceEntry = f"[{timestamp}] {contextText}\n" + ("=" * 80) + "\n"
            if data is None:
                traceEntry += "No data provided\n"
            else:
                # Prefer exact text; pretty-print dicts and lists as JSON
                try:
                    if isinstance(data, (dict, list)):
                        traceEntry += f"JSON Data:\n{json.dumps(data, indent=2, ensure_ascii=False)}\n"
                    else:
                        text = str(data)
                        traceEntry += f"Text Data:\n{text}\n"
                except Exception:
                    traceEntry += f"Data (fallback): {str(data)}\n"
            traceEntry += ("=" * 80) + "\n\n"

            with open(traceFile, "a", encoding="utf-8") as f:
                f.write(traceEntry)
        except Exception:
            # Swallow to avoid recursive logging issues
            pass
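
    # Illustrative example of a trace entry written by _writeTraceLog (timestamp and
    # context text are hypothetical; the layout follows the code above):
    #
    #   [2024-01-01 12:00:00.000] planning response
    #   ================================================================================
    #   JSON Data:
    #   {
    #     "step": 1
    #   }
    #   ================================================================================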

    def _writeAiResponseDebug(self, label: str, content: str, partIndex: int = 1, modelName: Optional[str] = None, continuation: Optional[bool] = None) -> None:
        """Persist raw AI response parts for debugging under test-chat/ai (only if debug is enabled)."""
        try:
            # Check if debug logging is enabled
            debug_enabled = self.services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False)
            if not debug_enabled:
                return

            import os
            from datetime import datetime, UTC

            # Base dir: gateway/test-chat/ai (go up 4 levels from this file)
            # .../gateway/modules/services/serviceAi/subUtilities.py -> up to gateway root
            gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
            outDir = os.path.join(gatewayDir, 'test-chat', 'ai')
            os.makedirs(outDir, exist_ok=True)

            ts = datetime.now(UTC).strftime('%Y%m%d-%H%M%S-%f')[:-3]
            suffix = []
            if partIndex is not None:
                suffix.append(f"part{partIndex}")
            if continuation is not None:
                suffix.append(f"cont_{str(continuation).lower()}")
            if modelName:
                safeModel = ''.join(c if c.isalnum() or c in ('-', '_') else '-' for c in modelName)
                suffix.append(safeModel)
            suffixStr = ('_' + '_'.join(suffix)) if suffix else ''

            fname = f"{ts}_{label}{suffixStr}.txt"
            fpath = os.path.join(outDir, fname)
            with open(fpath, 'w', encoding='utf-8') as f:
                f.write(content or '')
        except Exception:
            # Do not raise; best-effort debug write
            pass
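
    # Illustrative example of a debug file name produced above (label, part index,
    # continuation flag and model name are hypothetical):
    #   20240101-120000-000_response_part1_cont_false_gpt-4o.txt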

    def _exceedsTokenLimit(self, text: str, model: ModelCapabilities, safety_margin: float) -> bool:
        """Check if text exceeds model token limit with safety margin."""
        # Simple character-based estimation (~4 characters per token)
        estimated_tokens = len(text) // 4
        max_tokens = int(model.maxTokens * (1 - safety_margin))
        return estimated_tokens > max_tokens
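
    # Worked example (illustrative; assumes a hypothetical model with maxTokens=4000
    # and safety_margin=0.15):
    #   max_tokens       = int(4000 * (1 - 0.15)) = 3400
    #   estimated_tokens = len(text) // 4
    #   => any text longer than roughly 13,600 characters exceeds the limit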

    def _reduceText(self, text: str, reduction_factor: float) -> str:
        """Reduce text size by the specified factor."""
        if reduction_factor >= 1.0:
            return text

        target_length = int(len(text) * reduction_factor)
        return text[:target_length] + "... [reduced]"
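
    # Usage sketch: _reduceText("abcdefghij", 0.5) returns "abcde... [reduced]",
    # while any reduction_factor >= 1.0 returns the text unchanged.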

    def _extractTextFromContentParts(self, extracted_content) -> str:
        """Extract text content from ExtractionService ContentPart objects."""
        if not extracted_content or not hasattr(extracted_content, 'parts'):
            return ""

        text_parts = []
        for part in extracted_content.parts:
            if hasattr(part, 'typeGroup') and part.typeGroup in ['text', 'table', 'structure']:
                if hasattr(part, 'data') and part.data:
                    text_parts.append(part.data)

        return "\n\n".join(text_parts)
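
    # Usage sketch (ContentPart objects are approximated with SimpleNamespace here;
    # the real parts come from the ExtractionService):
    #   >>> from types import SimpleNamespace as NS
    #   >>> extracted = NS(parts=[
    #   ...     NS(typeGroup='text', data='Intro paragraph'),
    #   ...     NS(typeGroup='image', data=b'...'),
    #   ...     NS(typeGroup='table', data='col A | col B'),
    #   ... ])
    #   >>> utils._extractTextFromContentParts(extracted)
    #   'Intro paragraph\n\ncol A | col B'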

    def _buildPromptWithPlaceholders(self, prompt: str, placeholders: Optional[Dict[str, str]]) -> str:
        """Build the full prompt by replacing placeholders with their content.

        Supports both the legacy {{placeholder}} format and the new {{KEY:placeholder}} format.
        """
        if not placeholders:
            return prompt

        full_prompt = prompt
        for placeholder, content in placeholders.items():
            # Replace both the old format {{placeholder}} and the new format {{KEY:placeholder}}
            full_prompt = full_prompt.replace(f"{{{{{placeholder}}}}}", content)
            full_prompt = full_prompt.replace(f"{{{{KEY:{placeholder}}}}}", content)

        return full_prompt
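
    # Usage sketch (hypothetical placeholder values):
    #   >>> utils._buildPromptWithPlaceholders(
    #   ...     "Summarize {{KEY:document}} for {{audience}}.",
    #   ...     {"document": "the Q3 report", "audience": "executives"})
    #   'Summarize the Q3 report for executives.'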

    def _reducePlanningPrompt(
        self,
        full_prompt: str,
        placeholders: Optional[Dict[str, str]],
        model: ModelCapabilities,
        options: AiCallOptions
    ) -> str:
        """Reduce planning prompt size by summarizing placeholders while preserving prompt structure."""
        if not placeholders:
            return self._reduceText(full_prompt, 0.7)

        # Reduce placeholders while preserving the prompt template
        reduced_placeholders = {}
        for placeholder, content in placeholders.items():
            if len(content) > 1000:  # Only reduce long content
                reduction_factor = 0.7
                reduced_content = self._reduceText(content, reduction_factor)
                reduced_placeholders[placeholder] = reduced_content
            else:
                reduced_placeholders[placeholder] = content

        return self._buildPromptWithPlaceholders(full_prompt, reduced_placeholders)
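
    # Example of the reduction rule above: a placeholder holding 2,000 characters is
    # cut to its first 1,400 characters plus "... [reduced]" (factor 0.7); placeholders
    # of 1,000 characters or fewer pass through unchanged.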

    def _reduceTextPrompt(
        self,
        prompt: str,
        context: str,
        model: ModelCapabilities,
        options: AiCallOptions
    ) -> str:
        """Reduce prompt and context size so the combined text fits the model's budget."""
        # Convert the token budget to an approximate character budget (~4 characters
        # per token), matching the estimation used in _exceedsTokenLimit
        max_size = int(model.maxTokens * (1 - options.safetyMargin)) * 4

        if options.compressPrompt:
            # Reduce both prompt and context
            target_size = max_size
            current_size = len(prompt) + len(context)
            reduction_factor = (target_size * 0.7) / current_size

            if reduction_factor < 1.0:
                prompt = self._reduceText(prompt, reduction_factor)
                context = self._reduceText(context, reduction_factor)
        else:
            # Only reduce the context, preserving prompt integrity
            max_context_size = max_size - len(prompt)
            if len(context) > max_context_size:
                reduction_factor = max_context_size / len(context)
                context = self._reduceText(context, reduction_factor)

        return (prompt + "\n\n" + context) if context else prompt
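
    # Worked example (illustrative; assumes maxTokens=4000, safetyMargin=0.15 and
    # compressPrompt=True, using the ~4 chars/token approximation above):
    #   max_size         = int(4000 * 0.85) * 4 = 13,600 characters
    #   current_size     = 40,000 characters (prompt + context)
    #   reduction_factor = 13,600 * 0.7 / 40,000 = 0.238
    #   => prompt and context are each cut to ~23.8% of their length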

    async def _compressContent(self, content: str, targetSize: int, contentType: str) -> str:
        """Compress content to the target size (in bytes)."""
        if len(content.encode("utf-8")) <= targetSize:
            return content

        try:
            # AI-based compression prompt (currently unused; see the note below)
            compressionPrompt = f"""
            Compress the following {contentType} to at most {targetSize} characters,
            but keep all important information:

            {content}

            Return only the compressed content, without additional explanations.
            """

            # Service must not call connectors directly; use a simple truncation fallback here
            data = content.encode("utf-8")
            return data[:targetSize].decode("utf-8", errors="ignore") + "... [truncated]"
        except Exception as e:
            logger.warning(f"AI compression failed, using truncation: {str(e)}")
            return content[:targetSize] + "... [truncated]"

    def _getModelCapabilitiesForContent(self, prompt: str, documents: Optional[List], options: AiCallOptions) -> Dict[str, int]:
        """Get model capabilities for content processing, including appropriate size limits for chunking."""
        # Estimate the total content size
        prompt_size = len(prompt.encode('utf-8'))
        document_size = 0
        if documents:
            # Rough estimate of document content size
            for doc in documents:
                document_size += getattr(doc, 'fileSize', 0) or 0

        total_size = prompt_size + document_size

        # Use AiObjects to select the best model for this content size;
        # the selection is simulated here by checking the available models
        from modules.interfaces.interfaceAiObjects import aiModels
        from modules.datamodels.datamodelAi import OperationType

        # Find the best model for this content size and operation
        best_model = None
        best_context_length = 0

        for model_name, model_info in aiModels.items():
            context_length = model_info.get("contextLength", 0)

            # Skip models without a declared context length
            if context_length == 0:
                continue

            # Check whether the model supports the requested operation type
            capabilities = model_info.get("capabilities", [])
            if options.operationType == OperationType.IMAGE_ANALYSIS and "image_analysis" not in capabilities:
                continue
            elif options.operationType == OperationType.IMAGE_GENERATION and "image_generation" not in capabilities:
                continue
            elif options.operationType == OperationType.WEB_RESEARCH and "web_search" not in capabilities:
                continue
            elif "text_generation" not in capabilities:
                continue

            # Prefer models that can handle the content without chunking, but allow chunking if needed
            if context_length >= total_size * 0.8:  # rough heuristic: context covers ~80% of the content size
                if context_length > best_context_length:
                    best_model = model_info
                    best_context_length = context_length
            elif best_model is None:  # Fallback to the largest available model
                if context_length > best_context_length:
                    best_model = model_info
                    best_context_length = context_length

        # Fall back to a reasonable default if no model was found
        if best_model is None:
            best_model = {
                "contextLength": 128000,  # GPT-4o default
                "llmName": "gpt-4o"
            }

        # Calculate appropriate sizes
        # Convert tokens to bytes (rough estimate: 1 token ≈ 4 characters)
        context_length_bytes = int(best_model["contextLength"] * 4)
        max_context_bytes = int(context_length_bytes * 0.9)  # 90% of the context length
        text_chunk_size = int(max_context_bytes * 0.7)  # 70% of the max context for text chunks
        image_chunk_size = int(max_context_bytes * 0.8)  # 80% of the max context for image chunks

        logger.debug(f"Selected model: {best_model.get('llmName', 'unknown')} with context length: {best_model['contextLength']}")
        logger.debug(f"Content size: {total_size} bytes, Max context: {max_context_bytes} bytes")
        logger.debug(f"Text chunk size: {text_chunk_size} bytes, Image chunk size: {image_chunk_size} bytes")

        return {
            "maxContextBytes": max_context_bytes,
            "textChunkSize": text_chunk_size,
            "imageChunkSize": image_chunk_size
        }
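
    # Worked example (illustrative; assumes a selected model with contextLength=128000 tokens):
    #   context_length_bytes = 128000 * 4      = 512,000
    #   maxContextBytes      = 512,000 * 0.9   = 460,800
    #   textChunkSize        = 460,800 * 0.7   = 322,560
    #   imageChunkSize       = 460,800 * 0.8   = 368,640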

    def _getModelsForOperation(self, operation_type: str, options: AiCallOptions) -> List[ModelCapabilities]:
        """Get models capable of handling the specific operation with capability filtering."""
        # Use the actual AiObjects model selection instead of a hardcoded default
        if hasattr(self, 'aiObjects') and self.aiObjects:
            # Let AiObjects handle the model selection
            return []
        else:
            # Fall back to a default model if AiObjects is not available
            default_model = ModelCapabilities(
                name="default",
                maxTokens=4000,
                capabilities=["text", "reasoning"] if operation_type == "planning" else ["text"],
                costPerToken=0.001,
                processingTime=1.0,
                isAvailable=True
            )
            return [default_model]
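
# Minimal usage sketch (the stub service center below is hypothetical; in the gateway,
# the real service center instance is injected by the caller):
#
#     class _StubUtils:
#         def configGet(self, key, default=None):
#             return default
#
#     class _StubServices:
#         utils = _StubUtils()
#
#     utilities = SubUtilities(_StubServices())
#     print(utilities._buildPromptWithPlaceholders("Hello {{KEY:name}}", {"name": "World"}))
#     # -> "Hello World"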