# gateway/modules/services/serviceAi/subCoreAi.py

# 716 lines
# 31 KiB
# Python

import json
import logging
from typing import Dict, Any, List, Optional, Tuple, Union
from modules.datamodels.datamodelChat import PromptPlaceholder, ChatDocument
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, ModelCapabilities, OperationType, Priority
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Loop instruction texts, keyed by format name.
# SubCoreAi._callAiWithLooping looks up the entry matching its loopInstructionFormat
# argument and substitutes it for the literal LOOP_INSTRUCTION marker in the prompt.
# NOTE(review): nothing visible in this file replaces the <TOKEN_LIMIT> marker inside the
# "json" text, so it appears to reach the model verbatim - confirm whether that is intended.
LoopInstructionTexts = {
"json": """
CRITICAL LIMITS: <TOKEN_LIMIT> tokens total (reserve 20% for JSON structure)
MANDATORY RULES:
1. STOP at approximately 80% of limit to ensure valid JSON completion
2. Return ONLY raw JSON (no ```json blocks, no text before/after)
CONTINUATION REQUIREMENTS:
Refer to the json object below where to set the "continuation" information:
- If you can complete the full request: {"continuation": null}
- If you must stop early: {
"continuation": {
"last_data_items": "delivered last data for context (copy them)",
"next_instruction": "instruction for next data to deliver"
}
}
BE CONSERVATIVE: Stop generating content when you reach approximately 3200-3500 characters to ensure JSON completion.
""",
# Add more formats here as needed (each key becomes a valid loopInstructionFormat value)
# "xml": "...",
# "text": "...",
}
class SubCoreAi:
    """Core AI operations including image analysis, text generation, and planning calls."""

    def __init__(self, services, aiObjects):
        """Store the collaborators used by every AI call of this instance.

        Args:
            services: Service center instance for accessing other services
            aiObjects: Initialized AiObjects instance
        """
        self.services, self.aiObjects = services, aiObjects
# Shared Core Function for AI Calls with Looping
async def _callAiWithLooping(
    self,
    prompt: str,
    options: AiCallOptions,
    debugPrefix: str = "ai_call",
    loopInstructionFormat: Optional[str] = None
) -> str:
    """
    Shared core function for AI calls with looping system.

    Sends the prompt to the AI and, for a supported loopInstructionFormat, keeps
    issuing follow-up calls while the JSON response carries a non-null
    "continuation" attribute. Every prompt actually sent, every raw response and
    the final result are written to debug files, and stats are stored on the
    current workflow for each iteration.

    Args:
        prompt: The prompt to send to AI. A literal LOOP_INSTRUCTION marker is
            replaced by the loop instruction text on the first call and by the
            continuation context plus the loop instruction text on follow-ups.
            When the marker is absent, follow-up calls get that block appended
            so the model never receives the identical prompt twice.
        options: AI call configuration options
        debugPrefix: Prefix for debug file names
        loopInstructionFormat: Key into LoopInstructionTexts. When falsy or
            unknown, nothing is injected and the first non-empty response is final.

    Returns:
        Complete AI response after all iterations: the single response, or the
        merged JSON of all collected responses. Empty string when nothing was
        collected. Errors during an iteration are logged and end the loop; they
        are not re-raised.
    """
    max_iterations = 100  # Prevent infinite loops
    placeholder = "LOOP_INSTRUCTION"
    iteration = 0
    accumulatedContent: List[str] = []
    logger.debug(f"Starting AI call with looping (debug prefix: {debugPrefix}, loopInstructionFormat: {loopInstructionFormat is not None})")

    # Determine loopInstruction based on loopInstructionFormat (before iterations)
    if not loopInstructionFormat:
        loopInstruction = ""
    elif loopInstructionFormat in LoopInstructionTexts:
        loopInstruction = LoopInstructionTexts[loopInstructionFormat]
    else:
        logger.error(f"Unsupported loopInstructionFormat for prompt: {loopInstructionFormat}")
        loopInstruction = ""

    while iteration < max_iterations:
        iteration += 1
        logger.debug(f"AI call iteration {iteration}/{max_iterations}")

        # Build iteration prompt
        if iteration == 1 or not loopInstruction:
            # str.replace is a no-op when the marker is absent
            iterationPrompt = prompt.replace(placeholder, loopInstruction)
        else:
            continuationContent = self._buildContinuationContent(accumulatedContent, iteration)
            continuationBlock = f"{continuationContent}\n\n{loopInstruction}"
            if placeholder in prompt:
                iterationPrompt = prompt.replace(placeholder, continuationBlock)
            else:
                # Without the marker the continuation context used to be dropped,
                # which re-sent the identical prompt on every follow-up iteration.
                iterationPrompt = f"{prompt}\n\n{continuationBlock}"

        # Make AI call
        try:
            request = AiCallRequest(
                prompt=iterationPrompt,
                context="",
                options=options
            )
            # Write the ACTUAL prompt sent to AI (including continuation context)
            self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt_iteration_{iteration}")
            response = await self.aiObjects.call(request)
            result = response.content
            # Write raw AI response to debug file
            self.services.utils.writeDebugFile(result, f"{debugPrefix}_response_iteration_{iteration}")
            # Emit stats for this iteration
            self.services.workflow.storeWorkflowStat(
                self.services.currentWorkflow,
                response,
                f"ai.call.{debugPrefix}.iteration_{iteration}"
            )
            if not result or not result.strip():
                logger.warning(f"Iteration {iteration}: Empty response, stopping")
                break

            # Continuation detection only applies to supported formats
            if loopInstructionFormat not in LoopInstructionTexts:
                accumulatedContent.append(result)
                logger.debug(f"Iteration {iteration}: Final response received")
                break

            try:
                # Extract JSON substring if wrapped (e.g., ```json ... ```)
                extracted = self.services.utils.jsonExtractString(result)
                parsed_result = json.loads(extracted)
            except (json.JSONDecodeError, TypeError):
                # Not JSON (or the extractor returned a non-string): keep the
                # response and treat it as final instead of discarding it
                accumulatedContent.append(result)
                logger.warning(f"Iteration {iteration}: Non-JSON response received")
                self.services.utils.writeDebugFile(result, f"{debugPrefix}_error_non_json_response_iteration_{iteration}")
                break

            accumulatedContent.append(result)
            if isinstance(parsed_result, dict) and parsed_result.get("continuation") is not None:
                logger.debug(f"Iteration {iteration}: Continuation detected in JSON, continuing...")
                continue
            # continuation is null or missing
            logger.debug(f"Iteration {iteration}: Final response received")
            break
        except Exception as e:
            # Best effort: keep whatever was collected so far
            logger.error(f"Error in AI call iteration {iteration}: {str(e)}", exc_info=True)
            break
    else:
        # Reached only when the loop ran out of iterations; every other exit is a break
        logger.warning(f"AI call stopped after maximum iterations ({max_iterations})")

    # Intelligently merge JSON content from all iterations
    final_result = self._mergeJsonContent(accumulatedContent) if accumulatedContent else ""
    # Write final result to debug file
    self.services.utils.writeDebugFile(final_result, f"{debugPrefix}_final_result")
    logger.info(f"AI call completed: {len(accumulatedContent)} parts from {iteration} iterations")
    return final_result
def _buildContinuationContent(
self,
accumulatedContent: List[str],
iteration: int
) -> str:
"""
Build continuation content for follow-up iterations.
"""
# Extract continuation description from the last response
continuation_description = ""
if accumulatedContent:
try:
last_response = accumulatedContent[-1]
# Use the same JSON extraction logic as the main loop
extracted = self.services.utils.jsonExtractString(last_response)
parsed_response = json.loads(extracted)
if isinstance(parsed_response, dict):
# Check for continuation at root level or in metadata
continuation = parsed_response.get("continuation")
if continuation is None and "metadata" in parsed_response:
continuation = parsed_response["metadata"].get("continuation")
if continuation:
continuation_description = continuation
except (json.JSONDecodeError, KeyError, ValueError):
pass
# Extract specific attributes from continuation object
last_data_items = ""
next_instruction = ""
if continuation_description:
try:
if isinstance(continuation_description, str):
continuation_obj = json.loads(continuation_description)
else:
continuation_obj = continuation_description
if isinstance(continuation_obj, dict):
last_data_items = continuation_obj.get("last_data_items", "")
next_instruction = continuation_obj.get("next_instruction", "")
except (json.JSONDecodeError, TypeError):
pass
continuation_content = f"""CONTINUATION REQUEST (Iteration {iteration}):
You are continuing a previous response. DO NOT repeat any previous content.
{f"Already delivered data: {last_data_items}" if last_data_items else "No previous data specified"}
{f"Your task to deliver: {next_instruction}" if next_instruction else "No specific task provided"}
CRITICAL REQUIREMENTS:
- Start from the exact point specified in continuation instructions
- DO NOT repeat any previous content
- BE CONSERVATIVE: Stop at approximately 3200-3500 characters to ensure JSON completion
- ALWAYS include continuation field - set to null if complete, or provide next instruction if incomplete
"""
return continuation_content
def _mergeJsonContent(self, accumulatedContent: List[str]) -> str:
"""
Generic JSON merger that combines all lists from multiple iterations.
Structure: root attributes + 1..n lists that get merged together.
"""
if not accumulatedContent:
return ""
if len(accumulatedContent) == 1:
return accumulatedContent[0]
try:
# Parse all JSON responses
parsed_responses = []
for content in accumulatedContent:
try:
extracted = self.services.utils.jsonExtractString(content)
parsed = json.loads(extracted)
parsed_responses.append(parsed)
except json.JSONDecodeError as e:
logger.warning(f"Failed to parse JSON content: {str(e)}")
continue
if not parsed_responses:
return accumulatedContent[0] # Return first response if all parsing failed
# Start with first response as base
merged = parsed_responses[0].copy()
# Merge all lists from all responses
for response in parsed_responses[1:]:
for key, value in response.items():
if isinstance(value, list) and key in merged and isinstance(merged[key], list):
# Merge lists by extending
merged[key].extend(value)
elif key not in merged:
# Add new fields
merged[key] = value
# Mark as complete
merged["continuation"] = None
return json.dumps(merged, indent=2)
except Exception as e:
logger.error(f"Error merging JSON content: {str(e)}")
return accumulatedContent[0] # Return first response on error
def _buildPromptWithPlaceholders(self, prompt: str, placeholders: Optional[Dict[str, str]]) -> str:
"""
Build full prompt by replacing placeholders with their content.
Uses the new {{KEY:placeholder}} format.
"""
if not placeholders:
return prompt
full_prompt = prompt
for placeholder, content in placeholders.items():
# Replace both old format {{placeholder}} and new format {{KEY:placeholder}}
full_prompt = full_prompt.replace(f"{{{{{placeholder}}}}}", content)
full_prompt = full_prompt.replace(f"{{{{KEY:{placeholder}}}}}", content)
return full_prompt
async def _buildGenerationPrompt(
    self,
    prompt: str,
    extracted_content: Optional[str],
    outputFormat: str,
    title: str
) -> str:
    """
    Build generation prompt for document generation.

    Delegates to buildGenerationPrompt from the generation service and, when
    extracted document content is available, puts that content in front of the
    generated prompt.

    Args:
        prompt: The user prompt handed to the prompt builder
        extracted_content: Text extracted from documents, or None/empty for none
        outputFormat: Target output format handed to the prompt builder
        title: Document title handed to the prompt builder

    Returns:
        The generation prompt, optionally prefixed with the extracted content
    """
    from modules.services.serviceGeneration.subPromptBuilder import buildGenerationPrompt

    # Build the generation prompt using the existing system
    base_prompt = await buildGenerationPrompt(
        outputFormat=outputFormat,
        userPrompt=prompt,
        title=title
    )
    if not extracted_content:
        return base_prompt
    # Extracted content goes first, the generation prompt follows
    return f"EXTRACTED CONTENT FROM DOCUMENTS:\n{extracted_content}\n{base_prompt}"
# Planning AI Call
async def callAiPlanning(
    self,
    prompt: str,
    placeholders: Optional[List[PromptPlaceholder]] = None,
    options: Optional[AiCallOptions] = None,
    loopInstructionFormat: Optional[str] = None
) -> str:
    """
    Planning AI call for task planning, action planning, action selection, etc.

    Args:
        prompt: The planning prompt
        placeholders: Optional list of placeholder replacements (label -> content)
        options: AI call configuration options; defaults to AiCallOptions()
        loopInstructionFormat: Passed through to the shared looping call

    Returns:
        Planning JSON response
    """
    effectiveOptions = AiCallOptions() if options is None else options

    # Fill in placeholders only when some were supplied
    full_prompt = prompt
    if placeholders:
        contentByLabel = {entry.label: entry.content for entry in placeholders}
        full_prompt = self._buildPromptWithPlaceholders(prompt, contentByLabel)

    # Use shared core function with planning-specific debug prefix
    return await self._callAiWithLooping(
        full_prompt,
        effectiveOptions,
        "planning",
        loopInstructionFormat=loopInstructionFormat
    )
# Document Generation AI Call
async def callAiDocuments(
    self,
    prompt: str,
    documents: Optional[List[ChatDocument]] = None,
    options: Optional[AiCallOptions] = None,
    outputFormat: Optional[str] = None,
    title: Optional[str] = None,
    loopInstructionFormat: Optional[str] = None
) -> Union[str, Dict[str, Any]]:
    """
    Document generation AI call for all non-planning calls.
    Uses the current unified path with extraction and generation.

    With an outputFormat, document content is extracted first (when documents
    are given), a generation prompt is built, the AI response is parsed as JSON
    and rendered to the requested format. Without an outputFormat the call is a
    plain text call, routed through the document processor when documents are
    given and through the shared looping call otherwise.

    Args:
        prompt: The main prompt for the AI call
        documents: Optional list of documents to process
        options: AI call configuration options; defaults to AiCallOptions()
        outputFormat: Optional output format for document generation
        title: Optional title for generated documents
        loopInstructionFormat: Passed to the looping call of the generation
            path only; text calls always run without loop instructions

    Returns:
        AI response as string, or dict with documents if outputFormat is
        specified. Generation failures are reported as
        {"success": False, "error": ...} instead of being raised.
    """
    if options is None:
        options = AiCallOptions()

    # Handle document generation with specific output format using unified approach
    if outputFormat:
        if documents:
            logger.info(f"Extracting content from {len(documents)} documents")
            extracted_content = await self.services.ai.documentProcessor.callAiText(prompt, documents, options)
        else:
            logger.info("No documents provided - using direct generation")
            extracted_content = None
        generation_prompt = await self._buildGenerationPrompt(prompt, extracted_content, outputFormat, title)
        generated_json = await self._callAiWithLooping(generation_prompt, options, "document_generation", loopInstructionFormat=loopInstructionFormat)

        # Parse the generated JSON (extract fenced/embedded JSON first)
        try:
            extracted_json = self.services.utils.jsonExtractString(generated_json)
            generated_data = json.loads(extracted_json)
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse generated JSON: {str(e)}")
            logger.error(f"JSON content length: {len(generated_json)}")
            logger.error(f"JSON content preview (last 200 chars): ...{generated_json[-200:]}")
            # e.pos indexes e.doc (the text handed to json.loads), which can differ
            # from generated_json once fenced/embedded JSON has been extracted
            logger.error(f"JSON content around error position: {e.doc[max(0, e.pos-50):e.pos+50]}")
            # Write the problematic JSON to debug file
            self.services.utils.writeDebugFile(generated_json, "failed_json_parsing")
            return {"success": False, "error": f"Generated content is not valid JSON: {str(e)}"}

        # Render to final format using the existing renderer
        try:
            from modules.services.serviceGeneration.mainServiceGeneration import GenerationService
            generationService = GenerationService(self.services)
            rendered_content, mime_type = await generationService.renderReport(
                generated_data, outputFormat, title or "Generated Document", prompt, self
            )
            # Build result in the expected format
            result = {
                "success": True,
                "content": generated_data,
                "documents": [{
                    "documentName": f"generated.{outputFormat}",
                    "documentData": rendered_content,
                    "mimeType": mime_type,
                    "title": title or "Generated Document"
                }],
                "is_multi_file": False,
                "format": outputFormat,
                "title": title,
                "split_strategy": "single",
                "total_documents": 1,
                "processed_documents": 1
            }
            # Log AI response for debugging
            self.services.utils.writeDebugFile(str(result), "documentGenerationResponse", documents)
            return result
        except Exception as e:
            logger.error(f"Error rendering document: {str(e)}")
            return {"success": False, "error": f"Rendering failed: {str(e)}"}

    # Handle text calls (no output format specified)
    if documents:
        # Use document processing for text calls with documents
        result = await self.services.ai.documentProcessor.callAiText(prompt, documents, options)
    else:
        # Use shared core function for direct text calls
        result = await self._callAiWithLooping(prompt, options, "text", loopInstructionFormat=None)
    return result
# AI Image Analysis
async def readImage(
    self,
    prompt: str,
    imageData: Union[str, bytes],
    mimeType: Optional[str] = None,
    options: Optional[AiCallOptions] = None,
) -> str:
    """Call AI for image analysis using interface.callImage().

    The operation type is always forced to IMAGE_ANALYSIS. Options passed by
    the caller are copied before the override so the caller's object is left
    untouched.

    Args:
        prompt: Instruction describing what to read from the image
        imageData: Image content as string or bytes
        mimeType: MIME type forwarded to callImage
        options: AI call configuration options; a default is created when None

    Returns:
        The analysis text, or a string starting with "Error: " when no image
        data was given, the AI returned nothing, or the call raised.
    """
    import copy  # local import: copy is not imported at file level

    try:
        # Check if imageData is valid
        if not imageData:
            error_msg = "No image data provided"
            self.services.utils.debugLogToFile(f"Error in AI image analysis: {error_msg}", "AI_SERVICE")
            logger.error(f"Error in AI image analysis: {error_msg}")
            return f"Error: {error_msg}"

        calledMsg = f"readImage called with prompt, imageData type: {type(imageData)}, length: {len(imageData)}, mimeType: {mimeType}"
        self.services.utils.debugLogToFile(calledMsg, "AI_SERVICE")
        logger.info(calledMsg)

        # Always use IMAGE_ANALYSIS operation type for image processing
        if options is None:
            options = AiCallOptions(operationType=OperationType.IMAGE_ANALYSIS)
        else:
            # Override on a copy so the caller's options keep their operation type
            options = copy.copy(options)
            options.operationType = OperationType.IMAGE_ANALYSIS

        callingMsg = f"Calling aiObjects.callImage with operationType: {options.operationType}"
        self.services.utils.debugLogToFile(callingMsg, "AI_SERVICE")
        logger.info(callingMsg)
        response = await self.aiObjects.callImage(prompt, imageData, mimeType, options)

        # Emit stats for image analysis
        self.services.workflow.storeWorkflowStat(
            self.services.currentWorkflow,
            response,
            f"ai.image.{options.operationType}"
        )
        # Debug the result
        self.services.utils.debugLogToFile(f"Raw AI result type: {type(response)}, value: {repr(response)}", "AI_SERVICE")

        # Extract content from response
        result = response.content if hasattr(response, 'content') else str(response)

        # Check if result is valid
        if not result or (isinstance(result, str) and not result.strip()):
            error_msg = f"No response from AI image analysis (result: {repr(result)})"
            self.services.utils.debugLogToFile(f"Error in AI image analysis: {error_msg}", "AI_SERVICE")
            logger.error(f"Error in AI image analysis: {error_msg}")
            return f"Error: {error_msg}"

        # Build the preview first: the conditional expression used to wrap the whole
        # message, so short results were logged without the "callImage returned" prefix
        resultText = str(result)
        preview = f"{resultText[:200]}..." if len(resultText) > 200 else resultText
        returnedMsg = f"callImage returned: {preview}"
        self.services.utils.debugLogToFile(returnedMsg, "AI_SERVICE")
        logger.info(returnedMsg)
        return result
    except Exception as e:
        self.services.utils.debugLogToFile(f"Error in AI image analysis: {str(e)}", "AI_SERVICE")
        logger.error(f"Error in AI image analysis: {str(e)}")
        return f"Error: {str(e)}"
# AI Image Generation
async def generateImage(
    self,
    prompt: str,
    size: str = "1024x1024",
    quality: str = "standard",
    style: str = "vivid",
    options: Optional[AiCallOptions] = None,
) -> Dict[str, Any]:
    """Generate an image using AI using interface.generateImage().

    Args:
        prompt: Description of the image to generate
        size: Size argument forwarded to aiObjects.generateImage
        quality: Quality argument forwarded to aiObjects.generateImage
        style: Style argument forwarded to aiObjects.generateImage
        options: AI call configuration options, forwarded unchanged

    Returns:
        A dict with success/content/modelName/priceUsd/processingTime when the
        response has a content attribute, the raw response otherwise, or
        {"success": False, "error": ...} when the call raised.
    """
    try:
        response = await self.aiObjects.generateImage(prompt, size, quality, style, options)

        # Emit stats for image generation
        self.services.workflow.storeWorkflowStat(
            self.services.currentWorkflow,
            response,
            "ai.generate.image"
        )

        # Responses without a content attribute are handed back untouched
        if not hasattr(response, 'content'):
            return response

        # Convert response to dict format for backward compatibility
        return {
            "success": True,
            "content": response.content,
            "modelName": response.modelName,
            "priceUsd": response.priceUsd,
            "processingTime": response.processingTime
        }
    except Exception as e:
        logger.error(f"Error in AI image generation: {str(e)}")
        return {"success": False, "error": str(e)}
def _determineCallType(self, documents: Optional[List[ChatDocument]], operation_type: str) -> str:
    """
    Determine call type based on documents and operation type.

    Args:
        documents: Documents attached to the call, if any
        operation_type: Operation type, compared against OperationType.GENERATE_PLAN

    Returns:
        "planning" when there are no documents and the operation is plan
        generation, "text" in every other case
    """
    wants_plan = operation_type == OperationType.GENERATE_PLAN
    if documents is None or len(documents) == 0:
        return "planning" if wants_plan else "text"
    return "text"
def _getModelCapabilitiesForContent(self, prompt: str, documents: Optional[List[ChatDocument]], options: AiCallOptions) -> Dict[str, int]:
    """
    Get model capabilities for content processing, including appropriate size limits for chunking.

    Picks the eligible model with the largest context length from aiModels and
    derives byte budgets from it. The largest eligible model is both the
    preferred choice when the content fits and the fallback when nothing fits,
    so one maximum search covers both cases (the earlier fallback kept the
    first eligible model instead of the largest).

    Args:
        prompt: Prompt text; its UTF-8 size is part of the logged content size
        documents: Optional documents; their fileSize values are added to the content size
        options: AI call options; operationType drives the capability filter

    Returns:
        Dict with "maxContextBytes", "textChunkSize" and "imageChunkSize"
    """
    # Estimate total content size (used for diagnostics only)
    prompt_size = len(prompt.encode('utf-8'))
    # Rough estimate of document content size
    document_size = sum(doc.fileSize or 0 for doc in documents) if documents else 0
    total_size = prompt_size + document_size

    from modules.interfaces.interfaceAiObjects import aiModels

    best_model = None
    best_context_length = 0
    for model_info in aiModels.values():
        context_length = model_info.get("contextLength", 0)
        # Skip models with no context length
        if context_length == 0:
            continue
        # Check if model supports the operation type
        # NOTE(review): as written, a model that passes its operation-specific check
        # still falls through to the text_generation test, so every selectable model
        # must list "text_generation" - confirm this is intended for image/web models.
        capabilities = model_info.get("capabilities", [])
        if options.operationType == OperationType.IMAGE_ANALYSIS and "image_analysis" not in capabilities:
            continue
        elif options.operationType == OperationType.IMAGE_GENERATION and "image_generation" not in capabilities:
            continue
        elif options.operationType == OperationType.WEB_RESEARCH and "web_search" not in capabilities:
            continue
        elif "text_generation" not in capabilities:
            continue
        # Keep the largest context; the first model wins on ties
        if context_length > best_context_length:
            best_model = model_info
            best_context_length = context_length

    # Fallback to a reasonable default if no model found
    if best_model is None:
        best_model = {
            "contextLength": 128000,  # GPT-4o default
            "llmName": "gpt-4o"
        }

    # Calculate appropriate sizes
    # Convert tokens to bytes (rough estimate: 1 token ≈ 4 characters)
    context_length_bytes = int(best_model["contextLength"] * 4)
    max_context_bytes = int(context_length_bytes * 0.9)  # 90% of context length
    text_chunk_size = int(max_context_bytes * 0.7)  # 70% of max context for text chunks
    image_chunk_size = int(max_context_bytes * 0.8)  # 80% of max context for image chunks
    logger.debug(f"Selected model: {best_model.get('llmName', 'unknown')} with context length: {best_model['contextLength']}")
    logger.debug(f"Content size: {total_size} bytes, Max context: {max_context_bytes} bytes")
    logger.debug(f"Text chunk size: {text_chunk_size} bytes, Image chunk size: {image_chunk_size} bytes")
    return {
        "maxContextBytes": max_context_bytes,
        "textChunkSize": text_chunk_size,
        "imageChunkSize": image_chunk_size
    }
def _buildPromptWithPlaceholders(self, prompt: str, placeholders: Optional[Dict[str, str]]) -> str:
"""
Build full prompt by replacing placeholders with their content.
Uses the new {{KEY:placeholder}} format.
"""
if not placeholders:
return prompt
full_prompt = prompt
for placeholder, content in placeholders.items():
# Replace both old format {{placeholder}} and new format {{KEY:placeholder}}
full_prompt = full_prompt.replace(f"{{{{{placeholder}}}}}", content)
full_prompt = full_prompt.replace(f"{{{{KEY:{placeholder}}}}}", content)
return full_prompt
def _reducePlanningPrompt(
    self,
    full_prompt: str,
    placeholders: Optional[Dict[str, str]],
    model: ModelCapabilities,
    options: AiCallOptions
) -> str:
    """
    Reduce planning prompt size by shortening placeholders while preserving prompt structure.

    Args:
        full_prompt: The prompt to reduce
        placeholders: Placeholder contents; entries longer than 1000 characters are shortened
        model: Not used by this method
        options: Not used by this method

    Returns:
        Without placeholders, the prompt itself shortened by factor 0.7;
        otherwise the prompt rebuilt with the shortened placeholder contents
    """
    reduction_factor = 0.7
    if not placeholders:
        return self._reduceText(full_prompt, reduction_factor)

    # Only long content is shortened; short entries pass through untouched
    reduced_placeholders = {
        name: self._reduceText(text, reduction_factor) if len(text) > 1000 else text
        for name, text in placeholders.items()
    }
    return self._buildPromptWithPlaceholders(full_prompt, reduced_placeholders)
def _extractTextFromContentParts(self, extracted_content) -> str:
"""
Extract text content from ExtractionService ContentPart objects.
"""
if not extracted_content or not hasattr(extracted_content, 'parts'):
return ""
text_parts = []
for part in extracted_content.parts:
if hasattr(part, 'typeGroup') and part.typeGroup in ['text', 'table', 'structure']:
if hasattr(part, 'data') and part.data:
text_parts.append(part.data)
return "\n\n".join(text_parts)
def _reduceText(self, text: str, reduction_factor: float) -> str:
"""
Reduce text size by the specified factor.
"""
if reduction_factor >= 1.0:
return text
target_length = int(len(text) * reduction_factor)
return text[:target_length] + "... [reduced]"