# gateway/modules/services/serviceAi/mainServiceAi.py

import logging
from typing import Dict, Any, List, Optional, Tuple, Union
from modules.datamodels.datamodelChat import PromptPlaceholder
from modules.datamodels.datamodelChat import ChatDocument
from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, ModelCapabilities, OperationType, Priority
from modules.datamodels.datamodelExtraction import ChunkResult, ContentExtracted
from modules.datamodels.datamodelWeb import (
    WebResearchRequest,
    WebResearchActionResult,
    WebResearchDocumentData,
    WebResearchActionDocument,
    WebSearchResultItem,
)
from modules.interfaces.interfaceAiObjects import AiObjects
from modules.shared.configuration import APP_CONFIG
logger = logging.getLogger(__name__)
# Model registry is now provided by interfaces via AiModels

class AiService:
    """Centralized AI service orchestrating documents, model selection, failover, and web operations."""

    def __init__(self, serviceCenter=None) -> None:
        """Initialize AI service with service center access.

        Args:
            serviceCenter: Service center instance for accessing other services
        """
        self.services = serviceCenter
        # Only depend on interfaces
        self.aiObjects = None  # Will be initialized in create()
        self._extractionService = None  # Lazy initialization

    @property
    def extractionService(self):
        """Lazy initialization of the extraction service."""
        if self._extractionService is None:
            logger.info("Lazy initializing ExtractionService...")
            self._extractionService = ExtractionService(self.services)
        return self._extractionService

    async def _ensureAiObjectsInitialized(self):
        """Ensure aiObjects is initialized."""
        if self.aiObjects is None:
            logger.info("Lazy initializing AiObjects...")
            self.aiObjects = await AiObjects.create()
            logger.info("AiObjects initialization completed")

    @classmethod
    async def create(cls, serviceCenter=None) -> "AiService":
        """Create an AiService instance with all connectors initialized."""
        logger.info("AiService.create() called")
        instance = cls(serviceCenter)
        logger.info("AiService created, about to call AiObjects.create()...")
        instance.aiObjects = await AiObjects.create()
        logger.info("AiObjects.create() completed")
        return instance
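
    # Usage sketch (illustrative; assumes an async entrypoint and a configured
    # service center, which is whatever your application wires up):
    #
    #     aiService = await AiService.create(serviceCenter)
    #     answer = await aiService.callAi("Summarize the quarterly notes")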

    # AI Image Analysis
    async def readImage(
        self,
        prompt: str,
        imageData: Union[str, bytes],
        mimeType: str = None,
        options: Optional[AiCallOptions] = None,
    ) -> str:
        """Call AI for image analysis via interface.callImage()."""
        try:
            # Check that image data was provided
            if not imageData:
                error_msg = "No image data provided"
                self.services.utils.debugLogToFile(f"Error in AI image analysis: {error_msg}", "AI_SERVICE")
                logger.error(f"Error in AI image analysis: {error_msg}")
                return f"Error: {error_msg}"
            self.services.utils.debugLogToFile(f"readImage called with prompt, imageData type: {type(imageData)}, length: {len(imageData)}, mimeType: {mimeType}", "AI_SERVICE")
            logger.info(f"readImage called with prompt, imageData type: {type(imageData)}, length: {len(imageData)}, mimeType: {mimeType}")
            # Always use the IMAGE_ANALYSIS operation type for image processing
            if options is None:
                options = AiCallOptions(operationType=OperationType.IMAGE_ANALYSIS)
            else:
                # Override the operation type to ensure image analysis
                options.operationType = OperationType.IMAGE_ANALYSIS
            self.services.utils.debugLogToFile(f"Calling aiObjects.callImage with operationType: {options.operationType}", "AI_SERVICE")
            logger.info(f"Calling aiObjects.callImage with operationType: {options.operationType}")
            result = await self.aiObjects.callImage(prompt, imageData, mimeType, options)
            # Debug the result
            self.services.utils.debugLogToFile(f"Raw AI result type: {type(result)}, value: {repr(result)}", "AI_SERVICE")
            # Check that the result is valid
            if not result or (isinstance(result, str) and not result.strip()):
                error_msg = f"No response from AI image analysis (result: {repr(result)})"
                self.services.utils.debugLogToFile(f"Error in AI image analysis: {error_msg}", "AI_SERVICE")
                logger.error(f"Error in AI image analysis: {error_msg}")
                return f"Error: {error_msg}"
            preview = f"{result[:200]}..." if len(result) > 200 else result
            self.services.utils.debugLogToFile(f"callImage returned: {preview}", "AI_SERVICE")
            logger.info(f"callImage returned: {preview}")
            return result
        except Exception as e:
            self.services.utils.debugLogToFile(f"Error in AI image analysis: {str(e)}", "AI_SERVICE")
            logger.error(f"Error in AI image analysis: {str(e)}")
            return f"Error: {str(e)}"

    # AI Image Generation
    async def generateImage(
        self,
        prompt: str,
        size: str = "1024x1024",
        quality: str = "standard",
        style: str = "vivid",
        options: Optional[AiCallOptions] = None,
    ) -> Dict[str, Any]:
        """Generate an image with AI via interface.generateImage()."""
        try:
            return await self.aiObjects.generateImage(prompt, size, quality, style, options)
        except Exception as e:
            logger.error(f"Error in AI image generation: {str(e)}")
            return {"success": False, "error": str(e)}

    # Web Research - Using interface functions
    async def webResearch(self, request: WebResearchRequest) -> WebResearchActionResult:
        """Perform web research using interface functions."""
        try:
            logger.info("WEB RESEARCH STARTED")
            logger.info(f"User Query: {request.user_prompt}")
            logger.info(f"Max Results: {request.max_results}, Max Pages: {request.options.max_pages}")
            # Global URL index to track all processed URLs across the entire research session
            global_processed_urls = set()
            # Step 1: Find relevant websites - either provided URLs or AI-determined main URLs
            logger.info("=== STEP 1: INITIAL MAIN URLS LIST ===")
            if request.urls:
                # Use the provided URLs as initial main URLs
                websites = request.urls
                logger.info(f"Using provided URLs ({len(websites)}):")
                for i, url in enumerate(websites, 1):
                    logger.info(f"  {i}. {url}")
            else:
                # Use AI to determine main URLs based on the user's intention
                logger.info(f"AI analyzing user intent: '{request.user_prompt}'")
                # Use AI to generate an optimized Tavily search query and search parameters
                query_optimizer_prompt = f"""You are a search query optimizer.
USER QUERY: {request.user_prompt}
Your task: Create a search query and parameters for the USER QUERY given.
RULES:
1. The search query MUST be related to the user query above
2. Extract key terms from the user query
3. Determine appropriate country/language based on the query context
4. Keep the search query short (2-6 words)
Return ONLY this JSON format:
{{
  "user_prompt": "search query based on user query above",
  "country": "Full English country name (ISO-3166; map codes via pycountry/i18n-iso-countries)",
  "language": "language_code_or_null",
  "topic": "general|news|academic_or_null",
  "time_range": "d|w|m|y_or_null",
  "selection_strategy": "single|multiple|specific_page",
  "selection_criteria": "what URLs to prioritize",
  "expected_url_patterns": ["pattern1", "pattern2"],
  "estimated_result_count": number
}}"""
                # Get the AI response for query optimization
                ai_request = AiCallRequest(
                    prompt=query_optimizer_prompt,
                    options=AiCallOptions()
                )
                ai_response_obj = await self.aiObjects.call(ai_request)
                ai_response = ai_response_obj.content
                logger.debug(f"AI query optimizer response: {ai_response}")
                # Parse the AI response to extract the search query
                import json
                try:
                    # Clean the response by removing markdown code blocks
                    cleaned_response = ai_response.strip()
                    if cleaned_response.startswith('```json'):
                        cleaned_response = cleaned_response[7:]  # Remove ```json
                    if cleaned_response.endswith('```'):
                        cleaned_response = cleaned_response[:-3]  # Remove ```
                    cleaned_response = cleaned_response.strip()
                    query_data = json.loads(cleaned_response)
                    search_query = query_data.get("user_prompt", request.user_prompt)
                    ai_country = query_data.get("country")
                    ai_language = query_data.get("language")
                    ai_topic = query_data.get("topic")
                    ai_time_range = query_data.get("time_range")
                    selection_strategy = query_data.get("selection_strategy", "multiple")
                    selection_criteria = query_data.get("selection_criteria", "relevant URLs")
                    expected_patterns = query_data.get("expected_url_patterns", [])
                    estimated_count = query_data.get("estimated_result_count", request.max_results)
                    logger.info(f"AI optimized search query: '{search_query}'")
                    logger.info(f"Selection strategy: {selection_strategy}")
                    logger.info(f"Selection criteria: {selection_criteria}")
                    logger.info(f"Expected URL patterns: {expected_patterns}")
                    logger.info(f"Estimated result count: {estimated_count}")
                except json.JSONDecodeError:
                    logger.warning("Failed to parse AI response as JSON, using original query")
                    search_query = request.user_prompt
                    ai_country = None
                    ai_language = None
                    ai_topic = None
                    ai_time_range = None
                    selection_strategy = "multiple"
                    # Also reset the AI-derived selection hints so later steps see defined values
                    selection_criteria = "relevant URLs"
                    expected_patterns = []
                    estimated_count = request.max_results
                # Perform the web search with AI-determined parameters
                search_kwargs = {
                    "query": search_query,
                    "max_results": request.max_results,
                    "search_depth": request.options.search_depth,
                    "auto_parameters": False  # Use explicit parameters
                }

                # Add parameters only if they have valid values
                def _normalizeCountry(c: Optional[str]) -> Optional[str]:
                    if not c:
                        return None
                    s = str(c).strip()
                    if not s or s.lower() in ['null', 'none', 'undefined']:
                        return None
                    # Map common codes to full English names when easy to do without extra deps
                    mapping = {
                        'ch': 'Switzerland', 'che': 'Switzerland',
                        'de': 'Germany', 'ger': 'Germany', 'deu': 'Germany',
                        'at': 'Austria', 'aut': 'Austria',
                        'us': 'United States', 'usa': 'United States', 'united states': 'United States',
                        'uk': 'United Kingdom', 'gb': 'United Kingdom', 'gbr': 'United Kingdom'
                    }
                    key = s.lower()
                    if key in mapping:
                        return mapping[key]
                    # Otherwise assume it is already a full name (Tavily accepts English names)
                    return s

                norm_ai_country = _normalizeCountry(ai_country)
                norm_req_country = _normalizeCountry(request.options.country)
                if norm_ai_country:
                    search_kwargs["country"] = norm_ai_country
                elif norm_req_country:
                    search_kwargs["country"] = norm_req_country
                if ai_language and ai_language not in ['null', '', 'none', 'undefined']:
                    search_kwargs["language"] = ai_language
                elif request.options.language and request.options.language not in ['null', '', 'none', 'undefined']:
                    search_kwargs["language"] = request.options.language
                if ai_topic and ai_topic in ['general', 'news', 'academic']:
                    search_kwargs["topic"] = ai_topic
                elif request.options.topic and request.options.topic in ['general', 'news', 'academic']:
                    search_kwargs["topic"] = request.options.topic
                if ai_time_range and ai_time_range in ['d', 'w', 'm', 'y']:
                    search_kwargs["time_range"] = ai_time_range
                elif request.options.time_range and request.options.time_range in ['d', 'w', 'm', 'y']:
                    search_kwargs["time_range"] = request.options.time_range
                # Constrain by expected domains if provided by the AI
                try:
                    import re
                    include_domains = []
                    for p in expected_patterns or []:
                        if not isinstance(p, str):
                            continue
                        # Extract the bare domain from a pattern or URL
                        m = re.search(r"(?:https?://)?([^/\s]+)", p.strip())
                        if m:
                            domain = m.group(1).lower()
                            # Strip a leading www.
                            if domain.startswith('www.'):
                                domain = domain[4:]
                            include_domains.append(domain)
                    # Deduplicate while preserving order
                    if include_domains:
                        seen = set()
                        uniq = []
                        for d in include_domains:
                            if d not in seen:
                                seen.add(d)
                                uniq.append(d)
                        search_kwargs["include_domains"] = uniq
                except Exception:
                    pass
                # Log the parameters being used
                logger.info(f"Search parameters: country={search_kwargs.get('country', 'not_set')}, language={search_kwargs.get('language', 'not_set')}, topic={search_kwargs.get('topic', 'not_set')}, time_range={search_kwargs.get('time_range', 'not_set')}, include_domains={search_kwargs.get('include_domains', [])}")
                search_results = await self.aiObjects.search_websites(**search_kwargs)
                logger.debug(f"Web search returned {len(search_results)} results:")
                for i, result in enumerate(search_results, 1):
                    logger.debug(f"  {i}. {result.url} - {result.title}")
                # Deduplicate while preserving order
                seen = set()
                search_urls = []
                for r in search_results:
                    u = str(r.url)
                    if u not in seen:
                        seen.add(u)
                        search_urls.append(u)
                logger.info(f"After initial deduplication: {len(search_urls)} unique URLs from {len(search_results)} search results")
                if not search_urls:
                    logger.error("No relevant websites found")
                    return WebResearchActionResult(success=False, error="No relevant websites found")
                # Now use AI to determine the main URLs based on the user's intention
                logger.info(f"AI selecting main URLs from {len(search_urls)} search results based on user intent")
                # Create a prompt for the AI to identify main URLs based on the user's intention
                ai_prompt = f"""
Select the most relevant URLs from these search results:
{chr(10).join([f"{i+1}. {url}" for i, url in enumerate(search_urls)])}
Return only the URLs that are most relevant for the user's query.
One URL per line.
"""
                # Create the AI call request
                ai_request = AiCallRequest(
                    prompt=ai_prompt,
                    options=AiCallOptions()
                )
                ai_response_obj = await self.aiObjects.call(ai_request)
                ai_response = ai_response_obj.content
                logger.debug(f"AI response for main URL selection: {ai_response}")
                # Parse the AI response to extract URLs
                websites = []
                for line in ai_response.strip().split('\n'):
                    line = line.strip()
                    if line and ('http://' in line or 'https://' in line):
                        # Extract the URL from the line
                        for word in line.split():
                            if word.startswith('http://') or word.startswith('https://'):
                                websites.append(word.rstrip('.,;'))
                                break
                if not websites:
                    logger.warning("AI did not identify any main URLs, using first few search results")
                    websites = search_urls[:3]  # Fall back to the first 3 search results
                # Deduplicate while preserving order
                seen = set()
                unique_websites = []
                for url in websites:
                    if url not in seen:
                        seen.add(url)
                        unique_websites.append(url)
                selected_count = len(websites)
                websites = unique_websites
                logger.info(f"After AI selection deduplication: {len(websites)} unique URLs from {selected_count} AI-selected URLs")
                logger.info(f"AI selected {len(websites)} main URLs (after deduplication):")
                for i, url in enumerate(websites, 1):
                    logger.info(f"  {i}. {url}")
            # Step 2: Smart website selection using the AI interface
            logger.info("=== STEP 2: FILTERED URL LIST BY USER PROMPT'S INTENTION ===")
            logger.info(f"AI analyzing {len(websites)} URLs for relevance to: '{request.user_prompt}'")
            selectedWebsites, aiResponse = await self.aiObjects.selectRelevantWebsites(websites, request.user_prompt)
            logger.debug(f"AI Response: {aiResponse}")
            logger.debug(f"AI selected {len(selectedWebsites)} most relevant URLs:")
            for i, url in enumerate(selectedWebsites, 1):
                logger.debug(f"  {i}. {url}")
            # Show which URLs were filtered out
            filtered_out = [url for url in websites if url not in selectedWebsites]
            if filtered_out:
                logger.debug(f"Filtered out {len(filtered_out)} less relevant URLs:")
                for i, url in enumerate(filtered_out, 1):
                    logger.debug(f"  {i}. {url}")
            # Step 3+4+5: Recursive crawling with configurable depth
            # Get configuration parameters
            max_depth = int(APP_CONFIG.get("Web_Research_MAX_DEPTH", "2"))
            max_links_per_domain = int(APP_CONFIG.get("Web_Research_MAX_LINKS_PER_DOMAIN", "4"))
            crawl_timeout_minutes = int(APP_CONFIG.get("Web_Research_CRAWL_TIMEOUT_MINUTES", "10"))
            crawl_timeout_seconds = crawl_timeout_minutes * 60
            # Use the configured max_depth or the request's pages_search_depth, whichever is smaller
            effective_depth = min(max_depth, request.options.pages_search_depth)
            logger.info(f"=== STEP 3+4+5: RECURSIVE CRAWLING (DEPTH {effective_depth}) ===")
            logger.info(f"Starting recursive crawl of {len(selectedWebsites)} main websites...")
            logger.info(f"Search depth: {effective_depth} levels (max configured: {max_depth})")
            logger.info(f"Max links per domain: {max_links_per_domain}")
            logger.info(f"Crawl timeout: {crawl_timeout_minutes} minutes")
            # Use recursive crawling with a URL index to avoid duplicates
            import asyncio
            try:
                allContent = await asyncio.wait_for(
                    self.aiObjects.crawlRecursively(
                        urls=selectedWebsites,
                        max_depth=effective_depth,
                        extract_depth=request.options.extract_depth,
                        max_per_domain=max_links_per_domain,
                        global_processed_urls=global_processed_urls
                    ),
                    timeout=crawl_timeout_seconds
                )
                logger.info(f"Crawling completed within timeout: {len(allContent)} pages crawled")
            except asyncio.TimeoutError:
                logger.warning(f"Crawling timed out after {crawl_timeout_minutes} minutes")
                # asyncio.wait_for cancels the task on timeout, so any partial results
                # collected inside crawlRecursively are not recoverable here;
                # fall back to an empty result set.
                allContent = {}
            if not allContent:
                logger.error("Could not extract content from any websites")
                return WebResearchActionResult(success=False, error="Could not extract content from any websites")
logger.info(f"=== WEB RESEARCH COMPLETED ===")
logger.info(f"Successfully crawled {len(allContent)} URLs total")
logger.info(f"Crawl depth: {effective_depth} levels")
# Create simple result with raw content
sources = [WebSearchResultItem(title=url, url=url) for url in selectedWebsites]
# Get all additional links (all URLs except main ones)
additional_links = [url for url in allContent.keys() if url not in selectedWebsites]
# Combine all content into a single result
combinedContent = ""
for url, content in allContent.items():
combinedContent += f"\n\n=== {url} ===\n{content}\n"
documentData = WebResearchDocumentData(
user_prompt=request.user_prompt,
websites_analyzed=len(allContent),
additional_links_found=len(additional_links),
analysis_result=combinedContent, # Raw content, no analysis
sources=sources,
additional_links=additional_links,
individual_content=allContent, # Individual URL -> content mapping
debug_info={
"crawl_depth": effective_depth,
"max_configured_depth": max_depth,
"max_links_per_domain": max_links_per_domain,
"crawl_timeout_minutes": crawl_timeout_minutes,
"total_urls_crawled": len(allContent),
"main_urls": len(selectedWebsites),
"additional_urls": len(additional_links)
}
)
document = WebResearchActionDocument(
documentName=f"web_research_{request.user_prompt[:50]}.json",
documentData=documentData,
mimeType="application/json"
)
return WebResearchActionResult(
success=True,
documents=[document],
resultLabel="web_research_results"
)
except Exception as e:
logger.error(f"Error in web research: {str(e)}")
return WebResearchActionResult(success=False, error=str(e))
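
    # Usage sketch (illustrative; the field names follow the datamodels imported
    # above, but the exact constructor arguments are assumptions):
    #
    #     request = WebResearchRequest(user_prompt="EV charging standards in the EU")
    #     result = await aiService.webResearch(request)
    #     if result.success:
    #         data = result.documents[0].documentData  # WebResearchDocumentData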

    def _calculateMaxContextBytes(self, options: Optional[AiCallOptions]) -> int:
        """Calculate the maximum context bytes based on model capabilities and options."""
        if options and options.maxContextBytes:
            return options.maxContextBytes
        # Default model capabilities (should be enhanced with the actual model registry)
        defaultMaxTokens = 4000
        safetyMargin = options.safetyMargin if options else 0.1
        # Calculate bytes (estimating 4 chars per token)
        maxContextBytes = int(defaultMaxTokens * (1 - safetyMargin) * 4)
        return maxContextBytes
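
    # With the defaults above (4000 tokens, 0.1 safety margin, 4 chars/token):
    #     int(4000 * (1 - 0.1) * 4) = 14400 bytes of usable context.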

    async def _processDocumentsPerChunk(
        self,
        documents: List[ChatDocument],
        prompt: str,
        options: Optional[AiCallOptions] = None
    ) -> str:
        """
        Process documents with per-chunk AI calls and merge the results.

        FIXED: Now preserves chunk relationships and document structure.

        Args:
            documents: List of ChatDocument objects to process
            prompt: AI prompt for processing
            options: AI call options

        Returns:
            Merged AI results as a string with preserved document structure
        """
        if not documents:
            return ""
        # Get model capabilities for size calculation
        model_capabilities = self._getModelCapabilitiesForContent(prompt, documents, options)
        # Build extraction options for chunking with intelligent merging
        extractionOptions: Dict[str, Any] = {
            "prompt": prompt,
            "operationType": options.operationType if options else "general",
            "processDocumentsIndividually": True,  # Process each document separately
            "maxSize": model_capabilities["maxContextBytes"],
            "chunkAllowed": True,
            "textChunkSize": model_capabilities["textChunkSize"],
            "imageChunkSize": model_capabilities["imageChunkSize"],
            "imageMaxPixels": 1024 * 1024,
            "imageQuality": 85,
            "mergeStrategy": {
                "useIntelligentMerging": True,  # Enable intelligent token-aware merging
                "modelCapabilities": model_capabilities,
                "prompt": prompt,
                "groupBy": "typeGroup",
                "orderBy": "id",
                "mergeType": "concatenate"
            },
        }
        logger.debug(f"Per-chunk extraction options: prompt length={len(extractionOptions.get('prompt', ''))} chars, operationType={extractionOptions.get('operationType')}")
        try:
            # Extract content with chunking
            extractionResult = self.extractionService.extractContent(documents, extractionOptions)
            if not isinstance(extractionResult, list):
                return "[Error: No extraction results]"
            # Process chunks with proper mapping
            chunkResults = await self._processChunksWithMapping(extractionResult, prompt, options)
            # Merge with preserved chunk relationships
            mergedContent = self._mergeChunkResults(chunkResults, options)
            return mergedContent
        except Exception as e:
            logger.error(f"Error in per-chunk processing: {str(e)}")
            return f"[Error in per-chunk processing: {str(e)}]"

    async def _processDocumentsPerChunkJson(
        self,
        documents: List[ChatDocument],
        prompt: str,
        options: Optional[AiCallOptions] = None
    ) -> Dict[str, Any]:
        """
        Process documents with per-chunk AI calls and merge the results in JSON mode.

        Returns a structured JSON document instead of text.
        """
        if not documents:
            return {"metadata": {"title": "Empty Document"}, "sections": []}
        # Get model capabilities for size calculation
        model_capabilities = self._getModelCapabilitiesForContent(prompt, documents, options)
        # Build extraction options for chunking with intelligent merging
        extractionOptions: Dict[str, Any] = {
            "prompt": prompt,
            "operationType": options.operationType if options else "general",
            "processDocumentsIndividually": True,  # Process each document separately
            "maxSize": model_capabilities["maxContextBytes"],
            "chunkAllowed": True,
            "textChunkSize": model_capabilities["textChunkSize"],
            "imageChunkSize": model_capabilities["imageChunkSize"],
            "imageMaxPixels": 1024 * 1024,
            "imageQuality": 85,
            "mergeStrategy": {
                "useIntelligentMerging": True,  # Enable intelligent token-aware merging
                "modelCapabilities": model_capabilities,
                "prompt": prompt,
                "groupBy": "typeGroup",
                "orderBy": "id",
                "mergeType": "concatenate"
            },
        }
        logger.debug(f"Per-chunk extraction options (JSON mode): prompt length={len(extractionOptions.get('prompt', ''))} chars, operationType={extractionOptions.get('operationType')}")
        try:
            # Extract content with chunking
            extractionResult = self.extractionService.extractContent(documents, extractionOptions)
            if not isinstance(extractionResult, list):
                return {"metadata": {"title": "Error Document"}, "sections": []}
            # Process chunks with proper mapping
            chunkResults = await self._processChunksWithMapping(extractionResult, prompt, options, generate_json=True)
            # Merge in JSON mode
            mergedJsonDocument = self._mergeChunkResultsJson(chunkResults, options)
            return mergedJsonDocument
        except Exception as e:
            logger.error(f"Error in per-chunk processing (JSON mode): {str(e)}")
            return {"metadata": {"title": "Error Document"}, "sections": []}

    async def _processChunksWithMapping(
        self,
        extractionResult: List[ContentExtracted],
        prompt: str,
        options: Optional[AiCallOptions] = None,
        generate_json: bool = False
    ) -> List[ChunkResult]:
        """Process chunks with proper mapping to preserve relationships."""
        from modules.datamodels.datamodelExtraction import ChunkResult
        import asyncio
        import time
        # Collect all chunks that need processing, with proper indexing
        chunks_to_process = []
        chunk_index = 0
        for ec in extractionResult:
            # Get the document MIME type from metadata
            document_mime_type = None
            for part in ec.parts:
                if part.metadata and 'documentMimeType' in part.metadata:
                    document_mime_type = part.metadata['documentMimeType']
                    break
            for part in ec.parts:
                if part.typeGroup in ("text", "table", "structure", "image", "container", "binary"):
                    # Skip empty container chunks (they're just metadata containers)
                    if part.typeGroup == "container" and (not part.data or len(part.data.strip()) == 0):
                        logger.debug(f"Skipping empty container chunk: mimeType={part.mimeType}, data_length={len(part.data) if part.data else 0}")
                        continue
                    chunks_to_process.append({
                        'part': part,
                        'chunk_index': chunk_index,
                        'document_id': ec.id,
                        'document_mime_type': document_mime_type
                    })
                    chunk_index += 1
        logger.info(f"Processing {len(chunks_to_process)} chunks with proper mapping")

        # Process chunks in parallel with proper mapping
        async def process_single_chunk(chunk_info: Dict) -> ChunkResult:
            part = chunk_info['part']
            chunk_index = chunk_info['chunk_index']
            document_id = chunk_info['document_id']
            document_mime_type = chunk_info.get('document_mime_type', part.mimeType)
            start_time = time.time()
            try:
                # Check the MIME type first, then fall back to typeGroup
                is_image = (
                    (document_mime_type and document_mime_type.startswith('image/')) or
                    (part.mimeType and part.mimeType.startswith('image/')) or
                    (part.typeGroup == "image")
                )
                # Debug logging
                self.services.utils.debugLogToFile(f"Chunk {chunk_index}: document_mime_type={document_mime_type}, part.mimeType={part.mimeType}, part.typeGroup={part.typeGroup}, is_image={is_image}", "AI_SERVICE")
                logger.info(f"Chunk {chunk_index}: document_mime_type={document_mime_type}, part.mimeType={part.mimeType}, part.typeGroup={part.typeGroup}, is_image={is_image}")
                if is_image:
                    # Use the same extraction prompt for image analysis (contains the table JSON format)
                    self.services.utils.debugLogToFile(f"Processing image chunk {chunk_index}: mimeType={part.mimeType}, data_length={len(part.data) if part.data else 0}", "AI_SERVICE")
                    # Check whether image data is available
                    if not part.data:
                        error_msg = f"No image data available for chunk {chunk_index}"
                        logger.warning(error_msg)
                        ai_result = f"Error: {error_msg}"
                    else:
                        try:
                            ai_result = await self.readImage(
                                prompt=prompt,
                                imageData=part.data,
                                mimeType=part.mimeType,
                                options=options
                            )
                            self.services.utils.debugLogToFile(f"Image analysis result for chunk {chunk_index}: length={len(ai_result) if ai_result else 0}, preview={ai_result[:200] if ai_result else 'None'}...", "AI_SERVICE")
                            # Check whether the result is empty or None
                            if not ai_result or not ai_result.strip():
                                logger.warning(f"Image chunk {chunk_index} returned empty response from AI")
                                ai_result = "No content detected in image"
                        except Exception as e:
                            logger.error(f"Error processing image chunk {chunk_index}: {str(e)}")
                            ai_result = f"Error analyzing image: {str(e)}"
                    # If generating JSON, clean the image analysis result
                    if generate_json:
                        try:
                            import json
                            import re
                            # Clean the response - remove markdown code blocks if present
                            cleaned_result = ai_result.strip()
                            # Remove various markdown patterns
                            if cleaned_result.startswith('```json'):
                                cleaned_result = re.sub(r'^```json\s*', '', cleaned_result)
                                cleaned_result = re.sub(r'\s*```$', '', cleaned_result)
                            elif cleaned_result.startswith('```'):
                                cleaned_result = re.sub(r'^```\s*', '', cleaned_result)
                                cleaned_result = re.sub(r'\s*```$', '', cleaned_result)
                            # Strip any leading/trailing text that isn't JSON:
                            # look for the first { and last } to extract the JSON body
                            first_brace = cleaned_result.find('{')
                            last_brace = cleaned_result.rfind('}')
                            if first_brace != -1 and last_brace != -1 and last_brace > first_brace:
                                cleaned_result = cleaned_result[first_brace:last_brace + 1]
                            # Additional cleaning for common AI response issues
                            cleaned_result = cleaned_result.strip()
                            # Validate JSON
                            json.loads(cleaned_result)
                            ai_result = cleaned_result  # Use the cleaned version
                            self.services.utils.debugLogToFile(f"Image chunk {chunk_index} JSON validation successful", "AI_SERVICE")
                        except json.JSONDecodeError as e:
                            logger.warning(f"Image chunk {chunk_index} returned invalid JSON: {str(e)}")
                            logger.warning(f"Raw response was: '{ai_result[:500]}...'")
                            # Create fallback JSON carrying the actual response content (not the error message)
                            fallback_content = ai_result if ai_result and ai_result.strip() else "No content detected"
                            self.services.utils.debugLogToFile(f"IMAGE FALLBACK CONTENT PREVIEW: '{fallback_content[:200]}...'", "AI_SERVICE")
                            ai_result = json.dumps({
                                "metadata": {"title": f"Image Analysis - Chunk {chunk_index}"},
                                "sections": [{
                                    "id": f"image_section_{chunk_index}",
                                    "type": "paragraph",
                                    "data": {"text": fallback_content}
                                }]
                            })
                            self.services.utils.debugLogToFile(f"Created fallback JSON for image chunk {chunk_index} with actual content", "AI_SERVICE")
elif part.typeGroup in ("container", "binary"):
# Handle ALL container and binary content generically - let AI process any document type
self.services.utils.debugLogToFile(f"DEBUG: Chunk {chunk_index}: typeGroup={part.typeGroup}, mimeType={part.mimeType}, data_length={len(part.data) if part.data else 0}", "AI_SERVICE")
# Skip empty container chunks (they're just metadata containers)
if part.typeGroup == "container" and (not part.data or len(part.data.strip()) == 0):
self.services.utils.debugLogToFile(f"DEBUG: Skipping empty container - mimeType={part.mimeType}, data_length={len(part.data) if part.data else 0}", "AI_SERVICE")
logger.info(f"Chunk {chunk_index}: Skipping empty container - mimeType={part.mimeType}, data_length={len(part.data) if part.data else 0}")
# Skip processing this chunk
pass
elif part.mimeType and part.data and len(part.data.strip()) > 0:
# Process any document container as text content
request_options = options if options is not None else AiCallOptions()
request_options.operationType = OperationType.GENERAL
self.services.utils.debugLogToFile(f"EXTRACTION CONTAINER CHUNK {chunk_index}: Processing {part.mimeType} container as text with generate_json={generate_json}", "AI_SERVICE")
logger.info(f"Chunk {chunk_index}: Processing {part.mimeType} container as text with generate_json={generate_json}")
# Log extraction prompt and context
self.services.utils.debugLogToFile(f"EXTRACTION PROMPT: {prompt}", "AI_SERVICE")
self.services.utils.debugLogToFile(f"EXTRACTION CONTEXT LENGTH: {len(part.data) if part.data else 0} characters", "AI_SERVICE")
request = AiCallRequest(
prompt=prompt,
context=part.data,
options=request_options
)
response = await self.aiObjects.call(request)
ai_result = response.content
# Log extraction response
self.services.utils.debugLogToFile(f"EXTRACTION RESPONSE LENGTH: {len(ai_result) if ai_result else 0} characters", "AI_SERVICE")
# Save full extraction prompt and response to debug file - only if debug enabled
debug_enabled = self.services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False)
if debug_enabled:
try:
import os
from datetime import datetime, UTC
ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
debug_root = "./test-chat/ai"
os.makedirs(debug_root, exist_ok=True)
with open(os.path.join(debug_root, f"{ts}_extraction_container_chunk_{chunk_index}.txt"), "w", encoding="utf-8") as f:
f.write(f"EXTRACTION PROMPT:\n{prompt}\n\n")
f.write(f"EXTRACTION CONTEXT:\n{part.data if part.data else 'No context'}\n\n")
f.write(f"EXTRACTION RESPONSE:\n{ai_result if ai_result else 'No response'}\n")
except Exception:
pass
# If generating JSON, validate the response
if generate_json:
try:
import json
import re
# Clean the response - remove markdown code blocks if present
cleaned_result = ai_result.strip()
# Remove various markdown patterns
if cleaned_result.startswith('```json'):
cleaned_result = re.sub(r'^```json\s*', '', cleaned_result)
cleaned_result = re.sub(r'\s*```$', '', cleaned_result)
elif cleaned_result.startswith('```'):
cleaned_result = re.sub(r'^```\s*', '', cleaned_result)
cleaned_result = re.sub(r'\s*```$', '', cleaned_result)
# Remove any leading/trailing text that's not JSON
# Look for the first { and last } to extract JSON
first_brace = cleaned_result.find('{')
last_brace = cleaned_result.rfind('}')
if first_brace != -1 and last_brace != -1 and last_brace > first_brace:
cleaned_result = cleaned_result[first_brace:last_brace + 1]
# Additional cleaning for common AI response issues
cleaned_result = cleaned_result.strip()
# Validate JSON
json.loads(cleaned_result)
ai_result = cleaned_result # Use cleaned version
except json.JSONDecodeError as e:
logger.warning(f"Container chunk {chunk_index} ({part.mimeType}) returned invalid JSON: {str(e)}")
logger.warning(f"Raw response was: '{ai_result[:500]}...'")
# Create fallback JSON with the actual response content (not the error message)
# Use the original AI response content, not the error message
fallback_content = ai_result if ai_result and ai_result.strip() else "No content detected"
self.services.utils.debugLogToFile(f"FALLBACK CONTENT PREVIEW: '{fallback_content[:200]}...'", "AI_SERVICE")
ai_result = json.dumps({
"metadata": {"title": f"Document Analysis - Chunk {chunk_index}"},
"sections": [{
"id": f"analysis_section_{chunk_index}",
"type": "paragraph",
"data": {"text": fallback_content}
}]
})
self.services.utils.debugLogToFile(f"Created fallback JSON for container chunk {chunk_index} with actual content", "AI_SERVICE")
else:
# Skip empty or invalid container/binary content - don't create a result
self.services.utils.debugLogToFile(f"DEBUG: Chunk {chunk_index}: Skipping empty container - mimeType={part.mimeType}, data_length={len(part.data) if part.data else 0}", "AI_SERVICE")
# Return None to indicate this chunk should be completely skipped
return None
                else:
                    # Ensure options is not None and set the correct operation type for text
                    request_options = options if options is not None else AiCallOptions()
                    # Set the operation type to general for text processing
                    request_options.operationType = OperationType.GENERAL
                    self.services.utils.debugLogToFile(f"EXTRACTION CHUNK {chunk_index}: Calling aiObjects.call with operationType={request_options.operationType}, generate_json={generate_json}", "AI_SERVICE")
                    logger.info(f"Chunk {chunk_index}: Calling aiObjects.call with operationType={request_options.operationType}, generate_json={generate_json}")
                    # Log the extraction context length
                    self.services.utils.debugLogToFile(f"EXTRACTION CONTEXT LENGTH: {len(part.data) if part.data else 0} characters", "AI_SERVICE")
                    # Debug: log the actual prompt being sent to the AI
                    logger.debug(f"AI PROMPT PREVIEW: {prompt[:300]}...")
                    logger.debug(f"AI CONTEXT PREVIEW: {part.data[:200] if part.data else 'None'}...")
                    request = AiCallRequest(
                        prompt=prompt,
                        context=part.data,
                        options=request_options
                    )
                    response = await self.aiObjects.call(request)
                    # Debug: log what the AI actually returned
                    logger.debug(f"AI RESPONSE PREVIEW: {response.content[:300] if response.content else 'None'}...")
                    ai_result = response.content
                    # Log the extraction response length
                    self.services.utils.debugLogToFile(f"EXTRACTION RESPONSE LENGTH: {len(ai_result) if ai_result else 0} characters", "AI_SERVICE")
                    # Save the extraction response to a debug file (without the verbose prompt) - only if debug is enabled
                    debug_enabled = self.services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False)
                    if debug_enabled:
                        try:
                            import os
                            from datetime import datetime, UTC
                            ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
                            debug_root = "./test-chat/ai"
                            os.makedirs(debug_root, exist_ok=True)
                            with open(os.path.join(debug_root, f"{ts}_extraction_chunk_{chunk_index}.txt"), "w", encoding="utf-8") as f:
                                f.write(f"EXTRACTION RESPONSE:\n{ai_result if ai_result else 'No response'}\n")
                        except Exception:
                            pass
                    # If generating JSON, validate the response
                    if generate_json:
                        try:
                            import json
                            import re
                            # Clean the response - remove markdown code blocks and extra formatting
                            cleaned_result = ai_result.strip()
                            # Remove any markdown code block markers (```json, ```, etc.)
                            cleaned_result = re.sub(r'^```(?:json)?\s*', '', cleaned_result, flags=re.MULTILINE)
                            cleaned_result = re.sub(r'\s*```\s*$', '', cleaned_result, flags=re.MULTILINE)
                            # Remove any remaining ``` markers anywhere in the text
                            cleaned_result = re.sub(r'```', '', cleaned_result)
                            # Try to extract JSON from the response if it's embedded in other text
                            json_match = re.search(r'\{.*\}', cleaned_result, re.DOTALL)
                            if json_match:
                                cleaned_result = json_match.group(0)
                            # Validate JSON
                            json.loads(cleaned_result)
                            ai_result = cleaned_result  # Use the cleaned version
                        except json.JSONDecodeError as e:
                            logger.warning(f"Chunk {chunk_index} returned invalid JSON: {str(e)}")
                            # Create fallback JSON
                            ai_result = json.dumps({
                                "metadata": {"title": "Error Section"},
                                "sections": [{
                                    "id": f"error_section_{chunk_index}",
                                    "type": "paragraph",
                                    "data": {"text": f"Error parsing JSON: {str(e)}"}
                                }]
                            })
                processing_time = time.time() - start_time
                logger.info(f"Chunk {chunk_index} processed: {len(ai_result)} chars in {processing_time:.2f}s")
                return ChunkResult(
                    originalChunk=part,
                    aiResult=ai_result,
                    chunkIndex=chunk_index,
                    documentId=document_id,
                    processingTime=processing_time,
                    metadata={
                        "success": True,
                        "chunkSize": len(part.data) if part.data else 0,
                        "resultSize": len(ai_result),
                        "typeGroup": part.typeGroup
                    }
                )
            except Exception as e:
                processing_time = time.time() - start_time
                logger.warning(f"Error processing chunk {chunk_index}: {str(e)}")
                return ChunkResult(
                    originalChunk=part,
                    aiResult=f"[Error processing chunk: {str(e)}]",
                    chunkIndex=chunk_index,
                    documentId=document_id,
                    processingTime=processing_time,
                    metadata={
                        "success": False,
                        "error": str(e),
                        "chunkSize": len(part.data) if part.data else 0,
                        "typeGroup": part.typeGroup
                    }
                )

        # Process chunks with concurrency control
        max_concurrent = 5  # Default concurrency
        if options and hasattr(options, 'maxConcurrentChunks'):
            max_concurrent = options.maxConcurrentChunks
        elif options and hasattr(options, 'maxParallelChunks'):
            max_concurrent = options.maxParallelChunks
        logger.info(f"Processing {len(chunks_to_process)} chunks with max concurrency: {max_concurrent}")
        self.services.utils.debugLogToFile(f"DEBUG: Chunks to process: {len(chunks_to_process)}", "AI_SERVICE")
        for i, chunk_info in enumerate(chunks_to_process):
            self.services.utils.debugLogToFile(f"DEBUG: Chunk {i}: typeGroup={chunk_info['part'].typeGroup}, mimeType={chunk_info['part'].mimeType}, data_length={len(chunk_info['part'].data) if chunk_info['part'].data else 0}", "AI_SERVICE")
        # Create a semaphore for concurrency control
        semaphore = asyncio.Semaphore(max_concurrent)

        async def process_with_semaphore(chunk_info):
            async with semaphore:
                return await process_single_chunk(chunk_info)

        # Process all chunks in parallel with concurrency control
        tasks = [process_with_semaphore(chunk_info) for chunk_info in chunks_to_process]
        self.services.utils.debugLogToFile(f"DEBUG: Created {len(tasks)} tasks for parallel processing", "AI_SERVICE")
        chunk_results = await asyncio.gather(*tasks, return_exceptions=True)
        self.services.utils.debugLogToFile(f"DEBUG: Got {len(chunk_results)} results from parallel processing", "AI_SERVICE")
        # Handle any exceptions surfaced by the gather itself
        processed_results = []
        for i, result in enumerate(chunk_results):
            if isinstance(result, Exception):
                # Create an error ChunkResult
                chunk_info = chunks_to_process[i]
                processed_results.append(ChunkResult(
                    originalChunk=chunk_info['part'],
                    aiResult=f"[Error in parallel processing: {str(result)}]",
                    chunkIndex=chunk_info['chunk_index'],
                    documentId=chunk_info['document_id'],
                    processingTime=0.0,
                    metadata={"success": False, "error": str(result)}
                ))
            elif result is not None:
                # Only add non-None results (skip empty containers)
                processed_results.append(result)
        logger.info(f"Completed processing {len(processed_results)} chunks")
        return processed_results
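
    # Concurrency pattern in a nutshell: a Semaphore(N) wrapped around each task
    # caps in-flight AI calls at N, while asyncio.gather(..., return_exceptions=True)
    # keeps one failing chunk from cancelling the rest of the batch.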

    def _mergeChunkResults(
        self,
        chunkResults: List[ChunkResult],
        options: Optional[AiCallOptions] = None
    ) -> str:
        """Merge chunk results while preserving document structure and chunk order."""
        if not chunkResults:
            return ""
        # Get the merging configuration from options
        chunk_separator = "\n\n---\n\n"
        include_document_headers = True
        include_chunk_metadata = False
        if options:
            if hasattr(options, 'chunkSeparator'):
                chunk_separator = options.chunkSeparator
            elif hasattr(options, 'mergeStrategy') and options.mergeStrategy:
                chunk_separator = options.mergeStrategy.get("chunkSeparator", "\n\n---\n\n")
            # Check for enhanced options
            if hasattr(options, 'preserveChunkMetadata'):
                include_chunk_metadata = options.preserveChunkMetadata
        # Group chunk results by document
        results_by_document = {}
        for chunk_result in chunkResults:
            doc_id = chunk_result.documentId
            if doc_id not in results_by_document:
                results_by_document[doc_id] = []
            results_by_document[doc_id].append(chunk_result)
        # Sort chunks within each document by chunk index
        for doc_id in results_by_document:
            results_by_document[doc_id].sort(key=lambda x: x.chunkIndex)
        # Merge results for each document
        merged_documents = []
        for doc_id, doc_chunks in results_by_document.items():
            # Build the document header if enabled
            doc_header = ""
            if include_document_headers:
                doc_header = f"\n\n=== DOCUMENT: {doc_id} ===\n\n"
            # Merge chunks for this document
            doc_content = ""
            for i, chunk_result in enumerate(doc_chunks):
                # Add the chunk separator (except before the first chunk)
                if i > 0:
                    doc_content += chunk_separator
                # Add chunk content with optional metadata
                chunk_metadata = chunk_result.metadata
                if chunk_metadata.get("success", False):
                    chunk_content = chunk_result.aiResult
                    # Add chunk metadata if enabled
                    if include_chunk_metadata:
                        chunk_info = f"[Chunk {chunk_result.chunkIndex} - {chunk_metadata.get('typeGroup', 'unknown')} - {chunk_metadata.get('chunkSize', 0)} chars]"
                        chunk_content = f"{chunk_info}\n{chunk_content}"
                    doc_content += chunk_content
                else:
                    # Handle error chunks
                    error_msg = f"[ERROR in chunk {chunk_result.chunkIndex}: {chunk_metadata.get('error', 'Unknown error')}]"
                    doc_content += error_msg
            merged_documents.append(doc_header + doc_content)
        # Join all documents
        final_result = "\n\n".join(merged_documents)
        logger.info(f"Merged {len(chunkResults)} chunks from {len(results_by_document)} documents")
        return final_result.strip()
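
    # Merged output shape (default settings), e.g. for two chunks of one document:
    #
    #     === DOCUMENT: doc-123 ===
    #
    #     <chunk 0 result>
    #
    #     ---
    #
    #     <chunk 1 result>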

    def _mergeChunkResultsClean(
        self,
        chunkResults: List[ChunkResult],
        options: Optional[AiCallOptions] = None
    ) -> str:
        """Merge chunk results in CLEAN mode - no debug metadata or document headers."""
        if not chunkResults:
            return ""
        # Get the merging configuration from options
        # CLEAN MODE: no document headers and no chunk metadata
        chunk_separator = "\n\n"
        if options:
            if hasattr(options, 'chunkSeparator'):
                chunk_separator = options.chunkSeparator
            elif hasattr(options, 'mergeStrategy') and options.mergeStrategy:
                chunk_separator = options.mergeStrategy.get("chunkSeparator", "\n\n")
        # Group chunk results by document
        results_by_document = {}
        for chunk_result in chunkResults:
            doc_id = chunk_result.documentId
            if doc_id not in results_by_document:
                results_by_document[doc_id] = []
            results_by_document[doc_id].append(chunk_result)
        # Sort chunks within each document by chunk index
        for doc_id in results_by_document:
            results_by_document[doc_id].sort(key=lambda x: x.chunkIndex)
        # Merge results for each document in CLEAN mode
        merged_documents = []
        for doc_id, doc_chunks in results_by_document.items():
            # Merge chunks for this document (no document header in clean mode)
            doc_content = ""
            for chunk_result in doc_chunks:
                chunk_metadata = chunk_result.metadata
                if chunk_metadata.get("success", False):
                    chunk_content = chunk_result.aiResult
                    # CLEAN MODE: skip container/binary chunks entirely
                    if chunk_content.startswith("[Skipped ") and "content:" in chunk_content:
                        continue
                    # CLEAN MODE: skip empty or whitespace-only chunks
                    if not chunk_content.strip():
                        continue
                    # Add the chunk separator only between kept chunks
                    if doc_content:
                        doc_content += chunk_separator
                    doc_content += chunk_content
                else:
                    # Skip error chunks silently in clean mode
                    continue
            merged_documents.append(doc_content)
        # Join all documents and return the cleaned result
        final_result = "\n\n".join(merged_documents)
        return final_result.strip()

    def _mergeChunkResultsJson(
        self,
        chunkResults: List[ChunkResult],
        options: Optional[AiCallOptions] = None
    ) -> Dict[str, Any]:
        """Merge chunk results in JSON mode - returns a structured JSON document."""
        import json
        if not chunkResults:
            return {"metadata": {"title": "Empty Document"}, "sections": []}
        # Group chunk results by document
        results_by_document = {}
        for chunk_result in chunkResults:
            doc_id = chunk_result.documentId
            if doc_id not in results_by_document:
                results_by_document[doc_id] = []
            results_by_document[doc_id].append(chunk_result)
        # Sort chunks within each document by chunk index
        for doc_id in results_by_document:
            results_by_document[doc_id].sort(key=lambda x: x.chunkIndex)
        # Merge JSON results for each document
        all_documents = []
        all_sections = []
        document_titles = []
        combined_metadata = {"title": "Merged Document", "splitStrategy": "by_section"}
        for doc_id, doc_chunks in results_by_document.items():
            # Process each chunk's JSON result
            for chunk_result in doc_chunks:
                chunk_metadata = chunk_result.metadata
                if chunk_metadata.get("success", False):
                    try:
                        # Parse the JSON from the AI result
                        chunk_json = json.loads(chunk_result.aiResult)
                        # Check whether this is a multi-file response (has a "documents" key)
                        if isinstance(chunk_json, dict) and "documents" in chunk_json:
                            # This is a multi-file response - merge all documents
                            logger.debug(f"Processing multi-file response from chunk {chunk_result.chunkIndex} with {len(chunk_json['documents'])} documents")
                            # Add all documents from this chunk
                            for doc in chunk_json["documents"]:
                                # Add chunk context to the document
                                doc["metadata"] = doc.get("metadata", {})
                                doc["metadata"]["source_chunk"] = chunk_result.chunkIndex
                                doc["metadata"]["source_document"] = doc_id
                                all_documents.append(doc)
                            # Update the combined metadata
                            if "metadata" in chunk_json:
                                combined_metadata.update(chunk_json["metadata"])
                        # Extract sections from a single-file response (fallback)
                        elif isinstance(chunk_json, dict) and "sections" in chunk_json:
                            for section in chunk_json["sections"]:
                                # Add document context to the section
                                section["metadata"] = section.get("metadata", {})
                                section["metadata"]["source_document"] = doc_id
                                section["metadata"]["chunk_index"] = chunk_result.chunkIndex
                                all_sections.append(section)
                        # Extract the document title
                        if isinstance(chunk_json, dict) and "metadata" in chunk_json:
                            title = chunk_json["metadata"].get("title", "")
                            if title and title not in document_titles:
                                document_titles.append(title)
                    except json.JSONDecodeError as e:
                        logger.warning(f"Failed to parse JSON from chunk {chunk_result.chunkIndex}: {str(e)}")
                        # Create a fallback section for invalid JSON
                        fallback_section = {
                            "id": f"error_section_{chunk_result.chunkIndex}",
                            "title": "Error Section",
                            "content_type": "paragraph",
                            "elements": [{
                                "text": f"Error parsing chunk {chunk_result.chunkIndex}: {str(e)}"
                            }],
                            "order": chunk_result.chunkIndex,
                            "metadata": {
                                "source_document": doc_id,
                                "chunk_index": chunk_result.chunkIndex,
                                "error": str(e)
                            }
                        }
                        all_sections.append(fallback_section)
                else:
                    # Handle error chunks
                    error_section = {
                        "id": f"error_section_{chunk_result.chunkIndex}",
                        "title": "Error Section",
                        "content_type": "paragraph",
                        "elements": [{
                            "text": f"Error in chunk {chunk_result.chunkIndex}: {chunk_metadata.get('error', 'Unknown error')}"
                        }],
                        "order": chunk_result.chunkIndex,
                        "metadata": {
                            "source_document": doc_id,
                            "chunk_index": chunk_result.chunkIndex,
                            "error": chunk_metadata.get('error', 'Unknown error')
                        }
                    }
                    all_sections.append(error_section)
        # Sort sections by order
        all_sections.sort(key=lambda x: x.get("order", 0))
        # If we have merged documents from multi-file responses, return them
        if all_documents:
            logger.info(f"Merged {len(all_documents)} documents from {len(chunkResults)} chunks")
            return {
                "metadata": combined_metadata,
                "documents": all_documents
            }
        # Otherwise, create a merged document with sections (single-file fallback)
        merged_document = {
            "metadata": {
                "title": document_titles[0] if document_titles else "Merged Document",
                "source_documents": list(results_by_document.keys()),
                "extraction_method": "ai_json_extraction",
                "version": "1.0"
            },
            "sections": all_sections,
            "summary": f"Merged document from {len(results_by_document)} source documents",
            "tags": ["merged", "ai_generated"]
        }
        logger.info(f"Merged {len(chunkResults)} chunks from {len(results_by_document)} documents (JSON mode)")
        return merged_document
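
    # Shape of the merged JSON document (single-file fallback path):
    #
    #     {
    #       "metadata": {"title": ..., "source_documents": [...], ...},
    #       "sections": [{"id": ..., "order": ..., ...}, ...],
    #       "summary": "...",
    #       "tags": ["merged", "ai_generated"]
    #     }
    #
    # When chunks return multi-file responses, the result instead carries a
    # top-level "documents" list.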

    async def _compressContent(self, content: str, targetSize: int, contentType: str) -> str:
        """Reduce content to at most targetSize bytes via truncation."""
        if len(content.encode("utf-8")) <= targetSize:
            return content
        try:
            # The service must not call connectors directly, so instead of asking a
            # model to compress the content we fall back to simple byte truncation.
            data = content.encode("utf-8")
            return data[:targetSize].decode("utf-8", errors="ignore") + "... [truncated]"
        except Exception as e:
            logger.warning(f"Byte truncation failed, using character truncation: {str(e)}")
            return content[:targetSize] + "... [truncated]"

    # ===== DYNAMIC GENERIC AI CALLS IMPLEMENTATION =====
    async def callAi(
        self,
        prompt: str,
        documents: Optional[List[ChatDocument]] = None,
        placeholders: Optional[List[PromptPlaceholder]] = None,
        options: Optional[AiCallOptions] = None,
        outputFormat: Optional[str] = None,
        title: Optional[str] = None
    ) -> Union[str, Dict[str, Any]]:
        """
        Unified AI call interface that automatically routes to the appropriate handler.

        Args:
            prompt: The main prompt for the AI call
            documents: Optional list of documents to process
            placeholders: Optional list of placeholder replacements for planning calls
            options: AI call configuration options
            outputFormat: Optional output format (html, pdf, docx, txt, md, json, csv, xlsx) for document generation
            title: Optional title for generated documents

        Returns:
            AI response as a string, or a dict with documents if outputFormat is specified

        Raises:
            Exception: If all available models fail
        """
        # Ensure aiObjects is initialized
        await self._ensureAiObjectsInitialized()
        if options is None:
            options = AiCallOptions()
        # Normalize placeholders from List[PromptPlaceholder]
        placeholders_dict: Dict[str, str] = {}
        placeholders_meta: Dict[str, bool] = {}
        if placeholders:
            placeholders_dict = {p.label: p.content for p in placeholders}
            placeholders_meta = {p.label: bool(getattr(p, 'summaryAllowed', False)) for p in placeholders}
        # Auto-determine the call type based on documents and operation type
        call_type = self._determineCallType(documents, options.operationType)
        options.callType = call_type
        try:
            # Build the full prompt that will be sent to the AI
            if placeholders:
                full_prompt = prompt
                for p in placeholders:
                    placeholder = f"{{{{KEY:{p.label}}}}}"
                    full_prompt = full_prompt.replace(placeholder, p.content)
            else:
                full_prompt = prompt
            self._writeAiResponseDebug(
                label='ai_prompt_debug',
                content=full_prompt,
                partIndex=1,
                modelName=None,
                continuation=False
            )
        except Exception:
            pass
        # Handle document generation with a specific output format
        if outputFormat:
            result = await self._callAiWithDocumentGeneration(prompt, documents, options, outputFormat, title)
            # Log the AI response for debugging
            try:
                if isinstance(result, dict) and 'content' in result:
                    self._writeAiResponseDebug(
                        label='ai_document_generation',
                        content=result['content'],
                        partIndex=1,
                        modelName=None,  # Document generation doesn't return model info
                        continuation=False
                    )
            except Exception:
                pass
            return result
        if call_type == "planning":
            result = await self._callAiPlanning(prompt, placeholders_dict, placeholders_meta, options)
            # Log the AI response for debugging
            try:
                self._writeAiResponseDebug(
                    label='ai_planning',
                    content=result or "",
                    partIndex=1,
                    modelName=None,  # Planning doesn't return model info
                    continuation=False
                )
            except Exception:
                pass
            return result
        else:
            # Set processDocumentsIndividually from the legacy parameter if not set in options
            if options.processDocumentsIndividually is None and documents:
                options.processDocumentsIndividually = False  # Default to batch processing
            # For text calls we need to build the full prompt with placeholders here,
            # since _callAiText doesn't handle placeholders directly
            if placeholders_dict:
                full_prompt = self._buildPromptWithPlaceholders(prompt, placeholders_dict)
            else:
                full_prompt = prompt
            result = await self._callAiText(full_prompt, documents, options)
            # Log the AI response for debugging (additional logging for text calls)
            try:
                self._writeAiResponseDebug(
                    label='ai_text_main',
                    content=result or "",
                    partIndex=1,
                    modelName=None,  # Text calls already log internally
                    continuation=False
                )
            except Exception:
                pass
            return result
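
    # Usage sketch (illustrative): a plain text call vs. a document-generation call.
    #
    #     text = await aiService.callAi("Summarize these notes", documents=docs)
    #     bundle = await aiService.callAi(
    #         "Turn these notes into a report",
    #         documents=docs,
    #         outputFormat="pdf",   # routes through _callAiWithDocumentGeneration
    #         title="Quarterly Report",
    #     )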

    def _determineCallType(self, documents: Optional[List[ChatDocument]], operation_type: str) -> str:
        """
        Determine the call type based on documents and operation type.

        Criteria: no documents AND operationType is "generate_plan" -> planning.
        All other cases -> text.
        """
        has_documents = documents is not None and len(documents) > 0
        is_planning_operation = operation_type == OperationType.GENERATE_PLAN
        if not has_documents and is_planning_operation:
            return "planning"
        else:
            return "text"

    async def _callAiPlanning(
        self,
        prompt: str,
        placeholders: Optional[Dict[str, str]],
        placeholdersMeta: Optional[Dict[str, bool]],
        options: AiCallOptions
    ) -> str:
        """
        Handle planning calls with the placeholder system and selective summarization.
        """
        # Ensure aiObjects is initialized
        await self._ensureAiObjectsInitialized()
        # Build the full prompt with placeholders; if it is too large, shrink the
        # summaryAllowed placeholders proportionally
        effective_placeholders = placeholders or {}
        full_prompt = self._buildPromptWithPlaceholders(prompt, effective_placeholders)
        if options.compressPrompt and placeholdersMeta:
            # Determine the model capacity
            try:
                caps = self._getModelCapabilitiesForContent(full_prompt, None, options)
                max_bytes = caps.get("maxContextBytes", len(full_prompt.encode("utf-8")))
            except Exception:
                max_bytes = len(full_prompt.encode("utf-8"))
            current_bytes = len(full_prompt.encode("utf-8"))
            if current_bytes > max_bytes:
                # Compute the total bytes contributed by the reducible placeholders
                allowed_labels = [label for label, allow in placeholdersMeta.items() if allow]
                allowed_sizes = {label: len((effective_placeholders.get(label) or "").encode("utf-8")) for label in allowed_labels}
                total_allowed = sum(allowed_sizes.values())
                overage = current_bytes - max_bytes
                if total_allowed > 0 and overage > 0:
                    # Target total for the reducible placeholders after reduction
                    target_allowed = max(total_allowed - overage, 0)
                    # Global ratio to apply across the reducible placeholders
                    ratio = target_allowed / total_allowed
                    ratio = max(0.0, min(1.0, ratio))
                    reduced: Dict[str, str] = {}
                    for label, content in effective_placeholders.items():
                        if label in allowed_labels and isinstance(content, str) and len(content) > 0:
                            # Reduce the content proportionally by the global ratio
                            reduced[label] = self._reduceText(content, ratio)
                        else:
                            reduced[label] = content
                    effective_placeholders = reduced
                    full_prompt = self._buildPromptWithPlaceholders(prompt, effective_placeholders)
                    # If still slightly over, perform a second-pass fine adjustment with an updated ratio
                    current_bytes = len(full_prompt.encode("utf-8"))
                    if current_bytes > max_bytes:
                        overage2 = current_bytes - max_bytes
                        # Recompute the reducible sizes after the first reduction
                        allowed_sizes2 = {label: len((effective_placeholders.get(label) or "").encode("utf-8")) for label in allowed_labels}
                        total_allowed2 = sum(allowed_sizes2.values())
                        if total_allowed2 > 0 and overage2 > 0:
                            target_allowed2 = max(total_allowed2 - overage2, 0)
                            ratio2 = target_allowed2 / total_allowed2
                            ratio2 = max(0.0, min(1.0, ratio2))
                            reduced2: Dict[str, str] = {}
                            for label, content in effective_placeholders.items():
                                if label in allowed_labels and isinstance(content, str) and len(content) > 0:
                                    reduced2[label] = self._reduceText(content, ratio2)
                                else:
                                    reduced2[label] = content
                            effective_placeholders = reduced2
                            full_prompt = self._buildPromptWithPlaceholders(prompt, effective_placeholders)
        # Make the AI call using AiObjects (let it handle model selection)
        request = AiCallRequest(
            prompt=full_prompt,
            context="",  # The context is already included in the prompt
            options=options
        )
        response = await self.aiObjects.call(request)
        try:
            logger.debug(f"AI model selected (planning): {getattr(response, 'modelName', 'unknown')}")
        except Exception:
            pass
        return response.content
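
    # Placeholder substitution example (the {{KEY:...}} syntax comes from callAi above;
    # the concrete label is illustrative):
    #
    #     prompt:       "Plan next steps.\nNotes: {{KEY:notes}}"
    #     placeholders: {"notes": "<long meeting notes>"}  # summaryAllowed gates shrinking
    #     -> the notes are reduced proportionally if the full prompt exceeds the model context.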

    async def _callAiText(
        self,
        prompt: str,
        documents: Optional[List[ChatDocument]],
        options: AiCallOptions
    ) -> str:
        """
        Handle text calls with document processing through the ExtractionService.

        UNIFIED PROCESSING: Always use per-chunk processing for consistency.
        """
        # Ensure aiObjects is initialized
        await self._ensureAiObjectsInitialized()
        # Without documents there is nothing to chunk; call the model directly
        # (the per-chunk path would return an empty string for an empty document list)
        if not documents:
            request = AiCallRequest(prompt=prompt, context="", options=options)
            response = await self.aiObjects.call(request)
            return response.content
        # UNIFIED PROCESSING: always use per-chunk processing for consistency.
        # This ensures MIME-type checking, chunk mapping, and parallel processing.
        return await self._processDocumentsPerChunk(documents, prompt, options)
async def _callAiDirect(
self,
prompt: str,
documents: Optional[List[ChatDocument]],
options: AiCallOptions
) -> Dict[str, Any]:
"""
Call AI directly with prompt and documents for JSON output.
Used for multi-file generation - uses the existing generation pipeline.
"""
# Use the existing generation pipeline that already works
# This ensures proper document processing and content extraction
logger.info(f"Using existing generation pipeline for {len(documents) if documents else 0} documents")
# Process documents with JSON merging using the existing pipeline
result = await self._processDocumentsPerChunkJson(documents, prompt, options)
# Convert single-file result to multi-file format if needed
if "sections" in result and "documents" not in result:
logger.info("Converting single-file result to multi-file format")
# This is a single-file result, convert it to multi-file format
return {
"metadata": result.get("metadata", {"title": "Converted Document"}),
"documents": [{
"id": "doc_1",
"title": result.get("metadata", {}).get("title", "Document"),
"filename": "document.txt",
"sections": result.get("sections", [])
}]
}
return result
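# Shape of the conversion above (hypothetical values):
#   {"metadata": {...}, "sections": [...]}
# becomes
#   {"metadata": {...}, "documents": [{"id": "doc_1", "title": "...",
#    "filename": "document.txt", "sections": [...]}]}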
async def _processDocumentsPerChunkJsonWithPrompt(
self,
documents: List[ChatDocument],
custom_prompt: str,
options: Optional[AiCallOptions] = None
) -> Dict[str, Any]:
"""
Process documents with per-chunk AI calls and merge results in JSON mode.
Uses a custom prompt instead of the default extraction prompt.
"""
if not documents:
return {"metadata": {"title": "Empty Document"}, "sections": []}
# Get model capabilities for size calculation
model_capabilities = self._getModelCapabilitiesForContent(custom_prompt, documents, options)
# Build extraction options for chunking with intelligent merging
extractionOptions: Dict[str, Any] = {
"prompt": custom_prompt, # Use the custom prompt instead of default
"operationType": options.operationType if options else "general",
"processDocumentsIndividually": True, # Process each document separately
"maxSize": model_capabilities["maxContextBytes"],
"chunkAllowed": True,
"textChunkSize": model_capabilities["textChunkSize"],
"imageChunkSize": model_capabilities["imageChunkSize"],
"imageMaxPixels": 1024 * 1024,
"imageQuality": 85,
"mergeStrategy": {
"useIntelligentMerging": True, # Enable intelligent token-aware merging
"modelCapabilities": model_capabilities,
"prompt": custom_prompt, # Use the custom prompt
"groupBy": "typeGroup",
"orderBy": "id",
"mergeType": "concatenate"
},
}
logger.debug(f"Per-chunk extraction options (JSON mode): prompt length={len(extractionOptions.get('prompt', ''))} chars, operationType={extractionOptions.get('operationType')}")
try:
# Extract content with chunking
extractionResult = self.extractionService.extractContent(documents, extractionOptions)
if not isinstance(extractionResult, list):
return {"metadata": {"title": "Error Document"}, "sections": []}
# Process chunks with proper mapping
logger.info(f"Processing {len(extractionResult)} chunks with custom prompt")
logger.debug(f"Custom prompt preview: {custom_prompt[:200]}...")
# Debug: Show what content is being processed (before filtering)
for i, ec in enumerate(extractionResult):
if hasattr(ec, 'parts'):
for j, part in enumerate(ec.parts):
if not (hasattr(part, 'data') and part.data):
# Check if this is an empty container chunk (which is expected)
part_type = getattr(part, 'typeGroup', None)
part_mime = getattr(part, 'mimeType', '')
is_empty_container = (
part_type == "container" and
part_mime and
'document' in part_mime.lower()
)
if not is_empty_container:
logger.warning(f"Part {j} has no data - typeGroup='{part_type}', mimeType='{part_mime}'")
chunkResults = await self._processChunksWithMapping(extractionResult, custom_prompt, options, generate_json=True)
# Debug: Show what chunks were actually processed (after filtering)
logger.info(f"After filtering: {len(chunkResults)} chunks will be processed")
# Merge with JSON mode
mergedJsonDocument = self._mergeChunkResultsJson(chunkResults, options)
# Debug: Show what the AI actually returned
logger.info(f"AI returned document with keys: {list(mergedJsonDocument.keys())}")
if 'documents' in mergedJsonDocument:
logger.info(f"Number of documents: {len(mergedJsonDocument['documents'])}")
elif 'sections' in mergedJsonDocument:
logger.info(f"Number of sections: {len(mergedJsonDocument['sections'])}")
return mergedJsonDocument
except Exception as e:
logger.error(f"Error in per-chunk JSON processing: {str(e)}")
return {"metadata": {"title": "Error Document"}, "sections": []}
async def _callAiJson(
self,
prompt: str,
documents: Optional[List[ChatDocument]],
options: AiCallOptions
) -> Dict[str, Any]:
"""
Handle AI calls with document processing for JSON output.
Returns structured JSON document instead of text.
"""
# Ensure aiObjects is initialized
await self._ensureAiObjectsInitialized()
# Process documents with JSON merging
return await self._processDocumentsPerChunkJson(documents, prompt, options)
def _getModelCapabilitiesForContent(self, prompt: str, documents: Optional[List[ChatDocument]], options: AiCallOptions) -> Dict[str, int]:
"""
Get model capabilities for content processing, including appropriate size limits for chunking.
"""
# Estimate total content size
prompt_size = len(prompt.encode('utf-8'))
document_size = 0
if documents:
# Rough estimate of document content size
for doc in documents:
document_size += doc.fileSize or 0
total_size = prompt_size + document_size
# Use AiObjects to select the best model for this content size
# We'll simulate the model selection by checking available models
from modules.interfaces.interfaceAiObjects import aiModels
# Find the best model for this content size and operation
best_model = None
best_context_length = 0
for model_name, model_info in aiModels.items():
context_length = model_info.get("contextLength", 0)
# Skip models with no context length or too small for content
if context_length == 0:
continue
# Check if model supports the operation type
capabilities = model_info.get("capabilities", [])
if options.operationType == OperationType.IMAGE_ANALYSIS and "image_analysis" not in capabilities:
continue
elif options.operationType == OperationType.IMAGE_GENERATION and "image_generation" not in capabilities:
continue
elif options.operationType == OperationType.WEB_RESEARCH and "web_search" not in capabilities:
continue
elif "text_generation" not in capabilities:
continue
# Prefer models whose context covers at least 80% of the content size (so
# chunking stays light); otherwise fall back to the largest available model
if context_length >= total_size * 0.8:
if context_length > best_context_length:
best_model = model_info
best_context_length = context_length
elif best_context_length < total_size * 0.8:
# No covering model found yet; keep the largest non-covering one
if context_length > best_context_length:
best_model = model_info
best_context_length = context_length
# Fallback to a reasonable default if no model found
if best_model is None:
best_model = {
"contextLength": 128000, # GPT-4o default
"llmName": "gpt-4o"
}
# Calculate appropriate sizes
# Convert tokens to bytes (rough estimate: 1 token ≈ 4 characters)
context_length_bytes = int(best_model["contextLength"] * 4)
max_context_bytes = int(context_length_bytes * 0.9) # 90% of context length
text_chunk_size = int(max_context_bytes * 0.7) # 70% of max context for text chunks
image_chunk_size = int(max_context_bytes * 0.8) # 80% of max context for image chunks
logger.debug(f"Selected model: {best_model.get('llmName', 'unknown')} with context length: {best_model['contextLength']}")
logger.debug(f"Content size: {total_size} bytes, Max context: {max_context_bytes} bytes")
logger.debug(f"Text chunk size: {text_chunk_size} bytes, Image chunk size: {image_chunk_size} bytes")
return {
"maxContextBytes": max_context_bytes,
"textChunkSize": text_chunk_size,
"imageChunkSize": image_chunk_size
}
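# Worked example (hypothetical): a 128_000-token context maps to 128_000 * 4 =
# 512_000 bytes, so maxContextBytes = 460_800 (90%), textChunkSize = 322_560
# (70% of that) and imageChunkSize = 368_640 (80% of that).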
def _getModelsForOperation(self, operation_type: str, options: AiCallOptions) -> List[ModelCapabilities]:
"""
Get models capable of handling the specific operation with capability filtering.
"""
# Use the actual AI objects model selection instead of hardcoded default
if hasattr(self, 'aiObjects') and self.aiObjects:
# Let AiObjects handle the model selection
return []
else:
# Fallback to default model if AiObjects not available
default_model = ModelCapabilities(
name="default",
maxTokens=4000,
capabilities=["text", "reasoning"] if operation_type == "planning" else ["text"],
costPerToken=0.001,
processingTime=1.0,
isAvailable=True
)
return [default_model]
def _buildPromptWithPlaceholders(self, prompt: str, placeholders: Optional[Dict[str, str]]) -> str:
"""
Build full prompt by replacing placeholders with their content.
Uses the new {{KEY:placeholder}} format.
"""
if not placeholders:
return prompt
full_prompt = prompt
for placeholder, content in placeholders.items():
# Guard against None/non-string values so str.replace cannot fail
value = content if isinstance(content, str) else ""
# Replace both the legacy {{placeholder}} and the new {{KEY:placeholder}} formats
full_prompt = full_prompt.replace(f"{{{{{placeholder}}}}}", value)
full_prompt = full_prompt.replace(f"{{{{KEY:{placeholder}}}}}", value)
return full_prompt
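# Example (hypothetical): with placeholders={"CONTEXT": "Q3 revenue notes"},
# both "Summarize {{KEY:CONTEXT}}" and "Summarize {{CONTEXT}}" become
# "Summarize Q3 revenue notes".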
def _writeTraceLog(self, contextText: str, data: Any) -> None:
"""Write raw data to the central trace log file without truncation."""
try:
import os
import json
from datetime import datetime, UTC
# Only write if debug-level logging is effectively enabled
# (isEnabledFor honors levels inherited from parent loggers, unlike logger.level)
if not logger.isEnabledFor(logging.DEBUG):
return
# Get log directory from configuration via service center if possible
logDir = None
try:
logDir = self.services.utils.configGet("APP_LOGGING_LOG_DIR", "./")
except Exception:
pass
if not logDir:
logDir = "./"
if not os.path.isabs(logDir):
# Make it relative to the gateway root (four levels up from this file)
gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
logDir = os.path.join(gatewayDir, logDir)
os.makedirs(logDir, exist_ok=True)
traceFile = os.path.join(logDir, "log_trace.log")
timestamp = datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
traceEntry = f"[{timestamp}] {contextText}\n" + ("=" * 80) + "\n"
if data is None:
traceEntry += "No data provided\n"
else:
# Prefer exact text; if dict/list, pretty print JSON
try:
if isinstance(data, (dict, list)):
traceEntry += f"JSON Data:\n{json.dumps(data, indent=2, ensure_ascii=False)}\n"
else:
text = str(data)
traceEntry += f"Text Data:\n{text}\n"
except Exception:
traceEntry += f"Data (fallback): {str(data)}\n"
traceEntry += ("=" * 80) + "\n\n"
with open(traceFile, "a", encoding="utf-8") as f:
f.write(traceEntry)
except Exception:
# Swallow to avoid recursive logging issues
pass
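# Each entry in log_trace.log has the layout:
#   [YYYY-MM-DD HH:MM:SS.mmm] <contextText>
#   ================================================================================
#   Text Data: ... (or "JSON Data:" with pretty-printed JSON)
#   ================================================================================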
def _writeAiResponseDebug(self, label: str, content: str, partIndex: int = 1, modelName: str = None, continuation: bool = None) -> None:
"""Persist raw AI response parts for debugging under test-chat/ai - only if debug enabled."""
try:
# Check if debug logging is enabled
debug_enabled = self.services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False)
if not debug_enabled:
return
import os
from datetime import datetime, UTC
# Base dir: gateway/test-chat/ai (go up 4 levels from this file)
# .../gateway/modules/services/serviceAi/mainServiceAi.py -> up to gateway root
gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
outDir = os.path.join(gatewayDir, 'test-chat', 'ai')
os.makedirs(outDir, exist_ok=True)
ts = datetime.now(UTC).strftime('%Y%m%d-%H%M%S-%f')[:-3]
suffix = []
if partIndex is not None:
suffix.append(f"part{partIndex}")
if continuation is not None:
suffix.append(f"cont_{str(continuation).lower()}")
if modelName:
safeModel = ''.join(c if c.isalnum() or c in ('-', '_') else '-' for c in modelName)
suffix.append(safeModel)
suffixStr = ('_' + '_'.join(suffix)) if suffix else ''
fname = f"{ts}_{label}{suffixStr}.txt"
fpath = os.path.join(outDir, fname)
with open(fpath, 'w', encoding='utf-8') as f:
f.write(content or '')
except Exception:
# Do not raise; best-effort debug write
pass
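# Resulting filename pattern (hypothetical call with label="chunk3", partIndex=1,
# continuation=False, modelName="gpt-4o"):
#   20250101-120000-123_chunk3_part1_cont_false_gpt-4o.txt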
def _exceedsTokenLimit(self, text: str, model: ModelCapabilities, safety_margin: float) -> bool:
"""
Check if text exceeds model token limit with safety margin.
"""
# Simple character-based estimation (4 chars per token)
estimated_tokens = len(text) // 4
max_tokens = int(model.maxTokens * (1 - safety_margin))
return estimated_tokens > max_tokens
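# Example (hypothetical): 20_000 characters estimate to 5_000 tokens; with
# maxTokens=4_096 and safety_margin=0.1 the effective cap is int(4_096 * 0.9)
# = 3_686 tokens, so the text exceeds the limit and must be reduced or chunked.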
def _reducePlanningPrompt(
self,
full_prompt: str,
placeholders: Optional[Dict[str, str]],
model: ModelCapabilities,
options: AiCallOptions
) -> str:
"""
Reduce planning prompt size by summarizing placeholders while preserving prompt structure.
"""
if not placeholders:
return self._reduceText(full_prompt, 0.7)
# Reduce placeholders while preserving prompt
reduced_placeholders = {}
for placeholder, content in placeholders.items():
if len(content) > 1000: # Only reduce long content
reduction_factor = 0.7
reduced_content = self._reduceText(content, reduction_factor)
reduced_placeholders[placeholder] = reduced_content
else:
reduced_placeholders[placeholder] = content
return self._buildPromptWithPlaceholders(full_prompt, reduced_placeholders)
def _reduceTextPrompt(
self,
prompt: str,
context: str,
model: ModelCapabilities,
options: AiCallOptions
) -> str:
"""
Reduce text prompt size using typeGroup-aware chunking and merging.
"""
# Character budget: ~4 characters per token (as elsewhere in this service), minus the safety margin
max_size = int(model.maxTokens * 4 * (1 - options.safetyMargin))
if options.compressPrompt:
# Reduce both prompt and context
target_size = max_size
current_size = len(prompt) + len(context)
reduction_factor = (target_size * 0.7) / current_size
if reduction_factor < 1.0:
prompt = self._reduceText(prompt, reduction_factor)
context = self._reduceText(context, reduction_factor)
else:
# Only reduce context, preserve prompt integrity
max_context_size = max_size - len(prompt)
if len(context) > max_context_size:
reduction_factor = max_context_size / len(context)
context = self._reduceText(context, reduction_factor)
return prompt + "\n\n" + context if context else prompt
def _extractTextFromContentParts(self, extracted_content) -> str:
"""
Extract text content from ExtractionService ContentPart objects.
"""
if not extracted_content or not hasattr(extracted_content, 'parts'):
return ""
text_parts = []
for part in extracted_content.parts:
if hasattr(part, 'typeGroup') and part.typeGroup in ['text', 'table', 'structure']:
if hasattr(part, 'data') and part.data:
text_parts.append(part.data)
return "\n\n".join(text_parts)
def _reduceText(self, text: str, reduction_factor: float) -> str:
"""
Reduce text size by the specified factor.
"""
if reduction_factor >= 1.0:
return text
target_length = int(len(text) * reduction_factor)
return text[:target_length] + "... [reduced]"
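# Example: _reduceText("a" * 100, 0.5) keeps the first 50 characters and appends
# "... [reduced]", so the result may slightly exceed the target length; callers
# treat the factor as approximate.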
async def _analyzePromptIntent(self, prompt: str, ai_service=None) -> Dict[str, Any]:
"""Use AI to analyze user prompt and determine processing requirements."""
if not ai_service:
return {"is_multi_file": False, "strategy": "single", "criteria": None}
try:
analysis_prompt = f"""
Analyze this user request and determine if it requires multiple file output or single file output.
User request: "{prompt}"
Respond with JSON only in this exact format:
{{
"is_multi_file": true/false,
"strategy": "single|per_entity|by_section|by_criteria|custom",
"criteria": "description of how to split content",
"file_naming_pattern": "suggested pattern for filenames",
"reasoning": "brief explanation of the analysis"
}}
Consider:
- Does the user want separate files for different entities (customers, products, etc.)?
- Does the user want to split content into multiple documents?
- What would be the most logical way to organize the content?
- What language is the request in? (analyze in the original language)
Return only the JSON response.
"""
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationType
request_options = AiCallOptions()
request_options.operationType = OperationType.GENERAL
request = AiCallRequest(prompt=analysis_prompt, context="", options=request_options)
response = await ai_service.aiObjects.call(request)
if response and response.content:
import json
import re
# Extract JSON from response
result = response.content.strip()
json_match = re.search(r'\{.*\}', result, re.DOTALL)
if json_match:
result = json_match.group(0)
analysis = json.loads(result)
return analysis
else:
return {"is_multi_file": False, "strategy": "single", "criteria": None}
except Exception as e:
logger.warning(f"AI prompt analysis failed: {str(e)}, defaulting to single file")
return {"is_multi_file": False, "strategy": "single", "criteria": None}
def _validateResponseStructure(self, response: Dict[str, Any], prompt_analysis: Dict[str, Any]) -> bool:
"""Validate that AI response matches the expected structure."""
try:
if not isinstance(response, dict):
logger.warning(f"Response validation failed: Response is not a dict, got {type(response)}")
return False
# Check for multi-file structure
if prompt_analysis.get("is_multi_file", False):
has_documents = "documents" in response
is_documents_list = isinstance(response.get("documents"), list)
logger.info(f"Multi-file validation: has_documents={has_documents}, is_documents_list={is_documents_list}")
if has_documents and is_documents_list:
logger.info(f"Multi-file validation passed: {len(response['documents'])} documents found")
else:
logger.warning(f"Multi-file validation failed: documents key present={has_documents}, documents is list={is_documents_list}")
logger.warning(f"Available keys: {list(response.keys())}")
return has_documents and is_documents_list
else:
has_sections = "sections" in response
is_sections_list = isinstance(response.get("sections"), list)
logger.info(f"Single-file validation: has_sections={has_sections}, is_sections_list={is_sections_list}")
return has_sections and is_sections_list
except Exception as e:
logger.warning(f"Response validation failed with exception: {str(e)}")
return False
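# Minimal shapes accepted by the validation above:
#   multi-file:  {"documents": [{...}, ...]}   ("documents" must be a list)
#   single-file: {"sections": [{...}, ...]}    ("sections" must be a list)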
async def _callAiWithDocumentGeneration(
self,
prompt: str,
documents: Optional[List[ChatDocument]],
options: AiCallOptions,
outputFormat: str,
title: Optional[str]
) -> Dict[str, Any]:
"""
Handle AI calls with document generation in specific output format.
Now supports both single-file and multi-file generation.
Args:
prompt: The main prompt for the AI call
documents: Optional list of documents to process
options: AI call configuration options
outputFormat: Target output format (html, pdf, docx, txt, md, json, csv, xlsx)
title: Optional title for generated documents
Returns:
Dict with generated documents and metadata
"""
try:
# Use AI to analyze prompt intent
prompt_analysis = await self._analyzePromptIntent(prompt, self)
logger.info(f"Prompt analysis result: {prompt_analysis}")
if prompt_analysis.get("is_multi_file", False):
return await self._callAiWithMultiFileGeneration(
prompt, documents, options, outputFormat, title, prompt_analysis
)
else:
return await self._callAiWithSingleFileGeneration(
prompt, documents, options, outputFormat, title
)
except Exception as e:
logger.error(f"Error in document generation: {str(e)}")
return {
"success": False,
"error": str(e),
"content": "",
"rendered_content": "",
"mime_type": "text/plain",
"filename": f"error_{outputFormat}",
"format": outputFormat,
"title": title or "Error",
"documents": []
}
async def _callAiWithSingleFileGeneration(
self,
prompt: str,
documents: Optional[List[ChatDocument]],
options: AiCallOptions,
outputFormat: str,
title: Optional[str]
) -> Dict[str, Any]:
"""Handle single-file document generation (existing functionality)."""
try:
# Get format-specific extraction prompt from generation service
from modules.services.serviceGeneration.mainServiceGeneration import GenerationService
generation_service = GenerationService(self.services)
# Use default title if not provided
if not title:
title = "AI Generated Document"
# Get format-specific extraction prompt
extractionPrompt = await generation_service.getExtractionPrompt(
outputFormat=outputFormat,
userPrompt=prompt,
title=title,
aiService=self
)
# Process documents with format-specific prompt using JSON mode
# This ensures structured JSON output instead of text
aiResponseJson = await self._callAiJson(extractionPrompt, documents, options)
# Validate JSON response
if not isinstance(aiResponseJson, dict) or "sections" not in aiResponseJson:
raise Exception("AI response is not valid JSON document structure")
# Generate filename from document metadata
parsedFilename = None
try:
if aiResponseJson.get("metadata", {}).get("title"):
title = aiResponseJson["metadata"]["title"]
# Clean title for filename
import re
parsed = re.sub(r"[^a-zA-Z0-9._-]", "-", title)
parsed = re.sub(r"-+", "-", parsed).strip('-')
if parsed:
parsedFilename = f"{parsed}.{outputFormat}"
except Exception:
parsedFilename = None
# Render the JSON content to the specified format
renderedContent, mimeType = await generation_service.renderReport(
extractedContent=aiResponseJson,
outputFormat=outputFormat,
title=title,
userPrompt=prompt,
aiService=self
)
# Generate meaningful filename (use AI-provided if valid, else fallback)
from datetime import datetime, UTC
timestamp = datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
if parsedFilename and parsedFilename.lower().endswith(f".{outputFormat.lower()}"):
filename = parsedFilename
else:
safeTitle = ''.join(c if c.isalnum() else '-' for c in (title or 'document')).strip('-')
filename = f"{safeTitle or 'document'}-{timestamp}.{outputFormat}"
# Return structured result with document information
return {
"success": True,
"content": aiResponseJson, # Structured JSON document
"rendered_content": renderedContent, # Formatted content
"mime_type": mimeType,
"filename": filename,
"format": outputFormat,
"title": title,
"documents": [{
"documentName": filename,
"documentData": renderedContent,
"mimeType": mimeType
}],
"is_multi_file": False
}
except Exception as e:
logger.error(f"Error in single-file document generation: {str(e)}")
raise
async def _callAiWithMultiFileGeneration(
self,
prompt: str,
documents: Optional[List[ChatDocument]],
options: AiCallOptions,
outputFormat: str,
title: Optional[str],
prompt_analysis: Dict[str, Any]
) -> Dict[str, Any]:
"""Handle multi-file document generation using AI analysis."""
try:
# Get multi-file extraction prompt based on AI analysis
from modules.services.serviceGeneration.mainServiceGeneration import GenerationService
generation_service = GenerationService(self.services)
# Use default title if not provided
if not title:
title = "AI Generated Documents"
# Get adaptive extraction prompt
extraction_prompt = await generation_service.getAdaptiveExtractionPrompt(
outputFormat=outputFormat,
userPrompt=prompt,
title=title,
promptAnalysis=prompt_analysis,
aiService=self
)
logger.info(f"Adaptive extraction prompt length: {len(extraction_prompt)} characters")
logger.debug(f"Adaptive extraction prompt preview: {extraction_prompt[:500]}...")
# Process with adaptive JSON schema - use the existing pipeline but with adaptive prompt
logger.info(f"Using adaptive prompt with existing pipeline: {len(extraction_prompt)} chars")
logger.debug(f"Processing documents: {len(documents) if documents else 0} documents")
# Use the existing pipeline but replace the prompt with our adaptive one
# This ensures proper document processing while using the multi-file prompt
ai_response = await self._processDocumentsPerChunkJsonWithPrompt(documents, extraction_prompt, options)
logger.info(f"AI response type: {type(ai_response)}")
logger.info(f"AI response keys: {list(ai_response.keys()) if isinstance(ai_response, dict) else 'Not a dict'}")
logger.debug(f"AI response preview: {str(ai_response)[:500]}...")
# Validate response structure
if not self._validateResponseStructure(ai_response, prompt_analysis):
# Fallback to single-file if multi-file fails
logger.warning(f"Multi-file processing failed - Invalid response structure. Expected multi-file but got: {list(ai_response.keys()) if isinstance(ai_response, dict) else type(ai_response)}")
logger.warning(f"Prompt analysis: {prompt_analysis}")
logger.warning("Falling back to single-file generation")
return await self._callAiWithSingleFileGeneration(
prompt, documents, options, outputFormat, title
)
# Process multiple documents
generated_documents = []
for i, doc_data in enumerate(ai_response.get("documents", [])):
# Transform AI-generated sections to renderer-compatible format
transformed_sections = []
for section in doc_data.get("sections", []):
# Convert AI format to renderer format
transformed_section = {
"id": section.get("id", f"section_{len(transformed_sections) + 1}"),
"type": section.get("content_type", "paragraph"),
"data": {
"text": "",
"elements": section.get("elements", [])
},
"order": section.get("order", len(transformed_sections) + 1)
}
# Extract text from elements for simple text-based sections
if section.get("content_type") in ["paragraph", "heading"]:
text_parts = []
for element in section.get("elements", []):
if "text" in element:
text_parts.append(element["text"])
transformed_section["data"]["text"] = "\n".join(text_parts)
transformed_sections.append(transformed_section)
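# Example mapping (hypothetical AI section):
#   {"id": "s1", "content_type": "heading", "order": 1, "elements": [{"text": "Overview"}]}
# becomes the renderer-compatible section:
#   {"id": "s1", "type": "heading", "order": 1,
#    "data": {"text": "Overview", "elements": [{"text": "Overview"}]}}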
# Create complete document structure for rendering (tolerate a missing title)
doc_title = doc_data.get("title", f"Document {i + 1}")
complete_document = {
"metadata": {
"title": doc_title,
"source_document": "multi_file_generation",
"document_id": doc_data.get("id", f"doc_{i+1}"),
"filename": doc_data.get("filename", f"document_{i+1}"),
"split_strategy": prompt_analysis.get("strategy", "custom")
},
"sections": transformed_sections,
"summary": f"Generated document: {doc_data['title']}",
"tags": ["multi_file", "ai_generated"]
}
rendered_content, mime_type = await generation_service.renderReport(
extractedContent=complete_document,
outputFormat=outputFormat,
title=doc_title,
userPrompt=prompt,
aiService=self
)
# Generate a proper filename with the correct extension
base_filename = doc_data.get("filename", f"document_{i+1}")
# Strip any existing extension, then append the one matching the output format
if '.' in base_filename:
base_filename = base_filename.rsplit('.', 1)[0]
filename = f"{base_filename}.{outputFormat.lower()}"
generated_documents.append({
"documentName": filename,
"documentData": rendered_content,
"mimeType": mime_type
})
# Save debug files for multi-file generation - only if debug enabled
debug_enabled = self.services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False)
if debug_enabled:
try:
import os
from datetime import datetime, UTC
ts = datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
debug_root = "./test-chat/ai"
debug_dir = os.path.join(debug_root, f"multifile_output_{ts}")
os.makedirs(debug_dir, exist_ok=True)
# Save metadata
with open(os.path.join(debug_dir, "metadata.txt"), "w", encoding="utf-8") as f:
f.write(f"title: {title}\n")
f.write(f"format: {outputFormat}\n")
f.write(f"documents_count: {len(generated_documents)}\n")
f.write(f"split_strategy: {prompt_analysis.get('strategy', 'custom')}\n")
f.write(f"prompt_analysis: {prompt_analysis}\n")
# Save each generated document
for i, doc in enumerate(generated_documents):
doc_filename = doc["documentName"]
doc_data = doc["documentData"]
doc_mime = doc["mimeType"]
# Determine the file extension from the output format
file_ext = f".{outputFormat.lower()}"
# Save the rendered document
output_path = os.path.join(debug_dir, f"document_{i+1}_{doc_filename}")
if file_ext in ['.md', '.txt', '.html', '.json', '.csv']:
# Text-based formats
with open(output_path, 'w', encoding='utf-8') as f:
f.write(doc_data)
else:
# Binary formats - decode from base64 if needed
try:
import base64
doc_bytes = base64.b64decode(doc_data)
with open(output_path, 'wb') as f:
f.write(doc_bytes)
except Exception:
# If not base64, save as text
with open(output_path, 'w', encoding='utf-8') as f:
f.write(doc_data)
logger.info(f"💾 Debug: Saved multi-file document {i+1}: {output_path}")
logger.info(f"💾 Debug: Multi-file output saved to: {debug_dir}")
except Exception as e:
logger.warning(f"Failed to save multi-file debug output: {e}")
return {
"success": True,
"content": ai_response,
"rendered_content": None, # Not applicable for multi-file
"mime_type": None, # Not applicable for multi-file
"filename": None, # Not applicable for multi-file
"format": outputFormat,
"title": title,
"documents": generated_documents,
"is_multi_file": True,
"split_strategy": prompt_analysis.get("strategy", "custom")
}
except Exception as e:
logger.error(f"Error in multi-file document generation: {str(e)}")
# Fallback to single-file
return await self._callAiWithSingleFileGeneration(
prompt, documents, options, outputFormat, title
)