import logging
from typing import Dict, Any, List, Optional

from modules.datamodels.datamodelAi import ModelCapabilities, AiCallOptions

logger = logging.getLogger(__name__)


class SubUtilities:
    """Utility functions for text processing, debugging, and helper operations."""

    def __init__(self, services):
        """Initialize utilities service.

        Args:
            services: Service center instance for accessing other services
        """
        self.services = services

    def _writeTraceLog(self, contextText: str, data: Any) -> None:
        """Write raw data to the central trace log file without truncation."""
        try:
            import os
            import json
            from datetime import datetime, UTC

            # Only write if the logger is in debug mode
            if logger.level > logging.DEBUG:
                return

            # Get the log directory from configuration via the service center if possible
            logDir = None
            try:
                logDir = self.services.utils.configGet("APP_LOGGING_LOG_DIR", "./")
            except Exception:
                pass
            if not logDir:
                logDir = "./"
            if not os.path.isabs(logDir):
                # Make it relative to the gateway directory
                gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
                logDir = os.path.join(gatewayDir, logDir)
            os.makedirs(logDir, exist_ok=True)

            traceFile = os.path.join(logDir, "log_trace.log")
            timestamp = datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]

            traceEntry = f"[{timestamp}] {contextText}\n" + ("=" * 80) + "\n"
            if data is None:
                traceEntry += "No data provided\n"
            else:
                # Prefer exact text; if dict/list, pretty-print JSON
                try:
                    if isinstance(data, (dict, list)):
                        traceEntry += f"JSON Data:\n{json.dumps(data, indent=2, ensure_ascii=False)}\n"
                    else:
                        text = str(data)
                        traceEntry += f"Text Data:\n{text}\n"
                except Exception:
                    traceEntry += f"Data (fallback): {str(data)}\n"
            traceEntry += ("=" * 80) + "\n\n"

            with open(traceFile, "a", encoding="utf-8") as f:
                f.write(traceEntry)
        except Exception:
            # Swallow to avoid recursive logging issues
            pass

    def _writeAiResponseDebug(self, label: str, content: str, partIndex: int = 1, modelName: Optional[str] = None, continuation: Optional[bool] = None) -> None:
        """Persist raw AI response parts for debugging under the configured log directory - only if debug is enabled."""
        try:
            # Check if debug logging is enabled
            debug_enabled = self.services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False)
            if not debug_enabled:
                return

            import os
            from datetime import datetime, UTC

            # Use the configured log directory instead of a hardcoded test-chat path
            from modules.shared.configuration import APP_CONFIG
            logDir = APP_CONFIG.get("APP_LOGGING_LOG_DIR", "./")
            if not os.path.isabs(logDir):
                gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
                logDir = os.path.join(gatewayDir, logDir)
            outDir = os.path.join(logDir, 'debug')
            os.makedirs(outDir, exist_ok=True)

            ts = datetime.now(UTC).strftime('%Y%m%d-%H%M%S-%f')[:-3]
            suffix = []
            if partIndex is not None:
                suffix.append(f"part{partIndex}")
            if continuation is not None:
                suffix.append(f"cont_{str(continuation).lower()}")
            if modelName:
                safeModel = ''.join(c if c.isalnum() or c in ('-', '_') else '-' for c in modelName)
                suffix.append(safeModel)
            suffixStr = ('_' + '_'.join(suffix)) if suffix else ''
            fname = f"{ts}_{label}{suffixStr}.txt"
            fpath = os.path.join(outDir, fname)

            with open(fpath, 'w', encoding='utf-8') as f:
                f.write(content or '')
        except Exception:
            # Do not raise; best-effort debug write
            pass

    def _exceedsTokenLimit(self, text: str, model: ModelCapabilities, safety_margin: float) -> bool:
        """Check if text exceeds the model token limit with a safety margin.
""" # Simple character-based estimation (4 chars per token) estimated_tokens = len(text) // 4 max_tokens = int(model.maxTokens * (1 - safety_margin)) return estimated_tokens > max_tokens def _reduceText(self, text: str, reduction_factor: float) -> str: """ Reduce text size by the specified factor. """ if reduction_factor >= 1.0: return text target_length = int(len(text) * reduction_factor) return text[:target_length] + "... [reduced]" def _extractTextFromContentParts(self, extracted_content) -> str: """ Extract text content from ExtractionService ContentPart objects. """ if not extracted_content or not hasattr(extracted_content, 'parts'): return "" text_parts = [] for part in extracted_content.parts: if hasattr(part, 'typeGroup') and part.typeGroup in ['text', 'table', 'structure']: if hasattr(part, 'data') and part.data: text_parts.append(part.data) return "\n\n".join(text_parts) def _buildPromptWithPlaceholders(self, prompt: str, placeholders: Optional[Dict[str, str]]) -> str: """ Build full prompt by replacing placeholders with their content. Uses the new {{KEY:placeholder}} format. """ if not placeholders: return prompt full_prompt = prompt for placeholder, content in placeholders.items(): # Replace both old format {{placeholder}} and new format {{KEY:placeholder}} full_prompt = full_prompt.replace(f"{{{{{placeholder}}}}}", content) full_prompt = full_prompt.replace(f"{{{{KEY:{placeholder}}}}}", content) return full_prompt def _reducePlanningPrompt( self, full_prompt: str, placeholders: Optional[Dict[str, str]], model: ModelCapabilities, options: AiCallOptions ) -> str: """ Reduce planning prompt size by summarizing placeholders while preserving prompt structure. """ if not placeholders: return self._reduceText(full_prompt, 0.7) # Reduce placeholders while preserving prompt reduced_placeholders = {} for placeholder, content in placeholders.items(): if len(content) > 1000: # Only reduce long content reduction_factor = 0.7 reduced_content = self._reduceText(content, reduction_factor) reduced_placeholders[placeholder] = reduced_content else: reduced_placeholders[placeholder] = content return self._buildPromptWithPlaceholders(full_prompt, reduced_placeholders) def _reduceTextPrompt( self, prompt: str, context: str, model: ModelCapabilities, options: AiCallOptions ) -> str: """ Reduce text prompt size using typeGroup-aware chunking and merging. """ max_size = int(model.maxTokens * (1 - options.safetyMargin)) if options.compressPrompt: # Reduce both prompt and context target_size = max_size current_size = len(prompt) + len(context) reduction_factor = (target_size * 0.7) / current_size if reduction_factor < 1.0: prompt = self._reduceText(prompt, reduction_factor) context = self._reduceText(context, reduction_factor) else: # Only reduce context, preserve prompt integrity max_context_size = max_size - len(prompt) if len(context) > max_context_size: reduction_factor = max_context_size / len(context) context = self._reduceText(context, reduction_factor) return prompt + "\n\n" + context if context else prompt async def _compressContent(self, content: str, targetSize: int, contentType: str) -> str: """Compress content to target size.""" if len(content.encode("utf-8")) <= targetSize: return content try: compressionPrompt = f""" Komprimiere den folgenden {contentType} auf maximal {targetSize} Zeichen, behalte aber alle wichtigen Informationen bei: {content} Gib nur den komprimierten Inhalt zurück, ohne zusätzliche Erklärungen. 
""" # Service must not call connectors directly; use simple truncation fallback here data = content.encode("utf-8") return data[:targetSize].decode("utf-8", errors="ignore") + "... [truncated]" except Exception as e: logger.warning(f"AI compression failed, using truncation: {str(e)}") return content[:targetSize] + "... [truncated]" def _getModelCapabilitiesForContent(self, prompt: str, documents: Optional[List], options: AiCallOptions) -> Dict[str, int]: """ Get model capabilities for content processing, including appropriate size limits for chunking. """ # Estimate total content size prompt_size = len(prompt.encode('utf-8')) document_size = 0 if documents: # Rough estimate of document content size for doc in documents: document_size += getattr(doc, 'fileSize', 0) or 0 total_size = prompt_size + document_size # Use AiObjects to select the best model for this content size # We'll simulate the model selection by checking available models from modules.interfaces.interfaceAiObjects import aiModels # Find the best model for this content size and operation best_model = None best_context_length = 0 for model_name, model_info in aiModels.items(): context_length = model_info.get("contextLength", 0) # Skip models with no context length or too small for content if context_length == 0: continue # Check if model supports the operation type capabilities = model_info.get("capabilities", []) from modules.datamodels.datamodelAi import OperationType if options.operationType == OperationType.IMAGE_ANALYSIS and "image_analysis" not in capabilities: continue elif options.operationType == OperationType.IMAGE_GENERATION and "image_generation" not in capabilities: continue elif options.operationType == OperationType.WEB_RESEARCH and "web_search" not in capabilities: continue elif "text_generation" not in capabilities: continue # Prefer models that can handle the content without chunking, but allow chunking if needed if context_length >= total_size * 0.8: # 80% of content size if context_length > best_context_length: best_model = model_info best_context_length = context_length elif best_model is None: # Fallback to largest available model if context_length > best_context_length: best_model = model_info best_context_length = context_length # Fallback to a reasonable default if no model found if best_model is None: best_model = { "contextLength": 128000, # GPT-4o default "llmName": "gpt-4o" } # Calculate appropriate sizes # Convert tokens to bytes (rough estimate: 1 token ≈ 4 characters) context_length_bytes = int(best_model["contextLength"] * 4) max_context_bytes = int(context_length_bytes * 0.9) # 90% of context length text_chunk_size = int(max_context_bytes * 0.7) # 70% of max context for text chunks image_chunk_size = int(max_context_bytes * 0.8) # 80% of max context for image chunks logger.debug(f"Selected model: {best_model.get('llmName', 'unknown')} with context length: {best_model['contextLength']}") logger.debug(f"Content size: {total_size} bytes, Max context: {max_context_bytes} bytes") logger.debug(f"Text chunk size: {text_chunk_size} bytes, Image chunk size: {image_chunk_size} bytes") return { "maxContextBytes": max_context_bytes, "textChunkSize": text_chunk_size, "imageChunkSize": image_chunk_size } def _getModelsForOperation(self, operation_type: str, options: AiCallOptions) -> List[ModelCapabilities]: """ Get models capable of handling the specific operation with capability filtering. 
""" # Use the actual AI objects model selection instead of hardcoded default if hasattr(self, 'aiObjects') and self.aiObjects: # Let AiObjects handle the model selection return [] else: # Fallback to default model if AiObjects not available default_model = ModelCapabilities( name="default", maxTokens=4000, capabilities=["text", "reasoning"] if operation_type == "planning" else ["text"], costPerToken=0.001, processingTime=1.0, isAvailable=True ) return [default_model]