gateway/modules/services/serviceAi/subCoreAi.py

679 lines
30 KiB
Python

import json
import logging
from typing import Dict, Any, List, Optional, Tuple, Union
from modules.datamodels.datamodelChat import PromptPlaceholder, ChatDocument
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum, ProcessingModeEnum
from modules.services.serviceAi.subSharedAiUtils import (
buildPromptWithPlaceholders,
extractTextFromContentParts,
reduceText,
determineCallType
)
from modules.shared.jsonUtils import (
extractJsonString,
repairBrokenJson,
extractSectionsFromDocument,
buildContinuationContext
)
logger = logging.getLogger(__name__)
# Repair-based looping system - no longer needs LOOP_INSTRUCTION_TEXT
# Sections are accumulated and repair mechanism handles broken JSON automatically
# Rebuild the model to resolve forward references
AiCallRequest.model_rebuild()
class SubCoreAi:
"""Core AI operations including image analysis, text generation, and planning calls."""
def __init__(self, services, aiObjects):
"""Initialize core AI operations.
Args:
services: Service center instance for accessing other services
aiObjects: Initialized AiObjects instance
"""
self.services = services
self.aiObjects = aiObjects
async def _analyzePromptAndCreateOptions(self, prompt: str) -> AiCallOptions:
"""Analyze prompt to determine appropriate AiCallOptions parameters."""
try:
# Get dynamic enum values from Pydantic models
operation_types = [e.value for e in OperationTypeEnum]
priorities = [e.value for e in PriorityEnum]
processing_modes = [e.value for e in ProcessingModeEnum]
# Create analysis prompt for AI to determine operation type and parameters
analysisPrompt = f"""
You are an AI operation analyzer. Analyze the following prompt and determine the most appropriate operation type and parameters.
PROMPT TO ANALYZE:
{self.services.ai.sanitizePromptContent(prompt, 'userinput')}
Based on the prompt content, determine:
1. operationType: Choose the most appropriate from: {', '.join(operation_types)}
2. priority: Choose from: {', '.join(priorities)}
3. processingMode: Choose from: {', '.join(processing_modes)}
4. compressPrompt: true/false (true for story-like prompts, false for structured prompts with JSON/schemas)
5. compressContext: true/false (true to summarize context, false to process fully)
Respond with ONLY a JSON object in this exact format:
{{
"operationType": "dataAnalyse",
"priority": "balanced",
"processingMode": "basic",
"compressPrompt": true,
"compressContext": true
}}
"""
# Use AI to analyze the prompt
request = AiCallRequest(
prompt=analysisPrompt,
options=AiCallOptions(
operationType=OperationTypeEnum.DATA_ANALYSE,
priority=PriorityEnum.SPEED,
processingMode=ProcessingModeEnum.BASIC,
compressPrompt=True,
compressContext=False
)
)
response = await self.aiObjects.call(request)
# Parse AI response
try:
import json
json_start = response.content.find('{')
json_end = response.content.rfind('}') + 1
if json_start != -1 and json_end > json_start:
analysis = json.loads(response.content[json_start:json_end])
# Map string values to enums
operation_type = OperationTypeEnum(analysis.get('operationType', 'dataAnalyse'))
priority = PriorityEnum(analysis.get('priority', 'balanced'))
processing_mode = ProcessingModeEnum(analysis.get('processingMode', 'basic'))
return AiCallOptions(
operationType=operation_type,
priority=priority,
processingMode=processing_mode,
compressPrompt=analysis.get('compressPrompt', True),
compressContext=analysis.get('compressContext', True)
)
except Exception as e:
logger.warning(f"Failed to parse AI analysis response: {e}")
except Exception as e:
logger.warning(f"Prompt analysis failed: {e}")
# Fallback to default options
return AiCallOptions(
operationType=OperationTypeEnum.DATA_ANALYSE,
priority=PriorityEnum.BALANCED,
processingMode=ProcessingModeEnum.BASIC
)
# Shared Core Function for AI Calls with Looping and Repair
async def _callAiWithLooping(
self,
prompt: str,
options: AiCallOptions,
debugPrefix: str = "ai_call",
promptBuilder: Optional[callable] = None,
promptArgs: Optional[Dict[str, Any]] = None
) -> str:
"""
Shared core function for AI calls with repair-based looping system.
Automatically repairs broken JSON and continues generation seamlessly.
Args:
prompt: The prompt to send to AI
options: AI call configuration options
debugPrefix: Prefix for debug file names
Returns:
Complete AI response after all iterations
"""
max_iterations = 50 # Prevent infinite loops
iteration = 0
allSections = [] # Accumulate all sections across iterations
lastRawResponse = None # Store last raw JSON response for continuation
logger.debug(f"Starting AI call with repair-based looping (debug prefix: {debugPrefix})")
while iteration < max_iterations:
iteration += 1
logger.debug(f"AI call iteration {iteration}/{max_iterations}")
# Build iteration prompt
if len(allSections) > 0 and promptBuilder and promptArgs:
# This is a continuation - build continuation context with raw JSON and rebuild prompt
continuationContext = buildContinuationContext(allSections, lastRawResponse)
logger.info(f"Continuation context: {continuationContext.get('section_count')} sections")
if lastRawResponse:
logger.debug(f"Iteration {iteration}: Including previous response in continuation context ({len(lastRawResponse)} chars)")
else:
logger.warning(f"Iteration {iteration}: No previous response available for continuation!")
# Rebuild prompt with continuation context using the provided prompt builder
iterationPrompt = await promptBuilder(**promptArgs, continuationContext=continuationContext)
logger.debug(f"Rebuilt prompt with continuation context for iteration {iteration}")
else:
# First iteration - use original prompt
iterationPrompt = prompt
# Make AI call
try:
from modules.datamodels.datamodelAi import AiCallRequest
request = AiCallRequest(
prompt=iterationPrompt,
context="",
options=options
)
# Write the ACTUAL prompt sent to AI
if iteration == 1:
self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt")
else:
self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt_iteration_{iteration}")
response = await self.aiObjects.call(request)
result = response.content
# Debug: Check response immediately from API
if iteration == 1 and result:
first_chars = result[:200].replace('\n', '\\n').replace('\r', '\\r')
logger.debug(f"Iteration 1: Raw API response starts with (first 200 chars): '{first_chars}'")
if result.strip().startswith('},') or result.strip().startswith('],'):
logger.error(f"Iteration 1: API returned fragment! Full start: '{result[:200]}'")
# Write raw AI response to debug file
if iteration == 1:
self.services.utils.writeDebugFile(result, f"{debugPrefix}_response")
else:
self.services.utils.writeDebugFile(result, f"{debugPrefix}_response_iteration_{iteration}")
# Emit stats for this iteration
self.services.workflow.storeWorkflowStat(
self.services.currentWorkflow,
response,
f"ai.call.{debugPrefix}.iteration_{iteration}"
)
if not result or not result.strip():
logger.warning(f"Iteration {iteration}: Empty response, stopping")
break
# Store raw response for continuation (even if broken)
lastRawResponse = result
# Check for complete_response flag in raw response (before parsing)
import re
if re.search(r'"complete_response"\s*:\s*true', result, re.IGNORECASE):
logger.info(f"Iteration {iteration}: Detected complete_response flag in raw response")
# Extract sections from response (handles both valid and broken JSON)
extractedSections, wasJsonComplete = self._extractSectionsFromResponse(result, iteration, debugPrefix)
if not extractedSections:
# If we're in continuation mode and JSON was incomplete, don't stop - continue to allow retry
if iteration > 1 and not wasJsonComplete:
logger.warning(f"Iteration {iteration}: No sections extracted from continuation fragment, continuing for another attempt")
continue
# Otherwise, stop if no sections
logger.warning(f"Iteration {iteration}: No sections extracted, stopping")
break
# Add new sections to accumulator
allSections.extend(extractedSections)
logger.info(f"Iteration {iteration}: Extracted {len(extractedSections)} sections (total: {len(allSections)})")
# Check if we should continue (completion detection)
if self._shouldContinueGeneration(allSections, iteration, wasJsonComplete, result):
logger.debug(f"Iteration {iteration}: Continuing generation")
continue
else:
# Done - build final result
logger.info(f"Iteration {iteration}: Generation complete")
break
except Exception as e:
logger.error(f"Error in AI call iteration {iteration}: {str(e)}")
break
if iteration >= max_iterations:
logger.warning(f"AI call stopped after maximum iterations ({max_iterations})")
# Build final result from accumulated sections
final_result = self._buildFinalResultFromSections(allSections)
# Write final result to debug file
self.services.utils.writeDebugFile(final_result, f"{debugPrefix}_final_result")
logger.info(f"AI call completed: {len(allSections)} total sections from {iteration} iterations")
return final_result
def _extractSectionsFromResponse(
self,
result: str,
iteration: int,
debugPrefix: str
) -> Tuple[List[Dict[str, Any]], bool]:
"""
Extract sections from AI response, handling both valid and broken JSON.
Uses repair mechanism for broken JSON.
Checks for "complete_response": true flag to determine completion.
Returns (sections, wasJsonComplete)
"""
# First, try to parse as valid JSON
try:
extracted = extractJsonString(result)
parsed_result = json.loads(extracted)
# Check if AI marked response as complete
isComplete = parsed_result.get("complete_response", False) == True
if isComplete:
logger.info(f"Iteration {iteration}: AI marked response as complete (complete_response: true)")
# Extract sections from parsed JSON
sections = extractSectionsFromDocument(parsed_result)
logger.debug(f"Iteration {iteration}: Valid JSON - extracted {len(sections)} sections")
# If AI marked as complete, always return as complete
if isComplete:
return sections, True
# If in continuation mode (iteration > 1), continuation responses are expected to be fragments
# A fragment with 0 extractable sections means JSON is incomplete - need another iteration
# Don't use repair mechanism - just mark as incomplete so loop continues
if len(sections) == 0 and iteration > 1:
logger.info(f"Iteration {iteration}: Continuation fragment with 0 extractable sections - JSON incomplete, continuing")
return sections, False # Mark as incomplete so loop continues
# First iteration with 0 sections means empty response - stop
if len(sections) == 0:
return sections, True # Complete but empty
return sections, True # JSON was complete with sections
except json.JSONDecodeError as e:
# Broken JSON - try repair mechanism (normal in iterative generation)
logger.info(f"Iteration {iteration}: JSON incomplete/broken, attempting repair: {str(e)}")
self.services.utils.writeDebugFile(result, f"{debugPrefix}_broken_json_iteration_{iteration}")
# Try to repair
repaired_json = repairBrokenJson(result)
if repaired_json:
# Extract sections from repaired JSON
sections = extractSectionsFromDocument(repaired_json)
logger.info(f"Iteration {iteration}: Repaired JSON - extracted {len(sections)} sections")
return sections, False # JSON was broken but repaired
else:
# Repair failed - log error
logger.error(f"Iteration {iteration}: All repair strategies failed")
return [], False
except Exception as e:
logger.error(f"Iteration {iteration}: Unexpected error during parsing: {str(e)}")
return [], False
def _shouldContinueGeneration(
self,
allSections: List[Dict[str, Any]],
iteration: int,
wasJsonComplete: bool,
rawResponse: str = None
) -> bool:
"""
Determine if generation should continue based on JSON completeness and complete_response flag.
Returns True if we should continue, False if done.
"""
if len(allSections) == 0:
return True # No sections yet, continue
# Check for complete_response flag in raw response
if rawResponse:
import re
# Look for complete_response: true pattern (allowing for whitespace variations)
if re.search(r'"complete_response"\s*:\s*true', rawResponse, re.IGNORECASE):
logger.info("AI marked response as complete (complete_response: true) - stopping generation")
return False
# If JSON was complete (and no complete_response flag), we're done
# If JSON was broken and repaired, continue to get more content
if wasJsonComplete:
logger.info("JSON was complete - stopping generation")
return False
else:
logger.info("JSON was broken/repaired - continuing generation")
return True
def _buildFinalResultFromSections(
self,
allSections: List[Dict[str, Any]]
) -> str:
"""
Build final JSON result from accumulated sections.
"""
if not allSections:
return ""
# Build documents structure
# Assuming single document for now
documents = [{
"id": "doc_1",
"title": "Generated Document", # This should come from prompt
"filename": "document.json",
"sections": allSections
}]
result = {
"metadata": {
"split_strategy": "single_document",
"source_documents": [],
"extraction_method": "ai_generation"
},
"documents": documents
}
return json.dumps(result, indent=2)
# Old _buildContinuationPrompt and _mergeJsonContent methods removed
# Now handled by repair mechanism in jsonUtils.py and section accumulation
# Planning AI Call
async def callAiPlanning(
self,
prompt: str,
placeholders: Optional[List[PromptPlaceholder]] = None
) -> str:
"""
Planning AI call for task planning, action planning, action selection, etc.
Always uses static parameters optimized for planning tasks.
Args:
prompt: The planning prompt
placeholders: Optional list of placeholder replacements
Returns:
Planning JSON response
"""
# Planning calls always use static parameters
logger.debug("Using static parameters for planning call")
options = AiCallOptions(
operationType=OperationTypeEnum.PLAN,
priority=PriorityEnum.QUALITY,
processingMode=ProcessingModeEnum.DETAILED,
compressPrompt=False,
compressContext=False
)
# Build full prompt with placeholders
if placeholders:
placeholders_dict = {p.label: p.content for p in placeholders}
full_prompt = buildPromptWithPlaceholders(prompt, placeholders_dict)
else:
full_prompt = prompt
# Use shared core function with planning-specific debug prefix
return await self._callAiWithLooping(full_prompt, options, "plan")
# Document Generation AI Call
async def callAiDocuments(
self,
prompt: str,
documents: Optional[List[ChatDocument]] = None,
options: Optional[AiCallOptions] = None,
outputFormat: Optional[str] = None,
title: Optional[str] = None
) -> Union[str, Dict[str, Any]]:
"""
Document generation AI call for all non-planning calls.
Uses the current unified path with extraction and generation.
Args:
prompt: The main prompt for the AI call
documents: Optional list of documents to process
options: AI call configuration options
outputFormat: Optional output format for document generation
title: Optional title for generated documents
Returns:
AI response as string, or dict with documents if outputFormat is specified
"""
if options is None or (hasattr(options, 'operationType') and options.operationType is None):
# Use AI to determine parameters ONLY when truly needed (options=None OR operationType=None)
logger.debug("Analyzing prompt to determine optimal parameters")
options = await self._analyzePromptAndCreateOptions(prompt)
else:
logger.debug(f"Using provided options: operationType={options.operationType}, priority={options.priority}")
# CRITICAL: For document generation with JSON templates, NEVER compress the prompt
# Compressing would truncate the template structure and confuse the AI
if outputFormat: # Document generation with structured output
if not options:
options = AiCallOptions()
options.compressPrompt = False # JSON templates must NOT be truncated
options.compressContext = False # Context also should not be compressed
logger.debug("Document generation detected - disabled prompt/context compression")
# Handle document generation with specific output format using unified approach
if outputFormat:
# Use unified generation method for all document generation
if documents and len(documents) > 0:
logger.debug(f"Extracting content from {len(documents)} documents")
extracted_content = await self.services.ai.documentProcessor.callAiText(prompt, documents, options)
else:
logger.debug("No documents provided - using direct generation")
extracted_content = None
logger.debug(f"[DEBUG] title value: {title}, type: {type(title)}")
from modules.services.serviceGeneration.subPromptBuilderGeneration import buildGenerationPrompt
# First call without continuation context
generation_prompt = await buildGenerationPrompt(outputFormat, prompt, title, extracted_content, None)
# Prepare prompt builder arguments for continuation
promptArgs = {
"outputFormat": outputFormat,
"userPrompt": prompt,
"title": title,
"extracted_content": extracted_content
}
generated_json = await self._callAiWithLooping(
generation_prompt,
options,
"document_generation",
buildGenerationPrompt,
promptArgs
)
# Parse the generated JSON (extract fenced/embedded JSON first)
try:
extracted_json = self.services.utils.jsonExtractString(generated_json)
generated_data = json.loads(extracted_json)
except json.JSONDecodeError as e:
logger.error(f"Failed to parse generated JSON: {str(e)}")
logger.error(f"JSON content length: {len(generated_json)}")
logger.error(f"JSON content preview (last 200 chars): ...{generated_json[-200:]}")
logger.error(f"JSON content around error position: {generated_json[max(0, e.pos-50):e.pos+50]}")
# Write the problematic JSON to debug file
self.services.utils.writeDebugFile(generated_json, "failed_json_parsing")
return {"success": False, "error": f"Generated content is not valid JSON: {str(e)}"}
# Render to final format using the existing renderer
try:
from modules.services.serviceGeneration.mainServiceGeneration import GenerationService
generationService = GenerationService(self.services)
rendered_content, mime_type = await generationService.renderReport(
generated_data, outputFormat, title or "Generated Document", prompt, self
)
# Build result in the expected format
result = {
"success": True,
"content": generated_data,
"documents": [{
"documentName": f"generated.{outputFormat}",
"documentData": rendered_content,
"mimeType": mime_type,
"title": title or "Generated Document"
}],
"is_multi_file": False,
"format": outputFormat,
"title": title,
"split_strategy": "single",
"total_documents": 1,
"processed_documents": 1
}
# Log AI response for debugging
self.services.utils.writeDebugFile(str(result), "document_generation_response", documents)
return result
except Exception as e:
logger.error(f"Error rendering document: {str(e)}")
return {"success": False, "error": f"Rendering failed: {str(e)}"}
# Handle text calls (no output format specified)
if documents:
# Use document processing for text calls with documents
result = await self.services.ai.documentProcessor.callAiText(prompt, documents, options)
else:
# Use shared core function for direct text calls
result = await self._callAiWithLooping(prompt, options, "text")
return result
# AI Image Analysis
async def readImage(
self,
prompt: str,
imageData: Union[str, bytes],
mimeType: str = None,
options: Optional[AiCallOptions] = None,
) -> str:
"""Call AI for image analysis using interface.call() with contentParts."""
try:
# Check if imageData is valid
if not imageData:
error_msg = "No image data provided"
self.services.utils.debugLogToFile(f"Error in AI image analysis: {error_msg}", "AI_SERVICE")
logger.error(f"Error in AI image analysis: {error_msg}")
return f"Error: {error_msg}"
self.services.utils.debugLogToFile(f"readImage called with prompt, imageData type: {type(imageData)}, length: {len(imageData) if imageData else 0}, mimeType: {mimeType}", "AI_SERVICE")
logger.info(f"readImage called with prompt, imageData type: {type(imageData)}, length: {len(imageData) if imageData else 0}, mimeType: {mimeType}")
# Always use IMAGE_ANALYSE operation type for image processing
if options is None:
options = AiCallOptions(operationType=OperationTypeEnum.IMAGE_ANALYSE)
else:
# Override the operation type to ensure image analysis
options.operationType = OperationTypeEnum.IMAGE_ANALYSE
# Create content parts with image data
from modules.datamodels.datamodelExtraction import ContentPart
import base64
# ContentPart.data must be a string - convert bytes to base64 if needed
if isinstance(imageData, bytes):
imageDataStr = base64.b64encode(imageData).decode('utf-8')
else:
# Already a base64 string
imageDataStr = imageData
imagePart = ContentPart(
id="image_0",
parentId=None,
label="Image",
typeGroup="image",
mimeType=mimeType or "image/jpeg",
data=imageDataStr, # Must be a string (base64 encoded)
metadata={"imageAnalysis": True}
)
# Create request with content parts
from modules.datamodels.datamodelAi import AiCallRequest
request = AiCallRequest(
prompt=prompt,
context="",
options=options,
contentParts=[imagePart]
)
self.services.utils.debugLogToFile(f"Calling aiObjects.call() with operationType: {options.operationType}", "AI_SERVICE")
logger.info(f"Calling aiObjects.call() with operationType: {options.operationType}")
# Write image analysis prompt to debug file
self.services.utils.writeDebugFile(prompt, "image_analysis_prompt")
response = await self.aiObjects.call(request)
# Write image analysis response to debug file
# response is an AiCallResponse object
result = response.content
self.services.utils.writeDebugFile(result, "image_analysis_response")
# Debug the result
self.services.utils.debugLogToFile(f"AI image analysis result type: {type(response)}, content length: {len(result)}", "AI_SERVICE")
# Check if result is valid
if not result or (isinstance(result, str) and not result.strip()):
error_msg = f"No response from AI image analysis (result: {repr(result)})"
self.services.utils.debugLogToFile(f"Error in AI image analysis: {error_msg}", "AI_SERVICE")
logger.error(f"Error in AI image analysis: {error_msg}")
return f"Error: {error_msg}"
self.services.utils.debugLogToFile(f"callImage returned: {result[:200]}..." if len(result) > 200 else result, "AI_SERVICE")
logger.info(f"callImage returned: {result[:200]}..." if len(result) > 200 else result)
return result
except Exception as e:
self.services.utils.debugLogToFile(f"Error in AI image analysis: {str(e)}", "AI_SERVICE")
logger.error(f"Error in AI image analysis: {str(e)}")
return f"Error: {str(e)}"
# AI Image Generation
async def generateImage(
self,
prompt: str,
size: str = "1024x1024",
quality: str = "standard",
style: str = "vivid",
options: Optional[AiCallOptions] = None,
) -> Dict[str, Any]:
"""Generate an image using AI using interface.generateImage()."""
try:
response = await self.aiObjects.generateImage(prompt, size, quality, style, options)
# Emit stats for image generation
self.services.workflow.storeWorkflowStat(
self.services.currentWorkflow,
response,
f"ai.generate.image"
)
# Convert response to dict format for backward compatibility
if hasattr(response, 'content'):
return {
"success": True,
"content": response.content,
"modelName": response.modelName,
"priceUsd": response.priceUsd,
"processingTime": response.processingTime
}
else:
return response
except Exception as e:
logger.error(f"Error in AI image generation: {str(e)}")
return {"success": False, "error": str(e)}