import logging from typing import Dict, Any, List, Optional, Tuple, Union from modules.datamodels.datamodelChat import PromptPlaceholder, ChatDocument from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, ModelCapabilities, OperationType, Priority from modules.interfaces.interfaceAiObjects import AiObjects logger = logging.getLogger(__name__) class SubCoreAi: """Core AI operations including image analysis, text generation, and planning calls.""" def __init__(self, services, aiObjects): """Initialize core AI operations. Args: services: Service center instance for accessing other services aiObjects: Initialized AiObjects instance """ self.services = services self.aiObjects = aiObjects # AI Processing Call async def callAi( self, prompt: str, documents: Optional[List[ChatDocument]] = None, placeholders: Optional[List[PromptPlaceholder]] = None, options: Optional[AiCallOptions] = None, outputFormat: Optional[str] = None, title: Optional[str] = None, documentProcessor=None, documentGenerator=None ) -> Union[str, Dict[str, Any]]: """ Unified AI call interface that automatically routes to appropriate handler. Args: prompt: The main prompt for the AI call documents: Optional list of documents to process placeholders: Optional list of placeholder replacements for planning calls options: AI call configuration options outputFormat: Optional output format (html, pdf, docx, txt, md, json, csv, xlsx) for document generation title: Optional title for generated documents documentProcessor: Document processing service instance documentGenerator: Document generation service instance Returns: AI response as string, or dict with documents if outputFormat is specified Raises: Exception: If all available models fail """ if options is None: options = AiCallOptions() # Normalize placeholders from List[PromptPlaceholder] placeholders_dict: Dict[str, str] = {} placeholders_meta: Dict[str, bool] = {} if placeholders: placeholders_dict = {p.label: p.content for p in placeholders} placeholders_meta = {p.label: bool(getattr(p, 'summaryAllowed', False)) for p in placeholders} # Auto-determine call type based on documents and operation type call_type = self._determineCallType(documents, options.operationType) options.callType = call_type try: # Build the full prompt that will be sent to AI if placeholders: full_prompt = prompt for p in placeholders: placeholder = f"{{{{KEY:{p.label}}}}}" full_prompt = full_prompt.replace(placeholder, p.content) else: full_prompt = prompt # Timestamp-only prompt debug writing removed except Exception: pass # Handle document generation with specific output format if outputFormat and documentGenerator: result = await documentGenerator.callAiWithDocumentGeneration(prompt, documents, options, outputFormat, title) # Log AI response for debugging try: if isinstance(result, dict) and 'content' in result: self._writeAiResponseDebug( label='ai_document_generation', content=result['content'], partIndex=1, modelName=None, # Document generation doesn't return model info continuation=False ) except Exception: pass return result if call_type == "planning": result = await self._callAiPlanning(prompt, placeholders_dict, placeholders_meta, options) # Log AI response for debugging try: self._writeAiResponseDebug( label='ai_planning', content=result or "", partIndex=1, modelName=None, # Planning doesn't return model info continuation=False ) except Exception: pass return result else: # Set processDocumentsIndividually from the legacy parameter if not set in options if options.processDocumentsIndividually is None and documents: options.processDocumentsIndividually = False # Default to batch processing # For text calls, we need to build the full prompt with placeholders here # since _callAiText doesn't handle placeholders directly if placeholders_dict: full_prompt = self._buildPromptWithPlaceholders(prompt, placeholders_dict) else: full_prompt = prompt if documentProcessor and documents: result = await documentProcessor.callAiText(full_prompt, documents, options) else: # Fallback to direct AI call if no document processor available request = AiCallRequest( prompt=full_prompt, context="", options=options ) response = await self.aiObjects.call(request) result = response.content # Emit stats for direct AI call self.services.workflow.storeWorkflowStat( self.services.workflow, response, f"ai.call.{options.operationType}" ) # Log AI response for debugging (additional logging for text calls) try: self._writeAiResponseDebug( label='ai_text_main', content=result or "", partIndex=1, modelName=None, # Text calls already log internally continuation=False ) except Exception: pass return result # AI Image Analysis async def readImage( self, prompt: str, imageData: Union[str, bytes], mimeType: str = None, options: Optional[AiCallOptions] = None, ) -> str: """Call AI for image analysis using interface.callImage().""" try: # Check if imageData is valid if not imageData: error_msg = "No image data provided" self.services.utils.debugLogToFile(f"Error in AI image analysis: {error_msg}", "AI_SERVICE") logger.error(f"Error in AI image analysis: {error_msg}") return f"Error: {error_msg}" self.services.utils.debugLogToFile(f"readImage called with prompt, imageData type: {type(imageData)}, length: {len(imageData) if imageData else 0}, mimeType: {mimeType}", "AI_SERVICE") logger.info(f"readImage called with prompt, imageData type: {type(imageData)}, length: {len(imageData) if imageData else 0}, mimeType: {mimeType}") # Always use IMAGE_ANALYSIS operation type for image processing if options is None: options = AiCallOptions(operationType=OperationType.IMAGE_ANALYSIS) else: # Override the operation type to ensure image analysis options.operationType = OperationType.IMAGE_ANALYSIS self.services.utils.debugLogToFile(f"Calling aiObjects.callImage with operationType: {options.operationType}", "AI_SERVICE") logger.info(f"Calling aiObjects.callImage with operationType: {options.operationType}") response = await self.aiObjects.callImage(prompt, imageData, mimeType, options) # Emit stats for image analysis self.services.workflow.storeWorkflowStat( self.services.workflow, response, f"ai.image.{options.operationType}" ) # Debug the result self.services.utils.debugLogToFile(f"Raw AI result type: {type(response)}, value: {repr(response)}", "AI_SERVICE") # Extract content from response result = response.content if hasattr(response, 'content') else str(response) # Check if result is valid if not result or (isinstance(result, str) and not result.strip()): error_msg = f"No response from AI image analysis (result: {repr(result)})" self.services.utils.debugLogToFile(f"Error in AI image analysis: {error_msg}", "AI_SERVICE") logger.error(f"Error in AI image analysis: {error_msg}") return f"Error: {error_msg}" self.services.utils.debugLogToFile(f"callImage returned: {result[:200]}..." if len(result) > 200 else result, "AI_SERVICE") logger.info(f"callImage returned: {result[:200]}..." if len(result) > 200 else result) return result except Exception as e: self.services.utils.debugLogToFile(f"Error in AI image analysis: {str(e)}", "AI_SERVICE") logger.error(f"Error in AI image analysis: {str(e)}") return f"Error: {str(e)}" # AI Image Generation async def generateImage( self, prompt: str, size: str = "1024x1024", quality: str = "standard", style: str = "vivid", options: Optional[AiCallOptions] = None, ) -> Dict[str, Any]: """Generate an image using AI using interface.generateImage().""" try: response = await self.aiObjects.generateImage(prompt, size, quality, style, options) # Emit stats for image generation self.services.workflow.storeWorkflowStat( self.services.workflow, response, f"ai.generate.image" ) # Convert response to dict format for backward compatibility if hasattr(response, 'content'): return { "success": True, "content": response.content, "modelName": response.modelName, "priceUsd": response.priceUsd, "processingTime": response.processingTime } else: return response except Exception as e: logger.error(f"Error in AI image generation: {str(e)}") return {"success": False, "error": str(e)} def _determineCallType(self, documents: Optional[List[ChatDocument]], operation_type: str) -> str: """ Determine call type based on documents and operation type. Criteria: no documents AND operationType is "generate_plan" -> planning All other cases -> text """ has_documents = documents is not None and len(documents) > 0 is_planning_operation = operation_type == OperationType.GENERATE_PLAN if not has_documents and is_planning_operation: return "planning" else: return "text" async def _callAiPlanning( self, prompt: str, placeholders: Optional[Dict[str, str]], placeholdersMeta: Optional[Dict[str, bool]], options: AiCallOptions ) -> str: """ Handle planning calls with placeholder system and selective summarization. """ # Build full prompt with placeholders; if too large, summarize summaryAllowed placeholders proportionally effective_placeholders = placeholders or {} full_prompt = self._buildPromptWithPlaceholders(prompt, effective_placeholders) if options.compressPrompt and placeholdersMeta: # Determine model capacity try: caps = self._getModelCapabilitiesForContent(full_prompt, None, options) max_bytes = caps.get("maxContextBytes", len(full_prompt.encode("utf-8"))) except Exception: max_bytes = len(full_prompt.encode("utf-8")) current_bytes = len(full_prompt.encode("utf-8")) if current_bytes > max_bytes: # Compute total bytes contributed by allowed placeholders (approximate by content length) allowed_labels = [l for l, allow in placeholdersMeta.items() if allow] allowed_sizes = {l: len((effective_placeholders.get(l) or "").encode("utf-8")) for l in allowed_labels} total_allowed = sum(allowed_sizes.values()) overage = current_bytes - max_bytes if total_allowed > 0 and overage > 0: # Target total for allowed after reduction target_allowed = max(total_allowed - overage, 0) # Global ratio to apply across allowed placeholders ratio = target_allowed / total_allowed if total_allowed > 0 else 1.0 ratio = max(0.0, min(1.0, ratio)) reduced: Dict[str, str] = {} for label, content in effective_placeholders.items(): if label in allowed_labels and isinstance(content, str) and len(content) > 0: old_len = len(content) # Reduce by proportional ratio on characters (fallback if empty) reduction_factor = ratio if old_len > 0 else 1.0 reduced[label] = self._reduceText(content, reduction_factor) else: reduced[label] = content effective_placeholders = reduced full_prompt = self._buildPromptWithPlaceholders(prompt, effective_placeholders) # If still slightly over, perform a second-pass fine adjustment with updated ratio current_bytes = len(full_prompt.encode("utf-8")) if current_bytes > max_bytes and total_allowed > 0: overage2 = current_bytes - max_bytes # Recompute allowed sizes after first reduction allowed_sizes2 = {l: len((effective_placeholders.get(l) or "").encode("utf-8")) for l in allowed_labels} total_allowed2 = sum(allowed_sizes2.values()) if total_allowed2 > 0 and overage2 > 0: target_allowed2 = max(total_allowed2 - overage2, 0) ratio2 = target_allowed2 / total_allowed2 ratio2 = max(0.0, min(1.0, ratio2)) reduced2: Dict[str, str] = {} for label, content in effective_placeholders.items(): if label in allowed_labels and isinstance(content, str) and len(content) > 0: old_len = len(content) reduction_factor = ratio2 if old_len > 0 else 1.0 reduced2[label] = self._reduceText(content, reduction_factor) else: reduced2[label] = content effective_placeholders = reduced2 full_prompt = self._buildPromptWithPlaceholders(prompt, effective_placeholders) # Make AI call using AiObjects (let it handle model selection) request = AiCallRequest( prompt=full_prompt, context="", # Context is already included in the prompt options=options ) response = await self.aiObjects.call(request) try: logger.debug(f"AI model selected (planning): {getattr(response, 'modelName', 'unknown')}") except Exception: pass return response.content async def _callAiDirect( self, prompt: str, documents: Optional[List[ChatDocument]], options: AiCallOptions, documentProcessor=None ) -> Dict[str, Any]: """ Call AI directly with prompt and documents for JSON output. Used for multi-file generation - uses the existing generation pipeline. """ # Use the existing generation pipeline that already works # This ensures proper document processing and content extraction logger.info(f"Using existing generation pipeline for {len(documents) if documents else 0} documents") if documentProcessor: # Process documents with JSON merging using the existing pipeline result = await documentProcessor.processDocumentsPerChunkJson(documents, prompt, options) else: # Fallback to simple AI call request = AiCallRequest( prompt=prompt, context="", options=options ) response = await self.aiObjects.call(request) result = {"metadata": {"title": "AI Response"}, "sections": [{"id": "section_1", "content_type": "paragraph", "elements": [{"text": response.content}]}]} # Convert single-file result to multi-file format if needed if "sections" in result and "documents" not in result: logger.info("Converting single-file result to multi-file format") # This is a single-file result, convert it to multi-file format return { "metadata": result.get("metadata", {"title": "Converted Document"}), "documents": [{ "id": "doc_1", "title": result.get("metadata", {}).get("title", "Document"), "filename": "document.txt", "sections": result.get("sections", []) }] } return result def _getModelCapabilitiesForContent(self, prompt: str, documents: Optional[List[ChatDocument]], options: AiCallOptions) -> Dict[str, int]: """ Get model capabilities for content processing, including appropriate size limits for chunking. """ # Estimate total content size prompt_size = len(prompt.encode('utf-8')) document_size = 0 if documents: # Rough estimate of document content size for doc in documents: document_size += doc.fileSize or 0 total_size = prompt_size + document_size # Use AiObjects to select the best model for this content size # We'll simulate the model selection by checking available models from modules.interfaces.interfaceAiObjects import aiModels # Find the best model for this content size and operation best_model = None best_context_length = 0 for model_name, model_info in aiModels.items(): context_length = model_info.get("contextLength", 0) # Skip models with no context length or too small for content if context_length == 0: continue # Check if model supports the operation type capabilities = model_info.get("capabilities", []) if options.operationType == OperationType.IMAGE_ANALYSIS and "image_analysis" not in capabilities: continue elif options.operationType == OperationType.IMAGE_GENERATION and "image_generation" not in capabilities: continue elif options.operationType == OperationType.WEB_RESEARCH and "web_search" not in capabilities: continue elif "text_generation" not in capabilities: continue # Prefer models that can handle the content without chunking, but allow chunking if needed if context_length >= total_size * 0.8: # 80% of content size if context_length > best_context_length: best_model = model_info best_context_length = context_length elif best_model is None: # Fallback to largest available model if context_length > best_context_length: best_model = model_info best_context_length = context_length # Fallback to a reasonable default if no model found if best_model is None: best_model = { "contextLength": 128000, # GPT-4o default "llmName": "gpt-4o" } # Calculate appropriate sizes # Convert tokens to bytes (rough estimate: 1 token ≈ 4 characters) context_length_bytes = int(best_model["contextLength"] * 4) max_context_bytes = int(context_length_bytes * 0.9) # 90% of context length text_chunk_size = int(max_context_bytes * 0.7) # 70% of max context for text chunks image_chunk_size = int(max_context_bytes * 0.8) # 80% of max context for image chunks logger.debug(f"Selected model: {best_model.get('llmName', 'unknown')} with context length: {best_model['contextLength']}") logger.debug(f"Content size: {total_size} bytes, Max context: {max_context_bytes} bytes") logger.debug(f"Text chunk size: {text_chunk_size} bytes, Image chunk size: {image_chunk_size} bytes") return { "maxContextBytes": max_context_bytes, "textChunkSize": text_chunk_size, "imageChunkSize": image_chunk_size } def _getModelsForOperation(self, operation_type: str, options: AiCallOptions) -> List[ModelCapabilities]: """ Get models capable of handling the specific operation with capability filtering. """ # Use the actual AI objects model selection instead of hardcoded default if hasattr(self, 'aiObjects') and self.aiObjects: # Let AiObjects handle the model selection return [] else: # Fallback to default model if AiObjects not available default_model = ModelCapabilities( name="default", maxTokens=4000, capabilities=["text", "reasoning"] if operation_type == "planning" else ["text"], costPerToken=0.001, processingTime=1.0, isAvailable=True ) return [default_model] def _buildPromptWithPlaceholders(self, prompt: str, placeholders: Optional[Dict[str, str]]) -> str: """ Build full prompt by replacing placeholders with their content. Uses the new {{KEY:placeholder}} format. """ if not placeholders: return prompt full_prompt = prompt for placeholder, content in placeholders.items(): # Replace both old format {{placeholder}} and new format {{KEY:placeholder}} full_prompt = full_prompt.replace(f"{{{{{placeholder}}}}}", content) full_prompt = full_prompt.replace(f"{{{{KEY:{placeholder}}}}}", content) return full_prompt def _writeAiResponseDebug(self, label: str, content: Any, partIndex: int = 1, modelName: str = None, continuation: bool = None) -> None: """Disabled verbose debug writing; only minimal files elsewhere.""" return def _exceedsTokenLimit(self, text: str, model: ModelCapabilities, safety_margin: float) -> bool: """ Check if text exceeds model token limit with safety margin. """ # Simple character-based estimation (4 chars per token) estimated_tokens = len(text) // 4 max_tokens = int(model.maxTokens * (1 - safety_margin)) return estimated_tokens > max_tokens def _reducePlanningPrompt( self, full_prompt: str, placeholders: Optional[Dict[str, str]], model: ModelCapabilities, options: AiCallOptions ) -> str: """ Reduce planning prompt size by summarizing placeholders while preserving prompt structure. """ if not placeholders: return self._reduceText(full_prompt, 0.7) # Reduce placeholders while preserving prompt reduced_placeholders = {} for placeholder, content in placeholders.items(): if len(content) > 1000: # Only reduce long content reduction_factor = 0.7 reduced_content = self._reduceText(content, reduction_factor) reduced_placeholders[placeholder] = reduced_content else: reduced_placeholders[placeholder] = content return self._buildPromptWithPlaceholders(full_prompt, reduced_placeholders) def _reduceTextPrompt( self, prompt: str, context: str, model: ModelCapabilities, options: AiCallOptions ) -> str: """ Reduce text prompt size using typeGroup-aware chunking and merging. """ max_size = int(model.maxTokens * (1 - options.safetyMargin)) if options.compressPrompt: # Reduce both prompt and context target_size = max_size current_size = len(prompt) + len(context) reduction_factor = (target_size * 0.7) / current_size if reduction_factor < 1.0: prompt = self._reduceText(prompt, reduction_factor) context = self._reduceText(context, reduction_factor) else: # Only reduce context, preserve prompt integrity max_context_size = max_size - len(prompt) if len(context) > max_context_size: reduction_factor = max_context_size / len(context) context = self._reduceText(context, reduction_factor) return prompt + "\n\n" + context if context else prompt def _extractTextFromContentParts(self, extracted_content) -> str: """ Extract text content from ExtractionService ContentPart objects. """ if not extracted_content or not hasattr(extracted_content, 'parts'): return "" text_parts = [] for part in extracted_content.parts: if hasattr(part, 'typeGroup') and part.typeGroup in ['text', 'table', 'structure']: if hasattr(part, 'data') and part.data: text_parts.append(part.data) return "\n\n".join(text_parts) def _reduceText(self, text: str, reduction_factor: float) -> str: """ Reduce text size by the specified factor. """ if reduction_factor >= 1.0: return text target_length = int(len(text) * reduction_factor) return text[:target_length] + "... [reduced]"