gateway/modules/services/serviceAi/mainServiceAi.py

import logging
from typing import Dict, Any, List, Optional, Tuple, Union
from modules.datamodels.datamodelChat import PromptPlaceholder
from modules.datamodels.datamodelChat import ChatDocument
from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, ModelCapabilities, OperationType, Priority
from modules.datamodels.datamodelWeb import (
WebResearchRequest,
WebResearchActionResult,
WebResearchDocumentData,
WebResearchActionDocument,
WebSearchResultItem,
)
from modules.interfaces.interfaceAiObjects import AiObjects
logger = logging.getLogger(__name__)
# Model registry is now provided by interfaces via AiModels
class AiService:
"""Centralized AI service orchestrating documents, model selection, failover, and web operations.
"""
def __init__(self, serviceCenter=None) -> None:
"""Initialize AI service with service center access.
Args:
serviceCenter: Service center instance for accessing other services
"""
self.serviceCenter = serviceCenter
# Only depend on interfaces
self.aiObjects = None # Will be initialized in create()
self._extractionService = None # Lazy initialization
@property
def extractionService(self):
"""Lazy initialization of extraction service."""
if self._extractionService is None:
logger.info("Lazy initializing ExtractionService...")
self._extractionService = ExtractionService()
return self._extractionService
async def _ensureAiObjectsInitialized(self):
"""Ensure aiObjects is initialized."""
if self.aiObjects is None:
logger.info("Lazy initializing AiObjects...")
self.aiObjects = await AiObjects.create()
logger.info("AiObjects initialization completed")
@classmethod
async def create(cls, serviceCenter=None) -> "AiService":
"""Create AiService instance with all connectors initialized."""
logger.info("AiService.create() called")
instance = cls(serviceCenter)
logger.info("AiService created, about to call AiObjects.create()...")
instance.aiObjects = await AiObjects.create()
logger.info("AiObjects.create() completed")
return instance
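    # Usage sketch (hedged; assumes an initialized service center instance):
    #   service = await AiService.create(serviceCenter)
    #   answer = await service.callAi("Summarize the project status")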
# AI Image Analysis
async def readImage(
self,
prompt: str,
imageData: Union[str, bytes],
        mimeType: Optional[str] = None,
options: Optional[AiCallOptions] = None,
) -> str:
"""Call AI for image analysis using interface.callImage()."""
try:
return await self.aiObjects.callImage(prompt, imageData, mimeType, options)
except Exception as e:
logger.error(f"Error in AI image analysis: {str(e)}")
return f"Error: {str(e)}"
# AI Image Generation
async def generateImage(
self,
prompt: str,
size: str = "1024x1024",
quality: str = "standard",
style: str = "vivid",
options: Optional[AiCallOptions] = None,
) -> Dict[str, Any]:
"""Generate an image using AI using interface.generateImage()."""
try:
return await self.aiObjects.generateImage(prompt, size, quality, style, options)
except Exception as e:
logger.error(f"Error in AI image generation: {str(e)}")
return {"success": False, "error": str(e)}
# Web Research - Using interface functions
async def webResearch(self, request: WebResearchRequest) -> WebResearchActionResult:
"""Perform web research using interface functions."""
try:
logger.info(f"WEB RESEARCH STARTED")
logger.info(f"User Query: {request.user_prompt}")
logger.info(f"Max Results: {request.max_results}, Max Pages: {request.options.max_pages}")
# Step 1: Find relevant websites - either provided URLs or AI-determined main URLs
logger.info(f"=== STEP 1: INITIAL MAIN URLS LIST ===")
if request.urls:
# Use provided URLs as initial main URLs
websites = request.urls
logger.info(f"Using provided URLs ({len(websites)}):")
for i, url in enumerate(websites, 1):
logger.info(f" {i}. {url}")
else:
# Use AI to determine main URLs based on user's intention
logger.info(f"AI analyzing user intent: '{request.user_prompt}'")
# Use AI to generate optimized Tavily search query and search parameters
query_optimizer_prompt = f"""You are a search query optimizer.
USER QUERY: {request.user_prompt}
Your task: Create a search query and parameters for the USER QUERY given.
RULES:
1. The search query MUST be related to the user query above
2. Extract key terms from the user query
3. Determine appropriate country/language based on the query context
4. Keep search query short (2-6 words)
Return ONLY this JSON format:
{{
"user_prompt": "search query based on user query above",
"country": "country_code_or_null",
"language": "language_code_or_null",
"topic": "general|news|academic_or_null",
"time_range": "d|w|m|y_or_null",
"selection_strategy": "single|multiple|specific_page",
"selection_criteria": "what URLs to prioritize",
"expected_url_patterns": ["pattern1", "pattern2"],
"estimated_result_count": number
}}"""
# Get AI response for query optimization
ai_request = AiCallRequest(
prompt=query_optimizer_prompt,
options=AiCallOptions()
)
ai_response_obj = await self.aiObjects.call(ai_request)
ai_response = ai_response_obj.content
logger.debug(f"AI query optimizer response: {ai_response}")
# Parse AI response to extract search query
import json
try:
# Clean the response by removing markdown code blocks
cleaned_response = ai_response.strip()
if cleaned_response.startswith('```json'):
cleaned_response = cleaned_response[7:] # Remove ```json
if cleaned_response.endswith('```'):
cleaned_response = cleaned_response[:-3] # Remove ```
cleaned_response = cleaned_response.strip()
query_data = json.loads(cleaned_response)
search_query = query_data.get("user_prompt", request.user_prompt)
ai_country = query_data.get("country")
ai_language = query_data.get("language")
ai_topic = query_data.get("topic")
ai_time_range = query_data.get("time_range")
selection_strategy = query_data.get("selection_strategy", "multiple")
selection_criteria = query_data.get("selection_criteria", "relevant URLs")
expected_patterns = query_data.get("expected_url_patterns", [])
estimated_count = query_data.get("estimated_result_count", request.max_results)
logger.info(f"AI optimized search query: '{search_query}'")
logger.info(f"Selection strategy: {selection_strategy}")
logger.info(f"Selection criteria: {selection_criteria}")
logger.info(f"Expected URL patterns: {expected_patterns}")
logger.info(f"Estimated result count: {estimated_count}")
except json.JSONDecodeError:
logger.warning("Failed to parse AI response as JSON, using original query")
search_query = request.user_prompt
ai_country = None
ai_language = None
ai_topic = None
ai_time_range = None
selection_strategy = "multiple"
# Perform the web search with AI-determined parameters
search_kwargs = {
"query": search_query,
"max_results": request.max_results,
"search_depth": request.options.search_depth,
"auto_parameters": False # Use explicit parameters
}
# Add parameters only if they have valid values
if ai_country and ai_country not in ['null', '', 'none', 'undefined']:
search_kwargs["country"] = ai_country
elif request.options.country and request.options.country not in ['null', '', 'none', 'undefined']:
search_kwargs["country"] = request.options.country
if ai_language and ai_language not in ['null', '', 'none', 'undefined']:
search_kwargs["language"] = ai_language
elif request.options.language and request.options.language not in ['null', '', 'none', 'undefined']:
search_kwargs["language"] = request.options.language
if ai_topic and ai_topic in ['general', 'news', 'academic']:
search_kwargs["topic"] = ai_topic
elif request.options.topic and request.options.topic in ['general', 'news', 'academic']:
search_kwargs["topic"] = request.options.topic
if ai_time_range and ai_time_range in ['d', 'w', 'm', 'y']:
search_kwargs["time_range"] = ai_time_range
elif request.options.time_range and request.options.time_range in ['d', 'w', 'm', 'y']:
search_kwargs["time_range"] = request.options.time_range
# Log the parameters being used
logger.info(f"Search parameters: country={search_kwargs.get('country', 'not_set')}, language={search_kwargs.get('language', 'not_set')}, topic={search_kwargs.get('topic', 'not_set')}, time_range={search_kwargs.get('time_range', 'not_set')}")
search_results = await self.aiObjects.search_websites(**search_kwargs)
logger.debug(f"Web search returned {len(search_results)} results:")
for i, result in enumerate(search_results, 1):
logger.debug(f" {i}. {result.url} - {result.title}")
# Deduplicate while preserving order
seen = set()
search_urls = []
for r in search_results:
u = str(r.url)
if u not in seen:
seen.add(u)
search_urls.append(u)
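                # Note: this explicit loop is equivalent to
                # list(dict.fromkeys(str(r.url) for r in search_results)),
                # kept spelled out here for readability.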
if not search_urls:
logger.error("No relevant websites found")
return WebResearchActionResult(success=False, error="No relevant websites found")
# Now use AI to determine the main URLs based on user's intention
logger.info(f"AI selecting main URLs from {len(search_urls)} search results based on user intent")
# Create a prompt for AI to identify main URLs based on user's intention
ai_prompt = f"""
Select the most relevant URLs from these search results:
{chr(10).join([f"{i+1}. {url}" for i, url in enumerate(search_urls)])}
Return only the URLs that are most relevant for the user's query.
One URL per line.
"""
# Create AI call request
ai_request = AiCallRequest(
prompt=ai_prompt,
options=AiCallOptions()
)
ai_response_obj = await self.aiObjects.call(ai_request)
ai_response = ai_response_obj.content
logger.debug(f"AI response for main URL selection: {ai_response}")
# Parse AI response to extract URLs
websites = []
for line in ai_response.strip().split('\n'):
line = line.strip()
if line and ('http://' in line or 'https://' in line):
# Extract URL from the line
for word in line.split():
if word.startswith('http://') or word.startswith('https://'):
websites.append(word.rstrip('.,;'))
break
if not websites:
logger.warning("AI did not identify any main URLs, using first few search results")
websites = search_urls[:3] # Fallback to first 3 search results
# Deduplicate while preserving order
seen = set()
unique_websites = []
for url in websites:
if url not in seen:
seen.add(url)
unique_websites.append(url)
websites = unique_websites
logger.info(f"AI selected {len(websites)} main URLs (after deduplication):")
for i, url in enumerate(websites, 1):
logger.info(f" {i}. {url}")
# Step 2: Smart website selection using AI interface
logger.info(f"=== STEP 2: FILTERED URL LIST BY USER PROMPT'S INTENTION ===")
logger.info(f"AI analyzing {len(websites)} URLs for relevance to: '{request.user_prompt}'")
selectedWebsites, aiResponse = await self.aiObjects.selectRelevantWebsites(websites, request.user_prompt)
logger.debug(f"AI Response: {aiResponse}")
logger.debug(f"AI selected {len(selectedWebsites)} most relevant URLs:")
for i, url in enumerate(selectedWebsites, 1):
logger.debug(f" {i}. {url}")
# Show which were filtered out
filtered_out = [url for url in websites if url not in selectedWebsites]
if filtered_out:
logger.debug(f"Filtered out {len(filtered_out)} less relevant URLs:")
for i, url in enumerate(filtered_out, 1):
logger.debug(f" {i}. {url}")
# Step 3+4+5: Recursive crawling with configurable depth
logger.info(f"=== STEP 3+4+5: RECURSIVE CRAWLING (DEPTH {request.options.pages_search_depth}) ===")
logger.info(f"Starting recursive crawl of {len(selectedWebsites)} main websites...")
logger.info(f"Search depth: {request.options.pages_search_depth} levels")
logger.info(f"DEBUG: request.options.pages_search_depth = {request.options.pages_search_depth}")
# Use recursive crawling with URL index to avoid duplicates
allContent = await self.aiObjects.crawlRecursively(
urls=selectedWebsites,
max_depth=request.options.pages_search_depth,
extract_depth=request.options.extract_depth,
max_per_domain=10
)
if not allContent:
logger.error("Could not extract content from any websites")
return WebResearchActionResult(success=False, error="Could not extract content from any websites")
logger.info(f"=== WEB RESEARCH COMPLETED ===")
logger.info(f"Successfully crawled {len(allContent)} URLs total")
logger.info(f"Crawl depth: {request.options.pages_search_depth} levels")
# Create simple result with raw content
sources = [WebSearchResultItem(title=url, url=url) for url in selectedWebsites]
# Get all additional links (all URLs except main ones)
additional_links = [url for url in allContent.keys() if url not in selectedWebsites]
# Combine all content into a single result
combinedContent = ""
for url, content in allContent.items():
combinedContent += f"\n\n=== {url} ===\n{content}\n"
documentData = WebResearchDocumentData(
user_prompt=request.user_prompt,
websites_analyzed=len(allContent),
additional_links_found=len(additional_links),
analysis_result=combinedContent, # Raw content, no analysis
sources=sources,
additional_links=additional_links,
individual_content=allContent, # Individual URL -> content mapping
debug_info={
"crawl_depth": request.options.pages_search_depth,
"total_urls_crawled": len(allContent),
"main_urls": len(selectedWebsites),
"additional_urls": len(additional_links)
}
)
document = WebResearchActionDocument(
documentName=f"web_research_{request.user_prompt[:50]}.json",
documentData=documentData,
mimeType="application/json"
)
return WebResearchActionResult(
success=True,
documents=[document],
resultLabel="web_research_results"
)
except Exception as e:
logger.error(f"Error in web research: {str(e)}")
return WebResearchActionResult(success=False, error=str(e))
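    # Usage sketch (hedged; WebResearchRequest fields assumed from the datamodel):
    #   request = WebResearchRequest(user_prompt="EU AI Act status 2025", max_results=5)
    #   result = await service.webResearch(request)
    #   if result.success:
    #       raw_content = result.documents[0].documentData.analysis_result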
async def _processDocumentsForAi(
self,
documents: List[ChatDocument],
operationType: str,
compressDocuments: bool,
processIndividually: bool,
userPrompt: str,
options: Optional[AiCallOptions] = None
) -> str:
if not documents:
return ""
# Calculate model-derived size limits
maxContextBytes = self._calculateMaxContextBytes(options)
# Build extraction options with model-derived limits
extractionOptions: Dict[str, Any] = {
"prompt": f"Extract content that supports the user's request: '{userPrompt}'. Focus on information relevant to: {operationType}",
"operationType": operationType,
"processDocumentsIndividually": processIndividually,
"maxSize": maxContextBytes,
"chunkAllowed": not options.compressContext if options else True,
"textChunkSize": int(maxContextBytes * 0.3), # 30% of max for text chunks
"imageChunkSize": int(maxContextBytes * 0.5), # 50% of max for image chunks
"imageMaxPixels": 1024 * 1024, # 1MP default
"imageQuality": 85,
"mergeStrategy": {
"groupBy": "typeGroup",
"orderBy": "id",
"mergeType": "concatenate"
},
}
processedContents: List[str] = []
try:
# Use new ChatDocument-based API
logger.info(f"=== PROCESSING {len(documents)} DOCUMENTS FOR AI ===")
for i, doc in enumerate(documents):
logger.info(f"Document {i}: {doc.fileName} (MIME: {doc.mimeType})")
extractionResult = self.extractionService.extractContent(documents, extractionOptions)
logger.info(f"Extraction completed: {len(extractionResult)} results")
async def _partsToText(parts, documentName: str, documentType: str, logger_ref) -> str:
lines: List[str] = []
logger_ref.debug(f"Processing {len(parts)} content parts for {documentName}")
for p in parts:
logger_ref.debug(f" Part: {p.typeGroup} ({p.mimeType}) - {len(p.data) if p.data else 0} chars")
if p.typeGroup in ("text", "table", "structure") and p.data and isinstance(p.data, str):
lines.append(p.data)
elif p.typeGroup == "image" and p.data:
# Use AI to extract text from image with user prompt
logger_ref.debug(f" Processing image with AI using user prompt...")
try:
imageResult = await self.aiObjects.callImage(
prompt=userPrompt,
imageData=p.data,
mimeType=p.mimeType
)
lines.append(f"[Image Analysis]: {imageResult}")
logger_ref.debug(f" AI image analysis completed: {len(imageResult)} chars")
except Exception as e:
logger_ref.warning(f" AI image processing failed: {e}")
lines.append(f"[Image Analysis Failed]: {str(e)}")
return "\n\n".join(lines)
if isinstance(extractionResult, list):
for i, ec in enumerate(extractionResult):
try:
# Get document info for this extraction result
doc = documents[i] if i < len(documents) else None
docName = doc.fileName if doc else f"Document_{i}"
docType = doc.mimeType if doc else "unknown"
contentText = await _partsToText(ec.parts, docName, docType, logger)
logger.debug(f"Document {i} content: {len(contentText)} chars")
if compressDocuments and len(contentText.encode("utf-8")) > 10000:
originalLength = len(contentText)
contentText = await self._compressContent(contentText, 10000, "document")
logger.debug(f"Document {i} compressed: {originalLength} -> {len(contentText)} chars")
processedContents.append(contentText)
except Exception as e:
logger.warning(f"Error aggregating extracted content: {str(e)}")
processedContents.append("[Error aggregating content]")
            else:
                # Fallback: extraction returned nothing usable
                processedContents.append("")
except Exception as e:
logger.warning(f"Error during extraction: {str(e)}")
processedContents.append("[Error during extraction]")
# Build JSON structure ONLY when adding to AI prompt
import json
documentsJson = []
for i, content in enumerate(processedContents):
doc = documents[i] if i < len(documents) else None
docName = doc.fileName if doc else f"Document_{i}"
docType = doc.mimeType if doc else "unknown"
documentData = {
"documentName": docName,
"documentType": docType,
"content": content
}
documentsJson.append(documentData)
finalContext = json.dumps({
"documents": documentsJson,
"totalDocuments": len(documentsJson)
}, indent=2, ensure_ascii=False)
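        # Illustrative shape of the returned context (values abbreviated):
        # {"documents": [{"documentName": "report.pdf",
        #                 "documentType": "application/pdf",
        #                 "content": "..."}],
        #  "totalDocuments": 1}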
logger.debug(f"=== FINAL CONTEXT ===")
logger.debug(f"Total context: {len(finalContext)} chars")
logger.debug(f"Documents: {len(documentsJson)}")
return finalContext
def _calculateMaxContextBytes(self, options: Optional[AiCallOptions]) -> int:
"""Calculate maximum context bytes based on model capabilities and options."""
if options and options.maxContextBytes:
return options.maxContextBytes
# Default model capabilities (this should be enhanced with actual model registry)
defaultMaxTokens = 4000
safetyMargin = options.safetyMargin if options else 0.1
# Calculate bytes (4 chars per token estimation)
maxContextBytes = int(defaultMaxTokens * (1 - safetyMargin) * 4)
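        # Worked example: with the defaults above, int(4000 * (1 - 0.1) * 4) = 14400 bytes.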
return maxContextBytes
async def _processDocumentsPerChunk(
self,
documents: List[ChatDocument],
prompt: str,
options: Optional[AiCallOptions] = None
) -> str:
"""
Process documents with per-chunk AI calls and merge results.
Args:
documents: List of ChatDocument objects to process
prompt: AI prompt for processing
options: AI call options
Returns:
Merged AI results as string
"""
if not documents:
return ""
# Get model capabilities for size calculation
model_capabilities = self._getModelCapabilitiesForContent(prompt, documents, options)
# Build extraction options for chunking
extractionOptions: Dict[str, Any] = {
"prompt": prompt,
"operationType": options.operationType if options else "general",
"processDocumentsIndividually": True, # Process each document separately
"maxSize": model_capabilities["maxContextBytes"],
"chunkAllowed": True,
"textChunkSize": model_capabilities["textChunkSize"],
"imageChunkSize": model_capabilities["imageChunkSize"],
"imageMaxPixels": 1024 * 1024,
"imageQuality": 85,
"mergeStrategy": {
"groupBy": "typeGroup",
"orderBy": "id",
"mergeType": "concatenate"
},
}
logger.debug(f"Per-chunk extraction options: {extractionOptions}")
try:
# Extract content with chunking
extractionResult = self.extractionService.extractContent(documents, extractionOptions)
if not isinstance(extractionResult, list):
return "[Error: No extraction results]"
            # Prepare debug directory (TODO: remove this debug dump)
import os
from datetime import datetime
debug_root = "./test-chat/extraction"
ts = datetime.now().strftime("%Y%m%d-%H%M%S")
debug_dir = os.path.join(debug_root, f"per_chunk_{ts}")
try:
os.makedirs(debug_dir, exist_ok=True)
except Exception:
pass
# Process each chunk with AI
aiResults: List[str] = []
for ec in extractionResult:
for part in ec.parts:
if part.typeGroup == "image":
# Process image with AI
try:
# Safety check for part.data
if not hasattr(part, 'data') or part.data is None:
logger.warning(f"Skipping image chunk with no data")
continue
aiResult = await self.readImage(
prompt=prompt,
imageData=part.data,
mimeType=part.mimeType,
options=options
)
aiResults.append(aiResult)
except Exception as e:
logger.warning(f"Error processing image chunk: {str(e)}")
aiResults.append(f"[Error processing image: {str(e)}]")
elif part.typeGroup in ("text", "table", "structure"):
# Process text content with AI
try:
# Safety check for part.data
if not hasattr(part, 'data') or part.data is None:
logger.warning(f"Skipping chunk with no data")
continue
logger.info(f"=== PROCESSING CHUNK {len(aiResults) + 1} ===")
logger.info(f"Chunk size: {len(part.data)} chars")
logger.info(f"Chunk preview: {part.data[:200]}...")
# Dump input chunk
try:
idx = len(aiResults) + 1
fpath = os.path.join(debug_dir, f"chunk_{idx:03d}_input.txt")
with open(fpath, "w", encoding="utf-8") as f:
f.write(str(part.data))
except Exception:
pass
# Create AI call request for this chunk
request = AiCallRequest(
prompt=prompt,
context=part.data,
options=options
)
# Make the call using AiObjects
response = await self.aiObjects.call(request)
aiResults.append(response.content)
logger.info(f"Chunk {len(aiResults)} processed: {len(response.content)} chars response")
# Dump AI response
try:
idx = len(aiResults)
fpath = os.path.join(debug_dir, f"chunk_{idx:03d}_response.txt")
with open(fpath, "w", encoding="utf-8") as f:
f.write(str(response.content))
except Exception:
pass
except Exception as e:
logger.warning(f"Error processing text chunk: {str(e)}")
aiResults.append(f"[Error processing text: {str(e)}]")
# Merge AI results using ExtractionService
from modules.datamodels.datamodelExtraction import MergeStrategy
mergeStrategy = MergeStrategy(
groupBy="typeGroup",
orderBy="id",
mergeType="concatenate",
chunkSeparator="\n\n---\n\n"
)
mergedContent = self.extractionService.mergeAiResults(
extractionResult,
aiResults,
mergeStrategy
)
# Extract only AI-generated text from merged content
resultText = ""
for part in mergedContent.parts:
                if (
                    part.typeGroup in ("text", "table", "structure")
                    and part.data
                    and (getattr(part, "metadata", None) or {}).get("aiResult", False)
                ):
resultText += part.data + "\n\n"
# Dump merged output
try:
fpath = os.path.join(debug_dir, "merged_output.txt")
with open(fpath, "w", encoding="utf-8") as f:
f.write(resultText.strip())
except Exception:
pass
return resultText.strip()
except Exception as e:
logger.error(f"Error in per-chunk processing: {str(e)}")
return f"[Error in per-chunk processing: {str(e)}]"
async def _compressContent(self, content: str, targetSize: int, contentType: str) -> str:
if len(content.encode("utf-8")) <= targetSize:
return content
        try:
            # AI-based compression prompt (currently unused: the service must not
            # call connectors directly, so we fall back to truncation below)
            compressionPrompt = f"""
            Compress the following {contentType} to at most {targetSize} characters,
            but keep all important information:
            {content}
            Return only the compressed content, without additional explanations.
            """
            # Byte-safe truncation: never split a multi-byte UTF-8 character
            data = content.encode("utf-8")
            return data[:targetSize].decode("utf-8", errors="ignore") + "... [truncated]"
except Exception as e:
logger.warning(f"AI compression failed, using truncation: {str(e)}")
return content[:targetSize] + "... [truncated]"
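    # Byte-safety illustration: "café".encode("utf-8")[:4] is b"caf\xc3"; decoding
    # with errors="ignore" yields "caf", so truncation never splits a character.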
# ===== DYNAMIC GENERIC AI CALLS IMPLEMENTATION =====
async def callAi(
self,
prompt: str,
documents: Optional[List[ChatDocument]] = None,
placeholders: Optional[List[PromptPlaceholder]] = None,
options: Optional[AiCallOptions] = None,
outputFormat: Optional[str] = None,
title: Optional[str] = None
) -> Union[str, Dict[str, Any]]:
"""
Unified AI call interface that automatically routes to appropriate handler.
Args:
prompt: The main prompt for the AI call
documents: Optional list of documents to process
placeholders: Optional list of placeholder replacements for planning calls
options: AI call configuration options
outputFormat: Optional output format (html, pdf, docx, txt, md, json, csv, xlsx) for document generation
title: Optional title for generated documents
Returns:
AI response as string, or dict with documents if outputFormat is specified
Raises:
Exception: If all available models fail
"""
# Ensure aiObjects is initialized
await self._ensureAiObjectsInitialized()
if options is None:
options = AiCallOptions()
# Normalize placeholders from List[PromptPlaceholder]
placeholders_dict: Dict[str, str] = {}
placeholders_meta: Dict[str, bool] = {}
if placeholders:
placeholders_dict = {p.label: p.content for p in placeholders}
placeholders_meta = {p.label: bool(getattr(p, 'summaryAllowed', False)) for p in placeholders}
# Auto-determine call type based on documents and operation type
call_type = self._determineCallType(documents, options.operationType)
options.callType = call_type
# Handle document generation with specific output format
if outputFormat:
return await self._callAiWithDocumentGeneration(prompt, documents, options, outputFormat, title)
if call_type == "planning":
return await self._callAiPlanning(prompt, placeholders_dict, placeholders_meta, options)
else:
# Set processDocumentsIndividually from the legacy parameter if not set in options
if options.processDocumentsIndividually is None and documents:
options.processDocumentsIndividually = False # Default to batch processing
return await self._callAiText(prompt, documents, options)
def _determineCallType(self, documents: Optional[List[ChatDocument]], operation_type: str) -> str:
"""
Determine call type based on documents and operation type.
Criteria: no documents AND (operationType is "generate_plan" or "analyse_content") -> planning
"""
has_documents = documents is not None and len(documents) > 0
is_planning_operation = operation_type in [OperationType.GENERATE_PLAN, OperationType.ANALYSE_CONTENT]
if not has_documents and is_planning_operation:
return "planning"
else:
return "text"
async def _callAiPlanning(
self,
prompt: str,
placeholders: Optional[Dict[str, str]],
placeholdersMeta: Optional[Dict[str, bool]],
options: AiCallOptions
) -> str:
"""
Handle planning calls with placeholder system and selective summarization.
"""
# Ensure aiObjects is initialized
await self._ensureAiObjectsInitialized()
# Build full prompt with placeholders; if too large, summarize summaryAllowed placeholders proportionally
effective_placeholders = placeholders or {}
full_prompt = self._buildPromptWithPlaceholders(prompt, effective_placeholders)
if options.compressPrompt and placeholdersMeta:
# Determine model capacity
try:
caps = self._getModelCapabilitiesForContent(full_prompt, None, options)
max_bytes = caps.get("maxContextBytes", len(full_prompt.encode("utf-8")))
except Exception:
max_bytes = len(full_prompt.encode("utf-8"))
current_bytes = len(full_prompt.encode("utf-8"))
if current_bytes > max_bytes:
# Compute total bytes contributed by allowed placeholders (approximate by content length)
allowed_labels = [l for l, allow in placeholdersMeta.items() if allow]
allowed_sizes = {l: len((effective_placeholders.get(l) or "").encode("utf-8")) for l in allowed_labels}
total_allowed = sum(allowed_sizes.values())
overage = current_bytes - max_bytes
if total_allowed > 0 and overage > 0:
# Target total for allowed after reduction
target_allowed = max(total_allowed - overage, 0)
# Global ratio to apply across allowed placeholders
ratio = target_allowed / total_allowed if total_allowed > 0 else 1.0
ratio = max(0.0, min(1.0, ratio))
reduced: Dict[str, str] = {}
for label, content in effective_placeholders.items():
if label in allowed_labels and isinstance(content, str) and len(content) > 0:
old_len = len(content)
# Reduce by proportional ratio on characters (fallback if empty)
reduction_factor = ratio if old_len > 0 else 1.0
reduced[label] = self._reduceText(content, reduction_factor)
else:
reduced[label] = content
effective_placeholders = reduced
full_prompt = self._buildPromptWithPlaceholders(prompt, effective_placeholders)
# If still slightly over, perform a second-pass fine adjustment with updated ratio
current_bytes = len(full_prompt.encode("utf-8"))
if current_bytes > max_bytes and total_allowed > 0:
overage2 = current_bytes - max_bytes
# Recompute allowed sizes after first reduction
allowed_sizes2 = {l: len((effective_placeholders.get(l) or "").encode("utf-8")) for l in allowed_labels}
total_allowed2 = sum(allowed_sizes2.values())
if total_allowed2 > 0 and overage2 > 0:
target_allowed2 = max(total_allowed2 - overage2, 0)
ratio2 = target_allowed2 / total_allowed2
ratio2 = max(0.0, min(1.0, ratio2))
reduced2: Dict[str, str] = {}
for label, content in effective_placeholders.items():
if label in allowed_labels and isinstance(content, str) and len(content) > 0:
old_len = len(content)
reduction_factor = ratio2 if old_len > 0 else 1.0
reduced2[label] = self._reduceText(content, reduction_factor)
else:
reduced2[label] = content
effective_placeholders = reduced2
full_prompt = self._buildPromptWithPlaceholders(prompt, effective_placeholders)
# Make AI call using AiObjects (let it handle model selection)
request = AiCallRequest(
prompt=full_prompt,
context="", # Context is already included in the prompt
options=options
)
response = await self.aiObjects.call(request)
try:
logger.debug(f"AI model selected (planning): {getattr(response, 'modelName', 'unknown')}")
except Exception:
pass
# Write full planning response as JSON dump when possible (no duplicates)
try:
import json
content = response.content
cleaned = content.strip()
if cleaned.startswith('```json'):
cleaned = cleaned[7:]
if cleaned.endswith('```'):
cleaned = cleaned[:-3]
cleaned = cleaned.strip()
obj = json.loads(cleaned)
self._writeTraceLog("AI Planning Raw Response", obj)
except Exception:
# Fallback to plain text once
try:
self._writeTraceLog("AI Planning Raw Response", response.content)
except Exception:
pass
return response.content
async def _callAiText(
self,
prompt: str,
documents: Optional[List[ChatDocument]],
options: AiCallOptions
) -> str:
"""
Handle text calls with document processing through ExtractionService.
"""
# Ensure aiObjects is initialized
await self._ensureAiObjectsInitialized()
# Determine processing strategy based on options
if options.processDocumentsIndividually and documents:
# Use per-chunk processing for individual document processing
return await self._processDocumentsPerChunk(documents, prompt, options)
# Check if we need chunking - if so, use per-chunk processing
if documents and not options.compressContext:
# Get model capabilities to check if chunking will be needed
model_capabilities = self._getModelCapabilitiesForContent(prompt, documents, options)
total_doc_size = sum(doc.fileSize or 0 for doc in documents)
if total_doc_size > model_capabilities["maxContextBytes"]:
logger.info(f"Document size ({total_doc_size}) exceeds model capacity ({model_capabilities['maxContextBytes']}), using per-chunk processing")
return await self._processDocumentsPerChunk(documents, prompt, options)
# Extract and process documents using ExtractionService
context = ""
if documents:
logger.info(f"=== EXTRACTING CONTENT FROM {len(documents)} DOCUMENTS ===")
# Get model capabilities for size calculation
model_capabilities = self._getModelCapabilitiesForContent(prompt, documents, options)
# Use new ChatDocument-based API
extraction_options = {
"prompt": prompt,
"operationType": options.operationType,
"processDocumentsIndividually": options.processDocumentsIndividually,
"maxSize": options.maxContextBytes or model_capabilities["maxContextBytes"],
"chunkAllowed": not options.compressContext,
"textChunkSize": model_capabilities["textChunkSize"],
"imageChunkSize": model_capabilities["imageChunkSize"],
"imageMaxPixels": 1024 * 1024,
"imageQuality": 85,
"mergeStrategy": {"groupBy": "typeGroup", "orderBy": "id", "mergeType": "concatenate"}
}
logger.debug(f"Extraction options: {extraction_options}")
extracted_content = self.extractionService.extractContent(
documents=documents,
options=extraction_options
)
logger.info(f"Extraction completed: {len(extracted_content)} documents")
# Build context from list of extracted content
if isinstance(extracted_content, list):
context_parts = []
chunk_count = 0
for ec in extracted_content:
for p in ec.parts:
if p.typeGroup in ["text", "table", "structure"] and p.data:
                            if (getattr(p, "metadata", None) or {}).get("chunk", False):
chunk_count += 1
context_parts.append(p.data)
elif p.typeGroup == "image" and p.data:
# Process image with AI using user prompt
try:
imageResult = await self.aiObjects.callImage(
prompt=prompt,
imageData=p.data,
mimeType=p.mimeType
)
context_parts.append(f"[Image Analysis]: {imageResult}")
except Exception as e:
logger.warning(f"AI image processing failed: {e}")
context_parts.append(f"[Image Analysis Failed]: {str(e)}")
if chunk_count > 0:
logger.debug(f"=== PROCESSING CHUNKED CONTENT ===")
logger.debug(f"Total chunks: {chunk_count}")
logger.debug(f"Total context parts: {len(context_parts)}")
context = "\n\n---\n\n".join(context_parts)
else:
context = ""
# Check size and reduce if needed
full_prompt = prompt + "\n\n" + context if context else prompt
# Add generic completeness guidance: first vs subsequent (based on presence of context)
try:
if context and context.strip():
# Subsequent calls with prior context: continue next part only
full_prompt += (
"\n\nINSTRUCTIONS (COMPLETENESS):\n"
"- Continue from where the previous content ended. Do NOT repeat earlier content.\n"
"- If more parts are still needed after this response, append a final line exactly: 'CONTINUATION: true'.\n"
"- If the content is now complete, append a final line exactly: 'CONTINUATION: false'.\n"
)
else:
# First call (no prior context): deliver full content or first part
full_prompt += (
"\n\nINSTRUCTIONS (COMPLETENESS):\n"
"- Deliver the complete content. Do NOT truncate.\n"
"- If platform limits force truncation, provide the first complete section(s) only and append a final line exactly: 'CONTINUATION: true'.\n"
"- If the entire content is fully included, append a final line exactly: 'CONTINUATION: false'.\n"
)
except Exception:
# Non-fatal if any issue building guidance
pass
logger.debug(f"AI call: {len(full_prompt)} chars (prompt: {len(prompt)}, context: {len(context)})")
# Use AiObjects to select the best model and make the call
try:
# Helper to detect and strip continuation flag
import re
            def _split_content_and_flag(text: str) -> Tuple[str, bool]:
                if not text:
                    return "", False
                lines = text.strip().splitlines()
                cont = False
                # Scan the last few lines for the flag, to be robust against trailing noise
                for i in range(1, min(4, len(lines)) + 1):
m = re.match(r"^\s*CONTINUATION:\s*(true|false)\s*$", lines[-i].strip(), re.IGNORECASE)
if m:
cont = m.group(1).lower() == 'true'
# remove the matched flag line
del lines[-i]
break
return "\n".join(lines).strip(), cont
# First call
request = AiCallRequest(
prompt=full_prompt,
context="",
options=options
)
response = await self.aiObjects.call(request)
try:
logger.debug(f"AI model selected (text): {getattr(response, 'modelName', 'unknown')}")
except Exception:
pass
content_first = response.content or ""
merged_content, needs_more = _split_content_and_flag(content_first)
# Iteratively request next parts if flagged
max_parts = 10
part_index = 1
while needs_more and part_index < max_parts:
part_index += 1
# Build subsequent prompt with explicit continuation instructions
subsequent_prompt = (
prompt
+ "\n\nINSTRUCTIONS (CONTINUE NEXT PART ONLY):\n"
"- Continue from where the previous content ended.\n"
"- Do NOT repeat earlier content.\n"
"- Append a final line exactly: 'CONTINUATION: true' if more parts are needed, otherwise 'CONTINUATION: false'.\n"
)
next_request = AiCallRequest(
prompt=subsequent_prompt,
context=merged_content,
options=options
)
next_response = await self.aiObjects.call(next_request)
part_text = next_response.content or ""
part_clean, needs_more = _split_content_and_flag(part_text)
if part_clean:
# Separate parts clearly
merged_content = (merged_content + "\n\n" + part_clean).strip()
else:
# Avoid infinite loops on empty parts
break
logger.debug(f"=== AI RESPONSE (MERGED) ===")
logger.debug(f"Response length: {len(merged_content)} chars")
logger.debug(f"Response preview: {merged_content[:200]}...")
return merged_content
except Exception as e:
logger.error(f"AI call failed: {e}")
raise Exception(f"AI call failed: {e}")
def _getModelCapabilitiesForContent(self, prompt: str, documents: Optional[List[ChatDocument]], options: AiCallOptions) -> Dict[str, int]:
"""
Get model capabilities for content processing, including appropriate size limits for chunking.
"""
# Estimate total content size
prompt_size = len(prompt.encode('utf-8'))
document_size = 0
if documents:
# Rough estimate of document content size
for doc in documents:
document_size += doc.fileSize or 0
total_size = prompt_size + document_size
# Use AiObjects to select the best model for this content size
# We'll simulate the model selection by checking available models
from modules.interfaces.interfaceAiObjects import aiModels
# Find the best model for this content size and operation
best_model = None
best_context_length = 0
for model_name, model_info in aiModels.items():
context_length = model_info.get("contextLength", 0)
# Skip models with no context length or too small for content
if context_length == 0:
continue
            # Check if the model supports the requested operation type; the
            # text_generation requirement only applies to text-style operations
            capabilities = model_info.get("capabilities", [])
            if options.operationType == OperationType.IMAGE_ANALYSIS:
                if "image_analysis" not in capabilities:
                    continue
            elif options.operationType == OperationType.IMAGE_GENERATION:
                if "image_generation" not in capabilities:
                    continue
            elif options.operationType == OperationType.WEB_RESEARCH:
                if "web_search" not in capabilities:
                    continue
            elif "text_generation" not in capabilities:
                continue
# Prefer models that can handle the content without chunking, but allow chunking if needed
if context_length >= total_size * 0.8: # 80% of content size
if context_length > best_context_length:
best_model = model_info
best_context_length = context_length
elif best_model is None: # Fallback to largest available model
if context_length > best_context_length:
best_model = model_info
best_context_length = context_length
# Fallback to a reasonable default if no model found
if best_model is None:
best_model = {
"contextLength": 128000, # GPT-4o default
"llmName": "gpt-4o"
}
# Calculate appropriate sizes
# Convert tokens to bytes (rough estimate: 1 token ≈ 4 characters)
context_length_bytes = int(best_model["contextLength"] * 4)
max_context_bytes = int(context_length_bytes * 0.9) # 90% of context length
text_chunk_size = int(max_context_bytes * 0.7) # 70% of max context for text chunks
image_chunk_size = int(max_context_bytes * 0.8) # 80% of max context for image chunks
logger.debug(f"Selected model: {best_model.get('llmName', 'unknown')} with context length: {best_model['contextLength']}")
logger.debug(f"Content size: {total_size} bytes, Max context: {max_context_bytes} bytes")
logger.debug(f"Text chunk size: {text_chunk_size} bytes, Image chunk size: {image_chunk_size} bytes")
return {
"maxContextBytes": max_context_bytes,
"textChunkSize": text_chunk_size,
"imageChunkSize": image_chunk_size
}
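    # Worked example (illustrative): a 128000-token model maps to
    # 128000 * 4 = 512000 bytes; maxContextBytes = 460800 (90%),
    # textChunkSize = 322560 (70%), imageChunkSize = 368640 (80%).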
def _getModelsForOperation(self, operation_type: str, options: AiCallOptions) -> List[ModelCapabilities]:
"""
Get models capable of handling the specific operation with capability filtering.
"""
# Use the actual AI objects model selection instead of hardcoded default
if hasattr(self, 'aiObjects') and self.aiObjects:
# Let AiObjects handle the model selection
return []
else:
# Fallback to default model if AiObjects not available
default_model = ModelCapabilities(
name="default",
maxTokens=4000,
capabilities=["text", "reasoning"] if operation_type == "planning" else ["text"],
costPerToken=0.001,
processingTime=1.0,
isAvailable=True
)
return [default_model]
def _buildPromptWithPlaceholders(self, prompt: str, placeholders: Optional[Dict[str, str]]) -> str:
"""
Build full prompt by replacing placeholders with their content.
Uses the new {{KEY:placeholder}} format.
"""
if not placeholders:
return prompt
full_prompt = prompt
for placeholder, content in placeholders.items():
# Replace both old format {{placeholder}} and new format {{KEY:placeholder}}
full_prompt = full_prompt.replace(f"{{{{{placeholder}}}}}", content)
full_prompt = full_prompt.replace(f"{{{{KEY:{placeholder}}}}}", content)
return full_prompt
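    # Example (illustrative):
    #   _buildPromptWithPlaceholders("Plan for {{KEY:goal}}", {"goal": "the beta launch"})
    #   -> "Plan for the beta launch"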
def _writeTraceLog(self, contextText: str, data: Any) -> None:
"""Write raw data to the central trace log file without truncation."""
try:
import os
import json
from datetime import datetime, UTC
            # Only write if debug logging is effectively enabled (handles NOTSET/inherited levels)
            if not logger.isEnabledFor(logging.DEBUG):
                return
# Get log directory from configuration via service center if possible
logDir = None
try:
if self.serviceCenter and hasattr(self.serviceCenter, 'utils'):
logDir = self.serviceCenter.utils.configGet("APP_LOGGING_LOG_DIR", "./")
except Exception:
pass
if not logDir:
logDir = "./"
if not os.path.isabs(logDir):
# Make it relative to gateway directory
gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
logDir = os.path.join(gatewayDir, logDir)
os.makedirs(logDir, exist_ok=True)
traceFile = os.path.join(logDir, "log_trace.log")
timestamp = datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
traceEntry = f"[{timestamp}] {contextText}\n" + ("=" * 80) + "\n"
if data is None:
traceEntry += "No data provided\n"
else:
# Prefer exact text; if dict/list, pretty print JSON
try:
if isinstance(data, (dict, list)):
traceEntry += f"JSON Data:\n{json.dumps(data, indent=2, ensure_ascii=False)}\n"
else:
text = str(data)
traceEntry += f"Text Data:\n{text}\n"
except Exception:
traceEntry += f"Data (fallback): {str(data)}\n"
traceEntry += ("=" * 80) + "\n\n"
with open(traceFile, "a", encoding="utf-8") as f:
f.write(traceEntry)
except Exception:
# Swallow to avoid recursive logging issues
pass
def _exceedsTokenLimit(self, text: str, model: ModelCapabilities, safety_margin: float) -> bool:
"""
Check if text exceeds model token limit with safety margin.
"""
# Simple character-based estimation (4 chars per token)
estimated_tokens = len(text) // 4
max_tokens = int(model.maxTokens * (1 - safety_margin))
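        # Example: maxTokens=4000, safety_margin=0.1 -> limit of 3600 tokens,
        # i.e. roughly 14400 characters at 4 chars/token.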
return estimated_tokens > max_tokens
def _reducePlanningPrompt(
self,
full_prompt: str,
placeholders: Optional[Dict[str, str]],
model: ModelCapabilities,
options: AiCallOptions
) -> str:
"""
Reduce planning prompt size by summarizing placeholders while preserving prompt structure.
"""
if not placeholders:
return self._reduceText(full_prompt, 0.7)
# Reduce placeholders while preserving prompt
reduced_placeholders = {}
for placeholder, content in placeholders.items():
if len(content) > 1000: # Only reduce long content
reduction_factor = 0.7
reduced_content = self._reduceText(content, reduction_factor)
reduced_placeholders[placeholder] = reduced_content
else:
reduced_placeholders[placeholder] = content
return self._buildPromptWithPlaceholders(full_prompt, reduced_placeholders)
def _reduceTextPrompt(
self,
prompt: str,
context: str,
model: ModelCapabilities,
options: AiCallOptions
) -> str:
"""
Reduce text prompt size using typeGroup-aware chunking and merging.
"""
max_size = int(model.maxTokens * (1 - options.safetyMargin))
if options.compressPrompt:
# Reduce both prompt and context
target_size = max_size
current_size = len(prompt) + len(context)
reduction_factor = (target_size * 0.7) / current_size
if reduction_factor < 1.0:
prompt = self._reduceText(prompt, reduction_factor)
context = self._reduceText(context, reduction_factor)
else:
# Only reduce context, preserve prompt integrity
max_context_size = max_size - len(prompt)
if len(context) > max_context_size:
reduction_factor = max_context_size / len(context)
context = self._reduceText(context, reduction_factor)
return prompt + "\n\n" + context if context else prompt
def _extractTextFromContentParts(self, extracted_content) -> str:
"""
Extract text content from ExtractionService ContentPart objects.
"""
if not extracted_content or not hasattr(extracted_content, 'parts'):
return ""
text_parts = []
for part in extracted_content.parts:
if hasattr(part, 'typeGroup') and part.typeGroup in ['text', 'table', 'structure']:
if hasattr(part, 'data') and part.data:
text_parts.append(part.data)
return "\n\n".join(text_parts)
def _reduceText(self, text: str, reduction_factor: float) -> str:
"""
Reduce text size by the specified factor.
"""
if reduction_factor >= 1.0:
return text
target_length = int(len(text) * reduction_factor)
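        # Example: 1000 chars with reduction_factor=0.5 -> first 500 chars plus marker.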
return text[:target_length] + "... [reduced]"
async def _callAiWithDocumentGeneration(
self,
prompt: str,
documents: Optional[List[ChatDocument]],
options: AiCallOptions,
outputFormat: str,
title: Optional[str]
) -> Dict[str, Any]:
"""
Handle AI calls with document generation in specific output format.
Args:
prompt: The main prompt for the AI call
documents: Optional list of documents to process
options: AI call configuration options
outputFormat: Target output format (html, pdf, docx, txt, md, json, csv, xlsx)
title: Optional title for generated documents
Returns:
Dict with generated documents and metadata
"""
try:
# Get format-specific extraction prompt from generation service
from modules.services.serviceGeneration.mainServiceGeneration import GenerationService
generation_service = GenerationService(self.serviceCenter)
# Use default title if not provided
if not title:
title = "AI Generated Document"
# Get format-specific extraction prompt
extraction_prompt = generation_service.getExtractionPrompt(
output_format=outputFormat,
user_prompt=prompt,
title=title
)
# Process documents with format-specific prompt
ai_response = await self._callAiText(extraction_prompt, documents, options)
if not ai_response or ai_response.strip() == "":
raise Exception("AI content generation failed")
# Render the content to the specified format
rendered_content, mime_type = await generation_service.renderReport(
extracted_content=ai_response,
output_format=outputFormat,
title=title
)
# Generate meaningful filename
from datetime import datetime, UTC
timestamp = datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
filename = f"{title.replace(' ', '_')}_{timestamp}.{outputFormat}"
# Return structured result with document information
return {
"success": True,
"content": ai_response, # Raw AI response
"rendered_content": rendered_content, # Formatted content
"mime_type": mime_type,
"filename": filename,
"format": outputFormat,
"title": title,
"documents": [{
"documentName": filename,
"documentData": rendered_content,
"mimeType": mime_type
}]
}
except Exception as e:
logger.error(f"Error in document generation: {str(e)}")
return {
"success": False,
"error": str(e),
"content": "",
"rendered_content": "",
"mime_type": "text/plain",
"filename": f"error_{outputFormat}",
"format": outputFormat,
"title": title or "Error",
"documents": []
}
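    # Usage sketch (hedged; assumes GenerationService supports the target format):
    #   result = await service.callAi(
    #       prompt="Summarize the attached reports",
    #       documents=docs,                # hypothetical List[ChatDocument]
    #       outputFormat="pdf",
    #       title="Quarterly Summary",
    #   )
    #   if result["success"]:
    #       payload, name = result["rendered_content"], result["filename"]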