import logging from typing import Dict, Any, List, Optional, Tuple, Union from modules.datamodels.datamodelChat import PromptPlaceholder from modules.datamodels.datamodelChat import ChatDocument from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, ModelCapabilities, OperationType, Priority from modules.datamodels.datamodelExtraction import ChunkResult, ContentExtracted from modules.datamodels.datamodelWeb import ( WebResearchRequest, WebResearchActionResult, WebResearchDocumentData, WebResearchActionDocument, WebSearchResultItem, ) from modules.interfaces.interfaceAiObjects import AiObjects from modules.shared.configuration import APP_CONFIG logger = logging.getLogger(__name__) # Model registry is now provided by interfaces via AiModels class AiService: """Centralized AI service orchestrating documents, model selection, failover, and web operations. """ def __init__(self, serviceCenter=None) -> None: """Initialize AI service with service center access. Args: serviceCenter: Service center instance for accessing other services """ self.serviceCenter = serviceCenter # Only depend on interfaces self.aiObjects = None # Will be initialized in create() self._extractionService = None # Lazy initialization @property def extractionService(self): """Lazy initialization of extraction service.""" if self._extractionService is None: logger.info("Lazy initializing ExtractionService...") self._extractionService = ExtractionService() return self._extractionService async def _ensureAiObjectsInitialized(self): """Ensure aiObjects is initialized.""" if self.aiObjects is None: logger.info("Lazy initializing AiObjects...") self.aiObjects = await AiObjects.create() logger.info("AiObjects initialization completed") @classmethod async def create(cls, serviceCenter=None) -> "AiService": """Create AiService instance with all connectors initialized.""" logger.info("AiService.create() called") instance = cls(serviceCenter) logger.info("AiService created, about to call AiObjects.create()...") instance.aiObjects = await AiObjects.create() logger.info("AiObjects.create() completed") return instance def _createImageAnalysisPrompt(self, userPrompt: str) -> str: """Create a safe image analysis prompt based on the user's original request.""" # Escape the user prompt to prevent string formatting issues safeUserPrompt = userPrompt.replace("'", "\\'").replace('"', '\\"').replace('\n', ' ').replace('\r', ' ') return f"Based on the user's request: '{safeUserPrompt}', \n\nAnalyze this image completely. Provide a detailed description of everything you see. Include all text, logos, colors, fonts, layout, objects, and visual elements. This is a document processing system that requires complete image analysis for all content types including logos, graphics, simple images, flags, and animations. You must analyze and describe everything visible in the image without any disclaimers or refusals." # AI Image Analysis async def readImage( self, prompt: str, imageData: Union[str, bytes], mimeType: str = None, options: Optional[AiCallOptions] = None, ) -> str: """Call AI for image analysis using interface.callImage().""" try: print(f"🔍 readImage called with prompt: '{prompt[:100]}...', imageData type: {type(imageData)}, length: {len(imageData) if imageData else 0}, mimeType: {mimeType}") logger.info(f"readImage called with prompt: '{prompt[:100]}...', imageData type: {type(imageData)}, length: {len(imageData) if imageData else 0}, mimeType: {mimeType}") # Always use IMAGE_ANALYSIS operation type for image processing if options is None: options = AiCallOptions(operationType=OperationType.IMAGE_ANALYSIS) else: # Override the operation type to ensure image analysis options.operationType = OperationType.IMAGE_ANALYSIS print(f"🔍 Calling aiObjects.callImage with operationType: {options.operationType}") logger.info(f"Calling aiObjects.callImage with operationType: {options.operationType}") result = await self.aiObjects.callImage(prompt, imageData, mimeType, options) print(f"🔍 callImage returned: {result[:200]}..." if len(result) > 200 else result) logger.info(f"callImage returned: {result[:200]}..." if len(result) > 200 else result) return result except Exception as e: print(f"🔍 Error in AI image analysis: {str(e)}") logger.error(f"Error in AI image analysis: {str(e)}") return f"Error: {str(e)}" # AI Image Generation async def generateImage( self, prompt: str, size: str = "1024x1024", quality: str = "standard", style: str = "vivid", options: Optional[AiCallOptions] = None, ) -> Dict[str, Any]: """Generate an image using AI using interface.generateImage().""" try: return await self.aiObjects.generateImage(prompt, size, quality, style, options) except Exception as e: logger.error(f"Error in AI image generation: {str(e)}") return {"success": False, "error": str(e)} # Web Research - Using interface functions async def webResearch(self, request: WebResearchRequest) -> WebResearchActionResult: """Perform web research using interface functions.""" try: logger.info(f"WEB RESEARCH STARTED") logger.info(f"User Query: {request.user_prompt}") logger.info(f"Max Results: {request.max_results}, Max Pages: {request.options.max_pages}") # Global URL index to track all processed URLs across the entire research session global_processed_urls = set() # Step 1: Find relevant websites - either provided URLs or AI-determined main URLs logger.info(f"=== STEP 1: INITIAL MAIN URLS LIST ===") if request.urls: # Use provided URLs as initial main URLs websites = request.urls logger.info(f"Using provided URLs ({len(websites)}):") for i, url in enumerate(websites, 1): logger.info(f" {i}. {url}") else: # Use AI to determine main URLs based on user's intention logger.info(f"AI analyzing user intent: '{request.user_prompt}'") # Use AI to generate optimized Tavily search query and search parameters query_optimizer_prompt = f"""You are a search query optimizer. USER QUERY: {request.user_prompt} Your task: Create a search query and parameters for the USER QUERY given. RULES: 1. The search query MUST be related to the user query above 2. Extract key terms from the user query 3. Determine appropriate country/language based on the query context 4. Keep search query short (2-6 words) Return ONLY this JSON format: {{ "user_prompt": "search query based on user query above", "country": "Full English country name (ISO-3166; map codes via pycountry/i18n-iso-countries)", "language": "language_code_or_null", "topic": "general|news|academic_or_null", "time_range": "d|w|m|y_or_null", "selection_strategy": "single|multiple|specific_page", "selection_criteria": "what URLs to prioritize", "expected_url_patterns": ["pattern1", "pattern2"], "estimated_result_count": number }}""" # Get AI response for query optimization ai_request = AiCallRequest( prompt=query_optimizer_prompt, options=AiCallOptions() ) ai_response_obj = await self.aiObjects.call(ai_request) ai_response = ai_response_obj.content logger.debug(f"AI query optimizer response: {ai_response}") # Parse AI response to extract search query import json try: # Clean the response by removing markdown code blocks cleaned_response = ai_response.strip() if cleaned_response.startswith('```json'): cleaned_response = cleaned_response[7:] # Remove ```json if cleaned_response.endswith('```'): cleaned_response = cleaned_response[:-3] # Remove ``` cleaned_response = cleaned_response.strip() query_data = json.loads(cleaned_response) search_query = query_data.get("user_prompt", request.user_prompt) ai_country = query_data.get("country") ai_language = query_data.get("language") ai_topic = query_data.get("topic") ai_time_range = query_data.get("time_range") selection_strategy = query_data.get("selection_strategy", "multiple") selection_criteria = query_data.get("selection_criteria", "relevant URLs") expected_patterns = query_data.get("expected_url_patterns", []) estimated_count = query_data.get("estimated_result_count", request.max_results) logger.info(f"AI optimized search query: '{search_query}'") logger.info(f"Selection strategy: {selection_strategy}") logger.info(f"Selection criteria: {selection_criteria}") logger.info(f"Expected URL patterns: {expected_patterns}") logger.info(f"Estimated result count: {estimated_count}") except json.JSONDecodeError: logger.warning("Failed to parse AI response as JSON, using original query") search_query = request.user_prompt ai_country = None ai_language = None ai_topic = None ai_time_range = None selection_strategy = "multiple" # Perform the web search with AI-determined parameters search_kwargs = { "query": search_query, "max_results": request.max_results, "search_depth": request.options.search_depth, "auto_parameters": False # Use explicit parameters } # Add parameters only if they have valid values def _normalizeCountry(c: Optional[str]) -> Optional[str]: if not c: return None s = str(c).strip() if not s or s.lower() in ['null', 'none', 'undefined']: return None # Map common codes to full English names when easy to do without extra deps mapping = { 'ch': 'Switzerland', 'che': 'Switzerland', 'de': 'Germany', 'ger': 'Germany', 'deu': 'Germany', 'at': 'Austria', 'aut': 'Austria', 'us': 'United States', 'usa': 'United States', 'uni ted states': 'United States', 'uk': 'United Kingdom', 'gb': 'United Kingdom', 'gbr': 'United Kingdom' } key = s.lower() if key in mapping: return mapping[key] # If looks like full name, capitalize first letter only (Tavily accepts English names) return s norm_ai_country = _normalizeCountry(ai_country) norm_req_country = _normalizeCountry(request.options.country) if norm_ai_country: search_kwargs["country"] = norm_ai_country elif norm_req_country: search_kwargs["country"] = norm_req_country if ai_language and ai_language not in ['null', '', 'none', 'undefined']: search_kwargs["language"] = ai_language elif request.options.language and request.options.language not in ['null', '', 'none', 'undefined']: search_kwargs["language"] = request.options.language if ai_topic and ai_topic in ['general', 'news', 'academic']: search_kwargs["topic"] = ai_topic elif request.options.topic and request.options.topic in ['general', 'news', 'academic']: search_kwargs["topic"] = request.options.topic if ai_time_range and ai_time_range in ['d', 'w', 'm', 'y']: search_kwargs["time_range"] = ai_time_range elif request.options.time_range and request.options.time_range in ['d', 'w', 'm', 'y']: search_kwargs["time_range"] = request.options.time_range # Constrain by expected domains if provided by AI try: include_domains = [] for p in expected_patterns or []: if not isinstance(p, str): continue # Extract bare domain from pattern or URL import re m = re.search(r"(?:https?://)?([^/\s]+)", p.strip()) if m: domain = m.group(1).lower() # strip leading www. if domain.startswith('www.'): domain = domain[4:] include_domains.append(domain) # Deduplicate if include_domains: seen = set() uniq = [] for d in include_domains: if d not in seen: seen.add(d) uniq.append(d) search_kwargs["include_domains"] = uniq except Exception: pass # Log the parameters being used logger.info(f"Search parameters: country={search_kwargs.get('country', 'not_set')}, language={search_kwargs.get('language', 'not_set')}, topic={search_kwargs.get('topic', 'not_set')}, time_range={search_kwargs.get('time_range', 'not_set')}, include_domains={search_kwargs.get('include_domains', [])}") search_results = await self.aiObjects.search_websites(**search_kwargs) logger.debug(f"Web search returned {len(search_results)} results:") for i, result in enumerate(search_results, 1): logger.debug(f" {i}. {result.url} - {result.title}") # Deduplicate while preserving order seen = set() search_urls = [] for r in search_results: u = str(r.url) if u not in seen: seen.add(u) search_urls.append(u) logger.info(f"After initial deduplication: {len(search_urls)} unique URLs from {len(search_results)} search results") if not search_urls: logger.error("No relevant websites found") return WebResearchActionResult(success=False, error="No relevant websites found") # Now use AI to determine the main URLs based on user's intention logger.info(f"AI selecting main URLs from {len(search_urls)} search results based on user intent") # Create a prompt for AI to identify main URLs based on user's intention ai_prompt = f""" Select the most relevant URLs from these search results: {chr(10).join([f"{i+1}. {url}" for i, url in enumerate(search_urls)])} Return only the URLs that are most relevant for the user's query. One URL per line. """ # Create AI call request ai_request = AiCallRequest( prompt=ai_prompt, options=AiCallOptions() ) ai_response_obj = await self.aiObjects.call(ai_request) ai_response = ai_response_obj.content logger.debug(f"AI response for main URL selection: {ai_response}") # Parse AI response to extract URLs websites = [] for line in ai_response.strip().split('\n'): line = line.strip() if line and ('http://' in line or 'https://' in line): # Extract URL from the line for word in line.split(): if word.startswith('http://') or word.startswith('https://'): websites.append(word.rstrip('.,;')) break if not websites: logger.warning("AI did not identify any main URLs, using first few search results") websites = search_urls[:3] # Fallback to first 3 search results # Deduplicate while preserving order seen = set() unique_websites = [] for url in websites: if url not in seen: seen.add(url) unique_websites.append(url) websites = unique_websites logger.info(f"After AI selection deduplication: {len(websites)} unique URLs from {len(websites)} AI-selected URLs") logger.info(f"AI selected {len(websites)} main URLs (after deduplication):") for i, url in enumerate(websites, 1): logger.info(f" {i}. {url}") # Step 2: Smart website selection using AI interface logger.info(f"=== STEP 2: FILTERED URL LIST BY USER PROMPT'S INTENTION ===") logger.info(f"AI analyzing {len(websites)} URLs for relevance to: '{request.user_prompt}'") selectedWebsites, aiResponse = await self.aiObjects.selectRelevantWebsites(websites, request.user_prompt) logger.debug(f"AI Response: {aiResponse}") logger.debug(f"AI selected {len(selectedWebsites)} most relevant URLs:") for i, url in enumerate(selectedWebsites, 1): logger.debug(f" {i}. {url}") # Show which were filtered out filtered_out = [url for url in websites if url not in selectedWebsites] if filtered_out: logger.debug(f"Filtered out {len(filtered_out)} less relevant URLs:") for i, url in enumerate(filtered_out, 1): logger.debug(f" {i}. {url}") # Step 3+4+5: Recursive crawling with configurable depth # Get configuration parameters max_depth = int(APP_CONFIG.get("Web_Research_MAX_DEPTH", "2")) max_links_per_domain = int(APP_CONFIG.get("Web_Research_MAX_LINKS_PER_DOMAIN", "4")) crawl_timeout_minutes = int(APP_CONFIG.get("Web_Research_CRAWL_TIMEOUT_MINUTES", "10")) crawl_timeout_seconds = crawl_timeout_minutes * 60 # Use the configured max_depth or the request's pages_search_depth, whichever is smaller effective_depth = min(max_depth, request.options.pages_search_depth) logger.info(f"=== STEP 3+4+5: RECURSIVE CRAWLING (DEPTH {effective_depth}) ===") logger.info(f"Starting recursive crawl of {len(selectedWebsites)} main websites...") logger.info(f"Search depth: {effective_depth} levels (max configured: {max_depth})") logger.info(f"Max links per domain: {max_links_per_domain}") logger.info(f"Crawl timeout: {crawl_timeout_minutes} minutes") # Use recursive crawling with URL index to avoid duplicates import asyncio try: allContent = await asyncio.wait_for( self.aiObjects.crawlRecursively( urls=selectedWebsites, max_depth=effective_depth, extract_depth=request.options.extract_depth, max_per_domain=max_links_per_domain, global_processed_urls=global_processed_urls ), timeout=crawl_timeout_seconds ) logger.info(f"Crawling completed within timeout: {len(allContent)} pages crawled") except asyncio.TimeoutError: logger.warning(f"Crawling timed out after {crawl_timeout_minutes} minutes, using partial results") # crawlRecursively now handles timeouts gracefully and returns partial results # Try to get the partial results that were collected allContent = {} if not allContent: logger.error("Could not extract content from any websites") return WebResearchActionResult(success=False, error="Could not extract content from any websites") logger.info(f"=== WEB RESEARCH COMPLETED ===") logger.info(f"Successfully crawled {len(allContent)} URLs total") logger.info(f"Crawl depth: {effective_depth} levels") # Create simple result with raw content sources = [WebSearchResultItem(title=url, url=url) for url in selectedWebsites] # Get all additional links (all URLs except main ones) additional_links = [url for url in allContent.keys() if url not in selectedWebsites] # Combine all content into a single result combinedContent = "" for url, content in allContent.items(): combinedContent += f"\n\n=== {url} ===\n{content}\n" documentData = WebResearchDocumentData( user_prompt=request.user_prompt, websites_analyzed=len(allContent), additional_links_found=len(additional_links), analysis_result=combinedContent, # Raw content, no analysis sources=sources, additional_links=additional_links, individual_content=allContent, # Individual URL -> content mapping debug_info={ "crawl_depth": effective_depth, "max_configured_depth": max_depth, "max_links_per_domain": max_links_per_domain, "crawl_timeout_minutes": crawl_timeout_minutes, "total_urls_crawled": len(allContent), "main_urls": len(selectedWebsites), "additional_urls": len(additional_links) } ) document = WebResearchActionDocument( documentName=f"web_research_{request.user_prompt[:50]}.json", documentData=documentData, mimeType="application/json" ) return WebResearchActionResult( success=True, documents=[document], resultLabel="web_research_results" ) except Exception as e: logger.error(f"Error in web research: {str(e)}") return WebResearchActionResult(success=False, error=str(e)) def _calculateMaxContextBytes(self, options: Optional[AiCallOptions]) -> int: """Calculate maximum context bytes based on model capabilities and options.""" if options and options.maxContextBytes: return options.maxContextBytes # Default model capabilities (this should be enhanced with actual model registry) defaultMaxTokens = 4000 safetyMargin = options.safetyMargin if options else 0.1 # Calculate bytes (4 chars per token estimation) maxContextBytes = int(defaultMaxTokens * (1 - safetyMargin) * 4) return maxContextBytes async def _processDocumentsPerChunk( self, documents: List[ChatDocument], prompt: str, options: Optional[AiCallOptions] = None ) -> str: """ Process documents with per-chunk AI calls and merge results. FIXED: Now preserves chunk relationships and document structure. Args: documents: List of ChatDocument objects to process prompt: AI prompt for processing options: AI call options Returns: Merged AI results as string with preserved document structure """ if not documents: return "" # Get model capabilities for size calculation model_capabilities = self._getModelCapabilitiesForContent(prompt, documents, options) # Build extraction options for chunking with intelligent merging extractionOptions: Dict[str, Any] = { "prompt": prompt, "operationType": options.operationType if options else "general", "processDocumentsIndividually": True, # Process each document separately "maxSize": model_capabilities["maxContextBytes"], "chunkAllowed": True, "textChunkSize": model_capabilities["textChunkSize"], "imageChunkSize": model_capabilities["imageChunkSize"], "imageMaxPixels": 1024 * 1024, "imageQuality": 85, "mergeStrategy": { "useIntelligentMerging": True, # Enable intelligent token-aware merging "modelCapabilities": model_capabilities, "prompt": prompt, "groupBy": "typeGroup", "orderBy": "id", "mergeType": "concatenate" }, } logger.debug(f"Per-chunk extraction options: {extractionOptions}") try: # Extract content with chunking extractionResult = self.extractionService.extractContent(documents, extractionOptions) if not isinstance(extractionResult, list): return "[Error: No extraction results]" # FIXED: Process chunks with proper mapping chunkResults = await self._processChunksWithMapping(extractionResult, prompt, options) # FIXED: Merge with preserved chunk relationships mergedContent = self._mergeChunkResults(chunkResults, options) return mergedContent except Exception as e: logger.error(f"Error in per-chunk processing: {str(e)}") return f"[Error in per-chunk processing: {str(e)}]" async def _processDocumentsPerChunkClean( self, documents: List[ChatDocument], prompt: str, options: Optional[AiCallOptions] = None ) -> str: """ Process documents with per-chunk AI calls and merge results in CLEAN mode. This version excludes debug metadata and document headers for document generation. Args: documents: List of ChatDocument objects to process prompt: AI prompt for processing options: AI call options Returns: Clean merged AI results as string without debug metadata """ if not documents: return "" # Get model capabilities for size calculation model_capabilities = self._getModelCapabilitiesForContent(prompt, documents, options) # Build extraction options for chunking with intelligent merging extractionOptions: Dict[str, Any] = { "prompt": prompt, "operationType": options.operationType if options else "general", "processDocumentsIndividually": True, # Process each document separately "maxSize": model_capabilities["maxContextBytes"], "chunkAllowed": True, "textChunkSize": model_capabilities["textChunkSize"], "imageChunkSize": model_capabilities["imageChunkSize"], "imageMaxPixels": 1024 * 1024, "imageQuality": 85, "mergeStrategy": { "useIntelligentMerging": True, # Enable intelligent token-aware merging "modelCapabilities": model_capabilities, "prompt": prompt, "groupBy": "typeGroup", "orderBy": "id", "mergeType": "concatenate" }, } logger.debug(f"Per-chunk extraction options (clean mode): {extractionOptions}") try: # Extract content with chunking extractionResult = self.extractionService.extractContent(documents, extractionOptions) if not isinstance(extractionResult, list): return "[Error: No extraction results]" # Process chunks with proper mapping chunkResults = await self._processChunksWithMapping(extractionResult, prompt, options) # Merge with CLEAN mode (no debug metadata) mergedContent = self._mergeChunkResultsClean(chunkResults, options) return mergedContent except Exception as e: logger.error(f"Error in per-chunk processing (clean mode): {str(e)}") return f"[Error in per-chunk processing: {str(e)}]" async def _processChunksWithMapping( self, extractionResult: List[ContentExtracted], prompt: str, options: Optional[AiCallOptions] = None ) -> List[ChunkResult]: """Process chunks with proper mapping to preserve relationships.""" from modules.datamodels.datamodelExtraction import ChunkResult import asyncio import time # Collect all chunks that need processing with proper indexing chunks_to_process = [] chunk_index = 0 for ec in extractionResult: # Get document MIME type from metadata document_mime_type = None for part in ec.parts: if part.metadata and 'documentMimeType' in part.metadata: document_mime_type = part.metadata['documentMimeType'] break for part in ec.parts: if part.typeGroup in ("text", "table", "structure", "image", "container", "binary"): chunks_to_process.append({ 'part': part, 'chunk_index': chunk_index, 'document_id': ec.id, 'document_mime_type': document_mime_type }) chunk_index += 1 logger.info(f"Processing {len(chunks_to_process)} chunks with proper mapping") # Process chunks in parallel with proper mapping async def process_single_chunk(chunk_info: Dict) -> ChunkResult: part = chunk_info['part'] chunk_index = chunk_info['chunk_index'] document_id = chunk_info['document_id'] document_mime_type = chunk_info.get('document_mime_type', part.mimeType) start_time = time.time() try: # FIXED: Check MIME type first, then fallback to typeGroup is_image = ( (document_mime_type and document_mime_type.startswith('image/')) or (part.mimeType and part.mimeType.startswith('image/')) or (part.typeGroup == "image") ) # Debug logging print(f"🔍 Chunk {chunk_index}: document_mime_type={document_mime_type}, part.mimeType={part.mimeType}, part.typeGroup={part.typeGroup}, is_image={is_image}") logger.info(f"Chunk {chunk_index}: document_mime_type={document_mime_type}, part.mimeType={part.mimeType}, part.typeGroup={part.typeGroup}, is_image={is_image}") if is_image: # Create image analysis prompt based on user's original intent imagePrompt = self._createImageAnalysisPrompt(prompt) ai_result = await self.readImage( prompt=imagePrompt, imageData=part.data, mimeType=part.mimeType, options=options ) elif part.typeGroup in ("container", "binary"): # Handle container and binary content as text (skip processing) ai_result = f"[Skipped {part.typeGroup} content: {len(part.data)} bytes]" else: # Ensure options is not None and set correct operation type for text request_options = options if options is not None else AiCallOptions() # FIXED: Set operation type to general for text processing request_options.operationType = OperationType.GENERAL print(f"🔍 Chunk {chunk_index}: Calling aiObjects.call with operationType={request_options.operationType}") logger.info(f"Chunk {chunk_index}: Calling aiObjects.call with operationType={request_options.operationType}") request = AiCallRequest( prompt=prompt, context=part.data, options=request_options ) response = await self.aiObjects.call(request) ai_result = response.content processing_time = time.time() - start_time logger.info(f"Chunk {chunk_index} processed: {len(ai_result)} chars in {processing_time:.2f}s") return ChunkResult( originalChunk=part, aiResult=ai_result, chunkIndex=chunk_index, documentId=document_id, processingTime=processing_time, metadata={ "success": True, "chunkSize": len(part.data) if part.data else 0, "resultSize": len(ai_result), "typeGroup": part.typeGroup } ) except Exception as e: processing_time = time.time() - start_time logger.warning(f"Error processing chunk {chunk_index}: {str(e)}") return ChunkResult( originalChunk=part, aiResult=f"[Error processing chunk: {str(e)}]", chunkIndex=chunk_index, documentId=document_id, processingTime=processing_time, metadata={ "success": False, "error": str(e), "chunkSize": len(part.data) if part.data else 0, "typeGroup": part.typeGroup } ) # Process chunks with concurrency control max_concurrent = 5 # Default concurrency if options and hasattr(options, 'maxConcurrentChunks'): max_concurrent = options.maxConcurrentChunks elif options and hasattr(options, 'maxParallelChunks'): max_concurrent = options.maxParallelChunks logger.info(f"Processing {len(chunks_to_process)} chunks with max concurrency: {max_concurrent}") # Create semaphore for concurrency control semaphore = asyncio.Semaphore(max_concurrent) async def process_with_semaphore(chunk_info): async with semaphore: return await process_single_chunk(chunk_info) # Process all chunks in parallel with concurrency control tasks = [process_with_semaphore(chunk_info) for chunk_info in chunks_to_process] chunk_results = await asyncio.gather(*tasks, return_exceptions=True) # Handle any exceptions in the gather itself processed_results = [] for i, result in enumerate(chunk_results): if isinstance(result, Exception): # Create error ChunkResult chunk_info = chunks_to_process[i] processed_results.append(ChunkResult( originalChunk=chunk_info['part'], aiResult=f"[Error in parallel processing: {str(result)}]", chunkIndex=chunk_info['chunk_index'], documentId=chunk_info['document_id'], processingTime=0.0, metadata={"success": False, "error": str(result)} )) else: processed_results.append(result) logger.info(f"Completed processing {len(processed_results)} chunks") return processed_results def _mergeChunkResults( self, chunkResults: List[ChunkResult], options: Optional[AiCallOptions] = None ) -> str: """Merge chunk results while preserving document structure and chunk order.""" if not chunkResults: return "" # Get merging configuration from options chunk_separator = "\n\n---\n\n" include_document_headers = True include_chunk_metadata = False if options: if hasattr(options, 'chunkSeparator'): chunk_separator = options.chunkSeparator elif hasattr(options, 'mergeStrategy') and options.mergeStrategy: chunk_separator = options.mergeStrategy.get("chunkSeparator", "\n\n---\n\n") # Check for enhanced options if hasattr(options, 'preserveChunkMetadata'): include_chunk_metadata = options.preserveChunkMetadata # Group chunk results by document results_by_document = {} for chunk_result in chunkResults: doc_id = chunk_result.documentId if doc_id not in results_by_document: results_by_document[doc_id] = [] results_by_document[doc_id].append(chunk_result) # Sort chunks within each document by chunk index for doc_id in results_by_document: results_by_document[doc_id].sort(key=lambda x: x.chunkIndex) # Merge results for each document merged_documents = [] for doc_id, doc_chunks in results_by_document.items(): # Build document header if enabled doc_header = "" if include_document_headers: doc_header = f"\n\n=== DOCUMENT: {doc_id} ===\n\n" # Merge chunks for this document doc_content = "" for i, chunk_result in enumerate(doc_chunks): # Add chunk separator (except for first chunk) if i > 0: doc_content += chunk_separator # Add chunk content with optional metadata chunk_metadata = chunk_result.metadata if chunk_metadata.get("success", False): chunk_content = chunk_result.aiResult # Add chunk metadata if enabled if include_chunk_metadata: chunk_info = f"[Chunk {chunk_result.chunkIndex} - {chunk_metadata.get('typeGroup', 'unknown')} - {chunk_metadata.get('chunkSize', 0)} chars]" chunk_content = f"{chunk_info}\n{chunk_content}" doc_content += chunk_content else: # Handle error chunks error_msg = f"[ERROR in chunk {chunk_result.chunkIndex}: {chunk_metadata.get('error', 'Unknown error')}]" doc_content += error_msg merged_documents.append(doc_header + doc_content) # Join all documents final_result = "\n\n".join(merged_documents) logger.info(f"Merged {len(chunkResults)} chunks from {len(results_by_document)} documents") return final_result.strip() def _mergeChunkResultsClean( self, chunkResults: List[ChunkResult], options: Optional[AiCallOptions] = None ) -> str: """Merge chunk results in CLEAN mode - no debug metadata or document headers.""" if not chunkResults: return "" # Get merging configuration from options chunk_separator = "\n\n" include_document_headers = False # CLEAN MODE: No document headers include_chunk_metadata = False # CLEAN MODE: No chunk metadata if options: if hasattr(options, 'chunkSeparator'): chunk_separator = options.chunkSeparator elif hasattr(options, 'mergeStrategy') and options.mergeStrategy: chunk_separator = options.mergeStrategy.get("chunkSeparator", "\n\n") # Group chunk results by document results_by_document = {} for chunk_result in chunkResults: doc_id = chunk_result.documentId if doc_id not in results_by_document: results_by_document[doc_id] = [] results_by_document[doc_id].append(chunk_result) # Sort chunks within each document by chunk index for doc_id in results_by_document: results_by_document[doc_id].sort(key=lambda x: x.chunkIndex) # Merge results for each document in CLEAN mode merged_documents = [] for doc_id, doc_chunks in results_by_document.items(): # CLEAN MODE: No document headers doc_header = "" # Merge chunks for this document doc_content = "" for i, chunk_result in enumerate(doc_chunks): # Add chunk separator (except for first chunk) if i > 0: doc_content += chunk_separator # Add chunk content without metadata chunk_metadata = chunk_result.metadata if chunk_metadata.get("success", False): chunk_content = chunk_result.aiResult # CLEAN MODE: Skip container/binary chunks entirely if chunk_content.startswith("[Skipped ") and "content:" in chunk_content: continue # Skip container/binary chunks in clean mode # CLEAN MODE: Skip empty or whitespace-only chunks if not chunk_content.strip(): continue # Skip empty chunks in clean mode # CLEAN MODE: No chunk metadata doc_content += chunk_content else: # Handle error chunks silently in clean mode continue merged_documents.append(doc_header + doc_content) # Join all documents final_result = "\n\n".join(merged_documents) logger.info(f"Merged {len(chunkResults)} chunks from {len(results_by_document)} documents (clean mode)") return final_result.strip() async def _compressContent(self, content: str, targetSize: int, contentType: str) -> str: if len(content.encode("utf-8")) <= targetSize: return content try: compressionPrompt = f""" Komprimiere den folgenden {contentType} auf maximal {targetSize} Zeichen, behalte aber alle wichtigen Informationen bei: {content} Gib nur den komprimierten Inhalt zurück, ohne zusätzliche Erklärungen. """ # Service must not call connectors directly; use simple truncation fallback here data = content.encode("utf-8") return data[:targetSize].decode("utf-8", errors="ignore") + "... [truncated]" except Exception as e: logger.warning(f"AI compression failed, using truncation: {str(e)}") return content[:targetSize] + "... [truncated]" # ===== DYNAMIC GENERIC AI CALLS IMPLEMENTATION ===== async def callAi( self, prompt: str, documents: Optional[List[ChatDocument]] = None, placeholders: Optional[List[PromptPlaceholder]] = None, options: Optional[AiCallOptions] = None, outputFormat: Optional[str] = None, title: Optional[str] = None ) -> Union[str, Dict[str, Any]]: """ Unified AI call interface that automatically routes to appropriate handler. Args: prompt: The main prompt for the AI call documents: Optional list of documents to process placeholders: Optional list of placeholder replacements for planning calls options: AI call configuration options outputFormat: Optional output format (html, pdf, docx, txt, md, json, csv, xlsx) for document generation title: Optional title for generated documents Returns: AI response as string, or dict with documents if outputFormat is specified Raises: Exception: If all available models fail """ # Ensure aiObjects is initialized await self._ensureAiObjectsInitialized() if options is None: options = AiCallOptions() # Normalize placeholders from List[PromptPlaceholder] placeholders_dict: Dict[str, str] = {} placeholders_meta: Dict[str, bool] = {} if placeholders: placeholders_dict = {p.label: p.content for p in placeholders} placeholders_meta = {p.label: bool(getattr(p, 'summaryAllowed', False)) for p in placeholders} # Auto-determine call type based on documents and operation type call_type = self._determineCallType(documents, options.operationType) options.callType = call_type # Log the prompt being sent to AI for debugging (before routing) TODO TO REMOVE try: # Build the full prompt that will be sent to AI if placeholders: full_prompt = prompt for p in placeholders: placeholder = f"{{{{KEY:{p.label}}}}}" full_prompt = full_prompt.replace(placeholder, p.content) else: full_prompt = prompt self._writeAiResponseDebug( label='ai_prompt_debug', content=full_prompt, partIndex=1, modelName=None, continuation=False ) except Exception: pass # Handle document generation with specific output format if outputFormat: result = await self._callAiWithDocumentGeneration(prompt, documents, options, outputFormat, title) # Log AI response for debugging TODO TO REMOVE try: if isinstance(result, dict) and 'content' in result: self._writeAiResponseDebug( label='ai_document_generation', content=result['content'], partIndex=1, modelName=None, # Document generation doesn't return model info continuation=False ) except Exception: pass return result if call_type == "planning": result = await self._callAiPlanning(prompt, placeholders_dict, placeholders_meta, options) # Log AI response for debugging TODO TO REMOVE try: self._writeAiResponseDebug( label='ai_planning', content=result or "", partIndex=1, modelName=None, # Planning doesn't return model info continuation=False ) except Exception: pass return result else: # Set processDocumentsIndividually from the legacy parameter if not set in options if options.processDocumentsIndividually is None and documents: options.processDocumentsIndividually = False # Default to batch processing # For text calls, we need to build the full prompt with placeholders here # since _callAiText doesn't handle placeholders directly if placeholders_dict: full_prompt = self._buildPromptWithPlaceholders(prompt, placeholders_dict) else: full_prompt = prompt result = await self._callAiText(full_prompt, documents, options) # Log AI response for debugging (additional logging for text calls) TODO TO REMOVE try: self._writeAiResponseDebug( label='ai_text_main', content=result or "", partIndex=1, modelName=None, # Text calls already log internally continuation=False ) except Exception: pass return result def _determineCallType(self, documents: Optional[List[ChatDocument]], operation_type: str) -> str: """ Determine call type based on documents and operation type. Criteria: no documents AND operationType is "generate_plan" -> planning All other cases -> text """ has_documents = documents is not None and len(documents) > 0 is_planning_operation = operation_type == OperationType.GENERATE_PLAN if not has_documents and is_planning_operation: return "planning" else: return "text" async def _callAiPlanning( self, prompt: str, placeholders: Optional[Dict[str, str]], placeholdersMeta: Optional[Dict[str, bool]], options: AiCallOptions ) -> str: """ Handle planning calls with placeholder system and selective summarization. """ # Ensure aiObjects is initialized await self._ensureAiObjectsInitialized() # Build full prompt with placeholders; if too large, summarize summaryAllowed placeholders proportionally effective_placeholders = placeholders or {} full_prompt = self._buildPromptWithPlaceholders(prompt, effective_placeholders) if options.compressPrompt and placeholdersMeta: # Determine model capacity try: caps = self._getModelCapabilitiesForContent(full_prompt, None, options) max_bytes = caps.get("maxContextBytes", len(full_prompt.encode("utf-8"))) except Exception: max_bytes = len(full_prompt.encode("utf-8")) current_bytes = len(full_prompt.encode("utf-8")) if current_bytes > max_bytes: # Compute total bytes contributed by allowed placeholders (approximate by content length) allowed_labels = [l for l, allow in placeholdersMeta.items() if allow] allowed_sizes = {l: len((effective_placeholders.get(l) or "").encode("utf-8")) for l in allowed_labels} total_allowed = sum(allowed_sizes.values()) overage = current_bytes - max_bytes if total_allowed > 0 and overage > 0: # Target total for allowed after reduction target_allowed = max(total_allowed - overage, 0) # Global ratio to apply across allowed placeholders ratio = target_allowed / total_allowed if total_allowed > 0 else 1.0 ratio = max(0.0, min(1.0, ratio)) reduced: Dict[str, str] = {} for label, content in effective_placeholders.items(): if label in allowed_labels and isinstance(content, str) and len(content) > 0: old_len = len(content) # Reduce by proportional ratio on characters (fallback if empty) reduction_factor = ratio if old_len > 0 else 1.0 reduced[label] = self._reduceText(content, reduction_factor) else: reduced[label] = content effective_placeholders = reduced full_prompt = self._buildPromptWithPlaceholders(prompt, effective_placeholders) # If still slightly over, perform a second-pass fine adjustment with updated ratio current_bytes = len(full_prompt.encode("utf-8")) if current_bytes > max_bytes and total_allowed > 0: overage2 = current_bytes - max_bytes # Recompute allowed sizes after first reduction allowed_sizes2 = {l: len((effective_placeholders.get(l) or "").encode("utf-8")) for l in allowed_labels} total_allowed2 = sum(allowed_sizes2.values()) if total_allowed2 > 0 and overage2 > 0: target_allowed2 = max(total_allowed2 - overage2, 0) ratio2 = target_allowed2 / total_allowed2 ratio2 = max(0.0, min(1.0, ratio2)) reduced2: Dict[str, str] = {} for label, content in effective_placeholders.items(): if label in allowed_labels and isinstance(content, str) and len(content) > 0: old_len = len(content) reduction_factor = ratio2 if old_len > 0 else 1.0 reduced2[label] = self._reduceText(content, reduction_factor) else: reduced2[label] = content effective_placeholders = reduced2 full_prompt = self._buildPromptWithPlaceholders(prompt, effective_placeholders) # Make AI call using AiObjects (let it handle model selection) request = AiCallRequest( prompt=full_prompt, context="", # Context is already included in the prompt options=options ) response = await self.aiObjects.call(request) try: logger.debug(f"AI model selected (planning): {getattr(response, 'modelName', 'unknown')}") except Exception: pass return response.content async def _callAiText( self, prompt: str, documents: Optional[List[ChatDocument]], options: AiCallOptions ) -> str: """ Handle text calls with document processing through ExtractionService. UNIFIED PROCESSING: Always use per-chunk processing for consistency. """ # Ensure aiObjects is initialized await self._ensureAiObjectsInitialized() # UNIFIED PROCESSING: Always use per-chunk processing for consistency # This ensures MIME-type checking, chunk mapping, and parallel processing return await self._processDocumentsPerChunk(documents, prompt, options) async def _callAiTextClean( self, prompt: str, documents: Optional[List[ChatDocument]], options: AiCallOptions ) -> str: """ Handle text calls with document processing in CLEAN mode for document generation. This version excludes debug metadata and document headers from the final output. """ # Ensure aiObjects is initialized await self._ensureAiObjectsInitialized() # Process documents with clean merging (no debug metadata) return await self._processDocumentsPerChunkClean(documents, prompt, options) def _getModelCapabilitiesForContent(self, prompt: str, documents: Optional[List[ChatDocument]], options: AiCallOptions) -> Dict[str, int]: """ Get model capabilities for content processing, including appropriate size limits for chunking. """ # Estimate total content size prompt_size = len(prompt.encode('utf-8')) document_size = 0 if documents: # Rough estimate of document content size for doc in documents: document_size += doc.fileSize or 0 total_size = prompt_size + document_size # Use AiObjects to select the best model for this content size # We'll simulate the model selection by checking available models from modules.interfaces.interfaceAiObjects import aiModels # Find the best model for this content size and operation best_model = None best_context_length = 0 for model_name, model_info in aiModels.items(): context_length = model_info.get("contextLength", 0) # Skip models with no context length or too small for content if context_length == 0: continue # Check if model supports the operation type capabilities = model_info.get("capabilities", []) if options.operationType == OperationType.IMAGE_ANALYSIS and "image_analysis" not in capabilities: continue elif options.operationType == OperationType.IMAGE_GENERATION and "image_generation" not in capabilities: continue elif options.operationType == OperationType.WEB_RESEARCH and "web_search" not in capabilities: continue elif "text_generation" not in capabilities: continue # Prefer models that can handle the content without chunking, but allow chunking if needed if context_length >= total_size * 0.8: # 80% of content size if context_length > best_context_length: best_model = model_info best_context_length = context_length elif best_model is None: # Fallback to largest available model if context_length > best_context_length: best_model = model_info best_context_length = context_length # Fallback to a reasonable default if no model found if best_model is None: best_model = { "contextLength": 128000, # GPT-4o default "llmName": "gpt-4o" } # Calculate appropriate sizes # Convert tokens to bytes (rough estimate: 1 token ≈ 4 characters) context_length_bytes = int(best_model["contextLength"] * 4) max_context_bytes = int(context_length_bytes * 0.9) # 90% of context length text_chunk_size = int(max_context_bytes * 0.7) # 70% of max context for text chunks image_chunk_size = int(max_context_bytes * 0.8) # 80% of max context for image chunks logger.debug(f"Selected model: {best_model.get('llmName', 'unknown')} with context length: {best_model['contextLength']}") logger.debug(f"Content size: {total_size} bytes, Max context: {max_context_bytes} bytes") logger.debug(f"Text chunk size: {text_chunk_size} bytes, Image chunk size: {image_chunk_size} bytes") return { "maxContextBytes": max_context_bytes, "textChunkSize": text_chunk_size, "imageChunkSize": image_chunk_size } def _getModelsForOperation(self, operation_type: str, options: AiCallOptions) -> List[ModelCapabilities]: """ Get models capable of handling the specific operation with capability filtering. """ # Use the actual AI objects model selection instead of hardcoded default if hasattr(self, 'aiObjects') and self.aiObjects: # Let AiObjects handle the model selection return [] else: # Fallback to default model if AiObjects not available default_model = ModelCapabilities( name="default", maxTokens=4000, capabilities=["text", "reasoning"] if operation_type == "planning" else ["text"], costPerToken=0.001, processingTime=1.0, isAvailable=True ) return [default_model] def _buildPromptWithPlaceholders(self, prompt: str, placeholders: Optional[Dict[str, str]]) -> str: """ Build full prompt by replacing placeholders with their content. Uses the new {{KEY:placeholder}} format. """ if not placeholders: return prompt full_prompt = prompt for placeholder, content in placeholders.items(): # Replace both old format {{placeholder}} and new format {{KEY:placeholder}} full_prompt = full_prompt.replace(f"{{{{{placeholder}}}}}", content) full_prompt = full_prompt.replace(f"{{{{KEY:{placeholder}}}}}", content) return full_prompt def _writeTraceLog(self, contextText: str, data: Any) -> None: """Write raw data to the central trace log file without truncation.""" try: import os import json from datetime import datetime, UTC # Only write if logger is in debug mode if logger.level > logging.DEBUG: return # Get log directory from configuration via service center if possible logDir = None try: if self.serviceCenter and hasattr(self.serviceCenter, 'utils'): logDir = self.serviceCenter.utils.configGet("APP_LOGGING_LOG_DIR", "./") except Exception: pass if not logDir: logDir = "./" if not os.path.isabs(logDir): # Make it relative to gateway directory gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) logDir = os.path.join(gatewayDir, logDir) os.makedirs(logDir, exist_ok=True) traceFile = os.path.join(logDir, "log_trace.log") timestamp = datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] traceEntry = f"[{timestamp}] {contextText}\n" + ("=" * 80) + "\n" if data is None: traceEntry += "No data provided\n" else: # Prefer exact text; if dict/list, pretty print JSON try: if isinstance(data, (dict, list)): traceEntry += f"JSON Data:\n{json.dumps(data, indent=2, ensure_ascii=False)}\n" else: text = str(data) traceEntry += f"Text Data:\n{text}\n" except Exception: traceEntry += f"Data (fallback): {str(data)}\n" traceEntry += ("=" * 80) + "\n\n" with open(traceFile, "a", encoding="utf-8") as f: f.write(traceEntry) except Exception: # Swallow to avoid recursive logging issues pass def _writeAiResponseDebug(self, label: str, content: str, partIndex: int = 1, modelName: str = None, continuation: bool = None) -> None: """Persist raw AI response parts for debugging under test-chat/ai.""" try: import os from datetime import datetime, UTC # Base dir: gateway/test-chat/ai (go up 4 levels from this file) # .../gateway/modules/services/serviceAi/mainServiceAi.py -> up to gateway root gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) outDir = os.path.join(gatewayDir, 'test-chat', 'ai') os.makedirs(outDir, exist_ok=True) ts = datetime.now(UTC).strftime('%Y%m%d-%H%M%S-%f')[:-3] suffix = [] if partIndex is not None: suffix.append(f"part{partIndex}") if continuation is not None: suffix.append(f"cont_{str(continuation).lower()}") if modelName: safeModel = ''.join(c if c.isalnum() or c in ('-', '_') else '-' for c in modelName) suffix.append(safeModel) suffixStr = ('_' + '_'.join(suffix)) if suffix else '' fname = f"{ts}_{label}{suffixStr}.txt" fpath = os.path.join(outDir, fname) with open(fpath, 'w', encoding='utf-8') as f: f.write(content or '') except Exception: # Do not raise; best-effort debug write pass def _exceedsTokenLimit(self, text: str, model: ModelCapabilities, safety_margin: float) -> bool: """ Check if text exceeds model token limit with safety margin. """ # Simple character-based estimation (4 chars per token) estimated_tokens = len(text) // 4 max_tokens = int(model.maxTokens * (1 - safety_margin)) return estimated_tokens > max_tokens def _reducePlanningPrompt( self, full_prompt: str, placeholders: Optional[Dict[str, str]], model: ModelCapabilities, options: AiCallOptions ) -> str: """ Reduce planning prompt size by summarizing placeholders while preserving prompt structure. """ if not placeholders: return self._reduceText(full_prompt, 0.7) # Reduce placeholders while preserving prompt reduced_placeholders = {} for placeholder, content in placeholders.items(): if len(content) > 1000: # Only reduce long content reduction_factor = 0.7 reduced_content = self._reduceText(content, reduction_factor) reduced_placeholders[placeholder] = reduced_content else: reduced_placeholders[placeholder] = content return self._buildPromptWithPlaceholders(full_prompt, reduced_placeholders) def _reduceTextPrompt( self, prompt: str, context: str, model: ModelCapabilities, options: AiCallOptions ) -> str: """ Reduce text prompt size using typeGroup-aware chunking and merging. """ max_size = int(model.maxTokens * (1 - options.safetyMargin)) if options.compressPrompt: # Reduce both prompt and context target_size = max_size current_size = len(prompt) + len(context) reduction_factor = (target_size * 0.7) / current_size if reduction_factor < 1.0: prompt = self._reduceText(prompt, reduction_factor) context = self._reduceText(context, reduction_factor) else: # Only reduce context, preserve prompt integrity max_context_size = max_size - len(prompt) if len(context) > max_context_size: reduction_factor = max_context_size / len(context) context = self._reduceText(context, reduction_factor) return prompt + "\n\n" + context if context else prompt def _extractTextFromContentParts(self, extracted_content) -> str: """ Extract text content from ExtractionService ContentPart objects. """ if not extracted_content or not hasattr(extracted_content, 'parts'): return "" text_parts = [] for part in extracted_content.parts: if hasattr(part, 'typeGroup') and part.typeGroup in ['text', 'table', 'structure']: if hasattr(part, 'data') and part.data: text_parts.append(part.data) return "\n\n".join(text_parts) def _reduceText(self, text: str, reduction_factor: float) -> str: """ Reduce text size by the specified factor. """ if reduction_factor >= 1.0: return text target_length = int(len(text) * reduction_factor) return text[:target_length] + "... [reduced]" async def _callAiWithDocumentGeneration( self, prompt: str, documents: Optional[List[ChatDocument]], options: AiCallOptions, outputFormat: str, title: Optional[str] ) -> Dict[str, Any]: """ Handle AI calls with document generation in specific output format. Args: prompt: The main prompt for the AI call documents: Optional list of documents to process options: AI call configuration options outputFormat: Target output format (html, pdf, docx, txt, md, json, csv, xlsx) title: Optional title for generated documents Returns: Dict with generated documents and metadata """ try: # Get format-specific extraction prompt from generation service from modules.services.serviceGeneration.mainServiceGeneration import GenerationService generation_service = GenerationService(self.serviceCenter) # Use default title if not provided if not title: title = "AI Generated Document" # Get format-specific extraction prompt extractionPrompt = await generation_service.getExtractionPrompt( outputFormat=outputFormat, userPrompt=prompt, title=title, aiService=self ) # Process documents with format-specific prompt using CLEAN mode # This ensures no debug metadata is included in the final output aiResponse = await self._callAiTextClean(extractionPrompt, documents, options) # Parse filename header from AI response if present parsedFilename = None try: if aiResponse: firstNewline = aiResponse.find('\n') headerLine = aiResponse if firstNewline == -1 else aiResponse[:firstNewline] if headerLine.strip().lower().startswith('filename:'): parsed = headerLine.split(':', 1)[1].strip() # basic sanitization import re parsed = re.sub(r"[^a-zA-Z0-9._-]", "-", parsed) parsed = re.sub(r"-+", "-", parsed).strip('-') if parsed: parsedFilename = parsed # remove header line from content for rendering aiResponse = aiResponse[firstNewline+1:].lstrip('\n') if firstNewline != -1 else '' except Exception: parsedFilename = None if not aiResponse or aiResponse.strip() == "": raise Exception("AI content generation failed") # Render the content to the specified format renderedContent, mimeType = await generation_service.renderReport( extractedContent=aiResponse, outputFormat=outputFormat, title=title, userPrompt=prompt, aiService=self ) # Generate meaningful filename (use AI-provided if valid, else fallback) from datetime import datetime, UTC timestamp = datetime.now(UTC).strftime("%Y%m%d-%H%M%S") if parsedFilename and parsedFilename.lower().endswith(f".{outputFormat.lower()}"): filename = parsedFilename else: safeTitle = ''.join(c if c.isalnum() else '-' for c in (title or 'document')).strip('-') filename = f"{safeTitle or 'document'}-{timestamp}.{outputFormat}" # Return structured result with document information return { "success": True, "content": aiResponse, # Raw AI response "rendered_content": renderedContent, # Formatted content "mime_type": mimeType, "filename": filename, "format": outputFormat, "title": title, "documents": [{ "documentName": filename, "documentData": renderedContent, "mimeType": mimeType }] } except Exception as e: logger.error(f"Error in document generation: {str(e)}") return { "success": False, "error": str(e), "content": "", "rendered_content": "", "mime_type": "text/plain", "filename": f"error_{outputFormat}", "format": outputFormat, "title": title or "Error", "documents": [] }