import logging from typing import Dict, Any, List, Optional, Tuple, Union from modules.datamodels.datamodelChat import PromptPlaceholder, ChatDocument from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, ModelCapabilities, OperationType, Priority from modules.interfaces.interfaceAiObjects import AiObjects logger = logging.getLogger(__name__) class SubCoreAi: """Core AI operations including image analysis, text generation, and planning calls.""" def __init__(self, services, aiObjects): """Initialize core AI operations. Args: services: Service center instance for accessing other services aiObjects: Initialized AiObjects instance """ self.services = services self.aiObjects = aiObjects # AI Processing Call async def callAi( self, prompt: str, documents: Optional[List[ChatDocument]] = None, placeholders: Optional[List[PromptPlaceholder]] = None, options: Optional[AiCallOptions] = None, outputFormat: Optional[str] = None, title: Optional[str] = None, documentProcessor=None, documentGenerator=None ) -> Union[str, Dict[str, Any]]: """ Unified AI call interface that automatically routes to appropriate handler. Args: prompt: The main prompt for the AI call documents: Optional list of documents to process placeholders: Optional list of placeholder replacements for planning calls options: AI call configuration options outputFormat: Optional output format (html, pdf, docx, txt, md, json, csv, xlsx) for document generation title: Optional title for generated documents documentProcessor: Document processing service instance documentGenerator: Document generation service instance Returns: AI response as string, or dict with documents if outputFormat is specified Raises: Exception: If all available models fail """ if options is None: options = AiCallOptions() # Normalize placeholders from List[PromptPlaceholder] placeholders_dict: Dict[str, str] = {} placeholders_meta: Dict[str, bool] = {} if placeholders: placeholders_dict = {p.label: p.content for p in placeholders} placeholders_meta = {p.label: bool(getattr(p, 'summaryAllowed', False)) for p in placeholders} # Auto-determine call type based on documents and operation type call_type = self._determineCallType(documents, options.operationType) options.callType = call_type try: # Build the full prompt that will be sent to AI if placeholders: full_prompt = prompt for p in placeholders: placeholder = f"{{{{KEY:{p.label}}}}}" full_prompt = full_prompt.replace(placeholder, p.content) else: full_prompt = prompt # Check for unresolved placeholders and clean them up try: import re # Find only {{KEY:...}} patterns that need to be removed unresolved_placeholders = re.findall(r'\{\{KEY:[^}]+\}\}', full_prompt) if unresolved_placeholders: logger.warning(f"Found unresolved KEY placeholders in prompt: {unresolved_placeholders}") # Remove only {{KEY:...}} patterns, leave other {{...}} content intact full_prompt = re.sub(r'\{\{KEY:[^}]+\}\}', '', full_prompt) # Clean up extra whitespace full_prompt = re.sub(r'\n\s*\n\s*\n', '\n\n', full_prompt) full_prompt = full_prompt.strip() logger.info("Cleaned up unresolved KEY placeholders from prompt") except Exception as e: logger.warning(f"Error cleaning up prompt placeholders: {str(e)}") # Log the final integrated prompt that AI will receive try: from modules.shared.debugLogger import writeDebugFile # Determine the prompt type based on operation type if options.operationType == OperationType.GENERATE_PLAN: prompt_type = "taskplanPrompt" elif options.operationType == OperationType.ANALYSE_CONTENT: prompt_type = "analysisPrompt" else: prompt_type = "aiPrompt" writeDebugFile(full_prompt, prompt_type, documents) except Exception: pass # Don't fail on debug logging except Exception: pass # Handle document generation with specific output format using unified approach if outputFormat and documentGenerator: # Use unified generation method for all document generation if documents and len(documents) > 0: # Extract content from documents first logger.info(f"Extracting content from {len(documents)} documents") extracted_content = await documentProcessor.callAiText(full_prompt, documents, options) # Generate with extracted content generated_json = await self._callAiUnifiedGeneration(full_prompt, extracted_content, options, outputFormat, title) else: # Direct generation without documents logger.info("No documents provided - using direct generation") generated_json = await self._callAiUnifiedGeneration(full_prompt, None, options, outputFormat, title) # Parse the generated JSON try: import json generated_data = json.loads(generated_json) except json.JSONDecodeError as e: logger.error(f"Failed to parse generated JSON: {str(e)}") return {"success": False, "error": f"Generated content is not valid JSON: {str(e)}"} # Render to final format using the existing renderer try: from modules.services.serviceGeneration.mainServiceGeneration import GenerationService generationService = GenerationService(self.services) rendered_content, mime_type = await generationService.renderReport( generated_data, outputFormat, title or "Generated Document", full_prompt, self ) # Build result in the expected format result = { "success": True, "content": generated_data, "documents": [{ "documentName": f"generated.{outputFormat}", "documentData": rendered_content, "mimeType": mime_type, "title": title or "Generated Document" }], "is_multi_file": False, "format": outputFormat, "title": title, "split_strategy": "single", "total_documents": 1, "processed_documents": 1 } # Log AI response for debugging try: from modules.shared.debugLogger import writeDebugFile writeDebugFile(str(result), "documentGenerationResponse", documents) except Exception: pass return result except Exception as e: logger.error(f"Error rendering document: {str(e)}") return {"success": False, "error": f"Rendering failed: {str(e)}"} if call_type == "planning": result = await self._callAiPlanning(prompt, placeholders_dict, placeholders_meta, options) # Log AI response for debugging try: from modules.shared.debugLogger import writeDebugFile writeDebugFile(str(result or ""), "taskplanResponse", documents) except Exception: pass return result else: # Set processDocumentsIndividually from the legacy parameter if not set in options if options.processDocumentsIndividually is None and documents: options.processDocumentsIndividually = False # Default to batch processing # For text calls, we need to build the full prompt with placeholders here # since _callAiText doesn't handle placeholders directly if placeholders_dict: full_prompt = self._buildPromptWithPlaceholders(prompt, placeholders_dict) else: full_prompt = prompt if documentProcessor and documents: result = await documentProcessor.callAiText(full_prompt, documents, options) else: # Enhanced direct AI call with partial results support result = await self._callAiWithPartialResults(full_prompt, options) # Log AI response for debugging (additional logging for text calls) try: from modules.shared.debugLogger import writeDebugFile writeDebugFile(str(result or ""), "aiTextResponse", documents) except Exception: pass return result # AI Image Analysis async def readImage( self, prompt: str, imageData: Union[str, bytes], mimeType: str = None, options: Optional[AiCallOptions] = None, ) -> str: """Call AI for image analysis using interface.callImage().""" try: # Check if imageData is valid if not imageData: error_msg = "No image data provided" self.services.utils.debugLogToFile(f"Error in AI image analysis: {error_msg}", "AI_SERVICE") logger.error(f"Error in AI image analysis: {error_msg}") return f"Error: {error_msg}" self.services.utils.debugLogToFile(f"readImage called with prompt, imageData type: {type(imageData)}, length: {len(imageData) if imageData else 0}, mimeType: {mimeType}", "AI_SERVICE") logger.info(f"readImage called with prompt, imageData type: {type(imageData)}, length: {len(imageData) if imageData else 0}, mimeType: {mimeType}") # Always use IMAGE_ANALYSIS operation type for image processing if options is None: options = AiCallOptions(operationType=OperationType.IMAGE_ANALYSIS) else: # Override the operation type to ensure image analysis options.operationType = OperationType.IMAGE_ANALYSIS self.services.utils.debugLogToFile(f"Calling aiObjects.callImage with operationType: {options.operationType}", "AI_SERVICE") logger.info(f"Calling aiObjects.callImage with operationType: {options.operationType}") response = await self.aiObjects.callImage(prompt, imageData, mimeType, options) # Emit stats for image analysis self.services.workflow.storeWorkflowStat( self.services.currentWorkflow, response, f"ai.image.{options.operationType}" ) # Debug the result self.services.utils.debugLogToFile(f"Raw AI result type: {type(response)}, value: {repr(response)}", "AI_SERVICE") # Extract content from response result = response.content if hasattr(response, 'content') else str(response) # Check if result is valid if not result or (isinstance(result, str) and not result.strip()): error_msg = f"No response from AI image analysis (result: {repr(result)})" self.services.utils.debugLogToFile(f"Error in AI image analysis: {error_msg}", "AI_SERVICE") logger.error(f"Error in AI image analysis: {error_msg}") return f"Error: {error_msg}" self.services.utils.debugLogToFile(f"callImage returned: {result[:200]}..." if len(result) > 200 else result, "AI_SERVICE") logger.info(f"callImage returned: {result[:200]}..." if len(result) > 200 else result) return result except Exception as e: self.services.utils.debugLogToFile(f"Error in AI image analysis: {str(e)}", "AI_SERVICE") logger.error(f"Error in AI image analysis: {str(e)}") return f"Error: {str(e)}" # AI Image Generation async def generateImage( self, prompt: str, size: str = "1024x1024", quality: str = "standard", style: str = "vivid", options: Optional[AiCallOptions] = None, ) -> Dict[str, Any]: """Generate an image using AI using interface.generateImage().""" try: response = await self.aiObjects.generateImage(prompt, size, quality, style, options) # Emit stats for image generation self.services.workflow.storeWorkflowStat( self.services.currentWorkflow, response, f"ai.generate.image" ) # Convert response to dict format for backward compatibility if hasattr(response, 'content'): return { "success": True, "content": response.content, "modelName": response.modelName, "priceUsd": response.priceUsd, "processingTime": response.processingTime } else: return response except Exception as e: logger.error(f"Error in AI image generation: {str(e)}") return {"success": False, "error": str(e)} def _determineCallType(self, documents: Optional[List[ChatDocument]], operation_type: str) -> str: """ Determine call type based on documents and operation type. Criteria: no documents AND operationType is "generate_plan" -> planning All other cases -> text """ has_documents = documents is not None and len(documents) > 0 is_planning_operation = operation_type == OperationType.GENERATE_PLAN if not has_documents and is_planning_operation: return "planning" else: return "text" async def _callAiPlanning( self, prompt: str, placeholders: Optional[Dict[str, str]], placeholdersMeta: Optional[Dict[str, bool]], options: AiCallOptions ) -> str: """ Handle planning calls with placeholder system and selective summarization. """ # Build full prompt with placeholders; if too large, summarize summaryAllowed placeholders proportionally effective_placeholders = placeholders or {} full_prompt = self._buildPromptWithPlaceholders(prompt, effective_placeholders) if options.compressPrompt and placeholdersMeta: # Determine model capacity try: caps = self._getModelCapabilitiesForContent(full_prompt, None, options) max_bytes = caps.get("maxContextBytes", len(full_prompt.encode("utf-8"))) except Exception: max_bytes = len(full_prompt.encode("utf-8")) current_bytes = len(full_prompt.encode("utf-8")) if current_bytes > max_bytes: # Compute total bytes contributed by allowed placeholders (approximate by content length) allowed_labels = [l for l, allow in placeholdersMeta.items() if allow] allowed_sizes = {l: len((effective_placeholders.get(l) or "").encode("utf-8")) for l in allowed_labels} total_allowed = sum(allowed_sizes.values()) overage = current_bytes - max_bytes if total_allowed > 0 and overage > 0: # Target total for allowed after reduction target_allowed = max(total_allowed - overage, 0) # Global ratio to apply across allowed placeholders ratio = target_allowed / total_allowed if total_allowed > 0 else 1.0 ratio = max(0.0, min(1.0, ratio)) reduced: Dict[str, str] = {} for label, content in effective_placeholders.items(): if label in allowed_labels and isinstance(content, str) and len(content) > 0: old_len = len(content) # Reduce by proportional ratio on characters (fallback if empty) reduction_factor = ratio if old_len > 0 else 1.0 reduced[label] = self._reduceText(content, reduction_factor) else: reduced[label] = content effective_placeholders = reduced full_prompt = self._buildPromptWithPlaceholders(prompt, effective_placeholders) # If still slightly over, perform a second-pass fine adjustment with updated ratio current_bytes = len(full_prompt.encode("utf-8")) if current_bytes > max_bytes and total_allowed > 0: overage2 = current_bytes - max_bytes # Recompute allowed sizes after first reduction allowed_sizes2 = {l: len((effective_placeholders.get(l) or "").encode("utf-8")) for l in allowed_labels} total_allowed2 = sum(allowed_sizes2.values()) if total_allowed2 > 0 and overage2 > 0: target_allowed2 = max(total_allowed2 - overage2, 0) ratio2 = target_allowed2 / total_allowed2 ratio2 = max(0.0, min(1.0, ratio2)) reduced2: Dict[str, str] = {} for label, content in effective_placeholders.items(): if label in allowed_labels and isinstance(content, str) and len(content) > 0: old_len = len(content) reduction_factor = ratio2 if old_len > 0 else 1.0 reduced2[label] = self._reduceText(content, reduction_factor) else: reduced2[label] = content effective_placeholders = reduced2 full_prompt = self._buildPromptWithPlaceholders(prompt, effective_placeholders) # Make AI call using AiObjects (let it handle model selection) request = AiCallRequest( prompt=full_prompt, context="", # Context is already included in the prompt options=options ) response = await self.aiObjects.call(request) try: logger.debug(f"AI model selected (planning): {getattr(response, 'modelName', 'unknown')}") except Exception: pass return response.content async def _callAiWithPartialResults( self, prompt: str, options: AiCallOptions ) -> str: """ Call AI with partial results continuation logic for direct calls. Handles cases where AI needs to generate large responses in chunks. """ logger.info("Starting direct AI call with partial results support") # Build enhanced prompt with continuation instructions enhanced_prompt = self._buildDirectContinuationPrompt(prompt) # Process with continuation logic return await self._processDirectWithContinuationLoop(enhanced_prompt, options) def _buildDirectContinuationPrompt(self, base_prompt: str) -> str: """ Build a prompt for direct AI calls that includes partial results instructions. """ continuation_instructions = """ IMPORTANT: If your response is too large to generate completely in one response, you can deliver partial results and continue. CONTINUATION LOGIC: - If you cannot complete the full response, end your response with: [CONTINUE: brief description of what still needs to be generated] - The system will call you again to continue from where you left off - Continue generating from the exact point where you stopped - Maintain consistency with your previous partial response - Only stop when you have generated the complete response Examples: Example - Code Generation: If generating a large code file and you can only generate part of it: - Generate the first part (imports, classes, functions) - End with: [CONTINUE: Generate the remaining methods and main execution code] - In the next call, continue from where you left off Example - Documentation: If writing comprehensive documentation and you can only generate sections 1-3: - Generate sections 1-3 with full content - End with: [CONTINUE: Generate sections 4-8 covering advanced topics and examples] - In the next call, continue with sections 4-8 This allows you to handle very large responses that exceed normal limits. """ return f"{base_prompt}{continuation_instructions}" async def _processDirectWithContinuationLoop( self, enhanced_prompt: str, options: AiCallOptions ) -> str: """ Process direct AI call with continuation loop until complete. """ max_iterations = 10 # Prevent infinite loops iteration = 0 accumulated_content = [] continuation_hint = None while iteration < max_iterations: iteration += 1 logger.info(f"Direct AI continuation iteration {iteration}/{max_iterations}") # Build prompt for this iteration if continuation_hint: iteration_prompt = self._buildDirectContinuationIterationPrompt( enhanced_prompt, continuation_hint, accumulated_content ) else: iteration_prompt = enhanced_prompt # Make AI call for this iteration try: request = AiCallRequest( prompt=iteration_prompt, context="", options=options ) response = await self.aiObjects.call(request) result = response.content # Emit stats for this iteration self.services.workflow.storeWorkflowStat( self.services.currentWorkflow, response, f"ai.call.{options.operationType}.iteration_{iteration}" ) if not result or not result.strip(): logger.warning(f"Iteration {iteration}: Empty response, stopping") break # Check for continuation marker if "[CONTINUE:" in result: # Extract the continuation hint import re continue_match = re.search(r'\[CONTINUE:\s*([^\]]+)\]', result) if continue_match: continuation_hint = continue_match.group(1).strip() # Remove the continuation marker from the result result = re.sub(r'\s*\[CONTINUE:[^\]]+\]', '', result).strip() else: continuation_hint = "Continue from where you left off" # Add this partial result to accumulated content if result.strip(): accumulated_content.append(result.strip()) logger.info(f"Iteration {iteration}: Partial result added, continue hint: {continuation_hint}") else: # No continuation marker - this is the final result if result.strip(): accumulated_content.append(result.strip()) logger.info(f"Direct AI continuation complete after {iteration} iterations") break except Exception as e: logger.error(f"Direct AI iteration {iteration} failed: {str(e)}") break if iteration >= max_iterations: logger.warning(f"Direct AI continuation stopped after maximum iterations ({max_iterations})") # For JSON responses, we need to merge them properly instead of concatenating if accumulated_content: import json # Parse each part as JSON and merge them merged_documents = [] merged_metadata = None for content in accumulated_content: parsed = json.loads(content) if isinstance(parsed, dict): # Extract metadata from first valid JSON if merged_metadata is None and "metadata" in parsed: merged_metadata = parsed["metadata"] # Extract documents from this part if "documents" in parsed and isinstance(parsed["documents"], list): merged_documents.extend(parsed["documents"]) # Create final merged JSON - NO FALLBACK final_result = json.dumps({ "metadata": merged_metadata or { "title": "Generated Document", "splitStrategy": "single_document", "source_documents": [], "extraction_method": "ai_generation" }, "documents": merged_documents }, indent=2) else: # Return empty JSON structure if no content final_result = json.dumps({ "metadata": { "title": "Generated Document", "splitStrategy": "single_document", "source_documents": [], "extraction_method": "ai_generation" }, "documents": [] }, indent=2) logger.info(f"Final direct AI result: {len(accumulated_content)} parts from {iteration} iterations") return final_result def _buildDirectContinuationIterationPrompt( self, base_prompt: str, continuation_hint: str, accumulated_content: List[str] ) -> str: """ Build a prompt for continuation iteration with context. """ # Build context of what's already been generated context_summary = "PREVIOUSLY GENERATED CONTENT:\n" for i, content in enumerate(accumulated_content[-2:]): # Show last 2 parts for context preview = content[:200] + "..." if len(content) > 200 else content context_summary += f"Part {i+1}: {preview}\n" continuation_prompt = f""" {base_prompt} {context_summary} CONTINUATION INSTRUCTIONS: - Continue from where you left off - Continuation hint: {continuation_hint} - Generate the next part of the content - Maintain consistency with previously generated content - End with [CONTINUE: description] if more content is needed - End without [CONTINUE] if the response is complete """ return continuation_prompt async def _callAiUnifiedGeneration( self, prompt: str, extracted_content: Optional[str] = None, options: Optional[AiCallOptions] = None, outputFormat: str = "json", title: str = "Generated Document" ) -> str: """ Unified generation method that handles both scenarios: - With extracted content (from documents) - Without extracted content (direct generation) Always uses continuation logic for long responses. Always returns standardized JSON format using the multi-document schema. """ if options is None: options = AiCallOptions() logger.info("Starting unified AI generation with continuation logic") # Use the existing buildGenerationPrompt to get the proper canonical format instructions from modules.services.serviceGeneration.subPromptBuilder import buildGenerationPrompt # Build the generation prompt using the existing system generation_prompt = await buildGenerationPrompt( outputFormat=outputFormat, userPrompt=prompt, title=title, aiService=self, services=self.services ) # If we have extracted content, prepend it to the prompt if extracted_content: generation_prompt = f"""EXTRACTED CONTENT FROM DOCUMENTS: {extracted_content} {generation_prompt}""" # Use continuation logic for long responses return await self._processDirectWithContinuationLoop(generation_prompt, options) async def _callAiDirect( self, prompt: str, documents: Optional[List[ChatDocument]], options: AiCallOptions, documentProcessor=None ) -> Dict[str, Any]: """ Call AI directly with prompt and documents for JSON output. Used for multi-file generation - uses the existing generation pipeline. """ # Use the existing generation pipeline that already works # This ensures proper document processing and content extraction logger.info(f"Using existing generation pipeline for {len(documents) if documents else 0} documents") if documentProcessor: # Process documents with JSON merging using the existing pipeline result = await documentProcessor.processDocumentsPerChunkJson(documents, prompt, options) else: # Fallback to simple AI call request = AiCallRequest( prompt=prompt, context="", options=options ) response = await self.aiObjects.call(request) result = {"metadata": {"title": "AI Response"}, "sections": [{"id": "section_1", "content_type": "paragraph", "elements": [{"text": response.content}]}]} # Convert single-file result to multi-file format if needed if "sections" in result and "documents" not in result: logger.info("Converting single-file result to multi-file format") # This is a single-file result, convert it to multi-file format return { "metadata": result.get("metadata", {"title": "Converted Document"}), "documents": [{ "id": "doc_1", "title": result.get("metadata", {}).get("title", "Document"), "filename": "document.txt", "sections": result.get("sections", []) }] } return result def _getModelCapabilitiesForContent(self, prompt: str, documents: Optional[List[ChatDocument]], options: AiCallOptions) -> Dict[str, int]: """ Get model capabilities for content processing, including appropriate size limits for chunking. """ # Estimate total content size prompt_size = len(prompt.encode('utf-8')) document_size = 0 if documents: # Rough estimate of document content size for doc in documents: document_size += doc.fileSize or 0 total_size = prompt_size + document_size # Use AiObjects to select the best model for this content size # We'll simulate the model selection by checking available models from modules.interfaces.interfaceAiObjects import aiModels # Find the best model for this content size and operation best_model = None best_context_length = 0 for model_name, model_info in aiModels.items(): context_length = model_info.get("contextLength", 0) # Skip models with no context length or too small for content if context_length == 0: continue # Check if model supports the operation type capabilities = model_info.get("capabilities", []) if options.operationType == OperationType.IMAGE_ANALYSIS and "image_analysis" not in capabilities: continue elif options.operationType == OperationType.IMAGE_GENERATION and "image_generation" not in capabilities: continue elif options.operationType == OperationType.WEB_RESEARCH and "web_search" not in capabilities: continue elif "text_generation" not in capabilities: continue # Prefer models that can handle the content without chunking, but allow chunking if needed if context_length >= total_size * 0.8: # 80% of content size if context_length > best_context_length: best_model = model_info best_context_length = context_length elif best_model is None: # Fallback to largest available model if context_length > best_context_length: best_model = model_info best_context_length = context_length # Fallback to a reasonable default if no model found if best_model is None: best_model = { "contextLength": 128000, # GPT-4o default "llmName": "gpt-4o" } # Calculate appropriate sizes # Convert tokens to bytes (rough estimate: 1 token ≈ 4 characters) context_length_bytes = int(best_model["contextLength"] * 4) max_context_bytes = int(context_length_bytes * 0.9) # 90% of context length text_chunk_size = int(max_context_bytes * 0.7) # 70% of max context for text chunks image_chunk_size = int(max_context_bytes * 0.8) # 80% of max context for image chunks logger.debug(f"Selected model: {best_model.get('llmName', 'unknown')} with context length: {best_model['contextLength']}") logger.debug(f"Content size: {total_size} bytes, Max context: {max_context_bytes} bytes") logger.debug(f"Text chunk size: {text_chunk_size} bytes, Image chunk size: {image_chunk_size} bytes") return { "maxContextBytes": max_context_bytes, "textChunkSize": text_chunk_size, "imageChunkSize": image_chunk_size } def _getModelsForOperation(self, operation_type: str, options: AiCallOptions) -> List[ModelCapabilities]: """ Get models capable of handling the specific operation with capability filtering. """ # Use the actual AI objects model selection instead of hardcoded default if hasattr(self, 'aiObjects') and self.aiObjects: # Let AiObjects handle the model selection return [] else: # Fallback to default model if AiObjects not available default_model = ModelCapabilities( name="default", maxTokens=4000, capabilities=["text", "reasoning"] if operation_type == "planning" else ["text"], costPerToken=0.001, processingTime=1.0, isAvailable=True ) return [default_model] def _buildPromptWithPlaceholders(self, prompt: str, placeholders: Optional[Dict[str, str]]) -> str: """ Build full prompt by replacing placeholders with their content. Uses the new {{KEY:placeholder}} format. """ if not placeholders: return prompt full_prompt = prompt for placeholder, content in placeholders.items(): # Replace both old format {{placeholder}} and new format {{KEY:placeholder}} full_prompt = full_prompt.replace(f"{{{{{placeholder}}}}}", content) full_prompt = full_prompt.replace(f"{{{{KEY:{placeholder}}}}}", content) return full_prompt def _exceedsTokenLimit(self, text: str, model: ModelCapabilities, safety_margin: float) -> bool: """ Check if text exceeds model token limit with safety margin. """ # Simple character-based estimation (4 chars per token) estimated_tokens = len(text) // 4 max_tokens = int(model.maxTokens * (1 - safety_margin)) return estimated_tokens > max_tokens def _reducePlanningPrompt( self, full_prompt: str, placeholders: Optional[Dict[str, str]], model: ModelCapabilities, options: AiCallOptions ) -> str: """ Reduce planning prompt size by summarizing placeholders while preserving prompt structure. """ if not placeholders: return self._reduceText(full_prompt, 0.7) # Reduce placeholders while preserving prompt reduced_placeholders = {} for placeholder, content in placeholders.items(): if len(content) > 1000: # Only reduce long content reduction_factor = 0.7 reduced_content = self._reduceText(content, reduction_factor) reduced_placeholders[placeholder] = reduced_content else: reduced_placeholders[placeholder] = content return self._buildPromptWithPlaceholders(full_prompt, reduced_placeholders) def _reduceTextPrompt( self, prompt: str, context: str, model: ModelCapabilities, options: AiCallOptions ) -> str: """ Reduce text prompt size using typeGroup-aware chunking and merging. """ max_size = int(model.maxTokens * (1 - options.safetyMargin)) if options.compressPrompt: # Reduce both prompt and context target_size = max_size current_size = len(prompt) + len(context) reduction_factor = (target_size * 0.7) / current_size if reduction_factor < 1.0: prompt = self._reduceText(prompt, reduction_factor) context = self._reduceText(context, reduction_factor) else: # Only reduce context, preserve prompt integrity max_context_size = max_size - len(prompt) if len(context) > max_context_size: reduction_factor = max_context_size / len(context) context = self._reduceText(context, reduction_factor) return prompt + "\n\n" + context if context else prompt def _extractTextFromContentParts(self, extracted_content) -> str: """ Extract text content from ExtractionService ContentPart objects. """ if not extracted_content or not hasattr(extracted_content, 'parts'): return "" text_parts = [] for part in extracted_content.parts: if hasattr(part, 'typeGroup') and part.typeGroup in ['text', 'table', 'structure']: if hasattr(part, 'data') and part.data: text_parts.append(part.data) return "\n\n".join(text_parts) def _reduceText(self, text: str, reduction_factor: float) -> str: """ Reduce text size by the specified factor. """ if reduction_factor >= 1.0: return text target_length = int(len(text) * reduction_factor) return text[:target_length] + "... [reduced]"